Skip to content

Commit 9f2e77d

Browse files
committed
Fix: Handle string formatted conf param in TriggerDagRunOperator
1 parent a92319a commit 9f2e77d

File tree

2 files changed

+39
-2
lines changed

2 files changed

+39
-2
lines changed

providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import json
2222
import time
2323
from collections.abc import Sequence
24+
from json import JSONDecodeError
2425
from typing import TYPE_CHECKING, Any
2526

2627
from sqlalchemy import select
@@ -202,9 +203,11 @@ def execute(self, context: Context):
202203
parsed_logical_date = timezone.parse(self.logical_date)
203204

204205
try:
206+
if self.conf and isinstance(self.conf, str):
207+
self.conf = json.loads(self.conf)
205208
json.dumps(self.conf)
206-
except TypeError:
207-
raise ValueError("conf parameter should be JSON Serializable")
209+
except (TypeError, JSONDecodeError):
210+
raise ValueError("conf parameter should be JSON Serializable %s", self.conf)
208211

209212
if self.trigger_run_id:
210213
run_id = str(self.trigger_run_id)

providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,40 @@ def test_trigger_dag_run_with_fail_when_dag_is_paused_should_fail(self):
267267
fail_when_dag_is_paused=True,
268268
)
269269

270+
@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Implementation is different for Airflow 2 & 3")
271+
def test_trigger_dagrun_with_str_conf(self):
272+
"""
273+
Test TriggerDagRunOperator conf is proper json string formatted
274+
"""
275+
with time_machine.travel("2025-02-18T08:04:46Z", tick=False):
276+
task = TriggerDagRunOperator(
277+
task_id="test_task",
278+
trigger_dag_id=TRIGGERED_DAG_ID,
279+
conf='{"foo": "bar"}',
280+
)
281+
282+
# Ensure correct exception is raised
283+
with pytest.raises(DagRunTriggerException) as exc_info:
284+
task.execute(context={})
285+
286+
assert exc_info.value.trigger_dag_id == TRIGGERED_DAG_ID
287+
assert exc_info.value.conf == {"foo": "bar"}
288+
289+
@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Implementation is different for Airflow 2 & 3")
290+
def test_trigger_dagrun_with_str_conf_error(self):
291+
"""
292+
Test TriggerDagRunOperator conf is not proper json string formatted
293+
"""
294+
with time_machine.travel("2025-02-18T08:04:46Z", tick=False):
295+
task = TriggerDagRunOperator(
296+
task_id="test_task",
297+
trigger_dag_id=TRIGGERED_DAG_ID,
298+
conf="{'foo': 'bar', 'key': 123}",
299+
)
300+
301+
with pytest.raises(ValueError, match="conf parameter should be JSON Serializable"):
302+
task.execute(context={})
303+
270304

271305
# TODO: To be removed once the provider drops support for Airflow 2
272306
@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 2")

0 commit comments

Comments
 (0)