Skip to content

Commit a2a5aee

Browse files
authored
SpecCluster: add option to *not* shut down the scheduler when the cluster is closed (#9021)
1 parent 7df4356 commit a2a5aee

File tree

2 files changed

+35
-3
lines changed

2 files changed

+35
-3
lines changed

distributed/deploy/spec.py

+10-3
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,10 @@ class does handle all of the logic around asynchronously cleanly setting up
154154
Whether or not we should silence logging when setting up the cluster.
155155
name: str, optional
156156
A name to use when printing out the cluster, defaults to type name
157+
shutdown_on_close: bool
158+
Whether or not to close the cluster when the program exits
159+
shutdown_scheduler: bool
160+
Whether or not to shut down the scheduler when the cluster is closed
157161
158162
Examples
159163
--------
@@ -247,6 +251,7 @@ def __init__(
247251
name=None,
248252
shutdown_on_close=True,
249253
scheduler_sync_interval=1,
254+
shutdown_scheduler=True,
250255
):
251256
if loop is None and asynchronous:
252257
loop = IOLoop.current()
@@ -271,6 +276,7 @@ def __init__(
271276
self._correct_state_waiting = None
272277
self._name = name or type(self).__name__
273278
self.shutdown_on_close = shutdown_on_close
279+
self.shutdown_scheduler = shutdown_scheduler
274280

275281
super().__init__(
276282
asynchronous=asynchronous,
@@ -450,13 +456,14 @@ async def _close(self):
450456

451457
if self.scheduler_comm:
452458
async with self._lock:
453-
with suppress(OSError):
454-
await self.scheduler_comm.terminate()
459+
if self.shutdown_scheduler:
460+
with suppress(OSError):
461+
await self.scheduler_comm.terminate()
455462
await self.scheduler_comm.close_rpc()
456463
else:
457464
logger.warning("Cluster closed without starting up")
458465

459-
if self.scheduler:
466+
if self.scheduler and self.shutdown_scheduler:
460467
await self.scheduler.close()
461468
for w in self._created:
462469
assert w.status in {

distributed/deploy/tests/test_spec_cluster.py

+25
Original file line numberDiff line numberDiff line change
@@ -519,3 +519,28 @@ async def test_bad_close():
519519
await cluster.close()
520520

521521
assert not record
522+
523+
524+
@gen_test()
525+
async def test_shutdown_scheduler_disabled():
526+
async with SpecCluster(
527+
workers=worker_spec,
528+
scheduler=scheduler,
529+
asynchronous=True,
530+
shutdown_scheduler=False,
531+
) as cluster:
532+
s = cluster.scheduler
533+
assert isinstance(s, Scheduler)
534+
535+
assert s.status == Status.running
536+
537+
538+
@gen_test()
539+
async def test_shutdown_scheduler():
540+
async with SpecCluster(
541+
workers=worker_spec, scheduler=scheduler, asynchronous=True
542+
) as cluster:
543+
s = cluster.scheduler
544+
assert isinstance(s, Scheduler)
545+
546+
assert s.status == Status.closed

0 commit comments

Comments
 (0)