#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Tiago de Freitas Pereira <tiago.pereira@idiap.ch>
# Fri Feb 13 13:18:10 2015 +0200
#
# Copyright (C) 2011-2015 Idiap Research Institute, Martigny, Switzerland
import numpy
from ._library import *
import logging
from multiprocessing.pool import ThreadPool
logger = logging.getLogger(__name__)
def _set_average(trainer, trainers, machine, data, trainer_type):
    """_set_average(trainer, trainers, machine, data, trainer_type) -> None

    Merges the statistics accumulated by several trainers into a single trainer.

    This function is used to parallelize the training: each element of ``trainers`` is
    expected to have run its e-step on one part of the data, and the accumulated
    statistics are summed up and stored in ``trainer``.
    Nothing is returned; ``trainer`` is modified in place.

    **Parameters:**

    trainer : one of :py:class:`KMeansTrainer`, :py:class:`MAP_GMMTrainer`, :py:class:`ML_GMMTrainer`, :py:class:`IVectorTrainer`
      The trainer that receives the merged statistics.

    trainers : [ trainer ]
      The list of trainer objects that were used in the parallel training process.
      All trainers must be of the same class as the ``trainer``.
      Must not be empty for the GMM and IVector trainer types.

    machine : object
      The machine that is handed to ``trainer.reset_accumulators``
      (used for ``"KMeansTrainer"`` and ``"IVectorTrainer"`` only).

    data : sized object
      The complete training data; only its length is used, and only for ``"KMeansTrainer"``.

    trainer_type : str
      Selects how the statistics are merged. Supported values are ``"KMeansTrainer"``,
      ``"ML_GMMTrainer"``, ``"MAP_GMMTrainer"`` and ``"IVectorTrainer"``.

    **Raises:**

    NotImplementedError
      If ``trainer_type`` is none of the supported values.
    """
    if trainer_type == "KMeansTrainer":
        # K-Means statistics: start from empty accumulators and add every worker's share
        trainer.reset_accumulators(machine)
        for t in trainers:
            trainer.zeroeth_order_statistics = trainer.zeroeth_order_statistics + t.zeroeth_order_statistics
            trainer.first_order_statistics = trainer.first_order_statistics + t.first_order_statistics
            trainer.average_min_distance = trainer.average_min_distance + t.average_min_distance

        # NOTE(review): dividing the summed distances by the total number of samples
        # presumes that each worker reports an un-normalised sum -- confirm against
        # the e_step implementation of the K-Means trainer.
        # ``len`` (instead of ``.shape[0]``) gives the same value for arrays and
        # additionally accepts plain sequences.
        trainer.average_min_distance /= len(data)

    elif trainer_type in ("ML_GMMTrainer", "MAP_GMMTrainer"):
        # GMM statistics: start with the first worker's statistics, add the others
        trainer.gmm_statistics = trainers[0].gmm_statistics
        for t in trainers[1:]:
            trainer.gmm_statistics += t.gmm_statistics

    elif trainer_type == "IVectorTrainer":
        # IVector accumulators: start with the first worker's values, add the others
        trainer.reset_accumulators(machine)
        trainer.acc_fnormij_wij = trainers[0].acc_fnormij_wij
        trainer.acc_nij_wij2 = trainers[0].acc_nij_wij2
        trainer.acc_nij = trainers[0].acc_nij
        trainer.acc_snormij = trainers[0].acc_snormij

        for t in trainers[1:]:
            trainer.acc_fnormij_wij = trainer.acc_fnormij_wij + t.acc_fnormij_wij
            trainer.acc_nij_wij2 = trainer.acc_nij_wij2 + t.acc_nij_wij2
            trainer.acc_nij = trainer.acc_nij + t.acc_nij
            trainer.acc_snormij = trainer.acc_snormij + t.acc_snormij

    else:
        # Name the offending type so that the caller knows what to pass or implement
        raise NotImplementedError(
            "Merging the statistics of parallel e-steps is not implemented for "
            "trainer type %r" % (trainer_type,)
        )
def _parallel_e_step(args):
    """Runs the e-step of one trainer on its share of the data.

    The three inputs arrive packed into the single argument ``args`` as
    ``(trainer, machine, data)``, since this function is handed to ``pool.map``,
    which passes exactly one item to each call.
    Nothing is returned; the statistics stay inside the given trainer.
    """
    worker_trainer, worker_machine, chunk = args
    worker_trainer.e_step(worker_machine, chunk)
def train(trainer, machine, data, max_iterations=50, convergence_threshold=None, initialize=True, rng=None, check_inputs=True, pool=None, trainer_type=None):
    """
    Trains a machine given a trainer and the proper data

    **Parameters**:

    trainer : one of :py:class:`KMeansTrainer`, :py:class:`MAP_GMMTrainer`, :py:class:`ML_GMMTrainer`, :py:class:`ISVTrainer`, :py:class:`IVectorTrainer`, :py:class:`PLDATrainer`, :py:class:`EMPCATrainer`
      A trainer mechanism

    machine : one of :py:class:`KMeansMachine`, :py:class:`GMMMachine`, :py:class:`ISVBase`, :py:class:`IVectorMachine`, :py:class:`PLDAMachine`, :py:class:`bob.learn.linear.Machine`
      A container machine

    data : array_like <float, 2D>
      The data to be trained

    max_iterations : int
      The maximum number of iterations to train a machine

    convergence_threshold : float
      The convergence threshold to train a machine. If None, the training procedure
      will stop with the iterations criteria. The threshold is compared against the
      relative change of the value returned by ``trainer.compute_likelihood``, hence
      it has no effect for trainers without that method.

    initialize : bool
      If True, runs the initialization procedure

    rng : :py:class:`bob.core.random.mt19937`
      The Mersenne Twister mt19937 random generator used for the initialization of
      subspaces/arrays before the EM loop

    check_inputs : bool
      Shallow checks in the inputs. Check for inf and NaN.
      Only applied when ``data`` is a :py:class:`numpy.ndarray`.

    pool : ``int`` or ``multiprocessing.ThreadPool`` or ``None``
      If given, the provided pool will be used to parallelize the E-step of the
      EM algorithm. You should provide a ThreadPool not a multi process Pool. If pool is
      an integer, it will be used to create a ThreadPool with that many workers; a pool
      created this way is closed again before this function returns. A pool object
      passed in by the caller is never closed here.

    trainer_type : ``str`` or ``None``
      This is used for the parallel e_step method to see how several workers' data can
      be merged into one trainer before the m_step. By default
      ``trainer.__class__.__name__`` is used. This is useful if you have custom trainers
      and want to use this function.

    **Raises**:

    ValueError
      If ``check_inputs`` is set and the sum of ``data`` is infinite or NaN.
    """
    if check_inputs and isinstance(data, numpy.ndarray):
        # Shallow check: inf and NaN entries propagate into the overall sum
        sum_data = numpy.sum(data)

        if numpy.isinf(sum_data):
            raise ValueError("Please, check your inputs; numpy.inf detected in `data` ")

        if numpy.isnan(sum_data):
            raise ValueError("Please, check your inputs; numpy.nan detected in `data` ")

    if trainer_type is None:
        trainer_type = trainer.__class__.__name__

    def _e_step(trainer, machine, data):
        # performs the e-step, possibly in parallel
        if pool is None:
            # use only one core
            trainer.e_step(machine, data)
        else:
            # use the given pool
            n_processes = pool._processes

            # Never build more chunks than there are samples; otherwise some workers
            # would be handed an empty chunk.
            n_chunks = max(1, min(n_processes, len(data)))

            # Mapping references of the data: equally sized chunks, where the last
            # chunk additionally takes the remainder
            step = len(data) // n_chunks
            split_data = [data[p * step:(p + 1) * step] for p in range(n_chunks - 1)]
            split_data.append(data[(n_chunks - 1) * step:])

            # create one trainer and one machine per chunk by calling the class with
            # the existing object (presumably a copy constructor), so that the workers
            # do not share their accumulators
            trainers = [trainer.__class__(trainer) for _ in range(n_chunks)]
            machines = [machine.__class__(machine) for _ in range(n_chunks)]

            # call the parallel workers
            pool.map(_parallel_e_step, zip(trainers, machines, split_data))

            # update the trainer with the data of the other trainers
            _set_average(trainer, trainers, machine, data, trainer_type)

    # Only a pool that is created here is shut down again at the end
    owns_pool = isinstance(pool, int)
    if owns_pool:
        pool = ThreadPool(pool)

    try:
        # Initialization
        if initialize:
            if rng is not None:
                trainer.initialize(machine, data, rng)
            else:
                trainer.initialize(machine, data)

        _e_step(trainer, machine, data)
        average_output = 0
        average_output_previous = 0

        if hasattr(trainer, "compute_likelihood"):
            average_output = trainer.compute_likelihood(machine)

        for i in range(max_iterations):
            logger.info("Iteration = %d/%d", i, max_iterations)
            average_output_previous = average_output
            trainer.m_step(machine, data)
            _e_step(trainer, machine, data)

            if hasattr(trainer, "compute_likelihood"):
                average_output = trainer.compute_likelihood(machine)

                if isinstance(machine, KMeansMachine):
                    logger.info("average euclidean distance = %f", average_output)
                else:
                    logger.info("log likelihood = %f", average_output)

                # Relative change with respect to the previous value. A previous value
                # of exactly zero would lead to a division by zero, so that case is
                # handled explicitly: no change counts as converged, any change does not.
                if average_output_previous == 0:
                    convergence_value = 0.0 if average_output == 0 else float("inf")
                else:
                    convergence_value = abs((average_output_previous - average_output) / average_output_previous)
                logger.info("convergence value = %f", convergence_value)

                # Terminates if converged (and likelihood computation is set)
                if convergence_threshold is not None and convergence_value <= convergence_threshold:
                    break

        if hasattr(trainer, "finalize"):
            trainer.finalize(machine, data)
    finally:
        if owns_pool:
            # release the worker threads of the pool that was created above
            pool.close()
            pool.join()
def train_jfa(trainer, jfa_base, data, max_iterations=10, initialize=True, rng=None):
    """
    Trains a :py:class:`bob.learn.em.JFABase` given a :py:class:`bob.learn.em.JFATrainer` and the proper data

    The V, U and D subspaces are estimated one after the other, in this order.
    For each of them, ``max_iterations`` rounds of e-step and m-step are run,
    followed by a single call to the matching ``finalize_*`` method.

    **Parameters**:

    trainer : :py:class:`bob.learn.em.JFATrainer`
      A JFA trainer mechanism

    jfa_base : :py:class:`bob.learn.em.JFABase`
      A container machine

    data : [[:py:class:`bob.learn.em.GMMStats`]]
      The data to be trained

    max_iterations : int
      The number of EM iterations that are run for each of the three subspaces

    initialize : bool
      If True, runs the initialization procedure

    rng : :py:class:`bob.core.random.mt19937`
      The Mersenne Twister mt19937 random generator used for the initialization of
      subspaces/arrays before the EM loops; only handed to ``trainer.initialize``
      when it is not None
    """
    if initialize:
        init_args = (jfa_base, data) if rng is None else (jfa_base, data, rng)
        trainer.initialize(*init_args)

    # One row per subspace: log message, e-step, m-step and finalize method names
    phases = (
        ("V subspace estimation...", "e_step_v", "m_step_v", "finalize_v"),
        ("U subspace estimation...", "e_step_u", "m_step_u", "finalize_u"),
        ("D subspace estimation...", "e_step_d", "m_step_d", "finalize_d"),
    )

    for banner, e_step_name, m_step_name, finalize_name in phases:
        logger.info(banner)
        for iteration in range(max_iterations):
            logger.info("Iteration = %d/%d", iteration, max_iterations)
            getattr(trainer, e_step_name)(jfa_base, data)
            getattr(trainer, m_step_name)(jfa_base, data)
        getattr(trainer, finalize_name)(jfa_base, data)