Passed
Pull Request — master (#307)
by
unknown
01:05
created

KbDBTestCase.test_add_credentials_to_scan_preferences()   A

Complexity

Conditions 1

Size

Total Lines 17
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 11
nop 3
dl 0
loc 17
rs 9.85
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
# pylint: disable=unused-argument
21
22
""" Unit Test for ospd-openvas """
23
24
import logging
25
26
from unittest import TestCase
27
from unittest.mock import patch, MagicMock
28
29
from redis.exceptions import ConnectionError as RCE
30
31
from ospd.errors import RequiredArgument
32
from ospd_openvas.db import OpenvasDB, MainDB, ScanDB, KbDB, DBINDEX_NAME, time
33
from ospd_openvas.errors import OspdOpenvasError
34
35
from tests.helper import assert_called
36
37
38
@patch('ospd_openvas.db.redis.Redis')
39
class TestOpenvasDB(TestCase):
40
    @patch('ospd_openvas.db.Openvas')
41
    def test_get_db_connection(
42
        self, mock_openvas: MagicMock, mock_redis: MagicMock
43
    ):
44
        OpenvasDB._db_address = None  # pylint: disable=protected-access
45
        mock_settings = mock_openvas.get_settings.return_value
46
        mock_settings.get.return_value = None
47
48
        self.assertIsNone(OpenvasDB.get_database_address())
49
50
        # set the first time
51
        mock_openvas.get_settings.return_value = {'db_address': '/foo/bar'}
52
53
        self.assertEqual(OpenvasDB.get_database_address(), "/foo/bar")
54
55
        self.assertEqual(mock_openvas.get_settings.call_count, 2)
56
57
        # should cache address
58
        self.assertEqual(OpenvasDB.get_database_address(), "/foo/bar")
59
        self.assertEqual(mock_openvas.get_settings.call_count, 2)
60
61
    def test_create_context_fail(self, mock_redis):
62
        mock_redis.side_effect = RCE
63
64
        logging.Logger.error = MagicMock()
65
66
        with patch.object(time, 'sleep', return_value=None):
67
            with self.assertRaises(SystemExit):
68
                OpenvasDB.create_context()
69
70
        logging.Logger.error.assert_called_with(  # pylint: disable=no-member
71
            'Redis Error: Not possible to connect to the kb.'
72
        )
73
74
    def test_create_context_success(self, mock_redis):
75
        ctx = mock_redis.return_value
76
        ret = OpenvasDB.create_context()
77
        self.assertIs(ret, ctx)
78
79
    def test_select_database_error(self, mock_redis):
80
        with self.assertRaises(RequiredArgument):
81
            OpenvasDB.select_database(None, 1)
82
83
        with self.assertRaises(RequiredArgument):
84
            OpenvasDB.select_database(mock_redis, None)
85
86
    def test_select_database(self, mock_redis):
87
        mock_redis.execute_command.return_value = mock_redis
88
89
        OpenvasDB.select_database(mock_redis, 1)
90
91
        mock_redis.execute_command.assert_called_with('SELECT 1')
92
93
    def test_get_list_item_error(self, mock_redis):
94
        ctx = mock_redis.return_value
95
96
        with self.assertRaises(RequiredArgument):
97
            OpenvasDB.get_list_item(None, 'foo')
98
99
        with self.assertRaises(RequiredArgument):
100
            OpenvasDB.get_list_item(ctx, None)
101
102
    def test_get_list_item(self, mock_redis):
103
        ctx = mock_redis.return_value
104
        ctx.lrange.return_value = ['1234']
105
106
        ret = OpenvasDB.get_list_item(ctx, 'name')
107
108
        self.assertEqual(ret, ['1234'])
109
        assert_called(ctx.lrange)
110
111
    def test_get_last_list_item(self, mock_redis):
112
        ctx = mock_redis.return_value
113
        ctx.rpop.return_value = 'foo'
114
115
        ret = OpenvasDB.get_last_list_item(ctx, 'name')
116
117
        self.assertEqual(ret, 'foo')
118
        ctx.rpop.assert_called_with('name')
119
120
    def test_get_last_list_item_error(self, mock_redis):
121
        ctx = mock_redis.return_value
122
123
        with self.assertRaises(RequiredArgument):
124
            OpenvasDB.get_last_list_item(ctx, None)
125
126
        with self.assertRaises(RequiredArgument):
127
            OpenvasDB.get_last_list_item(None, 'name')
128
129
    def test_remove_list_item(self, mock_redis):
130
        ctx = mock_redis.return_value
131
        ctx.lrem.return_value = 1
132
133
        OpenvasDB.remove_list_item(ctx, 'name', '1234')
134
135
        ctx.lrem.assert_called_once_with('name', count=0, value='1234')
136
137
    def test_remove_list_item_error(self, mock_redis):
138
        ctx = mock_redis.return_value
139
140
        with self.assertRaises(RequiredArgument):
141
            OpenvasDB.remove_list_item(None, '1', 'bar')
142
143
        with self.assertRaises(RequiredArgument):
144
            OpenvasDB.remove_list_item(ctx, None, 'bar')
145
146
        with self.assertRaises(RequiredArgument):
147
            OpenvasDB.remove_list_item(ctx, '1', None)
148
149
    def test_get_single_item_error(self, mock_redis):
150
        ctx = mock_redis.return_value
151
152
        with self.assertRaises(RequiredArgument):
153
            OpenvasDB.get_single_item(None, 'foo')
154
155
        with self.assertRaises(RequiredArgument):
156
            OpenvasDB.get_single_item(ctx, None)
157
158
    def test_get_single_item(self, mock_redis):
159
        ctx = mock_redis.return_value
160
        ctx.lindex.return_value = 'a'
161
162
        value = OpenvasDB.get_single_item(ctx, 'a')
163
164
        self.assertEqual(value, 'a')
165
        ctx.lindex.assert_called_once_with('a', 0)
166
167
    def test_add_single_item(self, mock_redis):
168
        ctx = mock_redis.return_value
169
        ctx.rpush.return_value = 1
170
171
        OpenvasDB.add_single_item(ctx, 'a', ['12'])
172
173
        ctx.rpush.assert_called_once_with('a', '12')
174
175
    def test_add_single_item_error(self, mock_redis):
176
        ctx = mock_redis.return_value
177
178
        with self.assertRaises(RequiredArgument):
179
            OpenvasDB.add_single_item(None, '1', ['12'])
180
181
        with self.assertRaises(RequiredArgument):
182
            OpenvasDB.add_single_item(ctx, None, ['12'])
183
184
        with self.assertRaises(RequiredArgument):
185
            OpenvasDB.add_single_item(ctx, '1', None)
186
187
    def test_set_single_item_error(self, mock_redis):
188
        ctx = mock_redis.return_value
189
190
        with self.assertRaises(RequiredArgument):
191
            OpenvasDB.set_single_item(None, '1', ['12'])
192
193
        with self.assertRaises(RequiredArgument):
194
            OpenvasDB.set_single_item(ctx, None, ['12'])
195
196
        with self.assertRaises(RequiredArgument):
197
            OpenvasDB.set_single_item(ctx, '1', None)
198
199
    def test_pop_list_items_no_results(self, mock_redis):
200
        ctx = mock_redis.return_value
201
        pipeline = ctx.pipeline.return_value
202
        pipeline.lrange.return_value = None
203
        pipeline.delete.return_value = None
204
        pipeline.execute.return_value = (None, 0)
205
206
        ret = OpenvasDB.pop_list_items(ctx, 'foo')
207
208
        self.assertEqual(ret, [])
209
210
        pipeline.lrange.assert_called_once_with('foo', 0, -1)
211
        pipeline.delete.assert_called_once_with('foo')
212
        assert_called(pipeline.execute)
213
214
    def test_pop_list_items_with_results(self, mock_redis):
215
        ctx = mock_redis.return_value
216
        pipeline = ctx.pipeline.return_value
217
        pipeline.lrange.return_value = None
218
        pipeline.delete.return_value = None
219
        pipeline.execute.return_value = [['c', 'b', 'a'], 2]
220
221
        ret = OpenvasDB.pop_list_items(ctx, 'results')
222
223
        # reversed list
224
        self.assertEqual(ret, ['a', 'b', 'c'])
225
226
        pipeline.lrange.assert_called_once_with('results', 0, -1)
227
        pipeline.delete.assert_called_once_with('results')
228
        assert_called(pipeline.execute)
229
230
    def test_set_single_item(self, mock_redis):
231
        ctx = mock_redis.return_value
232
        pipeline = ctx.pipeline.return_value
233
        pipeline.delete.return_value = None
234
        pipeline.rpush.return_value = None
235
        pipeline.execute.return_value = None
236
237
        OpenvasDB.set_single_item(ctx, 'foo', ['bar'])
238
239
        pipeline.delete.assert_called_once_with('foo')
240
        pipeline.rpush.assert_called_once_with('foo', 'bar')
241
        assert_called(pipeline.execute)
242
243
    def test_get_pattern(self, mock_redis):
244
        ctx = mock_redis.return_value
245
        ctx.keys.return_value = ['a', 'b']
246
        ctx.lrange.return_value = [1, 2, 3]
247
248
        ret = OpenvasDB.get_pattern(ctx, 'a')
249
250
        self.assertEqual(ret, [['a', [1, 2, 3]], ['b', [1, 2, 3]]])
251
252
    def test_get_pattern_error(self, mock_redis):
253
        ctx = mock_redis.return_value
254
255
        with self.assertRaises(RequiredArgument):
256
            OpenvasDB.get_pattern(None, 'a')
257
258
        with self.assertRaises(RequiredArgument):
259
            OpenvasDB.get_pattern(ctx, None)
260
261
    def test_get_filenames_and_oids_error(self, mock_redis):
262
        ctx = mock_redis.return_value
263
264
        with self.assertRaises(RequiredArgument):
265
            OpenvasDB.get_filenames_and_oids(None)
266
267
    def test_get_filenames_and_oids(self, mock_redis):
268
        ctx = mock_redis.return_value
269
        ctx.keys.return_value = ['nvt:1', 'nvt:2']
270
        ctx.lindex.side_effect = ['aa', 'ab']
271
272
        ret = OpenvasDB.get_filenames_and_oids(ctx)
273
274
        self.assertEqual(list(ret), [('aa', '1'), ('ab', '2')])
275
276
    def test_get_keys_by_pattern_error(self, mock_redis):
277
        ctx = mock_redis.return_value
278
279
        with self.assertRaises(RequiredArgument):
280
            OpenvasDB.get_keys_by_pattern(None, 'a')
281
282
        with self.assertRaises(RequiredArgument):
283
            OpenvasDB.get_keys_by_pattern(ctx, None)
284
285
    def test_get_keys_by_pattern(self, mock_redis):
286
        ctx = mock_redis.return_value
287
        ctx.keys.return_value = ['nvt:2', 'nvt:1']
288
289
        ret = OpenvasDB.get_keys_by_pattern(ctx, 'nvt:*')
290
291
        # Return sorted list
292
        self.assertEqual(ret, ['nvt:1', 'nvt:2'])
293
294
    def test_get_key_count(self, mock_redis):
295
        ctx = mock_redis.return_value
296
297
        ctx.keys.return_value = ['aa', 'ab']
298
299
        ret = OpenvasDB.get_key_count(ctx, "foo")
300
301
        self.assertEqual(ret, 2)
302
        ctx.keys.assert_called_with('foo')
303
304
    def test_get_key_count_with_default_pattern(self, mock_redis):
305
        ctx = mock_redis.return_value
306
307
        ctx.keys.return_value = ['aa', 'ab']
308
309
        ret = OpenvasDB.get_key_count(ctx)
310
311
        self.assertEqual(ret, 2)
312
        ctx.keys.assert_called_with('*')
313
314
    def test_get_key_count_error(self, mock_redis):
315
        with self.assertRaises(RequiredArgument):
316
            OpenvasDB.get_key_count(None)
317
318
    def test_find_database_by_pattern_none(self, mock_redis):
319
        ctx = mock_redis.return_value
320
        ctx.keys.return_value = None
321
322
        new_ctx, index = OpenvasDB.find_database_by_pattern('foo*', 123)
323
324
        self.assertIsNone(new_ctx)
325
        self.assertIsNone(index)
326
327
    def test_find_database_by_pattern(self, mock_redis):
328
        ctx = mock_redis.return_value
329
330
        # keys is called twice per iteration
331
        ctx.keys.side_effect = [None, None, None, None, True, True]
332
333
        new_ctx, index = OpenvasDB.find_database_by_pattern('foo*', 123)
334
335
        self.assertEqual(new_ctx, ctx)
336
        self.assertEqual(index, 2)
337
338
339
@patch('ospd_openvas.db.OpenvasDB')
340
class ScanDBTestCase(TestCase):
341
    @patch('ospd_openvas.db.redis.Redis')
342
    def setUp(self, mock_redis):  # pylint: disable=arguments-differ
343
        self.ctx = mock_redis.return_value
344
        self.db = ScanDB(10, self.ctx)
345
346
    def test_get_result(self, mock_openvas_db):
347
        mock_openvas_db.pop_list_items.return_value = [
348
            'some result',
349
        ]
350
351
        ret = self.db.get_result()
352
353
        self.assertEqual(ret, ['some result',])
354
        mock_openvas_db.pop_list_items.assert_called_with(
355
            self.ctx, 'internal/results'
356
        )
357
358
    def test_get_status(self, mock_openvas_db):
359
        mock_openvas_db.get_single_item.return_value = 'some status'
360
361
        ret = self.db.get_status('foo')
362
363
        self.assertEqual(ret, 'some status')
364
        mock_openvas_db.get_single_item.assert_called_with(
365
            self.ctx, 'internal/foo'
366
        )
367
368
    def test_select(self, mock_openvas_db):
369
        ret = self.db.select(11)
370
371
        self.assertIs(ret, self.db)
372
        self.assertEqual(self.db.index, 11)
373
374
        mock_openvas_db.select_database.assert_called_with(self.ctx, 11)
375
376
    def test_flush(self, mock_openvas_db):
377
        self.db.flush()
378
379
        self.ctx.flushdb.assert_called_with()
380
381
382
@patch('ospd_openvas.db.OpenvasDB')
383
class KbDBTestCase(TestCase):
384
    @patch('ospd_openvas.db.redis.Redis')
385
    def setUp(self, mock_redis):  # pylint: disable=arguments-differ
386
        self.ctx = mock_redis.return_value
387
        self.db = KbDB(10, self.ctx)
388
389
    def test_get_result(self, mock_openvas_db):
390
        mock_openvas_db.pop_list_items.return_value = [
391
            'some results',
392
        ]
393
394
        ret = self.db.get_result()
395
396
        self.assertEqual(ret, ['some results',])
397
        mock_openvas_db.pop_list_items.assert_called_with(
398
            self.ctx, 'internal/results'
399
        )
400
401
    def test_get_status(self, mock_openvas_db):
402
        mock_openvas_db.get_single_item.return_value = 'some status'
403
404
        ret = self.db.get_status('foo')
405
406
        self.assertEqual(ret, 'some status')
407
        mock_openvas_db.get_single_item.assert_called_with(
408
            self.ctx, 'internal/foo'
409
        )
410
411
    def test_get_scan_status(self, mock_openvas_db):
412
        status = [
413
            '192.168.0.1/10/120',
414
            '192.168.0.2/35/120',
415
        ]
416
417
        mock_openvas_db.pop_list_items.return_value = status
418
419
        ret = self.db.get_scan_status()
420
421
        self.assertEqual(ret, status)
422
        mock_openvas_db.pop_list_items.assert_called_with(
423
            self.ctx, 'internal/status'
424
        )
425
426
    def test_flush(self, mock_openvas_db):
427
        self.db.flush()
428
429
        self.ctx.flushdb.assert_called_with()
430
431
    def test_add_scan_id(self, mock_openvas_db):
432
        self.db.add_scan_id('foo', 'bar')
433
434
        calls = mock_openvas_db.add_single_item.call_args_list
435
436
        call = calls[0]
437
        kwargs = call[0]
438
439
        self.assertEqual(kwargs[1], 'internal/bar')
440
        self.assertEqual(kwargs[2], ['new'])
441
442
        call = calls[1]
443
        kwargs = call[0]
444
445
        self.assertEqual(kwargs[1], 'internal/foo/globalscanid')
446
        self.assertEqual(kwargs[2], ['bar'])
447
448
        call = calls[2]
449
        kwargs = call[0]
450
451
        self.assertEqual(kwargs[1], 'internal/scanid')
452
        self.assertEqual(kwargs[2], ['bar'])
453
454
    def test_add_scan_preferences(self, mock_openvas_db):
455
        prefs = ['foo', 'bar']
456
457
        self.db.add_scan_preferences('foo', prefs)
458
459
        mock_openvas_db.add_single_item.assert_called_with(
460
            self.ctx, 'internal/foo/scanprefs', prefs
461
        )
462
463
    @patch('ospd_openvas.db.OpenvasDB')
464
    def test_add_credentials_to_scan_preferences(
465
        self, mock_redis, mock_openvas_db
466
    ):
467
        prefs = ['foo', 'bar']
468
469
        ctx = mock_redis.return_value
470
        mock_openvas_db.create_context.return_value = ctx
471
472
        self.db.add_credentials_to_scan_preferences('scan_id', prefs)
473
474
        mock_openvas_db.create_context.assert_called_with(
475
            self.db.index, encoding='utf-8'
476
        )
477
478
        mock_openvas_db.add_single_item.assert_called_with(
479
            ctx, 'internal/scan_id/scanprefs', prefs
480
        )
481
482
    def test_add_scan_process_id(self, mock_openvas_db):
483
        self.db.add_scan_process_id(123)
484
485
        mock_openvas_db.add_single_item.assert_called_with(
486
            self.ctx, 'internal/ovas_pid', [123]
487
        )
488
489
    def test_get_scan_process_id(self, mock_openvas_db):
490
        mock_openvas_db.get_single_item.return_value = '123'
491
492
        ret = self.db.get_scan_process_id()
493
494
        self.assertEqual(ret, '123')
495
        mock_openvas_db.get_single_item.assert_called_with(
496
            self.ctx, 'internal/ovas_pid'
497
        )
498
499
    def test_remove_scan_database(self, mock_openvas_db):
500
        scan_db = MagicMock(spec=ScanDB)
501
        scan_db.index = 123
502
503
        self.db.remove_scan_database(scan_db)
504
505
        mock_openvas_db.remove_list_item.assert_called_with(
506
            self.ctx, 'internal/dbindex', 123
507
        )
508
509
    def test_target_is_finished_false(self, mock_openvas_db):
510
        mock_openvas_db.get_single_item.side_effect = ['bar', 'new']
511
512
        ret = self.db.target_is_finished('foo')
513
514
        self.assertFalse(ret)
515
516
        calls = mock_openvas_db.get_single_item.call_args_list
517
518
        call = calls[0]
519
        args = call[0]
520
521
        self.assertEqual(args[1], 'internal/foo/globalscanid')
522
523
        call = calls[1]
524
        args = call[0]
525
526
        self.assertEqual(args[1], 'internal/bar')
527
528
    def test_target_is_finished_true(self, mock_openvas_db):
529
        mock_openvas_db.get_single_item.side_effect = ['bar', 'finished']
530
531
        ret = self.db.target_is_finished('foo')
532
533
        self.assertTrue(ret)
534
535
        calls = mock_openvas_db.get_single_item.call_args_list
536
537
        call = calls[0]
538
        args = call[0]
539
540
        self.assertEqual(args[1], 'internal/foo/globalscanid')
541
542
        call = calls[1]
543
        args = call[0]
544
545
        self.assertEqual(args[1], 'internal/bar')
546
547
        mock_openvas_db.get_single_item.side_effect = ['bar', None]
548
549
        ret = self.db.target_is_finished('foo')
550
551
        self.assertTrue(ret)
552
553
    def test_stop_scan(self, mock_openvas_db):
554
        self.db.stop_scan('foo')
555
556
        mock_openvas_db.set_single_item.assert_called_with(
557
            self.ctx, 'internal/foo', ['stop_all']
558
        )
559
560
    def test_scan_is_stopped_false(self, mock_openvas_db):
561
        mock_openvas_db.get_single_item.return_value = 'new'
562
563
        ret = self.db.scan_is_stopped('foo')
564
565
        self.assertFalse(ret)
566
        mock_openvas_db.get_single_item.assert_called_with(
567
            self.ctx, 'internal/foo'
568
        )
569
570
    def test_scan_is_stopped_true(self, mock_openvas_db):
571
        mock_openvas_db.get_single_item.return_value = 'stop_all'
572
573
        ret = self.db.scan_is_stopped('foo')
574
575
        self.assertTrue(ret)
576
        mock_openvas_db.get_single_item.assert_called_with(
577
            self.ctx, 'internal/foo'
578
        )
579
580
    def test_get_scan_databases(self, mock_openvas_db):
581
        mock_openvas_db.get_list_item.return_value = [
582
            '4',
583
            self.db.index,
584
            '7',
585
            '11',
586
        ]
587
588
        scan_dbs = self.db.get_scan_databases()
589
590
        scan_db = next(scan_dbs)
591
        self.assertEqual(scan_db.index, '4')
592
593
        scan_db = next(scan_dbs)
594
        self.assertEqual(scan_db.index, '7')
595
596
        scan_db = next(scan_dbs)
597
        self.assertEqual(scan_db.index, '11')
598
599
        with self.assertRaises(StopIteration):
600
            next(scan_dbs)
601
602
603
@patch('ospd_openvas.db.redis.Redis')
604
class MainDBTestCase(TestCase):
605
    def test_max_database_index_fail(self, mock_redis):
606
        ctx = mock_redis.return_value
607
        ctx.config_get.return_value = {}
608
609
        maindb = MainDB(ctx)
610
611
        with self.assertRaises(OspdOpenvasError):
612
            max_db = (  # pylint: disable=unused-variable
613
                maindb.max_database_index
614
            )
615
616
        ctx.config_get.assert_called_with('databases')
617
618
    def test_max_database_index(self, mock_redis):
619
        ctx = mock_redis.return_value
620
        ctx.config_get.return_value = {'databases': '123'}
621
622
        maindb = MainDB(ctx)
623
624
        max_db = maindb.max_database_index
625
626
        self.assertEqual(max_db, 123)
627
        ctx.config_get.assert_called_with('databases')
628
629
    def test_try_database_success(self, mock_redis):
630
        ctx = mock_redis.return_value
631
        ctx.hsetnx.return_value = 1
632
633
        maindb = MainDB(ctx)
634
635
        ret = maindb.try_database(1)
636
637
        self.assertEqual(ret, True)
638
        ctx.hsetnx.assert_called_with(DBINDEX_NAME, 1, 1)
639
640
    def test_try_database_false(self, mock_redis):
641
        ctx = mock_redis.return_value
642
        ctx.hsetnx.return_value = 0
643
644
        maindb = MainDB(ctx)
645
646
        ret = maindb.try_database(1)
647
648
        self.assertEqual(ret, False)
649
        ctx.hsetnx.assert_called_with(DBINDEX_NAME, 1, 1)
650
651
    def test_try_db_index_error(self, mock_redis):
652
        ctx = mock_redis.return_value
653
        ctx.hsetnx.side_effect = Exception
654
655
        maindb = MainDB(ctx)
656
657
        with self.assertRaises(OspdOpenvasError):
658
            maindb.try_database(1)
659
660
    def test_release_database_by_index(self, mock_redis):
661
        ctx = mock_redis.return_value
662
        ctx.hdel.return_value = 1
663
664
        maindb = MainDB(ctx)
665
666
        maindb.release_database_by_index(3)
667
668
        ctx.hdel.assert_called_once_with(DBINDEX_NAME, 3)
669
670
    def test_release_database(self, mock_redis):
671
        ctx = mock_redis.return_value
672
        ctx.hdel.return_value = 1
673
674
        db = MagicMock()
675
        db.index = 3
676
        maindb = MainDB(ctx)
677
        maindb.release_database(db)
678
679
        ctx.hdel.assert_called_once_with(DBINDEX_NAME, 3)
680
        db.flush.assert_called_with()
681
682
    def test_release(self, mock_redis):
683
        ctx = mock_redis.return_value
684
685
        maindb = MainDB(ctx)
686
        maindb.release()
687
688
        ctx.hdel.assert_called_with(DBINDEX_NAME, maindb.index)
689
        ctx.flushdb.assert_called_with()
690
691
    def test_get_new_kb_database(self, mock_redis):
692
        ctx = mock_redis.return_value
693
694
        maindb = MainDB(ctx)
695
        maindb._max_dbindex = 123  # pylint: disable=protected-access
696
697
        ctx.hsetnx.side_effect = [0, 0, 1]
698
699
        kbdb = maindb.get_new_kb_database()
700
701
        self.assertEqual(kbdb.index, 3)
702
        ctx.flushdb.assert_called_once_with()
703
704
    def test_get_new_kb_database_none(self, mock_redis):
705
        ctx = mock_redis.return_value
706
707
        maindb = MainDB(ctx)
708
        maindb._max_dbindex = 3  # pylint: disable=protected-access
709
710
        ctx.hsetnx.side_effect = [0, 0, 0]
711
712
        kbdb = maindb.get_new_kb_database()
713
714
        self.assertIsNone(kbdb)
715
        ctx.flushdb.assert_not_called()
716
717
    @patch('ospd_openvas.db.OpenvasDB')
718
    def test_find_kb_database_by_scan_id_none(
719
        self, mock_openvas_db, mock_redis
720
    ):
721
        ctx = mock_redis.return_value
722
723
        new_ctx = 'bar'  # just some object to compare
724
        mock_openvas_db.create_context.return_value = new_ctx
725
        mock_openvas_db.get_single_item.return_value = None
726
727
        maindb = MainDB(ctx)
728
        maindb._max_dbindex = 2  # pylint: disable=protected-access
729
730
        scan_id, kbdb = maindb.find_kb_database_by_scan_id('foo')
731
732
        mock_openvas_db.get_single_item.assert_called_once_with(
733
            new_ctx, 'internal/foo/globalscanid'
734
        )
735
        self.assertIsNone(scan_id)
736
        self.assertIsNone(kbdb)
737
738
    @patch('ospd_openvas.db.OpenvasDB')
739
    def test_find_kb_database_by_scan_id(self, mock_openvas_db, mock_redis):
740
        ctx = mock_redis.return_value
741
742
        new_ctx = 'bar'  # just some object to compare
743
        mock_openvas_db.create_context.return_value = new_ctx
744
        mock_openvas_db.get_single_item.side_effect = [None, 'ipsum']
745
746
        maindb = MainDB(ctx)
747
        maindb._max_dbindex = 3  # pylint: disable=protected-access
748
749
        scan_id, kbdb = maindb.find_kb_database_by_scan_id('foo')
750
751
        mock_openvas_db.get_single_item.assert_called_with(
752
            new_ctx, 'internal/foo/globalscanid'
753
        )
754
        self.assertEqual(scan_id, 'ipsum')
755
        self.assertEqual(kbdb.index, 2)
756
        self.assertIs(kbdb.ctx, new_ctx)
757