Efficient pipelines with dask and xarray

In this guide, we will see an alternative method to what was discussed before about sample-based processing, checkpointing, dask integration. We will be doing the same concepts as discussed before but try to do it in a more efficient way.

In this guide we are interested in several things:

  1. Sample-based processing and carrying the sample metadata over the pipeline.

  2. Checkpointing: we may want to save intermediate steps.

  3. Lazy operations and graph optimizations. We’ll define all operations using dask and we will benefit from lazy operations and graph optimizations.

  4. Failed sample handling: we may want to drop some samples in the pipeline if we fail to process them.

  5. Scalability: we want to use dask-ml estimators to handle larger than memory datasets for training.

This guide builds upon scikit-learn, dask, and xarray. If you are not familiar with those libraries, you may want to get familiar with those libraries first.

First, let’s run our example classification problem without using the tools here to get familiar with our examples. We are going to do an Scaler+PCA+LDA example on the iris dataset:

>>> from sklearn import datasets
>>> from sklearn.preprocessing import StandardScaler
>>> from sklearn.decomposition import PCA
>>> from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
>>> from sklearn.pipeline import make_pipeline

>>> iris = datasets.load_iris()
>>> scaler, pca, lda = StandardScaler(), PCA(n_components=3, random_state=0), LinearDiscriminantAnalysis()
>>> pipeline = make_pipeline(scaler, pca, lda)

>>> _ = pipeline.fit(iris.data, iris.target)
>>> # scaler.fit was called
>>> # scaler.transform was called
>>> # pca.fit was called
>>> # pca.transform was called
>>> # lda.fit was called

>>> data = pipeline.decision_function(iris.data)
>>> # scaler.transform was called
>>> # pca.transform was called
>>> # lda.decision_function was called

As you can see here, the example ran fine. The iris.data was transformed twice using scaler.transform and pca.transform but that’s ok and we could have avoided that at the cost of complexity and more memory usage. Let’s go through through this example again and increase its complexity as we progress.

Sample-based processing

First, let’s look at how we can turn this into a sample-based pipeline. We need to convert our dataset to a list of samples first:

>>> import bob.pipelines as mario
>>> from functools import partial

>>> def load(i):
...     return iris.data[i]

>>> samples = [
...     mario.DelayedSample(partial(load, i), target=y)
...     for i, y in enumerate(iris.target)
... ]
>>> samples[0]
DelayedSample(target=0)

You may be already familiar with our sample concept. If not, please read more on Samples, a way to enhance scikit pipelines with metadata. Now, to optimize our process, we will represent our samples in an xarray.Dataset using dask.array.Array’s:

>>> dataset = mario.xr.samples_to_dataset(samples, npartitions=3)
>>> dataset  
<xarray.Dataset>
Dimensions:  (sample: 150, dim_0: 4)
Dimensions without coordinates: sample, dim_0
Data variables:
   target   (sample) int64 dask.array<chunksize=(50,), meta=np.ndarray>
   data     (sample, dim_0) float64 dask.array<chunksize=(50, 4), meta=np.ndarray>

You can see here that our samples were converted to a dataset of dask arrays. The dataset is made of two dimensions: sample and dim_0. We have 150 samples in our dataset and the data is made of 4 dimensional features.

We also partitioned our dataset to 3 partitions (chunks of 50 samples) to ensure efficient and parallel processing of our data. Read up on dask arrays to become more familiar with the idea of chunks.

If you want to give a name to dim_0, you can provide a meta parameter which should a xarray.DataArray providing information about data in our samples:

>>> import xarray as xr

>>> # construct the meta from one sample
>>> meta = xr.DataArray(samples[0].data, dims=("feature"))
>>> dataset = mario.xr.samples_to_dataset(samples, npartitions=3, meta=meta)
>>> dataset  
<xarray.Dataset>
Dimensions:  (sample: 150, feature: 4)
Dimensions without coordinates: sample, feature
Data variables:
   target   (sample) int64 dask.array<chunksize=(50,), meta=np.ndarray>
   data     (sample, feature) float64 dask.array<chunksize=(50, 4), meta=np.ndarray>

