backend.tracim_backend.lib.core.user   F
last analyzed

Complexity

Total Complexity 102

Size/Duplication

Total Lines 588
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 102
eloc 389
dl 0
loc 588
rs 2
c 0
b 0
f 0

32 Methods

Rating   Name   Duplication   Size   Complexity  
A UserApi.user_with_email_exists() 0 7 2
A UserApi.get_one_by_public_name() 0 9 2
A UserApi.get_all() 0 2 1
A UserApi.get_one_by_email() 0 11 2
B UserApi.find() 0 33 7
A UserApi._base_query() 0 7 3
A UserApi.__init__() 0 13 1
A UserApi._check_email() 0 14 3
A UserApi.get_user_with_context() 0 10 1
A UserApi._check_email_correctness() 0 13 3
A UserApi.get_one() 0 9 2
A UserApi.get_current_user() 0 7 2
A UserApi.set_password_reset_token() 0 21 3
A UserApi.get_one_by_id() 0 2 1
B UserApi.get_known_user() 0 44 6
A UserApi.check_email_already_in_db() 0 5 1
A UserApi.authenticate_user() 0 18 4
A UserApi.set_email() 0 28 3
A UserApi._get_all_query() 0 2 1
B UserApi.set_password() 0 37 5
F UserApi.update() 0 45 15
A UserApi.undelete() 0 4 2
A UserApi.disable() 0 9 4
A UserApi.enable() 0 4 2
A UserApi.validate_reset_password_token() 0 4 1
A UserApi.delete() 0 8 4
A UserApi.create_minimal_user() 0 28 4
A UserApi.allowed_to_invite_new_user() 0 15 4
A UserApi.execute_created_user_actions() 0 20 1
A UserApi.reset_password_notification() 0 18 4
B UserApi.create_user() 0 51 7
A UserApi.save() 0 2 1

How to fix   Complexity   

Complexity

Complex classes like backend.tracim_backend.lib.core.user often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
import typing as typing
3
from smtplib import SMTPException
4
from smtplib import SMTPRecipientsRefused
5
6
import transaction
7
from sqlalchemy import func
8
from sqlalchemy import or_
9
from sqlalchemy.orm import Query
10
from sqlalchemy.orm import Session
11
from sqlalchemy.orm.exc import NoResultFound
12
13
from tracim_backend.config import CFG
14
from tracim_backend.exceptions import AuthenticationFailed
15
from tracim_backend.exceptions import EmailAlreadyExistInDb
16
from tracim_backend.exceptions import EmailValidationFailed
17
from tracim_backend.exceptions import \
18
    NotificationDisabledCantCreateUserWithInvitation
19
from tracim_backend.exceptions import NotificationDisabledCantResetPassword
20
from tracim_backend.exceptions import NotificationSendingFailed
21
from tracim_backend.exceptions import NoUserSetted
22
from tracim_backend.exceptions import PasswordDoNotMatch
23
from tracim_backend.exceptions import TooShortAutocompleteString
24
from tracim_backend.exceptions import UnvalidResetPasswordToken
25
from tracim_backend.exceptions import UserAuthenticatedIsNotActive
26
from tracim_backend.exceptions import UserCantChangeIsOwnProfile
27
from tracim_backend.exceptions import UserCantDeleteHimself
28
from tracim_backend.exceptions import UserCantDisableHimself
29
from tracim_backend.exceptions import UserDoesNotExist
30
from tracim_backend.exceptions import WrongUserPassword
31
from tracim_backend.lib.core.group import GroupApi
32
from tracim_backend.lib.mail_notifier.notifier import get_email_manager
33
from tracim_backend.lib.utils.logger import logger
34
from tracim_backend.models.auth import Group
35
from tracim_backend.models.auth import User
36
from tracim_backend.models.context_models import TypeUser
37
from tracim_backend.models.context_models import UserInContext
38
from tracim_backend.models.data import UserRoleInWorkspace
39
40
41
class UserApi(object):
42
43
    def __init__(
            self,
            current_user: typing.Optional[User],
            session: Session,
            config: CFG,
            show_deleted: bool = False,
            show_deactivated: bool = True,
    ) -> None:
        """
        Create a user api bound to a session and a configuration.

        :param current_user: user on whose behalf the api is used, may be
        None
        :param session: session used for all queries
        :param config: application configuration
        :param show_deleted: when False, _base_query() filters out users
        flagged as deleted
        :param show_deactivated: when False, _base_query() filters out users
        that are not active
        """
        # Visibility flags read by _base_query()
        self._show_deleted = show_deleted
        self._show_deactivated = show_deactivated
        # Collaborators
        self._config = config
        self._user = current_user
        self._session = session
