Ensure large payload can be serialized and sent over comms #8507

Merged · 7 commits · Feb 16, 2024
25 changes: 13 additions & 12 deletions distributed/comm/tcp.py
@@ -44,14 +44,14 @@
logger = logging.getLogger(__name__)


-# Workaround for OpenSSL 1.0.2.
-# Can drop with OpenSSL 1.1.1 used by Python 3.10+.
-# ref: https://bugs.python.org/issue42853
-if sys.version_info < (3, 10):
-    OPENSSL_MAX_CHUNKSIZE = 256 ** ctypes.sizeof(ctypes.c_int) // 2 - 1
-else:
-    OPENSSL_MAX_CHUNKSIZE = 256 ** ctypes.sizeof(ctypes.c_size_t) - 1
+# We must not load more than this into a buffer at a time
+# It's currently unclear why that is
+# see
+# - https://github.com/dask/distributed/pull/5854
+# - https://bugs.python.org/issue42853
+# - https://github.com/dask/distributed/pull/8507
+C_INT_MAX = 256 ** ctypes.sizeof(ctypes.c_int) // 2 - 1

[Codecov / codecov/patch warning: added line distributed/comm/tcp.py#L54 was not covered by tests]
MAX_BUFFER_SIZE = MEMORY_LIMIT / 2


@@ -286,8 +286,8 @@
2,
range(
0,
-each_frame_nbytes + OPENSSL_MAX_CHUNKSIZE,
-OPENSSL_MAX_CHUNKSIZE,
+each_frame_nbytes + C_INT_MAX,
+C_INT_MAX,
),
):
chunk = each_frame[i:j]
@@ -360,7 +360,7 @@

for i, j in sliding_window(
2,
-range(0, n + OPENSSL_MAX_CHUNKSIZE, OPENSSL_MAX_CHUNKSIZE),
+range(0, n + C_INT_MAX, C_INT_MAX),
):
chunk = buf[i:j]
actual = await stream.read_into(chunk) # type: ignore[arg-type]
@@ -432,7 +432,8 @@
A TLS-specific version of TCP.
"""

-max_shard_size = min(OPENSSL_MAX_CHUNKSIZE, TCP.max_shard_size)
+# Workaround for OpenSSL 1.0.2 (can drop with OpenSSL 1.1.1)
+max_shard_size = min(C_INT_MAX, TCP.max_shard_size)

[Codecov / codecov/patch warning: added line distributed/comm/tcp.py#L436 was not covered by tests]

def _read_extra(self):
TCP._read_extra(self)
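The C_INT_MAX constant is used on both the write and the read paths above: each frame is walked in windows of at most C_INT_MAX bytes, so no single stream call handles more than roughly 2 GiB at once. A minimal sketch of that slicing pattern, assuming toolz's sliding_window (the import and the surrounding Tornado stream code sit outside this diff) and using a tiny stand-in limit so the example runs instantly:

import ctypes

from toolz import sliding_window

# Same formula as the diff: the largest positive C int, 2**31 - 1 on common platforms.
C_INT_MAX = 256 ** ctypes.sizeof(ctypes.c_int) // 2 - 1


def chunks(buf, limit=C_INT_MAX):
    """Yield consecutive slices of ``buf``, each at most ``limit`` bytes long."""
    n = len(buf)
    # range(0, n + limit, limit) lists the chunk boundaries; sliding_window(2, ...)
    # pairs them into (start, stop) so every slice stays within ``limit`` bytes.
    for i, j in sliding_window(2, range(0, n + limit, limit)):
        yield buf[i:j]


# Toy usage with a small limit so the example is cheap to run:
assert [bytes(c) for c in chunks(memoryview(b"x" * 10), limit=4)] == [
    b"xxxx",
    b"xxxx",
    b"xx",
]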
27 changes: 27 additions & 0 deletions distributed/protocol/tests/test_protocol.py
@@ -208,3 +208,30 @@ def test_fallback_to_pickle():
assert L[0].count(b"__Pickled__") == 1
assert L[0].count(b"__Serialized__") == 1
assert loads(L) == {np.int64(1): {2: "a"}, 3: ("b", "c"), 4: "d"}


@pytest.mark.slow
@pytest.mark.parametrize("typ", [bytes, str, "ext"])
def test_large_payload(typ):
"""See also: test_core.py::test_large_payload"""
critical_size = 2**31 + 1 # >2 GiB
if typ == bytes:
large_payload = critical_size * b"0"
expected = large_payload
elif typ == str:
large_payload = critical_size * "0"
expected = large_payload
# Testing array and map dtypes is practically not possible since we'd have
# to create an actual list or dict object of critical size (i.e. not the
# content but the container itself). These are so large that msgpack is
# running forever
# elif typ == "array":
# large_payload = [b"0"] * critical_size
# expected = tuple(large_payload)
# elif typ == "map":
# large_payload = {x: b"0" for x in range(critical_size)}
# expected = large_payload
elif typ == "ext":
large_payload = msgpack.ExtType(1, b"0" * critical_size)
expected = large_payload
assert loads(dumps(large_payload)) == expected
4 changes: 1 addition & 3 deletions distributed/protocol/utils.py
@@ -12,9 +12,7 @@
BIG_BYTES_SHARD_SIZE = dask.utils.parse_bytes(dask.config.get("distributed.comm.shard"))


-msgpack_opts = {
-    ("max_%s_len" % x): 2**31 - 1 for x in ["str", "bin", "array", "map", "ext"]
-}
+msgpack_opts = {}

[Codecov / codecov/patch warning: added line distributed/protocol/utils.py#L15 was not covered by tests]
msgpack_opts["strict_map_key"] = False
msgpack_opts["raw"] = False

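Previously msgpack_opts pinned every max_*_len to 2**31 - 1, which is precisely the cap that rejected payloads larger than 2 GiB; the diff keeps only strict_map_key and raw (set just above) and otherwise falls back to msgpack's defaults. A small sketch of how options in this style feed into msgpack; the pack/unpack calls are illustrative, not a claim about distributed's exact call sites:

import msgpack

# The two options utils.py still sets:
msgpack_opts = {
    "strict_map_key": False,  # allow non-str/bytes map keys, e.g. integers
    "raw": False,             # decode msgpack strings to str rather than bytes
}


def roundtrip(obj):
    """Pack and unpack ``obj`` using the option style kept in utils.py."""
    packed = msgpack.packb(obj, use_bin_type=True)
    # No explicit max_*_len caps here: the old 2**31 - 1 limits were what refused
    # >2 GiB payloads, so the PR evidently relies on msgpack's own defaults being
    # large enough (an assumption; the defaulting rules are not shown in this diff).
    return msgpack.unpackb(packed, use_list=False, **msgpack_opts)


# The integer map key only works because strict_map_key=False:
assert roundtrip({1: "a", "b": (2, 3)}) == {1: "a", "b": (2, 3)}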
25 changes: 25 additions & 0 deletions distributed/tests/test_core.py
@@ -2,6 +2,7 @@

import asyncio
import contextlib
import logging
import os
import random
import socket
@@ -1481,3 +1482,27 @@ def sync_handler(val):
        assert ledger == list(range(n))
    finally:
        await comm.close()


@pytest.mark.slow
@gen_test(timeout=180)
async def test_large_payload(caplog):
    """See also: protocol/tests/test_protocol.py::test_large_payload"""
    critical_size = 2**31 + 1  # >2 GiB
    data = b"0" * critical_size

    async with Server({"echo": echo_serialize}) as server:
        await server.listen(0)
        comm = await connect(server.address)

        # FIXME https://github.com/dask/distributed/issues/8465
        # At debug level, messages are dumped into the log. By default, pytest captures
        # all logs, which would make this test extremely expensive to run.
        with caplog.at_level(logging.INFO, logger="distributed.core"):
            # Note: if we wrap data in to_serialize, it will be sent as a buffer, which
            # is not encoded by msgpack.
            await comm.write({"op": "echo", "x": data})
            response = await comm.read()

        assert response["result"] == data
        await comm.close()