Profile.__init__()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 4
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
nop 2
1
# -*- coding: utf-8 -*-
2
"""
Auth* related models.

This is where the models used by the authentication stack are defined.

It's perfectly fine to re-use these definitions in the tracim application,
though.
"""
10
import os
11
import time
12
import uuid
13
14
from datetime import datetime
15
from hashlib import sha256
16
from typing import TYPE_CHECKING
17
18
import sqlalchemy
19
import typing
20
from sqlalchemy import Column
21
from sqlalchemy import ForeignKey
22
from sqlalchemy import Sequence
23
from sqlalchemy import Table
24
from sqlalchemy.ext.hybrid import hybrid_property
25
from sqlalchemy.orm import relation
26
from sqlalchemy.orm import relationship
27
from sqlalchemy.orm import synonym
28
from sqlalchemy.types import Boolean
29
from sqlalchemy.types import DateTime
30
from sqlalchemy.types import Integer
31
from sqlalchemy.types import Unicode
32
from tracim_backend.exceptions import ExpiredResetPasswordToken
33
from tracim_backend.exceptions import UnvalidResetPasswordToken
34
35
from tracim_backend.models.meta import DeclarativeBase
36
from tracim_backend.models.meta import metadata
37
if TYPE_CHECKING:
38
    from tracim_backend.models.data import Workspace
39
    from tracim_backend.models.data import UserRoleInWorkspace
40
# Names exported by ``from <this module> import *``. Note that ``Profile``,
# although defined in this module, is not part of this list.
__all__ = ['User', 'Group', 'Permission']
41
42
# Association table backing the many-to-many relationship between groups and
# permissions. Both columns together form the primary key, and rows follow
# their parent rows on update/delete (CASCADE).
group_permission_table = Table(
    'group_permission',
    metadata,
    Column(
        'group_id',
        Integer,
        ForeignKey(
            'groups.group_id',
            onupdate="CASCADE",
            ondelete="CASCADE",
        ),
        primary_key=True,
    ),
    Column(
        'permission_id',
        Integer,
        ForeignKey(
            'permissions.permission_id',
            onupdate="CASCADE",
            ondelete="CASCADE",
        ),
        primary_key=True,
    ),
)
50
51
# Association table backing the many-to-many relationship between groups and
# their members - that is, the memberships. Both columns together form the
# primary key, and rows follow their parent rows on update/delete (CASCADE).
user_group_table = Table(
    'user_group',
    metadata,
    Column(
        'user_id',
        Integer,
        ForeignKey(
            'users.user_id',
            onupdate="CASCADE",
            ondelete="CASCADE",
        ),
        primary_key=True,
    ),
    Column(
        'group_id',
        Integer,
        ForeignKey(
            'groups.group_id',
            onupdate="CASCADE",
            ondelete="CASCADE",
        ),
        primary_key=True,
    ),
)
59
60
61
class Group(DeclarativeBase):
    """
    Group definition.

    A group gathers users (through ``user_group_table``) and is stored in the
    ``groups`` table. The ``TIM_*`` constants below list the well-known group
    ids and names; ``Profile`` builds its lookup tables from them.
    """

    # Well-known group ids.
    TIM_NOBODY = 0
    TIM_USER = 1
    TIM_MANAGER = 2
    TIM_ADMIN = 3

    # Well-known group names, in the same order as the ids above.
    TIM_NOBODY_GROUPNAME = 'nobody'
    TIM_USER_GROUPNAME = 'users'
    TIM_MANAGER_GROUPNAME = 'trusted-users'
    TIM_ADMIN_GROUPNAME = 'administrators'

    __tablename__ = 'groups'

    group_id = Column(
        Integer,
        Sequence('seq__groups__group_id'),
        autoincrement=True,
        primary_key=True,
    )
    group_name = Column(Unicode(16), unique=True, nullable=False)
    display_name = Column(Unicode(255))
    created = Column(DateTime, default=datetime.utcnow)

    # Many-to-many link to users; also exposes ``User.groups`` via the backref.
    users = relationship('User', secondary=user_group_table, backref='groups')

    def __repr__(self):
        """Return a debug representation showing the group name."""
        return '<Group: name={}>'.format(repr(self.group_name))

    def __unicode__(self):
        """Return the group name (Python 2 style text conversion hook)."""
        return self.group_name

    @classmethod
    def by_group_name(cls, group_name, dbsession):
        """
        Return the group object whose name is ``group_name``.

        :param group_name: value matched against the ``group_name`` column.
        :param dbsession: session object used to run the query.
        :return: the first matching group, or whatever ``first()`` yields when
            nothing matches.
        """
        matching_groups = dbsession.query(cls).filter_by(group_name=group_name)
        return matching_groups.first()