Now, we want to build a pipeline that instead of numpy arrays, processes this dataset instead. We can do that with our DatasetPipeline. A dataset pipeline is made of scikit-learn estimators but instead of working on numpy arrays, it works on xarray datasets with dask arrays inside them. We will build our pipeline using again PCA and LDA. To build a dataset pipeline, we need to tell DatasetPipeline which variables to pass to our estimators. By default, DatasetPipeline will pass the data variable to our transformer. This will work for PCA since it only needs data in its .fit and .transform methods. However, for LDA, we also need to provide target when fitting the estimator. DatasetPipeline as input takes a list of estimators. If you have to give more information about an estimator, you pass a dictionary instead.

>>> pipeline = mario.xr.DatasetPipeline(
...     [
...         scaler,
...         pca,
...         dict(estimator=lda, fit_input=["data", "target"]),
...     ]
... )
>>> pipeline
DatasetPipeline(...)

The dictionaries are used to construct Block’s. You can checkout that class to see what options are possible.

Now let’s fit our pipeline with our xarray dataset. Ideally, we want this fit step be postponed until the we call dask.compute on our results. But this does not happen here which we will explain later.

>>> _ = pipeline.fit(dataset)

Now let’s call decision_function on our pipeline. What will be returned is a new dataset with the data variable changed to the output of lda.decision_function.

>>> ds = pipeline.decision_function(dataset)
>>> ds  
<xarray.Dataset>
Dimensions:  (sample: 150, c: 3)
Dimensions without coordinates: sample, c
Data variables:
   target   (sample) int64 dask.array<chunksize=(50,), meta=np.ndarray>
   data     (sample, c) float64 dask.array<chunksize=(50, 3), meta=np.ndarray>

To get the results as numpy arrays you can call .compute() on xarray or dask objects:

>>> ds.compute()  
<xarray.Dataset>
Dimensions:  (sample: 150, c: 3)
Dimensions without coordinates: sample, c
Data variables:
   target   (sample) int64 0 0 0 0 0 0 0 0 0 0 0 0 ... 2 2 2 2 2 2 2 2 2 2 2 2
   data     (sample, c) float64 28.42 -15.84 -59.68 20.69 ... -57.81 3.79 6.92

Our operations were not lazy here (you can’t see in the docs that it was not lazy but if you increase logging’s verbosity in your code, you will see) because we had unknown dimensions. Whenever DatasetPipeline is faced with an unknown dimension, it will compute all the computations till then (using dask.persist) and then continues with valid size dimensions. A workaround to this is to provide the feature size of each estimator to DatasetPipeline. You have to provide a list of size 2 tuples of (dim_name, dim_size). The dim_name must not overlap with what is already in the dataset unless it has the same size. If your estimator does not change the dimension sizes, you can provide None as dimension size and this will be inferred from the dataset. For new and unknown dimension sizes use np.nan.

>>> pipeline = mario.xr.DatasetPipeline(
...     [
...         # scaler output is the same size as input `feature`
...         dict(estimator=scaler, output_dims=[("feature", None)]),
...         # pca feature size is 3. You can't call this dimension `feature`
...         # because it's already in the input dataset with a different size
...         dict(estimator=pca, output_dims=[("pca_feature", 3)]),
...         # size of the output of lda.decision_function is 3
...         # because we have 3 classes in our problem
...         dict(estimator=lda, fit_input=["data", "target"], output_dims=[("class", 3)]),
...     ]
... )
>>> pipeline
DatasetPipeline(...)

>>> ds = pipeline.fit(dataset).decision_function(dataset)
>>> ds  
<xarray.Dataset>
Dimensions:  (sample: 150, class: 3)
Dimensions without coordinates: sample, class
Data variables:
   target   (sample) int64 dask.array<chunksize=(50,), meta=np.ndarray>
   data     (sample, class) float64 dask.array<chunksize=(50, 3), meta=np.ndarray>

