Increase default work-stealing interval by 10x #8997

Merged on Jan 30, 2025 (5 commits)
distributed/dashboard/tests/test_scheduler_bokeh.py (3 additions, 1 deletion)

@@ -137,7 +137,9 @@ async def test_counters(c, s, a, b):
         await asyncio.sleep(0.01)


-@gen_cluster(client=True)
+@gen_cluster(
+    client=True, config={"distributed.scheduler.work-stealing-interval": "100ms"}
+)
 async def test_stealing_events(c, s, a, b):
     se = StealingEvents(s)
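The same fix-up recurs throughout the test changes below: any test whose timing depends on frequent stealing pins the interval back to the old 100ms through gen_cluster's config argument, so only the default changes while the tests keep their original cadence. A minimal sketch of the pattern (the test name and body here are hypothetical, not part of this PR):

from distributed.utils_test import gen_cluster


@gen_cluster(
    client=True,
    config={"distributed.scheduler.work-stealing-interval": "100ms"},
)
async def test_steals_promptly(c, s, a, b):  # hypothetical example test
    # c is the client, s the scheduler, a and b the two workers; with the
    # override, the scheduler's stealing callback fires every 100ms.
    ...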
distributed/distributed.yaml (1 addition, 1 deletion)

@@ -21,7 +21,7 @@ distributed:
     idle-timeout: null # Shut down after this duration, like "1h" or "30 minutes"
     no-workers-timeout: null # If a task remains unrunnable for longer than this, it fails.
     work-stealing: True # workers should steal tasks from each other
-    work-stealing-interval: 100ms # Callback time for work stealing
+    work-stealing-interval: 1s # Callback time for work stealing
     worker-saturation: 1.1 # Send this fraction of nthreads root tasks to workers
     rootish-taskgroup: 5 # number of dependencies of a rootish tg
     rootish-taskgroup-dependencies: 5 # number of dependencies of the dependencies of the rootish tg
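This one-line YAML change is the heart of the PR: the scheduler's work-stealing callback now runs once per second instead of ten times per second. Deployments that prefer the old cadence can override the key explicitly; a minimal sketch using dask's config API:

import dask

# Restore the pre-PR interval of 100ms for this process. The same key can
# also be set in a user-level distributed.yaml or through dask's DASK_*
# environment-variable mechanism.
dask.config.set({"distributed.scheduler.work-stealing-interval": "100ms"})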
distributed/http/scheduler/tests/test_stealing_http.py (6 additions, 2 deletions)

@@ -25,7 +25,9 @@ async def test_prometheus(c, s, a, b):
     assert active_metrics == expected_metrics


-@gen_cluster(client=True)
+@gen_cluster(
+    client=True, config={"distributed.scheduler.work-stealing-interval": "100ms"}
+)
 async def test_prometheus_collect_count_total_by_cost_multipliers(c, s, a, b):
     pytest.importorskip("prometheus_client")

@@ -58,7 +60,9 @@ async def fetch_metrics_by_cost_multipliers():
         assert count == expected_count


-@gen_cluster(client=True)
+@gen_cluster(
+    client=True, config={"distributed.scheduler.work-stealing-interval": "100ms"}
+)
 async def test_prometheus_collect_cost_total_by_cost_multipliers(c, s, a, b):
     pytest.importorskip("prometheus_client")
distributed/tests/test_steal.py (80 additions, 18 deletions)

@@ -72,7 +72,11 @@ async def test_work_stealing(c, s, a, b):
     assert len(b.data) > 10


-@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 2)
+@gen_cluster(
+    client=True,
+    nthreads=[("127.0.0.1", 1)] * 2,
+    config={"distributed.scheduler.work-stealing-interval": "100ms"},
+)
 async def test_dont_steal_expensive_data_fast_computation(c, s, a, b):
     np = pytest.importorskip("numpy")

@@ -91,7 +95,11 @@ async def test_dont_steal_expensive_data_fast_computation(c, s, a, b):
     assert len(a.data) == 12


-@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 2)
+@gen_cluster(
+    client=True,
+    nthreads=[("127.0.0.1", 1)] * 2,
+    config={"distributed.scheduler.work-stealing-interval": "100ms"},
+)
 async def test_steal_cheap_data_slow_computation(c, s, a, b):
     x = c.submit(slowinc, 100, delay=0.1)  # learn that slowinc is slow
     await wait(x)
@@ -104,7 +112,11 @@ async def test_steal_cheap_data_slow_computation(c, s, a, b):


 @pytest.mark.slow
-@gen_cluster(client=True, nthreads=[("", 1)] * 2, config=NO_AMM)
+@gen_cluster(
+    client=True,
+    nthreads=[("", 1)] * 2,
+    config={"distributed.scheduler.work-stealing-interval": "100ms", **NO_AMM},
+)
 async def test_steal_expensive_data_slow_computation(c, s, a, b):
     np = pytest.importorskip("numpy")

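Tests that already passed config=NO_AMM, a distributed.utils_test constant that disables the Active Memory Manager, merge the stealing override in with dict unpacking rather than replacing the existing config. A sketch of the merge semantics, with an assumed one-key shape for NO_AMM:

# Assumed shape for illustration; the real NO_AMM lives in distributed.utils_test.
NO_AMM = {"distributed.scheduler.active-memory-manager.start": False}

config = {"distributed.scheduler.work-stealing-interval": "100ms", **NO_AMM}
# The keys are disjoint, so the merged dict simply carries both settings.
assert config["distributed.scheduler.work-stealing-interval"] == "100ms"
assert config["distributed.scheduler.active-memory-manager.start"] is False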
@@ -121,7 +133,11 @@ async def test_steal_expensive_data_slow_computation(c, s, a, b):
     assert b.data  # not empty


-@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 10, config=NO_AMM)
+@gen_cluster(
+    client=True,
+    nthreads=[("127.0.0.1", 1)] * 10,
+    config={"distributed.scheduler.work-stealing-interval": "100ms", **NO_AMM},
+)
 async def test_worksteal_many_thieves(c, s, *workers):
     x = c.submit(slowinc, -1, delay=0.1)
     await x
@@ -283,7 +299,11 @@ async def test_eventually_steal_unknown_functions(c, s, a, b):


 @pytest.mark.skip(reason="")
-@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
+@gen_cluster(
+    client=True,
+    nthreads=[("127.0.0.1", 1)] * 3,
+    config={"distributed.scheduler.work-stealing-interval": "100ms"},
+)
 async def test_steal_related_tasks(e, s, a, b, c):
     futures = e.map(
         slowinc, range(20), delay=0.05, workers=a.address, allow_other_workers=True
@@ -299,7 +319,11 @@ async def test_steal_related_tasks(e, s, a, b, c):
     assert nearby > 10


-@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 10)
+@gen_cluster(
+    client=True,
+    nthreads=[("127.0.0.1", 1)] * 10,
+    config={"distributed.scheduler.work-stealing-interval": "100ms"},
+)
 async def test_dont_steal_fast_tasks_compute_time(c, s, *workers):
     def do_nothing(x, y=None):
         pass
@@ -315,7 +339,11 @@ def do_nothing(x, y=None):
     assert len(s.workers[workers[0].address].has_what) == len(xs) + len(futures)


-@gen_cluster(client=True, nthreads=[("", 1)])
+@gen_cluster(
+    client=True,
+    nthreads=[("", 1)],
+    config={"distributed.scheduler.work-stealing-interval": "100ms"},
+)
 async def test_dont_steal_fast_tasks_blocklist(c, s, a):
     async with BlockedGetData(s.address) as b:
         # create a dependency
@@ -358,7 +386,11 @@ def fast_blocked(i, x):
         assert ts.who_has == {ws_a}


-@gen_cluster(client=True, nthreads=[("", 1)], config=NO_AMM)
+@gen_cluster(
+    client=True,
+    nthreads=[("", 1)],
+    config={"distributed.scheduler.work-stealing-interval": "100ms", **NO_AMM},
+)
 async def test_new_worker_steals(c, s, a):
     await wait(c.submit(slowinc, 1, delay=0.01))

@@ -380,7 +412,10 @@ async def test_new_worker_steals(c, s, a):
     assert b.data


-@gen_cluster(client=True)
+@gen_cluster(
+    client=True,
+    config={"distributed.scheduler.work-stealing-interval": "100ms"},
+)
 async def test_work_steal_allow_other_workers(c, s, a, b):
     # Note: this test also verifies the baseline for all other tests below
     futures = c.map(
@@ -397,7 +432,10 @@ async def test_work_steal_allow_other_workers(c, s, a, b):
     assert result == sum(map(inc, range(100)))


-@gen_cluster(client=True, nthreads=[("127.0.0.1", 1), ("127.0.0.1", 2)])
+@gen_cluster(
+    client=True,
+    nthreads=[("127.0.0.1", 1), ("127.0.0.1", 2)],
+)
 async def test_dont_steal_worker_restrictions(c, s, a, b):
     futures = c.map(slowinc, range(100), delay=0.05, workers=a.address)

@@ -504,7 +542,11 @@ async def test_steal_resource_restrictions(c, s, a):
     assert 20 < len(a.state.tasks) < 80


-@gen_cluster(client=True, nthreads=[("", 1, {"resources": {"A": 2, "C": 1}})])
+@gen_cluster(
+    client=True,
+    nthreads=[("", 1, {"resources": {"A": 2, "C": 1}})],
+    config={"distributed.scheduler.work-stealing-interval": "100ms"},
+)
 async def test_steal_resource_restrictions_asym_diff(c, s, a):
     # See https://github.com/dask/distributed/issues/5565
     future = c.submit(slowinc, 1, delay=0.1, workers=a.address)
@@ -541,7 +583,11 @@ def slow(x):
     assert max(durations) / min(durations) < 3


-@gen_cluster(client=True, nthreads=[("127.0.0.1", 4)] * 2)
+@gen_cluster(
+    client=True,
+    nthreads=[("127.0.0.1", 4)] * 2,
+    config={"distributed.scheduler.work-stealing-interval": "100ms"},
+)
 async def test_dont_steal_executing_tasks(c, s, a, b):
     futures = c.map(
         slowinc, range(4), delay=0.1, workers=a.address, allow_other_workers=True
@@ -797,7 +843,10 @@ async def test_restart(c, s, a, b):
     assert not any(x for L in steal.stealable.values() for x in L)


-@gen_cluster(client=True)
+@gen_cluster(
+    client=True,
+    config={"distributed.scheduler.work-stealing-interval": "100ms"},
+)
 async def test_steal_twice(c, s, a, b):
     x = c.submit(inc, 1, workers=a.address)
     await wait(x)
@@ -841,7 +890,10 @@ async def test_steal_twice(c, s, a, b):
 @gen_cluster(
     client=True,
     nthreads=[("", 1)] * 3,
-    config={"distributed.worker.memory.pause": False},
+    config={
+        "distributed.worker.memory.pause": False,
+        "distributed.scheduler.work-stealing-interval": "100ms",
+    },
 )
 async def test_paused_workers_must_not_steal(c, s, w1, w2, w3):
     w2.status = Status.paused
@@ -859,7 +911,10 @@ async def test_paused_workers_must_not_steal(c, s, w1, w2, w3):
     assert w3.data


-@gen_cluster(client=True)
+@gen_cluster(
+    client=True,
+    config={"distributed.scheduler.work-stealing-interval": "100ms"},
+)
 async def test_dont_steal_already_released(c, s, a, b):
     future = c.submit(slowinc, 1, delay=0.05, workers=a.address)
     key = future.key
@@ -886,7 +941,11 @@ async def test_dont_steal_already_released(c, s, a, b):
         await asyncio.sleep(0.05)


-@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 2)
+@gen_cluster(
+    client=True,
+    nthreads=[("127.0.0.1", 1)] * 2,
+    config={"distributed.scheduler.work-stealing-interval": "100ms"},
+)
 async def test_dont_steal_long_running_tasks(c, s, a, b):
     def long(delay):
         with worker_client() as c:
@@ -974,7 +1033,7 @@ async def test_lose_task(c, s, a, b):
     assert "Error" not in out


-@pytest.mark.parametrize("interval, expected", [(None, 100), ("500ms", 500), (2, 2)])
+@pytest.mark.parametrize("interval, expected", [(None, 1000), ("500ms", 500), (2, 2)])
 @gen_cluster(nthreads=[], config={"distributed.scheduler.work-stealing": False})
 async def test_parse_stealing_interval(s, interval, expected):
     from distributed.scheduler import WorkStealing
@@ -991,7 +1050,10 @@ async def test_parse_stealing_interval(s, interval, expected):
     assert s.periodic_callbacks["stealing"].callback_time == expected


-@gen_cluster(client=True)
+@gen_cluster(
+    client=True,
+    config={"distributed.scheduler.work-stealing-interval": "100ms"},
+)
 async def test_balance_with_longer_task(c, s, a, b):
     np = pytest.importorskip("numpy")

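The reparametrized test_parse_stealing_interval above pins down how the new default is interpreted. A minimal sketch of the parsing rule it implies (an assumption from the expected values: the setting goes through dask.utils.parse_timedelta with a millisecond default, and the periodic callback time is registered in milliseconds):

from dask.utils import parse_timedelta

# None falls back to the new "1s" default (1000ms); strings like "500ms"
# carry their own unit, and bare numbers such as 2 are read as milliseconds.
for raw, expected_ms in [("1s", 1000), ("500ms", 500), (2, 2)]:
    assert parse_timedelta(raw, default="ms") * 1000 == expected_ms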