Source code for gnes.preprocessor.io_utils.video

#  Tencent is pleased to support the open source community by making GNES available.
#
#  Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
#  Licensed under the Apache License, Version 2.0 (the 'License');
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an 'AS IS' BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

import io
import numpy as np

from typing import List

from .ffmpeg import compile_args, probe
from .helper import _check_input, run_command, run_command_async


def scale_video(input_fn: str = 'pipe:',
                output_fn: str = 'pipe:',
                input_data: bytes = None,
                start_time: float = None,
                end_time: float = None,
                scale: str = None,
                frame_rate: int = 15,
                crf: int = 16,
                vcodec: str = 'libx264',
                format: str = 'mp4',
                pix_fmt: str = 'yuv420p',
                **kwargs):
    """Re-encode a video, optionally trimming and resizing it.

    The command line is assembled with ``compile_args`` and executed with
    ``run_command``; audio is always encoded as AAC.

    :param input_fn: input location; the default ``'pipe:'`` is used together
        with ``input_data``, which is passed to ``run_command`` as ``input``
    :param output_fn: output location; ``'pipe:'`` means the encoded bytes are
        returned to the caller instead
    :param input_data: raw bytes of the input video, or ``None``
    :param start_time: value for the ``ss`` input option; omitted when ``None``
    :param end_time: when given, the ``t`` input option is set to
        ``end_time - start_time`` (``start_time`` counts as 0 when ``None``)
    :param scale: value for the ``s`` output option; omitted when falsy
    :param frame_rate: value for the ``framerate`` output option
    :param crf: value for the ``crf`` output option
    :param vcodec: value for the ``vcodec`` output option
    :param format: container format, only applied when writing to a pipe
    :param pix_fmt: value for the ``pix_fmt`` output option
    :param kwargs: accepted for call compatibility; not used here
    :return: captured stdout when ``output_fn`` is ``'pipe:'``, else ``None``
    """
    # NOTE(review): validation semantics live in the helper module -- confirm
    # there which input_fn / input_data combinations are rejected.
    _check_input(input_fn, input_data)

    to_pipe = output_fn == 'pipe:'

    # Seek offset and duration are passed as input-side options.
    seek_options = {}
    if start_time is not None:
        seek_options['ss'] = start_time
    if end_time is not None:
        offset = 0. if start_time is None else start_time
        seek_options['t'] = end_time - offset

    encode_options = dict(
        vcodec=vcodec,
        pix_fmt=pix_fmt,
        crf=crf,
        framerate=frame_rate,
        acodec='aac',
        strict='experimental',  # AAC audio encoder is experimental
    )
    if scale:
        encode_options['s'] = scale
    if to_pipe:
        # An empty moov means the muxer doesn't need to seek and thus works
        # with a pipe.
        encode_options.update(
            format=format, movflags='frag_keyframe+empty_moov')

    command = compile_args(
        input_fn=input_fn,
        output_fn=output_fn,
        input_options=seek_options,
        output_options=encode_options,
        overwrite_output=True)
    captured, _ = run_command(
        command, input=input_data, pipe_stdout=True, pipe_stderr=True)

    return captured if to_pipe else None
def encode_video(images: 'np.ndarray',
                 pix_fmt: str = 'rgb24',
                 frame_rate: int = 15,
                 output_fn: str = 'pipe:',
                 vcodec: str = 'libx264',
                 format: str = 'mp4',
                 **kwargs):
    """Encode a sequence of raw frames into a video.

    Every frame is cast to ``uint8``, serialised with ``tobytes()`` and fed to
    the encoder process through its stdin as ``rawvideo``.

    :param images: indexable, iterable collection of frames; the first frame's
        shape is unpacked as ``(height, width, channels)`` to derive the
        input size
    :param pix_fmt: pixel format of the raw input frames
    :param frame_rate: used both as the input ``framerate`` and the output
        ``r`` option
    :param output_fn: output location; with ``'pipe:'`` the process stdout is
        piped and returned
    :param vcodec: value for the ``vcodec`` output option
    :param format: value for the ``format`` output option
    :param kwargs: accepted for call compatibility; not used here
    :return: first element of ``proc.communicate()`` (the process stdout);
        presumably ``None`` when stdout is not piped -- confirm against
        ``run_command_async``
    :raises IOError: when the process exits with a non-zero return code; the
        message holds the command line followed by the process stderr
    """
    # Only the first frame is inspected; the 3-way unpack also rejects frames
    # that are not 3-dimensional.
    height, width, _ = images[0].shape

    capture_stdout = (output_fn == 'pipe:')

    input_kwargs = {
        'format': 'rawvideo',
        'pix_fmt': pix_fmt,
        'framerate': frame_rate,
        's': '{}x{}'.format(width, height),
    }

    output_kwargs = {
        'vcodec': vcodec,
        'r': frame_rate,
        'pix_fmt': 'yuv420p',
        'format': format,
        'movflags': 'frag_keyframe+empty_moov',
    }

    cmd_args = compile_args(
        input_fn='pipe:',
        output_fn=output_fn,
        input_options=input_kwargs,
        output_options=output_kwargs)

    with run_command_async(
            cmd_args,
            pipe_stdin=True,
            pipe_stdout=capture_stdout,
            pipe_stderr=True) as proc:

        # Concatenate all frames once instead of growing an in-memory stream
        # and copying it out again.
        raw_frames = b''.join(
            frame.astype(np.uint8).tobytes() for frame in images)

        output, err = proc.communicate(raw_frames)
        if proc.returncode:
            # stderr is not guaranteed to be valid UTF-8; a strict decode
            # could raise UnicodeDecodeError and hide the real failure.
            err = '\n'.join([
                ' '.join(cmd_args),
                err.decode('utf8', errors='replace'),
            ])
            raise IOError(err)

        if proc.stdout is not None:
            proc.stdout.close()
        if proc.stderr is not None:
            proc.stderr.close()

    return output
def capture_frames(input_fn: str = 'pipe:',
                   input_data: bytes = None,
                   pix_fmt: str = 'rgb24',
                   fps: int = -1,
                   scale: str = None,
                   start_time: float = None,
                   end_time: float = None,
                   vframes: int = -1,
                   **kwargs) -> List['np.ndarray']:
    """Decode a video into raw frames.

    When ``input_data`` is given it is first written to a temporary file so
    that the video can be probed for its width and height; those dimensions
    (or the ones derived from ``scale``) are then used to reshape the raw
    output of the decoder.

    :param input_fn: input location; replaced by the temporary file name when
        ``input_data`` is non-empty
    :param input_data: raw bytes of the input video, or ``None``
    :param pix_fmt: output pixel format; the number of bytes per pixel is
        taken as 1 for ``'gray'``, 4 for ``'rgba'``/``'bgra'``/``'argb'``/
        ``'abgr'`` and 3 otherwise
    :param fps: output frame rate; the ``fps`` filter is only added for a
        positive value (the default ``-1`` keeps the source rate)
    :param scale: target size as ``'W:H'``. When exactly one of the two is
        negative, the other dimension is derived from the probed aspect
        ratio, and ``-2`` additionally rounds the derived value up to an
        even number
    :param start_time: value for the ``ss`` input option; omitted when ``None``
    :param end_time: when given, the ``t`` input option is set to
        ``end_time - start_time`` (``start_time`` counts as 0 when ``None``)
    :param vframes: maximum number of frames to output; only applied when
        positive
    :param kwargs: accepted for call compatibility; not used here
    :return: a ``uint8`` array reshaped to
        ``[-1, height, width, bytes_per_pixel]``
    """
    # NOTE(review): validation semantics live in the helper module -- confirm
    # there which input_fn / input_data combinations are rejected.
    _check_input(input_fn, input_data)

    import tempfile

    with tempfile.NamedTemporaryFile() as f:
        if input_data:
            # The video has to be probed by name, so spill the bytes to disk.
            f.write(input_data)
            f.flush()
            input_fn = f.name

        video_meta = probe(input_fn)
        width = video_meta['width']
        height = video_meta['height']

        if scale is not None:
            _width, _height = map(int, scale.split(':'))
            if _width * _height < 0:
                # Exactly one side is a negative placeholder: derive it from
                # the fixed side while keeping the aspect ratio.
                if _width > 0:
                    ratio = _width / width
                    height = int(ratio * height)
                    if _height == -2:
                        height += height % 2  # round up to an even number
                    width = _width
                else:
                    ratio = _height / height
                    width = int(ratio * width)
                    if _width == -2:
                        width += width % 2  # round up to an even number
                    height = _height

                # Pass the resolved size on, so the reshape below matches
                # what is actually produced.
                scale = '%d:%d' % (width, height)
            else:
                width = _width
                height = _height

        input_kwargs = {
            'err_detect': 'aggressive',
            'fflags': 'discardcorrupt'  # discard corrupted frames
        }
        if start_time is not None:
            input_kwargs['ss'] = str(start_time)
        else:
            start_time = 0.
        if end_time is not None:
            input_kwargs['t'] = str(end_time - start_time)

        video_filters = []
        # Like ``vframes``, a non-positive ``fps`` means "not requested"; a
        # plain truthiness check would turn the default -1 into ``fps=-1``.
        if fps is not None and fps > 0:
            video_filters += ['fps=%d' % fps]
        if scale:
            video_filters += ['scale=%s' % scale]

        output_kwargs = {
            'format': 'image2pipe',
            'pix_fmt': pix_fmt,
            'vcodec': 'rawvideo',
            'movflags': 'faststart',
        }
        if vframes > 0:
            output_kwargs['vframes'] = vframes

        cmd_args = compile_args(
            input_fn=input_fn,
            input_options=input_kwargs,
            video_filters=video_filters,
            output_options=output_kwargs)
        out, _ = run_command(cmd_args, pipe_stdout=True, pipe_stderr=True)

        # Bytes per pixel of the packed raw output; anything not listed is
        # treated as a 3-byte format such as rgb24.
        bytes_per_pixel = {
            'gray': 1,
            'rgba': 4,
            'bgra': 4,
            'argb': 4,
            'abgr': 4,
        }
        depth = bytes_per_pixel.get(pix_fmt, 3)

        # Copy so the returned array owns its data instead of viewing ``out``.
        frames = np.frombuffer(out, np.uint8).copy()
        frames = frames.reshape([-1, height, width, depth])
        return frames
# def read_frame_as_jpg(in_filename, frame_num): # out, err = ( # ffmpeg # .input(in_filename) # .filter_('select', 'gte(n,{})'.format(frame_num)) # .output('pipe:', vframes=1, format='image2', vcodec='mjpeg') # .run(capture_stdout=True) # ) # return out