Passed
Pull Request — master (#374)
by
unknown
01:23
created

TestOspdOpenvas.test_update_progress()   A

Complexity

Conditions 1

Size

Total Lines 15
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 11
nop 2
dl 0
loc 15
rs 9.85
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
# pylint: disable=invalid-name,line-too-long,no-value-for-parameter
21
22
""" Unit Test for ospd-openvas """
23
24
import io
25
import logging
26
27
from unittest import TestCase
28
from unittest.mock import patch, Mock, MagicMock
29
30
from ospd.vts import Vts
31
from ospd.protocol import OspRequest
32
33
from tests.dummydaemon import DummyDaemon
34
from tests.helper import assert_called_once
35
36
from ospd_openvas.daemon import OSPD_PARAMS, OpenVasVtsFilter
37
from ospd_openvas.openvas import Openvas
38
39
# Expected content of OSPD_PARAMS; test_set_params_from_openvas_settings
# compares the daemon's OSPD_PARAMS against this dict for equality.
# Each entry maps a setting name to its type, name, default, mandatory
# flag, client visibility and description. The description strings are
# part of the comparison, so their exact wording and spacing (including
# the double spaces) must not be "corrected" here.
OSPD_PARAMS_OUT = {
    'auto_enable_dependencies': {
        'type': 'boolean',
        'name': 'auto_enable_dependencies',
        'default': 1,
        'mandatory': 1,
        'visible_for_client': True,
        'description': 'Automatically enable the plugins that are depended on',
    },
    'cgi_path': {
        'type': 'string',
        'name': 'cgi_path',
        'default': '/cgi-bin:/scripts',
        'mandatory': 1,
        'visible_for_client': True,
        'description': 'Look for default CGIs in /cgi-bin and /scripts',
    },
    'checks_read_timeout': {
        'type': 'integer',
        'name': 'checks_read_timeout',
        'default': 5,
        'mandatory': 1,
        'visible_for_client': True,
        'description': (
            'Number  of seconds that the security checks will '
            'wait for when doing a recv()'
        ),
    },
    'non_simult_ports': {
        'type': 'string',
        'name': 'non_simult_ports',
        'default': '139, 445, 3389, Services/irc',
        'mandatory': 1,
        'visible_for_client': True,
        'description': (
            'Prevent to make two connections on the same given '
            'ports at the same time.'
        ),
    },
    'open_sock_max_attempts': {
        'type': 'integer',
        'name': 'open_sock_max_attempts',
        'default': 5,
        'mandatory': 0,
        'visible_for_client': True,
        'description': (
            'Number of unsuccessful retries to open the socket '
            'before to set the port as closed.'
        ),
    },
    'timeout_retry': {
        'type': 'integer',
        'name': 'timeout_retry',
        'default': 5,
        'mandatory': 0,
        'visible_for_client': True,
        'description': (
            'Number of retries when a socket connection attempt '
            'timesout.'
        ),
    },
    'optimize_test': {
        'type': 'boolean',
        'name': 'optimize_test',
        'default': 1,
        'mandatory': 0,
        'visible_for_client': True,
        'description': (
            'By default, optimize_test is enabled which means openvas does '
            'trust the remote host banners and is only launching plugins '
            'against the services they have been designed to check. '
            'For example it will check a web server claiming to be IIS only '
            'for IIS related flaws but will skip plugins testing for Apache '
            'flaws, and so on. This default behavior is used to optimize '
            'the scanning performance and to avoid false positives. '
            'If you are not sure that the banners of the remote host '
            'have been tampered with, you can disable this option.'
        ),
    },
    'plugins_timeout': {
        'type': 'integer',
        'name': 'plugins_timeout',
        'default': 5,
        'mandatory': 0,
        'visible_for_client': True,
        'description': 'This is the maximum lifetime, in seconds of a plugin.',
    },
    'report_host_details': {
        'type': 'boolean',
        'name': 'report_host_details',
        'default': 1,
        'mandatory': 1,
        'visible_for_client': True,
        'description': '',
    },
    'safe_checks': {
        'type': 'boolean',
        'name': 'safe_checks',
        'default': 1,
        'mandatory': 1,
        'visible_for_client': True,
        'description': (
            'Disable the plugins with potential to crash '
            'the remote services'
        ),
    },
    'scanner_plugins_timeout': {
        'type': 'integer',
        'name': 'scanner_plugins_timeout',
        'default': 36000,
        'mandatory': 1,
        'visible_for_client': True,
        'description': 'Like plugins_timeout, but for ACT_SCANNER plugins.',
    },
    'time_between_request': {
        'type': 'integer',
        'name': 'time_between_request',
        'default': 0,
        'mandatory': 0,
        'visible_for_client': True,
        'description': (
            'Allow to set a wait time between two actions '
            '(open, send, close).'
        ),
    },
    'unscanned_closed': {
        'type': 'boolean',
        'name': 'unscanned_closed',
        'default': 1,
        'mandatory': 1,
        'visible_for_client': True,
        'description': '',
    },
    'unscanned_closed_udp': {
        'type': 'boolean',
        'name': 'unscanned_closed_udp',
        'default': 1,
        'mandatory': 1,
        'visible_for_client': True,
        'description': '',
    },
    'expand_vhosts': {
        'type': 'boolean',
        'name': 'expand_vhosts',
        'default': 1,
        'mandatory': 0,
        'visible_for_client': True,
        'description': (
            'Whether to expand the target hosts '
            'list of vhosts with values gathered from sources '
            'such as reverse-lookup queries and VT checks '
            'for SSL/TLS certificates.'
        ),
    },
    'test_empty_vhost': {
        'type': 'boolean',
        'name': 'test_empty_vhost',
        'default': 0,
        'mandatory': 0,
        'visible_for_client': True,
        'description': (
            'If  set  to  yes, the scanner will '
            'also test the target by using empty vhost value '
            'in addition to the targets associated vhost values.'
        ),
    },
    'max_hosts': {
        'type': 'integer',
        'name': 'max_hosts',
        'default': 30,
        'mandatory': 0,
        'visible_for_client': False,
        'description': (
            'The maximum number of hosts to test at the same time which '
            'should be given to the client (which can override it). '
            'This value must be computed given your bandwidth, '
            'the number of hosts you want to test, your amount of '
            'memory and the performance of your processor(s).'
        ),
    },
    'max_checks': {
        'type': 'integer',
        'name': 'max_checks',
        'default': 10,
        'mandatory': 0,
        'visible_for_client': False,
        'description': (
            'The number of plugins that will run against each host being '
            'tested. Note that the total number of process will be max '
            'checks x max_hosts so you need to find a balance between '
            'these two options. Note that launching too many plugins at '
            'the same time may disable the remote host, either temporarily '
            '(ie: inetd closes its ports) or definitely (the remote host '
            'crash because it is asked to do too many things at the '
            'same time), so be careful.'
        ),
    },
    'port_range': {
        'type': 'string',
        'name': 'port_range',
        'default': '',
        'mandatory': 0,
        'visible_for_client': False,
        'description': (
            'This is the default range of ports that the scanner plugins will '
            'probe. The syntax of this option is flexible, it can be a '
            'single range ("1-1500"), several ports ("21,23,80"), several '
            'ranges of ports ("1-1500,32000-33000"). Note that you can '
            'specify UDP and TCP ports by prefixing each range by T or U. '
            'For instance, the following range will make openvas scan UDP '
            'ports 1 to 1024 and TCP ports 1 to 65535 : '
            '"T:1-65535,U:1-1024".'
        ),
    },
    'test_alive_hosts_only': {
        'type': 'boolean',
        'name': 'test_alive_hosts_only',
        'default': 0,
        'mandatory': 0,
        'visible_for_client': False,
        'description': (
            'If this option is set, openvas will scan the target list for '
            'alive hosts in a separate process while only testing those '
            'hosts which are identified as alive. This boosts the scan '
            'speed of target ranges with a high amount of dead hosts '
            'significantly.'
        ),
    },
    'source_iface': {
        'type': 'string',
        'name': 'source_iface',
        'default': '',
        'mandatory': 0,
        'visible_for_client': False,
        'description': (
            'Name of the network interface that will be used as the source '
            'of connections established by openvas. The scan won\'t be '
            'launched if the value isn\'t authorized according to '
            '(sys_)ifaces_allow / (sys_)ifaces_deny if present.'
        ),
    },
    'ifaces_allow': {
        'type': 'string',
        'name': 'ifaces_allow',
        'default': '',
        'mandatory': 0,
        'visible_for_client': False,
        'description': (
            'Comma-separated list of interfaces names that are authorized '
            'as source_iface values.'
        ),
    },
    'ifaces_deny': {
        'type': 'string',
        'name': 'ifaces_deny',
        'default': '',
        'mandatory': 0,
        'visible_for_client': False,
        'description': (
            'Comma-separated list of interfaces names that are not '
            'authorized as source_iface values.'
        ),
    },
    'hosts_allow': {
        'type': 'string',
        'name': 'hosts_allow',
        'default': '',
        'mandatory': 0,
        'visible_for_client': False,
        'description': (
            'Comma-separated list of the only targets that are authorized '
            'to be scanned. Supports the same syntax as the list targets. '
            'Both target hostnames and the address to which they resolve '
            'are checked. Hostnames in hosts_allow list are not resolved '
            'however.'
        ),
    },
    'hosts_deny': {
        'type': 'string',
        'name': 'hosts_deny',
        'default': '',
        'mandatory': 0,
        'visible_for_client': False,
        'description': (
            'Comma-separated list of targets that are not authorized to '
            'be scanned. Supports the same syntax as the list targets. '
            'Both target hostnames and the address to which they resolve '
            'are checked. Hostnames in hosts_deny list are not '
            'resolved however.'
        ),
    },
}
326
327
328
class TestOspdOpenvas(TestCase):
    """Unit tests for the ospd-openvas daemon, exercised via DummyDaemon.

    Tests that expect a log message patch the ``logging.Logger`` method
    with ``patch.object`` for the duration of the call only, so the real
    logger is restored afterwards and other tests are not affected.
    """

    # OID of the VT entry the tests read from the daemon's ``VTS`` mapping
    # and pass to the ``get_*_vt_as_xml_str`` methods.
    VT_OID = '1.3.6.1.4.1.25623.1.0.100061'

    @staticmethod
    def _create_scan(daemon, scan_id='123-456'):
        """Create a scan on *daemon* using its own XML target element."""
        target_element = daemon.create_xml_target()
        targets = OspRequest.process_target_element(target_element)
        daemon.create_scan(scan_id, targets, None, [])

    @staticmethod
    def _path_open_mock(read_data):
        """Return a mock context manager whose ``__enter__`` yields a
        text stream containing *read_data*."""
        mock_read = MagicMock(name='Path open context manager')
        mock_read.__enter__ = MagicMock(return_value=io.StringIO(read_data))
        return mock_read

    @patch('ospd_openvas.daemon.Openvas')
    def test_set_params_from_openvas_settings(self, mock_openvas: Openvas):
        """Settings are read once; plugins_folder lands in scan_only_params."""
        mock_openvas.get_settings.return_value = {
            'non_simult_ports': '139, 445, 3389, Services/irc',
            'plugins_folder': '/foo/bar',
        }
        w = DummyDaemon()
        w.set_params_from_openvas_settings()

        self.assertEqual(mock_openvas.get_settings.call_count, 1)
        self.assertEqual(OSPD_PARAMS, OSPD_PARAMS_OUT)
        self.assertEqual(w.scan_only_params.get('plugins_folder'), '/foo/bar')

    @patch('ospd_openvas.daemon.Openvas')
    def test_sudo_available(self, mock_openvas):
        """sudo_available is truthy when Openvas.check_sudo returns True."""
        mock_openvas.check_sudo.return_value = True

        w = DummyDaemon()
        w._sudo_available = None  # pylint: disable=protected-access
        # First access of the property after resetting the stored value.
        w.sudo_available  # pylint: disable=pointless-statement

        self.assertTrue(w.sudo_available)

    def test_get_custom_xml(self):
        """The custom XML has the same length as the expected string."""
        out = (
            '<custom>'
            '<required_ports>Services/www, 80</required_ports>'
            '<category>3</category>'
            '<excluded_keys>Settings/disable_cgi_scanning</excluded_keys>'
            '<family>Product detection</family>'
            '<filename>mantis_detect.nasl</filename>'
            '<timeout>0</timeout>'
            '</custom>'
        )
        w = DummyDaemon()

        vt = w.VTS[self.VT_OID]
        res = w.get_custom_vt_as_xml_str(self.VT_OID, vt.get('custom'))
        # NOTE(review): only lengths are compared, presumably because the
        # element order is not guaranteed -- confirm before tightening.
        self.assertEqual(len(res), len(out))

    def test_get_custom_xml_failed(self):
        """A control character in a custom value logs exactly one warning."""
        w = DummyDaemon()
        custom = {'a': "\u0006"}

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_custom_vt_as_xml_str(self.VT_OID, custom=custom)

        assert_called_once(mock_warning)

    def test_get_severities_xml(self):
        """The severities of the VT are rendered as the expected XML."""
        w = DummyDaemon()

        out = (
            '<severities>'
            '<severity type="cvss_base_v2">'
            'AV:N/AC:L/Au:N/C:N/I:N/A:N'
            '</severity>'
            '</severities>'
        )
        vt = w.VTS[self.VT_OID]
        severities = vt.get('severities')
        res = w.get_severities_vt_as_xml_str(self.VT_OID, severities)

        self.assertEqual(res, out)

    def test_get_severities_xml_failed(self):
        """A control character in the severity vector logs one warning."""
        w = DummyDaemon()
        sever = {'severity_base_vector': "\u0006"}

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_severities_vt_as_xml_str(self.VT_OID, severities=sever)

        assert_called_once(mock_warning)

    def test_get_params_xml(self):
        """The params XML has the same length as the expected string."""
        w = DummyDaemon()
        out = (
            '<params>'
            '<param type="checkbox" id="2">'
            '<name>Do not randomize the  order  in  which ports are '
            'scanned</name>'
            '<default>no</default>'
            '</param>'
            '<param type="entry" id="1">'
            '<name>Data length :</name>'
            '</param>'
            '</params>'
        )

        vt = w.VTS[self.VT_OID]
        params = vt.get('vt_params')
        res = w.get_params_vt_as_xml_str(self.VT_OID, params)

        # NOTE(review): only lengths are compared, presumably because the
        # parameter order is not guaranteed -- confirm before tightening.
        self.assertEqual(len(res), len(out))

    def test_get_params_xml_failed(self):
        """A control character in a param default logs one warning."""
        w = DummyDaemon()
        params = {
            '1': {
                'id': '1',
                'type': 'entry',
                'default': '\u0006',
                'name': 'dns-fuzz.timelimit',
                'description': 'Description',
            }
        }

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_params_vt_as_xml_str(self.VT_OID, params)

        assert_called_once(mock_warning)

    def test_get_refs_xml(self):
        """The references of the VT are rendered as the expected XML."""
        w = DummyDaemon()

        out = '<refs><ref type="url" id="http://www.mantisbt.org/"/></refs>'
        vt = w.VTS[self.VT_OID]
        refs = vt.get('vt_refs')
        res = w.get_refs_vt_as_xml_str(self.VT_OID, refs)

        self.assertEqual(res, out)

    def test_get_dependencies_xml(self):
        """Every OID dependency is rendered as a <dependency> element."""
        w = DummyDaemon()

        out = (
            '<dependencies>'
            '<dependency vt_id="1.3.6.1.4.1.25623.1.2.3.4"/>'
            '<dependency vt_id="1.3.6.1.4.1.25623.4.3.2.1"/>'
            '</dependencies>'
        )
        dep = ['1.3.6.1.4.1.25623.1.2.3.4', '1.3.6.1.4.1.25623.4.3.2.1']
        res = w.get_dependencies_vt_as_xml_str(self.VT_OID, dep)

        self.assertEqual(res, out)

    def test_get_dependencies_xml_missing_dep(self):
        """A dependency given as a file name is left out of the XML."""
        w = DummyDaemon()

        out = (
            '<dependencies>'
            '<dependency vt_id="1.3.6.1.4.1.25623.1.2.3.4"/>'
            '</dependencies>'
        )
        dep = ['1.3.6.1.4.1.25623.1.2.3.4', 'file_name.nasl']
        res = w.get_dependencies_vt_as_xml_str(self.VT_OID, dep)

        self.assertEqual(res, out)

    def test_get_dependencies_xml_failed(self):
        """A control character as dependency logs exactly one error."""
        w = DummyDaemon()
        dep = ["\u0006"]

        with patch.object(logging.Logger, 'error') as mock_error:
            w.get_dependencies_vt_as_xml_str(
                self.VT_OID, vt_dependencies=dep
            )

        assert_called_once(mock_error)

    def test_get_ctime_xml(self):
        """The creation time of the VT is rendered as the expected XML."""
        w = DummyDaemon()

        out = '<creation_time>1237458156</creation_time>'
        vt = w.VTS[self.VT_OID]
        ctime = vt.get('creation_time')
        res = w.get_creation_time_vt_as_xml_str(self.VT_OID, ctime)

        self.assertEqual(res, out)

    def test_get_ctime_xml_failed(self):
        """A control character as creation time logs one warning."""
        w = DummyDaemon()
        ctime = '\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_creation_time_vt_as_xml_str(
                self.VT_OID, vt_creation_time=ctime
            )

        assert_called_once(mock_warning)

    def test_get_mtime_xml(self):
        """The modification time of the VT is rendered as expected XML."""
        w = DummyDaemon()

        out = '<modification_time>1533906565</modification_time>'
        vt = w.VTS[self.VT_OID]
        mtime = vt.get('modification_time')
        res = w.get_modification_time_vt_as_xml_str(self.VT_OID, mtime)

        self.assertEqual(res, out)

    def test_get_mtime_xml_failed(self):
        """A control character as modification time logs one warning."""
        w = DummyDaemon()
        mtime = '\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_modification_time_vt_as_xml_str(self.VT_OID, mtime)

        assert_called_once(mock_warning)

    def test_get_summary_xml(self):
        """The summary of the VT is rendered as the expected XML."""
        w = DummyDaemon()

        out = '<summary>some summary</summary>'
        vt = w.VTS[self.VT_OID]
        summary = vt.get('summary')
        res = w.get_summary_vt_as_xml_str(self.VT_OID, summary)

        self.assertEqual(res, out)

    def test_get_summary_xml_failed(self):
        """A control character as summary logs exactly one warning."""
        w = DummyDaemon()
        summary = '\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_summary_vt_as_xml_str(self.VT_OID, summary)

        assert_called_once(mock_warning)

    def test_get_impact_xml(self):
        """The impact of the VT is rendered as the expected XML."""
        w = DummyDaemon()

        out = '<impact>some impact</impact>'
        vt = w.VTS[self.VT_OID]
        impact = vt.get('impact')
        res = w.get_impact_vt_as_xml_str(self.VT_OID, impact)

        self.assertEqual(res, out)

    def test_get_impact_xml_failed(self):
        """A control character as impact logs exactly one warning."""
        w = DummyDaemon()
        impact = '\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_impact_vt_as_xml_str(self.VT_OID, impact)

        assert_called_once(mock_warning)

    def test_get_insight_xml(self):
        """The insight of the VT is rendered as the expected XML."""
        w = DummyDaemon()

        out = '<insight>some insight</insight>'
        vt = w.VTS[self.VT_OID]
        insight = vt.get('insight')
        res = w.get_insight_vt_as_xml_str(self.VT_OID, insight)

        self.assertEqual(res, out)

    def test_get_insight_xml_failed(self):
        """A control character as insight logs exactly one warning."""
        w = DummyDaemon()
        insight = '\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_insight_vt_as_xml_str(self.VT_OID, insight)

        assert_called_once(mock_warning)

    def test_get_solution_xml(self):
        """Solution text, type and method are rendered as expected XML."""
        w = DummyDaemon()

        out = (
            '<solution type="WillNotFix" method="DebianAPTUpgrade">'
            'some solution'
            '</solution>'
        )
        vt = w.VTS[self.VT_OID]
        solution = vt.get('solution')
        solution_type = vt.get('solution_type')
        solution_method = vt.get('solution_method')

        res = w.get_solution_vt_as_xml_str(
            self.VT_OID,
            solution,
            solution_type,
            solution_method,
        )

        self.assertEqual(res, out)

    def test_get_solution_xml_failed(self):
        """A control character as solution logs exactly one warning."""
        w = DummyDaemon()
        solution = '\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_solution_vt_as_xml_str(self.VT_OID, solution)

        assert_called_once(mock_warning)

    def test_get_detection_xml(self):
        """The qod_type of the VT is rendered as a <detection> element."""
        w = DummyDaemon()

        out = '<detection qod_type="remote_banner"/>'
        vt = w.VTS[self.VT_OID]
        detection_type = vt.get('qod_type')

        res = w.get_detection_vt_as_xml_str(
            self.VT_OID, qod_type=detection_type
        )

        self.assertEqual(res, out)

    def test_get_detection_xml_failed(self):
        """A control character as detection logs exactly one warning."""
        w = DummyDaemon()
        detection = '\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_detection_vt_as_xml_str(self.VT_OID, detection)

        assert_called_once(mock_warning)

    def test_get_affected_xml(self):
        """The affected text of the VT is rendered as the expected XML."""
        w = DummyDaemon()
        out = '<affected>some affection</affected>'
        vt = w.VTS[self.VT_OID]
        affected = vt.get('affected')

        res = w.get_affected_vt_as_xml_str(self.VT_OID, affected=affected)

        self.assertEqual(res, out)

    def test_get_affected_xml_failed(self):
        """A control character in the affected text logs one warning."""
        w = DummyDaemon()
        affected = "\u0006" + "affected"

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_affected_vt_as_xml_str(self.VT_OID, affected=affected)

        assert_called_once(mock_warning)

    @patch('ospd_openvas.daemon.Path.exists')
    @patch('ospd_openvas.daemon.OSPDopenvas.set_params_from_openvas_settings')
    def test_feed_is_outdated_none(
        self, mock_set_params: MagicMock, mock_path_exists: MagicMock
    ):
        """feed_is_outdated returns None when Path.exists is False."""
        w = DummyDaemon()

        w.scan_only_params['plugins_folder'] = '/foo/bar'

        # Return None
        mock_path_exists.return_value = False

        ret = w.feed_is_outdated('1234')
        self.assertIsNone(ret)

        self.assertEqual(mock_set_params.call_count, 1)
        self.assertEqual(mock_path_exists.call_count, 1)

    @patch('ospd_openvas.daemon.Path.exists')
    @patch('ospd_openvas.daemon.Path.open')
    def test_feed_is_outdated_true(
        self,
        mock_path_open: MagicMock,
        mock_path_exists: MagicMock,
    ):
        """feed_is_outdated is true when PLUGIN_SET differs from the
        version passed in."""
        mock_path_exists.return_value = True
        mock_path_open.return_value = self._path_open_mock(
            'PLUGIN_SET = "1235";'
        )

        w = DummyDaemon()

        # Return True
        w.scan_only_params['plugins_folder'] = '/foo/bar'

        ret = w.feed_is_outdated('1234')
        self.assertTrue(ret)

        self.assertEqual(mock_path_exists.call_count, 1)
        self.assertEqual(mock_path_open.call_count, 1)

    @patch('ospd_openvas.daemon.Path.exists')
    @patch('ospd_openvas.daemon.Path.open')
    def test_feed_is_outdated_false(
        self,
        mock_path_open: MagicMock,
        mock_path_exists: MagicMock,
    ):
        """feed_is_outdated is false when PLUGIN_SET equals the version
        passed in."""
        mock_path_exists.return_value = True
        mock_path_open.return_value = self._path_open_mock(
            'PLUGIN_SET = "1234"'
        )

        w = DummyDaemon()
        w.scan_only_params['plugins_folder'] = '/foo/bar'

        ret = w.feed_is_outdated('1234')
        self.assertFalse(ret)

        self.assertEqual(mock_path_exists.call_count, 1)
        self.assertEqual(mock_path_open.call_count, 1)

    def test_check_feed_cache_unavailable(self):
        """check_feed is false and skips feed_is_outdated without cache."""
        w = DummyDaemon()
        w.vts.is_cache_available = False
        w.feed_is_outdated = Mock()
        res = w.check_feed()

        self.assertFalse(res)
        w.feed_is_outdated.assert_not_called()

    @patch('ospd_openvas.daemon.ScanDB')
    @patch('ospd_openvas.daemon.ResultList.add_scan_log_to_list')
    def test_get_openvas_result(self, mock_add_scan_log_to_list, MockDBClass):
        """A LOG result is forwarded to ResultList.add_scan_log_to_list."""
        w = DummyDaemon()
        self._create_scan(w)

        results = [
            "LOG||||||general/Host_Details||||||Host dead",
        ]
        MockDBClass.get_result.return_value = results
        mock_add_scan_log_to_list.return_value = None

        w.report_openvas_results(MockDBClass, '123-456', 'localhost')
        mock_add_scan_log_to_list.assert_called_with(
            host='localhost',
            hostname='',
            name='',
            port='general/Host_Details',
            qod='',
            test_id='',
            uri='',
            value='Host dead',
        )

    @patch('ospd_openvas.daemon.ScanDB')
    @patch('ospd_openvas.daemon.ResultList.add_scan_error_to_list')
    def test_get_openvas_result_host_deny(
        self, mock_add_scan_error_to_list, MockDBClass
    ):
        """An ERRMSG result is forwarded to add_scan_error_to_list."""
        w = DummyDaemon()
        self._create_scan(w)

        results = [
            "ERRMSG|||127.0.0.1|||||||||Host access denied.",
        ]
        MockDBClass.get_result.return_value = results
        mock_add_scan_error_to_list.return_value = None

        w.report_openvas_results(MockDBClass, '123-456', '')
        mock_add_scan_error_to_list.assert_called_with(
            host='127.0.0.1',
            hostname='127.0.0.1',
            name='',
            port='',
            test_id='',
            uri='',
            value='Host access denied.',
        )

    @patch('ospd_openvas.daemon.ScanDB')
    def test_get_openvas_result_dead_hosts(self, MockDBClass):
        """A DEADHOST result sets the amount of dead hosts of the scan."""
        w = DummyDaemon()
        self._create_scan(w)

        results = [
            "DEADHOST||| ||| ||| |||4",
        ]
        MockDBClass.get_result.return_value = results
        w.scan_collection.set_amount_dead_hosts = MagicMock()

        w.report_openvas_results(MockDBClass, '123-456', 'localhost')
        w.scan_collection.set_amount_dead_hosts.assert_called_with(
            '123-456',
            total_dead=4,
        )

    @patch('ospd_openvas.daemon.ScanDB')
    def test_get_openvas_result_hosts_count(self, MockDBClass):
        """A HOSTS_COUNT result sets the total hosts of the scan."""
        w = DummyDaemon()
        self._create_scan(w)

        results = [
            "HOSTS_COUNT||| ||| ||| |||4",
        ]
        MockDBClass.get_result.return_value = results
        w.set_scan_total_hosts = MagicMock()

        w.report_openvas_results(MockDBClass, '123-456', 'localhost')
        w.set_scan_total_hosts.assert_called_with(
            '123-456',
            4,
        )

    @patch('ospd_openvas.daemon.ScanDB')
    @patch('ospd_openvas.daemon.ResultList.add_scan_alarm_to_list')
    def test_result_without_vt_oid(
        self, mock_add_scan_alarm_to_list, MockDBClass
    ):
        """An ALARM result with a blank OID field logs one warning."""
        w = DummyDaemon()
        results = ["ALARM||| ||| ||| |||some alarm|||path", None]
        MockDBClass.get_result.return_value = results
        mock_add_scan_alarm_to_list.return_value = None

        # The scan is created inside the patch so that the same calls
        # are covered by the warning mock as before.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            self._create_scan(w)
            w.scan_collection.scans_table['123-456']['results'] = []
            w.report_openvas_results(MockDBClass, '123-456', 'localhost')

        assert_called_once(mock_warning)

    @patch('ospd_openvas.db.KbDB')
    def test_openvas_is_alive_already_stopped(self, mock_db):
        """is_openvas_process_alive is true if the db reports the scan
        as stopped."""
        w = DummyDaemon()
        mock_db.scan_is_stopped.return_value = True
        ret = w.is_openvas_process_alive(mock_db, '1234', 'a1-b2-c3-d4')

        self.assertTrue(ret)

    @patch('ospd_openvas.db.KbDB')
    def test_openvas_is_alive_still(self, mock_db):
        """is_openvas_process_alive is false if the db reports the scan
        as not stopped."""
        w = DummyDaemon()
        mock_db.scan_is_stopped.return_value = False
        ret = w.is_openvas_process_alive(mock_db, '1234', 'a1-b2-c3-d4')

        self.assertFalse(ret)

    @patch('ospd_openvas.daemon.OSPDaemon.set_scan_host_progress')
    def test_update_progress(self, mock_set_scan_host_progress):
        """The progress message '0/-1' is reported as progress -1."""
        w = DummyDaemon()

        mock_set_scan_host_progress.return_value = None

        msg = '0/-1'
        self._create_scan(w)
        w.update_progress('123-456', 'localhost', msg)

        mock_set_scan_host_progress.assert_called_with(
            '123-456', host='localhost', progress=-1
        )
907
908
909
class TestFilters(TestCase):
    """Unit tests for OpenVasVtsFilter."""

    def test_format_vt_modification_time(self):
        """An epoch string is formatted as a 14-digit timestamp.

        1517443741 is 2018-02-01 00:09:01 UTC, which matches the
        expected value "20180201000901".
        """
        ovformat = OpenVasVtsFilter(None)
        td = '1517443741'
        formatted = ovformat.format_vt_modification_time(td)
        self.assertEqual(formatted, "20180201000901")

    def test_get_filtered_vts_false(self):
        """The VT is not returned for the filter modification_time<10."""
        w = DummyDaemon()
        vts_collection = ['1234', '1.3.6.1.4.1.25623.1.0.100061']

        ovfilter = OpenVasVtsFilter(w.nvti)
        res = ovfilter.get_filtered_vts_list(
            vts_collection, "modification_time<10"
        )
        self.assertNotIn('1.3.6.1.4.1.25623.1.0.100061', res)

    def test_get_filtered_vts_true(self):
        """The VT is returned for the filter modification_time>10."""
        w = DummyDaemon()
        vts_collection = ['1234', '1.3.6.1.4.1.25623.1.0.100061']

        ovfilter = OpenVasVtsFilter(w.nvti)
        res = ovfilter.get_filtered_vts_list(
            vts_collection, "modification_time>10"
        )
        self.assertIn('1.3.6.1.4.1.25623.1.0.100061', res)
935