Passed
Pull Request — master (#375)
by Vinicius
08:19
created

TestAuth.auth_headers()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 3
dl 0
loc 4
rs 10
c 0
b 0
f 0
ccs 3
cts 3
cp 1
crap 1
1
"""Test kytos.core.auth module."""
2 1
import base64
3
4 1
import pytest
5 1
from httpx import AsyncClient
6
# pylint: disable=no-name-in-module,attribute-defined-outside-init
7 1
from pydantic import BaseModel, ValidationError
8 1
from pymongo.errors import DuplicateKeyError
9
10 1
from kytos.core.auth import Auth
11 1
from kytos.core.rest_api import HTTPException
12 1
from kytos.core.user import UserDoc
13
14
15
# pylint: disable=unused-argument
16 1
class TestAuth:
17
    """Auth tests."""
18
19 1
    def setup_method(self):
20
        """Instantiate a controller and an Auth."""
21 1
        def hashing(password: bytes, _hash) -> str:
22 1
            if password == b"password":
23 1
                return "some_hash"
24 1
            return "wrong_hash"
25 1
        UserDoc.hashing = hashing
26 1
        self.username, self.password = ("test", "password")
27 1
        self.user_data = {
28
            "username": "authtempuser",
29
            "email": "[email protected]",
30
            "password": "password",
31
        }
32
33 1
    async def auth_headers(self, auth: Auth, api_client: AsyncClient) -> dict:
34
        """Get Authorization headers with token."""
35 1
        token = await self._get_token(auth, api_client)
36 1
        return {"Authorization": f"Bearer {token}"}
37
38 1
    async def _get_token(self, auth: Auth, api_client: AsyncClient) -> str:
39
        """Make a request to get a token to be used in tests."""
40 1
        valid_header = {
41
            "Authorization": "Basic "
42
            + base64.b64encode(
43
                bytes(self.username + ":" + self.password, "ascii")
44
            ).decode("ascii")
45
        }
46 1
        user_dict = {"state": "active", "password": "some_hash", "hash": {}}
47 1
        auth.user_controller.get_user.return_value = user_dict
48 1
        endpoint = "kytos/core/auth/login"
49 1
        resp = await api_client.get(endpoint, headers=valid_header)
50 1
        assert resp.status_code == 200
51 1
        token = resp.json()["token"]
52 1
        return token
53
54 1
    def _validate_schema(self, my_dict, check_against):
55
        """Check if a dict respects a given schema."""
56 1
        for key, value in check_against.items():
57 1
            if isinstance(value, dict):
58
                return self._validate_schema(my_dict[key], value)
59 1
            if not isinstance(my_dict[key], value):
60
                return False
61 1
        return True
62
63 1
    async def test_01_login_request(self, auth, api_client, monkeypatch):
64
        """Test auth login endpoint."""
65 1
        valid_header = {
66
            "Authorization": "Basic "
67
            + base64.b64encode(
68
                bytes(self.username + ":" + self.password, "ascii")
69
            ).decode("ascii")
70
        }
71 1
        invalid_header = {
72
            "Authorization": "Basic "
73
            + base64.b64encode(
74
                bytes("nonexistent" + ":" + "nonexistent", "ascii")
75
            ).decode("ascii")
76
        }
77 1
        user_dict = {"state": "active", "password": "some_hash", "hash": {}}
78 1
        auth.user_controller.get_user.return_value = user_dict
79 1
        endpoint = "kytos/core/auth/login"
80 1
        resp = await api_client.get(endpoint, headers=valid_header)
81 1
        assert resp.json()["token"]
82 1
        assert resp.status_code == 200
83 1
        resp = await api_client.get(endpoint, headers=invalid_header)
84 1
        assert resp.status_code == 401
85 1
        assert "Incorrect password" in resp.text
86 1
        resp = await api_client.get(endpoint)
87 1
        assert "Authorization header missing" in resp.text
88
89 1
    async def test_02_list_users_request(self, auth, api_client):
90
        """Test auth list users endpoint."""
91 1
        invalid_header = {"Authorization": "Bearer invalidtoken"}
92 1
        schema = {"users": list}
93 1
        response = {'users': [
94
                        self.user_data['username'],
95
                        {"username": "authtempuser2"}
96
                    ]}
97 1
        auth.user_controller.get_users.return_value = response
98 1
        endpoint = "kytos/core/auth/users"
99 1
        headers = await self.auth_headers(auth, api_client)
100 1
        resp = await api_client.get(endpoint, headers=headers)
101 1
        assert resp.status_code == 200
102 1
        assert self._validate_schema(resp.json(), schema)
103 1
        resp = await api_client.get(endpoint, headers=invalid_header)
104 1
        assert resp.status_code == 401
105
106 1
    async def test_03_create_user_request(self, auth, api_client, event_loop):
107
        """Test auth create user endpoint."""
108 1
        auth.controller.loop = event_loop
109 1
        endpoint = "kytos/core/auth/users"
110 1
        headers = await self.auth_headers(auth, api_client)
111 1
        resp = await api_client.post(endpoint, json=self.user_data,
112
                                     headers=headers)
113 1
        assert resp.status_code == 201
114 1
        assert "User successfully created" in resp.json()
115
116 1
    async def test_03_create_user_request_conflict(
117
        self, auth, api_client, event_loop
118
    ):
119
        """Test auth create user endpoint."""
120 1
        auth.controller.loop = event_loop
121 1
        endpoint = "kytos/core/auth/users"
122 1
        auth.user_controller.create_user.side_effect = DuplicateKeyError("0")
123 1
        headers = await self.auth_headers(auth, api_client)
124 1
        resp = await api_client.post(endpoint, json=self.user_data,
125
                                     headers=headers)
126 1
        assert resp.status_code == 409
127 1
        assert "already exists" in resp.json()["description"]
128
129 1
    async def test_03_create_user_request_bad(
130
        self, auth, api_client, event_loop
131
    ):
132
        """Test auth create user endpoint."""
133 1
        auth.controller.loop = event_loop
134 1
        exc = ValidationError('', BaseModel)
135 1
        auth.user_controller.create_user.side_effect = exc
136 1
        endpoint = "kytos/core/auth/users"
137 1
        headers = await self.auth_headers(auth, api_client)
138 1
        resp = await api_client.post(endpoint, json=self.user_data,
139
                                     headers=headers)
140 1
        assert resp.status_code == 400
141
142 1
    async def test_04_list_user_request(self, auth, api_client):
143
        """Test auth list user endpoint."""
144 1
        schema = {"email": str, "username": str}
145 1
        auth.user_controller.get_user_nopw.return_value = self.user_data
146 1
        endpoint = f"kytos/core/auth/users/{self.user_data['username']}"
147 1
        headers = await self.auth_headers(auth, api_client)
148 1
        resp = await api_client.get(endpoint, headers=headers)
149 1
        assert resp.status_code == 200
150 1
        assert self._validate_schema(resp.json(), schema)
151
152 1
    async def test_04_list_user_request_error(self, auth, api_client):
153
        """Test auth list user endpoint."""
154 1
        auth.user_controller.get_user_nopw.return_value = None
155 1
        endpoint = "kytos/core/auth/users/user3"
156 1
        headers = await self.auth_headers(auth, api_client)
157 1
        resp = await api_client.get(endpoint, headers=headers)
158 1
        assert resp.status_code == 404
159 1
        assert "not found" in resp.json()["description"]
160
161 1
    async def test_05_update_user_request(self, auth, api_client, event_loop):
162
        """Test auth update user endpoint."""
163 1
        auth.controller.loop = event_loop
164 1
        data = {"email": "[email protected]"}
165 1
        endpoint = f"kytos/core/auth/users/{self.user_data['username']}"
166 1
        headers = await self.auth_headers(auth, api_client)
167 1
        resp = await api_client.patch(endpoint, json=data,
168
                                      headers=headers)
169 1
        assert resp.status_code == 200
170
171 1
    async def test_05_update_user_request_not_found(self, auth, api_client,
172
                                                    event_loop):
173
        """Test auth update user endpoint."""
174 1
        auth.controller.loop = event_loop
175 1
        data = {"email": "[email protected]"}
176 1
        auth.user_controller.update_user.return_value = {}
177 1
        endpoint = "kytos/core/auth/users/user5"
178 1
        headers = await self.auth_headers(auth, api_client)
179 1
        resp = await api_client.patch(endpoint, json=data,
180
                                      headers=headers)
181 1
        assert resp.status_code == 404
182 1
        assert "not found" in resp.json()["description"]
183
184 1
    async def test_05_update_user_request_bad(self, auth, api_client,
185
                                              event_loop):
186
        """Test auth update user endpoint"""
187 1
        auth.controller.loop = event_loop
188 1
        exc = ValidationError('', BaseModel)
189 1
        auth.user_controller.update_user.side_effect = exc
190 1
        endpoint = "kytos/core/auth/users/user5"
191 1
        headers = await self.auth_headers(auth, api_client)
192 1
        resp = await api_client.patch(endpoint, json={},
193
                                      headers=headers)
194 1
        assert resp.status_code == 400
195
196 1
    async def test_05_update_user_request_conflict(self, auth, api_client,
197
                                                   event_loop):
198
        """Test auth update user endpoint"""
199 1
        auth.controller.loop = event_loop
200 1
        auth.user_controller.update_user.side_effect = DuplicateKeyError("0")
201 1
        endpoint = "kytos/core/auth/users/user5"
202 1
        headers = await self.auth_headers(auth, api_client)
203 1
        resp = await api_client.patch(endpoint, json={},
204
                                      headers=headers)
205 1
        assert resp.status_code == 409
206
207 1
    async def test_06_delete_user_request(self, auth, api_client):
208
        """Test auth delete user endpoint."""
209 1
        endpoint = f"kytos/core/auth/users/{self.user_data['username']}"
210 1
        headers = await self.auth_headers(auth, api_client)
211 1
        resp = await api_client.delete(endpoint, headers=headers)
212 1
        assert resp.status_code == 200
213
214 1
    async def test_06_delete_user_request_error(self, auth, api_client):
215
        """Test auth delete user endpoint."""
216 1
        auth.user_controller.delete_user.return_value = {}
217 1
        endpoint = "kytos/core/auth/users/nonexistent"
218 1
        headers = await self.auth_headers(auth, api_client)
219 1
        resp = await api_client.delete(endpoint, headers=headers)
220 1
        assert resp.status_code == 404
221
222 1
    def test_07_find_user_error(self, auth):
223
        """Test _find_user not found."""
224 1
        auth.user_controller.get_user.return_value = None
225 1
        with pytest.raises(HTTPException):
226
            # pylint: disable=protected-access
227 1
            auth._find_user("name")
228
229 1
    def test_08_error_msg(self, auth):
230
        """Test error_msg"""
231
        # ValidationErro mocked response
232 1
        error_list = [{'loc': ('username', ), 'msg': 'mock_msg_1'},
233
                      {'loc': ('email', ), 'msg': 'mock_msg_2'}]
234 1
        actual_msg = auth.error_msg(error_list)
235 1
        expected_msg = 'username: mock_msg_1; email: mock_msg_2'
236
        assert actual_msg == expected_msg
237