import bob.io.base
import bob.learn.em
import bob.learn.linear
import numpy
import os, sys
import tarfile
import inspect
import logging
logger = logging.getLogger("bob.bio.base")
from .FileSelector import FileSelector
from .. import utils
from .. import score
def _scores(algorithm, reader, model, probe_objects, allow_missing_files):
    """Score a single model against every probe in ``probe_objects``.

    The probe paths are looked up through the :py:class:`FileSelector`
    (``'projected'`` features if the algorithm performs projection,
    ``'extracted'`` features otherwise) and read with ``reader.read_feature``.

    Returns a ``numpy.float64`` array with one row and one column per probe
    path. Columns start out as ``NaN`` and stay ``NaN`` for every probe that is
    skipped because ``allow_missing_files`` is set and the model or the probe
    file(s) are missing.
    """
    selector = FileSelector.instance()

    # resolve the probe paths in the directory matching the algorithm type
    directory_type = 'projected' if algorithm.performs_projection else 'extracted'
    probe_paths = selector.get_paths(probe_objects, directory_type)

    # one row, one column per probe; NaN marks "not computed"
    result = numpy.full((1, len(probe_paths)), numpy.nan, numpy.float64)

    # without a model there is nothing to score
    if allow_missing_files and model is None:
        return result

    # only pass the probe object as metadata if ``algorithm.score`` accepts it
    pass_metadata = utils.is_argument_available("metadata", algorithm.score)

    for column, (probe_entry, probe_metadata) in enumerate(zip(probe_paths, probe_objects)):
        if selector.uses_probe_file_sets():
            # a probe set: several files are scored jointly
            assert isinstance(probe_entry, list)
            if allow_missing_files:
                # NOTE(review): the second argument is the full probe object
                # list -- confirm this is what utils.filter_missing_files expects
                probe_entry = utils.filter_missing_files(probe_entry, probe_objects)
                if not probe_entry:
                    # nothing left to score; the column keeps its NaN
                    continue
            features = [reader.read_feature(path) for path in probe_entry]
            result[0, column] = algorithm.score_for_multiple_probes(model, features)
            continue

        # a single probe file
        if allow_missing_files and not os.path.exists(probe_entry):
            # the column keeps its NaN
            continue
        feature = reader.read_feature(probe_entry)
        if pass_metadata:
            result[0, column] = algorithm.score(model, feature, metadata=probe_metadata)
        else:
            result[0, column] = algorithm.score(model, feature)

    return result
def _open_to_read(score_file):
    """Open ``score_file`` or, if that does not exist, ``score_file + '.tar.bz2'``.

    The first of the two paths that exists is handed to
    :py:func:`bob.bio.base.score.open_file`. An ``IOError`` naming the
    ``.tar.bz2`` path is raised when neither exists.
    """
    candidates = (score_file, score_file + '.tar.bz2')
    for candidate in candidates:
        if os.path.exists(candidate):
            return score.open_file(candidate)
    raise IOError("The score file '%s' cannot be found. Aborting!" % candidates[-1])
def _open_to_write(score_file, write_compressed):
    """Return a writable object for ``score_file``.

    The directory of ``score_file`` is created first. Without compression, the
    file itself is opened in text mode. With ``write_compressed``, nothing is
    opened on disk: an in-memory buffer is returned instead (``StringIO`` on
    Python 2, ``io.BytesIO`` on Python 3), which :py:func:`_close_written`
    later packs into an archive.
    """
    bob.io.base.create_directories_safe(os.path.dirname(score_file))

    if not write_compressed:
        return open(score_file, 'w')

    # The original re-assigned the local ``score_file`` with a '.tar.bz2'
    # suffix here; that value was never used, so it has been dropped.
    if sys.version_info[0] <= 2:
        import StringIO
        return StringIO.StringIO()
    import io
    return io.BytesIO()
def _write(f, data, write_compressed):
    """Write the string ``data`` to ``f``.

    For compressed output on Python 3 the target is a byte buffer, so the
    string is encoded to bytes first; in all other cases it is written as is.
    """
    needs_bytes = write_compressed and sys.version_info[0] > 2
    f.write(str.encode(data) if needs_bytes else data)
