Source code for beat.web.reports.api

#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.web module of the BEAT platform.              #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################

import json

from datetime import datetime, timedelta

from django.conf import settings
from django.contrib.auth.models import User
from django.shortcuts import get_object_or_404
from django.db.models import Q
from django.core.urlresolvers import reverse

from rest_framework import generics
from rest_framework import views
from rest_framework import permissions
from rest_framework.response import Response
from rest_framework import status

from ..common.models import Shareable
from ..common.exceptions import ShareError
from ..common.mixins import CommonContextMixin
from ..common.responses import BadRequestResponse, ForbiddenResponse
from ..common.utils import ensure_html
from ..experiments.serializers import ExperimentResultsSerializer

from .serializers import SimpleReportSerializer
from .serializers import FullReportSerializer
from .serializers import ReportCreationSerializer
from .serializers import ReportUpdateSerializer
from .serializers import ReportCreationFailedException

from .models import Report

from .permissions import (
    IsAuthor,
    IsAuthorOrPublished,
    IsAccessibleOutside,
    IsEditable,
    IsLocked,
)


# ----------------------------------------------------------


class UserReportListView(generics.ListCreateAPIView):
    """List the reports of one user (GET) or create a new report (POST).

    Reading uses ``SimpleReportSerializer``; creation switches to
    ``ReportCreationSerializer`` (see :meth:`get_serializer`).
    """

    model = Report
    serializer_class = SimpleReportSerializer
    writing_serializer_class = ReportCreationSerializer

    def get_queryset(self):
        """Return the reports of ``owner_name`` that the requester may list.

        When the requester is the owner, every report they authored is
        returned; otherwise only the owner's reports whose status is
        ``Report.PUBLISHED``. An unknown ``owner_name`` yields a 404.
        """
        owner = get_object_or_404(User, username=self.kwargs.get("owner_name"))

        if self.request.user == owner:
            return Report.objects.filter(author=owner)

        return Report.objects.filter(
            Q(author=owner), Q(status=Report.PUBLISHED)
        ).distinct()

    def get_permissions(self):
        """Select the permission classes according to the HTTP method.

        POST uses ``IsAuthenticated`` and ``IsAuthor``; every other method
        uses ``AllowAny``.
        """
        if self.request.method == "POST":
            self.permission_classes = [permissions.IsAuthenticated, IsAuthor]
        else:
            self.permission_classes = [permissions.AllowAny]

        return super(UserReportListView, self).get_permissions()

    def get_serializer(self, *args, **kwargs):
        """Build the serializer, using the writing serializer for POST."""
        if self.request.method == "POST":
            self.serializer_class = self.writing_serializer_class

        return super(UserReportListView, self).get_serializer(*args, **kwargs)

    def post(self, request, owner_name):
        """Create a report from ``request.data``.

        Returns:
            * 201 with the report name and URL, plus any non-empty
              ``unknown_experiments`` / ``inaccessible_experiments`` /
              ``incompatible_experiments`` lists, when
              ``serializer.details["success"]`` is true;
            * a bad-request response carrying ``error`` or
              ``common_analyzers`` (or nothing) otherwise.
        """
        serializer = self.get_serializer(
            data=request.data, context={"user": request.user}, partial=True
        )
        serializer.is_valid(raise_exception=True)

        try:
            report = serializer.save()
        except ReportCreationFailedException:
            # The outcome of the creation is read from serializer.details
            # just below, so the exception itself is deliberately ignored.
            pass

        details = serializer.details

        if not details["success"]:
            if "error" in details:
                return BadRequestResponse({"error": details["error"]})

            if "common_analyzers" in details:
                return BadRequestResponse(
                    {"common_analyzers": details["common_analyzers"]}
                )

            return BadRequestResponse()

        report_url = reverse(
            "api_reports:object",
            kwargs={
                "owner_name": report.author.username,
                "report_name": report.name,
            },
        )
        result = {"name": report.fullname(), "url": report_url}

        # Only experiment lists that actually contain entries are reported
        for key in (
            "unknown_experiments",
            "inaccessible_experiments",
            "incompatible_experiments",
        ):
            if len(details[key]) > 0:
                result[key] = details[key]

        return Response(result, status=status.HTTP_201_CREATED)
# ----------------------------------------------------------
class ReportDetailView(generics.RetrieveUpdateDestroyAPIView):
    """Retrieve, update or delete a single report.

    The report is addressed either by its ``number`` or by the
    ``owner_name`` / ``report_name`` pair found in the URL keyword
    arguments. Reading uses ``FullReportSerializer``; PUT and PATCH use
    ``ReportUpdateSerializer``.
    """

    model = Report
    serializer_class = FullReportSerializer
    writing_serializer_class = ReportUpdateSerializer

    def get_queryset(self):
        """Fetch the addressed report and check object permissions on it.

        NOTE: despite its name, this returns a single ``Report`` instance
        (or raises a 404), not a queryset.
        """
        if "number" in self.kwargs:
            report = get_object_or_404(Report, number=int(self.kwargs.get("number")))
        else:
            report = get_object_or_404(
                Report,
                author__username=self.kwargs.get("owner_name"),
                name=self.kwargs.get("report_name"),
            )

        self.check_object_permissions(self.request, report)
        return report

    def get_permissions(self):
        """Select the permission classes for the current request.

        * any method other than GET: ``IsAuthor``, ``IsEditable`` and
          ``IsAuthenticated``;
        * GET by number: ``IsAccessibleOutside``;
        * GET by owner/name: ``IsAuthorOrPublished``.
        """
        if self.request.method != "GET":
            permission_classes = [IsAuthor, IsEditable, permissions.IsAuthenticated]
        elif "number" in self.kwargs:
            permission_classes = [IsAccessibleOutside]
        else:
            permission_classes = [IsAuthorOrPublished]

        self.permission_classes = permission_classes
        return super(ReportDetailView, self).get_permissions()

    def get_serializer(self, *args, **kwargs):
        """Build the serializer, using the writing serializer for PUT/PATCH."""
        if self.request.method in ["PUT", "PATCH"]:
            self.serializer_class = self.writing_serializer_class

        return super(ReportDetailView, self).get_serializer(*args, **kwargs)

    def get(self, request, owner_name=None, report_name=None, number=None):
        """Return the serialized report.

        When the report is accessed by number and its status is
        ``Report.LOCKED``, an ``anonymous`` flag set to ``True`` is added to
        the payload.
        """
        report = self.get_queryset()
        serializer = self.get_serializer(report, context={"request": request})
        data = serializer.data

        if "number" in self.kwargs and report.status == Report.LOCKED:
            data["anonymous"] = True

        return Response(data)

    def delete(self, request, owner_name, report_name):
        """Delete the report and answer with 204."""
        report = self.get_queryset()
        report.delete()
        return Response(status=status.HTTP_204_NO_CONTENT)

    def update(self, request, owner_name, report_name, **kwargs):
        """Update the report from ``request.data``.

        The optional ``fields`` query-string parameter is a comma-separated
        list of extra fields to return; the only one handled here is
        ``html_description``.

        ``**kwargs`` absorbs the ``partial=True`` keyword that the framework's
        ``partial_update()`` (PATCH) forwards to ``update()``. Without it a
        PATCH request fails with ``TypeError``. The flag is not otherwise
        needed because the serializer is always built with ``partial=True``.

        Returns:
            200 with the report name, short description and URL (also set
            as the ``Location`` header), plus any non-empty experiment
            warning lists; or a bad-request response with the serializer
            errors.
        """
        # Process the query string
        if "fields" in request.GET:
            fields_to_return = request.GET["fields"].split(",")
        else:
            # Available fields (not returned by default):
            #    - html_description
            fields_to_return = []

        # get_queryset() already runs check_object_permissions() on the report
        report = self.get_queryset()

        serializer = self.writing_serializer_class(
            report, data=request.data, partial=True
        )
        if not serializer.is_valid():
            return BadRequestResponse(serializer.errors)

        db_object = serializer.save()
        details = serializer.details

        result = {
            "name": db_object.fullname(),
            "short_description": db_object.short_description,
            "url": reverse(
                "api_reports:object",
                kwargs={
                    "owner_name": db_object.author.username,
                    "report_name": db_object.name,
                },
            ),
        }

        if details is not None:
            # Only experiment lists that actually contain entries are reported
            for key in (
                "unknown_experiments",
                "inaccessible_experiments",
                "incompatible_experiments",
            ):
                if len(details[key]) > 0:
                    result[key] = details[key]

        # Retrieve the description in HTML format (if necessary)
        if "html_description" in fields_to_return:
            description = db_object.description
            result["html_description"] = (
                ensure_html(description) if len(description) > 0 else ""
            )

        response = Response(result, status=status.HTTP_200_OK)
        response["Location"] = result["url"]
        return response
