ClusterViewSet.only_staff()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
dl 0
loc 6
c 1
b 0
f 0
rs 9.2
cc 4

1 Method

Rating   Name   Duplication   Size   Complexity  
A ClusterViewSet.func_wrapper() 0 4 2
1
from rest_framework import viewsets, decorators, status, mixins
2
from rest_framework.response import Response
3
from rest_framework.permissions import IsAuthenticated, AllowAny
4
5
from sigma_core.models.cluster import Cluster
6
from sigma_core.models.group import Group
7
from sigma_core.models.group_member import GroupMember
8
from sigma_core.serializers.cluster import BasicClusterSerializer, ClusterSerializer
9
10
11
class ClusterViewSet(mixins.CreateModelMixin,   # Only sigma admins
12
                    mixins.ListModelMixin,      # Everyone (even if not authed)
13
                    mixins.RetrieveModelMixin,  # Everyone (even if not authed)
14
                    mixins.UpdateModelMixin,    # Only sigma admins
15
                    mixins.DestroyModelMixin,   # Only sigma admins
16
                    viewsets.GenericViewSet):
17
    queryset = Cluster.objects.all()
18
    serializer_class = BasicClusterSerializer
19
    permission_classes = [IsAuthenticated, ]
20
21
    def only_staff(func):
22
        def func_wrapper(self, request, *args, **kwargs):
23
            if not request.user.is_authenticated() or not request.user.is_sigma_admin():
24
                return Response(status=status.HTTP_403_FORBIDDEN)
25
            return func(self, request, *args, **kwargs)
26
        return func_wrapper
27
28
    def restrict_queryset_to_administrated_clusters(func):
29
        def func_wrapper(self, request, *args, **kwargs):
30
            if not request.user.is_sigma_admin():
31
                self.queryset = self.queryset.filter(pk__in=GroupMember.objects.filter(user=request.user, perm_rank=Group.ADMINISTRATOR_RANK).values_list('group', flat=True))
32
            return func(self, request, *args, **kwargs)
33
        return func_wrapper
34
35
    def get_permissions(self):
36
        if self.action == 'list' or self.action == 'retrieve':
37
            self.permission_classes = [AllowAny, ]
38
        return super().get_permissions()
39
40
    def retrieve(self, request, pk=None):
41
        if request.user.is_authenticated() and (request.user.is_sigma_admin() or request.user.clusters.filter(pk=pk).exists()):
42
            self.serializer_class = ClusterSerializer
43
        return super().retrieve(request, pk=pk)
44
45
    @only_staff
46
    def create(self, request):
47
        return super().create(request)
48
49
    @restrict_queryset_to_administrated_clusters
50
    def update(self, request, pk=None):
51
        return super().update(request, pk=pk)
52
53
    @only_staff
54
    def destroy(self, request, pk=None):
55
        return super().destroy(request, pk=pk)
56