diff --git a/distributed/cli/dask_worker.py b/distributed/cli/dask_worker.py index bf0f96420d0..0d831186b47 100755 --- a/distributed/cli/dask_worker.py +++ b/distributed/cli/dask_worker.py @@ -210,6 +210,11 @@ help="Whether or not to restart the worker after the lifetime lapses. " "This assumes that you are using the --lifetime and --nanny keywords", ) +@click.option( + "--drain/--no-drain", + default=False, + help="Let the worker finish its current work before closing [default: --no-drain]", +) @click.option( "--preload", type=str, diff --git a/distributed/worker.py b/distributed/worker.py index c8a4bbd574e..118135d9866 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -402,6 +402,9 @@ class Worker(BaseWorker, ServerNode): lifetime_restart: bool Whether or not to restart a worker after it has reached its lifetime Default False + drain: bool + The worker is allowed to complete its assigned work before closing. + Default False. kwargs: optional Additional parameters to ServerNode constructor @@ -528,6 +531,7 @@ def __init__( lifetime: Any | None = None, lifetime_stagger: Any | None = None, lifetime_restart: bool | None = None, + drain: bool = False, transition_counter_max: int | Literal[False] = False, ################################### # Parameters to WorkerMemoryManager @@ -859,6 +863,8 @@ def __init__( ) self.lifetime = lifetime + self.drain = drain + Worker._instances.add(self) ################ @@ -1685,13 +1691,30 @@ async def close_gracefully( This first informs the scheduler that we're shutting down, and asks it to move our data elsewhere. 
Afterwards, we close as normal """ + # `drain` mode waits for all tasks to finish before closing + # otherwise, we close immediately and unfinished tasks will be rescheduled or cancelled + if self.status in (Status.closing, Status.closing_gracefully): await self.finished() + await self.scheduler.retire_workers( + workers=[self.address], + close_workers=False, + remove=False, + stimulus_id=f"worker-drain-{time()}", + ) + if self.drain: + logger.warning( + f"Draining worker, waiting on {len(self.state.all_running_tasks)} tasks." + ) + while len(self.state.all_running_tasks): + await asyncio.sleep(0.1) + logger.warning("Draining has finished.") if self.status == Status.closed: return logger.info("Closing worker gracefully: %s. Reason: %s", self.address, reason) + # Wait for all tasks to leave the worker and don't accept any new ones. # Scheduler.retire_workers will set the status to closing_gracefully and push it # back to this worker. @@ -1703,6 +1726,7 @@ async def close_gracefully( ) if restart is None: restart = self.lifetime_restart + await self.close(nanny=not restart, reason=reason) async def wait_until_closed(self):