Passed
Pull Request — develop (#92)
by inkhey
01:32
created

UserApi.update()   F

Complexity

Conditions 15

Size

Total Lines 45
Code Lines 34

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 34
dl 0
loc 45
rs 2.9998
c 0
b 0
f 0
cc 15
nop 9

How to fix   Complexity    Many Parameters   

Complexity

Complex classes like backend.tracim_backend.lib.core.user.UserApi.update() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
# -*- coding: utf-8 -*-
2
import typing as typing
3
from smtplib import SMTPException
4
5
import transaction
6
from sqlalchemy import or_
7
from sqlalchemy.orm import Query
8
from sqlalchemy.orm import Session
9
from sqlalchemy.orm.exc import NoResultFound
10
11
from tracim_backend.config import CFG
12
from tracim_backend.exceptions import AuthenticationFailed, \
13
    UserCantDeleteHimself, UserCantChangeIsOwnProfile
14
from tracim_backend.exceptions import EmailAlreadyExistInDb
15
from tracim_backend.exceptions import EmailValidationFailed
16
from tracim_backend.exceptions import NotificationDisabled
17
from tracim_backend.exceptions import NotificationSendingFailed
18
from tracim_backend.exceptions import NoUserSetted
19
from tracim_backend.exceptions import PasswordDoNotMatch
20
from tracim_backend.exceptions import TooShortAutocompleteString
21
from tracim_backend.exceptions import UnvalidResetPasswordToken
22
from tracim_backend.exceptions import UserDoesNotExist
23
from tracim_backend.exceptions import UserAuthenticatedIsNotActive
24
from tracim_backend.exceptions import UserCantDisableHimself
25
from tracim_backend.exceptions import WrongUserPassword
26
from tracim_backend.lib.core.group import GroupApi
27
from tracim_backend.lib.mail_notifier.notifier import get_email_manager
28
from tracim_backend.models.auth import Group
29
from tracim_backend.models.auth import User
30
from tracim_backend.models.context_models import TypeUser
31
from tracim_backend.models.context_models import UserInContext
32
from tracim_backend.models.data import UserRoleInWorkspace
33
34
35
class UserApi(object):
36
37
    def __init__(
38
            self,
39
            current_user: typing.Optional[User],
40
            session: Session,
41
            config: CFG,
42
            show_deleted: bool = False,
43
            show_deactivated: bool = True,
44
    ) -> None:
45
        self._session = session
46
        self._user = current_user
47
        self._config = config
48
        self._show_deleted = show_deleted
49
        self._show_deactivated = show_deactivated
50
51
    def _base_query(self):
52
        query = self._session.query(User)
53
        if not self._show_deleted:
54
            query = query.filter(User.is_deleted == False)
55
        if not self._show_deactivated:
56
            query = query.filter(User.is_active == True)
57
        return query
58
59
    def get_user_with_context(self, user: User) -> UserInContext:
60
        """
61
        Return UserInContext object from User
62
        """
63
        user = UserInContext(
64
            user=user,
65
            dbsession=self._session,
66
            config=self._config,
67
        )
68
        return user
69
70
    # Getters
71
72
    def get_one(self, user_id: int) -> User:
73
        """
74
        Get one user by user id
75
        """
76
        try:
77
            user = self._base_query().filter(User.user_id == user_id).one()
78
        except NoResultFound as exc:
79
            raise UserDoesNotExist('User "{}" not found in database'.format(user_id)) from exc  # nopep8
80
        return user
81
82
    def get_one_by_email(self, email: str) -> User:
83
        """
84
        Get one user by email
85
        :param email: Email of the user
86
        :return: one user
87
        """
88
        try:
89
            user = self._base_query().filter(User.email == email).one()
90
        except NoResultFound as exc:
91
            raise UserDoesNotExist('User "{}" not found in database'.format(email)) from exc  # nopep8
92
        return user
93
94
    def get_one_by_public_name(self, public_name: str) -> User:
95
        """
96
        Get one user by public_name
97
        """
98
        try:
99
            user = self._base_query().filter(User.display_name == public_name).one()
100
        except NoResultFound as exc:
101
            raise UserDoesNotExist('User "{}" not found in database'.format(public_name)) from exc  # nopep8
102
        return user
103
    # FIXME - G.M - 24-04-2018 - Duplicate method with get_one.
104
105
    def get_one_by_id(self, id: int) -> User:
106
        return self.get_one(user_id=id)
107
108
    def get_current_user(self) -> User:
109
        """
110
        Get current_user
111
        """
112
        if not self._user:
113
            raise UserDoesNotExist('There is no current user')
114
        return self._user
115
116
    def _get_all_query(self) -> Query:
117
        return self._session.query(User).order_by(User.display_name)
118
119
    def get_all(self) -> typing.Iterable[User]:
120
        return self._get_all_query().all()
121
122
    def get_known_user(
123
            self,
124
            acp: str,
125
    ) -> typing.Iterable[User]:
126
        """
127
        Return list of know user by current UserApi user.
128
        :param acp: autocomplete filter by name/email
129
        :return: List of found users
130
        """
131
        if len(acp) < 2:
132
            raise TooShortAutocompleteString(
133
                '"{acp}" is a too short string, acp string need to have more than one character'.format(acp=acp)  # nopep8
134
            )
135
        query = self._base_query().order_by(User.display_name)
136
        query = query.filter(or_(User.display_name.ilike('%{}%'.format(acp)), User.email.ilike('%{}%'.format(acp))))  # nopep8
137
138
        # INFO - G.M - 2018-07-27 - if user is set and is simple user, we
139
        # should show only user in same workspace as user
140
        if self._user and self._user.profile.id <= Group.TIM_USER:
141
            user_workspaces_id_query = self._session.\
142
                query(UserRoleInWorkspace.workspace_id).\
143
                distinct(UserRoleInWorkspace.workspace_id).\
144
                filter(UserRoleInWorkspace.user_id == self._user.user_id)
145
            users_in_workspaces = self._session.\
146
                query(UserRoleInWorkspace.user_id).\
147
                distinct(UserRoleInWorkspace.user_id).\
148
                filter(UserRoleInWorkspace.workspace_id.in_(user_workspaces_id_query.subquery())).subquery()  # nopep8
149
            query = query.filter(User.user_id.in_(users_in_workspaces))
150
        return query.all()
151
152
    def find(
153
            self,
154
            user_id: int=None,
155
            email: str=None,
156
            public_name: str=None
157
    ) -> typing.Tuple[TypeUser, User]:
158
        """
159
        Find existing user from all theses params.
160
        Check is made in this order: user_id, email, public_name
161
        If no user found raise UserDoesNotExist exception
162
        """
163
        user = None
164
165
        if user_id:
166
            try:
167
                user = self.get_one(user_id)
168
                return TypeUser.USER_ID, user
169
            except UserDoesNotExist:
170
                pass
171
        if email:
172
            try:
173
                user = self.get_one_by_email(email)
174
                return TypeUser.EMAIL, user
175
            except UserDoesNotExist:
176
                pass
177
        if public_name:
178
            try:
179
                user = self.get_one_by_public_name(public_name)
180
                return TypeUser.PUBLIC_NAME, user
181
            except UserDoesNotExist:
182
                pass
183
184
        raise UserDoesNotExist('User not found with any of given params.')
185
186
    # Check methods
187
188
    def user_with_email_exists(self, email: str) -> bool:
189
        try:
190
            self.get_one_by_email(email)
191
            return True
192
        # TODO - G.M - 09-04-2018 - Better exception
193
        except:
194
            return False
195
196
    def authenticate_user(self, email: str, password: str) -> User:
197
        """
198
        Authenticate user with email and password, raise AuthenticationFailed
199
        if uncorrect.
200
        :param email: email of the user
201
        :param password: cleartext password of the user
202
        :return: User who was authenticated.
203
        """
204
        try:
205
            user = self.get_one_by_email(email)
206
            if not user.is_active:
207
                raise UserAuthenticatedIsNotActive('User "{}" is not active'.format(email))
208
            if user.validate_password(password):
209
                return user
210
            else:
211
                raise WrongUserPassword('User "{}" password is incorrect'.format(email))  # nopep8
212
        except (WrongUserPassword, UserDoesNotExist) as exc:
213
            raise AuthenticationFailed('User "{}" authentication failed'.format(email)) from exc  # nopep8
214
215
    # Actions
216
    def set_password(
217
            self,
218
            user: User,
219
            loggedin_user_password: str,
220
            new_password: str,
221
            new_password2: str,
222
            do_save: bool=True
223
    ):
224
        """
225
        Set User password if logged-in user password is correct
226
        and both new_password are the same.
227
        :param user: User who need password changed
228
        :param loggedin_user_password: cleartext password of logged user (not
229
        same as user)
230
        :param new_password: new password for user
231
        :param new_password2: should be same as new_password
232
        :param do_save: should we save new user password ?
233
        :return:
234
        """
235
        if not self._user:
236
            raise NoUserSetted('Current User should be set in UserApi to use this method')  # nopep8
237
        if not self._user.validate_password(loggedin_user_password):  # nopep8
238
            raise WrongUserPassword(
239
                'Wrong password for authenticated user {}'. format(self._user.user_id)  # nopep8
240
            )
241
        if new_password != new_password2:
242
            raise PasswordDoNotMatch('Passwords given are different')
243
244
        self.update(
245
            user=user,
246
            password=new_password,
247
            do_save=do_save,
248
        )
249
        if do_save:
250
            # TODO - G.M - 2018-07-24 - Check why commit is needed here
251
            self.save(user)
252
        return user
253
254
    def set_email(
255
            self,
256
            user: User,
257
            loggedin_user_password: str,
258
            email: str,
259
            do_save: bool = True
260
    ):
261
        """
262
        Set email address of user if loggedin user password is correct
263
        :param user: User who need email changed
264
        :param loggedin_user_password: cleartext password of logged user (not
265
        same as user)
266
        :param email:
267
        :param do_save:
268
        :return:
269
        """
270
        if not self._user:
271
            raise NoUserSetted('Current User should be set in UserApi to use this method')  # nopep8
272
        if not self._user.validate_password(loggedin_user_password):  # nopep8
273
            raise WrongUserPassword(
274
                'Wrong password for authenticated user {}'. format(self._user.user_id)  # nopep8
275
            )
276
        self.update(
277
            user=user,
278
            email=email,
279
            do_save=do_save,
280
        )
281
        return user
282
283
    def set_password_reset_token(
284
            self,
285
            user: User,
286
            new_password: str,
287
            new_password2: str,
288
            reset_token: str,
289
            do_save: bool = False,
290
    ):
291
        self.validate_reset_password_token(user, reset_token)
292
        if new_password != new_password2:
293
            raise PasswordDoNotMatch('Passwords given are different')
294
295
        self.update(
296
            user=user,
297
            password=new_password,
298
            do_save=do_save,
299
        )
300
        user.reset_tokens()
301
        if do_save:
302
            self.save(user)
303
        return user
304
305
    def _check_email(self, email: str) -> bool:
306
        """
307
        Check if email is completely ok to be used in user db table
308
        """
309
        is_email_correct = self._check_email_correctness(email)
310
        if not is_email_correct:
311
            raise EmailValidationFailed(
312
                'Email given form {} is uncorrect'.format(email))  # nopep8
313
        email_already_exist_in_db = self.check_email_already_in_db(email)
314
        if email_already_exist_in_db:
315
            raise EmailAlreadyExistInDb(
316
                'Email given {} already exist, please choose something else'.format(email)  # nopep8
317
            )
318
        return True
319
320
    def check_email_already_in_db(self, email: str) -> bool:
321
        """
322
        Verify if given email does not already exist in db
323
        """
324
        return self._session.query(User.email).filter(User.email==email).count() != 0  # nopep8
325
326
    def _check_email_correctness(self, email: str) -> bool:
327
        """
328
           Verify if given email is correct:
329
           - check format
330
           - futur active check for email ? (dns based ?)
331
           """
332
        # TODO - G.M - 2018-07-05 - find a better way to check email
333
        if not email:
334
            return False
335
        email = email.split('@')
336
        if len(email) != 2:
337
            return False
338
        return True
339
340
    def update(
341
            self,
342
            user: User,
343
            name: str=None,
344
            email: str=None,
345
            password: str=None,
346
            timezone: str=None,
347
            lang: str=None,
348
            groups: typing.Optional[typing.List[Group]]=None,
349
            do_save=True,
350
    ) -> User:
351
        if name is not None:
352
            user.display_name = name
353
354
        if email is not None and email != user.email:
355
            self._check_email(email)
356
            user.email = email
357
358
        if password is not None:
359
            user.password = password
360
361
        if timezone is not None:
362
            user.timezone = timezone
363
364
        if lang is not None:
365
            user.lang = lang
366
367
        if groups is not None:
368
            if self._user and self._user == user:
369
                raise UserCantChangeIsOwnProfile(
370
                    "User {} can't change is own profile".format(user.user_id)
371
                )
372
            # INFO - G.M - 2018-07-18 - Delete old groups
373
            for group in user.groups:
374
                if group not in groups:
375
                    user.groups.remove(group)
376
            # INFO - G.M - 2018-07-18 - add new groups
377
            for group in groups:
378
                if group not in user.groups:
379
                    user.groups.append(group)
380
381
        if do_save:
382
            self.save(user)
383
384
        return user
385
386
    def create_user(
387
        self,
388
        email,
389
        password: str = None,
390
        name: str = None,
391
        timezone: str = '',
392
        lang: str= None,
393
        groups=[],
394
        do_save: bool=True,
395
        do_notify: bool=True,
396
    ) -> User:
397
        if do_notify and not self._config.EMAIL_NOTIFICATION_ACTIVATED:
398
            raise NotificationDisabled(
399
                "Can't create user with invitation mail because "
400
                "notification are disabled."
401
            )
402
        new_user = self.create_minimal_user(email, groups, save_now=False)
403
        self.update(
404
            user=new_user,
405
            name=name,
406
            email=email,
407
            password=password,
408
            timezone=timezone,
409
            lang=lang,
410
            do_save=False,
411
        )
412
        if do_notify:
413
            try:
414
                email_manager = get_email_manager(self._config, self._session)
415
                email_manager.notify_created_account(
416
                    new_user,
417
                    password=password
418
                )
419
            except SMTPException as exc:
420
                raise NotificationSendingFailed(
421
                    "Notification for new created account can't be send "
422
                    "(SMTP error), new account creation aborted"
423
                ) from exc
424
        if do_save:
425
            self.save(new_user)
426
        return new_user
427
428
    def create_minimal_user(
429
            self,
430
            email,
431
            groups=[],
432
            save_now=False
433
    ) -> User:
434
        """Previous create_user method"""
435
        self._check_email(email)
436
        user = User()
437
        user.email = email
438
        user.display_name = email.split('@')[0]
439
440
        if not groups:
441
            gapi = GroupApi(
442
                current_user=self._user,  # User
443
                session=self._session,
444
                config=self._config,
445
            )
446
            groups = [gapi.get_one(Group.TIM_USER)]
447
        for group in groups:
448
            user.groups.append(group)
449
450
        self._session.add(user)
451
452
        if save_now:
453
            self._session.flush()
454
455
        return user
456
457
    def reset_password_notification(self, user: User, do_save: bool=False) -> str:  # nopep8
458
        """
459
        Reset password notification
460
        :param user: User who want is password resetted
461
        :param do_save: save update ?
462
        :return: reset_password_token
463
        """
464
        if not self._config.EMAIL_NOTIFICATION_ACTIVATED:
465
            raise NotificationDisabled("cant reset password with notification disabled")  # nopep8
466
        token = user.generate_reset_password_token()
467
        try:
468
            email_manager = get_email_manager(self._config, self._session)
469
            email_manager.notify_reset_password(user, token)
470
        except SMTPException as exc:
471
            raise NotificationSendingFailed("SMTP error, can't send notification") from exc
472
        if do_save:
473
            self.save(user)
474
        return token
475
476
    def validate_reset_password_token(self, user: User, token: str) -> bool:
477
        return user.validate_reset_password_token(
478
            token=token,
479
            validity_seconds=self._config.USER_RESET_PASSWORD_TOKEN_VALIDITY,
480
        )
481
482
    def enable(self, user: User, do_save=False):
483
        user.is_active = True
484
        if do_save:
485
            self.save(user)
486
487
    def disable(self, user: User, do_save=False):
488
        if self._user and self._user == user:
489
            raise UserCantDisableHimself(
490
                "User {} can't disable himself".format(user.user_id)
491
            )
492
493
        user.is_active = False
494
        if do_save:
495
            self.save(user)
496
497
    def delete(self, user: User, do_save=False):
498
        if self._user and self._user == user:
499
            raise UserCantDeleteHimself(
500
                "User {} can't delete himself".format(user.user_id)
501
            )
502
        user.is_deleted = True
503
        if do_save:
504
            self.save(user)
505
506
    def undelete(self, user: User, do_save=False):
507
        user.is_deleted = False
508
        if do_save:
509
            self.save(user)
510
511
    def save(self, user: User):
512
        self._session.flush()
513
514
    def execute_created_user_actions(self, created_user: User) -> None:
515
        """
516
        Execute actions when user just been created
517
        :return:
518
        """
519
        # NOTE: Cyclic import
520
        # TODO - G.M - 28-03-2018 - [Calendar] Reenable Calendar stuff
521
        #from tracim.lib.calendar import CalendarManager
522
        #from tracim.model.organisational import UserCalendar
523
524
        # TODO - G.M - 04-04-2018 - [auth]
525
        # Check if this is already needed with
526
        # new auth system
527
        created_user.ensure_auth_token(
528
            validity_seconds=self._config.USER_AUTH_TOKEN_VALIDITY
529
        )
530
531
        # Ensure database is up-to-date
532
        self._session.flush()
533
        transaction.commit()
534
535
        # TODO - G.M - 28-03-2018 - [Calendar] Reenable Calendar stuff
536
        # calendar_manager = CalendarManager(created_user)
537
        # calendar_manager.create_then_remove_fake_event(
538
        #     calendar_class=UserCalendar,
539
        #     related_object_id=created_user.user_id,
540
        # )
541