Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure large payload can be serialized and sent over comms #8507

Merged
7 commits merged on Feb 16, 2024 (source and target branch names not captured in this text export)
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions distributed/comm/tcp.py
Original file line number Diff line number Diff line change
@@ -44,14 +44,14 @@
logger = logging.getLogger(__name__)


# Workaround for OpenSSL 1.0.2.
# Can drop with OpenSSL 1.1.1 used by Python 3.10+.
# ref: https://bugs.python.org/issue42853
if sys.version_info < (3, 10):
OPENSSL_MAX_CHUNKSIZE = 256 ** ctypes.sizeof(ctypes.c_int) // 2 - 1
else:
OPENSSL_MAX_CHUNKSIZE = 256 ** ctypes.sizeof(ctypes.c_size_t) - 1

# We must not load more than this into a buffer at a time
# It's currently unclear why that is
# see
# - https://github.com/dask/distributed/pull/5854
# - https://bugs.python.org/issue42853
# - https://github.com/dask/distributed/pull/8507

C_INT_MAX = 256 ** ctypes.sizeof(ctypes.c_int) // 2 - 1

Check warning on line 54 in distributed/comm/tcp.py

Codecov / codecov/patch

distributed/comm/tcp.py#L54

Added line #L54 was not covered by tests
MAX_BUFFER_SIZE = MEMORY_LIMIT / 2


@@ -286,8 +286,8 @@
2,
range(
0,
each_frame_nbytes + OPENSSL_MAX_CHUNKSIZE,
OPENSSL_MAX_CHUNKSIZE,
each_frame_nbytes + C_INT_MAX,
C_INT_MAX,
),
):
chunk = each_frame[i:j]
@@ -360,7 +360,7 @@

for i, j in sliding_window(
2,
range(0, n + OPENSSL_MAX_CHUNKSIZE, OPENSSL_MAX_CHUNKSIZE),
range(0, n + C_INT_MAX, C_INT_MAX),
):
chunk = buf[i:j]
actual = await stream.read_into(chunk) # type: ignore[arg-type]
@@ -432,7 +432,8 @@
A TLS-specific version of TCP.
"""

max_shard_size = min(OPENSSL_MAX_CHUNKSIZE, TCP.max_shard_size)
# Workaround for OpenSSL 1.0.2 (can drop with OpenSSL 1.1.1)
max_shard_size = min(C_INT_MAX, TCP.max_shard_size)

Check warning on line 436 in distributed/comm/tcp.py

Codecov / codecov/patch

distributed/comm/tcp.py#L436

Added line #L436 was not covered by tests

def _read_extra(self):
TCP._read_extra(self)
26 changes: 26 additions & 0 deletions distributed/protocol/tests/test_protocol.py
Original file line number Diff line number Diff line change
@@ -208,3 +208,29 @@ def test_fallback_to_pickle():
assert L[0].count(b"__Pickled__") == 1
assert L[0].count(b"__Serialized__") == 1
assert loads(L) == {np.int64(1): {2: "a"}, 3: ("b", "c"), 4: "d"}


@pytest.mark.slow
@pytest.mark.parametrize("typ", [bytes, str, "ext"])
def test_large_payload(typ):
critical_size = 2 * 1024**3
if typ == bytes:
large_payload = critical_size * b"0"
expected = large_payload
elif typ == str:
large_payload = critical_size * "0"
expected = large_payload
# Testing array and map dtypes is practically not possible since we'd have
# to create an actual list or dict object of critical size (i.e. not the
# content but the container itself). These are so large that msgpack is
# running forever
# elif typ == "array":
# large_payload = [b"0"] * critical_size
# expected = tuple(large_payload)
# elif typ == "map":
# large_payload = {x: b"0" for x in range(critical_size)}
# expected = large_payload
elif typ == "ext":
large_payload = msgpack.ExtType(1, b"0" * critical_size)
expected = large_payload
assert loads(dumps(large_payload)) == expected
4 changes: 1 addition & 3 deletions distributed/protocol/utils.py
Original file line number Diff line number Diff line change
@@ -12,9 +12,7 @@
BIG_BYTES_SHARD_SIZE = dask.utils.parse_bytes(dask.config.get("distributed.comm.shard"))


msgpack_opts = {
("max_%s_len" % x): 2**31 - 1 for x in ["str", "bin", "array", "map", "ext"]
}
msgpack_opts = {}

Check warning on line 15 in distributed/protocol/utils.py

Codecov / codecov/patch

distributed/protocol/utils.py#L15

Added line #L15 was not covered by tests
msgpack_opts["strict_map_key"] = False
msgpack_opts["raw"] = False

14 changes: 14 additions & 0 deletions distributed/tests/test_core.py
Original file line number Diff line number Diff line change
@@ -1481,3 +1481,17 @@ def sync_handler(val):
assert ledger == list(range(n))
finally:
await comm.close()


@pytest.mark.slow
@gen_test()
async def test_large_payload():
async with Server({"echo": echo_serialize}) as server:
await server.listen(0)

comm = await connect(server.address)
data = b"0" * 3 * 1024**3 # 3GB
await comm.write({"op": "echo", "x": to_serialize(data)})
response = await comm.read()
assert response["result"] == data
await comm.close()