56
57
    def _base_query(self):
        """
        Build the User query shared by the getters, applying the
        show_deleted / show_deactivated settings of this api.
        """
        hide_deleted = not self._show_deleted
        hide_deactivated = not self._show_deactivated

        users = self._session.query(User)
        if hide_deleted:
            # "== False" is wanted here: it builds the SQL expression
            users = users.filter(User.is_deleted == False)
        if hide_deactivated:
            users = users.filter(User.is_active == True)
        return users
64
65
    def get_user_with_context(self, user: User) -> UserInContext:
        """
        Wrap a User into a UserInContext built with the session and the
        configuration of this api.

        :param user: user to wrap
        :return: UserInContext for the given user
        """
        return UserInContext(
            user=user,
            dbsession=self._session,
            config=self._config,
        )
75
76
    # Getters
77
78
    def get_one(self, user_id: int) -> User:
        """
        Fetch one user from its id, through the base query.

        :param user_id: id of the wanted user
        :return: the matching user
        :raises UserDoesNotExist: if no user is found
        """
        matching_users = self._base_query().filter(User.user_id == user_id)
        try:
            return matching_users.one()
        except NoResultFound as exc:
            message = 'User "{}" not found in database'.format(user_id)
            raise UserDoesNotExist(message) from exc
87
88
    def get_one_by_email(self, email: str) -> User:
        """
        Fetch one user from its email, through the base query.

        :param email: email of the wanted user
        :return: the matching user
        :raises UserDoesNotExist: if no user is found
        """
        matching_users = self._base_query().filter(User.email == email)
        try:
            return matching_users.one()
        except NoResultFound as exc:
            message = 'User "{}" not found in database'.format(email)
            raise UserDoesNotExist(message) from exc
99
100
    def get_one_by_public_name(self, public_name: str) -> User:
        """
        Fetch one user from its public name (display_name column), through
        the base query.

        :param public_name: display name of the wanted user
        :return: the matching user
        :raises UserDoesNotExist: if no user is found
        """
        matching_users = self._base_query().filter(
            User.display_name == public_name
        )
        try:
            return matching_users.one()
        except NoResultFound as exc:
            message = 'User "{}" not found in database'.format(public_name)
            raise UserDoesNotExist(message) from exc
109
    # FIXME - G.M - 24-04-2018 - Duplicate method with get_one.
110
111
    def get_one_by_id(self, id: int) -> User:
        """
        Alias of get_one(): fetch one user from its id.

        :param id: id of the wanted user
        :return: the matching user
        """
        found_user = self.get_one(user_id=id)
        return found_user
113
114
    def get_current_user(self) -> User:
        """
        Return the user this api has been created for.

        :return: the current user
        :raises UserDoesNotExist: if no current user is set
        """
        current = self._user
        if current:
            return current
        raise UserDoesNotExist('There is no current user')
121
122
    def _get_all_query(self) -> Query:
        """
        Build a query on all User rows, ordered by lowercased display name.

        This query starts from the session directly, not from
        _base_query().
        """
        users = self._session.query(User)
        return users.order_by(func.lower(User.display_name))
124
125
    def get_all(self) -> typing.Iterable[User]:
        """
        Return all users given by _get_all_query().
        """
        all_users_query = self._get_all_query()
        return all_users_query.all()
127
128
    def get_known_user(
            self,
            acp: str,
            exclude_user_ids: typing.Optional[typing.List[int]] = None,
            exclude_workspace_ids: typing.Optional[typing.List[int]] = None,
    ) -> typing.Iterable[User]:
        """
        Return list of users known by current UserApi user.

        If a current user is set and its profile id is lower than or equal
        to Group.TIM_USER, only users sharing a workspace with it are
        returned.

        :param acp: autocomplete filter by name/email, at least 2 characters
        :param exclude_user_ids: user ids to exclude from result; the given
        list is not modified
        :param exclude_workspace_ids: members of these workspaces are
        excluded from result
        :return: List of found users
        :raises TooShortAutocompleteString: if acp has less than 2 characters
        """
        if len(acp) < 2:
            raise TooShortAutocompleteString(
                '"{acp}" is a too short string, acp string need to have more than one character'.format(acp=acp)  # nopep8
            )
        # INFO - work on a copy: this list is extended below and the list
        # given by the caller must stay untouched.
        exclude_user_ids = list(exclude_user_ids or [])
        if exclude_workspace_ids:
            user_ids_in_workspaces_tuples = self._session\
                .query(UserRoleInWorkspace.user_id)\
                .distinct(UserRoleInWorkspace.user_id) \
                .filter(UserRoleInWorkspace.workspace_id.in_(exclude_workspace_ids))\
                .all()
            # each row is a 1-tuple holding the user id
            exclude_user_ids.extend(
                item[0] for item in user_ids_in_workspaces_tuples
            )
        acp_pattern = '%{}%'.format(acp)
        query = self._base_query().order_by(User.display_name)
        query = query.filter(or_(User.display_name.ilike(acp_pattern), User.email.ilike(acp_pattern)))  # nopep8
        # INFO - G.M - 2018-07-27 - if user is set and is simple user, we
        # should show only user in same workspace as user
        if self._user and self._user.profile.id <= Group.TIM_USER:
            user_workspaces_id_query = self._session.\
                query(UserRoleInWorkspace.workspace_id).\
                distinct(UserRoleInWorkspace.workspace_id).\
                filter(UserRoleInWorkspace.user_id == self._user.user_id)
            users_in_workspaces = self._session.\
                query(UserRoleInWorkspace.user_id).\
                distinct(UserRoleInWorkspace.user_id).\
                filter(UserRoleInWorkspace.workspace_id.in_(user_workspaces_id_query.subquery())).subquery()  # nopep8
            query = query.filter(User.user_id.in_(users_in_workspaces))
        if exclude_user_ids:
            query = query.filter(~User.user_id.in_(exclude_user_ids))
        return query.all()
172
173
    def find(
            self,
            user_id: int=None,
            email: str=None,
            public_name: str=None
    ) -> typing.Tuple[TypeUser, User]:
        """
        Look for an existing user with the given params.

        Lookups are tried in this order: user_id, email, public_name. Falsy
        params are skipped and a failed lookup falls through to the next
        one.

        :return: tuple (kind of param which matched, user found)
        :raises UserDoesNotExist: if no lookup gives a user
        """
        lookups = (
            (user_id, TypeUser.USER_ID, self.get_one),
            (email, TypeUser.EMAIL, self.get_one_by_email),
            (public_name, TypeUser.PUBLIC_NAME, self.get_one_by_public_name),
        )
        for value, matched_type, getter in lookups:
            if not value:
                continue
            try:
                return matched_type, getter(value)
            except UserDoesNotExist:
                # try next criterion
                continue

        raise UserDoesNotExist('User not found with any of given params.')
206
207
    # Check methods
208
209
    def user_with_email_exists(self, email: str) -> bool:
        """
        Tell if a user with the given email can be fetched by this api.

        The lookup is done with get_one_by_email(). Only UserDoesNotExist
        means "no such user"; any other error is propagated instead of
        being reported as a missing user.

        :param email: email to look for
        :return: True if a user is found, False otherwise
        """
        try:
            self.get_one_by_email(email)
        except UserDoesNotExist:
            return False
        return True
216
217
    def authenticate_user(self, email: str, password: str) -> User:
        """
        Check email/password of a user.

        :param email: email of the user
        :param password: cleartext password of the user
        :return: the authenticated user
        :raises AuthenticationFailed: if user is not found or password is
        wrong
        :raises UserAuthenticatedIsNotActive: if user is found but is not
        active
        """
        try:
            candidate = self.get_one_by_email(email)
            if not candidate.is_active:
                raise UserAuthenticatedIsNotActive('User "{}" is not active'.format(email))
            if not candidate.validate_password(password):
                raise WrongUserPassword('User "{}" password is incorrect'.format(email))  # nopep8
            return candidate
        except (WrongUserPassword, UserDoesNotExist) as exc:
            raise AuthenticationFailed('User "{}" authentication failed'.format(email)) from exc  # nopep8
235
236
    # Actions
237
    def set_password(
            self,
            user: User,
            loggedin_user_password: str,
            new_password: str,
            new_password2: str,
            do_save: bool=True
    ):
        """
        Change password of a user, once the password of the current user
        has been checked and both new passwords are equal.

        :param user: User whose password is changed
        :param loggedin_user_password: cleartext password of the current
        user of this api (not necessarily the same as user)
        :param new_password: new password for user
        :param new_password2: confirmation, must be equal to new_password
        :param do_save: should we save new user password ?
        :return: the updated user
        :raises NoUserSetted: if this api has no current user
        :raises WrongUserPassword: if loggedin_user_password is wrong
        :raises PasswordDoNotMatch: if new passwords are different
        """
        logged_user = self._user
        if not logged_user:
            raise NoUserSetted('Current User should be set in UserApi to use this method')  # nopep8
        password_ok = logged_user.validate_password(loggedin_user_password)
        if not password_ok:
            raise WrongUserPassword(
                'Wrong password for authenticated user {}'. format(self._user.user_id)  # nopep8
            )
        if new_password != new_password2:
            raise PasswordDoNotMatch('Passwords given are different')

        self.update(user=user, password=new_password, do_save=do_save)
        if do_save:
            # TODO - G.M - 2018-07-24 - Check why commit is needed here
            self.save(user)
        return user
274
275
    def set_email(
            self,
            user: User,
            loggedin_user_password: str,
            email: str,
            do_save: bool = True
    ):
        """
        Change email of a user, once the password of the current user has
        been checked.

        :param user: User whose email is changed
        :param loggedin_user_password: cleartext password of the current
        user of this api (not necessarily the same as user)
        :param email: new email, checked by update()
        :param do_save: should we save the change ?
        :return: the updated user
        :raises NoUserSetted: if this api has no current user
        :raises WrongUserPassword: if loggedin_user_password is wrong
        """
        logged_user = self._user
        if not logged_user:
            raise NoUserSetted('Current User should be set in UserApi to use this method')  # nopep8
        password_ok = logged_user.validate_password(loggedin_user_password)
        if not password_ok:
            raise WrongUserPassword(
                'Wrong password for authenticated user {}'. format(self._user.user_id)  # nopep8
            )
        self.update(user=user, email=email, do_save=do_save)
        return user
303
304
    def set_password_reset_token(
            self,
            user: User,
            new_password: str,
            new_password2: str,
            reset_token: str,
            do_save: bool = False,
    ):
        """
        Change password of a user with a reset password token, then reset
        the tokens of the user.

        :param user: User whose password is changed
        :param new_password: new password for user
        :param new_password2: confirmation, must be equal to new_password
        :param reset_token: token checked with validate_reset_password_token
        :param do_save: should we save the change ?
        :return: the updated user
        :raises PasswordDoNotMatch: if new passwords are different
        """
        # NOTE(review): the returned value is not used here, the token check
        # presumably raises when token is not valid - to confirm in User.
        self.validate_reset_password_token(user, reset_token)
        if new_password != new_password2:
            raise PasswordDoNotMatch('Passwords given are different')

        self.update(user=user, password=new_password, do_save=do_save)
        user.reset_tokens()
        if do_save:
            self.save(user)
        return user
325
326
    def _check_email(self, email: str) -> bool:
327
        """
328
        Check if email is completely ok to be used in user db table
329
        """
330
        is_email_correct = self._check_email_correctness(email)
331
        if not is_email_correct:
332
            raise EmailValidationFailed(
333
                'Email given form {} is uncorrect'.format(email))  # nopep8
334
        email_already_exist_in_db = self.check_email_already_in_db(email)
335
        if email_already_exist_in_db:
336
            raise EmailAlreadyExistInDb(
337
                'Email given {} already exist, please choose something else'.format(email)  # nopep8
338
            )
339
        return True
340
341
    def check_email_already_in_db(self, email: str) -> bool:
        """
        Tell if given email is already used in user table.

        The query is done on the session directly, not on _base_query().

        :param email: email to look for
        :return: True if at least one row has this email
        """
        same_email_rows = self._session.query(User.email).filter(User.email == email)  # nopep8
        return same_email_rows.count() != 0
346
347
    def _check_email_correctness(self, email: str) -> bool:
348
        """
349
           Verify if given email is correct:
350
           - check format
351
           - futur active check for email ? (dns based ?)
352
           """
353
        # TODO - G.M - 2018-07-05 - find a better way to check email
354
        if not email:
355
            return False
356
        email = email.split('@')
357
        if len(email) != 2:
358
            return False
359
        return True
360
361
    def update(
            self,
            user: User,
            name: str=None,
            email: str=None,
            password: str=None,
            timezone: str=None,
            lang: str=None,
            groups: typing.Optional[typing.List[Group]]=None,
            do_save=True,
    ) -> User:
        """
        Update fields of a user. Params left to None are not modified.

        :param user: user to update
        :param name: new display name
        :param email: new email, checked with _check_email() only if it is
        different from the current one
        :param password: new password
        :param timezone: new timezone
        :param lang: new lang
        :param groups: new groups of the user: groups not in this list are
        removed, missing ones are added
        :param do_save: should we save the change ?
        :return: the updated user
        :raises UserCantChangeIsOwnProfile: if groups is given and user is
        the current user of this api
        """
        if name is not None:
            user.display_name = name

        if email is not None and email != user.email:
            self._check_email(email)
            user.email = email

        if password is not None:
            user.password = password

        if timezone is not None:
            user.timezone = timezone

        if lang is not None:
            user.lang = lang

        if groups is not None:
            if self._user and self._user == user:
                raise UserCantChangeIsOwnProfile(
                    "User {} can't change is own profile".format(user.user_id)
                )
            # INFO - G.M - 2018-07-18 - Delete old groups
            # Loop on a copy: removing items from user.groups while
            # iterating on it skips the item following each removal.
            for group in list(user.groups):
                if group not in groups:
                    user.groups.remove(group)
            # INFO - G.M - 2018-07-18 - add new groups
            for group in groups:
                if group not in user.groups:
                    user.groups.append(group)

        if do_save:
            self.save(user)

        return user
406
407
    def create_user(
        self,
        email,
        password: str = None,
        name: str = None,
        timezone: str = '',
        lang: str= None,
        groups=None,
        do_save: bool=True,
        do_notify: bool=True,
    ) -> User:
        """
        Create a user and, if asked, send the "created account"
        notification.

        :param email: email of the new user
        :param password: password of the new user, also given to the
        notification
        :param name: display name of the new user
        :param timezone: timezone of the new user
        :param lang: lang of the new user
        :param groups: groups of the new user; when empty or None,
        create_minimal_user() choose the groups
        :param do_save: should we save the new user ?
        :param do_notify: should we send the notification mail ?
        :return: the new user
        :raises NotificationDisabledCantCreateUserWithInvitation: if
        do_notify is set but email notifications are disabled in config
        :raises NotificationSendingFailed: on SMTP error other than refused
        recipients
        """
        if do_notify and not self._config.EMAIL_NOTIFICATION_ACTIVATED:
            raise NotificationDisabledCantCreateUserWithInvitation(
                "Can't create user with invitation mail because "
                "notification are disabled."
            )
        new_user = self.create_minimal_user(email, groups, save_now=False)
        self.update(
            user=new_user,
            name=name,
            email=email,
            password=password,
            timezone=timezone,
            lang=lang,
            do_save=False,
        )
        if do_notify:
            try:
                email_manager = get_email_manager(self._config, self._session)
                email_manager.notify_created_account(
                    new_user,
                    password=password
                )
            # FIXME - G.M - 2018-11-02 - hack: accept bad recipient user creation
            # this should be fixed to find a solution to allow "fake" email but
            # also have clear error case for valid mail.
            # SMTPRecipientsRefused must stay before SMTPException, which is
            # its parent class.
            except SMTPRecipientsRefused:
                logger.warning(
                    self,
                    "Account created for {email} but SMTP server refuse to send notification".format(  # nopep8
                        email=email
                    )
                )
            except SMTPException as exc:
                raise NotificationSendingFailed(
                    "Notification for new created account can't be send "
                    "(SMTP error), new account creation aborted"
                ) from exc
        if do_save:
            self.save(new_user)
        return new_user
458
459
    def create_minimal_user(
            self,
            email,
            groups=None,
            save_now=False
    ) -> User:
        """
        Create a user with only email, display name and groups set, and add
        it to the session.

        Display name is the part of the email before the first '@'.

        :param email: email of the new user, checked with _check_email()
        :param groups: groups of the new user; when empty or None, the group
        fetched with Group.TIM_USER is used
        :param save_now: should we flush the session now ?
        :return: the new user
        """
        self._check_email(email)
        user = User()
        user.email = email
        user.display_name = email.split('@')[0]

        if not groups:
            gapi = GroupApi(
                current_user=self._user,  # User
                session=self._session,
                config=self._config,
            )
            groups = [gapi.get_one(Group.TIM_USER)]
        for group in groups:
            user.groups.append(group)

        self._session.add(user)

        if save_now:
            self._session.flush()

        return user
487
488
    def reset_password_notification(self, user: User, do_save: bool=False) -> str:  # nopep8
        """
        Generate a reset password token for a user and send it by mail.

        :param user: User who wants to reset its password
        :param do_save: save update ?
        :return: reset_password_token
        :raises NotificationDisabledCantResetPassword: if email
        notifications are disabled in config
        :raises NotificationSendingFailed: on SMTP error
        """
        notifications_enabled = self._config.EMAIL_NOTIFICATION_ACTIVATED
        if not notifications_enabled:
            raise NotificationDisabledCantResetPassword("cant reset password with notification disabled")  # nopep8

        reset_token = user.generate_reset_password_token()
        try:
            mailer = get_email_manager(self._config, self._session)
            mailer.notify_reset_password(user, reset_token)
        except SMTPException as exc:
            raise NotificationSendingFailed("SMTP error, can't send notification") from exc

        if do_save:
            self.save(user)
        return reset_token
506
507
    def validate_reset_password_token(self, user: User, token: str) -> bool:
        """
        Check a reset password token for a user, with the validity duration
        given by config (USER_RESET_PASSWORD_TOKEN_VALIDITY).

        :param user: user who owns the token
        :param token: token to check
        :return: result of User.validate_reset_password_token
        """
        validity = self._config.USER_RESET_PASSWORD_TOKEN_VALIDITY
        return user.validate_reset_password_token(
            token=token,
            validity_seconds=validity,
        )
512
513
    def enable(self, user: User, do_save=False):
        """
        Set a user as active.

        :param user: user to enable
        :param do_save: should we save the change ?
        """
        user.is_active = True
        if not do_save:
            return
        self.save(user)
517
518
    def disable(self, user: User, do_save=False):
        """
        Set a user as not active.

        :param user: user to disable
        :param do_save: should we save the change ?
        :raises UserCantDisableHimself: if user is the current user of this
        api
        """
        current = self._user
        if current and current == user:
            message = "User {} can't disable himself".format(user.user_id)
            raise UserCantDisableHimself(message)

        user.is_active = False
        if not do_save:
            return
        self.save(user)
527
528
    def delete(self, user: User, do_save=False):
        """
        Flag a user as deleted (the row itself is kept).

        :param user: user to delete
        :param do_save: should we save the change ?
        :raises UserCantDeleteHimself: if user is the current user of this
        api
        """
        current = self._user
        if current and current == user:
            message = "User {} can't delete himself".format(user.user_id)
            raise UserCantDeleteHimself(message)

        user.is_deleted = True
        if not do_save:
            return
        self.save(user)
536
537
    def undelete(self, user: User, do_save=False):
        """
        Remove the deleted flag of a user.

        :param user: user to undelete
        :param do_save: should we save the change ?
        """
        user.is_deleted = False
        if not do_save:
            return
        self.save(user)
541
542
    def save(self, user: User):
        """
        Flush the session of this api.

        :param user: not used here: the whole session is flushed
        """
        session = self._session
        session.flush()
544
545
    def execute_created_user_actions(self, created_user: User) -> None:
        """
        Actions to run when a user has just been created: ensure it has an
        auth token, then flush the session and commit the transaction.

        :param created_user: the user just created
        :return: None
        """
        # TODO - G.M - 28-03-2018 - [Calendar] Reenable Calendar stuff
        # Calendar code is disabled for now: CalendarManager and UserCalendar
        # imports (cyclic import) before this point, and the
        # create_then_remove_fake_event() call for the created user after
        # the commit.

        # TODO - G.M - 04-04-2018 - [auth]
        # Check if this is already needed with
        # new auth system
        token_validity = self._config.USER_AUTH_TOKEN_VALIDITY
        created_user.ensure_auth_token(validity_seconds=token_validity)

        # Ensure database is up-to-date
        self._session.flush()
        transaction.commit()
572
573
    def allowed_to_invite_new_user(self, email: str) -> bool:
        """
        Tell if current user of this api can invite a new user with the
        given email.

        :param email: email of the user to invite
        :return: False if email is empty, if email notifications are
        disabled in config, if there is no current user or if profile of
        current user is lower than the group named by config
        INVITE_NEW_USER_MINIMAL_PROFILE; True otherwise
        """
        # INFO - G.M - 2018-10-25 - disallow account creation if no
        # email provided or email_notification disabled.
        if not email:
            return False
        if not self._config.EMAIL_NOTIFICATION_ACTIVATED:
            return False
        # Without current user there is no profile to compare: nobody is
        # allowed to invite.
        if not self._user:
            return False
        # INFO - G.M - 2018-10-25 - do not allow all profile to invite new user
        # Keyword arguments, as in create_minimal_user(), so the call does
        # not depend on the order of GroupApi parameters.
        gapi = GroupApi(
            current_user=self._user,
            session=self._session,
            config=self._config,
        )
        invite_minimal_profile = gapi.get_one_with_name(group_name=self._config.INVITE_NEW_USER_MINIMAL_PROFILE)  # nopep8

        if not self._user.profile.id >= invite_minimal_profile.group_id:
            return False

        return True
588