test_get_mship_invited()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 2
rs 10
cc 1
1
import json
2
3
from django.core import mail
4
5
from rest_framework import status
6
from rest_framework.test import APITestCase
7
8
from sigma_core.models.user import User
9
from sigma_core.models.group import Group
10
from sigma_core.models.group_member import GroupMember
11
from sigma_core.serializers.user import UserSerializer
12
from sigma_core.tests.factories import UserFactory, AdminUserFactory, GroupFactory, GroupAcknowledgmentFactory, GroupMemberFactory, ClusterFactory
13
14
15
class GroupMemberVisibilityTests(APITestCase):
    """Check which memberships ``GET /group-member/`` exposes to a requester.

    Fixture layout (numbers are indices into ``cls.users`` / ``cls.groups``):

    * user #0 is built with ``AdminUserFactory``; users #1-#10 with
      ``UserFactory``.
    * users #1-#5 belong to cluster 0 (user #1 with administrator rank),
      users #6-#10 belong to cluster 1 (user #6 with administrator rank).
    * groups #1, #3 and #4 are private, group #2 is public.
    * group #1 is acknowledged by cluster 0.
    * user #5 is invited to group #2.
    """

    @classmethod
    def setUpTestData(cls):
        """Create the users, clusters, groups and memberships shared by all tests."""
        # Summary: 1 Sigma admin + 10 users, 4 groups, 2 clusters
        super().setUpTestData()

        cls.users = [AdminUserFactory()] + UserFactory.create_batch(10)
        cls.clusters = ClusterFactory.create_batch(2)
        # The leading None makes the list 1-indexed: groups[n] is "group #n".
        cls.groups = [None] + GroupFactory.create_batch(4, is_private=True)

        # Group #2 is public
        cls.groups[2].is_private = False
        cls.groups[2].save()

        # Add users in clusters (user #1 is admin in cluster #1 and user #6 is admin in cluster #2)
        for i in range(5):
            cls.clusters[0].cluster_users.add(cls.users[1 + i])
            GroupMemberFactory(
                group=cls.clusters[0],
                user=cls.users[1 + i],
                perm_rank=(Group.ADMINISTRATOR_RANK if i == 0 else 1),
            )

            cls.clusters[1].cluster_users.add(cls.users[6 + i])
            GroupMemberFactory(
                group=cls.clusters[1],
                user=cls.users[6 + i],
                perm_rank=(Group.ADMINISTRATOR_RANK if i == 0 else 1),
            )

        # Add users to group
        cls.mships = [
            GroupMemberFactory(user=cls.users[2], group=cls.groups[1], perm_rank=1),
            GroupMemberFactory(user=cls.users[3], group=cls.groups[2], perm_rank=1),
            GroupMemberFactory(user=cls.users[6], group=cls.groups[2], perm_rank=1),
            GroupMemberFactory(user=cls.users[6], group=cls.groups[3], perm_rank=1),
            GroupMemberFactory(user=cls.users[8], group=cls.groups[3], perm_rank=1),
            GroupMemberFactory(user=cls.users[4], group=cls.groups[4], perm_rank=1),
            GroupMemberFactory(user=cls.users[7], group=cls.groups[4], perm_rank=1),
            GroupMemberFactory(user=cls.users[9], group=cls.groups[4], perm_rank=0),  # pending request
        ]

        # User #5 is invited to group #2
        cls.users[5].invited_to_groups.add(cls.groups[2])

        # GroupAcknowledgments
        GroupAcknowledgmentFactory(
            parent_group=cls.clusters[0].group_ptr,
            subgroup=cls.groups[1],
            validated=True,
        )

        # Misc
        cls.mship_url = "/group-member/?"

    def try_get(self, reqId=-1, uid=-1, gid=-1, expectedStatus=status.HTTP_200_OK, expectedLength=1):
        """
        Attempt to get the membership of user ``uid`` in group ``gid`` using
        ``reqId`` permissions, and assert on the outcome.

        GET /group-member/?user={uid}&group={gid}

        :param reqId: index in ``self.users`` of the requester; -1 for an
            unauthenticated client.
        :param uid: index in ``self.users`` to filter on; -1 to omit the filter.
        :param gid: index in ``self.groups`` to filter on; -1 to omit the filter.
        :param expectedStatus: HTTP status code the response must carry.
        :param expectedLength: expected ``len(response.data)``; only checked
            when ``expectedStatus`` is 200.
        :return: the response object, for further assertions by the caller.
        """
        if reqId >= 0:  # use -1 for unauthed user
            self.client.force_authenticate(user=self.users[reqId])
        else:
            # Drop any credentials forced by a previous call on this client so
            # that -1 really means "unauthenticated".
            self.client.force_authenticate(user=None)

        url = self.mship_url
        if uid >= 0:
            url = url + ("user=%d&" % self.users[uid].id)
        if gid >= 0:
            url = url + ("group=%d" % self.groups[gid].id)

        response = self.client.get(url)
        self.assertEqual(response.status_code, expectedStatus)
        if expectedStatus == status.HTTP_200_OK:
            self.assertEqual(len(response.data), expectedLength)
        return response

    def test_get_mship_unauthed(self):
        """An unauthenticated client is rejected with 401."""
        # Client is unauthed
        self.try_get(reqId=-1, uid=1, gid=1, expectedStatus=status.HTTP_401_UNAUTHORIZED)

    def test_get_mship_groupmember(self):
        """Requesters holding a membership row in group #4 list that group."""
        # Client is member of the group he requests
        self.try_get(reqId=4, gid=4, expectedLength=3)
        # User #9 only has a pending request (perm_rank=0) in group #4.
        self.try_get(reqId=9, gid=4, expectedLength=1)

    def test_get_mship_invited(self):
        """User #5, invited to group #2, gets both memberships of that group."""
        self.try_get(reqId=5, gid=2, expectedLength=2)

    def test_get_mship_public(self):
        """User #1 lists public group #2 and gets a single membership."""
        # Presumably only the membership of user #3 (same cluster as user #1)
        # is visible -- verify against the view's visibility rules.
        self.try_get(reqId=1, gid=2, expectedLength=1)

    def test_get_mship_acknowledged(self):
        """User #3 (cluster 0) lists group #1, which cluster 0 acknowledges."""
        self.try_get(reqId=3, gid=1)

    def test_get_mship_private(self):
        """User #2, who has no membership in private group #4, gets an empty list."""
        self.try_get(reqId=2, gid=4, expectedLength=0)

    def test_get_mship_invisibleuser(self):
        """User #1 asking for user #6 in group #2 gets an empty list."""
        self.try_get(reqId=1, uid=6, gid=2, expectedLength=0)

    def test_get_mship_private_but_admin(self):
        """The admin user (#0) gets user #7's membership in private group #4."""
        self.try_get(reqId=0, uid=7, gid=4)
