Passed
Pull Request — master (#411)
by Juan José
01:36
created

tests.test_preferencehandler   D

Complexity

Total Complexity 58

Size/Duplication

Total Lines 964
Duplicated Lines 3.42 %

Importance

Changes 0
Metric Value
eloc 693
dl 33
loc 964
rs 4.467
c 0
b 0
f 0
wmc 58

33 Methods

Rating   Name   Duplication   Size   Complexity  
A PreferenceHandlerTestCase.test_process_vts_not_found() 0 15 1
A PreferenceHandlerTestCase.test_set_ports() 0 14 1
A PreferenceHandlerTestCase.test_process_vts_bad_param_id() 0 13 1
B PreferenceHandlerTestCase.test_build_credentials() 0 51 1
A PreferenceHandlerTestCase.test_build_alive_test_opt() 0 24 1
A PreferenceHandlerTestCase.test_build_credentials_ssh_up() 0 21 1
A PreferenceHandlerTestCase.test_process_vts() 0 16 1
A PreferenceHandlerTestCase.test_set_target() 0 14 1
A PreferenceHandlerTestCase.test_build_alive_test_opt_empty() 0 16 1
A PreferenceHandlerTestCase.test_set_plugins_true() 0 17 1
A PreferenceHandlerTestCase.test_build_alive_test_opt_fail_1() 0 10 1
A PreferenceHandlerTestCase.test_set_plugins_false() 0 12 1
A PreferenceHandlerTestCase.test_prepare_alive_test_no_enum_no_alive_test() 0 17 2
A PreferenceHandlerTestCase.test_prepare_nvt_prefs_no_prefs() 0 10 1
A PreferenceHandlerTestCase.test_set_alive_no_setting() 16 16 2
A PreferenceHandlerTestCase.test_set_alive_pinghost() 0 28 3
A PreferenceHandlerTestCase.test_prepare_nvt_prefs() 0 18 1
A PreferenceHandlerTestCase.test_set_host_options_none() 0 14 1
C PreferenceHandlerTestCase.test_set_boreas_alive_test_with_settings() 0 90 7
A PreferenceHandlerTestCase.test_set_reverse_lookup_opt() 0 17 1
B PreferenceHandlerTestCase.test_alive_test_methods_to_bit_field() 0 73 1
A PreferenceHandlerTestCase.test_set_scan_params() 0 25 1
A PreferenceHandlerTestCase.test_set_ports_invalid() 0 10 1
A PreferenceHandlerTestCase.test_set_boreas_alive_test_enum_has_precedence() 0 19 2
A PreferenceHandlerTestCase.test_set_alive_no_invalid_alive_test_no_enum() 0 17 2
A PreferenceHandlerTestCase.test_set_alive_no_invalid_alive_test() 17 17 2
A PreferenceHandlerTestCase.test_set_credentials() 0 37 1
A PreferenceHandlerTestCase.test_set_credentials_empty() 0 14 1
D PreferenceHandlerTestCase.test_set_boreas_alive_test_not_as_enum() 0 140 10
A PreferenceHandlerTestCase.test_set_main_kbindex() 0 12 1
A PreferenceHandlerTestCase.test_prepare_alive_test_not_supplied_as_enum() 0 26 2
A PreferenceHandlerTestCase.test_set_boreas_alive_test_without_settings() 0 13 2
A PreferenceHandlerTestCase.test_set_host_options() 0 16 1

How to fix

Duplicated Code

Duplicated code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.
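A minimal sketch of removing repeated mock setup from a unittest suite, assuming the duplication looks like the handler construction repeated across the tests in the listing further down. `FakeHandler` and `make_handler` are hypothetical names, not part of ospd-openvas; the stand-in class exists only so the example runs on its own.

```python
import unittest
from unittest.mock import MagicMock


class FakeHandler:
    """Hypothetical stand-in for the handler under test."""

    def __init__(self, scan_id, kbdb, scan_collection):
        self.scan_id = scan_id
        self.kbdb = kbdb
        self.scan_collection = scan_collection

    def prepare_target(self):
        hosts = self.scan_collection.get_host_list(self.scan_id)
        self.kbdb.add_scan_preferences(self.scan_id, ['TARGET|||' + hosts])

    def prepare_ports(self):
        ports = self.scan_collection.get_ports(self.scan_id)
        self.kbdb.add_scan_preferences(self.scan_id, ['port_range|||' + ports])


class HandlerTestCase(unittest.TestCase):
    def make_handler(self, **collection_returns):
        """Build a handler whose collaborators are mocks.

        Each keyword maps a scan-collection method name to its return value,
        so the setup lines are written once instead of in every test.
        """
        collection = MagicMock()
        for name, value in collection_returns.items():
            getattr(collection, name).return_value = value
        return FakeHandler('456-789', MagicMock(), collection)

    def test_set_target(self):
        p = self.make_handler(get_host_list='192.168.0.1')
        p.prepare_target()
        p.kbdb.add_scan_preferences.assert_called_with(
            p.scan_id, ['TARGET|||192.168.0.1']
        )

    def test_set_ports(self):
        p = self.make_handler(get_ports='80,443')
        p.prepare_ports()
        p.kbdb.add_scan_preferences.assert_called_with(
            p.scan_id, ['port_range|||80,443']
        )
```

Saved as a module, this can be run with `python -m unittest`. Whether a helper method or `setUp` fits better depends on how much the setup varies between tests.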

Common duplication problems, and corresponding solutions are:

Complexity

Tip: Before tackling complexity, eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like tests.test_preferencehandler often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
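One rough way to spot such prefixes is to group the method names mechanically. The sketch below is an illustration only: `group_by_prefix` is a hypothetical helper, and the names are a sample taken from the method table above.

```python
from collections import defaultdict


def group_by_prefix(names, depth=2):
    """Group test method names by their first `depth` words after 'test_'."""
    groups = defaultdict(list)
    for name in names:
        stem = name[len('test_'):] if name.startswith('test_') else name
        key = '_'.join(stem.split('_')[:depth])
        groups[key].append(name)
    return dict(groups)


names = [
    'test_process_vts',
    'test_process_vts_not_found',
    'test_process_vts_bad_param_id',
    'test_build_credentials',
    'test_build_credentials_ssh_up',
    'test_set_boreas_alive_test_with_settings',
    'test_set_boreas_alive_test_not_as_enum',
    'test_set_boreas_alive_test_enum_has_precedence',
    'test_set_boreas_alive_test_without_settings',
]

groups = group_by_prefix(names)
for prefix, members in sorted(groups.items()):
    print(prefix, len(members))
```

Groups with several members, such as the `set_boreas` tests here, are candidates for a class of their own.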

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
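As a hedged illustration of Extract Class applied to a test suite, the Boreas alive-test cases could live in their own test case with shared setup and a table of inputs. Everything named here is an assumption for the sketch: `BoreasAliveTestCase`, `fake_prepare_boreas`, and the `'ALIVE_TEST'` key are hypothetical, and the function is a stand-in rather than the real handler. The bit values mirror the expected results in the listing (tcp_ack 1, icmp 2, arp 4, consider_alive 8, tcp_syn 16).

```python
import unittest
from unittest.mock import MagicMock

# Bit values inferred from the expected results in the test listing.
BITS = {'tcp_ack': 1, 'icmp': 2, 'arp': 4, 'consider_alive': 8, 'tcp_syn': 16}


def fake_prepare_boreas(kbdb, scan_id, target_options):
    """Hypothetical stand-in for the method under test."""
    value = sum(
        bit for name, bit in BITS.items() if target_options.get(name) == '1'
    )
    kbdb.add_scan_preferences(scan_id, ['ALIVE_TEST|||' + str(value)])


class BoreasAliveTestCase(unittest.TestCase):
    """Extracted class: only the Boreas alive-test cases live here."""

    def setUp(self):
        # Shared fixtures, created once per test instead of once per case.
        self.scan_id = '456-789'
        self.kbdb = MagicMock()

    def test_methods_to_bit_field(self):
        cases = [
            ({'icmp': '1'}, '2'),
            ({'tcp_syn': '1'}, '16'),
            ({'tcp_ack': '1'}, '1'),
            ({'arp': '1'}, '4'),
            ({'consider_alive': '1'}, '8'),
            ({name: '1' for name in BITS}, '31'),
        ]
        for options, expected in cases:
            with self.subTest(options=options):
                self.kbdb.reset_mock()
                fake_prepare_boreas(self.kbdb, self.scan_id, options)
                self.kbdb.add_scan_preferences.assert_called_once_with(
                    self.scan_id, ['ALIVE_TEST|||' + expected]
                )
```

Each table row replaces one copy of the repeated setup block, and `subTest` keeps the cases individually reported when one fails.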

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
import logging
21
22
from unittest import TestCase
23
from unittest.mock import call, patch, Mock, MagicMock
24
from collections import OrderedDict
25
26
from ospd.vts import Vts
27
28
from tests.dummydaemon import DummyDaemon
29
from tests.helper import assert_called_once
30
31
import ospd_openvas.db
32
33
from ospd_openvas.openvas import Openvas
34
from ospd_openvas.preferencehandler import (
35
    AliveTest,
36
    BOREAS_SETTING_NAME,
37
    BOREAS_ALIVE_TEST,
38
    BOREAS_ALIVE_TEST_PORTS,
39
    PreferenceHandler,
40
    alive_test_methods_to_bit_field,
41
)
42
43
44
class PreferenceHandlerTestCase(TestCase):
45
    @patch('ospd_openvas.db.KbDB')
46
    def test_process_vts_not_found(self, mock_kb):
47
        w = DummyDaemon()
48
        logging.Logger.warning = Mock()
49
50
        vts = {
51
            '1.3.6.1.4.1.25623.1.0.100065': {'3': 'new value'},
52
            'vt_groups': ['family=debian', 'family=general'],
53
        }
54
55
        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, w.nvti)
56
        w.nvti.get_nvt_metadata.return_value = None
57
        p._process_vts(vts)
58
59
        assert_called_once(logging.Logger.warning)
60
61
    def test_process_vts_bad_param_id(self):
62
        w = DummyDaemon()
63
64
        vts = {
65
            '1.3.6.1.4.1.25623.1.0.100061': {'3': 'new value'},
66
            'vt_groups': ['family=debian', 'family=general'],
67
        }
68
69
        p = PreferenceHandler('1234-1234', None, w.scan_collection, w.nvti)
70
71
        ret = p._process_vts(vts)
72
73
        self.assertFalse(ret[1])
74
75
    def test_process_vts(self):
76
        w = DummyDaemon()
77
78
        vts = {
79
            '1.3.6.1.4.1.25623.1.0.100061': {'1': 'new value'},
80
            'vt_groups': ['family=debian', 'family=general'],
81
        }
82
        vt_out = (
83
            ['1.3.6.1.4.1.25623.1.0.100061'],
84
            {'1.3.6.1.4.1.25623.1.0.100061:1:entry:Data length :': 'new value'},
85
        )
86
87
        p = PreferenceHandler('1234-1234', None, w.scan_collection, w.nvti)
88
        ret = p._process_vts(vts)
89
90
        self.assertEqual(ret, vt_out)
91
92
    @patch('ospd_openvas.db.KbDB')
93
    def test_set_plugins_false(self, mock_kb):
94
        w = DummyDaemon()
95
96
        w.scan_collection.get_vts = Mock()
97
        w.scan_collection.get_vts.return_value = {}
98
99
        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, w.nvti)
100
        p.kbdb.add_scan_preferences = Mock()
101
        r = p.prepare_plugins_for_openvas()
102
103
        self.assertFalse(r)
104
105
    @patch('ospd_openvas.db.KbDB')
106
    def test_set_plugins_true(self, mock_kb):
107
        w = DummyDaemon()
108
109
        vts = {
110
            '1.3.6.1.4.1.25623.1.0.100061': {'3': 'new value'},
111
            'vt_groups': ['family=debian', 'family=general'],
112
        }
113
114
        w.scan_collection.get_vts = Mock()
115
        w.scan_collection.get_vts.return_value = vts
116
117
        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, w.nvti)
118
        p.kbdb.add_scan_preferences = Mock()
119
        r = p.prepare_plugins_for_openvas()
120
121
        self.assertTrue(r)
122
123
    def test_build_credentials_ssh_up(self):
124
        w = DummyDaemon()
125
126
        cred_out = [
127
            'auth_port_ssh|||22',
128
            '1.3.6.1.4.1.25623.1.0.103591:1:entry:SSH login name:|||username',
129
            '1.3.6.1.4.1.25623.1.0.103591:3:password:SSH password (unsafe!):|||pass',
130
        ]
131
        cred_dict = {
132
            'ssh': {
133
                'type': 'up',
134
                'port': '22',
135
                'username': 'username',
136
                'password': 'pass',
137
            }
138
        }
139
        p = PreferenceHandler('1234-1234', None, w.scan_collection, None)
140
141
        ret = p.build_credentials_as_prefs(cred_dict)
142
143
        self.assertEqual(ret, cred_out)
144
145
    def test_build_credentials(self):
146
        w = DummyDaemon()
147
148
        cred_out = [
149
            '1.3.6.1.4.1.25623.1.0.105058:1:entry:ESXi login name:|||username',
150
            '1.3.6.1.4.1.25623.1.0.105058:2:password:ESXi login password:|||pass',
151
            'auth_port_ssh|||22',
152
            '1.3.6.1.4.1.25623.1.0.103591:1:entry:SSH login name:|||username',
153
            '1.3.6.1.4.1.25623.1.0.103591:2:password:SSH key passphrase:|||pass',
154
            '1.3.6.1.4.1.25623.1.0.103591:4:file:SSH private key:|||',
155
            '1.3.6.1.4.1.25623.1.0.90023:1:entry:SMB login:|||username',
156
            '1.3.6.1.4.1.25623.1.0.90023:2:password]:SMB password :|||pass',
157
            '1.3.6.1.4.1.25623.1.0.105076:1:password:SNMP Community:some comunity',
158
            '1.3.6.1.4.1.25623.1.0.105076:2:entry:SNMPv3 Username:username',
159
            '1.3.6.1.4.1.25623.1.0.105076:3:password:SNMPv3 Password:pass',
160
            '1.3.6.1.4.1.25623.1.0.105076:4:radio:SNMPv3 Authentication Algorithm:some auth algo',
161
            '1.3.6.1.4.1.25623.1.0.105076:5:password:SNMPv3 Privacy Password:privacy pass',
162
            '1.3.6.1.4.1.25623.1.0.105076:6:radio:SNMPv3 Privacy Algorithm:privacy algo',
163
        ]
164
        cred_dict = {
165
            'ssh': {
166
                'type': 'ssh',
167
                'port': '22',
168
                'username': 'username',
169
                'password': 'pass',
170
            },
171
            'smb': {'type': 'smb', 'username': 'username', 'password': 'pass'},
172
            'esxi': {
173
                'type': 'esxi',
174
                'username': 'username',
175
                'password': 'pass',
176
            },
177
            'snmp': {
178
                'type': 'snmp',
179
                'username': 'username',
180
                'password': 'pass',
181
                'community': 'some comunity',
182
                'auth_algorithm': 'some auth algo',
183
                'privacy_password': 'privacy pass',
184
                'privacy_algorithm': 'privacy algo',
185
            },
186
        }
187
188
        p = PreferenceHandler('1234-1234', None, w.scan_collection, None)
189
        ret = p.build_credentials_as_prefs(cred_dict)
190
191
        self.assertEqual(len(ret), len(cred_out))
192
        self.assertIn('auth_port_ssh|||22', cred_out)
193
        self.assertIn(
194
            '1.3.6.1.4.1.25623.1.0.90023:1:entry:SMB login:|||username',
195
            cred_out,
196
        )
197
198
    def test_build_alive_test_opt_empty(self):
199
        w = DummyDaemon()
200
201
        target_options_dict = {'alive_test': '0'}
202
203
        p = PreferenceHandler('1234-1234', None, w.scan_collection, None)
204
        ret = p.build_alive_test_opt_as_prefs(target_options_dict)
205
206
        self.assertEqual(ret, {})
207
208
        # alive test was supplied via separate xml element
209
        w = DummyDaemon()
210
        target_options_dict = {'alive_test_methods': '1', 'icmp': '0'}
211
        p = PreferenceHandler('1234-1234', None, w.scan_collection, None)
212
        ret = p.build_alive_test_opt_as_prefs(target_options_dict)
213
        self.assertEqual(ret, {})
214
215
    def test_build_alive_test_opt(self):
216
        w = DummyDaemon()
217
218
        alive_test_out = {
219
            "1.3.6.1.4.1.25623.1.0.100315:1:checkbox:Do a TCP ping": "no",
220
            "1.3.6.1.4.1.25623.1.0.100315:2:checkbox:TCP ping tries also TCP-SYN ping": "no",
221
            "1.3.6.1.4.1.25623.1.0.100315:7:checkbox:TCP ping tries only TCP-SYN ping": "no",
222
            "1.3.6.1.4.1.25623.1.0.100315:3:checkbox:Do an ICMP ping": "yes",
223
            "1.3.6.1.4.1.25623.1.0.100315:4:checkbox:Use ARP": "no",
224
            "1.3.6.1.4.1.25623.1.0.100315:5:checkbox:Mark unrechable Hosts as dead (not scanning)": "yes",
225
        }
226
227
        target_options_dict = {'alive_test': '2'}
228
        p = PreferenceHandler('1234-1234', None, w.scan_collection, None)
229
        ret = p.build_alive_test_opt_as_prefs(target_options_dict)
230
231
        self.assertEqual(ret, alive_test_out)
232
233
        # alive test was supplied via sepertae xml element
234
        w = DummyDaemon()
235
        target_options_dict = {'alive_test_methods': '1', 'icmp': '1'}
236
        p = PreferenceHandler('1234-1234', None, w.scan_collection, None)
237
        ret = p.build_alive_test_opt_as_prefs(target_options_dict)
238
        self.assertEqual(ret, alive_test_out)
239
240
    def test_build_alive_test_opt_fail_1(self):
241
        w = DummyDaemon()
242
        logging.Logger.debug = Mock()
243
244
        target_options_dict = {'alive_test': 'a'}
245
        p = PreferenceHandler('1234-1234', None, w.scan_collection, None)
246
        target_options = p.build_alive_test_opt_as_prefs(target_options_dict)
247
248
        assert_called_once(logging.Logger.debug)
249
        self.assertEqual(len(target_options), 0)
250
251
    @patch('ospd_openvas.db.KbDB')
252
    def test_set_target(self, mock_kb):
253
        w = DummyDaemon()
254
255
        w.scan_collection.get_host_list = MagicMock(return_value='192.168.0.1')
256
257
        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
258
        p.scan_id = '456-789'
259
        p.kbdb.add_scan_preferences = MagicMock()
260
        p.prepare_target_for_openvas()
261
262
        p.kbdb.add_scan_preferences.assert_called_with(
263
            p.scan_id,
264
            ['TARGET|||192.168.0.1'],
265
        )
266
267
    @patch('ospd_openvas.db.KbDB')
268
    def test_set_ports(self, mock_kb):
269
        w = DummyDaemon()
270
271
        w.scan_collection.get_ports = MagicMock(return_value='80,443')
272
273
        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
274
        p.scan_id = '456-789'
275
        p.kbdb.add_scan_preferences = MagicMock()
276
        p.prepare_ports_for_openvas()
277
278
        p.kbdb.add_scan_preferences.assert_called_with(
279
            p.scan_id,
280
            ['port_range|||80,443'],
281
        )
282
283
    @patch('ospd_openvas.db.KbDB')
284
    def test_set_ports_invalid(self, mock_kb):
285
        w = DummyDaemon()
286
287
        w.scan_collection.get_ports = MagicMock(return_value='2,-9,4')
288
289
        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
290
        p.scan_id = '456-789'
291
        p.kbdb.add_scan_preferences = MagicMock()
292
        self.assertFalse(p.prepare_ports_for_openvas())
293
294
    @patch('ospd_openvas.db.KbDB')
295
    def test_set_main_kbindex(self, mock_kb):
296
        w = DummyDaemon()
297
298
        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
299
        p.kbdb.add_scan_preferences = Mock()
300
        p.kbdb.index = 2
301
        p.prepare_main_kbindex_for_openvas()
302
303
        p.kbdb.add_scan_preferences.assert_called_with(
304
            p.scan_id,
305
            ['ov_maindbid|||2'],
306
        )
307
308
    @patch('ospd_openvas.db.KbDB')
309
    def test_set_credentials(self, mock_kb):
310
        w = DummyDaemon()
311
312
        creds = {
313
            'ssh': {
314
                'type': 'ssh',
315
                'port': '22',
316
                'username': 'username',
317
                'password': 'pass',
318
            },
319
            'smb': {'type': 'smb', 'username': 'username', 'password': 'pass'},
320
            'esxi': {
321
                'type': 'esxi',
322
                'username': 'username',
323
                'password': 'pass',
324
            },
325
            'snmp': {
326
                'type': 'snmp',
327
                'username': 'username',
328
                'password': 'pass',
329
                'community': 'some comunity',
330
                'auth_algorithm': 'some auth algo',
331
                'privacy_password': 'privacy pass',
332
                'privacy_algorithm': 'privacy algo',
333
            },
334
        }
335
336
        w.scan_collection.get_credentials = MagicMock(return_value=creds)
337
338
        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
339
        p.scan_id = '456-789'
340
        p.kbdb.add_scan_preferences = MagicMock()
341
        r = p.prepare_credentials_for_openvas()
342
343
        self.assertTrue(r)
344
        assert_called_once(p.kbdb.add_scan_preferences)
345
346
    @patch('ospd_openvas.db.KbDB')
347
    def test_set_credentials(self, mock_kb):
348
        w = DummyDaemon()
349
350
        # bad cred type shh instead of ssh
351
        creds = {
352
            'shh': {
353
                'type': 'ssh',
354
                'port': '22',
355
                'username': 'username',
356
                'password': 'pass',
357
            },
358
        }
359
360
        w.scan_collection.get_credentials = MagicMock(return_value=creds)
361
362
        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
363
        p.scan_id = '456-789'
364
        p.kbdb.add_scan_preferences = MagicMock()
365
        r = p.prepare_credentials_for_openvas()
366
367
        self.assertFalse(r)
368
369
    @patch('ospd_openvas.db.KbDB')
370
    def test_set_credentials_empty(self, mock_kb):
371
        w = DummyDaemon()
372
373
        creds = {}
374
375
        w.scan_collection.get_credentials = MagicMock(return_value=creds)
376
377
        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
378
        p.scan_id = '456-789'
379
        p.kbdb.add_scan_preferences = MagicMock()
380
        r = p.prepare_credentials_for_openvas()
381
382
        self.assertTrue(r)
383
384
    @patch('ospd_openvas.db.KbDB')
385
    def test_set_host_options(self, mock_kb):
386
        w = DummyDaemon()
387
388
        exc = '192.168.0.1'
389
390
        w.scan_collection.get_exclude_hosts = MagicMock(return_value=exc)
391
392
        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
393
        p.scan_id = '456-789'
394
        p.kbdb.add_scan_preferences = MagicMock()
395
        p.prepare_host_options_for_openvas()
396
397
        p.kbdb.add_scan_preferences.assert_called_with(
398
            p.scan_id,
399
            ['exclude_hosts|||192.168.0.1'],
400
        )
401
402
    @patch('ospd_openvas.db.KbDB')
403
    def test_set_host_options_none(self, mock_kb):
404
        w = DummyDaemon()
405
406
        exc = ''
407
408
        w.scan_collection.get_exclude_hosts = MagicMock(return_value=exc)
409
410
        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
411
        p.scan_id = '456-789'
412
        p.kbdb.add_scan_preferences = MagicMock()
413
        p.prepare_host_options_for_openvas()
414
415
        p.kbdb.add_scan_preferences.assert_not_called()
416
417
    @patch('ospd_openvas.db.KbDB')
418
    def test_set_scan_params(self, mock_kb):
419
        w = DummyDaemon()
420
421
        OSPD_PARAMS_MOCK = {
422
            'drop_privileges': {
423
                'type': 'boolean',
424
                'name': 'drop_privileges',
425
                'default': 0,
426
                'mandatory': 1,
427
                'description': '',
428
            },
429
        }
430
431
        opt = {'drop_privileges': 1}
432
433
        w.scan_collection.get_options = MagicMock(return_value=opt)
434
435
        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
436
        p.scan_id = '456-789'
437
        p.kbdb.add_scan_preferences = MagicMock()
438
        p.prepare_scan_params_for_openvas(OSPD_PARAMS_MOCK)
439
440
        p.kbdb.add_scan_preferences.assert_called_with(
441
            p.scan_id, ['drop_privileges|||yes']
442
        )
443
444
    @patch('ospd_openvas.db.KbDB')
445
    def test_set_reverse_lookup_opt(self, mock_kb):
446
        w = DummyDaemon()
447
448
        t_opt = {'reverse_lookup_only': 1}
449
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
450
451
        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
452
        p.scan_id = '456-789'
453
        p.kbdb.add_scan_preferences = MagicMock()
454
        p.prepare_reverse_lookup_opt_for_openvas()
455
456
        p.kbdb.add_scan_preferences.assert_called_with(
457
            p.scan_id,
458
            [
459
                'reverse_lookup_only|||yes',
460
                'reverse_lookup_unify|||no',
461
            ],
462
        )
463
464
    @patch('ospd_openvas.db.KbDB')
465
    def test_set_boreas_alive_test_with_settings(self, mock_kb):
466
        # No Boreas config setting (BOREAS_SETTING_NAME) set
467
        w = DummyDaemon()
468
        ov_setting = {'not_the_correct_setting': 1}
469
        t_opt = {}
470
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
471
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
472
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
473
            p.scan_id = '456-789'
474
            p.kbdb.add_scan_preferences = MagicMock()
475
            p.prepare_boreas_alive_test()
476
477
            p.kbdb.add_scan_preferences.assert_not_called()
478
479
        # Boreas config setting set but invalid alive_test.
480
        w = DummyDaemon()
481
        t_opt = {'alive_test': "error"}
482
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
483
        ov_setting = {BOREAS_SETTING_NAME: 1}
484
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
485
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
486
            p.scan_id = '456-789'
487
            p.kbdb.add_scan_preferences = MagicMock()
488
            p.prepare_boreas_alive_test()
489
490
            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||2'])]
491
            p.kbdb.add_scan_preferences.assert_has_calls(calls)
492
493
        # ALIVE_TEST_TCP_SYN_SERVICE as alive test.
494
        w = DummyDaemon()
495
        t_opt = {'alive_test': AliveTest.ALIVE_TEST_TCP_SYN_SERVICE}
496
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
497
        ov_setting = {BOREAS_SETTING_NAME: 1}
498
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
499
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
500
            p.scan_id = '456-789'
501
            p.kbdb.add_scan_preferences = MagicMock()
502
            p.prepare_boreas_alive_test()
503
504
            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||16'])]
505
            p.kbdb.add_scan_preferences.assert_has_calls(calls)
506
507
        # ICMP was chosen as alive test.
508
        w = DummyDaemon()
509
        t_opt = {'alive_test': AliveTest.ALIVE_TEST_ICMP}
510
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
511
        ov_setting = {BOREAS_SETTING_NAME: 1}
512
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
513
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
514
            p.scan_id = '456-789'
515
            p.kbdb.add_scan_preferences = MagicMock()
516
            p.prepare_boreas_alive_test()
517
518
            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||2'])]
519
            p.kbdb.add_scan_preferences.assert_has_calls(calls)
520
521
        # "Scan Config Default" as alive_test.
522
        w = DummyDaemon()
523
        t_opt = {'alive_test': AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT}
524
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
525
        ov_setting = {BOREAS_SETTING_NAME: 1}
526
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
527
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
528
            p.scan_id = '456-789'
529
            p.kbdb.add_scan_preferences = MagicMock()
530
            p.prepare_boreas_alive_test()
531
532
            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||2'])]
533
            p.kbdb.add_scan_preferences.assert_has_calls(calls)
534
535
        # TCP-SYN alive test and dedicated port list for alive scan provided.
536
        w = DummyDaemon()
537
        t_opt = {
538
            'alive_test_ports': "80,137",
539
            'alive_test': AliveTest.ALIVE_TEST_TCP_SYN_SERVICE,
540
        }
541
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
542
        ov_setting = {BOREAS_SETTING_NAME: 1}
543
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
544
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
545
            p.scan_id = '456-789'
546
            p.kbdb.add_scan_preferences = MagicMock()
547
            p.prepare_boreas_alive_test()
548
549
            calls = [
550
                call(p.scan_id, [BOREAS_ALIVE_TEST + '|||16']),
551
                call(p.scan_id, [BOREAS_ALIVE_TEST_PORTS + '|||80,137']),
552
            ]
553
            p.kbdb.add_scan_preferences.assert_has_calls(calls)
554
555
    @patch('ospd_openvas.db.KbDB')
556
    def test_set_boreas_alive_test_not_as_enum(self, mock_kb):
557
        # No Boreas config setting (BOREAS_SETTING_NAME) set
558
        w = DummyDaemon()
559
        ov_setting = {'not_the_correct_setting': 1}
560
        t_opt = {}
561
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
562
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
563
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
564
            p.scan_id = '456-789'
565
            p.kbdb.add_scan_preferences = MagicMock()
566
            p.prepare_boreas_alive_test()
567
568
            p.kbdb.add_scan_preferences.assert_not_called()
569
570
        # Boreas config setting set but invalid alive_test.
571
        w = DummyDaemon()
572
        t_opt = {'alive_test_methods': "1", 'arp': '-1'}
573
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
574
        ov_setting = {BOREAS_SETTING_NAME: 1}
575
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
576
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
577
            p.scan_id = '456-789'
578
            p.kbdb.add_scan_preferences = MagicMock()
579
            p.prepare_boreas_alive_test()
580
581
            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||2'])]
582
            p.kbdb.add_scan_preferences.assert_has_calls(calls)
583
584
        # ICMP was chosen as alive test.
585
        w = DummyDaemon()
586
        t_opt = {'alive_test_methods': "1", 'icmp': '1'}
587
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
588
        ov_setting = {BOREAS_SETTING_NAME: 1}
589
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
590
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
591
            p.scan_id = '456-789'
592
            p.kbdb.add_scan_preferences = MagicMock()
593
            p.prepare_boreas_alive_test()
594
595
            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||2'])]
596
            p.kbdb.add_scan_preferences.assert_has_calls(calls)
597
598
        # tcp_syn as alive test.
599
        w = DummyDaemon()
600
        t_opt = {'alive_test_methods': "1", 'tcp_syn': '1'}
601
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
602
        ov_setting = {BOREAS_SETTING_NAME: 1}
603
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
604
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
605
            p.scan_id = '456-789'
606
            p.kbdb.add_scan_preferences = MagicMock()
607
            p.prepare_boreas_alive_test()
608
609
            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||16'])]
610
            p.kbdb.add_scan_preferences.assert_has_calls(calls)
611
612
        # tcp_ack as alive test.
613
        w = DummyDaemon()
614
        t_opt = {'alive_test_methods': "1", 'tcp_ack': '1'}
615
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
616
        ov_setting = {BOREAS_SETTING_NAME: 1}
617
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
618
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
619
            p.scan_id = '456-789'
620
            p.kbdb.add_scan_preferences = MagicMock()
621
            p.prepare_boreas_alive_test()
622
623
            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||1'])]
624
            p.kbdb.add_scan_preferences.assert_has_calls(calls)
625
626
        # arp as alive test.
627
        w = DummyDaemon()
628
        t_opt = {'alive_test_methods': "1", 'arp': '1'}
629
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
630
        ov_setting = {BOREAS_SETTING_NAME: 1}
631
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
632
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
633
            p.scan_id = '456-789'
634
            p.kbdb.add_scan_preferences = MagicMock()
635
            p.prepare_boreas_alive_test()
636
637
            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||4'])]
638
            p.kbdb.add_scan_preferences.assert_has_calls(calls)
639
640
        # arp as alive test.
641
        w = DummyDaemon()
642
        t_opt = {'alive_test_methods': "1", 'consider_alive': '1'}
643
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
644
        ov_setting = {BOREAS_SETTING_NAME: 1}
645
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
646
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
647
            p.scan_id = '456-789'
648
            p.kbdb.add_scan_preferences = MagicMock()
649
            p.prepare_boreas_alive_test()
650
651
            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||8'])]
652
            p.kbdb.add_scan_preferences.assert_has_calls(calls)
653
654
        # all alive test methods
655
        w = DummyDaemon()
656
        t_opt = {
657
            'alive_test_methods': "1",
658
            'icmp': '1',
659
            'tcp_ack': '1',
660
            'tcp_syn': '1',
661
            'arp': '1',
662
            'consider_alive': '1',
663
        }
664
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
665
        ov_setting = {BOREAS_SETTING_NAME: 1}
666
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
667
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
668
            p.scan_id = '456-789'
669
            p.kbdb.add_scan_preferences = MagicMock()
670
            p.prepare_boreas_alive_test()
671
672
            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||31'])]
673
            p.kbdb.add_scan_preferences.assert_has_calls(calls)
674
675
        # TCP-SYN alive test and dedicated port list for alive scan provided.
        w = DummyDaemon()
        t_opt = {
            'alive_test_ports': "80,137",
            'alive_test_methods': "1",
            'tcp_syn': '1',
        }
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {BOREAS_SETTING_NAME: 1}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            calls = [
                call(p.scan_id, [BOREAS_ALIVE_TEST + '|||16']),
                call(p.scan_id, [BOREAS_ALIVE_TEST_PORTS + '|||80,137']),
            ]
            p.kbdb.add_scan_preferences.assert_has_calls(calls)

    @patch('ospd_openvas.db.KbDB')
    def test_set_boreas_alive_test_enum_has_precedence(self, mock_kb):
        w = DummyDaemon()
        t_opt = {
            'alive_test_methods': "1",
            'consider_alive': '1',
            'alive_test': AliveTest.ALIVE_TEST_ICMP,
        }
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {BOREAS_SETTING_NAME: 1}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            # has icmp and not consider_alive
            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||2'])]
            p.kbdb.add_scan_preferences.assert_has_calls(calls)

    @patch('ospd_openvas.db.KbDB')
    def test_set_boreas_alive_test_without_settings(self, mock_kb):
        w = DummyDaemon()
        t_opt = {'alive_test': 16}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            p.kbdb.add_scan_preferences.assert_not_called()

    @patch('ospd_openvas.db.KbDB')
    def test_set_alive_no_setting(self, mock_kb):
        w = DummyDaemon()

        t_opt = {}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)

        ov_setting = {}

        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_alive_test_option_for_openvas()

            p.kbdb.add_scan_preferences.assert_not_called()

    @patch('ospd_openvas.db.KbDB')
    def test_set_alive_no_invalid_alive_test(self, mock_kb):
        w = DummyDaemon()

        t_opt = {'alive_test': -1}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)

        ov_setting = {'some_setting': 1}

        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p._nvts_params = {}
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_alive_test_option_for_openvas()

            p.kbdb.add_scan_preferences.assert_not_called()

    @patch('ospd_openvas.db.KbDB')
    def test_set_alive_no_invalid_alive_test_no_enum(self, mock_kb):
        w = DummyDaemon()

        t_opt = {'alive_test_methods': '1', 'icmp': '-1'}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)

        ov_setting = {'some_setting': 1}

        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p._nvts_params = {}
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_alive_test_option_for_openvas()

            p.kbdb.add_scan_preferences.assert_not_called()

    @patch('ospd_openvas.db.KbDB')
    def test_set_alive_pinghost(self, mock_kb):
        w = DummyDaemon()

        alive_test_out = [
            "1.3.6.1.4.1.25623.1.0.100315:1:checkbox:Do a TCP ping|||no",
            "1.3.6.1.4.1.25623.1.0.100315:2:checkbox:TCP ping tries also TCP-SYN ping|||no",
            "1.3.6.1.4.1.25623.1.0.100315:7:checkbox:TCP ping tries only TCP-SYN ping|||no",
            "1.3.6.1.4.1.25623.1.0.100315:3:checkbox:Do an ICMP ping|||yes",
            "1.3.6.1.4.1.25623.1.0.100315:4:checkbox:Use ARP|||no",
            "1.3.6.1.4.1.25623.1.0.100315:5:checkbox:Mark unrechable Hosts as dead (not scanning)|||yes",
        ]

        t_opt = {'alive_test': 2}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)

        ov_setting = {'some_setting': 1}

        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p._nvts_params = {}
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_alive_test_option_for_openvas()

            for key, value in p._nvts_params.items():
                self.assertTrue(
                    "{0}|||{1}".format(key, value) in alive_test_out
                )

    @patch('ospd_openvas.db.KbDB')
    def test_prepare_alive_test_not_supplied_as_enum(self, mock_kb):
        w = DummyDaemon()

        alive_test_out = {
            "1.3.6.1.4.1.25623.1.0.100315:1:checkbox:Do a TCP ping": "no",
            "1.3.6.1.4.1.25623.1.0.100315:2:checkbox:TCP ping tries also TCP-SYN ping": "no",
            "1.3.6.1.4.1.25623.1.0.100315:7:checkbox:TCP ping tries only TCP-SYN ping": "no",
            "1.3.6.1.4.1.25623.1.0.100315:3:checkbox:Do an ICMP ping": "yes",
            "1.3.6.1.4.1.25623.1.0.100315:4:checkbox:Use ARP": "no",
            "1.3.6.1.4.1.25623.1.0.100315:5:checkbox:Mark unrechable Hosts as dead (not scanning)": "yes",
        }

        t_opt = {'alive_test_methods': '1', 'icmp': '1'}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)

        ov_setting = {'some_setting': 1}

        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p._nvts_params = {}
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_alive_test_option_for_openvas()

            self.assertEqual(p._nvts_params, alive_test_out)

    @patch('ospd_openvas.db.KbDB')
    def test_prepare_alive_test_no_enum_no_alive_test(self, mock_kb):
        w = DummyDaemon()

        t_opt = {'alive_test_methods': '1', 'icmp': '0'}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)

        ov_setting = {'some_setting': 1}

        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p._nvts_params = {}
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_alive_test_option_for_openvas()

            p.kbdb.add_scan_preferences.assert_not_called()

    def test_alive_test_methods_to_bit_field(self):

        self.assertEqual(
            AliveTest.ALIVE_TEST_TCP_ACK_SERVICE,
            alive_test_methods_to_bit_field(
                icmp=False,
                tcp_ack=True,
                tcp_syn=False,
                arp=False,
                consider_alive=False,
            ),
        )

        self.assertEqual(
            AliveTest.ALIVE_TEST_ICMP,
            alive_test_methods_to_bit_field(
                icmp=True,
                tcp_ack=False,
                tcp_syn=False,
                arp=False,
                consider_alive=False,
            ),
        )

        self.assertEqual(
            AliveTest.ALIVE_TEST_ARP,
            alive_test_methods_to_bit_field(
                icmp=False,
                tcp_ack=False,
                tcp_syn=False,
                arp=True,
                consider_alive=False,
            ),
        )

        self.assertEqual(
            AliveTest.ALIVE_TEST_CONSIDER_ALIVE,
            alive_test_methods_to_bit_field(
                icmp=False,
                tcp_ack=False,
                tcp_syn=False,
                arp=False,
                consider_alive=True,
            ),
        )

        self.assertEqual(
            AliveTest.ALIVE_TEST_TCP_SYN_SERVICE,
            alive_test_methods_to_bit_field(
                icmp=False,
                tcp_ack=False,
                tcp_syn=True,
                arp=False,
                consider_alive=False,
            ),
        )

        all_alive_test_methods = (
            AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT
            | AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
            | AliveTest.ALIVE_TEST_ICMP
            | AliveTest.ALIVE_TEST_ARP
            | AliveTest.ALIVE_TEST_CONSIDER_ALIVE
            | AliveTest.ALIVE_TEST_TCP_SYN_SERVICE
        )
        self.assertEqual(
            all_alive_test_methods,
            alive_test_methods_to_bit_field(
                icmp=True,
                tcp_ack=True,
                tcp_syn=True,
                arp=True,
                consider_alive=True,
            ),
        )

    @patch('ospd_openvas.db.KbDB')
    def test_prepare_nvt_prefs(self, mock_kb):
        w = DummyDaemon()

        alive_test_out = [
            "1.3.6.1.4.1.25623.1.0.100315:1:checkbox:Do a TCP ping|||no"
        ]

        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
        p._nvts_params = {
            "1.3.6.1.4.1.25623.1.0.100315:1:checkbox:Do a TCP ping": "no"
        }
        p.kbdb.add_scan_preferences = MagicMock()
        p.prepare_nvt_preferences()

        p.kbdb.add_scan_preferences.assert_called_with(
            p.scan_id,
            alive_test_out,
        )

    @patch('ospd_openvas.db.KbDB')
    def test_prepare_nvt_prefs_no_prefs(self, mock_kb):
        w = DummyDaemon()

        p = PreferenceHandler('456-789', mock_kb, w.scan_collection, None)
        p._nvts_params = {}
        p.kbdb.add_scan_preferences = MagicMock()
        p.prepare_nvt_preferences()

        p.kbdb.add_scan_preferences.assert_not_called()
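
The `'|||31'`, `'|||16'` and `'|||2'` values asserted in the Boreas tests above suggest that each alive test method contributes one bit to an integer preference. The following standalone sketch illustrates that idea only. It is not the `ospd_openvas` implementation: the function name `methods_to_bit_field_sketch` and the constants are hypothetical, and the bit values for TCP-ACK, ARP and consider-alive (1, 4, 8) are assumptions. This listing confirms only ICMP = 2, TCP-SYN = 16 and the total of 31.

```python
# Hypothetical bit values. Only ICMP (2), TCP_SYN (16) and the total (31)
# are backed by the assertions in the tests; the rest are assumptions.
SCAN_CONFIG_DEFAULT = 0
TCP_ACK_SERVICE = 1
ICMP = 2
ARP = 4
CONSIDER_ALIVE = 8
TCP_SYN_SERVICE = 16


def methods_to_bit_field_sketch(
    icmp: bool, tcp_ack: bool, tcp_syn: bool, arp: bool, consider_alive: bool
) -> int:
    """OR together one bit per enabled alive test method."""
    bit_field = SCAN_CONFIG_DEFAULT
    if icmp:
        bit_field |= ICMP
    if tcp_ack:
        bit_field |= TCP_ACK_SERVICE
    if tcp_syn:
        bit_field |= TCP_SYN_SERVICE
    if arp:
        bit_field |= ARP
    if consider_alive:
        bit_field |= CONSIDER_ALIVE
    return bit_field


def format_preference(name: str, bit_field: int) -> str:
    """Build a 'name|||value' string in the style the tests assert on."""
    return "{0}|||{1}".format(name, bit_field)
```

Under these assumptions, enabling every method yields 31 and enabling only TCP-SYN yields 16, which matches the values the tests expect to be passed to `add_scan_preferences`.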