Skip to content

Commit 7df4356

Browse files
Fix CI by using client.persist(collection) instead of collection.persist() (#9020)
1 parent ad6f7d9 commit 7df4356

11 files changed

+80
-142
lines changed

distributed/comm/tests/test_ws.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ async def test_collections(c, s, a, b):
125125
da = pytest.importorskip("dask.array")
126126
x = da.random.random((1000, 1000), chunks=(100, 100))
127127
x = x + x.T
128-
await x.persist()
128+
await c.persist(x)
129129

130130

131131
@gen_cluster(client=True, scheduler_kwargs={"protocol": "ws://"})

distributed/dashboard/tests/test_scheduler_bokeh.py

+11-11
Original file line numberDiff line numberDiff line change
@@ -833,9 +833,9 @@ async def test_TaskGraph(c, s, a, b):
833833
json.dumps(gp.node_source.data)
834834

835835
da = pytest.importorskip("dask.array")
836-
x = da.random.random((20, 20), chunks=(10, 10)).persist()
836+
x = c.persist(da.random.random((20, 20), chunks=(10, 10)))
837837
y = (x + x.T) - x.mean(axis=0)
838-
y = y.persist()
838+
y = c.persist(y)
839839
await wait(y)
840840

841841
gp.update()
@@ -912,12 +912,12 @@ async def test_TaskGraph_complex(c, s, a, b):
912912
da = pytest.importorskip("dask.array")
913913
gp = TaskGraph(s)
914914
x = da.random.random((2000, 2000), chunks=(1000, 1000))
915-
y = ((x + x.T) - x.mean(axis=0)).persist()
915+
y = c.persist((x + x.T) - x.mean(axis=0))
916916
await wait(y)
917917
gp.update()
918918
assert len(gp.layout.index) == len(gp.node_source.data["x"])
919919
assert len(gp.layout.index) == len(s.tasks)
920-
z = (x - y).sum().persist()
920+
z = c.persist((x - y).sum())
921921
await wait(z)
922922
gp.update()
923923
assert len(gp.layout.index) == len(gp.node_source.data["x"])
@@ -1177,10 +1177,10 @@ async def test_memory_by_key(c, s, a, b):
11771177
mbk = MemoryByKey(s)
11781178

11791179
da = pytest.importorskip("dask.array")
1180-
x = (da.random.random((20, 20), chunks=(10, 10)) + 1).persist(optimize_graph=False)
1180+
x = c.persist(da.random.random((20, 20), chunks=(10, 10)) + 1, optimize_graph=False)
11811181
await x
11821182

1183-
y = await dask.delayed(inc)(1).persist()
1183+
y = await c.persist(dask.delayed(inc)(1))
11841184

11851185
mbk.update()
11861186
assert mbk.source.data["name"] == ["add", "inc"]
@@ -1260,10 +1260,10 @@ async def test_aggregate_action(c, s, a, b):
12601260
mbk = AggregateAction(s)
12611261

12621262
da = pytest.importorskip("dask.array")
1263-
x = (da.ones((20, 20), chunks=(10, 10)) + 1).persist(optimize_graph=False)
1263+
x = c.persist(da.ones((20, 20), chunks=(10, 10)) + 1, optimize_graph=False)
12641264

12651265
await x
1266-
y = await dask.delayed(inc)(1).persist()
1266+
y = await c.persist(dask.delayed(inc)(1))
12671267
z = (x + x.T) - x.mean(axis=0)
12681268
await c.compute(z.sum())
12691269

@@ -1289,9 +1289,9 @@ async def test_compute_per_key(c, s, a, b):
12891289
mbk = ComputePerKey(s)
12901290

12911291
da = pytest.importorskip("dask.array")
1292-
x = (da.ones((20, 20), chunks=(10, 10)) + 1).persist(optimize_graph=False)
1292+
x = c.persist(da.ones((20, 20), chunks=(10, 10)) + 1, optimize_graph=False)
12931293
await x
1294-
y = await dask.delayed(inc)(1).persist()
1294+
y = await c.persist(dask.delayed(inc)(1))
12951295
z = (x + x.T) - x.mean(axis=0)
12961296
zsum = z.sum()
12971297
await c.compute(zsum)
@@ -1367,7 +1367,7 @@ async def test_shuffling(c, s, a, b):
13671367
freq="10 s",
13681368
)
13691369
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
1370-
df2 = df.shuffle("x").persist()
1370+
df2 = c.persist(df.shuffle("x"))
13711371
start = time()
13721372
while not ss.source.data["comm_written"]:
13731373
await asyncio.gather(*[a.heartbeat(), b.heartbeat()])

