Completed
Push — master ( 9fc341...dffd8e )
by Juan José
15s queued 13s
created

TestOspdOpenvas.test_get_openvas_result_escaped()   A

Complexity

Conditions 1

Size

Total Lines 22
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 20
nop 4
dl 0
loc 22
rs 9.4
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018-2019 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-2.0-or-later
5
#
6
# This program is free software; you can redistribute it and/or
7
# modify it under the terms of the GNU General Public License
8
# as published by the Free Software Foundation; either version 2
9
# of the License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
20
# pylint: disable=invalid-name,line-too-long
21
22
""" Unit Test for ospd-openvas """
23
24
from unittest import TestCase
25
from unittest.mock import patch
26
from unittest.mock import Mock
27
28
from multiprocessing import Manager
29
30
import io
31
import logging
32
33
from tests.dummydaemon import DummyDaemon
34
35
from ospd_openvas.daemon import OSPD_PARAMS, OpenVasVtsFilter, Path
36
from ospd_openvas.errors import OspdOpenvasError
37
38
# Expected content of OSPD_PARAMS once parse_param() has run against the
# mocked scanner output used in test_parse_param.
# Each row is (name, type, default, mandatory, description); the
# comprehension expands a row into the nested dict layout keyed by name.
OSPD_PARAMS_OUT = {
    _name: {
        'type': _type,
        'name': _name,
        'default': _default,
        'mandatory': _mandatory,
        'description': _description,
    }
    for _name, _type, _default, _mandatory, _description in (
        (
            'auto_enable_dependencies', 'boolean', 1, 1,
            'Automatically enable the plugins that are depended on',
        ),
        (
            'cgi_path', 'string', '/cgi-bin:/scripts', 1,
            'Look for default CGIs in /cgi-bin and /scripts',
        ),
        (
            'checks_read_timeout', 'integer', 5, 1,
            'Number  of seconds that the security checks will '
            'wait for when doing a recv()',
        ),
        ('drop_privileges', 'boolean', 0, 1, ''),
        ('network_scan', 'boolean', 0, 1, ''),
        (
            'non_simult_ports', 'string', '22', 1,
            'Prevent to make two connections on the same given '
            'ports at the same time.',
        ),
        (
            'open_sock_max_attempts', 'integer', 5, 0,
            'Number of unsuccessful retries to open the socket '
            'before to set the port as closed.',
        ),
        (
            'timeout_retry', 'integer', 5, 0,
            'Number of retries when a socket connection attempt '
            'timesout.',
        ),
        (
            'optimize_test', 'integer', 5, 0,
            'By default, openvas does not trust the remote '
            'host banners.',
        ),
        (
            'plugins_timeout', 'integer', 5, 0,
            'This is the maximum lifetime, in seconds of a plugin.',
        ),
        ('report_host_details', 'boolean', 1, 1, ''),
        (
            'safe_checks', 'boolean', 1, 1,
            'Disable the plugins with potential to crash '
            'the remote services',
        ),
        (
            'scanner_plugins_timeout', 'integer', 36000, 1,
            'Like plugins_timeout, but for ACT_SCANNER plugins.',
        ),
        (
            'time_between_request', 'integer', 0, 0,
            'Allow to set a wait time between two actions '
            '(open, send, close).',
        ),
        ('unscanned_closed', 'boolean', 1, 1, ''),
        ('unscanned_closed_udp', 'boolean', 1, 1, ''),
        (
            'expand_vhosts', 'boolean', 1, 0,
            'Whether to expand the target hosts '
            'list of vhosts with values gathered from sources '
            'such as reverse-lookup queries and VT checks '
            'for SSL/TLS certificates.',
        ),
        (
            'test_empty_vhost', 'boolean', 0, 0,
            'If  set  to  yes, the scanner will '
            'also test the target by using empty vhost value '
            'in addition to the targets associated vhost values.',
        ),
    )
}
178
179
180
@patch('ospd_openvas.db.OpenvasDB')
181
@patch('ospd_openvas.nvticache.NVTICache')
182
class TestOspdOpenvas(TestCase):
183
    @patch('ospd_openvas.daemon.subprocess')
    def test_redis_nvticache_init(self, mock_subproc, mock_nvti, mock_db):
        """redis_nvticache_init() is expected to call check_call() once."""
        mock_subproc.check_call.return_value = True
        daemon = DummyDaemon(mock_nvti, mock_db)
        # Drop every call recorded so far, so that only the call made by
        # redis_nvticache_init() is counted below.
        mock_subproc.reset_mock()

        daemon.redis_nvticache_init()

        self.assertEqual(mock_subproc.check_call.call_count, 1)
