Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .secrets.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -1451,7 +1451,7 @@
"filename": "sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py",
"hashed_secret": "5baa61e4c9b93f3f0682250b6cf8331b7ee68fd8",
"is_verified": false,
"line_number": 20
"line_number": 21
}
],
"sdk/python/tests/unit/infra/offline_stores/test_offline_store.py": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from feast.infra.utils.clickhouse.clickhouse_config import ClickhouseConfig
from feast.infra.utils.clickhouse.connection_utils import get_client
from feast.saved_dataset import SavedDatasetStorage
from feast.utils import _utc_now, make_tzaware


class ClickhouseOfflineStoreConfig(ClickhouseConfig):
Expand All @@ -43,15 +44,26 @@ def get_historical_features(
config: RepoConfig,
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pd.DataFrame, str],
entity_df: Optional[Union[pd.DataFrame, str]],
registry: BaseRegistry,
project: str,
full_feature_names: bool = False,
**kwargs,
) -> RetrievalJob:
assert isinstance(config.offline_store, ClickhouseOfflineStoreConfig)
for fv in feature_views:
assert isinstance(fv.batch_source, ClickhouseSource)

# Handle non-entity retrieval mode
if entity_df is None:
end_date = kwargs.get("end_date", None)
if end_date is None:
end_date = _utc_now()
else:
end_date = make_tzaware(end_date)

entity_df = pd.DataFrame({"event_timestamp": [end_date]})
Comment on lines +57 to +65
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Non-entity retrieval silently ignores start_date kwarg unlike Postgres counterpart

When entity_df is None, the Clickhouse get_historical_features only reads end_date from kwargs (clickhouse.py:59) and completely ignores start_date. The caller feature_store.py:1369-1370 passes start_date as a kwarg when the user provides it. The Postgres implementation (postgres.py:132-160), which this code is modeled after, uses start_date to compute the entity_df timestamp and to bound the TTL-based data scan window. In the Clickhouse version, a user-provided start_date is silently dropped, meaning the point-in-time join will use end_date as the sole entity timestamp regardless of the user's intent — potentially returning different (and unexpected) feature data compared to the Postgres offline store for the same inputs.

Prompt for agents
In sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse.py, lines 57-65, add handling for the start_date kwarg to match the Postgres implementation at sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py lines 132-168. Specifically:

1. Before the `if entity_df is None:` block, extract start_date from kwargs: `start_date = kwargs.get("start_date", None)`
2. Inside the block, after computing end_date, add logic to compute start_date from TTL if not provided (matching postgres.py lines 145-160):
   - If start_date is None, find the max TTL across feature_views and set start_date = end_date - max_ttl (or default to 30 days)
   - If start_date is provided, make it tz-aware with make_tzaware(start_date)
3. You will also need to import timedelta from datetime at the top of the file.
4. Consider whether the entity_df should use start_date or end_date as the event_timestamp (the Postgres version uses start_date via pd.date_range[:1], while the current Clickhouse version uses end_date).
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@YassinNouh21 This seems critical issue.

Copy link
Collaborator Author

@YassinNouh21 YassinNouh21 Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a bug — this is intentional. The PIT join uses MAX(entity_timestamp) as the upper bound, so the timestamp in the synthetic entity_df IS the query upper bound. Using [end_date] gives the window [end_date - TTL, end_date], which is correct. The Postgres implementation using pd.date_range(start=start_date, ...)[:1] actually has the bug — it takes start_date as the sole timestamp, making end_date unreachable. Our implementation matches Dask and is the correct behavior.
@ntkathole

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ntkathole It sounds like there is a corresponding bug in the Postgres implementation, then?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I noticed that

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will get resolved via #6057 cc @Vperiodt


entity_schema = _get_entity_schema(entity_df, config)

entity_df_event_timestamp_col = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,18 @@ def teardown(self):
pass


