Passed
Pull Request — master (#307)
by
unknown
01:05
created

tests.test_daemon   B

Complexity

Total Complexity 42

Size/Duplication

Total Lines 937
Duplicated Lines 5.44 %

Importance

Changes 0
Metric Value
eloc 658
dl 51
loc 937
rs 8.982
c 0
b 0
f 0
wmc 42

42 Methods

Rating   Name   Duplication   Size   Complexity  
A TestOspdOpenvas.test_result_without_vt_oid() 0 19 1
A TestOspdOpenvas.test_get_openvas_result_dead_hosts() 0 16 1
A TestOspdOpenvas.test_report_openvas_scan_status() 0 34 1
A TestOspdOpenvas.test_get_mtime_xml_failed() 0 10 1
A TestOspdOpenvas.test_get_insight_xml() 0 11 1
A TestOspdOpenvas.test_check_feed_cache_unavailable() 0 8 1
A TestOspdOpenvas.test_get_severities_xml_failed() 0 10 1
A TestOspdOpenvas.test_get_dependencies_xml() 0 14 1
A TestFilters.test_format_vt_modification_time() 0 5 1
A TestOspdOpenvas.test_get_summary_xml() 0 11 1
A TestOspdOpenvas.test_get_summary_xml_failed() 0 8 1
A TestOspdOpenvas.test_feed_is_outdated_none() 0 17 1
A TestOspdOpenvas.test_feed_is_outdated_true() 0 22 1
A TestOspdOpenvas.test_get_ctime_xml() 0 11 1
A TestOspdOpenvas.test_get_openvas_result_host_start() 0 21 1
A TestOspdOpenvas.test_get_params_xml_failed() 0 16 1
A TestOspdOpenvas.test_get_solution_xml() 0 21 1
A TestOspdOpenvas.test_get_impact_xml_failed() 0 8 1
A TestOspdOpenvas.test_get_params_xml() 0 20 1
A TestOspdOpenvas.test_feed_is_outdated_false() 0 21 1
A TestOspdOpenvas.test_get_impact_xml() 0 9 1
A TestOspdOpenvas.test_get_insight_xml_failed() 0 8 1
A TestFilters.test_get_filtered_vts_true() 0 9 1
A TestOspdOpenvas.test_get_affected_xml_failed() 0 10 1
A TestOspdOpenvas.test_get_openvas_result_host_deny() 26 26 1
A TestOspdOpenvas.test_get_detection_xml() 0 12 1
A TestOspdOpenvas.test_get_refs_xml() 0 9 1
A TestOspdOpenvas.test_openvas_is_alive_still() 0 8 1
A TestOspdOpenvas.test_get_affected_xml() 0 11 1
A TestOspdOpenvas.test_get_severities_xml() 0 17 1
A TestOspdOpenvas.test_get_dependencies_xml_failed() 0 10 1
A TestOspdOpenvas.test_get_detection_xml_failed() 0 8 1
A TestOspdOpenvas.test_get_custom_xml() 0 18 1
A TestOspdOpenvas.test_get_openvas_result() 25 25 1
A TestOspdOpenvas.test_get_ctime_xml_failed() 0 10 1
A TestOspdOpenvas.test_sudo_available() 0 9 1
A TestOspdOpenvas.test_get_mtime_xml() 0 11 1
A TestOspdOpenvas.test_get_custom_xml_failed() 0 10 1
A TestFilters.test_get_filtered_vts_false() 0 9 1
A TestOspdOpenvas.test_set_params_from_openvas_settings() 0 12 1
A TestOspdOpenvas.test_openvas_is_alive_already_stopped() 0 8 1
A TestOspdOpenvas.test_get_solution_xml_failed() 0 8 1

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like tests.test_daemon often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
# pylint: disable=invalid-name,line-too-long,no-value-for-parameter
21
22
""" Unit Test for ospd-openvas """
23
24
import io
25
import logging
26
27
from unittest import TestCase
28
from unittest.mock import patch, Mock, MagicMock
29
30
from ospd.vts import Vts
31
from ospd.protocol import OspRequest
32
33
from tests.dummydaemon import DummyDaemon
34
from tests.helper import assert_called_once
35
36
from ospd_openvas.daemon import OSPD_PARAMS, OpenVasVtsFilter
37
from ospd_openvas.openvas import Openvas
38
39
OSPD_PARAMS_OUT = {
40
    'auto_enable_dependencies': {
41
        'type': 'boolean',
42
        'name': 'auto_enable_dependencies',
43
        'default': 1,
44
        'mandatory': 1,
45
        'visible_for_client': True,
46
        'description': 'Automatically enable the plugins that are depended on',
47
    },
48
    'cgi_path': {
49
        'type': 'string',
50
        'name': 'cgi_path',
51
        'default': '/cgi-bin:/scripts',
52
        'mandatory': 1,
53
        'visible_for_client': True,
54
        'description': 'Look for default CGIs in /cgi-bin and /scripts',
55
    },
56
    'checks_read_timeout': {
57
        'type': 'integer',
58
        'name': 'checks_read_timeout',
59
        'default': 5,
60
        'mandatory': 1,
61
        'visible_for_client': True,
62
        'description': (
63
            'Number  of seconds that the security checks will '
64
            + 'wait for when doing a recv()'
65
        ),
66
    },
67
    'non_simult_ports': {
68
        'type': 'string',
69
        'name': 'non_simult_ports',
70
        'default': '139, 445, 3389, Services/irc',
71
        'mandatory': 1,
72
        'visible_for_client': True,
73
        'description': (
74
            'Prevent to make two connections on the same given '
75
            + 'ports at the same time.'
76
        ),
77
    },
78
    'open_sock_max_attempts': {
79
        'type': 'integer',
80
        'name': 'open_sock_max_attempts',
81
        'default': 5,
82
        'mandatory': 0,
83
        'visible_for_client': True,
84
        'description': (
85
            'Number of unsuccessful retries to open the socket '
86
            + 'before to set the port as closed.'
87
        ),
88
    },
89
    'timeout_retry': {
90
        'type': 'integer',
91
        'name': 'timeout_retry',
92
        'default': 5,
93
        'mandatory': 0,
94
        'visible_for_client': True,
95
        'description': (
96
            'Number of retries when a socket connection attempt ' + 'timesout.'
97
        ),
98
    },
99
    'optimize_test': {
100
        'type': 'boolean',
101
        'name': 'optimize_test',
102
        'default': 1,
103
        'mandatory': 0,
104
        'visible_for_client': True,
105
        'description': (
106
            'By default, optimize_test is enabled which means openvas does '
107
            + 'trust the remote host banners and is only launching plugins '
108
            + 'against the services they have been designed to check. '
109
            + 'For example it will check a web server claiming to be IIS only '
110
            + 'for IIS related flaws but will skip plugins testing for Apache '
111
            + 'flaws, and so on. This default behavior is used to optimize '
112
            + 'the scanning performance and to avoid false positives. '
113
            + 'If you are not sure that the banners of the remote host '
114
            + 'have been tampered with, you can disable this option.'
115
        ),
116
    },
117
    'plugins_timeout': {
118
        'type': 'integer',
119
        'name': 'plugins_timeout',
120
        'default': 5,
121
        'mandatory': 0,
122
        'visible_for_client': True,
123
        'description': 'This is the maximum lifetime, in seconds of a plugin.',
124
    },
125
    'report_host_details': {
126
        'type': 'boolean',
127
        'name': 'report_host_details',
128
        'default': 1,
129
        'mandatory': 1,
130
        'visible_for_client': True,
131
        'description': '',
132
    },
133
    'safe_checks': {
134
        'type': 'boolean',
135
        'name': 'safe_checks',
136
        'default': 1,
137
        'mandatory': 1,
138
        'visible_for_client': True,
139
        'description': (
140
            'Disable the plugins with potential to crash '
141
            + 'the remote services'
142
        ),
143
    },
144
    'scanner_plugins_timeout': {
145
        'type': 'integer',
146
        'name': 'scanner_plugins_timeout',
147
        'default': 36000,
148
        'mandatory': 1,
149
        'visible_for_client': True,
150
        'description': 'Like plugins_timeout, but for ACT_SCANNER plugins.',
151
    },
152
    'time_between_request': {
153
        'type': 'integer',
154
        'name': 'time_between_request',
155
        'default': 0,
156
        'mandatory': 0,
157
        'visible_for_client': True,
158
        'description': (
159
            'Allow to set a wait time between two actions '
160
            + '(open, send, close).'
161
        ),
162
    },
163
    'unscanned_closed': {
164
        'type': 'boolean',
165
        'name': 'unscanned_closed',
166
        'default': 1,
167
        'mandatory': 1,
168
        'visible_for_client': True,
169
        'description': '',
170
    },
171
    'unscanned_closed_udp': {
172
        'type': 'boolean',
173
        'name': 'unscanned_closed_udp',
174
        'default': 1,
175
        'mandatory': 1,
176
        'visible_for_client': True,
177
        'description': '',
178
    },
179
    'expand_vhosts': {
180
        'type': 'boolean',
181
        'name': 'expand_vhosts',
182
        'default': 1,
183
        'mandatory': 0,
184
        'visible_for_client': True,
185
        'description': 'Whether to expand the target hosts '
186
        + 'list of vhosts with values gathered from sources '
187
        + 'such as reverse-lookup queries and VT checks '
188
        + 'for SSL/TLS certificates.',
189
    },
190
    'test_empty_vhost': {
191
        'type': 'boolean',
192
        'name': 'test_empty_vhost',
193
        'default': 0,
194
        'mandatory': 0,
195
        'visible_for_client': True,
196
        'description': 'If  set  to  yes, the scanner will '
197
        + 'also test the target by using empty vhost value '
198
        + 'in addition to the targets associated vhost values.',
199
    },
200
    'max_hosts': {
201
        'type': 'integer',
202
        'name': 'max_hosts',
203
        'default': 30,
204
        'mandatory': 0,
205
        'visible_for_client': False,
206
        'description': (
207
            'The maximum number of hosts to test at the same time which '
208
            + 'should be given to the client (which can override it). '
209
            + 'This value must be computed given your bandwidth, '
210
            + 'the number of hosts you want to test, your amount of '
211
            + 'memory and the performance of your processor(s).'
212
        ),
213
    },
214
    'max_checks': {
215
        'type': 'integer',
216
        'name': 'max_checks',
217
        'default': 10,
218
        'mandatory': 0,
219
        'visible_for_client': False,
220
        'description': (
221
            'The number of plugins that will run against each host being '
222
            + 'tested. Note that the total number of process will be max '
223
            + 'checks x max_hosts so you need to find a balance between '
224
            + 'these two options. Note that launching too many plugins at '
225
            + 'the same time may disable the remote host, either temporarily '
226
            + '(ie: inetd closes its ports) or definitely (the remote host '
227
            + 'crash because it is asked to do too many things at the '
228
            + 'same time), so be careful.'
229
        ),
230
    },
231
    'port_range': {
232
        'type': 'string',
233
        'name': 'port_range',
234
        'default': '',
235
        'mandatory': 0,
236
        'visible_for_client': False,
237
        'description': (
238
            'This is the default range of ports that the scanner plugins will '
239
            + 'probe. The syntax of this option is flexible, it can be a '
240
            + 'single range ("1-1500"), several ports ("21,23,80"), several '
241
            + 'ranges of ports ("1-1500,32000-33000"). Note that you can '
242
            + 'specify UDP and TCP ports by prefixing each range by T or U. '
243
            + 'For instance, the following range will make openvas scan UDP '
244
            + 'ports 1 to 1024 and TCP ports 1 to 65535 : '
245
            + '"T:1-65535,U:1-1024".'
246
        ),
247
    },
248
    'test_alive_hosts_only': {
249
        'type': 'boolean',
250
        'name': 'test_alive_hosts_only',
251
        'default': 0,
252
        'mandatory': 0,
253
        'visible_for_client': False,
254
        'description': (
255
            'If this option is set, openvas will scan the target list for '
256
            + 'alive hosts in a separate process while only testing those '
257
            + 'hosts which are identified as alive. This boosts the scan '
258
            + 'speed of target ranges with a high amount of dead hosts '
259
            + 'significantly.'
260
        ),
261
    },
262
    'source_iface': {
263
        'type': 'string',
264
        'name': 'source_iface',
265
        'default': '',
266
        'mandatory': 0,
267
        'visible_for_client': False,
268
        'description': (
269
            'Name of the network interface that will be used as the source '
270
            + 'of connections established by openvas. The scan won\'t be '
271
            + 'launched if the value isn\'t authorized according to '
272
            + '(sys_)ifaces_allow / (sys_)ifaces_deny if present.'
273
        ),
274
    },
275
    'ifaces_allow': {
276
        'type': 'string',
277
        'name': 'ifaces_allow',
278
        'default': '',
279
        'mandatory': 0,
280
        'visible_for_client': False,
281
        'description': (
282
            'Comma-separated list of interfaces names that are authorized '
283
            + 'as source_iface values.'
284
        ),
285
    },
286
    'ifaces_deny': {
287
        'type': 'string',
288
        'name': 'ifaces_deny',
289
        'default': '',
290
        'mandatory': 0,
291
        'visible_for_client': False,
292
        'description': (
293
            'Comma-separated list of interfaces names that are not '
294
            + 'authorized as source_iface values.'
295
        ),
296
    },
297
    'hosts_allow': {
298
        'type': 'string',
299
        'name': 'hosts_allow',
300
        'default': '',
301
        'mandatory': 0,
302
        'visible_for_client': False,
303
        'description': (
304
            'Comma-separated list of the only targets that are authorized '
305
            + 'to be scanned. Supports the same syntax as the list targets. '
306
            + 'Both target hostnames and the address to which they resolve '
307
            + 'are checked. Hostnames in hosts_allow list are not resolved '
308
            + 'however.'
309
        ),
310
    },
311
    'hosts_deny': {
312
        'type': 'string',
313
        'name': 'hosts_deny',
314
        'default': '',
315
        'mandatory': 0,
316
        'visible_for_client': False,
317
        'description': (
318
            'Comma-separated list of targets that are not authorized to '
319
            + 'be scanned. Supports the same syntax as the list targets. '
320
            + 'Both target hostnames and the address to which they resolve '
321
            + 'are checked. Hostnames in hosts_deny list are not '
322
            + 'resolved however.'
323
        ),
324
    },
325
}
326
327
328
class TestOspdOpenvas(TestCase):
329
    @patch('ospd_openvas.daemon.Openvas')
330
    def test_set_params_from_openvas_settings(self, mock_openvas: Openvas):
331
        mock_openvas.get_settings.return_value = {
332
            'non_simult_ports': '139, 445, 3389, Services/irc',
333
            'plugins_folder': '/foo/bar',
334
        }
335
        w = DummyDaemon()
336
        w.set_params_from_openvas_settings()
337
338
        self.assertEqual(mock_openvas.get_settings.call_count, 1)
339
        self.assertEqual(OSPD_PARAMS, OSPD_PARAMS_OUT)
340
        self.assertEqual(w.scan_only_params.get('plugins_folder'), '/foo/bar')
341
342
    @patch('ospd_openvas.daemon.Openvas')
343
    def test_sudo_available(self, mock_openvas):
344
        mock_openvas.check_sudo.return_value = True
345
346
        w = DummyDaemon()
347
        w._sudo_available = None  # pylint: disable=protected-access
348
        w.sudo_available  # pylint: disable=pointless-statement
349
350
        self.assertTrue(w.sudo_available)
351
352
    def test_get_custom_xml(self):
353
        out = (
354
            '<custom>'
355
            '<required_ports>Services/www, 80</required_ports>'
356
            '<category>3</category>'
357
            '<excluded_keys>Settings/disable_cgi_scanning</excluded_keys>'
358
            '<family>Product detection</family>'
359
            '<filename>mantis_detect.nasl</filename>'
360
            '<timeout>0</timeout>'
361
            '</custom>'
362
        )
363
        w = DummyDaemon()
364
365
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
366
        res = w.get_custom_vt_as_xml_str(
367
            '1.3.6.1.4.1.25623.1.0.100061', vt.get('custom')
368
        )
369
        self.assertEqual(len(res), len(out))
370
371
    def test_get_custom_xml_failed(self):
372
        w = DummyDaemon()
373
        logging.Logger.warning = Mock()
374
375
        custom = {'a': u"\u0006"}
376
        w.get_custom_vt_as_xml_str(
377
            '1.3.6.1.4.1.25623.1.0.100061', custom=custom
378
        )
379
380
        assert_called_once(logging.Logger.warning)
381
382
    def test_get_severities_xml(self):
383
        w = DummyDaemon()
384
385
        out = (
386
            '<severities>'
387
            '<severity type="cvss_base_v2">'
388
            'AV:N/AC:L/Au:N/C:N/I:N/A:N'
389
            '</severity>'
390
            '</severities>'
391
        )
392
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
393
        severities = vt.get('severities')
394
        res = w.get_severities_vt_as_xml_str(
395
            '1.3.6.1.4.1.25623.1.0.100061', severities
396
        )
397
398
        self.assertEqual(res, out)
399
400
    def test_get_severities_xml_failed(self):
401
        w = DummyDaemon()
402
        logging.Logger.warning = Mock()
403
404
        sever = {'severity_base_vector': u"\u0006"}
405
        w.get_severities_vt_as_xml_str(
406
            '1.3.6.1.4.1.25623.1.0.100061', severities=sever
407
        )
408
409
        assert_called_once(logging.Logger.warning)
410
411
    def test_get_params_xml(self):
412
        w = DummyDaemon()
413
        out = (
414
            '<params>'
415
            '<param type="checkbox" id="2">'
416
            '<name>Do not randomize the  order  in  which ports are '
417
            'scanned</name>'
418
            '<default>no</default>'
419
            '</param>'
420
            '<param type="entry" id="1">'
421
            '<name>Data length :</name>'
422
            '</param>'
423
            '</params>'
424
        )
425
426
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
427
        params = vt.get('vt_params')
428
        res = w.get_params_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', params)
429
430
        self.assertEqual(len(res), len(out))
431
432
    def test_get_params_xml_failed(self):
433
        w = DummyDaemon()
434
        logging.Logger.warning = Mock()
435
436
        params = {
437
            '1': {
438
                'id': '1',
439
                'type': 'entry',
440
                'default': u'\u0006',
441
                'name': 'dns-fuzz.timelimit',
442
                'description': 'Description',
443
            }
444
        }
445
        w.get_params_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', params)
446
447
        assert_called_once(logging.Logger.warning)
448
449
    def test_get_refs_xml(self):
450
        w = DummyDaemon()
451
452
        out = '<refs><ref type="url" id="http://www.mantisbt.org/"/></refs>'
453
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
454
        refs = vt.get('vt_refs')
455
        res = w.get_refs_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', refs)
456
457
        self.assertEqual(res, out)
458
459
    def test_get_dependencies_xml(self):
460
        w = DummyDaemon()
461
462
        out = (
463
            '<dependencies>'
464
            '<dependency vt_id="1.2.3.4"/><dependency vt_id="4.3.2.1"/>'
465
            '</dependencies>'
466
        )
467
        dep = ['1.2.3.4', '4.3.2.1']
468
        res = w.get_dependencies_vt_as_xml_str(
469
            '1.3.6.1.4.1.25623.1.0.100061', dep
470
        )
471
472
        self.assertEqual(res, out)
473
474
    def test_get_dependencies_xml_failed(self):
475
        w = DummyDaemon()
476
        logging.Logger.error = Mock()
477
478
        dep = [u"\u0006"]
479
        w.get_dependencies_vt_as_xml_str(
480
            '1.3.6.1.4.1.25623.1.0.100061', vt_dependencies=dep
481
        )
482
483
        assert_called_once(logging.Logger.error)
484
485
    def test_get_ctime_xml(self):
486
        w = DummyDaemon()
487
488
        out = '<creation_time>1237458156</creation_time>'
489
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
490
        ctime = vt.get('creation_time')
491
        res = w.get_creation_time_vt_as_xml_str(
492
            '1.3.6.1.4.1.25623.1.0.100061', ctime
493
        )
494
495
        self.assertEqual(res, out)
496
497
    def test_get_ctime_xml_failed(self):
498
        w = DummyDaemon()
499
        logging.Logger.warning = Mock()
500
501
        ctime = u'\u0006'
502
        w.get_creation_time_vt_as_xml_str(
503
            '1.3.6.1.4.1.25623.1.0.100061', vt_creation_time=ctime
504
        )
505
506
        assert_called_once(logging.Logger.warning)
507
508
    def test_get_mtime_xml(self):
509
        w = DummyDaemon()
510
511
        out = '<modification_time>1533906565</modification_time>'
512
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
513
        mtime = vt.get('modification_time')
514
        res = w.get_modification_time_vt_as_xml_str(
515
            '1.3.6.1.4.1.25623.1.0.100061', mtime
516
        )
517
518
        self.assertEqual(res, out)
519
520
    def test_get_mtime_xml_failed(self):
521
        w = DummyDaemon()
522
        logging.Logger.warning = Mock()
523
524
        mtime = u'\u0006'
525
        w.get_modification_time_vt_as_xml_str(
526
            '1.3.6.1.4.1.25623.1.0.100061', mtime
527
        )
528
529
        assert_called_once(logging.Logger.warning)
530
531
    def test_get_summary_xml(self):
532
        w = DummyDaemon()
533
534
        out = '<summary>some summary</summary>'
535
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
536
        summary = vt.get('summary')
537
        res = w.get_summary_vt_as_xml_str(
538
            '1.3.6.1.4.1.25623.1.0.100061', summary
539
        )
540
541
        self.assertEqual(res, out)
542
543
    def test_get_summary_xml_failed(self):
544
        w = DummyDaemon()
545
546
        summary = u'\u0006'
547
        logging.Logger.warning = Mock()
548
        w.get_summary_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', summary)
549
550
        assert_called_once(logging.Logger.warning)
551
552
    def test_get_impact_xml(self):
553
        w = DummyDaemon()
554
555
        out = '<impact>some impact</impact>'
556
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
557
        impact = vt.get('impact')
558
        res = w.get_impact_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', impact)
559
560
        self.assertEqual(res, out)
561
562
    def test_get_impact_xml_failed(self):
563
        w = DummyDaemon()
564
        logging.Logger.warning = Mock()
565
566
        impact = u'\u0006'
567
        w.get_impact_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', impact)
568
569
        assert_called_once(logging.Logger.warning)
570
571
    def test_get_insight_xml(self):
572
        w = DummyDaemon()
573
574
        out = '<insight>some insight</insight>'
575
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
576
        insight = vt.get('insight')
577
        res = w.get_insight_vt_as_xml_str(
578
            '1.3.6.1.4.1.25623.1.0.100061', insight
579
        )
580
581
        self.assertEqual(res, out)
582
583
    def test_get_insight_xml_failed(self):
584
        w = DummyDaemon()
585
        logging.Logger.warning = Mock()
586
587
        insight = u'\u0006'
588
        w.get_insight_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', insight)
589
590
        assert_called_once(logging.Logger.warning)
591
592
    def test_get_solution_xml(self):
593
        w = DummyDaemon()
594
595
        out = (
596
            '<solution type="WillNotFix" method="DebianAPTUpgrade">'
597
            'some solution'
598
            '</solution>'
599
        )
600
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
601
        solution = vt.get('solution')
602
        solution_type = vt.get('solution_type')
603
        solution_method = vt.get('solution_method')
604
605
        res = w.get_solution_vt_as_xml_str(
606
            '1.3.6.1.4.1.25623.1.0.100061',
607
            solution,
608
            solution_type,
609
            solution_method,
610
        )
611
612
        self.assertEqual(res, out)
613
614
    def test_get_solution_xml_failed(self):
615
        w = DummyDaemon()
616
        logging.Logger.warning = Mock()
617
618
        solution = u'\u0006'
619
        w.get_solution_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', solution)
620
621
        assert_called_once(logging.Logger.warning)
622
623
    def test_get_detection_xml(self):
624
        w = DummyDaemon()
625
626
        out = '<detection qod_type="remote_banner"/>'
627
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
628
        detection_type = vt.get('qod_type')
629
630
        res = w.get_detection_vt_as_xml_str(
631
            '1.3.6.1.4.1.25623.1.0.100061', qod_type=detection_type
632
        )
633
634
        self.assertEqual(res, out)
635
636
    def test_get_detection_xml_failed(self):
637
        w = DummyDaemon()
638
        logging.Logger.warning = Mock()
639
640
        detection = u'\u0006'
641
        w.get_detection_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', detection)
642
643
        assert_called_once(logging.Logger.warning)
644
645
    def test_get_affected_xml(self):
646
        w = DummyDaemon()
647
        out = '<affected>some affection</affected>'
648
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
649
        affected = vt.get('affected')
650
651
        res = w.get_affected_vt_as_xml_str(
652
            '1.3.6.1.4.1.25623.1.0.100061', affected=affected
653
        )
654
655
        self.assertEqual(res, out)
656
657
    def test_get_affected_xml_failed(self):
658
        w = DummyDaemon()
659
        logging.Logger.warning = Mock()
660
661
        affected = u"\u0006" + "affected"
662
        w.get_affected_vt_as_xml_str(
663
            '1.3.6.1.4.1.25623.1.0.100061', affected=affected
664
        )
665
666
        assert_called_once(logging.Logger.warning)
667
668
    @patch('ospd_openvas.daemon.Path.exists')
669
    @patch('ospd_openvas.daemon.OSPDopenvas.set_params_from_openvas_settings')
670
    def test_feed_is_outdated_none(
671
        self, mock_set_params: MagicMock, mock_path_exists: MagicMock
672
    ):
673
        w = DummyDaemon()
674
675
        w.scan_only_params['plugins_folder'] = '/foo/bar'
676
677
        # Return None
678
        mock_path_exists.return_value = False
679
680
        ret = w.feed_is_outdated('1234')
681
        self.assertIsNone(ret)
682
683
        self.assertEqual(mock_set_params.call_count, 1)
684
        self.assertEqual(mock_path_exists.call_count, 1)
685
686
    @patch('ospd_openvas.daemon.Path.exists')
687
    @patch('ospd_openvas.daemon.Path.open')
688
    def test_feed_is_outdated_true(
689
        self, mock_path_open: MagicMock, mock_path_exists: MagicMock,
690
    ):
691
        read_data = 'PLUGIN_SET = "1235";'
692
693
        mock_path_exists.return_value = True
694
        mock_read = MagicMock(name='Path open context manager')
695
        mock_read.__enter__ = MagicMock(return_value=io.StringIO(read_data))
696
        mock_path_open.return_value = mock_read
697
698
        w = DummyDaemon()
699
700
        # Return True
701
        w.scan_only_params['plugins_folder'] = '/foo/bar'
702
703
        ret = w.feed_is_outdated('1234')
704
        self.assertTrue(ret)
705
706
        self.assertEqual(mock_path_exists.call_count, 1)
707
        self.assertEqual(mock_path_open.call_count, 1)
708
709
    @patch('ospd_openvas.daemon.Path.exists')
710
    @patch('ospd_openvas.daemon.Path.open')
711
    def test_feed_is_outdated_false(
712
        self, mock_path_open: MagicMock, mock_path_exists: MagicMock,
713
    ):
714
        mock_path_exists.return_value = True
715
716
        read_data = 'PLUGIN_SET = "1234"'
717
        mock_path_exists.return_value = True
718
        mock_read = MagicMock(name='Path open context manager')
719
        mock_read.__enter__ = MagicMock(return_value=io.StringIO(read_data))
720
        mock_path_open.return_value = mock_read
721
722
        w = DummyDaemon()
723
        w.scan_only_params['plugins_folder'] = '/foo/bar'
724
725
        ret = w.feed_is_outdated('1234')
726
        self.assertFalse(ret)
727
728
        self.assertEqual(mock_path_exists.call_count, 1)
729
        self.assertEqual(mock_path_open.call_count, 1)
730
731
    def test_check_feed_cache_unavailable(self):
732
        w = DummyDaemon()
733
        w.vts.is_cache_available = False
734
        w.feed_is_outdated = Mock()
735
        res = w.check_feed()
736
737
        self.assertFalse(res)
738
        w.feed_is_outdated.assert_not_called()
739
740 View Code Duplication
    @patch('ospd_openvas.daemon.BaseDB')
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
741
    @patch('ospd_openvas.daemon.ResultList.add_scan_log_to_list')
742
    def test_get_openvas_result(self, mock_add_scan_log_to_list, MockDBClass):
743
        w = DummyDaemon()
744
745
        target_element = w.create_xml_target()
746
        targets = OspRequest.process_target_element(target_element)
747
        w.create_scan('123-456', targets, None, [])
748
749
        results = [
750
            "LOG|||192.168.0.1|||localhost|||general/Host_Details||||||Host dead",
751
        ]
752
        MockDBClass.get_result.return_value = results
753
        mock_add_scan_log_to_list.return_value = None
754
755
        w.report_openvas_results(MockDBClass, '123-456')
756
        mock_add_scan_log_to_list.assert_called_with(
757
            host='192.168.0.1',
758
            hostname='localhost',
759
            name='',
760
            port='general/Host_Details',
761
            qod='',
762
            test_id='',
763
            uri='',
764
            value='Host dead',
765
        )
766
767 View Code Duplication
    @patch('ospd_openvas.daemon.BaseDB')
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
768
    @patch('ospd_openvas.daemon.ResultList.add_scan_error_to_list')
769
    def test_get_openvas_result_host_deny(
770
        self, mock_add_scan_error_to_list, MockDBClass
771
    ):
772
        w = DummyDaemon()
773
774
        target_element = w.create_xml_target()
775
        targets = OspRequest.process_target_element(target_element)
776
        w.create_scan('123-456', targets, None, [])
777
778
        results = [
779
            "ERRMSG|||127.0.0.1|||localhost|||||||||Host access denied.",
780
        ]
781
        MockDBClass.get_result.return_value = results
782
        mock_add_scan_error_to_list.return_value = None
783
784
        w.report_openvas_results(MockDBClass, '123-456')
785
        mock_add_scan_error_to_list.assert_called_with(
786
            host='127.0.0.1',
787
            hostname='localhost',
788
            name='',
789
            port='',
790
            test_id='',
791
            uri='',
792
            value='Host access denied.',
793
        )
794
795
    @patch('ospd_openvas.daemon.BaseDB')
796
    def test_get_openvas_result_dead_hosts(self, MockDBClass):
797
        w = DummyDaemon()
798
        target_element = w.create_xml_target()
799
        targets = OspRequest.process_target_element(target_element)
800
        w.create_scan('123-456', targets, None, [])
801
802
        results = [
803
            "DEADHOST||| ||| ||| ||| |||4",
804
        ]
805
        MockDBClass.get_result.return_value = results
806
        w.scan_collection.set_amount_dead_hosts = MagicMock()
807
808
        w.report_openvas_results(MockDBClass, '123-456')
809
        w.scan_collection.set_amount_dead_hosts.assert_called_with(
810
            '123-456', total_dead=4,
811
        )
812
813
    @patch('ospd_openvas.daemon.BaseDB')
814
    @patch('ospd_openvas.daemon.ResultList.add_scan_log_to_list')
815
    def test_get_openvas_result_host_start(
816
        self, mock_add_scan_log_to_list, MockDBClass
817
    ):
818
        w = DummyDaemon()
819
        target_element = w.create_xml_target()
820
        targets = OspRequest.process_target_element(target_element)
821
        w.create_scan('123-456', targets, None, [])
822
823
        results = [
824
            "HOST_START|||192.168.10.124||| ||| ||||||today 1",
825
        ]
826
827
        MockDBClass.get_result.return_value = results
828
        mock_add_scan_log_to_list.return_value = None
829
830
        w.report_openvas_results(MockDBClass, '123-456')
831
832
        mock_add_scan_log_to_list.assert_called_with(
833
            host='192.168.10.124', name='HOST_START', value='today 1',
834
        )
835
836
    @patch('ospd_openvas.daemon.BaseDB')
837
    @patch('ospd_openvas.daemon.ResultList.add_scan_alarm_to_list')
838
    def test_result_without_vt_oid(
839
        self, mock_add_scan_alarm_to_list, MockDBClass
840
    ):
841
        w = DummyDaemon()
842
        logging.Logger.warning = Mock()
843
844
        target_element = w.create_xml_target()
845
        targets = OspRequest.process_target_element(target_element)
846
        w.create_scan('123-456', targets, None, [])
847
        w.scan_collection.scans_table['123-456']['results'] = list()
848
        results = ["ALARM||| ||| ||| ||| |||some alarm|||path", None]
849
        MockDBClass.get_result.return_value = results
850
        mock_add_scan_alarm_to_list.return_value = None
851
852
        w.report_openvas_results(MockDBClass, '123-456')
853
854
        assert_called_once(logging.Logger.warning)
855
856
    @patch('ospd_openvas.db.KbDB')
857
    def test_openvas_is_alive_already_stopped(self, mock_db):
858
        w = DummyDaemon()
859
        # mock_psutil = MockPsutil.return_value
860
        mock_db.scan_is_stopped.return_value = True
861
        ret = w.is_openvas_process_alive(mock_db, '1234', 'a1-b2-c3-d4')
862
863
        self.assertTrue(ret)
864
865
    @patch('ospd_openvas.db.KbDB')
866
    def test_openvas_is_alive_still(self, mock_db):
867
        w = DummyDaemon()
868
        # mock_psutil = MockPsutil.return_value
869
        mock_db.scan_is_stopped.return_value = False
870
        ret = w.is_openvas_process_alive(mock_db, '1234', 'a1-b2-c3-d4')
871
872
        self.assertFalse(ret)
873
874
    @patch('ospd_openvas.daemon.OSPDaemon.set_scan_progress_batch')
875
    @patch('ospd_openvas.daemon.OSPDaemon.sort_host_finished')
876
    @patch('ospd_openvas.db.KbDB')
877
    def test_report_openvas_scan_status(
878
        self, mock_db, mock_sort_host_finished, mock_set_scan_progress_batch
879
    ):
880
        w = DummyDaemon()
881
882
        mock_set_scan_progress_batch.return_value = None
883
        mock_sort_host_finished.return_value = None
884
        mock_db.get_scan_status.return_value = [
885
            '192.168.0.1/15/1000',
886
            '192.168.0.2/15/0',
887
            '192.168.0.3/15/-1',
888
            '192.168.0.4/1500/1500',
889
        ]
890
891
        target_element = w.create_xml_target()
892
        targets = OspRequest.process_target_element(target_element)
893
894
        w.create_scan('123-456', targets, None, [])
895
        w.report_openvas_scan_status(mock_db, '123-456')
896
897
        mock_set_scan_progress_batch.assert_called_with(
898
            '123-456',
899
            host_progress={
900
                '192.168.0.1': 1,
901
                '192.168.0.3': -1,
902
                '192.168.0.4': 100,
903
            },
904
        )
905
906
        mock_sort_host_finished.assert_called_with(
907
            '123-456', ['192.168.0.3', '192.168.0.4']
908
        )
909
910
911
class TestFilters(TestCase):
912
    def test_format_vt_modification_time(self):
913
        ovformat = OpenVasVtsFilter(None)
914
        td = '1517443741'
915
        formatted = ovformat.format_vt_modification_time(td)
916
        self.assertEqual(formatted, "20180201000901")
917
918
    def test_get_filtered_vts_false(self):
919
        w = DummyDaemon()
920
        vts_collection = ['1234', '1.3.6.1.4.1.25623.1.0.100061']
921
922
        ovfilter = OpenVasVtsFilter(w.nvti)
923
        res = ovfilter.get_filtered_vts_list(
924
            vts_collection, "modification_time<10"
925
        )
926
        self.assertNotIn('1.3.6.1.4.1.25623.1.0.100061', res)
927
928
    def test_get_filtered_vts_true(self):
929
        w = DummyDaemon()
930
        vts_collection = ['1234', '1.3.6.1.4.1.25623.1.0.100061']
931
932
        ovfilter = OpenVasVtsFilter(w.nvti)
933
        res = ovfilter.get_filtered_vts_list(
934
            vts_collection, "modification_time>10"
935
        )
936
        self.assertIn('1.3.6.1.4.1.25623.1.0.100061', res)
937