From 58996d6c8e5de3850ca228d8488c9335948cc814 Mon Sep 17 00:00:00 2001 From: philo Date: Fri, 19 Aug 2022 15:42:06 +0800 Subject: [PATCH 1/8] Initial commit --- .../datasources/v2/arrow/ArrowScan.scala | 73 ++++++++++++++++++- .../com/intel/oap/sql/shims/SparkShims.scala | 4 +- .../sql/shims/spark311/Spark311Shims.scala | 5 +- .../org/apache/spark/util/ShimUtils.scala | 6 +- .../sql/shims/spark321/Spark321Shims.scala | 5 +- 5 files changed, 87 insertions(+), 6 deletions(-) diff --git a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala index 20e069e01..affc0d993 100644 --- a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala +++ b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala @@ -16,14 +16,20 @@ */ package com.intel.oap.spark.sql.execution.datasources.v2.arrow +import com.intel.oap.sql.shims.SparkShimLoader + +import java.util.Locale + import scala.collection.JavaConverters._ import org.apache.hadoop.fs.Path -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.connector.read.PartitionReaderFactory -import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex +import org.apache.spark.sql.execution.PartitionedFileUtil +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitioningAwareFileIndex} import org.apache.spark.sql.execution.datasources.v2.FileScan import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType @@ -41,6 +47,9 @@ case class ArrowScan( dataFilters: Seq[Expression] = 
Seq.empty) extends FileScan { + // Use the default value for org.apache.spark.internal.config.IO_WARNING_LARGEFILETHRESHOLD. + val IO_WARNING_LARGEFILETHRESHOLD: Long = 1024 * 1024 * 1024 + override def isSplitable(path: Path): Boolean = { ArrowUtils.isOriginalFormatSplitable( new ArrowOptions(new CaseInsensitiveStringMap(options).asScala.toMap)) @@ -63,4 +72,64 @@ case class ArrowScan( override def withFilters(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): FileScan = this.copy(partitionFilters = partitionFilters, dataFilters = dataFilters) + + // compute maxSplitBytes + + override def partitions: Seq[FilePartition] = { + val selectedPartitions = fileIndex.listFiles(partitionFilters, dataFilters) + val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions) + // val partitionAttributes = fileIndex.partitionSchema.toAttributes + val partitionAttributes = SparkShimLoader.getSparkShims.toAttributes(fileIndex) + val attributeMap = partitionAttributes.map(a => normalizeName(a.name) -> a).toMap + val readPartitionAttributes = readPartitionSchema.map { readField => + attributeMap.get(normalizeName(readField.name)).getOrElse { + // throw QueryCompilationErrors.cannotFindPartitionColumnInPartitionSchemaError( + // readField, fileIndex.partitionSchema) + throw new AnalysisException(s"Can't find required partition column ${readField.name} " + + s"in partition schema $fileIndex.partitionSchema") + } + } + lazy val partitionValueProject = + GenerateUnsafeProjection.generate(readPartitionAttributes, partitionAttributes) + val splitFiles = selectedPartitions.flatMap { partition => + // Prune partition values if part of the partition columns are not required. 
+ val partitionValues = if (readPartitionAttributes != partitionAttributes) { + partitionValueProject(partition.values).copy() + } else { + partition.values + } + partition.files.flatMap { file => + val filePath = file.getPath + PartitionedFileUtil.splitFiles( + sparkSession = sparkSession, + file = file, + filePath = filePath, + isSplitable = isSplitable(filePath), + maxSplitBytes = maxSplitBytes, + partitionValues = partitionValues + ) + }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse) + } + + if (splitFiles.length == 1) { + val path = new Path(splitFiles(0).filePath) + if (!isSplitable(path) && splitFiles(0).length > + IO_WARNING_LARGEFILETHRESHOLD) { + logWarning(s"Loading one large unsplittable file ${path.toString} with only one " + + s"partition, the reason is: ${getFileUnSplittableReason(path)}") + } + } + + FilePartition.getFilePartitions(sparkSession, splitFiles, maxSplitBytes) + } + + private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis + + private def normalizeName(name: String): String = { + if (isCaseSensitive) { + name + } else { + name.toLowerCase(Locale.ROOT) + } + } } diff --git a/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala b/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala index 61bd49b57..a9b63a6c0 100644 --- a/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala +++ b/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan} import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec -import org.apache.spark.sql.execution.datasources.OutputWriter +import org.apache.spark.sql.execution.datasources.{OutputWriter, PartitioningAwareFileIndex} import 
org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, ParquetOptions, ParquetReadSupport, VectorizedParquetRecordReader} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleOrigin} @@ -121,4 +121,6 @@ trait SparkShims { def getEndMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int def getNumReducersOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int + + def toAttributes(fileIndex: PartitioningAwareFileIndex): Seq[AttributeReference] } diff --git a/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala b/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala index 46912bb70..8989c95ce 100644 --- a/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala +++ b/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala @@ -27,7 +27,6 @@ import org.apache.spark.SparkConf import org.apache.spark.TaskContext import org.apache.spark.shuffle.MigratableResolver import org.apache.spark.shuffle.ShuffleHandle -import org.apache.spark.util.ShimUtils import org.apache.spark.shuffle.api.ShuffleExecutorComponents import org.apache.spark.shuffle.sort.SortShuffleWriter import org.apache.spark.sql.SQLContext @@ -43,6 +42,7 @@ import org.apache.spark.sql.execution.datasources.v2.arrow.SparkVectorUtils import org.apache.spark.sql.execution.datasources.{DataSourceUtils, OutputWriter} import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, REPARTITION, ReusedExchangeExec, ShuffleExchangeExec, ShuffleOrigin} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.util.ShimUtils class Spark311Shims extends SparkShims { @@ -205,4 +205,7 @@ class Spark311Shims extends SparkShims { throw new RuntimeException("This method should not be invoked in spark 3.1.") } + override def toAttributes(fileIndex: 
PartitioningAwareFileIndex): Seq[AttributeReference] = { + ShimUtils.toAttributes(fileIndex) + } } \ No newline at end of file diff --git a/shims/spark311/src/main/scala/org/apache/spark/util/ShimUtils.scala b/shims/spark311/src/main/scala/org/apache/spark/util/ShimUtils.scala index a9f79b97f..17ad16802 100644 --- a/shims/spark311/src/main/scala/org/apache/spark/util/ShimUtils.scala +++ b/shims/spark311/src/main/scala/org/apache/spark/util/ShimUtils.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.apache.spark.util +package org.apache.spark.sql.util import java.io.File @@ -70,4 +70,8 @@ object ShimUtils { targetFileName: String, sparkConf: SparkConf): Unit = { Utils.doFetchFile(urlString, targetDirHandler, targetFileName, sparkConf, null, null) } + + def toAttributes(fileIndex: PartitioningAwareFileIndex): Seq[AttributeReference] = { + fileIndex.partitionSchema.toAttributes + } } \ No newline at end of file diff --git a/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala b/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala index 34f5bd03c..ce2377ef4 100644 --- a/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala +++ b/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala @@ -28,7 +28,6 @@ import org.apache.spark.TaskContext import org.apache.spark.shuffle.MigratableResolver import org.apache.spark.shuffle.ShuffleHandle import org.apache.spark.unsafe.map.BytesToBytesMap -import org.apache.spark.util.ShimUtils import org.apache.spark.shuffle.api.ShuffleExecutorComponents import org.apache.spark.shuffle.sort.SortShuffleWriter import org.apache.spark.sql.SQLContext @@ -47,6 +46,7 @@ import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, REPARTITI import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.LongType 
+import org.apache.spark.sql.util.ShimUtils class Spark321Shims extends SparkShims { @@ -235,4 +235,7 @@ class Spark321Shims extends SparkShims { } } + override def toAttributes(fileIndex: PartitioningAwareFileIndex): Seq[AttributeReference] = { + ShimUtils.toAttributes(fileIndex) + } } \ No newline at end of file From c5f51e9359d751f814cdfe7681e52d3170f6ee21 Mon Sep 17 00:00:00 2001 From: philo Date: Fri, 19 Aug 2022 16:39:43 +0800 Subject: [PATCH 2/8] Refine the code --- .../datasources/v2/arrow/ScanUtils.scala | 29 +++++++++++++++++++ .../datasources/v2/arrow/ArrowScan.scala | 11 +++---- .../com/intel/oap/sql/shims/SparkShims.scala | 3 +- .../sql/shims/spark311/Spark311Shims.scala | 8 ++--- .../org/apache/spark/util/ShimUtils.scala | 15 +--------- .../sql/shims/spark321/Spark321Shims.scala | 8 ++--- 6 files changed, 39 insertions(+), 35 deletions(-) create mode 100644 arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/ScanUtils.scala diff --git a/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/ScanUtils.scala b/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/ScanUtils.scala new file mode 100644 index 000000000..359e280c1 --- /dev/null +++ b/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/ScanUtils.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2.arrow + +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex + + +object ScanUtils { + + def toAttributes(fileIndex: PartitioningAwareFileIndex): Seq[AttributeReference] = { + fileIndex.partitionSchema.toAttributes + } +} \ No newline at end of file diff --git a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala index affc0d993..01459d7a5 100644 --- a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala +++ b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala @@ -16,14 +16,10 @@ */ package com.intel.oap.spark.sql.execution.datasources.v2.arrow -import com.intel.oap.sql.shims.SparkShimLoader - import java.util.Locale import scala.collection.JavaConverters._ - import org.apache.hadoop.fs.Path - import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection @@ -31,6 +27,7 @@ import org.apache.spark.sql.connector.read.PartitionReaderFactory import org.apache.spark.sql.execution.PartitionedFileUtil import org.apache.spark.sql.execution.datasources.{FilePartition, 
PartitioningAwareFileIndex} import org.apache.spark.sql.execution.datasources.v2.FileScan +import org.apache.spark.sql.execution.datasources.v2.arrow.ScanUtils import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -79,14 +76,14 @@ case class ArrowScan( val selectedPartitions = fileIndex.listFiles(partitionFilters, dataFilters) val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions) // val partitionAttributes = fileIndex.partitionSchema.toAttributes - val partitionAttributes = SparkShimLoader.getSparkShims.toAttributes(fileIndex) + val partitionAttributes = ScanUtils.toAttributes(fileIndex) val attributeMap = partitionAttributes.map(a => normalizeName(a.name) -> a).toMap val readPartitionAttributes = readPartitionSchema.map { readField => attributeMap.get(normalizeName(readField.name)).getOrElse { // throw QueryCompilationErrors.cannotFindPartitionColumnInPartitionSchemaError( // readField, fileIndex.partitionSchema) - throw new AnalysisException(s"Can't find required partition column ${readField.name} " + - s"in partition schema $fileIndex.partitionSchema") + throw new RuntimeException(s"Can't find required partition column ${readField.name} " + + s"in partition schema ${fileIndex.partitionSchema}") } } lazy val partitionValueProject = diff --git a/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala b/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala index a9b63a6c0..64144ed13 100644 --- a/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala +++ b/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan} import 
org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec -import org.apache.spark.sql.execution.datasources.{OutputWriter, PartitioningAwareFileIndex} +import org.apache.spark.sql.execution.datasources.OutputWriter import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, ParquetOptions, ParquetReadSupport, VectorizedParquetRecordReader} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleOrigin} @@ -122,5 +122,4 @@ trait SparkShims { def getNumReducersOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int - def toAttributes(fileIndex: PartitioningAwareFileIndex): Seq[AttributeReference] } diff --git a/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala b/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala index 8989c95ce..9eadacf53 100644 --- a/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala +++ b/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala @@ -29,8 +29,7 @@ import org.apache.spark.shuffle.MigratableResolver import org.apache.spark.shuffle.ShuffleHandle import org.apache.spark.shuffle.api.ShuffleExecutorComponents import org.apache.spark.shuffle.sort.SortShuffleWriter -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.{SQLContext, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} @@ -42,7 +41,7 @@ import org.apache.spark.sql.execution.datasources.v2.arrow.SparkVectorUtils import org.apache.spark.sql.execution.datasources.{DataSourceUtils, OutputWriter} import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, REPARTITION, ReusedExchangeExec, ShuffleExchangeExec, ShuffleOrigin} import 
org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.util.ShimUtils +import org.apache.spark.util.ShimUtils class Spark311Shims extends SparkShims { @@ -205,7 +204,4 @@ class Spark311Shims extends SparkShims { throw new RuntimeException("This method should not be invoked in spark 3.1.") } - override def toAttributes(fileIndex: PartitioningAwareFileIndex): Seq[AttributeReference] = { - ShimUtils.toAttributes(fileIndex) - } } \ No newline at end of file diff --git a/shims/spark311/src/main/scala/org/apache/spark/util/ShimUtils.scala b/shims/spark311/src/main/scala/org/apache/spark/util/ShimUtils.scala index 17ad16802..4ce0185d8 100644 --- a/shims/spark311/src/main/scala/org/apache/spark/util/ShimUtils.scala +++ b/shims/spark311/src/main/scala/org/apache/spark/util/ShimUtils.scala @@ -14,19 +14,10 @@ * limitations under the License. */ -package org.apache.spark.sql.util +package org.apache.spark.util import java.io.File -import org.apache.spark.SparkConf -import org.apache.spark.TaskContext -import org.apache.spark.shuffle.BaseShuffleHandle -import org.apache.spark.shuffle.IndexShuffleBlockResolver -import org.apache.spark.shuffle.MigratableResolver -import org.apache.spark.shuffle.ShuffleHandle -import org.apache.spark.shuffle.api.ShuffleExecutorComponents -import org.apache.spark.shuffle.sort.SortShuffleWriter - object ShimUtils { /** @@ -70,8 +61,4 @@ object ShimUtils { targetFileName: String, sparkConf: SparkConf): Unit = { Utils.doFetchFile(urlString, targetDirHandler, targetFileName, sparkConf, null, null) } - - def toAttributes(fileIndex: PartitioningAwareFileIndex): Seq[AttributeReference] = { - fileIndex.partitionSchema.toAttributes - } } \ No newline at end of file diff --git a/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala b/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala index ce2377ef4..504493d2a 100644 --- 
a/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala +++ b/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala @@ -30,8 +30,7 @@ import org.apache.spark.shuffle.ShuffleHandle import org.apache.spark.unsafe.map.BytesToBytesMap import org.apache.spark.shuffle.api.ShuffleExecutorComponents import org.apache.spark.shuffle.sort.SortShuffleWriter -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.{SQLContext, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} @@ -46,7 +45,7 @@ import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, REPARTITI import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.LongType -import org.apache.spark.sql.util.ShimUtils +import org.apache.spark.util.ShimUtils class Spark321Shims extends SparkShims { @@ -235,7 +234,4 @@ class Spark321Shims extends SparkShims { } } - override def toAttributes(fileIndex: PartitioningAwareFileIndex): Seq[AttributeReference] = { - ShimUtils.toAttributes(fileIndex) - } } \ No newline at end of file From 758246f541741302f01c5a186ce1bc28baceefba Mon Sep 17 00:00:00 2001 From: philo Date: Fri, 19 Aug 2022 16:47:24 +0800 Subject: [PATCH 3/8] Remove some changes --- .../main/scala/com/intel/oap/sql/shims/SparkShims.scala | 1 - .../com/intel/oap/sql/shims/spark311/Spark311Shims.scala | 5 +++-- .../src/main/scala/org/apache/spark/util/ShimUtils.scala | 9 +++++++++ .../com/intel/oap/sql/shims/spark321/Spark321Shims.scala | 5 +++-- 4 files changed, 15 insertions(+), 5 deletions(-) diff --git a/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala b/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala index 
64144ed13..61bd49b57 100644 --- a/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala +++ b/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala @@ -121,5 +121,4 @@ trait SparkShims { def getEndMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int def getNumReducersOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int - } diff --git a/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala b/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala index 9eadacf53..46912bb70 100644 --- a/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala +++ b/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala @@ -27,9 +27,11 @@ import org.apache.spark.SparkConf import org.apache.spark.TaskContext import org.apache.spark.shuffle.MigratableResolver import org.apache.spark.shuffle.ShuffleHandle +import org.apache.spark.util.ShimUtils import org.apache.spark.shuffle.api.ShuffleExecutorComponents import org.apache.spark.shuffle.sort.SortShuffleWriter -import org.apache.spark.sql.{SQLContext, SparkSession} +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} @@ -41,7 +43,6 @@ import org.apache.spark.sql.execution.datasources.v2.arrow.SparkVectorUtils import org.apache.spark.sql.execution.datasources.{DataSourceUtils, OutputWriter} import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, REPARTITION, ReusedExchangeExec, ShuffleExchangeExec, ShuffleOrigin} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.util.ShimUtils class Spark311Shims extends SparkShims { diff --git a/shims/spark311/src/main/scala/org/apache/spark/util/ShimUtils.scala 
b/shims/spark311/src/main/scala/org/apache/spark/util/ShimUtils.scala index 4ce0185d8..a9f79b97f 100644 --- a/shims/spark311/src/main/scala/org/apache/spark/util/ShimUtils.scala +++ b/shims/spark311/src/main/scala/org/apache/spark/util/ShimUtils.scala @@ -18,6 +18,15 @@ package org.apache.spark.util import java.io.File +import org.apache.spark.SparkConf +import org.apache.spark.TaskContext +import org.apache.spark.shuffle.BaseShuffleHandle +import org.apache.spark.shuffle.IndexShuffleBlockResolver +import org.apache.spark.shuffle.MigratableResolver +import org.apache.spark.shuffle.ShuffleHandle +import org.apache.spark.shuffle.api.ShuffleExecutorComponents +import org.apache.spark.shuffle.sort.SortShuffleWriter + object ShimUtils { /** diff --git a/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala b/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala index 504493d2a..34f5bd03c 100644 --- a/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala +++ b/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala @@ -28,9 +28,11 @@ import org.apache.spark.TaskContext import org.apache.spark.shuffle.MigratableResolver import org.apache.spark.shuffle.ShuffleHandle import org.apache.spark.unsafe.map.BytesToBytesMap +import org.apache.spark.util.ShimUtils import org.apache.spark.shuffle.api.ShuffleExecutorComponents import org.apache.spark.shuffle.sort.SortShuffleWriter -import org.apache.spark.sql.{SQLContext, SparkSession} +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} @@ -45,7 +47,6 @@ import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, REPARTITI import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode 
import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.LongType -import org.apache.spark.util.ShimUtils class Spark321Shims extends SparkShims { From 633a548c6b18e105f19af093a06b4d40c3c17f01 Mon Sep 17 00:00:00 2001 From: philo Date: Fri, 19 Aug 2022 18:29:42 +0800 Subject: [PATCH 4/8] Make required config accessible --- .../datasources/v2/arrow/ArrowScan.scala | 22 ++++++++++++++- .../com/intel/oap/sql/shims/SparkShims.scala | 6 ++-- .../sql/shims/spark311/Spark311Shims.scala | 4 +++ .../sql/shims/spark321/Spark321Shims.scala | 4 +++ .../org/apache/spark/sql/util/ShimUtils.scala | 28 +++++++++++++++++++ 5 files changed, 61 insertions(+), 3 deletions(-) create mode 100644 shims/spark321/src/main/scala/org/apache/spark/sql/util/ShimUtils.scala diff --git a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala index 01459d7a5..f5d8fc0a5 100644 --- a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala +++ b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala @@ -16,6 +16,8 @@ */ package com.intel.oap.spark.sql.execution.datasources.v2.arrow +import com.intel.oap.sql.shims.SparkShimLoader + import java.util.Locale import scala.collection.JavaConverters._ @@ -25,9 +27,10 @@ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.connector.read.PartitionReaderFactory import org.apache.spark.sql.execution.PartitionedFileUtil -import org.apache.spark.sql.execution.datasources.{FilePartition, PartitioningAwareFileIndex} +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionDirectory, PartitioningAwareFileIndex} import 
org.apache.spark.sql.execution.datasources.v2.FileScan import org.apache.spark.sql.execution.datasources.v2.arrow.ScanUtils +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -71,9 +74,26 @@ case class ArrowScan( this.copy(partitionFilters = partitionFilters, dataFilters = dataFilters) // compute maxSplitBytes + def maxSplitBytes( + sparkSession: SparkSession, + selectedPartitions: Seq[PartitionDirectory]): Long = { + val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes + val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes + // val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum + // .getOrElse(sparkSession.leafNodeDefaultParallelism) + val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum + .getOrElse(SparkShimLoader.getSparkShims.leafNodeDefaultParallelism(sparkSession)) + val PREFERRED_PARTITION_SIZE_LOWER_BOUND: Long = 128 * 1024 * 1024 + val PREFERRED_PARTITION_SIZE_UPPER_BOUND: Long = 512 * 1024 * 1024 + val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum + val bytesPerCore = totalBytes / minPartitionNum + + Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) + } override def partitions: Seq[FilePartition] = { val selectedPartitions = fileIndex.listFiles(partitionFilters, dataFilters) + // val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions) val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions) // val partitionAttributes = fileIndex.partitionSchema.toAttributes val partitionAttributes = ScanUtils.toAttributes(fileIndex) diff --git a/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala b/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala index 61bd49b57..9a4ccfc2b 100644 --- 
a/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala +++ b/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala @@ -28,8 +28,7 @@ import org.apache.spark.shuffle.MigratableResolver import org.apache.spark.shuffle.ShuffleHandle import org.apache.spark.shuffle.api.ShuffleExecutorComponents import org.apache.spark.shuffle.sort.SortShuffleWriter -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.{SQLContext, SparkSession} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan} @@ -121,4 +120,7 @@ trait SparkShims { def getEndMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int def getNumReducersOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int + + def leafNodeDefaultParallelism(sparkSession: SparkSession): Int + } diff --git a/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala b/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala index 46912bb70..43f7d99d5 100644 --- a/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala +++ b/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala @@ -205,4 +205,8 @@ class Spark311Shims extends SparkShims { throw new RuntimeException("This method should not be invoked in spark 3.1.") } + override def leafNodeDefaultParallelism(sparkSession: SparkSession): Int = { + sparkSession.sparkContext.defaultParallelism + } + } \ No newline at end of file diff --git a/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala b/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala index 34f5bd03c..4c9ca9273 100644 --- 
a/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala +++ b/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala @@ -235,4 +235,8 @@ class Spark321Shims extends SparkShims { } } + override def leafNodeDefaultParallelism(sparkSession: SparkSession): Int = { + org.apache.spark.sql.util.ShimUtils.leafNodeDefaultParallelism(sparkSession) + } + } \ No newline at end of file diff --git a/shims/spark321/src/main/scala/org/apache/spark/sql/util/ShimUtils.scala b/shims/spark321/src/main/scala/org/apache/spark/sql/util/ShimUtils.scala new file mode 100644 index 000000000..26b2ea965 --- /dev/null +++ b/shims/spark321/src/main/scala/org/apache/spark/sql/util/ShimUtils.scala @@ -0,0 +1,28 @@ +/* + * Copyright 2020 Intel Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.util + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.internal.SQLConf + +object ShimUtils { + + def leafNodeDefaultParallelism(sparkSession: SparkSession): Int = { + sparkSession.conf.get(SQLConf.LEAF_NODE_DEFAULT_PARALLELISM).getOrElse( + sparkSession.sparkContext.defaultParallelism) + } +} \ No newline at end of file From de19cbc0161c41fdc75698a40094748abac0be0c Mon Sep 17 00:00:00 2001 From: philo Date: Fri, 19 Aug 2022 21:15:23 +0800 Subject: [PATCH 5/8] Adjust partition size empirically --- .../datasources/v2/arrow/ArrowScan.scala | 29 ++++++++++++------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala index f5d8fc0a5..4e6695ea7 100644 --- a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala +++ b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala @@ -74,27 +74,36 @@ case class ArrowScan( this.copy(partitionFilters = partitionFilters, dataFilters = dataFilters) // compute maxSplitBytes - def maxSplitBytes( - sparkSession: SparkSession, - selectedPartitions: Seq[PartitionDirectory]): Long = { + def maxSplitBytes(sparkSession: SparkSession, + selectedPartitions: Seq[PartitionDirectory]): Long = { val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes - val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes + val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes // val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum // .getOrElse(sparkSession.leafNodeDefaultParallelism) val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum 
.getOrElse(SparkShimLoader.getSparkShims.leafNodeDefaultParallelism(sparkSession)) - val PREFERRED_PARTITION_SIZE_LOWER_BOUND: Long = 128 * 1024 * 1024 - val PREFERRED_PARTITION_SIZE_UPPER_BOUND: Long = 512 * 1024 * 1024 + val PREFERRED_PARTITION_SIZE_LOWER_BOUND: Long = 256 * 1024 * 1024 + val PREFERRED_PARTITION_SIZE_UPPER_BOUND: Long = 1024 * 1024 * 1024 val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum - val bytesPerCore = totalBytes / minPartitionNum - - Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) + var maxBytesPerCore = totalBytes / minPartitionNum + var bytesPerCoreFinal = maxBytesPerCore + var bytesPerCore = maxBytesPerCore + var i = 2 + while (bytesPerCore > PREFERRED_PARTITION_SIZE_UPPER_BOUND && i < 4) { + bytesPerCore = maxBytesPerCore / i + if (bytesPerCore > PREFERRED_PARTITION_SIZE_LOWER_BOUND) { + bytesPerCoreFinal = bytesPerCore + } + i = i + 1 + } + Math.min(PREFERRED_PARTITION_SIZE_UPPER_BOUND, bytesPerCoreFinal) + // Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) } override def partitions: Seq[FilePartition] = { val selectedPartitions = fileIndex.listFiles(partitionFilters, dataFilters) // val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions) - val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions) + val maxSplitBytes = this.maxSplitBytes(sparkSession, selectedPartitions) // val partitionAttributes = fileIndex.partitionSchema.toAttributes val partitionAttributes = ScanUtils.toAttributes(fileIndex) val attributeMap = partitionAttributes.map(a => normalizeName(a.name) -> a).toMap From 6fbb54a1a682cb28a706ad669e752b2ac3594a22 Mon Sep 17 00:00:00 2001 From: philo Date: Fri, 19 Aug 2022 22:59:06 +0800 Subject: [PATCH 6/8] Change upper/lower bound --- .../sql/execution/datasources/v2/arrow/ArrowScan.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git 
a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala index 4e6695ea7..d5cbd6f98 100644 --- a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala +++ b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala @@ -82,14 +82,14 @@ case class ArrowScan( // .getOrElse(sparkSession.leafNodeDefaultParallelism) val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum .getOrElse(SparkShimLoader.getSparkShims.leafNodeDefaultParallelism(sparkSession)) - val PREFERRED_PARTITION_SIZE_LOWER_BOUND: Long = 256 * 1024 * 1024 - val PREFERRED_PARTITION_SIZE_UPPER_BOUND: Long = 1024 * 1024 * 1024 + val PREFERRED_PARTITION_SIZE_LOWER_BOUND: Long = 128 * 1024 * 1024 + val PREFERRED_PARTITION_SIZE_UPPER_BOUND: Long = 512 * 1024 * 1024 val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum var maxBytesPerCore = totalBytes / minPartitionNum var bytesPerCoreFinal = maxBytesPerCore var bytesPerCore = maxBytesPerCore var i = 2 - while (bytesPerCore > PREFERRED_PARTITION_SIZE_UPPER_BOUND && i < 4) { + while (bytesPerCore > PREFERRED_PARTITION_SIZE_UPPER_BOUND) { bytesPerCore = maxBytesPerCore / i if (bytesPerCore > PREFERRED_PARTITION_SIZE_LOWER_BOUND) { bytesPerCoreFinal = bytesPerCore From 66269c2023442f547dbf939f20f9ee26e7eeae15 Mon Sep 17 00:00:00 2001 From: philo Date: Tue, 23 Aug 2022 16:50:55 +0800 Subject: [PATCH 7/8] Port the code for getFilePartitions --- .../datasources/v2/arrow/ArrowScan.scala | 85 ++++++++++++++++--- 1 file changed, 72 insertions(+), 13 deletions(-) diff --git a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala 
b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala index d5cbd6f98..cf90cfb56 100644 --- a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala +++ b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala @@ -17,7 +17,6 @@ package com.intel.oap.spark.sql.execution.datasources.v2.arrow import com.intel.oap.sql.shims.SparkShimLoader - import java.util.Locale import scala.collection.JavaConverters._ @@ -27,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.connector.read.PartitionReaderFactory import org.apache.spark.sql.execution.PartitionedFileUtil -import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionDirectory, PartitioningAwareFileIndex} +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex} import org.apache.spark.sql.execution.datasources.v2.FileScan import org.apache.spark.sql.execution.datasources.v2.arrow.ScanUtils import org.apache.spark.sql.internal.SQLConf @@ -36,6 +35,8 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.SerializableConfiguration +import scala.collection.mutable.ArrayBuffer + case class ArrowScan( sparkSession: SparkSession, fileIndex: PartitioningAwareFileIndex, @@ -49,6 +50,7 @@ case class ArrowScan( // Use the default value for org.apache.spark.internal.config.IO_WARNING_LARGEFILETHRESHOLD. 
val IO_WARNING_LARGEFILETHRESHOLD: Long = 1024 * 1024 * 1024 + var openCostInBytesFinal = sparkSession.sessionState.conf.filesOpenCostInBytes override def isSplitable(path: Path): Boolean = { ArrowUtils.isOriginalFormatSplitable( @@ -76,30 +78,87 @@ case class ArrowScan( // compute maxSplitBytes def maxSplitBytes(sparkSession: SparkSession, selectedPartitions: Seq[PartitionDirectory]): Long = { + // TODO: unify it with PREFERRED_PARTITION_SIZE_UPPER_BOUND. val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes - val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes + val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes // val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum // .getOrElse(sparkSession.leafNodeDefaultParallelism) val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum .getOrElse(SparkShimLoader.getSparkShims.leafNodeDefaultParallelism(sparkSession)) - val PREFERRED_PARTITION_SIZE_LOWER_BOUND: Long = 128 * 1024 * 1024 - val PREFERRED_PARTITION_SIZE_UPPER_BOUND: Long = 512 * 1024 * 1024 + val PREFERRED_PARTITION_SIZE_LOWER_BOUND: Long = 256 * 1024 * 1024 + val PREFERRED_PARTITION_SIZE_UPPER_BOUND: Long = 1024 * 1024 * 1024 val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum var maxBytesPerCore = totalBytes / minPartitionNum var bytesPerCoreFinal = maxBytesPerCore var bytesPerCore = maxBytesPerCore - var i = 2 - while (bytesPerCore > PREFERRED_PARTITION_SIZE_UPPER_BOUND) { - bytesPerCore = maxBytesPerCore / i - if (bytesPerCore > PREFERRED_PARTITION_SIZE_LOWER_BOUND) { - bytesPerCoreFinal = bytesPerCore + + if (bytesPerCore > PREFERRED_PARTITION_SIZE_UPPER_BOUND) { + // Adjust partition size. 
+ var i = 2 + while (bytesPerCore > PREFERRED_PARTITION_SIZE_UPPER_BOUND) { + bytesPerCore = maxBytesPerCore / i + if (bytesPerCore > PREFERRED_PARTITION_SIZE_LOWER_BOUND) { + bytesPerCoreFinal = bytesPerCore + } + i = i + 1 + } + Math.min(PREFERRED_PARTITION_SIZE_UPPER_BOUND, bytesPerCoreFinal) + // Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) + } else { + // adjust open cost. + var i = 2 + while (bytesPerCore < PREFERRED_PARTITION_SIZE_LOWER_BOUND) { + val dynamicOpenCostInBytes = openCostInBytesFinal * i + val totalBytes = + selectedPartitions.flatMap(_.files.map(_.getLen + dynamicOpenCostInBytes)).sum + maxBytesPerCore = totalBytes / minPartitionNum + if (maxBytesPerCore < PREFERRED_PARTITION_SIZE_UPPER_BOUND) { + openCostInBytesFinal = dynamicOpenCostInBytes + bytesPerCoreFinal = maxBytesPerCore + } + i = i + 1 + } + Math.max(PREFERRED_PARTITION_SIZE_LOWER_BOUND, bytesPerCoreFinal) + } + } + + // This implementation is ported from spark FilePartition.scala with changes for + // adjusting openCost. + def getFilePartitions(sparkSession: SparkSession, + partitionedFiles: Seq[PartitionedFile], + maxSplitBytes: Long): Seq[FilePartition] = { + val partitions = new ArrayBuffer[FilePartition] + val currentFiles = new ArrayBuffer[PartitionedFile] + var currentSize = 0L + + /** Close the current partition and move to the next. */ + def closePartition(): Unit = { + if (currentFiles.nonEmpty) { + // Copy to a new Array. + val newPartition = FilePartition(partitions.size, currentFiles.toArray) + partitions += newPartition + } + currentFiles.clear() + currentSize = 0 + } + + // val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes + val openCostInBytes = openCostInBytesFinal + // Assign files to partitions using "Next Fit Decreasing" + partitionedFiles.foreach { file => + if (currentSize + file.length > maxSplitBytes) { + closePartition() } - i = i + 1 + // Add the given file to the current partition. 
+ currentSize += file.length + openCostInBytes + currentFiles += file } - Math.min(PREFERRED_PARTITION_SIZE_UPPER_BOUND, bytesPerCoreFinal) - // Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) + closePartition() + partitions.toSeq } + // This implementation is ported from spark FileScan with only changes for computing + // maxSplitBytes. override def partitions: Seq[FilePartition] = { val selectedPartitions = fileIndex.listFiles(partitionFilters, dataFilters) // val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions) From 0bc4f43808783053455bc0b3945b03e4c5927494 Mon Sep 17 00:00:00 2001 From: philo Date: Tue, 23 Aug 2022 16:54:12 +0800 Subject: [PATCH 8/8] Make WIP code commented --- .../datasources/v2/arrow/ArrowScan.scala | 189 +++++++++--------- 1 file changed, 94 insertions(+), 95 deletions(-) diff --git a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala index cf90cfb56..e1f6693b3 100644 --- a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala +++ b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala @@ -76,51 +76,51 @@ case class ArrowScan( this.copy(partitionFilters = partitionFilters, dataFilters = dataFilters) // compute maxSplitBytes - def maxSplitBytes(sparkSession: SparkSession, - selectedPartitions: Seq[PartitionDirectory]): Long = { - // TODO: unify it with PREFERRED_PARTITION_SIZE_UPPER_BOUND. 
- val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes - val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes - // val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum - // .getOrElse(sparkSession.leafNodeDefaultParallelism) - val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum - .getOrElse(SparkShimLoader.getSparkShims.leafNodeDefaultParallelism(sparkSession)) - val PREFERRED_PARTITION_SIZE_LOWER_BOUND: Long = 256 * 1024 * 1024 - val PREFERRED_PARTITION_SIZE_UPPER_BOUND: Long = 1024 * 1024 * 1024 - val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum - var maxBytesPerCore = totalBytes / minPartitionNum - var bytesPerCoreFinal = maxBytesPerCore - var bytesPerCore = maxBytesPerCore - - if (bytesPerCore > PREFERRED_PARTITION_SIZE_UPPER_BOUND) { - // Adjust partition size. - var i = 2 - while (bytesPerCore > PREFERRED_PARTITION_SIZE_UPPER_BOUND) { - bytesPerCore = maxBytesPerCore / i - if (bytesPerCore > PREFERRED_PARTITION_SIZE_LOWER_BOUND) { - bytesPerCoreFinal = bytesPerCore - } - i = i + 1 - } - Math.min(PREFERRED_PARTITION_SIZE_UPPER_BOUND, bytesPerCoreFinal) - // Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) - } else { - // adjust open cost. 
- var i = 2 - while (bytesPerCore < PREFERRED_PARTITION_SIZE_LOWER_BOUND) { - val dynamicOpenCostInBytes = openCostInBytesFinal * i - val totalBytes = - selectedPartitions.flatMap(_.files.map(_.getLen + dynamicOpenCostInBytes)).sum - maxBytesPerCore = totalBytes / minPartitionNum - if (maxBytesPerCore < PREFERRED_PARTITION_SIZE_UPPER_BOUND) { - openCostInBytesFinal = dynamicOpenCostInBytes - bytesPerCoreFinal = maxBytesPerCore - } - i = i + 1 - } - Math.max(PREFERRED_PARTITION_SIZE_LOWER_BOUND, bytesPerCoreFinal) - } - } +// def maxSplitBytes(sparkSession: SparkSession, +// selectedPartitions: Seq[PartitionDirectory]): Long = { +// // TODO: unify it with PREFERRED_PARTITION_SIZE_UPPER_BOUND. +// val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes +// val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes +// // val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum +// // .getOrElse(sparkSession.leafNodeDefaultParallelism) +// val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum +// .getOrElse(SparkShimLoader.getSparkShims.leafNodeDefaultParallelism(sparkSession)) +// val PREFERRED_PARTITION_SIZE_LOWER_BOUND: Long = 256 * 1024 * 1024 +// val PREFERRED_PARTITION_SIZE_UPPER_BOUND: Long = 1024 * 1024 * 1024 +// val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum +// var maxBytesPerCore = totalBytes / minPartitionNum +// var bytesPerCoreFinal = maxBytesPerCore +// var bytesPerCore = maxBytesPerCore +// +// if (bytesPerCore > PREFERRED_PARTITION_SIZE_UPPER_BOUND) { +// // Adjust partition size. 
+// var i = 2 +// while (bytesPerCore > PREFERRED_PARTITION_SIZE_UPPER_BOUND) { +// bytesPerCore = maxBytesPerCore / i +// if (bytesPerCore > PREFERRED_PARTITION_SIZE_LOWER_BOUND) { +// bytesPerCoreFinal = bytesPerCore +// } +// i = i + 1 +// } +// Math.min(PREFERRED_PARTITION_SIZE_UPPER_BOUND, bytesPerCoreFinal) +// // Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) +// } else { +// // adjust open cost. +// var i = 2 +// while (bytesPerCore < PREFERRED_PARTITION_SIZE_LOWER_BOUND) { +// val dynamicOpenCostInBytes = openCostInBytesFinal * i +// val totalBytes = +// selectedPartitions.flatMap(_.files.map(_.getLen + dynamicOpenCostInBytes)).sum +// bytesPerCore = totalBytes / minPartitionNum; maxBytesPerCore = bytesPerCore +// if (maxBytesPerCore < PREFERRED_PARTITION_SIZE_UPPER_BOUND) { +// openCostInBytesFinal = dynamicOpenCostInBytes +// bytesPerCoreFinal = maxBytesPerCore +// } +// i = i + 1 +// } +// Math.max(PREFERRED_PARTITION_SIZE_LOWER_BOUND, bytesPerCoreFinal) +// } +// } // This implementation is ported from spark FilePartition.scala with changes for // adjusting openCost. @@ -142,8 +142,7 @@ case class ArrowScan( currentSize = 0 } - // val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes - val openCostInBytes = openCostInBytesFinal + val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes // Assign files to partitions using "Next Fit Decreasing" partitionedFiles.foreach { file => if (currentSize + file.length > maxSplitBytes) { @@ -159,54 +158,54 @@ case class ArrowScan( currentSize = 0 }
- override def partitions: Seq[FilePartition] = { - val selectedPartitions = fileIndex.listFiles(partitionFilters, dataFilters) - // val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions) - val maxSplitBytes = this.maxSplitBytes(sparkSession, selectedPartitions) - // val partitionAttributes = fileIndex.partitionSchema.toAttributes - val partitionAttributes = ScanUtils.toAttributes(fileIndex) - val attributeMap = partitionAttributes.map(a => normalizeName(a.name) -> a).toMap - val readPartitionAttributes = readPartitionSchema.map { readField => - attributeMap.get(normalizeName(readField.name)).getOrElse { - // throw QueryCompilationErrors.cannotFindPartitionColumnInPartitionSchemaError( - // readField, fileIndex.partitionSchema) - throw new RuntimeException(s"Can't find required partition column ${readField.name} " + - s"in partition schema ${fileIndex.partitionSchema}") - } - } - lazy val partitionValueProject = - GenerateUnsafeProjection.generate(readPartitionAttributes, partitionAttributes) - val splitFiles = selectedPartitions.flatMap { partition => - // Prune partition values if part of the partition columns are not required. 
- val partitionValues = if (readPartitionAttributes != partitionAttributes) { - partitionValueProject(partition.values).copy() - } else { - partition.values - } - partition.files.flatMap { file => - val filePath = file.getPath - PartitionedFileUtil.splitFiles( - sparkSession = sparkSession, - file = file, - filePath = filePath, - isSplitable = isSplitable(filePath), - maxSplitBytes = maxSplitBytes, - partitionValues = partitionValues - ) - }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse) - } - - if (splitFiles.length == 1) { - val path = new Path(splitFiles(0).filePath) - if (!isSplitable(path) && splitFiles(0).length > - IO_WARNING_LARGEFILETHRESHOLD) { - logWarning(s"Loading one large unsplittable file ${path.toString} with only one " + - s"partition, the reason is: ${getFileUnSplittableReason(path)}") - } - } - - FilePartition.getFilePartitions(sparkSession, splitFiles, maxSplitBytes) - } +// override def partitions: Seq[FilePartition] = { +// val selectedPartitions = fileIndex.listFiles(partitionFilters, dataFilters) +// // val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions) +// val maxSplitBytes = this.maxSplitBytes(sparkSession, selectedPartitions) +// // val partitionAttributes = fileIndex.partitionSchema.toAttributes +// val partitionAttributes = ScanUtils.toAttributes(fileIndex) +// val attributeMap = partitionAttributes.map(a => normalizeName(a.name) -> a).toMap +// val readPartitionAttributes = readPartitionSchema.map { readField => +// attributeMap.get(normalizeName(readField.name)).getOrElse { +// // throw QueryCompilationErrors.cannotFindPartitionColumnInPartitionSchemaError( +// // readField, fileIndex.partitionSchema) +// throw new RuntimeException(s"Can't find required partition column ${readField.name} " + +// s"in partition schema ${fileIndex.partitionSchema}") +// } +// } +// lazy val partitionValueProject = +// GenerateUnsafeProjection.generate(readPartitionAttributes, partitionAttributes) +// val 
splitFiles = selectedPartitions.flatMap { partition => +// // Prune partition values if part of the partition columns are not required. +// val partitionValues = if (readPartitionAttributes != partitionAttributes) { +// partitionValueProject(partition.values).copy() +// } else { +// partition.values +// } +// partition.files.flatMap { file => +// val filePath = file.getPath +// PartitionedFileUtil.splitFiles( +// sparkSession = sparkSession, +// file = file, +// filePath = filePath, +// isSplitable = isSplitable(filePath), +// maxSplitBytes = maxSplitBytes, +// partitionValues = partitionValues +// ) +// }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse) +// } +// +// if (splitFiles.length == 1) { +// val path = new Path(splitFiles(0).filePath) +// if (!isSplitable(path) && splitFiles(0).length > +// IO_WARNING_LARGEFILETHRESHOLD) { +// logWarning(s"Loading one large unsplittable file ${path.toString} with only one " + +// s"partition, the reason is: ${getFileUnSplittableReason(path)}") +// } +// } +// +// FilePartition.getFilePartitions(sparkSession, splitFiles, maxSplitBytes) +// } private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis