Source code for beat.web.common.models

#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.web module of the BEAT platform.              #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################

from django.db import models
from django.conf import settings
from django.db.models import Q
from django.contrib.auth.models import User, AnonymousUser

import beat.core.hash
from beat.core.utils import NumpyJSONEncoder

from .texts import Messages
from ..team.models import Team
from ..ui.templatetags.gravatar import gravatar_url

from . import storage

from .exceptions import NotUserNorTeam
from .signals import shared
from .utils import ensure_string

import os
import re
import simplejson
from collections import OrderedDict


#----------------------------------------------------------


[docs]class ShareableManager(models.Manager):
[docs] def for_user(self, user, add_public=False): if user.is_anonymous(): return self.public() query = Q(sharing=Shareable.SHARED) &\ (Q(shared_with=user) |\ Q(shared_with_team__in=Team.objects.filter(members=user))) if hasattr(self.model, 'author'): query |= Q(author=user) if add_public: query |= Q(sharing=Shareable.PUBLIC) return self.filter(query).distinct()
[docs] def public(self): return self.filter(sharing=Shareable.PUBLIC)
#----------------------------------------------------------
[docs]class Shareable(models.Model): #_____ Constants __________ PRIVATE = 'P' SHARED = 'S' PUBLIC = 'A' USABLE = 'U' SHARING_STATUS = ( (PRIVATE, 'Private'), (SHARED, 'Shared'), (PUBLIC, 'Public'), (USABLE, 'Usable'), # Not applicable for all contribution types, must be # checked for each case! ) #_____ Fields __________ sharing = models.CharField(max_length=1, choices=SHARING_STATUS, default=PRIVATE) shared_with = models.ManyToManyField(User, related_name='shared_%(class)ss', blank=True) shared_with_team = models.ManyToManyField(Team, related_name='shared_%(class)ss', blank=True) objects = ShareableManager() #_____ Meta parameters __________
[docs] class Meta: abstract = True
#_____ Utilities __________
[docs] def model_name(self): return type(self).__name__.lower()
#_____ Overrides __________ def __str__(self): return self.fullname()
[docs] def save(self, *args, **kwargs): super(Shareable, self).save(*args, **kwargs) # Makes sure the user has not set something weird if self.sharing == Shareable.PUBLIC: self.shared_with.clear()
#_____ Methods __________
[docs] def get_verbose_name(self): return self._meta.verbose_name
[docs] def get_verbose_name_plural(self): return self._meta.verbose_name_plural
[docs] def modifiable(self): if hasattr(self, 'attestations'): return (self.attestations.count() == 0) return True
[docs] def deletable(self): if hasattr(self, 'attestations'): return (self.attestations.count() == 0) return True
[docs] def accessibility_for(self, user_or_team, without_usable=False): """Returns a tuple (<has_access>, <accessibility>), with <accessibility> being either 'public', 'private', 'confidential' """ if isinstance(user_or_team, User) or isinstance(user_or_team, AnonymousUser): return self._accessibility_for_user(user_or_team, without_usable) elif isinstance(user_or_team, Team): return self._accessibility_for_team(user_or_team, without_usable) else: raise NotUserNorTeam
[docs] def is_accessible(self, users=None, teams=None): errors = [] if self.sharing == Shareable.PUBLIC: return errors # Accessible to some users if users is not None: # Retrieve the users that don't have enough access to the algorithm users_with_better_access = [] if hasattr(self, 'author'): users_with_better_access.append(self.author.username) users_with_better_access.extend(self.shared_with.all().values_list('username', flat=True)) users_with_better_access.extend([user for user in users if self.shared_with_team.filter(members__username=user).exists()]) missing_users = [user for user in users if user not in users_with_better_access] if len(missing_users) > 0: errors.append("The {0} '{1}' isn't accessible to the following users: {2}".format(self.model_name(), self.fullname(), ', '.join(missing_users))) if teams is not None: # Retrieve the teams that don't have enough access to the algorithm teams_with_better_access = [team.fullname() for team in self.shared_with_team.all()] missing_teams = [team for team in teams if team not in teams_with_better_access] if len(missing_teams) > 0: errors.append("The {0} '{1}' isn't accessible to the following teams: {2}".format(self.model_name(), self.fullname(), ', '.join(missing_teams))) # Accessible to everybody if users is None and teams is None: if self.sharing != Shareable.PUBLIC: errors.append("The {0} '{1}' isn't Accessible to all".format(self.model_name(), self.fullname())) return errors
[docs] def share(self, users=None, teams=None): if self.sharing == Shareable.PUBLIC: return sharing = self.sharing # Accessible to some users if users is not None: # Filter out the users that already have this sharing access or better users_with_better_access = [] if hasattr(self, 'author'): users_with_better_access.append(self.author.username) users_with_better_access.extend(self.shared_with.all().values_list('username', flat=True)) users = [user for user in users if user not in users_with_better_access] # Update the database entry db_users = User.objects.filter(username__in=users) for db_user in db_users: self.shared_with.add(db_user) if self.sharing == Shareable.PRIVATE: sharing = Shareable.SHARED if (teams is not None) and (len(teams) > 0): # 'teams' might be a list of names or a list of DB objects if isinstance(teams[0], Team): # Filter out the teams that already have this sharing access or better teams_with_better_access = self.shared_with_team.all() db_teams = [team for team in teams if team not in teams_with_better_access] # Update the database entry for db_team in db_teams: self.shared_with_team.add(db_team) else: # Filter out the teams that already have this sharing access or better teams_with_better_access = [team.fullname() for team in self.shared_with_team.all()] teams = [team for team in teams if team not in teams_with_better_access] # Update the database entry for team_name in teams: parts = team_name.split('/') if len(parts) == 1: parts = [self.author.username, team_name] db_team = Team.objects.filter(owner__username=parts[0], name=parts[1]) if len(db_team) == 1: self.shared_with_team.add(db_team[0]) if self.sharing == Shareable.PRIVATE: sharing = Shareable.SHARED # Accessible to everybody if users is None and teams is None: # Update the database entry sharing = Shareable.PUBLIC self._update_sharing(sharing, users, teams)
[docs] def sharing_preferences(self): result = { 'status': self.get_sharing_display().lower() } if self.shared_with.count() > 0: result['shared_with'] = self.shared_with.all().values_list('username', flat=True) if self.shared_with_team.count() > 0: result['shared_with_team'] = [team.fullname() for team in self.shared_with_team.all()] return result
[docs] def all_shared_with_users(self): '''Returns a list of users this object is shared with''' users = set(self.shared_with.exclude(username__in=settings.ACCOUNTS_TO_EXCLUDE_FROM_TEAMS).distinct()) teams = set(User.objects.filter(teams__in=self.shared_with_team.all()).exclude(username__in=settings.ACCOUNTS_TO_EXCLUDE_FROM_TEAMS)) return users | teams
[docs] def users_with_access(self): '''Returns a set of users that have access to this environment''' if self.sharing == Shareable.PUBLIC: return set(User.objects.filter(is_active=True).exclude(username__in=settings.ACCOUNTS_TO_EXCLUDE_FROM_TEAMS).distinct()) elif self.sharing == Shareable.SHARED: return self.all_shared_with_users() return set()
#_____ Protected Methods __________ def _accessibility_for_user(self, user, without_usable=False): """User specific accessibility check """ if hasattr(self, 'author') and self.author == user: if self.sharing == Shareable.PRIVATE: return (True, 'private') elif self.sharing == Shareable.PUBLIC: return (True, 'public') else: return (True, 'confidential') else: if self.sharing == Shareable.PRIVATE: return (False, None) elif self.sharing == Shareable.PUBLIC: return (True, 'public') elif not user.is_anonymous(): if self.shared_with.filter(id=user.id).exists() or (self.shared_with_team.filter(members=user).count() > 0): return (True, 'confidential') return (False, None) def _accessibility_for_team(self, team, without_usable=False): """Team specific accessibility check """ if self.sharing == Shareable.PRIVATE: return (False, None) elif self.sharing == Shareable.PUBLIC: return (True, 'public') elif self.shared_with_team.filter(id=team.id).exists(): return (True, 'confidential') return (False, None) def _update_sharing(self, sharing, users, teams): if sharing != self.sharing: self.sharing = sharing self.save() shared.send(sender=self, users=users, teams=teams)
#----------------------------------------------------------
[docs]class VersionableManager(ShareableManager):
[docs] def is_last_version(self, object): return not self.filter(name=object.name, version__gt=object.version).exists()
#----------------------------------------------------------
[docs]class Versionable(Shareable): #_____ Fields __________ name = models.CharField(max_length=200, help_text=Messages['name'], blank=False) version = models.PositiveIntegerField(default=1, help_text=Messages['version']) short_description = models.CharField(max_length=100, default='', blank=True, help_text=Messages['short_description']) creation_date = models.DateTimeField('Creation date', auto_now_add=True) hash = models.CharField(max_length=64, editable=False, help_text=Messages['hash']) previous_version = models.ForeignKey('self', related_name='next_versions', null=True, blank=True, ) fork_of = models.ForeignKey('self', related_name='forks', null=True, blank=True, ) objects = VersionableManager() #_____ Meta parameters __________
[docs] class Meta(Shareable.Meta): abstract = True ordering = ['name', '-version']
#_____ Static Methods __________
[docs] @staticmethod def sanitize_name(name): """Makes sure that the name is valid""" return re.sub(r'[^\x00-\x7f]|\W', r'-', name)
[docs] @staticmethod def filter_latest_versions(versionables): result = [] for versionable in versionables: try: entry = [item for item in result if item.fullname().startswith('%s' % versionable.name)][0] if entry.version >= versionable.version: continue else: result.remove(entry) except: pass result.append(versionable) return result
#_____ Overrides __________
[docs] def delete(self, *args, **kwargs): for next_version in self.next_versions.iterator(): next_version.previous_version = self.previous_version next_version.save() for fork in self.forks.iterator(): fork.fork_of = self.previous_version fork.save() super(Versionable, self).delete(*args, **kwargs)
[docs] def modifiable(self): return super(Versionable, self).modifiable() and (self.next_versions.count() == 0)
[docs] def history(self, for_user): """Calculates its own history and returns it in a dictionary. The history shows all related objects which are accessible either forwards or backwards to this own. Accessibility is calculated based on the user passed. The user can be anonymous. """ def _process(obj, forward_only=True): """Recursive function to build the history starting from a leaf""" # First retrieve all accessible versions of the versionable if hasattr(obj, 'author'): versions = self.__class__.objects.for_user(for_user, True).filter(author=obj.author, name=obj.name).order_by('version') else: versions = self.__class__.objects.for_user(for_user, True).filter(name=obj.name).order_by('version') if versions.count() == 0: return None # Next search for the very first version of the history first_version = versions[0] if not(forward_only) and (first_version.fork_of is not None): history = _process(first_version.fork_of, forward_only=False) if history is not None: return history # Construct a tree from the first version history = {'object': first_version, 'next': []} for fork in first_version.forks.iterator(): fork_history = _process(fork) if fork_history is not None: history['next'].append(fork_history) previous_version = history for version in versions[1:]: version_dict = {'object': version, 'next': []} for fork in version.forks.iterator(): fork_history = _process(fork) if fork_history is not None: version_dict['next'].append(fork_history) previous_version['next'].append(version_dict) previous_version = version_dict return history return _process(self, forward_only=False)
[docs] def api_history(self, for_user): """The same as history(), but providing an implementation compatible with our V1 API""" def _recurse(d): o = d['object'] d['name'] = o.fullname() d['creation_date'] = o.creation_date.isoformat(' ') d['author_gravatar'] = gravatar_url(o.author.email) if hasattr(o, 'author') else None del d['object'] d['next'] = [_recurse(k) for k in d['next']] return d return _recurse(self.history(for_user))
[docs] def json_history(self, for_user): """The same as API history, but serializes the result into a JSON""" return simplejson.dumps(self.api_history(for_user))
#----------------------------------------------------------
[docs]class ContributionManager(VersionableManager):
[docs] def get_by_natural_key(self, username, name, version): return self.get(author__username=username, name=name, version=version)
[docs] def is_last_version(self, object): return not self.filter(author=object.author, name=object.name, version__gt=object.version).exists()
[docs] def from_author(self, user, author_name, add_public=False): if user.is_anonymous(): objects_for_user = self.public().filter(author__username=author_name) elif user.username == author_name: # Retrieve all the objects of the specified user objects_for_user = self.for_user(user, add_public) else: teams = Team.objects.filter(members=user) objects_for_user = self.filter(Q(author__username=author_name) & (Q(sharing=Contribution.PUBLIC)| Q(shared_with=user) | Q(shared_with_team__in=teams))).distinct() return objects_for_user.order_by('author__username', 'name', '-version').select_related()
[docs] def from_author_and_public(self, user, author_name): return self.from_author(user, author_name, True)
#----------------------------------------------------------
[docs]class Contribution(Versionable): #_____ Fields __________ author = models.ForeignKey(User, related_name='%(class)ss', on_delete=models.CASCADE) objects = ContributionManager() #_____ Meta parameters __________
[docs] class Meta(Versionable.Meta): abstract = True ordering = ['author__username', 'name', 'version'] unique_together = ('author', 'name', 'version')
#_____ Utilities __________
[docs] def natural_key(self): return (self.author.username, self.name, self.version)
#_____ Methods __________
[docs] def fullname(self): return '%s/%s/%d' % (self.author.username, self.name, self.version)
#_____ Static Methods __________
[docs] @staticmethod def filter_latest_versions(contributions): result = [] for contribution in contributions: try: entry = [item for item in result if item.fullname().startswith('%s/%s/' % (contribution.author.username, contribution.name))][0] if entry.version >= contribution.version: continue else: result.remove(entry) except: pass result.append(contribution) return result
#----------------------------------------------------------
[docs]class StoredContributionManager(ContributionManager):
[docs] def create_object(self, author, name, short_description='', description='', declaration=None, version=1, previous_version=None, fork_of=None): create = getattr(self, 'create_{}'.format(self.model.__name__.lower())) return create(author=author, name=name, short_description=short_description, description=description, declaration=declaration, version=version, previous_version=previous_version, fork_of=fork_of)
#----------------------------------------------------------
[docs]def get_contribution_declaration_filename(obj, path): return obj.declaration_filename()
[docs]def get_contribution_description_filename(obj, path): return obj.description_filename()
#---------------------------------------------------------- # Use those function to add a 'declaration' and a 'declaration_string' property to a # model, by doing: # declaration = property(beat.web.common.models.get_declaration, # beat.web.common.models.set_declaration) # declaration_string = property(beat.web.common.models.get_declaration_string)
[docs]def set_declaration(instance, value): if isinstance(value, dict): value = simplejson.dumps(value, indent=4, cls=NumpyJSONEncoder) storage.set_file_content(instance, 'declaration_file', instance.declaration_filename(), value)
[docs]def get_declaration(instance): return simplejson.loads(storage.get_file_content(instance, 'declaration_file'), object_pairs_hook=OrderedDict)
[docs]def get_declaration_string(instance): data = storage.get_file_content(instance, 'declaration_file') return ensure_string(data)
#---------------------------------------------------------- # Use those function to add a 'description' property to a model, by doing: # description = property(beat.web.common.models.get_description, # beat.web.common.models.set_description)
[docs]def set_description(instance, value): storage.set_file_content(instance, 'description_file', instance.description_filename(), value)
[docs]def get_description(instance): return storage.get_file_content(instance, 'description_file')
#----------------------------------------------------------
[docs]class StoredContribution(Contribution): #_____ Fields __________ # For technical reason, it is not possible to declare the required fields here. They # must be declared in each subclass of StoredContribution, like this: # # declaration_file = models.FileField( # storage=AppropriateStorageClass(), # upload_to=get_contribution_declaration_filename, # blank=True, null=True, # max_length=200, # db_column='declaration' # ) # # description_file = models.FileField( # storage=AppropriateStorageClass(), # upload_to=get_contribution_description_filename, # max_length=200, # blank=True, null=True, # db_column='description' # ) objects = StoredContributionManager() #_____ Meta parameters __________
[docs] class Meta(Contribution.Meta): abstract = True
#_____ Methods __________
[docs] def hashed_path(self, extension=''): """Relative path of a file belonging to the object on the respective storage""" return os.path.join( beat.core.hash.toUserPath(self.author.username), self.name, str(self.version) + extension, )
[docs] def declaration_filename(self): """Relative path of the declaration file on the storage""" return self.hashed_path('.json')
[docs] def description_filename(self): """Relative path of the description file on the storage""" return self.hashed_path('.rst')
#_____ Overrides __________
[docs] def save(self, *args, **kwargs): # Save the changed files (if necessary) storage.save_files(self) # Invoke the base implementation super(StoredContribution, self).save(*args, **kwargs)
#_____ Properties __________ description = property(get_description, set_description) declaration = property(get_declaration, set_declaration) declaration_string = property(get_declaration_string)