kytos.core.auth.Auth._generate_token()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 11
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 9
nop 3
dl 0
loc 11
rs 9.95
c 0
b 0
f 0
1
"""Module with main classes related to Authentication."""
2
3
# pylint: disable=invalid-name
4
import base64
5
import binascii
6
import datetime
7
import getpass
8
import logging
9
import os
10
import uuid
11
from functools import wraps
12
from http import HTTPStatus
13
14
import jwt
15
import pymongo
16
from pydantic import ValidationError
17
from pymongo.collection import ReturnDocument
18
from pymongo.errors import (ConnectionFailure, DuplicateKeyError,
19
                            ExecutionTimeout)
20
from pymongo.results import InsertOneResult
21
from tenacity import retry_if_exception_type, stop_after_attempt, wait_random
22
23
from kytos.core.config import KytosConfig
24
from kytos.core.db import Mongo
25
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
26
                                 content_type_json_or_415, error_msg,
27
                                 get_json_or_400)
28
from kytos.core.retry import before_sleep, for_all_methods, retries
29
from kytos.core.user import HashSubDoc, UserDoc, UserDocUpdate, hashing
30
31
__all__ = ['authenticated']
32
33
LOG = logging.getLogger(__name__)
34
35
36
def authenticated(func):
37
    """Handle tokens from requests."""
38
    @wraps(func)
39
    def wrapper(*args, **kwargs):
40
        """Verify the requires of token."""
41
        try:
42
            request: Request = None
43
            for arg in args:
44
                if isinstance(arg, Request):
45
                    request = arg
46
                    break
47
            else:
48
                raise ValueError("Request arg not found in the decorated func")
49
            content = request.headers.get("Authorization")
50
            if content is None:
51
                raise ValueError("The attribute 'content' has an invalid "
52
                                 "value 'None'.")
53
            token = content.split("Bearer ")[1]
54
            jwt.decode(token, key=Auth.get_jwt_secret(),
55
                       algorithms=[Auth.encode_algorithm])
56
        except (
57
            ValueError,
58
            IndexError,
59
            jwt.exceptions.ExpiredSignatureError,
60
            jwt.exceptions.DecodeError,
61
        ) as exc:
62
            msg = f"Token not sent or expired: {exc}"
63
            return JSONResponse({"error": msg},
64
                                status_code=HTTPStatus.UNAUTHORIZED.value)
65
        return func(*args, **kwargs)
66
67
    return wrapper
68
69
70
@for_all_methods(
71
    retries,
72
    stop=stop_after_attempt(
73
        int(os.environ.get("MONGO_AUTO_RETRY_STOP_AFTER_ATTEMPT", 3))
74
    ),
75
    wait=wait_random(
76
        min=int(os.environ.get("MONGO_AUTO_RETRY_WAIT_RANDOM_MIN", 0.1)),
77
        max=int(os.environ.get("MONGO_AUTO_RETRY_WAIT_RANDOM_MAX", 1)),
78
    ),
79
    before_sleep=before_sleep,
80
    retry=retry_if_exception_type((ConnectionFailure, ExecutionTimeout)),
81
)
82
class UserController:
83
    """UserController"""
84
85
    # pylint: disable=unnecessary-lambda
86
    def __init__(self, get_mongo=lambda: Mongo()):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable Mongo does not seem to be defined.
Loading history...
87
        self.mongo = get_mongo()
88
        self.db_client = self.mongo.client
89
        self.db = self.db_client[self.mongo.db_name]
90
91
    def bootstrap_indexes(self):
92
        """Bootstrap all users related indexes."""
93
        index_tuples = [
94
            ("users", [("username", pymongo.ASCENDING)], {"unique": True}),
95
        ]
96
        for collection, keys, kwargs in index_tuples:
97
            if self.mongo.bootstrap_index(collection, keys, **kwargs):
98
                LOG.info(
99
                    f"Created DB index {keys}, collection: {collection}"
100
                )
101
102
    def create_user(self, user_data: dict) -> InsertOneResult:
103
        """Create user to database"""
104
        try:
105
            utc_now = datetime.datetime.utcnow()
106
            result = self.db.users.insert_one(UserDoc(**{
107
                "_id": str(uuid.uuid4()),
108
                "username": user_data.get('username'),
109
                "hash": HashSubDoc(),
110
                "password": user_data.get('password'),
111
                "email": user_data.get('email'),
112
                "inserted_at": utc_now,
113
                "updated_at": utc_now,
114
            }).model_dump())
115
        except DuplicateKeyError as err:
116
            raise err
117
        except ValidationError as err:
118
            raise err
119
        return result
120
121
    def delete_user(self, username: str) -> dict:
122
        """Delete user from database"""
