Passed
Push — master ( d88eea...1dd774 )
by Juan José
01:47 queued 10s
created

tests.test_db   F

Complexity

Total Complexity 93

Size/Duplication

Total Lines 756
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 469
dl 0
loc 756
rs 2
c 0
b 0
f 0
wmc 93

65 Methods

Rating   Name   Duplication   Size   Complexity  
A TestOpenvasDB.test_get_single_item() 0 8 1
A TestOpenvasDB.test_get_list_item() 0 8 1
A TestOpenvasDB.test_get_list_item_error() 0 8 3
A TestOpenvasDB.test_get_last_list_item() 0 8 1
A TestOpenvasDB.test_get_single_item_error() 0 8 3
A TestOpenvasDB.test_select_database_error() 0 6 3
A TestOpenvasDB.test_get_last_list_item_error() 0 8 3
A TestOpenvasDB.test_select_database() 0 6 1
A TestOpenvasDB.test_create_context_fail() 0 11 3
A TestOpenvasDB.test_remove_list_item() 0 7 1
A TestOpenvasDB.test_remove_list_item_error() 0 11 4
A TestOpenvasDB.test_get_db_connection() 0 20 1
A TestOpenvasDB.test_create_context_success() 0 4 1
A MainDBTestCase.test_try_database_success() 0 10 1
A MainDBTestCase.test_max_database_index() 0 10 1
A MainDBTestCase.test_max_database_index_fail() 0 12 2
A MainDBTestCase.test_try_database_false() 0 10 1
A MainDBTestCase.test_try_db_index_error() 0 8 2
A KbDBTestCase.test_scan_is_stopped_false() 0 8 1
A KbDBTestCase.test_stop_scan() 0 5 1
A MainDBTestCase.test_release_database() 0 11 1
A MainDBTestCase.test_release() 0 8 1
A MainDBTestCase.test_release_database_by_index() 0 9 1
A MainDBTestCase.test_get_new_kb_database() 0 12 1
A MainDBTestCase.test_get_new_kb_database_none() 0 12 1
A KbDBTestCase.test_get_scan_databases() 0 21 2
A KbDBTestCase.test_scan_is_stopped_true() 0 8 1
A KbDBTestCase.test_target_is_finished_false() 0 13 1
A TestOpenvasDB.test_get_key_count() 0 9 1
A TestOpenvasDB.test_get_filenames_and_oids() 0 8 1
A TestOpenvasDB.test_get_pattern_error() 0 8 3
A TestOpenvasDB.test_pop_list_items_no_results() 0 14 1
A TestOpenvasDB.test_get_filenames_and_oids_error() 0 5 2
A TestOpenvasDB.test_pop_list_items_with_results() 0 15 1
A TestOpenvasDB.test_add_single_item_error() 0 11 4
A TestOpenvasDB.test_add_single_list() 0 11 1
A TestOpenvasDB.test_set_single_item_error() 0 11 4
A TestOpenvasDB.test_get_pattern() 0 8 1
A KbDBTestCase.test_get_scan_status() 0 13 1
A TestOpenvasDB.test_get_keys_by_pattern_error() 0 8 3
A KbDBTestCase.test_target_is_finished_true() 0 13 1
A MainDBTestCase.test_find_kb_database_by_scan_id_none() 0 20 1
A KbDBTestCase.test_add_scan_process_id() 0 5 1
A TestOpenvasDB.test_get_key_count_error() 0 3 2
A KbDBTestCase.test_flush() 0 4 1
A MainDBTestCase.test_find_kb_database_by_scan_id() 0 18 1
A KbDBTestCase.setUp() 0 4 1
A TestOpenvasDB.test_get_keys_by_pattern() 0 8 1
A KbDBTestCase.test_remove_scan_database() 0 8 1
A KbDBTestCase.test_add_scan_id() 0 16 1
A KbDBTestCase.test_add_scan_preferences() 0 7 1
A TestOpenvasDB.test_get_key_count_with_default_pattern() 0 9 1
A ScanDBTestCase.test_get_result() 0 15 1
A KbDBTestCase.test_get_status() 0 8 1
A TestOpenvasDB.test_find_database_by_pattern() 0 10 1
A ScanDBTestCase.setUp() 0 4 1
A TestOpenvasDB.test_add_single_item() 0 7 1
A KbDBTestCase.test_get_result() 0 15 1
A TestOpenvasDB.test_set_single_item() 0 12 1
A KbDBTestCase.test_add_credentials_to_scan_preferences() 0 17 1
A KbDBTestCase.test_get_scan_process_id() 0 8 1
A TestOpenvasDB.test_find_database_by_pattern_none() 0 8 1
A ScanDBTestCase.test_get_status() 0 8 1
A ScanDBTestCase.test_select() 0 7 1
A ScanDBTestCase.test_flush() 0 4 1

