Passed
Pull Request — master (#138)
by Juan José
01:26
created

tests.test_daemon   F

Complexity

Total Complexity 66

Size/Duplication

Total Lines 721
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 543
dl 0
loc 721
rs 3.12
c 0
b 0
f 0
wmc 66

44 Methods

Rating   Name   Duplication   Size   Complexity  
A TestOspdOpenvas.test_get_mtime_xml_failed() 0 10 2
A TestOspdOpenvas.test_get_severities_xml_failed() 0 10 2
A TestOspdOpenvas.test_get_dependencies_xml() 0 12 1
A TestOspdOpenvas.test_get_summary_xml() 0 10 1
A TestOspdOpenvas.test_get_ctime_xml() 0 10 1
A TestOspdOpenvas.test_get_params_xml_failed() 0 16 2
A TestOspdOpenvas.test_get_params_xml() 0 14 1
A TestOspdOpenvas.test_get_refs_xml() 0 8 1
A TestOspdOpenvas.test_get_severities_xml() 0 13 1
A TestOspdOpenvas.test_get_dependencies_xml_failed() 0 10 2
A TestOspdOpenvas.test_get_ctime_xml_failed() 0 10 2
A TestOspdOpenvas.test_get_mtime_xml() 0 10 1
A TestOspdOpenvas.test_get_insight_xml() 0 10 1
A TestOspdOpenvas.test_get_summary_xml_failed() 0 8 2
A TestOspdOpenvas.test_get_solution_xml() 0 12 1
A TestOspdOpenvas.test_get_impact_xml_failed() 0 8 2
A TestOspdOpenvas.test_get_impact_xml() 0 8 1
A TestOspdOpenvas.test_get_insight_xml_failed() 0 8 2
A TestOspdOpenvas.test_get_detection_xml() 0 11 1
A TestOspdOpenvas.test_get_solution_xml_failed() 0 8 2
A TestOspdOpenvas.test_redis_nvticache_init() 0 7 1
A TestOspdOpenvas.test_load_vts() 0 5 1
A TestOspdOpenvas.test_get_custom_xml() 0 16 1
A TestOspdOpenvas.test_sudo_available() 0 7 1
A TestOspdOpenvas.test_parse_param() 0 11 1
A TestOspdOpenvas.test_get_openvas_timestamp_scan_host_start() 0 9 2
A TestFilters.test_format_vt_modification_time() 0 5 1
A TestOspdOpenvas.test_feed_is_outdated_none() 0 9 2
A TestOspdOpenvas.test_feed_is_outdated_true() 0 13 4
A TestOspdOpenvas.test_process_vts_not_found() 0 11 2
A TestOspdOpenvas.test_process_vts_bad_param_id() 0 9 1
A TestOspdOpenvas.test_feed_is_outdated_false() 0 14 4
A TestOspdOpenvas.test_scan_is_stopped() 0 7 1
A TestOspdOpenvas.test_process_vts() 0 18 1
A TestOspdOpenvas.test_get_affected_xml_failed() 0 9 2
A TestOspdOpenvas.test_build_credentials_ssh_up() 0 18 1
A TestOspdOpenvas.test_get_affected_xml() 0 11 1
B TestOspdOpenvas.test_build_credentials() 0 49 1
A TestOspdOpenvas.test_host_is_finished() 0 5 1
A TestOspdOpenvas.test_get_detection_xml_failed() 0 7 2
A TestOspdOpenvas.test_get_openvas_result() 0 17 1
A TestOspdOpenvas.test_update_progress() 0 10 1
A TestOspdOpenvas.test_get_openvas_timestamp_scan_host_end() 0 8 2
A TestOspdOpenvas.test_get_custom_xml_failed() 0 9 2

How to fix   Complexity   

Complexity

Complex classes like tests.test_daemon often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018-2019 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-2.0-or-later
5
#
6
# This program is free software; you can redistribute it and/or
7
# modify it under the terms of the GNU General Public License
8
# as published by the Free Software Foundation; either version 2
9
# of the License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
20
# pylint: disable=invalid-name,line-too-long
21
22
""" Unit Test for ospd-openvas """
23
24
from unittest import TestCase
25
from unittest.mock import patch
26
from unittest.mock import Mock
27
28
import io
29
import logging
30
31
from tests.dummydaemon import DummyDaemon
32
33
from ospd_openvas.daemon import OSPD_PARAMS, OpenVasVtsFilter, Path
34
from ospd_openvas.errors import OspdOpenvasError
35
36
# Expected content of OSPD_PARAMS once test_parse_param() has run
# parse_param() against the mocked settings output.
# Row layout: (name, type, default, mandatory, description).
_EXPECTED_PARAM_ROWS = (
    (
        'auto_enable_dependencies', 'boolean', 1, 1,
        'Automatically enable the plugins that are depended on',
    ),
    (
        'cgi_path', 'string', '/cgi-bin:/scripts', 1,
        'Look for default CGIs in /cgi-bin and /scripts',
    ),
    (
        'checks_read_timeout', 'integer', 5, 1,
        'Number  of seconds that the security checks will '
        'wait for when doing a recv()',
    ),
    ('drop_privileges', 'boolean', 0, 1, ''),
    ('network_scan', 'boolean', 0, 1, ''),
    (
        'non_simult_ports', 'string', '22', 1,
        'Prevent to make two connections on the same given '
        'ports at the same time.',
    ),
    (
        'open_sock_max_attempts', 'integer', 5, 0,
        'Number of unsuccessful retries to open the socket '
        'before to set the port as closed.',
    ),
    (
        'timeout_retry', 'integer', 5, 0,
        'Number of retries when a socket connection attempt '
        'timesout.',
    ),
    (
        'optimize_test', 'integer', 5, 0,
        'By default, openvas does not trust the remote '
        'host banners.',
    ),
    (
        'plugins_timeout', 'integer', 5, 0,
        'This is the maximum lifetime, in seconds of a plugin.',
    ),
    ('report_host_details', 'boolean', 1, 1, ''),
    (
        'safe_checks', 'boolean', 1, 1,
        'Disable the plugins with potential to crash '
        'the remote services',
    ),
    (
        'scanner_plugins_timeout', 'integer', 36000, 1,
        'Like plugins_timeout, but for ACT_SCANNER plugins.',
    ),
    (
        'time_between_request', 'integer', 0, 0,
        'Allow to set a wait time between two actions '
        '(open, send, close).',
    ),
    ('unscanned_closed', 'boolean', 1, 1, ''),
    ('unscanned_closed_udp', 'boolean', 1, 1, ''),
    (
        'use_mac_addr', 'boolean', 0, 0,
        'To test the local network. '
        'Hosts will be referred to by their MAC address.',
    ),
    ('vhosts', 'string', '', 0, ''),
    ('vhosts_ip', 'string', '', 0, ''),
)

OSPD_PARAMS_OUT = {
    name: {
        'type': kind,
        'name': name,
        'default': default,
        'mandatory': mandatory,
        'description': description,
    }
    for name, kind, default, mandatory, description in _EXPECTED_PARAM_ROWS
}
179
180
181
@patch('ospd_openvas.db.OpenvasDB')
182
@patch('ospd_openvas.nvticache.NVTICache')
183
class TestOspdOpenvas(TestCase):
184
    @patch('ospd_openvas.daemon.subprocess')
    def test_redis_nvticache_init(self, mock_subproc, mock_nvti, mock_db):
        """redis_nvticache_init() results in exactly one check_call."""
        mock_subproc.check_call.return_value = True
        daemon = DummyDaemon(mock_nvti, mock_db)
        # Forget every call recorded so far so that only the call under
        # test is counted below.
        mock_subproc.reset_mock()

        daemon.redis_nvticache_init()

        recorded_calls = mock_subproc.check_call.call_count
        self.assertEqual(recorded_calls, 1)
191
192
    @patch('ospd_openvas.daemon.subprocess')
    def test_parse_param(self, mock_subproc, mock_nvti, mock_db):
        """parse_param() consumes the mocked settings output once.

        Afterwards OSPD_PARAMS must equal OSPD_PARAMS_OUT and the
        'plugins_folder' setting must be found in scan_only_params.
        """
        settings_dump = 'non_simult_ports = 22\nplugins_folder = /foo/bar'
        mock_subproc.check_output.return_value = settings_dump.encode()
        daemon = DummyDaemon(mock_nvti, mock_db)

        daemon.parse_param()

        self.assertEqual(mock_subproc.check_output.call_count, 1)
        self.assertEqual(OSPD_PARAMS, OSPD_PARAMS_OUT)
        plugins_folder = daemon.scan_only_params.get('plugins_folder')
        self.assertEqual(plugins_folder, '/foo/bar')
203
204
    @patch('ospd_openvas.daemon.subprocess')
    def test_sudo_available(self, mock_subproc, mock_nvti, mock_db):
        """sudo_available is truthy when the mocked check_call returns 0."""
        mock_subproc.check_call.return_value = 0
        daemon = DummyDaemon(mock_nvti, mock_db)
        # Clear the stored value before the attribute is read.
        daemon._sudo_available = None  # pylint: disable=protected-access
        # First read; the second read below is the one being asserted.
        daemon.sudo_available  # pylint: disable=pointless-statement
        self.assertTrue(daemon.sudo_available)
211
212
    def test_load_vts(self, mock_nvti, mock_db):
        """After load_vts() the daemon's vts equal the fixture's VT data."""
        self.maxDiff = None  # report the complete diff on mismatch
        daemon = DummyDaemon(mock_nvti, mock_db)

        daemon.load_vts()

        self.assertEqual(daemon.vts, daemon.VT)
217
218
    def test_get_custom_xml(self, mock_nvti, mock_db):
        """The custom XML of the sample VT has the expected length.

        Only the lengths are compared, so the order in which the child
        elements appear in the generated string is not checked.
        """
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = (
            '<custom>'
            '<required_ports>Services/www, 80</required_ports>'
            '<category>3</category>'
            '<excluded_keys>Settings/disable_cgi_scanning</excluded_keys>'
            '<family>Product detection</family>'
            '<filename>mantis_detect.nasl</filename>'
            '<timeout>0</timeout>'
            '</custom>'
        )
        daemon = DummyDaemon(mock_nvti, mock_db)
        custom = daemon.VT[oid].get('custom')

        actual = daemon.get_custom_vt_as_xml_str(oid, custom)

        self.assertEqual(len(actual), len(expected))
234
235
    def test_get_custom_xml_failed(self, mock_nvti, mock_db):
        """A custom value holding U+0006 produces exactly one warning.

        logging.Logger.warning is replaced only while the call under test
        runs, so the mock cannot leak into other tests.
        """
        w = DummyDaemon(mock_nvti, mock_db)
        custom = {'a': u"\u0006"}
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_custom_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', custom=custom
            )
        # call_count exists on every supported Mock version, so no
        # hasattr() guard is needed.
        self.assertEqual(mock_warning.call_count, 1)
244
245
    def test_get_severities_xml(self, mock_nvti, mock_db):
        """The severities of the sample VT are rendered as expected XML."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = (
            '<severities>'
            '<severity type="cvss_base_v2">AV:N/AC:L/Au:N/C:N/I:N/A:N</severity>'
            '</severities>'
        )
        daemon = DummyDaemon(mock_nvti, mock_db)
        severities = daemon.VT[oid].get('severities')

        actual = daemon.get_severities_vt_as_xml_str(oid, severities)

        self.assertEqual(actual, expected)
258
259
    def test_get_severities_xml_failed(self, mock_nvti, mock_db):
        """A severity vector holding U+0006 produces exactly one warning.

        logging.Logger.warning is replaced only while the call under test
        runs, so the mock cannot leak into other tests.
        """
        w = DummyDaemon(mock_nvti, mock_db)
        sever = {'severity_base_vector': u"\u0006"}
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_severities_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', severities=sever
            )
        self.assertEqual(mock_warning.call_count, 1)
269
270
    def test_get_params_xml(self, mock_nvti, mock_db):
        """The params XML of the sample VT has the expected length.

        Only the lengths are compared, so the order of the <param>
        elements in the generated string is not checked.
        """
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = (
            '<params>'
            '<param type="checkbox" id="2">'
            '<name>Do not randomize the  order  in  which ports are scanned</name>'
            '<default>no</default>'
            '</param>'
            '<param type="entry" id="1">'
            '<name>Data length :</name>'
            '</param>'
            '</params>'
        )
        daemon = DummyDaemon(mock_nvti, mock_db)
        vt_params = daemon.VT[oid].get('vt_params')

        actual = daemon.get_params_vt_as_xml_str(oid, vt_params)

        self.assertEqual(len(actual), len(expected))
284
285
    def test_get_params_xml_failed(self, mock_nvti, mock_db):
        """A param default holding U+0006 produces exactly one warning.

        logging.Logger.warning is replaced only while the call under test
        runs, so the mock cannot leak into other tests.
        """
        w = DummyDaemon(mock_nvti, mock_db)
        params = {
            '1': {
                'id': '1',
                'type': 'entry',
                'default': u'\u0006',
                'name': 'dns-fuzz.timelimit',
                'description': 'Description',
            }
        }
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_params_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', params)
        self.assertEqual(mock_warning.call_count, 1)
301
302
    def test_get_refs_xml(self, mock_nvti, mock_db):
        """The references of the sample VT are rendered as expected XML."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = '<refs><ref type="url" id="http://www.mantisbt.org/"/></refs>'
        daemon = DummyDaemon(mock_nvti, mock_db)
        vt_refs = daemon.VT[oid].get('vt_refs')

        actual = daemon.get_refs_vt_as_xml_str(oid, vt_refs)

        self.assertEqual(actual, expected)
