Skip to content

Commit f00a07b

Browse files
fix(v3): prevent SERVER span context leak in Kafka consumer; add SpanLinks
The Kafka consumer batch handler was inheriting the active Context.current() when building its CONSUMER span. On Vert.x's event loop, this meant the last HTTP SERVER span's context leaked into consumer spans, making them appear as children of unrelated HTTP traces.

Fixes:
- Use Context.root() as explicit parent in tracedBatchHandler so consumer spans are always root spans, independent of any leaked HTTP context
- Extract traceparent from each Kafka record's headers and add as SpanLink, connecting the consumer span to its producer span(s) per OTel messaging semconv (async decoupled relationship → SpanLink, not parent/child)

Tests added:
- consumerSpanIsRootSpanEvenWhenServerSpanIsActive: verifies Context.root() prevents SERVER span context leak
- consumerSpanLinksToProducerViaTraceparentHeader: verifies traceparent header extraction and SpanLink creation

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent cb579b8 commit f00a07b

File tree

2 files changed

+137
-0
lines changed

2 files changed

+137
-0
lines changed

vertx3-rxjava2-otel-autoconfigure/src/main/java/io/last9/tracing/otel/v3/KafkaTracing.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,30 @@
44
import io.opentelemetry.api.OpenTelemetry;
55
import io.opentelemetry.api.common.Attributes;
66
import io.opentelemetry.api.trace.Span;
7+
import io.opentelemetry.api.trace.SpanContext;
78
import io.opentelemetry.api.trace.SpanKind;
89
import io.opentelemetry.api.trace.StatusCode;
910
import io.opentelemetry.api.trace.Tracer;
1011
import io.opentelemetry.context.Context;
1112
import io.opentelemetry.context.Scope;
13+
import io.opentelemetry.context.propagation.TextMapGetter;
1214
import io.opentelemetry.context.propagation.TextMapSetter;
1315
import io.opentelemetry.semconv.ExceptionAttributes;
1416
import io.opentelemetry.semconv.SemanticAttributes;
1517
import io.reactivex.Single;
1618
import io.vertx.core.Handler;
19+
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
1720
import io.vertx.kafka.client.consumer.KafkaConsumerRecords;
21+
import io.vertx.kafka.client.producer.KafkaHeader;
1822
import io.vertx.kafka.client.producer.RecordMetadata;
1923
import io.vertx.reactivex.kafka.client.consumer.KafkaConsumer;
2024
import io.vertx.reactivex.kafka.client.producer.KafkaProducer;
2125
import io.vertx.reactivex.kafka.client.producer.KafkaProducerRecord;
2226

27+
import java.nio.charset.StandardCharsets;
28+
import java.util.Iterator;
29+
import java.util.List;
30+
2331
/**
2432
* Utility for adding OpenTelemetry tracing to Vert.x 3 Kafka producers and consumers.
2533
*
@@ -63,6 +71,35 @@ public final class KafkaTracing {
6371
private static final TextMapSetter<KafkaProducerRecord> RECORD_SETTER =
6472
(record, key, value) -> record.addHeader(key, value);
6573

74+
/**
75+
* {@link TextMapGetter} that extracts trace context from Kafka consumer record headers.
76+
* Used to extract the producer's {@code traceparent} and add it as a
77+
* {@link io.opentelemetry.api.trace.SpanContext} link on the CONSUMER span.
78+
*/
79+
@SuppressWarnings({"rawtypes", "unchecked"})
80+
private static final TextMapGetter<KafkaConsumerRecord> RECORD_GETTER =
81+
new TextMapGetter<KafkaConsumerRecord>() {
82+
@Override
83+
public Iterable<String> keys(KafkaConsumerRecord record) {
84+
List<String> keys = new java.util.ArrayList<>();
85+
for (KafkaHeader h : (List<KafkaHeader>) record.headers()) {
86+
keys.add(h.key());
87+
}
88+
return keys;
89+
}
90+
91+
@Override
92+
public String get(KafkaConsumerRecord record, String key) {
93+
if (record == null) return null;
94+
for (KafkaHeader h : (List<KafkaHeader>) record.headers()) {
95+
if (key.equals(h.key())) {
96+
return new String(h.value().getBytes(), StandardCharsets.UTF_8);
97+
}
98+
}
99+
return null;
100+
}
101+
};
102+
66103
private KafkaTracing() {
67104
// Utility class
68105
}
@@ -129,14 +166,20 @@ public static <K, V> Handler<KafkaConsumerRecords<K, V>> tracedBatchHandler(
129166
* @param openTelemetry the OpenTelemetry instance to use
130167
* @return a wrapped handler that creates and ends a CONSUMER span around each batch
131168
*/
169+
@SuppressWarnings({"unchecked", "rawtypes"})
132170
public static <K, V> Handler<KafkaConsumerRecords<K, V>> tracedBatchHandler(
133171
String topic,
134172
String consumerGroup,
135173
Handler<KafkaConsumerRecords<K, V>> delegate,
136174
OpenTelemetry openTelemetry) {
137175
Tracer tracer = openTelemetry.getTracer(TRACER_NAME);
138176
return records -> {
177+
// Use Context.root() as parent to prevent leaked HTTP SERVER span context from
178+
// polluting Kafka consumer spans (same fix as TracedRouter for HTTP handlers).
179+
// Consumer spans are root spans per OTel messaging semconv — they link to producers
180+
// via SpanLink rather than parent/child to model the async relationship accurately.
139181
var spanBuilder = tracer.spanBuilder(topic + " process")
182+
.setParent(Context.root())
140183
.setSpanKind(SpanKind.CONSUMER)
141184
.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "kafka")
142185
.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_NAME, topic)
@@ -150,6 +193,19 @@ public static <K, V> Handler<KafkaConsumerRecords<K, V>> tracedBatchHandler(
150193
consumerGroup);
151194
}
152195

