Passed | Pull Request: master (#331) | created by unknown | 01:20

tests.test_preferencehandler   C

Complexity

Total Complexity 54

Size/Duplication

Total Lines 927
Duplicated Lines 18.12 %

Importance

Changes 0
Metric   Value
eloc     666
dl       168
loc      927
rs       6.414
c        0
b        0
f        0
wmc      54

30 Methods

Rating   Name   Duplication   Size   Complexity  
A PreferenceHandlerTestCase.test_process_vts_not_found() 0 15 1
A PreferenceHandlerTestCase.test_set_ports() 0 14 1
A PreferenceHandlerTestCase.test_process_vts_bad_param_id() 0 13 1
B PreferenceHandlerTestCase.test_build_credentials() 0 51 1
A PreferenceHandlerTestCase.test_set_host_options_none() 0 14 1
A PreferenceHandlerTestCase.test_build_alive_test_opt() 31 31 1
A PreferenceHandlerTestCase.test_build_credentials_ssh_up() 0 21 1
C PreferenceHandlerTestCase.test_set_boreas_alive_test_with_settings() 0 90 7
A PreferenceHandlerTestCase.test_set_reverse_lookup_opt() 0 17 1
A PreferenceHandlerTestCase.test_process_vts() 0 16 1
A PreferenceHandlerTestCase.test_set_target() 0 14 1
A PreferenceHandlerTestCase.test_set_scan_params() 0 25 1
A PreferenceHandlerTestCase.test_build_alive_test_opt_empty() 16 16 1
A PreferenceHandlerTestCase.test_set_boreas_alive_test_enum_has_precedence() 0 19 2
A PreferenceHandlerTestCase.test_set_credentials() 0 37 1
A PreferenceHandlerTestCase.test_set_credentials_empty() 0 14 1
D PreferenceHandlerTestCase.test_set_boreas_alive_test_not_as_enum() 0 140 10
A PreferenceHandlerTestCase.test_set_main_kbindex() 0 12 1
A PreferenceHandlerTestCase.test_set_plugins_true() 0 17 1
A PreferenceHandlerTestCase.test_build_alive_test_opt_fail_1() 0 10 1
A PreferenceHandlerTestCase.test_set_boreas_alive_test_without_settings() 0 13 2
A PreferenceHandlerTestCase.test_set_plugins_false() 0 12 1
A PreferenceHandlerTestCase.test_set_host_options() 0 16 1
A PreferenceHandlerTestCase.test_prepare_alive_test_no_enum_no_alive_test() 17 17 2
A PreferenceHandlerTestCase.test_set_alive_no_setting() 16 16 2
A PreferenceHandlerTestCase.test_set_alive_pinghost() 28 28 2
B PreferenceHandlerTestCase.test_alive_test_methods_to_bit_field() 0 73 1
A PreferenceHandlerTestCase.test_set_alive_no_invalid_alive_test_no_enum() 16 16 2
A PreferenceHandlerTestCase.test_set_alive_no_invalid_alive_test() 16 16 2
A PreferenceHandlerTestCase.test_prepare_alive_test_not_supplied_as_enum() 28 28 2


Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
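As a hedged illustration of folding repeated test scenarios into one place, the sketch below drives a single test method from a table of cases using unittest's subTest. The names to_bit_field, BITS and BitFieldTestCase are hypothetical stand-ins, not the ospd_openvas implementation; only the bit values are taken from the expectations asserted in this test file.

```python
import unittest

# Hypothetical stand-in for the code under test. The bit values mirror the
# expectations asserted in this test file (tcp_ack=1, icmp=2, arp=4,
# consider_alive=8, tcp_syn=16); this is not the real implementation.
BITS = {'tcp_ack': 1, 'icmp': 2, 'arp': 4, 'consider_alive': 8, 'tcp_syn': 16}


def to_bit_field(options):
    """Combine every method flagged with '1' into one integer bit field."""
    return sum(bit for name, bit in BITS.items() if options.get(name) == '1')


class BitFieldTestCase(unittest.TestCase):
    # One row per scenario: (label, target options, expected bit field).
    CASES = [
        ('icmp', {'icmp': '1'}, 2),
        ('tcp_syn', {'tcp_syn': '1'}, 16),
        ('tcp_ack', {'tcp_ack': '1'}, 1),
        ('arp', {'arp': '1'}, 4),
        ('consider_alive', {'consider_alive': '1'}, 8),
        ('all methods', dict.fromkeys(BITS, '1'), 31),
    ]

    def test_bit_field(self):
        for label, options, expected in self.CASES:
            # subTest reports each failing row separately instead of
            # stopping at the first failed assertion.
            with self.subTest(label):
                self.assertEqual(to_bit_field(options), expected)
```

Adding a scenario then means adding one row to CASES rather than copying a block of setup and assertions.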

Common duplication problems, and corresponding solutions are:

Complexity

Tip: Before tackling complexity, eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like tests.test_preferencehandler often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields or methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
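One possible reading of this advice for a test module is to pull the setup that every test repeats into its own small class and let focused test cases reuse it. The sketch below assumes hypothetical names throughout (FakeHandler, HandlerFixtureMixin, PortsTestCase); FakeHandler only imitates the shape of the object under test and is not the real PreferenceHandler.

```python
import unittest
from unittest.mock import MagicMock


class FakeHandler:
    """Hypothetical stand-in for the object under test."""

    def __init__(self, scan_id, kbdb):
        self.scan_id = scan_id
        self.kbdb = kbdb

    def prepare_ports(self, ports):
        # Store the preference only when a port list was supplied.
        if ports:
            self.kbdb.add_scan_preferences(
                self.scan_id, ['port_range|||' + ports]
            )


class HandlerFixtureMixin:
    """Extracted component: the setup each test would otherwise repeat."""

    def make_handler(self):
        # A fresh MagicMock per handler keeps call records isolated.
        return FakeHandler('456-789', MagicMock())


class PortsTestCase(HandlerFixtureMixin, unittest.TestCase):
    def test_ports_set(self):
        p = self.make_handler()
        p.prepare_ports('80,443')
        p.kbdb.add_scan_preferences.assert_called_once_with(
            p.scan_id, ['port_range|||80,443']
        )

    def test_ports_empty(self):
        p = self.make_handler()
        p.prepare_ports('')
        p.kbdb.add_scan_preferences.assert_not_called()
```

Grouping tests that share a prefix into their own case class, each built on the shared fixture, keeps the individual classes small.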

# -*- coding: utf-8 -*-
# Copyright (C) 2014-2020 Greenbone Networks GmbH
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.


import logging

from unittest import TestCase
from unittest.mock import call, patch, Mock, MagicMock

from ospd.vts import Vts

from tests.dummydaemon import DummyDaemon
from tests.helper import assert_called_once

import ospd_openvas.db

from ospd_openvas.openvas import Openvas
from ospd_openvas.preferencehandler import (
    AliveTest,
    BOREAS_SETTING_NAME,
    BOREAS_ALIVE_TEST,
    BOREAS_ALIVE_TEST_PORTS,
    PreferenceHandler,
    alive_test_methods_to_bit_field,
)


class PreferenceHandlerTestCase(TestCase):
    @patch('ospd_openvas.db.KbDB')
    def test_process_vts_not_found(self, mock_kb):
        w = DummyDaemon()
        logging.Logger.warning = Mock()

        vts = {
            '1.3.6.1.4.1.25623.1.0.100065': {'3': 'new value'},
            'vt_groups': ['family=debian', 'family=general'],
        }

        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, w.nvti)
        w.nvti.get_nvt_metadata.return_value = None
        p._process_vts(vts)

        assert_called_once(logging.Logger.warning)

    def test_process_vts_bad_param_id(self):
        w = DummyDaemon()

        vts = {
            '1.3.6.1.4.1.25623.1.0.100061': {'3': 'new value'},
            'vt_groups': ['family=debian', 'family=general'],
        }

        p = PreferenceHandler('1234-1234', None, w.scan_collection, w.nvti)

        ret = p._process_vts(vts)

        self.assertFalse(ret[1])

    def test_process_vts(self):
        w = DummyDaemon()

        vts = {
            '1.3.6.1.4.1.25623.1.0.100061': {'1': 'new value'},
            'vt_groups': ['family=debian', 'family=general'],
        }
        vt_out = (
            ['1.3.6.1.4.1.25623.1.0.100061'],
            {'1.3.6.1.4.1.25623.1.0.100061:1:entry:Data length :': 'new value'},
        )

        p = PreferenceHandler('1234-1234', None, w.scan_collection, w.nvti)
        ret = p._process_vts(vts)

        self.assertEqual(ret, vt_out)

    @patch('ospd_openvas.db.KbDB')
    def test_set_plugins_false(self, mock_kb):
        w = DummyDaemon()

        w.scan_collection.get_vts = Mock()
        w.scan_collection.get_vts.return_value = {}

        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, w.nvti)
        p.kbdb.add_scan_preferences = Mock()
        r = p.prepare_plugins_for_openvas()

        self.assertFalse(r)

    @patch('ospd_openvas.db.KbDB')
    def test_set_plugins_true(self, mock_kb):
        w = DummyDaemon()

        vts = {
            '1.3.6.1.4.1.25623.1.0.100061': {'3': 'new value'},
            'vt_groups': ['family=debian', 'family=general'],
        }

        w.scan_collection.get_vts = Mock()
        w.scan_collection.get_vts.return_value = vts

        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, w.nvti)
        p.kbdb.add_scan_preferences = Mock()
        r = p.prepare_plugins_for_openvas()

        self.assertTrue(r)

    def test_build_credentials_ssh_up(self):
        w = DummyDaemon()

        cred_out = [
            'auth_port_ssh|||22',
            '1.3.6.1.4.1.25623.1.0.103591:1:entry:SSH login name:|||username',
            '1.3.6.1.4.1.25623.1.0.103591:3:password:SSH password (unsafe!):|||pass',
        ]
        cred_dict = {
            'ssh': {
                'type': 'up',
                'port': '22',
                'username': 'username',
                'password': 'pass',
            }
        }
        p = PreferenceHandler('1234-1234', None, w.scan_collection, None)

        ret = p.build_credentials_as_prefs(cred_dict)

        self.assertEqual(ret, cred_out)

    def test_build_credentials(self):
        w = DummyDaemon()

        cred_out = [
            '1.3.6.1.4.1.25623.1.0.105058:1:entry:ESXi login name:|||username',
            '1.3.6.1.4.1.25623.1.0.105058:2:password:ESXi login password:|||pass',
            'auth_port_ssh|||22',
            '1.3.6.1.4.1.25623.1.0.103591:1:entry:SSH login name:|||username',
            '1.3.6.1.4.1.25623.1.0.103591:2:password:SSH key passphrase:|||pass',
            '1.3.6.1.4.1.25623.1.0.103591:4:file:SSH private key:|||',
            '1.3.6.1.4.1.25623.1.0.90023:1:entry:SMB login:|||username',
            '1.3.6.1.4.1.25623.1.0.90023:2:password]:SMB password :|||pass',
            '1.3.6.1.4.1.25623.1.0.105076:1:password:SNMP Community:some comunity',
            '1.3.6.1.4.1.25623.1.0.105076:2:entry:SNMPv3 Username:username',
            '1.3.6.1.4.1.25623.1.0.105076:3:password:SNMPv3 Password:pass',
            '1.3.6.1.4.1.25623.1.0.105076:4:radio:SNMPv3 Authentication Algorithm:some auth algo',
            '1.3.6.1.4.1.25623.1.0.105076:5:password:SNMPv3 Privacy Password:privacy pass',
            '1.3.6.1.4.1.25623.1.0.105076:6:radio:SNMPv3 Privacy Algorithm:privacy algo',
        ]
        cred_dict = {
            'ssh': {
                'type': 'ssh',
                'port': '22',
                'username': 'username',
                'password': 'pass',
            },
            'smb': {'type': 'smb', 'username': 'username', 'password': 'pass'},
            'esxi': {
                'type': 'esxi',
                'username': 'username',
                'password': 'pass',
            },
            'snmp': {
                'type': 'snmp',
                'username': 'username',
                'password': 'pass',
                'community': 'some comunity',
                'auth_algorithm': 'some auth algo',
                'privacy_password': 'privacy pass',
                'privacy_algorithm': 'privacy algo',
            },
        }

        p = PreferenceHandler('1234-1234', None, w.scan_collection, None)
        ret = p.build_credentials_as_prefs(cred_dict)

        self.assertEqual(len(ret), len(cred_out))
        # Check the generated preferences, not the expected list itself.
        self.assertIn('auth_port_ssh|||22', ret)
        self.assertIn(
            '1.3.6.1.4.1.25623.1.0.90023:1:entry:SMB login:|||username',
            ret,
        )

    def test_build_alive_test_opt_empty(self):
        w = DummyDaemon()

        target_options_dict = {'alive_test': '0'}

        p = PreferenceHandler('1234-1234', None, w.scan_collection, None)
        ret = p.build_alive_test_opt_as_prefs(target_options_dict)

        self.assertEqual(ret, [])

        # alive test was supplied via separate xml element
        w = DummyDaemon()
        target_options_dict = {'alive_test_methods': '1', 'icmp': '0'}
        p = PreferenceHandler('1234-1234', None, w.scan_collection, None)
        ret = p.build_alive_test_opt_as_prefs(target_options_dict)
        self.assertEqual(ret, [])

    def test_build_alive_test_opt(self):
        w = DummyDaemon()

        alive_test_out = [
            "1.3.6.1.4.1.25623.1.0.100315:1:checkbox:Do a TCP ping|||no",
            "1.3.6.1.4.1.25623.1.0.100315:2:checkbox:TCP ping tries also TCP-SYN ping|||no",
            "1.3.6.1.4.1.25623.1.0.100315:7:checkbox:TCP ping tries only TCP-SYN ping|||no",
            "1.3.6.1.4.1.25623.1.0.100315:3:checkbox:Do an ICMP ping|||yes",
            "1.3.6.1.4.1.25623.1.0.100315:4:checkbox:Use ARP|||no",
            "1.3.6.1.4.1.25623.1.0.100315:5:checkbox:Mark unrechable Hosts as dead (not scanning)|||yes",
        ]
        target_options_dict = {'alive_test': '2'}
        p = PreferenceHandler('1234-1234', None, w.scan_collection, None)
        ret = p.build_alive_test_opt_as_prefs(target_options_dict)

        self.assertEqual(ret, alive_test_out)

        # alive test was supplied via separate xml element
        w = DummyDaemon()
        alive_test_out = [
            "1.3.6.1.4.1.25623.1.0.100315:1:checkbox:Do a TCP ping|||no",
            "1.3.6.1.4.1.25623.1.0.100315:2:checkbox:TCP ping tries also TCP-SYN ping|||no",
            "1.3.6.1.4.1.25623.1.0.100315:7:checkbox:TCP ping tries only TCP-SYN ping|||no",
            "1.3.6.1.4.1.25623.1.0.100315:3:checkbox:Do an ICMP ping|||yes",
            "1.3.6.1.4.1.25623.1.0.100315:4:checkbox:Use ARP|||no",
            "1.3.6.1.4.1.25623.1.0.100315:5:checkbox:Mark unrechable Hosts as dead (not scanning)|||yes",
        ]
        target_options_dict = {'alive_test_methods': '1', 'icmp': '1'}
        p = PreferenceHandler('1234-1234', None, w.scan_collection, None)
        ret = p.build_alive_test_opt_as_prefs(target_options_dict)
        self.assertEqual(ret, alive_test_out)

    def test_build_alive_test_opt_fail_1(self):
        w = DummyDaemon()
        logging.Logger.debug = Mock()

        target_options_dict = {'alive_test': 'a'}
        p = PreferenceHandler('1234-1234', None, w.scan_collection, None)
        target_options = p.build_alive_test_opt_as_prefs(target_options_dict)

        assert_called_once(logging.Logger.debug)
        self.assertEqual(len(target_options), 0)

    @patch('ospd_openvas.db.KbDB')
    def test_set_target(self, mock_kb):
        w = DummyDaemon()

        w.scan_collection.get_host_list = MagicMock(return_value='192.168.0.1')

        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
        p.scan_id = '456-789'
        p.kbdb.add_scan_preferences = MagicMock()
        p.prepare_target_for_openvas()

        p.kbdb.add_scan_preferences.assert_called_with(
            p.scan_id,
            ['TARGET|||192.168.0.1'],
        )

    @patch('ospd_openvas.db.KbDB')
    def test_set_ports(self, mock_kb):
        w = DummyDaemon()

        w.scan_collection.get_ports = MagicMock(return_value='80,443')

        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
        p.scan_id = '456-789'
        p.kbdb.add_scan_preferences = MagicMock()
        p.prepare_ports_for_openvas()

        p.kbdb.add_scan_preferences.assert_called_with(
            p.scan_id,
            ['port_range|||80,443'],
        )

    @patch('ospd_openvas.db.KbDB')
    def test_set_main_kbindex(self, mock_kb):
        w = DummyDaemon()

        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
        p.kbdb.add_scan_preferences = Mock()
        p.kbdb.index = 2
        p.prepare_main_kbindex_for_openvas()

        p.kbdb.add_scan_preferences.assert_called_with(
            p.scan_id,
            ['ov_maindbid|||2'],
        )

    @patch('ospd_openvas.db.KbDB')
    def test_set_credentials(self, mock_kb):
        w = DummyDaemon()

        creds = {
            'ssh': {
                'type': 'ssh',
                'port': '22',
                'username': 'username',
                'password': 'pass',
            },
            'smb': {'type': 'smb', 'username': 'username', 'password': 'pass'},
            'esxi': {
                'type': 'esxi',
                'username': 'username',
                'password': 'pass',
            },
            'snmp': {
                'type': 'snmp',
                'username': 'username',
                'password': 'pass',
                'community': 'some comunity',
                'auth_algorithm': 'some auth algo',
                'privacy_password': 'privacy pass',
                'privacy_algorithm': 'privacy algo',
            },
        }

        w.scan_collection.get_credentials = MagicMock(return_value=creds)

        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
        p.scan_id = '456-789'
        p.kbdb.add_scan_preferences = MagicMock()
        r = p.prepare_credentials_for_openvas()

        self.assertTrue(r)
        assert_called_once(p.kbdb.add_scan_preferences)

    # Renamed: this method previously reused the name test_set_credentials,
    # which silently replaced the test defined above.
    @patch('ospd_openvas.db.KbDB')
    def test_set_bad_service_credentials(self, mock_kb):
        w = DummyDaemon()

        # bad service name: 'shh' instead of 'ssh'
        creds = {
            'shh': {
                'type': 'ssh',
                'port': '22',
                'username': 'username',
                'password': 'pass',
            },
        }

        w.scan_collection.get_credentials = MagicMock(return_value=creds)

        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
        p.scan_id = '456-789'
        p.kbdb.add_scan_preferences = MagicMock()
        r = p.prepare_credentials_for_openvas()

        self.assertFalse(r)

    @patch('ospd_openvas.db.KbDB')
    def test_set_credentials_empty(self, mock_kb):
        w = DummyDaemon()

        creds = {}

        w.scan_collection.get_credentials = MagicMock(return_value=creds)

        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
        p.scan_id = '456-789'
        p.kbdb.add_scan_preferences = MagicMock()
        r = p.prepare_credentials_for_openvas()

        self.assertTrue(r)

    @patch('ospd_openvas.db.KbDB')
    def test_set_host_options(self, mock_kb):
        w = DummyDaemon()

        exc = '192.168.0.1'

        w.scan_collection.get_exclude_hosts = MagicMock(return_value=exc)

        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
        p.scan_id = '456-789'
        p.kbdb.add_scan_preferences = MagicMock()
        p.prepare_host_options_for_openvas()

        p.kbdb.add_scan_preferences.assert_called_with(
            p.scan_id,
            ['exclude_hosts|||192.168.0.1'],
        )

    @patch('ospd_openvas.db.KbDB')
    def test_set_host_options_none(self, mock_kb):
        w = DummyDaemon()

        exc = ''

        w.scan_collection.get_exclude_hosts = MagicMock(return_value=exc)

        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
        p.scan_id = '456-789'
        p.kbdb.add_scan_preferences = MagicMock()
        p.prepare_host_options_for_openvas()

        p.kbdb.add_scan_preferences.assert_not_called()

    @patch('ospd_openvas.db.KbDB')
    def test_set_scan_params(self, mock_kb):
        w = DummyDaemon()

        OSPD_PARAMS_MOCK = {
            'drop_privileges': {
                'type': 'boolean',
                'name': 'drop_privileges',
                'default': 0,
                'mandatory': 1,
                'description': '',
            },
        }

        opt = {'drop_privileges': 1}

        w.scan_collection.get_options = MagicMock(return_value=opt)

        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
        p.scan_id = '456-789'
        p.kbdb.add_scan_preferences = MagicMock()
        p.prepare_scan_params_for_openvas(OSPD_PARAMS_MOCK)

        p.kbdb.add_scan_preferences.assert_called_with(
            p.scan_id, ['drop_privileges|||yes']
        )

    @patch('ospd_openvas.db.KbDB')
    def test_set_reverse_lookup_opt(self, mock_kb):
        w = DummyDaemon()

        t_opt = {'reverse_lookup_only': 1}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)

        p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
        p.scan_id = '456-789'
        p.kbdb.add_scan_preferences = MagicMock()
        p.prepare_reverse_lookup_opt_for_openvas()

        p.kbdb.add_scan_preferences.assert_called_with(
            p.scan_id,
            [
                'reverse_lookup_only|||yes',
                'reverse_lookup_unify|||no',
            ],
        )

    @patch('ospd_openvas.db.KbDB')
    def test_set_boreas_alive_test_with_settings(self, mock_kb):
        # No Boreas config setting (BOREAS_SETTING_NAME) set
        w = DummyDaemon()
        ov_setting = {'not_the_correct_setting': 1}
        t_opt = {}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            p.kbdb.add_scan_preferences.assert_not_called()

        # Boreas config setting set but invalid alive_test.
        w = DummyDaemon()
        t_opt = {'alive_test': "error"}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {BOREAS_SETTING_NAME: 1}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||2'])]
            p.kbdb.add_scan_preferences.assert_has_calls(calls)

        # ALIVE_TEST_TCP_SYN_SERVICE as alive test.
        w = DummyDaemon()
        t_opt = {'alive_test': AliveTest.ALIVE_TEST_TCP_SYN_SERVICE}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {BOREAS_SETTING_NAME: 1}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||16'])]
            p.kbdb.add_scan_preferences.assert_has_calls(calls)

        # ICMP was chosen as alive test.
        w = DummyDaemon()
        t_opt = {'alive_test': AliveTest.ALIVE_TEST_ICMP}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {BOREAS_SETTING_NAME: 1}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||2'])]
            p.kbdb.add_scan_preferences.assert_has_calls(calls)

        # "Scan Config Default" as alive_test.
        w = DummyDaemon()
        t_opt = {'alive_test': AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {BOREAS_SETTING_NAME: 1}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||2'])]
            p.kbdb.add_scan_preferences.assert_has_calls(calls)

        # TCP-SYN alive test and dedicated port list for alive scan provided.
        w = DummyDaemon()
        t_opt = {
            'alive_test_ports': "80,137",
            'alive_test': AliveTest.ALIVE_TEST_TCP_SYN_SERVICE,
        }
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {BOREAS_SETTING_NAME: 1}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            calls = [
                call(p.scan_id, [BOREAS_ALIVE_TEST + '|||16']),
                call(p.scan_id, [BOREAS_ALIVE_TEST_PORTS + '|||80,137']),
            ]
            p.kbdb.add_scan_preferences.assert_has_calls(calls)

    @patch('ospd_openvas.db.KbDB')
    def test_set_boreas_alive_test_not_as_enum(self, mock_kb):
        # No Boreas config setting (BOREAS_SETTING_NAME) set
        w = DummyDaemon()
        ov_setting = {'not_the_correct_setting': 1}
        t_opt = {}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            p.kbdb.add_scan_preferences.assert_not_called()

        # Boreas config setting set but invalid alive_test.
        w = DummyDaemon()
        t_opt = {'alive_test_methods': "1", 'arp': '-1'}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {BOREAS_SETTING_NAME: 1}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||2'])]
            p.kbdb.add_scan_preferences.assert_has_calls(calls)

        # ICMP was chosen as alive test.
        w = DummyDaemon()
        t_opt = {'alive_test_methods': "1", 'icmp': '1'}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {BOREAS_SETTING_NAME: 1}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||2'])]
            p.kbdb.add_scan_preferences.assert_has_calls(calls)

        # tcp_syn as alive test.
        w = DummyDaemon()
        t_opt = {'alive_test_methods': "1", 'tcp_syn': '1'}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {BOREAS_SETTING_NAME: 1}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||16'])]
            p.kbdb.add_scan_preferences.assert_has_calls(calls)

        # tcp_ack as alive test.
        w = DummyDaemon()
        t_opt = {'alive_test_methods': "1", 'tcp_ack': '1'}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {BOREAS_SETTING_NAME: 1}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||1'])]
            p.kbdb.add_scan_preferences.assert_has_calls(calls)

        # arp as alive test.
        w = DummyDaemon()
        t_opt = {'alive_test_methods': "1", 'arp': '1'}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {BOREAS_SETTING_NAME: 1}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||4'])]
            p.kbdb.add_scan_preferences.assert_has_calls(calls)

        # consider_alive as alive test.
        w = DummyDaemon()
        t_opt = {'alive_test_methods': "1", 'consider_alive': '1'}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {BOREAS_SETTING_NAME: 1}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||8'])]
            p.kbdb.add_scan_preferences.assert_has_calls(calls)

        # all alive test methods
        w = DummyDaemon()
        t_opt = {
            'alive_test_methods': "1",
            'icmp': '1',
            'tcp_ack': '1',
            'tcp_syn': '1',
            'arp': '1',
            'consider_alive': '1',
        }
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {BOREAS_SETTING_NAME: 1}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||31'])]
            p.kbdb.add_scan_preferences.assert_has_calls(calls)

        # TCP-SYN alive test and dedicated port list for alive scan provided.
        w = DummyDaemon()
        t_opt = {
            'alive_test_ports': "80,137",
            'alive_test_methods': "1",
            'tcp_syn': '1',
        }
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {BOREAS_SETTING_NAME: 1}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            calls = [
                call(p.scan_id, [BOREAS_ALIVE_TEST + '|||16']),
                call(p.scan_id, [BOREAS_ALIVE_TEST_PORTS + '|||80,137']),
            ]
            p.kbdb.add_scan_preferences.assert_has_calls(calls)

    @patch('ospd_openvas.db.KbDB')
    def test_set_boreas_alive_test_enum_has_precedence(self, mock_kb):
        w = DummyDaemon()
        t_opt = {
            'alive_test_methods': "1",
            'consider_alive': '1',
            'alive_test': AliveTest.ALIVE_TEST_ICMP,
        }
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {BOREAS_SETTING_NAME: 1}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            # has icmp and not consider_alive
            calls = [call(p.scan_id, [BOREAS_ALIVE_TEST + '|||2'])]
            p.kbdb.add_scan_preferences.assert_has_calls(calls)

    @patch('ospd_openvas.db.KbDB')
    def test_set_boreas_alive_test_without_settings(self, mock_kb):
        w = DummyDaemon()
        t_opt = {'alive_test': 16}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)
        ov_setting = {}
        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_boreas_alive_test()

            p.kbdb.add_scan_preferences.assert_not_called()

    @patch('ospd_openvas.db.KbDB')
    def test_set_alive_no_setting(self, mock_kb):
        w = DummyDaemon()

        t_opt = {}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)

        ov_setting = {}

        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_alive_test_option_for_openvas()

            p.kbdb.add_scan_preferences.assert_not_called()

    @patch('ospd_openvas.db.KbDB')
    def test_set_alive_no_invalid_alive_test(self, mock_kb):
        w = DummyDaemon()

        t_opt = {'alive_test': -1}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)

        ov_setting = {'some_setting': 1}

        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_alive_test_option_for_openvas()

            p.kbdb.add_scan_preferences.assert_not_called()

    @patch('ospd_openvas.db.KbDB')
    def test_set_alive_no_invalid_alive_test_no_enum(self, mock_kb):
        w = DummyDaemon()

        t_opt = {'alive_test_methods': '1', 'icmp': '-1'}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)

        ov_setting = {'some_setting': 1}

        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_alive_test_option_for_openvas()

            p.kbdb.add_scan_preferences.assert_not_called()

    @patch('ospd_openvas.db.KbDB')
    def test_set_alive_pinghost(self, mock_kb):
        w = DummyDaemon()

        alive_test_out = [
            "1.3.6.1.4.1.25623.1.0.100315:1:checkbox:Do a TCP ping|||no",
            "1.3.6.1.4.1.25623.1.0.100315:2:checkbox:TCP ping tries also TCP-SYN ping|||no",
            "1.3.6.1.4.1.25623.1.0.100315:7:checkbox:TCP ping tries only TCP-SYN ping|||no",
            "1.3.6.1.4.1.25623.1.0.100315:3:checkbox:Do an ICMP ping|||yes",
            "1.3.6.1.4.1.25623.1.0.100315:4:checkbox:Use ARP|||no",
            "1.3.6.1.4.1.25623.1.0.100315:5:checkbox:Mark unrechable Hosts as dead (not scanning)|||yes",
        ]

        t_opt = {'alive_test': 2}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)

        ov_setting = {'some_setting': 1}

        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)

            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_alive_test_option_for_openvas()

            p.kbdb.add_scan_preferences.assert_called_with(
                p.scan_id,
                alive_test_out,
            )

    @patch('ospd_openvas.db.KbDB')
    def test_prepare_alive_test_not_supplied_as_enum(self, mock_kb):
        w = DummyDaemon()

        alive_test_out = [
            "1.3.6.1.4.1.25623.1.0.100315:1:checkbox:Do a TCP ping|||no",
            "1.3.6.1.4.1.25623.1.0.100315:2:checkbox:TCP ping tries also TCP-SYN ping|||no",
            "1.3.6.1.4.1.25623.1.0.100315:7:checkbox:TCP ping tries only TCP-SYN ping|||no",
            "1.3.6.1.4.1.25623.1.0.100315:3:checkbox:Do an ICMP ping|||yes",
            "1.3.6.1.4.1.25623.1.0.100315:4:checkbox:Use ARP|||no",
            "1.3.6.1.4.1.25623.1.0.100315:5:checkbox:Mark unrechable Hosts as dead (not scanning)|||yes",
        ]

        t_opt = {'alive_test_methods': '1', 'icmp': '1'}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)

        ov_setting = {'some_setting': 1}

        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)

            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_alive_test_option_for_openvas()

            p.kbdb.add_scan_preferences.assert_called_with(
                p.scan_id,
                alive_test_out,
            )

    @patch('ospd_openvas.db.KbDB')
    def test_prepare_alive_test_no_enum_no_alive_test(self, mock_kb):
        w = DummyDaemon()

        t_opt = {'alive_test_methods': '1', 'icmp': '0'}
        w.scan_collection.get_target_options = MagicMock(return_value=t_opt)

        ov_setting = {'some_setting': 1}

        with patch.object(Openvas, 'get_settings', return_value=ov_setting):
            p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)

            p.scan_id = '456-789'
            p.kbdb.add_scan_preferences = MagicMock()
            p.prepare_alive_test_option_for_openvas()

            p.kbdb.add_scan_preferences.assert_not_called()

    def test_alive_test_methods_to_bit_field(self):

        self.assertEqual(
            AliveTest.ALIVE_TEST_TCP_ACK_SERVICE,
            alive_test_methods_to_bit_field(
                icmp=False,
                tcp_ack=True,
                tcp_syn=False,
                arp=False,
                consider_alive=False,
            ),
        )

        self.assertEqual(
            AliveTest.ALIVE_TEST_ICMP,
            alive_test_methods_to_bit_field(
                icmp=True,
                tcp_ack=False,
                tcp_syn=False,
                arp=False,
                consider_alive=False,
            ),
        )

        self.assertEqual(
            AliveTest.ALIVE_TEST_ARP,
            alive_test_methods_to_bit_field(
                icmp=False,
                tcp_ack=False,
                tcp_syn=False,
                arp=True,
                consider_alive=False,
            ),
        )

        self.assertEqual(
            AliveTest.ALIVE_TEST_CONSIDER_ALIVE,
            alive_test_methods_to_bit_field(
                icmp=False,
                tcp_ack=False,
                tcp_syn=False,
                arp=False,
                consider_alive=True,
            ),
        )

        self.assertEqual(
            AliveTest.ALIVE_TEST_TCP_SYN_SERVICE,
            alive_test_methods_to_bit_field(
                icmp=False,
                tcp_ack=False,
                tcp_syn=True,
                arp=False,
                consider_alive=False,
            ),
        )

        all_alive_test_methods = (
            AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT
            | AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
            | AliveTest.ALIVE_TEST_ICMP
            | AliveTest.ALIVE_TEST_ARP
            | AliveTest.ALIVE_TEST_CONSIDER_ALIVE
            | AliveTest.ALIVE_TEST_TCP_SYN_SERVICE
        )
        self.assertEqual(
            all_alive_test_methods,
            alive_test_methods_to_bit_field(
                icmp=True,
                tcp_ack=True,
                tcp_syn=True,
                arp=True,
                consider_alive=True,
            ),
        )
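
The values asserted in the tests above (`'|||2'` for ICMP alone, `'|||8'` for consider_alive alone, `'|||16'` for TCP-SYN alone, `'|||31'` for all five methods) suggest that each alive test method occupies one bit of the field. Below is a minimal sketch of such a conversion, not the ospd-openvas implementation: `AliveTestSketch` and `methods_to_bit_field` are hypothetical names, and the values 0 for the scan config default, 1 for TCP-ACK and 4 for ARP are assumptions, since the tests above do not show them individually.

```python
from enum import IntFlag


class AliveTestSketch(IntFlag):
    """Hypothetical stand-in for AliveTest; bit values partly assumed."""

    SCAN_CONFIG_DEFAULT = 0  # assumed: contributes no bits
    TCP_ACK_SERVICE = 1      # assumed value
    ICMP = 2                 # matches '|||2' in the tests above
    ARP = 4                  # assumed value
    CONSIDER_ALIVE = 8       # matches '|||8' in the tests above
    TCP_SYN_SERVICE = 16     # matches '|||16' in the tests above


def methods_to_bit_field(
    icmp: bool, tcp_ack: bool, tcp_syn: bool, arp: bool, consider_alive: bool
) -> int:
    """OR together the bit of every enabled alive test method."""
    bit_field = AliveTestSketch.SCAN_CONFIG_DEFAULT
    if icmp:
        bit_field |= AliveTestSketch.ICMP
    if tcp_ack:
        bit_field |= AliveTestSketch.TCP_ACK_SERVICE
    if tcp_syn:
        bit_field |= AliveTestSketch.TCP_SYN_SERVICE
    if arp:
        bit_field |= AliveTestSketch.ARP
    if consider_alive:
        bit_field |= AliveTestSketch.CONSIDER_ALIVE
    return int(bit_field)
```

Because every method maps to a distinct bit, enabling several methods is a bitwise OR of their values, which is why the "all alive test methods" case above expects 31.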