tests.unit.test_core.test_auth   A
last analyzed

Complexity

Total Complexity 24

Size/Duplication

Total Lines 230
Duplicated Lines 0 %

Test Coverage

Coverage 89.19%

Importance

Changes 0
Metric Value
eloc 176
dl 0
loc 230
rs 10
c 0
b 0
f 0
ccs 132
cts 148
cp 0.8919
wmc 24

19 Methods

Rating   Name   Duplication   Size   Complexity  
A TestAuth.new_hashing() 0 6 2
A TestAuth.auth_headers() 0 5 1
A TestAuth.test_05_update_user_request_not_found() 0 11 1
A TestAuth.test_03_create_user_request_bad() 0 11 1
A TestAuth.test_01_login_request() 0 26 1
A TestAuth.test_05_update_user_request_conflict() 0 9 1
A TestAuth.test_07_find_user_error() 0 6 2
A TestAuth.test_03_create_user_request_conflict() 0 12 1
A TestAuth.test_04_list_user_request_error() 0 8 1
A TestAuth.test_02_list_users_request() 0 16 1
A TestAuth._validate_schema() 0 8 4
A TestAuth.test_06_delete_user_request() 0 6 1
A TestAuth.setup_method() 0 7 1
A TestAuth._get_token() 0 16 1
A TestAuth.test_05_update_user_request() 0 9 1
A TestAuth.test_05_update_user_request_bad() 0 10 1
A TestAuth.test_04_list_user_request() 0 9 1
A TestAuth.test_03_create_user_request() 0 9 1
A TestAuth.test_06_delete_user_request_error() 0 7 1
1
"""Test kytos.core.auth module."""
2 1
import asyncio
3 1
import base64
4 1
from unittest import mock
5
6 1
import pytest
7 1
from httpx import AsyncClient
8
# pylint: disable=no-name-in-module
9 1
from pydantic import ValidationError
10 1
from pymongo.errors import DuplicateKeyError
11
12 1
from kytos.core.auth import Auth
13 1
from kytos.core.rest_api import HTTPException
14
15
16
# pylint: disable=unused-argument
17 1
class TestAuth:
18
    """Auth tests."""
19
20 1
    def setup_method(self):
21
        """Instantiate a controller and an Auth."""
22 1
        self.username, self.password = ("test", "password")
23 1
        self.user_data = {
24
            "username": "authtempuser",
25
            "email": "[email protected]",
26
            "password": "password",
27
        }
28
29 1
    @staticmethod
30 1
    def new_hashing(password: bytes, _hash) -> str:
31
        """Use this method to mock auth.hashing."""
32 1
        if password == b"password":
33 1
            return "some_hash"
34 1
        return "wrong_hash"
35
36 1
    async def auth_headers(self, auth: Auth, api_client: AsyncClient) -> dict:
37
        """Get Authorization headers with token."""
38
        # pylint: disable=no-value-for-parameter
39 1
        token = await self._get_token(auth, api_client)
40 1
        return {"Authorization": f"Bearer {token}"}
41
42 1
    @mock.patch("kytos.core.auth.hashing", wraps=new_hashing)
43 1
    async def _get_token(self, auth: Auth, api_client: AsyncClient, _) -> str:
44
        """Make a request to get a token to be used in tests."""
45 1
        valid_header = {
46
            "Authorization": "Basic "
47
            + base64.b64encode(
48
                bytes(self.username + ":" + self.password, "ascii")
49
            ).decode("ascii")
50
        }
51 1
        user_dict = {"state": "active", "password": "some_hash", "hash": {}}
52 1
        auth.user_controller.get_user.return_value = user_dict
53 1
        endpoint = "kytos/core/auth/login"
54 1
        resp = await api_client.get(endpoint, headers=valid_header)
55 1
        assert resp.status_code == 200
56 1
        token = resp.json()["token"]
57 1
        return token
58
59 1
    def _validate_schema(self, my_dict, check_against):
60
        """Check if a dict respects a given schema."""
61 1
        for key, value in check_against.items():
62 1
            if isinstance(value, dict):
63
                return self._validate_schema(my_dict[key], value)
64 1
            if not isinstance(my_dict[key], value):
65
                return False
66 1
        return True
67
68 1
    @mock.patch("kytos.core.auth.hashing", wraps=new_hashing)
69 1
    async def test_01_login_request(self, _, auth, api_client, monkeypatch):
70
        """Test auth login endpoint."""
71 1
        valid_header = {
72
            "Authorization": "Basic "
73
            + base64.b64encode(
74
                bytes(self.username + ":" + self.password, "ascii")
75
            ).decode("ascii")
76
        }
77 1
        invalid_header = {
78
            "Authorization": "Basic "
79
            + base64.b64encode(
80
                bytes("nonexistent" + ":" + "nonexistent", "ascii")
81
            ).decode("ascii")
82
        }
83 1
        user_dict = {"state": "active", "password": "some_hash", "hash": {}}
84 1
        auth.user_controller.get_user.return_value = user_dict
85 1
        endpoint = "kytos/core/auth/login"
86 1
        resp = await api_client.get(endpoint, headers=valid_header)
87 1
        assert resp.json()["token"]
88 1
        assert resp.status_code == 200
89 1
        resp = await api_client.get(endpoint, headers=invalid_header)
90
        assert resp.status_code == 401
91
        assert "Incorrect password" in resp.text
92
        resp = await api_client.get(endpoint)
93
        assert "Authorization header missing" in resp.text
94
95 1
    async def test_02_list_users_request(self, auth, api_client):
96
        """Test auth list users endpoint."""
97 1
        invalid_header = {"Authorization": "Bearer invalidtoken"}
98 1
        schema = {"users": list}
99 1
        response = {'users': [
100
                        self.user_data['username'],
101
                        {"username": "authtempuser2"}
102
                    ]}
103 1
        auth.user_controller.get_users.return_value = response
104 1
        endpoint = "kytos/core/auth/users"
105 1
        headers = await self.auth_headers(auth, api_client)
106 1
        resp = await api_client.get(endpoint, headers=headers)
107 1
        assert resp.status_code == 200
108 1
        assert self._validate_schema(resp.json(), schema)
109 1
        resp = await api_client.get(endpoint, headers=invalid_header)
110 1
        assert resp.status_code == 401
111
112 1
    async def test_03_create_user_request(self, auth, api_client):
113
        """Test auth create user endpoint."""
114 1
        auth.controller.loop = asyncio.get_running_loop()
115 1
        endpoint = "kytos/core/auth/users"
116 1
        headers = await self.auth_headers(auth, api_client)
117 1
        resp = await api_client.post(endpoint, json=self.user_data,
118
                                     headers=headers)
119 1
        assert resp.status_code == 201
120 1
        assert "User successfully created" in resp.json()
121
122 1
    async def test_03_create_user_request_conflict(
123
        self, auth, api_client
124
    ):
125
        """Test auth create user endpoint."""
126 1
        auth.controller.loop = asyncio.get_running_loop()
127 1
        endpoint = "kytos/core/auth/users"
128 1
        auth.user_controller.create_user.side_effect = DuplicateKeyError("0")
129 1
        headers = await self.auth_headers(auth, api_client)
130 1
        resp = await api_client.post(endpoint, json=self.user_data,
131
                                     headers=headers)
132
        assert resp.status_code == 409
133
        assert "already exists" in resp.json()["description"]
134
135 1
    async def test_03_create_user_request_bad(
136
        self, auth, api_client
137
    ):
138
        """Test auth create user endpoint."""
139 1
        auth.controller.loop = asyncio.get_running_loop()
140 1
        data = "wrong_json"
141 1
        endpoint = "kytos/core/auth/users"
142 1
        headers = await self.auth_headers(auth, api_client)
143 1
        resp = await api_client.post(endpoint, json=data,
144
                                     headers=headers)
145
        assert resp.status_code == 400
146
147 1
    async def test_04_list_user_request(self, auth, api_client):
148
        """Test auth list user endpoint."""
149 1
        schema = {"email": str, "username": str}
150 1
        auth.user_controller.get_user_nopw.return_value = self.user_data
151 1
        endpoint = f"kytos/core/auth/users/{self.user_data['username']}"
152 1
        headers = await self.auth_headers(auth, api_client)
153 1
        resp = await api_client.get(endpoint, headers=headers)
154 1
        assert resp.status_code == 200
155 1
        assert self._validate_schema(resp.json(), schema)
156
157 1
    async def test_04_list_user_request_error(self, auth, api_client):
158
        """Test auth list user endpoint."""
159 1
        auth.user_controller.get_user_nopw.return_value = None
160 1
        endpoint = "kytos/core/auth/users/user3"
161 1
        headers = await self.auth_headers(auth, api_client)
162 1
        resp = await api_client.get(endpoint, headers=headers)
163
        assert resp.status_code == 404
164
        assert "not found" in resp.json()["description"]
165
166 1
    async def test_05_update_user_request(self, auth, api_client):
167
        """Test auth update user endpoint."""
168 1
        auth.controller.loop = asyncio.get_running_loop()
169 1
        data = {"email": "[email protected]"}
170 1
        endpoint = f"kytos/core/auth/users/{self.user_data['username']}"
171 1
        headers = await self.auth_headers(auth, api_client)
172 1
        resp = await api_client.patch(endpoint, json=data,
173
                                      headers=headers)
174 1
        assert resp.status_code == 200
175
176 1
    async def test_05_update_user_request_not_found(self, auth, api_client):
177
        """Test auth update user endpoint."""
178 1
        auth.controller.loop = asyncio.get_running_loop()
179 1
        data = {"email": "[email protected]"}
180 1
        auth.user_controller.update_user.return_value = {}
181 1
        endpoint = "kytos/core/auth/users/user5"
182 1
        headers = await self.auth_headers(auth, api_client)
183 1
        resp = await api_client.patch(endpoint, json=data,
184
                                      headers=headers)
185
        assert resp.status_code == 404
186
        assert "not found" in resp.json()["description"]
187
188 1
    async def test_05_update_user_request_bad(self, auth, api_client):
189
        """Test auth update user endpoint"""
190 1
        auth.controller.loop = asyncio.get_running_loop()
191 1
        exc = ValidationError.from_exception_data('', [])
192 1
        auth.user_controller.update_user.side_effect = exc
193 1
        endpoint = "kytos/core/auth/users/user5"
194 1
        headers = await self.auth_headers(auth, api_client)
195 1
        resp = await api_client.patch(endpoint, json={},
196
                                      headers=headers)
197
        assert resp.status_code == 400
198
199 1
    async def test_05_update_user_request_conflict(self, auth, api_client):
200
        """Test auth update user endpoint"""
201 1
        auth.controller.loop = asyncio.get_running_loop()
202 1
        auth.user_controller.update_user.side_effect = DuplicateKeyError("0")
203 1
        endpoint = "kytos/core/auth/users/user5"
204 1
        headers = await self.auth_headers(auth, api_client)
205 1
        resp = await api_client.patch(endpoint, json={},
206
                                      headers=headers)
207
        assert resp.status_code == 409
208
209 1
    async def test_06_delete_user_request(self, auth, api_client):
210
        """Test auth delete user endpoint."""
211 1
        endpoint = f"kytos/core/auth/users/{self.user_data['username']}"
212 1
        headers = await self.auth_headers(auth, api_client)
213 1
        resp = await api_client.delete(endpoint, headers=headers)
214 1
        assert resp.status_code == 200
215
216 1
    async def test_06_delete_user_request_error(self, auth, api_client):
217
        """Test auth delete user endpoint."""
218 1
        auth.user_controller.delete_user.return_value = {}
219 1
        endpoint = "kytos/core/auth/users/nonexistent"
220 1
        headers = await self.auth_headers(auth, api_client)
221 1
        resp = await api_client.delete(endpoint, headers=headers)
222
        assert resp.status_code == 404
223
224 1
    def test_07_find_user_error(self, auth):
225
        """Test _find_user not found."""
226 1
        auth.user_controller.get_user.return_value = None
227 1
        with pytest.raises(HTTPException):
228
            # pylint: disable=protected-access
229
            auth._find_user("name")
230