Passed
Pull Request — master (#270)
by Juan José
01:20
created

TestOspdOpenvas.test_get_custom_xml()   A

Complexity

Conditions 1

Size

Total Lines 18
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 8
nop 1
dl 0
loc 18
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
# pylint: disable=invalid-name,line-too-long,no-value-for-parameter
21
22
""" Unit Test for ospd-openvas """
23
24
import io
25
import logging
26
27
from unittest import TestCase
28
from unittest.mock import patch, Mock, MagicMock
29
30
from ospd.vts import Vts
31
from ospd.protocol import OspRequest
32
33
from tests.dummydaemon import DummyDaemon
34
from tests.helper import assert_called_once
35
36
from ospd_openvas.daemon import OSPD_PARAMS, OpenVasVtsFilter
37
from ospd_openvas.openvas import Openvas
38
39
# Reference value that OSPD_PARAMS is compared against in
# TestOspdOpenvas.test_set_params_from_openvas_settings. Every entry maps a
# parameter name to its type, name, default, mandatory flag and description.
OSPD_PARAMS_OUT = {
    'auto_enable_dependencies': {
        'type': 'boolean',
        'name': 'auto_enable_dependencies',
        'default': 1,
        'mandatory': 1,
        'description': 'Automatically enable the plugins that are depended on',
    },
    'cgi_path': {
        'type': 'string',
        'name': 'cgi_path',
        'default': '/cgi-bin:/scripts',
        'mandatory': 1,
        'description': 'Look for default CGIs in /cgi-bin and /scripts',
    },
    'checks_read_timeout': {
        'type': 'integer',
        'name': 'checks_read_timeout',
        'default': 5,
        'mandatory': 1,
        'description': 'Number  of seconds that the security checks will '
        'wait for when doing a recv()',
    },
    'drop_privileges': {
        'type': 'boolean',
        'name': 'drop_privileges',
        'default': 0,
        'mandatory': 1,
        'description': '',
    },
    # The mocked openvas settings in the test also report '22' for this key.
    'non_simult_ports': {
        'type': 'string',
        'name': 'non_simult_ports',
        'default': '22',
        'mandatory': 1,
        'description': 'Prevent to make two connections on the same given '
        'ports at the same time.',
    },
    'open_sock_max_attempts': {
        'type': 'integer',
        'name': 'open_sock_max_attempts',
        'default': 5,
        'mandatory': 0,
        'description': 'Number of unsuccessful retries to open the socket '
        'before to set the port as closed.',
    },
    'timeout_retry': {
        'type': 'integer',
        'name': 'timeout_retry',
        'default': 5,
        'mandatory': 0,
        'description': 'Number of retries when a socket connection attempt '
        'timesout.',
    },
    'optimize_test': {
        'type': 'integer',
        'name': 'optimize_test',
        'default': 5,
        'mandatory': 0,
        'description': 'By default, openvas does not trust the remote '
        'host banners.',
    },
    'plugins_timeout': {
        'type': 'integer',
        'name': 'plugins_timeout',
        'default': 5,
        'mandatory': 0,
        'description': 'This is the maximum lifetime, in seconds of a plugin.',
    },
    'report_host_details': {
        'type': 'boolean',
        'name': 'report_host_details',
        'default': 1,
        'mandatory': 1,
        'description': '',
    },
    'safe_checks': {
        'type': 'boolean',
        'name': 'safe_checks',
        'default': 1,
        'mandatory': 1,
        'description': 'Disable the plugins with potential to crash '
        'the remote services',
    },
    'scanner_plugins_timeout': {
        'type': 'integer',
        'name': 'scanner_plugins_timeout',
        'default': 36000,
        'mandatory': 1,
        'description': 'Like plugins_timeout, but for ACT_SCANNER plugins.',
    },
    'time_between_request': {
        'type': 'integer',
        'name': 'time_between_request',
        'default': 0,
        'mandatory': 0,
        'description': 'Allow to set a wait time between two actions '
        '(open, send, close).',
    },
    'unscanned_closed': {
        'type': 'boolean',
        'name': 'unscanned_closed',
        'default': 1,
        'mandatory': 1,
        'description': '',
    },
    'unscanned_closed_udp': {
        'type': 'boolean',
        'name': 'unscanned_closed_udp',
        'default': 1,
        'mandatory': 1,
        'description': '',
    },
    'expand_vhosts': {
        'type': 'boolean',
        'name': 'expand_vhosts',
        'default': 1,
        'mandatory': 0,
        'description': 'Whether to expand the target hosts '
        + 'list of vhosts with values gathered from sources '
        + 'such as reverse-lookup queries and VT checks '
        + 'for SSL/TLS certificates.',
    },
    'test_empty_vhost': {
        'type': 'boolean',
        'name': 'test_empty_vhost',
        'default': 0,
        'mandatory': 0,
        'description': 'If  set  to  yes, the scanner will '
        + 'also test the target by using empty vhost value '
        + 'in addition to the targets associated vhost values.',
    },
}
172
173
174
class TestOspdOpenvas(TestCase):
175
    @patch('ospd_openvas.daemon.Openvas')
    def test_set_params_from_openvas_settings(self, mock_openvas: Openvas):
        """Load the mocked openvas settings and check the resulting params."""
        openvas_settings = {
            'non_simult_ports': '22',
            'plugins_folder': '/foo/bar',
        }
        mock_openvas.get_settings.return_value = openvas_settings

        daemon = DummyDaemon()
        daemon.set_params_from_openvas_settings()

        # The settings are expected to be fetched exactly once.
        self.assertEqual(mock_openvas.get_settings.call_count, 1)
        self.assertEqual(OSPD_PARAMS, OSPD_PARAMS_OUT)
        # 'plugins_folder' is expected among the scan-only parameters.
        plugins_folder = daemon.scan_only_params.get('plugins_folder')
        self.assertEqual(plugins_folder, '/foo/bar')
187
188
    @patch('ospd_openvas.daemon.Openvas')
    def test_sudo_available(self, mock_openvas):
        """sudo_available is truthy when the mocked sudo check succeeds."""
        mock_openvas.check_sudo.return_value = True
        daemon = DummyDaemon()

        # Reset the stored value, then read the property once before the
        # asserted read below.
        daemon._sudo_available = None  # pylint: disable=protected-access
        daemon.sudo_available  # pylint: disable=pointless-statement

        self.assertTrue(daemon.sudo_available)
197
198
    def test_get_custom_xml(self):
        """The custom section of the dummy VT has the expected XML length."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = (
            '<custom>'
            '<required_ports>Services/www, 80</required_ports>'
            '<category>3</category>'
            '<excluded_keys>Settings/disable_cgi_scanning</excluded_keys>'
            '<family>Product detection</family>'
            '<filename>mantis_detect.nasl</filename>'
            '<timeout>0</timeout>'
            '</custom>'
        )
        daemon = DummyDaemon()
        custom = daemon.VTS[oid].get('custom')

        result = daemon.get_custom_vt_as_xml_str(oid, custom)

        # Only the lengths are compared, not the text itself
        # (presumably because the element order is not guaranteed).
        self.assertEqual(len(result), len(expected))
216
217
    def test_get_custom_xml_failed(self):
        """A control character (U+0006) in the custom data logs one warning.

        ``Logger.warning`` is patched only for the duration of the call and
        restored afterwards, so the mock does not leak into other tests.
        """
        w = DummyDaemon()
        custom = {'a': u"\u0006"}

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_custom_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', custom=custom
            )

        assert_called_once(mock_warning)
227
228
    def test_get_severities_xml(self):
        """The severities of the dummy VT are rendered as the expected XML."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = (
            '<severities>'
            '<severity type="cvss_base_v2">'
            'AV:N/AC:L/Au:N/C:N/I:N/A:N'
            '</severity>'
            '</severities>'
        )
        daemon = DummyDaemon()
        severities = daemon.VTS[oid].get('severities')

        result = daemon.get_severities_vt_as_xml_str(oid, severities)

        self.assertEqual(result, expected)
