Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix CI by using client.persist(collection) instead of collection.persist() #9020

Merged
merged 4 commits (branch names not captured in this extraction)
Mar 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion distributed/comm/tests/test_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ async def test_collections(c, s, a, b):
da = pytest.importorskip("dask.array")
x = da.random.random((1000, 1000), chunks=(100, 100))
x = x + x.T
await x.persist()
await c.persist(x)


@gen_cluster(client=True, scheduler_kwargs={"protocol": "ws://"})
Expand Down
22 changes: 11 additions & 11 deletions distributed/dashboard/tests/test_scheduler_bokeh.py
Original file line number Diff line number Diff line change
Expand Up @@ -833,9 +833,9 @@ async def test_TaskGraph(c, s, a, b):
json.dumps(gp.node_source.data)

da = pytest.importorskip("dask.array")
x = da.random.random((20, 20), chunks=(10, 10)).persist()
x = c.persist(da.random.random((20, 20), chunks=(10, 10)))
y = (x + x.T) - x.mean(axis=0)
y = y.persist()
y = c.persist(y)
await wait(y)

gp.update()
Expand Down Expand Up @@ -912,12 +912,12 @@ async def test_TaskGraph_complex(c, s, a, b):
da = pytest.importorskip("dask.array")
gp = TaskGraph(s)
x = da.random.random((2000, 2000), chunks=(1000, 1000))
y = ((x + x.T) - x.mean(axis=0)).persist()
y = c.persist((x + x.T) - x.mean(axis=0))
await wait(y)
gp.update()
assert len(gp.layout.index) == len(gp.node_source.data["x"])
assert len(gp.layout.index) == len(s.tasks)
z = (x - y).sum().persist()
z = c.persist((x - y).sum())
await wait(z)
gp.update()
assert len(gp.layout.index) == len(gp.node_source.data["x"])
Expand Down Expand Up @@ -1177,10 +1177,10 @@ async def test_memory_by_key(c, s, a, b):
mbk = MemoryByKey(s)

da = pytest.importorskip("dask.array")
x = (da.random.random((20, 20), chunks=(10, 10)) + 1).persist(optimize_graph=False)
x = c.persist(da.random.random((20, 20), chunks=(10, 10)) + 1, optimize_graph=False)
await x

y = await dask.delayed(inc)(1).persist()
y = await c.persist(dask.delayed(inc)(1))

mbk.update()
assert mbk.source.data["name"] == ["add", "inc"]
Expand Down Expand Up @@ -1260,10 +1260,10 @@ async def test_aggregate_action(c, s, a, b):
mbk = AggregateAction(s)

da = pytest.importorskip("dask.array")
x = (da.ones((20, 20), chunks=(10, 10)) + 1).persist(optimize_graph=False)
x = c.persist(da.ones((20, 20), chunks=(10, 10)) + 1, optimize_graph=False)

await x
y = await dask.delayed(inc)(1).persist()
y = await c.persist(dask.delayed(inc)(1))
z = (x + x.T) - x.mean(axis=0)
await c.compute(z.sum())

Expand All @@ -1289,9 +1289,9 @@ async def test_compute_per_key(c, s, a, b):
mbk = ComputePerKey(s)

da = pytest.importorskip("dask.array")
x = (da.ones((20, 20), chunks=(10, 10)) + 1).persist(optimize_graph=False)
x = c.persist(da.ones((20, 20), chunks=(10, 10)) + 1, optimize_graph=False)
await x
y = await dask.delayed(inc)(1).persist()
y = await c.persist(dask.delayed(inc)(1))
z = (x + x.T) - x.mean(axis=0)
zsum = z.sum()
await c.compute(zsum)
Expand Down Expand Up @@ -1367,7 +1367,7 @@ async def test_shuffling(c, s, a, b):
freq="10 s",
)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
df2 = df.shuffle("x").persist()
df2 = c.persist(df.shuffle("x"))
start = time()
while not ss.source.data["comm_written"]:
await asyncio.gather(*[a.heartbeat(), b.heartbeat()])
Expand Down
40 changes: 20 additions & 20 deletions distributed/shuffle/tests/test_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ async def test_bad_disk(c, s, a, b):
)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
out = df.shuffle("x")
out = out.persist()
out = c.persist(out)
shuffle_id = await wait_until_new_shuffle_is_initialized(s)
while not get_active_shuffle_runs(a):
await asyncio.sleep(0.01)
Expand Down Expand Up @@ -1068,7 +1068,7 @@ async def test_heartbeat(c, s, a, b):
)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
out = df.shuffle("x")
out = out.persist()
out = c.persist(out)

while not s.plugins["shuffle"].heartbeats:
await asyncio.sleep(0.001)
Expand Down Expand Up @@ -1272,7 +1272,7 @@ async def test_head(c, s, a, b):
)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
out = df.shuffle("x")
out = await out.head(compute=False).persist() # Only ask for one key
out = await c.persist(out.head(compute=False)) # Only ask for one key

