Source code for bob.bio.gmm.algorithm.IVector

#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Laurent El Shafey <Laurent.El-Shafey@idiap.ch>

import logging

import numpy

import bob.core
import bob.io.base
import bob.learn.em
import bob.learn.linear

from bob.bio.base.algorithm import Algorithm

from .GMM import GMM

logger = logging.getLogger("bob.bio.gmm")


class IVector(GMM):
    """Tool for extracting I-Vectors"""

    def __init__(
        self,
        # IVector training
        subspace_dimension_of_t,  # T subspace dimension
        tv_training_iterations=25,  # Number of EM iterations for the JFA training
        update_sigma=True,
        use_whitening=True,
        use_lda=False,
        use_wccn=False,
        use_plda=False,
        lda_dim=None,
        lda_strip_to_rank=True,
        plda_dim_F=50,
        plda_dim_G=50,
        plda_training_iterations=50,
        # parameters of the GMM
        **kwargs
    ):
        """Initializes the local GMM tool with the given file selector object"""
        # call base class constructor with its set of parameters
        GMM.__init__(self, **kwargs)

        # call tool constructor to overwrite what was set before
        Algorithm.__init__(
            self,
            performs_projection=True,
            use_projected_features_for_enrollment=True,
            requires_enroller_training=False,  # not needed anymore because it's done while training the projector
            split_training_features_by_client=True,
            subspace_dimension_of_t=subspace_dimension_of_t,
            tv_training_iterations=tv_training_iterations,
            update_sigma=update_sigma,
            use_whitening=use_whitening,
            use_lda=use_lda,
            use_wccn=use_wccn,
            use_plda=use_plda,
            lda_dim=lda_dim,
            lda_strip_to_rank=lda_strip_to_rank,
            plda_dim_F=plda_dim_F,
            plda_dim_G=plda_dim_G,
            plda_training_iterations=plda_training_iterations,
            multiple_model_scoring=None,
            multiple_probe_scoring=None,
            **kwargs
        )

        self.update_sigma = update_sigma
        self.use_whitening = use_whitening
        self.use_lda = use_lda
        self.use_wccn = use_wccn
        self.use_plda = use_plda
        self.subspace_dimension_of_t = subspace_dimension_of_t
        self.tv_training_iterations = tv_training_iterations

        self.ivector_trainer = bob.learn.em.IVectorTrainer(update_sigma=update_sigma)
        self.whitening_trainer = bob.learn.linear.WhiteningTrainer()

        self.lda_dim = lda_dim
        self.lda_trainer = bob.learn.linear.FisherLDATrainer(
            strip_to_rank=lda_strip_to_rank
        )
        self.wccn_trainer = bob.learn.linear.WCCNTrainer()
        self.plda_trainer = bob.learn.em.PLDATrainer()
        self.plda_dim_F = plda_dim_F
        self.plda_dim_G = plda_dim_G
        self.plda_training_iterations = plda_training_iterations

    def _check_ivector(self, feature):
        """Checks that the features are appropriate"""
        if (
            not isinstance(feature, numpy.ndarray)
            or feature.ndim != 1
            or feature.dtype != numpy.float64
        ):
            raise ValueError("The given feature is not appropriate")