92
93
94
class Profile(object):
    """
    This model is the "max" group associated to a given user.

    A profile is identified by an integer id which is used as an index into
    ``_NAME`` to resolve the matching group name.
    """

    # Group names, indexed by profile id.
    _NAME = [
        Group.TIM_NOBODY_GROUPNAME,
        Group.TIM_USER_GROUPNAME,
        Group.TIM_MANAGER_GROUPNAME,
        Group.TIM_ADMIN_GROUPNAME,
    ]

    # Known profile ids, in the same order as ``_NAME``.
    _IDS = [
        Group.TIM_NOBODY,
        Group.TIM_USER,
        Group.TIM_MANAGER,
        Group.TIM_ADMIN,
    ]

    # TODO - G.M - 18-04-2018 [Cleanup] Drop this
    # _LABEL = [l_('Nobody'),
    #           l_('Users'),
    #           l_('Global managers'),
    #           l_('Administrators')]

    def __init__(self, profile_id):
        """
        Build the profile matching ``profile_id``.

        :param profile_id: integer index of the profile in ``_NAME``.
        :raises AssertionError: if ``profile_id`` is not an int (only when
            assertions are enabled).
        :raises IndexError: if ``profile_id`` is outside the known profiles.
        """
        # Kept as an assert so the exception type seen by existing callers
        # for a non-int id does not change.
        assert isinstance(profile_id, int)
        # A plain list lookup would accept negative ids and silently resolve
        # them from the end of the list (-1 would become the administrators
        # profile), so the range is checked explicitly.
        if not 0 <= profile_id < len(Profile._NAME):
            raise IndexError(
                'profile id out of range: %r' % (profile_id,)
            )
        self.id = profile_id
        self.name = Profile._NAME[profile_id]
        # TODO - G.M - 18-04-2018 [Cleanup] Drop this
        # self.label = Profile._LABEL[profile_id]
