1
|
|
|
"""Test kytos.core.auth module.""" |
2
|
|
|
import asyncio |
3
|
|
|
import base64 |
4
|
|
|
import hashlib |
5
|
|
|
from unittest import TestCase |
6
|
|
|
from unittest.mock import Mock, patch |
7
|
|
|
|
8
|
|
|
from kytos.core import Controller |
9
|
|
|
from kytos.core.auth import Auth |
10
|
|
|
from kytos.core.config import KytosConfig |
11
|
|
|
|
12
|
|
|
KYTOS_CORE_API = "http://127.0.0.1:8181/api/kytos/" |
13
|
|
|
API_URI = KYTOS_CORE_API+"core" |
14
|
|
|
STOREHOUSE_API_URI = KYTOS_CORE_API+"storehouse/v1/kytos.core.auth.users" |
15
|
|
|
|
16
|
|
|
|
17
|
|
|
# pylint: disable=unused-argument |
18
|
|
|
class TestAuth(TestCase): |
19
|
|
|
"""Auth tests.""" |
20
|
|
|
|
21
|
|
|
def setUp(self): |
22
|
|
|
"""Instantiate a controller and an Auth.""" |
23
|
|
|
self.patched_events = [] # {'event_name': box_object} |
24
|
|
|
self.server_name_url = 'http://localhost:8181/api/kytos' |
25
|
|
|
self.controller = self._get_controller_mock() |
26
|
|
|
self.auth = Auth(self.controller) |
27
|
|
|
self.username, self.password = self._create_super_user() |
28
|
|
|
self.token = self._get_token() |
29
|
|
|
self.user_data = { |
30
|
|
|
"username": "authtempuser", |
31
|
|
|
"email": "[email protected]", |
32
|
|
|
"password": "password", |
33
|
|
|
} |
34
|
|
|
|
35
|
|
|
def _patch_event_trigger(self, event): |
36
|
|
|
"""Patch event callback trigger.""" |
37
|
|
|
for patched_event in self.patched_events: |
38
|
|
|
box = patched_event.get(event.content.get('callback').__name__) |
39
|
|
|
event.content.get('callback')(None, box, None) |
40
|
|
|
|
41
|
|
|
def _get_controller_mock(self): |
42
|
|
|
"""Return a controller mock.""" |
43
|
|
|
loop = asyncio.new_event_loop() |
44
|
|
|
asyncio.set_event_loop(None) |
45
|
|
|
options = KytosConfig().options['daemon'] |
46
|
|
|
options.jwt_secret = 'jwt_secret' |
47
|
|
|
|
48
|
|
|
controller = Controller(options, loop=loop) |
49
|
|
|
controller.log = Mock() |
50
|
|
|
|
51
|
|
|
# Patch event callback trigger. |
52
|
|
|
controller.buffers.app.put = self._patch_event_trigger |
53
|
|
|
|
54
|
|
|
return controller |
55
|
|
|
|
56
|
|
|
@staticmethod |
57
|
|
|
def get_auth_test_client(auth): |
58
|
|
|
"""Return a flask api test client.""" |
59
|
|
|
auth.controller.api_server.register_napp_endpoints(auth) |
60
|
|
|
return auth.controller.api_server.app.test_client() |
61
|
|
|
|
62
|
|
|
@patch('kytos.core.auth.Auth._create_superuser') |
63
|
|
|
def _create_super_user(self, mock_username=None): |
64
|
|
|
"""Create a superuser to integration test.""" |
65
|
|
|
username = "test" |
66
|
|
|
password = "password" |
67
|
|
|
email = "[email protected]" |
68
|
|
|
|
69
|
|
|
mock_username.return_value.get_username.return_value = username |
70
|
|
|
mock_username.return_value.get_email.return_value = email |
71
|
|
|
self.auth._create_superuser() # pylint: disable=protected-access |
72
|
|
|
|
73
|
|
|
return username, password |
74
|
|
|
|
75
|
|
|
@patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc") |
76
|
|
|
def _get_token(self, mock_jwt_secret=None): |
77
|
|
|
"""Make a request to get a token to be used in tests.""" |
78
|
|
|
box = Mock() |
79
|
|
|
box.data = { |
80
|
|
|
# "password" digested |
81
|
|
|
'password': 'b109f3bbbc244eb82441917ed06d618b9008dd09b3befd1b5e073' |
82
|
|
|
'94c706a8bb980b1d7785e5976ec049b46df5f1326af5a2ea6d103' |
83
|
|
|
'fd07c95385ffab0cacbc86' |
84
|
|
|
} |
85
|
|
|
header = { |
86
|
|
|
"Authorization": "Basic " |
87
|
|
|
+ base64.b64encode( |
88
|
|
|
bytes(self.username + ":" + self.password, "ascii") |
89
|
|
|
).decode("ascii") |
90
|
|
|
} |
91
|
|
|
# Patch _find_user_callback event callback. |
92
|
|
|
self.patched_events.append({'_find_user_callback': box}) |
93
|
|
|
url = "%s/auth/login/" % API_URI |
94
|
|
|
api = self.get_auth_test_client(self.auth) |
95
|
|
|
success_response = api.open(url, method='GET', headers=header) |
96
|
|
|
|
97
|
|
|
json_response = success_response.json |
98
|
|
|
return json_response["token"] |
99
|
|
|
|
100
|
|
|
def _validate_schema(self, my_dict, check_against): |
101
|
|
|
"""Check if a dict respects a given schema.""" |
102
|
|
|
for key, value in check_against.items(): |
103
|
|
|
if isinstance(value, dict): |
104
|
|
|
return self._validate_schema(my_dict[key], value) |
105
|
|
|
if not isinstance(my_dict[key], value): |
106
|
|
|
return False |
107
|
|
|
return True |
108
|
|
|
|
109
|
|
|
@patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc") |
110
|
|
|
def test_01_login_request(self, mock_jwt_secret): |
111
|
|
|
"""Test auth login endpoint.""" |
112
|
|
|
valid_header = { |
113
|
|
|
"Authorization": "Basic " |
114
|
|
|
+ base64.b64encode( |
115
|
|
|
bytes(self.username + ":" + self.password, "ascii") |
116
|
|
|
).decode("ascii") |
117
|
|
|
} |
118
|
|
|
invalid_header = { |
119
|
|
|
"Authorization": "Basic " |
120
|
|
|
+ base64.b64encode( |
121
|
|
|
bytes("nonexistent" + ":" + "nonexistent", "ascii") |
122
|
|
|
).decode("ascii") |
123
|
|
|
} |
124
|
|
|
box = Mock() |
125
|
|
|
box.data = { |
126
|
|
|
# "password" digested |
127
|
|
|
'password': 'b109f3bbbc244eb82441917ed06d618b9008dd09b3befd1b5e073' |
128
|
|
|
'94c706a8bb980b1d7785e5976ec049b46df5f1326af5a2ea6d103' |
129
|
|
|
'fd07c95385ffab0cacbc86' |
130
|
|
|
} |
131
|
|
|
# Patch _find_user_callback event callback. |
132
|
|
|
self.patched_events.append({'_find_user_callback': box}) |
133
|
|
|
url = "%s/auth/login/" % API_URI |
134
|
|
|
api = self.get_auth_test_client(self.auth) |
135
|
|
|
success_response = api.open(url, method='GET', headers=valid_header) |
136
|
|
|
error_response = api.open(url, method='GET', headers=invalid_header) |
137
|
|
|
|
138
|
|
|
self.assertEqual(success_response.status_code, 200) |
139
|
|
|
self.assertEqual(error_response.status_code, 401) |
140
|
|
|
|
141
|
|
|
@patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc") |
142
|
|
|
def test_02_list_users_request(self, mock_jwt_secret): |
143
|
|
|
"""Test auth list users endpoint.""" |
144
|
|
|
valid_header = {"Authorization": "Bearer %s" % self.token} |
145
|
|
|
invalid_header = {"Authorization": "Bearer invalidtoken"} |
146
|
|
|
schema = {"users": list} |
147
|
|
|
password = "password".encode() |
148
|
|
|
# Patch _list_users_callback event callback. |
149
|
|
|
event_boxes = [self.user_data, |
150
|
|
|
{"username": "authtempuser2", |
151
|
|
|
"email": "[email protected]", |
152
|
|
|
"password": hashlib.sha512(password).hexdigest()}] |
153
|
|
|
self.patched_events.append({'_list_users_callback': event_boxes}) |
154
|
|
|
api = self.get_auth_test_client(self.auth) |
155
|
|
|
url = "%s/auth/users/" % API_URI |
156
|
|
|
success_response = api.open(url, method='GET', headers=valid_header) |
157
|
|
|
error_response = api.open(url, method='GET', headers=invalid_header) |
158
|
|
|
is_valid = self._validate_schema(success_response.json, schema) |
159
|
|
|
|
160
|
|
|
self.assertEqual(success_response.status_code, 200) |
161
|
|
|
self.assertEqual(error_response.status_code, 401) |
162
|
|
|
self.assertTrue(is_valid) |
163
|
|
|
|
164
|
|
View Code Duplication |
@patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc") |
|
|
|
|
165
|
|
|
def test_03_create_user_request(self, mock_jwt_secret): |
166
|
|
|
"""Test auth create user endpoint.""" |
167
|
|
|
header = {"Authorization": "Bearer %s" % self.token} |
168
|
|
|
# Patch _create_user_callback event callback. |
169
|
|
|
self.patched_events.append({'_create_user_callback': self.user_data}) |
170
|
|
|
api = self.get_auth_test_client(self.auth) |
171
|
|
|
url = "%s/auth/users/" % API_URI |
172
|
|
|
success_response = api.open(url, method='POST', json=self.user_data, |
173
|
|
|
headers=header) |
174
|
|
|
|
175
|
|
|
self.assertEqual(success_response.status_code, 200) |
176
|
|
|
|
177
|
|
|
@patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc") |
178
|
|
|
def test_03_create_user_request_error(self, mock_jwt_secret): |
179
|
|
|
"""Test auth create user endpoint.""" |
180
|
|
|
header = {"Authorization": "Bearer %s" % self.token} |
181
|
|
|
# Patch _create_user_callback event callback. |
182
|
|
|
self.patched_events.append({'_create_user_callback': None}) |
183
|
|
|
api = self.get_auth_test_client(self.auth) |
184
|
|
|
url = "%s/auth/users/" % API_URI |
185
|
|
|
error_response = api.open(url, method='POST', json=self.user_data, |
186
|
|
|
headers=header) |
187
|
|
|
|
188
|
|
|
self.assertEqual(error_response.status_code, 409) |
189
|
|
|
|
190
|
|
|
@patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc") |
191
|
|
|
def test_04_list_user_request(self, mock_jwt_secret): |
192
|
|
|
"""Test auth list user endpoint.""" |
193
|
|
|
valid_header = {"Authorization": "Bearer %s" % self.token} |
194
|
|
|
schema = {"data": {"email": str, "username": str}} |
195
|
|
|
box = Mock() |
196
|
|
|
box.data = self.user_data |
197
|
|
|
self.patched_events.append({'_find_user_callback': box}) |
198
|
|
|
api = self.get_auth_test_client(self.auth) |
199
|
|
|
url = "%s/auth/users/%s" % (API_URI, self.user_data.get("username")) |
200
|
|
|
success_response = api.open(url, method='GET', headers=valid_header) |
201
|
|
|
is_valid = self._validate_schema(success_response.json, schema) |
202
|
|
|
|
203
|
|
|
self.assertEqual(success_response.status_code, 200) |
204
|
|
|
self.assertTrue(is_valid) |
205
|
|
|
|
206
|
|
View Code Duplication |
@patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc") |
|
|
|
|
207
|
|
|
def test_04_list_user_request_error(self, mock_jwt_secret): |
208
|
|
|
"""Test auth list user endpoint.""" |
209
|
|
|
valid_header = {"Authorization": "Bearer %s" % self.token} |
210
|
|
|
self.patched_events.append({'_find_user_callback': None}) |
211
|
|
|
api = self.get_auth_test_client(self.auth) |
212
|
|
|
url = "%s/auth/users/%s" % (API_URI, 'user3') |
213
|
|
|
error_response = api.open(url, method='GET', headers=valid_header) |
214
|
|
|
|
215
|
|
|
self.assertEqual(error_response.status_code, 404) |
216
|
|
|
|
217
|
|
|
@patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc") |
218
|
|
|
def test_05_update_user_request(self, mock_jwt_secret): |
219
|
|
|
"""Test auth update user endpoint.""" |
220
|
|
|
valid_header = {"Authorization": "Bearer %s" % self.token} |
221
|
|
|
data = {"email": "[email protected]"} |
222
|
|
|
self.patched_events.append({'_update_user_callback': data}) |
223
|
|
|
api = self.get_auth_test_client(self.auth) |
224
|
|
|
url = "%s/auth/users/%s" % (API_URI, self.user_data.get("username")) |
225
|
|
|
success_response = api.open(url, method='PATCH', json=data, |
226
|
|
|
headers=valid_header) |
227
|
|
|
|
228
|
|
|
self.assertEqual(success_response.status_code, 200) |
229
|
|
|
|
230
|
|
View Code Duplication |
@patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc") |
|
|
|
|
231
|
|
|
def test_05_update_user_request_error(self, mock_jwt_secret): |
232
|
|
|
"""Test auth update user endpoint.""" |
233
|
|
|
valid_header = {"Authorization": "Bearer %s" % self.token} |
234
|
|
|
self.patched_events.append({'_update_user_callback': None}) |
235
|
|
|
api = self.get_auth_test_client(self.auth) |
236
|
|
|
url = "%s/auth/users/%s" % (API_URI, 'user5') |
237
|
|
|
error_response = api.open(url, method='PATCH', json={}, |
238
|
|
|
headers=valid_header) |
239
|
|
|
|
240
|
|
|
self.assertEqual(error_response.status_code, 404) |
241
|
|
|
|
242
|
|
View Code Duplication |
@patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc") |
|
|
|
|
243
|
|
|
def test_06_delete_user_request(self, mock_jwt_secret): |
244
|
|
|
"""Test auth delete user endpoint.""" |
245
|
|
|
header = {"Authorization": "Bearer %s" % self.token} |
246
|
|
|
# Patch _delete_user_callback event callback. |
247
|
|
|
self.patched_events.append({'_delete_user_callback': self.user_data}) |
248
|
|
|
api = self.get_auth_test_client(self.auth) |
249
|
|
|
url = "%s/auth/users/%s" % (API_URI, self.user_data.get("username")) |
250
|
|
|
success_response = api.open(url, method='DELETE', headers=header) |
251
|
|
|
|
252
|
|
|
self.assertEqual(success_response.status_code, 200) |
253
|
|
|
|
254
|
|
|
@patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc") |
255
|
|
|
def test_06_delete_user_request_error(self, mock_jwt_secret): |
256
|
|
|
"""Test auth delete user endpoint.""" |
257
|
|
|
header = {"Authorization": "Bearer %s" % self.token} |
258
|
|
|
# Patch _delete_user_callback event callback. |
259
|
|
|
self.patched_events.append({'_delete_user_callback': None}) |
260
|
|
|
api = self.get_auth_test_client(self.auth) |
261
|
|
|
url = "%s/auth/users/%s" % (API_URI, "nonexistent") |
262
|
|
|
success_response = api.open(url, method='DELETE', headers=header) |
263
|
|
|
|
264
|
|
|
self.assertEqual(success_response.status_code, 404) |
265
|
|
|
|