Skip to content
Snippets Groups Projects
Commit 70f50ede authored by Milan Jaros's avatar Milan Jaros
Browse files

fix worker

parent 248ec155
No related branches found
No related tags found
No related merge requests found
"""Task runner."""
import asyncio
import attr
import os
import os.path
import time
from . import attrs_extra
from . import worker
@attr.s
class TaskRunner:
"""Runs tasks, sending updates back to the worker."""
shutdown_future = attr.ib(validator=attr.validators.instance_of(asyncio.Future))
last_command_idx = attr.ib(default=0, init=False)
_log = attrs_extra.log('%s.TaskRunner' % __name__)
def __attrs_post_init__(self):
self.current_command = None
def wait_storage_data(self, settings: dict, fworker: worker.FlamencoWorker):
wait_count=0
filepath = settings['filepath'] + '.run'
while not os.path.exists(filepath):
self._log.info('Wait %d s on %s.', wait_count * 10, filepath)
wait_count = wait_count + 1
if wait_count > 360:
raise ValueError('File %s does not exist' % (filepath))
time.sleep(10)
return
def check_storage_dir(self, settings: dict, fworker: worker.FlamencoWorker):
if not fworker.worker_storage_dir:
return
if not settings.get('filepath'):
return
(head, tail) = os.path.split(os.path.normpath(settings['filepath']))
(head, tail) = os.path.split(head)
settings['filepath'] = os.path.normpath(settings['filepath']).replace(head, os.path.normpath(fworker.worker_storage_dir))
return
def check_output_dir(self, settings: dict, fworker: worker.FlamencoWorker):
if not fworker.worker_output_dir:
return
if not settings.get('render_output'):
return
(head, tail) = os.path.split(os.path.normpath(settings['render_output']))
(head, tail) = os.path.split(head)
settings['render_output'] = os.path.normpath(settings['render_output']).replace(head, os.path.normpath(fworker.worker_output_dir))
return
async def execute(self, task: dict, fworker: worker.FlamencoWorker) -> bool:
"""Executes a task, returns True iff the entire task was run succesfully."""
from .commands import command_handlers
task_id = task['_id']
for cmd_idx, cmd_info in enumerate(task['commands']):
self.last_command_idx = cmd_idx
# Figure out the command name
cmd_name = cmd_info.get('name')
if not cmd_name:
raise ValueError('Command %i of task %s has no name' % (cmd_idx, task_id))
cmd_settings = cmd_info.get('settings')
self.check_storage_dir(cmd_settings, fworker)
self.check_output_dir(cmd_settings, fworker)
self.wait_storage_data(cmd_settings, fworker)
if cmd_settings is None or not isinstance(cmd_settings, dict):
raise ValueError('Command %i of task %s has malformed settings %r' %
(cmd_idx, task_id, cmd_settings))
# Find the handler class
try:
cmd_cls = command_handlers[cmd_name]
except KeyError:
raise ValueError('Command %i of task %s has unknown command name %r' %
(cmd_idx, task_id, cmd_name))
# Construct & execute the handler.
cmd = cmd_cls(
worker=fworker,
task_id=task_id,
command_idx=cmd_idx,
)
self.current_command = cmd
success = await cmd.run(cmd_settings)
if not success:
self._log.warning('Command %i of task %s was not succesful, aborting task.',
cmd_idx, task_id)
return False
self._log.info('Task %s completed succesfully.', task_id)
self.current_command = None
return True
async def abort_current_task(self):
"""Aborts the current task by aborting the currently running command.
Asynchronous, because a subprocess has to be wait()ed upon before returning.
"""
if self.current_command is None:
self._log.info('abort_current_task: no command running, nothing to abort.')
return
self._log.warning('abort_current_task: Aborting command %s', self.current_command)
await self.current_command.abort()
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment