Passed
Pull Request — master (#195)
by Juan José
01:25
created

TestOspdOpenvas.test_feed_is_healthy_false()   A

Complexity

Conditions 1

Size

Total Lines 11
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 8
nop 3
dl 0
loc 11
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018-2019 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-2.0-or-later
5
#
6
# This program is free software; you can redistribute it and/or
7
# modify it under the terms of the GNU General Public License
8
# as published by the Free Software Foundation; either version 2
9
# of the License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
20
# pylint: disable=invalid-name,line-too-long
21
22
""" Unit Test for ospd-openvas """
23
24
from pathlib import Path
25
26
from unittest import TestCase
27
from unittest.mock import patch, Mock, MagicMock
28
29
from ospd.vts import Vts
30
31
import io
32
import logging
33
34
from tests.dummydaemon import DummyDaemon
35
36
from ospd_openvas.daemon import OSPD_PARAMS, OpenVasVtsFilter
37
from ospd_openvas.openvas import Openvas
38
39
OSPD_PARAMS_OUT = {
40
    'auto_enable_dependencies': {
41
        'type': 'boolean',
42
        'name': 'auto_enable_dependencies',
43
        'default': 1,
44
        'mandatory': 1,
45
        'description': 'Automatically enable the plugins that are depended on',
46
    },
47
    'cgi_path': {
48
        'type': 'string',
49
        'name': 'cgi_path',
50
        'default': '/cgi-bin:/scripts',
51
        'mandatory': 1,
52
        'description': 'Look for default CGIs in /cgi-bin and /scripts',
53
    },
54
    'checks_read_timeout': {
55
        'type': 'integer',
56
        'name': 'checks_read_timeout',
57
        'default': 5,
58
        'mandatory': 1,
59
        'description': 'Number  of seconds that the security checks will '
60
        'wait for when doing a recv()',
61
    },
62
    'drop_privileges': {
63
        'type': 'boolean',
64
        'name': 'drop_privileges',
65
        'default': 0,
66
        'mandatory': 1,
67
        'description': '',
68
    },
69
    'network_scan': {
70
        'type': 'boolean',
71
        'name': 'network_scan',
72
        'default': 0,
73
        'mandatory': 1,
74
        'description': '',
75
    },
76
    'non_simult_ports': {
77
        'type': 'string',
78
        'name': 'non_simult_ports',
79
        'default': '22',
80
        'mandatory': 1,
81
        'description': 'Prevent to make two connections on the same given '
82
        'ports at the same time.',
83
    },
84
    'open_sock_max_attempts': {
85
        'type': 'integer',
86
        'name': 'open_sock_max_attempts',
87
        'default': 5,
88
        'mandatory': 0,
89
        'description': 'Number of unsuccessful retries to open the socket '
90
        'before to set the port as closed.',
91
    },
92
    'timeout_retry': {
93
        'type': 'integer',
94
        'name': 'timeout_retry',
95
        'default': 5,
96
        'mandatory': 0,
97
        'description': 'Number of retries when a socket connection attempt '
98
        'timesout.',
99
    },
100
    'optimize_test': {
101
        'type': 'integer',
102
        'name': 'optimize_test',
103
        'default': 5,
104
        'mandatory': 0,
105
        'description': 'By default, openvas does not trust the remote '
106
        'host banners.',
107
    },
108
    'plugins_timeout': {
109
        'type': 'integer',
110
        'name': 'plugins_timeout',
111
        'default': 5,
112
        'mandatory': 0,
113
        'description': 'This is the maximum lifetime, in seconds of a plugin.',
114
    },
115
    'report_host_details': {
116
        'type': 'boolean',
117
        'name': 'report_host_details',
118
        'default': 1,
119
        'mandatory': 1,
120
        'description': '',
121
    },
122
    'safe_checks': {
123
        'type': 'boolean',
124
        'name': 'safe_checks',
125
        'default': 1,
126
        'mandatory': 1,
127
        'description': 'Disable the plugins with potential to crash '
128
        'the remote services',
129
    },
130
    'scanner_plugins_timeout': {
131
        'type': 'integer',
132
        'name': 'scanner_plugins_timeout',
133
        'default': 36000,
134
        'mandatory': 1,
135
        'description': 'Like plugins_timeout, but for ACT_SCANNER plugins.',
136
    },
137
    'time_between_request': {
138
        'type': 'integer',
139
        'name': 'time_between_request',
140
        'default': 0,
141
        'mandatory': 0,
142
        'description': 'Allow to set a wait time between two actions '
143
        '(open, send, close).',
144
    },
145
    'unscanned_closed': {
146
        'type': 'boolean',
147
        'name': 'unscanned_closed',
148
        'default': 1,
149
        'mandatory': 1,
150
        'description': '',
151
    },
152
    'unscanned_closed_udp': {
153
        'type': 'boolean',
154
        'name': 'unscanned_closed_udp',
155
        'default': 1,
156
        'mandatory': 1,
157
        'description': '',
158
    },
159
    'expand_vhosts': {
160
        'type': 'boolean',
161
        'name': 'expand_vhosts',
162
        'default': 1,
163
        'mandatory': 0,
164
        'description': 'Whether to expand the target hosts '
165
        + 'list of vhosts with values gathered from sources '
166
        + 'such as reverse-lookup queries and VT checks '
167
        + 'for SSL/TLS certificates.',
168
    },
169
    'test_empty_vhost': {
170
        'type': 'boolean',
171
        'name': 'test_empty_vhost',
172
        'default': 0,
173
        'mandatory': 0,
174
        'description': 'If  set  to  yes, the scanner will '
175
        + 'also test the target by using empty vhost value '
176
        + 'in addition to the targets associated vhost values.',
177
    },
178
}
179
180
181
@patch('ospd_openvas.db.OpenvasDB')
182
@patch('ospd_openvas.nvticache.NVTICache')
183
class TestOspdOpenvas(TestCase):
184
    @patch('ospd_openvas.daemon.Openvas')
185
    def test_set_params_from_openvas_settings(
186
        self, mock_openvas: Openvas, mock_nvti: MagicMock, mock_db: MagicMock
187
    ):
188
        mock_openvas.get_settings.return_value = {
189
            'non_simult_ports': '22',
190
            'plugins_folder': '/foo/bar',
191
        }
192
        w = DummyDaemon(mock_nvti, mock_db)
193
        w.set_params_from_openvas_settings()
194
195
        self.assertEqual(mock_openvas.get_settings.call_count, 1)
196
        self.assertEqual(OSPD_PARAMS, OSPD_PARAMS_OUT)
197
        self.assertEqual(w.scan_only_params.get('plugins_folder'), '/foo/bar')
198
199
    @patch('ospd_openvas.daemon.Openvas')
200
    def test_sudo_available(self, mock_openvas, mock_nvti, mock_db):
201
        mock_openvas.check_sudo.return_value = True
202
203
        w = DummyDaemon(mock_nvti, mock_db)
204
        w._sudo_available = None  # pylint: disable=protected-access
205
        w.sudo_available  # pylint: disable=pointless-statement
206
207
        self.assertTrue(w.sudo_available)
208
209
    def test_load_vts(self, mock_nvti, mock_db):
210
        w = DummyDaemon(mock_nvti, mock_db)
211
        w.load_vts()
212
        self.maxDiff = None
213
        self.assertIsInstance(w.vts, type(Vts()))
214
        self.assertEqual(len(w.vts), len(w.VT))
215
216
    def test_get_custom_xml(self, mock_nvti, mock_db):
217
        out = (
218
            '<custom><required_ports>Services/www, 80</re'
219
            'quired_ports><category>3</category><'
220
            'excluded_keys>Settings/disable_cgi_s'
221
            'canning</excluded_keys><family>Produ'
222
            'ct detection</family><filename>manti'
223
            's_detect.nasl</filename><timeout>0</'
224
            'timeout></custom>'
225
        )
226
        w = DummyDaemon(mock_nvti, mock_db)
227
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
228
        res = w.get_custom_vt_as_xml_str(
229
            '1.3.6.1.4.1.25623.1.0.100061', vt.get('custom')
230
        )
231
        self.assertEqual(len(res), len(out))
232
233
    def test_get_custom_xml_failed(self, mock_nvti, mock_db):
234
        w = DummyDaemon(mock_nvti, mock_db)
235
        custom = {'a': u"\u0006"}
236
        logging.Logger.warning = Mock()
237
        w.get_custom_vt_as_xml_str(
238
            '1.3.6.1.4.1.25623.1.0.100061', custom=custom
239
        )
240
        if hasattr(Mock, 'assert_called_once'):
241
            logging.Logger.warning.assert_called_once()
242
243
    def test_get_severities_xml(self, mock_nvti, mock_db):
244
        w = DummyDaemon(mock_nvti, mock_db)
245
        out = (
246
            '<severities><severity type="cvss_base_v2">'
247
            'AV:N/AC:L/Au:N/C:N/I:N/A:N</severity></severities>'
248
        )
249
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
250
        severities = vt.get('severities')
251
        res = w.get_severities_vt_as_xml_str(
252
            '1.3.6.1.4.1.25623.1.0.100061', severities
253
        )
254
255
        self.assertEqual(res, out)
256
257
    def test_get_severities_xml_failed(self, mock_nvti, mock_db):
258
        w = DummyDaemon(mock_nvti, mock_db)
259
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
260
        sever = {'severity_base_vector': u"\u0006"}
261
        logging.Logger.warning = Mock()
262
        w.get_severities_vt_as_xml_str(
263
            '1.3.6.1.4.1.25623.1.0.100061', severities=sever
264
        )
265
        if hasattr(Mock, 'assert_called_once'):
266
            logging.Logger.warning.assert_called_once()
267
268
    def test_get_params_xml(self, mock_nvti, mock_db):
269
        w = DummyDaemon(mock_nvti, mock_db)
270
        out = (
271
            '<params><param type="checkbox" id="2"><name>Do '
272
            'not randomize the  order  in  which ports are scanned</name'
273
            '><default>no</default></param><param type="ent'
274
            'ry" id="1"><name>Data length :</name><'
275
            '/param></params>'
276
        )
277
278
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
279
        params = vt.get('vt_params')
280
        res = w.get_params_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', params)
281
        self.assertEqual(len(res), len(out))
282
283
    def test_get_params_xml_failed(self, mock_nvti, mock_db):
284
        w = DummyDaemon(mock_nvti, mock_db)
285
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
286
        params = {
287
            '1': {
288
                'id': '1',
289
                'type': 'entry',
290
                'default': u'\u0006',
291
                'name': 'dns-fuzz.timelimit',
292
                'description': 'Description',
293
            }
294
        }
295
        logging.Logger.warning = Mock()
296
        w.get_params_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', params)
297
        if hasattr(Mock, 'assert_called_once'):
298
            logging.Logger.warning.assert_called_once()
299
300
    def test_get_refs_xml(self, mock_nvti, mock_db):
301
        w = DummyDaemon(mock_nvti, mock_db)
302
        out = '<refs><ref type="url" id="http://www.mantisbt.org/"/>' '</refs>'
303
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
304
        refs = vt.get('vt_refs')
305
        res = w.get_refs_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', refs)
306
307
        self.assertEqual(res, out)
308
309
    def test_get_dependencies_xml(self, mock_nvti, mock_db):
310
        w = DummyDaemon(mock_nvti, mock_db)
311
        out = (
312
            '<dependencies><dependency vt_id="1.2.3.4"/><dependency vt'
313
            '_id="4.3.2.1"/></dependencies>'
314
        )
315
        dep = ['1.2.3.4', '4.3.2.1']
316
        res = w.get_dependencies_vt_as_xml_str(
317
            '1.3.6.1.4.1.25623.1.0.100061', dep
318
        )
319
320
        self.assertEqual(res, out)
321
322
    def test_get_dependencies_xml_failed(self, mock_nvti, mock_db):
323
        w = DummyDaemon(mock_nvti, mock_db)
324
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
325
        dep = [u"\u0006"]
326
        logging.Logger.error = Mock()
327
        w.get_dependencies_vt_as_xml_str(
328
            '1.3.6.1.4.1.25623.1.0.100061', vt_dependencies=dep
329
        )
330
        if hasattr(Mock, 'assert_called_once'):
331
            logging.Logger.error.assert_called_once()
332
333
    def test_get_ctime_xml(self, mock_nvti, mock_db):
334
        w = DummyDaemon(mock_nvti, mock_db)
335
        out = '<creation_time>1237458156</creation_time>'
336
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
337
        ctime = vt.get('creation_time')
338
        res = w.get_creation_time_vt_as_xml_str(
339
            '1.3.6.1.4.1.25623.1.0.100061', ctime
340
        )
341
342
        self.assertEqual(res, out)
343
344
    def test_get_ctime_xml_failed(self, mock_nvti, mock_db):
345
        w = DummyDaemon(mock_nvti, mock_db)
346
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
347
        ctime = u'\u0006'
348
        logging.Logger.warning = Mock()
349
        w.get_creation_time_vt_as_xml_str(
350
            '1.3.6.1.4.1.25623.1.0.100061', vt_creation_time=ctime
351
        )
352
        if hasattr(Mock, 'assert_called_onc'):
353
            logging.Logger.warning.assert_called_once()
354
355
    def test_get_mtime_xml(self, mock_nvti, mock_db):
356
        w = DummyDaemon(mock_nvti, mock_db)
357
        out = '<modification_time>1533906565</modification_time>'
358
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
359
        mtime = vt.get('modification_time')
360
        res = w.get_modification_time_vt_as_xml_str(
361
            '1.3.6.1.4.1.25623.1.0.100061', mtime
362
        )
363
364
        self.assertEqual(res, out)
365
366
    def test_get_mtime_xml_failed(self, mock_nvti, mock_db):
367
        w = DummyDaemon(mock_nvti, mock_db)
368
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
369
        mtime = u'\u0006'
370
        logging.Logger.warning = Mock()
371
        w.get_modification_time_vt_as_xml_str(
372
            '1.3.6.1.4.1.25623.1.0.100061', mtime
373
        )
374
        if hasattr(Mock, 'assert_called_once'):
375
            logging.Logger.warning.assert_called_once()
376
377
    def test_get_summary_xml(self, mock_nvti, mock_db):
378
        w = DummyDaemon(mock_nvti, mock_db)
379
        out = '<summary>some summary</summary>'
380
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
381
        summary = vt.get('summary')
382
        res = w.get_summary_vt_as_xml_str(
383
            '1.3.6.1.4.1.25623.1.0.100061', summary
384
        )
385
386
        self.assertEqual(res, out)
387
388
    def test_get_summary_xml_failed(self, mock_nvti, mock_db):
389
        w = DummyDaemon(mock_nvti, mock_db)
390
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
391
        summary = u'\u0006'
392
        logging.Logger.warning = Mock()
393
        w.get_summary_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', summary)
394
        if hasattr(Mock, 'assert_calledonce'):
395
            logging.Logger.warning.assert_called_once()
396
397
    def test_get_impact_xml(self, mock_nvti, mock_db):
398
        w = DummyDaemon(mock_nvti, mock_db)
399
        out = '<impact>some impact</impact>'
400
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
401
        impact = vt.get('impact')
402
        res = w.get_impact_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', impact)
403
404
        self.assertEqual(res, out)
405
406
    def test_get_impact_xml_failed(self, mock_nvti, mock_db):
407
        w = DummyDaemon(mock_nvti, mock_db)
408
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
409
        impact = u'\u0006'
410
        logging.Logger.warning = Mock()
411
        w.get_impact_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', impact)
412
        if hasattr(Mock, 'assert_called_once'):
413
            logging.Logger.warning.assert_called_once()
414
415
    def test_get_insight_xml(self, mock_nvti, mock_db):
416
        w = DummyDaemon(mock_nvti, mock_db)
417
        out = '<insight>some insight</insight>'
418
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
419
        insight = vt.get('insight')
420
        res = w.get_insight_vt_as_xml_str(
421
            '1.3.6.1.4.1.25623.1.0.100061', insight
422
        )
423
424
        self.assertEqual(res, out)
425
426
    def test_get_insight_xml_failed(self, mock_nvti, mock_db):
427
        w = DummyDaemon(mock_nvti, mock_db)
428
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
429
        insight = u'\u0006'
430
        logging.Logger.warning = Mock()
431
        w.get_insight_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', insight)
432
        if hasattr(Mock, 'assert_called_once'):
433
            logging.Logger.warning.assert_called_once()
434
435
    def test_get_solution_xml(self, mock_nvti, mock_db):
436
        w = DummyDaemon(mock_nvti, mock_db)
437
        out = '<solution type="WillNotFix" method="DebianAPTUpgrade">some solution</solution>'
438
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
439
        solution = vt.get('solution')
440
        solution_type = vt.get('solution_type')
441
        solution_method = vt.get('solution_method')
442
443
        res = w.get_solution_vt_as_xml_str(
444
            '1.3.6.1.4.1.25623.1.0.100061',
445
            solution,
446
            solution_type,
447
            solution_method,
448
        )
449
450
        self.assertEqual(res, out)
451
452
    def test_get_solution_xml_failed(self, mock_nvti, mock_db):
453
        w = DummyDaemon(mock_nvti, mock_db)
454
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
455
        solution = u'\u0006'
456
        logging.Logger.warning = Mock()
457
        w.get_solution_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', solution)
458
        if hasattr(Mock, 'assert_called_once'):
459
            logging.Logger.warning.assert_called_once()
460
461
    def test_get_detection_xml(self, mock_nvti, mock_db):
462
        w = DummyDaemon(mock_nvti, mock_db)
463
        out = '<detection qod_type="remote_banner"/>'
464
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
465
        detection_type = vt.get('qod_type')
466
467
        res = w.get_detection_vt_as_xml_str(
468
            '1.3.6.1.4.1.25623.1.0.100061', qod_type=detection_type
469
        )
470
471
        self.assertEqual(res, out)
472
473
    def test_get_detection_xml_failed(self, mock_nvti, mock_db):
474
        w = DummyDaemon(mock_nvti, mock_db)
475
        detection = u'\u0006'
476
        logging.Logger.warning = Mock()
477
        w.get_detection_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', detection)
478
        if hasattr(Mock, 'assert_called_once'):
479
            logging.Logger.warning.assert_called_once()
480
481
    def test_get_affected_xml(self, mock_nvti, mock_db):
482
        w = DummyDaemon(mock_nvti, mock_db)
483
        out = '<affected>some affection</affected>'
484
        vt = w.VT['1.3.6.1.4.1.25623.1.0.100061']
485
        affected = vt.get('affected')
486
487
        res = w.get_affected_vt_as_xml_str(
488
            '1.3.6.1.4.1.25623.1.0.100061', affected=affected
489
        )
490
491
        self.assertEqual(res, out)
492
493
    def test_get_affected_xml_failed(self, mock_nvti, mock_db):
494
        w = DummyDaemon(mock_nvti, mock_db)
495
        affected = u"\u0006" + "affected"
496
        logging.Logger.warning = Mock()
497
        w.get_affected_vt_as_xml_str(
498
            '1.3.6.1.4.1.25623.1.0.100061', affected=affected
499
        )
500
        if hasattr(Mock, 'assert_called_once'):
501
            logging.Logger.warning.assert_called_once()
502
503
    def test_build_credentials(self, mock_nvti, mock_db):
504
        w = DummyDaemon(mock_nvti, mock_db)
505
506
        cred_out = [
507
            '1.3.6.1.4.1.25623.1.0.105058:1:entry:ESXi login name:|||username',
508
            '1.3.6.1.4.1.25623.1.0.105058:2:password:ESXi login password:|||pass',
509
            'auth_port_ssh|||22',
510
            '1.3.6.1.4.1.25623.1.0.103591:1:entry:SSH login name:|||username',
511
            '1.3.6.1.4.1.25623.1.0.103591:2:password:SSH key passphrase:|||pass',
512
            '1.3.6.1.4.1.25623.1.0.103591:4:file:SSH private key:|||',
513
            '1.3.6.1.4.1.25623.1.0.90023:1:entry:SMB login:|||username',
514
            '1.3.6.1.4.1.25623.1.0.90023:2:password]:SMB password :|||pass',
515
            '1.3.6.1.4.1.25623.1.0.105076:1:password:SNMP Community:some comunity',
516
            '1.3.6.1.4.1.25623.1.0.105076:2:entry:SNMPv3 Username:username',
517
            '1.3.6.1.4.1.25623.1.0.105076:3:password:SNMPv3 Password:pass',
518
            '1.3.6.1.4.1.25623.1.0.105076:4:radio:SNMPv3 Authentication Algorithm:some auth algo',
519
            '1.3.6.1.4.1.25623.1.0.105076:5:password:SNMPv3 Privacy Password:privacy pass',
520
            '1.3.6.1.4.1.25623.1.0.105076:6:radio:SNMPv3 Privacy Algorithm:privacy algo',
521
        ]
522
        cred_dict = {
523
            'ssh': {
524
                'type': 'ssh',
525
                'port': '22',
526
                'username': 'username',
527
                'password': 'pass',
528
            },
529
            'smb': {'type': 'smb', 'username': 'username', 'password': 'pass'},
530
            'esxi': {
531
                'type': 'esxi',
532
                'username': 'username',
533
                'password': 'pass',
534
            },
535
            'snmp': {
536
                'type': 'snmp',
537
                'username': 'username',
538
                'password': 'pass',
539
                'community': 'some comunity',
540
                'auth_algorithm': 'some auth algo',
541
                'privacy_password': 'privacy pass',
542
                'privacy_algorithm': 'privacy algo',
543
            },
544
        }
545
        self.maxDiff = None
546
        ret = w.build_credentials_as_prefs(cred_dict)
547
        self.assertEqual(len(ret), len(cred_out))
548
        self.assertIn('auth_port_ssh|||22', cred_out)
549
        self.assertIn(
550
            '1.3.6.1.4.1.25623.1.0.90023:1:entry:SMB login:|||username',
551
            cred_out,
552
        )
553
554
    def test_build_credentials_ssh_up(self, mock_nvti, mock_db):
555
        w = DummyDaemon(mock_nvti, mock_db)
556
        cred_out = [
557
            'auth_port_ssh|||22',
558
            '1.3.6.1.4.1.25623.1.0.103591:1:entry:SSH login name:|||username',
559
            '1.3.6.1.4.1.25623.1.0.103591:3:password:SSH password (unsafe!):|||pass',
560
        ]
561
        cred_dict = {
562
            'ssh': {
563
                'type': 'up',
564
                'port': '22',
565
                'username': 'username',
566
                'password': 'pass',
567
            }
568
        }
569
        self.maxDiff = None
570
        ret = w.build_credentials_as_prefs(cred_dict)
571
        self.assertEqual(ret, cred_out)
572
573
    def test_build_alive_test_opt_empty(self, mock_nvti, mock_db):
574
        w = DummyDaemon(mock_nvti, mock_db)
575
        target_options_dict = {'alive_test': '0'}
576
577
        self.maxDiff = None
578
        ret = w.build_alive_test_opt_as_prefs(target_options_dict)
579
        self.assertEqual(ret, [])
580
581
    def test_build_alive_test_opt(self, mock_nvti, mock_db):
582
        w = DummyDaemon(mock_nvti, mock_db)
583
        alive_test_out = [
584
            "1.3.6.1.4.1.25623.1.0.100315:1:checkbox:Do a TCP ping|||no",
585
            "1.3.6.1.4.1.25623.1.0.100315:2:checkbox:TCP ping tries also TCP-SYN ping|||no",
586
            "1.3.6.1.4.1.25623.1.0.100315:7:checkbox:TCP ping tries only TCP-SYN ping|||no",
587
            "1.3.6.1.4.1.25623.1.0.100315:3:checkbox:Do an ICMP ping|||yes",
588
            "1.3.6.1.4.1.25623.1.0.100315:4:checkbox:Use ARP|||no",
589
            "1.3.6.1.4.1.25623.1.0.100315:5:checkbox:Mark unrechable Hosts as dead (not scanning)|||yes",
590
        ]
591
        target_options_dict = {'alive_test': '2'}
592
593
        self.maxDiff = None
594
        ret = w.build_alive_test_opt_as_prefs(target_options_dict)
595
        self.assertEqual(ret, alive_test_out)
596
597
    def test_build_alive_test_opt_fail_1(self, mock_nvti, mock_db):
598
        w = DummyDaemon(mock_nvti, mock_db)
599
        target_options_dict = {'alive_test': 'a'}
600
601
        self.maxDiff = None
602
        logging.Logger.debug = Mock()
603
        ret = w.build_alive_test_opt_as_prefs(target_options_dict)
604
        if hasattr(Mock, 'assert_called_once'):
605
            logging.Logger.debug.assert_called_once()
606
607
    def test_process_vts(self, mock_nvti, mock_db):
608
        vts = {
609
            '1.3.6.1.4.1.25623.1.0.100061': {'1': 'new value'},
610
            'vt_groups': ['family=debian', 'family=general'],
611
        }
612
        vt_out = (
613
            ['1.3.6.1.4.1.25623.1.0.100061'],
614
            {'1.3.6.1.4.1.25623.1.0.100061:1:entry:Data length :': 'new value'},
615
        )
616
617
        w = DummyDaemon(mock_nvti, mock_db)
618
        w.load_vts()
619
        w.temp_vts_dict = w.vts
620
        ret = w.process_vts(vts)
621
        self.assertEqual(ret, vt_out)
622
623
    def test_process_vts_bad_param_id(self, mock_nvti, mock_db):
624
        vts = {
625
            '1.3.6.1.4.1.25623.1.0.100061': {'3': 'new value'},
626
            'vt_groups': ['family=debian', 'family=general'],
627
        }
628
        w = DummyDaemon(mock_nvti, mock_db)
629
        w.load_vts()
630
        w.temp_vts_dict = w.vts
631
        ret = w.process_vts(vts)
632
        self.assertFalse(ret[1])
633
634
    def test_process_vts_not_found(self, mock_nvti, mock_db):
635
        vts = {
636
            '1.3.6.1.4.1.25623.1.0.100065': {'3': 'new value'},
637
            'vt_groups': ['family=debian', 'family=general'],
638
        }
639
        w = DummyDaemon(mock_nvti, mock_db)
640
        w.load_vts()
641
        w.temp_vts_dict = w.vts
642
        logging.Logger.warning = Mock()
643
        ret = w.process_vts(vts)
644
        if hasattr(Mock, 'assert_called_once'):
645
            logging.Logger.warning.assert_called_once()
646
647 View Code Duplication
    def test_get_openvas_timestamp_scan_host_end(self, mock_nvti, mock_db):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
648
        mock_db.get_host_scan_scan_end_time.return_value = '12345'
649
        w = DummyDaemon(mock_nvti, mock_db)
650
651
        target_list = w.create_xml_target()
652
        targets = w.process_targets_element(target_list)
653
654
        w.create_scan('123-456', targets, None, [])
655
        w.get_openvas_timestamp_scan_host('123-456', '192.168.0.1')
656
        for result in w.scan_collection.results_iterator('123-456', False):
657
            self.assertEqual(result.get('value'), '12345')
658
659 View Code Duplication
    def test_get_openvas_timestamp_scan_host_start(self, mock_nvti, mock_db):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
660
        mock_db.get_host_scan_scan_end_time.return_value = None
661
        mock_db.get_host_scan_scan_end_time.return_value = '54321'
662
        w = DummyDaemon(mock_nvti, mock_db)
663
664
        target_list = w.create_xml_target()
665
        targets = w.process_targets_element(target_list)
666
667
        w.create_scan('123-456', targets, None, [])
668
        w.get_openvas_timestamp_scan_host('123-456', '192.168.0.1')
669
        for result in w.scan_collection.results_iterator('123-456', False):
670
            self.assertEqual(result.get('value'), '54321')
671
672
    def test_host_is_finished(self, mock_nvti, mock_db):
673
        mock_db.get_single_item.return_value = 'finished'
674
        w = DummyDaemon(mock_nvti, mock_db)
675
        ret = w.host_is_finished('123-456')
676
        self.assertEqual(ret, True)
677
678
    def test_scan_is_stopped(self, mock_nvti, mock_db):
679
        mock_db.get_single_item.return_value = 'stop_all'
680
        mock_db.kb_connect_item.return_value = mock_db
681
        mock_db.set_redisctx.return_value = None
682
        w = DummyDaemon(mock_nvti, mock_db)
683
        ret = w.scan_is_stopped('123-456')
684
        self.assertEqual(ret, True)
685
686
    def test_feed_is_healthy_true(
687
        self, mock_nvti: MagicMock, mock_db: MagicMock,
688
    ):
689
        w = DummyDaemon(mock_nvti, mock_db)
690
691
        mock_nvti.get_redix_context.return_value = None
692
        mock_db.get_key_count.side_effect = [2, 2]
693
        w.vts = ["a", "b"]
694
695
        ret = w.feed_is_healthy()
696
        self.assertTrue(ret)
697
698
    def test_feed_is_healthy_false(
699
        self, mock_nvti: MagicMock, mock_db: MagicMock,
700
    ):
701
        w = DummyDaemon(mock_nvti, mock_db)
702
703
        mock_nvti.get_redix_context.return_value = None
704
        mock_db.get_key_count.side_effect = [1, 2]
705
        w.vts = ["a", "b"]
706
707
        ret = w.feed_is_healthy()
708
        self.assertFalse(ret)
709
710
    @patch('ospd_openvas.daemon.Path.exists')
711
    @patch('ospd_openvas.daemon.OSPDopenvas.set_params_from_openvas_settings')
712
    def test_feed_is_outdated_none(
713
        self,
714
        mock_set_params: MagicMock,
715
        mock_path_exists: MagicMock,
716
        mock_nvti: MagicMock,
717
        mock_db: MagicMock,
718
    ):
719
        w = DummyDaemon(mock_nvti, mock_db)
720
        w.scan_only_params['plugins_folder'] = '/foo/bar'
721
722
        mock_path_exists.return_value = False
723
724
        # Return None
725
        ret = w.feed_is_outdated('1234')
726
        self.assertIsNone(ret)
727
728
        self.assertEqual(mock_set_params.call_count, 1)
729
        self.assertEqual(mock_path_exists.call_count, 1)
730
731 View Code Duplication
    @patch('ospd_openvas.daemon.Path.exists')
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
732
    @patch('ospd_openvas.daemon.Path.open')
733
    def test_feed_is_outdated_true(
734
        self,
735
        mock_path_open: MagicMock,
736
        mock_path_exists: MagicMock,
737
        mock_nvti: MagicMock,
738
        mock_db: MagicMock,
739
    ):
740
        read_data = 'PLUGIN_SET = "1235";'
741
742
        mock_path_exists.return_value = True
743
        mock_read = MagicMock(name='Path open context manager')
744
        mock_read.__enter__ = MagicMock(return_value=io.StringIO(read_data))
745
        mock_path_open.return_value = mock_read
746
747
        w = DummyDaemon(mock_nvti, mock_db)
748
749
        # Return True
750
        w.scan_only_params['plugins_folder'] = '/foo/bar'
751
752
        ret = w.feed_is_outdated('1234')
753
        self.assertTrue(ret)
754
755
        self.assertEqual(mock_path_exists.call_count, 1)
756
        self.assertEqual(mock_path_open.call_count, 1)
757
758 View Code Duplication
    @patch('ospd_openvas.daemon.Path.exists')
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
759
    @patch('ospd_openvas.daemon.Path.open')
760
    def test_feed_is_outdated_false(
761
        self,
762
        mock_path_open: MagicMock,
763
        mock_path_exists: MagicMock,
764
        mock_nvti: MagicMock,
765
        mock_db: MagicMock,
766
    ):
767
        mock_path_exists.return_value = True
768
769
        read_data = 'PLUGIN_SET = "1234"'
770
        mock_path_exists.return_value = True
771
        mock_read = MagicMock(name='Path open context manager')
772
        mock_read.__enter__ = MagicMock(return_value=io.StringIO(read_data))
773
        mock_path_open.return_value = mock_read
774
775
        w = DummyDaemon(mock_nvti, mock_db)
776
        w.scan_only_params['plugins_folder'] = '/foo/bar'
777
778
        ret = w.feed_is_outdated('1234')
779
        self.assertFalse(ret)
780
781
        self.assertEqual(mock_path_exists.call_count, 1)
782
        self.assertEqual(mock_path_open.call_count, 1)
783
784
    @patch('ospd_openvas.daemon.OSPDaemon.add_scan_log')
785
    def test_get_openvas_result(self, mock_ospd, mock_nvti, mock_db):
786
        results = ["LOG||| |||general/Host_Details||| |||Host dead", None]
787
        mock_db.get_result.side_effect = results
788
        w = DummyDaemon(mock_nvti, mock_db)
789
        w.load_vts()
790
        mock_ospd.return_value = None
791
        w.get_openvas_result('123-456', 'localhost')
792
        mock_ospd.assert_called_with(
793
            '123-456',
794
            host='localhost',
795
            hostname='',
796
            name='',
797
            port='general/Host_Details',
798
            qod='',
799
            test_id='',
800
            value='Host dead',
801
        )
802
803
    @patch('ospd_openvas.daemon.OSPDaemon.add_scan_log')
804
    def test_get_openvas_result_escaped(self, mock_ospd, mock_nvti, mock_db):
805
        results = [
806
            "LOG||| |||general/Host_Details|||1.3.6.1.4.1.25623.1.0.100061|||Alive",
807
            None,
808
        ]
809
        mock_db.get_result.side_effect = results
810
        w = DummyDaemon(mock_nvti, mock_db)
811
        w.load_vts()
812
        mock_ospd.return_value = None
813
        mock_nvti.QOD_TYPES.__getitem__.return_value = ''
814
        w.get_openvas_result('123-456', 'localhost')
815
816
        mock_ospd.assert_called_with(
817
            '123-456',
818
            host='localhost',
819
            hostname='',
820
            name='Mantis Detection &amp; foo',
821
            port='general/Host_Details',
822
            qod='',
823
            test_id='1.3.6.1.4.1.25623.1.0.100061',
824
            value='Alive',
825
        )
826
827
    @patch('ospd_openvas.daemon.OSPDaemon.set_scan_host_progress')
828
    def test_update_progress(self, mock_ospd, mock_nvti, mock_db):
829
        msg = '0/-1'
830
        w = DummyDaemon(mock_nvti, mock_db)
831
        target_list = w.create_xml_target()
832
        targets = w.process_targets_element(target_list)
833
834
        w.create_scan('123-456', targets, None, [])
835
836
        mock_ospd.return_value = None
837
        w.update_progress('123-456', 'localhost', 'localhost', msg)
838
        mock_ospd.assert_called_with('123-456', 'localhost', 'localhost', 100)
839
840
841
class TestFilters(TestCase):
842
    def test_format_vt_modification_time(self):
843
        ovformat = OpenVasVtsFilter()
844
        td = '1517443741'
845
        formatted = ovformat.format_vt_modification_time(td)
846
        self.assertEqual(formatted, "20180201000901")
847