Skip to content

Commit 5486efa

Browse files
committed
Fix CI by using client.persist()
1 parent ad6f7d9 commit 5486efa

11 files changed

+77
-77
lines changed

distributed/comm/tests/test_ws.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ async def test_collections(c, s, a, b):
125125
da = pytest.importorskip("dask.array")
126126
x = da.random.random((1000, 1000), chunks=(100, 100))
127127
x = x + x.T
128-
await x.persist()
128+
await c.persist(x)
129129

130130

131131
@gen_cluster(client=True, scheduler_kwargs={"protocol": "ws://"})

distributed/dashboard/tests/test_scheduler_bokeh.py

+11-11
Original file line numberDiff line numberDiff line change
@@ -833,9 +833,9 @@ async def test_TaskGraph(c, s, a, b):
833833
json.dumps(gp.node_source.data)
834834

835835
da = pytest.importorskip("dask.array")
836-
x = da.random.random((20, 20), chunks=(10, 10)).persist()
836+
x = c.persist(da.random.random((20, 20), chunks=(10, 10)))
837837
y = (x + x.T) - x.mean(axis=0)
838-
y = y.persist()
838+
y = c.persist(y)
839839
await wait(y)
840840

841841
gp.update()
@@ -912,12 +912,12 @@ async def test_TaskGraph_complex(c, s, a, b):
912912
da = pytest.importorskip("dask.array")
913913
gp = TaskGraph(s)
914914
x = da.random.random((2000, 2000), chunks=(1000, 1000))
915-
y = ((x + x.T) - x.mean(axis=0)).persist()
915+
y = c.persist((x + x.T) - x.mean(axis=0))
916916
await wait(y)
917917
gp.update()
918918
assert len(gp.layout.index) == len(gp.node_source.data["x"])
919919
assert len(gp.layout.index) == len(s.tasks)
920-
z = (x - y).sum().persist()
920+
z = c.persist((x - y).sum())
921921
await wait(z)
922922
gp.update()
923923
assert len(gp.layout.index) == len(gp.node_source.data["x"])
@@ -1177,10 +1177,10 @@ async def test_memory_by_key(c, s, a, b):
11771177
mbk = MemoryByKey(s)
11781178

11791179
da = pytest.importorskip("dask.array")
1180-
x = (da.random.random((20, 20), chunks=(10, 10)) + 1).persist(optimize_graph=False)
1180+
x = c.persist(da.random.random((20, 20), chunks=(10, 10)) + 1, optimize_graph=False)
11811181
await x
11821182

1183-
y = await dask.delayed(inc)(1).persist()
1183+
y = await c.persist(dask.delayed(inc)(1))
11841184

11851185
mbk.update()
11861186
assert mbk.source.data["name"] == ["add", "inc"]
@@ -1260,10 +1260,10 @@ async def test_aggregate_action(c, s, a, b):
12601260
mbk = AggregateAction(s)
12611261

12621262
da = pytest.importorskip("dask.array")
1263-
x = (da.ones((20, 20), chunks=(10, 10)) + 1).persist(optimize_graph=False)
1263+
x = c.persist(da.ones((20, 20), chunks=(10, 10)) + 1, optimize_graph=False)
12641264

12651265
await x
1266-
y = await dask.delayed(inc)(1).persist()
1266+
y = await c.persist(dask.delayed(inc)(1))
12671267
z = (x + x.T) - x.mean(axis=0)
12681268
await c.compute(z.sum())
12691269

@@ -1289,9 +1289,9 @@ async def test_compute_per_key(c, s, a, b):
12891289
mbk = ComputePerKey(s)
12901290

12911291
da = pytest.importorskip("dask.array")
1292-
x = (da.ones((20, 20), chunks=(10, 10)) + 1).persist(optimize_graph=False)
1292+
x = c.persist(da.ones((20, 20), chunks=(10, 10)) + 1, optimize_graph=False)
12931293
await x
1294-
y = await dask.delayed(inc)(1).persist()
1294+
y = await c.persist(dask.delayed(inc)(1))
12951295
z = (x + x.T) - x.mean(axis=0)
12961296
zsum = z.sum()
12971297
await c.compute(zsum)
@@ -1367,7 +1367,7 @@ async def test_shuffling(c, s, a, b):
13671367
freq="10 s",
13681368
)
13691369
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
1370-
df2 = df.shuffle("x").persist()
1370+
df2 = c.persist(df.shuffle("x"))
13711371
start = time()
13721372
while not ss.source.data["comm_written"]:
13731373
await asyncio.gather(*[a.heartbeat(), b.heartbeat()])

distributed/shuffle/tests/test_shuffle.py

+19-19
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ async def test_bad_disk(c, s, a, b):
286286
)
287287
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
288288
out = df.shuffle("x")
289-
out = out.persist()
289+
out = c.persist(out)
290290
shuffle_id = await wait_until_new_shuffle_is_initialized(s)
291291
while not get_active_shuffle_runs(a):
292292
await asyncio.sleep(0.01)
@@ -1068,7 +1068,7 @@ async def test_heartbeat(c, s, a, b):
10681068
)
10691069
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
10701070
out = df.shuffle("x")
1071-
out = out.persist()
1071+
out = c.persist(out)
10721072

10731073
while not s.plugins["shuffle"].heartbeats:
10741074
await asyncio.sleep(0.001)
@@ -1272,7 +1272,7 @@ async def test_head(c, s, a, b):
12721272
)
12731273
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
12741274
out = df.shuffle("x")
1275-
out = await out.head(compute=False).persist() # Only ask for one key
1275+
out = await c.persist(out.head(compute=False)) # Only ask for one key
12761276

12771277
assert list(os.walk(a.local_directory)) == a_files # cleaned up files?
12781278
assert list(os.walk(b.local_directory)) == b_files
@@ -1380,7 +1380,7 @@ async def test_clean_after_forgotten_early(c, s, a, b):
13801380
)
13811381
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
13821382
out = df.shuffle("x")
1383-
out = out.persist()
1383+
out = c.persist(out)
13841384
await wait_for_tasks_in_state("shuffle-transfer", "memory", 1, a)
13851385
await wait_for_tasks_in_state("shuffle-transfer", "memory", 1, b)
13861386
del out
@@ -1399,7 +1399,7 @@ async def test_tail(c, s, a, b):
13991399
)
14001400
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
14011401
x = df.shuffle("x")
1402-
full = await x.persist()
1402+
full = await c.persist(x)
14031403
ntasks_full = len(s.tasks)
14041404
del full
14051405
while s.tasks:
@@ -1539,7 +1539,7 @@ async def test_crashed_worker_after_shuffle_persisted(c, s, a):
15391539
)
15401540
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
15411541
out = df.shuffle("x")
1542-
out = out.persist()
1542+
out = c.persist(out)
15431543

15441544
await event.wait()
15451545
await out
@@ -1597,7 +1597,7 @@ async def test_new_worker(c, s, a, b):
15971597
)
15981598
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
15991599
shuffled = df.shuffle("x")
1600-
persisted = shuffled.persist()
1600+
persisted = c.persist(shuffled)
16011601
while not s.plugins["shuffle"].active_shuffles:
16021602
await asyncio.sleep(0.001)
16031603

@@ -1646,7 +1646,7 @@ async def test_delete_some_results(c, s, a, b):
16461646
freq="10 s",
16471647
)
16481648
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
1649-
x = df.shuffle("x").persist()
1649+
x = c.persist(df.shuffle("x"))
16501650
while not s.tasks or not any(ts.state == "memory" for ts in s.tasks.values()):
16511651
await asyncio.sleep(0.01)
16521652

@@ -1669,12 +1669,12 @@ async def test_add_some_results(c, s, a, b):
16691669
)
16701670
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
16711671
x = df.shuffle("x")
1672-
y = x.partitions[: x.npartitions // 2].persist()
1672+
y = c.persist(x.partitions[: x.npartitions // 2])
16731673

16741674
while not s.tasks or not any(ts.state == "memory" for ts in s.tasks.values()):
16751675
await asyncio.sleep(0.01)
16761676

1677-
x = x.persist()
1677+
x = c.persist(x)
16781678

16791679
await c.compute(x.size)
16801680

@@ -1697,7 +1697,7 @@ async def test_clean_after_close(c, s, a, b):
16971697

16981698
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
16991699
out = df.shuffle("x")
1700-
out = out.persist()
1700+
out = c.persist(out)
17011701

17021702
await wait_for_tasks_in_state("shuffle-transfer", "executing", 1, a)
17031703
await wait_for_tasks_in_state("shuffle-transfer", "memory", 1, b)
@@ -2079,7 +2079,7 @@ async def test_deduplicate_stale_transfer(c, s, a, b, wait_until_forgotten):
20792079
)
20802080
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
20812081
shuffled = df.shuffle("x")
2082-
shuffled = shuffled.persist()
2082+
shuffled = c.persist(shuffled)
20832083

20842084
shuffle_extA = a.plugins["shuffle"]
20852085
shuffle_extB = b.plugins["shuffle"]
@@ -2131,7 +2131,7 @@ async def test_handle_stale_barrier(c, s, a, b, wait_until_forgotten):
21312131
)
21322132
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
21332133
shuffled = df.shuffle("x")
2134-
shuffled = shuffled.persist()
2134+
shuffled = c.persist(shuffled)
21352135

21362136
shuffle_extA = a.plugins["shuffle"]
21372137
shuffle_extB = b.plugins["shuffle"]
@@ -2190,7 +2190,7 @@ async def test_shuffle_run_consistency(c, s, a):
21902190
# Initialize first shuffle execution
21912191
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
21922192
out = df.shuffle("x")
2193-
out = out.persist()
2193+
out = c.persist(out)
21942194

21952195
shuffle_id = await wait_until_new_shuffle_is_initialized(s)
21962196
spec = scheduler_ext.get(shuffle_id, a.worker_address)["run_spec"].data
@@ -2214,7 +2214,7 @@ async def test_shuffle_run_consistency(c, s, a):
22142214
# Initialize second shuffle execution
22152215
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
22162216
out = df.shuffle("x")
2217-
out = out.persist()
2217+
out = c.persist(out)
22182218

22192219
new_shuffle_id = await wait_until_new_shuffle_is_initialized(s)
22202220
assert shuffle_id == new_shuffle_id
@@ -2241,7 +2241,7 @@ async def test_shuffle_run_consistency(c, s, a):
22412241
# Create an unrelated shuffle on a different column
22422242
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
22432243
out = df.shuffle("y")
2244-
out = out.persist()
2244+
out = c.persist(out)
22452245
independent_shuffle_id = await wait_until_new_shuffle_is_initialized(s)
22462246
assert shuffle_id != independent_shuffle_id
22472247

@@ -2284,7 +2284,7 @@ async def test_fail_fetch_race(c, s, a):
22842284
)
22852285
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
22862286
out = df.shuffle("x")
2287-
out = out.persist()
2287+
out = c.persist(out)
22882288

22892289
shuffle_id = await wait_until_new_shuffle_is_initialized(s)
22902290
spec = scheduler_ext.get(shuffle_id, a.worker_address)["run_spec"].data
@@ -2365,7 +2365,7 @@ async def test_replace_stale_shuffle(c, s, a, b):
23652365
# Initialize first shuffle execution
23662366
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
23672367
out = df.shuffle("x")
2368-
out = out.persist()
2368+
out = c.persist(out)
23692369

23702370
shuffle_id = await wait_until_new_shuffle_is_initialized(s)
23712371

@@ -2391,7 +2391,7 @@ async def test_replace_stale_shuffle(c, s, a, b):
23912391
# Initialize second shuffle execution
23922392
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
23932393
out = df.shuffle("x")
2394-
out = out.persist()
2394+
out = c.persist(out)
23952395

23962396
await wait_for_tasks_in_state("shuffle-transfer", "memory", 1, a)
23972397
await run_manager_B.finished_get_shuffle_run.wait()

distributed/tests/test_client.py

+17-17
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ async def test_future_repr(c, s, a, b):
401401
async def test_future_tuple_repr(c, s, a, b):
402402
pytest.importorskip("numpy")
403403
da = pytest.importorskip("dask.array")
404-
y = da.arange(10, chunks=(5,)).persist()
404+
y = c.persist(da.arange(10, chunks=(5,)))
405405
f = futures_of(y)[0]
406406
for func in [repr, lambda x: x._repr_html_()]:
407407
for k in f.key:
@@ -4808,7 +4808,7 @@ async def test_restart_workers(c, s, a, b):
48084808
# Persist futures and perform a computation
48094809
size = 100
48104810
x = da.ones(size, chunks=10)
4811-
x = x.persist()
4811+
x = c.persist(x)
48124812
assert await c.compute(x.sum()) == size
48134813

48144814
# Restart a single worker
@@ -5178,7 +5178,7 @@ async def test_serialize_collections(c, s, a, b):
51785178
pytest.importorskip("numpy")
51795179
da = pytest.importorskip("dask.array")
51805180

5181-
x = da.arange(10, chunks=(5,)).persist()
5181+
x = c.persist(da.arange(10, chunks=(5,)))
51825182

51835183
def f(x):
51845184
assert isinstance(x, da.Array)
@@ -5360,7 +5360,7 @@ async def test_serialize_collections_of_futures(c, s, a, b):
53605360
from dask.dataframe.utils import assert_eq
53615361

53625362
df = pd.DataFrame({"x": [1, 2, 3]})
5363-
ddf = dd.from_pandas(df, npartitions=2).persist()
5363+
ddf = c.persist(dd.from_pandas(df, npartitions=2))
53645364
future = await c.scatter(ddf)
53655365

53665366
ddf2 = await future
@@ -5509,7 +5509,7 @@ async def test_call_stack_collections(c, s, a, b):
55095509
pytest.importorskip("numpy")
55105510
da = pytest.importorskip("dask.array")
55115511

5512-
x = da.random.random(100, chunks=(10,)).map_blocks(slowinc, delay=0.5).persist()
5512+
x = c.persist(da.random.random(100, chunks=(10,)).map_blocks(slowinc, delay=0.5))
55135513
while not a.state.executing_count and not b.state.executing_count:
55145514
await asyncio.sleep(0.001)
55155515
result = await c.call_stack(x)
@@ -5521,7 +5521,7 @@ async def test_call_stack_collections_all(c, s, a, b):
55215521
pytest.importorskip("numpy")
55225522
da = pytest.importorskip("dask.array")
55235523

5524-
x = da.random.random(100, chunks=(10,)).map_blocks(slowinc, delay=0.5).persist()
5524+
x = c.persist(da.random.random(100, chunks=(10,)).map_blocks(slowinc, delay=0.5))
55255525
while not a.state.executing_count and not b.state.executing_count:
55265526
await asyncio.sleep(0.001)
55275527
result = await c.call_stack()
@@ -6179,7 +6179,7 @@ async def test_get_mix_futures_and_SubgraphCallable_dask_dataframe(c, s, a, b):
61796179
dd = pytest.importorskip("dask.dataframe")
61806180