196+
// Add a SpanLink for each record that carries a traceparent header.
197+
// This connects the consumer span to the producer span(s) without making it
198+
// a child — correct for async messaging where producer and consumer are decoupled.
199+
for (int i = 0; i < records.size(); i++) {
200+
KafkaConsumerRecord record = records.recordAt(i);
201+
Context producerCtx = openTelemetry.getPropagators().getTextMapPropagator()
202+
.extract(Context.root(), record, RECORD_GETTER);
203+
SpanContext producerSpanCtx = Span.fromContext(producerCtx).getSpanContext();
204+
if (producerSpanCtx.isValid()) {
205+
spanBuilder.addLink(producerSpanCtx);
206+
}
207+
}
208+
153209
Span span = spanBuilder.startSpan();
154210
try (Scope ignored = span.makeCurrent()) {
155211
delegate.handle(records);

vertx3-rxjava2-otel-autoconfigure/src/test/java/io/last9/tracing/otel/v3/KafkaTracingTest.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import io.opentelemetry.api.trace.Span;
55
import io.opentelemetry.api.trace.SpanKind;
66
import io.opentelemetry.api.trace.StatusCode;
7+
import io.opentelemetry.context.Scope;
78
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
89
import io.opentelemetry.sdk.trace.data.SpanData;
910
import io.reactivex.Single;
@@ -19,6 +20,7 @@
1920
import org.junit.jupiter.api.BeforeEach;
2021
import org.junit.jupiter.api.Test;
2122

23+
import java.nio.charset.StandardCharsets;
2224
import java.util.ArrayList;
2325
import java.util.Collections;
2426
import java.util.List;
@@ -294,6 +296,85 @@ void producerSendTombstoneSetsMsgTombstoneAttribute() {
294296
.isTrue();
295297
}
296298

299+
// ---- Context isolation tests ----
300+
301+
@Test
302+
void consumerSpanIsRootSpanEvenWhenServerSpanIsActive() {
303+
// Reproduce the SERVER span context leak: a Vert.x HTTP SERVER span is active
304+
// on the event loop thread. Without Context.root() as parent, the consumer span
305+
// would become a child of that SERVER span — producing a wrong trace.
306+
Span serverSpan = otel.getOpenTelemetry().getTracer("test")
307+
.spanBuilder("GET /api/data")
308+
.setSpanKind(SpanKind.SERVER)
309+
.startSpan();
310+
311+
try (Scope ignored = serverSpan.makeCurrent()) {
312+
Handler<KafkaConsumerRecords<String, String>> traced = KafkaTracing.tracedBatchHandler(
313+
"orders",
314+
records -> {},
315+
otel.getOpenTelemetry()
316+
);
317+
traced.handle(emptyBatch());
318+
} finally {
319+
serverSpan.end();
320+
}
321+
322+
List<SpanData> spans = spanExporter.getFinishedSpanItems();
323+
SpanData consumerSpan = spans.stream()
324+
.filter(s -> s.getKind() == SpanKind.CONSUMER)
325+
.findFirst().orElseThrow();
326+
327+
// Consumer span must be a root span — not a child of the HTTP SERVER span
328+
assertThat(consumerSpan.getParentSpanContext().isValid()).isFalse();
329+
}
330+
331+
@Test
332+
void consumerSpanLinksToProducerViaTraceparentHeader() {
333+
// Simulate a producer span with a known trace/span ID
334+
Span producerSpan = otel.getOpenTelemetry().getTracer("test")
335+
.spanBuilder("orders publish")
336+
.setSpanKind(SpanKind.PRODUCER)
337+
.startSpan();
338+
String producerTraceId = producerSpan.getSpanContext().getTraceId();
339+
String producerSpanId = producerSpan.getSpanContext().getSpanId();
340+
producerSpan.end();
341+
342+
// Build a W3C traceparent header value (same format TracedKafkaProducer injects)
343+
String traceparent = "00-" + producerTraceId + "-" + producerSpanId + "-01";
344+
345+
// Create a Kafka ConsumerRecord carrying the traceparent header
346+
ConsumerRecord<String, String> rawRecord =
347+
new ConsumerRecord<>("orders", 0, 0L, "key-1", "value-1");
348+
rawRecord.headers().add("traceparent", traceparent.getBytes(StandardCharsets.UTF_8));
349+
350+
ConsumerRecords<String, String> raw = new ConsumerRecords<>(
351+
Collections.singletonMap(new TopicPartition("orders", 0),
352+
Collections.singletonList(rawRecord)));
353+
354+
Handler<KafkaConsumerRecords<String, String>> traced = KafkaTracing.tracedBatchHandler(
355+
"orders",
356+
records -> {},
357+
otel.getOpenTelemetry()
358+
);
359+
traced.handle(new KafkaConsumerRecordsImpl<>(raw));
360+
361+
List<SpanData> spans = spanExporter.getFinishedSpanItems();
362+
// Only the consumer span — producer span ended before the handler ran
363+
SpanData consumerSpan = spans.stream()
364+
.filter(s -> s.getKind() == SpanKind.CONSUMER)
365+
.findFirst().orElseThrow();
366+
367+
// Consumer span must be a root span (no parent)
368+
assertThat(consumerSpan.getParentSpanContext().isValid()).isFalse();
369+
370+
// Consumer span must carry a SpanLink pointing to the producer span
371+
assertThat(consumerSpan.getLinks()).hasSize(1);
372+
assertThat(consumerSpan.getLinks().get(0).getSpanContext().getTraceId())
373+
.isEqualTo(producerTraceId);
374+
assertThat(consumerSpan.getLinks().get(0).getSpanContext().getSpanId())
375+
.isEqualTo(producerSpanId);
376+
}
377+
297378
// ---- Helpers ----
298379

299380
private KafkaConsumerRecords<String, String> emptyBatch() {

0 commit comments

Comments
 (0)