This time nothing was computed. We can get the results by calling ds.compute():

>>> ds.compute()  
<xarray.Dataset>
Dimensions:  (sample: 150, class: 3)
Dimensions without coordinates: sample, class
Data variables:
   target   (sample) int64 0 0 0 0 0 0 0 0 0 0 0 0 ... 2 2 2 2 2 2 2 2 2 2 2 2
   data     (sample, class) float64 28.42 -15.84 -59.68 ... -57.81 3.79 6.92
>>> ds.data.data.visualize(format="svg")  

In the visualization of the dask graph below, you can see that dask is only executing scaler.transform and pca.transform once.

Dask graph of the sample-based computations

Checkpointing

We may want to checkpoint features or save our fitted estimators for later use. Also, we might want to do this checkpointing to inspect our features. Let’s add the key metadata to our dataset first:

>>> def load(i):
...     return iris.data[i]

>>> samples = [
...     mario.DelayedSample(partial(load, i), target=y, key=i)
...     for i, y in enumerate(iris.target)
... ]
>>> samples[0]
DelayedSample(target=0, key=0)

>>> # construct the meta from one sample
>>> meta = xr.DataArray(samples[0].data, dims=("feature"))
>>> dataset = mario.xr.samples_to_dataset(samples, npartitions=3, meta=meta)
>>> dataset  
<xarray.Dataset>
Dimensions:  (sample: 150, feature: 4)
Dimensions without coordinates: sample, feature
Data variables:
   target   (sample) int64 dask.array<chunksize=(50,), meta=np.ndarray>
   key      (sample) int64 dask.array<chunksize=(50,), meta=np.ndarray>
   data     (sample, feature) float64 dask.array<chunksize=(50, 4), meta=np.ndarray>

To checkpoint estimators, all we need is to provide a model_path or features_dir in our pipeline and it will checkpont the model or the features:

>>> import os

>>> scaler_model = os.path.join(tempdir, "scaler.pkl")
>>> pca_model = os.path.join(tempdir, "pca.pkl")
>>> pca_features = os.path.join(tempdir, "pca_features")
>>> lda_model = os.path.join(tempdir, "lda.pkl")

>>> pipeline = mario.xr.DatasetPipeline(
...     [
...         dict(estimator=scaler, output_dims=[("feature", None)],
...              model_path=scaler_model),
...         dict(estimator=pca, output_dims=[("pca_feature", 3)],
...              model_path=pca_model, features_dir=pca_features),
...         dict(estimator=lda, fit_input=["data", "target"], output_dims=[("class", 3)],
...              model_path=lda_model),
...     ]
... )

>>> ds = pipeline.fit(dataset).decision_function(dataset)
>>> ds.compute()  
<xarray.Dataset>
Dimensions:  (sample: 150, class: 3)
Dimensions without coordinates: sample, class
Data variables:
   target   (sample) int64 0 0 0 0 0 0 0 0 0 0 0 0 ... 2 2 2 2 2 2 2 2 2 2 2 2
   key      (sample) int64 0 1 2 3 4 5 6 7 ... 142 143 144 145 146 147 148 149
   data     (sample, class) float64 28.42 -15.84 -59.68 ... -57.81 3.79 6.92

Now if you repeat the operations, the checkpoints will be used:

>>> ds = pipeline.fit(dataset).decision_function(dataset)
>>> ds.compute()  
<xarray.Dataset>
Dimensions:  (sample: 150, class: 3)
Dimensions without coordinates: sample, class
Data variables:
   target   (sample) int64 0 0 0 0 0 0 0 0 0 0 0 0 ... 2 2 2 2 2 2 2 2 2 2 2 2
   key      (sample) int64 0 1 2 3 4 5 6 7 ... 142 143 144 145 146 147 148 149
   data     (sample, class) float64 28.42 -15.84 -59.68 ... -57.81 3.79 6.92

