Skip to content

Commit 01f5f66

Browse files
authored
set object_name, payload, external_id_field as templated fields in SalesforceBulkOperator (#63109)
Add template_fields (object_name, payload, external_id_field) to SalesforceBulkOperator so Jinja templates can be used for these parameters at runtime. Validation remains in __init__ as operation is not templated. Closes: #62375
1 parent 7dca298 commit 01f5f66

File tree

2 files changed

+38
-1
lines changed
  • providers/salesforce

2 files changed

+38
-1
lines changed

providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
# under the License.
1717
from __future__ import annotations
1818

19-
from collections.abc import Iterable
19+
from collections.abc import Iterable, Sequence
2020
from typing import TYPE_CHECKING, cast
2121

2222
from airflow.providers.common.compat.sdk import BaseOperator
@@ -48,6 +48,8 @@ class SalesforceBulkOperator(BaseOperator):
4848
:param salesforce_conn_id: The :ref:`Salesforce Connection id <howto/connection:salesforce>`.
4949
"""
5050

51+
template_fields: Sequence[str] = ("object_name", "payload", "external_id_field")
52+
5153
available_operations = ("insert", "update", "upsert", "delete", "hard_delete")
5254

5355
def __init__(

providers/salesforce/tests/unit/salesforce/operators/test_bulk.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,41 @@ class TestSalesforceBulkOperator:
2929
Test class for SalesforceBulkOperator
3030
"""
3131

32+
def test_template_fields(self):
33+
"""
34+
Test that template_fields are correctly defined and renderable.
35+
"""
36+
operator = SalesforceBulkOperator(
37+
task_id="test_template_fields",
38+
operation="insert",
39+
object_name="Account",
40+
payload=[],
41+
)
42+
assert operator.template_fields == ("object_name", "payload", "external_id_field")
43+
44+
@pytest.mark.db_test
45+
def test_template_rendering(self, create_task_instance_of_operator):
46+
"""
47+
Test that template_fields actually render Jinja templates.
48+
"""
49+
ti = create_task_instance_of_operator(
50+
SalesforceBulkOperator,
51+
operation="upsert",
52+
object_name="{{ params.obj }}",
53+
payload="{{ params.data }}",
54+
external_id_field="{{ params.ext_id }}",
55+
params={
56+
"obj": "Contact",
57+
"data": "[{'Name': 'Test'}]",
58+
"ext_id": "Email",
59+
},
60+
dag_id="test_salesforce_bulk_template",
61+
task_id="test_render",
62+
)
63+
rendered = ti.render_templates()
64+
assert rendered.object_name == "Contact"
65+
assert rendered.external_id_field == "Email"
66+
3267
def test_execute_missing_operation(self):
3368
"""
3469
Test execute missing operation

0 commit comments

Comments
 (0)