UserApi.update()   F
last analyzed

Complexity

Conditions 15

Size

Total Lines 45
Code Lines 34

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 34
dl 0
loc 45
rs 2.9998
c 0
b 0
f 0
cc 15
nop 9

How to fix   Complexity    Many Parameters   

Complexity

Complex methods like backend.tracim_backend.lib.core.user.UserApi.update() often do a lot of different things. To break such a method down, we need to identify a cohesive component within its class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more or different data.

There are several approaches to avoid long parameter lists:

1
# -*- coding: utf-8 -*-
2
import typing as typing
3
from smtplib import SMTPException
4
from smtplib import SMTPRecipientsRefused
5
6
import transaction
7
from sqlalchemy import func
8
from sqlalchemy import or_
9
from sqlalchemy.orm import Query
10
from sqlalchemy.orm import Session
11
from sqlalchemy.orm.exc import NoResultFound
12
13
from tracim_backend.config import CFG
14
from tracim_backend.exceptions import AuthenticationFailed
15
from tracim_backend.exceptions import EmailAlreadyExistInDb
16
from tracim_backend.exceptions import EmailValidationFailed
17
from tracim_backend.exceptions import \
18
    NotificationDisabledCantCreateUserWithInvitation
19
from tracim_backend.exceptions import NotificationDisabledCantResetPassword
20
from tracim_backend.exceptions import NotificationSendingFailed
21
from tracim_backend.exceptions import NoUserSetted
22
from tracim_backend.exceptions import PasswordDoNotMatch
23
from tracim_backend.exceptions import TooShortAutocompleteString
24
from tracim_backend.exceptions import UnvalidResetPasswordToken
25
from tracim_backend.exceptions import UserAuthenticatedIsNotActive
26
from tracim_backend.exceptions import UserCantChangeIsOwnProfile
27
from tracim_backend.exceptions import UserCantDeleteHimself
28
from tracim_backend.exceptions import UserCantDisableHimself
29
from tracim_backend.exceptions import UserDoesNotExist
30
from tracim_backend.exceptions import WrongUserPassword
31
from tracim_backend.lib.core.group import GroupApi
32
from tracim_backend.lib.mail_notifier.notifier import get_email_manager
33
from tracim_backend.lib.utils.logger import logger
34
from tracim_backend.models.auth import Group
35
from tracim_backend.models.auth import User
36
from tracim_backend.models.context_models import TypeUser
37
from tracim_backend.models.context_models import UserInContext
38
from tracim_backend.models.data import UserRoleInWorkspace
39
40
41
class UserApi(object):
42
43
    def __init__(
44
            self,
45
            current_user: typing.Optional[User],
46
            session: Session,
47
            config: CFG,
48
            show_deleted: bool = False,
49
            show_deactivated: bool = True,
50
    ) -> None:
51
        self._session = session
52
        self._user = current_user
53
        self._config = config
54
        self._show_deleted = show_deleted
55
        self._show_deactivated = show_deactivated
56
57
    def _base_query(self):
        """
        Build the User query used as starting point by the lookups of this
        api. Deleted users are filtered out unless show_deleted is set, and
        inactive users are filtered out when show_deactivated is unset.
        :return: query on User with the visibility filters applied
        """
        users = self._session.query(User)
        # NOTE: "== False" / "== True" build SQL column expressions here,
        # they must not be turned into "is False" / "is True".
        if not self._show_deleted:
            users = users.filter(User.is_deleted == False)
        if not self._show_deactivated:
            users = users.filter(User.is_active == True)
        return users
64
65
    def get_user_with_context(self, user: User) -> UserInContext:
        """
        Wrap a User into a UserInContext built with the session and the
        config of this api.
        :param user: user to wrap
        :return: UserInContext object for the given user
        """
        return UserInContext(
            user=user,
            dbsession=self._session,
            config=self._config,
        )
75
76
    # Getters
77
78
    def get_one(self, user_id: int) -> User:
        """
        Get one user by user id
        :param user_id: id of the wanted user
        :return: one user
        :raise UserDoesNotExist: if the base query finds no user with this id
        """
        matching = self._base_query().filter(User.user_id == user_id)
        try:
            return matching.one()
        except NoResultFound as exc:
            raise UserDoesNotExist('User "{}" not found in database'.format(user_id)) from exc  # nopep8
87
88
    def get_one_by_email(self, email: str) -> User:
        """
        Get one user by email
        :param email: Email of the user
        :return: one user
        :raise UserDoesNotExist: if the base query finds no user with this
        email
        """
        matching = self._base_query().filter(User.email == email)
        try:
            return matching.one()
        except NoResultFound as exc:
            raise UserDoesNotExist('User "{}" not found in database'.format(email)) from exc  # nopep8
99
100
    def get_one_by_public_name(self, public_name: str) -> User:
        """
        Get one user by public_name (matched against User.display_name)
        :param public_name: display name of the user
        :return: one user
        :raise UserDoesNotExist: if the base query finds no user with this
        display name
        """
        matching = self._base_query().filter(User.display_name == public_name)
        try:
            return matching.one()
        except NoResultFound as exc:
            raise UserDoesNotExist('User "{}" not found in database'.format(public_name)) from exc  # nopep8
109
    # FIXME - G.M - 24-04-2018 - Duplicate method with get_one.
110
111
    def get_one_by_id(self, id: int) -> User:
112
        return self.get_one(user_id=id)
113
114
    def get_current_user(self) -> User:
115
        """
116
        Get current_user
117
        """
118
        if not self._user:
119
            raise UserDoesNotExist('There is no current user')
120
        return self._user
121
122
    def _get_all_query(self) -> Query:
        """
        Build the query listing users ordered by lower-cased display name.
        The query is built directly on User, not on _base_query().
        """
        all_users = self._session.query(User)
        return all_users.order_by(func.lower(User.display_name))
124
125
    def get_all(self) -> typing.Iterable[User]:
126
        return self._get_all_query().all()
127
128
    def get_known_user(
            self,
            acp: str,
            exclude_user_ids: typing.Optional[typing.List[int]] = None,
            exclude_workspace_ids: typing.Optional[typing.List[int]] = None,
    ) -> typing.Iterable[User]:
        """
        Return list of know user by current UserApi user.
        :param acp: autocomplete filter by name/email
        :param exclude_user_ids: user id to exclude from result
        :param exclude_workspace_ids: workspace user to exclude from result
        :return: List of found users
        :raise TooShortAutocompleteString: if acp has less than 2 characters
        """
        if len(acp) < 2:
            raise TooShortAutocompleteString(
                '"{acp}" is a too short string, acp string need to have more than one character'.format(acp=acp)  # nopep8
            )
        exclude_workspace_ids = exclude_workspace_ids or []  # DFV
        # Work on a copy: this list is extended below and the list given by
        # the caller must not be modified as a side effect.
        exclude_user_ids = list(exclude_user_ids or [])  # DFV
        if exclude_workspace_ids:
            user_ids_in_workspaces_tuples = self._session\
                .query(UserRoleInWorkspace.user_id)\
                .distinct(UserRoleInWorkspace.user_id) \
                .filter(UserRoleInWorkspace.workspace_id.in_(exclude_workspace_ids))\
                .all()
            exclude_user_ids.extend(
                item[0] for item in user_ids_in_workspaces_tuples
            )
        # NOTE(review): acp is inserted as-is in the LIKE pattern, so "%" and
        # "_" typed by the user act as wildcards - confirm this is wanted.
        like_pattern = '%{}%'.format(acp)
        query = self._base_query().order_by(User.display_name)
        query = query.filter(or_(User.display_name.ilike(like_pattern), User.email.ilike(like_pattern)))  # nopep8
        # INFO - G.M - 2018-07-27 - if user is set and is simple user, we
        # should show only user in same workspace as user
        if self._user and self._user.profile.id <= Group.TIM_USER:
            user_workspaces_id_query = self._session.\
                query(UserRoleInWorkspace.workspace_id).\
                distinct(UserRoleInWorkspace.workspace_id).\
                filter(UserRoleInWorkspace.user_id == self._user.user_id)
            users_in_workspaces = self._session.\
                query(UserRoleInWorkspace.user_id).\
                distinct(UserRoleInWorkspace.user_id).\
                filter(UserRoleInWorkspace.workspace_id.in_(user_workspaces_id_query.subquery())).subquery()  # nopep8
            query = query.filter(User.user_id.in_(users_in_workspaces))
        if exclude_user_ids:
            query = query.filter(~User.user_id.in_(exclude_user_ids))
        return query.all()
