From 421ec4b65833aa535d5b3b5db3a7307bfdc240d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= <sybren@stuvel.eu> Date: Fri, 16 Nov 2018 15:45:39 +0100 Subject: [PATCH] Only stop task if task-to-stop is the same as currently-executing-task The 'may-i-run' endpoint may say 'no' to us running task X, but when the response comes in and is handled the worker may already be working on task Y. This case is now recognised, and the 'no' will be ignored. --- flamenco_worker/may_i_run.py | 4 +-- flamenco_worker/worker.py | 11 ++++++- tests/test_worker.py | 57 +++++++++++++++++++++++++++++++++++- 3 files changed, 68 insertions(+), 4 deletions(-) diff --git a/flamenco_worker/may_i_run.py b/flamenco_worker/may_i_run.py index 5eec5ffd..25404745 100644 --- a/flamenco_worker/may_i_run.py +++ b/flamenco_worker/may_i_run.py @@ -38,11 +38,11 @@ class MayIRun: return if await self.may_i_run(task_id): - self._log.debug('Current task may run') + self._log.debug('Current task %s may run', task_id) return self._log.warning('We have to stop task %s', task_id) - await self.worker.stop_current_task() + await self.worker.stop_current_task(task_id) async def may_i_run(self, task_id: str) -> bool: """Asks the Manager whether we are still allowed to run the given task.""" diff --git a/flamenco_worker/worker.py b/flamenco_worker/worker.py index 245f7bae..971d9e11 100644 --- a/flamenco_worker/worker.py +++ b/flamenco_worker/worker.py @@ -284,17 +284,26 @@ class FlamencoWorker: self.single_iteration_fut = asyncio.ensure_future(self.single_iteration(delay), loop=self.loop) - async def stop_current_task(self): + async def stop_current_task(self, task_id: str): """Stops the current task by canceling the AsyncIO task. This causes a CancelledError in the self.single_iteration() function, which then takes care of the task status change and subsequent activity push. + + :param task_id: the task ID to stop. Will only perform a stop if it + matches the currently executing task. This is to avoid race + conditions. """ if not self.asyncio_execution_fut or self.asyncio_execution_fut.done(): self._log.warning('stop_current_task() called but no task is running.') return + if self.task_id != task_id: + self._log.warning('stop_current_task(%r) called, but current task is %r, not stopping', + task_id, self.task_id) + return + self._log.warning('Stopping task %s', self.task_id) self.task_is_silently_aborting = True diff --git a/tests/test_worker.py b/tests/test_worker.py index 17c53de7..974ca2be 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -252,7 +252,7 @@ class TestWorkerTaskExecution(AbstractFWorkerTest): stop_called = True await asyncio.sleep(0.2) - await self.worker.stop_current_task() + await self.worker.stop_current_task(self.worker.task_id) asyncio.ensure_future(stop(), loop=self.loop) self.loop.run_until_complete(self.worker.single_iteration_fut) @@ -277,6 +277,61 @@ class TestWorkerTaskExecution(AbstractFWorkerTest): self.assertEqual(self.tuqueue.queue.call_count, 2) + def test_stop_current_task_mismatch(self): + + from tests.mock_responses import JsonResponse, CoroMock + + self.manager.post = CoroMock() + # response when fetching a task + self.manager.post.coro.return_value = JsonResponse({ + '_id': '58514d1e9837734f2e71b479', + 'job': '58514d1e9837734f2e71b477', + 'manager': '585a795698377345814d2f68', + 'project': '', + 'user': '580f8c66983773759afdb20e', + 'name': 'sleep-14-26', + 'status': 'processing', + 'priority': 50, + 'job_type': 'unittest', + 'task_type': 'sleep', + 'commands': [ + {'name': 'sleep', 'settings': {'time_in_seconds': 3}} + ] + }) + + self.worker.schedule_fetch_task() + + stop_called = False + + async def stop(): + nonlocal stop_called + stop_called = True + + await asyncio.sleep(0.2) + await self.worker.stop_current_task('other-task-id') + + asyncio.ensure_future(stop(), loop=self.loop) + self.loop.run_until_complete(self.worker.single_iteration_fut) + + self.assertTrue(stop_called) + + self.manager.post.assert_called_once_with('/task', loop=self.asyncio_loop) + self.tuqueue.queue.assert_any_call( + '/tasks/58514d1e9837734f2e71b479/update', + {'task_progress_percentage': 0, 'activity': '', + 'command_progress_percentage': 0, 'task_status': 'active', + 'current_command_idx': 0}, + ) + + # The task shouldn't be stopped, because the wrong task ID was requested to stop. + last_args, last_kwargs = self.tuqueue.queue.call_args + self.assertEqual(last_args[0], '/tasks/58514d1e9837734f2e71b479/update') + self.assertEqual(last_kwargs, {}) + self.assertIn('activity', last_args[1]) + self.assertEqual(last_args[1]['activity'], 'Task completed') + + self.assertEqual(self.tuqueue.queue.call_count, 2) + class WorkerPushToMasterTest(AbstractFWorkerTest): def test_one_activity(self): -- GitLab