diff --git a/pkg/eventdrop/doc.go b/pkg/eventdrop/doc.go new file mode 100644 index 00000000000..126041c9ba2 --- /dev/null +++ b/pkg/eventdrop/doc.go @@ -0,0 +1,32 @@ +// Package eventdrop provides a centralized entry point for capturing +// telemetry about dropped events inside the Knative Broker data plane. +// +// This package introduces the types and helpers that Broker components +// (such as the filter and ingress handlers) will call when an event is +// dropped due to TTL exhaustion or other well-defined conditions. +// +// # Phase 1 Scope +// +// Phase 1 only introduces the API surface and OpenTelemetry wiring, +// without modifying existing handler code. Integration points will be +// added in a follow-up PR once the design is reviewed and approved. +// +// The RecordEventDropped function is the single entry point for all +// drop-related telemetry, ensuring consistency across multiple components. +// +// # Telemetry Design +// +// - Metrics: Uses low-cardinality attributes (namespace, broker, trigger, reason) +// to avoid metric explosion. EventType and EventSource are omitted from metrics. +// +// - Traces: Includes richer, high-cardinality attributes (eventType, eventSource) +// since traces are sampled and can safely carry detailed context. +// +// # Future Phases +// +// Follow-up phases will build on this instrumentation to add: +// - Kubernetes Event reporting +// - Series aggregation and series.count tracking +// - Buffering and periodic reconciliation +// - Dead-letter reporting sinks +package eventdrop diff --git a/pkg/eventdrop/otel.go b/pkg/eventdrop/otel.go new file mode 100644 index 00000000000..776fdfb9b4c --- /dev/null +++ b/pkg/eventdrop/otel.go @@ -0,0 +1,123 @@ +package eventdrop + +import ( + "context" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" +) + +// OTEL providers obtained from the global OpenTelemetry setup. 
+// These are used to emit metrics and trace events when events are dropped.
+var (
+	meter  = otel.Meter("knative.dev/eventing/eventdrop")
+	tracer = otel.Tracer("knative.dev/eventing/eventdrop") // NOTE(review): not referenced anywhere in this change.
+
+	// droppedEventsCounter counts dropped events using low-cardinality attributes.
+	// It stays nil unless init creates the instrument without error; when creation
+	// fails, counterInitError holds the error returned by the meter.
+	droppedEventsCounter metric.Int64Counter
+	counterInitError     error
+)
+
+// init creates the dropped-events counter on the package meter.
+// The counter is published only when creation succeeds; on failure the error
+// is kept in counterInitError and droppedEventsCounter stays nil, so telemetry
+// setup problems never disrupt normal Broker operation.
+func init() {
+	// Build into a local so a failed call cannot publish an unusable instrument.
+	counter, err := meter.Int64Counter(
+		"eventing_broker_events_dropped_total",
+		metric.WithDescription("Number of events dropped by the Broker data plane"),
+		metric.WithUnit("1"),
+	)
+	if err != nil {
+		// Best effort: keep the error for a later phase to surface and leave
+		// droppedEventsCounter nil so recordMetrics skips metric recording.
+		counterInitError = err
+		return
+	}
+	droppedEventsCounter = counter
+}
+
+// recordMetrics emits a low-cardinality metric when an event is dropped.
+//
+// Following OpenTelemetry guidance on attribute cardinality, the attribute
+// set is kept minimal to avoid a high-cardinality metric explosion:
+//
+//   - namespace: Kubernetes namespace of the Broker
+//   - broker: Name of the Broker
+//   - trigger: Name of the Trigger (may be empty at ingress)
+//   - reason: Enum value explaining why the event was dropped
+//
+// EventType and EventSource are intentionally excluded from metrics due to their
+// high cardinality. These attributes are included in traces instead, where sampling
+// makes cardinality manageable.
+func recordMetrics(ctx context.Context, info Info) {
+	// A nil counter means metric setup did not complete; skip metrics entirely.
+	if droppedEventsCounter == nil {
+		return
+	}
+
+	// Build the low-cardinality attribute set.
+	attrs := []attribute.KeyValue{
+		attribute.String("namespace", info.Namespace),
+		attribute.String("broker", info.Broker),
+		attribute.String("trigger", info.Trigger),
+		attribute.String("reason", string(info.Reason)),
+	}
+
+	// Increment the dropped events counter.
+	droppedEventsCounter.Add(ctx, 1, metric.WithAttributes(attrs...))
+}
+
+// recordTrace emits a trace event with richer, high-cardinality attributes.
+//
+// Traces are sampled (e.g., 1 in 1000 events), so they can safely include
+// high-cardinality attributes like event_type and event_source without impacting
+// the observability backend.
+//
+// The function adds an "event-dropped" event to the current span if one is active.
+// If no span is currently recording, the function returns early without creating
+// a new span (Phase 1 design choice).
+func recordTrace(ctx context.Context, info Info) {
+	// Get the current span from the context.
+	span := trace.SpanFromContext(ctx)
+
+	// Only emit trace events if the span is actively recording.
+	if !span.IsRecording() {
+		return
+	}
+
+	// Four core attributes are always present; reserve room for the three optional ones.
+	attrs := make([]attribute.KeyValue, 0, 7)
+	attrs = append(attrs,
+		attribute.String("namespace", info.Namespace),
+		attribute.String("broker", info.Broker),
+		attribute.String("trigger", info.Trigger),
+		attribute.String("reason", string(info.Reason)),
+	)
+
+	// Include event metadata; safe for traces due to sampling.
+	// These attributes may vary widely between events, making them unsuitable
+	// for metrics but ideal for sampled trace context.
+	if info.EventType != "" {
+		attrs = append(attrs, attribute.String("event_type", info.EventType))
+	}
+	if info.EventSource != "" {
+		attrs = append(attrs, attribute.String("event_source", info.EventSource))
+	}
+
+	// Include optional details field if provided (e.g., "TTL=0", "loop detected").
+	// This provides additional context for debugging without adding cardinality
+	// to metrics.
+	if info.Details != "" {
+		attrs = append(attrs, attribute.String("details", info.Details))
+	}
+
+	// Add the "event-dropped" event to the current span.
+	// This event, along with its attributes, will be recorded in the trace.
+	span.AddEvent("event-dropped", trace.WithAttributes(attrs...))
+}
diff --git a/pkg/eventdrop/record.go b/pkg/eventdrop/record.go
new file mode 100644
index 00000000000..6316ffb6e0d
--- /dev/null
+++ b/pkg/eventdrop/record.go
@@ -0,0 +1,51 @@
+package eventdrop
+
+import "context"
+
+// RecordEventDropped is the single entry point for recording telemetry
+// whenever an event is dropped in the Broker data plane.
+//
+// This function records both metrics (low-cardinality) and traces (high-cardinality),
+// providing visibility into drop events across different observability dimensions.
+//
+// # Integration Points (Phase 1.5)
+//
+// The following Broker handlers will invoke this function in a follow-up PR:
+//
+// 1. pkg/broker/filter/filter_handler.go (ReasonTTLMissing)
+//   - Called when an event lacks the internal TTL extension.
+//   - Typically indicates the event was not sent by the Broker and cannot
+//     be safely treated as a looped event.
+//   - At this point, the Trigger name is known and should be populated.
+//
+// 2. pkg/broker/ingress/ingress_handler.go (ReasonTTLExhausted)
+//   - Called when the TTL countdown reaches <= 0.
+//   - Indicates the event has been evaluated by multiple Triggers and
+//     the TTL mechanism is breaking an event loop.
+//   - At this point, the Trigger may not be known (will be populated in future phases).
+//
+// # Parameters
+//
+// ctx: The context carrying the current span and OTEL context.
+// info: The Info struct describing the dropped event and drop reason.
+//
+// # Error Handling
+//
+// RecordEventDropped does not return errors and performs best-effort telemetry:
+//   - If metrics initialization failed, only traces will be recorded.
+//   - If the span is not recording, only metrics will be recorded.
+//   - If neither is available, nothing is recorded and the function returns cleanly.
+//
+// This design ensures that telemetry failures do not disrupt event processing.
+//
+// # Phase 1 Contract
+//
+// This function is safe to call even if OpenTelemetry is not fully configured.
+// It gracefully degrades to the available observability infrastructure.
+func RecordEventDropped(ctx context.Context, info Info) {
+	// Metrics first, then the trace event. Each helper performs its own
+	// availability check and returns without recording when its part of the
+	// OTEL infrastructure is not ready.
+	recordMetrics(ctx, info)
+	recordTrace(ctx, info)
+}
diff --git a/pkg/eventdrop/record_test.go b/pkg/eventdrop/record_test.go
new file mode 100644
index 00000000000..1fcafe7b55c
--- /dev/null
+++ b/pkg/eventdrop/record_test.go
@@ -0,0 +1,105 @@
+package eventdrop
+
+import (
+	"context"
+	"testing"
+)
+
+// TestRecordEventDropped_DoesNotPanic validates the Phase 1 contract:
+// RecordEventDropped is safe to call even if OTEL infrastructure is not
+// fully initialized or configured.
+//
+// Phase 1 tests focus on the API surface and graceful degradation.
+// Detailed metric and trace verification will be added in Phase 1.5
+// once the handlers integrate with RecordEventDropped and emit real events.
+func TestRecordEventDropped_DoesNotPanic(t *testing.T) {
+	ctx := context.Background()
+
+	// Construct a complete Info struct with all fields populated.
+	// This validates that RecordEventDropped handles the full API surface.
+	info := Info{
+		Namespace:   "test-ns",
+		Broker:      "test-broker",
+		Trigger:     "test-trigger",
+		EventType:   "dev.knative.test",
+		EventSource: "test-source",
+		Reason:      ReasonTTLExhausted,
+		Details:     "TTL count reached 0",
+	}
+
+	// The function should not panic under any circumstances.
+	// Even if OTEL is not initialized, it should degrade gracefully.
+	RecordEventDropped(ctx, info)
+}
+
+// TestRecordEventDropped_WithPartialInfo validates that RecordEventDropped
+// gracefully handles Info structs where not all fields are populated.
+//
+// This is important because:
+//   - At ingress time, Trigger is not yet known (empty string is OK).
+//   - EventType and EventSource may not always be available.
+//   - Details field is optional for all call sites.
+//
+// Metrics should still be emitted with empty string values for missing fields.
+// Traces should omit the optional event attributes and details when they are empty.
+func TestRecordEventDropped_WithPartialInfo(t *testing.T) {
+	ctx := context.Background()
+
+	// Simulate the ingress handler case where Trigger is not yet known.
+	info := Info{
+		Namespace:   "test-ns",
+		Broker:      "test-broker",
+		Trigger:     "", // Not known at ingress
+		EventType:   "com.example.event",
+		EventSource: "",                 // May not be available
+		Reason:      ReasonTTLExhausted, // The ingress handler's drop reason
+		Details:     "",                 // Optional
+	}
+
+	// Should handle partial Info without panicking.
+	RecordEventDropped(ctx, info)
+}
+
+// TestRecordEventDropped_MinimalInfo validates that RecordEventDropped
+// works with only the required fields populated.
+//
+// This test ensures backward compatibility and graceful degradation
+// if handler code is written before all context is available.
+func TestRecordEventDropped_MinimalInfo(t *testing.T) {
+	ctx := context.Background()
+
+	// Only required fields (Namespace, Broker, Reason).
+	info := Info{
+		Namespace: "test-ns",
+		Broker:    "test-broker",
+		Reason:    ReasonTTLExhausted,
+	}
+
+	// Should work fine with minimal Info.
+	RecordEventDropped(ctx, info)
+}
+
+// TestRecordEventDropped_AllReasons validates that every Reason value defined
+// so far is accepted, running each one as its own subtest.
+//
+// The list below is explicit: a Reason added in a future phase must be
+// appended here to be covered.
+func TestRecordEventDropped_AllReasons(t *testing.T) {
+	ctx := context.Background()
+
+	reasons := []Reason{
+		ReasonTTLMissing,
+		ReasonTTLExhausted,
+	}
+
+	for _, reason := range reasons {
+		t.Run(string(reason), func(t *testing.T) {
+			// Each reason should be handled without panic.
+			RecordEventDropped(ctx, Info{
+				Namespace: "test-ns",
+				Broker:    "test-broker",
+				Reason:    reason,
+			})
+		})
+	}
+}
diff --git a/pkg/eventdrop/types.go b/pkg/eventdrop/types.go
new file mode 100644
index 00000000000..624d4d3bdcc
--- /dev/null
+++ b/pkg/eventdrop/types.go
@@ -0,0 +1,59 @@
+package eventdrop
+
+// Reason indicates why a Broker data-plane component dropped an event.
+// This enum is intentionally small for Phase 1; additional reasons can be
+// added in future phases as new drop conditions are identified and instrumented.
+type Reason string
+
+const (
+	// ReasonTTLMissing is used by the filter handler when an event lacks the
+	// internal TTL extension. This typically means the event was not sent by
+	// the Broker itself (e.g., direct ingestion) and cannot be safely treated
+	// as a looped event.
+	//
+	// Call site: pkg/broker/filter/filter_handler.go
+	ReasonTTLMissing Reason = "ttl-missing"
+
+	// ReasonTTLExhausted is used by the ingress handler when the TTL countdown
+	// has reached <= 0. This indicates the event has been processed through
+	// multiple Trigger evaluations and the TTL mechanism is breaking an event loop.
+	//
+	// Call site: pkg/broker/ingress/ingress_handler.go
+	ReasonTTLExhausted Reason = "ttl-exhausted"
+
+	// Future reasons might include:
+	//   - ReasonDeadLetterFailed (Phase 2: dead-letter delivery failure)
+	//   - ReasonDeliveryExhausted (Phase 2: max retry attempts exceeded)
+	//   - ReasonInternalError (Phase 2: non-retryable internal error)
+)
+
+// Info describes the contextual metadata related to a dropped event.
+// Handler code will populate these fields when invoking RecordEventDropped.
+//
+// Phase 1 does not require all fields to be available at all drop locations.
+// For example, at ingress time, the Trigger is not yet known, so it may be empty.
+// Namespace, Broker, Trigger and Reason are emitted as metric attributes (Trigger
+// may be empty), while EventType, EventSource and Details are only added to traces.
+//
+// Fields:
+//
+//	Namespace:   The Kubernetes namespace where the Broker resides. Required for metrics.
+//	Broker:      The name of the Broker that dropped the event. Required for metrics.
+//	Trigger:     The name of the Trigger associated with this drop (if known).
+//	             May be empty at ingress; populated by filter. Emitted on metrics and traces.
+//	EventType:   The CloudEvents "type" attribute of the dropped event.
+//	             Used in traces only; omitted from metrics to avoid cardinality explosion.
+//	EventSource: The CloudEvents "source" attribute of the dropped event.
+//	             Used in traces only; omitted from metrics to avoid cardinality explosion.
+//	Reason:      The Reason enum indicating why the event was dropped. Required.
+//	Details:     Optional additional context (e.g., "TTL=0", "loop detected").
+//	             Used in traces for richer debugging information.
+type Info struct {
+	Namespace   string
+	Broker      string
+	Trigger     string
+	EventType   string
+	EventSource string
+	Reason      Reason
+	Details     string // Optional: rich context for traces, not metrics.
+}