def _close_written(score_file, f, write_compressed):
    """Closes the file ``f`` that was opened with :py:func:`_open_to_write`.

    With ``write_compressed``, ``f`` is an in-memory buffer. Its content is
    stored as a single member (named after the base name of ``score_file``) of
    the archive ``score_file + '.tar.bz2'``, which is the name that
    :py:func:`_open_to_read` and :py:func:`_delete` look for. In all cases
    ``f`` is closed afterwards.
    """
    if write_compressed:
        f.seek(0)
        # the member keeps the plain file name, without the archive suffix
        tarinfo = tarfile.TarInfo(os.path.basename(score_file))
        tarinfo.size = len(f.buf if sys.version_info[0] <= 2 else f.getbuffer())
        # NOTE(review): mode 'w' writes a plain tar although the file is named
        # '.tar.bz2'; kept as is so the on-disk format does not change.
        tar = tarfile.open(score_file + '.tar.bz2', 'w')
        try:
            tar.addfile(tarinfo, f)
        finally:
            tar.close()
    # close the file
    f.close()
def _delete(score_file, write_compressed):
    """Remove the score file if it exists as a regular file.

    With ``write_compressed`` the path ``score_file + '.tar.bz2'`` is removed
    instead of ``score_file``. A missing file is not an error.
    """
    target = score_file + '.tar.bz2' if write_compressed else score_file
    if os.path.isfile(target):
        os.remove(target)
def _save_scores(score_file, scores, probe_objects, client_id, write_compressed):
    """Write the scores of one model as text, one line per probe.

    Each line has the four columns ``<client_id> <probe client_id> <probe path>
    <score>``, the format read by
    :py:func:`bob.bio.base.score.split_four_column`. ``scores`` must have one
    column per entry of ``probe_objects``; only its first row is written.
    """
    assert len(probe_objects) == scores.shape[1]

    handle = _open_to_write(score_file, write_compressed)

    # four-column format: model client, probe client, probe path, score
    for column, probe in enumerate(probe_objects):
        line = "%s %s %s %3.8f\n" % (str(client_id), str(probe.client_id), str(probe.path), scores[0, column])
        _write(handle, line, write_compressed)

    _close_written(score_file, handle, write_compressed)
def _scores_a(algorithm, reader, model_ids, group, compute_zt_norm, force, write_compressed, allow_missing_files):
    """Computes A scores for the models with the given model_ids.

    If ``compute_zt_norm = False``, these are the only scores that are actually
    computed. For each model, the scores against the probes of that model are
    written as text to the "no norm" file. When ``compute_zt_norm`` is set,
    the raw score matrix is additionally saved to the A file.

    A model is skipped with a warning when ``utils.check_file`` reports its
    score file (the A file with ZT-norm, the "no norm" file otherwise) as
    already present. With ``allow_missing_files``, a missing model file is
    scored with ``model = None``.
    """
    # the file selector object
    fs = FileSelector.instance()

    if compute_zt_norm:
        logger.info("- Scoring: computing score matrix A for group '%s'", group)
    else:
        logger.info("- Scoring: computing scores for group '%s'", group)

    # Computes the raw scores for each model
    for pos, model_id in enumerate(model_ids):
        # test if the file is already there
        score_file = fs.a_file(model_id, group) if compute_zt_norm else fs.no_norm_file(model_id, group)
        logger.debug("... Scoring model '%s' at '%s' (%d/%d)", model_id, score_file,
                     pos+1, len(model_ids))
        if utils.check_file(score_file, force):
            # Logger.warn is a deprecated alias of Logger.warning
            logger.warning("Score file '%s' already exists.", score_file)
            continue

        # get probe files that are required for this model
        current_probe_objects = fs.probe_objects_for_model(model_id, group)
        model_file = fs.model_file(model_id, group)
        if allow_missing_files and not os.path.exists(model_file):
            model = None
        else:
            model = algorithm.read_model(model_file)

        # compute scores
        a = _scores(algorithm, reader, model, current_probe_objects, allow_missing_files)

        if compute_zt_norm:
            # write A matrix only when you want to compute zt norm afterwards
            bob.io.base.save(a, fs.a_file(model_id, group), True)

        # Save scores to text file
        _save_scores(fs.no_norm_file(model_id, group), a, current_probe_objects, fs.client_id(model_id, group), write_compressed)
def _scores_b(algorithm, reader, model_ids, group, force, allow_missing_files):
    """Computes B scores for the given model ids.

    Each model is scored against the Z-probe objects of ``group`` and the
    resulting matrix is saved to the model's B file. A model is skipped with a
    warning when ``utils.check_file`` reports that file as already present.
    With ``allow_missing_files``, a missing model file is scored with
    ``model = None``.
    """
    # the file selector object
    fs = FileSelector.instance()

    # probe files:
    z_probe_objects = fs.z_probe_objects(group)

    logger.info("- Scoring: computing score matrix B for group '%s'", group)

    # Loads the models
    for pos, model_id in enumerate(model_ids):
        # test if the file is already there
        score_file = fs.b_file(model_id, group)
        logger.debug("... Scoring model '%s' at '%s' (%d/%d)", model_id,
                     score_file, pos+1, len(model_ids))
        if utils.check_file(score_file, force):
            # Logger.warn is a deprecated alias of Logger.warning
            logger.warning("Score file '%s' already exists.", score_file)
            continue

        model_file = fs.model_file(model_id, group)
        if allow_missing_files and not os.path.exists(model_file):
            model = None
        else:
            model = algorithm.read_model(model_file)

        b = _scores(algorithm, reader, model, z_probe_objects, allow_missing_files)
        bob.io.base.save(b, score_file, True)
def _scores_c(algorithm, reader, t_model_ids, group, force, allow_missing_files):
    """Computes C scores for the given t-norm model ids.

    Each T-norm model is scored against all probe objects of ``group`` and the
    resulting matrix is saved to the model's C file. A model is skipped with a
    warning when ``utils.check_file`` reports that file as already present.
    With ``allow_missing_files``, a missing T-norm model file is scored with
    ``model = None``.
    """
    # the file selector object
    fs = FileSelector.instance()

    # probe files:
    probe_objects = fs.probe_objects(group)

    logger.info("- Scoring: computing score matrix C for group '%s'", group)

    # Computes the raw scores for the T-Norm model
    for pos, t_model_id in enumerate(t_model_ids):
        # test if the file is already there
        score_file = fs.c_file(t_model_id, group)
        logger.debug("... Scoring model '%s' at '%s' (%d/%d)", t_model_id,
                     score_file, pos+1, len(t_model_ids))
        if utils.check_file(score_file, force):
            # Logger.warn is a deprecated alias of Logger.warning
            logger.warning("Score file '%s' already exists.", score_file)
            continue

        t_model_file = fs.t_model_file(t_model_id, group)
        if allow_missing_files and not os.path.exists(t_model_file):
            t_model = None
        else:
            t_model = algorithm.read_model(t_model_file)

        c = _scores(algorithm, reader, t_model, probe_objects, allow_missing_files)
        bob.io.base.save(c, score_file, True)
def _scores_d(algorithm, reader, t_model_ids, group, force, allow_missing_files):
    """Computes D scores for the given t-norm model ids.

    Both the D matrix and the D-samevalue matrix are written. Each T-norm model
    is scored against the Z-probe objects of ``group``; the same-value matrix
    is obtained from ``bob.learn.em.ztnorm_same_value`` using the client id of
    the T-norm model and the client ids of the Z-probes. A model is skipped
    with a warning only when ``utils.check_file`` reports *both* of its output
    files as already present. With ``allow_missing_files``, a missing T-norm
    model file is scored with ``model = None``.
    """
    # the file selector object
    fs = FileSelector.instance()

    # probe files:
    z_probe_objects = fs.z_probe_objects(group)

    logger.info("- Scoring: computing score matrix D for group '%s'", group)

    # Gets the Z-Norm impostor samples
    z_probe_ids = [z_probe_object.client_id for z_probe_object in z_probe_objects]

    # Loads the T-Norm models
    for pos, t_model_id in enumerate(t_model_ids):
        # test if the file is already there
        score_file = fs.d_file(t_model_id, group)
        logger.debug("... Scoring model '%s' at '%s' (%d/%d)", t_model_id,
                     score_file, pos+1, len(t_model_ids))
        same_score_file = fs.d_same_value_file(t_model_id, group)
        if utils.check_file(score_file, force) and utils.check_file(same_score_file, force):
            # Logger.warn is a deprecated alias of Logger.warning
            logger.warning("score files '%s' and '%s' already exist.", score_file, same_score_file)
            continue

        t_model_file = fs.t_model_file(t_model_id, group)
        if allow_missing_files and not os.path.exists(t_model_file):
            t_model = None
        else:
            t_model = algorithm.read_model(t_model_file)

        d = _scores(algorithm, reader, t_model, z_probe_objects, allow_missing_files)
        bob.io.base.save(d, score_file, True)

        t_client_id = [fs.client_id(t_model_id, group, True)]
        d_same_value_tm = bob.learn.em.ztnorm_same_value(t_client_id, z_probe_ids)
        bob.io.base.save(d_same_value_tm, same_score_file, True)
def compute_scores(algorithm, extractor, compute_zt_norm, indices = None, groups = ['dev', 'eval'], types = ['A', 'B', 'C', 'D'], write_compressed = False, allow_missing_files = False, force = False):
    """Computes the scores for the given groups.

    All scores of the experiment are computed and written to files, one per
    model. With ``compute_zt_norm`` enabled, up to four score matrices are
    produced: A (normal scores), B (Z-norm scores), C (T-norm scores) and
    D (ZT-norm scores, together with the ZT-samevalue matrix).
    By default, both groups ``'dev'`` and ``'eval'`` are processed.

    **Parameters:**

    algorithm : py:class:`bob.bio.base.algorithm.Algorithm` or derived
      The algorithm whose projector (if it performs projection) and enroller
      are loaded, and which computes the scores.

    extractor : py:class:`bob.bio.base.extractor.Extractor` or derived
      Only used (and loaded) to read the features when the algorithm does not
      perform projection.

    compute_zt_norm : bool
      If set to ``True``, also the ZT-norm score matrices are computed.

    indices : (int, int) or None
      If given, only the models (and T-norm models) in the index range
      ``range(begin, end)`` are scored. Usually set when several jobs run in
      parallel.

      .. note:: The probe files are not limited by the ``indices``.

    groups : some of ``('dev', 'eval')``
      The groups for which scores are computed.

    types : some of ``['A', 'B', 'C', 'D']``
      The score types to compute.
      If ``compute_zt_norm = False``, only the ``'A'`` scores are computed.

    write_compressed : bool
      If enabled, score files are compressed as ``.tar.bz2`` files.

    allow_missing_files : bool
      If set to ``True``, model and probe files that are not found produce
      ``NaN`` scores.

    force : bool
      If given, score files are regenerated even if they already exist.
    """
    selector = FileSelector.instance()

    # load the projector (only if needed) and the enroller
    if algorithm.performs_projection:
        algorithm.load_projector(selector.projector_file)
    algorithm.load_enroller(selector.enroller_file)

    # decide which tool reads the probe features
    if not algorithm.performs_projection:
        reader = extractor
        # the extractor has to be loaded before it can read features
        extractor.load(selector.extractor_file)
    else:
        reader = algorithm

    for group in groups:
        # models of this group, optionally restricted to the index range
        model_ids = selector.model_ids(group)
        if indices is not None:
            model_ids = model_ids[indices[0]:indices[1]]
            logger.info("- Scoring: splitting of index range %s", str(indices))

        # T-norm models are only needed for ZT-norm
        if compute_zt_norm:
            t_model_ids = selector.t_model_ids(group)
            if indices is not None:
                t_model_ids = t_model_ids[indices[0]:indices[1]]

        # A scores are computed with and without ZT-norm
        if 'A' in types:
            _scores_a(algorithm, reader, model_ids, group, compute_zt_norm, force, write_compressed, allow_missing_files)

        # B, C and D only exist for ZT-norm
        if not compute_zt_norm:
            continue
        if 'B' in types:
            _scores_b(algorithm, reader, model_ids, group, force, allow_missing_files)
        if 'C' in types:
            _scores_c(algorithm, reader, t_model_ids, group, force, allow_missing_files)
        if 'D' in types:
            _scores_d(algorithm, reader, t_model_ids, group, force, allow_missing_files)
def _c_matrix_split_for_model(selected_probe_objects, all_probe_objects, all_c_scores):
    """Pick the columns of ``all_c_scores`` that belong to the selected probes.

    Column ``j`` of ``all_c_scores`` belongs to ``all_probe_objects[j]``.
    Probes are matched by their ``id`` in a single forward pass, so
    ``selected_probe_objects`` has to appear in the same relative order as in
    ``all_probe_objects``; an ``AssertionError`` is raised if not every selected
    probe was found. Returns a ``numpy.float64`` array with all rows of
    ``all_c_scores`` and one column per selected probe.
    """
    wanted = len(selected_probe_objects)
    subset = numpy.empty((all_c_scores.shape[0], wanted), numpy.float64)

    cursor = 0  # next selected probe that still has to be found
    for column, candidate in enumerate(all_probe_objects):
        if cursor >= wanted:
            # every selected probe has been matched already
            break
        if selected_probe_objects[cursor].id == candidate.id:
            subset[:, cursor] = all_c_scores[:, column]
            cursor += 1

    assert cursor == wanted
    return subset
def _scores_c_normalize(model_ids, t_model_ids, group):
    """Build the per-model C matrices from the per-T-model C files.

    The C files of all ``t_model_ids`` are loaded and stacked row-wise. For
    each model, the columns belonging to that model's probes are selected and
    saved to the model's own C file.
    """
    selector = FileSelector.instance()

    # stack the C scores of all T-norm models on top of each other
    stacked_c = None
    for t_model_id in t_model_ids:
        rows = bob.io.base.load(selector.c_file(t_model_id, group))
        stacked_c = rows if stacked_c is None else numpy.vstack((stacked_c, rows))

    # cut out the probe columns that each model actually uses
    every_probe = selector.probe_objects(group)
    for model_id in model_ids:
        own_probes = selector.probe_objects_for_model(model_id, group)
        subset = _c_matrix_split_for_model(own_probes, every_probe, stacked_c)
        bob.io.base.save(subset, selector.c_file_for_model(model_id, group))
def _scores_d_normalize(t_model_ids, group):
    """Merge the per-T-model D and D-samevalue files into two group matrices.

    The D file and the D-samevalue file of every T-norm model are loaded and
    stacked row-wise; the two stacked matrices are saved to the group's D
    matrix file and D-samevalue matrix file.
    """
    selector = FileSelector.instance()

    stacked_d = None
    stacked_same = None
    for t_model_id in t_model_ids:
        d_rows = bob.io.base.load(selector.d_file(t_model_id, group))
        same_rows = bob.io.base.load(selector.d_same_value_file(t_model_id, group))
        if stacked_d is None and stacked_same is None:
            # the first T-norm model starts both matrices
            stacked_d, stacked_same = d_rows, same_rows
        else:
            stacked_d = numpy.vstack((stacked_d, d_rows))
            stacked_same = numpy.vstack((stacked_same, same_rows))

    # write both group-level matrices
    bob.io.base.save(stacked_d, selector.d_matrix_file(group))
    bob.io.base.save(stacked_same, selector.d_same_value_matrix_file(group))
def zt_norm(groups = ['dev', 'eval'], write_compressed = False, allow_missing_files = False):
    """Computes ZT-Norm using the previously generated A, B, C, D and D-samevalue matrix files.

    For every requested group, the C and D files are first merged into the
    per-model C matrices and the group-wide D matrices. Then, for each model,
    the A, B and C matrices are loaded, ``bob.learn.em.ztnorm`` is applied, and
    the normalized scores are written to the ZT-norm file given by the
    :py:class:`bob.bio.base.tools.FileSelector`.
    The A, B, C, D and D-samevalue files must have been computed beforehand.

    **Parameters:**

    groups : some of ``('dev', 'eval')``
      The groups for which ZT-norm is applied.

    write_compressed : bool
      If enabled, score files are compressed as ``.tar.bz2`` files.

    allow_missing_files : bool
      Currently only provided for completeness: ``NaN`` scores are not yet
      handled correctly. When set, an error is logged (once per group) if any
      of the matrices contains ``NaN``.
    """
    selector = FileSelector.instance()

    for group in groups:
        logger.info("- Scoring: computing ZT-norm for group '%s'", group)

        model_ids = selector.model_ids(group)
        t_model_ids = selector.t_model_ids(group)

        # bring the C and D scores into per-model / per-group form
        _scores_c_normalize(model_ids, t_model_ids, group)
        _scores_d_normalize(t_model_ids, group)

        # the D matrices are shared by all models of the group
        d = bob.io.base.load(selector.d_matrix_file(group))
        d_same_value = bob.io.base.load(selector.d_same_value_matrix_file(group)).astype(bool)

        nan_reported = False
        for model_id in model_ids:
            # the probes are needed to write the score file
            probe_objects = selector.probe_objects_for_model(model_id, group)

            a = bob.io.base.load(selector.a_file(model_id, group))
            b = bob.io.base.load(selector.b_file(model_id, group))
            c = bob.io.base.load(selector.c_file_for_model(model_id, group))

            # TODO: handle NaN scores, i.e., when allow_missing_files is enabled
            if allow_missing_files and not nan_reported:
                if any(numpy.any(numpy.isnan(matrix)) for matrix in (a, b, c, d, d_same_value)):
                    logger.error("There are NaN scores inside one of the score files for group %s; ZT-Norm will not work", group)
                    nan_reported = True

            zt_scores = bob.learn.em.ztnorm(a, b, c, d, d_same_value)

            # write the normalized scores as text
            _save_scores(selector.zt_norm_file(model_id, group), zt_scores, probe_objects, selector.client_id(model_id, group), write_compressed)
def _concat(score_files, output, write_compressed, model_ids):
    """Concatenates a list of score files into a single score file.

    Without ``model_ids`` the content of each score file is appended verbatim.
    With ``model_ids`` (one per score file), every line is split on whitespace
    and the model id is inserted as second column before the line is written.

    If anything fails while concatenating, the error is logged, the partially
    written output is closed and deleted, and the exception is re-raised.
    """
    # Open the output before entering the ``try`` block: if opening fails there
    # is nothing to clean up, and the handler below may rely on ``f``.
    f = _open_to_write(output, write_compressed)

    def emit(data):
        # A compressed target is an in-memory byte buffer, so text has to go
        # through _write (which encodes it); data that already is bytes, and
        # everything written to an uncompressed target, is written directly.
        if write_compressed and not isinstance(data, bytes):
            _write(f, data, write_compressed)
        else:
            f.write(data)

    def release(handle):
        # close the input if it can be closed, so handles do not accumulate
        close = getattr(handle, 'close', None)
        if close is not None:
            close()

    try:
        # Concatenates the scores
        if model_ids is None:
            for score_file in score_files:
                source = _open_to_read(score_file)
                try:
                    emit(source.read())
                finally:
                    release(source)
        else:
            for score_file, model_id in zip(score_files, model_ids):
                source = _open_to_read(score_file)
                try:
                    for line in source:
                        columns = line.split()
                        columns.insert(1, str(model_id))
                        emit(" ".join(columns) + "\n")
                finally:
                    release(source)
    except BaseException:
        # cleanup on *any* failure (including interrupts); always re-raised
        logger.error("Concatenation failed; removing result file %s", output)
        _close_written(output, f, write_compressed)
        _delete(output, write_compressed)
        raise
    else:
        _close_written(output, f, write_compressed)
def concatenate(compute_zt_norm, groups = ['dev', 'eval'], write_compressed = False, add_model_id = False):
    """Concatenates all results into one (or two) score files per group.

    The per-model score files are joined into a single score file per group,
    which can be interpreted by
    :py:func:`bob.bio.base.score.load.split_four_column`.
    The concatenated files are always re-written, regardless of whether they
    exist or not.

    **Parameters:**

    compute_zt_norm : bool
      If set to ``True``, also score files for ZT-norm are concatenated.

    groups : some of ``('dev', 'eval')``
      The list of groups, for which score files should be concatenated.

    write_compressed : bool
      If enabled, concatenated score files are compressed as ``.tar.bz2`` files.

    add_model_id : bool
      If enabled, the model ids are handed to the concatenation, which inserts
      the id of the model as second column of every score line.
    """
    selector = FileSelector.instance()

    for group in groups:
        logger.info("- Scoring: concatenating score files for group '%s'", group)

        # (sorted) list of models
        model_ids = selector.model_ids(group)

        # raw (not normalized) scores
        per_model_files = [selector.no_norm_file(model_id, group) for model_id in model_ids]
        result_file = selector.no_norm_result_file(group)
        _concat(per_model_files, result_file, write_compressed, model_ids if add_model_id else None)
        logger.info("- Scoring: wrote score file '%s'", result_file)

        if not compute_zt_norm:
            continue

        # ZT-normalized scores
        per_model_files = [selector.zt_norm_file(model_id, group) for model_id in model_ids]
        result_file = selector.zt_norm_result_file(group)
        _concat(per_model_files, result_file, write_compressed, model_ids if add_model_id else None)
        logger.info("- Scoring: wrote score file '%s'", result_file)