245
246
    def test_get_severities_xml_failed(self):
        """A control character (U+0006) in the severities logs one warning.

        ``Logger.warning`` is patched only for the duration of the call and
        restored afterwards, so the mock does not leak into other tests.
        """
        w = DummyDaemon()
        sever = {'severity_base_vector': u"\u0006"}

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_severities_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', severities=sever
            )

        assert_called_once(mock_warning)
256
257
    def test_get_params_xml(self):
        """The parameters of the dummy VT have the expected XML length."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = (
            '<params>'
            '<param type="checkbox" id="2">'
            '<name>Do not randomize the  order  in  which ports are '
            'scanned</name>'
            '<default>no</default>'
            '</param>'
            '<param type="entry" id="1">'
            '<name>Data length :</name>'
            '</param>'
            '</params>'
        )
        daemon = DummyDaemon()
        vt_params = daemon.VTS[oid].get('vt_params')

        result = daemon.get_params_vt_as_xml_str(oid, vt_params)

        # Only the lengths are compared, not the text itself
        # (presumably because the element order is not guaranteed).
        self.assertEqual(len(result), len(expected))
277
278
    def test_get_params_xml_failed(self):
        """A control character (U+0006) as parameter default logs one warning.

        ``Logger.warning`` is patched only for the duration of the call and
        restored afterwards, so the mock does not leak into other tests.
        """
        w = DummyDaemon()
        params = {
            '1': {
                'id': '1',
                'type': 'entry',
                'default': u'\u0006',
                'name': 'dns-fuzz.timelimit',
                'description': 'Description',
            }
        }

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_params_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', params)

        assert_called_once(mock_warning)
294
295
    def test_get_refs_xml(self):
        """The references of the dummy VT are rendered as the expected XML."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = '<refs><ref type="url" id="http://www.mantisbt.org/"/></refs>'
        daemon = DummyDaemon()
        vt_refs = daemon.VTS[oid].get('vt_refs')

        result = daemon.get_refs_vt_as_xml_str(oid, vt_refs)

        self.assertEqual(result, expected)
304
305
    def test_get_dependencies_xml(self):
        """A list of dependency OIDs is rendered as the expected XML."""
        dependencies = ['1.2.3.4', '4.3.2.1']
        expected = (
            '<dependencies>'
            '<dependency vt_id="1.2.3.4"/><dependency vt_id="4.3.2.1"/>'
            '</dependencies>'
        )
        daemon = DummyDaemon()

        result = daemon.get_dependencies_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', dependencies
        )

        self.assertEqual(result, expected)
319
320
    def test_get_dependencies_xml_failed(self):
        """A control character (U+0006) as dependency logs one error.

        ``Logger.error`` is patched only for the duration of the call and
        restored afterwards, so the mock does not leak into other tests.
        """
        w = DummyDaemon()
        dep = [u"\u0006"]

        with patch.object(logging.Logger, 'error') as mock_error:
            w.get_dependencies_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', vt_dependencies=dep
            )

        assert_called_once(mock_error)
330
331
    def test_get_ctime_xml(self):
        """The creation time of the dummy VT is rendered as expected."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = '<creation_time>1237458156</creation_time>'
        daemon = DummyDaemon()
        creation_time = daemon.VTS[oid].get('creation_time')

        result = daemon.get_creation_time_vt_as_xml_str(oid, creation_time)

        self.assertEqual(result, expected)
342
343
    def test_get_ctime_xml_failed(self):
        """A control character (U+0006) as creation time logs one warning.

        ``Logger.warning`` is patched only for the duration of the call and
        restored afterwards, so the mock does not leak into other tests.
        """
        w = DummyDaemon()
        ctime = u'\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_creation_time_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', vt_creation_time=ctime
            )

        assert_called_once(mock_warning)
353
354
    def test_get_mtime_xml(self):
        """The modification time of the dummy VT is rendered as expected."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = '<modification_time>1533906565</modification_time>'
        daemon = DummyDaemon()
        modification_time = daemon.VTS[oid].get('modification_time')

        result = daemon.get_modification_time_vt_as_xml_str(
            oid, modification_time
        )

        self.assertEqual(result, expected)