[docs] def train_ivector(self, training_stats): logger.info(" -> Training IVector enroller") self.tv = bob.learn.em.IVectorMachine( self.ubm, self.subspace_dimension_of_t, self.variance_threshold ) # Reseting the pseudo random number generator so we can have the same initialization for serial and parallel execution. self.rng = bob.core.random.mt19937(self.init_seed) # train IVector model bob.learn.em.train( self.ivector_trainer, self.tv, training_stats, self.tv_training_iterations, rng=self.rng, )
[docs] def train_whitener(self, training_features): logger.info(" -> Training Whitening") ivectors_matrix = numpy.vstack(training_features) # create a Linear Machine self.whitener = bob.learn.linear.Machine( ivectors_matrix.shape[1], ivectors_matrix.shape[1] ) # create the whitening trainer self.whitening_trainer.train(ivectors_matrix, self.whitener)
[docs] def train_lda(self, training_features): logger.info(" -> Training LDA projector") self.lda, __eig_vals = self.lda_trainer.train(training_features) # resize the machine if desired # You can only clip if the rank is higher than LDA_DIM if self.lda_dim is not None: if len(__eig_vals) < self.lda_dim: logger.warning( " -> You are resizing the LDA matrix to a value above its rank" "(from {0} to {1}). Be aware that this may lead you to imprecise eigenvectors.".format( len(__eig_vals), self.lda_dim ) ) self.lda.resize(self.lda.shape[0], self.lda_dim)
[docs] def train_wccn(self, training_features): logger.info(" -> Training WCCN projector") self.wccn = self.wccn_trainer.train(training_features)
[docs] def train_plda(self, training_features): logger.info(" -> Training PLDA projector") self.plda_trainer.init_f_method = "BETWEEN_SCATTER" self.plda_trainer.init_g_method = "WITHIN_SCATTER" self.plda_trainer.init_sigma_method = "VARIANCE_DATA" variance_flooring = 1e-5 training_features = [numpy.vstack(client) for client in training_features] input_dim = training_features[0].shape[1] # Reseting the pseudo random number generator so we can have the same initialization for serial and parallel execution. self.rng = bob.core.random.mt19937(self.init_seed) self.plda_base = bob.learn.em.PLDABase( input_dim, self.plda_dim_F, self.plda_dim_G, variance_flooring ) bob.learn.em.train( self.plda_trainer, self.plda_base, training_features, self.plda_training_iterations, rng=self.rng, )
[docs] def train_projector(self, train_features, projector_file): """Train Projector and Enroller at the same time""" [ self._check_feature(feature) for client in train_features for feature in client ] # train UBM data = numpy.vstack(feature for client in train_features for feature in client) self.train_ubm(data) del data # project training data logger.info(" -> Projecting training data") train_gmm_stats = [ [self.project_ubm(feature) for feature in client] for client in train_features ] train_gmm_stats_flatten = [ stats for client in train_gmm_stats for stats in client ] # train IVector logger.info(" -> Projecting training data") self.train_ivector(train_gmm_stats_flatten) # project training i-vectors train_ivectors = [ [self.project_ivector(stats) for stats in client] for client in train_gmm_stats ] train_ivectors_flatten = [ stats for client in train_ivectors for stats in client ] if self.use_whitening: # Train Whitening self.train_whitener(train_ivectors_flatten) # whitening and length-normalizing i-vectors train_ivectors = [ [self.project_whitening(ivec) for ivec in client] for client in train_ivectors ] if self.use_lda: self.train_lda(train_ivectors) train_ivectors = [ [self.project_lda(ivec) for ivec in client] for client in train_ivectors ] if self.use_wccn: self.train_wccn(train_ivectors) train_ivectors = [ [self.project_wccn(ivec) for ivec in client] for client in train_ivectors ] if self.use_plda: self.train_plda(train_ivectors) # save self.save_projector(projector_file)
[docs] def save_projector(self, projector_file): # Save the IVector base AND the UBM AND the whitening into the same file hdf5file = bob.io.base.HDF5File(projector_file, "w") hdf5file.create_group("Projector") hdf5file.cd("Projector") self.save_ubm(hdf5file) hdf5file.cd("/") hdf5file.create_group("Enroller") hdf5file.cd("Enroller") self.tv.save(hdf5file) if self.use_whitening: hdf5file.cd("/") hdf5file.create_group("Whitener") hdf5file.cd("Whitener") self.whitener.save(hdf5file) if self.use_lda: hdf5file.cd("/") hdf5file.create_group("LDA") hdf5file.cd("LDA") self.lda.save(hdf5file) if self.use_wccn: hdf5file.cd("/") hdf5file.create_group("WCCN") hdf5file.cd("WCCN") self.wccn.save(hdf5file) if self.use_plda: hdf5file.cd("/") hdf5file.create_group("PLDA") hdf5file.cd("PLDA") self.plda_base.save(hdf5file)
[docs] def load_tv(self, tv_file): hdf5file = bob.io.base.HDF5File(tv_file) self.tv = bob.learn.em.IVectorMachine(hdf5file) # add UBM model from base class self.tv.ubm = self.ubm
[docs] def load_whitener(self, whitening_file): hdf5file = bob.io.base.HDF5File(whitening_file) self.whitener = bob.learn.linear.Machine(hdf5file)
[docs] def load_lda(self, lda_file): hdf5file = bob.io.base.HDF5File(lda_file) self.lda = bob.learn.linear.Machine(hdf5file)
[docs] def load_wccn(self, wccn_file): hdf5file = bob.io.base.HDF5File(wccn_file) self.wccn = bob.learn.linear.Machine(hdf5file)
[docs] def load_plda(self, plda_file): hdf5file = bob.io.base.HDF5File(plda_file) self.plda_base = bob.learn.em.PLDABase(hdf5file) self.plda_machine = bob.learn.em.PLDAMachine(self.plda_base)
[docs] def load_projector(self, projector_file): """Load the GMM and the ISV model from the same HDF5 file""" hdf5file = bob.io.base.HDF5File(projector_file) # Load Projector hdf5file.cd("/Projector") self.load_ubm(hdf5file) # Load Enroller hdf5file.cd("/Enroller") self.load_tv(hdf5file) if self.use_whitening: # Load Whitening hdf5file.cd("/Whitener") self.load_whitener(hdf5file) if self.use_lda: # Load LDA hdf5file.cd("/LDA") self.load_lda(hdf5file) if self.use_wccn: # Load WCCN hdf5file.cd("/WCCN") self.load_wccn(hdf5file) if self.use_plda: # Load PLDA hdf5file.cd("/PLDA") self.load_plda(hdf5file)
[docs] def project_ivector(self, gmm_stats): return self.tv.project(gmm_stats)
[docs] def project_whitening(self, ivector): whitened = self.whitener.forward(ivector) return whitened / numpy.linalg.norm(whitened)
[docs] def project_lda(self, ivector): out_ivector = numpy.ndarray(self.lda.shape[1], numpy.float64) self.lda(ivector, out_ivector) return out_ivector
[docs] def project_wccn(self, ivector): out_ivector = numpy.ndarray(self.wccn.shape[1], numpy.float64) self.wccn(ivector, out_ivector) return out_ivector
####################################################### # IVector projection #
[docs] def project(self, feature_array): """Computes GMM statistics against a UBM, then corresponding Ux vector""" self._check_feature(feature_array) # project UBM projected_ubm = self.project_ubm(feature_array) # project I-Vector ivector = self.project_ivector(projected_ubm) # whiten I-Vector if self.use_whitening: ivector = self.project_whitening(ivector) # LDA projection if self.use_lda: ivector = self.project_lda(ivector) # WCCN projection if self.use_wccn: ivector = self.project_wccn(ivector) return ivector
####################################################### # Read / Write I-Vectors #
[docs] def write_feature(self, data, feature_file): """Saves the feature, which is the (whitened) I-Vector.""" bob.bio.base.save(data, feature_file)
[docs] def read_feature(self, feature_file): """Read the type of features that we require, namely i-vectors (stored as simple numpy arrays)""" return bob.bio.base.load(feature_file)
####################################################### # Model Enrollment #
[docs] def enroll(self, enroll_features): """Performs IVector enrollment""" [self._check_ivector(feature) for feature in enroll_features] average_ivector = numpy.mean(numpy.vstack(enroll_features), axis=0) if self.use_plda: average_ivector = average_ivector.reshape(1, -1) self.plda_trainer.enroll(self.plda_machine, average_ivector) return self.plda_machine else: return average_ivector
###################################################### # Feature comparison #
[docs] def read_model(self, model_file): """Reads the whitened i-vector that holds the model""" if self.use_plda: return bob.learn.em.PLDAMachine( bob.io.base.HDF5File(str(model_file)), self.plda_base ) else: return bob.bio.base.load(model_file)
[docs] def score(self, model, probe): """Computes the score for the given model and the given probe.""" self._check_ivector(probe) if self.use_plda: return model.log_likelihood_ratio(probe) else: self._check_ivector(model) return numpy.dot( model / numpy.linalg.norm(model), probe / numpy.linalg.norm(probe) )
[docs] def score_for_multiple_probes(self, model, probes): """This function computes the score between the given model and several given probe files.""" probe = numpy.mean(numpy.vstack(probes), axis=0) return self.score(model, probe)