Completed
Push — master ( 576afd...c4beda )
by
unknown
01:15
created

GroupMemberViewSet.can_rank()   C

Complexity

Conditions 8

Size

Total Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
dl 0
loc 18
c 1
b 0
f 0
rs 6.6666
cc 8

1 Method

Rating   Name   Duplication   Size   Complexity  
A GroupMemberViewSet.can_promote_admin_or_superadmin() 0 3 1
1
from django.http import Http404
2
from django.db.models import Prefetch, Q
3
4
from rest_framework import viewsets, decorators, status, mixins
5
from rest_framework.response import Response
6
from rest_framework.permissions import IsAuthenticated
7
from rest_framework.filters import BaseFilterBackend
8
9
from sigma_core.models.user import User
10
from sigma_core.models.group import Group, GroupAcknowledgment
11
from sigma_core.models.group_member import GroupMember
12
from sigma_core.serializers.group_member import GroupMemberSerializer
13
14
15
class GroupMemberFilterBackend(BaseFilterBackend):
    """
    Filter backend applied to GroupMember querysets.

    Non-admin users only get the memberships they are allowed to see; the
    optional ``user`` and ``group`` query parameters then narrow the result.
    """

    # Supported query parameters, each mapped to a builder for the Q object
    # that restricts the queryset to the given value.
    filter_q = {
        'user': lambda u: Q(user=u),
        'group': lambda g: Q(group=g)
    }

    def filter_queryset(self, request, queryset, view):
        """
        Restrict ``queryset`` to what the requesting user may see, then apply
        the ``user`` / ``group`` query parameters when they are present.

        Returns the resulting queryset with duplicates removed.
        """
        viewer = request.user

        if not viewer.is_sigma_admin():
            invited_group_ids = viewer.invited_to_groups.all().values_list('id', flat=True)
            member_group_ids = viewer.memberships.filter(is_accepted=True).values_list('group_id', flat=True)

            # A GroupMember is visible when at least one of these holds:
            #  - it is the viewer's own membership
            #  - the viewer is an accepted member of the group
            #  - the viewer is invited to the group
            #  - (the group is not private OR one of its group_parents ids is in
            #    the viewer's groups) AND the member has a membership in one of
            #    the viewer's groups
            is_own_membership = Q(user_id=viewer.id)
            in_viewer_group = Q(group_id__in=member_group_ids)
            in_inviting_group = Q(group_id__in=invited_group_ids)
            group_is_reachable = (
                Q(group__is_private=False) | Q(group__group_parents__id__in=member_group_ids)
            )
            member_shares_group = Q(user__memberships__group_id__in=member_group_ids)

            queryset = queryset.prefetch_related(
                Prefetch('user__memberships', queryset=GroupMember.objects.filter(is_accepted=True)),
                Prefetch('group__group_parents', queryset=GroupAcknowledgment.objects.filter(validated=True))
            )
            queryset = queryset.filter(
                is_own_membership
                | in_viewer_group
                | in_inviting_group
                | (group_is_reachable & member_shares_group)
            )

        for param, build_q in self.filter_q.items():
            value = request.query_params.get(param)
            if value is None:
                continue
            queryset = queryset.filter(build_q(value))

        # The joins used above can yield the same row several times.
        return queryset.distinct()
