Skip to content

Commit 703606c

Browse files
fix: address thread-safety and max_workers issues in parallel DynamoDB reads
Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
1 parent a00a2c3 commit 703606c

File tree

2 files changed

+188
-3
lines changed

2 files changed

+188
-3
lines changed

sdk/python/feast/infra/online_stores/dynamodb.py

Lines changed: 10 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -507,14 +507,21 @@ def online_read(
507507
return self._process_batch_get_response(table_name, response, batches[0])
508508

509509
# Execute batch requests in parallel for multiple batches
510+
# Note: boto3 resources are NOT thread-safe, so we create a new resource per thread
510511
def fetch_batch(batch: List[str]) -> Dict[str, Any]:
512+
thread_resource = _initialize_dynamodb_resource(
513+
online_config.region,
514+
online_config.endpoint_url,
515+
online_config.session_based_auth,
516+
)
511517
batch_entity_ids = self._to_resource_batch_get_payload(
512-
online_config, table_instance.name, batch
518+
online_config, table_name, batch
513519
)
514-
return dynamodb_resource.batch_get_item(RequestItems=batch_entity_ids)
520+
return thread_resource.batch_get_item(RequestItems=batch_entity_ids)
515521

516522
# Use ThreadPoolExecutor for parallel I/O
517-
max_workers = min(len(batches), batch_size)
523+
# Cap at 10 workers to avoid excessive thread creation
524+
max_workers = min(len(batches), 10)
518525
with ThreadPoolExecutor(max_workers=max_workers) as executor:
519526
responses = list(executor.map(fetch_batch, batches))
520527

sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py

Lines changed: 178 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -879,3 +879,181 @@ def test_dynamodb_online_store_online_read_order_preservation_across_batches(
879879
# Verify exact order matches
880880
for i, (returned, expected) in enumerate(zip(returned_items, features)):
881881
assert returned[1] == expected, f"Mismatch at index {i}"
882+
883+
884+
@mock_dynamodb
885+
def test_dynamodb_online_store_online_read_small_batch_size(dynamodb_online_store):
886+
"""Test parallel reads with small batch_size.
887+
888+
Verifies correctness with small batch sizes that create multiple batches.
889+
"""
890+
small_batch_config = RepoConfig(
891+
registry=REGISTRY,
892+
project=PROJECT,
893+
provider=PROVIDER,
894+
online_store=DynamoDBOnlineStoreConfig(region=REGION, batch_size=5),
895+
offline_store=DaskOfflineStoreConfig(),
896+
entity_key_serialization_version=3,
897+
)
898+
899+
n_samples = 25 # 5 batches with batch_size=5
900+
db_table_name = f"{TABLE_NAME}_small_batch"
901+
create_test_table(PROJECT, db_table_name, REGION)
902+
data = create_n_customer_test_samples(n=n_samples)
903+
insert_data_test_table(data, PROJECT, db_table_name, REGION)
904+
905+
entity_keys, features, *rest = zip(*data)
906+
returned_items = dynamodb_online_store.online_read(
907+
config=small_batch_config,
908+
table=MockFeatureView(name=db_table_name),
909+
entity_keys=entity_keys,
910+
)
911+
912+
assert len(returned_items) == n_samples
913+
assert [item[1] for item in returned_items] == list(features)
914+
915+
916+
@mock_dynamodb
917+
def test_dynamodb_online_store_online_read_many_batches(dynamodb_online_store):
918+
"""Test parallel reads with many batches (>10).
919+
920+
Verifies correctness when number of batches exceeds max_workers cap.
921+
"""
922+
many_batch_config = RepoConfig(
923+
registry=REGISTRY,
924+
project=PROJECT,
925+
provider=PROVIDER,
926+
online_store=DynamoDBOnlineStoreConfig(region=REGION, batch_size=10),
927+
offline_store=DaskOfflineStoreConfig(),
928+
entity_key_serialization_version=3,
929+
)
930+
931+
n_samples = 150 # 15 batches with batch_size=10
932+
db_table_name = f"{TABLE_NAME}_many_batches"
933+
create_test_table(PROJECT, db_table_name, REGION)
934+
data = create_n_customer_test_samples(n=n_samples)
935+
insert_data_test_table(data, PROJECT, db_table_name, REGION)
936+
937+
entity_keys, features, *rest = zip(*data)
938+
returned_items = dynamodb_online_store.online_read(
939+
config=many_batch_config,
940+
table=MockFeatureView(name=db_table_name),
941+
entity_keys=entity_keys,
942+
)
943+
944+
assert len(returned_items) == n_samples
945+
assert [item[1] for item in returned_items] == list(features)
946+
947+
948+
@mock_dynamodb
949+
def test_dynamodb_online_store_max_workers_capped_at_10(dynamodb_online_store):
950+
"""Verify ThreadPoolExecutor max_workers is capped at 10, not batch_size.
951+
952+
Bug: Old code used min(len(batches), batch_size) which fails with small batch_size.
953+
Fix: New code uses min(len(batches), 10) to ensure proper parallelization.
954+
955+
This test uses batch_size=5 with 15 batches to expose the bug:
956+
- OLD (buggy): max_workers = min(15, 5) = 5 (insufficient parallelism)
957+
- NEW (fixed): max_workers = min(15, 10) = 10 (correct cap)
958+
"""
959+
# Use small batch_size to expose the bug
960+
small_batch_config = RepoConfig(
961+
registry=REGISTRY,
962+
project=PROJECT,
963+
provider=PROVIDER,
964+
online_store=DynamoDBOnlineStoreConfig(region=REGION, batch_size=5),
965+
offline_store=DaskOfflineStoreConfig(),
966+
entity_key_serialization_version=3,
967+
)
968+
969+
n_samples = 75 # 15 batches with batch_size=5
970+
db_table_name = f"{TABLE_NAME}_max_workers_cap"
971+
create_test_table(PROJECT, db_table_name, REGION)
972+
data = create_n_customer_test_samples(n=n_samples)
973+
insert_data_test_table(data, PROJECT, db_table_name, REGION)
974+
975+
entity_keys, features, *rest = zip(*data)
976+
977+
with patch(
978+
"feast.infra.online_stores.dynamodb.ThreadPoolExecutor"
979+
) as mock_executor:
980+
# Configure mock to work like real ThreadPoolExecutor
981+
mock_executor.return_value.__enter__.return_value.map.return_value = iter(
982+
[{"Responses": {}} for _ in range(15)]
983+
)
984+
985+
dynamodb_online_store.online_read(
986+
config=small_batch_config,
987+
table=MockFeatureView(name=db_table_name),
988+
entity_keys=entity_keys,
989+
)
990+
991+
# Verify ThreadPoolExecutor was called with max_workers=10 (capped at 10, NOT batch_size=5)
992+
mock_executor.assert_called_once()
993+
call_kwargs = mock_executor.call_args
994+
assert call_kwargs[1]["max_workers"] == 10, (
995+
f"Expected max_workers=10 (capped), got {call_kwargs[1]['max_workers']}. "
996+
f"If got 5, the bug is using batch_size instead of 10 as cap."
997+
)
998+
999+
1000+
@mock_dynamodb
1001+
def test_dynamodb_online_store_thread_safety_new_resource_per_thread(
1002+
dynamodb_online_store,
1003+
):
1004+
"""Verify each thread creates its own boto3 resource for thread-safety.
1005+
1006+
boto3 resources are NOT thread-safe, so we must create a new resource
1007+
per thread when using ThreadPoolExecutor.
1008+
"""
1009+
config = RepoConfig(
1010+
registry=REGISTRY,
1011+
project=PROJECT,
1012+
provider=PROVIDER,
1013+
online_store=DynamoDBOnlineStoreConfig(region=REGION, batch_size=50),
1014+
offline_store=DaskOfflineStoreConfig(),
1015+
entity_key_serialization_version=3,
1016+
)
1017+
1018+
n_samples = 150 # 3 batches
1019+
db_table_name = f"{TABLE_NAME}_thread_safety"
1020+
create_test_table(PROJECT, db_table_name, REGION)
1021+
data = create_n_customer_test_samples(n=n_samples)
1022+
insert_data_test_table(data, PROJECT, db_table_name, REGION)
1023+
1024+
entity_keys, features, *rest = zip(*data)
1025+
1026+
# Track resources created to verify thread-safety
1027+
resources_created = []
1028+
original_init = boto3.resource
1029+
1030+
def tracking_resource(*args, **kwargs):
1031+
resource = original_init(*args, **kwargs)
1032+
resources_created.append(id(resource))
1033+
return resource
1034+
1035+
with patch.object(boto3, "resource", side_effect=tracking_resource):
1036+
returned_items = dynamodb_online_store.online_read(
1037+
config=config,
1038+
table=MockFeatureView(name=db_table_name),
1039+
entity_keys=entity_keys,
1040+
)
1041+
1042+
# Verify results are correct (functional correctness)
1043+
assert len(returned_items) == n_samples
1044+
1045+
# Verify multiple resources were created (thread-safety)
1046+
# Each of the 3 batches should create its own resource
1047+
# (plus potentially 1 for _get_dynamodb_resource cache initialization)
1048+
assert len(resources_created) >= 3, (
1049+
f"Expected at least 3 unique resources for 3 batches, "
1050+
f"got {len(resources_created)}"
1051+
)
1052+
1053+
# Verify the resources are actually different objects (not reused)
1054+
# At least the batch resources should be unique
1055+
unique_resources = set(resources_created)
1056+
assert len(unique_resources) >= 3, (
1057+
f"Expected at least 3 unique resource IDs, "
1058+
f"got {len(unique_resources)} unique out of {len(resources_created)}"
1059+
)

0 commit comments

Comments
 (0)