Python API for bob.pipelines

Main module

class bob.pipelines.BaseWrapper

Bases: sklearn.base.MetaEstimatorMixin, sklearn.base.BaseEstimator

The base class for all wrappers.

class bob.pipelines.CheckpointWrapper(estimator, model_path=None, features_dir=None, extension='.h5', save_func=None, load_func=None, sample_attribute='data', hash_fn=None, attempts=10, **kwargs)

Bases: bob.pipelines.BaseWrapper, sklearn.base.TransformerMixin

Wraps Sample-based estimators so the results are saved in disk.

Parameters
  • estimator – The scikit-learn estimator to be wrapped.

  • model_path (str) – Saves the estimator state in this directory if the estimator is stateful

  • features_dir (str) – Saves the transformed data in this directory

  • extension (str) – Default extension of the transformed features

  • save_func – Pointer to a customized function that saves transformed features to disk

  • load_func – Pointer to a customized function that loads transformed features from disk

  • sample_attribute (str) – Defines the payload attribute of the sample (Defaul: data)

  • hash_fn – Pointer to a hash function. This hash function maps sample.key to a hash code and this hash code corresponds a relative directory where a single sample will be checkpointed. This is useful when is desirable file directories with less than a certain number of files.

  • attempts – Number of checkpoint attempts. Sometimes, because of network/disk issues files can’t be saved. This argument sets the maximum number of attempts to checkpoint a sample.

decision_function(samples)[source]
fit(samples, y=None)[source]
load(sample, path)[source]
load_model()[source]
make_path(sample)[source]
predict(samples)[source]
predict_proba(samples)[source]
save(sample)[source]
save_model()[source]
score(samples)[source]
transform(samples)[source]
class bob.pipelines.DaskWrapper(estimator, fit_tag=None, transform_tag=None, **kwargs)

Bases: bob.pipelines.BaseWrapper, sklearn.base.TransformerMixin

Wraps Scikit estimators to handle Dask Bags as input.

Parameters
  • fit_resource (str) – Mark the delayed(self.fit) with this value. This can be used in a future delayed(self.fit).compute(resources=resource_tape) so dask scheduler can place this task in a particular resource (e.g GPU)

  • transform_resource (str) – Mark the delayed(self.transform) with this value. This can be used in a future delayed(self.transform).compute(resources=resource_tape) so dask scheduler can place this task in a particular resource (e.g GPU)

decision_function(samples)[source]
fit(X, y=None, **fit_params)[source]
predict(samples)[source]
predict_proba(samples)[source]
score(samples)[source]
transform(samples)[source]
class bob.pipelines.DelayedSample(load, parent=None, delayed_attributes=None, **kwargs)

Bases: bob.pipelines.Sample

Representation of sample that can be loaded via a callable.

The optional **kwargs argument allows you to attach more attributes to this sample instance.

Parameters
  • load – A python function that can be called parameterlessly, to load the sample in question from whatever medium

  • parent (DelayedSample, Sample, None) – If passed, consider this as a parent of this sample, to copy information

  • delayed_attributes (dict or None) – A dictionary of name : load_fn pairs that will be used to create attributes of name : load_fn() in this class. Use this to option to create more delayed attributes than just sample.data.

  • kwargs (dict) – Further attributes of this sample, to be stored and eventually transmitted to transformed versions of the sample

property data

Loads the data from the disk file.

classmethod from_sample(sample: bob.pipelines.Sample, **kwargs)[source]

Creates a DelayedSample from another DelayedSample or a Sample. If the sample is a DelayedSample, its data will not be loaded.

Parameters

sample (Sample) – The sample to convert to a DelayedSample

class bob.pipelines.DelayedSampleSet(load, parent=None, **kwargs)

Bases: bob.pipelines.SampleSet

A set of samples with extra attributes

property samples
class bob.pipelines.DelayedSampleSetCached(load, parent=None, **kwargs)

Bases: bob.pipelines.DelayedSampleSet

A cached version of DelayedSampleSet

property samples
class bob.pipelines.DelayedSamplesCall(func, func_name, samples, sample_attribute='data', **kwargs)

Bases: object

class bob.pipelines.Sample(data, parent=None, **kwargs)

Bases: bob.pipelines.sample._ReprMixin

Representation of sample. A Sample is a simple container that wraps a data-point (see Samples, a way to enhance scikit pipelines with metadata)

Each sample must have the following attributes:

  • attribute data: Contains the data for this sample

Parameters
  • data (object) – Object representing the data to initialize this sample with.

  • parent (object) – A parent object from which to inherit all other attributes (except data)

class bob.pipelines.SampleBatch(samples, sample_attribute='data')

Bases: collections.abc.Sequence, bob.pipelines.sample._ReprMixin

A batch of samples that looks like [s.data for s in samples]

However, when you call np.array(SampleBatch), it will construct a numpy array from sample.data attributes in a memory efficient way.

class bob.pipelines.SampleSet(samples, parent=None, **kwargs)

Bases: collections.abc.MutableSequence, bob.pipelines.sample._ReprMixin

A set of samples with extra attributes

insert(index, item)[source]

S.insert(index, value) – insert value before index

class bob.pipelines.SampleWrapper(estimator, transform_extra_arguments=None, fit_extra_arguments=None, output_attribute='data', input_attribute='data', **kwargs)

Bases: bob.pipelines.BaseWrapper, sklearn.base.TransformerMixin

Wraps scikit-learn estimators to work with Sample-based pipelines.

Do not use this class except for scikit-learn estimators.

estimator

The scikit-learn estimator that is wrapped.

fit_extra_arguments

Use this option if you want to pass extra arguments to the fit method of the mixed instance. The format is a list of two value tuples. The first value in tuples is the name of the argument that fit accepts, like y, and the second value is the name of the attribute that samples carry. For example, if you are passing samples to the fit method and want to pass subject attributes of samples as the y argument to the fit method, you can provide [("y", "subject")] as the value for this attribute.

Type

[tuple]

output_attribute

The name of a Sample attribute where the output of the estimator will be saved to [Default is data]. For example, if output_attribute is "annotations", then sample.annotations will contain the output of the estimator.

Type

str

transform_extra_arguments

Similar to fit_extra_arguments but for the transform and other similar methods.

Type

[tuple]

decision_function(samples)[source]
fit(samples, y=None)[source]
predict(samples)[source]
predict_proba(samples)[source]
score(samples)[source]
transform(samples)[source]
class bob.pipelines.ToDaskBag(npartitions=None, partition_size=None, **kwargs)

Bases: sklearn.base.TransformerMixin, sklearn.base.BaseEstimator

Transform an arbitrary iterator into a dask.bag.Bag

Example

>>> import bob.pipelines as mario
>>> transformer = mario.ToDaskBag()
>>> dask_bag = transformer.transform([1,2,3])
>>> # dask_bag.map_partitions(...)
npartitions

Number of partitions used in dask.bag.from_sequence

Type

int

fit(X, y=None)[source]
transform(X)[source]
bob.pipelines.dask_tags(estimator)[source]

Recursively collects resource_tags in dasked estimators.

bob.pipelines.wrap(bases, estimator=None, **kwargs)[source]

Wraps several estimators inside each other.

Parameters
  • bases (list) – A list of classes to be used

  • estimator (object, optional) – An initial estimator to be wrapped inside other wrappers. If None, the first class will be used to initialize the estimator.

  • **kwargs – Extra parameters passed to the init of classes.

Returns

The wrapped estimator

Return type

object

Raises

ValueError – If not all kwargs are consumed.

Heterogeneous SGE

class bob.pipelines.distributed.sge.SGEIdiapJob(*args, queue=None, project=None, resource_spec=None, job_extra=None, config_name='sge', **kwargs)[source]

Bases: dask_jobqueue.core.Job

Launches a SGE Job in the IDIAP cluster. This class basically encodes the CLI command that bootstrap the worker in a SGE job. Check here https://distributed.dask.org/en/latest/resources.html#worker-resources for more information.

..note: This is class is temporary. It’s basically a copy from SGEJob from dask_jobqueue.

The difference is that here I’m also handling the dask job resources tag (which is not handled anywhere). This has to be patched in the Job class. Please follow here https://github.com/dask/dask-jobqueue/issues/378 to get news about this patch

submit_command = 'qsub'
cancel_command = 'qdel'
config_name = 'SGEIdiapJob'
bob.pipelines.distributed.sge.get_max_jobs(queue_dict)[source]

Given a queue list, get the max number of possible jobs.

bob.pipelines.distributed.sge.get_resource_requirements(pipeline)[source]

Get the resource requirements to execute a graph. This is useful when it’s necessary get the dictionary mapping the dask delayed keys with specific resource restrictions. Check https://distributed.dask.org/en/latest/resources.html#resources-with-collections for more information

Parameters

pipeline (sklearn.pipeline.Pipeline) – A sklearn.pipeline.Pipeline wrapper with bob.pipelines.DaskWrapper

Example

