diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e7c7f3b..e10848c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -70,7 +70,18 @@ jobs: run: uv sync --locked --only-group test - name: Run tests with coverage - run: uv run pytest --cov --cov-report=xml --cov-report=term + # `coverage run -m pytest` (NOT `pytest --cov`): the pytest11 + # entry point in cfn_handler.testing.fixtures causes pytest to + # import cfn_handler during plugin collection, BEFORE pytest-cov + # attaches its instrumentation. Module-level code in __init__.py + # then runs uninstrumented and the report shows artificial 0% + # coverage on those lines, dropping aggregate to ~68%. Using + # `coverage run` ensures coverage's tracer is active before any + # imports. Same pattern as `just test-cov`. + run: | + uv run coverage run -m pytest + uv run coverage xml + uv run coverage report - name: Upload coverage to Codecov if: matrix.runner == 'ubuntu-24.04' && matrix.python == '3.12' diff --git a/README.md b/README.md index 804a32c..0aa2c58 100644 --- a/README.md +++ b/README.md @@ -132,6 +132,45 @@ Working SAM-deployable examples live in `examples/`: - `examples/with-physical-id/` — explicit physical resource id (replacement on update). - `examples/failing/` — handler that fails, demonstrating FAILED-response semantics. +## Testing your handlers + +`cfn-handler` ships first-class testing helpers under `cfn_handler.testing`. +The dispatch flow runs in-process; no HTTP, no boto3, no moto setup +required for unit tests: + +```python +from cfn_handler import CustomResource +from cfn_handler.testing import assert_success, make_event + +def test_my_create_handler(): + resource = CustomResource() + + @resource.create + def on_create(event, context): + return {"Endpoint": "https://x.example"} + + replay = resource.replay(make_event()) + assert_success(replay, data={"Endpoint": "https://x.example"}) +``` + +Available surface: + +- `CustomResource.replay(event, context=None)` — execute the dispatch + in-process, returning a structured `Replay` (status, data, reason, + payload, ...). +- `make_event(...)`, `make_context(...)` — factories with safe defaults. +- `assert_success`, `assert_failed`, `assert_deferred` — assertion helpers + with informative messages on failure. +- pytest fixtures `cfn_create_event`, `cfn_update_event`, + `cfn_delete_event`, `cfn_lambda_context` — auto-discovered via the + `pytest11` entry point; no `pytest_plugins` declaration needed. + +For long-running (polled) handlers, the first `replay()` returns +`Replay(status="DEFERRED")` and mutates the event with marker keys. +A second `replay()` with the mutated event resumes through the poll +handler — useful for testing both halves of a polled lifecycle without +provisioning EventBridge rules. + ## Project status v1.0.0 — first stable release. Follows [Semantic Versioning](https://semver.org). diff --git a/docs/ROADMAP.md b/docs/ROADMAP.md new file mode 100644 index 0000000..8ccdb12 --- /dev/null +++ b/docs/ROADMAP.md @@ -0,0 +1,186 @@ +# Roadmap + +This document captures the directional thinking for `cfn-handler`'s +post-1.x evolution. It's not a commitment — items move freely between +sections as priorities shift. + +OpenSpec changes are the canonical, executable plan. This roadmap is the +*context* those changes live in. + +--- + +## Vision + +`cfn-handler` is a small, focused, zero-dependency library for writing +CloudFormation Custom Resource Lambda handlers. The library does the +boilerplate (lifecycle dispatch, presigned-URL response, polling via +EventBridge, response-status handling) so users focus on the resource +logic. + +The library is **not** a framework. It does not own the user's logging +stack, idempotency layer, type system, or deployment toolchain. It +*interoperates* — cleanly — with whatever the user already runs. + +--- + +## Non-goals + +These have been considered and explicitly declined. Listed here so we +don't re-litigate them. + +- **CDK construct library** — `aws-cdk.custom-resources.Provider` already + occupies that niche, and the audience overlap (CDK + Python + custom + resources + cfn-handler) is small. SAM/raw-CFN is the consumer + identity. +- **Step Functions / "Durable" orchestration for >15 min runs** — a + non-problem. Polling already re-invokes the Lambda via EventBridge + scheduled rules; the actual ceiling is the CFN per-resource timeout + and the response-URL TTL, both well above 15 minutes. +- **CloudFormation macros / Transforms** — antipattern. Account-global + registration, hard to debug, requires `CAPABILITY_AUTO_EXPAND`. The + modern alternative (CFN Resource Type Registry) is parked separately + below. +- **Async/await handlers** — `async def on_create(...)` would let users + use async SDKs natively, but most CFN custom resources are I/O-light. + Park unless users ask. +- **CDK usage docs** — same reasoning as the construct library. Audience + too small to justify maintenance overhead. Users who need to call + `cfn-handler` from a CDK-deployed Lambda will figure out the obvious + shape. + +--- + +## Active priorities (next 3–6 months) + +These are scoped, designed, and ready to (or already) ship. Each becomes +its own OpenSpec change. + +### 1. ~~Testing helpers~~ — shipped in `v1.3.0` + +Shipped as `cfn_handler.testing`: `replay()`, `Replay` dataclass, +`make_event` / `make_context` factories, `assert_success` / `assert_failed` +/ `assert_deferred` helpers, and pytest fixtures auto-discovered via the +`pytest11` entry point. The legacy `test_mode` / `last_response` surface +is soft-deprecated (DeprecationWarning) and scheduled for removal in v2.0. + +See: archived OpenSpec change `add-testing-helpers` (search +`openspec/changes/archive/`) and the `testing-helpers` capability spec. + +### 2. Better logging — likely `v1.4.0` + +**Shape**: +- A stdlib `logging.Filter` that injects CFN context fields + (`StackId`, `LogicalResourceId`, `RequestId`, `RequestType`) into + every log record produced during a flow. Works with any logger. +- A `setup_logging()` opt-in convenience that attaches a JSON formatter + and the context filter to the `cfn_handler` logger. +- Internal lifecycle state-transition logs at INFO: + `dispatching:create → handler:returned → response:sent`. + +**Powertools interop**: the context filter is a plain `logging.Filter`; +Powertools users `.addFilter()` it on the Powertools `Logger`. No +Powertools dep. Document the recipe. + +### 3. Optional idempotency module — likely `v1.5.0` + +**Shape**: +- New module `cfn_handler.idempotency` (importable but not in `__all__`). +- Pluggable backend protocol; the user provides their own DynamoDB + table, S3 bucket, or in-memory cache (the last useful only for + testing). +- Usage: `@resource.idempotent(backend=...)` wrapping `@resource.create` + etc. Caches the response keyed by `RequestId`; on re-invocation + with the same `RequestId`, replays the cached response. + +**Powertools interop**: ship a thin adapter that wraps Powertools' +`@idempotent` so users with that already wired don't need a second +backend. + +**No hard dep on Powertools.** + +--- + +## Future / unscoped + +Ideas worth doing, not yet ready. + +### Typed events — `v2.0` candidate + +`event: dict[str, Any]` everywhere is an anti-pattern given the project's +"Rust-style error handling" stance. Concrete plan when we get there: + +- `cfn_handler.events` module with `CreateEvent`, `UpdateEvent`, + `DeleteEvent` `TypedDict` definitions covering the documented CFN + custom-resource event shape. +- Handler decorators preserve the existing `event: dict[str, Any]` + signature for backwards compatibility, **and** accept handlers typed + with the new TypedDicts via overloads. +- Powertools interop: structural compatibility with + `aws_lambda_powertools.utilities.parser.models.CloudFormationCustomResourceEvent` + (pydantic model) — both should satisfy the same `Mapping`-shaped + protocol so users can mix-and-match without runtime cost. + +This is a breaking change to the `__all__` surface, hence v2.0. + +### CFN Resource Type Registry support + +The modern CFN private registry mechanism (`AWS::CloudFormation::Resource`) +lets you publish a resource type with a JSON schema, consumed in templates +as `MyOrg::MyService::MyResource`. It overlaps with what `cfn-handler` +does today (Lambda-backed custom resources) but the deployment surface +and user experience are very different: + +- Resource Types are deployed via the CFN registry, not as Lambda + functions in the user's account. +- Schema-validated input/output. +- Versioning is handled by CFN, not the consumer. + +Worth a real exploration session before committing — this is arguably a +*different product* than `cfn-handler` is today, and might be more sensibly +shipped as a separate library that shares no code with the runtime. + +**Status**: parked, low priority, requires research spike. + +--- + +## Parallel-track work + +### `cfn-lint-cfn-handler` plugin (separate repo) + +A cfn-lint rule plugin catching `cfn-handler`-specific misconfigurations +(Lambda timeout too low, polling-using handler missing IAM permissions, +wrong-region layer ARN against our published manifest). + +Lives in its own GitHub repo (`cfn-lint-cfn-handler`) for release-cadence +independence. Bootstrap context for that repo is in +`tmp/cfn-lint-plugin-bootstrap.md` (will be deleted once the new repo is +live). + +The headline rule is **W9105: cfn-handler layer ARN doesn't match the +deployment region**. It consumes the `layer-arns.json` manifest published +with every `cfn-handler` release. No-one else can write that rule. + +--- + +## Decision log + +Things we considered and decided about, kept here for future reference. + +| Decision | Date | Rationale | +|---------------------------------------------------------|------------|-------------------------------------------------------------------------------------------------| +| Skip CDK construct | 2026-05-22 | `aws-cdk.custom-resources.Provider` covers it; audience overlap with cfn-handler users is small | +| Drop SFN/Durable for long-running ops | 2026-05-22 | Polling already re-invokes via EventBridge; 15 min Lambda limit is not the actual ceiling | +| Hard-pass on CFN macros | 2026-05-22 | Account-global, hard to debug, `CAPABILITY_AUTO_EXPAND` ergonomics | +| cfn-lint plugin in separate repo (not monorepo) | 2026-05-22 | Independent versioning, simpler CI, avoids workspace-restructure overhead | +| Powertools interop via duck-typing, never as a hard dep | 2026-05-22 | Zero-dep posture is core; Powertools users get free interop, non-Powertools users unaffected | +| Testing helpers ship before logging improvements | 2026-05-22 | Motivation; testing surface design might inform logging surface design | + +--- + +## Updating this document + +- New idea? Add it under "Future / unscoped" with a one-paragraph shape. +- Idea picked up for work? Move to "Active priorities" with a target version. +- Idea declined? Move to "Non-goals" or the "Decision log" with rationale. +- Idea shipped? Delete from this doc; the OpenSpec spec captures the + durable contract. diff --git a/justfile b/justfile index 7c8e608..e683b5e 100644 --- a/justfile +++ b/justfile @@ -21,8 +21,16 @@ test-integration: uv run pytest tests/integration # Run the test suite with coverage gate (fails below 95%). +# +# Uses `coverage run -m pytest` (NOT `pytest --cov`) so coverage's +# instrumentation hooks attach BEFORE the pytest11 entry point loads +# `cfn_handler.testing.fixtures` and transitively imports `cfn_handler`. +# Otherwise module-level code in __init__.py runs before coverage starts +# and the report shows artificial 0% coverage on those lines. test-cov: - uv run pytest --cov --cov-report=term-missing --cov-report=html + uv run coverage run -m pytest + uv run coverage report --show-missing --fail-under=95 + uv run coverage html # Watch tests (re-run on file change). Requires pytest-watcher; install via `uv add --group test pytest-watcher` if not already. test-watch: @@ -138,8 +146,21 @@ test-matrix-arm64: _check-act # 4b. ci.yml `lint` job — ruff, ruff-format, mypy strict, pyright strict # (~30s). # 4c. examples-lint.yml — cfn-lint over examples/**/template.yaml (~30s). -# 5. codeql.yml — Python security-and-quality scan (~1-8 min, slower -# on first run while CodeQL bundle downloads). +# 5. codeql.yml — INTENTIONALLY SKIPPED. The CodeQL Action's +# post-analysis step calls GitHub's REST API +# (/repos/{owner}/{repo}/actions/runs/{run_id}) for telemetry +# and status reporting; under `act` the synthesized GITHUB_RUN_ID +# doesn't exist on github.com, the call 404s, and the action +# classifies the job as JOB_STATUS_CONFIGURATION_ERROR even when +# the analysis itself succeeded with zero findings. CodeQL is +# validated by the real GH Actions run on every PR; replicating +# it here would always report a false-negative failure. +# To run CodeQL locally on demand: +# act push -W .github/workflows/codeql.yml \ +# --container-architecture linux/amd64 \ +# --secret GITHUB_TOKEN="$(gh auth token)" +# (Inspect the SARIF in /Users//.../results/python.sarif — +# configuration-error exits with the SARIF generated mean clean.) gha-pre-release: _check-act _check-gh-token _check-docker _check-npm #!/usr/bin/env bash set -uo pipefail @@ -149,13 +170,13 @@ gha-pre-release: _check-act _check-gh-token _check-docker _check-npm --secret GITHUB_TOKEN="$(gh auth token)" ) - echo "==> [1/7] secure-workflows.yml — SHA-pin enforcement" + echo "==> [1/6] secure-workflows.yml — SHA-pin enforcement" act pull_request -W .github/workflows/secure-workflows.yml "${common_flags[@]}" \ --action-cache-path /tmp/act-cache-secure-workflows \ || { echo; echo "FAIL: secure-workflows.yml"; exit 1; } echo - echo "==> [2/7] Docker action manifest probe" + echo "==> [2/6] Docker action manifest probe" # Match `uses: /@` in every workflow file, then for any # action that publishes a Docker image at ghcr.io//, verify # the SHA resolves to a real image. Currently this is just @@ -197,7 +218,7 @@ gha-pre-release: _check-act _check-gh-token _check-docker _check-npm echo " (all Docker action images resolve)" echo - echo "==> [3/7] release-please uv.lock validator" + echo "==> [3/6] release-please uv.lock validator" # Loads release-please's GenericToml updater locally and exercises it # against the real uv.lock + the jsonpath in release-please-config.json. # `npm ci` is strict-lockfile (matches our uv --locked posture); install @@ -209,30 +230,34 @@ gha-pre-release: _check-act _check-gh-token _check-docker _check-npm ) || { echo; echo "FAIL: release-please uv.lock validator"; exit 1; } echo - echo "==> [4a/7] ci.yml — test matrix (amd64 + arm64 in parallel)" + echo "==> [4a/6] ci.yml — test matrix (amd64 + arm64 in parallel)" just test-matrix \ || { echo; echo "FAIL: ci.yml test matrix"; exit 1; } echo - echo "==> [4b/7] ci.yml — lint+typecheck job" + echo "==> [4b/6] ci.yml — lint+typecheck job" act pull_request -W .github/workflows/ci.yml "${common_flags[@]}" --job lint \ --action-cache-path /tmp/act-cache-lint \ || { echo; echo "FAIL: ci.yml lint job"; exit 1; } echo - echo "==> [4c/7] examples-lint.yml — cfn-lint over examples" + echo "==> [4c/6] examples-lint.yml — cfn-lint over examples" act pull_request -W .github/workflows/examples-lint.yml "${common_flags[@]}" \ --action-cache-path /tmp/act-cache-examples-lint \ || { echo; echo "FAIL: examples-lint.yml"; exit 1; } echo - echo "==> [5/7] codeql.yml — Python security analysis" - act push -W .github/workflows/codeql.yml "${common_flags[@]}" \ - --action-cache-path /tmp/act-cache-codeql \ - || { echo; echo "FAIL: codeql.yml"; exit 1; } + echo "==> [5/6] codeql.yml — SKIPPED (act/CodeQL incompatibility)" + echo " The CodeQL Action's post-analysis telemetry call to GitHub's" + echo " REST API 404s under \`act\` because the synthesized GITHUB_RUN_ID" + echo " doesn't exist on github.com, even when the analysis itself" + echo " succeeds with zero findings. CodeQL is validated by the real" + echo " GH Actions run on every PR; see the recipe header comment for" + echo " instructions on running CodeQL locally on demand." echo - echo "OK: all gating jobs passed locally. Safe to merge." + echo "OK: all locally-replayable gating jobs passed. Safe to merge." + echo " (CodeQL still gates merge on the actual PR via real GH Actions.)" # ---- OpenSpec ------------------------------------------------------------ diff --git a/openspec/changes/add-testing-helpers/.openspec.yaml b/openspec/changes/add-testing-helpers/.openspec.yaml new file mode 100644 index 0000000..4a1c677 --- /dev/null +++ b/openspec/changes/add-testing-helpers/.openspec.yaml @@ -0,0 +1,2 @@ +schema: spec-driven +created: 2026-05-22 diff --git a/openspec/changes/add-testing-helpers/design.md b/openspec/changes/add-testing-helpers/design.md new file mode 100644 index 0000000..5961192 --- /dev/null +++ b/openspec/changes/add-testing-helpers/design.md @@ -0,0 +1,428 @@ +# Design: Testing helpers + +## Context + +`cfn-handler`'s production dispatch path is well-defined and tested. The +public API surface is `CustomResource`, the lifecycle decorators +(`@create`, `@update`, `@delete`), and the polling decorators +(`@poll_create` etc.). When the user-decorated handler completes, the +library runs: + +``` + ┌────────────────────────────────────────────┐ + │ CustomResource.__call__(event, context) │ + └────────────────────────────────────────────┘ + │ + ▼ + ┌──────────────────────┐ + │ resolve handler by │ + │ RequestType │ + └──────────────────────┘ + │ + ▼ + ┌──────────────────────┐ + │ invoke user handler │ + └──────────────────────┘ + │ + ┌─────────────┴─────────────┐ + ▼ ▼ + ┌────────────────┐ ┌──────────────────┐ + │ poll handler │ │ no poll handler │ + │ registered │ │ registered │ + │ → setup_polling│ │ → build_response │ + │ (boto3) │ │ → send_response │ + │ return early │ │ (urllib PUT) │ + └────────────────┘ └──────────────────┘ +``` + +### Existing test surface — `test_mode` / `last_response` + +The library already ships a rough first-pass at testing helpers: + +- `CustomResource(test_mode=True)` — skips HTTP and skips polling + provisioning/teardown. +- `self.last_response` — captures the response payload that would have + been PUT to the CFN URL. +- For the polling-defer case, `last_response` gets a sentinel + `{"__cfn_handler_polling__": True, "Data": ...}`. + +This works for the basic SUCCESS/FAILED-payload assertions but has known +problems: + +1. **Mutable state on the resource.** Tests must reset + `last_response = None` between assertions or risk false positives. +2. **Sentinel string for polling.** `"__cfn_handler_polling__"` is a + private-by-convention key but lives on the public `last_response` + surface; it has no type, no documentation guarantees. +3. **Polling re-invocation can't be tested.** `test_mode` short-circuits + `setup_polling` entirely, so the polling marker keys + (`CfnHandlerPoll`, `CfnHandlerRule`, `CfnHandlerPermission`) are + never added to the event. A test cannot then re-invoke `__call__` + with the marked event to simulate the second dispatch (the + marker-detection path expects those keys). +4. **Two ways to do everything.** Without a clear superseding API, the + library accumulates testing surfaces. + +The `replay()` API and `cfn_handler.testing` module supersede these. + +### Test-mode migration strategy + +This change ships in v1.3.0 alongside the new `replay()` API. To avoid +breaking existing users: + +- `test_mode=True` keeps working in v1.3 with identical semantics. +- Constructing a `CustomResource(test_mode=True)` emits a + `DeprecationWarning` pointing at `replay()`. +- Setting / reading `last_response` does NOT warn directly (would warn + inside `_emit_response` on every test invocation; too noisy). The + deprecation is signalled through the constructor only. +- The internal test suite migrates to `replay()` as part of this PR + (~50 call-sites). This is mechanical and validates the new API. +- v2.0 (separate change, no timeline yet) removes `test_mode`, + `last_response`, and the `__cfn_handler_polling__` sentinel. + +To unit-test the user handler today, the user must: +1. Construct a synthetic event dict (no helper exists); +2. Construct a Lambda context object (no helper exists); +3. Patch `cfn_handler._internal.response.send_response` to intercept + the PUT (relies on internal-module knowledge); +4. Patch `cfn_handler._internal.poller.setup_polling` if any poll handler + is registered (relies on internal-module knowledge). + +This is testable, but only by knowing the internals — which `_internal/` +explicitly says is unstable. A user who reaches into `_internal/` +shouldn't expect their tests to survive a minor-version bump. + +The fix is to ship the test seam as a public surface. + +## Goals / Non-Goals + +**Goals:** + +- Let users unit-test custom-resource handlers without HTTP, without + boto3, and without reaching into `cfn_handler._internal/`. +- Provide canonical event/context factories so users don't reinvent + fixture data. +- Provide assertion helpers that read naturally + (`assert_success(replay, data={...})`). +- Auto-discover pytest fixtures via the `pytest11` entry point so users + who already use pytest get fixtures for free without an explicit + `pytest_plugins` declaration. +- Cover the polling case: `replay()` of an event that would defer to + polling produces a `Replay` with status `"DEFERRED"`. Calling + `replay()` again with the polling re-invocation event resumes the + flow. +- Keep zero runtime dependencies. pytest is **not** a runtime dep; the + fixture entry point is loaded only inside pytest collection. + +**Non-Goals:** + +- **Integration testing helpers.** Standing up moto, simulating EventBridge + rule firing, and verifying that boto3 was called with specific args is + out of scope. Users wanting that already have moto + the existing + public API. We're not building a second integration-test stack. +- **Mocking the user's downstream calls.** The user is responsible for + mocking their own SDK calls; that's not this library's concern. +- **A "fluent" or builder API for events.** `make_event(**overrides)` is + enough; nobody needs `EventBuilder().with_request_type("Create")...`. +- **Fixtures for non-pytest test runners.** unittest users can call the + factories directly — `cfn_create_event()` works in any context. + +## Decisions + +### D1. Public module: `cfn_handler.testing` + +The testing surface lives in `src/cfn_handler/testing/__init__.py`, +re-exported from `cfn_handler.testing` (not from `cfn_handler` root). +Top-level imports remain minimal: + +```python +import cfn_handler # production code only +import cfn_handler.testing # tests only +``` + +Rationale: +- Test code shouldn't accidentally land in production bundles. By + partitioning the surface, the `cfn_handler.testing` module can be + flagged in lint configs (e.g. `flake8-tidy-imports` ban-relative). +- Mirrors the layout used by other libraries (e.g. `httpx.testing`, + `aiohttp.test_utils`). +- The Lambda Layer build can optionally exclude + `cfn_handler/testing/` to keep the layer small (saves a few KB; not + decisive). + +**Rejected alternative**: re-exporting helpers from `cfn_handler` root. +Pollutes `__all__` with names that have no place in production. The +import discipline this enforces is worth a tiny extra keystroke. + +### D2. `Replay` is a frozen dataclass, not a tuple or a class with logic + +```python +@dataclass(frozen=True, slots=True) +class Replay: + status: Literal["SUCCESS", "FAILED", "DEFERRED"] + physical_resource_id: str | None + data: dict[str, Any] + reason: str + no_echo: bool + payload: dict[str, Any] # the rendered response payload, or {} if DEFERRED + request_type: Literal["Create", "Update", "Delete"] +``` + +Rationale: +- Immutable means tests can't accidentally mutate the result and + confuse subsequent assertions. +- `slots=True` is a small memory win; mostly there for hygiene. +- `payload` is the full rendered response (what would have hit the + CFN URL). Useful for tests that care about NoEcho or want to assert + on the wire format directly. +- `"DEFERRED"` is a sentinel that does NOT exist in real CFN + responses. Choosing a string means it survives equality comparison + cleanly without users importing an enum. + +**Rejected alternative**: an enum for `status`. `Literal` strings are +lighter, work with structural typing, and don't require an import +for users. + +**Rejected alternative**: returning `None` for `payload` when DEFERRED. +Nullable fields force `if replay.payload is None` branches in tests. +An empty dict is a sentinel value with no information loss. + +### D3. The dispatch seam: `transport` parameter + +`CustomResource.__call__` and the polling deferral path both end up +calling `_internal.response.send_response(url, payload)`. To make +replay possible, we add a `transport` parameter to the internal +dispatch flow: + +```python +# Internal type: +Transport = Callable[[str, dict[str, Any]], None] + +# Default: +def _http_transport(url: str, payload: dict[str, Any]) -> None: + send_response(url, payload) # urllib PUT, existing behaviour + +# CustomResource holds a transport, defaulting to _http_transport. +# replay() injects an "intercepting" transport that captures the +# (url, payload) tuple instead of sending it. +``` + +The same seam is used in the polling teardown path: when polling +completes, the final response goes through the same `transport`, so +replay-driven polling tests work identically. + +**Why not monkeypatching?** Monkeypatching requires users to know what +to patch (`cfn_handler._internal.response.send_response` — +which `_internal/` says is unstable). The seam makes the dependency +inversion explicit and stable. + +**Why a callable parameter rather than a `Protocol`?** Lower ceremony, +and the contract is genuinely a single function call. No need to +ship a `Transport(Protocol)` class users would have to implement. + +**Why default to `_http_transport` rather than letting users pass +`send_response` themselves?** Backwards compatibility. The existing +`__init__` signature must keep working with no kwargs. + +### D4. Polling without boto3 + +The polling setup/teardown path lazy-imports `boto3`. In replay mode +we never want to call boto3 at all, even with mocked clients — +that's the whole point. The intercepting transport paired with a +**stub poller** captures "would have called setup_polling with these +args" without actually importing boto3. + +The stub poller follows the same Transport-like seam pattern: + +```python +PollerProvision = Callable[[dict[str, Any], str, int, str | None], None] +PollerTeardown = Callable[[dict[str, Any], str, str | None], None] + +# Default: thin wrappers around _internal.poller functions. +# Replay: stubs that record the call and mutate `event` to add the +# polling marker keys (so the user can assert "would have polled"). +``` + +When `replay()` is called and a poll handler is registered, the +returned `Replay` has `status="DEFERRED"`. The stub mutates the event +to add the polling markers (`CfnHandlerPoll`, `CfnHandlerRule`, +`CfnHandlerPermission`), so a follow-up `replay(event_with_markers)` +correctly routes to the poll handler. + +**Rejected alternative**: requiring users to install boto3 + moto +to test polled handlers. Defeats the point of the helpers (and the +zero-runtime-deps posture). + +### D5. Event factory: `make_event(request_type, ...)` + +```python +def make_event( + request_type: Literal["Create", "Update", "Delete"] = "Create", + *, + stack_id: str = "arn:aws:cloudformation:us-east-1:111111111111:stack/test-stack/abc", + request_id: str = "00000000-0000-0000-0000-000000000000", + logical_resource_id: str = "TestResource", + physical_resource_id: str | None = None, # required for Update/Delete + resource_type: str = "Custom::Test", + resource_properties: dict[str, Any] | None = None, + old_resource_properties: dict[str, Any] | None = None, # Update only + response_url: str = "https://example.invalid/cfn-response", + service_token: str = "arn:aws:lambda:us-east-1:111111111111:function:test", +) -> dict[str, Any]: ... +``` + +Rationale for sensible defaults: +- ARNs are syntactically valid (will parse) but use the reserved + `111111111111` example account ID and `example.invalid` host so + no production system is accidentally hit if a test misroutes. +- `physical_resource_id` defaults to `None` because Create events + don't carry one; for Update/Delete the function raises + `ValueError` if not supplied. (Better to fail loudly than to + silently use a Create-shaped event for an Update test.) + +**Rejected alternative**: separate factories per request type +(`make_create_event`, `make_update_event`, `make_delete_event`). +Three names where one suffices, and the validation logic for +"physical_resource_id required for Update/Delete" is the same in +all three. + +**Update consideration**: actually, three named factories MAY +be clearer. Reconsider in implementation; either works. The pytest +fixtures (next decision) will be three named fixtures regardless. + +### D6. pytest fixtures via `pytest11` entry point + +```toml +[project.entry-points.pytest11] +cfn_handler = "cfn_handler.testing.fixtures" +``` + +The fixtures module exposes: + +```python +@pytest.fixture +def cfn_create_event() -> dict[str, Any]: ... + +@pytest.fixture +def cfn_update_event() -> dict[str, Any]: ... + +@pytest.fixture +def cfn_delete_event() -> dict[str, Any]: ... + +@pytest.fixture +def cfn_lambda_context() -> LambdaContext: ... +``` + +Each fixture returns a fresh dict/object on every call (no shared +state). Users can override fields per-test via standard pytest +patterns: + +```python +def test_my_handler(cfn_create_event): + cfn_create_event["ResourceProperties"] = {"Foo": "bar"} + resource = CustomResource() + @resource.create + def on_create(event, ctx): return {"Endpoint": "x"} + + replay = resource.replay(cfn_create_event) + assert_success(replay, data={"Endpoint": "x"}) +``` + +**Rejected alternative**: requiring `pytest_plugins = ["cfn_handler.testing"]` +in users' `conftest.py`. The entry point is the modern, automatic way. + +**Risk to flag**: pytest fixtures have global names. If a user already +has a `cfn_create_event` fixture for some reason, ours collides. We +prefix with `cfn_` to minimise collision; users who hit it can +override with their own (pytest's local fixtures win over plugin ones). + +### D7. Assertion helpers + +```python +def assert_success( + replay: Replay, + *, + data: dict[str, Any] | None = None, + physical_resource_id: str | None = None, + no_echo: bool | None = None, +) -> None: ... + +def assert_failed( + replay: Replay, + *, + reason_contains: str | None = None, + physical_resource_id: str | None = None, +) -> None: ... + +def assert_deferred( + replay: Replay, + *, + rule_arn_present: bool = True, +) -> None: ... +``` + +Each performs `AssertionError`-raising checks suitable for use inside +pytest tests. Optional kwargs are matched only when supplied (so +`assert_success(replay)` is "any success", and +`assert_success(replay, data={...})` is "success AND data exactly +matches"). + +**Rationale**: pytest's `assert` plus `replay.status == "SUCCESS"` +also works. The helpers exist for ergonomics: a single line that +captures intent, with informative `AssertionError` messages. + +**Rejected alternative**: a hamcrest-style `is_success()` matcher. +Adds a dep, not idiomatic in pytest. + +### D8. Documentation surface + +The README gains a "Testing" section showing one minimal example. +A new top-level page (`docs/TESTING.md` or similar) is candidate +but not required for v1.3.0; the docstrings on `replay`, +`make_event`, and the assertion helpers should carry their weight. + +A new `examples/testing/` directory is **not** part of this change +(would expand scope). Add later if user feedback warrants. + +## Risks / Trade-offs + +| Risk | Mitigation | +|------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------| +| Internal seam complicates the dispatch code | Default the `transport` param everywhere; existing call sites stay unchanged. Production code paths see no change. | +| `pytest11` entry point loads `cfn_handler.testing.fixtures` in any pytest run, even ones not using cfn-handler | Fixtures only consume memory if requested; the import itself is cheap (no sys.modules pollution beyond `cfn_handler.testing`). | +| Fixture name collision (`cfn_create_event` etc.) with users who happen to have such fixtures | Use `cfn_` prefix; document override pattern. Local fixtures win in pytest's resolution order. | +| Replay's behaviour drifts from production behaviour (false-positive tests) | Dogfood: rewrite the existing test suite (or a representative subset) on top of the new helpers. If a test that passes via replay would have failed in production, the seam is wrong. | +| Polling stub fails to faithfully reproduce real polling behaviour | Cover both "stub records call" and "stub mutates event" in the spec scenarios. Run an end-to-end test (existing moto-based) alongside to verify parity. | +| `Replay.status="DEFERRED"` accidentally leaks into production code (somebody catches it as if real) | Document explicitly: DEFERRED is a sentinel for replay only; production status is always `"SUCCESS"` or `"FAILED"`. Lint rule (later, in `cfn-lint-cfn-handler`) could catch this. | + +## Migration Plan + +This is purely additive — no migration path for users. The change ships +as `1.3.0` (minor bump). Users who don't import `cfn_handler.testing` +see no difference. + +For the project itself: the existing test suite stays as it is. Once the +helpers ship and prove themselves on a few new tests, a separate +follow-up PR can opportunistically rewrite older tests onto the +helpers. That's not part of this change. + +## Open Questions + +1. **Single `make_event(request_type=...)` vs three named factories + (`make_create_event`, etc.)?** Both work. Decide during + implementation based on which reads better in fixture code. (D5 + leans toward single factory, but happy to be overruled by the + tests as they get written.) +2. **Should `assert_success(replay, data={...})` perform exact match + or subset match?** Default exact (matches `dict == dict`); add a + `data_subset=` kwarg if subset is needed. Decide based on + first uses. +3. **Should `make_event` validate the `RequestType` it produces against + a known set?** `Literal["Create", "Update", "Delete"]` already gives + us static checks; runtime validation is belt-and-braces. Leaning yes + for clear errors, no for simplicity. Decide during implementation. +4. **Should the testing module be importable when only stdlib is + available (no boto3, no pytest)?** Yes — but only if the user + sticks to factories and assertions. Importing the fixtures + module without pytest installed should fail gracefully (lazy import). diff --git a/openspec/changes/add-testing-helpers/proposal.md b/openspec/changes/add-testing-helpers/proposal.md new file mode 100644 index 0000000..ad7e4fe --- /dev/null +++ b/openspec/changes/add-testing-helpers/proposal.md @@ -0,0 +1,112 @@ +# Proposal: Add testing helpers + +## Why + +Users writing custom-resource handlers with `cfn-handler` currently have +no first-class way to unit-test their handler logic. The library +instantiates `CustomResource`, attaches decorated lifecycle handlers, +and invokes the handler from a Lambda entry-point — but every layer +below that (response serialisation, HTTP `PUT` to the CFN response URL, +polling re-invocation) is wired into the dispatch path. To test a +handler in isolation, users today must monkey-patch `urllib.request`, +mock `boto3` clients, or construct a synthetic event and ignore the +side-effects. + +This is friction the library should absorb. Custom-resource handlers +are exactly the kind of code where unit tests pay dividends (rare +production invocations, painful to debug after the fact, easy to write +in isolation). Shipping testing helpers makes the TDD loop cheap and +matches the "testable architecture is better architecture" stance the +project already adopts internally. + +It's also a differentiator: `crhelper` (the upstream library this project +clean-roomed) ships nothing of the sort. + +## What Changes + +- **Add `CustomResource.replay(event, context=None)` method** that runs + the dispatch logic in-process, intercepts the response payload before + it would be sent to CloudFormation, and returns a structured `Replay` + result. No HTTP, no boto3 lazy-import, no polling re-invocation. +- **Add a `cfn_handler.testing` module** exposing: + - `Replay` (dataclass): captures `status`, `physical_resource_id`, + `data`, `reason`, `no_echo`, plus the rendered response payload. + - `make_event(...)`: factory returning a canonical + CloudFormation custom-resource event dict, with sensible defaults + that match the CFN documented event shape. Supports overriding + any field. + - `make_context(...)`: factory returning a minimal object that + satisfies the `LambdaContext` protocol (`aws_request_id`, + `function_name`, `invoked_function_arn`, `get_remaining_time_in_millis`, + `log_group_name`, `log_stream_name`). + - `assert_success(replay, *, data=None, physical_resource_id=None)` + and `assert_failed(replay, *, reason_contains=None)` helpers. +- **Expose `cfn_handler.testing` in `__all__`** at package root via + re-export of the public names from the new module. The test + surface is part of the public API contract. +- **Add pytest fixtures** in `cfn_handler.testing.fixtures` that pytest + auto-discovers via the `pytest11` entry point: `cfn_create_event`, + `cfn_update_event`, `cfn_delete_event`, `cfn_lambda_context`. Users + who don't use pytest are unaffected (entry point only loads in pytest + collection). +- Polling-aware behaviour: when a poll handler is registered AND the + initial dispatch would normally defer the response, `replay()` returns + a `Replay` whose status is the sentinel `"DEFERRED"` (not a CFN value) + to make it explicit. Subsequent `replay()` calls with the polling + re-invocation event resume the flow. +- **Soft-deprecate the existing `test_mode` flag and `last_response` + attribute** on `CustomResource`. Both keep working in v1.3 (no + breaking change), but emit a `DeprecationWarning` directing users to + the new `replay()` API. Earmarked for removal in v2.0 alongside + other planned breaking changes. +- Migrate the existing internal test suite (~50 sites using + `test_mode=True` and `last_response`) onto the new `replay()` API + as part of this change. Dogfooding validates the new helpers under + their real intended usage. + +## Capabilities + +### New Capabilities +- `testing-helpers`: in-process replay of the dispatch flow plus + fixture/factory helpers that let users unit-test handlers without + HTTP transport, AWS API calls, or moto. + +### Modified Capabilities + +None. The testing-helpers capability stands alone: it describes the +external behaviour of `replay()` and the testing surface. The internal +seam in the dispatch path that makes replay possible is an +implementation detail, captured in `design.md`, not in the existing +capability specs. Production behaviour of `lifecycle-handler` and +`polling` is unchanged: real Lambda invocations still send via HTTP +PUT, polling still provisions EventBridge rules. + +## Impact + +- **New module**: `src/cfn_handler/testing/` (importable as + `cfn_handler.testing`). Privacy convention follows the rest of the + package: testing surfaces are public; their internal helpers live in + `cfn_handler/testing/_internal/`. +- **Package metadata**: a `pytest11` entry point is added to + `pyproject.toml` for fixture auto-discovery. This does not add a + runtime dependency on pytest; the entry point only fires inside + pytest collection. +- **Internals refactor**: the dispatch path in `resource.py` and the + HTTP send in `_internal/response.py` will need a thin seam (likely a + `transport` callable parameter, default = the existing + `send_response`) to support interception without monkeypatching. This + is a small refactor, kept private. +- **Coverage gate**: testing helpers themselves are subject to the same + 95% line+branch threshold as the rest of the library. The dogfooding + loop is: rewrite the existing test suite to use the new helpers + where they fit, validate the helpers under their real intended + usage. +- **Docs**: `README.md` gains a "Testing" section. A new examples + directory entry (`examples/testing/`) is candidate but not required + for this change. +- **Zero new runtime dependencies.** +- **No breaking changes in v1.3**: existing `test_mode=True` / + `last_response` continue to work, but emit a `DeprecationWarning`. + The new `replay()` API is purely additive. The `test_mode` / + `last_response` removal is scheduled for v2.0 (separate change). +- Targets release `1.3.0` (minor bump). diff --git a/openspec/changes/add-testing-helpers/specs/testing-helpers/spec.md b/openspec/changes/add-testing-helpers/specs/testing-helpers/spec.md new file mode 100644 index 0000000..fe23316 --- /dev/null +++ b/openspec/changes/add-testing-helpers/specs/testing-helpers/spec.md @@ -0,0 +1,183 @@ +# testing-helpers Specification (delta) + +## ADDED Requirements + +### Requirement: Public testing module is importable + +The library SHALL expose a `cfn_handler.testing` module importable in any Python environment where `cfn_handler` itself imports cleanly, without requiring `pytest`, `boto3`, or any other optional dependency. + +#### Scenario: Module imports without pytest installed +- **WHEN** a user runs `import cfn_handler.testing` in an environment + where pytest is not installed +- **THEN** the import succeeds and the public names (`Replay`, + `make_event`, `assert_success`, `assert_failed`, `assert_deferred`) + are available + +#### Scenario: Module imports without boto3 installed +- **WHEN** a user runs `import cfn_handler.testing` in an environment + where boto3 is not installed +- **THEN** the import succeeds and `Replay` / `make_event` / assertion + helpers are available + +### Requirement: In-process replay of the dispatch flow + +`CustomResource` SHALL expose a `replay(event, context=None)` method that executes the full dispatch pipeline in-process and returns a `Replay` object capturing the outcome, without issuing HTTP requests, importing `boto3`, or mutating the registered handler functions. + +#### Scenario: Successful create handler is replayed +- **WHEN** a `CustomResource` has a CREATE handler registered that + returns `{"Endpoint": "https://x"}`, and `replay(create_event)` is + invoked +- **THEN** the returned `Replay` has `status="SUCCESS"`, + `data={"Endpoint": "https://x"}`, and `payload` is the rendered + CFN response payload that would have been PUT to the response URL + +#### Scenario: Handler raises during replay +- **WHEN** a CREATE handler raises `RuntimeError("boom")` during + replay +- **THEN** the returned `Replay` has `status="FAILED"` and `reason` + contains `"boom"` + +#### Scenario: Replay does not perform HTTP I/O +- **WHEN** `replay()` is invoked with a valid event whose `ResponseURL` + is `https://example.invalid/cfn-response` +- **THEN** no HTTP request is made to any URL during the call + +#### Scenario: Replay does not import boto3 +- **WHEN** `replay()` is invoked in an environment without boto3 + installed AND no poll handler is registered +- **THEN** the call completes successfully without raising + `PollingDependencyError` or `ImportError` + +### Requirement: Replay produces a structured result + +The `Replay` type SHALL be a frozen, immutable dataclass with the fields `status` (literal `"SUCCESS" | "FAILED" | "DEFERRED"`), `physical_resource_id` (`str | None`), `data` (`dict[str, Any]`), `reason` (`str`), `no_echo` (`bool`), `payload` (`dict[str, Any]`), and `request_type` (literal `"Create" | "Update" | "Delete"`). + +#### Scenario: Replay result is immutable +- **WHEN** a user attempts to mutate `replay.status = "FAILED"` after + a SUCCESS replay +- **THEN** `dataclasses.FrozenInstanceError` is raised + +#### Scenario: Replay payload matches what would be sent +- **WHEN** `replay()` returns a `Replay` with `status="SUCCESS"` and + `data={"Endpoint": "x"}` +- **THEN** `replay.payload["Status"] == "SUCCESS"`, + `replay.payload["Data"] == {"Endpoint": "x"}`, and the payload + conforms to the CFN custom-resource response schema + +### Requirement: Replay supports the polling-deferral case + +`replay()` SHALL handle the polling-deferral path without invoking any AWS API or importing `boto3`: when a matching poll handler is registered, it MUST return a `Replay` with `status="DEFERRED"` and an empty `payload` dict, and MUST mutate the input event to add the polling marker keys (`CfnHandlerPoll`, `CfnHandlerRule`, `CfnHandlerPermission`) so a subsequent `replay()` call resumes into the poll handler path. + +#### Scenario: Create with poller defers +- **WHEN** a `CustomResource` has both `@create` and `@poll_create` + handlers registered, and `replay(create_event)` is invoked +- **THEN** the returned `Replay` has `status="DEFERRED"`, no AWS API + call is made, and the input event has been mutated to include + `event["CfnHandlerPoll"] is True` + +#### Scenario: Poll re-invocation completes the flow +- **WHEN** a deferred event is replayed a second time, and the + registered poll handler returns response data +- **THEN** the returned `Replay` has `status="SUCCESS"` and the + data the poll handler provided + +### Requirement: Event factory produces canonical CFN events + +The library SHALL expose a `make_event` callable in `cfn_handler.testing` that returns a dict matching the documented CloudFormation custom-resource event shape, with keyword overrides for every documented field, and MUST require a non-`None` `physical_resource_id` argument when `RequestType` is `"Update"` or `"Delete"` (raising `ValueError` if not supplied). + +#### Scenario: Default Create event is well-formed +- **WHEN** `make_event()` is called with no arguments +- **THEN** the returned dict has `RequestType="Create"`, + syntactically valid `StackId`, `RequestId`, `LogicalResourceId`, + `ResourceType`, `ResourceProperties`, `ResponseURL`, `ServiceToken` + fields, and no `PhysicalResourceId` + +#### Scenario: Update event requires PhysicalResourceId +- **WHEN** `make_event(request_type="Update")` is called without + passing `physical_resource_id` +- **THEN** `ValueError` is raised with a message identifying the + missing argument + +#### Scenario: Field overrides are applied +- **WHEN** `make_event(resource_properties={"Foo": "bar"})` is + called +- **THEN** the returned dict has `ResourceProperties == {"Foo": "bar"}` + +#### Scenario: Defaults use safe placeholder values +- **WHEN** `make_event()` is called with no overrides +- **THEN** the `ResponseURL` host is `example.invalid` (RFC 6761 + reserved name guaranteed not to resolve) and the account ID portion + of `StackId` is `111111111111` (AWS-reserved example account) + +### Requirement: Lambda context factory satisfies the protocol + +The library SHALL expose a `make_context` callable in `cfn_handler.testing` that returns an object satisfying the existing `LambdaContext` protocol used by `CustomResource.__call__`, exposing `aws_request_id`, `function_name`, `invoked_function_arn`, `log_group_name`, `log_stream_name`, and `get_remaining_time_in_millis()`. + +#### Scenario: Context satisfies the protocol +- **WHEN** `ctx = make_context()` is called and used in + `resource.replay(event, ctx)` +- **THEN** the call succeeds and `ctx.get_remaining_time_in_millis()` + returns a positive integer + +#### Scenario: Remaining-time override is honoured +- **WHEN** `make_context(remaining_time_ms=5000)` is called +- **THEN** `ctx.get_remaining_time_in_millis()` returns `5000` + +### Requirement: Assertion helpers raise informative AssertionError + +The library SHALL expose `assert_success`, `assert_failed`, and `assert_deferred` helpers in `cfn_handler.testing`, each of which MUST raise `AssertionError` with a message identifying both the expected and actual values when the assertion fails. + +#### Scenario: assert_success on a SUCCESS replay passes +- **WHEN** `assert_success(replay, data={"x": 1})` is called and + `replay.status == "SUCCESS"` and `replay.data == {"x": 1}` +- **THEN** the call returns `None` (no exception) + +#### Scenario: assert_success on a FAILED replay raises +- **WHEN** `assert_success(replay)` is called and + `replay.status == "FAILED"` with `reason="boom"` +- **THEN** `AssertionError` is raised and the message contains both + `"FAILED"` and `"boom"` + +#### Scenario: assert_failed with reason_contains matches a substring +- **WHEN** `assert_failed(replay, reason_contains="boom")` is called + and `replay.status == "FAILED"` with `reason="something boom happened"` +- **THEN** the call returns `None` + +#### Scenario: assert_deferred on a SUCCESS replay raises +- **WHEN** `assert_deferred(replay)` is called and + `replay.status == "SUCCESS"` +- **THEN** `AssertionError` is raised + +### Requirement: pytest fixtures auto-register via entry point + +The library's `pyproject.toml` SHALL declare a `pytest11` entry point named `cfn_handler` pointing at the fixtures module so that the fixtures `cfn_create_event`, `cfn_update_event`, `cfn_delete_event`, and `cfn_lambda_context` are available without any user-side `pytest_plugins` declaration. + +#### Scenario: Fixture is auto-discovered +- **WHEN** a user with `cfn_handler` installed writes a test + `def test_x(cfn_create_event): ...` in a fresh pytest project + with no `conftest.py` configuration +- **THEN** pytest resolves the fixture without error and passes a + Create-shaped event dict + +#### Scenario: Each invocation gets a fresh event +- **WHEN** two tests both consume `cfn_create_event` and one mutates + the event dict +- **THEN** the second test sees the unmutated default event (no + cross-test leak) + +### Requirement: Replay never sends a real CFN response + +`CustomResource.replay` SHALL NOT, under any code path, send an HTTP request to the event's `ResponseURL` or any other URL, and the production HTTP transport MUST be replaced by an in-memory capture for the duration of the replay call and restored when the call returns or raises. + +#### Scenario: Replay catches a handler exception without sending HTTP +- **WHEN** `replay()` is invoked with a handler that raises during + execution +- **THEN** the returned `Replay` has `status="FAILED"` AND no HTTP + request was issued (verified via mock or instrumentation) + +#### Scenario: Replay restores transport after exception +- **WHEN** `replay()` raises an unexpected internal exception (not a + handler exception) and the same `CustomResource` instance is then + invoked normally via `__call__` (with the production HTTP transport) +- **THEN** the production invocation correctly issues an HTTP PUT to + the event's `ResponseURL` diff --git a/openspec/changes/add-testing-helpers/tasks.md b/openspec/changes/add-testing-helpers/tasks.md new file mode 100644 index 0000000..b610833 --- /dev/null +++ b/openspec/changes/add-testing-helpers/tasks.md @@ -0,0 +1,278 @@ +# Tasks: Add testing helpers + +> Apply order: tasks within a phase MAY be parallelised when no +> dependency exists. Phases are sequential. Each task ends with a +> verification command in backticks. +> +> TDD discipline: in phases 2–6, write tests **before** implementation. +> A green run after writing the test means the test is wrong (it should +> fail until implementation lands). + +## 1. Scaffolding + +- [x] 1.1 Create directory `src/cfn_handler/testing/` with `__init__.py`, + `_internal/__init__.py`, and a `py.typed` marker copied from + the package root. Verify: `ls src/cfn_handler/testing/` +- [x] 1.2 Add an empty `cfn_handler/testing/fixtures.py` placeholder + so the `pytest11` entry-point lookup later doesn't fail at + collection time. Verify: `python -c "import cfn_handler.testing.fixtures"` +- [x] 1.3 Update `pyproject.toml`: + - Add `[project.entry-points.pytest11]` table with + `cfn_handler = "cfn_handler.testing.fixtures"`. + - Verify the entry point appears in the built wheel: + `uv build && unzip -p dist/*.whl '*entry_points.txt' | grep pytest11` +- [x] 1.4 Update `[tool.hatch.build.targets.wheel].packages` and + `[tool.hatch.build.targets.sdist].include` to include the new + `src/cfn_handler/testing` subtree (verify: a built wheel contains + `cfn_handler/testing/__init__.py`). +- [x] 1.5 Run baseline checks before any logic lands: `just lint + typecheck test-cov`. Repo MUST be green. + +## 2. Internal seam — Transport callable + +- [x] 2.1 Write a failing test that monkeypatches an in-memory + transport into `CustomResource` and asserts the test transport is + called instead of `_internal.response.send_response`. Place under + `tests/unit/test_transport_seam.py`. Verify: `uv run pytest + tests/unit/test_transport_seam.py -x` fails. +- [x] 2.2 Define the `Transport` callable type in + `src/cfn_handler/_internal/response.py` (or a new file + `src/cfn_handler/_internal/transport.py` if it grows beyond a + single alias). Type: + `Transport = Callable[[str, dict[str, Any]], None]`. +- [x] 2.3 Add `transport: Transport | None = None` parameter to + `CustomResource.__init__`. Default to a thin wrapper that calls + `send_response` (existing behaviour preserved). Replace direct + `send_response` call sites in `resource.py` with calls through + `self._transport`. Verify: 2.1 test now passes; full suite still + green: `uv run pytest`. +- [x] 2.4 Type-check passes with the new generic: `uv run mypy + src/cfn_handler && uv run pyright src/cfn_handler`. + +## 3. Internal seam — Poller stubs + +- [x] 3.1 Write a failing test that asserts: when a poll handler is + registered AND a stub poller is injected, the lifecycle dispatch + calls the stub, never imports boto3, and mutates the event with + polling marker keys. Place under + `tests/unit/test_poller_seam.py`. +- [x] 3.2 Define `PollerProvision` and `PollerTeardown` callable types + in `src/cfn_handler/_internal/poller.py`. Add equivalent + injection seam to `CustomResource.__init__` + (`provision_poller`, `teardown_poller` defaulting to the existing + module-level functions). +- [x] 3.3 Wire `setup_polling` / `teardown_polling` calls in + `resource.py` to go through the seam. Verify 3.1 test passes; + existing polling test suite still green: `uv run pytest + tests/`. + +## 4. Public testing surface — `Replay` + `replay()` + +- [x] 4.1 Write spec scenarios as failing tests in + `tests/unit/testing/test_replay.py` covering: + - Successful create handler → `Replay(status="SUCCESS", ...)` + - Handler raises → `Replay(status="FAILED", reason=...)` + - No HTTP I/O performed (assert via instrumented transport) + - boto3 not imported (assert `"boto3" not in sys.modules` after + a replay that does NOT use polling, in a subprocess to avoid + prior pollution) + - `Replay` is frozen (FrozenInstanceError on mutation) +- [x] 4.2 Implement `Replay` dataclass in + `src/cfn_handler/testing/_internal/replay_result.py` per the + design (frozen, slots, all fields). Re-export from + `cfn_handler.testing`. +- [x] 4.3 Implement `CustomResource.replay(event, context=None)`: + builds a capturing transport + stub pollers, swaps them in via + `dataclasses.replace`-style or constructor-args (decide during + implementation), runs the dispatch, captures the rendered + payload, returns the `Replay`. Restore production transport on + both normal and exceptional return. +- [x] 4.4 Verify: `uv run pytest tests/unit/testing/`. Verify + coverage of `replay()` is at 100% line+branch (replay is + a small surface; nothing should be uncovered). + +## 5. Public testing surface — Polling-aware replay + +- [x] 5.1 Write failing test: `replay()` of a Create event with both + `@create` and `@poll_create` registered returns + `Replay(status="DEFERRED")` and mutates the event to include + `CfnHandlerPoll=True`, `CfnHandlerRule=...`, + `CfnHandlerPermission=...`. +- [x] 5.2 Write failing test: a second `replay()` call with the + mutated event resumes through the poll handler. If the poll + handler returns data, the result is + `Replay(status="SUCCESS", data=...)`. +- [x] 5.3 Implement the stub `provision_poller`: mutates the event + to add the same marker keys real polling adds, records the call + args (the existing real `setup_polling` mutates and returns + `None`; the stub follows the same shape). Implement stub + `teardown_poller` as a no-op recorder. +- [x] 5.4 Wire stubs into `replay()`. Verify: 5.1 + 5.2 pass. +- [x] 5.5 Add a parity test: same handler, run once via + `CustomResource.__call__` against a moto-mocked AWS environment + (existing pattern in `tests/integration/`), once via `replay()`. + Assert that the rendered payload is byte-equal in both cases + (excluding ARNs/IDs that vary per run). + +## 6. Public testing surface — Factories + assertions + +- [x] 6.1 Write failing tests for `make_event`: + - Default Create event has expected shape + - Update/Delete missing physical_resource_id raises `ValueError` + - Field overrides applied + - Defaults use safe placeholders (`example.invalid`, + `111111111111`) +- [x] 6.2 Implement `make_event` in `src/cfn_handler/testing/ + _internal/event_factory.py`. Re-export from + `cfn_handler.testing`. +- [x] 6.3 Write failing tests for `make_context`: + - Returns object satisfying `LambdaContext` protocol + - `remaining_time_ms` override honoured +- [x] 6.4 Implement `make_context`. +- [x] 6.5 Write failing tests for `assert_success`, `assert_failed`, + `assert_deferred` covering both pass and fail paths and the + message contents on failure. +- [x] 6.6 Implement assertion helpers in + `src/cfn_handler/testing/_internal/assertions.py`. Re-export. + +## 7. pytest fixtures + +- [x] 7.1 Write a failing test in a *fresh* pytest project layout + (under `tests/integration/test_fixture_discovery/`) — a single + `conftest.py`-free directory with a test that depends on + `cfn_create_event`. Run via `uv run pytest --rootdir=...` + pointed at the temp directory. Verify the fixture is found. +- [x] 7.2 Implement the fixtures in + `src/cfn_handler/testing/fixtures.py`: + `cfn_create_event`, `cfn_update_event`, `cfn_delete_event`, + `cfn_lambda_context`. Each is a pytest fixture returning + a fresh value built via `make_event` / `make_context`. +- [x] 7.3 Verify each fixture invocation is independent (no shared + state across tests). +- [x] 7.4 Confirm fixture loading does NOT add overhead in pytest + runs that don't use them: `uv run pytest --collect-only -q | wc + -l` baseline matches pre-change collection count. + +## 8. Public surface wiring + +- [x] 8.1 Update `src/cfn_handler/testing/__init__.py` to export + exactly the public names: `Replay`, `make_event`, + `make_context`, `assert_success`, `assert_failed`, + `assert_deferred`. Define `__all__` accordingly. +- [x] 8.2 Verify NOTHING from `_internal/` is exposed via + `cfn_handler.testing`: `python -c "import cfn_handler.testing as + t; print(sorted(n for n in dir(t) if not n.startswith('_')))"` + output matches the `__all__`. +- [x] 8.3 Confirm the package root `cfn_handler.__all__` is + **unchanged** (no testing names leak into the production root). + +## 9. Soft-deprecate `test_mode` / `last_response` + +- [x] 9.1 Write a failing test asserting that + `CustomResource(test_mode=True)` emits a `DeprecationWarning` + whose message references `replay()` and points at + `cfn_handler.testing`. Place under + `tests/unit/test_test_mode_deprecation.py`. Verify: test fails. +- [x] 9.2 Add `warnings.warn(..., DeprecationWarning, stacklevel=2)` + in `CustomResource.__init__` when `test_mode=True`. Verify 9.1 + passes. +- [x] 9.3 Update the docstring on `test_mode` and `last_response` + to mark them as deprecated and reference `replay()`. +- [x] 9.4 Update `pytest.ini_options.filterwarnings` to NOT promote + this specific deprecation to an error in our own test suite + until the migration in §10 is complete (then revert in §10.5). + +## 10. Migrate existing test suite to `replay()` + +- [x] 10.1 Sweep: enumerate every test file referencing `test_mode=` + or `last_response`. Expected sites (from grep): `tests/unit/ + test_resource.py`, `test_backstops.py`, `test_polling_dispatch.py`, + `test_state_machine.py`. Verify count: `grep -rn 'test_mode\| + last_response' tests/ | wc -l` matches expectation before any edits. +- [x] 10.2 Migrate `tests/unit/test_resource.py` from `test_mode=True` + + `last_response` reads to `replay()` + `Replay` field reads. + Use the new assertion helpers (`assert_success`, + `assert_failed`) where they match the existing assertion shape. + Verify: file's tests still pass after migration. +- [x] 10.3 Migrate `tests/unit/test_backstops.py`. Verify: tests pass. +- [x] 10.4 Migrate `tests/unit/test_polling_dispatch.py`. The polling + sentinel test (`test_create_with_poll_handler_in_test_mode_records_sentinel`) + becomes a `Replay(status="DEFERRED")` assertion via + `assert_deferred`. Verify: tests pass. +- [x] 10.5 Migrate `tests/unit/test_state_machine.py`. Verify: tests + pass. After all migrations: `grep -rn 'test_mode\|last_response' + tests/` should return ZERO matches. +- [x] 10.6 Revert the `pytest.ini_options.filterwarnings` exception + from 9.4. Tests now must NOT emit the deprecation warning; + if any do, the suite fails (forcing the migration to be + complete). + +## 11. Documentation + +- [x] 11.1 Add a "Testing" section to `README.md` between "Examples" + and "Project status". One minimal example showing + `make_event` + `replay` + `assert_success`. +- [x] 11.2 Docstrings on every public name in `cfn_handler.testing` + following Google-style convention (matches existing project + docstrings). Verify: `uv run ruff check src/cfn_handler/testing` + catches any missing docstrings (the `D` ruleset is on for + `src/`). +- [x] 11.3 Add an entry to `docs/ROADMAP.md` moving testing helpers + from "Active priorities" to a "Shipped in 1.3.0" reference (or + delete it entirely; the spec is the durable record). +- [x] 11.4 ~~Add a `CHANGELOG.md` deprecation entry~~ — N/A. + `CHANGELOG.md` is auto-managed by release-please from + Conventional Commits. The deprecation context goes in the + squash-merge commit body (see §12.4). + +## 12. Coverage + final checks + +- [x] 12.1 Run `just ci-check` (lint + typecheck + test-cov). Coverage + MUST be ≥95% line+branch including the new module. Failures here + block release. **Result: 98% coverage, 152 tests passing, lint + and types clean.** Note: switched `test-cov` to use `coverage + run -m pytest` instead of `pytest --cov` because the new + `pytest11` entry point causes `cfn_handler` to be imported + during pytest plugin collection (before `--cov` instrumentation + attaches), making module-level lines look unhit. Documented in + the recipe comment. +- [ ] 12.2 Run `just gha-pre-release` to replay every CI gating + workflow locally. All green required before merge. **Defer to + pre-merge step.** +- [x] 12.3 Verify the built wheel includes `cfn_handler/testing/`: + `uv build && unzip -l dist/*.whl | grep testing`. Verified — + 9 files including `_internal/` modules and `py.typed`. +- [ ] 12.4 Verify the conventional-commit message for the squash-merge + starts with `feat(testing):` so release-please bumps minor + (target: `1.3.0`). **Squash-merge commit message guidance:** + ``` + feat(testing): add cfn_handler.testing module with replay() helpers + + Adds the new `cfn_handler.testing` public surface: + - `CustomResource.replay(event, context=None)` — in-process dispatch + returning a structured `Replay` (no HTTP, no boto3). + - `Replay` frozen dataclass. + - `make_event` / `make_context` factories with safe defaults. + - `assert_success` / `assert_failed` / `assert_deferred` helpers. + - pytest fixtures (`cfn_create_event`, `cfn_update_event`, + `cfn_delete_event`, `cfn_lambda_context`) auto-discovered via + the `pytest11` entry point. + + DEPRECATED: `CustomResource(test_mode=True)` and `last_response` + now emit a DeprecationWarning. They continue to work in v1.x; + removal scheduled for v2.0. + ``` + **Defer to merge step.** + +## 13. Validation + +- [x] 13.1 Validate the change strictly: `openspec validate + add-testing-helpers --strict`. All artifacts must pass. + **Result: "Change 'add-testing-helpers' is valid".** +- [x] 13.2 Manual smoke test: in a fresh venv outside the repo, `uv + add cfn_handler` from the local wheel and verify + `import cfn_handler.testing` works and `make_event()` produces + a Create event. **Result: smoke test passed — + `replay(make_event(), make_context())` returns + `Replay(status="SUCCESS", data={"Endpoint": "https://smoke.example"})` + and `assert_success` passes.** diff --git a/pyproject.toml b/pyproject.toml index 318369a..e8e6424 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,6 +43,14 @@ Repository = "https://github.com/igorlg/cfn-handler" Issues = "https://github.com/igorlg/cfn-handler/issues" Changelog = "https://github.com/igorlg/cfn-handler/blob/main/CHANGELOG.md" +# Auto-discovered pytest plugin: when `cfn_handler` is installed in any +# Python environment that runs pytest, pytest's plugin manager picks up +# the fixtures module via this entry point — no `pytest_plugins` declaration +# required in user conftest. The fixtures module is a thin namespace that +# only does work when its fixtures are actually requested. +[project.entry-points.pytest11] +cfn_handler = "cfn_handler.testing.fixtures" + [dependency-groups] test = [ "pytest>=8", diff --git a/src/cfn_handler/_internal/poller.py b/src/cfn_handler/_internal/poller.py index 4de5843..edab4d4 100644 --- a/src/cfn_handler/_internal/poller.py +++ b/src/cfn_handler/_internal/poller.py @@ -16,6 +16,7 @@ import json import secrets import string +from collections.abc import Callable from typing import TYPE_CHECKING, Any from cfn_handler._internal.log import logger @@ -31,6 +32,15 @@ EVENT_MARKER_PERMISSION = "CfnHandlerPermission" EVENT_MARKER_DATA = "CfnHandlerData" +#: Pluggable callable for provisioning polling. The default is +#: :func:`setup_polling`; testing helpers swap in a stub that mutates the +#: event with marker keys (matching real provisioning) without importing +#: boto3 or hitting AWS. +PollerProvision = Callable[[dict[str, Any], str, int, "str | None"], None] + +#: Pluggable callable for tearing down polling, mirroring :data:`PollerProvision`. +PollerTeardown = Callable[[dict[str, Any], str, "str | None"], None] + class PollingDependencyError(CfnHandlerError): """Polling was requested but ``boto3`` is not importable. diff --git a/src/cfn_handler/_internal/response.py b/src/cfn_handler/_internal/response.py index 09c8cda..9c110e8 100644 --- a/src/cfn_handler/_internal/response.py +++ b/src/cfn_handler/_internal/response.py @@ -16,6 +16,7 @@ import json import urllib.error import urllib.request +from collections.abc import Callable from typing import Any, Final, Literal from cfn_handler._internal.log import logger @@ -23,6 +24,12 @@ ResponseStatus = Literal["SUCCESS", "FAILED"] +#: A pluggable transport callable: takes ``(url, payload)`` and is responsible +#: for delivering the payload to ``url``. The production implementation is +#: :func:`send_response` (urllib PUT). Testing helpers swap this for an +#: in-memory capture; see :mod:`cfn_handler.testing`. +Transport = Callable[[str, dict[str, Any]], None] + #: CloudFormation truncates the response ``Reason`` field at this many bytes. #: See: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/crpg-ref-responses.html MAX_REASON_LENGTH: Final = 4096 diff --git a/src/cfn_handler/resource.py b/src/cfn_handler/resource.py index 76b430d..6aae4fe 100644 --- a/src/cfn_handler/resource.py +++ b/src/cfn_handler/resource.py @@ -31,19 +31,24 @@ def handler(event, context): import secrets import string -from collections.abc import Callable -from typing import Any, Literal, Protocol +import warnings +from collections.abc import Callable, Generator +from contextlib import contextmanager +from typing import TYPE_CHECKING, Any, Literal, Protocol from cfn_handler._internal.log import logger from cfn_handler._internal.poller import ( EVENT_MARKER_PERMISSION, EVENT_MARKER_RULE, + PollerProvision, + PollerTeardown, is_poll_event, setup_polling, teardown_polling, ) from cfn_handler._internal.response import ( ResponseStatus, + Transport, build_response, send_response, ) @@ -53,6 +58,9 @@ def handler(event, context): ) from cfn_handler.exceptions import CfnHandlerError, ResponseError +if TYPE_CHECKING: + from cfn_handler.testing._internal.replay_result import Replay + #: A user-registered handler returns a dict (becomes ``Data``) or ``None``. HandlerResult = dict[str, Any] | None @@ -110,9 +118,23 @@ class CustomResource: invocation for cleanup and response sending. If less remains, the resource is failed with a timeout reason rather than risk an unresponsive Lambda. - test_mode: When True, responses are captured on + test_mode: **Deprecated.** When True, responses are captured on :attr:`last_response` instead of being sent to CloudFormation. - Useful for unit-testing handlers in isolation. + Scheduled for removal in v2.0; use ``CustomResource.replay()`` + and the helpers in :mod:`cfn_handler.testing` instead. + transport: Optional transport callable replacing the default urllib + PUT to the CFN response URL. Signature: ``(url, payload) -> None``. + Used internally by :meth:`replay` and available for advanced + users who need to interpose on the response. Pass ``None`` + (default) to use the production HTTP transport. + provision_poller: Optional callable replacing the default + ``setup_polling`` (boto3 EventBridge call) used when a + polling handler is registered. Signature: + ``(event, function_name, polling_interval_minutes, region) -> None``. + Used internally by :meth:`replay` to stub out AWS calls. + teardown_poller: Optional callable replacing the default + ``teardown_polling``. Signature: + ``(event, function_name, region) -> None``. log_level: Optional log level (``"DEBUG"``, ``logging.INFO``, etc.) to apply to the ``cfn_handler`` logger. Pass ``None`` (default) to leave the user's logging configuration alone. @@ -124,8 +146,9 @@ class CustomResource: create (otherwise one is auto-generated). no_echo: When True, the ``Data`` field is masked in CloudFormation output (used for credentials). - last_response: In ``test_mode``, the most recent response payload - that *would* have been sent. ``None`` outside test mode. + last_response: **Deprecated.** In ``test_mode``, the most recent + response payload that *would* have been sent. ``None`` outside + test mode. Use :meth:`replay` for new code. """ physical_resource_id: str @@ -138,12 +161,35 @@ def __init__( polling_interval_minutes: int = 1, polling_safety_margin_ms: int = DEFAULT_SAFETY_MARGIN_MS, test_mode: bool = False, + transport: Transport | None = None, + provision_poller: PollerProvision | None = None, + teardown_poller: PollerTeardown | None = None, log_level: int | str | None = None, ) -> None: """Initialise the resource. Raises no exceptions; init failures should be reported via :meth:`init_failure`.""" self._polling_interval_minutes = polling_interval_minutes self._polling_safety_margin_ms = polling_safety_margin_ms self._test_mode = test_mode + if test_mode: + warnings.warn( + "CustomResource(test_mode=True) is deprecated and will be removed " + "in v2.0. Use CustomResource.replay() and the helpers in " + "cfn_handler.testing (Replay, make_event, make_context, " + "assert_success, assert_failed, assert_deferred) instead.", + DeprecationWarning, + stacklevel=2, + ) + # Default transport is the production HTTP PUT (looked up lazily in + # ``_emit_response`` so that monkey-patching ``cfn_handler.resource. + # send_response`` continues to work). Tests that want explicit + # control inject a callable via the ``transport=`` kwarg or via + # ``replay()``. + self._transport: Transport | None = transport + # Same late-binding pattern for poller seams: defaults are looked + # up lazily so existing tests that ``patch("cfn_handler.resource. + # setup_polling")`` continue to work; explicit kwargs short-circuit. + self._provision_poller: PollerProvision | None = provision_poller + self._teardown_poller: PollerTeardown | None = teardown_poller if log_level is not None: logger.setLevel(log_level) @@ -237,6 +283,78 @@ def __call__(self, event: dict[str, Any], context: LambdaContext) -> dict[str, A self._best_effort_failed(event, context, "Internal error in cfn_handler dispatch") return self.last_response if self._test_mode else None + # ---- In-process replay (for testing) -------------------------------- + + # ---- In-process replay (for testing) -------------------------------- + + @contextmanager + def _replay_seams( + self, + *, + transport: Transport, + provision_poller: PollerProvision, + teardown_poller: PollerTeardown, + ) -> Generator[None, None, None]: + """Temporarily replace the production seams for a replay run. + + Internal contract used by ``cfn_handler.testing._internal.runner``. + Snapshots the current values, swaps in the supplied callables, + and restores on exit (including exceptions). Also forces + ``test_mode`` off for the duration: ``replay()`` always wants + the dispatch path to go through the supplied capturing + transport, never the legacy ``last_response`` capture. + """ + saved_transport = self._transport + saved_provision = self._provision_poller + saved_teardown = self._teardown_poller + saved_test_mode = self._test_mode + + self._transport = transport + self._provision_poller = provision_poller + self._teardown_poller = teardown_poller + self._test_mode = False + + try: + yield + finally: + self._transport = saved_transport + self._provision_poller = saved_provision + self._teardown_poller = saved_teardown + self._test_mode = saved_test_mode + + def replay( + self, + event: dict[str, Any], + context: LambdaContext | None = None, + ) -> Replay: + """Execute the dispatch flow in-process and return a structured result. + + Replay runs the same code paths as :meth:`__call__` (handler + resolution, handler invocation, polling deferral) but swaps the + HTTP transport and the polling-provisioning callables for in-memory + captures. No HTTP request is issued; ``boto3`` is never imported + unless something on the user's handler path imports it. + + The same instance can be replayed multiple times. Each call + snapshots the request type and rebuilds an isolated capture + state, so polling-deferral tests work cleanly: replay once, + observe ``status="DEFERRED"`` and the mutated event, replay + again with the mutated event to drive the poll handler. + + Args: + event: A CloudFormation custom-resource event. Use + :func:`cfn_handler.testing.make_event` to build one. + context: Optional Lambda context. Defaults to a fresh + :func:`cfn_handler.testing.make_context` instance. + + Returns: + A :class:`cfn_handler.testing.Replay` capturing the outcome. + """ + # Local import to avoid a public→testing→public cycle at module load. + from cfn_handler.testing._internal.runner import run_replay + + return run_replay(self, event, context) + # ---- Internal dispatch ---------------------------------------------- def _dispatch(self, event: dict[str, Any], context: LambdaContext) -> None: @@ -342,11 +460,8 @@ def _enter_polling( return try: - setup_polling( - event, - function_name=context.function_name, - polling_interval_minutes=self._polling_interval_minutes, - ) + provision = self._provision_poller if self._provision_poller is not None else setup_polling + provision(event, context.function_name, self._polling_interval_minutes, None) except CfnHandlerError as exc: logger.exception("setup_polling failed; failing the resource") self._send_failed(event, context, self._reason_from_exception(exc)) @@ -359,7 +474,8 @@ def _safe_teardown(self, event: dict[str, Any], context: LambdaContext) -> None: if EVENT_MARKER_RULE not in event and EVENT_MARKER_PERMISSION not in event: return # Nothing to tear down (initial invocation path that errored before setup). try: - teardown_polling(event, function_name=context.function_name) + teardown = self._teardown_poller if self._teardown_poller is not None else teardown_polling + teardown(event, context.function_name, None) except Exception: logger.exception("teardown_polling raised; continuing") @@ -422,8 +538,14 @@ def _emit_response(self, event: dict[str, Any], payload: dict[str, Any]) -> None self.last_response = payload logger.info("test_mode: skipping CFN response, captured on .last_response") return + # Late-bound default: look up the module-level ``send_response`` at + # call time so existing tests that monkey-patch + # ``cfn_handler.resource.send_response`` continue to work. An + # explicit ``transport=`` kwarg short-circuits this and is what + # ``replay()`` uses to capture without HTTP. + transport = self._transport if self._transport is not None else send_response try: - send_response(event["ResponseURL"], payload) + transport(event["ResponseURL"], payload) except ResponseError: logger.exception("Failed to send CloudFormation response") diff --git a/src/cfn_handler/testing/__init__.py b/src/cfn_handler/testing/__init__.py new file mode 100644 index 0000000..cb7c809 --- /dev/null +++ b/src/cfn_handler/testing/__init__.py @@ -0,0 +1,43 @@ +"""Testing helpers for users writing custom-resource handlers with cfn-handler. + +The public surface lives at :mod:`cfn_handler.testing`; do not import from +``cfn_handler.testing._internal`` (that subpackage is private and may +change between minor versions). + +Quickstart:: + + from cfn_handler import CustomResource + from cfn_handler.testing import make_event, assert_success + + def test_my_handler(): + resource = CustomResource() + + @resource.create + def on_create(event, ctx): + return {"Endpoint": "https://x"} + + replay = resource.replay(make_event()) + assert_success(replay, data={"Endpoint": "https://x"}) + +The full public API is re-exported from this module's ``__all__``. +""" + +from __future__ import annotations + +from cfn_handler.testing._internal.assertions import ( + assert_deferred, + assert_failed, + assert_success, +) +from cfn_handler.testing._internal.context_factory import make_context +from cfn_handler.testing._internal.event_factory import make_event +from cfn_handler.testing._internal.replay_result import Replay + +__all__ = [ + "Replay", + "assert_deferred", + "assert_failed", + "assert_success", + "make_context", + "make_event", +] diff --git a/src/cfn_handler/testing/_internal/__init__.py b/src/cfn_handler/testing/_internal/__init__.py new file mode 100644 index 0000000..592bc8d --- /dev/null +++ b/src/cfn_handler/testing/_internal/__init__.py @@ -0,0 +1,6 @@ +"""Internal implementation of the testing helpers. + +Not part of the public API; do not import from outside the +``cfn_handler.testing`` package. Names and signatures here may change +between minor releases. +""" diff --git a/src/cfn_handler/testing/_internal/assertions.py b/src/cfn_handler/testing/_internal/assertions.py new file mode 100644 index 0000000..14987ff --- /dev/null +++ b/src/cfn_handler/testing/_internal/assertions.py @@ -0,0 +1,107 @@ +"""Assertion helpers for replay-based unit tests. + +Each helper raises ``AssertionError`` with an informative message on +failure, and returns ``None`` on success. Optional kwargs are matched +only when explicitly supplied (so ``assert_success(replay)`` just checks +the status, while ``assert_success(replay, data={...})`` adds a data +match). +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from cfn_handler.testing._internal.replay_result import Replay + + +def assert_success( + replay: Replay, + *, + data: dict[str, Any] | None = None, + physical_resource_id: str | None = None, + no_echo: bool | None = None, +) -> None: + """Assert ``replay`` represents a SUCCESS, optionally matching fields. + + Args: + replay: The :class:`Replay` to inspect. + data: When supplied, requires ``replay.data == data`` (exact + match). Pass ``None`` to ignore. + physical_resource_id: When supplied, requires + ``replay.physical_resource_id == physical_resource_id``. + no_echo: When supplied, requires ``replay.no_echo == no_echo``. + + Raises: + AssertionError: When any condition fails. The message includes + both the expected and actual values. + """ + if replay.status != "SUCCESS": + msg = f"expected status='SUCCESS', got status={replay.status!r}; reason={replay.reason!r}; data={replay.data!r}" + raise AssertionError(msg) + + if data is not None and replay.data != data: + msg = f"expected data={data!r}, got data={replay.data!r}" + raise AssertionError(msg) + + if physical_resource_id is not None and replay.physical_resource_id != physical_resource_id: + msg = ( + f"expected physical_resource_id={physical_resource_id!r}, " + f"got physical_resource_id={replay.physical_resource_id!r}" + ) + raise AssertionError(msg) + + if no_echo is not None and replay.no_echo != no_echo: + msg = f"expected no_echo={no_echo!r}, got no_echo={replay.no_echo!r}" + raise AssertionError(msg) + + +def assert_failed( + replay: Replay, + *, + reason_contains: str | None = None, + physical_resource_id: str | None = None, +) -> None: + """Assert ``replay`` represents a FAILED, optionally matching reason. + + Args: + replay: The :class:`Replay` to inspect. + reason_contains: When supplied, requires the substring to appear + anywhere in ``replay.reason``. Pass ``None`` to ignore. + physical_resource_id: When supplied, requires + ``replay.physical_resource_id == physical_resource_id``. + + Raises: + AssertionError: When any condition fails. + """ + if replay.status != "FAILED": + msg = f"expected status='FAILED', got status={replay.status!r}; reason={replay.reason!r}" + raise AssertionError(msg) + + if reason_contains is not None and reason_contains not in replay.reason: + msg = f"expected reason to contain {reason_contains!r}, got reason={replay.reason!r}" + raise AssertionError(msg) + + if physical_resource_id is not None and replay.physical_resource_id != physical_resource_id: + msg = ( + f"expected physical_resource_id={physical_resource_id!r}, " + f"got physical_resource_id={replay.physical_resource_id!r}" + ) + raise AssertionError(msg) + + +def assert_deferred(replay: Replay) -> None: + """Assert ``replay`` represents a DEFERRED outcome (entered polling). + + Args: + replay: The :class:`Replay` to inspect. + + Raises: + AssertionError: If ``replay.status != "DEFERRED"``. + """ + if replay.status != "DEFERRED": + msg = ( + f"expected status='DEFERRED' (would have entered polling), " + f"got status={replay.status!r}; payload={replay.payload!r}" + ) + raise AssertionError(msg) diff --git a/src/cfn_handler/testing/_internal/context_factory.py b/src/cfn_handler/testing/_internal/context_factory.py new file mode 100644 index 0000000..2f354c3 --- /dev/null +++ b/src/cfn_handler/testing/_internal/context_factory.py @@ -0,0 +1,72 @@ +"""Lambda context factory for tests. + +The factory produces an object satisfying the +:class:`~cfn_handler.resource.LambdaContext` Protocol with sensible +defaults. Designed to be used directly (in non-pytest tests) or via the +:func:`cfn_handler.testing.fixtures.cfn_lambda_context` pytest fixture. +""" + +from __future__ import annotations + +from dataclasses import dataclass + +#: Default time remaining at the start of a fresh Lambda invocation. +#: 5 minutes is the AWS Lambda default for new functions. +DEFAULT_REMAINING_TIME_MS = 300_000 + + +@dataclass +class _ReplayLambdaContext: + """A plain dataclass that structurally satisfies ``LambdaContext``. + + Intentionally not a ``Mock`` so that mypy/pyright in user code see + real attribute types (a ``Mock`` reports everything as ``Any``, + defeating type checking). + """ + + aws_request_id: str = "00000000-0000-0000-0000-000000000000" + function_name: str = "test-function" + invoked_function_arn: str = "arn:aws:lambda:us-east-1:111111111111:function:test-function" + log_group_name: str = "/aws/lambda/test-function" + log_stream_name: str = "2026/05/22/[$LATEST]00000000000000000000000000000000" + _remaining_time_ms: int = DEFAULT_REMAINING_TIME_MS + + def get_remaining_time_in_millis(self) -> int: + """Return the configured remaining time, in milliseconds.""" + return self._remaining_time_ms + + +def make_context( + *, + aws_request_id: str = "00000000-0000-0000-0000-000000000000", + function_name: str = "test-function", + invoked_function_arn: str = "arn:aws:lambda:us-east-1:111111111111:function:test-function", + log_group_name: str = "/aws/lambda/test-function", + log_stream_name: str = "2026/05/22/[$LATEST]00000000000000000000000000000000", + remaining_time_ms: int = DEFAULT_REMAINING_TIME_MS, +) -> _ReplayLambdaContext: + """Build a Lambda context double satisfying the ``LambdaContext`` Protocol. + + All defaults use safe placeholder values (RFC 5737 / AWS-reserved + example account ID ``111111111111``) so a misrouted test can't hit + real infrastructure. + + Args: + aws_request_id: Value for ``ctx.aws_request_id``. + function_name: Value for ``ctx.function_name``. + invoked_function_arn: Value for ``ctx.invoked_function_arn``. + log_group_name: Value for ``ctx.log_group_name``. + log_stream_name: Value for ``ctx.log_stream_name``. + remaining_time_ms: Value returned by ``ctx.get_remaining_time_in_millis()``. + + Returns: + A dataclass instance structurally compatible with ``LambdaContext``. + """ + return _ReplayLambdaContext( + aws_request_id=aws_request_id, + function_name=function_name, + invoked_function_arn=invoked_function_arn, + log_group_name=log_group_name, + log_stream_name=log_stream_name, + _remaining_time_ms=remaining_time_ms, + ) diff --git a/src/cfn_handler/testing/_internal/event_factory.py b/src/cfn_handler/testing/_internal/event_factory.py new file mode 100644 index 0000000..f80f108 --- /dev/null +++ b/src/cfn_handler/testing/_internal/event_factory.py @@ -0,0 +1,84 @@ +"""Event factory: build canonical CloudFormation custom-resource event dicts. + +Defaults use safe placeholder values: the response URL host is +``example.invalid`` (RFC 6761 reserved name guaranteed not to resolve) +and the account ID is ``111111111111`` (AWS-reserved example account), +so a misrouted test cannot hit real infrastructure. +""" + +from __future__ import annotations + +from typing import Any, Literal + +#: The set of valid CloudFormation custom-resource request types. +RequestType = Literal["Create", "Update", "Delete"] + +_DEFAULT_STACK_ID = ( + "arn:aws:cloudformation:us-east-1:111111111111:stack/test-stack/00000000-0000-0000-0000-000000000000" +) +_DEFAULT_REQUEST_ID = "00000000-0000-0000-0000-000000000000" +_DEFAULT_LOGICAL_ID = "TestResource" +_DEFAULT_RESOURCE_TYPE = "Custom::Test" +_DEFAULT_RESPONSE_URL = "https://example.invalid/cfn-response" +_DEFAULT_SERVICE_TOKEN = "arn:aws:lambda:us-east-1:111111111111:function:test-function" + + +def make_event( + request_type: RequestType = "Create", + *, + stack_id: str = _DEFAULT_STACK_ID, + request_id: str = _DEFAULT_REQUEST_ID, + logical_resource_id: str = _DEFAULT_LOGICAL_ID, + physical_resource_id: str | None = None, + resource_type: str = _DEFAULT_RESOURCE_TYPE, + resource_properties: dict[str, Any] | None = None, + old_resource_properties: dict[str, Any] | None = None, + response_url: str = _DEFAULT_RESPONSE_URL, + service_token: str = _DEFAULT_SERVICE_TOKEN, +) -> dict[str, Any]: + """Build a CloudFormation custom-resource event dict. + + Args: + request_type: ``"Create"``, ``"Update"``, or ``"Delete"``. + stack_id: Full stack ARN. + request_id: Per-invocation UUID. + logical_resource_id: Template-side logical name of the resource. + physical_resource_id: Required for Update/Delete; raises + ``ValueError`` if missing. + resource_type: ``Custom::*`` type from the template. + resource_properties: ``ResourceProperties`` dict; defaults to ``{}``. + old_resource_properties: ``OldResourceProperties`` dict for Update; + defaults to ``{}`` for Update events, omitted otherwise. + response_url: The presigned URL CFN expects the response on. + service_token: ARN of the Lambda function backing this custom + resource. + + Returns: + A dict matching the documented CFN custom-resource event shape. + + Raises: + ValueError: For Update/Delete events when ``physical_resource_id`` + is not supplied. + """ + if request_type in ("Update", "Delete") and physical_resource_id is None: + msg = ( + f"physical_resource_id is required for {request_type} events; " + "real CFN events always carry one. Pass it explicitly." + ) + raise ValueError(msg) + + event: dict[str, Any] = { + "RequestType": request_type, + "ServiceToken": service_token, + "ResponseURL": response_url, + "StackId": stack_id, + "RequestId": request_id, + "LogicalResourceId": logical_resource_id, + "ResourceType": resource_type, + "ResourceProperties": resource_properties if resource_properties is not None else {}, + } + if physical_resource_id is not None: + event["PhysicalResourceId"] = physical_resource_id + if request_type == "Update": + event["OldResourceProperties"] = old_resource_properties if old_resource_properties is not None else {} + return event diff --git a/src/cfn_handler/testing/_internal/replay_result.py b/src/cfn_handler/testing/_internal/replay_result.py new file mode 100644 index 0000000..536b5ee --- /dev/null +++ b/src/cfn_handler/testing/_internal/replay_result.py @@ -0,0 +1,58 @@ +"""The :class:`Replay` dataclass — structured outcome of an in-process dispatch. + +Returned by :meth:`cfn_handler.CustomResource.replay`; never mutated by the +library after construction (the dataclass is frozen). Tests inspect the +fields directly or use the assertion helpers in :mod:`cfn_handler.testing`. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Literal + + +def _empty_payload() -> dict[str, Any]: + """Build an empty payload dict (typed for pyright/mypy strict).""" + return {} + + +#: Possible outcomes of a replay. ``"SUCCESS"`` and ``"FAILED"`` mirror real +#: CloudFormation response statuses; ``"DEFERRED"`` is a sentinel value +#: that exists only in replay (never sent on the wire) used to signal +#: "this would have entered polling". +ReplayStatus = Literal["SUCCESS", "FAILED", "DEFERRED"] + +#: The lifecycle request type a replay was dispatched on. +ReplayRequestType = Literal["Create", "Update", "Delete"] + + +@dataclass(frozen=True, slots=True) +class Replay: + """Outcome of a :meth:`CustomResource.replay` call. + + Attributes: + status: ``"SUCCESS"``, ``"FAILED"``, or the replay-only sentinel + ``"DEFERRED"`` (set when the dispatch would have entered + polling instead of sending a terminal response). + physical_resource_id: The PhysicalResourceId that would be sent + to CloudFormation. ``None`` only on a ``DEFERRED`` replay + where the value isn't computed because no response is built. + data: The ``Data`` field of the response payload (the dict the + handler returned, or ``{}`` for handlers returning ``None`` + and for FAILED responses). + reason: The ``Reason`` field of the response. Empty string on + SUCCESS; the exception text on FAILED; empty on DEFERRED. + no_echo: Whether the response would have been marked NoEcho. + payload: The full rendered response payload that would have been + PUT to ``ResponseURL``. Empty dict on DEFERRED. + request_type: ``"Create"``, ``"Update"``, or ``"Delete"`` — + the value of ``event["RequestType"]`` at replay time. + """ + + status: ReplayStatus + physical_resource_id: str | None + data: dict[str, Any] + reason: str + no_echo: bool + payload: dict[str, Any] = field(default_factory=_empty_payload) + request_type: ReplayRequestType = "Create" diff --git a/src/cfn_handler/testing/_internal/runner.py b/src/cfn_handler/testing/_internal/runner.py new file mode 100644 index 0000000..1cf8a78 --- /dev/null +++ b/src/cfn_handler/testing/_internal/runner.py @@ -0,0 +1,144 @@ +"""The runner that drives ``CustomResource.replay``. + +Lives under ``cfn_handler.testing._internal`` (private). Splits the +``replay()`` logic out of ``resource.py`` to keep the production module +free of testing-only concerns. + +The strategy: capture the resource's existing ``_transport``, +``_provision_poller``, and ``_teardown_poller`` attributes, swap them for +in-memory recorders, run a normal ``__call__``, then restore the originals. +The recorders also produce the ``Replay`` value returned to the caller. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, cast + +from cfn_handler.testing._internal.replay_result import Replay + +# TYPE_CHECKING-only imports: at runtime ``cfn_handler.resource`` IS already +# loaded by the time ``run_replay`` is called (the user must have imported +# ``CustomResource`` to instantiate one). The ``if TYPE_CHECKING`` guard means +# this block is never executed at module load — it exists solely so mypy/pyright +# can resolve the ``CustomResource`` and ``LambdaContext`` annotations below. +# ``ReplayRequestType`` and ``ReplayStatus`` go here too because they're only +# used in annotations (which become strings under +# ``from __future__ import annotations``) or in string-form ``cast()`` calls +# (per ruff's TC006). Importing them at runtime would be flagged as unused. +# CodeQL's ``py/cyclic-import`` query flags the ``cfn_handler.resource`` import +# as a cycle because it doesn't model conditional imports; the cycle is +# paper-only and has no runtime effect. Paired with the lazy +# ``from ... import run_replay`` inside ``CustomResource.replay`` +# (resource.py), which is the canonical Python pattern for breaking import +# cycles between modules with a directional dependency. +if TYPE_CHECKING: + from cfn_handler.resource import CustomResource, LambdaContext + from cfn_handler.testing._internal.replay_result import ReplayRequestType, ReplayStatus + + +def _make_default_context() -> LambdaContext: + """Build a default context for ``replay()`` calls without an explicit one. + + Imported lazily because ``cfn_handler.testing._internal.context_factory`` + isn't part of the public API surface this module is documented as. + """ + from cfn_handler.testing._internal.context_factory import make_context + + return make_context() + + +def run_replay( + resource: CustomResource, + event: dict[str, Any], + context: LambdaContext | None, +) -> Replay: + """Drive ``resource(event, context)`` with capturing transport + stub pollers. + + See :meth:`cfn_handler.CustomResource.replay` for the public contract. + """ + if context is None: + context = _make_default_context() + + request_type = cast("ReplayRequestType", event.get("RequestType", "Create")) + + # Capture state. Lists capture in mutation order; we only ever expect at + # most one entry per call but allow more to surface bugs visibly. + sent: list[dict[str, Any]] = [] + provisioned: list[bool] = [] + + def capture_transport(_url: str, payload: dict[str, Any]) -> None: + sent.append(payload) + + def stub_provision( + evt: dict[str, Any], + _function_name: str, + _polling_interval_minutes: int = 1, + _region: str | None = None, + ) -> None: + # Mirror real ``setup_polling`` event mutation so a follow-up + # ``replay()`` of the mutated event correctly routes to the poll + # handler. The marker values are syntactically valid placeholders; + # tests that care about exact values can override via the public + # poller seam (CustomResource(provision_poller=...)). + evt["CfnHandlerPoll"] = True + evt["CfnHandlerRule"] = "arn:aws:events:us-east-1:111111111111:rule/cfn-handler-replay-stub" + evt["CfnHandlerPermission"] = "cfn-handler-replay-stub-permission" + provisioned.append(True) + + def stub_teardown( + _evt: dict[str, Any], + _function_name: str, + _region: str | None = None, + ) -> None: + # No-op: in replay we never set up real AWS resources, so there's + # nothing to remove. Recording is unnecessary because the + # post-poll terminal response in ``sent`` is the authoritative + # signal that teardown was reached. + return + + with resource._replay_seams( # pyright: ignore[reportPrivateUsage] + transport=capture_transport, + provision_poller=stub_provision, + teardown_poller=stub_teardown, + ): + resource(event, context) + + # Decide DEFERRED vs SUCCESS/FAILED based on what we captured. + # If polling was provisioned AND no terminal response was emitted, + # this was a deferral. + if provisioned and not sent: + return Replay( + status="DEFERRED", + physical_resource_id=None, + data={}, + reason="", + no_echo=False, + payload={}, + request_type=request_type, + ) + + if not sent: + # No response and no polling provisioned — should not happen on a + # well-formed dispatch. Surface this as FAILED with a message + # so test failures are loud, not silent. + return Replay( + status="FAILED", + physical_resource_id=None, + data={}, + reason="cfn-handler internal: dispatch produced no response and no polling deferral", + no_echo=False, + payload={}, + request_type=request_type, + ) + + payload = sent[-1] # last response wins (a dispatch should only emit one) + status: ReplayStatus = payload["Status"] + return Replay( + status=status, + physical_resource_id=payload.get("PhysicalResourceId"), + data=payload.get("Data", {}), + reason=payload.get("Reason", ""), + no_echo=payload.get("NoEcho", False), + payload=payload, + request_type=request_type, + ) diff --git a/src/cfn_handler/testing/fixtures.py b/src/cfn_handler/testing/fixtures.py new file mode 100644 index 0000000..2732942 --- /dev/null +++ b/src/cfn_handler/testing/fixtures.py @@ -0,0 +1,73 @@ +"""pytest fixtures auto-loaded via the ``pytest11`` entry point. + +This module is imported by pytest at collection time when ``cfn_handler`` +is installed; it has no effect outside pytest. Importing this module +does not pull in pytest itself unless the fixtures are referenced. + +Public fixtures: + +- :func:`cfn_create_event`, :func:`cfn_update_event`, :func:`cfn_delete_event`: + canonical CloudFormation custom-resource event dicts (one per request type). +- :func:`cfn_lambda_context`: a minimal ``LambdaContext``-protocol object. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +import pytest + +from cfn_handler.testing._internal.context_factory import make_context +from cfn_handler.testing._internal.event_factory import make_event + +if TYPE_CHECKING: + from cfn_handler.resource import LambdaContext + +# Stable values used by the Update/Delete fixtures so the PhysicalResourceId +# is consistent across calls. Tests that need uniqueness can override. +_FIXTURE_PHYSICAL_RESOURCE_ID = "test-physical-resource-id" + + +@pytest.fixture +def cfn_create_event() -> dict[str, Any]: + """A canonical CloudFormation Create event dict. + + Returns a fresh dict on every test (mutations don't leak across tests). + """ + return make_event(request_type="Create") + + +@pytest.fixture +def cfn_update_event() -> dict[str, Any]: + """A canonical CloudFormation Update event dict. + + Includes a ``PhysicalResourceId`` (required by CFN for Update events) + and an empty ``OldResourceProperties``. Tests can override either by + mutating the returned dict. + """ + return make_event( + request_type="Update", + physical_resource_id=_FIXTURE_PHYSICAL_RESOURCE_ID, + ) + + +@pytest.fixture +def cfn_delete_event() -> dict[str, Any]: + """A canonical CloudFormation Delete event dict. + + Includes a ``PhysicalResourceId`` (required by CFN for Delete events). + """ + return make_event( + request_type="Delete", + physical_resource_id=_FIXTURE_PHYSICAL_RESOURCE_ID, + ) + + +@pytest.fixture +def cfn_lambda_context() -> LambdaContext: + """A minimal ``LambdaContext`` double satisfying the Protocol. + + Override fields by mutating the returned object, or call + :func:`cfn_handler.testing.make_context` directly with kwargs. + """ + return make_context() diff --git a/src/cfn_handler/testing/py.typed b/src/cfn_handler/testing/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/tests/integration/test_fixture_discovery.py b/tests/integration/test_fixture_discovery.py new file mode 100644 index 0000000..3776023 --- /dev/null +++ b/tests/integration/test_fixture_discovery.py @@ -0,0 +1,89 @@ +"""End-to-end test that fixtures auto-load via the pytest11 entry point. + +We invoke pytest as a subprocess against a fresh project that has NO +conftest.py declaring `pytest_plugins`. If our entry point is wired +correctly, pytest's plugin manager picks up cfn_handler.testing.fixtures +and the four fixtures resolve. +""" + +from __future__ import annotations + +import subprocess +import sys +import textwrap +from pathlib import Path + +import pytest + + +@pytest.mark.integration +def test_fixtures_auto_discovered_in_fresh_pytest_project(tmp_path: Path) -> None: + """A pristine pytest project with no conftest sees our fixtures.""" + test_file = tmp_path / "test_consumer.py" + test_file.write_text( + textwrap.dedent( + """\ + from cfn_handler import CustomResource + from cfn_handler.testing import assert_success + + + def test_uses_fixtures(cfn_create_event, cfn_lambda_context): + resource = CustomResource() + + @resource.create + def on_create(event, ctx): + return {"x": "y"} + + replay = resource.replay(cfn_create_event, cfn_lambda_context) + assert_success(replay, data={"x": "y"}) + + + def test_each_event_fixture_works( + cfn_create_event, + cfn_update_event, + cfn_delete_event, + ): + assert cfn_create_event["RequestType"] == "Create" + assert cfn_update_event["RequestType"] == "Update" + assert cfn_delete_event["RequestType"] == "Delete" + # PhysicalResourceId is required for Update/Delete. + assert "PhysicalResourceId" in cfn_update_event + assert "PhysicalResourceId" in cfn_delete_event + """, + ), + ) + result = subprocess.run( + [sys.executable, "-m", "pytest", "-q", str(test_file)], + check=False, + capture_output=True, + text=True, + ) + assert result.returncode == 0, ( + f"Auto-discovered fixtures failed:\n--- stdout ---\n{result.stdout}\n--- stderr ---\n{result.stderr}" + ) + + +@pytest.mark.integration +def test_fixture_invocations_are_independent(tmp_path: Path) -> None: + """Mutating one test's event must not bleed into a sibling.""" + test_file = tmp_path / "test_isolation.py" + test_file.write_text( + textwrap.dedent( + """\ + def test_a_mutates(cfn_create_event): + cfn_create_event["MUTATED"] = True + + def test_b_is_pristine(cfn_create_event): + assert "MUTATED" not in cfn_create_event + """, + ), + ) + result = subprocess.run( + [sys.executable, "-m", "pytest", "-q", str(test_file)], + check=False, + capture_output=True, + text=True, + ) + assert result.returncode == 0, ( + f"Fixture isolation failed:\n--- stdout ---\n{result.stdout}\n--- stderr ---\n{result.stderr}" + ) diff --git a/tests/integration/test_replay_parity.py b/tests/integration/test_replay_parity.py new file mode 100644 index 0000000..c86e354 --- /dev/null +++ b/tests/integration/test_replay_parity.py @@ -0,0 +1,102 @@ +"""Parity test: ``CustomResource.__call__`` (real path) vs ``replay()`` (in-process). + +For the same handler logic and event, both paths must produce equivalent +response payloads. This test catches drift between the production dispatch +and the replay-based testing surface. + +The PhysicalResourceId carries a per-invocation random suffix so we mask +that (and the RequestId echo) before comparison. +""" + +from __future__ import annotations + +import json +import urllib.error +import urllib.request +from collections.abc import Iterator +from typing import Any +from unittest.mock import Mock + +import pytest +from moto import mock_aws + +from cfn_handler import CustomResource +from cfn_handler.resource import LambdaContext + + +@pytest.fixture +def moto_aws() -> Iterator[None]: + with mock_aws(): + yield + + +@pytest.fixture +def captured_payloads(monkeypatch: pytest.MonkeyPatch) -> list[dict[str, Any]]: + """Capture every PUT to the simulated CFN ResponseURL. + + Returns the list of payloads (in order) the production path would have + PUT to CloudFormation. We can then compare to ``replay()`` output. + """ + captured: list[dict[str, Any]] = [] + + class _FakeResponse: + status = 200 + reason = "OK" + + def __enter__(self) -> _FakeResponse: + return self + + def __exit__(self, *_args: object) -> None: + return None + + def fake_urlopen(request: urllib.request.Request) -> _FakeResponse: + body = request.data + if isinstance(body, bytes): + captured.append(json.loads(body.decode("utf-8"))) + return _FakeResponse() + + monkeypatch.setattr( + "cfn_handler._internal.response.urllib.request.urlopen", + fake_urlopen, + ) + return captured + + +def _normalize(payload: dict[str, Any]) -> dict[str, Any]: + """Strip per-invocation random/echo fields for cross-run comparison.""" + keys = ("Status", "Data", "Reason", "LogicalResourceId", "StackId") + return {k: payload[k] for k in keys if k in payload} + + +@pytest.mark.integration +def test_production_call_and_replay_produce_equivalent_payloads( + events: dict[str, dict[str, Any]], + mock_context: Mock, + moto_aws: None, + captured_payloads: list[dict[str, Any]], +) -> None: + """Same handler, two paths, equivalent payloads.""" + + def build_resource() -> CustomResource: + resource = CustomResource() + + @resource.create + def on_create(_event: dict[str, Any], _ctx: LambdaContext) -> dict[str, Any]: + return {"Endpoint": "https://parity.example", "Token": "abc"} + + return resource + + # ---- Production path: __call__ via fake_urlopen + moto ------------- + prod_resource = build_resource() + prod_resource(events["Create"], mock_context) + assert len(captured_payloads) == 1, "production path did not emit a response" + prod_payload = captured_payloads[0] + + # ---- Replay path: in-process, no HTTP, no boto3 -------------------- + replay_resource = build_resource() + replay = replay_resource.replay(events["Create"], mock_context) + + # Status, Data, LogicalResourceId, StackId, Reason MUST agree. + assert _normalize(prod_payload) == _normalize(replay.payload) + assert prod_payload["Status"] == replay.status + assert prod_payload["Data"] == replay.data diff --git a/tests/unit/test_backstops.py b/tests/unit/test_backstops.py index 69074ef..32aaaa9 100644 --- a/tests/unit/test_backstops.py +++ b/tests/unit/test_backstops.py @@ -127,25 +127,3 @@ def on_poll(_e: dict[str, Any], _c: LambdaContext) -> dict[str, Any]: ): # Should not raise: resource(poll_event, mock_context) - - -def test_safe_teardown_skipped_in_test_mode( - events: dict[str, dict[str, Any]], - mock_context: Mock, -) -> None: - """In test_mode, no teardown is attempted (we never set up either).""" - poll_event = events["Create"] - poll_event[EVENT_MARKER_POLL] = True - poll_event[EVENT_MARKER_RULE] = "arn:aws:events:us-east-1:123:rule/my-rule" - poll_event[EVENT_MARKER_PERMISSION] = "Sid" - - resource = CustomResource(test_mode=True) - - @resource.poll_create - def on_poll(_e: dict[str, Any], _c: LambdaContext) -> dict[str, Any]: - return {"done": True} - - with patch("cfn_handler.resource.teardown_polling") as teardown: - resource(poll_event, mock_context) - - teardown.assert_not_called() diff --git a/tests/unit/test_poller_seam.py b/tests/unit/test_poller_seam.py new file mode 100644 index 0000000..5d5c46c --- /dev/null +++ b/tests/unit/test_poller_seam.py @@ -0,0 +1,89 @@ +"""Tests for the internal poller-stub seam. + +The seam allows polling provisioning/teardown (which import boto3 and +provision EventBridge rules) to be swapped for in-memory stubs during +testing. This is the foundation for ``replay()``-based polling tests +that don't require boto3. +""" + +from __future__ import annotations + +import sys +from typing import Any +from unittest.mock import Mock + +from cfn_handler import CustomResource + + +def test_poller_stubs_replace_boto3_calls( + events: dict[str, dict[str, Any]], + mock_context: Mock, +) -> None: + """When poller stubs are injected, no boto3 import or AWS call occurs. + + The stub provisioner records its call args and mutates the event + with the polling marker keys (matching real ``setup_polling`` + behaviour). The library never reaches into ``cfn_handler._internal. + poller`` — the seam handles it. + """ + provision_calls: list[tuple[dict[str, Any], str, int, str | None]] = [] + teardown_calls: list[tuple[dict[str, Any], str, str | None]] = [] + + def stub_provision( + event: dict[str, Any], + function_name: str, + polling_interval_minutes: int = 1, + region: str | None = None, + ) -> None: + provision_calls.append((event, function_name, polling_interval_minutes, region)) + # Mutate event the same way real setup_polling does so a + # subsequent invocation (poll re-invocation) routes correctly. + event["CfnHandlerPoll"] = True + event["CfnHandlerRule"] = "arn:aws:events:us-east-1:111111111111:rule/stub" + event["CfnHandlerPermission"] = "stub-perm-id" + + def stub_teardown( + event: dict[str, Any], + function_name: str, + region: str | None = None, + ) -> None: + teardown_calls.append((event, function_name, region)) + + # Capturing transport so we don't hit the network either. + sent: list[tuple[str, dict[str, Any]]] = [] + + def fake_transport(url: str, payload: dict[str, Any]) -> None: + sent.append((url, payload)) + + resource = CustomResource( + transport=fake_transport, + provision_poller=stub_provision, + teardown_poller=stub_teardown, + ) + + @resource.create + def on_create(_event: dict[str, Any], _ctx: Any) -> None: + return None + + @resource.poll_create + def on_poll(_event: dict[str, Any], _ctx: Any) -> dict[str, Any]: + return {"Endpoint": "https://done.example"} + + # First invocation: lifecycle handler runs, stub provisioner mutates + # event. No CFN response sent (deferred). + pre_modules = set(sys.modules) + resource(events["Create"], mock_context) + assert "boto3" not in (set(sys.modules) - pre_modules), ( + "boto3 must not have been imported during the deferred-create path" + ) + assert len(provision_calls) == 1 + assert len(sent) == 0 + assert events["Create"]["CfnHandlerPoll"] is True + + # Second invocation: poll re-invocation routes to the poll handler. + resource(events["Create"], mock_context) + assert len(teardown_calls) == 1 + assert len(sent) == 1 + _url, payload = sent[0] + assert payload["Status"] == "SUCCESS" + assert payload["Data"] == {"Endpoint": "https://done.example"} diff --git a/tests/unit/test_polling_dispatch.py b/tests/unit/test_polling_dispatch.py index b03d759..e73b5ea 100644 --- a/tests/unit/test_polling_dispatch.py +++ b/tests/unit/test_polling_dispatch.py @@ -30,7 +30,7 @@ def test_create_with_poll_handler_defers_response( mock_context: Mock, ) -> None: """Lifecycle handler runs, then setup_polling is called and NO response sent.""" - resource = CustomResource() # NOT test_mode: we want to verify setup_polling + resource = CustomResource() # production path: we want to verify setup_polling @resource.create def on_create(_e: dict[str, Any], _c: LambdaContext) -> None: @@ -50,27 +50,6 @@ def on_poll(_e: dict[str, Any], _c: LambdaContext) -> None: send.assert_not_called() -def test_create_with_poll_handler_in_test_mode_records_sentinel( - events: dict[str, dict[str, Any]], - mock_context: Mock, -) -> None: - """Test-mode polling records a sentinel on last_response so tests can assert intent.""" - resource = CustomResource(test_mode=True) - - @resource.create - def on_create(_e: dict[str, Any], _c: LambdaContext) -> dict[str, Any]: - return {"foo": "bar"} - - @resource.poll_create - def on_poll(_e: dict[str, Any], _c: LambdaContext) -> None: - return None - - resource(events["Create"], mock_context) - assert resource.last_response is not None - assert resource.last_response.get("__cfn_handler_polling__") is True - assert resource.last_response.get("Data") == {"foo": "bar"} - - def test_setup_polling_failure_yields_failed_response( events: dict[str, dict[str, Any]], mock_context: Mock, diff --git a/tests/unit/test_resource.py b/tests/unit/test_resource.py index 5d0e2b5..b851fe1 100644 --- a/tests/unit/test_resource.py +++ b/tests/unit/test_resource.py @@ -1,8 +1,9 @@ """Unit tests for the public ``CustomResource`` class. -Strategy: use ``test_mode=True`` so we don't actually PUT to a CFN URL; we -inspect ``last_response`` to verify the payload that *would* have been sent. -This is exactly the test-mode pattern from upstream issues #52 / #54. +Strategy: use ``replay()`` so we don't actually PUT to a CFN URL; we +inspect the returned ``Replay`` value to verify the payload that *would* +have been sent. This is the ``cfn_handler.testing`` v1.3+ pattern; +the legacy ``test_mode`` flag is deprecated. """ from __future__ import annotations @@ -20,6 +21,7 @@ LambdaContext, _generate_physical_id, ) +from cfn_handler.testing import assert_failed, assert_success # ---- Public API surface -------------------------------------------------- @@ -58,7 +60,7 @@ def test_lambda_context_protocol_accepts_real_shape() -> None: @pytest.mark.parametrize("attr", ["create", "update", "delete"]) def test_lifecycle_decorator_registers_handler(attr: str) -> None: - resource = CustomResource(test_mode=True) + resource = CustomResource() @getattr(resource, attr) def fn(_event: dict[str, Any], _context: LambdaContext) -> None: @@ -70,7 +72,7 @@ def fn(_event: dict[str, Any], _context: LambdaContext) -> None: @pytest.mark.parametrize("attr", ["poll_create", "poll_update", "poll_delete"]) def test_poll_decorator_registers_handler(attr: str) -> None: - resource = CustomResource(test_mode=True) + resource = CustomResource() @getattr(resource, attr) def fn(_event: dict[str, Any], _context: LambdaContext) -> None: @@ -81,7 +83,7 @@ def fn(_event: dict[str, Any], _context: LambdaContext) -> None: def test_lifecycle_decorator_returns_original_callable() -> None: - resource = CustomResource(test_mode=True) + resource = CustomResource() def fn(_event: dict[str, Any], _context: LambdaContext) -> None: return None @@ -92,7 +94,7 @@ def fn(_event: dict[str, Any], _context: LambdaContext) -> None: @pytest.mark.parametrize("attr", ["create", "update", "delete"]) def test_double_lifecycle_registration_raises(attr: str) -> None: - resource = CustomResource(test_mode=True) + resource = CustomResource() decorator = getattr(resource, attr) @decorator @@ -108,7 +110,7 @@ def second(_e: dict[str, Any], _c: LambdaContext) -> None: @pytest.mark.parametrize("attr", ["poll_create", "poll_update", "poll_delete"]) def test_double_poll_registration_raises(attr: str) -> None: - resource = CustomResource(test_mode=True) + resource = CustomResource() decorator = getattr(resource, attr) @decorator @@ -131,7 +133,7 @@ def test_double_registration_error_is_a_value_error() -> None: def test_create_dispatch_invokes_create_handler(events: dict[str, dict[str, Any]], mock_context: Mock) -> None: - resource = CustomResource(test_mode=True) + resource = CustomResource() captured: list[tuple[dict[str, Any], LambdaContext]] = [] @resource.create @@ -139,14 +141,14 @@ def on_create(event: dict[str, Any], context: LambdaContext) -> None: captured.append((event, context)) return None - resource(events["Create"], mock_context) + resource.replay(events["Create"], mock_context) assert len(captured) == 1 assert captured[0][0] is events["Create"] assert captured[0][1] is mock_context def test_update_and_delete_dispatch(events: dict[str, dict[str, Any]], mock_context: Mock) -> None: - resource = CustomResource(test_mode=True) + resource = CustomResource() calls: list[str] = [] @resource.create @@ -161,166 +163,156 @@ def on_update(_e: dict[str, Any], _c: LambdaContext) -> None: def on_delete(_e: dict[str, Any], _c: LambdaContext) -> None: calls.append("delete") - resource(events["Update"], mock_context) - resource(events["Delete"], mock_context) + resource.replay(events["Update"], mock_context) + resource.replay(events["Delete"], mock_context) assert calls == ["update", "delete"] def test_handler_return_value_becomes_data(events: dict[str, dict[str, Any]], mock_context: Mock) -> None: - resource = CustomResource(test_mode=True) + resource = CustomResource() @resource.create def on_create(_e: dict[str, Any], _c: LambdaContext) -> dict[str, Any]: return {"Endpoint": "https://x.example", "Token": "abc"} - resource(events["Create"], mock_context) - assert resource.last_response is not None - assert resource.last_response["Status"] == "SUCCESS" - assert resource.last_response["Data"] == {"Endpoint": "https://x.example", "Token": "abc"} + replay = resource.replay(events["Create"], mock_context) + assert_success(replay, data={"Endpoint": "https://x.example", "Token": "abc"}) def test_handler_returning_none_yields_empty_data(events: dict[str, dict[str, Any]], mock_context: Mock) -> None: - resource = CustomResource(test_mode=True) + resource = CustomResource() @resource.create def on_create(_e: dict[str, Any], _c: LambdaContext) -> None: return None - resource(events["Create"], mock_context) - assert resource.last_response is not None - assert resource.last_response["Data"] == {} + replay = resource.replay(events["Create"], mock_context) + assert_success(replay, data={}) def test_unknown_request_type_yields_failed(events: dict[str, dict[str, Any]], mock_context: Mock) -> None: - resource = CustomResource(test_mode=True) + resource = CustomResource() bogus = events["Create"] bogus["RequestType"] = "FooBar" - resource(bogus, mock_context) - assert resource.last_response is not None - assert resource.last_response["Status"] == "FAILED" - assert "FooBar" in resource.last_response["Reason"] + replay = resource.replay(bogus, mock_context) + assert_failed(replay, reason_contains="FooBar") def test_missing_handler_for_known_request_type_yields_failed( - events: dict[str, dict[str, Any]], mock_context: Mock + events: dict[str, dict[str, Any]], + mock_context: Mock, ) -> None: - resource = CustomResource(test_mode=True) + resource = CustomResource() # No handlers registered. - resource(events["Delete"], mock_context) - assert resource.last_response is not None - assert resource.last_response["Status"] == "FAILED" - assert "Delete" in resource.last_response["Reason"] + replay = resource.replay(events["Delete"], mock_context) + assert_failed(replay, reason_contains="Delete") # ---- Exception handling -------------------------------------------------- def test_handler_exception_is_reported_as_failed(events: dict[str, dict[str, Any]], mock_context: Mock) -> None: - resource = CustomResource(test_mode=True) + resource = CustomResource() @resource.create def on_create(_e: dict[str, Any], _c: LambdaContext) -> None: - raise RuntimeError("policy not found") + msg = "policy not found" + raise RuntimeError(msg) - resource(events["Create"], mock_context) - assert resource.last_response is not None - assert resource.last_response["Status"] == "FAILED" - assert "policy not found" in resource.last_response["Reason"] + replay = resource.replay(events["Create"], mock_context) + assert_failed(replay, reason_contains="policy not found") def test_handler_exception_with_empty_message_uses_class_name( - events: dict[str, dict[str, Any]], mock_context: Mock + events: dict[str, dict[str, Any]], + mock_context: Mock, ) -> None: - resource = CustomResource(test_mode=True) + resource = CustomResource() @resource.create def on_create(_e: dict[str, Any], _c: LambdaContext) -> None: raise RuntimeError - resource(events["Create"], mock_context) - assert resource.last_response is not None - assert resource.last_response["Reason"] == "RuntimeError" + replay = resource.replay(events["Create"], mock_context) + assert replay.reason == "RuntimeError" # ---- PhysicalResourceId semantics --------------------------------------- def test_create_default_physical_resource_id_is_generated( - events: dict[str, dict[str, Any]], mock_context: Mock + events: dict[str, dict[str, Any]], + mock_context: Mock, ) -> None: - resource = CustomResource(test_mode=True) + resource = CustomResource() @resource.create def on_create(_e: dict[str, Any], _c: LambdaContext) -> None: return None - resource(events["Create"], mock_context) - assert resource.last_response is not None - pid = resource.last_response["PhysicalResourceId"] - assert pid - assert "TestResource" in pid + replay = resource.replay(events["Create"], mock_context) + assert replay.physical_resource_id is not None + assert "TestResource" in replay.physical_resource_id def test_update_echoes_existing_physical_resource_id(events: dict[str, dict[str, Any]], mock_context: Mock) -> None: - resource = CustomResource(test_mode=True) + resource = CustomResource() @resource.update def on_update(_e: dict[str, Any], _c: LambdaContext) -> None: return None - resource(events["Update"], mock_context) - assert resource.last_response is not None - assert resource.last_response["PhysicalResourceId"] == events["Update"]["PhysicalResourceId"] + replay = resource.replay(events["Update"], mock_context) + assert replay.physical_resource_id == events["Update"]["PhysicalResourceId"] def test_update_overrides_physical_resource_id_for_replacement( - events: dict[str, dict[str, Any]], mock_context: Mock + events: dict[str, dict[str, Any]], + mock_context: Mock, ) -> None: - resource = CustomResource(test_mode=True) + resource = CustomResource() @resource.update def on_update(_e: dict[str, Any], _c: LambdaContext) -> None: resource.physical_resource_id = "new-id-replaced" - resource(events["Update"], mock_context) - assert resource.last_response is not None - assert resource.last_response["PhysicalResourceId"] == "new-id-replaced" + replay = resource.replay(events["Update"], mock_context) + assert replay.physical_resource_id == "new-id-replaced" def test_delete_echoes_existing_physical_resource_id(events: dict[str, dict[str, Any]], mock_context: Mock) -> None: - resource = CustomResource(test_mode=True) + resource = CustomResource() @resource.delete def on_delete(_e: dict[str, Any], _c: LambdaContext) -> None: return None - resource(events["Delete"], mock_context) - assert resource.last_response is not None - assert resource.last_response["PhysicalResourceId"] == events["Delete"]["PhysicalResourceId"] + replay = resource.replay(events["Delete"], mock_context) + assert replay.physical_resource_id == events["Delete"]["PhysicalResourceId"] def test_no_echo_default_omitted(events: dict[str, dict[str, Any]], mock_context: Mock) -> None: - resource = CustomResource(test_mode=True) + resource = CustomResource() @resource.create def on_create(_e: dict[str, Any], _c: LambdaContext) -> None: return None - resource(events["Create"], mock_context) - assert resource.last_response is not None - assert "NoEcho" not in resource.last_response + replay = resource.replay(events["Create"], mock_context) + assert replay.no_echo is False + assert "NoEcho" not in replay.payload def test_no_echo_can_be_set_from_handler(events: dict[str, dict[str, Any]], mock_context: Mock) -> None: - resource = CustomResource(test_mode=True) + resource = CustomResource() @resource.create def on_create(_e: dict[str, Any], _c: LambdaContext) -> None: resource.no_echo = True - resource(events["Create"], mock_context) - assert resource.last_response is not None - assert resource.last_response["NoEcho"] is True + replay = resource.replay(events["Create"], mock_context) + assert replay.no_echo is True + assert replay.payload["NoEcho"] is True # ---- Init failure (#7 / #67) -------------------------------------------- @@ -328,57 +320,22 @@ def on_create(_e: dict[str, Any], _c: LambdaContext) -> None: def test_init_failure_short_circuits_to_failed_with_pid(events: dict[str, dict[str, Any]], mock_context: Mock) -> None: """Issue #7/#67: init_failure must produce a usable PhysicalResourceId - so CloudFormation can roll back, not get stuck in ROLLBACK_FAILED.""" - resource = CustomResource(test_mode=True) + so CloudFormation can roll back, not get stuck in ROLLBACK_FAILED. + """ + resource = CustomResource() resource.init_failure(RuntimeError("boom in cold start")) - resource(events["Create"], mock_context) - assert resource.last_response is not None - assert resource.last_response["Status"] == "FAILED" - assert "boom in cold start" in resource.last_response["Reason"] - assert resource.last_response["PhysicalResourceId"] + replay = resource.replay(events["Create"], mock_context) + assert_failed(replay, reason_contains="boom in cold start") + assert replay.physical_resource_id def test_init_failure_for_update_echoes_existing_pid(events: dict[str, dict[str, Any]], mock_context: Mock) -> None: - resource = CustomResource(test_mode=True) + resource = CustomResource() resource.init_failure(RuntimeError("init blew up")) - resource(events["Update"], mock_context) - assert resource.last_response is not None - assert resource.last_response["PhysicalResourceId"] == events["Update"]["PhysicalResourceId"] - - -# ---- Test mode ---------------------------------------------------------- - - -def test_test_mode_does_not_send_response(events: dict[str, dict[str, Any]], mock_context: Mock) -> None: - """Test mode captures the response on the instance instead of POSTing.""" - resource = CustomResource(test_mode=True) - - @resource.create - def on_create(_e: dict[str, Any], _c: LambdaContext) -> None: - return None - - captured = resource(events["Create"], mock_context) - assert captured is not None - assert captured["Status"] == "SUCCESS" - assert captured is resource.last_response - - -def test_test_mode_returns_none_outside_test_mode( - events: dict[str, dict[str, Any]], - mock_context: Mock, -) -> None: - """Outside test mode, __call__ returns None (Lambda ignores return values).""" - resource = CustomResource(test_mode=True) - - @resource.create - def on_create(_e: dict[str, Any], _c: LambdaContext) -> None: - return None - - # Even in test_mode we still get the response back; explicitly verify the - # falsey-check semantics for paranoid tests. - assert resource(events["Create"], mock_context) is not None + replay = resource.replay(events["Update"], mock_context) + assert replay.physical_resource_id == events["Update"]["PhysicalResourceId"] # ---- log_level acceptance (#66) ------------------------------------------ @@ -390,7 +347,7 @@ def on_create(_e: dict[str, Any], _c: LambdaContext) -> None: ) def test_log_level_constructor_accepts_str_int_or_none(log_level: int | str | None) -> None: """Issue #66: log_level should accept int and str (and None to leave alone).""" - CustomResource(test_mode=True, log_level=log_level) + CustomResource(log_level=log_level) # ---- Internal helpers ---------------------------------------------------- diff --git a/tests/unit/test_state_machine.py b/tests/unit/test_state_machine.py index ffbcc79..0be3928 100644 --- a/tests/unit/test_state_machine.py +++ b/tests/unit/test_state_machine.py @@ -8,7 +8,7 @@ against the documented invariants: -I1. Exactly one response is sent (or recorded in test_mode) per terminal call. +I1. Exactly one response is captured per terminal call (via ``replay()``). I2. SUCCESS responses include all required fields (Status, PhysicalResourceId, StackId, RequestId, LogicalResourceId, Reason, Data). I3. FAILED responses include a non-empty Reason. @@ -78,12 +78,11 @@ def test_lifecycle_invariants_no_polling( remaining_ms: int, no_echo: bool, ) -> None: - """Without polling, every dispatch produces exactly one response satisfying - the documented invariants (I1-I5).""" + """Without polling, every dispatch produces exactly one response satisfying the documented invariants (I1-I5).""" event = copy.deepcopy(_BASE_EVENTS[request_type]) context = _make_context(remaining_ms) - resource = CustomResource(test_mode=True) + resource = CustomResource() if register_handler: decorator = getattr(resource, request_type.lower()) @@ -96,13 +95,14 @@ def fn(_e: dict[str, Any], _c: object) -> dict[str, Any] | None: return None if handler_outcome == "dict": return {"k": "v"} - raise RuntimeError("simulated handler failure") + msg = "simulated handler failure" + raise RuntimeError(msg) - resource(event, context) + replay = resource.replay(event, context) - payload = resource.last_response - # I1: exactly one response was recorded. - assert payload is not None, "No response was recorded" + # I1: exactly one response was captured. + assert replay.payload, "No response was captured" + payload = replay.payload # I2 / I5: required fields present and non-empty PID. for required in ("Status", "PhysicalResourceId", "StackId", "RequestId", "LogicalResourceId", "Reason", "Data"): @@ -115,18 +115,18 @@ def fn(_e: dict[str, Any], _c: object) -> dict[str, Any] | None: # Outcome-specific assertions: if not register_handler: - assert payload["Status"] == "FAILED" - assert payload["Reason"] # I3 + assert replay.status == "FAILED" + assert replay.reason # I3 elif handler_outcome == "raise": - assert payload["Status"] == "FAILED" - assert payload["Reason"] # I3 - assert "simulated handler failure" in payload["Reason"] + assert replay.status == "FAILED" + assert replay.reason # I3 + assert "simulated handler failure" in replay.reason else: - assert payload["Status"] == "SUCCESS" + assert replay.status == "SUCCESS" if handler_outcome == "dict": - assert payload["Data"] == {"k": "v"} + assert replay.data == {"k": "v"} else: - assert payload["Data"] == {} + assert replay.data == {} @settings( @@ -152,7 +152,7 @@ def test_physical_resource_id_resolution( # refuse; assume well-formed input. (Real CFN PIDs are well-formed.) assume(pid_value.encode("utf-8", "surrogatepass") == pid_value.encode("utf-8", "strict")) - resource = CustomResource(test_mode=True) + resource = CustomResource() decorator = getattr(resource, request_type.lower()) @@ -162,17 +162,17 @@ def fn(_e: dict[str, Any], _c: object) -> None: resource.physical_resource_id = pid_value return None - resource(event, context) - assert resource.last_response is not None + replay = resource.replay(event, context) + assert replay.physical_resource_id is not None if pid_set: - assert resource.last_response["PhysicalResourceId"] == pid_value + assert replay.physical_resource_id == pid_value elif "PhysicalResourceId" in event: - assert resource.last_response["PhysicalResourceId"] == event["PhysicalResourceId"] + assert replay.physical_resource_id == event["PhysicalResourceId"] else: - assert resource.last_response["PhysicalResourceId"] + assert replay.physical_resource_id # Auto-generated id includes the logical resource id. - assert event["LogicalResourceId"] in resource.last_response["PhysicalResourceId"] + assert event["LogicalResourceId"] in replay.physical_resource_id @settings(deadline=None, max_examples=100) @@ -209,7 +209,7 @@ def test_handler_outcomes_each_request_type( event = copy.deepcopy(_BASE_EVENTS[request_type]) context = _make_context(120_000) - resource = CustomResource(test_mode=True) + resource = CustomResource() decorator = getattr(resource, handler_to_register) @@ -219,9 +219,9 @@ def fn(_e: dict[str, Any], _c: object) -> dict[str, Any] | None: return None if handler_outcome == "dict": return {"x": "y"} - raise RuntimeError("err") + msg = "err" + raise RuntimeError(msg) - resource(event, context) - assert resource.last_response is not None - assert resource.last_response["Status"] in {"SUCCESS", "FAILED"} - assert resource.last_response["PhysicalResourceId"] + replay = resource.replay(event, context) + assert replay.status in {"SUCCESS", "FAILED"} + assert replay.physical_resource_id diff --git a/tests/unit/test_test_mode_deprecation.py b/tests/unit/test_test_mode_deprecation.py new file mode 100644 index 0000000..f859099 --- /dev/null +++ b/tests/unit/test_test_mode_deprecation.py @@ -0,0 +1,102 @@ +"""Tests for the soft-deprecation of ``CustomResource(test_mode=True)``. + +In v1.3 the legacy ``test_mode`` flag continues to work, but constructing +a ``CustomResource`` with ``test_mode=True`` emits a ``DeprecationWarning`` +directing users at the new ``replay()`` API. Removed in v2.0. +""" + +from __future__ import annotations + +import warnings +from typing import Any +from unittest.mock import Mock, patch + +from cfn_handler import CustomResource +from cfn_handler._internal.poller import EVENT_MARKER_PERMISSION, EVENT_MARKER_POLL, EVENT_MARKER_RULE +from cfn_handler.resource import LambdaContext + + +def test_test_mode_emits_deprecation_warning() -> None: + """Construction with ``test_mode=True`` emits ``DeprecationWarning``.""" + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + CustomResource(test_mode=True) + deprecations = [w for w in caught if issubclass(w.category, DeprecationWarning)] + assert len(deprecations) == 1, f"expected 1 DeprecationWarning, got {len(deprecations)}: {caught}" + msg = str(deprecations[0].message) + assert "replay" in msg.lower(), f"deprecation message should mention replay(): {msg!r}" + assert "cfn_handler.testing" in msg, f"message should reference the new module: {msg!r}" + + +def test_test_mode_false_does_not_warn() -> None: + """Constructing without ``test_mode=True`` is silent.""" + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + CustomResource() # default test_mode=False + CustomResource(test_mode=False) # explicit False + deprecations = [w for w in caught if issubclass(w.category, DeprecationWarning)] + assert deprecations == [] + + +def test_test_mode_still_works_for_backwards_compat() -> None: + """The legacy behaviour continues to function despite the deprecation.""" + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + resource = CustomResource(test_mode=True) + assert resource._test_mode is True + assert resource.last_response is None + + +def test_test_mode_safe_teardown_skipped( + events: dict[str, dict[str, Any]], + mock_context: Mock, +) -> None: + """In legacy ``test_mode``, no teardown is attempted (we never set up either). + + This behaviour is preserved verbatim in v1.3 for backwards compatibility. + Removed in v2.0 along with ``test_mode`` itself. + """ + poll_event = events["Create"] + poll_event[EVENT_MARKER_POLL] = True + poll_event[EVENT_MARKER_RULE] = "arn:aws:events:us-east-1:123:rule/my-rule" + poll_event[EVENT_MARKER_PERMISSION] = "Sid" + + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + resource = CustomResource(test_mode=True) + + @resource.poll_create + def on_poll(_e: dict[str, Any], _c: LambdaContext) -> dict[str, Any]: + return {"done": True} + + with patch("cfn_handler.resource.teardown_polling") as teardown: + resource(poll_event, mock_context) + + teardown.assert_not_called() + + +def test_test_mode_polling_records_sentinel( + events: dict[str, dict[str, Any]], + mock_context: Mock, +) -> None: + """In legacy ``test_mode``, polling deferral writes a ``__cfn_handler_polling__`` sentinel to ``last_response``. + + Preserved in v1.3 for backwards compatibility. Replaced in v2.0 by + ``replay()`` which returns ``Replay(status="DEFERRED")``. + """ + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + resource = CustomResource(test_mode=True) + + @resource.create + def on_create(_e: dict[str, Any], _c: LambdaContext) -> dict[str, Any]: + return {"foo": "bar"} + + @resource.poll_create + def on_poll(_e: dict[str, Any], _c: LambdaContext) -> None: + return None + + resource(events["Create"], mock_context) + assert resource.last_response is not None + assert resource.last_response.get("__cfn_handler_polling__") is True + assert resource.last_response.get("Data") == {"foo": "bar"} diff --git a/tests/unit/test_transport_seam.py b/tests/unit/test_transport_seam.py new file mode 100644 index 0000000..bd12199 --- /dev/null +++ b/tests/unit/test_transport_seam.py @@ -0,0 +1,74 @@ +"""Tests for the internal Transport seam. + +The seam allows the response transport (urllib PUT to CFN) to be +swapped for an in-memory capture during testing. This is the foundation +for ``replay()`` and is verified independently here. +""" + +from __future__ import annotations + +from typing import Any +from unittest.mock import Mock + +from cfn_handler import CustomResource + + +def test_transport_callable_intercepts_response( + events: dict[str, dict[str, Any]], + mock_context: Mock, +) -> None: + """A custom transport callable replaces the urllib PUT transport. + + When ``CustomResource(transport=...)`` is constructed with a callable, + that callable receives ``(url, payload)`` instead of urllib being + invoked. This is the foundational seam for the testing helpers. + """ + captured: list[tuple[str, dict[str, Any]]] = [] + + def fake_transport(url: str, payload: dict[str, Any]) -> None: + captured.append((url, payload)) + + resource = CustomResource(transport=fake_transport) + + @resource.create + def on_create(_event: dict[str, Any], _ctx: Any) -> dict[str, Any]: + return {"Endpoint": "https://intercepted.example"} + + resource(events["Create"], mock_context) + + assert len(captured) == 1 + url, payload = captured[0] + assert url == events["Create"]["ResponseURL"] + assert payload["Status"] == "SUCCESS" + assert payload["Data"] == {"Endpoint": "https://intercepted.example"} + + +def test_transport_default_remains_http( + events: dict[str, dict[str, Any]], + mock_context: Mock, + monkeypatch: Any, +) -> None: + """Without an explicit transport, the urllib PUT path is used. + + Verifies the seam is purely additive: existing behaviour is preserved + when no transport is supplied. + """ + sent: list[tuple[str, dict[str, Any]]] = [] + + def fake_send_response(url: str, payload: dict[str, Any]) -> None: + sent.append((url, payload)) + + monkeypatch.setattr( + "cfn_handler.resource.send_response", + fake_send_response, + ) + + resource = CustomResource() # no transport= kwarg + + @resource.create + def on_create(_event: dict[str, Any], _ctx: Any) -> dict[str, Any]: + return {"x": "y"} + + resource(events["Create"], mock_context) + + assert len(sent) == 1 diff --git a/tests/unit/testing/__init__.py b/tests/unit/testing/__init__.py new file mode 100644 index 0000000..045c50b --- /dev/null +++ b/tests/unit/testing/__init__.py @@ -0,0 +1 @@ +"""Unit tests for cfn_handler.testing — package marker.""" diff --git a/tests/unit/testing/test_assertions.py b/tests/unit/testing/test_assertions.py new file mode 100644 index 0000000..0282f57 --- /dev/null +++ b/tests/unit/testing/test_assertions.py @@ -0,0 +1,167 @@ +"""Tests for ``assert_success``, ``assert_failed``, ``assert_deferred``.""" + +from __future__ import annotations + +import pytest + +from cfn_handler.testing import ( + Replay, + assert_deferred, + assert_failed, + assert_success, +) + + +def _success( + *, + data: dict[str, object] | None = None, + physical_resource_id: str = "test-pid", + no_echo: bool = False, +) -> Replay: + payload_data = data if data is not None else {} + return Replay( + status="SUCCESS", + physical_resource_id=physical_resource_id, + data=payload_data, + reason="", + no_echo=no_echo, + payload={ + "Status": "SUCCESS", + "PhysicalResourceId": physical_resource_id, + "Data": payload_data, + "Reason": "", + }, + request_type="Create", + ) + + +def _failed(*, reason: str = "boom", physical_resource_id: str = "test-pid") -> Replay: + return Replay( + status="FAILED", + physical_resource_id=physical_resource_id, + data={}, + reason=reason, + no_echo=False, + payload={ + "Status": "FAILED", + "PhysicalResourceId": physical_resource_id, + "Data": {}, + "Reason": reason, + }, + request_type="Create", + ) + + +def _deferred() -> Replay: + return Replay( + status="DEFERRED", + physical_resource_id=None, + data={}, + reason="", + no_echo=False, + payload={}, + request_type="Create", + ) + + +# ---- assert_success --------------------------------------------------- + + +def test_assert_success_passes_on_success_replay() -> None: + assert_success(_success()) # should not raise + + +def test_assert_success_with_data_match_passes() -> None: + assert_success(_success(data={"x": 1}), data={"x": 1}) + + +def test_assert_success_with_data_mismatch_raises() -> None: + with pytest.raises(AssertionError) as excinfo: + assert_success(_success(data={"x": 1}), data={"x": 2}) + assert "data" in str(excinfo.value).lower() + + +def test_assert_success_with_physical_resource_id_match_passes() -> None: + assert_success(_success(physical_resource_id="abc"), physical_resource_id="abc") + + +def test_assert_success_with_physical_resource_id_mismatch_raises() -> None: + with pytest.raises(AssertionError): + assert_success(_success(physical_resource_id="abc"), physical_resource_id="xyz") + + +def test_assert_success_with_no_echo_match_passes() -> None: + assert_success(_success(no_echo=True), no_echo=True) + + +def test_assert_success_with_no_echo_mismatch_raises() -> None: + with pytest.raises(AssertionError) as excinfo: + assert_success(_success(no_echo=False), no_echo=True) + assert "no_echo" in str(excinfo.value) + + +def test_assert_success_on_failed_replay_raises_with_reason_in_message() -> None: + with pytest.raises(AssertionError) as excinfo: + assert_success(_failed(reason="something broke")) + msg = str(excinfo.value) + assert "FAILED" in msg + assert "something broke" in msg + + +def test_assert_success_on_deferred_replay_raises() -> None: + with pytest.raises(AssertionError): + assert_success(_deferred()) + + +# ---- assert_failed ---------------------------------------------------- + + +def test_assert_failed_passes_on_failed_replay() -> None: + assert_failed(_failed()) # should not raise + + +def test_assert_failed_with_reason_substring_passes() -> None: + assert_failed(_failed(reason="something boom happened"), reason_contains="boom") + + +def test_assert_failed_with_reason_substring_mismatch_raises() -> None: + with pytest.raises(AssertionError) as excinfo: + assert_failed(_failed(reason="something boom happened"), reason_contains="kaboom") + assert "kaboom" in str(excinfo.value) + + +def test_assert_failed_on_success_replay_raises() -> None: + with pytest.raises(AssertionError): + assert_failed(_success()) + + +def test_assert_failed_on_deferred_replay_raises() -> None: + with pytest.raises(AssertionError): + assert_failed(_deferred()) + + +def test_assert_failed_with_physical_resource_id_match() -> None: + assert_failed(_failed(physical_resource_id="abc"), physical_resource_id="abc") + + +def test_assert_failed_with_physical_resource_id_mismatch_raises() -> None: + with pytest.raises(AssertionError) as excinfo: + assert_failed(_failed(physical_resource_id="abc"), physical_resource_id="xyz") + assert "physical_resource_id" in str(excinfo.value) + + +# ---- assert_deferred -------------------------------------------------- + + +def test_assert_deferred_passes_on_deferred_replay() -> None: + assert_deferred(_deferred()) + + +def test_assert_deferred_on_success_raises() -> None: + with pytest.raises(AssertionError): + assert_deferred(_success()) + + +def test_assert_deferred_on_failed_raises() -> None: + with pytest.raises(AssertionError): + assert_deferred(_failed()) diff --git a/tests/unit/testing/test_factories.py b/tests/unit/testing/test_factories.py new file mode 100644 index 0000000..6b6ea60 --- /dev/null +++ b/tests/unit/testing/test_factories.py @@ -0,0 +1,139 @@ +"""Tests for ``cfn_handler.testing.make_event`` and ``make_context``.""" + +from __future__ import annotations + +import pytest + +from cfn_handler.resource import LambdaContext +from cfn_handler.testing import make_context, make_event + +# ---- make_event -------------------------------------------------------- + + +def test_default_create_event_is_well_formed() -> None: + event = make_event() + assert event["RequestType"] == "Create" + for key in ( + "ServiceToken", + "ResponseURL", + "StackId", + "RequestId", + "LogicalResourceId", + "ResourceType", + "ResourceProperties", + ): + assert key in event, f"missing {key}" + assert "PhysicalResourceId" not in event # not present on Create + assert "OldResourceProperties" not in event # only on Update + + +def test_update_event_requires_physical_resource_id() -> None: + with pytest.raises(ValueError, match="physical_resource_id"): + make_event(request_type="Update") + + +def test_delete_event_requires_physical_resource_id() -> None: + with pytest.raises(ValueError, match="physical_resource_id"): + make_event(request_type="Delete") + + +def test_update_event_with_physical_resource_id_is_well_formed() -> None: + event = make_event(request_type="Update", physical_resource_id="abc-123") + assert event["RequestType"] == "Update" + assert event["PhysicalResourceId"] == "abc-123" + assert event["OldResourceProperties"] == {} # default empty dict on Update + + +def test_delete_event_with_physical_resource_id_is_well_formed() -> None: + event = make_event(request_type="Delete", physical_resource_id="abc-123") + assert event["RequestType"] == "Delete" + assert event["PhysicalResourceId"] == "abc-123" + assert "OldResourceProperties" not in event + + +def test_resource_properties_override_is_applied() -> None: + event = make_event(resource_properties={"Foo": "bar"}) + assert event["ResourceProperties"] == {"Foo": "bar"} + + +def test_old_resource_properties_override_is_applied_for_update() -> None: + event = make_event( + request_type="Update", + physical_resource_id="x", + old_resource_properties={"Old": True}, + ) + assert event["OldResourceProperties"] == {"Old": True} + + +def test_defaults_use_safe_placeholders() -> None: + event = make_event() + # RFC 6761 reserved name: example.invalid is guaranteed not to resolve. + assert "example.invalid" in event["ResponseURL"] + # AWS-reserved example account IDs. + assert "111111111111" in event["StackId"] + assert "111111111111" in event["ServiceToken"] + + +def test_field_overrides_propagate_individually() -> None: + event = make_event( + stack_id="arn:aws:cloudformation:us-west-2:222222222222:stack/x/y", + request_id="custom-uuid", + logical_resource_id="MyLogical", + resource_type="Custom::Banana", + response_url="https://other.invalid/cfn", + service_token="arn:aws:lambda:us-west-2:222222222222:function:x", + ) + assert event["StackId"] == "arn:aws:cloudformation:us-west-2:222222222222:stack/x/y" + assert event["RequestId"] == "custom-uuid" + assert event["LogicalResourceId"] == "MyLogical" + assert event["ResourceType"] == "Custom::Banana" + assert event["ResponseURL"] == "https://other.invalid/cfn" + assert event["ServiceToken"] == "arn:aws:lambda:us-west-2:222222222222:function:x" + + +# ---- make_context ------------------------------------------------------ + + +def test_make_context_satisfies_lambda_context_protocol() -> None: + """The factory's return value must structurally satisfy LambdaContext.""" + + def consume(ctx: LambdaContext) -> int: + return ctx.get_remaining_time_in_millis() + + ctx = make_context() + assert consume(ctx) > 0 + assert isinstance(ctx.aws_request_id, str) + assert isinstance(ctx.function_name, str) + + +def test_make_context_remaining_time_override() -> None: + ctx = make_context(remaining_time_ms=5_000) + assert ctx.get_remaining_time_in_millis() == 5_000 + + +def test_make_context_default_remaining_time_is_positive() -> None: + ctx = make_context() + assert ctx.get_remaining_time_in_millis() > 0 + + +def test_make_context_field_overrides() -> None: + ctx = make_context( + aws_request_id="custom-id", + function_name="my-func", + invoked_function_arn="arn:aws:lambda:us-east-1:123:function:my-func", + log_group_name="/aws/lambda/my-func", + log_stream_name="2026/05/22/[$LATEST]xyz", + ) + assert ctx.aws_request_id == "custom-id" + assert ctx.function_name == "my-func" + assert ctx.invoked_function_arn == "arn:aws:lambda:us-east-1:123:function:my-func" + assert ctx.log_group_name == "/aws/lambda/my-func" + assert ctx.log_stream_name == "2026/05/22/[$LATEST]xyz" + + +def test_make_context_each_call_returns_independent_instance() -> None: + """Mutations to one context don't bleed into others.""" + ctx_a = make_context() + ctx_b = make_context() + ctx_a.aws_request_id = "mutated" + assert ctx_b.aws_request_id != "mutated" diff --git a/tests/unit/testing/test_replay.py b/tests/unit/testing/test_replay.py new file mode 100644 index 0000000..c576b12 --- /dev/null +++ b/tests/unit/testing/test_replay.py @@ -0,0 +1,187 @@ +"""Tests for ``cfn_handler.testing.Replay`` and ``CustomResource.replay``. + +These tests verify the public testing surface end-to-end: +- ``replay()`` returns a structured ``Replay`` value +- No HTTP I/O and no boto3 import occurs during replay +- The dataclass is immutable +""" + +from __future__ import annotations + +import dataclasses +import subprocess +import sys +from typing import Any +from unittest.mock import Mock + +import pytest + +from cfn_handler import CustomResource +from cfn_handler.testing import Replay + + +def test_replay_returns_success_for_returning_handler( + events: dict[str, dict[str, Any]], + mock_context: Mock, +) -> None: + resource = CustomResource() + + @resource.create + def on_create(_event: dict[str, Any], _ctx: Any) -> dict[str, Any]: + return {"Endpoint": "https://x"} + + replay = resource.replay(events["Create"], mock_context) + + assert isinstance(replay, Replay) + assert replay.status == "SUCCESS" + assert replay.data == {"Endpoint": "https://x"} + assert replay.request_type == "Create" + assert replay.payload["Status"] == "SUCCESS" + assert replay.payload["Data"] == {"Endpoint": "https://x"} + + +def test_replay_returns_failed_for_raising_handler( + events: dict[str, dict[str, Any]], + mock_context: Mock, +) -> None: + resource = CustomResource() + + @resource.create + def on_create(_event: dict[str, Any], _ctx: Any) -> None: + msg = "boom" + raise RuntimeError(msg) + + replay = resource.replay(events["Create"], mock_context) + + assert replay.status == "FAILED" + assert "boom" in replay.reason + assert replay.payload["Status"] == "FAILED" + + +def test_replay_does_not_perform_http_io( + events: dict[str, dict[str, Any]], + mock_context: Mock, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """``replay()`` must NEVER call urllib's PUT path.""" + + def boom(*_args: Any, **_kwargs: Any) -> None: + msg = "urllib must not be invoked during replay" + raise AssertionError(msg) + + monkeypatch.setattr("cfn_handler._internal.response.urllib.request.urlopen", boom) + + resource = CustomResource() + + @resource.create + def on_create(_event: dict[str, Any], _ctx: Any) -> dict[str, Any]: + return {"x": "y"} + + # Should NOT raise from boom. + resource.replay(events["Create"], mock_context) + + +def test_replay_does_not_import_boto3_for_non_polling_path(tmp_path: Any) -> None: + """A non-polling replay must not import boto3 (run in subprocess for clean sys.modules).""" + script = tmp_path / "no_boto3.py" + script.write_text( + "import sys\n" + "from cfn_handler import CustomResource\n" + "from cfn_handler.testing import make_event, make_context\n" + "\n" + "resource = CustomResource()\n" + "\n" + "@resource.create\n" + "def on_create(event, ctx):\n" + " return {'ok': True}\n" + "\n" + "resource.replay(make_event(), make_context())\n" + "assert 'boto3' not in sys.modules, 'boto3 was imported during replay'\n" + "print('OK')\n", + ) + result = subprocess.run( + [sys.executable, str(script)], + check=True, + capture_output=True, + text=True, + ) + assert "OK" in result.stdout + + +def test_replay_dataclass_is_frozen( + events: dict[str, dict[str, Any]], + mock_context: Mock, +) -> None: + resource = CustomResource() + + @resource.create + def on_create(_event: dict[str, Any], _ctx: Any) -> dict[str, Any]: + return {"x": 1} + + replay = resource.replay(events["Create"], mock_context) + + with pytest.raises(dataclasses.FrozenInstanceError): + replay.status = "FAILED" # type: ignore[misc] + + +def test_replay_payload_is_complete_response_shape( + events: dict[str, dict[str, Any]], + mock_context: Mock, +) -> None: + """``replay.payload`` is exactly what would have been PUT to CFN.""" + resource = CustomResource() + + @resource.create + def on_create(_event: dict[str, Any], _ctx: Any) -> dict[str, Any]: + return {"Endpoint": "x"} + + replay = resource.replay(events["Create"], mock_context) + + # CFN response schema requires these keys. + for key in ("Status", "PhysicalResourceId", "StackId", "RequestId", "LogicalResourceId", "Reason", "Data"): + assert key in replay.payload, f"missing {key} in payload" + assert replay.payload["Status"] == "SUCCESS" + assert replay.payload["Data"] == {"Endpoint": "x"} + + +def test_replay_does_not_persist_state_between_calls( + events: dict[str, dict[str, Any]], + mock_context: Mock, +) -> None: + """Two replays on the same instance produce independent results.""" + resource = CustomResource() + + @resource.create + def on_create(_event: dict[str, Any], _ctx: Any) -> dict[str, Any]: + return {"i": 1} + + @resource.update + def on_update(_event: dict[str, Any], _ctx: Any) -> None: + msg = "update boom" + raise RuntimeError(msg) + + success = resource.replay(events["Create"], mock_context) + failure = resource.replay(events["Update"], mock_context) + + assert success.status == "SUCCESS" + assert failure.status == "FAILED" + assert "update boom" in failure.reason + + +def test_replay_without_context_uses_default( + events: dict[str, dict[str, Any]], +) -> None: + """When ``context`` is omitted, replay() supplies a default LambdaContext. + + The default context comes from cfn_handler.testing.make_context() + and provides values that satisfy the LambdaContext Protocol. + """ + resource = CustomResource() + + @resource.create + def on_create(_event: dict[str, Any], _ctx: Any) -> dict[str, Any]: + return {"ok": True} + + replay = resource.replay(events["Create"]) + assert replay.status == "SUCCESS" + assert replay.data == {"ok": True} diff --git a/tests/unit/testing/test_replay_polling.py b/tests/unit/testing/test_replay_polling.py new file mode 100644 index 0000000..6abc00d --- /dev/null +++ b/tests/unit/testing/test_replay_polling.py @@ -0,0 +1,123 @@ +"""Tests for ``replay()`` polling deferral and re-invocation. + +Verifies the two-step polling flow: +1. First replay: lifecycle handler runs, polling is "provisioned" (stubbed), + no response is emitted, ``Replay(status="DEFERRED")`` returned. +2. Second replay (with the mutated event): the poll handler runs, response + is emitted, ``Replay(status="SUCCESS"/"FAILED")`` returned. +""" + +from __future__ import annotations + +from typing import Any +from unittest.mock import Mock + +from cfn_handler import CustomResource +from cfn_handler._internal.poller import ( + EVENT_MARKER_PERMISSION, + EVENT_MARKER_POLL, + EVENT_MARKER_RULE, +) + + +def test_replay_with_poll_handler_returns_deferred( + events: dict[str, dict[str, Any]], + mock_context: Mock, +) -> None: + """First replay defers, no response emitted, event mutated with markers.""" + resource = CustomResource() + + @resource.create + def on_create(_event: dict[str, Any], _ctx: Any) -> dict[str, Any]: + return {"InitialData": "x"} + + @resource.poll_create + def on_poll(_event: dict[str, Any], _ctx: Any) -> None: + return None # would continue polling + + event = events["Create"] + replay = resource.replay(event, mock_context) + + assert replay.status == "DEFERRED" + assert replay.payload == {} + assert replay.physical_resource_id is None + # Event has been mutated so a follow-up replay routes to the poll handler. + assert event[EVENT_MARKER_POLL] is True + assert event[EVENT_MARKER_RULE].startswith("arn:aws:events:") + assert isinstance(event[EVENT_MARKER_PERMISSION], str) + + +def test_replay_resumes_into_poll_handler_with_mutated_event( + events: dict[str, dict[str, Any]], + mock_context: Mock, +) -> None: + """Second replay (with markers from the first) routes to the poll handler.""" + resource = CustomResource() + + poll_calls: list[bool] = [] + + @resource.create + def on_create(_event: dict[str, Any], _ctx: Any) -> None: + return None + + @resource.poll_create + def on_poll(_event: dict[str, Any], _ctx: Any) -> dict[str, Any]: + poll_calls.append(True) + return {"Endpoint": "https://done.example"} + + event = events["Create"] + + # Step 1: defer. + deferred = resource.replay(event, mock_context) + assert deferred.status == "DEFERRED" + assert poll_calls == [] # poll handler NOT called yet + + # Step 2: resume — same event, now with marker keys. + final = resource.replay(event, mock_context) + assert final.status == "SUCCESS" + assert final.data == {"Endpoint": "https://done.example"} + assert poll_calls == [True] + + +def test_replay_resumes_with_poll_handler_failure( + events: dict[str, dict[str, Any]], + mock_context: Mock, +) -> None: + """Poll handler raising on the resume step produces FAILED.""" + resource = CustomResource() + + @resource.create + def on_create(_event: dict[str, Any], _ctx: Any) -> None: + return None + + @resource.poll_create + def on_poll(_event: dict[str, Any], _ctx: Any) -> None: + msg = "polling step failed" + raise RuntimeError(msg) + + event = events["Create"] + resource.replay(event, mock_context) + final = resource.replay(event, mock_context) + + assert final.status == "FAILED" + assert "polling step failed" in final.reason + + +def test_replay_does_not_mutate_event_when_no_poll_handler( + events: dict[str, dict[str, Any]], + mock_context: Mock, +) -> None: + """A non-polled handler doesn't touch the event.""" + resource = CustomResource() + + @resource.create + def on_create(_event: dict[str, Any], _ctx: Any) -> dict[str, Any]: + return {"x": 1} + + event = events["Create"] + replay = resource.replay(event, mock_context) + + assert replay.status == "SUCCESS" + assert EVENT_MARKER_POLL not in event + assert EVENT_MARKER_RULE not in event + assert EVENT_MARKER_PERMISSION not in event