distributed/shuffle/tests/test_shuffle.py

+20-20
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ async def test_bad_disk(c, s, a, b):
286286
)
287287
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
288288
out = df.shuffle("x")
289-
out = out.persist()
289+
out = c.persist(out)
290290
shuffle_id = await wait_until_new_shuffle_is_initialized(s)
291291
while not get_active_shuffle_runs(a):
292292
await asyncio.sleep(0.01)
@@ -1068,7 +1068,7 @@ async def test_heartbeat(c, s, a, b):
10681068
)
10691069
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
10701070
out = df.shuffle("x")
1071-
out = out.persist()
1071+
out = c.persist(out)
10721072

10731073
while not s.plugins["shuffle"].heartbeats:
10741074
await asyncio.sleep(0.001)
@@ -1272,7 +1272,7 @@ async def test_head(c, s, a, b):
12721272
)
12731273
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
12741274
out = df.shuffle("x")
1275-
out = await out.head(compute=False).persist() # Only ask for one key
1275+
out = await c.persist(out.head(compute=False)) # Only ask for one key
12761276

12771277
assert list(os.walk(a.local_directory)) == a_files # cleaned up files?
12781278
assert list(os.walk(b.local_directory)) == b_files
@@ -1380,7 +1380,7 @@ async def test_clean_after_forgotten_early(c, s, a, b):
13801380
)
13811381
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
13821382
out = df.shuffle("x")
1383-
out = out.persist()
1383+
out = c.persist(out)
13841384
await wait_for_tasks_in_state("shuffle-transfer", "memory", 1, a)
13851385
await wait_for_tasks_in_state("shuffle-transfer", "memory", 1, b)
13861386
del out
@@ -1399,12 +1399,12 @@ async def test_tail(c, s, a, b):
13991399
)
14001400
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
14011401
x = df.shuffle("x")
1402-
full = await x.persist()
1402+
full = await c.persist(x)
14031403
ntasks_full = len(s.tasks)
14041404
del full
14051405
while s.tasks:
14061406
await asyncio.sleep(0)
1407-
partial = await x.tail(compute=False).persist() # Only ask for one key
1407+
partial = await c.persist(x.tail(compute=False)) # Only ask for one key
14081408

14091409
assert len(s.tasks) < ntasks_full
14101410
del partial
@@ -1539,7 +1539,7 @@ async def test_crashed_worker_after_shuffle_persisted(c, s, a):
15391539
)
15401540
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
15411541
out = df.shuffle("x")
1542-
out = out.persist()
1542+
out = c.persist(out)
15431543

15441544
await event.wait()
15451545
await out
@@ -1597,7 +1597,7 @@ async def test_new_worker(c, s, a, b):
15971597
)
15981598
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
15991599
shuffled = df.shuffle("x")
1600-
persisted = shuffled.persist()
1600+
persisted = c.persist(shuffled)
16011601
while not s.plugins["shuffle"].active_shuffles:
16021602
await asyncio.sleep(0.001)
16031603

@@ -1646,7 +1646,7 @@ async def test_delete_some_results(c, s, a, b):
16461646
freq="10 s",
16471647
)
16481648
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
1649-
x = df.shuffle("x").persist()
1649+
x = c.persist(df.shuffle("x"))
16501650
while not s.tasks or not any(ts.state == "memory" for ts in s.tasks.values()):
16511651
await asyncio.sleep(0.01)
16521652

