Skip to content

Commit b03efee

Browse files
authored
Keep old dependencies on run_spec collision (#8512)
1 parent 207bbad commit b03efee

File tree

2 files changed

+72
-31
lines changed

2 files changed

+72
-31
lines changed

distributed/scheduler.py

+7-1
Original file line numberDiff line numberDiff line change
@@ -4787,12 +4787,18 @@ def _generate_taskstates(
47874787
# Additionally check dependency names. This should only be necessary
47884788
# if run_specs can't be tokenized deterministically.
47894789
deps_lhs = {dts.key for dts in ts.dependencies}
4790-
deps_rhs = dependencies.get(k, set())
4790+
deps_rhs = dependencies[k]
47914791

47924792
# FIXME It would be a really healthy idea to change this to a hard
47934793
# failure. However, this is not possible at the moment because of
47944794
# https://github.com/dask/dask/issues/9888
47954795
if tok_lhs != tok_rhs or deps_lhs != deps_rhs:
4796+
# Retain old run_spec and dependencies; rerun them if necessary.
4797+
# This sweeps the issue of collision under the carpet as long as the
4798+
# old and new task produce the same output - such as in
4799+
# dask/dask#9888.
4800+
dependencies[k] = deps_lhs
4801+
47964802
if ts.group not in tgs_with_bad_run_spec:
47974803
tgs_with_bad_run_spec.add(ts.group)
47984804
logger.warning(

distributed/tests/test_scheduler.py

+65-30
Original file line numberDiff line numberDiff line change
@@ -4704,36 +4704,74 @@ async def test_html_repr(c, s, a, b):
47044704
await f
47054705

47064706

4707-
@pytest.mark.parametrize("add_deps", [False, True])
4707+
@pytest.mark.parametrize("deps", ["same", "less", "more"])
47084708
@gen_cluster(client=True, nthreads=[])
4709-
async def test_resubmit_different_task_same_key(c, s, add_deps):
4709+
async def test_resubmit_different_task_same_key_before_previous_is_done(c, s, deps):
47104710
"""If an intermediate key has a different run_spec (either the callable function or
47114711
the dependencies / arguments) that will conflict with what was previously defined,
47124712
it should raise an error since this can otherwise break in many different places and
47134713
cause either spurious exceptions or even deadlocks.
47144714
4715+
In this specific test, the previous run_spec has not been computed yet.
4716+
See also test_resubmit_different_task_same_key_after_previous_is_done.
4717+
47154718
For a real world example where this can trigger, see
47164719
https://github.com/dask/dask/issues/9888
47174720
"""
4718-
y1 = c.submit(inc, 1, key="y")
4721+
x1 = c.submit(inc, 1, key="x1")
4722+
y_old = c.submit(inc, x1, key="y")
47194723

4720-
x = delayed(inc)(1, dask_key_name="x") if add_deps else 2
4721-
y2 = delayed(inc)(x, dask_key_name="y")
4722-
z = delayed(inc)(y2, dask_key_name="z")
4723-
4724-
if add_deps: # add_deps=True corrupts the state machine
4725-
s.validate = False
4724+
x1b = x1 if deps != "less" else 2
4725+
x2 = delayed(inc)(10, dask_key_name="x2") if deps == "more" else 11
4726+
y_new = delayed(sum)([x1b, x2], dask_key_name="y")
4727+
z = delayed(inc)(y_new, dask_key_name="z")
47264728

47274729
with captured_logger("distributed.scheduler", level=logging.WARNING) as log:
47284730
fut = c.compute(z)
47294731
await wait_for_state("z", "waiting", s)
47304732

47314733
assert "Detected different `run_spec` for key 'y'" in log.getvalue()
47324734

4733-
if not add_deps: # add_deps=True hangs
4734-
async with Worker(s.address):
4735-
assert await y1 == 2
4736-
assert await fut == 3
4735+
async with Worker(s.address):
4736+
# Used old run_spec
4737+
assert await y_old == 3
4738+
assert await fut == 4
4739+
4740+
4741+
@pytest.mark.parametrize("deps", ["same", "less", "more"])
4742+
@pytest.mark.parametrize("release_previous", [False, True])
4743+
@gen_cluster(client=True)
4744+
async def test_resubmit_different_task_same_key_after_previous_is_done(
4745+
c, s, a, b, deps, release_previous
4746+
):
4747+
"""Same as test_resubmit_different_task_same_key, but now the replaced task has
4748+
already been computed and is either in memory or released, and so are its old
4749+
dependencies, so they may need to be recomputed.
4750+
"""
4751+
x1 = delayed(inc)(1, dask_key_name="x1")
4752+
x1fut = c.compute(x1)
4753+
y_old = c.submit(inc, x1fut, key="y")
4754+
z1 = c.submit(inc, y_old, key="z1")
4755+
await wait(z1)
4756+
if release_previous:
4757+
del x1fut, y_old
4758+
await wait_for_state("x1", "released", s)
4759+
await wait_for_state("y", "released", s)
4760+
4761+
x1b = x1 if deps != "less" else 2
4762+
x2 = delayed(inc)(10, dask_key_name="x2") if deps == "more" else 11
4763+
y_new = delayed(sum)([x1b, x2], dask_key_name="y")
4764+
z2 = delayed(inc)(y_new, dask_key_name="z2")
4765+
4766+
with captured_logger("distributed.scheduler", level=logging.WARNING) as log:
4767+
fut = c.compute(z2)
4768+
# Used old run_spec
4769+
assert await fut == 4
4770+
assert "x2" not in s.tasks
4771+
4772+
# _generate_taskstates won't run for a dependency that's already in memory
4773+
has_warning = "Detected different `run_spec` for key 'y'" in log.getvalue()
4774+
assert has_warning is (release_previous or deps == "less")
47374775

47384776

47394777
@gen_cluster(client=True, nthreads=[])
@@ -4801,21 +4839,17 @@ async def test_resubmit_nondeterministic_task_different_deps(c, s, add_deps):
48014839
y2 = delayed(lambda i, j: i)(x2, o, dask_key_name="y")
48024840
z = delayed(inc)(y2, dask_key_name="z")
48034841

4804-
if add_deps: # add_deps=True corrupts the state machine and hangs
4805-
s.validate = False
4806-
48074842
with captured_logger("distributed.scheduler", level=logging.WARNING) as log:
48084843
fut = c.compute(z)
48094844
await wait_for_state("z", "waiting", s)
48104845
assert "Detected different `run_spec` for key 'y'" in log.getvalue()
48114846

4812-
if not add_deps: # add_deps=True corrupts the state machine and hangs
4813-
async with Worker(s.address):
4814-
assert await fut == 3
4847+
async with Worker(s.address):
4848+
assert await fut == 3
48154849

48164850

48174851
@pytest.mark.parametrize(
4818-
"loglevel,expect_loglines", [(logging.DEBUG, 3), (logging.WARNING, 1)]
4852+
"loglevel,expect_loglines", [(logging.DEBUG, 2), (logging.WARNING, 1)]
48194853
)
48204854
@gen_cluster(client=True, nthreads=[])
48214855
async def test_resubmit_different_task_same_key_warns_only_once(
@@ -4824,23 +4858,24 @@ async def test_resubmit_different_task_same_key_warns_only_once(
48244858
"""If all tasks of a layer are affected by the same run_spec collision, warn
48254859
only once.
48264860
"""
4827-
x1s = c.map(inc, [0, 1, 2], key=[("x", 0), ("x", 1), ("x", 2)])
4861+
y1s = c.map(inc, [0, 1, 2], key=[("y", 0), ("y", 1), ("y", 2)])
48284862
dsk = {
4829-
("x", 0): 3,
4830-
("x", 1): 4,
4831-
("x", 2): 5,
4832-
("y", 0): (inc, ("x", 0)),
4833-
("y", 1): (inc, ("x", 1)),
4834-
("y", 2): (inc, ("x", 2)),
4863+
"x": 3,
4864+
("y", 0): (inc, "x"), # run_spec and dependencies change
4865+
("y", 1): (inc, 4), # run_spec changes, dependencies don't
4866+
("y", 2): (inc, 2), # Doesn't change
4867+
("z", 0): (inc, ("y", 0)),
4868+
("z", 1): (inc, ("y", 1)),
4869+
("z", 2): (inc, ("y", 2)),
48354870
}
48364871
with captured_logger("distributed.scheduler", level=loglevel) as log:
4837-
ys = c.get(dsk, [("y", 0), ("y", 1), ("y", 2)], sync=False)
4838-
await wait_for_state(("y", 2), "waiting", s)
4872+
zs = c.get(dsk, [("z", 0), ("z", 1), ("z", 2)], sync=False)
4873+
await wait_for_state(("z", 2), "waiting", s)
48394874

48404875
actual_loglines = len(
48414876
re.findall("Detected different `run_spec` for key ", log.getvalue())
48424877
)
48434878
assert actual_loglines == expect_loglines
48444879

48454880
async with Worker(s.address):
4846-
assert await c.gather(ys) == [2, 3, 4]
4881+
assert await c.gather(zs) == [2, 3, 4] # Kept old ys

0 commit comments

Comments
 (0)