Completed
Push — master ( a4dee7...a3f003 )
by Juan José
18s queued 14s
created

TestOspdOpenvas.test_get_openvas_result_hosts_count()   A

Complexity

Conditions 1

Size

Total Lines 17
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 14
nop 2
dl 0
loc 17
rs 9.7
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
# pylint: disable=invalid-name,line-too-long,no-value-for-parameter
21
22
""" Unit Test for ospd-openvas """
23
24
import io
25
import logging
26
import psutil
27
28
from unittest import TestCase
29
from unittest.mock import patch, Mock, MagicMock
30
31
from ospd.vts import Vts
32
from ospd.protocol import OspRequest
33
34
from tests.dummydaemon import DummyDaemon
35
from tests.helper import assert_called_once
36
37
from ospd_openvas.daemon import OSPD_PARAMS, OpenVasVtsFilter
38
from ospd_openvas.openvas import Openvas
39
40
# Expected content of OSPD_PARAMS; compared against it in
# TestOspdOpenvas.test_set_params_from_openvas_settings.
# Every entry carries the same six fields: type, name, default, mandatory,
# visible_for_client and description.
OSPD_PARAMS_OUT = {
    'auto_enable_dependencies': {
        'type': 'boolean',
        'name': 'auto_enable_dependencies',
        'default': 1,
        'mandatory': 1,
        'visible_for_client': True,
        'description': 'Automatically enable the plugins that are depended on',
    },
    'cgi_path': {
        'type': 'string',
        'name': 'cgi_path',
        'default': '/cgi-bin:/scripts',
        'mandatory': 1,
        'visible_for_client': True,
        'description': 'Look for default CGIs in /cgi-bin and /scripts',
    },
    'checks_read_timeout': {
        'type': 'integer',
        'name': 'checks_read_timeout',
        'default': 5,
        'mandatory': 1,
        'visible_for_client': True,
        'description': (
            'Number  of seconds that the security checks will '
            'wait for when doing a recv()'
        ),
    },
    'non_simult_ports': {
        'type': 'string',
        'name': 'non_simult_ports',
        'default': '139, 445, 3389, Services/irc',
        'mandatory': 1,
        'visible_for_client': True,
        'description': (
            'Prevent to make two connections on the same given '
            'ports at the same time.'
        ),
    },
    'open_sock_max_attempts': {
        'type': 'integer',
        'name': 'open_sock_max_attempts',
        'default': 5,
        'mandatory': 0,
        'visible_for_client': True,
        'description': (
            'Number of unsuccessful retries to open the socket '
            'before to set the port as closed.'
        ),
    },
    'timeout_retry': {
        'type': 'integer',
        'name': 'timeout_retry',
        'default': 5,
        'mandatory': 0,
        'visible_for_client': True,
        'description': (
            'Number of retries when a socket connection attempt '
            'timesout.'
        ),
    },
    'optimize_test': {
        'type': 'boolean',
        'name': 'optimize_test',
        'default': 1,
        'mandatory': 0,
        'visible_for_client': True,
        'description': (
            'By default, optimize_test is enabled which means openvas does '
            'trust the remote host banners and is only launching plugins '
            'against the services they have been designed to check. '
            'For example it will check a web server claiming to be IIS only '
            'for IIS related flaws but will skip plugins testing for Apache '
            'flaws, and so on. This default behavior is used to optimize '
            'the scanning performance and to avoid false positives. '
            'If you are not sure that the banners of the remote host '
            'have been tampered with, you can disable this option.'
        ),
    },
    'plugins_timeout': {
        'type': 'integer',
        'name': 'plugins_timeout',
        'default': 5,
        'mandatory': 0,
        'visible_for_client': True,
        'description': 'This is the maximum lifetime, in seconds of a plugin.',
    },
    'report_host_details': {
        'type': 'boolean',
        'name': 'report_host_details',
        'default': 1,
        'mandatory': 1,
        'visible_for_client': True,
        'description': '',
    },
    'safe_checks': {
        'type': 'boolean',
        'name': 'safe_checks',
        'default': 1,
        'mandatory': 1,
        'visible_for_client': True,
        'description': (
            'Disable the plugins with potential to crash '
            'the remote services'
        ),
    },
    'scanner_plugins_timeout': {
        'type': 'integer',
        'name': 'scanner_plugins_timeout',
        'default': 36000,
        'mandatory': 1,
        'visible_for_client': True,
        'description': 'Like plugins_timeout, but for ACT_SCANNER plugins.',
    },
    'time_between_request': {
        'type': 'integer',
        'name': 'time_between_request',
        'default': 0,
        'mandatory': 0,
        'visible_for_client': True,
        'description': (
            'Allow to set a wait time between two actions '
            '(open, send, close).'
        ),
    },
    'unscanned_closed': {
        'type': 'boolean',
        'name': 'unscanned_closed',
        'default': 1,
        'mandatory': 1,
        'visible_for_client': True,
        'description': '',
    },
    'unscanned_closed_udp': {
        'type': 'boolean',
        'name': 'unscanned_closed_udp',
        'default': 1,
        'mandatory': 1,
        'visible_for_client': True,
        'description': '',
    },
    'expand_vhosts': {
        'type': 'boolean',
        'name': 'expand_vhosts',
        'default': 1,
        'mandatory': 0,
        'visible_for_client': True,
        'description': (
            'Whether to expand the target hosts '
            'list of vhosts with values gathered from sources '
            'such as reverse-lookup queries and VT checks '
            'for SSL/TLS certificates.'
        ),
    },
    'test_empty_vhost': {
        'type': 'boolean',
        'name': 'test_empty_vhost',
        'default': 0,
        'mandatory': 0,
        'visible_for_client': True,
        'description': (
            'If  set  to  yes, the scanner will '
            'also test the target by using empty vhost value '
            'in addition to the targets associated vhost values.'
        ),
    },
    'max_hosts': {
        'type': 'integer',
        'name': 'max_hosts',
        'default': 30,
        'mandatory': 0,
        'visible_for_client': False,
        'description': (
            'The maximum number of hosts to test at the same time which '
            'should be given to the client (which can override it). '
            'This value must be computed given your bandwidth, '
            'the number of hosts you want to test, your amount of '
            'memory and the performance of your processor(s).'
        ),
    },
    'max_checks': {
        'type': 'integer',
        'name': 'max_checks',
        'default': 10,
        'mandatory': 0,
        'visible_for_client': False,
        'description': (
            'The number of plugins that will run against each host being '
            'tested. Note that the total number of process will be max '
            'checks x max_hosts so you need to find a balance between '
            'these two options. Note that launching too many plugins at '
            'the same time may disable the remote host, either temporarily '
            '(ie: inetd closes its ports) or definitely (the remote host '
            'crash because it is asked to do too many things at the '
            'same time), so be careful.'
        ),
    },
    'port_range': {
        'type': 'string',
        'name': 'port_range',
        'default': '',
        'mandatory': 0,
        'visible_for_client': False,
        'description': (
            'This is the default range of ports that the scanner plugins will '
            'probe. The syntax of this option is flexible, it can be a '
            'single range ("1-1500"), several ports ("21,23,80"), several '
            'ranges of ports ("1-1500,32000-33000"). Note that you can '
            'specify UDP and TCP ports by prefixing each range by T or U. '
            'For instance, the following range will make openvas scan UDP '
            'ports 1 to 1024 and TCP ports 1 to 65535 : '
            '"T:1-65535,U:1-1024".'
        ),
    },
    'test_alive_hosts_only': {
        'type': 'boolean',
        'name': 'test_alive_hosts_only',
        'default': 0,
        'mandatory': 0,
        'visible_for_client': False,
        'description': (
            'If this option is set, openvas will scan the target list for '
            'alive hosts in a separate process while only testing those '
            'hosts which are identified as alive. This boosts the scan '
            'speed of target ranges with a high amount of dead hosts '
            'significantly.'
        ),
    },
    'source_iface': {
        'type': 'string',
        'name': 'source_iface',
        'default': '',
        'mandatory': 0,
        'visible_for_client': False,
        'description': (
            'Name of the network interface that will be used as the source '
            'of connections established by openvas. The scan won\'t be '
            'launched if the value isn\'t authorized according to '
            '(sys_)ifaces_allow / (sys_)ifaces_deny if present.'
        ),
    },
    'ifaces_allow': {
        'type': 'string',
        'name': 'ifaces_allow',
        'default': '',
        'mandatory': 0,
        'visible_for_client': False,
        'description': (
            'Comma-separated list of interfaces names that are authorized '
            'as source_iface values.'
        ),
    },
    'ifaces_deny': {
        'type': 'string',
        'name': 'ifaces_deny',
        'default': '',
        'mandatory': 0,
        'visible_for_client': False,
        'description': (
            'Comma-separated list of interfaces names that are not '
            'authorized as source_iface values.'
        ),
    },
    'hosts_allow': {
        'type': 'string',
        'name': 'hosts_allow',
        'default': '',
        'mandatory': 0,
        'visible_for_client': False,
        'description': (
            'Comma-separated list of the only targets that are authorized '
            'to be scanned. Supports the same syntax as the list targets. '
            'Both target hostnames and the address to which they resolve '
            'are checked. Hostnames in hosts_allow list are not resolved '
            'however.'
        ),
    },
    'hosts_deny': {
        'type': 'string',
        'name': 'hosts_deny',
        'default': '',
        'mandatory': 0,
        'visible_for_client': False,
        'description': (
            'Comma-separated list of targets that are not authorized to '
            'be scanned. Supports the same syntax as the list targets. '
            'Both target hostnames and the address to which they resolve '
            'are checked. Hostnames in hosts_deny list are not '
            'resolved however.'
        ),
    },
    'table_driven_lsc': {
        'type': 'boolean',
        'name': 'table_driven_lsc',
        'default': 0,
        'mandatory': 0,
        'visible_for_client': False,
        'description': (
            'If this options is set to "yes", openvas enables the local '
            'security checks via the table-driven Notus scanner, perfoming '
            'the Notus metadata checksum check which allows the metadata '
            'upload into redis.'
        ),
    },
}
340
341
342
class TestOspdOpenvas(TestCase):
343
    @patch('ospd_openvas.daemon.Openvas')
    def test_set_params_from_openvas_settings(self, mock_openvas: Openvas):
        """Verify the daemon parameters built from the openvas settings.

        The patched ``Openvas`` returns a fixed settings dict. Afterwards
        ``OSPD_PARAMS`` must equal ``OSPD_PARAMS_OUT`` and ``plugins_folder``
        must be readable from ``scan_only_params``.
        """
        fake_settings = {
            'non_simult_ports': '139, 445, 3389, Services/irc',
            'plugins_folder': '/foo/bar',
        }
        mock_openvas.get_settings.return_value = fake_settings

        daemon = DummyDaemon()
        daemon.set_params_from_openvas_settings()

        # get_settings must have been called exactly once in total.
        self.assertEqual(mock_openvas.get_settings.call_count, 1)
        self.assertEqual(OSPD_PARAMS, OSPD_PARAMS_OUT)
        self.assertEqual(
            daemon.scan_only_params.get('plugins_folder'), '/foo/bar'
        )