>>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC) 
>>> client = Client(cluster) 
>>> from bob.pipelines.sge import get_resource_requirements  
>>> resources = get_resource_requirements(pipeline) 
>>> my_delayed_task.compute(scheduler=client, resources=resources) 
class bob.pipelines.distributed.sge.SGEMultipleQueuesCluster(log_directory='./logs', protocol='tcp://', dashboard_address=':8787', env_extra=None, sge_job_spec={'default': {'io_big': False, 'max_jobs': 128, 'memory': '8GB', 'queue': 'q_1day', 'resource_spec': '', 'resources': {'default': 1}}, 'q_1week': {'io_big': True, 'max_jobs': 24, 'memory': '4GB', 'queue': 'q_1week', 'resource_spec': '', 'resources': {'q_1week': 1}}, 'q_gpu': {'io_big': False, 'memory': '30GB', 'queue': 'q_gpu', 'resource_spec': '', 'resources': {'q_gpu': 1}}, 'q_long_gpu': {'io_big': False, 'memory': '30GB', 'queue': 'q_long_gpu', 'resource_spec': '', 'resources': {'q_long_gpu': 1}}, 'q_short_gpu': {'io_big': False, 'max_jobs': 45, 'memory': '30GB', 'queue': 'q_short_gpu', 'resource_spec': '', 'resources': {'q_short_gpu': 1}}}, min_jobs=1, project=None, **kwargs)[source]

Bases: dask_jobqueue.core.JobQueueCluster

Launch Dask jobs in the SGE cluster allowing the request of multiple queues.

Parameters

log_directory (str) –

Default directory for the SGE logs

protocol: str

Scheduler communication protocol

dashboard_address: str

Default port for the dask dashboard,

env_extra: str,

Extra environment variables to send to the workers

sge_job_spec: dict

Dictionary containing a minimum specification for the qsub command. It consists of:

queue: SGE queue memory: Memory requirement in GB (e.g. 4GB) io_bio: set the io_big flag resource_spec: Whatever extra argument to be sent to qsub (qsub -l) tag: Mark this worker with an specific tag so dask scheduler can place specific tasks to it (https://distributed.dask.org/en/latest/resources.html) max_jobs: Maximum number of jobs in the queue

min_jobs: int

Lower bound for the number of jobs for self.adapt

Example

Below follow a vanilla-example that will create a set of jobs on all.q:

>>> from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster  
>>> from dask.distributed import Client 
>>> cluster = SGEMultipleQueuesCluster() 
>>> cluster.scale_up(10) 
>>> client = Client(cluster) 

It’s possible to demand a resource specification yourself:

>>> Q_1DAY_IO_BIG_SPEC = {
...        "default": {
...        "queue": "q_1day",
...        "memory": "8GB",
...        "io_big": True,
...        "resource_spec": "",
...        "resources": "",
...    }
... }
>>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_IO_BIG_SPEC) 
>>> cluster.scale_up(10) 
>>> client = Client(cluster) 

More than one jon spec can be set:

>>> Q_1DAY_GPU_SPEC = {
...         "default": {
...             "queue": "q_1day",
...             "memory": "8GB",
...             "io_big": True,
...             "resource_spec": "",
...             "resources": "",
...         },
...         "gpu": {
...             "queue": "q_gpu",
...             "memory": "12GB",
...             "io_big": False,
...             "resource_spec": "",
...             "resources": {"GPU":1},
...         },
...     }
>>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC) 
>>> cluster.scale_up(10) 
>>> cluster.scale_up(1, sge_job_spec_key="gpu") 
>>> client = Client(cluster) 

Adaptive job allocation can also be used via AdaptiveIdiap extension:

>>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC)  
>>> cluster.adapt(Adaptive=AdaptiveIdiap,minimum=2, maximum=10) 
>>> client = Client(cluster)     
scale(n_jobs, sge_job_spec_key='default')[source]

Launch an SGE job in the Idiap SGE cluster.

Parameters
  • n_jobs (int) – Quantity of jobs to scale

  • sge_job_spec_key (str) – One of the specs SGEMultipleQueuesCluster.sge_job_spec

scale_up(n_jobs, sge_job_spec_key=None)[source]

Scale cluster up.

This is supposed to be used by the scheduler while dynamically allocating resources

async scale_down(workers, sge_job_spec_key=None)[source]

Scale cluster down.

This is supposed to be used by the scheduler while dynamically allocating resources

adapt(*args, **kwargs)[source]

Scale Dask cluster automatically based on scheduler activity.

