Skip to content

Commit d56994f

Browse files
committed
Don't clone the schema in logical2physical
1 parent 1bb4229 commit d56994f

File tree

10 files changed

+209
-167
lines changed

10 files changed

+209
-167
lines changed

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ mod tests {
159159
let predicate = self
160160
.predicate
161161
.as_ref()
162-
.map(|p| logical2physical(p, &table_schema));
162+
.map(|p| logical2physical(p, Arc::clone(&table_schema)));
163163

164164
let mut source = ParquetSource::new(table_schema);
165165
if let Some(predicate) = predicate {

datafusion/datasource-parquet/benches/parquet_nested_filter_pushdown.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ fn parquet_nested_filter_pushdown(c: &mut Criterion) {
6565

6666
group.bench_function("no_pushdown", |b| {
6767
let file_schema = setup_reader(&dataset_path);
68-
let predicate = logical2physical(&create_predicate(), &file_schema);
68+
let predicate = logical2physical(&create_predicate(), file_schema);
6969
b.iter(|| {
7070
let matched = scan_with_predicate(&dataset_path, &predicate, false)
7171
.expect("baseline parquet scan with filter succeeded");
@@ -75,7 +75,7 @@ fn parquet_nested_filter_pushdown(c: &mut Criterion) {
7575

7676
group.bench_function("with_pushdown", |b| {
7777
let file_schema = setup_reader(&dataset_path);
78-
let predicate = logical2physical(&create_predicate(), &file_schema);
78+
let predicate = logical2physical(&create_predicate(), file_schema);
7979
b.iter(|| {
8080
let matched = scan_with_predicate(&dataset_path, &predicate, true)
8181
.expect("pushdown parquet scan with filter succeeded");

datafusion/datasource-parquet/src/opener.rs

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1391,7 +1391,7 @@ mod test {
13911391

13921392
// A filter on "a" should not exclude any rows even if it matches the data
13931393
let expr = col("a").eq(lit(1));
1394-
let predicate = logical2physical(&expr, &schema);
1394+
let predicate = logical2physical(&expr, Arc::clone(&schema));
13951395
let opener = make_opener(predicate);
13961396
let stream = opener.open(file.clone()).unwrap().await.unwrap();
13971397
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1400,7 +1400,7 @@ mod test {
14001400

14011401
// A filter on `b = 5.0` should exclude all rows
14021402
let expr = col("b").eq(lit(ScalarValue::Float32(Some(5.0))));
1403-
let predicate = logical2physical(&expr, &schema);
1403+
let predicate = logical2physical(&expr, Arc::clone(&schema));
14041404
let opener = make_opener(predicate);
14051405
let stream = opener.open(file).unwrap().await.unwrap();
14061406
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1446,7 +1446,8 @@ mod test {
14461446
let expr = col("part").eq(lit(1));
14471447
// Mark the expression as dynamic even if it's not to force partition pruning to happen
14481448
// Otherwise we assume it already happened at the planning stage and won't re-do the work here
1449-
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
1449+
let predicate =
1450+
make_dynamic_expr(logical2physical(&expr, Arc::clone(&table_schema)));
14501451
let opener = make_opener(predicate);
14511452
let stream = opener.open(file.clone()).unwrap().await.unwrap();
14521453
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1457,7 +1458,7 @@ mod test {
14571458
let expr = col("part").eq(lit(2));
14581459
// Mark the expression as dynamic even if it's not to force partition pruning to happen
14591460
// Otherwise we assume it already happened at the planning stage and won't re-do the work here
1460-
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
1461+
let predicate = make_dynamic_expr(logical2physical(&expr, table_schema));
14611462
let opener = make_opener(predicate);
14621463
let stream = opener.open(file).unwrap().await.unwrap();
14631464
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1513,7 +1514,7 @@ mod test {
15131514

15141515
// Filter should match the partition value and file statistics
15151516
let expr = col("part").eq(lit(1)).and(col("b").eq(lit(1.0)));
1516-
let predicate = logical2physical(&expr, &table_schema);
1517+
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
15171518
let opener = make_opener(predicate);
15181519
let stream = opener.open(file.clone()).unwrap().await.unwrap();
15191520
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1522,7 +1523,7 @@ mod test {
15221523

15231524
// Should prune based on partition value but not file statistics
15241525
let expr = col("part").eq(lit(2)).and(col("b").eq(lit(1.0)));
1525-
let predicate = logical2physical(&expr, &table_schema);
1526+
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
15261527
let opener = make_opener(predicate);
15271528
let stream = opener.open(file.clone()).unwrap().await.unwrap();
15281529
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1531,7 +1532,7 @@ mod test {
15311532

15321533
// Should prune based on file statistics but not partition value
15331534
let expr = col("part").eq(lit(1)).and(col("b").eq(lit(7.0)));
1534-
let predicate = logical2physical(&expr, &table_schema);
1535+
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
15351536
let opener = make_opener(predicate);
15361537
let stream = opener.open(file.clone()).unwrap().await.unwrap();
15371538
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1540,7 +1541,7 @@ mod test {
15401541

15411542
// Should prune based on both partition value and file statistics
15421543
let expr = col("part").eq(lit(2)).and(col("b").eq(lit(7.0)));
1543-
let predicate = logical2physical(&expr, &table_schema);
1544+
let predicate = logical2physical(&expr, table_schema);
15441545
let opener = make_opener(predicate);
15451546
let stream = opener.open(file).unwrap().await.unwrap();
15461547
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1586,7 +1587,7 @@ mod test {
15861587

15871588
// Filter should match the partition value and data value
15881589
let expr = col("part").eq(lit(1)).or(col("a").eq(lit(1)));
1589-
let predicate = logical2physical(&expr, &table_schema);
1590+
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
15901591
let opener = make_opener(predicate);
15911592
let stream = opener.open(file.clone()).unwrap().await.unwrap();
15921593
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1595,7 +1596,7 @@ mod test {
15951596

15961597
// Filter should match the partition value but not the data value
15971598
let expr = col("part").eq(lit(1)).or(col("a").eq(lit(3)));
1598-
let predicate = logical2physical(&expr, &table_schema);
1599+
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
15991600
let opener = make_opener(predicate);
16001601
let stream = opener.open(file.clone()).unwrap().await.unwrap();
16011602
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1604,7 +1605,7 @@ mod test {
16041605

16051606
// Filter should not match the partition value but match the data value
16061607
let expr = col("part").eq(lit(2)).or(col("a").eq(lit(1)));
1607-
let predicate = logical2physical(&expr, &table_schema);
1608+
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
16081609
let opener = make_opener(predicate);
16091610
let stream = opener.open(file.clone()).unwrap().await.unwrap();
16101611
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1613,7 +1614,7 @@ mod test {
16131614

16141615
// Filter should not match the partition value or the data value
16151616
let expr = col("part").eq(lit(2)).or(col("a").eq(lit(3)));
1616-
let predicate = logical2physical(&expr, &table_schema);
1617+
let predicate = logical2physical(&expr, table_schema);
16171618
let opener = make_opener(predicate);
16181619
let stream = opener.open(file).unwrap().await.unwrap();
16191620
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1666,7 +1667,7 @@ mod test {
16661667
// This filter could prune based on statistics, but since it's not dynamic it's not applied for pruning
16671668
// (the assumption is this happened already at planning time)
16681669
let expr = col("a").eq(lit(42));
1669-
let predicate = logical2physical(&expr, &table_schema);
1670+
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
16701671
let opener = make_opener(predicate);
16711672
let stream = opener.open(file.clone()).unwrap().await.unwrap();
16721673
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1675,7 +1676,8 @@ mod test {
16751676

16761677
// If we make the filter dynamic, it should prune.
16771678
// This allows dynamic filters to prune partitions/files even if they are populated late into execution.
1678-
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
1679+
let predicate =
1680+
make_dynamic_expr(logical2physical(&expr, Arc::clone(&table_schema)));
16791681
let opener = make_opener(predicate);
16801682
let stream = opener.open(file.clone()).unwrap().await.unwrap();
16811683
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1685,7 +1687,8 @@ mod test {
16851687
// If we have a filter that touches partition columns only and is dynamic, it should prune even if there are no stats.
16861688
file.statistics = Some(Arc::new(Statistics::new_unknown(&file_schema)));
16871689
let expr = col("part").eq(lit(2));
1688-
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
1690+
let predicate =
1691+
make_dynamic_expr(logical2physical(&expr, Arc::clone(&table_schema)));
16891692
let opener = make_opener(predicate);
16901693
let stream = opener.open(file.clone()).unwrap().await.unwrap();
16911694
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1694,7 +1697,8 @@ mod test {
16941697

16951698
// Similarly a filter that combines partition and data columns should prune even if there are no stats.
16961699
let expr = col("part").eq(lit(2)).and(col("a").eq(lit(42)));
1697-
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
1700+
let predicate =
1701+
make_dynamic_expr(logical2physical(&expr, Arc::clone(&table_schema)));
16981702
let opener = make_opener(predicate);
16991703
let stream = opener.open(file.clone()).unwrap().await.unwrap();
17001704
let (num_batches, num_rows) = count_batches_and_rows(stream).await;

datafusion/datasource-parquet/src/row_filter.rs

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -674,15 +674,13 @@ mod test {
674674

675675
let metadata = reader.metadata();
676676

677-
let table_schema =
677+
let table_schema = Arc::new(
678678
parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
679-
.expect("parsing schema");
679+
.expect("parsing schema"),
680+
);
680681

681682
let expr = col("int64_list").is_not_null();
682-
let expr = logical2physical(&expr, &table_schema);
683-
684-
let table_schema = Arc::new(table_schema.clone());
685-
683+
let expr = logical2physical(&expr, Arc::clone(&table_schema));
686684
let list_index = table_schema
687685
.index_of("int64_list")
688686
.expect("list column should exist");
@@ -708,23 +706,23 @@ mod test {
708706

709707
// This is the schema we would like to coerce to,
710708
// which is different from the physical schema of the file.
711-
let table_schema = Schema::new(vec![Field::new(
709+
let table_schema = Arc::new(Schema::new(vec![Field::new(
712710
"timestamp_col",
713711
DataType::Timestamp(Nanosecond, Some(Arc::from("UTC"))),
714712
false,
715-
)]);
713+
)]));
716714

717715
// Test all should fail
718716
let expr = col("timestamp_col").lt(Expr::Literal(
719717
ScalarValue::TimestampNanosecond(Some(1), Some(Arc::from("UTC"))),
720718
None,
721719
));
722-
let expr = logical2physical(&expr, &table_schema);
720+
let expr = logical2physical(&expr, Arc::clone(&table_schema));
723721
let expr = DefaultPhysicalExprAdapterFactory {}
724-
.create(Arc::new(table_schema.clone()), Arc::clone(&file_schema))
722+
.create(Arc::clone(&table_schema), Arc::clone(&file_schema))
725723
.rewrite(expr)
726724
.expect("rewriting expression");
727-
let candidate = FilterCandidateBuilder::new(expr, file_schema.clone())
725+
let candidate = FilterCandidateBuilder::new(expr, Arc::clone(&file_schema))
728726
.build(&metadata)
729727
.expect("building candidate")
730728
.expect("candidate expected");
@@ -757,10 +755,10 @@ mod test {
757755
ScalarValue::TimestampNanosecond(Some(0), Some(Arc::from("UTC"))),
758756
None,
759757
));
760-
let expr = logical2physical(&expr, &table_schema);
758+
let expr = logical2physical(&expr, Arc::clone(&table_schema));
761759
// Rewrite the expression to add CastExpr for type coercion
762760
let expr = DefaultPhysicalExprAdapterFactory {}
763-
.create(Arc::new(table_schema), Arc::clone(&file_schema))
761+
.create(table_schema, Arc::clone(&file_schema))
764762
.rewrite(expr)
765763
.expect("rewriting expression");
766764
let candidate = FilterCandidateBuilder::new(expr, file_schema)
@@ -792,8 +790,7 @@ mod test {
792790
)]));
793791

794792
let expr = col("struct_col").is_not_null();
795-
let expr = logical2physical(&expr, &table_schema);
796-
793+
let expr = logical2physical(&expr, Arc::clone(&table_schema));
797794
assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
798795
}
799796

@@ -819,15 +816,15 @@ mod test {
819816
let expr = col("struct_col")
820817
.is_not_null()
821818
.and(col("int_col").eq(Expr::Literal(ScalarValue::Int32(Some(5)), None)));
822-
let expr = logical2physical(&expr, &table_schema);
819+
let expr = logical2physical(&expr, Arc::clone(&table_schema));
823820

824821
// The entire expression should not be pushed down
825822
assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
826823

827824
// However, just the int_col predicate alone should be pushable
828825
let expr_int_only =
829826
col("int_col").eq(Expr::Literal(ScalarValue::Int32(Some(5)), None));
830-
let expr_int_only = logical2physical(&expr_int_only, &table_schema);
827+
let expr_int_only = logical2physical(&expr_int_only, Arc::clone(&table_schema));
831828
assert!(can_expr_be_pushed_down_with_schemas(
832829
&expr_int_only,
833830
&table_schema
@@ -839,7 +836,7 @@ mod test {
839836
let table_schema = Arc::new(get_lists_table_schema());
840837

841838
let expr = col("utf8_list").is_not_null();
842-
let expr = logical2physical(&expr, &table_schema);
839+
let expr = logical2physical(&expr, Arc::clone(&table_schema));
843840
check_expression_can_evaluate_against_schema(&expr, &table_schema);
844841

845842
assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
@@ -909,7 +906,7 @@ mod test {
909906
let metadata = parquet_reader_builder.metadata().clone();
910907
let file_schema = parquet_reader_builder.schema().clone();
911908

912-
let expr = logical2physical(&predicate_expr, &file_schema);
909+
let expr = logical2physical(&predicate_expr, Arc::clone(&file_schema));
913910
if expect_list_support {
914911
assert!(supports_list_predicates(&expr));
915912
}
@@ -1023,22 +1020,22 @@ mod test {
10231020

10241021
#[test]
10251022
fn basic_expr_doesnt_prevent_pushdown() {
1026-
let table_schema = get_basic_table_schema();
1023+
let table_schema = Arc::new(get_basic_table_schema());
10271024

10281025
let expr = col("string_col").is_null();
1029-
let expr = logical2physical(&expr, &table_schema);
1026+
let expr = logical2physical(&expr, Arc::clone(&table_schema));
10301027

10311028
assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
10321029
}
10331030

10341031
#[test]
10351032
fn complex_expr_doesnt_prevent_pushdown() {
1036-
let table_schema = get_basic_table_schema();
1033+
let table_schema = Arc::new(get_basic_table_schema());
10371034

10381035
let expr = col("string_col")
10391036
.is_not_null()
10401037
.or(col("bigint_col").gt(Expr::Literal(ScalarValue::Int64(Some(5)), None)));
1041-
let expr = logical2physical(&expr, &table_schema);
1038+
let expr = logical2physical(&expr, Arc::clone(&table_schema));
10421039

10431040
assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
10441041
}

0 commit comments

Comments (0)