355
356
    @patch('ospd_openvas.daemon.Openvas')
    def test_sudo_available(self, mock_openvas):
        """Check ``sudo_available`` is truthy when ``check_sudo`` gives True."""
        mock_openvas.check_sudo.return_value = True
        daemon = DummyDaemon()

        # Clear the private flag, then read the attribute once before
        # asserting on it (presumably to exercise the lazy lookup; the
        # implementation is not visible from this test).
        daemon._sudo_available = None  # pylint: disable=protected-access
        daemon.sudo_available  # pylint: disable=pointless-statement

        self.assertTrue(daemon.sudo_available)
365
366
    def test_get_custom_xml(self):
        """Check the custom-section XML generated for a known VT.

        Only the lengths of the generated and expected strings are compared,
        not their content.
        """
        vt_oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = (
            '<custom>'
            '<required_ports>Services/www, 80</required_ports>'
            '<category>3</category>'
            '<excluded_keys>Settings/disable_cgi_scanning</excluded_keys>'
            '<family>Product detection</family>'
            '<filename>mantis_detect.nasl</filename>'
            '<timeout>0</timeout>'
            '</custom>'
        )
        daemon = DummyDaemon()

        custom = daemon.VTS[vt_oid].get('custom')
        actual = daemon.get_custom_vt_as_xml_str(vt_oid, custom)

        self.assertEqual(len(actual), len(expected))
