Passed
Pull Request — master (#84)
by
unknown
01:55
created

tests.test_db   F

Complexity

Total Complexity 76

Size/Duplication

Total Lines 238
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 175
dl 0
loc 238
rs 2.32
c 0
b 0
f 0
wmc 76

39 Methods

Rating   Name   Duplication   Size   Complexity  
A TestDB.test_add_single_item_error() 0 3 2
A TestDB.test_release_db() 0 7 2
A TestDB.test_try_db_index_no_succes() 0 4 1
A TestDB.test_get_elem_pattern_by_index() 0 6 2
A TestDB.test_try_db_index_error() 0 4 2
A TestDB.test_get_pattern() 0 6 2
A TestDB.test_max_db_index_fail() 0 5 3
A TestDB.test_get_elem_pattern_by_index_error() 0 3 2
A TestDB.test_select_kb_error1() 0 3 2
A TestDB.test_set_single_item() 0 9 2
A TestDB.test_kb_connect() 0 5 3
A TestDB.test_get_list_item() 0 5 2
A TestDB.test_get_single_item() 0 5 2
A TestDB.test_get_status() 0 5 2
A TestDB.test_set_single_item_error() 0 3 2
A TestDB.test_rm_list_item_error1() 0 3 2
A TestDB.test_try_db_index_success() 0 4 1
A TestDB.test_add_single_item() 0 5 2
A TestDB.test_add_single_item_error1() 0 3 2
A TestDB.test_get_kb_context() 0 4 1
A TestDB.test_kb_new() 0 11 4
A TestDB.test_parse_openvas_db_addres() 0 3 2
A TestDB.test_rm_list_item() 0 5 2
A TestDB.test_get_list_item_fail() 0 3 2
A TestDB.setUp() 0 2 1
A TestDB.test_get_single_item_error() 0 3 2
A TestDB.test_set_single_item_error1() 0 3 2
A TestDB.test_get_etime() 0 5 2
A TestDB.test_get_result() 0 5 2
A TestDB.test_get_host_ip() 0 5 2
A TestDB.test_select_kb_error() 0 3 2
A TestDB.test_rm_list_item_error() 0 3 2
A TestDB.test_set_ctx_with_error() 0 3 2
A TestDB.test_set_ctx() 0 3 1
A TestDB.test_select_kb() 0 5 1
A TestDB.test_get_pattern_error() 0 3 2
A TestDB.test_kb_new_fail() 0 3 1
A TestDB.test_get_stime() 0 5 2
A TestDB.test_get_kb_context_fail() 0 4 3

How to fix   Complexity   

Complexity

Complex classes like tests.test_db often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018-2019 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-2.0-or-later
5
#
6
# This program is free software; you can redistribute it and/or
7
# modify it under the terms of the GNU General Public License
8
# as published by the Free Software Foundation; either version 2
9
# of the License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
20
""" Unit Test for ospd-openvas """
21
22
from unittest import TestCase
23
from unittest.mock import patch
24
from ospd_openvas.db import OpenvasDB
25
from ospd_openvas.errors import OSPDOpenvasError, RequiredArgument
26
from redis.exceptions import ConnectionError as RCE
27
28
29
@patch('ospd_openvas.db.redis.Redis')
class TestDB(TestCase):
    """Unit tests for OpenvasDB.

    The class-level patch replaces ``ospd_openvas.db.redis.Redis`` with a
    mock for every test method; that mock is passed in as ``mock_redis``.
    """

    def setUp(self):
        """Create a fresh OpenvasDB instance for each test."""
        self.db = OpenvasDB()

    def test_parse_openvas_db_addres(self, mock_redis):
        """_parse_openvas_db_address(b'somedata') must raise."""
        with self.assertRaises(OSPDOpenvasError):
            self.db._parse_openvas_db_address(b'somedata')

    def test_max_db_index_fail(self, mock_redis):
        """max_db_index() must raise when config_get yields an empty dict."""
        mock_redis.config_get.return_value = {}
        with patch.object(OpenvasDB, 'kb_connect', return_value=mock_redis):
            with self.assertRaises(OSPDOpenvasError):
                self.db.max_db_index()

    def test_set_ctx_with_error(self, mock_redis):
        """set_redisctx(None) must raise RequiredArgument."""
        with self.assertRaises(RequiredArgument):
            self.db.set_redisctx(None)

    def test_set_ctx(self, mock_redis):
        """set_redisctx() must store the given context in rediscontext."""
        self.db.set_redisctx(mock_redis)
        self.assertIs(self.db.rediscontext, mock_redis)

    def test_try_db_index_success(self, mock_redis):
        """try_database_index() is expected to be True if hsetnx gives 1."""
        mock_redis.hsetnx.return_value = 1
        ret = self.db.try_database_index(mock_redis, 1)
        self.assertEqual(ret, True)

    def test_try_db_index_no_succes(self, mock_redis):
        """try_database_index() is expected to be False if hsetnx gives 0."""
        mock_redis.hsetnx.return_value = 0
        ret = self.db.try_database_index(mock_redis, 1)
        self.assertEqual(ret, False)

    def test_try_db_index_error(self, mock_redis):
        """An exception from hsetnx must surface as OSPDOpenvasError."""
        mock_redis.hsetnx.side_effect = Exception
        with self.assertRaises(OSPDOpenvasError):
            self.db.try_database_index(mock_redis, 1)

    def test_kb_connect(self, mock_redis):
        """kb_connect() must raise when creating the Redis client fails."""
        # Instantiating the patched Redis class raises a connection error.
        mock_redis.side_effect = RCE
        with patch.object(OpenvasDB, 'get_db_connection', return_value=None):
            with self.assertRaises(OSPDOpenvasError):
                self.db.kb_connect()

    def test_kb_new_fail(self, mock_redis):
        """kb_new() is expected to return None when nothing is set up."""
        ret = self.db.kb_new()
        self.assertIsNone(ret)

    def test_kb_new(self, mock_redis):
        """kb_new() must return the context provided by the patched helpers."""
        # The patches are entered left to right, matching the order the
        # equivalent nested ``with`` statements would use.
        with patch.object(
            OpenvasDB, 'db_find', return_value=mock_redis
        ), patch.object(
            OpenvasDB, 'try_database_index', return_value=True
        ), patch.object(OpenvasDB, 'kb_connect', return_value=mock_redis):
            self.db.max_dbindex = 10
            ret = self.db.kb_new()
        self.assertIs(ret, mock_redis)

    def test_get_kb_context(self, mock_redis):
        """get_kb_context() must return an already stored rediscontext."""
        self.db.rediscontext = mock_redis
        ret = self.db.get_kb_context()
        self.assertIs(ret, mock_redis)

    def test_get_kb_context_fail(self, mock_redis):
        """get_kb_context() must raise when db_find returns None."""
        with patch.object(OpenvasDB, 'db_find', return_value=None):
            with self.assertRaises(OSPDOpenvasError):
                self.db.get_kb_context()

    def test_select_kb_error(self, mock_redis):
        """select_kb() must reject a missing context."""
        with self.assertRaises(RequiredArgument):
            self.db.select_kb(None, 1)

    def test_select_kb_error1(self, mock_redis):
        """select_kb() must reject a missing index."""
        with self.assertRaises(RequiredArgument):
            self.db.select_kb(mock_redis, None)

    def test_select_kb(self, mock_redis):
        """select_kb(ctx, 1, True) must set db_index to '1' and keep ctx."""
        mock_redis.execute_command.return_value = mock_redis
        self.db.select_kb(mock_redis, 1, True)
        self.assertEqual(self.db.db_index, '1')
        self.assertIs(self.db.rediscontext, mock_redis)

    def test_get_list_item_fail(self, mock_redis):
        """get_list_item(None) must raise RequiredArgument."""
        with self.assertRaises(RequiredArgument):
            self.db.get_list_item(None)

    def test_get_list_item(self, mock_redis):
        """get_list_item() must return what lrange yields."""
        mock_redis.lrange.return_value = ['1234']
        with patch.object(OpenvasDB, 'get_kb_context', return_value=mock_redis):
            ret = self.db.get_list_item('name', ctx=None)
        self.assertEqual(ret, ['1234'])

    def test_rm_list_item(self, mock_redis):
        """remove_list_item() must call lrem with count=0 and the value."""
        mock_redis.lrem.return_value = 1
        with patch.object(OpenvasDB, 'get_kb_context', return_value=mock_redis):
            self.db.remove_list_item('name', '1234', ctx=None)
        mock_redis.lrem.assert_called_once_with('name', count=0, value='1234')

    def test_rm_list_item_error(self, mock_redis):
        """remove_list_item() must reject a missing value."""
        with self.assertRaises(RequiredArgument):
            self.db.remove_list_item('1', None)

    def test_rm_list_item_error1(self, mock_redis):
        """remove_list_item() must reject a missing key."""
        with self.assertRaises(RequiredArgument):
            self.db.remove_list_item(None, '1')

    def test_get_single_item_error(self, mock_redis):
        """get_single_item() must reject a missing key."""
        with self.assertRaises(RequiredArgument):
            self.db.get_single_item(None, '1')

    def test_get_single_item(self, mock_redis):
        """get_single_item('a') must call lindex('a', 0)."""
        mock_redis.lindex.return_value = 'a'
        with patch.object(OpenvasDB, 'get_kb_context', return_value=mock_redis):
            self.db.get_single_item('a', ctx=None)
        mock_redis.lindex.assert_called_once_with('a', 0)

    def test_add_single_item(self, mock_redis):
        """add_single_item('a', ['12']) must call rpush('a', '12')."""
        mock_redis.rpush.return_value = 1
        with patch.object(OpenvasDB, 'get_kb_context', return_value=mock_redis):
            self.db.add_single_item('a', ['12'], ctx=None)
        mock_redis.rpush.assert_called_once_with('a', '12')

    def test_add_single_item_error(self, mock_redis):
        """add_single_item() must reject a missing key."""
        with self.assertRaises(RequiredArgument):
            self.db.add_single_item(None, '1')

    def test_add_single_item_error1(self, mock_redis):
        """add_single_item() must reject missing values."""
        with self.assertRaises(RequiredArgument):
            self.db.add_single_item('1', None)

    def test_set_single_item_error(self, mock_redis):
        """set_single_item() must reject a missing key."""
        with self.assertRaises(RequiredArgument):
            self.db.set_single_item(None, '1')

    def test_set_single_item_error1(self, mock_redis):
        """set_single_item() must reject missing values."""
        with self.assertRaises(RequiredArgument):
            self.db.set_single_item('1', None)

    def test_set_single_item(self, mock_redis):
        """set_single_item() must delete the key, then rpush the value."""
        # Make pipeline() return the pipeline mock itself so the calls made
        # on the pipeline object can be inspected below.
        mock_redis.pipeline.return_value = mock_redis.pipeline
        mock_redis.pipeline.delete.return_value = None
        mock_redis.pipeline.rpush.return_value = None
        mock_redis.execute.return_value = None
        with patch.object(OpenvasDB, 'get_kb_context', return_value=mock_redis):
            self.db.set_single_item('a', ['12'], ctx=None)
        mock_redis.pipeline.rpush.assert_called_once_with('a', '12')
        mock_redis.pipeline.delete.assert_called_once_with('a')

    def test_get_pattern(self, mock_redis):
        """get_pattern() must pair every key from keys() with its lrange."""
        mock_redis.keys.return_value = ['a', 'b']
        mock_redis.lrange.return_value = [1, 2, 3]
        with patch.object(OpenvasDB, 'get_kb_context', return_value=mock_redis):
            ret = self.db.get_pattern('a')
        self.assertEqual(ret, [['a', [1, 2, 3]], ['b', [1, 2, 3]]])

    def test_get_pattern_error(self, mock_redis):
        """get_pattern(None) must raise RequiredArgument."""
        with self.assertRaises(RequiredArgument):
            self.db.get_pattern(None)

    def test_get_elem_pattern_by_index_error(self, mock_redis):
        """get_elem_pattern_by_index(None) must raise RequiredArgument."""
        with self.assertRaises(RequiredArgument):
            self.db.get_elem_pattern_by_index(None)

    def test_get_elem_pattern_by_index(self, mock_redis):
        """get_elem_pattern_by_index() must pair each key with its lindex."""
        mock_redis.keys.return_value = ['aa', 'ab']
        # Successive lindex calls yield 1, then 2.
        mock_redis.lindex.side_effect = [1, 2]
        with patch.object(OpenvasDB, 'get_kb_context', return_value=mock_redis):
            ret = self.db.get_elem_pattern_by_index('a')
        self.assertEqual(ret, [['aa', 1], ['ab', 2]])

    def test_release_db(self, mock_redis):
        """release_db(3) must hdel index 3 from 'GVM.__GlobalDBIndex'."""
        mock_redis.delete.return_value = None
        mock_redis.flushdb.return_value = None
        mock_redis.hdel.return_value = 1
        with patch.object(OpenvasDB, 'kb_connect', return_value=mock_redis):
            self.db.release_db(3)
        mock_redis.hdel.assert_called_once_with('GVM.__GlobalDBIndex', 3)

    def test_get_result(self, mock_redis):
        """get_result() must return the value popped with rpop."""
        mock_redis.rpop.return_value = 'some result'
        with patch.object(OpenvasDB, 'get_kb_context', return_value=mock_redis):
            ret = self.db.get_result()
        self.assertEqual(ret, 'some result')

    def test_get_status(self, mock_redis):
        """get_status() must return the value popped with rpop."""
        mock_redis.rpop.return_value = 'some status'
        with patch.object(OpenvasDB, 'get_kb_context', return_value=mock_redis):
            ret = self.db.get_status()
        self.assertEqual(ret, 'some status')

    def test_get_stime(self, mock_redis):
        """get_host_scan_scan_start_time() must return the rpop value."""
        mock_redis.rpop.return_value = 'some start time'
        with patch.object(OpenvasDB, 'get_kb_context', return_value=mock_redis):
            ret = self.db.get_host_scan_scan_start_time()
        self.assertEqual(ret, 'some start time')

    def test_get_etime(self, mock_redis):
        """get_host_scan_scan_end_time() must return the rpop value."""
        mock_redis.rpop.return_value = 'some end time'
        with patch.object(OpenvasDB, 'get_kb_context', return_value=mock_redis):
            ret = self.db.get_host_scan_scan_end_time()
        self.assertEqual(ret, 'some end time')

    def test_get_host_ip(self, mock_redis):
        """get_host_ip() must return the value read with lindex."""
        mock_redis.lindex.return_value = '192.168.0.1'
        with patch.object(OpenvasDB, 'get_kb_context', return_value=mock_redis):
            ret = self.db.get_host_ip()
        self.assertEqual(ret, '192.168.0.1')
238