102
103
104
# Test /rank, /kick
105
class GroupMemberPermissionTests(APITestCase):
    """Exercise rank changes and removals on ``/group-member/{id}/``.

    Fixture: six users and one group created with ``req_rank_promote=3``,
    ``req_rank_demote=4`` and ``req_rank_kick=5``. Users #1-#5 are members with
    ``perm_rank`` equal to their index; user #0 has no membership.
    """

    @classmethod
    def setUpTestData(cls):
        """Create the users, the group and the five memberships shared by all tests."""
        # Summary: 6 users, 1 group, 5 mships

        super().setUpTestData()

        cls.member_rank_url = "/group-member/%d/rank/"
        cls.member_url = "/group-member/%d/"
        cls.users = UserFactory.create_batch(6)
        cls.group = GroupFactory(req_rank_promote=3, req_rank_demote=4, req_rank_kick=5)
        # The leading None keeps mships[i] aligned with users[i]; user #0 is
        # deliberately left without a membership.
        cls.mships = [
            None,
            GroupMemberFactory(user=cls.users[1], group=cls.group, perm_rank=1),
            GroupMemberFactory(user=cls.users[2], group=cls.group, perm_rank=2),
            GroupMemberFactory(user=cls.users[3], group=cls.group, perm_rank=3),
            GroupMemberFactory(user=cls.users[4], group=cls.group, perm_rank=4),
            GroupMemberFactory(user=cls.users[5], group=cls.group, perm_rank=5),
        ]

    def _authenticate(self, userId):
        """Authenticate the test client as ``self.users[userId]``; -1 means unauthenticated."""
        if userId >= 0:
            self.client.force_authenticate(user=self.users[userId])
        else:
            # Clear credentials possibly forced by an earlier call on this client.
            self.client.force_authenticate(user=None)

    def try_rank(self, userId, targetId, newPermRank, expectedHttpResponseCode):
        """
        Attempt to set the rank of membership ``self.mships[targetId]`` to
        ``newPermRank`` using ``userId`` permissions, and assert the status code.

        PUT /group-member/{membership id}/rank with perm_rank=newPermRank

        :param userId: index in ``self.users`` of the requester; -1 for an
            unauthenticated client.
        :param targetId: index in ``self.mships`` of the membership to update.
        :param newPermRank: value sent as ``perm_rank``.
        :param expectedHttpResponseCode: HTTP status code the response must carry.
        """
        self._authenticate(userId)
        response = self.client.put(
            self.member_rank_url % (self.mships[targetId].id),
            {"perm_rank": newPermRank},
        )
        self.assertEqual(response.status_code, expectedHttpResponseCode)

    def try_delete(self, userId, targetId, expectedHttpResponseCode):
        """
        Attempt to remove membership ``self.mships[targetId]`` from the group
        using ``userId`` permissions, and assert the status code.

        DELETE /group-member/{membership id}/

        :param userId: index in ``self.users`` of the requester; -1 for an
            unauthenticated client.
        :param targetId: index in ``self.mships`` of the membership to delete;
            0 designates the ``None`` placeholder, i.e. a membership that does
            not exist.
        :param expectedHttpResponseCode: HTTP status code the response must carry.
        """
        self._authenticate(userId)
        membership = self.mships[targetId]
        # Resolve the real primary key instead of assuming that list indices
        # and database ids coincide. For the None placeholder fall back to 0,
        # which is assumed never to be handed out as an auto-generated key.
        membership_pk = membership.id if membership is not None else 0
        response = self.client.delete(self.member_url % membership_pk)
        self.assertEqual(response.status_code, expectedHttpResponseCode)

    #### Model methods tests
    def test_group_member_model(self):
        """Check the permission helpers exposed by the GroupMember model."""
        self.assertTrue(self.mships[1].is_accepted())
        self.assertTrue(self.mships[5].can_kick())
        self.assertFalse(self.mships[4].can_kick())
        self.assertTrue(self.mships[1].can_invite())

    #### Tests /rank
    def test_rank_not_authed(self):
        """An unauthenticated client cannot change a rank (401)."""
        self.try_rank(-1, 1, 4, status.HTTP_401_UNAUTHORIZED)

    def test_rank_not_group_member(self):
        """User #0, who has no membership, gets 404."""
        self.try_rank(0, 1, 4, status.HTTP_404_NOT_FOUND)

    def test_rank_demote_no_permission1(self):
        """Rank-1 member cannot demote member #2 (403)."""
        self.try_rank(1, 2, 1, status.HTTP_403_FORBIDDEN)

    def test_rank_demote_no_permission2(self):
        """Rank-3 member cannot demote member #2 (403); the stored rank is unchanged."""
        self.try_rank(3, 2, 1, status.HTTP_403_FORBIDDEN)
        self.assertEqual(GroupMember.objects.get(pk=self.mships[2].id).perm_rank, 2)

    def test_rank_promote_no_permission(self):
        """Rank-1 member cannot promote their own membership (403)."""
        self.try_rank(1, 1, 2, status.HTTP_403_FORBIDDEN)

    def test_rank_no_rank_change(self):
        """Setting a membership to its current rank is a bad request (400)."""
        self.try_rank(5, 1, 1, status.HTTP_400_BAD_REQUEST)

    def test_rank_not_a_number(self):
        """A non-numeric rank is a bad request (400)."""
        self.try_rank(5, 1, "hi!", status.HTTP_400_BAD_REQUEST)

    def test_rank_set_rank_to_zero(self):
        """Rank 0 cannot be assigned through this endpoint (400)."""
        self.try_rank(5, 1, 0, status.HTTP_400_BAD_REQUEST)

    def test_rank_bad_rank(self):
        """Ranks below zero or above the administrator rank are bad requests (400)."""
        self.try_rank(5, 1, -1, status.HTTP_400_BAD_REQUEST)
        self.try_rank(5, 1, Group.ADMINISTRATOR_RANK + 1, status.HTTP_400_BAD_REQUEST)

    def test_rank_too_high_rank(self):
        """Rank-5 member cannot promote someone to rank 5 (403)."""
        self.try_rank(5, 1, 5, status.HTTP_403_FORBIDDEN)

    def test_rank_promote_ok(self):
        """Rank-3 member promotes member #1 to rank 2; the change is persisted."""
        self.try_rank(3, 1, 2, status.HTTP_200_OK)
        self.assertEqual(GroupMember.objects.get(pk=self.mships[1].id).perm_rank, 2)

    def test_rank_demote_ok(self):
        """Rank-5 member demotes member #3 to rank 2; the change is persisted."""
        self.try_rank(5, 3, 2, status.HTTP_200_OK)
        self.assertEqual(GroupMember.objects.get(pk=self.mships[3].id).perm_rank, 2)

    def test_rank_demote_self_ok(self):
        """Member #3 demotes their own membership to rank 2; the change is persisted."""
        self.try_rank(3, 3, 2, status.HTTP_200_OK)
        self.assertEqual(GroupMember.objects.get(pk=self.mships[3].id).perm_rank, 2)

    #### Tests delete
    def test_delete_not_authed(self):
        """An unauthenticated client cannot delete a membership (401)."""
        self.try_delete(-1, 1, status.HTTP_401_UNAUTHORIZED)

    def test_delete_not_group_member(self):
        """User #0, who has no membership, gets 404."""
        self.try_delete(0, 1, status.HTTP_404_NOT_FOUND)

    def test_delete_no_permission(self):
        """Rank-3 member cannot remove member #2 (403)."""
        self.try_delete(3, 2, status.HTTP_403_FORBIDDEN)

    def test_delete_user_not_in_group(self):
        """Deleting a membership that does not exist yields 404."""
        self.try_delete(4, 0, status.HTTP_404_NOT_FOUND)

    def test_delete_self_ok(self):
        """Member #1 removes their own membership; the row is gone afterwards."""
        self.try_delete(1, 1, status.HTTP_204_NO_CONTENT)
        self.assertFalse(GroupMember.objects.filter(pk=self.mships[1].id).exists())

    def test_delete_ok(self):
        """Rank-5 member removes member #1; the row is gone afterwards."""
        self.try_delete(5, 1, status.HTTP_204_NO_CONTENT)
        self.assertFalse(GroupMember.objects.filter(pk=self.mships[1].id).exists())