384
385
    def test_get_custom_xml_failed(self):
        """Expect exactly one warning for custom data containing U+0006.

        ``logging.Logger.warning`` is patched only for the duration of the
        call, so the replacement does not leak into other tests.
        """
        w = DummyDaemon()
        custom = {'a': u"\u0006"}

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_custom_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', custom=custom
            )

        assert_called_once(mock_warning)
395
396
    def test_get_severities_xml(self):
        """Check the severities XML generated for a known VT."""
        vt_oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()

        expected = (
            '<severities>'
            '<severity type="cvss_base_v2">'
            '<value>AV:N/AC:L/Au:N/C:N/I:N/A:N</value>'
            '<origin>Greenbone</origin>'
            '<date>1237458156</date>'
            '</severity>'
            '</severities>'
        )
        severities = daemon.VTS[vt_oid].get('severities')
        actual = daemon.get_severities_vt_as_xml_str(vt_oid, severities)

        self.assertEqual(actual, expected)
415
416
    def test_get_severities_xml_failed(self):
        """Expect exactly one warning for a severity vector containing U+0006.

        ``logging.Logger.warning`` is patched only for the duration of the
        call, so the replacement does not leak into other tests.
        """
        w = DummyDaemon()
        sever = {'severity_base_vector': u"\u0006"}

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_severities_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', severities=sever
            )

        assert_called_once(mock_warning)
426
427
    def test_get_params_xml(self):
        """Check the params XML generated for a known VT.

        Only the lengths of the generated and expected strings are compared,
        not their content.
        """
        vt_oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()
        expected = (
            '<params>'
            '<param type="checkbox" id="2">'
            '<name>Do not randomize the  order  in  which ports are '
            'scanned</name>'
            '<default>no</default>'
            '</param>'
            '<param type="entry" id="1">'
            '<name>Data length :</name>'
            '</param>'
            '</params>'
        )

        vt_params = daemon.VTS[vt_oid].get('vt_params')
        actual = daemon.get_params_vt_as_xml_str(vt_oid, vt_params)

        self.assertEqual(len(actual), len(expected))
447
448
    def test_get_params_xml_failed(self):
        """Expect exactly one warning for a param default containing U+0006.

        ``logging.Logger.warning`` is patched only for the duration of the
        call, so the replacement does not leak into other tests.
        """
        w = DummyDaemon()
        params = {
            '1': {
                'id': '1',
                'type': 'entry',
                'default': u'\u0006',
                'name': 'dns-fuzz.timelimit',
                'description': 'Description',
            }
        }

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_params_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', params)

        assert_called_once(mock_warning)
464
465
    def test_get_refs_xml(self):
        """Check the references XML generated for a known VT."""
        vt_oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()

        expected = '<refs><ref type="url" id="http://www.mantisbt.org/"/></refs>'
        vt_refs = daemon.VTS[vt_oid].get('vt_refs')
        actual = daemon.get_refs_vt_as_xml_str(vt_oid, vt_refs)

        self.assertEqual(actual, expected)
474
475
    def test_get_dependencies_xml(self):
        """Check the dependencies XML generated for two OID dependencies."""
        daemon = DummyDaemon()

        expected = (
            '<dependencies>'
            '<dependency vt_id="1.3.6.1.4.1.25623.1.2.3.4"/>'
            '<dependency vt_id="1.3.6.1.4.1.25623.4.3.2.1"/>'
            '</dependencies>'
        )
        dependencies = [
            '1.3.6.1.4.1.25623.1.2.3.4',
            '1.3.6.1.4.1.25623.4.3.2.1',
        ]
        actual = daemon.get_dependencies_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', dependencies
        )

        self.assertEqual(actual, expected)
490
491
    def test_get_dependencies_xml_missing_dep(self):
        """Check that a file-name dependency is left out of the XML.

        The input holds one OID and one ``.nasl`` file name; only the OID is
        expected in the generated output.
        """
        daemon = DummyDaemon()

        expected = (
            '<dependencies>'
            '<dependency vt_id="1.3.6.1.4.1.25623.1.2.3.4"/>'
            '</dependencies>'
        )
        dependencies = ['1.3.6.1.4.1.25623.1.2.3.4', 'file_name.nasl']
        actual = daemon.get_dependencies_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', dependencies
        )

        self.assertEqual(actual, expected)
505
506
    def test_get_dependencies_xml_failed(self):
        """Expect exactly one error log for a dependency containing U+0006.

        ``logging.Logger.error`` is patched only for the duration of the
        call, so the replacement does not leak into other tests.
        """
        w = DummyDaemon()
        dep = [u"\u0006"]

        with patch.object(logging.Logger, 'error') as mock_error:
            w.get_dependencies_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', vt_dependencies=dep
            )

        assert_called_once(mock_error)
516
517
    def test_get_ctime_xml(self):
        """Check the creation-time XML generated for a known VT."""
        vt_oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()

        expected = '<creation_time>1237458156</creation_time>'
        creation_time = daemon.VTS[vt_oid].get('creation_time')
        actual = daemon.get_creation_time_vt_as_xml_str(vt_oid, creation_time)

        self.assertEqual(actual, expected)
528
529
    def test_get_ctime_xml_failed(self):
        """Expect exactly one warning for a creation time containing U+0006.

        ``logging.Logger.warning`` is patched only for the duration of the
        call, so the replacement does not leak into other tests.
        """
        w = DummyDaemon()
        ctime = u'\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_creation_time_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', vt_creation_time=ctime
            )

        assert_called_once(mock_warning)
539
540
    def test_get_mtime_xml(self):
        """Check the modification-time XML generated for a known VT."""
        vt_oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()

        expected = '<modification_time>1533906565</modification_time>'
        modification_time = daemon.VTS[vt_oid].get('modification_time')
        actual = daemon.get_modification_time_vt_as_xml_str(
            vt_oid, modification_time
        )

        self.assertEqual(actual, expected)
551
552
    def test_get_mtime_xml_failed(self):
        """Expect exactly one warning for a modification time with U+0006.

        ``logging.Logger.warning`` is patched only for the duration of the
        call, so the replacement does not leak into other tests.
        """
        w = DummyDaemon()
        mtime = u'\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_modification_time_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', mtime
            )

        assert_called_once(mock_warning)
562
563
    def test_get_summary_xml(self):
        """Check the summary XML generated for a known VT."""
        vt_oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()

        expected = '<summary>some summary</summary>'
        summary_text = daemon.VTS[vt_oid].get('summary')
        actual = daemon.get_summary_vt_as_xml_str(vt_oid, summary_text)

        self.assertEqual(actual, expected)
574
575
    def test_get_summary_xml_failed(self):
        """Expect exactly one warning for a summary containing U+0006.

        ``logging.Logger.warning`` is patched only for the duration of the
        call, so the replacement does not leak into other tests.
        """
        w = DummyDaemon()
        summary = u'\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_summary_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', summary
            )

        assert_called_once(mock_warning)
