Add redirect from prefix root to status #14611
15 errors, 212 fail, 111 skipped, 3 744 pass in 3h 18m 7s
20 files 20 suites 3h 18m 7s ⏱️
4 082 tests 3 744 ✅ 111 💤 212 ❌ 15 🔥
15 396 runs 13 817 ✅ 1 033 💤 506 ❌ 40 🔥
Results for commit 54eec97.
Annotations
github-actions / Unit Test Results
10 out of 16 runs with error: distributed.dashboard.tests.test_scheduler_bokeh
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-no_queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-notci1/pytest.xml [took 0s]
Raw output
collection failure
C:\Users\runneradmin\miniconda3\envs\dask-distributed\lib\site-packages\_pytest\python.py:493: in importtestmodule
mod = import_path(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\lib\site-packages\_pytest\pathlib.py:587: in import_path
importlib.import_module(module_name)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\lib\importlib\__init__.py:126: in import_module
return _bootstrap._gcd_import(name[level:], package, level)
<frozen importlib._bootstrap>:1050: in _gcd_import
???
<frozen importlib._bootstrap>:1027: in _find_and_load
???
<frozen importlib._bootstrap>:1006: in _find_and_load_unlocked
???
<frozen importlib._bootstrap>:688: in _load_unlocked
???
C:\Users\runneradmin\miniconda3\envs\dask-distributed\lib\site-packages\_pytest\assertion\rewrite.py:184: in exec_module
exec(co, module.__dict__)
distributed\dashboard\tests\test_scheduler_bokeh.py:24: in <module>
from distributed.dashboard import scheduler
E File "D:\a\distributed\distributed\distributed\dashboard\scheduler.py", line 186
E rf"/{prefix.strip("/")}/?",
E ^
E SyntaxError: f-string: unmatched '('
Check warning on line 0 in distributed.cli.tests.test_dask_worker
github-actions / Unit Test Results
2 out of 3 runs failed: test_error_during_startup[--nanny] (distributed.cli.tests.test_dask_worker)
artifacts/ubuntu-latest-mindeps-default-notci1/pytest.xml [took 30s]
artifacts/ubuntu-latest-mindeps-numpy-notci1/pytest.xml [took 30s]
Raw output
OSError: Timed out trying to connect to tcp://127.0.0.1:46313 after 30 s
ConnectionRefusedError: [Errno 111] Connection refused
The above exception was the direct cause of the following exception:
addr = 'tcp://127.0.0.1:46313', timeout = 30, deserialize = True
handshake_overrides = None
connection_args = {'extra_conn_args': {}, 'require_encryption': False, 'ssl_context': None}
scheme = 'tcp', loc = '127.0.0.1:46313'
backend = <distributed.comm.tcp.TCPBackend object at 0x7f9763940430>
connector = <distributed.comm.tcp.TCPConnector object at 0x7f9754a62110>
comm = None, time_left = <function connect.<locals>.time_left at 0x7f9754ad0f70>
backoff_base = 0.01
async def connect(
addr, timeout=None, deserialize=True, handshake_overrides=None, **connection_args
):
"""
Connect to the given address (a URI such as ``tcp://127.0.0.1:1234``)
and yield a ``Comm`` object. If the connection attempt fails, it is
retried until the *timeout* is expired.
"""
if timeout is None:
timeout = dask.config.get("distributed.comm.timeouts.connect")
timeout = parse_timedelta(timeout, default="seconds")
scheme, loc = parse_address(addr)
backend = registry.get_backend(scheme)
connector = backend.get_connector()
comm = None
start = time()
def time_left():
deadline = start + timeout
return max(0, deadline - time())
backoff_base = 0.01
attempt = 0
logger.debug("Establishing connection to %s", loc)
# Prefer multiple small attempts over one long attempt. This should protect
# primarily against DNS race conditions
# gh3104, gh4176, gh4167
intermediate_cap = timeout / 5
active_exception = None
while time_left() > 0:
try:
> comm = await wait_for(
connector.connect(loc, deserialize=deserialize, **connection_args),
timeout=min(intermediate_cap, time_left()),
)
distributed/comm/core.py:342:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/utils.py:1915: in wait_for
return await asyncio.wait_for(fut, timeout)
../../../miniconda3/envs/dask-distributed/lib/python3.10/asyncio/tasks.py:445: in wait_for
return fut.result()
distributed/comm/tcp.py:560: in connect
convert_stream_closed_error(self, e)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
obj = <distributed.comm.tcp.TCPConnector object at 0x7f9754a62110>
exc = ConnectionRefusedError(111, 'Connection refused')
def convert_stream_closed_error(obj, exc):
"""
Re-raise StreamClosedError or SSLError as CommClosedError.
"""
if hasattr(exc, "real_error"):
# The stream was closed because of an underlying OS error
if exc.real_error is None:
raise CommClosedError(f"in {obj}: {exc}") from exc
exc = exc.real_error
if isinstance(exc, ssl.SSLError):
if exc.reason and "UNKNOWN_CA" in exc.reason:
raise FatalCommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}")
> raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
E distributed.comm.core.CommClosedError: in <distributed.comm.tcp.TCPConnector object at 0x7f9754a62110>: ConnectionRefusedError: [Errno 111] Connection refused
distributed/comm/tcp.py:143: CommClosedError
The above exception was the direct cause of the following exception:
monkeypatch = <_pytest.monkeypatch.MonkeyPatch object at 0x7f97555a1360>
nanny = '--nanny'
loop = <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7f9754a60c70>
@pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"])
def test_error_during_startup(monkeypatch, nanny, loop):
# see https://github.com/dask/distributed/issues/6320
scheduler_port = open_port()
scheduler_addr = f"tcp://127.0.0.1:{scheduler_port}"
monkeypatch.setenv("DASK_SCHEDULER_ADDRESS", scheduler_addr)
with popen(
[
sys.executable,
"-m",
"dask",
"scheduler",
f"--port={scheduler_port}",
"--dashboard-address=:0",
],
):
> with Client(scheduler_addr, loop=loop) as c:
distributed/cli/tests/test_dask_worker.py:801:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/client.py:1230: in __init__
self.start(timeout=timeout)
distributed/client.py:1432: in start
sync(self.loop, self._start, **kwargs)
distributed/utils.py:439: in sync
raise error
distributed/utils.py:413: in f
result = yield future
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/tornado/gen.py:769: in run
value = future.result()
distributed/client.py:1511: in _start
await self._ensure_connected(timeout=timeout)
distributed/client.py:1579: in _ensure_connected
comm = await connect(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
addr = 'tcp://127.0.0.1:46313', timeout = 30, deserialize = True
handshake_overrides = None
connection_args = {'extra_conn_args': {}, 'require_encryption': False, 'ssl_context': None}
scheme = 'tcp', loc = '127.0.0.1:46313'
backend = <distributed.comm.tcp.TCPBackend object at 0x7f9763940430>
connector = <distributed.comm.tcp.TCPConnector object at 0x7f9754a62110>
comm = None, time_left = <function connect.<locals>.time_left at 0x7f9754ad0f70>
backoff_base = 0.01
async def connect(
addr, timeout=None, deserialize=True, handshake_overrides=None, **connection_args
):
"""
Connect to the given address (a URI such as ``tcp://127.0.0.1:1234``)
and yield a ``Comm`` object. If the connection attempt fails, it is
retried until the *timeout* is expired.
"""
if timeout is None:
timeout = dask.config.get("distributed.comm.timeouts.connect")
timeout = parse_timedelta(timeout, default="seconds")
scheme, loc = parse_address(addr)
backend = registry.get_backend(scheme)
connector = backend.get_connector()
comm = None
start = time()
def time_left():
deadline = start + timeout
return max(0, deadline - time())
backoff_base = 0.01
attempt = 0
logger.debug("Establishing connection to %s", loc)
# Prefer multiple small attempts over one long attempt. This should protect
# primarily against DNS race conditions
# gh3104, gh4176, gh4167
intermediate_cap = timeout / 5
active_exception = None
while time_left() > 0:
try:
comm = await wait_for(
connector.connect(loc, deserialize=deserialize, **connection_args),
timeout=min(intermediate_cap, time_left()),
)
break
except FatalCommClosedError:
raise
# Note: CommClosed inherits from OSError
except (asyncio.TimeoutError, OSError) as exc:
active_exception = exc
# As described above, the intermediate timeout is used to distribute
# initial, bulk connect attempts homogeneously. In particular, with
# the jitter upon retries we should not be worried about overloading
# any more DNS servers
intermediate_cap = timeout
# FullJitter see https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
upper_cap = min(time_left(), backoff_base * (2**attempt))
backoff = random.uniform(0, upper_cap)
attempt += 1
logger.debug(
"Could not connect to %s, waiting for %s before retrying", loc, backoff
)
await asyncio.sleep(backoff)
else:
> raise OSError(
f"Timed out trying to connect to {addr} after {timeout} s"
) from active_exception
E OSError: Timed out trying to connect to tcp://127.0.0.1:46313 after 30 s
distributed/comm/core.py:368: OSError
Check warning on line 0 in distributed.cli.tests.test_dask_worker
github-actions / Unit Test Results
2 out of 3 runs failed: test_error_during_startup[--no-nanny] (distributed.cli.tests.test_dask_worker)
artifacts/ubuntu-latest-mindeps-default-notci1/pytest.xml [took 30s]
artifacts/ubuntu-latest-mindeps-numpy-notci1/pytest.xml [took 30s]
Raw output
OSError: Timed out trying to connect to tcp://127.0.0.1:57173 after 30 s
ConnectionRefusedError: [Errno 111] Connection refused
The above exception was the direct cause of the following exception:
addr = 'tcp://127.0.0.1:57173', timeout = 30, deserialize = True
handshake_overrides = None
connection_args = {'extra_conn_args': {}, 'require_encryption': False, 'ssl_context': None}
scheme = 'tcp', loc = '127.0.0.1:57173'
backend = <distributed.comm.tcp.TCPBackend object at 0x7f9763940430>
connector = <distributed.comm.tcp.TCPConnector object at 0x7f9755a43250>
comm = None, time_left = <function connect.<locals>.time_left at 0x7f975547b400>
backoff_base = 0.01
async def connect(
addr, timeout=None, deserialize=True, handshake_overrides=None, **connection_args
):
"""
Connect to the given address (a URI such as ``tcp://127.0.0.1:1234``)
and yield a ``Comm`` object. If the connection attempt fails, it is
retried until the *timeout* is expired.
"""
if timeout is None:
timeout = dask.config.get("distributed.comm.timeouts.connect")
timeout = parse_timedelta(timeout, default="seconds")
scheme, loc = parse_address(addr)
backend = registry.get_backend(scheme)
connector = backend.get_connector()
comm = None
start = time()
def time_left():
deadline = start + timeout
return max(0, deadline - time())
backoff_base = 0.01
attempt = 0
logger.debug("Establishing connection to %s", loc)
# Prefer multiple small attempts over one long attempt. This should protect
# primarily against DNS race conditions
# gh3104, gh4176, gh4167
intermediate_cap = timeout / 5
active_exception = None
while time_left() > 0:
try:
> comm = await wait_for(
connector.connect(loc, deserialize=deserialize, **connection_args),
timeout=min(intermediate_cap, time_left()),
)
distributed/comm/core.py:342:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/utils.py:1915: in wait_for
return await asyncio.wait_for(fut, timeout)
../../../miniconda3/envs/dask-distributed/lib/python3.10/asyncio/tasks.py:445: in wait_for
return fut.result()
distributed/comm/tcp.py:560: in connect
convert_stream_closed_error(self, e)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
obj = <distributed.comm.tcp.TCPConnector object at 0x7f9755a43250>
exc = ConnectionRefusedError(111, 'Connection refused')
def convert_stream_closed_error(obj, exc):
"""
Re-raise StreamClosedError or SSLError as CommClosedError.
"""
if hasattr(exc, "real_error"):
# The stream was closed because of an underlying OS error
if exc.real_error is None:
raise CommClosedError(f"in {obj}: {exc}") from exc
exc = exc.real_error
if isinstance(exc, ssl.SSLError):
if exc.reason and "UNKNOWN_CA" in exc.reason:
raise FatalCommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}")
> raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
E distributed.comm.core.CommClosedError: in <distributed.comm.tcp.TCPConnector object at 0x7f9755a43250>: ConnectionRefusedError: [Errno 111] Connection refused
distributed/comm/tcp.py:143: CommClosedError
The above exception was the direct cause of the following exception:
monkeypatch = <_pytest.monkeypatch.MonkeyPatch object at 0x7f9755a43e80>
nanny = '--no-nanny'
loop = <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7f9755a42650>
@pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"])
def test_error_during_startup(monkeypatch, nanny, loop):
# see https://github.com/dask/distributed/issues/6320
scheduler_port = open_port()
scheduler_addr = f"tcp://127.0.0.1:{scheduler_port}"
monkeypatch.setenv("DASK_SCHEDULER_ADDRESS", scheduler_addr)
with popen(
[
sys.executable,
"-m",
"dask",
"scheduler",
f"--port={scheduler_port}",
"--dashboard-address=:0",
],
):
> with Client(scheduler_addr, loop=loop) as c:
distributed/cli/tests/test_dask_worker.py:801:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/client.py:1230: in __init__
self.start(timeout=timeout)
distributed/client.py:1432: in start
sync(self.loop, self._start, **kwargs)
distributed/utils.py:439: in sync
raise error
distributed/utils.py:413: in f
result = yield future
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/tornado/gen.py:769: in run
value = future.result()
distributed/client.py:1511: in _start
await self._ensure_connected(timeout=timeout)
distributed/client.py:1579: in _ensure_connected
comm = await connect(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
addr = 'tcp://127.0.0.1:57173', timeout = 30, deserialize = True
handshake_overrides = None
connection_args = {'extra_conn_args': {}, 'require_encryption': False, 'ssl_context': None}
scheme = 'tcp', loc = '127.0.0.1:57173'
backend = <distributed.comm.tcp.TCPBackend object at 0x7f9763940430>
connector = <distributed.comm.tcp.TCPConnector object at 0x7f9755a43250>
comm = None, time_left = <function connect.<locals>.time_left at 0x7f975547b400>
backoff_base = 0.01
async def connect(
addr, timeout=None, deserialize=True, handshake_overrides=None, **connection_args
):
"""
Connect to the given address (a URI such as ``tcp://127.0.0.1:1234``)
and yield a ``Comm`` object. If the connection attempt fails, it is
retried until the *timeout* is expired.
"""
if timeout is None:
timeout = dask.config.get("distributed.comm.timeouts.connect")
timeout = parse_timedelta(timeout, default="seconds")
scheme, loc = parse_address(addr)
backend = registry.get_backend(scheme)
connector = backend.get_connector()
comm = None
start = time()
def time_left():
deadline = start + timeout
return max(0, deadline - time())
backoff_base = 0.01
attempt = 0
logger.debug("Establishing connection to %s", loc)
# Prefer multiple small attempts over one long attempt. This should protect
# primarily against DNS race conditions
# gh3104, gh4176, gh4167
intermediate_cap = timeout / 5
active_exception = None
while time_left() > 0:
try:
comm = await wait_for(
connector.connect(loc, deserialize=deserialize, **connection_args),
timeout=min(intermediate_cap, time_left()),
)
break
except FatalCommClosedError:
raise
# Note: CommClosed inherits from OSError
except (asyncio.TimeoutError, OSError) as exc:
active_exception = exc
# As described above, the intermediate timeout is used to distribute
# initial, bulk connect attempts homogeneously. In particular, with
# the jitter upon retries we should not be worried about overloading
# any more DNS servers
intermediate_cap = timeout
# FullJitter see https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
upper_cap = min(time_left(), backoff_base * (2**attempt))
backoff = random.uniform(0, upper_cap)
attempt += 1
logger.debug(
"Could not connect to %s, waiting for %s before retrying", loc, backoff
)
await asyncio.sleep(backoff)
else:
> raise OSError(
f"Timed out trying to connect to {addr} after {timeout} s"
) from active_exception
E OSError: Timed out trying to connect to tcp://127.0.0.1:57173 after 30 s
distributed/comm/core.py:368: OSError
Check warning on line 0 in distributed.comm.tests.test_ws
github-actions / Unit Test Results
2 out of 3 runs failed: test_expect_scheduler_ssl_when_sharing_server (distributed.comm.tests.test_ws)
artifacts/ubuntu-latest-mindeps-default-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-notci1/pytest.xml [took 0s]
Raw output
File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
rf"/{prefix.strip("/")}/?",
^
SyntaxError: f-string: unmatched '('
tmp_path = PosixPath('/tmp/pytest-of-runner/pytest-0/test_expect_scheduler_ssl_when2')
@gen_test()
async def test_expect_scheduler_ssl_when_sharing_server(tmp_path):
xfail_ssl_issue5601()
pytest.importorskip("cryptography")
security = Security.temporary()
key_path = os.path.join(str(tmp_path), "dask.pem")
cert_path = os.path.join(str(tmp_path), "dask.crt")
with open(key_path, "w") as f:
f.write(security.tls_scheduler_key)
with open(cert_path, "w") as f:
f.write(security.tls_scheduler_cert)
c = {
"distributed.scheduler.dashboard.tls.key": key_path,
"distributed.scheduler.dashboard.tls.cert": cert_path,
}
with dask.config.set(c):
with pytest.raises(RuntimeError):
> async with Scheduler(protocol="ws://", dashboard=True, port=8787):
distributed/comm/tests/test_ws.py:110:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Scheduler' object has no attribute '_address'") raised in repr()] Scheduler object at 0x7f97563517a0>
loop = None, services = None, service_kwargs = None, allowed_failures = 3
extensions = None, validate = True, scheduler_file = None, security = None
worker_ttl = None, idle_timeout = None, interface = None, host = None
port = 8787, protocol = 'ws://', dashboard_address = None, dashboard = True
http_prefix = '/', preload = [], preload_argv = [], plugins = ()
contact_address = None, transition_counter_max = False, jupyter = False
kwargs = {}
http_server_modules = ['distributed.http.scheduler.prometheus', 'distributed.http.scheduler.info', 'distributed.http.scheduler.json', 'distributed.http.health', 'distributed.http.proxy', 'distributed.http.statics']
show_dashboard = True
def __init__(
self,
loop=None,
services=None,
service_kwargs=None,
allowed_failures=None,
extensions=None,
validate=None,
scheduler_file=None,
security=None,
worker_ttl=None,
idle_timeout=None,
interface=None,
host=None,
port=0,
protocol=None,
dashboard_address=None,
dashboard=None,
http_prefix="/",
preload=None,
preload_argv=(),
plugins=(),
contact_address=None,
transition_counter_max=False,
jupyter=False,
**kwargs,
):
if dask.config.get("distributed.scheduler.pickle", default=True) is False:
raise RuntimeError(
"Pickling can no longer be disabled with the `distributed.scheduler.pickle` option. Please remove this configuration to start the scheduler."
)
if loop is not None:
warnings.warn(
"the loop kwarg to Scheduler is deprecated",
DeprecationWarning,
stacklevel=2,
)
self.loop = self.io_loop = IOLoop.current()
self._setup_logging(logger)
# Attributes
if contact_address is None:
contact_address = dask.config.get("distributed.scheduler.contact-address")
self.contact_address = contact_address
if allowed_failures is None:
allowed_failures = dask.config.get("distributed.scheduler.allowed-failures")
self.allowed_failures = allowed_failures
if validate is None:
validate = dask.config.get("distributed.scheduler.validate")
self.proc = psutil.Process()
self.service_specs = services or {}
self.service_kwargs = service_kwargs or {}
self.services = {}
self.scheduler_file = scheduler_file
self.worker_ttl = parse_timedelta(
worker_ttl or dask.config.get("distributed.scheduler.worker-ttl")
)
self.idle_timeout = parse_timedelta(
idle_timeout or dask.config.get("distributed.scheduler.idle-timeout")
)
self.idle_since = time()
self.no_workers_timeout = parse_timedelta(
dask.config.get("distributed.scheduler.no-workers-timeout")
)
self._no_workers_since = None
self.time_started = self.idle_since # compatibility for dask-gateway
self._replica_lock = RLock()
self.bandwidth_workers = defaultdict(float)
self.bandwidth_types = defaultdict(float)
# Don't cast int metrics to float
self.cumulative_worker_metrics = defaultdict(int)
if not preload:
preload = dask.config.get("distributed.scheduler.preload")
if not preload_argv:
preload_argv = dask.config.get("distributed.scheduler.preload-argv")
self.preloads = preloading.process_preloads(self, preload, preload_argv)
if isinstance(security, dict):
security = Security(**security)
self.security = security or Security()
assert isinstance(self.security, Security)
self.connection_args = self.security.get_connection_args("scheduler")
self.connection_args["handshake_overrides"] = { # common denominator
"pickle-protocol": 4
}
self._start_address = addresses_from_user_args(
host=host,
port=port,
interface=interface,
protocol=protocol,
security=security,
default_port=self.default_port,
)
http_server_modules = dask.config.get("distributed.scheduler.http.routes")
show_dashboard = dashboard or (dashboard is None and dashboard_address)
# install vanilla route if show_dashboard but bokeh is not installed
if show_dashboard:
try:
> import distributed.dashboard.scheduler
E File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
E rf"/{prefix.strip("/")}/?",
E ^
E SyntaxError: f-string: unmatched '('
distributed/scheduler.py:3783: SyntaxError
Check warning on line 0 in distributed.comm.tests.test_ws
github-actions / Unit Test Results
2 out of 3 runs failed: test_http_and_comm_server[True-ws://-None-8787] (distributed.comm.tests.test_ws)
artifacts/ubuntu-latest-mindeps-default-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-notci1/pytest.xml [took 0s]
Raw output
File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
rf"/{prefix.strip("/")}/?",
^
SyntaxError: f-string: unmatched '('
dashboard = True, protocol = 'ws://', security = None, port = 8787
@pytest.mark.parametrize(
"dashboard,protocol,security,port",
[
(True, "ws://", None, 8787),
(True, "wss://", True, 8787),
(False, "ws://", None, 8787),
(False, "wss://", True, 8787),
(True, "ws://", None, 8786),
(True, "wss://", True, 8786),
(False, "ws://", None, 8786),
(False, "wss://", True, 8786),
],
)
@gen_test()
async def test_http_and_comm_server(dashboard, protocol, security, port):
if security:
xfail_ssl_issue5601()
pytest.importorskip("cryptography")
security = Security.temporary()
> async with Scheduler(
protocol=protocol, dashboard=dashboard, port=port, security=security
) as s:
distributed/comm/tests/test_ws.py:156:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Scheduler' object has no attribute '_address'") raised in repr()] Scheduler object at 0x7f9756352400>
loop = None, services = None, service_kwargs = None, allowed_failures = 3
extensions = None, validate = True, scheduler_file = None, security = None
worker_ttl = None, idle_timeout = None, interface = None, host = None
port = 8787, protocol = 'ws://', dashboard_address = None, dashboard = True
http_prefix = '/', preload = [], preload_argv = [], plugins = ()
contact_address = None, transition_counter_max = False, jupyter = False
kwargs = {}
http_server_modules = ['distributed.http.scheduler.prometheus', 'distributed.http.scheduler.info', 'distributed.http.scheduler.json', 'distributed.http.health', 'distributed.http.proxy', 'distributed.http.statics']
show_dashboard = True
def __init__(
self,
loop=None,
services=None,
service_kwargs=None,
allowed_failures=None,
extensions=None,
validate=None,
scheduler_file=None,
security=None,
worker_ttl=None,
idle_timeout=None,
interface=None,
host=None,
port=0,
protocol=None,
dashboard_address=None,
dashboard=None,
http_prefix="/",
preload=None,
preload_argv=(),
plugins=(),
contact_address=None,
transition_counter_max=False,
jupyter=False,
**kwargs,
):
if dask.config.get("distributed.scheduler.pickle", default=True) is False:
raise RuntimeError(
"Pickling can no longer be disabled with the `distributed.scheduler.pickle` option. Please remove this configuration to start the scheduler."
)
if loop is not None:
warnings.warn(
"the loop kwarg to Scheduler is deprecated",
DeprecationWarning,
stacklevel=2,
)
self.loop = self.io_loop = IOLoop.current()
self._setup_logging(logger)
# Attributes
if contact_address is None:
contact_address = dask.config.get("distributed.scheduler.contact-address")
self.contact_address = contact_address
if allowed_failures is None:
allowed_failures = dask.config.get("distributed.scheduler.allowed-failures")
self.allowed_failures = allowed_failures
if validate is None:
validate = dask.config.get("distributed.scheduler.validate")
self.proc = psutil.Process()
self.service_specs = services or {}
self.service_kwargs = service_kwargs or {}
self.services = {}
self.scheduler_file = scheduler_file
self.worker_ttl = parse_timedelta(
worker_ttl or dask.config.get("distributed.scheduler.worker-ttl")
)
self.idle_timeout = parse_timedelta(
idle_timeout or dask.config.get("distributed.scheduler.idle-timeout")
)
self.idle_since = time()
self.no_workers_timeout = parse_timedelta(
dask.config.get("distributed.scheduler.no-workers-timeout")
)
self._no_workers_since = None
self.time_started = self.idle_since # compatibility for dask-gateway
self._replica_lock = RLock()
self.bandwidth_workers = defaultdict(float)
self.bandwidth_types = defaultdict(float)
# Don't cast int metrics to float
self.cumulative_worker_metrics = defaultdict(int)
if not preload:
preload = dask.config.get("distributed.scheduler.preload")
if not preload_argv:
preload_argv = dask.config.get("distributed.scheduler.preload-argv")
self.preloads = preloading.process_preloads(self, preload, preload_argv)
if isinstance(security, dict):
security = Security(**security)
self.security = security or Security()
assert isinstance(self.security, Security)
self.connection_args = self.security.get_connection_args("scheduler")
self.connection_args["handshake_overrides"] = { # common denominator
"pickle-protocol": 4
}
self._start_address = addresses_from_user_args(
host=host,
port=port,
interface=interface,
protocol=protocol,
security=security,
default_port=self.default_port,
)
http_server_modules = dask.config.get("distributed.scheduler.http.routes")
show_dashboard = dashboard or (dashboard is None and dashboard_address)
# install vanilla route if show_dashboard but bokeh is not installed
if show_dashboard:
try:
> import distributed.dashboard.scheduler
E File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
E rf"/{prefix.strip("/")}/?",
E ^
E SyntaxError: f-string: unmatched '('
distributed/scheduler.py:3783: SyntaxError
Check warning on line 0 in distributed.comm.tests.test_ws
github-actions / Unit Test Results
2 out of 3 runs failed: test_http_and_comm_server[True-wss://-True-8787] (distributed.comm.tests.test_ws)
artifacts/ubuntu-latest-mindeps-default-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-notci1/pytest.xml [took 0s]
Raw output
File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
rf"/{prefix.strip("/")}/?",
^
SyntaxError: f-string: unmatched '('
dashboard = True, protocol = 'wss://'
security = Security(require_encryption=True, tls_ca_file=Temporary (In-memory), tls_client_cert=Temporary (In-memory), tls_client..., tls_scheduler_key=Temporary (In-memory), tls_worker_cert=Temporary (In-memory), tls_worker_key=Temporary (In-memory))
port = 8787
@pytest.mark.parametrize(
"dashboard,protocol,security,port",
[
(True, "ws://", None, 8787),
(True, "wss://", True, 8787),
(False, "ws://", None, 8787),
(False, "wss://", True, 8787),
(True, "ws://", None, 8786),
(True, "wss://", True, 8786),
(False, "ws://", None, 8786),
(False, "wss://", True, 8786),
],
)
@gen_test()
async def test_http_and_comm_server(dashboard, protocol, security, port):
if security:
xfail_ssl_issue5601()
pytest.importorskip("cryptography")
security = Security.temporary()
> async with Scheduler(
protocol=protocol, dashboard=dashboard, port=port, security=security
) as s:
distributed/comm/tests/test_ws.py:156:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Scheduler' object has no attribute '_address'") raised in repr()] Scheduler object at 0x7f97563526c0>
loop = None, services = None, service_kwargs = None, allowed_failures = 3
extensions = None, validate = True, scheduler_file = None
security = Security(require_encryption=True, tls_ca_file=Temporary (In-memory), tls_client_cert=Temporary (In-memory), tls_client..., tls_scheduler_key=Temporary (In-memory), tls_worker_cert=Temporary (In-memory), tls_worker_key=Temporary (In-memory))
worker_ttl = None, idle_timeout = None, interface = None, host = None
port = 8787, protocol = 'wss://', dashboard_address = None, dashboard = True
http_prefix = '/', preload = [], preload_argv = [], plugins = ()
contact_address = None, transition_counter_max = False, jupyter = False
kwargs = {}
http_server_modules = ['distributed.http.scheduler.prometheus', 'distributed.http.scheduler.info', 'distributed.http.scheduler.json', 'distributed.http.health', 'distributed.http.proxy', 'distributed.http.statics']
show_dashboard = True
def __init__(
self,
loop=None,
services=None,
service_kwargs=None,
allowed_failures=None,
extensions=None,
validate=None,
scheduler_file=None,
security=None,
worker_ttl=None,
idle_timeout=None,
interface=None,
host=None,
port=0,
protocol=None,
dashboard_address=None,
dashboard=None,
http_prefix="/",
preload=None,
preload_argv=(),
plugins=(),
contact_address=None,
transition_counter_max=False,
jupyter=False,
**kwargs,
):
if dask.config.get("distributed.scheduler.pickle", default=True) is False:
raise RuntimeError(
"Pickling can no longer be disabled with the `distributed.scheduler.pickle` option. Please remove this configuration to start the scheduler."
)
if loop is not None:
warnings.warn(
"the loop kwarg to Scheduler is deprecated",
DeprecationWarning,
stacklevel=2,
)
self.loop = self.io_loop = IOLoop.current()
self._setup_logging(logger)
# Attributes
if contact_address is None:
contact_address = dask.config.get("distributed.scheduler.contact-address")
self.contact_address = contact_address
if allowed_failures is None:
allowed_failures = dask.config.get("distributed.scheduler.allowed-failures")
self.allowed_failures = allowed_failures
if validate is None:
validate = dask.config.get("distributed.scheduler.validate")
self.proc = psutil.Process()
self.service_specs = services or {}
self.service_kwargs = service_kwargs or {}
self.services = {}
self.scheduler_file = scheduler_file
self.worker_ttl = parse_timedelta(
worker_ttl or dask.config.get("distributed.scheduler.worker-ttl")
)
self.idle_timeout = parse_timedelta(
idle_timeout or dask.config.get("distributed.scheduler.idle-timeout")
)
self.idle_since = time()
self.no_workers_timeout = parse_timedelta(
dask.config.get("distributed.scheduler.no-workers-timeout")
)
self._no_workers_since = None
self.time_started = self.idle_since # compatibility for dask-gateway
self._replica_lock = RLock()
self.bandwidth_workers = defaultdict(float)
self.bandwidth_types = defaultdict(float)
# Don't cast int metrics to float
self.cumulative_worker_metrics = defaultdict(int)
if not preload:
preload = dask.config.get("distributed.scheduler.preload")
if not preload_argv:
preload_argv = dask.config.get("distributed.scheduler.preload-argv")
self.preloads = preloading.process_preloads(self, preload, preload_argv)
if isinstance(security, dict):
security = Security(**security)
self.security = security or Security()
assert isinstance(self.security, Security)
self.connection_args = self.security.get_connection_args("scheduler")
self.connection_args["handshake_overrides"] = { # common denominator
"pickle-protocol": 4
}
self._start_address = addresses_from_user_args(
host=host,
port=port,
interface=interface,
protocol=protocol,
security=security,
default_port=self.default_port,
)
http_server_modules = dask.config.get("distributed.scheduler.http.routes")
show_dashboard = dashboard or (dashboard is None and dashboard_address)
# install vanilla route if show_dashboard but bokeh is not installed
if show_dashboard:
try:
> import distributed.dashboard.scheduler
E File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
E rf"/{prefix.strip("/")}/?",
E ^
E SyntaxError: f-string: unmatched '('
distributed/scheduler.py:3783: SyntaxError
Check warning on line 0 in distributed.comm.tests.test_ws
github-actions / Unit Test Results
2 out of 3 runs failed: test_http_and_comm_server[True-ws://-None-8786] (distributed.comm.tests.test_ws)
artifacts/ubuntu-latest-mindeps-default-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-notci1/pytest.xml [took 0s]
Raw output
File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
rf"/{prefix.strip("/")}/?",
^
SyntaxError: f-string: unmatched '('
dashboard = True, protocol = 'ws://', security = None, port = 8786
@pytest.mark.parametrize(
"dashboard,protocol,security,port",
[
(True, "ws://", None, 8787),
(True, "wss://", True, 8787),
(False, "ws://", None, 8787),
(False, "wss://", True, 8787),
(True, "ws://", None, 8786),
(True, "wss://", True, 8786),
(False, "ws://", None, 8786),
(False, "wss://", True, 8786),
],
)
@gen_test()
async def test_http_and_comm_server(dashboard, protocol, security, port):
if security:
xfail_ssl_issue5601()
pytest.importorskip("cryptography")
security = Security.temporary()
> async with Scheduler(
protocol=protocol, dashboard=dashboard, port=port, security=security
) as s:
distributed/comm/tests/test_ws.py:156:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Scheduler' object has no attribute '_address'") raised in repr()] Scheduler object at 0x7f9756352560>
loop = None, services = None, service_kwargs = None, allowed_failures = 3
extensions = None, validate = True, scheduler_file = None, security = None
worker_ttl = None, idle_timeout = None, interface = None, host = None
port = 8786, protocol = 'ws://', dashboard_address = None, dashboard = True
http_prefix = '/', preload = [], preload_argv = [], plugins = ()
contact_address = None, transition_counter_max = False, jupyter = False
kwargs = {}
http_server_modules = ['distributed.http.scheduler.prometheus', 'distributed.http.scheduler.info', 'distributed.http.scheduler.json', 'distributed.http.health', 'distributed.http.proxy', 'distributed.http.statics']
show_dashboard = True
def __init__(
self,
loop=None,
services=None,
service_kwargs=None,
allowed_failures=None,
extensions=None,
validate=None,
scheduler_file=None,
security=None,
worker_ttl=None,
idle_timeout=None,
interface=None,
host=None,
port=0,
protocol=None,
dashboard_address=None,
dashboard=None,
http_prefix="/",
preload=None,
preload_argv=(),
plugins=(),
contact_address=None,
transition_counter_max=False,
jupyter=False,
**kwargs,
):
if dask.config.get("distributed.scheduler.pickle", default=True) is False:
raise RuntimeError(
"Pickling can no longer be disabled with the `distributed.scheduler.pickle` option. Please remove this configuration to start the scheduler."
)
if loop is not None:
warnings.warn(
"the loop kwarg to Scheduler is deprecated",
DeprecationWarning,
stacklevel=2,
)
self.loop = self.io_loop = IOLoop.current()
self._setup_logging(logger)
# Attributes
if contact_address is None:
contact_address = dask.config.get("distributed.scheduler.contact-address")
self.contact_address = contact_address
if allowed_failures is None:
allowed_failures = dask.config.get("distributed.scheduler.allowed-failures")
self.allowed_failures = allowed_failures
if validate is None:
validate = dask.config.get("distributed.scheduler.validate")
self.proc = psutil.Process()
self.service_specs = services or {}
self.service_kwargs = service_kwargs or {}
self.services = {}
self.scheduler_file = scheduler_file
self.worker_ttl = parse_timedelta(
worker_ttl or dask.config.get("distributed.scheduler.worker-ttl")
)
self.idle_timeout = parse_timedelta(
idle_timeout or dask.config.get("distributed.scheduler.idle-timeout")
)
self.idle_since = time()
self.no_workers_timeout = parse_timedelta(
dask.config.get("distributed.scheduler.no-workers-timeout")
)
self._no_workers_since = None
self.time_started = self.idle_since # compatibility for dask-gateway
self._replica_lock = RLock()
self.bandwidth_workers = defaultdict(float)
self.bandwidth_types = defaultdict(float)
# Don't cast int metrics to float
self.cumulative_worker_metrics = defaultdict(int)
if not preload:
preload = dask.config.get("distributed.scheduler.preload")
if not preload_argv:
preload_argv = dask.config.get("distributed.scheduler.preload-argv")
self.preloads = preloading.process_preloads(self, preload, preload_argv)
if isinstance(security, dict):
security = Security(**security)
self.security = security or Security()
assert isinstance(self.security, Security)
self.connection_args = self.security.get_connection_args("scheduler")
self.connection_args["handshake_overrides"] = { # common denominator
"pickle-protocol": 4
}
self._start_address = addresses_from_user_args(
host=host,
port=port,
interface=interface,
protocol=protocol,
security=security,
default_port=self.default_port,
)
http_server_modules = dask.config.get("distributed.scheduler.http.routes")
show_dashboard = dashboard or (dashboard is None and dashboard_address)
# install vanilla route if show_dashboard but bokeh is not installed
if show_dashboard:
try:
> import distributed.dashboard.scheduler
E File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
E rf"/{prefix.strip("/")}/?",
E ^
E SyntaxError: f-string: unmatched '('
distributed/scheduler.py:3783: SyntaxError
Check warning on line 0 in distributed.comm.tests.test_ws
github-actions / Unit Test Results
2 out of 3 runs failed: test_http_and_comm_server[True-wss://-True-8786] (distributed.comm.tests.test_ws)
artifacts/ubuntu-latest-mindeps-default-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-notci1/pytest.xml [took 0s]
Raw output
File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
rf"/{prefix.strip("/")}/?",
^
SyntaxError: f-string: unmatched '('
dashboard = True, protocol = 'wss://'
security = Security(require_encryption=True, tls_ca_file=Temporary (In-memory), tls_client_cert=Temporary (In-memory), tls_client..., tls_scheduler_key=Temporary (In-memory), tls_worker_cert=Temporary (In-memory), tls_worker_key=Temporary (In-memory))
port = 8786
@pytest.mark.parametrize(
"dashboard,protocol,security,port",
[
(True, "ws://", None, 8787),
(True, "wss://", True, 8787),
(False, "ws://", None, 8787),
(False, "wss://", True, 8787),
(True, "ws://", None, 8786),
(True, "wss://", True, 8786),
(False, "ws://", None, 8786),
(False, "wss://", True, 8786),
],
)
@gen_test()
async def test_http_and_comm_server(dashboard, protocol, security, port):
if security:
xfail_ssl_issue5601()
pytest.importorskip("cryptography")
security = Security.temporary()
> async with Scheduler(
protocol=protocol, dashboard=dashboard, port=port, security=security
) as s:
distributed/comm/tests/test_ws.py:156:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Scheduler' object has no attribute '_address'") raised in repr()] Scheduler object at 0x7f97563517a0>
loop = None, services = None, service_kwargs = None, allowed_failures = 3
extensions = None, validate = True, scheduler_file = None
security = Security(require_encryption=True, tls_ca_file=Temporary (In-memory), tls_client_cert=Temporary (In-memory), tls_client..., tls_scheduler_key=Temporary (In-memory), tls_worker_cert=Temporary (In-memory), tls_worker_key=Temporary (In-memory))
worker_ttl = None, idle_timeout = None, interface = None, host = None
port = 8786, protocol = 'wss://', dashboard_address = None, dashboard = True
http_prefix = '/', preload = [], preload_argv = [], plugins = ()
contact_address = None, transition_counter_max = False, jupyter = False
kwargs = {}
http_server_modules = ['distributed.http.scheduler.prometheus', 'distributed.http.scheduler.info', 'distributed.http.scheduler.json', 'distributed.http.health', 'distributed.http.proxy', 'distributed.http.statics']
show_dashboard = True
def __init__(
self,
loop=None,
services=None,
service_kwargs=None,
allowed_failures=None,
extensions=None,
validate=None,
scheduler_file=None,
security=None,
worker_ttl=None,
idle_timeout=None,
interface=None,
host=None,
port=0,
protocol=None,
dashboard_address=None,
dashboard=None,
http_prefix="/",
preload=None,
preload_argv=(),
plugins=(),
contact_address=None,
transition_counter_max=False,
jupyter=False,
**kwargs,
):
if dask.config.get("distributed.scheduler.pickle", default=True) is False:
raise RuntimeError(
"Pickling can no longer be disabled with the `distributed.scheduler.pickle` option. Please remove this configuration to start the scheduler."
)
if loop is not None:
warnings.warn(
"the loop kwarg to Scheduler is deprecated",
DeprecationWarning,
stacklevel=2,
)
self.loop = self.io_loop = IOLoop.current()
self._setup_logging(logger)
# Attributes
if contact_address is None:
contact_address = dask.config.get("distributed.scheduler.contact-address")
self.contact_address = contact_address
if allowed_failures is None:
allowed_failures = dask.config.get("distributed.scheduler.allowed-failures")
self.allowed_failures = allowed_failures
if validate is None:
validate = dask.config.get("distributed.scheduler.validate")
self.proc = psutil.Process()
self.service_specs = services or {}
self.service_kwargs = service_kwargs or {}
self.services = {}
self.scheduler_file = scheduler_file
self.worker_ttl = parse_timedelta(
worker_ttl or dask.config.get("distributed.scheduler.worker-ttl")
)
self.idle_timeout = parse_timedelta(
idle_timeout or dask.config.get("distributed.scheduler.idle-timeout")
)
self.idle_since = time()
self.no_workers_timeout = parse_timedelta(
dask.config.get("distributed.scheduler.no-workers-timeout")
)
self._no_workers_since = None
self.time_started = self.idle_since # compatibility for dask-gateway
self._replica_lock = RLock()
self.bandwidth_workers = defaultdict(float)
self.bandwidth_types = defaultdict(float)
# Don't cast int metrics to float
self.cumulative_worker_metrics = defaultdict(int)
if not preload:
preload = dask.config.get("distributed.scheduler.preload")
if not preload_argv:
preload_argv = dask.config.get("distributed.scheduler.preload-argv")
self.preloads = preloading.process_preloads(self, preload, preload_argv)
if isinstance(security, dict):
security = Security(**security)
self.security = security or Security()
assert isinstance(self.security, Security)
self.connection_args = self.security.get_connection_args("scheduler")
self.connection_args["handshake_overrides"] = { # common denominator
"pickle-protocol": 4
}
self._start_address = addresses_from_user_args(
host=host,
port=port,
interface=interface,
protocol=protocol,
security=security,
default_port=self.default_port,
)
http_server_modules = dask.config.get("distributed.scheduler.http.routes")
show_dashboard = dashboard or (dashboard is None and dashboard_address)
# install vanilla route if show_dashboard but bokeh is not installed
if show_dashboard:
try:
> import distributed.dashboard.scheduler
E File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
E rf"/{prefix.strip("/")}/?",
E ^
E SyntaxError: f-string: unmatched '('
distributed/scheduler.py:3783: SyntaxError
Check warning on line 0 in distributed.comm.tests.test_ws
github-actions / Unit Test Results
2 out of 3 runs failed: test_connection_made_with_extra_conn_args[ws://-True] (distributed.comm.tests.test_ws)
artifacts/ubuntu-latest-mindeps-default-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-notci1/pytest.xml [took 0s]
Raw output
File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
rf"/{prefix.strip("/")}/?",
^
SyntaxError: f-string: unmatched '('
protocol = 'ws://', sni = True
@pytest.mark.parametrize(
"protocol,sni",
[("ws://", True), ("ws://", False), ("wss://", True), ("wss://", False)],
)
@gen_test(clean_kwargs={"instances": False})
async def test_connection_made_with_extra_conn_args(protocol, sni):
if protocol == "ws://":
security = Security(
extra_conn_args={"headers": {"Authorization": "Token abcd"}}
)
else:
xfail_ssl_issue5601()
pytest.importorskip("cryptography")
security = Security.temporary(
extra_conn_args={"headers": {"Authorization": "Token abcd"}}
)
> async with Scheduler(
protocol=protocol, security=security, dashboard_address=":0"
) as s:
distributed/comm/tests/test_ws.py:185:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Scheduler' object has no attribute '_address'") raised in repr()] Scheduler object at 0x7f9756353060>
loop = None, services = None, service_kwargs = None, allowed_failures = 3
extensions = None, validate = True, scheduler_file = None
security = Security(require_encryption=False, tls_min_version=771)
worker_ttl = None, idle_timeout = None, interface = None, host = None, port = 0
protocol = 'ws://', dashboard_address = ':0', dashboard = None
http_prefix = '/', preload = [], preload_argv = [], plugins = ()
contact_address = None, transition_counter_max = False, jupyter = False
kwargs = {}
http_server_modules = ['distributed.http.scheduler.prometheus', 'distributed.http.scheduler.info', 'distributed.http.scheduler.json', 'distributed.http.health', 'distributed.http.proxy', 'distributed.http.statics']
show_dashboard = ':0'
def __init__(
self,
loop=None,
services=None,
service_kwargs=None,
allowed_failures=None,
extensions=None,
validate=None,
scheduler_file=None,
security=None,
worker_ttl=None,
idle_timeout=None,
interface=None,
host=None,
port=0,
protocol=None,
dashboard_address=None,
dashboard=None,
http_prefix="/",
preload=None,
preload_argv=(),
plugins=(),
contact_address=None,
transition_counter_max=False,
jupyter=False,
**kwargs,
):
if dask.config.get("distributed.scheduler.pickle", default=True) is False:
raise RuntimeError(
"Pickling can no longer be disabled with the `distributed.scheduler.pickle` option. Please remove this configuration to start the scheduler."
)
if loop is not None:
warnings.warn(
"the loop kwarg to Scheduler is deprecated",
DeprecationWarning,
stacklevel=2,
)
self.loop = self.io_loop = IOLoop.current()
self._setup_logging(logger)
# Attributes
if contact_address is None:
contact_address = dask.config.get("distributed.scheduler.contact-address")
self.contact_address = contact_address
if allowed_failures is None:
allowed_failures = dask.config.get("distributed.scheduler.allowed-failures")
self.allowed_failures = allowed_failures
if validate is None:
validate = dask.config.get("distributed.scheduler.validate")
self.proc = psutil.Process()
self.service_specs = services or {}
self.service_kwargs = service_kwargs or {}
self.services = {}
self.scheduler_file = scheduler_file
self.worker_ttl = parse_timedelta(
worker_ttl or dask.config.get("distributed.scheduler.worker-ttl")
)
self.idle_timeout = parse_timedelta(
idle_timeout or dask.config.get("distributed.scheduler.idle-timeout")
)
self.idle_since = time()
self.no_workers_timeout = parse_timedelta(
dask.config.get("distributed.scheduler.no-workers-timeout")
)
self._no_workers_since = None
self.time_started = self.idle_since # compatibility for dask-gateway
self._replica_lock = RLock()
self.bandwidth_workers = defaultdict(float)
self.bandwidth_types = defaultdict(float)
# Don't cast int metrics to float
self.cumulative_worker_metrics = defaultdict(int)
if not preload:
preload = dask.config.get("distributed.scheduler.preload")
if not preload_argv:
preload_argv = dask.config.get("distributed.scheduler.preload-argv")
self.preloads = preloading.process_preloads(self, preload, preload_argv)
if isinstance(security, dict):
security = Security(**security)
self.security = security or Security()
assert isinstance(self.security, Security)
self.connection_args = self.security.get_connection_args("scheduler")
self.connection_args["handshake_overrides"] = { # common denominator
"pickle-protocol": 4
}
self._start_address = addresses_from_user_args(
host=host,
port=port,
interface=interface,
protocol=protocol,
security=security,
default_port=self.default_port,
)
http_server_modules = dask.config.get("distributed.scheduler.http.routes")
show_dashboard = dashboard or (dashboard is None and dashboard_address)
# install vanilla route if show_dashboard but bokeh is not installed
if show_dashboard:
try:
> import distributed.dashboard.scheduler
E File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
E rf"/{prefix.strip("/")}/?",
E ^
E SyntaxError: f-string: unmatched '('
distributed/scheduler.py:3783: SyntaxError
Check warning on line 0 in distributed.comm.tests.test_ws
github-actions / Unit Test Results
2 out of 3 runs failed: test_connection_made_with_extra_conn_args[ws://-False] (distributed.comm.tests.test_ws)
artifacts/ubuntu-latest-mindeps-default-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-notci1/pytest.xml [took 0s]
Raw output
File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
rf"/{prefix.strip("/")}/?",
^
SyntaxError: f-string: unmatched '('
protocol = 'ws://', sni = False
@pytest.mark.parametrize(
"protocol,sni",
[("ws://", True), ("ws://", False), ("wss://", True), ("wss://", False)],
)
@gen_test(clean_kwargs={"instances": False})
async def test_connection_made_with_extra_conn_args(protocol, sni):
if protocol == "ws://":
security = Security(
extra_conn_args={"headers": {"Authorization": "Token abcd"}}
)
else:
xfail_ssl_issue5601()
pytest.importorskip("cryptography")
security = Security.temporary(
extra_conn_args={"headers": {"Authorization": "Token abcd"}}
)
> async with Scheduler(
protocol=protocol, security=security, dashboard_address=":0"
) as s:
distributed/comm/tests/test_ws.py:185:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Scheduler' object has no attribute '_address'") raised in repr()] Scheduler object at 0x7f9756352560>
loop = None, services = None, service_kwargs = None, allowed_failures = 3
extensions = None, validate = True, scheduler_file = None
security = Security(require_encryption=False, tls_min_version=771)
worker_ttl = None, idle_timeout = None, interface = None, host = None, port = 0
protocol = 'ws://', dashboard_address = ':0', dashboard = None
http_prefix = '/', preload = [], preload_argv = [], plugins = ()
contact_address = None, transition_counter_max = False, jupyter = False
kwargs = {}
http_server_modules = ['distributed.http.scheduler.prometheus', 'distributed.http.scheduler.info', 'distributed.http.scheduler.json', 'distributed.http.health', 'distributed.http.proxy', 'distributed.http.statics']
show_dashboard = ':0'
def __init__(
self,
loop=None,
services=None,
service_kwargs=None,
allowed_failures=None,
extensions=None,
validate=None,
scheduler_file=None,
security=None,
worker_ttl=None,
idle_timeout=None,
interface=None,
host=None,
port=0,
protocol=None,
dashboard_address=None,
dashboard=None,
http_prefix="/",
preload=None,
preload_argv=(),
plugins=(),
contact_address=None,
transition_counter_max=False,
jupyter=False,
**kwargs,
):
if dask.config.get("distributed.scheduler.pickle", default=True) is False:
raise RuntimeError(
"Pickling can no longer be disabled with the `distributed.scheduler.pickle` option. Please remove this configuration to start the scheduler."
)
if loop is not None:
warnings.warn(
"the loop kwarg to Scheduler is deprecated",
DeprecationWarning,
stacklevel=2,
)
self.loop = self.io_loop = IOLoop.current()
self._setup_logging(logger)
# Attributes
if contact_address is None:
contact_address = dask.config.get("distributed.scheduler.contact-address")
self.contact_address = contact_address
if allowed_failures is None:
allowed_failures = dask.config.get("distributed.scheduler.allowed-failures")
self.allowed_failures = allowed_failures
if validate is None:
validate = dask.config.get("distributed.scheduler.validate")
self.proc = psutil.Process()
self.service_specs = services or {}
self.service_kwargs = service_kwargs or {}
self.services = {}
self.scheduler_file = scheduler_file
self.worker_ttl = parse_timedelta(
worker_ttl or dask.config.get("distributed.scheduler.worker-ttl")
)
self.idle_timeout = parse_timedelta(
idle_timeout or dask.config.get("distributed.scheduler.idle-timeout")
)
self.idle_since = time()
self.no_workers_timeout = parse_timedelta(
dask.config.get("distributed.scheduler.no-workers-timeout")
)
self._no_workers_since = None
self.time_started = self.idle_since # compatibility for dask-gateway
self._replica_lock = RLock()
self.bandwidth_workers = defaultdict(float)
self.bandwidth_types = defaultdict(float)
# Don't cast int metrics to float
self.cumulative_worker_metrics = defaultdict(int)
if not preload:
preload = dask.config.get("distributed.scheduler.preload")
if not preload_argv:
preload_argv = dask.config.get("distributed.scheduler.preload-argv")
self.preloads = preloading.process_preloads(self, preload, preload_argv)
if isinstance(security, dict):
security = Security(**security)
self.security = security or Security()
assert isinstance(self.security, Security)
self.connection_args = self.security.get_connection_args("scheduler")
self.connection_args["handshake_overrides"] = { # common denominator
"pickle-protocol": 4
}
self._start_address = addresses_from_user_args(
host=host,
port=port,
interface=interface,
protocol=protocol,
security=security,
default_port=self.default_port,
)
http_server_modules = dask.config.get("distributed.scheduler.http.routes")
show_dashboard = dashboard or (dashboard is None and dashboard_address)
# install vanilla route if show_dashboard but bokeh is not installed
if show_dashboard:
try:
> import distributed.dashboard.scheduler
E File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
E rf"/{prefix.strip("/")}/?",
E ^
E SyntaxError: f-string: unmatched '('
distributed/scheduler.py:3783: SyntaxError
Check warning on line 0 in distributed.comm.tests.test_ws
github-actions / Unit Test Results
2 out of 3 runs failed: test_connection_made_with_extra_conn_args[wss://-True] (distributed.comm.tests.test_ws)
artifacts/ubuntu-latest-mindeps-default-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-notci1/pytest.xml [took 0s]
Raw output
File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
rf"/{prefix.strip("/")}/?",
^
SyntaxError: f-string: unmatched '('
protocol = 'wss://', sni = True
@pytest.mark.parametrize(
"protocol,sni",
[("ws://", True), ("ws://", False), ("wss://", True), ("wss://", False)],
)
@gen_test(clean_kwargs={"instances": False})
async def test_connection_made_with_extra_conn_args(protocol, sni):
if protocol == "ws://":
security = Security(
extra_conn_args={"headers": {"Authorization": "Token abcd"}}
)
else:
xfail_ssl_issue5601()
pytest.importorskip("cryptography")
security = Security.temporary(
extra_conn_args={"headers": {"Authorization": "Token abcd"}}
)
> async with Scheduler(
protocol=protocol, security=security, dashboard_address=":0"
) as s:
distributed/comm/tests/test_ws.py:185:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Scheduler' object has no attribute '_address'") raised in repr()] Scheduler object at 0x7f9756352ae0>
loop = None, services = None, service_kwargs = None, allowed_failures = 3
extensions = None, validate = True, scheduler_file = None
security = Security(require_encryption=True, tls_ca_file=Temporary (In-memory), tls_client_cert=Temporary (In-memory), tls_client..., tls_scheduler_key=Temporary (In-memory), tls_worker_cert=Temporary (In-memory), tls_worker_key=Temporary (In-memory))
worker_ttl = None, idle_timeout = None, interface = None, host = None, port = 0
protocol = 'wss://', dashboard_address = ':0', dashboard = None
http_prefix = '/', preload = [], preload_argv = [], plugins = ()
contact_address = None, transition_counter_max = False, jupyter = False
kwargs = {}
http_server_modules = ['distributed.http.scheduler.prometheus', 'distributed.http.scheduler.info', 'distributed.http.scheduler.json', 'distributed.http.health', 'distributed.http.proxy', 'distributed.http.statics']
show_dashboard = ':0'
def __init__(
self,
loop=None,
services=None,
service_kwargs=None,
allowed_failures=None,
extensions=None,
validate=None,
scheduler_file=None,
security=None,
worker_ttl=None,
idle_timeout=None,
interface=None,
host=None,
port=0,
protocol=None,
dashboard_address=None,
dashboard=None,
http_prefix="/",
preload=None,
preload_argv=(),
plugins=(),
contact_address=None,
transition_counter_max=False,
jupyter=False,
**kwargs,
):
if dask.config.get("distributed.scheduler.pickle", default=True) is False:
raise RuntimeError(
"Pickling can no longer be disabled with the `distributed.scheduler.pickle` option. Please remove this configuration to start the scheduler."
)
if loop is not None:
warnings.warn(
"the loop kwarg to Scheduler is deprecated",
DeprecationWarning,
stacklevel=2,
)
self.loop = self.io_loop = IOLoop.current()
self._setup_logging(logger)
# Attributes
if contact_address is None:
contact_address = dask.config.get("distributed.scheduler.contact-address")
self.contact_address = contact_address
if allowed_failures is None:
allowed_failures = dask.config.get("distributed.scheduler.allowed-failures")
self.allowed_failures = allowed_failures
if validate is None:
validate = dask.config.get("distributed.scheduler.validate")
self.proc = psutil.Process()
self.service_specs = services or {}
self.service_kwargs = service_kwargs or {}
self.services = {}
self.scheduler_file = scheduler_file
self.worker_ttl = parse_timedelta(
worker_ttl or dask.config.get("distributed.scheduler.worker-ttl")
)
self.idle_timeout = parse_timedelta(
idle_timeout or dask.config.get("distributed.scheduler.idle-timeout")
)
self.idle_since = time()
self.no_workers_timeout = parse_timedelta(
dask.config.get("distributed.scheduler.no-workers-timeout")
)
self._no_workers_since = None
self.time_started = self.idle_since # compatibility for dask-gateway
self._replica_lock = RLock()
self.bandwidth_workers = defaultdict(float)
self.bandwidth_types = defaultdict(float)
# Don't cast int metrics to float
self.cumulative_worker_metrics = defaultdict(int)
if not preload:
preload = dask.config.get("distributed.scheduler.preload")
if not preload_argv:
preload_argv = dask.config.get("distributed.scheduler.preload-argv")
self.preloads = preloading.process_preloads(self, preload, preload_argv)
if isinstance(security, dict):
security = Security(**security)
self.security = security or Security()
assert isinstance(self.security, Security)
self.connection_args = self.security.get_connection_args("scheduler")
self.connection_args["handshake_overrides"] = { # common denominator
"pickle-protocol": 4
}
self._start_address = addresses_from_user_args(
host=host,
port=port,
interface=interface,
protocol=protocol,
security=security,
default_port=self.default_port,
)
http_server_modules = dask.config.get("distributed.scheduler.http.routes")
show_dashboard = dashboard or (dashboard is None and dashboard_address)
# install vanilla route if show_dashboard but bokeh is not installed
if show_dashboard:
try:
> import distributed.dashboard.scheduler
E File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
E rf"/{prefix.strip("/")}/?",
E ^
E SyntaxError: f-string: unmatched '('
distributed/scheduler.py:3783: SyntaxError
Check warning on line 0 in distributed.comm.tests.test_ws
github-actions / Unit Test Results
2 out of 3 runs failed: test_connection_made_with_extra_conn_args[wss://-False] (distributed.comm.tests.test_ws)
artifacts/ubuntu-latest-mindeps-default-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-notci1/pytest.xml [took 0s]
Raw output
File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
rf"/{prefix.strip("/")}/?",
^
SyntaxError: f-string: unmatched '('
protocol = 'wss://', sni = False
@pytest.mark.parametrize(
"protocol,sni",
[("ws://", True), ("ws://", False), ("wss://", True), ("wss://", False)],
)
@gen_test(clean_kwargs={"instances": False})
async def test_connection_made_with_extra_conn_args(protocol, sni):
if protocol == "ws://":
security = Security(
extra_conn_args={"headers": {"Authorization": "Token abcd"}}
)
else:
xfail_ssl_issue5601()
pytest.importorskip("cryptography")
security = Security.temporary(
extra_conn_args={"headers": {"Authorization": "Token abcd"}}
)
> async with Scheduler(
protocol=protocol, security=security, dashboard_address=":0"
) as s:
distributed/comm/tests/test_ws.py:185:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Scheduler' object has no attribute '_address'") raised in repr()] Scheduler object at 0x7f97563526c0>
loop = None, services = None, service_kwargs = None, allowed_failures = 3
extensions = None, validate = True, scheduler_file = None
security = Security(require_encryption=True, tls_ca_file=Temporary (In-memory), tls_client_cert=Temporary (In-memory), tls_client..., tls_scheduler_key=Temporary (In-memory), tls_worker_cert=Temporary (In-memory), tls_worker_key=Temporary (In-memory))
worker_ttl = None, idle_timeout = None, interface = None, host = None, port = 0
protocol = 'wss://', dashboard_address = ':0', dashboard = None
http_prefix = '/', preload = [], preload_argv = [], plugins = ()
contact_address = None, transition_counter_max = False, jupyter = False
kwargs = {}
http_server_modules = ['distributed.http.scheduler.prometheus', 'distributed.http.scheduler.info', 'distributed.http.scheduler.json', 'distributed.http.health', 'distributed.http.proxy', 'distributed.http.statics']
show_dashboard = ':0'
def __init__(
self,
loop=None,
services=None,
service_kwargs=None,
allowed_failures=None,
extensions=None,
validate=None,
scheduler_file=None,
security=None,
worker_ttl=None,
idle_timeout=None,
interface=None,
host=None,
port=0,
protocol=None,
dashboard_address=None,
dashboard=None,
http_prefix="/",
preload=None,
preload_argv=(),
plugins=(),
contact_address=None,
transition_counter_max=False,
jupyter=False,
**kwargs,
):
if dask.config.get("distributed.scheduler.pickle", default=True) is False:
raise RuntimeError(
"Pickling can no longer be disabled with the `distributed.scheduler.pickle` option. Please remove this configuration to start the scheduler."
)
if loop is not None:
warnings.warn(
"the loop kwarg to Scheduler is deprecated",
DeprecationWarning,
stacklevel=2,
)
self.loop = self.io_loop = IOLoop.current()
self._setup_logging(logger)
# Attributes
if contact_address is None:
contact_address = dask.config.get("distributed.scheduler.contact-address")
self.contact_address = contact_address
if allowed_failures is None:
allowed_failures = dask.config.get("distributed.scheduler.allowed-failures")
self.allowed_failures = allowed_failures
if validate is None:
validate = dask.config.get("distributed.scheduler.validate")
self.proc = psutil.Process()
self.service_specs = services or {}
self.service_kwargs = service_kwargs or {}
self.services = {}
self.scheduler_file = scheduler_file
self.worker_ttl = parse_timedelta(
worker_ttl or dask.config.get("distributed.scheduler.worker-ttl")
)
self.idle_timeout = parse_timedelta(
idle_timeout or dask.config.get("distributed.scheduler.idle-timeout")
)
self.idle_since = time()
self.no_workers_timeout = parse_timedelta(
dask.config.get("distributed.scheduler.no-workers-timeout")
)
self._no_workers_since = None
self.time_started = self.idle_since # compatibility for dask-gateway
self._replica_lock = RLock()
self.bandwidth_workers = defaultdict(float)
self.bandwidth_types = defaultdict(float)
# Don't cast int metrics to float
self.cumulative_worker_metrics = defaultdict(int)
if not preload:
preload = dask.config.get("distributed.scheduler.preload")
if not preload_argv:
preload_argv = dask.config.get("distributed.scheduler.preload-argv")
self.preloads = preloading.process_preloads(self, preload, preload_argv)
if isinstance(security, dict):
security = Security(**security)
self.security = security or Security()
assert isinstance(self.security, Security)
self.connection_args = self.security.get_connection_args("scheduler")
self.connection_args["handshake_overrides"] = { # common denominator
"pickle-protocol": 4
}
self._start_address = addresses_from_user_args(
host=host,
port=port,
interface=interface,
protocol=protocol,
security=security,
default_port=self.default_port,
)
http_server_modules = dask.config.get("distributed.scheduler.http.routes")
show_dashboard = dashboard or (dashboard is None and dashboard_address)
# install vanilla route if show_dashboard but bokeh is not installed
if show_dashboard:
try:
> import distributed.dashboard.scheduler
E File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
E rf"/{prefix.strip("/")}/?",
E ^
E SyntaxError: f-string: unmatched '('
distributed/scheduler.py:3783: SyntaxError
Check warning on line 0 in distributed.comm.tests.test_ws
github-actions / Unit Test Results
2 out of 3 runs failed: test_connection_made_with_sni (distributed.comm.tests.test_ws)
artifacts/ubuntu-latest-mindeps-default-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-notci1/pytest.xml [took 0s]
Raw output
File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
rf"/{prefix.strip("/")}/?",
^
SyntaxError: f-string: unmatched '('
@gen_test(clean_kwargs={"instances": False})
async def test_connection_made_with_sni():
xfail_ssl_issue5601()
pytest.importorskip("cryptography")
security = Security.temporary()
> async with Scheduler(
protocol="wss://", security=security, dashboard_address=":0"
) as s:
distributed/comm/tests/test_ws.py:202:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Scheduler' object has no attribute '_address'") raised in repr()] Scheduler object at 0x7f9756352ae0>
loop = None, services = None, service_kwargs = None, allowed_failures = 3
extensions = None, validate = True, scheduler_file = None
security = Security(require_encryption=True, tls_ca_file=Temporary (In-memory), tls_client_cert=Temporary (In-memory), tls_client..., tls_scheduler_key=Temporary (In-memory), tls_worker_cert=Temporary (In-memory), tls_worker_key=Temporary (In-memory))
worker_ttl = None, idle_timeout = None, interface = None, host = None, port = 0
protocol = 'wss://', dashboard_address = ':0', dashboard = None
http_prefix = '/', preload = [], preload_argv = [], plugins = ()
contact_address = None, transition_counter_max = False, jupyter = False
kwargs = {}
http_server_modules = ['distributed.http.scheduler.prometheus', 'distributed.http.scheduler.info', 'distributed.http.scheduler.json', 'distributed.http.health', 'distributed.http.proxy', 'distributed.http.statics']
show_dashboard = ':0'
def __init__(
self,
loop=None,
services=None,
service_kwargs=None,
allowed_failures=None,
extensions=None,
validate=None,
scheduler_file=None,
security=None,
worker_ttl=None,
idle_timeout=None,
interface=None,
host=None,
port=0,
protocol=None,
dashboard_address=None,
dashboard=None,
http_prefix="/",
preload=None,
preload_argv=(),
plugins=(),
contact_address=None,
transition_counter_max=False,
jupyter=False,
**kwargs,
):
if dask.config.get("distributed.scheduler.pickle", default=True) is False:
raise RuntimeError(
"Pickling can no longer be disabled with the `distributed.scheduler.pickle` option. Please remove this configuration to start the scheduler."
)
if loop is not None:
warnings.warn(
"the loop kwarg to Scheduler is deprecated",
DeprecationWarning,
stacklevel=2,
)
self.loop = self.io_loop = IOLoop.current()
self._setup_logging(logger)
# Attributes
if contact_address is None:
contact_address = dask.config.get("distributed.scheduler.contact-address")
self.contact_address = contact_address
if allowed_failures is None:
allowed_failures = dask.config.get("distributed.scheduler.allowed-failures")
self.allowed_failures = allowed_failures
if validate is None:
validate = dask.config.get("distributed.scheduler.validate")
self.proc = psutil.Process()
self.service_specs = services or {}
self.service_kwargs = service_kwargs or {}
self.services = {}
self.scheduler_file = scheduler_file
self.worker_ttl = parse_timedelta(
worker_ttl or dask.config.get("distributed.scheduler.worker-ttl")
)
self.idle_timeout = parse_timedelta(
idle_timeout or dask.config.get("distributed.scheduler.idle-timeout")
)
self.idle_since = time()
self.no_workers_timeout = parse_timedelta(
dask.config.get("distributed.scheduler.no-workers-timeout")
)
self._no_workers_since = None
self.time_started = self.idle_since # compatibility for dask-gateway
self._replica_lock = RLock()
self.bandwidth_workers = defaultdict(float)
self.bandwidth_types = defaultdict(float)
# Don't cast int metrics to float
self.cumulative_worker_metrics = defaultdict(int)
if not preload:
preload = dask.config.get("distributed.scheduler.preload")
if not preload_argv:
preload_argv = dask.config.get("distributed.scheduler.preload-argv")
self.preloads = preloading.process_preloads(self, preload, preload_argv)
if isinstance(security, dict):
security = Security(**security)
self.security = security or Security()
assert isinstance(self.security, Security)
self.connection_args = self.security.get_connection_args("scheduler")
self.connection_args["handshake_overrides"] = { # common denominator
"pickle-protocol": 4
}
self._start_address = addresses_from_user_args(
host=host,
port=port,
interface=interface,
protocol=protocol,
security=security,
default_port=self.default_port,
)
http_server_modules = dask.config.get("distributed.scheduler.http.routes")
show_dashboard = dashboard or (dashboard is None and dashboard_address)
# install vanilla route if show_dashboard but bokeh is not installed
if show_dashboard:
try:
> import distributed.dashboard.scheduler
E File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
E rf"/{prefix.strip("/")}/?",
E ^
E SyntaxError: f-string: unmatched '('
distributed/scheduler.py:3783: SyntaxError
Check warning on line 0 in distributed.comm.tests.test_ws
github-actions / Unit Test Results
2 out of 3 runs failed: test_quiet_close (distributed.comm.tests.test_ws)
artifacts/ubuntu-latest-mindeps-default-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: Cluster failed to start: f-string: unmatched '(' (scheduler.py, line 186)
self = LocalCluster(464f23f1, '<Not Connected>', workers=0, threads=0, memory=0 B)
async def _start(self):
while self.status == Status.starting:
await asyncio.sleep(0.01)
if self.status == Status.running:
return
if self.status == Status.closed:
raise ValueError("Cluster is closed")
self._lock = asyncio.Lock()
self.status = Status.starting
if self.scheduler_spec is None:
try:
import distributed.dashboard # noqa: F401
except ImportError:
pass
else:
options = {"dashboard": True}
self.scheduler_spec = {"cls": Scheduler, "options": options}
try:
# Check if scheduler has already been created by a subclass
if self.scheduler is None:
cls = self.scheduler_spec["cls"]
if isinstance(cls, str):
cls = import_term(cls)
> self.scheduler = cls(**self.scheduler_spec.get("options", {}))
distributed/deploy/spec.py:324:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Scheduler' object has no attribute '_address'") raised in repr()] Scheduler object at 0x7f97563531c0>
loop = None, services = {}, service_kwargs = None, allowed_failures = 3
extensions = None, validate = True, scheduler_file = None
security = Security(require_encryption=False, tls_min_version=771)
worker_ttl = None, idle_timeout = None, interface = None, host = '127.0.0.1'
port = 0, protocol = 'ws://', dashboard_address = ':0', dashboard = True
http_prefix = '/', preload = [], preload_argv = [], plugins = ()
contact_address = None, transition_counter_max = False, jupyter = False
kwargs = {'blocked_handlers': None}
http_server_modules = ['distributed.http.scheduler.prometheus', 'distributed.http.scheduler.info', 'distributed.http.scheduler.json', 'distributed.http.health', 'distributed.http.proxy', 'distributed.http.statics']
show_dashboard = True
def __init__(
self,
loop=None,
services=None,
service_kwargs=None,
allowed_failures=None,
extensions=None,
validate=None,
scheduler_file=None,
security=None,
worker_ttl=None,
idle_timeout=None,
interface=None,
host=None,
port=0,
protocol=None,
dashboard_address=None,
dashboard=None,
http_prefix="/",
preload=None,
preload_argv=(),
plugins=(),
contact_address=None,
transition_counter_max=False,
jupyter=False,
**kwargs,
):
if dask.config.get("distributed.scheduler.pickle", default=True) is False:
raise RuntimeError(
"Pickling can no longer be disabled with the `distributed.scheduler.pickle` option. Please remove this configuration to start the scheduler."
)
if loop is not None:
warnings.warn(
"the loop kwarg to Scheduler is deprecated",
DeprecationWarning,
stacklevel=2,
)
self.loop = self.io_loop = IOLoop.current()
self._setup_logging(logger)
# Attributes
if contact_address is None:
contact_address = dask.config.get("distributed.scheduler.contact-address")
self.contact_address = contact_address
if allowed_failures is None:
allowed_failures = dask.config.get("distributed.scheduler.allowed-failures")
self.allowed_failures = allowed_failures
if validate is None:
validate = dask.config.get("distributed.scheduler.validate")
self.proc = psutil.Process()
self.service_specs = services or {}
self.service_kwargs = service_kwargs or {}
self.services = {}
self.scheduler_file = scheduler_file
self.worker_ttl = parse_timedelta(
worker_ttl or dask.config.get("distributed.scheduler.worker-ttl")
)
self.idle_timeout = parse_timedelta(
idle_timeout or dask.config.get("distributed.scheduler.idle-timeout")
)
self.idle_since = time()
self.no_workers_timeout = parse_timedelta(
dask.config.get("distributed.scheduler.no-workers-timeout")
)
self._no_workers_since = None
self.time_started = self.idle_since # compatibility for dask-gateway
self._replica_lock = RLock()
self.bandwidth_workers = defaultdict(float)
self.bandwidth_types = defaultdict(float)
# Don't cast int metrics to float
self.cumulative_worker_metrics = defaultdict(int)
if not preload:
preload = dask.config.get("distributed.scheduler.preload")
if not preload_argv:
preload_argv = dask.config.get("distributed.scheduler.preload-argv")
self.preloads = preloading.process_preloads(self, preload, preload_argv)
if isinstance(security, dict):
security = Security(**security)
self.security = security or Security()
assert isinstance(self.security, Security)
self.connection_args = self.security.get_connection_args("scheduler")
self.connection_args["handshake_overrides"] = { # common denominator
"pickle-protocol": 4
}
self._start_address = addresses_from_user_args(
host=host,
port=port,
interface=interface,
protocol=protocol,
security=security,
default_port=self.default_port,
)
http_server_modules = dask.config.get("distributed.scheduler.http.routes")
show_dashboard = dashboard or (dashboard is None and dashboard_address)
# install vanilla route if show_dashboard but bokeh is not installed
if show_dashboard:
try:
> import distributed.dashboard.scheduler
E File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
E rf"/{prefix.strip("/")}/?",
E ^
E SyntaxError: f-string: unmatched '('
distributed/scheduler.py:3783: SyntaxError
The above exception was the direct cause of the following exception:
@gen_test()
async def test_quiet_close():
with warnings.catch_warnings(record=True) as record:
> async with Client(
protocol="ws", processes=False, asynchronous=True, dashboard_address=":0"
):
distributed/comm/tests/test_ws.py:216:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/client.py:1699: in __aenter__
await self
distributed/client.py:1497: in _start
self.cluster = await LocalCluster(
distributed/deploy/spec.py:418: in _
await self._start()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = LocalCluster(464f23f1, '<Not Connected>', workers=0, threads=0, memory=0 B)
async def _start(self):
while self.status == Status.starting:
await asyncio.sleep(0.01)
if self.status == Status.running:
return
if self.status == Status.closed:
raise ValueError("Cluster is closed")
self._lock = asyncio.Lock()
self.status = Status.starting
if self.scheduler_spec is None:
try:
import distributed.dashboard # noqa: F401
except ImportError:
pass
else:
options = {"dashboard": True}
self.scheduler_spec = {"cls": Scheduler, "options": options}
try:
# Check if scheduler has already been created by a subclass
if self.scheduler is None:
cls = self.scheduler_spec["cls"]
if isinstance(cls, str):
cls = import_term(cls)
self.scheduler = cls(**self.scheduler_spec.get("options", {}))
self.scheduler = await self.scheduler
self.scheduler_comm = rpc(
getattr(self.scheduler, "external_address", None)
or self.scheduler.address,
connection_args=self.security.get_connection_args("client"),
)
await super()._start()
except Exception as e: # pragma: no cover
self.status = Status.failed
await self._close()
> raise RuntimeError(f"Cluster failed to start: {e}") from e
E RuntimeError: Cluster failed to start: f-string: unmatched '(' (scheduler.py, line 186)
distributed/deploy/spec.py:335: RuntimeError
Check warning on line 0 in distributed.comm.tests.test_ws
github-actions / Unit Test Results
1 out of 3 runs failed: test_wss_roundtrip (distributed.comm.tests.test_ws)
artifacts/ubuntu-latest-mindeps-numpy-notci1/pytest.xml [took 0s]
Raw output
File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
rf"/{prefix.strip("/")}/?",
^
SyntaxError: f-string: unmatched '('
@gen_test()
async def test_wss_roundtrip():
np = pytest.importorskip("numpy")
xfail_ssl_issue5601()
pytest.importorskip("cryptography")
security = Security.temporary()
> async with Scheduler(
protocol="wss://", security=security, dashboard_address=":0"
) as s:
distributed/comm/tests/test_ws.py:239:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Scheduler' object has no attribute '_address'") raised in repr()] Scheduler object at 0x7f9756350f60>
loop = None, services = None, service_kwargs = None, allowed_failures = 3
extensions = None, validate = True, scheduler_file = None
security = Security(require_encryption=True, tls_ca_file=Temporary (In-memory), tls_client_cert=Temporary (In-memory), tls_client..., tls_scheduler_key=Temporary (In-memory), tls_worker_cert=Temporary (In-memory), tls_worker_key=Temporary (In-memory))
worker_ttl = None, idle_timeout = None, interface = None, host = None, port = 0
protocol = 'wss://', dashboard_address = ':0', dashboard = None
http_prefix = '/', preload = [], preload_argv = [], plugins = ()
contact_address = None, transition_counter_max = False, jupyter = False
kwargs = {}
http_server_modules = ['distributed.http.scheduler.prometheus', 'distributed.http.scheduler.info', 'distributed.http.scheduler.json', 'distributed.http.health', 'distributed.http.proxy', 'distributed.http.statics']
show_dashboard = ':0'
def __init__(
self,
loop=None,
services=None,
service_kwargs=None,
allowed_failures=None,
extensions=None,
validate=None,
scheduler_file=None,
security=None,
worker_ttl=None,
idle_timeout=None,
interface=None,
host=None,
port=0,
protocol=None,
dashboard_address=None,
dashboard=None,
http_prefix="/",
preload=None,
preload_argv=(),
plugins=(),
contact_address=None,
transition_counter_max=False,
jupyter=False,
**kwargs,
):
if dask.config.get("distributed.scheduler.pickle", default=True) is False:
raise RuntimeError(
"Pickling can no longer be disabled with the `distributed.scheduler.pickle` option. Please remove this configuration to start the scheduler."
)
if loop is not None:
warnings.warn(
"the loop kwarg to Scheduler is deprecated",
DeprecationWarning,
stacklevel=2,
)
self.loop = self.io_loop = IOLoop.current()
self._setup_logging(logger)
# Attributes
if contact_address is None:
contact_address = dask.config.get("distributed.scheduler.contact-address")
self.contact_address = contact_address
if allowed_failures is None:
allowed_failures = dask.config.get("distributed.scheduler.allowed-failures")
self.allowed_failures = allowed_failures
if validate is None:
validate = dask.config.get("distributed.scheduler.validate")
self.proc = psutil.Process()
self.service_specs = services or {}
self.service_kwargs = service_kwargs or {}
self.services = {}
self.scheduler_file = scheduler_file
self.worker_ttl = parse_timedelta(
worker_ttl or dask.config.get("distributed.scheduler.worker-ttl")
)
self.idle_timeout = parse_timedelta(
idle_timeout or dask.config.get("distributed.scheduler.idle-timeout")
)
self.idle_since = time()
self.no_workers_timeout = parse_timedelta(
dask.config.get("distributed.scheduler.no-workers-timeout")
)
self._no_workers_since = None
self.time_started = self.idle_since # compatibility for dask-gateway
self._replica_lock = RLock()
self.bandwidth_workers = defaultdict(float)
self.bandwidth_types = defaultdict(float)
# Don't cast int metrics to float
self.cumulative_worker_metrics = defaultdict(int)
if not preload:
preload = dask.config.get("distributed.scheduler.preload")
if not preload_argv:
preload_argv = dask.config.get("distributed.scheduler.preload-argv")
self.preloads = preloading.process_preloads(self, preload, preload_argv)
if isinstance(security, dict):
security = Security(**security)
self.security = security or Security()
assert isinstance(self.security, Security)
self.connection_args = self.security.get_connection_args("scheduler")
self.connection_args["handshake_overrides"] = { # common denominator
"pickle-protocol": 4
}
self._start_address = addresses_from_user_args(
host=host,
port=port,
interface=interface,
protocol=protocol,
security=security,
default_port=self.default_port,
)
http_server_modules = dask.config.get("distributed.scheduler.http.routes")
show_dashboard = dashboard or (dashboard is None and dashboard_address)
# install vanilla route if show_dashboard but bokeh is not installed
if show_dashboard:
try:
> import distributed.dashboard.scheduler
E File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
E rf"/{prefix.strip("/")}/?",
E ^
E SyntaxError: f-string: unmatched '('
distributed/scheduler.py:3783: SyntaxError
Check warning on line 0 in distributed.deploy.tests.test_adaptive
github-actions / Unit Test Results
2 out of 3 runs failed: test_adaptive_local_cluster (distributed.deploy.tests.test_adaptive)
artifacts/ubuntu-latest-mindeps-default-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: Cluster failed to start: f-string: unmatched '(' (scheduler.py, line 186)
self = LocalCluster(c88f520d, '<Not Connected>', workers=0, threads=0, memory=0 B)
async def _start(self):
while self.status == Status.starting:
await asyncio.sleep(0.01)
if self.status == Status.running:
return
if self.status == Status.closed:
raise ValueError("Cluster is closed")
self._lock = asyncio.Lock()
self.status = Status.starting
if self.scheduler_spec is None:
try:
import distributed.dashboard # noqa: F401
except ImportError:
pass
else:
options = {"dashboard": True}
self.scheduler_spec = {"cls": Scheduler, "options": options}
try:
# Check if scheduler has already been created by a subclass
if self.scheduler is None:
cls = self.scheduler_spec["cls"]
if isinstance(cls, str):
cls = import_term(cls)
> self.scheduler = cls(**self.scheduler_spec.get("options", {}))
distributed/deploy/spec.py:324:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Scheduler' object has no attribute '_address'") raised in repr()] Scheduler object at 0x7f97563517a0>
loop = None, services = {}, service_kwargs = None, allowed_failures = 3
extensions = None, validate = True, scheduler_file = None
security = Security(require_encryption=False, tls_min_version=771)
worker_ttl = None, idle_timeout = None, interface = None, host = '127.0.0.1'
port = 0, protocol = 'tcp://', dashboard_address = ':0', dashboard = True
http_prefix = '/', preload = [], preload_argv = [], plugins = ()
contact_address = None, transition_counter_max = False, jupyter = False
kwargs = {'blocked_handlers': None}
http_server_modules = ['distributed.http.scheduler.prometheus', 'distributed.http.scheduler.info', 'distributed.http.scheduler.json', 'distributed.http.health', 'distributed.http.proxy', 'distributed.http.statics']
show_dashboard = True
def __init__(
self,
loop=None,
services=None,
service_kwargs=None,
allowed_failures=None,
extensions=None,
validate=None,
scheduler_file=None,
security=None,
worker_ttl=None,
idle_timeout=None,
interface=None,
host=None,
port=0,
protocol=None,
dashboard_address=None,
dashboard=None,
http_prefix="/",
preload=None,
preload_argv=(),
plugins=(),
contact_address=None,
transition_counter_max=False,
jupyter=False,
**kwargs,
):
if dask.config.get("distributed.scheduler.pickle", default=True) is False:
raise RuntimeError(
"Pickling can no longer be disabled with the `distributed.scheduler.pickle` option. Please remove this configuration to start the scheduler."
)
if loop is not None:
warnings.warn(
"the loop kwarg to Scheduler is deprecated",
DeprecationWarning,
stacklevel=2,
)
self.loop = self.io_loop = IOLoop.current()
self._setup_logging(logger)
# Attributes
if contact_address is None:
contact_address = dask.config.get("distributed.scheduler.contact-address")
self.contact_address = contact_address
if allowed_failures is None:
allowed_failures = dask.config.get("distributed.scheduler.allowed-failures")
self.allowed_failures = allowed_failures
if validate is None:
validate = dask.config.get("distributed.scheduler.validate")
self.proc = psutil.Process()
self.service_specs = services or {}
self.service_kwargs = service_kwargs or {}
self.services = {}
self.scheduler_file = scheduler_file
self.worker_ttl = parse_timedelta(
worker_ttl or dask.config.get("distributed.scheduler.worker-ttl")
)
self.idle_timeout = parse_timedelta(
idle_timeout or dask.config.get("distributed.scheduler.idle-timeout")
)
self.idle_since = time()
self.no_workers_timeout = parse_timedelta(
dask.config.get("distributed.scheduler.no-workers-timeout")
)
self._no_workers_since = None
self.time_started = self.idle_since # compatibility for dask-gateway
self._replica_lock = RLock()
self.bandwidth_workers = defaultdict(float)
self.bandwidth_types = defaultdict(float)
# Don't cast int metrics to float
self.cumulative_worker_metrics = defaultdict(int)
if not preload:
preload = dask.config.get("distributed.scheduler.preload")
if not preload_argv:
preload_argv = dask.config.get("distributed.scheduler.preload-argv")
self.preloads = preloading.process_preloads(self, preload, preload_argv)
if isinstance(security, dict):
security = Security(**security)
self.security = security or Security()
assert isinstance(self.security, Security)
self.connection_args = self.security.get_connection_args("scheduler")
self.connection_args["handshake_overrides"] = { # common denominator
"pickle-protocol": 4
}
self._start_address = addresses_from_user_args(
host=host,
port=port,
interface=interface,
protocol=protocol,
security=security,
default_port=self.default_port,
)
http_server_modules = dask.config.get("distributed.scheduler.http.routes")
show_dashboard = dashboard or (dashboard is None and dashboard_address)
# install vanilla route if show_dashboard but bokeh is not installed
if show_dashboard:
try:
> import distributed.dashboard.scheduler
E File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
E rf"/{prefix.strip("/")}/?",
E ^
E SyntaxError: f-string: unmatched '('
distributed/scheduler.py:3783: SyntaxError
The above exception was the direct cause of the following exception:
loop = <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7f975778fa60>
def test_adaptive_local_cluster(loop):
> with LocalCluster(
n_workers=0,
silence_logs=False,
dashboard_address=":0",
loop=loop,
) as cluster:
distributed/deploy/tests/test_adaptive.py:34:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/deploy/local.py:256: in __init__
super().__init__(
distributed/deploy/spec.py:284: in __init__
self.sync(self._start)
distributed/utils.py:363: in sync
return sync(
distributed/utils.py:439: in sync
raise error
distributed/utils.py:413: in f
result = yield future
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/tornado/gen.py:769: in run
value = future.result()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = LocalCluster(c88f520d, '<Not Connected>', workers=0, threads=0, memory=0 B)
async def _start(self):
while self.status == Status.starting:
await asyncio.sleep(0.01)
if self.status == Status.running:
return
if self.status == Status.closed:
raise ValueError("Cluster is closed")
self._lock = asyncio.Lock()
self.status = Status.starting
if self.scheduler_spec is None:
try:
import distributed.dashboard # noqa: F401
except ImportError:
pass
else:
options = {"dashboard": True}
self.scheduler_spec = {"cls": Scheduler, "options": options}
try:
# Check if scheduler has already been created by a subclass
if self.scheduler is None:
cls = self.scheduler_spec["cls"]
if isinstance(cls, str):
cls = import_term(cls)
self.scheduler = cls(**self.scheduler_spec.get("options", {}))
self.scheduler = await self.scheduler
self.scheduler_comm = rpc(
getattr(self.scheduler, "external_address", None)
or self.scheduler.address,
connection_args=self.security.get_connection_args("client"),
)
await super()._start()
except Exception as e: # pragma: no cover
self.status = Status.failed
await self._close()
> raise RuntimeError(f"Cluster failed to start: {e}") from e
E RuntimeError: Cluster failed to start: f-string: unmatched '(' (scheduler.py, line 186)
distributed/deploy/spec.py:335: RuntimeError
Check warning on line 0 in distributed.deploy.tests.test_adaptive
github-actions / Unit Test Results
2 out of 3 runs failed: test_adaptive_local_cluster_multi_workers (distributed.deploy.tests.test_adaptive)
artifacts/ubuntu-latest-mindeps-default-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: Cluster failed to start: f-string: unmatched '(' (scheduler.py, line 186)
self = LocalCluster(b3a5e2ab, '<Not Connected>', workers=0, threads=0, memory=0 B)
async def _start(self):
while self.status == Status.starting:
await asyncio.sleep(0.01)
if self.status == Status.running:
return
if self.status == Status.closed:
raise ValueError("Cluster is closed")
self._lock = asyncio.Lock()
self.status = Status.starting
if self.scheduler_spec is None:
try:
import distributed.dashboard # noqa: F401
except ImportError:
pass
else:
options = {"dashboard": True}
self.scheduler_spec = {"cls": Scheduler, "options": options}
try:
# Check if scheduler has already been created by a subclass
if self.scheduler is None:
cls = self.scheduler_spec["cls"]
if isinstance(cls, str):
cls = import_term(cls)
> self.scheduler = cls(**self.scheduler_spec.get("options", {}))
distributed/deploy/spec.py:324:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Scheduler' object has no attribute '_address'") raised in repr()] Scheduler object at 0x7f9756353a00>
loop = None, services = {}, service_kwargs = None, allowed_failures = 3
extensions = None, validate = True, scheduler_file = None
security = Security(require_encryption=False, tls_min_version=771)
worker_ttl = None, idle_timeout = None, interface = None, host = None, port = 0
protocol = 'inproc://', dashboard_address = ':0', dashboard = True
http_prefix = '/', preload = [], preload_argv = [], plugins = ()
contact_address = None, transition_counter_max = False, jupyter = False
kwargs = {'blocked_handlers': None}
http_server_modules = ['distributed.http.scheduler.prometheus', 'distributed.http.scheduler.info', 'distributed.http.scheduler.json', 'distributed.http.health', 'distributed.http.proxy', 'distributed.http.statics']
show_dashboard = True
def __init__(
self,
loop=None,
services=None,
service_kwargs=None,
allowed_failures=None,
extensions=None,
validate=None,
scheduler_file=None,
security=None,
worker_ttl=None,
idle_timeout=None,
interface=None,
host=None,
port=0,
protocol=None,
dashboard_address=None,
dashboard=None,
http_prefix="/",
preload=None,
preload_argv=(),
plugins=(),
contact_address=None,
transition_counter_max=False,
jupyter=False,
**kwargs,
):
if dask.config.get("distributed.scheduler.pickle", default=True) is False:
raise RuntimeError(
"Pickling can no longer be disabled with the `distributed.scheduler.pickle` option. Please remove this configuration to start the scheduler."
)
if loop is not None:
warnings.warn(
"the loop kwarg to Scheduler is deprecated",
DeprecationWarning,
stacklevel=2,
)
self.loop = self.io_loop = IOLoop.current()
self._setup_logging(logger)
# Attributes
if contact_address is None:
contact_address = dask.config.get("distributed.scheduler.contact-address")
self.contact_address = contact_address
if allowed_failures is None:
allowed_failures = dask.config.get("distributed.scheduler.allowed-failures")
self.allowed_failures = allowed_failures
if validate is None:
validate = dask.config.get("distributed.scheduler.validate")
self.proc = psutil.Process()
self.service_specs = services or {}
self.service_kwargs = service_kwargs or {}
self.services = {}
self.scheduler_file = scheduler_file
self.worker_ttl = parse_timedelta(
worker_ttl or dask.config.get("distributed.scheduler.worker-ttl")
)
self.idle_timeout = parse_timedelta(
idle_timeout or dask.config.get("distributed.scheduler.idle-timeout")
)
self.idle_since = time()
self.no_workers_timeout = parse_timedelta(
dask.config.get("distributed.scheduler.no-workers-timeout")
)
self._no_workers_since = None
self.time_started = self.idle_since # compatibility for dask-gateway
self._replica_lock = RLock()
self.bandwidth_workers = defaultdict(float)
self.bandwidth_types = defaultdict(float)
# Don't cast int metrics to float
self.cumulative_worker_metrics = defaultdict(int)
if not preload:
preload = dask.config.get("distributed.scheduler.preload")
if not preload_argv:
preload_argv = dask.config.get("distributed.scheduler.preload-argv")
self.preloads = preloading.process_preloads(self, preload, preload_argv)
if isinstance(security, dict):
security = Security(**security)
self.security = security or Security()
assert isinstance(self.security, Security)
self.connection_args = self.security.get_connection_args("scheduler")
self.connection_args["handshake_overrides"] = { # common denominator
"pickle-protocol": 4
}
self._start_address = addresses_from_user_args(
host=host,
port=port,
interface=interface,
protocol=protocol,
security=security,
default_port=self.default_port,
)
http_server_modules = dask.config.get("distributed.scheduler.http.routes")
show_dashboard = dashboard or (dashboard is None and dashboard_address)
# install vanilla route if show_dashboard but bokeh is not installed
if show_dashboard:
try:
> import distributed.dashboard.scheduler
E File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
E rf"/{prefix.strip("/")}/?",
E ^
E SyntaxError: f-string: unmatched '('
distributed/scheduler.py:3783: SyntaxError
The above exception was the direct cause of the following exception:
@gen_test()
async def test_adaptive_local_cluster_multi_workers():
> async with LocalCluster(
n_workers=0,
silence_logs=False,
processes=False,
dashboard_address=":0",
asynchronous=True,
) as cluster:
distributed/deploy/tests/test_adaptive.py:62:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/deploy/spec.py:473: in __aenter__
await self
distributed/deploy/spec.py:418: in _
await self._start()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = LocalCluster(b3a5e2ab, '<Not Connected>', workers=0, threads=0, memory=0 B)
async def _start(self):
while self.status == Status.starting:
await asyncio.sleep(0.01)
if self.status == Status.running:
return
if self.status == Status.closed:
raise ValueError("Cluster is closed")
self._lock = asyncio.Lock()
self.status = Status.starting
if self.scheduler_spec is None:
try:
import distributed.dashboard # noqa: F401
except ImportError:
pass
else:
options = {"dashboard": True}
self.scheduler_spec = {"cls": Scheduler, "options": options}
try:
# Check if scheduler has already been created by a subclass
if self.scheduler is None:
cls = self.scheduler_spec["cls"]
if isinstance(cls, str):
cls = import_term(cls)
self.scheduler = cls(**self.scheduler_spec.get("options", {}))
self.scheduler = await self.scheduler
self.scheduler_comm = rpc(
getattr(self.scheduler, "external_address", None)
or self.scheduler.address,
connection_args=self.security.get_connection_args("client"),
)
await super()._start()
except Exception as e: # pragma: no cover
self.status = Status.failed
await self._close()
> raise RuntimeError(f"Cluster failed to start: {e}") from e
E RuntimeError: Cluster failed to start: f-string: unmatched '(' (scheduler.py, line 186)
distributed/deploy/spec.py:335: RuntimeError
Check warning on line 0 in distributed.deploy.tests.test_adaptive
github-actions / Unit Test Results
2 out of 3 runs failed: test_min_max (distributed.deploy.tests.test_adaptive)
artifacts/ubuntu-latest-mindeps-default-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: Cluster failed to start: f-string: unmatched '(' (scheduler.py, line 186)
self = LocalCluster(415b936f, '<Not Connected>', workers=0, threads=0, memory=0 B)
async def _start(self):
while self.status == Status.starting:
await asyncio.sleep(0.01)
if self.status == Status.running:
return
if self.status == Status.closed:
raise ValueError("Cluster is closed")
self._lock = asyncio.Lock()
self.status = Status.starting
if self.scheduler_spec is None:
try:
import distributed.dashboard # noqa: F401
except ImportError:
pass
else:
options = {"dashboard": True}
self.scheduler_spec = {"cls": Scheduler, "options": options}
try:
# Check if scheduler has already been created by a subclass
if self.scheduler is None:
cls = self.scheduler_spec["cls"]
if isinstance(cls, str):
cls = import_term(cls)
> self.scheduler = cls(**self.scheduler_spec.get("options", {}))
distributed/deploy/spec.py:324:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Scheduler' object has no attribute '_address'") raised in repr()] Scheduler object at 0x7f97563535e0>
loop = None, services = {}, service_kwargs = None, allowed_failures = 3
extensions = None, validate = True, scheduler_file = None
security = Security(require_encryption=False, tls_min_version=771)
worker_ttl = None, idle_timeout = None, interface = None, host = None, port = 0
protocol = 'inproc://', dashboard_address = ':0', dashboard = True
http_prefix = '/', preload = [], preload_argv = [], plugins = ()
contact_address = None, transition_counter_max = False, jupyter = False
kwargs = {'blocked_handlers': None}
http_server_modules = ['distributed.http.scheduler.prometheus', 'distributed.http.scheduler.info', 'distributed.http.scheduler.json', 'distributed.http.health', 'distributed.http.proxy', 'distributed.http.statics']
show_dashboard = True
def __init__(
self,
loop=None,
services=None,
service_kwargs=None,
allowed_failures=None,
extensions=None,
validate=None,
scheduler_file=None,
security=None,
worker_ttl=None,
idle_timeout=None,
interface=None,
host=None,
port=0,
protocol=None,
dashboard_address=None,
dashboard=None,
http_prefix="/",
preload=None,
preload_argv=(),
plugins=(),
contact_address=None,
transition_counter_max=False,
jupyter=False,
**kwargs,
):
if dask.config.get("distributed.scheduler.pickle", default=True) is False:
raise RuntimeError(
"Pickling can no longer be disabled with the `distributed.scheduler.pickle` option. Please remove this configuration to start the scheduler."
)
if loop is not None:
warnings.warn(
"the loop kwarg to Scheduler is deprecated",
DeprecationWarning,
stacklevel=2,
)
self.loop = self.io_loop = IOLoop.current()
self._setup_logging(logger)
# Attributes
if contact_address is None:
contact_address = dask.config.get("distributed.scheduler.contact-address")
self.contact_address = contact_address
if allowed_failures is None:
allowed_failures = dask.config.get("distributed.scheduler.allowed-failures")
self.allowed_failures = allowed_failures
if validate is None:
validate = dask.config.get("distributed.scheduler.validate")
self.proc = psutil.Process()
self.service_specs = services or {}
self.service_kwargs = service_kwargs or {}
self.services = {}
self.scheduler_file = scheduler_file
self.worker_ttl = parse_timedelta(
worker_ttl or dask.config.get("distributed.scheduler.worker-ttl")
)
self.idle_timeout = parse_timedelta(
idle_timeout or dask.config.get("distributed.scheduler.idle-timeout")
)
self.idle_since = time()
self.no_workers_timeout = parse_timedelta(
dask.config.get("distributed.scheduler.no-workers-timeout")
)
self._no_workers_since = None
self.time_started = self.idle_since # compatibility for dask-gateway
self._replica_lock = RLock()
self.bandwidth_workers = defaultdict(float)
self.bandwidth_types = defaultdict(float)
# Don't cast int metrics to float
self.cumulative_worker_metrics = defaultdict(int)
if not preload:
preload = dask.config.get("distributed.scheduler.preload")
if not preload_argv:
preload_argv = dask.config.get("distributed.scheduler.preload-argv")
self.preloads = preloading.process_preloads(self, preload, preload_argv)
if isinstance(security, dict):
security = Security(**security)
self.security = security or Security()
assert isinstance(self.security, Security)
self.connection_args = self.security.get_connection_args("scheduler")
self.connection_args["handshake_overrides"] = { # common denominator
"pickle-protocol": 4
}
self._start_address = addresses_from_user_args(
host=host,
port=port,
interface=interface,
protocol=protocol,
security=security,
default_port=self.default_port,
)
http_server_modules = dask.config.get("distributed.scheduler.http.routes")
show_dashboard = dashboard or (dashboard is None and dashboard_address)
# install vanilla route if show_dashboard but bokeh is not installed
if show_dashboard:
try:
> import distributed.dashboard.scheduler
E File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
E rf"/{prefix.strip("/")}/?",
E ^
E SyntaxError: f-string: unmatched '('
distributed/scheduler.py:3783: SyntaxError
The above exception was the direct cause of the following exception:
@gen_test()
async def test_min_max():
> async with LocalCluster(
n_workers=0,
silence_logs=False,
processes=False,
dashboard_address=":0",
asynchronous=True,
threads_per_worker=1,
) as cluster:
distributed/deploy/tests/test_adaptive.py:94:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/deploy/spec.py:473: in __aenter__
await self
distributed/deploy/spec.py:418: in _
await self._start()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = LocalCluster(415b936f, '<Not Connected>', workers=0, threads=0, memory=0 B)
async def _start(self):
while self.status == Status.starting:
await asyncio.sleep(0.01)
if self.status == Status.running:
return
if self.status == Status.closed:
raise ValueError("Cluster is closed")
self._lock = asyncio.Lock()
self.status = Status.starting
if self.scheduler_spec is None:
try:
import distributed.dashboard # noqa: F401
except ImportError:
pass
else:
options = {"dashboard": True}
self.scheduler_spec = {"cls": Scheduler, "options": options}
try:
# Check if scheduler has already been created by a subclass
if self.scheduler is None:
cls = self.scheduler_spec["cls"]
if isinstance(cls, str):
cls = import_term(cls)
self.scheduler = cls(**self.scheduler_spec.get("options", {}))
self.scheduler = await self.scheduler
self.scheduler_comm = rpc(
getattr(self.scheduler, "external_address", None)
or self.scheduler.address,
connection_args=self.security.get_connection_args("client"),
)
await super()._start()
except Exception as e: # pragma: no cover
self.status = Status.failed
await self._close()
> raise RuntimeError(f"Cluster failed to start: {e}") from e
E RuntimeError: Cluster failed to start: f-string: unmatched '(' (scheduler.py, line 186)
distributed/deploy/spec.py:335: RuntimeError
Check warning on line 0 in distributed.deploy.tests.test_adaptive
github-actions / Unit Test Results
2 out of 3 runs failed: test_avoid_churn (distributed.deploy.tests.test_adaptive)
artifacts/ubuntu-latest-mindeps-default-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: Cluster failed to start: f-string: unmatched '(' (scheduler.py, line 186)
self = LocalCluster(b2bf9558, '<Not Connected>', workers=0, threads=0, memory=0 B)
async def _start(self):
while self.status == Status.starting:
await asyncio.sleep(0.01)
if self.status == Status.running:
return
if self.status == Status.closed:
raise ValueError("Cluster is closed")
self._lock = asyncio.Lock()
self.status = Status.starting
if self.scheduler_spec is None:
try:
import distributed.dashboard # noqa: F401
except ImportError:
pass
else:
options = {"dashboard": True}
self.scheduler_spec = {"cls": Scheduler, "options": options}
try:
# Check if scheduler has already been created by a subclass
if self.scheduler is None:
cls = self.scheduler_spec["cls"]
if isinstance(cls, str):
cls = import_term(cls)
> self.scheduler = cls(**self.scheduler_spec.get("options", {}))
distributed/deploy/spec.py:324:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Scheduler' object has no attribute '_address'") raised in repr()] Scheduler object at 0x7f97563538a0>
loop = None, services = {}, service_kwargs = None, allowed_failures = 3
extensions = None, validate = True, scheduler_file = None
security = Security(require_encryption=False, tls_min_version=771)
worker_ttl = None, idle_timeout = None, interface = None, host = None, port = 0
protocol = 'inproc://', dashboard_address = ':0', dashboard = True
http_prefix = '/', preload = [], preload_argv = [], plugins = ()
contact_address = None, transition_counter_max = False, jupyter = False
kwargs = {'blocked_handlers': None}
http_server_modules = ['distributed.http.scheduler.prometheus', 'distributed.http.scheduler.info', 'distributed.http.scheduler.json', 'distributed.http.health', 'distributed.http.proxy', 'distributed.http.statics']
show_dashboard = True
def __init__(
self,
loop=None,
services=None,
service_kwargs=None,
allowed_failures=None,
extensions=None,
validate=None,
scheduler_file=None,
security=None,
worker_ttl=None,
idle_timeout=None,
interface=None,
host=None,
port=0,
protocol=None,
dashboard_address=None,
dashboard=None,
http_prefix="/",
preload=None,
preload_argv=(),
plugins=(),
contact_address=None,
transition_counter_max=False,
jupyter=False,
**kwargs,
):
if dask.config.get("distributed.scheduler.pickle", default=True) is False:
raise RuntimeError(
"Pickling can no longer be disabled with the `distributed.scheduler.pickle` option. Please remove this configuration to start the scheduler."
)
if loop is not None:
warnings.warn(
"the loop kwarg to Scheduler is deprecated",
DeprecationWarning,
stacklevel=2,
)
self.loop = self.io_loop = IOLoop.current()
self._setup_logging(logger)
# Attributes
if contact_address is None:
contact_address = dask.config.get("distributed.scheduler.contact-address")
self.contact_address = contact_address
if allowed_failures is None:
allowed_failures = dask.config.get("distributed.scheduler.allowed-failures")
self.allowed_failures = allowed_failures
if validate is None:
validate = dask.config.get("distributed.scheduler.validate")
self.proc = psutil.Process()
self.service_specs = services or {}
self.service_kwargs = service_kwargs or {}
self.services = {}
self.scheduler_file = scheduler_file
self.worker_ttl = parse_timedelta(
worker_ttl or dask.config.get("distributed.scheduler.worker-ttl")
)
self.idle_timeout = parse_timedelta(
idle_timeout or dask.config.get("distributed.scheduler.idle-timeout")
)
self.idle_since = time()
self.no_workers_timeout = parse_timedelta(
dask.config.get("distributed.scheduler.no-workers-timeout")
)
self._no_workers_since = None
self.time_started = self.idle_since # compatibility for dask-gateway
self._replica_lock = RLock()
self.bandwidth_workers = defaultdict(float)
self.bandwidth_types = defaultdict(float)
# Don't cast int metrics to float
self.cumulative_worker_metrics = defaultdict(int)
if not preload:
preload = dask.config.get("distributed.scheduler.preload")
if not preload_argv:
preload_argv = dask.config.get("distributed.scheduler.preload-argv")
self.preloads = preloading.process_preloads(self, preload, preload_argv)
if isinstance(security, dict):
security = Security(**security)
self.security = security or Security()
assert isinstance(self.security, Security)
self.connection_args = self.security.get_connection_args("scheduler")
self.connection_args["handshake_overrides"] = { # common denominator
"pickle-protocol": 4
}
self._start_address = addresses_from_user_args(
host=host,
port=port,
interface=interface,
protocol=protocol,
security=security,
default_port=self.default_port,
)
http_server_modules = dask.config.get("distributed.scheduler.http.routes")
show_dashboard = dashboard or (dashboard is None and dashboard_address)
# install vanilla route if show_dashboard but bokeh is not installed
if show_dashboard:
try:
> import distributed.dashboard.scheduler
E File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
E rf"/{prefix.strip("/")}/?",
E ^
E SyntaxError: f-string: unmatched '('
distributed/scheduler.py:3783: SyntaxError
The above exception was the direct cause of the following exception:
@gen_test()
async def test_avoid_churn():
"""We want to avoid creating and deleting workers frequently
Instead we want to wait a few beats before removing a worker in case the
user is taking a brief pause between work
"""
> async with LocalCluster(
n_workers=0,
asynchronous=True,
processes=False,
silence_logs=False,
dashboard_address=":0",
) as cluster:
distributed/deploy/tests/test_adaptive.py:144:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/deploy/spec.py:473: in __aenter__
await self
distributed/deploy/spec.py:418: in _
await self._start()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = LocalCluster(b2bf9558, '<Not Connected>', workers=0, threads=0, memory=0 B)
async def _start(self):
while self.status == Status.starting:
await asyncio.sleep(0.01)
if self.status == Status.running:
return
if self.status == Status.closed:
raise ValueError("Cluster is closed")
self._lock = asyncio.Lock()
self.status = Status.starting
if self.scheduler_spec is None:
try:
import distributed.dashboard # noqa: F401
except ImportError:
pass
else:
options = {"dashboard": True}
self.scheduler_spec = {"cls": Scheduler, "options": options}
try:
# Check if scheduler has already been created by a subclass
if self.scheduler is None:
cls = self.scheduler_spec["cls"]
if isinstance(cls, str):
cls = import_term(cls)
self.scheduler = cls(**self.scheduler_spec.get("options", {}))
self.scheduler = await self.scheduler
self.scheduler_comm = rpc(
getattr(self.scheduler, "external_address", None)
or self.scheduler.address,
connection_args=self.security.get_connection_args("client"),
)
await super()._start()
except Exception as e: # pragma: no cover
self.status = Status.failed
await self._close()
> raise RuntimeError(f"Cluster failed to start: {e}") from e
E RuntimeError: Cluster failed to start: f-string: unmatched '(' (scheduler.py, line 186)
distributed/deploy/spec.py:335: RuntimeError
Check warning on line 0 in distributed.deploy.tests.test_adaptive
github-actions / Unit Test Results
2 out of 3 runs failed: test_adapt_quickly (distributed.deploy.tests.test_adaptive)
artifacts/ubuntu-latest-mindeps-default-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: Cluster failed to start: f-string: unmatched '(' (scheduler.py, line 186)
self = LocalCluster(4610b3e9, '<Not Connected>', workers=0, threads=0, memory=0 B)
async def _start(self):
while self.status == Status.starting:
await asyncio.sleep(0.01)
if self.status == Status.running:
return
if self.status == Status.closed:
raise ValueError("Cluster is closed")
self._lock = asyncio.Lock()
self.status = Status.starting
if self.scheduler_spec is None:
try:
import distributed.dashboard # noqa: F401
except ImportError:
pass
else:
options = {"dashboard": True}
self.scheduler_spec = {"cls": Scheduler, "options": options}
try:
# Check if scheduler has already been created by a subclass
if self.scheduler is None:
cls = self.scheduler_spec["cls"]
if isinstance(cls, str):
cls = import_term(cls)
> self.scheduler = cls(**self.scheduler_spec.get("options", {}))
distributed/deploy/spec.py:324:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Scheduler' object has no attribute '_address'") raised in repr()] Scheduler object at 0x7f9756352c40>
loop = None, services = {}, service_kwargs = None, allowed_failures = 3
extensions = None, validate = True, scheduler_file = None
security = Security(require_encryption=False, tls_min_version=771)
worker_ttl = None, idle_timeout = None, interface = None, host = None, port = 0
protocol = 'inproc://', dashboard_address = ':0', dashboard = True
http_prefix = '/', preload = [], preload_argv = [], plugins = ()
contact_address = None, transition_counter_max = False, jupyter = False
kwargs = {'blocked_handlers': None}
http_server_modules = ['distributed.http.scheduler.prometheus', 'distributed.http.scheduler.info', 'distributed.http.scheduler.json', 'distributed.http.health', 'distributed.http.proxy', 'distributed.http.statics']
show_dashboard = True
def __init__(
self,
loop=None,
services=None,
service_kwargs=None,
allowed_failures=None,
extensions=None,
validate=None,
scheduler_file=None,
security=None,
worker_ttl=None,
idle_timeout=None,
interface=None,
host=None,
port=0,
protocol=None,
dashboard_address=None,
dashboard=None,
http_prefix="/",
preload=None,
preload_argv=(),
plugins=(),
contact_address=None,
transition_counter_max=False,
jupyter=False,
**kwargs,
):
if dask.config.get("distributed.scheduler.pickle", default=True) is False:
raise RuntimeError(
"Pickling can no longer be disabled with the `distributed.scheduler.pickle` option. Please remove this configuration to start the scheduler."
)
if loop is not None:
warnings.warn(
"the loop kwarg to Scheduler is deprecated",
DeprecationWarning,
stacklevel=2,
)
self.loop = self.io_loop = IOLoop.current()
self._setup_logging(logger)
# Attributes
if contact_address is None:
contact_address = dask.config.get("distributed.scheduler.contact-address")
self.contact_address = contact_address
if allowed_failures is None:
allowed_failures = dask.config.get("distributed.scheduler.allowed-failures")
self.allowed_failures = allowed_failures
if validate is None:
validate = dask.config.get("distributed.scheduler.validate")
self.proc = psutil.Process()
self.service_specs = services or {}
self.service_kwargs = service_kwargs or {}
self.services = {}
self.scheduler_file = scheduler_file
self.worker_ttl = parse_timedelta(
worker_ttl or dask.config.get("distributed.scheduler.worker-ttl")
)
self.idle_timeout = parse_timedelta(
idle_timeout or dask.config.get("distributed.scheduler.idle-timeout")
)
self.idle_since = time()
self.no_workers_timeout = parse_timedelta(
dask.config.get("distributed.scheduler.no-workers-timeout")
)
self._no_workers_since = None
self.time_started = self.idle_since # compatibility for dask-gateway
self._replica_lock = RLock()
self.bandwidth_workers = defaultdict(float)
self.bandwidth_types = defaultdict(float)
# Don't cast int metrics to float
self.cumulative_worker_metrics = defaultdict(int)
if not preload:
preload = dask.config.get("distributed.scheduler.preload")
if not preload_argv:
preload_argv = dask.config.get("distributed.scheduler.preload-argv")
self.preloads = preloading.process_preloads(self, preload, preload_argv)
if isinstance(security, dict):
security = Security(**security)
self.security = security or Security()
assert isinstance(self.security, Security)
self.connection_args = self.security.get_connection_args("scheduler")
self.connection_args["handshake_overrides"] = { # common denominator
"pickle-protocol": 4
}
self._start_address = addresses_from_user_args(
host=host,
port=port,
interface=interface,
protocol=protocol,
security=security,
default_port=self.default_port,
)
http_server_modules = dask.config.get("distributed.scheduler.http.routes")
show_dashboard = dashboard or (dashboard is None and dashboard_address)
# install vanilla route if show_dashboard but bokeh is not installed
if show_dashboard:
try:
> import distributed.dashboard.scheduler
E File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
E rf"/{prefix.strip("/")}/?",
E ^
E SyntaxError: f-string: unmatched '('
distributed/scheduler.py:3783: SyntaxError
The above exception was the direct cause of the following exception:
@gen_test()
async def test_adapt_quickly():
"""We want to avoid creating and deleting workers frequently
Instead we want to wait a few beats before removing a worker in case the
user is taking a brief pause between work
"""
> async with (
LocalCluster(
n_workers=0,
asynchronous=True,
processes=False,
silence_logs=False,
dashboard_address=":0",
threads_per_worker=1,
) as cluster,
Client(cluster, asynchronous=True) as client,
):
distributed/deploy/tests/test_adaptive.py:168:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/deploy/spec.py:473: in __aenter__
await self
distributed/deploy/spec.py:418: in _
await self._start()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = LocalCluster(4610b3e9, '<Not Connected>', workers=0, threads=0, memory=0 B)
async def _start(self):
while self.status == Status.starting:
await asyncio.sleep(0.01)
if self.status == Status.running:
return
if self.status == Status.closed:
raise ValueError("Cluster is closed")
self._lock = asyncio.Lock()
self.status = Status.starting
if self.scheduler_spec is None:
try:
import distributed.dashboard # noqa: F401
except ImportError:
pass
else:
options = {"dashboard": True}
self.scheduler_spec = {"cls": Scheduler, "options": options}
try:
# Check if scheduler has already been created by a subclass
if self.scheduler is None:
cls = self.scheduler_spec["cls"]
if isinstance(cls, str):
cls = import_term(cls)
self.scheduler = cls(**self.scheduler_spec.get("options", {}))
self.scheduler = await self.scheduler
self.scheduler_comm = rpc(
getattr(self.scheduler, "external_address", None)
or self.scheduler.address,
connection_args=self.security.get_connection_args("client"),
)
await super()._start()
except Exception as e: # pragma: no cover
self.status = Status.failed
await self._close()
> raise RuntimeError(f"Cluster failed to start: {e}") from e
E RuntimeError: Cluster failed to start: f-string: unmatched '(' (scheduler.py, line 186)
distributed/deploy/spec.py:335: RuntimeError
Check warning on line 0 in distributed.deploy.tests.test_adaptive
github-actions / Unit Test Results
2 out of 3 runs failed: test_adapt_down (distributed.deploy.tests.test_adaptive)
artifacts/ubuntu-latest-mindeps-default-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: Cluster failed to start: f-string: unmatched '(' (scheduler.py, line 186)
self = LocalCluster(b6d3df68, '<Not Connected>', workers=0, threads=0, memory=0 B)
async def _start(self):
while self.status == Status.starting:
await asyncio.sleep(0.01)
if self.status == Status.running:
return
if self.status == Status.closed:
raise ValueError("Cluster is closed")
self._lock = asyncio.Lock()
self.status = Status.starting
if self.scheduler_spec is None:
try:
import distributed.dashboard # noqa: F401
except ImportError:
pass
else:
options = {"dashboard": True}
self.scheduler_spec = {"cls": Scheduler, "options": options}
try:
# Check if scheduler has already been created by a subclass
if self.scheduler is None:
cls = self.scheduler_spec["cls"]
if isinstance(cls, str):
cls = import_term(cls)
> self.scheduler = cls(**self.scheduler_spec.get("options", {}))
distributed/deploy/spec.py:324:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Scheduler' object has no attribute '_address'") raised in repr()] Scheduler object at 0x7f9756351fe0>
loop = None, services = {}, service_kwargs = None, allowed_failures = 3
extensions = None, validate = True, scheduler_file = None
security = Security(require_encryption=False, tls_min_version=771)
worker_ttl = None, idle_timeout = None, interface = None, host = None, port = 0
protocol = 'inproc://', dashboard_address = ':0', dashboard = True
http_prefix = '/', preload = [], preload_argv = [], plugins = ()
contact_address = None, transition_counter_max = False, jupyter = False
kwargs = {'blocked_handlers': None}
http_server_modules = ['distributed.http.scheduler.prometheus', 'distributed.http.scheduler.info', 'distributed.http.scheduler.json', 'distributed.http.health', 'distributed.http.proxy', 'distributed.http.statics']
show_dashboard = True
def __init__(
self,
loop=None,
services=None,
service_kwargs=None,
allowed_failures=None,
extensions=None,
validate=None,
scheduler_file=None,
security=None,
worker_ttl=None,
idle_timeout=None,
interface=None,
host=None,
port=0,
protocol=None,
dashboard_address=None,
dashboard=None,
http_prefix="/",
preload=None,
preload_argv=(),
plugins=(),
contact_address=None,
transition_counter_max=False,
jupyter=False,
**kwargs,
):
if dask.config.get("distributed.scheduler.pickle", default=True) is False:
raise RuntimeError(
"Pickling can no longer be disabled with the `distributed.scheduler.pickle` option. Please remove this configuration to start the scheduler."
)
if loop is not None:
warnings.warn(
"the loop kwarg to Scheduler is deprecated",
DeprecationWarning,
stacklevel=2,
)
self.loop = self.io_loop = IOLoop.current()
self._setup_logging(logger)
# Attributes
if contact_address is None:
contact_address = dask.config.get("distributed.scheduler.contact-address")
self.contact_address = contact_address
if allowed_failures is None:
allowed_failures = dask.config.get("distributed.scheduler.allowed-failures")
self.allowed_failures = allowed_failures
if validate is None:
validate = dask.config.get("distributed.scheduler.validate")
self.proc = psutil.Process()
self.service_specs = services or {}
self.service_kwargs = service_kwargs or {}
self.services = {}
self.scheduler_file = scheduler_file
self.worker_ttl = parse_timedelta(
worker_ttl or dask.config.get("distributed.scheduler.worker-ttl")
)
self.idle_timeout = parse_timedelta(
idle_timeout or dask.config.get("distributed.scheduler.idle-timeout")
)
self.idle_since = time()
self.no_workers_timeout = parse_timedelta(
dask.config.get("distributed.scheduler.no-workers-timeout")
)
self._no_workers_since = None
self.time_started = self.idle_since # compatibility for dask-gateway
self._replica_lock = RLock()
self.bandwidth_workers = defaultdict(float)
self.bandwidth_types = defaultdict(float)
# Don't cast int metrics to float
self.cumulative_worker_metrics = defaultdict(int)
if not preload:
preload = dask.config.get("distributed.scheduler.preload")
if not preload_argv:
preload_argv = dask.config.get("distributed.scheduler.preload-argv")
self.preloads = preloading.process_preloads(self, preload, preload_argv)
if isinstance(security, dict):
security = Security(**security)
self.security = security or Security()
assert isinstance(self.security, Security)
self.connection_args = self.security.get_connection_args("scheduler")
self.connection_args["handshake_overrides"] = { # common denominator
"pickle-protocol": 4
}
self._start_address = addresses_from_user_args(
host=host,
port=port,
interface=interface,
protocol=protocol,
security=security,
default_port=self.default_port,
)
http_server_modules = dask.config.get("distributed.scheduler.http.routes")
show_dashboard = dashboard or (dashboard is None and dashboard_address)
# install vanilla route if show_dashboard but bokeh is not installed
if show_dashboard:
try:
> import distributed.dashboard.scheduler
E File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
E rf"/{prefix.strip("/")}/?",
E ^
E SyntaxError: f-string: unmatched '('
distributed/scheduler.py:3783: SyntaxError
The above exception was the direct cause of the following exception:
@gen_test()
async def test_adapt_down():
"""Ensure that redefining adapt with a lower maximum removes workers"""
> async with (
LocalCluster(
n_workers=0,
asynchronous=True,
processes=False,
silence_logs=False,
dashboard_address=":0",
) as cluster,
Client(cluster, asynchronous=True) as client,
):
distributed/deploy/tests/test_adaptive.py:222:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/deploy/spec.py:473: in __aenter__
await self
distributed/deploy/spec.py:418: in _
await self._start()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = LocalCluster(b6d3df68, '<Not Connected>', workers=0, threads=0, memory=0 B)
async def _start(self):
while self.status == Status.starting:
await asyncio.sleep(0.01)
if self.status == Status.running:
return
if self.status == Status.closed:
raise ValueError("Cluster is closed")
self._lock = asyncio.Lock()
self.status = Status.starting
if self.scheduler_spec is None:
try:
import distributed.dashboard # noqa: F401
except ImportError:
pass
else:
options = {"dashboard": True}
self.scheduler_spec = {"cls": Scheduler, "options": options}
try:
# Check if scheduler has already been created by a subclass
if self.scheduler is None:
cls = self.scheduler_spec["cls"]
if isinstance(cls, str):
cls = import_term(cls)
self.scheduler = cls(**self.scheduler_spec.get("options", {}))
self.scheduler = await self.scheduler
self.scheduler_comm = rpc(
getattr(self.scheduler, "external_address", None)
or self.scheduler.address,
connection_args=self.security.get_connection_args("client"),
)
await super()._start()
except Exception as e: # pragma: no cover
self.status = Status.failed
await self._close()
> raise RuntimeError(f"Cluster failed to start: {e}") from e
E RuntimeError: Cluster failed to start: f-string: unmatched '(' (scheduler.py, line 186)
distributed/deploy/spec.py:335: RuntimeError
Check warning on line 0 in distributed.deploy.tests.test_adaptive
github-actions / Unit Test Results
2 out of 3 runs failed: test_no_more_workers_than_tasks (distributed.deploy.tests.test_adaptive)
artifacts/ubuntu-latest-mindeps-default-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: Cluster failed to start: f-string: unmatched '(' (scheduler.py, line 186)
self = LocalCluster(7b59a348, '<Not Connected>', workers=0, threads=0, memory=0 B)
async def _start(self):
while self.status == Status.starting:
await asyncio.sleep(0.01)
if self.status == Status.running:
return
if self.status == Status.closed:
raise ValueError("Cluster is closed")
self._lock = asyncio.Lock()
self.status = Status.starting
if self.scheduler_spec is None:
try:
import distributed.dashboard # noqa: F401
except ImportError:
pass
else:
options = {"dashboard": True}
self.scheduler_spec = {"cls": Scheduler, "options": options}
try:
# Check if scheduler has already been created by a subclass
if self.scheduler is None:
cls = self.scheduler_spec["cls"]
if isinstance(cls, str):
cls = import_term(cls)
> self.scheduler = cls(**self.scheduler_spec.get("options", {}))
distributed/deploy/spec.py:324:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Scheduler' object has no attribute '_address'") raised in repr()] Scheduler object at 0x7f9756353b60>
loop = None, services = {}, service_kwargs = None, allowed_failures = 3
extensions = None, validate = True, scheduler_file = None
security = Security(require_encryption=False, tls_min_version=771)
worker_ttl = None, idle_timeout = None, interface = None, host = None, port = 0
protocol = 'inproc://', dashboard_address = ':0', dashboard = True
http_prefix = '/', preload = [], preload_argv = [], plugins = ()
contact_address = None, transition_counter_max = False, jupyter = False
kwargs = {'blocked_handlers': None}
http_server_modules = ['distributed.http.scheduler.prometheus', 'distributed.http.scheduler.info', 'distributed.http.scheduler.json', 'distributed.http.health', 'distributed.http.proxy', 'distributed.http.statics']
show_dashboard = True
def __init__(
self,
loop=None,
services=None,
service_kwargs=None,
allowed_failures=None,
extensions=None,
validate=None,
scheduler_file=None,
security=None,
worker_ttl=None,
idle_timeout=None,
interface=None,
host=None,
port=0,
protocol=None,
dashboard_address=None,
dashboard=None,
http_prefix="/",
preload=None,
preload_argv=(),
plugins=(),
contact_address=None,
transition_counter_max=False,
jupyter=False,
**kwargs,
):
if dask.config.get("distributed.scheduler.pickle", default=True) is False:
raise RuntimeError(
"Pickling can no longer be disabled with the `distributed.scheduler.pickle` option. Please remove this configuration to start the scheduler."
)
if loop is not None:
warnings.warn(
"the loop kwarg to Scheduler is deprecated",
DeprecationWarning,
stacklevel=2,
)
self.loop = self.io_loop = IOLoop.current()
self._setup_logging(logger)
# Attributes
if contact_address is None:
contact_address = dask.config.get("distributed.scheduler.contact-address")
self.contact_address = contact_address
if allowed_failures is None:
allowed_failures = dask.config.get("distributed.scheduler.allowed-failures")
self.allowed_failures = allowed_failures
if validate is None:
validate = dask.config.get("distributed.scheduler.validate")
self.proc = psutil.Process()
self.service_specs = services or {}
self.service_kwargs = service_kwargs or {}
self.services = {}
self.scheduler_file = scheduler_file
self.worker_ttl = parse_timedelta(
worker_ttl or dask.config.get("distributed.scheduler.worker-ttl")
)
self.idle_timeout = parse_timedelta(
idle_timeout or dask.config.get("distributed.scheduler.idle-timeout")
)
self.idle_since = time()
self.no_workers_timeout = parse_timedelta(
dask.config.get("distributed.scheduler.no-workers-timeout")
)
self._no_workers_since = None
self.time_started = self.idle_since # compatibility for dask-gateway
self._replica_lock = RLock()
self.bandwidth_workers = defaultdict(float)
self.bandwidth_types = defaultdict(float)
# Don't cast int metrics to float
self.cumulative_worker_metrics = defaultdict(int)
if not preload:
preload = dask.config.get("distributed.scheduler.preload")
if not preload_argv:
preload_argv = dask.config.get("distributed.scheduler.preload-argv")
self.preloads = preloading.process_preloads(self, preload, preload_argv)
if isinstance(security, dict):
security = Security(**security)
self.security = security or Security()
assert isinstance(self.security, Security)
self.connection_args = self.security.get_connection_args("scheduler")
self.connection_args["handshake_overrides"] = { # common denominator
"pickle-protocol": 4
}
self._start_address = addresses_from_user_args(
host=host,
port=port,
interface=interface,
protocol=protocol,
security=security,
default_port=self.default_port,
)
http_server_modules = dask.config.get("distributed.scheduler.http.routes")
show_dashboard = dashboard or (dashboard is None and dashboard_address)
# install vanilla route if show_dashboard but bokeh is not installed
if show_dashboard:
try:
> import distributed.dashboard.scheduler
E File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
E rf"/{prefix.strip("/")}/?",
E ^
E SyntaxError: f-string: unmatched '('
distributed/scheduler.py:3783: SyntaxError
The above exception was the direct cause of the following exception:
@gen_test()
async def test_no_more_workers_than_tasks():
with dask.config.set(
{"distributed.scheduler.default-task-durations": {"slowinc": 1000}}
):
> async with LocalCluster(
n_workers=0,
silence_logs=False,
processes=False,
dashboard_address=":0",
asynchronous=True,
) as cluster:
distributed/deploy/tests/test_adaptive.py:251:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/deploy/spec.py:473: in __aenter__
await self
distributed/deploy/spec.py:418: in _
await self._start()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = LocalCluster(7b59a348, '<Not Connected>', workers=0, threads=0, memory=0 B)
async def _start(self):
while self.status == Status.starting:
await asyncio.sleep(0.01)
if self.status == Status.running:
return
if self.status == Status.closed:
raise ValueError("Cluster is closed")
self._lock = asyncio.Lock()
self.status = Status.starting
if self.scheduler_spec is None:
try:
import distributed.dashboard # noqa: F401
except ImportError:
pass
else:
options = {"dashboard": True}
self.scheduler_spec = {"cls": Scheduler, "options": options}
try:
# Check if scheduler has already been created by a subclass
if self.scheduler is None:
cls = self.scheduler_spec["cls"]
if isinstance(cls, str):
cls = import_term(cls)
self.scheduler = cls(**self.scheduler_spec.get("options", {}))
self.scheduler = await self.scheduler
self.scheduler_comm = rpc(
getattr(self.scheduler, "external_address", None)
or self.scheduler.address,
connection_args=self.security.get_connection_args("client"),
)
await super()._start()
except Exception as e: # pragma: no cover
self.status = Status.failed
await self._close()
> raise RuntimeError(f"Cluster failed to start: {e}") from e
E RuntimeError: Cluster failed to start: f-string: unmatched '(' (scheduler.py, line 186)
distributed/deploy/spec.py:335: RuntimeError
Check warning on line 0 in distributed.deploy.tests.test_adaptive
github-actions / Unit Test Results
2 out of 5 runs failed: test_basic_no_loop (distributed.deploy.tests.test_adaptive)
artifacts/ubuntu-latest-mindeps-default-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: Cluster failed to start: f-string: unmatched '(' (scheduler.py, line 186)
self = LocalCluster(dc099765, '<Not Connected>', workers=0, threads=0, memory=0 B)
async def _start(self):
while self.status == Status.starting:
await asyncio.sleep(0.01)
if self.status == Status.running:
return
if self.status == Status.closed:
raise ValueError("Cluster is closed")
self._lock = asyncio.Lock()
self.status = Status.starting
if self.scheduler_spec is None:
try:
import distributed.dashboard # noqa: F401
except ImportError:
pass
else:
options = {"dashboard": True}
self.scheduler_spec = {"cls": Scheduler, "options": options}
try:
# Check if scheduler has already been created by a subclass
if self.scheduler is None:
cls = self.scheduler_spec["cls"]
if isinstance(cls, str):
cls = import_term(cls)
> self.scheduler = cls(**self.scheduler_spec.get("options", {}))
distributed/deploy/spec.py:324:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Scheduler' object has no attribute '_address'") raised in repr()] Scheduler object at 0x7f9756353480>
loop = None, services = {}, service_kwargs = None, allowed_failures = 3
extensions = None, validate = False, scheduler_file = None
security = Security(require_encryption=False, tls_min_version=771)
worker_ttl = None, idle_timeout = None, interface = None, host = '127.0.0.1'
port = 0, protocol = 'tcp://', dashboard_address = ':0', dashboard = True
http_prefix = '/', preload = [], preload_argv = [], plugins = ()
contact_address = None, transition_counter_max = False, jupyter = False
kwargs = {'blocked_handlers': None}
http_server_modules = ['distributed.http.scheduler.prometheus', 'distributed.http.scheduler.info', 'distributed.http.scheduler.json', 'distributed.http.health', 'distributed.http.proxy', 'distributed.http.statics']
show_dashboard = True
def __init__(
self,
loop=None,
services=None,
service_kwargs=None,
allowed_failures=None,
extensions=None,
validate=None,
scheduler_file=None,
security=None,
worker_ttl=None,
idle_timeout=None,
interface=None,
host=None,
port=0,
protocol=None,
dashboard_address=None,
dashboard=None,
http_prefix="/",
preload=None,
preload_argv=(),
plugins=(),
contact_address=None,
transition_counter_max=False,
jupyter=False,
**kwargs,
):
if dask.config.get("distributed.scheduler.pickle", default=True) is False:
raise RuntimeError(
"Pickling can no longer be disabled with the `distributed.scheduler.pickle` option. Please remove this configuration to start the scheduler."
)
if loop is not None:
warnings.warn(
"the loop kwarg to Scheduler is deprecated",
DeprecationWarning,
stacklevel=2,
)
self.loop = self.io_loop = IOLoop.current()
self._setup_logging(logger)
# Attributes
if contact_address is None:
contact_address = dask.config.get("distributed.scheduler.contact-address")
self.contact_address = contact_address
if allowed_failures is None:
allowed_failures = dask.config.get("distributed.scheduler.allowed-failures")
self.allowed_failures = allowed_failures
if validate is None:
validate = dask.config.get("distributed.scheduler.validate")
self.proc = psutil.Process()
self.service_specs = services or {}
self.service_kwargs = service_kwargs or {}
self.services = {}
self.scheduler_file = scheduler_file
self.worker_ttl = parse_timedelta(
worker_ttl or dask.config.get("distributed.scheduler.worker-ttl")
)
self.idle_timeout = parse_timedelta(
idle_timeout or dask.config.get("distributed.scheduler.idle-timeout")
)
self.idle_since = time()
self.no_workers_timeout = parse_timedelta(
dask.config.get("distributed.scheduler.no-workers-timeout")
)
self._no_workers_since = None
self.time_started = self.idle_since # compatibility for dask-gateway
self._replica_lock = RLock()
self.bandwidth_workers = defaultdict(float)
self.bandwidth_types = defaultdict(float)
# Don't cast int metrics to float
self.cumulative_worker_metrics = defaultdict(int)
if not preload:
preload = dask.config.get("distributed.scheduler.preload")
if not preload_argv:
preload_argv = dask.config.get("distributed.scheduler.preload-argv")
self.preloads = preloading.process_preloads(self, preload, preload_argv)
if isinstance(security, dict):
security = Security(**security)
self.security = security or Security()
assert isinstance(self.security, Security)
self.connection_args = self.security.get_connection_args("scheduler")
self.connection_args["handshake_overrides"] = { # common denominator
"pickle-protocol": 4
}
self._start_address = addresses_from_user_args(
host=host,
port=port,
interface=interface,
protocol=protocol,
security=security,
default_port=self.default_port,
)
http_server_modules = dask.config.get("distributed.scheduler.http.routes")
show_dashboard = dashboard or (dashboard is None and dashboard_address)
# install vanilla route if show_dashboard but bokeh is not installed
if show_dashboard:
try:
> import distributed.dashboard.scheduler
E File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
E rf"/{prefix.strip("/")}/?",
E ^
E SyntaxError: f-string: unmatched '('
distributed/scheduler.py:3783: SyntaxError
The above exception was the direct cause of the following exception:
cleanup = None
def test_basic_no_loop(cleanup):
loop = None
try:
> with LocalCluster(
n_workers=0, silence_logs=False, dashboard_address=":0", loop=None
) as cluster:
distributed/deploy/tests/test_adaptive.py:267:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/deploy/local.py:256: in __init__
super().__init__(
distributed/deploy/spec.py:284: in __init__
self.sync(self._start)
distributed/utils.py:363: in sync
return sync(
distributed/utils.py:439: in sync
raise error
distributed/utils.py:413: in f
result = yield future
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/tornado/gen.py:769: in run
value = future.result()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = LocalCluster(dc099765, '<Not Connected>', workers=0, threads=0, memory=0 B)
async def _start(self):
while self.status == Status.starting:
await asyncio.sleep(0.01)
if self.status == Status.running:
return
if self.status == Status.closed:
raise ValueError("Cluster is closed")
self._lock = asyncio.Lock()
self.status = Status.starting
if self.scheduler_spec is None:
try:
import distributed.dashboard # noqa: F401
except ImportError:
pass
else:
options = {"dashboard": True}
self.scheduler_spec = {"cls": Scheduler, "options": options}
try:
# Check if scheduler has already been created by a subclass
if self.scheduler is None:
cls = self.scheduler_spec["cls"]
if isinstance(cls, str):
cls = import_term(cls)
self.scheduler = cls(**self.scheduler_spec.get("options", {}))
self.scheduler = await self.scheduler
self.scheduler_comm = rpc(
getattr(self.scheduler, "external_address", None)
or self.scheduler.address,
connection_args=self.security.get_connection_args("client"),
)
await super()._start()
except Exception as e: # pragma: no cover
self.status = Status.failed
await self._close()
> raise RuntimeError(f"Cluster failed to start: {e}") from e
E RuntimeError: Cluster failed to start: f-string: unmatched '(' (scheduler.py, line 186)
distributed/deploy/spec.py:335: RuntimeError
Check failure on line 0 in distributed.deploy.tests.test_adaptive
github-actions / Unit Test Results
2 out of 5 runs with error: test_basic_no_loop (distributed.deploy.tests.test_adaptive)
artifacts/ubuntu-latest-mindeps-default-notci1/pytest.xml [took 5s]
artifacts/ubuntu-latest-mindeps-numpy-notci1/pytest.xml [took 5s]
Raw output
failed on teardown with "pytest.PytestUnraisableExceptionWarning: Exception ignored in: <coroutine object wait_for at 0x7f9756194c10>
Traceback (most recent call last):
File "/home/runner/miniconda3/envs/dask-distributed/lib/python3.10/warnings.py", line 506, in _warn_unawaited_coroutine
warn(msg, category=RuntimeWarning, stacklevel=2, source=coro)
RuntimeWarning: coroutine 'wait_for' was never awaited"
@pytest.fixture
def cleanup():
> with clean():
distributed/utils_test.py:1792:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../miniconda3/envs/dask-distributed/lib/python3.10/contextlib.py:142: in __exit__
next(self.gen)
distributed/utils_test.py:1784: in clean
with check_thread_leak() if threads else nullcontext():
../../../miniconda3/envs/dask-distributed/lib/python3.10/contextlib.py:142: in __exit__
next(self.gen)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
@contextmanager
def check_thread_leak():
"""Context manager to ensure we haven't leaked any threads"""
active_threads_start = threading.enumerate()
yield
start = time()
while True:
bad_threads = [
thread
for thread in threading.enumerate()
if thread not in active_threads_start
# FIXME this looks like a genuine leak that needs fixing
and "watch message queue" not in thread.name
]
if not bad_threads:
break
else:
sleep(0.01)
if time() > start + 5:
# Raise an error with information about leaked threads
from distributed import profile
frames = sys._current_frames()
try:
lines = [f"{len(bad_threads)} thread(s) were leaked from test\n"]
for i, thread in enumerate(bad_threads, 1):
lines.append(
f"------ Call stack of leaked thread {i}/{len(bad_threads)}: {thread} ------"
)
lines.append(
"".join(profile.call_stack(frames[thread.ident]))
# NOTE: `call_stack` already adds newlines
)
finally:
del frames
> pytest.fail("\n".join(lines), pytrace=False)
E Failed: 1 thread(s) were leaked from test
E
E ------ Call stack of leaked thread 1/1: <Thread(IO loop, started daemon 140287892522688)> ------
E File "/home/runner/miniconda3/envs/dask-distributed/lib/python3.10/threading.py", line 973, in _bootstrap
E self._bootstrap_inner()
E File "/home/runner/miniconda3/envs/dask-distributed/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
E self.run()
E File "/home/runner/miniconda3/envs/dask-distributed/lib/python3.10/threading.py", line 953, in run
E self._target(*self._args, **self._kwargs)
E File "/home/runner/work/distributed/distributed/distributed/utils.py", line 450, in wrapper
E target()
E File "/home/runner/work/distributed/distributed/distributed/utils.py", line 558, in run_loop
E asyncio_run(amain(), loop_factory=get_loop_factory())
E File "/home/runner/work/distributed/distributed/distributed/compatibility.py", line 236, in asyncio_run
E return loop.run_until_complete(main)
E File "/home/runner/miniconda3/envs/dask-distributed/lib/python3.10/asyncio/base_events.py", line 636, in run_until_complete
E self.run_forever()
E File "/home/runner/miniconda3/envs/dask-distributed/lib/python3.10/asyncio/base_events.py", line 603, in run_forever
E self._run_once()
E File "/home/runner/miniconda3/envs/dask-distributed/lib/python3.10/asyncio/base_events.py", line 1871, in _run_once
E event_list = self._selector.select(timeout)
E File "/home/runner/miniconda3/envs/dask-distributed/lib/python3.10/selectors.py", line 469, in select
E fd_event_list = self._selector.poll(timeout, max_ev)
distributed/utils_test.py:1629: Failed
During handling of the above exception, another exception occurred:
cls = <class '_pytest.runner.CallInfo'>
func = <function call_and_report.<locals>.<lambda> at 0x7f97577c79a0>
when = 'teardown'
reraise = (<class '_pytest.outcomes.Exit'>, <class 'KeyboardInterrupt'>)
@classmethod
def from_call(
cls,
func: Callable[[], TResult],
when: Literal["collect", "setup", "call", "teardown"],
reraise: type[BaseException] | tuple[type[BaseException], ...] | None = None,
) -> CallInfo[TResult]:
"""Call func, wrapping the result in a CallInfo.
:param func:
The function to call. Called without arguments.
:type func: Callable[[], _pytest.runner.TResult]
:param when:
The phase in which the function is called.
:param reraise:
Exception or exceptions that shall propagate if raised by the
function, instead of being wrapped in the CallInfo.
"""
excinfo = None
start = timing.time()
precise_start = timing.perf_counter()
try:
> result: TResult | None = func()
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/_pytest/runner.py:341:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/_pytest/runner.py:242: in <lambda>
lambda: runtest_hook(item=item, **kwds), when=when, reraise=reraise
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/pluggy/_hooks.py:513: in __call__
return self._hookexec(self.name, self._hookimpls.copy(), kwargs, firstresult)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/pluggy/_manager.py:120: in _hookexec
return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/_pytest/threadexception.py:97: in pytest_runtest_teardown
yield from thread_exception_runtest_hook()
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/_pytest/threadexception.py:68: in thread_exception_runtest_hook
yield
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/_pytest/unraisableexception.py:100: in pytest_runtest_teardown
yield from unraisable_exception_runtest_hook()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
def unraisable_exception_runtest_hook() -> Generator[None]:
with catch_unraisable_exception() as cm:
try:
yield
finally:
if cm.unraisable:
if cm.unraisable.err_msg is not None:
err_msg = cm.unraisable.err_msg
else:
err_msg = "Exception ignored in"
msg = f"{err_msg}: {cm.unraisable.object!r}\n\n"
msg += "".join(
traceback.format_exception(
cm.unraisable.exc_type,
cm.unraisable.exc_value,
cm.unraisable.exc_traceback,
)
)
> warnings.warn(pytest.PytestUnraisableExceptionWarning(msg))
E pytest.PytestUnraisableExceptionWarning: Exception ignored in: <coroutine object wait_for at 0x7f9756194c10>
E
E Traceback (most recent call last):
E File "/home/runner/miniconda3/envs/dask-distributed/lib/python3.10/warnings.py", line 506, in _warn_unawaited_coroutine
E warn(msg, category=RuntimeWarning, stacklevel=2, source=coro)
E RuntimeWarning: coroutine 'wait_for' was never awaited
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/_pytest/unraisableexception.py:85: PytestUnraisableExceptionWarning
Check warning on line 0 in distributed.deploy.tests.test_adaptive
github-actions / Unit Test Results
2 out of 3 runs failed: test_target_duration[5] (distributed.deploy.tests.test_adaptive)
artifacts/ubuntu-latest-mindeps-default-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-notci1/pytest.xml [took 0s]
Raw output
RuntimeError: Cluster failed to start: f-string: unmatched '(' (scheduler.py, line 186)
self = LocalCluster(b5ccde71, '<Not Connected>', workers=0, threads=0, memory=0 B)
async def _start(self):
while self.status == Status.starting:
await asyncio.sleep(0.01)
if self.status == Status.running:
return
if self.status == Status.closed:
raise ValueError("Cluster is closed")
self._lock = asyncio.Lock()
self.status = Status.starting
if self.scheduler_spec is None:
try:
import distributed.dashboard # noqa: F401
except ImportError:
pass
else:
options = {"dashboard": True}
self.scheduler_spec = {"cls": Scheduler, "options": options}
try:
# Check if scheduler has already been created by a subclass
if self.scheduler is None:
cls = self.scheduler_spec["cls"]
if isinstance(cls, str):
cls = import_term(cls)
> self.scheduler = cls(**self.scheduler_spec.get("options", {}))
distributed/deploy/spec.py:324:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Scheduler' object has no attribute '_address'") raised in repr()] Scheduler object at 0x7f97563535e0>
loop = None, services = {}, service_kwargs = None, allowed_failures = 3
extensions = None, validate = True, scheduler_file = None
security = Security(require_encryption=False, tls_min_version=771)
worker_ttl = None, idle_timeout = None, interface = None, host = None, port = 0
protocol = 'inproc://', dashboard_address = ':0', dashboard = True
http_prefix = '/', preload = [], preload_argv = [], plugins = ()
contact_address = None, transition_counter_max = False, jupyter = False
kwargs = {'blocked_handlers': None}
http_server_modules = ['distributed.http.scheduler.prometheus', 'distributed.http.scheduler.info', 'distributed.http.scheduler.json', 'distributed.http.health', 'distributed.http.proxy', 'distributed.http.statics']
show_dashboard = True
def __init__(
self,
loop=None,
services=None,
service_kwargs=None,
allowed_failures=None,
extensions=None,
validate=None,
scheduler_file=None,
security=None,
worker_ttl=None,
idle_timeout=None,
interface=None,
host=None,
port=0,
protocol=None,
dashboard_address=None,
dashboard=None,
http_prefix="/",
preload=None,
preload_argv=(),
plugins=(),
contact_address=None,
transition_counter_max=False,
jupyter=False,
**kwargs,
):
if dask.config.get("distributed.scheduler.pickle", default=True) is False:
raise RuntimeError(
"Pickling can no longer be disabled with the `distributed.scheduler.pickle` option. Please remove this configuration to start the scheduler."
)
if loop is not None:
warnings.warn(
"the loop kwarg to Scheduler is deprecated",
DeprecationWarning,
stacklevel=2,
)
self.loop = self.io_loop = IOLoop.current()
self._setup_logging(logger)
# Attributes
if contact_address is None:
contact_address = dask.config.get("distributed.scheduler.contact-address")
self.contact_address = contact_address
if allowed_failures is None:
allowed_failures = dask.config.get("distributed.scheduler.allowed-failures")
self.allowed_failures = allowed_failures
if validate is None:
validate = dask.config.get("distributed.scheduler.validate")
self.proc = psutil.Process()
self.service_specs = services or {}
self.service_kwargs = service_kwargs or {}
self.services = {}
self.scheduler_file = scheduler_file
self.worker_ttl = parse_timedelta(
worker_ttl or dask.config.get("distributed.scheduler.worker-ttl")
)
self.idle_timeout = parse_timedelta(
idle_timeout or dask.config.get("distributed.scheduler.idle-timeout")
)
self.idle_since = time()
self.no_workers_timeout = parse_timedelta(
dask.config.get("distributed.scheduler.no-workers-timeout")
)
self._no_workers_since = None
self.time_started = self.idle_since # compatibility for dask-gateway
self._replica_lock = RLock()
self.bandwidth_workers = defaultdict(float)
self.bandwidth_types = defaultdict(float)
# Don't cast int metrics to float
self.cumulative_worker_metrics = defaultdict(int)
if not preload:
preload = dask.config.get("distributed.scheduler.preload")
if not preload_argv:
preload_argv = dask.config.get("distributed.scheduler.preload-argv")
self.preloads = preloading.process_preloads(self, preload, preload_argv)
if isinstance(security, dict):
security = Security(**security)
self.security = security or Security()
assert isinstance(self.security, Security)
self.connection_args = self.security.get_connection_args("scheduler")
self.connection_args["handshake_overrides"] = { # common denominator
"pickle-protocol": 4
}
self._start_address = addresses_from_user_args(
host=host,
port=port,
interface=interface,
protocol=protocol,
security=security,
default_port=self.default_port,
)
http_server_modules = dask.config.get("distributed.scheduler.http.routes")
show_dashboard = dashboard or (dashboard is None and dashboard_address)
# install vanilla route if show_dashboard but bokeh is not installed
if show_dashboard:
try:
> import distributed.dashboard.scheduler
E File "/home/runner/work/distributed/distributed/distributed/dashboard/scheduler.py", line 186
E rf"/{prefix.strip("/")}/?",
E ^
E SyntaxError: f-string: unmatched '('
distributed/scheduler.py:3783: SyntaxError
The above exception was the direct cause of the following exception:
target_duration = 5
@pytest.mark.parametrize("target_duration", [5, 1])
def test_target_duration(target_duration):
@gen_test()
async def _test():
with dask.config.set(
{
"distributed.scheduler.default-task-durations": {"slowinc": 1},
# adaptive target for queued tasks doesn't yet consider default or learned task durations
"distributed.scheduler.worker-saturation": float("inf"),
}
):
async with LocalCluster(
n_workers=0,
asynchronous=True,
processes=False,
silence_logs=False,
dashboard_address=":0",
) as cluster:
adapt = cluster.adapt(
interval="20ms", minimum=2, target_duration=target_duration
)
# FIXME: LocalCluster is starting workers with CPU_COUNT threads
# each
# The default target duration is set to 1s
max_scaleup = 5
n_tasks = target_duration * dask.system.CPU_COUNT * max_scaleup
async with Client(cluster, asynchronous=True) as client:
await client.wait_for_workers(2)
futures = client.map(slowinc, range(n_tasks), delay=0.3)
await wait(futures)
scaleup_recs = [
msg[1]["n"] for msg in adapt.log if msg[1].get("status") == "up"
]
assert 2 <= min(scaleup_recs) < max(scaleup_recs) <= max_scaleup
> _test()
distributed/deploy/tests/test_adaptive.py:316:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../miniconda3/envs/dask-distributed/lib/python3.10/contextlib.py:79: in inner
return func(*args, **kwds)
../../../miniconda3/envs/dask-distributed/lib/python3.10/contextlib.py:79: in inner
return func(*args, **kwds)
distributed/utils_test.py:748: in test_func
return _run_and_close_tornado(async_fn_outer, func, *args, **kwargs)
distributed/utils_test.py:380: in _run_and_close_tornado
return asyncio_run(inner_fn(), loop_factory=get_loop_factory())
distributed/compatibility.py:236: in asyncio_run
return loop.run_until_complete(main)
../../../miniconda3/envs/dask-distributed/lib/python3.10/asyncio/base_events.py:649: in run_until_complete
return future.result()
distributed/utils_test.py:377: in inner_fn
return await async_fn(*args, **kwargs)
distributed/utils_test.py:738: in async_fn_outer
return await utils_wait_for(async_fn(*args, **kwargs), timeout)
distributed/utils.py:1915: in wait_for
return await asyncio.wait_for(fut, timeout)
../../../miniconda3/envs/dask-distributed/lib/python3.10/asyncio/tasks.py:445: in wait_for
return fut.result()
distributed/deploy/tests/test_adaptive.py:290: in _test
async with LocalCluster(
distributed/deploy/spec.py:473: in __aenter__
await self
distributed/deploy/spec.py:418: in _
await self._start()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = LocalCluster(b5ccde71, '<Not Connected>', workers=0, threads=0, memory=0 B)
async def _start(self):
while self.status == Status.starting:
await asyncio.sleep(0.01)
if self.status == Status.running:
return
if self.status == Status.closed:
raise ValueError("Cluster is closed")
self._lock = asyncio.Lock()
self.status = Status.starting
if self.scheduler_spec is None:
try:
import distributed.dashboard # noqa: F401
except ImportError:
pass
else:
options = {"dashboard": True}
self.scheduler_spec = {"cls": Scheduler, "options": options}
try:
# Check if scheduler has already been created by a subclass
if self.scheduler is None:
cls = self.scheduler_spec["cls"]
if isinstance(cls, str):
cls = import_term(cls)
self.scheduler = cls(**self.scheduler_spec.get("options", {}))
self.scheduler = await self.scheduler
self.scheduler_comm = rpc(
getattr(self.scheduler, "external_address", None)
or self.scheduler.address,
connection_args=self.security.get_connection_args("client"),
)
await super()._start()
except Exception as e: # pragma: no cover
self.status = Status.failed
await self._close()
> raise RuntimeError(f"Cluster failed to start: {e}") from e
E RuntimeError: Cluster failed to start: f-string: unmatched '(' (scheduler.py, line 186)
distributed/deploy/spec.py:335: RuntimeError