Test Failed
Pull Request — master (#966)
by Gleyberson
02:35 queued 30s
created

kytos.core.auth.Auth.register_core_auth_services()   A

Complexity

Conditions 1

Size

Total Lines 24
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 13
nop 1
dl 0
loc 24
ccs 7
cts 7
cp 1
crap 1
rs 9.75
c 0
b 0
f 0
1
"""Module with main classes related to Authentication."""
2 1
import datetime
3 1
import getpass
4 1
import hashlib
5 1
import logging
6 1
import time
7 1
from functools import wraps
8 1
from http import HTTPStatus
9
10 1
import jwt
11 1
from flask import jsonify, request
12
13 1
from kytos.core.config import KytosConfig
14 1
from kytos.core.events import KytosEvent
15
16 1
__all__ = ['authenticated']
17
18 1
LOG = logging.getLogger(__name__)
19
20
21 1
def authenticated(func):
    """Require a valid JWT bearer token before calling ``func``.

    The decorated callable is only executed when the current Flask request
    carries an ``Authorization: Bearer <token>`` header whose token can be
    decoded with the secret returned by ``Auth.get_jwt_secret()``.

    Args:
        func (callable): View function (or method) to protect.

    Returns:
        callable: Wrapper that returns ``func``'s result when the token is
        valid, or a ``({"error": ...}, 401)`` JSON response otherwise.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        """Validate the request token, then delegate to the wrapped view."""
        try:
            content = request.headers.get("Authorization")
            if content is None:
                raise AttributeError
            # IndexError is raised here when the header does not contain
            # the "Bearer " prefix (e.g. Basic credentials were sent).
            token = content.split("Bearer ")[1]
            # Pin the accepted algorithm to the one used to sign tokens
            # (HS256) instead of trusting the algorithm named in the token
            # header.
            jwt.decode(
                token,
                key=Auth.get_jwt_secret(),
                algorithms=["HS256"],
            )
        except (
            AttributeError,
            IndexError,
            # Base class of the expired-signature and decode errors.
            jwt.exceptions.InvalidTokenError,
        ):
            msg = "Token not sent or expired."
            return jsonify({"error": msg}), 401
        return func(*args, **kwargs)

    return wrapper
42
43
44 1
class Auth:
    """Provide the Kytos authentication routes.

    Users are kept in the ``kytos.core.auth.users`` namespace and every
    read or write is done by putting a ``kytos.storehouse.*`` event in the
    controller's ``app`` buffer and waiting for the callback carried by
    that event to be called.
    """

    # Seconds to wait for a Storehouse callback before giving up.
    _STOREHOUSE_TIMEOUT = 10
    # Seconds slept between two checks for the Storehouse answer.
    _POLL_INTERVAL = 0.1

    def __init__(self, controller):
        """Init method of Auth class takes the parameters below.

        Args:
            controller(kytos.core.controller): A Controller instance.

        When ``controller.options.create_superuser`` is truthy, a superuser
        is created interactively during initialization.
        """
        self.controller = controller
        self.namespace = "kytos.core.auth.users"
        if self.controller.options.create_superuser:
            self._create_superuser()

    @classmethod
    def get_jwt_secret(cls):
        """Return JWT secret defined in kytos conf."""
        options = KytosConfig().options['daemon']
        return options.jwt_secret

    @classmethod
    def _generate_token(cls, username, time_exp):
        """Generate a jwt token.

        Args:
            username (str): Value stored in the ``username`` claim.
            time_exp: Expiration stored in the ``exp`` claim.

        Returns:
            The HS256-signed token exactly as returned by ``jwt.encode``.
        """
        return jwt.encode(
            {
                'username': username,
                'iss': "Kytos NApps Server",
                'exp': time_exp,
            },
            cls.get_jwt_secret(),
            algorithm='HS256',
        )

    @staticmethod
    def _hash_password(password):
        """Return the SHA-512 hex digest of ``password`` (a ``str``).

        NOTE(review): an unsalted digest is weak for password storage; it
        is kept because already stored users are compared against it.
        """
        return hashlib.sha512(password.encode()).hexdigest()

    def _storehouse_request(self, event_name, content, build_response):
        """Send an event to Storehouse and wait for its callback.

        Args:
            event_name (str): Name of the KytosEvent to put in the buffer.
            content (dict): Event content without the ``callback`` key.
            build_response (callable): Called as
                ``build_response(data, error)`` with the second and third
                callback arguments; must return ``(answer, code)``.

        Returns:
            tuple: ``(answer, code)`` built by ``build_response``, or a
            503 answer when no callback was received within
            ``_STOREHOUSE_TIMEOUT`` seconds.
        """
        result = []

        def _callback(_event, data, error):
            # The finished tuple is published with one append, so the
            # polling loop below only ever sees a complete answer.
            result.append(build_response(data, error))

        event = KytosEvent(
            name=event_name,
            content=dict(content, callback=_callback),
        )
        self.controller.buffers.app.put(event)

        # Bounded wait: without a deadline the request would block forever
        # if nothing ever calls the callback.
        deadline = time.monotonic() + self._STOREHOUSE_TIMEOUT
        while not result:
            if time.monotonic() >= deadline:
                LOG.error("No answer to %s after %s seconds",
                          event_name, self._STOREHOUSE_TIMEOUT)
                return (
                    "Storehouse did not answer in time",
                    HTTPStatus.SERVICE_UNAVAILABLE.value,
                )
            time.sleep(self._POLL_INTERVAL)
        return result[0]

    def _create_superuser(self):
        """Create a superuser using Storehouse.

        Username, email and password are read from the terminal; the
        password is asked twice until both entries match. The create event
        is put in the buffer without waiting for its result.
        """
        def _create_superuser_callback(_event, box, error):
            if box and not error:
                LOG.info("Superuser successfully created")
            else:
                LOG.error("Superuser could not be created")

        username = input("Username: ")
        email = input("Email: ")

        while True:
            password = getpass.getpass()
            re_password = getpass.getpass('Retype password: ')
            if password == re_password:
                break
            print('Passwords do not match. Try again')

        user = {
            "username": username,
            "email": email,
            "password": self._hash_password(password),
        }
        content = {
            "namespace": self.namespace,
            "box_id": user["username"],
            "data": user,
            "callback": _create_superuser_callback,
        }
        event = KytosEvent(name="kytos.storehouse.create", content=content)
        self.controller.buffers.app.put(event)

    def register_core_auth_services(self):
        """
        Register /kytos/core/ services over authentication.

        It registers create, authenticate, list all, list specific, delete and
        update users.
        """
        register = self.controller.api_server.register_core_endpoint
        # (rule, handler, extra keyword arguments), in registration order.
        endpoints = (
            ("auth/login/", self._authenticate_user, {}),
            ("auth/users/", self._list_users, {}),
            ("auth/users/<uid>", self._list_user, {}),
            ("auth/users/", self._create_user, {"methods": ["POST"]}),
            ("auth/users/<uid>", self._delete_user,
             {"methods": ["DELETE"]}),
            ("auth/users/<uid>", self._update_user,
             {"methods": ["PATCH"]}),
        )
        for rule, handler, options in endpoints:
            register(rule, handler, **options)

    def _authenticate_user(self):
        """Authenticate a user using Storehouse.

        Reads the username and password from ``request.authorization`` and
        compares the password digest with the stored one.

        Returns:
            tuple: ``({"token": <jwt>}, 200)`` on success. Missing or wrong
            credentials, as well as an unknown user, give
            ``("Incorrect username or password", 401)``.
        """
        # Function-scope import: hmac is only needed by this method.
        import hmac

        unauthorized = (
            "Incorrect username or password",
            HTTPStatus.UNAUTHORIZED.value,
        )
        try:
            username = request.authorization["username"]
            password = request.authorization["password"]
            digest = self._hash_password(password)
        except (TypeError, KeyError, AttributeError):
            # No credentials were sent, or they are incomplete.
            return unauthorized

        answer, code = self._find_user(username)
        if code == HTTPStatus.SERVICE_UNAVAILABLE.value:
            # Storehouse timed out: do not report it as bad credentials.
            return answer, code
        if code != HTTPStatus.OK.value:
            return unauthorized

        user = answer.get("data") or {}
        try:
            stored = user.get("password")
        except AttributeError:
            return unauthorized
        # Constant-time comparison of the two digests.
        if not isinstance(stored, str) or not hmac.compare_digest(
                stored.encode(), digest.encode()):
            return unauthorized

        time_exp = datetime.datetime.utcnow() + datetime.timedelta(
            minutes=10
        )
        token = self._generate_token(username, time_exp)
        # jwt.encode returns bytes on PyJWT 1.x and str on PyJWT 2.x.
        if isinstance(token, bytes):
            token = token.decode()
        return {"token": token}, HTTPStatus.OK.value

    def _find_user(self, uid):
        """Find a specific user using Storehouse.

        Returns:
            tuple: ``({"data": box.data}, 200)`` when a box was retrieved,
            otherwise ``("User data cannot be shown", 500)``.
        """
        def _build(box, error):
            if error or not box:
                return (
                    "User data cannot be shown",
                    HTTPStatus.INTERNAL_SERVER_ERROR.value,
                )
            return {"data": box.data}, HTTPStatus.OK.value

        content = {"box_id": uid, "namespace": self.namespace}
        return self._storehouse_request(
            "kytos.storehouse.retrieve", content, _build
        )

    @authenticated
    def _list_user(self, uid):
        """List a specific user using Storehouse.

        The ``password`` entry is left out of the returned data.
        """
        answer, code = self._find_user(uid)
        if code == HTTPStatus.OK.value:
            # Build a filtered copy instead of deleting the key, so the
            # data object received from Storehouse is left untouched and
            # a missing "password" key is not an error.
            # Assumes the user data is dict-like - confirm with Storehouse.
            visible = {
                key: value
                for key, value in answer['data'].items()
                if key != 'password'
            }
            answer = {"data": visible}
        return answer, code

    @authenticated
    def _list_users(self):
        """List all users using Storehouse.

        Returns:
            tuple: ``({"users": boxes}, 200)`` with the value handed to
            the callback, or ``("Users cannot be listed", 500)`` on error.
        """
        def _build(boxes, error):
            if error:
                return (
                    "Users cannot be listed",
                    HTTPStatus.INTERNAL_SERVER_ERROR.value,
                )
            return {"users": boxes}, HTTPStatus.OK.value

        content = {"namespace": self.namespace}
        return self._storehouse_request(
            "kytos.storehouse.list", content, _build
        )

    @authenticated
    def _create_user(self):
        """Save a user using Storehouse.

        Expects a JSON body with ``username``, ``email`` and ``password``;
        the password is stored as a SHA-512 hex digest.

        Returns:
            tuple: ``("User successfully created", 200)``,
            ``("User cannot be created", 409)`` when Storehouse reports a
            failure, or a 400 answer when the body is missing fields.
        """
        def _build(box, error):
            if error or not box:
                return "User cannot be created", HTTPStatus.CONFLICT.value
            return "User successfully created", HTTPStatus.OK.value

        req = request.json
        try:
            data = {
                "username": req["username"],
                "email": req["email"],
                "password": self._hash_password(req["password"]),
            }
        except (TypeError, KeyError, AttributeError):
            # Body is not a JSON object, a field is missing or the
            # password is not a string.
            return (
                "Invalid user data: username, email and password "
                "are required",
                HTTPStatus.BAD_REQUEST.value,
            )

        content = {
            "namespace": self.namespace,
            "box_id": data["username"],
            "data": data,
        }
        return self._storehouse_request(
            "kytos.storehouse.create", content, _build
        )

    @authenticated
    def _delete_user(self, uid):
        """Delete a user using Storehouse.

        Returns:
            tuple: ``("User successfully deleted", 200)`` or
            ``("User has not been deleted", 500)``.
        """
        def _build(box, error):
            if error or not box:
                return (
                    "User has not been deleted",
                    HTTPStatus.INTERNAL_SERVER_ERROR.value,
                )
            return "User successfully deleted", HTTPStatus.OK.value

        content = {"box_id": uid, "namespace": self.namespace}
        return self._storehouse_request(
            "kytos.storehouse.delete", content, _build
        )

    @authenticated
    def _update_user(self, uid):
        """Update user data using Storehouse.

        Only the ``username``, ``email`` and ``password`` keys of the JSON
        body are forwarded. A new password is hashed the same way as on
        user creation, so the user can still log in afterwards.

        Returns:
            tuple: ``("User successfully updated", 200)``,
            ``("User has not been updated", 500)`` when Storehouse reports
            a failure, or a 400 answer when the body is not usable.
        """
        def _build(box, error):
            if error or not box:
                return (
                    "User has not been updated",
                    HTTPStatus.INTERNAL_SERVER_ERROR.value,
                )
            return "User successfully updated", HTTPStatus.OK.value

        req = request.json
        if not isinstance(req, dict):
            return (
                "Invalid user data: a JSON object is required",
                HTTPStatus.BAD_REQUEST.value,
            )

        allowed = ("username", "email", "password")
        data = {key: req[key] for key in allowed if key in req}
        if "password" in data:
            try:
                data["password"] = self._hash_password(data["password"])
            except AttributeError:
                return (
                    "Invalid user data: password must be a string",
                    HTTPStatus.BAD_REQUEST.value,
                )

        content = {
            "namespace": self.namespace,
            "box_id": uid,
            "data": data,
        }
        return self._storehouse_request(
            "kytos.storehouse.update", content, _build
        )
329