2626import logging
2727import queue
2828import threading
29+ import time
2930import warnings
3031from typing import Any , Union , Optional , Callable , Generator , List
3132
@@ -869,6 +870,7 @@ def _download_table_bqstorage(
869870 max_queue_size : Any = _MAX_QUEUE_SIZE_DEFAULT ,
870871 max_stream_count : Optional [int ] = None ,
871872 download_state : Optional [_DownloadState ] = None ,
873+ timeout : Optional [float ] = None ,
872874) -> Generator [Any , None , None ]:
873875 """Downloads a BigQuery table using the BigQuery Storage API.
874876
@@ -899,13 +901,18 @@ def _download_table_bqstorage(
899901 download_state (Optional[_DownloadState]):
900902 A threadsafe state object which can be used to observe the
901903 behavior of the worker threads created by this method.
904+ timeout (Optional[float]):
905+ The number of seconds to wait for the download to complete.
906+ If None, wait indefinitely.
902907
903908 Yields:
904909 pandas.DataFrame: Pandas DataFrames, one for each chunk of data
905910 downloaded from BigQuery.
906911
907912 Raises:
908913 ValueError: If attempting to read from a specific partition or snapshot.
914+ concurrent.futures.TimeoutError:
915+ If the download does not complete within the specified timeout.
909916
910917 Note:
911918 This method requires the `google-cloud-bigquery-storage` library
@@ -973,60 +980,73 @@ def _download_table_bqstorage(
973980
974981 worker_queue : queue .Queue [int ] = queue .Queue (maxsize = max_queue_size )
975982
976- with concurrent .futures .ThreadPoolExecutor (max_workers = total_streams ) as pool :
977- try :
978- # Manually submit jobs and wait for download to complete rather
979- # than using pool.map because pool.map continues running in the
980- # background even if there is an exception on the main thread.
981- # See: https://github.com/googleapis/google-cloud-python/pull/7698
982- not_done = [
983- pool .submit (
984- _download_table_bqstorage_stream ,
985- download_state ,
986- bqstorage_client ,
987- session ,
988- stream ,
989- worker_queue ,
990- page_to_item ,
991- )
992- for stream in session .streams
993- ]
994-
995- while not_done :
996- # Don't block on the worker threads. For performance reasons,
997- # we want to block on the queue's get method, instead. This
998- # prevents the queue from filling up, because the main thread
999- # has smaller gaps in time between calls to the queue's get
1000- # method. For a detailed explanation, see:
1001- # https://friendliness.dev/2019/06/18/python-nowait/
1002- done , not_done = _nowait (not_done )
1003- for future in done :
1004- # Call result() on any finished threads to raise any
1005- # exceptions encountered.
1006- future .result ()
983+ # Manage the pool manually (instead of a `with` block) so that, after a timeout, shutdown can skip waiting for the worker threads.
984+ pool = concurrent .futures .ThreadPoolExecutor (max_workers = max (1 , total_streams ))
985+ wait_on_shutdown = True
986+ start_time = time .time ()
1007987
1008- try :
1009- frame = worker_queue .get (timeout = _PROGRESS_INTERVAL )
1010- yield frame
1011- except queue .Empty : # pragma: NO COVER
1012- continue
988+ try :
989+ # Manually submit jobs and wait for download to complete rather
990+ # than using pool.map because pool.map continues running in the
991+ # background even if there is an exception on the main thread.
992+ # See: https://github.com/googleapis/google-cloud-python/pull/7698
993+ not_done = [
994+ pool .submit (
995+ _download_table_bqstorage_stream ,
996+ download_state ,
997+ bqstorage_client ,
998+ session ,
999+ stream ,
1000+ worker_queue ,
1001+ page_to_item ,
1002+ )
1003+ for stream in session .streams
1004+ ]
1005+
1006+ while not_done :
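1007+ # Enforce the overall deadline. Checked once per loop iteration, so expiry is only noticed after the queue wait below returns.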
1008+ if timeout is not None :
1009+ elapsed = time .time () - start_time
1010+ if elapsed > timeout :
1011+ wait_on_shutdown = False
1012+ raise concurrent .futures .TimeoutError (
1013+ f"Download timed out after { timeout } seconds."
1014+ )
1015+
1016+ # Don't block on the worker threads. For performance reasons,
1017+ # we want to block on the queue's get method, instead. This
1018+ # prevents the queue from filling up, because the main thread
1019+ # has smaller gaps in time between calls to the queue's get
1020+ # method. For a detailed explanation, see:
1021+ # https://friendliness.dev/2019/06/18/python-nowait/
1022+ done , not_done = _nowait (not_done )
1023+ for future in done :
1024+ # Call result() on any finished threads to raise any
1025+ # exceptions encountered.
1026+ future .result ()
1027+
1028+ try :
1029+ frame = worker_queue .get (timeout = _PROGRESS_INTERVAL )
1030+ yield frame
1031+ except queue .Empty : # pragma: NO COVER
1032+ continue
10131033
1014- # Return any remaining values after the workers finished.
1015- while True : # pragma: NO COVER
1016- try :
1017- frame = worker_queue .get_nowait ()
1018- yield frame
1019- except queue .Empty : # pragma: NO COVER
1020- break
1021- finally :
1022- # No need for a lock because reading/replacing a variable is
1023- # defined to be an atomic operation in the Python language
1024- # definition (enforced by the global interpreter lock).
1025- download_state .done = True
1034+ # Return any remaining values after the workers finished.
1035+ while True : # pragma: NO COVER
1036+ try :
1037+ frame = worker_queue .get_nowait ()
1038+ yield frame
1039+ except queue .Empty : # pragma: NO COVER
1040+ break
1041+ finally :
1042+ # No need for a lock because reading/replacing a variable is
1043+ # defined to be an atomic operation in the Python language
1044+ # definition (enforced by the global interpreter lock).
1045+ download_state .done = True
10261046
1027- # Shutdown all background threads, now that they should know to
1028- # exit early.
1029- pool .shutdown (wait = True )
1047+ # Shut down all background threads, now that they should know to
1048+ # exit early.
1049+ pool .shutdown (wait = wait_on_shutdown )
10301050
10311051
10321052def download_arrow_bqstorage (
@@ -1037,6 +1057,7 @@ def download_arrow_bqstorage(
10371057 selected_fields = None ,
10381058 max_queue_size = _MAX_QUEUE_SIZE_DEFAULT ,
10391059 max_stream_count = None ,
1060+ timeout = None ,
10401061):
10411062 return _download_table_bqstorage (
10421063 project_id ,
@@ -1047,6 +1068,7 @@ def download_arrow_bqstorage(
10471068 page_to_item = _bqstorage_page_to_arrow ,
10481069 max_queue_size = max_queue_size ,
10491070 max_stream_count = max_stream_count ,
1071+ timeout = timeout ,
10501072 )
10511073
10521074
@@ -1060,6 +1082,7 @@ def download_dataframe_bqstorage(
10601082 selected_fields = None ,
10611083 max_queue_size = _MAX_QUEUE_SIZE_DEFAULT ,
10621084 max_stream_count = None ,
1085+ timeout = None ,
10631086):
10641087 page_to_item = functools .partial (_bqstorage_page_to_dataframe , column_names , dtypes )
10651088 return _download_table_bqstorage (
@@ -1071,6 +1094,7 @@ def download_dataframe_bqstorage(
10711094 page_to_item = page_to_item ,
10721095 max_queue_size = max_queue_size ,
10731096 max_stream_count = max_stream_count ,
1097+ timeout = timeout ,
10741098 )
10751099
10761100
0 commit comments