diff --git a/.secrets.baseline b/.secrets.baseline index 70829ab7dee..1480d3529e4 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -1451,7 +1451,7 @@ "filename": "sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py", "hashed_secret": "5baa61e4c9b93f3f0682250b6cf8331b7ee68fd8", "is_verified": false, - "line_number": 20 + "line_number": 21 } ], "sdk/python/tests/unit/infra/offline_stores/test_offline_store.py": [ diff --git a/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse.py b/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse.py index 5e8cf3d9053..723869c6bd1 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse.py +++ b/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse.py @@ -31,6 +31,7 @@ from feast.infra.utils.clickhouse.clickhouse_config import ClickhouseConfig from feast.infra.utils.clickhouse.connection_utils import get_client from feast.saved_dataset import SavedDatasetStorage +from feast.utils import _utc_now, make_tzaware class ClickhouseOfflineStoreConfig(ClickhouseConfig): @@ -43,15 +44,26 @@ def get_historical_features( config: RepoConfig, feature_views: List[FeatureView], feature_refs: List[str], - entity_df: Union[pd.DataFrame, str], + entity_df: Optional[Union[pd.DataFrame, str]], registry: BaseRegistry, project: str, full_feature_names: bool = False, + **kwargs, ) -> RetrievalJob: assert isinstance(config.offline_store, ClickhouseOfflineStoreConfig) for fv in feature_views: assert isinstance(fv.batch_source, ClickhouseSource) + # Handle non-entity retrieval mode + if entity_df is None: + end_date = kwargs.get("end_date", None) + if end_date is None: + end_date = _utc_now() + else: + end_date = make_tzaware(end_date) + + entity_df = pd.DataFrame({"event_timestamp": [end_date]}) + entity_schema = _get_entity_schema(entity_df, config) entity_df_event_timestamp_col = ( diff --git a/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/tests/data_source.py index 3fe59c5a481..4c6068fd6bd 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/tests/data_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/tests/data_source.py @@ -118,6 +118,18 @@ def teardown(self): pass +def _make_offline_store_config(clickhouse_container): + """Build a ClickhouseOfflineStoreConfig pointing at the test container.""" + return ClickhouseOfflineStoreConfig( + type="clickhouse", + host=clickhouse_container.get_container_host_ip(), + port=clickhouse_container.get_exposed_port(8123), + database=CLICKHOUSE_OFFLINE_DB, + user=CLICKHOUSE_USER, + password=CLICKHOUSE_PASSWORD, + ) + + def test_get_client_with_additional_params(clickhouse_container): """ Test that get_client works with a real ClickHouse container and properly passes @@ -142,3 +154,71 @@ def test_get_client_with_additional_params(clickhouse_container): # Verify the send_receive_timeout was applied assert client.timeout._read == 60 + + +def test_non_entity_retrieval(clickhouse_container): + """Integration test: get_historical_features with entity_df=None returns real data.""" + from datetime import datetime, timedelta, timezone + from unittest.mock import MagicMock + + from feast.feature_view import FeatureView, Field + from feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse import ( + ClickhouseOfflineStore, + df_to_clickhouse_table, + ) + from feast.repo_config import RepoConfig + from feast.types import Float32 + + offline_config = _make_offline_store_config(clickhouse_container) + repo_config = RepoConfig( + project="test_project", + registry="test_registry", + provider="local", + offline_store=offline_config, + ) + + # Seed a feature table with real data + now = datetime.now(tz=timezone.utc) + feature_df = pd.DataFrame( + { + "event_timestamp": [now - timedelta(hours=2), now - timedelta(hours=1)], + "feature_value": [1.0, 2.0], + } + ) + table_name = "test_non_entity_features" + client = get_client(offline_config) + client.command(f"DROP TABLE IF EXISTS {table_name}") + df_to_clickhouse_table(offline_config, feature_df, table_name, "event_timestamp") + + source = ClickhouseSource( + name=table_name, + table=table_name, + timestamp_field="event_timestamp", + ) + fv = FeatureView( + name="test_fv", + entities=[], + ttl=timedelta(days=1), + source=source, + schema=[Field(name="feature_value", dtype=Float32)], + ) + + registry = MagicMock() + registry.list_on_demand_feature_views.return_value = [] + + job = ClickhouseOfflineStore.get_historical_features( + config=repo_config, + feature_views=[fv], + feature_refs=["test_fv:feature_value"], + entity_df=None, + registry=registry, + project="test_project", + end_date=now, + ) + + result_df = job.to_df() + assert len(result_df) > 0 + assert "feature_value" in result_df.columns + + # Cleanup + client.command(f"DROP TABLE IF EXISTS {table_name}") diff --git a/sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py b/sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py index f5440ed367d..7789cde72b3 100644 --- a/sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py +++ b/sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py @@ -1,5 +1,6 @@ import logging import threading +from datetime import datetime, timedelta, timezone from unittest.mock import MagicMock, patch import pytest @@ -133,3 +134,109 @@ def test_clickhouse_config_handles_none_additional_client_args(): config = ClickhouseConfig(**raw_config) assert config.additional_client_args is None + + +class TestNonEntityRetrieval: + """Test the non-entity retrieval logic (entity_df=None) for ClickHouse.""" + + _MODULE = "feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse" + + def _call_get_historical_features(self, feature_views, **kwargs): + """Call get_historical_features with entity_df=None, mocking the pipeline.""" + from feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse import ( + ClickhouseOfflineStore, + ClickhouseOfflineStoreConfig, + ) + from feast.repo_config import RepoConfig + + config = RepoConfig( + project="test_project", + registry="test_registry", + provider="local", + offline_store=ClickhouseOfflineStoreConfig( + type="clickhouse", + host="localhost", + port=9000, + database="test_db", + user="default", + password="password", + ), + ) + + end = kwargs.get("end_date", datetime(2023, 1, 7, tzinfo=timezone.utc)) + + with ( + patch.multiple( + self._MODULE, + _upload_entity_df=MagicMock(), + _get_entity_schema=MagicMock( + return_value={"event_timestamp": "timestamp"} + ), + _get_entity_df_event_timestamp_range=MagicMock( + return_value=(end - timedelta(days=1), end) + ), + ), + patch( + f"{self._MODULE}.offline_utils.get_expected_join_keys", + return_value=[], + ), + patch( + f"{self._MODULE}.offline_utils.assert_expected_columns_in_entity_df", + ), + patch( + f"{self._MODULE}.offline_utils.get_feature_view_query_context", + return_value=[], + ), + ): + refs = [f"{fv.name}:feature1" for fv in feature_views] + return ClickhouseOfflineStore.get_historical_features( + config=config, + feature_views=feature_views, + feature_refs=refs, + entity_df=None, + registry=MagicMock(), + project="test_project", + **kwargs, + ) + + @staticmethod + def _make_feature_view(name, ttl=None): + from feast.entity import Entity + from feast.feature_view import FeatureView, Field + from feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse_source import ( + ClickhouseSource, + ) + from feast.types import Float32 + + return FeatureView( + name=name, + entities=[Entity(name="driver_id", join_keys=["driver_id"])], + ttl=ttl, + source=ClickhouseSource( + name=f"{name}_source", + table=f"{name}_table", + timestamp_field="event_timestamp", + ), + schema=[ + Field(name="feature1", dtype=Float32), + ], + ) + + def test_non_entity_mode_with_end_date(self): + """entity_df=None with explicit end_date produces a valid RetrievalJob.""" + from feast.infra.offline_stores.offline_store import RetrievalJob + + fv = self._make_feature_view("test_fv") + job = self._call_get_historical_features( + [fv], + end_date=datetime(2023, 1, 7, tzinfo=timezone.utc), + ) + assert isinstance(job, RetrievalJob) + + def test_non_entity_mode_defaults_end_date(self): + """entity_df=None without end_date defaults to now.""" + from feast.infra.offline_stores.offline_store import RetrievalJob + + fv = self._make_feature_view("test_fv") + job = self._call_get_historical_features([fv]) + assert isinstance(job, RetrievalJob)