Passed
Push — master ( ead621...422e90 )
by Juan José
01:34 queued 11s
created

tests.test_daemon.TestOspdOpenvas.test_lock_file()   A

Complexity

Conditions 1

Size

Total Lines 22
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 13
nop 1
dl 0
loc 22
rs 9.75
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018-2019 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-2.0-or-later
5
#
6
# This program is free software; you can redistribute it and/or
7
# modify it under the terms of the GNU General Public License
8
# as published by the Free Software Foundation; either version 2
9
# of the License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
20
# pylint: disable=invalid-name,line-too-long,no-value-for-parameter
21
22
""" Unit Test for ospd-openvas """
23
24
import io
25
import logging
26
27
from unittest import TestCase
28
from unittest.mock import patch, Mock, MagicMock
29
30
from ospd.vts import Vts
31
from ospd.protocol import OspRequest
32
33
from tests.dummydaemon import DummyDaemon
34
from tests.helper import assert_called_once
35
36
from ospd_openvas.daemon import OSPD_PARAMS, OpenVasVtsFilter
37
from ospd_openvas.openvas import Openvas
38
39
OSPD_PARAMS_OUT = {
40
    'auto_enable_dependencies': {
41
        'type': 'boolean',
42
        'name': 'auto_enable_dependencies',
43
        'default': 1,
44
        'mandatory': 1,
45
        'description': 'Automatically enable the plugins that are depended on',
46
    },
47
    'cgi_path': {
48
        'type': 'string',
49
        'name': 'cgi_path',
50
        'default': '/cgi-bin:/scripts',
51
        'mandatory': 1,
52
        'description': 'Look for default CGIs in /cgi-bin and /scripts',
53
    },
54
    'checks_read_timeout': {
55
        'type': 'integer',
56
        'name': 'checks_read_timeout',
57
        'default': 5,
58
        'mandatory': 1,
59
        'description': 'Number  of seconds that the security checks will '
60
        'wait for when doing a recv()',
61
    },
62
    'drop_privileges': {
63
        'type': 'boolean',
64
        'name': 'drop_privileges',
65
        'default': 0,
66
        'mandatory': 1,
67
        'description': '',
68
    },
69
    'network_scan': {
70
        'type': 'boolean',
71
        'name': 'network_scan',
72
        'default': 0,
73
        'mandatory': 1,
74
        'description': '',
75
    },
76
    'non_simult_ports': {
77
        'type': 'string',
78
        'name': 'non_simult_ports',
79
        'default': '22',
80
        'mandatory': 1,
81
        'description': 'Prevent to make two connections on the same given '
82
        'ports at the same time.',
83
    },
84
    'open_sock_max_attempts': {
85
        'type': 'integer',
86
        'name': 'open_sock_max_attempts',
87
        'default': 5,
88
        'mandatory': 0,
89
        'description': 'Number of unsuccessful retries to open the socket '
90
        'before to set the port as closed.',
91
    },
92
    'timeout_retry': {
93
        'type': 'integer',
94
        'name': 'timeout_retry',
95
        'default': 5,
96
        'mandatory': 0,
97
        'description': 'Number of retries when a socket connection attempt '
98
        'timesout.',
99
    },
100
    'optimize_test': {
101
        'type': 'integer',
102
        'name': 'optimize_test',
103
        'default': 5,
104
        'mandatory': 0,
105
        'description': 'By default, openvas does not trust the remote '
106
        'host banners.',
107
    },
108
    'plugins_timeout': {
109
        'type': 'integer',
110
        'name': 'plugins_timeout',
111
        'default': 5,
112
        'mandatory': 0,
113
        'description': 'This is the maximum lifetime, in seconds of a plugin.',
114
    },
115
    'report_host_details': {
116
        'type': 'boolean',
117
        'name': 'report_host_details',
118
        'default': 1,
119
        'mandatory': 1,
120
        'description': '',
121
    },
122
    'safe_checks': {
123
        'type': 'boolean',
124
        'name': 'safe_checks',
125
        'default': 1,
126
        'mandatory': 1,
127
        'description': 'Disable the plugins with potential to crash '
128
        'the remote services',
129
    },
130
    'scanner_plugins_timeout': {
131
        'type': 'integer',
132
        'name': 'scanner_plugins_timeout',
133
        'default': 36000,
134
        'mandatory': 1,
135
        'description': 'Like plugins_timeout, but for ACT_SCANNER plugins.',
136
    },
137
    'time_between_request': {
138
        'type': 'integer',
139
        'name': 'time_between_request',
140
        'default': 0,
141
        'mandatory': 0,
142
        'description': 'Allow to set a wait time between two actions '
143
        '(open, send, close).',
144
    },
145
    'unscanned_closed': {
146
        'type': 'boolean',
147
        'name': 'unscanned_closed',
148
        'default': 1,
149
        'mandatory': 1,
150
        'description': '',
151
    },
152
    'unscanned_closed_udp': {
153
        'type': 'boolean',
154
        'name': 'unscanned_closed_udp',
155
        'default': 1,
156
        'mandatory': 1,
157
        'description': '',
158
    },
159
    'expand_vhosts': {
160
        'type': 'boolean',
161
        'name': 'expand_vhosts',
162
        'default': 1,
163
        'mandatory': 0,
164
        'description': 'Whether to expand the target hosts '
165
        + 'list of vhosts with values gathered from sources '
166
        + 'such as reverse-lookup queries and VT checks '
167
        + 'for SSL/TLS certificates.',
168
    },
169
    'test_empty_vhost': {
170
        'type': 'boolean',
171
        'name': 'test_empty_vhost',
172
        'default': 0,
173
        'mandatory': 0,
174
        'description': 'If  set  to  yes, the scanner will '
175
        + 'also test the target by using empty vhost value '
176
        + 'in addition to the targets associated vhost values.',
177
    },
178
}
179
180
181
class TestOspdOpenvas(TestCase):
182
    @patch('ospd_openvas.daemon.Openvas')
