Passed
Pull Request — master (#195)
by Juan José
01:25
created

tests.test_db.TestDB.test_get_key_count()   A

Complexity

Conditions 2

Size

Total Lines 5
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 5
nop 2
dl 0
loc 5
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018-2019 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-2.0-or-later
5
#
6
# This program is free software; you can redistribute it and/or
7
# modify it under the terms of the GNU General Public License
8
# as published by the Free Software Foundation; either version 2
9
# of the License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
20
# pylint: disable=unused-argument
21
22
""" Unit tests for the OpenvasDB class in ospd_openvas.db """
23
24
from unittest import TestCase
25
from unittest.mock import patch, MagicMock
26
27
from redis.exceptions import ConnectionError as RCE
28
29
from ospd.errors import RequiredArgument
30
from ospd_openvas.db import OpenvasDB, time
31
from ospd_openvas.errors import OspdOpenvasError
32
33
34
@patch('ospd_openvas.db.redis.Redis')
class TestDB(TestCase):
    """Unit tests for OpenvasDB.

    redis.Redis (as referenced from ospd_openvas.db) is patched for the
    whole class, so every test method receives the resulting mock as its
    last positional argument (mock_redis). Most tests also use that mock
    as the Redis context passed to, or returned for, the method under test.
    """

    def setUp(self):
        """Create a fresh OpenvasDB instance for each test."""
        self.db = OpenvasDB()

    @patch('ospd_openvas.db.Openvas')
    def test_get_db_connection(
        self, mock_openvas: MagicMock, mock_redis: MagicMock
    ):
        """get_db_connection reads db_address from the settings only once."""
        # Nothing has been looked up yet.
        self.assertIsNone(self.db.db_address)

        mock_openvas.get_settings.return_value = {'db_address': '/foo/bar'}

        # The first call takes the address from the mocked settings.
        self.db.get_db_connection()
        self.assertEqual(self.db.db_address, "/foo/bar")

        # The second call must not query the settings again.
        self.db.get_db_connection()
        self.assertEqual(self.db.db_address, "/foo/bar")

        self.assertEqual(mock_openvas.get_settings.call_count, 1)

    def test_max_db_index_fail(self, mock_redis):
        """max_db_index raises when config_get returns an empty dict."""
        mock_redis.config_get.return_value = {}
        with patch.object(OpenvasDB, 'kb_connect', return_value=mock_redis):
            with self.assertRaises(OspdOpenvasError):
                self.db.max_db_index()

    def test_set_ctx_with_error(self, mock_redis):
        """set_redisctx raises RequiredArgument when given None."""
        with self.assertRaises(RequiredArgument):
            self.db.set_redisctx(None)

    def test_set_ctx(self, mock_redis):
        """set_redisctx stores the given context in rediscontext."""
        self.db.set_redisctx(mock_redis)
        self.assertIs(self.db.rediscontext, mock_redis)

    def test_try_db_index_success(self, mock_redis):
        """try_database_index returns True when hsetnx returns 1."""
        mock_redis.hsetnx.return_value = 1
        ret = self.db.try_database_index(mock_redis, 1)
        self.assertEqual(ret, True)

    def test_try_db_index_no_succes(self, mock_redis):
        """try_database_index returns False when hsetnx returns 0."""
        mock_redis.hsetnx.return_value = 0
        ret = self.db.try_database_index(mock_redis, 1)
        self.assertEqual(ret, False)

    def test_try_db_index_error(self, mock_redis):
        """try_database_index raises OspdOpenvasError when hsetnx raises."""
        mock_redis.hsetnx.side_effect = Exception
        with self.assertRaises(OspdOpenvasError):
            self.db.try_database_index(mock_redis, 1)

    def test_kb_connect(self, mock_redis):
        """kb_connect ends in SystemExit when Redis raises ConnectionError."""
        mock_redis.side_effect = RCE
        # time.sleep is replaced by a mock so that any sleep call made by
        # kb_connect returns immediately.
        with patch.object(
            OpenvasDB, 'get_db_connection', return_value=None
        ), patch.object(time, 'sleep', return_value=None):
            with self.assertRaises(SystemExit):
                self.db.kb_connect()

    def test_kb_new_fail(self, mock_redis):
        """kb_new returns None on a freshly created OpenvasDB."""
        ret = self.db.kb_new()
        self.assertIsNone(ret)

    def test_kb_new(self, mock_redis):
        """kb_new returns the kb_connect context when an index is obtained."""
        with patch.object(
            OpenvasDB, 'db_find', return_value=mock_redis
        ), patch.object(
            OpenvasDB, 'try_database_index', return_value=True
        ), patch.object(OpenvasDB, 'kb_connect', return_value=mock_redis):
            self.db.max_dbindex = 10
            ret = self.db.kb_new()
        self.assertIs(ret, mock_redis)

    def test_get_kb_context(self, mock_redis):
        """get_kb_context returns the context already stored on the object."""
        self.db.rediscontext = mock_redis
        ret = self.db.get_kb_context()
        self.assertIs(ret, mock_redis)

    def test_get_kb_context_fail(self, mock_redis):
        """get_kb_context raises OspdOpenvasError when db_find gives None."""
        with patch.object(OpenvasDB, 'db_find', return_value=None):
            with self.assertRaises(OspdOpenvasError):
                self.db.get_kb_context()

    def test_select_kb_error(self, mock_redis):
        """select_kb raises RequiredArgument when the context is None."""
        with self.assertRaises(RequiredArgument):
            self.db.select_kb(None, 1)

    def test_select_kb_error1(self, mock_redis):
        """select_kb raises RequiredArgument when the second arg is None."""
        with self.assertRaises(RequiredArgument):
            self.db.select_kb(mock_redis, None)

    def test_select_kb(self, mock_redis):
        """select_kb with True as third argument stores index and context."""
        mock_redis.execute_command.return_value = mock_redis
        self.db.select_kb(mock_redis, 1, True)
        # The index is expected to be kept as a string.
        self.assertEqual(self.db.db_index, '1')
        self.assertIs(self.db.rediscontext, mock_redis)

    def test_get_list_item_fail(self, mock_redis):
        """get_list_item raises RequiredArgument when the name is None."""
        with self.assertRaises(RequiredArgument):
            self.db.get_list_item(None)

    def test_get_list_item(self, mock_redis):
        """get_list_item returns the value produced by lrange."""
        mock_redis.lrange.return_value = ['1234']
        with patch.object(OpenvasDB, 'get_kb_context', return_value=mock_redis):
            ret = self.db.get_list_item('name', ctx=None)
        self.assertEqual(ret, ['1234'])

    def test_rm_list_item(self, mock_redis):
        """remove_list_item calls lrem once with count=0 and the value."""
        mock_redis.lrem.return_value = 1
        with patch.object(OpenvasDB, 'get_kb_context', return_value=mock_redis):
            self.db.remove_list_item('name', '1234', ctx=None)
        mock_redis.lrem.assert_called_once_with('name', count=0, value='1234')

    def test_rm_list_item_error(self, mock_redis):
        """remove_list_item raises RequiredArgument when the value is None."""
        with self.assertRaises(RequiredArgument):
            self.db.remove_list_item('1', None)

    def test_rm_list_item_error1(self, mock_redis):
        """remove_list_item raises RequiredArgument when the key is None."""
        with self.assertRaises(RequiredArgument):
            self.db.remove_list_item(None, '1')

    def test_get_single_item_error(self, mock_redis):
        """get_single_item raises RequiredArgument when the key is None."""
        with self.assertRaises(RequiredArgument):
            self.db.get_single_item(None, '1')

    def test_get_single_item(self, mock_redis):
        """get_single_item calls lindex once with the key and index 0."""
        mock_redis.lindex.return_value = 'a'
        with patch.object(OpenvasDB, 'get_kb_context', return_value=mock_redis):
            self.db.get_single_item('a', ctx=None)
        mock_redis.lindex.assert_called_once_with('a', 0)

    def test_add_single_item(self, mock_redis):
        """add_single_item calls rpush once with the key and the list items."""
        mock_redis.rpush.return_value = 1
        with patch.object(OpenvasDB, 'get_kb_context', return_value=mock_redis):
            self.db.add_single_item('a', ['12'], ctx=None)
        mock_redis.rpush.assert_called_once_with('a', '12')

    def test_add_single_item_error(self, mock_redis):
        """add_single_item raises RequiredArgument when the key is None."""
        with self.assertRaises(RequiredArgument):
            self.db.add_single_item(None, '1')

    def test_add_single_item_error1(self, mock_redis):
        """add_single_item raises RequiredArgument when the value is None."""
        with self.assertRaises(RequiredArgument):
            self.db.add_single_item('1', None)

    def test_set_single_item_error(self, mock_redis):
        """set_single_item raises RequiredArgument when the key is None."""
        with self.assertRaises(RequiredArgument):
            self.db.set_single_item(None, '1')

    def test_set_single_item_error1(self, mock_redis):
        """set_single_item raises RequiredArgument when the value is None."""
        with self.assertRaises(RequiredArgument):
            self.db.set_single_item('1', None)

    def test_set_single_item(self, mock_redis):
        """set_single_item deletes the key and rpushes the items on a pipeline."""
        # Make pipeline() hand back the pipeline attribute itself, so the
        # calls made on the returned object can be asserted below.
        mock_redis.pipeline.return_value = mock_redis.pipeline
        mock_redis.pipeline.delete.return_value = None
        mock_redis.pipeline.rpush.return_value = None
        mock_redis.execute.return_value = None
        with patch.object(OpenvasDB, 'get_kb_context', return_value=mock_redis):
            self.db.set_single_item('a', ['12'], ctx=None)
        mock_redis.pipeline.rpush.assert_called_once_with('a', '12')
        mock_redis.pipeline.delete.assert_called_once_with('a')

    def test_get_pattern(self, mock_redis):
        """get_pattern returns a [key, lrange result] pair for every key."""
        mock_redis.keys.return_value = ['a', 'b']
        mock_redis.lrange.return_value = [1, 2, 3]
        with patch.object(OpenvasDB, 'get_kb_context', return_value=mock_redis):
            ret = self.db.get_pattern('a')
        self.assertEqual(ret, [['a', [1, 2, 3]], ['b', [1, 2, 3]]])

    def test_get_pattern_error(self, mock_redis):
        """get_pattern raises RequiredArgument when the pattern is None."""
        with self.assertRaises(RequiredArgument):
            self.db.get_pattern(None)

    def test_get_elem_pattern_by_index_error(self, mock_redis):
        """get_elem_pattern_by_index raises RequiredArgument on None."""
        with self.assertRaises(RequiredArgument):
            self.db.get_elem_pattern_by_index(None)

    def test_get_elem_pattern_by_index(self, mock_redis):
        """get_elem_pattern_by_index returns a [key, lindex result] per key."""
        mock_redis.keys.return_value = ['aa', 'ab']
        # One lindex result per key, consumed in call order.
        mock_redis.lindex.side_effect = [1, 2]
        with patch.object(OpenvasDB, 'get_kb_context', return_value=mock_redis):
            ret = self.db.get_elem_pattern_by_index('a')
        self.assertEqual(ret, [['aa', 1], ['ab', 2]])

    def test_release_db(self, mock_redis):
        """release_db calls hdel once on GVM.__GlobalDBIndex with the index."""
        mock_redis.delete.return_value = None
        mock_redis.flushdb.return_value = None
        mock_redis.hdel.return_value = 1
        with patch.object(OpenvasDB, 'kb_connect', return_value=mock_redis):
            self.db.release_db(3)
        mock_redis.hdel.assert_called_once_with('GVM.__GlobalDBIndex', 3)

    def test_get_result(self, mock_redis):
        """get_result returns the value produced by rpop."""
        mock_redis.rpop.return_value = 'some result'
        with patch.object(OpenvasDB, 'get_kb_context', return_value=mock_redis):
            ret = self.db.get_result()
        self.assertEqual(ret, 'some result')

    def test_get_status(self, mock_redis):
        """get_status returns the value produced by rpop."""
        mock_redis.rpop.return_value = 'some status'
        with patch.object(OpenvasDB, 'get_kb_context', return_value=mock_redis):
            ret = self.db.get_status()
        self.assertEqual(ret, 'some status')

    def test_get_stime(self, mock_redis):
        """get_host_scan_scan_start_time returns the value produced by rpop."""
        mock_redis.rpop.return_value = 'some start time'
        with patch.object(OpenvasDB, 'get_kb_context', return_value=mock_redis):
            ret = self.db.get_host_scan_scan_start_time()
        self.assertEqual(ret, 'some start time')

    def test_get_etime(self, mock_redis):
        """get_host_scan_scan_end_time returns the value produced by rpop."""
        mock_redis.rpop.return_value = 'some end time'
        with patch.object(OpenvasDB, 'get_kb_context', return_value=mock_redis):
            ret = self.db.get_host_scan_scan_end_time()
        self.assertEqual(ret, 'some end time')

    def test_get_host_ip(self, mock_redis):
        """get_host_ip returns the value produced by lindex."""
        mock_redis.lindex.return_value = '192.168.0.1'
        with patch.object(OpenvasDB, 'get_kb_context', return_value=mock_redis):
            ret = self.db.get_host_ip()
        self.assertEqual(ret, '192.168.0.1')

    def test_get_key_count(self, mock_redis):
        """get_key_count returns how many keys the keys() call reports."""
        mock_redis.keys.return_value = ['aa', 'ab']
        with patch.object(OpenvasDB, 'get_kb_context', return_value=mock_redis):
            ret = self.db.get_key_count("")
        self.assertEqual(ret, 2)
264