123
        utc_now = datetime.datetime.utcnow()
124
        result = self.db.users.find_one_and_update(
125
            {"username": username},
126
            {"$set": {
127
                "state": "inactive",
128
                "deleted_at": utc_now,
129
            }},
130
            return_document=ReturnDocument.AFTER,
131
        )
132
        return result
133
134
    def update_user(self, username: str, data: dict) -> dict:
135
        """Update user from database"""
136
        utc_now = datetime.datetime.utcnow()
137
        if "password" in data:
138
            data["hash"] = HashSubDoc()
139
        try:
140
            result = self.db.users.find_one_and_update(
141
                {"username": username},
142
                {
143
                    "$set": UserDocUpdate(**{
144
                        **data,
145
                        **{"updated_at": utc_now}
146
                    }).model_dump(exclude_none=True)
147
                },
148
                return_document=ReturnDocument.AFTER
149
            )
150
        except ValidationError as err:
151
            raise err
152
        except DuplicateKeyError as err:
153
            raise err
154
        return result
155
156
    def get_user(self, username: str) -> dict:
157
        """Return a user information from database"""
158
        data = self.db.users.aggregate([
159
            {"$match": {"username": username}},
160
            {"$project": UserDoc.projection()},
161
            {"$limit": 1}
162
        ])
163
        try:
164
            user, *_ = list(value for value in data)
165
            return user
166
        except ValueError:
167
            return {}
168
169
    def get_user_nopw(self, username: str) -> dict:
170
        """Return a user information from database without password"""
171
        data = self.db.users.aggregate([
172
            {"$match": {"username": username}},
173
            {"$project": UserDoc.projection_nopw()},
174
            {"$limit": 1}
175
        ])
176
        try:
177
            user, *_ = list(value for value in data)
178
            return user
179
        except ValueError:
180
            return {}
181
182
    def get_users(self) -> dict:
183
        """Return all the users"""
184
        data = self.db.users.aggregate([
185
            {"$project": UserDoc.projection_nopw()}
186
        ])
187
        return {'users': list(value for value in data)}
188
189
190
class Auth:
191
    """Module used to provide Kytos authentication routes."""
192
193
    encode_algorithm = "HS256"
194
195
    def __init__(self, controller):
196
        """Init method of Auth class takes the parameters below.
197
198
        Args:
199
            controller(kytos.core.controller): A Controller instance.
200
201
        """
202
        self.user_controller = self.get_user_controller()
203
        self.user_controller.bootstrap_indexes()
204
        self.controller = controller
205
        self.token_expiration_minutes = self.get_token_expiration()
206
        if self.controller.options.create_superuser is True:
207
            self._create_superuser()
208
209
    @staticmethod
210
    def get_user_controller():
211
        """Get UserController"""
212
        return UserController()
213
214
    @staticmethod
215
    def get_token_expiration():
216
        """Return token expiration time in minutes defined in kytos conf."""
217
        options = KytosConfig().options['daemon']
218
        return options.token_expiration_minutes
219
220
    @classmethod
221
    def get_jwt_secret(cls):
222
        """Return JWT secret defined in kytos conf."""
223
        options = KytosConfig().options['daemon']
224
        return options.jwt_secret
225
226
    @classmethod
227
    def _generate_token(cls, username, time_exp):
228
        """Generate a jwt token."""
229
        return jwt.encode(
230
            {
231
                'username': username,
232
                'iss': "Kytos NApps Server",
233
                'exp': time_exp,
234
            },
235
            Auth.get_jwt_secret(),
236
            algorithm=cls.encode_algorithm,
237
        )
238
239
    def _create_superuser(self):
240
        """Create a superuser using MongoDB."""
241
        def get_username():
242
            return input("Username: ")
243
244
        def get_email():
245
            return input("Email: ")
246
247
        username = get_username()
248
        email = get_email()
249
        while True:
250
            password = getpass.getpass()
251
            re_password = getpass.getpass('Retype password: ')
252
            if password == re_password:
253
                break
254
        user = {
255
            "username": username,
256
            "email": email,
257
            "password": password,
258
        }
259
        try:
260
            self.user_controller.create_user(user)
261
        except ValidationError as err:
262
            msg = error_msg(err.errors())
263
            return f"Error: {msg}"
264
        except DuplicateKeyError:
265
            return f"Error: {username} already exist."
266
267
        return "User successfully created"
268
269
    def register_core_auth_services(self):
270
        """
271
        Register /kytos/core/ services over authentication.
272
273
        It registers create, authenticate, list all, list specific, delete and
274
        update users.
275
        """
276
        self.controller.api_server.register_core_endpoint(
277
            "auth/login/", self._authenticate_user
278
        )
279
        self.controller.api_server.register_core_endpoint(
280
            "auth/users/", self._list_users,
281
        )
282
        self.controller.api_server.register_core_endpoint(
283
            "auth/users/{username}", self._list_user
284
        )
285
        self.controller.api_server.register_core_endpoint(
286
            "auth/users/", self._create_user, methods=["POST"]
287
        )
288
        self.controller.api_server.register_core_endpoint(
289
            "auth/users/{username}", self._delete_user, methods=["DELETE"]
290
        )
291
        self.controller.api_server.register_core_endpoint(
292
            "auth/users/{username}", self._update_user, methods=["PATCH"]
293
        )
294
295
    def _authenticate_user(self, request: Request) -> JSONResponse:
296
        """Authenticate a user using MongoDB."""
297
        if "Authorization" not in request.headers:
298
            raise HTTPException(400, detail="Authorization header missing")
299
        try:
300
            auth = request.headers["Authorization"]
301
            _scheme, credentials = auth.split()
302
            decoded = base64.b64decode(credentials).decode("ascii")
303
            username, _, password = decoded.partition(":")
304
            password = password.encode()
305
        except (ValueError, TypeError, binascii.Error) as err:
306
            msg = "Credentials were not correctly set."
307
            raise HTTPException(400, detail=msg) from err
308
        user = self._find_user(username)
309
        if user["state"] != 'active':
310
            raise HTTPException(401, detail='This user is not active')
311
        password_hashed = hashing(password, user["hash"])
312
        if user["password"] != password_hashed:
313
            raise HTTPException(401, detail="Incorrect password")
314
        time_exp = datetime.datetime.utcnow() + datetime.timedelta(
315
            minutes=self.token_expiration_minutes
316
        )
317
        token = self._generate_token(username, time_exp)
318
        return JSONResponse({"token": token})
319
320
    def _find_user(self, username):
321
        """Find a specific user using MongoDB."""
322
        user = self.user_controller.get_user(username)
323
        if not user:
324
            raise HTTPException(404, detail=f"User {username} not found")
325
        return user
326
327
    @authenticated
328
    def _list_user(self, request: Request) -> JSONResponse:
329
        """List a specific user using MongoDB."""
330
        username = request.path_params["username"]
331
        user = self.user_controller.get_user_nopw(username)
332
        if not user:
333
            raise HTTPException(404, detail=f"User {username} not found")
334
        return JSONResponse(user)
335
336
    @authenticated
337
    def _list_users(self, _request: Request) -> JSONResponse:
338
        """List all users using MongoDB."""
339
        users_list = self.user_controller.get_users()
340
        return JSONResponse(users_list)
341
342
    def _get_request_body(self, request: Request) -> dict:
343
        """Get JSON from request"""
344
        content_type_json_or_415(request)
345
        body = get_json_or_400(request, self.controller.loop)
346
        if not isinstance(body, dict):
347
            raise HTTPException(400, "Invalid payload type: {body}")
348
        return body
349
350
    @authenticated
351
    def _create_user(self, request: Request) -> JSONResponse:
352
        """Save a user using MongoDB."""
353
        try:
354
            user_data = self._get_request_body(request)
355
            self.user_controller.create_user(user_data)
356
        except ValidationError as err:
357
            msg = error_msg(err.errors())
358
            raise HTTPException(400, msg) from err
359
        except DuplicateKeyError as err:
360
            msg = user_data.get("username") + " already exists."
361
            raise HTTPException(409, detail=msg) from err
362
        return JSONResponse("User successfully created", status_code=201)
363
364
    @authenticated
365
    def _delete_user(self, request: Request) -> JSONResponse:
366
        """Delete a user using MongoDB."""
367
        username = request.path_params["username"]
368
        if not self.user_controller.delete_user(username):
369
            raise HTTPException(404, detail=f"User {username} not found")
370
        return JSONResponse(f"User {username} deleted succesfully")
371
372
    @authenticated
373
    def _update_user(self, request: Request) -> JSONResponse:
374
        """Update user data using MongoDB."""
375
        body = self._get_request_body(request)
376
        username = request.path_params["username"]
377
        allowed = ["username", "email", "password"]
378
        data = {}
379
        for key, value in body.items():
380
            if key in allowed:
381
                data[key] = value
382
        try:
383
            updated = self.user_controller.update_user(username, data)
384
        except ValidationError as err:
385
            msg = error_msg(err.errors())
386
            raise HTTPException(400, detail=msg) from err
387
        except DuplicateKeyError as err:
388
            msg = username + " already exists."
389
            raise HTTPException(409, detail=msg) from err
390
        if not updated:
391
            raise HTTPException(404, detail=f"User {username} not found")
392
        return JSONResponse("User successfully updated")
393