diff --git a/flamenco_worker/commands.py b/flamenco_worker/commands.py index cfcfc54ad5a393bedefc6f7db702a3035c111a94..b72d8ba8b5b854fdff55e6af7ba254821b20e48a 100644 --- a/flamenco_worker/commands.py +++ b/flamenco_worker/commands.py @@ -22,6 +22,12 @@ from . import worker command_handlers = {} # type: typing.Mapping[str, typing.Type['AbstractCommand']] +# Some type declarations. +Settings = typing.MutableMapping[str, typing.Any] +# This is the type of the 2nd arg for instanceof(a, b) +InstanceOfType = typing.Union[type, typing.Tuple[typing.Union[type, typing.Tuple[typing.Any, ...]], + ...]] + # Timeout of subprocess.stdout.readline() call. SUBPROC_READLINE_TIMEOUT = 3600 # seconds @@ -94,7 +100,7 @@ class AbstractCommand(metaclass=abc.ABCMeta): self.command_idx) self._log = log.getChild(self.identifier) - async def run(self, settings: dict) -> bool: + async def run(self, settings: Settings) -> bool: """Runs the command, parsing output and sending it back to the worker. Returns True when the command was succesful, and False otherwise. @@ -159,13 +165,13 @@ class AbstractCommand(metaclass=abc.ABCMeta): ) @abc.abstractmethod - async def execute(self, settings: dict): + async def execute(self, settings: Settings) -> None: """Executes the command. An error should be indicated by an exception. """ - def validate(self, settings: dict): + def validate(self, settings: Settings) -> typing.Optional[str]: """Validates the settings for this command. If there is an error, a description of the error is returned. @@ -176,7 +182,8 @@ class AbstractCommand(metaclass=abc.ABCMeta): return None - def _setting(self, settings: dict, key: str, is_required: bool, valtype: typing.Type = str) \ + def _setting(self, settings: Settings, key: str, is_required: bool, + valtype: InstanceOfType = str) \ -> typing.Tuple[typing.Any, typing.Optional[str]]: """Parses a setting, returns either (value, None) or (None, errormsg)""" @@ -198,7 +205,7 @@ class AbstractCommand(metaclass=abc.ABCMeta): @command_executor('echo') class EchoCommand(AbstractCommand): - def validate(self, settings: dict): + def validate(self, settings: Settings): try: msg = settings['message'] except KeyError: @@ -207,20 +214,20 @@ class EchoCommand(AbstractCommand): if not isinstance(msg, str): return 'Message must be a string' - async def execute(self, settings: dict): + async def execute(self, settings: Settings): await self.worker.register_log(settings['message']) @command_executor('log_a_lot') class LogALotCommand(AbstractCommand): - def validate(self, settings: dict): + def validate(self, settings: Settings): lines = settings.get('lines', 20000) if isinstance(lines, float): lines = int(lines) if not isinstance(lines, int): return '"lines" setting must be an integer, not %s' % type(lines) - async def execute(self, settings: dict): + async def execute(self, settings: Settings): lines = settings.get('lines', 20000) await self.worker.register_task_update(activity='logging %d lines' % lines) @@ -230,7 +237,7 @@ class LogALotCommand(AbstractCommand): @command_executor('sleep') class SleepCommand(AbstractCommand): - def validate(self, settings: dict): + def validate(self, settings: Settings): try: sleeptime = settings['time_in_seconds'] except KeyError: @@ -239,7 +246,7 @@ class SleepCommand(AbstractCommand): if not isinstance(sleeptime, (int, float)): return 'time_in_seconds must be an int or float' - async def execute(self, settings: dict): + async def execute(self, settings: Settings): time_in_seconds = settings['time_in_seconds'] await self.worker.register_log('Sleeping for %s seconds' % time_in_seconds) await asyncio.sleep(time_in_seconds) @@ -285,7 +292,7 @@ def _unique_path(path: Path) -> Path: @command_executor('move_out_of_way') class MoveOutOfWayCommand(AbstractCommand): - def validate(self, settings: dict): + def validate(self, settings: Settings): try: src = settings['src'] except KeyError: @@ -294,7 +301,7 @@ class MoveOutOfWayCommand(AbstractCommand): if not isinstance(src, str): return 'src must be a string' - async def execute(self, settings: dict): + async def execute(self, settings: Settings): src = Path(settings['src']) if not src.exists(): self._log.info('Render output path %s does not exist, not moving out of way', src) @@ -315,12 +322,12 @@ class MoveOutOfWayCommand(AbstractCommand): @command_executor('move_to_final') class MoveToFinalCommand(AbstractCommand): - def validate(self, settings: dict): + def validate(self, settings: Settings): _, err1 = self._setting(settings, 'src', True) _, err2 = self._setting(settings, 'dest', True) return err1 or err2 - async def execute(self, settings: dict): + async def execute(self, settings: Settings): src = Path(settings['src']) if not src.exists(): msg = 'Path %s does not exist, not moving' % src @@ -349,7 +356,7 @@ class MoveToFinalCommand(AbstractCommand): @command_executor('copy_file') class CopyFileCommand(AbstractCommand): - def validate(self, settings: dict): + def validate(self, settings: Settings): src, err = self._setting(settings, 'src', True) if err: return err @@ -361,7 +368,7 @@ class CopyFileCommand(AbstractCommand): if not dest: return 'dest may not be empty' - async def execute(self, settings: dict): + async def execute(self, settings: Settings): src = Path(settings['src']) if not src.exists(): raise CommandExecutionError('Path %s does not exist, unable to copy' % src) @@ -386,14 +393,14 @@ class CopyFileCommand(AbstractCommand): @command_executor('remove_tree') class RemoveTreeCommand(AbstractCommand): - def validate(self, settings: dict): + def validate(self, settings: Settings): path, err = self._setting(settings, 'path', True) if err: return err if not path: return "'path' may not be empty" - async def execute(self, settings: dict): + async def execute(self, settings: Settings): path = Path(settings['path']) if not path.exists(): msg = 'Path %s does not exist, so not removing.' % path @@ -424,7 +431,7 @@ class AbstractSubprocessCommand(AbstractCommand, abc.ABC): return None return pathlib.Path(subprocess_pid_file) - def validate(self, settings: dict) -> typing.Optional[str]: + def validate(self, settings: Settings) -> typing.Optional[str]: supererr = super().validate(settings) if supererr: return supererr @@ -578,7 +585,7 @@ class AbstractSubprocessCommand(AbstractCommand, abc.ABC): @command_executor('exec') class ExecCommand(AbstractSubprocessCommand): - def validate(self, settings: dict): + def validate(self, settings: Settings): try: cmd = settings['cmd'] except KeyError: @@ -590,7 +597,7 @@ class ExecCommand(AbstractSubprocessCommand): return '"cmd" may not be empty' return super().validate(settings) - async def execute(self, settings: dict): + async def execute(self, settings: Settings): await self.subprocess(shlex.split(settings['cmd'])) @@ -628,7 +635,7 @@ class BlenderRenderCommand(AbstractSubprocessCommand): self._last_activity_time = 0.0 - def validate(self, settings: dict): + def validate(self, settings: Settings): blender_cmd, err = self._setting(settings, 'blender_cmd', True) if err: return err @@ -668,7 +675,7 @@ class BlenderRenderCommand(AbstractSubprocessCommand): return super().validate(settings) - async def execute(self, settings: dict): + async def execute(self, settings: Settings): cmd = self._build_blender_cmd(settings) await self.worker.register_task_update(activity='Starting Blender') @@ -770,7 +777,7 @@ class BlenderRenderCommand(AbstractSubprocessCommand): @command_executor('blender_render_progressive') class BlenderRenderProgressiveCommand(BlenderRenderCommand): - def validate(self, settings: dict): + def validate(self, settings: Settings): err = super().validate(settings) if err: return err @@ -796,7 +803,7 @@ class BlenderRenderProgressiveCommand(BlenderRenderCommand): @command_executor('merge_progressive_renders') class MergeProgressiveRendersCommand(AbstractSubprocessCommand): - def validate(self, settings: dict): + def validate(self, settings: Settings): blender_cmd, err = self._setting(settings, 'blender_cmd', True) if err: return err @@ -831,7 +838,7 @@ class MergeProgressiveRendersCommand(AbstractSubprocessCommand): return super().validate(settings) - async def execute(self, settings: dict): + async def execute(self, settings: Settings): blendpath = Path(__file__).with_name('merge-exr.blend') cmd = settings['blender_cmd'][:]