#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###################################################################################
# #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# Redistribution and use in source and binary forms, with or without #
# modification, are permitted provided that the following conditions are met: #
# #
# 1. Redistributions of source code must retain the above copyright notice, this #
# list of conditions and the following disclaimer. #
# #
# 2. Redistributions in binary form must reproduce the above copyright notice, #
# this list of conditions and the following disclaimer in the documentation #
# and/or other materials provided with the distribution. #
# #
# 3. Neither the name of the copyright holder nor the names of its contributors #
# may be used to endorse or promote products derived from this software without #
# specific prior written permission. #
# #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #
# #
###################################################################################
"""
=====
utils
=====
This module implements helper classes and functions.
"""
import collections
import os
import shutil
import numpy
import simplejson
import six
from . import hash
# ----------------------------------------------------------
def hashed_or_simple(prefix, what, path, suffix=".json"):
    """Resolves the location of a resource, preferring the hashed layout

    The part of ``path`` before its first ``/`` is treated as a user name and
    converted with ``hash.toUserPath``. If a file exists at the resulting
    hashed location, that location is returned. Otherwise the plain
    (non-hashed) location is returned, whether it exists or not.

    Parameters:

      prefix (str): Root directory under which resources are stored

      what (str): Sub-directory of ``prefix`` to look into

      path (str): Resource path, in the form ``<username>/<remainder>``

      suffix (str): Text appended to the resulting file name


    Returns:

      str: The hashed path if it exists on disk, the simple path otherwise


    Raises:

      ValueError: If ``path`` does not contain any ``/`` separator.

    """
    username, remainder = path.split("/", 1)
    hashed_location = (
        os.path.join(prefix, what, hash.toUserPath(username), remainder) + suffix
    )
    if not os.path.exists(hashed_location):
        # no hashed variant on disk: fall back to the simple layout
        return os.path.join(prefix, what, path + suffix)
    return hashed_location
# ----------------------------------------------------------
def safe_rmfile(f):
    """Removes a file from the disk, doing nothing if it is not there

    Parameters:

      f (str): Path to the file to be removed

    """
    if not os.path.exists(f):
        return  # nothing to delete
    os.unlink(f)
# ----------------------------------------------------------
def safe_rmdir(f):
    """Removes the directory containing a given file, but only if it is empty

    Nothing happens when the containing directory does not exist or still
    holds at least one entry.

    Parameters:

      f (str): Path to a file; its parent directory is the removal candidate

    """
    parent = os.path.dirname(f)
    if os.path.exists(parent) and not os.listdir(parent):
        os.rmdir(parent)
# ----------------------------------------------------------
def extension_for_language(language):
    """Returns the preferred extension for a given programming language

    The set of languages supported must match those declared in our
    ``common.json`` schema.


    Parameters:

      language (str): The language for which you'd like to get the extension


    Returns:

      str: The extension for the given language, including a leading ``.``
      (dot). The ``unknown`` language maps to an empty string.


    Raises:

      KeyError: If the language is not defined in our internal dictionary.

    """
    extensions = {
        "unknown": "",
        "cxx": ".so",
        "matlab": ".m",
        "python": ".py",
        "r": ".r",
    }
    return extensions[language]
# ----------------------------------------------------------
class Prefix(object):
    """Ordered collection of root directories in which files are looked up

    Parameters:

      paths (:py:class:`list` or object): Either a list of root directories,
        which is stored as-is (not copied), or a single root, which is wrapped
        in a new list. ``None`` starts with no root at all.

    """

    def __init__(self, paths=None):
        if paths is None:
            self.paths = []
        elif isinstance(paths, list):
            # the caller's list object is kept, so later add() calls mutate it
            self.paths = paths
        else:
            self.paths = [paths]

    def add(self, path):
        """Appends a root directory at the end of the search order"""
        self.paths.append(path)

    def path(self, filename):
        """Returns the location of ``filename`` under the first matching root

        Roots are probed in order. When ``filename`` exists under none of
        them, the location under the first root is returned instead.

        Raises:

          IndexError: If no root matches and no root is registered at all.

        """
        candidates = (os.path.join(root, filename) for root in self.paths)
        found = next((c for c in candidates if os.path.exists(c)), None)
        if found is not None:
            return found
        return os.path.join(self.paths[0], filename)
