Skip to content

Commit a3d446c

Browse files
committed
CteWorkTable: properly apply TableProvider::scan projection argument
It was previously ignored
1 parent 70edcc1 commit a3d446c

File tree

2 files changed

+45
-17
lines changed

2 files changed

+45
-17
lines changed

datafusion/catalog/src/cte_worktable.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use std::{any::Any, borrow::Cow};
2323
use crate::Session;
2424
use arrow::datatypes::SchemaRef;
2525
use async_trait::async_trait;
26+
use datafusion_common::{assert_or_internal_err, DataFusionError};
2627
use datafusion_physical_plan::work_table::WorkTableExec;
2728

2829
use datafusion_physical_plan::ExecutionPlan;
@@ -86,15 +87,20 @@ impl TableProvider for CteWorkTable {
8687
async fn scan(
8788
&self,
8889
_state: &dyn Session,
89-
_projection: Option<&Vec<usize>>,
90-
_filters: &[Expr],
91-
_limit: Option<usize>,
90+
projection: Option<&Vec<usize>>,
91+
filters: &[Expr],
92+
limit: Option<usize>,
9293
) -> Result<Arc<dyn ExecutionPlan>> {
93-
// TODO: pushdown filters and limits
94+
assert_or_internal_err!(
95+
filters.is_empty(),
96+
"CteWorkTable does not support pushing filters"
97+
);
98+
assert_or_internal_err!(limit.is_none(), "CteWorkTable pushing limit");
9499
Ok(Arc::new(WorkTableExec::new(
95100
self.name.clone(),
96101
Arc::clone(&self.table_schema),
97-
)))
102+
projection.cloned(),
103+
)?))
98104
}
99105

100106
fn supports_filters_pushdown(

datafusion/physical-plan/src/work_table.rs

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -103,25 +103,35 @@ pub struct WorkTableExec {
103103
name: String,
104104
/// The schema of the stream
105105
schema: SchemaRef,
106+
/// Projection to apply to build the output stream from the recursion state
107+
projection: Option<Vec<usize>>,
106108
/// The work table
107109
work_table: Arc<WorkTable>,
108110
/// Execution metrics
109111
metrics: ExecutionPlanMetricsSet,
110112
/// Cache holding plan properties like equivalences, output partitioning etc.
111-
cache: PlanProperties,
113+
plan_properties: PlanProperties,
112114
}
113115

114116
impl WorkTableExec {
115117
/// Create a new execution plan for a worktable exec.
116-
pub fn new(name: String, schema: SchemaRef) -> Self {
117-
let cache = Self::compute_properties(Arc::clone(&schema));
118-
Self {
118+
pub fn new(
119+
name: String,
120+
mut schema: SchemaRef,
121+
projection: Option<Vec<usize>>,
122+
) -> Result<Self> {
123+
if let Some(projection) = &projection {
124+
schema = Arc::new(schema.project(projection)?);
125+
}
126+
let plan_properties = Self::compute_properties(Arc::clone(&schema));
127+
Ok(Self {
119128
name,
120129
schema,
130+
projection,
121131
metrics: ExecutionPlanMetricsSet::new(),
122132
work_table: Arc::new(WorkTable::new()),
123-
cache,
124-
}
133+
plan_properties,
134+
})
125135
}
126136

127137
/// Ref to name
@@ -173,7 +183,7 @@ impl ExecutionPlan for WorkTableExec {
173183
}
174184

175185
fn properties(&self) -> &PlanProperties {
176-
&self.cache
186+
&self.plan_properties
177187
}
178188

179189
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
@@ -199,11 +209,22 @@ impl ExecutionPlan for WorkTableExec {
199209
0,
200210
"WorkTableExec got an invalid partition {partition} (expected 0)"
201211
);
202-
let batch = self.work_table.take()?;
212+
let ReservedBatches {
213+
mut batches,
214+
reservation,
215+
} = self.work_table.take()?;
216+
if let Some(projection) = &self.projection {
217+
// We apply the projection
218+
// TODO: it would be better to apply it as soon as possible and not only here
219+
// TODO: an aggressive projection makes the memory reservation smaller, even if we do not edit it
220+
batches = batches
221+
.into_iter()
222+
.map(|b| b.project(projection))
223+
.collect::<Result<Vec<_>, _>>()?;
224+
}
203225

204-
let stream =
205-
MemoryStream::try_new(batch.batches, Arc::clone(&self.schema), None)?
206-
.with_reservation(batch.reservation);
226+
let stream = MemoryStream::try_new(batches, Arc::clone(&self.schema), None)?
227+
.with_reservation(reservation);
207228
Ok(Box::pin(cooperative(stream)))
208229
}
209230

@@ -236,9 +257,10 @@ impl ExecutionPlan for WorkTableExec {
236257
Some(Arc::new(Self {
237258
name: self.name.clone(),
238259
schema: Arc::clone(&self.schema),
260+
projection: self.projection.clone(),
239261
metrics: ExecutionPlanMetricsSet::new(),
240262
work_table,
241-
cache: self.cache.clone(),
263+
plan_properties: self.plan_properties.clone(),
242264
}))
243265
}
244266
}

0 commit comments

Comments
 (0)