183
    def test_set_params_from_openvas_settings(self, mock_openvas: Openvas):
184
        mock_openvas.get_settings.return_value = {
185
            'non_simult_ports': '22',
186
            'plugins_folder': '/foo/bar',
187
        }
188
        w = DummyDaemon()
189
        w.set_params_from_openvas_settings()
190
191
        self.assertEqual(mock_openvas.get_settings.call_count, 1)
192
        self.assertEqual(OSPD_PARAMS, OSPD_PARAMS_OUT)
193
        self.assertEqual(w.scan_only_params.get('plugins_folder'), '/foo/bar')
194
195
    @patch('ospd_openvas.daemon.Openvas')
196
    def test_sudo_available(self, mock_openvas):
197
        mock_openvas.check_sudo.return_value = True
198
199
        w = DummyDaemon()
200
        w._sudo_available = None  # pylint: disable=protected-access
201
        w.sudo_available  # pylint: disable=pointless-statement
202
203
        self.assertTrue(w.sudo_available)
204
205
    def test_load_vts(self):
        """load_vts() leaves a Vts container on the daemon with as many
        entries as the daemon's VTS fixture."""
        w = DummyDaemon()
        w.load_vts()

        # Compare against the class itself; building a throwaway Vts()
        # instance just to take its type is unnecessary.
        self.assertIsInstance(w.vts, Vts)
        self.assertEqual(len(w.vts), len(w.VTS))
211
212
    def test_lock_file(self):
213
        w = DummyDaemon()
214
        # Not locked
215
        res = w.feed_locked()
216
        self.assertFalse(res)
217
218
        # Created
219
        res = w.create_feed_lock_file()
220
        self.assertTrue(res)
221
222
        # Locked
223
        res = w.feed_locked()
224
        self.assertTrue(res)
225
226
        # Not created because locked
227
        res = w.create_feed_lock_file()
228
        self.assertFalse(res)
229
230
        # Delete
231
        w.delete_feed_lock_file()
232
        res = w.feed_locked()
233
        self.assertFalse(res)
234
235
    def test_get_custom_xml(self):
236
        out = (
237
            '<custom>'
238
            '<required_ports>Services/www, 80</required_ports>'
239
            '<category>3</category>'
240
            '<excluded_keys>Settings/disable_cgi_scanning</excluded_keys>'
241
            '<family>Product detection</family>'
242
            '<filename>mantis_detect.nasl</filename>'
243
            '<timeout>0</timeout>'
244
            '</custom>'
245
        )
246
        w = DummyDaemon()
247
248
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
249
        res = w.get_custom_vt_as_xml_str(
250
            '1.3.6.1.4.1.25623.1.0.100061', vt.get('custom')
251
        )
252
        self.assertEqual(len(res), len(out))
253
254
    def test_get_custom_xml_failed(self):
        """A custom value containing a control character is expected to
        produce exactly one Logger.warning call."""
        w = DummyDaemon()
        custom = {'a': u"\u0006"}

        # Patch only around the call: the original assignment
        # `logging.Logger.warning = Mock()` was never undone and leaked the
        # mock into every test that ran afterwards.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_custom_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', custom=custom
            )

        assert_called_once(mock_warning)
264
265
    def test_get_severities_xml(self):
266
        w = DummyDaemon()
267
268
        out = (
269
            '<severities>'
270
            '<severity type="cvss_base_v2">'
271
            'AV:N/AC:L/Au:N/C:N/I:N/A:N'
272
            '</severity>'
273
            '</severities>'
274
        )
275
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
276
        severities = vt.get('severities')
277
        res = w.get_severities_vt_as_xml_str(
278
            '1.3.6.1.4.1.25623.1.0.100061', severities
279
        )
280
281
        self.assertEqual(res, out)
282
283
    def test_get_severities_xml_failed(self):
        """A severity vector containing a control character is expected to
        produce exactly one Logger.warning call."""
        w = DummyDaemon()
        sever = {'severity_base_vector': u"\u0006"}

        # Scoped patch: the mock is removed again when the block exits,
        # instead of permanently replacing logging.Logger.warning.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_severities_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', severities=sever
            )

        assert_called_once(mock_warning)
293
294
    def test_get_params_xml(self):
295
        w = DummyDaemon()
296
        out = (
297
            '<params>'
298
            '<param type="checkbox" id="2">'
299
            '<name>Do not randomize the  order  in  which ports are '
300
            'scanned</name>'
301
            '<default>no</default>'
302
            '</param>'
303
            '<param type="entry" id="1">'
304
            '<name>Data length :</name>'
305
            '</param>'
306
            '</params>'
307
        )
308
309
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
310
        params = vt.get('vt_params')
311
        res = w.get_params_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', params)
312
313
        self.assertEqual(len(res), len(out))
314
315
    def test_get_params_xml_failed(self):
        """A VT parameter whose default contains a control character is
        expected to produce exactly one Logger.warning call."""
        w = DummyDaemon()
        params = {
            '1': {
                'id': '1',
                'type': 'entry',
                'default': u'\u0006',
                'name': 'dns-fuzz.timelimit',
                'description': 'Description',
            }
        }

        # Scoped patch: the mock is removed again when the block exits,
        # instead of permanently replacing logging.Logger.warning.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_params_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', params)

        assert_called_once(mock_warning)
331
332
    def test_get_refs_xml(self):
333
        w = DummyDaemon()
334
335
        out = '<refs><ref type="url" id="http://www.mantisbt.org/"/></refs>'
336
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
337
        refs = vt.get('vt_refs')
338
        res = w.get_refs_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', refs)
339
340
        self.assertEqual(res, out)
341
342
    def test_get_dependencies_xml(self):
343
        w = DummyDaemon()
344
345
        out = (
346
            '<dependencies>'
347
            '<dependency vt_id="1.2.3.4"/><dependency vt_id="4.3.2.1"/>'
348
            '</dependencies>'
349
        )
350
        dep = ['1.2.3.4', '4.3.2.1']
351
        res = w.get_dependencies_vt_as_xml_str(
352
            '1.3.6.1.4.1.25623.1.0.100061', dep
353
        )
354
355
        self.assertEqual(res, out)
356
357
    def test_get_dependencies_xml_failed(self):
        """A dependency id containing a control character is expected to
        produce exactly one Logger.error call."""
        w = DummyDaemon()
        dep = [u"\u0006"]

        # Scoped patch: the mock is removed again when the block exits,
        # instead of permanently replacing logging.Logger.error.
        with patch.object(logging.Logger, 'error') as mock_error:
            w.get_dependencies_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', vt_dependencies=dep
            )

        assert_called_once(mock_error)
367
368
    def test_get_ctime_xml(self):
369
        w = DummyDaemon()
370
371
        out = '<creation_time>1237458156</creation_time>'
372
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
373
        ctime = vt.get('creation_time')
374
        res = w.get_creation_time_vt_as_xml_str(
375
            '1.3.6.1.4.1.25623.1.0.100061', ctime
376
        )
377
378
        self.assertEqual(res, out)
379
380
    def test_get_ctime_xml_failed(self):
        """A creation time containing a control character is expected to
        produce exactly one Logger.warning call."""
        w = DummyDaemon()
        ctime = u'\u0006'

        # Scoped patch: the mock is removed again when the block exits,
        # instead of permanently replacing logging.Logger.warning.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_creation_time_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', vt_creation_time=ctime
            )

        assert_called_once(mock_warning)
390
391
    def test_get_mtime_xml(self):
392
        w = DummyDaemon()
393
394
        out = '<modification_time>1533906565</modification_time>'
395
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
396
        mtime = vt.get('modification_time')
397
        res = w.get_modification_time_vt_as_xml_str(
398
            '1.3.6.1.4.1.25623.1.0.100061', mtime
399
        )
400
401
        self.assertEqual(res, out)
402
403
    def test_get_mtime_xml_failed(self):
        """A modification time containing a control character is expected
        to produce exactly one Logger.warning call."""
        w = DummyDaemon()
        mtime = u'\u0006'

        # Scoped patch: the mock is removed again when the block exits,
        # instead of permanently replacing logging.Logger.warning.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_modification_time_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', mtime
            )

        assert_called_once(mock_warning)
413
414
    def test_get_summary_xml(self):
415
        w = DummyDaemon()
416
417
        out = '<summary>some summary</summary>'
418
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
419
        summary = vt.get('summary')
420
        res = w.get_summary_vt_as_xml_str(
421
            '1.3.6.1.4.1.25623.1.0.100061', summary
422
        )
423
424
        self.assertEqual(res, out)
425
426
    def test_get_summary_xml_failed(self):
        """A summary containing a control character is expected to produce
        exactly one Logger.warning call."""
        w = DummyDaemon()
        summary = u'\u0006'

        # Scoped patch: the mock is removed again when the block exits,
        # instead of permanently replacing logging.Logger.warning.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_summary_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', summary)

        assert_called_once(mock_warning)
434
435
    def test_get_impact_xml(self):
436
        w = DummyDaemon()
437
438
        out = '<impact>some impact</impact>'
439
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
440
        impact = vt.get('impact')
441
        res = w.get_impact_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', impact)
442
443
        self.assertEqual(res, out)
444
445
    def test_get_impact_xml_failed(self):
        """An impact text containing a control character is expected to
        produce exactly one Logger.warning call."""
        w = DummyDaemon()
        impact = u'\u0006'

        # Scoped patch: the mock is removed again when the block exits,
        # instead of permanently replacing logging.Logger.warning.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_impact_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', impact)

        assert_called_once(mock_warning)
453
454
    def test_get_insight_xml(self):
455
        w = DummyDaemon()
456
457
        out = '<insight>some insight</insight>'
458
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
459
        insight = vt.get('insight')
460
        res = w.get_insight_vt_as_xml_str(
461
            '1.3.6.1.4.1.25623.1.0.100061', insight
462
        )
463
464
        self.assertEqual(res, out)
465
466
    def test_get_insight_xml_failed(self):
        """An insight text containing a control character is expected to
        produce exactly one Logger.warning call."""
        w = DummyDaemon()
        insight = u'\u0006'

        # Scoped patch: the mock is removed again when the block exits,
        # instead of permanently replacing logging.Logger.warning.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_insight_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', insight)

        assert_called_once(mock_warning)
474
475
    def test_get_solution_xml(self):
476
        w = DummyDaemon()
477
478
        out = (
479
            '<solution type="WillNotFix" method="DebianAPTUpgrade">'
480
            'some solution'
481
            '</solution>'
482
        )
483
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
484
        solution = vt.get('solution')
485
        solution_type = vt.get('solution_type')
486
        solution_method = vt.get('solution_method')
487
488
        res = w.get_solution_vt_as_xml_str(
489
            '1.3.6.1.4.1.25623.1.0.100061',
490
            solution,
491
            solution_type,
492
            solution_method,
493
        )
494
495
        self.assertEqual(res, out)
496
497
    def test_get_solution_xml_failed(self):
        """A solution text containing a control character is expected to
        produce exactly one Logger.warning call."""
        w = DummyDaemon()
        solution = u'\u0006'

        # Scoped patch: the mock is removed again when the block exits,
        # instead of permanently replacing logging.Logger.warning.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_solution_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', solution
            )

        assert_called_once(mock_warning)
