Test Failed
Pull Request — master (#966)
by Gleyberson
03:38
created

kytos.core.auth.Auth.__init__()   A

Complexity

Conditions 2

Size

Total Lines 11
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2.032

Importance

Changes 0
Metric Value
cc 2
eloc 5
nop 2
dl 0
loc 11
ccs 4
cts 5
cp 0.8
crap 2.032
rs 10
c 0
b 0
f 0
1
"""Module with main classes related to Authentication."""
2 1
import datetime
3 1
import getpass
4 1
import hashlib
5 1
import logging
6 1
import time
7 1
from functools import wraps
8 1
from http import HTTPStatus
9
10 1
import jwt
11 1
from flask import jsonify, request
12
13 1
from kytos.core.config import KytosConfig
14 1
from kytos.core.events import KytosEvent
15
16 1
__all__ = ['authenticated']
17
18 1
LOG = logging.getLogger(__name__)
19
20
21 1
def authenticated(func):
22
    """Handle tokens from requests."""
23 1
    @wraps(func)
24
    def wrapper(*args, **kwargs):
25
        """Verify the requires of token."""
26 1
        try:
27 1
            content = request.headers.get("Authorization")
28 1
            if content is None:
29
                raise AttributeError
30 1
            token = content.split("Bearer ")[1]
31 1
            jwt.decode(token, key=Auth.get_jwt_secret())
32 1
        except (
33
            AttributeError,
34
            IndexError,
35
            jwt.ExpiredSignature,
36
            jwt.exceptions.DecodeError,
37
        ) as exc:
38 1
            msg = f"Token not sent or expired: {exc}"
39 1
            return jsonify({"error": msg}), HTTPStatus.UNAUTHORIZED.value
40 1
        return func(*args, **kwargs)
41
42 1
    return wrapper
43
44
45 1
class Auth:
46
    """Module used to provide Kytos authentication routes."""
47
48 1
    def __init__(self, controller):
49
        """Init method of Auth class takes the parameters below.
50
51
        Args:
52
            controller(kytos.core.controller): A Controller instance.
53
54
        """
55 1
        self.controller = controller
56 1
        self.namespace = "kytos.core.auth.users"
57 1
        if self.controller.options.create_superuser is True:
58
            self._create_superuser()
59
60 1
    @classmethod
61
    def get_jwt_secret(cls):
62
        """Return JWT secret defined in kytos conf."""
63
        options = KytosConfig().options['daemon']
64
        return options.jwt_secret
65
66 1
    @classmethod
67
    def _generate_token(cls, username, time_exp):
68
        """Generate a jwt token."""
69 1
        return jwt.encode(
70
            {
71
                'username': username,
72
                'iss': "Kytos NApps Server",
73
                'exp': time_exp,
74
            },
75
            Auth.get_jwt_secret(),
76
            algorithm='HS256',
77
        )
78
79 1
    def _create_superuser(self):
80
        """Create a superuser using Storehouse."""
81
        def _create_superuser_callback(_event, box, error):
82
            if box and not error:
83
                LOG.info("Superuser successfully created")
84
85
        def get_username():
86
            return input("Username: ")
87
88
        def get_email():
89
            return input("Email: ")
90
91
        username = get_username()
92
        email = get_email()
93
94
        while True:
95
            password = getpass.getpass()
96
            re_password = getpass.getpass('Retype password: ')
97
            if password == re_password:
98
                break
99
            print('Passwords do not match. Try again')
100
101
        user = {
102
            "username": username,
103
            "email": email,
104
            "password": hashlib.sha512(password.encode()).hexdigest(),
105
        }
106
        content = {
107
            "namespace": self.namespace,
108
            "box_id": user["username"],
109
            "data": user,
110
            "callback": _create_superuser_callback,
111
        }
112
        event = KytosEvent(name="kytos.storehouse.create", content=content)
113
        self.controller.buffers.app.put(event)
114
115 1
    def register_core_auth_services(self):
116
        """
117
        Register /kytos/core/ services over authentication.
118
119
        It registers create, authenticate, list all, list specific, delete and
120
        update users.
121
        """
122 1
        self.controller.api_server.register_core_endpoint(
123
            "auth/login/", self._authenticate_user
124
        )
125 1
        self.controller.api_server.register_core_endpoint(
126
            "auth/users/", self._list_users
127
        )
128 1
        self.controller.api_server.register_core_endpoint(
129
            "auth/users/<uid>", self._list_user
130
        )
131 1
        self.controller.api_server.register_core_endpoint(
132
            "auth/users/", self._create_user, methods=["POST"]
133
        )
134 1
        self.controller.api_server.register_core_endpoint(
135
            "auth/users/<uid>", self._delete_user, methods=["DELETE"]
136
        )
137 1
        self.controller.api_server.register_core_endpoint(
138
            "auth/users/<uid>", self._update_user, methods=["PATCH"]
139
        )
140
141 1
    def _authenticate_user(self):
142
        """Authenticate a user using Storehouse."""
143 1
        username = request.authorization["username"]
144 1
        password = request.authorization["password"].encode()
145 1
        try:
146 1
            user = self._find_user(username)[0].get("data")
147 1
            if user.get("password") != hashlib.sha512(password).hexdigest():
148
                raise KeyError
149 1
            time_exp = datetime.datetime.utcnow() + datetime.timedelta(
150
                minutes=10
151
            )
152 1
            token = self._generate_token(username, time_exp)
153 1
            return {"token": token.decode()}, HTTPStatus.OK.value
154
        except (AttributeError, KeyError) as exc:
155
            result = f"Incorrect username or password: {exc}"
156
            return result, HTTPStatus.UNAUTHORIZED.value
157
158 1 View Code Duplication
    def _find_user(self, uid):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
159
        """Find a specific user using Storehouse."""
160 1
        response = {}
161
162 1
        def _find_user_callback(_event, box, error):
163
            nonlocal response
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable response does not seem to be defined.
Loading history...
164 1
            if error or not box:
165
                response = {
166
                    "answer": "User data cannot be shown",
167
                    "code": HTTPStatus.INTERNAL_SERVER_ERROR.value,
168
                }
169
            else:
170 1
                response = {
171
                    "answer": {"data": box.data},
172
                    "code": HTTPStatus.OK.value,
173
                }
174
175 1
        content = {
176
            "box_id": uid,
177
            "namespace": self.namespace,
178
            "callback": _find_user_callback,
179
        }
180 1
        event = KytosEvent(name="kytos.storehouse.retrieve", content=content)
181 1
        self.controller.buffers.app.put(event)
182 1
        while True:
183 1
            time.sleep(0.1)
184 1
            if response:
185 1
                break
186 1
        return response["answer"], response["code"]
187
188 1
    @authenticated
189
    def _list_user(self, uid):
190
        """List a specific user using Storehouse."""
191
        answer, code = self._find_user(uid)
192
        if code == HTTPStatus.OK.value:
193
            del answer['data']['password']
194
        return answer, code
195
196 1 View Code Duplication
    @authenticated
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
197
    def _list_users(self):
198
        """List all users using Storehouse."""
199 1
        response = {}
200
201 1
        def _list_users_callback(_event, boxes, error):
202
            nonlocal response
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable response does not seem to be defined.
Loading history...
203 1
            if error:
204
                response = {
205
                    "answer": "Users cannot be listed",
206
                    "code": HTTPStatus.INTERNAL_SERVER_ERROR.value,
207
                }
208
            else:
209 1
                response = {
210
                    "answer": {"users": boxes},
211
                    "code": HTTPStatus.OK.value,
212
                }
213
214 1
        content = {
215
            "namespace": self.namespace,
216
            "callback": _list_users_callback,
217
        }
218 1
        event = KytosEvent(name="kytos.storehouse.list", content=content)
219 1
        self.controller.buffers.app.put(event)
220 1
        while True:
221 1
            time.sleep(0.1)
222 1
            if response:
223 1
                break
224 1
        return response["answer"], response["code"]
225
226 1
    @authenticated
227
    def _create_user(self):
228
        """Save a user using Storehouse."""
229 1
        response = {}
230
231 1
        def _create_user_callback(_event, box, error):
232
            nonlocal response
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable response does not seem to be defined.
Loading history...
233 1
            if error or not box:
234 1
                response = {
235
                    "answer": "User cannot be created",
236
                    "code": HTTPStatus.CONFLICT.value,
237
                }
238
            else:
239 1
                response = {
240
                    "answer": "User successfully created",
241
                    "code": HTTPStatus.OK.value,
242
                }
243
244 1
        req = request.json
245 1
        password = req["password"].encode()
246 1
        data = {
247
            "username": req["username"],
248
            "email": req["email"],
249
            "password": hashlib.sha512(password).hexdigest(),
250
        }
251 1
        content = {
252
            "namespace": self.namespace,
253
            "box_id": data["username"],
254
            "data": data,
255
            "callback": _create_user_callback,
256
        }
257 1
        event = KytosEvent(name="kytos.storehouse.create", content=content)
258 1
        self.controller.buffers.app.put(event)
259 1
        while True:
260 1
            time.sleep(0.1)
261 1
            if response:
262 1
                break
263 1
        return response["answer"], response["code"]
264
265 1 View Code Duplication
    @authenticated
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
266
    def _delete_user(self, uid):
267
        """Delete a user using Storehouse."""
268
        response = {}
269
270
        def _delete_user_callback(_event, box, error):
271
            nonlocal response
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable response does not seem to be defined.
Loading history...
272
            if error or not box:
273
                response = {
274
                    "answer": "User has not been deleted",
275
                    "code": HTTPStatus.INTERNAL_SERVER_ERROR.value,
276
                }
277
            else:
278
                response = {
279
                    "answer": "User successfully deleted",
280
                    "code": HTTPStatus.OK.value,
281
                }
282
283
        content = {
284
            "box_id": uid,
285
            "namespace": self.namespace,
286
            "callback": _delete_user_callback,
287
        }
288
        event = KytosEvent(name="kytos.storehouse.delete", content=content)
289
        self.controller.buffers.app.put(event)
290
        while True:
291
            time.sleep(0.1)
292
            if response:
293
                break
294
        return response["answer"], response["code"]
295
296 1
    @authenticated
297
    def _update_user(self, uid):
298
        """Update user data using Storehouse."""
299
        response = {}
300
301
        def _update_user_callback(_event, box, error):
302
            nonlocal response
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable response does not seem to be defined.
Loading history...
303
            if error or not box:
304
                response = {
305
                    "answer": "User has not been updated",
306
                    "code": HTTPStatus.INTERNAL_SERVER_ERROR.value,
307
                }
308
            else:
309
                response = {
310
                    "answer": "User successfully updated",
311
                    "code": HTTPStatus.OK.value,
312
                }
313
314
        req = request.json
315
        allowed = ["username", "email", "password"]
316
317
        data = {}
318
        for key, value in req.items():
319
            if key in allowed:
320
                data[key] = value
321
322
        content = {
323
            "namespace": self.namespace,
324
            "box_id": uid,
325
            "data": data,
326
            "callback": _update_user_callback,
327
        }
328
        event = KytosEvent(name="kytos.storehouse.update", content=content)
329
        self.controller.buffers.app.put(event)
330
        while True:
331
            time.sleep(0.1)
332
            if response:
333
                break
334
        return response["answer"], response["code"]
335