Skip to content

Commit cf0a274

Browse files
committed
Devin feedback
Signed-off-by: Nick Quinn <nicholas_quinn@apple.com>
1 parent 0bfaeaa commit cf0a274

File tree

5 files changed

+20
-18
lines changed

5 files changed

+20
-18
lines changed

sdk/python/feast/aggregation/__init__.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,20 @@ def __eq__(self, other):
103103

104104
return True
105105

106+
def resolved_name(self, time_window: Optional[timedelta] = None) -> str:
107+
"""Return the output feature name for this aggregation.
108+
109+
If ``name`` is set it is returned as-is. Otherwise the name is
110+
derived as ``{function}_{column}``, with ``_{seconds}s`` appended
111+
when *time_window* is provided.
112+
"""
113+
if self.name:
114+
return self.name
115+
base = f"{self.function}_{self.column}"
116+
if time_window is not None:
117+
return f"{base}_{int(time_window.total_seconds())}s"
118+
return base
119+
106120

107121
def aggregation_specs_to_agg_ops(
108122
agg_specs: Iterable[Any],

sdk/python/feast/aggregation/tiling/orchestrator.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,7 @@ def apply_sawtooth_window_tiling(
6161
ir_metadata_dict = {}
6262

6363
for agg in aggregations:
64-
feature_name = (
65-
f"{agg.function}_{agg.column}_{int(window_size.total_seconds())}s"
66-
)
64+
feature_name = agg.resolved_name(window_size)
6765
_, metadata = get_ir_metadata_for_aggregation(agg, feature_name)
6866
ir_metadata_dict[feature_name] = metadata
6967

@@ -161,9 +159,7 @@ def apply_sawtooth_window_tiling(
161159

162160
# Step 5: Compute final feature values from IRs (for algebraic aggs, just rename)
163161
for agg in aggregations:
164-
feature_name = (
165-
f"{agg.function}_{agg.column}_{int(window_size.total_seconds())}s"
166-
)
162+
feature_name = agg.resolved_name(window_size)
167163
metadata = ir_metadata_dict[feature_name]
168164

169165
if metadata.type == "algebraic":

sdk/python/feast/aggregation/tiling/tile_subtraction.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,7 @@ def convert_cumulative_to_windowed(
7676

7777
# Subtract previous tile values from current tile for each aggregation
7878
for agg in aggregations:
79-
feature_name = (
80-
f"{agg.function}_{agg.column}_{int(window_size.total_seconds())}s"
81-
)
79+
feature_name = agg.resolved_name(window_size)
8280
_, metadata = get_ir_metadata_for_aggregation(agg, feature_name)
8381

8482
if metadata.type == "algebraic":

sdk/python/feast/infra/compute_engines/ray/nodes.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -461,9 +461,7 @@ def _execute_standard_aggregation(self, dataset: Dataset) -> DAGValue:
461461
# Convert aggregations to Ray's groupby format
462462
agg_dict = {}
463463
for agg in self.aggregations:
464-
feature_name = f"{agg.function}_{agg.column}"
465-
if agg.time_window:
466-
feature_name += f"_{int(agg.time_window.total_seconds())}s"
464+
feature_name = agg.resolved_name(agg.time_window)
467465

468466
if agg.function == "count":
469467
agg_dict[feature_name] = (agg.column, "count")

sdk/python/feast/infra/compute_engines/spark/nodes.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ def _execute_tiled_aggregation(self, input_df: DataFrame) -> DAGValue:
311311
expected_columns = entity_keys + [self.timestamp_col]
312312
for time_window, window_aggs in aggs_by_window.items():
313313
for agg in window_aggs:
314-
feature_name = f"{agg.function}_{agg.column}_{int(time_window.total_seconds())}s"
314+
feature_name = agg.resolved_name(time_window)
315315
if feature_name not in expected_columns:
316316
expected_columns.append(feature_name)
317317

@@ -372,11 +372,7 @@ def _execute_standard_aggregation(self, input_df: DataFrame) -> DAGValue:
372372
agg_exprs = []
373373
for agg in self.aggregations:
374374
func = getattr(F, agg.function)
375-
expr = func(agg.column).alias(
376-
f"{agg.function}_{agg.column}_{int(agg.time_window.total_seconds())}s"
377-
if agg.time_window
378-
else f"{agg.function}_{agg.column}"
379-
)
375+
expr = func(agg.column).alias(agg.resolved_name(agg.time_window))
380376
agg_exprs.append(expr)
381377

382378
if any(agg.time_window for agg in self.aggregations):

0 commit comments

Comments
 (0)