Skip to content
Snippets Groups Projects
Commit 90a83b64 authored by Milan Jaros's avatar Milan Jaros
Browse files

add worker path for storage and output

parent 31c3b21e
No related branches found
No related tags found
No related merge requests found
[flamenco-worker]
# The URL of the Flamenco Manager. Leave empty for auto-discovery via UPnP/SSDP.
manager_url =
manager_url = http://login2:8083
task_types = sleep blender-render file-management
task_update_queue_db = flamenco-worker.db
......@@ -10,7 +10,8 @@ may_i_run_interval_seconds = 5
push_log_max_interval_seconds = 20
push_log_max_entries = 200
push_act_max_interval_seconds = 10
worker_storage_dir = /scratch/work/user/milanjaros/temp/public/flamenco/output/storage
worker_output_dir = /scratch/work/user/milanjaros/temp/public/flamenco/output/output
[loggers]
keys = root,flamenco_worker
......@@ -38,7 +39,7 @@ class = logging.handlers.TimedRotatingFileHandler
formatter = flamenco
# (filename, when, interval, backupCount, encoding, delay, utc, atTime=None)
# Be aware that tilde expansion is *not* performed on the path.
args = ('/tmp/flamenco-worker.log', 'midnight', 1, 7, 'utf8', True, True)
args = ('flamenco-worker.log', 'midnight', 1, 7, 'utf8', True, True)
[formatters]
keys = flamenco
......
......@@ -87,6 +87,8 @@ def main():
push_log_max_interval=confparser.interval_secs('push_log_max_interval_seconds'),
push_log_max_entries=confparser.value('push_log_max_entries', int),
push_act_max_interval=confparser.interval_secs('push_act_max_interval_seconds'),
worker_storage_dir=confparser.value('worker_storage_dir'),
worker_output_dir=confparser.value('worker_output_dir'),
)
mir = may_i_run.MayIRun(
......
......@@ -3,6 +3,7 @@
import asyncio
import attr
import os
from . import attrs_extra
from . import worker
......@@ -20,6 +21,32 @@ class TaskRunner:
def __attrs_post_init__(self):
self.current_command = None
def check_storage_dir(self, settings: dict, fworker: worker.FlamencoWorker):
if not fworker.worker_storage_dir:
return
if not settings.get('filepath'):
return
(head, tail) = os.path.split(os.path.normpath(settings['filepath']))
(head, tail) = os.path.split(head)
settings['filepath'] = os.path.normpath(settings['filepath']).replace(head, os.path.normpath(fworker.worker_storage_dir))
return
def check_output_dir(self, settings: dict, fworker: worker.FlamencoWorker):
if not fworker.worker_output_dir:
return
if not settings.get('render_output'):
return
(head, tail) = os.path.split(os.path.normpath(settings['render_output']))
(head, tail) = os.path.split(head)
settings['render_output'] = os.path.normpath(settings['render_output']).replace(head, os.path.normpath(fworker.worker_output_dir))
return
async def execute(self, task: dict, fworker: worker.FlamencoWorker) -> bool:
"""Executes a task, returns True iff the entire task was run succesfully."""
......@@ -36,6 +63,9 @@ class TaskRunner:
raise ValueError('Command %i of task %s has no name' % (cmd_idx, task_id))
cmd_settings = cmd_info.get('settings')
self.check_storage_dir(cmd_settings, fworker)
self.check_output_dir(cmd_settings, fworker)
if cmd_settings is None or not isinstance(cmd_settings, dict):
raise ValueError('Command %i of task %s has malformed settings %r' %
(cmd_idx, task_id, cmd_settings))
......
......@@ -37,6 +37,9 @@ class FlamencoWorker:
worker_id = attr.ib(validator=attr.validators.instance_of(str))
worker_secret = attr.ib(validator=attr.validators.instance_of(str))
worker_storage_dir = attr.ib(validator=attr.validators.instance_of(str))
worker_output_dir = attr.ib(validator=attr.validators.instance_of(str))
loop = attr.ib(validator=attr.validators.instance_of(asyncio.AbstractEventLoop))
shutdown_future = attr.ib(
validator=attr.validators.optional(attr.validators.instance_of(asyncio.Future)))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment