Skip to content

Commit d156473

Browse files
phofl and fjetter authored
Remove expensive tokenization for key uniqueness check (#9009)
* Remove expensive tokenization for key uniqueness check * Remove no longer relevant tests --------- Co-authored-by: Florian Jetter <[email protected]>
1 parent 33b229a commit d156473

File tree

2 files changed

+2
-190
lines changed

2 files changed

+2
-190
lines changed

distributed/scheduler.py

+2-16
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555
import dask
5656
import dask.utils
5757
from dask._task_spec import DependenciesMapping, GraphNode, convert_legacy_graph
58-
from dask.base import TokenizationError, normalize_token, tokenize
5958
from dask.core import istask, validate_key
6059
from dask.typing import Key, no_default
6160
from dask.utils import (
@@ -4985,25 +4984,14 @@ def _generate_taskstates(
49854984
# run_spec in the submitted graph may be None. This happens
49864985
# when an already persisted future is part of the graph
49874986
elif k in dsk:
4988-
# If both tokens are non-deterministic, skip comparison
4989-
try:
4990-
tok_lhs = tokenize(ts.run_spec, ensure_deterministic=True)
4991-
except TokenizationError:
4992-
tok_lhs = ""
4993-
try:
4994-
tok_rhs = tokenize(dsk[k], ensure_deterministic=True)
4995-
except TokenizationError:
4996-
tok_rhs = ""
4997-
4998-
# Additionally check dependency names. This should only be necessary
4999-
# if run_specs can't be tokenized deterministically.
4987+
# Check dependency names.
50004988
deps_lhs = {dts.key for dts in ts.dependencies}
50014989
deps_rhs = dependencies[k]
50024990

50034991
# FIXME It would be a really healthy idea to change this to a hard
50044992
# failure. However, this is not possible at the moment because of
50054993
# https://github.com/dask/dask/issues/9888
5006-
if tok_lhs != tok_rhs or deps_lhs != deps_rhs:
4994+
if deps_lhs != deps_rhs:
50074995
# Retain old run_spec and dependencies; rerun them if necessary.
50084996
# This sweeps the issue of collision under the carpet as long as the
50094997
# old and new task produce the same output - such as in
@@ -5029,8 +5017,6 @@ def _generate_taskstates(
50295017
old task state: {ts.state}
50305018
old run_spec: {ts.run_spec!r}
50315019
new run_spec: {dsk[k]!r}
5032-
old token: {normalize_token(ts.run_spec)!r}
5033-
new token: {normalize_token(dsk[k])!r}
50345020
old dependencies: {deps_lhs}
50355021
new dependencies: {deps_rhs}
50365022
"""

distributed/tests/test_scheduler.py

-174
Original file line numberDiff line numberDiff line change
@@ -4946,147 +4946,6 @@ async def test_html_repr(c, s, a, b):
49464946
await f
49474947

49484948

4949-
@pytest.mark.parametrize("deps", ["same", "less", "more"])
4950-
@gen_cluster(client=True, nthreads=[])
4951-
async def test_resubmit_different_task_same_key_before_previous_is_done(c, s, deps):
4952-
"""If an intermediate key has a different run_spec (either the callable function or
4953-
the dependencies / arguments) that will conflict with what was previously defined,
4954-
it should raise an error since this can otherwise break in many different places and
4955-
cause either spurious exceptions or even deadlocks.
4956-
4957-
In this specific test, the previous run_spec has not been computed yet.
4958-
See also test_resubmit_different_task_same_key_after_previous_is_done.
4959-
4960-
For a real world example where this can trigger, see
4961-
https://github.com/dask/dask/issues/9888
4962-
"""
4963-
seen = False
4964-
4965-
def _match(event):
4966-
_, msg = event
4967-
return (
4968-
isinstance(msg, dict)
4969-
and msg.get("action", None) == "update-graph"
4970-
and msg["metrics"]["key_collisions"] > 0
4971-
)
4972-
4973-
def handler(ev):
4974-
if _match(ev):
4975-
nonlocal seen
4976-
seen = True
4977-
4978-
c.subscribe_topic("scheduler", handler)
4979-
4980-
x1 = c.submit(inc, 1, key="x1")
4981-
y_old = c.submit(inc, x1, key="y")
4982-
4983-
x1b = x1 if deps != "less" else 2
4984-
x2 = delayed(inc)(10, dask_key_name="x2") if deps == "more" else 11
4985-
y_new = delayed(sum)([x1b, x2], dask_key_name="y")
4986-
z = delayed(inc)(y_new, dask_key_name="z")
4987-
4988-
with captured_logger("distributed.scheduler", level=logging.WARNING) as log:
4989-
fut = c.compute(z)
4990-
await wait_for_state("z", "waiting", s)
4991-
4992-
assert "Detected different `run_spec` for key 'y'" in log.getvalue()
4993-
4994-
await async_poll_for(lambda: seen, timeout=5)
4995-
4996-
async with Worker(s.address):
4997-
# Used old run_spec
4998-
assert await y_old == 3
4999-
assert await fut == 4
5000-
5001-
5002-
@pytest.mark.parametrize("deps", ["same", "less", "more"])
5003-
@pytest.mark.parametrize("release_previous", [False, True])
5004-
@gen_cluster(client=True)
5005-
async def test_resubmit_different_task_same_key_after_previous_is_done(
5006-
c, s, a, b, deps, release_previous
5007-
):
5008-
"""Same as test_resubmit_different_task_same_key, but now the replaced task has
5009-
already been computed and is either in memory or released, and so are its old
5010-
dependencies, so they may need to be recomputed.
5011-
"""
5012-
x1 = delayed(inc)(1, dask_key_name="x1")
5013-
x1fut = c.compute(x1)
5014-
y_old = c.submit(inc, x1fut, key="y")
5015-
z1 = c.submit(inc, y_old, key="z1")
5016-
await wait(z1)
5017-
if release_previous:
5018-
del x1fut, y_old
5019-
await wait_for_state("x1", "released", s)
5020-
await wait_for_state("y", "released", s)
5021-
5022-
x1b = x1 if deps != "less" else 2
5023-
x2 = delayed(inc)(10, dask_key_name="x2") if deps == "more" else 11
5024-
y_new = delayed(sum)([x1b, x2], dask_key_name="y")
5025-
z2 = delayed(inc)(y_new, dask_key_name="z2")
5026-
5027-
with captured_logger("distributed.scheduler", level=logging.WARNING) as log:
5028-
fut = c.compute(z2)
5029-
# Used old run_spec
5030-
assert await fut == 4
5031-
assert "x2" not in s.tasks
5032-
5033-
# _generate_taskstates won't run for a dependency that's already in memory
5034-
has_warning = "Detected different `run_spec` for key 'y'" in log.getvalue()
5035-
assert has_warning is (release_previous or deps == "less")
5036-
5037-
5038-
@gen_cluster(client=True, nthreads=[])
5039-
async def test_resubmit_different_task_same_key_many_clients(c, s):
5040-
"""Two different clients submit a task with the same key but different run_spec's."""
5041-
async with Client(s.address, asynchronous=True) as c2:
5042-
with captured_logger("distributed.scheduler", level=logging.WARNING) as log:
5043-
x1 = c.submit(inc, 1, key="x")
5044-
x2 = c2.submit(inc, 2, key="x")
5045-
5046-
await wait_for_state("x", ("no-worker", "queued"), s)
5047-
who_wants = s.tasks["x"].who_wants
5048-
await async_poll_for(
5049-
lambda: {cs.client_key for cs in who_wants} == {c.id, c2.id}, timeout=5
5050-
)
5051-
5052-
assert "Detected different `run_spec` for key 'x'" in log.getvalue()
5053-
5054-
async with Worker(s.address):
5055-
assert await x1 == 2
5056-
assert await x2 == 2 # kept old run_spec
5057-
5058-
5059-
@pytest.mark.parametrize(
5060-
"before,after,expect_msg",
5061-
[
5062-
(object(), 123, True),
5063-
(123, object(), True),
5064-
(o := object(), o, False),
5065-
],
5066-
)
5067-
@gen_cluster(client=True, nthreads=[])
5068-
async def test_resubmit_nondeterministic_task_same_deps(
5069-
c, s, before, after, expect_msg
5070-
):
5071-
"""Some run_specs can't be tokenized deterministically. Silently skip comparison on
5072-
the run_spec when both lhs and rhs are nondeterministic.
5073-
Dependencies must be the same.
5074-
"""
5075-
x1 = c.submit(lambda x: x, before, key="x")
5076-
x2 = delayed(lambda x: x)(after, dask_key_name="x")
5077-
y = delayed(lambda x: x)(x2, dask_key_name="y")
5078-
5079-
with captured_logger("distributed.scheduler", level=logging.WARNING) as log:
5080-
fut = c.compute(y)
5081-
await async_poll_for(lambda: "y" in s.tasks, timeout=5)
5082-
5083-
has_msg = "Detected different `run_spec` for key 'x'" in log.getvalue()
5084-
assert has_msg == expect_msg
5085-
5086-
async with Worker(s.address):
5087-
assert type(await fut) is type(before)
5088-
5089-
50904949
@pytest.mark.parametrize("add_deps", [False, True])
50914950
@gen_cluster(client=True, nthreads=[])
50924951
async def test_resubmit_nondeterministic_task_different_deps(c, s, add_deps):
@@ -5109,39 +4968,6 @@ async def test_resubmit_nondeterministic_task_different_deps(c, s, add_deps):
51094968
assert await fut == 3
51104969

51114970

5112-
@pytest.mark.parametrize(
5113-
"loglevel,expect_loglines", [(logging.DEBUG, 2), (logging.WARNING, 1)]
5114-
)
5115-
@gen_cluster(client=True, nthreads=[])
5116-
async def test_resubmit_different_task_same_key_warns_only_once(
5117-
c, s, loglevel, expect_loglines
5118-
):
5119-
"""If all tasks of a layer are affected by the same run_spec collision, warn
5120-
only once.
5121-
"""
5122-
y1s = c.map(inc, [0, 1, 2], key=[("y", 0), ("y", 1), ("y", 2)])
5123-
dsk = {
5124-
"x": 3,
5125-
("y", 0): (inc, "x"), # run_spec and dependencies change
5126-
("y", 1): (inc, 4), # run_spec changes, dependencies don't
5127-
("y", 2): (inc, 2), # Doesn't change
5128-
("z", 0): (inc, ("y", 0)),
5129-
("z", 1): (inc, ("y", 1)),
5130-
("z", 2): (inc, ("y", 2)),
5131-
}
5132-
with captured_logger("distributed.scheduler", level=loglevel) as log:
5133-
zs = c.get(dsk, [("z", 0), ("z", 1), ("z", 2)], sync=False)
5134-
await wait_for_state(("z", 2), "waiting", s)
5135-
5136-
actual_loglines = len(
5137-
re.findall("Detected different `run_spec` for key ", log.getvalue())
5138-
)
5139-
assert actual_loglines == expect_loglines
5140-
5141-
async with Worker(s.address):
5142-
assert await c.gather(zs) == [2, 3, 4] # Kept old ys
5143-
5144-
51454971
def block(x, in_event, block_event):
51464972
in_event.set()
51474973
block_event.wait()

0 commit comments

Comments (0)