Add extension support to the pipeline engine#2113
Add extension support to the pipeline engine#2113utpilla wants to merge 10 commits intoopen-telemetry:mainfrom
Conversation
Codecov Report✅ All modified and coverable lines are covered by tests.
Additional details and impacted files@@ Coverage Diff @@
## main #2113 +/- ##
===========================================
- Coverage 87.27% 81.97% -5.31%
===========================================
Files 553 181 -372
Lines 181329 51898 -129431
===========================================
- Hits 158252 42542 -115710
+ Misses 22543 8822 -13721
Partials 534 534
🚀 New features to boost your workflow:
|
Receivers and exporters both receive |
| /// Provides a minimal set of capabilities — primarily node identity and logging. | ||
| /// Extensions that need periodic timers should use `tokio::time::interval` directly. | ||
| #[derive(Clone)] | ||
| pub struct EffectHandler { |
There was a problem hiding this comment.
In local mode, extensions and pipeline nodes share a single LocalSet thread, so anything that blocks between .await points - sync I/O, heavy crypto, thread::sleep - will stall the whole pipeline silently. Probably worth documenting the non-blocking requirement on the Extension trait so implementors know upfront.
Also noticed EffectHandler doesn't have a spawn_blocking helper. Authors who need to run blocking work will either reach for tokio::task::spawn_blocking directly (works, but not discoverable) or block the thread without realising. Something like:
pub async fn spawn_blocking<F, R>(&self, f: F) -> R
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
tokio::task::spawn_blocking(f)
.await
.expect("blocking task panicked")
}would make the safe path obvious. One thing to note - !Send fields can't cross into the closure, so callers need to extract/clone before passing in, might be worth a doc note on the method.
There was a problem hiding this comment.
This issue applies equally to all node types (receivers, processors, exporters). They all share the same single-threaded runtime. None of them currently provide a spawn_blocking helper. I think documenting the non-blocking contract and potentially adding a spawn_blocking helper would be better as a follow-up that covers all node types uniformly, not just extensions.
| @@ -0,0 +1,126 @@ | |||
| // Copyright The OpenTelemetry Authors | |||
There was a problem hiding this comment.
do we need local/shared versions of extensions? both here and in the extensionwrapper? it seems like we already use arc for cloning and sync support anyway, and extensions are send only. so maybe we can just make it so that extensions don't have this separation?
There was a problem hiding this comment.
The extension's service handles are Arc-based and Send + Sync, but the extension implementation itself can hold !Send internal state. This mirrors the pattern used by receivers, processors, and exporters. They all have local/shared variants. Since the engine runs on current_thread + LocalSet, !Send is the natural default. Removing the local variant would force extension authors to add unnecessary Send boilerplate for state that never leaves the thread. I'd prefer to keep it for consistency and flexibility.
| /// | ||
| /// Returns an [`AuthError`] if credentials are unavailable | ||
| /// (e.g., token not yet refreshed, provider unreachable). | ||
| fn get_request_metadata(&self) |
There was a problem hiding this comment.
does client here mean clients that use http headers? I think something that is more agnostic and focuses more on atomic functionality could be more widely useful. something like I did in my pr -> BearerTokenProvider or sth like that, that returns bearer token. How the consumer uses it is none of our concern. This is also very beneficial if consumer wants to have access to stuff like expiration date of bearer token etc easily.
@lalitb @utpilla, I second this. In my view, all node types should be able to access extensions. However, before we can get there, we first need to introduce an |
|
@utpilla First feedback, given that I haven't read the entire PR. I like the idea of reusing the Before continuing the review, I'd really like to see a concrete example of an extension configuration (in our YAML files) and how it hooks up to, for example, a receiver. |
| /// Implement this trait in an auth extension to provide client-side | ||
| /// authentication. The extension decides what headers to attach | ||
| /// (e.g., `Authorization: Bearer <token>`, custom API key headers). | ||
| pub trait ClientAuthenticator: Send { |
There was a problem hiding this comment.
can async traits (funcs) be supported in this pattern?
|
Please don't merge this PR without my approval. I have been working on an extension system as well and I have a different opinion on how I think extensions should be implemented. I think the core idea is very similar in many cases, but I would like us to work together on this feature @utpilla. |
Based on utpilla's insight in open-telemetry#2113 that extensions never touch pipeline data.
@lquerel You could check this diff to get an idea of how a sample config could look like on both receiver and exporter end: utpilla#3 |
Change Summary
Introduces first-class extension support into the dataflow pipeline engine. Extensions are non-pipeline components that provide cross-cutting capabilities (e.g., authentication, health checks, service discovery) to receivers and exporters without participating in the
pdataflow.Motivation
Pipeline components like receivers and exporters often need shared services — credential management, token refresh, header validation — that don't fit the receiver → processor → exporter data-flow model. Extensions provide a clean separation: an independent task produces service handles, and pipeline components consume them at startup via a type-safe registry.
What's included
Engine core
ExtensionWrapper— Unified wrapper supporting bothSendand!Sendextension implementations, analogous to ReceiverWrapper/ExporterWrapper.local::Extension/shared::Extensiontraits — Lifecycle trait with astart()method that receives a control channel and effect handler.ExtensionConfig— Runtime configuration (control channel capacity) for extensions. Extensions only receive control messages, no pdata channels.ExtensionControlMsg— PData-free control message enum (Shutdown,TimerTick,CollectTelemetry).ExtensionFactory— Factory struct (not generic over PData) registered via#[distributed_slice].ExtensionHandles/ExtensionRegistryBuilder/ExtensionRegistry— Type-safe,Clone + Sendregistry. Extension factories register typed handles; pipeline components retrieve them by(extension_name, TypeId)at startup.ServerAuthenticator/ClientAuthenticatortraits — Pluggable auth contract for receivers (validate incoming requests) and exporters (attach outgoing credentials), with cloneable handle wrappers (ServerAuthenticatorHandle,ClientAuthenticatorHandle).Engine macros (engine-macros)
#[pipeline_factory]macro now generates anEXTENSION_FACTORIESdistributed slice and aget_<prefix>_extension_factory_map()helper.Config (config)
NodeKind::Extensionrecognized in URN parsing (:extensionsuffix).Pipeline components (
otap,contrib-nodes,validation,benchmarks)start()signatures now acceptExtensionRegistryas a third parameter._extension_registry(unused) — no behavioral changes.What's NOT included
No concrete extension implementations are shipped yet (no entries in OTAP_EXTENSION_FACTORIES).
What issue does this PR close?
How are these changes tested?
Are there any user-facing changes?
Yes