feat(cloud): relay telemetry over the trickle events channel #1040
emranemran wants to merge 10 commits into
Pull request overview
This PR ensures telemetry emitted on the cloud runner (which lacks Kafka credentials) is no longer silently dropped by relaying those events over the existing trickle events channel to the local SDK client, which becomes the single telemetry egress point (Kafka/HTTP/log).
Changes:
- Introduces a `TelemetrySink` seam and "stamp-once" `build_event_envelope()` so events can be forwarded verbatim and published downstream without re-stamping.
- Adds runner-side forwarding via `TrickleEventsSink` and client-side dispatch for `type=="telemetry"` to republish through the configured egress sink.
- Adds an HTTP-based `MetricsReporter` sink with buffering/backoff/disk-resume and installs the default egress sink at server startup / Livepeer connect.
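To make the "stamp-once" idea concrete, here is a minimal sketch of a sink protocol and an envelope builder. It is not the PR's code: the `emit` method name, the envelope fields other than `id` and `timestamp`, and the `ListSink`/`RelaySink` helpers are assumptions made for illustration.

```python
import time
import uuid
from typing import Any, Protocol


class TelemetrySink(Protocol):
    """Anything that can accept an already-built event envelope (assumed shape)."""

    def emit(self, envelope: dict[str, Any]) -> None: ...


def build_event_envelope(event_type: str, data: dict[str, Any]) -> dict[str, Any]:
    """Stamp id and timestamp exactly once, at the point of origin."""
    return {
        "id": str(uuid.uuid4()),
        "timestamp": time.time(),
        "type": event_type,  # field name is an assumption
        "data": data,        # field name is an assumption
    }


class ListSink:
    """Hypothetical egress sink that only records what it receives."""

    def __init__(self) -> None:
        self.received: list[dict[str, Any]] = []

    def emit(self, envelope: dict[str, Any]) -> None:
        self.received.append(envelope)


class RelaySink:
    """Hypothetical relay: forwards envelopes verbatim, never re-stamps them."""

    def __init__(self, downstream: TelemetrySink) -> None:
        self._downstream = downstream

    def emit(self, envelope: dict[str, Any]) -> None:
        self._downstream.emit(envelope)


egress = ListSink()
runner_sink = RelaySink(egress)
original = build_event_envelope("stream_heartbeat", {"vram_allocated_mb": 1024})
runner_sink.emit(original)
```

Because the relay never calls the builder again, the `id` and `timestamp` seen at the egress are the ones assigned where the event originated.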
Reviewed changes
Copilot reviewed 11 out of 12 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| tests/test_metrics_reporter.py | Adds unit tests for MetricsReporter behavior and sink conformance. |
| src/scope/server/metrics_reporter.py | New buffered HTTP metrics reporter implementing the sink protocol. |
| src/scope/server/livepeer.py | Installs the default telemetry egress sink on Livepeer connect. |
| src/scope/server/livepeer_client.py | Handles incoming telemetry events and republishes via publish_raw_event(). |
| src/scope/server/kafka_publisher.py | Adds TelemetrySink, envelope stamping, raw/prebuilt publishing, and sink installation logic. |
| src/scope/server/frame_processor.py | Includes VRAM stats in stream_heartbeat metadata. |
| src/scope/server/app.py | Starts MetricsReporter, installs default telemetry egress sink at startup, and shuts it down cleanly. |
| src/scope/cloud/trickle_events_sink.py | New runner-side sink forwarding telemetry over the trickle events channel. |
| src/scope/cloud/livepeer_app.py | Installs/tears down the runner telemetry sink; adds runner cold-start + media-loop error telemetry. |
| docs/telemetry-over-trickle.html | New documentation describing telemetry relay over trickle. |
| docs/metrics-report-architecture.html | New documentation describing end-to-end telemetry egress via /v1/metrics. |
| <div class="note"> | ||
| The Kafka code was never deprecated or removed — it's live, gated behind | ||
| <code>KAFKA_BOOTSTRAP_SERVERS</code>. This change is a | ||
| <strong>redirect</strong>: route the runner's telemetry over the trickle | ||
| events channel so the client (which <em>does</em> have egress) publishes | ||
| it, instead of relying on a broker the runner can't reach. | ||
| </p> |
| <h1>Metrics Reporting Architecture</h1> | ||
| <p>How Daydream Scope reports telemetry from the cloud runner, through the local SDK client, to the remote <code>api.daydream.live/v1/metrics</code> endpoint for publishing into Kafka.</p> |
| <h2>4. MetricsReporter (HTTP Egress)</h2> | ||
| <p>The <code>MetricsReporter</code> implements the <code>TelemetrySink</code> protocol and buffers events for reliable delivery to <code>POST /v1/metrics</code>:</p> | ||
|
|
||
| <pre><code>POST https://api.daydream.live/v1/metrics |
| assert reporter._paused is True | ||
| assert reporter.buffered_count == 1 | ||
|
|
||
| @pytest.mark.anyio |
leszko left a comment
Added some comments. In general, my understanding is that this PR is doing too much. For example:
- It changes some unrelated things like `frame_processor.py`, or even `app.py`. The best would be to isolate it to only `livepeer_app.py`, because we want to send metrics only for cloud usage.
- Some of the code seems unused.
- There are some docs added, but I'm not sure I understand who will read them.
Also, it'd be nice to clean up the PR description so that it explains the intention of this PR well.
Do we need all these docs? To me it sounds like the metrics stuff is kind-of internal to Scope Cloud, not really something that people will want to read about. So, I'd suggest removing these 3 files.
Also: if this needs docs, I am not sure why they are in HTML when everything else is in Markdown.
| raise | ||
| except Exception as exc: | ||
| logger.error("Media audio output loop failed: %s", exc) | ||
| _publish_media_loop_error(session, "media_audio_output_loop_failed", exc) |
Do we publish these metrics only for users who opted in to sending analytics?
| def buffered_count(self) -> int: | ||
| return len(self._buffer) | ||
|
|
||
| def update_api_key(self, api_key: str) -> None: |
Where is this one used? Also, please check whether the other functions are used anywhere.
|
|
||
| # Initialize metrics reporter (forwards telemetry to Daydream /v1/metrics) | ||
| if is_metrics_reporter_enabled(): | ||
| api_key = os.getenv("SCOPE_CLOUD_API_KEY", "") |
What's that SCOPE_CLOUD_API_KEY env variable? Is it set only in the cloud?
## Summary

- Lowers `MAX_CONTROL_EVENT_BYTES` (cloud) and `MAX_EVENT_BYTES` (server) from 128 MiB to 5 MiB.
- Follow-up to #1035, which raised the cap from the 1 MiB library default. 128 MiB is much larger than any legitimate control-channel event needs; 5 MiB still accommodates workflow imports with embedded assets while bounding memory if a malformed or runaway producer streams unterminated JSONL.

## Test plan

- [ ] Verify a workflow import with embedded assets still succeeds end-to-end
- [ ] Confirm trickle control channel does not tear down mid-import under normal use

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Signed-off-by: Rafał Leszko <rafal@livepeer.org>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
## Release v0.2.5

This PR bumps the version from `0.2.4` to `0.2.5` across all packages.

### Changed files

- `pyproject.toml`
- `frontend/package.json`
- `app/package.json`
- `uv.lock`
- `frontend/package-lock.json`
- `app/package-lock.json`

### What happens on merge

1. The `tag-on-merge` workflow will automatically create and push tag `v0.2.5`
2. The tag will trigger the Electron build and Docker build workflows

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: leszko <2834997+leszko@users.noreply.github.com>
In Livepeer/trickle cloud mode the GPU runner executes the full embedded
Scope app, so every publish_event() call fires on the runner -- but the
runner has no Kafka credentials, so lifecycle/metric/error events were
silently dropped. This routes runner telemetry over the existing trickle
events channel; the local SDK client receives it and is the single egress
point that publishes to Kafka (or another backend).
High-level changes:
- kafka_publisher: add a TelemetrySink abstraction. publish_event() builds
the envelope once and hands it to the active sink (KafkaSink / LogSink /
TrickleEventsSink); falls back to the legacy direct-Kafka path when no
sink is set, so local Scope is unchanged. Add publish_raw_event() and
publish_prebuilt() to forward an envelope verbatim (preserving origin
id/timestamp/ids).
- TrickleEventsSink (new): thread-safe, drop-oldest forwarding onto the
per-session events writer as {"type":"telemetry","event":{...}},
mirroring the existing notification-forwarding pattern.
- Runner installs/tears down the sink in _subscribe_control; adds new
signals: runner cold-start (runner_ready startup_ms), media-loop error
events (previously only logged), and VRAM folded into stream_heartbeat.
- Client dispatches the new "telemetry" event type and re-publishes it
through a pluggable egress sink installed on connect (Kafka today,
HTTP/other swappable via install_default_egress_sink). LogSink
(SCOPE_TELEMETRY_LOG_SINK) allows verifying the relay without a broker.
The fal-host-layer Kafka publisher (livepeer_fal_app.py) is left unchanged.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: emranemran <emran.mah@gmail.com>
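The drop-oldest forwarding described in the commit message above can be sketched as follows. This is an illustration only, not the actual `TrickleEventsSink`: the class name `DropOldestSink`, the `emit`/`drain` methods, and the `max_pending` parameter are hypothetical, and the real sink writes to the per-session events writer rather than an in-memory queue. Only the `{"type":"telemetry","event":{...}}` frame shape comes from the commit message.

```python
import json
import threading
from collections import deque
from typing import Any


class DropOldestSink:
    """Hypothetical stand-in for a bounded, thread-safe, lossy forwarding sink."""

    def __init__(self, max_pending: int = 3) -> None:
        self._lock = threading.Lock()
        # A deque with maxlen discards its oldest entry when appended to while full.
        self._pending: deque[str] = deque(maxlen=max_pending)

    def emit(self, envelope: dict[str, Any]) -> None:
        # Wrap the untouched envelope in the relay frame; never block the caller.
        frame = json.dumps({"type": "telemetry", "event": envelope})
        with self._lock:
            self._pending.append(frame)

    def drain(self) -> list[str]:
        """Return all pending frames for the writer and clear the queue."""
        with self._lock:
            frames = list(self._pending)
            self._pending.clear()
        return frames


sink = DropOldestSink(max_pending=3)
for n in range(5):
    sink.emit({"id": f"evt-{n}", "type": "stream_heartbeat"})
frames = [json.loads(line) for line in sink.drain()]
```

With a capacity of three and five events emitted, the two oldest events are discarded, which is the trade-off that keeps telemetry from stalling the media or control paths.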
HTML walkthrough of the telemetry relay: how it works, the TelemetrySink seam, how to plug in a client-side egress backend, and how to test locally and on cloud (including the two-instance go-livepeer path not yet run).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: emranemran <emran.mah@gmail.com>
Introduce a reliable HTTP egress sink that buffers telemetry events and forwards them in batches to the Daydream metrics API. The reporter satisfies the TelemetrySink protocol so it plugs into the existing install_default_egress_sink() seam alongside KafkaSink and LogSink.

Key features:
- Thread-safe emit() with bounded deque (50k events max)
- Background flush loop with configurable interval (default 2s)
- Full reliability contract: exponential backoff on 5xx, pause on 401, rate-limit handling on 429, batch splitting on 413, drop on 400
- Disk-backed JSONL persistence for crash-safe resume
- Activated when SCOPE_CLOUD_API_KEY is present

Wiring:
- install_default_egress_sink() prefers MetricsReporter over KafkaSink
- app.py lifespan starts/stops the reporter and installs the sink for local mode; cloud mode re-installs via livepeer.py on connect

Signed-off-by: qianghan <qiang@livepeer.org>
Co-authored-by: Cursor <cursoragent@cursor.com>
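The reliability contract listed in the commit message above maps HTTP status codes to actions. A minimal sketch of that mapping follows; the `Action` enum, the function names, the treatment of other 4xx codes as drops, and the backoff base and cap are all assumptions, not values taken from `MetricsReporter`.

```python
from enum import Enum


class Action(Enum):
    DELIVERED = "delivered"
    RETRY_WITH_BACKOFF = "retry_with_backoff"
    PAUSE = "pause"
    RATE_LIMITED = "rate_limited"
    SPLIT_BATCH = "split_batch"
    DROP = "drop"


def classify_response(status: int) -> Action:
    """Map an HTTP status code to the action named in the commit message."""
    if 200 <= status < 300:
        return Action.DELIVERED
    if status == 400:
        return Action.DROP            # malformed batch: retrying cannot help
    if status == 401:
        return Action.PAUSE           # bad credentials: stop sending, keep buffering
    if status == 413:
        return Action.SPLIT_BATCH     # body too large: retry with smaller batches
    if status == 429:
        return Action.RATE_LIMITED    # slow down before the next attempt
    if status >= 500:
        return Action.RETRY_WITH_BACKOFF
    return Action.DROP                # assumption: other 4xx codes are not retryable


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Exponential backoff in seconds; base and cap are illustrative values."""
    return min(cap, base * (2 ** attempt))
```

Keeping the classification in a pure function like this makes each branch of the contract easy to unit test without an HTTP server.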
HTML document with data flow diagram covering the full telemetry path from cloud runner through trickle relay to the MetricsReporter HTTP egress and Daydream /v1/metrics Kafka publishing.

Signed-off-by: qianghan <qiang@livepeer.org>
Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
- metrics_reporter: guard against infinite 413 retry loop when a single event exceeds the API body limit; drop it with a log instead
- kafka_publisher: make _send_prebuilt data extraction defensive so a non-dict `data` field doesn't crash the send path
- telemetry-over-trickle.html: fix mismatched <div>/<p> closing tag
- metrics-report-architecture.html: use api.daydream.monster (the actual default) instead of api.daydream.live, note DAYDREAM_METRICS_URL env var
- tests: add 413 single-event and multi-event regression tests

Signed-off-by: qianghan <qiang@livepeer.org>
Co-authored-by: Cursor <cursoragent@cursor.com>
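The 413 guard from the first bullet above can be illustrated with a small recursive splitter. This is a sketch under stated assumptions, not the reporter's implementation: `send_with_split`, `make_fake_post`, and the 64-byte limit are hypothetical, and any non-413 status is treated as delivered to keep the example short.

```python
import json
from typing import Any, Callable

Event = dict[str, Any]


def send_with_split(
    batch: list[Event], post: Callable[[list[Event]], int]
) -> tuple[int, int]:
    """Send a batch, halving it on 413. Returns (delivered, dropped)."""
    if not batch:
        return (0, 0)
    status = post(batch)
    if status != 413:
        return (len(batch), 0)  # simplification: anything but 413 counts as sent
    if len(batch) == 1:
        # A single event that is still too large can never succeed, so drop it
        # instead of retrying forever.
        return (0, 1)
    mid = len(batch) // 2
    left = send_with_split(batch[:mid], post)
    right = send_with_split(batch[mid:], post)
    return (left[0] + right[0], left[1] + right[1])


def make_fake_post(limit_bytes: int) -> Callable[[list[Event]], int]:
    """Hypothetical endpoint that rejects bodies larger than limit_bytes."""

    def post(batch: list[Event]) -> int:
        body = json.dumps(batch).encode("utf-8")
        return 413 if len(body) > limit_bytes else 200

    return post


events = [{"n": 1}, {"n": 2}, {"blob": "x" * 200}, {"n": 3}]
result = send_with_split(events, make_fake_post(limit_bytes=64))
```

Here the oversized `blob` event is isolated by repeated halving and dropped, while the three small events are still delivered.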
1379cd1 to f3708b3
- Remove internal HTML docs (leszko/j0sh: not needed for internal cloud feature)
- Remove unused functions: enqueue_raw_event, update_api_key, report_event (leszko)
- Clarify SCOPE_CLOUD_API_KEY is user-provided and opt-in only (leszko)
- Add note that telemetry only fires for opted-in cloud users (leszko)

Signed-off-by: qianghan <qiang@livepeer.org>
Co-authored-by: Cursor <cursoragent@cursor.com>
Why
In Livepeer/trickle cloud mode the GPU runner executes the full embedded Scope app, so every `publish_event()` call (session/stream/pipeline/error lifecycle) already fires on the runner. But the runner has no Kafka credentials, so those events silently no-op and are lost.

This PR redirects runner telemetry over the existing trickle events channel. The local SDK client receives it and becomes the single egress point that publishes to Kafka (or another backend).
How it works
`TelemetrySink` is one shared protocol used on both sides; only the implementation differs. `publish_event()` builds the envelope once and hands it to the active sink, falling back to today's direct-Kafka behavior when no sink is set, so local (non-cloud) Scope is unchanged. Events are forwarded verbatim (origin `id`/`timestamp`/`session_id`/`connection_id` preserved), best-effort and lossy under pressure so telemetry never stalls the media or control paths.

High-level changes
- `server/kafka_publisher.py`: `TelemetrySink` protocol, `build_event_envelope()` (stamp once), `publish_prebuilt()` / `publish_raw_event()` (verbatim send), `KafkaSink`, `LogSink`, sink globals, and `install_default_egress_sink()` (the single swappable seam for the client backend).
- `cloud/trickle_events_sink.py` (new): `TrickleEventsSink`, thread-safe drop-oldest forwarding onto the per-session events writer, mirroring the existing notification-forwarding pattern.
- `cloud/livepeer_app.py`: installs/tears down the sink in `_subscribe_control`; new instrumentation: runner cold-start (`runner_ready` `startup_ms`), media input/output/audio loop errors (previously only logged), and VRAM.
- `server/livepeer_client.py`: dispatches the new `telemetry` event type and re-publishes through the egress sink.
- `server/livepeer.py`: installs the egress sink on client connect.
- `server/frame_processor.py`: folds `vram_allocated_mb` / `vram_reserved_mb` into the existing `stream_heartbeat`.

The separate fal-host-layer Kafka publisher (`livepeer_fal_app.py`) is left unchanged: it is a different tier and has its own Kafka access.

Pluggable client egress
The backend is chosen in one place: `install_default_egress_sink()` in `kafka_publisher.py`. Today it selects `KafkaSink` when Kafka is configured (or `LogSink` when `SCOPE_TELEMETRY_LOG_SINK` is set). A future `HttpSink`/other only needs to be added there; the runner-relay and client-dispatch paths don't change.

Testing
- `ruff check src/` + format clean

Verify the relay without a broker: set `SCOPE_TELEMETRY_LOG_SINK=1` on the client and watch `GET /api/v1/logs/tail` for `TELEMETRY type=... relayed_via=trickle` lines.

🤖 Generated with Claude Code
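For readers who want a picture of the client-side dispatch, here is a rough sketch of routing an incoming events-channel message. It is an assumption-laden illustration, not the code in `livepeer_client.py`: `dispatch_event`, its callback parameters, and the returned status strings are hypothetical; only the `type == "telemetry"` check and the nested `event` envelope come from the description.

```python
from typing import Any, Callable


def dispatch_event(
    message: dict[str, Any],
    publish_raw: Callable[[dict[str, Any]], None],
    handle_other: Callable[[dict[str, Any]], None],
) -> str:
    """Route one events-channel message; telemetry is re-published verbatim."""
    if message.get("type") == "telemetry":
        event = message.get("event")
        if isinstance(event, dict):
            publish_raw(event)  # envelope is passed through without re-stamping
            return "republished"
        return "ignored"        # malformed telemetry frame: drop it quietly
    handle_other(message)       # every other event type keeps its existing path
    return "passed_through"


republished: list[dict[str, Any]] = []
others: list[dict[str, Any]] = []

outcome_a = dispatch_event(
    {"type": "telemetry", "event": {"id": "evt-1", "type": "runner_ready"}},
    republished.append,
    others.append,
)
outcome_b = dispatch_event({"type": "notification"}, republished.append, others.append)
outcome_c = dispatch_event({"type": "telemetry", "event": None}, republished.append, others.append)
```

Passing the publisher in as a callback mirrors the pluggable egress idea: the dispatch path stays the same whichever backend is installed.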