Test Failed
Pull Request — master (#966)
by Gleyberson
02:10
created

kytos.core.auth.Auth.get_jwt_secret()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.2963

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 1
dl 0
loc 5
ccs 1
cts 3
cp 0.3333
crap 1.2963
rs 10
c 0
b 0
f 0
1
"""Module with main classes related to Authentication."""
2 1
import datetime
3 1
import getpass
4 1
import hashlib
5 1
import logging
6 1
import time
7 1
from functools import wraps
8 1
from http import HTTPStatus
9
10 1
import jwt
11 1
from flask import jsonify, request
12
13 1
from kytos.core.config import KytosConfig
14 1
from kytos.core.events import KytosEvent
15
16 1
__all__ = ['authenticated']
17
18 1
LOG = logging.getLogger(__name__)
19
20
21 1
def authenticated(func):
    """Require a valid JWT bearer token before calling ``func``.

    The wrapped callable only runs when the current Flask request carries an
    ``Authorization`` header of the form ``Bearer <token>`` and the token
    decodes with the secret returned by ``Auth.get_jwt_secret()``. In every
    other case a JSON error body and HTTP status 401 are returned instead.

    Args:
        func(callable): The endpoint handler to protect.

    Returns:
        callable: The wrapped handler.

    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        """Validate the request token, then delegate to ``func``."""
        try:
            content = request.headers.get("Authorization")
            if content is None:
                raise AttributeError
            # A header without the "Bearer " prefix yields a one-item list,
            # so the index below raises IndexError (handled as 401).
            token = content.split("Bearer ")[1]
            # Pin the algorithm to the one used when tokens are generated
            # so a token cannot choose its own verification algorithm.
            jwt.decode(
                token,
                key=Auth.get_jwt_secret(),
                algorithms=['HS256'],
            )
        except (
            AttributeError,
            IndexError,
            # Base class of the expired / malformed / wrong-algorithm
            # token errors raised by jwt.decode.
            jwt.exceptions.InvalidTokenError,
        ):
            msg = "Token not sent or expired."
            return jsonify({"error": msg}), 401
        return func(*args, **kwargs)

    return wrapper
42
43
44 1
class Auth:
    """Module used to provide Kytos authentication routes."""

    def __init__(self, controller):
        """Init method of Auth class takes the parameters below.

        Args:
            controller(kytos.core.controller): A Controller instance.

        """
        self.controller = controller
        # Storehouse namespace under which every user box is kept.
        self.namespace = "kytos.core.auth.users"
        if self.controller.options.create_superuser:
            self._create_superuser()

    @classmethod
    def get_jwt_secret(cls):
        """Return JWT secret defined in kytos conf."""
        options = KytosConfig().options['daemon']
        return options.jwt_secret

    @classmethod
    def _generate_token(cls, username, time_exp):
        """Generate a jwt token.

        Args:
            username(str): Value stored in the token's ``username`` claim.
            time_exp: Expiration time stored in the ``exp`` claim.

        Returns:
            The HS256-signed token as returned by ``jwt.encode``.

        """
        return jwt.encode(
            {
                'username': username,
                'iss': "Kytos NApps Server",
                'exp': time_exp,
            },
            Auth.get_jwt_secret(),
            algorithm='HS256',
        )

    @staticmethod
    def _hash_password(password):
        """Return the hex SHA-512 digest of the plain-text ``password``."""
        return hashlib.sha512(password.encode()).hexdigest()

    def _storehouse_request(self, event_name, content, on_result):
        """Send an event to Storehouse and block until its callback runs.

        Args:
            event_name(str): Name of the KytosEvent put on the app buffer.
            content(dict): Event content, without the ``callback`` key.
            on_result(callable): Called with the callback's second and
                third arguments (the box or boxes, and the error flag);
                must return the ``(answer, code)`` tuple to hand back.

        Returns:
            tuple: The ``(answer, code)`` pair produced by ``on_result``.

        """
        result = []

        def _callback(_event, data, error):
            # One append, so the polling loop below never observes a
            # half-built result.
            result.append(on_result(data, error))

        event = KytosEvent(
            name=event_name,
            content=dict(content, callback=_callback),
        )
        self.controller.buffers.app.put(event)
        # NOTE(review): this waits indefinitely when nothing ever invokes
        # the callback, exactly like the polling loops it replaces.
        while not result:
            time.sleep(0.1)
        return result[0]

    def _create_superuser(self):
        """Create a superuser using Storehouse.

        Prompts on the terminal for username, email and a password (asked
        twice until both entries match) and puts a
        ``kytos.storehouse.create`` event on the app buffer. The method does
        not wait for an answer; success is only logged by the callback.
        """
        def _create_superuser_callback(_event, box, error):
            if box and not error:
                LOG.info("Superuser successfully created")

        username = input("Username: ")
        email = input("Email: ")
        while True:
            password = getpass.getpass()
            re_password = getpass.getpass('Retype password: ')
            if password == re_password:
                break
            print('Passwords do not match. Try again')

        user = {
            "username": username,
            "email": email,
            "password": self._hash_password(password),
        }
        content = {
            "namespace": self.namespace,
            "box_id": user["username"],
            "data": user,
            "callback": _create_superuser_callback,
        }
        event = KytosEvent(name="kytos.storehouse.create", content=content)
        self.controller.buffers.app.put(event)

    def register_core_auth_services(self):
        """
        Register /kytos/core/ services over authentication.

        It registers create, authenticate, list all, list specific, delete and
        update users.
        """
        self.controller.api_server.register_core_endpoint(
            "auth/login/", self._authenticate_user
        )
        self.controller.api_server.register_core_endpoint(
            "auth/users/", self._list_users
        )
        self.controller.api_server.register_core_endpoint(
            "auth/users/<uid>", self._list_user
        )
        self.controller.api_server.register_core_endpoint(
            "auth/users/", self._create_user, methods=["POST"]
        )
        self.controller.api_server.register_core_endpoint(
            "auth/users/<uid>", self._delete_user, methods=["DELETE"]
        )
        self.controller.api_server.register_core_endpoint(
            "auth/users/<uid>", self._update_user, methods=["PATCH"]
        )

    def _authenticate_user(self):
        """Authenticate a user using Storehouse.

        Reads the username and password from the request's authorization
        data and compares the password's SHA-512 digest with the stored one.

        Returns:
            tuple: ``({"token": <str>}, 200)`` on success, otherwise the
            text ``"Incorrect username or password"`` with status 401.

        """
        unauthorized = (
            "Incorrect username or password",
            HTTPStatus.UNAUTHORIZED.value,
        )
        try:
            username = request.authorization["username"]
            password = request.authorization["password"]
        except (KeyError, TypeError):
            # TypeError: no authorization data was sent at all (None).
            return unauthorized

        answer, code = self._find_user(username)
        if code != HTTPStatus.OK.value:
            # On failure ``answer`` is an error string, not a user dict.
            return unauthorized

        user = answer["data"]
        if user.get("password") != self._hash_password(password):
            return unauthorized

        time_exp = datetime.datetime.utcnow() + datetime.timedelta(
            minutes=10
        )
        token = self._generate_token(username, time_exp)
        # jwt.encode returns bytes in some PyJWT releases and str in others.
        if isinstance(token, bytes):
            token = token.decode()
        return {"token": token}, HTTPStatus.OK.value

    def _find_user(self, uid):
        """Find a specific user using Storehouse.

        Args:
            uid(str): Box id of the user to retrieve.

        Returns:
            tuple: ``({"data": <box data>}, 200)`` when a box came back
            without error, otherwise an error text with status 500.

        """
        def _on_result(box, error):
            if error or not box:
                return (
                    "User data cannot be shown",
                    HTTPStatus.INTERNAL_SERVER_ERROR.value,
                )
            return {"data": box.data}, HTTPStatus.OK.value

        content = {"box_id": uid, "namespace": self.namespace}
        return self._storehouse_request(
            "kytos.storehouse.retrieve", content, _on_result
        )

    @authenticated
    def _list_user(self, uid):
        """List a specific user using Storehouse.

        Returns:
            tuple: The result of ``_find_user`` with the ``password`` entry
            left out of the user data on success.

        """
        answer, code = self._find_user(uid)
        if code == HTTPStatus.OK.value:
            # Build a filtered copy instead of deleting in place: the data
            # may be the very object Storehouse keeps (TODO confirm), and
            # an in-place delete would then drop the stored hash.
            visible = {
                key: value
                for key, value in answer['data'].items()
                if key != 'password'
            }
            answer = {"data": visible}
        return answer, code

    @authenticated
    def _list_users(self):
        """List all users using Storehouse.

        Returns:
            tuple: ``({"users": <boxes>}, 200)`` or an error text with
            status 500 when Storehouse reported an error.

        """
        def _on_result(boxes, error):
            if error:
                return (
                    "Users cannot be listed",
                    HTTPStatus.INTERNAL_SERVER_ERROR.value,
                )
            return {"users": boxes}, HTTPStatus.OK.value

        content = {"namespace": self.namespace}
        return self._storehouse_request(
            "kytos.storehouse.list", content, _on_result
        )

    @authenticated
    def _create_user(self):
        """Save a user using Storehouse.

        Expects a JSON body with ``username``, ``email`` and ``password``;
        the password is stored as a SHA-512 hex digest.

        Returns:
            tuple: A message and status 200 on success, 409 when the box
            was not created, or 400 when the body is missing a field.

        """
        def _on_result(box, error):
            if error or not box:
                return "User cannot be created", HTTPStatus.CONFLICT.value
            return "User successfully created", HTTPStatus.OK.value

        req = request.json
        try:
            data = {
                "username": req["username"],
                "email": req["email"],
                "password": self._hash_password(req["password"]),
            }
        except (KeyError, TypeError, AttributeError):
            # Missing body, missing field or a non-string password.
            return (
                "username, email and password are required",
                HTTPStatus.BAD_REQUEST.value,
            )

        content = {
            "namespace": self.namespace,
            "box_id": data["username"],
            "data": data,
        }
        return self._storehouse_request(
            "kytos.storehouse.create", content, _on_result
        )

    @authenticated
    def _delete_user(self, uid):
        """Delete a user using Storehouse.

        Args:
            uid(str): Box id of the user to delete.

        Returns:
            tuple: A message with status 200 on success or 500 on failure.

        """
        def _on_result(box, error):
            if error or not box:
                return (
                    "User has not been deleted",
                    HTTPStatus.INTERNAL_SERVER_ERROR.value,
                )
            return "User successfully deleted", HTTPStatus.OK.value

        content = {"box_id": uid, "namespace": self.namespace}
        return self._storehouse_request(
            "kytos.storehouse.delete", content, _on_result
        )

    @authenticated
    def _update_user(self, uid):
        """Update user data using Storehouse.

        Only the ``username``, ``email`` and ``password`` keys of the JSON
        body are forwarded. A new password is hashed the same way as on
        creation so the user can still log in afterwards.

        Args:
            uid(str): Box id of the user to update.

        Returns:
            tuple: A message with status 200 on success, 500 on failure,
            or 400 when the body is not a JSON object or the password is
            not a string.

        """
        def _on_result(box, error):
            if error or not box:
                return (
                    "User has not been updated",
                    HTTPStatus.INTERNAL_SERVER_ERROR.value,
                )
            return "User successfully updated", HTTPStatus.OK.value

        req = request.json
        if not isinstance(req, dict):
            return (
                "A JSON object is required",
                HTTPStatus.BAD_REQUEST.value,
            )

        allowed = ["username", "email", "password"]
        data = {key: value for key, value in req.items() if key in allowed}
        if "password" in data:
            try:
                data["password"] = self._hash_password(data["password"])
            except AttributeError:
                return (
                    "password must be a string",
                    HTTPStatus.BAD_REQUEST.value,
                )

        content = {
            "namespace": self.namespace,
            "box_id": uid,
            "data": data,
        }
        return self._storehouse_request(
            "kytos.storehouse.update", content, _on_result
        )
328