Issues (23)

tests/test_daemon.py (2 issues)

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
# pylint: disable=invalid-name,line-too-long,no-value-for-parameter
21
22
""" Unit Test for ospd-openvas """
23
24
import io
25
import logging
26
27
from unittest import TestCase
28
from unittest.mock import patch, Mock, MagicMock
29
30
from ospd.protocol import OspRequest
31
32
from tests.dummydaemon import DummyDaemon
33
from tests.helper import assert_called_once
34
35
from ospd_openvas.daemon import (
36
    OSPD_PARAMS,
37
    OpenVasVtsFilter,
38
)
39
from ospd_openvas.openvas import Openvas
40
41
# Expected content of OSPD_PARAMS once the (mocked) openvas settings have been
# applied; compared against the real OSPD_PARAMS in
# TestOspdOpenvas.test_set_params_from_openvas_settings.
OSPD_PARAMS_OUT = {
    'auto_enable_dependencies': {
        'type': 'boolean',
        'name': 'auto_enable_dependencies',
        'default': 1,
        'mandatory': 1,
        'visible_for_client': True,
        'description': 'Automatically enable the plugins that are depended on',
    },
    'cgi_path': {
        'type': 'string',
        'name': 'cgi_path',
        'default': '/cgi-bin:/scripts',
        'mandatory': 1,
        'visible_for_client': True,
        'description': 'Look for default CGIs in /cgi-bin and /scripts',
    },
    'checks_read_timeout': {
        'type': 'integer',
        'name': 'checks_read_timeout',
        'default': 5,
        'mandatory': 1,
        'visible_for_client': True,
        'description': (
            'Number  of seconds that the security checks will '
            'wait for when doing a recv()'
        ),
    },
    'non_simult_ports': {
        'type': 'string',
        'name': 'non_simult_ports',
        'default': '139, 445, 3389, Services/irc',
        'mandatory': 1,
        'visible_for_client': True,
        'description': (
            'Prevent to make two connections on the same given '
            'ports at the same time.'
        ),
    },
    'open_sock_max_attempts': {
        'type': 'integer',
        'name': 'open_sock_max_attempts',
        'default': 5,
        'mandatory': 0,
        'visible_for_client': True,
        'description': (
            'Number of unsuccessful retries to open the socket '
            'before to set the port as closed.'
        ),
    },
    'timeout_retry': {
        'type': 'integer',
        'name': 'timeout_retry',
        'default': 5,
        'mandatory': 0,
        'visible_for_client': True,
        'description': (
            'Number of retries when a socket connection attempt '
            'timesout.'
        ),
    },
    'optimize_test': {
        'type': 'boolean',
        'name': 'optimize_test',
        'default': 1,
        'mandatory': 0,
        'visible_for_client': True,
        'description': (
            'By default, optimize_test is enabled which means openvas does '
            'trust the remote host banners and is only launching plugins '
            'against the services they have been designed to check. '
            'For example it will check a web server claiming to be IIS only '
            'for IIS related flaws but will skip plugins testing for Apache '
            'flaws, and so on. This default behavior is used to optimize '
            'the scanning performance and to avoid false positives. '
            'If you are not sure that the banners of the remote host '
            'have been tampered with, you can disable this option.'
        ),
    },
    'plugins_timeout': {
        'type': 'integer',
        'name': 'plugins_timeout',
        'default': 5,
        'mandatory': 0,
        'visible_for_client': True,
        'description': 'This is the maximum lifetime, in seconds of a plugin.',
    },
    'report_host_details': {
        'type': 'boolean',
        'name': 'report_host_details',
        'default': 1,
        'mandatory': 1,
        'visible_for_client': True,
        'description': '',
    },
    'safe_checks': {
        'type': 'boolean',
        'name': 'safe_checks',
        'default': 1,
        'mandatory': 1,
        'visible_for_client': True,
        'description': (
            'Disable the plugins with potential to crash '
            'the remote services'
        ),
    },
    'scanner_plugins_timeout': {
        'type': 'integer',
        'name': 'scanner_plugins_timeout',
        'default': 36000,
        'mandatory': 1,
        'visible_for_client': True,
        'description': 'Like plugins_timeout, but for ACT_SCANNER plugins.',
    },
    'time_between_request': {
        'type': 'integer',
        'name': 'time_between_request',
        'default': 0,
        'mandatory': 0,
        'visible_for_client': True,
        'description': (
            'Allow to set a wait time between two actions '
            '(open, send, close).'
        ),
    },
    'unscanned_closed': {
        'type': 'boolean',
        'name': 'unscanned_closed',
        'default': 1,
        'mandatory': 1,
        'visible_for_client': True,
        'description': '',
    },
    'unscanned_closed_udp': {
        'type': 'boolean',
        'name': 'unscanned_closed_udp',
        'default': 1,
        'mandatory': 1,
        'visible_for_client': True,
        'description': '',
    },
    'expand_vhosts': {
        'type': 'boolean',
        'name': 'expand_vhosts',
        'default': 1,
        'mandatory': 0,
        'visible_for_client': True,
        'description': (
            'Whether to expand the target hosts '
            'list of vhosts with values gathered from sources '
            'such as reverse-lookup queries and VT checks '
            'for SSL/TLS certificates.'
        ),
    },
    'test_empty_vhost': {
        'type': 'boolean',
        'name': 'test_empty_vhost',
        'default': 0,
        'mandatory': 0,
        'visible_for_client': True,
        'description': (
            'If  set  to  yes, the scanner will '
            'also test the target by using empty vhost value '
            'in addition to the targets associated vhost values.'
        ),
    },
    'max_hosts': {
        'type': 'integer',
        'name': 'max_hosts',
        'default': 30,
        'mandatory': 0,
        'visible_for_client': False,
        'description': (
            'The maximum number of hosts to test at the same time which '
            'should be given to the client (which can override it). '
            'This value must be computed given your bandwidth, '
            'the number of hosts you want to test, your amount of '
            'memory and the performance of your processor(s).'
        ),
    },
    'max_checks': {
        'type': 'integer',
        'name': 'max_checks',
        'default': 10,
        'mandatory': 0,
        'visible_for_client': False,
        'description': (
            'The number of plugins that will run against each host being '
            'tested. Note that the total number of process will be max '
            'checks x max_hosts so you need to find a balance between '
            'these two options. Note that launching too many plugins at '
            'the same time may disable the remote host, either temporarily '
            '(ie: inetd closes its ports) or definitely (the remote host '
            'crash because it is asked to do too many things at the '
            'same time), so be careful.'
        ),
    },
    'port_range': {
        'type': 'string',
        'name': 'port_range',
        'default': '',
        'mandatory': 0,
        'visible_for_client': False,
        'description': (
            'This is the default range of ports that the scanner plugins will '
            'probe. The syntax of this option is flexible, it can be a '
            'single range ("1-1500"), several ports ("21,23,80"), several '
            'ranges of ports ("1-1500,32000-33000"). Note that you can '
            'specify UDP and TCP ports by prefixing each range by T or U. '
            'For instance, the following range will make openvas scan UDP '
            'ports 1 to 1024 and TCP ports 1 to 65535 : '
            '"T:1-65535,U:1-1024".'
        ),
    },
    'test_alive_hosts_only': {
        'type': 'boolean',
        'name': 'test_alive_hosts_only',
        'default': 0,
        'mandatory': 0,
        'visible_for_client': False,
        'description': (
            'If this option is set, openvas will scan the target list for '
            'alive hosts in a separate process while only testing those '
            'hosts which are identified as alive. This boosts the scan '
            'speed of target ranges with a high amount of dead hosts '
            'significantly.'
        ),
    },
    'hosts_allow': {
        'type': 'string',
        'name': 'hosts_allow',
        'default': '',
        'mandatory': 0,
        'visible_for_client': False,
        'description': (
            'Comma-separated list of the only targets that are authorized '
            'to be scanned. Supports the same syntax as the list targets. '
            'Both target hostnames and the address to which they resolve '
            'are checked. Hostnames in hosts_allow list are not resolved '
            'however.'
        ),
    },
    'hosts_deny': {
        'type': 'string',
        'name': 'hosts_deny',
        'default': '',
        'mandatory': 0,
        'visible_for_client': False,
        'description': (
            'Comma-separated list of targets that are not authorized to '
            'be scanned. Supports the same syntax as the list targets. '
            'Both target hostnames and the address to which they resolve '
            'are checked. Hostnames in hosts_deny list are not '
            'resolved however.'
        ),
    },
    'results_per_host': {
        'type': 'integer',
        'name': 'results_per_host',
        'default': 10,
        'mandatory': 0,
        'visible_for_client': True,
        'description': (
            'Amount of fake results generated per each host in the target '
            'list for a dry run scan.'
        ),
    },
}
304
305
306
class TestOspdOpenvas(TestCase):
307
    @patch('ospd_openvas.daemon.Openvas')
    def test_set_params_from_openvas_settings(self, mock_openvas: Openvas):
        """Check the params derived from the (mocked) openvas settings."""
        openvas_settings = {
            'non_simult_ports': '139, 445, 3389, Services/irc',
            'plugins_folder': '/foo/bar',
        }
        mock_openvas.get_settings.return_value = openvas_settings

        daemon = DummyDaemon()
        daemon.set_params_from_openvas_settings()

        # The settings must have been queried exactly once.
        self.assertEqual(mock_openvas.get_settings.call_count, 1)
        self.assertEqual(OSPD_PARAMS, OSPD_PARAMS_OUT)

        plugins_folder = daemon.scan_only_params.get('plugins_folder')
        self.assertEqual(plugins_folder, '/foo/bar')
