Skip to content
Snippets Groups Projects
Commit 5f7aa10b authored by Sybren A. Stüvel
Browse files

Worker: sign off at Manager upon SIGINT and SIGTERM

This allows the manager to re-queue the worker's current task.
parent b00a37b2
No related branches found
No related tags found
No related merge requests found
......@@ -87,6 +87,12 @@ class FlamencoWorker:
default=None, init=False,
validator=attr.validators.optional(attr.validators.instance_of(asyncio.Future)))
# When the worker is shutting down, the currently running task will be
# handed back to the manager for re-scheduling. In such a situation,
# an abort is expected and acceptable.
failures_are_acceptable = attr.ib(default=False, init=False,
validator=attr.validators.instance_of(bool))
_log = attrs_extra.log('%s.FlamencoWorker' % __name__)
@property
......@@ -208,6 +214,7 @@ class FlamencoWorker:
"""Gracefully shuts down any asynchronous tasks."""
self._log.warning('Shutting down')
self.failures_are_acceptable = True
if self.fetch_task_task is not None and not self.fetch_task_task.done():
self._log.info('shutdown(): Cancelling task fetching task %s', self.fetch_task_task)
......@@ -237,6 +244,11 @@ class FlamencoWorker:
# Try to do a final push of queued updates to the Manager.
self.loop.run_until_complete(self.tuqueue.flush_for_shutdown(loop=self.loop))
# Let the Manager know we're shutting down
self._log.info('shutdown(): signing off at Manager')
self.loop.run_until_complete(self.manager.post('/sign-off', loop=self.loop))
self.failures_are_acceptable = False
async def fetch_task(self, delay: float):
"""Fetches a single task to perform from Flamenco Manager, and executes it.
......@@ -287,16 +299,26 @@ class FlamencoWorker:
task_status='completed',
activity='Task completed',
)
elif self.failures_are_acceptable:
self._log.warning('Task %s failed, but ignoring it since we are shutting down.',
self.task_id)
else:
self._log.error('Task %s failed', self.task_id)
await self.register_task_update(task_status='failed')
except asyncio.CancelledError:
self._log.warning('Task %s was canceled', self.task_id)
await self.register_task_update(task_status='canceled',
activity='Task was canceled')
if self.failures_are_acceptable:
self._log.warning('Task %s was cancelled, but ignoring it since '
'we are shutting down.', self.task_id)
else:
self._log.warning('Task %s was cancelled', self.task_id)
await self.register_task_update(task_status='canceled',
activity='Task was canceled')
except Exception as ex:
self._log.exception('Uncaught exception executing task %s' % self.task_id)
try:
# Such a failure will always result in a failed task, even when
# self.failures_are_acceptable = True; only expected failures are
# acceptable then.
with (await self._queue_lock):
self._queued_log_entries.append(traceback.format_exc())
await self.register_task_update(
......@@ -306,9 +328,10 @@ class FlamencoWorker:
except Exception:
self._log.exception('While notifying manager of failure, another error happened.')
finally:
# Always schedule a new task run; after a little delay to not hammer the world
# when we're in some infinite failure loop.
self.schedule_fetch_task(FETCH_TASK_DONE_SCHEDULE_NEW_DELAY)
if not self.failures_are_acceptable:
# Schedule a new task run unless shutting down; after a little delay to not hammer
# the world when we're in some infinite failure loop.
self.schedule_fetch_task(FETCH_TASK_DONE_SCHEDULE_NEW_DELAY)
async def push_to_manager(self, *, delay: datetime.timedelta = None):
"""Updates a task's status and activity.
......
......@@ -129,7 +129,38 @@ class ExecCommandTest(AbstractCommandTest):
call(decode_err),
])
# The update should NOT contain a new task status -- that is left to the Worker.
self.fworker.register_task_update.assert_called_with(activity=decode_err)
def test_exec_python_fails(self):
    """Check that a command exiting with an error is logged but sets no task status.

    Runs a Python one-liner that raises ``SystemExit`` with a message, then
    verifies that the command reports failure, that the output and the
    execution error are logged, and that the task update carries only an
    activity (no ``task_status``).
    """
    import shlex
    import sys

    cmd = self.construct()

    # Use shlex to quote strings like this, so we're sure it's done well.
    args = [sys.executable, '-c', r'raise SystemExit("¡FAIL!")']
    settings = {
        'cmd': ' '.join(shlex.quote(s) for s in args)
    }

    ok = self.loop.run_until_complete(asyncio.wait_for(
        cmd.run(settings),
        0.6
    ))
    self.assertFalse(ok)

    # Check that the execution error has been reported.
    self.fworker.register_log.assert_has_calls([
        call('exec: Starting'),
        call('Executing %s',
             '%s -c \'raise SystemExit("¡FAIL!")\'' % sys.executable),
        call('> ¡FAIL!'),  # note the logged line doesn't end in a newline
        call('exec.(task_id=12345, command_idx=0): Error executing: '
             'Command failed with status 1')
    ])

    # The update should NOT contain a new task status -- that is left to the Worker.
    # Only the activity is passed; the stale `activity=decode_err` /
    # `task_status='failed'` arguments belong to the old version of another test.
    self.fworker.register_task_update.assert_called_with(
        activity='exec.(task_id=12345, command_idx=0): Error executing: '
                 'Command failed with status 1',
    )
