Completed
Push — master ( 9a7f1b...b2a2c0 )
by Camille
9s
created

OpenGroupMemberCreationTests   A

Complexity

Total Complexity 4

Size/Duplication

Total Lines 43
Duplicated Lines 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
wmc 4
c 2
b 0
f 0
dl 0
loc 43
rs 10

4 Methods

Rating   Name   Duplication   Size   Complexity  
A test_create_not_for_self() 0 5 1
B setUpTestData() 0 24 1
A test_create_not_authed() 0 4 1
A test_create_success() 0 6 1
1
import json
2
3
from django.core import mail
4
5
from rest_framework import status
6
from rest_framework.test import APITestCase
7
8
from sigma_core.models.user import User
9
from sigma_core.models.group import Group
10
from sigma_core.models.group_member import GroupMember
11
from sigma_core.serializers.user import UserSerializer
12
from sigma_core.tests.factories import UserFactory, AdminUserFactory, GroupFactory, GroupAcknowledgmentFactory, GroupMemberFactory, ClusterFactory
13
14
15
class GroupMemberVisibilityTests(APITestCase):
    """Tests of GET /group-member/ checking how many memberships each kind of requester gets back."""

    @classmethod
    def setUpTestData(cls):
        """
        Build the fixtures shared by every test of this class.
        Summary: 1 Sigma admin + 10 users, 4 groups, 2 clusters
        """
        super().setUpTestData()

        # users[0] comes from AdminUserFactory, users[1..10] from UserFactory
        cls.users = [AdminUserFactory()] + UserFactory.create_batch(10)
        cls.clusters = ClusterFactory.create_batch(2)
        # Leading None so that "group #n" in the comments below is cls.groups[n]
        cls.groups = [None] + GroupFactory.create_batch(4, is_private=True)

        # Group #2 is public
        cls.groups[2].is_private = False
        cls.groups[2].save()

        # Add users in clusters (user #1 is admin in cluster #1 and user #6 is admin in cluster #2)
        # Users #1..#5 go to cluster #1, users #6..#10 to cluster #2.
        for i in range(5):
            rank = Group.ADMINISTRATOR_RANK if i == 0 else 1
            for cluster, user in ((cls.clusters[0], cls.users[1 + i]), (cls.clusters[1], cls.users[6 + i])):
                cluster.cluster_users.add(user)
                GroupMemberFactory(group=cluster, user=user, perm_rank=rank)

        # Add users to group
        cls.mships = [
            GroupMemberFactory(user=cls.users[2], group=cls.groups[1], perm_rank=1),
            GroupMemberFactory(user=cls.users[3], group=cls.groups[2], perm_rank=1),
            GroupMemberFactory(user=cls.users[6], group=cls.groups[2], perm_rank=1),
            GroupMemberFactory(user=cls.users[6], group=cls.groups[3], perm_rank=1),
            GroupMemberFactory(user=cls.users[8], group=cls.groups[3], perm_rank=1),
            GroupMemberFactory(user=cls.users[4], group=cls.groups[4], perm_rank=1),
            GroupMemberFactory(user=cls.users[7], group=cls.groups[4], perm_rank=1),
        ]

        # User #5 is invited to group #2
        cls.users[5].invited_to_groups.add(cls.groups[2])

        # GroupAcknowledgments
        GroupAcknowledgmentFactory(parent_group=cls.clusters[0].group_ptr, subgroup=cls.groups[1], validated=True)

        # Misc
        cls.mship_url = "/group-member/?"

    def try_get(self, reqId=-1, uid=-1, gid=-1, expectedStatus=status.HTTP_200_OK, expectedLength=1):
        """
        This function attempts to get the membership of user uid in group gid using reqId permissions
        GET /group-member/?user={uid}&group={gid}

        reqId and uid are indexes into self.users, gid is an index into self.groups.
        A negative reqId leaves the client unauthenticated; a negative uid or gid
        omits the corresponding filter from the query string.
        The number of returned items is only checked when expectedStatus is 200.
        Returns the response so that callers can make further assertions.
        """
        if reqId >= 0:  # use -1 for unauthed user
            self.client.force_authenticate(user=self.users[reqId])

        # Collect the filters first so the query string never ends with a dangling "&"
        filters = []
        if uid >= 0:
            filters.append("user=%d" % self.users[uid].id)
        if gid >= 0:
            filters.append("group=%d" % self.groups[gid].id)

        response = self.client.get(self.mship_url + "&".join(filters))
        self.assertEqual(response.status_code, expectedStatus)
        if expectedStatus == status.HTTP_200_OK:
            self.assertEqual(len(response.data), expectedLength)
        return response

    def test_get_mship_unauthed(self):
        # Client is unauthed
        self.try_get(reqId=-1, uid=1, gid=1, expectedStatus=status.HTTP_401_UNAUTHORIZED)

    def test_get_mship_groupmember(self):
        # Client is member of the group he requests
        self.try_get(reqId=4, gid=4, expectedLength=2)

    def test_get_mship_invited(self):
        # User #5 is invited to group #2, which has two memberships
        self.try_get(reqId=5, gid=2, expectedLength=2)

    def test_get_mship_public(self):
        # User #1 requests public group #2 and is expected to get a single membership
        self.try_get(reqId=1, gid=2, expectedLength=1)

    def test_get_mship_acknowledged(self):
        # User #3 is in cluster #1, which acknowledges group #1
        self.try_get(reqId=3, gid=1)

    def test_get_mship_private(self):
        # User #2 has no membership in private group #4
        self.try_get(reqId=2, gid=4, expectedLength=0)

    def test_get_mship_invisibleuser(self):
        # User #1 asks for user #6 (a member of the other cluster) in group #2
        self.try_get(reqId=1, uid=6, gid=2, expectedLength=0)

    def test_get_mship_private_but_admin(self):
        # users[0] is the Sigma admin and requests a membership of private group #4
        self.try_get(reqId=0, uid=7, gid=4)
100
101
102
# Test /rank, /kick
103
class GroupMemberPermissionTests(APITestCase):
    """Tests of the rank change (PUT /group-member/{id}/rank/) and removal (DELETE /group-member/{id}/) endpoints."""

    @classmethod
    def setUpTestData(cls):
        """
        Build the fixtures shared by every test of this class.
        Summary: 6 users, 1 group, 5 mships
        """
        super().setUpTestData()

        cls.member_rank_url = "/group-member/%d/rank/"
        cls.member_url = "/group-member/%d/"
        cls.users = UserFactory.create_batch(6)
        cls.group = GroupFactory(req_rank_promote=3, req_rank_demote=4, req_rank_kick=5)
        # mships[n] belongs to users[n] and has perm_rank n; users[0] has no membership
        cls.mships = [
            None,
            GroupMemberFactory(user=cls.users[1], group=cls.group, perm_rank=1),
            GroupMemberFactory(user=cls.users[2], group=cls.group, perm_rank=2),
            GroupMemberFactory(user=cls.users[3], group=cls.group, perm_rank=3),
            GroupMemberFactory(user=cls.users[4], group=cls.group, perm_rank=4),
            GroupMemberFactory(user=cls.users[5], group=cls.group, perm_rank=5),
        ]

    def try_rank(self, userId, targetId, newPermRank, expectedHttpResponseCode):
        """
        This function attempts to set $targetId membership rank to $newPermRank
        using $userId permissions
        PUT /group-member/{targetId}/rank with perm_rank=newPermRank

        userId is an index into self.users (-1 leaves the client unauthenticated)
        and targetId is an index into self.mships.
        """
        if userId >= 0:  # use -1 for unauthed user
            self.client.force_authenticate(user=self.users[userId])
        url = self.member_rank_url % self.mships[targetId].id
        response = self.client.put(url, {"perm_rank": newPermRank})
        self.assertEqual(response.status_code, expectedHttpResponseCode)

    def try_delete(self, userId, targetId, expectedHttpResponseCode):
        """
        This function attempts to remove $targetId membership from Group
        using $userId permissions
        DELETE /group-member/{targetId}/

        userId is an index into self.users (-1 leaves the client unauthenticated)
        and targetId is an index into self.mships.
        """
        if userId >= 0:  # use -1 for unauthed user
            self.client.force_authenticate(user=self.users[userId])

        # Resolve the real primary key, like try_rank does, instead of assuming
        # that membership PKs equal their index in self.mships. The None slot
        # (index 0) has no membership: its index is sent as-is so that the
        # "unknown membership" test still targets an id that was never created here.
        membership = self.mships[targetId]
        target_pk = targetId if membership is None else membership.id

        response = self.client.delete(self.member_url % target_pk)
        self.assertEqual(response.status_code, expectedHttpResponseCode)

    # ---- Model methods tests
    def test_group_member_model(self):
        self.assertTrue(self.mships[1].is_accepted())
        # The group requires rank 5 to kick (req_rank_kick=5)
        self.assertTrue(self.mships[5].can_kick())
        self.assertFalse(self.mships[4].can_kick())
        self.assertTrue(self.mships[1].can_invite())

    # ---- Tests /rank
    def test_rank_not_authed(self):
        self.try_rank(-1, 1, 4, status.HTTP_401_UNAUTHORIZED)

    def test_rank_not_group_member(self):
        # users[0] has no membership in the group
        self.try_rank(0, 1, 4, status.HTTP_404_NOT_FOUND)

    def test_rank_demote_no_permission1(self):
        self.try_rank(1, 2, 1, status.HTTP_403_FORBIDDEN)

    def test_rank_demote_no_permission2(self):
        self.try_rank(3, 2, 1, status.HTTP_403_FORBIDDEN)
        # The refused request must leave the rank untouched
        self.assertEqual(GroupMember.objects.get(pk=self.mships[2].id).perm_rank, 2)

    def test_rank_promote_no_permission(self):
        self.try_rank(1, 1, 2, status.HTTP_403_FORBIDDEN)

    def test_rank_no_rank_change(self):
        self.try_rank(5, 1, 1, status.HTTP_400_BAD_REQUEST)

    def test_rank_not_a_number(self):
        self.try_rank(5, 1, "hi!", status.HTTP_400_BAD_REQUEST)

    def test_rank_set_rank_to_zero(self):
        self.try_rank(5, 1, 0, status.HTTP_400_BAD_REQUEST)

    def test_rank_bad_rank(self):
        self.try_rank(5, 1, -1, status.HTTP_400_BAD_REQUEST)
        self.try_rank(5, 1, Group.ADMINISTRATOR_RANK + 1, status.HTTP_400_BAD_REQUEST)

    def test_rank_too_high_rank(self):
        # A rank-5 member tries to promote somebody to his own rank
        self.try_rank(5, 1, 5, status.HTTP_403_FORBIDDEN)

    def test_rank_promote_ok(self):
        self.try_rank(3, 1, 2, status.HTTP_200_OK)
        self.assertEqual(GroupMember.objects.get(pk=self.mships[1].id).perm_rank, 2)

    def test_rank_demote_ok(self):
        self.try_rank(5, 3, 2, status.HTTP_200_OK)
        self.assertEqual(GroupMember.objects.get(pk=self.mships[3].id).perm_rank, 2)

    def test_rank_demote_self_ok(self):
        self.try_rank(3, 3, 2, status.HTTP_200_OK)
        self.assertEqual(GroupMember.objects.get(pk=self.mships[3].id).perm_rank, 2)

    # ---- Tests delete
    def test_delete_not_authed(self):
        self.try_delete(-1, 1, status.HTTP_401_UNAUTHORIZED)

    def test_delete_not_group_member(self):
        # users[0] has no membership in the group
        self.try_delete(0, 1, status.HTTP_404_NOT_FOUND)

    def test_delete_no_permission(self):
        self.try_delete(3, 2, status.HTTP_403_FORBIDDEN)

    def test_delete_user_not_in_group(self):
        # Index 0 is the empty slot of self.mships: there is no such membership
        self.try_delete(4, 0, status.HTTP_404_NOT_FOUND)

    def test_delete_self_ok(self):
        self.try_delete(1, 1, status.HTTP_204_NO_CONTENT)
        self.assertFalse(GroupMember.objects.filter(pk=self.mships[1].id).exists())

    def test_delete_ok(self):
        self.try_delete(5, 1, status.HTTP_204_NO_CONTENT)
        self.assertFalse(GroupMember.objects.filter(pk=self.mships[1].id).exists())
