From 23aaf142090396b1494bac0b3a064d513e02c4c3 Mon Sep 17 00:00:00 2001
From: tgalvin <tim.galvin@csiro.au>
Date: Wed, 24 May 2023 15:22:34 +0800
Subject: [PATCH 1/8] added initial drain to worker

---
 distributed/worker.py | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/distributed/worker.py b/distributed/worker.py
index e6642d278b1..4f8b9f890e1 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -385,6 +385,9 @@ class Worker(BaseWorker, ServerNode):
     lifetime_restart: bool
         Whether or not to restart a worker after it has reached its lifetime
         Default False
+    drain: bool
+        The worker is allowed to complete its assigned worker before closing.
+        Default False. 
     kwargs: optional
         Additional parameters to ServerNode constructor
 
@@ -511,6 +514,7 @@ def __init__(
         lifetime: Any | None = None,
         lifetime_stagger: Any | None = None,
         lifetime_restart: bool | None = None,
+        drain: bool = False,
         transition_counter_max: int | Literal[False] = False,
         ###################################
         # Parameters to WorkerMemoryManager
@@ -844,6 +848,8 @@ def __init__(
             )
         self.lifetime = lifetime
 
+        self.drain = drain
+
         Worker._instances.add(self)
 
     ################
@@ -1653,6 +1659,12 @@ async def close_gracefully(
         )
         if restart is None:
             restart = self.lifetime_restart
+        
+        if self.drain:
+            logger.info(f"Draining worker")
+            while self.state.executed_count:
+                await asyncio.sleep(0.1)
+                
         await self.close(nanny=not restart, reason=reason)
 
     async def wait_until_closed(self):

From 2f1c27fd6d0edd7d9c3265736d01e1520c0074e2 Mon Sep 17 00:00:00 2001
From: tgalvin <tim.galvin@csiro.au>
Date: Wed, 24 May 2023 16:23:51 +0800
Subject: [PATCH 2/8] added drain to worker cli

---
 distributed/cli/dask_worker.py | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/distributed/cli/dask_worker.py b/distributed/cli/dask_worker.py
index 91a824f4edc..73d0900a073 100755
--- a/distributed/cli/dask_worker.py
+++ b/distributed/cli/dask_worker.py
@@ -208,6 +208,11 @@
     help="Whether or not to restart the worker after the lifetime lapses. "
     "This assumes that you are using the --lifetime and --nanny keywords",
 )
+@click.option(
+    "--drain/--no-drain",
+    default=False,
+    help="Let the worker finish its current work before closing [default: --no-drain]",
+)
 @click.option(
     "--preload",
     type=str,

From eac4444780b3a2e44e2cd18c3b5d10b199fd5302 Mon Sep 17 00:00:00 2001
From: Tim Galvin <gal16b@petrichor-i1.cm.hpc>
Date: Mon, 29 May 2023 17:15:56 +1000
Subject: [PATCH 3/8] initial tweak to waiter

---
 distributed/worker.py | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/distributed/worker.py b/distributed/worker.py
index 4f8b9f890e1..0d66eaac10f 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -1640,7 +1640,7 @@ async def close_gracefully(
 
         This first informs the scheduler that we're shutting down, and asks it
         to move our data elsewhere. Afterwards, we close as normal
-        """
+        """         
         if self.status in (Status.closing, Status.closing_gracefully):
             await self.finished()
 
@@ -1648,6 +1648,7 @@ async def close_gracefully(
             return
 
         logger.info("Closing worker gracefully: %s. Reason: %s", self.address, reason)
+            
         # Wait for all tasks to leave the worker and don't accept any new ones.
         # Scheduler.retire_workers will set the status to closing_gracefully and push it
         # back to this worker.
@@ -1659,12 +1660,13 @@ async def close_gracefully(
         )
         if restart is None:
             restart = self.lifetime_restart
-        
+                
         if self.drain:
-            logger.info(f"Draining worker")
-            while self.state.executed_count:
+            logger.info(f"Draining worker, waiting on {len(self.state.all_running_tasks)=} threads. ")
+            while len(self.state.all_running_tasks):
                 await asyncio.sleep(0.1)
-                
+            logger.info(f"Draining has finished. ")
+            
         await self.close(nanny=not restart, reason=reason)
 
     async def wait_until_closed(self):

From c4c30fc244d19c3a79203938b554d3a8d4bf7ba2 Mon Sep 17 00:00:00 2001
From: "Alec Thomson (S&A, Kensington WA)" <alec.thomson@csiro.au>
Date: Sat, 6 Jul 2024 14:27:11 +0800
Subject: [PATCH 4/8] Add docs

---
 distributed/worker.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/distributed/worker.py b/distributed/worker.py
index 79c5f2a3802..1bea662bf4e 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -1697,11 +1697,13 @@ async def close_gracefully(
         if restart is None:
             restart = self.lifetime_restart
                 
+        # `drain` mode waits for all tasks to finish before closing
+        # otherwise, we close immediately and unfinished tasks will be rescheduled or cancelled
         if self.drain:
             logger.info(f"Draining worker, waiting on {len(self.state.all_running_tasks)=} threads. ")
             while len(self.state.all_running_tasks):
                 await asyncio.sleep(0.1)
-            logger.info(f"Draining has finished. ")
+            logger.info("Draining has finished.")
             
         await self.close(nanny=not restart, reason=reason)
 

From 3bad6fcf1538b42d48123931470aac3614d48b7f Mon Sep 17 00:00:00 2001
From: "Alec Thomson (S&A, Kensington WA)" <alec.thomson@csiro.au>
Date: Sat, 6 Jul 2024 14:48:52 +0800
Subject: [PATCH 5/8] Fix order of operations

---
 distributed/worker.py | 21 ++++++++++++---------
 1 file changed, 12 insertions(+), 9 deletions(-)

diff --git a/distributed/worker.py b/distributed/worker.py
index 1bea662bf4e..68615cbc4cf 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -1676,7 +1676,18 @@ async def close_gracefully(
 
         This first informs the scheduler that we're shutting down, and asks it
         to move our data elsewhere. Afterwards, we close as normal
-        """         
+        """
+        # `drain` mode waits for all tasks to finish before closing
+        # otherwise, we close immediately and unfinished tasks will be rescheduled or cancelled
+        if self.drain:
+            n_tasks = len(self.state.all_running_tasks)
+            logger.warning(
+                f"Draining worker, waiting on {n_tasks=} threads."
+            )
+            while len(self.state.all_running_tasks):
+                await asyncio.sleep(0.1)
+            logger.warning("Draining has finished.")
+        
         if self.status in (Status.closing, Status.closing_gracefully):
             await self.finished()
 
@@ -1696,14 +1707,6 @@ async def close_gracefully(
         )
         if restart is None:
             restart = self.lifetime_restart
-                
-        # `drain` mode waits for all tasks to finish before closing
-        # otherwise, we close immediately and unfinished tasks will be rescheduled or cancelled
-        if self.drain:
-            logger.info(f"Draining worker, waiting on {len(self.state.all_running_tasks)=} threads. ")
-            while len(self.state.all_running_tasks):
-                await asyncio.sleep(0.1)
-            logger.info("Draining has finished.")
             
         await self.close(nanny=not restart, reason=reason)
 

From 0a7f925951418dfc833e046d730f09aac18913ad Mon Sep 17 00:00:00 2001
From: "Alec Thomson (S&A, Kensington WA)" <alec.thomson@csiro.au>
Date: Sat, 6 Jul 2024 15:01:51 +0800
Subject: [PATCH 6/8] Logging

---
 distributed/worker.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/distributed/worker.py b/distributed/worker.py
index 68615cbc4cf..17ac522bee6 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -1680,9 +1680,8 @@ async def close_gracefully(
         # `drain` mode waits for all tasks to finish before closing
         # otherwise, we close immediately and unfinished tasks will be rescheduled or cancelled
         if self.drain:
-            n_tasks = len(self.state.all_running_tasks)
             logger.warning(
-                f"Draining worker, waiting on {n_tasks=} threads."
+                f"Draining worker, waiting on {len(self.state.all_running_tasks)} tasks."
             )
             while len(self.state.all_running_tasks):
                 await asyncio.sleep(0.1)

From 39011d99dcc6dbfe2cf410c92151357156182e0b Mon Sep 17 00:00:00 2001
From: "Alec Thomson (S&A, Kensington WA)" <alec.thomson@csiro.au>
Date: Sat, 6 Jul 2024 15:22:38 +0800
Subject: [PATCH 7/8] Black

---
 distributed/worker.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/distributed/worker.py b/distributed/worker.py
index 17ac522bee6..54179285a07 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -386,7 +386,7 @@ class Worker(BaseWorker, ServerNode):
         Default False
     drain: bool
         The worker is allowed to complete its assigned worker before closing.
-        Default False. 
+        Default False.
     kwargs: optional
         Additional parameters to ServerNode constructor
 
@@ -1686,7 +1686,7 @@ async def close_gracefully(
             while len(self.state.all_running_tasks):
                 await asyncio.sleep(0.1)
             logger.warning("Draining has finished.")
-        
+
         if self.status in (Status.closing, Status.closing_gracefully):
             await self.finished()
 
@@ -1694,7 +1694,7 @@ async def close_gracefully(
             return
 
         logger.info("Closing worker gracefully: %s. Reason: %s", self.address, reason)
-            
+
         # Wait for all tasks to leave the worker and don't accept any new ones.
         # Scheduler.retire_workers will set the status to closing_gracefully and push it
         # back to this worker.
@@ -1706,7 +1706,7 @@ async def close_gracefully(
         )
         if restart is None:
             restart = self.lifetime_restart
-            
+
         await self.close(nanny=not restart, reason=reason)
 
     async def wait_until_closed(self):

From 2efde3312d2ea3b7d674c75045fe1f62d2f55b79 Mon Sep 17 00:00:00 2001
From: "Alec Thomson (S&A, Kensington WA)" <alec.thomson@csiro.au>
Date: Tue, 9 Jul 2024 16:08:19 +0800
Subject: [PATCH 8/8] Add a retire

---
 distributed/worker.py | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/distributed/worker.py b/distributed/worker.py
index 54179285a07..00b69398d83 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -1679,6 +1679,15 @@ async def close_gracefully(
         """
         # `drain` mode waits for all tasks to finish before closing
         # otherwise, we close immediately and unfinished tasks will be rescheduled or cancelled
+
+        if self.status in (Status.closing, Status.closing_gracefully):
+            await self.finished()
+        await self.scheduler.retire_workers(
+            workers=[self.address],
+            close_workers=False,
+            remove=False,
+            stimulus_id=f"worker-drain-{time()}",
+        )
         if self.drain:
             logger.warning(
                 f"Draining worker, waiting on {len(self.state.all_running_tasks)} tasks."
@@ -1687,9 +1696,6 @@ async def close_gracefully(
                 await asyncio.sleep(0.1)
             logger.warning("Draining has finished.")
 
-        if self.status in (Status.closing, Status.closing_gracefully):
-            await self.finished()
-
         if self.status == Status.closed:
             return