Skip to content

Commit c81af1b

Browse files
committed
[DOP-31838] Extract engine & OpenLineage versions as job tags
1 parent f6c2c4e commit c81af1b

33 files changed

+756
-68
lines changed

data_rentgen/consumer/extractors/batch_extraction_result.py

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,27 @@
1818
RunDTO,
1919
SchemaDTO,
2020
SQLQueryDTO,
21+
TagDTO,
22+
TagValueDTO,
2123
UserDTO,
2224
)
2325

2426
T = TypeVar(
2527
"T",
26-
LocationDTO,
27-
DatasetDTO,
2828
ColumnLineageDTO,
29+
DatasetDTO,
2930
DatasetSymlinkDTO,
31+
InputDTO,
3032
JobDTO,
3133
JobTypeDTO,
32-
RunDTO,
34+
LocationDTO,
3335
OperationDTO,
34-
InputDTO,
3536
OutputDTO,
37+
RunDTO,
3638
SchemaDTO,
3739
SQLQueryDTO,
40+
TagDTO,
41+
TagValueDTO,
3842
UserDTO,
3943
)
4044

@@ -70,6 +74,8 @@ def __init__(self):
7074
self._column_lineage: dict[tuple, ColumnLineageDTO] = {}
7175
self._schemas: dict[tuple, SchemaDTO] = {}
7276
self._sql_queries: dict[tuple, SQLQueryDTO] = {}
77+
self._tags: dict[tuple, TagDTO] = {}
78+
self._tag_values: dict[tuple, TagValueDTO] = {}
7379
self._users: dict[tuple, UserDTO] = {}
7480

7581
def __repr__(self):
@@ -87,6 +93,8 @@ def __repr__(self):
8793
f"column_lineage={len(self._column_lineage)}, "
8894
f"schemas={len(self._schemas)}, "
8995
f"sql_queries={len(self._sql_queries)}, "
96+
f"tags={len(self._tags)}, "
97+
f"tag_values={len(self._tag_values)}, "
9098
f"users={len(self._users)}"
9199
")"
92100
)
@@ -125,6 +133,7 @@ def add_job(self, job: JobDTO):
125133
job.location = self.add_location(job.location)
126134
if job.type:
127135
job.type = self.add_job_type(job.type)
136+
job.tag_values = {self.add_tag_value(tag_value) for tag_value in job.tag_values}
128137
return self._add(self._jobs, job)
129138

130139
def add_run(self, run: RunDTO):
@@ -167,6 +176,13 @@ def add_schema(self, schema: SchemaDTO):
167176
def add_sql_query(self, sql_query: SQLQueryDTO):
168177
return self._add(self._sql_queries, sql_query)
169178

179+
def add_tag(self, tag: TagDTO):
180+
return self._add(self._tags, tag)
181+
182+
def add_tag_value(self, tag_value: TagValueDTO):
183+
tag_value.tag = self.add_tag(tag_value.tag)
184+
return self._add(self._tag_values, tag_value)
185+
170186
def add_user(self, user: UserDTO):
171187
return self._add(self._users, user)
172188

@@ -182,6 +198,12 @@ def get_sql_query(self, sql_query_key: tuple) -> SQLQueryDTO:
182198
def get_user(self, user_key: tuple) -> UserDTO:
183199
return self._users[user_key]
184200

201+
def get_tag(self, tag_key: tuple) -> TagDTO:
202+
return self._tags[tag_key]
203+
204+
def get_tag_value(self, tag_value_key: tuple) -> TagValueDTO:
205+
return self._tag_values[tag_value_key]
206+
185207
def get_dataset(self, dataset_key: tuple) -> DatasetDTO:
186208
dataset = self._datasets[dataset_key]
187209
dataset.location = self.get_location(dataset.location.unique_key)
@@ -201,6 +223,7 @@ def get_job(self, job_key: tuple) -> JobDTO:
201223
job.location = self.get_location(job.location.unique_key)
202224
if job.type:
203225
job.type = self.get_job_type(job.type.unique_key)
226+
job.tag_values = {self.get_tag_value(tag_value.unique_key) for tag_value in job.tag_values}
204227
return job
205228

206229
def get_run(self, run_key: tuple) -> RunDTO:
@@ -282,6 +305,12 @@ def schemas(self) -> list[SchemaDTO]:
282305
def sql_queries(self) -> list[SQLQueryDTO]:
283306
return self._resolve(self.get_sql_query, self._sql_queries)
284307

308+
def tags(self) -> list[TagDTO]:
309+
return self._resolve(self.get_tag, self._tags)
310+
311+
def tag_values(self) -> list[TagValueDTO]:
312+
return self._resolve(self.get_tag_value, self._tag_values)
313+
285314
def users(self) -> list[UserDTO]:
286315
return self._resolve(self.get_user, self._users)
287316

@@ -322,6 +351,12 @@ def merge(self, other: BatchExtractionResult) -> BatchExtractionResult: # noqa:
322351
for sql_query in other.sql_queries():
323352
self.add_sql_query(sql_query)
324353

354+
for tag in other.tags():
355+
self.add_tag(tag)
356+
357+
for tag_value in other.tag_values():
358+
self.add_tag_value(tag_value)
359+
325360
for user in other.users():
326361
self.add_user(user)
327362

data_rentgen/consumer/extractors/generic/run.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
JobDTO,
99
RunDTO,
1010
RunStatusDTO,
11+
TagDTO,
12+
TagValueDTO,
1113
)
1214
from data_rentgen.openlineage.job import OpenLineageJob
1315
from data_rentgen.openlineage.run_event import (
@@ -39,6 +41,7 @@ def extract_run(self, event: OpenLineageRunEvent) -> RunDTO:
3941
parent_run=self.extract_parent_run(event.run.facets.parent) if event.run.facets.parent else None,
4042
)
4143
self._enrich_run_status(run, event)
44+
self._enrich_run_tags(run, event)
4245
return run
4346

4447
def extract_parent_run(self, facet: OpenLineageParentRunFacet | OpenLineageRunEvent) -> RunDTO:
@@ -70,3 +73,31 @@ def _enrich_run_status(self, run: RunDTO, event: OpenLineageRunEvent) -> RunDTO:
7073
# OTHER is used only to update run statistics
7174
pass
7275
return run
76+
77+
def _enrich_run_tags(self, run: RunDTO, event: OpenLineageRunEvent) -> RunDTO:
78+
if event.run.facets.processing_engine:
79+
client_tag_value = TagValueDTO(
80+
tag=TagDTO(name=f"{event.run.facets.processing_engine.name.lower()}.version"),
81+
value=str(event.run.facets.processing_engine.version),
82+
)
83+
adapter_tag_value = TagValueDTO(
84+
tag=TagDTO(name="openlineage_adapter.version"),
85+
value=str(event.run.facets.processing_engine.openlineageAdapterVersion),
86+
)
87+
# we don't store run tags, everything is merged into job tags
88+
run.job.tag_values.add(client_tag_value)
89+
run.job.tag_values.add(adapter_tag_value)
90+
91+
if not event.run.facets.tags:
92+
return run
93+
94+
for raw_tag in event.run.facets.tags.tags:
95+
key = raw_tag.key
96+
if key == "openlineage_client_version":
97+
# https://github.com/OpenLineage/OpenLineage/blob/1.42.1/client/python/src/openlineage/client/client.py#L460
98+
tag_value = TagValueDTO(
99+
tag=TagDTO(name="openlineage_client.version"),
100+
value=raw_tag.value,
101+
)
102+
run.job.tag_values.add(tag_value)
103+
return run

data_rentgen/consumer/extractors/impl/hive.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def extract_run(self, event: OpenLineageRunEvent) -> RunDTO:
5555
if hive_session.username not in ("anonymous", "hive"):
5656
user = UserDTO(name=hive_session.username)
5757

58-
return RunDTO(
58+
result = RunDTO(
5959
id=run_id,
6060
job=JobDTO(
6161
name=job_name,
@@ -68,6 +68,8 @@ def extract_run(self, event: OpenLineageRunEvent) -> RunDTO:
6868
external_id=hive_session.sessionId,
6969
user=user,
7070
)
71+
self._enrich_run_tags(result, event)
72+
return result
7173

7274
def extract_operation(self, event: OpenLineageRunEvent) -> OperationDTO:
7375
run = self.extract_run(event)

data_rentgen/consumer/extractors/impl/spark.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ def extract_operation(self, event: OpenLineageRunEvent) -> OperationDTO:
6262
run = self.extract_parent_run(event.run.facets.parent) # type: ignore[arg-type]
6363
# Workaround for https://github.com/OpenLineage/OpenLineage/issues/3846
6464
self._enrich_run_identifiers(run, event)
65+
self._enrich_run_tags(run, event)
6566
operation = super()._extract_operation(event, run)
6667

6768
# in some cases, operation name may contain raw SELECT query with newlines. use spaces instead.

data_rentgen/consumer/saver.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ async def save(self, data: BatchExtractionResult):
2424
self.logger.info("Saving to database")
2525

2626
await self.create_locations(data)
27+
await self.create_tags(data)
28+
await self.create_tag_values(data)
2729
await self.create_datasets(data)
2830
await self.create_dataset_symlinks(data)
2931
await self.create_job_types(data)
@@ -51,7 +53,7 @@ async def create_locations(self, data: BatchExtractionResult):
5153
for location_dto in data.locations():
5254
async with self.unit_of_work:
5355
location = await self.unit_of_work.location.create_or_update(location_dto)
54-
location_dto.id = location.id
56+
location_dto.id = location.id
5557

5658
# To avoid deadlocks when parallel consumer instances insert/update the same row,
5759
# commit changes for each row instead of committing the whole batch. Yes, this cloud be slow.
@@ -92,7 +94,7 @@ async def create_jobs(self, data: BatchExtractionResult):
9294
job = await self.unit_of_work.job.create_or_update(job_dto) # noqa: PLW2901
9395
else:
9496
job = await self.unit_of_work.job.update(job, job_dto) # noqa: PLW2901
95-
job_dto.id = job.id
97+
job_dto.id = job.id
9698

9799
async def create_users(self, data: BatchExtractionResult):
98100
self.logger.debug("Creating users")
@@ -125,6 +127,26 @@ async def create_schemas(self, data: BatchExtractionResult):
125127
else:
126128
schema_dto.id = schema_id
127129

130+
async def create_tags(self, data: BatchExtractionResult):
131+
self.logger.debug("Creating tags")
132+
self.logger.debug("Creating tags %s", data.tags())
133+
tag_pairs = await self.unit_of_work.tag.fetch_bulk(data.tags())
134+
for tag_dto, tag in tag_pairs:
135+
if not tag:
136+
async with self.unit_of_work:
137+
tag = await self.unit_of_work.tag.create(tag_dto) # noqa: PLW2901
138+
tag_dto.id = tag.id
139+
140+
async def create_tag_values(self, data: BatchExtractionResult):
141+
self.logger.debug("Creating tag values")
142+
self.logger.debug("Creating tag values %s", data.tag_values())
143+
tag_value_pairs = await self.unit_of_work.tag_value.fetch_bulk(data.tag_values())
144+
for tag_value_dto, tag_value in tag_value_pairs:
145+
if not tag_value:
146+
async with self.unit_of_work:
147+
tag_value = await self.unit_of_work.tag_value.create(tag_value_dto) # noqa: PLW2901
148+
tag_value_dto.id = tag_value.id
149+
128150
# In most cases, all the run tree created by some parent is send into one
129151
# Kafka partition, and thus handled by just one worker.
130152
# Cross fingers and create all runs in one transaction.

data_rentgen/db/repositories/job.py

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@
2121
select,
2222
tuple_,
2323
union,
24+
update,
2425
)
26+
from sqlalchemy.dialects.postgresql import insert
2527
from sqlalchemy.orm import selectinload
2628

27-
from data_rentgen.db.models import Address, Job, Location, TagValue
29+
from data_rentgen.db.models import Address, Job, JobTagValue, Location, TagValue
2830
from data_rentgen.db.repositories.base import Repository
2931
from data_rentgen.db.utils.search import make_tsquery, ts_match, ts_rank
3032
from data_rentgen.dto import JobDTO, PaginationDTO
@@ -69,6 +71,19 @@
6971
.group_by(Job.location_id)
7072
)
7173

74+
update_job_type_query = update(Job).where(Job.id == bindparam("job_id")).values(type_id=bindparam("type_id"))
75+
76+
insert_tag_value_query = (
77+
insert(JobTagValue)
78+
.values(
79+
{
80+
"job_id": bindparam("job_id"),
81+
"tag_value_id": bindparam("tag_value_id"),
82+
}
83+
)
84+
.on_conflict_do_nothing(index_elements=["job_id", "tag_value_id"])
85+
)
86+
7287

