Passed
Push — master ( 62317c...eaef71 )
by Juan José
01:52 queued 13s
created

tests.test_preferencehandler   D

Complexity

Total Complexity 57

Size/Duplication

Total Lines 953
Duplicated Lines 3.46 %

Importance

Changes 0
Metric Value
wmc 57
eloc 685
dl 33
loc 953
rs 4.955
c 0
b 0
f 0

32 Methods

Rating   Name   Duplication   Size   Complexity  
A PreferenceHandlerTestCase.test_prepare_alive_test_no_enum_no_alive_test() 0 17 2
A PreferenceHandlerTestCase.test_process_vts_not_found() 0 15 1
A PreferenceHandlerTestCase.test_prepare_nvt_prefs_no_prefs() 0 10 1
A PreferenceHandlerTestCase.test_set_ports() 0 14 1
A PreferenceHandlerTestCase.test_process_vts_bad_param_id() 0 13 1
B PreferenceHandlerTestCase.test_build_credentials() 0 51 1
A PreferenceHandlerTestCase.test_set_alive_no_setting() 16 16 2
A PreferenceHandlerTestCase.test_set_alive_pinghost() 0 28 3
A PreferenceHandlerTestCase.test_prepare_nvt_prefs() 0 18 1
A PreferenceHandlerTestCase.test_set_host_options_none() 0 14 1
A PreferenceHandlerTestCase.test_build_alive_test_opt() 0 24 1
A PreferenceHandlerTestCase.test_build_credentials_ssh_up() 0 21 1
C PreferenceHandlerTestCase.test_set_boreas_alive_test_with_settings() 0 90 7
A PreferenceHandlerTestCase.test_set_reverse_lookup_opt() 0 17 1
A PreferenceHandlerTestCase.test_process_vts() 0 16 1
A PreferenceHandlerTestCase.test_set_target() 0 14 1
B PreferenceHandlerTestCase.test_alive_test_methods_to_bit_field() 0 73 1
A PreferenceHandlerTestCase.test_set_scan_params() 0 25 1
A PreferenceHandlerTestCase.test_build_alive_test_opt_empty() 0 16 1
A PreferenceHandlerTestCase.test_set_boreas_alive_test_enum_has_precedence() 0 19 2
A PreferenceHandlerTestCase.test_set_alive_no_invalid_alive_test_no_enum() 0 17 2
A PreferenceHandlerTestCase.test_set_alive_no_invalid_alive_test() 17 17 2
A PreferenceHandlerTestCase.test_set_credentials() 0 37 1
A PreferenceHandlerTestCase.test_set_credentials_empty() 0 14 1
D PreferenceHandlerTestCase.test_set_boreas_alive_test_not_as_enum() 0 140 10
A PreferenceHandlerTestCase.test_set_main_kbindex() 0 12 1
A PreferenceHandlerTestCase.test_set_plugins_true() 0 17 1
A PreferenceHandlerTestCase.test_build_alive_test_opt_fail_1() 0 10 1
A PreferenceHandlerTestCase.test_prepare_alive_test_not_supplied_as_enum() 0 26 2
A PreferenceHandlerTestCase.test_set_boreas_alive_test_without_settings() 0 13 2
A PreferenceHandlerTestCase.test_set_plugins_false() 0 12 1
A PreferenceHandlerTestCase.test_set_host_options() 0 16 1


Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
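
One way to apply this rule to test code, sketched under assumptions: when several scenarios differ only in their input and expected result, they can be folded into a single table-driven test using `subTest` from `unittest`. `FakeHandler` and the `ALIVE_TEST` preference name below are hypothetical stand-ins for illustration, not the real `PreferenceHandler` API.

```python
import unittest
from unittest.mock import MagicMock, call


class FakeHandler:
    """Hypothetical stand-in for the class under test."""

    def __init__(self, scan_id, kbdb, options):
        self.scan_id = scan_id
        self.kbdb = kbdb
        self.options = options

    def prepare_alive_test(self):
        # Store the preference only when an alive test was requested.
        value = self.options.get('alive_test')
        if value is not None:
            self.kbdb.add_scan_preferences(
                self.scan_id, ['ALIVE_TEST|||' + str(value)]
            )


class AliveTestCase(unittest.TestCase):
    def test_alive_test_values(self):
        # One (options, expected calls) row per scenario replaces one
        # copy-pasted setup/assert block per scenario.
        scenarios = [
            ({}, []),
            ({'alive_test': 2}, [call('456-789', ['ALIVE_TEST|||2'])]),
            ({'alive_test': 16}, [call('456-789', ['ALIVE_TEST|||16'])]),
        ]
        for options, expected in scenarios:
            with self.subTest(options=options):
                kbdb = MagicMock()
                handler = FakeHandler('456-789', kbdb, options)
                handler.prepare_alive_test()
                self.assertEqual(
                    kbdb.add_scan_preferences.call_args_list, expected
                )
```

Each scenario is reported separately on failure, so the table keeps the diagnostic value of individual blocks while the setup is written once.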

Common duplication problems, and corresponding solutions are:

Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like tests.test_preferencehandler often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
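
As a rough illustration of Extract Class applied to a test module, the sketch below moves the mock setup that every test would otherwise repeat into a small fixture class, and splits the tests into cohesive test cases by method prefix. `FakeHandler`, `HandlerFixture`, and the method names `prepare_target` and `prepare_ports` are hypothetical names chosen for this example; they are not taken from ospd-openvas.

```python
import unittest
from unittest.mock import MagicMock


class FakeHandler:
    """Hypothetical stand-in for the class under test."""

    def __init__(self, scan_id, kbdb, scan_collection):
        self.scan_id = scan_id
        self.kbdb = kbdb
        self.scan_collection = scan_collection

    def prepare_target(self):
        target = self.scan_collection.get_host_list(self.scan_id)
        self.kbdb.add_scan_preferences(self.scan_id, ['TARGET|||' + target])

    def prepare_ports(self):
        ports = self.scan_collection.get_ports(self.scan_id)
        self.kbdb.add_scan_preferences(
            self.scan_id, ['port_range|||' + ports]
        )


class HandlerFixture:
    """Extracted class: owns the mocks each test used to build inline."""

    def __init__(self, scan_id='456-789'):
        self.scan_id = scan_id
        self.scan_collection = MagicMock()
        self.kbdb = MagicMock()
        self.handler = FakeHandler(scan_id, self.kbdb, self.scan_collection)


class TargetTestCase(unittest.TestCase):
    def setUp(self):
        self.fx = HandlerFixture()

    def test_set_target(self):
        self.fx.scan_collection.get_host_list.return_value = '192.168.0.1'
        self.fx.handler.prepare_target()
        self.fx.kbdb.add_scan_preferences.assert_called_with(
            self.fx.scan_id, ['TARGET|||192.168.0.1']
        )


class PortsTestCase(unittest.TestCase):
    def setUp(self):
        self.fx = HandlerFixture()

    def test_set_ports(self):
        self.fx.scan_collection.get_ports.return_value = '80,443'
        self.fx.handler.prepare_ports()
        self.fx.kbdb.add_scan_preferences.assert_called_with(
            self.fx.scan_id, ['port_range|||80,443']
        )
```

With the fixture extracted, each test case contains only the lines that distinguish it, and a change to the setup is made in one place.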

# -*- coding: utf-8 -*-
# Copyright (C) 2014-2021 Greenbone Networks GmbH
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.


import logging

from unittest import TestCase
from unittest.mock import call, patch, Mock, MagicMock
from collections import OrderedDict

from ospd.vts import Vts

from tests.dummydaemon import DummyDaemon
from tests.helper import assert_called_once

import ospd_openvas.db

from ospd_openvas.openvas import Openvas
from ospd_openvas.preferencehandler import (
    AliveTest,
    BOREAS_SETTING_NAME,
    BOREAS_ALIVE_TEST,
    BOREAS_ALIVE_TEST_PORTS,
    PreferenceHandler,
    alive_test_methods_to_bit_field,
)


class PreferenceHandlerTestCase(TestCase):
    @patch('ospd_openvas.db.KbDB')
    def test_process_vts_not_found(self, mock_kb):
        w = DummyDaemon()
        logging.Logger.warning = Mock()

        vts = {
            '1.3.6.1.4.1.25623.1.0.100065': {'3': 'new value'},
            'vt_groups': ['family=debian', 'family=general'],
        }

        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, w.nvti)
        w.nvti.get_nvt_metadata.return_value = None
        p._process_vts(vts)

        assert_called_once(logging.Logger.warning)

    def test_process_vts_bad_param_id(self):
        w = DummyDaemon()

        vts = {
            '1.3.6.1.4.1.25623.1.0.100061': {'3': 'new value'},
            'vt_groups': ['family=debian', 'family=general'],
        }

        p = PreferenceHandler('1234-1234', None, w.scan_collection, w.nvti)

        ret = p._process_vts(vts)

        self.assertFalse(ret[1])

    def test_process_vts(self):
        w = DummyDaemon()

        vts = {
            '1.3.6.1.4.1.25623.1.0.100061': {'1': 'new value'},
            'vt_groups': ['family=debian', 'family=general'],
        }
        vt_out = (
            ['1.3.6.1.4.1.25623.1.0.100061'],
            {'1.3.6.1.4.1.25623.1.0.100061:1:entry:Data length :': 'new value'},
        )

        p = PreferenceHandler('1234-1234', None, w.scan_collection, w.nvti)
        ret = p._process_vts(vts)

        self.assertEqual(ret, vt_out)

    @patch('ospd_openvas.db.KbDB')
    def test_set_plugins_false(self, mock_kb):
        w = DummyDaemon()

        w.scan_collection.get_vts = Mock()
        w.scan_collection.get_vts.return_value = {}

        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, w.nvti)
        p.kbdb.add_scan_preferences = Mock()
        r = p.prepare_plugins_for_openvas()

        self.assertFalse(r)

    @patch('ospd_openvas.db.KbDB')
    def test_set_plugins_true(self, mock_kb):
        w = DummyDaemon()

        vts = {
            '1.3.6.1.4.1.25623.1.0.100061': {'3': 'new value'},
            'vt_groups': ['family=debian', 'family=general'],
        }

        w.scan_collection.get_vts = Mock()
        w.scan_collection.get_vts.return_value = vts

        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, w.nvti)
        p.kbdb.add_scan_preferences = Mock()
        r = p.prepare_plugins_for_openvas()

        self.assertTrue(r)

    def test_build_credentials_ssh_up(self):
        w = DummyDaemon()

        cred_out = [
            'auth_port_ssh|||22',
            '1.3.6.1.4.1.25623.1.0.103591:1:entry:SSH login name:|||username',
            '1.3.6.1.4.1.25623.1.0.103591:3:password:SSH password (unsafe!):|||pass',
        ]
        cred_dict = {
            'ssh': {
                'type': 'up',
                'port': '22',
                'username': 'username',
                'password': 'pass',
            }
        }
        p = PreferenceHandler('1234-1234', None, w.scan_collection, None)

        ret = p.build_credentials_as_prefs(cred_dict)

        self.assertEqual(ret, cred_out)

    def test_build_credentials(self):
        w = DummyDaemon()

        cred_out = [
            '1.3.6.1.4.1.25623.1.0.105058:1:entry:ESXi login name:|||username',
            '1.3.6.1.4.1.25623.1.0.105058:2:password:ESXi login password:|||pass',
            'auth_port_ssh|||22',
            '1.3.6.1.4.1.25623.1.0.103591:1:entry:SSH login name:|||username',
            '1.3.6.1.4.1.25623.1.0.103591:2:password:SSH key passphrase:|||pass',
            '1.3.6.1.4.1.25623.1.0.103591:4:file:SSH private key:|||',
            '1.3.6.1.4.1.25623.1.0.90023:1:entry:SMB login:|||username',
            '1.3.6.1.4.1.25623.1.0.90023:2:password]:SMB password :|||pass',
            '1.3.6.1.4.1.25623.1.0.105076:1:password:SNMP Community:some comunity',
            '1.3.6.1.4.1.25623.1.0.105076:2:entry:SNMPv3 Username:username',
            '1.3.6.1.4.1.25623.1.0.105076:3:password:SNMPv3 Password:pass',
            '1.3.6.1.4.1.25623.1.0.105076:4:radio:SNMPv3 Authentication Algorithm:some auth algo',
            '1.3.6.1.4.1.25623.1.0.105076:5:password:SNMPv3 Privacy Password:privacy pass',
            '1.3.6.1.4.1.25623.1.0.105076:6:radio:SNMPv3 Privacy Algorithm:privacy algo',
        ]
        cred_dict = {
            'ssh': {
                'type': 'ssh',
                'port': '22',
                'username': 'username',
                'password': 'pass',
            },
            'smb': {'type': 'smb', 'username': 'username', 'password': 'pass'},
            'esxi': {
                'type': 'esxi',
                'username': 'username',
                'password': 'pass',
            },
            'snmp': {
                'type': 'snmp',
                'username': 'username',
                'password': 'pass',
                'community': 'some comunity',
                'auth_algorithm': 'some auth algo',
                'privacy_password': 'privacy pass',
                'privacy_algorithm': 'privacy algo',
            },
        }

        p = PreferenceHandler('1234-1234', None, w.scan_collection, None)
        ret = p.build_credentials_as_prefs(cred_dict)

        self.assertEqual(len(ret), len(cred_out))
        self.assertIn('auth_port_ssh|||22', cred_out)
        self.assertIn(
            '1.3.6.1.4.1.25623.1.0.90023:1:entry:SMB login:|||username',
            cred_out,
        )

    def test_build_alive_test_opt_empty(self):
        w = DummyDaemon()

        target_options_dict = {'alive_test': '0'}

        p = PreferenceHandler('1234-1234', None, w.scan_collection, None)
        ret = p.build_alive_test_opt_as_prefs(target_options_dict)

        self.assertEqual(ret, {})

        # alive test was supplied via separate xml element
        w = DummyDaemon()
        target_options_dict = {'alive_test_methods': '1', 'icmp': '0'}
        p = PreferenceHandler('1234-1234', None, w.scan_collection, None)
        ret = p.build_alive_test_opt_as_prefs(target_options_dict)
        self.assertEqual(ret, {})

    def test_build_alive_test_opt(self):
        w = DummyDaemon()

        alive_test_out = {
            "1.3.6.1.4.1.25623.1.0.100315:1:checkbox:Do a TCP ping": "no",
            "1.3.6.1.4.1.25623.1.0.100315:2:checkbox:TCP ping tries also TCP-SYN ping": "no",
            "1.3.6.1.4.1.25623.1.0.100315:7:checkbox:TCP ping tries only TCP-SYN ping": "no",
            "1.3.6.1.4.1.25623.1.0.100315:3:checkbox:Do an ICMP ping": "yes",
            "1.3.6.1.4.1.25623.1.0.100315:4:checkbox:Use ARP": "no",
            "1.3.6.1.4.1.25623.1.0.100315:5:checkbox:Mark unrechable Hosts as dead (not scanning)": "yes",
        }

        target_options_dict = {'alive_test': '2'}
        p = PreferenceHandler('1234-1234', None, w.scan_collection, None)
        ret = p.build_alive_test_opt_as_prefs(target_options_dict)

        self.assertEqual(ret, alive_test_out)

        # alive test was supplied via separate xml element
        w = DummyDaemon()
        target_options_dict = {'alive_test_methods': '1', 'icmp': '1'}
        p = PreferenceHandler('1234-1234', None, w.scan_collection, None)
        ret = p.build_alive_test_opt_as_prefs(target_options_dict)
        self.assertEqual(ret, alive_test_out)

    def test_build_alive_test_opt_fail_1(self):
        w = DummyDaemon()
        logging.Logger.debug = Mock()

        target_options_dict = {'alive_test': 'a'}
        p = PreferenceHandler('1234-1234', None, w.scan_collection, None)
        target_options = p.build_alive_test_opt_as_prefs(target_options_dict)

        assert_called_once(logging.Logger.debug)
        self.assertEqual(len(target_options), 0)

    @patch('ospd_openvas.db.KbDB')
    def test_set_target(self, mock_kb):
        w = DummyDaemon()

        w.scan_collection.get_host_list = MagicMock(return_value='192.168.0.1')

        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
        p.scan_id = '456-789'
        p.kbdb.add_scan_preferences = MagicMock()
        p.prepare_target_for_openvas()

        p.kbdb.add_scan_preferences.assert_called_with(
            p.scan_id,
            ['TARGET|||192.168.0.1'],
        )

    @patch('ospd_openvas.db.KbDB')
    def test_set_ports(self, mock_kb):
        w = DummyDaemon()

        w.scan_collection.get_ports = MagicMock(return_value='80,443')

        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
        p.scan_id = '456-789'
        p.kbdb.add_scan_preferences = MagicMock()
        p.prepare_ports_for_openvas()

        p.kbdb.add_scan_preferences.assert_called_with(
            p.scan_id,
            ['port_range|||80,443'],
        )

    @patch('ospd_openvas.db.KbDB')
    def test_set_main_kbindex(self, mock_kb):
        w = DummyDaemon()

        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
        p.kbdb.add_scan_preferences = Mock()
        p.kbdb.index = 2
        p.prepare_main_kbindex_for_openvas()

        p.kbdb.add_scan_preferences.assert_called_with(
            p.scan_id,
            ['ov_maindbid|||2'],
        )

    @patch('ospd_openvas.db.KbDB')
    def test_set_credentials(self, mock_kb):
        w = DummyDaemon()

        creds = {
            'ssh': {
                'type': 'ssh',
                'port': '22',
                'username': 'username',
                'password': 'pass',
            },
            'smb': {'type': 'smb', 'username': 'username', 'password': 'pass'},
            'esxi': {
                'type': 'esxi',
                'username': 'username',
                'password': 'pass',
            },
            'snmp': {
                'type': 'snmp',
                'username': 'username',
                'password': 'pass',
                'community': 'some comunity',
                'auth_algorithm': 'some auth algo',
                'privacy_password': 'privacy pass',
                'privacy_algorithm': 'privacy algo',
            },
        }

        w.scan_collection.get_credentials = MagicMock(return_value=creds)

        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
        p.scan_id = '456-789'
        p.kbdb.add_scan_preferences = MagicMock()
        r = p.prepare_credentials_for_openvas()

        self.assertTrue(r)
        assert_called_once(p.kbdb.add_scan_preferences)

    @patch('ospd_openvas.db.KbDB')
    def test_set_credentials_bad_type(self, mock_kb):
        w = DummyDaemon()

        # Bad credential type: 'shh' instead of 'ssh'.
        creds = {
            'shh': {
                'type': 'ssh',
                'port': '22',
                'username': 'username',
                'password': 'pass',
            },
        }

        w.scan_collection.get_credentials = MagicMock(return_value=creds)

        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
        p.scan_id = '456-789'
        p.kbdb.add_scan_preferences = MagicMock()
        r = p.prepare_credentials_for_openvas()

        self.assertFalse(r)

    @patch('ospd_openvas.db.KbDB')
    def test_set_credentials_empty(self, mock_kb):
        w = DummyDaemon()

        creds = {}

        w.scan_collection.get_credentials = MagicMock(return_value=creds)

        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
        p.scan_id = '456-789'
        p.kbdb.add_scan_preferences = MagicMock()
        r = p.prepare_credentials_for_openvas()

        self.assertTrue(r)

    @patch('ospd_openvas.db.KbDB')
    def test_set_host_options(self, mock_kb):
        w = DummyDaemon()

        exc = '192.168.0.1'

        w.scan_collection.get_exclude_hosts = MagicMock(return_value=exc)

        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
        p.scan_id = '456-789'
        p.kbdb.add_scan_preferences = MagicMock()
        p.prepare_host_options_for_openvas()

        p.kbdb.add_scan_preferences.assert_called_with(
            p.scan_id,
            ['exclude_hosts|||192.168.0.1'],
        )

    @patch('ospd_openvas.db.KbDB')
    def test_set_host_options_none(self, mock_kb):
        w = DummyDaemon()

        exc = ''

        w.scan_collection.get_exclude_hosts = MagicMock(return_value=exc)

        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
        p.scan_id = '456-789'
        p.kbdb.add_scan_preferences = MagicMock()
        p.prepare_host_options_for_openvas()

        p.kbdb.add_scan_preferences.assert_not_called()

    @patch('ospd_openvas.db.KbDB')
    def test_set_scan_params(self, mock_kb):
        w = DummyDaemon()

        OSPD_PARAMS_MOCK = {
            'drop_privileges': {
                'type': 'boolean',
                'name': 'drop_privileges',
                'default': 0,
                'mandatory': 1,
                'description': '',
            },
        }

        opt = {'drop_privileges': 1}

        w.scan_collection.get_options = MagicMock(return_value=opt)

        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
        p.scan_id = '456-789'
        p.kbdb.add_scan_preferences = MagicMock()
        p.prepare_scan_params_for_openvas(OSPD_PARAMS_MOCK)

        p.kbdb.add_scan_preferences.assert_called_with(
            p.scan_id, ['drop_privileges|||yes']
        )

    @patch('ospd_openvas.db.KbDB')
    def test_set_reverse_lookup_opt(self, mock_kb):
        w = DummyDaemon()

        t_opt = {'reverse_lookup_only': 1}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)

        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
        p.scan_id = '456-789'
        p.kbdb.add_scan_preferences = MagicMock()
        p.prepare_reverse_lookup_opt_for_openvas()

        p.kbdb.add_scan_preferences.assert_called_with(
            p.scan_id,
            [
                'reverse_lookup_only|||yes',
                'reverse_lookup_unify|||no',
            ],
        )

    @patch('ospd_openvas.db.KbDB')
    def test_set_boreas_alive_test_with_settings(self, mock_kb):
        # No Boreas config setting (BOREAS_SETTING_NAME) set
        w = DummyDaemon()
        ov_setting = {'not_the_correct_setting': 1}
        t_opt = {}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            p.kbdb.add_scan_preferences.assert_not_called()

        # Boreas config setting set but invalid alive_test.
        w = DummyDaemon()
        t_opt = {'alive_test': "error"}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {BOREAS_SETTING_NAME: 1}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||2'])]
            p.kbdb.add_scan_preferences.assert_has_calls(calls)

        # ALIVE_TEST_TCP_SYN_SERVICE as alive test.
        w = DummyDaemon()
        t_opt = {'alive_test': AliveTest.ALIVE_TEST_TCP_SYN_SERVICE}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {BOREAS_SETTING_NAME: 1}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||16'])]
            p.kbdb.add_scan_preferences.assert_has_calls(calls)

        # ICMP was chosen as alive test.
        w = DummyDaemon()
        t_opt = {'alive_test': AliveTest.ALIVE_TEST_ICMP}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {BOREAS_SETTING_NAME: 1}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||2'])]
            p.kbdb.add_scan_preferences.assert_has_calls(calls)

        # "Scan Config Default" as alive_test.
        w = DummyDaemon()
        t_opt = {'alive_test': AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {BOREAS_SETTING_NAME: 1}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||2'])]
            p.kbdb.add_scan_preferences.assert_has_calls(calls)

        # TCP-SYN alive test and dedicated port list for alive scan provided.
        w = DummyDaemon()
        t_opt = {
            'alive_test_ports': "80,137",
            'alive_test': AliveTest.ALIVE_TEST_TCP_SYN_SERVICE,
        }
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {BOREAS_SETTING_NAME: 1}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            calls = [
                call(p.scan_id, [BOREAS_ALIVE_TEST + '|||16']),
                call(p.scan_id, [BOREAS_ALIVE_TEST_PORTS + '|||80,137']),
            ]
            p.kbdb.add_scan_preferences.assert_has_calls(calls)

    @patch('ospd_openvas.db.KbDB')
    def test_set_boreas_alive_test_not_as_enum(self, mock_kb):
        # No Boreas config setting (BOREAS_SETTING_NAME) set
        w = DummyDaemon()
        ov_setting = {'not_the_correct_setting': 1}
        t_opt = {}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            p.kbdb.add_scan_preferences.assert_not_called()

        # Boreas config setting set but invalid alive_test.
        w = DummyDaemon()
        t_opt = {'alive_test_methods': "1", 'arp': '-1'}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {BOREAS_SETTING_NAME: 1}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||2'])]
            p.kbdb.add_scan_preferences.assert_has_calls(calls)

        # ICMP was chosen as alive test.
        w = DummyDaemon()
        t_opt = {'alive_test_methods': "1", 'icmp': '1'}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {BOREAS_SETTING_NAME: 1}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||2'])]
            p.kbdb.add_scan_preferences.assert_has_calls(calls)

        # tcp_syn as alive test.
        w = DummyDaemon()
        t_opt = {'alive_test_methods': "1", 'tcp_syn': '1'}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {BOREAS_SETTING_NAME: 1}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||16'])]
            p.kbdb.add_scan_preferences.assert_has_calls(calls)

        # tcp_ack as alive test.
        w = DummyDaemon()
        t_opt = {'alive_test_methods': "1", 'tcp_ack': '1'}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {BOREAS_SETTING_NAME: 1}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||1'])]
            p.kbdb.add_scan_preferences.assert_has_calls(calls)

        # arp as alive test.
        w = DummyDaemon()
        t_opt = {'alive_test_methods': "1", 'arp': '1'}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {BOREAS_SETTING_NAME: 1}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||4'])]
            p.kbdb.add_scan_preferences.assert_has_calls(calls)

        # consider_alive as alive test.
        w = DummyDaemon()
        t_opt = {'alive_test_methods': "1", 'consider_alive': '1'}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {BOREAS_SETTING_NAME: 1}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||8'])]
            p.kbdb.add_scan_preferences.assert_has_calls(calls)

        # all alive test methods
        w = DummyDaemon()
        t_opt = {
            'alive_test_methods': "1",
            'icmp': '1',
            'tcp_ack': '1',
            'tcp_syn': '1',
            'arp': '1',
            'consider_alive': '1',
        }
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {BOREAS_SETTING_NAME: 1}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||31'])]
            p.kbdb.add_scan_preferences.assert_has_calls(calls)

        # TCP-SYN alive test and dedicated port list for alive scan provided.
        w = DummyDaemon()
        t_opt = {
            'alive_test_ports': "80,137",
            'alive_test_methods': "1",
            'tcp_syn': '1',
        }
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {BOREAS_SETTING_NAME: 1}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            calls = [
                call(p.scan_id, [BOREAS_ALIVE_TEST + '|||16']),
                call(p.scan_id, [BOREAS_ALIVE_TEST_PORTS + '|||80,137']),
            ]
            p.kbdb.add_scan_preferences.assert_has_calls(calls)

685
    @patch('ospd_openvas.db.KbDB')
    def test_set_boreas_alive_test_enum_has_precedence(self, mock_kb):
        w = DummyDaemon()
        t_opt = {
            'alive_test_methods': "1",
            'consider_alive': '1',
            'alive_test': AliveTest.ALIVE_TEST_ICMP,
        }
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {BOREAS_SETTING_NAME: 1}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            # has icmp and not consider_alive
            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||2'])]
            p.kbdb.add_scan_preferences.assert_has_calls(calls)

    @patch('ospd_openvas.db.KbDB')
    def test_set_boreas_alive_test_without_settings(self, mock_kb):
        w = DummyDaemon()
        t_opt = {'alive_test': 16}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            p.kbdb.add_scan_preferences.assert_not_called()

    @patch('ospd_openvas.db.KbDB')
    def test_set_alive_no_setting(self, mock_kb):
        w = DummyDaemon()

        t_opt = {}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)

        ov_setting = {}

        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_alive_test_option_for_openvas()

            p.kbdb.add_scan_preferences.assert_not_called()

    @patch('ospd_openvas.db.KbDB')
    def test_set_alive_no_invalid_alive_test(self, mock_kb):
        w = DummyDaemon()

        t_opt = {'alive_test': -1}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)

        ov_setting = {'some_setting': 1}

        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p._nvts_params = {}
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_alive_test_option_for_openvas()

            p.kbdb.add_scan_preferences.assert_not_called()

    @patch('ospd_openvas.db.KbDB')
    def test_set_alive_no_invalid_alive_test_no_enum(self, mock_kb):
        w = DummyDaemon()

        t_opt = {'alive_test_methods': '1', 'icmp': '-1'}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)

        ov_setting = {'some_setting': 1}

        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p._nvts_params = {}
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_alive_test_option_for_openvas()

            p.kbdb.add_scan_preferences.assert_not_called()

    @patch('ospd_openvas.db.KbDB')
    def test_set_alive_pinghost(self, mock_kb):
        w = DummyDaemon()

        alive_test_out = [
            "1.3.6.1.4.1.25623.1.0.100315:1:checkbox:Do a TCP ping|||no",
            "1.3.6.1.4.1.25623.1.0.100315:2:checkbox:TCP ping tries also TCP-SYN ping|||no",
            "1.3.6.1.4.1.25623.1.0.100315:7:checkbox:TCP ping tries only TCP-SYN ping|||no",
            "1.3.6.1.4.1.25623.1.0.100315:3:checkbox:Do an ICMP ping|||yes",
            "1.3.6.1.4.1.25623.1.0.100315:4:checkbox:Use ARP|||no",
            "1.3.6.1.4.1.25623.1.0.100315:5:checkbox:Mark unrechable Hosts as dead (not scanning)|||yes",
        ]

        t_opt = {'alive_test': 2}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)

        ov_setting = {'some_setting': 1}

        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p._nvts_params = {}
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_alive_test_option_for_openvas()

            for key, value in p._nvts_params.items():
                self.assertTrue(
                    "{0}|||{1}".format(key, value) in alive_test_out
                )

    @patch('ospd_openvas.db.KbDB')
    def test_prepare_alive_test_not_supplied_as_enum(self, mock_kb):
        w = DummyDaemon()

        alive_test_out = {
            "1.3.6.1.4.1.25623.1.0.100315:1:checkbox:Do a TCP ping": "no",
            "1.3.6.1.4.1.25623.1.0.100315:2:checkbox:TCP ping tries also TCP-SYN ping": "no",
            "1.3.6.1.4.1.25623.1.0.100315:7:checkbox:TCP ping tries only TCP-SYN ping": "no",
            "1.3.6.1.4.1.25623.1.0.100315:3:checkbox:Do an ICMP ping": "yes",
            "1.3.6.1.4.1.25623.1.0.100315:4:checkbox:Use ARP": "no",
            "1.3.6.1.4.1.25623.1.0.100315:5:checkbox:Mark unrechable Hosts as dead (not scanning)": "yes",
        }

        t_opt = {'alive_test_methods': '1', 'icmp': '1'}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)

        ov_setting = {'some_setting': 1}

        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p._nvts_params = {}
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_alive_test_option_for_openvas()

            self.assertEqual(p._nvts_params, alive_test_out)

    @patch('ospd_openvas.db.KbDB')
    def test_prepare_alive_test_no_enum_no_alive_test(self, mock_kb):
        w = DummyDaemon()

        t_opt = {'alive_test_methods': '1', 'icmp': '0'}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)

        ov_setting = {'some_setting': 1}

        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p._nvts_params = {}
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_alive_test_option_for_openvas()

            p.kbdb.add_scan_preferences.assert_not_called()

    def test_alive_test_methods_to_bit_field(self):

        self.assertEqual(
            AliveTest.ALIVE_TEST_TCP_ACK_SERVICE,
            alive_test_methods_to_bit_field(
                icmp=False,
                tcp_ack=True,
                tcp_syn=False,
                arp=False,
                consider_alive=False,
            ),
        )

        self.assertEqual(
            AliveTest.ALIVE_TEST_ICMP,
            alive_test_methods_to_bit_field(
                icmp=True,
                tcp_ack=False,
                tcp_syn=False,
                arp=False,
                consider_alive=False,
            ),
        )

        self.assertEqual(
            AliveTest.ALIVE_TEST_ARP,
            alive_test_methods_to_bit_field(
                icmp=False,
                tcp_ack=False,
                tcp_syn=False,
                arp=True,
                consider_alive=False,
            ),
        )

        self.assertEqual(
            AliveTest.ALIVE_TEST_CONSIDER_ALIVE,
            alive_test_methods_to_bit_field(
                icmp=False,
                tcp_ack=False,
                tcp_syn=False,
                arp=False,
                consider_alive=True,
            ),
        )

        self.assertEqual(
            AliveTest.ALIVE_TEST_TCP_SYN_SERVICE,
            alive_test_methods_to_bit_field(
                icmp=False,
                tcp_ack=False,
                tcp_syn=True,
                arp=False,
                consider_alive=False,
            ),
        )

        all_alive_test_methods = (
            AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT
            | AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
            | AliveTest.ALIVE_TEST_ICMP
            | AliveTest.ALIVE_TEST_ARP
            | AliveTest.ALIVE_TEST_CONSIDER_ALIVE
            | AliveTest.ALIVE_TEST_TCP_SYN_SERVICE
        )
        self.assertEqual(
            all_alive_test_methods,
            alive_test_methods_to_bit_field(
                icmp=True,
                tcp_ack=True,
                tcp_syn=True,
                arp=True,
                consider_alive=True,
            ),
        )

    @patch('ospd_openvas.db.KbDB')
    def test_prepare_nvt_prefs(self, mock_kb):
        w = DummyDaemon()

        alive_test_out = [
            "1.3.6.1.4.1.25623.1.0.100315:1:checkbox:Do a TCP ping|||no"
        ]

        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
        p._nvts_params = {
            "1.3.6.1.4.1.25623.1.0.100315:1:checkbox:Do a TCP ping": "no"
        }
        p.kbdb.add_scan_preferences = MagicMock()
        p.prepare_nvt_preferences()

        p.kbdb.add_scan_preferences.assert_called_with(
            p.scan_id,
            alive_test_out,
        )

    @patch('ospd_openvas.db.KbDB')
    def test_prepare_nvt_prefs_no_prefs(self, mock_kb):
        w = DummyDaemon()

        p = PreferenceHandler('456-789', mock_kb, w.scan_collection, None)
        p._nvts_params = {}
        p.kbdb.add_scan_preferences = MagicMock()
        p.prepare_nvt_preferences()

        p.kbdb.add_scan_preferences.assert_not_called()