Coverage for src/bob/bio/base/pipelines/entry_points.py: 69%
import glob
import logging
import os
import pickle
import random

from typing import Optional, Union

import dask.bag
import dask.distributed

from dask.delayed import Delayed
from sklearn.pipeline import Pipeline

from bob.bio.base.pipelines import (
    BioAlgDaskWrapper,
    CSVScoreWriter,
    Database,
    FourColumnsScoreWriter,
    PipelineScoreNorm,
    PipelineSimple,
    TNormScores,
    ZNormScores,
    checkpoint_pipeline_simple,
    dask_bio_pipeline,
    is_biopipeline_checkpointed,
)
from bob.pipelines import (
    DaskWrapper,
    estimator_requires_fit,
    is_instance_nested,
    wrap,
)
from bob.pipelines.distributed import dask_get_partition_size
from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster

logger = logging.getLogger(__name__)


def compute_scores(result, dask_client):
    if isinstance(result, Delayed) or isinstance(result, dask.bag.Bag):
        if dask_client is not None:
            result = result.compute(scheduler=dask_client)
        else:
            logger.warning(
                "`dask_client` not set. Your pipeline will run locally"
            )
            result = result.compute(scheduler="single-threaded")
    return result


def post_process_scores(pipeline, scores, path):
    written_scores = pipeline.write_scores(scores)
    return pipeline.post_process(written_scores, path)


def execute_pipeline_simple(
    pipeline,
    database,
    dask_client,
    groups,
    output,
    write_metadata_scores,
    checkpoint,
    dask_n_partitions,
    dask_partition_size,
    dask_n_workers,
    checkpoint_dir=None,
    force=False,
):
    """
    Function that executes the PipelineSimple.

    This is called when using the ``bob bio pipeline simple`` command.

    This is also callable from a script without fear of interrupting the running
    Dask instance, allowing chaining multiple experiments while keeping the
    workers alive.

    When using Dask, keep in mind that the data and the processing have to be
    split over multiple workers, and there is no recipe that balances every
    workload on every system. If you run into balancing problems (only a few of
    the available workers actually working while the rest wait, or the scheduler
    being overloaded trying to organize millions of tiny tasks), you can specify
    ``dask_n_partitions`` or ``dask_partition_size``.
    The first tries to split each set of data into a given number of chunks
    (ideally, one per worker), and the second creates similarly sized partitions
    in each set.
    If the memory on the workers is not sufficient, try reducing the size of the
    partitions (or increasing the number of partitions).

    Parameters
    ----------

    pipeline: Instance of :py:class:`bob.bio.base.pipelines.PipelineSimple`
        A constructed PipelineSimple object.

    database: Instance of :py:class:`bob.bio.base.pipelines.abstract_class.Database`
        A database interface instance.

    dask_client: instance of :py:class:`dask.distributed.Client` or ``None``
        A Dask client instance used to run the experiment in parallel on multiple
        machines, or locally.
        Basic configs can be found in ``bob.pipelines.config.distributed``.

    dask_n_partitions: int or None
        Specifies the number of partitions to split the data into.

    dask_partition_size: int or None
        Specifies a data partition size when using Dask. Ignored when
        ``dask_n_partitions`` is set.

    dask_n_workers: int or None
        Sets the starting number of Dask workers. Does not prevent Dask from
        requesting more or releasing workers depending on load.

    groups: list of str
        Groups of the dataset that will be requested from the database interface.

    output: str
        Path where the scores will be saved.

    write_metadata_scores: bool
        Use the CSVScoreWriter instead of the FourColumnsScoreWriter when True.

    checkpoint: bool
        Whether checkpoint files will be created for every step of the pipelines.

    checkpoint_dir: str
        If ``checkpoint`` is set, this path will be used to save the checkpoints.
        If ``None``, the content of ``output`` will be used.

    force: bool
        If set, it will force the regeneration of all the checkpoints of the
        experiment. This option does not work if ``--memory`` is set.
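
    Examples
    --------

    A minimal, illustrative invocation. The ``pipeline`` and ``database`` objects
    are assumed to be constructed elsewhere (typically from configuration entry
    points) and the values below are placeholders, not a prescribed setup::

        execute_pipeline_simple(
            pipeline=pipeline,
            database=database,
            dask_client=None,  # run locally
            groups=["dev"],
            output="./results",
            write_metadata_scores=True,
            checkpoint=True,
            dask_n_partitions=None,
            dask_partition_size=None,
            dask_n_workers=None,
        )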
136 """
    if not os.path.exists(output):
        os.makedirs(output, exist_ok=True)

    # Setting the `checkpoint_dir`
    if checkpoint_dir is None:
        checkpoint_dir = output
    else:
        os.makedirs(checkpoint_dir, exist_ok=True)

    # Scores are written on `output`
    if write_metadata_scores:
        pipeline.score_writer = CSVScoreWriter(os.path.join(output, "./tmp"))
    else:
        pipeline.score_writer = FourColumnsScoreWriter(
            os.path.join(output, "./tmp")
        )

    # Checkpoint if it's not already checkpointed
    if checkpoint and not is_biopipeline_checkpointed(pipeline):
        hash_fn = database.hash_fn if hasattr(database, "hash_fn") else None
        pipeline = checkpoint_pipeline_simple(
            pipeline, checkpoint_dir, hash_fn=hash_fn, force=force
        )

    # Load the background model samples only if the transformer requires fitting
    if estimator_requires_fit(pipeline.transformer):
        background_model_samples = database.background_model_samples()
    else:
        background_model_samples = []

    for group in groups:
        score_file_name = os.path.join(
            output,
            f"scores-{group}" + (".csv" if write_metadata_scores else ""),
        )
        biometric_references = database.references(group=group)
        probes = database.probes(group=group)

        # If there's no data to be processed, continue
        if len(biometric_references) == 0 or len(probes) == 0:
            logger.warning(
                f"Current dataset ({database}) does not have a `{group}` set. "
                "The experiment will not be executed."
            )
            continue

        if dask_client is not None and not is_instance_nested(
            pipeline.biometric_algorithm,
            "biometric_algorithm",
            BioAlgDaskWrapper,
        ):
            # Scaling up
            if dask_n_workers is not None and not isinstance(dask_client, str):
                dask_client.cluster.scale(dask_n_workers)

            # Data partitioning:
            # - Too many small partitions: the scheduler takes more time
            #   scheduling than computing.
            # - Too few big partitions: not all the available workers are used,
            #   and the experiment runs slower.
            if dask_partition_size is not None:
                logger.debug(
                    f"Splitting data with fixed size partitions: {dask_partition_size}."
                )
                pipeline = dask_bio_pipeline(
                    pipeline,
                    partition_size=dask_partition_size,
                )
            elif dask_n_partitions is not None or dask_n_workers is not None:
                # Divide each set into a user-defined number of partitions
                n_partitions = dask_n_partitions or dask_n_workers
                logger.debug(
                    f"Splitting data with fixed number of partitions: {n_partitions}."
                )
                pipeline = dask_bio_pipeline(pipeline, npartitions=n_partitions)
            else:
                # Split into max_jobs partitions or revert to the default behavior
                # of dask.bag.from_sequence: partition_size = 100
                n_jobs = None
                if not isinstance(dask_client, str) and isinstance(
                    dask_client.cluster, SGEMultipleQueuesCluster
                ):
                    logger.debug(
                        "Splitting data according to the number of available workers."
                    )
                    n_jobs = dask_client.cluster.sge_job_spec["default"][
                        "max_jobs"
                    ]
                    logger.debug(f"{n_jobs} partitions will be created.")
                pipeline = dask_bio_pipeline(pipeline, npartitions=n_jobs)

        logger.info(f"Running the PipelineSimple for group {group}")
        score_all_vs_all = (
            database.score_all_vs_all
            if hasattr(database, "score_all_vs_all")
            else False
        )

        result = pipeline(
            background_model_samples,
            biometric_references,
            probes,
            score_all_vs_all=score_all_vs_all,
        )

        post_processed_scores = post_process_scores(
            pipeline, result, score_file_name
        )
        compute_scores(post_processed_scores, dask_client)


def execute_pipeline_score_norm(
    pipeline,
    database,
    dask_client,
    groups,
    output,
    write_metadata_scores,
    checkpoint,
    dask_partition_size,
    dask_n_workers,
    checkpoint_dir=None,
    top_norm=False,
    top_norm_score_fraction=0.8,
    score_normalization_type="znorm",
    force=False,
):
    """
    Function that extends the capabilities of the PipelineSimple to run score normalization.

    This is called when using the ``bob bio pipeline score-norm`` command.

    This is also callable from a script without fear of interrupting the running
    Dask instance, allowing chaining multiple experiments while keeping the
    workers alive.

    Parameters
    ----------

    pipeline: Instance of :py:class:`bob.bio.base.pipelines.PipelineSimple`
        A constructed PipelineSimple object.

    database: Instance of :py:class:`bob.bio.base.pipelines.abstract_class.Database`
        A database interface instance.

    dask_client: instance of :py:class:`dask.distributed.Client` or ``None``
        A Dask client instance used to run the experiment in parallel on multiple
        machines, or locally.
        Basic configs can be found in ``bob.pipelines.config.distributed``.

    groups: list of str
        Groups of the dataset that will be requested from the database interface.

    output: str
        Path where the results and checkpoints will be saved.

    write_metadata_scores: bool
        Use the CSVScoreWriter instead of the FourColumnsScoreWriter when True.

    checkpoint: bool
        Whether checkpoint files will be created for every step of the pipelines.

    dask_partition_size: int
        If using Dask, this option defines the size of each dask.bag partition.
        Use this option if the current heuristic that sets this value doesn't suit
        your experiment
        (https://docs.dask.org/en/latest/bag-api.html?highlight=partition_size#dask.bag.from_sequence).

    dask_n_workers: int
        If using Dask, this option defines the number of workers used to start
        your experiment. Dask automatically scales the number of workers up and
        down depending on the current load of tasks. Use this option if the
        default starting number of workers doesn't suit you.

    top_norm: bool
        If set, only a fraction of the normalization samples (given by
        ``top_norm_score_fraction``) is used.

    top_norm_score_fraction: float
        Sets the fraction of samples used for T-norm and Z-norm. Use this when
        you don't want to use all the T/Z samples for normalization.

    score_normalization_type: str
        The type of score normalization to run, either ``"znorm"`` or ``"tnorm"``.

    checkpoint_dir: str
        If ``checkpoint`` is set, this path will be used to save the checkpoints.
        If ``None``, the content of ``output`` will be used.
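
    Examples
    --------

    An illustrative, local Z-norm run. ``pipeline`` and ``database`` are assumed
    to be constructed elsewhere and the paths are placeholders::

        execute_pipeline_score_norm(
            pipeline=pipeline,
            database=database,
            dask_client=None,
            groups=["dev"],
            output="./results",
            write_metadata_scores=True,
            checkpoint=True,
            dask_partition_size=None,
            dask_n_workers=None,
            score_normalization_type="znorm",
        )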
311 """
    if not os.path.exists(output):
        os.makedirs(output, exist_ok=True)

    # Setting the `checkpoint_dir`
    if checkpoint_dir is None:
        checkpoint_dir = output
    else:
        os.makedirs(checkpoint_dir, exist_ok=True)

    # Scores are written on `output`
    if write_metadata_scores:
        pipeline.score_writer = CSVScoreWriter(os.path.join(output, "./tmp"))
    else:
        pipeline.score_writer = FourColumnsScoreWriter(
            os.path.join(output, "./tmp")
        )

    # Check if it's already checkpointed
    if checkpoint and not is_biopipeline_checkpointed(pipeline):
        pipeline = checkpoint_pipeline_simple(
            pipeline, checkpoint_dir, force=force
        )

    # PICKING THE TYPE OF POST-PROCESSING
    if score_normalization_type == "znorm":
        post_processor = ZNormScores(
            top_norm=top_norm,
            top_norm_score_fraction=top_norm_score_fraction,
        )
    elif score_normalization_type == "tnorm":
        post_processor = TNormScores(
            top_norm=top_norm,
            top_norm_score_fraction=top_norm_score_fraction,
        )
    else:
        raise ValueError(
            f"score_normalization_type {score_normalization_type} is not valid"
        )

    if checkpoint and not is_biopipeline_checkpointed(post_processor):
        score_stats_path = os.path.join(
            checkpoint_dir,
            f"{score_normalization_type}-scores",
            "norm",
            "stats",
        )
        # we cannot checkpoint "features" because sample keys are not unique.
        post_processor = wrap(
            ["checkpoint"],
            post_processor,
            model_path=score_stats_path,
            force=force,
        )

    pipeline = PipelineScoreNorm(pipeline, post_processor)

    background_model_samples = database.background_model_samples()

    # treferences = database.treferences(proportion=ztnorm_cohort_proportion)
    for group in groups:
        # Changing the score normalization stats file name as a function of the group
        if checkpoint and not is_biopipeline_checkpointed(post_processor):
            post_processor.model_path = f"{score_stats_path}_{group}.pkl"

        if score_normalization_type == "znorm":
            score_normalization_samples = database.zprobes(group=group)
        elif score_normalization_type == "tnorm":
            score_normalization_samples = database.treferences()

        score_file_name = os.path.join(output, f"scores-{group}")

        biometric_references = database.references(group=group)
        probes = database.probes(group=group)

        # If there's no data to be processed, continue
        if len(biometric_references) == 0 or len(probes) == 0:
            logger.warning(
                f"Current dataset ({database}) does not have a `{group}` set. "
                "The experiment will not be executed."
            )
            continue

        if dask_client is not None and not is_instance_nested(
            pipeline.biometric_algorithm,
            "biometric_algorithm",
            BioAlgDaskWrapper,
        ):
            # Scaling up
            if dask_n_workers is not None and not isinstance(dask_client, str):
                dask_client.cluster.scale(dask_n_workers)

            n_objects = max(
                len(background_model_samples),
                len(biometric_references),
                len(probes),
            )
            partition_size = None
            if not isinstance(dask_client, str):
                partition_size = dask_get_partition_size(
                    dask_client.cluster, n_objects
                )
            if dask_partition_size is not None:
                partition_size = dask_partition_size

            pipeline = dask_bio_pipeline(
                pipeline,
                partition_size=partition_size,
            )

        logger.info(f"Running PipelineSimple for group {group}")
        score_all_vs_all = (
            database.score_all_vs_all
            if hasattr(database, "score_all_vs_all")
            else False
        )

        (
            raw_scores,
            score_normed_scores,
        ) = pipeline(
            background_model_samples,
            biometric_references,
            probes,
            score_normalization_samples,
            score_all_vs_all=score_all_vs_all,
        )

        def _build_filename(score_file_name, suffix):
            return os.path.join(score_file_name, suffix)

        # Running RAW_SCORES
        raw_scores = post_process_scores(
            pipeline,
            raw_scores,
            _build_filename(score_file_name, "raw_scores.csv"),
        )
        _ = compute_scores(raw_scores, dask_client)

        # NORMALIZED SCORES (Z- or T-norm, depending on `score_normalization_type`)
        score_normed_scores = post_process_scores(
            pipeline,
            score_normed_scores,
            _build_filename(score_file_name, f"{score_normalization_type}.csv"),
        )
        _ = compute_scores(score_normed_scores, dask_client)

        # T-SCORES
        """
        t_normed_scores = post_process_scores(
            pipeline,
            t_normed_scores,
            _build_filename(score_file_name, "t_normed_scores.csv"),
        )
        _ = compute_scores(t_normed_scores, dask_client)

        # S-SCORES
        s_normed_scores = post_process_scores(
            pipeline,
            s_normed_scores,
            _build_filename(score_file_name, "s_normed_scores.csv"),
        )
        _ = compute_scores(s_normed_scores, dask_client)

        # ZT-SCORES
        zt_normed_scores = post_process_scores(
            pipeline,
            zt_normed_scores,
            _build_filename(score_file_name, "zt_normed_scores.csv"),
        )
        _ = compute_scores(zt_normed_scores, dask_client)
        """


def execute_pipeline_train(
    pipeline: Union[PipelineSimple, Pipeline],
    database: Database,
    dask_client: Optional[dask.distributed.Client] = None,
    output: str = "./results",
    checkpoint: bool = True,
    dask_n_partitions: Optional[int] = None,
    dask_partition_size: Optional[int] = None,
    dask_n_workers: Optional[int] = None,
    checkpoint_dir: Optional[str] = None,
    force: bool = False,
    split_training: bool = False,
    n_splits: int = 3,
    **kwargs,
):
    """Executes only the training part of a pipeline.

    When running from a script, use this function instead of the click command in
    ``bob.bio.base.script.pipeline_train``.

    Parameters
    ----------

    pipeline:
        A constructed ``PipelineSimple`` object (the ``transformer`` will be
        extracted for training), or an ``sklearn.Pipeline``.

    database:
        A database interface instance.

    dask_client:
        A Dask client instance used to run the experiment in parallel on multiple
        machines, or locally.
        Basic configs can be found in ``bob.pipelines.config.distributed``.

    dask_n_partitions:
        Specifies the number of partitions to split the data into.

    dask_partition_size:
        Specifies a data partition size when using Dask. Ignored when
        ``dask_n_partitions`` is set.

    dask_n_workers:
        Sets the starting number of Dask workers. Does not prevent Dask from
        requesting more or releasing workers depending on load.

    output:
        Path where the fitted transformers and training-step checkpoints will be
        saved.

    checkpoint:
        Whether checkpoint files will be created for every step of the pipelines.

    checkpoint_dir:
        If ``checkpoint`` is set, this path will be used to save the checkpoints.
        If ``None``, the content of ``output`` will be used.

    force:
        If set, it will force the regeneration of all the checkpoints of the
        experiment. This option does not work if ``--memory`` is set.

    split_training:
        If set, the background model will be trained on multiple partitions of
        the data, one after the other, saving a pickled pipeline after each step.

    n_splits:
        Number of splits to use when splitting the data.
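
    Examples
    --------

    An illustrative, local run; ``pipeline`` and ``database`` are assumed to be
    constructed elsewhere and the output path is a placeholder::

        execute_pipeline_train(
            pipeline=pipeline,
            database=database,
            dask_client=None,
            output="./results",
            checkpoint=True,
            split_training=False,
        )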
550 """
    logger.debug(f"Unused arguments: {kwargs=}")
    if not os.path.exists(output):
        os.makedirs(output, exist_ok=True)

    # Setting the `checkpoint_dir`
    if checkpoint_dir is None:
        checkpoint_dir = output
    else:
        os.makedirs(checkpoint_dir, exist_ok=True)

    if isinstance(pipeline, PipelineSimple):
        pipeline = pipeline.transformer

    # Checkpoint (only features, not the model)
    if checkpoint:
        hash_fn = database.hash_fn if hasattr(database, "hash_fn") else None
        wrap(
            ["checkpoint"],
            pipeline,
            features_dir=checkpoint_dir,
            model_path=None,
            hash_fn=hash_fn,
            force=force,
        )

    if not estimator_requires_fit(pipeline):
        raise ValueError(
            "Estimator does not require fitting. No training necessary."
        )

    background_model_samples = database.background_model_samples()

    if dask_client is not None:
        # Scaling up
        if dask_n_workers is not None and not isinstance(dask_client, str):
            dask_client.cluster.scale(dask_n_workers)

        if dask_partition_size is not None:
            logger.debug(
                f"Splitting data with fixed size partitions: {dask_partition_size}."
            )
            pipeline = wrap(
                ["dask"], pipeline, partition_size=dask_partition_size
            )
        elif dask_n_partitions is not None or dask_n_workers is not None:
            # Divide each set into a user-defined number of partitions
            n_partitions = dask_n_partitions or dask_n_workers
            logger.debug(
                f"Splitting data with fixed number of partitions: {n_partitions}."
            )
            pipeline = wrap(["dask"], pipeline, npartitions=n_partitions)
        else:
            # Split into max_jobs partitions or revert to the default behavior of
            # dask.bag.from_sequence: partition_size = 100
            n_jobs = None
            if not isinstance(dask_client, str) and isinstance(
                dask_client.cluster, SGEMultipleQueuesCluster
            ):
                logger.debug(
                    "Splitting data according to the number of available workers."
                )
                n_jobs = dask_client.cluster.sge_job_spec["default"]["max_jobs"]
                logger.debug(f"{n_jobs} partitions will be created.")
            pipeline = wrap(["dask"], pipeline, npartitions=n_jobs)

    logger.info("Running the pipeline training")
    if split_training:
        start_step = -1
        # Look at step files, and assess if we can load the last one
        for step_file in glob.glob(
            os.path.join(output, "train_pipeline_step_*.pkl")
        ):
            to_rem = os.path.join(output, "train_pipeline_step_")
            file_step = int(step_file.replace(to_rem, "").replace(".pkl", ""))
            start_step = max(start_step, file_step)
        if start_step > -1:
            logger.debug("Found pipeline training step. Loading it.")
            last_step_file = os.path.join(
                output, f"train_pipeline_step_{start_step}.pkl"
            )
            with open(last_step_file, "rb") as start_file:
                pipeline = pickle.load(start_file)
        start_step += 1  # Loaded step is i -> training starts at i+1
        logger.info(f"Starting from training step {start_step}")

        random.seed(0)
        random.shuffle(background_model_samples)

        for partition_i in range(start_step, n_splits):
            logger.info(
                f"Training with partition {partition_i} ({partition_i+1}/{n_splits})"
            )
            start = len(background_model_samples) // n_splits * partition_i
            end = len(background_model_samples) // n_splits * (partition_i + 1)
            _ = pipeline.fit(background_model_samples[start:end])
            step_path = os.path.join(
                output, f"train_pipeline_step_{partition_i}.pkl"
            )
            with open(step_path, "wb") as f:
                pickle.dump(pipeline, f)
    else:
        _ = pipeline.fit(background_model_samples)

    # Save each fitted transformer
    for transformer_name, transformer in pipeline.steps:
        if transformer._get_tags()["requires_fit"]:
            if isinstance(transformer, DaskWrapper):
                transformer = transformer.estimator
            step_path = os.path.join(output, f"{transformer_name}.pkl")
            with open(step_path, "wb") as f:
                pickle.dump(transformer, f)