Skip to content
4 changes: 4 additions & 0 deletions changelog/8148-engine-creator-pattern.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
type: Changed
description: Refactored database engines to use SQLAlchemy creator pattern for per-connection credential resolution
pr: 8148
labels: []
2 changes: 1 addition & 1 deletion design-docs/dynamic-database-credentials.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ This depends on a SQLAlchemy internal (`greenlet_spawn`), which is acceptable be
- The planned SQLAlchemy 2.0 upgrade will replace this with the public `async_creator` API.
- The code should include a clear TODO and comments explaining this constraint.

The module-level engines in `ctl_session.py` need to be refactored into lazy factories (similar to how `session_management.py` already works) so the `creator` can be injected at construction time.
The module-level engines in `ctl_session.py` remain as module-level singletons. The `creator` closure captures a provider reference, not credentials themselves — credentials are resolved inside the closure body on every call. This means the engine can be constructed at any time (including module import) and credential rotation still works correctly.

### 4. Automatic Retry on Auth Failure

Expand Down
32 changes: 8 additions & 24 deletions src/fides/api/db/ctl_session.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import ssl
from asyncio import Lock, gather
from contextlib import _AsyncGeneratorContextManager, asynccontextmanager
from typing import Any, AsyncGenerator, Callable, Dict
Expand All @@ -11,24 +10,17 @@

from fides.api.db.session import ExtendedSession
from fides.api.db.util import custom_json_deserializer, custom_json_serializer
from fides.common.engine_creators import make_async_creator, make_sync_creator
from fides.config import CONFIG

# asyncio lock and flag for warming up the async pool
ASYNC_READONLY_POOL_LOCK = Lock()
ASYNC_READONLY_POOL_WARMED = False

# Associated with a workaround in fides.core.config.database_settings
# ref: https://github.com/sqlalchemy/sqlalchemy/discussions/5975
connect_args: Dict[str, Any] = {}
if CONFIG.database.params.get("sslrootcert"):
ssl_ctx = ssl.create_default_context(cafile=CONFIG.database.params["sslrootcert"])
ssl_ctx.verify_mode = ssl.CERT_REQUIRED
connect_args["ssl"] = ssl_ctx

