Source code for beat.web.experiments.models.experiment

#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.web module of the BEAT platform.              #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################

from django.db import models
from django.contrib.auth.models import User
from django.core.urlresolvers import reverse
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.core.mail import send_mail
from django.template.loader import render_to_string
from django.contrib.sites.models import Site

import beat.core.hash
import beat.core.experiment

from ...algorithms.models import Algorithm
from ...toolchains.models import Toolchain

from ...common.models import Shareable
from ...common.models import ContributionManager
from ...common.models import get_contribution_declaration_filename
from ...common.models import get_contribution_description_filename
from ...common.models import get_description
from ...common.models import set_description
from ...common.models import get_declaration
from ...common.models import get_declaration_string
from ...common.models import set_declaration
from ...common import storage

from ...common.exceptions import ShareError
from ...common.texts import Messages
from ...common.storage import OverwriteStorage
from ...backend.models import Queue
from ...backend.models import Environment
from ...databases.models import DatabaseSet
from ...databases.models import DatabaseSetOutput
from ... import __version__

from .block import Block
from .block_input import BlockInput
from .cached_file import CachedFile

from datetime import datetime

import os
import simplejson

import logging

logger = logging.getLogger(__name__)


# ----------------------------------------------------------


def validate_environments(experiment, user=None):
    """Validates the environments throughout the experiment"""

    def _environment_exists(environment):
        # restrict to environments visible to the user, when one is given
        manager = (
            Environment.objects.for_user(user, True)
            if user is not None
            else Environment.objects
        )
        return manager.filter(
            name=environment["name"], version=environment["version"]
        ).exists()

    def _combination_exists(queue_name, environment):
        return Queue.objects.filter(
            name=queue_name,
            environments__name=environment["name"],
            environments__version=environment["version"],
        ).exists()

    errors = []

    default_queue = experiment.data["globals"]["queue"]
    default_environment = experiment.data["globals"]["environment"]

    # global declaration first
    if not _environment_exists(default_environment):
        errors.append(
            "The environment '%s (%s)' in the global experiment declaration does not exist"
            % (default_environment["name"], default_environment["version"])
        )
    elif not _combination_exists(default_queue, default_environment):
        errors.append(
            "The combination of queue '%s' with environment '%s (%s)' in the global experiment declaration does not exist"
            % (default_queue, default_environment["name"], default_environment["version"])
        )

    # per-block settings fall back on the global ones
    for name, config in experiment.blocks.items():
        queue_name = config.get("queue", default_queue)
        environment = config.get("environment", default_environment)
        if not _environment_exists(environment):
            errors.append(
                "The environment '%s (%s)' for block '%s' does not exist"
                % (environment["name"], environment["version"], name)
            )
        elif not _combination_exists(queue_name, environment):
            errors.append(
                "The combination of queue '%s' with environment '%s (%s)' for block '%s' does not exist"
                % (queue_name, environment["name"], environment["version"], name)
            )

    # analyzers are checked the same way as blocks
    for name, config in experiment.analyzers.items():
        queue_name = config.get("queue", default_queue)
        environment = config.get("environment", default_environment)
        if not _environment_exists(environment):
            errors.append(
                "The environment '%s (%s)' for analyzer '%s' does not exist"
                % (environment["name"], environment["version"], name)
            )
        elif not _combination_exists(queue_name, environment):
            errors.append(
                "The combination of queue '%s' with environment '%s (%s)' for analyzer '%s' does not exist"
                % (queue_name, environment["name"], environment["version"], name)
            )

    return errors
# ----------------------------------------------------------
def validate_experiment(experiment_info, toolchain_info, user=None):
    """Makes sure the experiment can be run"""
    xp = beat.core.experiment.Experiment(
        settings.PREFIX, (experiment_info, toolchain_info)
    )

    # only check environments when the declaration itself is valid
    if xp.valid:
        return xp, xp.errors + validate_environments(xp, user)

    return xp, xp.errors
# ----------------------------------------------------------
class DeclarationStorage(OverwriteStorage):
    """File storage for experiment files, rooted at ``EXPERIMENTS_ROOT``.

    Inherits the overwrite-on-save behavior from ``OverwriteStorage``.
    """

    def __init__(self, *args, **kwargs):
        # pin the storage location to the configured experiments directory
        super(DeclarationStorage, self).__init__(
            *args, location=settings.EXPERIMENTS_ROOT, **kwargs
        )
# ----------------------------------------------------------
class ExperimentManager(ContributionManager):
    """Model manager for :class:`Experiment` objects."""

    def get_by_natural_key(
        self,
        author_username,
        toolchain_username,
        toolchain_name,
        toolchain_version,
        name,
    ):
        """Looks up an experiment by author, full toolchain key and name."""
        return self.get(
            author__username=author_username,
            toolchain__author__username=toolchain_username,
            toolchain__name=toolchain_name,
            toolchain__version=toolchain_version,
            name=name,
        )

    def from_author(self, user, author_name, add_public=False):
        """Experiments of an author, newest first then by name."""
        queryset = super(ExperimentManager, self).from_author(
            user, author_name, add_public
        )
        return queryset.order_by("-creation_date", "name")

    def from_author_and_public(self, user, author_name):
        """Experiments of an author plus public ones, newest first."""
        queryset = super(ExperimentManager, self).from_author_and_public(
            user, author_name
        )
        return queryset.order_by("-creation_date", "name")

    def create_experiment(
        self, author, toolchain, name, declaration, short_description="", description=""
    ):
        """Creates a new experiment in pending state"""

        # Create the database representation of the experiment
        experiment = self.model(
            author=author,
            toolchain=toolchain,
            name=name,
            status=self.model.PENDING,
            start_date=None,
            end_date=None,
        )

        # accept either an already-parsed dictionary or a JSON string
        if not isinstance(declaration, dict):
            declaration = simplejson.loads(declaration)

        if len(short_description) > 0:
            declaration["description"] = short_description

        experiment.declaration = declaration

        # Check the provided description
        if description is not None:
            experiment.description = description

        # Save the experiment (will run the validation)
        try:
            experiment.save()
        except SyntaxError as e:
            return None, None, str(e)
        except Exception:
            import traceback

            return None, None, traceback.format_exc()

        experiment._loaded_status = experiment.status

        return experiment, experiment._toolchain_cache, None
# ----------------------------------------------------------
class Experiment(Shareable):
    """A runnable experiment: a toolchain instantiated with concrete
    algorithms, datasets and execution settings (queue/environment)."""

    # _____ Constants __________

    # single-character status codes stored in the `status` column
    PENDING = "P"
    SCHEDULED = "S"
    RUNNING = "R"
    DONE = "D"
    FAILED = "F"
    CANCELLING = "C"

    STATUS = (
        (PENDING, "Pending"),
        (SCHEDULED, "Scheduled"),
        (RUNNING, "Running"),
        (DONE, "Done"),
        (FAILED, "Failed"),
        (CANCELLING, "Canceling"),
    )

    # _____ Fields __________

    author = models.ForeignKey(
        User, related_name="experiments", on_delete=models.CASCADE
    )
    toolchain = models.ForeignKey(
        Toolchain, related_name="experiments", on_delete=models.CASCADE
    )
    name = models.CharField(max_length=200)
    short_description = models.CharField(
        max_length=100, default="", blank=True, help_text=Messages["short_description"]
    )
    status = models.CharField(max_length=1, choices=STATUS, default=PENDING)
    creation_date = models.DateTimeField(null=True, blank=True, auto_now_add=True)
    start_date = models.DateTimeField(null=True, blank=True)
    end_date = models.DateTimeField(null=True, blank=True)
    # declaration/description contents live on disk; the FileField columns
    # only record the relative paths inside DeclarationStorage
    declaration_file = models.FileField(
        storage=DeclarationStorage(),
        upload_to=get_contribution_declaration_filename,
        blank=True,
        null=True,
        max_length=300,
        db_column="declaration",
    )
    description_file = models.FileField(
        storage=DeclarationStorage(),
        upload_to=get_contribution_description_filename,
        blank=True,
        null=True,
        max_length=300,
        db_column="description",
    )

    # read-only parameters that are updated at every save(), if required
    hash = models.CharField(max_length=64)
    referenced_datasets = models.ManyToManyField(
        DatabaseSet, related_name="experiments", blank=True
    )
    referenced_algorithms = models.ManyToManyField(
        Algorithm, related_name="experiments", blank=True
    )

    objects = ExperimentManager()

    # _____ Meta parameters __________

    class Meta:
        unique_together = ("author", "toolchain", "name")
        ordering = ["-creation_date"]

    # _____ Utilities __________
