diff --git a/CHANGELOG.md b/CHANGELOG.md index fe0e1d3d90ea79b4e9ec05a34f6c50982177d0a1..b90e057df480834e9a434e9b694ac44b405ab3e8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ changed functionality, fixed bugs). - Fixed sending task status updates after the task may no longer be run. - Worker goes to sleep when receiving signal USR1 and wakes up after signal USR2. This is only supported on POSIX platforms that have those signals. +- Worker can be told to shut down by the Manager. The environment (for example systemd + on Linux) is responsible for restarting Flamenco Worker after such a shutdown. ## Version 2.0.8 (released 2017-09-07) diff --git a/flamenco_worker/cli.py b/flamenco_worker/cli.py index d1c3b8e24738ae1400121f813b7527d9336ebd23..4705a991e39bc963109ec957b8a329efd4f05093 100644 --- a/flamenco_worker/cli.py +++ b/flamenco_worker/cli.py @@ -136,13 +136,7 @@ def main(): asyncio.ensure_future(tuqueue.work(loop=loop)) mir_work_task = asyncio.ensure_future(mir.work()) - try: - loop.run_until_complete(fworker.startup()) - fworker.mainloop() - except worker.UnableToRegisterError: - # The worker will have logged something, we'll just shut down cleanly. - pass - except KeyboardInterrupt: + def do_clean_shutdown(): shutdown_future.cancel() mir_work_task.cancel() try: @@ -158,8 +152,19 @@ def main(): loop.stop() loop.run_until_complete(stop_loop()) + + try: + loop.run_until_complete(fworker.startup()) + fworker.mainloop() + except worker.UnableToRegisterError: + # The worker will have logged something, we'll just shut down cleanly. + pass + except KeyboardInterrupt: + do_clean_shutdown() except: log.exception('Uncaught exception!') + else: + do_clean_shutdown() # Report on the asyncio task status if args.verbose: diff --git a/flamenco_worker/worker.py b/flamenco_worker/worker.py index dcaf10b2b93dd77e4ac7fcf7cc0a6f0f8b2b5850..b7ee3abb73ad32866e189565e1332ebeea7f5ecb 100644 --- a/flamenco_worker/worker.py +++ b/flamenco_worker/worker.py @@ -578,6 +578,7 @@ class FlamencoWorker: status_change_handlers = { 'asleep': self.go_to_state_asleep, 'awake': self.go_to_state_awake, + 'shutdown': self.go_to_state_shutdown, } try: @@ -588,7 +589,7 @@ class FlamencoWorker: handler() - def ack_status_change(self, new_status: str): + def ack_status_change(self, new_status: str) -> asyncio.Task: """Confirm that we're now in a certain state. This ACK can be given without a request from the server, for example to support @@ -597,7 +598,7 @@ class FlamencoWorker: try: post = self.manager.post('/ack-status-change/%s' % new_status, loop=self.loop) - self.loop.create_task(post) + return self.loop.create_task(post) except Exception: self._log.exception('unable to notify Manager') @@ -620,6 +621,24 @@ class FlamencoWorker: self.schedule_fetch_task(3) self.ack_status_change('awake') + def go_to_state_shutdown(self): + """Shuts down the Flamenco Worker. + + Whether it comes back up depends on the environment. For example, + using systemd on Linux with Restart=always will do this. + """ + + self._log.info('Shutting down by request of the Flamenco Manager') + self.state = WorkerState.SHUTTING_DOWN + + # Ack the status change before doing the actual shutdown. + def stop_main_loop(*args): + self._log.debug('Stopping main loop (%r)', args) + self.loop.stop() + + post_task = self.ack_status_change('shutdown') + post_task.add_done_callback(stop_main_loop) + def stop_sleeping(self): """Stops the asyncio task for sleeping.""" if self.sleeping_task is None or self.sleeping_task.done(): diff --git a/system-integration/systemd/flamenco-worker.service b/system-integration/systemd/flamenco-worker.service index 998cd0d3e109e700637e72afc5723c8f6a9d22e1..7c42d2200f099cab831f8def25ad3dd479a75d6e 100644 --- a/system-integration/systemd/flamenco-worker.service +++ b/system-integration/systemd/flamenco-worker.service @@ -15,7 +15,7 @@ WorkingDirectory=/shared/bin/flamenco-worker User=guest Group=guest -RestartPreventExitStatus=0 SIGUSR1 SIGUSR2 +RestartPreventExitStatus=SIGUSR1 SIGUSR2 Restart=always RestartSec=1s