# ----------------------------------------------------------
class ReportListView(generics.ListAPIView):
    """List the reports visible to the requester (no authentication needed)."""

    model = Report
    serializer_class = SimpleReportSerializer
    permission_classes = [permissions.AllowAny]

    def get_queryset(self):
        """Return the reports the current user may list.

        Anonymous users get the reports whose status is
        ``Report.PUBLISHED``; other users additionally get the reports they
        authored.
        """
        user = self.request.user

        # NOTE(review): is_anonymous is invoked as a method here; confirm
        # this matches the Django version in use before changing it.
        if user.is_anonymous():
            return Report.objects.filter(status=Report.PUBLISHED)

        return Report.objects.filter(
            Q(author=user) | Q(status=Report.PUBLISHED)
        ).distinct()
# ----------------------------------------------------------
class BaseReportActionView(views.APIView):
    """Common base for the views acting on one report addressed by
    owner name and report name.

    Guarded by the ``IsAuthenticated`` and ``IsAuthor`` permission classes.
    """

    model = Report
    permission_classes = [permissions.IsAuthenticated, IsAuthor]

    def get_queryset(self):
        """Fetch the addressed report and check object permissions on it.

        NOTE: despite its name, this returns a single ``Report`` instance
        (or raises a 404), not a queryset.
        """
        lookup = {
            "author__username": self.kwargs.get("owner_name"),
            "name": self.kwargs.get("report_name"),
        }
        report = get_object_or_404(Report, **lookup)

        self.check_object_permissions(self.request, report)
        return report
# ----------------------------------------------------------
class LockReportView(BaseReportActionView):
    """Lock a report: set its status to ``Report.LOCKED`` and give it an
    expiration date.

    Adds ``IsEditable`` to the permission classes of the base view.
    """

    permission_classes = BaseReportActionView.permission_classes + [IsEditable]

    def post(self, request, owner_name, report_name):
        """Lock the addressed report.

        Returns:
            * 403 when the report has no experiments;
            * 403 with an ``unusable_experiments`` list (full names) when
              the report references experiments that are neither public nor
              authored by the requester;
            * 403 when the decoded report content has no keys;
            * 204 once the report has been locked and saved.
        """
        report = self.get_queryset()

        if report.experiments.count() == 0:
            return ForbiddenResponse("Report is empty and has no experiments")

        # An experiment is usable if it is public or belongs to the requester.
        # A list (rather than map(), which is a lazy one-shot iterator on
        # Python 3) keeps the response payload a plain, re-iterable sequence.
        unusable_experiments = [
            experiment.fullname()
            for experiment in report.experiments.all()
            if experiment.sharing != Shareable.PUBLIC
            and experiment.author != request.user
        ]

        if unusable_experiments:
            return Response(
                {"unusable_experiments": unusable_experiments},
                status=status.HTTP_403_FORBIDDEN,
            )

        content = json.loads(report.content)
        if not content.keys():
            return ForbiddenResponse("Report is empty")

        report.status = Report.LOCKED
        report.expiration_date = datetime.now() + timedelta(
            days=settings.EXPIRATION_DELTA
        )
        report.save()

        return Response(status=status.HTTP_204_NO_CONTENT)
