Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions rust/otap-dataflow/benchmarks/benches/exporter/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ use tonic::{Request, Response, Status};
use otap_df_config::node::NodeUserConfig;
use otap_df_engine::context::ControllerContext;
use otap_df_engine::control::{Controllable, NodeControlMsg, pipeline_ctrl_msg_channel};
use otap_df_engine::extensions::ExtensionRegistry;
use otap_df_otap::otap_exporter::OTAP_EXPORTER_URN;
use otap_df_otap::otlp_grpc::OTLPData;
use otap_df_otap::perf_exporter::exporter::OTAP_PERF_EXPORTER_URN;
Expand Down Expand Up @@ -454,7 +455,7 @@ fn bench_exporter(c: &mut Criterion) {
let local = LocalSet::new();
let _run_exporter_handle = local.spawn_local(async move {
exporter
.start(node_req_tx, metrics_reporter)
.start(node_req_tx, metrics_reporter, ExtensionRegistry::empty())
.await
.expect("Exporter event loop failed")
});
Expand Down Expand Up @@ -520,7 +521,7 @@ fn bench_exporter(c: &mut Criterion) {
let local = LocalSet::new();
let _run_exporter_handle = local.spawn_local(async move {
exporter
.start(node_req_tx, metrics_reporter)
.start(node_req_tx, metrics_reporter, ExtensionRegistry::empty())
.await
.expect("Exporter event loop failed")
});
Expand Down Expand Up @@ -591,7 +592,7 @@ fn bench_exporter(c: &mut Criterion) {
let local = LocalSet::new();
let _run_exporter_handle = local.spawn_local(async move {
exporter
.start(node_req_tx, metrics_reporter)
.start(node_req_tx, metrics_reporter, ExtensionRegistry::empty())
.await
.expect("Exporter event loop failed")
});
Expand Down
3 changes: 3 additions & 0 deletions rust/otap-dataflow/crates/config/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ pub enum NodeKind {
// Connector,
/// A merged chain of consecutive processors (experimental).
ProcessorChain,
/// A non-pipeline extension (e.g., auth provider, health check).
Extension,
}

impl From<NodeKind> for Cow<'static, str> {
Expand All @@ -103,6 +105,7 @@ impl From<NodeKind> for Cow<'static, str> {
NodeKind::Processor => "processor".into(),
NodeKind::Exporter => "exporter".into(),
NodeKind::ProcessorChain => "processor_chain".into(),
NodeKind::Extension => "extension".into(),
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion rust/otap-dataflow/crates/config/src/node_urn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ const fn kind_suffix(expected_kind: NodeKind) -> &'static str {
NodeKind::Receiver => "receiver",
NodeKind::Processor | NodeKind::ProcessorChain => "processor",
NodeKind::Exporter => "exporter",
NodeKind::Extension => "extension",
}
}

Expand All @@ -228,9 +229,12 @@ fn parse_kind(raw: &str, kind: &str) -> Result<NodeKind, Error> {
"receiver" => Ok(NodeKind::Receiver),
"processor" => Ok(NodeKind::Processor),
"exporter" => Ok(NodeKind::Exporter),
"extension" => Ok(NodeKind::Extension),
_ => Err(invalid_plugin_urn(
raw,
format!("expected kind `receiver`, `processor`, or `exporter`, found `{kind}`"),
format!(
"expected kind `receiver`, `processor`, `exporter`, or `extension`, found `{kind}`"
),
)),
}
}
Expand Down
3 changes: 3 additions & 0 deletions rust/otap-dataflow/crates/config/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,9 @@ impl PipelineConfig {
!has_incoming || !has_outgoing
}
NodeKind::Exporter => !has_incoming,
// Extensions are standalone services; they never participate
// in the data-flow graph and must not be pruned.
NodeKind::Extension => false,
};

