Source code for beat.cmdline.common

#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###################################################################################
#                                                                                 #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/               #
# Contact: beat.support@idiap.ch                                                  #
#                                                                                 #
# Redistribution and use in source and binary forms, with or without              #
# modification, are permitted provided that the following conditions are met:     #
#                                                                                 #
# 1. Redistributions of source code must retain the above copyright notice, this  #
# list of conditions and the following disclaimer.                                #
#                                                                                 #
# 2. Redistributions in binary form must reproduce the above copyright notice,    #
# this list of conditions and the following disclaimer in the documentation       #
# and/or other materials provided with the distribution.                          #
#                                                                                 #
# 3. Neither the name of the copyright holder nor the names of its contributors   #
# may be used to endorse or promote products derived from this software without   #
# specific prior written permission.                                              #
#                                                                                 #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED   #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE          #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE    #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL      #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR      #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER      #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,   #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE   #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.            #
#                                                                                 #
###################################################################################


"""Utility functions that are useful to all sub-commands"""

import collections
import difflib
import fnmatch
import glob
import logging
import os

from enum import Enum
from enum import unique

import simplejson
import six
import termcolor

from beat.core import algorithm
from beat.core import database
from beat.core import dataformat
from beat.core import experiment
from beat.core import library
from beat.core import plotter
from beat.core import plotterparameter
from beat.core import protocoltemplate
from beat.core import toolchain

logger = logging.getLogger(__name__)

TYPE_GLOB = {
    "dataformat": os.path.join("*", "*", "*.json"),
    "database": os.path.join("*", "*.json"),
    "library": os.path.join("*", "*", "*.json"),
    "algorithm": os.path.join("*", "*", "*.json"),
    "plotter": os.path.join("*", "*", "*.json"),
    "plotterparameter": os.path.join("*", "*", "*.json"),
    "protocoltemplate": os.path.join("*", "*.json"),
    "toolchain": os.path.join("*", "*", "*.json"),
    "experiment": os.path.join("*", "*", "*", "*", "*.json"),
}


TYPE_FNMATCH = {
    "dataformat": os.path.splitext(TYPE_GLOB["dataformat"])[0],
    "database": os.path.splitext(TYPE_GLOB["database"])[0],
    "library": os.path.splitext(TYPE_GLOB["library"])[0],
    "algorithm": os.path.splitext(TYPE_GLOB["algorithm"])[0],
    "plotter": os.path.splitext(TYPE_GLOB["plotter"])[0],
    "plotterparameter": os.path.splitext(TYPE_GLOB["plotterparameter"])[0],
    "protocoltemplate": os.path.splitext(TYPE_GLOB["protocoltemplate"])[0],
    "toolchain": os.path.splitext(TYPE_GLOB["toolchain"])[0],
    "experiment": os.path.splitext(TYPE_GLOB["experiment"])[0],
}


TYPE_VALIDATOR = {
    "dataformat": dataformat.DataFormat,
    "database": database.Database,
    "library": library.Library,
    "algorithm": algorithm.Algorithm,
    "plotter": plotter.Plotter,
    "plotterparameter": plotterparameter.Plotterparameter,
    "protocoltemplate": protocoltemplate.ProtocolTemplate,
    "toolchain": toolchain.Toolchain,
    "experiment": experiment.Experiment,
}

TYPE_STORAGE = {
    "dataformat": dataformat.Storage,
    "database": database.Storage,
    "library": library.Storage,
    "algorithm": algorithm.Storage,
    "plotter": plotter.Storage,
    "plotterparameter": plotterparameter.Storage,
    "protocoltemplate": protocoltemplate.Storage,
    "toolchain": toolchain.Storage,
    "experiment": experiment.Storage,
}

TYPE_PLURAL = {
    "dataformat": "dataformats",
    "database": "databases",
    "library": "libraries",
    "algorithm": "algorithms",
    "plotter": "plotters",
    "plotterparameter": "plotters/plotterparameters",
    "defaultplotter": "plotters/defaultplotters",
    "toolchain": "toolchains",
    "experiment": "experiments",
    "protocoltemplate": "protocoltemplates",
}


[docs]@unique class ModificationStatus(Enum): """This enum describes the state of possible changes between a local asset and it's remote counter part""" NO_CHANGES = "" REMOTE_ONLY_AVAILABLE = "r" LOCAL_ONLY_AVAILABLE = "l" DOC_CHANGED = "d" CONTENT_CHANGED = "+" BOTH_CHANGED = "*"
[docs]def recursive_rmdir_if_empty(path, stop_at): """Recursively removes empty directories until a certain top directory""" if not os.path.exists(path): recursive_rmdir_if_empty(os.path.dirname(path), stop_at) return if os.path.samefile(path, stop_at): return # stop if not os.listdir(path): # empty logger.info("removing empty directory `%s'...", path) os.rmdir(path) recursive_rmdir_if_empty(os.path.dirname(path), stop_at) return
[docs]class Selector(object): """Keeps track of versions and fork status""" def __init__(self, prefix): self.prefix = prefix # the root of the directory self.path = os.path.join(self.prefix, ".beat", "selected.json") self.__version = {} self.__fork = {} self.__versionables = [ "algorithm", "dataformat", "database", "library", "toolchain", "plotter", "plotterparameter", "protocoltemplate", ] self.__forkables = [ "algorithm", "dataformat", "experiment", "library", "toolchain", "plotter", "plotterparameter", ] if os.path.exists(self.path): self.load() else: self.__ensure_entries() def __enter__(self): """Implements our context manager""" return self def __exit__(self, *exc): """Implements our context manager""" self.save() def __ensure_entries(self): """Ensure all types have an entry""" for asset_type in self.__versionables: if asset_type not in self.__version: self.__version[asset_type] = dict() for asset_type in self.__forkables: if asset_type not in self.__fork: self.__fork[asset_type] = dict()
[docs] def can_fork(self, asset_type): """Returns whether the given asset type can be forked""" return asset_type in self.__forkables
[docs] def has_versions(self, asset_type): """Returns whether the given asset type can have versions""" return asset_type in self.__versionables
[docs] def fork(self, asset_type, src, dst): """Registers that object ``dst`` is a fork of object ``src``""" if not self.can_fork(asset_type): raise RuntimeError("Can't create new version of {}".format(asset_type)) logger.info( "`%s/%s' is forked from `%s/%s'", TYPE_PLURAL[asset_type], dst, TYPE_PLURAL[asset_type], src, ) self.__fork[asset_type][dst] = src
[docs] def forked_from(self, asset_type, name): """Returns the name of the originating source object or ``None``""" if not self.can_fork(asset_type): return None return self.__fork[asset_type].get(name)
[docs] def version(self, asset_type, src, dst): """Registers that object ``dst`` is a new version of object ``src``""" if asset_type not in self.__versionables: raise RuntimeError("Can't create new version of {}".format(asset_type)) logger.info( "`%s/%s' is a new version of `%s/%s'", TYPE_PLURAL[asset_type], dst, TYPE_PLURAL[asset_type], src, ) self.__version[asset_type][dst] = src
[docs] def version_of(self, asset_type, name): """Returns the name of the originating version object or ``None``""" if asset_type not in self.__version: return None return self.__version[asset_type].get(name)
[docs] def delete(self, asset_type, name): """Forgets about an object that was being tracked""" if asset_type in self.__fork and name in self.__fork[asset_type]: del self.__fork[asset_type][name] if asset_type in self.__version and name in self.__version[asset_type]: del self.__version[asset_type][name]
[docs] def load(self): """Loads contents from file""" try: with open(self.path, "rt") as f: data = simplejson.load(f, object_pairs_hook=collections.OrderedDict) except simplejson.JSONDecodeError: logger.warning( "invalid state file at `%s' - removing and re-starting...", self.path ) from beat.core.utils import safe_rmfile safe_rmfile(self.path) return False self.__fork = data["fork"] self.__version = data["version"] self.__ensure_entries() return True
[docs] def save(self): """Saves contents to file""" dirname = os.path.dirname(self.path) if not os.path.exists(dirname): os.makedirs(dirname) data = {"fork": self.__fork, "version": self.__version} with open(self.path, "wt") as f: simplejson.dump(data, f, indent=2)
[docs]def retrieve_remote_list(webapi, asset_type, fields): """Utility function used by commands to retrieve a remote list of objects Parameters: webapi (object): An instance of our WebAPI class, prepared to access the BEAT server of interest asset_type (str): One of ``database``, ``dataformat``, ``algorithm``, ``toolchain`` or ``experiment``. fields (:py:class:`list`): A list of fields to retrieve from the remote server Returns: :py:class:`list`: A list of dictionaries containing the ``name``, ``short_description`` and ``hash`` of available remote objects. """ logger.debug("retrieving remote %s list...", TYPE_PLURAL[asset_type]) fields = "" if not fields else "?fields=%s" % ",".join(fields) url = "/api/v1/%s/%s" % (TYPE_PLURAL[asset_type], fields) return webapi.get(url)
[docs]def make_up_remote_list(webapi, asset_type, requirements): """Creates a list of downloadable objects from user requirements. This function can create a list of downloadable objects from user requirements. User requirements may point to valid object names (in which case these are returned unchanged) or partial object names, which are used to filter available remote resources. A list of fully resolved remote names respecting user restrictions is returned. Parameters: webapi (object): An instance of our WebAPI class, prepared to access the BEAT server of interest asset_type (str): One of ``database``, ``dataformat``, ``algorithm``, ``toolchain`` or ``experiment``. requirements (:py:class:`list`): A list of requirements that are used to filter (additively) the available (remote) objects. Returns: :py:class:`list`: A list of valid object names matching user requirements and its order. """ candidates = retrieve_remote_list(webapi, asset_type, ["name"]) if not requirements: # special case, return all possible values if candidates is None: return None return [c["name"] for c in candidates] # othewise, we need to separate filters from full-names full_requirements = fnmatch.filter(requirements, TYPE_FNMATCH[asset_type]) short_requirements = [k for k in requirements if k not in full_requirements] retval = [] if short_requirements: if candidates is None: return None retval = set() for name in short_requirements: retval |= set([k["name"] for k in candidates if k["name"].find(name) != -1]) retval = list(retval) logger.info("search strings matched %d remote object(s)", len(retval)) # note: if you specify a full-length requirement, we don't really care if it # is there or not. The final command will decide if it is an error. return retval + full_requirements
[docs]def display_remote_list(webapi, asset_type): """Implements a generic "list --remote" command Parameters: webapi (object): An instance of our WebAPI class, prepared to access the BEAT server of interest, on behalf of a pre-configured user. asset_type (str): One of ``database``, ``dataformat``, ``algorithm``, ``toolchain`` or ``experiment``. Returns: int: Indicating the exit status of the command, to be reported back to the calling process. This value should be zero if everything works OK, otherwise, different than zero (POSIX compliance). """ remote_list = retrieve_remote_list( webapi, asset_type, ["name", "short_description"] ) if remote_list is None: return 1 for item in remote_list: logger.info("%s", item["name"]) if item["short_description"]: logger.extra(2 * " " + item["short_description"]) if len(remote_list) != 1: logger.extra("%d %s found", len(remote_list), TYPE_PLURAL[asset_type]) else: logger.extra("1 %s found" % asset_type) return 0
[docs]def make_up_local_list(prefix, asset_type, requirements): """Creates a list of uploadable objects from user requirements. This function can create a list of uploadable objects from user requirements. User requirements may point to valid object names (in which case these are returned unchanged) or partial object names, which are used to filter available local resources. A list of fully resolved local names respecting user restrictions is returned. Parameters: prefix (str): A string representing the root of the path in which the user objects are stored asset_type (str): One of ``database``, ``dataformat``, ``algorithm``, ``toolchain`` or ``experiment``. requirements (:py:class:`list`): A list of requirements that are used to filter (additively) the available (remote) objects. Returns: :py:class:`list`: A list of strings, each with the relative name of an object belonging to a certain category and in the order prescribed by the user. """ root = os.path.join(prefix, TYPE_PLURAL[asset_type]) asset_path_list = glob.glob(os.path.join(root, TYPE_GLOB[asset_type])) candidates = [ os.path.splitext(os.path.relpath(path, root))[0] for path in asset_path_list ] # adds hashed path structures hashed_path_list = glob.glob(os.path.join(root, "*", "*", TYPE_GLOB[asset_type])) hashed_path_list = [ os.path.splitext(os.path.relpath(path, root))[0] for path in hashed_path_list ] candidates += [os.path.join(*path.split(os.sep)[2:]) for path in hashed_path_list] if not requirements: return candidates use_requirements = [] for k in requirements: # remove leading plural-name if k.startswith(TYPE_PLURAL[asset_type] + os.sep): use_requirements.append(k.replace(TYPE_PLURAL[asset_type] + os.sep, "")) else: use_requirements.append(k) requirements = use_requirements full_requirements = fnmatch.filter(requirements, TYPE_FNMATCH[asset_type]) short_requirements = [k for k in requirements if k not in full_requirements] retval = set() for name in short_requirements: retval |= set([k for k in candidates if k.startswith(name)]) # note: if you specify a full-length requirement, we don't really care if it # is there or not. The final command will decide if it is an error. return list(retval) + full_requirements
[docs]def display_local_list(prefix, asset_type): """Implements the local "list" command Parameters: prefix (str): A string representing the root of the path in which the user objects are stored asset_type (str): One of ``database``, ``dataformat``, ``algorithm``, ``toolchain`` or ``experiment``. Returns: int: Indicating the exit status of the command, to be reported back to the calling process. This value should be zero if everything works OK, otherwise, different than zero (POSIX compliance). """ names = make_up_local_list(prefix, asset_type, []) for name in names: logger.info("%s", name) try: storage = TYPE_STORAGE[asset_type](prefix, name) contents = simplejson.loads( storage.json.load(), object_pairs_hook=collections.OrderedDict ) if "description" in contents: logger.extra(2 * " " + contents["description"]) except simplejson.JSONDecodeError: logger.warning(2 * " " + "(!) invalid JSON file") if len(names) != 1: logger.extra("%d %s found", len(names), TYPE_PLURAL[asset_type]) else: logger.extra("1 %s found" % asset_type) return 0
[docs]def display_local_path(prefix, asset_type, names): """Implements the local "path" command Parameters: prefix (str): A string representing the root of the path in which the user objects are stored asset_type (str): One of ``database``, ``dataformat``, ``algorithm``, ``toolchain`` or ``experiment``. Returns: int: Indicating the exit status of the command, to be reported back to the calling process. This value should be zero if everything works OK, otherwise, different than zero (POSIX compliance). """ selected_type = None try: selected_type = TYPE_PLURAL[asset_type] except IndexError: logger.error("Selected type is not valid: %s", asset_type) return 1 for name in names: root = os.path.join(prefix, selected_type) object_path = os.path.join(root, name.rsplit("/", 1)[0]) object_files = [ filename for filename in os.listdir(object_path) if filename.startswith(name.rsplit("/", 1)[1]) ] if len(object_files) > 0: logger.info( "Available local file(s) for type '%s' and name '%s':", selected_type, name, ) for filename in object_files: full_name = os.path.join(object_path, filename) logger.info(full_name) else: logger.info( "No local file(s) found for type '%s' and name '%s':", selected_type, name, ) return 0
[docs]def edit_local_file(prefix, editor, asset_type, name): """Implements the local "path" command Parameters: prefix (str): A string representing the root of the path in which the user objects are stored asset_type (str): One of ``database``, ``dataformat``, ``algorithm``, ``toolchain`` or ``experiment``. Returns: int: Indicating the exit status of the command, to be reported back to the calling process. This value should be zero if everything works OK, otherwise, different than zero (POSIX compliance). """ selected_type = None try: selected_type = TYPE_PLURAL[asset_type] except IndexError: logger.error("Selected type is not valid: %s", asset_type) return 1 python_objects = ["database", "library", "algorithm", "plotter"] json_objects = [ "dataformat", "toolchain", "experiment", "plotterparameter", "protocoltemplate", ] ext = None if asset_type in python_objects: ext = ".py" elif asset_type in json_objects: ext = ".json" else: logger.error("Selected type is not valid: %s", asset_type) root = os.path.join(prefix, selected_type) object_path = os.path.join(root, name + ext) if os.path.isfile(object_path): # check if editor set if editor is None: if "VISUAL" in os.environ and len(os.environ["VISUAL"]) > 0: editor = os.environ["VISUAL"] elif "EDITOR" in os.environ and len(os.environ["EDITOR"]) > 0: editor = os.environ["EDITOR"] else: logger.error("No default editor set in your environment variable") return 1 logger.info("Editing object of type '%s' and name '%s'", selected_type, name) cmd = "%s %s" % (editor, object_path) os.system(cmd) # nosec else: logger.error("Not a valid file: %s", object_path) return 1 return 0
[docs]def make_webapi(config): """Instantiates an usable web-api proxy using the command-line configuration Parameters: config (object): The command-line configuration object, from which this function will extract the ``platform``, ``user`` and ``token`` parameters. Returns WebAPI: A valid web-api proxy instance """ from .webapi import WebAPI return WebAPI(config.platform, config.user, config.token)
[docs]def check_one(prefix, asset_type, name): """Implements object validation for a single, well-defined object Parameters: prefix (str): A string representing the root of the path in which the user objects are stored asset_type (str): One of ``database``, ``dataformat``, ``algorithm``, ``toolchain`` or ``experiment``. name (str): The name of the object, representing the unique relative path of the objects to check (e.g. ``user/integer/1``) klass (type): A python class that validates the object. It must accept the object """ o = TYPE_VALIDATOR[asset_type](prefix, name) if not o.valid: logger.info("%s/%s [invalid]", TYPE_PLURAL[asset_type], name) for e in o.errors: logger.warning(" * %s", e) return 1 else: logger.info("%s/%s [ok]", TYPE_PLURAL[asset_type], name) return 0
[docs]def check(prefix, asset_type, names): """Implements object validation Parameters: prefix (str): A string representing the root of the path in which the user objects are stored asset_type (str): One of ``database``, ``dataformat``, ``algorithm``, ``toolchain`` or ``experiment``. names (:py:class:`list`): A list of strings, each representing the unique relative path of the objects to check. If the list is empty, then we check all available objects of a given type. Returns: int: Indicating the exit status of the command, to be reported back to the calling process. This value should be zero if everything works OK, otherwise, different than zero (POSIX compliance). """ names = make_up_local_list(prefix, asset_type, names) return sum([check_one(prefix, asset_type, name) for name in names])
[docs]def fetch_object(webapi, asset_type, name, fields): """Retrieves a single well-known object from the server Parameters: webapi (object): An instance of our WebAPI class, prepared to access the BEAT server of interest asset_type (str): One of ``database``, ``dataformat``, ``algorithm``, ``toolchain`` or ``experiment``. name (str): A string defining the name of the object to retrieve fields (:py:class:`list`): A list of fields to retrieve from the remote server Returns: dict: A dictionary containing the object contents """ fields = "?object_format=string&fields=%s" % ",".join(fields) if name is not None: url = "/api/v1/%s/%s/%s" % (TYPE_PLURAL[asset_type], name, fields) else: url = "/api/v1/%s/%s" % (TYPE_PLURAL[asset_type], fields) return webapi.get(url)
[docs]def pull(webapi, prefix, asset_type, names, fields, force, indentation): """Copies objects from the server to the local prefix Parameters: webapi (object): An instance of our WebAPI class, prepared to access the BEAT server of interest prefix (str): A string representing the root of the path in which the user objects are stored asset_type (str): One of ``database``, ``dataformat``, ``algorithm``, ``toolchain`` or ``experiment``. names (:py:class:`list`): A list of strings, each representing the unique relative path of the objects to retrieve or a list of usernames from which to retrieve objects. If the list is empty, then we pull all available objects of a given type. If no user is set, then pull all public objects of a given type. fields (:py:class:`list`): A list of strings, each defining one field that **must** be downloaded from the web-server for a given object of the current type and passed, unchanged to the storage ``save()`` method. For example, for toolchains, this value shall be ``['declaration']``. For algorithms, it shall be ``['declaration', 'code']``. force (bool): If set to ``True``, then overwrites local changes with the remotely retrieved copies. indentation (int): The indentation level, useful if this function is called recursively while downloading different object types. This is normally set to ``0`` (zero). Returns: int: Indicating the exit status of the command, to be reported back to the calling process. This value should be zero if everything works OK, otherwise, different than zero (POSIX compliance). :py:class:`list`: A list of strings containing the names of objects successfuly downloaded or which were already present on the current installation (if the user has chosen not to ``--force`` the override), in the order of their download. """ names = make_up_remote_list(webapi, asset_type, names) if not names: return 1, [] indent = indentation * " " available = set() status = 0 for name in names: storage = TYPE_STORAGE[asset_type](prefix, name) if storage.exists() and not force: # exists locally, force not set logger.extra( "%sskipping download of `%s/%s' (exists locally)", indent, TYPE_PLURAL[asset_type], name, ) available.add(name) continue else: logger.info( "%sretrieving `%s/%s'...", indent, TYPE_PLURAL[asset_type], name ) data = fetch_object(webapi, asset_type, name, fields) if data is None: status += 1 # error continue if asset_type == "plotterparameter": declaration = { "description": data["short_description"], "plotter": data["plotter"], "data": data["data"], } storage.save(declaration) else: if asset_type == "algorithm" and storage.language == "cxx": try: file_data = webapi.download( "/api/v1/%s/%s/file/" % ( TYPE_PLURAL[asset_type], storage.fullname, ) ) data["code"] = file_data except Exception as e: logger.error(e) status += 1 continue storage.save(**data) available.add(name) return status, list(available)
[docs]def diff(webapi, prefix, asset_type, name, fields): """Shows the differences between two objects, for each of the fields Parameters: webapi (object): An instance of our WebAPI class, prepared to access the BEAT server of interest prefix (str): A string representing the root of the path in which the user objects are stored asset_type (str): One of ``database``, ``dataformat``, ``algorithm``, ``toolchain`` or ``experiment``. name (str): A string defining the name of the object to calculate differences from. fields (:py:class:`list`): A list of strings, each defining one field that **must** be downloaded from the web-server for a given object of the current type. For example, for toolchains, this value shall be ``['declaration']``. For algorithms, it shall be ``['declaration', 'code']``. Returns: int: Indicating the exit status of the command, to be reported back to the calling process. This value should be zero if everything works OK, otherwise, different than zero (POSIX compliance). """ extension = {"code": ".py", "declaration": ".json", "description": ".rst"} def _eval_diff(remote, local, ext): """Calculates differences between two string buffers""" if not isinstance(local, six.string_types): if isinstance(local, dict): local = simplejson.dumps(local) else: local = local.decode("utf-8") if not isinstance(remote, six.string_types): if isinstance(remote, dict): remote = simplejson.dumps(remote) else: remote = remote.decode("utf-8") return difflib.unified_diff( remote.split("\n"), local.split("\n"), os.path.join("remote", asset_type, name + ext), os.path.join("local", asset_type, name + ext), ) def _show_diff(diffs): """Displays difference display between two string buffers""" for line in diffs: if line.startswith("+"): termcolor.cprint(line, "green") elif line.startswith("-"): termcolor.cprint(line, "red") else: print(line) storage = TYPE_STORAGE[asset_type](prefix, name) local = storage.load() # may also return a tuple, depending on the type remote = fetch_object(webapi, asset_type, name, fields) if remote is None: return 1 if "declaration" in remote and not isinstance( remote["declaration"], six.string_types ): remote["declaration"] = simplejson.dumps(remote["declaration"], indent=4) local = dict(zip(fields, local)) # ``local`` should have the same size # replaces None entries with an empty string so these are comparable for key in local: local[key] = local[key] if local[key] is not None else "" for field in fields: diffs = _eval_diff(remote[field], local[field], extension.get(field, "")) if diffs: logger.info( "differences for `%s' of `%s/%s':", field, TYPE_PLURAL[asset_type], name ) _show_diff(diffs) else: logger.info( "no differences for `%s' of `%s/%s'", field, TYPE_PLURAL[asset_type], name, ) return 0
[docs]def create(prefix, asset_type, names): """Creates an empty object of a certain type under the given name Parameters: prefix (str): A string representing the root of the path in which the user objects are stored asset_type (str): One of ``database``, ``dataformat``, ``algorithm``, ``toolchain`` or ``experiment``. names (str): A string defining the names of the objects to create. Returns: int: Indicating the exit status of the command, to be reported back to the calling process. This value should be zero if everything works OK, otherwise, different than zero (POSIX compliance). """ status = 0 for name in names: storage = TYPE_STORAGE[asset_type](prefix, name) if storage.exists(): logger.error( "`%s/%s' already exists - will *not* overwrite", TYPE_PLURAL[asset_type], name, ) status += 1 obj = TYPE_VALIDATOR[asset_type](prefix, data=None) # the default object storage = TYPE_STORAGE[asset_type](prefix, name) obj.write(storage) return status
[docs]def copy(prefix, asset_type, src, dst): """Creates a new object by copying another object of the same type. Parameters: prefix (str): A string representing the root of the path in which the user objects are stored asset_type (str): One of ``database``, ``dataformat``, ``algorithm``, ``toolchain`` or ``experiment``. src (str): A string defining the name of the object to fork a new version from. dst (str): A string defining the name of the object to fork to. Returns: int: Indicating the exit status of the command, to be reported back to the calling process. This value should be zero if everything works OK, otherwise, different than zero (POSIX compliance). """ src_storage = TYPE_STORAGE[asset_type](prefix, src) if not src_storage.exists(): logger.error("source `%s/%s' does not exist", TYPE_PLURAL[asset_type], src) return 1 dst_storage = TYPE_STORAGE[asset_type](prefix, dst) if dst_storage.exists(): logger.error("destination `%s/%s' already exists", TYPE_PLURAL[asset_type], dst) return 1 dst_storage.save(*src_storage.load()) return 0
[docs]def new_version(prefix, asset_type, src): """Creates a new object by copying another object of the same type. Parameters: prefix (str): A string representing the root of the path in which the user objects are stored asset_type (str): One of ``database``, ``dataformat``, ``algorithm``, ``toolchain`` or ``experiment``. src (str): A string defining the name of the object to fork a new version from. Returns: int: Indicating the exit status of the command, to be reported back to the calling process. This value should be zero if everything works OK, otherwise, different than zero (POSIX compliance). """ with Selector(prefix) as selector: src_storage = TYPE_STORAGE[asset_type](prefix, src) dst = os.sep.join(src.split(os.sep)[:-1] + [""]) dst += str(int(src_storage.version) + 1) dst_storage = TYPE_STORAGE[asset_type](prefix, dst) if dst_storage.exists(): logger.info( "A representation for %s `%s' already exists - not " "overwriting", asset_type, dst, ) else: status = copy(prefix, asset_type, src, dst) if status != 0: return status # error selector.version(asset_type, src, dst) return 0
[docs]def fork(prefix, asset_type, src, dst): """Creates a new object by forking another object of the same type. Parameters: prefix (str): A string representing the root of the path in which the user objects are stored asset_type (str): One of ``database``, ``dataformat``, ``algorithm``, ``toolchain`` or ``experiment``. src (str): A string defining the name of the object to fork from. dst (str): A string defining the name of the object to fork to. Returns: int: Indicating the exit status of the command, to be reported back to the calling process. This value should be zero if everything works OK, otherwise, different than zero (POSIX compliance). """ with Selector(prefix) as selector: dst_storage = TYPE_STORAGE[asset_type](prefix, dst) if dst_storage.exists(): logger.info( "A representation for %s `%s' already exists - not " "overwriting", asset_type, dst, ) else: status = copy(prefix, asset_type, src, dst) if status != 0: return status # error selector.fork(asset_type, src, dst) # mark forking status return 0
[docs]def delete_local(prefix, asset_type, names): """Deletes a local object of a given type Parameters: prefix (str): A string representing the root of the path in which the user objects are stored asset_type (str): One of ``database``, ``dataformat``, ``algorithm``, ``toolchain`` or ``experiment``. names (:py:class:`list`): A list of strings, each representing the unique relative path of the objects to retrieve or a list of usernames from which to delete. Returns: int: Indicating the exit status of the command, to be reported back to the calling process. This value should be zero if everything works OK, otherwise, different than zero (POSIX compliance). """ status = 0 for name in names: storage = TYPE_STORAGE[asset_type](prefix, name) if not storage.exists(): logger.error("`%s/%s' does not exist", TYPE_PLURAL[asset_type], name) status += 1 with Selector(prefix) as selector: selector.delete(asset_type, name) # unset forking status and others storage.remove() return status
[docs]def delete_remote(webapi, asset_type, names): """Deletes a remote object of a given type Parameters: webapi (object): An instance of our WebAPI class, prepared to access the BEAT server of interest asset_type (str): One of ``database``, ``dataformat``, ``algorithm``, ``toolchain`` or ``experiment``. names (:py:class:`list`): A list of strings, each representing the unique relative path of the objects to retrieve or a list of usernames from which to delete. Returns: int: Indicating the exit status of the command, to be reported back to the calling process. This value should be zero if everything works OK, otherwise, different than zero (POSIX compliance). """ status = 0 for name in names: logger.debug("deleting %s/%s...", TYPE_PLURAL[asset_type], name) url = "/api/v1/%s/%s/" % (TYPE_PLURAL[asset_type], name) try: webapi.delete(url) except RuntimeError as e: logger.error(e) status += 1 return status
[docs]def status(webapi, prefix, asset_type): """Flags objects which have changed Parameters: webapi (object): An instance of our WebAPI class, prepared to access the BEAT server of interest prefix (str): A string representing the root of the path in which the user objects are stored asset_type (str): One of ``database``, ``dataformat``, ``algorithm``, ``toolchain`` or ``experiment``. Returns: int: Indicating the exit status of the command, to be reported back to the calling process. This value should be zero if everything works OK, otherwise, different than zero (POSIX compliance). :py:class:`list`: A list of objects that have local modifications and should be pushed remotely, eventually. """ remote = retrieve_remote_list( webapi, asset_type, ["name", "hash", "description", "creation_date"] ) if remote is None: return 1, [] remote = collections.OrderedDict([(k["name"], k) for k in remote]) local = make_up_local_list(prefix, asset_type, []) retval = collections.OrderedDict() logger.extra( "legend: [+] definition; [d] docs; [*] both; [l] only local; [r] only remote" ) for key in remote: if key in local: storage = TYPE_STORAGE[asset_type](prefix, key) contents = storage.hash() != remote[key]["hash"] local_doc = storage.doc.load() if storage.doc.exists() else "" doc = local_doc != remote[key]["description"] if contents: if doc: symbol = ( ModificationStatus.BOTH_CHANGED ) # both contents and doc changed else: symbol = ModificationStatus.CONTENT_CHANGED # only contents changed else: if doc: symbol = ModificationStatus.DOC_CHANGED # only docs changed else: symbol = ModificationStatus.NO_CHANGES # no changes if symbol != ModificationStatus.NO_CHANGES: logger.info( "[%s] %s/%s (@%s)", symbol, TYPE_PLURAL[asset_type], key, remote[key]["creation_date"], ) else: logger.extra(" %s/%s", TYPE_PLURAL[asset_type], key) retval[key] = symbol # whatever is not listed remotely for key in set(local) - set(remote.keys()): logger.info("[l] %s/%s", TYPE_PLURAL[asset_type], key) retval[key] = ModificationStatus.LOCAL_ONLY_AVAILABLE # whatever is not listed locally for key in set(remote.keys()) - set(local): logger.extra( "[r] %s/%s (@%s)", TYPE_PLURAL[asset_type], key, remote[key]["creation_date"], ) retval[key] = ModificationStatus.REMOTE_ONLY_AVAILABLE return 0, retval
[docs]def push( webapi, prefix, asset_type, names, fields, mappings, force, dry_run, indentation ): """Copies objects to the server from the local prefix Parameters: webapi (object): An instance of our WebAPI class, prepared to access the BEAT server of interest prefix (str): A string representing the root of the path in which the user objects are stored asset_type (str): One of ``database``, ``dataformat``, ``algorithm``, ``toolchain`` or ``experiment``. names (:py:class:`list`): A list of strings, each representing the unique relative path of the objects to push or a filtering criteria for local objects. If the list is empty, then we push all available objects of a given type, which have changes. If no user is set, then an error is raised. fields (:py:class:`list`): A list of strings, each defining one field that **must** be uploaded to the web-server for a given object of the current type. For example, for toolchains, this value shall be ``['declaration', 'description']``. For algorithms, it shall be ``['declaration', 'code', 'description']``. mappings (dict): A dictionary containing mappings from the stock field names to equivalents which are expected by our web interface. This field is required by experiments only. force (bool): If set to ``True``, then push local changes even if no change is detected on the local copy. dry_run (bool): If set to ``True``, then only prints what it would do instead of doing it. indentation (int): The indentation level, useful if this function is called recursively while downloading different object types. This is normally set to ``0`` (zero). Returns: int: Indicating the exit status of the command, to be reported back to the calling process. This value should be zero if everything works OK, otherwise, different than zero (POSIX compliance). :py:class:`list`: A list of strings containing the names of objects successfuly uploaded or which were already present on the remote server (if the user has chosen not to ``--force`` the override), in the order of their upload. """ if webapi.is_anonymous(): logger.error("cannot anonymously push objects, set your access token") return 1 if webapi.user is None: logger.error("Invalid user, stopping.") return 1 retval, candidates = status(webapi, prefix, asset_type) if retval: return retval if names: # apply filtering conditions full_requirements = fnmatch.filter(names, TYPE_FNMATCH[asset_type]) short_requirements = [k for k in names if k not in full_requirements] # check full-length requirements filtered = collections.OrderedDict() for k in full_requirements: if k not in candidates: logger.error( "%s/%s is not available locally", TYPE_PLURAL[asset_type], k ) return 1 filtered[k] = candidates[k] # check short requirements for k in short_requirements: for c in candidates: if c.startswith(k): filtered[c] = candidates[c] candidates = filtered if not force: # filter again, if no local changes candidates = collections.OrderedDict( [ (asset_name, modification_status) for asset_name, modification_status in candidates.items() if modification_status != ModificationStatus.NO_CHANGES ] ) if not candidates: logger.warning("No new candidates for upload were found") return 0 # flush all final candidates for asset_name, modification_status in candidates.items(): if modification_status == ModificationStatus.REMOTE_ONLY_AVAILABLE: # don't push what is available unmodified remotely continue splits = asset_name.split("/") if splits[0] != webapi.user: logger.error( "Can't push asset {} not owned by user {}".format( asset_name, webapi.user ) ) return 1 logger.info( "pushing %s/%s [%s]", TYPE_PLURAL[asset_type], asset_name, modification_status, ) storage = TYPE_STORAGE[asset_type](prefix, asset_name) data = storage.load() # returns a named tuple message = {} for f in fields: if hasattr(data, f): message[f] = getattr(data, f) elif hasattr(storage, f): message[f] = getattr(storage, f) if "description" in message and not message["description"]: del message["description"] # apply message mappings for key, value in mappings.items(): message[value] = message[key] del message[key] # fills-in fork status with Selector(prefix) as selector: fork = selector.forked_from(asset_type, asset_name) if fork: message["fork_of"] = fork version = selector.version_of(asset_type, asset_name) if version: message["previous_version"] = version if dry_run: continue # don't send the data url = "/api/v1/%s/%s/" % (TYPE_PLURAL[asset_type], webapi.user) # C++ algorithms must be sent in two steps (see below) if asset_type == "algorithm" and storage.language == "cxx": message.pop("code") # sends the data if ( modification_status == ModificationStatus.LOCAL_ONLY_AVAILABLE ): # POST (new object) message["name"] = storage.name version = getattr(storage, "version") if version is not None: message["version"] = version if asset_type == "experiment": message["toolchain"] = "/".join(splits[1:-1]) try: webapi.post(url, data=message) except RuntimeError as e: logger.error(e) return 1 else: logger.info("Successfuly created `%s'", asset_name) else: # PUT (update) url += "/".join(splits[1:]) + "/" if modification_status == ModificationStatus.DOC_CHANGED: data = {"description": message["description"]} else: data = message try: webapi.put(url, data=data) except RuntimeError as e: logger.error(e) return 1 else: logger.info("Successfuly updated `%s'", asset_name) # C++ algorithms must be sent in two steps, we send the binary file now if asset_type == "algorithm" and storage.language == "cxx": try: with open(storage.code.path, "rb") as f: webapi.upload( "/api/v1/%s/%s/%s/%s/file/" % ( TYPE_PLURAL[asset_type], webapi.user, storage.name, storage.version, ), {"binary": ("%s.so" % storage.version, f)}, ) except RuntimeError as e: logger.error(e) return 1 else: logger.info("Successfuly uploaded binary blob `%s'", asset_name) return 0
[docs]def dot_diagram(prefix, asset_type, names, path, formats): """Dumps DOT (Graphviz) diagrams of the given toolchains/experiments Parameters: prefix (str): A string representing the root of the path in which the user objects are stored asset_type (str): One of ``database``, ``dataformat``, ``algorithm``, ``toolchain`` or ``experiment``. names (:py:class:`list`): A list of strings, each representing the unique relative path of the objects to push or a filtering criteria for local objects. If the list is empty, then we push all available objects of a given type, which have changes. If no user is set, then an error is raised. path (str): The directory to use for dumping the drawings. The filenames assigned correspond to the full object name. If not set, the default is to write on the current directory. formats (:py:class:`list`): A list of formats to dump. If not set or set to an empty value, then dump dot/graphviz and png formats. Returns: int: Indicating the exit status of the command, to be reported back to the calling process. This value should be zero if everything works OK, otherwise, different than zero (POSIX compliance). """ if not formats: formats = ["dot", "png"] path = path or os.curdir if not os.path.exists(path): logger.info("creating directory `%s'...", path) os.makedirs(path) names = make_up_local_list(prefix, asset_type, names) for name in names: obj = TYPE_VALIDATOR[asset_type](prefix, name) if not hasattr(obj, "dot_diagram"): logger.error( "%s do not support DOT/Graphviz diagram output", TYPE_PLURAL[asset_type] ) return 1 if not obj.valid: logger.warning( "%s/%s is not valid - skipping it", TYPE_PLURAL[asset_type], name ) continue try: diagram = obj.dot_diagram() except Exception as e: logger.warning( "%s/%s cannot be drawn: %s - skipping it", TYPE_PLURAL[asset_type], name, e, ) continue destfile = os.path.join(path, "%s" % TYPE_PLURAL[asset_type], obj.name) for k in formats: if k == "dot": logger.info( "%s/%s -> %s.%s", TYPE_PLURAL[asset_type], name, destfile, k ) diagram.save(destfile + ".dot") else: logger.info( "%s/%s -> %s.%s", TYPE_PLURAL[asset_type], name, destfile, k ) diagram.format = k diagram.render(destfile, cleanup=True) return 0
[docs]def stringify(value): """Creates a string representation of a baseformat represented as a dict To avoid overloading the terminal that will print the result of this function, only the first ten elements of lists are processed with visual cues that will show that there's more to it. The input variable ``value`` is modified in-place. Parameters: value (dict): A dictionary representing the baseformat object, obtained as with :py:meth:`beat.backend.python.baseformat.baseformat.as_dict`, that represents the object one seeks to represent in string format. Returns dict: Modifies the input variable ``value`` in-place, but also returns it. """ if isinstance(value, list): if len(value) > 10: if isinstance(value[0], dict) or isinstance(value[0], list): return ( map(stringify, value[0:4]) + ["..."] + map(stringify, value[-4:]) + ["BEAT_LIST_SIZE(%d elements)BEAT_LIST_SIZE" % len(value)] ) reduced = "BEAT_LIST_DELIMITER[" for v in value[0:4]: reduced += str(v) + ", " reduced += "..." for v in value[-4:]: reduced += ", " + str(v) reduced += " (%d elements)]BEAT_LIST_DELIMITER" % len(value) return reduced elif (len(value) > 0) and ( isinstance(value[0], list) or isinstance(value[0], dict) ): return list(map(stringify, value)) elif isinstance(value, dict): for name, value2 in value.items(): value[name] = stringify(value2) return value