Passed
Pull Request — develop (#92)
by inkhey
02:00
created

UserApi.update()   F

Complexity

Conditions 15

Size

Total Lines 45
Code Lines 34

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 34
dl 0
loc 45
rs 2.9998
c 0
b 0
f 0
cc 15
nop 9

How to fix   Complexity    Many Parameters   

Complexity

Complex code units like backend.tracim_backend.lib.core.user.UserApi.update() often do a lot of different things. To break such a unit down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists, for example grouping related arguments into a single parameter object.

1
# -*- coding: utf-8 -*-
2
import typing as typing
3
from smtplib import SMTPException
4
5
import transaction
6
from sqlalchemy import or_
7
from sqlalchemy.orm import Query
8
from sqlalchemy.orm import Session
9
from sqlalchemy.orm.exc import NoResultFound
10
11
from tracim_backend.config import CFG
12
from tracim_backend.exceptions import AuthenticationFailed, \
13
    UserCantDeleteHimself, UserCantChangeIsOwnProfile
14
from tracim_backend.exceptions import EmailAlreadyExistInDb
15
from tracim_backend.exceptions import EmailValidationFailed
16
from tracim_backend.exceptions import \
17
    NotificationDisabledCantCreateUserWithInvitation
18
from tracim_backend.exceptions import NotificationDisabledCantResetPassword
19
from tracim_backend.exceptions import NotificationSendingFailed
20
from tracim_backend.exceptions import NoUserSetted
21
from tracim_backend.exceptions import PasswordDoNotMatch
22
from tracim_backend.exceptions import TooShortAutocompleteString
23
from tracim_backend.exceptions import UnvalidResetPasswordToken
24
from tracim_backend.exceptions import UserAuthenticatedIsNotActive
25
from tracim_backend.exceptions import UserDoesNotExist
26
from tracim_backend.exceptions import UserCantDisableHimself
27
from tracim_backend.exceptions import WrongUserPassword
28
from tracim_backend.lib.core.group import GroupApi
29
from tracim_backend.lib.mail_notifier.notifier import get_email_manager
30
from tracim_backend.models.auth import Group
31
from tracim_backend.models.auth import User
32
from tracim_backend.models.context_models import TypeUser
33
from tracim_backend.models.context_models import UserInContext
34
from tracim_backend.models.data import UserRoleInWorkspace
35
36
37
class UserApi(object):
38
39
    def __init__(
40
            self,
41
            current_user: typing.Optional[User],
42
            session: Session,
43
            config: CFG,
44
            show_deleted: bool = False,
45
            show_deactivated: bool = True,
46
    ) -> None:
47
        self._session = session
48
        self._user = current_user
49
        self._config = config
50
        self._show_deleted = show_deleted
51
        self._show_deactivated = show_deactivated
52
53
    def _base_query(self):
        """
        Build the User query shared by the getters of this api.

        Deleted users are excluded unless show_deleted is set, and inactive
        users are excluded unless show_deactivated is set.
        """
        hide_deleted = not self._show_deleted
        hide_deactivated = not self._show_deactivated

        users = self._session.query(User)
        if hide_deleted:
            users = users.filter(User.is_deleted == False)
        if hide_deactivated:
            users = users.filter(User.is_active == True)
        return users
60
61
    def get_user_with_context(self, user: User) -> UserInContext:
        """
        Wrap the given User into a UserInContext bound to the session and
        config of this api.
        """
        return UserInContext(
            user=user,
            dbsession=self._session,
            config=self._config,
        )
71
72
    # Getters
73
74
    def get_one(self, user_id: int) -> User:
        """
        Fetch the user having the given user id.

        :param user_id: id of the user
        :return: the matching user
        :raise UserDoesNotExist: if the base query finds no such user
        """
        matching = self._base_query().filter(User.user_id == user_id)
        try:
            return matching.one()
        except NoResultFound as exc:
            message = 'User "{}" not found in database'.format(user_id)
            raise UserDoesNotExist(message) from exc
83
84
    def get_one_by_email(self, email: str) -> User:
        """
        Fetch the user having the given email.

        :param email: Email of the user
        :return: the matching user
        :raise UserDoesNotExist: if the base query finds no such user
        """
        matching = self._base_query().filter(User.email == email)
        try:
            return matching.one()
        except NoResultFound as exc:
            message = 'User "{}" not found in database'.format(email)
            raise UserDoesNotExist(message) from exc
95
96
    def get_one_by_public_name(self, public_name: str) -> User:
        """
        Fetch the user whose display name equals the given public name.

        :param public_name: display name of the user
        :return: the matching user
        :raise UserDoesNotExist: if the base query finds no such user
        """
        matching = self._base_query().filter(User.display_name == public_name)
        try:
            return matching.one()
        except NoResultFound as exc:
            message = 'User "{}" not found in database'.format(public_name)
            raise UserDoesNotExist(message) from exc
105
    # FIXME - G.M - 24-04-2018 - Duplicate method with get_one.
106
107
    def get_one_by_id(self, id: int) -> User:
108
        return self.get_one(user_id=id)
109
110
    def get_current_user(self) -> User:
111
        """
112
        Get current_user
113
        """
114
        if not self._user:
115
            raise UserDoesNotExist('There is no current user')
116
        return self._user
117
118
    def _get_all_query(self) -> Query:
        """
        Build the query listing every user ordered by display name.

        The query starts from the raw session, not from _base_query, so the
        show_deleted / show_deactivated flags are not applied here.
        """
        every_user = self._session.query(User)
        return every_user.order_by(User.display_name)
120
121
    def get_all(self) -> typing.Iterable[User]:
122
        return self._get_all_query().all()
123
124
    def get_known_user(
            self,
            acp: str,
    ) -> typing.Iterable[User]:
        """
        List the users known by the current user of this api whose display
        name or email contains the autocomplete string (case insensitive).

        :param acp: autocomplete filter by name/email, 2 characters minimum
        :return: found users, ordered by display name
        :raise TooShortAutocompleteString: if acp is shorter than 2 characters
        """
        if len(acp) < 2:
            raise TooShortAutocompleteString(
                '"{acp}" is a too short string, acp string need to have more than one character'.format(acp=acp)  # nopep8
            )

        pattern = '%{}%'.format(acp)
        name_or_email_matches = or_(
            User.display_name.ilike(pattern),
            User.email.ilike(pattern),
        )
        known_users = self._base_query().order_by(User.display_name)
        known_users = known_users.filter(name_or_email_matches)

        # INFO - G.M - 2018-07-27 - if user is set and is simple user, we
        # should show only user in same workspace as user
        is_simple_user = self._user and self._user.profile.id <= Group.TIM_USER
        if not is_simple_user:
            return known_users.all()

        own_workspace_ids = (
            self._session
            .query(UserRoleInWorkspace.workspace_id)
            .distinct(UserRoleInWorkspace.workspace_id)
            .filter(UserRoleInWorkspace.user_id == self._user.user_id)
            .subquery()
        )
        coworker_ids = (
            self._session
            .query(UserRoleInWorkspace.user_id)
            .distinct(UserRoleInWorkspace.user_id)
            .filter(UserRoleInWorkspace.workspace_id.in_(own_workspace_ids))
            .subquery()
        )
        return known_users.filter(User.user_id.in_(coworker_ids)).all()
153
154
    def find(
            self,
            user_id: int=None,
            email: str=None,
            public_name: str=None
    ) -> typing.Tuple[TypeUser, User]:
        """
        Find an existing user from the given params.

        Params are tried in this order: user_id, email, public_name. A falsy
        param is skipped, and a lookup which finds nobody falls through to
        the next param.

        :return: tuple of (kind of param which matched, user found)
        :raise UserDoesNotExist: if no param led to a user
        """
        lookups = (
            (user_id, self.get_one, TypeUser.USER_ID),
            (email, self.get_one_by_email, TypeUser.EMAIL),
            (public_name, self.get_one_by_public_name, TypeUser.PUBLIC_NAME),
        )
        for value, getter, matched_type in lookups:
            if not value:
                continue
            try:
                return matched_type, getter(value)
            except UserDoesNotExist:
                continue

        raise UserDoesNotExist('User not found with any of given params.')
187
188
    # Check methods
189
190
    def user_with_email_exists(self, email: str) -> bool:
191
        try:
192
            self.get_one_by_email(email)
193
            return True
194
        # TODO - G.M - 09-04-2018 - Better exception
195
        except:
196
            return False
197
198
    def authenticate_user(self, email: str, password: str) -> User:
199
        """
200
        Authenticate user with email and password, raise AuthenticationFailed
201
        if uncorrect.
202
        :param email: email of the user
203
        :param password: cleartext password of the user
204
        :return: User who was authenticated.
205
        """
206
        try:
207
            user = self.get_one_by_email(email)
208
            if not user.is_active:
209
                raise UserAuthenticatedIsNotActive('User "{}" is not active'.format(email))
210
            if user.validate_password(password):
211
                return user
212
            else:
213
                raise WrongUserPassword('User "{}" password is incorrect'.format(email))  # nopep8
214
        except (WrongUserPassword, UserDoesNotExist) as exc:
215
            raise AuthenticationFailed('User "{}" authentication failed'.format(email)) from exc  # nopep8
216
217
    # Actions
218
    def set_password(
219
            self,
220
            user: User,
221
            loggedin_user_password: str,
222
            new_password: str,
223
            new_password2: str,
224
            do_save: bool=True
225
    ):
226
        """
227
        Set User password if logged-in user password is correct
228
        and both new_password are the same.
229
        :param user: User who need password changed
230
        :param loggedin_user_password: cleartext password of logged user (not
231
        same as user)
232
        :param new_password: new password for user
233
        :param new_password2: should be same as new_password
234
        :param do_save: should we save new user password ?
235
        :return:
236
        """
237
        if not self._user:
238
            raise NoUserSetted('Current User should be set in UserApi to use this method')  # nopep8
239
        if not self._user.validate_password(loggedin_user_password):  # nopep8
240
            raise WrongUserPassword(
241
                'Wrong password for authenticated user {}'. format(self._user.user_id)  # nopep8
242
            )
243
        if new_password != new_password2:
244
            raise PasswordDoNotMatch('Passwords given are different')
245
246
        self.update(
247
            user=user,
248
            password=new_password,
249
            do_save=do_save,
250
        )
251
        if do_save:
252
            # TODO - G.M - 2018-07-24 - Check why commit is needed here
253
            self.save(user)
254
        return user
255
256
    def set_email(
257
            self,
258
            user: User,
259
            loggedin_user_password: str,
260
            email: str,
261
            do_save: bool = True
262
    ):
263
        """
264
        Set email address of user if loggedin user password is correct
265
        :param user: User who need email changed
266
        :param loggedin_user_password: cleartext password of logged user (not
267
        same as user)
268
        :param email:
269
        :param do_save:
270
        :return:
271
        """
272
        if not self._user:
273
            raise NoUserSetted('Current User should be set in UserApi to use this method')  # nopep8
274
        if not self._user.validate_password(loggedin_user_password):  # nopep8
275
            raise WrongUserPassword(
276
                'Wrong password for authenticated user {}'. format(self._user.user_id)  # nopep8
277
            )
278
        self.update(
279
            user=user,
280
            email=email,
281
            do_save=do_save,
282
        )
283
        return user
284
285
    def set_password_reset_token(
286
            self,
287
            user: User,
288
            new_password: str,
289
            new_password2: str,
290
            reset_token: str,
291
            do_save: bool = False,
292
    ):
293
        self.validate_reset_password_token(user, reset_token)
294
        if new_password != new_password2:
295
            raise PasswordDoNotMatch('Passwords given are different')
296
297
        self.update(
298
            user=user,
299
            password=new_password,
300
            do_save=do_save,
301
        )
302
        user.reset_tokens()
303
        if do_save:
304
            self.save(user)
305
        return user
306
307
    def _check_email(self, email: str) -> bool:
308
        """
309
        Check if email is completely ok to be used in user db table
310
        """
311
        is_email_correct = self._check_email_correctness(email)
312
        if not is_email_correct:
313
            raise EmailValidationFailed(
314
                'Email given form {} is uncorrect'.format(email))  # nopep8
315
        email_already_exist_in_db = self.check_email_already_in_db(email)
316
        if email_already_exist_in_db:
317
            raise EmailAlreadyExistInDb(
318
                'Email given {} already exist, please choose something else'.format(email)  # nopep8
319
            )
320
        return True
321
322
    def check_email_already_in_db(self, email: str) -> bool:
        """
        Tell whether the given email is already used in the user table.

        The raw session is queried, not _base_query, so the show_deleted /
        show_deactivated flags are not applied.

        :param email: email to look for
        :return: True if at least one row has this email, False otherwise
        """
        same_email = self._session.query(User.email).filter(User.email == email)  # nopep8
        return same_email.count() != 0
327
328
    def _check_email_correctness(self, email: str) -> bool:
329
        """
330
           Verify if given email is correct:
331
           - check format
332
           - futur active check for email ? (dns based ?)
333
           """
334
        # TODO - G.M - 2018-07-05 - find a better way to check email
335
        if not email:
336
            return False
337
        email = email.split('@')
338
        if len(email) != 2:
339
            return False
340
        return True
341
342
    def update(
343
            self,
344
            user: User,
345
            name: str=None,
346
            email: str=None,
347
            password: str=None,
348
            timezone: str=None,
349
            lang: str=None,
350
            groups: typing.Optional[typing.List[Group]]=None,
351
            do_save=True,
352
    ) -> User:
353
        if name is not None:
354
            user.display_name = name
355
356
        if email is not None and email != user.email:
357
            self._check_email(email)
358
            user.email = email
359
360
        if password is not None:
361
            user.password = password
362
363
        if timezone is not None:
364
            user.timezone = timezone
365
366
        if lang is not None:
367
            user.lang = lang
368
369
        if groups is not None:
370
            if self._user and self._user == user:
371
                raise UserCantChangeIsOwnProfile(
372
                    "User {} can't change is own profile".format(user.user_id)
373
                )
374
            # INFO - G.M - 2018-07-18 - Delete old groups
375
            for group in user.groups:
376
                if group not in groups:
377
                    user.groups.remove(group)
378
            # INFO - G.M - 2018-07-18 - add new groups
379
            for group in groups:
380
                if group not in user.groups:
381
                    user.groups.append(group)
382
383
        if do_save:
384
            self.save(user)
385
386
        return user
387
388
    def create_user(
389
        self,
390
        email,
391
        password: str = None,
392
        name: str = None,
393
        timezone: str = '',
394
        lang: str= None,
395
        groups=[],
396
        do_save: bool=True,
397
        do_notify: bool=True,
398
    ) -> User:
399
        if do_notify and not self._config.EMAIL_NOTIFICATION_ACTIVATED:
400
            raise NotificationDisabledCantCreateUserWithInvitation(
401
                "Can't create user with invitation mail because "
402
                "notification are disabled."
403
            )
404
        new_user = self.create_minimal_user(email, groups, save_now=False)
405
        self.update(
406
            user=new_user,
407
            name=name,
408
            email=email,
409
            password=password,
410
            timezone=timezone,
411
            lang=lang,
412
            do_save=False,
413
        )
414
        if do_notify:
415
            try:
416
                email_manager = get_email_manager(self._config, self._session)
417
                email_manager.notify_created_account(
418
                    new_user,
419
                    password=password
420
                )
421
            except SMTPException as exc:
422
                raise NotificationSendingFailed(
423
                    "Notification for new created account can't be send "
424
                    "(SMTP error), new account creation aborted"
425
                ) from exc
426
        if do_save:
427
            self.save(new_user)
428
        return new_user
429
430
    def create_minimal_user(
431
            self,
432
            email,
433
            groups=[],
434
            save_now=False
435
    ) -> User:
436
        """Previous create_user method"""
437
        self._check_email(email)
438
        user = User()
439
        user.email = email
440
        user.display_name = email.split('@')[0]
441
442
        if not groups:
443
            gapi = GroupApi(
444
                current_user=self._user,  # User
445
                session=self._session,
446
                config=self._config,
447
            )
448
            groups = [gapi.get_one(Group.TIM_USER)]
449
        for group in groups:
450
            user.groups.append(group)
451
452
        self._session.add(user)
453
454
        if save_now:
455
            self._session.flush()
456
457
        return user
458
459
    def reset_password_notification(self, user: User, do_save: bool=False) -> str:  # nopep8
        """
        Generate a reset password token for the user and send it by email.

        :param user: User who want is password resetted
        :param do_save: save update ?
        :return: reset_password_token
        :raise NotificationDisabledCantResetPassword: if email notifications
        are not activated in config
        :raise NotificationSendingFailed: if sending the email raises an
        SMTPException
        """
        notification_enabled = self._config.EMAIL_NOTIFICATION_ACTIVATED
        if not notification_enabled:
            raise NotificationDisabledCantResetPassword("cant reset password with notification disabled")  # nopep8

        reset_token = user.generate_reset_password_token()
        try:
            mailer = get_email_manager(self._config, self._session)
            mailer.notify_reset_password(user, reset_token)
        except SMTPException as exc:
            raise NotificationSendingFailed("SMTP error, can't send notification") from exc

        if do_save:
            self.save(user)
        return reset_token
477
478
    def validate_reset_password_token(self, user: User, token: str) -> bool:
479
        return user.validate_reset_password_token(
480
            token=token,
481
            validity_seconds=self._config.USER_RESET_PASSWORD_TOKEN_VALIDITY,
482
        )
483
484
    def enable(self, user: User, do_save=False):
485
        user.is_active = True
486
        if do_save:
487
            self.save(user)
488
489
    def disable(self, user: User, do_save=False):
490
        if self._user and self._user == user:
491
            raise UserCantDisableHimself(
492
                "User {} can't disable himself".format(user.user_id)
493
            )
494
495
        user.is_active = False
496
        if do_save:
497
            self.save(user)
498
499
    def delete(self, user: User, do_save=False):
500
        if self._user and self._user == user:
501
            raise UserCantDeleteHimself(
502
                "User {} can't delete himself".format(user.user_id)
503
            )
504
        user.is_deleted = True
505
        if do_save:
506
            self.save(user)
507
508
    def undelete(self, user: User, do_save=False):
509
        user.is_deleted = False
510
        if do_save:
511
            self.save(user)
512
513
    def save(self, user: User):
514
        self._session.flush()
515
516
    def execute_created_user_actions(self, created_user: User) -> None:
        """
        Run the actions needed once a user has just been created: ensure
        the user has an auth token, then flush the session and commit the
        transaction.

        :param created_user: user which has just been created
        :return:
        """
        # NOTE: Cyclic import
        # TODO - G.M - 28-03-2018 - [Calendar] Reenable Calendar stuff
        #from tracim.lib.calendar import CalendarManager
        #from tracim.model.organisational import UserCalendar

        # TODO - G.M - 04-04-2018 - [auth]
        # Check if this is already needed with
        # new auth system
        token_validity = self._config.USER_AUTH_TOKEN_VALIDITY
        created_user.ensure_auth_token(validity_seconds=token_validity)

        # Ensure database is up-to-date
        self._session.flush()
        transaction.commit()

        # TODO - G.M - 28-03-2018 - [Calendar] Reenable Calendar stuff
        # calendar_manager = CalendarManager(created_user)
        # calendar_manager.create_then_remove_fake_event(
        #     calendar_class=UserCalendar,
        #     related_object_id=created_user.user_id,
        # )
543