190
191
    @patch('ospd_openvas.daemon.subprocess')
    def test_parse_param(self, mock_subproc, mock_nvti, mock_db):
        """parse_param() consumes the mocked check_output() settings dump."""
        settings_dump = 'non_simult_ports = 22\nplugins_folder = /foo/bar'
        mock_subproc.check_output.return_value = settings_dump.encode()
        daemon = DummyDaemon(mock_nvti, mock_db)

        daemon.parse_param()

        self.assertEqual(mock_subproc.check_output.call_count, 1)
        self.assertEqual(OSPD_PARAMS, OSPD_PARAMS_OUT)
        # plugins_folder is looked up in scan_only_params, not OSPD_PARAMS.
        plugins_folder = daemon.scan_only_params.get('plugins_folder')
        self.assertEqual(plugins_folder, '/foo/bar')
202
203
    @patch('ospd_openvas.daemon.subprocess')
    def test_sudo_available(self, mock_subproc, mock_nvti, mock_db):
        """sudo_available is truthy when the mocked check_call() returns 0."""
        mock_subproc.check_call.return_value = 0
        daemon = DummyDaemon(mock_nvti, mock_db)
        # Reset the private attribute (presumably the cached result) before
        # reading the public one.
        daemon._sudo_available = None  # pylint: disable=protected-access
        # Read once and discard the value, then assert on a second read.
        daemon.sudo_available  # pylint: disable=pointless-statement
        self.assertTrue(daemon.sudo_available)
210
211
    def test_load_vts(self, mock_nvti, mock_db):
        """load_vts() stores one entry per dummy VT in a manager dict proxy."""
        # Imported locally: checking against the proxy class directly avoids
        # starting a Manager() server process that is never shut down just
        # to discover the type of the object it hands out.
        from multiprocessing.managers import DictProxy

        w = DummyDaemon(mock_nvti, mock_db)
        w.load_vts()
        self.maxDiff = None
        self.assertIsInstance(w.vts, DictProxy)
        self.assertEqual(len(w.vts), len(w.VT))
217
218
    def test_get_custom_xml(self, mock_nvti, mock_db):
        """The <custom> XML of the dummy VT has the expected length."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = (
            '<custom>'
            '<required_ports>Services/www, 80</required_ports>'
            '<category>3</category>'
            '<excluded_keys>Settings/disable_cgi_scanning</excluded_keys>'
            '<family>Product detection</family>'
            '<filename>mantis_detect.nasl</filename>'
            '<timeout>0</timeout>'
            '</custom>'
        )
        daemon = DummyDaemon(mock_nvti, mock_db)
        custom = daemon.VT[oid].get('custom')

        xml_str = daemon.get_custom_vt_as_xml_str(oid, custom)

        # Only the lengths are compared, not the text itself.
        self.assertEqual(len(xml_str), len(expected))
234
235
    def test_get_custom_xml_failed(self, mock_nvti, mock_db):
        """A custom value holding U+0006 must produce exactly one warning."""
        w = DummyDaemon(mock_nvti, mock_db)
        custom = {'a': u"\u0006"}
        # Patch Logger.warning only around the call under test, so the real
        # method is restored afterwards instead of staying mocked for every
        # test that runs later.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_custom_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', custom=custom
            )
        # call_count is available on every Python 3 release, whereas
        # Mock.assert_called_once() only exists from 3.6 on.
        self.assertEqual(mock_warning.call_count, 1)
244
245
    def test_get_severities_xml(self, mock_nvti, mock_db):
        """The dummy VT severities are rendered as a <severities> element."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = (
            '<severities>'
            '<severity type="cvss_base_v2">AV:N/AC:L/Au:N/C:N/I:N/A:N</severity>'
            '</severities>'
        )
        daemon = DummyDaemon(mock_nvti, mock_db)
        severities = daemon.VT[oid].get('severities')

        xml_str = daemon.get_severities_vt_as_xml_str(oid, severities)

        self.assertEqual(xml_str, expected)
258
259
    def test_get_severities_xml_failed(self, mock_nvti, mock_db):
        """A severity vector holding U+0006 must produce exactly one warning."""
        w = DummyDaemon(mock_nvti, mock_db)
        sever = {'severity_base_vector': u"\u0006"}
        # Scoped patch: Logger.warning is restored as soon as the call under
        # test returns, so later tests log normally again.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_severities_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', severities=sever
            )
        # Unconditional check; does not depend on assert_called_once()
        # being available (Python >= 3.6 only).
        self.assertEqual(mock_warning.call_count, 1)
