diff --git a/src/openarmature/observability/langfuse/observer.py b/src/openarmature/observability/langfuse/observer.py index 520a70d..3654148 100644 --- a/src/openarmature/observability/langfuse/observer.py +++ b/src/openarmature/observability/langfuse/observer.py @@ -23,6 +23,7 @@ from __future__ import annotations import json +import uuid from dataclasses import dataclass, field from typing import TYPE_CHECKING, Any, cast @@ -73,6 +74,12 @@ class _OpenObservation: handle: LangfuseSpanHandle | LangfuseGenerationHandle +def _empty_str_frozenset() -> frozenset[str]: + """Typed empty frozenset factory for ``detached_subgraphs`` / + ``detached_fan_outs`` defaults.""" + return frozenset() + + @dataclass class _InvState: """Per-invocation state, isolated by invocation_id. @@ -87,6 +94,55 @@ class _InvState: default_factory=dict[_StackKey, _OpenObservation] ) open_llm_observations: dict[str, _OpenObservation] = field(default_factory=dict[str, _OpenObservation]) + # Synthetic subgraph dispatch Span observations, keyed by namespace + # prefix. Per spec §8.3 each subgraph wrapper produces a Span + # observation in its parent's Trace; descendant node observations + # parent under it. For a detached subgraph, this dictionary holds + # the dispatch Span observation that lives in the DETACHED Trace + # (so descendants in that subtree parent under it via the detached + # Trace's observation tree); the main Trace carries a separate + # link observation surfacing metadata.detached_child_trace_ids + # that's opened and closed in one shot, not tracked here. + subgraph_observations: dict[tuple[str, ...], _OpenObservation] = field( + default_factory=dict[tuple[str, ...], _OpenObservation] + ) + # Per-instance fan-out dispatch Span observations (non-detached), + # keyed by ``prefix + (str(fan_out_index),)``. Parents under the + # fan-out node's own Span observation; inner-node observations + # parent under this dispatch instead of the shared fan-out node + # span. 
Closed when the fan-out node's completed event fires. + fan_out_instance_observations: dict[tuple[str, ...], _OpenObservation] = field( + default_factory=dict[tuple[str, ...], _OpenObservation] + ) + # Maps a namespace prefix to the detached Langfuse trace_id when + # that subtree is configured detached (per the observer's + # ``detached_subgraphs`` / ``detached_fan_outs`` knobs). The + # presence of a prefix here switches descendant observations onto + # the detached Trace. + detached_traces: dict[tuple[str, ...], str] = field(default_factory=dict[tuple[str, ...], str]) + # Set of detached fan-out instance prefixes + # (``prefix + (str(fan_out_index),)``) — distinguished from + # detached subgraph prefixes because they're closed when the + # fan-out node's completed event fires, not when the namespace + # cursor leaves the subtree. + fan_out_instance_root_prefixes: set[tuple[str, ...]] = field(default_factory=set[tuple[str, ...]]) + # ``parent_node_name`` cache for per-instance attribution + # (spec proposal 0013 v0.10.0 — inner events from inside a + # non-detached fan-out instance don't carry fan_out_config + # themselves; the cache bridges the lookup so the synthetic + # per-instance dispatch observation can attach + # metadata.fan_out_parent_node_name). + fan_out_parent_node_name: dict[tuple[str, ...], str] = field(default_factory=dict[tuple[str, ...], str]) + # Side-cache: accumulator for `metadata.detached_child_trace_ids` + # on the fan-out node observation that spawns detached + # per-instance Traces. Keyed by the fan-out node's namespace + # prefix. Each detached instance appends its trace_id and a + # snapshot of the list is written back, which preserves §8.5's + # string-array shape across multiple instances without re-reading + # metadata from the client (the Protocol doesn't expose a read + # accessor). Detached subgraphs do not use this cache: their + # link observation is created once with a one-element array. 
+ detached_child_trace_ids: dict[tuple[str, ...], list[str]] = field( + default_factory=dict[tuple[str, ...], list[str]] + ) @dataclass @@ -113,6 +169,14 @@ class LangfuseObserver: ``payload_max_bytes`` semantic — emission preserves the raw truncated string when the §5.5.5 marker is present (per §8.7). Default 64 KiB; same minimum (256 bytes) applies. + - ``detached_subgraphs``: set of subgraph wrapper node names that + run in their own Langfuse Trace per §8.5. Each such subgraph + gets a fresh trace_id; the main Trace's dispatch observation + surfaces the link via ``metadata.detached_child_trace_ids``. + - ``detached_fan_outs``: set of fan-out node names whose instances + each get their own Langfuse Trace. Same link mechanism on the + fan-out node observation: each per-instance detached trace_id + lands in the array. The observer reads the spec version from the package at construction time. Safe to share across concurrent invocations @@ -124,6 +188,8 @@ class LangfuseObserver: disable_llm_spans: bool = False disable_llm_payload: bool = True payload_byte_cap: int = 65536 + detached_subgraphs: frozenset[str] = field(default_factory=_empty_str_frozenset) + detached_fan_outs: frozenset[str] = field(default_factory=_empty_str_frozenset) spec_version: str = field(default_factory=_read_spec_version) # Internal state populated during invocation. @@ -177,6 +243,14 @@ def _open_started_observation(self, event: NodeEvent) -> None: self._open_trace(invocation_id, correlation_id, event) inv_state = self._inv_states[invocation_id] + # Cache the fan-out node's parent_node_name from its own + # started event so synthetic per-instance dispatch observations + # can attach metadata.fan_out_parent_node_name (the inner + # events from inside the fan-out don't carry fan_out_config + # themselves; this cache bridges). 
+ if event.fan_out_config is not None and event.fan_out_index is None: + inv_state.fan_out_parent_node_name[event.namespace] = event.fan_out_config.parent_node_name + key = self._key_for(event) if key in inv_state.open_observations: # Idempotent: a second started for the same (namespace, @@ -184,10 +258,16 @@ def _open_started_observation(self, event: NodeEvent) -> None: # the OTel observer's behavior under retry-replay). return + # Synthesize any subgraph dispatch / fan-out per-instance + # dispatch observations the leaf needs as ancestors. Also + # closes dispatch observations whose subtree we've left. + self._sync_subgraph_observations(inv_state, correlation_id, event) + parent_observation_id = self._resolve_parent_observation_id(inv_state, event) metadata = self._observation_metadata(event, correlation_id) + target_trace_id = self._trace_id_for(inv_state, event.namespace, event.fan_out_index) handle = self.client.span( - trace_id=inv_state.trace_id, + trace_id=target_trace_id, name=event.node_name, metadata=metadata, parent_observation_id=parent_observation_id, @@ -203,6 +283,36 @@ def _handle_completed(self, event: NodeEvent) -> None: inv_state = self._inv_states.get(invocation_id) if inv_state is None: return + + # If this is the fan-out node's own completion (event.fan_out_index + # is None) AND the fan-out is configured detached, close any + # detached per-instance Trace dispatch observations the fan-out + # spawned. Done BEFORE the regular pop so the close ordering is + # children-before-parents. + if event.fan_out_index is None and event.namespace and event.namespace[0] in self.detached_fan_outs: + for prefix in list(inv_state.fan_out_instance_root_prefixes): + if len(prefix) > len(event.namespace) and prefix[: len(event.namespace)] == event.namespace: + # Detached per-instance dispatches live in + # fan_out_instance_observations (same map as + # non-detached); close via the matching helper. 
+ self._close_fan_out_instance_dispatch_observation(inv_state, prefix) + inv_state.fan_out_instance_root_prefixes.discard(prefix) + inv_state.detached_traces.pop(prefix, None) + # Per spec proposal 0013 (v0.10.0): when the fan-out node's + # own completion fires, close all per-instance dispatch + # observations synthesized for it. Children-before-parents. + if event.fan_out_index is None and event.fan_out_config is not None: + for prefix in list(inv_state.fan_out_instance_observations.keys()): + if len(prefix) > len(event.namespace) and prefix[: len(event.namespace)] == event.namespace: + self._close_fan_out_instance_dispatch_observation(inv_state, prefix) + inv_state.fan_out_parent_node_name.pop(event.namespace, None) + # Clear the detached-child-trace-ids accumulator for this + # fan-out node — cyclic execution that re-enters the same + # fan-out starts the next iteration with a fresh list + # rather than appending to the previous iteration's + # accumulator and overwriting the prior link metadata. + inv_state.detached_child_trace_ids.pop(event.namespace, None) + key = self._key_for(event) observation = inv_state.open_observations.pop(key, None) if observation is None: @@ -213,6 +323,9 @@ def _handle_completed(self, event: NodeEvent) -> None: observation.handle.end(level="ERROR", status_message=event.error.category) else: observation.handle.end() + # If this was a detached subgraph root prefix, drop the + # detached_traces entry so a subsequent re-entry mints fresh. 
+ inv_state.detached_traces.pop(event.namespace, None) def _open_trace(self, invocation_id: str, correlation_id: str | None, event: NodeEvent) -> None: metadata: dict[str, Any] = { @@ -233,24 +346,32 @@ def _key_for(self, event: NodeEvent) -> _StackKey: return (event.namespace, event.attempt_index, event.fan_out_index) def _resolve_parent_observation_id(self, inv_state: _InvState, event: NodeEvent) -> str | None: - # Walk namespace ancestors longest-prefix-first looking for the - # innermost open observation; fall back to None (Trace becomes - # the parent). The outer loop counts down from len(namespace)-1 - # so the deepest matching ancestor wins. - # - # First-match-by-iteration is approximate when multiple open - # observations share the same namespace prefix at different - # _StackKey slots (multiple retry attempts, multiple fan-out - # instances). In practice retry middleware ends one attempt - # before opening the next, so concurrent same-namespace - # observations only arise under fan-out. Spec §8.3 mandates - # dedicated dispatch Span observations for subgraphs and - # per-instance fan-out spans; those land in a follow-on PR - # alongside dedicated subgraph_observations / - # fan_out_instance_observations maps mirroring the OTel - # observer's structure. Until then this resolver covers the - # linear-graph and basic-LLM cases the v0.23.0 conformance - # fixtures exercise. + # Parent precedence (innermost wins): + # 1. Per-instance fan-out dispatch observation at + # namespace[:1] + (str(fan_out_index),) — both detached + # (where the dispatch observation lives in the detached + # Trace) and non-detached (where it lives in the main + # Trace) cases route here when event is inside a fan-out + # instance. + # 2. Subgraph dispatch observation at any matching ancestor + # prefix, walked longest-first. + # 3. Leaf node observation at any matching ancestor prefix, + # walked longest-first. + # 4. None — the Trace itself becomes the implicit parent. 
+ if event.fan_out_index is not None and event.namespace: + instance_key = event.namespace[:1] + (str(event.fan_out_index),) + dispatch = inv_state.fan_out_instance_observations.get(instance_key) + if dispatch is not None: + return dispatch.handle.id + for prefix_len in range(len(event.namespace) - 1, 0, -1): + prefix = event.namespace[:prefix_len] + sg = inv_state.subgraph_observations.get(prefix) + if sg is not None: + return sg.handle.id + # Open leaf-node observation fallback. The outer loop already + # walks longest-first; the inner scan picks the first matching + # open observation, which is fine for the cases dispatch + # synthesis didn't cover (no subgraph wrapping the namespace). for prefix_len in range(len(event.namespace) - 1, 0, -1): prefix = event.namespace[:prefix_len] for key, observation in inv_state.open_observations.items(): @@ -258,6 +379,377 @@ def _resolve_parent_observation_id(self, inv_state: _InvState, event: NodeEvent) return observation.handle.id return None + def _trace_id_for( + self, + inv_state: _InvState, + namespace: tuple[str, ...], + fan_out_index: int | None, + ) -> str: + # Walk ancestor prefixes longest-first to find the innermost + # detached Trace mapping; fall back to the main invocation + # Trace. Detached fan-out instance Traces are keyed by + # ``namespace[:1] + (str(fan_out_index),)`` so check that + # specific composite first. 
+ if fan_out_index is not None and namespace: + instance_key = namespace[:1] + (str(fan_out_index),) + if instance_key in inv_state.detached_traces: + return inv_state.detached_traces[instance_key] + for prefix_len in range(len(namespace), 0, -1): + prefix = namespace[:prefix_len] + if prefix in inv_state.detached_traces: + return inv_state.detached_traces[prefix] + return inv_state.trace_id + + def _sync_subgraph_observations( + self, + inv_state: _InvState, + correlation_id: str | None, + event: NodeEvent, + ) -> None: + # Open synthetic subgraph dispatch / fan-out per-instance + # dispatch observations for any ancestor prefix of this + # event's namespace that doesn't have one yet. Also closes + # subgraph dispatch observations whose subtree we've left. + # + # Called BEFORE opening the leaf observation, so descendants + # find the right parent via _resolve_parent_observation_id. + namespace = event.namespace + # 1. Close subgraph dispatch observations whose prefix is no + # longer an ancestor of the current namespace. + for prefix in list(inv_state.subgraph_observations.keys()): + if prefix in inv_state.fan_out_instance_root_prefixes: + # Detached fan-out instance dispatches close with the + # fan-out's completed event, not on namespace moves. + continue + if not (len(prefix) < len(namespace) and namespace[: len(prefix)] == prefix): + self._close_subgraph_observation(inv_state, prefix) + inv_state.detached_traces.pop(prefix, None) + # 2. Open ancestor dispatch observations for prefixes that + # don't have one yet. + for depth in range(1, len(namespace)): + prefix = namespace[:depth] + if prefix in inv_state.subgraph_observations: + continue + # Non-detached per-instance dispatch for the current + # event's own fan-out instance gets opened below; skip + # the regular subgraph path here so we don't double-open. 
+ if ( + depth == 1 + and event.fan_out_index is not None + and (prefix + (str(event.fan_out_index),)) in inv_state.fan_out_instance_observations + ): + continue + # Detached subgraph: the first segment matches a + # configured detached_subgraphs name → mint a fresh + # detached Trace + open the dispatch observation in it. + if depth == 1 and prefix[0] in self.detached_subgraphs: + self._open_detached_subgraph_trace(inv_state, correlation_id, prefix) + continue + # Detached fan-out: the fan-out instance gets its own + # Trace per spec §8.5. The fan-out node's Span observation + # in the parent Trace already exists (opened on the + # fan-out node's started event); the detached dispatch + # observation goes into the new Trace. + if depth == 1 and event.fan_out_index is not None and prefix[0] in self.detached_fan_outs: + self._open_detached_fan_out_instance_trace(inv_state, correlation_id, prefix, event) + continue + # Non-detached fan-out: synthesize per-instance dispatch + # observation under the fan-out node observation (proposal + # 0013 v0.10.0). Only triggers when the inner event is + # inside a fan-out instance AND the fan-out node's + # parent_node_name has been cached (i.e., the fan-out + # node's own started event was seen). + if ( + depth == 1 + and event.fan_out_index is not None + and prefix[0] not in self.detached_fan_outs + and prefix in inv_state.fan_out_parent_node_name + ): + self._open_fan_out_instance_dispatch_observation(inv_state, correlation_id, prefix, event) + continue + # Plain non-detached subgraph dispatch. + self._open_subgraph_observation(inv_state, correlation_id, prefix) + + def _open_subgraph_observation( + self, + inv_state: _InvState, + correlation_id: str | None, + prefix: tuple[str, ...], + ) -> None: + # Parent is the nearest enclosing subgraph dispatch (if any), + # else None (the Trace is the implicit parent for top-level + # subgraphs). 
+ parent_observation_id: str | None = None + for plen in range(len(prefix) - 1, 0, -1): + outer = prefix[:plen] + sg = inv_state.subgraph_observations.get(outer) + if sg is not None: + parent_observation_id = sg.handle.id + break + metadata: dict[str, Any] = {"subgraph_name": prefix[-1]} + if correlation_id is not None: + metadata["correlation_id"] = correlation_id + handle = self.client.span( + trace_id=inv_state.trace_id, + name=prefix[-1], + metadata=metadata, + parent_observation_id=parent_observation_id, + ) + inv_state.subgraph_observations[prefix] = _OpenObservation(handle=handle) + + def _open_fan_out_instance_dispatch_observation( + self, + inv_state: _InvState, + correlation_id: str | None, + prefix: tuple[str, ...], + event: NodeEvent, + ) -> None: + # Non-detached per-instance dispatch lives in the parent + # Trace under the fan-out node's own Span observation. + fan_out_open = self._find_fan_out_node_observation(inv_state, prefix) + parent_observation_id = fan_out_open.handle.id if fan_out_open is not None else None + parent_node_name = inv_state.fan_out_parent_node_name.get(prefix, prefix[-1]) + metadata: dict[str, Any] = { + "fan_out_parent_node_name": parent_node_name, + "fan_out_index": event.fan_out_index, + } + if correlation_id is not None: + metadata["correlation_id"] = correlation_id + handle = self.client.span( + trace_id=inv_state.trace_id, + name=prefix[-1], + metadata=metadata, + parent_observation_id=parent_observation_id, + ) + instance_key = prefix + (str(event.fan_out_index),) + inv_state.fan_out_instance_observations[instance_key] = _OpenObservation(handle=handle) + + def _open_detached_subgraph_trace( + self, + inv_state: _InvState, + correlation_id: str | None, + prefix: tuple[str, ...], + ) -> None: + # Mint a fresh Trace for the detached subtree. 
The main Trace's + # dispatch observation surfaces the link via + # metadata.detached_child_trace_ids; the detached Trace gets + # its own dispatch observation that descendants parent under. + # + # Asymmetry note vs. _open_detached_fan_out_instance_trace: + # subgraphs are namespace-prefix-only constructs with no + # per-subgraph node event of their own. The observer never + # opens a leaf Span observation for the subgraph itself, only + # synthesized dispatch observations. To carry the cross-Trace + # link in the main Trace's shape, this helper opens an extra + # "link" Span observation in the main Trace — a small + # observation whose subtree is empty but whose + # detached_child_trace_ids metadata points at the new Trace. + # Dashboard users see two observations named ``prefix[-1]``: + # one in the main Trace (link with link metadata, no subtree) + # and one in the detached Trace (the real dispatch with the + # subgraph subtree under it). + # + # Detached fan-out instances, by contrast, already have a + # parent observation in the main Trace (the fan-out node's + # leaf observation opened on its own started event). The + # link metadata accumulates on that pre-existing observation + # instead of synthesizing a separate link observation. + detached_trace_id = str(uuid.uuid4()) + # Open the link observation in the main Trace with its link + # metadata set at creation — the array form preserves §8.5's + # "string array, one entry per detached child" shape even + # though this link carries exactly one entry (the observation + # is ended right after it is opened, so nothing appends later). 
+ link_metadata: dict[str, Any] = { + "subgraph_name": prefix[-1], + "detached_child_trace_ids": [detached_trace_id], + } + if correlation_id is not None: + link_metadata["correlation_id"] = correlation_id + parent_observation_id: str | None = None + for plen in range(len(prefix) - 1, 0, -1): + outer = prefix[:plen] + sg = inv_state.subgraph_observations.get(outer) + if sg is not None: + parent_observation_id = sg.handle.id + break + # Zero-duration link observation in the main Trace — it + # exists only to surface the cross-Trace reference via + # metadata.detached_child_trace_ids; close it immediately so + # nothing perceives it as in-flight. Mirrors the OTel + # observer's synthetic-event zero-duration spans. + link_handle = self.client.span( + trace_id=inv_state.trace_id, + name=prefix[-1], + metadata=link_metadata, + parent_observation_id=parent_observation_id, + ) + link_handle.end() + # Open the detached Trace + the dispatch observation that + # subtree descendants parent under. + detached_metadata: dict[str, Any] = {"detached_from_invocation_id": inv_state.trace_id} + if correlation_id is not None: + detached_metadata["correlation_id"] = correlation_id + self.client.trace(id=detached_trace_id, name=prefix[-1], metadata=detached_metadata) + dispatch_metadata: dict[str, Any] = { + "subgraph_name": prefix[-1], + "detached": True, + } + if correlation_id is not None: + dispatch_metadata["correlation_id"] = correlation_id + handle = self.client.span( + trace_id=detached_trace_id, + name=prefix[-1], + metadata=dispatch_metadata, + parent_observation_id=None, + ) + inv_state.subgraph_observations[prefix] = _OpenObservation(handle=handle) + inv_state.detached_traces[prefix] = detached_trace_id + + def _open_detached_fan_out_instance_trace( + self, + inv_state: _InvState, + correlation_id: str | None, + prefix: tuple[str, ...], + event: NodeEvent, + ) -> None: + # Mint a fresh Trace per instance. 
The fan-out node's own + # Span observation in the parent Trace accumulates the + # detached_child_trace_ids array (one entry per instance); + # each detached Trace gets its own per-instance dispatch + # observation that inner-node observations parent under. + # + # See _open_detached_subgraph_trace's docstring for why the + # detached-fan-out path doesn't synthesize a separate "link" + # observation in the main Trace: the fan-out node already + # has a leaf observation there (opened on its started event), + # so the link metadata accumulates on that existing + # observation rather than on a parallel link observation. + detached_trace_id = str(uuid.uuid4()) + # Accumulate the per-fan-out link-ids list via the side cache + # so each new instance appends to the array on the fan-out + # node's observation rather than overwriting the previous + # instance's entry. + ids_list = inv_state.detached_child_trace_ids.setdefault(prefix, []) + ids_list.append(detached_trace_id) + fan_out_open = self._find_fan_out_node_observation(inv_state, prefix) + if fan_out_open is not None: + link_metadata: dict[str, Any] = { + "detached_child_trace_ids": list(ids_list), + } + if correlation_id is not None: + link_metadata["correlation_id"] = correlation_id + fan_out_open.handle.update(metadata=link_metadata) + # Open the detached Trace + per-instance dispatch observation. 
+ detached_metadata: dict[str, Any] = { + "detached_from_invocation_id": inv_state.trace_id, + "fan_out_index": event.fan_out_index, + } + if correlation_id is not None: + detached_metadata["correlation_id"] = correlation_id + self.client.trace( + id=detached_trace_id, + name=prefix[-1], + metadata=detached_metadata, + ) + parent_node_name = inv_state.fan_out_parent_node_name.get(prefix, prefix[-1]) + dispatch_metadata: dict[str, Any] = { + "fan_out_parent_node_name": parent_node_name, + "fan_out_index": event.fan_out_index, + "detached": True, + } + if correlation_id is not None: + dispatch_metadata["correlation_id"] = correlation_id + handle = self.client.span( + trace_id=detached_trace_id, + name=prefix[-1], + metadata=dispatch_metadata, + parent_observation_id=None, + ) + instance_key = prefix + (str(event.fan_out_index),) + inv_state.fan_out_instance_observations[instance_key] = _OpenObservation(handle=handle) + inv_state.detached_traces[instance_key] = detached_trace_id + inv_state.fan_out_instance_root_prefixes.add(instance_key) + + def _close_subgraph_observation(self, inv_state: _InvState, prefix: tuple[str, ...]) -> None: + observation = inv_state.subgraph_observations.pop(prefix, None) + if observation is None: + return + observation.handle.end() + + def _close_fan_out_instance_dispatch_observation( + self, inv_state: _InvState, prefix: tuple[str, ...] + ) -> None: + observation = inv_state.fan_out_instance_observations.pop(prefix, None) + if observation is None: + return + observation.handle.end() + + def _find_fan_out_node_observation( + self, inv_state: _InvState, prefix: tuple[str, ...] + ) -> _OpenObservation | None: + # Find the fan-out node's open leaf observation at the given + # prefix. Retry middleware wrapping a fan-out bumps the + # attempt_index; this scans for any entry at ``prefix`` with + # ``fan_out_index is None``. Only one such entry is open at a + # time (retry opens and closes within an attempt's lifecycle). 
+ for key, observation in inv_state.open_observations.items(): + if key[0] == prefix and key[2] is None: + return observation + return None + + # ------------------------------------------------------------------ + # Lifecycle: close_invocation / shutdown + # ------------------------------------------------------------------ + + def close_invocation(self, invocation_id: str) -> None: + """Drain still-open observations for ``invocation_id``. + + Synthetic dispatch observations only close on cursor-move when + a subsequent event arrives with a different namespace prefix. + For a subgraph or fan-out that's the last subtree of an + invocation, no follow-up event triggers the close — this + method walks the per-invocation state and ends anything left + in child→parent order so the Langfuse-side observations don't + stay perpetually in-flight. + + Idempotent: calling twice (or for an invocation_id with no + open state) is a no-op. + """ + inv_state = self._inv_states.pop(invocation_id, None) + if inv_state is None: + return + # Order: deepest leaves first so parents see all children + # closed before they end. LLM observations → leaf nodes + # (sorted deepest-first by namespace length) → per-instance + # fan-out dispatches → subgraph dispatches. + for call_id in list(inv_state.open_llm_observations.keys()): + obs = inv_state.open_llm_observations.pop(call_id, None) + if obs is not None: + obs.handle.end() + for key in sorted( + inv_state.open_observations.keys(), + key=lambda k: -len(k[0]), + ): + obs = inv_state.open_observations.pop(key, None) + if obs is not None: + obs.handle.end() + for prefix in list(inv_state.fan_out_instance_observations.keys()): + self._close_fan_out_instance_dispatch_observation(inv_state, prefix) + for prefix in sorted( + inv_state.subgraph_observations.keys(), + key=lambda p: -len(p), + ): + self._close_subgraph_observation(inv_state, prefix) + + def shutdown(self) -> None: + """Drain every in-flight invocation. 
Use for long-lived + observers shared across requests; CLI / one-shot processes + typically call this from a ``finally`` block alongside + ``compiled.drain()``. + """ + for invocation_id in list(self._inv_states.keys()): + self.close_invocation(invocation_id) + def _observation_metadata(self, event: NodeEvent, correlation_id: str | None) -> dict[str, Any]: # §8.4.2 observation-level mapping. Fields below mirror the # OTel observer's _node_attrs() output, renamed for Langfuse's @@ -314,8 +806,11 @@ def _handle_llm_event(self, event: NodeEvent) -> None: metadata, model_parameters, input_value, output_value = self._llm_metadata_and_payload( payload, correlation_id, phase="started" ) + target_trace_id = self._trace_id_for( + inv_state, payload.calling_namespace_prefix, payload.calling_fan_out_index + ) handle = self.client.generation( - trace_id=inv_state.trace_id, + trace_id=target_trace_id, name="openarmature.llm.complete", model=payload.model, model_parameters=model_parameters, @@ -353,9 +848,16 @@ def _resolve_llm_parent_observation_id( self, inv_state: _InvState, payload: LlmEventPayload ) -> str | None: # Calling-node identity comes from the payload (set at - # dispatch time per llm-provider §5.5). Resolve the calling - # node's open observation; fall back to None (Trace parent) - # if not found. + # dispatch time per llm-provider §5.5). Precedence: + # 1. Exact-match leaf node at the calling key. + # 2. Per-instance fan-out dispatch observation when the + # call originated inside a fan-out instance. + # 3. Subgraph dispatch observations along the calling + # namespace prefix, walked longest-prefix-first. + # 4. None — Trace becomes the implicit parent. + # The dispatch fallbacks cover the wrapped-call cases the + # exact-match miss would otherwise need a leaf-ancestor walk + # to handle. 
key: _StackKey = ( payload.calling_namespace_prefix, payload.calling_attempt_index, @@ -364,6 +866,18 @@ def _resolve_llm_parent_observation_id( observation = inv_state.open_observations.get(key) if observation is not None: return observation.handle.id + # Per-instance fan-out dispatch. + if payload.calling_fan_out_index is not None and payload.calling_namespace_prefix: + instance_key = payload.calling_namespace_prefix[:1] + (str(payload.calling_fan_out_index),) + dispatch = inv_state.fan_out_instance_observations.get(instance_key) + if dispatch is not None: + return dispatch.handle.id + # Subgraph dispatch, longest-prefix-first. + for prefix_len in range(len(payload.calling_namespace_prefix), 0, -1): + prefix = payload.calling_namespace_prefix[:prefix_len] + sg = inv_state.subgraph_observations.get(prefix) + if sg is not None: + return sg.handle.id return None def _llm_metadata_and_payload( diff --git a/tests/unit/test_observability_langfuse.py b/tests/unit/test_observability_langfuse.py index 57c7f8a..7553001 100644 --- a/tests/unit/test_observability_langfuse.py +++ b/tests/unit/test_observability_langfuse.py @@ -4,16 +4,24 @@ exercises the end-to-end Trace + Observation shape against spec/observability/conformance/022-024. These unit tests fill gaps those fixtures don't isolate directly: payload-cap validation, -truncation algorithm boundaries, in-memory recorder field handling. +truncation algorithm boundaries, in-memory recorder field handling, +and the synthetic-dispatch-observation paths (subgraph, fan-out +non-detached, detached subgraph, detached fan-out) that no Langfuse +spec fixture exercises today. 
""" from __future__ import annotations +from typing import Annotated, Any, cast + import pytest +from openarmature.graph import END, GraphBuilder, State, append from openarmature.observability.langfuse import ( InMemoryLangfuseClient, + LangfuseObservation, LangfuseObserver, + LangfuseTrace, LangfuseUsage, ) @@ -114,3 +122,229 @@ def test_in_memory_recorder_children_of_walks_parent_links() -> None: assert [o.name for o in root_children] == ["child"] # Unrelated observation not under root. assert child.id != other.id + + +# --------------------------------------------------------------------------- +# Dispatch synthesis (PR 3.5) — subgraph, fan-out non-detached, detached +# --------------------------------------------------------------------------- +# The Langfuse mapping has no spec fixtures for subgraph dispatch / +# fan-out per-instance / detached-trace mode (spec proposal 0031's +# 022-024 only exercise linear graphs + LLM + prompt linkage). These +# tests pin the synthesis-helper behavior locally so future changes +# don't silently break parenting under composition. 
class _S(State):
    """Parent-graph state shared by the dispatch-synthesis tests.

    Both fields are annotated with the ``append`` reducer imported from
    ``openarmature.graph``.
    """

    # Receives the single-entry lists returned by ``_record``.
    trail: Annotated[list[str], append] = []
    # ``target_field`` of the fan-out nodes built by ``_fan_out_parent``.
    worker_results: Annotated[list[str], append] = []


