Passed
Pull Request — master (#84)
by
unknown
01:55
created

TestOspdOpenvas.test_build_credentials()   B

Complexity

Conditions 1

Size

Total Lines 49
Code Lines 43

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 43
nop 3
dl 0
loc 49
rs 8.8478
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018-2019 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-2.0-or-later
5
#
6
# This program is free software; you can redistribute it and/or
7
# modify it under the terms of the GNU General Public License
8
# as published by the Free Software Foundation; either version 2
9
# of the License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
20
""" Unit Test for ospd-openvas """
21
22
import unittest
23
24
from unittest.mock import patch
25
26
from tests.dummywrapper import DummyWrapper
27
28
from ospd_openvas.errors import OSPDOpenvasError
29
from ospd_openvas.wrapper import OSPD_PARAMS, OpenVasVtsFilter
30
31
# Expected content of ``OSPD_PARAMS`` once ``parse_param()`` has run in
# ``test_parse_param``; the two dicts are compared with ``assertEqual``.
# The description strings are compared verbatim, so their exact spacing and
# wording must not be "tidied up" here.
OSPD_PARAMS_OUT = {
    'auto_enable_dependencies': {
        'type': 'boolean',
        'name': 'auto_enable_dependencies',
        'default': 1,
        'mandatory': 1,
        'description': 'Automatically enable the plugins that are depended on',
    },
    'cgi_path': {
        'type': 'string',
        'name': 'cgi_path',
        'default': '/cgi-bin:/scripts',
        'mandatory': 1,
        'description': 'Look for default CGIs in /cgi-bin and /scripts',
    },
    'checks_read_timeout': {
        'type': 'integer',
        'name': 'checks_read_timeout',
        'default': 5,
        'mandatory': 1,
        'description': (
            'Number  of seconds that the security checks will '
            'wait for when doing a recv()'
        ),
    },
    'drop_privileges': {
        'type': 'boolean',
        'name': 'drop_privileges',
        'default': 0,
        'mandatory': 1,
        'description': '',
    },
    'network_scan': {
        'type': 'boolean',
        'name': 'network_scan',
        'default': 0,
        'mandatory': 1,
        'description': '',
    },
    'non_simult_ports': {
        'type': 'string',
        'name': 'non_simult_ports',
        'default': '22',
        'mandatory': 1,
        'description': (
            'Prevent to make two connections on the same given '
            'ports at the same time.'
        ),
    },
    'open_sock_max_attempts': {
        'type': 'integer',
        'name': 'open_sock_max_attempts',
        'default': 5,
        'mandatory': 0,
        'description': (
            'Number of unsuccessful retries to open the socket '
            'before to set the port as closed.'
        ),
    },
    'timeout_retry': {
        'type': 'integer',
        'name': 'timeout_retry',
        'default': 5,
        'mandatory': 0,
        'description': (
            'Number of retries when a socket connection attempt '
            'timesout.'
        ),
    },
    'optimize_test': {
        'type': 'integer',
        'name': 'optimize_test',
        'default': 5,
        'mandatory': 0,
        'description': (
            'By default, openvas does not trust the remote '
            'host banners.'
        ),
    },
    'plugins_timeout': {
        'type': 'integer',
        'name': 'plugins_timeout',
        'default': 5,
        'mandatory': 0,
        'description': 'This is the maximum lifetime, in seconds of a plugin.',
    },
    'report_host_details': {
        'type': 'boolean',
        'name': 'report_host_details',
        'default': 1,
        'mandatory': 1,
        'description': '',
    },
    'safe_checks': {
        'type': 'boolean',
        'name': 'safe_checks',
        'default': 1,
        'mandatory': 1,
        'description': (
            'Disable the plugins with potential to crash '
            'the remote services'
        ),
    },
    'scanner_plugins_timeout': {
        'type': 'integer',
        'name': 'scanner_plugins_timeout',
        'default': 36000,
        'mandatory': 1,
        'description': 'Like plugins_timeout, but for ACT_SCANNER plugins.',
    },
    'time_between_request': {
        'type': 'integer',
        'name': 'time_between_request',
        'default': 0,
        'mandatory': 0,
        'description': (
            'Allow to set a wait time between two actions '
            '(open, send, close).'
        ),
    },
    'unscanned_closed': {
        'type': 'boolean',
        'name': 'unscanned_closed',
        'default': 1,
        'mandatory': 1,
        'description': '',
    },
    'unscanned_closed_udp': {
        'type': 'boolean',
        'name': 'unscanned_closed_udp',
        'default': 1,
        'mandatory': 1,
        'description': '',
    },
    'use_mac_addr': {
        'type': 'boolean',
        'name': 'use_mac_addr',
        'default': 0,
        'mandatory': 0,
        'description': (
            'To test the local network. '
            'Hosts will be referred to by their MAC address.'
        ),
    },
    'vhosts': {
        'type': 'string',
        'name': 'vhosts',
        'default': '',
        'mandatory': 0,
        'description': '',
    },
    'vhosts_ip': {
        'type': 'string',
        'name': 'vhosts_ip',
        'default': '',
        'mandatory': 0,
        'description': '',
    },
}
174
175
176
@patch('ospd_openvas.db.OpenvasDB')
@patch('ospd_openvas.nvticache.NVTICache')
class TestOspdOpenvas(unittest.TestCase):
    """Unit tests for the OSPD wrapper, exercised through ``DummyWrapper``.

    The two class-level patches are applied to every test method, so each
    test receives ``mock_nvti`` and ``mock_db`` as its last two arguments.
    Method-level patches are innermost and therefore come first.
    """

    # The VT used by most of the XML serialisation tests.
    # (Kept inline in each test so every test reads on its own.)

    @patch('ospd_openvas.wrapper.subprocess')
    def test_redis_nvticache_init(self, mock_subproc, mock_nvti, mock_db):
        """redis_nvticache_init() calls subprocess.check_call exactly once."""
        mock_subproc.check_call.return_value = True
        w = DummyWrapper(mock_nvti, mock_db)
        w.redis_nvticache_init()
        self.assertEqual(mock_subproc.check_call.call_count, 1)

    @patch('ospd_openvas.wrapper.subprocess')
    def test_parse_param(self, mock_subproc, mock_nvti, mock_db):
        """parse_param() calls check_output once and fills both param sets."""
        mock_subproc.check_output.return_value = (
            'non_simult_ports = 22\nplugins_folder = /foo/bar'.encode()
        )
        w = DummyWrapper(mock_nvti, mock_db)
        w.parse_param()
        self.assertEqual(mock_subproc.check_output.call_count, 1)
        self.assertEqual(OSPD_PARAMS, OSPD_PARAMS_OUT)
        self.assertEqual(w.scan_only_params.get('plugins_folder'), '/foo/bar')

    def test_load_vts(self, mock_nvti, mock_db):
        """load_vts() leaves ``w.vts`` equal to the wrapper's ``VT`` fixture."""
        w = DummyWrapper(mock_nvti, mock_db)
        w.load_vts()
        # Show the complete diff if the two VT collections differ.
        self.maxDiff = None
        self.assertEqual(w.vts, w.VT)

    def test_get_custom_xml(self, mock_nvti, mock_db):
        """The custom-section XML has the expected length.

        Only the length is compared, not the text itself.
        """
        out = (
            '<custom><required_ports>Services/www, 80</re'
            'quired_ports><category>3</category><'
            'excluded_keys>Settings/disable_cgi_s'
            'canning</excluded_keys><family>Produ'
            'ct detection</family><filename>manti'
            's_detect.nasl</filename><timeout>0</'
            'timeout></custom>'
        )
        w = DummyWrapper(mock_nvti, mock_db)
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
        res = w.get_custom_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', vt.get('custom')
        )
        self.assertEqual(len(res), len(out))

    def test_get_severities_xml(self, mock_nvti, mock_db):
        """The severities of the fixture VT serialise to the expected XML."""
        w = DummyWrapper(mock_nvti, mock_db)
        out = (
            '<severities><severity type="cvss_base_v2">'
            'AV:N/AC:L/Au:N/C:N/I:N/A:N</severity></severities>'
        )
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
        severities = vt.get('severities')
        res = w.get_severities_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', severities
        )

        self.assertEqual(res, out)

    def test_get_params_xml(self, mock_nvti, mock_db):
        """The VT params XML has the expected length.

        Only the length is compared, not the text itself.
        """
        w = DummyWrapper(mock_nvti, mock_db)
        out = (
            '<params><param type="checkbox" id="2"><name>Do '
            'not randomize the  order  in  which ports are scanned</name'
            '><default>no</default></param><param type="ent'
            'ry" id="1"><name>Data length :</name><'
            '/param></params>'
        )

        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
        params = vt.get('vt_params')
        res = w.get_params_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', params)
        self.assertEqual(len(res), len(out))

    def test_get_refs_xml(self, mock_nvti, mock_db):
        """The references of the fixture VT serialise to the expected XML."""
        w = DummyWrapper(mock_nvti, mock_db)
        out = '<refs><ref type="url" id="http://www.mantisbt.org/"/></refs>'
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
        refs = vt.get('vt_refs')
        res = w.get_refs_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', refs)

        self.assertEqual(res, out)

    def test_get_dependencies_xml(self, mock_nvti, mock_db):
        """A list of dependency OIDs serialises to the expected XML."""
        w = DummyWrapper(mock_nvti, mock_db)
        out = (
            '<dependencies><dependency vt_id="1.2.3.4"/><dependency vt'
            '_id="4.3.2.1"/></dependencies>'
        )
        dep = ['1.2.3.4', '4.3.2.1']
        res = w.get_dependencies_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', dep
        )

        self.assertEqual(res, out)

    def test_get_ctime_xml(self, mock_nvti, mock_db):
        """The creation time of the fixture VT serialises to the expected XML."""
        w = DummyWrapper(mock_nvti, mock_db)
        out = (
            '<creation_time>2009-03-19 11:22:36 +0100 '
            '(Thu, 19 Mar 2009)</creation_time>'
        )
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
        ctime = vt.get('creation_time')
        res = w.get_creation_time_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', ctime
        )

        self.assertEqual(res, out)

    def test_get_mtime_xml(self, mock_nvti, mock_db):
        """The modification time of the fixture VT serialises as expected."""
        w = DummyWrapper(mock_nvti, mock_db)
        out = (
            '<modification_time>$Date: 2018-08-10 15:09:25 +0200 '
            '(Fri, 10 Aug 2018) $</modification_time>'
        )
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
        mtime = vt.get('modification_time')
        res = w.get_modification_time_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', mtime
        )

        self.assertEqual(res, out)

    def test_get_summary_xml(self, mock_nvti, mock_db):
        """The summary of the fixture VT serialises to the expected XML."""
        w = DummyWrapper(mock_nvti, mock_db)
        out = '<summary>some summary</summary>'
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
        summary = vt.get('summary')
        res = w.get_summary_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', summary
        )

        self.assertEqual(res, out)

    def test_get_impact_xml(self, mock_nvti, mock_db):
        """The impact of the fixture VT serialises to the expected XML."""
        w = DummyWrapper(mock_nvti, mock_db)
        out = '<impact>some impact</impact>'
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
        impact = vt.get('impact')
        res = w.get_impact_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', impact)

        self.assertEqual(res, out)

    def test_get_insight_xml(self, mock_nvti, mock_db):
        """The insight of the fixture VT serialises to the expected XML."""
        w = DummyWrapper(mock_nvti, mock_db)
        out = '<insight>some insight</insight>'
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
        insight = vt.get('insight')
        res = w.get_insight_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', insight
        )

        self.assertEqual(res, out)

    def test_get_solution_xml(self, mock_nvti, mock_db):
        """Solution text and solution type serialise to the expected XML."""
        w = DummyWrapper(mock_nvti, mock_db)
        out = '<solution type="WillNotFix">some solution</solution>'
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
        solution = vt.get('solution')
        solution_type = vt.get('solution_type')

        res = w.get_solution_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', solution, solution_type
        )

        self.assertEqual(res, out)

    def test_get_detection_xml(self, mock_nvti, mock_db):
        """The QoD type of the fixture VT serialises to the expected XML."""
        w = DummyWrapper(mock_nvti, mock_db)
        out = '<detection qod_type="remote_banner"/>'
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
        detection_type = vt.get('qod_type')

        res = w.get_detection_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', qod_type=detection_type
        )

        self.assertEqual(res, out)

    def test_get_affected_xml(self, mock_nvti, mock_db):
        """The 'affected' text of the fixture VT serialises as expected."""
        w = DummyWrapper(mock_nvti, mock_db)
        out = '<affected>some affection</affected>'
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
        affected = vt.get('affected')

        res = w.get_affected_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', affected=affected
        )

        self.assertEqual(res, out)

    def test_build_credentials(self, mock_nvti, mock_db):
        """build_credentials_as_prefs() emits one preference per credential field.

        The result is checked for its length and for two representative
        entries (SSH port and SMB login).
        """
        w = DummyWrapper(mock_nvti, mock_db)

        # Only the number of entries in this list is compared with the result.
        cred_out = [
            '1.3.6.1.4.1.25623.1.0.105058:1:entry:ESXi login name:|||username',
            '1.3.6.1.4.1.25623.1.0.105058:2:password:ESXi login password:|||pass',
            'auth_port_ssh|||22',
            '1.3.6.1.4.1.25623.1.0.103591:1:entry:SSH login name:|||username',
            '1.3.6.1.4.1.25623.1.0.103591:2:password:SSH key passphrase:|||pass',
            '1.3.6.1.4.1.25623.1.0.103591:4:file:SSH private key:|||',
            '1.3.6.1.4.1.25623.1.0.90023:1:entry:SMB login:|||username',
            '1.3.6.1.4.1.25623.1.0.90023:2:password]:SMB password :|||pass',
            '1.3.6.1.4.1.25623.1.0.105076:1:password:SNMP Community:some comunity',
            '1.3.6.1.4.1.25623.1.0.105076:2:entry:SNMPv3 Username:username',
            '1.3.6.1.4.1.25623.1.0.105076:3:password:SNMPv3 Password:pass',
            '1.3.6.1.4.1.25623.1.0.105076:4:radio:SNMPv3 Authentication Algorithm:some auth algo',
            '1.3.6.1.4.1.25623.1.0.105076:5:password:SNMPv3 Privacy Password:privacy pass',
            '1.3.6.1.4.1.25623.1.0.105076:6:radio:SNMPv3 Privacy Algorithm:privacy algo',
        ]
        cred_dict = {
            'ssh': {
                'type': 'ssh',
                'port': '22',
                'username': 'username',
                'password': 'pass',
            },
            'smb': {'type': 'smb', 'username': 'username', 'password': 'pass'},
            'esxi': {
                'type': 'esxi',
                'username': 'username',
                'password': 'pass',
            },
            'snmp': {
                'type': 'snmp',
                'username': 'username',
                'password': 'pass',
                'community': 'some comunity',
                'auth_algorithm': 'some auth algo',
                'privacy_password': 'privacy pass',
                'privacy_algorithm': 'privacy algo',
            },
        }
        self.maxDiff = None
        ret = w.build_credentials_as_prefs(cred_dict)
        self.assertEqual(len(ret), len(cred_out))
        # Membership must be checked on the value under test; asserting
        # against ``cred_out`` (the literal above) could never fail.
        self.assertIn('auth_port_ssh|||22', ret)
        self.assertIn(
            '1.3.6.1.4.1.25623.1.0.90023:1:entry:SMB login:|||username',
            ret,
        )

    def test_build_credentials_ssh_up(self, mock_nvti, mock_db):
        """An SSH credential of type 'up' yields port, login and password prefs."""
        w = DummyWrapper(mock_nvti, mock_db)
        cred_out = [
            'auth_port_ssh|||22',
            '1.3.6.1.4.1.25623.1.0.103591:1:entry:SSH login name:|||username',
            '1.3.6.1.4.1.25623.1.0.103591:3:password:SSH password (unsafe!):|||pass',
        ]
        cred_dict = {
            'ssh': {
                'type': 'up',
                'port': '22',
                'username': 'username',
                'password': 'pass',
            }
        }
        self.maxDiff = None
        ret = w.build_credentials_as_prefs(cred_dict)
        self.assertEqual(ret, cred_out)

    def test_process_vts(self, mock_nvti, mock_db):
        """process_vts() returns as many items as the expected tuple.

        Only the length of the result is compared with ``vt_out``.
        """
        vts = {
            '1.3.6.1.4.1.25623.1.0.100061': {
                'Data length :': 'new value',
                'Do not randomize the  order  in  which ports are '
                'scanned': 'new value',
            },
            'vt_groups': ['family=debian', 'family=general'],
        }
        vt_out = (
            ['1.3.6.1.4.1.25623.1.0.100061'],
            [
                [
                    'Mantis Detection[checkbox]:Do not randomize the  order  i'
                    'n  which ports are scanned',
                    'new value',
                ],
                ['Mantis Detection[entry]:Data length : ', 'new value'],
            ],
        )
        w = DummyWrapper(mock_nvti, mock_db)
        w.load_vts()
        ret = w.process_vts(vts)
        self.assertEqual(len(ret), len(vt_out))

    def test_get_openvas_timestamp_scan_host_end(self, mock_nvti, mock_db):
        """A host-scan end time from the DB shows up as the result value."""
        mock_db.get_host_scan_scan_end_time.return_value = '12345'
        w = DummyWrapper(mock_nvti, mock_db)
        targets = [['192.168.0.1', 'port', 'cred', 'exclude_host']]
        w.create_scan('123-456', targets, None, [])
        w.get_openvas_timestamp_scan_host('123-456', '192.168.0.1')
        # NOTE(review): passes vacuously if the iterator yields no results.
        for result in w.scan_collection.results_iterator('123-456', False):
            self.assertEqual(result.get('value'), '12345')

    def test_get_openvas_timestamp_scan_host_start(self, mock_nvti, mock_db):
        """With no end time available, the start time is the result value."""
        mock_db.get_host_scan_scan_end_time.return_value = None
        # The original assigned the end-time mock a second time, which simply
        # overwrote the ``None`` above and re-tested the "end" path.
        # NOTE(review): accessor name inferred from the test name and the
        # matching end-time accessor -- confirm against OpenvasDB.
        mock_db.get_host_scan_scan_start_time.return_value = '54321'
        w = DummyWrapper(mock_nvti, mock_db)
        targets = [['192.168.0.1', 'port', 'cred', 'exclude_host']]
        w.create_scan('123-456', targets, None, [])
        w.get_openvas_timestamp_scan_host('123-456', '192.168.0.1')
        # NOTE(review): passes vacuously if the iterator yields no results.
        for result in w.scan_collection.results_iterator('123-456', False):
            self.assertEqual(result.get('value'), '54321')

    def test_scan_is_finished(self, mock_nvti, mock_db):
        """scan_is_finished() is True when the DB item reads 'finished'."""
        mock_db.get_single_item.return_value = 'finished'
        w = DummyWrapper(mock_nvti, mock_db)
        ret = w.scan_is_finished('123-456')
        self.assertEqual(ret, True)

    def test_scan_is_stopped(self, mock_nvti, mock_db):
        """scan_is_stopped() is True when the DB item reads 'stop_all'."""
        mock_db.get_single_item.return_value = 'stop_all'
        mock_db.kb_connect_item.return_value = mock_db
        mock_db.set_redisctx.return_value = None
        w = DummyWrapper(mock_nvti, mock_db)
        ret = w.scan_is_stopped('123-456')
        self.assertEqual(ret, True)

    @patch('ospd_openvas.wrapper.open')
    def test_feed_is_outdated(self, mock_open, mock_nvti, mock_db):
        """feed_is_outdated() raises, returns False, then returns True.

        The first call is made before 'plugins_folder' is set and is expected
        to raise ``OSPDOpenvasError``; afterwards the mocked PLUGIN_SET line
        is compared with the version passed in.
        """
        mock_open.return_value = ['PLUGIN_SET = "1234";']
        w = DummyWrapper(mock_nvti, mock_db)
        self.assertRaises(OSPDOpenvasError, w.feed_is_outdated, '1234')
        # Return False
        w.scan_only_params['plugins_folder'] = '/foo/bar'
        ret = w.feed_is_outdated('1234')
        self.assertFalse(ret)

        # Return true
        mock_open.return_value = ['PLUGIN_SET = "1235";']
        ret = w.feed_is_outdated('1234')
        self.assertTrue(ret)

    @patch('ospd_openvas.wrapper.OSPDaemon.add_scan_log')
    def test_get_openvas_result(self, mock_ospd, mock_nvti, mock_db):
        """A 'LOG' result line from the DB is forwarded to add_scan_log()."""
        # ``side_effect`` hands out one item per call: the result, then None.
        results = ["LOG||| |||general/Host_Details||| |||Host dead", None]
        mock_db.get_result.side_effect = results
        w = DummyWrapper(mock_nvti, mock_db)
        w.load_vts()
        mock_ospd.return_value = None
        w.get_openvas_result('123-456', 'localhost')
        mock_ospd.assert_called_with(
            '123-456',
            host='localhost',
            hostname=' ',
            name='',
            port='general/Host_Details',
            qod='',
            test_id=' ',
            value='Host dead',
        )

    @patch('ospd_openvas.wrapper.OSPDaemon.set_scan_target_progress')
    def test_update_progress(self, mock_ospd, mock_nvti, mock_db):
        """update_progress() reports 100 for the progress message '0/-1'."""
        msg = '0/-1'
        targets = [['localhost', 'port', 'cred', 'exclude_host']]
        w = DummyWrapper(mock_nvti, mock_db)
        w.create_scan('123-456', targets, None, [])

        mock_ospd.return_value = None
        w.update_progress('123-456', 'localhost', 'localhost', msg)
        mock_ospd.assert_called_with('123-456', 'localhost', 'localhost', 100)
540
541
542
class TestFilters(unittest.TestCase):
    """Unit tests for ``OpenVasVtsFilter``."""

    def test_format_vt_modification_time(self):
        """A '$Date: ... $' string is reduced to a 14-digit timestamp.

        The expected value matches the date and time at the start of the
        input; the parenthesised weekday/date part does not appear in it.
        """
        ovformat = OpenVasVtsFilter()
        td = '$Date: 2018-02-01 02:09:01 +0200 (Thu, 18 Oct 2018) $'
        formatted = ovformat.format_vt_modification_time(td)
        self.assertEqual(formatted, "20180201020901")
548