Skip to content

Commit 634efc8

Browse files
committed
fix: push down limit needs to extract numRecords
1 parent f938c57 commit 634efc8

File tree

2 files changed: 44 additions & 16 deletions

kernel/src/scan/log_replay.rs

Lines changed: 23 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,7 @@ use super::{PhysicalPredicate, ScanMetadata};
88
use crate::actions::deletion_vector::DeletionVectorDescriptor;
99
use crate::actions::get_log_add_schema;
1010
use crate::engine_data::{GetData, RowVisitor, TypedGetData as _};
11-
use crate::expressions::{column_name, ColumnName, Expression, ExpressionRef, PredicateRef};
11+
use crate::expressions::{column_name, ColumnName, Expression, ExpressionRef, PredicateRef, UnaryExpressionOp};
1212
use crate::kernel_predicates::{DefaultKernelPredicateEvaluator, KernelPredicateEvaluator as _};
1313
use crate::log_replay::{ActionsBatch, FileActionDeduplicator, FileActionKey, LogReplayProcessor};
1414
use crate::scan::Scalar;
@@ -44,6 +44,7 @@ pub(crate) struct ScanLogReplayProcessor {
4444
partition_filter: Option<PredicateRef>,
4545
data_skipping_filter: Option<DataSkippingFilter>,
4646
add_transform: Arc<dyn ExpressionEvaluator>,
47+
add_checkpoint_transform: Arc<dyn ExpressionEvaluator>,
4748
state_info: Arc<StateInfo>,
4849
/// A set of (data file path, dv_unique_id) pairs that have been seen thus
4950
/// far in the log. This is used to filter out files with Remove actions as
@@ -84,6 +85,11 @@ impl ScanLogReplayProcessor {
8485
partition_filter: physical_predicate.as_ref().map(|(e, _)| e.clone()),
8586
data_skipping_filter,
8687
add_transform: engine.evaluation_handler().new_expression_evaluator(
88+
get_log_add_schema().clone(),
89+
get_add_transform_expr(false),
90+
SCAN_ROW_DATATYPE.clone(),
91+
)?,
92+
add_checkpoint_transform: engine.evaluation_handler().new_expression_evaluator(
8793
get_log_add_schema().clone(),
8894
get_add_transform_expr(skip_stats),
8995
SCAN_ROW_DATATYPE.clone(),
@@ -317,11 +323,21 @@ pub(crate) static SCAN_ROW_SCHEMA: LazyLock<Arc<StructType>> = LazyLock::new(||
317323
pub(crate) static SCAN_ROW_DATATYPE: LazyLock<DataType> =
318324
LazyLock::new(|| SCAN_ROW_SCHEMA.clone().into());
319325

326+
/// Lazily initialized, shared expression that applies `UnaryExpressionOp::ToJson`
/// to the nested `add.stats_parsed` column.
///
/// `get_add_transform_expr` clones this as its stats expression when it is
/// called with `skip_stats == true`.
static STATS_JSON_EXPR: LazyLock<ExpressionRef> = LazyLock::new(|| {
327+
Arc::new(
328+
Expression::unary(
329+
UnaryExpressionOp::ToJson,
330+
Expression::column(["add", "stats_parsed"]),
331+
)
332+
)
333+
});
334+
320335
fn get_add_transform_expr(skip_stats: bool) -> ExpressionRef {
321336
use crate::expressions::column_expr_ref;
322337

323338
let stats_expr = if skip_stats {
324-
Arc::new(Expression::Literal(Scalar::Null(DataType::STRING)))
339+
// Arc::new(Expression::Literal(Scalar::Null(DataType::STRING)))
340+
STATS_JSON_EXPR.clone()
325341
} else {
326342
column_expr_ref!("add.stats")
327343
};
@@ -387,7 +403,11 @@ impl LogReplayProcessor for ScanLogReplayProcessor {
387403
visitor.visit_rows_of(actions.as_ref())?;
388404

389405
// TODO: Teach expression eval to respect the selection vector we just computed so carefully!
390-
let result = self.add_transform.evaluate(actions.as_ref())?;
406+
let result = if is_log_batch {
407+
self.add_transform.evaluate(actions.as_ref())?
408+
} else {
409+
self.add_checkpoint_transform.evaluate(actions.as_ref())?
410+
};
391411
ScanMetadata::try_new(
392412
result,
393413
visitor.selection_vector,

kernel/src/scan/mod.rs

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,7 @@ use crate::log_replay::{ActionsBatch, HasSelectionVector};
2323
use crate::log_segment::LogSegment;
2424
use crate::scan::log_replay::BASE_ROW_ID_NAME;
2525
use crate::scan::state_info::StateInfo;
26-
use crate::schema::{
27-
ArrayType, DataType, MapType, PrimitiveType, Schema, SchemaRef, SchemaTransform, StructField,
28-
ToSchema as _,
29-
};
26+
use crate::schema::{ArrayType, DataType, MapType, PrimitiveType, Schema, SchemaRef, SchemaTransform, StructField, StructType, ToSchema as _};
3027
use crate::table_features::{ColumnMappingMode, Operation};
3128
use crate::{DeltaResult, Engine, EngineData, Error, FileMeta, SnapshotRef, Version};
3229

@@ -65,16 +62,27 @@ static CHECKPOINT_READ_SCHEMA_NO_STATS: LazyLock<SchemaRef> = LazyLock::new(|| {
6562
let add_struct = match add_field.data_type() {
6663
DataType::Struct(s) => s.as_ref(),
6764
_ => panic!("Expected add field to be a struct"),
68-
};
69-
let fields_without_stats: Vec<_> = add_struct
70-
.fields()
71-
.filter(|f: &&StructField| f.name() != "stats")
72-
.map(|f: &StructField| f.name())
73-
.collect();
74-
let add_schema_no_stats = add_struct.project(&fields_without_stats).unwrap();
75-
Arc::new(crate::schema::StructType::new_unchecked([StructField::nullable(
65+
}.clone();
66+
67+
// add stats_parsed field after stats
68+
let stats_parsed_field = StructField::new(
69+
"stats_parsed",
70+
StructType::try_new([StructField::new("numRecords", DataType::LONG, true)]).unwrap(),
71+
true,
72+
);
73+
let add_struct = add_struct.with_field_inserted_after(
74+
Some("stats"),
75+
stats_parsed_field,
76+
).unwrap();
77+
78+
// remove stats field
79+
let add_struct = add_struct.with_field_removed("stats");
80+
81+
println!("=== add_schema_no_stats: {:?}", add_struct);
82+
83+
Arc::new(StructType::new_unchecked([StructField::nullable(
7684
ADD_NAME,
77-
add_schema_no_stats,
85+
add_struct,
7886
)]))
7987
});
8088

0 commit comments

Comments
 (0)