diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java index fcc0bd1776eb5..1007f7747882f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java @@ -128,33 +128,30 @@ public void testBasicProducerConsumerTracing() throws Exception { // Force flush tracer provider flushSpans(); - // Verify spans - at least one span should be created + // Verify spans - expected 2 spans to be created List spans = spanExporter.getFinishedSpanItems(); - assertTrue(spans.size() > 0, "Expected at least one span, got: " + spans.size()); + assertEquals(spans.size(), 2, "Expected 2 spans, got: " + spans.size()); - // Verify producer span if present - spans.stream() + // Verify producer span + SpanData producerSpan = spans.stream() .filter(s -> s.getKind() == SpanKind.PRODUCER) .findFirst() - .ifPresent(producerSpan -> { - assertEquals(producerSpan.getName(), "send " + topic); - assertEquals(producerSpan.getAttributes().get( - io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")), "pulsar"); - }); + .orElseThrow(); + assertEquals(producerSpan.getName(), "send " + topic); + assertEquals(producerSpan.getAttributes().get( + io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")), "pulsar"); - // Verify consumer span if present - spans.stream() + // Verify consumer span + SpanData consumerSpan = spans.stream() .filter(s -> s.getKind() == SpanKind.CONSUMER) .findFirst() - .ifPresent(consumerSpan -> { - assertEquals(consumerSpan.getName(), "process " + topic); - assertEquals(consumerSpan.getAttributes().get( - io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")), "pulsar"); - assertEquals(consumerSpan.getAttributes().get( - io.opentelemetry.api.common.AttributeKey.stringKey( - "messaging.pulsar.acknowledgment.type")), - "acknowledge"); - }); + .orElseThrow(); + assertEquals(consumerSpan.getName(), "process " + topic); + assertEquals(consumerSpan.getAttributes().get( + io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")), "pulsar"); + assertEquals(consumerSpan.getAttributes().get( + io.opentelemetry.api.common.AttributeKey.stringKey("messaging.pulsar.acknowledgment.type")), + "acknowledge"); } @Test @@ -720,14 +717,16 @@ public void testBatchMessagesTracing() throws Exception { .subscriptionName("test-sub") .subscribe(); + int numMessages = 5; + // Send batch of messages - for (int i = 0; i < 5; i++) { + for (int i = 0; i < numMessages; i++) { producer.sendAsync("message-" + i); } producer.flush(); // Receive and acknowledge all messages - for (int i = 0; i < 5; i++) { + for (int i = 0; i < numMessages; i++) { Message msg = consumer.receive(5, TimeUnit.SECONDS); assertNotNull(msg); consumer.acknowledge(msg); @@ -741,9 +740,10 @@ public void testBatchMessagesTracing() throws Exception { flushSpans(); // Verify spans for batched messages - // Note: Tracing behavior may vary for batched messages depending on when spans are created List spans = spanExporter.getFinishedSpanItems(); - assertTrue(spans.size() > 0, "Expected at least some spans for batched messages"); + int expectedNumSpans = numMessages * 2; + assertEquals(spans.size(), expectedNumSpans, + "Expected " + expectedNumSpans + " spans for batched messages, got " + spans.size()); // Verify that spans have correct attributes spans.stream() diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index a4b9a52a7fb73..296e854c1d560 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -233,7 +233,8 @@ public CompletableFuture> subscribeAsync() { effectiveInterceptors = new java.util.ArrayList<>(effectiveInterceptors); } effectiveInterceptors.add( - new org.apache.pulsar.client.impl.tracing.OpenTelemetryConsumerInterceptor<>()); + new org.apache.pulsar.client.impl.tracing.OpenTelemetryConsumerInterceptor<>( + client.instrumentProvider())); } if (effectiveInterceptors == null || effectiveInterceptors.size() == 0) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java index 9b9c2c07e4958..9242cfd6a08cf 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java @@ -116,7 +116,8 @@ public CompletableFuture> createAsync() { } else { effectiveInterceptors = new ArrayList<>(effectiveInterceptors); } - effectiveInterceptors.add(new org.apache.pulsar.client.impl.tracing.OpenTelemetryProducerInterceptor()); + effectiveInterceptors.add(new org.apache.pulsar.client.impl.tracing.OpenTelemetryProducerInterceptor( + client.instrumentProvider())); } return effectiveInterceptors == null || effectiveInterceptors.size() == 0 diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryConsumerInterceptor.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryConsumerInterceptor.java index 145ef5fa8c914..79d13cd22493b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryConsumerInterceptor.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryConsumerInterceptor.java @@ -33,11 +33,8 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageIdAdv; -import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.TopicMessageId; import org.apache.pulsar.client.api.TraceableMessageId; -import org.apache.pulsar.client.impl.ConsumerBase; -import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.metrics.InstrumentProvider; /** @@ -64,11 +61,10 @@ @CustomLog public class OpenTelemetryConsumerInterceptor implements ConsumerInterceptor { - private Tracer tracer; - private TextMapPropagator propagator; + private final Tracer tracer; + private final TextMapPropagator propagator; private String topic; private String subscription; - private boolean initialized = false; /** * Used for cumulative acknowledgment support (Failover/Exclusive subscriptions). @@ -80,10 +76,12 @@ public class OpenTelemetryConsumerInterceptor implements ConsumerInterceptor< * instance handles messages from multiple topic partitions. Cumulative ack only affects * messages from the same topic partition. */ - private volatile Map> messageSpansByTopic; + private final Map> messageSpansByTopic; - public OpenTelemetryConsumerInterceptor() { - // Tracer and propagator will be initialized in beforeConsume when we have access to the consumer + public OpenTelemetryConsumerInterceptor(InstrumentProvider instrumentProvider) { + this.tracer = instrumentProvider.getTracer(); + this.propagator = GlobalOpenTelemetry.getPropagators().getTextMapPropagator(); + this.messageSpansByTopic = new ConcurrentHashMap<>(); } /** @@ -97,52 +95,17 @@ private String getTopicKey(MessageId messageId) { return topic != null ? topic : ""; } - /** - * Initialize the tracer from the consumer's client. - * This is called lazily on the first message. - */ - private void initializeIfNeeded(Consumer consumer) { - if (!initialized && consumer instanceof ConsumerBase consumerBase) { - PulsarClientImpl client = consumerBase.getClient(); - InstrumentProvider instrumentProvider = client.instrumentProvider(); - - this.tracer = instrumentProvider.getTracer(); - this.propagator = GlobalOpenTelemetry.getPropagators().getTextMapPropagator(); - this.initialized = true; - if (consumerBase.getConf().getSubscriptionType() == SubscriptionType.Exclusive - || consumerBase.getConf().getSubscriptionType() == SubscriptionType.Failover) { - ensureMapInitialized(); - } - } - } - - /** - * Ensure the map is initialized for cumulative acknowledgment support. - * This is called when we detect cumulative ack is being used. - */ - private void ensureMapInitialized() { - if (messageSpansByTopic == null) { - messageSpansByTopic = new ConcurrentHashMap<>(); - log.debug("Initialized message spans map for cumulative acknowledgment support"); - } - } - @Override public void close() { // Clean up any remaining spans for Failover/Exclusive subscriptions - if (messageSpansByTopic != null) { - messageSpansByTopic.values().forEach(topicSpans -> - topicSpans.values().forEach(TracingContext::endSpan) - ); - messageSpansByTopic.clear(); - } + messageSpansByTopic.values().forEach(topicSpans -> + topicSpans.values().forEach(TracingContext::endSpan) + ); + messageSpansByTopic.clear(); } @Override public Message beforeConsume(Consumer consumer, Message message) { - // Initialize tracer from consumer on first call - initializeIfNeeded(consumer); - if (tracer == null || propagator == null) { return message; } @@ -162,7 +125,7 @@ public Message beforeConsume(Consumer consumer, Message message) { MessageId messageId = message.getMessageId(); // Store in map for cumulative ack support (Failover/Exclusive) - if (messageSpansByTopic != null && messageId instanceof MessageIdAdv) { + if (messageId instanceof MessageIdAdv) { String topicKey = getTopicKey(messageId); messageSpansByTopic.computeIfAbsent(topicKey, k -> new ConcurrentSkipListMap<>()).put((MessageIdAdv) messageId, span); @@ -204,7 +167,7 @@ public void onAcknowledge(Consumer consumer, MessageId messageId, Throwable e ((TraceableMessageId) messageId).setTracingSpan(null); // Remove from map if it exists (Failover/Exclusive) - if (messageSpansByTopic != null && messageId instanceof MessageIdAdv) { + if (messageId instanceof MessageIdAdv) { String topicKey = getTopicKey(messageId); ConcurrentSkipListMap topicSpans = messageSpansByTopic.get(topicKey); if (topicSpans != null) { @@ -244,8 +207,7 @@ public void onAcknowledgeCumulative(Consumer consumer, MessageId messageId, T String topicKey = getTopicKey(messageId); // Get the topic-specific map - ConcurrentSkipListMap topicSpans = messageSpansByTopic != null - ? messageSpansByTopic.get(topicKey) : null; + ConcurrentSkipListMap topicSpans = messageSpansByTopic.get(topicKey); // First, try to get the span for the cumulative ack position itself Span currentSpan = null; @@ -295,7 +257,7 @@ public void onAcknowledgeCumulative(Consumer consumer, MessageId messageId, T } // If the cumulative ack position span wasn't in the map, end it directly - if (currentSpan != null && messageId instanceof TraceableMessageId) { + if (currentSpan != null) { try { if (exception != null) { TracingContext.endSpan(currentSpan, exception); @@ -327,7 +289,7 @@ public void onNegativeAcksSend(Consumer consumer, Set messageIds) ((TraceableMessageId) messageId).setTracingSpan(null); // Remove from map if it exists (Failover/Exclusive) - if (messageSpansByTopic != null && messageId instanceof MessageIdAdv) { + if (messageId instanceof MessageIdAdv) { String topicKey = getTopicKey(messageId); ConcurrentSkipListMap topicSpans = messageSpansByTopic.get(topicKey); if (topicSpans != null) { @@ -359,7 +321,7 @@ public void onAckTimeoutSend(Consumer consumer, Set messageIds) { ((TraceableMessageId) messageId).setTracingSpan(null); // Remove from map if it exists (Failover/Exclusive) - if (messageSpansByTopic != null && messageId instanceof MessageIdAdv) { + if (messageId instanceof MessageIdAdv) { String topicKey = getTopicKey(messageId); ConcurrentSkipListMap topicSpans = messageSpansByTopic.get(topicKey); if (topicSpans != null) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java index aa3fea9615582..e93e68e9a969a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java @@ -29,8 +29,6 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.TraceableMessage; import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; -import org.apache.pulsar.client.impl.ProducerBase; -import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.metrics.InstrumentProvider; /** @@ -45,28 +43,13 @@ @CustomLog public class OpenTelemetryProducerInterceptor implements ProducerInterceptor { - private Tracer tracer; - private TextMapPropagator propagator; + private final Tracer tracer; + private final TextMapPropagator propagator; private String topic; - private boolean initialized = false; - public OpenTelemetryProducerInterceptor() { - // Tracer and propagator will be initialized in beforeSend when we have access to the producer - } - - /** - * Initialize the tracer from the producer's client. - * This is called lazily on the first message. - */ - private void initializeIfNeeded(Producer producer) { - if (!initialized && producer instanceof ProducerBase producerBase) { - PulsarClientImpl client = producerBase.getClient(); - InstrumentProvider instrumentProvider = client.instrumentProvider(); - - this.tracer = instrumentProvider.getTracer(); - this.propagator = GlobalOpenTelemetry.getPropagators().getTextMapPropagator(); - this.initialized = true; - } + public OpenTelemetryProducerInterceptor(InstrumentProvider instrumentProvider) { + this.tracer = instrumentProvider.getTracer(); + this.propagator = GlobalOpenTelemetry.getPropagators().getTextMapPropagator(); } @Override @@ -82,9 +65,6 @@ public boolean eligible(Message message) { @Override public Message beforeSend(Producer producer, Message message) { - // Initialize tracer from producer on first call - initializeIfNeeded(producer); - if (!eligible(message)) { return message; }