583
584
    def test_get_impact_xml(self):
        """Check the impact XML generated for a known VT."""
        vt_oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()

        expected = '<impact>some impact</impact>'
        impact_text = daemon.VTS[vt_oid].get('impact')
        actual = daemon.get_impact_vt_as_xml_str(vt_oid, impact_text)

        self.assertEqual(actual, expected)
593
594
    def test_get_impact_xml_failed(self):
        """Expect exactly one warning for an impact text containing U+0006.

        ``logging.Logger.warning`` is patched only for the duration of the
        call, so the replacement does not leak into other tests.
        """
        w = DummyDaemon()
        impact = u'\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_impact_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', impact)

        assert_called_once(mock_warning)
602
603
    def test_get_insight_xml(self):
        """Check the insight XML generated for a known VT."""
        vt_oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()

        expected = '<insight>some insight</insight>'
        insight_text = daemon.VTS[vt_oid].get('insight')
        actual = daemon.get_insight_vt_as_xml_str(vt_oid, insight_text)

        self.assertEqual(actual, expected)
614
615
    def test_get_insight_xml_failed(self):
        """Expect exactly one warning for an insight text containing U+0006.

        ``logging.Logger.warning`` is patched only for the duration of the
        call, so the replacement does not leak into other tests.
        """
        w = DummyDaemon()
        insight = u'\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_insight_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', insight
            )

        assert_called_once(mock_warning)
623
624
    def test_get_solution_xml(self):
        """Check the solution XML, including its type and method attributes."""
        vt_oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()

        expected = (
            '<solution type="WillNotFix" method="DebianAPTUpgrade">'
            'some solution'
            '</solution>'
        )
        vt_data = daemon.VTS[vt_oid]
        solution_text = vt_data.get('solution')
        sol_type = vt_data.get('solution_type')
        sol_method = vt_data.get('solution_method')

        actual = daemon.get_solution_vt_as_xml_str(
            vt_oid, solution_text, sol_type, sol_method
        )

        self.assertEqual(actual, expected)
645
646
    def test_get_solution_xml_failed(self):
        """Expect exactly one warning for a solution text containing U+0006.

        ``logging.Logger.warning`` is patched only for the duration of the
        call, so the replacement does not leak into other tests.
        """
        w = DummyDaemon()
        solution = u'\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_solution_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', solution
            )

        assert_called_once(mock_warning)
654
655
    def test_get_detection_xml(self):
        """Check the detection XML generated from the VT's ``qod_type``."""
        vt_oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()

        expected = '<detection qod_type="remote_banner"/>'
        qod_type_value = daemon.VTS[vt_oid].get('qod_type')

        actual = daemon.get_detection_vt_as_xml_str(
            vt_oid, qod_type=qod_type_value
        )

        self.assertEqual(actual, expected)
667
668
    def test_get_detection_xml_failed(self):
        """Expect exactly one warning for a detection text containing U+0006.

        ``logging.Logger.warning`` is patched only for the duration of the
        call, so the replacement does not leak into other tests.
        """
        w = DummyDaemon()
        detection = u'\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_detection_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', detection
            )

        assert_called_once(mock_warning)
676
677
    def test_get_affected_xml(self):
        """Check the affected XML generated for a known VT."""
        vt_oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()
        expected = '<affected>some affection</affected>'
        affected_text = daemon.VTS[vt_oid].get('affected')

        actual = daemon.get_affected_vt_as_xml_str(
            vt_oid, affected=affected_text
        )

        self.assertEqual(actual, expected)
688
689
    def test_get_affected_xml_failed(self):
        """Expect exactly one warning for an affected text starting with U+0006.

        ``logging.Logger.warning`` is patched only for the duration of the
        call, so the replacement does not leak into other tests.
        """
        w = DummyDaemon()
        affected = u"\u0006" + "affected"

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_affected_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', affected=affected
            )

        assert_called_once(mock_warning)
699
700
    @patch('ospd_openvas.daemon.Path.exists')
    @patch('ospd_openvas.daemon.OSPDopenvas.set_params_from_openvas_settings')
    def test_feed_is_outdated_none(
        self, mock_set_params: MagicMock, mock_path_exists: MagicMock
    ):
        """Check ``feed_is_outdated`` returns None when the path is missing.

        ``Path.exists`` is patched to report False; both patched callables
        must have been invoked exactly once by the end of the test.
        """
        daemon = DummyDaemon()
        daemon.scan_only_params['plugins_folder'] = '/foo/bar'

        # Simulate a missing file so that the check yields None.
        mock_path_exists.return_value = False

        outdated = daemon.feed_is_outdated('1234')

        self.assertIsNone(outdated)
        self.assertEqual(mock_set_params.call_count, 1)
        self.assertEqual(mock_path_exists.call_count, 1)
717
718
    @patch('ospd_openvas.daemon.Path.exists')
    @patch('ospd_openvas.daemon.Path.open')
    def test_feed_is_outdated_true(
        self,
        mock_path_open: MagicMock,
        mock_path_exists: MagicMock,
    ):
        """Check the feed is reported outdated when ``PLUGIN_SET`` differs.

        The patched ``Path.open`` yields content declaring plugin set "1235"
        while the daemon is asked about feed version "1234".
        """
        mock_path_exists.return_value = True

        # Object returned by the patched Path.open(); entering it as a
        # context manager yields an in-memory file with the declaration.
        feed_file = io.StringIO('PLUGIN_SET = "1235";')
        open_ctx = MagicMock(name='Path open context manager')
        open_ctx.__enter__ = MagicMock(return_value=feed_file)
        mock_path_open.return_value = open_ctx

        daemon = DummyDaemon()
        daemon.scan_only_params['plugins_folder'] = '/foo/bar'

        outdated = daemon.feed_is_outdated('1234')

        self.assertTrue(outdated)
        self.assertEqual(mock_path_exists.call_count, 1)
        self.assertEqual(mock_path_open.call_count, 1)
742
743
    @patch('ospd_openvas.daemon.Path.exists')
    @patch('ospd_openvas.daemon.Path.open')
    def test_feed_is_outdated_false(
        self,
        mock_path_open: MagicMock,
        mock_path_exists: MagicMock,
    ):
        """Check the feed is reported current when ``PLUGIN_SET`` matches.

        The patched ``Path.open`` yields content declaring plugin set "1234",
        the same version the daemon is asked about.
        """
        # Set once; the original assigned this same value twice.
        mock_path_exists.return_value = True

        read_data = 'PLUGIN_SET = "1234"'
        # Object returned by the patched Path.open(); entering it as a
        # context manager yields an in-memory file with the declaration.
        mock_read = MagicMock(name='Path open context manager')
        mock_read.__enter__ = MagicMock(return_value=io.StringIO(read_data))
        mock_path_open.return_value = mock_read

        w = DummyDaemon()
        w.scan_only_params['plugins_folder'] = '/foo/bar'

        ret = w.feed_is_outdated('1234')
        self.assertFalse(ret)

        self.assertEqual(mock_path_exists.call_count, 1)
        self.assertEqual(mock_path_open.call_count, 1)
766
767
    def test_check_feed_cache_unavailable(self):
        """Check ``check_feed`` is falsy and skips the outdated check.

        With ``vts.is_cache_available`` set to False, ``feed_is_outdated``
        must not be called at all.
        """
        daemon = DummyDaemon()
        daemon.vts.is_cache_available = False
        daemon.feed_is_outdated = Mock()

        self.assertFalse(daemon.check_feed())
        daemon.feed_is_outdated.assert_not_called()
775
776 View Code Duplication
    @patch('ospd_openvas.daemon.BaseDB')
    @patch('ospd_openvas.daemon.ResultList.add_scan_log_to_list')
    def test_get_openvas_result(self, mock_add_scan_log_to_list, MockDBClass):
        """Check that a ``LOG`` result line is forwarded as a scan log entry."""
        scan_id = '123-456'
        daemon = DummyDaemon()

        xml_target = daemon.create_xml_target()
        scan_targets = OspRequest.process_target_element(xml_target)
        daemon.create_scan(scan_id, scan_targets, None, [])

        # Single raw result line handed out by the mocked database.
        MockDBClass.get_result.return_value = [
            "LOG|||192.168.0.1|||localhost|||general/Host_Details||||||Host dead",
        ]
        mock_add_scan_log_to_list.return_value = None

        daemon.report_openvas_results(MockDBClass, scan_id)

        expected_kwargs = {
            'host': '192.168.0.1',
            'hostname': 'localhost',
            'name': '',
            'port': 'general/Host_Details',
            'qod': '',
            'test_id': '',
            'uri': '',
            'value': 'Host dead',
        }
        mock_add_scan_log_to_list.assert_called_with(**expected_kwargs)
802
803 View Code Duplication
    @patch('ospd_openvas.daemon.BaseDB')
    @patch('ospd_openvas.daemon.ResultList.add_scan_error_to_list')
    def test_get_openvas_result_host_deny(
        self, mock_add_scan_error_to_list, MockDBClass
    ):
        """Check that an ``ERRMSG`` result line is forwarded as a scan error."""
        scan_id = '123-456'
        daemon = DummyDaemon()

        xml_target = daemon.create_xml_target()
        scan_targets = OspRequest.process_target_element(xml_target)
        daemon.create_scan(scan_id, scan_targets, None, [])

        # Single raw result line handed out by the mocked database.
        MockDBClass.get_result.return_value = [
            "ERRMSG|||127.0.0.1|||localhost|||||||||Host access denied.",
        ]
        mock_add_scan_error_to_list.return_value = None

        daemon.report_openvas_results(MockDBClass, scan_id)

        expected_kwargs = {
            'host': '127.0.0.1',
            'hostname': 'localhost',
            'name': '',
            'port': '',
            'test_id': '',
            'uri': '',
            'value': 'Host access denied.',
        }
        mock_add_scan_error_to_list.assert_called_with(**expected_kwargs)