172
173
    def find(
            self,
            user_id: int=None,
            email: str=None,
            public_name: str=None
    ) -> typing.Tuple[TypeUser, User]:
        """
        Find existing user from all theses params.
        Check is made in this order: user_id, email, public_name.
        Empty or missing params are skipped, the first param matching a user
        wins.
        :return: tuple (type of the param which matched, user found)
        :raise UserDoesNotExist: if no user is found with any given param
        """
        lookups = (
            (user_id, self.get_one, TypeUser.USER_ID),
            (email, self.get_one_by_email, TypeUser.EMAIL),
            (public_name, self.get_one_by_public_name, TypeUser.PUBLIC_NAME),
        )
        for value, getter, found_by in lookups:
            if not value:
                continue
            try:
                return found_by, getter(value)
            except UserDoesNotExist:
                # Not found with this param: try the next one.
                continue

        raise UserDoesNotExist('User not found with any of given params.')
206
207
    # Check methods
208
209
    def user_with_email_exists(self, email: str) -> bool:
210
        try:
211
            self.get_one_by_email(email)
212
            return True
213
        # TODO - G.M - 09-04-2018 - Better exception
214
        except:
215
            return False
216
217
    def authenticate_user(self, email: str, password: str) -> User:
218
        """
219
        Authenticate user with email and password, raise AuthenticationFailed
220
        if uncorrect.
221
        :param email: email of the user
222
        :param password: cleartext password of the user
223
        :return: User who was authenticated.
224
        """
225
        try:
226
            user = self.get_one_by_email(email)
227
            if not user.is_active:
228
                raise UserAuthenticatedIsNotActive('User "{}" is not active'.format(email))
229
            if user.validate_password(password):
230
                return user
231
            else:
232
                raise WrongUserPassword('User "{}" password is incorrect'.format(email))  # nopep8
233
        except (WrongUserPassword, UserDoesNotExist) as exc:
234
            raise AuthenticationFailed('User "{}" authentication failed'.format(email)) from exc  # nopep8
235
236
    # Actions
237
    def set_password(
238
            self,
239
            user: User,
240
            loggedin_user_password: str,
241
            new_password: str,
242
            new_password2: str,
243
            do_save: bool=True
244
    ):
245
        """
246
        Set User password if logged-in user password is correct
247
        and both new_password are the same.
248
        :param user: User who need password changed
249
        :param loggedin_user_password: cleartext password of logged user (not
250
        same as user)
251
        :param new_password: new password for user
252
        :param new_password2: should be same as new_password
253
        :param do_save: should we save new user password ?
254
        :return:
255
        """
256
        if not self._user:
257
            raise NoUserSetted('Current User should be set in UserApi to use this method')  # nopep8
258
        if not self._user.validate_password(loggedin_user_password):  # nopep8
259
            raise WrongUserPassword(
260
                'Wrong password for authenticated user {}'. format(self._user.user_id)  # nopep8
261
            )
262
        if new_password != new_password2:
263
            raise PasswordDoNotMatch('Passwords given are different')
264
265
        self.update(
266
            user=user,
267
            password=new_password,
268
            do_save=do_save,
269
        )
270
        if do_save:
271
            # TODO - G.M - 2018-07-24 - Check why commit is needed here
272
            self.save(user)
273
        return user
274
275
    def set_email(
276
            self,
277
            user: User,
278
            loggedin_user_password: str,
279
            email: str,
280
            do_save: bool = True
281
    ):
282
        """
283
        Set email address of user if loggedin user password is correct
284
        :param user: User who need email changed
285
        :param loggedin_user_password: cleartext password of logged user (not
286
        same as user)
287
        :param email:
288
        :param do_save:
289
        :return:
290
        """
291
        if not self._user:
292
            raise NoUserSetted('Current User should be set in UserApi to use this method')  # nopep8
293
        if not self._user.validate_password(loggedin_user_password):  # nopep8
294
            raise WrongUserPassword(
295
                'Wrong password for authenticated user {}'. format(self._user.user_id)  # nopep8
296
            )
297
        self.update(
298
            user=user,
299
            email=email,
300
            do_save=do_save,
301
        )
302
        return user
303
304
    def set_password_reset_token(
305
            self,
306
            user: User,
307
            new_password: str,
308
            new_password2: str,
309
            reset_token: str,
310
            do_save: bool = False,
311
    ):
312
        self.validate_reset_password_token(user, reset_token)
313
        if new_password != new_password2:
314
            raise PasswordDoNotMatch('Passwords given are different')
315
316
        self.update(
317
            user=user,
318
            password=new_password,
319
            do_save=do_save,
320
        )
321
        user.reset_tokens()
322
        if do_save:
323
            self.save(user)
324
        return user
325
326
    def _check_email(self, email: str) -> bool:
