#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###################################################################################
# #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# Redistribution and use in source and binary forms, with or without #
# modification, are permitted provided that the following conditions are met: #
# #
# 1. Redistributions of source code must retain the above copyright notice, this #
# list of conditions and the following disclaimer. #
# #
# 2. Redistributions in binary form must reproduce the above copyright notice, #
# this list of conditions and the following disclaimer in the documentation #
# and/or other materials provided with the distribution. #
# #
# 3. Neither the name of the copyright holder nor the names of its contributors #
# may be used to endorse or promote products derived from this software without #
# specific prior written permission. #
# #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #
# #
###################################################################################
"""
=========
algorithm
=========
Validation for algorithms
Forward importing from :py:mod:`beat.backend.python.algorithm`
:py:class:`beat.backend.python.algorithm.Storage`
:py:class:`beat.backend.python.algorithm.Runner`
"""
import os
import numpy
import pkg_resources
import simplejson as json
import six
from beat.backend.python.algorithm import Algorithm as BackendAlgorithm
from beat.backend.python.algorithm import Runner # noqa
from beat.backend.python.algorithm import Storage
from . import dataformat
from . import library
from . import prototypes
from . import schema


def load_algorithm_prototype(prefix):
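    """Loads the default algorithm prototype declaration.

    The prototype is read from ``prototypes/algorithm.json`` and its input and
    output types are bound to the latest version of the reference integer data
    format (``integer`` or ``integers``) found under ``<prefix>/dataformats``.

    A minimal usage sketch (the prefix path below is hypothetical)::

        prototype = load_algorithm_prototype("/path/to/prefix")
        assert "groups" in prototype
    """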
prototype_data = pkg_resources.resource_string(
__name__, "prototypes/algorithm.json"
)
algorithm_data = json.loads(prototype_data)
ref_dataformats = ["integer", "integers"]
dataformat = None
for ref_dataformat in ref_dataformats:
for root, dirs, _ in os.walk(os.path.join(prefix, "dataformats")):
if ref_dataformat in dirs:
dataformat_versions = sorted(
os.listdir(os.path.join(root, ref_dataformat))
)
version = dataformat_versions[-1].split(".")[0]
dataformat = "{}/{}/{}".format(
os.path.basename(root), ref_dataformat, version
)
break
if dataformat is None:
raise RuntimeError(
"Reference data formats [{}] not found".format(",".join(ref_dataformats))
)
algorithm_data["groups"][0]["inputs"]["in_data"]["type"] = dataformat
algorithm_data["groups"][0]["outputs"]["out_data"]["type"] = dataformat
return algorithm_data


class Algorithm(BackendAlgorithm):
"""Algorithms represent runnable components within the platform.
This class can only parse the meta-parameters of the algorithm (i.e., input
and output declaration, grouping, synchronization details, parameters and
splittability). The actual algorithm is not directly treated by this class.
It can, however, provide you with a loader for actually running the
algorithmic code (see :py:meth:`.runner`).
Parameters:
prefix (str): Establishes the prefix of your installation.
data (:py:class:`object`, Optional): The piece of data representing the
algorithm. It must validate against the schema defined for algorithms.
If a string is passed, it is supposed to be a valid path to an
algorithm in the designated prefix area. If a tuple is passed (or a
list), then we consider that the first element represents the algorithm
declaration, while the second, the code for the algorithm (either in
its source format or as a binary blob). If ``None`` is passed, loads
our default prototype for algorithms (source code will be in Python).
dataformat_cache (:py:class:`dict`, Optional): A dictionary mapping
dataformat names to loaded dataformats. This parameter is optional and,
        if passed, may greatly speed up algorithm loading times as dataformats
that are already loaded may be re-used.
library_cache (:py:class:`dict`, Optional): A dictionary mapping library
names to loaded libraries. This parameter is optional and, if passed,
        may greatly speed up library loading times as libraries that are
already loaded may be re-used.
Attributes:
name (str): The algorithm name
description (str): The short description string, loaded from the JSON
file if one was set.
documentation (str): The full-length docstring for this object.
storage (object): A simple object that provides information about file
        paths for this algorithm.
dataformats (dict): A dictionary containing all pre-loaded dataformats
used by this algorithm. Data format objects will be of type
:py:class:`beat.core.dataformat.DataFormat`.
libraries (dict): A mapping object defining other libraries this
algorithm needs to load so it can work properly.
uses (dict): A mapping object defining the required library import name
        (keys) and the full names (values).
parameters (dict): A dictionary containing all pre-defined parameters
that this algorithm accepts.
splittable (bool): A boolean value that indicates if this algorithm is
        automatically parallelizable by our backend.
input_map (dict): A dictionary where the key is the input name and the
value, its type. All input names (potentially from different groups)
        are included in this dictionary.
output_map (dict): A dictionary where the key is the output name and the
value, its type. All output names (potentially from different groups)
        are included in this dictionary.
results (dict): If this algorithm is actually an analyzer (i.e., there
are no formal outputs, but results that must be saved by the platform),
then this dictionary contains the names and data types of those
elements.
      groups (list): A list of dictionaries with inputs and outputs
belonging to the same synchronization group.
errors (list): A list containing errors found while loading this
algorithm.
data (dict): The original data for this algorithm, as loaded by our JSON
decoder.
code (str): The code that is associated with this algorithm, loaded as a
text (or binary) file.
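
    Example:

      A minimal usage sketch; the prefix path and the algorithm name below are
      hypothetical::

          algorithm = Algorithm("/path/to/prefix", "user/my_algorithm/1")
          for error in algorithm.errors:
              print(error)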
"""
dataformat_klass = dataformat.DataFormat
def __init__(self, prefix, data, dataformat_cache=None, library_cache=None):
super(Algorithm, self).__init__(prefix, data, dataformat_cache, library_cache)
def _load(self, data, dataformat_cache, library_cache):
"""Loads the algorithm"""
self.errors = []
self.data = None
self.code = None
self._name = None
self.storage = None
self.dataformats = {} # preloaded dataformats
self.libraries = {} # preloaded libraries
code = None
if data is None: # loads prototype and validates it
data = None
code = None
elif isinstance(data, (tuple, list)): # user has passed individual info
data, code = data # break down into two components
        if isinstance(data, six.string_types):  # user has passed an algorithm name
self._name = data
self.storage = Storage(self.prefix, self._name)
if not self.storage.json.exists():
self.errors.append("Algorithm declaration file not found: %s" % data)
return
data = self.storage.json.path # loads data from JSON declaration
        # At this point, `data' can be a dictionary, a path to a JSON declaration or ``None``
if data is None: # loads the default declaration for an algorithm
algorithm_data = load_algorithm_prototype(self.prefix)
self.data, self.errors = schema.validate("algorithm", algorithm_data)
assert not self.errors, "\n * %s" % "\n *".join(self.errors) # nosec
else: # just assign it
# this runs basic validation, including JSON loading if required
self.data, self.errors = schema.validate("algorithm", data)
if self.errors:
return # don't proceed with the rest of validation
if self.storage is not None: # loading from the disk, check code
if not self.storage.code.exists():
if self.data["language"] != "cxx":
self.errors.append(
"Algorithm code not found: %s" % self.storage.code.path
)
return
else:
code = self.storage.code.load()
# At this point, `code' can be a string (or a binary blob) or ``None``
if code is None: # loads the default code for an algorithm
self.code = prototypes.binary_load("algorithm.py")
self.data["language"] = "python"
else: # just assign it - notice that in this case, no language is set
self.code = code
if self.errors:
return # don't proceed with the rest of validation
# if no errors so far, make sense out of the declaration data
self.groups = self.data["groups"]
        # now we check for consistency
self._check_endpoint_uniqueness()
# create maps for easy access to data
self.input_map = dict(
[(k, v["type"]) for g in self.groups for k, v in g["inputs"].items()]
)
self.output_map = dict(
[
(k, v["type"])
for g in self.groups
for k, v in g.get("outputs", {}).items()
]
)
self.loop_map = dict(
[(k, v["type"]) for g in self.groups for k, v in g.get("loop", {}).items()]
)
self._validate_required_dataformats(dataformat_cache)
self._convert_parameter_types()
# finally, the libraries
self._validate_required_libraries(library_cache)
self._check_language_consistence()
def _check_endpoint_uniqueness(self):
"""Checks for name clashes accross input/output groups"""
all_input_names = []
for group in self.groups:
all_input_names.extend(group["inputs"].keys())
if len(set(all_input_names)) != len(all_input_names):
self.errors.append(
"repeated input name in algorithm `%s' "
"declaration: %s" % (self.name, ", ".join(all_input_names))
)
# all outputs must have unique names
all_output_names = []
for group in self.groups:
if "outputs" not in group:
continue
all_output_names.extend(group["outputs"].keys())
if len(set(all_output_names)) != len(all_output_names):
self.errors.append(
"repeated output name in algorithm `%s' "
"declaration: %s" % (self.name, ", ".join(all_output_names))
)
def _validate_format(self, type_name, group_name, entry_name, dataformat):
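        """Appends an error if the given data format failed validation"""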
if dataformat.errors:
self.errors.append(
"found error validating data format `%s' "
"for %s `%s' on algorithm `%s': %s"
% (
type_name,
group_name,
entry_name,
self.name,
"\n".join(dataformat.errors),
)
)
def _validate_dataformats(self, group, group_name, dataformat_cache):
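        """Validates the data formats used by the entries of the given group section"""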
for name, entry in group[group_name].items():
type_name = entry["type"]
thisformat = self._update_dataformat_cache(type_name, dataformat_cache)
self._validate_format(type_name, group_name, name, thisformat)
def _validate_required_dataformats(self, dataformat_cache):
"""Makes sure we can load all requested formats"""
for group in self.groups:
for name, input_ in group["inputs"].items():
self._validate_dataformats(group, "inputs", dataformat_cache)
if "outputs" in group:
self._validate_dataformats(group, "outputs", dataformat_cache)
if "loop" in group:
self._validate_dataformats(group, "loop", dataformat_cache)
if self.results:
for name, result in self.results.items():
result_type = result["type"]
                # results can only contain base types and plots; therefore,
                # only process plots
if result_type.find("/") != -1:
thisformat = self._update_dataformat_cache(
result_type, dataformat_cache
)
self._validate_format(result_type, "result", name, thisformat)
def _convert_parameter_types(self):
"""Converts types to numpy equivalents, checks defaults, ranges and
choices
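
        For illustration only (a hypothetical parameter declaration), an entry
        such as::

            {"type": "float64", "default": 0.5, "range": [0.0, 1.0]}

        ends up with its ``type`` set to ``numpy.dtype("float64")`` and the
        default and range values cast through that dtype.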
"""
def _try_convert(name, tp, value, desc):
try:
return tp.type(value)
except Exception as e:
self.errors.append(
"%s for parameter `%s' cannot be cast to type "
"`%s': %s" % (desc, name, tp.name, e)
)
if self.parameters is None:
return
for name, parameter in self.parameters.items():
if parameter["type"] == "string":
parameter["type"] = numpy.dtype("str")
else:
parameter["type"] = numpy.dtype(parameter["type"])
if "range" in parameter:
parameter["range"][0] = _try_convert(
name, parameter["type"], parameter["range"][0], "start of range"
)
parameter["range"][1] = _try_convert(
name, parameter["type"], parameter["range"][1], "end of range"
)
if parameter["range"][0] >= parameter["range"][1]:
self.errors.append(
"range for parameter `%s' has a start greater "
"then the end value (%r >= %r)"
% (name, parameter["range"][0], parameter["range"][1])
)
if "choice" in parameter:
for i, choice in enumerate(parameter["choice"]):
parameter["choice"][i] = _try_convert(
name,
parameter["type"],
parameter["choice"][i],
"choice[%d]" % i,
)
if "default" in parameter:
parameter["default"] = _try_convert(
name, parameter["type"], parameter["default"], "default"
)
if "range" in parameter: # check range
if (
parameter["default"] < parameter["range"][0]
or parameter["default"] > parameter["range"][1]
):
self.errors.append(
"default for parameter `%s' (%r) is not "
"within parameter range [%r, %r]"
% (
name,
parameter["default"],
parameter["range"][0],
parameter["range"][1],
)
)
if "choice" in parameter: # check choices
if parameter["default"] not in parameter["choice"]:
self.errors.append(
"default for parameter `%s' (%r) is not "
"a valid choice `[%s]'"
% (
name,
parameter["default"],
", ".join(["%r" % k for k in parameter["choice"]]),
)
)
def _validate_required_libraries(self, library_cache):
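        """Checks that all referenced libraries can be loaded and are valid"""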
# all used libraries must be loadable; cannot use self as a library
if self.uses:
for name, value in self.uses.items():
self.libraries[value] = library_cache.setdefault(
value, library.Library(self.prefix, value, library_cache)
)
if not self.libraries[value].valid:
self.errors.append(
"referred library `%s' (%s) is not valid"
% (self.libraries[value].name, name)
)
def _check_language_consistence(self):
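        """Checks that used libraries are written in the same language as this algorithm"""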
# all used libraries must be programmed with the same language
if self.language == "unknown":
return # bail out on unknown language
if self.uses:
for name, library_name in self.uses.items():
if library_name not in self.libraries:
continue # invalid
if self.libraries[library_name].data is None:
self.errors.append(
"language for used library `%s' cannot be "
"inferred as the library was not properly loaded"
% (library_name,)
)
continue
if self.libraries[library_name].language != self.language:
self.errors.append(
"language for used library `%s' (`%s') "
"differs from current language for this algorithm (`%s')"
% (
library_name,
self.libraries[library_name].language,
self.language,
)
)