From 20b9cf289490bfa4111fa8a9d1506d42b6d2e14f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= <sybren@stuvel.eu>
Date: Wed, 4 Jan 2017 18:08:28 +0100
Subject: [PATCH] Worker: push updates to manager before shutting down

---
 .../flamenco_worker/worker.py                      | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/packages/flamenco-worker-python/flamenco_worker/worker.py b/packages/flamenco-worker-python/flamenco_worker/worker.py
index a0d8cfcb..a68248da 100644
--- a/packages/flamenco-worker-python/flamenco_worker/worker.py
+++ b/packages/flamenco-worker-python/flamenco_worker/worker.py
@@ -164,6 +164,15 @@ class FlamencoWorker:
     def shutdown(self):
         """Gracefully shuts down any asynchronous tasks."""
 
+        push_act_sched = self._push_act_to_manager is not None \
+                         and not self._push_act_to_manager.done()
+        push_log_sched = self._push_log_to_manager is not None \
+                         and not self._push_log_to_manager.done()
+        if push_act_sched or push_log_sched:
+            # Try to push queued task updates to master before shutting down
+            self._log.info('shutdown(): pushing queued updates to manager')
+            self.loop.run_until_complete(self.push_to_manager())
+
         if self.fetch_task_task is None or self.fetch_task_task.done():
             return
 
@@ -244,7 +253,7 @@ class FlamencoWorker:
             # when we're in some infinite failure loop.
             self.schedule_fetch_task(FETCH_TASK_DONE_SCHEDULE_NEW_DELAY)
 
-    async def push_to_manager(self, *, delay: datetime.timedelta=None):
+    async def push_to_manager(self, *, delay: datetime.timedelta = None):
         """Updates a task's status and activity.
 
         Uses the TaskUpdateQueue to handle persistent queueing.
@@ -255,6 +264,9 @@ class FlamencoWorker:
             self._log.debug('Scheduled delayed push to master in %r seconds', delay_sec)
             await asyncio.sleep(delay_sec)
 
+            if self.shutdown_future.done():
+                self._log.info('Shutting down, not pushing changes to master.')
+
         self._log.info('Updating task %s with status %r and activity %r',
                        self.task_id, self.current_task_status, self.last_task_activity)
 
-- 
GitLab