How to fix   Complexity   

Complexity

Complex classes like tests.test_db often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
# pylint: disable=unused-argument
21
22
""" Unit Test for ospd-openvas """
23
24
import logging
25
26
from unittest import TestCase
27
from unittest.mock import patch, MagicMock
28
29
from redis.exceptions import ConnectionError as RCE
30
31
from ospd.errors import RequiredArgument
32
from ospd_openvas.db import OpenvasDB, MainDB, ScanDB, KbDB, DBINDEX_NAME, time
33
from ospd_openvas.errors import OspdOpenvasError
34
35
from tests.helper import assert_called
36
37
38
@patch('ospd_openvas.db.redis.Redis')
class TestOpenvasDB(TestCase):
    """Tests for the OpenvasDB helpers, run against a mocked redis.Redis.

    The class-level patch passes the mocked Redis class to every test method
    as ``mock_redis``; ``mock_redis.return_value`` stands in for a redis
    connection (context).
    """

    @patch('ospd_openvas.db.Openvas')
    def test_get_db_connection(
        self, mock_openvas: MagicMock, mock_redis: MagicMock
    ):
        """The database address is read from the settings and then cached."""
        # The address is cached on the OpenvasDB class itself. Restore the
        # previous value afterwards so the address set here cannot leak into
        # tests that run later.
        previous = getattr(OpenvasDB, '_db_address', None)
        self.addCleanup(setattr, OpenvasDB, '_db_address', previous)

        OpenvasDB._db_address = None  # pylint: disable=protected-access
        mock_settings = mock_openvas.get_settings.return_value
        mock_settings.get.return_value = None

        self.assertIsNone(OpenvasDB.get_database_address())

        # set the first time
        mock_openvas.get_settings.return_value = {'db_address': '/foo/bar'}

        self.assertEqual(OpenvasDB.get_database_address(), "/foo/bar")

        self.assertEqual(mock_openvas.get_settings.call_count, 2)

        # should cache address
        self.assertEqual(OpenvasDB.get_database_address(), "/foo/bar")
        self.assertEqual(mock_openvas.get_settings.call_count, 2)

    def test_create_context_fail(self, mock_redis):
        """A redis connection error is logged and SystemExit is raised."""
        mock_redis.side_effect = RCE

        # Replace Logger.error and time.sleep only while the call runs, so
        # neither replacement outlives this test.
        with patch.object(logging.Logger, 'error') as mock_error:
            with patch.object(time, 'sleep', return_value=None):
                with self.assertRaises(SystemExit):
                    OpenvasDB.create_context()

        mock_error.assert_called_with(
            'Redis Error: Not possible to connect to the kb.'
        )

    def test_create_context_success(self, mock_redis):
        """create_context returns the object produced by redis.Redis."""
        ctx = mock_redis.return_value
        ret = OpenvasDB.create_context()
        self.assertIs(ret, ctx)

    def test_select_database_error(self, mock_redis):
        """select_database requires both a context and an index."""
        with self.assertRaises(RequiredArgument):
            OpenvasDB.select_database(None, 1)

        with self.assertRaises(RequiredArgument):
            OpenvasDB.select_database(mock_redis, None)

    def test_select_database(self, mock_redis):
        """select_database issues a SELECT command for the given index."""
        mock_redis.execute_command.return_value = mock_redis

        OpenvasDB.select_database(mock_redis, 1)

        mock_redis.execute_command.assert_called_with('SELECT 1')

    def test_get_list_item_error(self, mock_redis):
        """get_list_item requires both a context and a name."""
        ctx = mock_redis.return_value

        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_list_item(None, 'foo')

        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_list_item(ctx, None)

    def test_get_list_item(self, mock_redis):
        """get_list_item returns what lrange delivers."""
        ctx = mock_redis.return_value
        ctx.lrange.return_value = ['1234']

        ret = OpenvasDB.get_list_item(ctx, 'name')

        self.assertEqual(ret, ['1234'])
        assert_called(ctx.lrange)

    def test_get_last_list_item(self, mock_redis):
        """get_last_list_item pops the value from the end of the list."""
        ctx = mock_redis.return_value
        ctx.rpop.return_value = 'foo'

        ret = OpenvasDB.get_last_list_item(ctx, 'name')

        self.assertEqual(ret, 'foo')
        ctx.rpop.assert_called_with('name')

    def test_get_last_list_item_error(self, mock_redis):
        """get_last_list_item requires both a context and a name."""
        ctx = mock_redis.return_value

        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_last_list_item(ctx, None)

        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_last_list_item(None, 'name')

    def test_remove_list_item(self, mock_redis):
        """remove_list_item removes all occurrences (count=0) via lrem."""
        ctx = mock_redis.return_value
        ctx.lrem.return_value = 1

        OpenvasDB.remove_list_item(ctx, 'name', '1234')

        ctx.lrem.assert_called_once_with('name', count=0, value='1234')

    def test_remove_list_item_error(self, mock_redis):
        """remove_list_item requires a context, a key and a value."""
        ctx = mock_redis.return_value

        with self.assertRaises(RequiredArgument):
            OpenvasDB.remove_list_item(None, '1', 'bar')

        with self.assertRaises(RequiredArgument):
            OpenvasDB.remove_list_item(ctx, None, 'bar')

        with self.assertRaises(RequiredArgument):
            OpenvasDB.remove_list_item(ctx, '1', None)

    def test_get_single_item_error(self, mock_redis):
        """get_single_item requires both a context and a name."""
        ctx = mock_redis.return_value

        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_single_item(None, 'foo')

        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_single_item(ctx, None)

    def test_get_single_item(self, mock_redis):
        """get_single_item reads index 0 of the list by default."""
        ctx = mock_redis.return_value
        ctx.lindex.return_value = 'a'

        value = OpenvasDB.get_single_item(ctx, 'a')

        self.assertEqual(value, 'a')
        ctx.lindex.assert_called_once_with('a', 0)

    def test_add_single_list(self, mock_redis):
        """add_single_list deletes the key and pushes every value as given."""
        ctx = mock_redis.return_value
        pipeline = ctx.pipeline.return_value
        pipeline.delete.return_value = None
        pipeline.execute.return_value = (None, 0)

        OpenvasDB.add_single_list(ctx, 'a', ['12', '11', '12'])

        pipeline.delete.assert_called_once_with('a')
        # Duplicates are kept here, unlike in add_single_item.
        pipeline.rpush.assert_called_once_with('a', '12', '11', '12')
        assert_called(pipeline.execute)

    def test_add_single_item(self, mock_redis):
        """add_single_item pushes a duplicated value only once."""
        ctx = mock_redis.return_value
        ctx.rpush.return_value = 1

        OpenvasDB.add_single_item(ctx, 'a', ['12', '12'])

        ctx.rpush.assert_called_once_with('a', '12')

    def test_add_single_item_error(self, mock_redis):
        """add_single_item requires a context, a name and values."""
        ctx = mock_redis.return_value

        with self.assertRaises(RequiredArgument):
            OpenvasDB.add_single_item(None, '1', ['12'])

        with self.assertRaises(RequiredArgument):
            OpenvasDB.add_single_item(ctx, None, ['12'])

        with self.assertRaises(RequiredArgument):
            OpenvasDB.add_single_item(ctx, '1', None)

    def test_set_single_item_error(self, mock_redis):
        """set_single_item requires a context, a name and a value."""
        ctx = mock_redis.return_value

        with self.assertRaises(RequiredArgument):
            OpenvasDB.set_single_item(None, '1', ['12'])

        with self.assertRaises(RequiredArgument):
            OpenvasDB.set_single_item(ctx, None, ['12'])

        with self.assertRaises(RequiredArgument):
            OpenvasDB.set_single_item(ctx, '1', None)

    def test_pop_list_items_no_results(self, mock_redis):
        """pop_list_items returns an empty list when the key holds nothing."""
        ctx = mock_redis.return_value
        pipeline = ctx.pipeline.return_value
        pipeline.lrange.return_value = None
        pipeline.delete.return_value = None
        pipeline.execute.return_value = (None, 0)

        ret = OpenvasDB.pop_list_items(ctx, 'foo')

        self.assertEqual(ret, [])

        pipeline.lrange.assert_called_once_with('foo', 0, -1)
        pipeline.delete.assert_called_once_with('foo')
        assert_called(pipeline.execute)

    def test_pop_list_items_with_results(self, mock_redis):
        """pop_list_items returns the stored items in reversed order."""
        ctx = mock_redis.return_value
        pipeline = ctx.pipeline.return_value
        pipeline.lrange.return_value = None
        pipeline.delete.return_value = None
        pipeline.execute.return_value = [['c', 'b', 'a'], 2]

        ret = OpenvasDB.pop_list_items(ctx, 'results')

        # reversed list
        self.assertEqual(ret, ['a', 'b', 'c'])

        pipeline.lrange.assert_called_once_with('results', 0, -1)
        pipeline.delete.assert_called_once_with('results')
        assert_called(pipeline.execute)

    def test_set_single_item(self, mock_redis):
        """set_single_item deletes the key before pushing the new value."""
        ctx = mock_redis.return_value
        pipeline = ctx.pipeline.return_value
        pipeline.delete.return_value = None
        pipeline.rpush.return_value = None
        pipeline.execute.return_value = None

        OpenvasDB.set_single_item(ctx, 'foo', ['bar'])

        pipeline.delete.assert_called_once_with('foo')
        pipeline.rpush.assert_called_once_with('foo', 'bar')
        assert_called(pipeline.execute)

    def test_get_pattern(self, mock_redis):
        """get_pattern pairs every matching key with its list content."""
        ctx = mock_redis.return_value
        ctx.keys.return_value = ['a', 'b']
        ctx.lrange.return_value = [1, 2, 3]

        ret = OpenvasDB.get_pattern(ctx, 'a')

        self.assertEqual(ret, [['a', [1, 2, 3]], ['b', [1, 2, 3]]])

    def test_get_pattern_error(self, mock_redis):
        """get_pattern requires both a context and a pattern."""
        ctx = mock_redis.return_value

        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_pattern(None, 'a')

        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_pattern(ctx, None)

    def test_get_filenames_and_oids_error(self, mock_redis):
        """get_filenames_and_oids requires a context."""
        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_filenames_and_oids(None)

    def test_get_filenames_and_oids(self, mock_redis):
        """Each 'nvt:<oid>' key yields its looked-up value and the oid."""
        ctx = mock_redis.return_value
        ctx.keys.return_value = ['nvt:1', 'nvt:2']
        ctx.lindex.side_effect = ['aa', 'ab']

        ret = OpenvasDB.get_filenames_and_oids(ctx)

        self.assertEqual(list(ret), [('aa', '1'), ('ab', '2')])

    def test_get_keys_by_pattern_error(self, mock_redis):
        """get_keys_by_pattern requires both a context and a pattern."""
        ctx = mock_redis.return_value

        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_keys_by_pattern(None, 'a')

        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_keys_by_pattern(ctx, None)

    def test_get_keys_by_pattern(self, mock_redis):
        """get_keys_by_pattern returns the matching keys sorted."""
        ctx = mock_redis.return_value
        ctx.keys.return_value = ['nvt:2', 'nvt:1']

        ret = OpenvasDB.get_keys_by_pattern(ctx, 'nvt:*')

        # Return sorted list
        self.assertEqual(ret, ['nvt:1', 'nvt:2'])

    def test_get_key_count(self, mock_redis):
        """get_key_count counts the keys matching the given pattern."""
        ctx = mock_redis.return_value

        ctx.keys.return_value = ['aa', 'ab']

        ret = OpenvasDB.get_key_count(ctx, "foo")

        self.assertEqual(ret, 2)
        ctx.keys.assert_called_with('foo')

    def test_get_key_count_with_default_pattern(self, mock_redis):
        """get_key_count falls back to the '*' pattern when none is given."""
        ctx = mock_redis.return_value

        ctx.keys.return_value = ['aa', 'ab']

        ret = OpenvasDB.get_key_count(ctx)

        self.assertEqual(ret, 2)
        ctx.keys.assert_called_with('*')

    def test_get_key_count_error(self, mock_redis):
        """get_key_count requires a context."""
        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_key_count(None)

    def test_find_database_by_pattern_none(self, mock_redis):
        """No context and no index are returned when nothing matches."""
        ctx = mock_redis.return_value
        ctx.keys.return_value = None

        new_ctx, index = OpenvasDB.find_database_by_pattern('foo*', 123)

        self.assertIsNone(new_ctx)
        self.assertIsNone(index)

    def test_find_database_by_pattern(self, mock_redis):
        """The context and index of the first matching database are returned."""
        ctx = mock_redis.return_value

        # keys is called twice per iteration
        ctx.keys.side_effect = [None, None, None, None, True, True]

        new_ctx, index = OpenvasDB.find_database_by_pattern('foo*', 123)

        self.assertEqual(new_ctx, ctx)
        self.assertEqual(index, 2)
349
350
351
@patch('ospd_openvas.db.OpenvasDB')
class ScanDBTestCase(TestCase):
    """Tests for ScanDB with the OpenvasDB helper class mocked out.

    Every test method receives the mocked OpenvasDB as ``mock_openvas_db``.
    """

    @patch('ospd_openvas.db.redis.Redis')
    def setUp(self, mock_redis):  # pylint: disable=arguments-differ
        """Create a ScanDB with index 10 on top of a mocked redis context."""
        self.ctx = mock_redis.return_value
        self.db = ScanDB(10, self.ctx)

    def test_get_result(self, mock_openvas_db):
        """get_result pops the items stored under 'internal/results'."""
        expected = ['some result']
        # Hand out a copy so the comparison below uses an independent list.
        mock_openvas_db.pop_list_items.return_value = list(expected)

        results = self.db.get_result()

        self.assertEqual(results, expected)
        mock_openvas_db.pop_list_items.assert_called_with(
            self.ctx, 'internal/results'
        )

    def test_get_status(self, mock_openvas_db):
        """get_status reads the single item stored under 'internal/<id>'."""
        mock_openvas_db.get_single_item.return_value = 'some status'

        status = self.db.get_status('foo')

        self.assertEqual(status, 'some status')
        mock_openvas_db.get_single_item.assert_called_with(
            self.ctx, 'internal/foo'
        )

    def test_select(self, mock_openvas_db):
        """select switches the index and returns the ScanDB itself."""
        new_index = 11

        selected = self.db.select(new_index)

        self.assertIs(selected, self.db)
        self.assertEqual(self.db.index, new_index)

        mock_openvas_db.select_database.assert_called_with(
            self.ctx, new_index
        )

    def test_flush(self, mock_openvas_db):
        """flush empties the database through the context's flushdb."""
        self.db.flush()

        self.ctx.flushdb.assert_called_with()
