From b93199e2706976c0c94aff860f27973063793eed Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Tue, 12 May 2026 22:43:23 -0700 Subject: [PATCH] [runtime] Handle notifyCheckpointAborted to stop leaking checkpoint entries MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Issue #665. When Flink aborts a checkpoint, it calls notifyCheckpointAborted instead of notifyCheckpointComplete. The DurableExecutionManager only handled the complete path, so the per-checkpoint sequence-number entry recorded by snapshotLastCompletedSequenceNumbers was never released for aborted checkpoints. Under sustained abort pressure (timeouts, alignment failures, backend pressure), checkpointIdToSeqNums grew unboundedly. Changes: - Add DurableExecutionManager.notifyCheckpointAborted(long): removes the entry from checkpointIdToSeqNums, guarded by the same actionStateStore != null check as notifyCheckpointComplete. Does NOT prune durable action state — the aborted checkpoint's writes were never committed, so the prior committed checkpoint's recovery state is still load-bearing and must not be pruned. - Add ActionExecutionOperator.notifyCheckpointAborted(long): thin override that delegates to the manager and then calls super, mirroring the existing notifyCheckpointComplete override. - Extend the symmetric-guard invariant javadoc on snapshotLastCompletedSequenceNumbers and notifyCheckpointComplete to name both release paths (complete OR abort). The actionStateStore != null guard now lives on three methods; the cross-linked javadoc makes that explicit and cites issues #645 and #665. - Three new DurableExecutionManagerTest cases (using the existing getCheckpointIdToSeqNums() @VisibleForTesting accessor introduced in #659): * notifyAbortedRemovesEntryWithoutPruning — entry released, durable state untouched (verified against a real InMemoryActionStateStore so wrongful pruning would be observable). 
* completedAndAbortedInterleavedKeepsInFlightEntries — three in-flight checkpoints, one completes (state pruned), one aborts (state preserved), one remains. * noStoreModeNotifyCheckpointAbortedIsNoOp — symmetric null-store no-op coverage matching the existing notifyCheckpointComplete null-store case. --- .../operator/ActionExecutionOperator.java | 6 + .../operator/DurableExecutionManager.java | 53 +++++-- .../operator/DurableExecutionManagerTest.java | 129 ++++++++++++++++++ 3 files changed, 179 insertions(+), 9 deletions(-) diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java index 719db92d1..66086f5f2 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java @@ -508,6 +508,12 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { super.notifyCheckpointComplete(checkpointId); } + @Override + public void notifyCheckpointAborted(long checkpointId) throws Exception { + durableExecManager.notifyCheckpointAborted(checkpointId); + super.notifyCheckpointAborted(checkpointId); + } + private MailboxProcessor getMailboxProcessor() throws Exception { Field field = MailboxExecutorImpl.class.getDeclaredField("mailboxProcessor"); field.setAccessible(true); diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java index 85c5df23a..295b36c9c 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java @@ -333,11 +333,15 @@ void maybePruneState(Object key, long sequenceNum) throws Exception { * 
via {@link #snapshotLastCompletedSequenceNumbers}. After pruning, the entry for that * checkpoint is removed. No-op when durable execution is disabled. * - *

Invariant: the {@code checkpointIdToSeqNums.remove} below and the {@code put} in - * {@link #snapshotLastCompletedSequenceNumbers} MUST share the same {@code actionStateStore != - * null} guard. Dropping the guard on either side breaks the symmetry and reintroduces the + *

Invariant: the {@code checkpointIdToSeqNums.remove} below, the {@code put} in + * {@link #snapshotLastCompletedSequenceNumbers}, and the {@code remove} in {@link + * #notifyCheckpointAborted} MUST all share the same {@code actionStateStore != null} guard. + * Each snapshotted entry is released by at most one of the two paths — per checkpoint Flink + * sends (best-effort) {@code notifyCheckpointComplete} OR {@code notifyCheckpointAborted}, + * never both. Dropping the guard on any side breaks the symmetry and reintroduces the * unbounded-map leak tracked by - * issue #645. + * issue #645 (complete path) or issue #665 (abort path). * * @param checkpointId the id of the completed checkpoint. */ @@ -352,6 +356,35 @@ void notifyCheckpointComplete(long checkpointId) { } } + /** + * Releases the per-checkpoint sequence-number snapshot recorded by {@link + * #snapshotLastCompletedSequenceNumbers} when Flink aborts the checkpoint instead of completing + * it. Unlike {@link #notifyCheckpointComplete}, this method does NOT prune durable action + * state: the aborted checkpoint's writes were never committed, so the previously-pruned-up-to + * point is still the {@code lastCompletedSequenceNumber} from the last successful checkpoint, + * and any state recorded since is still load-bearing for recovery from that prior checkpoint. + * We only release the in-memory tracking entry to prevent unbounded growth of {@code + * checkpointIdToSeqNums} when checkpoints abort under sustained pressure (issue #665). + * + *

Safe when no entry exists for {@code checkpointId} (e.g., abort fires for a checkpoint + * this task never snapshotted): {@link Map#remove} returns {@code null}. No-op when durable + * execution is disabled. + * + *

Invariant: see {@link #snapshotLastCompletedSequenceNumbers} — together with {@link + * #notifyCheckpointComplete}, this method releases entries recorded by the snapshot side under + * the same {@code actionStateStore != null} guard. Dropping the guard on any side + * breaks the symmetry and reintroduces the unbounded-map leak tracked by issue #645 (complete path) or issue #665 (abort path). + * + * @param checkpointId the id of the aborted checkpoint. + */ + void notifyCheckpointAborted(long checkpointId) { + if (actionStateStore != null) { + checkpointIdToSeqNums.remove(checkpointId); + } + } + void snapshotRecoveryMarker() throws Exception { if (actionStateStore != null) { Object recoveryMarker = actionStateStore.getRecoveryMarker(); @@ -371,11 +404,13 @@ void snapshotRecoveryMarker() throws Exception { * strictly up to the sequence number that was committed by that checkpoint. No-op when durable * execution is disabled. * - *

Invariant: the {@code checkpointIdToSeqNums.put} below and the {@code remove} in - * {@link #notifyCheckpointComplete(long)} MUST share the same {@code actionStateStore != null} - * guard. Dropping the guard on either side breaks the symmetry and reintroduces the - * unbounded-map leak tracked by - * issue #645. + *

Invariant: the {@code checkpointIdToSeqNums.put} below, the {@code remove} in + * {@link #notifyCheckpointComplete(long)}, and the {@code remove} in {@link + * #notifyCheckpointAborted(long)} MUST all share the same {@code actionStateStore != null} + * guard. Dropping the guard on any side breaks the symmetry and reintroduces the unbounded-map + * leak tracked by issue #645 + * (complete path) or issue #665 + * (abort path). * * @param keyedStateBackend the keyed state backend to scan. * @param checkpointId the id of the checkpoint being snapshotted. diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java index f5fc2a172..4d62dc165 100644 --- a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java @@ -28,6 +28,7 @@ import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.KeyedStateFunction; import org.apache.flink.runtime.state.OperatorStateBackend; @@ -240,4 +241,132 @@ void handleRecoveryCallsRebuildState() throws Exception { dem.close(); } + + @Test + void notifyAbortedRemovesEntryWithoutPruning() throws Exception { + // Use a cleanup-enabled in-memory store so pruning would be observable if it + // (incorrectly) fired on abort. doCleanup=true makes pruneState remove the keyed entry + // from getKeyedActionStates(). 
+ InMemoryActionStateStore store = new InMemoryActionStateStore(true); + DurableExecutionManager dem = new DurableExecutionManager(store); + + Action action = TestActions.noopAction(); + Event event = new InputEvent(1L); + String key = "key-a"; + long seq = 7L; + long checkpointId = 1L; + + // Seed durable state for the key/seq pair. + dem.maybeInitActionState(key, seq, action, event); + assertThat(store.getKeyedActionStates()).containsKey(key); + + // Snapshot the per-key sequence number for this checkpoint. Stub the backend so that + // applyToAllKeys invokes the provided KeyedStateFunction once with our seeded key and a + // ValueState that returns the seeded sequence number. + @SuppressWarnings("unchecked") + KeyedStateBackend backend = mock(KeyedStateBackend.class); + stubApplyToAllKeysSingle(backend, key, seq); + dem.snapshotLastCompletedSequenceNumbers(backend, checkpointId); + assertThat(dem.getCheckpointIdToSeqNums()).containsKey(checkpointId); + + // Abort the checkpoint. + dem.notifyCheckpointAborted(checkpointId); + + // The in-memory tracking entry is released. + assertThat(dem.getCheckpointIdToSeqNums()).doesNotContainKey(checkpointId); + // The durable state was NOT pruned — the key is still present in the store. + assertThat(store.getKeyedActionStates()).containsKey(key); + + dem.close(); + } + + @Test + void completedAndAbortedInterleavedKeepsInFlightEntries() throws Exception { + // doCleanup=true so a completed checkpoint's pruneState calls are observable as a + // disappearance from getKeyedActionStates(). 
+ InMemoryActionStateStore store = new InMemoryActionStateStore(true); + DurableExecutionManager dem = new DurableExecutionManager(store); + + Action action = TestActions.noopAction(); + Event eventA = new InputEvent(1L); + Event eventB = new InputEvent(2L); + Event eventC = new InputEvent(3L); + String keyA = "key-a"; + String keyB = "key-b"; + String keyC = "key-c"; + + // Seed durable state for three distinct keys, one per upcoming checkpoint. + dem.maybeInitActionState(keyA, 10L, action, eventA); + dem.maybeInitActionState(keyB, 20L, action, eventB); + dem.maybeInitActionState(keyC, 30L, action, eventC); + assertThat(store.getKeyedActionStates()).containsKeys(keyA, keyB, keyC); + + // Snapshot three checkpoints. Each snapshot is stubbed to record exactly one key with its + // seeded sequence number, so the per-checkpoint map is fully deterministic. + @SuppressWarnings("unchecked") + KeyedStateBackend backendA = mock(KeyedStateBackend.class); + stubApplyToAllKeysSingle(backendA, keyA, 10L); + dem.snapshotLastCompletedSequenceNumbers(backendA, 1L); + + @SuppressWarnings("unchecked") + KeyedStateBackend backendB = mock(KeyedStateBackend.class); + stubApplyToAllKeysSingle(backendB, keyB, 20L); + dem.snapshotLastCompletedSequenceNumbers(backendB, 2L); + + @SuppressWarnings("unchecked") + KeyedStateBackend backendC = mock(KeyedStateBackend.class); + stubApplyToAllKeysSingle(backendC, keyC, 30L); + dem.snapshotLastCompletedSequenceNumbers(backendC, 3L); + + assertThat(dem.getCheckpointIdToSeqNums()).containsKeys(1L, 2L, 3L); + + // Complete checkpoint 1 (prunes keyA) and abort checkpoint 2 (releases entry without + // pruning keyB). Checkpoint 3 stays in-flight. + dem.notifyCheckpointComplete(1L); + dem.notifyCheckpointAborted(2L); + + // Only the in-flight checkpoint entry remains. + assertThat(dem.getCheckpointIdToSeqNums()).containsOnlyKeys(3L); + + // Completed checkpoint's durable state was pruned away. 
+ assertThat(store.getKeyedActionStates()).doesNotContainKey(keyA); + // Aborted checkpoint's durable state is untouched. + assertThat(store.getKeyedActionStates()).containsKey(keyB); + // In-flight checkpoint's durable state is untouched. + assertThat(store.getKeyedActionStates()).containsKey(keyC); + + dem.close(); + } + + @Test + void noStoreModeNotifyCheckpointAbortedIsNoOp() { + DurableExecutionManager dem = new DurableExecutionManager(null); + + // Should not throw and should not populate any per-checkpoint map entries. + dem.notifyCheckpointAborted(42L); + + assertThat(dem.getCheckpointIdToSeqNums()).isEmpty(); + } + + /** + * Stubs {@code backend.applyToAllKeys(...)} to invoke the supplied {@link KeyedStateFunction} + * exactly once with the given key and a {@link ValueState} mock that returns the given sequence + * number. This mirrors the per-key iteration shape used by {@link + * DurableExecutionManager#snapshotLastCompletedSequenceNumbers}. + */ + @SuppressWarnings("unchecked") + private static void stubApplyToAllKeysSingle( + KeyedStateBackend backend, Object key, long sequenceNumber) throws Exception { + ValueState valueState = mock(ValueState.class); + when(valueState.value()).thenReturn(sequenceNumber); + doAnswer( + invocation -> { + KeyedStateFunction> function = + invocation.getArgument(3); + function.process(key, valueState); + return null; + }) + .when(backend) + .applyToAllKeys(any(), any(), any(ValueStateDescriptor.class), any()); + } }