......@@ -22,6 +22,8 @@ class AbstractFWorkerTest(AbstractWorkerTest):
self.shutdown_future = self.asyncio_loop.create_future()
self.manager = Mock(spec=FlamencoManager)
self.manager.post = CoroMock()
self.trunner = Mock(spec=TaskRunner)
self.tuqueue = Mock(spec=TaskUpdateQueue)
self.tuqueue.flush_for_shutdown = CoroMock()
......@@ -215,9 +217,6 @@ class TestWorkerTaskExecution(AbstractFWorkerTest):
]
})
def assert_becoming_active(url, payload, loop):
self.assertEqual(['je', 'moeder'], payload)
self.worker.schedule_fetch_task()
stop_called = False
......@@ -359,3 +358,46 @@ class WorkerPushToMasterTest(AbstractFWorkerTest):
# The scheduled task should be cancelled.
self.assertTrue(self.worker._push_log_to_manager.cancelled())
class WorkerShutdownTest(AbstractWorkerTest):
    """Tests the worker's shutdown procedure against mocked collaborators."""

    def setUp(self):
        """Create a debug-enabled event loop, mocked dependencies, and the worker under test."""
        from flamenco_worker.cli import construct_asyncio_loop
        from flamenco_worker.upstream import FlamencoManager
        from flamenco_worker.worker import FlamencoWorker
        from flamenco_worker.runner import TaskRunner
        from flamenco_worker.upstream_update_queue import TaskUpdateQueue
        from mock_responses import CoroMock

        # Event loop first; the shutdown future is created on it.
        loop = construct_asyncio_loop()
        loop.set_debug(True)
        self.asyncio_loop = loop
        self.shutdown_future = loop.create_future()

        # Manager mock: only post() needs to be awaitable here.
        manager = Mock(spec=FlamencoManager)
        manager.post = CoroMock()
        self.manager = manager

        # Task runner mock with an awaitable abort_current_task().
        runner = Mock(spec=TaskRunner)
        runner.abort_current_task = CoroMock()
        self.trunner = runner

        # Update queue mock with an awaitable flush_for_shutdown().
        update_queue = Mock(spec=TaskUpdateQueue)
        update_queue.flush_for_shutdown = CoroMock()
        self.tuqueue = update_queue

        self.worker = FlamencoWorker(
            loop=loop,
            shutdown_future=self.shutdown_future,
            manager=manager,
            trunner=runner,
            tuqueue=update_queue,
            worker_id='1234',
            worker_secret='jemoeder',
            job_types=['sleep', 'unittest'],
        )

    def tearDown(self):
        """Close the event loop created in setUp()."""
        self.asyncio_loop.close()

    def test_shutdown(self):
        """shutdown() must post exactly one sign-off request to the Manager."""
        self.shutdown_future.cancel()
        self.worker.shutdown()

        self.manager.post.assert_called_once_with('/sign-off', loop=self.asyncio_loop)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment