Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,12 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
super.notifyCheckpointComplete(checkpointId);
}

/**
 * Forwards the checkpoint-aborted notification to the durable execution manager and then
 * delegates to the superclass implementation.
 *
 * @param checkpointId the id of the aborted checkpoint.
 * @throws Exception propagated from the delegated notification calls.
 */
@Override
public void notifyCheckpointAborted(long checkpointId) throws Exception {
durableExecManager.notifyCheckpointAborted(checkpointId);
super.notifyCheckpointAborted(checkpointId);
}

private MailboxProcessor getMailboxProcessor() throws Exception {
Field field = MailboxExecutorImpl.class.getDeclaredField("mailboxProcessor");
field.setAccessible(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,11 +333,15 @@ void maybePruneState(Object key, long sequenceNum) throws Exception {
* via {@link #snapshotLastCompletedSequenceNumbers}. After pruning, the entry for that
* checkpoint is removed. No-op when durable execution is disabled.
*
* <p><b>Invariant:</b> the {@code checkpointIdToSeqNums.remove} below and the {@code put} in
* {@link #snapshotLastCompletedSequenceNumbers} MUST share the same {@code actionStateStore !=
* null} guard. Dropping the guard on either side breaks the symmetry and reintroduces the
* <p><b>Invariant:</b> the {@code checkpointIdToSeqNums.remove} below, the {@code put} in
* {@link #snapshotLastCompletedSequenceNumbers}, and the {@code remove} in {@link
* #notifyCheckpointAborted} MUST all share the same {@code actionStateStore != null} guard.
* Every snapshotted entry is released by at most one of the two paths — Flink notifies either
* {@code notifyCheckpointComplete} OR {@code notifyCheckpointAborted} for a given checkpoint,
* never both (and, because Flink documents these notifications as best-effort, occasionally
* neither). Dropping the guard on any side breaks the symmetry and reintroduces the
* unbounded-map leak tracked by <a href="https://github.com/apache/flink-agents/issues/645">
* issue #645</a>.
* issue #645</a> (complete path) or <a
* href="https://github.com/apache/flink-agents/issues/665">issue #665</a> (abort path).
*
* @param checkpointId the id of the completed checkpoint.
*/
Expand All @@ -352,6 +356,35 @@ void notifyCheckpointComplete(long checkpointId) {
}
}

/**
 * Drops the per-checkpoint sequence-number snapshot that {@link
 * #snapshotLastCompletedSequenceNumbers} recorded for {@code checkpointId} once Flink reports
 * that checkpoint as aborted rather than completed.
 *
 * <p>In contrast to {@link #notifyCheckpointComplete}, durable action state is deliberately NOT
 * pruned here. Nothing written by an aborted checkpoint was committed, so the pruning
 * watermark remains the {@code lastCompletedSequenceNumber} of the last successful checkpoint
 * and every state entry recorded after it is still required to recover from that earlier
 * checkpoint. The only effect is to discard the in-memory bookkeeping entry, which keeps
 * {@code checkpointIdToSeqNums} from growing without bound when checkpoints keep aborting
 * (issue #665).
 *
 * <p>Calling this for a checkpoint id that has no recorded entry (for example, an abort for a
 * checkpoint this task never snapshotted) is harmless: {@link Map#remove} simply returns
 * {@code null}. When durable execution is disabled the method does nothing.
 *
 * <p><b>Invariant:</b> the {@code remove} here, the {@code remove} in {@link
 * #notifyCheckpointComplete}, and the {@code put} in {@link
 * #snapshotLastCompletedSequenceNumbers} MUST all sit behind the same {@code actionStateStore
 * != null} guard. Removing the guard from any one of them breaks the symmetry and brings back
 * the unbounded-map leak tracked by <a
 * href="https://github.com/apache/flink-agents/issues/645">issue #645</a> (complete path) or <a
 * href="https://github.com/apache/flink-agents/issues/665">issue #665</a> (abort path).
 *
 * @param checkpointId the id of the aborted checkpoint.
 */
void notifyCheckpointAborted(long checkpointId) {
    if (actionStateStore == null) {
        // Durable execution is disabled, so there is no bookkeeping to release.
        return;
    }
    checkpointIdToSeqNums.remove(checkpointId);
}

void snapshotRecoveryMarker() throws Exception {
if (actionStateStore != null) {
Object recoveryMarker = actionStateStore.getRecoveryMarker();
Expand All @@ -371,11 +404,13 @@ void snapshotRecoveryMarker() throws Exception {
* strictly up to the sequence number that was committed by that checkpoint. No-op when durable
* execution is disabled.
*
* <p><b>Invariant:</b> the {@code checkpointIdToSeqNums.put} below and the {@code remove} in
* {@link #notifyCheckpointComplete(long)} MUST share the same {@code actionStateStore != null}
* guard. Dropping the guard on either side breaks the symmetry and reintroduces the
* unbounded-map leak tracked by <a href="https://github.com/apache/flink-agents/issues/645">
* issue #645</a>.
* <p><b>Invariant:</b> the {@code checkpointIdToSeqNums.put} below, the {@code remove} in
* {@link #notifyCheckpointComplete(long)}, and the {@code remove} in {@link
* #notifyCheckpointAborted(long)} MUST all share the same {@code actionStateStore != null}
* guard. Dropping the guard on any side breaks the symmetry and reintroduces the unbounded-map
* leak tracked by <a href="https://github.com/apache/flink-agents/issues/645">issue #645</a>
* (complete path) or <a href="https://github.com/apache/flink-agents/issues/665">issue #665</a>
* (abort path).
*
* @param keyedStateBackend the keyed state backend to scan.
* @param checkpointId the id of the checkpoint being snapshotted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.OperatorStateBackend;
Expand Down Expand Up @@ -240,4 +241,132 @@ void handleRecoveryCallsRebuildState() throws Exception {

dem.close();
}

@Test
void notifyAbortedRemovesEntryWithoutPruning() throws Exception {
    final String stateKey = "key-a";
    final long seededSeqNum = 7L;
    final long abortedCheckpointId = 1L;

    // The store is created with doCleanup=true: pruneState would then drop the keyed entry
    // from getKeyedActionStates(), so an accidental prune on the abort path would be visible.
    InMemoryActionStateStore stateStore = new InMemoryActionStateStore(true);
    DurableExecutionManager manager = new DurableExecutionManager(stateStore);

    Action noopAction = TestActions.noopAction();
    Event inputEvent = new InputEvent(1L);

    // Create durable state for the (key, sequence number) pair under test.
    manager.maybeInitActionState(stateKey, seededSeqNum, noopAction, inputEvent);
    assertThat(stateStore.getKeyedActionStates()).containsKey(stateKey);

    // Record the per-key sequence number for the checkpoint. The mocked backend runs the
    // supplied KeyedStateFunction exactly once, handing it the seeded key together with a
    // ValueState that yields the seeded sequence number.
    @SuppressWarnings("unchecked")
    KeyedStateBackend<Object> keyedBackend = mock(KeyedStateBackend.class);
    stubApplyToAllKeysSingle(keyedBackend, stateKey, seededSeqNum);
    manager.snapshotLastCompletedSequenceNumbers(keyedBackend, abortedCheckpointId);
    assertThat(manager.getCheckpointIdToSeqNums()).containsKey(abortedCheckpointId);

    // Flink aborts the checkpoint.
    manager.notifyCheckpointAborted(abortedCheckpointId);

    // Bookkeeping for the aborted checkpoint is gone...
    assertThat(manager.getCheckpointIdToSeqNums()).doesNotContainKey(abortedCheckpointId);
    // ...while the durable state itself survives, i.e. no pruning happened.
    assertThat(stateStore.getKeyedActionStates()).containsKey(stateKey);

    manager.close();
}

@Test
void completedAndAbortedInterleavedKeepsInFlightEntries() throws Exception {
    // With doCleanup=true, pruning triggered by a completed checkpoint shows up as the key
    // vanishing from getKeyedActionStates().
    InMemoryActionStateStore stateStore = new InMemoryActionStateStore(true);
    DurableExecutionManager manager = new DurableExecutionManager(stateStore);

    Action noopAction = TestActions.noopAction();

    String completedKey = "key-a";
    String abortedKey = "key-b";
    String inFlightKey = "key-c";

    Event completedEvent = new InputEvent(1L);
    Event abortedEvent = new InputEvent(2L);
    Event inFlightEvent = new InputEvent(3L);

    // One durable-state entry per key; each key belongs to one of the checkpoints below.
    manager.maybeInitActionState(completedKey, 10L, noopAction, completedEvent);
    manager.maybeInitActionState(abortedKey, 20L, noopAction, abortedEvent);
    manager.maybeInitActionState(inFlightKey, 30L, noopAction, inFlightEvent);
    assertThat(stateStore.getKeyedActionStates())
            .containsKeys(completedKey, abortedKey, inFlightKey);

    // Take three snapshots. Every mocked backend reports exactly one key with its seeded
    // sequence number, which makes the per-checkpoint map contents fully predictable.
    @SuppressWarnings("unchecked")
    KeyedStateBackend<Object> completedBackend = mock(KeyedStateBackend.class);
    stubApplyToAllKeysSingle(completedBackend, completedKey, 10L);
    manager.snapshotLastCompletedSequenceNumbers(completedBackend, 1L);

    @SuppressWarnings("unchecked")
    KeyedStateBackend<Object> abortedBackend = mock(KeyedStateBackend.class);
    stubApplyToAllKeysSingle(abortedBackend, abortedKey, 20L);
    manager.snapshotLastCompletedSequenceNumbers(abortedBackend, 2L);

    @SuppressWarnings("unchecked")
    KeyedStateBackend<Object> inFlightBackend = mock(KeyedStateBackend.class);
    stubApplyToAllKeysSingle(inFlightBackend, inFlightKey, 30L);
    manager.snapshotLastCompletedSequenceNumbers(inFlightBackend, 3L);

    assertThat(manager.getCheckpointIdToSeqNums()).containsKeys(1L, 2L, 3L);

    // Checkpoint 1 completes (its key gets pruned), checkpoint 2 aborts (entry released, key
    // left alone), and checkpoint 3 receives no notification and therefore stays in flight.
    manager.notifyCheckpointComplete(1L);
    manager.notifyCheckpointAborted(2L);

    // Bookkeeping: only the in-flight checkpoint is still tracked.
    assertThat(manager.getCheckpointIdToSeqNums()).containsOnlyKeys(3L);

    // Durable state: pruned for the completed checkpoint...
    assertThat(stateStore.getKeyedActionStates()).doesNotContainKey(completedKey);
    // ...preserved for the aborted checkpoint...
    assertThat(stateStore.getKeyedActionStates()).containsKey(abortedKey);
    // ...and preserved for the checkpoint that is still in flight.
    assertThat(stateStore.getKeyedActionStates()).containsKey(inFlightKey);

    manager.close();
}

@Test
void noStoreModeNotifyCheckpointAbortedIsNoOp() {
    DurableExecutionManager storelessManager = new DurableExecutionManager(null);

    // Without an action state store the abort notification must neither throw nor leave
    // anything behind in the per-checkpoint map.
    storelessManager.notifyCheckpointAborted(42L);

    assertThat(storelessManager.getCheckpointIdToSeqNums()).isEmpty();
}

/**
 * Configures {@code backend.applyToAllKeys(...)} so that it calls the {@link
 * KeyedStateFunction} passed as its fourth argument exactly once, supplying {@code key} and a
 * mocked {@link ValueState} whose {@code value()} yields {@code sequenceNumber}. This imitates
 * the per-key iteration that {@link DurableExecutionManager#snapshotLastCompletedSequenceNumbers}
 * relies on.
 *
 * @param backend the mocked keyed state backend to stub.
 * @param key the single key the stubbed iteration reports.
 * @param sequenceNumber the sequence number the mocked state returns for that key.
 */
@SuppressWarnings("unchecked")
private static void stubApplyToAllKeysSingle(
        KeyedStateBackend<Object> backend, Object key, long sequenceNumber) throws Exception {
    ValueState<Long> seqNumState = mock(ValueState.class);
    when(seqNumState.value()).thenReturn(sequenceNumber);

    doAnswer(
                    call -> {
                        // Argument index 3 is the per-key callback handed to applyToAllKeys.
                        KeyedStateFunction<Object, ValueState<Long>> perKeyCallback =
                                call.getArgument(3);
                        perKeyCallback.process(key, seqNumState);
                        return null;
                    })
            .when(backend)
            .applyToAllKeys(any(), any(), any(ValueStateDescriptor.class), any());
}
}
Loading