Python API for bob.pipelines¶
Main module¶
- class bob.pipelines.BaseWrapper¶
Bases:
sklearn.base.MetaEstimatorMixin
,sklearn.base.BaseEstimator
The base class for all wrappers.
- class bob.pipelines.CheckpointWrapper(estimator, model_path=None, features_dir=None, extension='.h5', save_func=None, load_func=None, sample_attribute='data', hash_fn=None, attempts=10, **kwargs)¶
Bases:
bob.pipelines.BaseWrapper
,sklearn.base.TransformerMixin
Wraps
Sample
-based estimators so the results are saved in disk.- Parameters
estimator – The scikit-learn estimator to be wrapped.
model_path (str) – Saves the estimator state in this directory if the estimator is stateful
features_dir (str) – Saves the transformed data in this directory
extension (str) – Default extension of the transformed features
save_func – Pointer to a customized function that saves transformed features to disk
load_func – Pointer to a customized function that loads transformed features from disk
sample_attribute (str) – Defines the payload attribute of the sample (Defaul: data)
hash_fn – Pointer to a hash function. This hash function maps sample.key to a hash code and this hash code corresponds a relative directory where a single sample will be checkpointed. This is useful when is desirable file directories with less than a certain number of files.
attempts – Number of checkpoint attempts. Sometimes, because of network/disk issues files can’t be saved. This argument sets the maximum number of attempts to checkpoint a sample.
- class bob.pipelines.DaskWrapper(estimator, fit_tag=None, transform_tag=None, **kwargs)¶
Bases:
bob.pipelines.BaseWrapper
,sklearn.base.TransformerMixin
Wraps Scikit estimators to handle Dask Bags as input.
- Parameters
fit_resource (str) – Mark the delayed(self.fit) with this value. This can be used in a future delayed(self.fit).compute(resources=resource_tape) so dask scheduler can place this task in a particular resource (e.g GPU)
transform_resource (str) – Mark the delayed(self.transform) with this value. This can be used in a future delayed(self.transform).compute(resources=resource_tape) so dask scheduler can place this task in a particular resource (e.g GPU)
- class bob.pipelines.DelayedSample(load, parent=None, delayed_attributes=None, **kwargs)¶
Bases:
bob.pipelines.Sample
Representation of sample that can be loaded via a callable.
The optional
**kwargs
argument allows you to attach more attributes to this sample instance.- Parameters
load – A python function that can be called parameterlessly, to load the sample in question from whatever medium
parent (
DelayedSample
,Sample
, None) – If passed, consider this as a parent of this sample, to copy informationdelayed_attributes (dict or None) – A dictionary of name : load_fn pairs that will be used to create attributes of name : load_fn() in this class. Use this to option to create more delayed attributes than just
sample.data
.kwargs (dict) – Further attributes of this sample, to be stored and eventually transmitted to transformed versions of the sample
- property data¶
Loads the data from the disk file.
- classmethod from_sample(sample: bob.pipelines.Sample, **kwargs)[source]¶
Creates a DelayedSample from another DelayedSample or a Sample. If the sample is a DelayedSample, its data will not be loaded.
- Parameters
sample (
Sample
) – The sample to convert to a DelayedSample
- class bob.pipelines.DelayedSampleSet(load, parent=None, **kwargs)¶
Bases:
bob.pipelines.SampleSet
A set of samples with extra attributes
- property samples¶
- class bob.pipelines.DelayedSampleSetCached(load, parent=None, **kwargs)¶
Bases:
bob.pipelines.DelayedSampleSet
A cached version of DelayedSampleSet
- property samples¶
- class bob.pipelines.DelayedSamplesCall(func, func_name, samples, sample_attribute='data', **kwargs)¶
Bases:
object
- class bob.pipelines.Sample(data, parent=None, **kwargs)¶
Bases:
bob.pipelines.sample._ReprMixin
Representation of sample. A Sample is a simple container that wraps a data-point (see Samples, a way to enhance scikit pipelines with metadata)
Each sample must have the following attributes:
attribute
data
: Contains the data for this sample
- class bob.pipelines.SampleBatch(samples, sample_attribute='data')¶
Bases:
collections.abc.Sequence
,bob.pipelines.sample._ReprMixin
A batch of samples that looks like [s.data for s in samples]
However, when you call np.array(SampleBatch), it will construct a numpy array from sample.data attributes in a memory efficient way.
- class bob.pipelines.SampleSet(samples, parent=None, **kwargs)¶
Bases:
collections.abc.MutableSequence
,bob.pipelines.sample._ReprMixin
A set of samples with extra attributes
- class bob.pipelines.SampleWrapper(estimator, transform_extra_arguments=None, fit_extra_arguments=None, output_attribute='data', input_attribute='data', **kwargs)¶
Bases:
bob.pipelines.BaseWrapper
,sklearn.base.TransformerMixin
Wraps scikit-learn estimators to work with
Sample
-based pipelines.Do not use this class except for scikit-learn estimators.
- estimator¶
The scikit-learn estimator that is wrapped.
- fit_extra_arguments¶
Use this option if you want to pass extra arguments to the fit method of the mixed instance. The format is a list of two value tuples. The first value in tuples is the name of the argument that fit accepts, like
y
, and the second value is the name of the attribute that samples carry. For example, if you are passing samples to the fit method and want to passsubject
attributes of samples as they
argument to the fit method, you can provide[("y", "subject")]
as the value for this attribute.- Type
[tuple]
- output_attribute¶
The name of a Sample attribute where the output of the estimator will be saved to [Default is
data
]. For example, ifoutput_attribute
is"annotations"
, thensample.annotations
will contain the output of the estimator.- Type
- class bob.pipelines.ToDaskBag(npartitions=None, partition_size=None, **kwargs)¶
Bases:
sklearn.base.TransformerMixin
,sklearn.base.BaseEstimator
Transform an arbitrary iterator into a
dask.bag.Bag
Example
>>> import bob.pipelines as mario >>> transformer = mario.ToDaskBag() >>> dask_bag = transformer.transform([1,2,3]) >>> # dask_bag.map_partitions(...)
- npartitions¶
Number of partitions used in
dask.bag.from_sequence
- Type
- bob.pipelines.dask_tags(estimator)[source]¶
Recursively collects resource_tags in dasked estimators.
- bob.pipelines.wrap(bases, estimator=None, **kwargs)[source]¶
Wraps several estimators inside each other.
- Parameters
- Returns
The wrapped estimator
- Return type
- Raises
ValueError – If not all kwargs are consumed.
Heterogeneous SGE¶
- class bob.pipelines.distributed.sge.SGEIdiapJob(*args, queue=None, project=None, resource_spec=None, job_extra=None, config_name='sge', **kwargs)[source]¶
Bases:
dask_jobqueue.core.Job
Launches a SGE Job in the IDIAP cluster. This class basically encodes the CLI command that bootstrap the worker in a SGE job. Check here https://distributed.dask.org/en/latest/resources.html#worker-resources for more information.
- ..note: This is class is temporary. It’s basically a copy from SGEJob from dask_jobqueue.
The difference is that here I’m also handling the dask job resources tag (which is not handled anywhere). This has to be patched in the Job class. Please follow here https://github.com/dask/dask-jobqueue/issues/378 to get news about this patch
- submit_command = 'qsub'¶
- cancel_command = 'qdel'¶
- config_name = 'SGEIdiapJob'¶
- bob.pipelines.distributed.sge.get_max_jobs(queue_dict)[source]¶
Given a queue list, get the max number of possible jobs.
- bob.pipelines.distributed.sge.get_resource_requirements(pipeline)[source]¶
Get the resource requirements to execute a graph. This is useful when it’s necessary get the dictionary mapping the dask delayed keys with specific resource restrictions. Check https://distributed.dask.org/en/latest/resources.html#resources-with-collections for more information
- Parameters
pipeline (
sklearn.pipeline.Pipeline
) – Asklearn.pipeline.Pipeline
wrapper withbob.pipelines.DaskWrapper
Example
>>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC) >>> client = Client(cluster) >>> from bob.pipelines.sge import get_resource_requirements >>> resources = get_resource_requirements(pipeline) >>> my_delayed_task.compute(scheduler=client, resources=resources)
- class bob.pipelines.distributed.sge.SGEMultipleQueuesCluster(log_directory='./logs', protocol='tcp://', dashboard_address=':8787', env_extra=None, sge_job_spec={'default': {'io_big': False, 'max_jobs': 128, 'memory': '8GB', 'queue': 'q_1day', 'resource_spec': '', 'resources': {'default': 1}}, 'q_1week': {'io_big': True, 'max_jobs': 24, 'memory': '4GB', 'queue': 'q_1week', 'resource_spec': '', 'resources': {'q_1week': 1}}, 'q_gpu': {'io_big': False, 'memory': '30GB', 'queue': 'q_gpu', 'resource_spec': '', 'resources': {'q_gpu': 1}}, 'q_long_gpu': {'io_big': False, 'memory': '30GB', 'queue': 'q_long_gpu', 'resource_spec': '', 'resources': {'q_long_gpu': 1}}, 'q_short_gpu': {'io_big': False, 'max_jobs': 45, 'memory': '30GB', 'queue': 'q_short_gpu', 'resource_spec': '', 'resources': {'q_short_gpu': 1}}}, min_jobs=1, project=None, **kwargs)[source]¶
Bases:
dask_jobqueue.core.JobQueueCluster
Launch Dask jobs in the SGE cluster allowing the request of multiple queues.
- Parameters
log_directory (str) –
Default directory for the SGE logs
- protocol: str
Scheduler communication protocol
- dashboard_address: str
Default port for the dask dashboard,
- env_extra: str,
Extra environment variables to send to the workers
- sge_job_spec: dict
Dictionary containing a minimum specification for the qsub command. It consists of:
queue: SGE queue memory: Memory requirement in GB (e.g. 4GB) io_bio: set the io_big flag resource_spec: Whatever extra argument to be sent to qsub (qsub -l) tag: Mark this worker with an specific tag so dask scheduler can place specific tasks to it (https://distributed.dask.org/en/latest/resources.html) max_jobs: Maximum number of jobs in the queue
- min_jobs: int
Lower bound for the number of jobs for self.adapt
Example
Below follow a vanilla-example that will create a set of jobs on all.q:
>>> from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster >>> from dask.distributed import Client >>> cluster = SGEMultipleQueuesCluster() >>> cluster.scale_up(10) >>> client = Client(cluster)
It’s possible to demand a resource specification yourself:
>>> Q_1DAY_IO_BIG_SPEC = { ... "default": { ... "queue": "q_1day", ... "memory": "8GB", ... "io_big": True, ... "resource_spec": "", ... "resources": "", ... } ... } >>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_IO_BIG_SPEC) >>> cluster.scale_up(10) >>> client = Client(cluster)
More than one jon spec can be set:
>>> Q_1DAY_GPU_SPEC = { ... "default": { ... "queue": "q_1day", ... "memory": "8GB", ... "io_big": True, ... "resource_spec": "", ... "resources": "", ... }, ... "gpu": { ... "queue": "q_gpu", ... "memory": "12GB", ... "io_big": False, ... "resource_spec": "", ... "resources": {"GPU":1}, ... }, ... } >>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC) >>> cluster.scale_up(10) >>> cluster.scale_up(1, sge_job_spec_key="gpu") >>> client = Client(cluster)
Adaptive job allocation can also be used via AdaptiveIdiap extension:
>>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC) >>> cluster.adapt(Adaptive=AdaptiveIdiap,minimum=2, maximum=10) >>> client = Client(cluster)
- scale_up(n_jobs, sge_job_spec_key=None)[source]¶
Scale cluster up.
This is supposed to be used by the scheduler while dynamically allocating resources
- async scale_down(workers, sge_job_spec_key=None)[source]¶
Scale cluster down.
This is supposed to be used by the scheduler while dynamically allocating resources
- adapt(*args, **kwargs)[source]¶
Scale Dask cluster automatically based on scheduler activity.
- Parameters
minimum (int) – Minimum number of workers to keep around for the cluster
maximum (int) – Maximum number of workers to keep around for the cluster
minimum_memory (str) – Minimum amount of memory for the cluster
maximum_memory (str) – Maximum amount of memory for the cluster
minimum_jobs (int) – Minimum number of jobs
maximum_jobs (int) – Maximum number of jobs
**kwargs – Extra parameters to pass to dask.distributed.Adaptive
See also
dask.distributed.Adaptive
for more keyword arguments
- class bob.pipelines.distributed.sge.AdaptiveMultipleQueue(cluster=None, interval=None, minimum=None, maximum=None, wait_count=None, target_duration=None, worker_key=None, **kwargs)[source]¶
Bases:
distributed.deploy.adaptive.Adaptive
Custom mechanism to adaptively allocate workers based on scheduler load.
This custom implementation extends the Adaptive.recommendations by looking at the distributed.scheduler.TaskState.resource_restrictions.
The heuristics is:
Note
If a certain task has the status no-worker and it has resource_restrictions, the scheduler should request a job matching those resource restrictions
- class bob.pipelines.distributed.sge.SchedulerResourceRestriction(*args, **kwargs)[source]¶
Bases:
distributed.scheduler.Scheduler
Idiap extended distributed scheduler.
This scheduler extends Scheduler by just adding a handler that fetches, at every scheduler cycle, the resource restrictions of a task that has status no-worker
Transformers¶
- bob.pipelines.transformers.CheckpointSampleFunctionTransformer(**kwargs)¶
Class that transforms Scikit learn FunctionTransformer (https://scikit-l earn.org/stable/modules/generated/sklearn.preprocessing.FunctionTransformer .html) work with
Sample
-based pipelines.Furthermore, it makes it checkpointable
- bob.pipelines.transformers.CheckpointSampleLinearize(**kwargs)¶
- bob.pipelines.transformers.CheckpointSamplePCA(**kwargs)¶
Enables SAMPLE and CHECKPOINTIN handling for
sklearn.decomposition.PCA
- bob.pipelines.transformers.FileLoader(original_directory, original_extension=None, **kwargs)¶
- class bob.pipelines.transformers.Linearize(**kwargs)¶
Bases:
sklearn.preprocessing._function_transformer.FunctionTransformer
Extracts features by simply concatenating all elements of the data into one long vector.
- bob.pipelines.transformers.SampleFunctionTransformer(**kwargs)¶
Class that transforms Scikit learn FunctionTransformer (https://scikit-l earn.org/stable/modules/generated/sklearn.preprocessing.FunctionTransformer .html) work with
Sample
-based pipelines.
- bob.pipelines.transformers.SampleLinearize(**kwargs)¶
- bob.pipelines.transformers.SamplePCA(**kwargs)¶
Enables SAMPLE handling for
sklearn.decomposition.PCA
- class bob.pipelines.transformers.StatelessPipeline(steps, *, memory=None, verbose=False)¶
Bases:
sklearn.pipeline.Pipeline
- steps: List[Any]¶
- bob.pipelines.transformers.Str_To_Types(fieldtypes)[source]¶
Converts str fields in samples to a different type
- Parameters
fieldtypes (dict) – A dict that specifies the functions to be used to convert strings to other types.
- Returns
A scikit-learn transformer that does the conversion.
- Return type
Example
>>> from bob.pipelines import Sample >>> from bob.pipelines.transformers import Str_To_Types, str_to_bool >>> samples = [Sample(None, id="1", flag="True"), Sample(None, id="2", flag="False")] >>> transformer = Str_To_Types(fieldtypes=dict(id=int, flag=str_to_bool)) >>> transformer.transform(samples) [Sample(data=None, id=1, flag=True), Sample(data=None, id=2, flag=False)]
xarray Wrapper¶
- bob.pipelines.xarray.samples_to_dataset(samples, meta=None, npartitions=48, shuffle=False)[source]¶
Converts a list of samples to a dataset.
See Efficient pipelines with dask and xarray.
- Parameters
samples (list) – A list of
Sample
orDelayedSample
objects.meta (
xarray.DataArray
, optional) – An xarray.DataArray to be used as a template for data inside samples.npartitions (
int
, optional) – The number of partitions to partition the samples.shuffle (
bool
, optional) – If True, shuffles the samples (in-place) before constructing the dataset.
- Returns
The constructed dataset with at least a
data
variable.- Return type
xarray.Dataset
- class bob.pipelines.xarray.Block(estimator=None, output_dtype=<class 'float'>, output_dims=((None, nan), ), fit_input='data', transform_input='data', estimator_name=None, model_path=None, features_dir=None, extension='.hdf5', save_func=None, load_func=None, dataset_map=None, input_dask_array=False, fit_kwargs=None, **kwargs)[source]¶
Bases:
bob.pipelines.sample._ReprMixin
A block representation in a graph. This class is meant to be used with
DatasetPipeline
.- dataset_map¶
A callable that transforms the input dataset into another dataset.
- Type
callable
- fit_input¶
A str or list of str of column names of the dataset to be given to the
.fit
method.
- fit_kwargs¶
A dict of
fit_kwargs
to be passed to the.fit
method of the estimator.
- load_func¶
A function to save the features. Defaults to
np.load
.- Type
callable
- output_dims¶
A list of
(dim_name, dim_size)
tuples. Ifdim_name
isNone
, a new name is automatically generated, otherwise it should be a string.dim_size
should be a positive integer or nan for new dimensions orNone
for existing dimensions.- Type
- save_func¶
A function to save the features. Defaults to
np.save
withallow_pickle
set toFalse
.- Type
callable
- transform_input¶
A str or list of str of column names of the dataset to be given to the
.transform
method.
- property output_ndim¶
- class bob.pipelines.xarray.DatasetPipeline(graph, **kwargs)[source]¶
Bases:
sklearn.utils.metaestimators._BaseComposition
A dataset-based scikit-learn pipeline. See Efficient pipelines with dask and xarray.
- steps: List[Any]¶
Filelist Datasets¶
The principles of this module are:
one csv file -> one set
one row -> one sample
csv files could exist in a tarball or inside a folder
scikit-learn transformers are used to further transform samples
several csv files (sets) compose a protocol
several protocols compose a database
- class bob.pipelines.datasets.FileListToSamples(list_file, transformer=None, **kwargs)[source]¶
Bases:
collections.abc.Iterable
Converts a list of files to a list of samples.
- class bob.pipelines.datasets.CSVToSamples(list_file, transformer=None, fieldnames=None, dict_reader_kwargs=None, **kwargs)[source]¶
Bases:
bob.pipelines.datasets.FileListToSamples
Converts a csv file to a list of samples
- property rows¶
- class bob.pipelines.datasets.FileListDatabase(dataset_protocols_path, protocol, reader_cls=<class 'bob.pipelines.datasets.CSVToSamples'>, transformer=None, **kwargs)[source]¶
Bases:
object
A generic database interface. Use this class to convert csv files to a database that outputs samples. The format is simple, the files must be inside a folder (or a compressed tarball) with the following format:
dataset_protocols_path/<protocol>/<group>.csv
The top folders are the name of the protocols (if you only have one, you may name it
default
). Inside each protocol folder, there are <group>.csv files where the name of the file specifies the name of the group. We recommend using the namestrain
,dev
,eval
for your typical training, development, and test sets.- property protocol¶
- property transformer¶