Completed
Push — master ( a4dee7...a3f003 )
by Juan José
18s queued 14s
created

tests.test_preferencehandler   F

Complexity

Total Complexity 59

Size/Duplication

Total Lines 975
Duplicated Lines 3.38 %

Importance

Changes 0
Metric Value
eloc 701
dl 33
loc 975
rs 3.979
c 0
b 0
f 0
wmc 59

33 Methods

Rating   Name   Duplication   Size   Complexity  
A PreferenceHandlerTestCase.test_prepare_alive_test_no_enum_no_alive_test() 0 17 2
A PreferenceHandlerTestCase.test_process_vts_not_found() 0 18 2
A PreferenceHandlerTestCase.test_prepare_nvt_prefs_no_prefs() 0 10 1
A PreferenceHandlerTestCase.test_set_ports() 0 14 1
A PreferenceHandlerTestCase.test_process_vts_bad_param_id() 0 13 1
B PreferenceHandlerTestCase.test_build_credentials() 0 51 1
A PreferenceHandlerTestCase.test_set_alive_no_setting() 16 16 2
A PreferenceHandlerTestCase.test_set_alive_pinghost() 0 28 3
A PreferenceHandlerTestCase.test_prepare_nvt_prefs() 0 18 1
A PreferenceHandlerTestCase.test_set_host_options_none() 0 14 1
A PreferenceHandlerTestCase.test_build_alive_test_opt() 0 24 1
A PreferenceHandlerTestCase.test_build_credentials_ssh_up() 0 21 1
C PreferenceHandlerTestCase.test_set_boreas_alive_test_with_settings() 0 90 7
A PreferenceHandlerTestCase.test_set_reverse_lookup_opt() 0 17 1
A PreferenceHandlerTestCase.test_process_vts() 0 16 1
A PreferenceHandlerTestCase.test_set_target() 0 14 1
B PreferenceHandlerTestCase.test_alive_test_methods_to_bit_field() 0 73 1
A PreferenceHandlerTestCase.test_set_scan_params() 0 25 1
A PreferenceHandlerTestCase.test_build_alive_test_opt_empty() 0 16 1
A PreferenceHandlerTestCase.test_set_boreas_alive_test_enum_has_precedence() 0 19 2
A PreferenceHandlerTestCase.test_set_alive_no_invalid_alive_test_no_enum() 0 17 2
A PreferenceHandlerTestCase.test_set_alive_no_invalid_alive_test() 17 17 2
A PreferenceHandlerTestCase.test_set_credentials() 0 37 1
A PreferenceHandlerTestCase.test_set_credentials_empty() 0 14 1
D PreferenceHandlerTestCase.test_set_boreas_alive_test_not_as_enum() 0 140 10
A PreferenceHandlerTestCase.test_set_main_kbindex() 0 12 1
A PreferenceHandlerTestCase.test_set_plugins_true() 0 17 1
A PreferenceHandlerTestCase.test_build_alive_test_opt_fail_1() 0 10 1
A PreferenceHandlerTestCase.test_prepare_alive_test_not_supplied_as_enum() 0 26 2
A PreferenceHandlerTestCase.test_get_vts_in_groups() 0 18 1
A PreferenceHandlerTestCase.test_set_boreas_alive_test_without_settings() 0 13 2
A PreferenceHandlerTestCase.test_set_plugins_false() 0 12 1
A PreferenceHandlerTestCase.test_set_host_options() 0 16 1

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like tests.test_preferencehandler often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
import logging
21
22
from unittest import TestCase
23
from unittest.mock import call, patch, Mock, MagicMock
24
from collections import OrderedDict
25
26
from ospd.vts import Vts
27
28
from tests.dummydaemon import DummyDaemon
29
from tests.helper import assert_called_once
30
31
import ospd_openvas.db
32
33
from ospd_openvas.openvas import Openvas
34
from ospd_openvas.preferencehandler import (
35
    AliveTest,
36
    BOREAS_SETTING_NAME,
37
    BOREAS_ALIVE_TEST,
38
    BOREAS_ALIVE_TEST_PORTS,
39
    PreferenceHandler,
40
    alive_test_methods_to_bit_field,
41
)
42
43
44
class PreferenceHandlerTestCase(TestCase):
45
    @patch('ospd_openvas.db.KbDB')
46
    def test_process_vts_not_found(self, mock_kb):
47
        w = DummyDaemon()
48
        logging.Logger.warning = Mock()
49
50
        vts = {
51
            '1.3.6.1.4.1.25623.1.0.100065': {'3': 'new value'},
52
            'vt_groups': ['family=debian', 'family=general'],
53
        }
54
55
        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, w.nvti)
56
        w.nvti.get_nvt_metadata.return_value = None
57
58
        ov_setting = {'table_driven_lsc': 0}
59
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
60
            p._process_vts(vts)
61
62
        assert_called_once(logging.Logger.warning)
63
64
    def test_process_vts_bad_param_id(self):
65
        w = DummyDaemon()