# Parameters are hidden for security
# Primary async engine — credentials resolved per-connection via creator
async_engine = create_async_engine(
CONFIG.database.async_database_uri,
connect_args=connect_args,
"postgresql+asyncpg://",
creator=make_async_creator(),
echo=False,
hide_parameters=not CONFIG.dev_mode,
logging_name="AsyncEngine",
Expand All @@ -49,21 +41,12 @@

if CONFIG.database.async_readonly_database_uri:
logger.info("Creating read-only async engine and session factory")
# Build connect_args for readonly (similar to primary)
readonly_connect_args: Dict[str, Any] = {}
readonly_params = CONFIG.database.readonly_params or {}

if readonly_params.get("sslrootcert"):
ssl_ctx = ssl.create_default_context(cafile=readonly_params["sslrootcert"])
ssl_ctx.verify_mode = ssl.CERT_REQUIRED
readonly_connect_args["ssl"] = ssl_ctx

logger.info(
f"Read-only async settings: max-overflow: {CONFIG.database.api_async_engine_max_overflow}, pool-size: {CONFIG.database.async_readonly_database_pool_size}, pre-warm = {CONFIG.database.async_readonly_database_prewarm}, autocommit = {CONFIG.database.async_readonly_database_autocommit}, skip rollback = {CONFIG.database.async_readonly_database_pool_skip_rollback}"
)
readonly_async_engine = create_async_engine(
CONFIG.database.async_readonly_database_uri,
connect_args=readonly_connect_args,
"postgresql+asyncpg://",
creator=make_async_creator(readonly=True),
echo=False,
hide_parameters=not CONFIG.dev_mode,
logging_name="ReadOnlyAsyncEngine",
Expand Down Expand Up @@ -92,7 +75,8 @@
# and they do not respect engine settings like pool_size, max_overflow, etc.
# these should be removed, and we should standardize on what's provided in `session.py`
sync_engine = create_engine(
CONFIG.database.sync_database_uri,
"postgresql+psycopg2://",
creator=make_sync_creator(),
echo=False,
hide_parameters=not CONFIG.dev_mode,
logging_name="SyncEngine",
Expand Down
70 changes: 47 additions & 23 deletions src/fides/api/db/session.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import Any, Dict
from typing import Any, Callable, Dict

from loguru import logger
from sqlalchemy import create_engine
Expand All @@ -18,6 +18,7 @@ def get_db_engine(
*,
config: FidesConfig | None = None,
database_uri: str | URL | None = None,
creator: Callable[[], Any] | None = None,
pool_size: int = 50,
max_overflow: int = 50,
keepalives_idle: int | None = None,
Expand All @@ -28,36 +29,59 @@ def get_db_engine(
) -> Engine:
"""Return a database engine.

When *creator* is provided, it is called by the pool to open each new
connection — credentials and connect_args are handled inside the creator.
A dialect-only URL is used for engine construction.

When *database_uri* or *config* is provided, the engine uses a fixed
connection URI (existing behavior).

If the TESTING environment var is set the database engine returned will be
connected to the test DB.
"""
if not config and not database_uri:
raise ValueError("Either a config or database_uri is required")

if not database_uri and config:
# Don't override any database_uri explicitly passed in
if config.test_mode:
database_uri = config.database.sqlalchemy_test_database_uri
else:
database_uri = config.database.sqlalchemy_database_uri

engine_args: Dict[str, Any] = {
"json_serializer": custom_json_serializer,
"json_deserializer": custom_json_deserializer,
}

# keepalives settings
connect_args = {}
if keepalives_idle:
connect_args["keepalives_idle"] = keepalives_idle
if keepalives_interval:
connect_args["keepalives_interval"] = keepalives_interval
if keepalives_count:
connect_args["keepalives_count"] = keepalives_count

if connect_args:
connect_args["keepalives"] = 1
engine_args["connect_args"] = connect_args
if creator:
# Creator handles credentials and connect_args internally,
# so creator needs to set keepalives settings.
if database_uri or config:
raise ValueError(
"database_uri/config cannot be used with creator — "
"the creator handles connection construction"
)
if keepalives_idle or keepalives_interval or keepalives_count:
raise ValueError(
"keepalives_idle/interval/count cannot be used with creator — "
"pass them as connect_args to the creator instead"
)
engine_args["creator"] = creator
database_uri = "postgresql+psycopg2://"
else:
# URI-based path.
if not config and not database_uri:
raise ValueError("Either a config, database_uri, or creator is required")

if not database_uri and config:
if config.test_mode:
database_uri = config.database.sqlalchemy_test_database_uri
else:
database_uri = config.database.sqlalchemy_database_uri

# keepalives settings (only for URI path; creator handles its own)
connect_args = {}
if keepalives_idle:
connect_args["keepalives_idle"] = keepalives_idle
if keepalives_interval:
connect_args["keepalives_interval"] = keepalives_interval
if keepalives_count:
connect_args["keepalives_count"] = keepalives_count

if connect_args:
connect_args["keepalives"] = 1
engine_args["connect_args"] = connect_args

if disable_pooling:
engine_args["poolclass"] = NullPool
Expand Down
13 changes: 9 additions & 4 deletions src/fides/api/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from fides.api.request_context import get_request_id, set_request_id
from fides.api.tasks import celery_healthcheck
from fides.api.util.logger import setup as setup_logging
from fides.common.engine_creators import make_sync_creator
from fides.config import CONFIG, FidesConfig

MESSAGING_QUEUE_NAME = "fidesops.messaging"
Expand Down Expand Up @@ -77,12 +78,16 @@ def get_new_session(self) -> ContextManager[Session]:
# once per celery process.
if self._task_engine is None:
self._task_engine = get_db_engine(
config=CONFIG,
creator=make_sync_creator(
connect_args={
"keepalives": 1,
"keepalives_idle": CONFIG.database.task_engine_keepalives_idle,
"keepalives_interval": CONFIG.database.task_engine_keepalives_interval,
"keepalives_count": CONFIG.database.task_engine_keepalives_count,
},
),
pool_size=CONFIG.database.task_engine_pool_size,
max_overflow=CONFIG.database.task_engine_max_overflow,
keepalives_idle=CONFIG.database.task_engine_keepalives_idle,
keepalives_interval=CONFIG.database.task_engine_keepalives_interval,
keepalives_count=CONFIG.database.task_engine_keepalives_count,
pool_pre_ping=CONFIG.database.task_engine_pool_pre_ping,
)

Expand Down
190 changes: 190 additions & 0 deletions src/fides/common/engine_creators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
"""
SQLAlchemy engine ``creator`` callables for dynamic credential resolution.

The ``creator`` pattern passes a callable to ``create_engine`` /
``create_async_engine`` instead of a connection URI. SQLAlchemy calls the
creator every time the pool needs a new connection, so credentials are
resolved at **connection time** rather than engine construction time.

Today the credential helpers read from static config (``CONFIG.database``).
A future secret-provider integration will swap them to call
``provider.get_secret()`` — the rest of the engine code stays the same.

Because creators run on every new pool connection, they must stay
lightweight — avoid expensive I/O, network calls, or heavy computation.
Credential lookups should return cached values in the common case.
"""

from __future__ import annotations

import ssl
from copy import deepcopy
from typing import Any, Callable, Dict, Optional

import asyncpg # type: ignore[import-untyped]
import psycopg2 # type: ignore[import-untyped]
from sqlalchemy.dialects.postgresql.asyncpg import (
AsyncAdapt_asyncpg_connection,
AsyncAdapt_asyncpg_dbapi,
)
from sqlalchemy.util.concurrency import await_only # type: ignore[import-untyped]

from fides.config import CONFIG

# Shared dbapi instance for async creators — reused across connections.
# Wraps asyncpg in SQLAlchemy's sync-style DBAPI adapter; creators build
# AsyncAdapt_asyncpg_connection objects on top of it.
_asyncpg_dbapi = AsyncAdapt_asyncpg_dbapi(asyncpg)


# ---------------------------------------------------------------------------
# Credential helpers
# ---------------------------------------------------------------------------


def get_db_credentials() -> Dict[str, Any]:
    """Resolve primary database credentials from static config.

    Uses the test database name when running in test mode. Creator
    closures call this on every new pool connection, so this function is
    the seam where a dynamic secret provider can later be plugged in.
    """
    settings = CONFIG.database
    database_name = settings.test_db if CONFIG.test_mode else settings.db
    return {
        "host": settings.server,
        "port": int(settings.port),
        "user": settings.user,
        "password": settings.raw_password,
        "dbname": database_name,
    }


def get_readonly_db_credentials() -> Optional[Dict[str, Any]]:
    """Resolve readonly database credentials from static config.

    Returns ``None`` when no readonly server is configured. Each
    readonly-specific field that is unset falls back to its primary
    counterpart.
    """
    settings = CONFIG.database
    if not settings.readonly_server:
        return None

    creds: Dict[str, Any] = {"host": settings.readonly_server}
    creds["port"] = int(settings.readonly_port or settings.port)
    creds["user"] = settings.readonly_user or settings.user
    creds["password"] = settings.raw_readonly_password or settings.raw_password
    creds["dbname"] = settings.readonly_db or settings.db
    return creds


# ---------------------------------------------------------------------------
# Sync creators (psycopg2)
# ---------------------------------------------------------------------------


def make_sync_creator(
    connect_args: Optional[Dict[str, Any]] = None,
    readonly: bool = False,
) -> Callable[[], Any]:
    """Build a creator callable for psycopg2 engines.

    Per-engine configuration (keepalives, SSL) is captured in the closure
    at factory time; credentials are re-resolved from CONFIG on every
    invocation, which is the seam for future dynamic credential rotation.

    Note that SQLAlchemy ignores ``connect_args`` passed to
    ``create_engine`` when a ``creator`` is supplied, so every connection
    parameter must be baked in here.
    """

    def creator() -> Any:
        creds = (
            (get_readonly_db_credentials() or get_db_credentials())
            if readonly
            else get_db_credentials()
        )
        if connect_args:
            creds.update(connect_args)
        return psycopg2.connect(**creds)

    return creator


# ---------------------------------------------------------------------------
# Async creators (asyncpg)
# ---------------------------------------------------------------------------


def make_async_creator(
    readonly: bool = False,
) -> Callable[[], Any]:
    """Build a creator callable for asyncpg engines (SQLAlchemy 1.4.27).

    The SSL context and asyncpg-compatible params are derived from CONFIG
    once, at factory time, and captured in the closure; credentials
    themselves are re-resolved from CONFIG on every connection.

    SQLAlchemy's pool invokes the creator in place of ``dialect.connect()``.
    Async engines drive the pool through a greenlet bridge, which is what
    makes ``await_only`` legal here; the creator must return an
    ``AsyncAdapt_asyncpg_connection`` (SQLAlchemy's sync wrapper) because
    the pool operates synchronously through that bridge.

    TODO: Replace with the public ``async_creator`` API once on SQLAlchemy 2.0.
    """
    if readonly:
        params = CONFIG.database.readonly_params or CONFIG.database.params
    else:
        params = CONFIG.database.params

    ssl_ctx = _build_ssl_context(params)
    extra = _convert_asyncpg_params(params)

    # A full SSLContext (built from sslrootcert) outranks the raw ssl string
    # derived from sslmode; drop the string so the kw.update below cannot
    # replace the context with e.g. "require" and lose cert verification.
    if ssl_ctx is not None:
        extra.pop("ssl", None)

    def creator() -> Any:
        creds = (
            get_readonly_db_credentials() if readonly else None
        ) or get_db_credentials()
        kw: Dict[str, Any] = {
            "host": creds["host"],
            "port": creds["port"],
            "user": creds["user"],
            "password": creds["password"],
            "database": creds["dbname"],
        }
        if ssl_ctx is not None:
            kw["ssl"] = ssl_ctx
        kw.update(extra)
        raw_conn = await_only(asyncpg.connect(**kw))
        return AsyncAdapt_asyncpg_connection(_asyncpg_dbapi, raw_conn)

    return creator


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------


def _build_ssl_context(params: Dict[str, Any]) -> Optional[ssl.SSLContext]:
"""Build an ``ssl.SSLContext`` from DB params if ``sslrootcert`` is set."""
sslrootcert = params.get("sslrootcert")
if not sslrootcert:
return None
ctx = ssl.create_default_context(cafile=sslrootcert)
ctx.verify_mode = ssl.CERT_REQUIRED
return ctx


def _convert_asyncpg_params(params: Dict[str, Any]) -> Dict[str, Any]:
"""Convert DB params dict for asyncpg compatibility.

asyncpg uses ``ssl`` instead of ``sslmode`` and does not accept
``sslrootcert`` as a connection parameter (it's handled via
``ssl.SSLContext`` passed separately).

See: https://github.com/MagicStack/asyncpg/issues/737
ref: https://github.com/sqlalchemy/sqlalchemy/discussions/5975
"""
converted = deepcopy(params)
if "sslmode" in converted:
converted["ssl"] = converted.pop("sslmode")
converted.pop("sslrootcert", None)
return converted
Loading
Loading