|
16 | 16 | import itertools |
17 | 17 | import logging |
18 | 18 | from collections import OrderedDict, defaultdict |
| 19 | +from concurrent.futures import ThreadPoolExecutor |
19 | 20 | from datetime import datetime |
20 | 21 | from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union |
21 | 22 |
|
@@ -479,33 +480,50 @@ def online_read( |
479 | 480 | online_config.endpoint_url, |
480 | 481 | online_config.session_based_auth, |
481 | 482 | ) |
482 | | - table_instance = dynamodb_resource.Table( |
483 | | - _get_table_name(online_config, config, table) |
484 | | - ) |
| 483 | + table_name = _get_table_name(online_config, config, table) |
| 484 | + table_instance = dynamodb_resource.Table(table_name) |
485 | 485 |
|
486 | 486 | batch_size = online_config.batch_size |
487 | 487 | entity_ids = self._to_entity_ids(config, entity_keys) |
488 | | - entity_ids_iter = iter(entity_ids) |
489 | | - result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] |
490 | 488 |
|
| 489 | + # Split entity_ids into batches upfront |
| 490 | + batches: List[List[str]] = [] |
| 491 | + entity_ids_iter = iter(entity_ids) |
491 | 492 | while True: |
492 | 493 | batch = list(itertools.islice(entity_ids_iter, batch_size)) |
493 | | - |
494 | | - # No more items to insert |
495 | | - if len(batch) == 0: |
| 494 | + if not batch: |
496 | 495 | break |
| 496 | + batches.append(batch) |
| 497 | + |
| 498 | + if not batches: |
| 499 | + return [] |
| 500 | + |
| 501 | + # For single batch, no parallelization overhead needed |
| 502 | + if len(batches) == 1: |
497 | 503 | batch_entity_ids = self._to_resource_batch_get_payload( |
498 | | - online_config, table_instance.name, batch |
499 | | - ) |
500 | | - response = dynamodb_resource.batch_get_item( |
501 | | - RequestItems=batch_entity_ids, |
| 504 | + online_config, table_instance.name, batches[0] |
502 | 505 | ) |
503 | | - batch_result = self._process_batch_get_response( |
504 | | - table_instance.name, |
505 | | - response, |
506 | | - batch, |
| 506 | + response = dynamodb_resource.batch_get_item(RequestItems=batch_entity_ids) |
| 507 | + return self._process_batch_get_response(table_name, response, batches[0]) |
| 508 | + |
| 509 | + # Execute batch requests in parallel for multiple batches |
| 510 | + def fetch_batch(batch: List[str]) -> Dict[str, Any]: |
| 511 | + batch_entity_ids = self._to_resource_batch_get_payload( |
| 512 | + online_config, table_instance.name, batch |
507 | 513 | ) |
| 514 | + return dynamodb_resource.batch_get_item(RequestItems=batch_entity_ids) |
| 515 | + |
| 516 | + # Use ThreadPoolExecutor for parallel I/O |
| 517 | + max_workers = min(len(batches), batch_size) |
| 518 | + with ThreadPoolExecutor(max_workers=max_workers) as executor: |
| 519 | + responses = list(executor.map(fetch_batch, batches)) |
| 520 | + |
| 521 | + # Process responses and merge results in order |
| 522 | + result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] |
| 523 | + for batch, response in zip(batches, responses): |
| 524 | + batch_result = self._process_batch_get_response(table_name, response, batch) |
508 | 525 | result.extend(batch_result) |
| 526 | + |
509 | 527 | return result |
510 | 528 |
|
511 | 529 | async def online_read_async( |
|
0 commit comments