class _WorkerState(State):
    """State of the single-node worker subgraph used by the fan-out tests."""

    # ``collect_field`` of the fan-out nodes built by ``_fan_out_parent``.
    result: str = ""


# Metadata key these tests use to tell a detached Trace from the main one.
_DETACHED_MARKER = "detached_from_invocation_id"


async def _record(name: str) -> Any:
    """Return a partial state update whose ``trail`` holds only ``name``."""
    return {"trail": [name]}


async def _worker(_s: _WorkerState) -> Any:
    """Worker node body: report a constant ``result`` of ``"done"``."""
    return {"result": "done"}


def _attach(graph: Any) -> tuple[Any, InMemoryLangfuseClient, LangfuseObserver]:
    """Attach a new observer, backed by a new in-memory client, to ``graph``.

    The observer is constructed with only ``client=``; no detached knobs
    are passed.

    Returns:
        ``(graph, client, observer)`` so a test can invoke the graph and
        then inspect what the client recorded.
    """
    client = InMemoryLangfuseClient()
    observer = LangfuseObserver(client=client)
    graph.attach_observer(observer)
    return graph, client, observer


def _attach_with_detached(
    graph: Any,
    *,
    detached_subgraphs: frozenset[str] = frozenset(),
    detached_fan_outs: frozenset[str] = frozenset(),
) -> tuple[Any, InMemoryLangfuseClient, LangfuseObserver]:
    """Like ``_attach``, but forward the detached-mode knobs to the observer.

    Args:
        graph: Compiled graph to observe.
        detached_subgraphs: Passed through as the observer's
            ``detached_subgraphs`` argument.
        detached_fan_outs: Passed through as the observer's
            ``detached_fan_outs`` argument.

    Returns:
        ``(graph, client, observer)``.
    """
    client = InMemoryLangfuseClient()
    observer = LangfuseObserver(
        client=client,
        detached_subgraphs=detached_subgraphs,
        detached_fan_outs=detached_fan_outs,
    )
    graph.attach_observer(observer)
    return graph, client, observer


