Passed
Pull Request — master (#966)
by Gleyberson
02:23
created

kytos.core.auth.Auth._create_user()   B

Complexity

Conditions 5

Size

Total Lines 38
Code Lines 30

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 25.8442

Importance

Changes 0
Metric Value
cc 5
eloc 30
nop 1
dl 0
loc 38
ccs 1
cts 17
cp 0.0588
crap 25.8442
rs 8.6933
c 0
b 0
f 0
1
"""Module with main classes related to Authentication."""
2 1
import datetime
3 1
import hashlib
4 1
import logging
5 1
import time
6 1
from functools import wraps
7 1
from http import HTTPStatus
8
9 1
import jwt
10 1
from flask import jsonify, request
11
12 1
from kytos.core.config import KytosConfig
13 1
from kytos.core.events import KytosEvent
14
15 1
__all__ = ['authenticated']
16
17 1
LOG = logging.getLogger(__name__)
18
19
20 1
def authenticated(func):
21
    """Handle tokens from requests."""
22 1
    @wraps(func)
23
    def wrapper(*args, **kwargs):
24
        """Verify the requires of token."""
25
        try:
26
            content = request.headers.get("Authorization")
27
            if content is None:
28
                raise AttributeError
29
            token = content.split("Bearer ")[1]
30
            jwt.decode(token, key=Auth.get_jwt_secret())
31
        except (
32
            AttributeError,
33
            jwt.ExpiredSignature,
34
            jwt.exceptions.DecodeError,
35
        ):
36
            msg = "Token not sent or expired."
37
            return jsonify({"error": msg}), 401
38
        return func(*args, **kwargs)
39
40 1
    return wrapper
41
42
43 1
class Auth:
44
    """Module used to provide Kytos authentication routes."""
45
46 1
    def __init__(self, controller):
47
        """Init method of Auth class takes the parameters below.
48
49
        Args:
50
            controller(kytos.core.controller): A Controller instance.
51
52
        """
53 1
        self.controller = controller
54 1
        self.namespace = "kytos.core.auth.users"
55 1
        if self.controller.options.create_superuser:
56 1
            self._create_superuser()
57
58 1
    @classmethod
59
    def get_jwt_secret(cls):
60
        """Return JWT secret defined in kytos conf."""
61
        options = KytosConfig().options['daemon']
62
        return options.jwt_secret
63
64 1
    @classmethod
65
    def _generate_token(cls, username, time_exp):
66
        """Generate a jwt token."""
67
        return jwt.encode(
68
            {
69
                'username': username,
70
                'iss': "Kytos NApps Server",
71
                'exp': time_exp,
72
            },
73
            Auth.get_jwt_secret(),
74
            algorithm='HS256',
75
        )
76
77 1
    def _create_superuser(self):
78
        """Create a superuser using Storehouse."""
79 1
        def _create_superuser_callback(_event, box, error):
80
            if box and not error:
81
                LOG.info("Superuser successfully created")
82
83 1
        username = "Username: "
84 1
        email = "Email: "
85 1
        password = "AAAA"
86
87 1
        user = {
88
            "username": username,
89
            "email": email,
90
            "password": hashlib.sha512(password.encode()).hexdigest(),
91
        }
92 1
        content = {
93
            "namespace": self.namespace,
94
            "box_id": user["username"],
95
            "data": user,
96
            "callback": _create_superuser_callback,
97
        }
98 1
        event = KytosEvent(name="kytos.storehouse.create", content=content)
99 1
        self.controller.buffers.app.put(event)
100
101 1
    def register_core_auth_services(self):
102
        """
103
        Register /kytos/core/ services over authentication.
104
105
        It registers create, authenticate, list all, list specific, delete and
106
        update users.
107
        """
108 1
        self.controller.api_server.register_core_endpoint(
109
            "auth/login/", self._authenticate_user
110
        )
111 1
        self.controller.api_server.register_core_endpoint(
112
            "auth/users/", self._list_users
113
        )
114 1
        self.controller.api_server.register_core_endpoint(
115
            "auth/users/<uid>", self._list_user
116
        )
117 1
        self.controller.api_server.register_core_endpoint(
118
            "auth/users/", self._create_user, methods=["POST"]
119
        )
120 1
        self.controller.api_server.register_core_endpoint(
121
            "auth/users/<uid>", self._delete_user, methods=["DELETE"]
122
        )
123 1
        self.controller.api_server.register_core_endpoint(
124
            "auth/users/<uid>", self._update_user, methods=["PATCH"]
125
        )
126
127 1
    def _authenticate_user(self):
128
        """Authenticate a user using Storehouse."""
129
        username = request.authorization["username"]
130
        password = request.authorization["password"].encode()
131
        try:
132
            user = self._find_user(username)[0].get("data")
133
            if user.get("password") != hashlib.sha512(password).hexdigest():
134
                raise KeyError
135
            time_exp = datetime.datetime.utcnow() + datetime.timedelta(
136
                minutes=10
137
            )
138
            token = self._generate_token(username, time_exp)
139
            return {"token": token.decode()}, HTTPStatus.OK.value
140
        except KeyError:
141
            result = "Incorrect username or password"
142
            return result, HTTPStatus.UNAUTHORIZED.value
143
144 1 View Code Duplication
    def _find_user(self, uid):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
145
        """Find a specific user using Storehouse."""
146
        response = {}
147
148
        def _find_user_callback(_event, box, error):
149
            nonlocal response
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable response does not seem to be defined.
Loading history...
150
            if error or not box:
151
                response = {
152
                    "answer": "User data cannot be shown",
153
                    "code": HTTPStatus.INTERNAL_SERVER_ERROR.value,
154
                }
155
            else:
156
                response = {
157
                    "answer": {"data": box.data},
158
                    "code": HTTPStatus.OK.value,
159
                }
160
161
        content = {
162
            "box_id": uid,
163
            "namespace": self.namespace,
164
            "callback": _find_user_callback,
165
        }
166
        event = KytosEvent(name="kytos.storehouse.retrieve", content=content)
167
        self.controller.buffers.app.put(event)
168
        while True:
169
            time.sleep(0.1)
170
            if response:
171
                break
172
        return response["answer"], response["code"]
173
174 1
    @authenticated
175
    def _list_user(self, uid):
176
        """List a specific user using Storehouse."""
177
        answer, code = self._find_user(uid)
178
        if code == HTTPStatus.OK.value:
179
            del answer['data']['password']
180
        return answer, code
181
182 1 View Code Duplication
    @authenticated
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
183
    def _list_users(self):
184
        """List all users using Storehouse."""
185
        response = {}
186
187
        def _list_users_callback(_event, boxes, error):
188
            nonlocal response
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable response does not seem to be defined.
Loading history...
189
            if error:
190
                response = {
191
                    "answer": "Users cannot be listed",
192
                    "code": HTTPStatus.INTERNAL_SERVER_ERROR.value,
193
                }
194
            else:
195
                response = {
196
                    "answer": {"users": boxes},
197
                    "code": HTTPStatus.OK.value,
198
                }
199
200
        content = {
201
            "namespace": self.namespace,
202
            "callback": _list_users_callback,
203
        }
204
        event = KytosEvent(name="kytos.storehouse.list", content=content)
205
        self.controller.buffers.app.put(event)
206
        while True:
207
            time.sleep(0.1)
208
            if response:
209
                break
210
        return response["answer"], response["code"]
211
212 1
    @authenticated
213
    def _create_user(self):
214
        """Save a user using Storehouse."""
215
        response = {}
216
217
        def _create_user_callback(_event, box, error):
218
            nonlocal response
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable response does not seem to be defined.
Loading history...
219
            if error or not box:
220
                response = {
221
                    "answer": "User cannot be created",
222
                    "code": HTTPStatus.CONFLICT.value,
223
                }
224
            else:
225
                response = {
226
                    "answer": "User successfully created",
227
                    "code": HTTPStatus.OK.value,
228
                }
229
230
        req = request.json
231
        password = req["password"].encode()
232
        data = {
233
            "username": req["username"],
234
            "email": req["email"],
235
            "password": hashlib.sha512(password).hexdigest(),
236
        }
237
        content = {
238
            "namespace": self.namespace,
239
            "box_id": data["username"],
240
            "data": data,
241
            "callback": _create_user_callback,
242
        }
243
        event = KytosEvent(name="kytos.storehouse.create", content=content)
244
        self.controller.buffers.app.put(event)
245
        while True:
246
            time.sleep(0.1)
247
            if response:
248
                break
249
        return response["answer"], response["code"]
250
251 1 View Code Duplication
    @authenticated
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
252
    def _delete_user(self, uid):
253
        """Delete a user using Storehouse."""
254
        response = {}
255
256
        def _delete_user_callback(_event, box, error):
257
            nonlocal response
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable response does not seem to be defined.
Loading history...
258
            if error or not box:
259
                response = {
260
                    "answer": "User has not been deleted",
261
                    "code": HTTPStatus.INTERNAL_SERVER_ERROR.value,
262
                }
263
            else:
264
                response = {
265
                    "answer": "User successfully deleted",
266
                    "code": HTTPStatus.OK.value,
267
                }
268
269
        content = {
270
            "box_id": uid,
271
            "namespace": self.namespace,
272
            "callback": _delete_user_callback,
273
        }
274
        event = KytosEvent(name="kytos.storehouse.delete", content=content)
275
        self.controller.buffers.app.put(event)
276
        while True:
277
            time.sleep(0.1)
278
            if response:
279
                break
280
        return response["answer"], response["code"]
281
282 1
    @authenticated
283
    def _update_user(self, uid):
284
        """Update user data using Storehouse."""
285
        response = {}
286
287
        def _update_user_callback(_event, box, error):
288
            nonlocal response
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable response does not seem to be defined.
Loading history...
289
            if error or not box:
290
                response = {
291
                    "answer": "User has not been updated",
292
                    "code": HTTPStatus.INTERNAL_SERVER_ERROR.value,
293
                }
294
            else:
295
                response = {
296
                    "answer": "User successfully updated",
297
                    "code": HTTPStatus.OK.value,
298
                }
299
300
        req = request.json
301
        allowed = ["username", "email", "password"]
302
303
        data = {}
304
        for key, value in req.items():
305
            if key in allowed:
306
                data[key] = value
307
308
        content = {
309
            "namespace": self.namespace,
310
            "box_id": uid,
311
            "data": data,
312
            "callback": _update_user_callback,
313
        }
314
        event = KytosEvent(name="kytos.storehouse.update", content=content)
315
        self.controller.buffers.app.put(event)
316
        while True:
317
            time.sleep(0.1)
318
            if response:
319
                break
320
        return response["answer"], response["code"]
321