327
        """
328
        Check if email is completely ok to be used in user db table
329
        """
330
        is_email_correct = self._check_email_correctness(email)
331
        if not is_email_correct:
332
            raise EmailValidationFailed(
333
                'Email given form {} is uncorrect'.format(email))  # nopep8
334
        email_already_exist_in_db = self.check_email_already_in_db(email)
335
        if email_already_exist_in_db:
336
            raise EmailAlreadyExistInDb(
337
                'Email given {} already exist, please choose something else'.format(email)  # nopep8
338
            )
339
        return True
340
341
    def check_email_already_in_db(self, email: str) -> bool:
        """
        Tell if given email already exists in db.
        The count is made directly on User, not on _base_query().
        :param email: email to look for
        :return: True if at least one user has this email, False otherwise
        """
        same_email = self._session.query(User.email).filter(User.email==email)
        return same_email.count() != 0  # nopep8
346
347
    def _check_email_correctness(self, email: str) -> bool:
348
        """
349
           Verify if given email is correct:
350
           - check format
351
           - futur active check for email ? (dns based ?)
352
           """
353
        # TODO - G.M - 2018-07-05 - find a better way to check email
354
        if not email:
355
            return False
356
        email = email.split('@')
357
        if len(email) != 2:
358
            return False
359
        return True
360
361
    def update(
362
            self,
363
            user: User,
364
            name: str=None,
365
            email: str=None,
366
            password: str=None,
367
            timezone: str=None,
368
            lang: str=None,
369
            groups: typing.Optional[typing.List[Group]]=None,
370
            do_save=True,
371
    ) -> User:
372
        if name is not None:
373
            user.display_name = name
374
375
        if email is not None and email != user.email:
376
            self._check_email(email)
377
            user.email = email
378
379
        if password is not None:
380
            user.password = password
381
382
        if timezone is not None:
383
            user.timezone = timezone
384
385
        if lang is not None:
386
            user.lang = lang
387
388
        if groups is not None:
389
            if self._user and self._user == user:
390
                raise UserCantChangeIsOwnProfile(
391
                    "User {} can't change is own profile".format(user.user_id)
392
                )
393
            # INFO - G.M - 2018-07-18 - Delete old groups
394
            for group in user.groups:
395
                if group not in groups:
396
                    user.groups.remove(group)
397
            # INFO - G.M - 2018-07-18 - add new groups
398
            for group in groups:
399
                if group not in user.groups:
400
                    user.groups.append(group)
401
402
        if do_save:
403
            self.save(user)
404
405
        return user
406
407
    def create_user(
408
        self,
409
        email,
410
        password: str = None,
411
        name: str = None,
412
        timezone: str = '',
413
        lang: str= None,
414
        groups=[],
415
        do_save: bool=True,
416
        do_notify: bool=True,
417
    ) -> User:
418
        if do_notify and not self._config.EMAIL_NOTIFICATION_ACTIVATED:
419
            raise NotificationDisabledCantCreateUserWithInvitation(
420
                "Can't create user with invitation mail because "
421
                "notification are disabled."
422
            )
423
        new_user = self.create_minimal_user(email, groups, save_now=False)
424
        self.update(
425
            user=new_user,
426
            name=name,
427
            email=email,
428
            password=password,
429
            timezone=timezone,
430
            lang=lang,
431
            do_save=False,
432
        )
433
        if do_notify:
434
            try:
435
                email_manager = get_email_manager(self._config, self._session)
436
                email_manager.notify_created_account(
437
                    new_user,
438
                    password=password
439
                )
440
            # FIXME - G.M - 2018-11-02 - hack: accept bad recipient user creation
441
            # this should be fixed to find a solution to allow "fake" email but
442
            # also have clear error case for valid mail.
443
            except SMTPRecipientsRefused as exc:
444
                logger.warning(
445
                    self,
446
                    "Account created for {email} but SMTP server refuse to send notification".format(  # nopep8
447
                        email=email
448
                    )
449
                )
450
            except SMTPException as exc:
451
                raise NotificationSendingFailed(
452
                    "Notification for new created account can't be send "
453
                    "(SMTP error), new account creation aborted"
454
                ) from exc
455
        if do_save:
456
            self.save(new_user)
457
        return new_user
458
459
    def create_minimal_user(
460
            self,
461
            email,
462
            groups=[],
463
            save_now=False
464
    ) -> User:
465
        """Previous create_user method"""
466
        self._check_email(email)
467
        user = User()
468
        user.email = email
