import logging
import os
import random
import string
from functools import partial
import cloudpickle
import dask
import dask.array
import h5py
import numpy as np
import xarray as xr
from sklearn.base import BaseEstimator
from sklearn.pipeline import _name_estimators
from sklearn.utils.metaestimators import _BaseComposition
from .sample import SAMPLE_DATA_ATTRS, _ReprMixin
from .wrappers import estimator_requires_fit
logger = logging.getLogger(__name__)
def save(data, path):
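    """Saves a numpy array into an HDF5 file, under the ``array`` key."""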
array = np.require(data, requirements=("C_CONTIGUOUS", "ALIGNED"))
with h5py.File(path, "w") as f:
f.create_dataset("array", data=array)
def load(path):
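    """Loads an array written by :any:`save` from an HDF5 file."""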
with h5py.File(path, "r") as f:
data = f["array"][()]
return data
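# Wraps a sample's load function into a single delayed dask array backing an
# ``xarray.DataArray``. When ``meta`` is not given, the load function is called
# once eagerly to discover the shape and dtype of the data.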
def _load_fn_to_xarray(load_fn, meta=None):
if meta is None:
meta = np.array(load_fn())
da = dask.array.from_delayed(
dask.delayed(load_fn)(), meta.shape, dtype=meta.dtype, name=False
)
try:
dims = meta.dims
except Exception:
dims = None
xa = xr.DataArray(da, dims=dims)
return xa, meta
def _one_sample_to_dataset(sample, meta=None):
dataset = {}
delayed_attributes = getattr(sample, "_delayed_attributes", None) or {}
for k in sample.__dict__:
if (
k in SAMPLE_DATA_ATTRS
or k in delayed_attributes
or k.startswith("_")
):
continue
dataset[k] = getattr(sample, k)
meta = meta or {}
for k in ["data"] + list(delayed_attributes.keys()):
attr_meta = meta.get(k)
attr_array, attr_meta = _load_fn_to_xarray(
partial(getattr, sample, k), meta=attr_meta
)
meta[k] = attr_meta
dataset[k] = attr_array
return xr.Dataset(dataset).chunk(), meta
def samples_to_dataset(samples, meta=None, npartitions=48, shuffle=False):
"""Converts a list of samples to a dataset.
See :ref:`bob.pipelines.dataset_pipeline`.
Parameters
----------
samples : list
A list of :any:`Sample` or :any:`DelayedSample` objects.
meta : ``xarray.DataArray``, optional
An xarray.DataArray to be used as a template for data inside samples.
npartitions : :obj:`int`, optional
        The number of partitions to split the samples into.
shuffle : :obj:`bool`, optional
If True, shuffles the samples (in-place) before constructing the dataset.
Returns
-------
``xarray.Dataset``
The constructed dataset with at least a ``data`` variable.
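    Examples
    --------
    A minimal sketch (assuming :any:`Sample` is importable from
    ``bob.pipelines``; the array sizes below are purely illustrative):

    >>> import numpy as np
    >>> from bob.pipelines import Sample
    >>> samples = [Sample(np.random.rand(3), key=str(i)) for i in range(4)]
    >>> dataset = samples_to_dataset(samples, npartitions=2)  # doctest: +SKIP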
"""
if meta is not None and not isinstance(meta, dict):
meta = dict(data=meta)
delayed_attributes = getattr(samples[0], "delayed_attributes", None) or {}
if meta is None or not all(
k in meta for k in ["data"] + list(delayed_attributes.keys())
):
dataset, meta = _one_sample_to_dataset(samples[0])
if shuffle:
random.shuffle(samples)
dataset = xr.concat(
[_one_sample_to_dataset(s, meta=meta)[0] for s in samples], dim="sample"
)
if npartitions is not None:
dataset = dataset.chunk({"sample": max(1, len(samples) // npartitions)})
return dataset
class Block(_ReprMixin):
"""A block representation in a graph.
This class is meant to be used with :any:`DatasetPipeline`.
Attributes
----------
dataset_map : ``callable``
A callable that transforms the input dataset into another dataset.
    estimator : object
        A scikit-learn estimator.
    estimator_name : str
        The name of the estimator.
extension : str
The extension of checkpointed features.
features_dir : str
The directory to save the features.
fit_input : str or list
A str or list of str of column names of the dataset to be given to the ``.fit``
method.
fit_kwargs : None or dict
A dict of ``fit_kwargs`` to be passed to the ``.fit`` method of the estimator.
input_dask_array : bool
Whether the estimator takes dask arrays in its fit method or not.
    load_func : ``callable``
        A function to load checkpointed features. Defaults to the estimator's
        ``bob_features_load_fn`` tag, if any, otherwise to :any:`load`.
model_path : str or None
If given, the estimator will be pickled here.
output_dims : list
A list of ``(dim_name, dim_size)`` tuples. If ``dim_name`` is ``None``, a new
name is automatically generated, otherwise it should be a string. ``dim_size``
should be a positive integer or nan for new dimensions or ``None`` for existing
dimensions.
output_dtype : object
The dtype of the output of the transformer. Defaults to ``float``.
    save_func : ``callable``
        A function to save the features. Defaults to the estimator's
        ``bob_features_save_fn`` tag, if any, otherwise to :any:`save`.
transform_input : str or list
A str or list of str of column names of the dataset to be given to the
``.transform`` method.
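    Examples
    --------
    A minimal sketch (the estimator and the fixed output size are
    illustrative):

    >>> from sklearn.decomposition import PCA
    >>> block = Block(PCA(n_components=2), output_dims=[("feature", 2)])
    >>> block.estimator_name
    'pca'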
"""
def __init__(
self,
estimator=None,
output_dtype=float,
output_dims=((None, np.nan),),
fit_input="data",
transform_input="data",
estimator_name=None,
model_path=None,
features_dir=None,
extension=".hdf5",
save_func=None,
load_func=None,
dataset_map=None,
input_dask_array=False,
fit_kwargs=None,
**kwargs,
):
super().__init__(**kwargs)
self.estimator = estimator
self.output_dtype = output_dtype
if not all(len(d) == 2 for d in output_dims):
            raise ValueError(
                "output_dims must be an iterable of (dim_name, dim_size) "
                f"tuples, not {output_dims}"
            )
self.output_dims = output_dims
self.fit_input = fit_input
self.transform_input = transform_input
if estimator_name is None:
estimator_name = _name_estimators([estimator])[0][0]
self.estimator_name = estimator_name
self.model_path = model_path
self.features_dir = features_dir
self.extension = extension
estimator_save_fn = (
None
if estimator is None
else estimator._get_tags().get("bob_features_save_fn")
)
estimator_load_fn = (
None
if estimator is None
else estimator._get_tags().get("bob_features_load_fn")
)
self.save_func = save_func or estimator_save_fn or save
self.load_func = load_func or estimator_load_fn or load
self.dataset_map = dataset_map
self.input_dask_array = input_dask_array
self.fit_kwargs = fit_kwargs or {}
def __getitem__(self, key):
return getattr(self, key)
def __setitem__(self, key, value):
setattr(self, key, value)
@property
def output_ndim(self):
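        # per-sample output dimensions plus the leading "sample" axis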
return len(self.output_dims) + 1
    def make_path(self, key):
key = str(key)
if key.startswith(os.sep) or ".." in key:
raise ValueError(
"Sample.key values should be relative paths with no "
f"reference to upper folders. Got: {key}"
)
return os.path.join(self.features_dir, key + self.extension)
    def save(self, key, data):
path = self.make_path(key)
os.makedirs(os.path.dirname(path), exist_ok=True)
# this should be save_func(data, path) so it's compatible with bob.io.base.save
return self.save_func(data, path)
    def load(self, key):
path = self.make_path(key)
return self.load_func(path)
def _fit(*args, block):
logger.info(f"Calling {block.estimator_name}.fit")
block.estimator.fit(*args, **block.fit_kwargs)
if block.model_path is not None:
logger.info(f"Saving {block.estimator_name} in {block.model_path}")
os.makedirs(os.path.dirname(block.model_path), exist_ok=True)
with open(block.model_path, "wb") as f:
cloudpickle.dump(block.estimator, f)
return block.estimator
class _TokenStableTransform:
def __init__(self, block, method_name=None, input_has_keys=False, **kwargs):
super().__init__(**kwargs)
self.block = block
self.method_name = method_name or "transform"
self.input_has_keys = input_has_keys
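    # Deterministic dask tokenization: hashing only the method name and the
    # checkpoint directory keeps the graph keys stable across runs, instead of
    # depending on this object's id.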
def __dask_tokenize__(self):
return (self.method_name, self.block.features_dir)
def __call__(self, *args, estimator):
block, method_name = self.block, self.method_name
logger.info(f"Calling {block.estimator_name}.{method_name}")
input_args = args[:-1] if self.input_has_keys else args
try:
features = getattr(estimator, self.method_name)(*input_args)
except Exception as e:
raise RuntimeError(
f"Failed to transform data: {estimator}.{self.method_name}(*{input_args})"
) from e
# if keys are provided, checkpoint features
if self.input_has_keys:
data = args[0]
key = args[-1]
l1, l2 = len(data), len(features)
if l1 != l2:
raise ValueError(
f"Got {l2} features from processing {l1} samples!"
)
# save computed_features
logger.info(f"Saving {l2} features in {block.features_dir}")
for feat, k in zip(features, key):
block.save(k, feat)
return features
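# Normalizes the user-provided graph: bare estimators and plain dicts are
# converted into Block instances.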
def _populate_graph(graph):
new_graph = []
for block in graph:
if isinstance(block, BaseEstimator):
block = {"estimator": block}
if isinstance(block, dict):
block = Block(**block)
new_graph.append(block)
return new_graph
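# Collects (dask array, dims) pairs for the requested dataset variables;
# ``_blockwise_with_block_args`` later turns these into the flat argument list
# expected by ``dask.array.blockwise``.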
def _get_dask_args_from_ds(ds, columns):
if isinstance(columns, str):
args = [(ds[columns].data, ds[columns].dims)]
else:
args = []
for c in columns:
args.extend(_get_dask_args_from_ds(ds, c))
args = tuple(args)
return args
def _blockwise_with_block_args(args, block, method_name=None):
meta = []
for _ in range(1, block.output_ndim):
meta = [meta]
meta = np.array(meta, dtype=block.output_dtype)
ascii_letters = list(string.ascii_lowercase)
dim_map = {}
input_arg_pairs = []
for array, dims in args:
dim_name = []
for dim, dim_size in zip(dims, array.shape):
if dim not in dim_map:
dim_map[dim] = (ascii_letters.pop(0), dim_size)
dim_name.append(dim_map[dim][0])
input_arg_pairs.extend((array, "".join(dim_name)))
# the sample dimension is always kept the same
output_dim_name = f"{input_arg_pairs[1][0]}"
new_axes = dict()
for dim_name, dim_size in block.output_dims:
if dim_name in dim_map:
output_dim_name += dim_map[dim_name][0]
else:
try:
dim_size = float(dim_size)
except Exception:
                raise ValueError(
                    "Expected a numeric dim_size (a positive integer or nan) "
                    f"for new dimension {dim_name}, but got: {dim_size}"
                )
new_letter = ascii_letters.pop(0)
if dim_name is None:
dim_name = new_letter
dim_map[dim_name] = (new_letter, dim_size)
output_dim_name += new_letter
new_axes[new_letter] = dim_size
dims = []
inv_map = {v[0]: k for k, v in dim_map.items()}
for dim_name in output_dim_name:
dims.append(inv_map[dim_name])
output_shape = [dim_map[d][1] for d in dims]
return output_dim_name, new_axes, input_arg_pairs, dims, meta, output_shape
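# Illustrative example: a single input with dims ("sample", "dim_0") is mapped
# to the blockwise index string "ab"; with output_dims=((None, 2),) the output
# index becomes "ac" and new_axes={"c": 2.0}, i.e. the sample axis is preserved
# and one new axis of size 2 is appended.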
def _blockwise_with_block(args, block, method_name=None, input_has_keys=False):
(
output_dim_name,
new_axes,
input_arg_pairs,
dims,
meta,
_,
) = _blockwise_with_block_args(args, block, method_name=None)
transform_func = _TokenStableTransform(
block, method_name, input_has_keys=input_has_keys
)
transform_func.__name__ = f"{block.estimator_name}.{method_name}"
data = dask.array.blockwise(
transform_func,
output_dim_name,
*input_arg_pairs,
meta=meta,
new_axes=new_axes,
concatenate=True,
estimator=block.estimator_,
)
return dims, data
def _load_estimator(block):
logger.info(f"Loading {block.estimator_name} from {block.model_path}")
with open(block.model_path, "rb") as f:
block.estimator = cloudpickle.load(f)
return block.estimator
def _transform_or_load(block, ds, input_columns, mn):
if isinstance(input_columns, str):
input_columns = [input_columns]
input_columns = list(input_columns) + ["key"]
# filter dataset based on existing checkpoints
key = np.asarray(ds["key"])
paths = [block.make_path(k) for k in key]
saved_samples = np.asarray([os.path.isfile(p) for p in paths])
# compute/load features per chunk
chunksize = ds.data.data.chunksize[0]
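    # a chunk is either fully loaded from disk or fully recomputed: if any
    # sample inside a chunk is missing its checkpoint, the whole chunk is
    # treated as non-saved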
for i in range(0, len(saved_samples), chunksize):
if not np.all(saved_samples[i : i + chunksize]):
saved_samples[i : i + chunksize] = False
nonsaved_samples = np.logical_not(saved_samples)
total_samples_n, saved_samples_n = len(key), saved_samples.sum()
saved_ds = ds.sel({"sample": saved_samples})
nonsaved_ds = ds.sel({"sample": nonsaved_samples})
computed_data = loaded_data = None
# compute non-saved data
if total_samples_n - saved_samples_n > 0:
args = _get_dask_args_from_ds(nonsaved_ds, input_columns)
dims, computed_data = _blockwise_with_block(
args, block, mn, input_has_keys=True
)
# load saved data
if saved_samples_n > 0:
logger.info(
f"Might load {saved_samples_n} features of {block.estimator_name}.{mn} from disk."
)
args = _get_dask_args_from_ds(saved_ds, input_columns)
dims, meta, shape = _blockwise_with_block_args(args, block, mn)[-3:]
loaded_data = [
dask.array.from_delayed(
dask.delayed(block.load)(k),
shape=shape[1:],
meta=meta,
name=False,
)[None, ...]
for k in key[saved_samples]
]
loaded_data = dask.array.concatenate(loaded_data, axis=0)
# merge loaded and computed data
if computed_data is None:
data = loaded_data
elif loaded_data is None:
data = computed_data
else:
# merge data chunk-based
data = []
i, j = 0, 0
for k in range(0, len(saved_samples), chunksize):
saved = saved_samples[k]
if saved:
pick = loaded_data[j : j + chunksize]
j += chunksize
else:
pick = computed_data[i : i + chunksize]
i += chunksize
data.append(pick)
data = dask.array.concatenate(data, axis=0)
data = dask.array.rechunk(data, {0: chunksize})
return dims, data
class DatasetPipeline(_BaseComposition):
"""A dataset-based scikit-learn pipeline.
See :ref:`bob.pipelines.dataset_pipeline`.
Attributes
----------
graph : list
        A list of :any:`Block` objects to be applied to the input dataset.
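    Examples
    --------
    A minimal sketch, assuming ``dataset`` was built with
    :any:`samples_to_dataset` and carries a ``target`` variable next to
    ``data``:

    >>> from sklearn.preprocessing import StandardScaler
    >>> from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
    >>> pipeline = DatasetPipeline(
    ...     [
    ...         StandardScaler(),
    ...         dict(
    ...             estimator=LinearDiscriminantAnalysis(),
    ...             fit_input=["data", "target"],
    ...         ),
    ...     ]
    ... )
    >>> scores = pipeline.fit(dataset).decision_function(dataset)  # doctest: +SKIP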
"""
def __init__(self, graph, **kwargs):
super().__init__(**kwargs)
self.graph = _populate_graph(graph)
def _transform(self, ds, do_fit=False, method_name=None):
for i, block in enumerate(self.graph):
if block.dataset_map is not None:
try:
ds = block.dataset_map(ds)
except Exception as e:
raise RuntimeError(
f"Could not map ds {ds}\n with {block.dataset_map}"
) from e
continue
if do_fit:
args = _get_dask_args_from_ds(ds, block.fit_input)
args = [d for d, dims in args]
estimator = block.estimator
if not estimator_requires_fit(estimator):
block.estimator_ = estimator
elif block.model_path is not None and os.path.isfile(
block.model_path
):
_load_estimator.__name__ = f"load_{block.estimator_name}"
block.estimator_ = dask.delayed(_load_estimator)(block)
elif block.input_dask_array:
ds = ds.persist()
args = _get_dask_args_from_ds(ds, block.fit_input)
args = [d for d, dims in args]
block.estimator_ = _fit(*args, block=block)
else:
_fit.__name__ = f"{block.estimator_name}.fit"
block.estimator_ = dask.delayed(_fit)(
*args,
block=block,
)
mn = "transform"
if i == len(self.graph) - 1:
if do_fit:
break
mn = method_name
if block.features_dir is None:
args = _get_dask_args_from_ds(ds, block.transform_input)
dims, data = _blockwise_with_block(
args, block, mn, input_has_keys=False
)
else:
dims, data = _transform_or_load(
block, ds, block.transform_input, mn
)
# replace data inside dataset
ds = ds.copy(deep=False)
del ds["data"]
persisted = False
if not np.all(np.isfinite(data.shape)):
block.estimator_, data = dask.persist(block.estimator_, data)
data = data.compute_chunk_sizes()
persisted = True
ds["data"] = (dims, data)
if persisted:
ds = ds.persist()
return ds
    def fit(self, ds, y=None):
        if y is not None:
            raise ValueError("y must be None; provide targets as variables of the dataset instead.")
self._transform(ds, do_fit=True)
return self
    def decision_function(self, ds):
return self._transform(ds, method_name="decision_function")
    def predict(self, ds):
return self._transform(ds, method_name="predict")
    def predict_proba(self, ds):
return self._transform(ds, method_name="predict_proba")
    def score(self, ds):
return self._transform(ds, method_name="score")