diff --git a/CHANGELOG.md b/CHANGELOG.md index 42ce035695812eedf51f6a6b34c0dd230b1b3ce8..d99aeb0449890d839c3cc048a0e09c2b91de02dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,11 @@ This file logs the changes that are actually interesting to users (new features, changed functionality, fixed bugs). +## Version 2.0.9 (in development) + +- Fixed the Worker still sending task status updates after the Manager has told it that the task may no longer be run. + + ## Version 2.0.8 (released 2017-09-07) - Fixed parsing of `--config` CLI param on Python 3.5 diff --git a/flamenco_worker/worker.py b/flamenco_worker/worker.py index 9fac48b019b6ecf5ef3dba06fb3770d4b63fd2c9..0b41cd497b1a510786515e63089355b9a77d2413 100644 --- a/flamenco_worker/worker.py +++ b/flamenco_worker/worker.py @@ -41,6 +41,12 @@ class FlamencoWorker: shutdown_future = attr.ib( validator=attr.validators.optional(attr.validators.instance_of(asyncio.Future))) + # When Manager tells us we may no longer run our current task, this is set to True. + # As a result, the cancelled state isn't pushed to Manager any more. It is reset + # to False when a new task is started. 
+ task_is_silently_aborting = attr.ib(default=False, init=False, + validator=attr.validators.instance_of(bool)) + fetch_task_task = attr.ib( default=None, init=False, validator=attr.validators.optional(attr.validators.instance_of(asyncio.Task))) @@ -240,6 +246,7 @@ class FlamencoWorker: return self._log.warning('Stopping task %s', self.task_id) + self.task_is_silently_aborting = True try: await self.trunner.abort_current_task() @@ -247,6 +254,11 @@ class FlamencoWorker: self._log.info('asyncio task was canceled for task runner task %s', self.task_id) self.asyncio_execution_task.cancel() + await self.register_log('Worker %s stopped running this task,' + ' no longer allowed to run by Manager', self.worker_id) + await self.push_to_manager() + await self.tuqueue.flush_and_report(loop=self.loop) + def shutdown(self): """Gracefully shuts down any asynchronous tasks.""" @@ -298,6 +310,8 @@ class FlamencoWorker: import traceback import requests + self._cleanup_state_for_new_task() + self._log.debug('Going to fetch task in %s seconds', delay) await asyncio.sleep(delay) @@ -349,6 +363,9 @@ class FlamencoWorker: if self.failures_are_acceptable: self._log.warning('Task %s was cancelled, but ignoring it since ' 'we are shutting down.', self.task_id) + elif self.task_is_silently_aborting: + self._log.warning('Task %s was cancelled, but ignoring it since ' + 'we are no longer allowed to run it.', self.task_id) else: self._log.warning('Task %s was cancelled', self.task_id) await self.register_task_update(task_status='canceled', @@ -373,6 +390,13 @@ class FlamencoWorker: # the world when we're in some infinite failure loop. 
self.schedule_fetch_task(FETCH_TASK_DONE_SCHEDULE_NEW_DELAY) + def _cleanup_state_for_new_task(self): + """Cleans up internal state to prepare for a new task to be executed.""" + + self.last_task_activity = documents.Activity() + self.task_is_silently_aborting = False + self.current_task_status = '' + async def push_to_manager(self, *, delay: datetime.timedelta = None): """Updates a task's status and activity. @@ -390,8 +414,13 @@ class FlamencoWorker: self._log.info('Updating task %s with status %r and activity %r', self.task_id, self.current_task_status, self.last_task_activity) - payload = attr.asdict(self.last_task_activity) - payload['task_status'] = self.current_task_status + if self.task_is_silently_aborting: + self._log.info('push_to_manager: task is silently aborting, will only push logs') + payload = {} + else: + payload = attr.asdict(self.last_task_activity) + if self.current_task_status: + payload['task_status'] = self.current_task_status now = datetime.datetime.now() self.last_activity_push = now @@ -410,6 +439,10 @@ class FlamencoWorker: if self._push_log_to_manager is not None: self._push_log_to_manager.cancel() + if not payload: + self._log.debug('push_to_manager: nothing to push') + return + self.tuqueue.queue('/tasks/%s/update' % self.task_id, payload, loop=self.loop) async def register_task_update(self, *, diff --git a/tests/test_worker.py b/tests/test_worker.py index 1542696cde391cef853b82fa3f5ed7620caa1d6b..e2016262a23458ded6200a18468bd62cb032dabc 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -71,7 +71,6 @@ class WorkerStartupTest(AbstractFWorkerTest): @unittest.mock.patch('socket.gethostname') @unittest.mock.patch('flamenco_worker.config.merge_with_home_config') def test_startup_already_registered(self, mock_merge_with_home_config, mock_gethostname): - from mock_responses import EmptyResponse, CoroMock mock_gethostname.return_value = 'ws-unittest' @@ -224,7 +223,6 @@ class TestWorkerTaskExecution(AbstractFWorkerTest): def 
test_stop_current_task(self): """Test that stopped tasks get status 'canceled'.""" - from unittest.mock import call from mock_responses import JsonResponse, CoroMock self.manager.post = CoroMock() @@ -248,6 +246,7 @@ class TestWorkerTaskExecution(AbstractFWorkerTest): self.worker.schedule_fetch_task() stop_called = False + async def stop(): nonlocal stop_called stop_called = True @@ -261,20 +260,22 @@ class TestWorkerTaskExecution(AbstractFWorkerTest): self.assertTrue(stop_called) self.manager.post.assert_called_once_with('/task', loop=self.asyncio_loop) - self.tuqueue.queue.assert_has_calls([ - call('/tasks/58514d1e9837734f2e71b479/update', - {'task_progress_percentage': 0, 'activity': '', - 'command_progress_percentage': 0, 'task_status': 'active', - 'current_command_idx': 0}, - loop=self.loop, - ), - call('/tasks/58514d1e9837734f2e71b479/update', - {'task_progress_percentage': 0, 'activity': 'Task was canceled', - 'command_progress_percentage': 0, 'task_status': 'canceled', - 'current_command_idx': 0}, - loop=self.loop, - ) - ]) + self.tuqueue.queue.assert_any_call( + '/tasks/58514d1e9837734f2e71b479/update', + {'task_progress_percentage': 0, 'activity': '', + 'command_progress_percentage': 0, 'task_status': 'active', + 'current_command_idx': 0}, + loop=self.loop, + ) + + # A bit clunky because we don't know which timestamp is included in the log line. 
+ last_args, last_kwargs = self.tuqueue.queue.call_args + self.assertEqual(last_args[0], '/tasks/58514d1e9837734f2e71b479/update') + self.assertEqual(last_kwargs, {'loop': self.loop}) + self.assertIn('log', last_args[1]) + self.assertTrue(last_args[1]['log'].endswith( + 'Worker 1234 stopped running this task, no longer allowed to run by Manager')) + self.assertEqual(self.tuqueue.queue.call_count, 2) @@ -388,7 +389,6 @@ class WorkerPushToMasterTest(AbstractFWorkerTest): self.assertTrue(self.worker._push_log_to_manager.cancelled()) - class WorkerShutdownTest(AbstractWorkerTest): def setUp(self): from flamenco_worker.cli import construct_asyncio_loop