def _make_offline_store_config(clickhouse_container):
    """Build a ClickhouseOfflineStoreConfig pointing at the test container."""
    # Resolve the dynamically mapped host/port of the running test container.
    connection_kwargs = {
        "type": "clickhouse",
        "host": clickhouse_container.get_container_host_ip(),
        "port": clickhouse_container.get_exposed_port(8123),
        "database": CLICKHOUSE_OFFLINE_DB,
        "user": CLICKHOUSE_USER,
        "password": CLICKHOUSE_PASSWORD,
    }
    return ClickhouseOfflineStoreConfig(**connection_kwargs)


def test_get_client_with_additional_params(clickhouse_container):
"""
Test that get_client works with a real ClickHouse container and properly passes
Expand All @@ -142,3 +154,71 @@ def test_get_client_with_additional_params(clickhouse_container):

# Verify the send_receive_timeout was applied
assert client.timeout._read == 60


def test_non_entity_retrieval(clickhouse_container):
    """Integration test: get_historical_features with entity_df=None returns real data.

    Seeds a ClickHouse table with two feature rows, runs a non-entity
    (entity_df=None) historical retrieval bounded by ``end_date``, and checks
    that real feature data comes back.
    """
    from datetime import datetime, timedelta, timezone
    from unittest.mock import MagicMock

    from feast.feature_view import FeatureView, Field
    from feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse import (
        ClickhouseOfflineStore,
        df_to_clickhouse_table,
    )
    from feast.repo_config import RepoConfig
    from feast.types import Float32

    offline_config = _make_offline_store_config(clickhouse_container)
    repo_config = RepoConfig(
        project="test_project",
        registry="test_registry",
        provider="local",
        offline_store=offline_config,
    )

    # Seed a feature table with real data: two rows inside the 1-day TTL window.
    now = datetime.now(tz=timezone.utc)
    feature_df = pd.DataFrame(
        {
            "event_timestamp": [now - timedelta(hours=2), now - timedelta(hours=1)],
            "feature_value": [1.0, 2.0],
        }
    )
    table_name = "test_non_entity_features"
    client = get_client(offline_config)
    client.command(f"DROP TABLE IF EXISTS {table_name}")
    df_to_clickhouse_table(offline_config, feature_df, table_name, "event_timestamp")

    try:
        source = ClickhouseSource(
            name=table_name,
            table=table_name,
            timestamp_field="event_timestamp",
        )
        fv = FeatureView(
            name="test_fv",
            entities=[],
            ttl=timedelta(days=1),
            source=source,
            schema=[Field(name="feature_value", dtype=Float32)],
        )

        # Only list_on_demand_feature_views is consulted on this path; a
        # MagicMock registry avoids standing up a real registry store.
        registry = MagicMock()
        registry.list_on_demand_feature_views.return_value = []

        job = ClickhouseOfflineStore.get_historical_features(
            config=repo_config,
            feature_views=[fv],
            feature_refs=["test_fv:feature_value"],
            entity_df=None,
            registry=registry,
            project="test_project",
            end_date=now,
        )

        result_df = job.to_df()
        assert len(result_df) > 0
        assert "feature_value" in result_df.columns
    finally:
        # Cleanup must run even when retrieval or the assertions fail,
        # otherwise the leaked table pollutes later tests on this container.
        client.command(f"DROP TABLE IF EXISTS {table_name}")
107 changes: 107 additions & 0 deletions sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import threading
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock, patch

import pytest
Expand Down Expand Up @@ -133,3 +134,109 @@ def test_clickhouse_config_handles_none_additional_client_args():
config = ClickhouseConfig(**raw_config)

assert config.additional_client_args is None


class TestNonEntityRetrieval:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests seems over-mocked

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, removed the heavy mocking. For proper coverage, an integration test against a real ClickHouse instance would be more valuable than over-mocked unit tests — the unit tests here may not be necessary at all.

"""Test the non-entity retrieval logic (entity_df=None) for ClickHouse."""

# Dotted path of the module under test; used as the prefix for patch targets.
_MODULE = "feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse"

def _call_get_historical_features(self, feature_views, **kwargs):
    """Call get_historical_features with entity_df=None, mocking the pipeline.

    Patches out the database-touching helpers (entity upload, schema probe,
    timestamp-range query, join-key/query-context utilities) so the
    non-entity code path can be exercised without a live ClickHouse.
    Returns whatever RetrievalJob the store constructs.
    """
    from feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse import (
        ClickhouseOfflineStore,
        ClickhouseOfflineStoreConfig,
    )
    from feast.repo_config import RepoConfig

    # Minimal repo config; the connection details are never used because
    # every helper that would open a connection is mocked below.
    config = RepoConfig(
        project="test_project",
        registry="test_registry",
        provider="local",
        offline_store=ClickhouseOfflineStoreConfig(
            type="clickhouse",
            host="localhost",
            port=9000,
            database="test_db",
            user="default",
            password="password",
        ),
    )

    # Mirror the end_date the caller passes (or a fixed default) so the
    # mocked timestamp range stays consistent with the synthetic entity_df
    # the store builds internally when entity_df is None.
    end = kwargs.get("end_date", datetime(2023, 1, 7, tzinfo=timezone.utc))

    with (
        patch.multiple(
            self._MODULE,
            _upload_entity_df=MagicMock(),
            _get_entity_schema=MagicMock(
                return_value={"event_timestamp": "timestamp"}
            ),
            _get_entity_df_event_timestamp_range=MagicMock(
                return_value=(end - timedelta(days=1), end)
            ),
        ),
        patch(
            f"{self._MODULE}.offline_utils.get_expected_join_keys",
            return_value=[],
        ),
        patch(
            f"{self._MODULE}.offline_utils.assert_expected_columns_in_entity_df",
        ),
        patch(
            f"{self._MODULE}.offline_utils.get_feature_view_query_context",
            return_value=[],
        ),
    ):
        # One "<view>:feature1" ref per view, matching the single-feature
        # schema produced by _make_feature_view.
        refs = [f"{fv.name}:feature1" for fv in feature_views]
        return ClickhouseOfflineStore.get_historical_features(
            config=config,
            feature_views=feature_views,
            feature_refs=refs,
            entity_df=None,
            registry=MagicMock(),
            project="test_project",
            **kwargs,
        )

@staticmethod
def _make_feature_view(name, ttl=None):
    """Build a minimal driver FeatureView backed by a ClickhouseSource."""
    from feast.entity import Entity
    from feast.feature_view import FeatureView, Field
    from feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse_source import (
        ClickhouseSource,
    )
    from feast.types import Float32

    driver_entity = Entity(name="driver_id", join_keys=["driver_id"])
    batch_source = ClickhouseSource(
        name=f"{name}_source",
        table=f"{name}_table",
        timestamp_field="event_timestamp",
    )
    feature_schema = [Field(name="feature1", dtype=Float32)]
    return FeatureView(
        name=name,
        entities=[driver_entity],
        ttl=ttl,
        source=batch_source,
        schema=feature_schema,
    )

def test_non_entity_mode_with_end_date(self):
    """entity_df=None with explicit end_date produces a valid RetrievalJob."""
    from feast.infra.offline_stores.offline_store import RetrievalJob

    explicit_end = datetime(2023, 1, 7, tzinfo=timezone.utc)
    feature_view = self._make_feature_view("test_fv")
    result = self._call_get_historical_features(
        [feature_view],
        end_date=explicit_end,
    )
    assert isinstance(result, RetrievalJob)

def test_non_entity_mode_defaults_end_date(self):
    """entity_df=None without end_date defaults to now."""
    from feast.infra.offline_stores.offline_store import RetrievalJob

    feature_view = self._make_feature_view("test_fv")
    result = self._call_get_historical_features([feature_view])
    assert isinstance(result, RetrievalJob)
Loading