Skip to content

Commit 26e7b27

Browse files
[v3-1-test] UI: Fix Grid for cleared runs when tasks were removed (#56085) (#56297)
Ensure removed/historical tasks from selected runs are visible in the Grid even if they no longer exist in the current DAG version. We now: - Include synthetic leaf nodes for task_ids present in TIs but missing from the serialized DAG in both grid/structure and grid/ti_summaries. - Aggregate TI states for these synthetic nodes. Add tests covering structure and TI summaries for removed tasks. (cherry picked from commit 77fae41) Co-authored-by: Ephraim Anierobi <splendidzigy24@gmail.com>
1 parent a3b7856 commit 26e7b27

File tree

3 files changed

+119
-11
lines changed
  • airflow-core

3 files changed

+119
-11
lines changed

airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py

Lines changed: 68 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from __future__ import annotations
1919

2020
import collections
21-
from typing import TYPE_CHECKING, Annotated
21+
from typing import TYPE_CHECKING, Annotated, Any
2222

2323
import structlog
2424
from fastapi import Depends, HTTPException, status
@@ -48,6 +48,7 @@
4848
from airflow.api_fastapi.core_api.security import requires_access_dag
4949
from airflow.api_fastapi.core_api.services.ui.grid import (
5050
_find_aggregates,
51+
_get_aggs_for_node,
5152
_merge_node_dicts,
5253
)
5354
from airflow.api_fastapi.core_api.services.ui.task_group import (
@@ -156,7 +157,7 @@ def get_dag_structure(
156157
task_group_sort = get_task_group_children_getter()
157158
if not run_ids:
158159
nodes = [task_group_to_dict_grid(x) for x in task_group_sort(latest_dag.task_group)]
159-
return nodes
160+
return [GridNodeResponse(**n) for n in nodes]
160161

161162
serdags = session.scalars(
162163
select(SerializedDagModel).where(
@@ -170,7 +171,7 @@ def get_dag_structure(
170171
)
171172
)
172173
)
173-
merged_nodes: list[GridNodeResponse] = []
174+
merged_nodes: list[dict[str, Any]] = []
174175
dags = [latest_dag]
175176
for serdag in serdags:
176177
if serdag:
@@ -179,7 +180,37 @@ def get_dag_structure(
179180
nodes = [task_group_to_dict_grid(x) for x in task_group_sort(dag.task_group)]
180181
_merge_node_dicts(merged_nodes, nodes)
181182

182-
return merged_nodes
183+
# Ensure historical tasks (e.g. removed) that exist in TIs for the selected runs are represented
184+
def _collect_ids(nodes: list[dict[str, Any]]) -> set[str]:
185+
ids: set[str] = set()
186+
for n in nodes:
187+
nid = n.get("id")
188+
if nid:
189+
ids.add(nid)
190+
children = n.get("children")
191+
if children:
192+
ids |= _collect_ids(children) # recurse
193+
return ids
194+
195+
existing_ids = _collect_ids(merged_nodes)
196+
historical_task_ids = session.scalars(
197+
select(TaskInstance.task_id)
198+
.join(TaskInstance.dag_run)
199+
.where(TaskInstance.dag_id == dag_id, DagRun.id.in_(run_ids))
200+
.distinct()
201+
)
202+
for task_id in historical_task_ids:
203+
if task_id not in existing_ids:
204+
merged_nodes.append(
205+
{
206+
"id": task_id,
207+
"label": task_id,
208+
"is_mapped": None,
209+
"children": None,
210+
}
211+
)
212+
213+
return [GridNodeResponse(**n) for n in merged_nodes]
183214

184215

185216
@grid_router.get(
@@ -345,19 +376,47 @@ def get_grid_ti_summaries(
345376
assert serdag
346377

347378
def get_node_sumaries():
379+
yielded_task_ids: set[str] = set()
380+
381+
# Yield all nodes discoverable from the serialized DAG structure
348382
for node in _find_aggregates(
349383
node=serdag.dag.task_group,
350384
parent_node=None,
351385
ti_details=ti_details,
352386
):
353-
if node["type"] == "task":
354-
node["child_states"] = None
355-
node["min_start_date"] = None
356-
node["max_end_date"] = None
387+
if node["type"] in {"task", "mapped_task"}:
388+
yielded_task_ids.add(node["task_id"])
389+
if node["type"] == "task":
390+
node["child_states"] = None
391+
node["min_start_date"] = None
392+
node["max_end_date"] = None
357393
yield node
358394

395+
# For good history: add synthetic leaf nodes for task_ids that have TIs in this run
396+
# but are not present in the current DAG structure (e.g. removed tasks)
397+
missing_task_ids = set(ti_details.keys()) - yielded_task_ids
398+
for task_id in sorted(missing_task_ids):
399+
detail = ti_details[task_id]
400+
# Create a leaf task node with aggregated state from its TIs
401+
agg = _get_aggs_for_node(detail)
402+
yield {
403+
"task_id": task_id,
404+
"type": "task",
405+
"parent_id": None,
406+
**agg,
407+
# Align with leaf behavior
408+
"child_states": None,
409+
"min_start_date": None,
410+
"max_end_date": None,
411+
}
412+
413+
task_instances = list(get_node_sumaries())
414+
# If a group id and a task id collide, prefer the group record
415+
group_ids = {n.get("task_id") for n in task_instances if n.get("type") == "group"}
416+
filtered = [n for n in task_instances if not (n.get("type") == "task" and n.get("task_id") in group_ids)]
417+
359418
return { # type: ignore[return-value]
360419
"run_id": run_id,
361420
"dag_id": dag_id,
362-
"task_instances": list(get_node_sumaries()),
421+
"task_instances": filtered,
363422
}

airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,16 +85,19 @@ def _find_aggregates(
8585
"""Recursively fill the Task Group Map."""
8686
node_id = node.node_id
8787
parent_id = parent_node.node_id if parent_node else None
88-
details = ti_details[node_id]
88+
# Do not mutate ti_details by accidental key creation
89+
details = ti_details.get(node_id, [])
8990

9091
if node is None:
9192
return
9293
if isinstance(node, MappedOperator):
94+
# For unmapped tasks, reflect a single None state so UI shows one square
95+
mapped_details = details or [{"state": None, "start_date": None, "end_date": None}]
9396
yield {
9497
"task_id": node_id,
9598
"type": "mapped_task",
9699
"parent_id": parent_id,
97-
**_get_aggs_for_node(details),
100+
**_get_aggs_for_node(mapped_details),
98101
}
99102

100103
return

airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,39 @@ def test_should_response_200_with_deleted_task_and_taskgroup(self, session, test
438438
},
439439
]
440440

441+
# Also verify that TI summaries include a leaf entry for the removed task
442+
ti_resp = test_client.get(f"/grid/ti_summaries/{DAG_ID_3}/run_3")
443+
assert ti_resp.status_code == 200
444+
ti_payload = ti_resp.json()
445+
assert ti_payload["dag_id"] == DAG_ID_3
446+
assert ti_payload["run_id"] == "run_3"
447+
# Find the removed task summary; it should exist even if not in current serialized DAG structure
448+
removed_ti = next(
449+
(
450+
n
451+
for n in ti_payload["task_instances"]
452+
if n["task_id"] == TASK_ID_4 and n["child_states"] is None
453+
),
454+
None,
455+
)
456+
assert removed_ti is not None
457+
# Its state should be the aggregated state of its TIs, which includes 'removed'
458+
assert removed_ti["state"] in (
459+
"removed",
460+
None,
461+
"skipped",
462+
"success",
463+
"failed",
464+
"running",
465+
"queued",
466+
"scheduled",
467+
"deferred",
468+
"restarting",
469+
"up_for_retry",
470+
"up_for_reschedule",
471+
"upstream_failed",
472+
)
473+
441474
def test_get_dag_structure(self, session, test_client):
442475
session.commit()
443476
response = test_client.get(f"/grid/structure/{DAG_ID}?limit=5")
@@ -690,3 +723,16 @@ def sort_dict(in_dict):
690723
expected = sort_dict(expected)
691724
actual = sort_dict(actual)
692725
assert actual == expected
726+
727+
def test_structure_includes_historical_removed_task_with_proper_shape(self, session, test_client):
728+
# Ensure the structure endpoint returns synthetic node for historical/removed task
729+
response = test_client.get(f"/grid/structure/{DAG_ID_3}")
730+
assert response.status_code == 200
731+
nodes = response.json()
732+
# Find the historical removed task id
733+
t4 = next((n for n in nodes if n["id"] == TASK_ID_4), None)
734+
assert t4 is not None
735+
assert t4["label"] == TASK_ID_4
736+
# Optional None fields are excluded from response due to response_model_exclude_none=True
737+
assert "is_mapped" not in t4
738+
assert "children" not in t4

0 commit comments

Comments
 (0)