#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Tiago de Freitas Pereira <tiago.pereira@idiap.ch>
import logging

import click

from clapper.click import ConfigCommand, ResourceOption, verbosity_option

from bob.pipelines import is_pipeline_wrapped
from bob.pipelines.distributed import (
    VALID_DASK_CLIENT_STRINGS,
    dask_get_partition_size,
)
from bob.pipelines.wrappers import CheckpointWrapper, DaskWrapper, wrap

logger = logging.getLogger(__name__)
EPILOG = """\b
Command line examples\n
---------------------\n
Below is an example of how to extract ArcFace features from a database:\n
`bob bio pipeline transform my_database iresnet100 -vv`\n\n
To distribute the execution of the pipeline with Dask, use the `--dask-client` option.\n
The example below shows how to use this option to start a Dask cluster on SGE:\n
`bob bio pipeline transform my_database iresnet100 --dask-client sge -vv`\n\n
\b
Creating your own transformer\n
-----------------------------\n
This command accepts a configuration file as input.\n
For example, if you want to customize your transformer, you can use a configuration file like the following:\n
\b\b
```py\n
from sklearn.base import BaseEstimator, TransformerMixin\n
from sklearn.pipeline import make_pipeline\n
from bob.pipelines import wrap\n
class MyTransformer(TransformerMixin, BaseEstimator):\n
    def _more_tags(self):\n
        return {"requires_fit": False}\n
    def transform(self, X):\n
        # do something\n
        return X\n
transformer = wrap(["sample"], make_pipeline(MyTransformer()))\n
```
Then, you can use the above configuration file to run the command:\n
\b
`bob bio pipeline transform my_database my_transformer.py --dask-client sge -vv`\n
\b\b
Leveraging FunctionTransformer\n
------------------------------\n
You can also use `FunctionTransformer` to create a transformer:\n
```py\n
from sklearn.preprocessing import FunctionTransformer\n
from sklearn.pipeline import make_pipeline\n
from bob.pipelines import wrap\n
\b
def my_transformer(X):\n
    # do something\n
    return X\n
transformer = wrap(["sample"], make_pipeline(FunctionTransformer(my_transformer)))\n
```
Then, you can use the above configuration file to run the command:\n
`bob bio pipeline transform my_database my_transformer.py --dask-client sge -vv`\n
\b\b
"""
@click.command(
    name="transform",
    entry_point_group="bob.bio.config",
    cls=ConfigCommand,
    epilog=EPILOG,
)
@click.option(
    "--transformer",
    "-t",
    required=True,
    entry_point_group="bob.bio.transformer",
    help="A scikit-learn Pipeline containing the set of transformations.",
    cls=ResourceOption,
)
@click.option(
    "--database",
    "-d",
    entry_point_group="bob.bio.database",
    required=True,
    help="Biometric database connector (a class that implements the methods `background_model_samples`, `references`, and `probes`).",
    cls=ResourceOption,
)
@click.option(
    "--dask-client",
    "-l",
    entry_point_group="dask.client",
    string_exceptions=VALID_DASK_CLIENT_STRINGS,
    default="single-threaded",
    help="Dask client for the execution of the pipeline.",
    cls=ResourceOption,
)
@click.option(
    "-c",
    "--checkpoint-dir",
    show_default=True,
    default="./checkpoints",
    help="Name of the output directory where the checkpoints will be saved.",
    cls=ResourceOption,
)
@click.option(
    "--force",
    "-f",
    is_flag=True,
    help="If set, forces the regeneration of all checkpoints of the experiment.",
    cls=ResourceOption,
)
@click.option(
    "--dask-partition-size",
    "-s",
    help="If using Dask, this option defines the size of each dask.bag partition. "
    "Use this option if the current heuristic that sets this value doesn't suit your experiment "
    "(https://docs.dask.org/en/latest/bag-api.html?highlight=partition_size#dask.bag.from_sequence).",
    default=None,
    type=click.INT,
    cls=ResourceOption,
)
@verbosity_option(cls=ResourceOption, logger=logger)
def pipeline_transform(
    transformer,
    database,
    dask_client,
    checkpoint_dir,
    force,
    dask_partition_size,
    **kwargs,
):
    """
    Executes a pipeline (a scikit-learn Pipeline) on all the samples of a given database.

    This command can be used, for example, to extract features, crop faces, or preprocess audio files.
    """
    logger.info(f"Transforming samples from {database}")

    # Idiap setup: this avoids directories containing more than 1000 files/directories
    hash_fn = database.hash_fn if hasattr(database, "hash_fn") else None

    # If NONE of the steps are checkpointed, we checkpoint them all
    if not any(is_pipeline_wrapped(transformer, CheckpointWrapper)):
        logger.info("Checkpointing the pipeline")
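        # Note: the checkpoint wrapper is expected to write each transformed
        # sample under `checkpoint_dir` and to reuse existing files on
        # re-runs, unless `--force` is given.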
        transformer = wrap(
            ["checkpoint"],
            transformer,
            features_dir=checkpoint_dir,
            hash_fn=hash_fn,
            force=force,
        )
    else:
        # If at least one step is already checkpointed, we don't wrap the pipeline again
        logger.warning(
            f"The pipeline {transformer} contains steps that are already "
            "checkpointed. Hence, we are not checkpointing them again."
        )

    # Fetching all samples
    samples = database.all_samples()
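    # `all_samples()` is assumed to return every sample of the database,
    # regardless of group (training, references, probes).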
    # One boolean per pipeline step, telling whether that step is already
    # dask-wrapped (the leading ToDaskBag step doesn't count)
    dasked_elements = is_pipeline_wrapped(transformer, DaskWrapper)

    if any(dasked_elements):
        logger.warning(
            "The pipeline is already dasked, hence, we are not dasking it again."
        )
    else:
        if not isinstance(dask_client, str):
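            # If no partition size was given, estimate one from the cluster
            # and the number of samples; `lower_bound` presumably guards
            # against overly small partitions.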
            dask_partition_size = (
                dask_get_partition_size(
                    dask_client.cluster, len(samples), lower_bound=200
                )
                if dask_partition_size is None
                else dask_partition_size
            )

        logger.info(
            f"Dask wrapping it with partition size {dask_partition_size}"
        )
        transformer = wrap(
            ["dask"], transformer, partition_size=dask_partition_size
        )
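    # Run the transformation; when no Dask client was configured, fall back
    # to Dask's single-threaded scheduler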
    transformer.transform(samples).compute(
        scheduler="single-threaded" if dask_client is None else dask_client
    )

    logger.info("Transformation finished!")