diff --git a/distributed/client.py b/distributed/client.py index 004f64c2834..34df78c53ee 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -41,6 +41,7 @@ ensure_dict, format_bytes, funcname, + parse_bytes, parse_timedelta, shorten_traceback, typename, @@ -3162,7 +3163,9 @@ def _graph_to_futures( header, frames = serialize(ToPickle(dsk), on_error="raise") pickled_size = sum(map(nbytes, [header] + frames)) - if pickled_size > 10_000_000: + if pickled_size > parse_bytes( + dask.config.get("distributed.admin.large-graph-warning-threshold") + ): warnings.warn( f"Sending large graph of size {format_bytes(pickled_size)}.\n" "This may cause some slowdown.\n" diff --git a/distributed/distributed-schema.yaml b/distributed/distributed-schema.yaml index 71d3cd18570..45534e9be80 100644 --- a/distributed/distributed-schema.yaml +++ b/distributed/distributed-schema.yaml @@ -1103,6 +1103,12 @@ properties: description: | Options for logs, event loops, and so on properties: + large-graph-warning-threshold: + type: string + description: | + Threshold in bytes for when a warning is raised about a large + submitted task graph. + Default is 10MB. tick: type: object description: | diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml index 1582cdc23e1..c36d0a21ea8 100644 --- a/distributed/distributed.yaml +++ b/distributed/distributed.yaml @@ -323,6 +323,7 @@ distributed: ################## admin: + large-graph-warning-threshold: 10MB # Threshold for warning on large graph tick: interval: 20ms # time between event loop health checks limit: 3s # time allowed before triggering a warning diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 67f79053d96..c00c2bbf2cd 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -5975,6 +5975,8 @@ async def test_config_scheduler_address(s, a, b): async def test_warn_when_submitting_large_values(c, s): with pytest.warns(UserWarning, match="Sending large graph of size"): future = c.submit(lambda x: x + 1, b"0" * 10_000_000) + with dask.config.set({"distributed.admin.large-graph-warning-threshold": "1GB"}): + future = c.submit(lambda x: x + 1, b"0" * 10_000_000) @gen_cluster(client=True, nthreads=[])