Skip to content
Snippets Groups Projects
Commit b54fed38 authored by Sybren A. Stüvel's avatar Sybren A. Stüvel
Browse files

Send paths of produced output to Manager's /output-produced endpoint.

This is now implemented for Blender renders, merging of EXRs, and files
copied using the 'copy_file' command.
parent 2f1d429a
No related branches found
No related tags found
No related merge requests found
# Flamenco Worker changelog
This file logs the changes that are actually interesting to users (new features,
changed functionality, fixed bugs).
## Version 2.0.1 (released 2017-03-31)
- Registers rendered and copied files with the Manager, so that they can be
shown as "latest render".
## Version 2.0 (released 2017-03-29)
- First release of Pillar-based Flamenco, including this Worker.
...@@ -359,7 +359,7 @@ class CopyFileCommand(AbstractCommand): ...@@ -359,7 +359,7 @@ class CopyFileCommand(AbstractCommand):
import shutil import shutil
shutil.copy(str(src), str(dest)) shutil.copy(str(src), str(dest))
self.worker.output_produced(dest)
@command_executor('remove_tree') @command_executor('remove_tree')
class RemoveTreeCommand(AbstractCommand): class RemoveTreeCommand(AbstractCommand):
...@@ -499,6 +499,7 @@ class BlenderRenderCommand(AbstractSubprocessCommand): ...@@ -499,6 +499,7 @@ class BlenderRenderCommand(AbstractSubprocessCommand):
re_remaining = attr.ib(init=False) re_remaining = attr.ib(init=False)
re_status = attr.ib(init=False) re_status = attr.ib(init=False)
re_path_not_found = attr.ib(init=False) re_path_not_found = attr.ib(init=False)
re_file_saved = attr.ib(init=False)
def __attrs_post_init__(self): def __attrs_post_init__(self):
super().__attrs_post_init__() super().__attrs_post_init__()
...@@ -512,6 +513,7 @@ class BlenderRenderCommand(AbstractSubprocessCommand): ...@@ -512,6 +513,7 @@ class BlenderRenderCommand(AbstractSubprocessCommand):
r'\| Remaining:((?P<hours>\d+):)?(?P<minutes>\d+):(?P<seconds>\d+)\.(?P<hunds>\d+) ') r'\| Remaining:((?P<hours>\d+):)?(?P<minutes>\d+):(?P<seconds>\d+)\.(?P<hunds>\d+) ')
self.re_status = re.compile(r'\| (?P<status>[^\|]+)\s*$') self.re_status = re.compile(r'\| (?P<status>[^\|]+)\s*$')
self.re_path_not_found = re.compile(r"Warning: Path '.*' not found") self.re_path_not_found = re.compile(r"Warning: Path '.*' not found")
self.re_file_saved = re.compile(r"Saved: '(?P<filename>.*)'")
def validate(self, settings: dict): def validate(self, settings: dict):
import shlex import shlex
...@@ -625,6 +627,11 @@ class BlenderRenderCommand(AbstractSubprocessCommand): ...@@ -625,6 +627,11 @@ class BlenderRenderCommand(AbstractSubprocessCommand):
activity = line activity = line
await self.worker.register_task_update(activity=activity) await self.worker.register_task_update(activity=activity)
# See if this line logs the saving of a file.
m = self.re_file_saved.search(line)
if m:
self.worker.output_produced(m.group('filename'))
# Not a render progress line; just log it for now. # Not a render progress line; just log it for now.
return '> %s' % line return '> %s' % line
...@@ -724,7 +731,10 @@ class MergeProgressiveRendersCommand(AbstractSubprocessCommand): ...@@ -724,7 +731,10 @@ class MergeProgressiveRendersCommand(AbstractSubprocessCommand):
# move output files into the correct spot. # move output files into the correct spot.
await self.move(tmppath / 'merged0001.exr', output) await self.move(tmppath / 'merged0001.exr', output)
await self.move(tmppath / 'preview.jpg', output.with_suffix('.jpg')) # await self.move(tmppath / 'preview.jpg', output.with_suffix('.jpg'))
# See if this line logs the saving of a file.
self.worker.output_produced(output)
async def move(self, src: Path, dst: Path): async def move(self, src: Path, dst: Path):
"""Moves a file to another location.""" """Moves a file to another location."""
......
import asyncio import asyncio
import datetime import datetime
import pathlib
import typing import typing
import attr import attr
...@@ -302,7 +303,7 @@ class FlamencoWorker: ...@@ -302,7 +303,7 @@ class FlamencoWorker:
) )
elif self.failures_are_acceptable: elif self.failures_are_acceptable:
self._log.warning('Task %s failed, but ignoring it since we are shutting down.', self._log.warning('Task %s failed, but ignoring it since we are shutting down.',
self.task_id) self.task_id)
else: else:
self._log.error('Task %s failed', self.task_id) self._log.error('Task %s failed', self.task_id)
await self.register_task_update(task_status='failed') await self.register_task_update(task_status='failed')
...@@ -437,6 +438,29 @@ class FlamencoWorker: ...@@ -437,6 +438,29 @@ class FlamencoWorker:
self._push_log_to_manager = asyncio.ensure_future( self._push_log_to_manager = asyncio.ensure_future(
self.push_to_manager(delay=self.push_log_max_interval)) self.push_to_manager(delay=self.push_log_max_interval))
def output_produced(self, *paths: typing.Union[str, pathlib.PurePath]):
"""Registers a produced output (e.g. rendered frame) with the manager.
This performs a HTTP POST in a background task, returning as soon as
the task is scheduled.
"""
async def do_post():
try:
self._log.info('Sending %i path(s) to Manager', len(paths))
resp = await self.manager.post('/output-produced',
json={'paths': [str(p) for p in paths]},
loop=self.loop)
if resp.status_code == 204:
self._log.info('Manager accepted our output notification for %s', paths)
else:
self._log.warning('Manager rejected our output notification: %d %s',
resp.status_code, resp.text)
except Exception:
self._log.exception('error POSTing to manager /output-produced')
self.loop.create_task(do_post())
def generate_secret() -> str: def generate_secret() -> str:
"""Generates a 64-character secret key.""" """Generates a 64-character secret key."""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment