1 change: 1 addition & 0 deletions crates/sail-delta-lake/src/datasource/provider.rs
@@ -66,6 +66,7 @@ impl DeltaTableProvider {
log_store: LogStoreRef,
config: DeltaScanConfig,
) -> DeltaResult<Self> {
snapshot.ensure_data_read_supported()?;
Ok(DeltaTableProvider {
schema: df_logical_schema(
snapshot.as_ref(),
80 changes: 74 additions & 6 deletions crates/sail-delta-lake/src/datasource/scan.rs
@@ -39,7 +39,7 @@ use crate::conversion::ScalarConverter;
use crate::datasource::{create_object_store_url, partitioned_file_from_action, DeltaScanConfig};
use crate::physical_plan::DeltaPhysicalExprAdapterFactory;
use crate::schema::arrow_field_physical_name;
use crate::spec::Add;
use crate::spec::{Add, MaxStat, MinStat};
use crate::storage::LogStoreRef;
use crate::table::DeltaSnapshot;

@@ -108,6 +108,14 @@ pub fn build_file_scan_config(
let mut per_file_stats: Vec<Arc<Statistics>> = Vec::new();

for action in files.iter() {
if action.deletion_vector.is_some() {
// TODO: Implement deletion-vector-aware scans by excluding masked row ids during file
// reads instead of rejecting the file at planning time.
return Err(DataFusionError::NotImplemented(
"Reading Delta tables with Deletion Vectors is not yet supported".to_string(),
));
}

let mut part =
partitioned_file_from_action(action, &partition_columns_mapped, &complete_schema)?;
let action_stats = stats_for_add(action, &file_schema, &physical_to_logical)?;
@@ -487,30 +495,44 @@ fn stats_for_add(

for name in name_candidates {
if min_value == Precision::Absent {
if let Some(value) = stats.min_value(name).and_then(|v| {
let min_stat = stats.get_min_stat(name);
if let Some(value) = min_stat.value().and_then(|v| {
ScalarConverter::stat_value_to_arrow_scalar_value(v, field.data_type())
.ok()
.flatten()
}) {
if !value.is_null() {
min_value = Precision::Exact(value);
min_value = match min_stat {
MinStat::Exact(_) => Precision::Exact(value),
MinStat::LowerBound(_) => Precision::Inexact(value),
MinStat::Absent => Precision::Absent,
};
}
}
}
if max_value == Precision::Absent {
if let Some(value) = stats.max_value(name).and_then(|v| {
let max_stat = stats.get_max_stat(name);
if let Some(value) = max_stat.value().and_then(|v| {
ScalarConverter::stat_value_to_arrow_scalar_value(v, field.data_type())
.ok()
.flatten()
}) {
if !value.is_null() {
max_value = Precision::Exact(value);
max_value = match max_stat {
MaxStat::Exact(_) => Precision::Exact(value),
MaxStat::UpperBound(_) => Precision::Inexact(value),
MaxStat::Absent => Precision::Absent,
};
}
}
}
if null_count == Precision::Absent {
if let Some(value) = stats.null_count_value(name) {
null_count = Precision::Exact(value.max(0) as usize);
null_count = if stats.tight_bounds {
Precision::Exact(value.max(0) as usize)
} else {
Precision::Inexact(value.max(0) as usize)
};
}
}
}
@@ -540,6 +562,7 @@ fn stats_for_add(

#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
@@ -549,8 +572,10 @@ mod tests {

use super::{
add_column_statistics, rewrite_data_file_location, sanitize_statistics_for_schema,
stats_for_add,
};
use crate::conversion::ScalarConverter;
use crate::spec::Add;

#[test]
fn test_scalar_from_json_null_returns_typed_null() {
@@ -632,4 +657,47 @@ mod tests {
Path::from("bucket/table/part=1/part-000.parquet")
);
}

#[test]
#[expect(clippy::expect_used, clippy::unwrap_used)]
fn test_stats_for_add_marks_wide_bounds_as_inexact() {
let file_schema = Arc::new(Schema::new(vec![Field::new(
"value",
DataType::Int32,
true,
)]));
let add = Add {
path: "part-000.parquet".to_string(),
partition_values: HashMap::new(),
size: 1,
modification_time: 0,
data_change: true,
stats: Some(
r#"{"numRecords":3,"tightBounds":false,"minValues":{"value":1},"maxValues":{"value":7},"nullCount":{"value":0}}"#
.to_string(),
),
tags: None,
deletion_vector: None,
base_row_id: None,
default_row_commit_version: None,
clustering_provider: None,
commit_version: None,
commit_timestamp: None,
};

let stats = stats_for_add(&add, &file_schema, &HashMap::new())
.unwrap()
.expect("stats should be present");
let column = &stats.column_statistics[0];

assert_eq!(
column.min_value,
Precision::Inexact(ScalarValue::Int32(Some(1)))
);
assert_eq!(
column.max_value,
Precision::Inexact(ScalarValue::Int32(Some(7)))
);
assert_eq!(column.null_count, Precision::Inexact(0));
}
}
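
A note on the statistics handling above: when a file's Delta stats are written with tightBounds = false (for example after deletion-vector updates, where min/max may still describe deleted rows), the recorded values are only bounds, so the planner now reports them as inexact rather than exact. A minimal, self-contained sketch of that mapping, using simplified stand-ins for the crate's MinStat and DataFusion's Precision types (the real stats_for_add also converts to ScalarValue and applies the same rule to max values and null counts):

// Simplified stand-in for crate::spec::MinStat; the real enum carries stat values
// parsed from the Add action's JSON stats.
enum MinStat {
    Exact(i64),      // tightBounds = true: true minimum of the live rows
    LowerBound(i64), // tightBounds = false: only a lower bound
    Absent,
}

// Simplified stand-in for DataFusion's Precision<ScalarValue>.
#[derive(Debug, PartialEq)]
enum Precision {
    Exact(i64),
    Inexact(i64),
    Absent,
}

// The planning-time rule from the diff: exact stats stay exact, wide bounds become inexact.
fn min_precision(stat: MinStat) -> Precision {
    match stat {
        MinStat::Exact(v) => Precision::Exact(v),
        MinStat::LowerBound(v) => Precision::Inexact(v),
        MinStat::Absent => Precision::Absent,
    }
}

fn main() {
    assert_eq!(min_precision(MinStat::LowerBound(1)), Precision::Inexact(1));
    assert_eq!(min_precision(MinStat::Exact(7)), Precision::Exact(7));
}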
15 changes: 13 additions & 2 deletions crates/sail-delta-lake/src/delta_log/replay.rs
@@ -29,7 +29,10 @@ async fn read_checkpoint_header_from_parquet(
.map_err(DeltaTableError::generic_err)?;

let parquet_schema = builder.parquet_schema();
let mask = ProjectionMask::columns(parquet_schema, ["metaData", "protocol", "txn"]);
let mask = ProjectionMask::columns(
parquet_schema,
["metaData", "protocol", "txn", "domainMetadata"],
);

let mut batches = builder
.with_projection(mask)
@@ -41,7 +44,7 @@
let batch = batch_result.map_err(DeltaTableError::generic_err)?;
let rows: Vec<CheckpointActionRow> = decode_checkpoint_rows(&batch)?;
for row in rows {
state.apply_checkpoint_row(row);
state.apply_checkpoint_row(row)?;
}
}
Ok::<_, DeltaTableError>(state)
@@ -106,6 +109,12 @@ pub(crate) async fn load_replayed_table_state(
.metadata
.ok_or_else(|| DeltaTableError::generic("Cannot load table state without metadata"))?;
let txns = state.txns;
let domain_metadata = state
.domain_metadata
.into_iter()
.collect::<BTreeMap<_, _>>()
.into_values()
.collect::<Vec<_>>();
let adds = state
.adds
.into_iter()
@@ -123,6 +132,7 @@
protocol,
metadata,
txns,
domain_metadata,
adds,
removes,
commit_timestamps,
@@ -195,6 +205,7 @@ pub(crate) async fn load_replayed_table_header(
protocol,
metadata,
txns: Arc::new(state.txns),
domain_metadata: Arc::new(state.domain_metadata),
commit_timestamps: Arc::new(commit_timestamps),
}))
}
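
One design note on the replay change above: the replayed domain-metadata map is re-keyed through a BTreeMap before being flattened into a Vec, so load_replayed_table_state yields the domains in the same name-sorted order regardless of HashMap iteration order. A small sketch of the same pattern, with a single-field DomainMetadata stand-in (only the domain key appears in the diff):

use std::collections::{BTreeMap, HashMap};

// Simplified stand-in for crate::spec::DomainMetadata.
#[derive(Debug, Clone)]
struct DomainMetadata {
    domain: String,
}

// Re-key through a BTreeMap so the resulting Vec is deterministically sorted by domain.
fn ordered_domains(state: HashMap<String, DomainMetadata>) -> Vec<DomainMetadata> {
    state
        .into_iter()
        .collect::<BTreeMap<_, _>>()
        .into_values()
        .collect()
}

fn main() {
    let mut state = HashMap::new();
    for name in ["delta.rowTracking", "app.example"] {
        state.insert(name.to_string(), DomainMetadata { domain: name.to_string() });
    }
    let ordered = ordered_domains(state);
    assert_eq!(ordered[0].domain, "app.example"); // BTreeMap iteration is key-ordered
}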
13 changes: 12 additions & 1 deletion crates/sail-delta-lake/src/delta_log/segment.rs
@@ -8,7 +8,7 @@ use super::timestamps::version_uses_in_commit_timestamps;
use super::{list_delta_log_entries_from, read_last_checkpoint_version_from_store};
use crate::spec::{
checksum_path, parse_checkpoint_version, parse_checksum_version, parse_commit_version,
DeltaError, DeltaResult, Metadata, Protocol, Transaction, VersionChecksum,
DeltaError, DeltaResult, DomainMetadata, Metadata, Protocol, Transaction, VersionChecksum,
};
use crate::storage::LogStore;

@@ -20,6 +20,7 @@ pub(crate) struct ReplayedTableHeader {
pub protocol: Protocol,
pub metadata: Metadata,
pub txns: Arc<HashMap<String, Transaction>>,
pub domain_metadata: Arc<HashMap<String, DomainMetadata>>,
pub commit_timestamps: Arc<BTreeMap<i64, i64>>,
}

@@ -275,6 +276,12 @@ fn validate_and_build_header(
.into_iter()
.map(|txn| (txn.app_id.clone(), txn))
.collect::<HashMap<_, _>>();
let domain_metadata = checksum
.domain_metadata
.unwrap_or_default()
.into_iter()
.map(|domain| (domain.domain.clone(), domain))
.collect::<HashMap<_, _>>();
let commit_timestamps =
if version_uses_in_commit_timestamps(version, &checksum.protocol, &checksum.metadata) {
checksum
@@ -290,6 +297,7 @@
protocol: checksum.protocol,
metadata: checksum.metadata,
txns: Arc::new(txns),
domain_metadata: Arc::new(domain_metadata),
commit_timestamps: Arc::new(commit_timestamps),
})
}
@@ -333,6 +341,9 @@ pub(crate) async fn list_log_files(
}
}

// TODO(v2-checkpoints): This groups checkpoint candidates by version only. It does not yet
// distinguish classic vs. V2 checkpoint layouts, nor does it validate multipart completeness;
// readers rely on later replay-time handling of checkpointMetadata/sidecar fields instead.
let latest_checkpoint_version = checkpoint_candidates.iter().map(|(v, _)| *v).max();
let checkpoint = latest_checkpoint_version.map(|latest_v| {
let mut files: Vec<ObjectMeta> = checkpoint_candidates
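
To make the checkpoint TODO above concrete: selection currently takes the highest candidate version and gathers every file recorded at that version, with no validation that a multipart checkpoint is complete and no distinction between classic and V2 layouts. A hedged sketch of that rule, using (i64, String) pairs as stand-ins for the crate's (version, ObjectMeta) candidates:

// Pick the newest checkpoint version and collect all files at that version,
// mirroring the grouping-by-version-only behaviour noted in the TODO.
fn pick_latest_checkpoint(candidates: &[(i64, String)]) -> Option<(i64, Vec<String>)> {
    let latest = candidates.iter().map(|(v, _)| *v).max()?;
    let files = candidates
        .iter()
        .filter(|(v, _)| *v == latest)
        .map(|(_, path)| path.clone())
        .collect();
    Some((latest, files))
}

fn main() {
    let candidates = vec![
        (10, "00000000000000000010.checkpoint.parquet".to_string()),
        (20, "00000000000000000020.checkpoint.0000000001.0000000002.parquet".to_string()),
        (20, "00000000000000000020.checkpoint.0000000002.0000000002.parquet".to_string()),
    ];
    let (version, files) = pick_latest_checkpoint(&candidates).expect("candidates exist");
    assert_eq!(version, 20);
    assert_eq!(files.len(), 2); // both parts accepted without completeness checks
}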