Skip to content
Snippets Groups Projects
commands.py 45.7 KiB
Newer Older
  • Learn to ignore specific revisions
  •         if frame_end is not None:
                py_lines.append(f'bpy.context.scene.frame_end = {frame_end}')
    
            py_lines.append(f"bpy.ops.sound.mixdown(filepath={render_output!r}, "
                            f"codec='FLAC', container='FLAC', "
                            f"accuracy=128)")
            py_lines.append('bpy.ops.wm.quit_blender()')
            py_script = '\n'.join(py_lines)
    
            return [
                *settings['blender_cmd'],
                '--enable-autoexec',
                '-noaudio',
                '--background',
                settings['filepath'],
                '--python-exit-code', '47',
                '--python-expr', py_script
            ]
    
    
    class AbstractFFmpegCommand(AbstractSubprocessCommand, abc.ABC):
        """Base class for commands that invoke FFmpeg.

        Subclasses implement ffmpeg_args() to supply the arguments; this class
        validates the 'ffmpeg_cmd' setting, prepends the FFmpeg command itself
        and hands the complete command line to self.subprocess().
        """

        def validate(self, settings: Settings) -> typing.Optional[str]:
            """Locate the FFmpeg executable and store the split command.

            On success, settings['ffmpeg_cmd'] is replaced by the shlex-split
            command (a list of strings), which _build_ffmpeg_command() relies on.

            :return: an error message, or None when validation succeeded.
            """
            # Check that FFmpeg can be found and shlex-split the string.
            ffmpeg_cmd, err = self._setting(settings, 'ffmpeg_cmd', is_required=False, default='ffmpeg')
            if err:
                return err

            try:
                cmd = shlex.split(ffmpeg_cmd)
            except ValueError as ex:
                # shlex raises ValueError on e.g. unbalanced quotes; report it as a
                # validation error instead of crashing.
                return f'FFmpeg command {ffmpeg_cmd!r} cannot be parsed: {ex}'
            if not cmd:
                # An empty or whitespace-only string splits into nothing, so there
                # is no executable to look up.
                return f'FFmpeg command {ffmpeg_cmd!r} is empty'

            executable_path: typing.Optional[str] = shutil.which(cmd[0])
            if not executable_path:
                return f'FFmpeg command {ffmpeg_cmd!r} not found on $PATH'

            settings['ffmpeg_cmd'] = cmd
            self._log.debug('Found FFmpeg command at %r', executable_path)
            return None

        async def execute(self, settings: Settings) -> None:
            """Build the FFmpeg command line and pass it to self.subprocess()."""
            cmd = self._build_ffmpeg_command(settings)
            await self.subprocess(cmd)

        def _build_ffmpeg_command(self, settings: Settings) -> typing.List[str]:
            """Return the FFmpeg command followed by the subclass' arguments.

            Requires validate() to have run, as that turns settings['ffmpeg_cmd']
            into a list.
            """
            assert isinstance(settings['ffmpeg_cmd'], list), \
                'run validate() before _build_ffmpeg_command'
            return [
                *settings['ffmpeg_cmd'],
                *self.ffmpeg_args(settings),
            ]

        @abc.abstractmethod
        def ffmpeg_args(self, settings: Settings) -> typing.List[str]:
            """Construct the FFmpeg arguments to execute.

            Does not need to include the FFmpeg command itself, just
            its arguments.
            """
            pass
    
    
    @command_executor('create_video')
    class CreateVideoCommand(AbstractFFmpegCommand):
        """Create a video from individual frames.

        Requires FFmpeg to be installed and available with the 'ffmpeg' command.

        Settings used: 'input_files' (passed to FFmpeg as a glob pattern),
        'output_file' and 'fps'.
        """

        # Passed to FFmpeg as '-c:v'.
        codec_video = 'h264'

        # Select some settings that are useful for scrubbing through the video.
        constant_rate_factor = 23  # passed as '-crf'
        keyframe_interval = 18  # GOP size, passed as '-g'

        # Passed as '-bf'; set to None to leave the option out entirely.
        max_b_frames: typing.Optional[int] = 0

        def validate(self, settings: Settings) -> typing.Optional[str]:
            """Check the FFmpeg command and the input/output/fps settings.

            :return: an error message, or None when validation succeeded.
            """
            err = super().validate(settings)
            if err:
                return err

            # Check that we know our input and output image files.
            input_files, err = self._setting(settings, 'input_files', is_required=True)
            if err:
                return err
            self._log.debug('Input files: %s', input_files)

            output_file, err = self._setting(settings, 'output_file', is_required=True)
            if err:
                return err
            self._log.debug('Output file: %s', output_file)

            fps, err = self._setting(settings, 'fps', is_required=True, valtype=(int, float))
            if err:
                return err
            self._log.debug('Frame rate: %r fps', fps)
            return None

        def ffmpeg_args(self, settings: Settings) -> typing.List[str]:
            """Return the FFmpeg arguments that encode the frames into a video."""
            args = [
                '-pattern_type', 'glob',
                # Must come before '-i' so that it applies to the input frames;
                # without it the validated 'fps' setting would be ignored.
                '-r', str(settings['fps']),
                '-i', str(settings['input_files']),
                '-c:v', self.codec_video,
                '-crf', str(self.constant_rate_factor),
                '-g', str(self.keyframe_interval),
            ]
            if self.max_b_frames is not None:
                args.extend(['-bf', str(self.max_b_frames)])
            args += [
                # Overwrite existing output without asking, like the other FFmpeg
                # commands do; there is nobody to answer an interactive prompt.
                '-y',
                str(settings['output_file']),
            ]
            return args
    
    
    @command_executor('concatenate_videos')
    class ConcatenateVideosCommand(AbstractFFmpegCommand):
        """Create a video by concatenating other videos.

        Requires FFmpeg to be installed and available with the 'ffmpeg' command.

        The 'input_files' setting is a glob pattern; the matching files are
        listed, sorted by path, in a temporary index file for FFmpeg's concat
        demuxer. That index file is removed again after FFmpeg has run.
        """

        # Set by ffmpeg_args() to the index file it wrote; None until then.
        index_file: typing.Optional[Path] = None

        def validate(self, settings: Settings) -> typing.Optional[str]:
            """Check the FFmpeg command and the input/output settings.

            :return: an error message, or None when validation succeeded.
            """
            err = super().validate(settings)
            if err:
                return err

            # Check that we know our input and output video files.
            input_files, err = self._setting(settings, 'input_files', is_required=True)
            if err:
                return err
            self._log.debug('Input files: %s', input_files)

            output_file, err = self._setting(settings, 'output_file', is_required=True)
            if err:
                return err
            self._log.debug('Output file: %s', output_file)

            return None

        async def execute(self, settings: Settings) -> None:
            """Run FFmpeg and remove the index file, also when FFmpeg fails."""
            try:
                await super().execute(settings)
            finally:
                # Clean up in 'finally' so a failed run does not leave the index
                # file behind next to the input files.
                await self._remove_index_file()

        async def _remove_index_file(self) -> None:
            """Delete the index file if it exists; failure is logged, not raised."""
            if self.index_file is None or not self.index_file.exists():
                return
            try:
                self.index_file.unlink()
            except IOError:
                msg = f'unable to unlink file {self.index_file}, ignoring'
                await self.worker.register_log(msg)
                self._log.warning(msg)

        def ffmpeg_args(self, settings: Settings) -> typing.List[str]:
            """Write the concat index file and return the FFmpeg arguments.

            Side effect: sets self.index_file and (over)writes that file.
            """
            input_files = Path(settings['input_files']).absolute()
            self.index_file = input_files.with_name('ffmpeg-input.txt')

            # Construct the list of filenames for ffmpeg to process.
            # The index file needs to sit next to the input files, as
            # ffmpeg checks for 'unsafe paths'.
            with self.index_file.open('w') as outfile:
                for video_path in sorted(input_files.parent.glob(input_files.name)):
                    # Inside single quotes FFmpeg takes every character literally,
                    # so a backslash cannot escape a quote there. Close the quoted
                    # string, emit an escaped quote, and reopen it: ' -> '\''
                    escaped = str(video_path.name).replace("'", "'\\''")
                    print("file '%s'" % escaped, file=outfile)

            output_file = Path(settings['output_file'])
            self._log.debug('Output file: %s', output_file)

            return [
                '-f', 'concat',
                '-i', str(self.index_file),
                '-c', 'copy',
                '-y',
                str(output_file),
            ]
    
    
    @command_executor('mux_audio')
    class MuxAudioCommand(AbstractFFmpegCommand):
        """Combine an audio file and a video file into one output file.

        Both inputs are handed to FFmpeg with '-c copy'. Settings used:
        'audio_file', 'video_file' and 'output_file'.
        """

        def validate(self, settings: Settings) -> typing.Optional[str]:
            """Check the FFmpeg command and that both input files exist.

            :return: an error message, or None when validation succeeded.
            """
            parent_err = super().validate(settings)
            if parent_err:
                return parent_err

            # The audio input has to be configured and present on disk.
            audio_file, err = self._setting(settings, 'audio_file', is_required=True)
            if err:
                return err
            audio_present = Path(audio_file).exists()
            if not audio_present:
                return f'Audio file {audio_file} does not exist'
            self._log.debug('Audio file: %s', audio_file)

            # The same goes for the video input.
            video_file, err = self._setting(settings, 'video_file', is_required=True)
            if err:
                return err
            video_present = Path(video_file).exists()
            if not video_present:
                return f'Video file {video_file} does not exist'
            self._log.debug('Video file: %s', video_file)

            # The output only has to be configured; it is not checked on disk.
            output_file, err = self._setting(settings, 'output_file', is_required=True)
            if err:
                return err
            self._log.debug('Output file: %s', output_file)

            return None

        def ffmpeg_args(self, settings: Settings) -> typing.List[str]:
            """Return the FFmpeg arguments, using absolute paths for all files."""
            audio_in, video_in, muxed_out = (
                str(Path(settings[key]).absolute())
                for key in ('audio_file', 'video_file', 'output_file')
            )
            return [
                '-i', audio_in,
                '-i', video_in,
                '-c', 'copy',
                '-y',
                muxed_out,
            ]
    
    
    @command_executor('encode_audio')
    class EncodeAudioCommand(AbstractFFmpegCommand):
        """Re-encode an audio file with a given codec and bitrate.

        Settings used: 'input_file', 'output_file', 'codec' (passed to FFmpeg
        as '-c:a') and 'bitrate' (passed as '-b:a').
        """

        def validate(self, settings: Settings) -> typing.Optional[str]:
            """Check the FFmpeg command and the encoding settings.

            :return: an error message, or None when validation succeeded.
            """
            err = super().validate(settings)
            if err:
                return err

            # Check that we know our input and output audio files.
            input_file, err = self._setting(settings, 'input_file', is_required=True)
            if err:
                return err
            if not Path(input_file).exists():
                return f'Audio file {input_file} does not exist'
            self._log.debug('Audio file: %s', input_file)

            output_file, err = self._setting(settings, 'output_file', is_required=True)
            if err:
                return err
            self._log.debug('Output file: %s', output_file)

            # Only presence is checked for these two; their values go to FFmpeg as-is.
            _, err = self._setting(settings, 'bitrate', is_required=True)
            if err:
                return err
            _, err = self._setting(settings, 'codec', is_required=True)
            if err:
                return err
            return None

        def ffmpeg_args(self, settings: Settings) -> typing.List[str]:
            """Return the FFmpeg arguments, using absolute paths for both files."""
            input_file = Path(settings['input_file']).absolute()
            output_file = Path(settings['output_file']).absolute()

            return [
                '-i', str(input_file),
                # validate() does not constrain the types of these settings, and a
                # bitrate may well arrive as a number (e.g. 192000); every element
                # of the argument list has to be a string.
                '-c:a', str(settings['codec']),
                '-b:a', str(settings['bitrate']),
                '-y',
                str(output_file),
            ]
    
    
    @command_executor('move_with_counter')
    class MoveWithCounterCommand(AbstractCommand):
        """Move 'src' to a counter-numbered variant of 'dest'.

        The destination name is split into a prefix and a suffix, and
        _numbered_path() builds the final path from those.
        NOTE(review): presumably _numbered_path() picks an unused number to put
        between prefix and suffix -- confirm against its definition.
        """

        # Split '2018_12_06-spring.mkv' into a '2018_12_06' prefix and '-spring.mkv' suffix.
        filename_parts = re.compile(r'(?P<prefix>^[0-9_]+)(?P<suffix>.*)$')

        def validate(self, settings: Settings):
            """Require non-empty 'src' and 'dest' settings.

            :return: an error message, or None when validation succeeded.
            """
            required = (
                ('src', 'src may not be empty'),
                ('dest', 'dest may not be empty'),
            )
            for key, empty_msg in required:
                value, err = self._setting(settings, key, True)
                if err:
                    return err
                if not value:
                    return empty_msg
            return None

        async def execute(self, settings: Settings):
            """Move the source path to its numbered destination.

            :raise CommandExecutionError: when the source path does not exist.
            """
            source = Path(settings['src'])
            if not source.exists():
                raise CommandExecutionError('Path %s does not exist, unable to move' % source)

            target = Path(settings['dest'])
            parts = self.filename_parts.match(target.name)
            if parts is None:
                # No leading digits/underscores; fall back to stem + extension.
                prefix, suffix = target.stem + '_', target.suffix
            else:
                prefix, suffix = parts.group('prefix') + '_', parts.group('suffix')
            self._log.debug('Adding counter to output name between %r and %r', prefix, suffix)
            target = _numbered_path(target.parent, prefix, suffix)

            self._log.info('Moving %s to %s', source, target)
            await self.worker.register_log('%s: Moving %s to %s', self.command_name, source, target)
            await self._mkdir_if_not_exists(target.parent)

            shutil.move(str(source), str(target))
            self.worker.output_produced(target)