Completed
Push — master ( 2fc1a9...66c124 )
by Juan José
22s queued 12s
created

tests.test_db   F

Complexity

Total Complexity 95

Size/Duplication

Total Lines 758
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 467
dl 0
loc 758
rs 2
c 0
b 0
f 0
wmc 95

67 Methods

Rating   Name   Duplication   Size   Complexity  
A TestOpenvasDB.test_get_single_item() 0 8 1
A TestOpenvasDB.test_get_list_item() 0 8 1
A TestOpenvasDB.test_get_pattern_error() 0 8 3
A TestOpenvasDB.test_get_list_item_error() 0 8 3
A TestOpenvasDB.test_add_single_item_error() 0 11 4
A TestOpenvasDB.test_set_single_item_error() 0 11 4
A TestOpenvasDB.test_get_pattern() 0 8 1
A TestOpenvasDB.test_get_last_list_item() 0 8 1
A TestOpenvasDB.test_get_single_item_error() 0 8 3
A TestOpenvasDB.test_select_database_error() 0 6 3
A TestOpenvasDB.test_get_last_list_item_error() 0 8 3
A TestOpenvasDB.test_select_database() 0 6 1
A TestOpenvasDB.test_create_context_fail() 0 11 3
A TestOpenvasDB.test_remove_list_item() 0 7 1
A TestOpenvasDB.test_add_single_item() 0 7 1
A TestOpenvasDB.test_remove_list_item_error() 0 11 4
A TestOpenvasDB.test_set_single_item() 0 12 1
A TestOpenvasDB.test_get_db_connection() 0 20 1
A TestOpenvasDB.test_create_context_success() 0 4 1
A MainDBTestCase.test_try_database_success() 0 10 1
A KbDBTestCase.test_target_is_finished_false() 0 18 1
A TestOpenvasDB.test_get_key_count() 0 9 1
A TestOpenvasDB.test_get_filenames_and_oids() 0 8 1
A MainDBTestCase.test_max_database_index() 0 10 1
A ScanDBTestCase.test_host_is_finished_false() 0 8 1
A MainDBTestCase.test_max_database_index_fail() 0 12 2
A TestOpenvasDB.test_get_filenames_and_oids_error() 0 5 2
A MainDBTestCase.test_try_database_false() 0 10 1
A ScanDBTestCase.test_get_scan_status() 0 8 1
A TestOpenvasDB.test_get_keys_by_pattern_error() 0 8 3
A KbDBTestCase.test_target_is_finished_true() 0 24 1
A MainDBTestCase.test_try_db_index_error() 0 8 2
A MainDBTestCase.test_find_kb_database_by_scan_id_none() 0 20 1
A KbDBTestCase.test_add_scan_process_id() 0 5 1
A KbDBTestCase.test_scan_is_stopped_false() 0 8 1
A TestOpenvasDB.test_get_key_count_error() 0 3 2
A KbDBTestCase.test_stop_scan() 0 5 1
A KbDBTestCase.test_flush() 0 4 1
A MainDBTestCase.test_find_kb_database_by_scan_id() 0 19 1
A KbDBTestCase.setUp() 0 4 1
A TestOpenvasDB.test_get_keys_by_pattern() 0 8 1
A ScanDBTestCase.test_host_is_finished_true() 0 8 1
A KbDBTestCase.test_remove_scan_database() 0 8 1
A MainDBTestCase.test_release_database() 0 11 1
A KbDBTestCase.test_add_scan_id() 0 22 1
A KbDBTestCase.test_add_scan_preferences() 0 7 1
A MainDBTestCase.test_release() 0 8 1
A MainDBTestCase.test_release_database_by_index() 0 9 1
A TestOpenvasDB.test_get_key_count_with_default_pattern() 0 9 1
A ScanDBTestCase.test_get_result() 0 8 1
A ScanDBTestCase.test_get_host_ip() 0 8 1
A KbDBTestCase.test_get_status() 0 8 1
A TestOpenvasDB.test_find_database_by_pattern() 0 10 1
A ScanDBTestCase.setUp() 0 4 1
A MainDBTestCase.test_get_new_kb_database() 0 12 1
A ScanDBTestCase.test_get_host_scan_start_time() 0 8 1
A KbDBTestCase.test_get_result() 0 8 1
A MainDBTestCase.test_get_new_kb_database_none() 0 12 1
A KbDBTestCase.test_get_scan_process_id() 0 8 1
A ScanDBTestCase.test_get_host_scan_end_time() 0 8 1
A TestOpenvasDB.test_find_database_by_pattern_none() 0 8 1
A ScanDBTestCase.test_get_status() 0 8 1
A KbDBTestCase.test_get_scan_databases() 0 21 2
A ScanDBTestCase.test_select() 0 7 1
A KbDBTestCase.test_scan_is_stopped_true() 0 8 1
A ScanDBTestCase.test_flush() 0 4 1
A ScanDBTestCase.test_get_scan_id() 0 8 1

How to fix   Complexity   

Complexity

Complex classes like tests.test_db often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018-2019 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-2.0-or-later
5
#
6
# This program is free software; you can redistribute it and/or
7
# modify it under the terms of the GNU General Public License
8
# as published by the Free Software Foundation; either version 2
9
# of the License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
20
# pylint: disable=unused-argument
21
22
""" Unit Test for ospd-openvas """
23
24
import logging
25
26
from unittest import TestCase
27
from unittest.mock import patch, MagicMock
28
29
from redis.exceptions import ConnectionError as RCE
30
31
from ospd.errors import RequiredArgument
32
from ospd_openvas.db import OpenvasDB, MainDB, ScanDB, KbDB, DBINDEX_NAME, time
33
from ospd_openvas.errors import OspdOpenvasError
34
35
from tests.helper import assert_called
36
37
38
@patch('ospd_openvas.db.redis.Redis')
39
class TestOpenvasDB(TestCase):
40
    @patch('ospd_openvas.db.Openvas')
41
    def test_get_db_connection(
42
        self, mock_openvas: MagicMock, mock_redis: MagicMock
43
    ):
44
        OpenvasDB._db_address = None  # pylint: disable=protected-access
45
        mock_settings = mock_openvas.get_settings.return_value
46
        mock_settings.get.return_value = None
47
48
        self.assertIsNone(OpenvasDB.get_database_address())
49
50
        # set the first time
51
        mock_openvas.get_settings.return_value = {'db_address': '/foo/bar'}
52
53
        self.assertEqual(OpenvasDB.get_database_address(), "/foo/bar")
54
55
        self.assertEqual(mock_openvas.get_settings.call_count, 2)
56
57
        # should cache address
58
        self.assertEqual(OpenvasDB.get_database_address(), "/foo/bar")
59
        self.assertEqual(mock_openvas.get_settings.call_count, 2)
60
61
    def test_create_context_fail(self, mock_redis):
62
        mock_redis.side_effect = RCE
63
64
        logging.Logger.error = MagicMock()
65
66
        with patch.object(time, 'sleep', return_value=None):
67
            with self.assertRaises(SystemExit):
68
                OpenvasDB.create_context()
69
70
        logging.Logger.error.assert_called_with(  # pylint: disable=no-member
71
            'Redis Error: Not possible to connect to the kb.'
72
        )
73
74
    def test_create_context_success(self, mock_redis):
75
        ctx = mock_redis.return_value
76
        ret = OpenvasDB.create_context()
77
        self.assertIs(ret, ctx)
78
79
    def test_select_database_error(self, mock_redis):
80
        with self.assertRaises(RequiredArgument):
81
            OpenvasDB.select_database(None, 1)
82
83
        with self.assertRaises(RequiredArgument):
84
            OpenvasDB.select_database(mock_redis, None)
85
86
    def test_select_database(self, mock_redis):
87
        mock_redis.execute_command.return_value = mock_redis
88
89
        OpenvasDB.select_database(mock_redis, 1)
90
91
        mock_redis.execute_command.assert_called_with('SELECT 1')
92
93
    def test_get_list_item_error(self, mock_redis):
94
        ctx = mock_redis.return_value
95
96
        with self.assertRaises(RequiredArgument):
97
            OpenvasDB.get_list_item(None, 'foo')
98
99
        with self.assertRaises(RequiredArgument):
100
            OpenvasDB.get_list_item(ctx, None)
101
102
    def test_get_list_item(self, mock_redis):
103
        ctx = mock_redis.return_value
104
        ctx.lrange.return_value = ['1234']
105
106
        ret = OpenvasDB.get_list_item(ctx, 'name')
107
108
        self.assertEqual(ret, ['1234'])
109
        assert_called(ctx.lrange)
110
111
    def test_get_last_list_item(self, mock_redis):
112
        ctx = mock_redis.return_value
113
        ctx.rpop.return_value = 'foo'
114
115
        ret = OpenvasDB.get_last_list_item(ctx, 'name')
116
117
        self.assertEqual(ret, 'foo')
118
        ctx.rpop.assert_called_with('name')
119
120
    def test_get_last_list_item_error(self, mock_redis):
121
        ctx = mock_redis.return_value
122
123
        with self.assertRaises(RequiredArgument):
124
            OpenvasDB.get_last_list_item(ctx, None)
125
126
        with self.assertRaises(RequiredArgument):
127
            OpenvasDB.get_last_list_item(None, 'name')
128
129
    def test_remove_list_item(self, mock_redis):
130
        ctx = mock_redis.return_value
131
        ctx.lrem.return_value = 1
132
133
        OpenvasDB.remove_list_item(ctx, 'name', '1234')
134
135
        ctx.lrem.assert_called_once_with('name', count=0, value='1234')
136
137
    def test_remove_list_item_error(self, mock_redis):
138
        ctx = mock_redis.return_value
139
140
        with self.assertRaises(RequiredArgument):
141
            OpenvasDB.remove_list_item(None, '1', 'bar')
142
143
        with self.assertRaises(RequiredArgument):
144
            OpenvasDB.remove_list_item(ctx, None, 'bar')
145
146
        with self.assertRaises(RequiredArgument):
147
            OpenvasDB.remove_list_item(ctx, '1', None)
148
149
    def test_get_single_item_error(self, mock_redis):
150
        ctx = mock_redis.return_value
151
152
        with self.assertRaises(RequiredArgument):
153
            OpenvasDB.get_single_item(None, 'foo')
154
155
        with self.assertRaises(RequiredArgument):
156
            OpenvasDB.get_single_item(ctx, None)
157
158
    def test_get_single_item(self, mock_redis):
159
        ctx = mock_redis.return_value
160
        ctx.lindex.return_value = 'a'
161
162
        value = OpenvasDB.get_single_item(ctx, 'a')
163
164
        self.assertEqual(value, 'a')
165
        ctx.lindex.assert_called_once_with('a', 0)
166
167
    def test_add_single_item(self, mock_redis):
168
        ctx = mock_redis.return_value
169
        ctx.rpush.return_value = 1
170
171
        OpenvasDB.add_single_item(ctx, 'a', ['12'])
172
173
        ctx.rpush.assert_called_once_with('a', '12')
174
175
    def test_add_single_item_error(self, mock_redis):
176
        ctx = mock_redis.return_value
177
178
        with self.assertRaises(RequiredArgument):
179
            OpenvasDB.add_single_item(None, '1', ['12'])
180
181
        with self.assertRaises(RequiredArgument):
182
            OpenvasDB.add_single_item(ctx, None, ['12'])
183
184
        with self.assertRaises(RequiredArgument):
185
            OpenvasDB.add_single_item(ctx, '1', None)
186
187
    def test_set_single_item_error(self, mock_redis):
188
        ctx = mock_redis.return_value
189
190
        with self.assertRaises(RequiredArgument):
191
            OpenvasDB.set_single_item(None, '1', ['12'])
192
193
        with self.assertRaises(RequiredArgument):
194
            OpenvasDB.set_single_item(ctx, None, ['12'])
195
196
        with self.assertRaises(RequiredArgument):
197
            OpenvasDB.set_single_item(ctx, '1', None)
198
199
    def test_set_single_item(self, mock_redis):
200
        ctx = mock_redis.return_value
201
        pipeline = ctx.pipeline.return_value
202
        pipeline.delete.return_value = None
203
        pipeline.rpush.return_value = None
204
        pipeline.execute.return_value = None
205
206
        OpenvasDB.set_single_item(ctx, 'foo', ['bar'])
207
208
        pipeline.delete.assert_called_once_with('foo')
209
        pipeline.rpush.assert_called_once_with('foo', 'bar')
210
        assert_called(pipeline.execute)
211
212
    def test_get_pattern(self, mock_redis):
213
        ctx = mock_redis.return_value
214
        ctx.keys.return_value = ['a', 'b']
215
        ctx.lrange.return_value = [1, 2, 3]
216
217
        ret = OpenvasDB.get_pattern(ctx, 'a')
218
219
        self.assertEqual(ret, [['a', [1, 2, 3]], ['b', [1, 2, 3]]])
220
221
    def test_get_pattern_error(self, mock_redis):
222
        ctx = mock_redis.return_value
223
224
        with self.assertRaises(RequiredArgument):
225
            OpenvasDB.get_pattern(None, 'a')
226
227
        with self.assertRaises(RequiredArgument):
228
            OpenvasDB.get_pattern(ctx, None)
229
230
    def test_get_filenames_and_oids_error(self, mock_redis):
231
        ctx = mock_redis.return_value
232
233
        with self.assertRaises(RequiredArgument):
234
            OpenvasDB.get_filenames_and_oids(None)
235
236
    def test_get_filenames_and_oids(self, mock_redis):
237
        ctx = mock_redis.return_value
238
        ctx.keys.return_value = ['nvt:1', 'nvt:2']
239
        ctx.lindex.side_effect = ['aa', 'ab']
240
241
        ret = OpenvasDB.get_filenames_and_oids(ctx)
242
243
        self.assertEqual(list(ret), [('aa', '1'), ('ab', '2')])
244
245
    def test_get_keys_by_pattern_error(self, mock_redis):
246
        ctx = mock_redis.return_value
247
248
        with self.assertRaises(RequiredArgument):
249
            OpenvasDB.get_keys_by_pattern(None, 'a')
250
251
        with self.assertRaises(RequiredArgument):
252
            OpenvasDB.get_keys_by_pattern(ctx, None)
253
254
    def test_get_keys_by_pattern(self, mock_redis):
255
        ctx = mock_redis.return_value
256
        ctx.keys.return_value = ['nvt:2', 'nvt:1']
257
258
        ret = OpenvasDB.get_keys_by_pattern(ctx, 'nvt:*')
259
260
        # Return sorted list
261
        self.assertEqual(ret, ['nvt:1', 'nvt:2'])
262
263
    def test_get_key_count(self, mock_redis):
264
        ctx = mock_redis.return_value
265
266
        ctx.keys.return_value = ['aa', 'ab']
267
268
        ret = OpenvasDB.get_key_count(ctx, "foo")
269
270
        self.assertEqual(ret, 2)
271
        ctx.keys.assert_called_with('foo')
272
273
    def test_get_key_count_with_default_pattern(self, mock_redis):
274
        ctx = mock_redis.return_value
275
276
        ctx.keys.return_value = ['aa', 'ab']
277
278
        ret = OpenvasDB.get_key_count(ctx)
279
280
        self.assertEqual(ret, 2)
281
        ctx.keys.assert_called_with('*')
282
283
    def test_get_key_count_error(self, mock_redis):
284
        with self.assertRaises(RequiredArgument):
285
            OpenvasDB.get_key_count(None)
286
287
    def test_find_database_by_pattern_none(self, mock_redis):
288
        ctx = mock_redis.return_value
289
        ctx.keys.return_value = None
290
291
        new_ctx, index = OpenvasDB.find_database_by_pattern('foo*', 123)
292
293
        self.assertIsNone(new_ctx)
294
        self.assertIsNone(index)
295
296
    def test_find_database_by_pattern(self, mock_redis):
297
        ctx = mock_redis.return_value
298
299
        # keys is called twice per iteration
300
        ctx.keys.side_effect = [None, None, None, None, True, True]
301
302
        new_ctx, index = OpenvasDB.find_database_by_pattern('foo*', 123)
303
304
        self.assertEqual(new_ctx, ctx)