66
67
        vts = {
68
            '1.3.6.1.4.1.25623.1.0.100061': {'3': 'new value'},
69
            'vt_groups': ['family=debian', 'family=general'],
70
        }
71
72
        p = PreferenceHandler('1234-1234', None, w.scan_collection, w.nvti)
73
74
        ret = p._process_vts(vts)
75
76
        self.assertFalse(ret[1])
77
78
    def test_process_vts(self):
79
        w = DummyDaemon()
80
81
        vts = {
82
            '1.3.6.1.4.1.25623.1.0.100061': {'1': 'new value'},
83
            'vt_groups': ['family=debian', 'family=general'],
84
        }
85
        vt_out = (
86
            ['1.3.6.1.4.1.25623.1.0.100061'],
87
            {'1.3.6.1.4.1.25623.1.0.100061:1:entry:Data length :': 'new value'},
88
        )
89
90
        p = PreferenceHandler('1234-1234', None, w.scan_collection, w.nvti)
91
        ret = p._process_vts(vts)
92
93
        self.assertEqual(ret, vt_out)
94
95
    @patch('ospd_openvas.preferencehandler.Openvas')
96
    @patch('ospd_openvas.preferencehandler.NotusMetadataHandler')
97
    def test_get_vts_in_groups(self, MockNotus, MockOpenvas):
98
        w = DummyDaemon()
99
        openvas = MockOpenvas()
100
        openvas.get_settings.return_value = {'table_driven_lsc': 1}
101
        notus = MockNotus()
102
        notus.get_family_driver_linkers.return_value = {
103
            'some_family': '1.3.6.1.4.1.25623.1.0.1234'
104
        }
105
106
        filters = ['family=some_family']
107
        vt_out = ['1.3.6.1.4.1.25623.1.0.1234']
108
109
        p = PreferenceHandler('1234-1234', None, w.scan_collection, w.nvti)
110
        ret = p._get_vts_in_groups(filters)
111
112
        self.assertEqual(ret, vt_out)
113
114
    @patch('ospd_openvas.db.KbDB')
115
    def test_set_plugins_false(self, mock_kb):
116
        w = DummyDaemon()
117
118
        w.scan_collection.get_vts = Mock()
119
        w.scan_collection.get_vts.return_value = {}
120
121
        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, w.nvti)
122
        p.kbdb.add_scan_preferences = Mock()
123
        r = p.prepare_plugins_for_openvas()
124
125
        self.assertFalse(r)
126
127
    @patch('ospd_openvas.db.KbDB')
128
    def test_set_plugins_true(self, mock_kb):
129
        w = DummyDaemon()
130
131
        vts = {
132
            '1.3.6.1.4.1.25623.1.0.100061': {'3': 'new value'},
133
            'vt_groups': ['family=debian', 'family=general'],
134
        }
135
136
        w.scan_collection.get_vts = Mock()
137
        w.scan_collection.get_vts.return_value = vts
138
139
        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, w.nvti)
140
        p.kbdb.add_scan_preferences = Mock()
141
        r = p.prepare_plugins_for_openvas()
142
143
        self.assertTrue(r)
144
145
    def test_build_credentials_ssh_up(self):
146
        w = DummyDaemon()
147
148
        cred_out = [
149
            'auth_port_ssh|||22',
150
            '1.3.6.1.4.1.25623.1.0.103591:1:entry:SSH login name:|||username',
151
            '1.3.6.1.4.1.25623.1.0.103591:3:password:SSH password (unsafe!):|||pass',
152
        ]
153
        cred_dict = {
154
            'ssh': {
155
                'type': 'up',
156
                'port': '22',
157
                'username': 'username',
158
                'password': 'pass',
159
            }
160
        }
161
        p = PreferenceHandler('1234-1234', None, w.scan_collection, None)
162
163
        ret = p.build_credentials_as_prefs(cred_dict)
164
165
        self.assertEqual(ret, cred_out)
166
167
    def test_build_credentials(self):
168
        w = DummyDaemon()
169
170
        cred_out = [
171
            '1.3.6.1.4.1.25623.1.0.105058:1:entry:ESXi login name:|||username',
172
            '1.3.6.1.4.1.25623.1.0.105058:2:password:ESXi login password:|||pass',
173
            'auth_port_ssh|||22',
174
            '1.3.6.1.4.1.25623.1.0.103591:1:entry:SSH login name:|||username',
175
            '1.3.6.1.4.1.25623.1.0.103591:2:password:SSH key passphrase:|||pass',
176
            '1.3.6.1.4.1.25623.1.0.103591:4:file:SSH private key:|||',
177
            '1.3.6.1.4.1.25623.1.0.90023:1:entry:SMB login:|||username',
178
            '1.3.6.1.4.1.25623.1.0.90023:2:password]:SMB password :|||pass',
179
            '1.3.6.1.4.1.25623.1.0.105076:1:password:SNMP Community:some comunity',
180
            '1.3.6.1.4.1.25623.1.0.105076:2:entry:SNMPv3 Username:username',
181
            '1.3.6.1.4.1.25623.1.0.105076:3:password:SNMPv3 Password:pass',
182
            '1.3.6.1.4.1.25623.1.0.105076:4:radio:SNMPv3 Authentication Algorithm:some auth algo',
183
            '1.3.6.1.4.1.25623.1.0.105076:5:password:SNMPv3 Privacy Password:privacy pass',
184
            '1.3.6.1.4.1.25623.1.0.105076:6:radio:SNMPv3 Privacy Algorithm:privacy algo',
185
        ]
186
        cred_dict = {
187
            'ssh': {
188
                'type': 'ssh',
189
                'port': '22',
190
                'username': 'username',
191
                'password': 'pass',
192
            },
193
            'smb': {'type': 'smb', 'username': 'username', 'password': 'pass'},
194
            'esxi': {
195
                'type': 'esxi',
196
                'username': 'username',
197
                'password': 'pass',
198
            },
199
            'snmp': {
200
                'type': 'snmp',
201
                'username': 'username',
202
                'password': 'pass',
203
                'community': 'some comunity',
204
                'auth_algorithm': 'some auth algo',
205
                'privacy_password': 'privacy pass',
206
                'privacy_algorithm': 'privacy algo',
207
            },
208
        }
209
210
        p = PreferenceHandler('1234-1234', None, w.scan_collection, None)
211
        ret = p.build_credentials_as_prefs(cred_dict)
212
213
        self.assertEqual(len(ret), len(cred_out))
214
        self.assertIn('auth_port_ssh|||22', cred_out)
215
        self.assertIn(
216
            '1.3.6.1.4.1.25623.1.0.90023:1:entry:SMB login:|||username',
217
            cred_out,
218
        )
219
220
    def test_build_alive_test_opt_empty(self):
221
        w = DummyDaemon()
222
223
        target_options_dict = {'alive_test': '0'}
224
225
        p = PreferenceHandler('1234-1234', None, w.scan_collection, None)
226
        ret = p.build_alive_test_opt_as_prefs(target_options_dict)
227
228
        self.assertEqual(ret, {})
229
230
        # alive test was supplied via seperate xml element
231
        w = DummyDaemon()
232
        target_options_dict = {'alive_test_methods': '1', 'icmp': '0'}
233
        p = PreferenceHandler('1234-1234', None, w.scan_collection, None)
234
        ret = p.build_alive_test_opt_as_prefs(target_options_dict)
235
        self.assertEqual(ret, {})
236
237
    def test_build_alive_test_opt(self):
238
        w = DummyDaemon()
239
240
        alive_test_out = {
241
            "1.3.6.1.4.1.25623.1.0.100315:1:checkbox:Do a TCP ping": "no",
242
            "1.3.6.1.4.1.25623.1.0.100315:2:checkbox:TCP ping tries also TCP-SYN ping": "no",
243
            "1.3.6.1.4.1.25623.1.0.100315:7:checkbox:TCP ping tries only TCP-SYN ping": "no",
244
            "1.3.6.1.4.1.25623.1.0.100315:3:checkbox:Do an ICMP ping": "yes",
245
            "1.3.6.1.4.1.25623.1.0.100315:4:checkbox:Use ARP": "no",
246
            "1.3.6.1.4.1.25623.1.0.100315:5:checkbox:Mark unrechable Hosts as dead (not scanning)": "yes",
247
        }
248
249
        target_options_dict = {'alive_test': '2'}
250
        p = PreferenceHandler('1234-1234', None, w.scan_collection, None)
251
        ret = p.build_alive_test_opt_as_prefs(target_options_dict)
252
253
        self.assertEqual(ret, alive_test_out)
254
255
        # alive test was supplied via sepertae xml element
256
        w = DummyDaemon()
257
        target_options_dict = {'alive_test_methods': '1', 'icmp': '1'}
258
        p = PreferenceHandler('1234-1234', None, w.scan_collection, None)
259
        ret = p.build_alive_test_opt_as_prefs(target_options_dict)
260
        self.assertEqual(ret, alive_test_out)
261
262
    def test_build_alive_test_opt_fail_1(self):
263
        w = DummyDaemon()
264
        logging.Logger.debug = Mock()
265
266
        target_options_dict = {'alive_test': 'a'}
267
        p = PreferenceHandler('1234-1234', None, w.scan_collection, None)
268
        target_options = p.build_alive_test_opt_as_prefs(target_options_dict)
269
270
        assert_called_once(logging.Logger.debug)
271
        self.assertEqual(len(target_options), 0)
272
273
    @patch('ospd_openvas.db.KbDB')
274
    def test_set_target(self, mock_kb):
275
        w = DummyDaemon()
276
277
        w.scan_collection.get_host_list = MagicMock(return_value='192.168.0.1')
278
279
        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
280
        p.scan_id = '456-789'
281
        p.kbdb.add_scan_preferences = MagicMock()
282
        p.prepare_target_for_openvas()
283
284
        p.kbdb.add_scan_preferences.assert_called_with(
285
            p.scan_id,
286
            ['TARGET|||192.168.0.1'],
287
        )
288
289
    @patch('ospd_openvas.db.KbDB')
