Conversation
Replace psycopg2-binary with psycopg[binary]>=3.3.3. psycopg3 uses autobegin, meaning any SELECT starts an implicit transaction that stays open until an explicit commit/rollback. This PR fixes the resulting idle-in-transaction connections by managing transaction boundaries explicitly throughout the workflow execution path. Key changes: database.py: - Add pool_recycle=3600 and a checkin event listener that rolls back any leftover transaction when a connection returns to the pool. - Make transactional() reentrant: nested calls yield without commit/rollback; the outermost call owns the transaction. - Disable expire_on_commit during commit so ORM objects retain their in-memory values and avoid lazy-load queries that would start new unmanaged transactions. services/processes.py: - JSON-roundtrip current_step.state in _db_log_step before flush. This serializes live SubscriptionModel instances exactly once, preventing double model_dump on the outer flush and ensuring the next step receives a clean plain-dict state. workflow.py / threadpool.py / tasks.py: - Wrap engine_status check, post-step logging, retrieve_input_state, and Celery task entry points in transactional() so every DB query on the Celery worker's empty-scope session runs inside a managed transaction. app.py: - Emit DeprecationWarning when DATABASE_URI uses the bare 'postgresql://' scheme (defaults to psycopg2 driver). - Add OrchestratorCore.register_table() to copy extra column properties from a custom table subclass onto the base mapper. Update all DATABASE_URI references in CI, docs, README, settings, and test fixtures to use 'postgresql+psycopg://'.
Signed-off-by: Erik Stoel <estoel81@hotmail.com>
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1549 +/- ##
==========================================
+ Coverage 89.84% 90.19% +0.34%
==========================================
Files 270 270
Lines 13707 13743 +36
Branches 1342 1346 +4
==========================================
+ Hits 12315 12395 +80
+ Misses 1116 1071 -45
- Partials 276 277 +1 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
- Fix Mapper[Any] assignment in register_table() via cast() - Fix tasks.py: kombu attr-defined, return type, remove stale type: ignore comments - Add test_pool_events.py: pool checkin rollback registration and handler - Add test_database_uri_validation.py: DeprecationWarning for postgresql:// URI - Add test_dbutils.py: handle_missing_tables() with psycopg3 error types
…ion and task execution
Tests cover the psycopg3-critical transactional() wrapping in start_process and resume_process, plus get_celery_task, register_custom_serializer, and initialise_celery task route/registration behaviour.
pboers1988
left a comment
There was a problem hiding this comment.
@estoel81 Also need some 5.0.0 upgrade docs.
The pool checkin rollback handler is no longer needed with the removal of pool_recycle, and the DATABASE_URI validation tests are obsolete. This cleanup eliminates the _register_pool_events() function and its associated test coverage.
# Conflicts: # pyproject.toml # uv.lock
|
|
||
| class MySubscriptionTable(SubscriptionTable): | ||
| customer_name: Mapped[str] = column_property( | ||
| select(CustomerTable.fullname) |
There was a problem hiding this comment.
maybe add a table example above this, the only reference of CustomerTable is somewhere in the docs about adding it, could also link to that place
| # Serialize state to a plain dict now, before the outer transactional() flushes it. | ||
| # This ensures the next step never receives live SubscriptionModel instances and | ||
| # avoids re-evaluating expensive @computed_field properties on flush. | ||
| current_step.state = json_loads(json_dumps(current_step.state)) |
There was a problem hiding this comment.
maybe override the pydanticmodel.model_dump, so its in done in the model
| ensure_correct_process_status(process_id, ProcessStatus.CREATED) | ||
| # Celery workers use the empty-scope session (no database_scope); wrap in | ||
| # transactional() to prevent psycopg3 autobegin leaving the connection idle-in-transaction. | ||
| with transactional(db, local_logger): |
There was a problem hiding this comment.
could move the transactional inside the _get_process, load_process and ensure_correct_process_status. so any re-uses in other places don't need to be wrapped inside a transactional.
|
|
||
| self.include_router(api_router, prefix="/api") | ||
|
|
||
| # Validate DATABASE_URI dialect before initializing the database. |
There was a problem hiding this comment.
I don't believe this comment is necessary
| "Not executing Step as the workflow engine is Paused. Process will remain in state 'running'" | ||
| ) | ||
| return process | ||
| with transactional(db, logger): |
There was a problem hiding this comment.
move the transactional inside the get_engine_settings_table?
| # Wrap logging in transactional() so SELECTs triggered by mutationlogger and | ||
| # dblogstep run inside a managed transaction (psycopg3 autobegin prevention). | ||
| with transactional(db, logger): | ||
| result_to_log.on_success(mutationlogger).on_failed(errorlogger).on_waiting(errorlogger) |
There was a problem hiding this comment.
I don't think mutationlogger triggers any selects and move the transactional inside the dblogstep
| log = MagicMock() | ||
|
|
||
| with transactional(db, log): | ||
| pass |
There was a problem hiding this comment.
wouldn't you need a db insert or db error before you can check that a commit or rollback isn't called.
|
|
||
| def test_handle_missing_tables_suppresses_undefined_table_error() -> None: | ||
| """ProgrammingError wrapping UndefinedTable is caught and not re-raised.""" | ||
| orig = psycopg_errors.UndefinedTable.__new__(psycopg_errors.UndefinedTable) |
There was a problem hiding this comment.
this looks weird, cant you directly make the class?
| orig = psycopg_errors.UndefinedTable.__new__(psycopg_errors.UndefinedTable) | |
| orig = psycopg_errors.UndefinedTable() |
| original_description = base_mapper.column_attrs["description"] | ||
|
|
||
| class CustomSubscriptionTable(SubscriptionTable): | ||
| pass |
There was a problem hiding this comment.
you aren't overwriting the description here?
|
|
||
|
|
||
| @pytest.mark.usefixtures("_cleanup_extra_field") | ||
| def test_register_table_columns_visible_in_inspect(): |
There was a problem hiding this comment.
this seems redundant since test_register_table_copies_column_properties already checks if the extra_fields is inside mapper.column_attrs
| # Build a minimal in-memory workflow function. The workflow is never executed | ||
| # because we mock _run_process_async; we only need a non-removed_workflow value | ||
| # for the early branch in thread_start_process and for ProcessStat construction. | ||
| wf = make_workflow(_workflow_test_fn, "wf description", None, Target.SYSTEM, StepList()) |
There was a problem hiding this comment.
can't you use the simple_workflow?
|
|
||
| # Mock _run_process_async to skip actually running the workflow thread; we are | ||
| # only testing the pre-runwf section of thread_start_process. | ||
| with mock.patch("orchestrator.services.executors.threadpool._run_process_async"): |
There was a problem hiding this comment.
could add the patch as decorator on the test function
|
|
||
|
|
||
| def test_register_custom_serializer_registers_orchestrator_json(): | ||
| with patch("orchestrator.services.tasks.registry") as mock_registry: |
There was a problem hiding this comment.
use patch as decorator, also update this in the other tests
| } | ||
|
|
||
|
|
||
| def test_initialise_celery_registers_four_named_tasks(): |
There was a problem hiding this comment.
redundant, you already check it in the previous test
| def test_start_process_returns_none_on_exception(celery_start_fn, failing_fn): | ||
| process_id = uuid4() | ||
|
|
||
| patches = { |
There was a problem hiding this comment.
probably better to use variables instead of a dict or change the keys to just the function name.
|
|
||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # CeleryJobWorkerStatus (existing fixture kept below) |
There was a problem hiding this comment.
the (existing fixture kept below) is useless in the comment, could have more context about the tests below instead.
Signed-off-by: Peter Boers <peter.boers@surf.nl>
uv Lockfile ReportAdded
Removed
|
Summary
psycopg2-binarywithpsycopg[binary]>=3.3.2and update allpostgresql://URIs topostgresql+psycopg://across CI, docs, settings, and test fixturescheckinrollback,pool_recycle, and wrap all bare DB reads on Celery workers intransactional()transactional()reentrant so nested calls (e.g. a step wrapping a Celery task entry point) don't roll back the outer transaction_db_log_stepbefore the outer flush, preventing re-evaluation of expensive@computed_fieldproperties and ensuring the next step never receives liveSubscriptionModelinstancesOrchestratorCore.register_table()to copy extracolumn_propertyattributes from a custom table subclass onto the base mapperDeprecationWarningwhenDATABASE_URIuses the barepostgresql://schemeTest Plan
uv run pytest test/unit_tests/db/test_database.py— transactional reentrancyuv run pytest test/unit_tests/services/test_processes.py— state serialization + idle-in-transactionuv run pytest test/unit_tests/test_subscription_table_registry.py— register_tableuv run pytest test/unit_tests/