Parameters
  • minimum (int) – Minimum number of workers to keep around for the cluster

  • maximum (int) – Maximum number of workers to keep around for the cluster

  • minimum_memory (str) – Minimum amount of memory for the cluster

  • maximum_memory (str) – Maximum amount of memory for the cluster

  • minimum_jobs (int) – Minimum number of jobs

  • maximum_jobs (int) – Maximum number of jobs

  • **kwargs – Extra parameters to pass to dask.distributed.Adaptive

See also

dask.distributed.Adaptive

for more keyword arguments

class bob.pipelines.distributed.sge.AdaptiveMultipleQueue(cluster=None, interval=None, minimum=None, maximum=None, wait_count=None, target_duration=None, worker_key=None, **kwargs)[source]

Bases: distributed.deploy.adaptive.Adaptive

Custom mechanism to adaptively allocate workers based on scheduler load.

This custom implementation extends the Adaptive.recommendations by looking at the distributed.scheduler.TaskState.resource_restrictions.

The heuristics is:

Note

If a certain task has the status no-worker and it has resource_restrictions, the scheduler should request a job matching those resource restrictions

async recommendations(target: int) dict[source]

Make scale up/down recommendations based on current state and target.

async scale_up(n, sge_job_spec_key='default')[source]
async scale_down(workers, sge_job_spec_key='default')[source]
minimum: int
maximum: int | float
wait_count: int
interval: int | float
periodic_callback: PeriodicCallback | None
close_counts: defaultdict[WorkerState, int]
log: deque[tuple[float, dict]]
class bob.pipelines.distributed.sge.SchedulerResourceRestriction(*args, **kwargs)[source]

Bases: distributed.scheduler.Scheduler

Idiap extended distributed scheduler.

This scheduler extends Scheduler by just adding a handler that fetches, at every scheduler cycle, the resource restrictions of a task that has status no-worker

get_no_worker_tasks_resource_restrictions(comm=None)[source]

Get the a task resource restrictions for jobs that has the status ‘no-worker’.

Transformers

bob.pipelines.transformers.CheckpointSampleFunctionTransformer(**kwargs)

Class that transforms Scikit learn FunctionTransformer (https://scikit-l earn.org/stable/modules/generated/sklearn.preprocessing.FunctionTransformer .html) work with Sample-based pipelines.

Furthermore, it makes it checkpointable

bob.pipelines.transformers.CheckpointSampleLinearize(**kwargs)
bob.pipelines.transformers.CheckpointSamplePCA(**kwargs)

Enables SAMPLE and CHECKPOINTIN handling for sklearn.decomposition.PCA

bob.pipelines.transformers.FileLoader(original_directory, original_extension=None, **kwargs)
class bob.pipelines.transformers.Linearize(**kwargs)

Bases: sklearn.preprocessing._function_transformer.FunctionTransformer

Extracts features by simply concatenating all elements of the data into one long vector.

bob.pipelines.transformers.SampleFunctionTransformer(**kwargs)

Class that transforms Scikit learn FunctionTransformer (https://scikit-l earn.org/stable/modules/generated/sklearn.preprocessing.FunctionTransformer .html) work with Sample-based pipelines.

bob.pipelines.transformers.SampleLinearize(**kwargs)
bob.pipelines.transformers.SamplePCA(**kwargs)

Enables SAMPLE handling for sklearn.decomposition.PCA

class bob.pipelines.transformers.StatelessPipeline(steps, *, memory=None, verbose=False)

Bases: sklearn.pipeline.Pipeline

fit(X, y=None, **fit_params)[source]

Does nothing

steps: List[Any]
bob.pipelines.transformers.Str_To_Types(fieldtypes)[source]

Converts str fields in samples to a different type

Parameters

fieldtypes (dict) – A dict that specifies the functions to be used to convert strings to other types.

Returns

A scikit-learn transformer that does the conversion.

Return type

object

Example

>>> from bob.pipelines import Sample
>>> from bob.pipelines.transformers import Str_To_Types, str_to_bool
>>> samples = [Sample(None, id="1", flag="True"), Sample(None, id="2", flag="False")]
>>> transformer = Str_To_Types(fieldtypes=dict(id=int, flag=str_to_bool))
>>> transformer.transform(samples)
[Sample(data=None, id=1, flag=True), Sample(data=None, id=2, flag=False)]
bob.pipelines.transformers.str_to_bool(value)[source]

xarray Wrapper

bob.pipelines.xarray.save(data, path)[source]
bob.pipelines.xarray.load(path)[source]
bob.pipelines.xarray.samples_to_dataset(samples, meta=None, npartitions=48, shuffle=False)[source]

Converts a list of samples to a dataset.

See Efficient pipelines with dask and xarray.

Parameters
  • samples (list) – A list of Sample or DelayedSample objects.

  • meta (xarray.DataArray, optional) – An xarray.DataArray to be used as a template for data inside samples.

  • npartitions (int, optional) – The number of partitions to partition the samples.

  • shuffle (bool, optional) – If True, shuffles the samples (in-place) before constructing the dataset.

Returns

The constructed dataset with at least a data variable.

Return type

xarray.Dataset

class bob.pipelines.xarray.Block(estimator=None, output_dtype=<class 'float'>, output_dims=((None, nan), ), fit_input='data', transform_input='data', estimator_name=None, model_path=None, features_dir=None, extension='.hdf5', save_func=None, load_func=None, dataset_map=None, input_dask_array=False, fit_kwargs=None, **kwargs)[source]

Bases: bob.pipelines.sample._ReprMixin

A block representation in a graph. This class is meant to be used with DatasetPipeline.

dataset_map

A callable that transforms the input dataset into another dataset.

Type

callable

estimator

A scikit-learn estimator

Type

object

estimator_name

Name of the estimator

Type

str

extension

The extension of checkpointed features.

Type

str

features_dir

The directory to save the features.

Type

str

fit_input

A str or list of str of column names of the dataset to be given to the .fit method.

Type

str or list

fit_kwargs

A dict of fit_kwargs to be passed to the .fit method of the estimator.

Type

None or dict

input_dask_array

Whether the estimator takes dask arrays in its fit method or not.

Type

bool

load_func

A function to save the features. Defaults to np.load.

Type

callable

model_path

If given, the estimator will be pickled here.

Type

str or None

output_dims

A list of (dim_name, dim_size) tuples. If dim_name is None, a new name is automatically generated, otherwise it should be a string. dim_size should be a positive integer or nan for new dimensions or None for existing dimensions.

Type

list

output_dtype

The dtype of the output of the transformer. Defaults to float.

Type

object

save_func

A function to save the features. Defaults to np.save with allow_pickle set to False.

Type

callable

transform_input

A str or list of str of column names of the dataset to be given to the .transform method.

Type

str or list

property output_ndim
make_path(key)[source]
save(key, data)[source]
load(key)[source]
class bob.pipelines.xarray.DatasetPipeline(graph, **kwargs)[source]

Bases: sklearn.utils.metaestimators._BaseComposition

A dataset-based scikit-learn pipeline. See Efficient pipelines with dask and xarray.

graph

A list of Block’s to be applied on input dataset.

Type

list

fit(ds, y=None)[source]
transform(ds)[source]
decision_function(ds)[source]
predict(ds)[source]
predict_proba(ds)[source]
score(ds)[source]
steps: List[Any]

Filelist Datasets

The principles of this module are:

  • one csv file -> one set

  • one row -> one sample

  • csv files could exist in a tarball or inside a folder

  • scikit-learn transformers are used to further transform samples

  • several csv files (sets) compose a protocol

  • several protocols compose a database

class bob.pipelines.datasets.FileListToSamples(list_file, transformer=None, **kwargs)[source]

Bases: collections.abc.Iterable

Converts a list of files to a list of samples.

class bob.pipelines.datasets.CSVToSamples(list_file, transformer=None, fieldnames=None, dict_reader_kwargs=None, **kwargs)[source]

Bases: bob.pipelines.datasets.FileListToSamples

Converts a csv file to a list of samples

property rows
class bob.pipelines.datasets.FileListDatabase(dataset_protocols_path, protocol, reader_cls=<class 'bob.pipelines.datasets.CSVToSamples'>, transformer=None, **kwargs)[source]

Bases: object

A generic database interface. Use this class to convert csv files to a database that outputs samples. The format is simple, the files must be inside a folder (or a compressed tarball) with the following format:

dataset_protocols_path/<protocol>/<group>.csv

The top folders are the name of the protocols (if you only have one, you may name it default). Inside each protocol folder, there are <group>.csv files where the name of the file specifies the name of the group. We recommend using the names train, dev, eval for your typical training, development, and test sets.

property protocol
property transformer
groups()[source]
protocols()[source]
list_file(group)[source]
get_reader(group)[source]
samples(groups=None)[source]

Get samples of a certain group

Parameters

groups (str, optional) – A str or list of str to be used for filtering samples, by default None

Returns

A list containing the samples loaded from csv files.

Return type

list

static sort(samples, unique=True)[source]

Sorts samples and removes duplicates by default.