Source code for bob.bio.base.pipelines.vanilla_biometrics.zt_norm

"""
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

Implementation of a pipeline and an algorithm that 
computes Z, T and ZT Score Normalization of a :any:`bob.bio.base.pipelines.vanilla_biometrics.BioAlgorithm`
"""

from bob.pipelines import (
    DelayedSample,
    Sample,
    SampleSet,
    DelayedSampleSet,
    DelayedSampleSetCached,
)


import numpy as np
import dask
import functools
import cloudpickle
import os
from .score_writers import FourColumnsScoreWriter
import copy
import logging
from .pipelines import check_valid_pipeline
from . import pickle_compress, uncompress_unpickle

logger = logging.getLogger(__name__)


class ZTNormPipeline(object):
    """
    Apply Z, T or ZT Score normalization on top of VanillaBiometric Pipeline

    Reference bibliography from: A Generative Model for Score Normalization in Speaker Recognition
    https://arxiv.org/pdf/1709.09868.pdf


    Example
    -------
       >>> from bob.pipelines.transformers import Linearize
       >>> from sklearn.pipeline import make_pipeline
       >>> from bob.bio.base.pipelines.vanilla_biometrics import Distance, VanillaBiometricsPipeline, ZTNormPipeline
       >>> estimator_1 = Linearize()
       >>> transformer = make_pipeline(estimator_1)
       >>> biometric_algorithm = Distance()
       >>> vanilla_biometrics_pipeline = VanillaBiometricsPipeline(transformer, biometric_algorithm)
       >>> zt_pipeline = ZTNormPipeline(vanilla_biometrics_pipeline)
       >>> zt_pipeline(...) #doctest: +SKIP

    Parameters
    ----------

        vanilla_biometrics_pipeline: :any:`VanillaBiometricsPipeline`
          An instance :any:`VanillaBiometricsPipeline` to the wrapped with score normalization

        z_norm: bool
          If True, applies ZScore normalization on top of raw scores.

        t_norm: bool
          If True, applies TScore normalization on top of raw scores.
          If both, z_norm and t_norm are true, it applies score normalization

        score_writer: 

        adaptive_score_fraction: float
           Set the proportion of the impostor scores used to compute :math:`\mu` and :math:`\std` for the T normalization
           This is also called as adaptative T-Norm (https://ieeexplore.ieee.org/document/1415220) or 
           Top-Norm (https://ieeexplore.ieee.org/document/4013533)

        adaptive_score_descending_sort bool
            It true, during the Top-norm statistics computations, sort the scores in descending order


    """

    def __init__(
        self,
        vanilla_biometrics_pipeline,
        z_norm=True,
        t_norm=True,
        score_writer=FourColumnsScoreWriter("./scores.txt"),
        adaptive_score_fraction=1.0,
        adaptive_score_descending_sort=True,
    ):
        self.vanilla_biometrics_pipeline = vanilla_biometrics_pipeline
        self.biometric_algorithm = self.vanilla_biometrics_pipeline.biometric_algorithm
        self.transformer = self.vanilla_biometrics_pipeline.transformer

        self.ztnorm_solver = ZTNorm(
            adaptive_score_fraction, adaptive_score_descending_sort
        )

        self.z_norm = z_norm
        self.t_norm = t_norm
        self.score_writer = score_writer

        if not z_norm and not t_norm:
            raise ValueError(
                "Both z_norm and t_norm are False. No normalization will be applied"
            )
        check_valid_pipeline(self)

    def __call__(
        self,
        background_model_samples,
        biometric_reference_samples,
        probe_samples,
        zprobe_samples=None,
        t_biometric_reference_samples=None,
        allow_scoring_with_all_biometric_references=False,
    ):

        self.transformer = self.train_background_model(background_model_samples)

        # Create biometric samples
        biometric_references = self.create_biometric_reference(
            biometric_reference_samples
        )

        raw_scores, probe_features = self.compute_scores(
            probe_samples,
            biometric_references,
            allow_scoring_with_all_biometric_references,
        )

        # Z NORM
        if self.z_norm:
            if zprobe_samples is None:
                raise ValueError("No samples for `z_norm` was provided")
            z_normed_scores, z_probe_features = self.compute_znorm_scores(
                zprobe_samples,
                raw_scores,
                biometric_references,
                allow_scoring_with_all_biometric_references,
            )

        if self.t_norm:
            if t_biometric_reference_samples is None:
                raise ValueError("No samples for `t_norm` was provided")
        else:
            # In case z_norm=True and t_norm=False
            return z_normed_scores

        # T NORM
        t_normed_scores, t_scores, t_biometric_references = self.compute_tnorm_scores(
            t_biometric_reference_samples,
            probe_features,
            raw_scores,
            allow_scoring_with_all_biometric_references,
        )

        if not self.z_norm:
            # In case z_norm=False and t_norm=True
            return t_normed_scores

        # ZT NORM
        zt_normed_scores = self.compute_ztnorm_scores(
            z_probe_features,
            t_biometric_references,
            z_normed_scores,
            t_scores,
            allow_scoring_with_all_biometric_references,
        )

        # S-norm
        s_normed_scores = self.compute_snorm_scores(z_normed_scores, t_normed_scores)

        return (
            raw_scores,
            z_normed_scores,
            t_normed_scores,
            zt_normed_scores,
            s_normed_scores,
        )