469
        user.display_name = email.split('@')[0]
470
471
        if not groups:
472
            gapi = GroupApi(
473
                current_user=self._user,  # User
474
                session=self._session,
475
                config=self._config,
476
            )
477
            groups = [gapi.get_one(Group.TIM_USER)]
478
        for group in groups:
479
            user.groups.append(group)
480
481
        self._session.add(user)
482
483
        if save_now:
484
            self._session.flush()
485
486
        return user
487
488
    def reset_password_notification(self, user: User, do_save: bool=False) -> str:  # nopep8
        """
        Generate a reset password token for the user and send it by email.
        :param user: User who want is password resetted
        :param do_save: save update ?
        :return: reset_password_token
        :raise NotificationDisabledCantResetPassword: if email notifications
        are disabled in config
        :raise NotificationSendingFailed: if sending the email raises an
        SMTPException
        """
        notification_enabled = self._config.EMAIL_NOTIFICATION_ACTIVATED
        if not notification_enabled:
            raise NotificationDisabledCantResetPassword("cant reset password with notification disabled")  # nopep8
        reset_token = user.generate_reset_password_token()
        try:
            mailer = get_email_manager(self._config, self._session)
            mailer.notify_reset_password(user, reset_token)
        except SMTPException as exc:
            raise NotificationSendingFailed("SMTP error, can't send notification") from exc
        if do_save:
            self.save(user)
        return reset_token
506
507
    def validate_reset_password_token(self, user: User, token: str) -> bool:
508
        return user.validate_reset_password_token(
509
            token=token,
510
            validity_seconds=self._config.USER_RESET_PASSWORD_TOKEN_VALIDITY,
511
        )
512
513
    def enable(self, user: User, do_save=False):
514
        user.is_active = True
515
        if do_save:
516
            self.save(user)
517
518
    def disable(self, user: User, do_save=False):
519
        if self._user and self._user == user:
520
            raise UserCantDisableHimself(
521
                "User {} can't disable himself".format(user.user_id)
522
            )
523
524
        user.is_active = False
525
        if do_save:
526
            self.save(user)
527
528
    def delete(self, user: User, do_save=False):
529
        if self._user and self._user == user:
530
            raise UserCantDeleteHimself(
531
                "User {} can't delete himself".format(user.user_id)
532
            )
533
        user.is_deleted = True
534
        if do_save:
535
            self.save(user)
536
537
    def undelete(self, user: User, do_save=False):
538
        user.is_deleted = False
539
        if do_save:
540
            self.save(user)
541
542
    def save(self, user: User):
543
        self._session.flush()
544
545
    def execute_created_user_actions(self, created_user: User) -> None:
        """
        Execute actions when user just been created: ensure the user has an
        auth token, then flush the session and commit the transaction.
        :param created_user: user who has just been created
        :return:
        """
        # NOTE: Cyclic import
        # TODO - G.M - 28-03-2018 - [Calendar] Reenable Calendar stuff
        #from tracim.lib.calendar import CalendarManager
        #from tracim.model.organisational import UserCalendar

        # TODO - G.M - 04-04-2018 - [auth]
        # Check if this is already needed with
        # new auth system
        token_validity = self._config.USER_AUTH_TOKEN_VALIDITY
        created_user.ensure_auth_token(validity_seconds=token_validity)

        # Ensure database is up-to-date
        self._session.flush()
        transaction.commit()
565
566
        # TODO - G.M - 28-03-2018 - [Calendar] Reenable Calendar stuff
567
        # calendar_manager = CalendarManager(created_user)
568
        # calendar_manager.create_then_remove_fake_event(
569
        #     calendar_class=UserCalendar,
570
        #     related_object_id=created_user.user_id,
571
        # )
572
573
    def allowed_to_invite_new_user(self, email: str) -> bool:
574
        # INFO - G.M - 2018-10-25 - disallow account creation if no
575
        # email provided or email_notification disabled.
576
        if not email:
577
            return False
578
        if not self._config.EMAIL_NOTIFICATION_ACTIVATED:
579
            return False
580
        # INFO - G.M - 2018-10-25 - do not allow all profile to invite new user
581
        gapi = GroupApi(self._session, self._user, self._config)
582
        invite_minimal_profile = gapi.get_one_with_name(group_name=self._config.INVITE_NEW_USER_MINIMAL_PROFILE)  # nopep8
583
584
        if not self._user.profile.id >= invite_minimal_profile.group_id:
585
            return False
586
587
        return True
588