Source code for bob.learn.pytorch.architectures.MCCNNv2

#!/usr/bin/env python
# encoding: utf-8

import torch
import torch.nn as nn
import torch.nn.functional as F

import os
import numpy as np

import pkg_resources
import bob.extension.download

import bob.io.base

from .utils import MaxFeatureMap
from .utils import group
from .utils import resblock

import logging

# Module-level logger registered under the "bob.learn.pytorch" name; the
# MCCNNv2 constructor sets its level and uses it for status messages.
logger = logging.getLogger("bob.learn.pytorch")


class MCCNNv2(nn.Module):
    """Multi-channel CNN (MCCNNv2) built from LightCNN layers.

    The difference from MCCNN is that it uses shared layers for layers which
    are not adapted. This avoids replicating shared layers.

    Attributes
    ----------
    num_channels: int
        The number of channels present in the input
    lcnn_layers: list
        The adaptable layers present in the base LightCNN model, in the order
        they are applied
    shared_layers: list
        Names of the LightCNN layers that are shared by all the channels
    domain_specific_layers: list
        Names of the LightCNN layers that are replicated for every channel
    layer_dict: :py:class:`torch.nn.ModuleDict`
        Pytorch class containing the modules as a dictionary. Keys are of the
        form ``ch_<channel>_<layer>``; shared layers always use channel 0.
    pool1, pool2, pool3, pool4: :py:class:`torch.nn.MaxPool2d`
        Pooling layers applied after conv1, group1, group2 and group4
    linear1fc, linear2fc: :py:class:`torch.nn.Linear`
        The newly added final fully connected layers ('ffc')

    """

    # Name of the pooling attribute applied right after the given layer.
    # There is deliberately no entry for "group3": the original network does
    # not pool between group3 and block4.
    _POOL_AFTER = {
        "conv1": "pool1",
        "group1": "pool2",
        "group2": "pool3",
        "group4": "pool4",
    }

    def __init__(
        self,
        block=resblock,
        layers=(1, 2, 3, 4),
        num_channels=4,
        adapted_layers="conv1-block1-group1-ffc",
        verbosity_level=2,
    ):
        """Init function

        Parameters
        ----------
        block: :py:class:`torch.nn.Module`
            The base block class replicated inside block1..block4
        layers: sequence of int
            Number of copies of ``block`` in block1, block2, block3 and block4
        num_channels: int
            The number of channels present in the input
        adapted_layers: str
            The layers to be adapted in training, they are to be separated by
            '-'. Example: 'conv1-block1-group1-ffc'; 'ffc' denotes final fully
            connected layers which are adapted in all the cases.
        verbosity_level: int
            Verbosity level. It is passed unchanged to ``logger.setLevel``, so
            it is interpreted as a numeric :py:mod:`logging` level.

        Raises
        ------
        ValueError
            If 'ffc' is missing from ``adapted_layers`` or if it contains a
            layer name that does not exist in the network.

        """
        super().__init__()

        self.num_channels = num_channels

        self.lcnn_layers = [
            "conv1",
            "block1",
            "group1",
            "block2",
            "group2",
            "block3",
            "group3",
            "block4",
            "group4",
            "fc",
        ]

        layers_present = self.lcnn_layers + ["ffc"]

        # select the layers in the network to adapt
        adapted = set(adapted_layers.split("-"))

        # Explicit exceptions instead of ``assert``: assertions are stripped
        # when Python runs with -O, which would let invalid names through.
        if "ffc" not in adapted:
            raise ValueError(
                "'ffc' must be part of adapted_layers, got {!r}".format(
                    adapted_layers
                )
            )
        unknown = adapted - set(layers_present)
        if unknown:
            raise ValueError(
                "Unknown layer name(s) in adapted_layers: {}".format(
                    sorted(unknown)
                )
            )

        # Keep the canonical network order rather than the iteration order of
        # a set, which is not stable between interpreter runs for strings.
        self.shared_layers = [
            name for name in self.lcnn_layers if name not in adapted
        ]
        self.domain_specific_layers = [
            name for name in self.lcnn_layers if name in adapted
        ]

        # NOTE(review): small integers such as the default 2 are below
        # logging.DEBUG; confirm this is the intended mapping.
        logger.setLevel(verbosity_level)

        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2, ceil_mode=True)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2, ceil_mode=True)
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2, ceil_mode=True)
        self.pool4 = nn.MaxPool2d(kernel_size=2, stride=2, ceil_mode=True)

        # newly added FC layers
        self.linear1fc = nn.Linear(256 * num_channels, 10)
        self.linear2fc = nn.Linear(10, 1)

        # Only the layers that end up in the ModuleDict are instantiated; the
        # factories are called once per registered key.
        factories = self._layer_factories(block, layers)
        module_dict = {}

        # domain specific layers need one copy per channel
        # (ch_0 is the anchor the other channels are initialised from)
        for channel in range(self.num_channels):
            for layer in self.domain_specific_layers:
                module_dict["ch_{}_".format(channel) + layer] = factories[layer]()

        # shared layers have ch_0_ prefix to make loading from the pretrained
        # model easier
        for layer in self.shared_layers:
            module_dict["ch_0_" + layer] = factories[layer]()

        self.layer_dict = nn.ModuleDict(module_dict)

        # check for pretrained model
        light_cnn_model_file = os.path.join(
            MCCNNv2.get_mccnnv2path(), "LightCNN_29Layers_checkpoint.pth.tar"
        )

        url = "http://www.idiap.ch/software/bob/data/bob/bob.learn.pytorch/master/LightCNN_29Layers_checkpoint.pth.tar"

        logger.info("Light_cnn_model_file path: %s", light_cnn_model_file)

        if not os.path.exists(light_cnn_model_file):
            bob.io.base.create_directories_safe(
                os.path.dirname(light_cnn_model_file)
            )
            logger.info("Downloading the LightCNN model")
            bob.extension.download.download_file(url, light_cnn_model_file)
            logger.info(
                "Downloaded LightCNN model to location: %s", light_cnn_model_file
            )

        # Loading the pretrained model for ch_0; strict=False because the
        # checkpoint keys and this module's keys are not expected to match
        # one-to-one (e.g. linear1fc / linear2fc are new here).
        self.load_state_dict(
            self.get_model_state_dict(light_cnn_model_file), strict=False
        )

        # copy over the weights to all other domain specific layers
        for layer in self.domain_specific_layers:
            anchor_state = self.layer_dict["ch_0_" + layer].state_dict()
            for channel in range(1, self.num_channels):  # except channel 0
                self.layer_dict["ch_{}_".format(channel) + layer].load_state_dict(
                    anchor_state
                )

    def _layer_factories(self, block, layers):
        """Return zero-argument constructors for every LightCNN layer

        Parameters
        ----------
        block: :py:class:`torch.nn.Module`
            The base block class replicated inside block1..block4
        layers: sequence of int
            Number of copies of ``block`` in block1, block2, block3 and block4

        Returns
        -------
        factories: dict
            Maps each name of ``lcnn_layers`` to a callable which builds a
            fresh, independent instance of that layer.

        """
        return {
            "conv1": lambda: MaxFeatureMap(1, 48, 5, 1, 2),
            "block1": lambda: self._make_layer(block, layers[0], 48, 48),
            "group1": lambda: group(48, 96, 3, 1, 1),
            "block2": lambda: self._make_layer(block, layers[1], 96, 96),
            "group2": lambda: group(96, 192, 3, 1, 1),
            "block3": lambda: self._make_layer(block, layers[2], 192, 192),
            "group3": lambda: group(192, 128, 3, 1, 1),
            "block4": lambda: self._make_layer(block, layers[3], 128, 128),
            "group4": lambda: group(128, 128, 3, 1, 1),
            "fc": lambda: MaxFeatureMap(8 * 8 * 128, 256, type=0),
        }

    def _layer_key(self, channel, layer):
        """Return the ``layer_dict`` key serving ``layer`` for ``channel``

        Domain specific layers have one module per channel; every other layer
        is shared and registered under channel 0.

        Parameters
        ----------
        channel: int
            Index of the input channel being processed
        layer: str
            Name of the layer, one of ``lcnn_layers``

        Returns
        -------
        key: str
            Key of the form ``ch_<channel>_<layer>``

        """
        owner = channel if layer in self.domain_specific_layers else 0
        return "ch_{}_".format(owner) + layer

    def _make_layer(self, block, num_blocks, in_channels, out_channels):
        """makes multiple copies of the same base module

        Parameters
        ----------
        block: :py:class:`torch.nn.Module`
            The base block to replicate
        num_blocks: int
            Number of copies of the block to be made
        in_channels: int
            Number of input channels for a block
        out_channels: int
            Number of output channels for a block

        Returns
        -------
        layer: :py:class:`torch.nn.Sequential`
            The ``num_blocks`` blocks chained one after the other

        """
        return nn.Sequential(
            *[block(in_channels, out_channels) for _ in range(num_blocks)]
        )

    def forward(self, img):
        """Propagate data through the network

        Parameters
        ----------
        img: :py:class:`torch.Tensor`
            The data to forward through the network. Image of size
            num_channelsx128x128

        Returns
        -------
        output: :py:class:`torch.Tensor`
            score

        """
        *feature_layers, fc_layer = self.lcnn_layers

        embeddings = []
        for channel in range(self.num_channels):
            # the image for the specific channel, with a singleton channel axis
            x = img[:, channel, :, :].unsqueeze(1)

            for layer in feature_layers:
                x = self.layer_dict[self._layer_key(channel, layer)](x)
                pool_name = self._POOL_AFTER.get(layer)
                if pool_name is not None:
                    x = getattr(self, pool_name)(x)

            # flatten everything but the batch axis before the fc layer
            x = x.view(x.size(0), -1)
            fc = self.layer_dict[self._layer_key(channel, fc_layer)](x)
            fc = F.dropout(fc, training=self.training)
            embeddings.append(fc)

        # concatenate the per-channel embeddings along the feature axis
        merged = torch.cat(embeddings, 1)

        output = torch.sigmoid(self.linear1fc(merged))
        output = torch.sigmoid(self.linear2fc(output))
        return output

    @staticmethod
    def get_mccnnv2path():
        """Return the path of the ``models`` resource of ``bob.learn.pytorch``

        Returns
        -------
        path: str
            Path given by ``pkg_resources.resource_filename``

        """
        return pkg_resources.resource_filename("bob.learn.pytorch", "models")

    def get_model_state_dict(self, pretrained_model_path):
        """Load the pretrained LightCNN weights, renamed for this network

        Every key of the checkpoint's ``state_dict`` gets its leading
        ``module.`` removed (when present) and is prefixed with
        ``layer_dict.ch_0_`` so that it addresses the channel 0 modules of
        ``layer_dict``.

        Parameters
        ----------
        pretrained_model_path: str
            Absolute path to the LightCNN model file

        Returns
        -------
        new_state_dict: :py:class:`collections.OrderedDict`
            Dictionary with LightCNN weights

        """
        from collections import OrderedDict

        # NOTE(security): torch.load unpickles the file; only use checkpoints
        # obtained from a trusted source.
        checkpoint = torch.load(
            pretrained_model_path, map_location=lambda storage, loc: storage
        )

        wrapper_prefix = "module."
        new_state_dict = OrderedDict()
        for key, value in checkpoint["state_dict"].items():
            # strip the prefix only when it is really there, instead of
            # blindly dropping the first seven characters
            if key.startswith(wrapper_prefix):
                key = key[len(wrapper_prefix):]
            new_state_dict["layer_dict.ch_0_" + key] = value

        return new_state_dict