830
831
    @patch('ospd_openvas.daemon.BaseDB')
    def test_get_openvas_result_dead_hosts(self, MockDBClass):
        """Check that a ``DEADHOST`` result sets the dead-host count to 4."""
        scan_id = '123-456'
        daemon = DummyDaemon()
        xml_target = daemon.create_xml_target()
        scan_targets = OspRequest.process_target_element(xml_target)
        daemon.create_scan(scan_id, scan_targets, None, [])

        # Single raw result line handed out by the mocked database.
        MockDBClass.get_result.return_value = [
            "DEADHOST||| ||| ||| ||| |||4",
        ]
        daemon.scan_collection.set_amount_dead_hosts = MagicMock()

        daemon.report_openvas_results(MockDBClass, scan_id)

        daemon.scan_collection.set_amount_dead_hosts.assert_called_with(
            scan_id,
            total_dead=4,
        )
849
850
    @patch('ospd_openvas.daemon.BaseDB')
    @patch('ospd_openvas.daemon.ResultList.add_scan_log_to_list')
    def test_get_openvas_result_host_start(
        self, mock_add_scan_log_to_list, MockDBClass
    ):
        """Check that a ``HOST_START`` result is forwarded as a scan log."""
        scan_id = '123-456'
        daemon = DummyDaemon()
        xml_target = daemon.create_xml_target()
        scan_targets = OspRequest.process_target_element(xml_target)
        daemon.create_scan(scan_id, scan_targets, None, [])

        # Single raw result line handed out by the mocked database.
        MockDBClass.get_result.return_value = [
            "HOST_START|||192.168.10.124||| ||| ||||||today 1",
        ]
        mock_add_scan_log_to_list.return_value = None

        daemon.report_openvas_results(MockDBClass, scan_id)

        mock_add_scan_log_to_list.assert_called_with(
            host='192.168.10.124',
            name='HOST_START',
            value='today 1',
        )
874
875
    @patch('ospd_openvas.daemon.BaseDB')
    def test_get_openvas_result_hosts_count(self, MockDBClass):
        """A HOSTS_COUNT result must update the scan's total host count.

        The mocked kb returns one ``HOSTS_COUNT`` entry with value ``4``; the
        test checks that ``set_scan_total_hosts`` is called with the scan id
        and the integer ``4``.
        """
        daemon = DummyDaemon()
        scan_targets = OspRequest.process_target_element(
            daemon.create_xml_target()
        )
        daemon.create_scan('123-456', scan_targets, None, [])

        MockDBClass.get_result.return_value = ["HOSTS_COUNT||| ||| ||| ||| |||4"]
        # Swap the daemon's setter for a mock so the call can be inspected.
        total_hosts_setter = MagicMock()
        daemon.set_scan_total_hosts = total_hosts_setter

        daemon.report_openvas_results(MockDBClass, '123-456')

        total_hosts_setter.assert_called_with('123-456', 4)
893
894
    @patch('ospd_openvas.daemon.BaseDB')
    @patch('ospd_openvas.daemon.ResultList.add_scan_alarm_to_list')
    def test_result_without_vt_oid(
        self, mock_add_scan_alarm_to_list, MockDBClass
    ):
        """An ALARM result with blank fields must log exactly one warning.

        The mocked kb returns an ``ALARM`` entry whose host, hostname, port
        and OID fields are blank (the test name indicates the missing VT OID
        is the case of interest), followed by ``None``. The test checks that
        ``logging.Logger.warning`` is called exactly once while the results
        are reported.
        """
        w = DummyDaemon()

        # Patch Logger.warning only for the duration of this block. Assigning
        # a Mock directly to logging.Logger.warning would never be undone and
        # would leak the mock into every test that runs afterwards.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            target_element = w.create_xml_target()
            targets = OspRequest.process_target_element(target_element)
            w.create_scan('123-456', targets, None, [])
            w.scan_collection.scans_table['123-456']['results'] = []
            results = ["ALARM||| ||| ||| ||| |||some alarm|||path", None]
            MockDBClass.get_result.return_value = results
            mock_add_scan_alarm_to_list.return_value = None

            w.report_openvas_results(MockDBClass, '123-456')

            assert_called_once(mock_warning)
913
914
    @patch('ospd_openvas.db.KbDB')
    def test_openvas_is_alive_already_stopped(self, mock_db):
        """The liveness check must be truthy when the kb says scan stopped.

        ``scan_is_stopped`` on the mocked kb returns ``True``; the test
        asserts that ``is_openvas_process_alive`` then yields a truthy value.
        """
        daemon = DummyDaemon()
        mock_db.scan_is_stopped.return_value = True

        self.assertTrue(
            daemon.is_openvas_process_alive(mock_db, '1234', 'a1-b2-c3-d4')
        )