def _find_observation(trace: LangfuseTrace, name: str) -> LangfuseObservation:
    """Return the first observation in ``trace`` whose name equals ``name``.

    Raises:
        AssertionError: If no observation in the trace has that name.
    """
    for obs in trace.observations:
        if obs.name == name:
            return obs
    raise AssertionError(f"observation {name!r} not in trace {trace.id!r}")


def _first_trace(client: InMemoryLangfuseClient) -> LangfuseTrace:
    """Return the first Trace recorded by ``client``.

    A bare ``next(iter(...))`` would raise ``StopIteration`` on an empty
    recorder, which a coroutine reports as an opaque ``RuntimeError``;
    asserting here keeps the failure readable.
    """
    traces = list(client.traces.values())
    assert traces, "no Langfuse trace was recorded"
    return traces[0]


def _split_traces(
    client: InMemoryLangfuseClient,
) -> tuple[LangfuseTrace, list[LangfuseTrace]]:
    """Partition the recorded Traces into ``(main, detached)``.

    A Trace is treated as detached when its metadata contains the
    ``detached_from_invocation_id`` key. ``main`` is the first Trace
    without that key; ``detached`` lists every Trace with it, in
    recording order.

    Raises:
        AssertionError: If every recorded Trace carries the detached key.
    """
    main_traces: list[LangfuseTrace] = []
    detached_traces: list[LangfuseTrace] = []
    for trace in client.traces.values():
        if _DETACHED_MARKER in trace.metadata:
            detached_traces.append(trace)
        else:
            main_traces.append(trace)
    assert main_traces, "no main (non-detached) trace was recorded"
    return main_traces[0], detached_traces


