Completed
Push — master ( 489d84...6ebf24 )
by Juan José
13s
created

TestOspdOpenvas.test_build_credentials()   B

Complexity

Conditions 1

Size

Total Lines 44
Code Lines 39

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 39
nop 3
dl 0
loc 44
rs 8.9439
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-2.0-or-later
5
#
6
# This program is free software; you can redistribute it and/or
7
# modify it under the terms of the GNU General Public License
8
# as published by the Free Software Foundation; either version 2
9
# of the License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
20
""" Unit Test for ospd-openvas """
21
22
import unittest
23
from unittest.mock import patch
24
from ospd_openvas.wrapper import OSPD_PARAMS
25
from tests.dummywrapper import DummyWrapper
26
27
# Expected content of OSPD_PARAMS after DummyWrapper.parse_param() has run
# (compared for equality in TestOspdOpenvas.test_parse_param).
# Each entry maps a parameter name to its 'type', 'name', 'default',
# 'mandatory' flag and 'description'. The description strings are compared
# verbatim, so their exact spelling and spacing must not be "corrected" here.
OSPD_PARAMS_OUT = {
    'auto_enable_dependencies': {
        'type': 'boolean',
        'name': 'auto_enable_dependencies',
        'default': 1,
        'mandatory': 1,
        'description': 'Automatically enable the plugins that are depended on',
    },
    'cgi_path': {
        'type': 'string',
        'name': 'cgi_path',
        'default': '/cgi-bin:/scripts',
        'mandatory': 1,
        'description': 'Look for default CGIs in /cgi-bin and /scripts',
    },
    'checks_read_timeout': {
        'type': 'integer',
        'name': 'checks_read_timeout',
        'default': 5,
        'mandatory': 1,
        'description': 'Number  of seconds that the security checks will '
                       'wait for when doing a recv()',
    },
    'drop_privileges': {
        'type': 'boolean',
        'name': 'drop_privileges',
        'default': 0,
        'mandatory': 1,
        'description': '',
    },
    'network_scan': {
        'type': 'boolean',
        'name': 'network_scan',
        'default': 0,
        'mandatory': 1,
        'description': '',
    },
    'non_simult_ports': {
        'type': 'string',
        'name': 'non_simult_ports',
        'default': '22',
        'mandatory': 1,
        'description': 'Prevent to make two connections on the same given '
                       'ports at the same time.',
    },
    'open_sock_max_attempts': {
        'type': 'integer',
        'name': 'open_sock_max_attempts',
        'default': 5,
        'mandatory': 0,
        'description': 'Number of unsuccessful retries to open the socket '
                       'before to set the port as closed.',
    },
    'timeout_retry': {
        'type': 'integer',
        'name': 'timeout_retry',
        'default': 5,
        'mandatory': 0,
        'description': 'Number of retries when a socket connection attempt '
                       'timesout.',
    },
    'optimize_test': {
        'type': 'integer',
        'name': 'optimize_test',
        'default': 5,
        'mandatory': 0,
        'description': 'By default, openvassd does not trust the remote '
                       'host banners.',
    },
    'plugins_timeout': {
        'type': 'integer',
        'name': 'plugins_timeout',
        'default': 5,
        'mandatory': 0,
        'description': 'This is the maximum lifetime, in seconds of a plugin.',
    },
    'report_host_details': {
        'type': 'boolean',
        'name': 'report_host_details',
        'default': 1,
        'mandatory': 1,
        'description': '',
    },
    'safe_checks': {
        'type': 'boolean',
        'name': 'safe_checks',
        'default': 1,
        'mandatory': 1,
        'description': 'Disable the plugins with potential to crash '
                       'the remote services',
    },
    'scanner_plugins_timeout': {
        'type': 'integer',
        'name': 'scanner_plugins_timeout',
        'default': 36000,
        'mandatory': 1,
        'description': 'Like plugins_timeout, but for ACT_SCANNER plugins.',
    },
    'time_between_request': {
        'type': 'integer',
        'name': 'time_between_request',
        'default': 0,
        'mandatory': 0,
        'description': 'Allow to set a wait time between two actions '
                       '(open, send, close).',
    },
    'unscanned_closed': {
        'type': 'boolean',
        'name': 'unscanned_closed',
        'default': 1,
        'mandatory': 1,
        'description': '',
    },
    'unscanned_closed_udp': {
        'type': 'boolean',
        'name': 'unscanned_closed_udp',
        'default': 1,
        'mandatory': 1,
        'description': '',
    },
    'use_mac_addr': {
        'type': 'boolean',
        'name': 'use_mac_addr',
        'default': 0,
        'mandatory': 0,
        'description': 'To test the local network. '
                       'Hosts will be referred to by their MAC address.',
    },
    'vhosts': {
        'type': 'string',
        'name': 'vhosts',
        'default': '',
        'mandatory': 0,
        'description': '',
    },
    'vhosts_ip': {
        'type': 'string',
        'name': 'vhosts_ip',
        'default': '',
        'mandatory': 0,
        'description': '',
    },
}
170
171
172
@patch('ospd_openvas.db.OpenvasDB')
@patch('ospd_openvas.nvticache.NVTICache')
class TestOspdOpenvas(unittest.TestCase):
    """Unit tests for the OpenVAS wrapper, exercised through DummyWrapper.

    The class-level patches replace ``OpenvasDB`` and ``NVTICache`` with
    mocks for every test method. Mocks are injected positionally, innermost
    decorator first: the method-level patch (when present), then
    ``mock_nvti``, then ``mock_db``.
    """

    # OID of the VT entry that most tests look up in the wrapper's VT table.
    VT_OID = '1.3.6.1.4.1.25623.1.0.100061'

    @patch('ospd_openvas.wrapper.subprocess')
    def test_redis_nvticache_init(self, mock_subproc, mock_nvti, mock_db):
        """redis_nvticache_init() calls subprocess.check_call exactly once."""
        mock_subproc.check_call.return_value = True
        w = DummyWrapper(mock_nvti, mock_db)
        w.redis_nvticache_init()
        self.assertEqual(mock_subproc.check_call.call_count, 1)

    @patch('ospd_openvas.wrapper.subprocess')
    def test_parse_param(self, mock_subproc, mock_nvti, mock_db):
        """parse_param() calls check_output once and fills OSPD_PARAMS."""
        # check_output is mocked to return bytes, hence the bytes literal.
        mock_subproc.check_output.return_value = b'non_simult_ports = 22'
        w = DummyWrapper(mock_nvti, mock_db)
        w.parse_param()
        self.assertEqual(mock_subproc.check_output.call_count, 1)
        self.assertEqual(OSPD_PARAMS, OSPD_PARAMS_OUT)

    def test_load_vts(self, mock_nvti, mock_db):
        """After load_vts(), the wrapper's ``vts`` equals its ``VT`` table."""
        self.maxDiff = None  # print the complete diff on failure
        w = DummyWrapper(mock_nvti, mock_db)
        w.load_vts()
        self.assertEqual(w.vts, w.VT)

    def test_get_custom_xml(self, mock_nvti, mock_db):
        """The custom-section XML has the same length as the expected string.

        Only lengths are compared (presumably because the element order is
        not guaranteed -- TODO confirm).
        """
        out = (
            '<required_ports>Services/www, 80</required_ports>'
            '<category>3</category>'
            '<excluded_keys>Settings/disable_cgi_scanning</excluded_keys>'
            '<family>Product detection</family>'
            '<filename>mantis_detect.nasl</filename>'
            '<timeout>0</timeout>'
        )
        w = DummyWrapper(mock_nvti, mock_db)
        vt = w.VT[self.VT_OID]
        res = w.get_custom_vt_as_xml_str(self.VT_OID, vt.get('custom'))
        self.assertEqual(len(res), len(out))

    def test_get_severities_xml(self, mock_nvti, mock_db):
        """The severities of the sample VT render as one <severity> element."""
        w = DummyWrapper(mock_nvti, mock_db)
        out = (
            '<severity type="cvss_base_v2">'
            'AV:N/AC:L/Au:N/C:N/I:N/A:N</severity>'
        )
        vt = w.VT[self.VT_OID]
        severities = vt.get('severities')
        res = w.get_severities_vt_as_xml_str(self.VT_OID, severities)

        self.assertEqual(res, out)

    def test_get_params_xml(self, mock_nvti, mock_db):
        """The VT-params XML has the same length as the expected string.

        Only lengths are compared (presumably because the parameter order
        is not guaranteed -- TODO confirm).
        """
        w = DummyWrapper(mock_nvti, mock_db)
        # The doubled spaces inside the parameter name are intentional.
        out = (
            '<vt_param type="checkbox" id="Do not randomize the  order  in  '
            'which ports are scanned"><name>Do not randomize the  order  in  '
            'which ports are scanned</name><default>no</default></vt_param>'
            '<vt_param type="entry" id="Data length : ">'
            '<name>Data length : </name></vt_param>'
        )

        vt = w.VT[self.VT_OID]
        params = vt.get('vt_params')
        res = w.get_params_vt_as_xml_str(self.VT_OID, params)
        self.assertEqual(len(res), len(out))

    def test_get_refs_xml(self, mock_nvti, mock_db):
        """The references of the sample VT render as one <ref> element."""
        w = DummyWrapper(mock_nvti, mock_db)
        out = '<ref type="url" id="http://www.mantisbt.org/"/>'
        vt = w.VT[self.VT_OID]
        refs = vt.get('vt_refs')
        res = w.get_refs_vt_as_xml_str(self.VT_OID, refs)

        self.assertEqual(res, out)

    def test_get_dependencies_xml(self, mock_nvti, mock_db):
        """Each dependency OID renders as its own <dependency> element."""
        w = DummyWrapper(mock_nvti, mock_db)
        out = '<dependency vt_id="1.2.3.4"/><dependency vt_id="4.3.2.1"/>'
        dep = ['1.2.3.4', '4.3.2.1']
        res = w.get_dependencies_vt_as_xml_str(self.VT_OID, dep)

        self.assertEqual(res, out)

    def test_get_ctime_xml(self, mock_nvti, mock_db):
        """The creation time of the sample VT is returned as expected."""
        w = DummyWrapper(mock_nvti, mock_db)
        out = '2009-03-19 11:22:36 +0100 (Thu, 19 Mar 2009)'
        vt = w.VT[self.VT_OID]
        ctime = vt.get('creation_time')
        res = w.get_creation_time_vt_as_xml_str(self.VT_OID, ctime)

        self.assertEqual(res, out)

    def test_get_mtime_xml(self, mock_nvti, mock_db):
        """The modification time of the sample VT is returned as expected."""
        w = DummyWrapper(mock_nvti, mock_db)
        out = '$Date: 2018-08-10 15:09:25 +0200 (Fri, 10 Aug 2018) $'
        vt = w.VT[self.VT_OID]
        mtime = vt.get('modification_time')
        res = w.get_modification_time_vt_as_xml_str(self.VT_OID, mtime)

        self.assertEqual(res, out)

    def test_get_summary_xml(self, mock_nvti, mock_db):
        """The summary of the sample VT is wrapped in a <summary> element."""
        w = DummyWrapper(mock_nvti, mock_db)
        out = '<summary>some summary</summary>'
        vt = w.VT[self.VT_OID]
        summary = vt.get('summary')
        res = w.get_summary_vt_as_xml_str(self.VT_OID, summary)

        self.assertEqual(res, out)

    def test_get_impact_xml(self, mock_nvti, mock_db):
        """The impact of the sample VT is wrapped in an <impact> element."""
        w = DummyWrapper(mock_nvti, mock_db)
        out = '<impact>some impact</impact>'
        vt = w.VT[self.VT_OID]
        impact = vt.get('impact')
        res = w.get_impact_vt_as_xml_str(self.VT_OID, impact)

        self.assertEqual(res, out)

    def test_get_insight_xml(self, mock_nvti, mock_db):
        """The insight of the sample VT is wrapped in an <insight> element."""
        w = DummyWrapper(mock_nvti, mock_db)
        out = '<insight>some insight</insight>'
        vt = w.VT[self.VT_OID]
        insight = vt.get('insight')
        res = w.get_insight_vt_as_xml_str(self.VT_OID, insight)

        self.assertEqual(res, out)

    def test_get_solution_xml(self, mock_nvti, mock_db):
        """Solution text and type render as a single <solution> element."""
        w = DummyWrapper(mock_nvti, mock_db)
        out = '<solution type="WillNotFix">some solution</solution>'
        vt = w.VT[self.VT_OID]
        solution = vt.get('solution')
        solution_type = vt.get('solution_type')

        res = w.get_solution_vt_as_xml_str(
            self.VT_OID, solution, solution_type)

        self.assertEqual(res, out)

    def test_get_detection_xml(self, mock_nvti, mock_db):
        """The qod_type of the sample VT renders as a <detection> element."""
        w = DummyWrapper(mock_nvti, mock_db)
        out = '<detection qod_type="remote_banner"/>'
        vt = w.VT[self.VT_OID]
        detection_type = vt.get('qod_type')

        res = w.get_detection_vt_as_xml_str(
            self.VT_OID, qod_type=detection_type)

        self.assertEqual(res, out)

    def test_get_affected_xml(self, mock_nvti, mock_db):
        """The affected text is wrapped in an <affected> element."""
        w = DummyWrapper(mock_nvti, mock_db)
        out = '<affected>some affection</affected>'
        vt = w.VT[self.VT_OID]
        affected = vt.get('affected')

        res = w.get_affected_vt_as_xml_str(self.VT_OID, affected=affected)

        self.assertEqual(res, out)

    def test_build_credentials(self, mock_nvti, mock_db):
        """Credentials for ssh, smb, esxi and snmp become preference strings.

        The result is checked by length plus one membership test rather
        than by full equality (presumably because the order of the
        generated preferences is not guaranteed -- TODO confirm).
        """
        w = DummyWrapper(mock_nvti, mock_db)
        cred_out = [
            'auth_port_ssh|||22',
            'SSH Authorization[entry]:SSH login name:|||username',
            'SSH Authorization[password]:SSH key passphrase:|||pass',
            'SSH Authorization[file]:SSH private key:|||',
            'SMB Authorization[entry]:SMB login:|||username',
            'SMB Authorization[password]:SMB password :|||pass',
            'SNMP Authorization[password]:SNMP Community:some comunity',
            'SNMP Authorization[entry]:SNMPv3 Username:username',
            'SNMP Authorization[password]:SNMPv3 Password:pass',
            ('SNMP Authorization[radio]:SNMPv3 Authentication Algorithm:'
             'some auth algo'),
            'SNMP Authorization[password]:SNMPv3 Privacy Password:privacy pass',
            'SNMP Authorization[radio]:SNMPv3 Privacy Algorithm:privacy algo',
            'ESXi Authorization[entry]:ESXi login name:|||username',
            'ESXi Authorization[password]:ESXi login password:|||pass',
        ]
        cred_dict = {
            'ssh': {
                'type': 'ssh',
                'port': '22',
                'username': 'username',
                'password': 'pass',
            },
            'smb': {
                'type': 'smb',
                'username': 'username',
                'password': 'pass',
            },
            'esxi': {
                'type': 'esxi',
                'username': 'username',
                'password': 'pass',
            },
            'snmp': {
                'type': 'snmp',
                'username': 'username',
                'password': 'pass',
                'community': 'some comunity',
                'auth_algorithm': 'some auth algo',
                'privacy_password': 'privacy pass',
                'privacy_algorithm': 'privacy algo',
            },
        }
        self.maxDiff = None
        ret = w.build_credentials_as_prefs(cred_dict)
        self.assertEqual(len(ret), len(cred_out))
        # Check the actual result: the previous assertion looked the value
        # up in cred_out itself, which is always true.
        self.assertIn('auth_port_ssh|||22', ret)

    def test_build_credentials_ssh_up(self, mock_nvti, mock_db):
        """An ssh credential of type 'up' yields port, login and password."""
        w = DummyWrapper(mock_nvti, mock_db)
        cred_out = [
            'auth_port_ssh|||22',
            'SSH Authorization[entry]:SSH login name:|||username',
            'SSH Authorization[password]:SSH password (unsafe!):|||pass',
        ]
        cred_dict = {
            'ssh': {
                'type': 'up',
                'port': '22',
                'username': 'username',
                'password': 'pass',
            },
        }
        self.maxDiff = None
        ret = w.build_credentials_as_prefs(cred_dict)
        self.assertEqual(ret, cred_out)

    def test_process_vts(self, mock_nvti, mock_db):
        """process_vts() returns as many items as the expected result tuple.

        Only the length of the result is compared with ``vt_out``; the
        content of ``vt_out`` documents the expected shape.
        """
        vts = {
            self.VT_OID: {
                'Data length : ': 'new value',
                'Do not randomize the  order  in  which ports are '
                'scanned': 'new value',
            },
            'vt_groups': ['family=debian', 'family=general'],
        }
        vt_out = (
            [self.VT_OID],
            [
                [
                    'Mantis Detection[checkbox]:Do not randomize the  order  i'
                    'n  which ports are scanned',
                    'new value',
                ],
                ['Mantis Detection[entry]:Data length : ', 'new value'],
            ],
        )
        w = DummyWrapper(mock_nvti, mock_db)
        w.load_vts()
        ret = w.process_vts(vts)
        self.assertEqual(len(ret), len(vt_out))

    def test_get_openvas_timestamp_scan_host_end(self, mock_nvti, mock_db):
        """A host end time from the DB is stored as a scan result value."""
        mock_db.get_host_scan_scan_end_time.return_value = '12345'
        w = DummyWrapper(mock_nvti, mock_db)
        w.create_scan('123-456', '192.168.0.1', '192.168.0.1', None, [])
        w.get_openvas_timestamp_scan_host('123-456', '192.168.0.1')
        for result in w.scan_collection.results_iterator('123-456', False):
            self.assertEqual(result.get('value'), '12345')

    def test_get_openvas_timestamp_scan_host_start(self, mock_nvti, mock_db):
        """With no end time, the host start time is stored as result value."""
        mock_db.get_host_scan_scan_end_time.return_value = None
        # The original assigned get_host_scan_scan_end_time twice, which
        # discarded the None above and made this a copy of the "end" test.
        # NOTE(review): the start-time accessor name is inferred by analogy
        # with get_host_scan_scan_end_time -- confirm against ospd_openvas.db.
        mock_db.get_host_scan_scan_start_time.return_value = '54321'
        w = DummyWrapper(mock_nvti, mock_db)
        w.create_scan('123-456', '192.168.0.1', '192.168.0.1', None, [])
        w.get_openvas_timestamp_scan_host('123-456', '192.168.0.1')
        for result in w.scan_collection.results_iterator('123-456', False):
            self.assertEqual(result.get('value'), '54321')

    def test_scan_is_finished(self, mock_nvti, mock_db):
        """scan_is_finished() is True when the DB item reads 'finished'."""
        mock_db.get_single_item.return_value = 'finished'
        w = DummyWrapper(mock_nvti, mock_db)
        ret = w.scan_is_finished('123-456')
        self.assertEqual(ret, True)

    def test_scan_is_stopped(self, mock_nvti, mock_db):
        """scan_is_stopped() is True when the DB item reads 'stop_all'."""
        mock_db.get_single_item.return_value = 'stop_all'
        mock_db.kb_connect_item.return_value = mock_db
        mock_db.set_redisctx.return_value = None
        w = DummyWrapper(mock_nvti, mock_db)
        ret = w.scan_is_stopped('123-456')
        self.assertEqual(ret, True)
454