319
320
    @patch('ospd_openvas.daemon.Openvas')
    def test_sudo_available(self, mock_openvas):
        """sudo_available is truthy when the mocked check_sudo returns True."""
        mock_openvas.check_sudo.return_value = True

        daemon = DummyDaemon()
        # Reset the private state read by the property under test.
        # pylint: disable=protected-access
        daemon._sudo_available = None
        daemon._is_running_as_root = False

        self.assertTrue(daemon.sudo_available)
329
330
    def test_get_custom_xml(self):
        """The custom XML of the sample VT has the expected length."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = (
            '<custom>'
            '<required_ports>Services/www, 80</required_ports>'
            '<category>3</category>'
            '<excluded_keys>Settings/disable_cgi_scanning</excluded_keys>'
            '<family>Product detection</family>'
            '<filename>mantis_detect.nasl</filename>'
            '<timeout>0</timeout>'
            '</custom>'
        )
        daemon = DummyDaemon()

        custom = daemon.VTS[oid].get('custom')
        actual = daemon.get_custom_vt_as_xml_str(oid, custom)

        # Only the lengths are compared, not the content.
        # NOTE(review): presumably because element order is not fixed - confirm.
        self.assertEqual(len(actual), len(expected))
348
349
    def test_get_custom_xml_failed(self):
        """A control character (U+0006) in custom data logs one warning.

        ``logging.Logger.warning`` is patched only while the call runs, so the
        mock is removed again and cannot leak into other tests.
        """
        w = DummyDaemon()
        custom = {'a': u"\u0006"}

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_custom_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', custom=custom
            )

        assert_called_once(mock_warning)
359
360
    def test_get_severities_xml(self):
        """Serialise the sample VT's severities and compare the XML."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()

        expected = (
            '<severities>'
            '<severity type="cvss_base_v2">'
            '<value>AV:N/AC:L/Au:N/C:N/I:N/A:N</value>'
            '<origin>Greenbone</origin>'
            '<date>1237458156</date>'
            '</severity>'
            '</severities>'
        )
        severities = daemon.VTS[oid].get('severities')
        actual = daemon.get_severities_vt_as_xml_str(oid, severities)

        self.assertEqual(actual, expected)