def _subgraph_parent() -> Any:
    """Compile a parent graph whose only node ``sub`` wraps a one-node subgraph.

    The inner graph has the single node ``inner_a``, which records its
    own name via ``_record``.
    """
    inner = (
        GraphBuilder(_S)
        .add_node("inner_a", lambda _s: _record("inner_a"))
        .add_edge("inner_a", END)
        .set_entry("inner_a")
        .compile()
    )
    return (
        GraphBuilder(_S)
        .add_subgraph_node("sub", inner)
        .add_edge("sub", END)
        .set_entry("sub")
        .compile()
    )


def _fan_out_parent(count: int) -> Any:
    """Compile a parent graph whose only node ``fan`` fans out ``count`` workers.

    Each instance runs a one-node ``worker`` subgraph; its ``result``
    field is collected into the parent's ``worker_results``.
    """
    inner = (
        GraphBuilder(_WorkerState)
        .add_node("worker", _worker)
        .add_edge("worker", END)
        .set_entry("worker")
        .compile()
    )
    return (
        GraphBuilder(_S)
        .add_fan_out_node(
            "fan",
            subgraph=inner,
            count=count,
            collect_field="result",
            target_field="worker_results",
        )
        .add_edge("fan", END)
        .set_entry("fan")
        .compile()
    )


