Completed
Pull Request — develop (#31)
by inkhey
20:37 queued 14:44
created

UserApi.create_user()   A

Complexity

Conditions 4

Size

Total Lines 33
Code Lines 30

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 30
dl 0
loc 33
rs 9.16
c 0
b 0
f 0
cc 4
nop 9

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand; their parameter lists also tend to become inconsistent as soon as you need more, or different, data.

There are several approaches to avoid long parameter lists:

1
# -*- coding: utf-8 -*-
2
from smtplib import SMTPException
3
4
import transaction
5
import typing as typing
6
7
from sqlalchemy import or_
8
from sqlalchemy.orm import Session
9
from sqlalchemy.orm import Query
10
from sqlalchemy.orm.exc import NoResultFound
11
12
from tracim_backend.config import CFG
13
from tracim_backend.models.auth import User
14
from tracim_backend.models.auth import Group
15
from tracim_backend.exceptions import NoUserSetted
16
from tracim_backend.exceptions import EmailAlreadyExistInDb
17
from tracim_backend.exceptions import TooShortAutocompleteString
18
from tracim_backend.exceptions import PasswordDoNotMatch
19
from tracim_backend.exceptions import EmailValidationFailed
20
from tracim_backend.exceptions import UserDoesNotExist
21
from tracim_backend.exceptions import WrongUserPassword
22
from tracim_backend.exceptions import AuthenticationFailed
23
from tracim_backend.exceptions import NotificationNotSend
24
from tracim_backend.exceptions import UserNotActive
25
from tracim_backend.models.context_models import UserInContext
26
from tracim_backend.lib.mail_notifier.notifier import get_email_manager
27
from tracim_backend.models.context_models import TypeUser
28
from tracim_backend.models.data import UserRoleInWorkspace
29
30
31
class UserApi(object):
32
33
    def __init__(
34
            self,
35
            current_user: typing.Optional[User],
36
            session: Session,
37
            config: CFG,
38
            show_deleted: bool = False,
39
    ) -> None:
40
        self._session = session
41
        self._user = current_user
42
        self._config = config
43
        self._show_deleted = show_deleted
44
45
    def _base_query(self):
        """
        Return the query every single-user getter starts from.
        Users flagged as deleted are filtered out unless show_deleted
        was set on this UserApi.
        """
        users = self._session.query(User)
        if self._show_deleted:
            return users
        # "== False" is intentional: it builds an SQL expression, it is not
        # a Python truthiness test.
        return users.filter(User.is_deleted == False)  # noqa: E712
50
51
    def get_user_with_context(self, user: User) -> UserInContext:
        """
        Wrap a User in a UserInContext bound to this api session and config
        :param user: user to wrap
        :return: UserInContext built from the given user
        """
        return UserInContext(
            user=user,
            dbsession=self._session,
            config=self._config,
        )
61
62
    # Getters
63
64
    def get_one(self, user_id: int) -> User:
        """
        Fetch the user whose id is user_id.
        :param user_id: id of the wanted user
        :return: matching user
        :raise UserDoesNotExist: if _base_query() yields no user with this id
        """
        by_id = self._base_query().filter(User.user_id == user_id)
        try:
            return by_id.one()
        except NoResultFound as exc:
            message = 'User "{}" not found in database'.format(user_id)
            raise UserDoesNotExist(message) from exc
73
74
    def get_one_by_email(self, email: str) -> User:
        """
        Fetch the user registered with the given email.
        :param email: Email of the user
        :return: matching user
        :raise UserDoesNotExist: if _base_query() yields no user with this
        email
        """
        by_email = self._base_query().filter(User.email == email)
        try:
            return by_email.one()
        except NoResultFound as exc:
            message = 'User "{}" not found in database'.format(email)
            raise UserDoesNotExist(message) from exc
85
86
    def get_one_by_public_name(self, public_name: str) -> User:
        """
        Fetch the user whose display name equals public_name.
        :param public_name: display name of the wanted user
        :return: matching user
        :raise UserDoesNotExist: if _base_query() yields no user with this
        display name
        """
        by_name = self._base_query().filter(User.display_name == public_name)
        try:
            return by_name.one()
        except NoResultFound as exc:
            message = 'User "{}" not found in database'.format(public_name)
            raise UserDoesNotExist(message) from exc
95
    # FIXME - G.M - 24-04-2018 - Duplicate method with get_one.
96
97
    def get_one_by_id(self, id: int) -> User:
98
        return self.get_one(user_id=id)
99
100
    def get_current_user(self) -> User:
101
        """
102
        Get current_user
103
        """
104
        if not self._user:
105
            raise UserDoesNotExist('There is no current user')
106
        return self._user
107
108
    def _get_all_query(self) -> Query:
        """
        Return a query on every user, ordered by display name.
        This starts from the session directly, not from _base_query(),
        so the show_deleted flag is not applied here.
        """
        every_user = self._session.query(User)
        return every_user.order_by(User.display_name)
110
111
    def get_all(self) -> typing.Iterable[User]:
112
        return self._get_all_query().all()
113
114
    def get_known_user(
            self,
            acp: str,
    ) -> typing.Iterable[User]:
        """
        Return the users known by the current UserApi user whose display
        name or email contains the autocomplete string.
        The search starts from _get_all_query(), so the show_deleted flag
        is not applied here.
        :param acp: autocomplete filter by name/email (2 characters minimum)
        :return: List of found users
        :raise TooShortAutocompleteString: if acp has fewer than 2 characters
        """
        if len(acp) < 2:
            raise TooShortAutocompleteString(
                '"{acp}" is a too short string, acp string need to have more than one character'.format(acp=acp)  # nopep8
            )
        pattern = '%{}%'.format(acp)
        matches_acp = or_(
            User.display_name.ilike(pattern),
            User.email.ilike(pattern),
        )
        known_users = self._get_all_query().filter(matches_acp)

        # INFO - G.M - 2018-07-27 - when a current user is set and its
        # profile is not above Group.TIM_USER, only users sharing at least
        # one workspace with it are returned.
        current_user = self._user
        if current_user and current_user.profile.id <= Group.TIM_USER:
            own_workspace_ids = (
                self._session
                .query(UserRoleInWorkspace.workspace_id)
                .distinct(UserRoleInWorkspace.workspace_id)
                .filter(UserRoleInWorkspace.user_id == current_user.user_id)
                .subquery()
            )
            coworker_ids = (
                self._session
                .query(UserRoleInWorkspace.user_id)
                .distinct(UserRoleInWorkspace.user_id)
                .filter(UserRoleInWorkspace.workspace_id.in_(own_workspace_ids))
                .subquery()
            )
            known_users = known_users.filter(User.user_id.in_(coworker_ids))
        return known_users.all()
143
144
    def find(
            self,
            user_id: int = None,
            email: str = None,
            public_name: str = None
    ) -> typing.Tuple[TypeUser, User]:
        """
        Find an existing user from any of the given params.
        Lookups are tried in this order: user_id, email, public_name.
        A falsy param is skipped, and a lookup that finds nothing falls
        through to the next one.
        :return: tuple (kind of param that matched, user found)
        :raise UserDoesNotExist: if no lookup found a user
        """
        lookups = (
            (user_id, self.get_one, TypeUser.USER_ID),
            (email, self.get_one_by_email, TypeUser.EMAIL),
            (public_name, self.get_one_by_public_name, TypeUser.PUBLIC_NAME),
        )
        for value, getter, matched_by in lookups:
            if not value:
                continue
            try:
                return matched_by, getter(value)
            except UserDoesNotExist:
                # Not found with this criterion: try the next one.
                continue

        raise UserDoesNotExist('User not found with any of given params.')
177
178
    # Check methods
179
180
    def user_with_email_exists(self, email: str) -> bool:
181
        try:
182
            self.get_one_by_email(email)
183
            return True
184
        # TODO - G.M - 09-04-2018 - Better exception
185
        except:
186
            return False
187
188
    def authenticate_user(self, email: str, password: str) -> User:
189
        """
190
        Authenticate user with email and password, raise AuthenticationFailed
191
        if uncorrect.
192
        :param email: email of the user
193
        :param password: cleartext password of the user
194
        :return: User who was authenticated.
195
        """
196
        try:
197
            user = self.get_one_by_email(email)
198
            if not user.is_active:
199
                raise UserNotActive('User "{}" is not active'.format(email))
200
            if user.validate_password(password):
201
                return user
202
            else:
203
                raise WrongUserPassword('User "{}" password is incorrect'.format(email))  # nopep8
204
        except (WrongUserPassword, UserDoesNotExist) as exc:
205
            raise AuthenticationFailed('User "{}" authentication failed'.format(email)) from exc  # nopep8
206
207
    # Actions
208
    def set_password(
209
            self,
210
            user: User,
211
            loggedin_user_password: str,
212
            new_password: str,
213
            new_password2: str,
214
            do_save: bool=True
215
    ):
216
        """
217
        Set User password if loggedin user password is correct
218
        and both new_password are the same.
219
        :param user: User who need password changed
220
        :param loggedin_user_password: cleartext password of logged user (not
221
        same as user)
222
        :param new_password: new password for user
223
        :param new_password2: should be same as new_password
224
        :param do_save: should we save new user password ?
225
        :return:
226
        """
227
        if not self._user:
228
            raise NoUserSetted('Current User should be set in UserApi to use this method')  # nopep8
229
        if not self._user.validate_password(loggedin_user_password):  # nopep8
230
            raise WrongUserPassword(
231
                'Wrong password for authenticated user {}'. format(self._user.user_id)  # nopep8
232
            )
233
        if new_password != new_password2:
234
            raise PasswordDoNotMatch('Passwords given are different')
235
236
        self.update(
237
            user=user,
238
            password=new_password,
239
            do_save=do_save,
240
        )
241
        if do_save:
242
            # TODO - G.M - 2018-07-24 - Check why commit is needed here
243
            self.save(user)
244
        return user
245
246
    def set_email(
247
            self,
248
            user: User,
249
            loggedin_user_password: str,
250
            email: str,
251
            do_save: bool = True
252
    ):
253
        """
254
        Set email address of user if loggedin user password is correct
255
        :param user: User who need email changed
256
        :param loggedin_user_password: cleartext password of logged user (not
257
        same as user)
258
        :param email:
259
        :param do_save:
260
        :return:
261
        """
262
        if not self._user:
263
            raise NoUserSetted('Current User should be set in UserApi to use this method')  # nopep8
264
        if not self._user.validate_password(loggedin_user_password):  # nopep8
265
            raise WrongUserPassword(
266
                'Wrong password for authenticated user {}'. format(self._user.user_id)  # nopep8
267
            )
268
        self.update(
269
            user=user,
270
            email=email,
271
            do_save=do_save,
272
        )
273
        return user
274
275
    def _check_email(self, email: str) -> bool:
276
        """
277
        Check if email is completely ok to be used in user db table
278
        """
279
        is_email_correct = self._check_email_correctness(email)
280
        if not is_email_correct:
281
            raise EmailValidationFailed(
282
                'Email given form {} is uncorrect'.format(email))  # nopep8
283
        email_already_exist_in_db = self.check_email_already_in_db(email)
284
        if email_already_exist_in_db:
285
            raise EmailAlreadyExistInDb(
286
                'Email given {} already exist, please choose something else'.format(email)  # nopep8
287
            )
288
        return True
289
290
    def check_email_already_in_db(self, email: str) -> bool:
        """
        Tell whether the given email is already stored in db.
        The session is queried directly, not through _base_query(), so the
        show_deleted flag is not applied here.
        :param email: email to look for
        :return: True if at least one user row has this email
        """
        same_email = self._session.query(User.email).filter(User.email == email)  # nopep8
        return same_email.count() != 0
295
296
    def _check_email_correctness(self, email: str) -> bool:
297
        """
298
           Verify if given email is correct:
299
           - check format
300
           - futur active check for email ? (dns based ?)
301
           """
302
        # TODO - G.M - 2018-07-05 - find a better way to check email
303
        if not email:
304
            return False
305
        email = email.split('@')
306
        if len(email) != 2:
307
            return False
308
        return True
309
310
    def update(
311
            self,
312
            user: User,
313
            name: str=None,
314
            email: str=None,
315
            password: str=None,
316
            timezone: str=None,
317
            lang: str=None,
318
            groups: typing.Optional[typing.List[Group]]=None,
319
            do_save=True,
320
    ) -> User:
321
        if name is not None:
322
            user.display_name = name
323
324
        if email is not None and email != user.email:
325
            self._check_email(email)
326
            user.email = email
327
328
        if password is not None:
329
            user.password = password
330
331
        if timezone is not None:
332
            user.timezone = timezone
333
334
        if lang is not None:
335
            user.lang = lang
336
337
        if groups is not None:
338
            # INFO - G.M - 2018-07-18 - Delete old groups
339
            for group in user.groups:
340
                if group not in groups:
341
                    user.groups.remove(group)
342
            # INFO - G.M - 2018-07-18 - add new groups
343
            for group in groups:
344
                if group not in user.groups:
345
                    user.groups.append(group)
346
347
        if do_save:
348
            self.save(user)
349
350
        return user
351
352
    def create_user(
353
        self,
354
        email,
355
        password: str = None,
356
        name: str = None,
357
        timezone: str = '',
358
        lang: str= None,
359
        groups=[],
360
        do_save: bool=True,
361
        do_notify: bool=True,
362
    ) -> User:
363
        new_user = self.create_minimal_user(email, groups, save_now=False)
364
        self.update(
365
            user=new_user,
366
            name=name,
367
            email=email,
368
            password=password,
369
            timezone=timezone,
370
            lang=lang,
371
            do_save=False,
372
        )
373
        if do_notify:
374
            try:
375
                email_manager = get_email_manager(self._config, self._session)
376
                email_manager.notify_created_account(
377
                    new_user,
378
                    password=password
379
                )
380
            except SMTPException as e:
381
                raise NotificationNotSend()
382
        if do_save:
383
            self.save(new_user)
384
        return new_user
385
386
    def create_minimal_user(
387
            self,
388
            email,
389
            groups=[],
390
            save_now=False
391
    ) -> User:
392
        """Previous create_user method"""
393
        self._check_email(email)
394
        user = User()
395
        user.email = email
396
        user.display_name = email.split('@')[0]
397
398
        for group in groups:
399
            user.groups.append(group)
400
401
        self._session.add(user)
402
403
        if save_now:
404
            self._session.flush()
405
406
        return user
407
408
    def enable(self, user: User, do_save=False):
409
        user.is_active = True
410
        if do_save:
411
            self.save(user)
412
413
    def disable(self, user:User, do_save=False):
414
        user.is_active = False
415
        if do_save:
416
            self.save(user)
417
418
    def delete(self, user: User, do_save=False):
419
        user.is_deleted = True
420
        if do_save:
421
            self.save(user)
422
423
    def undelete(self, user: User, do_save=False):
424
        user.is_deleted = False
425
        if do_save:
426
            self.save(user)
427
428
    def save(self, user: User):
429
        self._session.flush()
430
431
    def execute_created_user_actions(self, created_user: User) -> None:
        """
        Run the follow-up actions of a user creation: make sure the user
        has an auth token, then flush the session and commit the
        transaction.
        :param created_user: user who has just been created
        :return:
        """
        # NOTE: Cyclic import
        # TODO - G.M - 28-03-2018 - [Calendar] Reenable Calendar stuff
        #from tracim.lib.calendar import CalendarManager
        #from tracim.model.organisational import UserCalendar

        # TODO - G.M - 04-04-2018 - [auth]
        # Check if this is already needed with
        # new auth system
        token_validity = self._config.USER_AUTH_TOKEN_VALIDITY
        created_user.ensure_auth_token(
            session=self._session,
            validity_seconds=token_validity,
        )

        # Ensure database is up-to-date
        self._session.flush()
        transaction.commit()

        # TODO - G.M - 28-03-2018 - [Calendar] Reenable Calendar stuff
        # calendar_manager = CalendarManager(created_user)
        # calendar_manager.create_then_remove_fake_event(
        #     calendar_class=UserCalendar,
        #     related_object_id=created_user.user_id,
        # )
459