Remove expensive tokenization for key uniqueness check #9009

Merged · 3 commits · Feb 13, 2025
Changes from 1 commit
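For context, the diff below drops the `tokenize`-based comparison in `_generate_taskstates` and keeps only the dependency-name comparison when deciding whether a resubmitted key matches the existing task state. A minimal standalone sketch of the check before and after this change (hypothetical helper names, not the scheduler's actual code; the `dask.base` imports and call signature are taken from the removed lines):

```python
# Sketch only: the shape of the uniqueness check before and after this PR.
# Helper names are hypothetical; the dask.base calls mirror the removed code.
from dask.base import TokenizationError, tokenize


def run_spec_matches_old(old_run_spec, new_run_spec, old_deps, new_deps):
    """Old behaviour: tokenize both run_specs (potentially expensive) and
    also compare the dependency key sets."""
    try:
        tok_lhs = tokenize(old_run_spec, ensure_deterministic=True)
    except TokenizationError:
        tok_lhs = ""  # non-deterministic token: comparison is effectively skipped
    try:
        tok_rhs = tokenize(new_run_spec, ensure_deterministic=True)
    except TokenizationError:
        tok_rhs = ""
    return tok_lhs == tok_rhs and old_deps == new_deps


def run_spec_matches_new(old_deps, new_deps):
    """New behaviour: only compare the dependency key sets."""
    return old_deps == new_deps
```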
18 changes: 2 additions & 16 deletions distributed/scheduler.py
@@ -55,7 +55,6 @@
 import dask
 import dask.utils
 from dask._task_spec import DependenciesMapping, GraphNode, convert_legacy_graph
-from dask.base import TokenizationError, normalize_token, tokenize
 from dask.core import istask, validate_key
 from dask.typing import Key, no_default
 from dask.utils import (
@@ -4985,25 +4984,14 @@ def _generate_taskstates(
             # run_spec in the submitted graph may be None. This happens
             # when an already persisted future is part of the graph
             elif k in dsk:
-                # If both tokens are non-deterministic, skip comparison
-                try:
-                    tok_lhs = tokenize(ts.run_spec, ensure_deterministic=True)
-                except TokenizationError:
-                    tok_lhs = ""
-                try:
-                    tok_rhs = tokenize(dsk[k], ensure_deterministic=True)
-                except TokenizationError:
-                    tok_rhs = ""
-
-                # Additionally check dependency names. This should only be necessary
-                # if run_specs can't be tokenized deterministically.
+                # Check dependency names.
                 deps_lhs = {dts.key for dts in ts.dependencies}
                 deps_rhs = dependencies[k]

                 # FIXME It would be a really healthy idea to change this to a hard
                 # failure. However, this is not possible at the moment because of
                 # https://github.com/dask/dask/issues/9888
-                if tok_lhs != tok_rhs or deps_lhs != deps_rhs:
+                if deps_lhs != deps_rhs:
                     # Retain old run_spec and dependencies; rerun them if necessary.
                     # This sweeps the issue of collision under the carpet as long as the
                     # old and new task produce the same output - such as in
@@ -5029,8 +5017,6 @@ def _generate_taskstates(
                         old task state: {ts.state}
                         old run_spec: {ts.run_spec!r}
                         new run_spec: {dsk[k]!r}
-                        old token: {normalize_token(ts.run_spec)!r}
-                        new token: {normalize_token(dsk[k])!r}
                         old dependencies: {deps_lhs}
                         new dependencies: {deps_rhs}
                         """
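Why the removed check was expensive, roughly: `tokenize(..., ensure_deterministic=True)` has to hash the full `run_spec`, which can embed large inlined data, whereas the retained check only compares sets of dependency keys. An illustrative, hypothetical timing snippet (not a benchmark from this PR; the payload and numbers are made up and results depend entirely on the graph being submitted):

```python
# Illustrative only: tokenizing a run_spec that embeds a large literal scales
# with the payload size, while comparing dependency key sets scales with the
# number of dependencies.
import timeit

from dask.base import tokenize

payload = b"x" * 100_000_000  # hypothetical large inlined literal (~100 MB)
deps_lhs = {f"task-{i}" for i in range(1_000)}
deps_rhs = set(deps_lhs)

print("tokenize:   ", timeit.timeit(lambda: tokenize(payload, ensure_deterministic=True), number=3))
print("set compare:", timeit.timeit(lambda: deps_lhs == deps_rhs, number=3))
```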