
Add sasl_oauth_token_provider_class for native OAUTHBEARER support #1226

Open
dylanbstorey wants to merge 5 commits into Aiven-Open:main from dylanbstorey:msk-iam-oauth-support

@dylanbstorey dylanbstorey commented Mar 7, 2026

Summary

  • Adds sasl_oauth_token_provider_class config option that accepts a Python import path (via pydantic ImportString) to a class implementing the TokenWithExpiryProvider protocol
  • When configured, an instance is created and passed as sasl_oauth_token_provider to all Kafka client factories — admin, consumer, and producer — including the schema reader's internal clients
  • Enables native OAUTHBEARER auth (e.g. AWS MSK IAM) without monkey-patching

Problem

kafka_utils.py and schema_reader.py create Kafka clients but never pass sasl_oauth_token_provider, even though _KafkaConfigMixin already supports it via KafkaClientParamsoauth_cb. Anyone deploying Karapace against AWS MSK with IAM auth must monkey-patch the client factories or build a custom wrapper.

Solution

New config option configurable via env var:

KARAPACE_SECURITY_PROTOCOL=SASL_SSL
KARAPACE_SASL_MECHANISM=OAUTHBEARER
KARAPACE_SASL_OAUTH_TOKEN_PROVIDER_CLASS=examples.msk_iam_token_provider:MSKIAMTokenProvider
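For readers unfamiliar with the `module:Class` form used in the last variable, the following is a rough standard-library approximation of what resolving such an import path involves. The PR relies on pydantic's ImportString for this; the helper name `resolve_import_path` is hypothetical and is not part of Karapace or pydantic.

```python
import importlib
from typing import Any


def resolve_import_path(path: str) -> Any:
    """Resolve a 'package.module:Attribute' string into the object it names.

    Hypothetical helper for illustration; pydantic's ImportString performs
    the equivalent lookup when the config field is validated.
    """
    module_name, sep, attr_name = path.partition(":")
    if not sep or not module_name or not attr_name:
        raise ValueError(f"expected 'module:attribute', got {path!r}")
    # Importing the module requires it to be on sys.path (pip install,
    # PYTHONPATH, or a mounted directory).
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


# Example with a standard-library class standing in for a provider class.
ordered_dict_cls = resolve_import_path("collections:OrderedDict")
```

The practical consequence is that the module named in the env var must be importable by the Karapace process, otherwise configuration loading fails.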

The referenced class must implement:

def token_with_expiry(self, config: str | None) -> tuple[str, int | None]

Changes

  • src/karapace/core/config.py: Add sasl_oauth_token_provider_class: ImportString | None field
  • src/karapace/core/kafka_utils.py: Instantiate provider and pass to all 3 factory functions
  • src/karapace/core/schema_reader.py: Same for the 2 internal factory functions
  • examples/msk_iam_token_provider.py: Example AWS MSK IAM token provider using aws-msk-iam-sasl-signer-python
  • tests/unit/test_oauth_token_provider.py: 14 unit tests covering config, all 5 factories (with/without provider)

Test plan

  • 14 unit tests pass locally (tests/unit/test_oauth_token_provider.py)
  • Existing tests unaffected (no behavior change when option is unset)

@dylanbstorey dylanbstorey requested a review from a team as a code owner March 7, 2026 03:20
@muralibasani
Contributor

@dylanbstorey can you pls rebase?

@dylanbstorey dylanbstorey force-pushed the msk-iam-oauth-support branch from d7d5268 to 3639f20 Compare March 9, 2026 14:11
@dylanbstorey
Author

done

@muralibasani
Contributor

@dylanbstorey there seem to be some lint errors

@dylanbstorey
Author

done

@dylanbstorey
Author

dylanbstorey commented Mar 10, 2026

Hey — just a note on the 3.14 test failure: we've seen two different tests fail across two CI runs (test_coordinator_workflow and test_schema_registry_oidc), both only on 3.14. Tests on 3.12 and 3.13 have been passing cleanly.

Our changes only add a new config field (sasl_oauth_token_provider_class) that defaults to None and is only activated when explicitly set, so they shouldn't affect any existing code paths. Given the different failures each run and that they're both e2e/timing-sensitive tests, we suspect these may be flaky on 3.14 rather than related to our changes.

They also pass locally using the Makefile's tests-in-docker pattern you provide.

Happy to re-trigger CI or investigate further if needed!

@dylanbstorey
Author

@muralibasani
OK, I got the OIDC test passing by adding a health check that waits for the registry to become available. Let’s see if that did it.

@dylanbstorey dylanbstorey force-pushed the msk-iam-oauth-support branch from f841f2f to bbb508c Compare March 12, 2026 11:29
@dylanbstorey
Author

dylanbstorey commented Mar 12, 2026

Rebased on main (includes merged #1225). All tests passing locally:

  • Unit tests: 631 passed
  • E2E tests: 61 passed (fixed the flaky OIDC test — was a primary election race condition)
  • Mypy: no issues found in 137 source files
  • test_registering_normalized_schema: passed locally. I can’t reproduce the failure after a few tries.

@dylanbstorey dylanbstorey force-pushed the msk-iam-oauth-support branch from c532ef3 to c699abc Compare March 13, 2026 01:58
@dylanbstorey
Author

@muralibasani, I added some more retries and polling helpers to reduce flakiness.

Dylan Bobby Storey added 4 commits March 30, 2026 11:52
Adds a new config option `sasl_oauth_token_provider_class` that accepts
a Python import path to a class implementing the TokenWithExpiryProvider
protocol (i.e. a `token_with_expiry` method). When configured, an
instance is created and passed as `sasl_oauth_token_provider` to all
Kafka client factories (admin, consumer, producer), including the
schema reader's internal clients.

This enables native OAUTHBEARER auth (e.g. AWS MSK IAM) without
monkey-patching, configurable via env var:
  KARAPACE_SASL_OAUTH_TOKEN_PROVIDER_CLASS=mymodule:MyProvider

Includes an example MSK IAM token provider implementation in
examples/msk_iam_token_provider.py.

Documents how to configure sasl_oauth_token_provider_class, including
how to load a provider class that lives outside the Karapace repo
(pip install, PYTHONPATH, Docker mount).

Add missing copyright header to examples/msk_iam_token_provider.py
and add return type annotations to _get_oauth_token_provider in both
kafka_utils.py and schema_reader.py.

The test_schema_registry_oidc test was intermittently failing because it
attempted write requests before the schema registry had elected a primary.
This caused the node to forward the request to itself in a loop until the
60s timeout, resulting in a 500 Internal Server Error with text/plain response.
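The kind of fix described for the race above (wait for readiness before issuing writes) can be sketched as a generic polling helper. This is not the helper from the PR; `wait_until` and its parameters are hypothetical, and the readiness check itself (for example, asking the registry whether a primary has been elected) is left to the caller.

```python
import time
from collections.abc import Callable


def wait_until(
    predicate: Callable[[], bool],
    timeout: float = 30.0,
    interval: float = 0.5,
) -> None:
    """Poll `predicate` until it returns True or `timeout` seconds elapse.

    Hypothetical test helper: the predicate is evaluated at least once,
    and TimeoutError is raised if it never becomes true in time.
    """
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(f"condition not met within {timeout} seconds")
        time.sleep(interval)
```

A test would call something like `wait_until(registry_is_ready)` before its first write request, where `registry_is_ready` is whatever readiness probe the test suite has available.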
@dylanbstorey dylanbstorey force-pushed the msk-iam-oauth-support branch from 885ec91 to bb52ed9 Compare March 30, 2026 15:53
@dylanbstorey
Author

@muralibasani - looks like the recent merges to master have stabilized the tests, and this is passing now. Ready for a review, for real this time.

Thanks!

def _get_oauth_token_provider(config: Config) -> object | None:
    """Instantiate the configured OAuth token provider, if any."""
    if config.sasl_oauth_token_provider_class is not None:
        return config.sasl_oauth_token_provider_class()
Contributor

Can we check for token_with_expiry?

Contributor

@muralibasani muralibasani left a comment

@dylanbstorey thanks for the PR. I've left a few comments.

- Deduplicate _get_oauth_token_provider: single get_oauth_token_provider()
  in kafka_utils.py, imported by schema_reader.py
- Cache provider instance as singleton to avoid re-instantiation per client
- Validate that the provider implements token_with_expiry() on first use
- Fix copyright years to 2026 in examples and tests
- Add tests for validation error and caching behavior
@dylanbstorey
Author

@muralibasani , updated