290
    def test_set_ports(self, mock_kb):
291
        w = DummyDaemon()
292
293
        w.scan_collection.get_ports = MagicMock(return_value='80,443')
294
295
        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
296
        p.scan_id = '456-789'
297
        p.kbdb.add_scan_preferences = MagicMock()
298
        p.prepare_ports_for_openvas()
299
300
        p.kbdb.add_scan_preferences.assert_called_with(
301
            p.scan_id,
302
            ['port_range|||80,443'],
303
        )
304
305
    @patch('ospd_openvas.db.KbDB')
306
    def test_set_main_kbindex(self, mock_kb):
307
        w = DummyDaemon()
308
309
        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
310
        p.kbdb.add_scan_preferences = Mock()
311
        p.kbdb.index = 2
312
        p.prepare_main_kbindex_for_openvas()
313
314
        p.kbdb.add_scan_preferences.assert_called_with(
315
            p.scan_id,
316
            ['ov_maindbid|||2'],
317
        )
318
319
    @patch('ospd_openvas.db.KbDB')
320
    def test_set_credentials(self, mock_kb):
321
        w = DummyDaemon()
322
323
        creds = {
324
            'ssh': {
325
                'type': 'ssh',
326
                'port': '22',
327
                'username': 'username',
328
                'password': 'pass',
329
            },
330
            'smb': {'type': 'smb', 'username': 'username', 'password': 'pass'},
331
            'esxi': {
332
                'type': 'esxi',
333
                'username': 'username',
334
                'password': 'pass',
335
            },
336
            'snmp': {
337
                'type': 'snmp',
338
                'username': 'username',
339
                'password': 'pass',
340
                'community': 'some comunity',
341
                'auth_algorithm': 'some auth algo',
342
                'privacy_password': 'privacy pass',
343
                'privacy_algorithm': 'privacy algo',
344
            },
345
        }
346
347
        w.scan_collection.get_credentials = MagicMock(return_value=creds)
348
349
        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
350
        p.scan_id = '456-789'
351
        p.kbdb.add_scan_preferences = MagicMock()
352
        r = p.prepare_credentials_for_openvas()
353
354
        self.assertTrue(r)
355
        assert_called_once(p.kbdb.add_scan_preferences)
356
357
    @patch('ospd_openvas.db.KbDB')
358
    def test_set_credentials(self, mock_kb):
359
        w = DummyDaemon()
360
361
        # bad cred type shh instead of ssh
362
        creds = {
363
            'shh': {
364
                'type': 'ssh',
365
                'port': '22',
366
                'username': 'username',
367
                'password': 'pass',
368
            },
369
        }
370
371
        w.scan_collection.get_credentials = MagicMock(return_value=creds)
372
373
        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
374
        p.scan_id = '456-789'
375
        p.kbdb.add_scan_preferences = MagicMock()
376
        r = p.prepare_credentials_for_openvas()
377
378
        self.assertFalse(r)
379
380
    @patch('ospd_openvas.db.KbDB')
381
    def test_set_credentials_empty(self, mock_kb):
382
        w = DummyDaemon()
383
384
        creds = {}
385
386
        w.scan_collection.get_credentials = MagicMock(return_value=creds)
387
388
        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
389
        p.scan_id = '456-789'
390
        p.kbdb.add_scan_preferences = MagicMock()
391
        r = p.prepare_credentials_for_openvas()
392
393
        self.assertTrue(r)
394
395
    @patch('ospd_openvas.db.KbDB')
396
    def test_set_host_options(self, mock_kb):
397
        w = DummyDaemon()
398
399
        exc = '192.168.0.1'
400
401
        w.scan_collection.get_exclude_hosts = MagicMock(return_value=exc)
402
403
        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
404
        p.scan_id = '456-789'
405
        p.kbdb.add_scan_preferences = MagicMock()
406
        p.prepare_host_options_for_openvas()
407
408
        p.kbdb.add_scan_preferences.assert_called_with(
409
            p.scan_id,
410
            ['exclude_hosts|||192.168.0.1'],
411
        )
412
413
    @patch('ospd_openvas.db.KbDB')
414
    def test_set_host_options_none(self, mock_kb):
415
        w = DummyDaemon()
416
417
        exc = ''
418
419
        w.scan_collection.get_exclude_hosts = MagicMock(return_value=exc)
420
421
        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
422
        p.scan_id = '456-789'
423
        p.kbdb.add_scan_preferences = MagicMock()
424
        p.prepare_host_options_for_openvas()
425
426
        p.kbdb.add_scan_preferences.assert_not_called()
427
428
    @patch('ospd_openvas.db.KbDB')
429
    def test_set_scan_params(self, mock_kb):
430
        w = DummyDaemon()
431
432
        OSPD_PARAMS_MOCK = {
433
            'drop_privileges': {
434
                'type': 'boolean',
435
                'name': 'drop_privileges',
436
                'default': 0,
437
                'mandatory': 1,
438
                'description': '',
439
            },
440
        }
441
442
        opt = {'drop_privileges': 1}
443
444
        w.scan_collection.get_options = MagicMock(return_value=opt)
445
446
        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
447
        p.scan_id = '456-789'
448
        p.kbdb.add_scan_preferences = MagicMock()
449
        p.prepare_scan_params_for_openvas(OSPD_PARAMS_MOCK)
450
451
        p.kbdb.add_scan_preferences.assert_called_with(
452
            p.scan_id, ['drop_privileges|||yes']
453
        )
454
455
    @patch('ospd_openvas.db.KbDB')
456
    def test_set_reverse_lookup_opt(self, mock_kb):
457
        w = DummyDaemon()
458
459
        t_opt = {'reverse_lookup_only': 1}
460
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
461
462
        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
463
        p.scan_id = '456-789'
464
        p.kbdb.add_scan_preferences = MagicMock()
465
        p.prepare_reverse_lookup_opt_for_openvas()
466
467
        p.kbdb.add_scan_preferences.assert_called_with(
468
            p.scan_id,
469
            [
470
                'reverse_lookup_only|||yes',
471
                'reverse_lookup_unify|||no',
472
            ],
473
        )
474
475
    @patch('ospd_openvas.db.KbDB')
476
    def test_set_boreas_alive_test_with_settings(self, mock_kb):
477
        # No Boreas config setting (BOREAS_SETTING_NAME) set
478
        w = DummyDaemon()
479
        ov_setting = {'not_the_correct_setting': 1}
480
        t_opt = {}
481
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
482
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
483
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
484
            p.scan_id = '456-789'
485
            p.kbdb.add_scan_preferences = MagicMock()
486
            p.prepare_boreas_alive_test()
487
488
            p.kbdb.add_scan_preferences.assert_not_called()
489
490
        # Boreas config setting set but invalid alive_test.
491
        w = DummyDaemon()
492
        t_opt = {'alive_test': "error"}
493
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
494
        ov_setting = {BOREAS_SETTING_NAME: 1}
495
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
496
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
497
            p.scan_id = '456-789'
498
            p.kbdb.add_scan_preferences = MagicMock()
499
            p.prepare_boreas_alive_test()
500
501
            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||2'])]
502
            p.kbdb.add_scan_preferences.assert_has_calls(calls)
503
504
        # ALIVE_TEST_TCP_SYN_SERVICE as alive test.
505
        w = DummyDaemon()
506
        t_opt = {'alive_test': AliveTest.ALIVE_TEST_TCP_SYN_SERVICE}
507
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
508
        ov_setting = {BOREAS_SETTING_NAME: 1}
509
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
510
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
511
            p.scan_id = '456-789'
512
            p.kbdb.add_scan_preferences = MagicMock()
513
            p.prepare_boreas_alive_test()
514
515
            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||16'])]
516
            p.kbdb.add_scan_preferences.assert_has_calls(calls)
517
518
        # ICMP was chosen as alive test.
519
        w = DummyDaemon()
520
        t_opt = {'alive_test': AliveTest.ALIVE_TEST_ICMP}
521
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
522
        ov_setting = {BOREAS_SETTING_NAME: 1}
523
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
524
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
525
            p.scan_id = '456-789'
526
            p.kbdb.add_scan_preferences = MagicMock()
527
            p.prepare_boreas_alive_test()
528
529
            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||2'])]
530
            p.kbdb.add_scan_preferences.assert_has_calls(calls)
531
532
        # "Scan Config Default" as alive_test.
533
        w = DummyDaemon()
534
        t_opt = {'alive_test': AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT}
535
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
536
        ov_setting = {BOREAS_SETTING_NAME: 1}
537
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
538
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
539
            p.scan_id = '456-789'
540
            p.kbdb.add_scan_preferences = MagicMock()
541
            p.prepare_boreas_alive_test()
542
543
            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||2'])]
544
            p.kbdb.add_scan_preferences.assert_has_calls(calls)
545
546
        # TCP-SYN alive test and dedicated port list for alive scan provided.
547
        w = DummyDaemon()
548
        t_opt = {
549
            'alive_test_ports': "80,137",
550
            'alive_test': AliveTest.ALIVE_TEST_TCP_SYN_SERVICE,
551
        }
552
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
553
        ov_setting = {BOREAS_SETTING_NAME: 1}
554
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
555
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
556
            p.scan_id = '456-789'
557
            p.kbdb.add_scan_preferences = MagicMock()
558
            p.prepare_boreas_alive_test()
559
560
            calls = [
561
                call(p.scan_id, [BOREAS_ALIVE_TEST + '|||16']),
562
                call(p.scan_id, [BOREAS_ALIVE_TEST_PORTS + '|||80,137']),
563
            ]
564
            p.kbdb.add_scan_preferences.assert_has_calls(calls)
565
566
    @patch('ospd_openvas.db.KbDB')
567
    def test_set_boreas_alive_test_not_as_enum(self, mock_kb):
568
        # No Boreas config setting (BOREAS_SETTING_NAME) set
569
        w = DummyDaemon()
570
        ov_setting = {'not_the_correct_setting': 1}
571
        t_opt = {}