if should_remove {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use otap_df_engine::ConsumerEffectHandlerExtension;
use otap_df_engine::context::PipelineContext;
use otap_df_engine::control::{AckMsg, NackMsg, NodeControlMsg};
use otap_df_engine::error::Error as EngineError;
use otap_df_engine::extensions::ExtensionRegistry;
use otap_df_engine::local::exporter::{EffectHandler, Exporter};
use otap_df_engine::message::{Message, MessageChannel};
use otap_df_engine::terminal_state::TerminalState;
Expand Down Expand Up @@ -462,6 +463,7 @@ impl Exporter<OtapPdata> for AzureMonitorExporter {
mut self: Box<Self>,
mut msg_chan: MessageChannel<OtapPdata>,
effect_handler: EffectHandler<OtapPdata>,
_extension_registry: ExtensionRegistry,
) -> Result<TerminalState, EngineError> {
effect_handler
.info(&format!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use otap_df_engine::control::NodeControlMsg;
use otap_df_engine::control::{AckMsg, NackMsg};
use otap_df_engine::error::Error;
use otap_df_engine::exporter::ExporterWrapper;
use otap_df_engine::extensions::ExtensionRegistry;
use otap_df_engine::local::exporter::{EffectHandler, Exporter};
use otap_df_engine::message::{Message, MessageChannel};
use otap_df_engine::node::NodeId;
Expand Down Expand Up @@ -501,6 +502,7 @@ impl Exporter<OtapPdata> for GenevaExporter {
mut self: Box<Self>,
mut msg_chan: MessageChannel<OtapPdata>,
effect_handler: EffectHandler<OtapPdata>,
_extension_registry: ExtensionRegistry,
) -> Result<TerminalState, Error> {
otel_info!(
"geneva_exporter.start",
Expand Down
15 changes: 15 additions & 0 deletions rust/otap-dataflow/crates/engine-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub fn pipeline_factory(args: TokenStream, input: TokenStream) -> TokenStream {
let receiver_factories_name = quote::format_ident!("{}_RECEIVER_FACTORIES", prefix);
let processor_factories_name = quote::format_ident!("{}_PROCESSOR_FACTORIES", prefix);
let exporter_factories_name = quote::format_ident!("{}_EXPORTER_FACTORIES", prefix);
let extension_factories_name = quote::format_ident!("{}_EXTENSION_FACTORIES", prefix);
let get_receiver_factory_map_name = quote::format_ident!(
"get_{}_receiver_factory_map",
prefix.to_string().to_lowercase()
Expand All @@ -87,6 +88,10 @@ pub fn pipeline_factory(args: TokenStream, input: TokenStream) -> TokenStream {
"get_{}_exporter_factory_map",
prefix.to_string().to_lowercase()
);
let get_extension_factory_map_name = quote::format_ident!(
"get_{}_extension_factory_map",
prefix.to_string().to_lowercase()
);

let output = quote! {
/// A slice of receiver factories.
Expand All @@ -101,6 +106,10 @@ pub fn pipeline_factory(args: TokenStream, input: TokenStream) -> TokenStream {
#[::otap_df_engine::distributed_slice]
pub static #exporter_factories_name: [::otap_df_engine::ExporterFactory<#pdata_type>] = [..];

/// A slice of extension factories.
#[::otap_df_engine::distributed_slice]
pub static #extension_factories_name: [::otap_df_engine::ExtensionFactory] = [..];

/// The factory registry instance.
#registry_vis static #registry_name: std::sync::LazyLock<PipelineFactory<#pdata_type>> = std::sync::LazyLock::new(|| {
// Reference build_registry to avoid unused import warning, even though we don't call it
Expand All @@ -109,6 +118,7 @@ pub fn pipeline_factory(args: TokenStream, input: TokenStream) -> TokenStream {
&#receiver_factories_name,
&#processor_factories_name,
&#exporter_factories_name,
&#extension_factories_name,
)
});

Expand All @@ -126,6 +136,11 @@ pub fn pipeline_factory(args: TokenStream, input: TokenStream) -> TokenStream {
pub fn #get_exporter_factory_map_name() -> &'static std::collections::HashMap<&'static str, ::otap_df_engine::ExporterFactory<#pdata_type>> {
#registry_name.get_exporter_factory_map()
}

/// Gets the extension factory map, initializing it if necessary.
pub fn #get_extension_factory_map_name() -> &'static std::collections::HashMap<&'static str, ::otap_df_engine::ExtensionFactory> {
#registry_name.get_extension_factory_map()
}
};

output.into()
Expand Down
1 change: 1 addition & 0 deletions rust/otap-dataflow/crates/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ data-encoding = { workspace = true }
prost = { workspace = true }
byte-unit = { workspace = true }
cpu-time = { workspace = true }
http = { workspace = true }
nix = { workspace = true, features = ["resource"] }

[target.'cfg(not(windows))'.dependencies]
Expand Down
14 changes: 5 additions & 9 deletions rust/otap-dataflow/crates/engine/src/channel_mode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use crate::channel_metrics::{
ChannelSenderMetrics, control_channel_id,
};
use crate::context::PipelineContext;
use crate::control::NodeControlMsg;
use crate::entity_context::current_node_telemetry_handle;
use crate::local::message::{LocalReceiver, LocalSender};
use crate::shared::message::{SharedReceiver, SharedSender};
Expand Down Expand Up @@ -171,24 +170,21 @@ impl ChannelMode for SharedMode {
}
}

/// Generic helper used by receiver, processor, and exporter wrappers.
/// Generic helper used by receiver, processor, exporter, and extension wrappers.
/// It keeps local and shared wiring identical while still emitting mode-specific code.
///
/// The logic first attempts to unwrap the inner MPSC channel so metrics can be attached.
/// If the channel is already wrapped, it preserves the existing wrapper to avoid double
/// instrumentation.
pub(crate) fn wrap_control_channel_metrics<M, PData>(
pub(crate) fn wrap_control_channel_metrics<M, T>(
node_id: &crate::node::NodeId,
pipeline_ctx: &PipelineContext,
channel_metrics: &mut ChannelMetricsRegistry,
channel_metrics_enabled: bool,
capacity: u64,
control_sender: M::ControlSender<NodeControlMsg<PData>>,
control_receiver: M::ControlReceiver<NodeControlMsg<PData>>,
) -> (
M::ControlSender<NodeControlMsg<PData>>,
M::ControlReceiver<NodeControlMsg<PData>>,
)
control_sender: M::ControlSender<T>,
control_receiver: M::ControlReceiver<T>,
) -> (M::ControlSender<T>, M::ControlReceiver<T>)
where
M: ChannelMode,
{
Expand Down
36 changes: 36 additions & 0 deletions rust/otap-dataflow/crates/engine/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,18 @@ pub struct ExporterConfig {
pub input_pdata_channel: PdataChannelConfig,
}

/// Generic configuration for an extension.
///
/// Extensions are non-pipeline components (e.g., auth providers, health checks)
/// that only receive control messages. They do not participate in pdata flow.
#[derive(Clone, Debug)]
pub struct ExtensionConfig {
/// Name of the extension.
pub name: NodeId,
/// Configuration for control channel.
pub control_channel: ControlChannelConfig,
}

impl ReceiverConfig {
/// Creates a new receiver configuration with default channel capacities.
pub fn new<T>(name: T) -> Self
Expand Down Expand Up @@ -172,3 +184,27 @@ impl ExporterConfig {
}
}
}

impl ExtensionConfig {
/// Creates a new extension configuration with default channel capacities.
pub fn new<T>(name: T) -> Self
where
T: Into<NodeId>,
{
Self::with_channel_capacity(name, DEFAULT_CONTROL_CHANNEL_CAPACITY)
}

/// Creates a new extension configuration with an explicit control channel capacity.
#[must_use]
pub fn with_channel_capacity<T>(name: T, control_channel_capacity: usize) -> Self
where
T: Into<NodeId>,
{
ExtensionConfig {
name: name.into(),
control_channel: ControlChannelConfig {
capacity: control_channel_capacity,
},
}
}
}
38 changes: 38 additions & 0 deletions rust/otap-dataflow/crates/engine/src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,44 @@ pub enum NodeControlMsg<PData> {
},
}

/// Control messages sent by the pipeline engine to **extensions**.
///
/// This is a PData-free subset of [`NodeControlMsg`] — extensions never process
/// Ack/Nack/DelayedData, so they receive a simpler, non-generic enum.
#[derive(Debug, Clone)]
pub enum ExtensionControlMsg {
/// Notifies the extension of a configuration change.
Config {
/// The new configuration as a JSON value.
config: serde_json::Value,
},

/// Emitted when a scheduled timer expires.
TimerTick {},

/// Signal to collect/flush local telemetry metrics.
CollectTelemetry {
/// Metrics reporter used to collect telemetry metrics.
metrics_reporter: MetricsReporter,
},

/// Requests a graceful shutdown.
Shutdown {
/// Deadline for shutdown.
deadline: Instant,
/// Human-readable reason for the shutdown.
reason: String,
},
}

impl ExtensionControlMsg {
/// Returns `true` if this control message is a shutdown request.
#[must_use]
pub const fn is_shutdown(&self) -> bool {
matches!(self, ExtensionControlMsg::Shutdown { .. })
}
}

/// Control messages sent by nodes to the pipeline engine to manage node-specific operations
/// and control pipeline behavior.
#[derive(Debug, Clone)]
Expand Down
Loading
Loading