diff --git a/packages/flamenco-worker-python/flamenco_worker/worker.py b/packages/flamenco-worker-python/flamenco_worker/worker.py index a0d8cfcbd7b70d67796d79235c137ae5587835b8..a68248da13e033010da489bcc79533df6dfefbae 100644 --- a/packages/flamenco-worker-python/flamenco_worker/worker.py +++ b/packages/flamenco-worker-python/flamenco_worker/worker.py @@ -164,6 +164,15 @@ class FlamencoWorker: def shutdown(self): """Gracefully shuts down any asynchronous tasks.""" + push_act_sched = self._push_act_to_manager is not None \ + and not self._push_act_to_manager.done() + push_log_sched = self._push_log_to_manager is not None \ + and not self._push_log_to_manager.done() + if push_act_sched or push_log_sched: + # Try to push queued task updates to master before shutting down + self._log.info('shutdown(): pushing queued updates to manager') + self.loop.run_until_complete(self.push_to_manager()) + if self.fetch_task_task is None or self.fetch_task_task.done(): return @@ -244,7 +253,7 @@ class FlamencoWorker: # when we're in some infinite failure loop. self.schedule_fetch_task(FETCH_TASK_DONE_SCHEDULE_NEW_DELAY) - async def push_to_manager(self, *, delay: datetime.timedelta=None): + async def push_to_manager(self, *, delay: datetime.timedelta = None): """Updates a task's status and activity. Uses the TaskUpdateQueue to handle persistent queueing. @@ -255,6 +264,9 @@ class FlamencoWorker: self._log.debug('Scheduled delayed push to master in %r seconds', delay_sec) await asyncio.sleep(delay_sec) + if self.shutdown_future.done(): + self._log.info('Shutting down, not pushing changes to master.') + self._log.info('Updating task %s with status %r and activity %r', self.task_id, self.current_task_status, self.last_task_activity)