#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Laurent El Shafey <Laurent.El-Shafey@idiap.ch>
import bob.core
import bob.io.base
import bob.learn.linear
import bob.learn.em
import numpy
from .Algorithm import Algorithm
import logging
logger = logging.getLogger("bob.bio.base")
class PLDA (Algorithm):
"""Tool chain for computing PLDA (over PCA-dimensionality reduced) features
.. todo:: Add more documentation for the PLDA constructor, i.e., by explaining the parameters
"""
def __init__(
self,
subspace_dimension_of_f, # Size of subspace F
subspace_dimension_of_g, # Size of subspace G
subspace_dimension_pca = None, # if given, perform PCA on data and reduce the PCA subspace to the given dimension
plda_training_iterations = 200, # Maximum number of iterations for the EM loop
# TODO: refactor the remaining parameters!
INIT_SEED = 5489, # seed for initializing
INIT_F_METHOD = 'BETWEEN_SCATTER',
INIT_G_METHOD = 'WITHIN_SCATTER',
INIT_S_METHOD = 'VARIANCE_DATA',
multiple_probe_scoring = 'joint_likelihood'
):
"""Initializes the local (PCA-)PLDA tool chain with the given file selector object"""
# call base class constructor and register that this class requires training for enrollment
super(PLDA, self).__init__(
requires_enroller_training = True,
subspace_dimension_of_f = subspace_dimension_of_f, # Size of subspace F
subspace_dimension_of_g = subspace_dimension_of_g, # Size of subspace G
subspace_dimension_pca = subspace_dimension_pca, # if given, perform PCA on data and reduce the PCA subspace to the given dimension
plda_training_iterations = plda_training_iterations, # Maximum number of iterations for the EM loop
# TODO: refactor the remaining parameters!
INIT_SEED = INIT_SEED, # seed for initializing
INIT_F_METHOD = str(INIT_F_METHOD),
INIT_G_METHOD = str(INIT_G_METHOD),
INIT_S_METHOD = str(INIT_S_METHOD),
multiple_probe_scoring = multiple_probe_scoring,
multiple_model_scoring = None
)
self.subspace_dimension_of_f = subspace_dimension_of_f
self.subspace_dimension_of_g = subspace_dimension_of_g
self.subspace_dimension_pca = subspace_dimension_pca
self.plda_training_iterations = plda_training_iterations
self.score_set = {'joint_likelihood': 'joint_likelihood', 'average':numpy.average, 'min':min, 'max':max}[multiple_probe_scoring]
# TODO: refactor
self.plda_trainer = bob.learn.em.PLDATrainer()
self.plda_trainer.init_f_method = INIT_F_METHOD
self.plda_trainer.init_g_method = INIT_G_METHOD
self.plda_trainer.init_sigma_method = INIT_S_METHOD
self.rng = bob.core.random.mt19937(INIT_SEED)
self.pca_machine = None
self.plda_base = None
def _train_pca(self, training_set):
"""Trains and returns a LinearMachine that is trained using PCA"""
data = numpy.vstack(feature for feature in training_set)
logger.info(" -> Training LinearMachine using PCA ")
trainer = bob.learn.linear.PCATrainer()
machine, eigen_values = trainer.train(data)
if isinstance(self.subspace_dimension_pca, float):
cummulated = numpy.cumsum(eigen_values) / numpy.sum(eigen_values)
for index in range(len(cummulated)):
if cummulated[index] > self.subspace_dimension_pca:
self.subspace_dimension_pca = index
break
self.subspace_dimension_pca = index
# limit number of pcs
logger.info(" -> limiting PCA subspace to %d dimensions", self.subspace_dimension_pca)
machine.resize(machine.shape[0], self.subspace_dimension_pca)
return machine
def _perform_pca(self, training_set):
"""Perform PCA on data"""
return [self.pca_machine(client) for client in training_set]
def _arrange_data(self, training_files):
"""Arranges the data to train the PLDA """
data = []
for client_files in training_files:
# at least two files per client are required!
if len(client_files) < 2:
logger.warn("Skipping one client since the number of client files is only %d", len(client_files))
continue
data.append(numpy.vstack(feature.flatten() for feature in client_files))
# Returns the list of lists of arrays
return data
[docs] def train_enroller(self, training_features, projector_file):
"""Generates the PLDA base model from a list of arrays (one per identity),
and a set of training parameters. If PCA is requested, it is trained on the same data.
Both the trained PLDABase and the PCA machine are written."""
# arrange PLDA training data
training_features = self._arrange_data(training_features)
# train PCA and perform PCA on training data
if self.subspace_dimension_pca is not None:
self.pca_machine = self._train_pca(training_features)
training_features = self._perform_pca(training_features)
input_dimension = training_features[0].shape[1]
logger.info(" -> Training PLDA base machine")
# train machine
self.plda_base = bob.learn.em.PLDABase(input_dimension, self.subspace_dimension_of_f, self.subspace_dimension_of_g)
bob.learn.em.train(self.plda_trainer, self.plda_base, training_features, self.plda_training_iterations, rng=self.rng)
# write machines to file
proj_hdf5file = bob.io.base.HDF5File(str(projector_file), "w")
if self.subspace_dimension_pca is not None:
proj_hdf5file.create_group('/pca')
proj_hdf5file.cd('/pca')
self.pca_machine.save(proj_hdf5file)
proj_hdf5file.create_group('/plda')
proj_hdf5file.cd('/plda')
self.plda_base.save(proj_hdf5file)
[docs] def load_enroller(self, projector_file):
"""Reads the PCA projection matrix and the PLDA model from file"""
# read enroller (PCA and PLDA matrix)
hdf5 = bob.io.base.HDF5File(projector_file)
if hdf5.has_group("/pca"):
hdf5.cd('/pca')
self.pca_machine = bob.learn.linear.Machine(hdf5)
hdf5.cd('/plda')
self.plda_base = bob.learn.em.PLDABase(hdf5)
[docs] def enroll(self, enroll_features):
"""Enrolls the model by computing an average of the given input vectors"""
plda_machine = bob.learn.em.PLDAMachine(self.plda_base)
# project features, if enabled
if self.pca_machine is not None:
enroll_features = self._perform_pca(enroll_features)
# enroll
self.plda_trainer.enroll(plda_machine, enroll_features)
return plda_machine
[docs] def read_model(self, model_file):
"""Reads the model, which in this case is a PLDA-Machine"""
# read machine and attach base machine
plda_machine = bob.learn.em.PLDAMachine(bob.io.base.HDF5File(model_file), self.plda_base)
return plda_machine
[docs] def score(self, model, probe):
"""Computes the PLDA score for the given model and probe"""
return self.score_for_multiple_probes(model, [probe])
[docs] def score_for_multiple_probes(self, model, probes):
"""This function computes the score between the given model and several given probe files.
In this base class implementation, it computes the scores for each probe file using the 'score' method,
and fuses the scores using the fusion method specified in the constructor of this class."""
if self.pca_machine is not None:
probes = (self.pca_machine(probe) for probe in probes)
# forward
if self.score_set == 'joint_likelihood':
return model.log_likelihood_ratio(numpy.vstack(probes))
else:
return self.score_set([model.log_likelihood_ratio(probe) for probe in probes])
# re-define unused functions, just so that they do not get documented
[docs] def train_projector(*args,**kwargs): raise NotImplementedError()
[docs] def load_projector(*args,**kwargs): pass
[docs] def project(*args,**kwargs): raise NotImplementedError()
[docs] def write_feature(*args,**kwargs): raise NotImplementedError()
[docs] def read_feature(*args,**kwargs): raise NotImplementedError()