#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Laurent El Shafey <Laurent.El-Shafey@idiap.ch>
import logging
import numpy
import bob.core
import bob.io.base
import bob.learn.em
import bob.learn.linear
from bob.bio.base.algorithm import Algorithm
from .GMM import GMM
logger = logging.getLogger("bob.bio.gmm")
class IVector(GMM):
    """Tool for extracting I-Vectors.

    Trains a Total Variability (TV) subspace on top of a UBM-GMM and extracts
    i-vectors from GMM sufficient statistics.  The extracted i-vectors can
    optionally be post-processed with whitening + length normalization, LDA,
    WCCN and/or scored with PLDA.
    """

    def __init__(
        self,
        # IVector training
        subspace_dimension_of_t,  # dimensionality of the TV (T) subspace
        tv_training_iterations=25,  # number of EM iterations for the TV training
        update_sigma=True,
        use_whitening=True,
        use_lda=False,
        use_wccn=False,
        use_plda=False,
        lda_dim=None,
        lda_strip_to_rank=True,
        plda_dim_F=50,
        plda_dim_G=50,
        plda_training_iterations=50,
        # parameters of the GMM
        **kwargs
    ):
        """Initializes the local GMM tool with the given file selector object"""
        # call base class constructor with its set of parameters
        GMM.__init__(self, **kwargs)

        # call tool constructor to overwrite what was set before
        Algorithm.__init__(
            self,
            performs_projection=True,
            use_projected_features_for_enrollment=True,
            requires_enroller_training=False,  # not needed anymore because it's done while training the projector
            split_training_features_by_client=True,
            subspace_dimension_of_t=subspace_dimension_of_t,
            tv_training_iterations=tv_training_iterations,
            update_sigma=update_sigma,
            use_whitening=use_whitening,
            use_lda=use_lda,
            use_wccn=use_wccn,
            use_plda=use_plda,
            lda_dim=lda_dim,
            lda_strip_to_rank=lda_strip_to_rank,
            plda_dim_F=plda_dim_F,
            plda_dim_G=plda_dim_G,
            plda_training_iterations=plda_training_iterations,
            multiple_model_scoring=None,
            multiple_probe_scoring=None,
            **kwargs
        )

        self.update_sigma = update_sigma
        self.use_whitening = use_whitening
        self.use_lda = use_lda
        self.use_wccn = use_wccn
        self.use_plda = use_plda
        self.subspace_dimension_of_t = subspace_dimension_of_t
        self.tv_training_iterations = tv_training_iterations

        self.ivector_trainer = bob.learn.em.IVectorTrainer(update_sigma=update_sigma)
        self.whitening_trainer = bob.learn.linear.WhiteningTrainer()

        self.lda_dim = lda_dim
        self.lda_trainer = bob.learn.linear.FisherLDATrainer(
            strip_to_rank=lda_strip_to_rank
        )
        self.wccn_trainer = bob.learn.linear.WCCNTrainer()
        self.plda_trainer = bob.learn.em.PLDATrainer()
        self.plda_dim_F = plda_dim_F
        self.plda_dim_G = plda_dim_G
        self.plda_training_iterations = plda_training_iterations

    def _check_ivector(self, feature):
        """Checks that the features are appropriate: a 1D float64 numpy array.

        Raises:
            ValueError: if the feature is not a 1D ``numpy.float64`` array.
        """
        if (
            not isinstance(feature, numpy.ndarray)
            or feature.ndim != 1
            or feature.dtype != numpy.float64
        ):
            raise ValueError("The given feature is not appropriate")

    def train_ivector(self, training_stats):
        """Trains the Total Variability subspace from the given GMM statistics.

        Stores the trained ``bob.learn.em.IVectorMachine`` in ``self.tv``.
        """
        logger.info(" -> Training IVector enroller")
        self.tv = bob.learn.em.IVectorMachine(
            self.ubm, self.subspace_dimension_of_t, self.variance_threshold
        )

        # Resetting the pseudo random number generator so we can have the same
        # initialization for serial and parallel execution.
        self.rng = bob.core.random.mt19937(self.init_seed)

        # train IVector model
        bob.learn.em.train(
            self.ivector_trainer,
            self.tv,
            training_stats,
            self.tv_training_iterations,
            rng=self.rng,
        )

    def train_whitener(self, training_features):
        """Trains the whitening projector on the stacked training i-vectors."""
        logger.info(" -> Training Whitening")
        ivectors_matrix = numpy.vstack(training_features)
        # create a Linear Machine
        self.whitener = bob.learn.linear.Machine(
            ivectors_matrix.shape[1], ivectors_matrix.shape[1]
        )
        # create the whitening trainer
        self.whitening_trainer.train(ivectors_matrix, self.whitener)

    def train_lda(self, training_features):
        """Trains the LDA projector.

        ``training_features`` is a list of per-client lists of i-vectors.
        """
        logger.info(" -> Training LDA projector")
        self.lda, __eig_vals = self.lda_trainer.train(training_features)
        # resize the machine if desired
        # You can only clip if the rank is higher than LDA_DIM
        if self.lda_dim is not None:
            if len(__eig_vals) < self.lda_dim:
                logger.warning(
                    " -> You are resizing the LDA matrix to a value above its rank"
                    "(from {0} to {1}). Be aware that this may lead you to imprecise eigenvectors.".format(
                        len(__eig_vals), self.lda_dim
                    )
                )
            self.lda.resize(self.lda.shape[0], self.lda_dim)

    def train_wccn(self, training_features):
        """Trains the WCCN projector on the per-client training i-vectors."""
        logger.info(" -> Training WCCN projector")
        self.wccn = self.wccn_trainer.train(training_features)

    def train_plda(self, training_features):
        """Trains the PLDA model on per-client stacks of training i-vectors."""
        logger.info(" -> Training PLDA projector")
        self.plda_trainer.init_f_method = "BETWEEN_SCATTER"
        self.plda_trainer.init_g_method = "WITHIN_SCATTER"
        self.plda_trainer.init_sigma_method = "VARIANCE_DATA"
        variance_flooring = 1e-5
        training_features = [numpy.vstack(client) for client in training_features]
        input_dim = training_features[0].shape[1]

        # Resetting the pseudo random number generator so we can have the same
        # initialization for serial and parallel execution.
        self.rng = bob.core.random.mt19937(self.init_seed)

        self.plda_base = bob.learn.em.PLDABase(
            input_dim, self.plda_dim_F, self.plda_dim_G, variance_flooring
        )
        bob.learn.em.train(
            self.plda_trainer,
            self.plda_base,
            training_features,
            self.plda_training_iterations,
            rng=self.rng,
        )

    def train_projector(self, train_features, projector_file):
        """Train Projector and Enroller at the same time.

        ``train_features`` is a list (clients) of lists (samples per client)
        of feature arrays; the trained models are saved to ``projector_file``.
        """
        [
            self._check_feature(feature)
            for client in train_features
            for feature in client
        ]

        # train UBM
        data = numpy.vstack(feature for client in train_features for feature in client)
        self.train_ubm(data)
        del data

        # project training data
        logger.info(" -> Projecting training data")
        train_gmm_stats = [
            [self.project_ubm(feature) for feature in client]
            for client in train_features
        ]
        train_gmm_stats_flatten = [
            stats for client in train_gmm_stats for stats in client
        ]

        # train IVector (logs its own progress message)
        self.train_ivector(train_gmm_stats_flatten)

        # project training i-vectors
        train_ivectors = [
            [self.project_ivector(stats) for stats in client]
            for client in train_gmm_stats
        ]
        train_ivectors_flatten = [
            stats for client in train_ivectors for stats in client
        ]

        if self.use_whitening:
            # Train Whitening
            self.train_whitener(train_ivectors_flatten)
            # whitening and length-normalizing i-vectors
            train_ivectors = [
                [self.project_whitening(ivec) for ivec in client]
                for client in train_ivectors
            ]

        if self.use_lda:
            self.train_lda(train_ivectors)
            train_ivectors = [
                [self.project_lda(ivec) for ivec in client] for client in train_ivectors
            ]

        if self.use_wccn:
            self.train_wccn(train_ivectors)
            train_ivectors = [
                [self.project_wccn(ivec) for ivec in client]
                for client in train_ivectors
            ]

        if self.use_plda:
            self.train_plda(train_ivectors)

        # save
        self.save_projector(projector_file)

    def save_projector(self, projector_file):
        """Saves the UBM, the TV matrix and all enabled post-processors to one HDF5 file."""
        # Save the IVector base AND the UBM AND the whitening into the same file
        hdf5file = bob.io.base.HDF5File(projector_file, "w")
        hdf5file.create_group("Projector")
        hdf5file.cd("Projector")
        self.save_ubm(hdf5file)

        hdf5file.cd("/")
        hdf5file.create_group("Enroller")
        hdf5file.cd("Enroller")
        self.tv.save(hdf5file)

        if self.use_whitening:
            hdf5file.cd("/")
            hdf5file.create_group("Whitener")
            hdf5file.cd("Whitener")
            self.whitener.save(hdf5file)

        if self.use_lda:
            hdf5file.cd("/")
            hdf5file.create_group("LDA")
            hdf5file.cd("LDA")
            self.lda.save(hdf5file)

        if self.use_wccn:
            hdf5file.cd("/")
            hdf5file.create_group("WCCN")
            hdf5file.cd("WCCN")
            self.wccn.save(hdf5file)

        if self.use_plda:
            hdf5file.cd("/")
            hdf5file.create_group("PLDA")
            hdf5file.cd("PLDA")
            self.plda_base.save(hdf5file)

    def load_tv(self, tv_file):
        """Loads the TV matrix and attaches the UBM from the base class."""
        hdf5file = bob.io.base.HDF5File(tv_file)
        self.tv = bob.learn.em.IVectorMachine(hdf5file)
        # add UBM model from base class
        self.tv.ubm = self.ubm

    def load_whitener(self, whitening_file):
        """Loads the whitening machine."""
        hdf5file = bob.io.base.HDF5File(whitening_file)
        self.whitener = bob.learn.linear.Machine(hdf5file)

    def load_lda(self, lda_file):
        """Loads the LDA machine."""
        hdf5file = bob.io.base.HDF5File(lda_file)
        self.lda = bob.learn.linear.Machine(hdf5file)

    def load_wccn(self, wccn_file):
        """Loads the WCCN machine."""
        hdf5file = bob.io.base.HDF5File(wccn_file)
        self.wccn = bob.learn.linear.Machine(hdf5file)

    def load_plda(self, plda_file):
        """Loads the PLDA base model and creates an (empty) PLDA machine for it."""
        hdf5file = bob.io.base.HDF5File(plda_file)
        self.plda_base = bob.learn.em.PLDABase(hdf5file)
        self.plda_machine = bob.learn.em.PLDAMachine(self.plda_base)

    def load_projector(self, projector_file):
        """Load the GMM and the ISV model from the same HDF5 file"""
        hdf5file = bob.io.base.HDF5File(projector_file)

        # Load Projector
        hdf5file.cd("/Projector")
        self.load_ubm(hdf5file)

        # Load Enroller
        hdf5file.cd("/Enroller")
        self.load_tv(hdf5file)

        if self.use_whitening:
            # Load Whitening
            hdf5file.cd("/Whitener")
            self.load_whitener(hdf5file)

        if self.use_lda:
            # Load LDA
            hdf5file.cd("/LDA")
            self.load_lda(hdf5file)

        if self.use_wccn:
            # Load WCCN
            hdf5file.cd("/WCCN")
            self.load_wccn(hdf5file)

        if self.use_plda:
            # Load PLDA
            hdf5file.cd("/PLDA")
            self.load_plda(hdf5file)

    def project_ivector(self, gmm_stats):
        """Extracts an i-vector from the given GMM statistics."""
        return self.tv.project(gmm_stats)

    def project_whitening(self, ivector):
        """Whitens and length-normalizes the given i-vector."""
        whitened = self.whitener.forward(ivector)
        return whitened / numpy.linalg.norm(whitened)

    def project_lda(self, ivector):
        """Projects the given i-vector through the LDA machine."""
        out_ivector = numpy.ndarray(self.lda.shape[1], numpy.float64)
        self.lda(ivector, out_ivector)
        return out_ivector

    def project_wccn(self, ivector):
        """Projects the given i-vector through the WCCN machine."""
        out_ivector = numpy.ndarray(self.wccn.shape[1], numpy.float64)
        self.wccn(ivector, out_ivector)
        return out_ivector

    #######################################################
    #                IVector projection                   #
    def project(self, feature_array):
        """Computes GMM statistics against a UBM, then corresponding Ux vector"""
        self._check_feature(feature_array)
        # project UBM
        projected_ubm = self.project_ubm(feature_array)
        # project I-Vector
        ivector = self.project_ivector(projected_ubm)
        # whiten I-Vector
        if self.use_whitening:
            ivector = self.project_whitening(ivector)
        # LDA projection
        if self.use_lda:
            ivector = self.project_lda(ivector)
        # WCCN projection
        if self.use_wccn:
            ivector = self.project_wccn(ivector)
        return ivector

    #######################################################
    #             Read / Write I-Vectors                  #
    def write_feature(self, data, feature_file):
        """Saves the feature, which is the (whitened) I-Vector."""
        bob.bio.base.save(data, feature_file)

    def read_feature(self, feature_file):
        """Read the type of features that we require, namely i-vectors (stored as simple numpy arrays)"""
        return bob.bio.base.load(feature_file)

    #######################################################
    #                 Model Enrollment                    #
    def enroll(self, enroll_features):
        """Performs IVector enrollment: returns the mean i-vector, or a PLDA
        machine enrolled on it when PLDA is enabled."""
        [self._check_ivector(feature) for feature in enroll_features]
        average_ivector = numpy.mean(numpy.vstack(enroll_features), axis=0)
        if self.use_plda:
            average_ivector = average_ivector.reshape(1, -1)
            self.plda_trainer.enroll(self.plda_machine, average_ivector)
            return self.plda_machine
        else:
            return average_ivector

    ######################################################
    #                Feature comparison                  #
    def read_model(self, model_file):
        """Reads the whitened i-vector that holds the model"""
        if self.use_plda:
            return bob.learn.em.PLDAMachine(
                bob.io.base.HDF5File(str(model_file)), self.plda_base
            )
        else:
            return bob.bio.base.load(model_file)

    def score(self, model, probe):
        """Computes the score for the given model and the given probe.

        With PLDA enabled this is the PLDA log-likelihood ratio; otherwise the
        cosine similarity of the two i-vectors.
        """
        self._check_ivector(probe)
        if self.use_plda:
            return model.log_likelihood_ratio(probe)
        else:
            self._check_ivector(model)
            return numpy.dot(
                model / numpy.linalg.norm(model), probe / numpy.linalg.norm(probe)
            )

    def score_for_multiple_probes(self, model, probes):
        """This function computes the score between the given model and several given probe files."""
        # average the probes first, then score once against the mean i-vector
        probe = numpy.mean(numpy.vstack(probes), axis=0)
        return self.score(model, probe)