Passed
Push — master ( 2c72f4...de2b0f )
by Humberto
02:01 queued 11s
created

tests/unit/test_core/test_auth.py (4 issues)

1
"""Test kytos.core.auth module."""
2
import asyncio
3
import base64
4
import hashlib
5
from unittest import TestCase
6
from unittest.mock import Mock, patch
7
8
from kytos.core import Controller
9
from kytos.core.auth import Auth
10
from kytos.core.config import KytosConfig
11
12
# Root URL that the other API constants of this module are derived from.
KYTOS_CORE_API = "http://127.0.0.1:8181/api/kytos/"
# Prefix used by the tests to build the ``/auth/...`` endpoint URLs.
API_URI = KYTOS_CORE_API + "core"
# Storehouse URL for the ``kytos.core.auth.users`` path.
STOREHOUSE_API_URI = KYTOS_CORE_API + "storehouse/v1/kytos.core.auth.users"
15
16
17
# pylint: disable=unused-argument
18
class TestAuth(TestCase):
19
    """Auth tests."""
20
21
    def setUp(self):
        """Build the fixtures shared by every test.

        Creates the controller mock and the ``Auth`` instance under test,
        fetches the superuser credentials from ``_create_super_user``,
        requests a token through ``_get_token`` and stores the payload used
        by the user endpoints.
        """
        # Each entry has the form {'event_name': box_object}.
        self.patched_events = []
        self.server_name_url = 'http://localhost:8181/api/kytos'

        self.controller = self._get_controller_mock()
        self.auth = Auth(self.controller)

        credentials = self._create_super_user()
        self.username, self.password = credentials
        self.token = self._get_token()

        self.user_data = {
            "username": "authtempuser",
            "email": "[email protected]",
            "password": "password",
        }
34
35
    def _patch_event_trigger(self, event):
36
        """Patch event callback trigger."""
37
        for patched_event in self.patched_events:
38
            box = patched_event.get(event.content.get('callback').__name__)
39
            event.content.get('callback')(None, box, None)
40
41
    def _get_controller_mock(self):
        """Build the ``Controller`` used by the tests.

        The controller is created from the daemon options (with a fixed
        ``jwt_secret``) on a fresh event loop, gets a mocked ``log`` and has
        ``buffers.app.put`` replaced by ``_patch_event_trigger``.
        """
        event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(None)

        daemon_options = KytosConfig().options['daemon']
        daemon_options.jwt_secret = 'jwt_secret'

        mocked_controller = Controller(daemon_options, loop=event_loop)
        mocked_controller.log = Mock()
        # Patch event callback trigger.
        mocked_controller.buffers.app.put = self._patch_event_trigger
        return mocked_controller
55
56
    @staticmethod
57
    def get_auth_test_client(auth):
        """Return the flask test client of the API server behind *auth*."""
        flask_app = auth.controller.api_server.app
        return flask_app.test_client()
60
61
    @patch('kytos.core.auth.Auth._create_superuser')
62
    def _create_super_user(self, mock_username=None):
63
        """Create a superuser to integration test."""
64
        username = "test"
65
        password = "password"
66
        email = "[email protected]"
67
68
        mock_username.return_value.get_username.return_value = username
69
        mock_username.return_value.get_email.return_value = email
70
        self.auth._create_superuser()  # pylint: disable=protected-access
71
72
        return username, password
73
74
    @patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc")
75
    def _get_token(self, mock_jwt_secret=None):
        """Log in with the superuser credentials and return the token.

        Registers a box for ``_find_user_callback`` holding the digested
        password, sends a basic-auth ``GET`` to the login endpoint and
        returns the ``token`` field of the JSON response.
        """
        stored_user = Mock()
        stored_user.data = {
            # "password" digested
            'password': 'b109f3bbbc244eb82441917ed06d618b9008dd09b3befd1b5e073'
                        '94c706a8bb980b1d7785e5976ec049b46df5f1326af5a2ea6d103'
                        'fd07c95385ffab0cacbc86'
        }

        credentials = bytes(self.username + ":" + self.password, "ascii")
        encoded = base64.b64encode(credentials).decode("ascii")
        header = {"Authorization": "Basic " + encoded}

        # Patch _find_user_callback event callback.
        self.patched_events.append({'_find_user_callback': stored_user})
        url = "%s/auth/login/" % API_URI
        client = self.get_auth_test_client(self.auth)
        login_response = client.open(url, method='GET', headers=header)

        payload = login_response.json
        return payload["token"]
98
99
    def _validate_schema(self, my_dict, check_against):
100
        """Check if a dict respects a given schema."""
101
        for key, value in check_against.items():
102
            if isinstance(value, dict):
103
                return self._validate_schema(my_dict[key], value)
104
            if not isinstance(my_dict[key], value):
105
                return False
106
        return True
107
108
    @patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc")
109
    def test_01_login_request(self, mock_jwt_secret):
        """Login answers 200 for the superuser and 401 for unknown users."""
        def basic_auth(user, secret):
            """Return the basic-auth header for the given credentials."""
            raw = bytes(user + ":" + secret, "ascii")
            encoded = base64.b64encode(raw).decode("ascii")
            return {"Authorization": "Basic " + encoded}

        good_header = basic_auth(self.username, self.password)
        bad_header = basic_auth("nonexistent", "nonexistent")

        stored_user = Mock()
        stored_user.data = {
            # "password" digested
            'password': 'b109f3bbbc244eb82441917ed06d618b9008dd09b3befd1b5e073'
                        '94c706a8bb980b1d7785e5976ec049b46df5f1326af5a2ea6d103'
                        'fd07c95385ffab0cacbc86'
        }
        # Patch _find_user_callback event callback.
        self.patched_events.append({'_find_user_callback': stored_user})

        url = "%s/auth/login/" % API_URI
        client = self.get_auth_test_client(self.auth)
        accepted = client.open(url, method='GET', headers=good_header)
        rejected = client.open(url, method='GET', headers=bad_header)

        self.assertEqual(accepted.status_code, 200)
        self.assertEqual(rejected.status_code, 401)
139
140
    @patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc")
141
    def test_02_list_users_request(self, mock_jwt_secret):
        """List users: 200 with a ``users`` list, 401 on an invalid token."""
        hashed_password = hashlib.sha512("password".encode()).hexdigest()
        second_user = {"username": "authtempuser2",
                       "email": "[email protected]",
                       "password": hashed_password}
        # Patch _list_users_callback event callback.
        self.patched_events.append(
            {'_list_users_callback': [self.user_data, second_user]})

        client = self.get_auth_test_client(self.auth)
        url = "%s/auth/users/" % API_URI
        authorized = client.open(
            url, method='GET',
            headers={"Authorization": "Bearer %s" % self.token})
        rejected = client.open(
            url, method='GET',
            headers={"Authorization": "Bearer invalidtoken"})
        # Evaluated before the status assertions, as in the original flow.
        is_valid = self._validate_schema(authorized.json, {"users": list})

        self.assertEqual(authorized.status_code, 200)
        self.assertEqual(rejected.status_code, 401)
        self.assertTrue(is_valid)
162
163 View Code Duplication
    @patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc")
0 ignored issues
show
This code seems to be duplicated in your project.
Loading history...
164
    def test_03_create_user_request(self, mock_jwt_secret):
        """Create user answers 200 when the callback gets the user data."""
        # Patch _create_user_callback event callback.
        self.patched_events.append({'_create_user_callback': self.user_data})

        client = self.get_auth_test_client(self.auth)
        response = client.open(
            "%s/auth/users/" % API_URI,
            method='POST',
            json=self.user_data,
            headers={"Authorization": "Bearer %s" % self.token},
        )

        self.assertEqual(response.status_code, 200)
175
176
    @patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc")
177
    def test_03_create_user_request_error(self, mock_jwt_secret):
        """Create user answers 409 when the callback receives no box."""
        # Patch _create_user_callback event callback.
        self.patched_events.append({'_create_user_callback': None})

        client = self.get_auth_test_client(self.auth)
        response = client.open(
            "%s/auth/users/" % API_URI,
            method='POST',
            json=self.user_data,
            headers={"Authorization": "Bearer %s" % self.token},
        )

        self.assertEqual(response.status_code, 409)
188
189
    @patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc")
190
    def test_04_list_user_request(self, mock_jwt_secret):
        """List user answers 200 with ``data`` holding email and username."""
        expected_schema = {"data": {"email": str, "username": str}}
        stored_user = Mock()
        stored_user.data = self.user_data
        self.patched_events.append({'_find_user_callback': stored_user})

        client = self.get_auth_test_client(self.auth)
        url = "%s/auth/users/%s" % (API_URI, self.user_data.get("username"))
        response = client.open(
            url, method='GET',
            headers={"Authorization": "Bearer %s" % self.token})
        # Evaluated before the status assertion, as in the original flow.
        is_valid = self._validate_schema(response.json, expected_schema)

        self.assertEqual(response.status_code, 200)
        self.assertTrue(is_valid)
204
205 View Code Duplication
    @patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc")
0 ignored issues
show
This code seems to be duplicated in your project.
Loading history...
206
    def test_04_list_user_request_error(self, mock_jwt_secret):
        """List user answers 404 when the callback receives no box."""
        self.patched_events.append({'_find_user_callback': None})

        client = self.get_auth_test_client(self.auth)
        response = client.open(
            "%s/auth/users/%s" % (API_URI, 'user3'),
            method='GET',
            headers={"Authorization": "Bearer %s" % self.token},
        )

        self.assertEqual(response.status_code, 404)
215
216
    @patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc")
217
    def test_05_update_user_request(self, mock_jwt_secret):
        """Update user answers 200 when the callback gets the new data."""
        new_fields = {"email": "[email protected]"}
        self.patched_events.append({'_update_user_callback': new_fields})

        client = self.get_auth_test_client(self.auth)
        url = "%s/auth/users/%s" % (API_URI, self.user_data.get("username"))
        response = client.open(
            url,
            method='PATCH',
            json=new_fields,
            headers={"Authorization": "Bearer %s" % self.token},
        )

        self.assertEqual(response.status_code, 200)
228
229 View Code Duplication
    @patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc")
0 ignored issues
show
This code seems to be duplicated in your project.
Loading history...
230
    def test_05_update_user_request_error(self, mock_jwt_secret):
        """Update user answers 404 when the callback receives no box."""
        self.patched_events.append({'_update_user_callback': None})

        client = self.get_auth_test_client(self.auth)
        response = client.open(
            "%s/auth/users/%s" % (API_URI, 'user5'),
            method='PATCH',
            json={},
            headers={"Authorization": "Bearer %s" % self.token},
        )

        self.assertEqual(response.status_code, 404)
240
241 View Code Duplication
    @patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc")
0 ignored issues
show
This code seems to be duplicated in your project.
Loading history...
242
    def test_06_delete_user_request(self, mock_jwt_secret):
        """Delete user answers 200 when the callback gets the user data."""
        # Patch _delete_user_callback event callback.
        self.patched_events.append({'_delete_user_callback': self.user_data})

        client = self.get_auth_test_client(self.auth)
        url = "%s/auth/users/%s" % (API_URI, self.user_data.get("username"))
        response = client.open(
            url,
            method='DELETE',
            headers={"Authorization": "Bearer %s" % self.token},
        )

        self.assertEqual(response.status_code, 200)
252
253
    @patch('kytos.core.auth.Auth.get_jwt_secret', return_value="abc")
254
    def test_06_delete_user_request_error(self, mock_jwt_secret):
        """Delete user answers 404 when the callback receives no box."""
        # Patch _delete_user_callback event callback.
        self.patched_events.append({'_delete_user_callback': None})

        client = self.get_auth_test_client(self.auth)
        error_response = client.open(
            "%s/auth/users/%s" % (API_URI, "nonexistent"),
            method='DELETE',
            headers={"Authorization": "Bearer %s" % self.token},
        )

        self.assertEqual(error_response.status_code, 404)
264