Passed
Push — develop ( 8c70ee...cae2d7 )
by Bastien
01:41
created

UserCommand._proceed_groups()   A

Complexity

Conditions 3

Size

Total Lines 13
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 9
dl 0
loc 13
rs 9.95
c 0
b 0
f 0
cc 3
nop 3
1
# -*- coding: utf-8 -*-
2
import argparse
3
4
import transaction
5
from pyramid.scripting import AppEnvironment
6
from sqlalchemy.exc import IntegrityError
7
from sqlalchemy.orm.exc import NoResultFound
8
9
from tracim_backend import CFG
10
from tracim_backend.command import AppContextCommand
11
from tracim_backend.command import Extender
12
from tracim_backend.exceptions import BadCommandError
13
from tracim_backend.exceptions import GroupDoesNotExist
14
from tracim_backend.exceptions import NotificationDisabled
15
from tracim_backend.exceptions import NotificationSendingFailed
16
from tracim_backend.exceptions import UserAlreadyExistError
17
from tracim_backend.lib.core.group import GroupApi
18
from tracim_backend.lib.core.user import UserApi
19
from tracim_backend.models import Group
20
from tracim_backend.models import User
21
22
23
class UserCommand(AppContextCommand):
24
25
    ACTION_CREATE = 'create'
26
    ACTION_UPDATE = 'update'
27
28
    action = NotImplemented
29
30
    def get_description(self) -> str:
31
        return '''Create or update user.'''
32
33
    def get_parser(self, prog_name: str) -> argparse.ArgumentParser:
34
        parser = super().get_parser(prog_name)
35
36
        parser.add_argument(
37
            "-l",
38
            "--login",
39
            help='User login (email)',
40
            dest='login',
41
            required=True
42
        )
43
44
        parser.add_argument(
45
            "-p",
46
            "--password",
47
            help='User password',
48
            dest='password',
49
            required=False,
50
            default=None
51
        )
52
53
        parser.add_argument(
54
            "-g",
55
            "--add-to-group",
56
            help='Add user to group',
57
            dest='add_to_group',
58
            nargs='*',
59
            action=Extender,
60
            default=[],
61
        )
62
63
        parser.add_argument(
64
            "-rmg",
65
            "--remove-from-group",
66
            help='Remove user from group',
67
            dest='remove_from_group',
68
            nargs='*',
69
            action=Extender,
70
            default=[],
71
        )
72
73
        parser.add_argument(
74
            "--send-email",
75
            help='Send mail to user',
76
            dest='send_email',
77
            required=False,
78
            action='store_true',
79
            default=False,
80
        )
81
82
        return parser
83
84
    def _user_exist(self, login: str) -> User:
85
        return self._user_api.user_with_email_exists(login)
86
87
    def _get_group(self, name: str) -> Group:
88
        groups_availables = [group.group_name
89
                             for group in self._group_api.get_all()]
90
        if name not in groups_availables:
91
            msg = "Group '{}' does not exist, choose a group name in : ".format(name)  # nopep8
92
            for group in groups_availables:
93
                msg+= "'{}',".format(group)
94
            self._session.rollback()
95
            raise GroupDoesNotExist(msg)
96
        return self._group_api.get_one_with_name(name)
97
98
    def _add_user_to_named_group(
99
            self,
100
            user: str,
101
            group_name: str
102
    ) -> None:
103
104
        group = self._get_group(group_name)
105
        if user not in group.users:
106
            group.users.append(user)
107
        self._session.flush()
108
109
    def _remove_user_from_named_group(
110
            self,
111
            user: User,
112
            group_name: str
113
    ) -> None:
114
        group = self._get_group(group_name)
115
        if user in group.users:
116
            group.users.remove(user)
117
        self._session.flush()
118
119
    def _create_user(
120
            self,
121
            login: str,
122
            password: str,
123
            do_notify: bool,
124
            **kwargs
125
    ) -> User:
126
        if not password:
127
            if self._password_required():
128
                raise BadCommandError(
129
                    "You must provide -p/--password parameter"
130
                )
131
            password = ''
132
        if self._user_api.check_email_already_in_db(login):
133
            raise UserAlreadyExistError()
134
        try:
135
            user = self._user_api.create_user(
136
                email=login,
137
                password=password,
138
                do_save=True,
139
                do_notify=do_notify,
140
            )
141
            # TODO - G.M - 04-04-2018 - [Caldav] Check this code
142
            # # We need to enable radicale if it not already done
143
            # daemons = DaemonsManager()
144
            # daemons.run('radicale', RadicaleDaemon)
145
            self._user_api.execute_created_user_actions(user)
146
        except IntegrityError as exception:
147
            self._session.rollback()
148
            raise UserAlreadyExistError() from exception
149
        except (NotificationSendingFailed, NotificationDisabled) as exception:
150
            self._session.rollback()
151
            raise exception from exception
152
        return user
153
154
    def _update_password_for_login(self, login: str, password: str) -> None:
155
        user = self._user_api.get_one_by_email(login)
156
        user.password = password
157
        self._session.flush()
158
        transaction.commit()
159
160
    def take_app_action(
161
            self,
162
            parsed_args: argparse.Namespace,
163
            app_context: AppEnvironment
164
    ) -> None:
165
        # TODO - G.M - 05-04-2018 -Refactor this in order
166
        # to not setup object var outside of __init__ .
167
        self._session = app_context['request'].dbsession
168
        self._app_config = app_context['registry'].settings['CFG']
169
        self._user_api = UserApi(
170
            current_user=None,
171
            session=self._session,
172
            config=self._app_config,
173
        )
174
        self._group_api = GroupApi(
175
            current_user=None,
176
            session=self._session,
177
            config=self._app_config,
178
        )
179
        user = self._proceed_user(parsed_args)
180
        self._proceed_groups(user, parsed_args)
181
182
        print("User created/updated")
183
184
    def _proceed_user(self, parsed_args: argparse.Namespace) -> User:
185
        self._check_context(parsed_args)
186
187
        if self.action == self.ACTION_CREATE:
188
            try:
189
                user = self._create_user(
190
                    login=parsed_args.login,
191
                    password=parsed_args.password,
192
                    do_notify=parsed_args.send_email,
193
                )
194
            except UserAlreadyExistError as exc:
195
                raise UserAlreadyExistError("Error: User already exist (use `user update` command instead)") from exc # nopep8
196
            except NotificationSendingFailed as exc:
197
                raise NotificationSendingFailed("Error: Cannot send email notification due to error, user not created.") from exc  # nopep8
198
            except NotificationDisabled as exc:
199
                raise NotificationDisabled("Error: Email notification disabled but notification required, user not created.") from exc  # nopep8
200
        else:
201
            if parsed_args.password:
202
                self._update_password_for_login(
203
                    login=parsed_args.login,
204
                    password=parsed_args.password
205
                )
206
            user = self._user_api.get_one_by_email(parsed_args.login)
207
208
        return user
209
210
    def _proceed_groups(
211
            self,
212
            user: User,
213
            parsed_args: argparse.Namespace
214
    ) -> None:
215
        # User always in "users" group
216
        self._add_user_to_named_group(user, 'users')
217
218
        for group_name in parsed_args.add_to_group:
219
            self._add_user_to_named_group(user, group_name)
220
221
        for group_name in parsed_args.remove_from_group:
222
            self._remove_user_from_named_group(user, group_name)
223
224
    def _password_required(self) -> bool:
225
        # TODO - G.M - 04-04-2018 - [LDAP] Check this code
226
        # if config.get('auth_type') == LDAPAuth.name:
227
        #     return False
228
        return True
229
230
    def _check_context(self, parsed_args: argparse.Namespace) -> None:
231
        # TODO - G.M - 04-04-2018 - [LDAP] Check this code
232
        # if config.get('auth_type') == LDAPAuth.name:
233
        #     auth_instance = config.get('auth_instance')
234
        #     if not auth_instance.ldap_auth.user_exist(parsed_args.login):
235
        #         raise LDAPUserUnknown(
236
        #             "LDAP is enabled and user with login/email \"%s\" not found in LDAP" % parsed_args.login
237
        #         )
238
        pass
239
240
241
class CreateUserCommand(UserCommand):
242
    action = UserCommand.ACTION_CREATE
243
244
245
class UpdateUserCommand(UserCommand):
246
    action = UserCommand.ACTION_UPDATE
247
248
249
class LDAPUserUnknown(BadCommandError):
250
    pass
251