Passed
Pull Request — master (#223)
by Juan José
01:56
created

TestOspdOpenvas.test_build_credentials_ssh_up()   A

Complexity

Conditions 1

Size

Total Lines 20
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 14
nop 1
dl 0
loc 20
rs 9.7
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018-2019 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-2.0-or-later
5
#
6
# This program is free software; you can redistribute it and/or
7
# modify it under the terms of the GNU General Public License
8
# as published by the Free Software Foundation; either version 2
9
# of the License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
20
# pylint: disable=invalid-name,line-too-long,no-value-for-parameter
21
22
""" Unit Test for ospd-openvas """
23
24
import io
25
import logging
26
27
from unittest import TestCase
28
from unittest.mock import patch, Mock, MagicMock
29
30
from ospd.vts import Vts
31
from ospd.protocol import OspRequest
32
33
from tests.dummydaemon import DummyDaemon
34
from tests.helper import assert_called_once
35
36
from ospd_openvas.daemon import OSPD_PARAMS, OpenVasVtsFilter
37
from ospd_openvas.openvas import Openvas
38
39
# Expected content of OSPD_PARAMS once the daemon has merged the openvas
# settings; compared against OSPD_PARAMS in
# TestOspdOpenvas.test_set_params_from_openvas_settings.
# The description strings (including their doubled spaces) are compared
# verbatim, so they must not be "tidied up" here.
OSPD_PARAMS_OUT = {
    'auto_enable_dependencies': {
        'type': 'boolean',
        'name': 'auto_enable_dependencies',
        'default': 1,
        'mandatory': 1,
        'description': 'Automatically enable the plugins that are depended on',
    },
    'cgi_path': {
        'type': 'string',
        'name': 'cgi_path',
        'default': '/cgi-bin:/scripts',
        'mandatory': 1,
        'description': 'Look for default CGIs in /cgi-bin and /scripts',
    },
    'checks_read_timeout': {
        'type': 'integer',
        'name': 'checks_read_timeout',
        'default': 5,
        'mandatory': 1,
        'description': 'Number  of seconds that the security checks will '
        'wait for when doing a recv()',
    },
    'drop_privileges': {
        'type': 'boolean',
        'name': 'drop_privileges',
        'default': 0,
        'mandatory': 1,
        'description': '',
    },
    'network_scan': {
        'type': 'boolean',
        'name': 'network_scan',
        'default': 0,
        'mandatory': 1,
        'description': '',
    },
    'non_simult_ports': {
        'type': 'string',
        'name': 'non_simult_ports',
        'default': '22',
        'mandatory': 1,
        'description': 'Prevent to make two connections on the same given '
        'ports at the same time.',
    },
    'open_sock_max_attempts': {
        'type': 'integer',
        'name': 'open_sock_max_attempts',
        'default': 5,
        'mandatory': 0,
        'description': 'Number of unsuccessful retries to open the socket '
        'before to set the port as closed.',
    },
    'timeout_retry': {
        'type': 'integer',
        'name': 'timeout_retry',
        'default': 5,
        'mandatory': 0,
        'description': 'Number of retries when a socket connection attempt '
        'timesout.',
    },
    'optimize_test': {
        'type': 'integer',
        'name': 'optimize_test',
        'default': 5,
        'mandatory': 0,
        'description': 'By default, openvas does not trust the remote '
        'host banners.',
    },
    'plugins_timeout': {
        'type': 'integer',
        'name': 'plugins_timeout',
        'default': 5,
        'mandatory': 0,
        'description': 'This is the maximum lifetime, in seconds of a plugin.',
    },
    'report_host_details': {
        'type': 'boolean',
        'name': 'report_host_details',
        'default': 1,
        'mandatory': 1,
        'description': '',
    },
    'safe_checks': {
        'type': 'boolean',
        'name': 'safe_checks',
        'default': 1,
        'mandatory': 1,
        'description': 'Disable the plugins with potential to crash '
        'the remote services',
    },
    'scanner_plugins_timeout': {
        'type': 'integer',
        'name': 'scanner_plugins_timeout',
        'default': 36000,
        'mandatory': 1,
        'description': 'Like plugins_timeout, but for ACT_SCANNER plugins.',
    },
    'time_between_request': {
        'type': 'integer',
        'name': 'time_between_request',
        'default': 0,
        'mandatory': 0,
        'description': 'Allow to set a wait time between two actions '
        '(open, send, close).',
    },
    'unscanned_closed': {
        'type': 'boolean',
        'name': 'unscanned_closed',
        'default': 1,
        'mandatory': 1,
        'description': '',
    },
    'unscanned_closed_udp': {
        'type': 'boolean',
        'name': 'unscanned_closed_udp',
        'default': 1,
        'mandatory': 1,
        'description': '',
    },
    'expand_vhosts': {
        'type': 'boolean',
        'name': 'expand_vhosts',
        'default': 1,
        'mandatory': 0,
        'description': 'Whether to expand the target hosts '
        'list of vhosts with values gathered from sources '
        'such as reverse-lookup queries and VT checks '
        'for SSL/TLS certificates.',
    },
    'test_empty_vhost': {
        'type': 'boolean',
        'name': 'test_empty_vhost',
        'default': 0,
        'mandatory': 0,
        'description': 'If  set  to  yes, the scanner will '
        'also test the target by using empty vhost value '
        'in addition to the targets associated vhost values.',
    },
}
179
180
181
class TestOspdOpenvas(TestCase):
    """Tests for the ospd-openvas daemon, driven through DummyDaemon.

    The "*_failed" tests feed the control character U+0006 to the XML
    helpers and check that exactly one log call is made. The logger method
    is replaced with ``patch.object`` so the original method is restored
    when each test finishes; assigning a Mock directly to
    ``logging.Logger.warning`` would leak into every later test.
    """

    @patch('ospd_openvas.daemon.Openvas')
    def test_set_params_from_openvas_settings(self, mock_openvas: Openvas):
        """Known settings land in OSPD_PARAMS, others in scan_only_params."""
        mock_openvas.get_settings.return_value = {
            'non_simult_ports': '22',
            'plugins_folder': '/foo/bar',
        }
        w = DummyDaemon()
        w.set_params_from_openvas_settings()

        self.assertEqual(mock_openvas.get_settings.call_count, 1)
        self.assertEqual(OSPD_PARAMS, OSPD_PARAMS_OUT)
        self.assertEqual(w.scan_only_params.get('plugins_folder'), '/foo/bar')

    @patch('ospd_openvas.daemon.Openvas')
    def test_sudo_available(self, mock_openvas):
        """sudo_available is true when Openvas.check_sudo() returns True."""
        mock_openvas.check_sudo.return_value = True

        w = DummyDaemon()
        # Reset the stored value, then read the attribute once before the
        # assertion below reads it a second time.
        w._sudo_available = None  # pylint: disable=protected-access
        w.sudo_available  # pylint: disable=pointless-statement

        self.assertTrue(w.sudo_available)

    def test_load_vts(self):
        """load_vts() fills w.vts with as many entries as w.VTS holds."""
        w = DummyDaemon()
        w.load_vts()

        self.assertIsInstance(w.vts, Vts)
        self.assertEqual(len(w.vts), len(w.VTS))

    def test_get_custom_xml(self):
        """The <custom> XML for the sample VT has the expected length."""
        out = (
            '<custom>'
            '<required_ports>Services/www, 80</required_ports>'
            '<category>3</category>'
            '<excluded_keys>Settings/disable_cgi_scanning</excluded_keys>'
            '<family>Product detection</family>'
            '<filename>mantis_detect.nasl</filename>'
            '<timeout>0</timeout>'
            '</custom>'
        )
        w = DummyDaemon()

        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
        res = w.get_custom_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', vt.get('custom')
        )
        # Only the lengths are compared, not the text itself.
        # NOTE(review): presumably because the child element order is not
        # guaranteed -- confirm against the daemon implementation.
        self.assertEqual(len(res), len(out))

    def test_get_custom_xml_failed(self):
        """An invalid custom value triggers exactly one warning."""
        w = DummyDaemon()

        custom = {'a': "\u0006"}
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_custom_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', custom=custom
            )

        assert_called_once(mock_warning)

    def test_get_severities_xml(self):
        """The sample VT severities are rendered as <severities> XML."""
        w = DummyDaemon()

        out = (
            '<severities>'
            '<severity type="cvss_base_v2">'
            'AV:N/AC:L/Au:N/C:N/I:N/A:N'
            '</severity>'
            '</severities>'
        )
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
        severities = vt.get('severities')
        res = w.get_severities_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', severities
        )

        self.assertEqual(res, out)

    def test_get_severities_xml_failed(self):
        """An invalid severity vector triggers exactly one warning."""
        w = DummyDaemon()

        sever = {'severity_base_vector': "\u0006"}
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_severities_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', severities=sever
            )

        assert_called_once(mock_warning)

    def test_get_params_xml(self):
        """The <params> XML for the sample VT has the expected length."""
        w = DummyDaemon()
        out = (
            '<params>'
            '<param type="checkbox" id="2">'
            '<name>Do not randomize the  order  in  which ports are '
            'scanned</name>'
            '<default>no</default>'
            '</param>'
            '<param type="entry" id="1">'
            '<name>Data length :</name>'
            '</param>'
            '</params>'
        )

        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
        params = vt.get('vt_params')
        res = w.get_params_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', params)

        # Only the lengths are compared, not the text itself.
        self.assertEqual(len(res), len(out))

    def test_get_params_xml_failed(self):
        """An invalid parameter default triggers exactly one warning."""
        w = DummyDaemon()

        params = {
            '1': {
                'id': '1',
                'type': 'entry',
                'default': '\u0006',
                'name': 'dns-fuzz.timelimit',
                'description': 'Description',
            }
        }
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_params_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', params)

        assert_called_once(mock_warning)

    def test_get_refs_xml(self):
        """The sample VT references are rendered as <refs> XML."""
        w = DummyDaemon()

        out = '<refs><ref type="url" id="http://www.mantisbt.org/"/></refs>'
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
        refs = vt.get('vt_refs')
        res = w.get_refs_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', refs)

        self.assertEqual(res, out)

    def test_get_dependencies_xml(self):
        """A list of VT ids is rendered as <dependencies> XML."""
        w = DummyDaemon()

        out = (
            '<dependencies>'
            '<dependency vt_id="1.2.3.4"/><dependency vt_id="4.3.2.1"/>'
            '</dependencies>'
        )
        dep = ['1.2.3.4', '4.3.2.1']
        res = w.get_dependencies_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', dep
        )

        self.assertEqual(res, out)

    def test_get_dependencies_xml_failed(self):
        """An invalid dependency id triggers exactly one error log call."""
        w = DummyDaemon()

        dep = ["\u0006"]
        # Unlike the other failure tests, this one checks Logger.error.
        with patch.object(logging.Logger, 'error') as mock_error:
            w.get_dependencies_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', vt_dependencies=dep
            )

        assert_called_once(mock_error)

    def test_get_ctime_xml(self):
        """The sample VT creation time is rendered as <creation_time>."""
        w = DummyDaemon()

        out = '<creation_time>1237458156</creation_time>'
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
        ctime = vt.get('creation_time')
        res = w.get_creation_time_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', ctime
        )

        self.assertEqual(res, out)

    def test_get_ctime_xml_failed(self):
        """An invalid creation time triggers exactly one warning."""
        w = DummyDaemon()

        ctime = '\u0006'
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_creation_time_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', vt_creation_time=ctime
            )

        assert_called_once(mock_warning)

    def test_get_mtime_xml(self):
        """The sample VT modification time is rendered as XML."""
        w = DummyDaemon()

        out = '<modification_time>1533906565</modification_time>'
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
        mtime = vt.get('modification_time')
        res = w.get_modification_time_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', mtime
        )

        self.assertEqual(res, out)

    def test_get_mtime_xml_failed(self):
        """An invalid modification time triggers exactly one warning."""
        w = DummyDaemon()

        mtime = '\u0006'
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_modification_time_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', mtime
            )

        assert_called_once(mock_warning)

    def test_get_summary_xml(self):
        """The sample VT summary is rendered as <summary> XML."""
        w = DummyDaemon()

        out = '<summary>some summary</summary>'
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
        summary = vt.get('summary')
        res = w.get_summary_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', summary
        )

        self.assertEqual(res, out)

    def test_get_summary_xml_failed(self):
        """An invalid summary triggers exactly one warning."""
        w = DummyDaemon()

        summary = '\u0006'
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_summary_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', summary
            )

        assert_called_once(mock_warning)

    def test_get_impact_xml(self):
        """The sample VT impact is rendered as <impact> XML."""
        w = DummyDaemon()

        out = '<impact>some impact</impact>'
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
        impact = vt.get('impact')
        res = w.get_impact_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', impact)

        self.assertEqual(res, out)

    def test_get_impact_xml_failed(self):
        """An invalid impact text triggers exactly one warning."""
        w = DummyDaemon()

        impact = '\u0006'
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_impact_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', impact)

        assert_called_once(mock_warning)

    def test_get_insight_xml(self):
        """The sample VT insight is rendered as <insight> XML."""
        w = DummyDaemon()

        out = '<insight>some insight</insight>'
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
        insight = vt.get('insight')
        res = w.get_insight_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', insight
        )

        self.assertEqual(res, out)

    def test_get_insight_xml_failed(self):
        """An invalid insight text triggers exactly one warning."""
        w = DummyDaemon()

        insight = '\u0006'
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_insight_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', insight
            )

        assert_called_once(mock_warning)

    def test_get_solution_xml(self):
        """Solution text, type and method are rendered as <solution> XML."""
        w = DummyDaemon()

        out = (
            '<solution type="WillNotFix" method="DebianAPTUpgrade">'
            'some solution'
            '</solution>'
        )
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
        solution = vt.get('solution')
        solution_type = vt.get('solution_type')
        solution_method = vt.get('solution_method')

        res = w.get_solution_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061',
            solution,
            solution_type,
            solution_method,
        )

        self.assertEqual(res, out)

    def test_get_solution_xml_failed(self):
        """An invalid solution text triggers exactly one warning."""
        w = DummyDaemon()

        solution = '\u0006'
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_solution_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', solution
            )

        assert_called_once(mock_warning)

    def test_get_detection_xml(self):
        """The sample VT qod_type is rendered as a <detection> element."""
        w = DummyDaemon()

        out = '<detection qod_type="remote_banner"/>'
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
        detection_type = vt.get('qod_type')

        res = w.get_detection_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', qod_type=detection_type
        )

        self.assertEqual(res, out)

    def test_get_detection_xml_failed(self):
        """An invalid detection value triggers exactly one warning."""
        w = DummyDaemon()

        detection = '\u0006'
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_detection_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', detection
            )

        assert_called_once(mock_warning)

    def test_get_affected_xml(self):
        """The sample VT affected text is rendered as <affected> XML."""
        w = DummyDaemon()
        out = '<affected>some affection</affected>'
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
        affected = vt.get('affected')

        res = w.get_affected_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', affected=affected
        )

        self.assertEqual(res, out)

    def test_get_affected_xml_failed(self):
        """An invalid affected text triggers exactly one warning."""
        w = DummyDaemon()

        affected = "\u0006" + "affected"
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_affected_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', affected=affected
            )

        assert_called_once(mock_warning)

    def test_feed_is_healthy_true(self):
        """The feed is healthy when both NVT counts and len(vts) agree."""
        w = DummyDaemon()

        w.nvti.get_nvt_count.return_value = 2
        w.nvti.get_nvt_files_count.return_value = 2
        w.vts = ["a", "b"]

        ret = w.feed_is_healthy()
        self.assertTrue(ret)

    def test_feed_is_healthy_false(self):
        """The feed is unhealthy when any one of the three counts differs."""
        w = DummyDaemon()

        # NVT count differs from the other two.
        w.nvti.get_nvt_count.return_value = 1
        w.nvti.get_nvt_files_count.return_value = 2

        w.vts = ["a", "b"]

        ret = w.feed_is_healthy()

        self.assertFalse(ret)

        # NVT files count differs from the other two.
        w.nvti.get_nvt_count.return_value = 2
        w.nvti.get_nvt_files_count.return_value = 1

        ret = w.feed_is_healthy()

        self.assertFalse(ret)

        # Number of loaded VTs differs from the other two.
        w.nvti.get_nvt_count.return_value = 2
        w.nvti.get_nvt_files_count.return_value = 2

        w.vts = ["a"]

        ret = w.feed_is_healthy()

        self.assertFalse(ret)

    @patch('ospd_openvas.daemon.Path.exists')
    @patch('ospd_openvas.daemon.OSPDopenvas.set_params_from_openvas_settings')
    def test_feed_is_outdated_none(
        self, mock_set_params: MagicMock, mock_path_exists: MagicMock
    ):
        """feed_is_outdated() returns None when Path.exists() is False."""
        w = DummyDaemon()

        w.scan_only_params['plugins_folder'] = '/foo/bar'

        # Return None
        mock_path_exists.return_value = False

        ret = w.feed_is_outdated('1234')
        self.assertIsNone(ret)

        self.assertEqual(mock_set_params.call_count, 1)
        self.assertEqual(mock_path_exists.call_count, 1)

    @patch('ospd_openvas.daemon.Path.exists')
    @patch('ospd_openvas.daemon.Path.open')
    def test_feed_is_outdated_true(
        self, mock_path_open: MagicMock, mock_path_exists: MagicMock,
    ):
        """A PLUGIN_SET of 1235 makes feed version '1234' outdated."""
        read_data = 'PLUGIN_SET = "1235";'

        mock_path_exists.return_value = True
        # Path.open() is used as a context manager, so the fake file content
        # is handed out through __enter__.
        mock_read = MagicMock(name='Path open context manager')
        mock_read.__enter__ = MagicMock(return_value=io.StringIO(read_data))
        mock_path_open.return_value = mock_read

        w = DummyDaemon()

        # Return True
        w.scan_only_params['plugins_folder'] = '/foo/bar'

        ret = w.feed_is_outdated('1234')
        self.assertTrue(ret)

        self.assertEqual(mock_path_exists.call_count, 1)
        self.assertEqual(mock_path_open.call_count, 1)

    @patch('ospd_openvas.daemon.Path.exists')
    @patch('ospd_openvas.daemon.Path.open')
    def test_feed_is_outdated_false(
        self, mock_path_open: MagicMock, mock_path_exists: MagicMock,
    ):
        """A PLUGIN_SET of 1234 means feed version '1234' is current."""
        read_data = 'PLUGIN_SET = "1234"'

        mock_path_exists.return_value = True
        # Path.open() is used as a context manager, so the fake file content
        # is handed out through __enter__.
        mock_read = MagicMock(name='Path open context manager')
        mock_read.__enter__ = MagicMock(return_value=io.StringIO(read_data))
        mock_path_open.return_value = mock_read

        w = DummyDaemon()
        w.scan_only_params['plugins_folder'] = '/foo/bar'

        ret = w.feed_is_outdated('1234')
        self.assertFalse(ret)

        self.assertEqual(mock_path_exists.call_count, 1)
        self.assertEqual(mock_path_open.call_count, 1)

    @patch('ospd_openvas.daemon.ScanDB')
    @patch('ospd_openvas.daemon.OSPDaemon.add_scan_log')
    def test_get_openvas_result(self, mock_add_scan_log, MockDBClass):
        """A LOG result read from the scan DB is passed to add_scan_log()."""
        w = DummyDaemon()
        mock_db = MockDBClass.return_value

        # One result line followed by None on the second get_result() call.
        results = ["LOG||| |||general/Host_Details||| |||Host dead", None]
        mock_db.get_result.side_effect = results
        mock_add_scan_log.return_value = None

        w.load_vts()
        w.report_openvas_results(mock_db, '123-456', 'localhost')

        mock_add_scan_log.assert_called_with(
            '123-456',
            host='localhost',
            hostname='',
            name='',
            port='general/Host_Details',
            qod='',
            test_id='',
            value='Host dead',
        )

    @patch('ospd_openvas.daemon.OSPDaemon.set_scan_host_progress')
    def test_update_progress(self, mock_set_scan_host_progress):
        """A '0/-1' progress message sets the host progress to 100."""
        w = DummyDaemon()

        mock_set_scan_host_progress.return_value = None

        msg = '0/-1'
        target_element = w.create_xml_target()
        targets = OspRequest.process_target_element(target_element)

        w.create_scan('123-456', targets, None, [])
        w.update_progress('123-456', 'localhost', msg)

        mock_set_scan_host_progress.assert_called_with(
            '123-456', 'localhost', 100
        )
669
670
671
class TestFilters(TestCase):
    """Tests for OpenVasVtsFilter."""

    def test_format_vt_modification_time(self):
        """An epoch-seconds string is formatted as a 14-digit timestamp."""
        ovformat = OpenVasVtsFilter()
        td = '1517443741'
        formatted = ovformat.format_vt_modification_time(td)
        # NOTE(review): 1517443741 is 2018-02-01 00:09:01 in UTC, so the
        # expected value presumably relies on a UTC conversion -- confirm
        # the filter does not depend on the local timezone.
        self.assertEqual(formatted, "20180201000901")
677