217
218
219
class OpenGroupMemberCreationTests(APITestCase):
    """Tests of POST /group-member/ against a public group with default_member_rank=1."""

    @classmethod
    def setUpTestData(cls):
        """Create a public group, one existing administrator and one outsider user."""
        super().setUpTestData()

        # Routes
        cls.members_url = "/group-member/"
        cls.member_url = cls.members_url + "%d/"

        # Public Group, open to anyone
        cls.group = GroupFactory()
        cls.group.is_private = False
        cls.group.default_member_rank = 1
        cls.group.save()

        # Users already in group
        cls.users = [UserFactory()]
        # Associated GroupMember
        # Created through the factory, as in the sibling test classes: a bare
        # GroupMember(...) instance that is never saved would leave users[0]
        # outside of the group.
        # NOTE(review): assumes GroupMemberFactory persists the object - confirm
        cls.group_member1 = GroupMemberFactory(user=cls.users[0], group=cls.group, perm_rank=Group.ADMINISTRATOR_RANK)

        # Testing user
        cls.user = UserFactory()

        # Misc
        cls.new_membership_data = {"group_id": cls.group.id, "user_id": cls.user.id}

    def test_create_not_authed(self):
        self.client.force_authenticate(user=None)
        response = self.client.post(self.members_url, self.new_membership_data)
        self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)

    def test_create_not_for_self(self):
        # Attempt to add somebody else to a group
        self.client.force_authenticate(user=self.users[0])
        response = self.client.post(self.members_url, self.new_membership_data)
        self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

    def test_create_success(self):
        # Successful attempt to join an open group
        self.client.force_authenticate(user=self.user)
        response = self.client.post(self.members_url, self.new_membership_data)
        self.assertEqual(response.status_code, status.HTTP_201_CREATED)
        self.assertEqual(response.data['perm_rank'], self.group.default_member_rank)