505
506
    def test_get_detection_xml(self):
507
        w = DummyDaemon()
508
509
        out = '<detection qod_type="remote_banner"/>'
510
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
511
        detection_type = vt.get('qod_type')
512
513
        res = w.get_detection_vt_as_xml_str(
514
            '1.3.6.1.4.1.25623.1.0.100061', qod_type=detection_type
515
        )
516
517
        self.assertEqual(res, out)
518
519
    def test_get_detection_xml_failed(self):
        """A detection value containing a control character is expected to
        produce exactly one Logger.warning call."""
        w = DummyDaemon()
        detection = u'\u0006'

        # Scoped patch: the mock is removed again when the block exits,
        # instead of permanently replacing logging.Logger.warning.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_detection_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', detection
            )

        assert_called_once(mock_warning)
527
528
    def test_get_affected_xml(self):
529
        w = DummyDaemon()
530
        out = '<affected>some affection</affected>'
531
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
532
        affected = vt.get('affected')
533
534
        res = w.get_affected_vt_as_xml_str(
535
            '1.3.6.1.4.1.25623.1.0.100061', affected=affected
536
        )
537
538
        self.assertEqual(res, out)
539
540
    def test_get_affected_xml_failed(self):
        """An affected text starting with a control character is expected
        to produce exactly one Logger.warning call."""
        w = DummyDaemon()
        affected = u"\u0006" + "affected"

        # Scoped patch: the mock is removed again when the block exits,
        # instead of permanently replacing logging.Logger.warning.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_affected_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', affected=affected
            )

        assert_called_once(mock_warning)
550
551
    def test_build_credentials(self):
        """build_credentials_as_prefs() returns one preference per expected
        entry for ssh, smb, esxi and snmp credentials, including the SSH
        port and the SMB login preferences."""
        w = DummyDaemon()

        # Only the number of entries in this list is compared with the
        # result; individual preferences are checked explicitly below.
        cred_out = [
            '1.3.6.1.4.1.25623.1.0.105058:1:entry:ESXi login name:|||username',
            '1.3.6.1.4.1.25623.1.0.105058:2:password:ESXi login password:|||pass',
            'auth_port_ssh|||22',
            '1.3.6.1.4.1.25623.1.0.103591:1:entry:SSH login name:|||username',
            '1.3.6.1.4.1.25623.1.0.103591:2:password:SSH key passphrase:|||pass',
            '1.3.6.1.4.1.25623.1.0.103591:4:file:SSH private key:|||',
            '1.3.6.1.4.1.25623.1.0.90023:1:entry:SMB login:|||username',
            '1.3.6.1.4.1.25623.1.0.90023:2:password]:SMB password :|||pass',
            '1.3.6.1.4.1.25623.1.0.105076:1:password:SNMP Community:some comunity',
            '1.3.6.1.4.1.25623.1.0.105076:2:entry:SNMPv3 Username:username',
            '1.3.6.1.4.1.25623.1.0.105076:3:password:SNMPv3 Password:pass',
            '1.3.6.1.4.1.25623.1.0.105076:4:radio:SNMPv3 Authentication Algorithm:some auth algo',
            '1.3.6.1.4.1.25623.1.0.105076:5:password:SNMPv3 Privacy Password:privacy pass',
            '1.3.6.1.4.1.25623.1.0.105076:6:radio:SNMPv3 Privacy Algorithm:privacy algo',
        ]
        cred_dict = {
            'ssh': {
                'type': 'ssh',
                'port': '22',
                'username': 'username',
                'password': 'pass',
            },
            'smb': {'type': 'smb', 'username': 'username', 'password': 'pass'},
            'esxi': {
                'type': 'esxi',
                'username': 'username',
                'password': 'pass',
            },
            'snmp': {
                'type': 'snmp',
                'username': 'username',
                'password': 'pass',
                'community': 'some comunity',
                'auth_algorithm': 'some auth algo',
                'privacy_password': 'privacy pass',
                'privacy_algorithm': 'privacy algo',
            },
        }

        ret = w.build_credentials_as_prefs(cred_dict)

        self.assertEqual(len(ret), len(cred_out))
        # Membership must be checked on the value under test. The original
        # looked the strings up in cred_out, the hand-written expectation
        # list, which contains them by construction and so always passed.
        self.assertIn('auth_port_ssh|||22', ret)
        self.assertIn(
            '1.3.6.1.4.1.25623.1.0.90023:1:entry:SMB login:|||username',
            ret,
        )
