diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java index 719db92d1..66086f5f2 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java @@ -508,6 +508,12 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { super.notifyCheckpointComplete(checkpointId); } + @Override + public void notifyCheckpointAborted(long checkpointId) throws Exception { + durableExecManager.notifyCheckpointAborted(checkpointId); + super.notifyCheckpointAborted(checkpointId); + } + private MailboxProcessor getMailboxProcessor() throws Exception { Field field = MailboxExecutorImpl.class.getDeclaredField("mailboxProcessor"); field.setAccessible(true); diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java index 85c5df23a..295b36c9c 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java @@ -333,11 +333,15 @@ void maybePruneState(Object key, long sequenceNum) throws Exception { * via {@link #snapshotLastCompletedSequenceNumbers}. After pruning, the entry for that * checkpoint is removed. No-op when durable execution is disabled. * - *
Invariant: the {@code checkpointIdToSeqNums.remove} below and the {@code put} in - * {@link #snapshotLastCompletedSequenceNumbers} MUST share the same {@code actionStateStore != - * null} guard. Dropping the guard on either side breaks the symmetry and reintroduces the + *
Invariant: the {@code checkpointIdToSeqNums.remove} below, the {@code put} in + * {@link #snapshotLastCompletedSequenceNumbers}, and the {@code remove} in {@link + * #notifyCheckpointAborted} MUST all share the same {@code actionStateStore != null} guard. + * Every snapshotted entry is released by exactly one of the two paths — Flink notifies either + * {@code notifyCheckpointComplete} OR {@code notifyCheckpointAborted} for each checkpoint, + * never both. Dropping the guard on any side breaks the symmetry and reintroduces the * unbounded-map leak tracked by - * issue #645. + * issue #645 (complete path) or issue #665 (abort path). * * @param checkpointId the id of the completed checkpoint. */ @@ -352,6 +356,35 @@ void notifyCheckpointComplete(long checkpointId) { } } + /** + * Releases the per-checkpoint sequence-number snapshot recorded by {@link + * #snapshotLastCompletedSequenceNumbers} when Flink aborts the checkpoint instead of completing + * it. Unlike {@link #notifyCheckpointComplete}, this method does NOT prune durable action + * state: the aborted checkpoint's writes were never committed, so the previously-pruned-up-to + * point is still the {@code lastCompletedSequenceNumber} from the last successful checkpoint, + * and any state recorded since is still load-bearing for recovery from that prior checkpoint. + * We only release the in-memory tracking entry to prevent unbounded growth of {@code + * checkpointIdToSeqNums} when checkpoints abort under sustained pressure (issue #665). + * + *
Safe when no entry exists for {@code checkpointId} (e.g., abort fires for a checkpoint + * this task never snapshotted): {@link Map#remove} returns {@code null}. No-op when durable + * execution is disabled. + * + *
Invariant: see {@link #snapshotLastCompletedSequenceNumbers} — together with {@link + * #notifyCheckpointComplete}, this method shares the same {@code actionStateStore != null} + * guard that releases entries recorded by the snapshot side. Dropping the guard on any side + * breaks the symmetry and reintroduces the unbounded-map leak tracked by issue #645 (complete path) or issue #665 (abort path). + * + * @param checkpointId the id of the aborted checkpoint. + */ + void notifyCheckpointAborted(long checkpointId) { + if (actionStateStore != null) { + checkpointIdToSeqNums.remove(checkpointId); + } + } + void snapshotRecoveryMarker() throws Exception { if (actionStateStore != null) { Object recoveryMarker = actionStateStore.getRecoveryMarker(); @@ -371,11 +404,13 @@ void snapshotRecoveryMarker() throws Exception { * strictly up to the sequence number that was committed by that checkpoint. No-op when durable * execution is disabled. * - *
Invariant: the {@code checkpointIdToSeqNums.put} below and the {@code remove} in - * {@link #notifyCheckpointComplete(long)} MUST share the same {@code actionStateStore != null} - * guard. Dropping the guard on either side breaks the symmetry and reintroduces the - * unbounded-map leak tracked by - * issue #645. + *
Invariant: the {@code checkpointIdToSeqNums.put} below, the {@code remove} in + * {@link #notifyCheckpointComplete(long)}, and the {@code remove} in {@link + * #notifyCheckpointAborted(long)} MUST all share the same {@code actionStateStore != null} + * guard. Dropping the guard on any side breaks the symmetry and reintroduces the unbounded-map + * leak tracked by issue #645 + * (complete path) or issue #665 + * (abort path). * * @param keyedStateBackend the keyed state backend to scan. * @param checkpointId the id of the checkpoint being snapshotted. diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java index f5fc2a172..4d62dc165 100644 --- a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java @@ -28,6 +28,7 @@ import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.KeyedStateFunction; import org.apache.flink.runtime.state.OperatorStateBackend; @@ -240,4 +241,132 @@ void handleRecoveryCallsRebuildState() throws Exception { dem.close(); } + + @Test + void notifyAbortedRemovesEntryWithoutPruning() throws Exception { + // Use a cleanup-enabled in-memory store so pruning would be observable if it + // (incorrectly) fired on abort. doCleanup=true makes pruneState remove the keyed entry + // from getKeyedActionStates(). + InMemoryActionStateStore store = new InMemoryActionStateStore(true); + DurableExecutionManager dem = new DurableExecutionManager(store); + + Action action = TestActions.noopAction(); + Event event = new InputEvent(1L); + String key = "key-a"; + long seq = 7L; + long checkpointId = 1L; + + // Seed durable state for the key/seq pair. + dem.maybeInitActionState(key, seq, action, event); + assertThat(store.getKeyedActionStates()).containsKey(key); + + // Snapshot the per-key sequence number for this checkpoint. Stub the backend so that + // applyToAllKeys invokes the provided KeyedStateFunction once with our seeded key and a + // ValueState that returns the seeded sequence number. + @SuppressWarnings("unchecked") + KeyedStateBackend