Passed
Push — master ( 14685a...bdb516 )
by Juan José
01:39 queued 10s
created

TestOspdOpenvas.test_redis_nvticache_init()   A

Complexity

Conditions 1

Size

Total Lines 7
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 7
nop 4
dl 0
loc 7
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018-2019 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-2.0-or-later
5
#
6
# This program is free software; you can redistribute it and/or
7
# modify it under the terms of the GNU General Public License
8
# as published by the Free Software Foundation; either version 2
9
# of the License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
20
""" Unit Test for ospd-openvas """
21
22
import unittest
23
24
from unittest.mock import patch
25
26
from tests.dummywrapper import DummyWrapper
27
28
from ospd_openvas.errors import OspdOpenvasError
29
from ospd_openvas.wrapper import OSPD_PARAMS, OpenVasVtsFilter
30
31
OSPD_PARAMS_OUT = {
32
    'auto_enable_dependencies': {
33
        'type': 'boolean',
34
        'name': 'auto_enable_dependencies',
35
        'default': 1,
36
        'mandatory': 1,
37
        'description': 'Automatically enable the plugins that are depended on',
38
    },
39
    'cgi_path': {
40
        'type': 'string',
41
        'name': 'cgi_path',
42
        'default': '/cgi-bin:/scripts',
43
        'mandatory': 1,
44
        'description': 'Look for default CGIs in /cgi-bin and /scripts',
45
    },
46
    'checks_read_timeout': {
47
        'type': 'integer',
48
        'name': 'checks_read_timeout',
49
        'default': 5,
50
        'mandatory': 1,
51
        'description': 'Number  of seconds that the security checks will '
52
        'wait for when doing a recv()',
53
    },
54
    'drop_privileges': {
55
        'type': 'boolean',
56
        'name': 'drop_privileges',
57
        'default': 0,
58
        'mandatory': 1,
59
        'description': '',
60
    },
61
    'network_scan': {
62
        'type': 'boolean',
63
        'name': 'network_scan',
64
        'default': 0,
65
        'mandatory': 1,
66
        'description': '',
67
    },
68
    'non_simult_ports': {
69
        'type': 'string',
70
        'name': 'non_simult_ports',
71
        'default': '22',
72
        'mandatory': 1,
73
        'description': 'Prevent to make two connections on the same given '
74
        'ports at the same time.',
75
    },
76
    'open_sock_max_attempts': {
77
        'type': 'integer',
78
        'name': 'open_sock_max_attempts',
79
        'default': 5,
80
        'mandatory': 0,
81
        'description': 'Number of unsuccessful retries to open the socket '
82
        'before to set the port as closed.',
83
    },
84
    'timeout_retry': {
85
        'type': 'integer',
86
        'name': 'timeout_retry',
87
        'default': 5,
88
        'mandatory': 0,
89
        'description': 'Number of retries when a socket connection attempt '
90
        'timesout.',
91
    },
92
    'optimize_test': {
93
        'type': 'integer',
94
        'name': 'optimize_test',
95
        'default': 5,
96
        'mandatory': 0,
97
        'description': 'By default, openvas does not trust the remote '
98
        'host banners.',
99
    },
100
    'plugins_timeout': {
101
        'type': 'integer',
102
        'name': 'plugins_timeout',
103
        'default': 5,
104
        'mandatory': 0,
105
        'description': 'This is the maximum lifetime, in seconds of a plugin.',
106
    },
107
    'report_host_details': {
108
        'type': 'boolean',
109
        'name': 'report_host_details',
110
        'default': 1,
111
        'mandatory': 1,
112
        'description': '',
113
    },
114
    'safe_checks': {
115
        'type': 'boolean',
116
        'name': 'safe_checks',
117
        'default': 1,
118
        'mandatory': 1,
119
        'description': 'Disable the plugins with potential to crash '
120
        'the remote services',
121
    },
122
    'scanner_plugins_timeout': {
123
        'type': 'integer',
124
        'name': 'scanner_plugins_timeout',
125
        'default': 36000,
126
        'mandatory': 1,
127
        'description': 'Like plugins_timeout, but for ACT_SCANNER plugins.',
128
    },
129
    'time_between_request': {
130
        'type': 'integer',
131
        'name': 'time_between_request',
132
        'default': 0,
133
        'mandatory': 0,
134
        'description': 'Allow to set a wait time between two actions '
135
        '(open, send, close).',
136
    },
137
    'unscanned_closed': {
138
        'type': 'boolean',
139
        'name': 'unscanned_closed',
140
        'default': 1,
141
        'mandatory': 1,
142
        'description': '',
143
    },
144
    'unscanned_closed_udp': {
145
        'type': 'boolean',
146
        'name': 'unscanned_closed_udp',
147
        'default': 1,
148
        'mandatory': 1,
149
        'description': '',
150
    },
151
    'use_mac_addr': {
152
        'type': 'boolean',
153
        'name': 'use_mac_addr',
154
        'default': 0,
155
        'mandatory': 0,
156
        'description': 'To test the local network. '
157
        'Hosts will be referred to by their MAC address.',
158
    },
159
    'vhosts': {
160
        'type': 'string',
161
        'name': 'vhosts',
162
        'default': '',
163
        'mandatory': 0,
164
        'description': '',
165
    },
166
    'vhosts_ip': {
167
        'type': 'string',
168
        'name': 'vhosts_ip',
169
        'default': '',
170
        'mandatory': 0,
171
        'description': '',
172
    },
173
}
174
175
176
@patch('ospd_openvas.db.OpenvasDB')
177
@patch('ospd_openvas.nvticache.NVTICache')
178
class TestOspdOpenvas(unittest.TestCase):
179
    @patch('ospd_openvas.wrapper.subprocess')
180
    def test_redis_nvticache_init(self, mock_subproc, mock_nvti, mock_db):
181
        mock_subproc.check_call.return_value = True
182
        w = DummyWrapper(mock_nvti, mock_db)
183
        mock_subproc.reset_mock()
184
        w.redis_nvticache_init()
185
        self.assertEqual(mock_subproc.check_call.call_count, 1)
186
187
    @patch('ospd_openvas.wrapper.subprocess')
188
    def test_parse_param(self, mock_subproc, mock_nvti, mock_db):
189
190
        mock_subproc.check_output.return_value = (
191
            'non_simult_ports = 22\nplugins_folder = /foo/bar'.encode()
192
        )
193
        w = DummyWrapper(mock_nvti, mock_db)
194
        w.parse_param()
195
        self.assertEqual(mock_subproc.check_output.call_count, 1)
196
        self.assertEqual(OSPD_PARAMS, OSPD_PARAMS_OUT)
197
        self.assertEqual(w.scan_only_params.get('plugins_folder'), '/foo/bar')
198
199
    @patch('ospd_openvas.wrapper.subprocess')
200
    def test_sudo_available(self, mock_subproc, mock_nvti, mock_db):
201
        mock_subproc.check_call.return_value = 0
202
        w = DummyWrapper(mock_nvti, mock_db)
203
        w._sudo_available = None
204
        w.sudo_available
205
        self.assertTrue(w.sudo_available)
206
207
    def test_load_vts(self, mock_nvti, mock_db):
208
        w = DummyWrapper(mock_nvti, mock_db)
209
        w.load_vts()
210
        self.maxDiff = None
211
        self.assertEqual(w.vts, w.VT)
212
213
    def test_get_custom_xml(self, mock_nvti, mock_db):
214
        out = (
215
            '<custom><required_ports>Services/www, 80</re'
216
            'quired_ports><category>3</category><'
217
            'excluded_keys>Settings/disable_cgi_s'
218
            'canning</excluded_keys><family>Produ'
219
            'ct detection</family><filename>manti'
220
            's_detect.nasl</filename><timeout>0</'
221
            'timeout></custom>'
222
        )
223
        w = DummyWrapper(mock_nvti, mock_db)
224
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
225
        res = w.get_custom_vt_as_xml_str(
226
            '1.3.6.1.4.1.25623.1.0.100061', vt.get('custom')
227
        )
228
        self.assertEqual(len(res), len(out))
229
230
    def test_get_severities_xml(self, mock_nvti, mock_db):
231
        w = DummyWrapper(mock_nvti, mock_db)
232
        out = (
233
            '<severities><severity type="cvss_base_v2">'
234
            'AV:N/AC:L/Au:N/C:N/I:N/A:N</severity></severities>'
235
        )
236
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
237
        severities = vt.get('severities')
238
        res = w.get_severities_vt_as_xml_str(
239
            '1.3.6.1.4.1.25623.1.0.100061', severities
240
        )
241
242
        self.assertEqual(res, out)
243
244
    def test_get_params_xml(self, mock_nvti, mock_db):
245
        w = DummyWrapper(mock_nvti, mock_db)
246
        out = (
247
            '<params><param type="checkbox" id="2"><name>Do '
248
            'not randomize the  order  in  which ports are scanned</name'
249
            '><default>no</default></param><param type="ent'
250
            'ry" id="1"><name>Data length :</name><'
251
            '/param></params>'
252
        )
253
254
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
255
        params = vt.get('vt_params')
256
        res = w.get_params_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', params)
257
        self.assertEqual(len(res), len(out))
258
259
    def test_get_refs_xml(self, mock_nvti, mock_db):
260
        w = DummyWrapper(mock_nvti, mock_db)
261
        out = '<refs><ref type="url" id="http://www.mantisbt.org/"/>' '</refs>'
262
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
263
        refs = vt.get('vt_refs')
264
        res = w.get_refs_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', refs)
265
266
        self.assertEqual(res, out)
267
268
    def test_get_dependencies_xml(self, mock_nvti, mock_db):
269
        w = DummyWrapper(mock_nvti, mock_db)
270
        out = (
271
            '<dependencies><dependency vt_id="1.2.3.4"/><dependency vt'
272
            '_id="4.3.2.1"/></dependencies>'
273
        )
274
        dep = ['1.2.3.4', '4.3.2.1']
275
        res = w.get_dependencies_vt_as_xml_str(
276
            '1.3.6.1.4.1.25623.1.0.100061', dep
277
        )
278
279
        self.assertEqual(res, out)
280
281
    def test_get_ctime_xml(self, mock_nvti, mock_db):
282
        w = DummyWrapper(mock_nvti, mock_db)
283
        out = (
284
            '<creation_time>2009-03-19 11:22:36 +0100 '
285
            '(Thu, 19 Mar 2009)</creation_time>'
286
        )
287
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
288
        ctime = vt.get('creation_time')
289
        res = w.get_creation_time_vt_as_xml_str(
290
            '1.3.6.1.4.1.25623.1.0.100061', ctime
291
        )
292
293
        self.assertEqual(res, out)
294
295
    def test_get_mtime_xml(self, mock_nvti, mock_db):
296
        w = DummyWrapper(mock_nvti, mock_db)
297
        out = (
298
            '<modification_time>$Date: 2018-08-10 15:09:25 +0200 '
299
            '(Fri, 10 Aug 2018) $</modification_time>'
300
        )
301
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
302
        mtime = vt.get('modification_time')
303
        res = w.get_modification_time_vt_as_xml_str(
304
            '1.3.6.1.4.1.25623.1.0.100061', mtime
305
        )
306
307
        self.assertEqual(res, out)
308
309
    def test_get_summary_xml(self, mock_nvti, mock_db):
310
        w = DummyWrapper(mock_nvti, mock_db)
311
        out = '<summary>some summary</summary>'
312
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
313
        summary = vt.get('summary')
314
        res = w.get_summary_vt_as_xml_str(
315
            '1.3.6.1.4.1.25623.1.0.100061', summary
316
        )
317
318
        self.assertEqual(res, out)
319
320
    def test_get_impact_xml(self, mock_nvti, mock_db):
321
        w = DummyWrapper(mock_nvti, mock_db)
322
        out = '<impact>some impact</impact>'
323
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
324
        impact = vt.get('impact')
325
        res = w.get_impact_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', impact)
326
327
        self.assertEqual(res, out)
328
329
    def test_get_insight_xml(self, mock_nvti, mock_db):
330
        w = DummyWrapper(mock_nvti, mock_db)
331
        out = '<insight>some insight</insight>'
332
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
333
        insight = vt.get('insight')
334
        res = w.get_insight_vt_as_xml_str(
335
            '1.3.6.1.4.1.25623.1.0.100061', insight
336
        )
337
338
        self.assertEqual(res, out)
339
340
    def test_get_solution_xml(self, mock_nvti, mock_db):
341
        w = DummyWrapper(mock_nvti, mock_db)
342
        out = '<solution type="WillNotFix">some solution</solution>'
343
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
344
        solution = vt.get('solution')
345
        solution_type = vt.get('solution_type')
346
347
        res = w.get_solution_vt_as_xml_str(
348
            '1.3.6.1.4.1.25623.1.0.100061', solution, solution_type
349
        )
350
351
        self.assertEqual(res, out)
352
353
    def test_get_detection_xml(self, mock_nvti, mock_db):
354
        w = DummyWrapper(mock_nvti, mock_db)
355
        out = '<detection qod_type="remote_banner"/>'
356
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
357
        detection_type = vt.get('qod_type')
358
359
        res = w.get_detection_vt_as_xml_str(
360
            '1.3.6.1.4.1.25623.1.0.100061', qod_type=detection_type
361
        )
362
363
        self.assertEqual(res, out)
364
365
    def test_get_affected_xml(self, mock_nvti, mock_db):
366
        w = DummyWrapper(mock_nvti, mock_db)
367
        out = '<affected>some affection</affected>'
368
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
369
        affected = vt.get('affected')
370
371
        res = w.get_affected_vt_as_xml_str(
372
            '1.3.6.1.4.1.25623.1.0.100061', affected=affected
373
        )
374
375
        self.assertEqual(res, out)
376
377
    def test_build_credentials(self, mock_nvti, mock_db):
378
        w = DummyWrapper(mock_nvti, mock_db)
379
380
        cred_out = [
381
            '1.3.6.1.4.1.25623.1.0.105058:1:entry:ESXi login name:|||username',
382
            '1.3.6.1.4.1.25623.1.0.105058:2:password:ESXi login password:|||pass',
383
            'auth_port_ssh|||22',
384
            '1.3.6.1.4.1.25623.1.0.103591:1:entry:SSH login name:|||username',
385
            '1.3.6.1.4.1.25623.1.0.103591:2:password:SSH key passphrase:|||pass',
386
            '1.3.6.1.4.1.25623.1.0.103591:4:file:SSH private key:|||',
387
            '1.3.6.1.4.1.25623.1.0.90023:1:entry:SMB login:|||username',
388
            '1.3.6.1.4.1.25623.1.0.90023:2:password]:SMB password :|||pass',
389
            '1.3.6.1.4.1.25623.1.0.105076:1:password:SNMP Community:some comunity',
390
            '1.3.6.1.4.1.25623.1.0.105076:2:entry:SNMPv3 Username:username',
391
            '1.3.6.1.4.1.25623.1.0.105076:3:password:SNMPv3 Password:pass',
392
            '1.3.6.1.4.1.25623.1.0.105076:4:radio:SNMPv3 Authentication Algorithm:some auth algo',
393
            '1.3.6.1.4.1.25623.1.0.105076:5:password:SNMPv3 Privacy Password:privacy pass',
394
            '1.3.6.1.4.1.25623.1.0.105076:6:radio:SNMPv3 Privacy Algorithm:privacy algo',
395
        ]
