# Source code for bob.pad.base.algorithm.GMM

#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# @author: Pavel Korshunov <pavel.korshunov@idiap.ch>
# @date: Wed 19 Oct 23:43:22 2016


import logging
import numpy

import bob.io.base
import bob.learn.linear
import bob.learn.em
from bob.bio.video import FrameContainer
from . import Algorithm
from ..utils import (
    convert_and_prepare_features,
)


logger = logging.getLogger(__name__)


class GMM(Algorithm):
    """Trains two GMMs for two classes of PAD and calculates log likelihood ratio during
    evaluation.

    One GMM is trained on the real samples and one on the attack samples.
    :py:meth:`project` returns the log likelihood of a feature under each of the
    two models and :py:meth:`score` returns their difference (real minus attack).

    **Parameters:**

    number_of_gaussians : int
      Number of Gaussians of each of the two GMMs.

    kmeans_training_iterations : int
      Maximum number of iterations for K-Means.

    gmm_training_iterations : int
      Maximum number of iterations for ML GMM training.

    training_threshold : float
      Threshold to end the ML training (also passed to the K-Means training).

    variance_threshold : float
      Minimum value that a variance can reach.

    update_weights, update_means, update_variances : bool
      Forwarded to :py:class:`bob.learn.em.ML_GMMTrainer`.

    responsibility_threshold : float
      If set, the weight of a particular Gaussian will at least be greater than
      this threshold. In the case the real weight is lower, the prior mean value
      will be used to estimate the current mean and variance.

    INIT_SEED : int
      Seed of the random generator used during training.

    performs_projection, requires_projector_training, kwargs
      Forwarded to the constructor of the base class.
    """

    def __init__(
        self,
        # parameters for the GMM
        number_of_gaussians,
        # parameters of UBM training
        kmeans_training_iterations=25,  # Maximum number of iterations for K-Means
        gmm_training_iterations=10,  # Maximum number of iterations for ML GMM Training
        training_threshold=5e-4,  # Threshold to end the ML training
        variance_threshold=5e-4,  # Minimum value that a variance can reach
        update_weights=True,
        update_means=True,
        update_variances=True,
        responsibility_threshold=0,
        # If set, the weight of a particular Gaussian will at least be greater than this
        # threshold. In the case the real weight is lower, the prior mean value will be
        # used to estimate the current mean and variance.
        INIT_SEED=5489,
        performs_projection=True,
        requires_projector_training=True,
        **kwargs,
    ):
        super().__init__(
            performs_projection=performs_projection,
            requires_projector_training=requires_projector_training,
            **kwargs,
        )
        # record the GMM-specific parameters next to the ones kept by the base class
        self._kwargs.update(
            dict(
                number_of_gaussians=number_of_gaussians,
                kmeans_training_iterations=kmeans_training_iterations,
                gmm_training_iterations=gmm_training_iterations,
                training_threshold=training_threshold,
                variance_threshold=variance_threshold,
                update_weights=update_weights,
                update_means=update_means,
                update_variances=update_variances,
                responsibility_threshold=responsibility_threshold,
                INIT_SEED=INIT_SEED,
            )
        )

        # copy parameters
        self.gaussians = number_of_gaussians
        self.kmeans_training_iterations = kmeans_training_iterations
        self.gmm_training_iterations = gmm_training_iterations
        self.training_threshold = training_threshold
        self.variance_threshold = variance_threshold
        self.update_weights = update_weights
        self.update_means = update_means
        self.update_variances = update_variances
        self.responsibility_threshold = responsibility_threshold
        self.init_seed = INIT_SEED
        # NOTE(review): ``bob.core`` is used here but is not among the imports
        # visible at the top of this module; presumably it gets loaded as a side
        # effect of importing ``bob.learn.em`` -- confirm.
        self.rng = bob.core.random.mt19937(self.init_seed)

        # the two machines only exist after training or loading a projector
        self.gmm_machine_real = None
        self.gmm_machine_attack = None

        self.kmeans_trainer = bob.learn.em.KMeansTrainer()
        self.gmm_trainer = bob.learn.em.ML_GMMTrainer(
            self.update_means,
            self.update_variances,
            self.update_weights,
            self.responsibility_threshold,
        )

    def _check_feature(self, feature, machine=None, projected=False):
        """Checks that the features are appropriate.

        **Parameters:**

        feature : 2D :py:class:`numpy.ndarray`
          The features to check; must be a 2D array of ``float64`` values.

        machine, projected
          Accepted but not used by this implementation.

        **Returns:**

        ``True`` when all checks pass.

        **Raises:**

        ValueError
          If ``feature`` is not a 2D ``float64`` array, or if its second
          dimension differs from the one of an already available GMM.
        """
        if (
            not isinstance(feature, numpy.ndarray)
            or feature.ndim != 2
            or feature.dtype != numpy.float64
        ):
            raise ValueError("The given feature is not appropriate", feature)

        # machines that were not trained or loaded yet (``None``) are skipped;
        # the real machine is compared first, then the attack machine
        for gmm in (self.gmm_machine_real, self.gmm_machine_attack):
            if gmm is not None and feature.shape[1] != gmm.shape[1]:
                raise ValueError(
                    "The given feature is expected to have %d elements, but it has %d"
                    % (gmm.shape[1], feature.shape[1])
                )
        return True

    #######################################################
    #                GMM training                         #

    def train_gmm(self, array):
        """Trains one GMM on the given array of feature vectors.

        The GMM is initialized from a K-Means clustering of ``array`` and then
        refined with the ML GMM trainer.

        **Parameters:**

        array : 2D :py:class:`numpy.ndarray`
          The training data; ``array.shape[0]`` is used as the number of feature
          vectors and ``array.shape[1]`` as the input size of the machines.

        **Returns:**

        gmm_machine : :py:class:`bob.learn.em.GMMMachine`
          The trained GMM.
        """
        logger.debug(" .... Training with %d feature vectors", array.shape[0])

        # Computes input size
        input_size = array.shape[1]

        # Creates the machines (KMeans and GMM)
        logger.debug(" .... Creating machines")
        kmeans_machine = bob.learn.em.KMeansMachine(self.gaussians, input_size)
        gmm_machine = bob.learn.em.GMMMachine(self.gaussians, input_size)

        # initialize the random generator with our one single seed, which allows
        # us to reproduce experiments
        logger.info(" -> Init random generator with seed %d", self.init_seed)
        self.rng = bob.core.random.mt19937(self.init_seed)

        # Trains using the KMeansTrainer
        logger.info(" -> Training K-Means")
        bob.learn.em.train(
            self.kmeans_trainer,
            kmeans_machine,
            array,
            self.kmeans_training_iterations,
            self.training_threshold,
            self.rng,
        )

        variances, weights = kmeans_machine.get_variances_and_weights_for_each_cluster(
            array
        )
        means = kmeans_machine.means

        # Initializes the GMM
        gmm_machine.means = means
        gmm_machine.variances = variances
        gmm_machine.weights = weights
        gmm_machine.set_variance_thresholds(self.variance_threshold)

        # Trains the GMM
        logger.info(" -> Training GMM")
        bob.learn.em.train(
            self.gmm_trainer,
            gmm_machine,
            array,
            self.gmm_training_iterations,
            self.training_threshold,
            self.rng,
        )

        return gmm_machine

    def save_gmms(self, projector_file):
        """Save projector to file

        **Parameters:**

        projector_file : str or :py:class:`bob.io.base.HDF5File`
          Either an already opened HDF5 file, which is written to and left open
          for the caller, or a file name, which is opened for writing and
          closed again once both GMMs are saved.
        """
        # Saves the trained GMMs to file
        logger.debug(" .... Saving GMM models to file '%s'", projector_file)

        if isinstance(projector_file, bob.io.base.HDF5File):
            # the handle belongs to the caller, so it is not closed here
            self._write_gmms(projector_file)
            return

        # the file is opened here, hence it is also closed here
        with bob.io.base.HDF5File(projector_file, "w") as hdf5:
            self._write_gmms(hdf5)

    def _write_gmms(self, hdf5):
        """Writes both GMMs into the groups ``GMMReal`` and ``GMMAttack`` of ``hdf5``."""
        hdf5.create_group("GMMReal")
        hdf5.cd("GMMReal")
        self.gmm_machine_real.save(hdf5)

        # go back to the root before creating the second group
        hdf5.cd("/")
        hdf5.create_group("GMMAttack")
        hdf5.cd("GMMAttack")
        self.gmm_machine_attack.save(hdf5)

    def train_projector(self, training_features, projector_file):
        """Trains the real and the attack GMM and saves them to ``projector_file``.

        **Parameters:**

        training_features : list
          A list with exactly two elements: the real features, followed by the
          attack features. The list is modified in place: its second element
          (the attack features) is removed as soon as it has been converted.

        projector_file : str or :py:class:`bob.io.base.HDF5File`
          Where the trained GMMs are saved, see :py:meth:`save_gmms`.

        **Raises:**

        ValueError
          If ``training_features`` does not have exactly two elements, or if the
          converted features are rejected by :py:meth:`_check_feature`.
        """
        if len(training_features) != 2:
            raise ValueError(
                "Training projector: features should contain two lists: real and attack!"
            )

        logger.info(
            " - Training: number of real features %d", len(training_features[0])
        )
        logger.info(
            " - Training: number of attack features %d", len(training_features[1])
        )

        # the original lists are dropped right after conversion -- presumably to
        # keep the peak memory usage low
        attack_features = convert_and_prepare_features(
            training_features[1], dtype="float64"
        )
        del training_features[1]
        real_features = convert_and_prepare_features(
            training_features[0], dtype="float64"
        )
        del training_features

        # Validate each array as a whole: ``_check_feature`` expects the same 2D
        # layout that ``train_gmm`` consumes, while the single rows of such an
        # array have one dimension less and would always be rejected.
        self._check_feature(real_features)
        self._check_feature(attack_features)

        logger.debug(
            "GMM:train_projector(), real_features shape: %s", real_features.shape
        )
        logger.debug(
            "GMM:train_projector(), attack_features shape: %s", attack_features.shape
        )
        logger.debug("Min real %g", numpy.min(real_features))
        logger.debug("Max real %g", numpy.max(real_features))
        logger.debug("Min attack %g", numpy.min(attack_features))
        logger.debug("Max attack %g", numpy.max(attack_features))

        logger.info("Training the GMM for real samples")
        self.gmm_machine_real = self.train_gmm(real_features)

        logger.info("Training the GMM for attack samples")
        self.gmm_machine_attack = self.train_gmm(attack_features)

        logger.info("Saving the GMMs")
        self.save_gmms(projector_file)

    def load_projector(self, projector_file):
        """Loads the real and the attack GMM from ``projector_file``.

        The machines are read from the groups ``/GMMReal`` and ``/GMMAttack``
        and the variance threshold of this algorithm is applied to both.

        **Parameters:**

        projector_file : str
          The HDF5 file to read the GMMs from.
        """
        with bob.io.base.HDF5File(projector_file) as hdf5file:
            # read GMM for real data
            hdf5file.cd("/GMMReal")
            self.gmm_machine_real = bob.learn.em.GMMMachine(hdf5file)
            # read GMM for attack data
            hdf5file.cd("/GMMAttack")
            self.gmm_machine_attack = bob.learn.em.GMMMachine(hdf5file)

        self.gmm_machine_real.set_variance_thresholds(self.variance_threshold)
        self.gmm_machine_attack.set_variance_thresholds(self.variance_threshold)

    def project(self, feature):
        """project(feature) -> projected

        Projects the given feature into GMM space.

        **Parameters:**

        feature : 2D :py:class:`numpy.ndarray` or ``FrameContainer``
          The features to be projected. A ``FrameContainer`` is converted with
          ``as_array()`` first; the result is converted to a ``float64`` array,
          which must be 2D.

        **Returns:**

        projected : 1D :py:class:`numpy.ndarray`
          Two values: the output of the real GMM and the output of the attack
          GMM for ``feature``, in this order.

        **Raises:**

        ValueError
          If the converted feature is rejected by :py:meth:`_check_feature`.
        """
        if isinstance(feature, FrameContainer):
            feature = feature.as_array()

        feature = numpy.asarray(feature, dtype=numpy.float64)
        self._check_feature(feature)

        # lazy %-arguments, like every other logging call of this module
        logger.debug(" .... Projecting %d features vector", feature.shape[0])

        # return the resulting log likelihoods
        return numpy.asarray(
            [self.gmm_machine_real(feature), self.gmm_machine_attack(feature)],
            dtype=numpy.float64,
        )

    def score(self, toscore):
        """Returns the difference between log likelihoods of being real or attack

        **Parameters:**

        toscore : sequence
          The projected feature: element 0 belongs to the real model, element 1
          to the attack model, see :py:meth:`project`.

        **Returns:**

        score : list
          A list with the single value ``toscore[0] - toscore[1]``.
        """
        return [toscore[0] - toscore[1]]

    def score_for_multiple_projections(self, toscore):
        """Returns the difference between log likelihoods of being real or attack

        Simply delegates to :py:meth:`score`.
        """
        return self.score(toscore)