Skip to content

Commit e721a59

Browse files
authored
Merge branch 'main' into shared-serialization
2 parents c57083d + 64f22f8 commit e721a59

File tree

682 files changed

+5021
-3873
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

682 files changed

+5021
-3873
lines changed

.github/CODEOWNERS

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,9 @@ airflow-core/src/airflow/ui/public/i18n/locales/zh-TW/ @Lee-W @jason810496 @guan
9999
/providers/hashicorp/ @hussein-awala
100100
/providers/informatica/ @RNHTTR # + @cetingokhan @sertaykabuk @umutozel
101101
/providers/keycloak/ @vincbeck @bugraoz93
102-
/providers/microsoft/azure/ @dabla
102+
/providers/microsoft/azure/docs/**/msgraph.rst @dabla
103+
/providers/microsoft/azure/src/**/msgraph.py @dabla
104+
/providers/microsoft/azure/tests/**/*msgraph.py @dabla
103105
/providers/openlineage/ @mobuchowski
104106
/providers/smtp/ @hussein-awala
105107
/providers/snowflake/ @potiuk

airflow-core/docs/authoring-and-scheduling/assets.rst

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,3 +401,151 @@ As mentioned in :ref:`Fetching information from previously emitted asset events<
401401
def consume_asset_alias_events(*, inlet_events):
402402
events = inlet_events[AssetAlias("example-alias")]
403403
last_row_count = events[-1].extra["row_count"]
404+
405+
Asset partitions
406+
----------------
407+
408+
.. versionadded:: 3.2.0
409+
410+
Asset events can include a ``partition_key`` to make them *partitioned*. This lets you model
411+
the same asset at partition granularity (for example, ``2026-03-10T09:00:00`` for an
412+
hourly partition).
413+
414+
To produce partitioned events on a schedule, use
415+
``CronPartitionTimetable`` in the producer Dag (or ``@asset``). This timetable
416+
creates asset events with a partition key on each run.
417+
418+
.. code-block:: python
419+
420+
from airflow.sdk import CronPartitionTimetable, asset
421+
422+
423+
@asset(
424+
uri="file://incoming/player-stats/team_b.csv",
425+
schedule=CronPartitionTimetable("15 * * * *", timezone="UTC"),
426+
)
427+
def team_b_player_stats():
428+
pass
429+
430+
Partitioned events are intended for partition-aware downstream scheduling, and
431+
do not trigger non-partition-aware Dags.
432+
433+
For downstream partition-aware scheduling, use ``PartitionedAssetTimetable``:
434+
435+
.. code-block:: python
436+
437+
from airflow.sdk import DAG, HourlyMapper, PartitionedAssetTimetable
438+
439+
with DAG(
440+
dag_id="clean_and_combine_player_stats",
441+
schedule=PartitionedAssetTimetable(
442+
assets=team_a_player_stats & team_b_player_stats & team_c_player_stats,
443+
default_partition_mapper=HourlyMapper(),
444+
),
445+
catchup=False,
446+
):
447+
...
448+
449+
``PartitionedAssetTimetable`` requires partitioned asset events. If an asset
450+
event does not contain a ``partition_key``, it will not trigger a downstream
451+
Dag that uses ``PartitionedAssetTimetable``.
452+
453+
``default_partition_mapper`` is used for every upstream asset unless you
454+
override it via ``partition_mapper_config``. The default mapper is
455+
``IdentityMapper`` (no key transformation).
456+
457+
Partition mappers define how upstream partition keys are transformed to the
458+
downstream Dag partition key:
459+
460+
* ``IdentityMapper`` keeps keys unchanged.
461+
* Temporal mappers such as ``HourlyMapper``, ``DailyMapper``, and
462+
``YearlyMapper`` normalize time keys to a chosen grain. For input key
463+
``2026-03-10T09:37:51``, the default outputs are:
464+
465+
* ``HourlyMapper`` -> ``2026-03-10T09``
466+
* ``DailyMapper`` -> ``2026-03-10``
467+
* ``YearlyMapper`` -> ``2026``
468+
* ``ProductMapper`` maps composite keys segment-by-segment.
469+
It applies one mapper per segment and then rejoins the mapped segments.
470+
For example, with key ``us|2026-03-10T09:00:00``,
471+
``ProductMapper(IdentityMapper(), DailyMapper())`` produces
472+
``us|2026-03-10``.
473+
* ``AllowedKeyMapper`` validates that keys are in a fixed allow-list and
474+
passes the key through unchanged if valid.
475+
For example, ``AllowedKeyMapper(["us", "eu", "apac"])`` accepts only those
476+
region keys and rejects all others.
477+
478+
Example of per-asset mapper configuration and composite-key mapping:
479+
480+
.. code-block:: python
481+
482+
from airflow.sdk import (
483+
Asset,
484+
DailyMapper,
485+
IdentityMapper,
486+
PartitionedAssetTimetable,
487+
ProductMapper,
488+
)
489+
490+
regional_sales = Asset(uri="file://incoming/sales/regional.csv", name="regional_sales")
491+
492+
with DAG(
493+
dag_id="aggregate_regional_sales",
494+
schedule=PartitionedAssetTimetable(
495+
assets=regional_sales,
496+
default_partition_mapper=ProductMapper(IdentityMapper(), DailyMapper()),
497+
),
498+
):
499+
...
500+
501+
You can also override mappers for specific upstream assets with
502+
``partition_mapper_config``:
503+
504+
.. code-block:: python
505+
506+
from airflow.sdk import Asset, DAG, DailyMapper, IdentityMapper, PartitionedAssetTimetable
507+
508+
hourly_sales = Asset(uri="file://incoming/sales/hourly.csv", name="hourly_sales")
509+
daily_targets = Asset(uri="file://incoming/sales/targets.csv", name="daily_targets")
510+
511+
with DAG(
512+
dag_id="join_sales_and_targets",
513+
schedule=PartitionedAssetTimetable(
514+
assets=hourly_sales & daily_targets,
515+
# Default behavior: map timestamp-like keys to daily keys.
516+
default_partition_mapper=DailyMapper(),
517+
# Override for assets that already emit daily partition keys.
518+
partition_mapper_config={
519+
daily_targets: IdentityMapper(),
520+
},
521+
),
522+
):
523+
...
524+
525+
If transformed partition keys from all required upstream assets do not align,
526+
the downstream Dag will not be triggered for that partition.
527+
528+
The same applies when a mapper cannot transform a key. For example, if an
529+
upstream event has ``partition_key="random-text"`` and the downstream mapping
530+
uses ``DailyMapper`` (which expects a timestamp-like key), no downstream
531+
partition match can be produced, so the downstream Dag is not triggered for
532+
that key.
533+
534+
Inside partitioned Dag runs, access the resolved partition through
535+
``dag_run.partition_key``.
536+
537+
You can also trigger a DagRun manually with a partition key (for example,
538+
through the Trigger Dag window in the UI, or through the REST API by
539+
including ``partition_key`` in the request body):
540+
541+
.. code-block:: bash
542+
543+
curl -X POST "http://<airflow-host>/api/v2/dags/aggregate_regional_sales/dagRuns" \
544+
-H "Content-Type: application/json" \
545+
-d '{
546+
"logical_date": "2026-03-10T00:00:00Z",
547+
"partition_key": "us|2026-03-10T09:00:00"
548+
}'
549+
550+
For complete runnable examples, see
551+
``airflow-core/src/airflow/example_dags/example_asset_partition.py``.

airflow-core/pyproject.toml

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -195,12 +195,7 @@ dependencies = [
195195
"memray>=1.19.0",
196196
]
197197
"gunicorn" = [
198-
# Temporary upper bound due to Gunicorn 25.1.0 control-socket deadlock
199-
# seen in the Airflow API server path (preload + fork).
200-
# Upstream tracking:
201-
# - https://github.com/benoitc/gunicorn/issues/3529
202-
# - https://github.com/benoitc/gunicorn/pull/3520
203-
"gunicorn>=23.0.0,<25.1.0",
198+
"gunicorn>=23.0.0,!=25.1.0",
204199
]
205200
"otel" = [
206201
"opentelemetry-exporter-prometheus>=0.47b0",

airflow-core/src/airflow/api_fastapi/common/db/common.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -112,11 +112,6 @@ async def paginated_select_async(
112112
if return_total_entries:
113113
total_entries = await get_query_count_async(statement, session=session)
114114

115-
# TODO: Re-enable when permissions are handled. Readable / writable entities,
116-
# for instance:
117-
# readable_dags = get_auth_manager().get_authorized_dag_ids(user=g.user)
118-
# dags_select = dags_select.where(DagModel.dag_id.in_(readable_dags))
119-
120115
statement = apply_filters_to_select(
121116
statement=statement,
122117
filters=[order_by, offset, limit],
@@ -171,11 +166,6 @@ def paginated_select(
171166
if return_total_entries:
172167
total_entries = get_query_count(statement, session=session)
173168

174-
# TODO: Re-enable when permissions are handled. Readable / writable entities,
175-
# for instance:
176-
# readable_dags = get_auth_manager().get_authorized_dag_ids(user=g.user)
177-
# dags_select = dags_select.where(DagModel.dag_id.in_(readable_dags))
178-
179169
statement = apply_filters_to_select(statement=statement, filters=[order_by, offset, limit])
180170

181171
return statement, total_entries

airflow-core/src/airflow/api_fastapi/core_api/services/ui/connections.py

Lines changed: 76 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,11 @@ def __init__(
4949
pass
5050

5151
def __call__(self, form, field):
52-
pass
52+
"""No-op call to satisfy WTForms validator protocol."""
53+
return None
5354

5455
class MockEnum:
55-
"""Mock for wtforms.validators.Optional."""
56+
"""Mock for wtforms.validators.AnyOf."""
5657

5758
def __init__(self, allowed_values):
5859
self.allowed_values = allowed_values
@@ -156,23 +157,40 @@ def mock_any_of(allowed_values: list) -> HookMetaService.MockEnum:
156157
raise ModuleNotFoundError
157158
except ModuleNotFoundError:
158159
sys.modules[mod_name] = MagicMock()
159-
with (
160-
mock.patch("wtforms.StringField", HookMetaService.MockStringField),
161-
mock.patch("wtforms.fields.StringField", HookMetaService.MockStringField),
162-
mock.patch("wtforms.fields.simple.StringField", HookMetaService.MockStringField),
163-
mock.patch("wtforms.IntegerField", HookMetaService.MockIntegerField),
164-
mock.patch("wtforms.fields.IntegerField", HookMetaService.MockIntegerField),
165-
mock.patch("wtforms.PasswordField", HookMetaService.MockPasswordField),
166-
mock.patch("wtforms.BooleanField", HookMetaService.MockBooleanField),
167-
mock.patch("wtforms.fields.BooleanField", HookMetaService.MockBooleanField),
168-
mock.patch("wtforms.fields.simple.BooleanField", HookMetaService.MockBooleanField),
169-
mock.patch("flask_babel.lazy_gettext", mock_lazy_gettext),
170-
mock.patch("flask_appbuilder.fieldwidgets.BS3TextFieldWidget", HookMetaService.MockAnyWidget),
171-
mock.patch("flask_appbuilder.fieldwidgets.BS3TextAreaFieldWidget", HookMetaService.MockAnyWidget),
172-
mock.patch("flask_appbuilder.fieldwidgets.BS3PasswordFieldWidget", HookMetaService.MockAnyWidget),
173-
mock.patch("wtforms.validators.Optional", HookMetaService.MockOptional),
174-
mock.patch("wtforms.validators.any_of", mock_any_of),
175-
):
160+
161+
# We conditionally inject mock classes for missing dependencies
162+
# to ensure `ProvidersManager` can initialize hook connection widgets
163+
# without crashing when FAB/WTForms are not installed.
164+
if "wtforms.StringField" not in sys.modules:
165+
# Only apply mocks if the actual module wasn't loaded beforehand.
166+
# This avoids thread-safety issues caused by `unittest.mock.patch` mutating global states.
167+
with (
168+
mock.patch("wtforms.StringField", HookMetaService.MockStringField),
169+
mock.patch("wtforms.fields.StringField", HookMetaService.MockStringField),
170+
mock.patch("wtforms.fields.simple.StringField", HookMetaService.MockStringField),
171+
mock.patch("wtforms.IntegerField", HookMetaService.MockIntegerField),
172+
mock.patch("wtforms.fields.IntegerField", HookMetaService.MockIntegerField),
173+
mock.patch("wtforms.PasswordField", HookMetaService.MockPasswordField),
174+
mock.patch("wtforms.BooleanField", HookMetaService.MockBooleanField),
175+
mock.patch("wtforms.fields.BooleanField", HookMetaService.MockBooleanField),
176+
mock.patch("wtforms.fields.simple.BooleanField", HookMetaService.MockBooleanField),
177+
mock.patch("flask_babel.lazy_gettext", mock_lazy_gettext),
178+
mock.patch("flask_appbuilder.fieldwidgets.BS3TextFieldWidget", HookMetaService.MockAnyWidget),
179+
mock.patch(
180+
"flask_appbuilder.fieldwidgets.BS3TextAreaFieldWidget", HookMetaService.MockAnyWidget
181+
),
182+
mock.patch(
183+
"flask_appbuilder.fieldwidgets.BS3PasswordFieldWidget", HookMetaService.MockAnyWidget
184+
),
185+
mock.patch("wtforms.validators.Optional", HookMetaService.MockOptional),
186+
mock.patch("wtforms.validators.any_of", mock_any_of),
187+
# Prevent poisoning the global ProvidersManager singleton with mocks
188+
mock.patch("airflow.providers_manager.ProvidersManager._instance", None),
189+
mock.patch("airflow.providers_manager.ProvidersManager.initialized", return_value=False),
190+
):
191+
pm = ProvidersManager()
192+
return pm.hooks, pm.connection_form_widgets, pm.field_behaviours # Will init providers hooks
193+
else:
176194
pm = ProvidersManager()
177195
return pm.hooks, pm.connection_form_widgets, pm.field_behaviours # Will init providers hooks
178196

@@ -215,6 +233,45 @@ def _convert_extra_fields(form_widgets: dict[str, ConnectionFormWidgetInfo]) ->
215233
elif isinstance(form_widget.field, HookMetaService.MockBaseField):
216234
# legacy path, form widgets created using mocked WTForms fields, need to convert to SerializedParam.dump()
217235
hook_widgets[form_widget.field_name] = form_widget.field.param.dump()
236+
elif type(form_widget.field).__name__ == "UnboundField":
237+
# handle real WTForms fields gracefully without needing mock patches
238+
field_class_name = getattr(form_widget.field.field_class, "__name__", "")
239+
param_type = "string"
240+
param_format = None
241+
if field_class_name == "BooleanField":
242+
param_type = "boolean"
243+
elif field_class_name == "IntegerField":
244+
param_type = "integer"
245+
elif field_class_name == "PasswordField":
246+
param_format = "password"
247+
248+
label = (
249+
form_widget.field.args[0]
250+
if len(form_widget.field.args) > 0
251+
else form_widget.field.kwargs.get("label")
252+
)
253+
validators = form_widget.field.kwargs.get("validators", [])
254+
description = form_widget.field.kwargs.get("description", "")
255+
default = form_widget.field.kwargs.get("default", None)
256+
257+
enum = {}
258+
for v in validators:
259+
if type(v).__name__ == "AnyOf":
260+
enum["enum"] = getattr(v, "values", [])
261+
262+
types = [param_type, "null"]
263+
format_dict = {"format": param_format} if param_format else {}
264+
265+
param = SerializedParam(
266+
default=default,
267+
title=str(label) if label is not None else None,
268+
description=str(description) if description else None,
269+
source=None,
270+
type=types,
271+
**format_dict,
272+
**enum,
273+
).dump()
274+
hook_widgets[form_widget.field_name] = param
218275
else:
219276
log.error("Unknown form widget in %s: %s", hook_key, form_widget)
220277
continue

airflow-core/src/airflow/assets/manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ def _queue_dagruns(
357357
)
358358

359359
non_partitioned_dags = dags_to_queue.difference(partition_dags) # don't double process
360-
if not non_partitioned_dags:
360+
if not non_partitioned_dags or partition_key is not None:
361361
return None
362362

363363
# Possible race condition: if multiple dags or multiple (usually

airflow-core/src/airflow/dag_processing/collection.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,6 @@ def _get_latest_runs_stmt(dag_id: str) -> Select:
136136

137137
def _get_latest_runs_stmt_partitioned(dag_id: str) -> Select:
138138
"""Build a select statement to retrieve the last partitioned run for each Dag."""
139-
# todo: AIP-76 we should add a partition date field
140139
latest_run_id = (
141140
select(DagRun.id)
142141
.where(
@@ -149,7 +148,11 @@ def _get_latest_runs_stmt_partitioned(dag_id: str) -> Select:
149148
),
150149
DagRun.partition_key.is_not(None),
151150
)
152-
.order_by(DagRun.id.desc()) # todo: AIP-76 add partition date and sort by it here
151+
.order_by(
152+
DagRun.partition_date.is_(None),
153+
DagRun.partition_date.desc(),
154+
DagRun.run_after.desc(),
155+
)
153156
.limit(1)
154157
.scalar_subquery()
155158
)

0 commit comments

Comments
 (0)