Passed
Pull Request — master (#68)
by Juan José
01:33
created

tests.testDB.TestDB.test_get_host_ip()   A

Complexity

Conditions 2

Size

Total Lines 6
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 6
nop 2
dl 0
loc 6
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-2.0-or-later
5
#
6
# This program is free software; you can redistribute it and/or
7
# modify it under the terms of the GNU General Public License
8
# as published by the Free Software Foundation; either version 2
9
# of the License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
20
""" Unit Test for ospd-openvas """
21
22
from unittest import TestCase
23
from unittest.mock import patch
24
from ospd_openvas.db import OpenvasDB
25
from ospd_openvas.errors import OSPDOpenvasError, RequiredArgument
26
from redis.exceptions import ConnectionError as RCE
27
28
@patch('ospd_openvas.db.redis.Redis')
29
class TestDB(TestCase):
30
31
    def setUp(self):
32
        self.db = OpenvasDB()
33
34
    def test_parse_openvassd_db_addres(self, mock_redis):
35
        with self.assertRaises(OSPDOpenvasError):
36
            self.db._parse_openvassd_db_address(b'somedata')
37
38
    def test_max_db_index_fail(self, mock_redis):
39
        mock_redis.config_get.return_value = {}
40
        with patch.object(OpenvasDB,
41
                          'kb_connect', return_value=mock_redis):
42
            with self.assertRaises(OSPDOpenvasError):
43
                self.db.max_db_index()
44
45
    def test_set_ctx_with_error(self, mock_redis):
46
        with self.assertRaises(RequiredArgument):
47
            self.db.set_redisctx(None)
48
49
    def test_set_ctx(self, mock_redis):
50
        self.db.set_redisctx(mock_redis)
51
        self.assertIs(self.db.rediscontext, mock_redis)
52
53
    def test_try_db_index_success(self, mock_redis):
54
        mock_redis.hsetnx.return_value = 1
55
        ret = self.db.try_database_index(mock_redis, 1)
56
        self.assertEqual(ret, True)
57
58
    def test_try_db_index_no_succes(self, mock_redis):
59
        mock_redis.hsetnx.return_value = 0
60
        ret = self.db.try_database_index(mock_redis, 1)
61
        self.assertEqual(ret, False)
62
63
    def test_try_db_index_error(self, mock_redis):
64
        mock_redis.hsetnx.side_effect = Exception
65
        with self.assertRaises(OSPDOpenvasError):
66
            self.db.try_database_index(mock_redis, 1)
67
68
    def test_kb_connect(self, mock_redis):
69
        mock_redis.side_effect = RCE
70
        with patch.object(OpenvasDB,
71
                          'get_db_connection', return_value=None):
72
            with self.assertRaises(OSPDOpenvasError):
73
                self.db.kb_connect()
74
75
    def test_kb_new_fail(self, mock_redis):
76
        ret = self.db.kb_new()
77
        self.assertEqual(ret, None)
78
79
    def test_kb_new(self, mock_redis):
80
        with patch.object(OpenvasDB,
81
                          'db_find', return_value=mock_redis):
82
            with patch.object(OpenvasDB,
83
                              'try_database_index', return_value=True):
84
                with patch.object(OpenvasDB,
85
                                  'kb_connect', return_value=mock_redis):
86
                    self.db.max_dbindex = 10
87
                    ret = self.db.kb_new()
88
        self.assertIs(ret, mock_redis)
89
90
    def test_get_kb_context(self, mock_redis):
91
        self.db.rediscontext = mock_redis
92
        ret = self.db.get_kb_context()
93
        self.assertIs(ret, mock_redis)
94
95
    def test_get_kb_context_fail(self, mock_redis):
96
        with patch.object(OpenvasDB,
97
                          'db_find', return_value=None):
98
            with self.assertRaises(OSPDOpenvasError):
99
                self.db.get_kb_context()
100
101
    def test_select_kb_error(self, mock_redis):
102
        with self.assertRaises(RequiredArgument):
103
            self.db.select_kb(None, 1)
104
105
    def test_select_kb_error1(self, mock_redis):
106
        with self.assertRaises(RequiredArgument):
107
            self.db.select_kb(mock_redis, None)
108
109
    def test_select_kb(self, mock_redis):
110
        mock_redis.execute_command.return_value = mock_redis
111
        self.db.select_kb(mock_redis, 1, True)
112
        self.assertEqual(self.db.db_index, '1')
113
        self.assertIs(self.db.rediscontext, mock_redis)
114
115
    def test_get_list_item_fail(self, mock_redis):
116
        with self.assertRaises(RequiredArgument):
117
            self.db.get_list_item(None)
118
119
    def test_get_list_item(self, mock_redis):
120
        mock_redis.lrange.return_value = ['1234']
121
        with patch.object(OpenvasDB,
122
                          'get_kb_context', return_value=mock_redis):
123
            ret = self.db.get_list_item('name', ctx=None)
124
        self.assertEqual(ret, ['1234'])
125
126
    def test_rm_list_item(self, mock_redis):
127
        mock_redis.lrem.return_value = 1
128
        with patch.object(OpenvasDB,
129
                          'get_kb_context', return_value=mock_redis):
130
            self.db.remove_list_item('name', '1234',ctx=None)
131
        mock_redis.lrem.assert_called_once_with('name', count=0, value='1234')
132
133
    def test_rm_list_item_error(self, mock_redis):
134
        with self.assertRaises(RequiredArgument):
135
            self.db.remove_list_item('1', None)
136
137
    def test_rm_list_item_error1(self, mock_redis):
138
        with self.assertRaises(RequiredArgument):
139
            self.db.remove_list_item(None, '1')
140
141
    def test_get_single_item_error(self, mock_redis):
142
        with self.assertRaises(RequiredArgument):
143
            self.db.get_single_item(None, '1')
144
145
    def test_get_single_item(self, mock_redis):
146
        mock_redis.lindex.return_value = 'a'
147
        with patch.object(OpenvasDB,
148
                          'get_kb_context', return_value=mock_redis):
149
            self.db.get_single_item('a', ctx=None)
150
        mock_redis.lindex.assert_called_once_with('a', 0)
151
152
    def test_add_single_item(self, mock_redis):
153
        mock_redis.rpush.return_value = 1
154
        with patch.object(OpenvasDB,
155
                          'get_kb_context', return_value=mock_redis):
156
            self.db.add_single_item('a', ['12'], ctx=None)
157
        mock_redis.rpush.assert_called_once_with('a', '12')
158
159
    def test_add_single_item_error(self, mock_redis):
160
        with self.assertRaises(RequiredArgument):
161
            self.db.add_single_item(None, '1')
162
163
    def test_add_single_item_error1(self, mock_redis):
164
        with self.assertRaises(RequiredArgument):
165
            self.db.add_single_item('1', None)
166
167
    def test_set_single_item_error(self, mock_redis):
168
        with self.assertRaises(RequiredArgument):
169
            self.db.set_single_item(None, '1')
170
171
    def test_set_single_item_error1(self, mock_redis):
172
        with self.assertRaises(RequiredArgument):
173
            self.db.set_single_item('1', None)
174
175
    def test_set_single_item(self, mock_redis):
176
        mock_redis.pipeline.return_value = mock_redis.pipeline
177
        mock_redis.pipeline.delete.return_value = None
178
        mock_redis.pipeline.rpush.return_value = None
179
        mock_redis.execute.return_value = None
180
        with patch.object(OpenvasDB,
181
                          'get_kb_context', return_value=mock_redis):
182
            self.db.set_single_item('a', ['12'], ctx=None)
183
        mock_redis.pipeline.rpush.assert_called_once_with('a', '12')
184
        mock_redis.pipeline.delete.assert_called_once_with('a')
185
186
    def test_get_pattern(self, mock_redis):
187
        mock_redis.keys.return_value = ['a', 'b']
188
        mock_redis.lrange.return_value = [1, 2, 3]
189
        with patch.object(OpenvasDB,
190
                          'get_kb_context', return_value=mock_redis):
191
            ret = self.db.get_pattern('a')
192
        self.assertEqual(ret, [['a', [1, 2, 3]], ['b', [1, 2, 3]]])
193
194
    def test_get_pattern_error(self, mock_redis):
195
        with self.assertRaises(RequiredArgument):
196
            self.db.get_pattern(None)
197
198
    def test_get_elem_pattern_by_index_error(self, mock_redis):
199
        with self.assertRaises(RequiredArgument):
200
            self.db.get_elem_pattern_by_index(None)
201
202
    def test_get_elem_pattern_by_index(self, mock_redis):
203
        mock_redis.keys.return_value = ['aa', 'ab']
204
        mock_redis.lindex.side_effect = [1, 2]
205
        with patch.object(OpenvasDB,
206
                          'get_kb_context', return_value=mock_redis):
207
            ret = self.db.get_elem_pattern_by_index('a')
208
        self.assertEqual(ret, [['aa', 1], ['ab', 2]])
209
210
    def test_release_db(self, mock_redis):
211
        mock_redis.delete.return_value = None
212
        mock_redis.flushdb.return_value = None
213
        mock_redis.hdel.return_value = 1
214
        with patch.object(OpenvasDB,
215
                          'kb_connect', return_value=mock_redis):
216
            self.db.release_db(3)
217
        mock_redis.hdel.assert_called_once_with('GVM.__GlobalDBIndex', 3)
218
219
    def test_get_result(self, mock_redis):
220
        mock_redis.rpop.return_value = 'some result'
221
        with patch.object(OpenvasDB,
222
                          'get_kb_context', return_value=mock_redis):
223
            ret = self.db.get_result()
224
        self.assertEqual(ret, 'some result')
225
226
    def test_get_status(self, mock_redis):
227
        mock_redis.rpop.return_value = 'some status'
228
        with patch.object(OpenvasDB,
229
                          'get_kb_context', return_value=mock_redis):
230
            ret = self.db.get_status()
231
        self.assertEqual(ret, 'some status')
232
233
    def test_get_stime(self, mock_redis):
234
        mock_redis.rpop.return_value = 'some start time'
235
        with patch.object(OpenvasDB,
236
                          'get_kb_context', return_value=mock_redis):
237
            ret = self.db.get_host_scan_scan_start_time()
238
        self.assertEqual(ret, 'some start time')
239
240
    def test_get_etime(self, mock_redis):
241
        mock_redis.rpop.return_value = 'some end time'
242
        with patch.object(OpenvasDB,
243
                          'get_kb_context', return_value=mock_redis):
244
            ret = self.db.get_host_scan_scan_end_time()
245
        self.assertEqual(ret, 'some end time')
246
247
    def test_get_host_ip(self, mock_redis):
248
        mock_redis.lindex.return_value = '192.168.0.1'
249
        with patch.object(OpenvasDB,
250
                          'get_kb_context', return_value=mock_redis):
251
            ret = self.db.get_host_ip()
252
        self.assertEqual(ret, '192.168.0.1')
253