#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###################################################################################
# #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# Redistribution and use in source and binary forms, with or without #
# modification, are permitted provided that the following conditions are met: #
# #
# 1. Redistributions of source code must retain the above copyright notice, this #
# list of conditions and the following disclaimer. #
# #
# 2. Redistributions in binary form must reproduce the above copyright notice, #
# this list of conditions and the following disclaimer in the documentation #
# and/or other materials provided with the distribution. #
# #
# 3. Neither the name of the copyright holder nor the names of its contributors #
# may be used to endorse or promote products derived from this software without #
# specific prior written permission. #
# #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #
# #
###################################################################################
"""
====
base
====

Execution utilities: common base class for block executors.
"""
import collections
import collections.abc
import glob
import logging
import os

import simplejson as json

from beat.backend.python.helpers import convert_experiment_configuration_to_container

from .. import algorithm
from .. import database
from .. import schema
from .. import stats
# Module-level logger, named after this module
logger = logging.getLogger(__name__)
class BaseExecutor(object):
    """Executors run the code of a block given its execution information

    This base class loads and cross-checks the algorithm(s) and databases
    declared in the execution information. Preparing inputs/outputs and the
    actual processing are left to subclasses (the corresponding methods raise
    :py:class:`NotImplementedError` here).

    Parameters:

      prefix (str): Establishes the prefix of your installation.

      data (dict, str): The piece of data representing the block to be executed.
        It must validate against the schema defined for execution blocks. If a
        string is passed, it is supposed to be a fully qualified absolute path to
        a JSON file containing the block execution information.

      cache (:py:class:`str`, Optional): If your cache is not located under
        ``<prefix>/cache``, then specify a full path here. It will be used
        instead.

      dataformat_cache (:py:class:`dict`, Optional): A dictionary mapping
        dataformat names to loaded dataformats. This parameter is optional and,
        if passed, may greatly speed-up database loading times as dataformats
        that are already loaded may be re-used. If you use this parameter, you
        must guarantee that the cache is refreshed as appropriate in case the
        underlying dataformats change.

      database_cache (:py:class:`dict`, Optional): A dictionary mapping
        database names to loaded databases. This parameter is optional and, if
        passed, may greatly speed-up database loading times as databases that
        are already loaded may be re-used. If you use this parameter, you must
        guarantee that the cache is refreshed as appropriate in case the
        underlying databases change.

      algorithm_cache (:py:class:`dict`, Optional): A dictionary mapping
        algorithm names to loaded algorithms. This parameter is optional and,
        if passed, may greatly speed-up algorithm loading times as algorithms
        that are already loaded may be re-used. If you use this parameter, you
        must guarantee that the cache is refreshed as appropriate in case the
        underlying algorithms change.

      library_cache (:py:class:`dict`, Optional): A dictionary mapping library
        names to loaded libraries. This parameter is optional and, if passed,
        may greatly speed-up library loading times as libraries that are
        already loaded may be re-used. If you use this parameter, you must
        guarantee that the cache is refreshed as appropriate in case the
        underlying libraries change.

      custom_root_folders (:py:class:`dict`, Optional): A mapping whose keys
        have the form ``database/<database-name>`` and whose values replace the
        ``root_folder`` entry of the matching database. It is only applied to
        databases that are not already present in ``database_cache``.

    Raises:

      IOError: If the cache path does not exist.

      TypeError: If ``custom_root_folders`` is not a mapping.

    Attributes:

      prefix (str): The prefix of the installation.

      cache (str): The path to the cache currently being used

      errors (list): A list containing errors found while loading this execution
        block.

      data (dict): The original data for this executor, as loaded by our JSON
        decoder.

      debug (bool): Debug flag, ``False`` by default.

      algorithm (beat.core.algorithm.Algorithm): An object representing the
        algorithm to be run.

      loop_algorithm (beat.core.algorithm.Algorithm): The algorithm declared in
        the ``loop`` section of the block, or ``None`` if there is none.

      databases (dict): A dictionary in which keys are strings with database
        names and values are :py:class:`.database.Database`, representing the
        databases required for running this block. The dictionary may be empty
        in case all inputs are taken from the file cache.

      input_list (beat.backend.python.inputs.InputList): A list of inputs that
        will be served to the algorithm (not populated by this base class).

      data_loaders: The data loaders created by a concrete executor, if any
        (not populated by this base class).

      output_list (beat.backend.python.outputs.OutputList): A list of outputs
        that the algorithm will produce (not populated by this base class).

      data_sinks (list): A list with all data-sinks created by our execution
        loader. These are useful for clean-up actions in case of problems.
    """

    def __init__(
        self,
        prefix,
        data,
        cache=None,
        dataformat_cache=None,
        database_cache=None,
        algorithm_cache=None,
        library_cache=None,
        custom_root_folders=None,
    ):

        # Initialisations
        self.prefix = prefix
        self.cache = cache or os.path.join(self.prefix, "cache")
        self.algorithm = None
        self.loop_algorithm = None
        self.databases = {}
        self.input_list = None
        self.data_loaders = None
        self.output_list = None
        self.data_sinks = []
        self.errors = []
        self.data = data
        self.debug = False

        # Check that the cache path exists
        if not os.path.exists(self.cache):
            raise IOError("Cache path `%s' does not exist" % self.cache)

        # Check the custom root folders
        if custom_root_folders is not None:
            if not isinstance(custom_root_folders, collections.abc.Mapping):
                raise TypeError("The custom root folders must be in dictionary format")
        else:
            custom_root_folders = {}

        # Temporary caches, if the user has not set them, for performance
        database_cache = database_cache if database_cache is not None else {}
        dataformat_cache = dataformat_cache if dataformat_cache is not None else {}
        algorithm_cache = algorithm_cache if algorithm_cache is not None else {}
        library_cache = library_cache if library_cache is not None else {}

        # Basic validation of the data declaration, including JSON loading if required
        if not isinstance(data, dict):
            if not os.path.exists(data):
                self.errors.append("File not found: %s" % data)
                return

        self.data, self.errors = schema.validate("execution", data)
        if self.errors:
            return

        # Load the algorithm (using the algorithm cache if possible)
        self.algorithm = self._load_algorithm(
            self.data["algorithm"], algorithm_cache, dataformat_cache, library_cache
        )
        if not self.algorithm.valid:
            self.errors += self.algorithm.errors
            return

        if "loop" in self.data:
            loop = self.data["loop"]
            self.loop_algorithm = self._load_algorithm(
                loop["algorithm"], algorithm_cache, dataformat_cache, library_cache
            )
            # Same as for the main algorithm: an invalid loop algorithm cannot
            # be cross-checked, so report its errors and stop here
            if not self.loop_algorithm.valid:
                self.errors += self.loop_algorithm.errors
                return

            # The loop section's own inputs/outputs are checked against the
            # loop algorithm (not against the main algorithm)
            if len(loop["inputs"]) != len(self.loop_algorithm.input_map):
                self.errors.append(
                    "The number of inputs of the loop algorithm doesn't correspond"
                )

            for name in loop["inputs"].keys():
                if name not in self.loop_algorithm.input_map.keys():
                    self.errors.append(
                        "The input '%s' doesn't exist in the loop algorithm" % name
                    )

            if len(loop["outputs"]) != len(self.loop_algorithm.output_map):
                self.errors.append(
                    "The number of outputs of the loop algorithm doesn't correspond"
                )

            for name in loop["outputs"].keys():
                if name not in self.loop_algorithm.output_map.keys():
                    self.errors.append(
                        "The output '%s' doesn't exist in the loop algorithm" % name
                    )

        # Check that the mapping in coherent
        if len(self.data["inputs"]) != len(self.algorithm.input_map):
            self.errors.append(
                "The number of inputs of the algorithm doesn't correspond"
            )

        # Blocks without an "outputs" section (e.g. analysis blocks, which
        # declare a "result" instead) skip the output checks
        if "outputs" in self.data and (
            len(self.data["outputs"]) != len(self.algorithm.output_map)
        ):
            self.errors.append(
                "The number of outputs of the algorithm doesn't correspond"
            )

        for name in self.data["inputs"].keys():
            if name not in self.algorithm.input_map.keys():
                self.errors.append(
                    "The input '%s' doesn't exist in the algorithm" % name
                )

        if "outputs" in self.data:
            for name in self.data["outputs"].keys():
                if name not in self.algorithm.output_map.keys():
                    self.errors.append(
                        "The output '%s' doesn't exist in the algorithm" % name
                    )

        if "loop" in self.data:
            # The main algorithm must expose both ends of the loop channel
            for name in ["request", "answer"]:
                if name not in self.algorithm.loop_map.keys():
                    self.errors.append(
                        "The loop '%s' doesn't exist in the algorithm" % name
                    )

        if self.errors:
            return

        # Load the databases (if any is required)
        self._update_db_cache(
            self.data["inputs"], custom_root_folders, database_cache, dataformat_cache
        )

        if "loop" in self.data:
            self._update_db_cache(
                self.data["loop"]["inputs"],
                custom_root_folders,
                database_cache,
                dataformat_cache,
            )

    def __enter__(self):
        """Enters the execution context

        The base implementation only logs the start of the execution and
        returns ``self``.
        """

        logger.info("Start the execution of '%s'", self.algorithm.name)

        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Closes all sinks and disconnects inputs and outputs

        Sinks are closed (i.e. their output saved) only when the context exits
        normally or through an exception that is not an :py:class:`Exception`
        subclass (e.g. :py:class:`SystemExit`). Exceptions are never
        suppressed.
        """

        # ``exc_type`` is a class (or None), hence ``issubclass`` and not
        # ``isinstance``. A system exit raises SystemExit, which is not an
        # Exception, so the output is still saved in that case.
        failed = exc_type is not None and issubclass(exc_type, Exception)
        if not failed:
            for sink in self.data_sinks:
                sink.close()

        self.input_list = None
        self.data_loaders = []
        self.output_list = None
        self.data_sinks = []

    def _load_algorithm(self, name, algorithm_cache, dataformat_cache, library_cache):
        """Returns the algorithm ``name``, reusing/filling ``algorithm_cache``"""

        if name in algorithm_cache:
            return algorithm_cache[name]

        algo = algorithm.Algorithm(self.prefix, name, dataformat_cache, library_cache)
        algorithm_cache[algo.name] = algo
        return algo

    def _update_db_cache(
        self, inputs, custom_root_folders, database_cache, dataformat_cache
    ):
        """Loads the databases referenced by ``inputs`` into ``self.databases``

        Inputs without a ``database`` entry are ignored, as are databases that
        were already registered. Databases are taken from ``database_cache``
        when possible; freshly loaded ones get their root folder overridden
        from ``custom_root_folders`` and are added to ``database_cache``. The
        errors of invalid databases are appended to ``self.errors``.
        """

        for details in inputs.values():
            if "database" not in details:
                continue

            if details["database"] in self.databases:
                continue

            if details["database"] in database_cache:
                db = database_cache[details["database"]]
            else:
                db = database.Database(
                    self.prefix, details["database"], dataformat_cache
                )

                root_key = "database/%s" % db.name
                if root_key in custom_root_folders:
                    db.data["root_folder"] = custom_root_folders[root_key]

                database_cache[db.name] = db

            self.databases[db.name] = db

            if not db.valid:
                self.errors += db.errors

    def _prepare_inputs(self):
        """Prepares all input required by the execution."""

        raise NotImplementedError()

    def _prepare_outputs(self):
        """Prepares all output required by the execution."""

        raise NotImplementedError()

    def process(
        self, virtual_memory_in_megabytes=0, max_cpu_percent=0, timeout_in_minutes=0
    ):
        """Executes the user algorithm code

        Parameters:

          virtual_memory_in_megabytes (:py:class:`int`, Optional): The amount
            of virtual memory (in Megabytes) available for the job. If set to
            zero, no limit will be applied.

          max_cpu_percent (:py:class:`int`, Optional): The maximum amount of
            CPU usage allowed in a system. This number must be an integer
            number between 0 and ``100*number_of_cores`` in your system. For
            instance, if your system has 2 cores, this number can go between 0
            and 200. If it is <= 0, then we don't track CPU usage.

          timeout_in_minutes (int): The number of minutes to wait for the user
            process to execute. After this amount of time, the user process is
            killed with ``signal.SIGKILL``. If set to zero, no timeout will be
            applied.

        Returns:

          dict: A dictionary which is JSON formattable containing the summary of
            this block execution.

        Raises:

          NotImplementedError: always, in this base class.
        """

        raise NotImplementedError()

    @property
    def valid(self):
        """A boolean that indicates if this executor is valid or not"""

        return not bool(self.errors)

    @property
    def analysis(self):
        """A boolean that indicates if the current block is an analysis block"""

        return "result" in self.data

    @property
    def outputs_exist(self):
        """Returns ``True`` if all outputs this block is supposed to produce exist

        Analysis blocks check their ``result`` path, other blocks check each of
        their declared outputs. An output is considered present when at least
        one file in the cache matches ``<cache>/<path>*``.
        """

        if self.analysis:
            paths = [self.data["result"]["path"]]
        else:
            paths = [details["path"] for details in self.data["outputs"].values()]

        # all() stops at the first missing output
        return all(glob.glob(os.path.join(self.cache, path) + "*") for path in paths)

    @property
    def io_statistics(self):
        """Summarize current I/O statistics looking at data sources and sinks, inputs and outputs

        Returns:

          dict: A dictionary summarizing current I/O statistics
        """

        return stats.io_statistics(self.data, self.input_list, self.output_list)

    def __str__(self):
        return json.dumps(self.data, indent=4)

    def write(self, path):
        """Writes the JSON representation of the block data to ``path``"""

        with open(path, "wt") as f:
            f.write(str(self))

    def dump_runner_configuration(self, directory):
        """Exports contents useful for a backend runner to run the algorithm

        Writes ``configuration.json`` (the block data converted to the
        container format) into ``directory`` and exports the algorithm(s) under
        ``<directory>/prefix``.
        """

        data = convert_experiment_configuration_to_container(self.data)

        with open(os.path.join(directory, "configuration.json"), "wb") as f:
            json_data = json.dumps(data, indent=2)
            f.write(json_data.encode("utf-8"))

        tmp_prefix = os.path.join(directory, "prefix")
        os.makedirs(tmp_prefix, exist_ok=True)

        self.algorithm.export(tmp_prefix)

        if self.loop_algorithm:
            self.loop_algorithm.export(tmp_prefix)

    def dump_databases_provider_configuration(self, directory):
        """Exports contents useful for a backend runner to provide the databases

        Writes ``configuration.json`` (the raw block data) into ``directory``
        and exports every required database under ``<directory>/prefix``.
        """

        with open(os.path.join(directory, "configuration.json"), "wb") as f:
            json_data = json.dumps(self.data, indent=2)
            f.write(json_data.encode("utf-8"))

        tmp_prefix = os.path.join(directory, "prefix")
        os.makedirs(tmp_prefix, exist_ok=True)

        for db in self.databases.values():
            db.export(tmp_prefix)