Skip to content

Commit 32d44d4

Browse files
committed
up
1 parent 6393f7c commit 32d44d4

File tree

1 file changed

+5
-2
lines changed

1 file changed

+5
-2
lines changed

client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ class ReducePartitionCommitHandler(
7878
// TODO: Move this to native Int -> Int Map
7979
private val shuffleToCompletedMappers = JavaUtils.newConcurrentHashMap[Int, Int]()
8080
private val shuffleIdLocks = JavaUtils.newConcurrentHashMap[Int, Object]()
81+
private val shuffleIdLocksRegisterFunc = new util.function.Function[Int, Object] {
82+
override def apply(key: Int): Object = new Object()
83+
}
8184

8285
private val stageEndTimeout = conf.clientPushStageEndTimeout
8386
private val mockShuffleLost = conf.testMockShuffleLost
@@ -317,7 +320,7 @@ class ReducePartitionCommitHandler(
317320
numPartitions: Int,
318321
crc32PerPartition: Array[Int],
319322
bytesWrittenPerPartition: Array[Long]): (Boolean, Boolean) = {
320-
val shuffleLock = shuffleIdLocks.computeIfAbsent(shuffleId, _ => new Object())
323+
val shuffleLock = shuffleIdLocks.computeIfAbsent(shuffleId, shuffleIdLocksRegisterFunc)
321324
val (mapperAttemptFinishedSuccess, allMapperFinished) = shuffleLock.synchronized {
322325
if (getMapperAttempts(shuffleId) == null) {
323326
logDebug(s"[handleMapperEnd] $shuffleId not registered, create one.")
@@ -432,7 +435,7 @@ class ReducePartitionCommitHandler(
432435
}
433436

434437
private def initMapperAttempts(shuffleId: Int, numMappers: Int, numPartitions: Int): Unit = {
435-
val shuffleLock = shuffleIdLocks.computeIfAbsent(shuffleId, _ => new Object())
438+
val shuffleLock = shuffleIdLocks.computeIfAbsent(shuffleId, shuffleIdLocksRegisterFunc)
436439
shuffleLock.synchronized {
437440
if (!shuffleMapperAttempts.containsKey(shuffleId)) {
438441
val attempts = new Array[Int](numMappers)

0 commit comments

Comments
 (0)