14 changes: 11 additions & 3 deletions spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -139,9 +139,17 @@ case class CometScanRule(session: SparkSession)

   private def transformV1Scan(plan: SparkPlan, scanExec: FileSourceScanExec): SparkPlan = {
 
-    if (COMET_DPP_FALLBACK_ENABLED.get() &&
-      scanExec.partitionFilters.exists(isDynamicPruningFilter)) {
-      return withInfo(scanExec, "Dynamic Partition Pruning is not supported")
+    val hasDPP = scanExec.partitionFilters.exists(isDynamicPruningFilter)
+    val aqeEnabled = session.sessionState.conf.adaptiveExecutionEnabled
+
+    // DPP is supported for native_datafusion scans via lazy partition serialization,
+    // but only in AQE mode, where subqueries are properly prepared before execution.
+    // In non-AQE mode, DPP subqueries aren't ready when the scan tries to use them.
+    val scanImpl = COMET_NATIVE_SCAN_IMPL.get()
+    val shouldFallbackForDPP = COMET_DPP_FALLBACK_ENABLED.get() && hasDPP &&
+      (scanImpl != SCAN_NATIVE_DATAFUSION || !aqeEnabled)
+    if (shouldFallbackForDPP) {
+      return withInfo(scanExec, "Dynamic Partition Pruning is not supported for this scan type")
     }
 
     scanExec.relation match {
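For reference, the new fallback condition reduces to a small decision table. Here is a minimal, self-contained sketch of the same logic; the `ScanImpl` ADT and all names here are illustrative, not Comet's API:

```scala
// Illustrative sketch of the DPP fallback decision added to CometScanRule.
object DppFallbackSketch {
  sealed trait ScanImpl
  case object ScanNativeComet extends ScanImpl
  case object ScanNativeDataFusion extends ScanImpl

  // Fall back to Spark for DPP unless the scan is native_datafusion AND AQE is enabled.
  def shouldFallbackForDPP(
      fallbackEnabled: Boolean,
      hasDPP: Boolean,
      scanImpl: ScanImpl,
      aqeEnabled: Boolean): Boolean =
    fallbackEnabled && hasDPP && (scanImpl != ScanNativeDataFusion || !aqeEnabled)

  def main(args: Array[String]): Unit = {
    // native_datafusion with AQE: DPP is handled natively, no fallback
    assert(!shouldFallbackForDPP(fallbackEnabled = true, hasDPP = true,
      scanImpl = ScanNativeDataFusion, aqeEnabled = true))
    // native_datafusion without AQE: subqueries aren't prepared in time, fall back
    assert(shouldFallbackForDPP(fallbackEnabled = true, hasDPP = true,
      scanImpl = ScanNativeDataFusion, aqeEnabled = false))
    // any other scan implementation with DPP filters: fall back
    assert(shouldFallbackForDPP(fallbackEnabled = true, hasDPP = true,
      scanImpl = ScanNativeComet, aqeEnabled = true))
    // no DPP filters present: never fall back for DPP
    assert(!shouldFallbackForDPP(fallbackEnabled = true, hasDPP = false,
      scanImpl = ScanNativeComet, aqeEnabled = false))
  }
}
```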
@@ -57,10 +57,9 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
       withInfo(scanExec, s"Full native scan disabled because ${COMET_EXEC_ENABLED.key} disabled")
     }
 
-    // Native DataFusion doesn't support subqueries/dynamic pruning
-    if (scanExec.partitionFilters.exists(isDynamicPruningFilter)) {
-      withInfo(scanExec, "Native DataFusion scan does not support subqueries/dynamic pruning")
-    }
+    // DPP (Dynamic Partition Pruning) is now supported via lazy partition serialization
+    // in CometNativeScanExec. DPP subqueries are resolved at execution time, before
+    // partition data is serialized, following the pattern from CometIcebergNativeScanExec.
 
     if (SQLConf.get.ignoreCorruptFiles ||
         scanExec.relation.options
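For context, this is the query shape DPP targets: a fact table partitioned on the join key, joined to a dimension with a selective filter. A hedged, runnable sketch of such a run (paths, tables, and column names are hypothetical; whether the optimizer actually inserts a `DynamicPruningExpression` depends on statistics and broadcast thresholds):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative query that can produce a DynamicPruningExpression on the
// fact scan's partitionFilters. All data and paths are made up for the demo.
object DppQuerySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .config("spark.sql.adaptive.enabled", "true") // AQE on, per the rule above
      .config("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
      .getOrCreate()
    import spark.implicits._

    // Fact table partitioned on the join key.
    Seq((1, "a"), (2, "b"), (3, "c"))
      .toDF("id", "v")
      .write.partitionBy("id").mode("overwrite").parquet("/tmp/dpp_fact")
    // Dimension table.
    Seq((1, "x"), (2, "y"))
      .toDF("id", "name")
      .write.mode("overwrite").parquet("/tmp/dpp_dim")

    val fact = spark.read.parquet("/tmp/dpp_fact")
    val dim = spark.read.parquet("/tmp/dpp_dim").filter($"name" === "x")

    // The join key is fact's partition column and dim carries a selective
    // filter, so Spark may prune fact partitions at execution time using
    // the ids produced by the dim subquery.
    fact.join(dim, "id").explain(true)
    spark.stop()
  }
}
```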
@@ -41,6 +41,7 @@ import com.google.common.base.Objects
 import org.apache.comet.CometConf
 import org.apache.comet.parquet.{CometParquetFileFormat, CometParquetUtils}
 import org.apache.comet.serde.OperatorOuterClass.Operator
+import org.apache.comet.shims.ShimSubqueryBroadcast
 
 /**
  * Native scan operator for DataSource V1 Parquet files using DataFusion's ParquetExec.
@@ -54,6 +55,11 @@ import org.apache.comet.serde.OperatorOuterClass.Operator
  * projections) is serialized once at planning time, while per-partition file lists are lazily
  * serialized at execution time. This reduces memory when scanning tables with many partitions, as
  * each executor task receives only its partition's file list rather than all files.
+ *
+ * Supports Dynamic Partition Pruning (DPP) by deferring partition serialization to execution
+ * time. doPrepare() prepares the DPP subquery plans; the lazy serializedPartitionData then
+ * waits for their results and serializes the DPP-filtered partitions from
+ * CometScanExec.getFilePartitions().
  */
 case class CometNativeScanExec(
     override val nativeOp: Operator,
@@ -72,7 +78,8 @@ case class CometNativeScanExec(
     sourceKey: String) // Key for PlanDataInjector to match common+partition data at runtime
     extends CometLeafExec
     with DataSourceScanExec
-    with ShimStreamSourceAwareSparkPlan {
+    with ShimStreamSourceAwareSparkPlan
+    with ShimSubqueryBroadcast {
 
   override lazy val metadata: Map[String, String] = originalPlan.metadata
 
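The `ShimSubqueryBroadcast` mixin itself is not shown in this diff, but the hunk below calls its `getSubqueryBroadcastIndices`. As a hedged sketch of what such a version shim might do, assuming `SubqueryBroadcastExec` exposes either the pre-SPARK-46946 `index: Int` or the post-SPARK-46946 `indices: Seq[Int]` accessor (the real Comet trait may differ):

```scala
// Hypothetical shim normalizing the index/indices shape across Spark versions.
trait ShimSubqueryBroadcastSketch {
  def getSubqueryBroadcastIndices(plan: AnyRef): Seq[Int] = {
    val cls = plan.getClass
    try {
      // Post-SPARK-46946 shape: indices: Seq[Int]
      cls.getMethod("indices").invoke(plan).asInstanceOf[Seq[Int]]
    } catch {
      case _: NoSuchMethodException =>
        // Pre-SPARK-46946 shape: index: Int
        Seq(cls.getMethod("index").invoke(plan).asInstanceOf[Int])
    }
  }
}
```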
@@ -93,31 +100,98 @@ case class CometNativeScanExec(
   override lazy val outputOrdering: Seq[SortOrder] = originalPlan.outputOrdering
 
   /**
-   * Lazy partition serialization - deferred until execution time to reduce driver memory.
+   * Prepare DPP subquery plans. Called by Spark's prepare() before doExecuteColumnar().
    *
-   * Split-mode serialization pattern:
+   * This follows Spark's convention of preparing subqueries in doPrepare() rather than
+   * doExecuteColumnar(). While the actual waiting for DPP results happens later in
+   * serializedPartitionData, calling prepare() here ensures subquery plans are set up before
+   * execution begins.
+   */
+  override protected def doPrepare(): Unit = {
+    partitionFilters.foreach {
+      case DynamicPruningExpression(e: InSubqueryExec) =>
+        e.plan.prepare()
+      case _ =>
+    }
+    super.doPrepare()
+  }
+
+  /**
+   * Lazy partition serialization - deferred until execution time for DPP support.
+   *
+   * DPP (Dynamic Partition Pruning) flow:
    * {{{
    * Planning time:
-   * - CometNativeScan.convert() serializes common data (schemas, filters, projections)
-   * - commonData embedded in nativeOp protobuf
-   * - File partitions NOT serialized yet
+   * - CometNativeScanExec created with partitionFilters containing DynamicPruningExpression
+   * - serializedPartitionData not evaluated (lazy)
+   * - No partition serialization yet
    *
    * Execution time:
-   * - doExecuteColumnar() accesses commonData and perPartitionData
-   * - Forces serializedPartitionData evaluation (here)
-   * - Each partition's file list serialized separately
-   * - CometExecRDD receives per-partition data and injects at runtime
+   * 1. Spark calls prepare() on the plan tree
+   *    - doPrepare() calls e.plan.prepare() for each DPP filter
+   *    - Subquery plans are set up (but not yet executed)
+   *
+   * 2. Spark calls doExecuteColumnar()
+   *    - Accesses perPartitionData
+   *    - Forces serializedPartitionData evaluation (here)
+   *    - Waits for DPP values (updateResult or reflection)
+   *    - Calls scan.getFilePartitions() with the DPP-filtered partitions
+   *    - Only matching partitions are serialized
    * }}}
    *
    * This pattern reduces memory usage for tables with many partitions - instead of serializing
    * all files for all partitions in the driver, we serialize only common metadata (once) and each
    * partition's files (lazily, as tasks are scheduled).
    */
   @transient private lazy val serializedPartitionData: (Array[Byte], Array[Array[Byte]]) = {
+    // Ensure DPP subqueries are resolved before accessing file partitions.
+    // This follows the pattern from CometIcebergNativeScanExec.
+    partitionFilters.foreach {
+      case DynamicPruningExpression(e: InSubqueryExec) if e.values().isEmpty =>
+        e.plan match {
+          case sab: SubqueryAdaptiveBroadcastExec =>
+            // SubqueryAdaptiveBroadcastExec.executeCollect() throws, so we call
+            // child.executeCollect() directly. We use the index from SAB to find the
+            // right buildKey, then locate that key's column in child.output.
+            val rows = sab.child.executeCollect()
+            val indices = getSubqueryBroadcastIndices(sab)
+
+            // SPARK-46946 changed index: Int to indices: Seq[Int] as a preparatory refactor
+            // for future features (Null Safe Equality DPP, multiple equality predicates).
+            // Currently indices always has one element.
+            assert(
+              indices.length == 1,
+              s"Multi-index DPP not supported: indices=$indices. See SPARK-46946.")
+            val buildKeyIndex = indices.head
+            val buildKey = sab.buildKeys(buildKeyIndex)
+
+            // Find the column index in child.output by matching buildKey's exprId
+            val colIndex = buildKey match {
+              case attr: Attribute =>
+                sab.child.output.indexWhere(_.exprId == attr.exprId)
+              // DPP may cast the partition column to match the join key type
+              case Cast(attr: Attribute, _, _, _) =>
+                sab.child.output.indexWhere(_.exprId == attr.exprId)
+              case _ => buildKeyIndex
+            }
+            if (colIndex < 0) {
+              throw new IllegalStateException(
+                s"DPP build key '$buildKey' not found in ${sab.child.output.map(_.name)}")
+            }
+
+            setInSubqueryResult(e, rows.map(_.get(colIndex, e.child.dataType)))
+          case _ =>
+            e.updateResult()
+        }
+      case _ =>
+    }
+
     // Extract common data from nativeOp
     val commonBytes = nativeOp.getNativeScan.getCommon.toByteArray
 
-    // Get file partitions from CometScanExec (handles bucketing, etc.)
+    // Get file partitions from CometScanExec (handles bucketing, DPP filtering, etc.)
+    // CometScanExec.getFilePartitions() uses dynamicallySelectedPartitions, which
+    // evaluates DPP filters against partition values.
     val filePartitions = scan.getFilePartitions()
 
     // Serialize each partition's files
@@ -135,6 +209,29 @@ case class CometNativeScanExec(
     (commonBytes, perPartitionBytes)
   }
 
+  /**
+   * Sets InSubqueryExec's private result field via reflection.
+   *
+   * Reflection is required because:
+   *   - SubqueryAdaptiveBroadcastExec.executeCollect() throws UnsupportedOperationException
+   *   - InSubqueryExec has no public setter for result, only updateResult(), which calls
+   *     executeCollect()
+   *   - We can't replace e.plan since it's a val
+   */
+  private def setInSubqueryResult(e: InSubqueryExec, result: Array[_]): Unit = {
+    val fields = e.getClass.getDeclaredFields
+    // The field name is mangled by the Scala compiler, e.g. "org$apache$...$InSubqueryExec$$result"
+    val resultField = fields
+      .find(f => f.getName.endsWith("$result") && !f.getName.contains("Broadcast"))
+      .getOrElse {
+        throw new IllegalStateException(
+          s"Cannot find 'result' field in ${e.getClass.getName}. " +
+            "Spark version may be incompatible with Comet's DPP implementation.")
+      }
+    resultField.setAccessible(true)
+    resultField.set(e, result)
+  }
+
   def commonData: Array[Byte] = serializedPartitionData._1
   def perPartitionData: Array[Array[Byte]] = serializedPartitionData._2
 
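To make the reflection trick in setInSubqueryResult concrete, here is a standalone demo of the same pattern; `Holder` and its field are illustrative, and only the suffix-matching lookup mirrors the patch. Scala may expand a private field's JVM name with its enclosing path (e.g. `pkg$Outer$$result`), which is why the lookup matches by suffix rather than exact name:

```scala
// Self-contained demo: set a private Scala field through java.lang.reflect.
object ReflectionSetSketch {
  class Holder {
    private var result: Array[Int] = _
    def read: Array[Int] = result
  }

  def main(args: Array[String]): Unit = {
    val h = new Holder
    val resultField = h.getClass.getDeclaredFields
      .find(_.getName.endsWith("result")) // tolerate compiler name mangling
      .getOrElse(throw new IllegalStateException("no 'result' field found"))
    resultField.setAccessible(true)
    resultField.set(h, Array(1, 2, 3)) // bypass the missing public setter
    assert(h.read.sameElements(Array(1, 2, 3)))
  }
}
```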