diff --git a/packages/flamenco-worker-python/flamenco_worker/cli.py b/packages/flamenco-worker-python/flamenco_worker/cli.py index 33aa0b33e9f7991b1dad7a665376f97715f196b7..0b92ddd98c677b6ba852a4443abca703acbea4f5 100644 --- a/packages/flamenco-worker-python/flamenco_worker/cli.py +++ b/packages/flamenco-worker-python/flamenco_worker/cli.py @@ -1,6 +1,7 @@ """Commandline interface entry points.""" import argparse +import asyncio import logging import logging.config @@ -50,24 +51,48 @@ def main(): # Load configuration from . import config - confparser = config.load_config(args.config, args.verbose) - from . import worker, upstream + # Construct the AsyncIO loop + loop = asyncio.get_event_loop() + if args.verbose: + log.debug('Enabling AsyncIO debugging') + loop.set_debug(True) + shutdown_future = loop.create_future() + + # Piece all the components together. + from . import runner, worker, upstream fmanager = upstream.FlamencoManager( manager_url=confparser.get(config.CONFIG_SECTION, 'manager_url'), ) + trunner = runner.TaskRunner( + shutdown_future=shutdown_future) + fworker = worker.FlamencoWorker( manager=fmanager, + trunner=trunner, job_types=confparser.get(config.CONFIG_SECTION, 'job_types').split(), worker_id=confparser.get(config.CONFIG_SECTION, 'worker_id'), worker_secret=confparser.get(config.CONFIG_SECTION, 'worker_secret'), + loop=loop, + shutdown_future=shutdown_future, ) + try: fworker.startup() fworker.mainloop() + except KeyboardInterrupt: + log.warning('Shutting down due to keyboard interrupt') + shutdown_future.cancel() + fworker.shutdown() + + async def stop_loop(): + log.info('Waiting to give tasks the time to stop gracefully') + await asyncio.sleep(2) + loop.stop() + loop.run_until_complete(stop_loop()) except: log.exception('Uncaught exception!') log.warning('Shutting down') diff --git a/packages/flamenco-worker-python/flamenco_worker/documents.py b/packages/flamenco-worker-python/flamenco_worker/documents.py new file mode 100644 index 
def _typed_field(expected_type):
    """Build a mandatory attrs attribute validated as an instance of ``expected_type``."""

    return attr.ib(validator=attr.validators.instance_of(expected_type))


@attr.s
class Activity:
    """Activity on a task.

    Every field is a required constructor argument and carries an
    ``instance_of`` validator for the type noted below.
    """

    # Textual fields, validated as str.
    description = _typed_field(str)
    current_cmd_name = _typed_field(str)

    # Progress fields, validated as int.
    percentage_complete_task = _typed_field(int)
    percentage_complete_command = _typed_field(int)
# Registry of command executors: maps a command name to the class handling it.
# Filled by the @command_executor class decorator below.
command_handlers = {}


def command_executor(cmdname):
    """Class decorator, registers a command executor.

    The decorated class is stored in ``command_handlers`` under ``cmdname`` and
    gets ``cmdname`` assigned to its ``command_name`` class attribute.

    :param cmdname: name under which the decorated class is registered.
    :returns: the decorator; it returns the class it was given.
    :raises AssertionError: when ``cmdname`` is already registered (only when
        assertions are enabled).
    """

    def decorator(cls):
        assert cmdname not in command_handlers, \
            'Command %r is already registered' % cmdname

        command_handlers[cmdname] = cls
        cls.command_name = cmdname
        return cls

    return decorator


@attr.s
class AbstractCommand(metaclass=abc.ABCMeta):
    """Base class for command executors.

    Calling an instance validates the given settings and, when they are valid,
    runs execute() with them.
    """

    worker = attr.ib(validator=attr.validators.instance_of(worker.FlamencoWorker))
    task_id = attr.ib(validator=attr.validators.instance_of(str))
    command_idx = attr.ib(validator=attr.validators.instance_of(int))

    # Set by @command_executor on the decorated class. This is a plain class
    # attribute on purpose: as an attr.ib() the generated __init__ would reset
    # it to the default on every instance, hiding the registered name.
    command_name = None

    # Set by __call__(). The validator has to be optional, because the None
    # default is validated too when the instance is constructed.
    identifier = attr.ib(
        default=None, init=False,
        validator=attr.validators.optional(attr.validators.instance_of(str)))
    _log = None

    def __call__(self, settings: dict):
        """Runs the command, parsing output and sending it back to the worker.

        Sets ``identifier`` and the per-command logger, then validates the
        settings. Invalid settings are logged as a warning and the command is
        not executed. Exceptions raised by execute() are logged, not re-raised.

        :param settings: the settings passed on to validate() and execute().
        """

        self.identifier = '%s(task_id=%s, command_idx=%s)' % (
            self.__class__.__name__,
            self.task_id,
            self.command_idx)
        self._log = logging.getLogger('%s.%s' % (__name__, self.identifier))

        verr = self.validate(settings)
        if verr is not None:
            self._log.warning('Invalid settings: %s', verr)
            # TODO: report the error to the worker, e.g.
            # worker.command_error(self.command_name, verr)
            return

        try:
            self.execute(settings)
        except Exception:
            # Deliberately not a bare 'except:', so that KeyboardInterrupt and
            # SystemExit still propagate and can shut down the process.
            self._log.exception('Error executing.')

    @abc.abstractmethod
    def execute(self, settings: dict):
        """Executes the command.

        :param settings: the settings for this command, as given to __call__().
        """

    def validate(self, settings: dict) -> str:
        """Validates the settings for this command.

        If there is an error, a description of the error is returned.
        If the settings are valid, None is returned.

        By default all settings are considered valid.

        :param settings: the settings to validate.
        """

        return None

    def update_activity(self, new_activity):
        """Sends a new activity to the manager.

        Not implemented yet; always raises NotImplementedError.
        """

        raise NotImplementedError()

    def upload_log(self, log):
        """Sends a new chunk of logs to the manager.

        Not implemented yet; always raises NotImplementedError.
        """

        raise NotImplementedError()

    def handle_output_line(self, line: str):
        """Handles a line of output, parsing it into activity & log.

        Not implemented yet; always raises NotImplementedError.
        """

        raise NotImplementedError()


@command_executor('echo')
class EchoCommand(AbstractCommand):
    """Command executor registered under the name 'echo'."""

    def execute(self, settings: dict):
        """Not implemented yet; always raises NotImplementedError."""

        raise NotImplementedError()


@attr.s
class TaskRunner:
    """Runs tasks, sending updates back to the worker."""

    shutdown_future = attr.ib(validator=attr.validators.instance_of(asyncio.Future))
    _log = attrs_extra.log('%s.TaskRunner' % __name__)

    async def execute(self, task: dict, fworker: worker.FlamencoWorker):
        """Executes a task on behalf of the given worker.

        Not implemented yet; always raises NotImplementedError.

        :param task: the task to execute.
        :param fworker: the worker the task is executed for.
        """

        raise NotImplementedError('Task execution not implemented yet.')
def shutdown(self):
    """Cancel the scheduled task-fetching task, if it is still pending.

    Nothing happens when no fetch task was scheduled, or when the scheduled
    one is already done.
    """

    pending_fetch = self.fetch_task_task
    if not pending_fetch or pending_fetch.done():
        return

    self._log.info('Cancelling task fetching task %s', pending_fetch)
    pending_fetch.cancel()
def send_task_update(self, task_id, new_activity_descr: str = None,
                     task_status: str = None):
    """Updates a task's status and activity description.

    Posts the update to '/tasks/<task_id>/update' on the manager. This is
    best-effort: when the update cannot be delivered, the failure is logged
    and the update is lost; no exception is raised for it.

    Mind the positional order: the activity description comes BEFORE the
    status. Passing both by keyword avoids mixing them up.

    :param task_id: ID of the task to update.
    :param new_activity_descr: new activity description; always included in
        the payload, also when None.
    :param task_status: new task status; only included in the payload when
        it is truthy.
    """

    import requests

    self._log.info('Updating task %s with new status %r and activity %r',
                   task_id, task_status, new_activity_descr)

    payload = {'activity_descr': new_activity_descr}
    if task_status:
        payload['task_status'] = task_status

    try:
        resp = self.manager.post('/tasks/%s/update' % task_id,
                                 json=payload,
                                 auth=(self.worker_id, self.worker_secret))
        resp.raise_for_status()
    except requests.RequestException as ex:
        # RequestException is the base class of HTTPError, so HTTP error
        # statuses are still handled here. Catching the base class also
        # covers transport failures (presumably what manager.post() raises
        # when the manager is unreachable -- confirm against
        # upstream.FlamencoManager).
        self._log.error('Unable to send status update to manager, update is lost: %s', ex)
    else:
        self._log.debug('Sent task %s update to manager', task_id)