UserManager.get_queryset()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 2
rs 10
ccs 0
cts 0
cp 0
cc 1
crap 2
1
from django.db import models
2
from django.db.models import Q
3
4
from django.contrib.auth.models import AbstractBaseUser, BaseUserManager
5
6
from sigma_chat.models.chat_member import ChatMember
7
from sigma_chat.models.chat import Chat
8
9
10
class UserManager(BaseUserManager):
    """
    Manager for the email-identified User model.

    Provides the helpers used to create regular users and superusers.
    """

    def get_queryset(self):
        """
        Return the default queryset with 'clusters__group_ptr' prefetched,
        so reading a user's clusters does not cost one query per user.
        """
        return super().get_queryset().prefetch_related('clusters__group_ptr')

    def create_user(self, email, lastname, firstname, password=None):
        """
        Creates and saves a User with the given email, lastname, firstname and password.

        The email is normalized before being stored and `password` is handed
        to `set_password` as is (including None).
        Raises ValueError when `email` is empty.
        TODO - Generate a unique username.
        """
        if not email:
            raise ValueError('Users must have an email address')

        # username = email # TODO - Generate unique username

        user = self.model(
            email=self.normalize_email(email),
            lastname=lastname,
            firstname=firstname,
        )

        user.set_password(password)
        # Save through the manager's own database so that
        # `User.objects.db_manager(alias).create_user(...)` writes to `alias`.
        user.save(using=self._db)
        return user

    def create_superuser(self, email, lastname, firstname, password):
        """
        Creates and saves a superuser with the given email, lastname, firstname and password.

        The account is first created like a regular user, then flagged as
        both superuser and staff and saved again.
        """
        user = self.create_user(email, lastname, firstname, password)
        user.is_superuser = True
        user.is_staff = True
        # Same database as the one used by create_user.
        user.save(using=self._db)
        return user
