Skip to content

Commit a7a5991

Browse files
committed
Handle triggerer cross-loop connection fallback (#64213)
1 parent e8e22bd commit a7a5991

File tree

2 files changed

+21
-7
lines changed

2 files changed

+21
-7
lines changed

task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@
2626
from airflow.sdk import Connection
2727

2828

29+
_GREENBACK_RUNTIME_ERROR_MESSAGES = (
30+
"You cannot use AsyncToSync in the same thread as an async event loop",
31+
"got Future <Future pending> attached to a different loop",
32+
)
33+
34+
2935
class ExecutionAPISecretsBackend(BaseSecretsBackend):
3036
"""
3137
Secrets backend for client contexts (workers, DAG processors, triggerers).
@@ -69,7 +75,7 @@ def get_connection(self, conn_id: str, team_name: str | None = None) -> Connecti
6975
# TriggerCommsDecoder.send() uses async_to_sync internally, which raises RuntimeError
7076
# when called within an async event loop. In greenback portal contexts (triggerer),
7177
# we catch this and use greenback to call the async version instead.
72-
if str(e).startswith("You cannot use AsyncToSync in the same thread as an async event loop"):
78+
if any(message in str(e) for message in _GREENBACK_RUNTIME_ERROR_MESSAGES):
7379
import asyncio
7480

7581
import greenback

task-sdk/tests/task_sdk/execution_time/test_secrets.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -121,12 +121,22 @@ def test_get_conn_value_not_implemented(self):
121121
with pytest.raises(NotImplementedError, match="Use get_connection instead"):
122122
backend.get_conn_value("test_conn")
123123

124-
def test_runtime_error_triggers_greenback_fallback(self, mocker, mock_supervisor_comms):
124+
@pytest.mark.parametrize(
125+
"runtime_error_message",
126+
[
127+
"You cannot use AsyncToSync in the same thread as an async event loop",
128+
"Task <Task pending name='Task-3'> got Future <Future pending> attached to a different loop",
129+
],
130+
)
131+
def test_runtime_error_triggers_greenback_fallback(
132+
self, mocker, mock_supervisor_comms, runtime_error_message
133+
):
125134
"""
126135
Test that RuntimeError from async_to_sync triggers greenback fallback.
127136
128-
This test verifies the fix for issue #57145: when SUPERVISOR_COMMS.send()
129-
raises the specific RuntimeError about async_to_sync in an event loop,
137+
This test verifies the fix for issue #57145 and regression coverage for #64213:
138+
when SUPERVISOR_COMMS.send() raises the specific RuntimeError about async_to_sync
139+
in an event loop, or the cross-loop Future error seen in triggerer worker threads,
130140
the backend catches it and uses greenback to call aget_connection().
131141
"""
132142

@@ -138,9 +148,7 @@ def test_runtime_error_triggers_greenback_fallback(self, mocker, mock_supervisor
138148
)
139149

140150
# Simulate the RuntimeError that triggers greenback fallback
141-
mock_supervisor_comms.send.side_effect = RuntimeError(
142-
"You cannot use AsyncToSync in the same thread as an async event loop"
143-
)
151+
mock_supervisor_comms.send.side_effect = RuntimeError(runtime_error_message)
144152

145153
# Mock the greenback and asyncio modules that are imported inside the exception handler
146154
mocker.patch("greenback.has_portal", return_value=True)

0 commit comments

Comments
 (0)