379
380
    def test_get_severities_xml_failed(self):
        """A control character (U+0006) in the severities logs one warning.

        ``logging.Logger.warning`` is patched only while the call runs, so the
        mock is removed again and cannot leak into other tests.
        """
        w = DummyDaemon()
        sever = {'severity_base_vector': u"\u0006"}

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_severities_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', severities=sever
            )

        assert_called_once(mock_warning)
390
391
    def test_get_params_xml(self):
        """The params XML of the sample VT has the expected length."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()
        expected = (
            '<params>'
            '<param type="checkbox" id="2">'
            '<name>Do not randomize the  order  in  which ports are '
            'scanned</name>'
            '<default>no</default>'
            '</param>'
            '<param type="entry" id="1">'
            '<name>Data length :</name>'
            '</param>'
            '</params>'
        )

        vt_params = daemon.VTS[oid].get('vt_params')
        actual = daemon.get_params_vt_as_xml_str(oid, vt_params)

        # Only the lengths are compared, not the content.
        # NOTE(review): presumably because param order is not fixed - confirm.
        self.assertEqual(len(actual), len(expected))
411
412
    def test_get_params_xml_failed(self):
        """A control character (U+0006) as param default logs one warning.

        ``logging.Logger.warning`` is patched only while the call runs, so the
        mock is removed again and cannot leak into other tests.
        """
        w = DummyDaemon()
        params = {
            '1': {
                'id': '1',
                'type': 'entry',
                'default': u'\u0006',
                'name': 'dns-fuzz.timelimit',
                'description': 'Description',
            }
        }

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_params_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', params)

        assert_called_once(mock_warning)
428
429
    def test_get_refs_xml(self):
        """Serialise the sample VT's references and compare the XML."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()

        expected = '<refs><ref type="url" id="http://www.mantisbt.org/"/></refs>'
        vt_refs = daemon.VTS[oid].get('vt_refs')
        actual = daemon.get_refs_vt_as_xml_str(oid, vt_refs)

        self.assertEqual(actual, expected)
438
439
    def test_get_dependencies_xml(self):
        """Two OID dependencies are rendered as two <dependency> elements."""
        daemon = DummyDaemon()

        dependencies = [
            '1.3.6.1.4.1.25623.1.2.3.4',
            '1.3.6.1.4.1.25623.4.3.2.1',
        ]
        expected = (
            '<dependencies>'
            '<dependency vt_id="1.3.6.1.4.1.25623.1.2.3.4"/>'
            '<dependency vt_id="1.3.6.1.4.1.25623.4.3.2.1"/>'
            '</dependencies>'
        )
        actual = daemon.get_dependencies_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', dependencies
        )

        self.assertEqual(actual, expected)
454
455
    def test_get_dependencies_xml_missing_dep(self):
        """A file-name dependency is left out of the dependencies XML."""
        daemon = DummyDaemon()

        dependencies = ['1.3.6.1.4.1.25623.1.2.3.4', 'file_name.nasl']
        # Only the OID entry is expected in the output.
        expected = (
            '<dependencies>'
            '<dependency vt_id="1.3.6.1.4.1.25623.1.2.3.4"/>'
            '</dependencies>'
        )
        actual = daemon.get_dependencies_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', dependencies
        )

        self.assertEqual(actual, expected)
469
470
    def test_get_dependencies_xml_failed(self):
        """A control character (U+0006) as dependency logs one error.

        ``logging.Logger.error`` is patched only while the call runs, so the
        mock is removed again and cannot leak into other tests.
        """
        w = DummyDaemon()
        dep = [u"\u0006"]

        with patch.object(logging.Logger, 'error') as mock_error:
            w.get_dependencies_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', vt_dependencies=dep
            )

        assert_called_once(mock_error)
480
481
    def test_get_ctime_xml(self):
        """Serialise the sample VT's creation time and compare the XML."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()

        expected = '<creation_time>1237458156</creation_time>'
        creation_time = daemon.VTS[oid].get('creation_time')
        actual = daemon.get_creation_time_vt_as_xml_str(oid, creation_time)

        self.assertEqual(actual, expected)
492
493
    def test_get_ctime_xml_failed(self):
        """A control character (U+0006) as creation time logs one warning.

        ``logging.Logger.warning`` is patched only while the call runs, so the
        mock is removed again and cannot leak into other tests.
        """
        w = DummyDaemon()
        ctime = u'\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_creation_time_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', vt_creation_time=ctime
            )

        assert_called_once(mock_warning)
503
504
    def test_get_mtime_xml(self):
        """Serialise the sample VT's modification time and compare the XML."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()

        expected = '<modification_time>1533906565</modification_time>'
        modification_time = daemon.VTS[oid].get('modification_time')
        actual = daemon.get_modification_time_vt_as_xml_str(
            oid, modification_time
        )

        self.assertEqual(actual, expected)
515
516
    def test_get_mtime_xml_failed(self):
        """A control character (U+0006) as modification time logs one warning.

        ``logging.Logger.warning`` is patched only while the call runs, so the
        mock is removed again and cannot leak into other tests.
        """
        w = DummyDaemon()
        mtime = u'\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_modification_time_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', mtime
            )

        assert_called_once(mock_warning)
526
527
    def test_get_summary_xml(self):
        """Serialise the sample VT's summary and compare the XML."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()

        expected = '<summary>some summary</summary>'
        summary = daemon.VTS[oid].get('summary')
        actual = daemon.get_summary_vt_as_xml_str(oid, summary)

        self.assertEqual(actual, expected)
538
539
    def test_get_summary_xml_failed(self):
        """A control character (U+0006) as summary logs one warning.

        ``logging.Logger.warning`` is patched only while the call runs, so the
        mock is removed again and cannot leak into other tests.
        """
        w = DummyDaemon()
        summary = u'\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_summary_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', summary)

        assert_called_once(mock_warning)
547
548
    def test_get_impact_xml(self):
        """Serialise the sample VT's impact and compare the XML."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()

        expected = '<impact>some impact</impact>'
        impact = daemon.VTS[oid].get('impact')
        actual = daemon.get_impact_vt_as_xml_str(oid, impact)

        self.assertEqual(actual, expected)
557
558
    def test_get_impact_xml_failed(self):
        """A control character (U+0006) as impact logs one warning.

        ``logging.Logger.warning`` is patched only while the call runs, so the
        mock is removed again and cannot leak into other tests.
        """
        w = DummyDaemon()
        impact = u'\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_impact_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', impact)

        assert_called_once(mock_warning)
566
567
    def test_get_insight_xml(self):
        """Serialise the sample VT's insight and compare the XML."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()

        expected = '<insight>some insight</insight>'
        insight = daemon.VTS[oid].get('insight')
        actual = daemon.get_insight_vt_as_xml_str(oid, insight)

        self.assertEqual(actual, expected)
578
579
    def test_get_insight_xml_failed(self):
        """A control character (U+0006) as insight logs one warning.

        ``logging.Logger.warning`` is patched only while the call runs, so the
        mock is removed again and cannot leak into other tests.
        """
        w = DummyDaemon()
        insight = u'\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_insight_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', insight)

        assert_called_once(mock_warning)
587
588
    def test_get_solution_xml(self):
        """Serialise the sample VT's solution, type and method as XML."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()

        expected = (
            '<solution type="WillNotFix" method="DebianAPTUpgrade">'
            'some solution'
            '</solution>'
        )
        sample_vt = daemon.VTS[oid]

        actual = daemon.get_solution_vt_as_xml_str(
            oid,
            sample_vt.get('solution'),
            sample_vt.get('solution_type'),
            sample_vt.get('solution_method'),
        )

        self.assertEqual(actual, expected)
609
610
    def test_get_solution_xml_failed(self):
        """A control character (U+0006) as solution logs one warning.

        ``logging.Logger.warning`` is patched only while the call runs, so the
        mock is removed again and cannot leak into other tests.
        """
        w = DummyDaemon()
        solution = u'\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_solution_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', solution
            )

        assert_called_once(mock_warning)
618
619
    def test_get_detection_xml(self):
        """Serialise the sample VT's qod_type as a <detection> element."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()

        expected = '<detection qod_type="remote_banner"/>'
        qod_type = daemon.VTS[oid].get('qod_type')

        actual = daemon.get_detection_vt_as_xml_str(oid, qod_type=qod_type)

        self.assertEqual(actual, expected)
631
632
    def test_get_detection_xml_failed(self):
        """A control character (U+0006) as detection logs one warning.

        ``logging.Logger.warning`` is patched only while the call runs, so the
        mock is removed again and cannot leak into other tests.
        """
        w = DummyDaemon()
        detection = u'\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_detection_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', detection
            )

        assert_called_once(mock_warning)
640
641
    def test_get_affected_xml(self):
        """Serialise the sample VT's affected text and compare the XML."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()
        expected = '<affected>some affection</affected>'
        affected_text = daemon.VTS[oid].get('affected')

        actual = daemon.get_affected_vt_as_xml_str(oid, affected=affected_text)

        self.assertEqual(actual, expected)
652
653
    def test_get_affected_xml_failed(self):
        """A control character (U+0006) in the affected text logs one warning.

        ``logging.Logger.warning`` is patched only while the call runs, so the
        mock is removed again and cannot leak into other tests.
        """
        w = DummyDaemon()
        affected = u"\u0006" + "affected"

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_affected_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', affected=affected
            )

        assert_called_once(mock_warning)