922
923
    @patch('psutil.Process')
    @patch('ospd_openvas.db.KbDB')
    def test_openvas_is_alive_still(self, mock_db, mock_psutil):
        """The liveness check must be falsy when psutil.Process raises.

        ``psutil.Process`` is patched to raise ``TypeError`` and the mocked
        kb reports that the scan is not stopped; the test asserts that
        ``is_openvas_process_alive`` then yields a falsy value.
        """
        daemon = DummyDaemon()
        mock_db.scan_is_stopped.return_value = False
        mock_psutil.side_effect = TypeError

        self.assertFalse(
            daemon.is_openvas_process_alive(mock_db, '1234', 'a1-b2-c3-d3')
        )
932
933
    @patch('ospd_openvas.daemon.OSPDaemon.set_scan_progress_batch')
    @patch('ospd_openvas.daemon.OSPDaemon.sort_host_finished')
    @patch('ospd_openvas.db.KbDB')
    def test_report_openvas_scan_status(
        self, mock_db, mock_sort_host_finished, mock_set_scan_progress_batch
    ):
        """Scan status entries must be turned into progress and finished hosts.

        The mocked kb returns four ``host/launched/total`` style entries. The
        test checks the ``host_progress`` mapping passed to
        ``set_scan_progress_batch`` and the host list passed to
        ``sort_host_finished``.
        """
        daemon = DummyDaemon()

        mock_sort_host_finished.return_value = None
        mock_set_scan_progress_batch.return_value = None
        status_entries = [
            '192.168.0.1/15/1000',
            '192.168.0.2/15/0',
            '192.168.0.3/15/-1',
            '192.168.0.4/1500/1500',
        ]
        mock_db.get_scan_status.return_value = status_entries

        scan_targets = OspRequest.process_target_element(
            daemon.create_xml_target()
        )
        daemon.create_scan('123-456', scan_targets, None, [])

        daemon.report_openvas_scan_status(mock_db, '123-456')

        # 192.168.0.2 is absent from the expected progress mapping.
        expected_progress = {
            '192.168.0.1': 1,
            '192.168.0.3': -1,
            '192.168.0.4': 100,
        }
        mock_set_scan_progress_batch.assert_called_with(
            '123-456', host_progress=expected_progress
        )

        expected_finished = ['192.168.0.3', '192.168.0.4']
        mock_sort_host_finished.assert_called_with('123-456', expected_finished)
968
969
970
class TestFilters(TestCase):
    """Tests for OpenVasVtsFilter and the daemon's severity score lookup."""

    def test_format_vt_modification_time(self):
        """A seconds-since-epoch string is formatted as ``YYYYmmddHHMMSS``.

        The expected value matches the UTC rendering of 1517443741
        (2018-02-01 00:09:01).
        """
        ovformat = OpenVasVtsFilter(None)
        td = '1517443741'
        formatted = ovformat.format_vt_modification_time(td)
        self.assertEqual(formatted, "20180201000901")

    def test_get_filtered_vts_false(self):
        """A VT must be excluded when ``modification_time<10`` is requested."""
        w = DummyDaemon()
        vts_collection = ['1234', '1.3.6.1.4.1.25623.1.0.100061']

        ovfilter = OpenVasVtsFilter(w.nvti)
        res = ovfilter.get_filtered_vts_list(
            vts_collection, "modification_time<10"
        )
        self.assertNotIn('1.3.6.1.4.1.25623.1.0.100061', res)

    def test_get_filtered_vts_true(self):
        """A VT must be included when ``modification_time>10`` is requested."""
        w = DummyDaemon()
        vts_collection = ['1234', '1.3.6.1.4.1.25623.1.0.100061']

        ovfilter = OpenVasVtsFilter(w.nvti)
        res = ovfilter.get_filtered_vts_list(
            vts_collection, "modification_time>10"
        )
        self.assertIn('1.3.6.1.4.1.25623.1.0.100061', res)

    def test_get_severity_score_v2(self):
        """A ``cvss_base_v2`` vector must yield the expected score of 5.0."""
        w = DummyDaemon()
        vtaux = {
            'severities': {
                'severity_type': 'cvss_base_v2',
                'severity_base_vector': 'AV:N/AC:L/Au:N/C:P/I:N/A:N',
            }
        }

        # The score is computed once, directly inside the assertion; the
        # earlier unused duplicate call has been dropped.
        self.assertEqual(w.get_severity_score(vtaux), 5.0)

    def test_get_severity_score_v3(self):
        """A ``cvss_base_v3`` vector must yield the expected score of 2.9."""
        w = DummyDaemon()
        vtaux = {
            'severities': {
                'severity_type': 'cvss_base_v3',
                'severity_base_vector': 'CVSS:3.0/AV:L/AC:H/PR:H/UI:R/S:U/C:N/I:L/A:L',
            }
        }

        self.assertEqual(w.get_severity_score(vtaux), 2.9)
        self.assertEqual(w.get_severity_score(vtaux), 2.9)
1019