assert list(os.walk(a.local_directory)) == a_files # cleaned up files?
assert list(os.walk(b.local_directory)) == b_files
Expand Down Expand Up @@ -1380,7 +1380,7 @@ async def test_clean_after_forgotten_early(c, s, a, b):
)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
out = df.shuffle("x")
out = out.persist()
out = c.persist(out)
await wait_for_tasks_in_state("shuffle-transfer", "memory", 1, a)
await wait_for_tasks_in_state("shuffle-transfer", "memory", 1, b)
del out
Expand All @@ -1399,12 +1399,12 @@ async def test_tail(c, s, a, b):
)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
x = df.shuffle("x")
full = await x.persist()
full = await c.persist(x)
ntasks_full = len(s.tasks)
del full
while s.tasks:
await asyncio.sleep(0)
partial = await x.tail(compute=False).persist() # Only ask for one key
partial = await c.persist(x.tail(compute=False)) # Only ask for one key

assert len(s.tasks) < ntasks_full
del partial
Expand Down Expand Up @@ -1539,7 +1539,7 @@ async def test_crashed_worker_after_shuffle_persisted(c, s, a):
)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
out = df.shuffle("x")
out = out.persist()
out = c.persist(out)

await event.wait()
await out
Expand Down Expand Up @@ -1597,7 +1597,7 @@ async def test_new_worker(c, s, a, b):
)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
shuffled = df.shuffle("x")
persisted = shuffled.persist()
persisted = c.persist(shuffled)
while not s.plugins["shuffle"].active_shuffles:
await asyncio.sleep(0.001)

Expand Down Expand Up @@ -1646,7 +1646,7 @@ async def test_delete_some_results(c, s, a, b):
freq="10 s",
)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
x = df.shuffle("x").persist()
x = c.persist(df.shuffle("x"))
while not s.tasks or not any(ts.state == "memory" for ts in s.tasks.values()):
await asyncio.sleep(0.01)

Expand All @@ -1669,12 +1669,12 @@ async def test_add_some_results(c, s, a, b):
)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
x = df.shuffle("x")
y = x.partitions[: x.npartitions // 2].persist()
y = c.persist(x.partitions[: x.npartitions // 2])

while not s.tasks or not any(ts.state == "memory" for ts in s.tasks.values()):
await asyncio.sleep(0.01)

x = x.persist()
x = c.persist(x)

await c.compute(x.size)

Expand All @@ -1697,7 +1697,7 @@ async def test_clean_after_close(c, s, a, b):

with dask.config.set({"dataframe.shuffle.method": "p2p"}):
out = df.shuffle("x")
out = out.persist()
out = c.persist(out)

await wait_for_tasks_in_state("shuffle-transfer", "executing", 1, a)
await wait_for_tasks_in_state("shuffle-transfer", "memory", 1, b)
Expand Down Expand Up @@ -2079,7 +2079,7 @@ async def test_deduplicate_stale_transfer(c, s, a, b, wait_until_forgotten):
)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
shuffled = df.shuffle("x")
shuffled = shuffled.persist()
shuffled = c.persist(shuffled)

shuffle_extA = a.plugins["shuffle"]
shuffle_extB = b.plugins["shuffle"]
Expand Down Expand Up @@ -2131,7 +2131,7 @@ async def test_handle_stale_barrier(c, s, a, b, wait_until_forgotten):
)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
shuffled = df.shuffle("x")
shuffled = shuffled.persist()
shuffled = c.persist(shuffled)

shuffle_extA = a.plugins["shuffle"]
shuffle_extB = b.plugins["shuffle"]
Expand Down Expand Up @@ -2190,7 +2190,7 @@ async def test_shuffle_run_consistency(c, s, a):
# Initialize first shuffle execution
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
out = df.shuffle("x")
out = out.persist()
out = c.persist(out)

shuffle_id = await wait_until_new_shuffle_is_initialized(s)
spec = scheduler_ext.get(shuffle_id, a.worker_address)["run_spec"].data
Expand All @@ -2214,7 +2214,7 @@ async def test_shuffle_run_consistency(c, s, a):
# Initialize second shuffle execution
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
out = df.shuffle("x")
out = out.persist()
out = c.persist(out)

new_shuffle_id = await wait_until_new_shuffle_is_initialized(s)
assert shuffle_id == new_shuffle_id
Expand All @@ -2241,7 +2241,7 @@ async def test_shuffle_run_consistency(c, s, a):
# Create an unrelated shuffle on a different column
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
out = df.shuffle("y")
out = out.persist()
out = c.persist(out)
independent_shuffle_id = await wait_until_new_shuffle_is_initialized(s)
assert shuffle_id != independent_shuffle_id

Expand Down Expand Up @@ -2284,7 +2284,7 @@ async def test_fail_fetch_race(c, s, a):
)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
out = df.shuffle("x")
out = out.persist()
out = c.persist(out)

shuffle_id = await wait_until_new_shuffle_is_initialized(s)
spec = scheduler_ext.get(shuffle_id, a.worker_address)["run_spec"].data
Expand Down Expand Up @@ -2365,7 +2365,7 @@ async def test_replace_stale_shuffle(c, s, a, b):
# Initialize first shuffle execution
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
out = df.shuffle("x")
out = out.persist()
out = c.persist(out)

shuffle_id = await wait_until_new_shuffle_is_initialized(s)

Expand All @@ -2391,7 +2391,7 @@ async def test_replace_stale_shuffle(c, s, a, b):
# Initialize second shuffle execution
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
out = df.shuffle("x")
out = out.persist()
out = c.persist(out)

await wait_for_tasks_in_state("shuffle-transfer", "memory", 1, a)
await run_manager_B.finished_get_shuffle_run.wait()
Expand Down
Loading
Loading