Test Failed
Pull Request — master (#966)
by Gleyberson
02:36
created

kytos.core.auth   B

Complexity

Total Complexity 45

Size/Duplication

Total Lines 328
Duplicated Lines 26.83 %

Test Coverage

Coverage 24.84%

Importance

Changes 0
Metric Value
eloc 235
dl 88
loc 328
ccs 39
cts 157
cp 0.2484
rs 8.8
c 0
b 0
f 0
wmc 45

1 Function

Rating   Name   Duplication   Size   Complexity  
A authenticated() 0 21 3

12 Methods

Rating   Name   Duplication   Size   Complexity  
A Auth.__init__() 0 11 2
A Auth.get_jwt_secret() 0 5 1
A Auth._generate_token() 0 11 1
B Auth._create_user() 0 38 5
B Auth._update_user() 0 39 7
B Auth._delete_user() 30 30 5
A Auth._authenticate_user() 0 16 3
B Auth._find_user() 29 29 5
A Auth._list_users() 29 29 4
A Auth.register_core_auth_services() 0 24 1
A Auth._list_user() 0 7 2
B Auth._create_superuser() 0 29 6

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like kytos.core.auth often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Module with main classes related to Authentication."""
2 1
import datetime
3 1
import getpass
4 1
import hashlib
5 1
import logging
6 1
import time
7 1
from functools import wraps
8 1
from http import HTTPStatus
9
10 1
import jwt
11 1
from flask import jsonify, request
12
13 1
from kytos.core.config import KytosConfig
14 1
from kytos.core.events import KytosEvent
15
16 1
__all__ = ['authenticated']
17
18 1
LOG = logging.getLogger(__name__)
19
20
21 1
def authenticated(func):
22
    """Handle tokens from requests."""
23 1
    @wraps(func)
24
    def wrapper(*args, **kwargs):
25
        """Verify the requires of token."""
26
        try:
27
            content = request.headers.get("Authorization")
28
            if content is None:
29
                raise AttributeError
30
            token = content.split("Bearer ")[1]
31
            jwt.decode(token, key=Auth.get_jwt_secret())
32
        except (
33
            AttributeError,
34
            jwt.ExpiredSignature,
35
            jwt.exceptions.DecodeError,
36
        ):
37
            msg = "Token not sent or expired."
38
            return jsonify({"error": msg}), 401
39
        return func(*args, **kwargs)
40
41 1
    return wrapper
42
43
44 1
class Auth:
45
    """Module used to provide Kytos authentication routes."""
46
47 1
    def __init__(self, controller):
48
        """Init method of Auth class takes the parameters below.
49
50
        Args:
51
            controller(kytos.core.controller): A Controller instance.
52
53
        """
54 1
        self.controller = controller
55 1
        self.namespace = "kytos.core.auth.users"
56 1
        if self.controller.options.create_superuser:
57 1
            self._create_superuser()
58
59 1
    @classmethod
60
    def get_jwt_secret(cls):
61
        """Return JWT secret defined in kytos conf."""
62
        options = KytosConfig().options['daemon']
63
        return options.jwt_secret
64
65 1
    @classmethod
66
    def _generate_token(cls, username, time_exp):
67
        """Generate a jwt token."""
68
        return jwt.encode(
69
            {
70
                'username': username,
71
                'iss': "Kytos NApps Server",
72
                'exp': time_exp,
73
            },
74
            Auth.get_jwt_secret(),
75
            algorithm='HS256',
76
        )
77
78 1
    def _create_superuser(self):
79
        """Create a superuser using Storehouse."""
80
        def _create_superuser_callback(_event, box, error):
81
            if box and not error:
82
                LOG.info("Superuser successfully created")
83
84
        username = input("Username: ")
85
        email = input("Email: ")
86
        pprompt = lambda: (getpass.getpass(),
87
                           getpass.getpass('Retype password: '))
88
        while True:
89
            password, re_password = pprompt()
90
            if password == re_password:
91
                break
92
            print('Passwords do not match. Try again')
93
94
        user = {
95
            "username": username,
96
            "email": email,
97
            "password": hashlib.sha512(password.encode()).hexdigest(),
98
        }
99
        content = {
100
            "namespace": self.namespace,
101
            "box_id": user["username"],
102
            "data": user,
103
            "callback": _create_superuser_callback,
104
        }
105
        event = KytosEvent(name="kytos.storehouse.create", content=content)
106
        self.controller.buffers.app.put(event)
107
108 1
    def register_core_auth_services(self):
109
        """
110
        Register /kytos/core/ services over authentication.
111
112
        It registers create, authenticate, list all, list specific, delete and
113
        update users.
114
        """
115 1
        self.controller.api_server.register_core_endpoint(
116
            "auth/login/", self._authenticate_user
117
        )
118 1
        self.controller.api_server.register_core_endpoint(
119
            "auth/users/", self._list_users
120
        )
121 1
        self.controller.api_server.register_core_endpoint(
122
            "auth/users/<uid>", self._list_user
123
        )
124 1
        self.controller.api_server.register_core_endpoint(
125
            "auth/users/", self._create_user, methods=["POST"]
126
        )
127 1
        self.controller.api_server.register_core_endpoint(
128
            "auth/users/<uid>", self._delete_user, methods=["DELETE"]
129
        )
130 1
        self.controller.api_server.register_core_endpoint(
131
            "auth/users/<uid>", self._update_user, methods=["PATCH"]
132
        )
133
134 1
    def _authenticate_user(self):
135
        """Authenticate a user using Storehouse."""
136
        username = request.authorization["username"]
137
        password = request.authorization["password"].encode()
138
        try:
139
            user = self._find_user(username)[0].get("data")
140
            if user.get("password") != hashlib.sha512(password).hexdigest():
141
                raise KeyError
142
            time_exp = datetime.datetime.utcnow() + datetime.timedelta(
143
                minutes=10
144
            )
145
            token = self._generate_token(username, time_exp)
146
            return {"token": token.decode()}, HTTPStatus.OK.value
147
        except KeyError:
148
            result = "Incorrect username or password"
149
            return result, HTTPStatus.UNAUTHORIZED.value
150
151 1 View Code Duplication
    def _find_user(self, uid):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
152
        """Find a specific user using Storehouse."""
153
        response = {}
154
155
        def _find_user_callback(_event, box, error):
156
            nonlocal response
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable response does not seem to be defined.
Loading history...
157
            if error or not box:
158
                response = {
159
                    "answer": "User data cannot be shown",
160
                    "code": HTTPStatus.INTERNAL_SERVER_ERROR.value,
161
                }
162
            else:
163
                response = {
164
                    "answer": {"data": box.data},
165
                    "code": HTTPStatus.OK.value,
166
                }
167
168
        content = {
169
            "box_id": uid,
170
            "namespace": self.namespace,
171
            "callback": _find_user_callback,
172
        }
173
        event = KytosEvent(name="kytos.storehouse.retrieve", content=content)
174
        self.controller.buffers.app.put(event)
175
        while True:
176
            time.sleep(0.1)
177
            if response:
178
                break
179
        return response["answer"], response["code"]
180
181 1
    @authenticated
182
    def _list_user(self, uid):
183
        """List a specific user using Storehouse."""
184
        answer, code = self._find_user(uid)
185
        if code == HTTPStatus.OK.value:
186
            del answer['data']['password']
187
        return answer, code
188
189 1 View Code Duplication
    @authenticated
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
190
    def _list_users(self):
191
        """List all users using Storehouse."""
192
        response = {}
193
194
        def _list_users_callback(_event, boxes, error):
195
            nonlocal response
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable response does not seem to be defined.
Loading history...
196
            if error:
197
                response = {
198
                    "answer": "Users cannot be listed",
199
                    "code": HTTPStatus.INTERNAL_SERVER_ERROR.value,
200
                }
201
            else:
202
                response = {
203
                    "answer": {"users": boxes},
204
                    "code": HTTPStatus.OK.value,
205
                }
206
207
        content = {
208
            "namespace": self.namespace,
209
            "callback": _list_users_callback,
210
        }
211
        event = KytosEvent(name="kytos.storehouse.list", content=content)
212
        self.controller.buffers.app.put(event)
213
        while True:
214
            time.sleep(0.1)
215
            if response:
216
                break
217
        return response["answer"], response["code"]
218
219 1
    @authenticated
220
    def _create_user(self):
221
        """Save a user using Storehouse."""
222
        response = {}
223
224
        def _create_user_callback(_event, box, error):
225
            nonlocal response
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable response does not seem to be defined.
Loading history...
226
            if error or not box:
227
                response = {
228
                    "answer": "User cannot be created",
229
                    "code": HTTPStatus.CONFLICT.value,
230
                }
231
            else:
232
                response = {
233
                    "answer": "User successfully created",
234
                    "code": HTTPStatus.OK.value,
235
                }
236
237
        req = request.json
238
        password = req["password"].encode()
239
        data = {
240
            "username": req["username"],
241
            "email": req["email"],
242
            "password": hashlib.sha512(password).hexdigest(),
243
        }
244
        content = {
245
            "namespace": self.namespace,
246
            "box_id": data["username"],
247
            "data": data,
248
            "callback": _create_user_callback,
249
        }
250
        event = KytosEvent(name="kytos.storehouse.create", content=content)
251
        self.controller.buffers.app.put(event)
252
        while True:
253
            time.sleep(0.1)
254
            if response:
255
                break
256
        return response["answer"], response["code"]
257
258 1 View Code Duplication
    @authenticated
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
259
    def _delete_user(self, uid):
260
        """Delete a user using Storehouse."""
261
        response = {}
262
263
        def _delete_user_callback(_event, box, error):
264
            nonlocal response
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable response does not seem to be defined.
Loading history...
265
            if error or not box:
266
                response = {
267
                    "answer": "User has not been deleted",
268
                    "code": HTTPStatus.INTERNAL_SERVER_ERROR.value,
269
                }
270
            else:
271
                response = {
272
                    "answer": "User successfully deleted",
273
                    "code": HTTPStatus.OK.value,
274
                }
275
276
        content = {
277
            "box_id": uid,
278
            "namespace": self.namespace,
279
            "callback": _delete_user_callback,
280
        }
281
        event = KytosEvent(name="kytos.storehouse.delete", content=content)
282
        self.controller.buffers.app.put(event)
283
        while True:
284
            time.sleep(0.1)
285
            if response:
286
                break
287
        return response["answer"], response["code"]
288
289 1
    @authenticated
290
    def _update_user(self, uid):
291
        """Update user data using Storehouse."""
292
        response = {}
293
294
        def _update_user_callback(_event, box, error):
295
            nonlocal response
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable response does not seem to be defined.
Loading history...
296
            if error or not box:
297
                response = {
298
                    "answer": "User has not been updated",
299
                    "code": HTTPStatus.INTERNAL_SERVER_ERROR.value,
300
                }
301
            else:
302
                response = {
303
                    "answer": "User successfully updated",
304
                    "code": HTTPStatus.OK.value,
305
                }
306
307
        req = request.json
308
        allowed = ["username", "email", "password"]
309
310
        data = {}
311
        for key, value in req.items():
312
            if key in allowed:
313
                data[key] = value
314
315
        content = {
316
            "namespace": self.namespace,
317
            "box_id": uid,
318
            "data": data,
319
            "callback": _update_user_callback,
320
        }
321
        event = KytosEvent(name="kytos.storehouse.update", content=content)
322
        self.controller.buffers.app.put(event)
323
        while True:
324
            time.sleep(0.1)
325
            if response:
326
                break
327
        return response["answer"], response["code"]
328