Skip to content
Snippets Groups Projects
Commit 6a737beb authored by Sybren A. Stüvel's avatar Sybren A. Stüvel
Browse files

Worker: Made shutdown-behaviour more reliable, and fixed unittest.

parent 0493ff50
No related branches found
No related tags found
No related merge requests found
...@@ -112,14 +112,28 @@ class FlamencoWorker: ...@@ -112,14 +112,28 @@ class FlamencoWorker:
# The current task may still be running, as fetch_task() calls schedule_fetch_task() to # The current task may still be running, as fetch_task() calls schedule_fetch_task() to
# schedule a future run. This may result in the task not being awaited when we are # schedule a future run. This may result in the task not being awaited when we are
# shutting down. # shutting down.
if self.shutdown_future.done():
self.log.warning('Shutting down, not scheduling another fetch-task task.')
return
self.fetch_task_task = asyncio.ensure_future(self.fetch_task(delay), loop=self.loop) self.fetch_task_task = asyncio.ensure_future(self.fetch_task(delay), loop=self.loop)
def shutdown(self): def shutdown(self):
"""Gracefully shuts down any asynchronous tasks.""" """Gracefully shuts down any asynchronous tasks."""
if self.fetch_task_task and not self.fetch_task_task.done(): if self.fetch_task_task is None or self.fetch_task_task.done():
self._log.info('Cancelling task fetching task %s', self.fetch_task_task) return
self.fetch_task_task.cancel()
self._log.info('shutdown(): Cancelling task fetching task %s', self.fetch_task_task)
self.fetch_task_task.cancel()
# This prevents a 'Task was destroyed but it is pending!' warning on the console.
# Sybren: I've only seen this in unit tests, so maybe this code should be moved
# there, instead.
try:
self.loop.run_until_complete(self.fetch_task_task)
except asyncio.CancelledError:
pass
async def fetch_task(self, delay: float): async def fetch_task(self, delay: float):
"""Fetches a single task to perform from Flamenco Manager. """Fetches a single task to perform from Flamenco Manager.
......
...@@ -28,6 +28,20 @@ class JsonResponse: ...@@ -28,6 +28,20 @@ class JsonResponse:
raise requests.HTTPError(self.status_code) raise requests.HTTPError(self.status_code)
@attr.s
class EmptyResponse:
"""Mocked HTTP response returning an empty 204.
Maybe we want to switch to using unittest.mock.Mock for this,
or to using the responses package.
"""
status_code = attr.ib(default=204, validator=attr.validators.instance_of(int))
def raise_for_status(self):
pass
class AbstractWorkerTest(unittest.TestCase): class AbstractWorkerTest(unittest.TestCase):
def setUp(self): def setUp(self):
from flamenco_worker.upstream import FlamencoManager from flamenco_worker.upstream import FlamencoManager
...@@ -132,26 +146,47 @@ class TestWorkerTaskFetch(AbstractWorkerTest): ...@@ -132,26 +146,47 @@ class TestWorkerTaskFetch(AbstractWorkerTest):
self.loop.run_until_complete(stop_loop()) self.loop.run_until_complete(stop_loop())
def test_fetch_task_happy(self): def test_fetch_task_happy(self):
self.manager.post = Mock(return_value=JsonResponse({ from unittest.mock import call
'_id': '58514d1e9837734f2e71b479',
'job': '58514d1e9837734f2e71b477', self.manager.post = Mock()
'manager': '585a795698377345814d2f68', self.manager.post.side_effect = [
'project': '', # response when fetching a task
'user': '580f8c66983773759afdb20e', JsonResponse({
'name': 'sleep-14-26', '_id': '58514d1e9837734f2e71b479',
'status': 'processing', 'job': '58514d1e9837734f2e71b477',
'priority': 50, 'manager': '585a795698377345814d2f68',
'job_type': 'sleep', 'project': '',
'commands': [ 'user': '580f8c66983773759afdb20e',
{'name': 'echo', 'settings': {'message': 'Preparing to sleep'}}, 'name': 'sleep-14-26',
{'name': 'sleep', 'settings': {'time_in_seconds': 3}} 'status': 'processing',
] 'priority': 50,
})) 'job_type': 'sleep',
'commands': [
{'name': 'echo', 'settings': {'message': 'Preparing to sleep'}},
{'name': 'sleep', 'settings': {'time_in_seconds': 3}}
]
}),
# Responses after status updates
EmptyResponse(), # task becoming active
EmptyResponse(), # task becoming complete
]
self.worker.schedule_fetch_task() self.worker.schedule_fetch_task()
self.manager.post.assert_not_called() self.manager.post.assert_not_called()
self.run_loop_for(0.5) interesting_task = self.worker.fetch_task_task
self.manager.post.assert_called_once_with( self.loop.run_until_complete(self.worker.fetch_task_task)
'/task',
auth=(self.worker.worker_id, self.worker.worker_secret)) # Another fetch-task task should have been scheduled.
self.assertNotEqual(self.worker.fetch_task_task, interesting_task)
self.manager.post.assert_has_calls([
call('/task', auth=(self.worker.worker_id, self.worker.worker_secret)),
call('/tasks/58514d1e9837734f2e71b479/update',
json={'task_progress_percentage': 0, 'activity': '', 'command_progress_percentage': 0, 'task_status': 'active', 'current_command_idx': 0},
auth=(self.worker.worker_id, self.worker.worker_secret)),
call('/tasks/58514d1e9837734f2e71b479/update',
json={'task_progress_percentage': 0, 'activity': '', 'command_progress_percentage': 0, 'task_status': 'completed', 'current_command_idx': 0},
auth=(self.worker.worker_id, self.worker.worker_secret)),
])
self.assertEqual(self.manager.post.call_count, 3)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment