diff --git a/flamenco_worker/runner.py~ b/flamenco_worker/runner.py~ deleted file mode 100644 index 10c45588e9966f0ca4f29dca3a4174c1385a828d..0000000000000000000000000000000000000000 --- a/flamenco_worker/runner.py~ +++ /dev/null @@ -1,171 +0,0 @@ -"""Task runner.""" - -import asyncio - -import attr -import os - -import os.path -import time - -from . import attrs_extra -from . import worker - - -@attr.s -class TaskRunner: - """Runs tasks, sending updates back to the worker.""" - - shutdown_future = attr.ib(validator=attr.validators.instance_of(asyncio.Future)) - last_command_idx = attr.ib(default=0, init=False) - - _log = attrs_extra.log('%s.TaskRunner' % __name__) - - def __attrs_post_init__(self): - self.current_command = None - - def wait_storage_data(self, settings: dict, fworker: worker.FlamencoWorker): - if not settings.get('filepath'): - return - - wait_count=0 - filepath = settings['filepath'] + '.run' - - while not os.path.exists(filepath): - self._log.info('Wait %d s on %s.', wait_count * 10, filepath) - - wait_count = wait_count + 1 - if wait_count > 360: - raise ValueError('File %s does not exist' % (filepath)) - - time.sleep(10) - return - - def check_blender_cmd(self, settings: dict, fworker: worker.FlamencoWorker): - if not fworker.worker_blender_cmd: - return - - if not settings.get('blender_cmd'): - return - - settings['blender_cmd'] = fworker.worker_blender_cmd - return - - def check_storage_dir(self, settings: dict, fworker: worker.FlamencoWorker): - if not fworker.worker_dir: - return - - if not settings.get('filepath'): - return - - #(head, tail) = os.path.split(os.path.normpath(settings['filepath'])) - #(head, tail) = os.path.split(head) - - #settings['filepath'] = os.path.normpath(settings['filepath']).replace(head, os.path.normpath(fworker.worker_storage_dir)) - settings['filepath'] = os.path.normpath(fworker.worker_dir + '/' + settings['filepath']) - return - - def check_output_dir(self, settings: dict, fworker: worker.FlamencoWorker): - if not fworker.worker_dir: - return - - if not settings.get('render_output'): - return - - #(head, tail) = os.path.split(os.path.normpath(settings['render_output'])) - #(head, tail) = os.path.split(head) - - #settings['render_output'] = os.path.normpath(settings['render_output']).replace(head, os.path.normpath(fworker.worker_output_dir)) - settings['render_output'] = os.path.normpath(fworker.worker_dir + '/' + settings['render_output']) - - return - - def check_src_dir(self, settings: dict, fworker: worker.FlamencoWorker): - if not fworker.worker_dir: - return - - if not settings.get('src'): - return - - #(head, tail) = os.path.split(os.path.normpath(settings['src'])) - - settings['src'] = os.path.normpath(fworker.worker_dir + '/' + settings['src']) #.replace(head, os.path.normpath(fworker.worker_output_dir)) - return - - def check_dest_dir(self, settings: dict, fworker: worker.FlamencoWorker): - if not fworker.worker_dir: - return - - if not settings.get('dest'): - return - - #(head, tail) = os.path.split(os.path.normpath(settings['dest'])) - - settings['dest'] = os.path.normpath(fworker.worker_dir + '/' + settings['dest']) #.replace(head, os.path.normpath(fworker.worker_output_dir)) - return - - async def execute(self, task: dict, fworker: worker.FlamencoWorker) -> bool: - """Executes a task, returns True iff the entire task was run succesfully.""" - - from .commands import command_handlers - - task_id = task['_id'] - - for cmd_idx, cmd_info in enumerate(task['commands']): - self.last_command_idx = cmd_idx - - # Figure out the command name - cmd_name = cmd_info.get('name') - if not cmd_name: - raise ValueError('Command %i of task %s has no name' % (cmd_idx, task_id)) - - cmd_settings = cmd_info.get('settings') - print(cmd_settings) - self.check_blender_cmd(cmd_settings, fworker) - self.check_storage_dir(cmd_settings, fworker) - self.check_output_dir(cmd_settings, fworker) - self.check_src_dir(cmd_settings, fworker) - self.check_dest_dir(cmd_settings, fworker) - self.wait_storage_data(cmd_settings, fworker) - - if cmd_settings is None or not isinstance(cmd_settings, dict): - raise ValueError('Command %i of task %s has malformed settings %r' % - (cmd_idx, task_id, cmd_settings)) - - # Find the handler class - try: - cmd_cls = command_handlers[cmd_name] - except KeyError: - raise ValueError('Command %i of task %s has unknown command name %r' % - (cmd_idx, task_id, cmd_name)) - - # Construct & execute the handler. - cmd = cmd_cls( - worker=fworker, - task_id=task_id, - command_idx=cmd_idx, - ) - self.current_command = cmd - success = await cmd.run(cmd_settings) - - if not success: - self._log.warning('Command %i of task %s was not succesful, aborting task.', - cmd_idx, task_id) - return False - - self._log.info('Task %s completed succesfully.', task_id) - self.current_command = None - return True - - async def abort_current_task(self): - """Aborts the current task by aborting the currently running command. - - Asynchronous, because a subprocess has to be wait()ed upon before returning. - """ - - if self.current_command is None: - self._log.info('abort_current_task: no command running, nothing to abort.') - return - - self._log.warning('abort_current_task: Aborting command %s', self.current_command) - await self.current_command.abort()