From 7310523954ac3c13012762ef4f84759cfd99629d Mon Sep 17 00:00:00 2001 From: Eren Aslan <16862833+erenaslandev@users.noreply.github.com> Date: Tue, 23 Jun 2026 17:31:00 +0300 Subject: [PATCH 1/5] feat: implement rotation handling --- internal/config/case.go | 112 ++++++++++ internal/orchestrator/tls.go | 78 +++++++ internal/runner/runner.go | 394 ++++++++++++++++++++++++++++++++++- 3 files changed, 578 insertions(+), 6 deletions(-) diff --git a/internal/config/case.go b/internal/config/case.go index c508156..0d80e79 100644 --- a/internal/config/case.go +++ b/internal/config/case.go @@ -103,6 +103,13 @@ type TestCase struct { // can't decode columnar objects, and the s3 receiver's destructive drain // would corrupt the read. See VerifierConfig. Verifier *VerifierConfig `yaml:"verifier"` + + // Rotation, when set, parameterizes the + // director_agent_tls_cert_rotation_correctness driver: which director TLS + // cert/CA rotation to perform mid-run and how the enrolled agent is expected + // to respond. Required by (and only meaningful for) that type. See + // RotationConfig and runDirectorAgentCertRotation. + Rotation *RotationConfig `yaml:"rotation"` } // VerifierConfig configures the post-drain DuckDB verifier container (see @@ -409,6 +416,65 @@ func (v *VaultConfig) MountOrDefault() string { return "secret" } +// Rotation parameterizes the director_agent_tls_cert_rotation_correctness +// driver (see TestCase.Rotation, runDirectorAgentCertRotation). The director +// deploys an agent that streams back over the proxy_tls listener; mid-run the +// director's serving cert/CA is rotated on disk and the director is bounced so +// the enrolled agent must re-handshake. Mode selects which rotation and the +// expected agent response. +type RotationConfig struct { + // Mode selects the mid-run rotation: + // "same_ca" — re-sign the director leaf under the SAME CA + // (RotateServerCert). The agent reconnects transparently + // because the chain still validates. Verdict: delivery + // resumes (count grows after the bounce). + // "new_ca_recover" — rotate to a BRAND-NEW CA written to ca.crt and served + // at /dl/cert.pem (RotateServerCertNewCA). A bootstrap + // agent (no operator-pinned CA) must re-fetch the new CA + // and reconnect. Verdict: delivery resumes. + // "new_ca_reject" — TWO PHASE. Phase 1 re-signs the leaf under an UNTRUSTED + // CA the director never serves (RotateServerCertWrongCA): + // the agent MUST fail validation, so delivery STALLS + // (a missing stall is a SECURITY failure — validation is + // disabled). Phase 2 restores a trusted leaf + // (RotateServerCert) and delivery must resume. + Mode string `yaml:"mode"` + + // SettleSeconds is the pause after a rotation+bounce before the driver samples + // the receiver, giving the director time to rebind its listener and the agent + // time to detect the dropped session and reconnect (default 25s). + SettleSeconds int `yaml:"settle_seconds"` + + // StallSeconds (new_ca_reject only) is how long the receiver count must hold + // flat after the untrusted rotation for the bad cert to count as rejected + // (default 20s). The case's endpoint seed loop MUST still be appending fresh + // records during this window, else the stall is vacuous — see the case NOTES. + StallSeconds int `yaml:"stall_seconds"` +} + +// Rotation mode values for RotationConfig.Mode. +const ( + RotationSameCA = "same_ca" + RotationNewCARecover = "new_ca_recover" + RotationNewCAReject = "new_ca_reject" +) + +// SettleSecondsOrDefault / StallSecondsOrDefault centralize the rotation timing +// defaults so the driver and any caller agree. +func (rc *RotationConfig) SettleSecondsOrDefault() int { + if rc != nil && rc.SettleSeconds > 0 { + return rc.SettleSeconds + } + return 25 +} + +func (rc *RotationConfig) StallSecondsOrDefault() int { + if rc != nil && rc.StallSeconds > 0 { + return rc.StallSeconds + } + return 20 +} + // Endpoint is an auxiliary container in the test topology (see // TestCase.Endpoints). It's a host the subject reaches on the bench network — // not a generator or receiver. @@ -554,6 +620,13 @@ func (tc *TestCase) UsesVault() bool { return tc.Vault != nil } // DuckDB verifier container instead of a receiver. func (tc *TestCase) UsesVerifier() bool { return tc.Verifier != nil } +// IsDirectorAgentRotationType reports whether the case is the director↔agent +// TLS cert-rotation correctness flow, which has its own subject-driven (no +// generator) driver — see runDirectorAgentCertRotation. +func (tc *TestCase) IsDirectorAgentRotationType() bool { + return tc.Type == "director_agent_tls_cert_rotation_correctness" +} + // IsPerformanceType reports whether the case is scored as a throughput test — // the plain `performance` type or the Kafka variant `kafka_performance`. func (tc *TestCase) IsPerformanceType() bool { @@ -681,6 +754,45 @@ func (tc *TestCase) Validate() error { if err := tc.validateVerifier(); err != nil { return err } + if err := tc.validateRotation(); err != nil { + return err + } + return nil +} + +// validateRotation checks the optional `rotation:` block and the +// director_agent_tls_cert_rotation_correctness type's structural requirements: +// the block is required for (and only meaningful to) that type, the mode must be +// known, and the case must be subject-driven (an endpoint the director deploys +// onto, no generator) with a min_received floor for the verdict. +func (tc *TestCase) validateRotation() error { + if !tc.IsDirectorAgentRotationType() { + if tc.Rotation != nil { + return fmt.Errorf("case %q: `rotation:` is only valid for type director_agent_tls_cert_rotation_correctness", tc.Name) + } + return nil + } + if tc.Rotation == nil { + return fmt.Errorf("case %q: type director_agent_tls_cert_rotation_correctness requires a `rotation:` block", tc.Name) + } + switch tc.Rotation.Mode { + case RotationSameCA, RotationNewCARecover, RotationNewCAReject: + default: + return fmt.Errorf("case %q: rotation.mode %q must be one of %s, %s, %s", + tc.Name, tc.Rotation.Mode, RotationSameCA, RotationNewCARecover, RotationNewCAReject) + } + // The director drives data by collecting from an endpoint and forwarding — + // there is no generator, so the verdict rests on min_received plus the + // post-rotation count behaviour. Guard the two structural preconditions. + if tc.HasGenerator() { + return fmt.Errorf("case %q: director_agent_tls_cert_rotation_correctness is subject-driven and must not declare a generator", tc.Name) + } + if len(tc.Endpoints) == 0 { + return fmt.Errorf("case %q: director_agent_tls_cert_rotation_correctness requires an `endpoints:` block (the host the director deploys the agent onto)", tc.Name) + } + if tc.Correctness.MinReceived <= 0 { + return fmt.Errorf("case %q: director_agent_tls_cert_rotation_correctness requires correctness.min_received > 0", tc.Name) + } return nil } diff --git a/internal/orchestrator/tls.go b/internal/orchestrator/tls.go index 602d807..e05604f 100644 --- a/internal/orchestrator/tls.go +++ b/internal/orchestrator/tls.go @@ -248,6 +248,84 @@ func RotateServerCertWrongCA(outDir string, serverHosts []string) error { return writePEMKey(filepath.Join(abs, "server.key"), srvKey, 0o644) } +// RotateServerCertNewCA rotates the entire trust root: it generates a BRAND-NEW +// CA, overwrites ca.crt / ca.key in place, and re-signs server.crt / server.key +// under the new CA (same SAN set). Unlike RotateServerCertWrongCA — which throws +// the new CA away so nothing can ever trust the new leaf — this PERSISTS the new +// CA, so a party that re-reads ca.crt (e.g. a director serving it at +// /dl/cert.pem, or a bootstrap agent that re-fetches it) can recover trust and +// reconnect. It models a full CA rollover with re-distribution. Used by the +// director↔agent "new_ca_recover" rotation case. +// +// The client bundle (client.crt) is intentionally left untouched: the +// director↔agent cases have no generator/client leaf, and rewriting it would +// only matter to a client that pins the old CA, which this rollover deliberately +// supersedes. +func RotateServerCertNewCA(outDir string, serverHosts []string) error { + abs, err := filepath.Abs(outDir) + if err != nil { + return err + } + + // Fresh CA — PERSISTED to ca.crt/ca.key (the difference from + // RotateServerCertWrongCA), so anything that re-reads the CA can recover. + caKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) + if err != nil { + return fmt.Errorf("generate new ca key: %w", err) + } + caTpl := &x509.Certificate{ + SerialNumber: bigSerial(), + Subject: pkix.Name{CommonName: "PipeBench Rotated CA"}, + NotBefore: time.Now().Add(-1 * time.Hour), + NotAfter: time.Now().Add(24 * time.Hour), + IsCA: true, + KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageCRLSign, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth}, + BasicConstraintsValid: true, + } + caDER, err := x509.CreateCertificate(rand.Reader, caTpl, caTpl, &caKey.PublicKey, caKey) + if err != nil { + return fmt.Errorf("sign new ca: %w", err) + } + caCert, err := x509.ParseCertificate(caDER) + if err != nil { + return fmt.Errorf("parse new ca: %w", err) + } + if err := writePEMCert(filepath.Join(abs, "ca.crt"), caDER); err != nil { + return err + } + if err := writePEMKey(filepath.Join(abs, "ca.key"), caKey, 0o600); err != nil { + return err + } + + if len(serverHosts) == 0 { + serverHosts = []string{"subject"} + } + serverDNS, serverIP := splitHosts(serverHosts) + srvKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) + if err != nil { + return fmt.Errorf("generate server key: %w", err) + } + srvTpl := &x509.Certificate{ + SerialNumber: bigSerial(), + Subject: pkix.Name{CommonName: serverHosts[0]}, + NotBefore: time.Now().Add(-1 * time.Hour), + NotAfter: time.Now().Add(24 * time.Hour), + KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageKeyEncipherment, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, + DNSNames: serverDNS, + IPAddresses: serverIP, + } + srvDER, err := x509.CreateCertificate(rand.Reader, srvTpl, caCert, &srvKey.PublicKey, caKey) + if err != nil { + return fmt.Errorf("re-sign server cert under new ca: %w", err) + } + if err := writePEMCert(filepath.Join(abs, "server.crt"), srvDER); err != nil { + return err + } + return writePEMKey(filepath.Join(abs, "server.key"), srvKey, 0o644) +} + // loadCA reads and parses the CA cert + key written by GenerateTLSCerts. func loadCA(dir string) (*x509.Certificate, *ecdsa.PrivateKey, error) { certPEM, err := os.ReadFile(filepath.Join(dir, "ca.crt")) diff --git a/internal/runner/runner.go b/internal/runner/runner.go index 61e113d..4ccc0c0 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -198,6 +198,13 @@ func (r *Runner) Run(tc *config.TestCase, subject config.Subject) (results.RunRe if tc.Type == "syslog_tls_vault_cert_rotation_correctness" { return r.runSyslogVaultCertRotation(tc, subject) } + // Director↔agent TLS cert rotation: the director deploys an agent that + // streams back over its proxy_tls listener; mid-run the director's serving + // cert/CA is rotated on disk and the director is bounced so the enrolled + // agent must re-handshake and reconnect. Subject-driven (no generator). + if tc.IsDirectorAgentRotationType() { + return r.runDirectorAgentCertRotation(tc, subject) + } configName := r.opts.ConfigName @@ -732,7 +739,8 @@ func (r *Runner) Run(tc *config.TestCase, subject config.Subject) (results.RunRe result.FailReason = fmt.Sprintf( "expect_failure: data path was NOT blocked — receiver observed %s line(s) (> %s); "+ "the control under test (e.g. auth) appears bypassed", - formatCount(recvMetrics.LinesReceived), formatCount(cap)) + formatCount(recvMetrics.LinesReceived), formatCount(cap), + ) } } else if tc.IsCorrectnessType() && !tc.HasGenerator() { // No generator: there's no expected line count to derive loss or @@ -752,7 +760,8 @@ func (r *Runner) Run(tc *config.TestCase, subject config.Subject) (results.RunRe if !gotEnough { failReasons = append(failReasons, fmt.Sprintf( "expected >= %s received records, got %s", - formatCount(minRecv), formatCount(recvMetrics.LinesReceived))) + formatCount(minRecv), formatCount(recvMetrics.LinesReceived), + )) } passed := gotEnough && recvOK result.Passed = &passed @@ -786,13 +795,15 @@ func (r *Runner) Run(tc *config.TestCase, subject config.Subject) (results.RunRe if !lossOK { failReasons = append(failReasons, fmt.Sprintf( "expected loss <= %.2f%%, got %.2f%%", - tc.Correctness.ExpectedLossPct, lossPct)) + tc.Correctness.ExpectedLossPct, lossPct, + )) } if !overOK { extra := recvMetrics.LinesReceived - expectedOut failReasons = append(failReasons, fmt.Sprintf( "over-delivery: received %s lines but only %s were expected (%s extra/duplicate lines)", - formatCount(recvMetrics.LinesReceived), formatCount(expectedOut), formatCount(extra))) + formatCount(recvMetrics.LinesReceived), formatCount(expectedOut), formatCount(extra), + )) } passed := lossOK && overOK && recvOK @@ -1894,6 +1905,33 @@ func (r *Runner) runKafkaCertRotation(tc *config.TestCase, subject config.Subjec }) } +// subjectHasAgentPackage reports whether the subject (director) container ships +// the baked vmetric-agent that the push-deploy pushes to endpoints. The +// agent-capable image (vmetric/director-enterprise) ships +// /opt/vmetric/package/agent; the default vmetric/director image has no package +// dir at all, so a director↔agent case run on it can never deploy an agent. +// +// conclusive is false when the probe could not run (empty container, exec not +// ready, neither marker printed) so callers do NOT fast-fail on noise — they +// fall through to the timed wait. Only (present=false, conclusive=true) — the +// image demonstrably lacks the agent — is a safe fast-fail signal. +func subjectHasAgentPackage(container string) (present, conclusive bool) { + if container == "" { + return false, false + } + out, _ := exec.Command("docker", "exec", container, "sh", "-c", + "test -d /opt/vmetric/package/agent && echo PRESENT || echo ABSENT").CombinedOutput() + s := string(out) + switch { + case strings.Contains(s, "PRESENT"): + return true, true + case strings.Contains(s, "ABSENT"): + return false, true + default: + return false, false + } +} + // subjectLogStats returns (total non-empty log lines, cert-verification-error // lines) from the subject container's console log. The cert-error count is the // signature of a consumer rejecting an untrusted broker leaf; the total-line @@ -1922,6 +1960,348 @@ func subjectLogStats(container string) (int, int) { return total, certErrs } +// runDirectorAgentCertRotation drives the director↔agent TLS cert-rotation +// correctness flow. Unlike the kafka/syslog rotation cases (a generator feeds +// the subject), this case is SUBJECT-DRIVEN: the director SSH-deploys an agent +// onto an endpoint, the agent streams collected logs back over the director's +// proxy_tls listener, and the receiver counts what arrives. There is no +// generator, so the verdict rests on min_received plus proof that delivery +// RESUMED after the mid-run rotation (a live reconnect, not just replay of +// pre-rotation records). +// +// Disruption: the director's serving cert is a file path under the subject's +// CertDir, bind-mounted from a host dir the harness owns. Mid-run the harness +// rotates those files and bounces the director (StopServices→UpServices). A +// live wss/NATS session does not re-handshake on its own, so the bounce is the +// reconnect trigger; the already-running agent (the director re-attaches to it +// on restart — it does not re-deploy) reconnects against the rotated cert. +// +// Modes (rotation.mode): +// - same_ca: leaf re-signed under the same CA — transparent reconnect. +// - new_ca_recover: CA rolled over and re-served at /dl/cert.pem — a bootstrap +// agent must re-fetch the new CA and reconnect. +// - new_ca_reject: two-phase. Phase 1 rotates to an UNTRUSTED, unserved leaf; +// the agent MUST fail validation, so delivery STALLS (a missing stall is a +// SECURITY failure — the agent accepted an untrusted cert). Phase 2 restores +// a trusted leaf and delivery must resume. +func (r *Runner) runDirectorAgentCertRotation(tc *config.TestCase, subject config.Subject) (results.RunResult, error) { + configName := r.opts.ConfigName + subject = r.applySubjectOverrides(subject) + + fmt.Printf("→ test=%s subject=%s version=%s config=%s\n", + tc.Name, subject.Name, subject.Version, configName) + + configSrc, err := tc.ConfigFilePath(r.opts.CasesDir, configName, subject) + if err != nil { + return results.RunResult{}, err + } + configSrc, err = filepath.Abs(configSrc) + if err != nil { + return results.RunResult{}, fmt.Errorf("resolving config path: %w", err) + } + + tmpDir, err := os.MkdirTemp("", "bench-"+tc.Name+"-") + if err != nil { + return results.RunResult{}, err + } + if err := os.Chmod(tmpDir, 0o777); err != nil { + return results.RunResult{}, fmt.Errorf("chmod tmpdir: %w", err) + } + defer func() { + if !r.opts.NoCleanup { + os.RemoveAll(tmpDir) + } + }() + + caseDir, err := filepath.Abs(filepath.Join(r.opts.CasesDir, tc.Name)) + if err != nil { + return results.RunResult{}, fmt.Errorf("resolving case directory: %w", err) + } + + extraEnv := map[string]string{} + if cfg, ok := tc.Configurations[configName]; ok { + maps.Copy(extraEnv, cfg.Env) + } + + // The director's proxy_tls leaf is served to the agent on the "subject" + // alias; bake that (and localhost) into the SAN set. certsDir is rotated + // in place mid-run and reflected into the subject via the CertDir bind mount. + certsDir := filepath.Join(tmpDir, "certs") + hosts := []string{"subject", "localhost"} + if _, err := orchestrator.GenerateTLSCerts(certsDir, hosts); err != nil { + return results.RunResult{}, fmt.Errorf("generating TLS certs: %w", err) + } + + runCfg := orchestrator.RunConfig{ + TestCase: tc, + Subject: subject, + ConfigName: configName, + ConfigSrcPath: configSrc, + CaseDir: caseDir, + TmpDir: tmpDir, + GeneratorImage: r.opts.GeneratorImage, + ReceiverImage: r.opts.ReceiverImage, + CollectorImage: r.opts.CollectorImage, + ReceiverHostPort: r.opts.ReceiverHostPort, + ExtraSubjectEnv: extraEnv, + CPULimit: r.opts.CPULimit, + MemLimit: r.opts.MemLimit, + TLSCertsHost: certsDir, + } + + orch, err := orchestrator.NewComposeRunner(runCfg) + if err != nil { + return results.RunResult{}, fmt.Errorf("compose setup: %w", err) + } + + cleanup := []string{"bench-receiver", "bench-collector", "bench-subject-" + subject.Name} + for _, e := range tc.Endpoints { + cleanup = append(cleanup, "bench-"+e.Name) + } + for _, c := range cleanup { + _ = exec.Command("docker", "rm", "-f", c).Run() + } + _ = orch.Down() + + startTime := time.Now() + defer func() { + if !r.opts.NoCleanup { + fmt.Println(" tearing down…") + _ = orch.Down() + } + }() + + minRecv := tc.Correctness.MinReceived + if minRecv <= 0 { + minRecv = 1 + } + + fmt.Println(" starting all services (director deploys the agent; receiver UP throughout)…") + if err := orch.Up(); err != nil { + return results.RunResult{}, fmt.Errorf("starting services: %w", err) + } + + metricsPort, stopPortFwd, err := orch.ReceiverMetricsPort() + if err != nil { + return results.RunResult{}, fmt.Errorf("setting up receiver access: %w", err) + } + defer stopPortFwd() + + // Pre-flight: these cases need the agent-capable image. The default + // vmetric/director image ships no baked vmetric-agent (no + // /opt/vmetric/package/agent), so the director can never push/run an agent + // — the run would otherwise sit at 0 records until the initial-wait timeout + // and fail with a misleading "deploy/enroll/connect failed". Detect the wrong + // image up front and fail immediately with the actual cause. Inconclusive + // probes (container not ready) fall through to the timed wait below. + if present, conclusive := subjectHasAgentPackage(orch.SubjectContainer()); conclusive && !present { + return results.RunResult{}, fmt.Errorf( + "subject image %s:%s has no baked vmetric-agent (/opt/vmetric/package/agent missing): "+ + "director↔agent cert-rotation cases require the agent-capable image — "+ + "re-run with VMETRIC_IMAGE=vmetric/director-enterprise", + subject.Image, subject.Version, + ) + } + + // Phase 0 — wait for the initial deploy→enroll→stream chain to deliver at + // least min_received records, proving the agent connected before we disturb + // it. Budget off warmup + a deploy allowance, capped by the overall timeout. + warmup := tc.WarmupOrDefault(30 * time.Second) + initialBudget := min(warmup+3*time.Minute, r.opts.Timeout) + fmt.Printf(" waiting for initial delivery (receiver >= %s, up to %s)…\n", formatCount(minRecv), initialBudget) + var countAtRotation int64 + initialDeadline := time.Now().Add(initialBudget) + established := false + for time.Now().Before(initialDeadline) { + rm, qerr := r.queryReceiverMetrics(metricsPort, 10*time.Second) + if qerr == nil { + fmt.Printf(" received: %s\n", formatCount(rm.LinesReceived)) + if rm.LinesReceived >= minRecv { + countAtRotation = rm.LinesReceived + established = true + break + } + } + time.Sleep(2 * time.Second) + } + if !established { + return results.RunResult{}, fmt.Errorf( + "agent never delivered the initial %s records within %s (subject image %s:%s) — "+ + "deploy/enroll/connect failed; if this is not the agent-capable image, "+ + "re-run with VMETRIC_IMAGE=vmetric/director-enterprise", + formatCount(minRecv), initialBudget, subject.Image, subject.Version, + ) + } + fmt.Printf(" agent established — %s records before rotation ✓\n", formatCount(countAtRotation)) + + // Phase 1 — rotate the director cert per mode and bounce the director so the + // agent must re-handshake. + settle := time.Duration(tc.Rotation.SettleSecondsOrDefault()) * time.Second + switch tc.Rotation.Mode { + case config.RotationSameCA: + fmt.Println(" rotating director leaf under the SAME CA, then bouncing the director…") + if err := rotateAndReload(orch, "subject", func() error { + return orchestrator.RotateServerCert(certsDir, hosts) + }); err != nil { + return results.RunResult{}, err + } + time.Sleep(settle) + + case config.RotationNewCARecover: + fmt.Println(" rolling the director CA over (re-served at /dl/cert.pem), then bouncing the director…") + if err := rotateAndReload(orch, "subject", func() error { + return orchestrator.RotateServerCertNewCA(certsDir, hosts) + }); err != nil { + return results.RunResult{}, err + } + time.Sleep(settle) + + case config.RotationNewCAReject: + // Phase 1a (negative/security): untrusted, unserved leaf — the agent MUST + // reject it, so delivery stalls. Sample the count, rotate+bounce, wait the + // stall window, then require the count did NOT advance. + rmBefore, qerr := r.queryReceiverMetrics(metricsPort, 10*time.Second) + if qerr != nil { + return results.RunResult{}, fmt.Errorf("sampling receiver before untrusted rotation: %w", qerr) + } + stall := time.Duration(tc.Rotation.StallSecondsOrDefault()) * time.Second + fmt.Printf(" rotating director leaf to an UNTRUSTED CA (must be rejected), bouncing, then holding %s…\n", stall) + if err := rotateAndReload(orch, "subject", func() error { + return orchestrator.RotateServerCertWrongCA(certsDir, hosts) + }); err != nil { + return results.RunResult{}, err + } + time.Sleep(stall) + rmStall, qerr := r.queryReceiverMetrics(metricsPort, 10*time.Second) + if qerr != nil { + return results.RunResult{}, fmt.Errorf("sampling receiver during untrusted window: %w", qerr) + } + // A handful of in-flight records may land right as the session drops; + // tolerate a tiny leak but fail loudly if delivery clearly continued + // (which would mean the agent accepted the untrusted cert). + const stallLeak = 3 + advanced := rmStall.LinesReceived - rmBefore.LinesReceived + if advanced > stallLeak { + return results.RunResult{}, fmt.Errorf( + "SECURITY: delivery did NOT stall after rotating to an untrusted director cert "+ + "(%s new records during the %s window; before=%s after=%s) — the agent appears to accept untrusted certs", + formatCount(advanced), stall, formatCount(rmBefore.LinesReceived), formatCount(rmStall.LinesReceived), + ) + } + fmt.Printf(" delivery STALLED under the untrusted cert (%s new records) ✓\n", formatCount(advanced)) + + // Phase 1b (recovery): restore a trusted leaf under the original CA and + // bounce again; delivery must resume. + fmt.Println(" restoring a trusted director leaf, then bouncing the director…") + if err := rotateAndReload(orch, "subject", func() error { + return orchestrator.RotateServerCert(certsDir, hosts) + }); err != nil { + return results.RunResult{}, err + } + time.Sleep(settle) + + default: + return results.RunResult{}, fmt.Errorf("unknown rotation.mode %q", tc.Rotation.Mode) + } + + // Phase 2 — wait for the post-rotation RESUME, then drain to stable. The + // agent's reconnect is not instant: it must detect the dropped session and + // (for a CA rollover) re-fetch /dl/cert.pem with jitter + backoff, so the + // count legitimately sits flat at countAtRotation for a while. Crucially the + // stable-exit is gated on resume having been observed (count > countAtRotation) + // — otherwise that recovery gap looks "stable" and the drain bails before the + // agent ever reconnects (a false failure). Until resume, we keep polling for + // the full window; once resumed, six unchanged rounds means drained. + drainTimeout := 3 * time.Minute + if tc.Correctness.DrainSeconds > 0 { + drainTimeout = time.Duration(tc.Correctness.DrainSeconds) * time.Second + } + fmt.Printf(" waiting for resume then draining (up to %s)…\n", drainTimeout) + var lastCount int64 + stableRounds := 0 + resumedObserved := false + drainDeadline := time.Now().Add(drainTimeout) + for time.Now().Before(drainDeadline) { + time.Sleep(5 * time.Second) + rm, qerr := r.queryReceiverMetrics(metricsPort, 10*time.Second) + if qerr != nil { + continue + } + fmt.Printf(" received: %s\n", formatCount(rm.LinesReceived)) + if rm.LinesReceived > countAtRotation && !resumedObserved { + resumedObserved = true + fmt.Println(" delivery resumed after rotation ✓") + } + if resumedObserved && rm.LinesReceived == lastCount && rm.LinesReceived > 0 { + stableRounds++ + if stableRounds >= 6 { + fmt.Println(" receiver stable — drained") + break + } + } else { + stableRounds = 0 + } + lastCount = rm.LinesReceived + } + + recvMetrics, err := r.queryReceiverMetrics(metricsPort, 30*time.Second) + if err != nil { + return results.RunResult{}, fmt.Errorf("querying receiver metrics: %w", err) + } + + // Verdict: enough records overall AND delivery resumed after the rotation + // (proves a live reconnect, not just the pre-rotation backlog). The reject + // case additionally required the stall, already enforced inline above. + gotEnough := recvMetrics.LinesReceived >= minRecv + resumed := recvMetrics.LinesReceived > countAtRotation + passed := gotEnough && resumed + var errs []string + if !gotEnough { + errs = append(errs, fmt.Sprintf("expected >= %s received records, got %s", + formatCount(minRecv), formatCount(recvMetrics.LinesReceived))) + } + if !resumed { + errs = append(errs, fmt.Sprintf( + "delivery did not resume after rotation — %s records at rotation, %s at drain (agent never reconnected)", + formatCount(countAtRotation), formatCount(recvMetrics.LinesReceived), + )) + } + + elapsed := time.Since(startTime).Seconds() + fmt.Printf(" records at rotation: %s final: %s\n", formatCount(countAtRotation), formatCount(recvMetrics.LinesReceived)) + if passed { + fmt.Printf(" director↔agent cert rotation (%s): PASSED ✓\n", tc.Rotation.Mode) + } else { + fmt.Printf(" director↔agent cert rotation (%s): FAILED ✗\n", tc.Rotation.Mode) + } + + result := results.RunResult{ + TestName: tc.Name, + Config: configName, + Subject: subject.Name, + Version: subject.Version, + Hardware: hardwareID(), + Timestamp: startTime, + DurationSec: elapsed, + FirstReceivedNs: recvMetrics.FirstReceivedNs, + LastReceivedNs: recvMetrics.LastReceivedNs, + LinesOut: recvMetrics.LinesReceived, + BytesOut: recvMetrics.BytesReceived, + Passed: &passed, + } + if !passed { + result.FailReason = strings.Join(errs, "; ") + } + + dir, err := r.store.Save(result, "") + if err != nil { + return result, fmt.Errorf("saving results: %w", err) + } + fmt.Printf(" done. results → %s\n", dir) + + return result, nil +} + // runKafkaOffsetCommitRestart verifies that delivery-bound source // acknowledgments actually persist: every produced record is delivered to a // LIVE receiver, the subject is then restarted GRACEFULLY, and the restarted @@ -2131,7 +2511,8 @@ func (r *Runner) runKafkaOffsetCommitRestart(tc *config.TestCase, subject config if overPct > tc.Correctness.MaxOverDeliveryPct { errors = append(errors, fmt.Sprintf( "expected over-delivery <= %.2f%%, got %.2f%% (%s duplicate lines) — restart re-consumed records whose offsets should have been committed", - tc.Correctness.MaxOverDeliveryPct, overPct, formatCount(extra))) + tc.Correctness.MaxOverDeliveryPct, overPct, formatCount(extra), + )) } passed := len(errors) == 0 @@ -2709,7 +3090,8 @@ func (r *Runner) runSyslogVaultCertRotation(tc *config.TestCase, subject config. "SECURITY: receiver count never stalled after wrong-CA rotation "+ "(last count: %d) — director did not serve the untrusted cert, "+ "or generator ignored cert validation; check debug.console.status: true logs", - lastCount) + lastCount, + ) } // ---- Phase 2: TRUSTED cert restored — recovery ---- From 8968ade53a29ac2f7f09cababd394d1d8040a04d Mon Sep 17 00:00:00 2001 From: Eren Aslan <16862833+erenaslandev@users.noreply.github.com> Date: Tue, 23 Jun 2026 21:39:04 +0300 Subject: [PATCH 2/5] fix(config): reject negative rotation settle/stall seconds --- internal/config/case.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/internal/config/case.go b/internal/config/case.go index 8247fcc..ecff5e0 100644 --- a/internal/config/case.go +++ b/internal/config/case.go @@ -826,6 +826,14 @@ func (tc *TestCase) validateRotation() error { return fmt.Errorf("case %q: rotation.mode %q must be one of %s, %s, %s", tc.Name, tc.Rotation.Mode, RotationSameCA, RotationNewCARecover, RotationNewCAReject) } + // 0/unset defaults via SettleSecondsOrDefault/StallSecondsOrDefault; reject + // negatives so a typo like `settle_seconds: -5` can't be silently defaulted. + if tc.Rotation.SettleSeconds < 0 { + return fmt.Errorf("case %q: rotation.settle_seconds must be >= 0 (0/unset defaults to 25), got %d", tc.Name, tc.Rotation.SettleSeconds) + } + if tc.Rotation.StallSeconds < 0 { + return fmt.Errorf("case %q: rotation.stall_seconds must be >= 0 (0/unset defaults to 20), got %d", tc.Name, tc.Rotation.StallSeconds) + } // The director drives data by collecting from an endpoint and forwarding — // there is no generator, so the verdict rests on min_received plus the // post-rotation count behaviour. Guard the two structural preconditions. From 1adb6eb3050582a80fca351fde149d139c0d34e0 Mon Sep 17 00:00:00 2001 From: Eren Aslan <16862833+erenaslandev@users.noreply.github.com> Date: Tue, 23 Jun 2026 21:41:50 +0300 Subject: [PATCH 3/5] fix(runner): harden director-agent cert rotation correctness checks --- internal/runner/runner.go | 104 +++++++++++++++++++++++++++++++------- 1 file changed, 86 insertions(+), 18 deletions(-) diff --git a/internal/runner/runner.go b/internal/runner/runner.go index 4ccc0c0..e72649c 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -1919,7 +1919,12 @@ func subjectHasAgentPackage(container string) (present, conclusive bool) { if container == "" { return false, false } - out, _ := exec.Command("docker", "exec", container, "sh", "-c", + // Bound the probe: a stalled `docker exec` must not hang the whole run. A + // timeout (or any error) leaves the result inconclusive so the caller falls + // through to the timed delivery wait rather than failing on a flaky probe. + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + out, _ := exec.CommandContext(ctx, "docker", "exec", container, "sh", "-c", "test -d /opt/vmetric/package/agent && echo PRESENT || echo ABSENT").CombinedOutput() s := string(out) switch { @@ -1988,6 +1993,16 @@ func (r *Runner) runDirectorAgentCertRotation(tc *config.TestCase, subject confi configName := r.opts.ConfigName subject = r.applySubjectOverrides(subject) + // This specialized handler is dispatched before the generic capability guard + // in Run, so honor the case's `requires:` here too — otherwise a rotation case + // declaring a capability could start against a subject that lacks it. + for _, capName := range tc.Requires { + if !subject.HasCapability(capName) { + return results.RunResult{}, fmt.Errorf("subject %q does not declare capability %q required by case %q", + subject.Name, capName, tc.Name) + } + } + fmt.Printf("→ test=%s subject=%s version=%s config=%s\n", tc.Name, subject.Name, subject.Version, configName) @@ -2004,7 +2019,10 @@ func (r *Runner) runDirectorAgentCertRotation(tc *config.TestCase, subject confi if err != nil { return results.RunResult{}, err } - if err := os.Chmod(tmpDir, 0o777); err != nil { + // World-writable so container UIDs can write the mounted cert/compose files, + // plus the sticky bit so a co-tenant on a multi-user host can't unlink or + // replace this run's generated certs/compose. + if err := os.Chmod(tmpDir, 0o1777); err != nil { return results.RunResult{}, fmt.Errorf("chmod tmpdir: %w", err) } defer func() { @@ -2064,6 +2082,18 @@ func (r *Runner) runDirectorAgentCertRotation(tc *config.TestCase, subject confi _ = orch.Down() startTime := time.Now() + // All rotation waits (settle, stall, drain) are clamped to the overall run + // deadline so a run can't keep sleeping/polling past Options.Timeout — the + // same bound the persistence drivers apply via runDeadline. + runDeadline := startTime.Add(r.opts.Timeout) + sleepWithinDeadline := func(d time.Duration) error { + rem := time.Until(runDeadline) + if rem <= 0 { + return fmt.Errorf("run timeout (%s) exceeded before the rotation wait completed", r.opts.Timeout) + } + time.Sleep(min(d, rem)) + return nil + } defer func() { if !r.opts.NoCleanup { fmt.Println(" tearing down…") @@ -2135,26 +2165,44 @@ func (r *Runner) runDirectorAgentCertRotation(tc *config.TestCase, subject confi fmt.Printf(" agent established — %s records before rotation ✓\n", formatCount(countAtRotation)) // Phase 1 — rotate the director cert per mode and bounce the director so the - // agent must re-handshake. + // agent must re-handshake. resumeBaseline is the receiver count captured + // immediately before the disruption (and, for reject, AFTER the stall) so the + // "delivery resumed" verdict reflects only genuine post-disruption reconnects — + // not pre-rotation arrivals still draining, nor the tolerated stall leak. settle := time.Duration(tc.Rotation.SettleSecondsOrDefault()) * time.Second + resumeBaseline := countAtRotation switch tc.Rotation.Mode { case config.RotationSameCA: + rmBefore, qerr := r.queryReceiverMetrics(metricsPort, 10*time.Second) + if qerr != nil { + return results.RunResult{}, fmt.Errorf("sampling receiver before same-CA rotation: %w", qerr) + } + resumeBaseline = rmBefore.LinesReceived fmt.Println(" rotating director leaf under the SAME CA, then bouncing the director…") if err := rotateAndReload(orch, "subject", func() error { return orchestrator.RotateServerCert(certsDir, hosts) }); err != nil { return results.RunResult{}, err } - time.Sleep(settle) + if err := sleepWithinDeadline(settle); err != nil { + return results.RunResult{}, err + } case config.RotationNewCARecover: + rmBefore, qerr := r.queryReceiverMetrics(metricsPort, 10*time.Second) + if qerr != nil { + return results.RunResult{}, fmt.Errorf("sampling receiver before CA rollover: %w", qerr) + } + resumeBaseline = rmBefore.LinesReceived fmt.Println(" rolling the director CA over (re-served at /dl/cert.pem), then bouncing the director…") if err := rotateAndReload(orch, "subject", func() error { return orchestrator.RotateServerCertNewCA(certsDir, hosts) }); err != nil { return results.RunResult{}, err } - time.Sleep(settle) + if err := sleepWithinDeadline(settle); err != nil { + return results.RunResult{}, err + } case config.RotationNewCAReject: // Phase 1a (negative/security): untrusted, unserved leaf — the agent MUST @@ -2171,7 +2219,9 @@ func (r *Runner) runDirectorAgentCertRotation(tc *config.TestCase, subject confi }); err != nil { return results.RunResult{}, err } - time.Sleep(stall) + if err := sleepWithinDeadline(stall); err != nil { + return results.RunResult{}, err + } rmStall, qerr := r.queryReceiverMetrics(metricsPort, 10*time.Second) if qerr != nil { return results.RunResult{}, fmt.Errorf("sampling receiver during untrusted window: %w", qerr) @@ -2188,6 +2238,9 @@ func (r *Runner) runDirectorAgentCertRotation(tc *config.TestCase, subject confi formatCount(advanced), stall, formatCount(rmBefore.LinesReceived), formatCount(rmStall.LinesReceived), ) } + // Resume must be measured against the stalled level, not the pre-rotation + // count — the ≤stallLeak in-flight records are not a reconnect. + resumeBaseline = rmStall.LinesReceived fmt.Printf(" delivery STALLED under the untrusted cert (%s new records) ✓\n", formatCount(advanced)) // Phase 1b (recovery): restore a trusted leaf under the original CA and @@ -2198,7 +2251,9 @@ func (r *Runner) runDirectorAgentCertRotation(tc *config.TestCase, subject confi }); err != nil { return results.RunResult{}, err } - time.Sleep(settle) + if err := sleepWithinDeadline(settle); err != nil { + return results.RunResult{}, err + } default: return results.RunResult{}, fmt.Errorf("unknown rotation.mode %q", tc.Rotation.Mode) @@ -2207,8 +2262,8 @@ func (r *Runner) runDirectorAgentCertRotation(tc *config.TestCase, subject confi // Phase 2 — wait for the post-rotation RESUME, then drain to stable. The // agent's reconnect is not instant: it must detect the dropped session and // (for a CA rollover) re-fetch /dl/cert.pem with jitter + backoff, so the - // count legitimately sits flat at countAtRotation for a while. Crucially the - // stable-exit is gated on resume having been observed (count > countAtRotation) + // count legitimately sits flat at resumeBaseline for a while. Crucially the + // stable-exit is gated on resume having been observed (count > resumeBaseline) // — otherwise that recovery gap looks "stable" and the drain bails before the // agent ever reconnects (a false failure). Until resume, we keep polling for // the full window; once resumed, six unchanged rounds means drained. @@ -2220,7 +2275,11 @@ func (r *Runner) runDirectorAgentCertRotation(tc *config.TestCase, subject confi var lastCount int64 stableRounds := 0 resumedObserved := false + // Clamp the drain to the overall run deadline so it can't poll past Options.Timeout. drainDeadline := time.Now().Add(drainTimeout) + if drainDeadline.After(runDeadline) { + drainDeadline = runDeadline + } for time.Now().Before(drainDeadline) { time.Sleep(5 * time.Second) rm, qerr := r.queryReceiverMetrics(metricsPort, 10*time.Second) @@ -2228,7 +2287,7 @@ func (r *Runner) runDirectorAgentCertRotation(tc *config.TestCase, subject confi continue } fmt.Printf(" received: %s\n", formatCount(rm.LinesReceived)) - if rm.LinesReceived > countAtRotation && !resumedObserved { + if rm.LinesReceived > resumeBaseline && !resumedObserved { resumedObserved = true fmt.Println(" delivery resumed after rotation ✓") } @@ -2249,26 +2308,35 @@ func (r *Runner) runDirectorAgentCertRotation(tc *config.TestCase, subject confi return results.RunResult{}, fmt.Errorf("querying receiver metrics: %w", err) } - // Verdict: enough records overall AND delivery resumed after the rotation - // (proves a live reconnect, not just the pre-rotation backlog). The reject - // case additionally required the stall, already enforced inline above. + // Verdict: enough records overall, delivery resumed past the pre-rotation + // baseline (a live reconnect, not just drained backlog), AND the receiver + // raised no content failure. The reject case additionally required the stall, + // already enforced inline above. gotEnough := recvMetrics.LinesReceived >= minRecv - resumed := recvMetrics.LinesReceived > countAtRotation - passed := gotEnough && resumed + resumed := recvMetrics.LinesReceived > resumeBaseline + recvOK := recvMetrics.Passed == nil || *recvMetrics.Passed + passed := gotEnough && resumed && recvOK var errs []string + if !recvOK { + if len(recvMetrics.Errors) > 0 { + errs = append(errs, recvMetrics.Errors...) + } else { + errs = append(errs, "receiver flagged a content failure") + } + } if !gotEnough { errs = append(errs, fmt.Sprintf("expected >= %s received records, got %s", formatCount(minRecv), formatCount(recvMetrics.LinesReceived))) } if !resumed { errs = append(errs, fmt.Sprintf( - "delivery did not resume after rotation — %s records at rotation, %s at drain (agent never reconnected)", - formatCount(countAtRotation), formatCount(recvMetrics.LinesReceived), + "delivery did not resume after rotation — %s records pre-rotation, %s at drain (agent never reconnected)", + formatCount(resumeBaseline), formatCount(recvMetrics.LinesReceived), )) } elapsed := time.Since(startTime).Seconds() - fmt.Printf(" records at rotation: %s final: %s\n", formatCount(countAtRotation), formatCount(recvMetrics.LinesReceived)) + fmt.Printf(" pre-rotation baseline: %s final: %s\n", formatCount(resumeBaseline), formatCount(recvMetrics.LinesReceived)) if passed { fmt.Printf(" director↔agent cert rotation (%s): PASSED ✓\n", tc.Rotation.Mode) } else { From e09e64eb5a9fbbee9452c586098abed8795526e6 Mon Sep 17 00:00:00 2001 From: Eren Aslan <16862833+erenaslandev@users.noreply.github.com> Date: Tue, 23 Jun 2026 21:54:10 +0300 Subject: [PATCH 4/5] fix(runner): correct inefficient assignment --- internal/runner/runner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/runner/runner.go b/internal/runner/runner.go index e72649c..11ea895 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -2170,7 +2170,7 @@ func (r *Runner) runDirectorAgentCertRotation(tc *config.TestCase, subject confi // "delivery resumed" verdict reflects only genuine post-disruption reconnects — // not pre-rotation arrivals still draining, nor the tolerated stall leak. settle := time.Duration(tc.Rotation.SettleSecondsOrDefault()) * time.Second - resumeBaseline := countAtRotation + var resumeBaseline int64 switch tc.Rotation.Mode { case config.RotationSameCA: rmBefore, qerr := r.queryReceiverMetrics(metricsPort, 10*time.Second) From b199cf431fcb4de44fa7ee0a67333a450fa690f9 Mon Sep 17 00:00:00 2001 From: Eren Aslan <16862833+erenaslandev@users.noreply.github.com> Date: Wed, 24 Jun 2026 09:38:08 +0300 Subject: [PATCH 5/5] feat: implement rotation handling --- internal/config/case.go | 66 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/internal/config/case.go b/internal/config/case.go index ecff5e0..d6ff6cc 100644 --- a/internal/config/case.go +++ b/internal/config/case.go @@ -514,6 +514,65 @@ type AgentConfig struct { MountsSharedData bool `yaml:"mounts_shared_data"` } +// Rotation parameterizes the director_agent_tls_cert_rotation_correctness +// driver (see TestCase.Rotation, runDirectorAgentCertRotation). The director +// deploys an agent that streams back over the proxy_tls listener; mid-run the +// director's serving cert/CA is rotated on disk and the director is bounced so +// the enrolled agent must re-handshake. Mode selects which rotation and the +// expected agent response. +type RotationConfig struct { + // Mode selects the mid-run rotation: + // "same_ca" — re-sign the director leaf under the SAME CA + // (RotateServerCert). The agent reconnects transparently + // because the chain still validates. Verdict: delivery + // resumes (count grows after the bounce). + // "new_ca_recover" — rotate to a BRAND-NEW CA written to ca.crt and served + // at /dl/cert.pem (RotateServerCertNewCA). A bootstrap + // agent (no operator-pinned CA) must re-fetch the new CA + // and reconnect. Verdict: delivery resumes. + // "new_ca_reject" — TWO PHASE. Phase 1 re-signs the leaf under an UNTRUSTED + // CA the director never serves (RotateServerCertWrongCA): + // the agent MUST fail validation, so delivery STALLS + // (a missing stall is a SECURITY failure — validation is + // disabled). Phase 2 restores a trusted leaf + // (RotateServerCert) and delivery must resume. + Mode string `yaml:"mode"` + + // SettleSeconds is the pause after a rotation+bounce before the driver samples + // the receiver, giving the director time to rebind its listener and the agent + // time to detect the dropped session and reconnect (default 25s). + SettleSeconds int `yaml:"settle_seconds"` + + // StallSeconds (new_ca_reject only) is how long the receiver count must hold + // flat after the untrusted rotation for the bad cert to count as rejected + // (default 20s). The case's endpoint seed loop MUST still be appending fresh + // records during this window, else the stall is vacuous — see the case NOTES. + StallSeconds int `yaml:"stall_seconds"` +} + +// Rotation mode values for RotationConfig.Mode. +const ( + RotationSameCA = "same_ca" + RotationNewCARecover = "new_ca_recover" + RotationNewCAReject = "new_ca_reject" +) + +// SettleSecondsOrDefault / StallSecondsOrDefault centralize the rotation timing +// defaults so the driver and any caller agree. +func (rc *RotationConfig) SettleSecondsOrDefault() int { + if rc != nil && rc.SettleSeconds > 0 { + return rc.SettleSeconds + } + return 25 +} + +func (rc *RotationConfig) StallSecondsOrDefault() int { + if rc != nil && rc.StallSeconds > 0 { + return rc.StallSeconds + } + return 20 +} + // Endpoint is an auxiliary container in the test topology (see // TestCase.Endpoints). It's a host the subject reaches on the bench network — // not a generator or receiver. @@ -670,6 +729,13 @@ func (tc *TestCase) IsDirectorAgentRotationType() bool { // topology. The agent connects into the subject rather than being connected to. func (tc *TestCase) UsesAgent() bool { return tc.Agent != nil } +// IsDirectorAgentRotationType reports whether the case is the director↔agent +// TLS cert-rotation correctness flow, which has its own subject-driven (no +// generator) driver — see runDirectorAgentCertRotation. +func (tc *TestCase) IsDirectorAgentRotationType() bool { + return tc.Type == "director_agent_tls_cert_rotation_correctness" +} + // IsPerformanceType reports whether the case is scored as a throughput test — // the plain `performance` type or the Kafka variant `kafka_performance`. func (tc *TestCase) IsPerformanceType() bool {