import logging
import os
import dask.bag
from bob.bio.base.pipelines.vanilla_biometrics import BioAlgorithmDaskWrapper
from bob.bio.base.pipelines.vanilla_biometrics import CSVScoreWriter
from bob.bio.base.pipelines.vanilla_biometrics import FourColumnsScoreWriter
from bob.bio.base.pipelines.vanilla_biometrics import ZTNormCheckpointWrapper
from bob.bio.base.pipelines.vanilla_biometrics import ZTNormPipeline
from bob.bio.base.pipelines.vanilla_biometrics import checkpoint_vanilla_biometrics
from bob.bio.base.pipelines.vanilla_biometrics import dask_vanilla_biometrics
from bob.bio.base.pipelines.vanilla_biometrics import is_checkpointed
from bob.pipelines.utils import isinstance_nested, is_estimator_stateless
from dask.delayed import Delayed
from bob.pipelines.distributed import dask_get_partition_size
logger = logging.getLogger(__name__)
def compute_scores(result, dask_client):
if isinstance(result, Delayed) or isinstance(result, dask.bag.Bag):
if dask_client is not None:
result = result.compute(scheduler=dask_client)
else:
logger.warning("`dask_client` not set. Your pipeline will run locally")
result = result.compute(scheduler="single-threaded")
return result
def post_process_scores(pipeline, scores, path):
written_scores = pipeline.write_scores(scores)
return pipeline.post_process(written_scores, path)
def execute_vanilla_biometrics(
pipeline,
database,
dask_client,
groups,
output,
write_metadata_scores,
checkpoint,
dask_partition_size,
dask_n_workers,
**kwargs,
):
"""
Function that executes the Vanilla Biometrics pipeline.
This is called when using the ``bob bio pipelines vanilla-biometrics``
command.
This is also callable from a script without fear of interrupting the running
Dask instance, allowing chaining multiple experiments while keeping the
workers alive.
Parameters
----------
pipeline: Instance of :py:class:`bob.bio.base.pipelines.vanilla_biometrics.VanillaBiometricsPipeline`
A constructed vanilla-biometrics pipeline.
database: Instance of :py:class:`bob.bio.base.pipelines.vanilla_biometrics.abstract_class.Database`
A database interface instance
dask_client: instance of :py:class:`dask.distributed.Client` or ``None``
A Dask client instance used to run the experiment in parallel on multiple
machines, or locally.
Basic configs can be found in ``bob.pipelines.config.distributed``.
groups: list of str
Groups of the dataset that will be requested from the database interface.
output: str
Path where the results and checkpoints will be saved to.
write_metadata_scores: bool
Use the CSVScoreWriter instead of the FourColumnScoreWriter when True.
checkpoint: bool
Whether checkpoint files will be created for every step of the pipelines.
"""
if not os.path.exists(output):
os.makedirs(output, exist_ok=True)
if write_metadata_scores:
pipeline.score_writer = CSVScoreWriter(os.path.join(output, "./tmp"))
else:
pipeline.score_writer = FourColumnsScoreWriter(os.path.join(output, "./tmp"))
# Check if it's already checkpointed
if checkpoint and not is_checkpointed(pipeline):
hash_fn = database.hash_fn if hasattr(database, "hash_fn") else None
pipeline = checkpoint_vanilla_biometrics(pipeline, output, hash_fn=hash_fn)
# Load the background model samples only if the transformer requires fitting
if all([is_estimator_stateless(step) for step in pipeline.transformer]):
background_model_samples = []
else:
background_model_samples = database.background_model_samples()
for group in groups:
score_file_name = os.path.join(
output, f"scores-{group}" + (".csv" if write_metadata_scores else "")
)
biometric_references = database.references(group=group)
probes = database.probes(group=group)
# If there's no data to be processed, continue
if len(biometric_references) == 0 or len(probes) == 0:
logger.warning(
f"Current dataset ({database}) does not have `{group}` set. The experiment will not be executed."
)
continue
if dask_client is not None and not isinstance_nested(
pipeline.biometric_algorithm, "biometric_algorithm", BioAlgorithmDaskWrapper
):
# Scaling up
if dask_n_workers is not None and not isinstance(dask_client, str):
dask_client.cluster.scale(dask_n_workers)
n_objects = max(
len(background_model_samples), len(biometric_references), len(probes)
)
partition_size = None
if not isinstance(dask_client, str):
partition_size = dask_get_partition_size(dask_client.cluster, n_objects)
if dask_partition_size is not None:
partition_size = dask_partition_size
pipeline = dask_vanilla_biometrics(pipeline, partition_size=partition_size,)
logger.info(f"Running vanilla biometrics for group {group}")
allow_scoring_with_all_biometric_references = (
database.allow_scoring_with_all_biometric_references
if hasattr(database, "allow_scoring_with_all_biometric_references")
else False
)
result = pipeline(
background_model_samples,
biometric_references,
probes,
allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
)
post_processed_scores = post_process_scores(pipeline, result, score_file_name)
_ = compute_scores(post_processed_scores, dask_client)
[docs]def execute_vanilla_biometrics_ztnorm(
pipeline,
database,
dask_client,
groups,
output,
consider_genuines,
write_metadata_scores,
ztnorm_cohort_proportion,
checkpoint,
dask_partition_size,
dask_n_workers,
**kwargs,
):
"""
Function that executes the Vanilla Biometrics pipeline with ZTNorm.
This is called when using the ``bob bio pipelines vanilla-biometrics-ztnorm``
command.
This is also callable from a script without fear of interrupting the running
Dask instance, allowing chaining multiple experiments while keeping the
workers alive.
Parameters
----------
pipeline: Instance of :py:class:`bob.bio.base.pipelines.vanilla_biometrics.VanillaBiometricsPipeline`
A constructed vanilla-biometrics pipeline.
database: Instance of :py:class:`bob.bio.base.pipelines.vanilla_biometrics.abstract_class.Database`
A database interface instance
dask_client: instance of :py:class:`dask.distributed.Client` or ``None``
A Dask client instance used to run the experiment in parallel on multiple machines, or locally. Basic configs can be found in ``bob.pipelines.config.distributed``.
groups: list of str
Groups of the dataset that will be requested from the database interface.
output: str
Path where the results and checkpoints will be saved to.
write_metadata_scores: bool
Use the CSVScoreWriter instead of the FourColumnScoreWriter when True.
checkpoint: bool
Whether checkpoint files will be created for every step of the pipelines.
dask_partition_size: int
If using Dask, this option defines the size of each dask.bag.partition. Use this option if the current heuristic that sets this value doesn't suit your experiment. (https://docs.dask.org/en/latest/bag-api.html?highlight=partition_size#dask.bag.from_sequence).
dask_n_workers: int
If using Dask, this option defines the number of workers to start your experiment. Dask automatically scales up/down the number of workers due to the current load of tasks to be solved. Use this option if the current amount of workers set to start an experiment doesn't suit you.
ztnorm_cohort_proportion: float
Sets the percentage of samples used for t-norm and z-norm. Sometimes you don't want to use all the t/z samples for normalization
consider_genuines: float
If set, will consider genuine scores in the ZT score normalization
"""
def _merge_references_ztnorm(biometric_references, probes, zprobes, treferences):
treferences_sub = [t.reference_id for t in treferences]
biometric_references_sub = [t.reference_id for t in biometric_references]
for i in range(len(probes)):
probes[i].references += treferences_sub
for i in range(len(zprobes)):
zprobes[i].references = biometric_references_sub + treferences_sub
return probes, zprobes
if not os.path.exists(output):
os.makedirs(output, exist_ok=True)
if write_metadata_scores:
pipeline.score_writer = CSVScoreWriter(os.path.join(output, "./tmp"))
else:
pipeline.score_writer = FourColumnsScoreWriter(os.path.join(output, "./tmp"))
# Check if it's already checkpointed
if checkpoint and not is_checkpointed(pipeline):
pipeline = checkpoint_vanilla_biometrics(pipeline, output)
# Patching the pipeline in case of ZNorm and checkpointing it
pipeline = ZTNormPipeline(pipeline)
if checkpoint:
pipeline.ztnorm_solver = ZTNormCheckpointWrapper(
pipeline.ztnorm_solver, os.path.join(output, "normed-scores")
)
background_model_samples = database.background_model_samples()
zprobes = database.zprobes(proportion=ztnorm_cohort_proportion)
treferences = database.treferences(proportion=ztnorm_cohort_proportion)
for group in groups:
score_file_name = os.path.join(output, f"scores-{group}")
biometric_references = database.references(group=group)
probes = database.probes(group=group)
# If there's no data to be processed, continue
if len(biometric_references) == 0 or len(probes) == 0:
logger.warning(
f"Current dataset ({database}) does not have `{group}` set. The experiment will not be executed."
)
continue
if dask_client is not None and not isinstance_nested(
pipeline.biometric_algorithm, "biometric_algorithm", BioAlgorithmDaskWrapper
):
# Scaling up
if dask_n_workers is not None and not isinstance(dask_client, str):
dask_client.cluster.scale(dask_n_workers)
n_objects = max(
len(background_model_samples), len(biometric_references), len(probes)
)
partition_size = None
if not isinstance(dask_client, str):
partition_size = dask_get_partition_size(dask_client.cluster, n_objects)
if dask_partition_size is not None:
partition_size = dask_partition_size
pipeline = dask_vanilla_biometrics(pipeline, partition_size=partition_size,)
logger.info(f"Running vanilla biometrics for group {group}")
allow_scoring_with_all_biometric_references = (
database.allow_scoring_with_all_biometric_references
if hasattr(database, "allow_scoring_with_all_biometric_references")
else False
)
if consider_genuines:
z_probes_cpy = copy.deepcopy(zprobes)
zprobes += copy.deepcopy(treferences)
treferences += z_probes_cpy
probes, zprobes = _merge_references_ztnorm(
biometric_references, probes, zprobes, treferences
)
(
raw_scores,
z_normed_scores,
t_normed_scores,
zt_normed_scores,
s_normed_scores,
) = pipeline(
background_model_samples,
biometric_references,
probes,
zprobes,
treferences,
allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
)
def _build_filename(score_file_name, suffix):
return os.path.join(score_file_name, suffix)
# Running RAW_SCORES
raw_scores = post_process_scores(
pipeline, raw_scores, _build_filename(score_file_name, "raw_scores.csv")
)
_ = compute_scores(raw_scores, dask_client)
# Z-SCORES
z_normed_scores = post_process_scores(
pipeline,
z_normed_scores,
_build_filename(score_file_name, "z_normed_scores.csv"),
)
_ = compute_scores(z_normed_scores, dask_client)
# T-SCORES
t_normed_scores = post_process_scores(
pipeline,
t_normed_scores,
_build_filename(score_file_name, "t_normed_scores.csv"),
)
_ = compute_scores(t_normed_scores, dask_client)
# S-SCORES
s_normed_scores = post_process_scores(
pipeline,
s_normed_scores,
_build_filename(score_file_name, "s_normed_scores.csv"),
)
_ = compute_scores(s_normed_scores, dask_client)
# ZT-SCORES
zt_normed_scores = post_process_scores(
pipeline,
zt_normed_scores,
_build_filename(score_file_name, "zt_normed_scores.csv"),
)
_ = compute_scores(zt_normed_scores, dask_client)