Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,33 +128,30 @@ public void testBasicProducerConsumerTracing() throws Exception {
// Force flush tracer provider
flushSpans();

// Verify spans - at least one span should be created
// Verify spans - expected 2 spans to be created
List<SpanData> spans = spanExporter.getFinishedSpanItems();
assertTrue(spans.size() > 0, "Expected at least one span, got: " + spans.size());
assertEquals(spans.size(), 2, "Expected 2 spans, got: " + spans.size());

// Verify producer span if present
spans.stream()
// Verify producer span
SpanData producerSpan = spans.stream()
.filter(s -> s.getKind() == SpanKind.PRODUCER)
.findFirst()
.ifPresent(producerSpan -> {
assertEquals(producerSpan.getName(), "send " + topic);
assertEquals(producerSpan.getAttributes().get(
io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")), "pulsar");
});
.orElseThrow();
assertEquals(producerSpan.getName(), "send " + topic);
assertEquals(producerSpan.getAttributes().get(
io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")), "pulsar");

// Verify consumer span if present
spans.stream()
// Verify consumer span
SpanData consumerSpan = spans.stream()
.filter(s -> s.getKind() == SpanKind.CONSUMER)
.findFirst()
.ifPresent(consumerSpan -> {
assertEquals(consumerSpan.getName(), "process " + topic);
assertEquals(consumerSpan.getAttributes().get(
io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")), "pulsar");
assertEquals(consumerSpan.getAttributes().get(
io.opentelemetry.api.common.AttributeKey.stringKey(
"messaging.pulsar.acknowledgment.type")),
"acknowledge");
});
.orElseThrow();
assertEquals(consumerSpan.getName(), "process " + topic);
assertEquals(consumerSpan.getAttributes().get(
io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")), "pulsar");
assertEquals(consumerSpan.getAttributes().get(
io.opentelemetry.api.common.AttributeKey.stringKey("messaging.pulsar.acknowledgment.type")),
"acknowledge");
}

@Test
Expand Down Expand Up @@ -720,14 +717,16 @@ public void testBatchMessagesTracing() throws Exception {
.subscriptionName("test-sub")
.subscribe();

int numMessages = 5;

// Send batch of messages
for (int i = 0; i < 5; i++) {
for (int i = 0; i < numMessages; i++) {
producer.sendAsync("message-" + i);
}
producer.flush();

// Receive and acknowledge all messages
for (int i = 0; i < 5; i++) {
for (int i = 0; i < numMessages; i++) {
Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
assertNotNull(msg);
consumer.acknowledge(msg);
Expand All @@ -741,9 +740,10 @@ public void testBatchMessagesTracing() throws Exception {
flushSpans();

// Verify spans for batched messages
// Note: Tracing behavior may vary for batched messages depending on when spans are created
List<SpanData> spans = spanExporter.getFinishedSpanItems();
assertTrue(spans.size() > 0, "Expected at least some spans for batched messages");
int expectedNumSpans = numMessages * 2;
assertEquals(spans.size(), expectedNumSpans,
"Expected " + expectedNumSpans + " spans for batched messages, got " + spans.size());

// Verify that spans have correct attributes
spans.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ public CompletableFuture<Consumer<T>> subscribeAsync() {
effectiveInterceptors = new java.util.ArrayList<>(effectiveInterceptors);
}
effectiveInterceptors.add(
new org.apache.pulsar.client.impl.tracing.OpenTelemetryConsumerInterceptor<>());
new org.apache.pulsar.client.impl.tracing.OpenTelemetryConsumerInterceptor<>(
client.instrumentProvider()));
}

if (effectiveInterceptors == null || effectiveInterceptors.size() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ public CompletableFuture<Producer<T>> createAsync() {
} else {
effectiveInterceptors = new ArrayList<>(effectiveInterceptors);
}
effectiveInterceptors.add(new org.apache.pulsar.client.impl.tracing.OpenTelemetryProducerInterceptor());
effectiveInterceptors.add(new org.apache.pulsar.client.impl.tracing.OpenTelemetryProducerInterceptor(
client.instrumentProvider()));
}

return effectiveInterceptors == null || effectiveInterceptors.size() == 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,8 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.api.TraceableMessageId;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;

/**
Expand All @@ -64,11 +61,10 @@
@CustomLog
public class OpenTelemetryConsumerInterceptor<T> implements ConsumerInterceptor<T> {

private Tracer tracer;
private TextMapPropagator propagator;
private final Tracer tracer;
private final TextMapPropagator propagator;
private String topic;
private String subscription;
private boolean initialized = false;

/**
* Used for cumulative acknowledgment support (Failover/Exclusive subscriptions).
Expand All @@ -80,10 +76,12 @@ public class OpenTelemetryConsumerInterceptor<T> implements ConsumerInterceptor<
* instance handles messages from multiple topic partitions. Cumulative ack only affects
* messages from the same topic partition.
*/
private volatile Map<String, ConcurrentSkipListMap<MessageIdAdv, Span>> messageSpansByTopic;
private final Map<String, ConcurrentSkipListMap<MessageIdAdv, Span>> messageSpansByTopic;

public OpenTelemetryConsumerInterceptor() {
// Tracer and propagator will be initialized in beforeConsume when we have access to the consumer
public OpenTelemetryConsumerInterceptor(InstrumentProvider instrumentProvider) {
this.tracer = instrumentProvider.getTracer();
this.propagator = GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
this.messageSpansByTopic = new ConcurrentHashMap<>();
}

/**
Expand All @@ -97,52 +95,17 @@ private String getTopicKey(MessageId messageId) {
return topic != null ? topic : "";
}

/**
* Initialize the tracer from the consumer's client.
* This is called lazily on the first message.
*/
private void initializeIfNeeded(Consumer<T> consumer) {
if (!initialized && consumer instanceof ConsumerBase<?> consumerBase) {
PulsarClientImpl client = consumerBase.getClient();
InstrumentProvider instrumentProvider = client.instrumentProvider();

this.tracer = instrumentProvider.getTracer();
this.propagator = GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
this.initialized = true;
if (consumerBase.getConf().getSubscriptionType() == SubscriptionType.Exclusive
|| consumerBase.getConf().getSubscriptionType() == SubscriptionType.Failover) {
ensureMapInitialized();
}
}
}

/**
* Ensure the map is initialized for cumulative acknowledgment support.
* This is called when we detect cumulative ack is being used.
*/
private void ensureMapInitialized() {
if (messageSpansByTopic == null) {
messageSpansByTopic = new ConcurrentHashMap<>();
log.debug("Initialized message spans map for cumulative acknowledgment support");
}
}

@Override
public void close() {
// Clean up any remaining spans for Failover/Exclusive subscriptions
if (messageSpansByTopic != null) {
messageSpansByTopic.values().forEach(topicSpans ->
topicSpans.values().forEach(TracingContext::endSpan)
);
messageSpansByTopic.clear();
}
messageSpansByTopic.values().forEach(topicSpans ->
topicSpans.values().forEach(TracingContext::endSpan)
);
messageSpansByTopic.clear();
}

@Override
public Message<T> beforeConsume(Consumer<T> consumer, Message<T> message) {
// Initialize tracer from consumer on first call
initializeIfNeeded(consumer);

if (tracer == null || propagator == null) {
return message;
}
Expand All @@ -162,7 +125,7 @@ public Message<T> beforeConsume(Consumer<T> consumer, Message<T> message) {
MessageId messageId = message.getMessageId();

// Store in map for cumulative ack support (Failover/Exclusive)
if (messageSpansByTopic != null && messageId instanceof MessageIdAdv) {
if (messageId instanceof MessageIdAdv) {
String topicKey = getTopicKey(messageId);
messageSpansByTopic.computeIfAbsent(topicKey,
k -> new ConcurrentSkipListMap<>()).put((MessageIdAdv) messageId, span);
Expand Down Expand Up @@ -204,7 +167,7 @@ public void onAcknowledge(Consumer<T> consumer, MessageId messageId, Throwable e
((TraceableMessageId) messageId).setTracingSpan(null);

// Remove from map if it exists (Failover/Exclusive)
if (messageSpansByTopic != null && messageId instanceof MessageIdAdv) {
if (messageId instanceof MessageIdAdv) {
String topicKey = getTopicKey(messageId);
ConcurrentSkipListMap<MessageIdAdv, Span> topicSpans = messageSpansByTopic.get(topicKey);
if (topicSpans != null) {
Expand Down Expand Up @@ -244,8 +207,7 @@ public void onAcknowledgeCumulative(Consumer<T> consumer, MessageId messageId, T
String topicKey = getTopicKey(messageId);

// Get the topic-specific map
ConcurrentSkipListMap<MessageIdAdv, Span> topicSpans = messageSpansByTopic != null
? messageSpansByTopic.get(topicKey) : null;
ConcurrentSkipListMap<MessageIdAdv, Span> topicSpans = messageSpansByTopic.get(topicKey);

// First, try to get the span for the cumulative ack position itself
Span currentSpan = null;
Expand Down Expand Up @@ -295,7 +257,7 @@ public void onAcknowledgeCumulative(Consumer<T> consumer, MessageId messageId, T
}

// If the cumulative ack position span wasn't in the map, end it directly
if (currentSpan != null && messageId instanceof TraceableMessageId) {
if (currentSpan != null) {
try {
if (exception != null) {
TracingContext.endSpan(currentSpan, exception);
Expand Down Expand Up @@ -327,7 +289,7 @@ public void onNegativeAcksSend(Consumer<T> consumer, Set<MessageId> messageIds)
((TraceableMessageId) messageId).setTracingSpan(null);

// Remove from map if it exists (Failover/Exclusive)
if (messageSpansByTopic != null && messageId instanceof MessageIdAdv) {
if (messageId instanceof MessageIdAdv) {
String topicKey = getTopicKey(messageId);
ConcurrentSkipListMap<MessageIdAdv, Span> topicSpans = messageSpansByTopic.get(topicKey);
if (topicSpans != null) {
Expand Down Expand Up @@ -359,7 +321,7 @@ public void onAckTimeoutSend(Consumer<T> consumer, Set<MessageId> messageIds) {
((TraceableMessageId) messageId).setTracingSpan(null);

// Remove from map if it exists (Failover/Exclusive)
if (messageSpansByTopic != null && messageId instanceof MessageIdAdv) {
if (messageId instanceof MessageIdAdv) {
String topicKey = getTopicKey(messageId);
ConcurrentSkipListMap<MessageIdAdv, Span> topicSpans = messageSpansByTopic.get(topicKey);
if (topicSpans != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.TraceableMessage;
import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;

/**
Expand All @@ -45,28 +43,13 @@
@CustomLog
public class OpenTelemetryProducerInterceptor implements ProducerInterceptor {

private Tracer tracer;
private TextMapPropagator propagator;
private final Tracer tracer;
private final TextMapPropagator propagator;
private String topic;
private boolean initialized = false;

public OpenTelemetryProducerInterceptor() {
// Tracer and propagator will be initialized in beforeSend when we have access to the producer
}

/**
* Initialize the tracer from the producer's client.
* This is called lazily on the first message.
*/
private void initializeIfNeeded(Producer producer) {
if (!initialized && producer instanceof ProducerBase<?> producerBase) {
PulsarClientImpl client = producerBase.getClient();
InstrumentProvider instrumentProvider = client.instrumentProvider();

this.tracer = instrumentProvider.getTracer();
this.propagator = GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
this.initialized = true;
}
public OpenTelemetryProducerInterceptor(InstrumentProvider instrumentProvider) {
this.tracer = instrumentProvider.getTracer();
this.propagator = GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
}

@Override
Expand All @@ -82,9 +65,6 @@ public boolean eligible(Message<?> message) {

@Override
public Message<?> beforeSend(Producer<?> producer, Message<?> message) {
// Initialize tracer from producer on first call
initializeIfNeeded(producer);

if (!eligible(message)) {
return message;
}
Expand Down
Loading