365
366
    def test_get_mtime_xml_failed(self):
        """A control character (U+0006) as modification time logs one warning.

        ``Logger.warning`` is patched only for the duration of the call and
        restored afterwards, so the mock does not leak into other tests.
        """
        w = DummyDaemon()
        mtime = u'\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_modification_time_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', mtime
            )

        assert_called_once(mock_warning)
376
377
    def test_get_summary_xml(self):
        """The summary of the dummy VT is rendered as the expected XML."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = '<summary>some summary</summary>'
        daemon = DummyDaemon()
        summary = daemon.VTS[oid].get('summary')

        result = daemon.get_summary_vt_as_xml_str(oid, summary)

        self.assertEqual(result, expected)
388
389
    def test_get_summary_xml_failed(self):
        """A control character (U+0006) as summary logs one warning.

        ``Logger.warning`` is patched only for the duration of the call and
        restored afterwards, so the mock does not leak into other tests.
        """
        w = DummyDaemon()
        summary = u'\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_summary_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', summary)

        assert_called_once(mock_warning)
397
398
    def test_get_impact_xml(self):
        """The impact of the dummy VT is rendered as the expected XML."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = '<impact>some impact</impact>'
        daemon = DummyDaemon()
        impact = daemon.VTS[oid].get('impact')

        result = daemon.get_impact_vt_as_xml_str(oid, impact)

        self.assertEqual(result, expected)
407
408
    def test_get_impact_xml_failed(self):
        """A control character (U+0006) as impact logs one warning.

        ``Logger.warning`` is patched only for the duration of the call and
        restored afterwards, so the mock does not leak into other tests.
        """
        w = DummyDaemon()
        impact = u'\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_impact_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', impact)

        assert_called_once(mock_warning)
416
417
    def test_get_insight_xml(self):
        """The insight of the dummy VT is rendered as the expected XML."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = '<insight>some insight</insight>'
        daemon = DummyDaemon()
        insight = daemon.VTS[oid].get('insight')

        result = daemon.get_insight_vt_as_xml_str(oid, insight)

        self.assertEqual(result, expected)
428
429
    def test_get_insight_xml_failed(self):
        """A control character (U+0006) as insight logs one warning.

        ``Logger.warning`` is patched only for the duration of the call and
        restored afterwards, so the mock does not leak into other tests.
        """
        w = DummyDaemon()
        insight = u'\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_insight_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', insight)

        assert_called_once(mock_warning)
437
438
    def test_get_solution_xml(self):
        """Solution text, type and method are rendered as the expected XML."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = (
            '<solution type="WillNotFix" method="DebianAPTUpgrade">'
            'some solution'
            '</solution>'
        )
        daemon = DummyDaemon()
        vt = daemon.VTS[oid]

        result = daemon.get_solution_vt_as_xml_str(
            oid,
            vt.get('solution'),
            vt.get('solution_type'),
            vt.get('solution_method'),
        )

        self.assertEqual(result, expected)
459
460
    def test_get_solution_xml_failed(self):
        """A control character (U+0006) as solution logs one warning.

        ``Logger.warning`` is patched only for the duration of the call and
        restored afterwards, so the mock does not leak into other tests.
        """
        w = DummyDaemon()
        solution = u'\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_solution_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', solution
            )

        assert_called_once(mock_warning)
468
469
    def test_get_detection_xml(self):
        """The QoD type of the dummy VT is rendered as the expected XML."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = '<detection qod_type="remote_banner"/>'
        daemon = DummyDaemon()
        qod_type = daemon.VTS[oid].get('qod_type')

        result = daemon.get_detection_vt_as_xml_str(oid, qod_type=qod_type)

        self.assertEqual(result, expected)
481
482
    def test_get_detection_xml_failed(self):
        """A control character (U+0006) as detection logs one warning.

        ``Logger.warning`` is patched only for the duration of the call and
        restored afterwards, so the mock does not leak into other tests.
        """
        w = DummyDaemon()
        detection = u'\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_detection_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', detection
            )

        assert_called_once(mock_warning)
490
491
    def test_get_affected_xml(self):
        """The affected text of the dummy VT is rendered as expected."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        expected = '<affected>some affection</affected>'
        daemon = DummyDaemon()
        affected = daemon.VTS[oid].get('affected')

        result = daemon.get_affected_vt_as_xml_str(oid, affected=affected)

        self.assertEqual(result, expected)
502
503
    def test_get_affected_xml_failed(self):
        """A control character (U+0006) in the affected text logs one warning.

        ``Logger.warning`` is patched only for the duration of the call and
        restored afterwards, so the mock does not leak into other tests.
        """
        w = DummyDaemon()
        affected = u"\u0006" + "affected"

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_affected_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', affected=affected
            )

        assert_called_once(mock_warning)
513
514
    @patch('ospd_openvas.daemon.Path.exists')
    @patch('ospd_openvas.daemon.OSPDopenvas.set_params_from_openvas_settings')
    def test_feed_is_outdated_none(
        self, mock_set_params: MagicMock, mock_path_exists: MagicMock
    ):
        """feed_is_outdated() yields None when Path.exists reports False."""
        daemon = DummyDaemon()
        daemon.scan_only_params['plugins_folder'] = '/foo/bar'

        # Path.exists is mocked to report False for every path.
        mock_path_exists.return_value = False

        outdated = daemon.feed_is_outdated('1234')

        self.assertIsNone(outdated)
        self.assertEqual(mock_set_params.call_count, 1)
        self.assertEqual(mock_path_exists.call_count, 1)
531
532
    @patch('ospd_openvas.daemon.Path.exists')
    @patch('ospd_openvas.daemon.Path.open')
    def test_feed_is_outdated_true(
        self, mock_path_open: MagicMock, mock_path_exists: MagicMock,
    ):
        """feed_is_outdated() is truthy when the file holds another version."""
        # The mocked file reports PLUGIN_SET 1235 while 1234 is passed below.
        feed_info = io.StringIO('PLUGIN_SET = "1235";')
        open_context = MagicMock(name='Path open context manager')
        open_context.__enter__ = MagicMock(return_value=feed_info)

        mock_path_exists.return_value = True
        mock_path_open.return_value = open_context

        daemon = DummyDaemon()
        daemon.scan_only_params['plugins_folder'] = '/foo/bar'

        outdated = daemon.feed_is_outdated('1234')

        self.assertTrue(outdated)
        self.assertEqual(mock_path_exists.call_count, 1)
        self.assertEqual(mock_path_open.call_count, 1)
554
555
    @patch('ospd_openvas.daemon.Path.exists')
    @patch('ospd_openvas.daemon.Path.open')
    def test_feed_is_outdated_false(
        self, mock_path_open: MagicMock, mock_path_exists: MagicMock,
    ):
        """feed_is_outdated() is falsy when the file holds the same version."""
        # The mocked file reports the same PLUGIN_SET (1234) that is passed
        # to feed_is_outdated() below.
        read_data = 'PLUGIN_SET = "1234"'

        # Set once only; the original repeated this assignment.
        mock_path_exists.return_value = True
        mock_read = MagicMock(name='Path open context manager')
        mock_read.__enter__ = MagicMock(return_value=io.StringIO(read_data))
        mock_path_open.return_value = mock_read

        w = DummyDaemon()
        w.scan_only_params['plugins_folder'] = '/foo/bar'

        ret = w.feed_is_outdated('1234')
        self.assertFalse(ret)

        self.assertEqual(mock_path_exists.call_count, 1)
        self.assertEqual(mock_path_open.call_count, 1)
