Commit 8033aea

Merge branch 'main' into feat/add-support-to_date
2 parents 55a6893 + 5c1f131 commit 8033aea

File tree: 21 files changed, +235 -221 lines

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 44 additions & 26 deletions
```diff
@@ -78,11 +78,11 @@ object CometConf extends ShimCometConf {
 
   val COMET_PREFIX = "spark.comet";
 
-  val COMET_EXEC_CONFIG_PREFIX: String = s"$COMET_PREFIX.exec";
+  val COMET_EXEC_CONFIG_PREFIX: String = s"$COMET_PREFIX.exec"
 
-  val COMET_EXPR_CONFIG_PREFIX: String = s"$COMET_PREFIX.expression";
+  val COMET_EXPR_CONFIG_PREFIX: String = s"$COMET_PREFIX.expression"
 
-  val COMET_OPERATOR_CONFIG_PREFIX: String = s"$COMET_PREFIX.operator";
+  val COMET_OPERATOR_CONFIG_PREFIX: String = s"$COMET_PREFIX.operator"
 
   val COMET_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.enabled")
     .category(CATEGORY_EXEC)
@@ -112,7 +112,7 @@ object CometConf extends ShimCometConf {
       "feature is highly experimental and only partially implemented. It should not " +
       "be used in production.")
     .booleanConf
-    .createWithDefault(false)
+    .createWithEnvVarOrDefault("ENABLE_COMET_WRITE", false)
 
   // Deprecated: native_comet uses mutable buffers incompatible with Arrow FFI best practices
   // and does not support complex types. Use native_iceberg_compat or auto instead.
```
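The experimental write flag can now be toggled from the environment as well as from the Spark conf, mirroring the existing `ENABLE_COMET_STRICT_TESTING` override further down in this file. A minimal sketch of env-var-backed default resolution, assuming the helper simply consults `sys.env` when the entry is created (the real `createWithEnvVarOrDefault` lives in Comet's config builder and may differ in detail):

```scala
// Sketch only: resolve a boolean default from an environment variable,
// falling back to the hard-coded default when the variable is unset.
def envVarOrDefault(envVar: String, default: Boolean): Boolean =
  sys.env.get(envVar).map(_.toBoolean).getOrElse(default)

// e.g. launching tests with ENABLE_COMET_WRITE=true flips the default to true
val writeEnabledDefault = envVarOrDefault("ENABLE_COMET_WRITE", default = false)
```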
```diff
@@ -125,16 +125,14 @@ object CometConf extends ShimCometConf {
   val SCAN_AUTO = "auto"
 
   val COMET_NATIVE_SCAN_IMPL: ConfigEntry[String] = conf("spark.comet.scan.impl")
-    .category(CATEGORY_SCAN)
+    .category(CATEGORY_PARQUET)
     .doc(
-      "The implementation of Comet Native Scan to use. Available modes are " +
+      "The implementation of Comet's Parquet scan to use. Available scans are " +
        s"`$SCAN_NATIVE_DATAFUSION`, and `$SCAN_NATIVE_ICEBERG_COMPAT`. " +
-        s"`$SCAN_NATIVE_DATAFUSION` is a fully native implementation of scan based on " +
-        "DataFusion. " +
-        s"`$SCAN_NATIVE_ICEBERG_COMPAT` is the recommended native implementation that " +
-        "exposes apis to read parquet columns natively and supports complex types. " +
-        s"`$SCAN_AUTO` (default) chooses the best scan.")
-    .internal()
+        s"`$SCAN_NATIVE_DATAFUSION` is a fully native implementation, and " +
+        s"`$SCAN_NATIVE_ICEBERG_COMPAT` is a hybrid implementation that supports some " +
+        "additional features, such as row indexes and field ids. " +
+        s"`$SCAN_AUTO` (default) chooses the best available scan based on the scan schema.")
     .stringConf
     .transform(_.toLowerCase(Locale.ROOT))
     .checkValues(Set(SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT, SCAN_AUTO))
```
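With `.internal()` removed, `spark.comet.scan.impl` becomes a documented, user-facing option. A hypothetical usage sketch, assuming a `SparkSession` named `spark` and the conventional string values of the scan constants; note the `transform` above lower-cases the value before `checkValues` runs:

```scala
// Hypothetical: select the hybrid scan explicitly instead of letting "auto" decide.
spark.conf.set("spark.comet.scan.impl", "native_iceberg_compat")

// Mixed case is fine; the transform normalizes it to "auto" before validation.
spark.conf.set("spark.comet.scan.impl", "AUTO")
```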
```diff
@@ -490,13 +488,23 @@ object CometConf extends ShimCometConf {
       "Ensure that Comet shuffle memory overhead factor is a double greater than 0")
     .createWithDefault(1.0)
 
+  val COMET_BATCH_SIZE: ConfigEntry[Int] = conf("spark.comet.batchSize")
+    .category(CATEGORY_TUNING)
+    .doc("The columnar batch size, i.e., the maximum number of rows that a batch can contain.")
+    .intConf
+    .checkValue(v => v > 0, "Batch size must be positive")
+    .createWithDefault(8192)
+
   val COMET_COLUMNAR_SHUFFLE_BATCH_SIZE: ConfigEntry[Int] =
     conf("spark.comet.columnar.shuffle.batch.size")
       .category(CATEGORY_SHUFFLE)
       .doc("Batch size when writing out sorted spill files on the native side. Note that " +
         "this should not be larger than batch size (i.e., `spark.comet.batchSize`). Otherwise " +
         "it will produce larger batches than expected in the native operator after shuffle.")
       .intConf
+      .checkValue(
+        v => v <= COMET_BATCH_SIZE.get(),
+        "Should not be larger than batch size `spark.comet.batchSize`")
      .createWithDefault(8192)
 
   val COMET_SHUFFLE_WRITE_BUFFER_SIZE: ConfigEntry[Long] =
```
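Moving `COMET_BATCH_SIZE` ahead of the shuffle config lets the new `checkValue` reference it, turning the doc comment's "should not be larger than" advice into an enforced invariant. A hypothetical illustration, again assuming a `SparkSession` named `spark`:

```scala
// The shuffle spill batch size must stay within the main columnar batch size.
spark.conf.set("spark.comet.batchSize", "8192")
spark.conf.set("spark.comet.columnar.shuffle.batch.size", "4096") // ok: 4096 <= 8192

// A value above spark.comet.batchSize should now fail validation with
// "Should not be larger than batch size `spark.comet.batchSize`".
```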
```diff
@@ -552,6 +560,7 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithDefault(false)
 
+  // Used on native side. Check spark_config.rs how the config is used
   val COMET_DEBUG_MEMORY_ENABLED: ConfigEntry[Boolean] =
     conf(s"$COMET_PREFIX.debug.memory")
       .category(CATEGORY_TESTING)
@@ -610,12 +619,6 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithDefault(false)
 
-  val COMET_BATCH_SIZE: ConfigEntry[Int] = conf("spark.comet.batchSize")
-    .category(CATEGORY_TUNING)
-    .doc("The columnar batch size, i.e., the maximum number of rows that a batch can contain.")
-    .intConf
-    .createWithDefault(8192)
-
   val COMET_PARQUET_ENABLE_DIRECT_BUFFER: ConfigEntry[Boolean] =
     conf("spark.comet.parquet.enable.directBuffer")
       .category(CATEGORY_PARQUET)
@@ -795,14 +798,6 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithDefault(false)
 
-  val COMET_REGEXP_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
-    conf("spark.comet.regexp.allowIncompatible")
-      .category(CATEGORY_EXEC)
-      .doc("Comet is not currently fully compatible with Spark for all regular expressions. " +
-        s"Set this config to true to allow them anyway. $COMPAT_GUIDE.")
-      .booleanConf
-      .createWithDefault(false)
-
   val COMET_METRICS_UPDATE_INTERVAL: ConfigEntry[Long] =
     conf("spark.comet.metrics.updateInterval")
       .category(CATEGORY_EXEC)
@@ -821,6 +816,7 @@ object CometConf extends ShimCometConf {
     .stringConf
     .createOptional
 
+  // Used on native side. Check spark_config.rs how the config is used
   val COMET_MAX_TEMP_DIRECTORY_SIZE: ConfigEntry[Long] =
     conf("spark.comet.maxTempDirectorySize")
       .category(CATEGORY_EXEC)
@@ -845,6 +841,9 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithEnvVarOrDefault("ENABLE_COMET_STRICT_TESTING", false)
 
+  val COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT: ConfigEntry[Boolean] =
+    createOperatorIncompatConfig("DataWritingCommandExec")
+
   /** Create a config to enable a specific operator */
   private def createExecEnabledConfig(
       exec: String,
```
```diff
@@ -860,6 +859,25 @@ object CometConf extends ShimCometConf {
       .createWithDefault(defaultValue)
   }
 
+  /**
+   * Converts a config key to a valid environment variable name. Example:
+   * "spark.comet.operator.DataWritingCommandExec.allowIncompatible" ->
+   * "SPARK_COMET_OPERATOR_DATAWRITINGCOMMANDEXEC_ALLOWINCOMPATIBLE"
+   */
+  private def configKeyToEnvVar(configKey: String): String =
+    configKey.toUpperCase(Locale.ROOT).replace('.', '_')
+
+  private def createOperatorIncompatConfig(name: String): ConfigEntry[Boolean] = {
+    val configKey = getOperatorAllowIncompatConfigKey(name)
+    val envVar = configKeyToEnvVar(configKey)
+    conf(configKey)
+      .category(CATEGORY_EXEC)
+      .doc(s"Whether to allow incompatibility for operator: $name. " +
+        s"False by default. Can be overridden with $envVar env variable")
+      .booleanConf
+      .createWithEnvVarOrDefault(envVar, false)
+  }
+
   def isExprEnabled(name: String, conf: SQLConf = SQLConf.get): Boolean = {
     getBooleanConf(getExprEnabledConfigKey(name), defaultValue = true, conf)
   }
```
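The new helpers standardize how per-operator escape hatches are declared: `createOperatorIncompatConfig` derives both the config key and its environment-variable override from the operator name. The doc comment's own example, traced through `configKeyToEnvVar`:

```scala
import java.util.Locale

// The conversion is a plain uppercase-and-replace over the config key.
val key = "spark.comet.operator.DataWritingCommandExec.allowIncompatible"
val envVar = key.toUpperCase(Locale.ROOT).replace('.', '_')
// envVar == "SPARK_COMET_OPERATOR_DATAWRITINGCOMMANDEXEC_ALLOWINCOMPATIBLE"
```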

dev/diffs/3.4.3.diff

Lines changed: 1 addition & 1 deletion

```diff
@@ -1009,7 +1009,7 @@ index 18123a4d6ec..fbe4c766eee 100644
 - regexp_extract($"a", "(\\d+)-(\\d+)", 1)),
 - Row("num-num", "300", "100") :: Row("num-num", "400", "100") ::
 - Row("num-num", "400-400", "100") :: Nil)
-+ withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
++ withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
 + val df = Seq(
 + ("100-200", "(\\d+)-(\\d+)", "300"),
 + ("100-200", "(\\d+)-(\\d+)", "400"),
```

dev/diffs/3.5.8.diff

Lines changed: 1 addition & 1 deletion

```diff
@@ -985,7 +985,7 @@ index fa1a64460fc..1d2e215d6a3 100644
 - ("100-200", "(\\d+)-(\\d+)", "300"),
 - ("100-200", "(\\d+)-(\\d+)", "400"),
 - ("100-200", "(\\d+)", "400")).toDF("a", "b", "c")
-+ withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
++ withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
 + val df = Seq(
 + ("100-200", "(\\d+)-(\\d+)", "300"),
 + ("100-200", "(\\d+)-(\\d+)", "400"),
```

dev/diffs/4.0.1.diff

Lines changed: 1 addition & 1 deletion

```diff
@@ -1165,7 +1165,7 @@ index 0df7f806272..52d33d67328 100644
 - ("100-200", "(\\d+)-(\\d+)", "300"),
 - ("100-200", "(\\d+)-(\\d+)", "400"),
 - ("100-200", "(\\d+)", "400")).toDF("a", "b", "c")
-+ withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
++ withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
 + val df = Seq(
 + ("100-200", "(\\d+)-(\\d+)", "300"),
 + ("100-200", "(\\d+)-(\\d+)", "400"),
```

docs/source/contributor-guide/ffi.md

Lines changed: 4 additions & 3 deletions

```diff
@@ -177,9 +177,10 @@ message Scan {
 
 #### When ownership is NOT transferred to native:
 
-If the data originates from `native_comet` scan (deprecated, will be removed in a future release) or from
-`native_iceberg_compat` in some cases, then ownership is not transferred to native and the JVM may re-use the
-underlying buffers in the future.
+If the data originates from a scan that uses mutable buffers (such as Iceberg scans using the [hybrid Iceberg reader]),
+then ownership is not transferred to native and the JVM may re-use the underlying buffers in the future.
+
+[hybrid Iceberg reader]: https://datafusion.apache.org/comet/user-guide/latest/iceberg.html#hybrid-reader
 
 It is critical that the native code performs a deep copy of the arrays if the arrays are to be buffered by
 operators such as `SortExec` or `ShuffleWriterExec`, otherwise data corruption is likely to occur.
```