123
124
125
class User(DeclarativeBase):
    """
    User definition.

    This is the user definition used by :mod:`repoze.who`, which requires at
    least the ``email`` column.
    """

    __tablename__ = 'users'
    # INFO - G.M - 2018-10-24 - force table to use utf8 instead of
    # utf8mb4 for mysql only, in order to avoid the max key length issue with
    # long varchar in a utf8mb4 column. This issue is related to the email
    # field and its uniqueness. As far as we searched, there seems to be no
    # way to apply a mysql-specific collation (which is ignored by other
    # databases) only on the email field.
    __table_args__ = {
        'mysql_charset': 'utf8',
        'mysql_collate': 'utf8_general_ci'
    }

    user_id = Column(Integer, Sequence('seq__users__user_id'), autoincrement=True, primary_key=True)
    email = Column(Unicode(255), unique=True, nullable=False)
    display_name = Column(Unicode(255))
    _password = Column('password', Unicode(128))
    created = Column(DateTime, default=datetime.utcnow)
    is_active = Column(Boolean, default=True, nullable=False)
    is_deleted = Column(Boolean, default=False, nullable=False, server_default=sqlalchemy.sql.expression.literal(False))
    imported_from = Column(Unicode(32), nullable=True)
    # timezone as tz database format
    timezone = Column(Unicode(255), nullable=False, server_default='')
    # lang in iso639 format
    lang = Column(Unicode(3), nullable=True, default=None)
    # TODO - G.M - 04-04-2018 - [auth] Check if this is already needed
    # with new auth system
    # TODO - G.M - 2018-08-22 - Think about hash instead of direct token
    auth_token = Column(Unicode(255))
    auth_token_created = Column(DateTime)

    reset_password_token_hash = Column(Unicode(255), nullable=True, default=None)  # nopep8
    reset_password_token_created = Column(DateTime, nullable=True, default=None)

    @hybrid_property
    def email_address(self):
        """Alias of the ``email`` column."""
        return self.email

    def __repr__(self):
        """Return a debug representation showing email and display name."""
        return '<User: email=%s, display=%s>' % (
                repr(self.email), repr(self.display_name))

    def __unicode__(self):
        """Return the display name, falling back to the email."""
        return self.display_name or self.email

    @property
    def permissions(self):
        """Return a set with all permissions granted to the user."""
        return {
            permission
            for group in self.groups
            for permission in group.permissions
        }

    @property
    def profile(self) -> Profile:
        """
        Return the profile built from the highest group id of the user.

        A user without any group gets profile id 0.
        """
        profile_id = max((group.group_id for group in self.groups), default=0)
        return Profile(profile_id)

    # TODO - G-M - 20-04-2018 - [Calendar] Replace this in context model object
    # @property
    # def calendar_url(self) -> str:
    #     # TODO - 20160531 - Bastien: Cyclic import if import in top of file
    #     from tracim.lib.calendar import CalendarManager
    #     calendar_manager = CalendarManager(None)
    #
    #     return calendar_manager.get_user_calendar_url(self.user_id)

    @classmethod
    def by_email_address(cls, email, dbsession):
        """Return the user object whose email address is ``email``."""
        return dbsession.query(cls).filter_by(email=email).first()

    @classmethod
    def by_user_name(cls, username, dbsession):
        """
        Return the user object whose user name is ``username``.

        The lookup is done on the ``email`` column.
        """
        return dbsession.query(cls).filter_by(email=username).first()

    def _set_password(self, cleartext_password: str) -> None:
        """
        Set ciphertext password from cleartext password.

        Hash cleartext password on the fly,
        Store its ciphertext version,
        """
        self._password = self._hash(cleartext_password)

    def _get_password(self) -> str:
        """Return the hashed version of the password."""
        return self._password

    # ``password`` reads the stored hash and hashes on assignment.
    password = synonym('_password', descriptor=property(_get_password,
                                                        _set_password))

    def validate_password(self, cleartext_password: str) -> bool:
        """
        Check the password against existing credentials.

        :param cleartext_password: the password that was provided by the user
            to try and authenticate. This is the clear text version that we
            will need to match against the hashed one in the database.
        :type cleartext_password: unicode object.
        :return: Whether the password is valid.
        :rtype: bool

        """
        return self._validate_hash(self.password, cleartext_password)

    def get_display_name(self, remove_email_part: bool=False) -> str:
        """
        Get a name to display from corresponding member or email.

        :param remove_email_part: If True and display name based on email,
            remove @xxx.xxx part of email in returned value
        :return: display name based on user name or email.
        """
        if self.display_name:
            return self.display_name
        if remove_email_part:
            # partition() keeps the whole value when there is no '@',
            # where index() would raise ValueError.
            return self.email.partition('@')[0]
        return self.email

    def get_role(self, workspace: 'Workspace') -> int:
        """
        Return the role of the user in ``workspace``.

        :param workspace: workspace compared against each entry of
            ``self.roles`` (presumably a backref declared with
            ``UserRoleInWorkspace`` - confirm in models.data).
        :return: the ``role`` value of the first matching entry, or
            ``UserRoleInWorkspace.NOT_APPLICABLE`` when none matches.
        """
        # Imported here because the module-level import only happens under
        # TYPE_CHECKING, which left the name undefined at runtime.
        from tracim_backend.models.data import UserRoleInWorkspace

        for role in self.roles:
            if role.workspace == workspace:
                return role.role

        return UserRoleInWorkspace.NOT_APPLICABLE

    def get_active_roles(self) -> typing.List['UserRoleInWorkspace']:
        """
        :return: list of roles of the user for all not-deleted workspaces
        """
        return [role for role in self.roles if not role.workspace.is_deleted]

    # Tokens ###

    def reset_tokens(self):
        """Regenerate the auth token and disable the reset password token."""
        self._generate_auth_token()
        # disable reset_password token
        self.reset_password_token_hash = None
        self.reset_password_token_created = None

    # Reset Password Tokens #
    def generate_reset_password_token(self) -> str:
        """
        Create a new reset password token.

        Only the hash of the token and its creation date are stored on the
        user.

        :return: the cleartext token.
        """
        token, created, hashed_token = self._generate_token(create_hash=True)
        self.reset_password_token_created = created
        self.reset_password_token_hash = hashed_token
        return token

    def validate_reset_password_token(self, token, validity_seconds) -> bool:
        """
        Check ``token`` against the stored reset password token.

        :param token: cleartext token to check.
        :param validity_seconds: maximum age of the token, in seconds.
        :return: True when the token is valid.
        :raises UnvalidResetPasswordToken: if no creation date is stored or
            the token does not match the stored hash.
        :raises ExpiredResetPasswordToken: if the token is too old.
        """
        if not self.reset_password_token_created:
            raise UnvalidResetPasswordToken('reset password token is unvalid due to unknown creation date')  # nopep8
        if not self._validate_date(self.reset_password_token_created, validity_seconds):  # nopep8
            raise ExpiredResetPasswordToken('reset password token has expired')
        if not self._validate_hash(self.reset_password_token_hash, token):
            raise UnvalidResetPasswordToken('reset password token is unvalid')
        return True

    # Auth Token #
    # TODO - G.M - 04-04-2018 - [auth] Check if this is already needed
    # with new auth system

    def _generate_auth_token(self) -> str:
        """Store a fresh auth token with its creation date and return it."""
        self.auth_token, self.auth_token_created, _ = self._generate_token()
        return self.auth_token

    # TODO - G.M - 2018-08-23 - Should we store hash instead of direct stored
    # auth token ?
    def validate_auth_token(self, token, validity_seconds) -> bool:
        """
        Tell whether ``token`` is the current valid auth token.

        Calls :meth:`ensure_auth_token` first, so a missing or expired stored
        token is regenerated as a side effect.
        """
        return self.ensure_auth_token(validity_seconds) == token

    def ensure_auth_token(self, validity_seconds) -> str:
        """
        Create auth_token if None, regenerate auth_token if it is too old.

        :param validity_seconds: maximum age of the auth token, in seconds.
        :return: actual valid auth token
        """
        token_missing = not self.auth_token or not self.auth_token_created
        if token_missing or not self._validate_date(self.auth_token_created, validity_seconds):  # nopep8
            self._generate_auth_token()

        return self.auth_token

    # Utils functions #

    @classmethod
    def _hash(cls, cleartext_password_or_token: str) -> str:
        """
        Return a salted sha256 hash of the given text.

        The result is the 64 hex characters of a random salt followed by the
        64 hex characters of sha256(text + salt).
        """
        # NOTE(review): a single salted sha256 round is weak for password
        # storage; changing it would alter the stored format, so it is only
        # flagged here.
        salt = sha256(os.urandom(60)).hexdigest()
        # sha256 works on bytes, so the text is encoded before hashing.
        salted = (cleartext_password_or_token + salt).encode('utf-8')
        # hexdigest() already returns str, which is what the Unicode columns
        # expect; no extra decoding is needed.
        return salt + sha256(salted).hexdigest()

    @classmethod
    def _validate_hash(cls, hashed: str, cleartext_password_or_token: str) -> bool:  # nopep8
        """
        Check a cleartext value against a hash produced by :meth:`_hash`.

        :param hashed: stored value (salt followed by digest); a falsy value
            never validates.
        :param cleartext_password_or_token: cleartext value to check.
        :return: True when the cleartext matches the stored hash.
        """
        # Local import: hmac is not imported at module level.
        import hmac

        if not hashed:
            return False
        # The first 64 characters are the salt, the rest is the digest.
        salt, expected_digest = hashed[:64], hashed[64:]
        salted = (cleartext_password_or_token + salt).encode('utf-8')
        computed_digest = sha256(salted).hexdigest()
        # Constant-time comparison; bytes are used so that unexpected
        # non-ASCII content in the stored value cannot raise.
        return hmac.compare_digest(
            expected_digest.encode('utf-8'),
            computed_digest.encode('utf-8'),
        )

    @classmethod
    def _generate_token(cls, create_hash=False) -> typing.Tuple[str, datetime, typing.Optional[str]]:  # nopep8
        """
        Generate a random token.

        :param create_hash: if True, also compute the hash of the token.
        :return: tuple (token, creation datetime from ``datetime.utcnow()``,
            hash of the token or None).
        """
        token = str(uuid.uuid4())
        creation_datetime = datetime.utcnow()
        hashed_token = cls._hash(token) if create_hash else None
        return token, creation_datetime, hashed_token

    @classmethod
    def _validate_date(cls, date: datetime, validity_seconds: int) -> bool:
        """
        Tell whether ``date`` is at most ``validity_seconds`` old.

        :param date: creation date; expected to be a naive UTC datetime, as
            produced by :meth:`_generate_token`. A falsy value never
            validates.
        :param validity_seconds: maximum allowed age, in seconds.
        :return: True when the date is still within the validity period.
        """
        if not date:
            return False
        # Plain datetime arithmetic: going through time.mktime() would
        # interpret the UTC values as local time and could be off around
        # daylight saving changes.
        elapsed_seconds = (datetime.utcnow() - date).total_seconds()
        return elapsed_seconds <= validity_seconds
377
378
379
class Permission(DeclarativeBase):
    """
    Permission definition.

    Only the ``permission_name`` column is required.

    """

    __tablename__ = 'permissions'

    permission_id = Column(
        Integer,
        Sequence('seq__permissions__permission_id'),
        autoincrement=True,
        primary_key=True
    )
    permission_name = Column(Unicode(63), unique=True, nullable=False)
    description = Column(Unicode(255))

    # Many-to-many link to groups; also exposes ``Group.permissions`` via the
    # backref. ``relationship`` is used, as in ``Group.users``, instead of
    # its legacy ``relation`` synonym.
    groups = relationship(Group, secondary=group_permission_table,
                          backref='permissions')

    def __repr__(self):
        """Return a debug representation showing the permission name."""
        return '<Permission: name=%s>' % repr(self.permission_name)

    def __unicode__(self):
        """Return the permission name (Python 2 style text conversion hook)."""
        return self.permission_name
406