310
311
    def test_get_dependencies_xml(self, mock_nvti, mock_db):
        """A list of two dependency ids is rendered as expected XML."""
        expected = (
            '<dependencies>'
            '<dependency vt_id="1.2.3.4"/>'
            '<dependency vt_id="4.3.2.1"/>'
            '</dependencies>'
        )
        dependencies = ['1.2.3.4', '4.3.2.1']
        daemon = DummyDaemon(mock_nvti, mock_db)

        actual = daemon.get_dependencies_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', dependencies
        )

        self.assertEqual(actual, expected)
323
324
    def test_get_dependencies_xml_failed(self, mock_nvti, mock_db):
        """A dependency id holding U+0006 produces exactly one error log.

        logging.Logger.error is replaced only while the call under test
        runs, so the mock cannot leak into other tests.
        """
        w = DummyDaemon(mock_nvti, mock_db)
        dep = [u"\u0006"]
        with patch.object(logging.Logger, 'error') as mock_error:
            w.get_dependencies_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', dep_list=dep
            )
        self.assertEqual(mock_error.call_count, 1)
334
335
    def test_get_ctime_xml(self, mock_nvti, mock_db):
        """The creation time of the sample VT is rendered as expected XML."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = '<creation_time>1237458156</creation_time>'
        daemon = DummyDaemon(mock_nvti, mock_db)
        creation_time = daemon.VT[oid].get('creation_time')

        actual = daemon.get_creation_time_vt_as_xml_str(oid, creation_time)

        self.assertEqual(actual, expected)
345
346
    def test_get_ctime_xml_failed(self, mock_nvti, mock_db):
        """A creation time holding U+0006 produces exactly one warning.

        The previous version guarded its assertion with
        hasattr(Mock, 'assert_called_onc') (a typo), so the assertion was
        never executed. The check below always runs.
        logging.Logger.warning is replaced only while the call under test
        runs, so the mock cannot leak into other tests.
        """
        w = DummyDaemon(mock_nvti, mock_db)
        ctime = u'\u0006'
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_creation_time_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', creation_time=ctime
            )
        self.assertEqual(mock_warning.call_count, 1)
356
357
    def test_get_mtime_xml(self, mock_nvti, mock_db):
        """The modification time of the sample VT is rendered as XML."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = '<modification_time>1533906565</modification_time>'
        daemon = DummyDaemon(mock_nvti, mock_db)
        modification_time = daemon.VT[oid].get('modification_time')

        actual = daemon.get_modification_time_vt_as_xml_str(
            oid, modification_time
        )

        self.assertEqual(actual, expected)
367
368
    def test_get_mtime_xml_failed(self, mock_nvti, mock_db):
        """A modification time holding U+0006 produces exactly one warning.

        logging.Logger.warning is replaced only while the call under test
        runs, so the mock cannot leak into other tests.
        """
        w = DummyDaemon(mock_nvti, mock_db)
        mtime = u'\u0006'
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_modification_time_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', mtime
            )
        self.assertEqual(mock_warning.call_count, 1)
378
379
    def test_get_summary_xml(self, mock_nvti, mock_db):
        """The summary of the sample VT is rendered as expected XML."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = '<summary>some summary</summary>'
        daemon = DummyDaemon(mock_nvti, mock_db)
        summary = daemon.VT[oid].get('summary')

        actual = daemon.get_summary_vt_as_xml_str(oid, summary)

        self.assertEqual(actual, expected)
389
390
    def test_get_summary_xml_failed(self, mock_nvti, mock_db):
        """A summary holding U+0006 produces exactly one warning.

        The previous version guarded its assertion with
        hasattr(Mock, 'assert_calledonce') (a typo), so the assertion was
        never executed. The check below always runs.
        logging.Logger.warning is replaced only while the call under test
        runs, so the mock cannot leak into other tests.
        """
        w = DummyDaemon(mock_nvti, mock_db)
        summary = u'\u0006'
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_summary_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', summary)
        self.assertEqual(mock_warning.call_count, 1)
398
399
    def test_get_impact_xml(self, mock_nvti, mock_db):
        """The impact of the sample VT is rendered as expected XML."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = '<impact>some impact</impact>'
        daemon = DummyDaemon(mock_nvti, mock_db)
        impact = daemon.VT[oid].get('impact')

        actual = daemon.get_impact_vt_as_xml_str(oid, impact)

        self.assertEqual(actual, expected)
407
408
    def test_get_impact_xml_failed(self, mock_nvti, mock_db):
        """An impact text holding U+0006 produces exactly one warning.

        logging.Logger.warning is replaced only while the call under test
        runs, so the mock cannot leak into other tests.
        """
        w = DummyDaemon(mock_nvti, mock_db)
        impact = u'\u0006'
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_impact_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', impact)
        self.assertEqual(mock_warning.call_count, 1)
416
417
    def test_get_insight_xml(self, mock_nvti, mock_db):
        """The insight of the sample VT is rendered as expected XML."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = '<insight>some insight</insight>'
        daemon = DummyDaemon(mock_nvti, mock_db)
        insight = daemon.VT[oid].get('insight')

        actual = daemon.get_insight_vt_as_xml_str(oid, insight)

        self.assertEqual(actual, expected)
427
428
    def test_get_insight_xml_failed(self, mock_nvti, mock_db):
        """An insight text holding U+0006 produces exactly one warning.

        logging.Logger.warning is replaced only while the call under test
        runs, so the mock cannot leak into other tests.
        """
        w = DummyDaemon(mock_nvti, mock_db)
        insight = u'\u0006'
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_insight_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', insight)
        self.assertEqual(mock_warning.call_count, 1)
436
437
    def test_get_solution_xml(self, mock_nvti, mock_db):
        """Solution text and solution type are rendered as expected XML."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = '<solution type="WillNotFix">some solution</solution>'
        daemon = DummyDaemon(mock_nvti, mock_db)
        sample_vt = daemon.VT[oid]

        actual = daemon.get_solution_vt_as_xml_str(
            oid, sample_vt.get('solution'), sample_vt.get('solution_type')
        )

        self.assertEqual(actual, expected)
449
450
    def test_get_solution_xml_failed(self, mock_nvti, mock_db):
        """A solution text holding U+0006 produces exactly one warning.

        logging.Logger.warning is replaced only while the call under test
        runs, so the mock cannot leak into other tests.
        """
        w = DummyDaemon(mock_nvti, mock_db)
        solution = u'\u0006'
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_solution_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', solution
            )
        self.assertEqual(mock_warning.call_count, 1)
458
459
    def test_get_detection_xml(self, mock_nvti, mock_db):
        """The qod_type of the sample VT is rendered as a detection element."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = '<detection qod_type="remote_banner"/>'
        daemon = DummyDaemon(mock_nvti, mock_db)
        qod_type = daemon.VT[oid].get('qod_type')

        actual = daemon.get_detection_vt_as_xml_str(oid, qod_type=qod_type)

        self.assertEqual(actual, expected)
470
471
    def test_get_detection_xml_failed(self, mock_nvti, mock_db):
        """A detection text holding U+0006 produces exactly one warning.

        logging.Logger.warning is replaced only while the call under test
        runs, so the mock cannot leak into other tests.
        """
        w = DummyDaemon(mock_nvti, mock_db)
        detection = u'\u0006'
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_detection_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', detection
            )
        self.assertEqual(mock_warning.call_count, 1)
478
479
    def test_get_affected_xml(self, mock_nvti, mock_db):
        """The affected text of the sample VT is rendered as expected XML."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = '<affected>some affection</affected>'
        daemon = DummyDaemon(mock_nvti, mock_db)
        affected = daemon.VT[oid].get('affected')

        actual = daemon.get_affected_vt_as_xml_str(oid, affected=affected)

        self.assertEqual(actual, expected)
490
491
    def test_get_affected_xml_failed(self, mock_nvti, mock_db):
        """An affected text starting with U+0006 produces one warning.

        logging.Logger.warning is replaced only while the call under test
        runs, so the mock cannot leak into other tests.
        """
        w = DummyDaemon(mock_nvti, mock_db)
        affected = u"\u0006" + "affected"
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_affected_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', affected=affected
            )
        self.assertEqual(mock_warning.call_count, 1)
500
501
    def test_build_credentials(self, mock_nvti, mock_db):
        """build_credentials_as_prefs() handles ssh, smb, esxi and snmp.

        The returned list must have as many entries as ``cred_out`` and must
        contain the ssh port preference and the SMB login preference.

        The membership checks previously inspected ``cred_out`` (the
        expected list itself) and therefore could never fail; they now
        inspect the value returned by the daemon.
        """
        w = DummyDaemon(mock_nvti, mock_db)

        # Only the length of this list is compared with the result.
        cred_out = [
            '1.3.6.1.4.1.25623.1.0.105058:1:entry:ESXi login name:|||username',
            '1.3.6.1.4.1.25623.1.0.105058:2:password:ESXi login password:|||pass',
            'auth_port_ssh|||22',
            '1.3.6.1.4.1.25623.1.0.103591:1:entry:SSH login name:|||username',
            '1.3.6.1.4.1.25623.1.0.103591:2:password:SSH key passphrase:|||pass',
            '1.3.6.1.4.1.25623.1.0.103591:4:file:SSH private key:|||',
            '1.3.6.1.4.1.25623.1.0.90023:1:entry:SMB login:|||username',
            '1.3.6.1.4.1.25623.1.0.90023:2:password]:SMB password :|||pass',
            '1.3.6.1.4.1.25623.1.0.105076:1:password:SNMP Community:some comunity',
            '1.3.6.1.4.1.25623.1.0.105076:2:entry:SNMPv3 Username:username',
            '1.3.6.1.4.1.25623.1.0.105076:3:password:SNMPv3 Password:pass',
            '1.3.6.1.4.1.25623.1.0.105076:4:radio:SNMPv3 Authentication Algorithm:some auth algo',
            '1.3.6.1.4.1.25623.1.0.105076:5:password:SNMPv3 Privacy Password:privacy pass',
            '1.3.6.1.4.1.25623.1.0.105076:6:radio:SNMPv3 Privacy Algorithm:privacy algo',
        ]
        cred_dict = {
            'ssh': {
                'type': 'ssh',
                'port': '22',
                'username': 'username',
                'password': 'pass',
            },
            'smb': {'type': 'smb', 'username': 'username', 'password': 'pass'},
            'esxi': {
                'type': 'esxi',
                'username': 'username',
                'password': 'pass',
            },
            'snmp': {
                'type': 'snmp',
                'username': 'username',
                'password': 'pass',
                'community': 'some comunity',
                'auth_algorithm': 'some auth algo',
                'privacy_password': 'privacy pass',
                'privacy_algorithm': 'privacy algo',
            },
        }
        self.maxDiff = None
        ret = w.build_credentials_as_prefs(cred_dict)
        self.assertEqual(len(ret), len(cred_out))
        # Check the actual result, not the expectation list.
        self.assertIn('auth_port_ssh|||22', ret)
        self.assertIn(
            '1.3.6.1.4.1.25623.1.0.90023:1:entry:SMB login:|||username',
            ret,
        )
551
552
    def test_build_credentials_ssh_up(self, mock_nvti, mock_db):
        """An ssh credential of type 'up' yields port, login and password."""
        self.maxDiff = None
        ssh_credential = {
            'type': 'up',
            'port': '22',
            'username': 'username',
            'password': 'pass',
        }
        expected = [
            'auth_port_ssh|||22',
            '1.3.6.1.4.1.25623.1.0.103591:1:entry:SSH login name:|||username',
            '1.3.6.1.4.1.25623.1.0.103591:3:password:SSH password (unsafe!):|||pass',
        ]
        daemon = DummyDaemon(mock_nvti, mock_db)

        actual = daemon.build_credentials_as_prefs({'ssh': ssh_credential})

        self.assertEqual(actual, expected)
570
571
    def test_process_vts(self, mock_nvti, mock_db):
        """process_vts() returns the VT id list and its preference list."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        requested = {
            oid: {'1': 'new value'},
            'vt_groups': ['family=debian', 'family=general'],
        }
        expected_ids = [oid]
        expected_prefs = [
            ['1.3.6.1.4.1.25623.1.0.100061:1:entry:Data length :', 'new value']
        ]
        daemon = DummyDaemon(mock_nvti, mock_db)
        daemon.load_vts()

        actual = daemon.process_vts(requested)

        self.assertEqual(actual, (expected_ids, expected_prefs))
589
590
    def test_process_vts_bad_param_id(self, mock_nvti, mock_db):
        """With param id '3' the second element of the result is falsy."""
        requested = {
            '1.3.6.1.4.1.25623.1.0.100061': {'3': 'new value'},
            'vt_groups': ['family=debian', 'family=general'],
        }
        daemon = DummyDaemon(mock_nvti, mock_db)
        daemon.load_vts()

        result = daemon.process_vts(requested)

        preferences = result[1]
        self.assertFalse(preferences)
599
600
    def test_process_vts_not_found(self, mock_nvti, mock_db):
        """Requesting VT id ...100065 produces exactly one warning.

        logging.Logger.warning is replaced only while process_vts() runs,
        so the mock cannot leak into other tests.
        """
        vts = {
            '1.3.6.1.4.1.25623.1.0.100065': {'3': 'new value'},
            'vt_groups': ['family=debian', 'family=general'],
        }
        w = DummyDaemon(mock_nvti, mock_db)
        w.load_vts()
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.process_vts(vts)
        self.assertEqual(mock_warning.call_count, 1)
611
612
    def test_get_openvas_timestamp_scan_host_end(self, mock_nvti, mock_db):
        """Every stored result carries the mocked end time '12345'."""
        scan_id = '123-456'
        host = '192.168.0.1'
        mock_db.get_host_scan_scan_end_time.return_value = '12345'
        daemon = DummyDaemon(mock_nvti, mock_db)
        daemon.create_scan(
            scan_id, [[host, 'port', 'cred', 'exclude_host']], None, []
        )

        daemon.get_openvas_timestamp_scan_host(scan_id, host)

        stored = daemon.scan_collection.results_iterator(scan_id, False)
        for entry in stored:
            self.assertEqual(entry.get('value'), '12345')
620
621
    def test_get_openvas_timestamp_scan_host_start(self, mock_nvti, mock_db):
        """Every stored result carries the mocked start time '54321'.

        The end-time getter is mocked to return None and the start-time
        getter to return '54321'. Previously the end-time getter was
        assigned twice, so the None was overwritten and the start-time
        getter was never configured.
        """
        mock_db.get_host_scan_scan_end_time.return_value = None
        # NOTE(review): getter name chosen by symmetry with the end-time
        # getter -- confirm against the db API.
        mock_db.get_host_scan_scan_start_time.return_value = '54321'
        w = DummyDaemon(mock_nvti, mock_db)
        targets = [['192.168.0.1', 'port', 'cred', 'exclude_host']]
        w.create_scan('123-456', targets, None, [])
        w.get_openvas_timestamp_scan_host('123-456', '192.168.0.1')
        for result in w.scan_collection.results_iterator('123-456', False):
            self.assertEqual(result.get('value'), '54321')
630
631
    def test_host_is_finished(self, mock_nvti, mock_db):
        """host_is_finished() is True when the db item reads 'finished'."""
        mock_db.get_single_item.return_value = 'finished'
        daemon = DummyDaemon(mock_nvti, mock_db)

        finished = daemon.host_is_finished('123-456')

        self.assertEqual(finished, True)
636
637
    def test_scan_is_stopped(self, mock_nvti, mock_db):
        """scan_is_stopped() is True when the db item reads 'stop_all'."""
        mock_db.set_redisctx.return_value = None
        mock_db.kb_connect_item.return_value = mock_db
        mock_db.get_single_item.return_value = 'stop_all'
        daemon = DummyDaemon(mock_nvti, mock_db)

        stopped = daemon.scan_is_stopped('123-456')

        self.assertEqual(stopped, True)
644
645
    @patch('ospd_openvas.daemon.open')
    def test_feed_is_outdated_none(self, mock_open, mock_nvti, mock_db):
        """feed_is_outdated() returns None with open() mocked in the daemon."""
        daemon = DummyDaemon(mock_nvti, mock_db)
        # parse_param is stubbed because feed_is_outdated() will call it.
        stub_parse_param = patch.object(
            daemon, 'parse_param', return_value=None
        )
        with stub_parse_param:
            daemon.scan_only_params['plugins_folder'] = '/foo/bar'
            outdated = daemon.feed_is_outdated('1234')
            self.assertIsNone(outdated)
654
655
    def test_feed_is_outdated_true(self, mock_nvti, mock_db):
        """PLUGIN_SET "1235" read from the file vs. '1234' passed in: True."""
        daemon = DummyDaemon(mock_nvti, mock_db)
        # parse_param is stubbed because feed_is_outdated() will call it.
        stub_parse_param = patch.object(
            daemon, 'parse_param', return_value=None
        )
        path_exists = patch.object(Path, 'exists', return_value=True)
        fake_open = patch(
            "builtins.open", return_value=io.StringIO('PLUGIN_SET = "1235";')
        )
        with stub_parse_param, path_exists, fake_open:
            daemon.scan_only_params['plugins_folder'] = '/foo/bar'
            outdated = daemon.feed_is_outdated('1234')
            self.assertTrue(outdated)
668
669
    def test_feed_is_outdated_false(self, mock_nvti, mock_db):
        """PLUGIN_SET "1234" read from the file vs. '1234' passed in: False.

        The duplicated ``read_data`` assignment (whose first value was never
        used) has been removed, and the inline comment now matches the
        assertion.
        """
        w = DummyDaemon(mock_nvti, mock_db)
        # Mock parse_param, because feed_is_outdated() will call it.
        with patch.object(w, 'parse_param', return_value=None):
            with patch.object(Path, 'exists', return_value=True):
                read_data = 'PLUGIN_SET = "1234"'
                with patch(
                    "builtins.open", return_value=io.StringIO(read_data)
                ):
                    # Return False
                    w.scan_only_params['plugins_folder'] = '/foo/bar'
                    ret = w.feed_is_outdated('1234')
                    self.assertFalse(ret)
683
684
    @patch('ospd_openvas.daemon.OSPDaemon.add_scan_log')
    def test_get_openvas_result(self, mock_ospd, mock_nvti, mock_db):
        """A 'LOG' result line is forwarded to add_scan_log() field by field."""
        # The db mock hands out one LOG line and then None.
        mock_db.get_result.side_effect = [
            "LOG||| |||general/Host_Details||| |||Host dead",
            None,
        ]
        daemon = DummyDaemon(mock_nvti, mock_db)
        daemon.load_vts()
        mock_ospd.return_value = None

        daemon.get_openvas_result('123-456', 'localhost')

        expected_kwargs = {
            'host': 'localhost',
            'hostname': ' ',
            'name': '',
            'port': 'general/Host_Details',
            'qod': '',
            'test_id': ' ',
            'value': 'Host dead',
        }
        mock_ospd.assert_called_with('123-456', **expected_kwargs)
702
703
    @patch('ospd_openvas.daemon.OSPDaemon.set_scan_host_progress')
    def test_update_progress(self, mock_ospd, mock_nvti, mock_db):
        """The message '0/-1' is reported as a host progress of 100."""
        scan_id = '123-456'
        host = 'localhost'
        daemon = DummyDaemon(mock_nvti, mock_db)
        daemon.create_scan(
            scan_id, [[host, 'port', 'cred', 'exclude_host']], None, []
        )
        mock_ospd.return_value = None

        daemon.update_progress(scan_id, host, host, '0/-1')

        mock_ospd.assert_called_with(scan_id, host, host, 100)
713
714
715
class TestFilters(TestCase):
    """Tests for OpenVasVtsFilter."""

    def test_format_vt_modification_time(self):
        """The value '1517443741' is formatted as '20180201000901'."""
        vt_filter = OpenVasVtsFilter()

        formatted = vt_filter.format_vt_modification_time('1517443741')

        self.assertEqual(formatted, "20180201000901")
721