From 241ab33bd39462803efdf2beae288032f5983868 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 30 Jan 2025 12:46:52 +0100 Subject: [PATCH 1/5] Increase work-stealing interval --- distributed/distributed.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml index d3ae4844c5b..a7624dd9eff 100644 --- a/distributed/distributed.yaml +++ b/distributed/distributed.yaml @@ -21,7 +21,7 @@ distributed: idle-timeout: null # Shut down after this duration, like "1h" or "30 minutes" no-workers-timeout: null # If a task remains unrunnable for longer than this, it fails. work-stealing: True # workers should steal tasks from each other - work-stealing-interval: 100ms # Callback time for work stealing + work-stealing-interval: 1s # Callback time for work stealing worker-saturation: 1.1 # Send this fraction of nthreads root tasks to workers rootish-taskgroup: 5 # number of dependencies of a rootish tg rootish-taskgroup-dependencies: 5 # number of dependencies of the dependencies of the rootish tg From 68a3eb1c916c7289d1d2a826dd5a94196b766ae7 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 30 Jan 2025 14:24:10 +0100 Subject: [PATCH 2/5] fix tests --- distributed/tests/test_steal.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 3976857c9e7..6181e02071a 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -104,7 +104,11 @@ async def test_steal_cheap_data_slow_computation(c, s, a, b): @pytest.mark.slow -@gen_cluster(client=True, nthreads=[("", 1)] * 2, config=NO_AMM) +@gen_cluster( + client=True, + nthreads=[("", 1)] * 2, + config={"distributed.scheduler.work-stealing-interval": "100ms", **NO_AMM}, +) async def test_steal_expensive_data_slow_computation(c, s, a, b): np = pytest.importorskip("numpy") @@ -974,7 +978,7 @@ async def test_lose_task(c, s, a, b): assert "Error" not in out -@pytest.mark.parametrize("interval, expected", [(None, 100), ("500ms", 500), (2, 2)]) +@pytest.mark.parametrize("interval, expected", [(None, 1000), ("500ms", 500), (2, 2)]) @gen_cluster(nthreads=[], config={"distributed.scheduler.work-stealing": False}) async def test_parse_stealing_interval(s, interval, expected): from distributed.scheduler import WorkStealing From d7adf6e1a94a873d3f82132acd8bffc5bca93308 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 30 Jan 2025 14:27:44 +0100 Subject: [PATCH 3/5] fix tests --- distributed/dashboard/tests/test_scheduler_bokeh.py | 4 +++- distributed/http/scheduler/tests/test_stealing_http.py | 8 ++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/distributed/dashboard/tests/test_scheduler_bokeh.py b/distributed/dashboard/tests/test_scheduler_bokeh.py index 44cd97fda56..a5e52db838c 100644 --- a/distributed/dashboard/tests/test_scheduler_bokeh.py +++ b/distributed/dashboard/tests/test_scheduler_bokeh.py @@ -137,7 +137,9 @@ async def test_counters(c, s, a, b): await asyncio.sleep(0.01) -@gen_cluster(client=True) +@gen_cluster( + client=True, config={"distributed.scheduler.work-stealing-interval": "100ms"} +) async def test_stealing_events(c, s, a, b): se = StealingEvents(s) diff --git a/distributed/http/scheduler/tests/test_stealing_http.py b/distributed/http/scheduler/tests/test_stealing_http.py index efd5589612c..860948afe3f 100644 --- a/distributed/http/scheduler/tests/test_stealing_http.py +++ b/distributed/http/scheduler/tests/test_stealing_http.py @@ -25,7 +25,9 @@ async def test_prometheus(c, s, a, b): assert active_metrics == expected_metrics -@gen_cluster(client=True) +@gen_cluster( + client=True, config={"distributed.scheduler.work-stealing-interval": "100ms"} +) async def test_prometheus_collect_count_total_by_cost_multipliers(c, s, a, b): pytest.importorskip("prometheus_client") @@ -58,7 +60,9 @@ async def fetch_metrics_by_cost_multipliers(): assert count == expected_count -@gen_cluster(client=True) +@gen_cluster( + client=True, config={"distributed.scheduler.work-stealing-interval": "100ms"} +) async def test_prometheus_collect_cost_total_by_cost_multipliers(c, s, a, b): pytest.importorskip("prometheus_client") From d466ff5421c919d64ce11f19365ae532c3b5be6b Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 30 Jan 2025 16:15:01 +0100 Subject: [PATCH 4/5] fix tests --- distributed/tests/test_steal.py | 117 ++++++++++++++++++++++++++------ 1 file changed, 97 insertions(+), 20 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 6181e02071a..925faee88b2 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -72,7 +72,11 @@ async def test_work_stealing(c, s, a, b): assert len(b.data) > 10 -@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 2) +@gen_cluster( + client=True, + nthreads=[("127.0.0.1", 1)] * 2, + config={"distributed.scheduler.work-stealing-interval": "100ms"}, +) async def test_dont_steal_expensive_data_fast_computation(c, s, a, b): np = pytest.importorskip("numpy") @@ -91,7 +95,11 @@ async def test_dont_steal_expensive_data_fast_computation(c, s, a, b): assert len(a.data) == 12 -@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 2) +@gen_cluster( + client=True, + nthreads=[("127.0.0.1", 1)] * 2, + config={"distributed.scheduler.work-stealing-interval": "100ms"}, +) async def test_steal_cheap_data_slow_computation(c, s, a, b): x = c.submit(slowinc, 100, delay=0.1) # learn that slowinc is slow await wait(x) @@ -125,7 +133,11 @@ async def test_steal_expensive_data_slow_computation(c, s, a, b): assert b.data # not empty -@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 10, config=NO_AMM) +@gen_cluster( + client=True, + nthreads=[("127.0.0.1", 1)] * 10, + config={"distributed.scheduler.work-stealing-interval": "100ms", **NO_AMM}, +) async def test_worksteal_many_thieves(c, s, *workers): x = c.submit(slowinc, -1, delay=0.1) await x @@ -287,7 +299,11 @@ async def test_eventually_steal_unknown_functions(c, s, a, b): @pytest.mark.skip(reason="") -@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3) +@gen_cluster( + client=True, + nthreads=[("127.0.0.1", 1)] * 3, + config={"distributed.scheduler.work-stealing-interval": "100ms"}, +) async def test_steal_related_tasks(e, s, a, b, c): futures = e.map( slowinc, range(20), delay=0.05, workers=a.address, allow_other_workers=True @@ -303,7 +319,11 @@ async def test_steal_related_tasks(e, s, a, b, c): assert nearby > 10 -@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 10) +@gen_cluster( + client=True, + nthreads=[("127.0.0.1", 1)] * 10, + config={"distributed.scheduler.work-stealing-interval": "100ms"}, +) async def test_dont_steal_fast_tasks_compute_time(c, s, *workers): def do_nothing(x, y=None): pass @@ -319,7 +339,11 @@ def do_nothing(x, y=None): assert len(s.workers[workers[0].address].has_what) == len(xs) + len(futures) -@gen_cluster(client=True, nthreads=[("", 1)]) +@gen_cluster( + client=True, + nthreads=[("", 1)], + config={"distributed.scheduler.work-stealing-interval": "100ms"}, +) async def test_dont_steal_fast_tasks_blocklist(c, s, a): async with BlockedGetData(s.address) as b: # create a dependency @@ -362,7 +386,11 @@ def fast_blocked(i, x): assert ts.who_has == {ws_a} -@gen_cluster(client=True, nthreads=[("", 1)], config=NO_AMM) +@gen_cluster( + client=True, + nthreads=[("", 1)], + config={"distributed.scheduler.work-stealing-interval": "100ms", **NO_AMM}, +) async def test_new_worker_steals(c, s, a): await wait(c.submit(slowinc, 1, delay=0.01)) @@ -384,8 +412,16 @@ async def test_new_worker_steals(c, s, a): assert b.data -@gen_cluster(client=True) -async def test_work_steal_allow_other_workers(c, s, a, b): +@gen_cluster( + client=True, + config={"distributed.scheduler.work-stealing-interval": "100ms"}, +) +async def test_work_steal_allow_other_workers( + c, + s, + a, + b, +): # Note: this test also verifies the baseline for all other tests below futures = c.map( slowinc, range(100), workers=a.address, allow_other_workers=True, delay=0.05 @@ -401,7 +437,10 @@ async def test_work_steal_allow_other_workers(c, s, a, b): assert result == sum(map(inc, range(100))) -@gen_cluster(client=True, nthreads=[("127.0.0.1", 1), ("127.0.0.1", 2)]) +@gen_cluster( + client=True, + nthreads=[("127.0.0.1", 1), ("127.0.0.1", 2)], +) async def test_dont_steal_worker_restrictions(c, s, a, b): futures = c.map(slowinc, range(100), delay=0.05, workers=a.address) @@ -508,8 +547,16 @@ async def test_steal_resource_restrictions(c, s, a): assert 20 < len(a.state.tasks) < 80 -@gen_cluster(client=True, nthreads=[("", 1, {"resources": {"A": 2, "C": 1}})]) -async def test_steal_resource_restrictions_asym_diff(c, s, a): +@gen_cluster( + client=True, + nthreads=[("", 1, {"resources": {"A": 2, "C": 1}})], + config={"distributed.scheduler.work-stealing-interval": "100ms"}, +) +async def test_steal_resource_restrictions_asym_diff( + c, + s, + a, +): # See https://github.com/dask/distributed/issues/5565 future = c.submit(slowinc, 1, delay=0.1, workers=a.address) await future @@ -545,7 +592,11 @@ def slow(x): assert max(durations) / min(durations) < 3 -@gen_cluster(client=True, nthreads=[("127.0.0.1", 4)] * 2) +@gen_cluster( + client=True, + nthreads=[("127.0.0.1", 4)] * 2, + config={"distributed.scheduler.work-stealing-interval": "100ms"}, +) async def test_dont_steal_executing_tasks(c, s, a, b): futures = c.map( slowinc, range(4), delay=0.1, workers=a.address, allow_other_workers=True @@ -801,8 +852,16 @@ async def test_restart(c, s, a, b): assert not any(x for L in steal.stealable.values() for x in L) -@gen_cluster(client=True) -async def test_steal_twice(c, s, a, b): +@gen_cluster( + client=True, + config={"distributed.scheduler.work-stealing-interval": "100ms"}, +) +async def test_steal_twice( + c, + s, + a, + b, +): x = c.submit(inc, 1, workers=a.address) await wait(x) @@ -845,7 +904,10 @@ async def test_steal_twice(c, s, a, b): @gen_cluster( client=True, nthreads=[("", 1)] * 3, - config={"distributed.worker.memory.pause": False}, + config={ + "distributed.worker.memory.pause": False, + "distributed.scheduler.work-stealing-interval": "100ms", + }, ) async def test_paused_workers_must_not_steal(c, s, w1, w2, w3): w2.status = Status.paused @@ -863,7 +925,10 @@ async def test_paused_workers_must_not_steal(c, s, w1, w2, w3): assert w3.data -@gen_cluster(client=True) +@gen_cluster( + client=True, + config={"distributed.scheduler.work-stealing-interval": "100ms"}, +) async def test_dont_steal_already_released(c, s, a, b): future = c.submit(slowinc, 1, delay=0.05, workers=a.address) key = future.key @@ -890,8 +955,17 @@ async def test_dont_steal_already_released(c, s, a, b): await asyncio.sleep(0.05) -@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 2) -async def test_dont_steal_long_running_tasks(c, s, a, b): +@gen_cluster( + client=True, + nthreads=[("127.0.0.1", 1)] * 2, + config={"distributed.scheduler.work-stealing-interval": "100ms"}, +) +async def test_dont_steal_long_running_tasks( + c, + s, + a, + b, +): def long(delay): with worker_client() as c: sleep(delay) @@ -995,7 +1069,10 @@ async def test_parse_stealing_interval(s, interval, expected): assert s.periodic_callbacks["stealing"].callback_time == expected -@gen_cluster(client=True) +@gen_cluster( + client=True, + config={"distributed.scheduler.work-stealing-interval": "100ms"}, +) async def test_balance_with_longer_task(c, s, a, b): np = pytest.importorskip("numpy") From 7133528bb3ffa58197814deffa77b6e9f0dfa712 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 30 Jan 2025 16:21:12 +0100 Subject: [PATCH 5/5] fix tests --- distributed/tests/test_steal.py | 27 ++++----------------------- 1 file changed, 4 insertions(+), 23 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 925faee88b2..d575e75c4a6 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -416,12 +416,7 @@ async def test_new_worker_steals(c, s, a): client=True, config={"distributed.scheduler.work-stealing-interval": "100ms"}, ) -async def test_work_steal_allow_other_workers( - c, - s, - a, - b, -): +async def test_work_steal_allow_other_workers(c, s, a, b): # Note: this test also verifies the baseline for all other tests below futures = c.map( slowinc, range(100), workers=a.address, allow_other_workers=True, delay=0.05 @@ -552,11 +547,7 @@ async def test_steal_resource_restrictions(c, s, a): nthreads=[("", 1, {"resources": {"A": 2, "C": 1}})], config={"distributed.scheduler.work-stealing-interval": "100ms"}, ) -async def test_steal_resource_restrictions_asym_diff( - c, - s, - a, -): +async def test_steal_resource_restrictions_asym_diff(c, s, a): # See https://github.com/dask/distributed/issues/5565 future = c.submit(slowinc, 1, delay=0.1, workers=a.address) await future @@ -856,12 +847,7 @@ async def test_restart(c, s, a, b): client=True, config={"distributed.scheduler.work-stealing-interval": "100ms"}, ) -async def test_steal_twice( - c, - s, - a, - b, -): +async def test_steal_twice(c, s, a, b): x = c.submit(inc, 1, workers=a.address) await wait(x) @@ -960,12 +946,7 @@ async def test_dont_steal_already_released(c, s, a, b): nthreads=[("127.0.0.1", 1)] * 2, config={"distributed.scheduler.work-stealing-interval": "100ms"}, ) -async def test_dont_steal_long_running_tasks( - c, - s, - a, - b, -): +async def test_dont_steal_long_running_tasks(c, s, a, b): def long(delay): with worker_client() as c: sleep(delay)