Skip to content

Commit 727df98

Browse files
committed
Don't clone the schema in logical2physical
1 parent 1939235 commit 727df98

File tree

11 files changed

+210
-168
lines changed

11 files changed

+210
-168
lines changed

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ mod tests {
159159
let predicate = self
160160
.predicate
161161
.as_ref()
162-
.map(|p| logical2physical(p, &table_schema));
162+
.map(|p| logical2physical(p, Arc::clone(&table_schema)));
163163

164164
let mut source = ParquetSource::new(table_schema);
165165
if let Some(predicate) = predicate {

datafusion/datasource-parquet/benches/parquet_nested_filter_pushdown.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ fn parquet_nested_filter_pushdown(c: &mut Criterion) {
6565

6666
group.bench_function("no_pushdown", |b| {
6767
let file_schema = setup_reader(&dataset_path);
68-
let predicate = logical2physical(&create_predicate(), &file_schema);
68+
let predicate = logical2physical(&create_predicate(), file_schema);
6969
b.iter(|| {
7070
let matched = scan_with_predicate(&dataset_path, &predicate, false)
7171
.expect("baseline parquet scan with filter succeeded");
@@ -75,7 +75,7 @@ fn parquet_nested_filter_pushdown(c: &mut Criterion) {
7575

7676
group.bench_function("with_pushdown", |b| {
7777
let file_schema = setup_reader(&dataset_path);
78-
let predicate = logical2physical(&create_predicate(), &file_schema);
78+
let predicate = logical2physical(&create_predicate(), file_schema);
7979
b.iter(|| {
8080
let matched = scan_with_predicate(&dataset_path, &predicate, true)
8181
.expect("pushdown parquet scan with filter succeeded");

datafusion/datasource-parquet/src/opener.rs

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1391,7 +1391,7 @@ mod test {
13911391

13921392
// A filter on "a" should not exclude any rows even if it matches the data
13931393
let expr = col("a").eq(lit(1));
1394-
let predicate = logical2physical(&expr, &schema);
1394+
let predicate = logical2physical(&expr, Arc::clone(&schema));
13951395
let opener = make_opener(predicate);
13961396
let stream = opener.open(file.clone()).unwrap().await.unwrap();
13971397
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1400,7 +1400,7 @@ mod test {
14001400

14011401
// A filter on `b = 5.0` should exclude all rows
14021402
let expr = col("b").eq(lit(ScalarValue::Float32(Some(5.0))));
1403-
let predicate = logical2physical(&expr, &schema);
1403+
let predicate = logical2physical(&expr, Arc::clone(&schema));
14041404
let opener = make_opener(predicate);
14051405
let stream = opener.open(file).unwrap().await.unwrap();
14061406
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1446,7 +1446,8 @@ mod test {
14461446
let expr = col("part").eq(lit(1));
14471447
// Mark the expression as dynamic even if it's not to force partition pruning to happen
14481448
// Otherwise we assume it already happened at the planning stage and won't re-do the work here
1449-
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
1449+
let predicate =
1450+
make_dynamic_expr(logical2physical(&expr, Arc::clone(&table_schema)));
14501451
let opener = make_opener(predicate);
14511452
let stream = opener.open(file.clone()).unwrap().await.unwrap();
14521453
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1457,7 +1458,7 @@ mod test {
14571458
let expr = col("part").eq(lit(2));
14581459
// Mark the expression as dynamic even if it's not to force partition pruning to happen
14591460
// Otherwise we assume it already happened at the planning stage and won't re-do the work here
1460-
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
1461+
let predicate = make_dynamic_expr(logical2physical(&expr, table_schema));
14611462
let opener = make_opener(predicate);
14621463
let stream = opener.open(file).unwrap().await.unwrap();
14631464
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1513,7 +1514,7 @@ mod test {
15131514

15141515
// Filter should match the partition value and file statistics
15151516
let expr = col("part").eq(lit(1)).and(col("b").eq(lit(1.0)));
1516-
let predicate = logical2physical(&expr, &table_schema);
1517+
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
15171518
let opener = make_opener(predicate);
15181519
let stream = opener.open(file.clone()).unwrap().await.unwrap();
15191520
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1522,7 +1523,7 @@ mod test {
15221523

15231524
// Should prune based on partition value but not file statistics
15241525
let expr = col("part").eq(lit(2)).and(col("b").eq(lit(1.0)));
1525-
let predicate = logical2physical(&expr, &table_schema);
1526+
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
15261527
let opener = make_opener(predicate);
15271528
let stream = opener.open(file.clone()).unwrap().await.unwrap();
15281529
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1531,7 +1532,7 @@ mod test {
15311532

15321533
// Should prune based on file statistics but not partition value
15331534
let expr = col("part").eq(lit(1)).and(col("b").eq(lit(7.0)));
1534-
let predicate = logical2physical(&expr, &table_schema);
1535+
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
15351536
let opener = make_opener(predicate);
15361537
let stream = opener.open(file.clone()).unwrap().await.unwrap();
15371538
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1540,7 +1541,7 @@ mod test {
15401541

15411542
// Should prune based on both partition value and file statistics
15421543
let expr = col("part").eq(lit(2)).and(col("b").eq(lit(7.0)));
1543-
let predicate = logical2physical(&expr, &table_schema);
1544+
let predicate = logical2physical(&expr, table_schema);
15441545
let opener = make_opener(predicate);
15451546
let stream = opener.open(file).unwrap().await.unwrap();
15461547
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1586,7 +1587,7 @@ mod test {
15861587

15871588
// Filter should match the partition value and data value
15881589
let expr = col("part").eq(lit(1)).or(col("a").eq(lit(1)));
1589-
let predicate = logical2physical(&expr, &table_schema);
1590+
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
15901591
let opener = make_opener(predicate);
15911592
let stream = opener.open(file.clone()).unwrap().await.unwrap();
15921593
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1595,7 +1596,7 @@ mod test {
15951596

15961597
// Filter should match the partition value but not the data value
15971598
let expr = col("part").eq(lit(1)).or(col("a").eq(lit(3)));
1598-
let predicate = logical2physical(&expr, &table_schema);
1599+
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
15991600
let opener = make_opener(predicate);
16001601
let stream = opener.open(file.clone()).unwrap().await.unwrap();
16011602
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1604,7 +1605,7 @@ mod test {
16041605

16051606
// Filter should not match the partition value but match the data value
16061607
let expr = col("part").eq(lit(2)).or(col("a").eq(lit(1)));
1607-
let predicate = logical2physical(&expr, &table_schema);
1608+
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
16081609
let opener = make_opener(predicate);
16091610
let stream = opener.open(file.clone()).unwrap().await.unwrap();
16101611
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1613,7 +1614,7 @@ mod test {
16131614

16141615
// Filter should not match the partition value or the data value
16151616
let expr = col("part").eq(lit(2)).or(col("a").eq(lit(3)));
1616-
let predicate = logical2physical(&expr, &table_schema);
1617+
let predicate = logical2physical(&expr, table_schema);
16171618
let opener = make_opener(predicate);
16181619
let stream = opener.open(file).unwrap().await.unwrap();
16191620
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1666,7 +1667,7 @@ mod test {
16661667
// This filter could prune based on statistics, but since it's not dynamic it's not applied for pruning
16671668
// (the assumption is this happened already at planning time)
16681669
let expr = col("a").eq(lit(42));
1669-
let predicate = logical2physical(&expr, &table_schema);
1670+
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
16701671
let opener = make_opener(predicate);
16711672
let stream = opener.open(file.clone()).unwrap().await.unwrap();
16721673
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1675,7 +1676,8 @@ mod test {
16751676

16761677
// If we make the filter dynamic, it should prune.
16771678
// This allows dynamic filters to prune partitions/files even if they are populated late into execution.
1678-
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
1679+
let predicate =
1680+
make_dynamic_expr(logical2physical(&expr, Arc::clone(&table_schema)));
16791681
let opener = make_opener(predicate);
16801682
let stream = opener.open(file.clone()).unwrap().await.unwrap();
16811683
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1685,7 +1687,8 @@ mod test {
16851687
// If we have a filter that touches partition columns only and is dynamic, it should prune even if there are no stats.
16861688
file.statistics = Some(Arc::new(Statistics::new_unknown(&file_schema)));
16871689
let expr = col("part").eq(lit(2));
1688-
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
1690+
let predicate =
1691+
make_dynamic_expr(logical2physical(&expr, Arc::clone(&table_schema)));
16891692
let opener = make_opener(predicate);
16901693
let stream = opener.open(file.clone()).unwrap().await.unwrap();
16911694
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1694,7 +1697,8 @@ mod test {
16941697

16951698
// Similarly a filter that combines partition and data columns should prune even if there are no stats.
16961699
let expr = col("part").eq(lit(2)).and(col("a").eq(lit(42)));
1697-
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
1700+
let predicate =
1701+
make_dynamic_expr(logical2physical(&expr, Arc::clone(&table_schema)));
16981702
let opener = make_opener(predicate);
16991703
let stream = opener.open(file.clone()).unwrap().await.unwrap();
17001704
let (num_batches, num_rows) = count_batches_and_rows(stream).await;

datafusion/datasource-parquet/src/row_filter.rs

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -691,15 +691,13 @@ mod test {
691691

692692
let metadata = reader.metadata();
693693

694-
let table_schema =
694+
let table_schema = Arc::new(
695695
parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
696-
.expect("parsing schema");
696+
.expect("parsing schema"),
697+
);
697698

698699
let expr = col("int64_list").is_not_null();
699-
let expr = logical2physical(&expr, &table_schema);
700-
701-
let table_schema = Arc::new(table_schema.clone());
702-
700+
let expr = logical2physical(&expr, Arc::clone(&table_schema));
703701
let list_index = table_schema
704702
.index_of("int64_list")
705703
.expect("list column should exist");
@@ -725,23 +723,23 @@ mod test {
725723

726724
// This is the schema we would like to coerce to,
727725
// which is different from the physical schema of the file.
728-
let table_schema = Schema::new(vec![Field::new(
726+
let table_schema = Arc::new(Schema::new(vec![Field::new(
729727
"timestamp_col",
730728
DataType::Timestamp(Nanosecond, Some(Arc::from("UTC"))),
731729
false,
732-
)]);
730+
)]));
733731

734732
// Test all should fail
735733
let expr = col("timestamp_col").lt(Expr::Literal(
736734
ScalarValue::TimestampNanosecond(Some(1), Some(Arc::from("UTC"))),
737735
None,
738736
));
739-
let expr = logical2physical(&expr, &table_schema);
737+
let expr = logical2physical(&expr, Arc::clone(&table_schema));
740738
let expr = DefaultPhysicalExprAdapterFactory {}
741-
.create(Arc::new(table_schema.clone()), Arc::clone(&file_schema))
739+
.create(Arc::clone(&table_schema), Arc::clone(&file_schema))
742740
.rewrite(expr)
743741
.expect("rewriting expression");
744-
let candidate = FilterCandidateBuilder::new(expr, file_schema.clone())
742+
let candidate = FilterCandidateBuilder::new(expr, Arc::clone(&file_schema))
745743
.build(&metadata)
746744
.expect("building candidate")
747745
.expect("candidate expected");
@@ -774,10 +772,10 @@ mod test {
774772
ScalarValue::TimestampNanosecond(Some(0), Some(Arc::from("UTC"))),
775773
None,
776774
));
777-
let expr = logical2physical(&expr, &table_schema);
775+
let expr = logical2physical(&expr, Arc::clone(&table_schema));
778776
// Rewrite the expression to add CastExpr for type coercion
779777
let expr = DefaultPhysicalExprAdapterFactory {}
780-
.create(Arc::new(table_schema), Arc::clone(&file_schema))
778+
.create(table_schema, Arc::clone(&file_schema))
781779
.rewrite(expr)
782780
.expect("rewriting expression");
783781
let candidate = FilterCandidateBuilder::new(expr, file_schema)
@@ -809,8 +807,7 @@ mod test {
809807
)]));
810808

811809
let expr = col("struct_col").is_not_null();
812-
let expr = logical2physical(&expr, &table_schema);
813-
810+
let expr = logical2physical(&expr, Arc::clone(&table_schema));
814811
assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
815812
}
816813

@@ -836,15 +833,15 @@ mod test {
836833
let expr = col("struct_col")
837834
.is_not_null()
838835
.and(col("int_col").eq(Expr::Literal(ScalarValue::Int32(Some(5)), None)));
839-
let expr = logical2physical(&expr, &table_schema);
836+
let expr = logical2physical(&expr, Arc::clone(&table_schema));
840837

841838
// The entire expression should not be pushed down
842839
assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
843840

844841
// However, just the int_col predicate alone should be pushable
845842
let expr_int_only =
846843
col("int_col").eq(Expr::Literal(ScalarValue::Int32(Some(5)), None));
847-
let expr_int_only = logical2physical(&expr_int_only, &table_schema);
844+
let expr_int_only = logical2physical(&expr_int_only, Arc::clone(&table_schema));
848845
assert!(can_expr_be_pushed_down_with_schemas(
849846
&expr_int_only,
850847
&table_schema
@@ -856,7 +853,7 @@ mod test {
856853
let table_schema = Arc::new(get_lists_table_schema());
857854

858855
let expr = col("utf8_list").is_not_null();
859-
let expr = logical2physical(&expr, &table_schema);
856+
let expr = logical2physical(&expr, Arc::clone(&table_schema));
860857
check_expression_can_evaluate_against_schema(&expr, &table_schema);
861858

862859
assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
@@ -926,7 +923,7 @@ mod test {
926923
let metadata = parquet_reader_builder.metadata().clone();
927924
let file_schema = parquet_reader_builder.schema().clone();
928925

929-
let expr = logical2physical(&predicate_expr, &file_schema);
926+
let expr = logical2physical(&predicate_expr, Arc::clone(&file_schema));
930927
if expect_list_support {
931928
assert!(supports_list_predicates(&expr));
932929
}
@@ -1040,22 +1037,22 @@ mod test {
10401037

10411038
#[test]
10421039
fn basic_expr_doesnt_prevent_pushdown() {
1043-
let table_schema = get_basic_table_schema();
1040+
let table_schema = Arc::new(get_basic_table_schema());
10441041

10451042
let expr = col("string_col").is_null();
1046-
let expr = logical2physical(&expr, &table_schema);
1043+
let expr = logical2physical(&expr, Arc::clone(&table_schema));
10471044

10481045
assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
10491046
}
10501047

10511048
#[test]
10521049
fn complex_expr_doesnt_prevent_pushdown() {
1053-
let table_schema = get_basic_table_schema();
1050+
let table_schema = Arc::new(get_basic_table_schema());
10541051

10551052
let expr = col("string_col")
10561053
.is_not_null()
10571054
.or(col("bigint_col").gt(Expr::Literal(ScalarValue::Int64(Some(5)), None)));
1058-
let expr = logical2physical(&expr, &table_schema);
1055+
let expr = logical2physical(&expr, Arc::clone(&table_schema));
10591056

10601057
assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
10611058
}

0 commit comments

Comments
 (0)