Python API for bob.pipelines

Summary

Sample’s API

bob.pipelines.Sample(data[, parent])

Representation of a sample.

bob.pipelines.DelayedSample(load[, parent, ...])

Representation of a sample that can be loaded via a callable.

bob.pipelines.SampleSet(samples[, parent])

A set of samples with extra attributes

bob.pipelines.DelayedSampleSet(load[, parent])

A set of samples with extra attributes

bob.pipelines.DelayedSampleSetCached(load[, ...])

A cached version of DelayedSampleSet

bob.pipelines.SampleBatch(samples[, ...])

A batch of samples that looks like [s.data for s in samples]

Wrapper’s API

bob.pipelines.wrap(bases[, estimator])

Wraps several estimators inside each other.

bob.pipelines.BaseWrapper()

The base class for all wrappers.

bob.pipelines.SampleWrapper(estimator[, ...])

Wraps scikit-learn estimators to work with Sample-based pipelines.

bob.pipelines.CheckpointWrapper(estimator[, ...])

Wraps Sample-based estimators so their results are saved to disk.

bob.pipelines.DaskWrapper(estimator[, ...])

Wraps scikit-learn estimators to handle Dask Bags as input.

bob.pipelines.ToDaskBag([npartitions, ...])

Transform an arbitrary iterator into a dask.bag.Bag

bob.pipelines.DelayedSamplesCall(func, ...)

Database’s API

bob.pipelines.FileListDatabase(...[, ...])

A generic database interface. Use this class to convert csv files to a database that outputs samples. The format is simple: the files must be inside a folder (or a compressed tarball) following the layout described in the class documentation.

bob.pipelines.FileListToSamples(list_file[, ...])

Converts a list of files to a list of samples.

bob.pipelines.CSVToSamples(list_file[, ...])

Converts a csv file to a list of samples

Transformers’ API

bob.pipelines.transformers.Str_To_Types(...)

Converts str fields in samples to a different type

bob.pipelines.transformers.str_to_bool(value)

Xarray’s API

bob.pipelines.xarray.samples_to_dataset(samples)

Converts a list of samples to a dataset.

bob.pipelines.xarray.DatasetPipeline(graph, ...)

A dataset-based scikit-learn pipeline.

bob.pipelines.xarray.Block([estimator, ...])

A block representation in a graph.

Utilities

bob.pipelines.assert_picklable(obj)

Test if an object is picklable or not.

bob.pipelines.check_parameter_for_validity(...)

Checks the given parameter for validity

bob.pipelines.check_parameters_for_validity(...)

Checks the given parameters for validity.

bob.pipelines.dask_tags(estimator)

Recursively collects resource_tags in dasked estimators.

bob.pipelines.estimator_requires_fit(estimator)

bob.pipelines.flatten_samplesets(samplesets)

Takes a list of SampleSets (with one or multiple samples in each SampleSet) and returns a list of SampleSets (with one sample in each SampleSet)

bob.pipelines.get_bob_tags([estimator, ...])

Returns the default tags of a Transformer unless forced or specified.

bob.pipelines.hash_string(key[, bucket_size])

Generates a hash code given a string.

bob.pipelines.is_instance_nested(instance, ...)

Check whether an object or one of its nested objects is an instance of a class.

bob.pipelines.is_picklable(obj)

Test if an object is picklable or not.

bob.pipelines.is_pipeline_wrapped(estimator, ...)

Iterates over the transformers of an sklearn.pipeline.Pipeline and checks whether they were wrapped with the given wrapper class.

Main module

class bob.pipelines.BaseWrapper

Bases: sklearn.base.MetaEstimatorMixin, sklearn.base.BaseEstimator

The base class for all wrappers.

class bob.pipelines.CSVToSamples(list_file, transformer=None, dict_reader_kwargs=None, **kwargs)

Bases: bob.pipelines.FileListToSamples

Converts a csv file to a list of samples

property rows
class bob.pipelines.CheckpointWrapper(estimator, model_path=None, features_dir=None, extension=None, save_func=None, load_func=None, sample_attribute=None, hash_fn=None, attempts=10, force=False, **kwargs)

Bases: bob.pipelines.BaseWrapper, sklearn.base.TransformerMixin

Wraps Sample-based estimators so their results are saved to disk.

Parameters
  • estimator – The scikit-learn estimator to be wrapped.

  • model_path (str) – Saves the estimator state in this directory if the estimator is stateful

  • features_dir (str) – Saves the transformed data in this directory

  • extension (str) – Default extension of the transformed features. If None, will use the bob_checkpoint_extension tag in the estimator, or default to .h5.

  • save_func – Pointer to a customized function that saves transformed features to disk. If None, will use the bob_features_save_fn tag in the estimator, or default to bob.io.base.save.

  • load_func – Pointer to a customized function that loads transformed features from disk. If None, will use the bob_features_load_fn tag in the estimator, or default to bob.io.base.load.

  • sample_attribute (str) – Defines the payload attribute of the sample. If None, will use the bob_output tag in the estimator, or default to data.

  • hash_fn – Pointer to a hash function. This hash function maps sample.key to a hash code, and this hash code corresponds to a relative directory where a single sample will be checkpointed. This is useful when it is desirable to keep directories with fewer than a certain number of files.

  • attempts – Number of checkpoint attempts. Sometimes, because of network or disk issues, files cannot be saved. This argument sets the maximum number of attempts to checkpoint a sample.

  • force (bool) – If True, will recompute the checkpoints even if they exist

decision_function(samples)[source]
fit(samples, y=None, **kwargs)[source]
load(sample, path)[source]
load_model()[source]
make_path(sample)[source]
predict(samples)[source]
predict_proba(samples)[source]
save(sample)[source]
save_model()[source]
score(samples)[source]
transform(samples)[source]
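
In practice this wrapper is usually added through bob.pipelines.wrap rather than instantiated directly. A minimal sketch follows; the toy transformer and the features directory are placeholders, not part of the API:

import numpy as np
from sklearn.preprocessing import FunctionTransformer

import bob.pipelines as mario

# A stateless scikit-learn transformer; np.asarray converts the SampleBatch.
estimator = FunctionTransformer(lambda x: np.asarray(x) + 1)

# "sample" + "checkpoint" yields CheckpointWrapper(SampleWrapper(estimator)).
wrapped = mario.wrap(
    ["sample", "checkpoint"],
    estimator,
    features_dir="./tmp/features",  # placeholder output directory
)

# Each sample needs a ``key``; it is used to derive the checkpoint file name.
samples = [mario.Sample(np.arange(3), key=f"sample_{i}") for i in range(2)]
transformed = wrapped.transform(samples)
# Calling ``transform`` again loads the features from disk instead of recomputing.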
class bob.pipelines.DaskWrapper(estimator, fit_tag=None, transform_tag=None, fit_supports_dask_array=None, fit_supports_dask_bag=None, **kwargs)

Bases: bob.pipelines.BaseWrapper, sklearn.base.TransformerMixin

Wraps scikit-learn estimators to handle Dask Bags as input.

Parameters
  • fit_tag (str) – Tag delayed(self.fit) with this value. The tag can later be used in delayed(self.fit).compute(resources=...) so the dask scheduler can place this task on a particular resource (e.g. a GPU)

  • transform_tag (str) – Tag delayed(self.transform) with this value. The tag can later be used in delayed(self.transform).compute(resources=...) so the dask scheduler can place this task on a particular resource (e.g. a GPU)

decision_function(samples)[source]
fit(X, y=None, **fit_params)[source]
predict(samples)[source]
predict_proba(samples)[source]
score(samples)[source]
transform(samples)[source]
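
As with the other wrappers, DaskWrapper is normally added via bob.pipelines.wrap. A minimal sketch, assuming a toy stateless transformer:

import numpy as np
from sklearn.preprocessing import FunctionTransformer

import bob.pipelines as mario

estimator = mario.wrap(
    ["sample", "dask"], FunctionTransformer(lambda x: np.asarray(x) * 2)
)

samples = [mario.Sample(np.arange(3), key=str(i)) for i in range(4)]
dask_bag = mario.ToDaskBag(npartitions=2).transform(samples)

# ``transform`` is now lazy and returns a dask bag; ``compute`` triggers execution.
result = estimator.transform(dask_bag).compute(scheduler="single-threaded")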
class bob.pipelines.DelayedSample(load, parent=None, delayed_attributes=None, **kwargs)

Bases: bob.pipelines.Sample

Representation of a sample that can be loaded via a callable.

The optional **kwargs argument allows you to attach more attributes to this sample instance.

Parameters
  • load – A Python function that can be called without parameters to load the sample in question from whatever medium

  • parent (DelayedSample, Sample, None) – If passed, consider this as a parent of this sample, to copy information

  • delayed_attributes (dict or None) – A dictionary of name : load_fn pairs that will be used to create attributes of name : load_fn() in this class. Use this option to create more delayed attributes than just sample.data.

  • kwargs (dict) – Further attributes of this sample, to be stored and eventually transmitted to transformed versions of the sample

property data

Loads the data from the disk file.

classmethod from_sample(sample: bob.pipelines.Sample, **kwargs)[source]

Creates a DelayedSample from another DelayedSample or a Sample. If the sample is a DelayedSample, its data will not be loaded.

Parameters

sample (Sample) – The sample to convert to a DelayedSample
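
A minimal sketch of a DelayedSample with an extra delayed attribute; the load functions below are placeholders standing in for real I/O:

import numpy as np

import bob.pipelines as mario

def load_data():
    # Stand-in for loading the payload from disk (an image, features, ...).
    return np.zeros((2, 2))

def load_annotations():
    return {"topleft": (0, 0)}

sample = mario.DelayedSample(
    load_data,
    key="sample_1",
    delayed_attributes={"annotations": load_annotations},
)

# Nothing is loaded until the attributes are accessed:
data = sample.data                # calls load_data()
annotations = sample.annotations  # calls load_annotations()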

class bob.pipelines.DelayedSampleSet(load, parent=None, **kwargs)

Bases: bob.pipelines.SampleSet

A set of samples with extra attributes

property samples
class bob.pipelines.DelayedSampleSetCached(load, parent=None, **kwargs)

Bases: bob.pipelines.DelayedSampleSet

A cached version of DelayedSampleSet

property samples
class bob.pipelines.DelayedSamplesCall(func, func_name, samples, sample_attribute='data', **kwargs)

Bases: object

class bob.pipelines.FileListDatabase(dataset_protocols_path, protocol, reader_cls=<class 'bob.pipelines.CSVToSamples'>, transformer=None, **kwargs)

Bases: object

A generic database interface. Use this class to convert csv files to a database that outputs samples. The format is simple, the files must be inside a folder (or a compressed tarball) with the following format:

dataset_protocols_path/<protocol>/<group>.csv

The top folders are the name of the protocols (if you only have one, you may name it default). Inside each protocol folder, there are <group>.csv files where the name of the file specifies the name of the group. We recommend using the names train, dev, eval for your typical training, development, and test sets.
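
For illustration, a hypothetical dataset folder and its use could look as follows; the path, protocol name, and CSV columns are placeholders:

# my_dataset/
# └── default/
#     ├── train.csv   (e.g. columns: data_path,subject)
#     └── dev.csv
import bob.pipelines as mario

database = mario.FileListDatabase("my_dataset", protocol="default")
train_samples = database.samples(groups="train")
# Each CSV row becomes a Sample whose attributes are the CSV columns.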

get_reader(group)[source]
groups()[source]
list_file(group)[source]
property protocol
protocols()[source]
samples(groups=None)[source]

Get samples of a certain group

Parameters

groups (str, optional) – A str or list of str to be used for filtering samples, by default None

Returns

A list containing the samples loaded from csv files.

Return type

list

static sort(samples, unique=True)[source]

Sorts samples and removes duplicates by default.

property transformer
class bob.pipelines.FileListToSamples(list_file, transformer=None, **kwargs)

Bases: collections.abc.Iterable

Converts a list of files to a list of samples.

class bob.pipelines.Sample(data, parent=None, **kwargs)

Bases: bob.pipelines.sample._ReprMixin

Representation of a sample. A Sample is a simple container that wraps a data point (see Samples, a way to enhance scikit pipelines with metadata)

Each sample must have the following attributes:

  • attribute data: Contains the data for this sample

Parameters
  • data (object) – Object representing the data to initialize this sample with.

  • parent (object) – A parent object from which to inherit all other attributes (except data)
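
A minimal sketch showing how metadata is attached and propagated through parent; the attribute names are placeholders:

import numpy as np

import bob.pipelines as mario

sample = mario.Sample(np.arange(5), key="001", subject="subject_1")

# ``parent`` copies all attributes except ``data`` onto the new sample:
transformed = mario.Sample(sample.data * 2, parent=sample)
# transformed.subject == "subject_1" and transformed.key == "001"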

class bob.pipelines.SampleBatch(samples, sample_attribute='data')

Bases: collections.abc.Sequence, bob.pipelines.sample._ReprMixin

A batch of samples that looks like [s.data for s in samples]

However, when you call np.array(SampleBatch), it will construct a numpy array from the sample.data attributes in a memory-efficient way.
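
A small sketch of the conversion described above:

import numpy as np

import bob.pipelines as mario

samples = [mario.Sample(np.arange(3)) for _ in range(4)]
batch = mario.SampleBatch(samples)

array = np.array(batch)  # shape (4, 3), stacked from the ``data`` attributes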

class bob.pipelines.SampleSet(samples, parent=None, **kwargs)

Bases: collections.abc.MutableSequence, bob.pipelines.sample._ReprMixin

A set of samples with extra attributes

insert(index, item)[source]

S.insert(index, value) – insert value before index

class bob.pipelines.SampleWrapper(estimator, transform_extra_arguments=None, fit_extra_arguments=None, output_attribute=None, input_attribute=None, **kwargs)

Bases: bob.pipelines.BaseWrapper, sklearn.base.TransformerMixin

Wraps scikit-learn estimators to work with Sample-based pipelines.

Do not use this class except for scikit-learn estimators.

estimator

The scikit-learn estimator that is wrapped.

fit_extra_arguments

Use this option if you want to pass extra arguments to the fit method of the wrapped estimator. The format is a list of two-value tuples. The first value in each tuple is the name of the argument that fit accepts, like y, and the second value is the name of the attribute that samples carry. For example, if you are passing samples to the fit method and want to pass the subject attribute of samples as the y argument to fit, you can provide [("y", "subject")] as the value for this attribute.

Type

[tuple]

output_attribute

The name of a Sample attribute where the output of the estimator will be saved to [Default is data]. For example, if output_attribute is "annotations", then sample.annotations will contain the output of the estimator.

Type

str

transform_extra_arguments

Similar to fit_extra_arguments but for the transform and other similar methods.

Type

[tuple]

decision_function(samples)[source]
fit(samples, y=None, **kwargs)[source]
predict(samples)[source]
predict_proba(samples)[source]
score(samples)[source]
transform(samples)[source]
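
A minimal sketch of transform_extra_arguments with a toy transformer; the Cropper class and the annotations format are only illustrative:

import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin

import bob.pipelines as mario

class Cropper(TransformerMixin, BaseEstimator):
    """Toy transformer whose ``transform`` takes extra per-sample metadata."""

    def fit(self, X, y=None):
        return self

    def transform(self, images, annotations=None):
        # ``annotations`` holds one entry per sample, taken from the metadata.
        return [np.asarray(image) for image in images]

estimator = mario.wrap(
    ["sample"],
    Cropper(),
    transform_extra_arguments=[("annotations", "annotations")],
)

samples = [mario.Sample(np.zeros((8, 8)), annotations={"topleft": (0, 0)})]
transformed = estimator.transform(samples)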
class bob.pipelines.ToDaskBag(npartitions=None, partition_size=None, **kwargs)

Bases: sklearn.base.TransformerMixin, sklearn.base.BaseEstimator

Transform an arbitrary iterator into a dask.bag.Bag

Example

>>> import bob.pipelines as mario
>>> transformer = mario.ToDaskBag()
>>> dask_bag = transformer.transform([1,2,3])
>>> # dask_bag.map_partitions(...)
npartitions

Number of partitions used in dask.bag.from_sequence

Type

int

fit(X, y=None)[source]
transform(X)[source]
bob.pipelines.assert_picklable(obj)[source]

Test if an object is picklable or not.

bob.pipelines.check_parameter_for_validity(parameter, parameter_description, valid_parameters, default_parameter=None)[source]

Checks the given parameter for validity

Ensures a given parameter is in the set of valid parameters. If the parameter is None or empty, the value of default_parameter will be returned if it is specified; otherwise a ValueError will be raised.

This function will return the validated parameter, or raise a ValueError.

Parameters
  • parameter (str or None) – The single parameter to be checked. Might be a string or None.

  • parameter_description (str) – A short description of the parameter. This will be used to raise an exception in case the parameter is not valid.

  • valid_parameters (list of str) – A list/tuple of valid values for the parameters.

  • default_parameter (str, optional) – The default parameter that will be returned in case parameter is None or empty. If omitted and parameter is empty, a ValueError is raised.

Returns

The validated parameter.

Return type

str

Raises

ValueError – If the specified parameter is invalid.

bob.pipelines.check_parameters_for_validity(parameters, parameter_description, valid_parameters, default_parameters=None)[source]

Checks the given parameters for validity.

Checks that the given parameters are in the set of valid parameters. It also ensures that the parameters form a tuple or a list. If parameters is None or empty, the default_parameters will be returned (if default_parameters is omitted, all valid_parameters are returned).

This function will return a tuple or list of parameters, or raise a ValueError.

Parameters
  • parameters (str or list of str or None) – The parameters to be checked. Might be a string, a list/tuple of strings, or None.

  • parameter_description (str) – A short description of the parameter. This will be used to raise an exception in case the parameter is not valid.

  • valid_parameters (list of str) – A list/tuple of valid values for the parameters.

  • default_parameters (list of str or None) – The list/tuple of default parameters that will be returned in case parameters is None or empty. If omitted, all valid_parameters are used.

Returns

A list or tuple containing the valid parameters.

Return type

tuple

Raises

ValueError – If some of the parameters are not valid.
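
A short illustrative call; the group names are placeholders:

from bob.pipelines import check_parameters_for_validity

groups = check_parameters_for_validity(
    parameters="dev",
    parameter_description="group",
    valid_parameters=["train", "dev", "eval"],
)
# ``groups`` now contains only "dev"; passing parameters=None would return
# all of ["train", "dev", "eval"].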

bob.pipelines.dask_tags(estimator)[source]

Recursively collects resource_tags in dasked estimators.

bob.pipelines.estimator_requires_fit(estimator)[source]
bob.pipelines.flatten_samplesets(samplesets)[source]

Takes a list of SampleSets (with one or multiple samples in each SampleSet) and returns a list of SampleSets (with one sample in each SampleSet)

Parameters

samplesets (list of bob.pipelines.SampleSet) – Input list of SampleSets (with one or multiple samples in each SampleSet)
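
A minimal sketch; the keys and attributes are placeholders:

import numpy as np

import bob.pipelines as mario

samplesets = [
    mario.SampleSet(
        [mario.Sample(np.zeros(2), key="a"), mario.Sample(np.ones(2), key="b")],
        key="set_1",
        subject="subject_1",
    )
]
flat = mario.flatten_samplesets(samplesets)
# ``flat`` holds two SampleSets, each containing exactly one sample.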

bob.pipelines.get_bob_tags(estimator=None, force_tags=None)[source]

Returns the default tags of a Transformer unless forced or specified.

Relies on the tags API of sklearn to set and retrieve the tags.

Specify estimator tag values with estimator._more_tags:

class My_annotator_transformer(sklearn.base.BaseEstimator):
    def _more_tags(self):
        return {"bob_output": "annotations"}

The returned tags will take their value with the following priority:

  1. key:value in force_tags, if it is present;

  2. key:value in estimator tags (set with estimator._more_tags()) if it exists;

  3. the default value for that tag if none of the previous exist.

Examples

bob_input: str

The Sample attribute passed as the first argument of the fit or transform method. Default value is data. Example:

{"bob_input": ("annotations")}

will result in:

estimator.transform(sample.annotations)
bob_transform_extra_input: tuple of str

Each element of the tuple is a pair of str: the name of an extra keyword argument of transform and the name of the Sample attribute whose value is passed for it. These attributes are passed to the transform method in that order. Default value is an empty tuple (). Example:

{"bob_transform_extra_input": (("kwarg_1","annotations"), ("kwarg_2","gender"))}

will result in:

estimator.transform(sample.data, kwarg_1=sample.annotations, kwarg_2=sample.gender)
bob_fit_extra_input: tuple of str

Each element of the tuple is a pair of str: the name of an extra keyword argument of fit and the name of the Sample attribute whose value is passed for it. These attributes are passed to the fit method in that order. Default value is an empty tuple (). Example:

{"bob_fit_extra_input": (("y", "annotations"), ("extra "metadata"))}

will result in:

estimator.fit(sample.data, y=sample.annotations, extra=sample.metadata)
bob_output: str

The Sample attribute in which the output of the transform is stored. Default value is data.

bob_checkpoint_extension: str

The extension of each checkpoint file. Default value is .h5.

bob_features_save_fn: func

The function used to save each checkpoint file. Default value is bob.io.base.save.

bob_features_load_fn: func

The function used to load each checkpoint file. Default value is bob.io.base.load.

bob_fit_supports_dask_array: bool

Indicates that the fit method of that estimator accepts dask arrays as input. You may only use this tag if you accept X (N, M) and optionally y (N) as input. The fit function may not accept any other input. Default value is False.

bob_fit_supports_dask_bag: bool

Indicates that the fit method of that estimator accepts dask bags as input. If true, each input parameter of the fit will be a dask bag. You still can (and normally you should) wrap your estimator with the SampleWrapper so the same code runs with and without dask. Default value is False.

bob_checkpoint_features: bool

If False, the features of the estimator will never be saved. Default value is True.

Parameters
  • estimator (sklearn.BaseEstimator or None) – An estimator class with tags that will overwrite the default values. Setting to None will return the default values of every tag.

  • force_tags (dict[str, Any] or None) – Tags with a non-default value that will overwrite the default and the estimator tags.

Returns

The resulting tags with a value (either specified in the estimator, forced by the arguments, or default)

Return type

dict[str, Any]
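
A small sketch of the priority rules above, using a toy estimator:

from sklearn.base import BaseEstimator

from bob.pipelines import get_bob_tags

class Annotator(BaseEstimator):
    def _more_tags(self):
        return {"bob_output": "annotations"}

tags = get_bob_tags(Annotator())
# tags["bob_output"] is "annotations" (taken from the estimator),
# while tags["bob_input"] falls back to the default "data".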

bob.pipelines.getattr_nested(estimator, attr)[source]
bob.pipelines.hash_string(key, bucket_size=1000)[source]

Generates a hash code given a string. The hash is given by sum(ord(c) for c in key) mod bucket_size.

Parameters
  • key (str) – Input string to be hashed

  • bucket_size (int) – Size of the hash table.
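
A short sketch; the key and directory below are placeholders:

import bob.pipelines as mario

# Samples whose keys fall in the same bucket are checkpointed in the same
# subdirectory, keeping the number of files per directory bounded.
bucket = mario.hash_string("subject_1/sample_3", bucket_size=1000)

# Typically passed to the checkpoint wrapper, e.g.:
# mario.wrap(["sample", "checkpoint"], estimator,
#            features_dir="./features", hash_fn=mario.hash_string)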

bob.pipelines.is_instance_nested(instance, attribute, isinstance_of)[source]

Check whether an object or one of its nested objects is an instance of a class.

This is useful when using aggregation and it is necessary to check whether some functionality was aggregated

Parameters
  • instance – Object to be searched

  • attribute – Attribute name to be recursively searched

  • isinstance_of – Instance class to be searched
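
A minimal sketch using the wrappers from this module:

from sklearn.preprocessing import FunctionTransformer

import bob.pipelines as mario

wrapped = mario.wrap(["sample", "dask"], FunctionTransformer())

# Recursively follows the ``estimator`` attribute of each nested wrapper:
mario.is_instance_nested(wrapped, "estimator", mario.SampleWrapper)  # True
mario.is_instance_nested(wrapped, "estimator", FunctionTransformer)  # True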

bob.pipelines.is_picklable(obj)[source]

Test if an object is picklable or not.

bob.pipelines.is_pipeline_wrapped(estimator, wrapper)[source]

Iterates over the transformers of an sklearn.pipeline.Pipeline and checks whether they were wrapped with the given wrapper class

Parameters
  • estimator (sklearn.pipeline.Pipeline) – The pipeline whose transformers are checked.

  • wrapper (type) – The wrapper class to check for.

Returns

Returns a list of boolean values, where each value indicates if the corresponding estimator is wrapped or not

Return type

list

bob.pipelines.wrap(bases, estimator=None, **kwargs)[source]

Wraps several estimators inside each other.

If estimator is a pipeline, the estimators in that pipeline are wrapped.

The default behavior of wrappers can be customized through the tags; see bob.pipelines.get_bob_tags for more information.

Parameters
  • bases (list) – A list of classes to be used to wrap estimator.

  • estimator (object, optional) – An initial estimator to be wrapped inside other wrappers. If None, the first class will be used to initialize the estimator.

  • **kwargs – Extra parameters passed to the init of classes.

Returns

The wrapped estimator

Return type

object

Raises

ValueError – If not all kwargs are consumed.
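
A minimal sketch wrapping a whole scikit-learn pipeline; the toy transformers are placeholders:

import numpy as np
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import FunctionTransformer

import bob.pipelines as mario

pipeline = make_pipeline(
    FunctionTransformer(lambda x: np.asarray(x) - 1),
    FunctionTransformer(lambda x: np.asarray(x) * 2),
)

# Every step of the pipeline is wrapped to accept Samples and to run with dask.
pipeline = mario.wrap(["sample", "dask"], pipeline)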

Heterogeneous SGE

class bob.pipelines.distributed.sge.SGEIdiapJob(*args, queue=None, project=None, resource_spec=None, job_extra=None, config_name='sge', **kwargs)[source]

Bases: dask_jobqueue.core.Job

Launches an SGE job in the Idiap cluster. This class basically encodes the CLI command that bootstraps the worker in an SGE job. See https://distributed.dask.org/en/latest/resources.html#worker-resources for more information.

Note: this class is temporary. It is basically a copy of SGEJob from dask_jobqueue.

The difference is that this class also handles the dask job resources tag (which is not handled anywhere else). This has to be patched in the Job class; follow https://github.com/dask/dask-jobqueue/issues/378 for news about this patch.

submit_command = 'qsub'
cancel_command = 'qdel'
config_name = 'SGEIdiapJob'
bob.pipelines.distributed.sge.get_max_jobs(queue_dict)[source]

Given a queue list, get the max number of possible jobs.

bob.pipelines.distributed.sge.get_resource_requirements(pipeline)[source]

Get the resource requirements to execute a graph. This is useful when it is necessary to get the dictionary mapping dask delayed keys to specific resource restrictions. See https://distributed.dask.org/en/latest/resources.html#resources-with-collections for more information.

Parameters

pipeline (sklearn.pipeline.Pipeline) – A sklearn.pipeline.Pipeline wrapped with bob.pipelines.DaskWrapper

Example

>>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC) 
>>> client = Client(cluster) 
>>> from bob.pipelines.sge import get_resource_requirements  
>>> resources = get_resource_requirements(pipeline) 
>>> my_delayed_task.compute(scheduler=client, resources=resources) 
class bob.pipelines.distributed.sge.SGEMultipleQueuesCluster(log_directory='./logs', protocol='tcp://', dashboard_address=':8787', env_extra=None, sge_job_spec={'default': {'io_big': False, 'max_jobs': 128, 'memory': '8GB', 'queue': 'q_1day', 'resource_spec': '', 'resources': {'default': 1}}, 'q_1week': {'io_big': True, 'max_jobs': 24, 'memory': '4GB', 'queue': 'q_1week', 'resource_spec': '', 'resources': {'q_1week': 1}}, 'q_gpu': {'io_big': False, 'memory': '30GB', 'queue': 'q_gpu', 'resource_spec': '', 'resources': {'q_gpu': 1}}, 'q_long_gpu': {'io_big': False, 'memory': '30GB', 'queue': 'q_long_gpu', 'resource_spec': '', 'resources': {'q_long_gpu': 1}}, 'q_short_gpu': {'io_big': False, 'max_jobs': 45, 'memory': '30GB', 'queue': 'q_short_gpu', 'resource_spec': '', 'resources': {'q_short_gpu': 1}}}, min_jobs=1, project=None, **kwargs)[source]

Bases: dask_jobqueue.core.JobQueueCluster

Launch Dask jobs in the SGE cluster allowing the request of multiple queues.

Parameters
  • log_directory (str) – Default directory for the SGE logs

  • protocol (str) – Scheduler communication protocol

  • dashboard_address (str) – Default port for the dask dashboard

  • env_extra (str) – Extra environment variables to send to the workers

  • sge_job_spec (dict) – Dictionary containing a minimum specification for the qsub command. It consists of: queue: the SGE queue; memory: memory requirement in GB (e.g. 4GB); io_big: set the io_big flag; resource_spec: whatever extra argument to be sent to qsub (qsub -l); tag: mark this worker with a specific tag so the dask scheduler can place specific tasks on it (https://distributed.dask.org/en/latest/resources.html); max_jobs: maximum number of jobs in the queue

  • min_jobs (int) – Lower bound for the number of jobs for self.adapt

Example

Below follows a vanilla example that will create a set of jobs on all.q:

>>> from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster  
>>> from dask.distributed import Client 
>>> cluster = SGEMultipleQueuesCluster() 
>>> cluster.scale_up(10) 
>>> client = Client(cluster) 

It is possible to provide a resource specification yourself:

>>> Q_1DAY_IO_BIG_SPEC = {
...        "default": {
...        "queue": "q_1day",
...        "memory": "8GB",
...        "io_big": True,
...        "resource_spec": "",
...        "resources": "",
...    }
... }
>>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_IO_BIG_SPEC) 
>>> cluster.scale_up(10) 
>>> client = Client(cluster) 

More than one job spec can be set:

>>> Q_1DAY_GPU_SPEC = {
...         "default": {
...             "queue": "q_1day",
...             "memory": "8GB",
...             "io_big": True,
...             "resource_spec": "",
...             "resources": "",
...         },
...         "gpu": {
...             "queue": "q_gpu",
...             "memory": "12GB",
...             "io_big": False,
...             "resource_spec": "",
...             "resources": {"GPU":1},
...         },
...     }
>>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC) 
>>> cluster.scale_up(10) 
>>> cluster.scale_up(1, sge_job_spec_key="gpu") 
>>> client = Client(cluster) 

Adaptive job allocation can also be used via the AdaptiveIdiap extension:

>>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC)  
>>> cluster.adapt(Adaptive=AdaptiveIdiap,minimum=2, maximum=10) 
>>> client = Client(cluster)     
scale(n_jobs, sge_job_spec_key='default')[source]

Launch an SGE job in the Idiap SGE cluster.

Parameters
  • n_jobs (int) – Number of jobs to scale

  • sge_job_spec_key (str) – One of the specs in SGEMultipleQueuesCluster.sge_job_spec

scale_up(n_jobs, sge_job_spec_key=None)[source]

Scale cluster up.

This is supposed to be used by the scheduler while dynamically allocating resources

async scale_down(workers, sge_job_spec_key=None)[source]

Scale cluster down.

This is supposed to be used by the scheduler while dynamically allocating resources

adapt(*args, **kwargs)[source]

Scale Dask cluster automatically based on scheduler activity.

Parameters
  • minimum (int) – Minimum number of workers to keep around for the cluster

  • maximum (int) – Maximum number of workers to keep around for the cluster

  • minimum_memory (str) – Minimum amount of memory for the cluster

  • maximum_memory (str) – Maximum amount of memory for the cluster

  • minimum_jobs (int) – Minimum number of jobs

  • maximum_jobs (int) – Maximum number of jobs

  • **kwargs – Extra parameters to pass to dask.distributed.Adaptive

See also

dask.distributed.Adaptive

for more keyword arguments

class bob.pipelines.distributed.sge.AdaptiveMultipleQueue(cluster=None, interval=None, minimum=None, maximum=None, wait_count=None, target_duration=None, worker_key=None, **kwargs)[source]

Bases: distributed.deploy.adaptive.Adaptive

Custom mechanism to adaptively allocate workers based on scheduler load.

This custom implementation extends Adaptive.recommendations by looking at distributed.scheduler.TaskState.resource_restrictions.

The heuristic is:

Note

If a certain task has the status no-worker and it has resource_restrictions, the scheduler should request a job matching those resource restrictions

async recommendations(target: int) → dict[source]

Make scale up/down recommendations based on current state and target.

async scale_up(n, sge_job_spec_key='default')[source]
async scale_down(workers, sge_job_spec_key='default')[source]
minimum: int
maximum: int | float
wait_count: int
interval: int | float
periodic_callback: PeriodicCallback | None
close_counts: defaultdict[WorkerState, int]
log: deque[tuple[float, dict]]
class bob.pipelines.distributed.sge.SchedulerResourceRestriction(*args, **kwargs)[source]

Bases: distributed.scheduler.Scheduler

Idiap extended distributed scheduler.

This scheduler extends Scheduler by adding a handler that fetches, at every scheduler cycle, the resource restrictions of tasks that have the status no-worker

get_no_worker_tasks_resource_restrictions(comm=None)[source]

Get the resource restrictions of tasks that have the status 'no-worker'.

Transformers

bob.pipelines.transformers.Str_To_Types(fieldtypes)[source]

Converts str fields in samples to a different type

Parameters

fieldtypes (dict) – A dict that specifies the functions to be used to convert strings to other types.

Returns

A scikit-learn transformer that does the conversion.

Return type

object

Example

>>> from bob.pipelines import Sample
>>> from bob.pipelines.transformers import Str_To_Types, str_to_bool
>>> samples = [Sample(None, id="1", flag="True"), Sample(None, id="2", flag="False")]
>>> transformer = Str_To_Types(fieldtypes=dict(id=int, flag=str_to_bool))
>>> transformer.transform(samples)
[Sample(data=None, id=1, flag=True), Sample(data=None, id=2, flag=False)]
bob.pipelines.transformers.str_to_bool(value)[source]

xarray Wrapper

bob.pipelines.xarray.save(data, path)[source]
bob.pipelines.xarray.load(path)[source]
bob.pipelines.xarray.samples_to_dataset(samples, meta=None, npartitions=48, shuffle=False)[source]

Converts a list of samples to a dataset.

See Efficient pipelines with dask and xarray.

Parameters
  • samples (list) – A list of Sample or DelayedSample objects.

  • meta (xarray.DataArray, optional) – An xarray.DataArray to be used as a template for data inside samples.

  • npartitions (int, optional) – The number of partitions to partition the samples.

  • shuffle (bool, optional) – If True, shuffles the samples (in-place) before constructing the dataset.

Returns

The constructed dataset with at least a data variable.

Return type

xarray.Dataset
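
A minimal sketch, assuming simple in-memory samples; the attribute names are placeholders:

import numpy as np

import bob.pipelines as mario
from bob.pipelines.xarray import samples_to_dataset

samples = [
    mario.Sample(np.random.rand(10), key=str(i), subject=f"s{i}") for i in range(8)
]
dataset = samples_to_dataset(samples, npartitions=2)
# ``dataset`` is an xarray.Dataset with a dask-backed ``data`` variable
# (and variables for the other sample attributes).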

class bob.pipelines.xarray.Block(estimator=None, output_dtype=<class 'float'>, output_dims=((None, nan), ), fit_input='data', transform_input='data', estimator_name=None, model_path=None, features_dir=None, extension='.hdf5', save_func=None, load_func=None, dataset_map=None, input_dask_array=False, fit_kwargs=None, **kwargs)[source]

Bases: bob.pipelines.sample._ReprMixin

A block representation in a graph. This class is meant to be used with DatasetPipeline.

dataset_map

A callable that transforms the input dataset into another dataset.

Type

callable

estimator

A scikit-learn estimator

Type

object

estimator_name

Name of the estimator

Type

str

extension

The extension of checkpointed features.

Type

str

features_dir

The directory to save the features.

Type

str

fit_input

A str or list of str of column names of the dataset to be given to the .fit method.

Type

str or list

fit_kwargs

A dict of fit_kwargs to be passed to the .fit method of the estimator.

Type

None or dict

input_dask_array

Whether the estimator takes dask arrays in its fit method or not.

Type

bool

load_func

A function to load the features. Defaults to np.load.

Type

callable

model_path

If given, the estimator will be pickled here.

Type

str or None

output_dims

A list of (dim_name, dim_size) tuples. If dim_name is None, a new name is automatically generated, otherwise it should be a string. dim_size should be a positive integer or nan for new dimensions or None for existing dimensions.

Type

list

output_dtype

The dtype of the output of the transformer. Defaults to float.

Type

object

save_func

A function to save the features. Defaults to np.save with allow_pickle set to False.

Type

callable

transform_input

A str or list of str of column names of the dataset to be given to the .transform method.

Type

str or list

property output_ndim
make_path(key)[source]
save(key, data)[source]
load(key)[source]
class bob.pipelines.xarray.DatasetPipeline(graph, **kwargs)[source]

Bases: sklearn.utils.metaestimators._BaseComposition

A dataset-based scikit-learn pipeline. See Efficient pipelines with dask and xarray.

graph

A list of Block’s to be applied on input dataset.

Type

list

fit(ds, y=None)[source]
transform(ds)[source]
decision_function(ds)[source]
predict(ds)[source]
predict_proba(ds)[source]
score(ds)[source]
steps: List[Any]