576
577
    def test_check_feed_cache_unavailable(self):
        """check_feed() is falsy and skips feed_is_outdated without a cache."""
        daemon = DummyDaemon()
        daemon.vts.is_cache_available = False
        daemon.feed_is_outdated = Mock()

        checked = daemon.check_feed()

        self.assertFalse(checked)
        daemon.feed_is_outdated.assert_not_called()
585
586 View Code Duplication
    @patch('ospd_openvas.daemon.ScanDB')
    @patch('ospd_openvas.daemon.ResultList.add_scan_log_to_list')
    def test_get_openvas_result(self, mock_add_scan_log_to_list, MockDBClass):
        """A LOG result from the scan DB is added to the scan log list."""
        scan_id = '123-456'
        daemon = DummyDaemon()

        targets = OspRequest.process_target_element(
            daemon.create_xml_target()
        )
        daemon.create_scan(scan_id, targets, None, [])

        # The mocked DB hands out one LOG entry and then None.
        scan_db = MockDBClass.return_value
        scan_db.get_result.side_effect = [
            "LOG||| |||general/Host_Details||| |||Host dead",
            None,
        ]
        mock_add_scan_log_to_list.return_value = None

        daemon.report_openvas_results(scan_db, scan_id, 'localhost')

        mock_add_scan_log_to_list.assert_called_with(
            host='localhost',
            hostname='',
            name='',
            port='general/Host_Details',
            qod='',
            test_id='',
            value='Host dead',
        )
610
611 View Code Duplication
    @patch('ospd_openvas.daemon.ScanDB')
    @patch('ospd_openvas.daemon.ResultList.add_scan_error_to_list')
    def test_get_openvas_result_host_deny(
        self, mock_add_scan_error_to_list, MockDBClass
    ):
        """An ERRMSG result from the scan DB is added to the error list."""
        scan_id = '123-456'
        daemon = DummyDaemon()

        targets = OspRequest.process_target_element(
            daemon.create_xml_target()
        )
        daemon.create_scan(scan_id, targets, None, [])

        # The mocked DB hands out one ERRMSG entry and then None.
        scan_db = MockDBClass.return_value
        scan_db.get_result.side_effect = [
            "ERRMSG|||127.0.0.1|||||| |||Host access denied.",
            None,
        ]
        mock_add_scan_error_to_list.return_value = None

        daemon.report_openvas_results(scan_db, scan_id, '')

        mock_add_scan_error_to_list.assert_called_with(
            host='127.0.0.1',
            hostname='127.0.0.1',
            name='',
            port='',
            test_id='',
            value='Host access denied.',
        )
636
637
    @patch('ospd_openvas.daemon.ScanDB')
    def test_get_openvas_result_dead_hosts(self, MockDBClass):
        """A DEADHOST result sets the amount of dead hosts for the scan."""
        scan_id = '123-456'
        daemon = DummyDaemon()
        targets = OspRequest.process_target_element(
            daemon.create_xml_target()
        )
        daemon.create_scan(scan_id, targets, None, [])

        # The mocked DB hands out one DEADHOST entry and then None.
        scan_db = MockDBClass.return_value
        scan_db.get_result.side_effect = ["DEADHOST||| ||| ||| |||4", None]
        daemon.scan_collection.set_amount_dead_hosts = MagicMock()

        daemon.report_openvas_results(scan_db, scan_id, 'localhost')

        daemon.scan_collection.set_amount_dead_hosts.assert_called_with(
            '123-456', total_dead=4,
        )
