From 02df4b21f787d0ef6f33c709f17374e7a49a393f Mon Sep 17 00:00:00 2001 From: Nick Quinn Date: Mon, 9 Mar 2026 13:25:19 -0700 Subject: [PATCH 1/5] feat: Adding optional name to Aggregation (feast-dev#5994) Signed-off-by: Nick Quinn --- protos/feast/core/Aggregation.proto | 1 + sdk/python/feast/aggregation/__init__.py | 9 +++- .../protos/feast/core/Aggregation_pb2.py | 4 +- .../protos/feast/core/Aggregation_pb2.pyi | 5 +- sdk/python/tests/unit/test_aggregation_ops.py | 54 ++++++++++++++++++- 5 files changed, 67 insertions(+), 6 deletions(-) diff --git a/protos/feast/core/Aggregation.proto b/protos/feast/core/Aggregation.proto index d2d6cab7021..c1103b78769 100644 --- a/protos/feast/core/Aggregation.proto +++ b/protos/feast/core/Aggregation.proto @@ -12,4 +12,5 @@ message Aggregation { string function = 2; google.protobuf.Duration time_window = 3; google.protobuf.Duration slide_interval = 4; + string name = 5; } \ No newline at end of file diff --git a/sdk/python/feast/aggregation/__init__.py b/sdk/python/feast/aggregation/__init__.py index 470f91f09c2..f48731e5c32 100644 --- a/sdk/python/feast/aggregation/__init__.py +++ b/sdk/python/feast/aggregation/__init__.py @@ -21,12 +21,14 @@ class Aggregation: function: str # Provided built in aggregations sum, max, min, count mean time_window: timedelta # The time window for this aggregation. slide_interval: timedelta # The sliding window for these aggregations + name: str # Optional override for the output feature name (defaults to {function}_{column}) """ column: str function: str time_window: Optional[timedelta] slide_interval: Optional[timedelta] + name: str def __init__( self, @@ -34,6 +36,7 @@ def __init__( function: Optional[str] = "", time_window: Optional[timedelta] = None, slide_interval: Optional[timedelta] = None, + name: Optional[str] = None, ): self.column = column or "" self.function = function or "" @@ -42,6 +45,7 @@ def __init__( self.slide_interval = self.time_window else: self.slide_interval = slide_interval + self.name = name or "" def to_proto(self) -> AggregationProto: window_duration = None @@ -59,6 +63,7 @@ def to_proto(self) -> AggregationProto: function=self.function, time_window=window_duration, slide_interval=slide_interval_duration, + name=self.name, ) @classmethod @@ -79,6 +84,7 @@ def from_proto(cls, agg_proto: AggregationProto): function=agg_proto.function, time_window=time_window, slide_interval=slide_interval, + name=agg_proto.name or None, ) return aggregation @@ -91,6 +97,7 @@ def __eq__(self, other): or self.function != other.function or self.time_window != other.time_window or self.slide_interval != other.slide_interval + or self.name != other.name ): return False @@ -106,7 +113,7 @@ def aggregation_specs_to_agg_ops( for agg in agg_specs: if getattr(agg, "time_window", None) is not None: raise ValueError(time_window_unsupported_error_message) - alias = f"{agg.function}_{agg.column}" + alias = getattr(agg, "name", None) or f"{agg.function}_{agg.column}" agg_ops[alias] = (agg.function, agg.column) return agg_ops diff --git a/sdk/python/feast/protos/feast/core/Aggregation_pb2.py b/sdk/python/feast/protos/feast/core/Aggregation_pb2.py index 922f8f40aa2..48f107b8eff 100644 --- a/sdk/python/feast/protos/feast/core/Aggregation_pb2.py +++ b/sdk/python/feast/protos/feast/core/Aggregation_pb2.py @@ -15,7 +15,7 @@ from google.protobuf import duration_pb2 as google_dot_protobuf_dot_duration__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1c\x66\x65\x61st/core/Aggregation.proto\x12\nfeast.core\x1a\x1egoogle/protobuf/duration.proto\"\x92\x01\n\x0b\x41ggregation\x12\x0e\n\x06\x63olumn\x18\x01 \x01(\t\x12\x10\n\x08\x66unction\x18\x02 \x01(\t\x12.\n\x0btime_window\x18\x03 \x01(\x0b\x32\x19.google.protobuf.Duration\x12\x31\n\x0eslide_interval\x18\x04 \x01(\x0b\x32\x19.google.protobuf.DurationBU\n\x10\x66\x65\x61st.proto.coreB\x10\x41ggregationProtoZ/github.com/feast-dev/feast/go/protos/feast/coreb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1cfeast/core/Aggregation.proto\x12\nfeast.core\x1a\x1egoogle/protobuf/duration.proto\"\xd3\x01\n\x0bAggregation\x12\x16\n\x06column\x18\x01 \x01(\tR\x06column\x12\x1a\n\x08function\x18\x02 \x01(\tR\x08function\x12:\n\x0btime_window\x18\x03 \x01(\x0b2\x19.google.protobuf.DurationR\ntimeWindow\x12@\n\x0eslide_interval\x18\x04 \x01(\x0b2\x19.google.protobuf.DurationR\rslideInterval\x12\x12\n\x04name\x18\x05 \x01(\tR\x04nameBU\n\x10feast.proto.coreB\x10AggregationProtoZ/github.com/feast-dev/feast/go/protos/feast/coreb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -24,5 +24,5 @@ _globals['DESCRIPTOR']._options = None _globals['DESCRIPTOR']._serialized_options = b'\n\020feast.proto.coreB\020AggregationProtoZ/github.com/feast-dev/feast/go/protos/feast/core' _globals['_AGGREGATION']._serialized_start=77 - _globals['_AGGREGATION']._serialized_end=223 + _globals['_AGGREGATION']._serialized_end=288 # @@protoc_insertion_point(module_scope) diff --git a/sdk/python/feast/protos/feast/core/Aggregation_pb2.pyi b/sdk/python/feast/protos/feast/core/Aggregation_pb2.pyi index ceb8b1f8131..af9ec2b191f 100644 --- a/sdk/python/feast/protos/feast/core/Aggregation_pb2.pyi +++ b/sdk/python/feast/protos/feast/core/Aggregation_pb2.pyi @@ -22,8 +22,10 @@ class Aggregation(google.protobuf.message.Message): FUNCTION_FIELD_NUMBER: builtins.int TIME_WINDOW_FIELD_NUMBER: builtins.int SLIDE_INTERVAL_FIELD_NUMBER: builtins.int + NAME_FIELD_NUMBER: builtins.int column: builtins.str function: builtins.str + name: builtins.str @property def time_window(self) -> google.protobuf.duration_pb2.Duration: ... @property @@ -35,8 +37,9 @@ class Aggregation(google.protobuf.message.Message): function: builtins.str = ..., time_window: google.protobuf.duration_pb2.Duration | None = ..., slide_interval: google.protobuf.duration_pb2.Duration | None = ..., + name: builtins.str = ..., ) -> None: ... def HasField(self, field_name: typing_extensions.Literal["slide_interval", b"slide_interval", "time_window", b"time_window"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["column", b"column", "function", b"function", "slide_interval", b"slide_interval", "time_window", b"time_window"]) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["column", b"column", "function", b"function", "name", b"name", "slide_interval", b"slide_interval", "time_window", b"time_window"]) -> None: ... global___Aggregation = Aggregation diff --git a/sdk/python/tests/unit/test_aggregation_ops.py b/sdk/python/tests/unit/test_aggregation_ops.py index 0a5f0bd6ed5..d86bacd746f 100644 --- a/sdk/python/tests/unit/test_aggregation_ops.py +++ b/sdk/python/tests/unit/test_aggregation_ops.py @@ -1,13 +1,16 @@ +from datetime import timedelta + import pytest -from feast.aggregation import aggregation_specs_to_agg_ops +from feast.aggregation import Aggregation, aggregation_specs_to_agg_ops class DummyAggregation: - def __init__(self, *, function: str, column: str, time_window=None): + def __init__(self, *, function: str, column: str, time_window=None, name: str = ""): self.function = function self.column = column self.time_window = time_window + self.name = name def test_aggregation_specs_to_agg_ops_success(): @@ -42,3 +45,50 @@ def test_aggregation_specs_to_agg_ops_time_window_unsupported(error_message: str agg_specs, time_window_unsupported_error_message=error_message, ) + + +def test_aggregation_specs_to_agg_ops_custom_name(): + agg_specs = [ + DummyAggregation(function="sum", column="seconds_watched", name="sum_seconds_watched_per_ad_1d"), + ] + + agg_ops = aggregation_specs_to_agg_ops( + agg_specs, + time_window_unsupported_error_message="Time window aggregation is not supported.", + ) + + assert agg_ops == { + "sum_seconds_watched_per_ad_1d": ("sum", "seconds_watched"), + } + + +def test_aggregation_specs_to_agg_ops_mixed_names(): + agg_specs = [ + DummyAggregation(function="sum", column="trips", name="total_trips"), + DummyAggregation(function="mean", column="fare"), + ] + + agg_ops = aggregation_specs_to_agg_ops( + agg_specs, + time_window_unsupported_error_message="Time window aggregation is not supported.", + ) + + assert agg_ops == { + "total_trips": ("sum", "trips"), + "mean_fare": ("mean", "fare"), + } + + +def test_aggregation_round_trip_with_name(): + agg = Aggregation( + column="seconds_watched", + function="sum", + time_window=timedelta(days=1), + name="sum_seconds_watched_per_ad_1d", + ) + proto = agg.to_proto() + assert proto.name == "sum_seconds_watched_per_ad_1d" + + restored = Aggregation.from_proto(proto) + assert restored.name == "sum_seconds_watched_per_ad_1d" + assert restored == agg From a684be2affd7b3621380276f3a548342d5efffcb Mon Sep 17 00:00:00 2001 From: Nick Quinn Date: Mon, 9 Mar 2026 13:40:30 -0700 Subject: [PATCH 2/5] Fix lint-python test Signed-off-by: Nick Quinn --- sdk/python/tests/unit/test_aggregation_ops.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdk/python/tests/unit/test_aggregation_ops.py b/sdk/python/tests/unit/test_aggregation_ops.py index d86bacd746f..9a22da0ecec 100644 --- a/sdk/python/tests/unit/test_aggregation_ops.py +++ b/sdk/python/tests/unit/test_aggregation_ops.py @@ -49,7 +49,11 @@ def test_aggregation_specs_to_agg_ops_time_window_unsupported(error_message: str def test_aggregation_specs_to_agg_ops_custom_name(): agg_specs = [ - DummyAggregation(function="sum", column="seconds_watched", name="sum_seconds_watched_per_ad_1d"), + DummyAggregation( + function="sum", + column="seconds_watched", + name="sum_seconds_watched_per_ad_1d", + ), ] agg_ops = aggregation_specs_to_agg_ops( From 0bfaeaa57f3013f12e003c7d64db2d6d6a91cf2d Mon Sep 17 00:00:00 2001 From: Nick Quinn Date: Mon, 9 Mar 2026 14:02:31 -0700 Subject: [PATCH 3/5] Adding name to documentation Signed-off-by: Nick Quinn --- docs/getting-started/concepts/batch-feature-view.md | 3 ++- docs/getting-started/concepts/tiling.md | 13 +++++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/docs/getting-started/concepts/batch-feature-view.md b/docs/getting-started/concepts/batch-feature-view.md index 9d0eb86389c..7240763a07b 100644 --- a/docs/getting-started/concepts/batch-feature-view.md +++ b/docs/getting-started/concepts/batch-feature-view.md @@ -70,7 +70,7 @@ driver_fv = BatchFeatureView( Field(name="conv_rate", dtype=Float32), ], aggregations=[ - Aggregation(column="conv_rate", function="sum", time_window=timedelta(days=1)), + Aggregation(column="conv_rate", function="sum", time_window=timedelta(days=1), name="total_conv_rate_1d"), ], source=source, ) @@ -144,6 +144,7 @@ See: - `sink_source` is **required** when chaining views (i.e., `source` is another FeatureView or list of them). - Schema fields must be consistent with `sink_source`, `batch_source.field_mapping` if field mappings exist. - Aggregation logic must reference columns present in the raw source or transformed inputs. +- The output feature name for an aggregation defaults to `{function}_{column}` (e.g., `sum_conv_rate`). Use the `name` parameter to override it (e.g., `name="total_conv_rate_1d"`). --- diff --git a/docs/getting-started/concepts/tiling.md b/docs/getting-started/concepts/tiling.md index 5f9d225baa0..4ecdca14441 100644 --- a/docs/getting-started/concepts/tiling.md +++ b/docs/getting-started/concepts/tiling.md @@ -206,9 +206,9 @@ customer_features = StreamFeatureView( batch_source=file_source, # For historical data ), aggregations=[ - Aggregation(column="amount", function="sum", time_window=timedelta(hours=1)), - Aggregation(column="amount", function="avg", time_window=timedelta(hours=1)), - Aggregation(column="amount", function="std", time_window=timedelta(hours=1)), + Aggregation(column="amount", function="sum", time_window=timedelta(hours=1), name="sum_amount_1h"), + Aggregation(column="amount", function="avg", time_window=timedelta(hours=1), name="avg_amount_1h"), + Aggregation(column="amount", function="std", time_window=timedelta(hours=1), name="std_amount_1h"), ], timestamp_field="event_timestamp", online=True, @@ -229,7 +229,12 @@ customer_features = StreamFeatureView( ### Key Parameters -- `aggregations`: List of time-windowed aggregations to compute +- `aggregations`: List of time-windowed aggregations to compute. Each `Aggregation` accepts: + - `column`: source column to aggregate + - `function`: aggregation function (`sum`, `avg`, `mean`, `min`, `max`, `count`, `std`) + - `time_window`: duration of the aggregation window + - `slide_interval`: hop/slide size (defaults to `time_window`) + - `name` *(optional)*: output feature name. Defaults to `{function}_{column}` (e.g., `sum_amount`). Set this to use a custom name (e.g., `name="sum_amount_1h"`). - `timestamp_field`: Column name for timestamps (required when aggregations are specified) - `enable_tiling`: Enable tiling optimization (default: `False`) - Set to `True` for **streaming scenarios** From cf0a2745a6ec1a7f21fc3d35d529ca34ac61771e Mon Sep 17 00:00:00 2001 From: Nick Quinn Date: Mon, 9 Mar 2026 14:11:27 -0700 Subject: [PATCH 4/5] Devin feedback Signed-off-by: Nick Quinn --- sdk/python/feast/aggregation/__init__.py | 14 ++++++++++++++ .../feast/aggregation/tiling/orchestrator.py | 8 ++------ .../feast/aggregation/tiling/tile_subtraction.py | 4 +--- .../feast/infra/compute_engines/ray/nodes.py | 4 +--- .../feast/infra/compute_engines/spark/nodes.py | 8 ++------ 5 files changed, 20 insertions(+), 18 deletions(-) diff --git a/sdk/python/feast/aggregation/__init__.py b/sdk/python/feast/aggregation/__init__.py index f48731e5c32..27697866131 100644 --- a/sdk/python/feast/aggregation/__init__.py +++ b/sdk/python/feast/aggregation/__init__.py @@ -103,6 +103,20 @@ def __eq__(self, other): return True + def resolved_name(self, time_window: Optional[timedelta] = None) -> str: + """Return the output feature name for this aggregation. + + If ``name`` is set it is returned as-is. Otherwise the name is + derived as ``{function}_{column}``, with ``_{seconds}s`` appended + when *time_window* is provided. + """ + if self.name: + return self.name + base = f"{self.function}_{self.column}" + if time_window is not None: + return f"{base}_{int(time_window.total_seconds())}s" + return base + def aggregation_specs_to_agg_ops( agg_specs: Iterable[Any], diff --git a/sdk/python/feast/aggregation/tiling/orchestrator.py b/sdk/python/feast/aggregation/tiling/orchestrator.py index 16e047f465c..fbec5568799 100644 --- a/sdk/python/feast/aggregation/tiling/orchestrator.py +++ b/sdk/python/feast/aggregation/tiling/orchestrator.py @@ -61,9 +61,7 @@ def apply_sawtooth_window_tiling( ir_metadata_dict = {} for agg in aggregations: - feature_name = ( - f"{agg.function}_{agg.column}_{int(window_size.total_seconds())}s" - ) + feature_name = agg.resolved_name(window_size) _, metadata = get_ir_metadata_for_aggregation(agg, feature_name) ir_metadata_dict[feature_name] = metadata @@ -161,9 +159,7 @@ def apply_sawtooth_window_tiling( # Step 5: Compute final feature values from IRs (for algebraic aggs, just rename) for agg in aggregations: - feature_name = ( - f"{agg.function}_{agg.column}_{int(window_size.total_seconds())}s" - ) + feature_name = agg.resolved_name(window_size) metadata = ir_metadata_dict[feature_name] if metadata.type == "algebraic": diff --git a/sdk/python/feast/aggregation/tiling/tile_subtraction.py b/sdk/python/feast/aggregation/tiling/tile_subtraction.py index 9dee478effa..778e05ecfb2 100644 --- a/sdk/python/feast/aggregation/tiling/tile_subtraction.py +++ b/sdk/python/feast/aggregation/tiling/tile_subtraction.py @@ -76,9 +76,7 @@ def convert_cumulative_to_windowed( # Subtract previous tile values from current tile for each aggregation for agg in aggregations: - feature_name = ( - f"{agg.function}_{agg.column}_{int(window_size.total_seconds())}s" - ) + feature_name = agg.resolved_name(window_size) _, metadata = get_ir_metadata_for_aggregation(agg, feature_name) if metadata.type == "algebraic": diff --git a/sdk/python/feast/infra/compute_engines/ray/nodes.py b/sdk/python/feast/infra/compute_engines/ray/nodes.py index 7d600728adf..c4eaa54d26a 100644 --- a/sdk/python/feast/infra/compute_engines/ray/nodes.py +++ b/sdk/python/feast/infra/compute_engines/ray/nodes.py @@ -461,9 +461,7 @@ def _execute_standard_aggregation(self, dataset: Dataset) -> DAGValue: # Convert aggregations to Ray's groupby format agg_dict = {} for agg in self.aggregations: - feature_name = f"{agg.function}_{agg.column}" - if agg.time_window: - feature_name += f"_{int(agg.time_window.total_seconds())}s" + feature_name = agg.resolved_name(agg.time_window) if agg.function == "count": agg_dict[feature_name] = (agg.column, "count") diff --git a/sdk/python/feast/infra/compute_engines/spark/nodes.py b/sdk/python/feast/infra/compute_engines/spark/nodes.py index d5463a9a16d..d44764e7b9b 100644 --- a/sdk/python/feast/infra/compute_engines/spark/nodes.py +++ b/sdk/python/feast/infra/compute_engines/spark/nodes.py @@ -311,7 +311,7 @@ def _execute_tiled_aggregation(self, input_df: DataFrame) -> DAGValue: expected_columns = entity_keys + [self.timestamp_col] for time_window, window_aggs in aggs_by_window.items(): for agg in window_aggs: - feature_name = f"{agg.function}_{agg.column}_{int(time_window.total_seconds())}s" + feature_name = agg.resolved_name(time_window) if feature_name not in expected_columns: expected_columns.append(feature_name) @@ -372,11 +372,7 @@ def _execute_standard_aggregation(self, input_df: DataFrame) -> DAGValue: agg_exprs = [] for agg in self.aggregations: func = getattr(F, agg.function) - expr = func(agg.column).alias( - f"{agg.function}_{agg.column}_{int(agg.time_window.total_seconds())}s" - if agg.time_window - else f"{agg.function}_{agg.column}" - ) + expr = func(agg.column).alias(agg.resolved_name(agg.time_window)) agg_exprs.append(expr) if any(agg.time_window for agg in self.aggregations): From 001952ddcc0420245816f82ffb0a9ac6316e2971 Mon Sep 17 00:00:00 2001 From: nquinn408 <57655411+nquinn408@users.noreply.github.com> Date: Mon, 9 Mar 2026 14:19:36 -0700 Subject: [PATCH 5/5] Update sdk/python/feast/aggregation/__init__.py Co-authored-by: devin-ai-integration[bot] <158243242+devin-ai-integration[bot]@users.noreply.github.com> Signed-off-by: Nick Quinn --- sdk/python/feast/aggregation/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/aggregation/__init__.py b/sdk/python/feast/aggregation/__init__.py index 27697866131..8edb51bb76b 100644 --- a/sdk/python/feast/aggregation/__init__.py +++ b/sdk/python/feast/aggregation/__init__.py @@ -113,7 +113,7 @@ def resolved_name(self, time_window: Optional[timedelta] = None) -> str: if self.name: return self.name base = f"{self.function}_{self.column}" - if time_window is not None: + if time_window is not None and time_window.total_seconds() > 0: return f"{base}_{int(time_window.total_seconds())}s" return base