
Commit 50def75

Better logging for worker removal

1 parent a5a6e99 commit 50def75

4 files changed, +198 -28 lines

distributed/scheduler.py (+66 -24)
@@ -5182,14 +5182,6 @@ async def remove_worker(

         ws = self.workers[address]

-        event_msg = {
-            "action": "remove-worker",
-            "processing-tasks": {ts.key for ts in ws.processing},
-        }
-        self.log_event(address, event_msg.copy())
-        event_msg["worker"] = address
-        self.log_event("all", event_msg)
-
         logger.info(f"Remove worker {ws} ({stimulus_id=})")
         if close:
             with suppress(AttributeError, CommClosedError):
@@ -5220,6 +5212,7 @@ async def remove_worker(

         recommendations: Recs = {}

+        processing_keys = {ts.key for ts in ws.processing}
         for ts in list(ws.processing):
             k = ts.key
             recommendations[k] = "released"
@@ -5244,21 +5237,47 @@
                     worker=address,
                 )
                 recommendations.update(r)
-                logger.info(
+                logger.error(
                     "Task %s marked as failed because %d workers died"
                     " while trying to run it",
                     ts.key,
                     ts.suspicious,
                 )

+        recompute_keys = set()
+        lost_keys = set()
+
         for ts in list(ws.has_what):
             self.remove_replica(ts, ws)
             if not ts.who_has:
                 if ts.run_spec:
+                    recompute_keys.add(ts.key)
                     recommendations[ts.key] = "released"
                 else:  # pure data
+                    lost_keys.add(ts.key)
                     recommendations[ts.key] = "forgotten"

+        if recompute_keys:
+            logger.warning(
+                f"Removing worker {ws.address!r} caused the cluster to lose "
+                "already computed task(s), which will be recomputed elsewhere: "
+                f"{recompute_keys} ({stimulus_id=})"
+            )
+        if lost_keys:
+            logger.error(
+                f"Removing worker {ws.address!r} caused the cluster to lose scattered "
+                f"data, which can't be recovered: {lost_keys} ({stimulus_id=})"
+            )
+
+        event_msg = {
+            "action": "remove-worker",
+            "processing-tasks": processing_keys,
+            "lost-computed-tasks": recompute_keys,
+            "lost-scattered-tasks": lost_keys,
+            "stimulus_id": stimulus_id,
+        }
+        self.log_event(address, event_msg.copy())
+
         self.transitions(recommendations, stimulus_id=stimulus_id)

         awaitables = []
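
The `remove-worker` event is now logged under the removed worker's address with enough context to audit task loss after the fact. As a rough sketch (not part of this commit), it could be read back through the client's public event log; the addresses below are placeholders:

    from distributed import Client

    client = Client("tcp://127.0.0.1:8786")  # placeholder scheduler address

    # remove-worker events are filed under the former worker's address as topic
    for _timestamp, event in client.get_events("tcp://127.0.0.1:36789"):
        if event["action"] == "remove-worker":
            print("was processing: ", event["processing-tasks"])
            print("will recompute: ", event["lost-computed-tasks"])
            print("lost for good:  ", event["lost-scattered-tasks"])
            print("triggered by:   ", event["stimulus_id"])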
@@ -5827,6 +5846,7 @@ def handle_worker_status_change(
                 "action": "worker-status-change",
                 "prev-status": prev_status.name,
                 "status": ws.status.name,
+                "stimulus_id": stimulus_id,
             },
         )
         logger.debug(f"Worker status {prev_status.name} -> {status} - {ws}")
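
Because `worker-status-change` events now carry a `stimulus_id`, a listener can correlate a status flip with whatever triggered it. A hedged sketch using the client's topic subscription, assuming the documented handler shape for `Client.subscribe_topic` (addresses are placeholders):

    from distributed import Client

    client = Client("tcp://127.0.0.1:8786")  # placeholder scheduler address

    def on_event(event):
        _timestamp, msg = event
        if msg.get("action") == "worker-status-change":
            print(msg["prev-status"], "->", msg["status"], "because", msg["stimulus_id"])

    client.subscribe_topic("tcp://127.0.0.1:36789", on_event)  # worker's own topic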
@@ -7163,7 +7183,7 @@ async def retire_workers(
         try:
             coros = []
             for ws in wss:
-                logger.info("Retiring worker %s", ws.address)
+                logger.info(f"Retiring worker {ws.address!r} ({stimulus_id=!r})")

                 policy = RetireWorker(ws.address)
                 amm.add_policy(policy)
@@ -7200,19 +7220,37 @@ async def retire_workers(
                 # time (depending on interval settings)
                 amm.run_once()

-            workers_info = {
-                addr: info
-                for addr, info in await asyncio.gather(*coros)
-                if addr is not None
-            }
+            workers_info_ok = {}
+            workers_info_abort = {}
+            for addr, result, info in await asyncio.gather(*coros):
+                if result == "OK":
+                    workers_info_ok[addr] = info
+                else:
+                    workers_info_abort[addr] = info
+
         finally:
             if stop_amm:
                 amm.stop()

-        self.log_event("all", {"action": "retire-workers", "workers": workers_info})
-        self.log_event(list(workers_info), {"action": "retired"})
+        self.log_event(
+            "all",
+            {
+                "action": "retire-workers",
+                "retired": workers_info_ok,
+                "could-not-retire": workers_info_abort,
+                "stimulus_id": stimulus_id,
+            },
+        )
+        self.log_event(
+            list(workers_info_ok),
+            {"action": "retired", "stimulus_id": stimulus_id},
+        )
+        self.log_event(
+            list(workers_info_abort),
+            {"action": "could-not-retire", "stimulus_id": stimulus_id},
+        )

-        return workers_info
+        return workers_info_ok

     async def _track_retire_worker(
         self,
@@ -7222,7 +7260,7 @@ async def _track_retire_worker(
         close: bool,
         remove: bool,
         stimulus_id: str,
-    ) -> tuple[str | None, dict]:
+    ) -> tuple[str, Literal["OK", "no-recipients"], dict]:
         while not policy.done():
             # Sleep 0.01s when there are 4 tasks or less
             # Sleep 0.5s when there are 200 or more
@@ -7240,10 +7278,14 @@
                     "stimulus_id": stimulus_id,
                 }
             )
-            return None, {}
+            logger.warning(
+                f"Could not retire worker {ws.address!r}: unique data could not be "
+                f"moved to any other worker ({stimulus_id=!r})"
+            )
+            return ws.address, "no-recipients", ws.identity()

         logger.debug(
-            "All unique keys on worker %s have been replicated elsewhere", ws.address
+            f"All unique keys on worker {ws.address!r} have been replicated elsewhere"
         )

         if remove:
@@ -7253,8 +7295,8 @@
         elif close:
             self.close_worker(ws.address)

-        logger.info("Retired worker %s", ws.address)
-        return ws.address, ws.identity()
+        logger.info(f"Retired worker {ws.address!r} ({stimulus_id=!r})")
+        return ws.address, "OK", ws.identity()

     def add_keys(
         self, worker: str, keys: Collection[Key] = (), stimulus_id: str | None = None
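
Taken together, these hunks change the contract of `_track_retire_worker` from `(address | None, info)` to `(address, status, info)`, so `retire_workers` now returns only the workers it actually retired; aborted retirements surface in the `could-not-retire` event instead. A minimal calling sketch under that assumption (`s` stands for a live `Scheduler`; the address is a placeholder):

    # Sketch only; runs inside the scheduler's event loop.
    retired = await s.retire_workers(
        ["tcp://127.0.0.1:36789"], stimulus_id="downscale"
    )
    if not retired:
        # Nothing was drained: the AMM found no recipient for the worker's
        # unique data, and a "could-not-retire" event was logged instead.
        print("retirement aborted; worker left running")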
@@ -7390,7 +7432,7 @@ async def feed(
     def log_worker_event(
         self, worker: str, topic: str | Collection[str], msg: Any
     ) -> None:
-        if isinstance(msg, dict):
+        if isinstance(msg, dict) and worker != topic:
             msg["worker"] = worker
         self.log_event(topic, msg)
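
The added `worker != topic` guard keeps a worker's own event stream free of a redundant `worker` field while still stamping it when the event fans out to other topics. A standalone illustration (the `annotate` helper is hypothetical, not part of distributed):

    def annotate(worker: str, topic: str, msg: dict) -> dict:
        # Hypothetical mirror of the guard in Scheduler.log_worker_event:
        # only stamp the origin when filing under a different topic.
        if worker != topic:
            msg["worker"] = worker
        return msg

    assert "worker" not in annotate("tcp://w1:80", "tcp://w1:80", {"action": "ping"})
    assert annotate("tcp://w1:80", "all", {"action": "ping"})["worker"] == "tcp://w1:80"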
distributed/tests/test_client.py (+1 -1)
@@ -5196,7 +5196,7 @@ def test_quiet_client_close(loop):
         threads_per_worker=4,
     ) as c:
         futures = c.map(slowinc, range(1000), delay=0.01)
-        sleep(0.200)  # stop part-way
+        sleep(0.2)  # stop part-way
         sleep(0.1)  # let things settle

         out = logger.getvalue()

distributed/tests/test_scheduler.py (+1 -1)
@@ -3047,7 +3047,7 @@ async def connect(self, *args, **kwargs):


 @gen_cluster(client=True)
-async def test_gather_failing_cnn_recover(c, s, a, b):
+async def test_gather_failing_can_recover(c, s, a, b):
     x = await c.scatter({"x": 1}, workers=a.address)
     rpc = await FlakyConnectionPool(failing_connections=1)
     with mock.patch.object(s, "rpc", rpc), dask.config.set(

distributed/tests/test_worker.py (+130 -2)
@@ -2940,32 +2940,160 @@ async def test_worker_status_sync(s, a):
         await asyncio.sleep(0.01)

     events = [ev for _, ev in s.events[ws.address] if ev["action"] != "heartbeat"]
+    for ev in events:
+        if "stimulus_id" in ev:  # Strip timestamp
+            ev["stimulus_id"] = ev["stimulus_id"].rsplit("-", 1)[0]
+
     assert events == [
         {"action": "add-worker"},
         {
             "action": "worker-status-change",
             "prev-status": "init",
             "status": "running",
+            "stimulus_id": "worker-status-change",
         },
         {
             "action": "worker-status-change",
             "prev-status": "running",
             "status": "paused",
+            "stimulus_id": "worker-status-change",
         },
         {
             "action": "worker-status-change",
             "prev-status": "paused",
             "status": "running",
+            "stimulus_id": "worker-status-change",
         },
         {
             "action": "worker-status-change",
             "prev-status": "running",
             "status": "closing_gracefully",
+            "stimulus_id": "retire-workers",
+        },
+        {
+            "action": "remove-worker",
+            "lost-computed-tasks": set(),
+            "lost-scattered-tasks": set(),
+            "processing-tasks": set(),
+            "stimulus_id": "retire-workers",
         },
-        {"action": "remove-worker", "processing-tasks": set()},
-        {"action": "retired"},
+        {"action": "retired", "stimulus_id": "retire-workers"},
+    ]
+
+
+@gen_cluster(client=True)
+async def test_log_remove_worker(c, s, a, b):
+    # Computed task
+    x = c.submit(inc, 1, key="x", workers=a.address)
+    await x
+    ev = Event()
+    # Processing task
+    y = c.submit(
+        lambda ev: ev.wait(), ev, key="y", workers=a.address, allow_other_workers=True
+    )
+    await wait_for_state("y", "processing", s)
+    # Scattered task
+    z = await c.scatter({"z": 3}, workers=a.address)
+
+    s.events.clear()
+
+    with captured_logger("distributed.scheduler", level=logging.INFO) as log:
+        # Successful graceful shutdown
+        await s.retire_workers([a.address], stimulus_id="graceful")
+        # Refuse to retire gracefully as there's nowhere to put x and z
+        await s.retire_workers([b.address], stimulus_id="graceful_abort")
+        await asyncio.sleep(0.2)
+        # Ungraceful shutdown
+        await s.remove_worker(b.address, stimulus_id="ungraceful")
+        await asyncio.sleep(0.2)
+        await ev.set()
+
+    assert log.getvalue().splitlines() == [
+        # Successful graceful
+        f"Retiring worker '{a.address}' (stimulus_id='graceful')",
+        f"Remove worker <WorkerState '{a.address}', name: 0, status: "
+        "closing_gracefully, memory: 2, processing: 1> (stimulus_id='graceful')",
+        f"Retired worker '{a.address}' (stimulus_id='graceful')",
+        # Aborted graceful
+        f"Retiring worker '{b.address}' (stimulus_id='graceful_abort')",
+        f"Could not retire worker '{b.address}': unique data could not be "
+        "moved to any other worker (stimulus_id='graceful_abort')",
+        # Ungraceful
+        f"Remove worker <WorkerState '{b.address}', name: 1, status: "
+        "running, memory: 2, processing: 1> (stimulus_id='ungraceful')",
+        f"Removing worker '{b.address}' caused the cluster to lose already "
+        "computed task(s), which will be recomputed elsewhere: {'x'} "
+        "(stimulus_id='ungraceful')",
+        f"Removing worker '{b.address}' caused the cluster to lose scattered "
+        "data, which can't be recovered: {'z'} (stimulus_id='ungraceful')",
+        "Lost all workers",
     ]

+    events = {topic: [ev for _, ev in evs] for topic, evs in s.events.items()}
+    for evs in events.values():
+        for ev in evs:
+            if ev["action"] == "retire-workers":
+                for k in ("retired", "could-not-retire"):
+                    ev[k] = {addr: "snip" for addr in ev[k]}
+            if "stimulus_id" in ev:  # Strip timestamp
+                ev["stimulus_id"] = ev["stimulus_id"].rsplit("-", 1)[0]
+
+    assert events == {
+        a.address: [
+            {
+                "action": "worker-status-change",
+                "prev-status": "running",
+                "status": "closing_gracefully",
+                "stimulus_id": "graceful",
+            },
+            {
+                "action": "remove-worker",
+                "lost-computed-tasks": set(),
+                "lost-scattered-tasks": set(),
+                "processing-tasks": {"y"},
+                "stimulus_id": "graceful",
+            },
+            {"action": "retired", "stimulus_id": "graceful"},
+        ],
+        b.address: [
+            {
+                "action": "worker-status-change",
+                "prev-status": "running",
+                "status": "closing_gracefully",
+                "stimulus_id": "graceful_abort",
+            },
+            {"action": "could-not-retire", "stimulus_id": "graceful_abort"},
+            {
+                "action": "worker-status-change",
+                "prev-status": "closing_gracefully",
+                "status": "running",
+                "stimulus_id": "worker-status-change",
+            },
+            {
+                "action": "remove-worker",
+                "lost-computed-tasks": {"x"},
+                "lost-scattered-tasks": {"z"},
+                "processing-tasks": {"y"},
+                "stimulus_id": "ungraceful",
+            },
+            {"action": "closing-worker", "reason": "scheduler-remove-worker"},
+        ],
+        "all": [
+            {
+                "action": "retire-workers",
+                "stimulus_id": "graceful",
+                "retired": {a.address: "snip"},
+                "could-not-retire": {},
+            },
+            {
+                "action": "retire-workers",
+                "stimulus_id": "graceful_abort",
+                "retired": {},
+                "could-not-retire": {b.address: "snip"},
+            },
+        ],
+    }
+

 @gen_cluster(client=True)
 async def test_task_flight_compute_oserror(c, s, a, b):
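
Both tests normalize `stimulus_id` before comparing because, at runtime, the scheduler suffixes it with a Unix timestamp. A small illustration of the `rsplit` trick used above (the concrete ID is invented):

    stimulus_id = "retire-workers-1654012345.67"  # hypothetical runtime value
    assert stimulus_id.rsplit("-", 1)[0] == "retire-workers"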