269
270
    def test_get_params_xml(self, mock_nvti, mock_db):
        """The <params> XML of the dummy VT has the expected length."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = (
            '<params>'
            '<param type="checkbox" id="2">'
            '<name>Do not randomize the  order  in  which ports are scanned</name>'
            '<default>no</default>'
            '</param>'
            '<param type="entry" id="1">'
            '<name>Data length :</name>'
            '</param>'
            '</params>'
        )
        daemon = DummyDaemon(mock_nvti, mock_db)
        vt_params = daemon.VT[oid].get('vt_params')

        xml_str = daemon.get_params_vt_as_xml_str(oid, vt_params)

        # Only the lengths are compared, not the text itself.
        self.assertEqual(len(xml_str), len(expected))
284
285
    def test_get_params_xml_failed(self, mock_nvti, mock_db):
        """A parameter default holding U+0006 must produce exactly one warning."""
        w = DummyDaemon(mock_nvti, mock_db)
        params = {
            '1': {
                'id': '1',
                'type': 'entry',
                'default': u'\u0006',
                'name': 'dns-fuzz.timelimit',
                'description': 'Description',
            }
        }
        # Scoped patch: the real Logger.warning is restored on exit rather
        # than being replaced for the remainder of the test run.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_params_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', params)
        # Unconditional check; assert_called_once() is missing before 3.6.
        self.assertEqual(mock_warning.call_count, 1)
301
302
    def test_get_refs_xml(self, mock_nvti, mock_db):
        """The dummy VT references are rendered as a <refs> element."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = (
            '<refs>'
            '<ref type="url" id="http://www.mantisbt.org/"/>'
            '</refs>'
        )
        daemon = DummyDaemon(mock_nvti, mock_db)
        vt_refs = daemon.VT[oid].get('vt_refs')

        xml_str = daemon.get_refs_vt_as_xml_str(oid, vt_refs)

        self.assertEqual(xml_str, expected)
310
311
    def test_get_dependencies_xml(self, mock_nvti, mock_db):
        """Each dependency OID becomes one <dependency vt_id=.../> element."""
        expected = (
            '<dependencies>'
            '<dependency vt_id="1.2.3.4"/>'
            '<dependency vt_id="4.3.2.1"/>'
            '</dependencies>'
        )
        daemon = DummyDaemon(mock_nvti, mock_db)
        dependencies = ['1.2.3.4', '4.3.2.1']

        xml_str = daemon.get_dependencies_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', dependencies
        )

        self.assertEqual(xml_str, expected)
323
324
    def test_get_dependencies_xml_failed(self, mock_nvti, mock_db):
        """A dependency id holding U+0006 must produce exactly one error log."""
        w = DummyDaemon(mock_nvti, mock_db)
        dep = [u"\u0006"]
        # This helper is checked against Logger.error (not warning). The
        # patch is scoped so the real method is restored afterwards.
        with patch.object(logging.Logger, 'error') as mock_error:
            w.get_dependencies_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', vt_dependencies=dep
            )
        # Unconditional check; assert_called_once() is missing before 3.6.
        self.assertEqual(mock_error.call_count, 1)
334
335
    def test_get_ctime_xml(self, mock_nvti, mock_db):
        """The dummy VT creation time is wrapped in <creation_time>."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = '<creation_time>1237458156</creation_time>'
        daemon = DummyDaemon(mock_nvti, mock_db)
        creation_time = daemon.VT[oid].get('creation_time')

        xml_str = daemon.get_creation_time_vt_as_xml_str(oid, creation_time)

        self.assertEqual(xml_str, expected)
345
346
    def test_get_ctime_xml_failed(self, mock_nvti, mock_db):
        """A creation time holding U+0006 must produce exactly one warning."""
        w = DummyDaemon(mock_nvti, mock_db)
        ctime = u'\u0006'
        # Scoped patch: the real Logger.warning is restored on exit.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_creation_time_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', vt_creation_time=ctime
            )
        # The previous guard tested hasattr(Mock, 'assert_called_onc'), a
        # misspelt name that never exists, so nothing was ever asserted.
        # Checking call_count directly needs no guard at all.
        self.assertEqual(mock_warning.call_count, 1)
356
357
    def test_get_mtime_xml(self, mock_nvti, mock_db):
        """The dummy VT modification time is wrapped in <modification_time>."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = '<modification_time>1533906565</modification_time>'
        daemon = DummyDaemon(mock_nvti, mock_db)
        modification_time = daemon.VT[oid].get('modification_time')

        xml_str = daemon.get_modification_time_vt_as_xml_str(
            oid, modification_time
        )

        self.assertEqual(xml_str, expected)
