diff --git a/CHANGELOG.md b/CHANGELOG.md index ec4de270bfd8b3e9338244bacd299f8e1243631f..f103e21a260bd379c0142145e4702d05edf804b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,10 @@ This file logs the changes that are actually interesting to users (new features, changed functionality, fixed bugs). +## Version 2.3 (or 2.2.1, in development) + +- Fixed bug where an uncaught exception could make the Worker stop requesting tasks. + ## Version 2.2 (2019-01-11) diff --git a/flamenco_worker/worker.py b/flamenco_worker/worker.py index 9a48fb4204102c5ed8d7ea706816bfd2031998b7..0c149648319abc8be2b79506708c7c647e79c538 100644 --- a/flamenco_worker/worker.py +++ b/flamenco_worker/worker.py @@ -23,6 +23,8 @@ FETCH_TASK_FAILED_RETRY_DELAY = 10 # when we failed obtaining a task FETCH_TASK_EMPTY_RETRY_DELAY = 5 # when there are no tasks to perform FETCH_TASK_DONE_SCHEDULE_NEW_DELAY = 3 # after a task is completed ERROR_RETRY_DELAY = 600 # after the pre-task sanity check failed +UNCAUGHT_EXCEPTION_RETRY_DELAY = 60 # after single_iteration errored out + PUSH_LOG_MAX_ENTRIES = 1000 PUSH_LOG_MAX_INTERVAL = datetime.timedelta(seconds=30) @@ -308,10 +310,12 @@ class FlamencoWorker: return if ex is None: - self._log.debug('single iteration completed without exceptions') return self._log.error('Unhandled %s running single iteration: %s', type(ex).__name__, ex) + self._log.error('Bluntly going to reschedule another iteration in %d seconds', + UNCAUGHT_EXCEPTION_RETRY_DELAY) + self.schedule_fetch_task(UNCAUGHT_EXCEPTION_RETRY_DELAY) async def stop_current_task(self, task_id: str): """Stops the current task by canceling the AsyncIO task. diff --git a/tests/test_worker.py b/tests/test_worker.py index 21694032362e252bc0b633ac0dc87e6a892aaedb..7a8b84b2133b19cce4ec318af80ef8a5489a338b 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -340,6 +340,41 @@ class TestWorkerTaskExecution(AbstractFWorkerTest): self.assertEqual(self.tuqueue.queue.call_count, 2) + def test_uncaught_exception(self): + from unittest.mock import call + from tests.mock_responses import JsonResponse, CoroMock + + self.manager.post = CoroMock() + # response when fetching a task + self.manager.post.coro.return_value = JsonResponse({ + '_id': '58514d1e9837734f2e71b479', + 'job': '58514d1e9837734f2e71b477', + 'manager': '585a795698377345814d2f68', + 'project': '', + 'user': '580f8c66983773759afdb20e', + 'name': 'sleep-14-26', + 'status': 'processing', + 'priority': 50, + 'job_type': 'unittest', + 'task_type': 'sleep', + 'commands': [ + {'name': 'echo', 'settings': {'message': 'Preparing to sleep'}}, + {'name': 'sleep', 'settings': {'time_in_seconds': 3}} + ] + }) + + self.tuqueue.queue.side_effect = None + self.worker.schedule_fetch_task() + interesting_task = self.worker.single_iteration_fut + + with unittest.mock.patch('asyncio.sleep') as mock_sleep: + mock_sleep.side_effect = OSError('je moeder') + with self.assertRaises(OSError): + self.loop.run_until_complete(self.worker.single_iteration_fut) + + # Another fetch-task task should have been scheduled. + self.assertNotEqual(self.worker.single_iteration_fut, interesting_task) + class WorkerPushToMasterTest(AbstractFWorkerTest): def test_one_activity(self):