Passed
Pull Request — develop (#29)
by inkhey
02:37
created

tracim_backend.models.auth.User.get_display_name()   A

Complexity

Conditions 3

Size

Total Lines 15
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 15
rs 10
c 0
b 0
f 0
cc 3
nop 2
1
# -*- coding: utf-8 -*-
2
"""
3
Auth* related model.
4
5
This is where the models used by the authentication stack are defined.
6
7
It's perfectly fine to re-use this definition in the tracim application,
8
though.
9
"""
10
import os
11
import time
12
import uuid
13
14
from datetime import datetime
15
from hashlib import sha256
16
from typing import TYPE_CHECKING
17
18
import sqlalchemy
19
import typing
20
from sqlalchemy import Column
21
from sqlalchemy import ForeignKey
22
from sqlalchemy import Sequence
23
from sqlalchemy import Table
24
from sqlalchemy.ext.hybrid import hybrid_property
25
from sqlalchemy.orm import relation
26
from sqlalchemy.orm import relationship
27
from sqlalchemy.orm import synonym
28
from sqlalchemy.types import Boolean
29
from sqlalchemy.types import DateTime
30
from sqlalchemy.types import Integer
31
from sqlalchemy.types import Unicode
32
from tracim_backend.exceptions import ExpiredResetPasswordToken
33
from tracim_backend.exceptions import UnvalidResetPasswordToken
34
35
from tracim_backend.models.meta import DeclarativeBase
36
from tracim_backend.models.meta import metadata
37
if TYPE_CHECKING:
38
    from tracim_backend.models.data import Workspace
39
    from tracim_backend.models.data import UserRoleInWorkspace
40
__all__ = ['User', 'Group', 'Permission']
41
42
# Association table backing the many-to-many relationship between groups
# and permissions.
group_permission_table = Table(
    'group_permission',
    metadata,
    Column(
        'group_id',
        Integer,
        ForeignKey('groups.group_id', onupdate='CASCADE', ondelete='CASCADE'),
        primary_key=True,
    ),
    Column(
        'permission_id',
        Integer,
        ForeignKey(
            'permissions.permission_id',
            onupdate='CASCADE',
            ondelete='CASCADE',
        ),
        primary_key=True,
    ),
)
50
51
# Association table backing the many-to-many relationship between groups
# and their members - that is, the memberships.
user_group_table = Table(
    'user_group',
    metadata,
    Column(
        'user_id',
        Integer,
        ForeignKey('users.user_id', onupdate='CASCADE', ondelete='CASCADE'),
        primary_key=True,
    ),
    Column(
        'group_id',
        Integer,
        ForeignKey('groups.group_id', onupdate='CASCADE', ondelete='CASCADE'),
        primary_key=True,
    ),
)
59
60
61
class Group(DeclarativeBase):
    """
    Group of users, stored in the ``groups`` table.

    The ``TIM_*`` constants give the numeric value and the group name of the
    four built-in groups.
    """

    # Numeric values of the built-in groups.
    TIM_NOBODY = 0
    TIM_USER = 1
    TIM_MANAGER = 2
    TIM_ADMIN = 3

    # Names of the built-in groups.
    TIM_NOBODY_GROUPNAME = 'nobody'
    TIM_USER_GROUPNAME = 'users'
    TIM_MANAGER_GROUPNAME = 'managers'
    TIM_ADMIN_GROUPNAME = 'administrators'

    __tablename__ = 'groups'

    group_id = Column(
        Integer,
        Sequence('seq__groups__group_id'),
        autoincrement=True,
        primary_key=True,
    )
    group_name = Column(Unicode(16), unique=True, nullable=False)
    display_name = Column(Unicode(255))
    created = Column(DateTime, default=datetime.utcnow)

    # Members of the group; also exposes ``groups`` on the user side.
    users = relationship('User', secondary=user_group_table, backref='groups')

    def __repr__(self):
        """Return a debug representation showing the group name."""
        name_repr = repr(self.group_name)
        return '<Group: name=%s>' % name_repr

    def __unicode__(self):
        """Return the group name."""
        return self.group_name

    @classmethod
    def by_group_name(cls, group_name, dbsession):
        """Return the first group whose name is ``group_name``."""
        matching_groups = dbsession.query(cls).filter_by(group_name=group_name)
        return matching_groups.first()
92
93
94
class Profile(object):
    """The "max" group associated with a given user."""

    # Built-in group names, ordered so that the list index is the profile id.
    _NAME = [
        Group.TIM_NOBODY_GROUPNAME,
        Group.TIM_USER_GROUPNAME,
        Group.TIM_MANAGER_GROUPNAME,
        Group.TIM_ADMIN_GROUPNAME,
    ]

    # Built-in group ids, in the same order as ``_NAME``.
    _IDS = [
        Group.TIM_NOBODY,
        Group.TIM_USER,
        Group.TIM_MANAGER,
        Group.TIM_ADMIN,
    ]

    # TODO - G.M - 18-04-2018 [Cleanup] Drop this
    # _LABEL = [l_('Nobody'),
    #           l_('Users'),
    #           l_('Global managers'),
    #           l_('Administrators')]

    def __init__(self, profile_id):
        """
        Build the profile for the given id.

        :param profile_id: integer used as index into ``_NAME``.
        """
        assert isinstance(profile_id, int)
        self.id = profile_id
        # The profile name is the group name found at that index.
        self.name = Profile._NAME[profile_id]
        # TODO - G.M - 18-04-2018 [Cleanup] Drop this
        # self.label = Profile._LABEL[profile_id]
123
124
125
class User(DeclarativeBase):
    """
    User account model, stored in the ``users`` table.

    This is the user definition used by :mod:`repoze.who`, which requires at
    least the ``email`` column.
    """

    __tablename__ = 'users'

    user_id = Column(
        Integer,
        Sequence('seq__users__user_id'),
        autoincrement=True,
        primary_key=True,
    )
    email = Column(Unicode(255), unique=True, nullable=False)
    display_name = Column(Unicode(255))
    # Mapped to the ``password`` database column.
    _password = Column('password', Unicode(128))
    created = Column(DateTime, default=datetime.utcnow)
    is_active = Column(Boolean, default=True, nullable=False)
    is_deleted = Column(
        Boolean,
        default=False,
        nullable=False,
        server_default=sqlalchemy.sql.expression.literal(False),
    )
    imported_from = Column(Unicode(32), nullable=True)
    # timezone in tz database format
    timezone = Column(Unicode(255), nullable=False, server_default='')
    # language in ISO 639 format
    lang = Column(Unicode(3), nullable=True, default=None)
    # TODO - G.M - 04-04-2018 - [auth] Check if this is already needed
    # with new auth system
    # TODO - G.M - 2018-08-22 - Think about hash instead of direct token
    auth_token = Column(Unicode(255))
    auth_token_created = Column(DateTime)

    # Only the hash of the reset-password token is stored.
    reset_password_token_hash = Column(
        Unicode(255),
        nullable=True,
        default=None,
    )
    reset_password_token_created = Column(
        DateTime,
        nullable=True,
        default=None,
    )
155
156
    @hybrid_property
    def email_address(self):
        """Expose the ``email`` column under the name ``email_address``."""
        return self.email
159
160
    def __repr__(self):
        """Return a debug representation with the email and display name."""
        email_repr = repr(self.email)
        display_repr = repr(self.display_name)
        return '<User: email=%s, display=%s>' % (email_repr, display_repr)
163
164
    def __unicode__(self):
        """Return the display name, or the email when no display name is set."""
        if self.display_name:
            return self.display_name
        return self.email
166
167
    @property
    def permissions(self):
        """Return a set with all permissions granted through the user's groups."""
        # Union of the permissions of every group; empty set without groups.
        return set().union(*(group.permissions for group in self.groups))
174
175
    @property
    def profile(self) -> Profile:
        """
        Return the profile matching the highest group id of the user.

        A user without any group gets the profile with id 0.
        """
        highest_group_id = max(
            (group.group_id for group in self.groups),
            default=0,
        )
        return Profile(highest_group_id)
181
182
    # TODO - G-M - 20-04-2018 - [Calendar] Replace this in context model object
183
    # @property
184
    # def calendar_url(self) -> str:
185
    #     # TODO - 20160531 - Bastien: Cyclic import if import in top of file
186
    #     from tracim.lib.calendar import CalendarManager
187
    #     calendar_manager = CalendarManager(None)
188
    #
189
    #     return calendar_manager.get_user_calendar_url(self.user_id)
190
191
    @classmethod
    def by_email_address(cls, email, dbsession):
        """Return the first user whose email address is ``email``."""
        matching_users = dbsession.query(cls).filter_by(email=email)
        return matching_users.first()
195
196
    @classmethod
    def by_user_name(cls, username, dbsession):
        """
        Return the first user whose user name is ``username``.

        The user name is looked up in the ``email`` column.
        """
        matching_users = dbsession.query(cls).filter_by(email=username)
        return matching_users.first()
200
201
    def _set_password(self, cleartext_password: str) -> None:
        """
        Store the hashed form of a cleartext password.

        The cleartext value is hashed with ``_hash`` and only the result is
        kept in ``_password``.
        """
        hashed_password = self._hash(cleartext_password)
        self._password = hashed_password

    def _get_password(self) -> str:
        """Return the stored, hashed version of the password."""
        return self._password

    # ``password`` reads and writes ``_password`` through the two accessors
    # defined just above, so assigning a cleartext value stores its hash.
    password = synonym(
        '_password',
        descriptor=property(_get_password, _set_password),
    )
216
217
    def validate_password(self, cleartext_password: str) -> bool:
        """
        Check a cleartext password against the stored credentials.

        :param cleartext_password: the password provided by the user trying
            to authenticate, in clear text; it is matched against the hashed
            password held by this user.
        :return: Whether the password is valid.
        """
        stored_hash = self.password
        return self._validate_hash(stored_hash, cleartext_password)
230
231
    def get_display_name(self, remove_email_part: bool=False) -> str:
        """
        Get a name to display: the display name when set, else the email.

        :param remove_email_part: If True and the name is based on the email,
            return only the part located before the first ``@``.
        :return: display name based on user name or email.
        """
        if self.display_name:
            return self.display_name
        if remove_email_part:
            # partition() returns the whole string when it holds no '@',
            # whereas str.index('@') would raise ValueError in that case.
            local_part, _, _ = self.email.partition('@')
            return local_part
        return self.email
246
247
    def get_role(self, workspace: 'Workspace') -> int:
        """
        Return the role of this user in the given workspace.

        :param workspace: workspace to look for among the user's roles.
        :return: the ``role`` value of the first role attached to
            ``workspace``, or ``UserRoleInWorkspace.NOT_APPLICABLE`` when the
            user has no role in it.
        """
        for role in self.roles:
            if role.workspace == workspace:
                return role.role

        # The module-level import of UserRoleInWorkspace only happens under
        # TYPE_CHECKING, so the name does not exist at runtime; import it
        # here instead (presumably the guard avoids a circular import
        # between the two model modules - confirm).
        from tracim_backend.models.data import UserRoleInWorkspace
        return UserRoleInWorkspace.NOT_APPLICABLE
0 ignored issues
show
introduced by
The variable UserRoleInWorkspace does not seem to be defined in case TYPE_CHECKING on line 37 is False. Are you sure this can never be the case?
Loading history...
253
254
    def get_active_roles(self) -> typing.List['UserRoleInWorkspace']:
        """
        Get the roles whose workspace is not deleted.

        :return: list of roles of the user for all not-deleted workspaces
        """
        return [role for role in self.roles if not role.workspace.is_deleted]
263
264
    # Tokens ###
265
266
    def reset_tokens(self):
        """Generate a new auth token and disable the reset-password token."""
        self._generate_auth_token()
        # disable reset_password token
        self.reset_password_token_hash = self.reset_password_token_created = None
271
272
    # Reset Password Tokens #
273
    def generate_reset_password_token(self) -> str:
        """
        Create a new reset-password token.

        The creation time and the hash of the token are stored on the user.

        :return: the generated token itself.
        """
        token, created, token_hash = self._generate_token(create_hash=True)
        self.reset_password_token_created = created
        self.reset_password_token_hash = token_hash
        return token
277
278
    def validate_reset_password_token(self, token, validity_seconds) -> bool:
        """
        Check a reset-password token against the stored hash and its age.

        :param token: token to check.
        :param validity_seconds: maximum accepted age of the stored token.
        :return: True when the token is valid.
        :raises ExpiredResetPasswordToken: when the creation date is missing
            or older than ``validity_seconds``.
        :raises UnvalidResetPasswordToken: when the token does not match the
            stored hash.
        """
        # The date is checked first; the hash is only checked for a token
        # that has not expired.
        still_valid = self._validate_date(
            self.reset_password_token_created,
            validity_seconds,
        )
        if not still_valid:
            raise ExpiredResetPasswordToken('reset password token has expired')

        matches_hash = self._validate_hash(self.reset_password_token_hash, token)
        if not matches_hash:
            raise UnvalidResetPasswordToken('reset password token is unvalid')
        return True
284
285
    # Auth Token #
286
    # TODO - G.M - 04-04-2018 - [auth] Check if this is already needed
287
    # with new auth system
288
289
    def _generate_auth_token(self) -> str:
        """Create a new auth token, store it with its creation time, return it."""
        token, created, _unused_hash = self._generate_token()
        self.auth_token = token
        self.auth_token_created = created
        return self.auth_token
292
293
    # TODO - G.M - 2018-08-23 - Should we store hash instead of direct stored
294
    # auth token ?
295
    def validate_auth_token(self, token, validity_seconds) -> bool:
        """
        Tell whether ``token`` equals the current valid auth token.

        The current token comes from ``ensure_auth_token``, which generates a
        new one when the stored token is missing or too old.
        """
        current_token = self.ensure_auth_token(validity_seconds)
        return current_token == token
297
298
    def ensure_auth_token(self, validity_seconds) -> str:
        """
        Create auth_token if missing, regenerate it if it is too old.

        :param validity_seconds: maximum accepted age of the stored token.
        :return: actual valid auth token
        """
        token_missing = not (self.auth_token and self.auth_token_created)
        # The age is only checked when a token and its creation date exist.
        if token_missing or not self._validate_date(
            self.auth_token_created,
            validity_seconds,
        ):
            self._generate_auth_token()

        return self.auth_token
313
314
    # Utils functions #
315
316
    @classmethod
    def _hash(cls, cleartext_password_or_token: str) -> str:
        """
        Return a salted SHA-256 hash of a password or token.

        The returned string is the hex salt followed by the hex digest of
        ``cleartext + salt``.
        """
        # Salt: SHA-256 hex digest of 60 random bytes.
        salt = sha256(os.urandom(60)).hexdigest()

        # sha256 works on bytes, so the salted text is encoded first.
        salted_text = (cleartext_password_or_token + salt).encode('utf-8')
        digest = sha256(salted_text).hexdigest()

        # hexdigest() already returns str, which is what the Unicode columns
        # expect, so no further decoding is done here.
        return salt + digest
335
336
    @classmethod
    def _validate_hash(cls, hashed: str, cleartext_password_or_token: str) -> bool:  # nopep8
        """
        Check a cleartext password or token against a stored salted hash.

        The first 64 characters of ``hashed`` are used as salt; the rest is
        compared with the SHA-256 hex digest of ``cleartext + salt``.

        :param hashed: stored hash; an empty or None value never validates.
        :param cleartext_password_or_token: value to check.
        :return: True when the digests match, False otherwise.
        """
        # Local import: hmac is not part of the module-level imports.
        import hmac

        if not hashed:
            return False

        salt, expected_digest = hashed[:64], hashed[64:]
        salted_text = (cleartext_password_or_token + salt).encode('utf-8')
        computed_digest = sha256(salted_text).hexdigest()

        # Constant-time comparison, so the check does not leak how many
        # leading characters match. Both sides are encoded to bytes because
        # compare_digest() only accepts str operands that are pure ASCII.
        return hmac.compare_digest(
            expected_digest.encode('utf-8'),
            computed_digest.encode('utf-8'),
        )
344
345
    @classmethod
    def _generate_token(cls, create_hash=False) -> typing.Tuple[str, datetime, typing.Optional[str]]:  # nopep8
        """
        Generate a random token together with its creation time.

        :param create_hash: if True, also compute the hash of the token
            with ``_hash``.
        :return: tuple ``(token, creation datetime, hashed token)``; the
            creation datetime comes from ``datetime.utcnow()`` and the hashed
            token is None unless ``create_hash`` is True.
        """
        token = str(uuid.uuid4())
        creation_datetime = datetime.utcnow()
        hashed_token = cls._hash(token) if create_hash else None
        return token, creation_datetime, hashed_token
353
354
    @classmethod
    def _validate_date(cls, date: datetime, validity_seconds: int) -> bool:
        """
        Tell whether ``date`` is at most ``validity_seconds`` old.

        :param date: creation datetime to check; presumably a naive UTC
            value as produced by ``datetime.utcnow()`` - confirm for values
            coming from other sources.
        :param validity_seconds: maximum accepted age, in seconds.
        :return: False when ``date`` is empty or older than
            ``validity_seconds``, True otherwise.
        """
        if not date:
            return False

        # Local import: only ``datetime`` is imported at module level.
        from datetime import timezone

        # Normalise an aware value to naive UTC so it can be subtracted from
        # utcnow(); naive values are used as they are.
        if date.utcoffset() is not None:
            date = date.astimezone(timezone.utc).replace(tzinfo=None)

        # Subtract the datetimes directly: time.mktime() interprets its
        # argument as local time, so converting UTC values through it can be
        # off by the DST shift around daylight-saving transitions.
        elapsed_seconds = (datetime.utcnow() - date).total_seconds()
        return elapsed_seconds <= validity_seconds
365
366
367
class Permission(DeclarativeBase):
    """
    Permission definition, stored in the ``permissions`` table.

    Only the ``permission_name`` column is required.
    """

    __tablename__ = 'permissions'

    permission_id = Column(
        Integer,
        Sequence('seq__permissions__permission_id'),
        autoincrement=True,
        primary_key=True,
    )
    permission_name = Column(Unicode(63), unique=True, nullable=False)
    description = Column(Unicode(255))

    # Groups holding this permission; also exposes ``permissions`` on Group.
    # NOTE(review): ``relation`` looks like the legacy alias of
    # ``relationship`` - confirm and consider using the latter.
    groups = relation(
        Group,
        secondary=group_permission_table,
        backref='permissions',
    )

    def __repr__(self):
        """Return a debug representation showing the permission name."""
        name_repr = repr(self.permission_name)
        return '<Permission: name=%s>' % name_repr

    def __unicode__(self):
        """Return the permission name."""
        return self.permission_name
394