#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
import random
from datetime import datetime
import simplejson as json
from django.conf import settings
from django.contrib.auth.models import User
from django.core import exceptions as django_exceptions
from django.db import models
from django.db.models import Q
from django.urls import reverse
from django.utils.encoding import force_bytes
from django.utils.encoding import force_text
from docutils.core import publish_parts
from ..algorithms.models import Algorithm
from ..common.texts import Messages
from ..experiments.models import Experiment
from ..plotters.models import Plotter
from ..plotters.models import PlotterParameter
# ----------------------------------------------------------
class ReportManager(models.Manager):
    """Manager for ``Report``: creation helper plus status/ownership queries."""

    def create_object(
        self,
        author,
        name,
        short_description="",
        description="",
        content=None,
        experiments=None,
    ):
        """Create and save a new editable report, then attach experiments to it.

        Args:
            author: user stored as the author of the report.
            name: name of the report.
            short_description: stored as-is on the report.
            description: stored as-is on the report.
            content: report content. A ``dict`` is serialised to JSON; any
                other value is stored unchanged. ``None`` (the default) is
                treated as an empty dict.
            experiments: iterable of experiment full names passed to
                ``Report.add_experiments``. ``None`` (the default) is treated
                as an empty list.

        Returns:
            A ``(report, result)`` tuple where ``result`` is the dictionary
            returned by ``Report.add_experiments``. When ``result["success"]``
            is false the freshly saved report is deleted and ``report`` is
            ``None``.

        Raises:
            Whatever ``Report.add_experiments`` raises; in that case the
            freshly saved report is deleted before the exception propagates.
        """
        # Fresh containers per call instead of shared mutable defaults
        if content is None:
            content = {}
        if experiments is None:
            experiments = []

        now = datetime.now()

        # Creation of the report
        report = Report()
        report.name = name
        report.author = author
        report.short_description = short_description
        report.description = description
        if isinstance(content, dict):
            report.content = json.dumps(content, indent=4)
        else:
            report.content = content
        report.creation_date = now
        report.last_edited_date = now
        report.publication_date = None
        report.expiration_date = None
        report.status = self.model.EDITABLE
        report.save()

        # Add the experiments
        try:
            result = report.add_experiments(experiments)
        except Exception:
            # Do not leave a half-created report behind when the experiment
            # list cannot be processed
            report.delete()
            raise

        if not result["success"]:
            report.delete()
            report = None

        return (report, result)

    def editable(self):
        """Return the reports whose status is ``Report.EDITABLE``."""
        return self.filter(status=Report.EDITABLE)

    def locked(self):
        """Return the reports whose status is ``Report.LOCKED``."""
        return self.filter(status=Report.LOCKED)

    def published(self):
        """Return the reports whose status is ``Report.PUBLISHED``."""
        return self.filter(status=Report.PUBLISHED)

    def for_user(self, user, add_public=False):
        """Return the reports visible to ``user``.

        Anonymous users only get published reports. Other users get the
        reports they authored and, when ``add_public`` is true, every
        published report as well.
        """
        if user.is_anonymous:
            return self.published()

        query = Q(author=user)
        if add_public:
            query |= Q(status=Report.PUBLISHED)

        return self.filter(query).distinct()
# ----------------------------------------------------------
class Report(models.Model):
    """A report authored by a user, grouping experiments and JSON content."""

    # _____ Constants __________

    EDITABLE = "E"
    LOCKED = "L"
    PUBLISHED = "P"

    REPORT_STATUS = (
        (EDITABLE, "Editable"),
        (LOCKED, "Locked"),
        (PUBLISHED, "Published"),
    )

    # _____ Fields __________

    status = models.CharField(max_length=1, choices=REPORT_STATUS, default=EDITABLE)
    name = models.CharField(max_length=200, help_text=Messages["name"], blank=False)
    # Assigned automatically by save() when left unset
    number = models.IntegerField(blank=True)
    author = models.ForeignKey(
        User, related_name="%(class)ss", on_delete=models.CASCADE
    )
    experiments = models.ManyToManyField(Experiment, related_name="reports", blank=True)
    creation_date = models.DateTimeField()
    # Overwritten with the current time on every save()
    last_edited_date = models.DateTimeField(null=True)
    expiration_date = models.DateTimeField(null=True, blank=True)
    publication_date = models.DateTimeField(null=True, blank=True)
    short_description = models.CharField(
        max_length=100, default="", blank=True, help_text=Messages["short_description"]
    )
    description = models.TextField(default="", blank=True)
    # JSON document stored as text; parsed with json.loads() by the methods below
    content = models.TextField(default="{}", blank=True)
    analyzer = models.ForeignKey(
        Algorithm,
        related_name="reports",
        null=True,
        blank=True,
        on_delete=models.CASCADE,
    )

    # read-only parameters that are updated at every save(), if required
    referenced_plotters = models.ManyToManyField(
        Plotter, related_name="reports", blank=True
    )
    referenced_plotterparameters = models.ManyToManyField(
        PlotterParameter, related_name="reports", blank=True
    )

    objects = ReportManager()

    def __str__(self):
        # %s rather than %d: the number is still None before the first save()
        return "Report %s (#%s)" % (self.fullname(), self.number)

    def fullname(self):
        """Return ``"<author username>/<report name>"``."""
        return "%s/%s" % (self.author.username, self.name)

    def get_absolute_url(self):
        """Return the URL of the ``reports:view`` route for this report number."""
        return reverse("reports:view", args=(self.number,))

    def get_author_absolute_url(self):
        """Return the URL of the ``reports:author-view`` route for this report."""
        return reverse("reports:author-view", args=(self.author.username, self.name))

    def get_api_update_url(self):
        """Return the URL of the ``api_reports:object`` route for this report."""
        return reverse("api_reports:object", args=(self.author.username, self.name))

    def get_api_add_url(self):
        """Return the URL of the ``api_reports:add_experiments`` route."""
        return reverse(
            "api_reports:add_experiments", args=(self.author.username, self.name)
        )

    def get_api_remove_url(self):
        """Return the URL of the ``api_reports:remove_experiments`` route."""
        return reverse(
            "api_reports:remove_experiments", args=(self.author.username, self.name)
        )

    def save(self, *args, **kwargs):
        """Save the report and refresh its plotter references.

        A random unused report number is assigned when none is set,
        ``last_edited_date`` is set to the current time, and the
        ``referenced_plotters`` / ``referenced_plotterparameters`` relations
        are rebuilt from every top-level content entry whose key starts with
        ``"chart"``.
        """
        if self.number is None:
            # Generate a unique report number. random.randint() includes both
            # bounds, so the upper bound is the largest value that fits a
            # 32-bit signed IntegerField.
            while True:
                candidate = random.randint(100000, 2 ** 31 - 1)  # nosec
                if not Report.objects.filter(number=candidate).exists():
                    break
            self.number = candidate

        report_content = json.loads(self.content)
        charts = [
            value for key, value in report_content.items() if key.startswith("chart")
        ]

        self.last_edited_date = datetime.now()

        super(Report, self).save(*args, **kwargs)

        # Many-to-many relations can only be touched once the row exists
        self.referenced_plotters.clear()
        self.referenced_plotterparameters.clear()

        for chart in charts:
            # Both references are "<author>/<name>/<version>" strings
            plotter_parts = chart["data"]["plotter"].split("/")
            plotterparameters_parts = chart["selected_template"].split("/")

            plotter = Plotter.objects.get(
                author__username=plotter_parts[0],
                name=plotter_parts[1],
                version=plotter_parts[2],
            )
            plotterparameter = PlotterParameter.objects.get(
                author__username=plotterparameters_parts[0],
                name=plotterparameters_parts[1],
                version=plotterparameters_parts[2],
            )

            self.referenced_plotters.add(plotter)
            self.referenced_plotterparameters.add(plotterparameter)

    def add_experiments(self, experiment_fullname_list):
        """Attach the listed experiments to this report.

        Args:
            experiment_fullname_list: iterable of experiment full names (see
                ``_get_experiments_from_fullname_list`` for the format).

        Returns:
            ``{"success": False, "error": ...}`` when the report is not
            editable. Otherwise a dictionary with ``"success": True`` and
            three lists of full names: ``unknown_experiments`` (not found),
            ``inaccessible_experiments`` (not accessible to the report author
            or not in the ``DONE`` status) and ``incompatible_experiments``
            (no block matching ``analyzer=True``).

        Raises:
            django.core.exceptions.ValidationError: a full name is malformed.
        """
        # Check that the report is modifiable
        if self.status != Report.EDITABLE:
            return {
                "success": False,
                "error": "Report not modifiable",
            }

        # Process the list of experiments
        accessible_experiments = []
        inaccessible_experiments = []

        experiments, unknown_experiments = self._get_experiments_from_fullname_list(
            experiment_fullname_list
        )

        for experiment in experiments:
            # Check experiments accessibility
            is_accessible, _ = experiment.accessibility_for(self.author)
            if not is_accessible or experiment.status != Experiment.DONE:
                inaccessible_experiments.append(experiment.fullname())
                continue

            accessible_experiments.append(experiment)

        # Add the experiments to the report
        incompatible_experiments = []
        for experiment in accessible_experiments:
            if experiment.blocks.filter(analyzer=True).exists():
                self.experiments.add(experiment)
            else:
                incompatible_experiments.append(experiment.fullname())

        return {
            "success": True,
            "unknown_experiments": unknown_experiments,
            "inaccessible_experiments": inaccessible_experiments,
            "incompatible_experiments": incompatible_experiments,
        }

    def remove_experiments(self, experiment_fullname_list):
        """Detach the listed experiments from this report and save it.

        Does nothing when the report is not editable. Unknown experiment
        names are ignored. The analyzer is reset once no experiment is left.

        Raises:
            django.core.exceptions.ValidationError: a full name is malformed.
        """
        # Check that the report is modifiable
        if self.status != Report.EDITABLE:
            return

        experiments, _ = self._get_experiments_from_fullname_list(
            experiment_fullname_list
        )

        for experiment in experiments:
            self.experiments.remove(experiment)

        if not self.experiments.exists():
            self.analyzer = None

        self.save()

    def _get_experiments_from_fullname_list(self, experiment_fullname_list):
        """Resolve experiment full names into ``Experiment`` objects.

        A full name has five ``/``-separated parts: experiment author,
        toolchain author, toolchain name, toolchain version (an integer) and
        experiment name.

        Returns:
            ``(experiments, unknown_experiments)``: the experiments found and
            the full names for which no experiment exists.

        Raises:
            django.core.exceptions.ValidationError: a full name does not have
                five parts or its toolchain version is not an integer.
        """
        experiments = []
        unknown_experiments = []

        for experiment_name in experiment_fullname_list:
            parts = experiment_name.split("/")
            if len(parts) != 5:
                raise django_exceptions.ValidationError(
                    {"experiment": "Invalid experiment full name"}
                )

            try:
                toolchain_version = int(parts[3])
            except ValueError:
                # Report a non-numeric version like any other malformed name
                # instead of leaking a bare ValueError to the caller
                raise django_exceptions.ValidationError(
                    {"experiment": "Invalid experiment full name"}
                )

            try:
                experiment = Experiment.objects.get(
                    author__username=parts[0],
                    toolchain__author__username=parts[1],
                    toolchain__name=parts[2],
                    toolchain__version=toolchain_version,
                    name=parts[4],
                )
            except Experiment.DoesNotExist:
                unknown_experiments.append(experiment_name)
            else:
                experiments.append(experiment)

        return experiments, unknown_experiments

    def _get_experiments_and_alias(self, alias_filter):
        """Return the report experiments whose alias appears in ``alias_filter``.

        Aliases are read from the ``"alias_experiments"`` entry of the report
        content; an experiment without an entry there uses its own name. The
        stored content is not modified.

        Returns:
            ``(experiments_list, alias_list)``: two lists in matching order
            holding the experiment full names and their aliases.
        """
        experiments_list = []

        report_content = json.loads(self.content)
        aliases = report_content.setdefault("alias_experiments", {})

        for experiment in self.experiments.iterator():
            fullname = experiment.fullname()
            alias = aliases.setdefault(fullname, experiment.name)

            try:
                alias_filter.index(alias)
            except (ValueError, AttributeError, TypeError):
                # Alias not selected, or alias_filter cannot be searched:
                # the experiment is left out
                continue

            experiments_list.append(fullname)

        # A real list: a lazy map object cannot be indexed, measured or reused
        alias_list = [aliases[fullname] for fullname in experiments_list]

        return experiments_list, alias_list

    def compileTextItem(self, itemStr):
        """Render a text item of the report to an HTML fragment.

        The itemStr can either be:

        * a reference of the form ``"<group>|<index>"`` pointing at
          ``content["groups"][<group>]["reportItems"][<index>]["content"]["text"]``
          in the report content, or
        * the reStructuredText source itself, used whenever the reference
          cannot be resolved.

        Returns:
            The ``"fragment"`` part produced by docutils' ``html4css1`` writer.
        """
        content = json.loads(self.content)

        try:
            textBlockMap = itemStr.split("|")
            rstStr = content["groups"][textBlockMap[0]]["reportItems"][
                int(textBlockMap[1])
            ]["content"]["text"]
        except (KeyError, IndexError, ValueError, TypeError):
            # Not a resolvable reference (missing key, no or bad index,
            # unexpected structure): treat the string as raw text
            rstStr = itemStr

        docutils_settings = getattr(settings, "RESTRUCTUREDTEXT_FILTER_SETTINGS", {})

        parts = publish_parts(
            source=force_bytes(rstStr),
            writer_name="html4css1",
            settings_overrides=docutils_settings,
        )

        return force_text(parts["fragment"])