367
368
    def test_get_mtime_xml_failed(self, mock_nvti, mock_db):
        """A modification time holding U+0006 must produce exactly one warning."""
        w = DummyDaemon(mock_nvti, mock_db)
        mtime = u'\u0006'
        # Scoped patch: the real Logger.warning is restored on exit rather
        # than staying mocked for the rest of the test run.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_modification_time_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', mtime
            )
        # Unconditional check; assert_called_once() is missing before 3.6.
        self.assertEqual(mock_warning.call_count, 1)
378
379
    def test_get_summary_xml(self, mock_nvti, mock_db):
        """The dummy VT summary is wrapped in a <summary> element."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = '<summary>some summary</summary>'
        daemon = DummyDaemon(mock_nvti, mock_db)
        summary = daemon.VT[oid].get('summary')

        xml_str = daemon.get_summary_vt_as_xml_str(oid, summary)

        self.assertEqual(xml_str, expected)
389
390
    def test_get_summary_xml_failed(self, mock_nvti, mock_db):
        """A summary holding U+0006 must produce exactly one warning."""
        w = DummyDaemon(mock_nvti, mock_db)
        summary = u'\u0006'
        # Scoped patch: the real Logger.warning is restored on exit.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_summary_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', summary)
        # The previous guard tested hasattr(Mock, 'assert_calledonce'), a
        # misspelt name that never exists, so nothing was ever asserted.
        # Checking call_count directly needs no guard at all.
        self.assertEqual(mock_warning.call_count, 1)
398
399
    def test_get_impact_xml(self, mock_nvti, mock_db):
        """The dummy VT impact text is wrapped in an <impact> element."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = '<impact>some impact</impact>'
        daemon = DummyDaemon(mock_nvti, mock_db)
        impact = daemon.VT[oid].get('impact')

        xml_str = daemon.get_impact_vt_as_xml_str(oid, impact)

        self.assertEqual(xml_str, expected)
407
408
    def test_get_impact_xml_failed(self, mock_nvti, mock_db):
        """An impact text holding U+0006 must produce exactly one warning."""
        w = DummyDaemon(mock_nvti, mock_db)
        impact = u'\u0006'
        # Scoped patch: the real Logger.warning is restored on exit rather
        # than staying mocked for the rest of the test run.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_impact_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', impact)
        # Unconditional check; assert_called_once() is missing before 3.6.
        self.assertEqual(mock_warning.call_count, 1)
416
417
    def test_get_insight_xml(self, mock_nvti, mock_db):
        """The dummy VT insight text is wrapped in an <insight> element."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = '<insight>some insight</insight>'
        daemon = DummyDaemon(mock_nvti, mock_db)
        insight = daemon.VT[oid].get('insight')

        xml_str = daemon.get_insight_vt_as_xml_str(oid, insight)

        self.assertEqual(xml_str, expected)
427
428
    def test_get_insight_xml_failed(self, mock_nvti, mock_db):
        """An insight text holding U+0006 must produce exactly one warning."""
        w = DummyDaemon(mock_nvti, mock_db)
        insight = u'\u0006'
        # Scoped patch: the real Logger.warning is restored on exit rather
        # than staying mocked for the rest of the test run.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_insight_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', insight)
        # Unconditional check; assert_called_once() is missing before 3.6.
        self.assertEqual(mock_warning.call_count, 1)
436
437
    def test_get_solution_xml(self, mock_nvti, mock_db):
        """Solution text, type and method end up in one <solution> element."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = (
            '<solution type="WillNotFix" method="DebianAPTUpgrade">'
            'some solution'
            '</solution>'
        )
        daemon = DummyDaemon(mock_nvti, mock_db)
        vt_data = daemon.VT[oid]

        xml_str = daemon.get_solution_vt_as_xml_str(
            oid,
            vt_data.get('solution'),
            vt_data.get('solution_type'),
            vt_data.get('solution_method'),
        )

        self.assertEqual(xml_str, expected)
