Passed
Pull Request — master (#285)
by Juan José
01:23
created

tests.test_db.ScanDBTestCase.test_get_host_ip()   A

Complexity

Conditions 1

Size

Total Lines 8
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 6
nop 2
dl 0
loc 8
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
# pylint: disable=unused-argument
21
22
""" Unit Test for ospd-openvas """
23
24
import logging
25
26
from unittest import TestCase
27
from unittest.mock import patch, MagicMock
28
29
from redis.exceptions import ConnectionError as RCE
30
31
from ospd.errors import RequiredArgument
32
from ospd_openvas.db import OpenvasDB, MainDB, ScanDB, KbDB, DBINDEX_NAME, time
33
from ospd_openvas.errors import OspdOpenvasError
34
35
from tests.helper import assert_called
36
37
38
@patch('ospd_openvas.db.redis.Redis')
class TestOpenvasDB(TestCase):
    """Tests for the OpenvasDB helper functions.

    ``redis.Redis`` as seen from ``ospd_openvas.db`` is patched for every
    test method and handed in as the ``mock_redis`` argument.
    """

    @patch('ospd_openvas.db.Openvas')
    def test_get_db_connection(
        self, mock_openvas: MagicMock, mock_redis: MagicMock
    ):
        """The address is read from the openvas settings and then cached."""
        # pylint: disable=protected-access
        # The address is cached on the class. Put the previous value back
        # when the test is over so '/foo/bar' cannot leak into other tests.
        self.addCleanup(
            setattr,
            OpenvasDB,
            '_db_address',
            getattr(OpenvasDB, '_db_address', None),
        )
        OpenvasDB._db_address = None
        mock_settings = mock_openvas.get_settings.return_value
        mock_settings.get.return_value = None

        self.assertIsNone(OpenvasDB.get_database_address())

        # set the first time
        mock_openvas.get_settings.return_value = {'db_address': '/foo/bar'}

        self.assertEqual(OpenvasDB.get_database_address(), "/foo/bar")

        self.assertEqual(mock_openvas.get_settings.call_count, 2)

        # should cache address: no further settings lookup is expected
        self.assertEqual(OpenvasDB.get_database_address(), "/foo/bar")
        self.assertEqual(mock_openvas.get_settings.call_count, 2)

    def test_create_context_fail(self, mock_redis):
        """A redis connection error is logged and ends in SystemExit."""
        mock_redis.side_effect = RCE

        # Patch Logger.error only for the duration of the call instead of
        # overwriting the attribute permanently, which would affect every
        # test running afterwards in the same process.
        with patch.object(logging.Logger, 'error') as mock_error:
            with patch.object(time, 'sleep', return_value=None):
                with self.assertRaises(SystemExit):
                    OpenvasDB.create_context()

        mock_error.assert_called_with(
            'Redis Error: Not possible to connect to the kb.'
        )

    def test_create_context_success(self, mock_redis):
        """create_context returns the object built by redis.Redis."""
        ctx = mock_redis.return_value
        ret = OpenvasDB.create_context()
        self.assertIs(ret, ctx)

    def test_select_database_error(self, mock_redis):
        """select_database requires both a context and an index."""
        with self.assertRaises(RequiredArgument):
            OpenvasDB.select_database(None, 1)

        with self.assertRaises(RequiredArgument):
            OpenvasDB.select_database(mock_redis, None)

    def test_select_database(self, mock_redis):
        """select_database issues a SELECT command for the given index."""
        mock_redis.execute_command.return_value = mock_redis

        OpenvasDB.select_database(mock_redis, 1)

        mock_redis.execute_command.assert_called_with('SELECT 1')

    def test_get_list_item_error(self, mock_redis):
        """get_list_item requires both a context and a name."""
        ctx = mock_redis.return_value

        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_list_item(None, 'foo')

        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_list_item(ctx, None)

    def test_get_list_item(self, mock_redis):
        """get_list_item returns what lrange delivers."""
        ctx = mock_redis.return_value
        ctx.lrange.return_value = ['1234']

        ret = OpenvasDB.get_list_item(ctx, 'name')

        self.assertEqual(ret, ['1234'])
        assert_called(ctx.lrange)

    def test_get_last_list_item(self, mock_redis):
        """get_last_list_item returns what rpop delivers for the name."""
        ctx = mock_redis.return_value
        ctx.rpop.return_value = 'foo'

        ret = OpenvasDB.get_last_list_item(ctx, 'name')

        self.assertEqual(ret, 'foo')
        ctx.rpop.assert_called_with('name')

    def test_get_last_list_item_error(self, mock_redis):
        """get_last_list_item requires both a context and a name."""
        ctx = mock_redis.return_value

        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_last_list_item(ctx, None)

        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_last_list_item(None, 'name')

    def test_remove_list_item(self, mock_redis):
        """remove_list_item calls lrem with count=0 and the given value."""
        ctx = mock_redis.return_value
        ctx.lrem.return_value = 1

        OpenvasDB.remove_list_item(ctx, 'name', '1234')

        ctx.lrem.assert_called_once_with('name', count=0, value='1234')

    def test_remove_list_item_error(self, mock_redis):
        """remove_list_item requires context, key and value."""
        ctx = mock_redis.return_value

        with self.assertRaises(RequiredArgument):
            OpenvasDB.remove_list_item(None, '1', 'bar')

        with self.assertRaises(RequiredArgument):
            OpenvasDB.remove_list_item(ctx, None, 'bar')

        with self.assertRaises(RequiredArgument):
            OpenvasDB.remove_list_item(ctx, '1', None)

    def test_get_single_item_error(self, mock_redis):
        """get_single_item requires both a context and a name."""
        ctx = mock_redis.return_value

        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_single_item(None, 'foo')

        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_single_item(ctx, None)

    def test_get_single_item(self, mock_redis):
        """get_single_item reads index 0 of the named list via lindex."""
        ctx = mock_redis.return_value
        ctx.lindex.return_value = 'a'

        value = OpenvasDB.get_single_item(ctx, 'a')

        self.assertEqual(value, 'a')
        ctx.lindex.assert_called_once_with('a', 0)

    def test_add_single_item(self, mock_redis):
        """add_single_item pushes the unpacked values with rpush."""
        ctx = mock_redis.return_value
        ctx.rpush.return_value = 1

        OpenvasDB.add_single_item(ctx, 'a', ['12'])

        ctx.rpush.assert_called_once_with('a', '12')

    def test_add_single_item_error(self, mock_redis):
        """add_single_item requires context, name and values."""
        ctx = mock_redis.return_value

        with self.assertRaises(RequiredArgument):
            OpenvasDB.add_single_item(None, '1', ['12'])

        with self.assertRaises(RequiredArgument):
            OpenvasDB.add_single_item(ctx, None, ['12'])

        with self.assertRaises(RequiredArgument):
            OpenvasDB.add_single_item(ctx, '1', None)

    def test_set_single_item_error(self, mock_redis):
        """set_single_item requires context, name and values."""
        ctx = mock_redis.return_value

        with self.assertRaises(RequiredArgument):
            OpenvasDB.set_single_item(None, '1', ['12'])

        with self.assertRaises(RequiredArgument):
            OpenvasDB.set_single_item(ctx, None, ['12'])

        with self.assertRaises(RequiredArgument):
            OpenvasDB.set_single_item(ctx, '1', None)

    def test_pop_list_items_no_results(self, mock_redis):
        """pop_list_items returns an empty list if the pipeline has none."""
        ctx = mock_redis.return_value
        pipeline = ctx.pipeline.return_value
        pipeline.lrange.return_value = None
        pipeline.delete.return_value = None
        pipeline.execute.return_value = (None, 0)

        ret = OpenvasDB.pop_list_items(ctx, 'foo')

        self.assertEqual(ret, [])

        pipeline.lrange.assert_called_once_with('foo', 0, -1)
        pipeline.delete.assert_called_once_with('foo')
        assert_called(pipeline.execute)

    def test_pop_list_items_with_results(self, mock_redis):
        """pop_list_items returns the fetched items in reversed order."""
        ctx = mock_redis.return_value
        pipeline = ctx.pipeline.return_value
        pipeline.lrange.return_value = None
        pipeline.delete.return_value = None
        pipeline.execute.return_value = [['c', 'b', 'a'], 2]

        ret = OpenvasDB.pop_list_items(ctx, 'results')

        # reversed list
        self.assertEqual(ret, ['a', 'b', 'c'])

        pipeline.lrange.assert_called_once_with('results', 0, -1)
        pipeline.delete.assert_called_once_with('results')
        assert_called(pipeline.execute)

    def test_set_single_item(self, mock_redis):
        """set_single_item deletes the key and pushes the new values."""
        ctx = mock_redis.return_value
        pipeline = ctx.pipeline.return_value
        pipeline.delete.return_value = None
        pipeline.rpush.return_value = None
        pipeline.execute.return_value = None

        OpenvasDB.set_single_item(ctx, 'foo', ['bar'])

        pipeline.delete.assert_called_once_with('foo')
        pipeline.rpush.assert_called_once_with('foo', 'bar')
        assert_called(pipeline.execute)

    def test_get_pattern(self, mock_redis):
        """get_pattern pairs every matching key with its list content."""
        ctx = mock_redis.return_value
        ctx.keys.return_value = ['a', 'b']
        ctx.lrange.return_value = [1, 2, 3]

        ret = OpenvasDB.get_pattern(ctx, 'a')

        self.assertEqual(ret, [['a', [1, 2, 3]], ['b', [1, 2, 3]]])

    def test_get_pattern_error(self, mock_redis):
        """get_pattern requires both a context and a pattern."""
        ctx = mock_redis.return_value

        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_pattern(None, 'a')

        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_pattern(ctx, None)

    def test_get_filenames_and_oids_error(self, mock_redis):
        """get_filenames_and_oids requires a context."""
        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_filenames_and_oids(None)

    def test_get_filenames_and_oids(self, mock_redis):
        """get_filenames_and_oids yields (filename, oid) pairs."""
        ctx = mock_redis.return_value
        ctx.keys.return_value = ['nvt:1', 'nvt:2']
        ctx.lindex.side_effect = ['aa', 'ab']

        ret = OpenvasDB.get_filenames_and_oids(ctx)

        self.assertEqual(list(ret), [('aa', '1'), ('ab', '2')])

    def test_get_keys_by_pattern_error(self, mock_redis):
        """get_keys_by_pattern requires both a context and a pattern."""
        ctx = mock_redis.return_value

        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_keys_by_pattern(None, 'a')

        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_keys_by_pattern(ctx, None)

    def test_get_keys_by_pattern(self, mock_redis):
        """get_keys_by_pattern returns the matching keys sorted."""
        ctx = mock_redis.return_value
        ctx.keys.return_value = ['nvt:2', 'nvt:1']

        ret = OpenvasDB.get_keys_by_pattern(ctx, 'nvt:*')

        # Return sorted list
        self.assertEqual(ret, ['nvt:1', 'nvt:2'])

    def test_get_key_count(self, mock_redis):
        """get_key_count counts the keys matching the given pattern."""
        ctx = mock_redis.return_value

        ctx.keys.return_value = ['aa', 'ab']

        ret = OpenvasDB.get_key_count(ctx, "foo")

        self.assertEqual(ret, 2)
        ctx.keys.assert_called_with('foo')

    def test_get_key_count_with_default_pattern(self, mock_redis):
        """get_key_count falls back to the '*' pattern."""
        ctx = mock_redis.return_value

        ctx.keys.return_value = ['aa', 'ab']

        ret = OpenvasDB.get_key_count(ctx)

        self.assertEqual(ret, 2)
        ctx.keys.assert_called_with('*')

    def test_get_key_count_error(self, mock_redis):
        """get_key_count requires a context."""
        with self.assertRaises(RequiredArgument):
            OpenvasDB.get_key_count(None)

    def test_find_database_by_pattern_none(self, mock_redis):
        """find_database_by_pattern returns (None, None) without a match."""
        ctx = mock_redis.return_value
        ctx.keys.return_value = None

        new_ctx, index = OpenvasDB.find_database_by_pattern('foo*', 123)

        self.assertIsNone(new_ctx)
        self.assertIsNone(index)

    def test_find_database_by_pattern(self, mock_redis):
        """find_database_by_pattern returns the context and matching index."""
        ctx = mock_redis.return_value

        # keys is called twice per iteration
        ctx.keys.side_effect = [None, None, None, None, True, True]

        new_ctx, index = OpenvasDB.find_database_by_pattern('foo*', 123)

        self.assertEqual(new_ctx, ctx)
        self.assertEqual(index, 2)
337
338
339
@patch('ospd_openvas.db.OpenvasDB')
class ScanDBTestCase(TestCase):
    """Tests for ScanDB with the OpenvasDB helpers replaced by a mock."""

    @patch('ospd_openvas.db.redis.Redis')
    def setUp(self, mock_redis):  # pylint: disable=arguments-differ
        """Create a ScanDB with index 10 on top of a mocked redis context."""
        redis_ctx = mock_redis.return_value
        self.ctx = redis_ctx
        self.db = ScanDB(10, redis_ctx)

    def test_get_result(self, mock_openvas_db):
        """get_result pops the items stored under 'internal/results'."""
        pop_items = mock_openvas_db.pop_list_items
        pop_items.return_value = ['some result']

        results = self.db.get_result()

        self.assertEqual(results, ['some result'])
        pop_items.assert_called_with(self.ctx, 'internal/results')

    def test_get_status(self, mock_openvas_db):
        """get_status reads the single item stored under 'internal/<id>'."""
        get_item = mock_openvas_db.get_single_item
        get_item.return_value = 'some status'

        status = self.db.get_status('foo')

        self.assertEqual(status, 'some status')
        get_item.assert_called_with(self.ctx, 'internal/foo')

    def test_select(self, mock_openvas_db):
        """select switches the index and returns the ScanDB itself."""
        selected = self.db.select(11)

        self.assertIs(selected, self.db)
        self.assertEqual(self.db.index, 11)

        mock_openvas_db.select_database.assert_called_with(self.ctx, 11)

    def test_flush(self, mock_openvas_db):
        """flush calls flushdb on the redis context without arguments."""
        self.db.flush()

        self.ctx.flushdb.assert_called_with()
380
381
382
@patch('ospd_openvas.db.OpenvasDB')
class KbDBTestCase(TestCase):
    """Tests for KbDB with the OpenvasDB helpers replaced by a mock."""

    @patch('ospd_openvas.db.redis.Redis')
    def setUp(self, mock_redis):  # pylint: disable=arguments-differ
        """Create a KbDB with index 10 on top of a mocked redis context."""
        redis_ctx = mock_redis.return_value
        self.ctx = redis_ctx
        self.db = KbDB(10, redis_ctx)

    def test_get_result(self, mock_openvas_db):
        """get_result pops the items stored under 'internal/results'."""
        pop_items = mock_openvas_db.pop_list_items
        pop_items.return_value = ['some results']

        results = self.db.get_result()

        self.assertEqual(results, ['some results'])
        pop_items.assert_called_with(self.ctx, 'internal/results')

    def test_get_status(self, mock_openvas_db):
        """get_status reads the single item stored under 'internal/<id>'."""
        get_item = mock_openvas_db.get_single_item
        get_item.return_value = 'some status'

        status = self.db.get_status('foo')

        self.assertEqual(status, 'some status')
        get_item.assert_called_with(self.ctx, 'internal/foo')

    def test_get_scan_status(self, mock_openvas_db):
        """get_scan_status pops the items stored under 'internal/status'."""
        host_states = ['192.168.0.1/10/120', '192.168.0.2/35/120']
        pop_items = mock_openvas_db.pop_list_items
        pop_items.return_value = host_states

        scan_status = self.db.get_scan_status()

        self.assertEqual(scan_status, host_states)
        pop_items.assert_called_with(self.ctx, 'internal/status')

    def test_flush(self, mock_openvas_db):
        """flush calls flushdb on the redis context without arguments."""
        self.db.flush()

        self.ctx.flushdb.assert_called_with()

    def test_add_scan_id(self, mock_openvas_db):
        """add_scan_id stores three items, checked in call order."""
        self.db.add_scan_id('foo', 'bar')

        recorded = mock_openvas_db.add_single_item.call_args_list

        # (key, values) expected as 2nd and 3rd positional argument of the
        # first, second and third add_single_item call respectively.
        expected = (
            ('internal/bar', ['new']),
            ('internal/foo/globalscanid', ['bar']),
            ('internal/scanid', ['bar']),
        )
        for position, (key, values) in enumerate(expected):
            positional = recorded[position][0]

            self.assertEqual(positional[1], key)
            self.assertEqual(positional[2], values)

    def test_add_scan_preferences(self, mock_openvas_db):
        """add_scan_preferences stores the list under '.../scanprefs'."""
        preferences = ['foo', 'bar']

        self.db.add_scan_preferences('foo', preferences)

        mock_openvas_db.add_single_item.assert_called_with(
            self.ctx, 'internal/foo/scanprefs', preferences
        )

    def test_add_scan_process_id(self, mock_openvas_db):
        """add_scan_process_id stores the pid under 'internal/ovas_pid'."""
        self.db.add_scan_process_id(123)

        mock_openvas_db.add_single_item.assert_called_with(
            self.ctx, 'internal/ovas_pid', [123]
        )

    def test_get_scan_process_id(self, mock_openvas_db):
        """get_scan_process_id reads the item at 'internal/ovas_pid'."""
        get_item = mock_openvas_db.get_single_item
        get_item.return_value = '123'

        pid = self.db.get_scan_process_id()

        self.assertEqual(pid, '123')
        get_item.assert_called_with(self.ctx, 'internal/ovas_pid')

    def test_remove_scan_database(self, mock_openvas_db):
        """remove_scan_database drops the index from 'internal/dbindex'."""
        other_db = MagicMock(spec=ScanDB)
        other_db.index = 123

        self.db.remove_scan_database(other_db)

        mock_openvas_db.remove_list_item.assert_called_with(
            self.ctx, 'internal/dbindex', 123
        )

    def test_target_is_finished_false(self, mock_openvas_db):
        """A target whose scan status is 'new' is not finished."""
        get_item = mock_openvas_db.get_single_item
        get_item.side_effect = ['bar', 'new']

        finished = self.db.target_is_finished('foo')

        self.assertFalse(finished)

        recorded = get_item.call_args_list

        # First the global scan id is looked up, then the status of it.
        first_args = recorded[0][0]
        self.assertEqual(first_args[1], 'internal/foo/globalscanid')

        second_args = recorded[1][0]
        self.assertEqual(second_args[1], 'internal/bar')

    def test_target_is_finished_true(self, mock_openvas_db):
        """A target is finished for status 'finished' or a missing status."""
        get_item = mock_openvas_db.get_single_item
        get_item.side_effect = ['bar', 'finished']

        finished = self.db.target_is_finished('foo')

        self.assertTrue(finished)

        recorded = get_item.call_args_list

        first_args = recorded[0][0]
        self.assertEqual(first_args[1], 'internal/foo/globalscanid')

        second_args = recorded[1][0]
        self.assertEqual(second_args[1], 'internal/bar')

        # Second scenario: no status stored for the scan at all.
        get_item.side_effect = ['bar', None]

        finished = self.db.target_is_finished('foo')

        self.assertTrue(finished)

    def test_stop_scan(self, mock_openvas_db):
        """stop_scan sets 'internal/<id>' to ['stop_all']."""
        self.db.stop_scan('foo')

        mock_openvas_db.set_single_item.assert_called_with(
            self.ctx, 'internal/foo', ['stop_all']
        )

    def test_scan_is_stopped_false(self, mock_openvas_db):
        """A scan with status 'new' is not reported as stopped."""
        get_item = mock_openvas_db.get_single_item
        get_item.return_value = 'new'

        stopped = self.db.scan_is_stopped('foo')

        self.assertFalse(stopped)
        get_item.assert_called_with(self.ctx, 'internal/foo')

    def test_scan_is_stopped_true(self, mock_openvas_db):
        """A scan with status 'stop_all' is reported as stopped."""
        get_item = mock_openvas_db.get_single_item
        get_item.return_value = 'stop_all'

        stopped = self.db.scan_is_stopped('foo')

        self.assertTrue(stopped)
        get_item.assert_called_with(self.ctx, 'internal/foo')

    def test_get_scan_databases(self, mock_openvas_db):
        """get_scan_databases yields every listed index except its own."""
        own_index = self.db.index
        mock_openvas_db.get_list_item.return_value = [
            '4',
            own_index,
            '7',
            '11',
        ]

        scan_dbs = self.db.get_scan_databases()

        for expected_index in ('4', '7', '11'):
            scan_db = next(scan_dbs)
            self.assertEqual(scan_db.index, expected_index)

        # Nothing may be left once the three foreign indexes are consumed.
        with self.assertRaises(StopIteration):
            next(scan_dbs)
582
583
584
@patch('ospd_openvas.db.redis.Redis')
class MainDBTestCase(TestCase):
    """Tests for MainDB running against a mocked redis context."""

    def test_max_database_index_fail(self, mock_redis):
        """An empty 'databases' config raises OspdOpenvasError."""
        redis_ctx = mock_redis.return_value
        redis_ctx.config_get.return_value = {}

        main_db = MainDB(redis_ctx)

        with self.assertRaises(OspdOpenvasError):
            # Accessing the attribute is what is expected to raise.
            _ = main_db.max_database_index

        redis_ctx.config_get.assert_called_with('databases')

    def test_max_database_index(self, mock_redis):
        """The 'databases' config value is returned as an integer."""
        redis_ctx = mock_redis.return_value
        redis_ctx.config_get.return_value = {'databases': '123'}

        main_db = MainDB(redis_ctx)

        self.assertEqual(main_db.max_database_index, 123)
        redis_ctx.config_get.assert_called_with('databases')

    def test_try_database_success(self, mock_redis):
        """try_database is true when hsetnx reports 1."""
        redis_ctx = mock_redis.return_value
        redis_ctx.hsetnx.return_value = 1

        main_db = MainDB(redis_ctx)

        acquired = main_db.try_database(1)

        self.assertEqual(acquired, True)
        redis_ctx.hsetnx.assert_called_with(DBINDEX_NAME, 1, 1)

    def test_try_database_false(self, mock_redis):
        """try_database is false when hsetnx reports 0."""
        redis_ctx = mock_redis.return_value
        redis_ctx.hsetnx.return_value = 0

        main_db = MainDB(redis_ctx)

        acquired = main_db.try_database(1)

        self.assertEqual(acquired, False)
        redis_ctx.hsetnx.assert_called_with(DBINDEX_NAME, 1, 1)

    def test_try_db_index_error(self, mock_redis):
        """An exception raised by hsetnx surfaces as OspdOpenvasError."""
        redis_ctx = mock_redis.return_value
        redis_ctx.hsetnx.side_effect = Exception

        main_db = MainDB(redis_ctx)

        with self.assertRaises(OspdOpenvasError):
            main_db.try_database(1)

    def test_release_database_by_index(self, mock_redis):
        """release_database_by_index removes the index via hdel."""
        redis_ctx = mock_redis.return_value
        redis_ctx.hdel.return_value = 1

        main_db = MainDB(redis_ctx)
        main_db.release_database_by_index(3)

        redis_ctx.hdel.assert_called_once_with(DBINDEX_NAME, 3)

    def test_release_database(self, mock_redis):
        """release_database removes the index and flushes the database."""
        redis_ctx = mock_redis.return_value
        redis_ctx.hdel.return_value = 1

        other_db = MagicMock()
        other_db.index = 3

        main_db = MainDB(redis_ctx)
        main_db.release_database(other_db)

        redis_ctx.hdel.assert_called_once_with(DBINDEX_NAME, 3)
        other_db.flush.assert_called_with()

    def test_release(self, mock_redis):
        """release removes the MainDB's own index and flushes its context."""
        redis_ctx = mock_redis.return_value

        main_db = MainDB(redis_ctx)
        main_db.release()

        redis_ctx.hdel.assert_called_with(DBINDEX_NAME, main_db.index)
        redis_ctx.flushdb.assert_called_with()

    def test_get_new_kb_database(self, mock_redis):
        """With hsetnx succeeding on the third try, index 3 is returned."""
        redis_ctx = mock_redis.return_value

        main_db = MainDB(redis_ctx)
        main_db._max_dbindex = 123  # pylint: disable=protected-access

        redis_ctx.hsetnx.side_effect = [0, 0, 1]

        kb_db = main_db.get_new_kb_database()

        self.assertEqual(kb_db.index, 3)
        redis_ctx.flushdb.assert_called_once_with()

    def test_get_new_kb_database_none(self, mock_redis):
        """None is returned and nothing flushed if every hsetnx fails."""
        redis_ctx = mock_redis.return_value

        main_db = MainDB(redis_ctx)
        main_db._max_dbindex = 3  # pylint: disable=protected-access

        redis_ctx.hsetnx.side_effect = [0, 0, 0]

        kb_db = main_db.get_new_kb_database()

        self.assertIsNone(kb_db)
        redis_ctx.flushdb.assert_not_called()

    @patch('ospd_openvas.db.OpenvasDB')
    def test_find_kb_database_by_scan_id_none(
        self, mock_openvas_db, mock_redis
    ):
        """(None, None) is returned if no database knows the scan id."""
        redis_ctx = mock_redis.return_value

        fresh_ctx = 'bar'  # just some object to compare
        mock_openvas_db.create_context.return_value = fresh_ctx
        mock_openvas_db.get_single_item.return_value = None

        main_db = MainDB(redis_ctx)
        main_db._max_dbindex = 2  # pylint: disable=protected-access

        found_id, kb_db = main_db.find_kb_database_by_scan_id('foo')

        mock_openvas_db.get_single_item.assert_called_once_with(
            fresh_ctx, 'internal/foo/globalscanid'
        )
        self.assertIsNone(found_id)
        self.assertIsNone(kb_db)

    @patch('ospd_openvas.db.OpenvasDB')
    def test_find_kb_database_by_scan_id(self, mock_openvas_db, mock_redis):
        """The scan id found on the second lookup yields database index 2."""
        redis_ctx = mock_redis.return_value

        fresh_ctx = 'bar'  # just some object to compare
        mock_openvas_db.create_context.return_value = fresh_ctx
        mock_openvas_db.get_single_item.side_effect = [None, 'ipsum']

        main_db = MainDB(redis_ctx)
        main_db._max_dbindex = 3  # pylint: disable=protected-access

        found_id, kb_db = main_db.find_kb_database_by_scan_id('foo')

        mock_openvas_db.get_single_item.assert_called_with(
            fresh_ctx, 'internal/foo/globalscanid'
        )
        self.assertEqual(found_id, 'ipsum')
        self.assertEqual(kb_db.index, 2)
        self.assertIs(kb_db.ctx, fresh_ctx)
738