305
        self.assertEqual(index, 2)
306
307
308
@patch('ospd_openvas.db.OpenvasDB')
309
class ScanDBTestCase(TestCase):
310
    @patch('ospd_openvas.db.redis.Redis')
311
    def setUp(self, mock_redis):  # pylint: disable=arguments-differ
312
        self.ctx = mock_redis.return_value
313
        self.db = ScanDB(10, self.ctx)
314
315
    def test_get_result(self, mock_openvas_db):
316
        mock_openvas_db.get_last_list_item.return_value = 'some result'
317
318
        ret = self.db.get_result()
319
320
        self.assertEqual(ret, 'some result')
321
        mock_openvas_db.get_last_list_item.assert_called_with(
322
            self.ctx, 'internal/results'
323
        )
324
325
    def test_get_status(self, mock_openvas_db):
326
        mock_openvas_db.get_single_item.return_value = 'some status'
327
328
        ret = self.db.get_status('foo')
329
330
        self.assertEqual(ret, 'some status')
331
        mock_openvas_db.get_single_item.assert_called_with(
332
            self.ctx, 'internal/foo'
333
        )
334
335
    def test_select(self, mock_openvas_db):
336
        ret = self.db.select(11)
337
338
        self.assertIs(ret, self.db)
339
        self.assertEqual(self.db.index, 11)
340
341
        mock_openvas_db.select_database.assert_called_with(self.ctx, 11)
342
343
    def test_get_scan_id(self, mock_openvas_db):
344
        mock_openvas_db.get_single_item.return_value = 'foo'
345
346
        ret = self.db.get_scan_id()
347
348
        self.assertEqual(ret, 'foo')
349
        mock_openvas_db.get_single_item.assert_called_with(
350
            self.ctx, 'internal/scan_id'
351
        )
352
353
    def test_get_scan_status(self, mock_openvas_db):
354
        mock_openvas_db.get_last_list_item.return_value = 'foo'
355
356
        ret = self.db.get_scan_status()
357
358
        self.assertEqual(ret, 'foo')
359
        mock_openvas_db.get_last_list_item.assert_called_with(
360
            self.ctx, 'internal/status'
361
        )
362
363
    def test_get_host_scan_start_time(self, mock_openvas_db):
364
        mock_openvas_db.get_last_list_item.return_value = 'some start time'
365
366
        ret = self.db.get_host_scan_start_time()
367
368
        self.assertEqual(ret, 'some start time')
369
        mock_openvas_db.get_last_list_item.assert_called_with(
370
            self.ctx, 'internal/start_time'
371
        )
372
373
    def test_get_host_scan_end_time(self, mock_openvas_db):
374
        mock_openvas_db.get_last_list_item.return_value = 'some end time'
375
376
        ret = self.db.get_host_scan_end_time()
377
378
        self.assertEqual(ret, 'some end time')
379
        mock_openvas_db.get_last_list_item.assert_called_with(
380
            self.ctx, 'internal/end_time'
381
        )
382
383
    def test_get_host_ip(self, mock_openvas_db):
384
        mock_openvas_db.get_single_item.return_value = '192.168.0.1'
385
386
        ret = self.db.get_host_ip()
387
388
        self.assertEqual(ret, '192.168.0.1')
389
        mock_openvas_db.get_single_item.assert_called_with(
390
            self.ctx, 'internal/ip'
391
        )
392
393
    def test_host_is_finished_false(self, mock_openvas_db):
394
        mock_openvas_db.get_single_item.return_value = 'foo'
395
396
        ret = self.db.host_is_finished('bar')
397
398
        self.assertFalse(ret)
399
        mock_openvas_db.get_single_item.assert_called_with(
400
            self.ctx, 'internal/bar'
401
        )
402
403
    def test_host_is_finished_true(self, mock_openvas_db):
404
        mock_openvas_db.get_single_item.return_value = 'finished'
405
406
        ret = self.db.host_is_finished('bar')
407
408
        self.assertTrue(ret)
409
        mock_openvas_db.get_single_item.assert_called_with(
410
            self.ctx, 'internal/bar'
411
        )
412
413
    def test_flush(self, mock_openvas_db):
