Completed
Push — master ( 463532...ebb95c )
by
unknown
01:33 queued 37s
created

ClusterViewSet.update()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 3
rs 10
1
from django.http import Http404
2
3
from rest_framework import viewsets, decorators, status, mixins
4
from rest_framework.response import Response
5
from rest_framework.permissions import IsAuthenticated, AllowAny
6
7
from sigma_core.models.cluster import Cluster
8
from sigma_core.models.group import Group
9
from sigma_core.models.group_member import GroupMember
10
from sigma_core.serializers.cluster import BasicClusterSerializer, ClusterSerializer
11
12
13
class ClusterViewSet(mixins.CreateModelMixin,   # Only sigma admins
                     mixins.ListModelMixin,      # Everyone (even if not authed)
                     mixins.RetrieveModelMixin,  # Everyone (even if not authed)
                     mixins.UpdateModelMixin,    # Sigma admins, or administrators of the cluster
                     mixins.DestroyModelMixin,   # Only sigma admins
                     viewsets.GenericViewSet):
    """REST endpoints for ``Cluster`` objects.

    Access rules implemented below:

    * ``list`` / ``retrieve`` are open to everybody (``AllowAny``); the
      ``BasicClusterSerializer`` is used for ``list``, and for ``retrieve``
      when the requester is anonymous, or is neither a sigma admin nor
      linked to the cluster through ``request.user.clusters``.
    * ``create`` / ``destroy`` answer 403 unless the requester is a sigma admin.
    * ``update`` (PUT and PATCH) is limited, for non sigma admins, to the
      clusters in which the requester has ``Group.ADMINISTRATOR_RANK``.
    """

    queryset = Cluster.objects.all()
    serializer_class = ClusterSerializer
    permission_classes = [IsAuthenticated, ]

    def only_staff(func):
        """Decorate a view method so that only sigma admins may call it.

        Any other requester (anonymous included) gets an empty
        ``403 Forbidden`` response and ``func`` is not executed.
        """
        from functools import wraps

        @wraps(func)  # keep the action's name/docstring on the wrapper
        def func_wrapper(self, request, *args, **kwargs):
            if not request.user.is_authenticated() or not request.user.is_sigma_admin():
                return Response(status=status.HTTP_403_FORBIDDEN)
            return func(self, request, *args, **kwargs)
        return func_wrapper

    def restrict_queryset_to_administrated_clusters(func):
        """Decorate a view method so it only sees clusters the user administrates.

        Sigma admins keep the full queryset. For everybody else the viewset
        queryset is narrowed to the primary keys found in the ``group``
        column of the requester's ``GroupMember`` rows whose ``perm_rank``
        equals ``Group.ADMINISTRATOR_RANK``.
        """
        from functools import wraps

        @wraps(func)  # keep the action's name/docstring on the wrapper
        def func_wrapper(self, request, *args, **kwargs):
            if not request.user.is_sigma_admin():
                administrated_group_ids = GroupMember.objects.filter(
                    user=request.user,
                    perm_rank=Group.ADMINISTRATOR_RANK,
                ).values_list('group', flat=True)
                self.queryset = self.queryset.filter(pk__in=administrated_group_ids)
            return func(self, request, *args, **kwargs)
        return func_wrapper

    @only_staff
    def create(self, request):
        """Create a cluster (sigma admins only, see ``only_staff``)."""
        return super().create(request)

    def list(self, request):
        """List all clusters using the basic serializer."""
        self.serializer_class = BasicClusterSerializer
        return super().list(request)

    def retrieve(self, request, pk=None):
        """Return one cluster.

        The full ``ClusterSerializer`` is used only for sigma admins and for
        users having the cluster in ``request.user.clusters``; every other
        requester gets the ``BasicClusterSerializer`` representation.
        """
        if not request.user.is_authenticated() or (not request.user.is_sigma_admin() and not request.user.clusters.filter(pk=pk).exists()):
            self.serializer_class = BasicClusterSerializer
        return super().retrieve(request, pk=pk)

    @restrict_queryset_to_administrated_clusters
    def update(self, request, pk=None, **kwargs):
        """Update a cluster (full or partial).

        ``**kwargs`` must be accepted and forwarded: the inherited
        ``partial_update`` (PATCH) calls ``self.update(..., partial=True)``,
        which raised ``TypeError`` with the former ``(request, pk=None)``
        signature.
        """
        return super().update(request, pk=pk, **kwargs)

    @only_staff
    def destroy(self, request, pk=None):
        """Delete a cluster (sigma admins only, see ``only_staff``)."""
        return super().destroy(request, pk=pk)

    def get_permissions(self):
        """Open the read-only actions to everybody, keep the rest authenticated."""
        if self.action == 'list' or self.action == 'retrieve':
            self.permission_classes = [AllowAny, ]
        return super().get_permissions()
62