Skip to content
Snippets Groups Projects
Commit 1420660c authored by Milan Jaros's avatar Milan Jaros
Browse files

fix exr

parent 25f533b8
No related branches found
Tags
No related merge requests found
"""Task runner."""
import asyncio
import attr
import os
import os.path
import time
from . import attrs_extra
from . import worker
@attr.s
class TaskRunner:
"""Runs tasks, sending updates back to the worker."""
shutdown_future = attr.ib(validator=attr.validators.instance_of(asyncio.Future))
last_command_idx = attr.ib(default=0, init=False)
_log = attrs_extra.log('%s.TaskRunner' % __name__)
def __attrs_post_init__(self):
self.current_command = None
def wait_storage_data(self, settings: dict, fworker: worker.FlamencoWorker):
if not settings.get('filepath'):
return
wait_count=0
filepath = settings['filepath'] + '.run'
while not os.path.exists(filepath):
self._log.info('Wait %d s on %s.', wait_count * 10, filepath)
wait_count = wait_count + 1
if wait_count > 360:
raise ValueError('File %s does not exist' % (filepath))
time.sleep(10)
return
def check_blender_cmd(self, settings: dict, fworker: worker.FlamencoWorker):
if not fworker.worker_blender_cmd:
return
if not settings.get('blender_cmd'):
return
settings['blender_cmd'] = fworker.worker_blender_cmd
return
def check_storage_dir(self, settings: dict, fworker: worker.FlamencoWorker):
if not fworker.worker_dir:
return
if not settings.get('filepath'):
return
#(head, tail) = os.path.split(os.path.normpath(settings['filepath']))
#(head, tail) = os.path.split(head)
#settings['filepath'] = os.path.normpath(settings['filepath']).replace(head, os.path.normpath(fworker.worker_storage_dir))
settings['filepath'] = os.path.normpath(fworker.worker_dir + '/' + settings['filepath'])
return
def check_output_dir(self, settings: dict, fworker: worker.FlamencoWorker):
if not fworker.worker_dir:
return
if not settings.get('render_output'):
return
#(head, tail) = os.path.split(os.path.normpath(settings['render_output']))
#(head, tail) = os.path.split(head)
#settings['render_output'] = os.path.normpath(settings['render_output']).replace(head, os.path.normpath(fworker.worker_output_dir))
settings['render_output'] = os.path.normpath(fworker.worker_dir + '/' + settings['render_output'])
return
def check_src_dir(self, settings: dict, fworker: worker.FlamencoWorker):
if not fworker.worker_dir:
return
if not settings.get('src'):
return
#(head, tail) = os.path.split(os.path.normpath(settings['src']))
settings['src'] = os.path.normpath(fworker.worker_dir + '/' + settings['src']) #.replace(head, os.path.normpath(fworker.worker_output_dir))
return
def check_dest_dir(self, settings: dict, fworker: worker.FlamencoWorker):
if not fworker.worker_dir:
return
if not settings.get('dest'):
return
#(head, tail) = os.path.split(os.path.normpath(settings['dest']))
settings['dest'] = os.path.normpath(fworker.worker_dir + '/' + settings['dest']) #.replace(head, os.path.normpath(fworker.worker_output_dir))
return
async def execute(self, task: dict, fworker: worker.FlamencoWorker) -> bool:
"""Executes a task, returns True iff the entire task was run succesfully."""
from .commands import command_handlers
task_id = task['_id']
for cmd_idx, cmd_info in enumerate(task['commands']):
self.last_command_idx = cmd_idx
# Figure out the command name
cmd_name = cmd_info.get('name')
if not cmd_name:
raise ValueError('Command %i of task %s has no name' % (cmd_idx, task_id))
cmd_settings = cmd_info.get('settings')
print(cmd_settings)
self.check_blender_cmd(cmd_settings, fworker)
self.check_storage_dir(cmd_settings, fworker)
self.check_output_dir(cmd_settings, fworker)
self.check_src_dir(cmd_settings, fworker)
self.check_dest_dir(cmd_settings, fworker)
self.wait_storage_data(cmd_settings, fworker)
if cmd_settings is None or not isinstance(cmd_settings, dict):
raise ValueError('Command %i of task %s has malformed settings %r' %
(cmd_idx, task_id, cmd_settings))
# Find the handler class
try:
cmd_cls = command_handlers[cmd_name]
except KeyError:
raise ValueError('Command %i of task %s has unknown command name %r' %
(cmd_idx, task_id, cmd_name))
# Construct & execute the handler.
cmd = cmd_cls(
worker=fworker,
task_id=task_id,
command_idx=cmd_idx,
)
self.current_command = cmd
success = await cmd.run(cmd_settings)
if not success:
self._log.warning('Command %i of task %s was not succesful, aborting task.',
cmd_idx, task_id)
return False
self._log.info('Task %s completed succesfully.', task_id)
self.current_command = None
return True
async def abort_current_task(self):
"""Aborts the current task by aborting the currently running command.
Asynchronous, because a subprocess has to be wait()ed upon before returning.
"""
if self.current_command is None:
self._log.info('abort_current_task: no command running, nothing to abort.')
return
self._log.warning('abort_current_task: Aborting command %s', self.current_command)
await self.current_command.abort()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment