Completed
Push — master ( 351cbc...3c6238 )
by
unknown
37s queued 13s
created

TestOspdOpenvas.test_update_progress()   A

Complexity

Conditions 1

Size

Total Lines 15
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 11
nop 2
dl 0
loc 15
rs 9.85
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
# pylint: disable=invalid-name,line-too-long,no-value-for-parameter
21
22
""" Unit Test for ospd-openvas """
23
24
import io
25
import logging
26
27
from unittest import TestCase
28
from unittest.mock import patch, Mock, MagicMock
29
30
from ospd.vts import Vts
31
from ospd.protocol import OspRequest
32
33
from tests.dummydaemon import DummyDaemon
34
from tests.helper import assert_called_once
35
36
from ospd_openvas.daemon import OSPD_PARAMS, OpenVasVtsFilter
37
from ospd_openvas.openvas import Openvas
38
39
# Expected content of OSPD_PARAMS, compared for equality in
# test_set_params_from_openvas_settings. The descriptions are compared
# verbatim, so their wording (including spacing) must not be "corrected" here.
OSPD_PARAMS_OUT = {
    'auto_enable_dependencies': {
        'type': 'boolean',
        'name': 'auto_enable_dependencies',
        'default': 1,
        'mandatory': 1,
        'description': 'Automatically enable the plugins that are depended on',
    },
    'cgi_path': {
        'type': 'string',
        'name': 'cgi_path',
        'default': '/cgi-bin:/scripts',
        'mandatory': 1,
        'description': 'Look for default CGIs in /cgi-bin and /scripts',
    },
    'checks_read_timeout': {
        'type': 'integer',
        'name': 'checks_read_timeout',
        'default': 5,
        'mandatory': 1,
        'description': (
            'Number  of seconds that the security checks will '
            'wait for when doing a recv()'
        ),
    },
    'drop_privileges': {
        'type': 'boolean',
        'name': 'drop_privileges',
        'default': 0,
        'mandatory': 1,
        'description': '',
    },
    'non_simult_ports': {
        'type': 'string',
        'name': 'non_simult_ports',
        'default': '22',
        'mandatory': 1,
        'description': (
            'Prevent to make two connections on the same given '
            'ports at the same time.'
        ),
    },
    'open_sock_max_attempts': {
        'type': 'integer',
        'name': 'open_sock_max_attempts',
        'default': 5,
        'mandatory': 0,
        'description': (
            'Number of unsuccessful retries to open the socket '
            'before to set the port as closed.'
        ),
    },
    'timeout_retry': {
        'type': 'integer',
        'name': 'timeout_retry',
        'default': 5,
        'mandatory': 0,
        'description': (
            'Number of retries when a socket connection attempt '
            'timesout.'
        ),
    },
    'optimize_test': {
        'type': 'integer',
        'name': 'optimize_test',
        'default': 5,
        'mandatory': 0,
        'description': (
            'By default, openvas does not trust the remote '
            'host banners.'
        ),
    },
    'plugins_timeout': {
        'type': 'integer',
        'name': 'plugins_timeout',
        'default': 5,
        'mandatory': 0,
        'description': 'This is the maximum lifetime, in seconds of a plugin.',
    },
    'report_host_details': {
        'type': 'boolean',
        'name': 'report_host_details',
        'default': 1,
        'mandatory': 1,
        'description': '',
    },
    'safe_checks': {
        'type': 'boolean',
        'name': 'safe_checks',
        'default': 1,
        'mandatory': 1,
        'description': (
            'Disable the plugins with potential to crash '
            'the remote services'
        ),
    },
    'scanner_plugins_timeout': {
        'type': 'integer',
        'name': 'scanner_plugins_timeout',
        'default': 36000,
        'mandatory': 1,
        'description': 'Like plugins_timeout, but for ACT_SCANNER plugins.',
    },
    'time_between_request': {
        'type': 'integer',
        'name': 'time_between_request',
        'default': 0,
        'mandatory': 0,
        'description': (
            'Allow to set a wait time between two actions '
            '(open, send, close).'
        ),
    },
    'unscanned_closed': {
        'type': 'boolean',
        'name': 'unscanned_closed',
        'default': 1,
        'mandatory': 1,
        'description': '',
    },
    'unscanned_closed_udp': {
        'type': 'boolean',
        'name': 'unscanned_closed_udp',
        'default': 1,
        'mandatory': 1,
        'description': '',
    },
    'expand_vhosts': {
        'type': 'boolean',
        'name': 'expand_vhosts',
        'default': 1,
        'mandatory': 0,
        'description': (
            'Whether to expand the target hosts '
            'list of vhosts with values gathered from sources '
            'such as reverse-lookup queries and VT checks '
            'for SSL/TLS certificates.'
        ),
    },
    'test_empty_vhost': {
        'type': 'boolean',
        'name': 'test_empty_vhost',
        'default': 0,
        'mandatory': 0,
        'description': (
            'If  set  to  yes, the scanner will '
            'also test the target by using empty vhost value '
            'in addition to the targets associated vhost values.'
        ),
    },
}
172
173
174
class TestOspdOpenvas(TestCase):
175
    @patch('ospd_openvas.daemon.Openvas')
    def test_set_params_from_openvas_settings(self, mock_openvas: Openvas):
        # Openvas.get_settings is mocked; afterwards OSPD_PARAMS must equal
        # OSPD_PARAMS_OUT and 'plugins_folder' must be readable from
        # scan_only_params.
        openvas_settings = {
            'non_simult_ports': '22',
            'plugins_folder': '/foo/bar',
        }
        mock_openvas.get_settings.return_value = openvas_settings

        daemon = DummyDaemon()
        daemon.set_params_from_openvas_settings()

        self.assertEqual(mock_openvas.get_settings.call_count, 1)
        self.assertEqual(OSPD_PARAMS, OSPD_PARAMS_OUT)
        self.assertEqual(
            daemon.scan_only_params.get('plugins_folder'), '/foo/bar'
        )