[docs] def natural_key(self): return (self.author.username,) + self.toolchain.natural_key() + (self.name,)
natural_key.dependencies = ["toolchains.toolchain"] # _____ Methods __________
[docs] def fullname(self): return "%s/%s/%s" % (self.author.username, self.toolchain.fullname(), self.name)
[docs] def hashed_path(self, extension=""): """Relative path of a file belonging to the object on the respective storage""" return os.path.join( beat.core.hash.toUserPath(self.author.username), self.toolchain.fullname(), self.name + extension, )
[docs] def declaration_filename(self): """Relative path of the declaration file on the storage""" return self.hashed_path(".json")
[docs] def description_filename(self): """Relative path of the description file on the storage""" return self.hashed_path(".rst")
# _____ Utilities __________ def _share_dataformats(self, users, teams): needed_formats = [] for dataset in self.referenced_datasets.all(): needed_formats.extend(dataset.all_needed_dataformats()) # Retrieve and process the list of referenced dataformats own_needed_formats = filter(lambda x: x.author == self.author, needed_formats) other_needed_formats = filter(lambda x: x.author != self.author, needed_formats) # Ensure that all needed dataformats from other users/teams have the necessary sharing # preferences errors = [] for needed_format in other_needed_formats: errors.extend(needed_format.is_accessible(users=users, teams=teams)) if len(errors) > 0: raise ShareError(errors) for dataformats in own_needed_formats: dataformats.share(users=users, teams=teams) def __share_algorithms(self, users, teams, algorithms_infos): # Retrieve and process the list of algorithms referenced by the declaration, and # the data formats they reference needed_algorithms = [] for algorithm in self.referenced_algorithms.iterator(): needed_algorithms.append(algorithm) needed_algorithms = list(set(needed_algorithms)) # Only keep the referenced dataformats and algorithms owned by the author of the # experiment # Process the list of referenced algorithms own_needed_algorithms = filter( lambda x: x.author == self.author, needed_algorithms ) other_needed_algorithms = filter( lambda x: x.author != self.author, needed_algorithms ) # Ensure that all needed algorithms from other users have the necessary sharing # preferences errors = [] for needed_algorithm in other_needed_algorithms: errors.extend(needed_algorithm.is_accessible(False, users, teams)) if len(errors) > 0: raise ShareError(errors) for algorithm in own_needed_algorithms: if algorithms_infos and algorithm.fullname() in algorithms_infos: infos = algorithms_infos[algorithm.fullname()] opensource = infos.get("opensource", False) else: opensource = True algorithm.share(public=opensource, users=users, teams=teams) # _____ Overrides __________
[docs] def has_attestation(self): try: self.attestation except ObjectDoesNotExist: return False else: return True
[docs] @classmethod def from_db(cls, db, field_names, values): instance = super(Experiment, cls).from_db(db, field_names, values) instance._loaded_status = values[field_names.index("status")] return instance
    def save(self, *args, **kwargs):
        """Validates, persists and (re-)links this experiment.

        Raises:
            SyntaxError: when the declaration is invalid, or references a
                dataset or algorithm that cannot be found.
        """
        new_experiment = self.id is None

        # if changing the experiment status, then make sure start_date is set
        if (self.status not in [self.PENDING, self.SCHEDULED]) and (
            self.start_date is None
        ):
            self.start_date = self.end_date or datetime.now()

        # Retrieve the experiment declaration
        declaration = self.declaration

        # Compute the hash of the content
        content_hash = beat.core.hash.hashJSON(declaration, "description")
        content_modified = content_hash != self.hash

        if content_modified:
            # validates the experiment
            xp, errors = validate_experiment(
                declaration, self.toolchain.declaration, self.author
            )
            if errors:
                message = (
                    "The experiment isn't valid, due to the "
                    "following errors:\n * %s"
                )
                raise SyntaxError(message % "\n * ".join(errors))

            self.hash = content_hash
            self.short_description = declaration.get("description", "")

        # Ensures that the sharing informations are consistent
        if self.sharing == Shareable.USABLE:
            self.sharing = Shareable.PUBLIC

        # Save the changed files (if necessary)
        storage.save_files(self)

        # notify interested parties when a previously-loaded experiment
        # transitions into a final state
        is_adding = self._state.adding
        if not is_adding and self._loaded_status != self.status:
            if self.status in [Experiment.DONE, Experiment.FAILED]:
                self.email()

        # Invoke the base implementation
        super(Experiment, self).save(*args, **kwargs)

        # If the filename has changed, move all the files
        if self.declaration_filename() != self.declaration_file.name:
            storage.rename_file(self, "declaration_file", self.declaration_filename())
            storage.rename_file(self, "description_file", self.description_filename())

        if content_modified:
            # Creates experiment blocks and setup dependencies
            self.update_blocks()

            # Link the experiment to the datasets
            self.referenced_datasets.clear()
            for dataset_declaration in xp.datasets.values():
                try:
                    (db_name, db_version) = dataset_declaration["database"].name.split(
                        "/"
                    )
                    dataset = DatabaseSet.objects.get(
                        protocol__database__name=db_name,
                        protocol__database__version=db_version,
                        protocol__name=dataset_declaration["protocol"],
                        name=dataset_declaration["set"],
                    )
                except Exception:
                    # clean up a half-created experiment before reporting
                    if new_experiment:
                        self.delete()
                    raise SyntaxError(
                        "The dataset '%s.%s.%s' can't be found"
                        % (
                            dataset_declaration["database"].name,
                            dataset_declaration["protocol"],
                            dataset_declaration["set"],
                        )
                    )

                self.referenced_datasets.add(dataset)

            # Link the experiment to the algorithms
            self.referenced_algorithms.clear()
            for algorithm in xp.algorithms:
                try:
                    (username, name, version) = algorithm.split("/")
                    algorithm_db = Algorithm.objects.get(
                        author__username=username, name=name, version=int(version)
                    )
                except Exception:
                    if new_experiment:
                        self.delete()
                    # NOTE(review): `algorithm` is split as a string above but
                    # `.fullname()` is called on it here — verify its type
                    raise SyntaxError(
                        "The algorithm '%s' can't be found" % algorithm.fullname()
                    )

                self.referenced_algorithms.add(algorithm_db)