396
        cred_dict = {
397
            'ssh': {
398
                'type': 'ssh',
399
                'port': '22',
400
                'username': 'username',
401
                'password': 'pass',
402
            },
403
            'smb': {'type': 'smb', 'username': 'username', 'password': 'pass'},
404
            'esxi': {
405
                'type': 'esxi',
406
                'username': 'username',
407
                'password': 'pass',
408
            },
409
            'snmp': {
410
                'type': 'snmp',
411
                'username': 'username',
412
                'password': 'pass',
413
                'community': 'some comunity',
414
                'auth_algorithm': 'some auth algo',
415
                'privacy_password': 'privacy pass',
416
                'privacy_algorithm': 'privacy algo',
417
            },
418
        }
419
        self.maxDiff = None
420
        ret = w.build_credentials_as_prefs(cred_dict)
421
        self.assertEqual(len(ret), len(cred_out))
422
        self.assertIn('auth_port_ssh|||22', cred_out)
423
        self.assertIn(
424
            '1.3.6.1.4.1.25623.1.0.90023:1:entry:SMB login:|||username',
425
            cred_out,
426
        )
427
428
    def test_build_credentials_ssh_up(self, mock_nvti, mock_db):
429
        w = DummyWrapper(mock_nvti, mock_db)
430
        cred_out = [
431
            'auth_port_ssh|||22',
432
            '1.3.6.1.4.1.25623.1.0.103591:1:entry:SSH login name:|||username',
433
            '1.3.6.1.4.1.25623.1.0.103591:3:password:SSH password (unsafe!):|||pass',
434
        ]
435
        cred_dict = {
436
            'ssh': {
437
                'type': 'up',
438
                'port': '22',
439
                'username': 'username',
440
                'password': 'pass',
441
            }
442
        }
443
        self.maxDiff = None
444
        ret = w.build_credentials_as_prefs(cred_dict)
445
        self.assertEqual(ret, cred_out)
446
447
    def test_process_vts(self, mock_nvti, mock_db):
448
        vts = {
449
            '1.3.6.1.4.1.25623.1.0.100061': {
450
                '1': 'new value',
451
            },
452
            'vt_groups': ['family=debian', 'family=general'],
453
        }
454
        vt_out = (
455
            ['1.3.6.1.4.1.25623.1.0.100061'],
456
            [
457
                ['1.3.6.1.4.1.25623.1.0.100061:entry:Data length :', 'new value'],
458
            ],
459
        )
460
        w = DummyWrapper(mock_nvti, mock_db)
461
        w.load_vts()
462
        ret = w.process_vts(vts)
463
        self.assertEqual(ret, vt_out)
464
465
    def test_process_vts_bad_param_id(self, mock_nvti, mock_db):
466
        vts = {
467
            '1.3.6.1.4.1.25623.1.0.100061': {
468
                '3': 'new value',
469
            },
470
            'vt_groups': ['family=debian', 'family=general'],
471
        }
472
        w = DummyWrapper(mock_nvti, mock_db)
473
        w.load_vts()
474
        ret = w.process_vts(vts)
475
        self.assertFalse(ret[1])
476
477
    def test_get_openvas_timestamp_scan_host_end(self, mock_nvti, mock_db):
478
        mock_db.get_host_scan_scan_end_time.return_value = '12345'
479
        w = DummyWrapper(mock_nvti, mock_db)
480
        targets = [['192.168.0.1', 'port', 'cred', 'exclude_host']]
481
        w.create_scan('123-456', targets, None, [])
482
        w.get_openvas_timestamp_scan_host('123-456', '192.168.0.1')
483
        for result in w.scan_collection.results_iterator('123-456', False):
484
            self.assertEqual(result.get('value'), '12345')
485
486
    def test_get_openvas_timestamp_scan_host_start(self, mock_nvti, mock_db):
