#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###################################################################################
# #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# Redistribution and use in source and binary forms, with or without #
# modification, are permitted provided that the following conditions are met: #
# #
# 1. Redistributions of source code must retain the above copyright notice, this #
# list of conditions and the following disclaimer. #
# #
# 2. Redistributions in binary form must reproduce the above copyright notice, #
# this list of conditions and the following disclaimer in the documentation #
# and/or other materials provided with the distribution. #
# #
# 3. Neither the name of the copyright holder nor the names of its contributors #
# may be used to endorse or promote products derived from this software without #
# specific prior written permission. #
# #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #
# #
###################################################################################
"""
==========
experiment
==========
Validation for experiments
"""
import collections
import itertools
import os
import simplejson as json
from . import algorithm
from . import database
from . import hash
from . import schema
from . import toolchain
from . import utils
# Key prefixes used by loop declarations to address their two algorithms:
# e.g. ``loop[PROCESSOR_PREFIX + "algorithm"]`` or
# ``loop[EVALUATOR_PREFIX + "inputs"]``.
PROCESSOR_PREFIX, EVALUATOR_PREFIX = "processor_", "evaluator_"
class Storage(utils.Storage):
    """Resolves paths for experiments

    Parameters:

      prefix (str): Establishes the prefix of your installation.

      name (str): The name of the experiment object in the format
        ``<user>/<toolchain-user>/<toolchain-name>/<version>/<name>`` or
        ``<user>/<toolchain-name>/<version>/<name>``, in case ``<user>`` and
        ``<toolchain-user>`` are the same.

    Raises:

      RuntimeError: if ``name`` does not have 4 or 5 components separated
        by ``os.sep``.

    Attributes:

      label (str): The experiment name, always in its 5-component form.

      username (str): The experiment owner.

      toolchain_username (str): The toolchain owner.

      toolchain (str): The toolchain label,
        ``<toolchain-user>/<toolchain-name>/<version>``.

      version (str): The toolchain version component.

      name (str): The last (experiment-specific) name component.

      prefix (str): The installation prefix.
    """

    asset_type = "experiment"
    asset_folder = "experiments"

    def __init__(self, prefix, name):

        if name.count(os.sep) not in (3, 4):
            raise RuntimeError("invalid experiment label: `%s'" % name)

        s = name.split(os.sep)
        if len(s) == 4:
            # short form: the experiment owner also owns the toolchain, so
            # duplicate the user component to obtain the full 5-part form
            name = os.path.join(s[0], name)

        (
            self.username,
            self.toolchain_username,
            toolchain_name,
            self.version,
            self.name,
        ) = name.split(os.sep)

        self.label = name
        self.toolchain = os.path.join(
            self.toolchain_username, toolchain_name, self.version
        )
        self.prefix = prefix

        path = utils.hashed_or_simple(
            self.prefix, self.asset_folder, name, suffix=".json"
        )
        # keep only the base path: drop the 5-character ".json" suffix
        path = path[:-5]
        super(Storage, self).__init__(path)
[docs]class Experiment(object):
"""Experiments define the complete workflow for user test on the platform.
Parameters:
prefix (str): Establishes the prefix of your installation.
data (:py:class:`object`): The piece of data representing the
experiment. It must validate against the schema defined for experiments.
If a string is passed, it is supposed to be a valid path to an
experiment in the designated prefix area. Passing ``None`` is not
supported: experiments have no default prototype, so a
:py:exc:`RuntimeError` is raised. If a tuple is passed (or a list),
then we consider that the first element represents the experiment,
while the second, the toolchain definition. The toolchain bit can be
defined as a dictionary or as a string (pointing to a valid path in the
designated prefix area).
dataformat_cache (:py:class:`dict`, Optional): A dictionary mapping
dataformat names to loaded dataformats. This parameter is optional and,
if passed, may greatly speed-up experiment loading times as dataformats
that are already loaded may be re-used. If you use this parameter, you
must guarantee that the cache is refreshed as appropriate in case the
underlying dataformats change.
database_cache (:py:class:`dict`, Optional): A dictionary mapping
database names to loaded databases. This parameter is optional and, if
passed, may greatly speed-up experiment loading times as databases that
are already loaded may be re-used. If you use this parameter, you must
guarantee that the cache is refreshed as appropriate in case the
underlying databases change.
algorithm_cache (:py:class:`dict`, Optional): A dictionary mapping
algorithm names to loaded algorithms. This parameter is optional and,
if passed, may greatly speed-up experiment loading times as algorithms
that are already loaded may be re-used. If you use this parameter, you
must guarantee that the cache is refreshed as appropriate in case the
underlying algorithms change.
library_cache (:py:class:`dict`, Optional): A dictionary mapping library
names to loaded libraries. This parameter is optional and, if passed,
may greatly speed-up library loading times as libraries that are
already loaded may be re-used. If you use this parameter, you must
guarantee that the cache is refreshed as appropriate in case the
underlying libraries change.
Attributes:
storage (object): A simple object that provides information about file
paths for this experiment (``None`` when the experiment was not loaded
from a label).
toolchain (beat.core.toolchain.Toolchain): The toolchain in which this
experiment is based.
databases (dict): A dictionary containing the names and
:py:class:`beat.core.database.Database` pointers for all referenced
databases.
algorithms (dict): A dictionary containing the names and
:py:class:`beat.core.algorithm.Algorithm` pointers for all referenced
algorithms.
datasets (dict): A dictionary mapping the names of all datasets in this
experiment to dictionaries with the keys ``database`` (a
:py:class:`beat.core.database.Database`), ``protocol`` and ``set``.
blocks (dict): A dictionary mapping the names of all blocks in this
experiment to their configuration, as declared in the experiment.
loops (dict): A dictionary mapping the names of all loops in this
experiment to their configuration, as declared in the experiment.
analyzers (dict): A dictionary mapping the names of all analyzers in
this experiment to their configuration, as declared in the experiment.
errors (list): A list of strings containing errors found while loading
this experiment.
data (dict): The original data for this experiment, as loaded by our JSON
decoder.
"""
def __init__(
    self,
    prefix,
    data,
    dataformat_cache=None,
    database_cache=None,
    algorithm_cache=None,
    library_cache=None,
):
    """Initializes the object state and immediately loads ``data``

    See the class documentation for the meaning of each parameter.
    """
    self.prefix = prefix

    # every attribute is defined up-front, so the early returns inside
    # _load() always leave a consistently shaped object behind
    self.toolchain = self._label = self.data = self.storage = None
    self.errors = []
    self.datasets, self.blocks, self.loops = {}, {}, {}
    self.analyzers, self.databases, self.algorithms = {}, {}, {}

    def _or_new(cache):
        # callers that pass no cache still get a private one, for speed
        return {} if cache is None else cache

    self._load(
        data,
        _or_new(database_cache),
        _or_new(dataformat_cache),
        _or_new(algorithm_cache),
        _or_new(library_cache),
    )
