Description
We consume a Kafka topic using a Decaton processor with retrying enabled.
However, ProcessorsBuilder.consuming(String topic, TaskExtractor<T> taskExtractor) does not work correctly with DefaultTaskExtractor.
For retried tasks, retryTaskExtractor first unwraps the DecatonTaskRequest using DefaultTaskExtractor and then calls taskExtractor.extract() on the unwrapped bytes, here:
https://github.com/line/decaton/blob/v3.0.2/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java#L76
But if taskExtractor is DefaultTaskExtractor, or a TaskExtractor that delegates deserialization to DefaultTaskExtractor, deserialization fails in retryTaskExtractor and the retried task is discarded.
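To make the double unwrapping easier to see, here is a toy model of the behaviour described above. It does not use Decaton at all: the `Extractor` interface, the one-byte `ENVELOPE` marker, and every method name are hypothetical stand-ins (for TaskExtractor, the DecatonTaskRequest framing, and the retry path respectively), and the retried record is assumed to carry the original payload wrapped in exactly one envelope.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class DoubleUnwrapSketch {
    /** Hypothetical stand-in for TaskExtractor<T>; the "task" is just a String here. */
    interface Extractor {
        String extract(byte[] bytes);
    }

    /** Toy one-byte marker standing in for the DecatonTaskRequest protobuf framing. */
    static final byte ENVELOPE = 0x0A;

    /** Wraps a payload in the toy envelope. */
    static byte[] wrap(byte[] payload) {
        byte[] out = new byte[payload.length + 1];
        out[0] = ENVELOPE;
        System.arraycopy(payload, 0, out, 1, payload.length);
        return out;
    }

    /** Strips the toy envelope; rejects bytes that are not enveloped. */
    static byte[] unwrap(byte[] bytes) {
        if (bytes.length == 0 || bytes[0] != ENVELOPE) {
            throw new IllegalArgumentException("not an envelope");
        }
        return Arrays.copyOfRange(bytes, 1, bytes.length);
    }

    /** Plays the role of DefaultTaskExtractor: expects an envelope and removes it. */
    static final Extractor ENVELOPE_EXTRACTOR =
            bytes -> new String(unwrap(bytes), StandardCharsets.UTF_8);

    /** Plays the role of a custom extractor that reads the raw payload directly. */
    static final Extractor RAW_EXTRACTOR =
            bytes -> new String(bytes, StandardCharsets.UTF_8);

    /** Models the retry path: unwrap once, then hand the inner bytes to the user's extractor. */
    static Extractor retryExtractor(Extractor userExtractor) {
        return bytes -> userExtractor.extract(unwrap(bytes));
    }

    /** Returns true if extraction throws, mirroring the discarded retry task. */
    static boolean fails(Extractor extractor, byte[] bytes) {
        try {
            extractor.extract(bytes);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        byte[] record = wrap("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(ENVELOPE_EXTRACTOR.extract(record));                // hello
        System.out.println(retryExtractor(RAW_EXTRACTOR).extract(record));     // hello
        System.out.println(fails(retryExtractor(ENVELOPE_EXTRACTOR), record)); // true
    }
}
```

In this model the retry path works with an extractor that reads raw bytes, but fails with an envelope-expecting extractor because the envelope has already been removed once. That is the same shape as the failure in the stack trace, where the inner payload is parsed as if it were a DecatonTaskRequest.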
Stacktrace:
java.lang.IllegalArgumentException: com.linecorp.decaton.com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
at com.linecorp.decaton.processor.runtime.internal.DefaultTaskExtractor.extract(DefaultTaskExtractor.java:45)
at com.linecorp.bot.commons.decaton.processor.TimedTaskExtractor.extract(TimedTaskExtractor.kt:31)
at com.linecorp.decaton.processor.runtime.ProcessorsBuilder.lambda$consuming$1(ProcessorsBuilder.java:83)
at com.linecorp.decaton.processor.runtime.internal.ProcessPipeline.extract(ProcessPipeline.java:96)
at com.linecorp.decaton.processor.runtime.internal.ProcessPipeline.scheduleThenProcess(ProcessPipeline.java:68)
at com.linecorp.decaton.processor.runtime.internal.ProcessorUnit.processTask(ProcessorUnit.java:73)
at com.linecorp.decaton.processor.runtime.internal.ProcessorUnit.lambda$putTask$1(ProcessorUnit.java:60)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.linecorp.decaton.com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
at com.linecorp.decaton.com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:101)
at com.linecorp.decaton.com.google.protobuf.CodedInputStream$ArrayDecoder.readTag(CodedInputStream.java:551)
at com.linecorp.decaton.com.google.protobuf.CodedInputStream$ArrayDecoder.skipMessage(CodedInputStream.java:649)
at com.linecorp.decaton.com.google.protobuf.CodedInputStream$ArrayDecoder.skipField(CodedInputStream.java:581)
at com.linecorp.decaton.protocol.Decaton$DecatonTaskRequest.<init>(Decaton.java:1073)
at com.linecorp.decaton.protocol.Decaton$DecatonTaskRequest.<init>(Decaton.java:1041)
at com.linecorp.decaton.protocol.Decaton$DecatonTaskRequest$1.parsePartialFrom(Decaton.java:1638)
at com.linecorp.decaton.protocol.Decaton$DecatonTaskRequest$1.parsePartialFrom(Decaton.java:1633)
at com.linecorp.decaton.com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:163)
at com.linecorp.decaton.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:197)
at com.linecorp.decaton.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:209)
at com.linecorp.decaton.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:214)
at com.linecorp.decaton.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
at com.linecorp.decaton.protocol.Decaton$DecatonTaskRequest.parseFrom(Decaton.java:1250)
at com.linecorp.decaton.processor.runtime.internal.DefaultTaskExtractor.extract(DefaultTaskExtractor.java:36)
... 9 common frames omitted
We passed our own com.linecorp.bot.commons.decaton.processor.TimedTaskExtractor as taskExtractor; it looks like the following (written in Kotlin).
TimedTaskExtractor is used to observe consumption delay.
The issue occurs when delegate is DefaultTaskExtractor.
class TimedTaskExtractor<T>(
    private val delegate: TaskExtractor<T>,
    subscription: String,
    topic: String,
    meterRegistry: MeterRegistry
) : TaskExtractor<T> {
    private val timer = meterRegistry.timer(
        "decaton.processor.${TimedTaskExtractor::class.simpleName?.toLowerCase()}.timestamp_delay",
        Tags.of(Tag.of("topic", topic), Tag.of("subscription", subscription))
    )

    override fun extract(bytes: ByteArray): DecatonTask<T> {
        return delegate.extract(bytes).also {
            if (it.metadata().timestampMillis() > 0) {
                timer.record(
                    System.currentTimeMillis() - it.metadata().timestampMillis(),
                    TimeUnit.MILLISECONDS
                )
            }
        }
    }
}