487
        mock_db.get_host_scan_scan_end_time.return_value = None
488
        mock_db.get_host_scan_scan_end_time.return_value = '54321'
489
        w = DummyWrapper(mock_nvti, mock_db)
490
        targets = [['192.168.0.1', 'port', 'cred', 'exclude_host']]
491
        w.create_scan('123-456', targets, None, [])
492
        w.get_openvas_timestamp_scan_host('123-456', '192.168.0.1')
493
        for result in w.scan_collection.results_iterator('123-456', False):
494
            self.assertEqual(result.get('value'), '54321')
495
496
    def test_scan_is_finished(self, mock_nvti, mock_db):
497
        mock_db.get_single_item.return_value = 'finished'
498
        w = DummyWrapper(mock_nvti, mock_db)
499
        ret = w.scan_is_finished('123-456')
500
        self.assertEqual(ret, True)
501
502
    def test_scan_is_stopped(self, mock_nvti, mock_db):
503
        mock_db.get_single_item.return_value = 'stop_all'
504
        mock_db.kb_connect_item.return_value = mock_db
505
        mock_db.set_redisctx.return_value = None
506
        w = DummyWrapper(mock_nvti, mock_db)
507
        ret = w.scan_is_stopped('123-456')
508
        self.assertEqual(ret, True)
509
510
    @patch('ospd_openvas.wrapper.open')
511
    def test_feed_is_outdated(self, mock_open, mock_nvti, mock_db):
512
        mock_open.return_value = ['PLUGIN_SET = "1234";']
513
        w = DummyWrapper(mock_nvti, mock_db)
514
        self.assertRaises(OspdOpenvasError, w.feed_is_outdated, '1234')
515
        # Return False
516
        w.scan_only_params['plugins_folder'] = '/foo/bar'
517
        ret = w.feed_is_outdated('1234')
518
        self.assertFalse(ret)
519
520
        # Return true
521
        mock_open.return_value = ['PLUGIN_SET = "1235";']
522
        ret = w.feed_is_outdated('1234')
523
        self.assertTrue(ret)
524
525
    @patch('ospd_openvas.wrapper.OSPDaemon.add_scan_log')
526
    def test_get_openvas_result(self, mock_ospd, mock_nvti, mock_db):
527
        results = ["LOG||| |||general/Host_Details||| |||Host dead", None]
528
        mock_db.get_result.side_effect = results
529
        w = DummyWrapper(mock_nvti, mock_db)
530
        w.load_vts()
531
        mock_ospd.return_value = None
532
        w.get_openvas_result('123-456', 'localhost')
533
        mock_ospd.assert_called_with(
534
            '123-456',
535
            host='localhost',
536
            hostname=' ',
537
            name='',
538
            port='general/Host_Details',
539
            qod='',
540
            test_id=' ',
541
            value='Host dead',
542
        )
543
544
    @patch('ospd_openvas.wrapper.OSPDaemon.set_scan_target_progress')
545
    def test_update_progress(self, mock_ospd, mock_nvti, mock_db):
546
        msg = '0/-1'
547
        targets = [['localhost', 'port', 'cred', 'exclude_host']]
548
        w = DummyWrapper(mock_nvti, mock_db)
549
        w.create_scan('123-456', targets, None, [])
550
551
        mock_ospd.return_value = None
552
        w.update_progress('123-456', 'localhost', 'localhost', msg)
553
        mock_ospd.assert_called_with('123-456', 'localhost', 'localhost', 100)
554
555
556
class TestFilters(unittest.TestCase):
557
    def test_format_vt_modification_time(self):
558
        ovformat = OpenVasVtsFilter()
559
        td = '$Date: 2018-02-01 02:09:01 +0200 (Thu, 18 Oct 2018) $'
560
        formatted = ovformat.format_vt_modification_time(td)
561
        self.assertEqual(formatted, "20180201020901")
562