async def test_subgraph_dispatch_observation_parents_inner_node() -> None:
    """A non-detached subgraph's inner node parents under the ``sub`` dispatch."""
    graph, client, _ = _attach(_subgraph_parent())

    await graph.invoke(_S())
    await graph.drain()

    trace = _first_trace(client)
    sub_dispatch = _find_observation(trace, "sub")
    inner_node = _find_observation(trace, "inner_a")
    # inner_a must parent under the synthesized subgraph dispatch
    # observation, not directly under the Trace.
    assert inner_node.parent_observation_id == sub_dispatch.id
    # The subgraph dispatch lives at the top level of the Trace.
    assert sub_dispatch.parent_observation_id is None


async def test_fan_out_non_detached_per_instance_dispatch() -> None:
    """Each non-detached fan-out instance gets its own dispatch observation."""
    graph, client, _ = _attach(_fan_out_parent(2))

    await graph.invoke(_S())
    await graph.drain()

    trace = _first_trace(client)
    fan_node = _find_observation(trace, "fan")
    # Per-instance dispatch observations share the fan-out node name.
    dispatches = [
        o
        for o in trace.observations
        if o.name == "fan" and o.parent_observation_id == fan_node.id
    ]
    assert len(dispatches) == 2, f"expected 2 per-instance dispatches, got {len(dispatches)}"
    # Each per-instance dispatch carries the fan_out_index in metadata.
    indices = {d.metadata.get("fan_out_index") for d in dispatches}
    assert indices == {0, 1}
    # Worker observations parent under their per-instance dispatch.
    workers = [o for o in trace.observations if o.name == "worker"]
    assert len(workers) == 2
    worker_parents = {w.parent_observation_id for w in workers}
    dispatch_ids = {d.id for d in dispatches}
    assert worker_parents == dispatch_ids


