kytos.core.auth   B
last analyzed

Complexity

Total Complexity 45

Size/Duplication

Total Lines 365
Duplicated Lines 17.53 %

Test Coverage

Coverage 82.86%

Importance

Changes 0
Metric Value
eloc 263
dl 64
loc 365
ccs 145
cts 175
cp 0.8286
rs 8.8
c 0
b 0
f 0
wmc 45

13 Methods

Rating   Name   Duplication   Size   Complexity  
A Auth._generate_token() 0 11 1
A Auth.get_token_expiration() 0 5 1
A Auth.__init__() 0 12 2
A Auth._authenticate_user() 0 16 3
B Auth._create_user() 0 43 5
B Auth._update_user() 0 44 7
B Auth._find_user() 0 34 5
B Auth._create_superuser() 0 37 5
B Auth._delete_user() 35 35 5
A Auth._list_users() 29 29 4
A Auth.register_core_auth_services() 0 24 1
A Auth._list_user() 0 7 2
A Auth.get_jwt_secret() 0 5 1

1 Function

Rating   Name   Duplication   Size   Complexity  
A authenticated() 0 23 3

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like kytos.core.auth often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Module with main classes related to Authentication."""
2 1
import datetime
3 1
import getpass
4 1
import hashlib
5 1
import logging
6 1
import time
7 1
from functools import wraps
8 1
from http import HTTPStatus
9
10 1
import jwt
11 1
from flask import jsonify, request
12
13 1
from kytos.core.config import KytosConfig
14 1
from kytos.core.events import KytosEvent
15
16 1
__all__ = ['authenticated']
17
18 1
LOG = logging.getLogger(__name__)
19
20
21 1
def authenticated(func):
22
    """Handle tokens from requests."""
23 1
    @wraps(func)
24
    def wrapper(*args, **kwargs):
25
        """Verify the requires of token."""
26 1
        try:
27 1
            content = request.headers.get("Authorization")
28 1
            if content is None:
29
                raise ValueError("The attribute 'content' has an invalid "
30
                                 "value 'None'.")
31 1
            token = content.split("Bearer ")[1]
32 1
            jwt.decode(token, key=Auth.get_jwt_secret())
33 1
        except (
34
            ValueError,
35
            IndexError,
36
            jwt.ExpiredSignature,
37
            jwt.exceptions.DecodeError,
38
        ) as exc:
39 1
            msg = f"Token not sent or expired: {exc}"
40 1
            return jsonify({"error": msg}), HTTPStatus.UNAUTHORIZED.value
41 1
        return func(*args, **kwargs)
42
43 1
    return wrapper
44
45
46 1
class Auth:
47
    """Module used to provide Kytos authentication routes."""
48
49 1
    def __init__(self, controller):
50
        """Init method of Auth class takes the parameters below.
51
52
        Args:
53
            controller(kytos.core.controller): A Controller instance.
54
55
        """
56 1
        self.controller = controller
57 1
        self.namespace = "kytos.core.auth.users"
58 1
        self.token_expiration_minutes = self.get_token_expiration()
59 1
        if self.controller.options.create_superuser is True:
60
            self._create_superuser()
61
62 1
    @staticmethod
63
    def get_token_expiration():
64
        """Return token expiration time in minutes defined in kytos conf."""
65 1
        options = KytosConfig().options['daemon']
66 1
        return options.token_expiration_minutes
67
68 1
    @classmethod
69
    def get_jwt_secret(cls):
70
        """Return JWT secret defined in kytos conf."""
71
        options = KytosConfig().options['daemon']
72
        return options.jwt_secret
73
74 1
    @classmethod
75
    def _generate_token(cls, username, time_exp):
76
        """Generate a jwt token."""
77 1
        return jwt.encode(
78
            {
79
                'username': username,
80
                'iss': "Kytos NApps Server",
81
                'exp': time_exp,
82
            },
83
            Auth.get_jwt_secret(),
84
            algorithm='HS256',
85
        )
86
87 1
    def _create_superuser(self):
88
        """Create a superuser using Storehouse."""
89
        def _create_superuser_callback(_event, box, error):
90
            if error:
91
                LOG.error('Superuser was not created. Error: %s', error)
92
            if box:
93
                LOG.info("Superuser successfully created")
94
95
        def get_username():
96
            return input("Username: ")
97
98
        def get_email():
99
            return input("Email: ")
100
101
        username = get_username()
102
        email = get_email()
103
104
        while True:
105
            password = getpass.getpass()
106
            re_password = getpass.getpass('Retype password: ')
107
            if password == re_password:
108
                break
109
            print('Passwords do not match. Try again')
110
111
        user = {
112
            "username": username,
113
            "email": email,
114
            "password": hashlib.sha512(password.encode()).hexdigest(),
115
        }
116
        content = {
117
            "namespace": self.namespace,
118
            "box_id": user["username"],
119
            "data": user,
120
            "callback": _create_superuser_callback,
121
        }
122
        event = KytosEvent(name="kytos.storehouse.create", content=content)
123
        self.controller.buffers.app.put(event)
124
125 1
    def register_core_auth_services(self):
126
        """
127
        Register /kytos/core/ services over authentication.
128
129
        It registers create, authenticate, list all, list specific, delete and
130
        update users.
131
        """
132 1
        self.controller.api_server.register_core_endpoint(
133
            "auth/login/", self._authenticate_user
134
        )
135 1
        self.controller.api_server.register_core_endpoint(
136
            "auth/users/", self._list_users
137
        )
138 1
        self.controller.api_server.register_core_endpoint(
139
            "auth/users/<uid>", self._list_user
140
        )
141 1
        self.controller.api_server.register_core_endpoint(
142
            "auth/users/", self._create_user, methods=["POST"]
143
        )
144 1
        self.controller.api_server.register_core_endpoint(
145
            "auth/users/<uid>", self._delete_user, methods=["DELETE"]
146
        )
147 1
        self.controller.api_server.register_core_endpoint(
148
            "auth/users/<uid>", self._update_user, methods=["PATCH"]
149
        )
150
151 1
    def _authenticate_user(self):
152
        """Authenticate a user using Storehouse."""
153 1
        username = request.authorization["username"]
154 1
        password = request.authorization["password"].encode()
155 1
        try:
156 1
            user = self._find_user(username)[0].get("data")
157 1
            if user.get("password") != hashlib.sha512(password).hexdigest():
158 1
                raise KeyError
159 1
            time_exp = datetime.datetime.utcnow() + datetime.timedelta(
160
                minutes=self.token_expiration_minutes
161
            )
162 1
            token = self._generate_token(username, time_exp)
163 1
            return {"token": token.decode()}, HTTPStatus.OK.value
164 1
        except (AttributeError, KeyError) as exc:
165 1
            result = f"Incorrect username or password: {exc}"
166 1
            return result, HTTPStatus.UNAUTHORIZED.value
167
168 1
    def _find_user(self, uid):
169
        """Find a specific user using Storehouse."""
170 1
        response = {}
171
172 1
        def _find_user_callback(_event, box, error):
173
            nonlocal response
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable response does not seem to be defined.
Loading history...
174 1
            if not box:
175 1
                response = {
176
                    "answer": f'User with uid {uid} not found',
177
                    "code": HTTPStatus.NOT_FOUND.value
178
                }
179 1
            elif error:
180
                response = {
181
                    "answer": "User data cannot be shown",
182
                    "code": HTTPStatus.INTERNAL_SERVER_ERROR.value,
183
                }
184
            else:
185 1
                response = {
186
                    "answer": {"data": box.data},
187
                    "code": HTTPStatus.OK.value,
188
                }
189
190 1
        content = {
191
            "box_id": uid,
192
            "namespace": self.namespace,
193
            "callback": _find_user_callback,
194
        }
195 1
        event = KytosEvent(name="kytos.storehouse.retrieve", content=content)
196 1
        self.controller.buffers.app.put(event)
197 1
        while True:
198 1
            time.sleep(0.1)
199 1
            if response:
200 1
                break
201 1
        return response["answer"], response["code"]
202
203 1
    @authenticated
204
    def _list_user(self, uid):
205
        """List a specific user using Storehouse."""
206 1
        answer, code = self._find_user(uid)
207 1
        if code == HTTPStatus.OK.value:
208 1
            del answer['data']['password']
209 1
        return answer, code
210
211 1 View Code Duplication
    @authenticated
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
212
    def _list_users(self):
213
        """List all users using Storehouse."""
214 1
        response = {}
215
216 1
        def _list_users_callback(_event, boxes, error):
217
            nonlocal response
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable response does not seem to be defined.
Loading history...
218 1
            if error:
219
                response = {
220
                    "answer": "Users cannot be listed",
221
                    "code": HTTPStatus.INTERNAL_SERVER_ERROR.value,
222
                }
223
            else:
224 1
                response = {
225
                    "answer": {"users": boxes},
226
                    "code": HTTPStatus.OK.value,
227
                }
228
229 1
        content = {
230
            "namespace": self.namespace,
231
            "callback": _list_users_callback,
232
        }
233 1
        event = KytosEvent(name="kytos.storehouse.list", content=content)
234 1
        self.controller.buffers.app.put(event)
235 1
        while True:
236 1
            time.sleep(0.1)
237 1
            if response:
238 1
                break
239 1
        return response["answer"], response["code"]
240
241 1
    @authenticated
242
    def _create_user(self):
243
        """Save a user using Storehouse."""
244 1
        response = {}
245
246 1
        def _create_user_callback(_event, box, error):
247
            nonlocal response
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable response does not seem to be defined.
Loading history...
248 1
            if not box:
249 1
                response = {
250
                    "answer": f'User already exists',
251
                    "code": HTTPStatus.CONFLICT.value,
252
                }
253 1
            elif error:
254
                response = {
255
                    "answer": "User has not been created",
256
                    "code": HTTPStatus.INTERNAL_SERVER_ERROR.value,
257
                }
258
            else:
259 1
                response = {
260
                    "answer": "User successfully created",
261
                    "code": HTTPStatus.OK.value,
262
                }
263
264 1
        req = request.json
265 1
        password = req["password"].encode()
266 1
        data = {
267
            "username": req["username"],
268
            "email": req["email"],
269
            "password": hashlib.sha512(password).hexdigest(),
270
        }
271 1
        content = {
272
            "namespace": self.namespace,
273
            "box_id": data["username"],
274
            "data": data,
275
            "callback": _create_user_callback,
276
        }
277 1
        event = KytosEvent(name="kytos.storehouse.create", content=content)
278 1
        self.controller.buffers.app.put(event)
279 1
        while True:
280 1
            time.sleep(0.1)
281 1
            if response:
282 1
                break
283 1
        return response["answer"], response["code"]
284
285 1 View Code Duplication
    @authenticated
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
286
    def _delete_user(self, uid):
287
        """Delete a user using Storehouse."""
288 1
        response = {}
289
290 1
        def _delete_user_callback(_event, box, error):
291
            nonlocal response
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable response does not seem to be defined.
Loading history...
292 1
            if not box:
293 1
                response = {
294
                    "answer": f'User with uid {uid} not found',
295
                    "code": HTTPStatus.NOT_FOUND.value
296
                }
297 1
            elif error:
298
                response = {
299
                    "answer": "User has not been deleted",
300
                    "code": HTTPStatus.INTERNAL_SERVER_ERROR.value,
301
                }
302
            else:
303 1
                response = {
304
                    "answer": "User successfully deleted",
305
                    "code": HTTPStatus.OK.value,
306
                }
307
308 1
        content = {
309
            "box_id": uid,
310
            "namespace": self.namespace,
311
            "callback": _delete_user_callback,
312
        }
313 1
        event = KytosEvent(name="kytos.storehouse.delete", content=content)
314 1
        self.controller.buffers.app.put(event)
315 1
        while True:
316 1
            time.sleep(0.1)
317 1
            if response:
318 1
                break
319 1
        return response["answer"], response["code"]
320
321 1
    @authenticated
322
    def _update_user(self, uid):
323
        """Update user data using Storehouse."""
324 1
        response = {}
325
326 1
        def _update_user_callback(_event, box, error):
327
            nonlocal response
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable response does not seem to be defined.
Loading history...
328 1
            if not box:
329 1
                response = {
330
                    "answer": f'User with uid {uid} not found',
331
                    "code": HTTPStatus.NOT_FOUND.value
332
                }
333 1
            elif error:
334
                response = {
335
                    "answer": "User has not been updated",
336
                    "code": HTTPStatus.INTERNAL_SERVER_ERROR.value,
337
                }
338
            else:
339 1
                response = {
340
                    "answer": "User successfully updated",
341
                    "code": HTTPStatus.OK.value,
342
                }
343
344 1
        req = request.json
345 1
        allowed = ["username", "email", "password"]
346
347 1
        data = {}
348 1
        for key, value in req.items():
349 1
            if key in allowed:
350 1
                data[key] = value
351
352 1
        content = {
353
            "namespace": self.namespace,
354
            "box_id": uid,
355
            "data": data,
356
            "callback": _update_user_callback,
357
        }
358 1
        event = KytosEvent(name="kytos.storehouse.update", content=content)
359 1
        self.controller.buffers.app.put(event)
360 1
        while True:
361 1
            time.sleep(0.1)
362 1
            if response:
363 1
                break
364
        return response["answer"], response["code"]
365