>>> ds.data.data.visualize(format="svg")  

In the visualization of the dask graph below, you can see that dask is only executing lda.decision_function and loading pca features and lda model from disk.

Dask graph of the checkpointed sample-based computations

Handling failed processing of Samples

Imagine that in your pipeline, you may want to drop some samples because you were unable to process them or they were low quality samples. DatasetPipeline provides a generic option for applying a function on the whole dataset. Instead of providing an estimator, you just provide a function that takes as input a dataset and returns a modified dataset. This mechanism can be used for example to drop samples. To signal that we have failed processing a sample, we will put np.nan in the data of our failed samples.

Imagine, the following PCA class that fails to process every other sample:

>>> import numpy as np
>>> class FailingPCA(PCA):
...     def transform(self, X):
...         Xt = super().transform(X)
...         Xt[::2] = np.nan
...         return Xt

Now in our pipeline, we want to drop nan samples after PCA transformations:

>>> failing_pca = FailingPCA(n_components=3, random_state=0)
>>> pipeline = mario.xr.DatasetPipeline(
...     [
...         dict(estimator=scaler, output_dims=[("feature", None)]),
...         dict(estimator=failing_pca, output_dims=[("pca_feature", 3)]),
...         dict(dataset_map=lambda x: x.persist().dropna("sample")),
...         dict(estimator=lda, fit_input=["data", "target"], output_dims=[("class", 3)]),
...     ]
... )
>>> ds = pipeline.fit(dataset).decision_function(dataset)
>>> ds.compute()  
<xarray.Dataset>
Dimensions:  (sample: 75, class: 3)
Dimensions without coordinates: sample, class
Data variables:
   target   (sample) int64 0 0 0 0 0 0 0 0 0 0 0 0 ... 2 2 2 2 2 2 2 2 2 2 2 2
   key      (sample) int64 1 3 5 7 9 11 13 15 ... 137 139 141 143 145 147 149
   data     (sample, class) float64 21.74 -13.45 -54.81 ... -58.76 4.178 8.07

You can see that we have 75 samples now instead of 150 samples. The dataset_map option is generic. You can apply any operation in this function.

Training on datasets larger than memory

Sometimes your dataset is larger than your machine’s memory and this prevents you to train (.fit) your estimator(s). Luckily since our data are already dask arrays, we can use dask-ml estimators to overcome this. When you provide dask-ml estimators, set input_dask_array as True.

>>> # It's always a good idea to shuffle the samples if you are doing
>>> # partial_fit.
>>> dataset = mario.xr.samples_to_dataset(
...     samples, npartitions=3, meta=meta, shuffle=True)

>>> from sklearn.linear_model import SGDClassifier
>>> import dask_ml.preprocessing, dask_ml.decomposition, dask_ml.wrappers
>>> # construct the estimators
>>> scaler = dask_ml.preprocessing.StandardScaler()
>>> pca = dask_ml.decomposition.PCA(n_components=3, random_state=0)
>>> clf = SGDClassifier(random_state=0, loss='log', penalty='l2', tol=1e-3)
>>> clf = dask_ml.wrappers.Incremental(clf, scoring="accuracy")

>>> pipeline = mario.xr.DatasetPipeline(
...     [
...         dict(
...             estimator=scaler,
...             output_dims=[("feature", None)],
...             input_dask_array=True,
...         ),
...         dict(
...             estimator=pca,
...             output_dims=[("pca_features", 3)],
...             input_dask_array=True,
...         ),
...         dict(
...             estimator=clf,
...             fit_input=["data", "target"],
...             output_dims=[],  # since we are going to call `.predict`
...             input_dask_array=True,
...             fit_kwargs=dict(classes=range(3)),
...         ),
...     ]
... )

>>> ds = pipeline.fit(dataset).predict(dataset)
>>> ds = ds.compute()
>>> correct_classification = np.array(ds.data == ds.target).sum()
>>> correct_classification > 85
True
>>> ds.dims == {"sample": 150}
True