[CELEBORN-2252] Skip writing data from other attempts. #3590
Conversation
Pull request overview
This PR adds logic to skip writing data from map task attempts that are not the completed attempt. When a map task completes and commits, the attempt ID is stored in shuffleMapperAttempts. If subsequent push data requests arrive from other attempts of the same map task, they are now rejected rather than written to disk.
Changes:
- Added attempt validation logic in the `writeData` nested function to check whether incoming data is from the completed map attempt
- Data from non-completed attempts now calls `decrementPendingWrites()` instead of `fileWriter.write(body)` and logs an info message
```scala
val (endedAttempt, toWrite, curMapId, curMapAttempt) =
  if (shuffleMapperAttempts.containsKey(shuffleKey)) {
    val (mapId, attemptId) = getMapAttempt(body)
    val endedAttemptId = shuffleMapperAttempts.get(shuffleKey).get(mapId)
```
Copilot AI · Jan 23, 2026
Potential ArrayIndexOutOfBoundsException when accessing shuffleMapperAttempts. If mapId is greater than or equal to the length of the AtomicIntegerArray, this will throw an exception. Consider adding a bounds check before accessing the array, similar to how other parts of the codebase handle this.
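A minimal sketch of the kind of guard being suggested, assuming the per-shuffle value is an AtomicIntegerArray as described above; the helper name below is hypothetical and not part of the patch:

```scala
import java.util.concurrent.atomic.AtomicIntegerArray

// Hypothetical helper: look up the committed attempt for a map task, returning -1
// ("no attempt has committed yet") when mapId falls outside the array bounds,
// instead of letting AtomicIntegerArray#get throw an index-out-of-bounds exception.
def endedAttemptOrDefault(attempts: AtomicIntegerArray, mapId: Int): Int =
  if (mapId >= 0 && mapId < attempts.length()) attempts.get(mapId) else -1
```

Returning -1 feeds the existing `endedAttempt == -1` branch, so an out-of-range push would still be written instead of failing the handler.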
```scala
    val toWriteAttempt = attemptId == endedAttemptId
    (endedAttemptId, toWriteAttempt, mapId, attemptId)
  } else (-1, true, -1, -1)
if (endedAttempt == -1 || toWrite) {
```
Copilot AI · Jan 23, 2026
The logic for determining whether to write has a subtle issue. When endedAttemptId is -1 (meaning the map task has not completed yet), the check 'attemptId == endedAttemptId' will only be true if attemptId is also -1, which is unlikely to be a valid scenario. The condition on line 1532 'endedAttempt == -1 || toWrite' correctly handles this by allowing writes when endedAttempt is -1, but the logic could be clearer. Consider restructuring to: if endedAttemptId is -1, always write; otherwise only write if attemptId matches endedAttemptId.
Suggested change:

```diff
-    val toWriteAttempt = attemptId == endedAttemptId
-    (endedAttemptId, toWriteAttempt, mapId, attemptId)
-  } else (-1, true, -1, -1)
-if (endedAttempt == -1 || toWrite) {
+    val toWriteAttempt =
+      if (endedAttemptId == -1) {
+        // Map task has not completed yet, always write
+        true
+      } else {
+        // Only write if the current attempt matches the ended attempt
+        attemptId == endedAttemptId
+      }
+    (endedAttemptId, toWriteAttempt, mapId, attemptId)
+  } else (-1, true, -1, -1)
+if (toWrite) {
```
```scala
  fileWriter.write(body)
} else {
  fileWriter.decrementPendingWrites()
  logInfo(s"Shuffle $shuffleKey is committing, map $curMapId, ignore attemptId $curMapAttempt data and endedAttempt $endedAttempt")
```
Copilot AI · Jan 23, 2026
The log message could be clearer about what is happening. It currently says "Shuffle $shuffleKey is committing", but at this point the map task has already completed and its attempt has been committed. Consider rephrasing to something like "Skipping data from map $curMapId attempt $curMapAttempt because map already completed with attempt $endedAttempt" for better clarity.
Suggested change:

```diff
-  logInfo(s"Shuffle $shuffleKey is committing, map $curMapId, ignore attemptId $curMapAttempt data and endedAttempt $endedAttempt")
+  logInfo(
+    s"Skipping data from map $curMapId attempt $curMapAttempt for shuffle $shuffleKey " +
+      s"because map already completed with attempt $endedAttempt")
```
```scala
val (endedAttempt, toWrite, curMapId, curMapAttempt) =
  if (shuffleMapperAttempts.containsKey(shuffleKey)) {
    val (mapId, attemptId) = getMapAttempt(body)
    val endedAttemptId = shuffleMapperAttempts.get(shuffleKey).get(mapId)
    val toWriteAttempt = attemptId == endedAttemptId
    (endedAttemptId, toWriteAttempt, mapId, attemptId)
  } else (-1, true, -1, -1)
if (endedAttempt == -1 || toWrite) {
  fileWriter.write(body)
} else {
  fileWriter.decrementPendingWrites()
  logInfo(s"Shuffle $shuffleKey is committing, map $curMapId, ignore attemptId $curMapAttempt data and endedAttempt $endedAttempt")
}
```
Copilot AI · Jan 23, 2026
The new logic for skipping data from non-ended map attempts lacks test coverage. Consider adding unit tests to verify that: 1) data from the ended attempt is written successfully, 2) data from other attempts is skipped and decrementPendingWrites is called, 3) edge cases like mapId out of bounds are handled gracefully, and 4) data is still written when shuffleMapperAttempts doesn't contain the shuffle key.
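As a rough illustration of the first, second, and fourth cases, here is a sketch of how the decision could be unit tested if it were factored into a pure helper; both the helper and the test suite below are hypothetical, not code from this PR:

```scala
import org.scalatest.funsuite.AnyFunSuite

// Hypothetical pure function mirroring the PR's write/skip decision, so it can be
// exercised without a running worker. endedAttemptId == -1 means no attempt has committed.
object AttemptFilter {
  def shouldWrite(endedAttemptId: Int, attemptId: Int): Boolean =
    endedAttemptId == -1 || attemptId == endedAttemptId
}

class AttemptFilterSuite extends AnyFunSuite {
  test("data from the ended attempt is written") {
    assert(AttemptFilter.shouldWrite(endedAttemptId = 2, attemptId = 2))
  }
  test("data from other attempts is skipped once an attempt has ended") {
    assert(!AttemptFilter.shouldWrite(endedAttemptId = 2, attemptId = 0))
  }
  test("data is written while no attempt has ended (shuffle key absent case)") {
    assert(AttemptFilter.shouldWrite(endedAttemptId = -1, attemptId = 3))
  }
}
```

Verifying the decrementPendingWrites call and the out-of-bounds behavior would need tests against the actual worker push-data path rather than a pure helper like this.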
What changes were proposed in this pull request?
Skip writing data from other attempts after one of the map attempts is completed.
Why are the changes needed?
Does this PR resolve a correctness bug?
Does this PR introduce any user-facing change?
How was this patch tested?