# Source code for bob.bio.base.pipelines.entry_points

import logging
import os

import dask.bag

from dask.delayed import Delayed

from bob.bio.base.pipelines import (
    BioAlgDaskWrapper,
    CSVScoreWriter,
    FourColumnsScoreWriter,
    PipelineScoreNorm,
    TNormScores,
    ZNormScores,
    checkpoint_pipeline_simple,
    dask_bio_pipeline,
    is_biopipeline_checkpointed,
)
from bob.pipelines import estimator_requires_fit, is_instance_nested, wrap
from bob.pipelines.distributed import dask_get_partition_size
from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster

logger = logging.getLogger(__name__)


def compute_scores(result, dask_client):
    if isinstance(result, (Delayed, dask.bag.Bag)):
        if dask_client is not None:
            result = result.compute(scheduler=dask_client)
        else:
            logger.warning(
                "`dask_client` not set. Your pipeline will run locally"
            )
            result = result.compute(scheduler="single-threaded")
    return result
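

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustration only, not part of the original module):
# `compute_scores` resolves a lazy Dask collection. Without a `dask_client`
# it falls back to the single-threaded scheduler, which is convenient for
# debugging. The bag below is only a stand-in for the delayed score
# structure that a pipeline would return.
def _example_compute_scores_locally():  # pragma: no cover
    scores = dask.bag.from_sequence([0.1, 0.5, 0.9]).map(lambda s: s * 2)
    return compute_scores(scores, dask_client=None)  # -> [0.2, 1.0, 1.8]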


def post_process_scores(pipeline, scores, path):
    written_scores = pipeline.write_scores(scores)
    return pipeline.post_process(written_scores, path)


def execute_pipeline_simple(
    pipeline,
    database,
    dask_client,
    groups,
    output,
    write_metadata_scores,
    checkpoint,
    dask_n_partitions,
    dask_partition_size,
    dask_n_workers,
    checkpoint_dir=None,
    force=False,
):
    """
    Executes the PipelineSimple. This is called when using the
    ``bob bio pipeline simple`` command.

    This is also callable from a script without fear of interrupting the
    running Dask instance, allowing chaining multiple experiments while
    keeping the workers alive.

    When using Dask, keep in mind that we want to split the data and the
    processing time across multiple workers. There is no recipe that makes
    everything work on every system. If you encounter a balancing problem
    (only a few of the available workers actually working while the rest
    wait, or the scheduler being overloaded trying to organise millions of
    tiny tasks), you can specify ``dask_n_partitions`` or
    ``dask_partition_size``. The first splits each set of data into a given
    number of chunks (ideally one per worker), while the second creates
    similar-sized partitions in each set. If the memory on the workers is
    not sufficient, try reducing the size of the partitions (or increasing
    the number of partitions).

    Parameters
    ----------

    pipeline: Instance of :py:class:`bob.bio.base.pipelines.PipelineSimple`
        A constructed PipelineSimple object.

    database: Instance of :py:class:`bob.bio.base.pipelines.abstract_class.Database`
        A database interface instance.

    dask_client: instance of :py:class:`dask.distributed.Client` or ``None``
        A Dask client instance used to run the experiment in parallel on
        multiple machines, or locally. Basic configs can be found in
        ``bob.pipelines.config.distributed``.

    dask_n_partitions: int or None
        Number of partitions to split the data into.

    dask_partition_size: int or None
        Size of each data partition when using Dask. Ignored when
        ``dask_n_partitions`` is set.

    dask_n_workers: int or None
        Starting number of Dask workers. Does not prevent Dask from
        requesting more or releasing workers depending on load.

    groups: list of str
        Groups of the dataset that will be requested from the database
        interface.

    output: str
        Path where the scores will be saved.

    write_metadata_scores: bool
        Use the CSVScoreWriter instead of the FourColumnsScoreWriter when
        True.

    checkpoint: bool
        Whether checkpoint files will be created for every step of the
        pipeline.

    checkpoint_dir: str
        If ``checkpoint`` is set, this path will be used to save the
        checkpoints. If ``None``, the content of ``output`` will be used.

    force: bool
        If set, it will force the regeneration of all the checkpoints of an
        experiment. This option doesn't work if ``--memory`` is set.
    """

    if not os.path.exists(output):
        os.makedirs(output, exist_ok=True)

    # Setting the `checkpoint_dir`
    if checkpoint_dir is None:
        checkpoint_dir = output
    else:
        os.makedirs(checkpoint_dir, exist_ok=True)

    # Scores are written on `output`
    if write_metadata_scores:
        pipeline.score_writer = CSVScoreWriter(os.path.join(output, "./tmp"))
    else:
        pipeline.score_writer = FourColumnsScoreWriter(
            os.path.join(output, "./tmp")
        )

    # Checkpoint the pipeline if it's not already checkpointed
    if checkpoint and not is_biopipeline_checkpointed(pipeline):
        hash_fn = database.hash_fn if hasattr(database, "hash_fn") else None
        pipeline = checkpoint_pipeline_simple(
            pipeline, checkpoint_dir, hash_fn=hash_fn, force=force
        )

    # Load the background model samples only if the transformer requires fitting
    if estimator_requires_fit(pipeline.transformer):
        background_model_samples = database.background_model_samples()
    else:
        background_model_samples = []

    for group in groups:
        score_file_name = os.path.join(
            output,
            f"scores-{group}" + (".csv" if write_metadata_scores else ""),
        )
        biometric_references = database.references(group=group)
        probes = database.probes(group=group)

        # If there's no data to be processed, continue
        if len(biometric_references) == 0 or len(probes) == 0:
            logger.warning(
                f"Current dataset ({database}) does not have `{group}` set. "
                "The experiment will not be executed."
            )
            continue

        if dask_client is not None and not is_instance_nested(
            pipeline.biometric_algorithm,
            "biometric_algorithm",
            BioAlgDaskWrapper,
        ):
            # Scaling up
            if dask_n_workers is not None and not isinstance(dask_client, str):
                dask_client.cluster.scale(dask_n_workers)

            # Data partitioning.
            # - Too many small partitions: the scheduler takes more time
            #   scheduling than computing.
            # - Too few big partitions: we don't use all the available
            #   workers and thus run slower.
            if dask_partition_size is not None:
                logger.debug(
                    f"Splitting data with fixed size partitions: {dask_partition_size}."
                )
                pipeline = dask_bio_pipeline(
                    pipeline,
                    partition_size=dask_partition_size,
                )
            elif dask_n_partitions is not None or dask_n_workers is not None:
                # Divide each set in a user-defined number of partitions
                logger.debug("Splitting data with fixed number of partitions.")
                pipeline = dask_bio_pipeline(
                    pipeline,
                    npartitions=dask_n_partitions or dask_n_workers,
                )
            else:
                # Split in max_jobs partitions or revert to the default
                # behavior of dask.bag.from_sequence: partition_size = 100
                n_jobs = None
                if not isinstance(dask_client, str) and isinstance(
                    dask_client.cluster, SGEMultipleQueuesCluster
                ):
                    logger.debug(
                        "Splitting data according to the number of available workers."
                    )
                    n_jobs = dask_client.cluster.sge_job_spec["default"][
                        "max_jobs"
                    ]
                    logger.debug(f"{n_jobs} partitions will be created.")

                pipeline = dask_bio_pipeline(pipeline, npartitions=n_jobs)

        logger.info(f"Running the PipelineSimple for group {group}")
        score_all_vs_all = (
            database.score_all_vs_all
            if hasattr(database, "score_all_vs_all")
            else False
        )

        result = pipeline(
            background_model_samples,
            biometric_references,
            probes,
            score_all_vs_all=score_all_vs_all,
        )

        post_processed_scores = post_process_scores(
            pipeline, result, score_file_name
        )
        compute_scores(post_processed_scores, dask_client)
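

# ---------------------------------------------------------------------------
# Usage sketch (illustration only, not part of the original module): driving
# `execute_pipeline_simple` from a script, as mentioned in its docstring.
# `my_pipeline` and `my_database` are hypothetical placeholders for any
# constructed PipelineSimple and database interface; the paths are arbitrary.
# Passing `dask_client=None` runs everything locally.
def _example_run_pipeline_simple(my_pipeline, my_database):  # pragma: no cover
    execute_pipeline_simple(
        pipeline=my_pipeline,
        database=my_database,
        dask_client=None,  # or a dask.distributed.Client for parallel runs
        groups=["dev"],
        output="./results",
        write_metadata_scores=True,  # CSV scores with metadata
        checkpoint=True,  # cache intermediate steps
        dask_n_partitions=None,
        dask_partition_size=None,
        dask_n_workers=None,
        checkpoint_dir="./checkpoints",
        force=False,
    )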


def execute_pipeline_score_norm(
    pipeline,
    database,
    dask_client,
    groups,
    output,
    write_metadata_scores,
    checkpoint,
    dask_partition_size,
    dask_n_workers,
    checkpoint_dir=None,
    top_norm=False,
    top_norm_score_fraction=0.8,
    score_normalization_type="znorm",
    force=False,
):
    """
    Extends the capabilities of the PipelineSimple to run score
    normalization. This is called when using the
    ``bob bio pipeline score-norm`` command.

    This is also callable from a script without fear of interrupting the
    running Dask instance, allowing chaining multiple experiments while
    keeping the workers alive.

    Parameters
    ----------

    pipeline: Instance of :py:class:`bob.bio.base.pipelines.PipelineSimple`
        A constructed PipelineSimple object.

    database: Instance of :py:class:`bob.bio.base.pipelines.abstract_class.Database`
        A database interface instance.

    dask_client: instance of :py:class:`dask.distributed.Client` or ``None``
        A Dask client instance used to run the experiment in parallel on
        multiple machines, or locally. Basic configs can be found in
        ``bob.pipelines.config.distributed``.

    groups: list of str
        Groups of the dataset that will be requested from the database
        interface.

    output: str
        Path where the results and checkpoints will be saved to.

    write_metadata_scores: bool
        Use the CSVScoreWriter instead of the FourColumnsScoreWriter when
        True.

    checkpoint: bool
        Whether checkpoint files will be created for every step of the
        pipeline.

    dask_partition_size: int
        If using Dask, this option defines the size of each dask.bag
        partition. Use this option if the current heuristic that sets this
        value doesn't suit your experiment
        (https://docs.dask.org/en/latest/bag-api.html?highlight=partition_size#dask.bag.from_sequence).

    dask_n_workers: int
        If using Dask, this option defines the number of workers used to
        start your experiment. Dask automatically scales the number of
        workers up and down based on the current load of tasks to be
        solved. Use this option if the default number of starting workers
        doesn't suit you.

    top_norm: bool
        If True, the normalization statistics are computed on the
        top-scoring fraction of the cohort scores only (see
        ``top_norm_score_fraction``).

    top_norm_score_fraction: float
        Sets the fraction of samples used for T-norm and Z-norm. Sometimes
        you don't want to use all the T/Z samples for normalization.

    score_normalization_type: str
        The type of score normalization to apply: either ``"znorm"`` or
        ``"tnorm"``.

    checkpoint_dir: str
        If ``checkpoint`` is set, this path will be used to save the
        checkpoints. If ``None``, the content of ``output`` will be used.

    force: bool
        If set, it will force the regeneration of all the checkpoints of an
        experiment.
    """

    if not os.path.exists(output):
        os.makedirs(output, exist_ok=True)

    # Setting the `checkpoint_dir`
    if checkpoint_dir is None:
        checkpoint_dir = output
    else:
        os.makedirs(checkpoint_dir, exist_ok=True)

    # Scores are written on `output`
    if write_metadata_scores:
        pipeline.score_writer = CSVScoreWriter(os.path.join(output, "./tmp"))
    else:
        pipeline.score_writer = FourColumnsScoreWriter(
            os.path.join(output, "./tmp")
        )

    # Checkpoint the pipeline if it's not already checkpointed
    if checkpoint and not is_biopipeline_checkpointed(pipeline):
        pipeline = checkpoint_pipeline_simple(
            pipeline, checkpoint_dir, force=force
        )

    # PICKING THE TYPE OF POST-PROCESSING
    if score_normalization_type == "znorm":
        post_processor = ZNormScores(
            top_norm=top_norm,
            top_norm_score_fraction=top_norm_score_fraction,
        )
    elif score_normalization_type == "tnorm":
        post_processor = TNormScores(
            top_norm=top_norm,
            top_norm_score_fraction=top_norm_score_fraction,
        )
    else:
        raise ValueError(
            f"score_normalization_type {score_normalization_type} is not valid"
        )

    if checkpoint and not is_biopipeline_checkpointed(post_processor):
        model_path = os.path.join(
            checkpoint_dir,
            f"{score_normalization_type}-scores",
            "norm",
            "stats.pkl",
        )
        # We cannot checkpoint "features" because sample keys are not unique.
        post_processor = wrap(
            ["checkpoint"], post_processor, model_path=model_path, force=force
        )

    pipeline = PipelineScoreNorm(pipeline, post_processor)

    background_model_samples = database.background_model_samples()

    # treferences = database.treferences(proportion=ztnorm_cohort_proportion)
    for group in groups:
        if score_normalization_type == "znorm":
            score_normalization_samples = database.zprobes(group=group)
        elif score_normalization_type == "tnorm":
            score_normalization_samples = database.treferences()

        score_file_name = os.path.join(output, f"scores-{group}")

        biometric_references = database.references(group=group)
        probes = database.probes(group=group)

        # If there's no data to be processed, continue
        if len(biometric_references) == 0 or len(probes) == 0:
            logger.warning(
                f"Current dataset ({database}) does not have `{group}` set. "
                "The experiment will not be executed."
            )
            continue

        if dask_client is not None and not is_instance_nested(
            pipeline.biometric_algorithm,
            "biometric_algorithm",
            BioAlgDaskWrapper,
        ):
            # Scaling up
            if dask_n_workers is not None and not isinstance(dask_client, str):
                dask_client.cluster.scale(dask_n_workers)

            n_objects = max(
                len(background_model_samples),
                len(biometric_references),
                len(probes),
            )
            partition_size = None
            if not isinstance(dask_client, str):
                partition_size = dask_get_partition_size(
                    dask_client.cluster, n_objects
                )
            if dask_partition_size is not None:
                partition_size = dask_partition_size

            pipeline = dask_bio_pipeline(
                pipeline,
                partition_size=partition_size,
            )

        logger.info(f"Running the PipelineScoreNorm for group {group}")
        score_all_vs_all = (
            database.score_all_vs_all
            if hasattr(database, "score_all_vs_all")
            else False
        )

        raw_scores, score_normed_scores = pipeline(
            background_model_samples,
            biometric_references,
            probes,
            score_normalization_samples,
            score_all_vs_all=score_all_vs_all,
        )

        def _build_filename(score_file_name, suffix):
            return os.path.join(score_file_name, suffix)

        # Running RAW_SCORES
        raw_scores = post_process_scores(
            pipeline,
            raw_scores,
            _build_filename(score_file_name, "raw_scores.csv"),
        )
        _ = compute_scores(raw_scores, dask_client)

        # NORMALIZED SCORES (Z- or T-norm, depending on score_normalization_type)
        score_normed_scores = post_process_scores(
            pipeline,
            score_normed_scores,
            _build_filename(
                score_file_name, f"{score_normalization_type}.csv"
            ),
        )
        _ = compute_scores(score_normed_scores, dask_client)

        # T-SCORES
        """
        t_normed_scores = post_process_scores(
            pipeline,
            t_normed_scores,
            _build_filename(score_file_name, "t_normed_scores.csv"),
        )
        _ = compute_scores(t_normed_scores, dask_client)

        # S-SCORES
        s_normed_scores = post_process_scores(
            pipeline,
            s_normed_scores,
            _build_filename(score_file_name, "s_normed_scores.csv"),
        )
        _ = compute_scores(s_normed_scores, dask_client)

        # ZT-SCORES
        zt_normed_scores = post_process_scores(
            pipeline,
            zt_normed_scores,
            _build_filename(score_file_name, "zt_normed_scores.csv"),
        )
        _ = compute_scores(zt_normed_scores, dask_client)
        """
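

# ---------------------------------------------------------------------------
# Usage sketch (illustration only, not part of the original module): running
# Z-norm score normalization on top of an existing PipelineSimple.
# `my_pipeline` and `my_database` are hypothetical placeholders; for
# "znorm" the database must provide `zprobes()`, for "tnorm" it must
# provide `treferences()`, in addition to the usual references and probes.
def _example_run_score_norm(my_pipeline, my_database):  # pragma: no cover
    execute_pipeline_score_norm(
        pipeline=my_pipeline,
        database=my_database,
        dask_client=None,
        groups=["dev"],
        output="./results_znorm",
        write_metadata_scores=True,
        checkpoint=True,
        dask_partition_size=None,
        dask_n_workers=None,
        checkpoint_dir="./checkpoints",
        top_norm=False,
        top_norm_score_fraction=0.8,
        score_normalization_type="znorm",  # or "tnorm"
        force=False,
    )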