# ----------------------------------------------------------
class PublishReportView(BaseReportActionView):
    """Publish a report: share its experiments and set its status to
    ``Report.PUBLISHED``.

    Adds ``IsLocked`` to the permission classes of the base view.
    """

    permission_classes = BaseReportActionView.permission_classes + [IsLocked]

    def post(self, request, owner_name, report_name):
        """Publish the addressed report.

        ``request.data`` may carry a ``visible_algorithms`` collection of
        algorithm full names. For every non-public experiment of the report
        authored by the requester, the algorithms it references that belong
        to the experiment's author are flagged ``{"opensource": False}``
        when listed there and ``{"opensource": True}`` otherwise, and the
        experiment is shared with that information.

        Returns:
            400 with the ``ShareError`` errors as soon as sharing one
            experiment fails (the report is then left unpublished), or 204
            once the report has been saved as published.
        """
        report = self.get_queryset()

        if "visible_algorithms" in request.data:
            visible_names = request.data["visible_algorithms"]
        else:
            visible_names = []

        experiments_to_share = report.experiments.filter(
            author=request.user
        ).exclude(sharing=Shareable.PUBLIC)

        for experiment in experiments_to_share:
            # Build algorithms sharing information
            own_algorithms = experiment.referenced_algorithms.filter(
                author=experiment.author
            )

            algorithms_infos = {}

            # First the algorithms listed in the request...
            for algorithm in own_algorithms:
                if algorithm.fullname() in visible_names:
                    algorithms_infos[algorithm.fullname()] = {"opensource": False}

            # ...then all the remaining ones
            for algorithm in own_algorithms:
                if algorithm.fullname() not in visible_names:
                    algorithms_infos[algorithm.fullname()] = {"opensource": True}

            # Share the experiment
            try:
                experiment.share(algorithms_infos=algorithms_infos)
            except ShareError as error:
                return Response(error.errors, status=status.HTTP_400_BAD_REQUEST)

        report.status = Report.PUBLISHED
        report.publication_date = datetime.now()
        report.expiration_date = None
        report.save()

        return Response(status=status.HTTP_204_NO_CONTENT)
# ----------------------------------------------------------
class ReportAddExperimentsView(BaseReportActionView):
    """Add experiments to a report.

    Adds ``IsEditable`` to the permission classes of the base view.
    """

    permission_classes = BaseReportActionView.permission_classes + [IsEditable]

    def post(self, request, owner_name, report_name):
        """Add the experiments listed in ``request.data["experiments"]``.

        Returns:
            * 400 when the request body has no ``experiments`` entry
              (previously an unhandled ``KeyError``);
            * 204 when ``report.add_experiments()`` succeeds without any
              warning;
            * 200 with the non-empty ``unknown_experiments`` /
              ``inaccessible_experiments`` / ``incompatible_experiments``
              lists when it succeeds with warnings;
            * a bad-request response carrying ``error`` or
              ``common_analyzers`` (or nothing) when it fails.
        """
        report = self.get_queryset()

        try:
            experiments = request.data["experiments"]
        except (KeyError, TypeError):
            # Client error: answer with a 400 instead of crashing
            return BadRequestResponse({"error": "Missing 'experiments' field"})

        details = report.add_experiments(experiments)

        if not details["success"]:
            if "error" in details:
                return BadRequestResponse({"error": details["error"]})

            if "common_analyzers" in details:
                return BadRequestResponse(
                    {"common_analyzers": details["common_analyzers"]}
                )

            return BadRequestResponse()

        # Only experiment lists that actually contain entries are reported
        result = {}
        for key in (
            "unknown_experiments",
            "inaccessible_experiments",
            "incompatible_experiments",
        ):
            if len(details[key]) > 0:
                result[key] = details[key]

        if not result:
            return Response(status=status.HTTP_204_NO_CONTENT)

        return Response(result)
