Skip to content

Commit a6d4816

Browse files
author
palkeo
committed
properly add receive_buffer_size everywhere
1 parent e988b56 commit a6d4816

File tree

2 files changed

+58
-15
lines changed

2 files changed

+58
-15
lines changed

tests/test_connection.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,7 @@ def test_client_open_url_options(open_websocket_mock):
311311
'extra_headers': [(b'X-Test-Header', b'My test header')],
312312
'message_queue_size': 9,
313313
'max_message_size': 333,
314+
'receive_buffer_size': 999,
314315
'connect_timeout': 36,
315316
'disconnect_timeout': 37,
316317
}

trio_websocket/_impl.py

Lines changed: 57 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
CONN_TIMEOUT = 60 # default connect & disconnect timeout, in seconds
3939
MESSAGE_QUEUE_SIZE = 1
4040
MAX_MESSAGE_SIZE = 2 ** 20 # 1 MiB
41-
DEFAULT_RECEIVE_BYTES = 4 * 2 ** 10 # 4 KiB
41+
RECEIVE_BYTES = 4 * 2 ** 10 # 4 KiB
4242
logger = logging.getLogger('trio-websocket')
4343

4444

@@ -81,6 +81,7 @@ def __exit__(self, ty, value, tb):
8181
async def open_websocket(host, port, resource, *, use_ssl, subprotocols=None,
8282
extra_headers=None,
8383
message_queue_size=MESSAGE_QUEUE_SIZE, max_message_size=MAX_MESSAGE_SIZE,
84+
receive_buffer_size=RECEIVE_BYTES,
8485
connect_timeout=CONN_TIMEOUT, disconnect_timeout=CONN_TIMEOUT):
8586
'''
8687
Open a WebSocket client connection to a host.
@@ -106,6 +107,9 @@ async def open_websocket(host, port, resource, *, use_ssl, subprotocols=None,
106107
:param int max_message_size: The maximum message size as measured by
107108
``len()``. If a message is received that is larger than this size,
108109
then the connection is closed with code 1009 (Message Too Big).
110+
:param Optional[int] receive_buffer_size: The buffer size we use to
111+
receive messages internally. None to let trio choose. Defaults
112+
to 4 KiB.
109113
:param float connect_timeout: The number of seconds to wait for the
110114
connection before timing out.
111115
:param float disconnect_timeout: The number of seconds to wait when closing
@@ -121,7 +125,8 @@ async def open_websocket(host, port, resource, *, use_ssl, subprotocols=None,
121125
resource, use_ssl=use_ssl, subprotocols=subprotocols,
122126
extra_headers=extra_headers,
123127
message_queue_size=message_queue_size,
124-
max_message_size=max_message_size)
128+
max_message_size=max_message_size,
129+
receive_buffer_size=receive_buffer_size)
125130
except trio.TooSlowError:
126131
raise ConnectionTimeout from None
127132
except OSError as e:
@@ -138,7 +143,8 @@ async def open_websocket(host, port, resource, *, use_ssl, subprotocols=None,
138143

139144
async def connect_websocket(nursery, host, port, resource, *, use_ssl,
140145
subprotocols=None, extra_headers=None,
141-
message_queue_size=MESSAGE_QUEUE_SIZE, max_message_size=MAX_MESSAGE_SIZE):
146+
message_queue_size=MESSAGE_QUEUE_SIZE, max_message_size=MAX_MESSAGE_SIZE,
147+
receive_buffer_size=RECEIVE_BYTES):
142148
'''
143149
Return an open WebSocket client connection to a host.
144150
@@ -166,6 +172,9 @@ async def connect_websocket(nursery, host, port, resource, *, use_ssl,
166172
:param int max_message_size: The maximum message size as measured by
167173
``len()``. If a message is received that is larger than this size,
168174
then the connection is closed with code 1009 (Message Too Big).
175+
:param Optional[int] receive_buffer_size: The buffer size we use to
176+
receive messages internally. None to let trio choose. Defaults
177+
to 4 KiB.
169178
:rtype: WebSocketConnection
170179
'''
171180
if use_ssl is True:
@@ -194,7 +203,8 @@ async def connect_websocket(nursery, host, port, resource, *, use_ssl,
194203
path=resource,
195204
client_subprotocols=subprotocols, client_extra_headers=extra_headers,
196205
message_queue_size=message_queue_size,
197-
max_message_size=max_message_size)
206+
max_message_size=max_message_size,
207+
receive_buffer_size=receive_buffer_size)
198208
nursery.start_soon(connection._reader_task)
199209
await connection._open_handshake.wait()
200210
return connection
@@ -203,6 +213,7 @@ async def connect_websocket(nursery, host, port, resource, *, use_ssl,
203213
def open_websocket_url(url, ssl_context=None, *, subprotocols=None,
204214
extra_headers=None,
205215
message_queue_size=MESSAGE_QUEUE_SIZE, max_message_size=MAX_MESSAGE_SIZE,
216+
receive_buffer_size=RECEIVE_BYTES,
206217
connect_timeout=CONN_TIMEOUT, disconnect_timeout=CONN_TIMEOUT):
207218
'''
208219
Open a WebSocket client connection to a URL.
@@ -226,6 +237,9 @@ def open_websocket_url(url, ssl_context=None, *, subprotocols=None,
226237
:param int max_message_size: The maximum message size as measured by
227238
``len()``. If a message is received that is larger than this size,
228239
then the connection is closed with code 1009 (Message Too Big).
240+
:param Optional[int] receive_buffer_size: The buffer size we use to
241+
receive messages internally. None to let trio choose. Defaults
242+
to 4 KiB.
229243
:param float connect_timeout: The number of seconds to wait for the
230244
connection before timing out.
231245
:param float disconnect_timeout: The number of seconds to wait when closing
@@ -239,12 +253,14 @@ def open_websocket_url(url, ssl_context=None, *, subprotocols=None,
239253
subprotocols=subprotocols, extra_headers=extra_headers,
240254
message_queue_size=message_queue_size,
241255
max_message_size=max_message_size,
256+
receive_buffer_size=receive_buffer_size,
242257
connect_timeout=connect_timeout, disconnect_timeout=disconnect_timeout)
243258

244259

245260
async def connect_websocket_url(nursery, url, ssl_context=None, *,
246261
subprotocols=None, extra_headers=None,
247-
message_queue_size=MESSAGE_QUEUE_SIZE, max_message_size=MAX_MESSAGE_SIZE):
262+
message_queue_size=MESSAGE_QUEUE_SIZE, max_message_size=MAX_MESSAGE_SIZE,
263+
receive_buffer_size=RECEIVE_BYTES):
248264
'''
249265
Return an open WebSocket client connection to a URL.
250266
@@ -269,13 +285,17 @@ async def connect_websocket_url(nursery, url, ssl_context=None, *,
269285
:param int max_message_size: The maximum message size as measured by
270286
``len()``. If a message is received that is larger than this size,
271287
then the connection is closed with code 1009 (Message Too Big).
288+
:param Optional[int] receive_buffer_size: The buffer size we use to
289+
receive messages internally. None to let trio choose. Defaults
290+
to 4 KiB.
272291
:rtype: WebSocketConnection
273292
'''
274293
host, port, resource, ssl_context = _url_to_host(url, ssl_context)
275294
return await connect_websocket(nursery, host, port, resource,
276295
use_ssl=ssl_context, subprotocols=subprotocols,
277296
extra_headers=extra_headers, message_queue_size=message_queue_size,
278-
max_message_size=max_message_size)
297+
max_message_size=max_message_size,
298+
receive_buffer_size=receive_buffer_size)
279299

280300

281301
def _url_to_host(url, ssl_context):
@@ -316,7 +336,8 @@ def _url_to_host(url, ssl_context):
316336

317337
async def wrap_client_stream(nursery, stream, host, resource, *,
318338
subprotocols=None, extra_headers=None,
319-
message_queue_size=MESSAGE_QUEUE_SIZE, max_message_size=MAX_MESSAGE_SIZE):
339+
message_queue_size=MESSAGE_QUEUE_SIZE, max_message_size=MAX_MESSAGE_SIZE,
340+
receive_buffer_size=RECEIVE_BYTES):
320341
'''
321342
Wrap an arbitrary stream in a WebSocket connection.
322343
@@ -340,21 +361,26 @@ async def wrap_client_stream(nursery, stream, host, resource, *,
340361
:param int max_message_size: The maximum message size as measured by
341362
``len()``. If a message is received that is larger than this size,
342363
then the connection is closed with code 1009 (Message Too Big).
364+
:param Optional[int] receive_buffer_size: The buffer size we use to
365+
receive messages internally. None to let trio choose. Defaults
366+
to 4 KiB.
343367
:rtype: WebSocketConnection
344368
'''
345369
connection = WebSocketConnection(stream,
346370
WSConnection(ConnectionType.CLIENT),
347371
host=host, path=resource,
348372
client_subprotocols=subprotocols, client_extra_headers=extra_headers,
349373
message_queue_size=message_queue_size,
350-
max_message_size=max_message_size)
374+
max_message_size=max_message_size,
375+
receive_buffer_size=receive_buffer_size)
351376
nursery.start_soon(connection._reader_task)
352377
await connection._open_handshake.wait()
353378
return connection
354379

355380

356381
async def wrap_server_stream(nursery, stream,
357-
message_queue_size=MESSAGE_QUEUE_SIZE, max_message_size=MAX_MESSAGE_SIZE):
382+
message_queue_size=MESSAGE_QUEUE_SIZE, max_message_size=MAX_MESSAGE_SIZE,
383+
receive_buffer_size=RECEIVE_BYTES):
358384
'''
359385
Wrap an arbitrary stream in a server-side WebSocket.
360386
@@ -368,21 +394,26 @@ async def wrap_server_stream(nursery, stream,
368394
:param int max_message_size: The maximum message size as measured by
369395
``len()``. If a message is received that is larger than this size,
370396
then the connection is closed with code 1009 (Message Too Big).
397+
:param Optional[int] receive_buffer_size: The buffer size we use to
398+
receive messages internally. None to let trio choose. Defaults
399+
to 4 KiB.
371400
:type stream: trio.abc.Stream
372401
:rtype: WebSocketRequest
373402
'''
374403
connection = WebSocketConnection(stream,
375404
WSConnection(ConnectionType.SERVER),
376405
message_queue_size=message_queue_size,
377-
max_message_size=max_message_size)
406+
max_message_size=max_message_size,
407+
receive_buffer_size=receive_buffer_size)
378408
nursery.start_soon(connection._reader_task)
379409
request = await connection._get_request()
380410
return request
381411

382412

383413
async def serve_websocket(handler, host, port, ssl_context, *,
384414
handler_nursery=None, message_queue_size=MESSAGE_QUEUE_SIZE,
385-
max_message_size=MAX_MESSAGE_SIZE, connect_timeout=CONN_TIMEOUT,
415+
max_message_size=MAX_MESSAGE_SIZE, receive_buffer_size=RECEIVE_BYTES,
416+
connect_timeout=CONN_TIMEOUT,
386417
disconnect_timeout=CONN_TIMEOUT, task_status=trio.TASK_STATUS_IGNORED):
387418
'''
388419
Serve a WebSocket over TCP.
@@ -412,6 +443,9 @@ async def serve_websocket(handler, host, port, ssl_context, *,
412443
:param int max_message_size: The maximum message size as measured by
413444
``len()``. If a message is received that is larger than this size,
414445
then the connection is closed with code 1009 (Message Too Big).
446+
:param Optional[int] receive_buffer_size: The buffer size we use to
447+
receive messages internally. None to let trio choose. Defaults
448+
to 4 KiB.
415449
:param float connect_timeout: The number of seconds to wait for a client
416450
to finish connection handshake before timing out.
417451
:param float disconnect_timeout: The number of seconds to wait for a client
@@ -427,7 +461,9 @@ async def serve_websocket(handler, host, port, ssl_context, *,
427461
listeners = await open_tcp_listeners()
428462
server = WebSocketServer(handler, listeners,
429463
handler_nursery=handler_nursery, message_queue_size=message_queue_size,
430-
max_message_size=max_message_size, connect_timeout=connect_timeout,
464+
max_message_size=max_message_size,
465+
receive_buffer_size=receive_buffer_size,
466+
connect_timeout=connect_timeout,
431467
disconnect_timeout=disconnect_timeout)
432468
await server.run(task_status=task_status)
433469

@@ -688,7 +724,7 @@ def __init__(self, stream, ws_connection, *, host=None, path=None,
688724
client_subprotocols=None, client_extra_headers=None,
689725
message_queue_size=MESSAGE_QUEUE_SIZE,
690726
max_message_size=MAX_MESSAGE_SIZE,
691-
receive_buffer_size=DEFAULT_RECEIVE_BYTES):
727+
receive_buffer_size=RECEIVE_BYTES):
692728
'''
693729
Constructor.
694730
@@ -1321,7 +1357,8 @@ class WebSocketServer:
13211357

13221358
def __init__(self, handler, listeners, *, handler_nursery=None,
13231359
message_queue_size=MESSAGE_QUEUE_SIZE,
1324-
max_message_size=MAX_MESSAGE_SIZE, connect_timeout=CONN_TIMEOUT,
1360+
max_message_size=MAX_MESSAGE_SIZE, receive_buffer_size=RECEIVE_BYTES,
1361+
connect_timeout=CONN_TIMEOUT,
13251362
disconnect_timeout=CONN_TIMEOUT):
13261363
'''
13271364
Constructor.
@@ -1338,6 +1375,9 @@ def __init__(self, handler, listeners, *, handler_nursery=None,
13381375
:param handler_nursery: An optional nursery to spawn connection tasks
13391376
inside of. If ``None``, then a new nursery will be created
13401377
internally.
1378+
:param Optional[int] receive_buffer_size: The buffer size we use to
1379+
receive messages internally. None to let trio choose. Defaults
1380+
to 4 KiB.
13411381
:param float connect_timeout: The number of seconds to wait for a client
13421382
to finish connection handshake before timing out.
13431383
:param float disconnect_timeout: The number of seconds to wait for a client
@@ -1350,6 +1390,7 @@ def __init__(self, handler, listeners, *, handler_nursery=None,
13501390
self._listeners = listeners
13511391
self._message_queue_size = message_queue_size
13521392
self._max_message_size = max_message_size
1393+
self._receive_buffer_size = receive_buffer_size
13531394
self._connect_timeout = connect_timeout
13541395
self._disconnect_timeout = disconnect_timeout
13551396

@@ -1432,7 +1473,8 @@ async def _handle_connection(self, stream):
14321473
connection = WebSocketConnection(stream,
14331474
WSConnection(ConnectionType.SERVER),
14341475
message_queue_size=self._message_queue_size,
1435-
max_message_size=self._max_message_size)
1476+
max_message_size=self._max_message_size,
1477+
receive_buffer_size=self._receive_buffer_size)
14361478
nursery.start_soon(connection._reader_task)
14371479
with trio.move_on_after(self._connect_timeout) as connect_scope:
14381480
request = await connection._get_request()

0 commit comments

Comments
 (0)