[docs] def email(self): """e-mails owners and shared parties about this experiment status""" user_email_list = ( [self.author.email] if self.author.accountsettings.experiment_mail_notifications_enabled else [] ) user_email_list.extend( [ user.email for user in self.shared_with.all() if user.accountsettings.experiment_mail_notifications_enabled ] ) all_team_members = [ user for team in self.shared_with_team.all() for user in team.members.all() ] user_email_list.extend( [ user.email for user in all_team_members if user.email not in user_email_list and user.accountsettings.experiment_mail_notifications_enabled ] ) if user_email_list: if self.status == Experiment.DONE: subject = "Experiment %s finished successfully" % self.fullname() template_path = "experiments/successful_experiment_email.txt" elif self.status == Experiment.FAILED: subject = "Experiment %s failed" % self.fullname() template_path = "experiments/failed_experiment_email.txt" try: send_mail( subject, render_to_string( template_path, { "experiment": self, "beat_version": __version__, "site": Site.objects.get_current(), }, ), settings.DEFAULT_FROM_EMAIL, user_email_list, ) except Exception: import traceback logger.warn( "Could not send e-mail to `%s' about " "`%s'. Exception caught: %s", user_email_list, self, traceback.format_exc(), )
[docs] def share(self, users=None, teams=None, algorithms_infos={}): self._share_dataformats(users=users, teams=teams) self.__share_algorithms( users=users, teams=teams, algorithms_infos=algorithms_infos ) self.toolchain.share(users=users, teams=teams) super(Experiment, self).share(users=users, teams=teams)
[docs] def update_blocks(self): """Updates internal block representation of an experiment""" corexp = self.core() # Loads the experiment execution description, creating the Block's, # BlockInput's and BlockOutput's as required. for order_0, (block_name, description) in enumerate(corexp.setup().items()): # Checks that the Queue/Environment exists job_description = description["configuration"] env = Environment.objects.filter( name=job_description["environment"]["name"], version=job_description["environment"]["version"], ) if not env: logger.warn( "Cannot find environment `%s (%s)' - not setting", job_description["environment"]["name"], job_description["environment"]["version"], ) env = None else: env = env[0] # Search for queue that contains a specific environment if env: queue = Queue.objects.filter( name=job_description["queue"], environments__in=[env] ) else: queue = Queue.objects.filter(name=queue) if not queue: env_name = env.fullname() if env else "NULL" logger.warn( "Cannot find queue `%s' which contains " "environment `%s' - not setting", job_description["queue"], env_name, ) queue = None else: queue = queue[0] parts = job_description["algorithm"].split("/") algorithm = Algorithm.objects.get( author__username=parts[0], name=parts[1], version=parts[2] ) # Ties the block in b = Block.objects.filter(experiment=self, name=block_name).first() if b is None: b = Block(experiment=self, name=block_name, algorithm=algorithm) else: b.algorithm = algorithm b.execution_order = order_0 + 1 b.command = simplejson.dumps(job_description, indent=4) b.status = Block.PENDING b.analyzer = algorithm.analysis() b.environment = env b.queue = queue b.required_slots = job_description["nb_slots"] b.channel = job_description["channel"] b.save() # from this point: requires block to have an assigned id b.dependencies.clear() b.dependencies.add( *[self.blocks.get(name=k) for k in description["dependencies"]] ) # reset inputs - creates if necessary only b.inputs.clear() for v in 
job_description["inputs"].values(): if "database" in v: # database input db = DatabaseSetOutput.objects.get( set__hash=v["hash"], template__name=v["output"] ) BlockInput.objects.get_or_create( block=b, channel=v["channel"], database=db ) else: cache, _ = CachedFile.objects.get_or_create(hash=v["hash"]) BlockInput.objects.get_or_create( block=b, channel=v["channel"], cache=cache ) # reset outputs - creates if necessary only b.outputs.clear() outputs = job_description.get( "outputs", {"": job_description.get("result")} ) for v in outputs.values(): cache, cr = CachedFile.objects.get_or_create(hash=v["hash"]) cache.blocks.add(b)
# _____ Methods __________
[docs] def is_busy(self): return self.status in [ Experiment.PENDING, Experiment.SCHEDULED, Experiment.CANCELLING, ]
[docs] def is_done(self): return self.status in [Experiment.DONE, Experiment.FAILED]
[docs] def is_running(self): return self.status == Experiment.RUNNING
[docs] def modifiable(self): return ( (self.reports.count() == 0) and not self.has_attestation() and super(Experiment, self).modifiable() )
[docs] def deletable(self): return ( (self.reports.count() == 0) and not self.has_attestation() and super(Experiment, self).deletable() )
[docs] def core(self): return validate_experiment( self.declaration, self.toolchain.declaration, self.author )[0]
[docs] def job_splits(self, status=None): from ...backend.models import JobSplit retval = JobSplit.objects.filter(job__block__in=self.blocks.all()) if status is not None: retval = retval.filter(status=status) return retval
[docs] def get_absolute_url(self): return reverse( "experiments:view", args=( self.author.username, self.toolchain.author.username, self.toolchain.name, self.toolchain.version, self.name, ), )
[docs] def get_api_share_url(self): return reverse( "api_experiments:share", args=( self.author.username, self.toolchain.author.username, self.toolchain.name, self.toolchain.version, self.name, ), )
[docs] def get_api_update_url(self): return reverse( "api_experiments:object", args=( self.author.username, self.toolchain.author.username, self.toolchain.name, self.toolchain.version, self.name, ), )
[docs] def get_admin_change_url(self): return reverse("admin:experiments_experiment_change", args=(self.id,))
[docs] def completion(self): if self.start_date is None: return 0 if self.end_date is not None: return 100 blocks = self.blocks.all() if len(blocks) == 0: return 0 return int( 100 * float(len(filter(lambda x: x.status == Block.DONE, blocks))) / len(blocks) )
[docs] def all_needed_dataformats(self): result = [] for algorithm in self.referenced_algorithms.all(): result.extend(algorithm.all_needed_dataformats()) for dataset in self.referenced_datasets.all(): result.extend(dataset.all_needed_dataformats()) return list(set(result))
    def reset(self):
        """Resets an experiment so it can be run again"""

        if not self.is_done():
            return  # can only reset experiments which are done

        # put every block back into pending state
        self.blocks.update(status=Block.PENDING, start_date=None, end_date=None)

        self.start_date = None
        self.end_date = None
        self.status = self.PENDING

        # reset sharing state
        self.sharing = Shareable.PRIVATE
        self.shared_with.clear()
        self.shared_with_team.clear()

        # remove associated attestations
        if self.has_attestation():
            self.attestation.all().delete()

        self.save()
[docs] def databases_and_protocols(self): """A set of all used database/protocol combinations for all datasets""" return set(["%s" % k.protocol for k in self.referenced_datasets.all()])
[docs] def analyzers(self): """A list of all used analyzers""" return set( [k.fullname() for k in self.referenced_algorithms.all() if k.analysis()] )
# _____ Properties __________ description = property(get_description, set_description) declaration = property(get_declaration, set_declaration) declaration_string = property(beat.web.common.models.get_declaration_string)
    def schedule(self):
        """Schedules this experiment for execution at the backend"""
        # imported locally to avoid a circular import with the backend app
        from ...backend.helpers import schedule_experiment

        schedule_experiment(self)
    def cancel(self):
        """Cancels the execution of this experiment on the backend."""
        # imported locally to avoid a circular import with the backend app
        from ...backend.helpers import cancel_experiment

        cancel_experiment(self)
[docs] def fork(self, username=None, name=None): """Forks this experiment under a new username or name""" author = username or self.author name = name or self.name xp, _, __ = Experiment.objects.create_experiment( author, self.toolchain, name, self.declaration, self.short_description, self.description, ) return xp