219
220
221
class OpenGroupMemberCreationTests(APITestCase):
    """Exercise ``POST /group-member/`` against a public group.

    The group is saved with ``is_private=False`` and ``default_member_rank=1``.
    """

    @classmethod
    def setUpTestData(cls):
        """Create the public group, one other user and the user who tries to join."""
        # Zero-argument super() resolves from this class; naming APITestCase
        # explicitly would skip one level of the MRO.
        super().setUpTestData()

        # Routes
        cls.members_url = "/group-member/"
        cls.member_url = cls.members_url + "%d/"

        # Public Group, open to anyone
        cls.group = GroupFactory()
        cls.group.is_private = False
        cls.group.default_member_rank = 1
        cls.group.save()

        # Users already in group
        cls.users = [UserFactory()]
        # Associated GroupMember
        # NOTE(review): this instance is built with the bare model constructor
        # and is never saved here, so users[0] does not appear to have a
        # persisted membership. Confirm whether GroupMemberFactory(...) was
        # intended before changing it: the expected status codes below were
        # written against the current behaviour.
        cls.group_member1 = GroupMember(user=cls.users[0], group=cls.group, perm_rank=Group.ADMINISTRATOR_RANK)

        # Testing user
        cls.user = UserFactory()

        # Misc
        cls.new_membership_data = {"group_id": cls.group.id, "user_id": cls.user.id}

    def test_create_not_authed(self):
        """An unauthenticated client cannot create a membership (401)."""
        self.client.force_authenticate(user=None)
        response = self.client.post(self.members_url, self.new_membership_data)
        self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)

    def test_create_not_for_self(self):
        """A user cannot create a membership on behalf of somebody else (403)."""
        # Attempt to add somebody else to a group
        self.client.force_authenticate(user=self.users[0])
        response = self.client.post(self.members_url, self.new_membership_data)
        self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

    def test_create_success(self):
        """A user joins the open group and receives the group's default rank."""
        # Successful attempt to join an open group
        self.client.force_authenticate(user=self.user)
        response = self.client.post(self.members_url, self.new_membership_data)
        self.assertEqual(response.status_code, status.HTTP_201_CREATED)
        self.assertEqual(response.data['perm_rank'], self.group.default_member_rank)
