Source code for bob.bio.base.pipelines.vanilla_biometrics.wrappers

from bob.pipelines import (
    DelayedSample,
    SampleSet,
    Sample,
    DelayedSampleSet,
    DelayedSampleSetCached,
)
import bob.io.base
import os
import dask
import functools
from .score_writers import FourColumnsScoreWriter
from .abstract_classes import BioAlgorithm
import bob.pipelines
import numpy as np
import h5py
from .zt_norm import ZTNormPipeline, ZTNormDaskWrapper
from .legacy import BioAlgorithmLegacy
from bob.bio.base.transformers import (
    PreprocessorTransformer,
    ExtractorTransformer,
    AlgorithmTransformer,
)
from bob.pipelines.wrappers import SampleWrapper, CheckpointWrapper
from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster
import logging
from bob.pipelines.utils import isinstance_nested
import gc
import time
from . import pickle_compress, uncompress_unpickle

logger = logging.getLogger(__name__)


class BioAlgorithmCheckpointWrapper(BioAlgorithm):
    """Wrapper used to checkpoint enrolled and Scoring samples.

    Parameters
    ----------
        biometric_algorithm: :any:`bob.bio.base.pipelines.vanilla_biometrics.BioAlgorithm`
           An implemented :any:`bob.bio.base.pipelines.vanilla_biometrics.BioAlgorithm`
    
        base_dir: str
           Path to store biometric references and scores
        
        group: str
           If set, biometric references and scores are stored under a ``group``
           sub-directory of ``base_dir``

        force: bool
           If True, scores and biometric references are recomputed even if a
           checkpoint file already exists

        hash_fn
           Pointer to a hash function. This hash function maps
           `sample.key` to a hash code and this hash code corresponds to a relative directory
           where a single `sample` will be checkpointed.
           This is useful when it is desirable to keep the number of files per
           directory below a certain limit.


    Examples
    --------

    >>> from bob.bio.base.pipelines.vanilla_biometrics import BioAlgorithmCheckpointWrapper, Distance    
    >>> biometric_algorithm = BioAlgorithmCheckpointWrapper(Distance(), base_dir="./")
    >>> biometric_algorithm.enroll(sample) # doctest: +SKIP
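
    A ``hash_fn`` can be passed to spread checkpoint files over sub-directories.
    Any callable mapping ``sample.key`` to a relative directory name works; the
    ``hashlib``-based function below is only an illustrative choice:

    >>> import hashlib
    >>> def hash_fn(key):
    ...     return hashlib.md5(key.encode()).hexdigest()[:2]
    >>> biometric_algorithm = BioAlgorithmCheckpointWrapper(Distance(), base_dir="./", hash_fn=hash_fn)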

    """

    def __init__(
        self,
        biometric_algorithm,
        base_dir,
        group=None,
        force=False,
        hash_fn=None,
        **kwargs
    ):
        super().__init__(**kwargs)

        self.base_dir = base_dir
        self.set_score_references_path(group)

        self.biometric_algorithm = biometric_algorithm
        self.force = force
        self._biometric_reference_extension = ".hdf5"
        self._score_extension = ".pickle.gz"
        self.hash_fn = hash_fn

    def clear_caches(self):
        self.biometric_algorithm.clear_caches()

    def set_score_references_path(self, group):
        if group is None:
            self.biometric_reference_dir = os.path.join(
                self.base_dir, "biometric_references"
            )
            self.score_dir = os.path.join(self.base_dir, "scores")
        else:
            self.biometric_reference_dir = os.path.join(
                self.base_dir, group, "biometric_references"
            )
            self.score_dir = os.path.join(self.base_dir, group, "scores")

    def enroll(self, enroll_features):
        return self.biometric_algorithm.enroll(enroll_features)

    def score(self, biometric_reference, data):
        return self.biometric_algorithm.score(biometric_reference, data)

    def score_multiple_biometric_references(self, biometric_references, data):
        return self.biometric_algorithm.score_multiple_biometric_references(
            biometric_references, data
        )

    def write_biometric_reference(self, sample, path):
        return bob.io.base.save(sample.data, path, create_directories=True)

    def write_scores(self, samples, path):
        pickle_compress(path, samples)

    def _enroll_sample_set(self, sampleset):
        """Enroll a sample set with checkpointing"""

        # Amending `models` directory
        hash_dir_name = (
            self.hash_fn(str(sampleset.key)) if self.hash_fn is not None else ""
        )

        path = os.path.join(
            self.biometric_reference_dir,
            hash_dir_name,
            str(sampleset.key) + self._biometric_reference_extension,
        )

        if self.force or not os.path.exists(path):
            enrolled_sample = self.biometric_algorithm._enroll_sample_set(sampleset)

            # saving the new sample
            self.write_biometric_reference(enrolled_sample, path)

        # This seems inefficient, but it's crucial for large datasets
        delayed_enrolled_sample = DelayedSample(
            functools.partial(bob.io.base.load, path), parent=sampleset
        )

        return delayed_enrolled_sample

    def _score_sample_set(
        self,
        sampleset,
        biometric_references,
        allow_scoring_with_all_biometric_references=False,
    ):
        """Given a sampleset for probing, compute the scores and return
        a sample set with the scores.
        """

        def _load(path):
            return uncompress_unpickle(path)

        def _make_name(sampleset, biometric_references):
            # The score file name is composed of the sampleset key and the
            # first 3 biometric_references
            reference_id = str(sampleset.reference_id)
            name = str(sampleset.key)
            suffix = "_".join([str(s.key) for s in biometric_references[0:3]])
            return os.path.join(reference_id, name + suffix)

        # Amending `models` directory
        hash_dir_name = (
            self.hash_fn(str(sampleset.key)) if self.hash_fn is not None else ""
        )

        path = os.path.join(
            self.score_dir,
            hash_dir_name,
            _make_name(sampleset, biometric_references) + self._score_extension,
        )

        parent = sampleset
        if self.force or not os.path.exists(path):
            # Computing score
            scored_sample_set = self.biometric_algorithm._score_sample_set(
                sampleset,
                biometric_references,
                allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
            )
            self.write_scores(scored_sample_set.samples, path)
            parent = scored_sample_set

        scored_sample_set = DelayedSampleSetCached(
            functools.partial(_load, path), parent=parent
        )

        return scored_sample_set


class BioAlgorithmDaskWrapper(BioAlgorithm):
    """Wrap :any:`bob.bio.base.pipelines.vanilla_biometrics.BioAlgorithm` to work with DASK"""

    def __init__(self, biometric_algorithm, **kwargs):
        self.biometric_algorithm = biometric_algorithm

    def clear_caches(self):
        self.biometric_algorithm.clear_caches()

    def enroll_samples(self, biometric_reference_features):
        biometric_references = biometric_reference_features.map_partitions(
            self.biometric_algorithm.enroll_samples
        )
        return biometric_references

    def score_samples(
        self,
        probe_features,
        biometric_references,
        allow_scoring_with_all_biometric_references=False,
    ):
        # TODO: Here, we are sending all computed biometric references to all
        # probes. It would be more efficient if only the models related to each
        # probe are sent to the probing split. An option would be to use caching
        # and allow the ``score`` function above to load the required data from
        # the disk, directly. A second option would be to generate named delays
        # for each model and then associate them here.
        all_references = dask.delayed(list)(biometric_references)
        scores = probe_features.map_partitions(
            self.biometric_algorithm.score_samples,
            all_references,
            allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
        )
        return scores

    def enroll(self, data):
        return self.biometric_algorithm.enroll(data)

    def score(self, biometric_reference, data):
        return self.biometric_algorithm.score(biometric_reference, data)

    def score_multiple_biometric_references(self, biometric_references, data):
        return self.biometric_algorithm.score_multiple_biometric_references(
            biometric_references, data
        )

    def set_score_references_path(self, group):
        self.biometric_algorithm.set_score_references_path(group)


def dask_vanilla_biometrics(pipeline, npartitions=None, partition_size=None):
    """
    Given a :any:`bob.bio.base.pipelines.vanilla_biometrics.VanillaBiometricsPipeline`,
    wraps its transformer and its :any:`bob.bio.base.pipelines.vanilla_biometrics.BioAlgorithm`
    so that they are executed with dask.

    Parameters
    ----------

    pipeline: :any:`bob.bio.base.pipelines.vanilla_biometrics.VanillaBiometricsPipeline`
       Vanilla Biometrics based pipeline to be dasked

    npartitions: int
       Number of partitions for the initial `dask.bag`

    partition_size: int
       Size of the partition for the initial `dask.bag`

    """

    if isinstance(pipeline, ZTNormPipeline):
        # Dasking the first part of the pipelines
        pipeline.vanilla_biometrics_pipeline = dask_vanilla_biometrics(
            pipeline.vanilla_biometrics_pipeline,
            npartitions=npartitions,
            partition_size=partition_size,
        )
        pipeline.biometric_algorithm = (
            pipeline.vanilla_biometrics_pipeline.biometric_algorithm
        )
        pipeline.transformer = pipeline.vanilla_biometrics_pipeline.transformer

        pipeline.ztnorm_solver = ZTNormDaskWrapper(pipeline.ztnorm_solver)

    else:
        if partition_size is None:
            pipeline.transformer = bob.pipelines.wrap(
                ["dask"], pipeline.transformer, npartitions=npartitions
            )
        else:
            pipeline.transformer = bob.pipelines.wrap(
                ["dask"], pipeline.transformer, partition_size=partition_size
            )
        pipeline.biometric_algorithm = BioAlgorithmDaskWrapper(
            pipeline.biometric_algorithm
        )

        def _write_scores(scores):
            return scores.map_partitions(pipeline.write_scores_on_dask)

        pipeline.write_scores_on_dask = pipeline.write_scores
        pipeline.write_scores = _write_scores

    return pipeline


def checkpoint_vanilla_biometrics(
    pipeline, base_dir, biometric_algorithm_dir=None, hash_fn=None
):
    """
    Given a :any:`bob.bio.base.pipelines.vanilla_biometrics.VanillaBiometricsPipeline`,
    wraps its transformer and its :any:`bob.bio.base.pipelines.vanilla_biometrics.BioAlgorithm`
    so that their outputs are checkpointed.

    Parameters
    ----------

    pipeline: :any:`bob.bio.base.pipelines.vanilla_biometrics.VanillaBiometricsPipeline`
       Vanilla Biometrics based pipeline to be checkpointed

    base_dir: str
       Path to store transformed input data and possibly biometric references and scores

    biometric_algorithm_dir: str
       If set, biometric references and scores will be checkpointed to this path.
       If not, `base_dir` will be used.
       This is useful when it is desirable to keep the transformed data in one path
       and the biometric references and scores in another.

    hash_fn
       Pointer to a hash function. This hash function will map
       `sample.key` to a hash code and this hash code will be the relative directory
       where a single `sample` will be checkpointed.
       This is useful when it is desirable to keep the number of files per directory
       below a certain limit.
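
    Examples
    --------

    A minimal sketch, assuming ``pipeline`` is an already assembled
    :any:`bob.bio.base.pipelines.vanilla_biometrics.VanillaBiometricsPipeline`;
    the checkpoint directory and partition size below are only illustrative,
    and the checkpoint wrapping is usually applied before an optional
    ``dask_vanilla_biometrics`` wrapping:

    >>> pipeline = checkpoint_vanilla_biometrics(pipeline, base_dir="./checkpoints") # doctest: +SKIP
    >>> pipeline = dask_vanilla_biometrics(pipeline, partition_size=200) # doctest: +SKIP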
""" sk_pipeline = pipeline.transformer bio_ref_scores_dir = ( base_dir if biometric_algorithm_dir is None else biometric_algorithm_dir ) for i, name, estimator in sk_pipeline._iter(): wraped_estimator = bob.pipelines.wrap( ["checkpoint"], estimator, features_dir=os.path.join(base_dir, name), hash_fn=hash_fn, ) sk_pipeline.steps[i] = (name, wraped_estimator) if isinstance(pipeline.biometric_algorithm, BioAlgorithmLegacy): pipeline.biometric_algorithm.base_dir = bio_ref_scores_dir else: pipeline.biometric_algorithm = BioAlgorithmCheckpointWrapper( pipeline.biometric_algorithm, base_dir=bio_ref_scores_dir, hash_fn=hash_fn ) return pipeline def is_checkpointed(pipeline): """ Check if :any:`bob.bio.base.pipelines.vanilla_biometrics.VanillaBiometricsPipeline` is checkpointed Parameters ---------- pipeline: :any:`bob.bio.base.pipelines.vanilla_biometrics.VanillaBiometricsPipeline` Vanilla Biometrics based pipeline to be checkpointed """ # We have to check if is BioAlgorithmCheckpointWrapper OR # If it BioAlgorithmLegacy and the transformer of BioAlgorithmLegacy is also checkpointable return isinstance_nested( pipeline, "biometric_algorithm", BioAlgorithmCheckpointWrapper ) or ( isinstance_nested(pipeline, "biometric_algorithm", BioAlgorithmLegacy) and isinstance_nested(pipeline, "transformer", CheckpointWrapper) )