43
44
45
class User(AbstractBaseUser):
    """
    Users are identified by their email. Lastname and firstname are required.
    """
    email = models.EmailField(max_length=254, unique=True)
    lastname = models.CharField(max_length=255)
    firstname = models.CharField(max_length=128)
    # username = models.CharField(max_length=128, unique=True) # TODO - Add unique username for frontend URLs
    phone = models.CharField(max_length=20, blank=True)
    photo = models.OneToOneField('sigma_files.Image', blank=True, null=True, on_delete=models.SET_NULL)

    is_active = models.BooleanField(default=True)
    last_modified = models.DateTimeField(auto_now=True)
    join_date = models.DateTimeField(auto_now_add=True)

    is_superuser = models.BooleanField(default=False)
    is_staff = models.BooleanField(default=False)

    clusters = models.ManyToManyField('Cluster', related_name="cluster_users") # users should be members of at least one cluster

    objects = UserManager()

    invited_to_groups = models.ManyToManyField('Group', blank=True, related_name="invited_users")
    groups = models.ManyToManyField('Group', through='GroupMember', related_name='users')

    # Related fields:
    #   - memberships (model GroupMember)
    #   - user_chatmember (model ChatMember.user)

    USERNAME_FIELD = 'email'
    REQUIRED_FIELDS = ['lastname', 'firstname']

    def __str__(self):
        """Return the user's email."""
        return self.email

    def get_full_name(self):
        """Return "<lastname> <firstname>"."""
        return "{} {}".format(self.lastname, self.firstname)

    def get_short_name(self):
        """Return the user's email (used as the short name)."""
        return self.email

    def is_sigma_admin(self):
        """Return True if the user is staff or superuser."""
        return self.is_staff or self.is_superuser

    def is_in_cluster(self, cluster):
        """Return True if `cluster` is one of the user's clusters."""
        return cluster in self.clusters.all()

    def is_cluster_admin(self, cluster):
        """
        Return True if the user's membership in the group underlying
        `cluster` has the cluster administrator rank.
        """
        # Imported locally, as in the rest of this class.
        from sigma_core.models.cluster import Cluster
        ms = self.get_group_membership(cluster.group_ptr)
        return (ms is not None and ms.perm_rank == Cluster.ADMINISTRATOR_RANK)

    def is_admin_of_one_cluster(self, clusters):
        """
        Return True if the user is administrator of at least one of `clusters`.

        Returns False for an empty iterable (a bare reduce() would raise
        TypeError there) and stops at the first cluster that matches.
        """
        return any(self.is_cluster_admin(c) for c in clusters)

    def has_common_cluster(self, user):
        """
        Return True iff self has a cluster in common with user.
        """
        own_ids = set(self.clusters.all().values_list('id', flat=True))
        other_ids = user.clusters.all().values_list('id', flat=True)
        return not own_ids.isdisjoint(other_ids)

    def has_common_group(self, user):
        """
        Return True iff self has a group in common with user.
        Warning: non symmetric function! u1.has_common_group(u2) may be different of u2.has_common_group(u1)
        """
        # We filter on is_accepted : we are really in the same group if you ARE really in the group.
        # But, on the other hand, you can "see" pending request of other members.
        own_groups = set(self.memberships.filter(is_accepted=True).values_list('group', flat=True))
        other_groups = user.memberships.all().values_list('group', flat=True)
        return not own_groups.isdisjoint(other_groups)

    def get_group_membership(self, group):
        """
        Return the user's GroupMember entry for `group`, or None if the
        user has no membership (accepted or not) in that group.
        """
        from sigma_core.models.group_member import GroupMember
        try:
            return self.memberships.get(group=group)
        except GroupMember.DoesNotExist:
            return None

    def is_group_member(self, g):
        """
        Return True if the user has an accepted membership in group `g`.
        """
        mem = self.get_group_membership(g)
        if mem is None:
            return False
        # `is_accepted` is used as a filter field elsewhere in this class, so
        # on a membership instance it is expected to be a plain boolean;
        # calling it unconditionally would raise TypeError. A callable is
        # still honoured in case GroupMember exposes it as a method.
        accepted = mem.is_accepted
        return bool(accepted() if callable(accepted) else accepted)

    def can_invite(self, group):
        """Return True if the user's membership in `group` allows inviting."""
        mem = self.get_group_membership(group)
        return mem is not None and mem.can_invite

    def can_accept_join_requests(self, group):
        """
        Return True if the user may accept join requests for `group`:
        Sigma admins always can, other users need the invite permission.
        """
        # Considered that someone who can invite can also accept join requests
        if self.is_sigma_admin():
            return True
        mem = self.get_group_membership(group)
        return mem is not None and mem.can_invite

    def can_modify_group_infos(self, group):
        """Return True if the user's membership in `group` allows editing its infos."""
        mem = self.get_group_membership(group)
        return mem is not None and mem.can_modify_group_infos

    def has_group_admin_perm(self, group):
        """
        Return True if the user is a Sigma admin, or an administrator or
        super-administrator of `group`.
        """
        if self.is_sigma_admin():
            return True
        mem = self.get_group_membership(group)
        return mem is not None and (mem.is_administrator or mem.is_superadministrator)

    def is_invited_to_group_id(self, groupId):
        """Return True if the user has been invited to the group with primary key `groupId`."""
        return self.invited_to_groups.filter(pk=groupId).exists()

    def get_groups_with_confirmed_membership(self):
        """
        Return the 'group' values (flat values_list) of the user's accepted
        memberships.
        """
        from sigma_core.models.group_member import GroupMember
        return GroupMember.objects.filter(Q(user=self) & Q(is_accepted=True)).values_list('group', flat=True)

    def get_chat_membership(self, chat):
        """
        Return the user's ChatMember entry for `chat`, or None if there is none.
        """
        try:
            return self.user_chatmember.get(chat=chat)
        except ChatMember.DoesNotExist:
            return None

    def is_chat_member(self, chat):
        """Return True if the user has a membership in `chat` flagged as member."""
        mem = self.get_chat_membership(chat)
        return mem is not None and mem.is_member

    def is_chat_admin(self, chat):
        """Return True if the user has a membership in `chat` flagged as admin."""
        mem = self.get_chat_membership(chat)
        return mem is not None and mem.is_admin

    def is_chat_creator(self, chat):
        """Return True if the user has a membership in `chat` flagged as creator."""
        mem = self.get_chat_membership(chat)
        return mem is not None and mem.is_creator

    def is_chat_banned(self, chat):
        """Return True if the user has a membership in `chat` flagged as banned."""
        mem = self.get_chat_membership(chat)
        return mem is not None and mem.is_banned

    ###############
    # Permissions #
    ###############

    # Perms for admin site
    def has_perm(self, perm, obj=None): # pragma: no cover
        """Always grant the permission (hook used by the admin site)."""
        return True

    def has_module_perms(self, app_label): # pragma: no cover
        """Always grant access to the app's modules (hook used by the admin site)."""
        return True
196