264
265
266
class RequestGroupMemberCreationTests(APITestCase):
    """Exercise join requests and their validation.

    The group is created with ``default_member_rank=0`` and
    ``req_rank_accept_join_requests=5``. Three memberships exist: users[0] with
    the administrator rank, users[1] with rank 1 and users[2] with rank 0
    (the request to be validated).
    """

    @classmethod
    def setUpTestData(cls):
        """Create the group, its three memberships and the user who requests to join."""
        # Zero-argument super() resolves from this class; naming APITestCase
        # explicitly would skip one level of the MRO.
        super().setUpTestData()

        # Routes
        cls.members_url = "/group-member/"
        cls.accept_join_requests_url = cls.members_url + "%d/accept_join_request/"

        # Group with membership request
        cls.group = GroupFactory(default_member_rank=0, req_rank_accept_join_requests=5)

        # Users already in group
        cls.users = UserFactory.create_batch(3)
        # Associated GroupMember
        cls.group_member1 = GroupMemberFactory(user=cls.users[0], group=cls.group, perm_rank=Group.ADMINISTRATOR_RANK)  # can validate requests
        cls.group_member2 = GroupMemberFactory(user=cls.users[1], group=cls.group, perm_rank=1)  # cannot validate requests
        cls.group_member3 = GroupMemberFactory(user=cls.users[2], group=cls.group, perm_rank=0)  # request to be validated

        # Testing user
        cls.user = UserFactory()

        # Misc
        cls.new_membership_data = {"group_id": cls.group.id, "user_id": cls.user.id}

    def test_create_not_authed(self):
        """An unauthenticated client cannot request membership (401)."""
        self.client.force_authenticate(user=None)
        response = self.client.post(self.members_url, self.new_membership_data)
        self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)

    def test_create_success(self):
        """A user's request is created with the group's default rank."""
        # Successful attempt to request group membership
        self.client.force_authenticate(user=self.user)
        response = self.client.post(self.members_url, self.new_membership_data)
        self.assertEqual(response.status_code, status.HTTP_201_CREATED)
        self.assertEqual(response.data['perm_rank'], self.group.default_member_rank)

    def test_validate_forbidden(self):
        """A rank-1 member cannot accept a join request (403)."""
        # Attempt to validate a request but not enough permission
        self.client.force_authenticate(user=self.users[1])
        response = self.client.put(self.accept_join_requests_url % self.group_member3.id)
        self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

    def test_validate_success(self):
        """The administrator accepts the request; the response reports rank 1."""
        # Successful attempt to validate a request
        self.client.force_authenticate(user=self.users[0])
        response = self.client.put(self.accept_join_requests_url % self.group_member3.id)
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.assertEqual(response.data['perm_rank'], 1)
315
316 View Code Duplication
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
317
class InvitationGroupMemberCreationTests(APITestCase):
    """Exercise ``POST /group-member/`` against an invitation-only group.

    The group is created with ``req_rank_invite=5``, ``default_member_rank=-1``
    and ``is_private=True``. users[1] is a member whose rank equals the group's
    ``req_rank_invite``; users[0] has no membership.
    """

    @classmethod
    def setUpTestData(cls):
        """Create the private group, the two users and the single membership."""
        # Zero-argument super() resolves from this class; naming APITestCase
        # explicitly would skip one level of the MRO.
        super().setUpTestData()

        # Routes
        cls.members_url = "/group-member/"

        # Group with invitation only
        cls.group = GroupFactory(req_rank_invite=5, default_member_rank=-1, is_private=True)

        # Testing user
        cls.users = UserFactory.create_batch(2)
        # The leading None keeps memberships[i] aligned with users[i].
        cls.memberships = [
            None,
            GroupMemberFactory(user=cls.users[1], group=cls.group, perm_rank=cls.group.req_rank_invite),
        ]

        # Misc
        cls.new_membership_data = {"group_id": cls.group.id, "user_id": cls.users[0].id}

    def test_create_not_authed(self):
        """An unauthenticated client cannot create a membership (401)."""
        self.client.force_authenticate(user=None)
        response = self.client.post(self.members_url, self.new_membership_data)
        self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)

    def test_create_forbidden(self):
        """users[0] cannot join the invitation-only group on their own (403)."""
        # Can't join this Group
        self.client.force_authenticate(user=self.users[0])
        response = self.client.post(self.members_url, self.new_membership_data)
        self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

    def test_create_already_group_member(self):
        """Existing member users[1] posting a membership for users[0] gets 403."""
        # NOTE(review): this request is identical to the one sent by
        # test_create_self_forbidden; one of the two was presumably meant to
        # post users[1]'s own id -- confirm the intended payload.
        self.client.force_authenticate(user=self.users[1])
        response = self.client.post(self.members_url, self.new_membership_data)
        self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

    def test_create_self_forbidden(self):
        """users[1] posting a membership for users[0] gets 403."""
        self.client.force_authenticate(user=self.users[1])
        response = self.client.post(self.members_url, self.new_membership_data)
        self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