572
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
573
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
574
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
575
            p.scan_id = '456-789'
576
            p.kbdb.add_scan_preferences = MagicMock()
577
            p.prepare_boreas_alive_test()
578
579
            p.kbdb.add_scan_preferences.assert_not_called()
580
581
        # Boreas config setting set but invalid alive_test.
582
        w = DummyDaemon()
583
        t_opt = {'alive_test_methods': "1", 'arp': '-1'}
584
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
585
        ov_setting = {BOREAS_SETTING_NAME: 1}
586
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
587
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
588
            p.scan_id = '456-789'
589
            p.kbdb.add_scan_preferences = MagicMock()
590
            p.prepare_boreas_alive_test()
591
592
            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||2'])]
593
            p.kbdb.add_scan_preferences.assert_has_calls(calls)
594
595
        # ICMP was chosen as alive test.
596
        w = DummyDaemon()
597
        t_opt = {'alive_test_methods': "1", 'icmp': '1'}
598
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
599
        ov_setting = {BOREAS_SETTING_NAME: 1}
600
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
601
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
602
            p.scan_id = '456-789'
603
            p.kbdb.add_scan_preferences = MagicMock()
604
            p.prepare_boreas_alive_test()
605
606
            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||2'])]
607
            p.kbdb.add_scan_preferences.assert_has_calls(calls)
608
609
        # tcp_syn as alive test.
610
        w = DummyDaemon()
611
        t_opt = {'alive_test_methods': "1", 'tcp_syn': '1'}
612
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
613
        ov_setting = {BOREAS_SETTING_NAME: 1}
614
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
615
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
616
            p.scan_id = '456-789'
617
            p.kbdb.add_scan_preferences = MagicMock()
618
            p.prepare_boreas_alive_test()
619
620
            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||16'])]
621
            p.kbdb.add_scan_preferences.assert_has_calls(calls)
622
623
        # tcp_ack as alive test.
624
        w = DummyDaemon()
625
        t_opt = {'alive_test_methods': "1", 'tcp_ack': '1'}
626
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
627
        ov_setting = {BOREAS_SETTING_NAME: 1}
628
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
629
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
630
            p.scan_id = '456-789'
631
            p.kbdb.add_scan_preferences = MagicMock()
632
            p.prepare_boreas_alive_test()
633
634
            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||1'])]
635
            p.kbdb.add_scan_preferences.assert_has_calls(calls)
636
637
        # arp as alive test.
638
        w = DummyDaemon()
639
        t_opt = {'alive_test_methods': "1", 'arp': '1'}
640
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
641
        ov_setting = {BOREAS_SETTING_NAME: 1}
642
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
643
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
644
            p.scan_id = '456-789'
645
            p.kbdb.add_scan_preferences = MagicMock()
646
            p.prepare_boreas_alive_test()
647
648
            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||4'])]
649
            p.kbdb.add_scan_preferences.assert_has_calls(calls)
650
651
        # arp as alive test.
652
        w = DummyDaemon()
653
        t_opt = {'alive_test_methods': "1", 'consider_alive': '1'}
654
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
655
        ov_setting = {BOREAS_SETTING_NAME: 1}
656
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
657
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
658
            p.scan_id = '456-789'
659
            p.kbdb.add_scan_preferences = MagicMock()
660
            p.prepare_boreas_alive_test()
661
662
            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||8'])]
663
            p.kbdb.add_scan_preferences.assert_has_calls(calls)
664
665
        # all alive test methods
666
        w = DummyDaemon()
667
        t_opt = {
668
            'alive_test_methods': "1",
669
            'icmp': '1',
670
            'tcp_ack': '1',
671
            'tcp_syn': '1',
672
            'arp': '1',
673
            'consider_alive': '1',
674
        }
675
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
676
        ov_setting = {BOREAS_SETTING_NAME: 1}
677
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
678
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
679
            p.scan_id = '456-789'
680
            p.kbdb.add_scan_preferences = MagicMock()
681
            p.prepare_boreas_alive_test()
682
683
            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||31'])]
684
            p.kbdb.add_scan_preferences.assert_has_calls(calls)
685
686
        # TCP-SYN alive test and dedicated port list for alive scan provided.
687
        w = DummyDaemon()
688
        t_opt = {
689
            'alive_test_ports': "80,137",
690
            'alive_test_methods': "1",
691
            'tcp_syn': '1',
692
        }
693
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
694
        ov_setting = {BOREAS_SETTING_NAME: 1}
695
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
696
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
697
            p.scan_id = '456-789'
698
            p.kbdb.add_scan_preferences = MagicMock()
699
            p.prepare_boreas_alive_test()
700
701
            calls = [
702
                call(p.scan_id, [BOREAS_ALIVE_TEST + '|||16']),
703
                call(p.scan_id, [BOREAS_ALIVE_TEST_PORTS + '|||80,137']),
704
            ]
705
            p.kbdb.add_scan_preferences.assert_has_calls(calls)
706
707
    @patch('ospd_openvas.db.KbDB')
708
    def test_set_boreas_alive_test_enum_has_precedence(self, mock_kb):
709
        w = DummyDaemon()
710
        t_opt = {
711
            'alive_test_methods': "1",
712
            'consider_alive': '1',
713
            'alive_test': AliveTest.ALIVE_TEST_ICMP,
714
        }
715
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
716
        ov_setting = {BOREAS_SETTING_NAME: 1}
717
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
718
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
719
            p.scan_id = '456-789'
720
            p.kbdb.add_scan_preferences = MagicMock()
721
            p.prepare_boreas_alive_test()
722
723
            # has icmp and not consider_alive
724
            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||2'])]
725
            p.kbdb.add_scan_preferences.assert_has_calls(calls)
726
727
    @patch('ospd_openvas.db.KbDB')
728
    def test_set_boreas_alive_test_without_settings(self, mock_kb):
729
        w = DummyDaemon()
730
        t_opt = {'alive_test': 16}
731
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
732
        ov_setting = {}
733
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
734
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
735
            p.scan_id = '456-789'
736
            p.kbdb.add_scan_preferences = MagicMock()
737
            p.prepare_boreas_alive_test()
738
739
            p.kbdb.add_scan_preferences.assert_not_called()
740
741 View Code Duplication
    @patch('ospd_openvas.db.KbDB')
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
742
    def test_set_alive_no_setting(self, mock_kb):
743
        w = DummyDaemon()
744
745
        t_opt = {}
746
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
747
748
        ov_setting = {}
749
750
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
751
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
752
            p.scan_id = '456-789'
753
            p.kbdb.add_scan_preferences = MagicMock()
754
            p.prepare_alive_test_option_for_openvas()
755
756
            p.kbdb.add_scan_preferences.assert_not_called()
757
758 View Code Duplication
    @patch('ospd_openvas.db.KbDB')
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
759
    def test_set_alive_no_invalid_alive_test(self, mock_kb):
760
        w = DummyDaemon()
761
762
        t_opt = {'alive_test': -1}
763
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
764
765
        ov_setting = {'some_setting': 1}
766
767
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
768
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
769
            p._nvts_params = {}
770
            p.scan_id = '456-789'
771
            p.kbdb.add_scan_preferences = MagicMock()
772
            p.prepare_alive_test_option_for_openvas()
773
774
            p.kbdb.add_scan_preferences.assert_not_called()
775
776
    @patch('ospd_openvas.db.KbDB')
777
    def test_set_alive_no_invalid_alive_test_no_enum(self, mock_kb):
778
        w = DummyDaemon()
779
780
        t_opt = {'alive_test_methods': '1', 'icmp': '-1'}
781
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
782
783
        ov_setting = {'some_setting': 1}
784
785
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
786
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
787
            p._nvts_params = {}
788
            p.scan_id = '456-789'
789
            p.kbdb.add_scan_preferences = MagicMock()
790
            p.prepare_alive_test_option_for_openvas()
791
792
            p.kbdb.add_scan_preferences.assert_not_called()
793
794
    @patch('ospd_openvas.db.KbDB')
795
    def test_set_alive_pinghost(self, mock_kb):
796
        w = DummyDaemon()
797
798
        alive_test_out = [
799
            "1.3.6.1.4.1.25623.1.0.100315:1:checkbox:Do a TCP ping|||no",
800
            "1.3.6.1.4.1.25623.1.0.100315:2:checkbox:TCP ping tries also TCP-SYN ping|||no",
801
            "1.3.6.1.4.1.25623.1.0.100315:7:checkbox:TCP ping tries only TCP-SYN ping|||no",
802
            "1.3.6.1.4.1.25623.1.0.100315:3:checkbox:Do an ICMP ping|||yes",
803
            "1.3.6.1.4.1.25623.1.0.100315:4:checkbox:Use ARP|||no",
804
            "1.3.6.1.4.1.25623.1.0.100315:5:checkbox:Mark unrechable Hosts as dead (not scanning)|||yes",
805
        ]
806
807
        t_opt = {'alive_test': 2}
808
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
809
810
        ov_setting = {'some_setting': 1}
811
812
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
813
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
814
            p._nvts_params = {}
815
            p.scan_id = '456-789'
816
            p.kbdb.add_scan_preferences = MagicMock()
817
            p.prepare_alive_test_option_for_openvas()
818
819
            for key, value in p._nvts_params.items():
820
                self.assertTrue(
821
                    "{0}|||{1}".format(key, value) in alive_test_out
822
                )
823
824
    @patch('ospd_openvas.db.KbDB')
825
    def test_prepare_alive_test_not_supplied_as_enum(self, mock_kb):
826
        w = DummyDaemon()
827
828
        alive_test_out = {
829
            "1.3.6.1.4.1.25623.1.0.100315:1:checkbox:Do a TCP ping": "no",
830
            "1.3.6.1.4.1.25623.1.0.100315:2:checkbox:TCP ping tries also TCP-SYN ping": "no",
831
            "1.3.6.1.4.1.25623.1.0.100315:7:checkbox:TCP ping tries only TCP-SYN ping": "no",
832
            "1.3.6.1.4.1.25623.1.0.100315:3:checkbox:Do an ICMP ping": "yes",
833
            "1.3.6.1.4.1.25623.1.0.100315:4:checkbox:Use ARP": "no",
834
            "1.3.6.1.4.1.25623.1.0.100315:5:checkbox:Mark unrechable Hosts as dead (not scanning)": "yes",
835
        }
