Source code for bob.io.stream.stream

# -*- coding: utf-8 -*-
"""Processing pipeline implementation.

This module implements the base class :class:`~bob.io.stream.Stream` which allows to build processing pipeline for data
stored in hdf5 files. The :class:`~bob.io.stream.Stream` implements a numpy-like interface to load or write and buffer 
data from file through the :class:`~bob.io.stream.StreamFile` class. 

Through the :func:`~bob.io.stream.stream_filter` decorator, the :class:`~bob.io.stream.Stream` class can be enriched 
with filters functions that implements data processing, which allows the easy definition of a processing pipeline by 
cascading filters. Some core functionalities of the :class:`~bob.io.stream.Stream` such as slicing 
(:class:`~bob.io.stream.StreamView`) class are also implemented using filters.

In order to implement new functionalities, one can either use the :class:`~bob.io.stream.StreamFilter` filter and 
provide a "process_frame" function, or use the :func:`~bob.io.stream.stream_filter` decorator. In order to have the the 
new filter accessible when using "import bob.io.stream", it is necessary to import the newly defined filter in the 
`__init__.py`.

Example
-------
Image processing using stream filters: 

    >>> from bob.io.stream import StreamFile, Stream
    >>> # Open a StreamFile to an hdf5 file.
    >>> f = StreamFile("input_example.h5", "idiap_face_streams_config.json")
    >>> # Define streams
    >>> color = Stream("color", f)
    >>> swir_dark = Stream("swir", f)
    >>> swir_940 = Stream("swir_940nm", f)
    >>> # Define pipelines using filters
    >>> swir_dark = swir_dark.adjust(color)
    >>> swir_940 = swir_940.adjust(color).subtract(swir_dark).clean()
    >>> swir_940.load(0)  # Load data and apply the processing for the first frame.
"""


from builtins import str

import numpy as np
from scipy.spatial import cKDTree

from .utils import StreamArray, get_index_list, get_axis_size
from .stream_file import StreamFile


class Stream:
    """Base class implementing methods to load/write, process and use data from hdf5 file with a "numpy-like" api.

    This class is designed to provide the following functionalities:

    - Easily define chains of processing and loading of data. When accessing data through a stream, it will recursively
      call its `parent`'s :meth:`~bob.io.stream.Stream.load` function before its own. For instance, if a stream's parent
      is a StreamFile, loading data through the stream will first load the data (from the dataset specified by the
      stream's `name`) from the hdf5 file through the StreamFile, before applying its own processing.
    - Provide an easy syntax to implement this chained processing. This is achieved through the
      :func:`~bob.io.stream.stream_filter` decorator, which adds filter members to the Stream class, allowing them to
      be used in the following fashion::

        example_stream = Stream("cam1").normalize().stack(Stream("cam2").normalize())

      Loading data through example_stream will thus load data from "cam1" and normalize it, then load data from
      "cam2" and normalize it, and finally stack the two together.

    - In a similar fashion to the chained processing, this class allows applying processing in the reverse order to
      write data to an hdf5 file. This is implemented in the :meth:`~bob.io.stream.Stream.put` method and uses the `child`
      attribute.

    The api is designed to be similar to numpy arrays:

    - Data access (processing and loading) is done using [].
    - Taking a slice in a stream returns a new stream with the sliced data. (This is implemented with the 
      :class:`~bob.io.stream.StreamView` filter).

    To reduce disk access, the result of loading or processing is buffered. 
    
    The class was initially designed to work with video streams, therefore :class:`~bob.io.stream.StreamArray` members 
    are available to provide an easy way to use bounding boxes or landmarks for each frame in the stream.
    Additionally, the :obj:`~bob.io.stream.Stream.timestamps` member contains the timestamps of each frame in the stream.

    Attributes
    ----------
    name : str
        Name of the stream. If `parent` is a :class:`~bob.io.stream.StreamFile`, it will be used to know from which 
        dataset in the hdf5 file the data should be taken. Otherwise it is an identifier of the Stream (or 
        StreamFilter) functionality (e.g. "adjust", "normalize", ...).
    parent : :obj:`~bob.io.stream.Stream` or :obj:`~bob.io.stream.StreamFile`
        The element before this instance in the chain of processing for *loading* data. The parent's "load" function will 
        recursively be used before this instance's own.
    child : :obj:`~bob.io.stream.Stream` or :obj:`~bob.io.stream.StreamFile`
        The element after this instance in the chain of processing for *writing* data. When 
        :meth:`~bob.io.stream.Stream.put` is called, it will perform its function then recursively call its `child`'s.
    _loaded : :obj:`list` of int
        Indices of the data that is currently buffered.
    _data : :obj:`numpy.ndarray`
        Buffered data.
    _shape : :obj:`tuple` of int
        Shape of the stream's data. This member is mostly used when writing data; when reading, the
        :obj:`~bob.io.stream.Stream.shape` property is used.
    """

    filters = []

    def __init__(self, name=None, parent=None):
        self.name = name
        self.parent = parent
        self.child = None
        if isinstance(parent, Stream):  # Sets Parent's child as this instance if needed.
            parent.child = self
        self.reset()

    def reset(self):
        """Deletes buffered data and meta-data."""
        self._loaded = None
        self._data = None
        self._shape = None
        self.__bounding_box = StreamArray(self)
        self.__image_points = StreamArray(self)

    @property
    def source(self):
        """Source file of the Stream's data.

        While `parent` points to the previous stream in the chain of processing, `source` points directly to the
        data file.

        Returns
        -------
        :obj:`~bob.io.stream.StreamFile`
            File containing the stream's data, before processing.
        """
        if isinstance(self.parent, StreamFile) or self.parent is None:
            return self.parent
        else:
            return self.parent.source

    @source.setter
    def source(self, src):
        self.set_source(src)

    def set_source(self, src):
        """Recursively set `source` of self and `parent`.

        Parameters
        ----------
        src : :obj:`~bob.io.stream.StreamFile`
            The file containing the raw data of this stream and parents.
        """
        if isinstance(self.parent, StreamFile) or self.parent is None:
            self.parent = src
        else:
            self.parent.set_source(src)
        self.reset()

    @property
    def config(self):
        """Configuration dictionary used to access the data in the hdf5 file.

        Returns
        -------
        :obj:`dict`
            Config.
        """
        if isinstance(self.parent, StreamFile):
            return self.parent.get_stream_config(self.name)
        else:
            return self.parent.config

    @config.setter
    def config(self, value):
        raise NotImplementedError

    @property
    def shape(self):
        """Shape of the stream's data.

        When reading data, the shape of the stream is typically defined by the shape of the data in `source`,
        therefore the shape is recursively requested from `parent`. However, when writing data, the shape is defined
        by the user, and the stream's parent might not be set. In this case, we store the shape in `_shape`.

        Raises
        ------
        Exception
            If trying to set the shape when it is already defined (by a parent StreamFile).
        ValueError
            If setting the shape with an invalid type.

        Returns
        -------
        :obj:`tuple` of int
            Shape.
        """
        # if parent is a file, return the dataset shape
        if isinstance(self.parent, StreamFile):
            return self.parent.get_stream_shape(self.name)
        # if it has a parent, return the parent's shape
        elif self.parent is not None:
            return self.parent.shape
        # if parent is None, return the internal _shape
        # This case typically happens when writing data: the shape is not defined by the data in the source.
        else:
            return self._shape

    @shape.setter
    def shape(self, value):
        # can only set shape if undefined
        if self.shape is not None:
            raise Exception("shape is already set")
        # set dataset dimension (should this be allowed?)
        if isinstance(self.parent, StreamFile):
            self.parent.set_dataset_shape(name=self.name, shape=value)
        # set recursively
        elif self.parent is not None:
            self.parent.shape = value
        else:
            if isinstance(value, tuple):
                self._shape = value
            else:
                raise ValueError("shape must be a tuple of int or None")

    @property
    def ndim(self):
        """Number of dimensions of the stream's data.

        Returns
        -------
        int
            Number of dimensions.
        """
        return len(self.shape)

    @property
    def timestamps(self):
        """Timestamp of each frame in the stream's data.

        Returns
        -------
        :obj:`numpy.ndarray`
            Timestamps.
        """
        if isinstance(self.parent, StreamFile):
            return self.parent.get_stream_timestamps(self.name)
        else:
            return self.parent.timestamps

    @timestamps.setter
    def timestamps(self, value):
        raise NotImplementedError

    @property
    def bounding_box(self):
        """Bounding box at each frame in the stream.

        A StreamArray member is provided to allow the user to easily store their bounding boxes with the stream's
        data.

        Returns
        -------
        :obj:`~bob.io.stream.StreamArray`
            Bounding boxes.
        """
        if isinstance(self.parent, StreamFile):
            return self.__bounding_box
        else:
            return self.parent.bounding_box

    @property
    def image_points(self):
        """Landmarks at each frame in the stream.

        A StreamArray member is provided to allow the user to easily store their landmark points with the stream's
        data.

        Returns
        -------
        :obj:`~bob.io.stream.StreamArray`
            Landmarks.
        """
        if isinstance(self.parent, StreamFile):
            return self.__image_points
        else:
            return self.parent.image_points

    def __getitem__(self, index):
        """Data access: returns a frame (if `index` is an int) or a StreamView (if `index` is a slice or a tuple).

        The implementation is similar to numpy's: when only 1 frame is requested (`index` is an int), return the
        frame - not a Stream containing only 1 frame; when several frames are requested, return a Stream containing
        the requested frames. This last case is implemented using the :class:`~bob.io.stream.StreamView` filter.

        Parameters
        ----------
        index : int or :obj:`slice` or :obj:`tuple` of int
            Indices of the data to load.

        Returns
        -------
        :obj:`numpy.ndarray` or :obj:`~bob.io.stream.Stream`
            A frame if only one was requested. If a slice was requested, return a :obj:`~bob.io.stream.StreamView`.

        Raises
        ------
        ValueError
            If indices in `index` are of the wrong type.
        ValueError
            If `index` does not have a supported type.
        """
        # case 1: index is int: load and return a frame.
        if isinstance(index, int):
            data = self.load(index)
            return data[0]
        # case 2: index is slice: it acts on the first dimension only, so pack it in a tuple
        elif isinstance(index, slice):
            view_indices = (index,)
        # case 3: index is tuple: validate each index
        elif isinstance(index, tuple):
            # we do not check dimensionality here, because it can be varying if the pipeline has no source
            for i in index:
                if isinstance(i, int):
                    pass
                elif isinstance(i, slice):
                    pass
                else:
                    raise ValueError(
                        "Got index "
                        + str(i)
                        + " of type "
                        + str(type(i))
                        + " in "
                        + str(index)
                        + ", but only support int or slice."
                    )
            view_indices = index
        else:
            raise ValueError(
                "index can only be int, slice or tuple, but got " + str(index) + " of type " + str(type(index))
            )
        # case 2 & 3: return a stream view
        return self.view(view_indices=view_indices)

    def __setitem__(self, index, data):
        raise NotImplementedError

    def load(self, index=None):
        """Load data directly.

        Unlike accessing stream data through brackets [], this method always returns the data, not a Stream. This
        method is overloaded in :class:`~bob.io.stream.StreamFilter`, in order to call the `parent` load method first
        and apply processing on the result. The loaded data is buffered to reduce disk access.

        Parameters
        ----------
        index : int or :obj:`list`
            Indices of the frames to load, by default None.

        Returns
        -------
        :obj:`numpy.ndarray`
            Data at `index`.
        """
        indices = get_index_list(index, self.shape[0])
        # return buffered data OR load from file
        if self._loaded == indices and self._data is not None:
            pass
        else:
            self._data = self.parent.load_stream_data(self.name, indices)
        # buffer loaded indices
        self._loaded = indices
        return self._data

    # TODO define behaviour across all states
    def put(self, data, timestamp=None):
        """Recursively pass `data` down to `child` to write it to the hdf5 file.

        :class:`~bob.io.stream.StreamFilter` overloads this method to process `data` with the filter function before
        passing it down to `child`.

        Parameters
        ----------
        data : :obj:`numpy.ndarray`
            Data to write to file.
        timestamp : int or float
            Timestamp of `data`, by default None.

        Raises
        ------
        ValueError
            If `data`'s shape does not match the shape of previous frames or the stream's shape.
        """
        # set _shape to keep track of size of frames
        if self._shape is None:
            self._shape = tuple([None, *data.shape])
        # check if same shape
        elif data.shape != self._shape[1:]:
            raise ValueError(
                "Expected data with shape "
                + str(data.shape)
                + " to have same shape "
                + str(self._shape[1:])
                + ' as previous frames. Use "reset"'
                " to reset the stream's shape."
            )
        # pass down the chain for writing.
        if self.child is not None:
            self.child.put(data, timestamp)

    def get_available_filters(self):
        """Get a list of the available filters to use with the :class:`~bob.io.stream.Stream` class.

        Note: Stream.filters is filled in with the name of the filters by the :func:`~bob.io.stream.stream_filter`
        decorator, each time a class is decorated.

        Returns
        -------
        :obj:`list` of str
            List of the available filters in the :class:`~bob.io.stream.Stream` class. The filters can be used as
            "stream.filter()".
        """
        return Stream.filters

    def get_parent(self):
        """Return this stream's parent (None if `parent` is not set).

        Returns
        -------
        :obj:`~bob.io.stream.Stream`
            This stream's parent.
        """
        return self.parent
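

# Illustrative usage sketch (not executed; the file name, config name, and dataset names below are hypothetical).
# It shows the numpy-like access described in the class docstring: an int index returns a frame, a slice returns a
# new Stream, and loaded data is buffered between accesses:
#
#   f = StreamFile("recording.h5", "streams_config.json")
#   color = Stream("color", f)
#   frame0 = color[0]                      # numpy array: loads frame 0 from the file and buffers it
#   clip = color[0:10]                     # Stream (a "view" filter): nothing is loaded yet
#   crop = color[:, :, 100:200, 100:200]   # slicing along the other axes is also deferred
#   data = clip.load([0, 1, 2])            # loads and returns the first 3 frames of the view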


##################################################################
# Decorator to add Filters to the Stream class


def stream_filter(name):
    """Adds the filter with `name` to the :class:`~bob.io.stream.Stream` class.

    This decorator function is meant to be used on a filter class that inherits from the
    :class:`~bob.io.stream.Stream` class. It adds this filter to the :class:`~bob.io.stream.Stream` class so it can
    be used directly as a member. It also adds it to the :obj:`~bob.io.stream.Stream.filters` list.

    For example, see the :class:`~bob.io.stream.StreamView` filter.

    Parameters
    ----------
    name : str
        Name of the filter.
    """

    def wrapper(Filter):
        def filter(self, *args, **kwargs):
            return Filter(name=name, parent=self, *args, **kwargs)

        # add the filter to the Stream class.
        setattr(Stream, name, filter)
        # Append it to the filters list.
        Stream.filters.append(name)
        return Filter

    return wrapper


##################################################################
# StreamFilter class: base class for filters.


@stream_filter("filter")
class StreamFilter(Stream):
    """Base filter class: overloads the :meth:`bob.io.stream.Stream.load` and :meth:`bob.io.stream.Stream.put`
    methods to insert the filter processing.

    This class implements the :meth:`~bob.io.stream.StreamFilter.process` and
    :meth:`bob.io.stream.StreamFilter.process_frame` methods, which define the processing operated by the filter. A
    "process_frame" function can be received as an argument, in which case it will be applied to each frame of data
    in :meth:`~bob.io.stream.StreamFilter.process_frame`. If not provided, this filter doesn't perform any
    processing; however it provides the definition of the processing methods, which can be overloaded in inheriting
    classes. See for example the :class:`~bob.io.stream.StreamView` filter.

    The :meth:`bob.io.stream.Stream.load` method is overloaded to first perform the filter's `parent` processing (or
    loading if `parent` is not a filter).
    The :meth:`bob.io.stream.Stream.put` method is overloaded to first perform the processing of the filter, then
    pass the data down to `child` for further processing or writing to disk.

    Attributes
    ----------
    filter_name : str
        The name of this filter. `name` (from the :class:`bob.io.stream.Stream` class) is kept separate because it
        is used to know from which dataset to load data in the hdf5 file.
    """

    def __init__(self, name, parent, process_frame=None):
        """Register `process_frame` if provided, and initialize super.

        Parameters
        ----------
        name : str
            "filter": identifier name to use this filter from the :obj:`~bob.io.stream.Stream` class.
        parent : :obj:`~bob.io.stream.Stream` or :obj:`~bob.io.stream.StreamFilter`
            Parent Stream(Filter).
        process_frame : :obj:`Callable`
            Process_frame function, by default None.
        """
        self.__process_frame = process_frame
        self.filter_name = name
        super().__init__(name=parent.name, parent=parent)

    def process(self, data, indices):
        """Apply the filter on each frame of `data`, and stack the results back into one array.

        Parameters
        ----------
        data : :obj:`numpy.ndarray`
            Data to process.
        indices : :obj:`list` of int
            Indices of `data` in the stream. Unused here, but useful for instance for filters that combine two
            streams together.

        Returns
        -------
        :obj:`numpy.ndarray`
            Processed data.

        Raises
        ------
        ValueError
            If `indices` is not a list.
        """
        if not isinstance(indices, list):
            raise ValueError("Indices should be a list, but got " + str(type(indices)))
        return np.stack([self.process_frame(data[i], i, indices[i]) for i in range(data.shape[0])])

    def process_frame(self, data, data_index, stream_index):
        """Apply `self.__process_frame` if possible, otherwise simply return the data.

        Parameters
        ----------
        data : :obj:`numpy.ndarray`
            Data (one frame) to process.
        data_index : int
            Not used. Index of `data` in the stream.
        stream_index : int
            Not used. Index of the stream from which `data` comes, to be used by filters that combine several
            streams.

        Returns
        -------
        :obj:`numpy.ndarray`
            Processed frame.
        """
        if self.__process_frame is not None:
            return self.__process_frame(data)
        else:
            return data

    # load one or several frames
    def load(self, index=None):
        """Overload :meth:`bob.io.stream.Stream.load` to apply the filter's processing to what `parent` loaded.

        Parameters
        ----------
        index : int or :obj:`list` of int
            Indices of the frames to load, by default None.

        Returns
        -------
        :obj:`numpy.ndarray`
            The processed data.
        """
        indices = get_index_list(index, self.shape[0])
        # return buffered data OR process parent's data.
        if self._loaded == indices and self._data is not None:
            # print('loaded', self.name)
            pass
        else:
            self._data = self.process(self.parent.load(indices), indices)
        # buffer loaded indices.
        self._loaded = indices
        return self._data

    # TODO define behaviour across all states
    def put(self, data, timestamp=None):
        """Apply the filter's processing, then pass `data` down to `child` for further processing or saving to disk.

        Parameters
        ----------
        data : :obj:`numpy.ndarray`
            Data (one frame) to process.
        timestamp : int or float
            Timestamp of `data` in the stream, by default None.
        """
        # set _shape to keep track of size of frames
        if self._shape is None:
            self._shape = tuple([None, *data.shape])
        # check if same shape
        elif data.shape != self._shape[1:]:
            raise ValueError(
                "Expected data with shape "
                + str(data.shape)
                + " to have same shape "
                + str(self._shape[1:])
                + " as previous frames."
            )
        indices = [-1]  # data is appended to the previous frames, so index can be -1: implies frame is the latest.
        data = np.stack([data])  # create a set of frames (with 1 frame) to match what `process` is expecting.
        data = self.process(data, indices)
        data = data[0]  # go back to manipulating only 1 frame.
        # buffer processed index and data.
        self._loaded = indices
        self._data = data
        # pass down to children until writing to disk.
        if self.child is not None:
            self.child.put(data, timestamp)
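

# Sketch of how a new filter could be defined with the decorator above (the "normalize" name and its processing are
# illustrative, not part of the package). Once decorated, the filter is available as a Stream member, e.g.
# Stream("color", f).normalize():
#
#   @stream_filter("normalize")
#   class StreamNormalize(StreamFilter):
#       """Scale each frame to the [0, 1] range."""
#
#       def process_frame(self, data, data_index, stream_index):
#           data = data.astype("float64")
#           return (data - data.min()) / (data.max() - data.min())
#
# Alternatively, the generic "filter" member registered above can be used with a process_frame callable, e.g.
# stream.filter(process_frame=lambda frame: frame / 255.0).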


################################################################################
# Filters implementing important functionalities of the Stream class


@stream_filter("view")
class StreamView(StreamFilter):
    """Filter to implement the "slicing" functionality of the :class:`bob.io.stream.Stream` class.

    Similarly to numpy's "view", this filter allows taking a slice of a stream without creating a copy of the data.

    Attributes
    ----------
    frame_view : :obj:`slice` or None
        Slice value in the first dimension of the stream (along the frame's axis). None means no slicing: take the
        whole array.
    bulk_view : :obj:`tuple` of int or :obj:`slice` or None
        Slice values along the other axes of the stream.
    """

    def __init__(self, name, parent, view_indices=None):
        """Initializes the stream and checks that the requested slice is properly defined.

        Parameters
        ----------
        name : str
            "view": identifier name to use this filter from the :obj:`~bob.io.stream.Stream` class.
        parent : :obj:`~bob.io.stream.Stream` or :obj:`~bob.io.stream.StreamFilter`
            Parent Stream(Filter).
        view_indices : :obj:`tuple` of int or :obj:`slice`
            Indices defining the slice, by default None (meaning no slicing: take the whole array).

        Raises
        ------
        ValueError
            If the indices' types are not supported.
        ValueError
            If `view_indices`' type is not supported.
        ValueError
            If the slice would result in an empty stream.
        ValueError
            If the requested slice has more dimensions than are available in the stream.
        """
        super().__init__(name=name, parent=parent)
        self.frame_view = None
        self.bulk_view = None
        if isinstance(view_indices, tuple):
            # separate frame (axis = 0) and bulk (axis > 0) views
            self.frame_view = view_indices[0]
            # TODO should case with int on frame index be allowed?
            if isinstance(self.frame_view, int):
                raise ValueError(
                    "Can not slice into a single frame. To perform this operation, select first the frame and apply "
                    "slicing on the numpy array."
                )
            # bulk views
            if len(view_indices) > 1:
                self.bulk_view = view_indices[1:]
                for e in self.bulk_view:
                    if isinstance(e, int) or isinstance(e, slice) or e is None:
                        pass
                    else:
                        raise ValueError(
                            "Incorrect type of index: received "
                            + str(type(e))
                            + " in "
                            + str(view_indices)
                            + ". Indices should be a tuple of int/slice or None."
                        )
        elif view_indices is None:
            pass
        else:
            raise ValueError(
                "Indices should be a tuple of int/slice or None, but got "
                + str(view_indices)
                + " of type "
                + str(type(view_indices))
                + "."
            )
        if not all(self.shape):  # true if a dimension in shape is 0: the resulting stream has no data.
            raise ValueError(
                str(view_indices)
                + " in stream with shape "
                + str(parent.shape)
                + " results in empty stream. (shape "
                + str(self.shape)
                + ")"
            )
        # If the slice operates on more dimensions than available.
        if self.bulk_view is not None and self.parent.ndim <= len(self.bulk_view):
            raise ValueError(
                "Got "
                + str(len(self.bulk_view) + 1)
                + " indices for stream with shape "
                + str(self.parent.shape)
                + ". Too many indices !"
            )

    # shape property
    @property
    def shape(self):
        """Shape of the stream's data.

        The shape is computed with respect to the parent's shape, because `source` might not be set, in which case
        we can not know the shape of the data. If the requested slice has an integer index along one axis, this
        dimension is dropped. However, taking an integer along the first axis is not allowed (an Exception is raised
        in __init__).

        Returns
        -------
        :obj:`tuple` of int
            Shape of the stream's data.
        """
        # first dimension ...
        __shape = [get_axis_size(self.parent.shape, 0, self.frame_view)]
        # ... and others
        for d in range(1, self.parent.ndim):
            if self.bulk_view is not None and d - 1 < len(self.bulk_view):
                __view_index = self.bulk_view[d - 1]
                # don't add axis to shape if integer index
                if isinstance(__view_index, int):
                    pass
                else:
                    __shape.append(get_axis_size(self.parent.shape, d, __view_index))
            else:
                __shape.append(get_axis_size(self.parent.shape, d))
        return tuple(__shape)

    @property
    def ndim(self):
        """Number of dimensions of the stream's data.

        If the requested slice has an integer along an axis, this dimension is collapsed; otherwise the number of
        dimensions is the same as the parent's.

        Returns
        -------
        int
            Number of dimensions.
        """
        __ndim = self.parent.ndim
        if self.frame_view is not None and isinstance(self.frame_view, int):
            __ndim -= 1
        if self.bulk_view is not None:
            for a in self.bulk_view:
                if isinstance(a, int):
                    __ndim -= 1
        return __ndim

    def load(self, index=None):
        """Load the stream's data at the corresponding indices.

        Maps `index` to indices in `parent` and delegates the loading.

        Parameters
        ----------
        index : int or :obj:`list` of int or :obj:`slice`
            Indices of the data to load.

        Returns
        -------
        :obj:`numpy.ndarray`
            Data at `index` in the stream.
        """
        parent_indices = get_index_list(self.frame_view, self.parent.shape[0])
        view_indices = get_index_list(index, self.shape[0])
        indices = [parent_indices[i] for i in view_indices]
        return super().load(indices)

    def process(self, data, indices):
        """Apply the slicing on each frame of `data`.

        The slicing of the frame's axis is performed in :meth:`bob.io.stream.StreamView.load`, so that `data` only
        contains the frames that are requested. It remains to apply the slicing along the other axes of `data`,
        which is delegated to :meth:`~bob.io.stream.StreamView.process_frame` (by slicing into the numpy arrays).
        Here we only store the requested slice in full format (a value along each axis).

        Parameters
        ----------
        data : :obj:`numpy.ndarray`
            Data to slice. Slicing on the first axis is already performed.
        indices : int or :obj:`list` of int
            Indices of `data` in the stream.

        Returns
        -------
        :obj:`numpy.ndarray`
            Sliced data.
        """
        # generate full bulk view
        __bulk_view_full = [slice(None, None, None) for d in range(self.parent.ndim - 1)]
        if self.bulk_view is not None:
            for d in range(self.ndim - 1):
                if d < len(self.bulk_view):
                    if self.bulk_view[d] is not None:  # should be int or slice
                        __bulk_view_full[d] = self.bulk_view[d]
        self.__bulk_view_full = tuple(__bulk_view_full)
        return super().process(data, indices)  # super stacks the results of process_frame.

    def process_frame(self, data, data_index, stream_index):
        """Apply the slicing on a frame of data.

        Apply the frame slicing computed in :meth:`~bob.io.stream.StreamView.process` on a frame.

        Parameters
        ----------
        data : :obj:`numpy.ndarray`
            Frame of data.
        data_index : int
            Not used. Present for compatibility with other filters.
        stream_index : int
            Not used. Present for compatibility with other filters.

        Returns
        -------
        :obj:`numpy.ndarray`
            Sliced data.
        """
        return data[self.__bulk_view_full]
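

# Worked example of the shape arithmetic above (values are hypothetical): for a parent stream of shape
# (100, 3, 480, 640), stream[10:20, :, 0:240] has shape (10, 3, 240, 640), while stream[:, 0] drops the channel
# axis because of the integer index and has shape (100, 480, 640); stream[0] is not a view but a numpy array.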
[docs]@stream_filter("save") class StreamSave(StreamFilter): """Filter to save frames of data to a :class:`~bob.io.stream.StreamFile`. Saving is performed by appending to the streamfile. Attributes ---------- file : :obj:`~bob.io.stream.StreamFile` StreamFile into which the data will be appended. """ def __init__(self, file, name, parent): """Initializes stream and register the output :obj:`~bob.io.stream.StreamFile`. Parameters ---------- file : :obj:`~bob.io.stream.StreamFile` Output file. name : str "save": identifier name to use this filter from the :obj:`~bob.io.stream.Stream` class. parent : :obj:`~bob.io.stream.Stream` or :obj:`~bob.io.stream.StreamFilter` Parent Stream(Filter). """ super().__init__(name=name, parent=parent) self.file = file if not isinstance(self.file, StreamFile): raise ValueError("Output file is not a valid StreamFile.")

    def put(self, data, timestamp=None):
        """Pass the data and timestamp to the :obj:`~bob.io.stream.StreamFile` to write to disk.

        Parameters
        ----------
        data : :obj:`numpy.ndarray`
            Data to write to file.
        timestamp : int or float
            Timestamp of `data`, by default None.
        """
        self.file.put_frame(self.name, data, timestamp)
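

# Writing sketch (the output file name and constructor arguments are assumptions): frames pushed with put() flow
# down the chain of children, and the "save" filter appends them to the output StreamFile:
#
#   out_file = StreamFile("output.h5")        # assumed to be opened in a writable mode
#   color_out = Stream("color")
#   color_out.save(out_file)                  # attaches a "save" filter as the stream's child
#   for i, frame in enumerate(frames):        # frames: an iterable of numpy arrays with identical shapes
#       color_out.put(frame, timestamp=i)     # Stream.put checks the shape, then passes the frame to the save filter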


################################################################################
# General use filters


@stream_filter("astype")
class StreamAsType(StreamFilter):
    """Filter to cast the data to a different numpy dtype.

    Attributes
    ----------
    dtype : :obj:`numpy.dtype`
        The dtype to which to cast the data.
    """

    def __init__(self, name, parent, dtype):
        """Set `dtype` and initialize super().

        Parameters
        ----------
        name : str
            "astype": identifier name to use this filter from the :obj:`~bob.io.stream.Stream` class.
        parent : :obj:`~bob.io.stream.Stream` or :obj:`~bob.io.stream.StreamFilter`
            Parent Stream(Filter).
        dtype : :obj:`numpy.dtype`
            dtype to cast to.
        """
        super().__init__(name=name, parent=parent)
        self.dtype = dtype

    def process(self, data, indices):
        """Cast `data` to `dtype`.

        Parameters
        ----------
        data : :obj:`numpy.ndarray`
            Data to cast.
        indices : int or :obj:`list` of int
            Not used. Present for compatibility with other filters.

        Returns
        -------
        :obj:`numpy.ndarray`
            `data` cast to `dtype`.
        """
        return data.astype(self.dtype)
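

# Usage sketch: casting is just another link in the chain (dtype is passed as a keyword; names are hypothetical).
# The cast is applied when the frames are loaded:
#
#   color_float = Stream("color", f).astype(dtype="float32")
#   color_float[0].dtype                      # -> dtype('float32')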
[docs]@stream_filter("adjust") class StreamAdjust(StreamFilter): """Filter that allows to use 2 streams with different timestamps seamlessly by taking the closest time neighbors. Streams frames are not necessarily simultaneous: some streams may be delayed, some might have less frames... However the timestamps of each frames are available. Given the timestamps of the `parent` stream, this filter implements a nearest neighbor search in the timestamps of the `adjust_to` stream to load the closest frame. This stream emulates the `adjust_to` number of frames and timestamps to facilitate operations on streams. Attributes ---------- adjust_to : :obj:`~bob.io.stream.Stream` or :obj:`~bob.io.stream.StreamFilter` Stream relatively to which the timestamps will be adjusted. """ def __init__(self, adjust_to, name, parent): """Set super and register `adjust_to`. Parameters ---------- adjust_to : :obj:`~bob.io.stream.Stream` or :obj:`~bob.io.stream.StreamFilter` Stream to which `parent` is adjusted. name : str "adjust": identifier name to use this filter from the :obj:`~bob.io.stream.Stream` class. parent : :obj:`~bob.io.stream.Stream` or :obj:`~bob.io.stream.StreamFilter` Parent Stream(Filter). """ super().__init__(name=name, parent=parent) self.adjust_to = adjust_to

    def set_source(self, src):
        """Set `self` and `adjust_to` sources to `src`.

        Parameters
        ----------
        src : :obj:`~bob.io.stream.Stream` or :obj:`~bob.io.stream.StreamFile`
            Source Stream or StreamFile.
        """
        super().set_source(src)
        self.adjust_to.set_source(src)

    @property
    def shape(self):
        """Stream's data shape. The number of frames is equal to `adjust_to`'s.

        Returns
        -------
        :obj:`tuple` of int
            Shape of the Stream's data.
        """
        return (self.adjust_to.shape[0], self.parent.shape[1], self.parent.shape[2], self.parent.shape[3])

    @property
    def timestamps(self):
        """Stream's timestamps, equal to `adjust_to`'s after adjustment.

        Returns
        -------
        :obj:`numpy.ndarray`
            Timestamps of the frames in the stream.
        """
        return self.adjust_to.timestamps

    def load(self, index):
        """Load the frame(s) at `index`.

        `index` is the index of a frame in `adjust_to`. The closest frame in `self` is found using a nearest
        neighbor search, then the data is loaded.

        Parameters
        ----------
        index : int or :obj:`list` of int or :obj:`slice`
            Indices of the frames to load.

        Returns
        -------
        :obj:`numpy.ndarray`
            Stream's data at `index`.
        """
        # original stream indices
        old_indices = get_index_list(index, self.shape[0])
        selected_timestamps = [self.adjust_to.timestamps[i] for i in old_indices]
        kdtree = cKDTree(self.parent.timestamps[:, np.newaxis])

        def get_index(val, kdtree):
            _, i = kdtree.query(val, k=1)
            return i

        new_indices = [get_index(ts[np.newaxis], kdtree) for ts in selected_timestamps]
        return super().load(new_indices)
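

# Usage sketch mirroring the module docstring example (dataset names are hypothetical): the "swir" frames are
# re-indexed onto the "color" timestamps, so both streams can be iterated frame by frame together:
#
#   f = StreamFile("recording.h5", "streams_config.json")
#   color = Stream("color", f)
#   swir = Stream("swir", f).adjust(color)
#   assert swir.shape[0] == color.shape[0]    # the adjusted stream reports color's number of frames
#   pair = (color[3], swir[3])                # swir frame closest in time to the 4th color frame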