Reviewed revision of the Flamenco worker patch (documents.py, runner.py,
upstream.py, worker.py).

The incoming copy had its newlines and indentation collapsed. Line breaks,
blank lines and indentation below are reconstructed so that every hunk matches
its line counts; verify context-line whitespace against the target tree before
applying. The index hashes are those of the original patch.

Changes made to the added lines:
  runner.py
    * AbstractCommand.run(): report "Completed" via register_log(), not
      register_error().
    * AbstractCommand.run(): the "starting" activity reports 0% command
      progress instead of 100%.
    * EchoCommand.execute(): await the register_log() coroutine.
    * Stale "Set by __call__()" comment now refers to run().
    * Spelling: "successful(ly)".
  worker.py
    * last_log_push / last_activity_push: use attr.Factory so the timestamp
      is taken per instance, not once at class-definition time.
    * fetch_task(): "except Exception:" instead of a bare "except:", and lazy
      logger arguments.
    * register_task_update(): task_status=None keeps the current status.
    * push_to_master(): docstring describes what is sent.

NOTE(review): run() calls self.update_task(), which none of the visible hunks
define, and the base register_log() raises NotImplementedError. Confirm both
are provided elsewhere before relying on run().

diff --git a/packages/flamenco-worker-python/flamenco_worker/documents.py b/packages/flamenco-worker-python/flamenco_worker/documents.py
index 633e7e9db619c8fe6cc34b51df4f0514173a420c..b5629672b49569f5b0ad6f8530b5bb69555b99cd 100644
--- a/packages/flamenco-worker-python/flamenco_worker/documents.py
+++ b/packages/flamenco-worker-python/flamenco_worker/documents.py
@@ -7,8 +7,7 @@ import attr
 class Activity:
     """Activity on a task."""
 
-    description = attr.ib(validator=attr.validators.instance_of(str))
-    current_cmd_name = attr.ib(validator=attr.validators.instance_of(str))
-    percentage_complete_task = attr.ib(validator=attr.validators.instance_of(int))
-    percentage_complete_command = attr.ib(validator=attr.validators.instance_of(int))
-
+    activity = attr.ib(default='', validator=attr.validators.instance_of(str))
+    current_command_idx = attr.ib(default=0, validator=attr.validators.instance_of(int))
+    task_progress_percentage = attr.ib(default=0, validator=attr.validators.instance_of(int))
+    command_progress_percentage = attr.ib(default=0, validator=attr.validators.instance_of(int))
diff --git a/packages/flamenco-worker-python/flamenco_worker/runner.py b/packages/flamenco-worker-python/flamenco_worker/runner.py
index 076f67c3036274d97ebfb5a38186585b23cd6372..74fd348a9ab308530f900c363e23a83dfd7c852b 100644
--- a/packages/flamenco-worker-python/flamenco_worker/runner.py
+++ b/packages/flamenco-worker-python/flamenco_worker/runner.py
@@ -7,24 +7,12 @@ import logging
 import attr
 
 from . import attrs_extra
+from . import documents
 from . import worker
 
 command_handlers = {}
 
 
-def command_executor(cmdname):
-    """Class decorator, registers a command executor."""
-
-    def decorator(cls):
-        assert cmdname not in command_handlers
-
-        command_handlers[cmdname] = cls
-        cls.command_name = cmdname
-        return cls
-
-    return decorator
-
-
 @attr.s
 class AbstractCommand(metaclass=abc.ABCMeta):
     worker = attr.ib(validator=attr.validators.instance_of(worker.FlamencoWorker))
@@ -32,14 +20,18 @@ class AbstractCommand(metaclass=abc.ABCMeta):
     command_idx = attr.ib(validator=attr.validators.instance_of(int))
 
     # Set by @command_executor
-    command_name = attr.ib(default=None, init=False, validator=attr.validators.instance_of(str))
+    command_name = ''
 
-    # Set by __call__()
-    identifier = attr.ib(default=None, init=False, validator=attr.validators.instance_of(str))
+    # Set by run()
+    identifier = attr.ib(default=None, init=False,
+                         validator=attr.validators.optional(attr.validators.instance_of(str)))
     _log = None
 
-    def __call__(self, settings: dict):
-        """Runs the command, parsing output and sending it back to the worker."""
+    async def run(self, settings: dict) -> bool:
+        """Runs the command, parsing output and sending it back to the worker.
+
+        Returns True when the command was successful, and False otherwise.
+        """
 
         self.identifier = '%s(task_id=%s, command_idx=%s)' % (
             self.__class__.__name__,
@@ -50,16 +42,37 @@ class AbstractCommand(metaclass=abc.ABCMeta):
         verr = self.validate(settings)
         if verr is not None:
             self._log.warning('Invalid settings: %s', verr)
-            # worker.command_error(self.command_name, verr)
-            return
+            await self.register_error('%s: Error in settings: %s' % (self.identifier, verr))
+            return False
+
+        await self.register_log('%s: Starting' % self.identifier)
+        await self.update_task(documents.Activity(
+            activity='starting %s' % self.identifier,
+            current_command_idx=self.command_idx,
+            task_progress_percentage=-1,
+            command_progress_percentage=0
+        ))
 
         try:
-            self.execute(settings)
-        except:
+            await self.execute(settings)
+            await self.register_log('%s: Completed' % self.identifier)
+        except Exception as ex:
             self._log.exception('Error executing.')
+            await self.register_error('%s: Error executing: %s' % (self.identifier, ex))
+            return False
+
+        await self.register_log('%s: Finished' % self.identifier)
+        await self.update_task(documents.Activity(
+            activity='finished %s' % self.identifier,
+            current_command_idx=self.command_idx,
+            task_progress_percentage=-1,
+            command_progress_percentage=100
+        ))
+
+        return True
 
     @abc.abstractmethod
-    def execute(self, settings: dict):
+    async def execute(self, settings: dict):
         """Executes the command."""
 
     def validate(self, settings: dict) -> str:
@@ -73,26 +86,47 @@ class AbstractCommand(metaclass=abc.ABCMeta):
 
         return None
 
-    def update_activity(self, new_activity):
-        """Sends a new activity to the manager."""
+    async def register_log(self, log):
+        """Sends a new log to the worker."""
 
         raise NotImplementedError()
 
-    def upload_log(self, log):
-        """Sends a new chunk of logs to the manager."""
+    async def handle_output_line(self, line: str):
+        """Handles a line of output, parsing it into activity & log."""
 
         raise NotImplementedError()
 
-    def handle_output_line(self, line: str):
-        """Handles a line of output, parsing it into activity & log."""
+    async def register_error(self, log_line: str):
+        # TODO: implement this
+        self._log.debug('TODO: send this to worker: %r', log_line)
 
-        raise NotImplementedError()
+
+def command_executor(cmdname):
+    """Class decorator, registers a command executor."""
+
+    def decorator(cls):
+        assert cmdname not in command_handlers
+
+        command_handlers[cmdname] = cls
+        cls.command_name = cmdname
+        return cls
+
+    return decorator
 
 
 @command_executor('echo')
 class EchoCommand(AbstractCommand):
-    def execute(self, settings: dict):
-        raise NotImplementedError()
+    def validate(self, settings: dict):
+        try:
+            msg = settings['message']
+        except KeyError:
+            return 'Missing "message"'
+
+        if not isinstance(msg, str):
+            return 'Message must be a string'
+
+    async def execute(self, settings: dict):
+        await self.register_log(settings['message'])
 
 
 @attr.s
@@ -100,7 +134,47 @@ class TaskRunner:
     """Runs tasks, sending updates back to the worker."""
 
     shutdown_future = attr.ib(validator=attr.validators.instance_of(asyncio.Future))
+    last_command_idx = attr.ib(default=0, init=False)
+
     _log = attrs_extra.log('%s.TaskRunner' % __name__)
 
-    async def execute(self, task: dict, fworker: worker.FlamencoWorker):
-        raise NotImplementedError('Task execution not implemented yet.')
+    async def execute(self, task: dict, fworker: worker.FlamencoWorker) -> bool:
+        """Executes a task, returns True iff the entire task was run successfully."""
+
+        task_id = task['_id']
+
+        for cmd_idx, cmd_info in enumerate(task['commands']):
+            self.last_command_idx = cmd_idx
+
+            # Figure out the command name
+            cmd_name = cmd_info.get('name')
+            if not cmd_name:
+                raise ValueError('Command %i of task %s has no name' % (cmd_idx, task_id))
+
+            cmd_settings = cmd_info.get('settings')
+            if cmd_settings is None or not isinstance(cmd_settings, dict):
+                raise ValueError('Command %i of task %s has malformed settings %r' %
+                                 (cmd_idx, task_id, cmd_settings))
+
+            # Find the handler class
+            try:
+                cmd_cls = command_handlers[cmd_name]
+            except KeyError:
+                raise ValueError('Command %i of task %s has unknown command name %r' %
+                                 (cmd_idx, task_id, cmd_name))
+
+            # Construct & execute the handler.
+            cmd = cmd_cls(
+                worker=fworker,
+                task_id=task_id,
+                command_idx=cmd_idx,
+            )
+            success = await cmd.run(cmd_settings)
+
+            if not success:
+                self._log.warning('Command %i of task %s was not successful, aborting task.',
+                                  cmd_idx, task_id)
+                return False
+
+        self._log.info('Task %s completed successfully.', task_id)
+        return True
diff --git a/packages/flamenco-worker-python/flamenco_worker/upstream.py b/packages/flamenco-worker-python/flamenco_worker/upstream.py
index 4f4fa64a8fa571444eb403fb827766a69769c7e8..c2a8fce20a171b440f9c238e9b58f251b488ad4a 100644
--- a/packages/flamenco-worker-python/flamenco_worker/upstream.py
+++ b/packages/flamenco-worker-python/flamenco_worker/upstream.py
@@ -58,7 +58,7 @@ class FlamencoManager:
             self.session.mount(self.manager_url, HTTPAdapter(max_retries=HTTP_RETRY_COUNT))
 
         abs_url = urllib.parse.urljoin(self.manager_url, url)
-        self._log.debug('%s %s', method, abs_url)
+        self._log.debug('%s %s JSON: %s', method, abs_url, json)
 
         resp = self.session.request(
             method, abs_url,
diff --git a/packages/flamenco-worker-python/flamenco_worker/worker.py b/packages/flamenco-worker-python/flamenco_worker/worker.py
index fd1e52022ce6408b4919f0fd012f69c060e19be9..ad04bbe6097ef9d36d69b0c555f11a49bea03a40 100644
--- a/packages/flamenco-worker-python/flamenco_worker/worker.py
+++ b/packages/flamenco-worker-python/flamenco_worker/worker.py
@@ -1,14 +1,20 @@
 import asyncio
+import datetime
 
 import attr
 
 from . import attrs_extra
+from . import documents
 from . import upstream
 
 # All durations/delays/etc are in seconds.
 FETCH_TASK_FAILED_RETRY_DELAY = 10  # when we failed obtaining a task
 FETCH_TASK_EMPTY_RETRY_DELAY = 5  # when there are no tasks to perform
 
+PUSH_LOG_MAX_ENTRIES = 10
+PUSH_LOG_MAX_INTERVAL = datetime.timedelta(seconds=5)
+PUSH_ACT_MAX_INTERVAL = datetime.timedelta(seconds=10)
+
 
 @attr.s
 class FlamencoWorker:
@@ -23,10 +29,29 @@ class FlamencoWorker:
         validator=attr.validators.optional(attr.validators.instance_of(asyncio.Future)))
 
     fetch_task_task = attr.ib(
-        default=None,
-        init=False,
+        default=None, init=False,
         validator=attr.validators.optional(attr.validators.instance_of(asyncio.Task)))
 
+    task_id = attr.ib(
+        default=None, init=False,
+        validator=attr.validators.optional(attr.validators.instance_of(str))
+    )
+    current_task_status = attr.ib(
+        default=None, init=False,
+        validator=attr.validators.optional(attr.validators.instance_of(str))
+    )
+    queued_log_entries = attr.ib(default=attr.Factory(list), init=False)
+    last_log_push = attr.ib(
+        default=attr.Factory(datetime.datetime.now),
+        validator=attr.validators.optional(attr.validators.instance_of(datetime.datetime)))
+    last_activity_push = attr.ib(
+        default=attr.Factory(datetime.datetime.now),
+        validator=attr.validators.optional(attr.validators.instance_of(datetime.datetime)))
+
+    # Kept in sync with the task updates we send to upstream Master, so that we can send
+    # a complete Activity each time.
+    last_task_activity = attr.ib(default=attr.Factory(documents.Activity))
+
     _log = attrs_extra.log('%s.FlamencoWorker' % __name__)
 
     def startup(self):
@@ -82,9 +107,6 @@ class FlamencoWorker:
         :param delay: delay in seconds, after which the task fetch will be performed.
         """
 
-        if self.fetch_task_task:
-            self.fetch_task_task.cancel()
-
         self.fetch_task_task = asyncio.ensure_future(self.fetch_task(delay), loop=self.loop)
 
     def shutdown(self):
@@ -100,6 +122,7 @@ class FlamencoWorker:
         :param delay: waits this many seconds before fetching a task.
         """
 
+        import traceback
         import requests
 
         self._log.debug('Going to fetch task in %s seconds', delay)
@@ -129,45 +152,100 @@ class FlamencoWorker:
             return
 
         task_info = resp.json()
-        task_id = task_info['_id']
-        self._log.info('Received task: %s', task_id)
+        self.task_id = task_info['_id']
+        self._log.info('Received task: %s', self.task_id)
         self._log.debug('Received task: %s', task_info)
 
         try:
             await self.trunner.execute(task_info, self)
         except Exception as ex:
-            self._log.exception('Uncaught exception executing task %s' % task_id)
-            self.send_task_update(
-                task_id,
-                'failed',
-                'Uncaught exception: %s' % ex
-            )
+            self._log.exception('Uncaught exception executing task %s', self.task_id)
+            try:
+                self.queued_log_entries.append(traceback.format_exc())
+                self.register_task_update(
+                    task_status='failed',
+                    activity='Uncaught exception: %s %s' % (type(ex).__name__, ex),
+                )
+            except Exception:
+                self._log.exception('While notifying manager of failure, another error happened.')
         finally:
-            # Always schedule a new task run.
-            self.schedule_fetch_task(0)
+            # Always schedule a new task run; after a little delay to not hammer the world
+            # when we're in some infinite failure loop.
+            self.schedule_fetch_task(3)
 
-    def send_task_update(self, task_id, new_activity_descr: str = None,
-                         task_status: str = None):
-        """Updates a task's status and activity description."""
+    def push_to_master(self):
+        """Sends the task status, last activity, and any queued log entries to the manager.
+        """
 
         import requests
 
-        self._log.info('Updating task %s with new status %r and activity %r',
-                       task_id, task_status, new_activity_descr)
+        self._log.info('Updating task %s with status %r and activity %r',
+                       self.task_id, self.current_task_status, self.last_task_activity)
+
+        payload = attr.asdict(self.last_task_activity)
+        payload['task_status'] = self.current_task_status
 
-        payload = {'activity_descr': new_activity_descr}
-        if task_status:
-            payload['task_status'] = task_status
+        now = datetime.datetime.now()
+        self.last_activity_push = now
 
-        resp = self.manager.post('/tasks/%s/update' % task_id,
+        if self.queued_log_entries:
+            payload['log'] = '\n'.join(self.queued_log_entries)
+            self.queued_log_entries.clear()
+            self.last_log_push = now
+
+        resp = self.manager.post('/tasks/%s/update' % self.task_id,
                                  json=payload,
                                  auth=(self.worker_id, self.worker_secret))
-        self._log.debug('Sent task %s update to manager', task_id)
+        self._log.debug('Sent task %s update to manager', self.task_id)
         try:
             resp.raise_for_status()
         except requests.HTTPError as ex:
             self._log.error('Unable to send status update to manager, update is lost: %s', ex)
 
+    def register_task_update(self, *,
+                             task_status: str = None,
+                             **kwargs):
+        """Stores the task status and activity, and possibly sends to Flamenco Master.
+
+        If the last update to Master was long enough ago, or the task status changed,
+        the info is sent to Master. This way we can update command progress percentage
+        hundreds of times per second, without worrying about network overhead.
+
+        :param task_status: the new task status; None keeps the current status.
+        :param kwargs: Activity fields to update on the last-known task activity.
+        """
+
+        # Update the current activity
+        for key, value in kwargs.items():
+            setattr(self.last_task_activity, key, value)
+
+        # Only an explicitly passed status counts as a change; otherwise an
+        # activity-only update would reset the stored status to None.
+        task_status_changed = task_status not in (None, self.current_task_status)
+
+        if task_status_changed:
+            self.current_task_status = task_status
+            self._log.info('Task changed status to %s, pushing to master', task_status)
+            self.push_to_master()
+        elif datetime.datetime.now() - self.last_activity_push > PUSH_ACT_MAX_INTERVAL:
+            self._log.info('More than %s since last activity update, pushing to master',
+                           PUSH_ACT_MAX_INTERVAL)
+            self.push_to_master()
+
+    def register_log(self, log_entry):
+        """Registers a log entry, and possibly sends all queued log entries to upstream Master."""
+
+        self.queued_log_entries.append(log_entry)
+
+        if len(self.queued_log_entries) > PUSH_LOG_MAX_ENTRIES:
+            self._log.info('Queued up more than %i log entries, pushing to master',
+                           PUSH_LOG_MAX_ENTRIES)
+            self.push_to_master()
+        elif datetime.datetime.now() - self.last_log_push > PUSH_LOG_MAX_INTERVAL:
+            self._log.info('More than %s since last log update, pushing to master',
+                           PUSH_LOG_MAX_INTERVAL)
+            self.push_to_master()
+
 
 def generate_secret() -> str:
     """Generates a 64-character secret key."""