Skip to content
Snippets Groups Projects
Commit 248ec155 authored by Milan Jaros's avatar Milan Jaros
Browse files

fix worker

parent f532c8bb
Branches
No related tags found
No related merge requests found
...@@ -12,6 +12,7 @@ push_log_max_entries = 200 ...@@ -12,6 +12,7 @@ push_log_max_entries = 200
push_act_max_interval_seconds = 10 push_act_max_interval_seconds = 10
worker_storage_dir = /home/milanjaros/work/temp/public/flamenco/render/storage worker_storage_dir = /home/milanjaros/work/temp/public/flamenco/render/storage
worker_output_dir = /home/milanjaros/work/temp/public/flamenco/render/output worker_output_dir = /home/milanjaros/work/temp/public/flamenco/render/output
worker_blender_cmd = /home/milanjaros/work/temp/public/flamenco/blender/run_icc_mpi.sh
[loggers] [loggers]
keys = root,flamenco_worker keys = root,flamenco_worker
......
...@@ -115,6 +115,7 @@ def main(): ...@@ -115,6 +115,7 @@ def main():
push_act_max_interval=confparser.interval_secs('push_act_max_interval_seconds'), push_act_max_interval=confparser.interval_secs('push_act_max_interval_seconds'),
worker_storage_dir=confparser.value('worker_storage_dir'), worker_storage_dir=confparser.value('worker_storage_dir'),
worker_output_dir=confparser.value('worker_output_dir'), worker_output_dir=confparser.value('worker_output_dir'),
worker_blender_cmd=confparser.value('worker_blender_cmd'),
initial_state='testing' if args.test else 'awake', initial_state='testing' if args.test else 'awake',
run_single_task=args.single, run_single_task=args.single,
) )
......
...@@ -25,6 +25,9 @@ class TaskRunner: ...@@ -25,6 +25,9 @@ class TaskRunner:
self.current_command = None self.current_command = None
def wait_storage_data(self, settings: dict, fworker: worker.FlamencoWorker): def wait_storage_data(self, settings: dict, fworker: worker.FlamencoWorker):
if not settings.get('filepath'):
return
wait_count=0 wait_count=0
filepath = settings['filepath'] + '.run' filepath = settings['filepath'] + '.run'
...@@ -38,6 +41,16 @@ class TaskRunner: ...@@ -38,6 +41,16 @@ class TaskRunner:
time.sleep(10) time.sleep(10)
return return
def check_blender_cmd(self, settings: dict, fworker: worker.FlamencoWorker):
if not fworker.worker_blender_cmd:
return
if not settings.get('blender_cmd'):
return
settings['blender_cmd'] = fworker.worker_blender_cmd
return
def check_storage_dir(self, settings: dict, fworker: worker.FlamencoWorker): def check_storage_dir(self, settings: dict, fworker: worker.FlamencoWorker):
if not fworker.worker_storage_dir: if not fworker.worker_storage_dir:
return return
...@@ -80,6 +93,8 @@ class TaskRunner: ...@@ -80,6 +93,8 @@ class TaskRunner:
raise ValueError('Command %i of task %s has no name' % (cmd_idx, task_id)) raise ValueError('Command %i of task %s has no name' % (cmd_idx, task_id))
cmd_settings = cmd_info.get('settings') cmd_settings = cmd_info.get('settings')
print(cmd_settings)
self.check_blender_cmd(cmd_settings, fworker)
self.check_storage_dir(cmd_settings, fworker) self.check_storage_dir(cmd_settings, fworker)
self.check_output_dir(cmd_settings, fworker) self.check_output_dir(cmd_settings, fworker)
self.wait_storage_data(cmd_settings, fworker) self.wait_storage_data(cmd_settings, fworker)
......
"""Task runner."""
import asyncio
import attr
import os
import os.path
import time
from . import attrs_extra
from . import worker
@attr.s
class TaskRunner:
"""Runs tasks, sending updates back to the worker."""
shutdown_future = attr.ib(validator=attr.validators.instance_of(asyncio.Future))
last_command_idx = attr.ib(default=0, init=False)
_log = attrs_extra.log('%s.TaskRunner' % __name__)
def __attrs_post_init__(self):
self.current_command = None
def wait_storage_data(self, settings: dict, fworker: worker.FlamencoWorker):
wait_count=0
filepath = settings['filepath'] + '.run'
while not os.path.exists(filepath):
self._log.info('Wait %d s on %s.', wait_count * 10, filepath)
wait_count = wait_count + 1
if wait_count > 360:
raise ValueError('File %s does not exist' % (filepath))
time.sleep(10)
return
def check_storage_dir(self, settings: dict, fworker: worker.FlamencoWorker):
if not fworker.worker_storage_dir:
return
if not settings.get('filepath'):
return
(head, tail) = os.path.split(os.path.normpath(settings['filepath']))
(head, tail) = os.path.split(head)
settings['filepath'] = os.path.normpath(settings['filepath']).replace(head, os.path.normpath(fworker.worker_storage_dir))
return
def check_output_dir(self, settings: dict, fworker: worker.FlamencoWorker):
if not fworker.worker_output_dir:
return
if not settings.get('render_output'):
return
(head, tail) = os.path.split(os.path.normpath(settings['render_output']))
(head, tail) = os.path.split(head)
settings['render_output'] = os.path.normpath(settings['render_output']).replace(head, os.path.normpath(fworker.worker_output_dir))
return
async def execute(self, task: dict, fworker: worker.FlamencoWorker) -> bool:
"""Executes a task, returns True iff the entire task was run succesfully."""
from .commands import command_handlers
task_id = task['_id']
for cmd_idx, cmd_info in enumerate(task['commands']):
self.last_command_idx = cmd_idx
# Figure out the command name
cmd_name = cmd_info.get('name')
if not cmd_name:
raise ValueError('Command %i of task %s has no name' % (cmd_idx, task_id))
cmd_settings = cmd_info.get('settings')
self.check_storage_dir(cmd_settings, fworker)
self.check_output_dir(cmd_settings, fworker)
self.wait_storage_data(cmd_settings, fworker)
if cmd_settings is None or not isinstance(cmd_settings, dict):
raise ValueError('Command %i of task %s has malformed settings %r' %
(cmd_idx, task_id, cmd_settings))
# Find the handler class
try:
cmd_cls = command_handlers[cmd_name]
except KeyError:
raise ValueError('Command %i of task %s has unknown command name %r' %
(cmd_idx, task_id, cmd_name))
# Construct & execute the handler.
cmd = cmd_cls(
worker=fworker,
task_id=task_id,
command_idx=cmd_idx,
)
self.current_command = cmd
success = await cmd.run(cmd_settings)
if not success:
self._log.warning('Command %i of task %s was not succesful, aborting task.',
cmd_idx, task_id)
return False
self._log.info('Task %s completed succesfully.', task_id)
self.current_command = None
return True
async def abort_current_task(self):
"""Aborts the current task by aborting the currently running command.
Asynchronous, because a subprocess has to be wait()ed upon before returning.
"""
if self.current_command is None:
self._log.info('abort_current_task: no command running, nothing to abort.')
return
self._log.warning('abort_current_task: Aborting command %s', self.current_command)
await self.current_command.abort()
...@@ -50,6 +50,7 @@ class FlamencoWorker: ...@@ -50,6 +50,7 @@ class FlamencoWorker:
worker_storage_dir = attr.ib(validator=attr.validators.instance_of(str)) worker_storage_dir = attr.ib(validator=attr.validators.instance_of(str))
worker_output_dir = attr.ib(validator=attr.validators.instance_of(str)) worker_output_dir = attr.ib(validator=attr.validators.instance_of(str))
worker_blender_cmd = attr.ib(validator=attr.validators.instance_of(str))
loop = attr.ib(validator=attr.validators.instance_of(asyncio.AbstractEventLoop)) loop = attr.ib(validator=attr.validators.instance_of(asyncio.AbstractEventLoop))
shutdown_future = attr.ib( shutdown_future = attr.ib(
......
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment