perf: Parallelize DynamoDB batch reads in sync online_read #6024

Open
abhijeet-dhumal wants to merge 3 commits into feast-dev:master from abhijeet-dhumal:perf/parallel-dynamodb-batch-reads
Conversation

@abhijeet-dhumal
Contributor

@abhijeet-dhumal abhijeet-dhumal commented Feb 25, 2026

Summary

Execute DynamoDB BatchGetItem requests in parallel using ThreadPoolExecutor instead of sequentially. This significantly reduces latency when reading features for many entities that span multiple batches.

Changes

  • Pre-split entity IDs into batches upfront
  • Use ThreadPoolExecutor to execute batch requests concurrently
  • Skip parallelization for single batch (no overhead)
  • Merge results in original order after parallel fetch
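The pre-split step above can be sketched as follows (`split_into_batches` is a hypothetical helper name for illustration; the actual code lives in dynamodb.py):

```python
from itertools import islice
from typing import Iterable, List


def split_into_batches(entity_ids: Iterable[str], batch_size: int) -> List[List[str]]:
    """Materialize all batches upfront so they can be fetched concurrently."""
    it = iter(entity_ids)
    batches = []
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            break
        batches.append(batch)
    return batches
```

Because the batches are built in order, merging their responses back in list order preserves the original entity order.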

Expected Behavior

For multiple batches, DynamoDB BatchGetItem requests should execute in parallel, reducing total latency from N × network_latency to approximately 1 × network_latency.
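As a rough latency model (an illustration, not measured numbers; batches run in waves of at most `max_workers` at a time):

```python
import math


def estimated_latency_ms(n_batches: int, per_call_ms: float, max_workers: int) -> float:
    """Total latency when batches execute in waves of at most max_workers."""
    waves = math.ceil(n_batches / max_workers)
    return waves * per_call_ms


# 5 batches at ~20 ms per call
sequential = estimated_latency_ms(5, 20.0, max_workers=1)   # 100.0 ms
parallel = estimated_latency_ms(5, 20.0, max_workers=10)    # 20.0 ms
```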

Current Behavior

The sync online_read method executes batch requests sequentially in a while loop:

while True:
    batch = list(itertools.islice(entity_ids_iter, batch_size))
    if len(batch) == 0:
        break
    response = dynamodb_resource.batch_get_item(RequestItems=...)  # Sequential!
    result.extend(batch_result)  # batch_result parsed from response (parsing elided)

For 500 entities with batch_size=100, this makes 5 sequential network calls.

Steps to Reproduce

  1. Configure DynamoDB online store with batch_size=100
  2. Call get_online_features for 500 entities
  3. Profile network latency - observe 5 sequential calls

Specifications

  • Version: 0.47.0+
  • Platform: All
  • Subsystem: sdk/python/feast/infra/online_stores/dynamodb.py

Performance Impact

For 500 entities with batch_size=100 (5 batches):

  • Before: 5 sequential network calls = 50-150ms
  • After: 5 parallel network calls = 10-30ms
  • Estimated savings: 40-120ms for large entity sets

Possible Solution

Already implemented in this PR using ThreadPoolExecutor:

with ThreadPoolExecutor(max_workers=min(len(batches), batch_size)) as executor:
    responses = list(executor.map(fetch_batch, batches))
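A self-contained sketch of the overall pattern, with a stubbed `fetch_batch` standing in for the real DynamoDB call (`fetch_all` is a hypothetical name). `executor.map` returns results in submission order, which gives the order-preserving merge for free:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List


def fetch_all(batches: List[List[str]], fetch_batch: Callable[[List[str]], Dict]) -> List[Dict]:
    """Fetch every batch, in parallel when there is more than one."""
    if len(batches) <= 1:
        # Single batch: skip the executor to avoid thread overhead
        return [fetch_batch(b) for b in batches]
    max_workers = min(len(batches), 10)  # cap per the PR's diff
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map yields results in the order batches were submitted
        return list(executor.map(fetch_batch, batches))
```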

Related

  • RHOAIENG-46061 (60ms p99 SLA target for online feature serving)
  • Note: The async path (online_read_async) already uses asyncio.gather() for parallel execution


@abhijeet-dhumal abhijeet-dhumal requested a review from a team as a code owner February 25, 2026 14:47
@abhijeet-dhumal abhijeet-dhumal changed the title perf: parallelize DynamoDB batch reads in sync online_read perf: Parallelize DynamoDB batch reads in sync online_read Feb 25, 2026
Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
@abhijeet-dhumal abhijeet-dhumal force-pushed the perf/parallel-dynamodb-batch-reads branch from 2a3e9b8 to f75c446 Compare February 25, 2026 14:49
_get_table_name(online_config, config, table)
)
table_name = _get_table_name(online_config, config, table)
table_instance = dynamodb_resource.Table(table_name)
Member

table_instance is only used in the single-batch path, and seems unnecessary since table_name can be used instead of table_instance.name


# Use ThreadPoolExecutor for parallel I/O
# Cap at 10 workers to avoid excessive thread creation
max_workers = min(len(batches), 10)
Member

I think this should be configurable (max_read_workers) in online store configs

# Execute batch requests in parallel for multiple batches
# Note: boto3 resources are NOT thread-safe, so we create a new resource per thread
def fetch_batch(batch: List[str]) -> Dict[str, Any]:
    thread_resource = _initialize_dynamodb_resource(
Member

I think here we are creating a new session on each thread. Instead we can share a single _dynamodb_client across all threads.

def fetch_batch(batch: List[str]) -> Dict[str, Any]:
    batch_entity_ids = self._to_client_batch_get_payload(
        online_config, table_name, batch
    )
    return dynamodb_client.batch_get_item(RequestItems=batch_entity_ids)

https://docs.aws.amazon.com/boto3/latest/guide/clients.html#multithreading-or-multiprocessing-with-clients
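Per the linked boto3 guidance, low-level clients are safe to share across threads (unlike resources and sessions). A minimal sketch of the shared-client pattern, using a stub class in place of a real boto3 client (the request payload shape is illustrative only):

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


class StubDynamoDBClient:
    """Stand-in for a boto3 DynamoDB client, which is thread-safe to share."""

    def batch_get_item(self, RequestItems: Dict) -> Dict:
        # Echo the request back; a real client would call DynamoDB here
        return {"Responses": RequestItems}


shared_client = StubDynamoDBClient()  # created once, shared by all worker threads


def fetch_batch(batch: List[str]) -> Dict:
    payload = {"my_table": {"Keys": batch}}  # illustrative payload shape
    return shared_client.batch_get_item(RequestItems=payload)


batches = [["a", "b"], ["c", "d"]]
with ThreadPoolExecutor(max_workers=2) as executor:
    responses = list(executor.map(fetch_batch, batches))
```

Sharing one client avoids the per-thread session setup cost that creating a new resource in each `fetch_batch` call would incur.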

