From 15d58762d5ceae2a97b3cd00f7ff88d9d495dd6b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= <sybren@stuvel.eu>
Date: Fri, 20 Oct 2017 13:30:52 +0200
Subject: [PATCH] Allow shutting down the worker from the Manager.

When the worker receives the "shutdown" state it'll cleanly shut down, and
eventually go to "offline" state.
---
 CHANGELOG.md                                  |  2 ++
 flamenco_worker/cli.py                        | 19 +++++++++------
 flamenco_worker/worker.py                     | 23 +++++++++++++++++--
 .../systemd/flamenco-worker.service           |  2 +-
 4 files changed, 36 insertions(+), 10 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index fe0e1d3d..b90e057d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,8 @@ changed functionality, fixed bugs).
 - Fixed sending task status updates after the task may no longer be run.
 - Worker goes to sleep when receiving signal USR1 and wakes up after signal USR2.
   This is only supported on POSIX platforms that have those signals.
+- Worker can be told to shut down by the Manager. The environment (for example systemd
+  on Linux) is responsible for restarting Flamenco Worker after such a shutdown.
 
 
 ## Version 2.0.8 (released 2017-09-07)
diff --git a/flamenco_worker/cli.py b/flamenco_worker/cli.py
index d1c3b8e2..4705a991 100644
--- a/flamenco_worker/cli.py
+++ b/flamenco_worker/cli.py
@@ -136,13 +136,7 @@ def main():
     asyncio.ensure_future(tuqueue.work(loop=loop))
     mir_work_task = asyncio.ensure_future(mir.work())
 
-    try:
-        loop.run_until_complete(fworker.startup())
-        fworker.mainloop()
-    except worker.UnableToRegisterError:
-        # The worker will have logged something, we'll just shut down cleanly.
-        pass
-    except KeyboardInterrupt:
+    def do_clean_shutdown():
         shutdown_future.cancel()
         mir_work_task.cancel()
         try:
@@ -158,8 +152,19 @@ def main():
             loop.stop()
 
         loop.run_until_complete(stop_loop())
+
+    try:
+        loop.run_until_complete(fworker.startup())
+        fworker.mainloop()
+    except worker.UnableToRegisterError:
+        # The worker will have logged something, we'll just shut down cleanly.
+        pass
+    except KeyboardInterrupt:
+        do_clean_shutdown()
     except:
         log.exception('Uncaught exception!')
+    else:
+        do_clean_shutdown()
 
     # Report on the asyncio task status
     if args.verbose:
diff --git a/flamenco_worker/worker.py b/flamenco_worker/worker.py
index dcaf10b2..b7ee3abb 100644
--- a/flamenco_worker/worker.py
+++ b/flamenco_worker/worker.py
@@ -578,6 +578,7 @@ class FlamencoWorker:
         status_change_handlers = {
             'asleep': self.go_to_state_asleep,
             'awake': self.go_to_state_awake,
+            'shutdown': self.go_to_state_shutdown,
         }
 
         try:
@@ -588,7 +589,7 @@ class FlamencoWorker:
 
         handler()
 
-    def ack_status_change(self, new_status: str):
+    def ack_status_change(self, new_status: str) -> asyncio.Task:
         """Confirm that we're now in a certain state.
 
         This ACK can be given without a request from the server, for example to support
@@ -597,7 +598,7 @@ class FlamencoWorker:
 
         try:
             post = self.manager.post('/ack-status-change/%s' % new_status, loop=self.loop)
-            self.loop.create_task(post)
+            return self.loop.create_task(post)
         except Exception:
             self._log.exception('unable to notify Manager')
 
@@ -620,6 +621,24 @@ class FlamencoWorker:
         self.schedule_fetch_task(3)
         self.ack_status_change('awake')
 
+    def go_to_state_shutdown(self):
+        """Shuts down the Flamenco Worker.
+
+        Whether it comes back up depends on the environment. For example,
+        using systemd on Linux with Restart=always will do this.
+        """
+
+        self._log.info('Shutting down by request of the Flamenco Manager')
+        self.state = WorkerState.SHUTTING_DOWN
+
+        # Ack the status change before doing the actual shutdown.
+        def stop_main_loop(*args):
+            self._log.debug('Stopping main loop (%r)', args)
+            self.loop.stop()
+
+        post_task = self.ack_status_change('shutdown')
+        post_task.add_done_callback(stop_main_loop)
+
     def stop_sleeping(self):
         """Stops the asyncio task for sleeping."""
         if self.sleeping_task is None or self.sleeping_task.done():
diff --git a/system-integration/systemd/flamenco-worker.service b/system-integration/systemd/flamenco-worker.service
index 998cd0d3..7c42d220 100644
--- a/system-integration/systemd/flamenco-worker.service
+++ b/system-integration/systemd/flamenco-worker.service
@@ -15,7 +15,7 @@ WorkingDirectory=/shared/bin/flamenco-worker
 User=guest
 Group=guest
 
-RestartPreventExitStatus=0 SIGUSR1 SIGUSR2
+RestartPreventExitStatus=SIGUSR1 SIGUSR2
 Restart=always
 RestartSec=1s
 
-- 
GitLab