414
        self.db.flush()
415
416
        self.ctx.flushdb.assert_called_with()
417
418
419
@patch('ospd_openvas.db.OpenvasDB')
420
class KbDBTestCase(TestCase):
421
    @patch('ospd_openvas.db.redis.Redis')
422
    def setUp(self, mock_redis):  # pylint: disable=arguments-differ
423
        self.ctx = mock_redis.return_value
424
        self.db = KbDB(10, self.ctx)
425
426
    def test_get_result(self, mock_openvas_db):
427
        mock_openvas_db.get_last_list_item.return_value = 'some result'
428
429
        ret = self.db.get_result()
430
431
        self.assertEqual(ret, 'some result')
432
        mock_openvas_db.get_last_list_item.assert_called_with(
433
            self.ctx, 'internal/results'
434
        )
435
436
    def test_get_status(self, mock_openvas_db):
437
        mock_openvas_db.get_single_item.return_value = 'some status'
438
439
        ret = self.db.get_status('foo')
440
441
        self.assertEqual(ret, 'some status')
442
        mock_openvas_db.get_single_item.assert_called_with(
443
            self.ctx, 'internal/foo'
444
        )
445
446
    def test_flush(self, mock_openvas_db):
447
        self.db.flush()
448
449
        self.ctx.flushdb.assert_called_with()
450
451
    def test_add_scan_id(self, mock_openvas_db):
452
        self.db.add_scan_id('foo', 'bar')
453
454
        calls = mock_openvas_db.add_single_item.call_args_list
455
456
        call = calls[0]
457
        kwargs = call[0]
458
459
        self.assertEqual(kwargs[1], 'internal/bar')
460
        self.assertEqual(kwargs[2], ['new'])
461
462
        call = calls[1]
463
        kwargs = call[0]
464
465
        self.assertEqual(kwargs[1], 'internal/foo/globalscanid')
466
        self.assertEqual(kwargs[2], ['bar'])
467
468
        call = calls[2]
469
        kwargs = call[0]
470
471
        self.assertEqual(kwargs[1], 'internal/scanid')
472
        self.assertEqual(kwargs[2], ['bar'])
473
474
    def test_add_scan_preferences(self, mock_openvas_db):
475
        prefs = ['foo', 'bar']
476
477
        self.db.add_scan_preferences('foo', prefs)
478
479
        mock_openvas_db.add_single_item.assert_called_with(
480
            self.ctx, 'internal/foo/scanprefs', prefs
481
        )
482
483
    def test_add_scan_process_id(self, mock_openvas_db):
484
        self.db.add_scan_process_id(123)
485
486
        mock_openvas_db.add_single_item.assert_called_with(
487
            self.ctx, 'internal/ovas_pid', [123]
488
        )
489
490
    def test_get_scan_process_id(self, mock_openvas_db):
491
        mock_openvas_db.get_single_item.return_value = '123'
492
493
        ret = self.db.get_scan_process_id()
494
495
        self.assertEqual(ret, '123')
496
        mock_openvas_db.get_single_item.assert_called_with(
497
            self.ctx, 'internal/ovas_pid'
498
        )
499
500
    def test_remove_scan_database(self, mock_openvas_db):
501
        scan_db = MagicMock(spec=ScanDB)
502
        scan_db.index = 123
503
504
        self.db.remove_scan_database(scan_db)
505
506
        mock_openvas_db.remove_list_item.assert_called_with(
507
            self.ctx, 'internal/dbindex', 123
508
        )
509
510
    def test_target_is_finished_false(self, mock_openvas_db):
511
        mock_openvas_db.get_single_item.side_effect = ['bar', 'new']
512
513
        ret = self.db.target_is_finished('foo')
514
515
        self.assertFalse(ret)
516
517
        calls = mock_openvas_db.get_single_item.call_args_list
518
519
        call = calls[0]
520
        args = call[0]
521
522
        self.assertEqual(args[1], 'internal/foo/globalscanid')
523
524
        call = calls[1]
525
        args = call[0]
526
527
        self.assertEqual(args[1], 'internal/bar')
528
529
    def test_target_is_finished_true(self, mock_openvas_db):
530
        mock_openvas_db.get_single_item.side_effect = ['bar', 'finished']
531
532
        ret = self.db.target_is_finished('foo')
533
534
        self.assertTrue(ret)
535
536
        calls = mock_openvas_db.get_single_item.call_args_list
537
538
        call = calls[0]
539
        args = call[0]
540
541
        self.assertEqual(args[1], 'internal/foo/globalscanid')
542
543
        call = calls[1]
544
        args = call[0]
545
546
        self.assertEqual(args[1], 'internal/bar')
547
548
        mock_openvas_db.get_single_item.side_effect = ['bar', None]
549
550
        ret = self.db.target_is_finished('foo')
551
552
        self.assertTrue(ret)
553
554
    def test_stop_scan(self, mock_openvas_db):
555
        self.db.stop_scan('foo')
556
557
        mock_openvas_db.set_single_item.assert_called_with(
558
            self.ctx, 'internal/foo', ['stop_all']
559
        )
560
561
    def test_scan_is_stopped_false(self, mock_openvas_db):
562
        mock_openvas_db.get_single_item.return_value = 'new'
563
564
        ret = self.db.scan_is_stopped('foo')
565
566
        self.assertFalse(ret)
567
        mock_openvas_db.get_single_item.assert_called_with(
568
            self.ctx, 'internal/foo'
569
        )
570
571
    def test_scan_is_stopped_true(self, mock_openvas_db):
572
        mock_openvas_db.get_single_item.return_value = 'stop_all'
573
574
        ret = self.db.scan_is_stopped('foo')
575
576
        self.assertTrue(ret)
577
        mock_openvas_db.get_single_item.assert_called_with(
578
            self.ctx, 'internal/foo'
579
        )
580
581
    def test_get_scan_databases(self, mock_openvas_db):
582
        mock_openvas_db.get_list_item.return_value = [
583
            '4',
584
            self.db.index,
585
            '7',
586
            '11',
587
        ]
588
589
        scan_dbs = self.db.get_scan_databases()
590
591
        scan_db = next(scan_dbs)
592
        self.assertEqual(scan_db.index, '4')
593
594
        scan_db = next(scan_dbs)
595
        self.assertEqual(scan_db.index, '7')
596
597
        scan_db = next(scan_dbs)
598
        self.assertEqual(scan_db.index, '11')
599
600
        with self.assertRaises(StopIteration):
601
            next(scan_dbs)
602
603
604
@patch('ospd_openvas.db.redis.Redis')
605
class MainDBTestCase(TestCase):
606
    def test_max_database_index_fail(self, mock_redis):
607
        ctx = mock_redis.return_value
608
        ctx.config_get.return_value = {}
609
610
        maindb = MainDB(ctx)
611
612
        with self.assertRaises(OspdOpenvasError):
613
            max_db = (  # pylint: disable=unused-variable
614
                maindb.max_database_index
615
            )
616
617
        ctx.config_get.assert_called_with('databases')
618
619
    def test_max_database_index(self, mock_redis):
620
        ctx = mock_redis.return_value
621
        ctx.config_get.return_value = {'databases': '123'}
622
623
        maindb = MainDB(ctx)
624
625
        max_db = maindb.max_database_index
626
627
        self.assertEqual(max_db, 123)
628
        ctx.config_get.assert_called_with('databases')
629
630
    def test_try_database_success(self, mock_redis):
631
        ctx = mock_redis.return_value
632
        ctx.hsetnx.return_value = 1
633
634
        maindb = MainDB(ctx)
635
636
        ret = maindb.try_database(1)
637
638
        self.assertEqual(ret, True)
639
        ctx.hsetnx.assert_called_with(DBINDEX_NAME, 1, 1)
640
641
    def test_try_database_false(self, mock_redis):
642
        ctx = mock_redis.return_value
643
        ctx.hsetnx.return_value = 0
644
645
        maindb = MainDB(ctx)
646
647
        ret = maindb.try_database(1)
648
649
        self.assertEqual(ret, False)
650
        ctx.hsetnx.assert_called_with(DBINDEX_NAME, 1, 1)
651
652
    def test_try_db_index_error(self, mock_redis):
653
        ctx = mock_redis.return_value
654
        ctx.hsetnx.side_effect = Exception
655
656
        maindb = MainDB(ctx)
657
658
        with self.assertRaises(OspdOpenvasError):
659
            maindb.try_database(1)
660
661
    def test_release_database_by_index(self, mock_redis):
662
        ctx = mock_redis.return_value
663
        ctx.hdel.return_value = 1
664
665
        maindb = MainDB(ctx)
666
667
        maindb.release_database_by_index(3)
668
669
        ctx.hdel.assert_called_once_with(DBINDEX_NAME, 3)
670
671
    def test_release_database(self, mock_redis):
672
        ctx = mock_redis.return_value
673
        ctx.hdel.return_value = 1
674
675
        db = MagicMock()
676
        db.index = 3
677
        maindb = MainDB(ctx)
678
        maindb.release_database(db)
679
680
        ctx.hdel.assert_called_once_with(DBINDEX_NAME, 3)
681
        db.flush.assert_called_with()
682
683
    def test_release(self, mock_redis):
684
        ctx = mock_redis.return_value
685
686
        maindb = MainDB(ctx)
687
        maindb.release()
688
689
        ctx.hdel.assert_called_with(DBINDEX_NAME, maindb.index)
690
        ctx.flushdb.assert_called_with()
691
692
    def test_get_new_kb_database(self, mock_redis):
693
        ctx = mock_redis.return_value
694
695
        maindb = MainDB(ctx)
696
        maindb._max_dbindex = 123  # pylint: disable=protected-access
697
698
        ctx.hsetnx.side_effect = [0, 0, 1]
699
700
        kbdb = maindb.get_new_kb_database()
701
702
        self.assertEqual(kbdb.index, 3)
703
        ctx.flushdb.assert_called_once_with()
704
705
    def test_get_new_kb_database_none(self, mock_redis):
706
        ctx = mock_redis.return_value
707
708
        maindb = MainDB(ctx)
709
        maindb._max_dbindex = 3  # pylint: disable=protected-access
710
711
        ctx.hsetnx.side_effect = [0, 0, 0]
712
713
        kbdb = maindb.get_new_kb_database()
714
715
        self.assertIsNone(kbdb)
716
        ctx.flushdb.assert_not_called()
717
718
    @patch('ospd_openvas.db.OpenvasDB')
719
    def test_find_kb_database_by_scan_id_none(
720
        self, mock_openvas_db, mock_redis
721
    ):
722
        ctx = mock_redis.return_value
723
724
        new_ctx = 'bar'  # just some object to compare
725
        mock_openvas_db.create_context.return_value = new_ctx
726
        mock_openvas_db.get_single_item.return_value = None
727
728
        maindb = MainDB(ctx)
729
        maindb._max_dbindex = 2  # pylint: disable=protected-access
730
731
        scan_id, kbdb = maindb.find_kb_database_by_scan_id('foo')
732
733
        mock_openvas_db.get_single_item.assert_called_once_with(
734
            new_ctx, 'internal/foo/globalscanid'
735
        )
736
        self.assertIsNone(scan_id)
737
        self.assertIsNone(kbdb)
738
739
    @patch('ospd_openvas.db.OpenvasDB')
740
    def test_find_kb_database_by_scan_id(self, mock_openvas_db, mock_redis):
741
        ctx = mock_redis.return_value
742
743
        new_ctx = 'bar'  # just some object to compare
744
        mock_openvas_db.create_context.return_value = new_ctx
745
        mock_openvas_db.get_single_item.side_effect = [None, 'ipsum']
746
747
        maindb = MainDB(ctx)
748
        maindb._max_dbindex = 3  # pylint: disable=protected-access
749
750
        scan_id, kbdb = maindb.find_kb_database_by_scan_id('foo')
751
752
        mock_openvas_db.get_single_item.assert_called_with(
753
            new_ctx, 'internal/foo/globalscanid'
754
        )
755
        self.assertEqual(scan_id, 'ipsum')
756
        self.assertEqual(kbdb.index, 2)
757
        self.assertIs(kbdb.ctx, new_ctx)
758