653
654
    @patch('ospd_openvas.daemon.ScanDB')
    @patch('ospd_openvas.daemon.ResultList.add_scan_log_to_list')
    def test_result_without_vt_oid(
        self, mock_add_scan_log_to_list, MockDBClass
    ):
        """An ALARM result with a blank OID field logs exactly one warning.

        ``Logger.warning`` is patched right after the daemon is created and
        restored when the ``with`` block ends, so the mock does not leak into
        other tests.
        """
        w = DummyDaemon()

        with patch.object(logging.Logger, 'warning') as mock_warning:
            target_element = w.create_xml_target()
            targets = OspRequest.process_target_element(target_element)
            w.create_scan('123-456', targets, None, [])
            w.scan_collection.scans_table['123-456']['results'] = []

            # The mocked DB hands out one ALARM entry and then None.
            results = ["ALARM||| ||| ||| |||some alarm", None]
            mock_db = MockDBClass.return_value
            mock_db.get_result.side_effect = results
            mock_add_scan_log_to_list.return_value = None

            w.report_openvas_results(mock_db, '123-456', 'localhost')

        assert_called_once(mock_warning)
674
675
    @patch('ospd_openvas.db.KbDB')
    def test_openvas_is_alive_already_stopped(self, mock_db):
        """Expect a truthy result when scan_is_stopped() reports True."""
        mock_db.scan_is_stopped.return_value = True
        daemon = DummyDaemon()

        alive = daemon.is_openvas_process_alive(mock_db, '1234', 'a1-b2-c3-d4')

        self.assertTrue(alive)
683
684
    @patch('ospd_openvas.db.KbDB')
    def test_openvas_is_alive_still(self, mock_db):
        """Expect a falsy result when scan_is_stopped() reports False."""
        mock_db.scan_is_stopped.return_value = False
        daemon = DummyDaemon()

        alive = daemon.is_openvas_process_alive(mock_db, '1234', 'a1-b2-c3-d4')

        self.assertFalse(alive)
692
693
    @patch('ospd_openvas.daemon.OSPDaemon.set_scan_host_progress')
    def test_update_progress(self, mock_set_scan_host_progress):
        """The message '0/-1' is forwarded as a host progress of -1."""
        scan_id = '123-456'
        progress_msg = '0/-1'
        daemon = DummyDaemon()
        mock_set_scan_host_progress.return_value = None

        targets = OspRequest.process_target_element(
            daemon.create_xml_target()
        )
        daemon.create_scan(scan_id, targets, None, [])
        daemon.update_progress(scan_id, 'localhost', progress_msg)

        mock_set_scan_host_progress.assert_called_with(
            '123-456', host='localhost', progress=-1
        )
709
710
711
class TestFilters(TestCase):
    """Tests for OpenVasVtsFilter."""

    def test_format_vt_modification_time(self):
        """An epoch string is formatted as a 14-digit timestamp string."""
        vt_filter = OpenVasVtsFilter(None)

        formatted = vt_filter.format_vt_modification_time('1517443741')

        self.assertEqual(formatted, "20180201000901")

    def test_get_filtered_vts_false(self):
        """The dummy VT is not selected by 'modification_time<10'."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()
        vt_filter = OpenVasVtsFilter(daemon.nvti)

        selected = vt_filter.get_filtered_vts_list(
            ['1234', oid], "modification_time<10"
        )

        self.assertNotIn(oid, selected)

    def test_get_filtered_vts_true(self):
        """The dummy VT is selected by 'modification_time>10'."""
        oid = '1.3.6.1.4.1.25623.1.0.100061'
        daemon = DummyDaemon()
        vt_filter = OpenVasVtsFilter(daemon.nvti)

        selected = vt_filter.get_filtered_vts_list(
            ['1234', oid], "modification_time>10"
        )

        self.assertIn(oid, selected)
737