Skip to content

Commit 9ff8f58

Browse files
committed
feat: hash
1 parent 1ee0a09 commit 9ff8f58

File tree

2 files changed

+180
-0
lines changed

2 files changed

+180
-0
lines changed

crates/cli/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ duckdb-bundled = ["stac-duckdb/bundled"]
1818

1919
[dependencies]
2020
anyhow.workspace = true
21+
chrono.workspace = true
2122
async-stream.workspace = true
2223
axum.workspace = true
2324
clap = { workspace = true, features = ["derive"] }

crates/cli/src/lib.rs

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,47 @@ pub enum Command {
313313
shell: clap_complete::Shell,
314314
},
315315

316+
/// Prefix item ids with a spatio-temporal hash, optionally sorting items by it.
317+
///
318+
/// Creates a sortable Z-order curve hash from each item's datetime and
319+
/// bounding box center, then prefixes item ids with the hash formatted as
320+
/// 16 zero-padded hexadecimal digits. Items are sorted by hash only with `--sort`.
321+
Hash {
322+
/// The input file.
323+
///
324+
/// To read from standard input, pass `-` or don't provide an argument at all.
325+
infile: Option<String>,
326+
327+
/// The output file.
328+
///
329+
/// To write to standard output, pass `-` or don't provide an argument at all.
330+
outfile: Option<String>,
331+
332+
/// Minimum spatial cell size in degrees.
333+
#[arg(long)]
334+
spatial_extent: f64,
335+
336+
/// Minimum temporal cell size, as an ISO 8601 duration.
337+
///
338+
/// Examples: P1D (1 day), PT1H (1 hour), P30D (30 days)
339+
#[arg(long)]
340+
temporal_extent: String,
341+
342+
/// Sort items by hash value.
343+
#[arg(long, default_value_t = false)]
344+
sort: bool,
345+
346+
/// Time range for the hasher, as a '/'-separated RFC 3339 interval.
347+
///
348+
/// Examples: 2020-01-01T00:00:00Z/2025-01-01T00:00:00Z
349+
///
350+
/// If provided, the hasher is created directly from this range and
351+
/// items are streamed instead of loaded into memory (unless --sort
352+
/// is also provided).
353+
#[arg(long)]
354+
time_range: Option<String>,
355+
},
356+
316357
/// Generate a STAC collection from one or more items
317358
Collection {
318359
/// The input file.
@@ -636,6 +677,92 @@ impl Rustac {
636677
clap_complete::generate(shell, &mut command, "rustac", &mut std::io::stdout());
637678
Ok(())
638679
}
680+
Command::Hash {
681+
ref infile,
682+
ref outfile,
683+
spatial_extent,
684+
ref temporal_extent,
685+
sort,
686+
ref time_range,
687+
} => {
688+
let temporal_extent = parse_iso8601_duration(temporal_extent)?;
689+
if let Some(time_range) = time_range {
690+
let (start, end) = stac::datetime::parse(time_range)?;
691+
let start = start
692+
.ok_or_else(|| anyhow!("time range start must not be open"))?
693+
.to_utc();
694+
let end = end
695+
.ok_or_else(|| anyhow!("time range end must not be open"))?
696+
.to_utc();
697+
let hasher =
698+
stac::hash::Hasher::new(spatial_extent, temporal_extent, start..end)?;
699+
if sort {
700+
let items = self.get_item_stream(infile.as_deref()).await?;
701+
let mut hashed: Vec<(u64, Item)> = items
702+
.map(|item| {
703+
let mut item = item?;
704+
let hash = item.hash(&hasher).unwrap_or(0);
705+
item.id = format!("{hash:016x}-{}", item.id);
706+
Ok((hash, item))
707+
})
708+
.collect::<Result<Vec<_>>>()?;
709+
hashed.sort_by_key(|(hash, _)| *hash);
710+
let items: Vec<Item> = hashed.into_iter().map(|(_, item)| item).collect();
711+
self.put_item_stream(outfile.as_deref(), items.into_iter().map(Ok))
712+
.await
713+
} else {
714+
let items = self.get_item_stream(infile.as_deref()).await?;
715+
let items = items.map(move |item| {
716+
let mut item = item?;
717+
let hash = item.hash(&hasher).unwrap_or(0);
718+
item.id = format!("{hash:016x}-{}", item.id);
719+
Ok(item)
720+
});
721+
self.put_item_stream(outfile.as_deref(), items).await
722+
}
723+
} else {
724+
let value = self.get(infile.as_deref()).await?;
725+
let mut items = match value {
726+
stac::Value::ItemCollection(ic) => ic.items,
727+
stac::Value::Item(item) => vec![item],
728+
other => {
729+
return Err(anyhow!(
730+
"expected an item collection or item, got {}",
731+
other.type_name()
732+
));
733+
}
734+
};
735+
let mut collection = Collection::from_id_and_items("hash", &items);
736+
if let Some([Some(start), Some(end)]) =
737+
collection.extent.temporal.interval.first_mut()
738+
{
739+
if *start == *end {
740+
*end = *end + temporal_extent;
741+
}
742+
}
743+
let hasher = collection
744+
.hasher(spatial_extent, temporal_extent)?
745+
.ok_or_else(|| {
746+
anyhow!(
747+
"collection temporal extent does not have both start and end dates"
748+
)
749+
})?;
750+
let mut hashed: Vec<(u64, Item)> = items
751+
.drain(..)
752+
.map(|mut item| {
753+
let hash = item.hash(&hasher).unwrap_or(0);
754+
item.id = format!("{hash:016x}-{}", item.id);
755+
(hash, item)
756+
})
757+
.collect();
758+
if sort {
759+
hashed.sort_by_key(|(hash, _)| *hash);
760+
}
761+
let items: Vec<Item> = hashed.into_iter().map(|(_, item)| item).collect();
762+
self.put_item_stream(outfile.as_deref(), items.into_iter().map(Ok))
763+
.await
764+
}
765+
}
639766
Command::Collection {
640767
ref infile,
641768
ref outfile,
@@ -1018,5 +1145,57 @@ async fn crawl(value: stac::Value, store: StacStore) -> impl TryStream<Item = Re
10181145
}
10191146
}
10201147