262
263
264
class RequestGroupMemberCreationTests(APITestCase):
    """Tests of joining a group with default_member_rank=0 and of accepting the resulting join request."""

    @classmethod
    def setUpTestData(cls):
        """Create the group, three memberships (ranks ADMINISTRATOR_RANK, 1 and 0) and one outsider user."""
        super().setUpTestData()

        # Routes
        cls.members_url = "/group-member/"
        cls.accept_join_requests_url = cls.members_url + "%d/accept_join_request/"

        # Group with membership request
        cls.group = GroupFactory(default_member_rank=0, req_rank_accept_join_requests=5)

        # Users already in group
        cls.users = UserFactory.create_batch(3)
        # Associated GroupMember
        cls.group_member1 = GroupMemberFactory(user=cls.users[0], group=cls.group, perm_rank=Group.ADMINISTRATOR_RANK)  # can validate requests
        cls.group_member2 = GroupMemberFactory(user=cls.users[1], group=cls.group, perm_rank=1)  # cannot validate requests
        cls.group_member3 = GroupMemberFactory(user=cls.users[2], group=cls.group, perm_rank=0)  # request to be validated

        # Testing user
        cls.user = UserFactory()

        # Misc
        cls.new_membership_data = {"group_id": cls.group.id, "user_id": cls.user.id}

    def test_create_not_authed(self):
        self.client.force_authenticate(user=None)
        response = self.client.post(self.members_url, self.new_membership_data)
        self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)

    def test_create_success(self):
        # Successful attempt to request group membership
        self.client.force_authenticate(user=self.user)
        response = self.client.post(self.members_url, self.new_membership_data)
        self.assertEqual(response.status_code, status.HTTP_201_CREATED)
        self.assertEqual(response.data['perm_rank'], self.group.default_member_rank)

    def test_validate_forbidden(self):
        # Attempt to validate a request but not enough permission
        self.client.force_authenticate(user=self.users[1])
        response = self.client.put(self.accept_join_requests_url % self.group_member3.id)
        self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

    def test_validate_success(self):
        # Successful attempt to validate a request
        self.client.force_authenticate(user=self.users[0])
        response = self.client.put(self.accept_join_requests_url % self.group_member3.id)
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        # The accepted membership is expected to come back with rank 1
        self.assertEqual(response.data['perm_rank'], 1)
