Source code for bob.learn.em.gmm

#!/usr/bin/env python
# @author: Yannick Dayer <yannick.dayer@idiap.ch>
# @date: Fri 30 Jul 2021 10:06:47 UTC+02

"""This module provides classes and functions for the training and usage of GMM."""

import copy
import functools
import logging
import operator

from typing import Union

import dask
import dask.array as da
import numpy as np

from h5py import File as HDF5File
from sklearn.base import BaseEstimator

from .kmeans import KMeansMachine
from .utils import array_to_delayed_list, check_and_persist_dask_input

logger = logging.getLogger(__name__)

EPSILON = np.finfo(float).eps


def logaddexp_reduce(array, axis=0, keepdims=False):
    return np.logaddexp.reduce(
        array, axis=axis, keepdims=keepdims, initial=-np.inf
    )
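

# Illustrative sketch (not part of the library): np.logaddexp.reduce computes
# log(sum(exp(x))) without leaving log-space, so it does not underflow where a
# naive implementation would. The values below are made up.
_tiny_logs = np.array([-1000.0, -1000.0])
# naive: np.log(np.sum(np.exp(_tiny_logs))) == -inf (exp underflows to 0)
# stable: log(2 * exp(-1000)) == -1000 + log(2)
assert np.isclose(logaddexp_reduce(_tiny_logs), -1000.0 + np.log(2.0))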


def log_weighted_likelihood(data, machine):
    """Returns the weighted log likelihood for each Gaussian for a set of data.

    Parameters
    ----------
    data
        Data to compute the log likelihood on.
    machine
        The GMM machine.

    Returns
    -------
    array of shape (n_gaussians, n_samples)
        The weighted log likelihood of each sample under each Gaussian.
    """
    # Compute the likelihood for each data point on each Gaussian
    n_gaussians = len(machine.means)
    z = []
    for i in range(n_gaussians):
        temp = np.sum(
            (data - machine.means[i]) ** 2 / machine.variances[i], axis=-1
        )
        z.append(temp)
    z = np.vstack(z)
    ll = -0.5 * (machine.g_norms[:, None] + z)
    log_weighted_likelihoods = machine.log_weights[:, None] + ll
    return log_weighted_likelihoods
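

# Illustrative sketch (not part of the library): log_weighted_likelihood only
# needs an object exposing ``means``, ``variances``, ``g_norms`` and
# ``log_weights``, so a SimpleNamespace holding made-up two-Gaussian parameters
# is enough to exercise it outside of a full GMMMachine.
from types import SimpleNamespace

_demo_means = np.array([[0.0, 0.0], [3.0, 3.0]])
_demo_variances = np.ones_like(_demo_means)
_demo_machine = SimpleNamespace(
    means=_demo_means,
    variances=_demo_variances,
    # g_norm_c = n_features * log(2*pi) + sum_d log(variance_{c,d}), as above
    g_norms=_demo_means.shape[-1] * np.log(2 * np.pi)
    + np.log(_demo_variances).sum(axis=-1),
    log_weights=np.log(np.full((2,), 0.5)),
)
_demo_data = np.array([[0.1, -0.2], [2.9, 3.1]])
# Result has shape (n_gaussians, n_samples) = (2, 2); the first sample scores
# higher under the first Gaussian, the second sample under the second one.
_demo_scores = log_weighted_likelihood(_demo_data, _demo_machine)
assert _demo_scores.shape == (2, 2)
assert _demo_scores[0, 0] > _demo_scores[1, 0]
assert _demo_scores[1, 1] > _demo_scores[0, 1]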


def reduce_loglikelihood(log_weighted_likelihoods):
    if isinstance(log_weighted_likelihoods, np.ndarray):
        log_likelihood = logaddexp_reduce(log_weighted_likelihoods)
    else:
        # Sum along gaussians axis (using logAddExp to prevent underflow)
        log_likelihood = da.reduction(
            x=log_weighted_likelihoods,
            chunk=logaddexp_reduce,
            aggregate=logaddexp_reduce,
            axis=0,
            dtype=float,
            keepdims=False,
        )
    return log_likelihood
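

# Illustrative sketch (not part of the library): reduce_loglikelihood collapses
# the Gaussian axis with a log-sum-exp, and gives the same result for an
# in-memory numpy array and for the equivalent dask array. The values below are
# made up.
_demo_lwl = np.log(
    np.array([[0.2, 0.05, 0.6], [0.1, 0.45, 0.1]])
)  # shape (n_gaussians=2, n_samples=3)
_demo_ll_np = reduce_loglikelihood(_demo_lwl)
_demo_ll_da = reduce_loglikelihood(da.from_array(_demo_lwl, chunks=(2, 2)))
assert np.allclose(_demo_ll_np, _demo_ll_da.compute())
assert np.allclose(_demo_ll_np, np.log([0.3, 0.5, 0.7]))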


def log_likelihood(data, machine):
    """Returns the current log likelihood for a set of data in this Machine.

    Parameters
    ----------
    data
        Data to compute the log likelihood on.
    machine
        The GMM machine.

    Returns
    -------
    array of shape (n_samples,)
        The log likelihood of each sample.
    """
    data = np.atleast_2d(data)

    # All likelihoods [array of shape (n_gaussians, n_samples)]
    log_weighted_likelihoods = log_weighted_likelihood(
        data=data,
        machine=machine,
    )
    # Likelihoods of each sample on this machine. [array of shape (n_samples,)]
    ll_reduced = reduce_loglikelihood(log_weighted_likelihoods)
    return ll_reduced


def e_step(data, machine):
    """Expectation step of the e-m algorithm."""
    # Ensure data is a series of samples (2D array)
    data = np.atleast_2d(data)

    n_gaussians = len(machine.weights)

    # Create a fresh statistics accumulator for this chunk of data
    statistics = GMMStats(n_gaussians, data.shape[-1], like=data)

    # Log weighted Gaussian likelihoods [array of shape (n_gaussians,n_samples)]
    log_weighted_likelihoods = log_weighted_likelihood(
        data=data, machine=machine
    )

    # Log likelihood [array of shape (n_samples,)]
    log_likelihood = reduce_loglikelihood(log_weighted_likelihoods)

    # Responsibility P [array of shape (n_gaussians, n_samples)]
    responsibility = np.exp(log_weighted_likelihoods - log_likelihood[None, :])

    # Accumulate

    # Total likelihood [float]
    statistics.log_likelihood = log_likelihood.sum()
    # Count of samples [int]
    statistics.t = data.shape[0]
    # Responsibilities [array of shape (n_gaussians,)]
    statistics.n = responsibility.sum(axis=-1)
    sum_px, sum_pxx = [], []
    for i in range(n_gaussians):
        # p * x for Gaussian i [array of shape (n_samples, n_features)]
        px = responsibility[i, :, None] * data
        # First order stats [array of shape (n_gaussians, n_features)]
        # statistics.sum_px[i] = np.sum(px, axis=0)
        sum_px.append(np.sum(px, axis=0))
        # Second order stats [array of shape (n_gaussians, n_features)]
        # statistics.sum_pxx[i] = np.sum(px * data, axis=0)
        sum_pxx.append(np.sum(px * data, axis=0))

    statistics.sum_px = np.vstack(sum_px)
    statistics.sum_pxx = np.vstack(sum_pxx)

    return statistics


def m_step(
    statistics,
    machine,
):
    """Maximization step of the e-m algorithm."""
    m_step_func = map_gmm_m_step if machine.trainer == "map" else ml_gmm_m_step
    statistics = functools.reduce(operator.iadd, statistics)
    m_step_func(
        machine=machine,
        statistics=statistics,
        update_means=machine.update_means,
        update_variances=machine.update_variances,
        update_weights=machine.update_weights,
        mean_var_update_threshold=machine.mean_var_update_threshold,
        reynolds_adaptation=machine.map_relevance_factor is not None,
        alpha=machine.map_alpha,
        relevance_factor=machine.map_relevance_factor,
    )
    average_output = float(statistics.log_likelihood / statistics.t)
    return machine, average_output


class GMMStats:
    """Stores accumulated statistics of a GMM.

    Attributes
    ----------
    log_likelihood: float
        The sum of the log likelihoods of each sample on a GMM.
    t: int
        The number of considered samples.
    n: array of shape (n_gaussians,)
        Sum of responsibilities (zeroth-order statistic).
    sum_px: array of shape (n_gaussians, n_features)
        First-order statistic.
    sum_pxx: array of shape (n_gaussians, n_features)
        Second-order statistic.
    """

    def __init__(
        self, n_gaussians: int, n_features: int, like=None, **kwargs
    ) -> None:
        super().__init__(**kwargs)
        self.n_gaussians = n_gaussians
        self.n_features = n_features
        self.log_likelihood = 0
        self.t = 0
        # create dask arrays if required
        kw = dict(like=like) if like is not None else {}
        self.n = np.zeros(shape=(self.n_gaussians,), dtype=float, **kw)
        self.sum_px = np.zeros(
            shape=(self.n_gaussians, self.n_features), dtype=float, **kw
        )
        self.sum_pxx = np.zeros(
            shape=(self.n_gaussians, self.n_features), dtype=float, **kw
        )


    def init_fields(
        self, log_likelihood=0.0, t=0, n=None, sum_px=None, sum_pxx=None
    ):
        """Initializes the statistics values to a defined value, or zero by default."""
        # The accumulated log likelihood of all samples
        self.log_likelihood = log_likelihood
        # The accumulated number of samples
        self.t = t
        # For each Gaussian, the accumulated sum of responsibilities, i.e. the sum of
        # P(gaussian_i|x)
        self.n = (
            np.zeros(shape=(self.n_gaussians,), dtype=float) if n is None else n
        )
        # For each Gaussian, the accumulated sum of responsibility times the sample
        self.sum_px = (
            np.zeros(shape=(self.n_gaussians, self.n_features), dtype=float)
            if sum_px is None
            else sum_px
        )
        # For each Gaussian, the accumulated sum of responsibility times the sample
        # squared
        self.sum_pxx = (
            np.zeros(shape=(self.n_gaussians, self.n_features), dtype=float)
            if sum_pxx is None
            else sum_pxx
        )
    def reset(self):
        """Sets all statistics to zero."""
        self.init_fields()
    @classmethod
    def from_hdf5(cls, hdf5):
        """Creates a new GMMStats object from an `HDF5File` object."""
        if isinstance(hdf5, str):
            hdf5 = HDF5File(hdf5, "r")
        try:
            version_major, version_minor = hdf5.attrs["file_version"].split(".")
            logger.debug(
                f"Reading a GMMStats HDF5 file of version {version_major}.{version_minor}"
            )
        except (KeyError, RuntimeError):
            version_major, version_minor = 0, 0
        if int(version_major) >= 1:
            if hdf5.attrs["writer_class"] != str(cls):
                logger.warning(f"{hdf5.attrs['writer_class']} is not {cls}.")
            self = cls(
                n_gaussians=hdf5["n_gaussians"][()],
                n_features=hdf5["n_features"][()],
            )
            self.log_likelihood = hdf5["log_likelihood"][()]
            self.t = hdf5["T"][()]
            self.n = hdf5["n"][...]
            self.sum_px = hdf5["sumPx"][...]
            self.sum_pxx = hdf5["sumPxx"][...]
        else:  # Legacy file version
            logger.info("Loading a legacy HDF5 stats file.")
            self = cls(
                n_gaussians=int(hdf5["n_gaussians"][()]),
                n_features=int(hdf5["n_inputs"][()]),
            )
            # Note: legacy files store the log likelihood under a misspelled key
            self.log_likelihood = hdf5["log_liklihood"][()]
            self.t = int(hdf5["T"][()])
            self.n = np.reshape(hdf5["n"], (self.n_gaussians,))
            self.sum_px = np.reshape(hdf5["sumPx"], self.shape)
            self.sum_pxx = np.reshape(hdf5["sumPxx"], self.shape)
        return self
    def save(self, hdf5):
        """Saves the current statistics in an `HDF5File` object."""
        if isinstance(hdf5, str):
            hdf5 = HDF5File(hdf5, "w")
        hdf5.attrs["file_version"] = "1.0"
        hdf5.attrs["writer_class"] = str(self.__class__)
        hdf5["n_gaussians"] = self.n_gaussians
        hdf5["n_features"] = self.n_features
        hdf5["log_likelihood"] = float(self.log_likelihood)
        hdf5["T"] = int(self.t)
        hdf5["n"] = np.array(self.n)
        hdf5["sumPx"] = np.array(self.sum_px)
        hdf5["sumPxx"] = np.array(self.sum_pxx)
    def load(self, hdf5):
        """Overwrites the current statistics with those in an `HDF5File` object."""
        new_self = self.from_hdf5(hdf5)
        if new_self.shape != self.shape:
            logger.warning("Loaded GMMStats from hdf5 with a different shape.")
            self.resize(*new_self.shape)
        self.init_fields(
            new_self.log_likelihood,
            new_self.t,
            new_self.n,
            new_self.sum_px,
            new_self.sum_pxx,
        )
    def __add__(self, other):
        if (
            self.n_gaussians != other.n_gaussians
            or self.n_features != other.n_features
        ):
            raise ValueError(
                "Statistics could not be added together (shape mismatch)"
            )
        new_stats = GMMStats(self.n_gaussians, self.n_features)
        new_stats.log_likelihood = self.log_likelihood + other.log_likelihood
        new_stats.t = self.t + other.t
        new_stats.n = self.n + other.n
        new_stats.sum_px = self.sum_px + other.sum_px
        new_stats.sum_pxx = self.sum_pxx + other.sum_pxx
        return new_stats

    def __iadd__(self, other):
        if (
            self.n_gaussians != other.n_gaussians
            or self.n_features != other.n_features
        ):
            raise ValueError(
                "Statistics could not be added together (shape mismatch)"
            )
        self.log_likelihood += other.log_likelihood
        self.t += other.t
        self.n += other.n
        self.sum_px += other.sum_px
        self.sum_pxx += other.sum_pxx
        return self

    def __eq__(self, other):
        return (
            self.log_likelihood == other.log_likelihood
            and self.t == other.t
            and np.array_equal(self.n, other.n)
            and np.array_equal(self.sum_px, other.sum_px)
            and np.array_equal(self.sum_pxx, other.sum_pxx)
        )
    def is_similar_to(self, other, rtol=1e-5, atol=1e-8):
        """Returns True if `other` has the same values (within a tolerance)."""
        return (
            np.isclose(
                self.log_likelihood, other.log_likelihood, rtol=rtol, atol=atol
            )
            and np.isclose(self.t, other.t, rtol=rtol, atol=atol)
            and np.allclose(self.n, other.n, rtol=rtol, atol=atol)
            and np.allclose(self.sum_px, other.sum_px, rtol=rtol, atol=atol)
            and np.allclose(self.sum_pxx, other.sum_pxx, rtol=rtol, atol=atol)
        )
    def resize(self, n_gaussians, n_features):
        """Reinitializes the machine with new dimensions."""
        self.n_gaussians = n_gaussians
        self.n_features = n_features
        self.init_fields()

    @property
    def shape(self):
        """The number of gaussians and their dimensionality."""
        return (self.n_gaussians, self.n_features)

    @property
    def nbytes(self):
        """The number of bytes used by the statistics n, sum_px, sum_pxx."""
        return self.n.nbytes + self.sum_px.nbytes + self.sum_pxx.nbytes


class GMMMachine(BaseEstimator):
    """Transformer that stores the parameters of a Gaussian Mixture Model (GMM).

    This class implements the statistical model for a multivariate Gaussian
    mixture with diagonal covariances (GMM), as well as ways to train a model
    on data.

    A GMM is defined as
    :math:`\\sum_{c=0}^{C} \\omega_c \\mathcal{N}(x | \\mu_c, \\sigma_c)`, where
    :math:`C` is the number of Gaussian components and :math:`\\mu_c`,
    :math:`\\sigma_c` and :math:`\\omega_c` are respectively the mean, variance,
    and weight of Gaussian component :math:`c`.
    See Section 2.3.9 of Bishop, "Pattern recognition and machine learning", 2006.

    Two types of training are available, MLE and MAP, chosen with `trainer`:

    * Maximum Likelihood Estimation (:ref:`MLE <mle>`, ML)
      The mixtures are initialized (with k-means by default). The means,
      variances, and weights of the mixtures are then trained on the data to
      increase the likelihood value. (:ref:`MLE <mle>`)

    * Maximum a Posteriori (:ref:`MAP <map>`)
      The MAP machine takes another GMM machine as prior, called the Universal
      Background Model (UBM). The means, variances, and weights of the MAP
      mixtures are then trained on the data as an adaptation of the UBM.

    Both training methods use an Expectation-Maximization (e-m) algorithm to
    iteratively train the GMM.

    Note
    ----
    When setting manually any of the means, variances or variance thresholds,
    the k-means initialization will be skipped in `fit`.

    Attributes
    ----------
    means, variances, variance_thresholds
        Gaussians parameters.
    """

    def __init__(
        self,
        n_gaussians: int,
        trainer: str = "ml",
        ubm: "Union[GMMMachine, None]" = None,
        convergence_threshold: float = 1e-5,
        max_fitting_steps: Union[int, None] = 200,
        random_state: Union[int, np.random.RandomState] = 0,
        weights: "Union[np.ndarray[('n_gaussians',), float], None]" = None,  # noqa: F821
        k_means_trainer: Union[KMeansMachine, None] = None,
        update_means: bool = True,
        update_variances: bool = False,
        update_weights: bool = False,
        mean_var_update_threshold: float = EPSILON,
        map_alpha: float = 0.5,
        map_relevance_factor: Union[None, float] = 4,
        **kwargs,
    ):
        """
        Parameters
        ----------
        n_gaussians
            The number of gaussians to be represented by the machine.
        trainer
            `"ml"` for the maximum likelihood estimator method;
            `"map"` for the maximum a posteriori method. (MAP requires `ubm`)
        ubm: GMMMachine
            Universal Background Model. Required for the MAP method.
        convergence_threshold
            The threshold value of likelihood difference between e-m steps used
            for stopping the training iterations.
        max_fitting_steps
            The number of e-m iterations to fit the GMM. Stops the training
            even when the convergence threshold isn't met.
        random_state
            Specifies a RandomState or a seed for reproducibility.
        weights
            The weight of each Gaussian. (defaults to `1/n_gaussians`)
        k_means_trainer
            Optional trainer for the k-means method, replacing the default one.
        update_means
            Update the Gaussians means at every m step.
        update_variances
            Update the Gaussians variances at every m step.
        update_weights
            Update the GMM weights at every m step.
        mean_var_update_threshold
            Threshold value used when updating the means and variances.
        map_alpha
            Ratio for MAP adaptation. (Used when `trainer == "map"` and
            `relevance_factor is None`)
        map_relevance_factor
            Factor for the computation of alpha with Reynolds adaptation.
            (Used when `trainer == "map"`)
        """
        super().__init__(**kwargs)

        self.n_gaussians = n_gaussians
        self.trainer = trainer if trainer in ["ml", "map"] else "ml"
        self.m_step_func = (
            map_gmm_m_step if self.trainer == "map" else ml_gmm_m_step
        )
        if self.trainer == "map" and ubm is None:
            raise ValueError("A UBM is required for MAP GMM.")
        if ubm is not None and ubm.n_gaussians != self.n_gaussians:
            raise ValueError(
                "The UBM machine is not compatible with this machine."
            )
        self.ubm = ubm
        if max_fitting_steps is None and convergence_threshold is None:
            raise ValueError(
                "Either or both convergence_threshold and max_fitting_steps must be set"
            )
        self.convergence_threshold = convergence_threshold
        self.max_fitting_steps = max_fitting_steps
        self.random_state = random_state
        self.k_means_trainer = k_means_trainer
        self.update_means = update_means
        self.update_variances = update_variances
        self.update_weights = update_weights
        self.mean_var_update_threshold = mean_var_update_threshold
        self._means = None
        self._variances = None
        self._variance_thresholds = mean_var_update_threshold
        self._g_norms = None

        if self.ubm is not None:
            self.means = copy.deepcopy(self.ubm.means)
            self.variances = copy.deepcopy(self.ubm.variances)
            self.variance_thresholds = copy.deepcopy(
                self.ubm.variance_thresholds
            )
            self.weights = copy.deepcopy(self.ubm.weights)
        else:
            self.weights = np.full(
                (self.n_gaussians,),
                fill_value=(1 / self.n_gaussians),
                dtype=float,
            )
        if weights is not None:
            self.weights = weights
        self.map_alpha = map_alpha
        self.map_relevance_factor = map_relevance_factor

    @property
    def weights(self):
        """The weights of each Gaussian mixture."""
        return self._weights

    @weights.setter
    def weights(
        self, weights: "np.ndarray[('n_gaussians',), float]"  # noqa: F821
    ):  # noqa: F821
        self._weights = weights
        self._log_weights = np.log(self._weights)

    @property
    def means(self):
        """The means of each Gaussian."""
        if self._means is None:
            raise ValueError("GMMMachine means were never set.")
        return self._means

    @means.setter
    def means(
        self,
        means: "np.ndarray[('n_gaussians', 'n_features'), float]",  # noqa: F821
    ):  # noqa: F821
        self._means = means

    @property
    def variances(self):
        """The (diagonal) variances of the gaussians."""
        if self._variances is None:
            raise ValueError("GMMMachine variances were never set.")
        return self._variances

    @variances.setter
    def variances(
        self,
        variances: "np.ndarray[('n_gaussians', 'n_features'), float]",  # noqa: F821
    ):
        self._variances = np.maximum(self.variance_thresholds, variances)
        # Recompute g_norm for each gaussian [array of shape (n_gaussians,)]
        n_log_2pi = self._variances.shape[-1] * np.log(2 * np.pi)
        self._g_norms = n_log_2pi + np.log(self._variances).sum(axis=-1)

    @property
    def variance_thresholds(self):
        """Threshold below which variances are clamped to prevent precision losses."""
        if self._variance_thresholds is None:
            return EPSILON
        return self._variance_thresholds

    @variance_thresholds.setter
    def variance_thresholds(
        self,
        threshold: "Union[float, np.ndarray[('n_gaussians', 'n_features'), float]]",  # noqa: F821
    ):
        self._variance_thresholds = threshold
        if self._variances is not None:
            self.variances = np.maximum(threshold, self._variances)

    @property
    def g_norms(self):
        """Precomputed g_norms (depends on variances and feature shape)."""
        if self._g_norms is None:
            # Recompute g_norm for each gaussian [array of shape (n_gaussians,)]
            n_log_2pi = self.variances.shape[-1] * np.log(2 * np.pi)
            self._g_norms = n_log_2pi + np.log(self._variances).sum(axis=-1)
        return self._g_norms

    @property
    def log_weights(self):
        """Retrieve the logarithm of the weights."""
        return self._log_weights

    @property
    def shape(self):
        """Shape of the gaussians in the GMM machine."""
        return self.means.shape

    @classmethod
    def from_hdf5(cls, hdf5, ubm=None):
        """Creates a new GMMMachine object from an `HDF5File` object."""
        if isinstance(hdf5, str):
            hdf5 = HDF5File(hdf5, "r")
        try:
            version_major, version_minor = hdf5.attrs["file_version"].split(".")
            logger.debug(
                f"Reading a GMMMachine HDF5 file of version {version_major}.{version_minor}"
            )
        except (KeyError, RuntimeError):
            version_major, version_minor = 0, 0
        if int(version_major) >= 1:
            if hdf5.attrs["writer_class"] != str(cls):
                logger.warning(f"{hdf5.attrs['writer_class']} is not {cls}.")
            if hdf5["trainer"] == "map" and ubm is None:
                raise ValueError(
                    "The UBM is needed when loading a MAP machine."
                )
            self = cls(
                n_gaussians=hdf5["n_gaussians"][()],
                trainer=hdf5["trainer"][()],
                ubm=ubm,
                convergence_threshold=1e-5,
                max_fitting_steps=hdf5["max_fitting_steps"][()],
                weights=hdf5["weights"][...],
                k_means_trainer=None,
                update_means=hdf5["update_means"][()],
                update_variances=hdf5["update_variances"][()],
                update_weights=hdf5["update_weights"][()],
            )
            gaussians_group = hdf5["gaussians"]
            self.means = gaussians_group["means"][...]
            self.variances = gaussians_group["variances"][...]
            self.variance_thresholds = gaussians_group["variance_thresholds"][
                ...
            ]
        else:  # Legacy file version
            logger.info("Loading a legacy HDF5 machine file.")
            n_gaussians = hdf5["m_n_gaussians"][()]
            g_means = []
            g_variances = []
            g_variance_thresholds = []
            for i in range(n_gaussians):
                gaussian_group = hdf5[f"m_gaussians{i}"]
                g_means.append(gaussian_group["m_mean"][...])
                g_variances.append(gaussian_group["m_variance"][...])
                g_variance_thresholds.append(
                    gaussian_group["m_variance_thresholds"][...]
                )
            weights = np.reshape(hdf5["m_weights"], (n_gaussians,))
            self = cls(n_gaussians=n_gaussians, ubm=ubm, weights=weights)
            self.means = np.array(g_means).reshape(n_gaussians, -1)
            self.variances = np.array(g_variances).reshape(n_gaussians, -1)
            self.variance_thresholds = np.array(g_variance_thresholds).reshape(
                n_gaussians, -1
            )
        return self
    def load(self, hdf5):
        """Overwrites the current state with those in an `HDF5File` object."""
        new_self = self.from_hdf5(hdf5)
        self.__dict__.update(new_self.__dict__)
    def save(self, hdf5):
        """Saves the current machine parameters in an `HDF5File` object."""
        if isinstance(hdf5, str):
            hdf5 = HDF5File(hdf5, "w")
        hdf5.attrs["file_version"] = "1.0"
        hdf5.attrs["writer_class"] = str(self.__class__)
        hdf5["n_gaussians"] = self.n_gaussians
        hdf5["trainer"] = self.trainer
        hdf5["convergence_threshold"] = self.convergence_threshold
        hdf5["max_fitting_steps"] = self.max_fitting_steps
        hdf5["weights"] = self.weights
        hdf5["update_means"] = self.update_means
        hdf5["update_variances"] = self.update_variances
        hdf5["update_weights"] = self.update_weights
        gaussians_group = hdf5.create_group("gaussians")
        gaussians_group["means"] = self.means
        gaussians_group["variances"] = self.variances
        gaussians_group["variance_thresholds"] = self.variance_thresholds
    def __eq__(self, other):
        if self._means is None:
            return False
        return (
            np.allclose(self.means, other.means)
            and np.allclose(self.variances, other.variances)
            and np.allclose(self.variance_thresholds, other.variance_thresholds)
            and np.allclose(self.weights, other.weights)
        )
    def is_similar_to(self, other, rtol=1e-5, atol=1e-8):
        """Returns True if `other` has the same gaussians (within a tolerance)."""
        return (
            np.allclose(self.means, other.means, rtol=rtol, atol=atol)
            and np.allclose(
                self.variances, other.variances, rtol=rtol, atol=atol
            )
            and np.allclose(
                self.variance_thresholds,
                other.variance_thresholds,
                rtol=rtol,
                atol=atol,
            )
            and np.allclose(self.weights, other.weights, rtol=rtol, atol=atol)
        )
    def initialize_gaussians(
        self,
        data: "Union[np.ndarray[('n_samples', 'n_features'), float], None]" = None,  # noqa: F821
    ):
        """Populates gaussians parameters with either k-means or the UBM values."""
        if self.trainer == "map":
            self.means = copy.deepcopy(self.ubm.means)
            self.variances = copy.deepcopy(self.ubm.variances)
            self.variance_thresholds = copy.deepcopy(
                self.ubm.variance_thresholds
            )
            self.weights = copy.deepcopy(self.ubm.weights)
        else:
            logger.debug("GMM means was never set. Initializing with k-means.")
            if data is None:
                raise ValueError("Data is required when training with k-means.")
            logger.info("Initializing GMM with k-means.")
            kmeans_machine = self.k_means_trainer or KMeansMachine(
                self.n_gaussians,
                random_state=self.random_state,
            )
            kmeans_machine = kmeans_machine.fit(data)

            # Set the GMM machine's gaussians with the results of k-means
            self.means = copy.deepcopy(kmeans_machine.centroids_)
            logger.debug(
                "Estimating the variance and weights of each gaussian from kmeans."
            )
            (
                self.variances,
                self.weights,
            ) = kmeans_machine.get_variances_and_weights_for_each_cluster(data)
            logger.debug("Done.")
    def log_weighted_likelihood(
        self,
        data: "np.ndarray[('n_samples', 'n_features'), float]",  # noqa: F821
    ):
        """Returns the weighted log likelihood for each Gaussian for a set of data.

        Parameters
        ----------
        data
            Data to compute the log likelihood on.

        Returns
        -------
        array of shape (n_gaussians, n_samples)
            The weighted log likelihood of each sample under each Gaussian.
        """
        return log_weighted_likelihood(
            data=data,
            machine=self,
        )
    def log_likelihood(
        self,
        data: "np.ndarray[('n_samples', 'n_features'), float]",  # noqa: F821
    ):
        """Returns the log likelihood of a set of data under this machine.

        Parameters
        ----------
        data
            Data to compute the log likelihood on.

        Returns
        -------
        array of shape (n_samples,)
            The log likelihood of each sample.
        """
        return log_likelihood(
            data=data,
            machine=self,
        )
    def fit(self, X, y=None):
        """Trains the GMM on data until convergence or maximum step is reached."""
        input_is_dask, X = check_and_persist_dask_input(X)

        if self._means is None:
            self.initialize_gaussians(X)
        else:
            logger.debug("GMM means already set. Initialization was not run!")

        if self._variances is None:
            logger.warning(
                "Variances were not defined before fit. Using variance=1"
            )
            self.variances = np.ones_like(self.means)

        X = array_to_delayed_list(X, input_is_dask)

        average_output = 0
        logger.info("Training GMM...")
        step = 0
        while self.max_fitting_steps is None or step < self.max_fitting_steps:
            step += 1
            logger.info(
                f"Iteration {step:3d}"
                + (
                    f"/{self.max_fitting_steps:3d}"
                    if self.max_fitting_steps
                    else ""
                )
            )

            average_output_previous = average_output

            # compute the e-m steps
            if input_is_dask:
                stats = [
                    dask.delayed(e_step)(
                        data=xx,
                        machine=self,
                    )
                    for xx in X
                ]
                new_machine, average_output = dask.compute(
                    dask.delayed(m_step)(stats, self)
                )[0]
                for attr in ["weights", "means", "variances"]:
                    setattr(self, attr, getattr(new_machine, attr))
            else:
                stats = [
                    e_step(
                        data=X,
                        machine=self,
                    )
                ]
                _, average_output = m_step(stats, self)

            logger.debug(f"log likelihood = {average_output}")
            if step > 1:
                convergence_value = abs(
                    (average_output_previous - average_output)
                    / average_output_previous
                )
                logger.debug(
                    f"convergence val = {convergence_value} and threshold = {self.convergence_threshold}"
                )

                # Terminates if converged (and likelihood computation is set)
                if (
                    self.convergence_threshold is not None
                    and convergence_value <= self.convergence_threshold
                ):
                    logger.info(
                        "Reached convergence threshold. Training stopped."
                    )
                    break

        else:
            logger.info(
                "Reached maximum step. Training stopped without convergence."
            )

        return self
    def acc_stats(self, X):
        """Returns the statistics for `X`."""
        # we need this because sometimes the transform function gets overridden
        return e_step(data=X, machine=self)
    def transform(self, X):
        """Returns the statistics for `X`."""
        return self.stats_per_sample(X)
    def stats_per_sample(self, X):
        """Returns one GMMStats object for each sample (or chunk) in `X`."""
        return [e_step(data=xx, machine=self) for xx in X]
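

# Illustrative sketch (not part of the library): the e-step statistics are
# additive, which is what lets ``fit`` above compute one GMMStats per (dask)
# chunk and sum them before the m-step. The machine parameters and data values
# below are made up for demonstration.
_demo_gmm = GMMMachine(n_gaussians=2)
_demo_gmm.means = np.array([[0.0, 0.0], [3.0, 3.0]])
_demo_gmm.variances = np.ones_like(_demo_gmm.means)
_demo_chunk_1 = np.array([[0.1, -0.2], [2.9, 3.1]])
_demo_chunk_2 = np.array([[0.0, 0.3], [3.2, 2.8]])
# Statistics accumulated chunk by chunk...
_demo_stats_chunked = e_step(_demo_chunk_1, _demo_gmm) + e_step(
    _demo_chunk_2, _demo_gmm
)
# ...match the statistics computed on all the data at once.
_demo_stats_full = e_step(
    np.concatenate([_demo_chunk_1, _demo_chunk_2]), _demo_gmm
)
assert _demo_stats_chunked.t == _demo_stats_full.t == 4
assert np.allclose(_demo_stats_chunked.sum_px, _demo_stats_full.sum_px)

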
def ml_gmm_m_step(
    machine: GMMMachine,
    statistics: GMMStats,
    update_means=True,
    update_variances=False,
    update_weights=False,
    mean_var_update_threshold=EPSILON,
    **kwargs,
):
    """Updates the GMM machine parameters according to the e-step statistics."""
    logger.debug("ML GMM Trainer m-step")

    # Threshold the low n to prevent divide by zero
    thresholded_n = np.clip(statistics.n, mean_var_update_threshold, None)

    # Update weights if requested
    # (Equation 9.26 of Bishop, "Pattern recognition and machine learning", 2006)
    if update_weights:
        logger.debug("Update weights.")
        machine.weights = thresholded_n / statistics.t

    # Update GMM parameters using the sufficient statistics (m_ss):

    # Update means if requested
    # (Equation 9.24 of Bishop, "Pattern recognition and machine learning", 2006)
    if update_means:
        logger.debug("Update means.")
        # Using n with the applied threshold
        machine.means = statistics.sum_px / thresholded_n[:, None]

    # Update variances if requested
    # (Equation 9.25 of Bishop, "Pattern recognition and machine learning", 2006)
    # ...but we use the "computational formula for the variance", i.e.
    #   var = 1/n * sum (P(x-mean)(x-mean))
    #       = 1/n * sum (Pxx) - mean^2
    if update_variances:
        logger.debug("Update variances.")
        machine.variances = statistics.sum_pxx / thresholded_n[
            :, None
        ] - np.power(machine.means, 2)


def map_gmm_m_step(
    machine: GMMMachine,
    statistics: GMMStats,
    update_means=True,
    update_variances=False,
    update_weights=False,
    reynolds_adaptation=True,
    relevance_factor=4,
    alpha=0.5,
    mean_var_update_threshold=EPSILON,
):
    """Updates a GMMMachine's parameters using statistics adapted from a UBM."""
    if machine.ubm is None:
        raise ValueError("A machine used for MAP must have a UBM.")
    # Calculate the "data-dependent adaptation coefficient", alpha_i
    # [array of shape (n_gaussians,)]
    if reynolds_adaptation:
        alpha = statistics.n / (statistics.n + relevance_factor)
    else:
        if not hasattr(alpha, "ndim"):
            alpha = np.full((machine.n_gaussians,), alpha)

    # - Update weights if requested
    #   Equation 11 of Reynolds et al., "Speaker Verification Using Adapted
    #   Gaussian Mixture Models", Digital Signal Processing, 2000
    if update_weights:
        # Calculate the maximum likelihood weights [array of shape (n_gaussians,)]
        ml_weights = statistics.n / statistics.t

        # Calculate the new weights
        machine.weights = alpha * ml_weights + (1 - alpha) * machine.ubm.weights

        # Apply the scale factor, gamma, to ensure the new weights sum to unity
        gamma = machine.weights.sum()
        machine.weights /= gamma

    # Update GMM parameters
    # - Update means if requested
    #   Equation 12 of Reynolds et al., "Speaker Verification Using Adapted
    #   Gaussian Mixture Models", Digital Signal Processing, 2000
    if update_means:
        # Apply threshold to prevent divide by zero below
        n_threshold = np.where(
            statistics.n < mean_var_update_threshold,
            mean_var_update_threshold,
            statistics.n,
        )
        # n_threshold = np.full(statistics.n.shape, fill_value=mean_var_update_threshold)
        # n_threshold[statistics.n > mean_var_update_threshold] = statistics.n[
        #     statistics.n > mean_var_update_threshold
        # ]
        new_means = np.multiply(
            alpha[:, None],
            (statistics.sum_px / n_threshold[:, None]),
        ) + np.multiply((1 - alpha[:, None]), machine.ubm.means)
        machine.means = np.where(
            statistics.n[:, None] < mean_var_update_threshold,
            machine.ubm.means,
            new_means,
        )

    # - Update variance if requested
    #   Equation 13 of Reynolds et al., "Speaker Verification Using Adapted
    #   Gaussian Mixture Models", Digital Signal Processing, 2000
    if update_variances:
        # Calculate new variances (equation 13); the prior term uses the
        # squared UBM means, since E[x^2] = sigma_ubm^2 + mu_ubm^2
        prior_norm_variances = (
            machine.ubm.variances + np.power(machine.ubm.means, 2)
        ) - np.power(machine.means, 2)
        new_variances = (
            alpha[:, None] * statistics.sum_pxx / statistics.n[:, None]
            + (1 - alpha[:, None])
            * (machine.ubm.variances + np.power(machine.ubm.means, 2))
            - np.power(machine.means, 2)
        )
        machine.variances = np.where(
            statistics.n[:, None] < mean_var_update_threshold,
            prior_norm_variances,
            new_variances,
        )
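

# Illustrative usage sketch (not part of the library): training a small ML GMM
# on synthetic data, then adapting a MAP GMM from it. The data, sizes, and
# iteration counts below are made-up values for demonstration only.
if __name__ == "__main__":
    rng = np.random.RandomState(0)
    background_data = np.concatenate(
        [rng.normal(0.0, 1.0, (100, 2)), rng.normal(5.0, 1.0, (100, 2))]
    )
    enrollment_data = rng.normal(4.5, 1.0, (20, 2))

    # Maximum likelihood training (k-means initialization, then e-m)
    ubm = GMMMachine(n_gaussians=2, max_fitting_steps=10).fit(background_data)

    # MAP adaptation of the means towards the enrollment data
    adapted = GMMMachine(
        n_gaussians=2, trainer="map", ubm=ubm, max_fitting_steps=2
    ).fit(enrollment_data)

    # Average per-sample log likelihood of the enrollment data under both models
    print(ubm.log_likelihood(enrollment_data).mean())
    print(adapted.log_likelihood(enrollment_data).mean())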