Completed
Push — develop ( 10efb0...b51ddf )
by Bastien
16s queued 14s
created

UserApi.update()   D

Complexity

Conditions 13

Size

Total Lines 41
Code Lines 31

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 31
dl 0
loc 41
rs 4.2
c 0
b 0
f 0
cc 13
nop 9

How to fix   Complexity    Many Parameters   

Complexity

Complex methods like backend.tracim_backend.lib.core.user.UserApi.update() often do a lot of different things. To break the enclosing class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

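There are several approaches to avoid long parameter lists: introduce a parameter object, preserve the whole object, or replace a parameter with a method call.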

1
# -*- coding: utf-8 -*-
2
import typing as typing
3
from smtplib import SMTPException
4
5
import transaction
6
from sqlalchemy import or_
7
from sqlalchemy.orm import Query
8
from sqlalchemy.orm import Session
9
from sqlalchemy.orm.exc import NoResultFound
10
11
from tracim_backend.config import CFG
12
from tracim_backend.exceptions import AuthenticationFailed
13
from tracim_backend.exceptions import EmailAlreadyExistInDb
14
from tracim_backend.exceptions import EmailValidationFailed
15
from tracim_backend.exceptions import NotificationDisabled
16
from tracim_backend.exceptions import NotificationSendingFailed
17
from tracim_backend.exceptions import NoUserSetted
18
from tracim_backend.exceptions import PasswordDoNotMatch
19
from tracim_backend.exceptions import TooShortAutocompleteString
20
from tracim_backend.exceptions import UnvalidResetPasswordToken
21
from tracim_backend.exceptions import UserDoesNotExist
22
from tracim_backend.exceptions import UserNotActive
23
from tracim_backend.exceptions import WrongUserPassword
24
from tracim_backend.lib.core.group import GroupApi
25
from tracim_backend.lib.mail_notifier.notifier import get_email_manager
26
from tracim_backend.models.auth import Group
27
from tracim_backend.models.auth import User
28
from tracim_backend.models.context_models import TypeUser
29
from tracim_backend.models.context_models import UserInContext
30
from tracim_backend.models.data import UserRoleInWorkspace
31
32
33
class UserApi(object):
34
35
    def __init__(
            self,
            current_user: typing.Optional[User],
            session: Session,
            config: CFG,
            show_deleted: bool = False,
    ) -> None:
        """
        Bind this api to an acting user, a database session and a config.
        :param current_user: user on whose behalf the api is used, or None
        when no user is set
        :param session: session used for every query and flush of this api
        :param config: application config
        :param show_deleted: when False, _base_query() filters out users
        flagged as deleted
        """
        self._user, self._session = current_user, session
        self._config, self._show_deleted = config, show_deleted
46
47
    def _base_query(self):
        """
        Build the User query on which the get_one* getters are based.
        :return: query on User, restricted to non-deleted users unless this
        api was created with show_deleted=True
        """
        users = self._session.query(User)
        if self._show_deleted:
            return users
        # INFO: "== False" builds a SQL expression on the column, it must not
        # be turned into "not ..." / "is False".
        return users.filter(User.is_deleted == False)  # noqa: E712
52
53
    def get_user_with_context(self, user: User) -> UserInContext:
        """
        Wrap a User into a UserInContext bound to the session and config of
        this api.
        :param user: user to wrap
        :return: UserInContext built from user
        """
        return UserInContext(
            user=user,
            dbsession=self._session,
            config=self._config,
        )
63
64
    # Getters
65
66
    def get_one(self, user_id: int) -> User:
        """
        Get one user by user id
        :param user_id: id of the user
        :return: one user
        :raise UserDoesNotExist: if the query returns no row
        """
        by_id = self._base_query().filter(User.user_id == user_id)
        try:
            return by_id.one()
        except NoResultFound as exc:
            message = 'User "{}" not found in database'.format(user_id)
            raise UserDoesNotExist(message) from exc
75
76
    def get_one_by_email(self, email: str) -> User:
        """
        Get one user by email
        :param email: Email of the user
        :return: one user
        :raise UserDoesNotExist: if the query returns no row
        """
        by_email = self._base_query().filter(User.email == email)
        try:
            return by_email.one()
        except NoResultFound as exc:
            message = 'User "{}" not found in database'.format(email)
            raise UserDoesNotExist(message) from exc
87
88
    def get_one_by_public_name(self, public_name: str) -> User:
        """
        Get one user by public_name (compared to User.display_name)
        :param public_name: display name of the user
        :return: one user
        :raise UserDoesNotExist: if the query returns no row
        """
        by_name = self._base_query().filter(User.display_name == public_name)
        try:
            return by_name.one()
        except NoResultFound as exc:
            message = 'User "{}" not found in database'.format(public_name)
            raise UserDoesNotExist(message) from exc
97
    # FIXME - G.M - 24-04-2018 - Duplicate method with get_one.
98
99
    def get_one_by_id(self, id: int) -> User:
        """
        Thin alias of get_one()
        :param id: id of the user
        :return: one user
        """
        found = self.get_one(id)
        return found
101
102
    def get_current_user(self) -> User:
        """
        Get the user this api was created with
        :return: current user
        :raise UserDoesNotExist: if no current user is set
        """
        current = self._user
        if current:
            return current
        raise UserDoesNotExist('There is no current user')
109
110
    def _get_all_query(self) -> Query:
        """
        Build the query listing users ordered by display name.
        NOTE(review): built from the raw session and not from _base_query(),
        so show_deleted is not taken into account here - confirm it is wanted.
        :return: ordered query on User
        """
        every_user = self._session.query(User)
        return every_user.order_by(User.display_name)
112
113
    def get_all(self) -> typing.Iterable[User]:
        """
        Get every user returned by _get_all_query()
        :return: list of users ordered by display name
        """
        listing = self._get_all_query()
        return listing.all()
115
116
    def get_known_user(
            self,
            acp: str,
    ) -> typing.Iterable[User]:
        """
        Return list of users known by the current UserApi user.
        :param acp: autocomplete string, matched (case-insensitive,
        "contains") against display name and email
        :return: List of found users
        :raise TooShortAutocompleteString: if acp has less than 2 characters
        """
        if len(acp) < 2:
            raise TooShortAutocompleteString(
                '"{acp}" is a too short string, acp string need to have more than one character'.format(acp=acp)  # nopep8
            )
        pattern = '%{}%'.format(acp)
        name_or_email_matches = or_(
            User.display_name.ilike(pattern),
            User.email.ilike(pattern),
        )
        known_users = self._get_all_query().filter(name_or_email_matches)

        current = self._user
        if not current or current.profile.id > Group.TIM_USER:
            # no current user, or profile above simple user: no restriction
            return known_users.all()

        # INFO - G.M - 2018-07-27 - if user is set and is simple user, we
        # should show only user in same workspace as user
        session = self._session
        # ids of workspaces where current user has a role
        own_workspace_ids = (
            session.query(UserRoleInWorkspace.workspace_id)
            .distinct(UserRoleInWorkspace.workspace_id)
            .filter(UserRoleInWorkspace.user_id == current.user_id)
        )
        # ids of users having a role in one of these workspaces
        coworker_ids = (
            session.query(UserRoleInWorkspace.user_id)
            .distinct(UserRoleInWorkspace.user_id)
            .filter(UserRoleInWorkspace.workspace_id.in_(own_workspace_ids.subquery()))  # nopep8
            .subquery()
        )
        return known_users.filter(User.user_id.in_(coworker_ids)).all()
145
146
    def find(
            self,
            user_id: int=None,
            email: str=None,
            public_name: str=None
    ) -> typing.Tuple[TypeUser, User]:
        """
        Find existing user from all theses params.
        Check is made in this order: user_id, email, public_name; params
        with a falsy value are skipped.
        :return: tuple of the kind of param which matched and the user found
        :raise UserDoesNotExist: if no given param matches a user
        """
        lookups = (
            (user_id, self.get_one, TypeUser.USER_ID),
            (email, self.get_one_by_email, TypeUser.EMAIL),
            (public_name, self.get_one_by_public_name, TypeUser.PUBLIC_NAME),
        )
        for value, getter, matched_by in lookups:
            if not value:
                continue
            try:
                return matched_by, getter(value)
            except UserDoesNotExist:
                # not found with this param: try the next one
                continue

        raise UserDoesNotExist('User not found with any of given params.')
179
180
    # Check methods
181
182
    def user_with_email_exists(self, email: str) -> bool:
        """
        Tell if a user with this email can be fetched with get_one_by_email()
        :param email: email to look for
        :return: True if a user is found, False if get_one_by_email() raises
        UserDoesNotExist
        """
        try:
            self.get_one_by_email(email)
        except UserDoesNotExist:
            # INFO: only "not found" means False. Any other error (database
            # failure, interruption, ...) is propagated instead of being
            # silently reported as "no such user" like the previous bare
            # except did.
            return False
        return True
189
190
    def authenticate_user(self, email: str, password: str) -> User:
        """
        Authenticate user with email and password.
        :param email: email of the user
        :param password: cleartext password of the user
        :return: User who was authenticated.
        :raise AuthenticationFailed: if no user has this email or if the
        password is not valid
        :raise UserNotActive: if the user exists but is not active (this one
        is not converted to AuthenticationFailed)
        """
        try:
            user = self.get_one_by_email(email)
            if not user.is_active:
                raise UserNotActive('User "{}" is not active'.format(email))
            if not user.validate_password(password):
                raise WrongUserPassword('User "{}" password is incorrect'.format(email))  # nopep8
        except (UserDoesNotExist, WrongUserPassword) as exc:
            raise AuthenticationFailed('User "{}" authentication failed'.format(email)) from exc  # nopep8
        return user
208
209
    # Actions
210
    def set_password(
            self,
            user: User,
            loggedin_user_password: str,
            new_password: str,
            new_password2: str,
            do_save: bool=True
    ):
        """
        Set User password if loggedin user password is correct
        and both new_password are the same.
        :param user: User who need password changed
        :param loggedin_user_password: cleartext password of logged user (not
        same as user)
        :param new_password: new password for user
        :param new_password2: should be same as new_password
        :param do_save: should we save new user password ?
        :return: the updated user
        :raise NoUserSetted: if this api has no current user
        :raise WrongUserPassword: if loggedin_user_password is not the
        password of the current user
        :raise PasswordDoNotMatch: if both new passwords differ
        """
        current = self._user
        if not current:
            raise NoUserSetted('Current User should be set in UserApi to use this method')  # nopep8
        password_ok = current.validate_password(loggedin_user_password)
        if not password_ok:
            raise WrongUserPassword(
                'Wrong password for authenticated user {}'.format(current.user_id)  # nopep8
            )
        if new_password != new_password2:
            raise PasswordDoNotMatch('Passwords given are different')

        self.update(user=user, password=new_password, do_save=do_save)
        if do_save:
            # TODO - G.M - 2018-07-24 - Check why commit is needed here
            self.save(user)
        return user
247
248
    def set_email(
            self,
            user: User,
            loggedin_user_password: str,
            email: str,
            do_save: bool = True
    ):
        """
        Set email address of user if loggedin user password is correct
        :param user: User who need email changed
        :param loggedin_user_password: cleartext password of logged user (not
        same as user)
        :param email: new email of the user
        :param do_save: should we save the new email ?
        :return: the updated user
        :raise NoUserSetted: if this api has no current user
        :raise WrongUserPassword: if loggedin_user_password is not the
        password of the current user
        """
        current = self._user
        if not current:
            raise NoUserSetted('Current User should be set in UserApi to use this method')  # nopep8
        password_ok = current.validate_password(loggedin_user_password)
        if not password_ok:
            raise WrongUserPassword(
                'Wrong password for authenticated user {}'.format(current.user_id)  # nopep8
            )
        self.update(user=user, email=email, do_save=do_save)
        return user
276
277
    def set_password_reset_token(
            self,
            user: User,
            new_password: str,
            new_password2: str,
            reset_token: str,
            do_save: bool = False,
    ):
        """
        Set User password using a reset password token.
        :param user: User who need password changed
        :param new_password: new password for user
        :param new_password2: should be same as new_password
        :param reset_token: reset password token given for this user
        :param do_save: should we save new user password ?
        :return: the updated user
        :raise PasswordDoNotMatch: if both new passwords differ
        """
        # NOTE(review): result of the validation is not checked here, so an
        # invalid token is presumably rejected by an exception - confirm.
        self.validate_reset_password_token(user, reset_token)
        if new_password != new_password2:
            raise PasswordDoNotMatch('Passwords given are different')

        self.update(user=user, password=new_password, do_save=do_save)
        user.reset_tokens()
        if do_save:
            self.save(user)
        return user
298
299
    def _check_email(self, email: str) -> bool:
        """
        Check if email is completely ok to be used in user db table
        :param email: email to check
        :return: True (failures are reported by exceptions)
        :raise EmailValidationFailed: if _check_email_correctness() rejects
        the email
        :raise EmailAlreadyExistInDb: if the email is already in db
        """
        if not self._check_email_correctness(email):
            raise EmailValidationFailed(
                'Email given form {} is uncorrect'.format(email))  # nopep8
        if self.check_email_already_in_db(email):
            raise EmailAlreadyExistInDb(
                'Email given {} already exist, please choose something else'.format(email)  # nopep8
            )
        return True
313
314
    def check_email_already_in_db(self, email: str) -> bool:
        """
        Verify if given email already exists in db
        :param email: email to look for
        :return: True if at least one user row has this email, else False
        """
        same_email = self._session.query(User.email).filter(User.email == email)  # nopep8
        return same_email.count() > 0
319
320
    def _check_email_correctness(self, email: str) -> bool:
321
        """
322
           Verify if given email is correct:
323
           - check format
324
           - futur active check for email ? (dns based ?)
325
           """
326
        # TODO - G.M - 2018-07-05 - find a better way to check email
327
        if not email:
328
            return False
329
        email = email.split('@')
330
        if len(email) != 2:
331
            return False
332
        return True
333
334
    def update(
            self,
            user: User,
            name: str=None,
            email: str=None,
            password: str=None,
            timezone: str=None,
            lang: str=None,
            groups: typing.Optional[typing.List[Group]]=None,
            do_save=True,
    ) -> User:
        """
        Update given fields of a user; every param left to None is untouched.
        :param user: user to update
        :param name: new display name
        :param email: new email, checked with _check_email() only when it
        differs from the current one
        :param password: new cleartext password
        :param timezone: new timezone
        :param lang: new lang
        :param groups: complete list of groups the user should have: groups
        not in this list are removed, missing ones are added
        :param do_save: should we save the user once updated ?
        :return: the updated user
        :raise EmailValidationFailed: if the new email is rejected
        :raise EmailAlreadyExistInDb: if the new email is already used
        """
        if name is not None:
            user.display_name = name

        if email is not None and email != user.email:
            self._check_email(email)
            user.email = email

        if password is not None:
            user.password = password

        if timezone is not None:
            user.timezone = timezone

        if lang is not None:
            user.lang = lang

        if groups is not None:
            # INFO - G.M - 2018-07-18 - Delete old groups
            # Iterate on a copy: removing items from user.groups while
            # looping on it makes the loop skip the item following each
            # removal, leaving some old groups in place.
            for group in list(user.groups):
                if group not in groups:
                    user.groups.remove(group)
            # INFO - G.M - 2018-07-18 - add new groups
            for group in groups:
                if group not in user.groups:
                    user.groups.append(group)

        if do_save:
            self.save(user)

        return user
375
376
    def create_user(
        self,
        email,
        password: str = None,
        name: str = None,
        timezone: str = '',
        lang: str= None,
        groups=None,
        do_save: bool=True,
        do_notify: bool=True,
    ) -> User:
        """
        Create a new user and, if asked, notify it by email.
        :param email: email of the new user
        :param password: cleartext password of the new user, also given to
        the account creation notification
        :param name: display name of the new user
        :param timezone: timezone of the new user
        :param lang: lang of the new user
        :param groups: groups of the new user; None or empty lets
        create_minimal_user() choose the default group
        :param do_save: should we save the new user ?
        :param do_notify: should we send the account creation email ?
        :return: the new user
        :raise NotificationDisabled: if do_notify is set while email
        notifications are not activated in config
        :raise NotificationSendingFailed: if sending the email raises
        SMTPException
        """
        # INFO: groups default is None and not [] to avoid sharing one
        # mutable list between all calls; both values behave the same here.
        if do_notify and not self._config.EMAIL_NOTIFICATION_ACTIVATED:
            raise NotificationDisabled(
                "Can't create user with invitation mail because "
                "notification are disabled."
            )
        new_user = self.create_minimal_user(email, groups, save_now=False)
        self.update(
            user=new_user,
            name=name,
            email=email,
            password=password,
            timezone=timezone,
            lang=lang,
            do_save=False,
        )
        if do_notify:
            try:
                email_manager = get_email_manager(self._config, self._session)
                email_manager.notify_created_account(
                    new_user,
                    password=password
                )
            except SMTPException as exc:
                raise NotificationSendingFailed(
                    "Notification for new created account can't be send "
                    "(SMTP error), new account creation aborted"
                ) from exc
        if do_save:
            self.save(new_user)
        return new_user
417
418
    def create_minimal_user(
            self,
            email,
            groups=None,
            save_now=False
    ) -> User:
        """
        Previous create_user method: create a user from its email only and
        add it to the session.
        :param email: email of the new user; the part before "@" is used as
        display name
        :param groups: groups of the new user; if None or empty, the group
        fetched with Group.TIM_USER is used
        :param save_now: should we flush the session right now ?
        :return: the new user
        :raise EmailValidationFailed: if the email is rejected
        :raise EmailAlreadyExistInDb: if the email is already used
        """
        # INFO: groups default is None and not [] to avoid sharing one
        # mutable list between all calls; both values behave the same here.
        self._check_email(email)
        user = User()
        user.email = email
        user.display_name = email.split('@')[0]

        if not groups:
            gapi = GroupApi(
                current_user=self._user,  # User
                session=self._session,
                config=self._config,
            )
            groups = [gapi.get_one(Group.TIM_USER)]
        for group in groups:
            user.groups.append(group)

        self._session.add(user)

        if save_now:
            self._session.flush()

        return user
446
447
    def reset_password_notification(self, user: User, do_save: bool=False) -> str:  # nopep8
        """
        Generate a reset password token for user and send it by email
        :param user: User who want is password resetted
        :param do_save: save update ?
        :return: reset_password_token
        :raise NotificationDisabled: if email notifications are not
        activated in config
        :raise NotificationSendingFailed: if sending the email raises
        SMTPException
        """
        notification_enabled = self._config.EMAIL_NOTIFICATION_ACTIVATED
        if not notification_enabled:
            raise NotificationDisabled("cant reset password with notification disabled")  # nopep8
        reset_token = user.generate_reset_password_token()
        try:
            mailer = get_email_manager(self._config, self._session)
            mailer.notify_reset_password(user, reset_token)
        except SMTPException as exc:
            raise NotificationSendingFailed("SMTP error, can't send notification") from exc  # nopep8
        if do_save:
            self.save(user)
        return reset_token
465
466
    def validate_reset_password_token(self, user: User, token: str) -> bool:
        """
        Validate a reset password token for user, using the token validity
        duration of the config (USER_RESET_PASSWORD_TOKEN_VALIDITY).
        :param user: user the token was generated for
        :param token: reset password token to check
        :return: result of User.validate_reset_password_token()
        """
        validity = self._config.USER_RESET_PASSWORD_TOKEN_VALIDITY
        return user.validate_reset_password_token(
            token=token,
            validity_seconds=validity,
        )
471
472
    def enable(self, user: User, do_save=False):
        """
        Flag user as active
        :param user: user to enable
        :param do_save: should we save the user once flagged ?
        """
        user.is_active = True
        if not do_save:
            return
        self.save(user)
476
477
    def disable(self, user: User, do_save=False):
        """
        Flag user as not active
        :param user: user to disable
        :param do_save: should we save the user once flagged ?
        """
        user.is_active = False
        if not do_save:
            return
        self.save(user)
481
482
    def delete(self, user: User, do_save=False):
        """
        Flag user as deleted (the user object itself is kept)
        :param user: user to flag as deleted
        :param do_save: should we save the user once flagged ?
        """
        user.is_deleted = True
        if not do_save:
            return
        self.save(user)
486
487
    def undelete(self, user: User, do_save=False):
        """
        Remove the deleted flag of user
        :param user: user to restore
        :param do_save: should we save the user once flagged ?
        """
        user.is_deleted = False
        if not do_save:
            return
        self.save(user)
491
492
    def save(self, user: User):
        """
        Flush the session of this api.
        :param user: not used by the flush itself: the whole session is
        flushed
        """
        session = self._session
        session.flush()
494
495
    def execute_created_user_actions(self, created_user: User) -> None:
        """
        Execute actions when user just been created: ensure the user has an
        auth token, then flush the session and commit the transaction.
        :param created_user: user who has just been created
        :return:
        """
        # NOTE: Cyclic import
        # TODO - G.M - 28-03-2018 - [Calendar] Reenable Calendar stuff
        # from tracim.lib.calendar import CalendarManager
        # from tracim.model.organisational import UserCalendar

        # TODO - G.M - 04-04-2018 - [auth]
        # Check if this is already needed with
        # new auth system
        token_validity = self._config.USER_AUTH_TOKEN_VALIDITY
        created_user.ensure_auth_token(validity_seconds=token_validity)

        # Ensure database is up-to-date
        self._session.flush()
        transaction.commit()

        # TODO - G.M - 28-03-2018 - [Calendar] Reenable Calendar stuff
        # calendar_manager = CalendarManager(created_user)
        # calendar_manager.create_then_remove_fake_event(
        #     calendar_class=UserCalendar,
        #     related_object_id=created_user.user_id,
        # )
522