1148+
fn parse_iso8601_duration(s: &str) -> Result<chrono::TimeDelta> {
1149+
let s = s
1150+
.strip_prefix('P')
1151+
.ok_or_else(|| anyhow!("ISO 8601 duration must start with 'P': {s}"))?;
1152+
let (date_part, time_part) = if let Some(idx) = s.find('T') {
1153+
(&s[..idx], Some(&s[idx + 1..]))
1154+
} else {
1155+
(s, None)
1156+
};
1157+
let mut total_seconds: i64 = 0;
1158+
let mut num_buf = String::new();
1159+
for ch in date_part.chars() {
1160+
if ch.is_ascii_digit() {
1161+
num_buf.push(ch);
1162+
} else {
1163+
let n: i64 = num_buf
1164+
.parse()
1165+
.map_err(|_| anyhow!("invalid number in duration: {s}"))?;
1166+
num_buf.clear();
1167+
match ch {
1168+
'Y' => total_seconds += n * 365 * 86400,
1169+
'W' => total_seconds += n * 7 * 86400,
1170+
'D' => total_seconds += n * 86400,
1171+
_ => return Err(anyhow!("unknown date duration unit '{ch}' in: P{s}")),
1172+
}
1173+
}
1174+
}
1175+
if let Some(time_part) = time_part {
1176+
for ch in time_part.chars() {
1177+
if ch.is_ascii_digit() {
1178+
num_buf.push(ch);
1179+
} else {
1180+
let n: i64 = num_buf
1181+
.parse()
1182+
.map_err(|_| anyhow!("invalid number in duration: {s}"))?;
1183+
num_buf.clear();
1184+
match ch {
1185+
'H' => total_seconds += n * 3600,
1186+
'M' => total_seconds += n * 60,
1187+
'S' => total_seconds += n,
1188+
_ => return Err(anyhow!("unknown time duration unit '{ch}' in: P{s}")),
1189+
}
1190+
}
1191+
}
1192+
}
1193+
if total_seconds == 0 {
1194+
return Err(anyhow!("duration must be positive: P{s}"));
1195+
}
1196+
chrono::TimeDelta::try_seconds(total_seconds)
1197+
.ok_or_else(|| anyhow!("duration out of range: P{s}"))
1198+
}
1199+
10211200
#[cfg(test)]
10221201
use {assert_cmd as _, rstest as _, tempfile as _};

0 commit comments

Comments
 (0)