def calibrate(compute_zt_norm, groups = ['dev', 'eval'], prior = 0.5, write_compressed = False):
    """Calibrates the score files by learning a linear calibration from the dev files (first element of the groups) and executing it on all groups.

    The calibration parameters are trained on the scores of the first group
    using the :py:class:`bob.learn.linear.CGLogRegTrainer`.
    Afterward, the scores of all given groups are calibrated and written to file.
    For ZT-norm scores, the calibration is performed independently, if enabled.
    The names of the calibrated score files that should be written are obtained from the :py:class:`bob.bio.base.tools.FileSelector`.

    .. note::
       All ``NaN`` scores in the development set are silently ignored.
       This might raise an error, if **all** scores are ``NaN``.

    **Parameters:**

    compute_zt_norm : bool
      If set to ``True``, also score files for ZT-norm are calibrated.

    groups : some of ``('dev', 'eval')``
      The list of groups, for which score files should be calibrated.
      The first of the given groups is used to train the logistic regression parameters, while the calibration is performed for all given groups.

    prior : float
      Whatever :py:class:`bob.learn.linear.CGLogRegTrainer` takes as a ``prior``.

    write_compressed : bool
      If enabled, calibrated score files are compressed as ``.tar.bz2`` files.
    """
    # the file selector object
    fs = FileSelector.instance()

    def result_file(norm, group):
        # Compare strings by value: the original used ``is`` for 'ztnorm',
        # which only works by accident of string interning.
        if norm == 'nonorm':
            return fs.no_norm_result_file(group)
        if norm == 'ztnorm':
            return fs.zt_norm_result_file(group)
        return None

    # read score files of the first group (assuming that the first group is 'dev')
    norms = ['nonorm', 'ztnorm'] if compute_zt_norm else ["nonorm"]
    for norm in norms:
        training_score_file = result_file(norm, groups[0])

        # create a LLR trainer
        logger.info(" - Calibration: Training calibration for type %s from group %s", norm, groups[0])
        llr_trainer = bob.learn.linear.CGLogRegTrainer(prior, 1e-16, 100000)

        training_scores = list(score.split_four_column(training_score_file))
        for i in (0, 1):
            h = numpy.array(training_scores[i])
            # remove NaN's
            h = h[~numpy.isnan(h)]
            # the trainer is fed one score per row
            training_scores[i] = h[:, numpy.newaxis]
        # train the LLR
        llr_machine = llr_trainer.train(training_scores[0], training_scores[1])
        del training_scores
        logger.debug(" ... Resulting calibration parameters: shift = %f, scale = %f", llr_machine.biases[0], llr_machine.weights[0, 0])

        # now, apply it to all groups
        for group in groups:
            score_file = result_file(norm, group)
            calibrated_file = fs.calibrated_score_file(group, norm == 'ztnorm')

            logger.info(" - Calibration: calibrating scores from '%s' to '%s'", score_file, calibrated_file)

            # iterate through the score file and calibrate scores
            scores = score.four_column(_open_to_read(score_file))

            f = _open_to_write(calibrated_file, write_compressed)

            for line in scores:
                assert len(line) == 4, "The line %s of score file %s cannot be interpreted" % (line, score_file)
                calibrated_score = llr_machine([line[3]])
                # go through _write so that text is encoded when the target is
                # the byte buffer used for compressed output
                _write(f, '%s %s %s %3.8f\n' % (line[0], line[1], line[2], calibrated_score[0]), write_compressed)
            _close_written(calibrated_file, f, write_compressed)