# ----------------------------------------------------------
class ReportRemoveExperimentsView(BaseReportActionView):
    """Remove experiments from a report.

    Adds ``IsEditable`` to the permission classes of the base view.
    """

    permission_classes = BaseReportActionView.permission_classes + [IsEditable]

    def post(self, request, owner_name, report_name):
        """Remove the experiments listed in ``request.data["experiments"]``.

        Returns:
            400 when the request body has no ``experiments`` entry
            (previously an unhandled ``KeyError``), 204 otherwise.
        """
        report = self.get_queryset()

        try:
            experiments = request.data["experiments"]
        except (KeyError, TypeError):
            # Client error: answer with a 400 instead of crashing
            return BadRequestResponse({"error": "Missing 'experiments' field"})

        report.remove_experiments(experiments)

        return Response(status=status.HTTP_204_NO_CONTENT)
# ----------------------------------------------------------
class ReportAlgorithmsView(views.APIView):
    """List the non-public algorithms of the requester that are referenced
    by the experiments of a report.

    Guarded by the ``IsAuthor`` and ``IsAuthenticated`` permission classes.
    """

    model = Report
    permission_classes = [IsAuthor, permissions.IsAuthenticated]

    def get_queryset(self):
        """Fetch the addressed report and check object permissions on it.

        NOTE: despite its name, this returns a single ``Report`` instance
        (or raises a 404), not a queryset.
        """
        lookup = {
            "author__username": self.kwargs.get("owner_name"),
            "name": self.kwargs.get("report_name"),
        }
        report = get_object_or_404(Report, **lookup)

        self.check_object_permissions(self.request, report)
        return report

    def get(self, request, owner_name=None, report_name=None, number=None):
        """Return the de-duplicated full names of the matching algorithms."""
        report = self.get_queryset()

        algorithms = []
        for experiment in report.experiments.iterator():
            non_public = experiment.referenced_algorithms.exclude(
                sharing=Shareable.PUBLIC
            )
            algorithms.extend(non_public.filter(author=request.user))

        # The same algorithm may be referenced by several experiments
        unique_names = {algorithm.fullname() for algorithm in algorithms}

        return Response(list(unique_names))
# ----------------------------------------------------------
class ReportResultsView(CommonContextMixin, generics.RetrieveAPIView):
    """Return the results of the experiments of a report addressed by its
    number (no authentication needed).
    """

    model = Report
    serializer_class = ExperimentResultsSerializer
    permission_classes = [permissions.AllowAny]

    def get_queryset(self):
        """Fetch the report by number and check object permissions on it.

        NOTE: despite its name, this returns a single ``Report`` instance
        (or raises a 404), not a queryset.
        """
        number = int(self.kwargs.get("number"))
        report = get_object_or_404(Report, number=number)

        self.check_object_permissions(self.request, report)
        return report

    def get(self, request, owner_name=None, report_name=None, number=None):
        """Return a mapping from experiment key to its serialized results.

        The key is the experiment's alias when the report status is
        ``Report.LOCKED`` and its full name otherwise. Aliases come from the
        ``alias_experiments`` entry of the report content and default to
        the experiment's ``name``.
        """
        report = self.get_queryset()

        report_content = json.loads(report.content)
        aliases = report_content.setdefault("alias_experiments", {})
        use_aliases = report.status == Report.LOCKED

        results = {}
        for experiment in report.experiments.iterator():
            serializer = self.get_serializer(
                experiment,
                fields=["results", "blocks_status", "execution_info", "declaration"],
            )

            fullname = experiment.fullname()
            alias = aliases.setdefault(fullname, experiment.name)

            key = alias if use_aliases else fullname
            results[key] = serializer.data

        return Response(results)