# ----------------------------------------------------------
class File(object):
    """User helper to read and write file objects

    Parameters:

      path (str): Location of the file on disk

      binary (bool): If ``True``, the file is read and written in binary
        mode; otherwise it is handled in text mode.

    """

    def __init__(self, path, binary=False):
        self.path = path
        self.binary = binary

    def exists(self):
        """Tells if the file is currently present on disk"""
        return os.path.exists(self.path)

    def load(self):
        """Returns the whole contents of the file

        The result is ``bytes`` in binary mode and text otherwise.
        """
        mode = "rb" if self.binary else "rt"
        with open(self.path, mode) as f:
            return f.read()

    def try_load(self):
        """Returns the contents of the file, or ``None`` if it does not exist"""
        if os.path.exists(self.path):
            return self.load()
        return None

    def backup(self):
        """Copies the file to ``<path>~``, replacing any previous backup"""
        if not os.path.exists(self.path):
            return  # no point in backing-up
        backup = self.path + "~"
        if os.path.exists(backup):
            os.remove(backup)
        shutil.copy(self.path, backup)

    def save(self, contents):
        """Writes ``contents`` to the file, backing up any previous version

        Missing parent directories are created first. In binary mode, text is
        encoded to UTF-8 before writing; in text mode, non-text input is
        decoded from UTF-8.

        Parameters:

          contents (str or bytes): The data to be written

        """
        d = os.path.dirname(self.path)
        # ``d`` is empty for a bare file name (file in the current directory);
        # os.makedirs("") would raise, so only create real directories
        if d and not os.path.exists(d):
            os.makedirs(d)
        if os.path.exists(self.path):
            self.backup()
        if self.binary:
            mode = "wb"
            if isinstance(contents, six.string_types):
                contents = contents.encode("utf-8")
        else:
            mode = "wt"
            if not isinstance(contents, six.string_types):
                contents = contents.decode("utf-8")
        with open(self.path, mode) as f:
            f.write(contents)

    def remove(self):
        """Removes the file, its backup and, if left empty, its directory"""
        safe_rmfile(self.path)
        safe_rmfile(self.path + "~")  # backup
        safe_rmdir(self.path)  # remove containing directory
# ----------------------------------------------------------
class AbstractStorage(object):
    """Base class resolving the files that make up a stored object

    Sub-classes must set both ``asset_type`` and ``asset_folder`` to strings
    and implement :py:meth:`hash`, :py:meth:`load` and :py:meth:`save`.

    Parameters:

      path (str): Path to the object, without any extension. The JSON
        declaration lives at ``<path>.json`` and the documentation at
        ``<path>.rst``.

    Raises:

      TypeError: If ``asset_type`` or ``asset_folder`` is not a string.

    """

    # both must be overridden with strings by concrete sub-classes
    asset_type = None
    asset_folder = None

    def __init__(self, path):
        # isinstance() also accepts ``str`` sub-classes, unlike a type() equality
        if not all(
            isinstance(attr, str) for attr in (self.asset_type, self.asset_folder)
        ):
            raise TypeError(
                "asset_type and asset_folder must be configured properly\n"
                "asset_type: {}\n"
                "asset_folder: {}".format(self.asset_type, self.asset_folder)
            )
        self.path = path
        self.json = File(self.path + ".json")
        self.doc = File(self.path + ".rst")

    def exists(self):
        """If the JSON declaration file exists"""
        return self.json.exists()

    def remove(self):
        """Removes the object (declaration and documentation) from the disk"""
        self.json.remove()
        self.doc.remove()

    def hash(self):
        """The hash of the object; must be implemented by sub-classes"""
        raise NotImplementedError

    def load(self):
        """Loads the object from disk; must be implemented by sub-classes"""
        raise NotImplementedError

    def save(self):
        """Saves the object to disk; must be implemented by sub-classes"""
        raise NotImplementedError
class Storage(AbstractStorage):
    """Resolves paths for objects that provide only a description"""

    def __init__(self, path):
        super(Storage, self).__init__(path)

    def hash(self, description="description"):
        """Hashes the JSON declaration file

        ``description`` is forwarded, together with the path of the JSON
        file, to ``hash.hashJSONFile``.
        """
        return hash.hashJSONFile(self.json.path, description)

    def load(self):
        """Loads the object from disk

        Returns:

          tuple: A named tuple with fields ``declaration`` (raw contents of
          the JSON file) and ``description`` (contents of the documentation
          file, or ``None`` if that file does not exist).

        """
        record = collections.namedtuple("Storage", ["declaration", "description"])
        return record(
            declaration=self.json.load(), description=self.doc.try_load()
        )

    def save(self, declaration, description=None):
        """Saves the declaration and, when given, the description

        Parameters:

          declaration (object): Either the JSON text itself or an object that
            is serialized to JSON with an indentation of 4

          description (str): Documentation text; skipped when empty or
            ``None``

        """
        if description:
            self.doc.save(description.encode("utf8"))
        if isinstance(declaration, six.string_types):
            text = declaration
        else:
            text = simplejson.dumps(declaration, indent=4)
        self.json.save(text)
# ----------------------------------------------------------
class CodeStorage(AbstractStorage):
    """Resolves paths for objects that provide a description and code

    Parameters:

      path (str): Path to the object, without any extension

      language (str): One of the valid programming languages. When empty or
        ``None``, the language is read from the JSON declaration, falling
        back to ``unknown``.

    """

    def __init__(self, path, language=None):
        super(CodeStorage, self).__init__(path)
        if not language:
            language = self.__auto_discover_language()
        self._language = language
        self.code = File(
            self.path + extension_for_language(self._language), binary=True
        )

    def __auto_discover_language(self, json=None):
        """Discovers the language from a JSON descriptor

        The descriptor is the ``json`` text when given and non-empty, and the
        contents of the object's own JSON file otherwise. Returns ``unknown``
        when the file cannot be read, the text cannot be parsed or it holds
        no ``language`` key.
        """
        try:
            source = json if json else self.json.load()
            return simplejson.loads(source)["language"]
        except (IOError, KeyError, simplejson.JSONDecodeError):
            return "unknown"

    @property
    def language(self):
        return self._language

    @language.setter
    def language(self, value):
        # the code file extension follows the language, so re-create the handle
        self._language = value
        self.code = File(
            self.path + extension_for_language(self._language), binary=True
        )

    def hash(self):
        """Hashes the declaration, combined with the code when it exists"""
        declaration_hash = hash.hashJSONFile(self.json.path, "description")
        if not self.code.exists():
            return declaration_hash
        code_hash = hash.hashFileContents(self.code.path)
        return hash.hash({"declaration": declaration_hash, "code": code_hash})

    def exists(self):
        """If both the JSON declaration and the code file exist"""
        if not super(CodeStorage, self).exists():
            return False
        return self.code.exists()

    def load(self):
        """Loads the object from disk

        Returns:

          tuple: A named tuple with fields ``declaration``, ``code`` and
          ``description``. The last two are ``None`` when the matching file
          does not exist.

        """
        record = collections.namedtuple(
            "CodeStorage", ["declaration", "code", "description"]
        )
        return record(
            declaration=self.json.load(),
            code=self.code.try_load(),
            description=self.doc.try_load(),
        )

    def save(self, declaration, code=None, description=None):
        """Saves the declaration and, when given, the code and description

        If code is provided while the language is still ``unknown``, the
        language is first re-discovered from the declaration being saved.
        """
        if description:
            self.doc.save(description.encode("utf8"))
        if not isinstance(declaration, six.string_types):
            declaration = simplejson.dumps(declaration, indent=4)
        self.json.save(declaration)
        if not code:
            return
        if self._language == "unknown":
            self.language = self.__auto_discover_language(declaration)
        self.code.save(code)

    def remove(self):
        """Removes the declaration, documentation and code from the disk"""
        super(CodeStorage, self).remove()
        self.code.remove()
# ----------------------------------------------------------
class NumpyJSONEncoder(simplejson.JSONEncoder):
    """Encodes numpy arrays, scalars and dtypes

    Arrays and scalars are converted with their ``tolist()`` method. A dtype
    is encoded by its name, except that the ``str`` dtype is reported as
    ``string``. Anything else is deferred to the base class.

    See Also:

      :py:class:`simplejson.JSONEncoder`

    """

    def default(self, obj):
        if isinstance(obj, (numpy.ndarray, numpy.generic)):
            return obj.tolist()
        if isinstance(obj, numpy.dtype):
            return "string" if obj.name == "str" else obj.name
        return simplejson.JSONEncoder.default(self, obj)
# ----------------------------------------------------------
def error_on_duplicate_key_hook(pairs):
    """JSON loader hook that refuses objects holding the same key twice

    Parameters:

      pairs (list): The ``(key, value)`` pairs of one decoded JSON object


    Returns:

      collections.OrderedDict: The pairs, in their original order


    Raises:

      RuntimeError: If a key shows up more than once in ``pairs``.

    """
    result = collections.OrderedDict()
    for name, item in pairs:
        if name not in result:
            result[name] = item
            continue
        raise RuntimeError(
            "Invalid file content\n{} found several times".format(name)
        )
    return result
# ----------------------------------------------------------
def has_argument(method, argument):
    """Tells if a callable declares a parameter with the given name

    ``inspect.signature`` is used when it can be imported; otherwise the
    lookup falls back to the argument list given by ``inspect.getargspec``.

    Parameters:

      method (object): The callable to be inspected

      argument (str): The name of the parameter to look for


    Returns:

      bool: ``True`` if ``argument`` is one of the parameters of ``method``

    """
    try:
        from inspect import signature

        names = signature(method).parameters
    except ImportError:
        from inspect import getargspec

        names = getargspec(method).args
    return argument in names