453
454
    def test_get_solution_xml_failed(self, mock_nvti, mock_db):
        """A solution text holding U+0006 must produce exactly one warning."""
        w = DummyDaemon(mock_nvti, mock_db)
        solution = u'\u0006'
        # Scoped patch: the real Logger.warning is restored on exit rather
        # than staying mocked for the rest of the test run.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_solution_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', solution
            )
        # Unconditional check; assert_called_once() is missing before 3.6.
        self.assertEqual(mock_warning.call_count, 1)
462
463
    def test_get_detection_xml(self, mock_nvti, mock_db):
        """The dummy VT qod_type becomes an attribute of <detection/>."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = '<detection qod_type="remote_banner"/>'
        daemon = DummyDaemon(mock_nvti, mock_db)
        qod_type = daemon.VT[oid].get('qod_type')

        xml_str = daemon.get_detection_vt_as_xml_str(oid, qod_type=qod_type)

        self.assertEqual(xml_str, expected)
474
475
    def test_get_detection_xml_failed(self, mock_nvti, mock_db):
        """A detection text holding U+0006 must produce exactly one warning."""
        w = DummyDaemon(mock_nvti, mock_db)
        detection = u'\u0006'
        # Scoped patch: the real Logger.warning is restored on exit rather
        # than staying mocked for the rest of the test run.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_detection_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', detection
            )
        # Unconditional check; assert_called_once() is missing before 3.6.
        self.assertEqual(mock_warning.call_count, 1)
482
483
    def test_get_affected_xml(self, mock_nvti, mock_db):
        """The dummy VT affected text is wrapped in an <affected> element."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = '<affected>some affection</affected>'
        daemon = DummyDaemon(mock_nvti, mock_db)
        affected_text = daemon.VT[oid].get('affected')

        xml_str = daemon.get_affected_vt_as_xml_str(
            oid, affected=affected_text
        )

        self.assertEqual(xml_str, expected)
494
495
    def test_get_affected_xml_failed(self, mock_nvti, mock_db):
        """An affected text starting with U+0006 must produce one warning."""
        w = DummyDaemon(mock_nvti, mock_db)
        affected = u"\u0006" + "affected"
        # Scoped patch: the real Logger.warning is restored on exit rather
        # than staying mocked for the rest of the test run.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_affected_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', affected=affected
            )
        # Unconditional check; assert_called_once() is missing before 3.6.
        self.assertEqual(mock_warning.call_count, 1)
504
505
    def test_build_credentials(self, mock_nvti, mock_db):
        """Credentials for ssh, smb, esxi and snmp are turned into prefs.

        The full expected list is only compared by length; two individual
        preferences are additionally looked up in the returned list.
        """
        w = DummyDaemon(mock_nvti, mock_db)

        cred_out = [
            '1.3.6.1.4.1.25623.1.0.105058:1:entry:ESXi login name:|||username',
            '1.3.6.1.4.1.25623.1.0.105058:2:password:ESXi login password:|||pass',
            'auth_port_ssh|||22',
            '1.3.6.1.4.1.25623.1.0.103591:1:entry:SSH login name:|||username',
            '1.3.6.1.4.1.25623.1.0.103591:2:password:SSH key passphrase:|||pass',
            '1.3.6.1.4.1.25623.1.0.103591:4:file:SSH private key:|||',
            '1.3.6.1.4.1.25623.1.0.90023:1:entry:SMB login:|||username',
            '1.3.6.1.4.1.25623.1.0.90023:2:password]:SMB password :|||pass',
            '1.3.6.1.4.1.25623.1.0.105076:1:password:SNMP Community:some comunity',
            '1.3.6.1.4.1.25623.1.0.105076:2:entry:SNMPv3 Username:username',
            '1.3.6.1.4.1.25623.1.0.105076:3:password:SNMPv3 Password:pass',
            '1.3.6.1.4.1.25623.1.0.105076:4:radio:SNMPv3 Authentication Algorithm:some auth algo',
            '1.3.6.1.4.1.25623.1.0.105076:5:password:SNMPv3 Privacy Password:privacy pass',
            '1.3.6.1.4.1.25623.1.0.105076:6:radio:SNMPv3 Privacy Algorithm:privacy algo',
        ]
        cred_dict = {
            'ssh': {
                'type': 'ssh',
                'port': '22',
                'username': 'username',
                'password': 'pass',
            },
            'smb': {'type': 'smb', 'username': 'username', 'password': 'pass'},
            'esxi': {
                'type': 'esxi',
                'username': 'username',
                'password': 'pass',
            },
            'snmp': {
                'type': 'snmp',
                'username': 'username',
                'password': 'pass',
                'community': 'some comunity',
                'auth_algorithm': 'some auth algo',
                'privacy_password': 'privacy pass',
                'privacy_algorithm': 'privacy algo',
            },
        }
        self.maxDiff = None
        ret = w.build_credentials_as_prefs(cred_dict)
        self.assertEqual(len(ret), len(cred_out))
        # Membership has to be checked on the value returned by the daemon.
        # Looking the strings up in cred_out, the list they were copied from,
        # could never fail and therefore verified nothing.
        self.assertIn('auth_port_ssh|||22', ret)
        self.assertIn(
            '1.3.6.1.4.1.25623.1.0.90023:1:entry:SMB login:|||username',
            ret,
        )