7388
class JobRepository(Repository[Job]):
7489
async def paginate(
@@ -175,11 +190,15 @@ async def fetch_bulk(self, jobs_dto: list[JobDTO]) -> list[tuple[JobDTO, Job | N
175190
]
176191

177192
async def create_or_update(self, job: JobDTO) -> Job:
178-
# if another worker already created the same row, just use it. if not - create with holding the lock.
179-
await self._lock(job.location.id, job.name.lower())
180193
result = await self._get(job)
181194
if not result:
182-
return await self._create(job)
195+
# try one more time, but with lock acquired.
196+
# if another worker already created the same row, just use it. if not - create with holding the lock.
197+
await self._lock(job.location.id, job.name.lower())
198+
result = await self._get(job)
199+
200+
if not result:
201+
result = await self._create(job)
183202
return await self.update(result, job)
184203

185204
async def _get(self, job: JobDTO) -> Job | None:
@@ -204,8 +223,30 @@ async def _create(self, job: JobDTO) -> Job:
204223
async def update(self, existing: Job, new: JobDTO) -> Job:
205224
# almost of fields are immutable, so we can avoid UPDATE statements if row is unchanged
206225
if new.type and new.type.id and existing.type_id != new.type.id:
207-
existing.type_id = new.type.id
208-
await self._session.flush([existing])
226+
await self._session.execute(
227+
update_job_type_query,
228+
{
229+
"job_id": existing.id,
230+
"type_id": new.type.id,
231+
},
232+
)
233+
234+
if not new.tag_values:
235+
# in cases when jobs have no tag values we can avoid INSERT statements
236+
return existing
237+
238+
# Lock to prevent inserting the same rows from multiple workers
239+
await self._lock(existing.location_id, existing.name)
240+
await self._session.execute(
241+
insert_tag_value_query,
242+
[
243+
{
244+
"job_id": existing.id,
245+
"tag_value_id": tag_value_dto.id,
246+
}
247+
for tag_value_dto in new.tag_values
248+
],
249+
)
209250
return existing
210251

211252
async def list_by_ids(self, job_ids: Collection[int]) -> list[Job]:

data_rentgen/db/repositories/location.py

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
select,
1616
union,
1717
)
18+
from sqlalchemy.dialects.postgresql import insert
1819
from sqlalchemy.orm import selectinload
1920

2021
from data_rentgen.db.models import Address, Location
@@ -44,6 +45,17 @@
4445
)
4546
get_distinct_query = select(Location.type).distinct(Location.type).order_by(Location.type)
4647

48+
insert_address_query = (
49+
insert(Address)
50+
.values(
51+
{
52+
"location_id": bindparam("location_id"),
53+
"url": bindparam("url"),
54+
}
55+
)
56+
.on_conflict_do_nothing(index_elements=["location_id", "url"])
57+
)
58+
4759

4860
class LocationRepository(Repository[Location]):
4961
async def paginate(
@@ -141,25 +153,22 @@ async def _create(self, location: LocationDTO) -> Location:
141153
async def _update_addresses(self, existing: Location, new: LocationDTO) -> Location:
142154
existing_urls = {address.url for address in existing.addresses}
143155
new_urls = new.addresses - existing_urls
144-
# in most cases, Location is unchanged, so we can avoid UPDATE statements
156+
# in most cases, Location is unchanged, so we can avoid INSERT statements
145157
if not new_urls:
146158
return existing
147159

148-
# take a lock, to avoid race conditions, and then
149-
# get fresh state of the object, because it already could be updated by another worker
160+
# take a lock to avoid creating the same address from multiple workers
150161
await self._lock(existing.type, existing.name)
151-
await self._session.refresh(existing, ["addresses"])
152-
153-
# already has all required addresses - nothing to update
154-
existing_urls = {address.url for address in existing.addresses}
155-
new_urls = new.addresses - existing_urls
156-
if not new_urls:
157-
return existing
158-
159-
# add new addresses while holding the lock
160-
addresses = [Address(url=url, location_id=existing.id) for url in new_urls]
161-
existing.addresses.extend(addresses)
162-
await self._session.flush([existing])
162+
await self._session.execute(
163+
insert_address_query,
164+
[
165+
{
166+
"location_id": existing.id,
167+
"url": url,
168+
}
169+
for url in new_urls
170+
],
171+
)
163172
return existing
164173

165174
async def get_location_types(self):

0 commit comments

Comments
 (0)