Test Failed
Pull Request — master (#375)
by Vinicius
05:01
created

kytos.core.auth.Auth._delete_user()   A

Complexity

Conditions 2

Size

Total Lines 7
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 6
nop 2
dl 0
loc 7
rs 10
c 0
b 0
f 0
1
"""Module with main classes related to Authentication."""
2
3
# pylint: disable=invalid-name
4
import base64
5
import binascii
6
import datetime
7
import getpass
8
import logging
9
import os
10
import uuid
11
from functools import wraps
12
from http import HTTPStatus
13
14
import jwt
15
import pymongo
16
from pydantic import ValidationError
17
from pymongo.collection import ReturnDocument
18
from pymongo.errors import AutoReconnect, DuplicateKeyError
19
from pymongo.results import InsertOneResult
20
from tenacity import retry_if_exception_type, stop_after_attempt, wait_random
21
22
from kytos.core.config import KytosConfig
23
from kytos.core.db import Mongo
24
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
25
                                 content_type_json_or_415, get_json_or_400)
26
from kytos.core.retry import before_sleep, for_all_methods, retries
27
from kytos.core.user import HashSubDoc, UserDoc, UserDocUpdate
28
29
__all__ = ['authenticated']
30
31
LOG = logging.getLogger(__name__)
32
33
34
def authenticated(func):
    """Require a valid JWT bearer token before calling ``func``.

    The decorated callable must receive a ``Request`` among its positional
    arguments. The request's ``Authorization`` header is expected to carry
    ``Bearer <token>``; the token is decoded with the secret and algorithm
    exposed by ``Auth``.

    Args:
        func: The callable to protect.

    Returns:
        A wrapper that calls ``func`` when the token is accepted and
        otherwise returns a 401 ``JSONResponse`` describing the failure.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        """Validate the request token, then delegate to ``func``."""
        try:
            request = next(
                (arg for arg in args if isinstance(arg, Request)), None
            )
            if request is None:
                raise ValueError("Request arg not found in the decorated func")
            content = request.headers.get("Authorization")
            if content is None:
                raise ValueError("The attribute 'content' has an invalid "
                                 "value 'None'.")
            # IndexError here means the header had no "Bearer " prefix.
            token = content.split("Bearer ")[1]
            jwt.decode(token, key=Auth.get_jwt_secret(),
                       algorithms=[Auth.encode_algorithm])
        except (
            ValueError,
            IndexError,
            # Base class of ExpiredSignatureError and DecodeError; it also
            # covers other rejections (e.g. InvalidAlgorithmError) that
            # would otherwise escape as an unhandled server error.
            jwt.exceptions.InvalidTokenError,
        ) as exc:
            msg = f"Token not sent or expired: {exc}"
            return JSONResponse({"error": msg},
                                status_code=HTTPStatus.UNAUTHORIZED.value)
        return func(*args, **kwargs)

    return wrapper
66
67
68
@for_all_methods(
    retries,
    stop=stop_after_attempt(
        int(os.environ.get("MONGO_AUTO_RETRY_STOP_AFTER_ATTEMPT", 3))
    ),
    wait=wait_random(
        # The wait bounds are fractional seconds, so parse them as floats:
        # int() truncated the 0.1 default to 0 and raised ValueError for
        # values such as "0.5" coming from the environment.
        min=float(os.environ.get("MONGO_AUTO_RETRY_WAIT_RANDOM_MIN", 0.1)),
        max=float(os.environ.get("MONGO_AUTO_RETRY_WAIT_RANDOM_MAX", 1)),
    ),
    before_sleep=before_sleep,
    retry=retry_if_exception_type((AutoReconnect,)),
)
class UserController:
    """Read and write user documents in the ``users`` MongoDB collection.

    The class decorator passes a retry policy (attempt limit, random wait
    and ``AutoReconnect`` as the retried exception) to ``for_all_methods``;
    presumably it is applied to every method below -- confirm against
    ``kytos.core.retry``.
    """

    # The default is a lambda so that ``Mongo`` is looked up when the
    # controller is instantiated, not when the class body is evaluated.
    # pylint: disable=unnecessary-lambda
    def __init__(self, get_mongo=lambda: Mongo()):
        """Bind the controller to the database selected by ``get_mongo``.

        Args:
            get_mongo: Zero-argument callable returning an object that
                exposes ``client`` and ``db_name``.
        """
        self.mongo = get_mongo()
        self.db_client = self.mongo.client
        self.db = self.db_client[self.mongo.db_name]

    def bootstrap_indexes(self):
        """Bootstrap all users related indexes."""
        index_tuples = [
            ("users", [("username", pymongo.ASCENDING)], {"unique": True}),
        ]
        for collection, keys, kwargs in index_tuples:
            if self.mongo.bootstrap_index(collection, keys, **kwargs):
                LOG.info(
                    f"Created DB index {keys}, collection: {collection}"
                )

    def create_user(self, user_data: dict) -> InsertOneResult:
        """Insert a new user document built from ``user_data``.

        Args:
            user_data: Mapping read for ``username``, ``password`` and
                ``email``; missing keys are passed on as ``None``.

        Returns:
            The result of the ``insert_one`` call.

        Raises:
            ValidationError: If ``UserDoc`` rejects the supplied values.
            DuplicateKeyError: If the insert violates a unique index.
        """
        utc_now = datetime.datetime.utcnow()
        user_doc = UserDoc(**{
            "_id": str(uuid.uuid4()),
            "username": user_data.get('username'),
            "hash": HashSubDoc(),
            "password": user_data.get('password'),
            "email": user_data.get('email'),
            "inserted_at": utc_now,
            "updated_at": utc_now,
        })
        return self.db.users.insert_one(user_doc.dict())

    def delete_user(self, username: str) -> dict:
        """Mark a user as deleted without removing its document.

        The document's ``state`` is set to ``"inactive"`` and ``deleted_at``
        to the current UTC time.

        Args:
            username: Username of the document to update.

        Returns:
            The updated document, or ``None`` when no document matched.
        """
        utc_now = datetime.datetime.utcnow()
        return self.db.users.find_one_and_update(
            {"username": username},
            {"$set": {
                "state": "inactive",
                "deleted_at": utc_now,
            }},
            return_document=ReturnDocument.AFTER,
        )

    def update_user(self, username: str, data: dict) -> dict:
        """Apply ``data`` to the user identified by ``username``.

        A fresh ``HashSubDoc`` is stored whenever a password is supplied,
        and ``updated_at`` is always refreshed.

        Args:
            username: Username of the document to update.
            data: Fields to change; the caller's dict is left untouched.

        Returns:
            The updated document, or ``None`` when no document matched.

        Raises:
            ValidationError: If ``UserDocUpdate`` rejects the values.
            DuplicateKeyError: If the update violates a unique index.
        """
        # Work on a copy so the caller's dict is not mutated.
        fields = dict(data)
        if "password" in fields:
            fields["hash"] = HashSubDoc()
        fields["updated_at"] = datetime.datetime.utcnow()
        return self.db.users.find_one_and_update(
            {"username": username},
            {"$set": UserDocUpdate(**fields).dict(exclude_none=True)},
            return_document=ReturnDocument.AFTER,
        )

    def get_user(self, username: str) -> dict:
        """Return the user projected with ``UserDoc.projection()``.

        Args:
            username: Username to look up.

        Returns:
            The first matching document, or ``{}`` when there is none.
        """
        data = self.db.users.aggregate([
            {"$match": {"username": username}},
            {"$project": UserDoc.projection()},
            {"$limit": 1}
        ])
        return next(iter(data), {})

    def get_user_nopw(self, username: str) -> dict:
        """Return the user projected with ``UserDoc.projection_nopw()``.

        Args:
            username: Username to look up.

        Returns:
            The first matching document, or ``{}`` when there is none.
        """
        data = self.db.users.aggregate([
            {"$match": {"username": username}},
            {"$project": UserDoc.projection_nopw()},
            {"$limit": 1}
        ])
        return next(iter(data), {})

    def get_users(self) -> dict:
        """Return every user, projected with ``UserDoc.projection_nopw()``.

        Returns:
            A dict with a single ``users`` key holding the list of
            documents.
        """
        data = self.db.users.aggregate([
            {"$project": UserDoc.projection_nopw()}
        ])
        return {'users': list(data)}
186
187
188
class Auth:
    """Provide the Kytos authentication routes and JWT helpers."""

    encode_algorithm = "HS256"

    def __init__(self, controller):
        """Init method of Auth class takes the parameters below.

        Bootstraps the user indexes, reads the token expiration from the
        configuration and, when ``controller.options.create_superuser`` is
        ``True``, interactively creates a superuser.

        Args:
            controller(kytos.core.controller): A Controller instance.

        """
        self.user_controller = self.get_user_controller()
        self.user_controller.bootstrap_indexes()
        self.controller = controller
        self.token_expiration_minutes = self.get_token_expiration()
        if self.controller.options.create_superuser is True:
            self._create_superuser()

    @staticmethod
    def get_user_controller():
        """Return a new ``UserController``."""
        return UserController()

    @staticmethod
    def get_token_expiration():
        """Return token expiration time in minutes defined in kytos conf."""
        options = KytosConfig().options['daemon']
        return options.token_expiration_minutes

    @classmethod
    def get_jwt_secret(cls):
        """Return JWT secret defined in kytos conf."""
        options = KytosConfig().options['daemon']
        return options.jwt_secret

    @classmethod
    def _generate_token(cls, username, time_exp):
        """Encode a JWT for ``username`` that expires at ``time_exp``."""
        payload = {
            'username': username,
            'iss': "Kytos NApps Server",
            'exp': time_exp,
        }
        return jwt.encode(
            payload,
            cls.get_jwt_secret(),
            algorithm=cls.encode_algorithm,
        )

    def _create_superuser(self):
        """Prompt for credentials on the terminal and store a new user.

        The password prompt repeats until both entries match.

        Returns:
            A human-readable status string describing the outcome.
        """
        username = input("Username: ")
        email = input("Email: ")
        while True:
            password = getpass.getpass()
            re_password = getpass.getpass('Retype password: ')
            if password == re_password:
                break
        user = {
            "username": username,
            "email": email,
            "password": password,
        }
        try:
            self.user_controller.create_user(user)
        except ValidationError as err:
            msg = self.error_msg(err.errors())
            return f"Error: {msg}"
        except DuplicateKeyError:
            return f"Error: {username} already exist."

        return "User successfully created"

    def register_core_auth_services(self):
        """
        Register /kytos/core/ services over authentication.

        It registers create, authenticate, list all, list specific, delete and
        update users.
        """
        register = self.controller.api_server.register_core_endpoint
        register("auth/login/", self._authenticate_user)
        register("auth/users/", self._list_users)
        register("auth/users/{username}", self._list_user)
        register("auth/users/", self._create_user, methods=["POST"])
        register(
            "auth/users/{username}", self._delete_user, methods=["DELETE"]
        )
        register(
            "auth/users/{username}", self._update_user, methods=["PATCH"]
        )

    def _authenticate_user(self, request: Request) -> JSONResponse:
        """Check the request's credentials and return a fresh token.

        The ``Authorization`` header must hold two whitespace-separated
        parts, the second being base64 of ``username:password``.

        Raises:
            HTTPException: 400 when the header is missing or malformed,
                404 when the user does not exist, 401 when the user is not
                active or the password does not match.
        """
        if "Authorization" not in request.headers:
            raise HTTPException(400, detail="Authorization header missing")
        try:
            auth = request.headers["Authorization"]
            # Unpacking fails with ValueError unless there are two parts.
            _scheme, credentials = auth.split()
            decoded = base64.b64decode(credentials).decode("ascii")
            username, _, password = decoded.partition(":")
            password = password.encode()
        except (ValueError, TypeError, binascii.Error) as err:
            msg = "Credentials were not correctly set."
            raise HTTPException(400, detail=msg) from err
        user = self._find_user(username)
        if user["state"] != 'active':
            raise HTTPException(401, detail='This user is not active')
        password_hashed = UserDoc.hashing(password, user["hash"])
        if user["password"] != password_hashed:
            raise HTTPException(401, detail="Incorrect password")
        time_exp = datetime.datetime.utcnow() + datetime.timedelta(
            minutes=self.token_expiration_minutes
        )
        token = self._generate_token(username, time_exp)
        return JSONResponse({"token": token})

    def _find_user(self, username):
        """Return the stored user or raise a 404 ``HTTPException``."""
        user = self.user_controller.get_user(username)
        if not user:
            raise HTTPException(404, detail=f"User {username} not found")
        return user

    @authenticated
    def _list_user(self, request: Request) -> JSONResponse:
        """Return the user named in the path, without its password.

        Raises:
            HTTPException: 404 when the user does not exist.
        """
        username = request.path_params["username"]
        user = self.user_controller.get_user_nopw(username)
        if not user:
            raise HTTPException(404, detail=f"User {username} not found")
        return JSONResponse(user)

    @authenticated
    def _list_users(self, _request: Request) -> JSONResponse:
        """Return all users as provided by the user controller."""
        users_list = self.user_controller.get_users()
        return JSONResponse(users_list)

    @staticmethod
    def _get_request_body(request: Request) -> dict:
        """Return the request's JSON body, which must be an object.

        Raises:
            HTTPException: 400 when the decoded body is not a dict; the
                content-type and JSON helpers may raise on their own.
        """
        content_type_json_or_415(request)
        body = get_json_or_400(request)
        if not isinstance(body, dict):
            # f-string so the offending payload is actually interpolated.
            raise HTTPException(400, detail=f"Invalid payload type: {body}")
        return body

    @staticmethod
    def error_msg(error_list: list) -> str:
        """Return a more request friendly error message from ValidationError.

        Each error becomes ``"<loc items joined by ', '>: <msg>"`` and the
        errors are joined with ``"; "``.

        Args:
            error_list: Dicts with ``loc`` (iterable of field names or
                indices) and ``msg`` keys, as in ``ValidationError.errors()``.

        Returns:
            The combined message, or ``""`` for an empty list.
        """
        return "; ".join(
            # str() because loc entries may be integers (list indices).
            f"{', '.join(str(loc) for loc in err['loc'])}: {err['msg']}"
            for err in error_list
        )

    @authenticated
    def _create_user(self, request: Request) -> JSONResponse:
        """Create a user from the request's JSON body.

        Raises:
            HTTPException: 400 when validation fails, 409 when the username
                is already taken.
        """
        user_data = self._get_request_body(request)
        try:
            self.user_controller.create_user(user_data)
        except ValidationError as err:
            msg = self.error_msg(err.errors())
            raise HTTPException(400, detail=msg) from err
        except DuplicateKeyError as err:
            msg = f"{user_data.get('username')} already exists."
            raise HTTPException(409, detail=msg) from err
        return JSONResponse("User successfully created", status_code=201)

    @authenticated
    def _delete_user(self, request: Request) -> JSONResponse:
        """Delete the user named in the path via the user controller.

        Raises:
            HTTPException: 404 when the user does not exist.
        """
        username = request.path_params["username"]
        if not self.user_controller.delete_user(username):
            raise HTTPException(404, detail=f"User {username} not found")
        return JSONResponse(f"User {username} deleted succesfully")

    @authenticated
    def _update_user(self, request: Request) -> JSONResponse:
        """Update the user named in the path with the request's JSON body.

        Only ``username``, ``email`` and ``password`` are taken from the
        body; any other key is ignored.

        Raises:
            HTTPException: 400 when validation fails, 409 when the new
                username is already taken, 404 when the user does not exist.
        """
        body = self._get_request_body(request)
        username = request.path_params["username"]
        allowed = ("username", "email", "password")
        data = {key: value for key, value in body.items() if key in allowed}
        try:
            updated = self.user_controller.update_user(username, data)
        except ValidationError as err:
            msg = self.error_msg(err.errors())
            raise HTTPException(400, detail=msg) from err
        except DuplicateKeyError as err:
            # The collision is on the requested new username, not on the
            # user currently being edited.
            taken = data.get("username", username)
            msg = f"{taken} already exists."
            raise HTTPException(409, detail=msg) from err
        if not updated:
            raise HTTPException(404, detail=f"User {username} not found")
        return JSONResponse("User successfully updated")
403