358
359 View Code Duplication
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
360
class InvitationGroupMemberInvitationWorkflowTests(APITestCase):
    """Exercise ``PUT /group/{id}/invite/`` on an invitation-only group.

    The group is created with ``req_rank_invite=5`` and
    ``default_member_rank=-1``. users[1] is a member whose rank equals the
    group's ``req_rank_invite``; users[0] has no membership.
    """

    @classmethod
    def setUpTestData(cls):
        """Create the group, the two users and the single membership."""
        # Zero-argument super() resolves from this class; naming APITestCase
        # explicitly would skip one level of the MRO.
        super().setUpTestData()

        # Routes
        cls.members_url = "/group-member/"
        cls.group_invite_url = "/group/%d/invite/"
        cls.group_invite_accept_url = "/group/%d/invite_accept/"

        # Group with invitation only
        cls.group = GroupFactory(req_rank_invite=5, default_member_rank=-1)

        # Testing user
        cls.users = UserFactory.create_batch(2)
        # The leading None keeps memberships[i] aligned with users[i].
        cls.memberships = [
            None,
            GroupMemberFactory(user=cls.users[1], group=cls.group, perm_rank=cls.group.req_rank_invite),
        ]

    def test_invite_not_authed(self):
        """An unauthenticated client cannot send an invitation (401)."""
        self.client.force_authenticate(user=None)
        response = self.client.put(self.group_invite_url % (self.group.id), {"user_id": self.users[0].id})
        self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)

    def test_invite_forbidden(self):
        """users[0], who has no membership, cannot invite themselves (403)."""
        self.client.force_authenticate(user=self.users[0])
        response = self.client.put(self.group_invite_url % (self.group.id), {"user_id": self.users[0].id})
        self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

    def test_invite_already_group_member(self):
        """Inviting users[1], who is already a member, is a bad request (400)."""
        self.client.force_authenticate(user=self.users[1])
        response = self.client.put(self.group_invite_url % (self.group.id), {"user_id": self.users[1].id})
        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

    def test_invite_ok(self):
        """users[1] successfully invites users[0] to the group (200)."""
        # users[1] invites users[0] to group
        self.client.force_authenticate(user=self.users[1])
        response = self.client.put(self.group_invite_url % (self.group.id), {"user_id": self.users[0].id})
        self.assertEqual(response.status_code, status.HTTP_200_OK)

    # def test_invite_accept(self): # TODO: invitation process
    #     self.test_invite_ok()
    #     self.client.force_authenticate(user=self.users[0])
    #     response = self.client.post(self.members_url, {"user_id": self.users[0].id, "group_id": self.group.id})
    #     self.assertEqual(response.status_code, status.HTTP_201_CREATED)
408