Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/getting-started/concepts/batch-feature-view.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ driver_fv = BatchFeatureView(
Field(name="conv_rate", dtype=Float32),
],
aggregations=[
Aggregation(column="conv_rate", function="sum", time_window=timedelta(days=1)),
Aggregation(column="conv_rate", function="sum", time_window=timedelta(days=1), name="total_conv_rate_1d"),
],
source=source,
)
Expand Down Expand Up @@ -145,6 +145,7 @@ See:
- `source` is optional; if omitted (`None`), the feature view has no associated batch data source.
- Schema fields must be consistent with `sink_source`, `batch_source.field_mapping` if field mappings exist.
- Aggregation logic must reference columns present in the raw source or transformed inputs.
- The output feature name for an aggregation defaults to `{function}_{column}` (e.g., `sum_conv_rate`). Use the `name` parameter to override it (e.g., `name="total_conv_rate_1d"`).

---

Expand Down
13 changes: 9 additions & 4 deletions docs/getting-started/concepts/tiling.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,9 @@ customer_features = StreamFeatureView(
batch_source=file_source, # For historical data
),
aggregations=[
Aggregation(column="amount", function="sum", time_window=timedelta(hours=1)),
Aggregation(column="amount", function="avg", time_window=timedelta(hours=1)),
Aggregation(column="amount", function="std", time_window=timedelta(hours=1)),
Aggregation(column="amount", function="sum", time_window=timedelta(hours=1), name="sum_amount_1h"),
Aggregation(column="amount", function="avg", time_window=timedelta(hours=1), name="avg_amount_1h"),
Aggregation(column="amount", function="std", time_window=timedelta(hours=1), name="std_amount_1h"),
],
timestamp_field="event_timestamp",
online=True,
Expand All @@ -229,7 +229,12 @@ customer_features = StreamFeatureView(

### Key Parameters

- `aggregations`: List of time-windowed aggregations to compute
- `aggregations`: List of time-windowed aggregations to compute. Each `Aggregation` accepts:
- `column`: source column to aggregate
- `function`: aggregation function (`sum`, `avg`, `mean`, `min`, `max`, `count`, `std`)
- `time_window`: duration of the aggregation window
- `slide_interval`: hop/slide size (defaults to `time_window`)
- `name` *(optional)*: output feature name. Defaults to `{function}_{column}` (e.g., `sum_amount`). Set this to use a custom name (e.g., `name="sum_amount_1h"`).
- `timestamp_field`: Column name for timestamps (required when aggregations are specified)
- `enable_tiling`: Enable tiling optimization (default: `False`)
- Set to `True` for **streaming scenarios**
Expand Down
1 change: 1 addition & 0 deletions protos/feast/core/Aggregation.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ message Aggregation {
string function = 2;
google.protobuf.Duration time_window = 3;
google.protobuf.Duration slide_interval = 4;
string name = 5;
}
23 changes: 22 additions & 1 deletion sdk/python/feast/aggregation/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,22 @@ class Aggregation:
function: str # Provided built in aggregations sum, max, min, count mean
time_window: timedelta # The time window for this aggregation.
slide_interval: timedelta # The sliding window for these aggregations
name: str # Optional override for the output feature name (defaults to {function}_{column})
"""

column: str
function: str
time_window: Optional[timedelta]
slide_interval: Optional[timedelta]
name: str

def __init__(
self,
column: Optional[str] = "",
function: Optional[str] = "",
time_window: Optional[timedelta] = None,
slide_interval: Optional[timedelta] = None,
name: Optional[str] = None,
):
self.column = column or ""
self.function = function or ""
Expand All @@ -42,6 +45,7 @@ def __init__(
self.slide_interval = self.time_window
else:
self.slide_interval = slide_interval
self.name = name or ""

def to_proto(self) -> AggregationProto:
window_duration = None
Expand All @@ -59,6 +63,7 @@ def to_proto(self) -> AggregationProto:
function=self.function,
time_window=window_duration,
slide_interval=slide_interval_duration,
name=self.name,
)

@classmethod
Expand All @@ -79,6 +84,7 @@ def from_proto(cls, agg_proto: AggregationProto):
function=agg_proto.function,
time_window=time_window,
slide_interval=slide_interval,
name=agg_proto.name or None,
)
return aggregation

Expand All @@ -91,11 +97,26 @@ def __eq__(self, other):
or self.function != other.function
or self.time_window != other.time_window
or self.slide_interval != other.slide_interval
or self.name != other.name
):
return False

return True

def resolved_name(self, time_window: Optional[timedelta] = None) -> str:
"""Return the output feature name for this aggregation.

If ``name`` is set it is returned as-is. Otherwise the name is
derived as ``{function}_{column}``, with ``_{seconds}s`` appended
when *time_window* is provided.
"""
if self.name:
return self.name
base = f"{self.function}_{self.column}"
if time_window is not None and time_window.total_seconds() > 0:
return f"{base}_{int(time_window.total_seconds())}s"
return base


def aggregation_specs_to_agg_ops(
agg_specs: Iterable[Any],
Expand All @@ -106,7 +127,7 @@ def aggregation_specs_to_agg_ops(
for agg in agg_specs:
if getattr(agg, "time_window", None) is not None:
raise ValueError(time_window_unsupported_error_message)
alias = f"{agg.function}_{agg.column}"
alias = getattr(agg, "name", None) or f"{agg.function}_{agg.column}"
agg_ops[alias] = (agg.function, agg.column)
return agg_ops

Expand Down
8 changes: 2 additions & 6 deletions sdk/python/feast/aggregation/tiling/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@ def apply_sawtooth_window_tiling(
ir_metadata_dict = {}

for agg in aggregations:
feature_name = (
f"{agg.function}_{agg.column}_{int(window_size.total_seconds())}s"
)
feature_name = agg.resolved_name(window_size)
_, metadata = get_ir_metadata_for_aggregation(agg, feature_name)
ir_metadata_dict[feature_name] = metadata

Expand Down Expand Up @@ -161,9 +159,7 @@ def apply_sawtooth_window_tiling(

# Step 5: Compute final feature values from IRs (for algebraic aggs, just rename)
for agg in aggregations:
feature_name = (
f"{agg.function}_{agg.column}_{int(window_size.total_seconds())}s"
)
feature_name = agg.resolved_name(window_size)
metadata = ir_metadata_dict[feature_name]

if metadata.type == "algebraic":
Expand Down
4 changes: 1 addition & 3 deletions sdk/python/feast/aggregation/tiling/tile_subtraction.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,7 @@ def convert_cumulative_to_windowed(

# Subtract previous tile values from current tile for each aggregation
for agg in aggregations:
feature_name = (
f"{agg.function}_{agg.column}_{int(window_size.total_seconds())}s"
)
feature_name = agg.resolved_name(window_size)
_, metadata = get_ir_metadata_for_aggregation(agg, feature_name)

if metadata.type == "algebraic":
Expand Down
4 changes: 1 addition & 3 deletions sdk/python/feast/infra/compute_engines/ray/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,9 +461,7 @@ def _execute_standard_aggregation(self, dataset: Dataset) -> DAGValue:
# Convert aggregations to Ray's groupby format
agg_dict = {}
for agg in self.aggregations:
feature_name = f"{agg.function}_{agg.column}"
if agg.time_window:
feature_name += f"_{int(agg.time_window.total_seconds())}s"
feature_name = agg.resolved_name(agg.time_window)

if agg.function == "count":
agg_dict[feature_name] = (agg.column, "count")
Expand Down
8 changes: 2 additions & 6 deletions sdk/python/feast/infra/compute_engines/spark/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ def _execute_tiled_aggregation(self, input_df: DataFrame) -> DAGValue:
expected_columns = entity_keys + [self.timestamp_col]
for time_window, window_aggs in aggs_by_window.items():
for agg in window_aggs:
feature_name = f"{agg.function}_{agg.column}_{int(time_window.total_seconds())}s"
feature_name = agg.resolved_name(time_window)
if feature_name not in expected_columns:
expected_columns.append(feature_name)

Expand Down Expand Up @@ -372,11 +372,7 @@ def _execute_standard_aggregation(self, input_df: DataFrame) -> DAGValue:
agg_exprs = []
for agg in self.aggregations:
func = getattr(F, agg.function)
expr = func(agg.column).alias(
f"{agg.function}_{agg.column}_{int(agg.time_window.total_seconds())}s"
if agg.time_window
else f"{agg.function}_{agg.column}"
)
expr = func(agg.column).alias(agg.resolved_name(agg.time_window))
agg_exprs.append(expr)

if any(agg.time_window for agg in self.aggregations):
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/protos/feast/core/Aggregation_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion sdk/python/feast/protos/feast/core/Aggregation_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ class Aggregation(google.protobuf.message.Message):
FUNCTION_FIELD_NUMBER: builtins.int
TIME_WINDOW_FIELD_NUMBER: builtins.int
SLIDE_INTERVAL_FIELD_NUMBER: builtins.int
NAME_FIELD_NUMBER: builtins.int
column: builtins.str
function: builtins.str
name: builtins.str
@property
def time_window(self) -> google.protobuf.duration_pb2.Duration: ...
@property
Expand All @@ -35,8 +37,9 @@ class Aggregation(google.protobuf.message.Message):
function: builtins.str = ...,
time_window: google.protobuf.duration_pb2.Duration | None = ...,
slide_interval: google.protobuf.duration_pb2.Duration | None = ...,
name: builtins.str = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["slide_interval", b"slide_interval", "time_window", b"time_window"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["column", b"column", "function", b"function", "slide_interval", b"slide_interval", "time_window", b"time_window"]) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["column", b"column", "function", b"function", "name", b"name", "slide_interval", b"slide_interval", "time_window", b"time_window"]) -> None: ...

global___Aggregation = Aggregation
58 changes: 56 additions & 2 deletions sdk/python/tests/unit/test_aggregation_ops.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
from datetime import timedelta

import pytest

from feast.aggregation import aggregation_specs_to_agg_ops
from feast.aggregation import Aggregation, aggregation_specs_to_agg_ops


class DummyAggregation:
def __init__(self, *, function: str, column: str, time_window=None):
def __init__(self, *, function: str, column: str, time_window=None, name: str = ""):
self.function = function
self.column = column
self.time_window = time_window
self.name = name


def test_aggregation_specs_to_agg_ops_success():
Expand Down Expand Up @@ -42,3 +45,54 @@ def test_aggregation_specs_to_agg_ops_time_window_unsupported(error_message: str
agg_specs,
time_window_unsupported_error_message=error_message,
)


def test_aggregation_specs_to_agg_ops_custom_name():
agg_specs = [
DummyAggregation(
function="sum",
column="seconds_watched",
name="sum_seconds_watched_per_ad_1d",
),
]

agg_ops = aggregation_specs_to_agg_ops(
agg_specs,
time_window_unsupported_error_message="Time window aggregation is not supported.",
)

assert agg_ops == {
"sum_seconds_watched_per_ad_1d": ("sum", "seconds_watched"),
}


def test_aggregation_specs_to_agg_ops_mixed_names():
agg_specs = [
DummyAggregation(function="sum", column="trips", name="total_trips"),
DummyAggregation(function="mean", column="fare"),
]

agg_ops = aggregation_specs_to_agg_ops(
agg_specs,
time_window_unsupported_error_message="Time window aggregation is not supported.",
)

assert agg_ops == {
"total_trips": ("sum", "trips"),
"mean_fare": ("mean", "fare"),
}


def test_aggregation_round_trip_with_name():
agg = Aggregation(
column="seconds_watched",
function="sum",
time_window=timedelta(days=1),
name="sum_seconds_watched_per_ad_1d",
)
proto = agg.to_proto()
assert proto.name == "sum_seconds_watched_per_ad_1d"

restored = Aggregation.from_proto(proto)
assert restored.name == "sum_seconds_watched_per_ad_1d"
assert restored == agg
Loading