46
47
48
class GroupMemberViewSet(viewsets.ModelViewSet):
    """
    Endpoints to list, create and delete group memberships, to change the
    rights attached to a membership and to accept pending join requests.
    """
    queryset = GroupMember.objects.select_related('group', 'user')
    serializer_class = GroupMemberSerializer
    permission_classes = [IsAuthenticated, ]
    filter_backends = (GroupMemberFilterBackend, )

    # Boolean rights toggled through give_rights() when
    # can_modify_basic_rights() allows it. Kept as tuples (not sets) so that a
    # membership test with an unhashable JSON value cannot raise TypeError.
    _BASIC_RIGHTS = (
        'can_invite',
        'can_kick',
        'can_publish',
        'can_modify_group_infos',
        'can_be_contacted',
    )
    # Rights changed through give_rights() only when
    # can_promote_admin_or_superadmin() allows it.
    _ADMIN_RIGHTS = (
        'is_administrator',
        'is_super_administrator',
    )

    def list(self, request):
        """
        ---
        parameters_strategy:
            query: replace
        parameters:
            - name: user
              type: integer
              paramType: query
            - name: group
              type: integer
              paramType: query
        """
        return super().list(request)

    def create(self, request):
        """
        Create a membership from the request data.

        Responds 400 when the payload is invalid, 403 when a non sigma-admin
        tries to add someone else or when the group cannot be joined freely,
        404 when the group does not exist, and 201 with the serialized
        membership otherwise.
        """
        serializer = GroupMemberSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        # NOTE(review): request.data values may be strings for form-encoded
        # payloads, in which case this comparison with the integer user id
        # never matches - confirm that clients always send JSON.
        if request.data.get('user_id', None) != request.user.id and not request.user.is_sigma_admin():
            return Response('You cannot add someone else to a group', status=status.HTTP_403_FORBIDDEN)

        try:
            group = Group.objects.get(pk=request.data.get('group_id', None))
        except Group.DoesNotExist:
            return Response(status=status.HTTP_404_NOT_FOUND)

        if not group.can_anyone_join:
            return Response('You cannot join this group without an invitation', status=status.HTTP_403_FORBIDDEN)

        serializer.save()
        return Response(serializer.data, status=status.HTTP_201_CREATED)

    def destroy(self, request, pk=None):
        """
        Delete the membership ``pk``.

        Raises Http404 when the membership does not exist or when the
        requesting user has no membership in the same group. Responds 403 when
        the requester may not remove that member, 204 once it is deleted.
        """
        try:
            modified_mship = GroupMember.objects.all().select_related('group').get(pk=pk)
            group = modified_mship.group
            my_mship = GroupMember.objects.all().get(group=group, user=request.user)
        except GroupMember.DoesNotExist:
            raise Http404()

        # You can always quit the Group (ie destroy YOUR membership)
        if my_mship.id != modified_mship.id:
            # Can't modify an admin if you're not superadmin
            if not my_mship.is_super_administrator and modified_mship.is_administrator:
                return Response(status=status.HTTP_403_FORBIDDEN)

            # Check permission
            if not my_mship.can_kick:
                return Response(status=status.HTTP_403_FORBIDDEN)

        modified_mship.delete()
        return Response(status=status.HTTP_204_NO_CONTENT)

    def can_modify_basic_rights(self, request, modified_mship, my_mship):
        """
        Tell whether ``my_mship`` may toggle the basic rights of
        ``modified_mship`` (the rights to become admin or superadmin are not
        concerned).

        True for a super administrator, and for an administrator acting on a
        member who is not an administrator; False otherwise.
        """
        if my_mship.is_super_administrator:
            return True

        if my_mship.is_administrator and not modified_mship.is_administrator:
            return True

        return False

    def can_promote_admin_or_superadmin(self,request,modified_mship,my_mship):
        """
        Tell whether ``my_mship`` may change admin / superadmin status:
        only a super administrator can do that.
        """
        return my_mship.is_super_administrator

    # TODO: decide who may change can_be_contacted. The abandoned draft allowed
    # the member themself and any administrator of the group.

    @decorators.detail_route(methods=['put'])
    def give_rights(self, request, pk=None):
        """
        Change one right of the group member pk.
        ---
        request_serializer: null
        response_serializer: GroupMemberSerializer
        parameters_strategy:
            form: replace
        parameters:
            - name: right_to_change
              type: string
              description: "One of is_administrator, is_super_administrator, can_invite, can_be_contacted, can_kick, can_modify_group_infos, can_publish"
              required: true
        """
        try:
            modified_mship = GroupMember.objects.all().select_related('group').get(pk=pk)
            my_mship = GroupMember.objects.all().get(group=modified_mship.group, user=request.user)
        except GroupMember.DoesNotExist:
            raise Http404()

        right_to_change = request.data.get('right_to_change', None)
        superadmin_transferred = False

        if right_to_change in self._BASIC_RIGHTS:
            if not self.can_modify_basic_rights(request, modified_mship, my_mship):
                return Response(status=status.HTTP_403_FORBIDDEN)

            # Basic rights are plain booleans: the request flips the current value.
            setattr(modified_mship, right_to_change, not getattr(modified_mship, right_to_change))

        elif right_to_change in self._ADMIN_RIGHTS:
            if not self.can_promote_admin_or_superadmin(request, modified_mship, my_mship):
                return Response(status=status.HTTP_403_FORBIDDEN)

            if right_to_change == 'is_administrator':
                modified_mship.is_administrator = not modified_mship.is_administrator
            else:
                # There can only be one super admin: the requester hands the
                # role over to the modified member.
                modified_mship.is_super_administrator = True
                my_mship.is_super_administrator = False
                superadmin_transferred = True

            if modified_mship.is_administrator:
                # We give him all the rights (except can_be_contacted, he gets to choose)
                # NOTE(review): can_modify_group_infos is not granted here -
                # confirm that this is intentional.
                modified_mship.can_invite = True
                modified_mship.can_kick = True
                modified_mship.can_publish = True

        else:
            # Missing or unknown right: refuse explicitly instead of falling
            # through to the super administrator transfer.
            return Response('Unknown right_to_change', status=status.HTTP_400_BAD_REQUEST)

        # Saved before modified_mship so that, when both objects are the same
        # row, the values set on modified_mship are the ones that persist.
        if superadmin_transferred:
            my_mship.save()
        modified_mship.save()

        return Response(GroupMemberSerializer(modified_mship).data, status=status.HTTP_200_OK)

    @decorators.detail_route(methods=['put'])
    def accept_join_request(self, request, pk=None):
        """
        Validate a pending membership request.
        ---
        request_serializer: null
        response_serializer: GroupMemberSerializer
        """
        try:
            gm = GroupMember.objects.select_related('group').get(pk=pk)
        except GroupMember.DoesNotExist:
            raise Http404()

        if not request.user.can_accept_join_requests(gm.group):
            return Response(status=status.HTTP_403_FORBIDDEN)

        gm.is_accepted = True
        gm.save()

        # TODO: notify user of that change

        s = GroupMemberSerializer(gm)
        return Response(s.data, status=status.HTTP_200_OK)
216