async def test_detached_subgraph_opens_separate_trace() -> None:
    """A detached subgraph records into its own Trace, linked from the main one."""
    graph, client, _ = _attach_with_detached(
        _subgraph_parent(), detached_subgraphs=frozenset({"sub"})
    )

    await graph.invoke(_S())
    await graph.drain()

    # Two Traces: main invocation + detached subgraph.
    assert len(client.traces) == 2
    main, detached_traces = _split_traces(client)
    assert detached_traces, "no detached trace was recorded"
    detached = detached_traces[0]

    # Main Trace has the link observation with detached_child_trace_ids.
    link_obs = _find_observation(main, "sub")
    assert detached.id in link_obs.metadata["detached_child_trace_ids"]
    # Detached Trace has its own dispatch observation + inner_a under it.
    detached_dispatch = _find_observation(detached, "sub")
    assert detached_dispatch.parent_observation_id is None
    inner_node = _find_observation(detached, "inner_a")
    assert inner_node.parent_observation_id == detached_dispatch.id


async def test_detached_fan_out_each_instance_gets_trace() -> None:
    """A detached fan-out opens one Trace per instance and links them all."""
    graph, client, _ = _attach_with_detached(
        _fan_out_parent(3), detached_fan_outs=frozenset({"fan"})
    )

    await graph.invoke(_S())
    await graph.drain()

    # Main Trace + one detached Trace per instance.
    assert len(client.traces) == 1 + 3
    main, detached_traces = _split_traces(client)
    assert len(detached_traces) == 3

    fan_node = _find_observation(main, "fan")
    # The fan-out node's metadata accumulates all 3 detached trace ids.
    link_ids = fan_node.metadata.get("detached_child_trace_ids")
    assert isinstance(link_ids, list)
    assert set(cast(list[str], link_ids)) == {t.id for t in detached_traces}

    # Each detached Trace has its own per-instance dispatch with a
    # worker observation under it.
    for t in detached_traces:
        dispatch = _find_observation(t, "fan")
        worker = _find_observation(t, "worker")
        assert worker.parent_observation_id == dispatch.id


async def test_subgraph_dispatch_observation_ended_on_invocation_close() -> None:
    """After ``shutdown()``, every recorded observation is marked ended."""
    # Synthetic dispatch observations close on cursor-move; without
    # the close_invocation drain a subgraph at the tail of an
    # invocation would leave its dispatch in-flight forever. Verifies
    # the drain path ends everything.
    graph, client, observer = _attach(_subgraph_parent())

    await graph.invoke(_S())
    await graph.drain()
    # Without explicit close_invocation, the sub dispatch would still
    # be in-flight (ended=False). Call shutdown() to drain.
    observer.shutdown()

    trace = _first_trace(client)
    # Guard against a vacuous pass: the loop below asserts nothing when
    # the trace holds no observations.
    assert trace.observations, "trace recorded no observations"
    for obs in trace.observations:
        assert obs.ended, f"observation {obs.name!r} not ended after shutdown()"