Passed
Pull Request — master (#69)
by Juan José
01:26
created

TestOspdOpenvas.test_update_progress()   A

Complexity

Conditions 1

Size

Total Lines 14
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 13
nop 4
dl 0
loc 14
rs 9.75
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-2.0-or-later
5
#
6
# This program is free software; you can redistribute it and/or
7
# modify it under the terms of the GNU General Public License
8
# as published by the Free Software Foundation; either version 2
9
# of the License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
20
""" Unit Test for ospd-openvas """
21
22
import unittest
23
from unittest.mock import patch
24
from ospd_openvas.wrapper import OSPD_PARAMS
25
from tests.dummywrapper import DummyWrapper
26
from  ospd_openvas.wrapper import OpenVasVtsFilter
27
from ospd_openvas.errors import OSPDOpenvasError
28
29
OSPD_PARAMS_OUT = {
30
    'auto_enable_dependencies': {
31
        'type': 'boolean',
32
        'name': 'auto_enable_dependencies',
33
        'default': 1,
34
        'mandatory': 1,
35
        'description': 'Automatically enable the plugins that are depended on',
36
    },
37
    'cgi_path': {
38
        'type': 'string',
39
        'name': 'cgi_path',
40
        'default': '/cgi-bin:/scripts',
41
        'mandatory': 1,
42
        'description': 'Look for default CGIs in /cgi-bin and /scripts',
43
    },
44
    'checks_read_timeout': {
45
        'type': 'integer',
46
        'name': 'checks_read_timeout',
47
        'default': 5,
48
        'mandatory': 1,
49
        'description': 'Number  of seconds that the security checks will ' \
50
                       'wait for when doing a recv()',
51
    },
52
    'drop_privileges': {
53
        'type': 'boolean',
54
        'name': 'drop_privileges',
55
        'default': 0,
56
        'mandatory': 1,
57
        'description': '',
58
    },
59
    'network_scan': {
60
        'type': 'boolean',
61
        'name': 'network_scan',
62
        'default': 0,
63
        'mandatory': 1,
64
        'description': '',
65
    },
66
    'non_simult_ports': {
67
        'type': 'string',
68
        'name': 'non_simult_ports',
69
        'default': '22',
70
        'mandatory': 1,
71
        'description': 'Prevent to make two connections on the same given ' \
72
                       'ports at the same time.',
73
    },
74
    'open_sock_max_attempts': {
75
        'type': 'integer',
76
        'name': 'open_sock_max_attempts',
77
        'default': 5,
78
        'mandatory': 0,
79
        'description': 'Number of unsuccessful retries to open the socket ' \
80
                       'before to set the port as closed.',
81
    },
82
    'timeout_retry': {
83
        'type': 'integer',
84
        'name': 'timeout_retry',
85
        'default': 5,
86
        'mandatory': 0,
87
        'description': 'Number of retries when a socket connection attempt ' \
88
                       'timesout.',
89
    },
90
    'optimize_test': {
91
        'type': 'integer',
92
        'name': 'optimize_test',
93
        'default': 5,
94
        'mandatory': 0,
95
        'description': 'By default, openvassd does not trust the remote ' \
96
                       'host banners.',
97
    },
98
    'plugins_timeout': {
99
        'type': 'integer',
100
        'name': 'plugins_timeout',
101
        'default': 5,
102
        'mandatory': 0,
103
        'description': 'This is the maximum lifetime, in seconds of a plugin.',
104
    },
105
    'report_host_details': {
106
        'type': 'boolean',
107
        'name': 'report_host_details',
108
        'default': 1,
109
        'mandatory': 1,
110
        'description': '',
111
    },
112
    'safe_checks': {
113
        'type': 'boolean',
114
        'name': 'safe_checks',
115
        'default': 1,
116
        'mandatory': 1,
117
        'description': 'Disable the plugins with potential to crash ' \
118
                       'the remote services',
119
    },
120
    'scanner_plugins_timeout': {
121
        'type': 'integer',
122
        'name': 'scanner_plugins_timeout',
123
        'default': 36000,
124
        'mandatory': 1,
125
        'description': 'Like plugins_timeout, but for ACT_SCANNER plugins.',
126
    },
127
    'time_between_request': {
128
        'type': 'integer',
129
        'name': 'time_between_request',
130
        'default': 0,
131
        'mandatory': 0,
132
        'description': 'Allow to set a wait time between two actions ' \
133
                       '(open, send, close).',
134
    },
135
    'unscanned_closed': {
136
        'type': 'boolean',
137
        'name': 'unscanned_closed',
138
        'default': 1,
139
        'mandatory': 1,
140
        'description': '',
141
    },
142
    'unscanned_closed_udp': {
143
        'type': 'boolean',
144
        'name': 'unscanned_closed_udp',
145
        'default': 1,
146
        'mandatory': 1,
147
        'description': '',
148
    },
149
    'use_mac_addr': {
150
        'type': 'boolean',
151
        'name': 'use_mac_addr',
152
        'default': 0,
153
        'mandatory': 0,
154
        'description': 'To test the local network. ' \
155
                       'Hosts will be referred to by their MAC address.',
156
    },
157
    'vhosts': {
158
        'type': 'string',
159
        'name': 'vhosts',
160
        'default': '',
161
        'mandatory': 0,
162
        'description': '',
163
    },
164
    'vhosts_ip': {
165
        'type': 'string',
166
        'name': 'vhosts_ip',
167
        'default': '',
168
        'mandatory': 0,
169
        'description': '',
170
    },
171
}
172
173
174
@patch('ospd_openvas.db.OpenvasDB')
175
@patch('ospd_openvas.nvticache.NVTICache')
176
class TestOspdOpenvas(unittest.TestCase):
177
178
    @patch('ospd_openvas.wrapper.subprocess')
179
    def test_redis_nvticache_init(self, mock_subproc, mock_nvti, mock_db):
180
        mock_subproc.check_call.return_value = True
181
        w = DummyWrapper(mock_nvti, mock_db)
182
        w.redis_nvticache_init()
183
        self.assertEqual(mock_subproc.check_call.call_count, 1)
184
185
    @patch('ospd_openvas.wrapper.subprocess')
186
    def test_parse_param(self, mock_subproc, mock_nvti, mock_db):
187
188
        mock_subproc.check_output.return_value = \
189
            'non_simult_ports = 22\nplugins_folder = /foo/bar'.encode()
190
        w =  DummyWrapper(mock_nvti, mock_db)
191
        w.parse_param()
192
        self.assertEqual(mock_subproc.check_output.call_count, 1)
193
        self.assertEqual(OSPD_PARAMS, OSPD_PARAMS_OUT)
194
        self.assertEqual(w.scan_only_params.get('plugins_folder'), '/foo/bar')
195
196
    def test_load_vts(self, mock_nvti, mock_db):
197
        w =  DummyWrapper(mock_nvti, mock_db)
198
        w.load_vts()
199
        self.maxDiff = None
200
        self.assertEqual(w.vts, w.VT)
201
202
    def test_get_custom_xml(self, mock_nvti, mock_db):
203
        out = '<custom><required_ports>Services/www, 80</re' \
204
              'quired_ports><category>3</category><' \
205
              'excluded_keys>Settings/disable_cgi_s' \
206
              'canning</excluded_keys><family>Produ' \
207
              'ct detection</family><filename>manti' \
208
              's_detect.nasl</filename><timeout>0</' \
209
              'timeout></custom>'
210
        w =  DummyWrapper(mock_nvti, mock_db)
211
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
212
        res = w.get_custom_vt_as_xml_str(
213
            '1.3.6.1.4.1.25623.1.0.100061',
214
            vt.get('custom'))
215
        self.assertEqual(len(res), len(out))
216
217
    def test_get_severities_xml(self, mock_nvti, mock_db):
218
        w =  DummyWrapper(mock_nvti, mock_db)
219
        out = '<severities><severity type="cvss_base_v2">' \
220
              'AV:N/AC:L/Au:N/C:N/I:N/A:N</severity></severities>'
221
        vt =w.VT['1.3.6.1.4.1.25623.1.0.100061']
222
        severities = vt.get('severities')
223
        res = w.get_severities_vt_as_xml_str(
224
            '1.3.6.1.4.1.25623.1.0.100061', severities)
225
226
        self.assertEqual(res, out)
227
228
    def test_get_params_xml(self, mock_nvti, mock_db):
229
        w =  DummyWrapper(mock_nvti, mock_db)
230
        out = '<vt_params><vt_param type="checkbox" id="2"><name>Do ' \
231
              'not randomize the  order  in  which ports are scanned</name' \
232
              '><default>no</default></vt_param><vt_param type="ent' \
233
              'ry" id="1"><name>Data length :</name><' \
234
              '/vt_param></vt_params>'
235
236
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
237
        params = vt.get('vt_params')
238
        res = w.get_params_vt_as_xml_str(
239
            '1.3.6.1.4.1.25623.1.0.100061', params)
240
        self.assertEqual(len(res), len(out))
241
242
    def test_get_refs_xml(self, mock_nvti, mock_db):
243
        w =  DummyWrapper(mock_nvti, mock_db)
244
        out = '<refs><ref type="url" id="http://www.mantisbt.org/"/>' \
245
              '</refs>'
246
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
247
        refs = vt.get('vt_refs')
248
        res = w.get_refs_vt_as_xml_str(
249
            '1.3.6.1.4.1.25623.1.0.100061', refs)
250
251
        self.assertEqual(res, out)
252
253
    def test_get_dependencies_xml(self, mock_nvti, mock_db):
254
        w =  DummyWrapper(mock_nvti, mock_db)
255
        out = '<dependencies><dependency vt_id="1.2.3.4"/><dependency vt' \
256
              '_id="4.3.2.1"/></dependencies>'
257
        dep = ['1.2.3.4', '4.3.2.1']
258
        res = w.get_dependencies_vt_as_xml_str(
259
            '1.3.6.1.4.1.25623.1.0.100061', dep)
260
261
        self.assertEqual(res, out)
262
263
    def test_get_ctime_xml(self, mock_nvti, mock_db):
264
        w =  DummyWrapper(mock_nvti, mock_db)
265
        out = '<creation_time>2009-03-19 11:22:36 +0100 ' \
266
              '(Thu, 19 Mar 2009)</creation_time>'
267
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
268
        ctime = vt.get('creation_time')
269
        res = w.get_creation_time_vt_as_xml_str(
270
            '1.3.6.1.4.1.25623.1.0.100061', ctime)
271
272
        self.assertEqual(res, out)
273
274
    def test_get_mtime_xml(self, mock_nvti, mock_db):
275
        w =  DummyWrapper(mock_nvti, mock_db)
276
        out = '<modification_time>$Date: 2018-08-10 15:09:25 +0200 ' \
277
              '(Fri, 10 Aug 2018) $</modification_time>'
278
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
279
        mtime = vt.get('modification_time')
280
        res = w.get_modification_time_vt_as_xml_str(
281
            '1.3.6.1.4.1.25623.1.0.100061', mtime)
282
283
        self.assertEqual(res, out)
284
285
    def test_get_summary_xml(self, mock_nvti, mock_db):
286
        w =  DummyWrapper(mock_nvti, mock_db)
287
        out = '<summary>some summary</summary>'
288
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
289
        summary = vt.get('summary')
290
        res = w.get_summary_vt_as_xml_str(
291
            '1.3.6.1.4.1.25623.1.0.100061', summary)
292
293
        self.assertEqual(res, out)
294
295
    def test_get_impact_xml(self, mock_nvti, mock_db):
296
        w =  DummyWrapper(mock_nvti, mock_db)
297
        out = '<impact>some impact</impact>'
298
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
299
        impact = vt.get('impact')
300
        res = w.get_impact_vt_as_xml_str(
301
            '1.3.6.1.4.1.25623.1.0.100061', impact)
302
303
        self.assertEqual(res, out)
304
305
    def test_get_insight_xml(self, mock_nvti, mock_db):
306
        w =  DummyWrapper(mock_nvti, mock_db)
307
        out = '<insight>some insight</insight>'
308
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
309
        insight = vt.get('insight')
310
        res = w.get_insight_vt_as_xml_str(
311
            '1.3.6.1.4.1.25623.1.0.100061', insight)
312
313
        self.assertEqual(res, out)
314
315
    def test_get_solution_xml(self, mock_nvti, mock_db):
316
        w =  DummyWrapper(mock_nvti, mock_db)
317
        out = '<solution type="WillNotFix">some solution</solution>'
318
        vt =w.VT['1.3.6.1.4.1.25623.1.0.100061']
319
        solution = vt.get('solution')
320
        solution_type = vt.get('solution_type')
321
322
        res = w.get_solution_vt_as_xml_str(
323
            '1.3.6.1.4.1.25623.1.0.100061', solution, solution_type)
324
325
        self.assertEqual(res, out)
326
327
    def test_get_detection_xml(self, mock_nvti, mock_db):
328
        w =  DummyWrapper(mock_nvti, mock_db)
329
        out = '<detection qod_type="remote_banner"/>'
330
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
331
        detection_type = vt.get('qod_type')
332
333
        res = w.get_detection_vt_as_xml_str(
334
            '1.3.6.1.4.1.25623.1.0.100061', qod_type=detection_type)
335
336
        self.assertEqual(res, out)
337
338
    def test_get_affected_xml(self, mock_nvti, mock_db):
339
        w =  DummyWrapper(mock_nvti, mock_db)
340
        out = '<affected>some affection</affected>'
341
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
342
        affected =  vt.get('affected')
343
344
        res = w.get_affected_vt_as_xml_str(
345
            '1.3.6.1.4.1.25623.1.0.100061', affected=affected)
346
347
        self.assertEqual(res, out)
348
349
    def test_build_credentials(self, mock_nvti, mock_db):
350
        w =  DummyWrapper(mock_nvti, mock_db)
351
352
        cred_out = [
353
            '1.3.6.1.4.1.25623.1.0.105058:1:entry:ESXi login name:|||username',
354
            '1.3.6.1.4.1.25623.1.0.105058:2:password:ESXi login password:|||pass',
355
            'auth_port_ssh|||22',
356
            '1.3.6.1.4.1.25623.1.0.103591:1:entry:SSH login name:|||username',
357
            '1.3.6.1.4.1.25623.1.0.103591:2:password:SSH key passphrase:|||pass',
358
            '1.3.6.1.4.1.25623.1.0.103591:4:file:SSH private key:|||',
359
            '1.3.6.1.4.1.25623.1.0.90023:1:entry:SMB login:|||username',
360
            '1.3.6.1.4.1.25623.1.0.90023:2:password]:SMB password :|||pass',
361
            '1.3.6.1.4.1.25623.1.0.105076:1:password:SNMP Community:some comunity',
362
            '1.3.6.1.4.1.25623.1.0.105076:2:entry:SNMPv3 Username:username',
363
            '1.3.6.1.4.1.25623.1.0.105076:3:password:SNMPv3 Password:pass',
364
            '1.3.6.1.4.1.25623.1.0.105076:4:radio:SNMPv3 Authentication Algorithm:some auth algo',
365
            '1.3.6.1.4.1.25623.1.0.105076:5:password:SNMPv3 Privacy Password:privacy pass',
366
            '1.3.6.1.4.1.25623.1.0.105076:6:radio:SNMPv3 Privacy Algorithm:privacy algo']
367
        cred_dict = {
368
            'ssh': {'type': 'ssh',
369
                    'port': '22',
370
                    'username': 'username',
371
                    'password': 'pass',
372
            },
373
            'smb': {'type': 'smb',
374
                    'username': 'username',
375
                    'password': 'pass',
376
            },
377
            'esxi': {'type': 'esxi',
378
                    'username': 'username',
379
                    'password': 'pass',
380
            },
381
            'snmp': {'type': 'snmp',
382
                     'username': 'username',
383
                     'password': 'pass',
384
                     'community': 'some comunity',
385
                     'auth_algorithm': 'some auth algo',
386
                     'privacy_password': 'privacy pass',
387
                     'privacy_algorithm': 'privacy algo',
388
            },
389
        }
390
        self.maxDiff=None
391
        ret = w.build_credentials_as_prefs(cred_dict)
392
        self.assertEqual(len(ret), len(cred_out))
393
        self.assertIn('auth_port_ssh|||22', cred_out)
394
        self.assertIn(
395
            '1.3.6.1.4.1.25623.1.0.90023:1:entry:SMB login:|||username',
396
            cred_out)
397
398
    def test_build_credentials_ssh_up(self, mock_nvti, mock_db):
399
        w =  DummyWrapper(mock_nvti, mock_db)
400
        cred_out = [
401
            'auth_port_ssh|||22',
402
            '1.3.6.1.4.1.25623.1.0.103591:1:entry:SSH login name:|||username',
403
            '1.3.6.1.4.1.25623.1.0.103591:3:password:SSH password (unsafe!):|||pass']
404
        cred_dict = {
405
            'ssh': {'type': 'up',
406
                    'port': '22',
407
                    'username': 'username',
408
                    'password': 'pass',
409
            }
410
        }
411
        self.maxDiff=None
412
        ret = w.build_credentials_as_prefs(cred_dict)
413
        self.assertEqual(ret, cred_out)
414
415
    def test_process_vts(self, mock_nvti, mock_db):
416
        vts = {
417
            '1.3.6.1.4.1.25623.1.0.100061': {
418
                'Data length :': 'new value',
419
                'Do not randomize the  order  in  which ports are ' \
420
                'scanned': 'new value'},
421
            'vt_groups': ['family=debian', 'family=general']
422
        }
423
        vt_out = (
424
            ['1.3.6.1.4.1.25623.1.0.100061'],
425
            [['Mantis Detection[checkbox]:Do not randomize the  order  i' \
426
              'n  which ports are scanned', 'new value'],
427
             ['Mantis Detection[entry]:Data length : ', 'new value']]
428
            )
429
        w =  DummyWrapper(mock_nvti, mock_db)
430
        w.load_vts()
431
        ret = w.process_vts(vts)
432
        self.assertEqual(len(ret), len(vt_out))
433
434
    def test_get_openvas_timestamp_scan_host_end(self, mock_nvti, mock_db):
435
        mock_db.get_host_scan_scan_end_time.return_value = '12345'
436
        w =  DummyWrapper(mock_nvti, mock_db)
437
        targets = [['192.168.0.1', 'port', 'cred']]
438
        w.create_scan('123-456', targets, None, [])
439
        w.get_openvas_timestamp_scan_host('123-456', '192.168.0.1')
440
        for result in  w.scan_collection.results_iterator('123-456', False):
441
            self.assertEqual(result.get('value'), '12345')
442
443
    def test_get_openvas_timestamp_scan_host_start(self, mock_nvti, mock_db):
444
        mock_db.get_host_scan_scan_end_time.return_value = None
445
        mock_db.get_host_scan_scan_end_time.return_value = '54321'
446
        w =  DummyWrapper(mock_nvti, mock_db)
447
        targets = [['192.168.0.1', 'port', 'cred']]
448
        w.create_scan('123-456', targets, None, [])
449
        w.get_openvas_timestamp_scan_host('123-456', '192.168.0.1')
450
        for result in  w.scan_collection.results_iterator('123-456', False):
451
            self.assertEqual(result.get('value'), '54321')
452
453
    def test_scan_is_finished(self, mock_nvti, mock_db):
454
        mock_db.get_single_item.return_value = 'finished'
455
        w =  DummyWrapper(mock_nvti, mock_db)
456
        ret = w.scan_is_finished('123-456')
457
        self.assertEqual(ret, True)
458
459
    def test_scan_is_stopped(self, mock_nvti, mock_db):
460
        mock_db.get_single_item.return_value = 'stop_all'
461
        mock_db.kb_connect_item.return_value = mock_db
462
        mock_db.set_redisctx.return_value = None
463
        w =  DummyWrapper(mock_nvti, mock_db)
464
        ret = w.scan_is_stopped('123-456')
465
        self.assertEqual(ret, True)
466
467
    @patch('ospd_openvas.wrapper.open')
468
    def test_feed_is_outdated(self, mock_open, mock_nvti, mock_db):
469
        mock_open.return_value = ['PLUGIN_SET = "1234";']
470
        w =  DummyWrapper(mock_nvti, mock_db)
471
        self.assertRaises(OSPDOpenvasError, w.feed_is_outdated, '1234')
472
        # Return False
473
        w.scan_only_params['plugins_folder'] = '/foo/bar'
474
        ret = w.feed_is_outdated('1234')
475
        self.assertFalse(ret)
476
477
        # Return true
478
        mock_open.return_value = ['PLUGIN_SET = "1235";']
479
        ret = w.feed_is_outdated('1234')
480
        self.assertTrue(ret)
481
482
    @patch('ospd_openvas.wrapper.OSPDaemon.add_scan_log')
483
    def test_get_openvas_result(self, mock_ospd, mock_nvti, mock_db):
484
        results = [
485
            "LOG||| |||general/Host_Details||| |||Host dead",
486
            None
487
            ]
488
        mock_db.get_result.side_effect = results
489
        w =  DummyWrapper(mock_nvti, mock_db)
490
        w.load_vts()
491
        mock_ospd.return_value = None
492
        w.get_openvas_result('123-456', 'localhost')
493
        mock_ospd.assert_called_with(
494
            '123-456',
495
            host='localhost',
496
            name='',
497
            port='general/Host_Details',
498
            qod='',
499
            test_id=' ',
500
            value='Host dead')
501
502
    @patch('ospd_openvas.wrapper.OSPDaemon.set_scan_target_progress')
503
    def test_update_progress(self, mock_ospd, mock_nvti, mock_db):
504
        msg = '0/-1'
505
        targets = [['localhost', 'port', 'cred']]
506
        w =  DummyWrapper(mock_nvti, mock_db)
507
        w.create_scan('123-456', targets, None, [])
508
509
        mock_ospd.return_value = None
510
        w.update_progress('123-456', 'localhost', 'localhost', msg)
511
        mock_ospd.assert_called_with(
512
            '123-456',
513
            'localhost',
514
            'localhost',
515
            100,
516
        )
517
518
class TestFilters(unittest.TestCase):
519
520
    def test_format_vt_modification_time(self):
521
        ovformat = OpenVasVtsFilter()
522
        td = '$Date: 2018-02-01 02:09:01 +0200 (Thu, 18 Oct 2018) $'
523
        formatted = ovformat.format_vt_modification_time(td)
524
        self.assertEqual(formatted, "20180201020901")
525