Skip to content
Snippets Groups Projects
commands.py 52.9 KiB
Newer Older
  • Learn to ignore specific revisions
  • 
            with tempfile.TemporaryDirectory(dir=str(output.parent)) as tmpdir:
                tmppath = Path(tmpdir)
                assert tmppath.exists()
    
    
                settings['tmpdir'] = tmppath.as_posix()
    
                    '--python-expr', self.script_template % settings
    
                ]
    
                await self.worker.register_task_update(activity='Starting Blender to merge EXR files')
                await self.subprocess(cmd)
    
                # move output files into the correct spot.
                await self.move(tmppath / 'merged0001.exr', output)
    
    
            # See if this line logs the saving of a file.
            self.worker.output_produced(output)
    
        def _base_blender_cli(self, settings):
            blendpath = Path(__file__).parent / 'resources/merge-exr.blend'
    
            cmd = settings['blender_cmd'] + [
                '--factory-startup',
                '--enable-autoexec',
                '-noaudio',
                '--background',
                blendpath.as_posix(),
                '--python-exit-code', '47',
            ]
            return cmd
    
    
        async def move(self, src: Path, dst: Path):
            """Move the file 'src' to 'dst'.

            'src' has to be an existing regular file. 'dst' is either an
            existing file, or a not-yet-existing path inside an existing
            directory. The move is logged locally and to the worker.
            """
            log_args = ('Moving %s to %s', src, dst)
            self._log.info(*log_args)

            # Sanity checks on the source...
            assert src.exists() and src.is_file()
            # ...and on the destination.
            assert not dst.exists() or dst.is_file()
            assert dst.exists() or dst.parent.exists()

            await self.worker.register_log(*log_args)
            shutil.move(str(src), str(dst))
    
    @command_executor('merge_progressive_render_sequence')
    class MergeProgressiveRenderSequenceCommand(MergeProgressiveRendersCommand):
        """Variant of MergeProgressiveRendersCommand that handles an EXR sequence.

        Uses MERGE_EXR_SEQUENCE_PYTHON as the script template, and requires a
        frame range and an output name containing '##'.
        """
        script_template = MERGE_EXR_SEQUENCE_PYTHON

        def validate(self, settings: Settings):
            """Validate the settings; return an error message, or None when valid.

            On top of the parent's validation, the 'output' setting must contain
            '##', and 'frame_start' and 'frame_end' must be present integers.
            """
            parent_err = super().validate(settings)
            if parent_err:
                return parent_err

            if '##' not in settings['output']:
                return 'Output filename should contain at least two "##" marks'

            # Both ends of the frame range are required integers.
            for key in ('frame_start', 'frame_end'):
                _, err = self._setting(settings, key, True, int)
                if err:
                    return err
            return None

        async def execute(self, settings: Settings):
            """Run Blender with the merge script and report the files it produced."""
            cmd = self._base_blender_cli(settings)

            # Make sure the directory for the output files exists.
            output = Path(settings['output'])
            await self._mkdir_if_not_exists(output.parent)

            # The script template is filled in from the settings themselves.
            script = self.script_template % settings
            cmd.extend(['--python-expr', script])

            await self.worker.register_task_update(activity='Starting Blender to merge EXR sequence')
            await self.subprocess(cmd)

            # Report every file that matches the glob derived from the output path.
            glob_path = _hashes_to_glob(output)
            for produced in glob_path.parent.glob(glob_path.name):
                self.worker.output_produced(produced)
    
    
    
    # TODO(Sybren): maybe subclass AbstractBlenderCommand instead?
    
    @command_executor('blender_render_audio')
    class BlenderRenderAudioCommand(BlenderRenderCommand):
        """Mix down the audio of a blend file to FLAC, using Blender."""

        def validate(self, settings: Settings):
            """Validate the settings; return an error message, or None when valid.

            'render_output' is required and may not be empty. 'frame_start' and
            'frame_end' are optional, but have to be integers when given.
            """
            parent_err = super().validate(settings)
            if parent_err:
                return parent_err

            render_output, err = self._setting(settings, 'render_output', True)
            if err:
                return err
            if not render_output:
                return "'render_output' is a required setting"

            for key in ('frame_start', 'frame_end'):
                _, err = self._setting(settings, key, False, int)
                if err:
                    return err
            return None

        async def _build_blender_cmd(self, settings: Settings) -> typing.List[str]:
            """Return the Blender CLI that runs a generated audio-mixdown script.

            The script optionally overrides the scene's frame range, writes the
            audio to 'render_output', and then quits Blender.
            """
            render_output = settings.get('render_output')

            py_lines = ['import bpy']

            # Only override the frame range properties that were actually given.
            for prop in ('frame_start', 'frame_end'):
                value = settings.get(prop)
                if value is not None:
                    py_lines.append(f'bpy.context.scene.{prop} = {value}')

            mixdown = (f"bpy.ops.sound.mixdown(filepath={render_output!r}, "
                       "codec='FLAC', container='FLAC', "
                       "accuracy=128)")
            py_lines += [mixdown, 'bpy.ops.wm.quit_blender()']

            return [
                *settings['blender_cmd'],
                '--enable-autoexec',
                '-noaudio',
                '--background',
                settings['filepath'],
                '--python-exit-code', '47',
                '--python-expr', '\n'.join(py_lines),
            ]
    
    @command_executor('exr_sequence_to_jpeg')
    class EXRSequenceToJPEGCommand(BlenderRenderCommand):
        """Convert an EXR sequence to JPEG files.

        This assumes the EXR files are named '{frame number}.exr', where the
        frame number may have any number of leading zeroes.
        """
        # Script that Blender runs to do the actual conversion.
        pyscript = Path(__file__).parent / 'resources/exr_sequence_to_jpeg.py'

        def validate(self, settings: Settings) -> typing.Optional[str]:
            """Validate the settings; return an error message, or None when valid.

            Exactly one of 'exr_glob' and the deprecated 'exr_directory' has to
            be given; the latter is converted to an 'exr_glob' setting.

            Raises FileNotFoundError when the bundled conversion script is missing.
            """
            if not self.pyscript.exists():
                raise FileNotFoundError(f'Resource script {self.pyscript} cannot be found')

            # Every error reported by _setting() is returned right away, so that
            # it cannot be overwritten by the result of the next lookup.
            exr_glob, err = self._setting(settings, 'exr_glob', False)
            if err:
                return err

            # Only for backward compatibility. Should not be used.
            exr_directory, err = self._setting(settings, 'exr_directory', False)
            if err:
                return err

            if not exr_glob and not exr_directory:
                return '"exr_glob" may not be empty'
            if exr_glob and exr_directory:
                # Normally I would say 'use either one or the other, not both', but
                # in this case 'exr_directory' is deprecated and shouldn't be used.
                return 'Just pass "exr_glob", do not use "exr_directory"'

            if exr_directory:
                settings['exr_glob'] = str(Path(exr_directory) / '*.exr')

            output_pattern, err = self._setting(settings, 'output_pattern', False,
                                                default='preview-######.jpg')
            if err:
                return err
            if not output_pattern:
                return '"output_pattern" may not be empty'
            return super().validate(settings)

        async def _build_blender_cmd(self, settings) -> typing.List[str]:
            """Return the parent's Blender CLI, extended to run the conversion script.

            Everything after '--' is passed as arguments to the script itself.
            """
            cmd = await super()._build_blender_cmd(settings)

            return cmd + [
                '--python-exit-code', '32',
                '--python', str(self.pyscript),
                '--',
                '--exr-glob', settings['exr_glob'],
                '--output-pattern', settings['output_pattern'],
            ]
    
    
    
    class AbstractFFmpegCommand(AbstractSubprocessCommand, abc.ABC):
        """Base class for commands that run FFmpeg as a subprocess.

        Subclasses implement ffmpeg_args() to supply the arguments; this class
        locates the FFmpeg executable, runs it, and cleans up the index file
        that create_index_file() may have written.
        """

        # Set by create_index_file(); removed again by execute().
        index_file: typing.Optional[pathlib.Path] = None

        def validate(self, settings: Settings) -> typing.Optional[str]:
            """Find FFmpeg; return an error message, or None when it was found.

            On success, settings['ffmpeg_cmd'] is replaced by the shlex-split
            command as a list of strings.
            """
            # Check that FFmpeg can be found and shlex-split the string.
            ffmpeg_cmd, err = self._setting(settings, 'ffmpeg_cmd', is_required=False, default='ffmpeg')
            if err:
                return err

            cmd = shlex.split(ffmpeg_cmd)
            if not cmd:
                # An empty or whitespace-only setting splits into nothing at all.
                return f'FFmpeg command {ffmpeg_cmd!r} is empty'

            executable_path: typing.Optional[str] = shutil.which(cmd[0])
            if not executable_path:
                return f'FFmpeg command {ffmpeg_cmd!r} not found on $PATH'
            settings['ffmpeg_cmd'] = cmd

            self._log.debug('Found FFmpeg command at %r', executable_path)

            return None

        async def execute(self, settings: Settings) -> None:
            """Run FFmpeg, then remove the index file if one was created."""
            cmd = self._build_ffmpeg_command(settings)
            await self.subprocess(cmd)

            if self.index_file is not None and self.index_file.exists():
                try:
                    self.index_file.unlink()
                except IOError:
                    # A left-over index file is not worth failing the command for.
                    msg = f'unable to unlink file {self.index_file}, ignoring'
                    await self.worker.register_log(msg)
                    self._log.warning(msg)

        def _build_ffmpeg_command(self, settings: Settings) -> typing.List[str]:
            """Return the complete command: the FFmpeg command plus ffmpeg_args()."""
            assert isinstance(settings['ffmpeg_cmd'], list), \
                'run validate() before _build_ffmpeg_command'
            cmd = [
                *settings['ffmpeg_cmd'],
                *self.ffmpeg_args(settings),
            ]
            return cmd

        @abc.abstractmethod
        def ffmpeg_args(self, settings: Settings) -> typing.List[str]:
            """Construct the FFmpeg arguments to execute.

            Does not need to include the FFmpeg command itself, just
            its arguments.
            """
            pass

        def create_index_file(self, input_files: pathlib.Path) -> pathlib.Path:
            """Construct a list of filenames for ffmpeg to process.

            The filenames are stored in a file 'ffmpeg-input.txt' that sits in the
            same directory as the input files.

            It is assumed that 'input_files' contains a glob pattern in the file
            name, and not in any directory parts.

            The index file will be deleted after successful execution of the ffmpeg
            command.
            """

            # The index file needs to sit next to the input files, as
            # ffmpeg checks for 'unsafe paths'.
            self.index_file = input_files.absolute().with_name('ffmpeg-input.txt')

            # Written as UTF-8 rather than the platform's default encoding, so
            # that non-ASCII file names end up the same on every platform.
            with self.index_file.open('w', encoding='utf-8') as outfile:
                for file_path in sorted(input_files.parent.glob(input_files.name)):
                    # FFmpeg takes everything between single quotes literally,
                    # backslashes included. A quote inside the name therefore
                    # has to close the quoted part, add an escaped quote, and
                    # open a new quoted part: ' becomes '\''
                    escaped = file_path.name.replace("'", "'\\''")
                    print("file '%s'" % escaped, file=outfile)

            return self.index_file
    
    
    
    @command_executor('create_video')
    class CreateVideoCommand(AbstractFFmpegCommand):
        """Create a video from individual frames.

        Requires FFmpeg to be installed and available with the 'ffmpeg' command.
        """

        # Passed to FFmpeg as '-c:v'.
        codec_video = 'h264'

        # Select some settings that are useful for scrubbing through the video.
        constant_rate_factor = 23  # passed as '-crf'
        keyframe_interval = 18  # GOP size, passed as '-g'

        # Passed as '-bf'; set to None to leave the option out altogether.
        max_b_frames: typing.Optional[int] = 0

        def validate(self, settings: Settings) -> typing.Optional[str]:
            """Validate the settings; return an error message, or None when valid.

            Requires 'input_files', 'output_file', and a numerical 'fps'.
            """
            err = super().validate(settings)
            if err:
                return err

            # Check that we know our input and output image files.
            input_files, err = self._setting(settings, 'input_files', is_required=True)
            if err:
                return err
            self._log.debug('Input files: %s', input_files)
            output_file, err = self._setting(settings, 'output_file', is_required=True)
            if err:
                return err
            self._log.debug('Output file: %s', output_file)

            fps, err = self._setting(settings, 'fps', is_required=True, valtype=(int, float))
            if err:
                return err
            self._log.debug('Frame rate: %r fps', fps)
            return None

        def ffmpeg_args(self, settings: Settings) -> typing.List[str]:
            """Return the FFmpeg arguments that encode the input frames to a video.

            May write an index file as a side effect; see the Windows branch.
            """
            input_files = Path(settings['input_files'])

            # Start the argument list with the frame rate. It is given before
            # the '-i' option so that it applies to the input frames.
            args = [
                '-r', str(settings['fps']),
            ]

            if platform.system() == 'Windows':
                # FFMpeg on Windows doesn't support globbing, so we have to do
                # that in Python instead.
                index_file = self.create_index_file(input_files)
                args += [
                    '-f', 'concat',
                    '-i', index_file.as_posix(),
                ]
            else:
                args += [
                    '-pattern_type', 'glob',
                    '-i', input_files.as_posix(),
                ]

            args += [
                '-c:v', self.codec_video,
                '-crf', str(self.constant_rate_factor),
                '-g', str(self.keyframe_interval),
                # Pad the width and height up to the next even number.
                '-vf', 'pad=ceil(iw/2)*2:ceil(ih/2)*2',
            ]
            if self.max_b_frames is not None:
                args.extend(['-bf', str(self.max_b_frames)])
            args += [
                settings['output_file']
            ]

            return args
    
    
    @command_executor('concatenate_videos')
    class ConcatenateVideosCommand(AbstractFFmpegCommand):
        """Create a video by concatenating other videos.

        Requires FFmpeg to be installed and available with the 'ffmpeg' command.
        """

        def validate(self, settings: Settings) -> typing.Optional[str]:
            """Validate the settings; return an error message, or None when valid.

            Requires 'input_files' and 'output_file'.
            """
            err = super().validate(settings)
            if err:
                return err

            # Check that we know our input and output video files.
            input_files, err = self._setting(settings, 'input_files', is_required=True)
            if err:
                return err
            self._log.debug('Input files: %s', input_files)
            output_file, err = self._setting(settings, 'output_file', is_required=True)
            if err:
                return err
            self._log.debug('Output file: %s', output_file)

            return None

        def ffmpeg_args(self, settings: Settings) -> typing.List[str]:
            """Return the FFmpeg arguments that concatenate the input videos.

            Writes an index file listing the input videos as a side effect.
            """
            index_file = self.create_index_file(Path(settings['input_files']))

            output_file = Path(settings['output_file'])
            self._log.debug('Output file: %s', output_file)

            args = [
                '-f', 'concat',
                # The index file lists bare file names, which can contain
                # characters that the concat demuxer's safe mode rejects.
                '-safe', '0',
                # Without this input FFmpeg would have nothing to concatenate.
                '-i', index_file.as_posix(),
                # Copy the streams as-is instead of re-encoding them.
                '-c', 'copy',
                # Overwrite the output file without asking for confirmation.
                '-y',
                output_file.as_posix(),
            ]
            return args
    
    
    @command_executor('mux_audio')
    class MuxAudioCommand(AbstractFFmpegCommand):
        """Combine an audio file and a video file into one output file with FFmpeg."""

        def validate(self, settings: Settings) -> typing.Optional[str]:
            """Validate the settings; return an error message, or None when valid.

            'audio_file' and 'video_file' are required and have to exist on
            disk; 'output_file' is required.
            """
            parent_err = super().validate(settings)
            if parent_err:
                return parent_err

            # Both input files have to be configured and present.
            for key, label in (('audio_file', 'Audio'), ('video_file', 'Video')):
                path, err = self._setting(settings, key, is_required=True)
                if err:
                    return err
                if not Path(path).exists():
                    return f'{label} file {path} does not exist'
                self._log.debug(label + ' file: %s', path)

            output_file, err = self._setting(settings, 'output_file', is_required=True)
            if err:
                return err
            self._log.debug('Output file: %s', output_file)

            return None

        def ffmpeg_args(self, settings: Settings) -> typing.List[str]:
            """Return the FFmpeg arguments; all paths are made absolute."""

            def absolute(key: str) -> str:
                return str(Path(settings[key]).absolute())

            audio_file = absolute('audio_file')
            video_file = absolute('video_file')
            output_file = absolute('output_file')

            return [
                '-i', audio_file,
                '-i', video_file,
                '-c', 'copy',
                '-y',
                output_file,
            ]
    
    
    @command_executor('encode_audio')
    class EncodeAudioCommand(AbstractFFmpegCommand):
        """Encode an audio file with FFmpeg, using the given codec and bitrate."""

        def validate(self, settings: Settings) -> typing.Optional[str]:
            """Validate the settings; return an error message, or None when valid.

            'input_file' is required and has to exist on disk; 'output_file',
            'bitrate', and 'codec' are required.
            """
            parent_err = super().validate(settings)
            if parent_err:
                return parent_err

            # The audio file to encode has to be configured and present.
            input_file, err = self._setting(settings, 'input_file', is_required=True)
            if err:
                return err
            if not Path(input_file).exists():
                return f'Audio file {input_file} does not exist'
            self._log.debug('Audio file: %s', input_file)

            output_file, err = self._setting(settings, 'output_file', is_required=True)
            if err:
                return err
            self._log.debug('Output file: %s', output_file)

            # The encoding parameters only have to be present.
            for key in ('bitrate', 'codec'):
                _, err = self._setting(settings, key, is_required=True)
                if err:
                    return err
            return None

        def ffmpeg_args(self, settings: Settings) -> typing.List[str]:
            """Return the FFmpeg arguments; the file paths are made absolute."""
            source = str(Path(settings['input_file']).absolute())
            target = str(Path(settings['output_file']).absolute())

            return [
                '-i', source,
                '-c:a', settings['codec'],
                '-b:a', settings['bitrate'],
                '-y',
                target,
            ]
    
    
    @command_executor('move_with_counter')
    class MoveWithCounterCommand(AbstractCommand):
        """Move a file to a destination whose name is built by _numbered_path()."""

        # Split '2018_12_06-spring.mkv' into a '2018_12_06' prefix and '-spring.mkv' suffix.
        filename_parts = re.compile(r'(?P<prefix>^[0-9_]+)(?P<suffix>.*)$')

        def validate(self, settings: Settings):
            """Return an error message when 'src' or 'dest' is missing or empty."""
            for key in ('src', 'dest'):
                value, err = self._setting(settings, key, True)
                if err:
                    return err
                if not value:
                    return f'{key} may not be empty'
            return None

        async def execute(self, settings: Settings):
            """Move 'src' to a numbered variation of 'dest'.

            Raises CommandExecutionError when 'src' does not exist.
            """
            src = Path(settings['src'])
            if not src.exists():
                raise CommandExecutionError('Path %s does not exist, unable to move' % src)

            dest = Path(settings['dest'])

            # Names starting with digits/underscores are split after that part;
            # all other names are split between the stem and the extension.
            matched = self.filename_parts.match(dest.name)
            if matched is None:
                prefix, suffix = dest.stem + '_', dest.suffix
            else:
                prefix, suffix = matched.group('prefix') + '_', matched.group('suffix')
            self._log.debug('Adding counter to output name between %r and %r', prefix, suffix)
            dest = _numbered_path(dest.parent, prefix, suffix)

            self._log.info('Moving %s to %s', src, dest)
            await self.worker.register_log('%s: Moving %s to %s', self.command_name, src, dest)
            await self._mkdir_if_not_exists(dest.parent)

            shutil.move(str(src), str(dest))
            self.worker.output_produced(dest)
    
    
    
    @command_executor('create_python_file')
    class CreatePythonFile(AbstractCommand):
        """Write the 'contents' setting to the Python file given by 'filepath'."""

        def validate(self, settings: Settings):
            """Validate the settings; return an error message, or None when valid.

            'filepath' is required, may not be empty, and has to end in '.py'.
            'contents' is required.
            """
            filepath, err = self._setting(settings, 'filepath', True)
            if err:
                return err
            if not filepath:
                return 'filepath may not be empty'
            if not filepath.endswith('.py'):
                return 'filepath must end in .py'

            _, err = self._setting(settings, 'contents', True)
            if err:
                return err
            return None

        async def execute(self, settings: Settings):
            """Create or overwrite the file, logging the action and the contents."""
            filepath = Path(settings['filepath'])
            await self._mkdir_if_not_exists(filepath.parent)

            action = 'Overwriting' if filepath.exists() else 'Creating'
            msg = f'{action} Python file {filepath}'

            self._log.info(msg)
            contents = settings['contents']
            await self.worker.register_log('%s: %s', self.command_name, msg)
            await self.worker.register_log('%s: contents:\n%s', self.command_name, contents)

            filepath.write_text(contents, encoding='utf-8')