KbDBTestCase.test_scan_is_stopped_true()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 8
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 6
nop 2
dl 0
loc 8
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
# pylint: disable=unused-argument
21
22
""" Unit Test for ospd-openvas """
23
24
import logging
25
26
from unittest import TestCase
27
from unittest.mock import patch, MagicMock
28
29
from redis.exceptions import ConnectionError as RCE
30
31
from ospd.errors import RequiredArgument
32
from ospd_openvas.db import OpenvasDB, MainDB, ScanDB, KbDB, DBINDEX_NAME, time
33
from ospd_openvas.errors import OspdOpenvasError
34
35
from tests.helper import assert_called
36
37
38
@patch('ospd_openvas.db.redis.Redis')
class TestOpenvasDB(TestCase):
    """Tests for the OpenvasDB helper functions with redis.Redis patched.

    The class-level patch passes the Redis mock to every test method as
    its last positional argument (``mock_redis``).
    """

    @patch('ospd_openvas.db.Openvas')
    def test_get_db_connection(
        self, mock_openvas: MagicMock, mock_redis: MagicMock
    ):
        """The db address is read from the settings once and then cached."""
        # pylint: disable=protected-access
        # Put the previously cached address back after the test so the
        # '/foo/bar' value set below does not leak into other tests.
        self.addCleanup(
            setattr,
            OpenvasDB,
            '_db_address',
            getattr(OpenvasDB, '_db_address', None),
        )
        OpenvasDB._db_address = None
        mock_settings = mock_openvas.get_settings.return_value
        mock_settings.get.return_value = None

        self.assertIsNone(OpenvasDB.get_database_address())

        # set the first time
        mock_openvas.get_settings.return_value = {'db_address': '/foo/bar'}

        self.assertEqual(OpenvasDB.get_database_address(), "/foo/bar")

        self.assertEqual(mock_openvas.get_settings.call_count, 2)

        # should cache address
        self.assertEqual(OpenvasDB.get_database_address(), "/foo/bar")
        self.assertEqual(mock_openvas.get_settings.call_count, 2)

    def test_create_context_fail(self, mock_redis):
        """A Redis connection error is logged and ends in SystemExit."""
        mock_redis.side_effect = RCE

        # Logger.error is replaced only inside this block (instead of
        # being overwritten globally) and time.sleep is stubbed out.
        with patch.object(logging.Logger, 'error') as mock_error:
            with patch.object(time, 'sleep', return_value=None):
                with self.assertRaises(SystemExit):
                    OpenvasDB.create_context()

        mock_error.assert_called_with(
            'Redis Error: Not possible to connect to the kb.'
        )

    def test_create_context_success(self, mock_redis):
        """create_context returns the object built by redis.Redis."""
        ctx = mock_redis.return_value
        ret = OpenvasDB.create_context()
        self.assertIs(ret, ctx)

    def test_select_database_error(self, mock_redis):
        """select_database requires both a context and an index."""
        with self.assertRaises(RequiredArgument):
            OpenvasDB.select_database(None, 1)

        with self.assertRaises(RequiredArgument):
            OpenvasDB.select_database(mock_redis, None)

    def test_select_database(self, mock_redis):
        """select_database issues a 'SELECT <index>' command."""
        mock_redis.execute_command.return_value = mock_redis

        OpenvasDB.select_database(mock_redis, 1)

        mock_redis.execute_command.assert_called_with('SELECT 1')

    def test_get_list_item_error(self, mock_redis):
        """get_list_item requires both a context and a name."""
        ctx = mock_redis.return_value

        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_list_item(None, 'foo')

        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_list_item(ctx, None)

    def test_get_list_item(self, mock_redis):
        """get_list_item returns what lrange delivers."""
        ctx = mock_redis.return_value
        ctx.lrange.return_value = ['1234']

        ret = OpenvasDB.get_list_item(ctx, 'name')

        self.assertEqual(ret, ['1234'])
        assert_called(ctx.lrange)

    def test_get_last_list_item(self, mock_redis):
        """get_last_list_item pops the value with rpop."""
        ctx = mock_redis.return_value
        ctx.rpop.return_value = 'foo'

        ret = OpenvasDB.get_last_list_item(ctx, 'name')

        self.assertEqual(ret, 'foo')
        ctx.rpop.assert_called_with('name')

    def test_get_last_list_item_error(self, mock_redis):
        """get_last_list_item requires both a context and a name."""
        ctx = mock_redis.return_value

        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_last_list_item(ctx, None)

        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_last_list_item(None, 'name')

    def test_remove_list_item(self, mock_redis):
        """remove_list_item calls lrem with count=0 for the given value."""
        ctx = mock_redis.return_value
        ctx.lrem.return_value = 1

        OpenvasDB.remove_list_item(ctx, 'name', '1234')

        ctx.lrem.assert_called_once_with('name', count=0, value='1234')

    def test_remove_list_item_error(self, mock_redis):
        """remove_list_item requires context, key and value."""
        ctx = mock_redis.return_value

        with self.assertRaises(RequiredArgument):
            OpenvasDB.remove_list_item(None, '1', 'bar')

        with self.assertRaises(RequiredArgument):
            OpenvasDB.remove_list_item(ctx, None, 'bar')

        with self.assertRaises(RequiredArgument):
            OpenvasDB.remove_list_item(ctx, '1', None)

    def test_get_single_item_error(self, mock_redis):
        """get_single_item requires both a context and a name."""
        ctx = mock_redis.return_value

        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_single_item(None, 'foo')

        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_single_item(ctx, None)

    def test_get_single_item(self, mock_redis):
        """get_single_item reads index 0 of the list via lindex."""
        ctx = mock_redis.return_value
        ctx.lindex.return_value = 'a'

        value = OpenvasDB.get_single_item(ctx, 'a')

        self.assertEqual(value, 'a')
        ctx.lindex.assert_called_once_with('a', 0)

    def test_add_single_list(self, mock_redis):
        """add_single_list deletes the key and pushes all values as given."""
        ctx = mock_redis.return_value
        pipeline = ctx.pipeline.return_value
        pipeline.delete.return_value = None
        pipeline.execute.return_value = (None, 0)

        OpenvasDB.add_single_list(ctx, 'a', ['12', '11', '12'])

        pipeline.delete.assert_called_once_with('a')
        pipeline.rpush.assert_called_once_with('a', '12', '11', '12')
        assert_called(pipeline.execute)

    def test_add_single_item(self, mock_redis):
        """add_single_item pushes a duplicated value only once."""
        ctx = mock_redis.return_value
        ctx.rpush.return_value = 1

        OpenvasDB.add_single_item(ctx, 'a', ['12', '12'])

        ctx.rpush.assert_called_once_with('a', '12')

    def test_add_single_item_error(self, mock_redis):
        """add_single_item requires context, name and values."""
        ctx = mock_redis.return_value

        with self.assertRaises(RequiredArgument):
            OpenvasDB.add_single_item(None, '1', ['12'])

        with self.assertRaises(RequiredArgument):
            OpenvasDB.add_single_item(ctx, None, ['12'])

        with self.assertRaises(RequiredArgument):
            OpenvasDB.add_single_item(ctx, '1', None)

    def test_set_single_item_error(self, mock_redis):
        """set_single_item requires context, name and values."""
        ctx = mock_redis.return_value

        with self.assertRaises(RequiredArgument):
            OpenvasDB.set_single_item(None, '1', ['12'])

        with self.assertRaises(RequiredArgument):
            OpenvasDB.set_single_item(ctx, None, ['12'])

        with self.assertRaises(RequiredArgument):
            OpenvasDB.set_single_item(ctx, '1', None)

    def test_pop_list_items_no_results(self, mock_redis):
        """pop_list_items returns an empty list when the pipeline has none."""
        ctx = mock_redis.return_value
        pipeline = ctx.pipeline.return_value
        pipeline.lrange.return_value = None
        pipeline.delete.return_value = None
        pipeline.execute.return_value = (None, 0)

        ret = OpenvasDB.pop_list_items(ctx, 'foo')

        self.assertEqual(ret, [])

        pipeline.lrange.assert_called_once_with('foo', 0, -1)
        pipeline.delete.assert_called_once_with('foo')
        assert_called(pipeline.execute)

    def test_pop_list_items_with_results(self, mock_redis):
        """pop_list_items returns the stored items in reversed order."""
        ctx = mock_redis.return_value
        pipeline = ctx.pipeline.return_value
        pipeline.lrange.return_value = None
        pipeline.delete.return_value = None
        pipeline.execute.return_value = [['c', 'b', 'a'], 2]

        ret = OpenvasDB.pop_list_items(ctx, 'results')

        # reversed list
        self.assertEqual(ret, ['a', 'b', 'c'])

        pipeline.lrange.assert_called_once_with('results', 0, -1)
        pipeline.delete.assert_called_once_with('results')
        assert_called(pipeline.execute)

    def test_set_single_item(self, mock_redis):
        """set_single_item deletes the key and pushes the new value."""
        ctx = mock_redis.return_value
        pipeline = ctx.pipeline.return_value
        pipeline.delete.return_value = None
        pipeline.rpush.return_value = None
        pipeline.execute.return_value = None

        OpenvasDB.set_single_item(ctx, 'foo', ['bar'])

        pipeline.delete.assert_called_once_with('foo')
        pipeline.rpush.assert_called_once_with('foo', 'bar')
        assert_called(pipeline.execute)

    def test_get_pattern(self, mock_redis):
        """get_pattern pairs every matching key with its list content."""
        ctx = mock_redis.return_value
        ctx.keys.return_value = ['a', 'b']
        ctx.lrange.return_value = [1, 2, 3]

        ret = OpenvasDB.get_pattern(ctx, 'a')

        self.assertEqual(ret, [['a', [1, 2, 3]], ['b', [1, 2, 3]]])

    def test_get_pattern_error(self, mock_redis):
        """get_pattern requires both a context and a pattern."""
        ctx = mock_redis.return_value

        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_pattern(None, 'a')

        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_pattern(ctx, None)

    def test_get_filenames_and_oids_error(self, mock_redis):
        """get_filenames_and_oids requires a context."""
        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_filenames_and_oids(None)

    def test_get_filenames_and_oids(self, mock_redis):
        """get_filenames_and_oids yields (filename, oid) pairs."""
        ctx = mock_redis.return_value
        ctx.keys.return_value = ['nvt:1', 'nvt:2']
        ctx.lindex.side_effect = ['aa', 'ab']

        ret = OpenvasDB.get_filenames_and_oids(ctx)

        self.assertEqual(list(ret), [('aa', '1'), ('ab', '2')])

    def test_get_keys_by_pattern_error(self, mock_redis):
        """get_keys_by_pattern requires both a context and a pattern."""
        ctx = mock_redis.return_value

        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_keys_by_pattern(None, 'a')

        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_keys_by_pattern(ctx, None)

    def test_get_keys_by_pattern(self, mock_redis):
        """get_keys_by_pattern returns the matching keys sorted."""
        ctx = mock_redis.return_value
        ctx.keys.return_value = ['nvt:2', 'nvt:1']

        ret = OpenvasDB.get_keys_by_pattern(ctx, 'nvt:*')

        # Return sorted list
        self.assertEqual(ret, ['nvt:1', 'nvt:2'])

    def test_get_key_count(self, mock_redis):
        """get_key_count counts the keys matching the given pattern."""
        ctx = mock_redis.return_value

        ctx.keys.return_value = ['aa', 'ab']

        ret = OpenvasDB.get_key_count(ctx, "foo")

        self.assertEqual(ret, 2)
        ctx.keys.assert_called_with('foo')

    def test_get_key_count_with_default_pattern(self, mock_redis):
        """Without a pattern get_key_count queries the keys with '*'."""
        ctx = mock_redis.return_value

        ctx.keys.return_value = ['aa', 'ab']

        ret = OpenvasDB.get_key_count(ctx)

        self.assertEqual(ret, 2)
        ctx.keys.assert_called_with('*')

    def test_get_key_count_error(self, mock_redis):
        """get_key_count requires a context."""
        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_key_count(None)

    def test_find_database_by_pattern_none(self, mock_redis):
        """(None, None) is returned when no database matches the pattern."""
        ctx = mock_redis.return_value
        ctx.keys.return_value = None

        new_ctx, index = OpenvasDB.find_database_by_pattern('foo*', 123)

        self.assertIsNone(new_ctx)
        self.assertIsNone(index)

    def test_find_database_by_pattern(self, mock_redis):
        """The context and index of the first matching database are returned."""
        ctx = mock_redis.return_value

        # keys is called twice per iteration
        ctx.keys.side_effect = [None, None, None, None, True, True]

        new_ctx, index = OpenvasDB.find_database_by_pattern('foo*', 123)

        self.assertEqual(new_ctx, ctx)
        self.assertEqual(index, 2)
347
348
349
@patch('ospd_openvas.db.OpenvasDB')
class ScanDBTestCase(TestCase):
    """Tests for ScanDB with the OpenvasDB helper class patched out."""

    @patch('ospd_openvas.db.redis.Redis')
    def setUp(self, mock_redis):  # pylint: disable=arguments-differ
        """Build a ScanDB with index 10 on top of a mocked Redis context."""
        redis_ctx = mock_redis.return_value
        self.ctx = redis_ctx
        self.db = ScanDB(10, redis_ctx)

    def test_get_result(self, mock_openvas_db):
        """get_result pops the items stored under 'internal/results'."""
        pop_items = mock_openvas_db.pop_list_items
        pop_items.return_value = ['some result']

        results = self.db.get_result()

        self.assertEqual(results, ['some result'])
        pop_items.assert_called_with(self.ctx, 'internal/results')

    def test_get_status(self, mock_openvas_db):
        """get_status reads the single item stored under 'internal/<id>'."""
        get_item = mock_openvas_db.get_single_item
        get_item.return_value = 'some status'

        status = self.db.get_status('foo')

        self.assertEqual(status, 'some status')
        get_item.assert_called_with(self.ctx, 'internal/foo')

    def test_select(self, mock_openvas_db):
        """select switches the index and returns the ScanDB itself."""
        selected = self.db.select(11)

        self.assertIs(selected, self.db)
        self.assertEqual(self.db.index, 11)

        mock_openvas_db.select_database.assert_called_with(self.ctx, 11)

    def test_flush(self, mock_openvas_db):
        """flush calls flushdb on the underlying context."""
        self.db.flush()

        self.ctx.flushdb.assert_called_with()
395
396
397
@patch('ospd_openvas.db.OpenvasDB')
class KbDBTestCase(TestCase):
    """Tests for KbDB with the OpenvasDB helper class patched out."""

    @patch('ospd_openvas.db.redis.Redis')
    def setUp(self, mock_redis):  # pylint: disable=arguments-differ
        """Build a KbDB with index 10 on top of a mocked Redis context."""
        redis_ctx = mock_redis.return_value
        self.ctx = redis_ctx
        self.db = KbDB(10, redis_ctx)

    def test_get_result(self, mock_openvas_db):
        """get_result pops the items stored under 'internal/results'."""
        pop_items = mock_openvas_db.pop_list_items
        pop_items.return_value = ['some results']

        results = self.db.get_result()

        self.assertEqual(results, ['some results'])
        pop_items.assert_called_with(self.ctx, 'internal/results')

    def test_get_status(self, mock_openvas_db):
        """get_status reads the single item stored under 'internal/<id>'."""
        get_item = mock_openvas_db.get_single_item
        get_item.return_value = 'some status'

        status = self.db.get_status('foo')

        self.assertEqual(status, 'some status')
        get_item.assert_called_with(self.ctx, 'internal/foo')

    def test_get_scan_status(self, mock_openvas_db):
        """get_scan_status pops the items stored under 'internal/status'."""
        status = ['192.168.0.1/10/120', '192.168.0.2/35/120']
        pop_items = mock_openvas_db.pop_list_items
        pop_items.return_value = status

        scan_status = self.db.get_scan_status()

        self.assertEqual(scan_status, status)
        pop_items.assert_called_with(self.ctx, 'internal/status')

    def test_flush(self, mock_openvas_db):
        """flush calls flushdb on the underlying context."""
        self.db.flush()

        self.ctx.flushdb.assert_called_with()

    def test_add_scan_id(self, mock_openvas_db):
        """add_scan_id stores the 'new' status first, then the scan id."""
        self.db.add_scan_id('bar')

        recorded = mock_openvas_db.add_single_item.call_args_list

        # Index 0 of a recorded call holds its positional arguments.
        status_args = recorded[0][0]
        self.assertEqual(status_args[1], 'internal/bar')
        self.assertEqual(status_args[2], ['new'])

        scan_id_args = recorded[1][0]
        self.assertEqual(scan_id_args[1], 'internal/scanid')
        self.assertEqual(scan_id_args[2], ['bar'])

    def test_add_scan_preferences(self, mock_openvas_db):
        """Preferences are stored under 'internal/<scan id>/scanprefs'."""
        prefs = ['foo', 'bar']

        self.db.add_scan_preferences('foo', prefs)

        mock_openvas_db.add_single_item.assert_called_with(
            self.ctx, 'internal/foo/scanprefs', prefs
        )

    @patch('ospd_openvas.db.OpenvasDB')
    def test_add_credentials_to_scan_preferences(
        self, mock_redis, mock_openvas_db
    ):
        """Credentials are stored through a context created for the purpose.

        NOTE(review): the method-level patch targets OpenvasDB as well, so
        ``mock_redis`` is a second OpenvasDB mock and is only used here as
        the source of ``ctx`` -- confirm whether redis.Redis was intended.
        """
        prefs = ['foo', 'bar']
        ctx = mock_redis.return_value
        create_context = mock_openvas_db.create_context
        create_context.return_value = ctx

        self.db.add_credentials_to_scan_preferences('scan_id', prefs)

        create_context.assert_called_with(self.db.index, encoding='utf-8')
        mock_openvas_db.add_single_item.assert_called_with(
            ctx, 'internal/scan_id/scanprefs', prefs
        )

    def test_add_scan_process_id(self, mock_openvas_db):
        """The process id is stored as a list under 'internal/ovas_pid'."""
        self.db.add_scan_process_id(123)

        mock_openvas_db.add_single_item.assert_called_with(
            self.ctx, 'internal/ovas_pid', [123]
        )

    def test_get_scan_process_id(self, mock_openvas_db):
        """The process id is read back from 'internal/ovas_pid'."""
        get_item = mock_openvas_db.get_single_item
        get_item.return_value = '123'

        pid = self.db.get_scan_process_id()

        self.assertEqual(pid, '123')
        get_item.assert_called_with(self.ctx, 'internal/ovas_pid')

    def test_remove_scan_database(self, mock_openvas_db):
        """The index of the scan db is removed from 'internal/dbindex'."""
        scan_db = MagicMock(spec=ScanDB)
        scan_db.index = 123

        self.db.remove_scan_database(scan_db)

        mock_openvas_db.remove_list_item.assert_called_with(
            self.ctx, 'internal/dbindex', 123
        )

    def test_target_is_finished_false(self, mock_openvas_db):
        """A scan whose stored status is 'new' is not finished."""
        get_item = mock_openvas_db.get_single_item
        get_item.side_effect = ['new']

        finished = self.db.target_is_finished('bar')

        self.assertFalse(finished)

        first_call_args = get_item.call_args_list[0][0]
        self.assertEqual(first_call_args[1], 'internal/bar')

    def test_target_is_finished_true(self, mock_openvas_db):
        """A scan whose stored status is 'finished' is finished."""
        get_item = mock_openvas_db.get_single_item
        get_item.side_effect = ['finished']

        finished = self.db.target_is_finished('bar')

        self.assertTrue(finished)

        first_call_args = get_item.call_args_list[0][0]
        self.assertEqual(first_call_args[1], 'internal/bar')

    def test_stop_scan(self, mock_openvas_db):
        """stop_scan sets the status of the scan to 'stop_all'."""
        self.db.stop_scan('foo')

        mock_openvas_db.set_single_item.assert_called_with(
            self.ctx, 'internal/foo', ['stop_all']
        )

    def test_scan_is_stopped_false(self, mock_openvas_db):
        """A scan whose stored status is 'new' is not stopped."""
        get_item = mock_openvas_db.get_single_item
        get_item.return_value = 'new'

        stopped = self.db.scan_is_stopped('foo')

        self.assertFalse(stopped)
        get_item.assert_called_with(self.ctx, 'internal/foo')

    def test_scan_is_stopped_true(self, mock_openvas_db):
        """A scan whose stored status is 'stop_all' is stopped."""
        get_item = mock_openvas_db.get_single_item
        get_item.return_value = 'stop_all'

        stopped = self.db.scan_is_stopped('foo')

        self.assertTrue(stopped)
        get_item.assert_called_with(self.ctx, 'internal/foo')

    def test_get_scan_databases(self, mock_openvas_db):
        """The kb's own index is left out of the yielded scan databases."""
        stored_indexes = ['4', self.db.index, '7', '11']
        mock_openvas_db.get_list_item.return_value = stored_indexes

        scan_dbs = self.db.get_scan_databases()

        for expected_index in ('4', '7', '11'):
            scan_db = next(scan_dbs)
            self.assertEqual(scan_db.index, expected_index)

        with self.assertRaises(StopIteration):
            next(scan_dbs)
599
600
601
@patch('ospd_openvas.db.redis.Redis')
class MainDBTestCase(TestCase):
    """Tests for MainDB running on top of a mocked Redis context."""

    def test_max_database_index_fail(self, mock_redis):
        """An empty 'databases' config makes max_database_index raise."""
        ctx = mock_redis.return_value
        ctx.config_get.return_value = {}
        main_db = MainDB(ctx)

        with self.assertRaises(OspdOpenvasError):
            # Reading the attribute is what is expected to raise.
            _ = main_db.max_database_index

        ctx.config_get.assert_called_with('databases')

    def test_max_database_index(self, mock_redis):
        """The configured number of databases is returned as an int."""
        ctx = mock_redis.return_value
        ctx.config_get.return_value = {'databases': '123'}
        main_db = MainDB(ctx)

        highest = main_db.max_database_index

        self.assertEqual(highest, 123)
        ctx.config_get.assert_called_with('databases')

    def test_try_database_success(self, mock_redis):
        """try_database is true when hsetnx reports 1."""
        ctx = mock_redis.return_value
        ctx.hsetnx.return_value = 1
        main_db = MainDB(ctx)

        reserved = main_db.try_database(1)

        self.assertEqual(reserved, True)
        ctx.hsetnx.assert_called_with(DBINDEX_NAME, 1, 1)

    def test_try_database_false(self, mock_redis):
        """try_database is false when hsetnx reports 0."""
        ctx = mock_redis.return_value
        ctx.hsetnx.return_value = 0
        main_db = MainDB(ctx)

        reserved = main_db.try_database(1)

        self.assertEqual(reserved, False)
        ctx.hsetnx.assert_called_with(DBINDEX_NAME, 1, 1)

    def test_try_db_index_error(self, mock_redis):
        """An exception raised by hsetnx surfaces as OspdOpenvasError."""
        ctx = mock_redis.return_value
        ctx.hsetnx.side_effect = Exception
        main_db = MainDB(ctx)

        with self.assertRaises(OspdOpenvasError):
            main_db.try_database(1)

    def test_release_database_by_index(self, mock_redis):
        """The given index is removed from the DBINDEX_NAME hash."""
        ctx = mock_redis.return_value
        ctx.hdel.return_value = 1
        main_db = MainDB(ctx)

        main_db.release_database_by_index(3)

        ctx.hdel.assert_called_once_with(DBINDEX_NAME, 3)

    def test_release_database(self, mock_redis):
        """Releasing a database removes its index and flushes it."""
        ctx = mock_redis.return_value
        ctx.hdel.return_value = 1
        database = MagicMock()
        database.index = 3
        main_db = MainDB(ctx)

        main_db.release_database(database)

        ctx.hdel.assert_called_once_with(DBINDEX_NAME, 3)
        database.flush.assert_called_with()

    def test_release(self, mock_redis):
        """release removes the main db's own index and flushes it."""
        ctx = mock_redis.return_value
        main_db = MainDB(ctx)

        main_db.release()

        ctx.hdel.assert_called_with(DBINDEX_NAME, main_db.index)
        ctx.flushdb.assert_called_with()

    def test_get_new_kb_database(self, mock_redis):
        """The first index that hsetnx accepts is used for the new kb."""
        ctx = mock_redis.return_value
        main_db = MainDB(ctx)
        main_db._max_dbindex = 123  # pylint: disable=protected-access
        # The first two attempts are refused, the third one succeeds.
        ctx.hsetnx.side_effect = [0, 0, 1]

        new_kb = main_db.get_new_kb_database()

        self.assertEqual(new_kb.index, 3)
        ctx.flushdb.assert_called_once_with()

    def test_get_new_kb_database_none(self, mock_redis):
        """None is returned when every attempt is refused."""
        ctx = mock_redis.return_value
        main_db = MainDB(ctx)
        main_db._max_dbindex = 3  # pylint: disable=protected-access
        ctx.hsetnx.side_effect = [0, 0, 0]

        new_kb = main_db.get_new_kb_database()

        self.assertIsNone(new_kb)
        ctx.flushdb.assert_not_called()

    @patch('ospd_openvas.db.OpenvasDB')
    def test_find_kb_database_by_scan_id_none(
        self, mock_openvas_db, mock_redis
    ):
        """None is returned when no database holds the scan id key."""
        ctx = mock_redis.return_value
        new_ctx = 'bar'  # just some object to compare
        mock_openvas_db.create_context.return_value = new_ctx
        mock_openvas_db.get_key_count.return_value = None
        main_db = MainDB(ctx)
        main_db._max_dbindex = 2  # pylint: disable=protected-access

        found = main_db.find_kb_database_by_scan_id('foo')

        mock_openvas_db.get_key_count.assert_called_once_with(
            new_ctx, 'internal/foo'
        )
        self.assertIsNone(found)

    @patch('ospd_openvas.db.OpenvasDB')
    def test_find_kb_database_by_scan_id(self, mock_openvas_db, mock_redis):
        """The kb whose database contains the scan id key is returned."""
        ctx = mock_redis.return_value
        new_ctx = 'foo'  # just some object to compare
        mock_openvas_db.create_context.return_value = new_ctx
        # No key in the first database that is checked, one in the second.
        mock_openvas_db.get_key_count.side_effect = [0, 1]
        main_db = MainDB(ctx)
        main_db._max_dbindex = 3  # pylint: disable=protected-access

        found = main_db.find_kb_database_by_scan_id('foo')

        mock_openvas_db.get_key_count.assert_called_with(
            new_ctx, 'internal/foo'
        )
        self.assertEqual(found.index, 2)
        self.assertIs(found.ctx, new_ctx)
754