Completed
Push — master ( 22e702...524bc6 )
by Juan José
14s queued 12s
created

tests.test_db.TestDB.test_set_single_item()   A

Complexity

Conditions 2

Size

Total Lines 9
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 9
nop 2
dl 0
loc 9
rs 9.95
c 0
b 0
f 0
# -*- coding: utf-8 -*-
# Copyright (C) 2018-2019 Greenbone Networks GmbH
#
# SPDX-License-Identifier: GPL-2.0-or-later
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
20
# pylint: disable=unused-argument
21
22
""" Unit Test for ospd-openvas """
23
24
from unittest import TestCase
25
from unittest.mock import patch
26
27
from redis.exceptions import ConnectionError as RCE
28
29
from ospd.errors import RequiredArgument
30
from ospd_openvas.db import OpenvasDB, time
31
from ospd_openvas.errors import OspdOpenvasError
32
33
34
@patch('ospd_openvas.db.redis.Redis')
35
class TestDB(TestCase):
36
    def setUp(self):
        """Create a fresh OpenvasDB instance before every test method."""
        self.db = OpenvasDB()
38
39
    def test_parse_openvas_db_addres(self, mock_redis):
        """Expect OspdOpenvasError when the address parser gets b'somedata'."""
        bad_input = b'somedata'
        with self.assertRaises(OspdOpenvasError):
            # pylint: disable=protected-access
            self.db._parse_openvas_db_address(bad_input)
44
45
    @patch('ospd_openvas.db.subprocess')
    def test_get_db_connection(self, mock_subproc, mock_redis):
        """Check that get_db_connection() fills in db_address.

        The subprocess module used by ospd_openvas.db is mocked so that
        check_output() yields the bytes 'db_address = /foo/bar'. The
        address is expected to be '/foo/bar' after the first call and to
        be unchanged after a second call.
        """
        expected_address = "/foo/bar"
        mock_subproc.check_output.return_value = b'db_address = /foo/bar'

        # Nothing has been looked up yet.
        self.assertIsNone(self.db.db_address)

        # First call: the address gets set.
        self.db.get_db_connection()
        self.assertEqual(self.db.db_address, expected_address)

        # Second call: the address is already set and must not change.
        self.db.get_db_connection()
        self.assertEqual(self.db.db_address, expected_address)
57
58
    def test_max_db_index_fail(self, mock_redis):
        """Expect OspdOpenvasError when config_get() yields an empty dict."""
        mock_redis.config_get.return_value = {}
        fake_connect = patch.object(
            OpenvasDB, 'kb_connect', return_value=mock_redis
        )
        with fake_connect, self.assertRaises(OspdOpenvasError):
            self.db.max_db_index()
63
64
    def test_set_ctx_with_error(self, mock_redis):
        """Expect RequiredArgument when set_redisctx() is given None."""
        missing_ctx = None
        with self.assertRaises(RequiredArgument):
            self.db.set_redisctx(missing_ctx)
67
68
    def test_set_ctx(self, mock_redis):
        """Check that set_redisctx() stores the given object in rediscontext."""
        self.db.set_redisctx(mock_redis)
        stored_ctx = self.db.rediscontext
        self.assertIs(stored_ctx, mock_redis)
71
72
    def test_try_db_index_success(self, mock_redis):
        """Expect a True result when the mocked hsetnx() returns 1."""
        mock_redis.hsetnx.return_value = 1
        claimed = self.db.try_database_index(mock_redis, 1)
        self.assertEqual(claimed, True)
76
77
    def test_try_db_index_no_succes(self, mock_redis):
        """Expect a False result when the mocked hsetnx() returns 0."""
        mock_redis.hsetnx.return_value = 0
        claimed = self.db.try_database_index(mock_redis, 1)
        self.assertEqual(claimed, False)
81
82
    def test_try_db_index_error(self, mock_redis):
        """Expect OspdOpenvasError when the mocked hsetnx() raises."""
        failing_ctx = mock_redis
        failing_ctx.hsetnx.side_effect = Exception
        with self.assertRaises(OspdOpenvasError):
            self.db.try_database_index(failing_ctx, 1)
86
87
    def test_kb_connect(self, mock_redis):
        """Expect OspdOpenvasError when calling the Redis mock raises.

        The patched Redis class raises redis' ConnectionError when called;
        get_db_connection() and time.sleep() are replaced by stubs that
        return None.
        """
        mock_redis.side_effect = RCE
        no_lookup = patch.object(
            OpenvasDB, 'get_db_connection', return_value=None
        )
        no_sleep = patch.object(time, 'sleep', return_value=None)
        with no_lookup, no_sleep, self.assertRaises(OspdOpenvasError):
            self.db.kb_connect()
93
94
    def test_kb_new_fail(self, mock_redis):
        """Expect kb_new() to return None when no helper is patched.

        Only the class-level Redis patch is active here; nothing else on
        OpenvasDB is stubbed.
        """
        ret = self.db.kb_new()
        # Identity check against None, consistent with the assertIsNone
        # used in test_get_db_connection; an equality check could be
        # satisfied by an object with a permissive __eq__.
        self.assertIsNone(ret)
97
98
    def test_kb_new(self, mock_redis):
        """Check that kb_new() returns the mocked Redis object.

        db_find(), try_database_index() and kb_connect() are all patched
        on OpenvasDB, and max_dbindex is set to 10 before the call.
        """
        fake_find = patch.object(OpenvasDB, 'db_find', return_value=mock_redis)
        fake_try = patch.object(
            OpenvasDB, 'try_database_index', return_value=True
        )
        fake_connect = patch.object(
            OpenvasDB, 'kb_connect', return_value=mock_redis
        )
        with fake_find, fake_try, fake_connect:
            self.db.max_dbindex = 10
            new_ctx = self.db.kb_new()
        self.assertIs(new_ctx, mock_redis)
109
110
    def test_get_kb_context(self, mock_redis):
        """Check that get_kb_context() returns an already set rediscontext."""
        self.db.rediscontext = mock_redis
        current_ctx = self.db.get_kb_context()
        self.assertIs(current_ctx, mock_redis)
114
115
    def test_get_kb_context_fail(self, mock_redis):
        """Expect OspdOpenvasError when the patched db_find() returns None."""
        no_db = patch.object(OpenvasDB, 'db_find', return_value=None)
        with no_db, self.assertRaises(OspdOpenvasError):
            self.db.get_kb_context()
119
120
    def test_select_kb_error(self, mock_redis):
        """Expect RequiredArgument when select_kb() gets None as first arg."""
        missing_ctx = None
        with self.assertRaises(RequiredArgument):
            self.db.select_kb(missing_ctx, 1)
123
124
    def test_select_kb_error1(self, mock_redis):
        """Expect RequiredArgument when select_kb() gets None as second arg."""
        missing_index = None
        with self.assertRaises(RequiredArgument):
            self.db.select_kb(mock_redis, missing_index)
127
128
    def test_select_kb(self, mock_redis):
        """Check the state left behind by select_kb(mock_redis, 1, True).

        db_index is expected to be the string '1' and rediscontext the
        object that was passed in.
        """
        mock_redis.execute_command.return_value = mock_redis
        self.db.select_kb(mock_redis, 1, True)

        selected_index = self.db.db_index
        selected_ctx = self.db.rediscontext
        self.assertEqual(selected_index, '1')
        self.assertIs(selected_ctx, mock_redis)
133
134
    def test_get_list_item_fail(self, mock_redis):
        """Expect RequiredArgument when get_list_item() is given None."""
        missing_name = None
        with self.assertRaises(RequiredArgument):
            self.db.get_list_item(missing_name)
137
138
    def test_get_list_item(self, mock_redis):
        """Check that get_list_item() returns what the mocked lrange() yields."""
        mock_redis.lrange.return_value = ['1234']
        fake_ctx = patch.object(
            OpenvasDB, 'get_kb_context', return_value=mock_redis
        )
        with fake_ctx:
            items = self.db.get_list_item('name', ctx=None)
        self.assertEqual(items, ['1234'])
143
144
    def test_rm_list_item(self, mock_redis):
        """Check the lrem() call issued by remove_list_item().

        Expects exactly one call: lrem('name', count=0, value='1234').
        """
        mock_redis.lrem.return_value = 1
        fake_ctx = patch.object(
            OpenvasDB, 'get_kb_context', return_value=mock_redis
        )
        with fake_ctx:
            self.db.remove_list_item('name', '1234', ctx=None)
        mock_redis.lrem.assert_called_once_with('name', count=0, value='1234')
149
150
    def test_rm_list_item_error(self, mock_redis):
        """Expect RequiredArgument when the second argument is None."""
        missing_value = None
        with self.assertRaises(RequiredArgument):
            self.db.remove_list_item('1', missing_value)
153
154
    def test_rm_list_item_error1(self, mock_redis):
        """Expect RequiredArgument when the first argument is None."""
        missing_name = None
        with self.assertRaises(RequiredArgument):
            self.db.remove_list_item(missing_name, '1')
157
158
    def test_get_single_item_error(self, mock_redis):
        """Expect RequiredArgument when get_single_item() gets None first."""
        missing_name = None
        with self.assertRaises(RequiredArgument):
            self.db.get_single_item(missing_name, '1')
161
162
    def test_get_single_item(self, mock_redis):
        """Check the lindex() call issued by get_single_item('a').

        Expects exactly one call: lindex('a', 0). The value returned by
        get_single_item() itself is not inspected.
        """
        mock_redis.lindex.return_value = 'a'
        fake_ctx = patch.object(
            OpenvasDB, 'get_kb_context', return_value=mock_redis
        )
        with fake_ctx:
            self.db.get_single_item('a', ctx=None)
        mock_redis.lindex.assert_called_once_with('a', 0)
167
168
    def test_add_single_item(self, mock_redis):
        """Check the rpush() call issued by add_single_item('a', ['12']).

        Expects exactly one call: rpush('a', '12').
        """
        mock_redis.rpush.return_value = 1
        fake_ctx = patch.object(
            OpenvasDB, 'get_kb_context', return_value=mock_redis
        )
        with fake_ctx:
            self.db.add_single_item('a', ['12'], ctx=None)
        mock_redis.rpush.assert_called_once_with('a', '12')
173
174
    def test_add_single_item_error(self, mock_redis):
        """Expect RequiredArgument when the first argument is None."""
        missing_name = None
        with self.assertRaises(RequiredArgument):
            self.db.add_single_item(missing_name, '1')
177
178
    def test_add_single_item_error1(self, mock_redis):
        """Expect RequiredArgument when the second argument is None."""
        missing_values = None
        with self.assertRaises(RequiredArgument):
            self.db.add_single_item('1', missing_values)
181
182
    def test_set_single_item_error(self, mock_redis):
        """Expect RequiredArgument when the first argument is None."""
        missing_name = None
        with self.assertRaises(RequiredArgument):
            self.db.set_single_item(missing_name, '1')
185
186
    def test_set_single_item_error1(self, mock_redis):
        """Expect RequiredArgument when the second argument is None."""
        missing_values = None
        with self.assertRaises(RequiredArgument):
            self.db.set_single_item('1', missing_values)
189
190
    def test_set_single_item(self, mock_redis):
        """Check the pipeline calls issued by set_single_item('a', ['12']).

        pipeline() is wired to return the mock_redis.pipeline mock itself,
        so calls made on the returned object are recorded there. Expects
        exactly one rpush('a', '12') and one delete('a').
        """
        pipe = mock_redis.pipeline
        pipe.return_value = pipe
        pipe.delete.return_value = None
        pipe.rpush.return_value = None
        # NOTE(review): this stubs execute() on the context, not on the
        # pipeline mock - confirm which one set_single_item() calls.
        mock_redis.execute.return_value = None

        fake_ctx = patch.object(
            OpenvasDB, 'get_kb_context', return_value=mock_redis
        )
        with fake_ctx:
            self.db.set_single_item('a', ['12'], ctx=None)

        pipe.rpush.assert_called_once_with('a', '12')
        pipe.delete.assert_called_once_with('a')
199
200
    def test_get_pattern(self, mock_redis):
        """Check the [key, values] pairs returned by get_pattern('a').

        keys() is mocked to yield 'a' and 'b'; lrange() yields [1, 2, 3]
        for every call.
        """
        mock_redis.keys.return_value = ['a', 'b']
        mock_redis.lrange.return_value = [1, 2, 3]
        fake_ctx = patch.object(
            OpenvasDB, 'get_kb_context', return_value=mock_redis
        )
        with fake_ctx:
            matches = self.db.get_pattern('a')
        expected = [['a', [1, 2, 3]], ['b', [1, 2, 3]]]
        self.assertEqual(matches, expected)
206
207
    def test_get_pattern_error(self, mock_redis):
        """Expect RequiredArgument when get_pattern() is given None."""
        missing_pattern = None
        with self.assertRaises(RequiredArgument):
            self.db.get_pattern(missing_pattern)
210
211
    def test_get_elem_pattern_by_index_error(self, mock_redis):
        """Expect RequiredArgument when the pattern argument is None."""
        missing_pattern = None
        with self.assertRaises(RequiredArgument):
            self.db.get_elem_pattern_by_index(missing_pattern)
214
215
    def test_get_elem_pattern_by_index(self, mock_redis):
        """Check the [key, element] pairs from get_elem_pattern_by_index('a').

        keys() is mocked to yield 'aa' and 'ab'; successive lindex() calls
        yield 1 and then 2.
        """
        mock_redis.keys.return_value = ['aa', 'ab']
        mock_redis.lindex.side_effect = [1, 2]
        fake_ctx = patch.object(
            OpenvasDB, 'get_kb_context', return_value=mock_redis
        )
        with fake_ctx:
            matches = self.db.get_elem_pattern_by_index('a')
        expected = [['aa', 1], ['ab', 2]]
        self.assertEqual(matches, expected)
221
222
    def test_release_db(self, mock_redis):
        """Check the hdel() call issued by release_db(3).

        Expects exactly one call: hdel('GVM.__GlobalDBIndex', 3).
        """
        mock_redis.hdel.return_value = 1
        mock_redis.flushdb.return_value = None
        mock_redis.delete.return_value = None
        fake_connect = patch.object(
            OpenvasDB, 'kb_connect', return_value=mock_redis
        )
        with fake_connect:
            self.db.release_db(3)
        mock_redis.hdel.assert_called_once_with('GVM.__GlobalDBIndex', 3)
229
230
    def test_get_result(self, mock_redis):
        """Check that get_result() returns what the mocked rpop() yields."""
        mock_redis.rpop.return_value = 'some result'
        fake_ctx = patch.object(
            OpenvasDB, 'get_kb_context', return_value=mock_redis
        )
        with fake_ctx:
            result = self.db.get_result()
        self.assertEqual(result, 'some result')
235
236
    def test_get_status(self, mock_redis):
        """Check that get_status() returns what the mocked rpop() yields."""
        mock_redis.rpop.return_value = 'some status'
        fake_ctx = patch.object(
            OpenvasDB, 'get_kb_context', return_value=mock_redis
        )
        with fake_ctx:
            status = self.db.get_status()
        self.assertEqual(status, 'some status')
241
242
    def test_get_stime(self, mock_redis):
        """Check that the scan start time getter returns the rpop() value."""
        mock_redis.rpop.return_value = 'some start time'
        fake_ctx = patch.object(
            OpenvasDB, 'get_kb_context', return_value=mock_redis
        )
        with fake_ctx:
            start_time = self.db.get_host_scan_scan_start_time()
        self.assertEqual(start_time, 'some start time')
247
248
    def test_get_etime(self, mock_redis):
        """Check that the scan end time getter returns the rpop() value."""
        mock_redis.rpop.return_value = 'some end time'
        fake_ctx = patch.object(
            OpenvasDB, 'get_kb_context', return_value=mock_redis
        )
        with fake_ctx:
            end_time = self.db.get_host_scan_scan_end_time()
        self.assertEqual(end_time, 'some end time')
253
254
    def test_get_host_ip(self, mock_redis):
        """Check that get_host_ip() returns what the mocked lindex() yields."""
        mock_redis.lindex.return_value = '192.168.0.1'
        fake_ctx = patch.object(
            OpenvasDB, 'get_kb_context', return_value=mock_redis
        )
        with fake_ctx:
            host_ip = self.db.get_host_ip()
        self.assertEqual(host_ip, '192.168.0.1')
259