397
398
399
@patch('ospd_openvas.db.OpenvasDB')
class KbDBTestCase(TestCase):
    """Tests for KbDB with the OpenvasDB helper class mocked out.

    Every test method receives the mocked OpenvasDB as ``mock_openvas_db``.
    """

    @patch('ospd_openvas.db.redis.Redis')
    def setUp(self, mock_redis):  # pylint: disable=arguments-differ
        """Create a KbDB with index 10 on top of a mocked redis context."""
        self.ctx = mock_redis.return_value
        self.db = KbDB(10, self.ctx)

    def test_get_result(self, mock_openvas_db):
        """get_result pops the items stored under 'internal/results'."""
        mock_openvas_db.pop_list_items.return_value = [
            'some results',
        ]

        ret = self.db.get_result()

        self.assertEqual(
            ret,
            [
                'some results',
            ],
        )
        mock_openvas_db.pop_list_items.assert_called_with(
            self.ctx, 'internal/results'
        )

    def test_get_status(self, mock_openvas_db):
        """get_status reads the single item stored under 'internal/<id>'."""
        mock_openvas_db.get_single_item.return_value = 'some status'

        ret = self.db.get_status('foo')

        self.assertEqual(ret, 'some status')
        mock_openvas_db.get_single_item.assert_called_with(
            self.ctx, 'internal/foo'
        )

    def test_get_scan_status(self, mock_openvas_db):
        """get_scan_status pops the items stored under 'internal/status'."""
        status = [
            '192.168.0.1/10/120',
            '192.168.0.2/35/120',
        ]

        mock_openvas_db.pop_list_items.return_value = status

        ret = self.db.get_scan_status()

        self.assertEqual(ret, status)
        mock_openvas_db.pop_list_items.assert_called_with(
            self.ctx, 'internal/status'
        )

    def test_flush(self, mock_openvas_db):
        """flush empties the database through the context's flushdb."""
        self.db.flush()

        self.ctx.flushdb.assert_called_with()

    def test_add_scan_id(self, mock_openvas_db):
        """add_scan_id stores the 'new' state first, then the scan id."""
        self.db.add_scan_id('bar')

        calls = mock_openvas_db.add_single_item.call_args_list

        # call[0] holds the positional arguments: (ctx, name, values)
        call = calls[0]
        args = call[0]

        self.assertEqual(args[1], 'internal/bar')
        self.assertEqual(args[2], ['new'])

        call = calls[1]
        args = call[0]

        self.assertEqual(args[1], 'internal/scanid')
        self.assertEqual(args[2], ['bar'])

    def test_add_scan_preferences(self, mock_openvas_db):
        """Preferences are stored under 'internal/<scan_id>/scanprefs'."""
        prefs = ['foo', 'bar']

        self.db.add_scan_preferences('foo', prefs)

        mock_openvas_db.add_single_item.assert_called_with(
            self.ctx, 'internal/foo/scanprefs', prefs
        )

    def test_add_credentials_to_scan_preferences(self, mock_openvas_db):
        """Credentials are written through a newly created utf-8 context."""
        prefs = ['foo', 'bar']

        # Any distinct object works as the new context; the class-level
        # patch already mocks OpenvasDB, so no further patching is needed.
        ctx = MagicMock()
        mock_openvas_db.create_context.return_value = ctx

        self.db.add_credentials_to_scan_preferences('scan_id', prefs)

        mock_openvas_db.create_context.assert_called_with(
            self.db.index, encoding='utf-8'
        )

        mock_openvas_db.add_single_item.assert_called_with(
            ctx, 'internal/scan_id/scanprefs', prefs
        )

    def test_add_scan_process_id(self, mock_openvas_db):
        """The process id is stored under 'internal/ovas_pid'."""
        self.db.add_scan_process_id(123)

        mock_openvas_db.add_single_item.assert_called_with(
            self.ctx, 'internal/ovas_pid', [123]
        )

    def test_get_scan_process_id(self, mock_openvas_db):
        """The process id is read back from 'internal/ovas_pid'."""
        mock_openvas_db.get_single_item.return_value = '123'

        ret = self.db.get_scan_process_id()

        self.assertEqual(ret, '123')
        mock_openvas_db.get_single_item.assert_called_with(
            self.ctx, 'internal/ovas_pid'
        )

    def test_remove_scan_database(self, mock_openvas_db):
        """The scan database's index is removed from 'internal/dbindex'."""
        scan_db = MagicMock(spec=ScanDB)
        scan_db.index = 123

        self.db.remove_scan_database(scan_db)

        mock_openvas_db.remove_list_item.assert_called_with(
            self.ctx, 'internal/dbindex', 123
        )

    def test_target_is_finished_false(self, mock_openvas_db):
        """A scan whose stored state is 'new' is not finished."""
        mock_openvas_db.get_single_item.side_effect = ['new']

        ret = self.db.target_is_finished('bar')

        self.assertFalse(ret)

        calls = mock_openvas_db.get_single_item.call_args_list

        call = calls[0]
        args = call[0]

        self.assertEqual(args[1], 'internal/bar')

    def test_target_is_finished_true(self, mock_openvas_db):
        """A scan whose stored state is 'finished' is finished."""
        mock_openvas_db.get_single_item.side_effect = ['finished']

        ret = self.db.target_is_finished('bar')

        self.assertTrue(ret)

        calls = mock_openvas_db.get_single_item.call_args_list

        call = calls[0]
        args = call[0]

        self.assertEqual(args[1], 'internal/bar')

    def test_stop_scan(self, mock_openvas_db):
        """stop_scan sets the scan's state to 'stop_all'."""
        self.db.stop_scan('foo')

        mock_openvas_db.set_single_item.assert_called_with(
            self.ctx, 'internal/foo', ['stop_all']
        )

    def test_scan_is_stopped_false(self, mock_openvas_db):
        """A scan whose stored state is 'new' is not stopped."""
        mock_openvas_db.get_single_item.return_value = 'new'

        ret = self.db.scan_is_stopped('foo')

        self.assertFalse(ret)
        mock_openvas_db.get_single_item.assert_called_with(
            self.ctx, 'internal/foo'
        )

    def test_scan_is_stopped_true(self, mock_openvas_db):
        """A scan whose stored state is 'stop_all' is stopped."""
        mock_openvas_db.get_single_item.return_value = 'stop_all'

        ret = self.db.scan_is_stopped('foo')

        self.assertTrue(ret)
        mock_openvas_db.get_single_item.assert_called_with(
            self.ctx, 'internal/foo'
        )

    def test_get_scan_databases(self, mock_openvas_db):
        """Scan databases are yielded for every index except the KB's own."""
        mock_openvas_db.get_list_item.return_value = [
            '4',
            self.db.index,
            '7',
            '11',
        ]

        scan_dbs = self.db.get_scan_databases()

        scan_db = next(scan_dbs)
        self.assertEqual(scan_db.index, '4')

        scan_db = next(scan_dbs)
        self.assertEqual(scan_db.index, '7')

        scan_db = next(scan_dbs)
        self.assertEqual(scan_db.index, '11')

        with self.assertRaises(StopIteration):
            next(scan_dbs)
601
602
603
@patch('ospd_openvas.db.redis.Redis')
class MainDBTestCase(TestCase):
    """Tests for MainDB, run against a mocked redis.Redis.

    Every test method receives the mocked Redis class as ``mock_redis``;
    its ``return_value`` is used as the redis context given to MainDB.
    """

    def test_max_database_index_fail(self, mock_redis):
        """A config without a 'databases' entry raises OspdOpenvasError."""
        redis_ctx = mock_redis.return_value
        redis_ctx.config_get.return_value = {}

        main_db = MainDB(redis_ctx)

        with self.assertRaises(OspdOpenvasError):
            # Attribute access alone triggers the config lookup.
            _ = main_db.max_database_index

        redis_ctx.config_get.assert_called_with('databases')

    def test_max_database_index(self, mock_redis):
        """The 'databases' config value is returned as an integer."""
        redis_ctx = mock_redis.return_value
        redis_ctx.config_get.return_value = {'databases': '123'}

        main_db = MainDB(redis_ctx)

        highest_index = main_db.max_database_index

        self.assertEqual(highest_index, 123)
        redis_ctx.config_get.assert_called_with('databases')

    def test_try_database_success(self, mock_redis):
        """try_database is True when hsetnx reports the field as set."""
        redis_ctx = mock_redis.return_value
        redis_ctx.hsetnx.return_value = 1

        main_db = MainDB(redis_ctx)

        claimed = main_db.try_database(1)

        self.assertEqual(claimed, True)
        redis_ctx.hsetnx.assert_called_with(DBINDEX_NAME, 1, 1)

    def test_try_database_false(self, mock_redis):
        """try_database is False when hsetnx reports the field as taken."""
        redis_ctx = mock_redis.return_value
        redis_ctx.hsetnx.return_value = 0

        main_db = MainDB(redis_ctx)

        claimed = main_db.try_database(1)

        self.assertEqual(claimed, False)
        redis_ctx.hsetnx.assert_called_with(DBINDEX_NAME, 1, 1)

    def test_try_db_index_error(self, mock_redis):
        """An exception raised by hsetnx surfaces as OspdOpenvasError."""
        redis_ctx = mock_redis.return_value
        redis_ctx.hsetnx.side_effect = Exception

        main_db = MainDB(redis_ctx)

        with self.assertRaises(OspdOpenvasError):
            main_db.try_database(1)

    def test_release_database_by_index(self, mock_redis):
        """Releasing by index deletes that index from the DBINDEX_NAME hash."""
        redis_ctx = mock_redis.return_value
        redis_ctx.hdel.return_value = 1

        main_db = MainDB(redis_ctx)

        main_db.release_database_by_index(3)

        redis_ctx.hdel.assert_called_once_with(DBINDEX_NAME, 3)

    def test_release_database(self, mock_redis):
        """Releasing a database frees its index and flushes the database."""
        redis_ctx = mock_redis.return_value
        redis_ctx.hdel.return_value = 1

        other_db = MagicMock()
        other_db.index = 3
        main_db = MainDB(redis_ctx)
        main_db.release_database(other_db)

        redis_ctx.hdel.assert_called_once_with(DBINDEX_NAME, 3)
        other_db.flush.assert_called_with()

    def test_release(self, mock_redis):
        """release frees the MainDB's own index and flushes its context."""
        redis_ctx = mock_redis.return_value

        main_db = MainDB(redis_ctx)
        main_db.release()

        redis_ctx.hdel.assert_called_with(DBINDEX_NAME, main_db.index)
        redis_ctx.flushdb.assert_called_with()

    def test_get_new_kb_database(self, mock_redis):
        """The first index that can be claimed becomes the new KB database."""
        redis_ctx = mock_redis.return_value

        main_db = MainDB(redis_ctx)
        main_db._max_dbindex = 123  # pylint: disable=protected-access

        # Two claim attempts fail before the third one succeeds.
        redis_ctx.hsetnx.side_effect = [0, 0, 1]

        new_kb = main_db.get_new_kb_database()

        self.assertEqual(new_kb.index, 3)
        redis_ctx.flushdb.assert_called_once_with()

    def test_get_new_kb_database_none(self, mock_redis):
        """None is returned when no database index can be claimed."""
        redis_ctx = mock_redis.return_value

        main_db = MainDB(redis_ctx)
        main_db._max_dbindex = 3  # pylint: disable=protected-access

        redis_ctx.hsetnx.side_effect = [0, 0, 0]

        new_kb = main_db.get_new_kb_database()

        self.assertIsNone(new_kb)
        redis_ctx.flushdb.assert_not_called()

    @patch('ospd_openvas.db.OpenvasDB')
    def test_find_kb_database_by_scan_id_none(
        self, mock_openvas_db, mock_redis
    ):
        """None is returned when no database holds the scan id key."""
        redis_ctx = mock_redis.return_value

        candidate_ctx = 'bar'  # just some object to compare
        mock_openvas_db.create_context.return_value = candidate_ctx
        mock_openvas_db.get_key_count.return_value = None

        main_db = MainDB(redis_ctx)
        main_db._max_dbindex = 2  # pylint: disable=protected-access

        found = main_db.find_kb_database_by_scan_id('foo')

        mock_openvas_db.get_key_count.assert_called_once_with(
            candidate_ctx, 'internal/foo'
        )

        self.assertIsNone(found)

    @patch('ospd_openvas.db.OpenvasDB')
    def test_find_kb_database_by_scan_id(self, mock_openvas_db, mock_redis):
        """The database whose key count is non-zero is returned."""
        redis_ctx = mock_redis.return_value

        candidate_ctx = 'foo'  # just some object to compare
        mock_openvas_db.create_context.return_value = candidate_ctx
        # The first database checked has no such key, the second one has.
        mock_openvas_db.get_key_count.side_effect = [0, 1]

        main_db = MainDB(redis_ctx)
        main_db._max_dbindex = 3  # pylint: disable=protected-access

        found = main_db.find_kb_database_by_scan_id('foo')

        mock_openvas_db.get_key_count.assert_called_with(
            candidate_ctx, 'internal/foo'
        )
        self.assertEqual(found.index, 2)
        self.assertIs(found.ctx, candidate_ctx)
756