diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java index 03bfe2643..85c5df23a 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java @@ -333,6 +333,12 @@ void maybePruneState(Object key, long sequenceNum) throws Exception { * via {@link #snapshotLastCompletedSequenceNumbers}. After pruning, the entry for that * checkpoint is removed. No-op when durable execution is disabled. * + *
Invariant: the {@code checkpointIdToSeqNums.remove} below and the {@code put} in + * {@link #snapshotLastCompletedSequenceNumbers} MUST share the same {@code actionStateStore != + * null} guard. Dropping the guard on either side breaks the symmetry and reintroduces the + * unbounded-map leak tracked by + * issue #645. + * * @param checkpointId the id of the completed checkpoint. */ void notifyCheckpointComplete(long checkpointId) { @@ -365,6 +371,12 @@ void snapshotRecoveryMarker() throws Exception { * strictly up to the sequence number that was committed by that checkpoint. No-op when durable * execution is disabled. * + *
Invariant: the {@code checkpointIdToSeqNums.put} below and the {@code remove} in + * {@link #notifyCheckpointComplete(long)} MUST share the same {@code actionStateStore != null} + * guard. Dropping the guard on either side breaks the symmetry and reintroduces the + * unbounded-map leak tracked by + * issue #645. + * * @param keyedStateBackend the keyed state backend to scan. * @param checkpointId the id of the checkpoint being snapshotted. */ diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java index 6c737d47b..f5fc2a172 100644 --- a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java @@ -43,6 +43,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; /** Contract tests for {@link DurableExecutionManager}. */ @@ -69,6 +70,30 @@ void noStoreModeMakesAllMaybeOperationsNoOp() throws Exception { dem.close(); } + @Test + @SuppressWarnings("unchecked") + void noStoreModeSnapshotAndNotifyKeepCheckpointMapEmpty() throws Exception { + DurableExecutionManager dem = new DurableExecutionManager(null); + KeyedStateBackend