Skip to content

Commit 0d9ae49

Browse files
authored
KAFKA-17302: APIs for Follower Fetch from Last Tiered Offset (#21153)
In this PR, we are adding the methods required for supporting follower fetch from Last Tiered Offset. - Added new method `shouldFetchFromLastTieredOffset` in AbstractFetcherThread to determine if the follower should fetch from the last tiered offset for a remote-storage-enabled topic. - Added new API `fetchEarliestPendingUploadOffset` to LeaderEndPoint for retrieving the offset from where it should start replicating data from the leader. Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
1 parent 3a950f4 commit 0d9ae49

File tree

11 files changed

+201
-11
lines changed

11 files changed

+201
-11
lines changed

core/src/main/scala/kafka/server/AbstractFetcherThread.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ abstract class AbstractFetcherThread(name: String,
101101

102102
protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Optional[OffsetAndEpoch]
103103

104+
/**
 * Whether this follower should begin replicating `topicPartition` from the last tiered
 * offset (the highest offset already uploaded to remote storage) rather than from the
 * leader's log start offset. Concrete fetcher threads decide based on broker/topic
 * configuration and the supplied leader/replica log end offsets.
 */
protected def shouldFetchFromLastTieredOffset(topicPartition: TopicPartition, leaderEndOffset: Long, replicaEndOffset: Long): Boolean
105+
104106
override def shutdown(): Unit = {
105107
initiateShutdown()
106108
inLock(partitionMapLock) {

core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,34 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
135135
new OffsetAndEpoch(localLogStartOffset, epoch.orElse(0))
136136
}
137137

138+
/**
 * Returns the earliest offset on this local leader that is still pending upload to remote
 * (tiered) storage, together with the leader epoch for that offset.
 *
 * The sentinel (-1, -1) is returned when remote storage is disabled for the log, or when
 * no remote offset is known while the local log start offset has already diverged from the
 * log start offset (segments were uploaded, but this leader does not yet know about them).
 * When nothing has been uploaded and the whole log is still local, the log start offset is
 * returned; otherwise the first offset after the highest offset already in remote storage,
 * clamped to be no smaller than the log start offset.
 */
override def fetchEarliestPendingUploadOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = {
  val unknown = new OffsetAndEpoch(-1L, -1)
  val log = replicaManager.getPartitionOrException(topicPartition).localLogOrException

  if (!log.remoteLogEnabled()) {
    unknown
  } else {
    val highestUploaded = log.highestOffsetInRemoteStorage()
    val startOffsetAndEpoch = fetchEarliestOffset(topicPartition, currentLeaderEpoch)

    if (highestUploaded == -1L) {
      val localStart = fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch)
      if (localStart.offset() == startOffsetAndEpoch.offset())
        startOffsetAndEpoch // nothing uploaded yet, replicate from the log start
      else
        unknown // segments were uploaded, but this leader has not learned about them yet
    } else {
      val pendingUpload = Math.max(highestUploaded + 1, startOffsetAndEpoch.offset())
      new OffsetAndEpoch(pendingUpload, log.leaderEpochCache.epochForOffset(pendingUpload).orElse(0))
    }
  }
}
165+
138166
override def fetchEpochEndOffsets(partitions: util.Map[TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition]): util.Map[TopicPartition, EpochEndOffset] = {
139167
partitions.asScala.map { case (tp, epochData) =>
140168
try {

core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,10 @@ class RemoteLeaderEndPoint(logPrefix: String,
105105
fetchOffset(topicPartition, currentLeaderEpoch, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)
106106
}
107107

108+
/**
 * Fetches, from the remote leader, the earliest offset still pending upload to tiered
 * storage by issuing a ListOffsets request with EARLIEST_PENDING_UPLOAD_TIMESTAMP.
 */
override def fetchEarliestPendingUploadOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = {
109+
fetchOffset(topicPartition, currentLeaderEpoch, ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
110+
}
111+
108112
private def fetchOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int, timestamp: Long): OffsetAndEpoch = {
109113
val topic = new ListOffsetsTopic()
110114
.setName(topicPartition.topic)

core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ class ReplicaAlterLogDirsThread(name: String,
6868
replicaMgr.futureLocalLogOrException(topicPartition).endOffsetForEpoch(epoch)
6969
}
7070

71+
// Always false: this thread copies between log dirs on the same broker, so the
// tiered-offset fetch optimization presumably never applies here — NOTE(review): confirm.
override protected def shouldFetchFromLastTieredOffset(topicPartition: TopicPartition, leaderEndOffset: Long, replicaEndOffset: Long): Boolean = false
72+
7173
// process fetched data
7274
override def processPartitionData(
7375
topicPartition: TopicPartition,

core/src/main/scala/kafka/server/ReplicaFetcherThread.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,17 @@ class ReplicaFetcherThread(name: String,
6363
replicaMgr.localLogOrException(topicPartition).endOffsetForEpoch(epoch)
6464
}
6565

66+
/**
 * Returns true when this follower should start replicating from the last tiered offset
 * instead of the leader's log start offset. All of the following must hold:
 *  - the broker-level flag `followerFetchLastTieredOffsetEnable` is on,
 *  - remote (tiered) storage is enabled for the topic's log,
 *  - the topic is not compacted,
 *  - this replica's log is empty (replicaEndOffset == 0) while the leader's is not.
 */
override protected[server] def shouldFetchFromLastTieredOffset(topicPartition: TopicPartition, leaderEndOffset: Long, replicaEndOffset: Long): Boolean = {
  // Resolve the local log once instead of performing two separate lookups.
  val localLog = replicaMgr.localLog(topicPartition)
  val isCompactTopic = localLog.exists(_.config.compact)
  val remoteStorageEnabled = localLog.exists(_.remoteLogEnabled())

  brokerConfig.followerFetchLastTieredOffsetEnable &&
    remoteStorageEnabled &&
    !isCompactTopic &&
    replicaEndOffset == 0 &&
    leaderEndOffset != 0
}
76+
6677
override def initiateShutdown(): Boolean = {
6778
val justShutdown = super.initiateShutdown()
6879
if (justShutdown) {

core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package kafka.server
2020
import kafka.server.QuotaFactory.QuotaManagers
2121
import kafka.utils.{CoreUtils, Logging, TestUtils}
2222
import org.apache.kafka.common.compress.Compression
23+
import org.apache.kafka.common.config.TopicConfig
2324
import org.apache.kafka.common.{TopicIdPartition, Uuid}
2425
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
2526
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
@@ -34,13 +35,13 @@ import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, OffsetAndE
3435
import org.apache.kafka.server.network.BrokerEndPoint
3536
import org.apache.kafka.server.LeaderEndPoint
3637
import org.apache.kafka.server.util.{MockScheduler, MockTime}
37-
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogDirFailureChannel}
38+
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, LogDirFailureChannel}
3839
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
3940
import org.junit.jupiter.api.Assertions._
4041
import org.mockito.Mockito.mock
4142

4243
import java.io.File
43-
import java.util.{Map => JMap}
44+
import java.util.{Properties, Map => JMap}
4445
import scala.collection.Map
4546
import scala.jdk.CollectionConverters._
4647

@@ -62,7 +63,16 @@ class LocalLeaderEndPointTest extends Logging {
6263
def setUp(): Unit = {
6364
val props = TestUtils.createBrokerConfig(sourceBroker.id, port = sourceBroker.port)
6465
val config = KafkaConfig.fromProps(props)
65-
val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)))
66+
67+
val logProps = new Properties()
68+
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
69+
// Keep cleanup.policy=delete (default), not compact, so remote storage is allowed
70+
val defaultLogConfig = LogConfig.fromProps(Map.empty[String, Object].asJava, logProps)
71+
val mockLogMgr = TestUtils.createLogManager(
72+
config.logDirs.asScala.map(new File(_)),
73+
defaultConfig = defaultLogConfig,
74+
remoteStorageSystemEnable = true
75+
)
6676
val alterPartitionManager = mock(classOf[AlterPartitionManager])
6777
val metrics = new Metrics
6878
quotaManager = QuotaFactory.instantiate(config, metrics, time, "", "")
@@ -233,6 +243,46 @@ class LocalLeaderEndPointTest extends Logging {
233243
assertEquals(expected, result)
234244
}
235245

246+
// Verifies that when remote storage is enabled but nothing has been uploaded yet,
// the earliest pending-upload offset equals the earliest (log start) offset.
@Test
247+
def testEarliestPendingUploadOffsetWhenNoSegmentsUploaded(): Unit = {
248+
// Append some records; no remote upload happened yet
249+
appendRecords(replicaManager, topicIdPartition, records)
250+
.onFire(response => assertEquals(Errors.NONE, response.error))
251+
252+
val expected = endPoint.fetchEarliestOffset(topicPartition, 0)
253+
val result = endPoint.fetchEarliestPendingUploadOffset(topicPartition, 0)
254+
assertEquals(expected, result)
255+
}
256+
257+
// Verifies that when no remote offset is known but the local log start offset has
// diverged from the log start offset, the sentinel (-1, -1) is returned.
@Test
258+
def testEarliestPendingUploadOffsetWhenLocalStartGreaterThanStart(): Unit = {
259+
appendRecords(replicaManager, topicIdPartition, records)
260+
.onFire(response => assertEquals(Errors.NONE, response.error))
261+
262+
// Bump epoch and advance local log start offset without changing log start offset
263+
bumpLeaderEpoch()
264+
replicaManager.logManager.getLog(topicPartition).foreach(_.updateLocalLogStartOffset(3))
265+
266+
val result = endPoint.fetchEarliestPendingUploadOffset(topicPartition, 1)
267+
assertEquals(new OffsetAndEpoch(-1L, -1), result)
268+
}
269+
270+
// Verifies that when the highest remote offset is known, the earliest pending-upload
// offset is max(highestRemote + 1, logStartOffset) with the matching leader epoch.
@Test
271+
def testEarliestPendingUploadOffsetWhenHighestRemoteOffsetKnown(): Unit = {
272+
appendRecords(replicaManager, topicIdPartition, records)
273+
.onFire(response => assertEquals(Errors.NONE, response.error))
274+
275+
// Highest remote is 1 => earliest pending should be max(1+1, logStart)
276+
val log = replicaManager.getPartitionOrException(topicPartition).localLogOrException
277+
log.updateHighestOffsetInRemoteStorage(1)
278+
279+
val expectedOffset = Math.max(2L, log.logStartOffset())
280+
val epoch = log.leaderEpochCache().epochForOffset(expectedOffset).orElse(0)
281+
282+
val result = endPoint.fetchEarliestPendingUploadOffset(topicPartition, 0)
283+
assertEquals(new OffsetAndEpoch(expectedOffset, epoch), result)
284+
}
285+
236286
private class CallbackResult[T] {
237287
private var value: Option[T] = None
238288
private var fun: Option[T => Unit] = None

core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ package kafka.server
1818

1919
import com.yammer.metrics.core.Gauge
2020
import kafka.utils.TestUtils
21-
import org.apache.kafka.common.message.{FetchResponseData, OffsetForLeaderEpochRequestData}
2221
import org.apache.kafka.common.message.FetchResponseData.PartitionData
22+
import org.apache.kafka.common.message.{FetchResponseData, OffsetForLeaderEpochRequestData}
2323
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
2424
import org.apache.kafka.common.metrics.Metrics
2525
import org.apache.kafka.common.requests.FetchRequest
@@ -318,6 +318,8 @@ class AbstractFetcherManagerTest {
318318
override val isTruncationOnFetchSupported: Boolean = false
319319

320320
override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = new OffsetAndEpoch(1L, 0)
321+
322+
// Test stub: reports that no offset is pending upload (sentinel (-1, -1)).
override def fetchEarliestPendingUploadOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = new OffsetAndEpoch(-1L, -1)
321323
}
322324

323325
private class MockResizeFetcherTierStateMachine extends TierStateMachine(null, null, false) {
@@ -354,6 +356,8 @@ class AbstractFetcherManagerTest {
354356
override protected def logEndOffset(topicPartition: TopicPartition): Long = 1
355357

356358
override protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Optional[OffsetAndEpoch] = Optional.of(new OffsetAndEpoch(1, 0))
359+
360+
// Test stub: the tiered-offset fetch path is never taken in this mock fetcher.
override protected def shouldFetchFromLastTieredOffset(topicPartition: TopicPartition, leaderEndOffset: Long, replicaEndOffset: Long): Boolean = false
357361
}
358362

359363
@Test

core/src/test/scala/unit/kafka/server/MockFetcherThread.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ class MockFetcherThread(val mockLeader: MockLeaderEndPoint,
3838
val replicaId: Int = 0,
3939
val leaderId: Int = 1,
4040
fetchBackOffMs: Int = 0,
41-
failedPartitions: FailedPartitions = new FailedPartitions)
41+
failedPartitions: FailedPartitions = new FailedPartitions,
42+
fetchFromLastTieredOffset: Boolean = false)
4243
extends AbstractFetcherThread("mock-fetcher",
4344
clientId = "mock-fetcher",
4445
leader = mockLeader,
@@ -183,4 +184,6 @@ class MockFetcherThread(val mockLeader: MockLeaderEndPoint,
183184
assertEquals(expectedEpoch, fetchState(partition).map(_.lastFetchedEpoch.get()))
184185
}
185186
}
187+
188+
// Test hook: returns the constructor-supplied `fetchFromLastTieredOffset` flag so tests
// can force either branch of the tiered-offset fetch logic.
override def shouldFetchFromLastTieredOffset(topicPartition: TopicPartition, leaderEndOffset: Long, replicaEndOffset: Long): Boolean = fetchFromLastTieredOffset
186189
}

core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,15 @@ class MockLeaderEndPoint(sourceBroker: BrokerEndPoint = new BrokerEndPoint(1, "l
135135
new OffsetAndEpoch(leaderState.localLogStartOffset, leaderState.leaderEpoch)
136136
}
137137

138+
/**
 * Mock implementation: serves the earliest pending-upload offset from the in-memory
 * PartitionState. Returns the sentinel (-1, -1) when the state holds no such offset;
 * otherwise the stored offset, clamped to be no smaller than the log start offset.
 */
override def fetchEarliestPendingUploadOffset(topicPartition: TopicPartition, leaderEpoch: Int): OffsetAndEpoch = {
  val state = leaderPartitionState(topicPartition)
  checkLeaderEpochAndThrow(leaderEpoch, state)
  if (state.earliestPendingUploadOffset == -1L)
    new OffsetAndEpoch(-1L, -1)
  else
    new OffsetAndEpoch(math.max(state.earliestPendingUploadOffset, state.logStartOffset), state.leaderEpoch)
}
146+
138147
override def fetchEpochEndOffsets(partitions: java.util.Map[TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition]): java.util.Map[TopicPartition, EpochEndOffset] = {
139148
val endOffsets = new java.util.HashMap[TopicPartition, EpochEndOffset]()
140149
partitions.forEach { (partition, epochData) =>
@@ -262,13 +271,14 @@ class PartitionState(var log: mutable.Buffer[RecordBatch],
262271
var logEndOffset: Long,
263272
var highWatermark: Long,
264273
var rlmEnabled: Boolean = false,
265-
var localLogStartOffset: Long)
274+
var localLogStartOffset: Long,
275+
var earliestPendingUploadOffset: Long)
266276

267277
object PartitionState {
268-
def apply(log: Seq[RecordBatch], leaderEpoch: Int, highWatermark: Long, rlmEnabled: Boolean = false): PartitionState = {
278+
/**
 * Builds a PartitionState whose start and end offsets are derived from the given record
 * batches; the local log start offset is initialized to the log start offset.
 */
def apply(log: Seq[RecordBatch], leaderEpoch: Int, highWatermark: Long, rlmEnabled: Boolean = false, earliestPendingUploadOffset: Long = -1L): PartitionState = {
  val firstOffset = log.headOption.map(_.baseOffset).getOrElse(0L)
  val endOffset = log.lastOption.map(_.nextOffset).getOrElse(0L)
  new PartitionState(log.toBuffer, leaderEpoch, firstOffset, endOffset, highWatermark, rlmEnabled, firstOffset, earliestPendingUploadOffset)
}
273283

274284
def apply(leaderEpoch: Int): PartitionState = {

core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala

Lines changed: 68 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import kafka.server.epoch.util.MockBlockingSender
2424
import kafka.utils.TestUtils
2525
import org.apache.kafka.clients.FetchSessionHandler
2626
import org.apache.kafka.common.compress.Compression
27+
import org.apache.kafka.common.config.TopicConfig
2728
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
2829
import org.apache.kafka.common.message.FetchResponseData
2930
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
@@ -38,20 +39,21 @@ import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, OffsetAndE
3839
import org.apache.kafka.server.network.BrokerEndPoint
3940
import org.apache.kafka.server.ReplicaState
4041
import org.apache.kafka.server.PartitionFetchState
41-
import org.apache.kafka.storage.internals.log.{LogAppendInfo, UnifiedLog}
42+
import org.apache.kafka.server.config.ReplicationConfigs
43+
import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogConfig, UnifiedLog}
4244
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
4345
import org.junit.jupiter.api.Assertions._
4446
import org.junit.jupiter.api.{AfterEach, Test}
4547
import org.junit.jupiter.params.ParameterizedTest
46-
import org.junit.jupiter.params.provider.ValueSource
48+
import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
4749
import org.mockito.ArgumentCaptor
4850
import org.mockito.ArgumentMatchers.{any, anyBoolean, anyLong}
4951
import org.mockito.Mockito.{mock, times, verify, when}
5052

5153
import java.lang.{Long => JLong}
5254
import java.nio.charset.StandardCharsets
5355
import java.util
54-
import java.util.{Collections, Optional}
56+
import java.util.{Collections, Optional, Properties}
5557
import scala.collection.mutable
5658
import scala.jdk.CollectionConverters._
5759
import scala.jdk.OptionConverters._
@@ -818,4 +820,67 @@ class ReplicaFetcherThreadTest {
818820
when(replicaManager.localLogOrException(t2p1)).thenReturn(log)
819821
when(replicaManager.getPartitionOrException(t2p1)).thenReturn(partition)
820822
}
823+
824+
// Exhaustively checks shouldFetchFromLastTieredOffset across the feature flag,
// remote-storage enablement, cleanup policy, and leader/replica end offsets.
// CSV columns: enableLastTieredOffsetFetch, remoteStorageEnabled, cleanUpPolicy,
// leaderEndOffset, replicaEndOffset, expected. Only the single combination
// (flag on, remote storage on, delete policy, empty replica, non-empty leader)
// is expected to yield true.
@ParameterizedTest
825+
@CsvSource(Array(
826+
"false, false, compact, 0, 0, false",
827+
"false, false, compact, 5, 0, false",
828+
"false, false, compact, 5, 1, false",
829+
"false, false, delete, 0, 0, false",
830+
"false, false, delete, 5, 0, false",
831+
"false, false, delete, 5, 1, false",
832+
"false, true, compact, 0, 0, false",
833+
"false, true, compact, 5, 0, false",
834+
"false, true, compact, 5, 1, false",
835+
"false, true, delete, 0, 0, false",
836+
"false, true, delete, 5, 0, false",
837+
"false, true, delete, 5, 1, false",
838+
"true, false, compact, 0, 0, false",
839+
"true, false, compact, 5, 0, false",
840+
"true, false, compact, 5, 1, false",
841+
"true, false, delete, 0, 0, false",
842+
"true, false, delete, 5, 0, false",
843+
"true, false, delete, 5, 1, false",
844+
"true, true, compact, 0, 0, false",
845+
"true, true, compact, 5, 0, false",
846+
"true, true, compact, 5, 1, false",
847+
"true, true, delete, 0, 0, false",
848+
"true, true, delete, 5, 0, true",
849+
"true, true, delete, 5, 1, false"))
850+
def testShouldFetchFromLastTieredOffset(enableLastTieredOffsetFetch: Boolean,
851+
remoteStorageEnabled: Boolean,
852+
cleanUpPolicy: String,
853+
leaderEndOffset: Long,
854+
replicaEndOffset: Long,
855+
expected: Boolean): Unit = {
856+
val tp = new TopicPartition("t", 0)
857+
858+
val props = TestUtils.createBrokerConfig(1)
859+
props.put(ReplicationConfigs.FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_CONFIG, String.valueOf(enableLastTieredOffsetFetch))
860+
val config = KafkaConfig.fromProps(props)
861+
862+
val mockBlockingSend: BlockingSend = mock(classOf[BlockingSend])
863+
when(mockBlockingSend.brokerEndPoint()).thenReturn(brokerEndPoint)
864+
865+
val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
866+
when(replicaManager.brokerTopicStats).thenReturn(new BrokerTopicStats)
867+
868+
val lcOverrides = new Properties()
869+
lcOverrides.put(TopicConfig.CLEANUP_POLICY_CONFIG, cleanUpPolicy)
870+
val logConfig = LogConfig.fromProps(config.extractLogConfigMap, lcOverrides)
871+
872+
val log: UnifiedLog = mock(classOf[UnifiedLog])
873+
when(log.config).thenReturn(logConfig)
874+
when(log.remoteLogEnabled()).thenReturn(remoteStorageEnabled)
875+
when(replicaManager.localLog(tp)).thenReturn(Some(log))
876+
877+
val logContext = new LogContext(s"[ReplicaFetcher replicaId=${config.brokerId}, leaderId=${mockBlockingSend.brokerEndPoint().id}, fetcherId=0] ")
878+
val fetchSessionHandler = new FetchSessionHandler(logContext, mockBlockingSend.brokerEndPoint().id)
879+
val leader = new RemoteLeaderEndPoint(logContext.logPrefix, mockBlockingSend, fetchSessionHandler,
880+
config, replicaManager, UNBOUNDED_QUOTA, () => MetadataVersion.MINIMUM_VERSION, () => 1)
881+
882+
val thread = new ReplicaFetcherThread("test-fetcher", leader, config, failedPartitions, replicaManager, UNBOUNDED_QUOTA, logContext.logPrefix)
883+
884+
assertEquals(expected, thread.shouldFetchFromLastTieredOffset(tp, leaderEndOffset, replicaEndOffset))
885+
}
821886
}

0 commit comments

Comments
 (0)