Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dev/release/rat_exclude_files.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ arrow-flight/src/sql/arrow.flight.protocol.sql.rs
.github/*
parquet/src/bin/parquet-fromcsv-help.txt
arrow-flight/examples/data/*
parquet/examples/page_store_dedup/page_store_concept.svg
16 changes: 15 additions & 1 deletion parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ arrow-csv = { workspace = true, optional = true }
arrow-data = { workspace = true, optional = true }
arrow-schema = { workspace = true, optional = true }
arrow-select = { workspace = true, optional = true }
arrow-cast = { workspace = true, optional = true, features = ["prettyprint"] }
arrow-ipc = { workspace = true, optional = true }
parquet-geospatial = { workspace = true, optional = true }
parquet-variant = { workspace = true, optional = true }
Expand All @@ -65,6 +66,7 @@ num-integer = { version = "0.1.46", default-features = false, features = ["std"]
num-traits = { version = "0.2.19", default-features = false, features = ["std"] }
base64 = { version = "0.22", default-features = false, features = ["std", ], optional = true }
clap = { version = "4.1", default-features = false, features = ["std", "derive", "env", "help", "error-context", "usage"], optional = true }
glob = { version = "0.3", default-features = false, optional = true }
serde = { version = "1.0", default-features = false, features = ["derive"], optional = true }
serde_json = { version = "1.0", default-features = false, features = ["std"], optional = true }
seq-macro = { version = "0.3", default-features = false }
Expand All @@ -77,6 +79,7 @@ half = { version = "2.1", default-features = false, features = ["num-traits"] }
crc32fast = { version = "1.4.2", optional = true, default-features = false }
simdutf8 = { workspace = true , optional = true }
ring = { version = "0.17", default-features = false, features = ["std"], optional = true }
blake3 = { version = "1", default-features = false, optional = true }

[dev-dependencies]
base64 = { version = "0.22", default-features = false, features = ["std"] }
Expand Down Expand Up @@ -108,7 +111,7 @@ arrow = ["base64", "arrow-array", "arrow-buffer", "arrow-data", "arrow-schema",
# Enable support for arrow canonical extension types
arrow_canonical_extension_types = ["arrow-schema?/canonical_extension_types"]
# Enable CLI tools
cli = ["json", "base64", "clap", "arrow-csv", "serde"]
cli = ["json", "base64", "clap", "arrow-csv", "serde", "dep:glob"]
# Enable JSON APIs
json = ["serde_json", "base64"]
# Enable internal testing APIs
Expand All @@ -134,6 +137,8 @@ flate2-zlib-rs = ["flate2/zlib-rs"]
variant_experimental = ["arrow", "parquet-variant", "parquet-variant-json", "parquet-variant-compute"]
# Enable geospatial support
geospatial = ["parquet-geospatial"]
# Enable page store (content-addressed page storage)
page_store = ["arrow", "dep:blake3", "dep:arrow-cast", "serde", "serde_json"]


[[example]]
Expand All @@ -151,6 +156,11 @@ name = "write_parquet"
required-features = ["cli"]
path = "./examples/write_parquet.rs"

[[example]]
name = "page_store"
required-features = ["page_store"]
path = "./examples/page_store.rs"

[[example]]
name = "read_with_rowgroup"
required-features = ["arrow", "async"]
Expand Down Expand Up @@ -180,6 +190,10 @@ name = "variant_integration"
required-features = ["arrow", "variant_experimental", "serde"]
path = "./tests/variant_integration.rs"

[[bin]]
name = "parquet-page-store"
required-features = ["page_store", "cli"]

[[bin]]
name = "parquet-read"
required-features = ["cli"]
Expand Down
106 changes: 106 additions & 0 deletions parquet/examples/page_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Example demonstrating the Parquet Page Store.
//!
//! Writes Arrow RecordBatches to a content-addressed page store and reads them back.

use std::sync::Arc;

use arrow_array::{ArrayRef, Float64Array, Int32Array, RecordBatch, StringArray};
use arrow_cast::pretty::pretty_format_batches;
use parquet::arrow::page_store::{PageStoreReader, PageStoreWriter};
use parquet::file::properties::{CdcOptions, EnabledStatistics, WriterProperties};
use tempfile::TempDir;

/// Demonstrates a full round-trip through the content-addressed page store:
/// write a `RecordBatch`, inspect the stored `.page` files, read the batch
/// back, and verify the result matches the input.
fn main() -> parquet::errors::Result<()> {
    // Scratch directory for the demo; both the page store and the metadata
    // file live here, and everything is deleted when `tempdir` is dropped.
    let tempdir = TempDir::new()?;
    let store_dir = tempdir.path().join("page_store");

    // Create sample data: three columns, ten rows.
    let batch = RecordBatch::try_from_iter(vec![
        (
            "id",
            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])) as ArrayRef,
        ),
        (
            "value",
            Arc::new(Float64Array::from(vec![
                1.0, 2.5, 3.7, 4.2, 5.9, 6.1, 7.3, 8.8, 9.0, 10.5,
            ])) as ArrayRef,
        ),
        (
            "name",
            Arc::new(StringArray::from(vec![
                "alice", "bob", "charlie", "diana", "eve", "frank", "grace", "heidi", "ivan",
                "judy",
            ])) as ArrayRef,
        ),
    ])?;

    // Page-level statistics plus content-defined chunking (CDC), so page
    // boundaries are derived from the data itself and identical pages can be
    // deduplicated by the store.
    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Page)
        .set_content_defined_chunking(Some(CdcOptions::default()))
        .build();

    let metadata_path = tempdir.path().join("table.parquet");

    // Write to page store
    println!("Page store dir: {}", store_dir.display());
    println!("Metadata file: {}", metadata_path.display());
    let mut writer = PageStoreWriter::try_new(&store_dir, batch.schema(), Some(props))?;
    writer.write(&batch)?;
    let metadata = writer.finish(&metadata_path)?;

    println!(
        "Wrote {} row group(s), {} total rows",
        metadata.num_row_groups(),
        metadata.file_metadata().num_rows()
    );

    // Count the content-addressed `.page` files the writer produced; there is
    // no need to materialize the entries, a count is all we report.
    let page_files = std::fs::read_dir(&store_dir)?
        .filter_map(|entry| entry.ok())
        .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "page"))
        .count();
    println!("Page files in store: {page_files}");

    // Read back from page store
    println!("\nReading from page store...");
    let reader = PageStoreReader::try_new(&metadata_path, &store_dir)?;
    let batches = reader.read_batches()?;

    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    println!(
        "Read {} batch(es), {} total rows",
        batches.len(),
        total_rows
    );

    // Display the batches as a formatted table.
    let formatted = pretty_format_batches(&batches)?;
    println!("\n{formatted}");

    // Verify round-trip: exactly one batch, byte-identical to what we wrote.
    assert_eq!(batches.len(), 1);
    assert_eq!(batches[0], batch);
    println!("\nRound-trip verification: PASSED");

    Ok(())
}
6 changes: 6 additions & 0 deletions parquet/examples/page_store_dedup/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
data/
meta/
pages/
verify/
.venv/
.cache/
179 changes: 179 additions & 0 deletions parquet/examples/page_store_dedup/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Parquet Page Store — Deduplication Demo

> **Prototype**: This is an experimental feature exploring content-defined
> chunking for Parquet. APIs and file formats may change.

Demonstrates how Content-Defined Chunking (CDC) enables efficient deduplication
across multiple versions of a dataset using the Parquet page store writer in
Apache Arrow Rust. The deduplication is self-contained in the Parquet writer —
no special storage system is required.

## What this demo shows

Four common dataset operations are applied to a real-world dataset
([OpenHermes-2.5](https://huggingface.co/datasets/teknium/OpenHermes-2.5)
conversational data, ~800 MB per file). Each operation produces a separate
Parquet file. Without a page store, storing all four files costs the full sum
of their sizes. With the CDC page store, identical pages are stored **exactly
once** — indexed by their BLAKE3 hash — so the four files share most of their
bytes. The resulting files can be stored anywhere.

| File | Operation |
| ------------------- | -------------------------------------- |
| `original.parquet` | Baseline dataset (~996k rows) |
| `filtered.parquet` | Keep rows where `num_turns ≤ 3` |
| `augmented.parquet` | Original + computed column `num_turns` |
| `appended.parquet`  | Original + 5,000 new rows appended     |

## Prerequisites

```bash
pip install pyarrow matplotlib huggingface_hub
cargo build --release -p parquet --features page_store,cli
```

## Running the demo

```bash
cd parquet/examples/page_store_dedup

# Run the full pipeline: prepare data, build binary, ingest into page store, show stats
python pipeline.py

# Then generate diagrams
python diagram.py
```

Individual steps can be skipped if they've already run:

```bash
python pipeline.py --skip-prepare --skip-build # re-run ingest + stats only
python pipeline.py --skip-prepare --skip-build --skip-ingest # stats only
```

Outputs:

- `page_store_concept.png` — architectural overview of how shared pages work
- `page_store_savings.png` — side-by-side storage comparison with real numbers

## Using your own dataset

```bash
python pipeline.py --file /path/to/your.parquet
```

The script requires a `conversations` list column for the filtered and augmented
variants. Adapt `pipeline.py` to your own schema as needed.

## Results

Dataset: **OpenHermes-2.5** (short conversations, `num_turns < 10`)

### Dataset variants

| File | Operation | Rows | Size |
| ------------------- | ------------------------------------------- | --------- | -------------- |
| `original.parquet` | Baseline | 996,009 | 782.1 MB |
| `filtered.parquet` | Keep `num_turns ≤ 3` (removes 0.2% of rows) | 993,862 | 776.8 MB |
| `augmented.parquet` | Add column `num_turns` | 996,009 | 782.2 MB |
| `appended.parquet` | Append 5,000 rows | 1,001,009 | 788.6 MB |
| **Total** | | | **3,129.7 MB** |

### Page store results

| Metric | Value |
| ------------------------- | -------------------- |
| Unique pages stored | 3,400 |
| Total page references | 15,179 |
| Page store size | 559.0 MB |
| Metadata files size | 4.4 MB |
| **Page store + metadata** | **563.4 MB** |
| **Storage saved** | **2,566.3 MB (82%)** |
| **Deduplication ratio** | **5.6×** |

### Per-file page breakdown

| File | Page refs | Unique hashes | New pages | Reused pages |
| ------------------- | --------- | ------------- | --------- | ------------ |
| `original.parquet` | 3,782 | 3,100 | 3,100 | 0 |
| `filtered.parquet` | 3,755 | 3,075 | 222 | 2,853 (92%) |
| `augmented.parquet` | 3,834 | 3,136 | 36 | 3,100 (98%) |
| `appended.parquet` | 3,808 | 3,125 | 42 | 3,083 (98%) |

### Key insights

1. **Adding a column** (`augmented`): only 36 new pages out of 3,136 (1.1%).
The existing 17 columns produce identical CDC pages — only the new `num_turns`
column contributes new pages.

2. **Appending rows** (`appended`): only 42 new pages out of 3,125 (1.3%).
The original 996k rows' pages are unchanged; only the 5k new rows create new pages.

3. **Filtering rows** (`filtered`): 92% of pages reused despite row removal.
Removing just 0.2% of rows barely shifts CDC boundaries — most pages are
unchanged. Heavier filtering (removing 20–50% of rows) would produce more new
pages, as CDC boundaries shift further throughout the file.

4. **Net result**: 4 dataset versions stored for **563 MB instead of 3.1 GB** — an
**82% reduction**, or equivalently, 4 versions for the cost of **0.72×** a single
version.

## How it works

```
Standard Parquet — each file stored independently:

original.parquet ──► [ page 1 ][ page 2 ][ page 3 ]...[ page N ]
filtered.parquet ──► [ page 1'][ page 2 ][ page 3 ]...[ page M ]
augmented.parquet ──► [ page 1 ][ page 2 ][ page 3 ]...[ page N ][ extra ]
appended.parquet ──► [ page 1 ][ page 2 ][ page 3 ]...[ page N ][ new ]

Total: sum of all four file sizes

CDC Page Store — content-addressed, deduplicated:

pages/
<hash-of-page-1>.page ← shared by original, augmented, appended
<hash-of-page-2>.page ← shared by original, filtered, augmented, appended
<hash-of-page-3>.page ← shared by filtered only (boundary page)
... (only UNIQUE pages stored)

meta/
original.meta.parquet ← tiny manifest referencing page hashes
filtered.meta.parquet
augmented.meta.parquet
appended.meta.parquet

Total: ~18% of the combined file sizes
```

CDC ensures that page boundaries are **content-defined** (not fixed row
counts), so adding columns or appending rows only requires storing the small
number of new pages — the rest remain identical and are reused.

## Further reading

- [`parquet::arrow::page_store`][api] API docs
- [`parquet-page-store` CLI][cli] source

[api]: https://docs.rs/parquet/latest/parquet/arrow/page_store/index.html
[cli]: ../../src/bin/parquet-page-store.rs
Loading
Loading