Commit 24a88f5
Improve tests for P2P stable ordering (#8458)
1 parent c98cd09 commit 24a88f5

File tree

1 file changed: +53 -26 lines changed

distributed/shuffle/tests/test_shuffle.py

+53 -26
@@ -210,32 +210,6 @@ async def test_basic_integration(c, s, a, b, npartitions, disk):
     await check_scheduler_cleanup(s)
 
 
-@pytest.mark.parametrize("disk", [True, False])
-@gen_cluster(client=True)
-async def test_stable_ordering(c, s, a, b, disk):
-    df = dask.datasets.timeseries(
-        start="2000-01-01",
-        end="2000-02-01",
-        dtypes={"x": int, "y": int},
-        freq="10 s",
-    )
-    df["x"] = df["x"] % 19
-    df["y"] = df["y"] % 23
-    with dask.config.set(
-        {"dataframe.shuffle.method": "p2p", "distributed.p2p.disk": disk}
-    ):
-        shuffled = dd.shuffle.shuffle(df, "x")
-        result, expected = await c.compute([shuffled, df], sync=True)
-        dd.assert_eq(
-            result.drop_duplicates("x", keep="first"),
-            expected.drop_duplicates("x", keep="first"),
-        )
-
-    await check_worker_cleanup(a)
-    await check_worker_cleanup(b)
-    await check_scheduler_cleanup(s)
-
-
 @pytest.mark.parametrize("processes", [True, False])
 @gen_test()
 async def test_basic_integration_local_cluster(processes):
@@ -2725,3 +2699,56 @@ async def test_barrier_handles_stale_resumed_transfer(c, s, *workers):
     await wait_for_tasks_in_state("shuffle-transfer", "resumed", 1, barrier_worker)
     barrier_worker.block_gather_dep.set()
     await out
+
+
+@pytest.mark.parametrize("disk", [True, False])
+@pytest.mark.parametrize("keep", ["first", "last"])
+@gen_cluster(client=True)
+async def test_shuffle_stable_ordering(c, s, a, b, keep, disk):
+    """Ensures that shuffling guarantees ordering for individual entries
+    belonging to the same shuffle key"""
+
+    def make_partition(partition_id, size):
+        """Return a partition of consecutive integers with a repeating shuffle key"""
+        offset = partition_id * size
+        df = pd.DataFrame({"a": np.arange(start=offset, stop=offset + size)})
+        df["b"] = df["a"] % 23
+        return df
+
+    df = dd.from_map(make_partition, np.arange(19), args=(250,))
+
+    with dask.config.set(
+        {"dataframe.shuffle.method": "p2p", "distributed.p2p.disk": disk}
+    ):
+        shuffled = df.shuffle("b")
+        result, expected = await c.compute([shuffled, df], sync=True)
+        dd.assert_eq(result, expected)
+
+        for _, group in result.groupby("b"):
+            assert group["a"].is_monotonic_increasing
+
+    await check_worker_cleanup(a)
+    await check_worker_cleanup(b)
+    await check_scheduler_cleanup(s)
+
+
+@pytest.mark.parametrize("disk", [True, False])
+@pytest.mark.parametrize("keep", ["first", "last"])
+@gen_cluster(client=True)
+async def test_drop_duplicates_stable_ordering(c, s, a, b, keep, disk):
+    df = dask.datasets.timeseries()
+
+    with dask.config.set(
+        {"dataframe.shuffle.method": "p2p", "distributed.p2p.disk": disk}
+    ):
+        result, expected = await c.compute(
+            [
+                df.drop_duplicates(
+                    subset=["name"], keep=keep, split_out=df.npartitions
+                ),
+                df,
+            ],
+            sync=True,
+        )
+        expected = expected.drop_duplicates(subset=["name"], keep=keep)
+        dd.assert_eq(result, expected)
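
For reference, a minimal standalone sketch (not part of this commit) of the ordering invariant the new tests assert: rows that share a shuffle key keep their original relative order, so the monotonically increasing column "a" stays monotonic within every "b" group after a P2P shuffle. It assumes a local in-process distributed.Client and the same "dataframe.shuffle.method" config key used by the tests; make_partition mirrors the helper defined in test_shuffle_stable_ordering.

import numpy as np
import pandas as pd

import dask
import dask.dataframe as dd
from distributed import Client


def make_partition(partition_id, size=250):
    # Consecutive, increasing integers in "a"; the shuffle key "b" repeats across partitions.
    offset = partition_id * size
    df = pd.DataFrame({"a": np.arange(start=offset, stop=offset + size)})
    df["b"] = df["a"] % 23
    return df


if __name__ == "__main__":
    with Client(processes=False):  # in-process cluster so the P2P shuffle is available
        ddf = dd.from_map(make_partition, np.arange(19))
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            shuffled = ddf.shuffle("b").compute()
        # Stable ordering: within each key, "a" must still be increasing.
        for _, group in shuffled.groupby("b"):
            assert group["a"].is_monotonic_increasing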