@@ -1669,12 +1669,12 @@ async def test_add_some_results(c, s, a, b):
16691669
)
16701670
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
16711671
x = df.shuffle("x")
1672-
y = x.partitions[: x.npartitions // 2].persist()
1672+
y = c.persist(x.partitions[: x.npartitions // 2])
16731673

16741674
while not s.tasks or not any(ts.state == "memory" for ts in s.tasks.values()):
16751675
await asyncio.sleep(0.01)
16761676

1677-
x = x.persist()
1677+
x = c.persist(x)
16781678

16791679
await c.compute(x.size)
16801680

@@ -1697,7 +1697,7 @@ async def test_clean_after_close(c, s, a, b):
16971697

16981698
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
16991699
out = df.shuffle("x")
1700-
out = out.persist()
1700+
out = c.persist(out)
17011701

17021702
await wait_for_tasks_in_state("shuffle-transfer", "executing", 1, a)
17031703
await wait_for_tasks_in_state("shuffle-transfer", "memory", 1, b)
@@ -2079,7 +2079,7 @@ async def test_deduplicate_stale_transfer(c, s, a, b, wait_until_forgotten):
20792079
)
20802080
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
20812081
shuffled = df.shuffle("x")
2082-
shuffled = shuffled.persist()
2082+
shuffled = c.persist(shuffled)
20832083

20842084
shuffle_extA = a.plugins["shuffle"]
20852085
shuffle_extB = b.plugins["shuffle"]
@@ -2131,7 +2131,7 @@ async def test_handle_stale_barrier(c, s, a, b, wait_until_forgotten):
21312131
)
21322132
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
21332133
shuffled = df.shuffle("x")
2134-
shuffled = shuffled.persist()
2134+
shuffled = c.persist(shuffled)
21352135

21362136
shuffle_extA = a.plugins["shuffle"]
21372137
shuffle_extB = b.plugins["shuffle"]
@@ -2190,7 +2190,7 @@ async def test_shuffle_run_consistency(c, s, a):
21902190
# Initialize first shuffle execution
21912191
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
21922192
out = df.shuffle("x")
2193-
out = out.persist()
2193+
out = c.persist(out)
21942194

21952195
shuffle_id = await wait_until_new_shuffle_is_initialized(s)
21962196
spec = scheduler_ext.get(shuffle_id, a.worker_address)["run_spec"].data
@@ -2214,7 +2214,7 @@ async def test_shuffle_run_consistency(c, s, a):
22142214
# Initialize second shuffle execution
22152215
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
22162216
out = df.shuffle("x")
2217-
out = out.persist()
2217+
out = c.persist(out)
22182218

22192219
new_shuffle_id = await wait_until_new_shuffle_is_initialized(s)
22202220
assert shuffle_id == new_shuffle_id
@@ -2241,7 +2241,7 @@ async def test_shuffle_run_consistency(c, s, a):
22412241
# Create an unrelated shuffle on a different column
22422242
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
22432243
out = df.shuffle("y")
2244-
out = out.persist()
2244+
out = c.persist(out)
22452245
independent_shuffle_id = await wait_until_new_shuffle_is_initialized(s)
22462246
assert shuffle_id != independent_shuffle_id
22472247

@@ -2284,7 +2284,7 @@ async def test_fail_fetch_race(c, s, a):
22842284
)
22852285
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
22862286
out = df.shuffle("x")
2287-
out = out.persist()
2287+
out = c.persist(out)
22882288

22892289
shuffle_id = await wait_until_new_shuffle_is_initialized(s)
22902290
spec = scheduler_ext.get(shuffle_id, a.worker_address)["run_spec"].data
@@ -2365,7 +2365,7 @@ async def test_replace_stale_shuffle(c, s, a, b):
23652365
# Initialize first shuffle execution
23662366
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
23672367
out = df.shuffle("x")
2368-
out = out.persist()
2368+
out = c.persist(out)
23692369

23702370
shuffle_id = await wait_until_new_shuffle_is_initialized(s)
23712371

@@ -2391,7 +2391,7 @@ async def test_replace_stale_shuffle(c, s, a, b):
23912391
# Initialize second shuffle execution
23922392
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
23932393
out = df.shuffle("x")
2394-
out = out.persist()
2394+
out = c.persist(out)
23952395

23962396
await wait_for_tasks_in_state("shuffle-transfer", "memory", 1, a)
23972397
await run_manager_B.finished_get_shuffle_run.wait()

0 commit comments

Comments
 (0)