Skip to content

Add pipeline broker support for the subject's internal pipeline bus#21

Merged
yusufozturk merged 2 commits into
mainfrom
dev-pipe-broker
Jun 22, 2026
Merged

Add pipeline broker support for the subject's internal pipeline bus#21
yusufozturk merged 2 commits into
mainfrom
dev-pipe-broker

Conversation

@yusufozturk

@yusufozturk yusufozturk commented Jun 22, 2026

Copy link
Copy Markdown
Member

What

Adds an optional pipeline_broker: block to a test case, which stands up a
second, dedicated Redpanda broker (hostname pipeline-broker, default port
19092) for the subject's own internal transport — e.g. VirtualMetric's
pipeline_bus: {type: kafka}.

This is independent of the existing kafka: block (hostname redpanda, port
9092), which feeds the generator/subject device. Different hostname and
port means a case can use both at once with no collision. Point the subject
config at pipeline-broker:<port>.

Why

The kafka: broker models the device's input/output transport. A subject
that uses Kafka as its internal pipeline bus needs a separate broker it owns —
mixing the two on one broker conflates the harness's traffic with the subject's
internal traffic. This gives the subject a clean, dedicated bus.

Details

  • Startup ordering: the subject depends_on the broker being healthy
    (rpk cluster health) before starting, so the subject's broker dial can't
    race container startup.
  • Topics: auto-create on first produce by default (the bus owns its topics).
    Set auto_create: false to mirror brokers that disallow it (e.g. Azure Event
    Hubs); in that case topics: MUST list the bus topics, which a one-shot
    pipeline-broker-init pre-creates before the subject starts (the subject then
    depends_on that init completing rather than just broker health).
  • Config: all fields optional with sensible defaults — image
    (redpandadata/redpanda:latest), memory (1G), smp (1), port
    (19092), auto_create (true).

Changes

  • internal/config/case.go — new PipelineBrokerConfig type, PipelineBroker
    field on TestCase, and *OrDefault() accessors.
  • internal/orchestrator/docker.go — render the pipeline-broker (and optional
    pipeline-broker-init) compose services, wire the subject's depends_on, and
    the corresponding composeVars fields.

Summary by CodeRabbit

Release Notes

  • New Features
    • Test cases now support an optional dedicated pipeline broker instance for independent pipeline bus communication
    • Configure pipeline broker properties including image version, memory allocation, CPU cores, advertised port, and topic management
    • Choose between automatic topic creation or explicit topic list specification for the pipeline broker
  • Bug Fixes
    • Improved validation to prevent naming collisions with the new pipeline broker services, and to require an explicit topic list when auto-creation is disabled

@coderabbitai

coderabbitai Bot commented Jun 22, 2026

Copy link
Copy Markdown

Review Change Stack

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 0e1ea29a-22b5-4ac3-ba87-04e0c30f4674

📥 Commits

Reviewing files that changed from the base of the PR and between d97575f and 5452782.

📒 Files selected for processing (1)
  • internal/config/case.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • internal/config/case.go

Walkthrough

Adds an optional pipeline_broker configuration block to TestCase backed by a new PipelineBrokerConfig struct with image, resource, port, and topic settings, plus validation to enforce topics when auto-create is disabled. The docker-compose orchestration is extended to conditionally spin up a dedicated Redpanda pipeline-broker service and an optional one-shot pipeline-broker-init container for topic pre-creation, with the subject service gated on broker readiness.

Changes

Pipeline Broker per Test Case

Layer / File(s) Summary
PipelineBrokerConfig type, TestCase field, and validation
internal/config/case.go
Adds the exported PipelineBrokerConfig struct (Image, Memory, SMP, Port, AutoCreate, Topics) with five *OrDefault() helper methods, and attaches an optional PipelineBroker *PipelineBrokerConfig field to TestCase. Reserved service-name validation now includes pipeline-broker and pipeline-broker-init to prevent endpoint collisions. A new validatePipelineBroker() helper enforces that when auto_create is false, topics must be non-empty.
Docker-compose pipeline-broker service wiring
internal/orchestrator/docker.go
Adds PipelineBroker* fields to composeVars; extends the compose template with pipeline-broker (Redpanda) and optional pipeline-broker-init service blocks; expands the subject depends_on condition to gate on broker health or init completion; populates vars in writeCompose from tc.PipelineBroker, constructing the rpk topic create command when explicit topics are listed.

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~12 minutes

Poem

🐇 A broker just for pipelines? Oh my, what a treat!
A second Redpanda to keep the data fleet.
With topics pre-created or born on the fly,
The subject waits patiently 'til health replies.
Validation ensures the config stays sound—
Hop, hop—the compose grows, the queues align and bound!

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately and concisely describes the main change: adding support for an optional pipeline broker that serves the subject's internal pipeline bus, which is the primary focus of modifications across both configuration and orchestration files.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch dev-pipe-broker

Comment @coderabbitai help to get the list of available commands.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@internal/config/case.go`:
- Around line 55-65: The new PipelineBroker field in the case configuration
creates fixed compose service names (pipeline-broker and pipeline-broker-init)
that can collide with user-defined endpoint names. You need to find the endpoint
name validation logic in the config package and add pipeline-broker and
pipeline-broker-init to the list of reserved service names that are blocked from
being used as endpoint names. This will prevent users from creating endpoints
with names that would conflict with the automatically generated pipeline broker
services.
- Around line 319-327: Add validation logic to enforce the documented contract
that when AutoCreate is set to false, the Topics field must contain at least one
topic. This validation should check that if AutoCreate is not nil and is false,
then Topics must be non-empty, otherwise return a validation error. This
validation should be added to the config validation function or method that
handles the struct containing the AutoCreate and Topics fields to prevent
invalid topology configurations from being created at runtime.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: d494a25b-d5f2-4944-9caf-e5507f0dfadd

📥 Commits

Reviewing files that changed from the base of the PR and between d0d0c90 and d97575f.

📒 Files selected for processing (2)
  • internal/config/case.go
  • internal/orchestrator/docker.go

Comment thread internal/config/case.go
Comment thread internal/config/case.go
@cloudflare-workers-and-pages

cloudflare-workers-and-pages Bot commented Jun 22, 2026

Copy link
Copy Markdown

Deploying pipebench with  Cloudflare Pages  Cloudflare Pages

Latest commit: 5452782
Status: ✅  Deploy successful!
Preview URL: https://003f90fa.pipebench.pages.dev
Branch Preview URL: https://dev-pipe-broker.pipebench.pages.dev

View logs

@yusufozturk yusufozturk merged commit 86569c6 into main Jun 22, 2026
5 checks passed
@yusufozturk yusufozturk deleted the dev-pipe-broker branch June 22, 2026 22:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant