Source code for bob.pad.base.algorithm.SVMCascadePCA

#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""
Created on Wed May 17 09:43:09 2017

@author: Olegs Nikisins
"""

#==============================================================================
# Import what is needed here:

from bob.pad.base.algorithm import Algorithm

import numpy as np

import bob.learn.libsvm

import bob.learn.linear

import bob.io.base

import os

import fnmatch

from bob.bio.video.utils import FrameContainer

from bob.pad.base.utils import convert_frame_cont_to_array, mean_std_normalize

#==============================================================================
# Main body :

class SVMCascadePCA(Algorithm):
    """
    This class is designed to train the **cascede** of SVMs given Frame Containers
    with features of real and attack classes. The procedure is the following:

    1. First, the input data is mean-std normalized.

    2. Second, the PCA is trained on normalized input features. Only the
       features of the **real** class are used in PCA training, both
       for one-class and two-class SVMs.

    3. The features are next projected given trained PCA machine.

    4. Prior to SVM training the features are again mean-std normalized.

    5. Next SVM machine is trained for each N projected features. First, preojected
       features corresponding to highest eigenvalues are selected. N is usually small
       N = (2, 3). So, if N = 2, the first SVM is trained for projected features 1 and 2,
       second SVM is trained for projected features 3 and 4, and so on.

    6. These SVMs then form a cascade of classifiers. The input feature vector is then
       projected using PCA machine and passed through all classifiers in the cascade.
       The decision is then made by majority voting.

    Both one-class SVM and two-class SVM cascades can be trained.
    In this implementation the grid search of SVM parameters is not supported.

    **Parameters:**

    ``machine_type`` : :py:class:`str`
        A type of the SVM machine. Please check ``bob.learn.libsvm`` for
        more details. Default: 'C_SVC'.

    ``kernel_type`` : :py:class:`str`
        A type of kerenel for the SVM machine. Please check ``bob.learn.libsvm``
        for more details. Default: 'RBF'.

    ``svm_kwargs`` : :py:class:`dict`
        Dictionary containing the hyper-parameters of the SVM.
        Default: {'cost': 1, 'gamma': 0}.

    ``N`` : :py:class:`int`
        The number of features to be used for training a single SVM machine
        in the cascade. Default: 2.

    ``pos_scores_slope`` : :py:class:`float`
        The positive scores returned by SVM cascade will be multiplied by this
        constant prior to majority voting. Default: 0.01 .

    ``frame_level_scores_flag`` : :py:class:`bool`
        Return scores for each frame individually if True. Otherwise, return a
        single score per video. Default: False.
    """

    def __init__(self,
                 machine_type = 'C_SVC',
                 kernel_type = 'RBF',
                 svm_kwargs = {'cost': 1, 'gamma': 0},
                 N = 2,
                 pos_scores_slope = 0.01,
                 frame_level_scores_flag = False):


        Algorithm.__init__(self,
                           machine_type = machine_type,
                           kernel_type = kernel_type,
                           svm_kwargs = svm_kwargs,
                           N = N,
                           pos_scores_slope = pos_scores_slope,
                           frame_level_scores_flag = frame_level_scores_flag,
                           performs_projection=True,
                           requires_projector_training=True)

        self.machine_type = machine_type
        self.kernel_type = kernel_type
        self.svm_kwargs = svm_kwargs
        self.N = N
        self.pos_scores_slope = pos_scores_slope
        self.frame_level_scores_flag = frame_level_scores_flag

        self.pca_projector_file_name = "pca_projector" # pca machine will be saved to .hdf5 file with this name
        self.svm_projector_file_name = "svm_projector" # svm machines will be saved to .hdf5 files with this name augumented by machine number

        self.pca_machine = None
        self.svm_machines = None


    #==========================================================================
[docs] def comp_prediction_precision(self, machine, real, attack): """ This function computes the precision of the predictions as a ratio of correctly classified samples to the total number of samples. **Parameters:** ``machine`` : object A pre-trained SVM machine. ``real`` : 2D :py:class:`numpy.ndarray` Array of features representing the real class. ``attack`` : 2D :py:class:`numpy.ndarray` Array of features representing the attack class. **Returns:** ``precision`` : :py:class:`float` The precision of the predictions. """ labels_real = machine.predict_class(real) labels_attack = machine.predict_class(attack) samples_num = len(labels_real) + len(labels_attack) precision = ( np.sum(labels_real == 1) + np.sum(labels_attack == -1) ).astype( np.float ) / samples_num return precision
#==========================================================================
[docs] def train_pca(self, data): """ Train PCA given input array of feature vectors. The data is mean-std normalized prior to PCA training. **Parameters:** ``data`` : 2D :py:class:`numpy.ndarray` Array of feature vectors of the size (N_samples x N_features). The features must be already mean-std normalized. **Returns:** ``machine`` : :py:class:`bob.learn.linear.Machine` The PCA machine that has been trained. The mean-std normalizers are also set in the machine. ``eig_vals`` : 1D :py:class:`numpy.ndarray` The eigen-values of the PCA projection. """ # 1. Normalize the training data: data_norm, features_mean, features_std = mean_std_normalize(data) trainer = bob.learn.linear.PCATrainer() # Creates a PCA trainer [machine, eig_vals] = trainer.train(data_norm) # Trains the machine with the given data # Set the normalizers for the PCA machine, needed to normalize the test samples. machine.input_subtract = features_mean # subtract the mean of train data machine.input_divide = features_std # divide by std of train data return machine, eig_vals
#==========================================================================
[docs] def train_svm(self, real, attack, machine_type, kernel_type, svm_kwargs): """ One-class or two class-SVM is trained in this method given input features. The value of ``attack`` argument is not important in the case of one-class SVM. Prior to training the data is mean-std normalized. **Parameters:** ``real`` : 2D :py:class:`numpy.ndarray` Training features for the real class. ``attack`` : 2D :py:class:`numpy.ndarray` Training features for the attack class. If machine_type == 'ONE_CLASS' this argument can be anything, it will be skipped. ``machine_type`` : :py:class:`str` A type of the SVM machine. Please check ``bob.learn.libsvm`` for more details. ``kernel_type`` : :py:class:`str` A type of kerenel for the SVM machine. Please check ``bob.learn.libsvm`` for more details. ``svm_kwargs`` : :py:class:`dict` Dictionary containing the hyper-parameters of the SVM. **Returns:** ``machine`` : object A trained SVM machine. The mean-std normalizers are also set in the machine. """ one_class_flag = (machine_type == 'ONE_CLASS') # True if one-class SVM is used # Mean-std normalize the data before training real, attack, features_mean, features_std = self.norm_train_data(real, attack, one_class_flag) # real and attack - are now mean-std normalized trainer = bob.learn.libsvm.Trainer(machine_type = machine_type, kernel_type = kernel_type, probability = True) for key in svm_kwargs.keys(): setattr(trainer, key, svm_kwargs[key]) # set the hyper-parameters of the SVM if not( one_class_flag ): # two-class SVM case data = [real, attack] # data for final training else: # one-class SVM case data = [real] # only real class used for training machine = trainer.train(data) # train the machine # add the normalizers to the trained SVM machine machine.input_subtract = features_mean # subtract the mean of train data machine.input_divide = features_std # divide by std of train data return machine
#==========================================================================
[docs] def get_data_start_end_idx(self, data, N): """ Get indexes to select the subsets of data related to the cascades. First (n_machines - 1) SVMs will be trained using N features. Last SVM will be trained using remaining features, which is less or equal to N. **Parameters:** ``data`` : 2D :py:class:`numpy.ndarray` Data array containing the training features. The dimensionality is (N_samples x N_features). ``N`` : :py:class:`int` Number of features per single SVM. **Returns:** ``idx_start`` : [int] Starting indexes for data subsets. ``idx_end`` : [int] End indexes for data subsets. ``n_machines`` : :py:class:`int` Number of SVMs to be trained. """ n_features = data.shape[1] n_machines = np.int(n_features/N) if (n_features - n_machines*N) > 1: # if more than one feature remains machines_num = range(0, n_machines, 1) idx_start = [item*N for item in machines_num] idx_end = [(item+1)*N for item in machines_num] idx_start.append( n_machines*N ) idx_end.append( n_features ) n_machines = n_machines + 1 else: machines_num = range(0, n_machines, 1) idx_start = [item*N for item in machines_num] idx_end = [(item+1)*N for item in machines_num] return idx_start, idx_end, n_machines
#==========================================================================
[docs] def train_svm_cascade(self, real, attack, machine_type, kernel_type, svm_kwargs, N): """ Train a cascade of SVMs, one SVM machine per N features. N is usually small N = (2, 3). So, if N = 2, the first SVM is trained for features 1 and 2, second SVM is trained for features 3 and 4, and so on. Both one-class and two-class SVM cascades can be trained. The value of ``attack`` argument is not important in the case of one-class SVM. The data is mean-std normalized prior to SVM cascade training. **Parameters:** ``real`` : 2D :py:class:`numpy.ndarray` Training features for the real class. ``attack`` : 2D :py:class:`numpy.ndarray` Training features for the attack class. If machine_type == 'ONE_CLASS' this argument can be anything, it will be skipped. ``machine_type`` : :py:class:`str` A type of the SVM machine. Please check ``bob.learn.libsvm`` for more details. ``kernel_type`` : :py:class:`str` A type of kerenel for the SVM machine. Please check ``bob.learn.libsvm`` for more details. ``svm_kwargs`` : :py:class:`dict` Dictionary containing the hyper-parameters of the SVM. ``N`` : :py:class:`int` The number of features to be used for training a single SVM machine in the cascade. **Returns:** ``machines`` : :py:class:`dict` A dictionary containing a cascade of trained SVM machines. """ one_class_flag = (machine_type == 'ONE_CLASS') # True if one-class SVM is used idx_start, idx_end, n_machines = self.get_data_start_end_idx(real, N) machines = {} for machine_num in range(0, n_machines, 1): if not(one_class_flag): # two-class SVM real_subset = real[:, idx_start[machine_num] : idx_end[machine_num] ] # both real and attack classes are used attack_subset = attack[:, idx_start[machine_num] : idx_end[machine_num] ] else: # one-class SVM case real_subset = real[:, idx_start[machine_num] : idx_end[machine_num] ] # only the real class is used attack_subset = [] machine = self.train_svm(real_subset, attack_subset, machine_type, kernel_type, svm_kwargs) machines[ str(machine_num) ] = machine del machine return machines
#==========================================================================
[docs] def train_pca_svm_cascade(self, real, attack, machine_type, kernel_type, svm_kwargs, N): """ This function is designed to train the **cascede** of SVMs given features of real and attack classes. The procedure is the following: 1. First, the PCA machine is trained also incorporating mean-std feature normalization. Only the features of the **real** class are used in PCA training, both for one-class and two-class SVMs. 2. The features are next projected given trained PCA machine. 3. Next, SVM machine is trained for each N projected features. Prior to SVM training the features are again mean-std normalized. First, preojected features corresponding to highest eigenvalues are selected. N is usually small N = (2, 3). So, if N = 2, the first SVM is trained for projected features 1 and 2, second SVM is trained for projected features 3 and 4, and so on. Both one-class SVM and two-class SVM cascades can be trained. In this implementation the grid search of SVM parameters is not supported. **Parameters:** ``real`` : 2D :py:class:`numpy.ndarray` Training features for the real class. ``attack`` : 2D :py:class:`numpy.ndarray` Training features for the attack class. If machine_type == 'ONE_CLASS' this argument can be anything, it will be skipped. ``machine_type`` : :py:class:`str` A type of the SVM machine. Please check ``bob.learn.libsvm`` for more details. ``kernel_type`` : :py:class:`str` A type of kerenel for the SVM machine. Please check ``bob.learn.libsvm`` for more details. ``svm_kwargs`` : :py:class:`dict` Dictionary containing the hyper-parameters of the SVM. ``N`` : :py:class:`int` The number of features to be used for training a single SVM machine in the cascade. **Returns:** ``pca_machine`` : object A trained PCA machine. ``svm_machines`` : :py:class:`dict` A cascade of SVM machines. """ one_class_flag = (machine_type == 'ONE_CLASS') # True if one-class SVM is used # 1. Train PCA using normalized features of the real class: pca_machine, _ = self.train_pca(real) # the mean-std normalizers are already set in this machine # 2. Project the features given PCA machine: if not(one_class_flag): projected_real = pca_machine(real) # the normalizers are already set for the PCA machine, therefore non-normalized data is passed in projected_attack = pca_machine(attack) # the normalizers are already set for the PCA machine, therefore non-normalized data is passed in else: projected_real = pca_machine(real) # the normalizers are already set for the PCA machine, therefore non-normalized data is passed in projected_attack = [] # 3. Train a cascade of SVM machines using **projected** data svm_machines = self.train_svm_cascade(projected_real, projected_attack, machine_type, kernel_type, svm_kwargs, N) return pca_machine, svm_machines
#==========================================================================
[docs] def save_machine(self, projector_file, projector_file_name, machine): """ Saves the machine to the hdf5 file. The name of the file is specified in ``projector_file_name`` string. The location is specified in the path component of the ``projector_file`` string. **Parameters:** ``projector_file`` : :py:class:`str` Absolute name of the file to save the trained projector to, as returned by ``bob.pad.base`` framework. In this function only the path component is used. ``projector_file_name`` : :py:class:`str` The relative name of the file to save the machine to. Name without extension. ``machine`` : object The machine to be saved. """ extension = ".hdf5" resulting_file_name = os.path.join( os.path.split(projector_file)[0], projector_file_name + extension ) f = bob.io.base.HDF5File(resulting_file_name, 'w') # open hdf5 file to save to machine.save(f) # save the machine and normalization parameters del f
#==========================================================================
[docs] def save_cascade_of_machines(self, projector_file, projector_file_name, machines): """ Saves a cascade of machines to the hdf5 files. The name of the file is specified in ``projector_file_name`` string and will be augumented with a number of the machine. The location is specified in the path component of the ``projector_file`` string. **Parameters:** ``projector_file`` : :py:class:`str` Absolute name of the file to save the trained projector to, as returned by ``bob.pad.base`` framework. In this function only the path component is used. ``projector_file_name`` : :py:class:`str` The relative name of the file to save the machine to. This name will be augumented with a number of the machine. Name without extension. ``machines`` : :py:class:`dict` A cascade of machines. The key in the dictionary is the number of the machine, value is the machine itself. """ for key in machines: augumented_projector_file_name = projector_file_name + key machine = machines[key] self.save_machine(projector_file, augumented_projector_file_name, machine)
#==========================================================================
[docs] def train_projector(self, training_features, projector_file): """ Train PCA and cascade of SVMs for feature projection and save them to files. The ``requires_projector_training = True`` flag must be set to True to enable this function. **Parameters:** ``training_features`` : [[FrameContainer], [FrameContainer]] A list containing two elements: [0] - a list of Frame Containers with feature vectors for the real class; [1] - a list of Frame Containers with feature vectors for the attack class. ``projector_file`` : :py:class:`str` The file to save the trained projector to, as returned by the ``bob.pad.base`` framework. In this class the names of the files to save the projectors to are modified, see ``save_machine`` and ``save_cascade_of_machines`` methods of this class for more details. """ # training_features[0] - training features for the REAL class. real = convert_list_of_frame_cont_to_array(training_features[0]) # output is array # training_features[1] - training features for the ATTACK class. attack = convert_list_of_frame_cont_to_array(training_features[1]) # output is array # Train the PCA machine and cascade of SVMs pca_machine, svm_machines = self.train_pca_svm_cascade(real = real, attack = attack, machine_type = self.machine_type, kernel_type = self.kernel_type, svm_kwargs = self.svm_kwargs, N = self.N) # Save the PCA machine self.save_machine(projector_file, self.pca_projector_file_name, pca_machine) # Save the cascade of SVMs: self.save_cascade_of_machines(projector_file, self.svm_projector_file_name, svm_machines)
#==========================================================================
[docs] def load_machine(self, projector_file, projector_file_name): """ Loads the machine from the hdf5 file. The name of the file is specified in ``projector_file_name`` string. The location is specified in the path component of the ``projector_file`` string. **Parameters:** ``projector_file`` : :py:class:`str` Absolute name of the file to load the trained projector from, as returned by ``bob.pad.base`` framework. In this function only the path component is used. ``projector_file_name`` : :py:class:`str` The relative name of the file to load the machine from. Name without extension. **Returns:** ``machine`` : object A machine loaded from file. """ extension = ".hdf5" resulting_file_name = os.path.join( os.path.split(projector_file)[0], projector_file_name + extension ) # name of the file f = bob.io.base.HDF5File(resulting_file_name, 'r') # file to read the machine from if "pca_" in projector_file_name: machine = bob.learn.linear.Machine(f) if "svm_" in projector_file_name: machine = bob.learn.libsvm.Machine(f) del f return machine
#==========================================================================
[docs] def get_cascade_file_names(self, projector_file, projector_file_name): """ Get the list of file-names storing the cascade of machines. The location of the files is specified in the path component of the ``projector_file`` argument. **Parameters:** ``projector_file`` : :py:class:`str` Absolute name of the file to load the trained projector from, as returned by ``bob.pad.base`` framework. In this function only the path component is used. ``projector_file_name`` : :py:class:`str` The **common** string in the names of files storing the cascade of pretrained machines. Name without extension. **Returns:** ``cascade_file_names`` : [str] A list of of **relative** file-names storing the cascade of machines. """ path = os.path.split(projector_file)[0] # directory containing files storing the cascade of machines. files = [] for f in os.listdir( path ): if fnmatch.fnmatch( f, projector_file_name + "*" ): files.append(f) return files
#==========================================================================
[docs] def load_cascade_of_machines(self, projector_file, projector_file_name): """ Loades a cascade of machines from the hdf5 files. The name of the file is specified in ``projector_file_name`` string and will be augumented with a number of the machine. The location is specified in the path component of the ``projector_file`` string. **Parameters:** ``projector_file`` : :py:class:`str` Absolute name of the file to load the trained projector from, as returned by ``bob.pad.base`` framework. In this function only the path component is used. ``projector_file_name`` : :py:class:`str` The relative name of the file to load the machine from. This name will be augumented with a number of the machine. Name without extension. **Returns:** ``machines`` : :py:class:`dict` A cascade of machines. The key in the dictionary is the number of the machine, value is the machine itself. """ files = self.get_cascade_file_names(projector_file, projector_file_name) # files storing the cascade machines = {} for idx, _ in enumerate(files): machine = self.load_machine( projector_file, projector_file_name + str(idx) ) machines[ str(idx) ] = machine return machines
#==========================================================================
[docs] def load_projector(self, projector_file): """ Load the pretrained PCA machine and a cascade of SVM classifiers from files to perform feature projection. This function sets the arguments ``self.pca_machine`` and ``self.svm_machines`` of this class with loaded machines. The function must be capable of reading the data saved with the :py:meth:`train_projector` method of this class. Please register `performs_projection = True` in the constructor to enable this function. **Parameters:** ``projector_file`` : :py:class:`str` The file to read the projector from, as returned by the ``bob.pad.base`` framework. In this class the names of the files to read the projectors from are modified, see ``load_machine`` and ``load_cascade_of_machines`` methods of this class for more details. """ # Load the PCA machine pca_machine = self.load_machine(projector_file, self.pca_projector_file_name) # Load the cascade of SVMs: svm_machines = self.load_cascade_of_machines(projector_file, self.svm_projector_file_name) self.pca_machine = pca_machine self.svm_machines = svm_machines
#==========================================================================
[docs] def combine_scores_of_svm_cascade(self, scores_array, pos_scores_slope): """ First, multiply positive scores by constant ``pos_scores_slope`` in the input 2D array. The constant is usually small, making the impact of negative scores more significant. Second, the a single score per sample is obtained by avaraging the **pre-modified** scores of the cascade. **Parameters:** ``scores_array`` : 2D :py:class:`numpy.ndarray` 2D score array of the size (N_samples x N_scores). ``pos_scores_slope`` : :py:class:`float` The positive scores returned by SVM cascade will be multiplied by this constant prior to majority voting. Default: 0.01 . **Returns:** ``scores`` : 1D :py:class:`numpy.ndarray` Vector of scores. Scores for the real class are expected to be higher, than the scores of the negative / attack class. """ cols = [] for col in scores_array.T: idx_vec = np.where(col>=0) col[idx_vec] *= pos_scores_slope # multiply positive scores by the constant cols.append(col) scores_array_modified = np.stack(cols, axis=1) scores = np.mean(scores_array_modified, axis = 1) return scores
#==========================================================================
[docs] def project(self, feature): """ This function computes a vector of scores for each sample in the input array of features. The following steps are apllied: 1. Convert input array to numpy array if necessary. 2. Project features using pretrained PCA machine. 3. Apply the cascade of SVMs to the preojected features. 4. Compute a single score per sample by combining the scores produced by the cascade of SVMs. The combination is done using ``combine_scores_of_svm_cascade`` method of this class. Set ``performs_projection = True`` in the constructor to enable this function. It is assured that the :py:meth:`load_projector` was **called before** the ``project`` function is executed. **Parameters:** ``feature`` : FrameContainer or 2D :py:class:`numpy.ndarray` Two types of inputs are accepted. A Frame Container conteining the features of an individual, see ``bob.bio.video.utils.FrameContainer``. Or a 2D feature array of the size (N_samples x N_features). **Returns:** ``scores`` : 1D :py:class:`numpy.ndarray` Vector of scores. Scores for the real class are expected to be higher, than the scores of the negative / attack class. """ # 1. Convert input array to numpy array if necessary. if isinstance(feature, FrameContainer): # if FrameContainer convert to 2D numpy array features_array = convert_frame_cont_to_array(feature) else: features_array = feature # 2. Project features using pretrained PCA machine. pca_projected_features = self.pca_machine(features_array) # 3. Apply the cascade of SVMs to the preojected features. all_scores = [] idx_start, idx_end, n_machines = self.get_data_start_end_idx(pca_projected_features, self.N) for machine_num in range(0, n_machines, 1): # iterate over SVM machines svm_machine = self.svm_machines[ str(machine_num) ] # select a machine # subset of PCA projected features to be passed to SVM machine pca_projected_features_subset = pca_projected_features[:, idx_start[machine_num] : idx_end[machine_num] ] # for two-class SVM select the scores corresponding to the real class only, done by [:,0]. Index [0] selects the class Index [1] selects the score.. single_machine_scores = svm_machine.predict_class_and_scores( pca_projected_features_subset )[1][:,0] all_scores.append(single_machine_scores) all_scores_array = np.stack(all_scores, axis = 1).astype(np.float) # 4. Combine the scores: one_class_flag = (svm_machine.machine_type == 'ONE_CLASS') # True if one-class SVM is used if not(one_class_flag): scores = np.mean(all_scores_array, axis = 1) # compute mean for two-class SVM else: # one class SVM case scores = self.combine_scores_of_svm_cascade(all_scores_array, self.pos_scores_slope) return scores
#==========================================================================
[docs] def score(self, toscore): """ Returns a probability of a sample being a real class. **Parameters:** ``toscore`` : 1D or 2D :py:class:`numpy.ndarray` 2D in the case of two-class SVM. An array containing class probabilities for each frame. First column contains probabilities for each frame being a real class. Second column contains probabilities for each frame being an attack class. 1D in the case of one-class SVM. Vector with scores for each frame defining belonging to the real class. **Returns:** ``score`` : [:py:class:`float`] If ``frame_level_scores_flag = False`` a single score is returned. One score per video. This score is placed into a list, because the ``score`` must be an iterable. Score is a probability of a sample being a real class. If ``frame_level_scores_flag = True`` a list of scores is returned. One score per frame/sample. """ if self.frame_level_scores_flag: score = list(toscore) else: score = [np.mean( toscore )] # compute a single score per video return score