84 changes: 65 additions & 19 deletions dev/benchmarks/README.md → benchmarks/tpc/README.md
@@ -26,6 +26,26 @@ For full instructions on running these benchmarks on an EC2 instance, see the [C

[Comet Benchmarking on EC2 Guide]: https://datafusion.apache.org/comet/contributor-guide/benchmarking_aws_ec2.html

## Usage

All benchmarks are run via `run.py`:

```shell
python3 run.py --engine <engine> --benchmark <tpch|tpcds> [options]
```

| Option | Description |
| -------------- | ------------------------------------------------ |
| `--engine` | Engine name (matches a TOML file in `engines/`) |
| `--benchmark` | `tpch` or `tpcds` |
| `--iterations` | Number of iterations (default: 1) |
| `--output` | Output directory (default: `.`) |
| `--query` | Run a single query number |
| `--no-restart` | Skip Spark master/worker restart |
| `--dry-run` | Print the spark-submit command without executing |

Available engines: `spark`, `comet`, `comet-iceberg`, `gluten`
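
The options can be combined; for example, the following sketch (query number and output path are illustrative) runs a single TPC-H query for several iterations and writes results to a separate directory:

```shell
# Run only TPC-H query 5 with Comet, three iterations, results written to ./results
python3 run.py --engine comet --benchmark tpch --query 5 --iterations 3 --output ./results

# Same run, but skip restarting the Spark master/worker before the benchmark
python3 run.py --engine comet --benchmark tpch --query 5 --no-restart
```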

## Example usage

Set Spark environment variables:
@@ -47,7 +67,7 @@ Run Spark benchmark:
```shell
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
sudo ./drop-caches.sh
./spark-tpch.sh
python3 run.py --engine spark --benchmark tpch
```

Run Comet benchmark:
@@ -56,7 +76,7 @@ Run Comet benchmark:
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
export COMET_JAR=/opt/comet/comet-spark-spark3.5_2.12-0.10.0.jar
sudo ./drop-caches.sh
./comet-tpch.sh
python3 run.py --engine comet --benchmark tpch
```

Run Gluten benchmark:
@@ -65,7 +85,13 @@ Run Gluten benchmark:
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export GLUTEN_JAR=/opt/gluten/gluten-velox-bundle-spark3.5_2.12-linux_amd64-1.4.0.jar
sudo ./drop-caches.sh
./gluten-tpch.sh
python3 run.py --engine gluten --benchmark tpch
```

Preview a command without running it:

```shell
python3 run.py --engine comet --benchmark tpch --dry-run
```

Generating charts:
@@ -74,6 +100,11 @@ Generating charts:
python3 generate-comparison.py --benchmark tpch --labels "Spark 3.5.3" "Comet 0.9.0" "Gluten 1.4.0" --title "TPC-H @ 100 GB (single executor, 8 cores, local Parquet files)" spark-tpch-1752338506381.json comet-tpch-1752337818039.json gluten-tpch-1752337474344.json
```

## Engine Configuration

Each engine is defined by a TOML file in `engines/`. The config specifies JARs, Spark conf overrides,
required environment variables, and optional defaults/exports. See existing files for examples.
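
Since `--engine` matches a TOML file in `engines/`, adding a new config file should make it selectable by name; a minimal sketch (hypothetical engine name):

```shell
# Assuming a new config file at engines/my-engine.toml (hypothetical name),
# it can be selected directly by name:
python3 run.py --engine my-engine --benchmark tpch
```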

## Iceberg Benchmarking

Comet includes native Iceberg support via iceberg-rust integration. This enables benchmarking TPC-H queries
@@ -90,14 +121,16 @@ export ICEBERG_JAR=/path/to/iceberg-spark-runtime-3.5_2.12-1.8.1.jar

Note: Table creation uses `--packages`, which downloads the Iceberg dependency automatically.

### Create Iceberg TPC-H tables
### Create Iceberg tables

Convert existing Parquet TPC-H data to Iceberg format:
Convert existing Parquet data to Iceberg format using `create-iceberg-tables.py`.
The script configures the Iceberg catalog automatically, so no catalog `--conf` flags are needed.

```shell
export ICEBERG_WAREHOUSE=/mnt/bigdata/iceberg-warehouse
export ICEBERG_CATALOG=${ICEBERG_CATALOG:-local}
mkdir -p $ICEBERG_WAREHOUSE

# TPC-H
$SPARK_HOME/bin/spark-submit \
--master $SPARK_MASTER \
--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
@@ -106,13 +139,24 @@ $SPARK_HOME/bin/spark-submit \
--conf spark.executor.cores=8 \
--conf spark.cores.max=8 \
--conf spark.executor.memory=16g \
--conf spark.sql.catalog.${ICEBERG_CATALOG}=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.${ICEBERG_CATALOG}.type=hadoop \
--conf spark.sql.catalog.${ICEBERG_CATALOG}.warehouse=$ICEBERG_WAREHOUSE \
create-iceberg-tpch.py \
create-iceberg-tables.py \
--benchmark tpch \
--parquet-path $TPCH_DATA \
--catalog $ICEBERG_CATALOG \
--database tpch
--warehouse $ICEBERG_WAREHOUSE

# TPC-DS
$SPARK_HOME/bin/spark-submit \
--master $SPARK_MASTER \
--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
--conf spark.driver.memory=8G \
--conf spark.executor.instances=2 \
--conf spark.executor.cores=8 \
--conf spark.cores.max=16 \
--conf spark.executor.memory=16g \
create-iceberg-tables.py \
--benchmark tpcds \
--parquet-path $TPCDS_DATA \
--warehouse $ICEBERG_WAREHOUSE
```

### Run Iceberg benchmark
@@ -124,20 +168,22 @@ export ICEBERG_JAR=/path/to/iceberg-spark-runtime-3.5_2.12-1.8.1.jar
export ICEBERG_WAREHOUSE=/mnt/bigdata/iceberg-warehouse
export TPCH_QUERIES=/mnt/bigdata/tpch/queries/
sudo ./drop-caches.sh
./comet-tpch-iceberg.sh
python3 run.py --engine comet-iceberg --benchmark tpch
```

The benchmark uses `spark.comet.scan.icebergNative.enabled=true` to enable Comet's native iceberg-rust
integration. Verify native scanning is active by checking for `CometIcebergNativeScanExec` in the
physical plan output.
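
One way to spot-check this outside of a full benchmark run is to print the plan for a trivial query against one of the Iceberg tables. The sketch below is not part of the benchmark scripts and assumes the `local` catalog and `tpch` database created earlier:

```shell
# Hypothetical spot-check: ask Spark SQL for the plan of a trivial query and
# look for CometIcebergNativeScanExec in the output.
$SPARK_HOME/bin/spark-sql \
  --jars $COMET_JAR,$ICEBERG_JAR \
  --conf spark.plugins=org.apache.spark.CometPlugin \
  --conf spark.comet.enabled=true \
  --conf spark.comet.scan.icebergNative.enabled=true \
  --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.local.type=hadoop \
  --conf spark.sql.catalog.local.warehouse=$ICEBERG_WAREHOUSE \
  -e "EXPLAIN SELECT count(*) FROM local.tpch.lineitem" | grep CometIcebergNativeScan
```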

### Iceberg-specific options
### create-iceberg-tables.py options

| Environment Variable | Default | Description |
| -------------------- | ---------- | ----------------------------------- |
| `ICEBERG_CATALOG` | `local` | Iceberg catalog name |
| `ICEBERG_DATABASE` | `tpch` | Database containing TPC-H tables |
| `ICEBERG_WAREHOUSE` | (required) | Path to Iceberg warehouse directory |
| Option | Required | Default | Description |
| ---------------- | -------- | -------------- | ----------------------------------- |
| `--benchmark` | Yes | | `tpch` or `tpcds` |
| `--parquet-path` | Yes | | Path to source Parquet data |
| `--warehouse` | Yes | | Path to Iceberg warehouse directory |
| `--catalog` | No | `local` | Iceberg catalog name |
| `--database` | No | benchmark name | Database name for the tables |
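
The optional flags follow the same spark-submit pattern as above; for example, to write the TPC-H tables into a non-default catalog and database (names here are illustrative):

```shell
$SPARK_HOME/bin/spark-submit \
  --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
  create-iceberg-tables.py \
  --benchmark tpch \
  --parquet-path $TPCH_DATA \
  --warehouse $ICEBERG_WAREHOUSE \
  --catalog my_catalog \
  --database tpch_sf100
```

If a non-default catalog is used, `ICEBERG_CATALOG` would also need to be set accordingly when running the benchmark, since the `comet-iceberg` engine config defaults it to `local`.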

### Comparing Parquet vs Iceberg performance

171 changes: 171 additions & 0 deletions benchmarks/tpc/create-iceberg-tables.py
@@ -0,0 +1,171 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Convert TPC-H or TPC-DS Parquet data to Iceberg tables.

Usage:
spark-submit \
--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
create-iceberg-tables.py \
--benchmark tpch \
--parquet-path /path/to/tpch/parquet \
--warehouse /path/to/iceberg-warehouse

spark-submit \
--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
create-iceberg-tables.py \
--benchmark tpcds \
--parquet-path /path/to/tpcds/parquet \
--warehouse /path/to/iceberg-warehouse
"""

import argparse
import os
import sys
from pyspark.sql import SparkSession
import time

TPCH_TABLES = [
"customer",
"lineitem",
"nation",
"orders",
"part",
"partsupp",
"region",
"supplier",
]

TPCDS_TABLES = [
"call_center",
"catalog_page",
"catalog_returns",
"catalog_sales",
"customer",
"customer_address",
"customer_demographics",
"date_dim",
"time_dim",
"household_demographics",
"income_band",
"inventory",
"item",
"promotion",
"reason",
"ship_mode",
"store",
"store_returns",
"store_sales",
"warehouse",
"web_page",
"web_returns",
"web_sales",
"web_site",
]

BENCHMARK_TABLES = {
"tpch": TPCH_TABLES,
"tpcds": TPCDS_TABLES,
}


def main(benchmark: str, parquet_path: str, warehouse: str, catalog: str, database: str):
    table_names = BENCHMARK_TABLES[benchmark]

    # Validate paths before starting Spark
    errors = []
    if not os.path.isdir(parquet_path):
        errors.append(f"Error: --parquet-path '{parquet_path}' does not exist or is not a directory")
    if not os.path.isdir(warehouse):
        errors.append(f"Error: --warehouse '{warehouse}' does not exist or is not a directory. "
                      "Create it with: mkdir -p " + warehouse)
    if errors:
        for e in errors:
            print(e, file=sys.stderr)
        sys.exit(1)

    spark = SparkSession.builder \
        .appName(f"Create Iceberg {benchmark.upper()} Tables") \
        .config(f"spark.sql.catalog.{catalog}", "org.apache.iceberg.spark.SparkCatalog") \
        .config(f"spark.sql.catalog.{catalog}.type", "hadoop") \
        .config(f"spark.sql.catalog.{catalog}.warehouse", warehouse) \
        .getOrCreate()

    # Set the Iceberg catalog as the current catalog so that
    # namespace operations are routed correctly
    spark.sql(f"USE {catalog}")

    # Create namespace if it doesn't exist
    try:
        spark.sql(f"CREATE NAMESPACE IF NOT EXISTS {database}")
    except Exception:
        # Namespace may already exist
        pass

    for table in table_names:
        parquet_table_path = f"{parquet_path}/{table}.parquet"
        iceberg_table = f"{catalog}.{database}.{table}"

        print(f"Converting {parquet_table_path} -> {iceberg_table}")
        start_time = time.time()

        # Drop table if exists to allow re-running
        spark.sql(f"DROP TABLE IF EXISTS {iceberg_table}")

        # Read parquet and write as Iceberg
        df = spark.read.parquet(parquet_table_path)
        df.writeTo(iceberg_table).using("iceberg").create()

        row_count = spark.table(iceberg_table).count()
        elapsed = time.time() - start_time
        print(f" Created {iceberg_table} with {row_count} rows in {elapsed:.2f}s")

    print(f"\nAll {benchmark.upper()} tables created successfully!")
    print(f"Tables available at: {catalog}.{database}.*")

    spark.stop()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Convert TPC-H or TPC-DS Parquet data to Iceberg tables"
    )
    parser.add_argument(
        "--benchmark", required=True, choices=["tpch", "tpcds"],
        help="Benchmark whose tables to convert (tpch or tpcds)"
    )
    parser.add_argument(
        "--parquet-path", required=True,
        help="Path to Parquet data directory"
    )
    parser.add_argument(
        "--warehouse", required=True,
        help="Path to Iceberg warehouse directory"
    )
    parser.add_argument(
        "--catalog", default="local",
        help="Iceberg catalog name (default: 'local')"
    )
    parser.add_argument(
        "--database", default=None,
        help="Database name to create tables in (defaults to benchmark name)"
    )
    args = parser.parse_args()

    database = args.database if args.database else args.benchmark
    main(args.benchmark, args.parquet_path, args.warehouse, args.catalog, database)
48 changes: 48 additions & 0 deletions benchmarks/tpc/engines/comet-iceberg.toml
@@ -0,0 +1,48 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[engine]
name = "comet-iceberg"

[env]
required = ["COMET_JAR", "ICEBERG_JAR", "ICEBERG_WAREHOUSE"]

[env.defaults]
ICEBERG_CATALOG = "local"

[spark_submit]
jars = ["$COMET_JAR", "$ICEBERG_JAR"]
driver_class_path = ["$COMET_JAR", "$ICEBERG_JAR"]

[spark_conf]
"spark.driver.extraClassPath" = "$COMET_JAR:$ICEBERG_JAR"
"spark.executor.extraClassPath" = "$COMET_JAR:$ICEBERG_JAR"
"spark.plugins" = "org.apache.spark.CometPlugin"
"spark.shuffle.manager" = "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
"spark.comet.exec.replaceSortMergeJoin" = "true"
"spark.comet.expression.Cast.allowIncompatible" = "true"
"spark.comet.enabled" = "true"
"spark.comet.exec.enabled" = "true"
"spark.comet.scan.icebergNative.enabled" = "true"
"spark.comet.explainFallback.enabled" = "true"
"spark.sql.catalog.${ICEBERG_CATALOG}" = "org.apache.iceberg.spark.SparkCatalog"
"spark.sql.catalog.${ICEBERG_CATALOG}.type" = "hadoop"
"spark.sql.catalog.${ICEBERG_CATALOG}.warehouse" = "$ICEBERG_WAREHOUSE"
"spark.sql.defaultCatalog" = "${ICEBERG_CATALOG}"

[tpcbench_args]
use_iceberg = true