Skip to content

[runtime] Handle notifyCheckpointAborted to stop leaking checkpoint entries#667

Open
weiqingy wants to merge 1 commit into
apache:mainfrom
weiqingy:665-impl
Open

[runtime] Handle notifyCheckpointAborted to stop leaking checkpoint entries#667
weiqingy wants to merge 1 commit into
apache:mainfrom
weiqingy:665-impl

Conversation

@weiqingy
Copy link
Copy Markdown
Collaborator

@weiqingy weiqingy commented May 13, 2026

Closes #665.

What this PR does

DurableExecutionManager.checkpointIdToSeqNums leaked entries on aborted checkpoints. Flink calls notifyCheckpointAborted(...) when a checkpoint is aborted (timeout, alignment failure, backend pressure); the existing code only handled the complete path. Under sustained abort pressure the map grew unboundedly.

Production changes

  1. DurableExecutionManager.notifyCheckpointAborted(long) (new, package-private) — removes the entry from checkpointIdToSeqNums. No pruneState call — durable writes for an aborted checkpoint were never committed, so the prior committed checkpoint's recovery state is still load-bearing and must not be pruned. Guarded by actionStateStore != null to mirror the symmetric guard from [Bug][runtime] Fix memory leak in DurableExecutionManager.checkpointIdToSeqNums #645.
  2. ActionExecutionOperator.notifyCheckpointAborted(long) (new @Override) — thin delegate to the manager, then super.notifyCheckpointAborted(...). Mirrors the existing notifyCheckpointComplete override exactly.
  3. Javadoc invariant statement on snapshotLastCompletedSequenceNumbers and notifyCheckpointComplete strengthened to name BOTH release paths (complete OR abort). The actionStateStore != null guard now lives on three methods; the javadoc makes the three-way symmetry explicit and cross-links [Bug][runtime] Fix memory leak in DurableExecutionManager.checkpointIdToSeqNums #645 + [Bug][runtime] DurableExecutionManager leaks checkpointIdToSeqNums entries on aborted checkpoints #665.
  4. @VisibleForTesting getCheckpointIdToSeqNums() accessor — mirrors getActionStateStore() precedent. (Same addition as in [runtime] Lock null-store symmetry invariant in DurableExecutionManager #666; the second PR to land drops the duplicate on rebase.)

Tests (DEM-level, three new)

  • notifyAbortedRemovesEntryWithoutPruning — entry released, durable state untouched. Uses a real InMemoryActionStateStore (not a mock) so wrongful pruning would be observable.
  • completedAndAbortedInterleavedKeepsInFlightEntries — three in-flight checkpoints; one completes (state pruned), one aborts (state preserved), one remains.
  • noStoreModeNotifyCheckpointAbortedIsNoOp — symmetric null-store no-op coverage.

Sanity-mutation verified locally:

  • Emptying the new method's body → 2 of 3 new tests fail.
  • Adding a wrongful actionStateStore.pruneState(...) call → 2 of 3 new tests fail (state was incorrectly pruned).

Operator-level harness test deferred to #646 — the new operator override is a one-line delegate; the logic is in the manager.

Test plan

  • mvn test -Dtest=DurableExecutionManagerTest -pl runtime — 5/5 pass (2 existing + 3 new)
  • mvn test -Dtest=ActionExecutionOperatorTest -pl runtime — 28/28 pass
  • mvn test -pl runtime — 307/307 pass (no regressions)
  • ./tools/lint.sh -c — 0 violations
  • ./tools/check-license.sh — clean (no new tracked files)
  • Sanity mutation: empty new method body → expected tests fail
  • Sanity mutation: wrongful pruneState call on abort → expected tests fail

Documentation

  • doc-needed
  • doc-not-needed
  • doc-included

…ntries

Issue apache#665. When Flink aborts a checkpoint, it calls notifyCheckpointAborted
instead of notifyCheckpointComplete. The DurableExecutionManager only handled
the complete path, so the per-checkpoint sequence-number entry recorded by
snapshotLastCompletedSequenceNumbers was never released for aborted
checkpoints. Under sustained abort pressure (timeouts, alignment failures,
backend pressure), checkpointIdToSeqNums grew unboundedly.

Changes:

- Add DurableExecutionManager.notifyCheckpointAborted(long): removes the
  entry from checkpointIdToSeqNums, guarded by the same actionStateStore
  != null check as notifyCheckpointComplete. Does NOT prune durable action
  state — the aborted checkpoint's writes were never committed, so the
  prior committed checkpoint's recovery state is still load-bearing and
  must not be pruned.

- Add ActionExecutionOperator.notifyCheckpointAborted(long): thin override
  that delegates to the manager and then calls super, mirroring the
  existing notifyCheckpointComplete override.

- Extend the symmetric-guard invariant javadoc on
  snapshotLastCompletedSequenceNumbers and notifyCheckpointComplete to
  name both release paths (complete OR abort). The actionStateStore !=
  null guard now lives on three methods; the cross-linked javadoc makes
  that explicit and cites issues apache#645 and apache#665.

- Add @VisibleForTesting accessor for checkpointIdToSeqNums to enable the
  new regression tests. Mirrors the existing getActionStateStore() pattern.

- Three new DurableExecutionManagerTest cases:
  * notifyAbortedRemovesEntryWithoutPruning — entry released, durable
    state untouched (verified against a real InMemoryActionStateStore so
    wrongful pruning would be observable).
  * completedAndAbortedInterleavedKeepsInFlightEntries — three in-flight
    checkpoints, one completes (state pruned), one aborts (state preserved),
    one remains.
  * noStoreModeNotifyCheckpointAbortedIsNoOp — symmetric null-store no-op
    coverage matching the existing notifyCheckpointComplete null-store case.
@github-actions github-actions Bot added doc-label-missing The Bot applies this label either because none or multiple labels were provided. fixVersion/0.3.0 The feature or bug should be implemented/fixed in the 0.3.0 version. priority/major Default priority of the PR or issue. doc-not-needed Your PR changes do not impact docs and removed doc-label-missing The Bot applies this label either because none or multiple labels were provided. labels May 13, 2026
@wenjin272
Copy link
Copy Markdown
Collaborator

Hi, @weiqingy. It appears that after the merge of #659, both #666 and #667 have some conflicts.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc-not-needed Your PR changes do not impact docs fixVersion/0.3.0 The feature or bug should be implemented/fixed in the 0.3.0 version. priority/major Default priority of the PR or issue.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug][runtime] DurableExecutionManager leaks checkpointIdToSeqNums entries on aborted checkpoints

2 participants