602
603
    def test_build_credentials_ssh_up(self):
604
        w = DummyDaemon()
605
606
        cred_out = [
607
            'auth_port_ssh|||22',
608
            '1.3.6.1.4.1.25623.1.0.103591:1:entry:SSH login name:|||username',
609
            '1.3.6.1.4.1.25623.1.0.103591:3:password:SSH password (unsafe!):|||pass',
610
        ]
611
        cred_dict = {
612
            'ssh': {
613
                'type': 'up',
614
                'port': '22',
615
                'username': 'username',
616
                'password': 'pass',
617
            }
618
        }
619
620
        ret = w.build_credentials_as_prefs(cred_dict)
621
622
        self.assertEqual(ret, cred_out)
623
624
    def test_build_alive_test_opt_empty(self):
625
        w = DummyDaemon()
626
627
        target_options_dict = {'alive_test': '0'}
628
629
        ret = w.build_alive_test_opt_as_prefs(target_options_dict)
630
631
        self.assertEqual(ret, [])
632
633
    def test_build_alive_test_opt(self):
634
        w = DummyDaemon()
635
636
        alive_test_out = [
637
            "1.3.6.1.4.1.25623.1.0.100315:1:checkbox:Do a TCP ping|||no",
638
            "1.3.6.1.4.1.25623.1.0.100315:2:checkbox:TCP ping tries also TCP-SYN ping|||no",
639
            "1.3.6.1.4.1.25623.1.0.100315:7:checkbox:TCP ping tries only TCP-SYN ping|||no",
640
            "1.3.6.1.4.1.25623.1.0.100315:3:checkbox:Do an ICMP ping|||yes",
641
            "1.3.6.1.4.1.25623.1.0.100315:4:checkbox:Use ARP|||no",
642
            "1.3.6.1.4.1.25623.1.0.100315:5:checkbox:Mark unrechable Hosts as dead (not scanning)|||yes",
643
        ]
644
        target_options_dict = {'alive_test': '2'}
645
646
        ret = w.build_alive_test_opt_as_prefs(target_options_dict)
647
648
        self.assertEqual(ret, alive_test_out)
649
650
    def test_build_alive_test_opt_fail_1(self):
        """A non-numeric alive_test value yields no preferences and is
        expected to produce exactly one Logger.debug call."""
        w = DummyDaemon()
        target_options_dict = {'alive_test': 'a'}

        # Scoped patch: the mock is removed again when the block exits,
        # instead of permanently replacing logging.Logger.debug.
        with patch.object(logging.Logger, 'debug') as mock_debug:
            target_options = w.build_alive_test_opt_as_prefs(
                target_options_dict
            )

        assert_called_once(mock_debug)
        self.assertEqual(len(target_options), 0)
660
661
    def test_process_vts(self):
662
        w = DummyDaemon()
663
664
        vts = {
665
            '1.3.6.1.4.1.25623.1.0.100061': {'1': 'new value'},
666
            'vt_groups': ['family=debian', 'family=general'],
667
        }
668
        vt_out = (
669
            ['1.3.6.1.4.1.25623.1.0.100061'],
670
            {'1.3.6.1.4.1.25623.1.0.100061:1:entry:Data length :': 'new value'},
671
        )
672
673
        w.load_vts()
674
        w.temp_vts = w.vts
675
676
        ret = w.process_vts(vts)
677
678
        self.assertEqual(ret, vt_out)
679
680
    def test_process_vts_bad_param_id(self):
681
        w = DummyDaemon()
682
683
        vts = {
684
            '1.3.6.1.4.1.25623.1.0.100061': {'3': 'new value'},
685
            'vt_groups': ['family=debian', 'family=general'],
686
        }
687
688
        w.load_vts()
689
        w.temp_vts = w.vts
690
691
        ret = w.process_vts(vts)
692
693
        self.assertFalse(ret[1])
694
695
    def test_process_vts_not_found(self):
        """Processing a VT id that is not in the loaded collection is
        expected to produce exactly one Logger.warning call."""
        w = DummyDaemon()

        vts = {
            '1.3.6.1.4.1.25623.1.0.100065': {'3': 'new value'},
            'vt_groups': ['family=debian', 'family=general'],
        }

        # Scoped patch: the mock is removed again when the block exits,
        # instead of permanently replacing logging.Logger.warning.
        # load_vts() stays inside the patched region because the original
        # installed the mock before loading, so the same calls are counted.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.load_vts()
            w.temp_vts = w.vts

            w.process_vts(vts)

        assert_called_once(mock_warning)