313
314
315
class InvitationGroupMemberCreationTests(APITestCase):
    """Tests of POST /group-member/ against a private group created with default_member_rank=-1."""

    @classmethod
    def setUpTestData(cls):
        """Create the group and two users; only users[1] gets a membership."""
        super().setUpTestData()

        # Routes
        cls.members_url = "/group-member/"

        # Group with invitation only
        cls.group = GroupFactory(req_rank_invite=5, default_member_rank=-1, is_private=True)

        # Testing user
        cls.users = UserFactory.create_batch(2)
        # memberships[n] belongs to users[n]; users[0] is not in the group
        cls.memberships = [
            None,
            GroupMemberFactory(user=cls.users[1], group=cls.group, perm_rank=cls.group.req_rank_invite),
        ]

        # Misc
        # Every test below posts a membership for users[0]
        cls.new_membership_data = {"group_id": cls.group.id, "user_id": cls.users[0].id}

    def test_create_not_authed(self):
        self.client.force_authenticate(user=None)
        response = self.client.post(self.members_url, self.new_membership_data)
        self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)

    def test_create_forbidden(self):
        # Can't join this Group
        self.client.force_authenticate(user=self.users[0])
        response = self.client.post(self.members_url, self.new_membership_data)
        self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

    def test_create_already_group_member(self):
        # NOTE(review): this test is identical to test_create_self_forbidden.
        # The payload targets users[0], not the already-member users[1], so the
        # "already a member" case is presumably not exercised - confirm intent.
        self.client.force_authenticate(user=self.users[1])
        response = self.client.post(self.members_url, self.new_membership_data)
        self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

    def test_create_self_forbidden(self):
        # users[1] posts a membership for users[0]
        self.client.force_authenticate(user=self.users[1])
        response = self.client.post(self.members_url, self.new_membership_data)
        self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
356
357
358
class InvitationGroupMemberInvitationWorkflowTests(APITestCase):
    """Tests of PUT /group/{id}/invite/ on a group created with req_rank_invite=5."""

    @classmethod
    def setUpTestData(cls):
        """Create the group and two users; only users[1] gets a membership, at rank req_rank_invite."""
        super().setUpTestData()

        # Routes
        cls.members_url = "/group-member/"
        cls.group_invite_url = "/group/%d/invite/"
        cls.group_invite_accept_url = "/group/%d/invite_accept/"

        # Group with invitation only
        cls.group = GroupFactory(req_rank_invite=5, default_member_rank=-1)

        # Testing user
        cls.users = UserFactory.create_batch(2)
        # memberships[n] belongs to users[n]; users[0] is not in the group
        cls.memberships = [
            None,
            GroupMemberFactory(user=cls.users[1], group=cls.group, perm_rank=cls.group.req_rank_invite),
        ]

    def test_invite_not_authed(self):
        self.client.force_authenticate(user=None)
        response = self.client.put(self.group_invite_url % self.group.id, {"user_id": self.users[0].id})
        self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)

    def test_invite_forbidden(self):
        # User0, who has no membership in the group, tries to invite himself
        self.client.force_authenticate(user=self.users[0])
        response = self.client.put(self.group_invite_url % self.group.id, {"user_id": self.users[0].id})
        self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

    def test_invite_already_group_member(self):
        # User1 tries to invite himself although he already is a member
        self.client.force_authenticate(user=self.users[1])
        response = self.client.put(self.group_invite_url % self.group.id, {"user_id": self.users[1].id})
        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

    def test_invite_ok(self):
        # User1 invites User0 to group
        self.client.force_authenticate(user=self.users[1])
        response = self.client.put(self.group_invite_url % self.group.id, {"user_id": self.users[0].id})
        self.assertEqual(response.status_code, status.HTTP_200_OK)

    # def test_invite_accept(self): # TODO: invitation process
    #     self.test_invite_ok()
    #     self.client.force_authenticate(user=self.users[0])
    #     response = self.client.post(self.members_url, {"user_id": self.users[0].id, "group_id": self.group.id})
    #     self.assertEqual(response.status_code, status.HTTP_201_CREATED)
406