187
188
    @patch('ospd_openvas.daemon.Openvas')
    def test_sudo_available(self, mock_openvas):
        # With _sudo_available reset to None and Openvas.check_sudo mocked
        # to return True, the sudo_available attribute must be truthy.
        mock_openvas.check_sudo.return_value = True

        daemon = DummyDaemon()
        daemon._sudo_available = None  # pylint: disable=protected-access
        # First read of the attribute; the value is deliberately unused.
        _ = daemon.sudo_available

        self.assertTrue(daemon.sudo_available)
197
198
    def test_get_custom_xml(self):
        # Only the length of the generated XML is compared with the
        # expected document, not its exact content.
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = ''.join(
            [
                '<custom>',
                '<required_ports>Services/www, 80</required_ports>',
                '<category>3</category>',
                '<excluded_keys>Settings/disable_cgi_scanning</excluded_keys>',
                '<family>Product detection</family>',
                '<filename>mantis_detect.nasl</filename>',
                '<timeout>0</timeout>',
                '</custom>',
            ]
        )
        daemon = DummyDaemon()

        custom = daemon.VTS[oid].get('custom')
        xml_str = daemon.get_custom_vt_as_xml_str(oid, custom)

        self.assertEqual(len(xml_str), len(expected))
216
217
    def test_get_custom_xml_failed(self):
        # A custom value holding the control character U+0006 is expected
        # to make the daemon log a warning.
        daemon = DummyDaemon()
        custom = {'a': u"\u0006"}

        # patch.object restores Logger.warning on exit, so the mock does not
        # leak into tests that run afterwards.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            daemon.get_custom_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', custom=custom
            )

        assert_called_once(mock_warning)
227
228
    def test_get_severities_xml(self):
        # The severities of the dummy VT must serialize to exactly this XML.
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()

        expected = ''.join(
            [
                '<severities>',
                '<severity type="cvss_base_v2">',
                'AV:N/AC:L/Au:N/C:N/I:N/A:N',
                '</severity>',
                '</severities>',
            ]
        )
        severities = daemon.VTS[oid].get('severities')
        xml_str = daemon.get_severities_vt_as_xml_str(oid, severities)

        self.assertEqual(xml_str, expected)
245
246
    def test_get_severities_xml_failed(self):
        # A severity vector holding the control character U+0006 is
        # expected to make the daemon log a warning.
        daemon = DummyDaemon()
        sever = {'severity_base_vector': u"\u0006"}

        # patch.object restores Logger.warning on exit, so the mock does not
        # leak into tests that run afterwards.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            daemon.get_severities_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', severities=sever
            )

        assert_called_once(mock_warning)
256
257
    def test_get_params_xml(self):
        # Only the length of the generated XML is compared with the
        # expected document, not its exact content.
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()
        expected = ''.join(
            [
                '<params>',
                '<param type="checkbox" id="2">',
                '<name>Do not randomize the  order  in  which ports are ',
                'scanned</name>',
                '<default>no</default>',
                '</param>',
                '<param type="entry" id="1">',
                '<name>Data length :</name>',
                '</param>',
                '</params>',
            ]
        )

        vt_params = daemon.VTS[oid].get('vt_params')
        xml_str = daemon.get_params_vt_as_xml_str(oid, vt_params)

        self.assertEqual(len(xml_str), len(expected))
277
278
    def test_get_params_xml_failed(self):
        # A parameter whose default holds the control character U+0006 is
        # expected to make the daemon log a warning.
        daemon = DummyDaemon()
        params = {
            '1': {
                'id': '1',
                'type': 'entry',
                'default': u'\u0006',
                'name': 'dns-fuzz.timelimit',
                'description': 'Description',
            }
        }

        # patch.object restores Logger.warning on exit, so the mock does not
        # leak into tests that run afterwards.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            daemon.get_params_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', params
            )

        assert_called_once(mock_warning)
294
295
    def test_get_refs_xml(self):
        # The references of the dummy VT must serialize to exactly this XML.
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()

        expected = '<refs><ref type="url" id="http://www.mantisbt.org/"/></refs>'
        vt_refs = daemon.VTS[oid].get('vt_refs')

        self.assertEqual(daemon.get_refs_vt_as_xml_str(oid, vt_refs), expected)
304
305
    def test_get_dependencies_xml(self):
        # Each dependency OID must become one <dependency> element.
        daemon = DummyDaemon()

        dependencies = ['1.2.3.4', '4.3.2.1']
        expected = ''.join(
            [
                '<dependencies>',
                '<dependency vt_id="1.2.3.4"/><dependency vt_id="4.3.2.1"/>',
                '</dependencies>',
            ]
        )
        xml_str = daemon.get_dependencies_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', dependencies
        )

        self.assertEqual(xml_str, expected)
319
320
    def test_get_dependencies_xml_failed(self):
        # A dependency holding the control character U+0006 is expected to
        # make the daemon log an error (not a warning).
        daemon = DummyDaemon()
        dep = [u"\u0006"]

        # patch.object restores Logger.error on exit, so the mock does not
        # leak into tests that run afterwards.
        with patch.object(logging.Logger, 'error') as mock_error:
            daemon.get_dependencies_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', vt_dependencies=dep
            )

        assert_called_once(mock_error)
330
331
    def test_get_ctime_xml(self):
        # The creation time of the dummy VT must serialize to this XML.
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()

        expected = '<creation_time>1237458156</creation_time>'
        creation_time = daemon.VTS[oid].get('creation_time')
        xml_str = daemon.get_creation_time_vt_as_xml_str(oid, creation_time)

        self.assertEqual(xml_str, expected)
342
343
    def test_get_ctime_xml_failed(self):
        # A creation time holding the control character U+0006 is expected
        # to make the daemon log a warning.
        daemon = DummyDaemon()
        ctime = u'\u0006'

        # patch.object restores Logger.warning on exit, so the mock does not
        # leak into tests that run afterwards.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            daemon.get_creation_time_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', vt_creation_time=ctime
            )

        assert_called_once(mock_warning)
353
354
    def test_get_mtime_xml(self):
        # The modification time of the dummy VT must serialize to this XML.
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()

        expected = '<modification_time>1533906565</modification_time>'
        modification_time = daemon.VTS[oid].get('modification_time')
        xml_str = daemon.get_modification_time_vt_as_xml_str(
            oid, modification_time
        )

        self.assertEqual(xml_str, expected)
365
366
    def test_get_mtime_xml_failed(self):
        # A modification time holding the control character U+0006 is
        # expected to make the daemon log a warning.
        daemon = DummyDaemon()
        mtime = u'\u0006'

        # patch.object restores Logger.warning on exit, so the mock does not
        # leak into tests that run afterwards.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            daemon.get_modification_time_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', mtime
            )

        assert_called_once(mock_warning)
376
377
    def test_get_summary_xml(self):
        # The summary of the dummy VT must serialize to exactly this XML.
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()

        expected = '<summary>some summary</summary>'
        summary = daemon.VTS[oid].get('summary')
        xml_str = daemon.get_summary_vt_as_xml_str(oid, summary)

        self.assertEqual(xml_str, expected)
388
389
    def test_get_summary_xml_failed(self):
        # A summary holding the control character U+0006 is expected to
        # make the daemon log a warning.
        daemon = DummyDaemon()
        summary = u'\u0006'

        # patch.object restores Logger.warning on exit, so the mock does not
        # leak into tests that run afterwards.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            daemon.get_summary_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', summary
            )

        assert_called_once(mock_warning)
397
398
    def test_get_impact_xml(self):
        # The impact text of the dummy VT must serialize to this XML.
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()

        expected = '<impact>some impact</impact>'
        impact = daemon.VTS[oid].get('impact')
        xml_str = daemon.get_impact_vt_as_xml_str(oid, impact)

        self.assertEqual(xml_str, expected)
407
408
    def test_get_impact_xml_failed(self):
        # An impact text holding the control character U+0006 is expected
        # to make the daemon log a warning.
        daemon = DummyDaemon()
        impact = u'\u0006'

        # patch.object restores Logger.warning on exit, so the mock does not
        # leak into tests that run afterwards.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            daemon.get_impact_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', impact
            )

        assert_called_once(mock_warning)
416
417
    def test_get_insight_xml(self):
        # The insight text of the dummy VT must serialize to this XML.
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()

        expected = '<insight>some insight</insight>'
        insight = daemon.VTS[oid].get('insight')
        xml_str = daemon.get_insight_vt_as_xml_str(oid, insight)

        self.assertEqual(xml_str, expected)
428
429
    def test_get_insight_xml_failed(self):
        # An insight text holding the control character U+0006 is expected
        # to make the daemon log a warning.
        daemon = DummyDaemon()
        insight = u'\u0006'

        # patch.object restores Logger.warning on exit, so the mock does not
        # leak into tests that run afterwards.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            daemon.get_insight_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', insight
            )

        assert_called_once(mock_warning)
437
438
    def test_get_solution_xml(self):
        # Solution text, type and method of the dummy VT must end up in a
        # single <solution> element.
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()

        expected = ''.join(
            [
                '<solution type="WillNotFix" method="DebianAPTUpgrade">',
                'some solution',
                '</solution>',
            ]
        )
        vt = daemon.VTS[oid]
        xml_str = daemon.get_solution_vt_as_xml_str(
            oid,
            vt.get('solution'),
            vt.get('solution_type'),
            vt.get('solution_method'),
        )

        self.assertEqual(xml_str, expected)
459
460
    def test_get_solution_xml_failed(self):
        # A solution text holding the control character U+0006 is expected
        # to make the daemon log a warning.
        daemon = DummyDaemon()
        solution = u'\u0006'

        # patch.object restores Logger.warning on exit, so the mock does not
        # leak into tests that run afterwards.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            daemon.get_solution_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', solution
            )

        assert_called_once(mock_warning)
468
469
    def test_get_detection_xml(self):
        # The qod_type of the dummy VT must show up as an attribute of the
        # <detection> element.
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()

        expected = '<detection qod_type="remote_banner"/>'
        qod_type = daemon.VTS[oid].get('qod_type')

        xml_str = daemon.get_detection_vt_as_xml_str(oid, qod_type=qod_type)

        self.assertEqual(xml_str, expected)
481
482
    def test_get_detection_xml_failed(self):
        # A detection text holding the control character U+0006 is expected
        # to make the daemon log a warning.
        daemon = DummyDaemon()
        detection = u'\u0006'

        # patch.object restores Logger.warning on exit, so the mock does not
        # leak into tests that run afterwards.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            daemon.get_detection_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', detection
            )

        assert_called_once(mock_warning)
