Skip to content

Commit 98348e7

Browse files
committed
Fix excessive logging on P2P retry
1 parent 045dc64 commit 98348e7

File tree

2 files changed

+22
-6
lines changed

2 files changed

+22
-6
lines changed

distributed/shuffle/_core.py

+13-2
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,15 @@
66
import itertools
77
import pickle
88
import time
9-
from collections.abc import Callable, Generator, Hashable, Iterable, Iterator, Sequence
9+
from collections.abc import (
10+
Callable,
11+
Coroutine,
12+
Generator,
13+
Hashable,
14+
Iterable,
15+
Iterator,
16+
Sequence,
17+
)
1018
from concurrent.futures import ThreadPoolExecutor
1119
from dataclasses import dataclass, field
1220
from enum import Enum
@@ -212,8 +220,11 @@ async def send(
212220
else:
213221
shards_or_bytes = shards
214222

223+
def _send() -> Coroutine[Any, Any, None]:
224+
return self._send(address, shards_or_bytes)
225+
215226
return await retry(
216-
partial(self._send, address, shards_or_bytes),
227+
_send,
217228
count=self.RETRY_COUNT,
218229
delay_min=self.RETRY_DELAY_MIN,
219230
delay_max=self.RETRY_DELAY_MAX,

distributed/shuffle/tests/test_shuffle.py

+9-4
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
from distributed.utils import Deadline
6363
from distributed.utils_test import (
6464
async_poll_for,
65+
captured_logger,
6566
cluster,
6667
gen_cluster,
6768
gen_test,
@@ -2642,10 +2643,14 @@ async def test_flaky_connect_recover_with_retry(c, s, a, b):
26422643
x = dd.shuffle.shuffle(df, "x")
26432644

26442645
rpc = await FlakyConnectionPool(failing_connects=1)
2645-
2646-
with mock.patch.object(a, "rpc", rpc):
2647-
await c.compute(x)
2648-
assert rpc.failed_attempts == 1
2646+
with captured_logger("distributed.utils_comm") as caplog:
2647+
with mock.patch.object(a, "rpc", rpc):
2648+
await c.compute(x)
2649+
assert rpc.failed_attempts == 1
2650+
# Assert that we do not log the binary payload (or any other excessive amount of data)
2651+
logs = caplog.getvalue()
2652+
assert len(logs) < 200
2653+
assert "Retrying" in logs
26492654

26502655
await check_worker_cleanup(a)
26512656
await check_worker_cleanup(b)

0 commit comments

Comments
 (0)