663
664
    @patch('ospd_openvas.daemon.Path.exists')
    @patch('ospd_openvas.daemon.OSPDopenvas.set_params_from_openvas_settings')
    def test_feed_is_outdated_none(
        self, mock_set_params: MagicMock, mock_path_exists: MagicMock
    ):
        """feed_is_outdated returns None when Path.exists reports False."""
        daemon = DummyDaemon()
        daemon.scan_only_params['plugins_folder'] = '/foo/bar'

        # Path.exists is mocked to report False for the lookup.
        mock_path_exists.return_value = False

        self.assertIsNone(daemon.feed_is_outdated('1234'))

        for mocked in (mock_set_params, mock_path_exists):
            self.assertEqual(mocked.call_count, 1)
681
682
    @patch('ospd_openvas.daemon.Path.exists')
    @patch('ospd_openvas.daemon.Path.open')
    def test_feed_is_outdated_true(
        self,
        mock_path_open: MagicMock,
        mock_path_exists: MagicMock,
    ):
        """feed_is_outdated is truthy when PLUGIN_SET differs from the feed."""
        feed_info = io.StringIO('PLUGIN_SET = "1235";')

        mock_path_exists.return_value = True
        # Path.open() is used as a context manager yielding the fake file.
        context_manager = MagicMock(name='Path open context manager')
        context_manager.__enter__ = MagicMock(return_value=feed_info)
        mock_path_open.return_value = context_manager

        daemon = DummyDaemon()
        daemon.scan_only_params['plugins_folder'] = '/foo/bar'

        # The file content says "1235" while "1234" is passed in.
        self.assertTrue(daemon.feed_is_outdated('1234'))

        for mocked in (mock_path_exists, mock_path_open):
            self.assertEqual(mocked.call_count, 1)
706
707
    @patch('ospd_openvas.daemon.Path.exists')
    @patch('ospd_openvas.daemon.Path.open')
    def test_feed_is_outdated_false(
        self,
        mock_path_open: MagicMock,
        mock_path_exists: MagicMock,
    ):
        """feed_is_outdated is falsy when PLUGIN_SET equals the given feed."""
        read_data = 'PLUGIN_SET = "1234"'

        # Set once; the original assigned this return value twice.
        mock_path_exists.return_value = True
        # Path.open() is used as a context manager yielding the fake file.
        mock_read = MagicMock(name='Path open context manager')
        mock_read.__enter__ = MagicMock(return_value=io.StringIO(read_data))
        mock_path_open.return_value = mock_read

        w = DummyDaemon()
        w.scan_only_params['plugins_folder'] = '/foo/bar'

        ret = w.feed_is_outdated('1234')
        self.assertFalse(ret)

        self.assertEqual(mock_path_exists.call_count, 1)
        self.assertEqual(mock_path_open.call_count, 1)
730
731
    def test_check_feed_cache_unavailable(self):
        """No feed-outdated check happens while the VT cache is unavailable."""
        w = DummyDaemon()
        w.vts.is_cache_available = False
        w.feed_is_outdated = Mock()

        # Without exercising the daemon the assertion below can never fail,
        # so trigger the feed check before inspecting the mock.
        # NOTE(review): assumes the daemon exposes check_feed() - confirm.
        w.check_feed()

        w.feed_is_outdated.assert_not_called()
737
738 View Code Duplication
    # NOTE(review): the code-review tool flagged this test as duplicated code;
    # presumably it overlaps with test_get_openvas_result_host_deny - confirm.
    @patch('ospd_openvas.daemon.BaseDB')
    @patch('ospd_openvas.daemon.ResultList.add_scan_log_to_list')
    def test_get_openvas_result(self, mock_add_scan_log_to_list, MockDBClass):
        """A LOG result line is forwarded to add_scan_log_to_list."""
        scan_id = '123-456'
        daemon = DummyDaemon()

        xml_target = daemon.create_xml_target()
        targets = OspRequest.process_target_element(xml_target)
        daemon.create_scan(scan_id, targets, None, [])

        MockDBClass.get_result.return_value = [
            "LOG|||192.168.0.1|||localhost|||general/Host_Details||||||Host"
            " dead",
        ]
        mock_add_scan_log_to_list.return_value = None

        daemon.report_openvas_results(MockDBClass, scan_id)

        expected_kwargs = {
            'host': '192.168.0.1',
            'hostname': 'localhost',
            'name': '',
            'port': 'general/Host_Details',
            'qod': '',
            'test_id': '',
            'uri': '',
            'value': 'Host dead',
        }
        mock_add_scan_log_to_list.assert_called_with(**expected_kwargs)
765
766 View Code Duplication
    # NOTE(review): the code-review tool flagged this test as duplicated code;
    # presumably it overlaps with test_get_openvas_result - confirm.
    @patch('ospd_openvas.daemon.BaseDB')
    @patch('ospd_openvas.daemon.ResultList.add_scan_error_to_list')
    def test_get_openvas_result_host_deny(
        self, mock_add_scan_error_to_list, MockDBClass
    ):
        """An ERRMSG result line is forwarded to add_scan_error_to_list."""
        scan_id = '123-456'
        daemon = DummyDaemon()

        xml_target = daemon.create_xml_target()
        targets = OspRequest.process_target_element(xml_target)
        daemon.create_scan(scan_id, targets, None, [])

        MockDBClass.get_result.return_value = [
            "ERRMSG|||127.0.0.1|||localhost|||||||||Host access denied.",
        ]
        mock_add_scan_error_to_list.return_value = None

        daemon.report_openvas_results(MockDBClass, scan_id)

        expected_kwargs = {
            'host': '127.0.0.1',
            'hostname': 'localhost',
            'name': '',
            'port': '',
            'test_id': '',
            'uri': '',
            'value': 'Host access denied.',
        }
        mock_add_scan_error_to_list.assert_called_with(**expected_kwargs)
793
794
    @patch('ospd_openvas.daemon.BaseDB')
    def test_get_openvas_result_dead_hosts(self, MockDBClass):
        """A DEADHOST result line sets the scan's amount of dead hosts."""
        scan_id = '123-456'
        daemon = DummyDaemon()
        xml_target = daemon.create_xml_target()
        targets = OspRequest.process_target_element(xml_target)
        daemon.create_scan(scan_id, targets, None, [])

        MockDBClass.get_result.return_value = [
            "DEADHOST||| ||| ||| ||| |||4",
        ]
        set_dead_hosts = MagicMock()
        daemon.scan_collection.set_amount_dead_hosts = set_dead_hosts

        daemon.report_openvas_results(MockDBClass, scan_id)

        set_dead_hosts.assert_called_with(scan_id, total_dead=4)
812
813
    @patch('ospd_openvas.daemon.BaseDB')
    @patch('ospd_openvas.daemon.ResultList.add_scan_log_to_list')
    def test_get_openvas_result_host_start(
        self, mock_add_scan_log_to_list, MockDBClass
    ):
        """A HOST_START result line is logged with host, name and value."""
        scan_id = '123-456'
        daemon = DummyDaemon()
        xml_target = daemon.create_xml_target()
        targets = OspRequest.process_target_element(xml_target)
        daemon.create_scan(scan_id, targets, None, [])

        MockDBClass.get_result.return_value = [
            "HOST_START|||192.168.10.124||| ||| ||||||today 1",
        ]
        mock_add_scan_log_to_list.return_value = None

        daemon.report_openvas_results(MockDBClass, scan_id)

        expected_kwargs = {
            'host': '192.168.10.124',
            'name': 'HOST_START',
            'value': 'today 1',
        }
        mock_add_scan_log_to_list.assert_called_with(**expected_kwargs)
837
838
    @patch('ospd_openvas.daemon.BaseDB')
    def test_get_openvas_result_hosts_count(self, MockDBClass):
        """A HOSTS_COUNT result line sets the scan's total host count."""
        scan_id = '123-456'
        daemon = DummyDaemon()
        xml_target = daemon.create_xml_target()
        targets = OspRequest.process_target_element(xml_target)
        daemon.create_scan(scan_id, targets, None, [])

        MockDBClass.get_result.return_value = [
            "HOSTS_COUNT||| ||| ||| ||| |||4",
        ]
        set_total_hosts = MagicMock()
        daemon.set_scan_total_hosts = set_total_hosts

        daemon.report_openvas_results(MockDBClass, scan_id)

        set_total_hosts.assert_called_with(scan_id, 4)
856
857
    @patch('ospd_openvas.daemon.BaseDB')
    @patch('ospd_openvas.daemon.ResultList.add_scan_alarm_to_list')
    def test_result_without_vt_oid(
        self, mock_add_scan_alarm_to_list, MockDBClass
    ):
        """Reporting an ALARM result without a VT OID logs one warning.

        ``logging.Logger.warning`` is patched only inside the ``with`` block,
        so the mock is removed again and cannot leak into other tests. The
        block covers every step after daemon creation, so the same calls are
        counted as before.
        """
        w = DummyDaemon()

        with patch.object(logging.Logger, 'warning') as mock_warning:
            target_element = w.create_xml_target()
            targets = OspRequest.process_target_element(target_element)
            w.create_scan('123-456', targets, None, [])
            w.scan_collection.scans_table['123-456']['results'] = []
            results = ["ALARM||| ||| ||| ||| |||some alarm|||path", None]
            MockDBClass.get_result.return_value = results
            mock_add_scan_alarm_to_list.return_value = None

            w.report_openvas_results(MockDBClass, '123-456')

        assert_called_once(mock_warning)
876
877
    @patch('psutil.Popen')
    def test_openvas_is_alive_already_stopped(self, mock_process):
        """A process whose is_running() is True is reported as alive.

        NOTE(review): the test name says "already_stopped" but the scenario is
        a running process; this name and test_openvas_is_alive_still look
        swapped.
        """
        daemon = DummyDaemon()

        mock_process.is_running.return_value = True

        self.assertTrue(daemon.is_openvas_process_alive(mock_process))
884
885
    @patch('psutil.Popen')
    def test_openvas_is_alive_still(self, mock_process):
        """A process whose is_running() is False is reported as not alive.

        NOTE(review): the test name says "still" alive but the scenario is a
        stopped process; this name and test_openvas_is_alive_already_stopped
        look swapped.
        """
        daemon = DummyDaemon()

        mock_process.is_running.return_value = False

        self.assertFalse(daemon.is_openvas_process_alive(mock_process))
892
893
    @patch('ospd_openvas.daemon.OSPDaemon.set_scan_progress_batch')
    @patch('ospd_openvas.daemon.OSPDaemon.sort_host_finished')
    @patch('ospd_openvas.db.KbDB')
    def test_report_openvas_scan_status(
        self, mock_db, mock_sort_host_finished, mock_set_scan_progress_batch
    ):
        """Status entries from the KB become host progress and finished hosts."""
        scan_id = '123-456'
        daemon = DummyDaemon()

        mock_set_scan_progress_batch.return_value = None
        mock_sort_host_finished.return_value = None
        # Each status entry is a "host/x/y" string as read from the mocked KB.
        mock_db.get_scan_status.return_value = [
            '192.168.0.1/15/1000',
            '192.168.0.2/15/0',
            '192.168.0.3/15/-1',
            '192.168.0.4/1500/1500',
        ]

        xml_target = daemon.create_xml_target()
        targets = OspRequest.process_target_element(xml_target)

        daemon.create_scan(scan_id, targets, None, [])
        daemon.report_openvas_scan_status(mock_db, scan_id)

        expected_progress = {
            '192.168.0.1': 1,
            '192.168.0.3': -1,
            '192.168.0.4': 100,
        }
        mock_set_scan_progress_batch.assert_called_with(
            scan_id, host_progress=expected_progress
        )

        expected_finished = ['192.168.0.3', '192.168.0.4']
        mock_sort_host_finished.assert_called_with(scan_id, expected_finished)
928
929
930
class TestFilters(TestCase):
    """Tests for OpenVasVtsFilter."""

    def test_format_vt_modification_time(self):
        """An epoch string is formatted as a 14-digit timestamp string."""
        vts_filter = OpenVasVtsFilter(None)

        formatted = vts_filter.format_vt_modification_time('1517443741')

        self.assertEqual(formatted, "20180201000901")

    def test_get_filtered_vts_false(self):
        """The sample VT is excluded by the filter modification_time<10."""
        sample_oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()
        candidates = ['1234', sample_oid]

        vts_filter = OpenVasVtsFilter(daemon.nvti)
        filtered = vts_filter.get_filtered_vts_list(
            candidates, "modification_time<10"
        )

        self.assertNotIn(sample_oid, filtered)

    def test_get_filtered_vts_true(self):
        """The sample VT is included by the filter modification_time>10."""
        sample_oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()
        candidates = ['1234', sample_oid]

        vts_filter = OpenVasVtsFilter(daemon.nvti)
        filtered = vts_filter.get_filtered_vts_list(
            candidates, "modification_time>10"
        )

        self.assertIn(sample_oid, filtered)
956