Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/online_stores/eg_valkey.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ def online_write_batch(
ttl = online_store_config.key_ttl_seconds
if ttl:
pipe.expire(name=valkey_key_bin, time=ttl)
results = pipe.execute()
results = pipe.execute()
if progress:
progress(len(results))

Expand Down
6 changes: 3 additions & 3 deletions sdk/python/feast/infra/online_stores/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,10 +351,10 @@ def online_write_batch(
num_cmds += 2
if num_cmds >= num_cmds_per_pipeline_execute:
# TODO: May be add retries with backoff
pipe.execute() # flush
results = pipe.execute() # flush
num_cmds = 0
if num_cmds:
pipe.execute() # flush any remaining data in the last batch
results = pipe.execute() # flush any remaining data in the last batch
else:
# check if a previous record under the key bin exists
# TODO: investigate if check and set is a better approach rather than pulling all entity ts and then setting
Expand Down Expand Up @@ -400,7 +400,7 @@ def online_write_batch(
ttl = online_store_config.key_ttl_seconds
if ttl:
pipe.expire(name=redis_key_bin, time=ttl)
results = pipe.execute()
results = pipe.execute()
if progress:
progress(len(results))

Expand Down
54 changes: 26 additions & 28 deletions sdk/python/tests/unit/infra/online_store/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def base_repo_config_kwargs():


@pytest.fixture
def repo_config(base_repo_config_kwargs) -> RepoConfig:
def repo_config_without_docker_connection_string(base_repo_config_kwargs) -> RepoConfig:
return RepoConfig(
**base_repo_config_kwargs,
online_store=RedisOnlineStoreConfig(
Expand All @@ -56,9 +56,7 @@ def repo_config(base_repo_config_kwargs) -> RepoConfig:


@pytest.fixture
def repo_config_with_docker_connection_string(
redis_online_store_config, base_repo_config_kwargs
) -> RepoConfig:
def repo_config(redis_online_store_config, base_repo_config_kwargs) -> RepoConfig:
return RepoConfig(
**base_repo_config_kwargs,
online_store=RedisOnlineStoreConfig(
Expand All @@ -84,13 +82,15 @@ def feature_view():
return feature_view


def test_generate_entity_redis_keys(redis_online_store: RedisOnlineStore, repo_config):
def test_generate_entity_redis_keys(
redis_online_store: RedisOnlineStore, repo_config_without_docker_connection_string
):
entity_keys = [
EntityKeyProto(join_keys=["entity"], entity_values=[ValueProto(int32_val=1)]),
]

actual = redis_online_store._generate_redis_keys_for_entities(
repo_config, entity_keys
repo_config_without_docker_connection_string, entity_keys
)
expected = [
b"\x01\x00\x00\x00\x02\x00\x00\x00\x06\x00\x00\x00entity\x03\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00test"
Expand Down Expand Up @@ -176,7 +176,7 @@ def test_get_features_for_entity(redis_online_store: RedisOnlineStore, feature_v

@pytest.mark.docker
def test_redis_online_write_batch_with_timestamp_as_sortkey(
repo_config_with_docker_connection_string: RepoConfig,
repo_config: RepoConfig,
redis_online_store: RedisOnlineStore,
):
(
Expand All @@ -185,15 +185,13 @@ def test_redis_online_write_batch_with_timestamp_as_sortkey(
) = _create_sorted_feature_view_with_timestamp_as_sortkey()

redis_online_store.online_write_batch(
config=repo_config_with_docker_connection_string,
config=repo_config,
table=feature_view,
data=data,
progress=None,
)

connection_string = (
repo_config_with_docker_connection_string.online_store.connection_string
)
connection_string = repo_config.online_store.connection_string
connection_string_split = connection_string.split(":")
conn_dict = {}
conn_dict["host"] = connection_string_split[0]
Expand All @@ -209,9 +207,9 @@ def test_redis_online_write_batch_with_timestamp_as_sortkey(
)

redis_key_bin_driver_1 = _redis_key(
repo_config_with_docker_connection_string.project,
repo_config.project,
entity_key_driver_1,
entity_key_serialization_version=repo_config_with_docker_connection_string.entity_key_serialization_version,
entity_key_serialization_version=repo_config.entity_key_serialization_version,
)

zset_key_driver_1 = redis_online_store.zset_key_bytes(
Expand All @@ -223,9 +221,9 @@ def test_redis_online_write_batch_with_timestamp_as_sortkey(
entity_values=[ValueProto(int32_val=2)],
)
redis_key_bin_driver_2 = _redis_key(
repo_config_with_docker_connection_string.project,
repo_config.project,
entity_key_driver_2,
entity_key_serialization_version=repo_config_with_docker_connection_string.entity_key_serialization_version,
entity_key_serialization_version=repo_config.entity_key_serialization_version,
)

zset_key_driver_2 = redis_online_store.zset_key_bytes(
Expand Down Expand Up @@ -269,7 +267,7 @@ def test_redis_online_write_batch_with_timestamp_as_sortkey(

@pytest.mark.docker
def test_redis_online_write_batch_with_float_as_sortkey(
repo_config_with_docker_connection_string: RepoConfig,
repo_config: RepoConfig,
redis_online_store: RedisOnlineStore,
):
(
Expand All @@ -278,15 +276,13 @@ def test_redis_online_write_batch_with_float_as_sortkey(
) = _create_sorted_feature_view_with_float_as_sortkey()

redis_online_store.online_write_batch(
config=repo_config_with_docker_connection_string,
config=repo_config,
table=feature_view,
data=data,
progress=None,
)

connection_string = (
repo_config_with_docker_connection_string.online_store.connection_string
)
connection_string = repo_config.online_store.connection_string
connection_string_split = connection_string.split(":")
conn_dict = {}
conn_dict["host"] = connection_string_split[0]
Expand All @@ -302,9 +298,9 @@ def test_redis_online_write_batch_with_float_as_sortkey(
)

redis_key_bin_driver_1 = _redis_key(
repo_config_with_docker_connection_string.project,
repo_config.project,
entity_key_driver_1,
entity_key_serialization_version=repo_config_with_docker_connection_string.entity_key_serialization_version,
entity_key_serialization_version=repo_config.entity_key_serialization_version,
)

zset_key_driver_1 = redis_online_store.zset_key_bytes(
Expand All @@ -316,9 +312,9 @@ def test_redis_online_write_batch_with_float_as_sortkey(
entity_values=[ValueProto(int32_val=2)],
)
redis_key_bin_driver_2 = _redis_key(
repo_config_with_docker_connection_string.project,
repo_config.project,
entity_key_driver_2,
entity_key_serialization_version=repo_config_with_docker_connection_string.entity_key_serialization_version,
entity_key_serialization_version=repo_config.entity_key_serialization_version,
)

zset_key_driver_2 = redis_online_store.zset_key_bytes(
Expand Down Expand Up @@ -371,7 +367,8 @@ def repo_config_before(redis_online_store_config):


def test_multiple_sort_keys_not_supported(
repo_config: RepoConfig, redis_online_store: RedisOnlineStore
repo_config_without_docker_connection_string: RepoConfig,
redis_online_store: RedisOnlineStore,
):
(
feature_view,
Expand All @@ -383,15 +380,16 @@ def test_multiple_sort_keys_not_supported(
match=r"Only one sort key is supported for Range query use cases in Redis, but found 2 sort keys in the",
):
redis_online_store.online_write_batch(
config=repo_config,
config=repo_config_without_docker_connection_string,
table=feature_view,
data=data,
progress=None,
)


def test_non_numeric_sort_key_not_supported(
repo_config: RepoConfig, redis_online_store: RedisOnlineStore
repo_config_without_docker_connection_string: RepoConfig,
redis_online_store: RedisOnlineStore,
):
(
feature_view,
Expand All @@ -402,7 +400,7 @@ def test_non_numeric_sort_key_not_supported(
TypeError, match=r"Unsupported sort key type STRING. Only numerics or timestamp"
):
redis_online_store.online_write_batch(
config=repo_config,
config=repo_config_without_docker_connection_string,
table=feature_view,
data=data,
progress=None,
Expand Down
48 changes: 22 additions & 26 deletions sdk/python/tests/unit/infra/online_store/test_valkey.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def base_repo_config_kwargs():


@pytest.fixture
def repo_config(base_repo_config_kwargs) -> RepoConfig:
def repo_config_without_docker_connection_string(base_repo_config_kwargs) -> RepoConfig:
return RepoConfig(
**base_repo_config_kwargs,
online_store=EGValkeyOnlineStoreConfig(
Expand All @@ -58,9 +58,7 @@ def repo_config(base_repo_config_kwargs) -> RepoConfig:


@pytest.fixture
def repo_config_with_docker_connection_string(
valkey_online_store_config, base_repo_config_kwargs
) -> RepoConfig:
def repo_config(valkey_online_store_config, base_repo_config_kwargs) -> RepoConfig:
return RepoConfig(
**base_repo_config_kwargs,
online_store=EGValkeyOnlineStoreConfig(
Expand All @@ -71,7 +69,7 @@ def repo_config_with_docker_connection_string(

@pytest.mark.docker
def test_valkey_online_write_batch_with_timestamp_as_sortkey(
repo_config_with_docker_connection_string: RepoConfig,
repo_config: RepoConfig,
valkey_online_store: EGValkeyOnlineStore,
):
(
Expand All @@ -80,15 +78,13 @@ def test_valkey_online_write_batch_with_timestamp_as_sortkey(
) = _create_sorted_feature_view_with_timestamp_as_sortkey()

valkey_online_store.online_write_batch(
config=repo_config_with_docker_connection_string,
config=repo_config,
table=feature_view,
data=data,
progress=None,
)

connection_string = (
repo_config_with_docker_connection_string.online_store.connection_string
)
connection_string = repo_config.online_store.connection_string
connection_string_split = connection_string.split(":")
conn_dict = {}
conn_dict["host"] = connection_string_split[0]
Expand All @@ -104,9 +100,9 @@ def test_valkey_online_write_batch_with_timestamp_as_sortkey(
)

redis_key_bin_driver_1 = _redis_key(
repo_config_with_docker_connection_string.project,
repo_config.project,
entity_key_driver_1,
entity_key_serialization_version=repo_config_with_docker_connection_string.entity_key_serialization_version,
entity_key_serialization_version=repo_config.entity_key_serialization_version,
)

zset_key_driver_1 = valkey_online_store.zset_key_bytes(
Expand All @@ -118,9 +114,9 @@ def test_valkey_online_write_batch_with_timestamp_as_sortkey(
entity_values=[ValueProto(int32_val=2)],
)
redis_key_bin_driver_2 = _redis_key(
repo_config_with_docker_connection_string.project,
repo_config.project,
entity_key_driver_2,
entity_key_serialization_version=repo_config_with_docker_connection_string.entity_key_serialization_version,
entity_key_serialization_version=repo_config.entity_key_serialization_version,
)

zset_key_driver_2 = valkey_online_store.zset_key_bytes(
Expand Down Expand Up @@ -164,7 +160,7 @@ def test_valkey_online_write_batch_with_timestamp_as_sortkey(

@pytest.mark.docker
def test_valkey_online_write_batch_with_float_as_sortkey(
repo_config_with_docker_connection_string: RepoConfig,
repo_config: RepoConfig,
valkey_online_store: EGValkeyOnlineStore,
):
(
Expand All @@ -173,15 +169,13 @@ def test_valkey_online_write_batch_with_float_as_sortkey(
) = _create_sorted_feature_view_with_float_as_sortkey()

valkey_online_store.online_write_batch(
config=repo_config_with_docker_connection_string,
config=repo_config,
table=feature_view,
data=data,
progress=None,
)

connection_string = (
repo_config_with_docker_connection_string.online_store.connection_string
)
connection_string = repo_config.online_store.connection_string
connection_string_split = connection_string.split(":")
conn_dict = {}
conn_dict["host"] = connection_string_split[0]
Expand All @@ -197,9 +191,9 @@ def test_valkey_online_write_batch_with_float_as_sortkey(
)

redis_key_bin_driver_1 = _redis_key(
repo_config_with_docker_connection_string.project,
repo_config.project,
entity_key_driver_1,
entity_key_serialization_version=repo_config_with_docker_connection_string.entity_key_serialization_version,
entity_key_serialization_version=repo_config.entity_key_serialization_version,
)

zset_key_driver_1 = valkey_online_store.zset_key_bytes(
Expand All @@ -211,9 +205,9 @@ def test_valkey_online_write_batch_with_float_as_sortkey(
entity_values=[ValueProto(int32_val=2)],
)
redis_key_bin_driver_2 = _redis_key(
repo_config_with_docker_connection_string.project,
repo_config.project,
entity_key_driver_2,
entity_key_serialization_version=repo_config_with_docker_connection_string.entity_key_serialization_version,
entity_key_serialization_version=repo_config.entity_key_serialization_version,
)

zset_key_driver_2 = valkey_online_store.zset_key_bytes(
Expand Down Expand Up @@ -253,7 +247,8 @@ def test_valkey_online_write_batch_with_float_as_sortkey(


def test_multiple_sort_keys_not_supported(
repo_config: RepoConfig, valkey_online_store: EGValkeyOnlineStore
repo_config_without_docker_connection_string: RepoConfig,
valkey_online_store: EGValkeyOnlineStore,
):
(
feature_view,
Expand All @@ -265,15 +260,16 @@ def test_multiple_sort_keys_not_supported(
match=r"Only one sort key is supported for Range query use cases in Valkey, but found 2 sort keys in the",
):
valkey_online_store.online_write_batch(
config=repo_config,
config=repo_config_without_docker_connection_string,
table=feature_view,
data=data,
progress=None,
)


def test_non_numeric_sort_key_not_supported(
repo_config: RepoConfig, valkey_online_store: EGValkeyOnlineStore
repo_config_without_docker_connection_string: RepoConfig,
valkey_online_store: EGValkeyOnlineStore,
):
(
feature_view,
Expand All @@ -284,7 +280,7 @@ def test_non_numeric_sort_key_not_supported(
TypeError, match=r"Unsupported sort key type STRING. Only numerics or timestamp"
):
valkey_online_store.online_write_batch(
config=repo_config,
config=repo_config_without_docker_connection_string,
table=feature_view,
data=data,
progress=None,
Expand Down
Loading