Source code for bob.bio.video.utils

import logging
import pickle

import h5py
import imageio
import numpy as np

from bob.bio.base import selected_indices
from bob.io.image import to_bob
from bob.pipelines import wrap

from .transformer import VideoWrapper

logger = logging.getLogger(__name__)


[docs]def video_wrap_skpipeline(sk_pipeline): """ This function takes a `sklearn.Pipeline` and wraps each estimator inside of it with :any:`bob.bio.video.transformer.VideoWrapper` """ for i, name, estimator in sk_pipeline._iter(): # 1. Unwrap the estimator # If the estimator is `Sample` wrapped takes `estimator.estimator`. transformer = ( estimator.estimator if hasattr(estimator, "estimator") else estimator ) # 2. do a video wrap transformer = VideoWrapper(transformer) # 3. Sample wrap again transformer = wrap( ["sample"], transformer, fit_extra_arguments=estimator.fit_extra_arguments, transform_extra_arguments=estimator.transform_extra_arguments, ) sk_pipeline.steps[i] = (name, transformer) return sk_pipeline
[docs]def select_frames( count, max_number_of_frames=None, selection_style=None, step_size=None ): """Returns indices of the frames to be selected given the parameters. Different selection styles are supported: * first : The first frames are selected * spread : Frames are selected to be taken from the whole video with equal spaces in between. * step : Frames are selected every ``step_size`` indices, starting at ``step_size/2`` **Think twice if you want to have that when giving FrameContainer data!** * all : All frames are selected unconditionally. Parameters ---------- count : int Total number of frames that are available max_number_of_frames : int The maximum number of frames to be selected. Ignored when selection_style is "all". selection_style : str One of (``first``, ``spread``, ``step``, ``all``). See above. step_size : int Only useful when ``selection_style`` is ``step``. Returns ------- range A range of frames to be selected. Raises ------ ValueError If ``selection_style`` is not one of the supported ones. """ # default values if max_number_of_frames is None: max_number_of_frames = 20 if selection_style is None: selection_style = "spread" if step_size is None: step_size = 10 if selection_style == "first": # get the first frames (limited by all frames) indices = range(0, min(count, max_number_of_frames)) elif selection_style == "spread": # get frames lineraly spread over all frames indices = selected_indices(count, max_number_of_frames) elif selection_style == "step": indices = range(step_size // 2, count, step_size)[:max_number_of_frames] elif selection_style == "all": indices = range(0, count) else: raise ValueError(f"Invalid selection style: {selection_style}") return indices
def no_transform(x): return x class VideoAsArray: """A memory efficient class to load only select video frames. It also supports efficient conversion to dask arrays. """ def __init__( self, path, selection_style=None, max_number_of_frames=None, step_size=None, transform=None, **kwargs, ): """init Parameters ---------- path : str Path to the video file selection_style : str, optional See :any:`select_frames`, by default None max_number_of_frames : int, optional See :any:`select_frames`, by default None step_size : int, optional See :any:`select_frames`, by default None transform : callable, optional A function that transforms the loaded video. This function should not change the video shape or its dtype. For example, you may flip the frames horizontally using this function, by default None """ super().__init__(**kwargs) self.path = path self.reader = imageio.get_reader(path) self.dtype = np.uint8 shape = (self.reader.count_frames(), 3) + self.reader.get_meta_data()[ "size" ][::-1] self.ndim = len(shape) self.selection_style = selection_style indices = select_frames( count=self.reader.count_frames(), max_number_of_frames=max_number_of_frames, selection_style=selection_style, step_size=step_size, ) self.indices = indices self.shape = (len(indices),) + shape[1:] self.transform = transform or no_transform def __getstate__(self): d = self.__dict__.copy() d.pop("reader") return d def __setstate__(self, state): self.__dict__.update(state) self.reader = imageio.get_reader(self.path) def __len__(self): return self.shape[0] def __getitem__(self, index): # logger.debug("Getting frame %s from %s", index, self.path) # In this method, someone is requesting indices thinking this video has # the shape of self.shape but self.shape is determined through # select_frames parameters. What we want to do here is to translate # ``index`` to real indices of the video file given that we want to load # only the selected frames. List of the selected frames are stored in # self.indices # If only one frame is requested, first translate the index to the real # frame number in the video file and load that if isinstance(index, int): idx = self.indices[index] return self.transform( np.asarray([to_bob(self.reader.get_data(idx))]) )[0] if not ( isinstance(index, tuple) and len(index) == self.ndim and all(isinstance(idx, slice) for idx in index) ): raise NotImplementedError( f"Indexing like {index} is not supported yet!" ) # dask.array.from_array sometimes requests empty arrays if all(i == slice(0, 0) for i in index): return np.array([], dtype=self.dtype) def _frames_generator(): # read the frames one by one and yield them real_frame_numbers = self.indices[index[0]] for i, frame in enumerate(self.reader): frame = to_bob(frame) if i not in real_frame_numbers: continue # make sure arrays are loaded in C order because we reshape them # by C order later. Also, index into the frames here frame = np.ascontiguousarray(frame)[index[1:]] # return a tuple of flat array to match what is expected by # field_dtype yield (frame.ravel(),) if i == real_frame_numbers[-1]: break iterable = _frames_generator() # compute the final shape given self.shape and index # see https://stackoverflow.com/a/36188683/1286165 shape = [ len(range(*idx.indices(dim))) for idx, dim in zip(index, self.shape) ] # field_dtype contains information about dtype and shape of each frame # numpy black magic: https://stackoverflow.com/a/12473478/1286165 allows # us to yield frame by frame in _frames_generator which greatly speeds # up loading the video field_dtype = [("", (self.dtype, (np.prod(shape[1:]),)))] total_number_of_frames = shape[0] video = np.fromiter(iterable, field_dtype, total_number_of_frames) # view the array as self.dtype to remove the field_dtype video = np.reshape(video.view(self.dtype), shape, order="C") return self.transform(video) def __repr__(self): return f"VideoAsArray: {self.path!r} {self.dtype!r} {self.ndim!r} {self.shape!r} {self.indices!r}" class VideoLikeContainer: def __init__(self, data, indices, **kwargs): super().__init__(**kwargs) self.data = data self.indices = indices def __repr__(self) -> str: return f"VideoLikeContainer: {self.data!r} {self.indices!r}" @property def dtype(self): return self.data.dtype @property def shape(self): return self.data.shape @property def ndim(self): return self.data.ndim def __len__(self): return len(self.data) def __getitem__(self, item): # we need to throw IndexErrors here because h5py throws ValueErrors # instead and this breaks loops on this class if isinstance(item, int) and item >= len(self): raise IndexError(f"Index ({item}) out of range (0-{len(self)-1})") return self.data[item] def __array__(self, dtype=None, *args, **kwargs): return np.asarray(self.data, dtype, *args, **kwargs) def __eq__(self, o: object) -> bool: return np.array_equal(self.data, o.data) and np.array_equal( self.indices, o.indices )
[docs] def save(self, file): self.save_function(self, file)
[docs] @staticmethod def save_function(other, file): try: with h5py.File(file, mode="w") as f: f["data"] = other.data f["indices"] = other.indices # revert to saving data in pickles when the dtype is not supported by hdf5 except TypeError: with open(file, "wb") as f: pickle.dump({"data": other.data, "indices": other.indices}, f)
[docs] @classmethod def load(cls, file): try: # weak closing of the hdf5 file so we don't load all the data into # memory https://docs.h5py.org/en/stable/high/file.html#closing-files f = h5py.File(file, mode="r") loaded = {"data": f["data"], "indices": list(f["indices"])} except OSError: with open(file, "rb") as f: loaded = pickle.load(f) self = cls(**loaded) return self