feat: add S3 SAIL with Parquet columnar storage engine #5696

odysa wants to merge 9 commits into eclipse-rdf4j:develop
Conversation
Introduce rdf4j-sail-s3, an S3-backed SAIL using an LSM-tree architecture adapted from RisingWave's Hummock engine. This commit implements the module skeleton and in-memory storage layer:

- Config: S3StoreConfig, S3StoreFactory, S3StoreSchema
- Storage: Varint encoding, QuadIndex permutations, MemTable (ConcurrentSkipListMap)
- Value/NS: S3ValueStore (ConcurrentHashMap ID mapping), S3NamespaceStore
- Core SAIL: S3Store, S3StoreConnection, S3SailStore with SailSource/Sink/Dataset
- SPI registration via META-INF/services

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
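The Varint encoding mentioned above is, in most LSM key layouts, an unsigned LEB128-style codec: 7 data bits per byte, high bit set while more bytes follow. A minimal sketch under that assumption (the class and method names here are illustrative, not necessarily the PR's API):

```java
import java.io.ByteArrayOutputStream;

// Sketch of an unsigned LEB128 varint codec, the kind of encoding a
// Varint class could use for compact key serialization. Illustrative only.
public final class Varint {

    // Encode a non-negative long: 7 data bits per byte,
    // high bit set while more bytes follow.
    public static byte[] encode(long value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.write((int) value);
        return out.toByteArray();
    }

    // Decode a varint starting at the given offset.
    public static long decode(byte[] buf, int offset) {
        long result = 0;
        int shift = 0;
        for (int i = offset; i < buf.length; i++) {
            byte b = buf[i];
            result |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
            shift += 7;
        }
        throw new IllegalArgumentException("truncated varint");
    }
}
```

Small IDs (the common case for dictionary-encoded values) take one byte instead of eight, which is why varints pay off in sorted key storage.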
Add a persistence layer so MemTables flush to immutable SSTable files on S3-compatible storage. When s3Bucket is not configured the store stays in pure in-memory mode and all existing tests remain unaffected.

Key additions:
- ObjectStore interface with S3ObjectStore (MinIO) and FileSystemObjectStore (test double)
- SSTableWriter/SSTable: binary format with block index for range scans
- MergeIterator: K-way merge across MemTable + SSTables with deduplication and tombstone suppression
- Manifest: versioned JSON manifest tracking SSTables on S3
- S3ValueStore and S3NamespaceStore serialization/deserialization
- S3StoreConfig: S3 connectivity properties (bucket, endpoint, etc.)
- S3SailStore: flush path, merged read path, startup loading
- 29 new tests (unit + persistence); 541 total tests pass
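The K-way merge with deduplication and tombstone suppression described above can be sketched with a priority queue over sorted sources, where the newest source wins on key ties. This is a simplified illustration over in-memory lists (the `MergeSketch`/`Entry` names are hypothetical, not the PR's classes):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Illustrative K-way merge: newer sources shadow older ones for the same
// key, and tombstones suppress deleted keys from the merged output.
public final class MergeSketch {

    public record Entry(String key, String value, boolean tombstone) {}

    // Merge sorted sources; sources.get(0) is newest (e.g. the MemTable),
    // later sources are older on-disk files.
    public static List<Entry> merge(List<List<Entry>> sources) {
        // heap entries: {source rank, position in source}; ordered by key,
        // then by rank so the newest source surfaces first on ties
        PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> {
            int cmp = sources.get(a[0]).get(a[1]).key()
                    .compareTo(sources.get(b[0]).get(b[1]).key());
            return cmp != 0 ? cmp : Integer.compare(a[0], b[0]);
        });
        for (int s = 0; s < sources.size(); s++) {
            if (!sources.get(s).isEmpty()) heap.add(new int[] { s, 0 });
        }
        List<Entry> out = new ArrayList<>();
        String lastKey = null;
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            Entry e = sources.get(top[0]).get(top[1]);
            if (top[1] + 1 < sources.get(top[0]).size()) {
                heap.add(new int[] { top[0], top[1] + 1 });
            }
            if (e.key().equals(lastKey)) continue; // older duplicate: drop
            lastKey = e.key();
            if (!e.tombstone()) out.add(e);        // tombstone: suppress
        }
        return out;
    }
}
```

The real iterator streams lazily rather than materializing lists, but the shadowing and suppression logic is the same shape.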
@odysa Thank you - this is really cool. By using Parquet you get bloom filters, filter pushdown, and advanced compression (dictionary and others) for free. Best regards,
…iered cache (Phase 2)
Replace custom SSTable binary format with Apache Parquet columnar storage,
introduce vertical partitioning by predicate, and add a three-tier cache
(Caffeine heap -> local disk LRU -> S3).
Storage redesign:
- Parquet files on S3 with ZSTD compression and dictionary encoding
- Predicate-based partitioning (data/predicates/{id}/) eliminates
predicate column from files, tightening column statistics
- Three sort orders per partition (SOC, OSC, CSO) for optimal query
performance regardless of access pattern
- Single MemTable in SPOC order, partitioned on flush
- JSON catalog with per-file column statistics for catalog-level pruning
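Catalog-level pruning with per-file column statistics works by comparing a bound query value against each file's recorded min/max range. A minimal sketch, assuming numeric value IDs and hypothetical names (`FileStats`, `canSkip`):

```java
// Sketch of min/max pruning: a file can be skipped when the queried
// leading-component ID falls outside its recorded range. Illustrative
// names; the real catalog tracks stats per column and sort order.
public final class PruningSketch {

    public record FileStats(long minSubject, long maxSubject) {}

    // True when the file cannot contain the bound subject ID.
    public static boolean canSkip(FileStats stats, long boundSubject) {
        return boundSubject < stats.minSubject()
                || boundSubject > stats.maxSubject();
    }
}
```

This is also why tighter per-partition statistics matter: narrower min/max ranges make `canSkip` fire more often, so fewer files are fetched at all.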
Cache system:
- L1: Caffeine heap cache (configurable, default 256 MB)
- L2: Local disk LRU cache (configurable, default 10 GB)
- L3: S3 source of truth
- Write-through on flush avoids cold reads
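The read-through/write-through flow of the three tiers can be sketched as follows. Plain maps stand in for Caffeine (L1), the disk cache (L2), and the S3 client (L3) so the control flow is visible; all names here are illustrative:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Minimal sketch of a three-tier read path with write-through on flush.
// Real code would use Caffeine for L1, files for L2, S3 for L3; eviction
// is omitted here to keep the tier hand-off readable.
public final class TieredCacheSketch {
    private final Map<String, byte[]> l1 = new LinkedHashMap<>(); // heap
    private final Map<String, byte[]> l2 = new HashMap<>();       // "disk"
    private final Function<String, byte[]> l3;                    // "S3"
    public int l3Fetches = 0; // exposed so the cold-read count is testable

    public TieredCacheSketch(Function<String, byte[]> l3) {
        this.l3 = l3;
    }

    public byte[] get(String key) {
        byte[] v = l1.get(key);
        if (v != null) return v;              // L1 hit
        v = l2.get(key);
        if (v == null) {                      // L2 miss: fall through to S3
            v = l3.apply(key);
            l3Fetches++;
            l2.put(key, v);                   // populate L2 on the way back
        }
        l1.put(key, v);                       // promote to L1
        return v;
    }

    // Write-through on flush: populate L1 + L2 so the next read is warm.
    public void writeThrough(String key, byte[] value) {
        l1.put(key, value);
        l2.put(key, value);
    }
}
```

The write-through call is what avoids the cold read mentioned above: a freshly flushed file is already in L1 and L2 before anyone queries it.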
Compaction:
- L0->L1 merge when epoch count >= 8 per predicate
- L1->L2 merge when epoch count >= 4 per predicate
- Tombstone suppression at highest level
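The tiering triggers above reduce to a small policy check. A sketch using the thresholds from this commit message (the class and method names are mine):

```java
// Sketch of the compaction policy described above: compact a predicate's
// L0 files once 8 epochs accumulate, its L1 files once 4 accumulate.
public final class CompactionPolicySketch {
    static final int L0_THRESHOLD = 8;
    static final int L1_THRESHOLD = 4;

    // Returns the level to compact (0 or 1), or -1 if nothing is due.
    public static int levelToCompact(int l0FileCount, int l1FileCount) {
        if (l0FileCount >= L0_THRESHOLD) return 0;
        if (l1FileCount >= L1_THRESHOLD) return 1;
        return -1;
    }
}
```

Tombstones can only be dropped when merging into the highest level, since a lower-level merge cannot know whether an older file still holds the deleted key.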
Hadoop dependency elimination:
- Zero Hadoop JARs in dependency tree
- PlainParquetConfiguration + custom SimpleCodecFactory bypass all
Hadoop runtime paths
- 14 minimal stub classes in org.apache.hadoop.* satisfy parquet-hadoop
JVM class loading requirements
Deleted: SSTable, SSTableWriter, Manifest (replaced by Parquet + Catalog)
All 529 tests pass.
@odysa That looks promising. Do you already have any benchmarks?
@kenwenzel Thank you for the comments. It's not complete yet, just experimental.
@odysa Maybe you'd also like to take a look at the QLever paper for some inspiration. Does the store also work with local files?
Great idea, I really like it. Could the access to the data via S3 be hidden behind an interface, allowing either a local filesystem or an S3 store?
… (Phase 3)

Replace predicate partitioning with flat files and per-file min/max stats for pruning. Add the S3 Store to the RDF4J Workbench with a creation form and a TTL config template. S3 connection settings (bucket, endpoint, credentials) resolve from environment variables (RDF4J_S3_*) or system properties, so multiple repositories can share a single bucket, each isolated by s3Prefix.
I was testing locally with MinIO, but using the local filesystem is actually a big plus too. Let me add that.
- Promote FileSystemObjectStore from test to production, enabling 3-mode backend selection (S3 / filesystem / in-memory) via config
- Extract QuadStats value type to deduplicate stats computation in S3SailStore and Compactor
- Add QuadIndex.matches() helper, eliminating 4-place quad-filter duplication across MergeIterator, MemTable, and ParquetQuadSource
- Extract hasPersistence(), queryQuads(), resolveValueId() helpers in S3SailStore to remove repeated guard logic
- Split flushToObjectStore() into focused methods
- Merge CompactionPolicy.shouldCompactL0/L1 into shouldCompact()
- Remove unused explicit param from MemTable.remove()
- Delete dead ParquetFilterBuilder (zero usages)
- Fix QuadIndex wildcard sentinel inconsistency in getMaxKey
- Narrow Throwable catch to Exception in S3Store
- Add dataDir config field for filesystem persistence mode
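A centralized quad-filter helper like the `QuadIndex.matches()` mentioned above typically checks each component against a pattern, skipping components the pattern leaves unbound. A sketch under my own conventions (since the PR elsewhere fixes a bug where context=0 meant the default graph rather than a wildcard, this sketch uses -1 as the wildcard sentinel; all names are illustrative):

```java
// Illustrative quad-pattern matcher: a component matches when the pattern
// leaves it unbound (-1 here) or the IDs agree. quad = {s, p, o, c} IDs.
public final class QuadMatchSketch {
    public static final long WILDCARD = -1;

    public static boolean matches(long[] quad, long s, long p, long o, long c) {
        return (s == WILDCARD || quad[0] == s)
                && (p == WILDCARD || quad[1] == p)
                && (o == WILDCARD || quad[2] == o)
                && (c == WILDCARD || quad[3] == c);
    }
}
```

Centralizing this in one helper is what removes the four copies of the filter logic the bullet list describes.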
- Catalog: volatile copy-on-write for thread-safe concurrent reads
- S3SailStore: save catalog before values for crash-safe ordering; persist namespaces/values even when the MemTable is empty; delete old compaction files only after the catalog is saved
- QuadIndex: fix context=0 treated as wildcard in range scans
- QuadStats: filter tombstones from stats computation
- FileSystemObjectStore: atomic writes via temp-file-then-rename
- L2DiskCache: volatile lastAccessNanos, synchronized eviction
- Remove dead config fields (quadIndexes, blockSize, valueCacheSize, valueIdCacheSize) from S3StoreConfig and S3StoreSchema
- Unify ALL_INDEXES with SortOrder.values() as the single source of truth
- Fix inline FQNs and wildcard imports across main and test sources
- Fix stale javadocs in CompactionPolicy, MemTable, S3SailDataset
- Eliminate double serialization of values/namespaces on flush
- Make MemTable.approximateSizeInBytes() O(1) via AtomicLong counter
- Precompute field indices in sort comparator to avoid hot-loop switch
- Remove duplicate rowGroupSize/pageSize fields from S3SailStore/Compactor
- Centralize storage key literals into named constants
- Add named type discriminator constants in S3ValueStore
- Unify QuadStats accumulation with shared Accumulator inner class
- Centralize data key generation in Catalog.dataKey()
- Restrict ParquetFileInfo 14-param constructor to package-private
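The O(1) size change can be sketched as an AtomicLong maintained alongside the skip list, adjusted on each put instead of summing entry sizes on every call. A minimal illustration (names mirror the MemTable described in the commits, but the code is mine):

```java
import java.util.Arrays;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of O(1) approximate size tracking: keep a counter next to the
// sorted map and adjust it by the byte delta of each insert/overwrite.
public final class MemTableSketch {
    private final ConcurrentSkipListMap<byte[], byte[]> entries =
            new ConcurrentSkipListMap<>(Arrays::compare);
    private final AtomicLong approximateSize = new AtomicLong();

    public void put(byte[] key, byte[] value) {
        byte[] old = entries.put(key, value);
        // New entry adds key+value bytes; an overwrite only changes
        // the value-size delta since the key was already counted.
        long delta = key.length + value.length
                - (old == null ? 0 : key.length + old.length);
        approximateSize.addAndGet(delta);
    }

    public long approximateSizeInBytes() { // O(1), no iteration
        return approximateSize.get();
    }
}
```

The count is "approximate" because concurrent overwrites can race between the map update and the counter update, which is acceptable for a flush-threshold heuristic.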
FYI: https://www.morling.dev/blog/hardwood-new-parser-for-apache-parquet/ might be easier to have as a dependency than parquet-java.
@JervenBolleman Looks promising. But it does not support writing yet.
…y estimates

- Parallelize Parquet writes with CompletableFuture (3 files written concurrently)
- Optimize getContextIDs() to use CSPO index with last-seen dedup
- Move compaction to background single-thread executor
- Implement catalog-based cardinality estimation in S3EvaluationStatistics
- Refactor ParquetQuadSource to stream rows lazily instead of loading all into memory
- Add row group filtering using Parquet column statistics (min/max)
- Add BloomFilter class for leading-component filtering per Parquet file
- Add close() to RawEntrySource; MergeIterator closes sources when exhausted
Consolidate duplicated buildBloomFilter and leading-component switch logic from S3SailStore and Compactor into BloomFilter. Replace hand-rolled byte serialization with ByteBuffer. Merge queryQuads overloads and eliminate double shouldCompact evaluation in compaction.
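A leading-component bloom filter, as described above, answers "may this file contain the bound leading ID?" before any file data is loaded; it can false-positive but never false-negative. A toy sketch over long IDs (sizing and hash mixing here are illustrative, not the PR's choices):

```java
import java.util.BitSet;

// Toy bloom filter over long IDs for pre-fetch filtering. mayContain()
// may return true for an absent key (false positive), never false for a
// present one. Hash constants are arbitrary mixing multipliers.
public final class BloomSketch {
    private static final int HASHES = 3;
    private final BitSet bits;
    private final int size;

    public BloomSketch(int size) {
        this.size = size;
        this.bits = new BitSet(size);
    }

    // Derive the i-th bit index from the id via multiplicative mixing.
    private int index(long id, int i) {
        long h = id * 0x9E3779B97F4A7C15L + i * 0xC2B2AE3D27D4EB4FL;
        return (int) Long.remainderUnsigned(h ^ (h >>> 32), size);
    }

    public void add(long id) {
        for (int i = 0; i < HASHES; i++) bits.set(index(id, i));
    }

    public boolean mayContain(long id) {
        for (int i = 0; i < HASHES; i++) {
            if (!bits.get(index(id, i))) return false;
        }
        return true;
    }
}
```

At flush time the filter is built over the leading component of the file's sort order, so a lookup with that component bound can skip the file without touching S3.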
Summary
Adds an experimental SAIL implementation (`rdf4j-sail-s3`) that stores RDF data on S3-compatible object storage using an LSM-tree architecture with Apache Parquet as the storage format.

Phase 1 — Foundation
- Module skeleton and configuration (`S3Store`, `S3StoreConfig`, `S3StoreFactory`)
- In-memory storage layer: `MemTable` (sorted skip list), `QuadIndex` (configurable index permutations), and `Varint` encoding

Phase 2 — Parquet + Tiered Cache
- `PlainParquetConfiguration` + custom `SimpleCodecFactory` (zstd-jni) bypass all Hadoop runtime paths; 14 minimal stub classes satisfy parquet-java's JVM class loading requirements

Phase 3 — Workbench UI + Instance-level S3 Config + Local Filesystem Backend
- Workbench integration: `create-s3.xsl` form, `s3.ttl` config template
- S3 connection settings resolve from environment variables (`RDF4J_S3_BUCKET`, `RDF4J_S3_ENDPOINT`, `RDF4J_S3_REGION`, `RDF4J_S3_ACCESS_KEY`, `RDF4J_S3_SECRET_KEY`, `RDF4J_S3_FORCE_PATH_STYLE`) or system properties (`rdf4j.s3.*`)
- Each repository is isolated by its `s3Prefix` — all data is namespaced under that prefix within the shared bucket
- Three storage modes: S3 (bucket configured), local filesystem (`dataDir` configured), or pure in-memory (neither configured)

Phase 4 — Safety + Code Quality
- Fixed `QuadIndex` range scans treating context=0 (default graph) as a wildcard
- `QuadStats` now filters tombstone entries for more precise pruning
- Atomic writes: `FileSystemObjectStore` and `L2DiskCache` use the temp-file-then-rename pattern
- Dead config fields `quadIndexes`, `blockSize`, `valueCacheSize`, `valueIdCacheSize` removed from the schema
- `ALL_INDEXES` derives from `SortOrder.values()`

Phase 5 — Performance Optimizations
- Parquet writes parallelized via `CompletableFuture` with a shared thread pool
- `ParquetQuadSource` streams rows lazily one row group at a time instead of loading entire files into memory
- Per-file bloom filters checked via `mayContain()` before loading any file data
- `S3EvaluationStatistics` resolves bound values to IDs and sums row counts from matching files (via min/max + bloom filter pruning), enabling better query planning

Read Path
```mermaid
flowchart TD
    A[getStatements\nsubj, pred, obj, ctx] --> B[Resolve Value → ID\nvia ValueStore]
    B --> C{ID found?}
    C -- No --> D[Return EmptyIteration]
    C -- Yes --> E[Select best QuadIndex\nfor query pattern]
    E --> F[Build source list]
    F --> G[MemTable.asRawSource\nre-encode keys in best index order]
    F --> H[Catalog: get files\nfor selected sort order]
    H --> I{Per-file pruning\nmin/max + bloom filter}
    I -- Outside range --> J[Skip file]
    I -- May contain --> K[Load file via\nTieredCache]
    K --> L{L1 Caffeine\nheap cache}
    L -- Hit --> M[Return bytes]
    L -- Miss --> N{L2 disk\nLRU cache}
    N -- Hit --> M
    N -- Miss --> O[L3 S3 fetch]
    O --> M
    M --> P[ParquetQuadSource\nstream rows + row group filtering]
    G --> Q[MergeIterator\nK-way merge, dedup, tombstone suppression]
    P --> Q
    Q --> R[QuadToStatementIteration\nID → Value resolution]
    R --> S[Statement stream]
```

Insert + Commit Path
```mermaid
flowchart TD
    A[approve / approveAll\nsubj, pred, obj, ctx] --> B[Store values → IDs\nvia ValueStore]
    B --> C[MemTable.put\ns, p, o, c, flag]
    C --> D{MemTable size\n≥ flush threshold?}
    D -- No --> E[Return — buffered in memory]
    D -- Yes --> F[Trigger flush]
    F --> G[Freeze MemTable\nswap in fresh one]
    G --> H[Collect all quads\ncompute min/max stats]
    H --> I[Parallel: for each sort order\nSPOC, OPSC, CSPO]
    I --> J[Sort entries by index]
    J --> K[ParquetFileBuilder.build\nZSTD + dictionary encoding]
    J --> K2[Build bloom filter\nfor leading component]
    K --> L[objectStore.put → S3]
    K --> M[cache.writeThrough\nL1 + L2 populated]
    L --> N[Catalog.addFile\nwith stats + bloom filter]
    N --> O[Persist catalog first\nthen ValueStore + NamespaceStore]
    O --> P[Catalog.save\natomic version bump]
    P --> Q{Compaction\ntriggers?}
    Q -- L0 count ≥ 8 --> R[Background: L0 → L1 merge]
    Q -- L1 count ≥ 4 --> S[Background: L1 → L2 merge]
    Q -- No --> T[Done]
    R --> S
    S --> T
    subgraph Transaction Commit
        U[SailSink.flush] --> F
    end
```

S3 Layout
Key Files
- `S3SailStore.java`
- `storage/ParquetFileBuilder.java`
- `storage/ParquetQuadSource.java` — `RawEntrySource` over Parquet files with row group filtering
- `storage/MergeIterator.java`
- `storage/Catalog.java`
- `storage/Compactor.java`
- `storage/BloomFilter.java`
- `storage/QuadIndex.java`
- `storage/QuadEntry.java`
- `storage/FileSystemObjectStore.java`
- `storage/SimpleCodecFactory.java`
- `cache/TieredCache.java`
- `config/S3StoreConfig.java`
- `S3EvaluationStatistics.java`

Three storage modes: S3 (bucket configured), local filesystem (`dataDir` configured), or in-memory (neither configured).

Test plan

- `S3PersistenceTest` — write/flush/restart, multiple flushes, delete/restart, context graphs, sort orders, namespace persistence
- `CatalogTest`, `MemTableReorderTest`, `ParquetRoundTripTest`, `QuadIndexSelectionTest`, `MergeIteratorTest` — unit tests
- `S3PersistenceMinioIT` — integration test against real MinIO via Testcontainers

🤖 Generated with Claude Code
Future Work