Passed
Push — master ( 925b90...ee7e16 )
by Humberto
02:25 queued 17s
created

kytos.core.auth   B

Complexity

Total Complexity 44

Size/Duplication

Total Lines 355
Duplicated Lines 18.03 %

Test Coverage

Coverage 83.43%

Importance

Changes 0
Metric Value
wmc 44
eloc 256
dl 64
loc 355
ccs 141
cts 169
cp 0.8343
rs 8.8798
c 0
b 0
f 0

12 Methods

Rating   Name   Duplication   Size   Complexity  
A Auth.__init__() 0 11 2
A Auth.get_jwt_secret() 0 5 1
A Auth._generate_token() 0 11 1
B Auth._create_superuser() 0 35 5
A Auth.register_core_auth_services() 0 24 1
A Auth._authenticate_user() 0 16 3
B Auth._find_user() 0 34 5
A Auth._list_user() 0 7 2
B Auth._create_user() 0 43 5
B Auth._update_user() 0 44 7
B Auth._delete_user() 35 35 5
A Auth._list_users() 29 29 4

1 Function

Rating   Name   Duplication   Size   Complexity  
A authenticated() 0 22 3

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
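In this module the repeated pattern is "publish an event, then poll until the callback fills `response`", which appears in `_find_user`, `_list_users`, `_create_user`, `_delete_user` and `_update_user`. A minimal sketch of pulling that pattern into one helper follows. The names `send_and_wait`, `put_event`, `on_result`, `delete_result` and `fake_put` are hypothetical, `put_event` stands in for building a `KytosEvent` and putting it on the controller buffer, and the timeout is an addition that the original code does not have.

```python
import threading
import time
from http import HTTPStatus


def send_and_wait(put_event, name, content, on_result, timeout=5.0, poll=0.01):
    """Publish one event and wait until its callback has produced a result.

    put_event: callable (name, content); hypothetical stand-in for creating
        a KytosEvent and calling controller.buffers.app.put().
    on_result: callable (box, error) -> (answer, code).
    """
    response = {}

    def _callback(_event, box, error):
        # Store the result where the waiting loop below can see it.
        response["result"] = on_result(box, error)

    put_event(name, dict(content, callback=_callback))

    # Assumption: give up after `timeout` seconds instead of looping forever.
    deadline = time.monotonic() + timeout
    while "result" not in response:
        if time.monotonic() >= deadline:
            return ("Storehouse did not answer in time",
                    HTTPStatus.GATEWAY_TIMEOUT.value)
        time.sleep(poll)
    return response["result"]


def delete_result(box, error):
    """Map callback arguments to (answer, code), in the style of _delete_user."""
    if not box:
        return "User not found", HTTPStatus.NOT_FOUND.value
    if error:
        return "User has not been deleted", HTTPStatus.INTERNAL_SERVER_ERROR.value
    return "User successfully deleted", HTTPStatus.OK.value


def fake_put(name, content):
    # Stand-in for the event bus: answers from another thread a moment later.
    threading.Timer(
        0.02, content["callback"], args=(None, {"id": "u1"}, None)
    ).start()


answer, code = send_and_wait(
    fake_put, "kytos.storehouse.delete", {"box_id": "u1"}, delete_result
)
```

With a helper of this shape, each endpoint would only need to supply its event name, its content and its own result mapping.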

Common duplication problems, and corresponding solutions are:

Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like kytos.core.auth often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common way to find such a component is to look for fields or methods that share the same prefixes or suffixes.
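As a rough illustration of the prefix/suffix heuristic, the sketch below groups the method names from the table above by their last underscore-separated word. The helper `group_by_suffix` is hypothetical and only looks at names, so its output is a hint about where a component may sit, not a verdict.

```python
from collections import defaultdict

# Method names as listed in the report's method table.
METHODS = [
    "__init__", "get_jwt_secret", "_generate_token", "_create_superuser",
    "register_core_auth_services", "_authenticate_user", "_find_user",
    "_list_user", "_create_user", "_update_user", "_delete_user",
    "_list_users",
]


def group_by_suffix(names):
    """Group names by their last underscore-separated word.

    Only groups with more than one member are returned, since a single
    method does not suggest a component.
    """
    groups = defaultdict(list)
    for name in names:
        suffix = name.strip("_").rsplit("_", 1)[-1]
        groups[suffix].append(name)
    return {suffix: members
            for suffix, members in groups.items() if len(members) > 1}


candidates = group_by_suffix(METHODS)
```

Here the only multi-member group is the `user` suffix, which points at the user-management methods as a candidate component.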

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
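A minimal sketch of what Extract Class could look like for this module, assuming the user-record methods are the cohesive component. `UserStore` and `LoginService` are hypothetical names, and a plain dict stands in for Storehouse so the example runs on its own. It is not how the project is actually structured.

```python
import hashlib
from http import HTTPStatus


class UserStore:
    """Hypothetical extracted class: owns user records and nothing else."""

    def __init__(self, backend):
        # backend is any dict-like object; it stands in for Storehouse here.
        self._backend = backend

    @staticmethod
    def hash_password(password):
        # Same hashing scheme the original module uses.
        return hashlib.sha512(password.encode()).hexdigest()

    def create(self, username, email, password):
        if username in self._backend:
            return "User already exists", HTTPStatus.CONFLICT.value
        self._backend[username] = {
            "username": username,
            "email": email,
            "password": self.hash_password(password),
        }
        return "User successfully created", HTTPStatus.OK.value

    def find(self, uid):
        user = self._backend.get(uid)
        if user is None:
            return f"User with uid {uid} not found", HTTPStatus.NOT_FOUND.value
        return {"data": user}, HTTPStatus.OK.value


class LoginService:
    """Hypothetical remainder of Auth: checks credentials via the store."""

    def __init__(self, store):
        self.store = store

    def password_matches(self, username, password):
        answer, code = self.store.find(username)
        if code != HTTPStatus.OK.value:
            return False
        return answer["data"]["password"] == UserStore.hash_password(password)


store = UserStore({})
store.create("alice", "alice@example.org", "s3cret")
login = LoginService(store)
```

The point of the split is that token handling and user storage can then be tested and changed independently.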

"""Module with main classes related to Authentication."""
import datetime
import getpass
import hashlib
import logging
import time
from functools import wraps
from http import HTTPStatus

import jwt
from flask import jsonify, request

from kytos.core.config import KytosConfig
from kytos.core.events import KytosEvent

__all__ = ['authenticated']

LOG = logging.getLogger(__name__)


def authenticated(func):
    """Handle tokens from requests."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        """Verify that the request carries a valid token."""
        try:
            content = request.headers.get("Authorization")
            if content is None:
                raise AttributeError
            token = content.split("Bearer ")[1]
            # Pin the algorithm to the one _generate_token() signs with.
            jwt.decode(token, key=Auth.get_jwt_secret(),
                       algorithms=['HS256'])
        except (
            AttributeError,
            IndexError,
            jwt.ExpiredSignatureError,
            jwt.exceptions.DecodeError,
        ) as exc:
            msg = f"Token not sent or expired: {exc}"
            return jsonify({"error": msg}), HTTPStatus.UNAUTHORIZED.value
        return func(*args, **kwargs)

    return wrapper


class Auth:
    """Module used to provide Kytos authentication routes."""

    def __init__(self, controller):
        """Init method of Auth class takes the parameters below.

        Args:
            controller(kytos.core.controller): A Controller instance.

        """
        self.controller = controller
        self.namespace = "kytos.core.auth.users"
        if self.controller.options.create_superuser is True:
            self._create_superuser()

    @classmethod
    def get_jwt_secret(cls):
        """Return JWT secret defined in kytos conf."""
        options = KytosConfig().options['daemon']
        return options.jwt_secret

    @classmethod
    def _generate_token(cls, username, time_exp):
        """Generate a jwt token."""
        return jwt.encode(
            {
                'username': username,
                'iss': "Kytos NApps Server",
                'exp': time_exp,
            },
            Auth.get_jwt_secret(),
            algorithm='HS256',
        )

    def _create_superuser(self):
        """Create a superuser using Storehouse."""
        def _create_superuser_callback(_event, box, error):
            if box and not error:
                LOG.info("Superuser successfully created")

        def get_username():
            return input("Username: ")

        def get_email():
            return input("Email: ")

        username = get_username()
        email = get_email()

        while True:
            password = getpass.getpass()
            re_password = getpass.getpass('Retype password: ')
            if password == re_password:
                break
            print('Passwords do not match. Try again')

        user = {
            "username": username,
            "email": email,
            "password": hashlib.sha512(password.encode()).hexdigest(),
        }
        content = {
            "namespace": self.namespace,
            "box_id": user["username"],
            "data": user,
            "callback": _create_superuser_callback,
        }
        event = KytosEvent(name="kytos.storehouse.create", content=content)
        self.controller.buffers.app.put(event)

    def register_core_auth_services(self):
        """
        Register /kytos/core/ services over authentication.

        It registers create, authenticate, list all, list specific, delete and
        update users.
        """
        self.controller.api_server.register_core_endpoint(
            "auth/login/", self._authenticate_user
        )
        self.controller.api_server.register_core_endpoint(
            "auth/users/", self._list_users
        )
        self.controller.api_server.register_core_endpoint(
            "auth/users/<uid>", self._list_user
        )
        self.controller.api_server.register_core_endpoint(
            "auth/users/", self._create_user, methods=["POST"]
        )
        self.controller.api_server.register_core_endpoint(
            "auth/users/<uid>", self._delete_user, methods=["DELETE"]
        )
        self.controller.api_server.register_core_endpoint(
            "auth/users/<uid>", self._update_user, methods=["PATCH"]
        )

    def _authenticate_user(self):
        """Authenticate a user using Storehouse."""
        username = request.authorization["username"]
        password = request.authorization["password"].encode()
        try:
            user = self._find_user(username)[0].get("data")
            if user.get("password") != hashlib.sha512(password).hexdigest():
                raise KeyError
            time_exp = datetime.datetime.utcnow() + datetime.timedelta(
                minutes=10
            )
            token = self._generate_token(username, time_exp)
            return {"token": token.decode()}, HTTPStatus.OK.value
        except (AttributeError, KeyError) as exc:
            result = f"Incorrect username or password: {exc}"
            return result, HTTPStatus.UNAUTHORIZED.value

    def _find_user(self, uid):
        """Find a specific user using Storehouse."""
        response = {}

        def _find_user_callback(_event, box, error):
            # Reported issue (Comprehensibility / Best Practice, introduced
            # by Gleyberson Andrade): the variable response does not seem to
            # be defined.
            nonlocal response
            if not box:
                response = {
                    "answer": f'User with uid {uid} not found',
                    "code": HTTPStatus.NOT_FOUND.value
                }
            elif error:
                response = {
                    "answer": "User data cannot be shown",
                    "code": HTTPStatus.INTERNAL_SERVER_ERROR.value,
                }
            else:
                response = {
                    "answer": {"data": box.data},
                    "code": HTTPStatus.OK.value,
                }

        content = {
            "box_id": uid,
            "namespace": self.namespace,
            "callback": _find_user_callback,
        }
        event = KytosEvent(name="kytos.storehouse.retrieve", content=content)
        self.controller.buffers.app.put(event)
        while True:
            time.sleep(0.1)
            if response:
                break
        return response["answer"], response["code"]

    @authenticated
    def _list_user(self, uid):
        """List a specific user using Storehouse."""
        answer, code = self._find_user(uid)
        if code == HTTPStatus.OK.value:
            del answer['data']['password']
        return answer, code

    # Reported issue (Duplication, introduced by Gleyberson Andrade):
    # this code seems to be duplicated in your project.
    @authenticated
    def _list_users(self):
        """List all users using Storehouse."""
        response = {}

        def _list_users_callback(_event, boxes, error):
            # Reported issue (Comprehensibility / Best Practice, introduced
            # by Gleyberson Andrade): the variable response does not seem to
            # be defined.
            nonlocal response
            if error:
                response = {
                    "answer": "Users cannot be listed",
                    "code": HTTPStatus.INTERNAL_SERVER_ERROR.value,
                }
            else:
                response = {
                    "answer": {"users": boxes},
                    "code": HTTPStatus.OK.value,
                }

        content = {
            "namespace": self.namespace,
            "callback": _list_users_callback,
        }
        event = KytosEvent(name="kytos.storehouse.list", content=content)
        self.controller.buffers.app.put(event)
        while True:
            time.sleep(0.1)
            if response:
                break
        return response["answer"], response["code"]

    @authenticated
    def _create_user(self):
        """Save a user using Storehouse."""
        response = {}

        def _create_user_callback(_event, box, error):
            # Reported issue (Comprehensibility / Best Practice, introduced
            # by Gleyberson Andrade): the variable response does not seem to
            # be defined.
            nonlocal response
            if not box:
                response = {
                    "answer": "User already exists",
                    "code": HTTPStatus.CONFLICT.value,
                }
            elif error:
                response = {
                    "answer": "User has not been created",
                    "code": HTTPStatus.INTERNAL_SERVER_ERROR.value,
                }
            else:
                response = {
                    "answer": "User successfully created",
                    "code": HTTPStatus.OK.value,
                }

        req = request.json
        password = req["password"].encode()
        data = {
            "username": req["username"],
            "email": req["email"],
            "password": hashlib.sha512(password).hexdigest(),
        }
        content = {
            "namespace": self.namespace,
            "box_id": data["username"],
            "data": data,
            "callback": _create_user_callback,
        }
        event = KytosEvent(name="kytos.storehouse.create", content=content)
        self.controller.buffers.app.put(event)
        while True:
            time.sleep(0.1)
            if response:
                break
        return response["answer"], response["code"]

    # Reported issue (Duplication, introduced by Gleyberson Andrade):
    # this code seems to be duplicated in your project.
    @authenticated
    def _delete_user(self, uid):
        """Delete a user using Storehouse."""
        response = {}

        def _delete_user_callback(_event, box, error):
            # Reported issue (Comprehensibility / Best Practice, introduced
            # by Gleyberson Andrade): the variable response does not seem to
            # be defined.
            nonlocal response
            if not box:
                response = {
                    "answer": f'User with uid {uid} not found',
                    "code": HTTPStatus.NOT_FOUND.value
                }
            elif error:
                response = {
                    "answer": "User has not been deleted",
                    "code": HTTPStatus.INTERNAL_SERVER_ERROR.value,
                }
            else:
                response = {
                    "answer": "User successfully deleted",
                    "code": HTTPStatus.OK.value,
                }

        content = {
            "box_id": uid,
            "namespace": self.namespace,
            "callback": _delete_user_callback,
        }
        event = KytosEvent(name="kytos.storehouse.delete", content=content)
        self.controller.buffers.app.put(event)
        while True:
            time.sleep(0.1)
            if response:
                break
        return response["answer"], response["code"]

    @authenticated
    def _update_user(self, uid):
        """Update user data using Storehouse."""
        response = {}

        def _update_user_callback(_event, box, error):
            # Reported issue (Comprehensibility / Best Practice, introduced
            # by Gleyberson Andrade): the variable response does not seem to
            # be defined.
            nonlocal response
            if not box:
                response = {
                    "answer": f'User with uid {uid} not found',
                    "code": HTTPStatus.NOT_FOUND.value
                }
            elif error:
                response = {
                    "answer": "User has not been updated",
                    "code": HTTPStatus.INTERNAL_SERVER_ERROR.value,
                }
            else:
                response = {
                    "answer": "User successfully updated",
                    "code": HTTPStatus.OK.value,
                }

        req = request.json
        allowed = ["username", "email", "password"]

        data = {}
        for key, value in req.items():
            if key in allowed:
                data[key] = value
        # Store the password hashed, as _create_user() does; otherwise
        # _authenticate_user() can never match it after an update.
        if "password" in data:
            data["password"] = hashlib.sha512(
                data["password"].encode()
            ).hexdigest()

        content = {
            "namespace": self.namespace,
            "box_id": uid,
            "data": data,
            "callback": _update_user_callback,
        }
        event = KytosEvent(name="kytos.storehouse.update", content=content)
        self.controller.buffers.app.put(event)
        while True:
            time.sleep(0.1)
            if response:
                break
        return response["answer"], response["code"]
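
The header handling inside `authenticated()` can be exercised without Flask or a JWT library. The sketch below mirrors only the first steps of the wrapper and shows which exception each kind of header triggers. `extract_bearer_token` and `classify` are hypothetical helpers written for illustration; the original inlines these steps and then passes the token on to `jwt.decode`.

```python
def extract_bearer_token(header):
    """Mirror the header handling at the top of authenticated()'s wrapper."""
    if header is None:
        raise AttributeError("Authorization header not sent")
    # split() returns a one-item list when "Bearer " is absent, so [1]
    # raises IndexError, which the decorator also turns into a 401.
    return header.split("Bearer ")[1]


def classify(header):
    """Return the token, or the name of the exception the wrapper would catch."""
    try:
        return extract_bearer_token(header)
    except (AttributeError, IndexError) as exc:
        return type(exc).__name__


missing = classify(None)
wrong_scheme = classify("Basic dXNlcjpwYXNz")
token = classify("Bearer abc.def.ghi")
empty = classify("Bearer ")
```

An empty token passes this stage and is only rejected later, when decoding fails.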