Skip to content

Commit a00fad5

Browse files
authored
Merge pull request #121 from SchmidtDSE/fix-streaming
Fix streaming on weak connection.
2 parents 7540550 + 8fb1a69 commit a00fad5

File tree

2 files changed

+23
-4
lines changed

2 files changed

+23
-4
lines changed

afscgap/flat_http.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,25 @@ def determine_matching_hauls_from_index(options: typing.Iterable[dict],
8888
return dict_stream
8989

9090

91+
def get_complete(iterator, url: str) -> typing.List[dict]:
92+
"""Get the complete payload from an Avro iterator.
93+
94+
Get the complete payload from an Avro iterator to avoid issues with streaming interruption on
95+
weaker connections.
96+
97+
Args:
98+
iterator: The iterator over Avro files read from the stream.
99+
url: The URL at which the Avro payload was found.
100+
101+
Returns:
102+
Iterable over the parsed Avro records.
103+
"""
104+
try:
105+
return list(iterator)
106+
except Exception as e:
107+
raise RuntimeError('Failed on %s (%s).' % (url, str(e)))
108+
109+
91110
def get_all_hauls(meta: afscgap.flat_model.ExecuteMetaParams) -> HAUL_KEYS:
92111
"""Get information about all hauls currently available.
93112
@@ -109,7 +128,7 @@ def get_all_hauls(meta: afscgap.flat_model.ExecuteMetaParams) -> HAUL_KEYS:
109128
afscgap.http_util.check_result(response)
110129

111130
stream = response.raw
112-
dict_stream = fastavro.reader(stream) # type: ignore
131+
dict_stream = get_complete(fastavro.reader(stream), url) # type: ignore
113132
obj_stream = map(build_haul_from_avro, dict_stream) # type: ignore
114133
return obj_stream
115134

@@ -139,7 +158,7 @@ def get_for_url(url):
139158
afscgap.http_util.check_result(response)
140159

141160
stream = response.raw
142-
all_with_value: typing.Iterable[dict] = fastavro.reader(stream) # type: ignore
161+
all_with_value = get_complete(fastavro.reader(stream), url)
143162
dict_stream = determine_matching_hauls_from_index(all_with_value, index_filter)
144163

145164
obj_stream = map(build_haul_from_avro, dict_stream)
@@ -170,6 +189,6 @@ def get_records_for_haul(meta: afscgap.flat_model.ExecuteMetaParams,
170189
afscgap.http_util.check_result(response)
171190

172191
stream = response.raw
173-
dict_stream = fastavro.reader(stream) # type: ignore
192+
dict_stream = get_complete(fastavro.reader(stream), url)
174193
obj_stream = map(lambda x: afscgap.flat_model.FlatRecord(x), dict_stream)
175194
return obj_stream

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "afscgap"
3-
version = "2.0.0"
3+
version = "2.0.1"
44
authors = [
55
{ name="A Samuel Pottinger", email="sam.pottinger@berkeley.edu" },
66
]

0 commit comments

Comments
 (0)