[docs] def train_background_model(self, background_model_samples): return self.vanilla_biometrics_pipeline.train_background_model( background_model_samples )
[docs] def create_biometric_reference(self, biometric_reference_samples): return self.vanilla_biometrics_pipeline.create_biometric_reference( biometric_reference_samples )
[docs] def compute_scores( self, probe_samples, biometric_references, allow_scoring_with_all_biometric_references=False, ): return self.vanilla_biometrics_pipeline.compute_scores( probe_samples, biometric_references, allow_scoring_with_all_biometric_references, )
[docs] def compute_znorm_scores( self, zprobe_samples, probe_scores, biometric_references, allow_scoring_with_all_biometric_references=False, ): z_scores, z_probe_features = self.compute_scores( zprobe_samples, biometric_references ) z_normed_scores = self.ztnorm_solver.compute_znorm_scores( probe_scores, z_scores, biometric_references ) return z_normed_scores, z_probe_features
[docs] def compute_tnorm_scores( self, t_biometric_reference_samples, probe_features, probe_scores, allow_scoring_with_all_biometric_references=False, ): t_biometric_references = self.create_biometric_reference( t_biometric_reference_samples ) # probe_features = self._inject_references(probe_features, t_biometric_references) # Reusing the probe features t_scores = self.vanilla_biometrics_pipeline.biometric_algorithm.score_samples( probe_features, t_biometric_references, allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references, ) t_normed_scores = self.ztnorm_solver.compute_tnorm_scores( probe_scores, t_scores, t_biometric_references, ) return t_normed_scores, t_scores, t_biometric_references
[docs] def compute_ztnorm_scores( self, z_probe_features, t_biometric_references, z_normed_scores, t_scores, allow_scoring_with_all_biometric_references=False, ): # Reusing the zprobe_features and t_biometric_references zt_scores = self.vanilla_biometrics_pipeline.biometric_algorithm.score_samples( z_probe_features, t_biometric_references, allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references, ) return self.ztnorm_solver.compute_ztnorm_score( t_scores, zt_scores, t_biometric_references, z_normed_scores )
[docs] def compute_snorm_scores(self, znormed_scores, tnormed_scores): s_normed_scores = self.ztnorm_solver.compute_snorm_scores( znormed_scores, tnormed_scores ) return s_normed_scores
[docs] def write_scores(self, scores): return self.vanilla_biometrics_pipeline.write_scores(scores)
[docs] def post_process(self, score_paths, filename): return self.vanilla_biometrics_pipeline.post_process(score_paths, filename)
class ZTNorm(object): """ Computes Z, T and ZT Score Normalization of a `:any:`bob.bio.base.pipelines.vanilla_biometrics.BioAlgorithm` Reference bibliography from: A Generative Model for Score Normalization in Speaker Recognition https://arxiv.org/pdf/1709.09868.pdf Parameters ---------- adaptive_score_fraction: float Set the proportion of the impostor scores used to compute :math:`\mu` and :math:`\std` for the T normalization This is also called as adaptative T-Norm (https://ieeexplore.ieee.org/document/1415220) or Top-Norm (https://ieeexplore.ieee.org/document/4013533) adaptive_score_descending_sort bool It true, during the Top-norm statistics computations, sort the scores in descending order """ def __init__(self, adaptive_score_fraction, adaptive_score_descending_sort): self.adaptive_score_fraction = adaptive_score_fraction self.adaptive_score_descending_sort = adaptive_score_descending_sort def _norm(self, score, mu, std): # Reference: https://gitlab.idiap.ch/bob/bob.learn.em/-/blob/master/bob/learn/em/test/test_ztnorm.py # Axis 0=ZNORM # Axi1 1=TNORM return (score - mu) / std def _compute_std(self, mu, norm_base_scores, axis=1): # Reference: https://gitlab.idiap.ch/bob/bob.learn.em/-/blob/master/bob/learn/em/test/test_ztnorm.py # Axis 0=ZNORM # Axi1 1=TNORM if axis == 1: return np.sqrt( np.sum( ( norm_base_scores - np.tile( mu.reshape(norm_base_scores.shape[0], 1), (1, norm_base_scores.shape[1]), ) ) ** 2, axis=1, ) / (norm_base_scores.shape[1] - 1) ) else: return np.sqrt( np.sum( ( norm_base_scores - np.tile( mu.reshape(1, norm_base_scores.shape[1]), (norm_base_scores.shape[0], 1), ) ) ** 2, axis=0, ) / (norm_base_scores.shape[0] - 1) ) def _compute_stats(self, sampleset_for_norm, biometric_references, axis=0): """ Compute statistics for Z and T Norm. The way the scores are organized (probe vs bioref); axis=0 computes CORRECTLY the statistics for ZNorm axis=1 computes CORRECTLY the statistics for TNorm """ # Dumping all scores score_floats = np.array([s.data for sset in sampleset_for_norm for s in sset]) # Reshaping in PROBE vs BIOMETRIC_REFERENCES n_probes = len(sampleset_for_norm) n_references = len(biometric_references) score_floats = score_floats.reshape((n_probes, n_references)) # AXIS ON THE MODELS proportion = int( np.floor(score_floats.shape[axis] * self.adaptive_score_fraction) ) sorted_scores = ( -np.sort(-score_floats, axis=axis) if self.adaptive_score_descending_sort else np.sort(score_floats, axis=axis) ) if axis == 0: top_scores = sorted_scores[0:proportion, :] else: top_scores = sorted_scores[:, 0:proportion] top_scores = np.nan_to_num(top_scores) big_mu = np.mean(top_scores, axis=axis) big_std = self._compute_std(big_mu, top_scores, axis=axis) # Creating statistics structure with subject id as the key stats = {} if axis == 0: # Z-Norm is one statistic per biometric references biometric_reference_subjects = [ br.reference_id for br in sampleset_for_norm[0] ] for mu, std, s in zip(big_mu, big_std, biometric_reference_subjects): stats[s] = {"big_mu": mu, "big_std": std} else: # T-Norm is one statistic per probe for mu, std, sset in zip(big_mu, big_std, sampleset_for_norm): stats[sset.reference_id] = {"big_mu": mu, "big_std": std} return stats def _znorm_samplesets(self, probe_scores, stats, for_zt=False): # Normalizing # TODO: THIS TENDS TO BE EXTREMLY SLOW z_normed_score_samples = [] for probe_score in probe_scores: z_normed_score_samples.append(self._apply_znorm(probe_score, stats)) return z_normed_score_samples def _apply_znorm(self, probe_score, stats): z_normed_score = SampleSet([], parent=probe_score) for biometric_reference_score in probe_score: mu = stats[biometric_reference_score.reference_id]["big_mu"] std = stats[biometric_reference_score.reference_id]["big_std"] score = self._norm(biometric_reference_score.data, mu, std) new_sample = Sample(score, parent=biometric_reference_score) z_normed_score.samples.append(new_sample) return z_normed_score def _tnorm_samplesets(self, probe_scores, stats, for_zt=False): # Normalizing # TODO: THIS TENDS TO BE EXTREMLY SLOW # MAYBE THIS COULD BE DELAYED OR RUN ON TOP OF t_normed_score_samples = [] for probe_score in probe_scores: t_normed_score_samples.append(self._apply_tnorm(probe_score, stats)) return t_normed_score_samples def _apply_tnorm(self, probe_score, stats): # Normalizing t_normed_scores = SampleSet([], parent=probe_score) mu = stats[probe_score.reference_id]["big_mu"] std = stats[probe_score.reference_id]["big_std"] for biometric_reference_score in probe_score: score = self._norm(biometric_reference_score.data, mu, std) new_sample = Sample(score, parent=biometric_reference_score) t_normed_scores.samples.append(new_sample) return t_normed_scores
[docs] def compute_znorm_scores( self, probe_scores, sampleset_for_znorm, biometric_references ): """ Base Z-normalization function """ stats = self._compute_stats(sampleset_for_znorm, biometric_references, axis=0) return self._znorm_samplesets(probe_scores, stats)
[docs] def compute_tnorm_scores( self, probe_scores, sampleset_for_tnorm, t_biometric_references, allow_scoring_with_all_biometric_references=False, ): """ Base T-normalization function """ stats = self._compute_stats( sampleset_for_tnorm, t_biometric_references, axis=1, ) return self._tnorm_samplesets(probe_scores, stats)
[docs] def compute_ztnorm_score( self, t_scores, zt_scores, t_biometric_references, z_normed_scores ): # Z Normalizing the T-normed scores z_normed_t_normed = self.compute_znorm_scores( t_scores, zt_scores, t_biometric_references, ) # (Z Normalizing the T-normed scores) the Z normed scores zt_normed_scores = self.compute_tnorm_scores( z_normed_scores, z_normed_t_normed, t_biometric_references, ) return zt_normed_scores
def _snorm(self, z_score, t_score): return 0.5 * (z_score + t_score) def _snorm_samplesets(self, znormed_scores, tnormed_scores): s_normed_samplesets = [] for z, t in zip(znormed_scores, tnormed_scores): s_normed_scores = SampleSet([], parent=z) for b_z, b_t in zip(z, t): score = self._snorm(b_z.data, b_t.data) new_sample = Sample(score, parent=b_z) s_normed_scores.samples.append(new_sample) s_normed_samplesets.append(s_normed_scores) return s_normed_samplesets
[docs] def compute_snorm_scores(self, znormed_scores, tnormed_scores): return self._snorm_samplesets(znormed_scores, tnormed_scores)
class ZTNormDaskWrapper(object): """ Wrap `:any:`bob.bio.base.pipelines.vanilla_biometrics.ZTNorm` to work with DASK Parameters ---------- ztnorm: :any:`bob.bio.base.pipelines.vanilla_biometrics.ZTNormPipeline` ZTNorm Pipeline """ def __init__(self, ztnorm): self.ztnorm = ztnorm
[docs] def compute_znorm_scores( self, probe_scores, sampleset_for_znorm, biometric_references, for_zt=False ): # Reducing all the Z-Scores to compute the stats all_scores_for_znorm = dask.delayed(list)(sampleset_for_znorm) stats = dask.delayed(self.ztnorm._compute_stats)( all_scores_for_znorm, biometric_references, axis=0 ) return probe_scores.map_partitions(self.ztnorm._znorm_samplesets, stats, for_zt)
[docs] def compute_tnorm_scores( self, probe_scores, sampleset_for_tnorm, t_biometric_references, for_zt=False ): # Reducing all the Z-Scores to compute the stats all_scores_for_tnorm = dask.delayed(list)(sampleset_for_tnorm) stats = dask.delayed(self.ztnorm._compute_stats)( all_scores_for_tnorm, t_biometric_references, axis=1 ) return probe_scores.map_partitions(self.ztnorm._tnorm_samplesets, stats, for_zt)
[docs] def compute_ztnorm_score( self, t_scores, zt_scores, t_biometric_references, z_normed_scores ): # Z Normalizing the T-normed scores z_normed_t_normed = self.compute_znorm_scores( t_scores, zt_scores, t_biometric_references, for_zt=True ) # (Z Normalizing the T-normed scores) the Z normed scores zt_normed_scores = self.compute_tnorm_scores( z_normed_scores, z_normed_t_normed, t_biometric_references, for_zt=True ) return zt_normed_scores
[docs] def compute_snorm_scores(self, znormed_scores, tnormed_scores): return znormed_scores.map_partitions( self.ztnorm._snorm_samplesets, tnormed_scores )
class ZTNormCheckpointWrapper(object): """ Wrap :any:`bob.bio.base.pipelines.vanilla_biometrics.ZTNormPipeline` to work with DASK Parameters ---------- ztnorm: :any:`bob.bio.base.pipelines.vanilla_biometrics.ZTNorm` ZTNorm Pipeline """ def __init__(self, ztnorm, base_dir, force=False): if not isinstance(ztnorm, ZTNorm): raise ValueError("This class only wraps `ZTNorm` objects") self.ztnorm = ztnorm self.znorm_score_path = os.path.join(base_dir, "znorm_scores") self.tnorm_score_path = os.path.join(base_dir, "tnorm_scores") self.ztnorm_score_path = os.path.join(base_dir, "ztnorm_scores") self.snorm_score_path = os.path.join(base_dir, "snorm_scores") self.force = force self.base_dir = base_dir self._score_extension = ".pickle.gz"
[docs] def write_scores(self, samples, path): pickle_compress(path, samples)
def _load(self, path): return uncompress_unpickle(path) def _make_name(self, sampleset, biometric_references, for_zt=False): # The score file name is composed by sampleset key and the # first 3 biometric_references reference_id = str(sampleset.reference_id) name = str(sampleset.key) # suffix = "_".join([s for s in biometric_references[0:5]]) suffix = "_".join([str(s) for s in biometric_references[0:5]]) suffix += "_zt_norm" if for_zt else "" return os.path.join(reference_id, name + suffix) def _apply_znorm(self, probe_score, stats, for_zt=False): path = os.path.join( self.znorm_score_path, self._make_name(probe_score, probe_score.references, for_zt) + self._score_extension, ) if self.force or not os.path.exists(path): z_normed_score = self.ztnorm._apply_znorm(probe_score, stats) self.write_scores(z_normed_score.samples, path) z_normed_score = DelayedSampleSetCached( functools.partial(self._load, path), parent=probe_score ) return z_normed_score def _apply_tnorm(self, probe_score, stats, for_zt=False): path = os.path.join( self.tnorm_score_path, self._make_name(probe_score, probe_score.references, for_zt) + self._score_extension, ) if self.force or not os.path.exists(path): t_normed_score = self.ztnorm._apply_tnorm(probe_score, stats) self.write_scores(t_normed_score.samples, path) t_normed_score = DelayedSampleSetCached( functools.partial(self._load, path), parent=probe_score ) return t_normed_score
[docs] def compute_znorm_scores( self, probe_scores, sampleset_for_znorm, biometric_references, for_zt=False ): # return self.ztnorm.compute_znorm_scores(probe_scores, sampleset_for_znorm, biometric_references) stats = self._compute_stats(sampleset_for_znorm, biometric_references, axis=0) return self._znorm_samplesets(probe_scores, stats, for_zt)
[docs] def compute_tnorm_scores( self, probe_scores, sampleset_for_tnorm, t_biometric_references, for_zt=False ): # return self.ztnorm.compute_tnorm_scores(probe_scores, sampleset_for_tnorm, t_biometric_references) stats = self._compute_stats(sampleset_for_tnorm, t_biometric_references, axis=1) return self._tnorm_samplesets(probe_scores, stats, for_zt)
[docs] def compute_ztnorm_score( self, t_scores, zt_scores, t_biometric_references, z_normed_scores ): # Z Normalizing the T-normed scores z_normed_t_normed = self.compute_znorm_scores( t_scores, zt_scores, t_biometric_references, for_zt=True ) # (Z Normalizing the T-normed scores) the Z normed scores zt_normed_scores = self.compute_tnorm_scores( z_normed_scores, z_normed_t_normed, t_biometric_references, for_zt=True ) return zt_normed_scores
[docs] def compute_snorm_scores(self, znormed_scores, tnormed_scores): return self.ztnorm.compute_snorm_scores(znormed_scores, tnormed_scores)
def _compute_stats(self, sampleset_for_norm, biometric_references, axis=0): return self.ztnorm._compute_stats( sampleset_for_norm, biometric_references, axis=axis ) def _znorm_samplesets(self, probe_scores, stats, for_zt=False): z_normed_score_samples = [] for probe_score in probe_scores: z_normed_score_samples.append( self._apply_znorm(probe_score, stats, for_zt=for_zt) ) return z_normed_score_samples # return self.ztnorm._znorm_samplesets(probe_scores, stats) def _tnorm_samplesets(self, probe_scores, stats, for_zt=False): t_normed_score_samples = [] for probe_score in probe_scores: t_normed_score_samples.append( self._apply_tnorm(probe_score, stats, for_zt=for_zt) ) return t_normed_score_samples # return self.ztnorm._tnorm_samplesets(probe_scores, stats) def _snorm_samplesets(self, probe_scores, stats): return self.ztnorm._snorm_samplesets(probe_scores, stats)