Skip to content
Snippets Groups Projects
Commit 3838f56b authored by Sybren A. Stüvel
Browse files

Don't push task status 'canceled' when we are no longer allowed to run it.

When the "may I run" system says "no", we shouldn't push a "canceled"
status to the Manager, as this could overwrite another valid status that's
there for some other reason.
parent 1c16cf37
No related branches found
No related tags found
No related merge requests found
...@@ -3,6 +3,11 @@ ...@@ -3,6 +3,11 @@
This file logs the changes that are actually interesting to users (new features, This file logs the changes that are actually interesting to users (new features,
changed functionality, fixed bugs). changed functionality, fixed bugs).
## Version 2.0.9 (in development)
- Fixed the Worker pushing a 'canceled' task status to the Manager after it is no longer allowed to run the task.
## Version 2.0.8 (released 2017-09-07) ## Version 2.0.8 (released 2017-09-07)
- Fixed parsing of `--config` CLI param on Python 3.5 - Fixed parsing of `--config` CLI param on Python 3.5
......
...@@ -41,6 +41,12 @@ class FlamencoWorker: ...@@ -41,6 +41,12 @@ class FlamencoWorker:
shutdown_future = attr.ib( shutdown_future = attr.ib(
validator=attr.validators.optional(attr.validators.instance_of(asyncio.Future))) validator=attr.validators.optional(attr.validators.instance_of(asyncio.Future)))
# When Manager tells us we may no longer run our current task, this is set to True.
# As a result, the cancelled state isn't pushed to Manager any more. It is reset
# to False when a new task is started.
task_is_silently_aborting = attr.ib(default=False, init=False,
validator=attr.validators.instance_of(bool))
fetch_task_task = attr.ib( fetch_task_task = attr.ib(
default=None, init=False, default=None, init=False,
validator=attr.validators.optional(attr.validators.instance_of(asyncio.Task))) validator=attr.validators.optional(attr.validators.instance_of(asyncio.Task)))
...@@ -240,6 +246,7 @@ class FlamencoWorker: ...@@ -240,6 +246,7 @@ class FlamencoWorker:
return return
self._log.warning('Stopping task %s', self.task_id) self._log.warning('Stopping task %s', self.task_id)
self.task_is_silently_aborting = True
try: try:
await self.trunner.abort_current_task() await self.trunner.abort_current_task()
...@@ -247,6 +254,11 @@ class FlamencoWorker: ...@@ -247,6 +254,11 @@ class FlamencoWorker:
self._log.info('asyncio task was canceled for task runner task %s', self.task_id) self._log.info('asyncio task was canceled for task runner task %s', self.task_id)
self.asyncio_execution_task.cancel() self.asyncio_execution_task.cancel()
await self.register_log('Worker %s stopped running this task,'
' no longer allowed to run by Manager', self.worker_id)
await self.push_to_manager()
await self.tuqueue.flush_and_report(loop=self.loop)
def shutdown(self): def shutdown(self):
"""Gracefully shuts down any asynchronous tasks.""" """Gracefully shuts down any asynchronous tasks."""
...@@ -298,6 +310,8 @@ class FlamencoWorker: ...@@ -298,6 +310,8 @@ class FlamencoWorker:
import traceback import traceback
import requests import requests
self._cleanup_state_for_new_task()
self._log.debug('Going to fetch task in %s seconds', delay) self._log.debug('Going to fetch task in %s seconds', delay)
await asyncio.sleep(delay) await asyncio.sleep(delay)
...@@ -349,6 +363,9 @@ class FlamencoWorker: ...@@ -349,6 +363,9 @@ class FlamencoWorker:
if self.failures_are_acceptable: if self.failures_are_acceptable:
self._log.warning('Task %s was cancelled, but ignoring it since ' self._log.warning('Task %s was cancelled, but ignoring it since '
'we are shutting down.', self.task_id) 'we are shutting down.', self.task_id)
elif self.task_is_silently_aborting:
self._log.warning('Task %s was cancelled, but ignoring it since '
'we are no longer allowed to run it.', self.task_id)
else: else:
self._log.warning('Task %s was cancelled', self.task_id) self._log.warning('Task %s was cancelled', self.task_id)
await self.register_task_update(task_status='canceled', await self.register_task_update(task_status='canceled',
...@@ -373,6 +390,13 @@ class FlamencoWorker: ...@@ -373,6 +390,13 @@ class FlamencoWorker:
# the world when we're in some infinite failure loop. # the world when we're in some infinite failure loop.
self.schedule_fetch_task(FETCH_TASK_DONE_SCHEDULE_NEW_DELAY) self.schedule_fetch_task(FETCH_TASK_DONE_SCHEDULE_NEW_DELAY)
def _cleanup_state_for_new_task(self):
"""Cleans up internal state to prepare for a new task to be executed."""
self.last_task_activity = documents.Activity()
self.task_is_silently_aborting = False
self.current_task_status = ''
async def push_to_manager(self, *, delay: datetime.timedelta = None): async def push_to_manager(self, *, delay: datetime.timedelta = None):
"""Updates a task's status and activity. """Updates a task's status and activity.
...@@ -390,7 +414,12 @@ class FlamencoWorker: ...@@ -390,7 +414,12 @@ class FlamencoWorker:
self._log.info('Updating task %s with status %r and activity %r', self._log.info('Updating task %s with status %r and activity %r',
self.task_id, self.current_task_status, self.last_task_activity) self.task_id, self.current_task_status, self.last_task_activity)
if self.task_is_silently_aborting:
self._log.info('push_to_manager: task is silently aborting, will only push logs')
payload = {}
else:
payload = attr.asdict(self.last_task_activity) payload = attr.asdict(self.last_task_activity)
if self.current_task_status:
payload['task_status'] = self.current_task_status payload['task_status'] = self.current_task_status
now = datetime.datetime.now() now = datetime.datetime.now()
...@@ -410,6 +439,10 @@ class FlamencoWorker: ...@@ -410,6 +439,10 @@ class FlamencoWorker:
if self._push_log_to_manager is not None: if self._push_log_to_manager is not None:
self._push_log_to_manager.cancel() self._push_log_to_manager.cancel()
if not payload:
self._log.debug('push_to_manager: nothing to push')
return
self.tuqueue.queue('/tasks/%s/update' % self.task_id, payload, loop=self.loop) self.tuqueue.queue('/tasks/%s/update' % self.task_id, payload, loop=self.loop)
async def register_task_update(self, *, async def register_task_update(self, *,
......
...@@ -71,7 +71,6 @@ class WorkerStartupTest(AbstractFWorkerTest): ...@@ -71,7 +71,6 @@ class WorkerStartupTest(AbstractFWorkerTest):
@unittest.mock.patch('socket.gethostname') @unittest.mock.patch('socket.gethostname')
@unittest.mock.patch('flamenco_worker.config.merge_with_home_config') @unittest.mock.patch('flamenco_worker.config.merge_with_home_config')
def test_startup_already_registered(self, mock_merge_with_home_config, mock_gethostname): def test_startup_already_registered(self, mock_merge_with_home_config, mock_gethostname):
from mock_responses import EmptyResponse, CoroMock from mock_responses import EmptyResponse, CoroMock
mock_gethostname.return_value = 'ws-unittest' mock_gethostname.return_value = 'ws-unittest'
...@@ -224,7 +223,6 @@ class TestWorkerTaskExecution(AbstractFWorkerTest): ...@@ -224,7 +223,6 @@ class TestWorkerTaskExecution(AbstractFWorkerTest):
def test_stop_current_task(self): def test_stop_current_task(self):
"""Test that stopped tasks get status 'canceled'.""" """Test that stopped tasks get status 'canceled'."""
from unittest.mock import call
from mock_responses import JsonResponse, CoroMock from mock_responses import JsonResponse, CoroMock
self.manager.post = CoroMock() self.manager.post = CoroMock()
...@@ -248,6 +246,7 @@ class TestWorkerTaskExecution(AbstractFWorkerTest): ...@@ -248,6 +246,7 @@ class TestWorkerTaskExecution(AbstractFWorkerTest):
self.worker.schedule_fetch_task() self.worker.schedule_fetch_task()
stop_called = False stop_called = False
async def stop(): async def stop():
nonlocal stop_called nonlocal stop_called
stop_called = True stop_called = True
...@@ -261,20 +260,22 @@ class TestWorkerTaskExecution(AbstractFWorkerTest): ...@@ -261,20 +260,22 @@ class TestWorkerTaskExecution(AbstractFWorkerTest):
self.assertTrue(stop_called) self.assertTrue(stop_called)
self.manager.post.assert_called_once_with('/task', loop=self.asyncio_loop) self.manager.post.assert_called_once_with('/task', loop=self.asyncio_loop)
self.tuqueue.queue.assert_has_calls([ self.tuqueue.queue.assert_any_call(
call('/tasks/58514d1e9837734f2e71b479/update', '/tasks/58514d1e9837734f2e71b479/update',
{'task_progress_percentage': 0, 'activity': '', {'task_progress_percentage': 0, 'activity': '',
'command_progress_percentage': 0, 'task_status': 'active', 'command_progress_percentage': 0, 'task_status': 'active',
'current_command_idx': 0}, 'current_command_idx': 0},
loop=self.loop, loop=self.loop,
),
call('/tasks/58514d1e9837734f2e71b479/update',
{'task_progress_percentage': 0, 'activity': 'Task was canceled',
'command_progress_percentage': 0, 'task_status': 'canceled',
'current_command_idx': 0},
loop=self.loop,
) )
])
# A bit clunky because we don't know which timestamp is included in the log line.
last_args, last_kwargs = self.tuqueue.queue.call_args
self.assertEqual(last_args[0], '/tasks/58514d1e9837734f2e71b479/update')
self.assertEqual(last_kwargs, {'loop': self.loop})
self.assertIn('log', last_args[1])
self.assertTrue(last_args[1]['log'].endswith(
'Worker 1234 stopped running this task, no longer allowed to run by Manager'))
self.assertEqual(self.tuqueue.queue.call_count, 2) self.assertEqual(self.tuqueue.queue.call_count, 2)
...@@ -388,7 +389,6 @@ class WorkerPushToMasterTest(AbstractFWorkerTest): ...@@ -388,7 +389,6 @@ class WorkerPushToMasterTest(AbstractFWorkerTest):
self.assertTrue(self.worker._push_log_to_manager.cancelled()) self.assertTrue(self.worker._push_log_to_manager.cancelled())
class WorkerShutdownTest(AbstractWorkerTest): class WorkerShutdownTest(AbstractWorkerTest):
def setUp(self): def setUp(self):
from flamenco_worker.cli import construct_asyncio_loop from flamenco_worker.cli import construct_asyncio_loop
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment