#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Laurent El Shafey <Laurent.El-Shafey@idiap.ch>
# @author: Pavel Korshunov <pavel.korshunov@idiap.ch>
# @date: Thu Nov 17 16:09:22 CET 2016
#
# Copyright (C) 2011-2013 Idiap Research Institute, Martigny, Switzerland
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""This module provides the Dataset interface allowing the user to query the
verification database based on file lists in the most obvious ways.
"""
import os
from .models import Client, ListReader
from bob.db.base import Database
class PadFileListDatabase(Database):
"""This class provides a user-friendly interface to databases that are given as file lists.
Keyword parameters:
base_dir : str
The directory that contains the filelists defining the protocol(s). If you use the protocol
attribute when querying the database, it will be appended to the base directory, such that
several protocols are supported by the same class instance of `bob.pad.db`.
original_directory : str or ``None``
The directory, where the original data can be found
original_extension : str or [str] or ``None``
The filename extension of the original data, or multiple extensions
annotation_directory : str or ``None``
The directory, where additional annotation files can be found
annotation_extension : str or ``None``
The filename extension of the annoation files
annotation_type : str or ``None``
The type of annotation that can be read.
Currently, annotations are not supported for PAD
See :py:func:`bob.db.base.read_annotation_file` for details.
train_subdir : str or ``None``
Specify a custom subdirectory for the filelists of the development set (default is 'train')
dev_subdir : str or ``None``
Specify a custom subdirectory for the filelists of the development set (default is 'dev')
eval_subdir : str or ``None``
Specify a custom subdirectory for the filelists of the development set (default is 'eval')
keep_read_lists_in_memory : bool
If set to true, the lists are read only once and stored in memory
"""
def __init__(
self,
base_dir,
original_directory=None,
original_extension=None,
# PAD annotations should be supported in the future
annotation_directory=None,
annotation_extension="",
annotation_type=None,
train_subdir=None,
dev_subdir=None,
eval_subdir=None,
real_filename=None, # File containing the real files
attack_filename=None, # File containing the real files
# if set to True (the RECOMMENDED default) lists are read only once and stored in memory.
keep_read_lists_in_memory=True
):
"""Initializes the database with the file lists from the given base directory,
and the given sub-directories and file names (which default to useful values if not given)."""
self.original_directory = original_directory
self.original_extension = original_extension
self.m_annotation_directory = annotation_directory
self.m_annotation_extension = annotation_extension
self.m_annotation_type = annotation_type
self.m_base_dir = os.path.abspath(base_dir)
if not os.path.isdir(self.m_base_dir):
raise RuntimeError('Invalid directory specified %s.' % (self.m_base_dir))
# sub-directories for train, dev, and eval sets:
self.m_dev_subdir = dev_subdir if dev_subdir is not None else 'dev'
self.m_eval_subdir = eval_subdir if eval_subdir is not None else 'eval'
self.m_train_subdir = train_subdir if train_subdir is not None else 'train'
# real list: format: filename client_id
self.m_real_filename = real_filename if real_filename is not None else 'for_real.lst'
# attack list: format: filename client_id attack_type
self.m_attack_filename = attack_filename if attack_filename is not None else 'for_attack.lst'
self.m_list_reader = ListReader(keep_read_lists_in_memory)
[docs] def groups(self, protocol=None):
"""This function returns the list of groups for this database.
protocol : str or ``None``
The protocol for which the groups should be retrieved.
Returns: a list of groups
"""
groups = []
if protocol is not None:
if os.path.isdir(os.path.join(self.get_base_directory(), protocol, self.m_dev_subdir)):
groups.append('dev')
if os.path.isdir(os.path.join(self.get_base_directory(), protocol, self.m_eval_subdir)):
groups.append('eval')
if os.path.isdir(os.path.join(self.get_base_directory(), protocol, self.m_train_subdir)):
groups.append('train')
else:
if os.path.isdir(os.path.join(self.get_base_directory(), self.m_dev_subdir)):
groups.append('dev')
if os.path.isdir(os.path.join(self.get_base_directory(), self.m_eval_subdir)):
groups.append('eval')
if os.path.isdir(os.path.join(self.get_base_directory(), self.m_train_subdir)):
groups.append('train')
return groups
[docs] def get_base_directory(self):
"""Returns the base directory where the filelists defining the database
are located."""
return self.m_base_dir
[docs] def set_base_directory(self, base_dir):
"""Resets the base directory where the filelists defining the database
are located."""
self.m_base_dir = base_dir
if not os.path.isdir(self.m_base_dir):
raise RuntimeError('Invalid directory specified %s.' % self.m_base_dir)
[docs] def get_list_file(self, group, type=None, protocol=None):
if protocol:
base_directory = os.path.join(self.get_base_directory(), protocol)
else:
base_directory = self.get_base_directory()
group_dir = self.m_dev_subdir if group == 'dev' else self.m_eval_subdir if group == 'eval' else self.m_train_subdir
list_name = {'for_real': self.m_real_filename,
'for_attack': self.m_attack_filename,
}[type]
return os.path.join(base_directory, group_dir, list_name)
[docs] def clients(self, protocol=None, groups=None):
"""Returns a list of :py:class:`Client` objects for the specific query by the user.
Keyword Parameters:
protocol : str or ``None``
The protocol to consider
groups : str or [str] or ``None``
The groups to which the clients belong ("dev", "eval", "train", "optional_train_1", "optional_train_2").
Returns: A list containing all the :py:class:`Client` objects which have the given properties.
"""
client_ids = self.client_ids(protocol, groups)
return [Client(id) for id in client_ids]
def __client_id_list__(self, groups, type, protocol=None):
ids = set()
# read all lists for all groups and extract the model ids
for group in groups:
files = self.m_list_reader.read_list(self.get_list_file(group, type, protocol), group, type)
for file in files:
ids.add(file.client_id)
return ids
[docs] def client_ids(self, protocol=None, groups=None):
"""Returns a list of client ids for the specific query by the user.
Keyword Parameters:
protocol : str or ``None``
The protocol to consider
groups : str or [str] or ``None``
The groups to which the clients belong ("dev", "eval", "train").
Returns: A list containing all the client ids which have the given properties.
"""
groups = self.check_parameters_for_validity(groups, "group",
('dev', 'eval', 'train'),
default_parameters=('dev', 'eval', 'train'))
return self.__client_id_list__(groups, 'for_real', protocol)
[docs] def objects(self, protocol=None, purposes=None, groups=None):
"""Returns a set of :py:class:`File` objects for the specific query by the user.
Keyword Parameters:
protocol : str or ``None``
The protocol to consider
purposes : str or [str] or ``None``
The purposes required to be retrieved ("real", "attack") or a tuple
with several of them. If 'None' is given (this is the default), it is
considered the same as a tuple with all possible values.
groups : str or [str] or ``None``
One of the groups ("dev", "eval", "train") or a tuple with several of them.
If 'None' is given (this is the default), it is considered the same as a
tuple with all possible values.
Returns: A list of :py:class:`File` objects considering all the filtering criteria.
"""
purposes = self.check_parameters_for_validity(purposes, "purpose", ('real', 'attack'))
groups = self.check_parameters_for_validity(groups, "group",
('dev', 'eval', 'train'),
default_parameters=('dev', 'eval', 'train'))
# first, collect all the lists that we want to process
lists = []
for group in ('train', 'dev', 'eval'):
if group in groups:
if 'real' in purposes:
lists.append(
self.m_list_reader.read_list(self.get_list_file(group, 'for_real', protocol=protocol),
group, 'for_real'))
if 'attack' in purposes:
lists.append(
self.m_list_reader.read_list(self.get_list_file(group, 'for_attack', protocol=protocol),
group, 'for_attack'))
# now, go through the lists and add add corresponding files
retval = []
# non-probe files; just filter by model id
for flist in lists:
for fileobj in flist:
retval.append(fileobj)
return retval
[docs] def annotations(self, file):
"""We do not have support for PAD annotations yet.
Return value
Does nothing
"""
pass
[docs] def original_file_name(self, file_obj, check_existence=True):
"""Returns the original file_obj name of the given file_obj.
This interface supports several original extensions, so that file_obj lists can contain different data types.
When multiple original extensions are specified, this function will check the existence of any of
these file_obj names, and return the first one that actually exists.
In this case, the ``check_existence`` flag is ignored.
**Keyword parameters**
file_obj : :py:class:`bob.pad.db.File`
The py:class:`File` object for which the file_obj name should be returned.
check_existence : bool
Should the existence of the original file_obj be checked?
(Ignored when multiple original extensions were specified in the contructor.)
**Returns**
str : The full path of the original data file_obj.
"""
if isinstance(self.original_extension, str):
# extract file_obj name
file_name = file_obj.make_path(self.original_directory, self.original_extension)
if check_existence and os.path.exists(file_name):
return file_name
# check all registered extensions
for extension in self.original_extension:
file_name = file_obj.make_path(self.original_directory, extension)
if check_existence and os.path.exists(file_name):
return file_name
# None of the extensions matched
raise IOError("File '%s' does not exist with any of the extensions '%s'" % (
file_obj.make_path(self.original_directory, None), self.original_extension))