490
491
    def test_get_affected_xml(self):
        # The affected text of the dummy VT must serialize to this XML.
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()
        expected = '<affected>some affection</affected>'
        affected = daemon.VTS[oid].get('affected')

        xml_str = daemon.get_affected_vt_as_xml_str(oid, affected=affected)

        self.assertEqual(xml_str, expected)
502
503
    def test_get_affected_xml_failed(self):
        # An affected text starting with the control character U+0006 is
        # expected to make the daemon log a warning.
        daemon = DummyDaemon()
        affected = u"\u0006" + "affected"

        # patch.object restores Logger.warning on exit, so the mock does not
        # leak into tests that run afterwards.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            daemon.get_affected_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', affected=affected
            )

        assert_called_once(mock_warning)
513
514
    @patch('ospd_openvas.daemon.Path.exists')
    @patch('ospd_openvas.daemon.OSPDopenvas.set_params_from_openvas_settings')
    def test_feed_is_outdated_none(
        self, mock_set_params: MagicMock, mock_path_exists: MagicMock
    ):
        # With Path.exists mocked to return False, feed_is_outdated must
        # return None.
        daemon = DummyDaemon()
        daemon.scan_only_params['plugins_folder'] = '/foo/bar'
        mock_path_exists.return_value = False

        outdated = daemon.feed_is_outdated('1234')

        self.assertIsNone(outdated)
        self.assertEqual(mock_set_params.call_count, 1)
        self.assertEqual(mock_path_exists.call_count, 1)
531
532
    @patch('ospd_openvas.daemon.Path.exists')
    @patch('ospd_openvas.daemon.Path.open')
    def test_feed_is_outdated_true(
        self, mock_path_open: MagicMock, mock_path_exists: MagicMock,
    ):
        # The mocked file content carries PLUGIN_SET "1235" while the test
        # asks about "1234", so the feed must be reported as outdated.
        mock_path_exists.return_value = True

        # Path.open() is used as a context manager; entering it yields the
        # in-memory file content.
        opened = MagicMock(name='Path open context manager')
        opened.__enter__ = MagicMock(
            return_value=io.StringIO('PLUGIN_SET = "1235";')
        )
        mock_path_open.return_value = opened

        daemon = DummyDaemon()
        daemon.scan_only_params['plugins_folder'] = '/foo/bar'

        outdated = daemon.feed_is_outdated('1234')

        self.assertTrue(outdated)
        self.assertEqual(mock_path_exists.call_count, 1)
        self.assertEqual(mock_path_open.call_count, 1)
554
555
    @patch('ospd_openvas.daemon.Path.exists')
    @patch('ospd_openvas.daemon.Path.open')
    def test_feed_is_outdated_false(
        self, mock_path_open: MagicMock, mock_path_exists: MagicMock,
    ):
        # The mocked file content carries the same PLUGIN_SET ("1234") the
        # test asks about, so the feed must not be reported as outdated.
        read_data = 'PLUGIN_SET = "1234"'

        mock_path_exists.return_value = True
        # Path.open() is used as a context manager; entering it yields the
        # in-memory file content.
        mock_read = MagicMock(name='Path open context manager')
        mock_read.__enter__ = MagicMock(return_value=io.StringIO(read_data))
        mock_path_open.return_value = mock_read

        w = DummyDaemon()
        w.scan_only_params['plugins_folder'] = '/foo/bar'

        ret = w.feed_is_outdated('1234')
        self.assertFalse(ret)

        self.assertEqual(mock_path_exists.call_count, 1)
        self.assertEqual(mock_path_open.call_count, 1)
576
577
    def test_check_feed_cache_unavailable(self):
        # When the VT cache is flagged as unavailable, check_feed must
        # return a falsy value without calling feed_is_outdated.
        daemon = DummyDaemon()
        daemon.vts.is_cache_available = False
        daemon.feed_is_outdated = Mock()

        self.assertFalse(daemon.check_feed())
        daemon.feed_is_outdated.assert_not_called()
