Test Failed
Pull Request — master (#375)
by Vinicius
05:01
created

tests.unit.test_core.test_auth   A

Complexity

Total Complexity 24

Size/Duplication

Total Lines 223
Duplicated Lines 0 %

Test Coverage

Coverage 98.6%

Importance

Changes 0
Metric Value
eloc 171
dl 0
loc 223
rs 10
c 0
b 0
f 0
ccs 141
cts 143
cp 0.986
wmc 24

19 Methods

Rating   Name   Duplication   Size   Complexity  
A TestAuth.auth_headers() 0 4 1
A TestAuth.test_05_update_user_request_not_found() 0 10 1
A TestAuth.test_03_create_user_request_bad() 0 9 1
A TestAuth.test_01_login_request() 0 25 1
A TestAuth.test_05_update_user_request_conflict() 0 8 1
A TestAuth.test_07_find_user_error() 0 6 2
A TestAuth.test_04_list_user_request_error() 0 8 1
A TestAuth.test_02_list_users_request() 0 16 1
A TestAuth.test_03_create_user_request_conflict() 0 9 1
A TestAuth._validate_schema() 0 8 4
A TestAuth.test_06_delete_user_request() 0 6 1
A TestAuth.setup_method() 0 12 2
A TestAuth._get_token() 0 15 1
A TestAuth.test_08_error_msg() 0 8 1
A TestAuth.test_05_update_user_request() 0 8 1
A TestAuth.test_05_update_user_request_bad() 0 9 1
A TestAuth.test_04_list_user_request() 0 9 1
A TestAuth.test_03_create_user_request() 0 8 1
A TestAuth.test_06_delete_user_request_error() 0 7 1
1
"""Test kytos.core.auth module."""
2 1
import base64
3 1
4 1
import pytest
5
from httpx import AsyncClient
6
# pylint: disable=no-name-in-module,attribute-defined-outside-init
7 1
from pydantic import BaseModel, ValidationError
8 1
from pymongo.errors import DuplicateKeyError
9 1
10
from kytos.core.auth import Auth
11 1
from kytos.core.rest_api import HTTPException
12 1
from kytos.core.user import UserDoc
13 1
14 1
15
# pylint: disable=unused-argument
16 1
class TestAuth:
17 1
    """Auth tests."""
18
19
    def setup_method(self):
20
        """Instantiate a controller and an Auth."""
21 1
        def hashing(password: bytes, _hash) -> str:
22
            if password == b"password":
23
                return "some_hash"
24 1
            return "wrong_hash"
25
        UserDoc.hashing = hashing
26 1
        self.username, self.password = ("test", "password")
27 1
        self.user_data = {
28 1
            "username": "authtempuser",
29 1
            "email": "[email protected]",
30 1
            "password": "password",
31 1
        }
32 1
33 1
    async def auth_headers(self, auth: Auth, api_client: AsyncClient) -> dict:
34 1
        """Get Authorization headers with token."""
35
        token = await self._get_token(auth, api_client)
36
        return {"Authorization": f"Bearer {token}"}
37
38
    async def _get_token(self, auth: Auth, api_client: AsyncClient) -> str:
39 1
        """Make a request to get a token to be used in tests."""
40
        valid_header = {
41 1
            "Authorization": "Basic "
42
            + base64.b64encode(
43 1
                bytes(self.username + ":" + self.password, "ascii")
44 1
            ).decode("ascii")
45 1
        }
46 1
        user_dict = {"state": "active", "password": "some_hash", "hash": {}}
47 1
        auth.user_controller.get_user.return_value = user_dict
48 1
        endpoint = "kytos/core/auth/login"
49
        resp = await api_client.get(endpoint, headers=valid_header)
50 1
        assert resp.status_code == 200
51 1
        token = resp.json()["token"]
52
        return token
53 1
54
    def _validate_schema(self, my_dict, check_against):
55 1
        """Check if a dict respects a given schema."""
56
        for key, value in check_against.items():
57 1
            if isinstance(value, dict):
58 1
                return self._validate_schema(my_dict[key], value)
59 1
            if not isinstance(my_dict[key], value):
60
                return False
61 1
        return True
62 1
63 1
    async def test_01_login_request(self, auth, api_client, monkeypatch):
64
        """Test auth login endpoint."""
65 1
        valid_header = {
66
            "Authorization": "Basic "
67
            + base64.b64encode(
68
                bytes(self.username + ":" + self.password, "ascii")
69
            ).decode("ascii")
70
        }
71 1
        invalid_header = {
72
            "Authorization": "Basic "
73
            + base64.b64encode(
74
                bytes("nonexistent" + ":" + "nonexistent", "ascii")
75
            ).decode("ascii")
76
        }
77 1
        user_dict = {"state": "active", "password": "some_hash", "hash": {}}
78 1
        auth.user_controller.get_user.return_value = user_dict
79 1
        endpoint = "kytos/core/auth/login"
80 1
        resp = await api_client.get(endpoint, headers=valid_header)
81 1
        assert resp.json()["token"]
82
        assert resp.status_code == 200
83 1
        resp = await api_client.get(endpoint, headers=invalid_header)
84 1
        assert resp.status_code == 401
85
        assert "Incorrect password" in resp.text
86 1
        resp = await api_client.get(endpoint)
87
        assert "Authorization header missing" in resp.text
88 1
89 1
    async def test_02_list_users_request(self, auth, api_client):
90
        """Test auth list users endpoint."""
91 1
        invalid_header = {"Authorization": "Bearer invalidtoken"}
92
        schema = {"users": list}
93 1
        response = {'users': [
94
                        self.user_data['username'],
95 1
                        {"username": "authtempuser2"}
96 1
                    ]}
97 1
        auth.user_controller.get_users.return_value = response
98
        endpoint = "kytos/core/auth/users"
99 1
        headers = await self.auth_headers(auth, api_client)
100
        resp = await api_client.get(endpoint, headers=headers)
101
        assert resp.status_code == 200
102
        assert self._validate_schema(resp.json(), schema)
103
        resp = await api_client.get(endpoint, headers=invalid_header)
104
        assert resp.status_code == 401
105 1
106
    async def test_03_create_user_request(self, auth, api_client):
107
        """Test auth create user endpoint."""
108
        endpoint = "kytos/core/auth/users"
109
        headers = await self.auth_headers(auth, api_client)
110
        resp = await api_client.post(endpoint, json=self.user_data,
111 1
                                     headers=headers)
112 1
        assert resp.status_code == 201
113 1
        assert "User successfully created" in resp.json()
114 1
115 1
    async def test_03_create_user_request_conflict(self, auth, api_client):
116 1
        """Test auth create user endpoint."""
117 1
        endpoint = "kytos/core/auth/users"
118 1
        auth.user_controller.create_user.side_effect = DuplicateKeyError("0")
119 1
        headers = await self.auth_headers(auth, api_client)
120 1
        resp = await api_client.post(endpoint, json=self.user_data,
121 1
                                     headers=headers)
122 1
        assert resp.status_code == 409
123 1
        assert "already exists" in resp.json()["description"]
124
125 1
    async def test_03_create_user_request_bad(self, auth, api_client):
126 1
        """Test auth create user endpoint."""
127
        exc = ValidationError('', BaseModel)
128 1
        auth.user_controller.create_user.side_effect = exc
129 1
        endpoint = "kytos/core/auth/users"
130 1
        headers = await self.auth_headers(auth, api_client)
131
        resp = await api_client.post(endpoint, json=self.user_data,
132
                                     headers=headers)
133
        assert resp.status_code == 400
134 1
135 1
    async def test_04_list_user_request(self, auth, api_client):
136 1
        """Test auth list user endpoint."""
137 1
        schema = {"email": str, "username": str}
138
        auth.user_controller.get_user_nopw.return_value = self.user_data
139 1
        endpoint = f"kytos/core/auth/users/{self.user_data['username']}"
140 1
        headers = await self.auth_headers(auth, api_client)
141
        resp = await api_client.get(endpoint, headers=headers)
142 1
        assert resp.status_code == 200
143 1
        assert self._validate_schema(resp.json(), schema)
144 1
145
    async def test_04_list_user_request_error(self, auth, api_client):
146 1
        """Test auth list user endpoint."""
147 1
        auth.user_controller.get_user_nopw.return_value = None
148
        endpoint = "kytos/core/auth/users/user3"
149 1
        headers = await self.auth_headers(auth, api_client)
150 1
        resp = await api_client.get(endpoint, headers=headers)
151 1
        assert resp.status_code == 404
152
        assert "not found" in resp.json()["description"]
153 1
154
    async def test_05_update_user_request(self, auth, api_client):
155 1
        """Test auth update user endpoint."""
156 1
        data = {"email": "[email protected]"}
157
        endpoint = f"kytos/core/auth/users/{self.user_data['username']}"
158 1
        headers = await self.auth_headers(auth, api_client)
159 1
        resp = await api_client.patch(endpoint, json=data,
160 1
                                      headers=headers)
161 1
        assert resp.status_code == 200
162 1
163
    async def test_05_update_user_request_not_found(self, auth, api_client):
164 1
        """Test auth update user endpoint."""
165
        data = {"email": "[email protected]"}
166 1
        auth.user_controller.update_user.return_value = {}
167 1
        endpoint = "kytos/core/auth/users/user5"
168
        headers = await self.auth_headers(auth, api_client)
169 1
        resp = await api_client.patch(endpoint, json=data,
170 1
                                      headers=headers)
171 1
        assert resp.status_code == 404
172 1
        assert "not found" in resp.json()["description"]
173 1
174
    async def test_05_update_user_request_bad(self, auth, api_client):
175 1
        """Test auth update user endpoint"""
176
        exc = ValidationError('', BaseModel)
177 1
        auth.user_controller.update_user.side_effect = exc
178 1
        endpoint = "kytos/core/auth/users/user5"
179
        headers = await self.auth_headers(auth, api_client)
180 1
        resp = await api_client.patch(endpoint, json={},
181 1
                                      headers=headers)
182 1
        assert resp.status_code == 400
183 1
184 1
    async def test_05_update_user_request_conflict(self, auth, api_client):
185 1
        """Test auth update user endpoint"""
186
        auth.user_controller.update_user.side_effect = DuplicateKeyError("0")
187 1
        endpoint = "kytos/core/auth/users/user5"
188
        headers = await self.auth_headers(auth, api_client)
189 1
        resp = await api_client.patch(endpoint, json={},
190 1
                                      headers=headers)
191
        assert resp.status_code == 409
192 1
193 1
    async def test_06_delete_user_request(self, auth, api_client):
194
        """Test auth delete user endpoint."""
195 1
        endpoint = f"kytos/core/auth/users/{self.user_data['username']}"
196 1
        headers = await self.auth_headers(auth, api_client)
197 1
        resp = await api_client.delete(endpoint, headers=headers)
198 1
        assert resp.status_code == 200
199 1
200
    async def test_06_delete_user_request_error(self, auth, api_client):
201 1
        """Test auth delete user endpoint."""
202 1
        auth.user_controller.delete_user.return_value = {}
203
        endpoint = "kytos/core/auth/users/nonexistent"
204 1
        headers = await self.auth_headers(auth, api_client)
205 1
        resp = await api_client.delete(endpoint, headers=headers)
206 1
        assert resp.status_code == 404
207 1
208
    def test_07_find_user_error(self, auth):
209
        """Test _find_user not found."""
210 1
        auth.user_controller.get_user.return_value = None
211
        with pytest.raises(HTTPException):
212 1
            # pylint: disable=protected-access
213 1
            auth._find_user("name")
214
215 1
    def test_08_error_msg(self, auth):
216 1
        """Test error_msg"""
217 1
        # ValidationErro mocked response
218 1
        error_list = [{'loc': ('username', ), 'msg': 'mock_msg_1'},
219
                      {'loc': ('email', ), 'msg': 'mock_msg_2'}]
220
        actual_msg = auth.error_msg(error_list)
221 1
        expected_msg = 'username: mock_msg_1; email: mock_msg_2'
222
        assert actual_msg == expected_msg
223