 from tornado.ioloop import IOLoop

 import dask
-from dask.core import get_deps, validate_key
+import dask.utils
+from dask.core import get_deps, iskey, validate_key
 from dask.typing import Key, no_default
 from dask.utils import (
     ensure_dict,
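The newly imported `iskey` is used in the `_materialize_graph` hunk further down to detect tasks whose value is itself a key (i.e. aliases). A minimal sketch of how it behaves on a few inputs; the exact set of accepted key types is my assumption based on `dask.typing.Key`, not taken from this diff:

from dask.core import iskey

# Valid dask keys are plain scalars such as strings, or tuples of valid keys
# (assumption; verify against your dask version).
assert iskey("x")
assert iskey(("x", 0))
assert not iskey(["x", 0])      # lists are not keys
assert not iskey((sum, "x"))    # a task tuple containing a callable is not a key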
@@ -4721,6 +4722,7 @@ async def update_graph(
                 stimulus_id=stimulus_id or f"update-graph-{start}",
             )
         except RuntimeError as e:
+            logger.error(str(e))
             err = error_message(e)
             for key in keys:
                 self.report(
@@ -4729,7 +4731,10 @@ async def update_graph(
                         "key": key,
                         "exception": err["exception"],
                         "traceback": err["traceback"],
-                    }
+                    },
+                    # This informs all clients in who_wants plus the current client
+                    # (which may not have been added to who_wants yet)
+                    client=client,
                 )
         end = time()
         self.digest_metric("update-graph-duration", end - start)
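For context, a small sketch of the error payload reported above; `error_message` here is `distributed.core.error_message`, which serializes the exception and traceback into a dict that can be sent over comms (my summary, not part of the diff):

from distributed.core import error_message

try:
    raise RuntimeError("graph failed to materialize")  # stand-in failure
except RuntimeError as e:
    err = error_message(e)
    # err["exception"] and err["traceback"] are serialized objects, suitable for
    # inclusion in the message that self.report(..., client=...) forwards to clients.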
@@ -4755,8 +4760,21 @@ def _generate_taskstates(
             if ts is None:
                 ts = self.new_task(k, dsk.get(k), "released", computation=computation)
                 new_tasks.append(ts)
-            elif not ts.run_spec:
+            # It is possible to create the TaskState object before its run_spec is
+            # known to the scheduler. For instance, this can happen when using a
+            # Variable: `f = c.submit(foo); await Variable().set(f)`. The Variable
+            # uses a different comm channel, so the `client_desires_key` message
+            # could arrive before `update_graph`.
+            # Other anti-pattern workflows are possible as well;
+            # see for example test_scatter_creates_ts
+            elif ts.run_spec is None:
                 ts.run_spec = dsk.get(k)
+            # run_spec in the submitted graph may be None. This happens
+            # when an already persisted future is part of the graph
+            elif k in dsk:
+                # TODO run a health check to verify that run_spec and dependencies
+                # did not change. See https://github.com/dask/distributed/pull/8185
+                pass

             if ts.run_spec:
                 runnable.append(ts)
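For illustration, a minimal client-side sketch of the ordering described in the comment above (my own example built from that comment; `foo` is a placeholder computation):

from distributed import Client, Variable

def foo():
    return 1  # placeholder computation

async def main():
    async with Client(asynchronous=True) as c:
        f = c.submit(foo)
        # The Variable uses a different comm channel than update_graph, so the
        # scheduler may receive client_desires_key for f.key (creating a TaskState
        # with no run_spec) before it receives the graph carrying f's run_spec.
        await Variable("x").set(f)
        return await f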
@@ -5538,28 +5556,28 @@ def report(
         tasks: dict = self.tasks
         ts = tasks.get(msg_key)

-        client_comms: dict = self.client_comms
-        if ts is None:
+        if ts is None and client is None:
             # Notify all clients
-            client_keys = list(client_comms)
-        elif client:
-            # Notify clients interested in key
-            client_keys = [cs.client_key for cs in ts.who_wants or ()]
+            client_keys = list(self.client_comms)
+        elif ts is None:
+            client_keys = [client]
         else:
             # Notify clients interested in key (including `client`)
+            # Note that, if report() was called by update_graph(), `client` won't be
+            # in ts.who_wants yet.
             client_keys = [
                 cs.client_key for cs in ts.who_wants or () if cs.client_key != client
             ]
-            client_keys.append(client)
+            if client is not None:
+                client_keys.append(client)

-        k: str
         for k in client_keys:
-            c = client_comms.get(k)
+            c = self.client_comms.get(k)
             if c is None:
                 continue
             try:
                 c.send(msg)
-                # logger.debug("Scheduler sends message to client %s", msg)
+                # logger.debug("Scheduler sends message to client %s: %s", k, msg)
             except CommClosedError:
                 if self.status == Status.running:
                     logger.critical(
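A standalone sketch of the client-selection rule that the rewritten branch above implements (a hypothetical helper, written only to restate the logic outside the Scheduler class):

def clients_to_notify(ts, client, client_comms):
    """Mirror of the branching above: ts is a TaskState or None, client is a
    client id or None, client_comms maps client ids to comm objects."""
    if ts is None and client is None:
        return list(client_comms)  # broadcast to every connected client
    if ts is None:
        return [client]  # only the explicitly named client
    # Clients subscribed to the task; when called from update_graph(), `client`
    # may not be in ts.who_wants yet, so add it explicitly.
    keys = [cs.client_key for cs in ts.who_wants or () if cs.client_key != client]
    if client is not None:
        keys.append(client)
    return keys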
@@ -8724,26 +8742,28 @@ def _materialize_graph(
     dsk2 = {}
     fut_deps = {}
     for k, v in dsk.items():
-        dsk2[k], futs = unpack_remotedata(v, byte_keys=True)
+        v, futs = unpack_remotedata(v, byte_keys=True)
         if futs:
             fut_deps[k] = futs
+
+        # Remove aliases {x: x}.
+        # FIXME: This is an artifact generated by unpack_remotedata when using
+        # persisted collections. There should be a better way to ensure that tasks
+        # do not reference themselves.
+        if not iskey(v) or v != k:
+            dsk2[k] = v
+
     dsk = dsk2

     # - Add in deps for any tasks that depend on futures
     for k, futures in fut_deps.items():
-        dependencies[k].update(f.key for f in futures)
+        dependencies[k].update(f.key for f in futures if f.key != k)

     # Remove any self-dependencies (happens on test_publish_bag() and others)
     for k, v in dependencies.items():
         deps = set(v)
-        if k in deps:
-            deps.remove(k)
+        deps.discard(k)
         dependencies[k] = deps

-    # Remove aliases
-    for k in list(dsk):
-        if dsk[k] is k:
-            del dsk[k]
     dsk = valmap(_normalize_task, dsk)
-
     return dsk, dependencies, annotations_by_type
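A self-contained toy example (mine, not from the PR) of the alias filter added above, starting from a graph in which `unpack_remotedata` has already replaced a persisted future by its bare key:

from operator import add

from dask.core import iskey

# After unpack_remotedata, the persisted future for "x" has become the bare key
# "x", leaving a self-referencing alias {"x": "x"} in the graph.
dsk = {"x": "x", "y": (add, "x", 1)}

dsk2 = {}
for k, v in dsk.items():
    if not iskey(v) or v != k:  # same filter as above: drop only {k: k} aliases
        dsk2[k] = v

assert dsk2 == {"y": (add, "x", 1)}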