585
586 View Code Duplication
    @patch('ospd_openvas.daemon.BaseDB')
    @patch('ospd_openvas.daemon.ResultList.add_scan_log_to_list')
    def test_get_openvas_result(self, mock_add_scan_log_to_list, MockDBClass):
        # A LOG result line read from the mocked DB must be passed on to
        # ResultList.add_scan_log_to_list with its fields split out.
        scan_id = '123-456'
        daemon = DummyDaemon()

        targets = OspRequest.process_target_element(daemon.create_xml_target())
        daemon.create_scan(scan_id, targets, None, [])

        MockDBClass.get_result.return_value = [
            "LOG|||192.168.0.1|||localhost|||general/Host_Details||||||Host dead",
        ]
        mock_add_scan_log_to_list.return_value = None

        daemon.report_openvas_results(MockDBClass, scan_id)

        expected_kwargs = {
            'host': '192.168.0.1',
            'hostname': 'localhost',
            'name': '',
            'port': 'general/Host_Details',
            'qod': '',
            'test_id': '',
            'uri': '',
            'value': 'Host dead',
        }
        mock_add_scan_log_to_list.assert_called_with(**expected_kwargs)
612
613 View Code Duplication
    @patch('ospd_openvas.daemon.BaseDB')
    @patch('ospd_openvas.daemon.ResultList.add_scan_error_to_list')
    def test_get_openvas_result_host_deny(
        self, mock_add_scan_error_to_list, MockDBClass
    ):
        # An ERRMSG result line read from the mocked DB must be passed on
        # to ResultList.add_scan_error_to_list with its fields split out.
        scan_id = '123-456'
        daemon = DummyDaemon()

        targets = OspRequest.process_target_element(daemon.create_xml_target())
        daemon.create_scan(scan_id, targets, None, [])

        MockDBClass.get_result.return_value = [
            "ERRMSG|||127.0.0.1|||localhost|||||||||Host access denied.",
        ]
        mock_add_scan_error_to_list.return_value = None

        daemon.report_openvas_results(MockDBClass, scan_id)

        expected_kwargs = {
            'host': '127.0.0.1',
            'hostname': 'localhost',
            'name': '',
            'port': '',
            'test_id': '',
            'uri': '',
            'value': 'Host access denied.',
        }
        mock_add_scan_error_to_list.assert_called_with(**expected_kwargs)
640
641
    @patch('ospd_openvas.daemon.BaseDB')
    def test_get_openvas_result_dead_hosts(self, MockDBClass):
        # A DEADHOST result line with the count 4 must be forwarded to
        # scan_collection.set_amount_dead_hosts as total_dead=4.
        scan_id = '123-456'
        daemon = DummyDaemon()
        targets = OspRequest.process_target_element(daemon.create_xml_target())
        daemon.create_scan(scan_id, targets, None, [])

        MockDBClass.get_result.return_value = [
            "DEADHOST||| ||| ||| ||| |||4",
        ]
        daemon.scan_collection.set_amount_dead_hosts = MagicMock()

        daemon.report_openvas_results(MockDBClass, scan_id)

        daemon.scan_collection.set_amount_dead_hosts.assert_called_with(
            scan_id, total_dead=4,
        )
658
659
    @patch('ospd_openvas.daemon.BaseDB')
    @patch('ospd_openvas.daemon.ResultList.add_scan_log_to_list')
    def test_get_openvas_result_host_start(
        self, mock_add_scan_log_to_list, MockDBClass
    ):
        # A HOST_START result line must be reported through
        # ResultList.add_scan_log_to_list with name='HOST_START'.
        scan_id = '123-456'
        daemon = DummyDaemon()
        targets = OspRequest.process_target_element(daemon.create_xml_target())
        daemon.create_scan(scan_id, targets, None, [])

        MockDBClass.get_result.return_value = [
            "HOST_START|||192.168.10.124||| ||| ||||||today 1",
        ]
        mock_add_scan_log_to_list.return_value = None

        daemon.report_openvas_results(MockDBClass, scan_id)

        expected_kwargs = {
            'host': '192.168.10.124',
            'name': 'HOST_START',
            'value': 'today 1',
        }
        mock_add_scan_log_to_list.assert_called_with(**expected_kwargs)