555
556
    def test_build_credentials_ssh_up(self, mock_nvti, mock_db):
        """An ssh credential of type 'up' yields port, login and password."""
        ssh_oid = '1.3.6.1.4.1.25623.1.0.103591'
        expected_prefs = [
            'auth_port_ssh|||22',
            ssh_oid + ':1:entry:SSH login name:|||username',
            ssh_oid + ':3:password:SSH password (unsafe!):|||pass',
        ]
        credentials = {
            'ssh': {
                'type': 'up',
                'port': '22',
                'username': 'username',
                'password': 'pass',
            }
        }
        daemon = DummyDaemon(mock_nvti, mock_db)
        self.maxDiff = None

        prefs = daemon.build_credentials_as_prefs(credentials)

        self.assertEqual(prefs, expected_prefs)
574
575
    def test_build_alive_test_opt_empty(self, mock_nvti, mock_db):
        """An alive_test value of '0' yields an empty preference list."""
        daemon = DummyDaemon(mock_nvti, mock_db)
        self.maxDiff = None

        prefs = daemon.build_alive_test_opt_as_prefs({'alive_test': '0'})

        self.assertEqual(prefs, [])
582
583
    def test_build_alive_test_opt(self, mock_nvti, mock_db):
        """An alive_test value of '2' enables only the ICMP ping checkbox."""
        ping_oid = "1.3.6.1.4.1.25623.1.0.100315"
        expected_prefs = [
            ping_oid + ":1:checkbox:Do a TCP ping|||no",
            ping_oid + ":2:checkbox:TCP ping tries also TCP-SYN ping|||no",
            ping_oid + ":7:checkbox:TCP ping tries only TCP-SYN ping|||no",
            ping_oid + ":3:checkbox:Do an ICMP ping|||yes",
            ping_oid + ":4:checkbox:Use ARP|||no",
            ping_oid
            + ":5:checkbox:Mark unrechable Hosts as dead (not scanning)|||yes",
        ]
        daemon = DummyDaemon(mock_nvti, mock_db)
        self.maxDiff = None

        prefs = daemon.build_alive_test_opt_as_prefs({'alive_test': '2'})

        self.assertEqual(prefs, expected_prefs)
598
599
    def test_build_alive_test_opt_fail_1(self, mock_nvti, mock_db):
        """A non-numeric alive_test value must produce exactly one debug log."""
        w = DummyDaemon(mock_nvti, mock_db)
        target_options_dict = {'alive_test': 'a'}

        self.maxDiff = None
        # Scoped patch: the real Logger.debug is restored on exit rather
        # than staying mocked for the rest of the test run.
        with patch.object(logging.Logger, 'debug') as mock_debug:
            # The return value is irrelevant here; only the log is checked.
            w.build_alive_test_opt_as_prefs(target_options_dict)
        # Unconditional check; assert_called_once() is missing before 3.6.
        self.assertEqual(mock_debug.call_count, 1)
