Commit 155d91c5 authored by Sybren A. Stüvel

Split Worker.fetch_task() into fetch_task() and execute_task()

This makes the naming more consistent, and makes it easier to add more
functionality without growing an already-big function.
parent 53dc8189
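For orientation, the sketch below shows roughly how the worker's main loop is structured after this change. It is a simplified reconstruction from the diff that follows, not the real implementation: the shutdown handling, the 204/423 status handling, retry scheduling, and the task-runner wiring are elided, and attributes such as self.manager, self.loop and register_task_update() are assumed to exist as on the real FlamencoWorker class.

import asyncio
import typing

class FlamencoWorker:
    # Illustrative only: self.manager, self.loop and friends come from the real class.

    async def single_iteration(self, delay: float) -> None:
        """Fetch a single task from Flamenco Manager and execute it."""
        await asyncio.sleep(delay)  # simplified; the real code also handles shutdown
        task_info = await self.fetch_task()
        if task_info is None:
            return  # nothing to run; fetch_task() already scheduled a retry
        await self.execute_task(task_info)

    async def fetch_task(self) -> typing.Optional[dict]:
        """Ask the Manager for a task; return the task document, or None."""
        resp = await self.manager.post('/task', loop=self.loop)
        if resp.status_code != 200:
            # The real method distinguishes 204/423/other and schedules retries.
            return None
        return resp.json()

    async def execute_task(self, task_info: dict) -> None:
        """Feed the task to the task runner and monitor for exceptions."""
        await self.register_task_update(task_status='active')
        # ...the real method then starts self.asyncio_execution_task.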
@@ -69,7 +69,7 @@ class FlamencoWorker:
     task_is_silently_aborting = attr.ib(default=False, init=False,
                                         validator=attr.validators.instance_of(bool))
-    fetch_task_task = attr.ib(
+    single_iteration_task = attr.ib(
         default=None, init=False,
         validator=attr.validators.optional(attr.validators.instance_of(asyncio.Task)))
     asyncio_execution_task = attr.ib(
@@ -258,19 +258,19 @@ class FlamencoWorker:
         :param delay: delay in seconds, after which the task fetch will be performed.
         """
-        # The current task may still be running, as fetch_task() calls schedule_fetch_task() to
+        # The current task may still be running, as single_iteration() calls schedule_fetch_task() to
         # schedule a future run. This may result in the task not being awaited when we are
         # shutting down.
         if self.shutdown_future.done():
             self._log.warning('Shutting down, not scheduling another fetch-task task.')
             return
-        self.fetch_task_task = asyncio.ensure_future(self.fetch_task(delay), loop=self.loop)
+        self.single_iteration_task = asyncio.ensure_future(self.single_iteration(delay), loop=self.loop)
     async def stop_current_task(self):
         """Stops the current task by canceling the AsyncIO task.
-        This causes a CancelledError in the self.fetch_task() function, which then takes care
+        This causes a CancelledError in the self.single_iteration() function, which then takes care
         of the task status change and subsequent activity push.
         """
@@ -332,22 +332,22 @@ class FlamencoWorker:
         Used in shutdown and when we're going to status 'asleep'.
         """
-        if self.fetch_task_task is None or self.fetch_task_task.done():
+        if self.single_iteration_task is None or self.single_iteration_task.done():
             return
-        self._log.info('stopping task fetching task %s', self.fetch_task_task)
-        self.fetch_task_task.cancel()
+        self._log.info('stopping task fetching task %s', self.single_iteration_task)
+        self.single_iteration_task.cancel()
         # This prevents a 'Task was destroyed but it is pending!' warning on the console.
         # Sybren: I've only seen this in unit tests, so maybe this code should be moved
         # there, instead.
         try:
             if not self.loop.is_running():
-                self.loop.run_until_complete(self.fetch_task_task)
+                self.loop.run_until_complete(self.single_iteration_task)
         except asyncio.CancelledError:
             pass
-    async def fetch_task(self, delay: float):
+    async def single_iteration(self, delay: float):
         """Fetches a single task to perform from Flamenco Manager, and executes it.
         :param delay: waits this many seconds before fetching a task.
@@ -373,6 +373,13 @@
             self.schedule_fetch_task(FETCH_TASK_FAILED_RETRY_DELAY)
             return
+        task_info = await self.fetch_task()
+        if task_info is None:
+            return
+        await self.execute_task(task_info)
+
+    async def fetch_task(self) -> typing.Optional[dict]:
         # TODO: use exponential backoff instead of retrying every fixed N seconds.
         self._log.debug('Fetching task')
         try:
@@ -381,32 +388,35 @@
             self._log.warning('Error fetching new task, will retry in %i seconds: %s',
                               FETCH_TASK_FAILED_RETRY_DELAY, ex)
             self.schedule_fetch_task(FETCH_TASK_FAILED_RETRY_DELAY)
-            return
+            return None
         if resp.status_code == 204:
             self._log.debug('No tasks available, will retry in %i seconds.',
                             FETCH_TASK_EMPTY_RETRY_DELAY)
             self.schedule_fetch_task(FETCH_TASK_EMPTY_RETRY_DELAY)
-            return
+            return None
         if resp.status_code == 423:
             status_change = documents.StatusChangeRequest(**resp.json())
             self._log.info('status change to %r requested when fetching new task',
                            status_change.status_requested)
             self.change_status(status_change.status_requested)
-            return
+            return None
         if resp.status_code != 200:
             self._log.warning('Error %i fetching new task, will retry in %i seconds.',
                               resp.status_code, FETCH_TASK_FAILED_RETRY_DELAY)
             self.schedule_fetch_task(FETCH_TASK_FAILED_RETRY_DELAY)
-            return
+            return None
         task_info = resp.json()
         self.task_id = task_info['_id']
         self._log.info('Received task: %s', self.task_id)
         self._log.debug('Received task: %s', task_info)
+        return task_info
+
+    async def execute_task(self, task_info: dict) -> None:
+        """Feed a task to the task runner and monitor for exceptions."""
         try:
             await self.register_task_update(task_status='active')
             self.asyncio_execution_task = asyncio.ensure_future(
@@ -199,11 +199,11 @@ class TestWorkerTaskExecution(AbstractFWorkerTest):
         self.worker.schedule_fetch_task()
         self.manager.post.assert_not_called()
-        interesting_task = self.worker.fetch_task_task
-        self.loop.run_until_complete(self.worker.fetch_task_task)
+        interesting_task = self.worker.single_iteration_task
+        self.loop.run_until_complete(self.worker.single_iteration_task)
         # Another fetch-task task should have been scheduled.
-        self.assertNotEqual(self.worker.fetch_task_task, interesting_task)
+        self.assertNotEqual(self.worker.single_iteration_task, interesting_task)
         self.manager.post.assert_called_once_with('/task', loop=self.asyncio_loop)
         self.tuqueue.queue.assert_has_calls([
@@ -255,7 +255,7 @@ class TestWorkerTaskExecution(AbstractFWorkerTest):
             await self.worker.stop_current_task()
         asyncio.ensure_future(stop(), loop=self.loop)
-        self.loop.run_until_complete(self.worker.fetch_task_task)
+        self.loop.run_until_complete(self.worker.single_iteration_task)
         self.assertTrue(stop_called)
@@ -449,8 +449,8 @@ class WorkerSleepingTest(AbstractFWorkerTest):
         self.worker.schedule_fetch_task()
         with self.assertRaises(concurrent.futures.CancelledError):
-            self.loop.run_until_complete(self.worker.fetch_task_task)
+            self.loop.run_until_complete(self.worker.single_iteration_task)
         self.assertIsNotNone(self.worker.sleeping_task)
         self.assertFalse(self.worker.sleeping_task.done())
-        self.assertTrue(self.worker.fetch_task_task.done())
+        self.assertTrue(self.worker.single_iteration_task.done())