1 | """Test kytos.core.auth module.""" |
||
2 | import asyncio |
||
3 | import base64 |
||
4 | import hashlib |
||
5 | from unittest import TestCase |
||
6 | from unittest.mock import Mock, patch |
||
7 | |||
8 | from kytos.core import Controller |
||
9 | from kytos.core.auth import Auth |
||
10 | from kytos.core.config import KytosConfig |
||
11 | |||
12 | KYTOS_CORE_API = "http://127.0.0.1:8181/api/kytos/" |
||
13 | API_URI = KYTOS_CORE_API+"core" |
||
14 | STOREHOUSE_API_URI = KYTOS_CORE_API+"storehouse/v1/kytos.core.auth.users" |
||
15 | |||
16 | |||
17 | # pylint: disable=unused-argument |
||
18 | class TestAuth(TestCase): |
||
19 | """Auth tests.""" |
||
20 | |||
21 | def setUp(self): |
||
22 | """Instantiate a controller and an Auth.""" |
||
23 | self.patched_events = [] # {'event_name': box_object} |
||
24 | self.server_name_url = 'http://localhost:8181/api/kytos' |
||
25 | self.controller = self._get_controller_mock() |
||
26 | self.auth = Auth(self.controller) |
||
27 | self.username, self.password = self._create_super_user() |
||
28 | self.token = self._get_token() |
||
29 | self.user_data = { |
||
30 | "username": "authtempuser", |
||
31 | "email": "[email protected]", |
||
32 | "password": "password", |
||
33 | } |
||
34 | |||
35 | def _patch_event_trigger(self, event): |
||
36 | """Patch event callback trigger.""" |
||
37 | for patched_event in self.patched_events: |
||
38 | box = patched_event.get(event.content.get('callback').__name__) |
||
39 | event.content.get('callback')(None, box, None) |
||
40 | |||
41 | def _get_controller_mock(self): |
||
42 | """Return a controller mock.""" |
||
43 | loop = asyncio.new_event_loop() |
||
44 | asyncio.set_event_loop(None) |
||
45 | options = KytosConfig().options['daemon'] |
||
46 | options.jwt_secret = 'jwt_secret' |
||
47 | |||
48 | controller = Controller(options, loop=loop) |
||
49 | controller.log = Mock() |
||
50 | |||
51 | # Patch event callback trigger. |
||
52 | controller.buffers.app.put = self._patch_event_trigger |
||
53 | |||
54 | return controller |
||
55 | |||
56 | @staticmethod |
||
57 | def get_auth_test_client(auth): |
||
58 | """Return a flask api test client.""" |
||
59 | auth.controller.api_server.register_napp_endpoints(auth) |
||
60 | return auth.controller.api_server.app.test_client() |
||
61 | |||
62 | @patch('kytos.core.auth.Auth._create_superuser') |
||
63 | def _create_super_user(self, mock_username=None): |
||
64 | """Create a superuser to integration test.""" |
||
65 | username = "test" |
||
66 | password = "password" |
||
67 | email = "[email protected]" |
||
68 | |||
69 | mock_username.return_value.get_username.return_value = username |
||
70 | mock_username.return_value.get_email.return_value = email |
||
71 | self.auth._create_superuser() # pylint: disable=protected-access |
||
72 | |||
73 | return username, password |
||
74 | |||
75 | @patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc") |
||
76 | def _get_token(self, mock_jwt_secret=None): |
||
77 | """Make a request to get a token to be used in tests.""" |
||
78 | box = Mock() |
||
79 | box.data = { |
||
80 | # "password" digested |
||
81 | 'password': 'b109f3bbbc244eb82441917ed06d618b9008dd09b3befd1b5e073' |
||
82 | '94c706a8bb980b1d7785e5976ec049b46df5f1326af5a2ea6d103' |
||
83 | 'fd07c95385ffab0cacbc86' |
||
84 | } |
||
85 | header = { |
||
86 | "Authorization": "Basic " |
||
87 | + base64.b64encode( |
||
88 | bytes(self.username + ":" + self.password, "ascii") |
||
89 | ).decode("ascii") |
||
90 | } |
||
91 | # Patch _find_user_callback event callback. |
||
92 | self.patched_events.append({'_find_user_callback': box}) |
||
93 | url = "%s/auth/login/" % API_URI |
||
94 | api = self.get_auth_test_client(self.auth) |
||
95 | success_response = api.open(url, method='GET', headers=header) |
||
96 | |||
97 | json_response = success_response.json |
||
98 | return json_response["token"] |
||
99 | |||
100 | def _validate_schema(self, my_dict, check_against): |
||
101 | """Check if a dict respects a given schema.""" |
||
102 | for key, value in check_against.items(): |
||
103 | if isinstance(value, dict): |
||
104 | return self._validate_schema(my_dict[key], value) |
||
105 | if not isinstance(my_dict[key], value): |
||
106 | return False |
||
107 | return True |
||
108 | |||
109 | @patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc") |
||
110 | def test_01_login_request(self, mock_jwt_secret): |
||
111 | """Test auth login endpoint.""" |
||
112 | valid_header = { |
||
113 | "Authorization": "Basic " |
||
114 | + base64.b64encode( |
||
115 | bytes(self.username + ":" + self.password, "ascii") |
||
116 | ).decode("ascii") |
||
117 | } |
||
118 | invalid_header = { |
||
119 | "Authorization": "Basic " |
||
120 | + base64.b64encode( |
||
121 | bytes("nonexistent" + ":" + "nonexistent", "ascii") |
||
122 | ).decode("ascii") |
||
123 | } |
||
124 | box = Mock() |
||
125 | box.data = { |
||
126 | # "password" digested |
||
127 | 'password': 'b109f3bbbc244eb82441917ed06d618b9008dd09b3befd1b5e073' |
||
128 | '94c706a8bb980b1d7785e5976ec049b46df5f1326af5a2ea6d103' |
||
129 | 'fd07c95385ffab0cacbc86' |
||
130 | } |
||
131 | # Patch _find_user_callback event callback. |
||
132 | self.patched_events.append({'_find_user_callback': box}) |
||
133 | url = "%s/auth/login/" % API_URI |
||
134 | api = self.get_auth_test_client(self.auth) |
||
135 | success_response = api.open(url, method='GET', headers=valid_header) |
||
136 | error_response = api.open(url, method='GET', headers=invalid_header) |
||
137 | |||
138 | self.assertEqual(success_response.status_code, 200) |
||
139 | self.assertEqual(error_response.status_code, 401) |
||
140 | |||
141 | @patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc") |
||
142 | def test_02_list_users_request(self, mock_jwt_secret): |
||
143 | """Test auth list users endpoint.""" |
||
144 | valid_header = {"Authorization": "Bearer %s" % self.token} |
||
145 | invalid_header = {"Authorization": "Bearer invalidtoken"} |
||
146 | schema = {"users": list} |
||
147 | password = "password".encode() |
||
148 | # Patch _list_users_callback event callback. |
||
149 | event_boxes = [self.user_data, |
||
150 | {"username": "authtempuser2", |
||
151 | "email": "[email protected]", |
||
152 | "password": hashlib.sha512(password).hexdigest()}] |
||
153 | self.patched_events.append({'_list_users_callback': event_boxes}) |
||
154 | api = self.get_auth_test_client(self.auth) |
||
155 | url = "%s/auth/users/" % API_URI |
||
156 | success_response = api.open(url, method='GET', headers=valid_header) |
||
157 | error_response = api.open(url, method='GET', headers=invalid_header) |
||
158 | is_valid = self._validate_schema(success_response.json, schema) |
||
159 | |||
160 | self.assertEqual(success_response.status_code, 200) |
||
161 | self.assertEqual(error_response.status_code, 401) |
||
162 | self.assertTrue(is_valid) |
||
163 | |||
164 | View Code Duplication | @patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc") |
|
0 ignored issues
–
show
Duplication
introduced
by
Loading history...
|
|||
165 | def test_03_create_user_request(self, mock_jwt_secret): |
||
166 | """Test auth create user endpoint.""" |
||
167 | header = {"Authorization": "Bearer %s" % self.token} |
||
168 | # Patch _create_user_callback event callback. |
||
169 | self.patched_events.append({'_create_user_callback': self.user_data}) |
||
170 | api = self.get_auth_test_client(self.auth) |
||
171 | url = "%s/auth/users/" % API_URI |
||
172 | success_response = api.open(url, method='POST', json=self.user_data, |
||
173 | headers=header) |
||
174 | |||
175 | self.assertEqual(success_response.status_code, 200) |
||
176 | |||
177 | @patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc") |
||
178 | def test_03_create_user_request_error(self, mock_jwt_secret): |
||
179 | """Test auth create user endpoint.""" |
||
180 | header = {"Authorization": "Bearer %s" % self.token} |
||
181 | # Patch _create_user_callback event callback. |
||
182 | self.patched_events.append({'_create_user_callback': None}) |
||
183 | api = self.get_auth_test_client(self.auth) |
||
184 | url = "%s/auth/users/" % API_URI |
||
185 | error_response = api.open(url, method='POST', json=self.user_data, |
||
186 | headers=header) |
||
187 | |||
188 | self.assertEqual(error_response.status_code, 409) |
||
189 | |||
190 | @patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc") |
||
191 | def test_04_list_user_request(self, mock_jwt_secret): |
||
192 | """Test auth list user endpoint.""" |
||
193 | valid_header = {"Authorization": "Bearer %s" % self.token} |
||
194 | schema = {"data": {"email": str, "username": str}} |
||
195 | box = Mock() |
||
196 | box.data = self.user_data |
||
197 | self.patched_events.append({'_find_user_callback': box}) |
||
198 | api = self.get_auth_test_client(self.auth) |
||
199 | url = "%s/auth/users/%s" % (API_URI, self.user_data.get("username")) |
||
200 | success_response = api.open(url, method='GET', headers=valid_header) |
||
201 | is_valid = self._validate_schema(success_response.json, schema) |
||
202 | |||
203 | self.assertEqual(success_response.status_code, 200) |
||
204 | self.assertTrue(is_valid) |
||
205 | |||
206 | View Code Duplication | @patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc") |
|
0 ignored issues
–
show
|
|||
207 | def test_04_list_user_request_error(self, mock_jwt_secret): |
||
208 | """Test auth list user endpoint.""" |
||
209 | valid_header = {"Authorization": "Bearer %s" % self.token} |
||
210 | self.patched_events.append({'_find_user_callback': None}) |
||
211 | api = self.get_auth_test_client(self.auth) |
||
212 | url = "%s/auth/users/%s" % (API_URI, 'user3') |
||
213 | error_response = api.open(url, method='GET', headers=valid_header) |
||
214 | |||
215 | self.assertEqual(error_response.status_code, 404) |
||
216 | |||
217 | @patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc") |
||
218 | def test_05_update_user_request(self, mock_jwt_secret): |
||
219 | """Test auth update user endpoint.""" |
||
220 | valid_header = {"Authorization": "Bearer %s" % self.token} |
||
221 | data = {"email": "[email protected]"} |
||
222 | self.patched_events.append({'_update_user_callback': data}) |
||
223 | api = self.get_auth_test_client(self.auth) |
||
224 | url = "%s/auth/users/%s" % (API_URI, self.user_data.get("username")) |
||
225 | success_response = api.open(url, method='PATCH', json=data, |
||
226 | headers=valid_header) |
||
227 | |||
228 | self.assertEqual(success_response.status_code, 200) |
||
229 | |||
230 | View Code Duplication | @patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc") |
|
0 ignored issues
–
show
|
|||
231 | def test_05_update_user_request_error(self, mock_jwt_secret): |
||
232 | """Test auth update user endpoint.""" |
||
233 | valid_header = {"Authorization": "Bearer %s" % self.token} |
||
234 | self.patched_events.append({'_update_user_callback': None}) |
||
235 | api = self.get_auth_test_client(self.auth) |
||
236 | url = "%s/auth/users/%s" % (API_URI, 'user5') |
||
237 | error_response = api.open(url, method='PATCH', json={}, |
||
238 | headers=valid_header) |
||
239 | |||
240 | self.assertEqual(error_response.status_code, 404) |
||
241 | |||
242 | View Code Duplication | @patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc") |
|
0 ignored issues
–
show
|
|||
243 | def test_06_delete_user_request(self, mock_jwt_secret): |
||
244 | """Test auth delete user endpoint.""" |
||
245 | header = {"Authorization": "Bearer %s" % self.token} |
||
246 | # Patch _delete_user_callback event callback. |
||
247 | self.patched_events.append({'_delete_user_callback': self.user_data}) |
||
248 | api = self.get_auth_test_client(self.auth) |
||
249 | url = "%s/auth/users/%s" % (API_URI, self.user_data.get("username")) |
||
250 | success_response = api.open(url, method='DELETE', headers=header) |
||
251 | |||
252 | self.assertEqual(success_response.status_code, 200) |
||
253 | |||
254 | @patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc") |
||
255 | def test_06_delete_user_request_error(self, mock_jwt_secret): |
||
256 | """Test auth delete user endpoint.""" |
||
257 | header = {"Authorization": "Bearer %s" % self.token} |
||
258 | # Patch _delete_user_callback event callback. |
||
259 | self.patched_events.append({'_delete_user_callback': None}) |
||
260 | api = self.get_auth_test_client(self.auth) |
||
261 | url = "%s/auth/users/%s" % (API_URI, "nonexistent") |
||
262 | success_response = api.open(url, method='DELETE', headers=header) |
||
263 | |||
264 | self.assertEqual(success_response.status_code, 404) |
||
265 |