Passed
Pull Request — master (#375)
by Vinicius
05:03
created

tests.unit.test_core.test_auth.TestAuth.setUp()   A

Complexity

Conditions 1

Size

Total Lines 16
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 14
nop 1
dl 0
loc 16
ccs 11
cts 11
cp 1
crap 1
rs 9.7
c 0
b 0
f 0
1
"""Test kytos.core.auth module."""
2 1
import base64
3
4 1
import pytest
5 1
from httpx import AsyncClient
6
# pylint: disable=no-name-in-module,attribute-defined-outside-init
7 1
from pydantic import BaseModel, ValidationError
8 1
from pymongo.errors import DuplicateKeyError
9
10 1
from kytos.core.auth import Auth
11 1
from kytos.core.rest_api import HTTPException
12 1
from kytos.core.user import UserDoc
13
14
15
# pylint: disable=unused-argument
16 1
class TestAuth:
17
    """Auth tests."""
18
19 1
    def setup_method(self):
20
        """Instantiate a controller and an Auth."""
21 1
        def hashing(password: bytes, _hash) -> str:
22 1
            if password == b"password":
23 1
                return "some_hash"
24 1
            return "wrong_hash"
25 1
        UserDoc.hashing = hashing
26 1
        self.username, self.password = ("test", "password")
27 1
        self.user_data = {
28
            "username": "authtempuser",
29
            "email": "[email protected]",
30
            "password": "password",
31
        }
32
33 1
    async def auth_headers(self, auth: Auth, api_client: AsyncClient) -> dict:
34
        """Get Authorization headers with token."""
35 1
        token = await self._get_token(auth, api_client)
36 1
        return {"Authorization": f"Bearer {token}"}
37
38 1
    async def _get_token(self, auth: Auth, api_client: AsyncClient) -> str:
39
        """Make a request to get a token to be used in tests."""
40 1
        valid_header = {
41
            "Authorization": "Basic "
42
            + base64.b64encode(
43
                bytes(self.username + ":" + self.password, "ascii")
44
            ).decode("ascii")
45
        }
46 1
        user_dict = {"state": "active", "password": "some_hash", "hash": {}}
47 1
        auth.user_controller.get_user.return_value = user_dict
48 1
        endpoint = "kytos/core/auth/login"
49 1
        resp = await api_client.get(endpoint, headers=valid_header)
50 1
        assert resp.status_code == 200
51 1
        token = resp.json()["token"]
52 1
        return token
53
54 1
    def _validate_schema(self, my_dict, check_against):
55
        """Check if a dict respects a given schema."""
56 1
        for key, value in check_against.items():
57 1
            if isinstance(value, dict):
58
                return self._validate_schema(my_dict[key], value)
59 1
            if not isinstance(my_dict[key], value):
60
                return False
61 1
        return True
62
63 1
    async def test_01_login_request(self, auth, api_client, monkeypatch):
64
        """Test auth login endpoint."""
65 1
        valid_header = {
66
            "Authorization": "Basic "
67
            + base64.b64encode(
68
                bytes(self.username + ":" + self.password, "ascii")
69
            ).decode("ascii")
70
        }
71 1
        invalid_header = {
72
            "Authorization": "Basic "
73
            + base64.b64encode(
74
                bytes("nonexistent" + ":" + "nonexistent", "ascii")
75
            ).decode("ascii")
76
        }
77 1
        user_dict = {"state": "active", "password": "some_hash", "hash": {}}
78 1
        auth.user_controller.get_user.return_value = user_dict
79 1
        endpoint = "kytos/core/auth/login"
80 1
        resp = await api_client.get(endpoint, headers=valid_header)
81 1
        assert resp.json()["token"]
82 1
        assert resp.status_code == 200
83 1
        resp = await api_client.get(endpoint, headers=invalid_header)
84 1
        assert resp.status_code == 401
85 1
        assert "Incorrect password" in resp.text
86 1
        resp = await api_client.get(endpoint)
87 1
        assert "Authorization header missing" in resp.text
88
89 1
    async def test_02_list_users_request(self, auth, api_client):
90
        """Test auth list users endpoint."""
91 1
        invalid_header = {"Authorization": "Bearer invalidtoken"}
92 1
        schema = {"users": list}
93 1
        response = {'users': [
94
                        self.user_data['username'],
95
                        {"username": "authtempuser2"}
96
                    ]}
97 1
        auth.user_controller.get_users.return_value = response
98 1
        endpoint = "kytos/core/auth/users"
99 1
        headers = await self.auth_headers(auth, api_client)
100 1
        resp = await api_client.get(endpoint, headers=headers)
101 1
        assert resp.status_code == 200
102 1
        assert self._validate_schema(resp.json(), schema)
103 1
        resp = await api_client.get(endpoint, headers=invalid_header)
104 1
        assert resp.status_code == 401
105
106 1
    async def test_03_create_user_request(self, auth, api_client):
107
        """Test auth create user endpoint."""
108 1
        endpoint = "kytos/core/auth/users"
109 1
        headers = await self.auth_headers(auth, api_client)
110 1
        resp = await api_client.post(endpoint, json=self.user_data,
111
                                     headers=headers)
112 1
        assert resp.status_code == 201
113 1
        assert "User successfully created" in resp.json()
114
115 1
    async def test_03_create_user_request_conflict(self, auth, api_client):
116
        """Test auth create user endpoint."""
117 1
        endpoint = "kytos/core/auth/users"
118 1
        auth.user_controller.create_user.side_effect = DuplicateKeyError("0")
119 1
        headers = await self.auth_headers(auth, api_client)
120 1
        resp = await api_client.post(endpoint, json=self.user_data,
121
                                     headers=headers)
122 1
        assert resp.status_code == 409
123 1
        assert "already exists" in resp.json()["description"]
124
125 1
    async def test_03_create_user_request_bad(self, auth, api_client):
126
        """Test auth create user endpoint."""
127 1
        exc = ValidationError('', BaseModel)
128 1
        auth.user_controller.create_user.side_effect = exc
129 1
        endpoint = "kytos/core/auth/users"
130 1
        headers = await self.auth_headers(auth, api_client)
131 1
        resp = await api_client.post(endpoint, json=self.user_data,
132
                                     headers=headers)
133 1
        assert resp.status_code == 400
134
135 1
    async def test_04_list_user_request(self, auth, api_client):
136
        """Test auth list user endpoint."""
137 1
        schema = {"email": str, "username": str}
138 1
        auth.user_controller.get_user_nopw.return_value = self.user_data
139 1
        endpoint = f"kytos/core/auth/users/{self.user_data['username']}"
140 1
        headers = await self.auth_headers(auth, api_client)
141 1
        resp = await api_client.get(endpoint, headers=headers)
142 1
        assert resp.status_code == 200
143 1
        assert self._validate_schema(resp.json(), schema)
144
145 1
    async def test_04_list_user_request_error(self, auth, api_client):
146
        """Test auth list user endpoint."""
147 1
        auth.user_controller.get_user_nopw.return_value = None
148 1
        endpoint = "kytos/core/auth/users/user3"
149 1
        headers = await self.auth_headers(auth, api_client)
150 1
        resp = await api_client.get(endpoint, headers=headers)
151 1
        assert resp.status_code == 404
152 1
        assert "not found" in resp.json()["description"]
153
154 1
    async def test_05_update_user_request(self, auth, api_client):
155
        """Test auth update user endpoint."""
156 1
        data = {"email": "[email protected]"}
157 1
        endpoint = f"kytos/core/auth/users/{self.user_data['username']}"
158 1
        headers = await self.auth_headers(auth, api_client)
159 1
        resp = await api_client.patch(endpoint, json=data,
160
                                      headers=headers)
161 1
        assert resp.status_code == 200
162
163 1
    async def test_05_update_user_request_not_found(self, auth, api_client):
164
        """Test auth update user endpoint."""
165 1
        data = {"email": "[email protected]"}
166 1
        auth.user_controller.update_user.return_value = {}
167 1
        endpoint = "kytos/core/auth/users/user5"
168 1
        headers = await self.auth_headers(auth, api_client)
169 1
        resp = await api_client.patch(endpoint, json=data,
170
                                      headers=headers)
171 1
        assert resp.status_code == 404
172 1
        assert "not found" in resp.json()["description"]
173
174 1
    async def test_05_update_user_request_bad(self, auth, api_client):
175
        """Test auth update user endpoint"""
176 1
        exc = ValidationError('', BaseModel)
177 1
        auth.user_controller.update_user.side_effect = exc
178 1
        endpoint = "kytos/core/auth/users/user5"
179 1
        headers = await self.auth_headers(auth, api_client)
180 1
        resp = await api_client.patch(endpoint, json={},
181
                                      headers=headers)
182 1
        assert resp.status_code == 400
183
184 1
    async def test_05_update_user_request_conflict(self, auth, api_client):
185
        """Test auth update user endpoint"""
186 1
        auth.user_controller.update_user.side_effect = DuplicateKeyError("0")
187 1
        endpoint = "kytos/core/auth/users/user5"
188 1
        headers = await self.auth_headers(auth, api_client)
189 1
        resp = await api_client.patch(endpoint, json={},
190
                                      headers=headers)
191 1
        assert resp.status_code == 409
192
193 1
    async def test_06_delete_user_request(self, auth, api_client):
194
        """Test auth delete user endpoint."""
195 1
        endpoint = f"kytos/core/auth/users/{self.user_data['username']}"
196 1
        headers = await self.auth_headers(auth, api_client)
197 1
        resp = await api_client.delete(endpoint, headers=headers)
198 1
        assert resp.status_code == 200
199
200 1
    async def test_06_delete_user_request_error(self, auth, api_client):
201
        """Test auth delete user endpoint."""
202 1
        auth.user_controller.delete_user.return_value = {}
203 1
        endpoint = "kytos/core/auth/users/nonexistent"
204 1
        headers = await self.auth_headers(auth, api_client)
205 1
        resp = await api_client.delete(endpoint, headers=headers)
206 1
        assert resp.status_code == 404
207
208 1
    def test_07_find_user_error(self, auth):
209
        """Test _find_user not found."""
210 1
        auth.user_controller.get_user.return_value = None
211 1
        with pytest.raises(HTTPException):
212
            # pylint: disable=protected-access
213 1
            auth._find_user("name")
214
215 1
    def test_08_error_msg(self, auth):
216
        """Test error_msg"""
217
        # ValidationErro mocked response
218 1
        error_list = [{'loc': ('username', ), 'msg': 'mock_msg_1'},
219
                      {'loc': ('email', ), 'msg': 'mock_msg_2'}]
220 1
        actual_msg = auth.error_msg(error_list)
221 1
        expected_msg = 'username: mock_msg_1; email: mock_msg_2'
222
        assert actual_msg == expected_msg
223