1
1
from __future__ import annotations
2
2
3
+ import asyncio
3
4
import os
4
5
5
6
import pytest
@@ -24,22 +25,27 @@ def force_spill():
24
25
25
26
manager = get_global_manager ()
26
27
27
- # 24 bytes
28
+ # Allocate a new dataframe and trigger spilling by setting a 1 byte limit
28
29
df = cudf .DataFrame ({"a" : [1 , 2 , 3 ]})
30
+ manager .spill_to_device_limit (1 )
29
31
30
- return manager .spill_to_device_limit (1 )
32
+ # Get bytes spilled from GPU to CPU
33
+ spill_totals , _ = get_global_manager ().statistics .spill_totals [("gpu" , "cpu" )]
34
+ return spill_totals
31
35
32
36
33
37
@gen_cluster (
34
38
client = True ,
35
39
nthreads = [("127.0.0.1" , 1 )],
36
40
)
37
- @pytest .mark .flaky (reruns = 10 , reruns_delay = 5 )
38
41
async def test_cudf_metrics (c , s , * workers ):
39
42
w = list (s .workers .values ())[0 ]
40
43
assert "cudf" in w .metrics
41
44
assert w .metrics ["cudf" ]["cudf-spilled" ] == 0
42
45
43
- await c .run (force_spill )
44
-
45
- assert w .metrics ["cudf" ]["cudf-spilled" ] == 24
46
+ spill_totals = (await c .run (force_spill , workers = [w .address ]))[w .address ]
47
+ assert spill_totals > 0
48
+ # We have to wait for the worker's metrics to update.
49
+ # TODO: avoid sleep, is it possible to wait on the next update of metrics?
50
+ await asyncio .sleep (1 )
51
+ assert w .metrics ["cudf" ]["cudf-spilled" ] == spill_totals
0 commit comments