681
682
    @patch('ospd_openvas.daemon.BaseDB')
    @patch('ospd_openvas.daemon.ResultList.add_scan_alarm_to_list')
    def test_result_without_vt_oid(
        self, mock_add_scan_alarm_to_list, MockDBClass
    ):
        # An ALARM result line with a blank OID field is expected to make
        # the daemon log a warning.
        w = DummyDaemon()

        # patch.object restores Logger.warning on exit, so the mock does not
        # leak into tests that run afterwards. The scan setup stays inside
        # the patched scope so the same calls are counted as before.
        with patch.object(logging.Logger, 'warning') as mock_warning:
            target_element = w.create_xml_target()
            targets = OspRequest.process_target_element(target_element)
            w.create_scan('123-456', targets, None, [])
            w.scan_collection.scans_table['123-456']['results'] = []
            results = ["ALARM||| ||| ||| ||| |||some alarm|||path", None]
            MockDBClass.get_result.return_value = results
            mock_add_scan_alarm_to_list.return_value = None

            w.report_openvas_results(MockDBClass, '123-456')

        assert_called_once(mock_warning)
701
702
    @patch('ospd_openvas.db.KbDB')
    def test_openvas_is_alive_already_stopped(self, mock_db):
        # With scan_is_stopped mocked to True, is_openvas_process_alive
        # must return a truthy value.
        daemon = DummyDaemon()
        mock_db.scan_is_stopped.return_value = True

        alive = daemon.is_openvas_process_alive(
            mock_db, '1234', 'a1-b2-c3-d4'
        )

        self.assertTrue(alive)
710
711
    @patch('ospd_openvas.db.KbDB')
    def test_openvas_is_alive_still(self, mock_db):
        # With scan_is_stopped mocked to False, is_openvas_process_alive
        # must return a falsy value.
        daemon = DummyDaemon()
        mock_db.scan_is_stopped.return_value = False

        alive = daemon.is_openvas_process_alive(
            mock_db, '1234', 'a1-b2-c3-d4'
        )

        self.assertFalse(alive)
719
720
    @patch('ospd_openvas.daemon.OSPDaemon.set_scan_progress_batch')
    @patch('ospd_openvas.daemon.OSPDaemon.sort_host_finished')
    @patch('ospd_openvas.db.KbDB')
    def test_report_openvas_scan_status(
        self, mock_db, mock_sort_host_finished, mock_set_scan_progress_batch
    ):
        # Status entries from the mocked KB must be turned into the
        # per-host progress map and the finished-host list asserted below.
        scan_id = '123-456'
        daemon = DummyDaemon()

        mock_set_scan_progress_batch.return_value = None
        mock_sort_host_finished.return_value = None
        mock_db.get_scan_status.return_value = [
            '192.168.0.1/15/1000',
            '192.168.0.2/15/0',
            '192.168.0.3/15/-1',
            '192.168.0.4/1500/1500',
        ]

        targets = OspRequest.process_target_element(daemon.create_xml_target())
        daemon.create_scan(scan_id, targets, None, [])
        daemon.report_openvas_scan_status(mock_db, scan_id)

        expected_progress = {
            '192.168.0.1': 1,
            '192.168.0.3': -1,
            '192.168.0.4': 100,
        }
        mock_set_scan_progress_batch.assert_called_with(
            scan_id, host_progress=expected_progress,
        )

        expected_finished = ['192.168.0.3', '192.168.0.4']
        mock_sort_host_finished.assert_called_with(scan_id, expected_finished)
755
756
757
class TestFilters(TestCase):
    # Tests for OpenVasVtsFilter: modification time formatting and
    # filtering of a VT list by modification_time.

    def test_format_vt_modification_time(self):
        vts_filter = OpenVasVtsFilter(None)

        formatted = vts_filter.format_vt_modification_time('1517443741')

        self.assertEqual(formatted, "20180201000901")

    def test_get_filtered_vts_false(self):
        # The dummy VT must not match "modification_time<10".
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()
        candidates = ['1234', oid]

        vts_filter = OpenVasVtsFilter(daemon.nvti)
        filtered = vts_filter.get_filtered_vts_list(
            candidates, "modification_time<10"
        )

        self.assertNotIn(oid, filtered)

    def test_get_filtered_vts_true(self):
        # The dummy VT must match "modification_time>10".
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()
        candidates = ['1234', oid]

        vts_filter = OpenVasVtsFilter(daemon.nvti)
        filtered = vts_filter.get_filtered_vts_list(
            candidates, "modification_time>10"
        )

        self.assertIn(oid, filtered)
783