Skip to content
Snippets Groups Projects
Commit 1d32c6a0 authored by Sybren A. Stüvel's avatar Sybren A. Stüvel
Browse files

Worker: more explicit shutdown of what's currently running upon Ctrl+C

This also provides us with functions to call when the Manager tells us to
stop the currently running task.
parent 80a26aa4
No related branches found
No related tags found
No related merge requests found
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
import abc import abc
import asyncio import asyncio
import asyncio.subprocess
import logging import logging
import re import re
import typing import typing
...@@ -24,7 +25,8 @@ class CommandExecutionError(Exception): ...@@ -24,7 +25,8 @@ class CommandExecutionError(Exception):
@attr.s @attr.s
class AbstractCommand(metaclass=abc.ABCMeta): class AbstractCommand(metaclass=abc.ABCMeta):
worker = attr.ib(validator=attr.validators.instance_of(worker.FlamencoWorker)) worker = attr.ib(validator=attr.validators.instance_of(worker.FlamencoWorker),
repr=False)
task_id = attr.ib(validator=attr.validators.instance_of(str)) task_id = attr.ib(validator=attr.validators.instance_of(str))
command_idx = attr.ib(validator=attr.validators.instance_of(int)) command_idx = attr.ib(validator=attr.validators.instance_of(int))
...@@ -74,6 +76,10 @@ class AbstractCommand(metaclass=abc.ABCMeta): ...@@ -74,6 +76,10 @@ class AbstractCommand(metaclass=abc.ABCMeta):
self._log.warning('Error executing: %s', ex) self._log.warning('Error executing: %s', ex)
await self._register_exception(ex) await self._register_exception(ex)
return False return False
except asyncio.CancelledError as ex:
self._log.warning('Command execution was canceled')
await self._register_exception(ex)
return False
except Exception as ex: except Exception as ex:
# This is something unexpected, so do log the traceback. # This is something unexpected, so do log the traceback.
self._log.exception('Error executing.') self._log.exception('Error executing.')
...@@ -89,6 +95,14 @@ class AbstractCommand(metaclass=abc.ABCMeta): ...@@ -89,6 +95,14 @@ class AbstractCommand(metaclass=abc.ABCMeta):
return True return True
async def abort(self):
"""Aborts the command. This may or may not be actually possible.
A subprocess that's started by this command will be killed.
However, any asyncio coroutines that are not managed by this command
(such as the 'run' function) should be cancelled by the caller.
"""
async def _register_exception(self, ex: Exception): async def _register_exception(self, ex: Exception):
"""Registers an exception with the worker, and set the task status to 'failed'.""" """Registers an exception with the worker, and set the task status to 'failed'."""
...@@ -139,6 +153,9 @@ class TaskRunner: ...@@ -139,6 +153,9 @@ class TaskRunner:
_log = attrs_extra.log('%s.TaskRunner' % __name__) _log = attrs_extra.log('%s.TaskRunner' % __name__)
def __attrs_post_init__(self):
self.current_command = None
async def execute(self, task: dict, fworker: worker.FlamencoWorker) -> bool: async def execute(self, task: dict, fworker: worker.FlamencoWorker) -> bool:
"""Executes a task, returns True iff the entire task was run succesfully.""" """Executes a task, returns True iff the entire task was run succesfully."""
...@@ -170,6 +187,7 @@ class TaskRunner: ...@@ -170,6 +187,7 @@ class TaskRunner:
task_id=task_id, task_id=task_id,
command_idx=cmd_idx, command_idx=cmd_idx,
) )
self.current_command = cmd
success = await cmd.run(cmd_settings) success = await cmd.run(cmd_settings)
if not success: if not success:
...@@ -180,6 +198,19 @@ class TaskRunner: ...@@ -180,6 +198,19 @@ class TaskRunner:
self._log.info('Task %s completed succesfully.', task_id) self._log.info('Task %s completed succesfully.', task_id)
return True return True
async def abort_current_task(self):
"""Aborts the current task by aborting the currently running command.
Asynchronous, because a subprocess has to be wait()ed upon before returning.
"""
if self.current_command is None:
self._log.info('abort_current_task: no command running, nothing to abort.')
return
self._log.warning('abort_current_task: Aborting command %s', self.current_command)
await self.current_command.abort()
@command_executor('echo') @command_executor('echo')
class EchoCommand(AbstractCommand): class EchoCommand(AbstractCommand):
...@@ -217,6 +248,8 @@ class SleepCommand(AbstractCommand): ...@@ -217,6 +248,8 @@ class SleepCommand(AbstractCommand):
@attr.s @attr.s
class AbstractSubprocessCommand(AbstractCommand): class AbstractSubprocessCommand(AbstractCommand):
readline_timeout = attr.ib(default=SUBPROC_READLINE_TIMEOUT) readline_timeout = attr.ib(default=SUBPROC_READLINE_TIMEOUT)
proc = attr.ib(validator=attr.validators.instance_of(asyncio.subprocess.Process),
init=False)
async def subprocess(self, args: list): async def subprocess(self, args: list):
import subprocess import subprocess
...@@ -225,7 +258,7 @@ class AbstractSubprocessCommand(AbstractCommand): ...@@ -225,7 +258,7 @@ class AbstractSubprocessCommand(AbstractCommand):
cmd_to_log = ' '.join(shlex.quote(s) for s in args) cmd_to_log = ' '.join(shlex.quote(s) for s in args)
await self.worker.register_log('Executing %s', cmd_to_log) await self.worker.register_log('Executing %s', cmd_to_log)
proc = await asyncio.create_subprocess_exec( self.proc = await asyncio.create_subprocess_exec(
*args, *args,
stdin=subprocess.DEVNULL, stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
...@@ -233,48 +266,64 @@ class AbstractSubprocessCommand(AbstractCommand): ...@@ -233,48 +266,64 @@ class AbstractSubprocessCommand(AbstractCommand):
limit=800, # limit on the StreamReader buffer. limit=800, # limit on the StreamReader buffer.
) )
while not proc.stdout.at_eof(): try:
try: while not self.proc.stdout.at_eof():
line = await asyncio.wait_for(proc.stdout.readline(),
self.readline_timeout)
except asyncio.TimeoutError:
raise CommandExecutionError('Command timed out after %i seconds' %
self.readline_timeout)
except asyncio.CancelledError:
raise CommandExecutionError('Command execution was cancelled')
if len(line) == 0:
# EOF received, so let's bail.
break
try:
line = line.decode('utf8')
except UnicodeDecodeError as ex:
try: try:
proc.kill() line = await asyncio.wait_for(self.proc.stdout.readline(),
except ProcessLookupError: self.readline_timeout)
# The process is already stopped, so killing is impossible. That's ok. except asyncio.TimeoutError:
pass raise CommandExecutionError('Command timed out after %i seconds' %
await proc.wait() self.readline_timeout)
raise CommandExecutionError('Command produced non-UTF8 output, aborting: %s' % ex)
line = line.rstrip()
self._log.info('Read line: %s', line)
line = await self.process_line(line)
if line is not None:
await self.worker.register_log(line)
retcode = await proc.wait() if len(line) == 0:
self._log.info('Command %r stopped with status code %s', args, retcode) # EOF received, so let's bail.
break
if retcode: try:
raise CommandExecutionError('Command failed with status %s' % retcode) line = line.decode('utf8')
except UnicodeDecodeError as ex:
await self.abort()
raise CommandExecutionError('Command produced non-UTF8 output, '
'aborting: %s' % ex)
line = line.rstrip()
self._log.info('Read line: %s', line)
line = await self.process_line(line)
if line is not None:
await self.worker.register_log(line)
retcode = await self.proc.wait()
self._log.info('Command %r stopped with status code %s', args, retcode)
if retcode:
raise CommandExecutionError('Command failed with status %s' % retcode)
except asyncio.CancelledError as ex:
self._log.info('asyncio task got canceled, killing subprocess.')
self.abort()
async def process_line(self, line: str) -> typing.Optional[str]: async def process_line(self, line: str) -> typing.Optional[str]:
"""Processes the line, returning None to ignore it.""" """Processes the line, returning None to ignore it."""
return '> %s' % line return '> %s' % line
async def abort(self):
"""Aborts the command by killing the subprocess."""
if self.proc is None or self.proc == attr.NOTHING:
self._log.debug("No process to kill. That's ok.")
return
self._log.info('Aborting subprocess')
try:
self.proc.kill()
except ProcessLookupError:
# The process is already stopped, so killing is impossible. That's ok.
self._log.debug("The process was already stopped, aborting is impossible. That's ok.")
return
retval = await self.proc.wait()
self._log.info('The process aborted with status code %s', retval)
@command_executor('exec') @command_executor('exec')
class ExecCommand(AbstractSubprocessCommand): class ExecCommand(AbstractSubprocessCommand):
......
...@@ -164,6 +164,8 @@ class FlamencoWorker: ...@@ -164,6 +164,8 @@ class FlamencoWorker:
def shutdown(self): def shutdown(self):
"""Gracefully shuts down any asynchronous tasks.""" """Gracefully shuts down any asynchronous tasks."""
self._log.warning('Shutting down')
if self.fetch_task_task is not None and not self.fetch_task_task.done(): if self.fetch_task_task is not None and not self.fetch_task_task.done():
self._log.info('shutdown(): Cancelling task fetching task %s', self.fetch_task_task) self._log.info('shutdown(): Cancelling task fetching task %s', self.fetch_task_task)
self.fetch_task_task.cancel() self.fetch_task_task.cancel()
...@@ -176,6 +178,9 @@ class FlamencoWorker: ...@@ -176,6 +178,9 @@ class FlamencoWorker:
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
# Stop the task runner
self.loop.run_until_complete(self.trunner.abort_current_task())
# Queue anything that should still be pushed to the Manager # Queue anything that should still be pushed to the Manager
push_act_sched = self._push_act_to_manager is not None \ push_act_sched = self._push_act_to_manager is not None \
and not self._push_act_to_manager.done() and not self._push_act_to_manager.done()
......
...@@ -15,6 +15,7 @@ class AbstractFWorkerTest(AbstractWorkerTest): ...@@ -15,6 +15,7 @@ class AbstractFWorkerTest(AbstractWorkerTest):
from flamenco_worker.worker import FlamencoWorker from flamenco_worker.worker import FlamencoWorker
from flamenco_worker.runner import TaskRunner from flamenco_worker.runner import TaskRunner
from flamenco_worker.upstream_update_queue import TaskUpdateQueue from flamenco_worker.upstream_update_queue import TaskUpdateQueue
from mock_responses import CoroMock
self.asyncio_loop = construct_asyncio_loop() self.asyncio_loop = construct_asyncio_loop()
self.asyncio_loop.set_debug(True) self.asyncio_loop.set_debug(True)
...@@ -23,8 +24,10 @@ class AbstractFWorkerTest(AbstractWorkerTest): ...@@ -23,8 +24,10 @@ class AbstractFWorkerTest(AbstractWorkerTest):
self.manager = Mock(spec=FlamencoManager) self.manager = Mock(spec=FlamencoManager)
self.trunner = Mock(spec=TaskRunner) self.trunner = Mock(spec=TaskRunner)
self.tuqueue = Mock(spec=TaskUpdateQueue) self.tuqueue = Mock(spec=TaskUpdateQueue)
self.tuqueue.flush_for_shutdown = CoroMock()
self.trunner.execute = self.mock_task_execute self.trunner.execute = self.mock_task_execute
self.trunner.abort_current_task = CoroMock()
self.worker = FlamencoWorker( self.worker = FlamencoWorker(
manager=self.manager, manager=self.manager,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment