diff --git a/flamenco_worker/runner.py b/flamenco_worker/runner.py index ff2aa72cab988ac44ae4a4c0661e8bd02f727807..926f6afac900f2073e3516521b06e60911dbce6c 100644 --- a/flamenco_worker/runner.py +++ b/flamenco_worker/runner.py @@ -77,6 +77,30 @@ class TaskRunner: settings['render_output'] = os.path.normpath(settings['render_output']).replace(head, os.path.normpath(fworker.worker_output_dir)) return + def check_src_dir(self, settings: dict, fworker: worker.FlamencoWorker): + if not fworker.worker_output_dir: + return + + if not settings.get('src'): + return + + (head, tail) = os.path.split(os.path.normpath(settings['src'])) + + settings['src'] = os.path.normpath(settings['src']).replace(head, os.path.normpath(fworker.worker_output_dir)) + return + + def check_dest_dir(self, settings: dict, fworker: worker.FlamencoWorker): + if not fworker.worker_output_dir: + return + + if not settings.get('dest'): + return + + (head, tail) = os.path.split(os.path.normpath(settings['dest'])) + + settings['dest'] = os.path.normpath(settings['dest']).replace(head, os.path.normpath(fworker.worker_output_dir)) + return + async def execute(self, task: dict, fworker: worker.FlamencoWorker) -> bool: """Executes a task, returns True iff the entire task was run succesfully.""" @@ -97,6 +121,8 @@ class TaskRunner: self.check_blender_cmd(cmd_settings, fworker) self.check_storage_dir(cmd_settings, fworker) self.check_output_dir(cmd_settings, fworker) + self.check_src_dir(cmd_settings, fworker) + self.check_dest_dir(cmd_settings, fworker) self.wait_storage_data(cmd_settings, fworker) if cmd_settings is None or not isinstance(cmd_settings, dict): diff --git a/flamenco_worker/runner.py~ b/flamenco_worker/runner.py~ new file mode 100644 index 0000000000000000000000000000000000000000..103d8fd47c490a9fcbd88c107a1ad3117768d340 --- /dev/null +++ b/flamenco_worker/runner.py~ @@ -0,0 +1,172 @@ +"""Task runner.""" + +import asyncio + +import attr +import os + +import os.path +import time + +from . import attrs_extra +from . import worker + + +@attr.s +class TaskRunner: + """Runs tasks, sending updates back to the worker.""" + + shutdown_future = attr.ib(validator=attr.validators.instance_of(asyncio.Future)) + last_command_idx = attr.ib(default=0, init=False) + + _log = attrs_extra.log('%s.TaskRunner' % __name__) + + def __attrs_post_init__(self): + self.current_command = None + + def wait_storage_data(self, settings: dict, fworker: worker.FlamencoWorker): + if not settings.get('filepath'): + return + + wait_count=0 + filepath = settings['filepath'] + '.run' + + while not os.path.exists(filepath): + self._log.info('Wait %d s on %s.', wait_count * 10, filepath) + + wait_count = wait_count + 1 + if wait_count > 360: + raise ValueError('File %s does not exist' % (filepath)) + + time.sleep(10) + return + + def check_blender_cmd(self, settings: dict, fworker: worker.FlamencoWorker): + if not fworker.worker_blender_cmd: + return + + if not settings.get('blender_cmd'): + return + + settings['blender_cmd'] = fworker.worker_blender_cmd + return + + def check_storage_dir(self, settings: dict, fworker: worker.FlamencoWorker): + if not fworker.worker_storage_dir: + return + + if not settings.get('filepath'): + return + + (head, tail) = os.path.split(os.path.normpath(settings['filepath'])) + (head, tail) = os.path.split(head) + + settings['filepath'] = os.path.normpath(settings['filepath']).replace(head, os.path.normpath(fworker.worker_storage_dir)) + return + + def check_output_dir(self, settings: dict, fworker: worker.FlamencoWorker): + if not fworker.worker_output_dir: + return + + if not settings.get('render_output'): + return + + (head, tail) = os.path.split(os.path.normpath(settings['render_output'])) + (head, tail) = os.path.split(head) + + settings['render_output'] = os.path.normpath(settings['render_output']).replace(head, os.path.normpath(fworker.worker_output_dir)) + return + + def check_src_dir(self, settings: dict, fworker: worker.FlamencoWorker): + if not fworker.worker_output_dir: + return + + if not settings.get('src'): + return + + (head, tail) = os.path.split(os.path.normpath(settings['src'])) + (head, tail) = os.path.split(head) + (head, tail) = os.path.split(head) + + settings['src'] = os.path.normpath(settings['src']).replace(head, os.path.normpath(fworker.worker_output_dir)) + return + + def check_dest_dir(self, settings: dict, fworker: worker.FlamencoWorker): + if not fworker.worker_output_dir: + return + + if not settings.get('dest'): + return + + (head, tail) = os.path.split(os.path.normpath(settings['dest'])) + (head, tail) = os.path.split(head) + (head, tail) = os.path.split(head) + + settings['dest'] = os.path.normpath(settings['dest']).replace(head, os.path.normpath(fworker.worker_output_dir)) + return + + async def execute(self, task: dict, fworker: worker.FlamencoWorker) -> bool: + """Executes a task, returns True iff the entire task was run succesfully.""" + + from .commands import command_handlers + + task_id = task['_id'] + + for cmd_idx, cmd_info in enumerate(task['commands']): + self.last_command_idx = cmd_idx + + # Figure out the command name + cmd_name = cmd_info.get('name') + if not cmd_name: + raise ValueError('Command %i of task %s has no name' % (cmd_idx, task_id)) + + cmd_settings = cmd_info.get('settings') + print(cmd_settings) + self.check_blender_cmd(cmd_settings, fworker) + self.check_storage_dir(cmd_settings, fworker) + self.check_output_dir(cmd_settings, fworker) + self.check_src_dir(cmd_settings, fworker) + self.check_dest_dir(cmd_settings, fworker) + self.wait_storage_data(cmd_settings, fworker) + + if cmd_settings is None or not isinstance(cmd_settings, dict): + raise ValueError('Command %i of task %s has malformed settings %r' % + (cmd_idx, task_id, cmd_settings)) + + # Find the handler class + try: + cmd_cls = command_handlers[cmd_name] + except KeyError: + raise ValueError('Command %i of task %s has unknown command name %r' % + (cmd_idx, task_id, cmd_name)) + + # Construct & execute the handler. + cmd = cmd_cls( + worker=fworker, + task_id=task_id, + command_idx=cmd_idx, + ) + self.current_command = cmd + success = await cmd.run(cmd_settings) + + if not success: + self._log.warning('Command %i of task %s was not succesful, aborting task.', + cmd_idx, task_id) + return False + + self._log.info('Task %s completed succesfully.', task_id) + self.current_command = None + return True + + async def abort_current_task(self): + """Aborts the current task by aborting the currently running command. + + Asynchronous, because a subprocess has to be wait()ed upon before returning. + """ + + if self.current_command is None: + self._log.info('abort_current_task: no command running, nothing to abort.') + return + + self._log.warning('abort_current_task: Aborting command %s', self.current_command) + await self.current_command.abort()