608
609
    def test_process_vts(self, mock_nvti, mock_db):
        """process_vts() returns the selected OIDs and their parameter prefs."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        selection = {
            oid: {'1': 'new value'},
            'vt_groups': ['family=debian', 'family=general'],
        }
        expected = (
            [oid],
            {oid + ':1:entry:Data length :': 'new value'},
        )

        daemon = DummyDaemon(mock_nvti, mock_db)
        daemon.load_vts()
        # process_vts() is given the loaded VTs through temp_vts_dict.
        daemon.temp_vts_dict = daemon.vts

        self.assertEqual(daemon.process_vts(selection), expected)
624
625
    def test_process_vts_bad_param_id(self, mock_nvti, mock_db):
        """Parameter id '3' leaves the second item of the result falsy."""
        selection = {
            '1.3.6.1.4.1.25623.1.0.100061': {'3': 'new value'},
            'vt_groups': ['family=debian', 'family=general'],
        }
        daemon = DummyDaemon(mock_nvti, mock_db)
        daemon.load_vts()
        daemon.temp_vts_dict = daemon.vts

        processed = daemon.process_vts(selection)

        self.assertFalse(processed[1])
635
636
    def test_process_vts_not_found(self, mock_nvti, mock_db):
        """Selecting OID ...100065 must produce exactly one warning."""
        vts = {
            '1.3.6.1.4.1.25623.1.0.100065': {'3': 'new value'},
            'vt_groups': ['family=debian', 'family=general'],
        }
        w = DummyDaemon(mock_nvti, mock_db)
        w.load_vts()
        w.temp_vts_dict = w.vts
        # Scoped patch: the real Logger.warning is restored on exit rather
        # than staying mocked for the rest of the test run.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            # The return value is irrelevant here; only the log is checked.
            w.process_vts(vts)
        # Unconditional check; assert_called_once() is missing before 3.6.
        self.assertEqual(mock_warning.call_count, 1)
648
649 View Code Duplication
    def test_get_openvas_timestamp_scan_host_end(self, mock_nvti, mock_db):
        """The mocked host end time is reported as the result value."""
        mock_db.get_host_scan_scan_end_time.return_value = '12345'
        daemon = DummyDaemon(mock_nvti, mock_db)

        targets = daemon.process_targets_element(daemon.create_xml_target())
        daemon.create_scan('123-456', targets, None, [])

        daemon.get_openvas_timestamp_scan_host('123-456', '192.168.0.1')

        scan_results = daemon.scan_collection.results_iterator(
            '123-456', False
        )
        for entry in scan_results:
            self.assertEqual(entry.get('value'), '12345')
660
661 View Code Duplication
    def test_get_openvas_timestamp_scan_host_start(self, mock_nvti, mock_db):
        """With no end time, the mocked host start time is the result value."""
        # No end timestamp is available, only a start timestamp. The end-time
        # mock used to be assigned twice (None, then '54321'), so the
        # start-time lookup this test is named after was never configured.
        mock_db.get_host_scan_scan_end_time.return_value = None
        mock_db.get_host_scan_scan_start_time.return_value = '54321'
        w = DummyDaemon(mock_nvti, mock_db)

        target_list = w.create_xml_target()
        targets = w.process_targets_element(target_list)

        w.create_scan('123-456', targets, None, [])
        w.get_openvas_timestamp_scan_host('123-456', '192.168.0.1')
        for result in w.scan_collection.results_iterator('123-456', False):
            self.assertEqual(result.get('value'), '54321')
673
674
    def test_host_is_finished(self, mock_nvti, mock_db):
        """host_is_finished() is True when the db item reads 'finished'."""
        mock_db.get_single_item.return_value = 'finished'
        daemon = DummyDaemon(mock_nvti, mock_db)

        finished = daemon.host_is_finished('123-456')

        self.assertEqual(finished, True)
679
680
    def test_scan_is_stopped(self, mock_nvti, mock_db):
        """scan_is_stopped() is True when the db item reads 'stop_all'."""
        mock_db.get_single_item.return_value = 'stop_all'
        # kb_connect_item() hands back the same mock; set_redisctx() returns
        # nothing.
        mock_db.kb_connect_item.return_value = mock_db
        mock_db.set_redisctx.return_value = None
        daemon = DummyDaemon(mock_nvti, mock_db)

        stopped = daemon.scan_is_stopped('123-456')

        self.assertEqual(stopped, True)
687
688
    @patch('ospd_openvas.daemon.open')
    def test_feed_is_outdated_none(self, mock_open, mock_nvti, mock_db):
        """feed_is_outdated() returns None when only open() is mocked."""
        daemon = DummyDaemon(mock_nvti, mock_db)
        # parse_param() is stubbed out because feed_is_outdated() calls it.
        stub_parse_param = patch.object(
            daemon, 'parse_param', return_value=None
        )
        with stub_parse_param:
            daemon.scan_only_params['plugins_folder'] = '/foo/bar'
            outdated = daemon.feed_is_outdated('1234')
            self.assertIsNone(outdated)
697
698
    def test_feed_is_outdated_true(self, mock_nvti, mock_db):
        """PLUGIN_SET 1235 in the feed file makes version '1234' outdated."""
        daemon = DummyDaemon(mock_nvti, mock_db)
        feed_info = io.StringIO('PLUGIN_SET = "1235";')
        # parse_param() is stubbed out because feed_is_outdated() calls it.
        stub_parse_param = patch.object(
            daemon, 'parse_param', return_value=None
        )
        path_exists = patch.object(Path, 'exists', return_value=True)
        fake_open = patch("builtins.open", return_value=feed_info)

        with stub_parse_param, path_exists, fake_open:
            daemon.scan_only_params['plugins_folder'] = '/foo/bar'
            outdated = daemon.feed_is_outdated('1234')
            # Expected result: True
            self.assertTrue(outdated)
711
712
    def test_feed_is_outdated_false(self, mock_nvti, mock_db):
        """PLUGIN_SET 1234 in the feed file means '1234' is not outdated."""
        w = DummyDaemon(mock_nvti, mock_db)
        # Mock parse_param, because feed_is_outdated() will call it.
        with patch.object(w, 'parse_param', return_value=None):
            with patch.object(Path, 'exists', return_value=True):
                # Single definition of the fake feed content. An earlier
                # assignment that was overwritten before use has been
                # dropped; this is the value the test always ran with.
                read_data = 'PLUGIN_SET = "1234"'
                with patch(
                    "builtins.open", return_value=io.StringIO(read_data)
                ):
                    # Return False
                    w.scan_only_params['plugins_folder'] = '/foo/bar'
                    ret = w.feed_is_outdated('1234')
                    self.assertFalse(ret)
726
727
    @patch('ospd_openvas.daemon.OSPDaemon.add_scan_log')
    def test_get_openvas_result(self, mock_ospd, mock_nvti, mock_db):
        """A LOG result without an OID is passed on to add_scan_log()."""
        # One result line, then None on the second get_result() call.
        mock_db.get_result.side_effect = [
            "LOG||| |||general/Host_Details||| |||Host dead",
            None,
        ]
        daemon = DummyDaemon(mock_nvti, mock_db)
        daemon.load_vts()
        mock_ospd.return_value = None

        daemon.get_openvas_result('123-456', 'localhost')

        expected_kwargs = {
            'host': 'localhost',
            'hostname': '',
            'name': '',
            'port': 'general/Host_Details',
            'qod': '',
            'test_id': '',
            'value': 'Host dead',
        }
        mock_ospd.assert_called_with('123-456', **expected_kwargs)
745
746
    @patch('ospd_openvas.daemon.OSPDaemon.add_scan_log')
    def test_get_openvas_result_escaped(self, mock_ospd, mock_nvti, mock_db):
        """The VT name is handed to add_scan_log() with '&' as '&amp;'."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        # One result line, then None on the second get_result() call.
        mock_db.get_result.side_effect = [
            "LOG||| |||general/Host_Details|||" + oid + "|||Alive",
            None,
        ]
        daemon = DummyDaemon(mock_nvti, mock_db)
        daemon.load_vts()
        mock_ospd.return_value = None
        # Any QOD_TYPES lookup yields an empty string.
        mock_nvti.QOD_TYPES.__getitem__.return_value = ''

        daemon.get_openvas_result('123-456', 'localhost')

        expected_kwargs = {
            'host': 'localhost',
            'hostname': '',
            'name': 'Mantis Detection &amp; foo',
            'port': 'general/Host_Details',
            'qod': '',
            'test_id': oid,
            'value': 'Alive',
        }
        mock_ospd.assert_called_with('123-456', **expected_kwargs)
769
770
    @patch('ospd_openvas.daemon.OSPDaemon.set_scan_host_progress')
    def test_update_progress(self, mock_ospd, mock_nvti, mock_db):
        """The progress message '0/-1' is forwarded as a progress of 100."""
        progress_msg = '0/-1'
        daemon = DummyDaemon(mock_nvti, mock_db)
        targets = daemon.process_targets_element(daemon.create_xml_target())
        daemon.create_scan('123-456', targets, None, [])
        mock_ospd.return_value = None

        daemon.update_progress(
            '123-456', 'localhost', 'localhost', progress_msg
        )

        mock_ospd.assert_called_with('123-456', 'localhost', 'localhost', 100)
782
783
784
class TestFilters(TestCase):
    """Tests for OpenVasVtsFilter."""

    def test_format_vt_modification_time(self):
        """An epoch-seconds string is rendered as a 14-digit timestamp."""
        epoch_seconds = '1517443741'
        vts_filter = OpenVasVtsFilter()

        rendered = vts_filter.format_vt_modification_time(epoch_seconds)

        self.assertEqual(rendered, "20180201000901")
790