Source code for bob.ip.tensorflow_extractor.FaceNet

from __future__ import division
import os
import re
import logging
import numpy
import tensorflow as tf
from bob.ip.color import gray_to_rgb
from bob.io.image import to_matplotlib
from bob.extension import rc
import bob.extension.download
import bob.io.base
import multiprocessing

logger = logging.getLogger(__name__)

FACENET_MODELPATH_KEY = "bob.ip.tensorflow_extractor.facenet_modelpath"


def prewhiten(img):
    mean = numpy.mean(img)
    std = numpy.std(img)
    std_adj = numpy.maximum(std, 1.0 / numpy.sqrt(img.size))
    y = numpy.multiply(numpy.subtract(img, mean), 1 / std_adj)
    return y
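
# Quick sanity check of ``prewhiten`` (added as an illustration, not part of
# the original module): the output is standardised to roughly zero mean and
# unit variance, which is the input normalisation the FaceNet graph expects::
#
#     >>> x = numpy.random.randint(0, 256, (160, 160, 3)).astype("float64")
#     >>> y = prewhiten(x)
#     >>> bool(abs(y.mean()) < 1e-6 and abs(y.std() - 1.0) < 1e-6)
#     True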


def get_model_filenames(model_dir):
    # code from https://github.com/davidsandberg/facenet
    files = os.listdir(model_dir)
    meta_files = [s for s in files if s.endswith(".meta")]
    if len(meta_files) == 0:
        raise ValueError("No meta file found in the model directory (%s)" % model_dir)
    elif len(meta_files) > 1:
        raise ValueError(
            "There should not be more than one meta file in the model "
            "directory (%s)" % model_dir
        )
    meta_file = meta_files[0]
    max_step = -1
    for f in files:
        step_str = re.match(r"(^model-[\w\- ]+.ckpt-(\d+))", f)
        if step_str is not None and len(step_str.groups()) >= 2:
            step = int(step_str.groups()[1])
            if step > max_step:
                max_step = step
                ckpt_file = step_str.groups()[0]
    return meta_file, ckpt_file
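
# Illustrative example (added; the file names below describe the assumed layout
# of a davidsandberg FaceNet export and are not shipped with this package). For
# a model directory containing::
#
#     model-20170512-110547.meta
#     model-20170512-110547.ckpt-250000.index
#     model-20170512-110547.ckpt-250000.data-00000-of-00001
#
# ``get_model_filenames`` returns ``("model-20170512-110547.meta",
# "model-20170512-110547.ckpt-250000")``: the meta-graph file and the
# checkpoint prefix with the highest step number.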

_semaphore = multiprocessing.Semaphore()


class FaceNet(object):
    """Wrapper for the free FaceNet variant:
    https://github.com/davidsandberg/facenet

    To use this class as a bob.bio.base extractor::

        from bob.bio.base.extractor import Extractor
        class FaceNetExtractor(FaceNet, Extractor):
            pass
        extractor = FaceNetExtractor()

    And for a preprocessor you can use::

        from bob.bio.face.preprocessor import FaceCrop
        # This is the size of the image that this model expects
        CROPPED_IMAGE_HEIGHT = 160
        CROPPED_IMAGE_WIDTH = 160
        # eye positions for frontal images
        RIGHT_EYE_POS = (46, 53)
        LEFT_EYE_POS = (46, 107)
        # Crops the face using eye annotations
        preprocessor = FaceCrop(
            cropped_image_size=(CROPPED_IMAGE_HEIGHT, CROPPED_IMAGE_WIDTH),
            cropped_positions={'leye': LEFT_EYE_POS, 'reye': RIGHT_EYE_POS},
            color_channel='rgb'
        )

    """

    def __init__(
        self,
        model_path=rc[FACENET_MODELPATH_KEY],
        image_size=160,
        layer_name="embeddings:0",
        **kwargs
    ):
        super(FaceNet, self).__init__()
        self.model_path = model_path
        self.image_size = image_size
        self.layer_name = layer_name
        self._clean_unpicklables()

    def _clean_unpicklables(self):
        self.session = None
        self.embeddings = None
        self.graph = None
        self.images_placeholder = None
        self.phase_train_placeholder = None

    def _check_feature(self, img):
        img = numpy.ascontiguousarray(img)
        if img.ndim == 2:
            img = gray_to_rgb(img)
        assert img.shape[-1] == self.image_size
        assert img.shape[-2] == self.image_size
        img = to_matplotlib(img)
        img = prewhiten(img)
        return img[None, ...]
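
    # Note (added for clarity, not in the original module): ``_check_feature``
    # expects a Bob-style image, i.e. a gray-scale array of shape
    # ``(image_size, image_size)`` or an RGB array of shape
    # ``(3, image_size, image_size)``; ``to_matplotlib`` reorders it to
    # ``(image_size, image_size, 3)`` before prewhitening and adding the batch
    # dimension.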

    def load_model(self):
        if self.model_path is None:
            self.model_path = self.get_modelpath()
        if not os.path.exists(self.model_path):
            bob.io.base.create_directories_safe(FaceNet.get_modelpath())
            zip_file = os.path.join(
                FaceNet.get_modelpath(), "20170512-110547.zip"
            )
            urls = [
                # This link only works in Idiap CI to save bandwidth.
                "http://www.idiap.ch/private/wheels/gitlab/"
                "facenet_model2_20170512-110547.zip",
                # this link to dropbox would work for everybody
                "https://www.dropbox.com/s/"
                "k7bhxe58q7d48g7/facenet_model2_20170512-110547.zip?dl=1",
            ]
            bob.extension.download.download_and_unzip(urls, zip_file)

        # code from https://github.com/davidsandberg/facenet
        model_exp = os.path.expanduser(self.model_path)
        with self.graph.as_default():
            if os.path.isfile(model_exp):
                logger.info("Model filename: %s" % model_exp)
                with tf.compat.v1.gfile.FastGFile(model_exp, "rb") as f:
                    graph_def = tf.compat.v1.GraphDef()
                    graph_def.ParseFromString(f.read())
                    tf.import_graph_def(graph_def, name="")
            else:
                logger.info("Model directory: %s" % model_exp)
                meta_file, ckpt_file = get_model_filenames(model_exp)
                logger.info("Metagraph file: %s" % meta_file)
                logger.info("Checkpoint file: %s" % ckpt_file)
                saver = tf.compat.v1.train.import_meta_graph(
                    os.path.join(model_exp, meta_file)
                )
                saver.restore(self.session, os.path.join(model_exp, ckpt_file))

        # Get input and output tensors
        self.images_placeholder = self.graph.get_tensor_by_name("input:0")
        self.embeddings = self.graph.get_tensor_by_name(self.layer_name)
        self.phase_train_placeholder = self.graph.get_tensor_by_name(
            "phase_train:0"
        )
        logger.info("Successfully loaded the model.")

    def __call__(self, img):
        with _semaphore:
            images = self._check_feature(img)
            if self.session is None:
                self.graph = tf.Graph()
                self.session = tf.compat.v1.Session(graph=self.graph)
            if self.embeddings is None:
                self.load_model()
            feed_dict = {
                self.images_placeholder: images,
                self.phase_train_placeholder: False,
            }
            features = self.session.run(self.embeddings, feed_dict=feed_dict)
            return features.flatten()

    @staticmethod
    def get_modelpath():
        """
        Get the default model path.

        First we try to search for this path via the Global Configuration
        System. If it cannot be found, we set the path to the directory
        ``<project>/data``.
        """
        # Priority to the RC path
        model_path = rc["bob.ip.tensorflow_extractor.facenet_modelpath"]
        if model_path is None:
            import pkg_resources

            model_path = pkg_resources.resource_filename(
                __name__, "data/FaceNet/20170512-110547"
            )
        return model_path
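
    # Illustrative note (added; the path below is a placeholder): the model
    # location can be overridden through the Global Configuration System,
    # assuming the ``bob config`` command line of bob.extension is available::
    #
    #     $ bob config set bob.ip.tensorflow_extractor.facenet_modelpath \
    #           /path/to/20170512-110547
    #
    # or per instance, by passing the path explicitly::
    #
    #     extractor = FaceNet(model_path="/path/to/20170512-110547")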

    def __setstate__(self, d):
        # Handling unpicklable objects
        self.__dict__ = d

    def __getstate__(self):
        # Handling unpicklable objects
        with _semaphore:
            self._clean_unpicklables()
            return self.__dict__
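
# Illustrative end-to-end usage (added; the random input below and the
# 128-dimensional output size of the default 20170512-110547 model are
# assumptions for demonstration only)::
#
#     extractor = FaceNet()
#     face = numpy.random.randint(0, 256, (3, 160, 160), dtype="uint8")
#     # the model is loaded (and downloaded, if missing) on the first call
#     embedding = extractor(face)   # 1D numpy array, e.g. 128 values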