# ----------------------------------------------------------
class ReportResultsAllExperimentsView(CommonContextMixin, generics.RetrieveAPIView):
    """Return the results of the experiments of a report addressed by owner
    name and report name.

    Guarded by the ``IsAuthenticated`` and ``IsAuthor`` permission classes.
    """

    model = Report
    serializer_class = ExperimentResultsSerializer
    permission_classes = [permissions.IsAuthenticated, IsAuthor]

    def get_queryset(self):
        """Fetch the addressed report and check object permissions on it.

        NOTE: despite its name, this returns a single ``Report`` instance
        (or raises a 404), not a queryset.
        """
        lookup = {
            "author__username": self.kwargs.get("owner_name"),
            "name": self.kwargs.get("report_name"),
        }
        report = get_object_or_404(Report, **lookup)

        self.check_object_permissions(self.request, report)
        return report

    def get(self, request, owner_name=None, report_name=None, number=None):
        """Return a mapping from experiment key to its serialized results.

        The key is the experiment's alias when the report status is
        ``Report.LOCKED`` and its full name otherwise. Aliases come from the
        ``alias_experiments`` entry of the report content and default to
        the experiment's ``name``.
        """
        report = self.get_queryset()

        report_content = json.loads(report.content)
        aliases = report_content.setdefault("alias_experiments", {})
        use_aliases = report.status == Report.LOCKED

        results = {}
        for experiment in report.experiments.iterator():
            serializer = self.get_serializer(
                experiment,
                fields=["results", "blocks_status", "execution_info", "declaration"],
            )

            fullname = experiment.fullname()
            alias = aliases.setdefault(fullname, experiment.name)

            key = alias if use_aliases else fullname
            results[key] = serializer.data

        return Response(results)
# ----------------------------------------------------------
class ReportRSTCompileView(BaseReportActionView):
    """Compile a text item in the context of a report addressed by owner
    name and report name.

    Adds ``IsEditable`` to the permission classes of the base view.
    """

    permission_classes = BaseReportActionView.permission_classes + [IsEditable]

    def post(self, request, owner_name, report_name):
        """Compile ``request.data["raw"]`` with ``report.compileTextItem()``.

        Returns:
            400 when the request body has no ``raw`` entry (previously an
            unhandled ``KeyError``); otherwise 200 with the compilation
            result under the ``html_str`` key.
        """
        report = self.get_queryset()

        try:
            raw = request.data["raw"]
        except (KeyError, TypeError):
            # Client error: answer with a 400 instead of crashing
            return BadRequestResponse({"error": "Missing 'raw' field"})

        return Response({"html_str": report.compileTextItem(raw)})
# ----------------------------------------------------------
class ReportRSTCompileAnonView(views.APIView):
    """Compile a text item in the context of a report addressed by its
    number (no authentication needed).
    """

    permission_classes = [permissions.AllowAny]

    def get_queryset(self):
        """Fetch the report by number and check object permissions on it.

        NOTE: despite its name, this returns a single ``Report`` instance
        (or raises a 404), not a queryset.
        """
        number = self.kwargs.get("number")
        report = get_object_or_404(Report, number=int(number))

        self.check_object_permissions(self.request, report)
        return report

    def post(self, request, number):
        """Compile ``request.data["raw"]`` with ``report.compileTextItem()``.

        Returns:
            400 when the request body has no ``raw`` entry (previously an
            unhandled ``KeyError``); otherwise 200 with the compilation
            result under the ``html_str`` key.
        """
        report = self.get_queryset()

        try:
            raw = request.data["raw"]
        except (KeyError, TypeError):
            # Client error: answer with a 400 instead of crashing
            return BadRequestResponse({"error": "Missing 'raw' field"})

        return Response({"html_str": report.compileTextItem(raw)})