Passed
Pull Request — master (#300)
by
unknown
04:17
created

kytos.core.auth   C

Complexity

Total Complexity 56

Size/Duplication

Total Lines 375
Duplicated Lines 0 %

Test Coverage

Coverage 84.98%

Importance

Changes 0
Metric Value
eloc 273
dl 0
loc 375
rs 5.5199
c 0
b 0
f 0
ccs 181
cts 213
cp 0.8498
wmc 56

23 Methods

Rating   Name   Duplication   Size   Complexity  
A Auth.get_token_expiration() 0 5 1
A Auth._generate_token() 0 11 1
A UserController.create_user() 0 15 3
A UserController.__init__() 0 4 2
A Auth.__init__() 0 13 2
A Auth.get_jwt_secret() 0 5 1
A UserController.get_user() 0 12 2
A UserController.delete_user() 0 12 1
A UserController.update_user() 0 19 3
A UserController.get_user_nopw() 0 12 2
B Auth._create_superuser() 0 28 5
A Auth.get_user_controller() 0 4 1
A Auth.register_core_auth_services() 0 24 1
A UserController.bootstrap_indexes() 0 9 3
A UserController.get_users() 0 6 1
A Auth._create_user() 0 10 3
A Auth._get_request() 0 20 5
B Auth._update_user() 0 18 6
A Auth._find_user() 0 7 2
A Auth._delete_user() 0 6 2
A Auth._list_users() 0 5 1
A Auth._list_user() 0 8 2
A Auth._authenticate_user() 0 15 3

1 Function

Rating   Name   Duplication   Size   Complexity  
A authenticated() 0 24 3

How to fix   Complexity   

Complexity

Complex classes like kytos.core.auth often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Module with main classes related to Authentication."""
2
3
# pylint: disable=invalid-name
4 1
import datetime
5 1
import getpass
6 1
import hashlib
7 1
import logging
8 1
import os
9 1
import uuid
10 1
from functools import wraps
11 1
from http import HTTPStatus
12
13 1
import jwt
14 1
import pymongo
15 1
from flask import jsonify, request
16 1
from pydantic import ValidationError
17 1
from pymongo.collection import ReturnDocument
18 1
from pymongo.errors import AutoReconnect, DuplicateKeyError
19 1
from pymongo.results import InsertOneResult
20 1
from tenacity import retry_if_exception_type, stop_after_attempt, wait_random
21 1
from werkzeug.exceptions import (BadRequest, Conflict, NotFound, Unauthorized,
22
                                 UnsupportedMediaType)
23
24 1
from kytos.core.config import KytosConfig
25 1
from kytos.core.db import Mongo
26 1
from kytos.core.retry import before_sleep, for_all_methods, retries
27 1
from kytos.core.user import UserDoc, UserDocUpdate
28
29 1
__all__ = ['authenticated']
30
31 1
LOG = logging.getLogger(__name__)
32
33
34 1
def authenticated(func):
35
    """Handle tokens from requests."""
36 1
    @wraps(func)
37 1
    def wrapper(*args, **kwargs):
38
        """Verify the requires of token."""
39 1
        try:
40 1
            content = request.headers.get("Authorization")
41 1
            if content is None:
42
                raise ValueError("The attribute 'content' has an invalid "
43
                                 "value 'None'.")
44 1
            token = content.split("Bearer ")[1]
45 1
            jwt.decode(token, key=Auth.get_jwt_secret(),
46
                       algorithms=[Auth.encode_algorithm])
47 1
        except (
48
            ValueError,
49
            IndexError,
50
            jwt.exceptions.ExpiredSignatureError,
51
            jwt.exceptions.DecodeError,
52
        ) as exc:
53 1
            msg = f"Token not sent or expired: {exc}"
54 1
            return jsonify({"error": msg}), HTTPStatus.UNAUTHORIZED.value
55 1
        return func(*args, **kwargs)
56
57 1
    return wrapper
58
59
60 1
@for_all_methods(
61
    retries,
62
    stop=stop_after_attempt(
63
        int(os.environ.get("MONGO_AUTO_RETRY_STOP_AFTER_ATTEMPT", 3))
64
    ),
65
    wait=wait_random(
66
        min=int(os.environ.get("MONGO_AUTO_RETRY_WAIT_RANDOM_MIN", 0.1)),
67
        max=int(os.environ.get("MONGO_AUTO_RETRY_WAIT_RANDOM_MAX", 1)),
68
    ),
69
    before_sleep=before_sleep,
70
    retry=retry_if_exception_type((AutoReconnect,)),
71
)
72 1
class UserController:
73
    """UserController"""
74
75
    # pylint: disable=unnecessary-lambda
76 1
    def __init__(self, get_mongo=lambda: Mongo()):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable Mongo does not seem to be defined.
Loading history...
77 1
        self.mongo = get_mongo()
78 1
        self.db_client = self.mongo.client
79 1
        self.db = self.db_client[self.mongo.db_name]
80
81 1
    def bootstrap_indexes(self):
82
        """Bootstrap all users related indexes."""
83 1
        index_tuples = [
84
            ("users", [("username", pymongo.ASCENDING)], {"unique": True}),
85
        ]
86 1
        for collection, keys, kwargs in index_tuples:
87 1
            if self.mongo.bootstrap_index(collection, keys, **kwargs):
88 1
                LOG.info(
89
                    f"Created DB index {keys}, collection: {collection}"
90
                )
91
92 1
    def create_user(self, user_data: dict) -> InsertOneResult:
93
        """Create user to database"""
94 1
        try:
95 1
            utc_now = datetime.datetime.utcnow()
96 1
            result = self.db.users.insert_one(UserDoc(**{
97
                **{"_id": str(uuid.uuid4())},
98
                **user_data,
99
                **{"inserted_at": utc_now},
100
                **{"updated_at": utc_now}
101
            }).dict())
102 1
        except DuplicateKeyError as err:
103 1
            raise err
104 1
        except ValidationError as err:
105 1
            raise err
106 1
        return result
107
108 1
    def delete_user(self, username: str) -> dict:
109
        """Delete user from database"""
110 1
        utc_now = datetime.datetime.utcnow()
111 1
        result = self.db.users.find_one_and_update(
112
            {"username": username},
113
            {"$set": {
114
                "state": "inactive",
115
                "deleted_at": utc_now,
116
            }},
117
            return_document=ReturnDocument.AFTER,
118
        )
119 1
        return result
120
121 1
    def update_user(self, username: str, data: dict) -> dict:
122
        """Update user from database"""
123 1
        utc_now = datetime.datetime.utcnow()
124 1
        try:
125 1
            result = self.db.users.find_one_and_update(
126
                {"username": username},
127
                {
128
                    "$set": UserDocUpdate(**{
129
                        **data,
130
                        **{"updated_at": utc_now}
131
                    }).dict(exclude_none=True)
132
                },
133
                return_document=ReturnDocument.AFTER
134
            )
135 1
        except ValidationError as err:
136 1
            raise err
137 1
        except DuplicateKeyError as err:
138 1
            raise err
139 1
        return result
140
141 1
    def get_user(self, username: str) -> dict:
142
        """Return a user information from database"""
143 1
        data = self.db.users.aggregate([
144
            {"$match": {"username": username}},
145
            {"$project": UserDoc.projection()},
146
            {"$limit": 1}
147
        ])
148 1
        try:
149 1
            user, *_ = list(value for value in data)
150 1
            return user
151 1
        except ValueError:
152 1
            return {}
153
154 1
    def get_user_nopw(self, username: str) -> dict:
155
        """Return a user information from database without password"""
156 1
        data = self.db.users.aggregate([
157
            {"$match": {"username": username}},
158
            {"$project": UserDoc.projection_nopw()},
159
            {"$limit": 1}
160
        ])
161 1
        try:
162 1
            user, *_ = list(value for value in data)
163 1
            return user
164 1
        except ValueError:
165 1
            return {}
166
167 1
    def get_users(self) -> dict:
168
        """Return all the users"""
169 1
        data = self.db.users.aggregate([
170
            {"$project": UserDoc.projection_nopw()}
171
        ])
172 1
        return {'users': list(value for value in data)}
173
174
175 1
class Auth:
176
    """Module used to provide Kytos authentication routes."""
177
178 1
    encode_algorithm = "HS256"
179
180 1
    def __init__(self, controller):
181
        """Init method of Auth class takes the parameters below.
182
183
        Args:
184
            controller(kytos.core.controller): A Controller instance.
185
186
        """
187 1
        self.user_controller = self.get_user_controller()
188 1
        self.user_controller.bootstrap_indexes()
189 1
        self.controller = controller
190 1
        self.token_expiration_minutes = self.get_token_expiration()
191 1
        if self.controller.options.create_superuser is True:
192
            self._create_superuser()
193
194 1
    @staticmethod
195 1
    def get_user_controller():
196
        """Get UserController"""
197
        return UserController()
198
199 1
    @staticmethod
200 1
    def get_token_expiration():
201
        """Return token expiration time in minutes defined in kytos conf."""
202 1
        options = KytosConfig().options['daemon']
203 1
        return options.token_expiration_minutes
204
205 1
    @classmethod
206 1
    def get_jwt_secret(cls):
207
        """Return JWT secret defined in kytos conf."""
208
        options = KytosConfig().options['daemon']
209
        return options.jwt_secret
210
211 1
    @classmethod
212 1
    def _generate_token(cls, username, time_exp):
213
        """Generate a jwt token."""
214 1
        return jwt.encode(
215
            {
216
                'username': username,
217
                'iss': "Kytos NApps Server",
218
                'exp': time_exp,
219
            },
220
            Auth.get_jwt_secret(),
221
            algorithm=cls.encode_algorithm,
222
        )
223
224 1
    def _create_superuser(self):
225
        """Create a superuser using MongoDB."""
226
        def get_username():
227
            return input("Username: ")
228
229
        def get_email():
230
            return input("Email: ")
231
232
        username = get_username()
233
        email = get_email()
234
        while True:
235
            password = getpass.getpass()
236
            re_password = getpass.getpass('Retype password: ')
237
            if password == re_password:
238
                break
239
        user = {
240
            "username": username,
241
            "email": email,
242
            "password": password,
243
        }
244
        try:
245
            self.user_controller.create_user(user)
246
        except ValidationError:
247
            print("Inputs could not be validated.")
248
        except DuplicateKeyError:
249
            print(f"{username} already exist.")
250
251
        return "User successfully created"
252
253 1
    def register_core_auth_services(self):
254
        """
255
        Register /kytos/core/ services over authentication.
256
257
        It registers create, authenticate, list all, list specific, delete and
258
        update users.
259
        """
260 1
        self.controller.api_server.register_core_endpoint(
261
            "auth/login/", self._authenticate_user
262
        )
263 1
        self.controller.api_server.register_core_endpoint(
264
            "auth/users/", self._list_users,
265
        )
266 1
        self.controller.api_server.register_core_endpoint(
267
            "auth/users/<username>", self._list_user
268
        )
269 1
        self.controller.api_server.register_core_endpoint(
270
            "auth/users/", self._create_user, methods=["POST"]
271
        )
272 1
        self.controller.api_server.register_core_endpoint(
273
            "auth/users/<username>", self._delete_user, methods=["DELETE"]
274
        )
275 1
        self.controller.api_server.register_core_endpoint(
276
            "auth/users/<username>", self._update_user, methods=["PATCH"]
277
        )
278
279 1
    def _authenticate_user(self):
280
        """Authenticate a user using MongoDB."""
281 1
        try:
282 1
            username = request.authorization["username"]
283 1
            password = request.authorization["password"].encode()
284 1
        except TypeError as err:
285 1
            raise BadRequest("Credentials were not sent.") from err
286 1
        user = self._find_user(username)
287 1
        if user["password"] != hashlib.sha512(password).hexdigest():
288 1
            raise Unauthorized("Incorrect password")
289 1
        time_exp = datetime.datetime.utcnow() + datetime.timedelta(
290
            minutes=self.token_expiration_minutes
291
        )
292 1
        token = self._generate_token(username, time_exp)
293 1
        return {"token": token}, HTTPStatus.OK.value
294
295 1
    def _find_user(self, username):
296
        """Find a specific user using MongoDB."""
297 1
        user = self.user_controller.get_user(username)
298 1
        if not user:
299 1
            result = f"User {username} not found"
300 1
            raise NotFound(result)
301 1
        return user
302
303 1
    @authenticated
304 1
    def _list_user(self, username):
305
        """List a specific user using MongoDB."""
306 1
        user = self.user_controller.get_user_nopw(username)
307 1
        if not user:
308 1
            result = f"User {username} not found"
309 1
            raise NotFound(result)
310 1
        return user
311
312 1
    @authenticated
313 1
    def _list_users(self):
314
        """List all users using MongoDB."""
315 1
        users_list = self.user_controller.get_users()
316 1
        return users_list
317
318 1
    @staticmethod
319 1
    def _get_request():
320
        """Get JSON from request"""
321 1
        try:
322 1
            metadata = request.json
323 1
            content_type = request.content_type
324
        except BadRequest as err:
325
            result = 'The request body is not a well-formed JSON.'
326
            raise BadRequest(result) from err
327 1
        if content_type is None:
328
            result = 'The request body is empty.'
329
            raise BadRequest(result)
330 1
        if metadata is None:
331
            if content_type != 'application/json':
332
                result = ('The content type must be application/json '
333
                          f'(received {content_type}).')
334
            else:
335
                result = 'Metadata is empty.'
336
            raise UnsupportedMediaType(result)
337 1
        return metadata
338
339 1
    @authenticated
340 1
    def _create_user(self):
341
        """Save a user using MongoDB."""
342 1
        try:
343 1
            self.user_controller.create_user(self._get_request())
344 1
        except ValidationError as err:
345 1
            raise BadRequest from err
346 1
        except DuplicateKeyError as err:
347 1
            raise Conflict from err
348 1
        return jsonify("User successfully created"), 201
349
350 1
    @authenticated
351 1
    def _delete_user(self, username):
352
        """Delete a user using MongoDB."""
353 1
        if not self.user_controller.delete_user(username):
354 1
            raise NotFound(f"User {username} not found")
355 1
        return jsonify(f"User {username} deleted succesfully"), 200
356
357 1
    @authenticated
358 1
    def _update_user(self, username):
359
        """Update user data using MongoDB."""
360 1
        req = self._get_request()
361 1
        allowed = ["username", "email", "password"]
362 1
        data = {}
363 1
        for key, value in req.items():
364 1
            if key in allowed:
365 1
                data[key] = value
366 1
        try:
367 1
            updated = self.user_controller.update_user(username, data)
368 1
        except ValidationError as err:
369 1
            raise BadRequest from err
370 1
        except DuplicateKeyError as err:
371 1
            raise Conflict from err
372 1
        if not updated:
373 1
            raise NotFound(f"User {username} not found")
374
        return jsonify("User successfully updated"), 200
375