710
711
    # def test_get_openvas_timestamp_scan_host_end(self):
712
    #     w = DummyDaemon()
713
714
    #     mock_db.get_host_scan_scan_end_time.return_value = '12345'
715
716
    #     target_list = w.create_xml_target()
717
    #     targets = w.process_targets_element(target_list)
718
719
    #     w.create_scan('123-456', targets, None, [])
720
    #     w.get_openvas_timestamp_scan_host('123-456', '192.168.0.1')
721
722
    #     for result in w.scan_collection.results_iterator('123-456', False):
723
    #         self.assertEqual(result.get('value'), '12345')
724
725
    # def test_get_openvas_timestamp_scan_host_start(self):
726
    #     w = DummyDaemon()
727
728
    #     mock_db.get_host_scan_scan_end_time.return_value = None
729
    #     mock_db.get_host_scan_scan_end_time.return_value = '54321'
730
731
    #     target_list = w.create_xml_target()
732
    #     targets = w.process_targets_element(target_list)
733
734
    #     w.create_scan('123-456', targets, None, [])
735
    #     w.get_openvas_timestamp_scan_host('123-456', '192.168.0.1')
736
737
    #     for result in w.scan_collection.results_iterator('123-456', False):
738
    #         self.assertEqual(result.get('value'), '54321')
739
740
    def test_feed_is_healthy_true(self):
741
        w = DummyDaemon()
742
743
        w.nvti.get_nvt_count.return_value = 2
744
        w.nvti.get_nvt_files_count.return_value = 2
745
        w.vts = ["a", "b"]
746
747
        ret = w.feed_is_healthy()
748
        self.assertTrue(ret)
749
750
    def test_feed_is_healthy_false(self):
751
        w = DummyDaemon()
752
753
        w.nvti.get_nvt_count.return_value = 1
754
        w.nvti.get_nvt_files_count.return_value = 2
755
756
        w.vts = ["a", "b"]
757
758
        ret = w.feed_is_healthy()
759
760
        self.assertFalse(ret)
761
762
        w.nvti.get_nvt_count.return_value = 2
763
        w.nvti.get_nvt_files_count.return_value = 1
764
765
        ret = w.feed_is_healthy()
766
767
        self.assertFalse(ret)
768
769
        w.nvti.get_nvt_count.return_value = 2
770
        w.nvti.get_nvt_files_count.return_value = 2
771
772
        w.vts = ["a"]
773
774
        ret = w.feed_is_healthy()
775
776
        self.assertFalse(ret)
777
778
    @patch('ospd_openvas.daemon.Path.exists')
779
    @patch('ospd_openvas.daemon.OSPDopenvas.set_params_from_openvas_settings')
780
    def test_feed_is_outdated_none(
781
        self, mock_set_params: MagicMock, mock_path_exists: MagicMock
782
    ):
783
        w = DummyDaemon()
784
785
        w.scan_only_params['plugins_folder'] = '/foo/bar'
786
787
        # Return None
788
        mock_path_exists.return_value = False
789
790
        ret = w.feed_is_outdated('1234')
791
        self.assertIsNone(ret)
792
793
        self.assertEqual(mock_set_params.call_count, 1)
794
        self.assertEqual(mock_path_exists.call_count, 1)
795
796
    @patch('ospd_openvas.daemon.Path.exists')
797
    @patch('ospd_openvas.daemon.Path.open')
798
    def test_feed_is_outdated_true(
799
        self, mock_path_open: MagicMock, mock_path_exists: MagicMock,
800
    ):
801
        read_data = 'PLUGIN_SET = "1235";'
802
803
        mock_path_exists.return_value = True
804
        mock_read = MagicMock(name='Path open context manager')
805
        mock_read.__enter__ = MagicMock(return_value=io.StringIO(read_data))
806
        mock_path_open.return_value = mock_read
807
808
        w = DummyDaemon()
809
810
        # Return True
811
        w.scan_only_params['plugins_folder'] = '/foo/bar'
812
813
        ret = w.feed_is_outdated('1234')
