feat(spark-4): add Apache Spark 4.x connector + batch-ingestion plugin (JDK 21+) #18261
xiangfu0 wants to merge 36 commits into apache:master
Conversation
Codecov Report ✅ All modified and coverable lines are covered by tests.

@@ Coverage Diff @@
##              master   #18261      +/-   ##
============================================
+ Coverage      63.38%   63.41%    +0.03%
+ Complexity      1668     1644       -24
============================================
  Files           3252     3253        +1
  Lines         198661   198764      +103
  Branches       30770    30791       +21
============================================
+ Hits          125925   126051      +126
+ Misses         62666    62642       -24
- Partials       10070    10071        +1
xiangfu0 left a comment:
Found a few high-signal correctness issues; see inline comments.
case _: StringStartsWith => true
case _: StringEndsWith => true
case _: StringContains => true
case _: Not => true
Marking compound filters as supported without recursively checking their children can silently drop predicates. For example, an Or/And/Not containing an unsupported child is removed from Spark residual filtering, then compileFilter returns None, so Pinot can return rows that do not satisfy the original Spark filter. Recursively accept compounds only when every child compiles; otherwise return the original filter as residual.
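A minimal sketch of the recursive acceptance check this comment asks for, assuming Spark's `sources.Filter` hierarchy; `isSupportedLeaf` is a hypothetical helper standing in for the leaf cases the connector already compiles:

```scala
import org.apache.spark.sql.sources._

// Sketch only: accept And/Or/Not only when every child is itself supported,
// so a compound with an unsupported leaf stays in Spark's residual filters.
def isFilterSupported(filter: Filter): Boolean = filter match {
  case And(left, right) => isFilterSupported(left) && isFilterSupported(right)
  case Or(left, right)  => isFilterSupported(left) && isFilterSupported(right)
  case Not(child)       => isFilterSupported(child)
  case leaf             => isSupportedLeaf(leaf) // hypothetical helper for EqualTo, In, LIKE-style leaves, ...
}
```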
case GreaterThanOrEqual(attr, value) => s"${escapeAttr(attr)} >= ${compileValue(value)}"
case IsNull(attr) => s"${escapeAttr(attr)} IS NULL"
case IsNotNull(attr) => s"${escapeAttr(attr)} IS NOT NULL"
case StringStartsWith(attr, value) => s"${escapeAttr(attr)} LIKE '$value%'"
The LIKE pushdowns interpolate the raw Spark value while equality predicates escape string literals. Values containing quotes, percent, or underscore change the Pinot SQL predicate semantics, which can produce silent wrong results. Escape SQL literals and LIKE wildcards for StringStartsWith/StringEndsWith/StringContains before pushing these filters.
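A minimal sketch of literal escaping along the lines the later commits adopt (double backslashes, backslash-escape the `%` and `_` wildcards, double single-quotes, and attach an explicit `ESCAPE` clause); `escapeAttr` is the connector's existing identifier helper:

```scala
// Sketch: make a raw Spark string safe inside a Pinot LIKE '...' literal.
def escapeLikeLiteral(value: String): String =
  value
    .replace("\\", "\\\\") // escape backslashes first, before adding new ones
    .replace("%", "\\%")
    .replace("_", "\\_")
    .replace("'", "''")

// e.g. StringStartsWith(attr, v) would then render as:
//   s"${escapeAttr(attr)} LIKE '${escapeLikeLiteral(v)}%' ESCAPE '\\'"
```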
override def build(): Write = {
  // TODO: utilize predicates
  new PinotWrite(logicalWriteInfo)
This advertises SupportsOverwriteV2 but drops the overwrite predicates and builds an unconditional writer. Spark overwrite-by-filter callers will believe Pinot replaced the matching data, while this path only writes new segment files, which can leave old matching segments in place and produce duplicate or stale query results. Either implement predicate-aware replacement semantics or reject overwrite predicates explicitly.
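A minimal sketch of the "reject overwrite predicates explicitly" option, against Spark 4's `SupportsOverwriteV2` API (the route the later commits take); the error text is illustrative:

```scala
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.write.{Write, WriteBuilder}

// Sketch: fail fast instead of silently appending when Spark plans an overwrite-by-predicate.
override def overwrite(predicates: Array[Predicate]): WriteBuilder =
  throw new UnsupportedOperationException(
    s"Pinot's write path only appends new segments; overwrite with ${predicates.length} predicate(s) " +
      "is not supported. Drop/replace data via the controller REST API or use a REFRESH push job instead.")

override def build(): Write = new PinotWrite(logicalWriteInfo)
```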
Force-pushed cc8bd0a to 5021c6b
Force-pushed 220e677 to 955dc8e
Add a new top-level `pinot-spark-4/` umbrella module containing:

- `pinot-batch-ingestion-spark-4`: Spark 4.x batch ingestion runners (Java), ported from `pinot-batch-ingestion-spark-3` with the package rename `spark3 → spark4` and the Spark 4 JVM flags required on JDK 17+.
- `pinot-spark-4-connector`: Spark 4.x DataSourceV2 read/write connector (Scala 2.13), ported from `pinot-spark-3-connector`. Migrates `PinotWriteBuilder` from the Spark 3 `SupportsOverwrite`/`Filter` API to the Spark 4 `SupportsOverwriteV2`/`Predicate` API; all other source files are verbatim ports with the package rename `v3 → v4`.

The umbrella module is activated only when the active JDK is 21+ via a root-pom profile with `<jdk>[21,)</jdk>` activation. Spark 3 modules are untouched and continue to build on JDK 11/17/21.

Coverage:
- 3 unit tests for the batch-ingestion runners (port of spark-3 tests)
- 25 scalatest cases for the connector (port of spark-3 + a new `PinotDataSourceRegistrationTest` that verifies the DataSourceRegister SPI wiring; the same test is added to the Spark 3 connector for parity)
- A `SparkSegmentMetadataPushIntegrationTest4` end-to-end integration test in `pinot-integration-tests/src/test/java-spark4/`, wired up under a `pinot-spark-4-integration-tests` profile that activates on JDK 21 and overrides `spark-core`/`spark-sql`/`spark-launcher` to Spark 4.x at test scope (provided-scope in the plugin pom does not transit to test scope, and the root pom pins those coordinates to Spark 3 by default).

Packaging:
- Root pom gains `spark4.version=4.0.0` and dependencyManagement entries for the two new jars.
- `pinot-distribution/pinot-assembly.xml` gains a `<fileSet>` entry that conditionally includes the Spark 4 shaded jar when its target directory exists (present on JDK 21 builds, silently absent on JDK 11 builds), so a single descriptor handles both paths without forked variants.
- `pinot-spark-4` is added to the `dependency-verifier` `skipModules` list, matching the pattern used for the other plugin trees.

Build & test verification on JDK 21:
- `./mvnw -pl pinot-spark-4/pinot-batch-ingestion-spark-4,pinot-spark-4/pinot-spark-4-connector -am test` — all tests pass
- `./mvnw -pl pinot-integration-tests test-compile` — the JDK-21 profile picks up the new Spark 4 integration test and compiles cleanly
- `./mvnw -pl ... spotless:apply checkstyle:check license:check` — clean

README files at `pinot-spark-4/pinot-batch-ingestion-spark-4/README.md` and `pinot-spark-4/pinot-spark-4-connector/README.md` document prerequisites, build commands, data-source options, quickstart snippets, and the Spark 3 vs Spark 4 compatibility matrix.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Relocate the two Spark 4 modules from the top-level `pinot-spark-4/` umbrella into the same directories as their Spark 3 counterparts:

- `pinot-spark-4/pinot-spark-4-connector/` → `pinot-connectors/pinot-spark-4-connector/`
- `pinot-spark-4/pinot-batch-ingestion-spark-4/` → `pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-4/`

This matches the layout of `pinot-spark-3-connector` / `pinot-batch-ingestion-spark-3` and lets the Spark 4 modules inherit common dependencies (pinot-core as provided, testng as test) through the normal `pinot-plugins` / `pinot-batch-ingestion` parent chain, which the umbrella pom had to declare by hand.

JDK 21 gating moves accordingly:
- `pinot-connectors/pom.xml` and `pinot-plugins/pinot-batch-ingestion/pom.xml` each gain a profile with `<jdk>[21,)</jdk>` activation that registers the corresponding Spark 4 module. On JDK 11/17 the modules are absent from the reactor entirely; on JDK 21 they build alongside Spark 3.
- The root-pom `pinot-spark-4` profile is removed — no longer needed, since gating is now handled by each parent.
- `dependency-verifier` `skipModules` drops the `pinot-spark-4` entry; `pinot-connectors` and `pinot-plugins` already cover the new locations.
- Each Spark 4 module pom carries `<jdk.version>21</jdk.version>` and a maven-enforcer `requireJavaVersion [21,)` + `requireProperty scala.compat.version=2.13` safety net, matching what the old umbrella pom provided when the module was invoked directly.
- `pinot-distribution/pinot-assembly.xml` fileSet directory updated to the new jar location.
- READMEs updated to point to the new locations and parent poms.

All unit + scalatest cases still pass on JDK 21 (3 junit + 25 scalatest). The JDK-21 integration-tests profile (and its `SparkSegmentMetadataPushIntegrationTest4`) is unchanged and continues to pull in `pinot-batch-ingestion-spark-4` from its new location via dependencyManagement.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… 21 CI

The Spark 3 `SparkSegmentMetadataPushIntegrationTest` was failing in the "Pinot Integration Test Set 2 (temurin-21)" CI job with

  Servlet class org.glassfish.jersey.servlet.ServletContainer is not a jakarta.servlet.Servlet

because the `pinot-spark-4-integration-tests` profile this PR added to `pinot-integration-tests/pom.xml` poisoned the whole module's test classpath on JDK 21: it forced `spark-core`/`spark-sql`/`spark-launcher` to Spark 4.0.0 via `<dependencyManagement>` and pulled in `jakarta.servlet-api:5.0.0`. With those on the classpath, Spark 3's Jersey 2 / javax-servlet stack could no longer register `ServletContainer` at driver startup.

Two tests with conflicting Spark runtime needs cannot share one Maven module's test classpath. Rather than drop more exotic Maven plumbing (per-test classpaths, separate failsafe executions, etc.) into this PR, remove the Spark 4 integration test for now and leave spark-4 coverage to the existing in-process unit tests:

- 3 @Test methods in `SparkSegmentGenerationJobRunnerTest` that exercise the full Spark 4 driver path with a real `SparkContext`
- 25 scalatest cases in `pinot-spark-4-connector` that exercise the DataSourceV2 read/write surface

An end-to-end cluster integration test for Spark 4 can land as a follow-up PR once we pick a home that isolates its classpath -- most likely a new `pinot-spark-4-integration-tests` module, or failsafe configured with a separate classpath realm.

Changes:
- Delete `pinot-integration-tests/src/test/java-spark4/`
- Revert the `pinot-spark-4-integration-tests` profile block in `pinot-integration-tests/pom.xml`

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two small fixes from the latest code review:

- pinot-integration-tests/pom.xml: revert the one-line whitespace hunk added during the earlier profile edit. File now matches master again.
- pinot-spark-4-connector: replace `scala.collection.JavaConverters._` (deprecated in Scala 2.13) with `scala.jdk.CollectionConverters._` in DataExtractor.scala and PinotWriteTest.scala. Matches the already-correct import in PinotDataSourceRegistrationTest.scala, and is safe because this module is Scala-2.13-only (enforced by the maven-enforcer rule on scala.compat.version).

No behavioral change; unit + scalatest suites continue to pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Per the latest code review, flag the intentional gap in the module README so it doesn't get forgotten: the Spark 4 counterpart to the Spark 3 `SparkSegmentMetadataPushIntegrationTest` is deferred to a follow-up because Spark 4's Jetty 12 / Jakarta Servlet 5 runtime cannot share `pinot-integration-tests`' classpath with the Spark 3 / Jersey 2 stack.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add a new tutorial at pinot-connectors/pinot-spark-4-connector/documentation/end-to-end-docker-tutorial.md that walks through the full Spark 4 ↔ Pinot flow validated in this PR: Docker-based Pinot quickstart → build the shaded connector jar → custom Spark 4 + JDK 21 image → pyspark read of the sample baseballStats table → create a target schema/table → pyspark write to segment tars at a savePath → push tars to the controller → verify roundtrip.

Includes a read/write options reference and a "Known gotchas" section capturing every issue I hit during validation (class-file-65 vs JDK 17, commons-lang3 collision, container-internal server address for gRPC, the DataFrameWriter.save() path requirement, the two-step write model, and the cosmetic jline HOME warning).

Also:
- Link the tutorial from the Quick Start section of the connector README so new users see it first.
- Link from the pinot-batch-ingestion-spark-4 README, since the push runners in that module are the production-grade alternative to the curl step in the tutorial.

Every command in the tutorial was run verbatim against apachepinot/pinot:latest and apache/spark:4.0.0 (+JDK 21) before this commit. No code changes; docs only.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Three correctness fixes flagged by review of
pinot-connectors/pinot-spark-{3,4}-connector:
1. FilterPushDown soundness for compound filters. `acceptFilters` used
to return true for any `Or`, `And`, or `Not` regardless of its
children. When a child was a type the connector does not recognize,
Spark dropped the compound from its residual filter list but
`compileFilter` returned None, so Pinot received no filter at all
and returned unfiltered rows. `isFilterSupported` is now recursive
and accepts compounds only when every child is itself supported.
2. LIKE pushdown escaping for StringStartsWith / StringEndsWith /
StringContains. The raw Spark value was interpolated into the Pinot
`LIKE '...'` literal without escaping. Values containing `'`, `%`,
`_`, or `\` changed the predicate semantics (and could break the
SQL entirely). `escapeLikeLiteral` now doubles backslashes, escapes
`%` and `_` with a backslash, and doubles single-quotes; each LIKE
pushdown gets an explicit `ESCAPE '\'` clause.
3. Spark 4 `SupportsOverwriteV2.overwrite(...)` silently dropped its
predicates and returned an append-only writer. Spark callers using
`df.writeTo(...).overwrite(...)` or `mode("overwrite")` would
believe Pinot replaced matching rows, but only new segments would be
written, leaving stale data queryable. `overwrite` now throws
`UnsupportedOperationException` with a message pointing at the
supported alternatives (drop-and-replace via controller REST, or the
SparkSegment*PushJobRunner with REFRESH / consistent-push enabled).
The `predicates` constructor parameter on `PinotWriteBuilder` is
gone since it was never consulted.
Fixes 1 and 2 are applied to both spark-3 and spark-4 connectors per
the consistency guideline (the code is a verbatim port). Fix 3 only
applies to spark-4 (SupportsOverwriteV2 is a Spark 4 API).
Also fix the CI RAT failure on Spark 4 Connector: add the Apache 2.0
license header to the newly added
pinot-connectors/pinot-spark-4-connector/documentation/end-to-end-docker-tutorial.md.
Test coverage:
- new scalatest cases on both connectors for (a) compound filter with
unsupported child being rejected, (b) compound filter with supported
children still being accepted, (c) LIKE value escaping; all using
`AlwaysTrue` as the unsupported leaf since `Filter` is sealed.
- new scalatest case on the spark-4 connector asserting
`PinotWriteBuilder.overwrite(...)` throws
UnsupportedOperationException with a helpful message.
Test counts now 29 / 29 on both connector modules (was 25 / 25).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Code review flagged that the spark-4 overwrite-predicate fix was not
applied symmetrically to spark-3, even though spark-3 has the exact same
silent-drop bug via the V1 SupportsOverwrite (Filter-based) API. A user
calling `df.write.mode("overwrite").format("pinot").save(...)` on
spark-3 would receive appended segments rather than overwritten data —
the same correctness regression the spark-4 fix prevents.
Mirror the fix:
- PinotWriteBuilder.overwrite(filters) now throws
UnsupportedOperationException with a message pointing at the supported
alternatives (drop table via controller REST, or pinot-batch-ingestion-
spark-3's SparkSegment*PushJobRunner with REFRESH / consistent-push).
- Drop the unused `filters` constructor parameter that was only there to
let `overwrite(...)` re-construct the builder. PinotWriteBuilder is
now constructed with just LogicalWriteInfo, matching spark-4.
- Update PinotTable.newWriteBuilder accordingly.
- Add a regression test mirroring spark-4's: assert overwrite throws
with a helpful message including the filter count.
Spark 3 connector test count is now 30 (was 29).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two MINOR items from the latest review:
1. PinotDataSourceRegistrationTest in pinot-spark-3-connector imported
`scala.jdk.CollectionConverters._`, which only exists in the Scala
2.13 stdlib. The rest of the spark-3 module uses
`scala.collection.JavaConverters._` for cross-2.12/2.13 compat (the
root pom still ships a `-Pscala-2.12` profile). Switch the import
so a `-Pscala-2.12` build of just the spark-3 connector continues
to compile. The spark-4 module is Scala-2.13-only and continues to
use the modern `scala.jdk.CollectionConverters` import.
2. Document the pre-existing backslash round-trip caveat in the
`escapeLikeLiteral` helper of both connectors. Pinot's
`RegexpPatternConverterUtils#likeToRegexpLike` does not fully
round-trip `\\` (translates to a regex matching two backslashes
rather than one), so Spark predicates like
`col.contains("a\\b")` still won't match rows containing literal
`a\b` until that conversion is fixed in pinot-common. The %, _, and
`'` cases this helper does handle are unaffected. Tracking this in
the docstring rather than fixing the runtime conversion in this PR
per scope discipline.
No behavioral change. Tests still 30 / 29 in spark-3 / spark-4.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…in BatchWrite

Two MAJOR findings from the latest review:

1. Backslash in LIKE pushdown silently produced wrong results. Pinot's RegexpPatternConverterUtils#likeToRegexpLike does not round-trip `\\` correctly (it emits a regex matching two backslashes rather than one). The previous fix escaped backslashes at the SQL layer and added a docstring caveat, but the predicate was still pushed down and the user got silently wrong results.
   Fix: reject StringStartsWith / StringEndsWith / StringContains filters whose value contains a literal backslash from isFilterSupported, so Spark applies them post-scan instead. The `escapeLikeLiteral` helper keeps its backslash-escape branch as a defensive fallback in case it is reached via a future code path that does not gate on isFilterSupported. Symmetric fix in both Spark 3 and Spark 4 connectors. Regression test added on both sides asserting these filters land in the post-scan bucket. The previous "should escape" test no longer includes backslash inputs since they are now rejected upstream.

2. PinotWrite.commit/abort wrote commit messages to stdout via `messages.foreach(println)` (pre-existing, copied verbatim from Spark 3 into Spark 4). Reviewer separately flagged that abort() does no cleanup of leftover segment tars from already-succeeded tasks, which on retry produces duplicate segments after push.
   Fix: route both commit() and abort() through SLF4J at info / warn levels so messages reach the standard log collection. Add an explicit TODO documenting the abort-cleanup gap and pointing at PinotWriteBuilder.overwrite(...) as the contract-level guard, tracking the runtime-level guard as a follow-up. Symmetric in both Spark 3 and Spark 4.

Both connectors now have 30 / 30 tests passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… dirs
Three findings from the latest code review:
1. [CRITICAL] PinotWriteBuilder.overwrite() rejection was bypassed by
SupportsTruncate.truncate(). In both Spark 3 and Spark 4,
SupportsOverwrite[V2] extends SupportsTruncate, and the V2Writes
analyzer dispatches df.write.mode("overwrite") → truncate() rather
than overwrite([AlwaysTrue]). The default truncate() returns `this`,
so build() then silently appended — exactly the silent-overwrite-
then-append bug the overwrite() rejection was meant to prevent.
Fix: override truncate() in both connectors to throw
UnsupportedOperationException with a message explaining that
df.write.mode("overwrite") / INSERT OVERWRITE is not supported and
pointing at df.write.mode("append") / drop-and-re-create /
REFRESH-push as the alternatives. Add a regression test on both
sides calling builder.truncate() directly.
2. [MAJOR] PinotDataWriter leaked the per-partition temp segment
build directory on every successful task. Files.createTempDirectory
in generateSegment() was never deleted by commit()/abort()/close().
On a long-running executor producing thousands of segments this
accumulates uncompressed segment trees under the JVM tmpdir.
Fix: track the temp dir on a module-private field, wrap commit()
in a try/finally that deletes it, and delete it from abort() and
close() as well (idempotent via null-out). Applied symmetrically
to Spark 3 and Spark 4.
3. [MAJOR] Document pinot-fastdev activation behavior. The new
Spark 4 JDK-21 profile in pinot-connectors/pom.xml and
pinot-plugins/pinot-batch-ingestion/pom.xml is activated by JDK
alone, so -Ppinot-fastdev on JDK 21 also builds the Spark 4
modules. This is intentional (both modules are small and their
shade is disabled under -Ppinot-fastdev via their own profile)
but wasn't documented. Add clarifying comments to both parent
pom activation sites, pointing at `-P!pinot-spark-4-connector`
and `-P!pinot-batch-ingestion-spark-4` as the opt-out toggles.
Minor polish also rolled into this commit:
- Spark 4 connector + batch pom comments said "JDK 17/11" but the
enforcer rejects [21,), so update to "any JDK below 21".
Test counts are now 31 / 31 in both spark-3 and spark-4 connectors
(was 30 / 30). The new truncate regression tests pin the fix across
both Spark 3.x SupportsOverwrite and Spark 4 SupportsOverwriteV2.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
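A minimal sketch of the truncate() override from item 1 of the commit above, shown against the Spark 4 `WriteBuilder` API (the Spark 3 variant is analogous); the error text is illustrative:

```scala
import org.apache.spark.sql.connector.write.WriteBuilder

// Sketch: SupportsOverwrite[V2] extends SupportsTruncate, and df.write.mode("overwrite")
// can be planned as truncate(); override it so the overwrite rejection cannot be bypassed.
override def truncate(): WriteBuilder =
  throw new UnsupportedOperationException(
    "df.write.mode(\"overwrite\") / INSERT OVERWRITE is not supported by the Pinot connector. " +
      "Use mode(\"append\"), drop and re-create the table, or push segments with a REFRESH job.")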
Two MAJOR review findings addressed:
1. PinotWrite.abort() implements best-effort leftover-tar cleanup.
When one task fails the job aborts but the connector previously
only logged the leftover tars at savePath; on retry users would
get duplicate segments after the controller push step. Now abort()
iterates the SuccessWriterCommitMessage entries, derives each
segment tar path under savePath, and deletes via the same Hadoop
FileSystem machinery PinotDataWriter uses for upload. Errors are
swallowed and logged at WARN — the driver already has a failure
to surface, and the user can recover by hand if cleanup misses.
Applied symmetrically in spark-3 and spark-4. The
SuccessWriterCommitMessage class gains a public `val segmentName`
accessor (was a private constructor parameter) so abort() can
reach the segment name without parsing toString.
2. PinotDataWriterTest gains three temp-dir lifecycle tests in both
connectors:
- commit() success path: snapshot diff of tmpdir entries shows no
new pinot-spark-writer leftovers after commit.
- abort() path: a generated segment dir captured before abort()
is gone after.
- close()-after-abort idempotency: explicit `noException should
be thrownBy writer.close()` after a prior abort.
The new tests use the package-private generateSegment() to capture
the temp dir directly, so they pin both the cleanup and the
lifecycle field reset behavior. A future refactor that drops the
finally{} block or forgets to null out segmentOutputDir will fail
these tests.
Test counts now 34 / 34 in both connectors (was 31 / 31).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Spark's `BatchWrite.abort(WriterCommitMessage[])` contract allows null
entries — Spark inserts a null for any task that failed before
producing a commit message. The current pattern match in
PinotWrite.abort fell through `case other => logger.warn("unknown
commit message type: {}", other)`, producing one misleading WARN log
line per failed task during a partial-failure abort.
Add an explicit `case null =>` branch (no-op, since there's nothing to
clean up for a writer that never committed). Symmetric in spark-3 and
spark-4. No behavioral change beyond log-noise reduction.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Reviewer flagged that the abort() runtime cleanup logic added in the previous round had no unit tests despite C6.3 (bug fixes require regression tests). Add 4 cases on each connector covering the branches of the new abort() method:

1. empty savePath — abort short-circuits without any FS call
2. malformed scheme (`not-a-real-scheme://...`) — FileSystem.get throws and abort swallows + logs without rethrowing
3. happy path — a mix of SuccessWriterCommitMessage entries, each corresponding to an existing tar file under savePath; abort deletes them all
4. null + unknown subclass — abort tolerates Spark's documented null entries (writers that failed before producing a message) and an anonymous WriterCommitMessage subclass without throwing, while still cleaning up the SuccessWriterCommitMessage entries that appear alongside them

Test counts now 38 / 38 in both connectors (was 34 / 34).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two MINOR items from the latest code review:
1. The PinotWriteBuilder Scaladoc claimed "the default truncate()
returns this, so without an explicit override the silent-append
bug is reachable via mode("overwrite") alone." Inspecting the
pinned Spark versions (3.5.8 SupportsOverwrite and 4.0.0
SupportsOverwriteV2) shows the default truncate() actually
delegates to overwrite([AlwaysTrue]) — so the override above
would already throw transitively, even without our explicit
truncate() override. The explicit override is still good
defense (a) for a tailored error message on the
df.write.mode("overwrite") path and (b) against future Spark
default-implementation changes. Update the comment to reflect
this on both spark-3 and spark-4 builders.
2. The new abort() test files declared their imports inside the
class body under a section comment. Move them to the top of
the file alongside the existing imports, matching Pinot Scala
convention (CLAUDE.md "Prefer imports over fully qualified
class names").
No behavior change. Tests still 38 / 38 in both connectors.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…mantics

Three MAJORs from the latest review (the CRITICAL one — stale rebase — was already addressed by this commit's parent rebase onto current upstream/master).

1. escapeLikeLiteral(null) used to return null, which Scala's s"" interpolation then renders as the four-char string "null". The defensive comment said this was "kept defensively so this helper produces well-formed SQL even if called via a future code path that does not gate on isFilterSupported" — but the actual emitted SQL would be `LIKE 'null%' ESCAPE '\'`, silently matching the literal string "null". Replace the silent-degradation branch with `require(value != null, ...)` so a future caller that bypasses isFilterSupported fails loudly instead.

2. PinotDataWriter had no class-level Javadoc note about thread safety, despite adding a new mutable `segmentOutputDir` field. Per CLAUDE.md the new module's classes should describe behavior and thread-safety. Add a paragraph explaining the Spark DataWriter single-task ownership invariant and the commit/abort/close ordering contract.

3. close()-after-abort idempotence relied on PinotBufferedRecordReader.close() being idempotent, which it currently is but the contract is not declared. Add a `closed` flag and consolidate abort()/close() into a private `closeOnce()` so future changes to the buffered reader don't silently break the writer's lifecycle. Existing regression test "close() after abort() is idempotent and does not throw" still passes.

Symmetric in both Spark 3 and Spark 4 connectors. Tests still pass at 38 / 38 in both modules.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
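A minimal sketch of the `closed`-flag consolidation from item 3, with the try/finally hardening from a later commit folded in. It assumes `segmentOutputDir` is the `java.nio.file.Path` from `Files.createTempDirectory`; the field and helper names other than `closeOnce` are illustrative:

```scala
import org.apache.commons.io.FileUtils

// Sketch: funnel abort()/close() through one idempotent cleanup path so the writer
// does not depend on the buffered reader's close() being idempotent.
private var closed = false

private def closeOnce(): Unit = {
  if (!closed) {
    closed = true
    try {
      bufferedRecordReader.close()
    } finally {
      // always delete the per-task temp segment dir, even if the reader close throws
      if (segmentOutputDir != null) {
        FileUtils.deleteQuietly(segmentOutputDir.toFile)
        segmentOutputDir = null
      }
    }
  }
}

override def abort(): Unit = closeOnce()
override def close(): Unit = closeOnce()
```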
Reviewer noted the LIKE pushdowns emit `ESCAPE '\'` which Pinot's RequestContextUtils.toFilterContext currently ignores, leaving RegexpPatternConverterUtils.likeToRegexpLike's hardcoded `\` as the de-facto escape character. The pushdown is correct *only because* the connector chose `\` to match Pinot's hardcoded value.

Add an inline comment near the LIKE clause documenting this implicit contract, so a future refactor that switches the escape character on either side can't silently produce wrong-row matches. Symmetric in spark-3 and spark-4. No behavior change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…spath
Reviewer flagged a MAJOR collision risk: pinot-spark-3-connector and
pinot-spark-4-connector both register the same `"pinot"` data-source
short name via META-INF/services. If both jars end up on the same
Spark application classpath (fat-jar bundle, --packages misuse, etc.),
Spark's DataSource.lookupDataSource resolves the format
non-deterministically — a Spark 4 user could silently get a writer
implementing the V1 SupportsOverwrite contract (or vice versa), with
diverging overwrite/truncate semantics that the symmetric overwrite/
truncate rejection in this PR cannot itself protect against.
Fix in pinot-spark-4-connector:
1. PinotDataSource (Spark 4) probes for the v3 class by name via
Class.forName(...) — no compile-time dep is added between the
modules. The probe runs once per JVM, cached in the companion
object. If the v3 class is found, the v4 connector throws
IllegalStateException with a tailored message instructing the
user to remove one of the two jars.
2. README documents the mutual exclusion explicitly under a new
"Mutual exclusion with pinot-spark-3-connector" section, including
the guidance for fat-jar packagers.
3. Three new regression tests:
- isSpark3ConnectorOnClasspath returns false in the default v4-only
test classpath (defends against an accidental v3 module
dependency creeping in)
- PinotDataSource() constructor does not throw under the v4-only
classpath
- The spark3ConflictMessage text contains the actionable strings
("pinot-spark-4-connector", "pinot-spark-3-connector", "Remove")
so a future docstring change can't quietly weaken the diagnostic.
The guard is intentionally one-sided (Spark 4 detects Spark 3, not the
other way around) because Spark 4 is the new arrival; existing Spark 3
users picking up the new v4 jar should hit the failure on their first
spark.read.format("pinot") call rather than after a successful write
that turned out to be wrong.
Test counts: 41 in spark-4 connector (was 38), 38 in spark-3 connector
(unchanged; the guard is v4-only).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
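A hedged sketch of the Class.forName probe described above, with refinements from the later commits (context-classloader probing, LinkageError tolerance) folded in; the probed v3 class name is an assumption:

```scala
// Sketch of the one-sided classpath-conflict guard: probe for a Spark 3 connector class
// by name, without adding a compile-time dependency between the two modules.
private def isSpark3ConnectorOnClasspath: Boolean = {
  val probeClass = "org.apache.pinot.connector.spark.v3.datasource.PinotDataSource" // assumed name
  def visible(cl: ClassLoader): Boolean =
    try {
      Class.forName(probeClass, false, cl)
      true
    } catch {
      case _: ClassNotFoundException => false
      case _: LinkageError => false // treat a partially-loadable v3 jar as "not present"
    }
  // Probe both this jar's own loader and the thread context loader that Spark's
  // DataSource.lookupDataSource uses, so plugin-classloader deployments are covered.
  visible(getClass.getClassLoader) || visible(Thread.currentThread().getContextClassLoader)
}
```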
Reviewer flagged a MAJOR race in the new classpath-collision guard: the previous two-flag @volatile pattern flipped `spark3Probed = true` *before* computing `spark3Conflict`, leaving a window where a second thread could observe `spark3Probed=true` and `spark3Conflict=false` (default) and skip the throw — the exact failure mode the guard was meant to prevent.

Replace with a Scala `lazy val`. The compiler emits a synchronized initialization barrier so every caller observes the fully-computed result; there is no half-initialized window. This also simplifies the guard to a single read.

Tests still pass: spark-4 41 / 41, spark-3 38 / 38.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Reviewer flagged that Class.forName can throw NoClassDefFoundError / LinkageError (in addition to ClassNotFoundException) when the v3 jar is on the classpath but a transitive dependency is missing. The probe previously only caught ClassNotFoundException, so such an error would escape the PinotDataSource constructor with a message that doesn't explain the conflict scenario.

Conservatively treat LinkageError as "v3 not present" — the worst case is the guard is bypassed and the user falls back to Spark's own multi-source error, which is still preferable to leaking a bytecode-resolution failure with no context. No behavior change in the dominant case (v3 absent or v3 fully loadable).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…own + abort
Five MAJORs from this round, applied symmetrically to spark-3 and spark-4 unless noted.
1. EqualNullSafe(_, null) and IN(_, array-with-null) silently leaked the literal Java
string "null" into the rendered SQL via compileValue's fallback branch. Pinot would
then parse `attr != null` syntactically rather than as a NULL test. Reject both at
isFilterSupported so Spark evaluates them post-scan with proper three-valued logic.
Compound gating (already in place) propagates the rejection through enclosing
And/Or/Not.
2. escapeAttr previously passed through any column name containing a `"` unmodified.
A name like `weird"col` then flowed into the SQL as raw, broken SQL — and a
sufficiently adversarial catalog could exploit this as a SQL injection vector.
Replace the brittle `contains("\\"")` heuristic with a regex that recognizes the
already-escaped dotted-quoted form (`"col"."col"`) and otherwise wraps the name in
`"..."` while doubling embedded quotes. Existing dotted-quoted callers (`"some"."nested"."column"`)
still pass through unchanged; the new test pins the broken-input case.
3. PinotDataSource (spark-4) classpath-conflict probe used only `getClass.getClassLoader`,
which is the v4 jar's own loader — in `--packages`/plugin-classloader Spark deployments
the v3 jar can be visible to Spark's `DataSource.lookupDataSource` (which uses
Thread.currentThread.getContextClassLoader) but invisible to v4's loader. Probe both
loaders so the conflict guard cannot be silently bypassed.
4. PinotWrite.abort log fidelity. fs.delete returns false when the path doesn't exist,
but the previous code logged "cleaned up leftover segment tar at {} (deleted=false)"
at WARN — making "no leftover" indistinguishable from "deleted a real one" in
production logs. Branch on the boolean: WARN only on actual deletions; INFO when no
leftover existed; WARN with the exception when delete throws.
5. spark-3 PinotWriteBuilder previously silently appended on
df.write.mode("overwrite"); the symmetric fix in this PR now throws
UnsupportedOperationException. That is a backward-incompatible behavior change for
any existing spark-3 user that depended on the (incorrect) silent-append. Add an
explicit "Behavior changes since the previous release" callout at the top of
pinot-spark-3-connector/README.md so the upgrade path is documented.
Six new regression tests across both connectors:
- EqualNullSafe(_, null) → post-scan
- IN(_, array-with-null) → post-scan; all-non-null IN still pushed down
- escapeAttr quotes a name containing a stray `"` correctly
Test counts: spark-4 now 44 (was 41), spark-3 now 41 (was 38).
Branch was also rebased onto current upstream/master (was 2 commits behind);
no conflicts.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
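A minimal sketch of the identifier escaping from item 2 of the commit above; the regex for the already-escaped dotted-quoted pass-through form is illustrative:

```scala
// Sketch: leave already-escaped dotted-quoted names ("col" or "a"."b") untouched,
// otherwise wrap the name in double quotes and double any embedded quote characters.
private val QuotedDottedName = """^"(?:[^"]|"")*"(?:\."(?:[^"]|"")*")*$""".r

def escapeAttr(attr: String): String = attr match {
  case QuotedDottedName() => attr
  case _ => "\"" + attr.replace("\"", "\"\"") + "\""
}
```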
…bort + classpath guard

Six MAJORs from the latest review, applied symmetrically v3+v4 unless noted.

1. EqualNullSafe / IN null-leaf rejection lacked compound-context test coverage. Added tests pinning that the rejection propagates through enclosing And/Or/Not (including nested), so a future change to the gate or the compound recursion can't silently re-introduce the null-leak through compound predicates.

2. compileFilter's EqualNullSafe branch evaluated `compileValue(value)` three times and `escapeAttr(attr)` four times. Bind once; same emitted SQL but no divergence risk if either helper ever becomes effectful (per C5.10).

3. The In(_, value) gate accepted patterns where `value` itself was null because the conditional `value != null && value.contains(null) => false` doesn't fire on a null `value`, then `_: In => true` accepted, then compileFilter's `value.isEmpty` would NPE. Tighten to `value == null || value.contains(null) => false`. Test added pinning the null-array rejection.

4. Classpath-collision guard had no escape hatch. Add system property `pinot.spark.connector.skip-conflict-guard=true` that downgrades the throw to a one-time WARN. Default behavior unchanged. Documented in the spark-4 README under the existing Mutual-exclusion section.

5. PinotWrite.abort()'s `case t: Throwable` catch swallowed fatal errors (OOM, StackOverflow). Switch to `case NonFatal(t)` per Scala best practice. Symmetric in both connectors.

6. Spark 4 README lacked the "Write semantics / overwrite-rejection" callout that the spark-3 README has. Added under a "Write semantics" section before "Features" so first-time spark-4 users see the contract before reading the Quick Start.

Test counts: spark-4 now 46 (was 44), spark-3 now 43 (was 41). +2 regression tests each side (compound-null-leaf propagation + In(_, null array) rejection).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ment

Reviewer noted (and verified against spark-catalyst_2.13-3.5.8.jar) that the comment "SupportsOverwrite extends SupportsTruncate" is incorrect for Spark 3.5.x — the actual chain is SupportsOverwrite extends SupportsOverwriteV2 extends SupportsTruncate. The transitive relationship is the same so the override rationale stands; this is a docstring accuracy fix only. No behavior change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Force-pushed 955dc8e to ad9759c
…close() throws

Reviewer noted that closeOnce() set the closed flag and then called bufferedRecordReader.close() before the temp-dir delete. If the reader's close() ever throws (today PinotBufferedRecordReader is guaranteed not to, but the contract is not declared — exactly the rationale the existing comment cites for the closed flag), the closed flag is already set and subsequent close()/abort() calls become no-ops, leaving segmentOutputDir on disk.

Wrap the reader close in try/finally so the temp-dir cleanup always runs. Symmetric in spark-3 and spark-4. No behavior change in the current path; this is defensive hardening that matches the same intent the closed flag itself encodes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…guard

Reviewer pointed out that `lazy val spark3Conflict` is computed exactly once per JVM at first construction, so a v3 jar added to the classpath later in the same session (spark-shell `:require`, custom plugin loader, post-startup mutation) won't be detected. Document the limitation in the companion-object Javadoc and point users at the existing escape hatch / JVM-restart workaround. No behavior change; pure documentation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ark-3

Pinot's master branch raised the Java baseline to JDK 21 (PR apache#18046), which means the spark-3 jars now compile to class file 65 and pull in JDK-21-only pinot-core / pinot-common transitively. Spark 3.5.x users on JDK 17 (the previously-common Spark 3.5 deployment) cannot load the new jars at all — UnsupportedClassVersionError at first class resolution.

Rather than reverting the JDK 21 baseline for these modules (which would also require pinning pinot-core / pinot-common back to JDK 17, undoing PR apache#18046's intent), declare pinot-spark-3-connector and pinot-batch-ingestion-spark-3 deprecated for one release cycle:

- New users should adopt pinot-spark-4-connector / pinot-batch-ingestion-spark-4 (Spark 4.0.x + JDK 21).
- Existing Spark 3.5.x users on JDK 21 + Spark 3.5.5+ (the first Spark 3.5 patch with official JDK 21 support) can keep using the spark-3 jars during the deprecation window.
- Existing Spark 3.5.x users on JDK 17 stay on Pinot 1.5.x for the connector jar until they upgrade.

The next minor release will remove both spark-3 modules.

Changes:
- pinot-connectors/pinot-spark-3-connector/README.md — top-of-file deprecation banner with concrete runtime requirements + migration path to pinot-spark-4-connector.
- pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/README.md — new file (the module previously had none) with the same deprecation banner and migration path for the batch ingestion runners.
- pom.xml — added a comment on the spark3.version property block recording the deprecation status so a future maintainer reading the pom sees it before bumping the version.

No code changes; pure documentation. Tests still pass at 46 / 43 in spark-4 / spark-3 connectors respectively.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…R imports

Two MINOR items from the latest review.

1. SupportsOverwrite[V2]#canOverwrite defaults to true, which advertises overwrite support that the connector then immediately rejects in overwrite()/truncate(). Override to false so Spark's V2Writes analyzer surfaces its own rejection earlier — and defends against a future analyzer change that might trust canOverwrite to gate dispatch to overwrite()/truncate(). Belt-and-braces; the existing throw paths continue to fire if Spark calls them anyway. Symmetric in spark-3 (SupportsOverwrite, Filter[]) and spark-4 (SupportsOverwriteV2, Predicate[]). Regression tests added on both sides asserting both empty-array and non-empty-array calls return false.

2. PinotDataSource.scala (spark-4) declared LOGGER with the fully-qualified `org.slf4j.Logger` and `org.slf4j.LoggerFactory` types, inconsistent with the project's import-then-bare-name convention. Hoist to a top-of-file import.

Test counts: spark-4 47 (was 46), spark-3 44 (was 43).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…l literal

EqualTo / LessThan(OrEqual) / GreaterThan(OrEqual) with a `null` literal value were not gated by `isFilterSupported` and would render as `attr <op> null` via `compileValue`'s fallback branch — Pinot would parse the literal token `null` syntactically rather than as a Spark NULL. Catalyst usually constant-folds these out upstream, but the connector now defensively rejects them so the symmetric three-valued-logic guarantee already provided for `EqualNullSafe(_, null)` and `IN(_, [..., null, ...])` holds for every comparison operator. Compound gating ensures the rejection propagates to enclosing And/Or/Not.

Also tighten the `EqualNullSafe` null-check to use `if v == null` rather than the `case _(_, null)` extractor for robustness against typed-null wrappers, mirroring the new comparison-leaf rule.

Apply the same fix symmetrically to spark-3 and spark-4 with regression tests in both `FilterPushDownTest.scala` files.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ll-leaf comment
- Spark 4 README: drop the extra `pinot-connectors/` path segment from the
links to `pinot-spark-3-connector` and `pinot-spark-common`. The README
lives under `pinot-connectors/` already, so `..` already resolves there.
- End-to-end Docker tutorial: replace the non-existent `{timestamp}`
placeholder with the actual `{startTime}` / `{endTime}` placeholders that
`PinotDataWriter.getSegmentName` resolves.
- FilterPushDown null-leaf comment (v3 + v4): drop the misleading "Pinot
parses literal `null` syntactically" framing — Pinot's Calcite-based
parser actually treats lowercase `null` as SQL NULL. Reframe the
rationale: `attr = NULL` etc. are SQL-equivalent to FALSE/UNKNOWN
regardless, so pushing them down is at best a no-op, and rejecting
defensively keeps the symmetric three-valued-logic fallback contract
with EqualNullSafe / IN intact.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…r throw
Five MAJOR correctness fixes flagged by review, applied symmetrically to
spark-3 and spark-4:
1. internalRowToGenericRow array branch: replace `record.getArray(idx).array`
with ArrayData's type-specific accessors (toIntArray, toLongArray, …).
`.array` only works on GenericArrayData (test scaffolding) and throws on
the UnsafeArrayData that Spark uses in real workloads. The previous
`.array.map(_.asInstanceOf[T])` would ClassCastException at runtime, with
a particularly bad failure for StringType (stored as UTF8String, not
String). String arrays now go through getUTF8String + toString.
2. getSegmentName: handle non-numeric variables under width spec. The
previous `value.asInstanceOf[Number]` cast crashed `{table:N}` with
`String cannot be cast to java.lang.Number`, failing the whole write
task at commit time. Numeric variables (`partitionId`, `startTime`,
`endTime`) keep `%Nd`; non-numeric ones (`table`) use `%Ns`. The Javadoc
advertises `{partitionId:05}` alongside `{table}`, so a user reasonably
might try `{table:NN}`.
3. Add scalar writer cases for TimestampType, DateType, DecimalType so the
schema translator (which already accepts them and maps to LONG / INT /
BIG_DECIMAL) and the writer agree. Previously the schema built but every
write task immediately failed with "Unsupported data type: TimestampType".
4. SparkToPinotTypeTranslator throws on unknown Spark types instead of
returning null. The previous null-return + caller-side rethrow split the
error site and risked silent propagation if a future caller forgot the
null-check. Direct throw makes the offending type appear in the stack
trace at the actual translation site.
5. Reject ArrayType(BinaryType) and ArrayType(DecimalType) at the translator
— Pinot has no multi-value BYTES or BIG_DECIMAL writer support, so
accepting these would have failed deep in the segment driver instead of
at translation time with a clear message.
Add regression tests:
- PinotDataWriterTest: `{table:20}_{partitionId:03}` width-spec format case
- SparkToPinotTypeTranslatorTest: drop ArrayType(BinaryType) /
ArrayType(DecimalType) from the accepted-mapping list, add explicit
rejection tests for those plus MapType / StructType / NullType.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
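A minimal sketch of the `ArrayData` handling from item 1 of the commit above; the helper name is illustrative and only a few element types are shown:

```scala
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.types._

// Sketch: use ArrayData's type-specific accessors instead of .array, which only works
// for GenericArrayData and fails on the UnsafeArrayData Spark uses in real workloads.
def extractArray(arrayData: ArrayData, elementType: DataType): Any = elementType match {
  case IntegerType => arrayData.toIntArray()
  case LongType    => arrayData.toLongArray()
  case FloatType   => arrayData.toFloatArray()
  case DoubleType  => arrayData.toDoubleArray()
  case StringType  =>
    // string elements are stored as UTF8String; convert each one explicitly
    (0 until arrayData.numElements()).map(i => arrayData.getUTF8String(i).toString).toArray
  case other => throw new UnsupportedOperationException(s"Unsupported array element type: $other")
}
```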
…be; add LIKE contract test
Five MAJOR correctness fixes flagged by review, applied symmetrically to
spark-3 and spark-4:
1. getSegmentName: reject `{table:N}` (and any width spec on a non-numeric
variable) at job submission with an IllegalArgumentException. The
previous fix mapped non-numeric width-spec to `%Ns`, but %Ns produces
leading whitespace inside the segment name — path-hostile for Hadoop FS,
controller URLs, and listings. The Javadoc never advertised width on
`{table}`, so reject is safer than silently producing ugly filenames.
2. SparkToPinotTypeTranslator: reject TimestampType (microseconds-since-
epoch in Spark) and DateType (days-since-epoch) instead of mapping
straight to LONG/INT. Pinot's broker convention for time columns is
millis-since-epoch (TimestampUtils#toMillisSinceEpoch); naively
accepting these types would silently produce wrong-by-1000 timestamps.
The error message instructs the user to cast upstream:
`df.withColumn("ts", col("ts").cast("long") / 1000)`. Drop the writer
cases for these types since they are no longer reachable from the
translator.
3. PinotBufferedRecordReader.next(reuse): drop the per-record `.copy()` in
the segment-build hot path. SegmentIndexCreationDriverImpl reads each
record across two passes and does not mutate observed rows; the
defensive deep-clone allocated 2N transient GenericRow objects for an
N-record segment build (GC pressure on the executor) without protecting
anything observable.
4. PinotDataSource.spark3Conflict: switch from a one-shot lazy val to a
per-call def. Class.forName is JVM-cached after first lookup so the
per-call cost is negligible — and probing per call detects v3 jars
added to the classpath after the first PinotDataSource was constructed
(e.g. via spark-shell `:require` or a custom plugin loader), which the
lazy val explicitly couldn't.
5. FilterPushDownTest: add a regression test that takes the LIKE patterns
emitted by FilterPushDown.compileFilter and runs them through Pinot's
own RegexpPatternConverterUtils.likeToRegexpLike, then verifies the
resulting regex (evaluated via Matcher#find, mirroring Pinot's runtime
behavior in RegexpLikeConstFunctions) matches the original literal
value. This pins the cross-module `\` escape contract that was
previously documented only by comment, so a future change on either
side fails the build instead of producing silently wrong WHERE-clause
results in production.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
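A minimal sketch of the segment-name width-spec handling from item 1 above: keep `%0Nd` zero-padding for numeric placeholders and reject a width spec on anything else at submission time. The helper name and shape are illustrative:

```scala
// Sketch: numeric placeholders (partitionId, startTime, endTime) keep zero-padding;
// a width spec on a non-numeric placeholder such as {table:20} is rejected up front.
private val numericVars = Set("partitionId", "startTime", "endTime")

def renderPlaceholder(name: String, width: Option[Int], value: Any): String = width match {
  case None => value.toString
  case Some(w) if numericVars.contains(name) =>
    s"%0${w}d".format(value.asInstanceOf[Number].longValue())
  case Some(_) =>
    throw new IllegalArgumentException(
      s"Width spec is only supported for numeric placeholders (${numericVars.mkString(", ")}), got: {$name}")
}
```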
…lter literals
Two MAJOR correctness fixes from review (both pre-existing, hardened now):
1. PinotDataWriter.internalRowToGenericRow: honor `record.isNullAt(idx)`
before calling typed accessors. Spark's primitive accessors silently
return zero for null cells (`getInt`→0, `getLong`→0L, `getBoolean`→
false), corrupting the segment with synthetic zeros. `getString` and
`getDecimal` would NPE. Mark the field null on GenericRow via
addNullValueField so the segment driver applies the column's
defaultNullValue per Pinot's null-handling contract.
2. FilterPushDown: tighten leaf filters to reject literal values that
are not in {String, Number, Boolean, Timestamp, Date, byte[]}. Previous
compileValue catchall rendered Seq/List/Vector/Map/etc. via
`value.toString` — for `EqualTo("a", Seq(1,2))` that produces
`attr = List(1, 2)`, malformed SQL Pinot would either reject or
misparse. Drop the test case `EqualTo("attr21", Seq(1, 2)) → "attr21"
= List(1, 2)` that pinned the broken output, and add a regression test
`Filters with collection-shaped or unrecognized literal values fall
back to post-scan` that verifies Seq / Map / Vector / IN-with-Seq-
elements all route to Spark's residual-filter path.
Apply symmetrically to spark-3 and spark-4. Also fix a MINOR raw-type
warning in pinot-batch-ingestion-spark-4 (`new ArrayList()` → `new
ArrayList<>()`).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
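A minimal sketch of the null-aware conversion from item 1 of the commit above, using Spark's `InternalRow` accessors and Pinot's `GenericRow`; the method shape and the handled type set are illustrative:

```scala
import org.apache.pinot.spi.data.readers.GenericRow
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._

// Sketch: check isNullAt before the typed accessors, because Spark's primitive accessors
// return 0/false for null cells and getString/getDecimal would NPE on them.
def internalRowToGenericRow(record: InternalRow, schema: StructType): GenericRow = {
  val row = new GenericRow()
  schema.fields.zipWithIndex.foreach { case (field, idx) =>
    if (record.isNullAt(idx)) {
      // let the segment driver apply the column's defaultNullValue
      row.addNullValueField(field.name)
    } else {
      val value: AnyRef = field.dataType match {
        case IntegerType => Int.box(record.getInt(idx))
        case LongType    => Long.box(record.getLong(idx))
        case DoubleType  => Double.box(record.getDouble(idx))
        case BooleanType => Boolean.box(record.getBoolean(idx))
        case StringType  => record.getString(idx)
        case other       => throw new UnsupportedOperationException(s"Unsupported data type: $other")
      }
      row.putValue(field.name, value)
    }
  }
  row
}
```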
…regression tests
Two MAJOR fixes from review (one a follow-on to the prior null-aware writer
commit):
1. PinotDataWriter.write: time-column tracking now (a) honors
`record.isNullAt(timeColumnIndex)` so a single null cell no longer
silently collapses startTime to 0 and corrupts `{startTime}` /
`{endTime}` in segmentNameFormat, and (b) dispatches on the column's
actual Spark type — `record.getLong(idx)` on an IntegerType slot in
UnsafeRow reads 8 bytes from a 4-byte field and returns garbage from
the next slot. Same root cause as the writer-side synthetic-zero bug,
but in the segment-name-tracking branch.
2. Add regression tests in PinotDataWriterTest that exercise the prior
null-aware writer fix (null cells in nullable String/Int/Long columns
propagate as null + addNullValueField, not synthetic zeros) and the
new time-tracking fix (IntegerType time column with one null and one
populated row produces the correct segment-name placeholders).
Plus two MINOR cleanups:
- FilterPushDown.isPushableValue: drop Array[Byte] from the whitelist
since compileValue has no `bytes → X'<hex>'` rendering branch — gating
it through would render `attr = [B@<hashcode>` SQL.
- PinotInputPartition: add a thread-safety note in the scaladoc
documenting the immutable-after-construction contract that Spark's
driver→executor serialization relies on.
Apply symmetrically to spark-3 and spark-4 per C8.6.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
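A short sketch of the null-safe, type-dispatched time-column tracking from item 1 (the defensive default branch matches the follow-up cleanup below); the names `timeColumnIndex`, `timeColumnFieldType`, `startTime`, and `endTime` are illustrative:

```scala
import org.apache.spark.sql.types.{IntegerType, LongType}

// Sketch: read the time column with the accessor matching its declared Spark type, and
// skip null cells so a single null cannot collapse the tracked startTime to 0.
if (timeColumnIndex >= 0 && !record.isNullAt(timeColumnIndex)) {
  val timeValue: Long = timeColumnFieldType match {
    case IntegerType => record.getInt(timeColumnIndex).toLong
    case LongType    => record.getLong(timeColumnIndex)
    case other       => throw new IllegalStateException(s"Unexpected time column type: $other")
  }
  startTime = math.min(startTime, timeValue)
  endTime = math.max(endTime, timeValue)
}
```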
Three MINOR cleanups from review (no CRITICAL or MAJOR):

1. PinotDataWriter.write time-column match: add a defensive `case other => IllegalStateException(...)` so a future contributor extending `isTimeColumnNumeric` to ShortType/ByteType/TimestampType without updating this match gets a clear error instead of a MatchError mid-task.

2. PinotDataWriter.getSegmentName: use Map#getOrElse with an explicit IllegalArgumentException listing the supported placeholder names, instead of a `NoSuchElementException: key not found` that surfaced only at commit() time after the segment was already built.

3. PinotDataWriterTest "Time-column tracking is null-safe and type-correct for IntegerType time columns": now converts the populated row through UnsafeProjection. Without this, the test passed even with only the isNullAt half of the previous fix, because GenericInternalRow stores boxed Integer and getLong returns the int value correctly. The UnsafeRow path is what actually exercises the type-dispatch fix — without it, getLong would read 8 bytes from a 4-byte slot and the segment-name placeholders would not match.

Apply symmetrically to spark-3 and spark-4 per C8.6.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ument type-rejection
Two MAJOR fixes from review:
1. PinotBufferedRecordReader.next(reuse): restore the deep `.copy()` that
was removed in an earlier commit as a perf optimization. The
optimization relied on an undocumented assumption — that no transformer
in SegmentIndexCreationDriverImpl's TransformPipeline mutates a
GenericRow value (Object[], byte[], List, ...) in place. The mainline
transformers replace via putValue, but the RecordReader#next(reuse)
SPI does not declare this precondition, so any future transformer
change could silently corrupt pass-2 segment build via the shared
reference into recordBuffer. The defensive copy isolates the two
passes at the cost of 2N transient GenericRow allocations during
segment build — acceptable given the silent-corruption risk if the
assumption breaks.
2. spark-3 README: document the new type-translator rejections
(TimestampType / DateType / ArrayType(BinaryType) /
ArrayType(DecimalType)) in the "Behavior changes since the previous
release" section. Previously only the overwrite/truncate failure was
documented. Existing pipelines that wrote TimestampType columns will
now fail at translation time on upgrade — surface the upstream-cast
recipe (`col("ts").cast("long") / 1000` for micros→millis) so users
can validate before bumping the connector jar.
Apply symmetrically to spark-3 and spark-4 per C8.6.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Upgrade spark4.version from 4.0.0 to 4.1.1.

Spark 4.1+ depends on io.netty 4.2.x (introduces io.netty.channel.kqueue.KQueueIoHandler and related APIs); the rest of Pinot stays on netty 4.1.132. Pin the batch-ingestion-spark-4 module's local netty.version to 4.2.7.Final so the embedded SparkContext in SparkSegmentGenerationJobRunnerTest can construct its NettyRpcEnv on macOS (kqueue) and Linux (epoll). The spark-4-connector module needs no override (it doesn't instantiate a real SparkContext in tests).

Also pin the test SparkConf to 127.0.0.1 driver bind address — Spark 4.1's stricter address binding fails with "Can't assign requested address" on dev hosts where the system hostname does not resolve back to a local interface.

Update the README compatibility matrix (4.0.x → 4.1.x), the end-to-end Docker tutorial (apache/spark:4.1.1-scala2.13-java21-python3-ubuntu is published with a JDK 21 variant, no custom build needed), and a Spark-4.0.0-specific docstring in PinotWriteBuilder.

All 56 spark-4-connector tests + 3 batch-ingestion-spark-4 tests pass against Spark 4.1.1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
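A hedged sketch of the test-side pinning described above, assuming a standard local-mode SparkConf in the embedded-SparkContext test; master URL and app name are illustrative:

```scala
import org.apache.spark.SparkConf

// Sketch: force the embedded test driver onto loopback so Spark 4.1's stricter address
// binding does not fail on hosts whose hostname is not locally resolvable.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("segment-generation-test")
  .set("spark.driver.bindAddress", "127.0.0.1")
  .set("spark.driver.host", "127.0.0.1")
```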
Summary
Adds first-class Apache Spark 4.x support to Pinot, alongside the existing Spark 3 connector and batch-ingestion plugin. The two new modules live next to their Spark 3 siblings:
- `pinot-connectors/pinot-spark-4-connector` (next to `pinot-spark-3-connector`)
- `pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-4` (next to `pinot-batch-ingestion-spark-3`)

Both modules compile against Apache Spark 4.1.1 (the latest 4.x release) and are activated only on JDK 21+ via the `pinot-spark-4-connector` / `pinot-batch-ingestion-spark-4` profile in their respective parent poms (`<jdk>[21,)</jdk>`). On JDK 11/17 the reactor still builds Spark 3 and completely skips Spark 4, so existing Spark 3 users are unaffected.

The PR also deprecates `pinot-spark-3-connector` / `pinot-batch-ingestion-spark-3`: with Pinot's master branch raising the Java baseline to JDK 21, the Spark 3 modules are now JDK 21 + Spark 3.5.5+ only and slated for removal in the next minor release. New users should adopt Spark 4.

Why a separate module?
Apache Spark 4 requires JDK 17+, is Scala 2.13 only (no more 2.12), ships Jetty 12 / Jakarta Servlet 5, and made a few small DataSourceV2 API tweaks. Running the Spark 3 connector under Spark 4 doesn't work cleanly. The modules sit side-by-side so:
- existing Spark 3 users keep `pinot-spark-3-connector` (deprecated, one release of overlap).
- new and Spark 4 deployments adopt `pinot-spark-4-connector`.

Netty
Spark 4.1.x depends on netty 4.2.x (the new `io.netty.channel.kqueue.KQueueIoHandler` / `EpollIoHandler` APIs). Pinot-wide netty stays at 4.1.x — only the `pinot-batch-ingestion-spark-4` module locally overrides `netty.version` to 4.2.7.Final so its embedded-SparkContext test can construct `NettyRpcEnv`. The spark-4-connector module does not instantiate a real SparkContext in tests, so it inherits Pinot's netty without conflict.

End-to-end tutorial
The End-to-end Docker tutorial walks through:
- Start Pinot via `docker run apachepinot/pinot:latest QuickStart`
- Run Spark from the published `apache/spark:4.1.1-scala2.13-java21-python3-ubuntu` image (Spark 4.1.1 + JDK 21, no custom build needed)
- Read the sample `baseballStats` table (97,889 rows) from Spark 4 via PySpark
- Write segment tars and push them to the controller (curl in the tutorial; in production, the `SparkSegment*PushJobRunner` runners)

Every command was executed against `apachepinot/pinot:latest` and `apache/spark:4.0.0` before being checked in (the Spark 4.1.1 / netty bump was applied after end-to-end validation; the Docker tutorial now references the published `apache/spark:4.1.1` java21 image variants).

Quick usage (read)
Invoke with `spark-submit` (Spark 4.1.1 on JDK 21). The prepended `commons-lang3-3.20.0.jar` works around Spark 4 shipping an older version that lacks `ObjectUtils.getIfNull(...)`. See the tutorial's Known gotchas for the full list.
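A hedged sketch of the read call itself (the `spark-submit` launch flags and jar list are not reproduced); the `table` / `tableType` option names follow the Spark 3 connector's documented options and are assumptions here, as is the `homeRuns` column:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumes a running Pinot quickstart; option names and the filtered column are illustrative.
val spark = SparkSession.builder().appName("pinot-read-example").getOrCreate()

val df = spark.read
  .format("pinot")                       // short name registered by the connector's DataSourceRegister
  .option("table", "baseballStats")
  .option("tableType", "OFFLINE")
  .load()

df.filter(col("homeRuns") > 10).show()   // supported predicates are pushed down to Pinot
```

Quick usage (write)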
Writing with the connector produces segment tar files at the `save(...)` path (a hedged sketch appears at the end of this section). Then push them to the controller — either REST for one-offs, or in production use the runners from `pinot-batch-ingestion-spark-4` via a `SegmentGenerationJobSpec` YAML (see the module's README):

- `SparkSegmentGenerationJobRunner` — build segments from Avro/CSV/Parquet input
- `SparkSegmentTarPushJobRunner` — upload tars to the controller
- `SparkSegmentUriPushJobRunner` — register tars already staged in deep storage (S3/GCS/HDFS)
- `SparkSegmentMetadataPushJobRunner` — metadata-only push for large segments (avoids streaming the whole tar through the controller)

`df.write.mode("overwrite").save(...)`, `df.writeTo(...).overwrite(...)`, and SQL `INSERT OVERWRITE` all fail fast with `UnsupportedOperationException`. Pinot's write path can only append new segments — it cannot atomically drop or replace rows matching a Spark predicate. To replace data, drop the table first (controller REST) or use a push runner with REFRESH / consistent-push enabled. Prior to the fixes in this PR, overwrite calls silently appended without replacement, leaving stale data queryable.
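A hedged sketch of the append-only write step; `segmentNameFormat` and its placeholders are documented by the connector, while the `table` / `tableType` option names and the output path are assumptions:

```scala
// Assumes the same SparkSession / DataFrame as the read example. The write only produces
// segment tar files under the save(...) path — pushing them to the controller (curl or a
// SparkSegment*PushJobRunner) is a separate step.
df.write
  .format("pinot")
  .mode("append")                                          // overwrite / truncate are rejected
  .option("table", "baseballStatsTarget")                  // assumed option name
  .option("tableType", "OFFLINE")                          // assumed option name
  .option("segmentNameFormat", "{table}_{partitionId:05}") // documented placeholders
  .save("/tmp/pinot-segment-tars")
```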
| Modules | Spark | JDK | Status |
|---|---|---|---|
| `pinot-spark-3-connector` / `pinot-batch-ingestion-spark-3` | 3.5.5+ | 21 | Deprecated; removal planned in the next minor release |
| `pinot-spark-4-connector` / `pinot-batch-ingestion-spark-4` | 4.1.1 | 21+ | Current |

Both connector jars are now compiled with `--release 21` (class file major version 65). Deployments stuck on JDK 17 should pin to Pinot `1.5.x` until they can upgrade.

## Correctness fixes (in addition to the Spark 4 port)
Multi-round review surfaced numerous correctness issues — many pre-existing in the Spark 3 code that was being ported. All fixes are applied symmetrically to both modules with regression tests on each side.
### Filter pushdown
1. **Compound filter soundness.** `Or`/`And`/`Not` were marked as unconditionally supported even if a child was a filter type the connector does not recognize. Spark would remove the compound from its residual filter list, while `compileFilter` silently returned `None`, so Pinot received no filter at all and returned unfiltered rows. Fix: `isFilterSupported` is now recursive; compounds are accepted only when every child is supported (see the sketch after this list).
2. **Null-leaf gating across all comparison operators.** `EqualNullSafe(_, null)` and `In(_, [..., null, ...])` originally rendered `attr <op> null` via `compileValue`'s fallback branch; comparison operators (`EqualTo`/`LessThan`/`LessThanOrEqual`/`GreaterThan`/`GreaterThanOrEqual`) had the same gap. Fix: every leaf with a null literal is rejected and falls through to Spark post-scan; an `In` with a null `values` array (NPE risk) is also rejected. Compound gating ensures the rejection propagates through enclosing `And`/`Or`/`Not`.
3. **Collection-shaped literal values.** `EqualTo("attr", Seq(1, 2))` rendered as `attr = List(1, 2)` via `value.toString`, which is malformed SQL. Fix: a new `isPushableValue` whitelist allows only `String`/`Number`/`Boolean`/`Timestamp`/`Date`; everything else (Seq / Map / Vector / Set / case classes) routes to post-scan.
4. **LIKE pushdown escaping + cross-module invariant.** `StringStartsWith`/`StringEndsWith`/`StringContains` interpolated raw user-supplied strings into `LIKE '...'` without escaping `%`, `_`, `\`, or `'`. Values containing these changed the predicate meaning (or produced invalid SQL). Fix: `escapeLikeLiteral` escapes LIKE wildcards, backslashes, and single quotes; each LIKE predicate carries an `ESCAPE '\'` clause; values containing a literal `\` are rejected upstream because Pinot's `RegexpPatternConverterUtils#likeToRegexpLike` does not round-trip `\\` correctly. A new regression test runs the emitted SQL fragment back through `RegexpPatternConverterUtils.likeToRegexpLike` and verifies the resulting regex (via `Matcher#find`, mirroring `RegexpLikeConstFunctions`) matches the original literal, pinning the cross-module `\` escape contract.
5. **Identifier escaping.** `escapeAttr` now wraps any column name in double quotes and doubles inner `"` so SQL stays well-formed even for column names containing stray quotes.
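To make the compound gating concrete, here is a minimal sketch of the recursive support check against Spark's `org.apache.spark.sql.sources` filter API. The helper names follow the description above, but this is not the connector's exact source: the real implementation also covers the LIKE-literal gating and more leaf types than shown.

```scala
import org.apache.spark.sql.sources._

object FilterPushdownSketch {
  // Compounds are pushable only when every child is pushable; otherwise the whole
  // filter stays with Spark as a residual and is re-applied after the scan.
  def isFilterSupported(filter: Filter): Boolean = filter match {
    case And(left, right) => isFilterSupported(left) && isFilterSupported(right)
    case Or(left, right)  => isFilterSupported(left) && isFilterSupported(right)
    case Not(child)       => isFilterSupported(child)
    case EqualTo(_, value)            => isPushableValue(value)
    case EqualNullSafe(_, value)      => isPushableValue(value)
    case LessThan(_, value)           => isPushableValue(value)
    case LessThanOrEqual(_, value)    => isPushableValue(value)
    case GreaterThan(_, value)        => isPushableValue(value)
    case GreaterThanOrEqual(_, value) => isPushableValue(value)
    case In(_, values)                => values != null && values.forall(isPushableValue)
    case _: IsNull | _: IsNotNull     => true
    case _ => false // unrecognized leaves (and LIKE literals that fail gating) stay in Spark
  }

  // Only plain scalar literals are pushed down (fix 3); null literals (fix 2) and
  // Seq/Map/case-class values fall back to Spark post-scan filtering.
  def isPushableValue(value: Any): Boolean = value match {
    case null => false
    case _: String | _: java.lang.Number | _: java.lang.Boolean => true
    case _: java.sql.Timestamp | _: java.sql.Date => true
    case _ => false
  }
}
```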
### Type translator

6. **Throw on unknown types instead of returning null.** Previously `translateType` returned `null` for unsupported Spark types, with the caller checking and rethrowing; a future caller forgetting the null check would silently skip the field. Fix: the translator throws `UnsupportedOperationException` directly, with the offending type in the message.
7. **Reject `TimestampType` / `DateType`.** Spark stores `TimestampType` as microseconds since epoch, while Pinot's broker convention is milliseconds since epoch (`TimestampUtils#toMillisSinceEpoch`). The prior translator silently produced values 1000× too large. Fix: the translator rejects with a hint to cast upstream (`col("ts").cast("long") / 1000`); same for `DateType`. Documented in the spark-3 README as a breaking change.
8. **Reject `ArrayType(BinaryType)` / `ArrayType(DecimalType)`.** Pinot has no MV BYTES / MV BIG_DECIMAL writer support. Fix: rejected at the translator with a clear error rather than failing deep in the segment driver.
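A condensed sketch of the throw-instead-of-null shape, assuming Pinot's `FieldSpec.DataType` enum as the target; the actual translator also handles multi-value `ArrayType` columns and carries the rejection reasons described above.

```scala
import org.apache.pinot.spi.data.FieldSpec
import org.apache.spark.sql.types._

object TypeTranslatorSketch {
  // Single-value mapping only; unsupported types throw with the offending type in
  // the message instead of returning null.
  def translateType(dataType: DataType): FieldSpec.DataType = dataType match {
    case IntegerType    => FieldSpec.DataType.INT
    case LongType       => FieldSpec.DataType.LONG
    case FloatType      => FieldSpec.DataType.FLOAT
    case DoubleType     => FieldSpec.DataType.DOUBLE
    case BooleanType    => FieldSpec.DataType.BOOLEAN
    case StringType     => FieldSpec.DataType.STRING
    case BinaryType     => FieldSpec.DataType.BYTES
    case _: DecimalType => FieldSpec.DataType.BIG_DECIMAL
    case TimestampType | DateType =>
      // Spark uses micros-since-epoch / days-since-epoch; Pinot expects millis.
      // Cast upstream, e.g. (col("ts").cast("long") / 1000).cast("long").
      throw new UnsupportedOperationException(
        s"$dataType must be cast to epoch millis (LONG) in Spark before writing to Pinot")
    case other =>
      throw new UnsupportedOperationException(s"Unsupported Spark type: $other")
  }
}
```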
### Writer (`PinotDataWriter`)

9. **`SupportsOverwrite[V2]` silently dropped the overwrite intent.** The writer advertised `SupportsOverwrite` (V1) / `SupportsOverwriteV2` (V2) but ignored filters/predicates and returned an append-only writer. `df.write.mode("overwrite")` and `df.writeTo(...).overwrite(...)` callers believed matching rows would be replaced. Fix: `overwrite(...)`, `truncate()`, and `canOverwrite(...)` all now reject. `truncate()` is load-bearing: `SupportsOverwrite[V2]` extends `SupportsTruncate`, and Spark's V2Writes rule dispatches `df.write.mode("overwrite")` → `truncate()` (which defaulted to returning `this`, a silent append) rather than `overwrite([AlwaysTrue])`. A sketch of the rejection shape follows this item.
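For reference, a minimal sketch of that rejection shape against Spark 4's `SupportsOverwriteV2` API; this is not the connector's exact source, and the append-only `Write` construction is elided.

```scala
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.write.{SupportsOverwriteV2, Write, WriteBuilder}

// Both truncate() and overwrite(...) must reject: Spark's V2Writes rule routes
// df.write.mode("overwrite") through truncate(), so rejecting only overwrite(...)
// still allows a silent append.
class RejectingOverwriteWriteBuilder extends SupportsOverwriteV2 {
  override def build(): Write = ??? // append-only Write construction elided in this sketch

  override def canOverwrite(predicates: Array[Predicate]): Boolean = false

  override def overwrite(predicates: Array[Predicate]): WriteBuilder =
    throw new UnsupportedOperationException(
      "Pinot cannot overwrite by predicate; drop the table or use a REFRESH push instead")

  override def truncate(): WriteBuilder =
    throw new UnsupportedOperationException(
      "Pinot cannot truncate; mode(\"overwrite\") is not supported, use append")
}
```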
10. **Per-partition temp-dir leak.** `Files.createTempDirectory` in `generateSegment()` was never deleted, so long-running executors accumulated uncompressed segment trees under the JVM tmpdir. Fix: track the temp dir in a module-private field; wrap `commit()` in try/finally; also delete from `abort()` and `close()` (idempotent via null-out). `closeOnce()` wraps the reader close in try/finally so cleanup runs even if `bufferedRecordReader.close()` throws.
11. **`PinotWrite.commit`/`abort` used `println`.** Fix: switched to SLF4J. Driver-side `abort(messages)` now best-effort deletes leftover segment tars at `savePath` for every `SuccessWriterCommitMessage`, tolerating null entries, unknown subclasses, and malformed/empty `savePath` URIs.
12. **`ArrayData` accessor correctness.** `record.getArray(idx).array.map(_.asInstanceOf[T])` only works on `GenericArrayData` (test scaffolding) and throws on `UnsafeArrayData` (real workloads); for `StringType` arrays it would `ClassCastException` on every row (`UTF8String` → `String`). Fix: use type-specific `ArrayData.toIntArray() / toLongArray() / …` accessors and explicit `getUTF8String → toString` conversion for `StringType` (see the sketch below).
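A sketch of the type-specific extraction described in fix 12; the helper name is illustrative and the real writer covers more element types.

```scala
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.types._

object MultiValueSketch {
  // Type-specific accessors work for both GenericArrayData and UnsafeArrayData,
  // unlike .array.map(_.asInstanceOf[T]); UTF8String is converted explicitly.
  def extractMultiValue(arrayData: ArrayData, elementType: DataType): Array[Any] =
    elementType match {
      case IntegerType => arrayData.toIntArray().map(v => v: Any)
      case LongType    => arrayData.toLongArray().map(v => v: Any)
      case FloatType   => arrayData.toFloatArray().map(v => v: Any)
      case DoubleType  => arrayData.toDoubleArray().map(v => v: Any)
      case StringType  =>
        (0 until arrayData.numElements())
          .map(i => arrayData.getUTF8String(i).toString: Any)
          .toArray
      case other =>
        throw new UnsupportedOperationException(s"Unsupported MV element type: $other")
    }
}
```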
UnsafeRow, andgetString/getDecimalNPE on null. Fix: the per-field branch ininternalRowToGenericRowchecksrecord.isNullAt(idx)first and marks the field viaaddNullValueFieldinstead of synthesizing zeros. The time-column-tracking branch inwrite()also honorsisNullAt(timeColumnIndex)(one null cell would have collapsedstartTimeto 0, corrupting{startTime}/{endTime}insegmentNameFormat) and dispatches on the column's actual Spark type —record.getLong(idx)on anIntegerTypeslot inUnsafeRowreads 8 bytes from a 4-byte field and returns garbage.segmentNameFormatvalidation. Width spec on a non-numeric variable ({table:N}) previously crashed withClassCastExceptionat commit time; an unknown placeholder ({instanceId}) crashed withNoSuchElementException. Fix: both now fail fast at job submission with a clearIllegalArgumentExceptionlisting the supported placeholder names and noting that width spec is numeric-only.Add scalar
DecimalTypewriter support. Schema translator already mapped toBIG_DECIMAL; writer now correctly reads viarecord.getDecimal(idx, precision, scale).toJavaBigDecimal.Other
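A sketch of the null-safe, type-dispatched time-column read from fix 13; names are illustrative and the time-unit handling is left to the real writer.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._

object TimeColumnSketch {
  // The accessor must match the column's Spark type: getLong on an IntegerType slot
  // of an UnsafeRow reads past the 4-byte field. Null cells are skipped instead of
  // being read as 0, which would collapse startTime and corrupt segment names.
  def readTimeColumnValue(row: InternalRow, idx: Int, dataType: DataType): Option[Long] =
    if (row.isNullAt(idx)) {
      None
    } else dataType match {
      case IntegerType => Some(row.getInt(idx).toLong)
      case LongType    => Some(row.getLong(idx))
      case other =>
        throw new UnsupportedOperationException(s"Unsupported time column type: $other")
    }
}
```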
### Other

16. **`PinotDataSource` classpath-collision guard (Spark 4 only).** Both v3 and v4 register the `"pinot"` short name; if both jars end up on a classpath, Spark resolves non-deterministically. The Spark 4 connector now probes for the v3 class on every constructor call (`Class.forName` is JVM-cached), checks both the module classloader and the thread context classloader, swallows `LinkageError`, and throws with a clear message instructing the user to remove one jar. Escape hatch: `-Dpinot.spark.connector.skip-conflict-guard=true`. A sketch of the probe appears after this list.
17. **`PinotBufferedRecordReader.next(reuse)`.** A defensive `.copy()` is preserved: an earlier round of review tried removing it for performance, but the invariant it relies on (that no `TransformPipeline` transformer mutates a value object in place) is undocumented at the SPI level, so the deep copy is restored to defend against silent pass-2 corruption.

Regression tests for every numbered fix above are added to both `pinot-spark-3-connector` and `pinot-spark-4-connector`.
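A sketch of the probe from fix 16; the v3 class name below is an assumption made for illustration, and the real guard lives in the Spark 4 `PinotDataSource` constructor.

```scala
object ConnectorConflictGuardSketch {
  // Class.forName results are cached by the JVM, so the per-construction cost is small.
  def assertNoSpark3ConnectorOnClasspath(): Unit = {
    if (java.lang.Boolean.getBoolean("pinot.spark.connector.skip-conflict-guard")) return
    // Assumed fully-qualified name of the Spark 3 connector's entry point.
    val v3Class = "org.apache.pinot.connector.spark.v3.datasource.PinotDataSource"
    val loaders = Seq(getClass.getClassLoader, Thread.currentThread().getContextClassLoader)
    val v3Present = loaders.exists { cl =>
      try { Class.forName(v3Class, false, cl); true }
      catch { case _: ClassNotFoundException | _: LinkageError => false }
    }
    if (v3Present) {
      throw new IllegalStateException(
        "Both pinot-spark-3-connector and pinot-spark-4-connector register the 'pinot' short " +
          "name; remove one jar from the classpath, or set " +
          "-Dpinot.spark.connector.skip-conflict-guard=true to bypass this check.")
    }
  }
}
```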
## Test plan

- `./mvnw -pl pinot-connectors/pinot-spark-4-connector,pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-4 -am test` on JDK 21 with Spark 4.1.1: 56 spark-4-connector scalatest cases plus 3 batch-ingestion-spark-4 testng cases, all green.
- `./mvnw -pl pinot-connectors/pinot-spark-3-connector -am test` on JDK 21: 54 scalatest cases, all green.
- `./mvnw -pl pinot-integration-tests -am test-compile`: integration tests still compile unchanged.
- `spotless:apply checkstyle:check license:check apache-rat:check` clean on all modified modules.
- End-to-end Docker run against `apachepinot/pinot:latest` + `apache/spark:4.0.0` (+ JDK 21): read the 97,889-row `baseballStats` table, write a 10-row DataFrame to a fresh OFFLINE table, push via controller REST, verify the roundtrip through both the Pinot SQL broker and the connector's own read path.
## Known coverage gaps (tracked for follow-up)

- A `SparkSegmentMetadataPushIntegrationTest` equivalent for Spark 4 was added then dropped (commit `f18309c95e`) because the Spark 4 runtime (Jakarta Servlet 5) conflicts with `pinot-integration-tests`' Spark 3 + javax stack in a shared Maven module. Restoring end-to-end integration coverage requires either a dedicated `pinot-spark-4-integration-tests` module or failsafe with a separate classpath realm. Noted in the `pinot-batch-ingestion-spark-4` README.
- `SparkSegmentMetadataPushJobRunner` / `SparkSegmentTarPushJobRunner` / `SparkSegmentUriPushJobRunner` have no unit tests in either the Spark 3 or the Spark 4 batch-ingestion module today; only `SparkSegmentGenerationJobRunnerTest` exercises the in-process driver path. Adding coverage is orthogonal to this PR and would benefit both modules.
- `PinotWrite.abort(...)` can leave tars behind on partial failure: driver-side cleanup runs only for tasks that produced a `SuccessWriterCommitMessage`, so a task that copied a tar to `savePath` and then died before returning its commit message leaves the tar behind. The contract-level guard against silent overwrite is provided by the `overwrite`/`truncate` rejection (fix 9 above); this runtime-level cleanup is best-effort.
## Notes for reviewers

- The tutorial currently requires prepending `commons-lang3:3.20+` on `spark.{driver,executor}.extraClassPath`. A follow-up PR can relocate `org.apache.commons.lang3.*` in the connector's shade config to eliminate this step.
- `requireProperty scala.compat.version=2.13` enforcer rules make `-Pscala-2.12` builds fail fast with a clear message.
- `-Ppinot-fastdev`: activation is JDK-based, so `-Ppinot-fastdev` on JDK 21 also builds the Spark 4 modules. The Spark 4 shade is disabled under `-Ppinot-fastdev` via each module's own profile. Explicit opt-out: `-P!pinot-spark-4-connector` / `-P!pinot-batch-ingestion-spark-4`.
- `pinot-batch-ingestion-spark-4/pom.xml` pins `netty.version=4.2.7.Final` so the embedded SparkContext in `SparkSegmentGenerationJobRunnerTest` can construct its `NettyRpcEnv` (Spark 4.1+ requires netty 4.2 APIs). The override is module-scoped; the rest of Pinot stays on netty 4.1.x.

🤖 Generated with Claude Code