Skip to content

Commit ba5b65b

Browse files
committed
Remove SubgraphCallable
1 parent 7f14edf commit ba5b65b

File tree

4 files changed

+0
-89
lines changed

4 files changed

+0
-89
lines changed

distributed/client.py

-3
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
from dask.core import flatten, validate_key
4646
from dask.highlevelgraph import HighLevelGraph
4747
from dask.layers import Layer
48-
from dask.optimization import SubgraphCallable
4948
from dask.tokenize import tokenize
5049
from dask.typing import Key, NestedKeys, NoDefault, no_default
5150
from dask.utils import (
@@ -6120,8 +6119,6 @@ def futures_of(o, client=None):
61206119
stack.extend(x)
61216120
elif type(x) is dict:
61226121
stack.extend(x.values())
6123-
elif type(x) is SubgraphCallable:
6124-
stack.extend(x.dsk.values())
61256122
elif isinstance(x, TaskRef):
61266123
if x not in seen:
61276124
seen.add(x)

distributed/tests/test_client.py

-45
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import dask
4343
import dask.bag as db
4444
from dask import delayed
45-
from dask.optimization import SubgraphCallable
4645
from dask.tokenize import tokenize
4746
from dask.utils import get_default_shuffle_method, parse_timedelta, tmpfile
4847

@@ -2626,13 +2625,6 @@ async def test_futures_of_get(c, s, a, b):
26262625
b = db.Bag({("b", i): f for i, f in enumerate([x, y, z])}, "b", 3)
26272626
assert set(futures_of(b)) == {x, y, z}
26282627

2629-
sg = SubgraphCallable(
2630-
{"x": x, "y": y, "z": z, "out": (add, (add, (add, x, y), z), "in")},
2631-
"out",
2632-
("in",),
2633-
)
2634-
assert set(futures_of(sg)) == {x, y, z}
2635-
26362628

26372629
def test_futures_of_class():
26382630
pytest.importorskip("numpy")
@@ -6191,43 +6183,6 @@ async def test_profile_bokeh(c, s, a, b):
61916183
assert os.path.exists(fn)
61926184

61936185

6194-
@gen_cluster(client=True, nthreads=[("", 1)])
6195-
async def test_get_mix_futures_and_SubgraphCallable(c, s, a):
6196-
future = c.submit(add, 1, 2)
6197-
6198-
subgraph = SubgraphCallable(
6199-
{"_2": (add, "_0", "_1"), "_3": (add, future, "_2")},
6200-
"_3",
6201-
("_0", "_1"),
6202-
)
6203-
dsk = {
6204-
"a": 1,
6205-
"b": 2,
6206-
"c": (subgraph, "a", "b"),
6207-
"d": (subgraph, "c", "b"),
6208-
}
6209-
6210-
future2 = c.get(dsk, "d", sync=False)
6211-
result = await future2
6212-
assert result == 11
6213-
6214-
# Nested subgraphs
6215-
subgraph2 = SubgraphCallable(
6216-
{
6217-
"_2": (subgraph, "_0", "_1"),
6218-
"_3": (subgraph, "_2", "_1"),
6219-
"_4": (add, "_3", future2),
6220-
},
6221-
"_4",
6222-
("_0", "_1"),
6223-
)
6224-
6225-
dsk2 = {"e": 1, "f": 2, "g": (subgraph2, "e", "f")}
6226-
6227-
result = await c.get(dsk2, "g", sync=False)
6228-
assert result == 22
6229-
6230-
62316186
@gen_cluster(client=True)
62326187
async def test_get_mix_futures_and_SubgraphCallable_dask_dataframe(c, s, a, b):
62336188
pd = pytest.importorskip("pandas")

distributed/tests/test_utils_comm.py

-16
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import pytest
88

99
from dask._task_spec import TaskRef
10-
from dask.optimization import SubgraphCallable
1110

1211
from distributed import wait
1312
from distributed.compatibility import asyncio_run
@@ -246,18 +245,3 @@ def assert_eq(keys1: set[TaskRef], keys2: set[TaskRef]) -> None:
246245
res, keys = unpack_remotedata(TaskRef("mykey"))
247246
assert res == "mykey"
248247
assert_eq(keys, {TaskRef("mykey")})
249-
250-
# Check unpack of SC that contains a wrapped key
251-
sc = SubgraphCallable({"key": (TaskRef("data"),)}, outkey="key", inkeys=["arg1"])
252-
dsk = (sc, "arg1")
253-
res, keys = unpack_remotedata(dsk)
254-
assert res[0] != sc # Notice, the first item (the SC) has been changed
255-
assert res[1:] == ("arg1", "data")
256-
assert_eq(keys, {TaskRef("data")})
257-
258-
# Check unpack of SC when it takes a wrapped key as argument
259-
sc = SubgraphCallable({"key": ("arg1",)}, outkey="key", inkeys=[TaskRef("arg1")])
260-
dsk = (sc, "arg1")
261-
res, keys = unpack_remotedata(dsk)
262-
assert res == (sc, "arg1") # Notice, the first item (the SC) has NOT been changed
263-
assert_eq(keys, set())

distributed/utils_comm.py

-25
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313

1414
import dask.config
1515
from dask._task_spec import TaskRef
16-
from dask.optimization import SubgraphCallable
1716
from dask.typing import Key
1817
from dask.utils import is_namedtuple_instance, parse_timedelta
1918

@@ -197,30 +196,6 @@ def _unpack_remotedata_inner(
197196
if typ is tuple:
198197
if not o:
199198
return o
200-
if type(o[0]) is SubgraphCallable:
201-
# Unpack futures within the arguments of the subgraph callable
202-
futures: set[TaskRef] = set()
203-
args = tuple(_unpack_remotedata_inner(i, byte_keys, futures) for i in o[1:])
204-
found_futures.update(futures)
205-
206-
# Unpack futures within the subgraph callable itself
207-
sc: SubgraphCallable = o[0]
208-
futures = set()
209-
dsk = {
210-
k: _unpack_remotedata_inner(v, byte_keys, futures)
211-
for k, v in sc.dsk.items()
212-
}
213-
future_keys: tuple = ()
214-
if futures: # If no futures is in the subgraph, we just use `sc` as-is
215-
found_futures.update(futures)
216-
future_keys = (
217-
tuple(f.key for f in futures)
218-
if byte_keys
219-
else tuple(f.key for f in futures)
220-
)
221-
inkeys = tuple(sc.inkeys) + future_keys
222-
sc = SubgraphCallable(dsk, sc.outkey, inkeys, sc.name)
223-
return (sc,) + args + future_keys
224199
else:
225200
return tuple(
226201
_unpack_remotedata_inner(item, byte_keys, found_futures) for item in o

0 commit comments

Comments
 (0)