61816181
df = pd.DataFrame({"x": range(1, 11)})
6182-
ddf = dd.from_pandas(df, npartitions=2).persist()
6182+
ddf = c.persist(dd.from_pandas(df, npartitions=2))
61836183
ddf = ddf.map_partitions(lambda x: x)
61846184
ddf["x"] = ddf["x"].astype("f8")
61856185
ddf = ddf.map_partitions(lambda x: x)
@@ -6239,10 +6239,10 @@ async def test_file_descriptors_dont_leak(Worker):
62396239
async with (
62406240
Worker(s.address),
62416241
Worker(s.address),
6242-
Client(s.address, asynchronous=True),
6242+
Client(s.address, asynchronous=True) as c,
62436243
):
62446244
assert proc.num_fds() > before
6245-
await df.sum().persist()
6245+
await c.persist(df.sum())
62466246

62476247
start = time()
62486248
while proc.num_fds() > before:
@@ -6340,7 +6340,7 @@ async def test_config_inherited_by_subprocess():
63406340

63416341
@gen_cluster(client=True)
63426342
async def test_futures_of_sorted(c, s, a, b):
6343-
b = dask.bag.from_sequence(range(10), npartitions=5).persist()
6343+
b = c.persist(dask.bag.from_sequence(range(10), npartitions=5))
63446344
futures = futures_of(b)
63456345
assert [fut.key for fut in futures] == [k for k in b.__dask_keys__()]
63466346

@@ -6767,7 +6767,7 @@ async def test_annotations_task_state(c, s, a, b):
67676767
x = da.ones(10, chunks=(5,))
67686768

67696769
with dask.config.set(optimization__fuse__active=False):
6770-
x = await x.persist()
6770+
x = await c.persist(x)
67716771

67726772
for ts in s.tasks.values():
67736773
assert ts.annotations["qux"] == "bar"
@@ -6822,7 +6822,7 @@ async def test_annotations_priorities(c, s, a, b):
68226822
x = da.ones(10, chunks=(5,))
68236823

68246824
with dask.config.set(optimization__fuse__active=False):
6825-
x = await x.persist()
6825+
x = await c.persist(x)
68266826

68276827
for ts in s.tasks.values():
68286828
assert ts.priority[0] == -15
@@ -6838,7 +6838,7 @@ async def test_annotations_workers(c, s, a, b):
68386838
x = da.ones(10, chunks=(5,))
68396839

68406840
with dask.config.set(optimization__fuse__active=False):
6841-
x = await x.persist()
6841+
x = await c.persist(x)
68426842

68436843
for ts in s.tasks.values():
68446844
assert ts.annotations["workers"] == [a.address]
@@ -6857,7 +6857,7 @@ async def test_annotations_retries(c, s, a, b):
68576857
x = da.ones(10, chunks=(5,))
68586858

68596859
with dask.config.set(optimization__fuse__active=False):
6860-
x = await x.persist()
6860+
x = await c.persist(x)
68616861

68626862
for ts in s.tasks.values():
68636863
assert ts.retries == 2
@@ -6910,7 +6910,7 @@ async def test_annotations_resources(c, s, a, b):
69106910
x = da.ones(10, chunks=(5,))
69116911

69126912
with dask.config.set(optimization__fuse__active=False):
6913-
x = await x.persist()
6913+
x = await c.persist(x)
69146914

69156915
for ts in s.tasks.values():
69166916
assert ts.resource_restrictions == {"GPU": 1}
@@ -6949,7 +6949,7 @@ async def test_annotations_loose_restrictions(c, s, a, b):
69496949
x = da.ones(10, chunks=(5,))
69506950

69516951
with dask.config.set(optimization__fuse__active=False):
6952-
x = await x.persist()
6952+
x = await c.persist(x)
69536953

69546954
for ts in s.tasks.values():
69556955
assert not ts.worker_restrictions
@@ -7229,7 +7229,7 @@ async def test_computation_object_code_dask_persist(c, s, a, b):
72297229
da = pytest.importorskip("dask.array")
72307230

72317231
x = da.ones((10, 10), chunks=(3, 3))
7232-
future = x.sum().persist()
7232+
future = c.persist(x.sum())
72337233
await future
72347234

72357235
test_function_code = inspect.getsource(

0 commit comments

Comments
 (0)