Skip to content
Snippets Groups Projects
Commit 826f53b3 authored by Sybren A. Stüvel's avatar Sybren A. Stüvel
Browse files

Changed from 'task' to 'future'

This makes a clearer distinction between Flamenco tasks ('task') and
AsyncIO tasks ('future'). The AsyncIO Task class is a subclass of its
Future class.
parent 73439de8
No related branches found
No related tags found
No related merge requests found
......@@ -85,17 +85,17 @@ class FlamencoWorker:
task_is_silently_aborting = attr.ib(default=False, init=False,
validator=attr.validators.instance_of(bool))
single_iteration_task = attr.ib(
single_iteration_fut = attr.ib(
default=None, init=False,
validator=attr.validators.optional(attr.validators.instance_of(asyncio.Task)))
asyncio_execution_task = attr.ib(
validator=attr.validators.optional(attr.validators.instance_of(asyncio.Future)))
asyncio_execution_fut = attr.ib(
default=None, init=False,
validator=attr.validators.optional(attr.validators.instance_of(asyncio.Task)))
validator=attr.validators.optional(attr.validators.instance_of(asyncio.Future)))
# See self.sleeping()
sleeping_task = attr.ib(
sleeping_fut = attr.ib(
default=None, init=False,
validator=attr.validators.optional(attr.validators.instance_of(asyncio.Task)))
validator=attr.validators.optional(attr.validators.instance_of(asyncio.Future)))
task_id = attr.ib(
default=None, init=False,
......@@ -152,7 +152,7 @@ class FlamencoWorker:
def active_task_id(self) -> typing.Optional[str]:
"""Returns the task ID, but only if it is currently executing; returns None otherwise."""
if self.asyncio_execution_task is None or self.asyncio_execution_task.done():
if self.asyncio_execution_fut is None or self.asyncio_execution_fut.done():
return None
return self.task_id
......@@ -277,12 +277,12 @@ class FlamencoWorker:
# The current task may still be running, as single_iteration() calls schedule_fetch_task() to
# schedule a future run. This may result in the task not being awaited when we are
# shutting down.
if self.shutdown_future.done():
if self.shutdown_future is not None and self.shutdown_future.done():
self._log.warning('Shutting down, not scheduling another fetch-task task.')
return
self.single_iteration_task = asyncio.ensure_future(self.single_iteration(delay),
loop=self.loop)
self.single_iteration_fut = asyncio.ensure_future(self.single_iteration(delay),
loop=self.loop)
async def stop_current_task(self):
"""Stops the current task by canceling the AsyncIO task.
......@@ -291,7 +291,7 @@ class FlamencoWorker:
of the task status change and subsequent activity push.
"""
if not self.asyncio_execution_task or self.asyncio_execution_task.done():
if not self.asyncio_execution_fut or self.asyncio_execution_fut.done():
self._log.warning('stop_current_task() called but no task is running.')
return
......@@ -302,7 +302,7 @@ class FlamencoWorker:
await self.trunner.abort_current_task()
except asyncio.CancelledError:
self._log.info('asyncio task was canceled for task runner task %s', self.task_id)
self.asyncio_execution_task.cancel()
self.asyncio_execution_fut.cancel()
await self.register_log('Worker %s stopped running this task,'
' no longer allowed to run by Manager', self.worker_id)
......@@ -349,18 +349,18 @@ class FlamencoWorker:
Used in shutdown and when we're going to status 'asleep'.
"""
if self.single_iteration_task is None or self.single_iteration_task.done():
if self.single_iteration_fut is None or self.single_iteration_fut.done():
return
self._log.info('stopping task fetching task %s', self.single_iteration_task)
self.single_iteration_task.cancel()
self._log.info('stopping task fetching task %s', self.single_iteration_fut)
self.single_iteration_fut.cancel()
# This prevents a 'Task was destroyed but it is pending!' warning on the console.
# Sybren: I've only seen this in unit tests, so maybe this code should be moved
# there, instead.
try:
if not self.loop.is_running():
self.loop.run_until_complete(self.single_iteration_task)
self.loop.run_until_complete(self.single_iteration_fut)
except asyncio.CancelledError:
pass
......@@ -440,10 +440,10 @@ class FlamencoWorker:
"""Feed a task to the task runner and monitor for exceptions."""
try:
await self.register_task_update(task_status='active')
self.asyncio_execution_task = asyncio.ensure_future(
self.asyncio_execution_fut = asyncio.ensure_future(
self.trunner.execute(task_info, self),
loop=self.loop)
ok = await self.asyncio_execution_task
ok = await self.asyncio_execution_fut
if ok:
await self.register_task_update(
task_status='completed',
......@@ -671,8 +671,8 @@ class FlamencoWorker:
self._log.info('Going to sleep')
self.state = WorkerState.ASLEEP
self.stop_fetching_tasks()
self.sleeping_task = self.loop.create_task(self.sleeping())
self._log.debug('Created task %s', self.sleeping_task)
self.sleeping_fut = self.loop.create_task(self.sleeping())
self._log.debug('Created task %s', self.sleeping_fut)
self.ack_status_change('asleep')
def go_to_state_awake(self):
......@@ -681,7 +681,7 @@ class FlamencoWorker:
self._log.info('Waking up')
self.state = WorkerState.AWAKE
self.stop_sleeping()
self.schedule_fetch_task(3)
self.schedule_fetch_task(FETCH_TASK_DONE_SCHEDULE_NEW_DELAY)
self.ack_status_change('awake')
def go_to_state_shutdown(self):
......@@ -705,15 +705,15 @@ class FlamencoWorker:
self.state = WorkerState.ERROR
self._log.warning('Going to state %r', self.state.value)
self.ack_status_change(self.state.value)
self.sleeping_task = self.loop.create_task(self.sleeping_for_error())
self.sleeping_fut = self.loop.create_task(self.sleeping_for_error())
def stop_sleeping(self):
"""Stops the asyncio task for sleeping."""
if self.sleeping_task is None or self.sleeping_task.done():
if self.sleeping_fut is None or self.sleeping_fut.done():
return
self.sleeping_task.cancel()
self.sleeping_fut.cancel()
try:
self.sleeping_task.result()
self.sleeping_fut.result()
except (asyncio.CancelledError, asyncio.InvalidStateError):
pass
except Exception:
......
......@@ -44,12 +44,12 @@ class PretaskWriteCheckTest(AbstractFWorkerTest):
self.worker.pretask_check_params.pre_task_check_write = (testfile, )
self.worker.schedule_fetch_task()
self.asyncio_loop.run_until_complete(self.worker.single_iteration_task)
self.asyncio_loop.run_until_complete(self.worker.single_iteration_fut)
self.assertFalse(testfile.exists(), '%s should have been deleted' % testfile)
self.manager.post.assert_called_once_with('/task', loop=mock.ANY)
self.assertIsNone(self.worker.sleeping_task)
self.assertIsNone(self.worker.sleeping_fut)
def test_happy_not_remove_file(self):
from .mock_responses import EmptyResponse, CoroMock
......@@ -65,12 +65,12 @@ class PretaskWriteCheckTest(AbstractFWorkerTest):
self.worker.pretask_check_params.pre_task_check_write = (testfile, )
self.worker.schedule_fetch_task()
self.asyncio_loop.run_until_complete(self.worker.single_iteration_task)
self.asyncio_loop.run_until_complete(self.worker.single_iteration_fut)
self.assertTrue(testfile.exists(), '%s should not have been deleted' % testfile)
self.manager.post.assert_called_once_with('/task', loop=mock.ANY)
self.assertIsNone(self.worker.sleeping_task)
self.assertIsNone(self.worker.sleeping_fut)
@contextlib.contextmanager
def write_check(self, post_run=None):
......@@ -84,13 +84,13 @@ class PretaskWriteCheckTest(AbstractFWorkerTest):
yield tdir
self.worker.schedule_fetch_task()
self.asyncio_loop.run_until_complete(self.worker.single_iteration_task)
self.asyncio_loop.run_until_complete(self.worker.single_iteration_fut)
if post_run is not None:
post_run()
self.manager.post.assert_called_once_with('/ack-status-change/error', loop=mock.ANY)
self.assertFalse(self.worker.sleeping_task.done())
self.assertFalse(self.worker.sleeping_fut.done())
# Mock merge_with_home_config() so that it doesn't overread actual config.
......@@ -133,10 +133,10 @@ class PretaskReadCheckTest(AbstractFWorkerTest):
yield tdir
self.worker.schedule_fetch_task()
self.asyncio_loop.run_until_complete(self.worker.single_iteration_task)
self.asyncio_loop.run_until_complete(self.worker.single_iteration_fut)
if post_run is not None:
post_run()
self.manager.post.assert_called_once_with('/ack-status-change/error', loop=mock.ANY)
self.assertFalse(self.worker.sleeping_task.done())
self.assertFalse(self.worker.sleeping_fut.done())
......@@ -199,11 +199,11 @@ class TestWorkerTaskExecution(AbstractFWorkerTest):
self.worker.schedule_fetch_task()
self.manager.post.assert_not_called()
interesting_task = self.worker.single_iteration_task
self.loop.run_until_complete(self.worker.single_iteration_task)
interesting_task = self.worker.single_iteration_fut
self.loop.run_until_complete(self.worker.single_iteration_fut)
# Another fetch-task task should have been scheduled.
self.assertNotEqual(self.worker.single_iteration_task, interesting_task)
self.assertNotEqual(self.worker.single_iteration_fut, interesting_task)
self.manager.post.assert_called_once_with('/task', loop=self.asyncio_loop)
self.tuqueue.queue.assert_has_calls([
......@@ -255,7 +255,7 @@ class TestWorkerTaskExecution(AbstractFWorkerTest):
await self.worker.stop_current_task()
asyncio.ensure_future(stop(), loop=self.loop)
self.loop.run_until_complete(self.worker.single_iteration_task)
self.loop.run_until_complete(self.worker.single_iteration_fut)
self.assertTrue(stop_called)
......@@ -449,8 +449,8 @@ class WorkerSleepingTest(AbstractFWorkerTest):
self.worker.schedule_fetch_task()
with self.assertRaises(concurrent.futures.CancelledError):
self.loop.run_until_complete(self.worker.single_iteration_task)
self.loop.run_until_complete(self.worker.single_iteration_fut)
self.assertIsNotNone(self.worker.sleeping_task)
self.assertFalse(self.worker.sleeping_task.done())
self.assertTrue(self.worker.single_iteration_task.done())
self.assertIsNotNone(self.worker.sleeping_fut)
self.assertFalse(self.worker.sleeping_fut.done())
self.assertTrue(self.worker.single_iteration_fut.done())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment