#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
import logging
import os
from datetime import datetime

import simplejson

from django.conf import settings
from django.contrib.auth.models import User
from django.contrib.sites.models import Site
from django.core.exceptions import ObjectDoesNotExist
from django.core.mail import send_mail
from django.core.urlresolvers import reverse
from django.db import models
from django.template.loader import render_to_string

import beat.core.experiment
import beat.core.hash

from ... import __version__
from ...algorithms.models import Algorithm
from ...backend.models import Environment
from ...backend.models import Queue
from ...common import storage
from ...common.exceptions import ShareError
from ...common.models import ContributionManager
from ...common.models import Shareable
from ...common.models import get_contribution_declaration_filename
from ...common.models import get_contribution_description_filename
from ...common.models import get_declaration
from ...common.models import get_declaration_string
from ...common.models import get_description
from ...common.models import set_declaration
from ...common.models import set_description
from ...common.storage import OverwriteStorage
from ...common.texts import Messages
from ...databases.models import DatabaseSet
from ...databases.models import DatabaseSetOutput
from ...toolchains.models import Toolchain
from .block import Block
from .block_input import BlockInput
from .cached_file import CachedFile

logger = logging.getLogger(__name__)
# ----------------------------------------------------------
def validate_environments(experiment, user=None):
    """Validates the environments throughout the experiment.

    Checks, for the global declaration and for every block and analyzer,
    that the referenced environment exists (restricted to what ``user``
    may see, when given) and that the queue/environment combination is
    configured on the backend.

    Parameters:
      experiment (beat.core.experiment.Experiment): the loaded experiment
      user (User): optional user restricting environment visibility

    Returns:
      list: human-readable error strings (empty when everything is valid)
    """

    def _valid(environment):
        # Restrict visibility to the given user when one is provided
        q = (
            Environment.objects.for_user(user, True)
            if user is not None
            else Environment.objects
        )
        return bool(q.filter(name=environment["name"], version=environment["version"]))

    def _valid_combination(queue, environment):
        return bool(
            Queue.objects.filter(
                name=queue,
                environments__name=environment["name"],
                environments__version=environment["version"],
            )
        )

    def _check_sections(sections, kind, default_q, default_env, errors):
        # Shared validation for blocks and analyzers: same rules, only the
        # label in the error message differs (the two loops were duplicated
        # verbatim before)
        for name, config in sections.items():
            q = config.get("queue", default_q)
            env = config.get("environment", default_env)
            if not _valid(env):
                errors.append(
                    "The environment '%s (%s)' for %s '%s' does not exist"
                    % (env["name"], env["version"], kind, name)
                )
            elif not _valid_combination(q, env):
                errors.append(
                    "The combination of queue '%s' with environment '%s (%s)' for %s '%s' does not exist"
                    % (q, env["name"], env["version"], kind, name)
                )

    errors = []
    default_q = experiment.data["globals"]["queue"]
    default_env = experiment.data["globals"]["environment"]

    if not _valid(default_env):
        errors.append(
            "The environment '%s (%s)' in the global experiment declaration does not exist"
            % (default_env["name"], default_env["version"])
        )
    elif not _valid_combination(default_q, default_env):
        errors.append(
            "The combination of queue '%s' with environment '%s (%s)' in the global experiment declaration does not exist"
            % (default_q, default_env["name"], default_env["version"])
        )

    _check_sections(experiment.blocks, "block", default_q, default_env, errors)
    _check_sections(experiment.analyzers, "analyzer", default_q, default_env, errors)

    return errors
# ----------------------------------------------------------
def validate_experiment(experiment_info, toolchain_info, user=None):
    """Makes sure the experiment can be run.

    Returns a ``(experiment, errors)`` pair where ``experiment`` is the
    loaded ``beat.core.experiment.Experiment`` object and ``errors`` is a
    list of validation messages (empty when the experiment is runnable).
    """
    xp = beat.core.experiment.Experiment(
        settings.PREFIX, (experiment_info, toolchain_info)
    )
    if xp.valid:
        # Structurally valid: additionally check queue/environment setup
        return xp, xp.errors + validate_environments(xp, user)
    return xp, xp.errors
# ----------------------------------------------------------
class DeclarationStorage(OverwriteStorage):
    """Overwriting file storage rooted at ``settings.EXPERIMENTS_ROOT``."""

    def __init__(self, *args, **kwargs):
        # Pin the storage location to the experiments area; an explicit
        # ``location`` keyword from the caller still raises, as before
        super(DeclarationStorage, self).__init__(
            *args, location=settings.EXPERIMENTS_ROOT, **kwargs
        )
# ----------------------------------------------------------
class ExperimentManager(ContributionManager):
    """Manager for :class:`Experiment`: natural keys, listing and creation."""

    def get_by_natural_key(
        self,
        author_username,
        toolchain_username,
        toolchain_name,
        toolchain_version,
        name,
    ):
        """Retrieves an experiment from its natural-key components."""
        lookups = {
            "author__username": author_username,
            "toolchain__author__username": toolchain_username,
            "toolchain__name": toolchain_name,
            "toolchain__version": toolchain_version,
            "name": name,
        }
        return self.get(**lookups)

    def from_author(self, user, author_name, add_public=False):
        """Experiments authored by ``author_name``, newest first."""
        qs = super(ExperimentManager, self).from_author(user, author_name, add_public)
        return qs.order_by("-creation_date", "name")

    def from_author_and_public(self, user, author_name):
        """Experiments authored by ``author_name`` plus public ones, newest first."""
        qs = super(ExperimentManager, self).from_author_and_public(user, author_name)
        return qs.order_by("-creation_date", "name")

    def create_experiment(
        self, author, toolchain, name, declaration, short_description="", description=""
    ):
        """Creates a new experiment in pending state.

        Returns a ``(experiment, toolchain_cache, error)`` triple; on
        failure ``experiment`` is ``None`` and ``error`` carries the
        message (the traceback for unexpected exceptions).
        """
        experiment = self.model(
            author=author,
            toolchain=toolchain,
            name=name,
            status=self.model.PENDING,
            start_date=None,
            end_date=None,
        )

        # The declaration may arrive either as a dict or as its JSON text
        if not isinstance(declaration, dict):
            declaration = simplejson.loads(declaration)

        if len(short_description) > 0:
            declaration["description"] = short_description

        experiment.declaration = declaration

        # Check the provided description
        if description is not None:
            experiment.description = description

        # Saving runs the validation and may therefore fail
        try:
            experiment.save()
        except SyntaxError as e:
            return (None, None, str(e))
        except Exception:
            import traceback

            return (None, None, traceback.format_exc())

        experiment._loaded_status = experiment.status
        # NOTE(review): ``_toolchain_cache`` is presumably set while saving
        # the declaration - confirm against ``Experiment.save()`` helpers
        return experiment, experiment._toolchain_cache, None
# ----------------------------------------------------------
[docs]class Experiment(Shareable):
# An experiment: the instantiation of a toolchain with concrete algorithms,
# datasets, queues and environments.  Sharing behaviour comes from the
# ``Shareable`` base class.
# _____ Constants __________
# One-letter status codes persisted in the ``status`` column
PENDING = "P"
SCHEDULED = "S"
RUNNING = "R"
DONE = "D"
FAILED = "F"
CANCELLING = "C"
# (code, human-readable label) choices for the ``status`` field
STATUS = (
(PENDING, "Pending"),
(SCHEDULED, "Scheduled"),
(RUNNING, "Running"),
(DONE, "Done"),
(FAILED, "Failed"),
(CANCELLING, "Canceling"),
)
# _____ Fields __________
# Owner of the experiment; deleting the user cascades to the experiments
author = models.ForeignKey(
User, related_name="experiments", on_delete=models.CASCADE
)
# Toolchain this experiment instantiates
toolchain = models.ForeignKey(
Toolchain, related_name="experiments", on_delete=models.CASCADE
)
# Experiment name, unique per (author, toolchain) - see Meta.unique_together
name = models.CharField(max_length=200)
short_description = models.CharField(
max_length=100, default="", blank=True, help_text=Messages["short_description"]
)
status = models.CharField(max_length=1, choices=STATUS, default=PENDING)
creation_date = models.DateTimeField(null=True, blank=True, auto_now_add=True)
start_date = models.DateTimeField(null=True, blank=True)
end_date = models.DateTimeField(null=True, blank=True)
# JSON declaration kept on disk; the DB column stores the file path only
declaration_file = models.FileField(
storage=DeclarationStorage(),
upload_to=get_contribution_declaration_filename,
blank=True,
null=True,
max_length=300,
db_column="declaration",
)
# RST description kept on disk; the DB column stores the file path only
description_file = models.FileField(
storage=DeclarationStorage(),
upload_to=get_contribution_description_filename,
blank=True,
null=True,
max_length=300,
db_column="description",
)
# read-only parameters that are updated at every save(), if required
# content hash of the declaration (description excluded - see save())
hash = models.CharField(max_length=64)
referenced_datasets = models.ManyToManyField(
DatabaseSet, related_name="experiments", blank=True
)
referenced_algorithms = models.ManyToManyField(
Algorithm, related_name="experiments", blank=True
)
objects = ExperimentManager()
# _____ Meta parameters __________
class Meta:
unique_together = ("author", "toolchain", "name")
ordering = ["-creation_date"]
# _____ Utilities __________
[docs] def natural_key(self):
return (self.author.username,) + self.toolchain.natural_key() + (self.name,)
natural_key.dependencies = ["toolchains.toolchain"]
# _____ Methods __________
[docs] def fullname(self):
return "%s/%s/%s" % (self.author.username, self.toolchain.fullname(), self.name)
[docs] def hashed_path(self, extension=""):
"""Relative path of a file belonging to the object on the respective
storage"""
return os.path.join(
beat.core.hash.toUserPath(self.author.username),
self.toolchain.fullname(),
self.name + extension,
)
[docs] def declaration_filename(self):
"""Relative path of the declaration file on the storage"""
return self.hashed_path(".json")
[docs] def description_filename(self):
"""Relative path of the description file on the storage"""
return self.hashed_path(".rst")
# _____ Utilities __________
def _share_dataformats(self, users, teams):
needed_formats = []
for dataset in self.referenced_datasets.all():
needed_formats.extend(dataset.all_needed_dataformats())
# Retrieve and process the list of referenced dataformats
own_needed_formats = filter(lambda x: x.author == self.author, needed_formats)
other_needed_formats = filter(lambda x: x.author != self.author, needed_formats)
# Ensure that all needed dataformats from other users/teams have the necessary sharing
# preferences
errors = []
for needed_format in other_needed_formats:
errors.extend(needed_format.is_accessible(users=users, teams=teams))
if len(errors) > 0:
raise ShareError(errors)
for dataformats in own_needed_formats:
dataformats.share(users=users, teams=teams)
def __share_algorithms(self, users, teams, algorithms_infos):
# Retrieve and process the list of algorithms referenced by the declaration, and
# the data formats they reference
needed_algorithms = []
for algorithm in self.referenced_algorithms.iterator():
needed_algorithms.append(algorithm)
needed_algorithms = list(set(needed_algorithms))
# Only keep the referenced dataformats and algorithms owned by the author of the
# experiment
# Process the list of referenced algorithms
own_needed_algorithms = filter(
lambda x: x.author == self.author, needed_algorithms
)
other_needed_algorithms = filter(
lambda x: x.author != self.author, needed_algorithms
)
# Ensure that all needed algorithms from other users have the necessary sharing
# preferences
errors = []
for needed_algorithm in other_needed_algorithms:
errors.extend(needed_algorithm.is_accessible(False, users, teams))
if len(errors) > 0:
raise ShareError(errors)
for algorithm in own_needed_algorithms:
if algorithms_infos and algorithm.fullname() in algorithms_infos:
infos = algorithms_infos[algorithm.fullname()]
opensource = infos.get("opensource", False)
else:
opensource = True
algorithm.share(public=opensource, users=users, teams=teams)
# _____ Overrides __________
[docs] def has_attestation(self):
try:
self.attestation
except ObjectDoesNotExist:
return False
else:
return True
[docs] @classmethod
def from_db(cls, db, field_names, values):
instance = super(Experiment, cls).from_db(db, field_names, values)
instance._loaded_status = values[field_names.index("status")]
return instance
[docs] def save(self, *args, **kwargs):
new_experiment = self.id is None
# if changing the experiment status, then make sure start_date is set
if (self.status not in [self.PENDING, self.SCHEDULED]) and (
self.start_date is None
):
self.start_date = self.end_date or datetime.now()
# Retrieve the experiment declaration
declaration = self.declaration
# Compute the hash of the content
content_hash = beat.core.hash.hashJSON(declaration, "description")
content_modified = content_hash != self.hash
if content_modified:
# validates the experiment
xp, errors = validate_experiment(
declaration, self.toolchain.declaration, self.author
)
if errors:
message = (
"The experiment isn't valid, due to the "
"following errors:\n * %s"
)
raise SyntaxError(message % "\n * ".join(errors))
self.hash = content_hash
self.short_description = declaration.get("description", "")
# Ensures that the sharing informations are consistent
if self.sharing == Shareable.USABLE:
self.sharing = Shareable.PUBLIC
# Save the changed files (if necessary)
storage.save_files(self)
is_adding = self._state.adding
if not is_adding and self._loaded_status != self.status:
if self.status in [Experiment.DONE, Experiment.FAILED]:
self.email()
# Invoke the base implementation
super(Experiment, self).save(*args, **kwargs)
# If the filename has changed, move all the files
if self.declaration_filename() != self.declaration_file.name:
storage.rename_file(self, "declaration_file", self.declaration_filename())
storage.rename_file(self, "description_file", self.description_filename())
if content_modified:
# Creates experiment blocks and setup dependencies
self.update_blocks()
# Link the experiment to the datasets
self.referenced_datasets.clear()
for dataset_declaration in xp.datasets.values():
try:
(db_name, db_version) = dataset_declaration["database"].name.split(
"/"
)
dataset = DatabaseSet.objects.get(
protocol__database__name=db_name,
protocol__database__version=db_version,
protocol__name=dataset_declaration["protocol"],
name=dataset_declaration["set"],
)
except Exception:
if new_experiment:
self.delete()
raise SyntaxError(
"The dataset '%s.%s.%s' can't be found"
% (
dataset_declaration["database"].name,
dataset_declaration["protocol"],
dataset_declaration["set"],
)
)
self.referenced_datasets.add(dataset)
# Link the experiment to the algorithms
self.referenced_algorithms.clear()
for algorithm in xp.algorithms:
try:
(username, name, version) = algorithm.split("/")
algorithm_db = Algorithm.objects.get(
author__username=username, name=name, version=int(version)
)
except Exception:
if new_experiment:
self.delete()
raise SyntaxError(
"The algorithm '%s' can't be found" % algorithm.fullname()
)
self.referenced_algorithms.add(algorithm_db)
[docs] def email(self):
"""e-mails owners and shared parties about this experiment status"""
user_email_list = (
[self.author.email]
if self.author.accountsettings.experiment_mail_notifications_enabled
else []
)
user_email_list.extend(
[
user.email
for user in self.shared_with.all()
if user.accountsettings.experiment_mail_notifications_enabled
]
)
all_team_members = [
user for team in self.shared_with_team.all() for user in team.members.all()
]
user_email_list.extend(
[
user.email
for user in all_team_members
if user.email not in user_email_list
and user.accountsettings.experiment_mail_notifications_enabled
]
)
if user_email_list:
if self.status == Experiment.DONE:
subject = "Experiment %s finished successfully" % self.fullname()
template_path = "experiments/successful_experiment_email.txt"
elif self.status == Experiment.FAILED:
subject = "Experiment %s failed" % self.fullname()
template_path = "experiments/failed_experiment_email.txt"
try:
send_mail(
subject,
render_to_string(
template_path,
{
"experiment": self,
"beat_version": __version__,
"site": Site.objects.get_current(),
},
),
settings.DEFAULT_FROM_EMAIL,
user_email_list,
)
except Exception:
import traceback
logger.warn(
"Could not send e-mail to `%s' about " "`%s'. Exception caught: %s",
user_email_list,
self,
traceback.format_exc(),
)
[docs] def share(self, users=None, teams=None, algorithms_infos={}):
self._share_dataformats(users=users, teams=teams)
self.__share_algorithms(
users=users, teams=teams, algorithms_infos=algorithms_infos
)
self.toolchain.share(users=users, teams=teams)
super(Experiment, self).share(users=users, teams=teams)
[docs] def update_blocks(self):
"""Updates internal block representation of an experiment"""
corexp = self.core()
# Loads the experiment execution description, creating the Block's,
# BlockInput's and BlockOutput's as required.
for order_0, (block_name, description) in enumerate(corexp.setup().items()):
# Checks that the Queue/Environment exists
job_description = description["configuration"]
env = Environment.objects.filter(
name=job_description["environment"]["name"],
version=job_description["environment"]["version"],
)
if not env:
logger.warn(
"Cannot find environment `%s (%s)' - not setting",
job_description["environment"]["name"],
job_description["environment"]["version"],
)
env = None
else:
env = env[0]
# Search for queue that contains a specific environment
if env:
queue = Queue.objects.filter(
name=job_description["queue"], environments__in=[env]
)
else:
queue = Queue.objects.filter(name=queue)
if not queue:
env_name = env.fullname() if env else "NULL"
logger.warn(
"Cannot find queue `%s' which contains "
"environment `%s' - not setting",
job_description["queue"],
env_name,
)
queue = None
else:
queue = queue[0]
parts = job_description["algorithm"].split("/")
algorithm = Algorithm.objects.get(
author__username=parts[0], name=parts[1], version=parts[2]
)
# Ties the block in
b = Block.objects.filter(experiment=self, name=block_name).first()
if b is None:
b = Block(experiment=self, name=block_name, algorithm=algorithm)
else:
b.algorithm = algorithm
b.execution_order = order_0 + 1
b.command = simplejson.dumps(job_description, indent=4)
b.status = Block.PENDING
b.analyzer = algorithm.analysis()
b.environment = env
b.queue = queue
b.required_slots = job_description["nb_slots"]
b.channel = job_description["channel"]
b.save()
# from this point: requires block to have an assigned id
b.dependencies.clear()
b.dependencies.add(
*[self.blocks.get(name=k) for k in description["dependencies"]]
)
# reset inputs - creates if necessary only
b.inputs.clear()
for v in job_description["inputs"].values():
if "database" in v: # database input
db = DatabaseSetOutput.objects.get(
set__hash=v["hash"], template__name=v["output"]
)
BlockInput.objects.get_or_create(
block=b, channel=v["channel"], database=db
)
else:
cache, _ = CachedFile.objects.get_or_create(hash=v["hash"])
BlockInput.objects.get_or_create(
block=b, channel=v["channel"], cache=cache
)
# reset outputs - creates if necessary only
b.outputs.clear()
outputs = job_description.get(
"outputs", {"": job_description.get("result")}
)
for v in outputs.values():
cache, cr = CachedFile.objects.get_or_create(hash=v["hash"])
cache.blocks.add(b)
# _____ Methods __________
[docs] def is_busy(self):
return self.status in [
Experiment.PENDING,
Experiment.SCHEDULED,
Experiment.CANCELLING,
]
[docs] def is_done(self):
return self.status in [Experiment.DONE, Experiment.FAILED]
[docs] def is_running(self):
return self.status == Experiment.RUNNING
[docs] def modifiable(self):
return (
(self.reports.count() == 0)
and not self.has_attestation()
and super(Experiment, self).modifiable()
)
[docs] def deletable(self):
return (
(self.reports.count() == 0)
and not self.has_attestation()
and super(Experiment, self).deletable()
)
[docs] def core(self):
return validate_experiment(
self.declaration, self.toolchain.declaration, self.author
)[0]
[docs] def job_splits(self, status=None):
from ...backend.models import JobSplit
retval = JobSplit.objects.filter(job__block__in=self.blocks.all())
if status is not None:
retval = retval.filter(status=status)
return retval
[docs] def get_absolute_url(self):
return reverse(
"experiments:view",
args=(
self.author.username,
self.toolchain.author.username,
self.toolchain.name,
self.toolchain.version,
self.name,
),
)
[docs] def get_api_share_url(self):
return reverse(
"api_experiments:share",
args=(
self.author.username,
self.toolchain.author.username,
self.toolchain.name,
self.toolchain.version,
self.name,
),
)
[docs] def get_api_update_url(self):
return reverse(
"api_experiments:object",
args=(
self.author.username,
self.toolchain.author.username,
self.toolchain.name,
self.toolchain.version,
self.name,
),
)
[docs] def get_admin_change_url(self):
return reverse("admin:experiments_experiment_change", args=(self.id,))
[docs] def completion(self):
if self.start_date is None:
return 0
if self.end_date is not None:
return 100
blocks = self.blocks.all()
if len(blocks) == 0:
return 0
return int(
100
* float(len(filter(lambda x: x.status == Block.DONE, blocks)))
/ len(blocks)
)
[docs] def reset(self):
"""Resets an experiment so it can be run again"""
if not self.is_done():
return # can only reset experiments which are done
self.blocks.update(status=Block.PENDING, start_date=None, end_date=None)
self.start_date = None
self.end_date = None
self.status = self.PENDING
# reset sharing state
self.sharing = Shareable.PRIVATE
self.shared_with.clear()
self.shared_with_team.clear()
# remove associated attestations
if self.has_attestation():
self.attestation.all().delete()
self.save()
[docs] def databases_and_protocols(self):
"""A set of all used database/protocol combinations for all datasets"""
return set(["%s" % k.protocol for k in self.referenced_datasets.all()])
[docs] def analyzers(self):
"""A list of all used analyzers"""
return set(
[k.fullname() for k in self.referenced_algorithms.all() if k.analysis()]
)
# _____ Properties __________
description = property(get_description, set_description)
declaration = property(get_declaration, set_declaration)
declaration_string = property(beat.web.common.models.get_declaration_string)
[docs] def schedule(self):
"""Schedules this experiment for execution at the backend"""
from ...backend.helpers import schedule_experiment
schedule_experiment(self)
[docs] def cancel(self):
"""Cancels the execution of this experiment on the backend."""
from ...backend.helpers import cancel_experiment
cancel_experiment(self)
[docs] def fork(self, username=None, name=None):
"""Forks this experiment under a new username or name"""
author = username or self.author
name = name or self.name
xp, _, __ = Experiment.objects.create_experiment(
author,
self.toolchain,
name,
self.declaration,
self.short_description,
self.description,
)
return xp