def _load(
    self, data, database_cache, dataformat_cache, algorithm_cache, library_cache
):
    """Loads the experiment

    Validates the experiment declaration and cross-checks it against its
    toolchain. Problems are appended to ``self.errors``; validation stops at
    the first stage that reports any.

    Parameters:

      data: Either an ``(experiment, toolchain)`` tuple/list, or the label
        of an experiment stored under ``self.prefix``.

      database_cache, dataformat_cache, algorithm_cache, library_cache
        (dict): Caches of already loaded assets, updated in place.

    Raises:

      RuntimeError: if ``data`` is ``None``.
    """
    self._label = None
    self.data = None
    self.errors = []

    if data is None:
        # experiments are derived from a toolchain and the contents of the
        # prefix, hence there is no meaningful default prototype
        raise RuntimeError("Experiments can't have default implementation")

    if isinstance(data, (tuple, list)):
        experiment_data, toolchain_data = data
    else:
        # a label: resolve the declaration through the storage helper
        self.storage = Storage(self.prefix, data)
        self._label = self.storage.label
        experiment_data = self.storage.json.path
        toolchain_data = self.storage.toolchain
        if not self.storage.exists():
            self.errors.append("Experiment declaration file not found: %s" % data)
            return

    # schema validation (loads the JSON file first, when needed)
    self.data, self.errors = schema.validate("experiment", experiment_data)
    if self.errors:
        return

    # internal consistency of every section of the experiment
    self._check_datasets(database_cache, dataformat_cache)
    self._check_blocks(algorithm_cache, dataformat_cache, library_cache)
    self._check_loops(algorithm_cache, dataformat_cache, library_cache)
    self._check_analyzers(algorithm_cache, dataformat_cache, library_cache)
    self._check_global_parameters()
    self._load_toolchain(toolchain_data)

    # cross-checks against the toolchain; each stage only runs if nothing
    # failed so far
    stages = (
        self._crosscheck_toolchain_datasets,
        self._crosscheck_toolchain_blocks,
        self._crosscheck_toolchain_loops,
        self._crosscheck_toolchain_analyzers,
        lambda: self._crosscheck_connection_dataformats(dataformat_cache),
        self._crosscheck_block_algorithm_pertinence,
        self._crosscheck_loop_algorithm_pertinence,
    )
    for stage in stages:
        if self.errors:
            return
        stage()
def _check_datasets(self, database_cache, dataformat_cache):
    """checks all datasets are valid

    For every entry in the experiment's ``datasets`` section, loads (or
    re-uses) the referenced database and verifies that the requested
    protocol and set exist. Valid entries are recorded in ``self.datasets``
    as ``dict(database=..., protocol=..., set=...)``; problems are appended
    to ``self.errors``.

    Parameters:

      database_cache (dict): Maps database names to loaded databases. Newly
        loaded databases are added to it.

      dataformat_cache (dict): Forwarded to the database loader.
    """
    for dataset, properties in self.data["datasets"].items():

        dbname = properties["database"]
        newly_seen = dbname not in self.databases

        if newly_seen:
            if dbname not in database_cache:
                database_cache[dbname] = database.Database(
                    self.prefix, dbname, dataformat_cache
                )
            self.databases[dbname] = database_cache[dbname]

        db = self.databases[dbname]
        if db.errors:
            # a broken database is only reported the first time it is met
            if newly_seen:
                self.errors.append(
                    "/datasets/%s: database `%s' is invalid" % (dataset, dbname)
                )
            continue

        protoname = properties["protocol"]
        if protoname not in db.protocols:
            self.errors.append(
                "/datasets/%s: cannot find protocol `%s' on "
                "database `%s' - valid protocols are %s"
                % (dataset, protoname, dbname, ", ".join(db.protocols.keys()))
            )
            continue

        setname = properties["set"]
        if setname not in db.sets(protoname):
            valid_sets = ", ".join(db.sets(protoname).keys())
            self.errors.append(
                "/datasets/%s: cannot find set `%s' on "
                "protocol `%s' from database `%s' - valid set names "
                "are %s" % (dataset, setname, protoname, dbname, valid_sets)
            )
            continue

        self.datasets[dataset] = dict(database=db, protocol=protoname, set=setname)
def _check_blocks(self, algorithm_cache, dataformat_cache, library_cache):
    """checks all blocks are valid

    For every entry in the experiment's ``blocks`` section, loads (or
    re-uses) the referenced algorithm and checks that the declared inputs,
    outputs, number of slots and parameters are compatible with it. Blocks
    whose algorithm is valid are recorded in ``self.blocks``; problems are
    appended to ``self.errors``.

    Parameters:

      algorithm_cache (dict): Maps algorithm names to loaded algorithms.
        Newly loaded algorithms are added to it.

      dataformat_cache (dict): Forwarded to the algorithm loader.

      library_cache (dict): Forwarded to the algorithm loader.
    """
    for blockname, block in self.data["blocks"].items():

        algoname = block["algorithm"]
        if algoname not in self.algorithms:
            # loads the algorithm, re-using the cache when possible
            if algoname in algorithm_cache:
                thisalgo = algorithm_cache[algoname]
            else:
                thisalgo = algorithm.Algorithm(
                    self.prefix, algoname, dataformat_cache, library_cache
                )
                algorithm_cache[algoname] = thisalgo
            self.algorithms[algoname] = thisalgo

            if thisalgo.errors:
                self.errors.append(
                    "/blocks/%s: algorithm `%s' is invalid: %s"
                    % (blockname, algoname, "\n".join(thisalgo.errors))
                )
                # an invalid algorithm cannot be reliably inspected
                # (splittable, parameters, ...): skip the remaining checks,
                # just like _check_loops() and _check_analyzers() do
                continue
        else:
            thisalgo = self.algorithms[algoname]
            if thisalgo.errors:
                continue  # already reported

        # checks all inputs correspond
        for algoin, blockin in block["inputs"].items():
            if hasattr(thisalgo, "input_map") and algoin not in thisalgo.input_map:
                self.errors.append(
                    "/blocks/%s/inputs/%s: algorithm `%s' does not "
                    "have an input named `%s' - valid algorithm inputs are %s"
                    % (
                        blockname,
                        blockin,
                        algoname,
                        algoin,
                        ", ".join(thisalgo.input_map.keys()),
                    )
                )

        # checks all outputs correspond
        for algout, blockout in block["outputs"].items():
            if (
                hasattr(thisalgo, "output_map")
                and algout not in thisalgo.output_map
            ):
                self.errors.append(
                    "/blocks/%s/outputs/%s: algorithm `%s' does not "
                    "have an output named `%s' - valid algorithm outputs are "
                    "%s"
                    % (
                        blockname,
                        blockout,
                        algoname,
                        algout,
                        ", ".join(thisalgo.output_map.keys()),
                    )
                )

        # checks if parallelization makes sense
        if block.get("nb_slots", 1) > 1 and not thisalgo.splittable:
            self.errors.append(
                "/blocks/%s/nb_slots: you have set the number of "
                "slots for algorithm `%s' to %d, but it is not splittable"
                % (blockname, thisalgo.name, block["nb_slots"])
            )

        # check parameter consistence
        for parameter, value in block.get("parameters", {}).items():
            try:
                thisalgo.clean_parameter(parameter, value)
            except Exception as e:
                self.errors.append(
                    "/blocks/%s/parameters/%s: cannot convert "
                    "value `%s' to required type: %s"
                    % (blockname, parameter, value, e)
                )

        self.blocks[blockname] = block
def _check_loops(self, algorithm_cache, dataformat_cache, library_cache):
    """checks all loops are valid

    Each loop declares two algorithms, addressed through the
    ``PROCESSOR_PREFIX`` and ``EVALUATOR_PREFIX`` key prefixes (e.g.
    ``processor_algorithm``, ``evaluator_inputs``). For each of them, loads
    (or re-uses) the algorithm and checks that the declared inputs, outputs,
    number of slots and parameters are compatible with it. Loops are
    recorded in ``self.loops``; problems are appended to ``self.errors``.

    Parameters:

      algorithm_cache (dict): Maps algorithm names to loaded algorithms.
        Newly loaded algorithms are added to it.

      dataformat_cache (dict): Forwarded to the algorithm loader.

      library_cache (dict): Forwarded to the algorithm loader.
    """
    loops = self.data.get("loops", {})
    for loopname, loop in loops.items():
        for key in [PROCESSOR_PREFIX, EVALUATOR_PREFIX]:

            algoname = loop[key + "algorithm"]
            if algoname not in self.algorithms:
                # loads the algorithm, re-using the cache when possible
                if algoname in algorithm_cache:
                    thisalgo = algorithm_cache[algoname]
                else:
                    thisalgo = algorithm.Algorithm(
                        self.prefix, algoname, dataformat_cache, library_cache
                    )
                    algorithm_cache[algoname] = thisalgo
                self.algorithms[algoname] = thisalgo

                if thisalgo.errors:
                    self.errors.append(
                        "/loops/%s: algorithm `%s' is invalid:\n%s"
                        % (loopname, algoname, "\n".join(thisalgo.errors))
                    )
                    continue
            else:
                thisalgo = self.algorithms[algoname]
                if thisalgo.errors:
                    continue  # already reported

            # checks all inputs correspond
            for algoin, loop_input in loop[key + "inputs"].items():
                if algoin not in thisalgo.input_map:
                    self.errors.append(
                        "/loop/%s/inputs/%s: algorithm `%s' does "
                        "not have an input named `%s' - valid algorithm inputs "
                        "are %s"
                        % (
                            loopname,
                            loop_input,
                            algoname,
                            algoin,
                            ", ".join(thisalgo.input_map.keys()),
                        )
                    )

            # checks all outputs correspond
            for algout, loop_output in loop[key + "outputs"].items():
                if (
                    hasattr(thisalgo, "output_map")
                    and algout not in thisalgo.output_map
                ):
                    self.errors.append(
                        "/loops/%s/outputs/%s: algorithm `%s' does not "
                        "have an output named `%s' - valid algorithm outputs are "
                        "%s"
                        % (
                            loopname,
                            loop_output,
                            algoname,
                            algout,
                            ", ".join(thisalgo.output_map.keys()),
                        )
                    )

            # checks if parallelization makes sense; the message must name
            # the loop (the algorithm name is given separately)
            if loop.get("nb_slots", 1) > 1 and not thisalgo.splittable:
                self.errors.append(
                    "/loop/%s/nb_slots: you have set the number "
                    "of slots for algorithm `%s' to %d, but it is not "
                    "splittable" % (loopname, thisalgo.name, loop["nb_slots"])
                )

            # check parameter consistence
            # NOTE(review): the same un-prefixed ``parameters`` section is
            # checked against both the processor and the evaluator algorithm
            # - confirm this matches the experiment schema
            for parameter, value in loop.get("parameters", {}).items():
                try:
                    thisalgo.clean_parameter(parameter, value)
                except Exception as e:
                    self.errors.append(
                        "/loop/%s/parameters/%s: cannot convert "
                        "value `%s' to required type: %s"
                        % (loopname, parameter, value, e)
                    )

            self.loops[loopname] = loop
def _check_analyzers(self, algorithm_cache, dataformat_cache, library_cache):
    """checks all analyzers are valid

    For every entry in the experiment's ``analyzers`` section, loads (or
    re-uses) the referenced algorithm and checks the declared inputs,
    number of slots and parameters against it. Analyzers with a valid
    algorithm are recorded in ``self.analyzers``; problems are appended to
    ``self.errors``.

    Parameters:

      algorithm_cache (dict): Maps algorithm names to loaded algorithms.
        Newly loaded algorithms are added to it.

      dataformat_cache (dict): Forwarded to the algorithm loader.

      library_cache (dict): Forwarded to the algorithm loader.
    """
    for analyzername, analyzer in self.data["analyzers"].items():

        algoname = analyzer["algorithm"]

        if algoname in self.algorithms:
            thisalgo = self.algorithms[algoname]
            if thisalgo.errors:
                continue  # reported when the algorithm was first loaded
        else:
            if algoname not in algorithm_cache:
                algorithm_cache[algoname] = algorithm.Algorithm(
                    self.prefix, algoname, dataformat_cache, library_cache
                )
            thisalgo = algorithm_cache[algoname]
            self.algorithms[algoname] = thisalgo
            if thisalgo.errors:
                self.errors.append(
                    "/analyzers/%s: algorithm `%s' is invalid:\n%s"
                    % (analyzername, algoname, "\n".join(thisalgo.errors))
                )
                continue

        # every declared input must exist on the algorithm
        for algoin, analyzerin in analyzer["inputs"].items():
            if algoin in thisalgo.input_map:
                continue
            self.errors.append(
                "/analyzers/%s/inputs/%s: algorithm `%s' does "
                "not have an input named `%s' - valid algorithm inputs "
                "are %s"
                % (
                    analyzername,
                    analyzerin,
                    algoname,
                    algoin,
                    ", ".join(thisalgo.input_map.keys()),
                )
            )

        # more than one slot only makes sense for splittable algorithms
        nb_slots = analyzer.get("nb_slots", 1)
        if nb_slots > 1 and not thisalgo.splittable:
            self.errors.append(
                "/analyzer/%s/nb_slots: you have set the number "
                "of slots for algorithm `%s' to %d, but it is not "
                "splittable" % (analyzername, thisalgo.name, nb_slots)
            )

        # every parameter must be convertible to the declared type
        for parameter, value in analyzer.get("parameters", {}).items():
            try:
                thisalgo.clean_parameter(parameter, value)
            except Exception as e:
                self.errors.append(
                    "/analyzer/%s/parameters/%s: cannot convert "
                    "value `%s' to required type: %s"
                    % (analyzername, parameter, value, e)
                )

        self.analyzers[analyzername] = analyzer
def _check_global_parameters(self):
    """checks global parameters

    Every section of ``globals`` other than ``queue`` and ``environment``
    must be named after an algorithm already used by the experiment, and
    each of its parameters must be accepted by that algorithm. Sections of
    invalid algorithms are not inspected. Problems are appended to
    ``self.errors``.
    """
    skipped_sections = ["queue", "environment"]

    for section, parameters in self.data["globals"].items():
        if section in skipped_sections:
            continue

        # algorithms must have been loaded by the previous checks
        if section not in self.algorithms:
            self.errors.append(
                "/globals/%s: found parameter section for "
                "algorithm `%s' which is not used anywhere in the "
                "experiment" % (section, section)
            )
            continue

        thisalgo = self.algorithms[section]
        if not thisalgo.valid:
            continue

        for parameter, value in parameters.items():
            try:
                thisalgo.clean_parameter(parameter, value)
            except Exception as e:
                self.errors.append(
                    "/globals/%s/%s: cannot convert "
                    "value `%s' to required type: %s"
                    % (section, parameter, value, e)
                )
def _load_toolchain(self, data):
    """Loads the related toolchain

    Stores it in ``self.toolchain`` and, if it is invalid, appends a single
    error listing all of its problems to ``self.errors``.

    Parameters:

      data: Toolchain declaration forwarded to the toolchain loader (the
        label taken from the experiment storage, or whatever was passed
        alongside the experiment data).
    """
    self.toolchain = toolchain.Toolchain(self.prefix, data)

    problems = self.toolchain.errors
    if not problems:
        return

    bullets = "\n * ".join(problems)
    if self.storage is None:
        # experiment given as in-memory data: there is no label to quote
        self.errors.append("toolchain data is not valid, because:\n * %s" % bullets)
    else:
        self.errors.append(
            "toolchain `%s' is not valid, because:\n * %s"
            % (self.storage.toolchain, bullets)
        )
def _crosscheck_toolchain_datasets(self):
    """There must exist a 1-to-1 relation to existing datasets

    Checks that the experiment declares exactly the datasets of the
    toolchain and that, for each of them, the toolchain only uses outputs
    that the selected database/protocol/set provides. Problems are appended
    to ``self.errors``.
    """
    toolchain_datasets = self.toolchain.datasets
    toolchain_names = sorted(toolchain_datasets.keys())
    experiment_names = sorted(self.datasets.keys())

    if toolchain_names != experiment_names:
        self.errors.append(
            "mismatch between the toolchain dataset names (%s)"
            " and the experiment's (%s)"
            % (", ".join(toolchain_names), ", ".join(experiment_names))
        )

    # toolchain must use a subset of the dataset endpoints
    for dataset_name, dataset in self.datasets.items():
        if dataset_name not in toolchain_datasets:
            # already reported as a name mismatch above; indexing the
            # toolchain with it would raise KeyError
            continue

        db_endpts = set(
            dataset["database"]
            .set(dataset["protocol"], dataset["set"])["outputs"]
            .keys()
        )
        tc_endpts = set(toolchain_datasets[dataset_name]["outputs"])
        if not tc_endpts.issubset(db_endpts):
            self.errors.append(
                "/datasets/%s: toolchain endpoints (%s) must "
                "be a subset of what is available on database `%s', "
                "protocol `%s', "
                "set `%s' outputs (%s)"
                % (
                    dataset_name,
                    # sorted, so the message does not depend on set order
                    ", ".join(sorted(tc_endpts)),
                    dataset["database"].name,
                    dataset["protocol"],
                    dataset["set"],
                    ", ".join(sorted(db_endpts)),
                )
            )
def _crosscheck_toolchain_blocks(self):
    """There must exist a 1-to-1 relation to existing blocks

    Checks that the experiment declares exactly the blocks of the toolchain
    and that each block has as many inputs as its toolchain counterpart.
    Problems are appended to ``self.errors``.
    """
    toolchain_blocks = self.toolchain.blocks
    toolchain_names = sorted(toolchain_blocks.keys())
    experiment_names = sorted(self.blocks.keys())

    if toolchain_names != experiment_names:
        self.errors.append(
            "mismatch between the toolchain block names (%s)"
            " and the experiment's (%s)"
            % (", ".join(toolchain_names), ", ".join(experiment_names))
        )

    # the number of block endpoints and the toolchain's must match
    for block_name, block in self.blocks.items():
        if block_name not in toolchain_blocks:
            # already reported as a name mismatch above; indexing the
            # toolchain with it would raise KeyError
            continue

        expected = len(toolchain_blocks[block_name]["inputs"])
        if len(block["inputs"]) != expected:
            self.errors.append(
                "/blocks/%s: toolchain blocks has %d inputs "
                "while the experiment has %d inputs"
                % (block_name, expected, len(block["inputs"]))
            )
def _crosscheck_toolchain_loops(self):
    """There must exist a 1-to-1 relation to existing loops

    Checks that the experiment declares exactly the loops of the toolchain
    and that, on both the processor and the evaluator side, each loop has
    as many inputs as its toolchain counterpart. Problems are appended to
    ``self.errors``.
    """
    toolchain_loops = self.toolchain.loops
    toolchain_names = sorted(toolchain_loops.keys())
    experiment_names = sorted(self.loops.keys())

    if toolchain_names != experiment_names:
        self.errors.append(
            "mismatch between the toolchain loop names (%s)"
            " and the experiment's (%s)"
            % (", ".join(toolchain_names), ", ".join(experiment_names))
        )

    # the number of loop endpoints and the toolchain's must match
    for block_name, block in self.loops.items():
        if block_name not in toolchain_loops:
            # already reported as a name mismatch above; indexing the
            # toolchain with it would raise KeyError
            continue

        for prefix in [PROCESSOR_PREFIX, EVALUATOR_PREFIX]:
            block_input_count = len(block[prefix + "inputs"])
            toolchain_input_block = len(
                toolchain_loops[block_name][prefix + "inputs"]
            )
            if block_input_count != toolchain_input_block:
                self.errors.append(
                    "/loops/{}: toolchain loops has {} {}inputs "
                    "while the experiment has {} inputs".format(
                        block_name, toolchain_input_block, prefix, block_input_count
                    )
                )
def _crosscheck_toolchain_analyzers(self):
    """There must exist a 1-to-1 relation to existing analyzers

    Checks that the experiment declares exactly the analyzers of the
    toolchain and that each analyzer has as many inputs as its toolchain
    counterpart. Problems are appended to ``self.errors``.
    """
    toolchain_analyzers = self.toolchain.analyzers
    toolchain_names = sorted(toolchain_analyzers.keys())
    experiment_names = sorted(self.analyzers.keys())

    if toolchain_names != experiment_names:
        self.errors.append(
            "mismatch between the toolchain analyzer names "
            "(%s) and the experiment's (%s)"
            % (", ".join(toolchain_names), ", ".join(experiment_names))
        )

    # the number of analyzer endpoints and the toolchain's must match
    for analyzer_name, analyzer in self.analyzers.items():
        if analyzer_name not in toolchain_analyzers:
            # already reported as a name mismatch above; indexing the
            # toolchain with it would raise KeyError
            continue

        expected = len(toolchain_analyzers[analyzer_name]["inputs"])
        if len(analyzer["inputs"]) != expected:
            self.errors.append(
                "/analyzers/%s: toolchain analyzers has %d "
                "inputs while the experiment has %d inputs"
                % (analyzer_name, expected, len(analyzer["inputs"]))
            )
def _crosscheck_connection_dataformats(self, dataformat_cache):
    """Connected endpoints must use the same dataformat as defined by the
    generator and receptor algorithms

    For every toolchain connection, resolves the data format produced at the
    source endpoint (a dataset output, or a block/loop algorithm output) and
    the one expected at the sink endpoint (a block, loop or analyzer
    algorithm input). Both must be equal, or the sink format must be a
    parent of the source format. Endpoints that cannot be resolved and
    format mismatches are appended to ``self.errors``.

    Parameters:

      dataformat_cache (dict): Maps data format names to loaded data
        formats, used to check parent relationships when the two formats
        differ.
    """

    def _resolve(config, prefixes, direction, endpoint):
        # Experiment configurations map "algorithm endpoint -> toolchain
        # endpoint". Search the (possibly prefixed) mappings in reverse to
        # find the algorithm endpoint serving ``endpoint``. Returns
        # ``(algorithm name, algorithm endpoint)``, or ``None`` if no
        # mapping mentions it.
        for prefix in prefixes:
            reverse = {v: k for k, v in config[prefix + direction].items()}
            if endpoint in reverse:
                return config[prefix + "algorithm"], reverse[endpoint]
        return None

    loop_prefixes = (PROCESSOR_PREFIX, EVALUATOR_PREFIX)

    for connection in self.toolchain.connections:

        # resolves the producing end
        from_endpt = connection["from"].split(".", 1)
        if from_endpt[0] in self.datasets:
            dataset = self.datasets[from_endpt[0]]
            from_dtype = dataset["database"].set(
                dataset["protocol"], dataset["set"]
            )["outputs"][from_endpt[1]]
            from_name = "dataset"
        else:
            if from_endpt[0] in self.blocks:  # it is a block
                source = _resolve(
                    self.blocks[from_endpt[0]], ("",), "outputs", from_endpt[1]
                )
                from_name = "block"
            elif from_endpt[0] in self.loops:
                source = _resolve(
                    self.loops[from_endpt[0]], loop_prefixes, "outputs", from_endpt[1]
                )
                from_name = "loop"
            else:
                self.errors.append("Unknown endpoint %s" % from_endpt[0])
                continue

            if source is None:
                # the experiment does not map this toolchain output; without
                # this check a KeyError, or a stale data format left over from
                # a previous connection, would be used
                self.errors.append("Unknown endpoint %s" % ".".join(from_endpt))
                continue
            algoname, algout = source
            from_dtype = self.algorithms[algoname].output_map[algout]

        # resolves the receiving end
        to_endpt = connection["to"].split(".", 1)
        if to_endpt[0] in self.blocks:
            sink = _resolve(self.blocks[to_endpt[0]], ("",), "inputs", to_endpt[1])
            to_name = "block"
        elif to_endpt[0] in self.loops:
            sink = _resolve(
                self.loops[to_endpt[0]], loop_prefixes, "inputs", to_endpt[1]
            )
            to_name = "loop"
        elif to_endpt[0] in self.analyzers:  # it is an analyzer
            sink = _resolve(
                self.analyzers[to_endpt[0]], ("",), "inputs", to_endpt[1]
            )
            to_name = "analyzer"
        else:
            self.errors.append("Unknown endpoint %s" % to_endpt[0])
            continue

        if sink is None:
            # same reasoning as for the producing end
            self.errors.append("Unknown endpoint %s" % ".".join(to_endpt))
            continue
        algoname, algoin = sink
        to_dtype = self.algorithms[algoname].input_map[algoin]

        if from_dtype == to_dtype:
            continue  # OK

        # The other acceptable condition is that the receiving end is a
        # subset of the producing end. This can happen if the producing end
        # is a subclass of the receiving end - that is, the receiving end
        # uses a data format that is a parent of the producing end.
        from_format = dataformat_cache[from_dtype]
        to_format = dataformat_cache[to_dtype]
        if to_format.isparent(from_format):
            continue  # OK

        # If you get to this point, then an error must be issued
        self.errors.append(
            "mismatch in data type at connection (%s) %s "
            "-> (%s) %s - start point uses `%s' while end point "
            "uses `%s' (must be equal or a parent format)"
            % (
                from_name,
                ".".join(from_endpt),
                to_name,
                ".".join(to_endpt),
                from_dtype,
                to_dtype,
            )
        )
def _crosscheck_block_algorithm_pertinence(self):
    """The number of groups and the input-output connectivity must respect
    the individual synchronization channels and the block's.

    For every block, counts the toolchain connections arriving on and
    leaving from it per synchronization channel. The resulting
    ``(inputs, outputs)`` pairs are compared, as a multiset, with the group
    sizes of the block's algorithm. Data formats were already checked
    elsewhere. A mismatch is appended to ``self.errors``.
    """
    for name in self.data["blocks"]:
        head = name + "."
        connections = self.toolchain.connections

        # each input receives a single connection - multiply connected
        # inputs are illegal and have been ruled out before this point
        inputs_per_channel = collections.Counter(
            k["channel"] for k in connections if k["to"].startswith(head)
        )

        # an output feeding several inputs must only count once, hence the
        # set of distinct (output, channel) pairs
        outgoing = {
            (k["from"].replace(head, ""), k["channel"])
            for k in connections
            if k["from"].startswith(head)
        }
        outputs_per_channel = collections.Counter(ch for _, ch in outgoing)

        chain_groups = [
            (n_inputs, outputs_per_channel.get(ch, 0))
            for ch, n_inputs in inputs_per_channel.items()
        ]

        algoname = self.blocks[name]["algorithm"]
        algo_groups = [
            (len(group["inputs"]), len(group.get("outputs", [])))
            for group in self.algorithms[algoname].groups
        ]

        if collections.Counter(chain_groups) != collections.Counter(algo_groups):
            self.errors.append(
                "synchronization mismatch in input/output "
                "grouping between block `%s' and algorithm `%s'"
                % (name, algoname)
            )
def _crosscheck_loop_algorithm_pertinence(self):
    """The number of groups and the input-output connectivity must respect
    the individual synchronization channels and the block's.

    For every loop, the per-channel ``(inputs, outputs)`` connection counts
    seen in the toolchain must match, as a multiset, the group sizes
    obtained by adding up the processor and evaluator algorithm groups
    position by position. In addition, the ``request`` and ``answer`` loop
    channel types of groups at the same position must agree. Problems are
    appended to ``self.errors``.
    """

    def _group_size(group):
        # (inputs, outputs) of one algorithm group; a missing group (padding
        # from zip_longest) counts as empty
        if not group:
            return 0, 0
        return len(group["inputs"]), len(group.get("outputs", []))

    for name, loop in self.data.get("loops", {}).items():
        head = name + "."
        connections = self.toolchain.connections

        # channels of connections ending on this loop - each input receives
        # a single connection, as checked before this point
        inputs_per_channel = collections.Counter(
            k["channel"] for k in connections if k["to"].startswith(head)
        )

        # distinct (output, channel) pairs leaving this loop: an output
        # feeding several inputs is counted once
        outgoing = {
            (k["from"].replace(head, ""), k["channel"])
            for k in connections
            if k["from"].startswith(head)
        }
        outputs_per_channel = collections.Counter(ch for _, ch in outgoing)

        chain_groups_count = [
            (n_inputs, outputs_per_channel.get(ch, 0))
            for ch, n_inputs in inputs_per_channel.items()
        ]

        processor_algorithm_name = loop[PROCESSOR_PREFIX + "algorithm"]
        evaluator_algorithm_name = loop[EVALUATOR_PREFIX + "algorithm"]
        processor_groups = self.algorithms[processor_algorithm_name].groups
        evaluator_groups = self.algorithms[evaluator_algorithm_name].groups

        groups_count = []
        for p_group, e_group in itertools.zip_longest(
            processor_groups, evaluator_groups
        ):
            p_inputs, p_outputs = _group_size(p_group)
            e_inputs, e_outputs = _group_size(e_group)
            groups_count.append((p_inputs + e_inputs, p_outputs + e_outputs))

        if collections.Counter(chain_groups_count) != collections.Counter(
            groups_count
        ):
            self.errors.append(
                "synchronization mismatch in input/output "
                "grouping between loop `{}', algorithm `{}' "
                "and loop algorithm `{}'".format(
                    name, processor_algorithm_name, evaluator_algorithm_name
                )
            )

        # request/answer channel types must agree pairwise
        for p_group, e_group in zip(processor_groups, evaluator_groups):
            p_loop = p_group["loop"]
            e_loop = e_group["loop"]
            for channel in ("request", "answer"):
                if p_loop[channel]["type"] == e_loop[channel]["type"]:
                    continue
                self.errors.append(
                    "{} loop channel type incompatible between {} and {}".format(
                        channel,
                        processor_algorithm_name,
                        evaluator_algorithm_name,
                    )
                )
def _crosscheck_analyzer_algorithm_pertinence(self):
    """
    The number of groups and the input-output connectivity must respect the
    individual synchronization channels and the analyzer.

    For every analyzer, the number of toolchain connections arriving on each
    synchronization channel is compared, as a multiset, with the number of
    inputs of each group of its algorithm. A mismatch is appended to
    ``self.errors``.

    NOTE(review): not invoked from ``_load()`` in the visible code.
    """
    for name in self.data["analyzers"]:
        head = name + "."

        # channels of the connections that end on the visited analyzer
        input_channels = [
            k["channel"]
            for k in self.toolchain.connections
            if k["to"].startswith(head)
        ]

        # note: dataformats have already been checked - only need to check
        # the grouping of the inputs. Only the per-channel *counts* are
        # comparable with the algorithm's group sizes: comparing the
        # channel -> count mapping itself would compare channel names with
        # integers and always fail.
        chain_groups = collections.Counter(input_channels).values()

        algoname = self.analyzers[name]["algorithm"]
        algo_groups = [len(k["inputs"]) for k in self.algorithms[algoname].groups]

        if collections.Counter(chain_groups) != collections.Counter(algo_groups):
            self.errors.append(
                "synchronization mismatch in input "
                "grouping between analyzer `%s' and algorithm `%s'"
                % (name, algoname)
            )
@property
def label(self):
    """Label of this experiment

    Falls back to ``"__unlabelled_experiment__"`` when no label is set (for
    instance, when the experiment was loaded from in-memory data).
    """
    return self._label or "__unlabelled_experiment__"

@label.setter
def label(self, value):
    # setting a label also (re)binds the storage for that label
    self._label = value
    self.storage = Storage(self.prefix, value)

# Backward-compatible alias. It must be bound *after* the setter:
# ``@label.setter`` returns a new property object, so an alias taken before
# it would be read-only.
name = label
@property
def schema_version(self):
    """Schema version declared by the experiment (``1`` when absent)"""
    declared = self.data.get("schema_version", 1)
    return declared
@property
def valid(self):
    """``True`` when no error was recorded while loading this experiment"""
    return not self.errors
def _inputs(self, name, input_prefix=""):
    """Calculates and returns the inputs for a given block

    Parameters:

      name (str): Name of a block, loop or analyzer of this experiment.

      input_prefix (str, Optional): Prepended to ``"inputs"`` to select the
        input mapping of the configuration (for loops, presumably one of
        ``PROCESSOR_PREFIX``/``EVALUATOR_PREFIX``).

    Returns:

      dict: Maps each algorithm input name to a description of what feeds
      it. Dataset-fed inputs describe the database, protocol, set, output,
      endpoint, channel, dataset hash and cache path. Other inputs carry
      ``from`` (``<block>.<output>``), ``channel`` and ``endpoint``; their
      hashes can only be computed once all upstream blocks are known.

    Raises:

      KeyError: if ``name`` is not a block, loop or analyzer of this
        experiment, or if one of its inputs is not connected.

      AssertionError: if an input receives more than one connection.
    """
    head = name + "."

    # connections that end on the visited block
    incoming = [
        k for k in self.toolchain.connections if k["to"].startswith(head)
    ]

    # block input name -> (source block, source output, channel); strip
    # only the leading "<name>." rather than every occurrence of it
    connections = {
        k["to"][len(head):]: tuple(k["from"].split(".", 1) + [k["channel"]])
        for k in incoming
    }

    # makes sure we don't have multiple incoming connections
    assert len(incoming) == len(connections), (  # nosec
        "detected multiple input "
        "connections for block `%s' on experiment `%s'" % (name, self.label)
    )

    # must be initialised, otherwise an unknown name raises
    # UnboundLocalError instead of the documented KeyError
    config_data = None
    for item in (self.blocks, self.loops, self.analyzers):
        if name in item:
            config_data = item[name]
            break
    if config_data is None:
        raise KeyError("did not find `%s' among blocks, loops or analyzers" % name)

    retval = dict()
    for algo_endpt, block_endpt in config_data[input_prefix + "inputs"].items():
        block, output, channel = connections[block_endpt]
        if block in self.toolchain.datasets:
            dataset_config = self.datasets[block]
            retval[algo_endpt] = dict(
                database=dataset_config["database"].name,
                protocol=dataset_config["protocol"],
                set=dataset_config["set"],
                output=output,  # dataset output name always matches block's!
                endpoint=block_endpt,  # the block intake name
                channel=channel,
                hash=hash.hashDataset(
                    dataset_config["database"].name,
                    dataset_config["protocol"],
                    dataset_config["set"],
                ),
            )
            # the path in the cache is calculated from the hash
            retval[algo_endpt]["path"] = hash.toPath(
                retval[algo_endpt]["hash"], suffix=".db"
            )
        else:  # a normal block
            # Here comes the trick: block hashes cannot be easily generated
            # - they require the input hashes to be adequately generated.
            # The way forward is to gather all inputs **and** outputs and
            # then go one by one generating the input **and** output hashes
            # until all is done.
            retval[algo_endpt] = {
                "from": "%s.%s" % (block, output),
                "channel": channel,
                "endpoint": block_endpt,  # the block intake name
            }

    return retval
def _block_outputs(self, name, output_prefix=""):
    """Calculates and returns the outputs for a given block

    Parameters:

      name (str): The name of the block or loop whose outputs to compute.

      output_prefix (str): Prefix prepended to ``"outputs"`` to select the
        output mapping in the block configuration (empty for plain blocks).

    Returns:

      dict: Maps every algorithm output name to a dictionary containing the
      ``channel`` of the connection leaving the matching block output and
      the block output ``endpoint`` name.

    Raises:

      KeyError: if ``name`` is neither a block nor a loop, or if one of the
        configured block outputs has no outgoing connection.
    """
    # initialize explicitly: otherwise an unknown name leaves the variable
    # unbound and the check below fails with UnboundLocalError, not KeyError
    config_data = None
    for item in (self.blocks, self.loops):
        if name in item:
            config_data = item[name]
            break
    if config_data is None:
        raise KeyError("did not find `%s' among blocks or loops" % name)

    # connections that *start* on the visited block, organized as a map:
    # block output name -> channel. Slicing (rather than str.replace) only
    # strips the leading "<name>." prefix. Notice there can be multiply
    # connected outputs: as before, the last connection listed wins.
    source = name + "."
    channels = {
        k["from"][len(source):]: k["channel"]
        for k in self.toolchain.connections
        if k["from"].startswith(source)
    }

    retval = dict()
    outputs = config_data[output_prefix + "outputs"]
    for algo_endpt, block_endpt in outputs.items():
        retval[algo_endpt] = dict(
            channel=channels[block_endpt],
            endpoint=block_endpt,  # the block outtake name
        )
    return retval
def _configuration(self, name):
    """Returns the execution configuration for a particular block

    This method returns the (JSON) configuration for a particular block in
    this experiment. This configuration is sent to worker nodes when the
    platform wants to command the execution of a particular algorithm.

    The variable ``blocks`` from this object contains a dictionary with all
    block names as keys.

    Parameters:

      name (str): The name of the block (or loop, or analyzer) from which to
        get the configuration of. If it does not exist, raises a
        :py:class:`KeyError`.

    Returns:

      dict: The execution configuration. Blocks get an ``outputs`` entry,
      analyzers get an (still empty) ``result`` entry and loops get the
      processor configuration plus a nested ``loop`` entry holding the
      evaluator configuration.

    Raises:

      KeyError: if the block name does not exist in this experiment.
    """
    # initialize explicitly: otherwise an unknown name leaves the variable
    # unbound and the lookup below fails with UnboundLocalError, not KeyError
    config_data = None
    for item in (self.blocks, self.loops, self.analyzers):
        if name in item:
            config_data = item[name]
            break
    if config_data is None:
        raise KeyError("did not find `%s' among blocks, loops or analyzers" % name)

    globals_data = self.data["globals"]

    # resolve the execution information
    queue = config_data.get("queue", globals_data["queue"])
    nb_slots = config_data.get("nb_slots", 1)

    toolchain_data = self.toolchain.algorithm_item(name)
    if toolchain_data is None:
        raise KeyError("did not find `%s' among blocks, loops or analyzers" % name)

    def resolve_algorithm(prefix):
        """Returns (algorithm name, parameters, environment) for ``prefix``"""
        algorithm_name = config_data[prefix + "algorithm"]
        # resolve parameters taking globals in consideration; the globals
        # are copied so per-block overrides never leak back into them
        parameters = globals_data.get(algorithm_name)
        parameters = dict() if parameters is None else dict(parameters)
        parameters.update(config_data.get(prefix + "parameters", {}))
        environment = config_data.get(
            prefix + "environment", globals_data["environment"]
        )
        return algorithm_name, parameters, environment

    if name in self.loops:

        def build_block_data(prefix):
            """Builds the processor or evaluator configuration of the loop"""
            algorithm_name, parameters, environment = resolve_algorithm(prefix)
            return dict(
                inputs=self._inputs(name, prefix),
                outputs=self._block_outputs(name, prefix),
                channel=toolchain_data["synchronized_channel"],
                algorithm=algorithm_name,
                parameters=parameters,
                queue=queue,
                environment=environment,
            )

        retval = build_block_data(PROCESSOR_PREFIX)
        retval["nb_slots"] = nb_slots
        retval["loop"] = build_block_data(EVALUATOR_PREFIX)
        return retval

    algorithm_name, parameters, environment = resolve_algorithm("")
    retval = dict(
        inputs=self._inputs(name),
        channel=toolchain_data["synchronized_channel"],
        algorithm=algorithm_name,
        parameters=parameters,
        queue=queue,
        environment=environment,
        nb_slots=nb_slots,
    )
    if name in self.blocks:
        retval["outputs"] = self._block_outputs(name)
    else:
        # Analyzers have only 1 output file/cache. This is the result of an
        # optimization as most of the outputs are single numbers.
        # Furthermore, given we need to read it out on beat.web, having a
        # single file optimizes resource usage. The synchronization channel
        # for the analyzer itself is respected.
        retval["result"] = dict()  # missing the hash/path
    return retval
def setup(self):
    """Prepares the experiment so it can be executed by a scheduling
    service.

    This method will calculate the block execution order and prepare the
    configuration for each block in the experiment so its execution can be
    carried out by an adequate scheduling service.

    Output hashes are computed block by block in execution order. Each
    computed output hash/path is copied into the inputs of the blocks that
    depend on it, so when a block is visited its input hashes are known.

    Returns:

      collections.OrderedDict: An ordered dictionary with the
      block/analyzer execution order and configuration details. The keys
      of this ordered dictionary correspond to the block and analyzer
      names in the toolchain. The values correspond to a list of
      dependencies for the given block in terms of other block names and
      the block/analyzer configuration, as a dictionary.
    """
    exec_order = self.toolchain.execution_order()
    for key in exec_order:
        exec_order[key] = dict(
            dependencies=exec_order[key], configuration=self._configuration(key)
        )

    def input_hashes(config):
        """Maps every input name of ``config`` to its (already known) hash"""
        return dict((k, v["hash"]) for k, v in config["inputs"].items())

    def propagate(block_outputs, inputs):
        """Fills hash/path of the inputs fed by one of ``block_outputs``"""
        for input_value in inputs.values():
            origin = input_value.get("from")
            if origin in block_outputs:
                input_value["hash"] = block_outputs[origin]["hash"]
                input_value["path"] = block_outputs[origin]["path"]
                del input_value["from"]  # no need for further update

    def process_block(block_name, config):
        """Hashes the outputs of ``config`` and forwards them to dependents"""
        block_outputs = {}
        for output, output_value in config["outputs"].items():
            output_value["hash"] = hash.hashBlockOutput(
                block_name,
                config["algorithm"],
                self.algorithms[config["algorithm"]].hash(),
                config["parameters"],
                config["environment"],
                input_hashes(config),
                output,
            )
            output_value["path"] = hash.toPath(output_value["hash"], "")
            # keyed like the "from" entries of the dependent block inputs
            block_outputs["%s.%s" % (block_name, output_value["endpoint"])] = (
                output_value
            )

        # updates inputs which have not yet been updated
        for candidate in exec_order.values():
            if block_name in candidate["dependencies"]:
                dependent = candidate["configuration"]
                propagate(block_outputs, dependent["inputs"])
                if "loop" in dependent:
                    propagate(block_outputs, dependent["loop"]["inputs"])

    for key, value in exec_order.items():
        # now compute missing hashes - because we're in execution order,
        # there should be no missing input hashes in any of the blocks.
        config = value["configuration"]
        if "outputs" in config:  # it is a block (or a loop)
            process_block(key, config)
            if "loop" in config:
                process_block(key, config["loop"])
        else:  # it is an analyzer: 1 single output
            config["result"]["hash"] = hash.hashAnalyzer(
                key,
                config["algorithm"],
                self.algorithms[config["algorithm"]].hash(),
                config["parameters"],
                config["environment"],
                input_hashes(config),
            )
            config["result"]["path"] = hash.toPath(config["result"]["hash"], "")

    return exec_order
def dot_diagram(self):
    """Returns a dot diagram representation of the experiment

    The drawing is delegated to the toolchain's ``dot_diagram``. This method
    only supplies the callbacks that decorate it with experiment details:
    node labels, the data type carried by each edge and analyzer results.
    """
    title = "Experiment: %s" % self.label

    def _node_label(type, name):
        """Builds the HTML-like label of a dataset, block or analyzer node"""
        if type == "dataset":
            dataset = self.datasets[name]
            return "<b><u>%s</u></b><br/>%s<br/><i>%s:%s</i>" % (
                name,
                dataset["database"].name,
                dataset["protocol"],
                dataset["set"],
            )
        if type not in ("block", "analyzer"):
            return name
        # blocks and analyzers share the very same label layout
        info = self.blocks[name] if type == "block" else self.analyzers[name]
        env = info.get("environment", self.data["globals"]["environment"])
        return "<b><u>%s</u></b><br/>%s<br/><i>@%s(%s) x %d</i>" % (
            name,
            info["algorithm"],
            env["name"],
            env["version"],
            info.get("nb_slots", 1),
        )

    def _analyzer_ports(name):
        """Lists the results of an analyzer, ``+``-prefixing displayed ones"""
        algorithm = self.algorithms[self.analyzers[name]["algorithm"]]
        ports = []
        for result_name, spec in algorithm.results.items():
            label = "+" + result_name if spec["display"] else result_name
            ports.append("%s<br/>(%s)" % (label, spec["type"]))
        return ports

    def _edge_type(start):
        """Returns the data type leaving ``start`` (a ``block.output`` name)"""
        block, endpoint = start.split(".", 1)
        if block in self.datasets:
            dataset = self.datasets[block]
            dbset = dataset["database"].set(dataset["protocol"], dataset["set"])
            return dbset["outputs"][endpoint]
        block_config = self.blocks[block]
        # the configuration maps algorithm output -> block output: invert it
        algo_output = {v: k for k, v in block_config["outputs"].items()}[endpoint]
        return self.algorithms[block_config["algorithm"]].output_map[algo_output]

    return self.toolchain.dot_diagram(title, _node_label, _edge_type, _analyzer_ports)
@property
def description(self):
    """Short description of this experiment, or ``None`` when unset"""
    return self.data.get("description")


@description.setter
def description(self, value):
    """Stores ``value`` as the short description of this experiment"""
    self.data["description"] = value
@property
def documentation(self):
    """Full-length description of this experiment

    Returns ``None`` when no documentation has been stored yet.

    Raises:

      RuntimeError: if the experiment has no label.
    """
    if not self._label:
        raise RuntimeError("experiment has no label")
    doc = self.storage.doc
    return doc.load() if doc.exists() else None


@documentation.setter
def documentation(self, value):
    """Stores the full-length description of this experiment

    ``value`` may be the text itself or any object with a ``read()`` method
    (e.g. an open file), whose result is stored instead.

    Raises:

      RuntimeError: if the experiment has no label.
    """
    if not self._label:
        raise RuntimeError("experiment has no label")
    content = value.read() if hasattr(value, "read") else value
    self.storage.doc.save(content)
def hash(self):
    """Returns the hexadecimal hash computed by the storage for this experiment

    Raises:

      RuntimeError: if the experiment has no label.
    """
    if self._label:
        return self.storage.hash()
    raise RuntimeError("experiment has no label")
def json_dumps(self, indent=4):
    """Serializes the declaration (``self.data``) of this object to a string

    Parameters:

      indent (int): The number of indentation spaces at every indentation
        level

    Returns:

      str: The JSON representation for this object
    """
    # the project encoder is used so numpy values can be serialized too
    encoder = utils.NumpyJSONEncoder
    return json.dumps(self.data, cls=encoder, indent=indent)
def __str__(self):
    """Returns the JSON declaration, formatted with the default indentation"""
    serialized = self.json_dumps()
    return serialized
def write(self, storage=None):
    """Writes contents to prefix location

    Parameters:

      storage (:py:class:`.Storage`, Optional): If you pass a new storage,
        then this object will be written to that storage point rather than
        its default.

    Raises:

      RuntimeError: if no ``storage`` is given and the experiment has no
        label (so it has no default storage to write to).
    """
    target = storage
    if target is None:
        # the default storage is only meaningful for labelled experiments
        if not self._label:
            raise RuntimeError("experiment has no label")
        target = self.storage  # overwrite
    target.save(str(self), self.description)
def export(self, prefix):
    """Recursively exports itself into another prefix

    Databases and algorithms associated are also exported recursively

    Parameters:

      prefix (str): A path to a prefix that must be different from my own.

    Returns:

      None

    Raises:

      RuntimeError: If the experiment has no label, is not valid, or if
        prefix and self.prefix point to the same directory.
    """
    if not self._label:
        raise RuntimeError("experiment has no label")
    if not self.valid:
        raise RuntimeError("experiment is not valid")
    # compare resolved paths too, so spellings such as "p/", "a/../p" or a
    # symlink to the current prefix are recognized as the same directory
    if prefix == self.prefix or os.path.realpath(prefix) == os.path.realpath(
        self.prefix
    ):
        raise RuntimeError("Cannot export experiment to the same prefix (%s)" % prefix)

    self.toolchain.write(toolchain.Storage(prefix, self.toolchain.name))

    for algo in self.algorithms.values():
        algo.export(prefix)

    for db in self.databases.values():
        db.export(prefix)

    self.write(Storage(prefix, self.name))