836
837
        t_opt = {'alive_test_methods': '1', 'icmp': '1'}
838
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
839
840
        ov_setting = {'some_setting': 1}
841
842
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
843
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
844
            p._nvts_params = {}
845
            p.scan_id = '456-789'
846
            p.kbdb.add_scan_preferences = MagicMock()
847
            p.prepare_alive_test_option_for_openvas()
848
849
            self.assertEqual(p._nvts_params, alive_test_out)
850
851
    @patch('ospd_openvas.db.KbDB')
852
    def test_prepare_alive_test_no_enum_no_alive_test(self, mock_kb):
853
        w = DummyDaemon()
854
855
        t_opt = {'alive_test_methods': '1', 'icmp': '0'}
856
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
857
858
        ov_setting = {'some_setting': 1}
859
860
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
861
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
862
            p._nvts_params = {}
863
            p.scan_id = '456-789'
864
            p.kbdb.add_scan_preferences = MagicMock()
865
            p.prepare_alive_test_option_for_openvas()
866
867
            p.kbdb.add_scan_preferences.assert_not_called()
868
869
    def test_alive_test_methods_to_bit_field(self):
870
871
        self.assertEqual(
872
            AliveTest.ALIVE_TEST_TCP_ACK_SERVICE,
873
            alive_test_methods_to_bit_field(
874
                icmp=False,
875
                tcp_ack=True,
876
                tcp_syn=False,
877
                arp=False,
878
                consider_alive=False,
879
            ),
880
        )
881
882
        self.assertEqual(
883
            AliveTest.ALIVE_TEST_ICMP,
884
            alive_test_methods_to_bit_field(
885
                icmp=True,
886
                tcp_ack=False,
887
                tcp_syn=False,
888
                arp=False,
889
                consider_alive=False,
890
            ),
891
        )
892
893
        self.assertEqual(
894
            AliveTest.ALIVE_TEST_ARP,
895
            alive_test_methods_to_bit_field(
896
                icmp=False,
897
                tcp_ack=False,
898
                tcp_syn=False,
899
                arp=True,
900
                consider_alive=False,
901
            ),
902
        )
903
904
        self.assertEqual(
905
            AliveTest.ALIVE_TEST_CONSIDER_ALIVE,
906
            alive_test_methods_to_bit_field(
907
                icmp=False,
908
                tcp_ack=False,
909
                tcp_syn=False,
910
                arp=False,
911
                consider_alive=True,
912
            ),
913
        )
914
915
        self.assertEqual(
916
            AliveTest.ALIVE_TEST_TCP_SYN_SERVICE,
917
            alive_test_methods_to_bit_field(
918
                icmp=False,
919
                tcp_ack=False,
920
                tcp_syn=True,
921
                arp=False,
922
                consider_alive=False,
923
            ),
924
        )
925
926
        all_alive_test_methods = (
927
            AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT
928
            | AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
929
            | AliveTest.ALIVE_TEST_ICMP
930
            | AliveTest.ALIVE_TEST_ARP
931
            | AliveTest.ALIVE_TEST_CONSIDER_ALIVE
932
            | AliveTest.ALIVE_TEST_TCP_SYN_SERVICE
933
        )
934
        self.assertEqual(
935
            all_alive_test_methods,
936
            alive_test_methods_to_bit_field(
937
                icmp=True,
938
                tcp_ack=True,
939
                tcp_syn=True,
940
                arp=True,
941
                consider_alive=True,
942
            ),
943
        )
944
945
    @patch('ospd_openvas.db.KbDB')
946
    def test_prepare_nvt_prefs(self, mock_kb):
947
        w = DummyDaemon()
948
949
        alive_test_out = [
950
            "1.3.6.1.4.1.25623.1.0.100315:1:checkbox:Do a TCP ping|||no"
951
        ]
952
953
        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
954
        p._nvts_params = {
955
            "1.3.6.1.4.1.25623.1.0.100315:1:checkbox:Do a TCP ping": "no"
956
        }
957
        p.kbdb.add_scan_preferences = MagicMock()
958
        p.prepare_nvt_preferences()
959
960
        p.kbdb.add_scan_preferences.assert_called_with(
961
            p.scan_id,
962
            alive_test_out,
963
        )
964
965
    @patch('ospd_openvas.db.KbDB')
966
    def test_prepare_nvt_prefs_no_prefs(self, mock_kb):
967
        w = DummyDaemon()
968
969
        p = PreferenceHandler('456-789', mock_kb, w.scan_collection, None)
970
        p._nvts_params = {}
971
        p.kbdb.add_scan_preferences = MagicMock()
972
        p.prepare_nvt_preferences()
973
974
        p.kbdb.add_scan_preferences.assert_not_called()
975