814
        self.assertTrue(ret)
815
816
        self.assertEqual(mock_path_exists.call_count, 1)
817
        self.assertEqual(mock_path_open.call_count, 1)
818
819
    @patch('ospd_openvas.daemon.Path.exists')
    @patch('ospd_openvas.daemon.Path.open')
    def test_feed_is_outdated_false(
        self, mock_path_open: MagicMock, mock_path_exists: MagicMock,
    ):
        """feed_is_outdated('1234') is falsy when the mocked file content
        declares PLUGIN_SET "1234"; Path.exists and Path.open are each
        called once."""
        read_data = 'PLUGIN_SET = "1234"'

        # The original set this return value twice; once is enough.
        mock_path_exists.return_value = True

        # Path.open() is used as a context manager, so the readable stream
        # has to come out of __enter__.
        mock_read = MagicMock(name='Path open context manager')
        mock_read.__enter__ = MagicMock(return_value=io.StringIO(read_data))
        mock_path_open.return_value = mock_read

        w = DummyDaemon()
        w.scan_only_params['plugins_folder'] = '/foo/bar'

        ret = w.feed_is_outdated('1234')
        self.assertFalse(ret)

        self.assertEqual(mock_path_exists.call_count, 1)
        self.assertEqual(mock_path_open.call_count, 1)
840
841
    @patch('ospd_openvas.daemon.ScanDB')
842
    @patch('ospd_openvas.daemon.OSPDaemon.add_scan_log')
843
    def test_get_openvas_result(self, mock_add_scan_log, MockDBClass):
844
        w = DummyDaemon()
845
        mock_db = MockDBClass.return_value
846
847
        results = ["LOG||| |||general/Host_Details||| |||Host dead", None]
848
        mock_db.get_result.side_effect = results
849
        mock_add_scan_log.return_value = None
850
851
        w.load_vts()
852
        w.report_openvas_results(mock_db, '123-456', 'localhost')
853
854
        mock_add_scan_log.assert_called_with(
855
            '123-456',
856
            host='localhost',
857
            hostname='',
858
            name='',
859
            port='general/Host_Details',
860
            qod='',
861
            test_id='',
862
            value='Host dead',
863
        )
864
865
    @patch('ospd_openvas.daemon.ScanDB')
866
    @patch('ospd_openvas.daemon.OSPDaemon.add_scan_log')
867
    def test_get_openvas_result_escaped(self, mock_ospd, MockDBClass):
868
        w = DummyDaemon()
869
        mock_db = MockDBClass.return_value
870
871
        results = [
872
            "LOG||| |||general/Host_Details|||1.3.6.1.4.1.25623.1.0.100061|||Alive",
873
            None,
874
        ]
875
        mock_db.get_result.side_effect = results
876
        mock_ospd.return_value = None
877
878
        w.nvti.QOD_TYPES.__getitem__.return_value = ''
879
880
        w.load_vts()
881
        w.report_openvas_results(mock_db, '123-456', 'localhost')
882
883
        mock_ospd.assert_called_with(
884
            '123-456',
885
            host='localhost',
886
            hostname='',
887
            name='Mantis Detection &amp; foo',
888
            port='general/Host_Details',
889
            qod='',
890
            test_id='1.3.6.1.4.1.25623.1.0.100061',
891
            value='Alive',
892
        )
893
894
    @patch('ospd_openvas.daemon.OSPDaemon.set_scan_host_progress')
895
    def test_update_progress(self, mock_set_scan_host_progress):
896
        w = DummyDaemon()
897
898
        mock_set_scan_host_progress.return_value = None
899
900
        msg = '0/-1'
901
        target_element = w.create_xml_target()
902
        targets = OspRequest.process_target_element(target_element)
903
904
        w.create_scan('123-456', targets, None, [])
905
        w.update_progress('123-456', 'localhost', msg)
906
907
        mock_set_scan_host_progress.assert_called_with(
908
            '123-456', 'localhost', 100
909
        )
910
911
912
class TestFilters(TestCase):
    """Tests for OpenVasVtsFilter."""

    def test_format_vt_modification_time(self):
        """The string '1517443741' is formatted as '20180201000901'."""
        vts_filter = OpenVasVtsFilter()
        raw_mtime = '1517443741'

        self.assertEqual(
            vts_filter.format_vt_modification_time(raw_mtime),
            "20180201000901",
        )
918