Passed
Pull Request — master (#291)
by Juan José
01:21
created

tests.test_scan_and_result   F

Complexity

Total Complexity 70

Size/Duplication

Total Lines 1287
Duplicated Lines 12.67 %

Importance

Changes 0
Metric Value
eloc 801
dl 163
loc 1287
rs 2.599
c 0
b 0
f 0
wmc 70

61 Methods

Rating   Name   Duplication   Size   Complexity  
A FakeStartProcess.run() 0 3 1
A ScanTestCase.test_get_vts_single_vt() 0 13 1
A ScanTestCase.test_get_default_scanner_version() 0 7 1
A FakeStartProcess.__repr__() 0 3 1
A Result.__init__() 0 13 2
A ScanTestCase.test_get_vts_no_vt() 0 8 1
A ScanTestCase.setUp() 0 4 1
A FakeStartProcess.__init__() 0 7 1
A FakeStartProcess.__call__() 0 5 1
A ScanTestCase.test_get_vt_xml_no_dict() 0 4 1
A ScanTestCase.test_get_default_help() 0 13 1
A ScanTestCase.test_get_default_scanner_params() 0 12 1
A ScanTestCase.test_get_vts_vts_with_dependencies() 0 16 1
A ScanTestCase.test_scan_exists() 0 54 1
A ScanTestCase.test_get_vts_vts_with_params() 25 25 1
A ScanTestCase.test_interrupted_scan() 0 32 2
A ScanTestCase.test_sort_host_finished() 0 29 1
A ScanTestCase.test_get_vts_vts_with_impact() 0 15 1
A ScanTestCase.test_batch_result() 0 35 2
A ScanTestCase.test_get_vts_vts_with_refs() 30 30 1
A ScanTestCase.test_get_vts_vts_with_ctime() 0 16 1
A ScanTestCase.test_get_vts_still_not_init() 0 7 1
A ScanTestCase.test_set_get_vts_version() 0 5 1
A ScanTestCase.test_calculate_progress_without_current_hosts() 0 31 1
A ScanTestCase.test_get_vts_vts_with_summary() 0 15 1
A ScanTestCase.test_free_memory_true() 0 9 1
A ScanTestCase.test_get_vts_vts_with_severities() 0 15 1
A ScanTestCase.test_result_order() 0 34 2
A ScanTestCase.test_get_vts_vts_with_solution() 0 17 1
A ScanTestCase.test_get_vts_vts_with_affected() 0 15 1
A ScanTestCase.test_get_scan_results_clean() 29 29 1
A ScanTestCase.test_get_vts_multiple_vts_with_custom() 0 16 1
A ScanTestCase.test_get_scan_pop_max_res() 0 32 1
A ScanTestCase.test_billon_laughs() 0 22 1
A ScanTestCase.test_get_vts_filter_negative() 25 25 1
A ScanTestCase.test_get_vts_version() 0 17 1
A ScanTestCase.test_get_vts_filter_positive() 25 25 1
A ScanTestCase.test_scan_with_error() 0 51 4
A ScanTestCase.test_is_new_scan_allowed_false() 0 8 1
A ScanTestCase.test_get_vts_vts_with_detection_qodv() 0 16 1
A ScanTestCase.test_count_queued_scans() 0 17 1
A ScanTestCase.test_clean_forgotten_scans() 0 57 3
A ScanTestCase.test_get_scan_results_restore() 29 29 1
A ScanTestCase.test_scan_get_target_options() 0 21 1
A ScanTestCase.test_get_scan_without_scanid() 0 21 1
A ScanTestCase.test_get_vts_vts_with_insight() 0 15 1
A ScanTestCase.test_target_with_credentials() 0 37 1
A ScanTestCase.test_get_vts_bad_filter() 0 6 1
A ScanTestCase.test_is_new_scan_allowed_true() 0 8 1
A ScanTestCase.test_progress() 0 22 1
A ScanTestCase.test_scan_get_target() 0 24 1
A ScanTestCase.test_get_vts_vts_with_mtime() 0 16 1
A ScanTestCase.test_get_help_still_not_init() 0 7 1
A ScanTestCase.test_set_get_vts_version_error() 0 2 1
A ScanTestCase.test_free_memory_false() 0 9 1
A ScanTestCase.test_get_vts_version_only() 0 14 1
A ScanTestCase.test_get_scan_progress_xml() 0 47 1
A ScanTestCase.test_start_queue_scan_daemon_not_init() 0 8 1
A ScanTestCase.test_get_vtss_multiple_vts() 0 13 1
A ScanTestCase.test_get_vts_vts_with_detection_qodt() 0 16 1
A ScanTestCase.test_get_scan_pop() 0 43 1

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like tests.test_scan_and_result often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Copyright (C) 2014-2020 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: AGPL-3.0-or-later
4
#
5
# This program is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Affero General Public License as
7
# published by the Free Software Foundation, either version 3 of the
8
# License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
# pylint: disable=too-many-lines
19
20
""" Test module for scan runs
21
"""
22
23
import time
24
import unittest
25
26
from unittest.mock import patch, MagicMock, Mock
27
28
import logging
29
import xml.etree.ElementTree as ET
30
31
from defusedxml.common import EntitiesForbidden
32
33
from ospd.resultlist import ResultList
34
from ospd.errors import OspdCommandError
35
from ospd.scan import ScanStatus
36
37
from .helper import (
38
    DummyWrapper,
39
    assert_called,
40
    FakeStream,
41
    FakeDataManager,
42
    FakePsutil,
43
)
44
45
46
class FakeStartProcess:
47
    def __init__(self):
48
        self.run_mock = MagicMock()
49
        self.call_mock = MagicMock()
50
51
        self.func = None
52
        self.args = None
53
        self.kwargs = None
54
55
    def __call__(self, func, *, args=None, kwargs=None):
56
        self.func = func
57
        self.args = args or []
58
        self.kwargs = kwargs or {}
59
        return self.call_mock
60
61
    def run(self):
62
        self.func(*self.args, **self.kwargs)
63
        return self.run_mock
64
65
    def __repr__(self):
66
        return "<FakeProcess func={} args={} kwargs={}>".format(
67
            self.func, self.args, self.kwargs
68
        )
69
70
71
class Result(object):
72
    def __init__(self, type_, **kwargs):
73
        self.result_type = type_
74
        self.host = ''
75
        self.hostname = ''
76
        self.name = ''
77
        self.value = ''
78
        self.port = ''
79
        self.test_id = ''
80
        self.severity = ''
81
        self.qod = ''
82
        self.uri = ''
83
        for name, value in kwargs.items():
84
            setattr(self, name, value)
85
86
87
class ScanTestCase(unittest.TestCase):
88
    def setUp(self):
89
        self.daemon = DummyWrapper([])
90
        self.daemon.scan_collection.datamanager = FakeDataManager()
91
        self.daemon.scan_collection.file_storage_dir = '/tmp'
92
93
    def test_get_default_scanner_params(self):
94
        fs = FakeStream()
95
96
        self.daemon.handle_command('<get_scanner_details />', fs)
97
        response = fs.get_response()
98
99
        # The status of the response must be success (i.e. 200)
100
        self.assertEqual(response.get('status'), '200')
101
        # The response root element must have the correct name
102
        self.assertEqual(response.tag, 'get_scanner_details_response')
103
        # The response must contain a 'scanner_params' element
104
        self.assertIsNotNone(response.find('scanner_params'))
105
106
    def test_get_default_help(self):
107
        fs = FakeStream()
108
109
        self.daemon.handle_command('<help />', fs)
110
        response = fs.get_response()
111
        self.assertEqual(response.get('status'), '200')
112
113
        fs = FakeStream()
114
        self.daemon.handle_command('<help format="xml" />', fs)
115
        response = fs.get_response()
116
117
        self.assertEqual(response.get('status'), '200')
118
        self.assertEqual(response.tag, 'help_response')
119
120
    def test_get_default_scanner_version(self):
121
        fs = FakeStream()
122
        self.daemon.handle_command('<get_version />', fs)
123
        response = fs.get_response()
124
125
        self.assertEqual(response.get('status'), '200')
126
        self.assertIsNotNone(response.find('protocol'))
127
128
    def test_get_vts_no_vt(self):
129
        fs = FakeStream()
130
131
        self.daemon.handle_command('<get_vts />', fs)
132
        response = fs.get_response()
133
134
        self.assertEqual(response.get('status'), '200')
135
        self.assertIsNotNone(response.find('vts'))
136
137
    def test_get_vt_xml_no_dict(self):
138
        single_vt = ('1234', None)
139
        vt = self.daemon.get_vt_xml(single_vt)
140
        self.assertFalse(vt.get('id'))
141
142
    def test_get_vts_single_vt(self):
143
        fs = FakeStream()
144
        self.daemon.add_vt('1.2.3.4', 'A vulnerability test')
145
        self.daemon.handle_command('<get_vts />', fs)
146
        response = fs.get_response()
147
148
        self.assertEqual(response.get('status'), '200')
149
150
        vts = response.find('vts')
151
        self.assertIsNotNone(vts.find('vt'))
152
153
        vt = vts.find('vt')
154
        self.assertEqual(vt.get('id'), '1.2.3.4')
155
156
    def test_get_vts_version(self):
157
        fs = FakeStream()
158
        self.daemon.add_vt('1.2.3.4', 'A vulnerability test')
159
        self.daemon.set_vts_version('today')
160
        self.daemon.handle_command('<get_vts />', fs)
161
        response = fs.get_response()
162
163
        self.assertEqual(response.get('status'), '200')
164
165
        vts_version = response.find('vts').attrib['vts_version']
166
        self.assertEqual(vts_version, self.daemon.get_vts_version())
167
168
        vts = response.find('vts')
169
        self.assertIsNotNone(vts.find('vt'))
170
171
        vt = vts.find('vt')
172
        self.assertEqual(vt.get('id'), '1.2.3.4')
173
174
    def test_get_vts_version_only(self):
175
        fs = FakeStream()
176
        self.daemon.add_vt('1.2.3.4', 'A vulnerability test')
177
        self.daemon.set_vts_version('today')
178
        self.daemon.handle_command('<get_vts version_only="1"/>', fs)
179
        response = fs.get_response()
180
181
        self.assertEqual(response.get('status'), '200')
182
183
        vts_version = response.find('vts').attrib['vts_version']
184
        self.assertEqual(vts_version, self.daemon.get_vts_version())
185
186
        vts = response.find('vts')
187
        self.assertIsNone(vts.find('vt'))
188
189
    def test_get_vts_still_not_init(self):
190
        fs = FakeStream()
191
        self.daemon.initialized = False
192
        self.daemon.handle_command('<get_vts />', fs)
193
        response = fs.get_response()
194
195
        self.assertEqual(response.get('status'), '400')
196
197
    def test_get_help_still_not_init(self):
198
        fs = FakeStream()
199
        self.daemon.initialized = False
200
        self.daemon.handle_command('<help/>', fs)
201
        response = fs.get_response()
202
203
        self.assertEqual(response.get('status'), '200')
204
205 View Code Duplication
    def test_get_vts_filter_positive(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
206
        self.daemon.add_vt(
207
            '1.2.3.4',
208
            'A vulnerability test',
209
            vt_params="a",
210
            vt_modification_time='19000202',
211
        )
212
        fs = FakeStream()
213
214
        self.daemon.handle_command(
215
            '<get_vts filter="modification_time&gt;19000201"></get_vts>', fs
216
        )
217
        response = fs.get_response()
218
219
        self.assertEqual(response.get('status'), '200')
220
        vts = response.find('vts')
221
222
        vt = vts.find('vt')
223
        self.assertIsNotNone(vt)
224
        self.assertEqual(vt.get('id'), '1.2.3.4')
225
226
        modification_time = response.findall('vts/vt/modification_time')
227
        self.assertEqual(
228
            '<modification_time>19000202</modification_time>',
229
            ET.tostring(modification_time[0]).decode('utf-8'),
230
        )
231
232 View Code Duplication
    def test_get_vts_filter_negative(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
233
        self.daemon.add_vt(
234
            '1.2.3.4',
235
            'A vulnerability test',
236
            vt_params="a",
237
            vt_modification_time='19000202',
238
        )
239
        fs = FakeStream()
240
        self.daemon.handle_command(
241
            '<get_vts filter="modification_time&lt;19000203"></get_vts>', fs,
242
        )
243
        response = fs.get_response()
244
245
        self.assertEqual(response.get('status'), '200')
246
247
        vts = response.find('vts')
248
249
        vt = vts.find('vt')
250
        self.assertIsNotNone(vt)
251
        self.assertEqual(vt.get('id'), '1.2.3.4')
252
253
        modification_time = response.findall('vts/vt/modification_time')
254
        self.assertEqual(
255
            '<modification_time>19000202</modification_time>',
256
            ET.tostring(modification_time[0]).decode('utf-8'),
257
        )
258
259
    def test_get_vts_bad_filter(self):
260
        fs = FakeStream()
261
        cmd = '<get_vts filter="modification_time"/>'
262
263
        self.assertRaises(OspdCommandError, self.daemon.handle_command, cmd, fs)
264
        self.assertTrue(self.daemon.vts.is_cache_available)
265
266
    def test_get_vtss_multiple_vts(self):
267
        self.daemon.add_vt('1.2.3.4', 'A vulnerability test')
268
        self.daemon.add_vt('1.2.3.5', 'Another vulnerability test')
269
        self.daemon.add_vt('123456789', 'Yet another vulnerability test')
270
271
        fs = FakeStream()
272
273
        self.daemon.handle_command('<get_vts />', fs)
274
        response = fs.get_response()
275
        self.assertEqual(response.get('status'), '200')
276
277
        vts = response.find('vts')
278
        self.assertIsNotNone(vts.find('vt'))
279
280
    def test_get_vts_multiple_vts_with_custom(self):
281
        self.daemon.add_vt('1.2.3.4', 'A vulnerability test', custom='b')
282
        self.daemon.add_vt(
283
            '4.3.2.1', 'Another vulnerability test with custom info', custom='b'
284
        )
285
        self.daemon.add_vt(
286
            '123456789', 'Yet another vulnerability test', custom='b'
287
        )
288
        fs = FakeStream()
289
290
        self.daemon.handle_command('<get_vts />', fs)
291
        response = fs.get_response()
292
293
        custom = response.findall('vts/vt/custom')
294
295
        self.assertEqual(3, len(custom))
296
297 View Code Duplication
    def test_get_vts_vts_with_params(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
298
        self.daemon.add_vt(
299
            '1.2.3.4', 'A vulnerability test', vt_params="a", custom="b"
300
        )
301
        fs = FakeStream()
302
303
        self.daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
304
        response = fs.get_response()
305
306
        # The status of the response must be success (i.e. 200)
307
        self.assertEqual(response.get('status'), '200')
308
309
        # The response root element must have the correct name
310
        self.assertEqual(response.tag, 'get_vts_response')
311
        # The response must contain a 'scanner_params' element
312
        self.assertIsNotNone(response.find('vts'))
313
314
        vt_params = response[0][0].findall('params')
315
        self.assertEqual(1, len(vt_params))
316
317
        custom = response[0][0].findall('custom')
318
        self.assertEqual(1, len(custom))
319
320
        params = response.findall('vts/vt/params/param')
321
        self.assertEqual(2, len(params))
322
323 View Code Duplication
    def test_get_vts_vts_with_refs(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
324
        self.daemon.add_vt(
325
            '1.2.3.4',
326
            'A vulnerability test',
327
            vt_params="a",
328
            custom="b",
329
            vt_refs="c",
330
        )
331
        fs = FakeStream()
332
333
        self.daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
334
        response = fs.get_response()
335
336
        # The status of the response must be success (i.e. 200)
337
        self.assertEqual(response.get('status'), '200')
338
339
        # The response root element must have the correct name
340
        self.assertEqual(response.tag, 'get_vts_response')
341
342
        # The response must contain a 'vts' element
343
        self.assertIsNotNone(response.find('vts'))
344
345
        vt_params = response[0][0].findall('params')
346
        self.assertEqual(1, len(vt_params))
347
348
        custom = response[0][0].findall('custom')
349
        self.assertEqual(1, len(custom))
350
351
        refs = response.findall('vts/vt/refs/ref')
352
        self.assertEqual(2, len(refs))
353
354
    def test_get_vts_vts_with_dependencies(self):
355
        self.daemon.add_vt(
356
            '1.2.3.4',
357
            'A vulnerability test',
358
            vt_params="a",
359
            custom="b",
360
            vt_dependencies="c",
361
        )
362
        fs = FakeStream()
363
364
        self.daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
365
366
        response = fs.get_response()
367
368
        deps = response.findall('vts/vt/dependencies/dependency')
369
        self.assertEqual(2, len(deps))
370
371
    def test_get_vts_vts_with_severities(self):
372
        self.daemon.add_vt(
373
            '1.2.3.4',
374
            'A vulnerability test',
375
            vt_params="a",
376
            custom="b",
377
            severities="c",
378
        )
379
        fs = FakeStream()
380
381
        self.daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
382
        response = fs.get_response()
383
384
        severity = response.findall('vts/vt/severities/severity')
385
        self.assertEqual(1, len(severity))
386
387
    def test_get_vts_vts_with_detection_qodt(self):
388
        self.daemon.add_vt(
389
            '1.2.3.4',
390
            'A vulnerability test',
391
            vt_params="a",
392
            custom="b",
393
            detection="c",
394
            qod_t="d",
395
        )
396
        fs = FakeStream()
397
398
        self.daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
399
        response = fs.get_response()
400
401
        detection = response.findall('vts/vt/detection')
402
        self.assertEqual(1, len(detection))
403
404
    def test_get_vts_vts_with_detection_qodv(self):
405
        self.daemon.add_vt(
406
            '1.2.3.4',
407
            'A vulnerability test',
408
            vt_params="a",
409
            custom="b",
410
            detection="c",
411
            qod_v="d",
412
        )
413
        fs = FakeStream()
414
415
        self.daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
416
        response = fs.get_response()
417
418
        detection = response.findall('vts/vt/detection')
419
        self.assertEqual(1, len(detection))
420
421
    def test_get_vts_vts_with_summary(self):
422
        self.daemon.add_vt(
423
            '1.2.3.4',
424
            'A vulnerability test',
425
            vt_params="a",
426
            custom="b",
427
            summary="c",
428
        )
429
        fs = FakeStream()
430
431
        self.daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
432
        response = fs.get_response()
433
434
        summary = response.findall('vts/vt/summary')
435
        self.assertEqual(1, len(summary))
436
437
    def test_get_vts_vts_with_impact(self):
438
        self.daemon.add_vt(
439
            '1.2.3.4',
440
            'A vulnerability test',
441
            vt_params="a",
442
            custom="b",
443
            impact="c",
444
        )
445
        fs = FakeStream()
446
447
        self.daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
448
        response = fs.get_response()
449
450
        impact = response.findall('vts/vt/impact')
451
        self.assertEqual(1, len(impact))
452
453
    def test_get_vts_vts_with_affected(self):
454
        self.daemon.add_vt(
455
            '1.2.3.4',
456
            'A vulnerability test',
457
            vt_params="a",
458
            custom="b",
459
            affected="c",
460
        )
461
        fs = FakeStream()
462
463
        self.daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
464
        response = fs.get_response()
465
466
        affect = response.findall('vts/vt/affected')
467
        self.assertEqual(1, len(affect))
468
469
    def test_get_vts_vts_with_insight(self):
470
        self.daemon.add_vt(
471
            '1.2.3.4',
472
            'A vulnerability test',
473
            vt_params="a",
474
            custom="b",
475
            insight="c",
476
        )
477
        fs = FakeStream()
478
479
        self.daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
480
        response = fs.get_response()
481
482
        insight = response.findall('vts/vt/insight')
483
        self.assertEqual(1, len(insight))
484
485
    def test_get_vts_vts_with_solution(self):
486
        self.daemon.add_vt(
487
            '1.2.3.4',
488
            'A vulnerability test',
489
            vt_params="a",
490
            custom="b",
491
            solution="c",
492
            solution_t="d",
493
            solution_m="e",
494
        )
495
        fs = FakeStream()
496
497
        self.daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
498
        response = fs.get_response()
499
500
        solution = response.findall('vts/vt/solution')
501
        self.assertEqual(1, len(solution))
502
503
    def test_get_vts_vts_with_ctime(self):
504
        self.daemon.add_vt(
505
            '1.2.3.4',
506
            'A vulnerability test',
507
            vt_params="a",
508
            vt_creation_time='01-01-1900',
509
        )
510
        fs = FakeStream()
511
512
        self.daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
513
        response = fs.get_response()
514
515
        creation_time = response.findall('vts/vt/creation_time')
516
        self.assertEqual(
517
            '<creation_time>01-01-1900</creation_time>',
518
            ET.tostring(creation_time[0]).decode('utf-8'),
519
        )
520
521
    def test_get_vts_vts_with_mtime(self):
522
        self.daemon.add_vt(
523
            '1.2.3.4',
524
            'A vulnerability test',
525
            vt_params="a",
526
            vt_modification_time='02-01-1900',
527
        )
528
        fs = FakeStream()
529
530
        self.daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
531
        response = fs.get_response()
532
533
        modification_time = response.findall('vts/vt/modification_time')
534
        self.assertEqual(
535
            '<modification_time>02-01-1900</modification_time>',
536
            ET.tostring(modification_time[0]).decode('utf-8'),
537
        )
538
539
    def test_clean_forgotten_scans(self):
540
        fs = FakeStream()
541
542
        self.daemon.handle_command(
543
            '<start_scan target="localhost" ports="80, '
544
            '443"><scanner_params /></start_scan>',
545
            fs,
546
        )
547
        response = fs.get_response()
548
549
        scan_id = response.findtext('id')
550
551
        finished = False
552
553
        self.daemon.start_queued_scans()
554
        while not finished:
555
            fs = FakeStream()
556
            self.daemon.handle_command(
557
                '<get_scans scan_id="%s" details="1"/>' % scan_id, fs
558
            )
559
            response = fs.get_response()
560
561
            scans = response.findall('scan')
562
            self.assertEqual(1, len(scans))
563
564
            scan = scans[0]
565
566
            if scan.get('end_time') != '0':
567
                finished = True
568
            else:
569
                time.sleep(0.01)
570
571
            fs = FakeStream()
572
            self.daemon.handle_command(
573
                '<get_scans scan_id="%s" details="1"/>' % scan_id, fs
574
            )
575
            response = fs.get_response()
576
577
        self.assertEqual(
578
            len(list(self.daemon.scan_collection.ids_iterator())), 1
579
        )
580
581
        # Set an old end_time
582
        self.daemon.scan_collection.scans_table[scan_id]['end_time'] = 123456
583
        # Run the check
584
        self.daemon.clean_forgotten_scans()
585
        # Not removed
586
        self.assertEqual(
587
            len(list(self.daemon.scan_collection.ids_iterator())), 1
588
        )
589
590
        # Set the max time and run again
591
        self.daemon.scaninfo_store_time = 1
592
        self.daemon.clean_forgotten_scans()
593
        # Now is removed
594
        self.assertEqual(
595
            len(list(self.daemon.scan_collection.ids_iterator())), 0
596
        )
597
598
    def test_scan_with_error(self):
599
        fs = FakeStream()
600
601
        self.daemon.handle_command(
602
            '<start_scan target="localhost" ports="80, '
603
            '443"><scanner_params /></start_scan>',
604
            fs,
605
        )
606
607
        response = fs.get_response()
608
        scan_id = response.findtext('id')
609
        finished = False
610
        self.daemon.start_queued_scans()
611
        self.daemon.add_scan_error(
612
            scan_id, host='a', value='something went wrong'
613
        )
614
615
        while not finished:
616
            fs = FakeStream()
617
            self.daemon.handle_command(
618
                '<get_scans scan_id="%s" details="1"/>' % scan_id, fs
619
            )
620
            response = fs.get_response()
621
622
            scans = response.findall('scan')
623
            self.assertEqual(1, len(scans))
624
625
            scan = scans[0]
626
            status = scan.get('status')
627
628
            if status == "init" or status == "running":
629
                self.assertEqual('0', scan.get('end_time'))
630
                time.sleep(0.010)
631
            else:
632
                finished = True
633
634
            fs = FakeStream()
635
636
            self.daemon.handle_command(
637
                '<get_scans scan_id="%s" details="1"/>' % scan_id, fs
638
            )
639
            response = fs.get_response()
640
641
        self.assertEqual(
642
            response.findtext('scan/results/result'), 'something went wrong'
643
        )
644
        fs = FakeStream()
645
        self.daemon.handle_command('<delete_scan scan_id="%s" />' % scan_id, fs)
646
        response = fs.get_response()
647
648
        self.assertEqual(response.get('status'), '200')
649
650
    def test_get_scan_pop(self):
651
        fs = FakeStream()
652
653
        self.daemon.handle_command(
654
            '<start_scan target="localhost" ports="80, 443">'
655
            '<scanner_params /></start_scan>',
656
            fs,
657
        )
658
        self.daemon.start_queued_scans()
659
        response = fs.get_response()
660
661
        scan_id = response.findtext('id')
662
        self.daemon.add_scan_host_detail(
663
            scan_id, host='a', value='Some Host Detail'
664
        )
665
666
        time.sleep(1)
667
668
        fs = FakeStream()
669
        self.daemon.handle_command('<get_scans scan_id="%s"/>' % scan_id, fs)
670
        response = fs.get_response()
671
672
        self.assertEqual(
673
            response.findtext('scan/results/result'), 'Some Host Detail'
674
        )
675
        fs = FakeStream()
676
        self.daemon.handle_command(
677
            '<get_scans scan_id="%s" pop_results="1"/>' % scan_id, fs
678
        )
679
        response = fs.get_response()
680
681
        self.assertEqual(
682
            response.findtext('scan/results/result'), 'Some Host Detail'
683
        )
684
685
        fs = FakeStream()
686
        self.daemon.handle_command(
687
            '<get_scans scan_id="%s" details="0" pop_results="1"/>' % scan_id,
688
            fs,
689
        )
690
        response = fs.get_response()
691
692
        self.assertEqual(response.findtext('scan/results/result'), None)
693
694
    def test_get_scan_pop_max_res(self):
695
        fs = FakeStream()
696
        self.daemon.handle_command(
697
            '<start_scan target="localhost" ports="80, 443">'
698
            '<scanner_params /></start_scan>',
699
            fs,
700
        )
701
        self.daemon.start_queued_scans()
702
        response = fs.get_response()
703
        scan_id = response.findtext('id')
704
705
        self.daemon.add_scan_log(scan_id, host='a', name='a')
706
        self.daemon.add_scan_log(scan_id, host='c', name='c')
707
        self.daemon.add_scan_log(scan_id, host='b', name='b')
708
709
        fs = FakeStream()
710
        self.daemon.handle_command(
711
            '<get_scans scan_id="%s" pop_results="1" max_results="1"/>'
712
            % scan_id,
713
            fs,
714
        )
715
716
        response = fs.get_response()
717
718
        self.assertEqual(len(response.findall('scan/results/result')), 1)
719
720
        fs = FakeStream()
721
        self.daemon.handle_command(
722
            '<get_scans scan_id="%s" pop_results="1"/>' % scan_id, fs
723
        )
724
        response = fs.get_response()
725
        self.assertEqual(len(response.findall('scan/results/result')), 2)
726
727 View Code Duplication
    def test_get_scan_results_clean(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
728
        fs = FakeStream()
729
        self.daemon.handle_command(
730
            '<start_scan target="localhost" ports="80, 443">'
731
            '<scanner_params /></start_scan>',
732
            fs,
733
        )
734
        self.daemon.start_queued_scans()
735
        response = fs.get_response()
736
        scan_id = response.findtext('id')
737
738
        self.daemon.add_scan_log(scan_id, host='a', name='a')
739
        self.daemon.add_scan_log(scan_id, host='c', name='c')
740
        self.daemon.add_scan_log(scan_id, host='b', name='b')
741
742
        fs = FakeStream()
743
        self.daemon.handle_command(
744
            '<get_scans scan_id="%s" pop_results="1"/>' % scan_id, fs,
745
        )
746
747
        res_len = len(
748
            self.daemon.scan_collection.scans_table[scan_id]['results']
749
        )
750
        self.assertEqual(res_len, 0)
751
752
        res_len = len(
753
            self.daemon.scan_collection.scans_table[scan_id]['temp_results']
754
        )
755
        self.assertEqual(res_len, 0)
756
757 View Code Duplication
    def test_get_scan_results_restore(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
758
        fs = FakeStream()
759
        self.daemon.handle_command(
760
            '<start_scan target="localhost" ports="80, 443">'
761
            '<scanner_params /></start_scan>',
762
            fs,
763
        )
764
        self.daemon.start_queued_scans()
765
        response = fs.get_response()
766
        scan_id = response.findtext('id')
767
768
        self.daemon.add_scan_log(scan_id, host='a', name='a')
769
        self.daemon.add_scan_log(scan_id, host='c', name='c')
770
        self.daemon.add_scan_log(scan_id, host='b', name='b')
771
772
        fs = FakeStream(return_value=False)
773
        self.daemon.handle_command(
774
            '<get_scans scan_id="%s" pop_results="1"/>' % scan_id, fs,
775
        )
776
777
        res_len = len(
778
            self.daemon.scan_collection.scans_table[scan_id]['results']
779
        )
780
        self.assertEqual(res_len, 3)
781
782
        res_len = len(
783
            self.daemon.scan_collection.scans_table[scan_id]['temp_results']
784
        )
785
        self.assertEqual(res_len, 0)
786
787
    def test_billon_laughs(self):
788
        # pylint: disable=line-too-long
789
790
        lol = (
791
            '<?xml version="1.0"?>'
792
            '<!DOCTYPE lolz ['
793
            ' <!ENTITY lol "lol">'
794
            ' <!ELEMENT lolz (#PCDATA)>'
795
            ' <!ENTITY lol1 "&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;">'
796
            ' <!ENTITY lol2 "&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;">'
797
            ' <!ENTITY lol3 "&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;">'
798
            ' <!ENTITY lol4 "&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;">'
799
            ' <!ENTITY lol5 "&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;">'
800
            ' <!ENTITY lol6 "&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;">'
801
            ' <!ENTITY lol7 "&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;">'
802
            ' <!ENTITY lol8 "&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;">'
803
            ' <!ENTITY lol9 "&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;">'
804
            ']>'
805
        )
806
        fs = FakeStream()
807
        self.assertRaises(
808
            EntitiesForbidden, self.daemon.handle_command, lol, fs
809
        )
810
811
    def test_target_with_credentials(self):
812
        fs = FakeStream()
813
        self.daemon.handle_command(
814
            '<start_scan>'
815
            '<scanner_params /><vts><vt id="1.2.3.4" />'
816
            '</vts>'
817
            '<targets><target>'
818
            '<hosts>192.168.0.0/24</hosts><ports>22'
819
            '</ports><credentials>'
820
            '<credential type="up" service="ssh" port="22">'
821
            '<username>scanuser</username>'
822
            '<password>mypass</password>'
823
            '</credential><credential type="up" service="smb">'
824
            '<username>smbuser</username>'
825
            '<password>mypass</password></credential>'
826
            '</credentials>'
827
            '</target></targets>'
828
            '</start_scan>',
829
            fs,
830
        )
831
        self.daemon.start_queued_scans()
832
        response = fs.get_response()
833
834
        self.assertEqual(response.get('status'), '200')
835
836
        cred_dict = {
837
            'ssh': {
838
                'type': 'up',
839
                'password': 'mypass',
840
                'port': '22',
841
                'username': 'scanuser',
842
            },
843
            'smb': {'type': 'up', 'password': 'mypass', 'username': 'smbuser'},
844
        }
845
        scan_id = response.findtext('id')
846
        response = self.daemon.get_scan_credentials(scan_id)
847
        self.assertEqual(response, cred_dict)
848
849
    def test_scan_get_target(self):
850
        fs = FakeStream()
851
        self.daemon.handle_command(
852
            '<start_scan>'
853
            '<scanner_params /><vts><vt id="1.2.3.4" />'
854
            '</vts>'
855
            '<targets><target>'
856
            '<hosts>localhosts,192.168.0.0/24</hosts>'
857
            '<ports>80,443</ports>'
858
            '</target></targets>'
859
            '</start_scan>',
860
            fs,
861
        )
862
        self.daemon.start_queued_scans()
863
864
        response = fs.get_response()
865
        scan_id = response.findtext('id')
866
867
        fs = FakeStream()
868
        self.daemon.handle_command('<get_scans scan_id="%s"/>' % scan_id, fs)
869
        response = fs.get_response()
870
871
        scan_res = response.find('scan')
872
        self.assertEqual(scan_res.get('target'), 'localhosts,192.168.0.0/24')
873
874
    def test_scan_get_target_options(self):
875
        fs = FakeStream()
876
        self.daemon.handle_command(
877
            '<start_scan>'
878
            '<scanner_params /><vts><vt id="1.2.3.4" />'
879
            '</vts>'
880
            '<targets>'
881
            '<target><hosts>192.168.0.1</hosts>'
882
            '<ports>22</ports><alive_test>0</alive_test></target>'
883
            '</targets>'
884
            '</start_scan>',
885
            fs,
886
        )
887
        self.daemon.start_queued_scans()
888
889
        response = fs.get_response()
890
891
        scan_id = response.findtext('id')
892
        time.sleep(1)
893
        target_options = self.daemon.get_scan_target_options(scan_id)
894
        self.assertEqual(target_options, {'alive_test': '0'})
895
896
    def test_progress(self):
897
898
        fs = FakeStream()
899
        self.daemon.handle_command(
900
            '<start_scan parallel="2">'
901
            '<scanner_params />'
902
            '<targets><target>'
903
            '<hosts>localhost1, localhost2</hosts>'
904
            '<ports>22</ports>'
905
            '</target></targets>'
906
            '</start_scan>',
907
            fs,
908
        )
909
        self.daemon.start_queued_scans()
910
        response = fs.get_response()
911
912
        scan_id = response.findtext('id')
913
        self.daemon.set_scan_host_progress(scan_id, 'localhost1', 75)
914
        self.daemon.set_scan_host_progress(scan_id, 'localhost2', 25)
915
916
        self.assertEqual(
917
            self.daemon.scan_collection.calculate_target_progress(scan_id), 50
918
        )
919
920
    @patch('ospd.ospd.os')
921
    def test_interrupted_scan(self, mock_os):
922
        mock_os.setsid.return_value = None
923
        fs = FakeStream()
924
        self.daemon.handle_command(
925
            '<start_scan parallel="2">'
926
            '<scanner_params />'
927
            '<targets><target>'
928
            '<hosts>localhost1, localhost2, localhost3, localhost4</hosts>'
929
            '<ports>22</ports>'
930
            '</target></targets>'
931
            '</start_scan>',
932
            fs,
933
        )
934
        self.daemon.start_queued_scans()
935
936
        response = fs.get_response()
937
        scan_id = response.findtext('id')
938
939
        self.daemon.exec_scan = Mock(return_value=None)
940
        self.daemon.set_scan_host_progress(scan_id, 'localhost1', 5)
941
        self.daemon.set_scan_host_progress(scan_id, 'localhost2', 14)
942
        while self.daemon.get_scan_status(scan_id) == ScanStatus.INIT:
943
            fs = FakeStream()
944
            self.daemon.handle_command(
945
                '<get_scans scan_id="%s" details="0" progress="0"/>' % scan_id,
946
                fs,
947
            )
948
        response = fs.get_response()
949
        status = response.find('scan').attrib['status']
950
951
        self.assertEqual(status, ScanStatus.INTERRUPTED.name.lower())
952
953
    def test_sort_host_finished(self):
954
955
        fs = FakeStream()
956
        self.daemon.handle_command(
957
            '<start_scan parallel="2">'
958
            '<scanner_params />'
959
            '<targets><target>'
960
            '<hosts>localhost1, localhost2, localhost3, localhost4</hosts>'
961
            '<ports>22</ports>'
962
            '</target></targets>'
963
            '</start_scan>',
964
            fs,
965
        )
966
        self.daemon.start_queued_scans()
967
968
        response = fs.get_response()
969
970
        scan_id = response.findtext('id')
971
        self.daemon.set_scan_host_progress(scan_id, 'localhost3', -1)
972
        self.daemon.set_scan_host_progress(scan_id, 'localhost1', 75)
973
        self.daemon.set_scan_host_progress(scan_id, 'localhost4', 100)
974
        self.daemon.set_scan_host_progress(scan_id, 'localhost2', 25)
975
976
        self.daemon.sort_host_finished(scan_id, ['localhost3', 'localhost4'])
977
978
        rounded_progress = self.daemon.scan_collection.calculate_target_progress(  # pylint: disable=line-too-long)
979
            scan_id
980
        )
981
        self.assertEqual(rounded_progress, 66)
982
983
    def test_calculate_progress_without_current_hosts(self):
984
985
        fs = FakeStream()
986
        self.daemon.handle_command(
987
            '<start_scan parallel="2">'
988
            '<scanner_params />'
989
            '<targets><target>'
990
            '<hosts>localhost1, localhost2, localhost3, localhost4</hosts>'
991
            '<ports>22</ports>'
992
            '</target></targets>'
993
            '</start_scan>',
994
            fs,
995
        )
996
        self.daemon.start_queued_scans()
997
        response = fs.get_response()
998
999
        scan_id = response.findtext('id')
1000
        self.daemon.set_scan_host_progress(scan_id)
1001
        self.daemon.set_scan_host_progress(scan_id, 'localhost3', -1)
1002
        self.daemon.set_scan_host_progress(scan_id, 'localhost4', 100)
1003
1004
        self.daemon.sort_host_finished(scan_id, ['localhost3', 'localhost4'])
1005
1006
        float_progress = self.daemon.scan_collection.calculate_target_progress(
1007
            scan_id
1008
        )
1009
        self.assertEqual(int(float_progress), 33)
1010
1011
        self.daemon.scan_collection.set_progress(scan_id, float_progress)
1012
        progress = self.daemon.get_scan_progress(scan_id)
1013
        self.assertEqual(progress, 33)
1014
1015
    def test_get_scan_without_scanid(self):
1016
1017
        fs = FakeStream()
1018
        self.daemon.handle_command(
1019
            '<start_scan parallel="2">'
1020
            '<scanner_params />'
1021
            '<targets><target>'
1022
            '<hosts>localhost1, localhost2, localhost3, localhost4</hosts>'
1023
            '<ports>22</ports>'
1024
            '</target></targets>'
1025
            '</start_scan>',
1026
            fs,
1027
        )
1028
        self.daemon.start_queued_scans()
1029
1030
        fs = FakeStream()
1031
        self.assertRaises(
1032
            OspdCommandError,
1033
            self.daemon.handle_command,
1034
            '<get_scans details="0" progress="1"/>',
1035
            fs,
1036
        )
1037
1038
    def test_get_scan_progress_xml(self):
1039
1040
        fs = FakeStream()
1041
        self.daemon.handle_command(
1042
            '<start_scan parallel="2">'
1043
            '<scanner_params />'
1044
            '<targets><target>'
1045
            '<hosts>localhost1, localhost2, localhost3, localhost4</hosts>'
1046
            '<ports>22</ports>'
1047
            '</target></targets>'
1048
            '</start_scan>',
1049
            fs,
1050
        )
1051
        self.daemon.start_queued_scans()
1052
1053
        response = fs.get_response()
1054
        scan_id = response.findtext('id')
1055
1056
        self.daemon.set_scan_host_progress(scan_id, 'localhost3', -1)
1057
        self.daemon.set_scan_host_progress(scan_id, 'localhost4', 100)
1058
        self.daemon.sort_host_finished(scan_id, ['localhost3', 'localhost4'])
1059
1060
        self.daemon.set_scan_host_progress(scan_id, 'localhost1', 75)
1061
        self.daemon.set_scan_host_progress(scan_id, 'localhost2', 25)
1062
1063
        fs = FakeStream()
1064
        self.daemon.handle_command(
1065
            '<get_scans scan_id="%s" details="0" progress="1"/>' % scan_id, fs,
1066
        )
1067
        response = fs.get_response()
1068
1069
        progress = response.find('scan/progress')
1070
1071
        overall = float(progress.findtext('overall'))
1072
        self.assertEqual(int(overall), 66)
1073
1074
        count_alive = progress.findtext('count_alive')
1075
        self.assertEqual(count_alive, '1')
1076
1077
        count_dead = progress.findtext('count_dead')
1078
        self.assertEqual(count_dead, '1')
1079
1080
        current_hosts = progress.findall('host')
1081
        self.assertEqual(len(current_hosts), 2)
1082
1083
        count_excluded = progress.findtext('count_excluded')
1084
        self.assertEqual(count_excluded, '0')
1085
1086
    def test_set_get_vts_version(self):
1087
        self.daemon.set_vts_version('1234')
1088
1089
        version = self.daemon.get_vts_version()
1090
        self.assertEqual('1234', version)
1091
1092
    def test_set_get_vts_version_error(self):
1093
        self.assertRaises(TypeError, self.daemon.set_vts_version)
1094
1095
    @patch("ospd.ospd.os")
1096
    @patch("ospd.ospd.create_process")
1097
    def test_scan_exists(self, mock_create_process, _mock_os):
1098
        fp = FakeStartProcess()
1099
        mock_create_process.side_effect = fp
1100
        mock_process = fp.call_mock
1101
        mock_process.start.side_effect = fp.run
1102
        mock_process.is_alive.return_value = True
1103
        mock_process.pid = "main-scan-process"
1104
1105
        fs = FakeStream()
1106
        self.daemon.handle_command(
1107
            '<start_scan>'
1108
            '<scanner_params />'
1109
            '<targets><target>'
1110
            '<hosts>localhost</hosts>'
1111
            '<ports>22</ports>'
1112
            '</target></targets>'
1113
            '</start_scan>',
1114
            fs,
1115
        )
1116
        response = fs.get_response()
1117
        scan_id = response.findtext('id')
1118
        self.assertIsNotNone(scan_id)
1119
1120
        status = response.get('status_text')
1121
        self.assertEqual(status, 'OK')
1122
1123
        self.daemon.start_queued_scans()
1124
1125
        assert_called(mock_create_process)
1126
        assert_called(mock_process.start)
1127
1128
        self.daemon.handle_command('<stop_scan scan_id="%s" />' % scan_id, fs)
1129
1130
        fs = FakeStream()
1131
        cmd = (
1132
            '<start_scan scan_id="' + scan_id + '">'
1133
            '<scanner_params />'
1134
            '<targets><target>'
1135
            '<hosts>localhost</hosts>'
1136
            '<ports>22</ports>'
1137
            '</target></targets>'
1138
            '</start_scan>'
1139
        )
1140
1141
        self.daemon.handle_command(
1142
            cmd, fs,
1143
        )
1144
        self.daemon.start_queued_scans()
1145
1146
        response = fs.get_response()
1147
        status = response.get('status_text')
1148
        self.assertEqual(status, 'Continue')
1149
1150
    def test_result_order(self):
1151
1152
        fs = FakeStream()
1153
        self.daemon.handle_command(
1154
            '<start_scan parallel="1">'
1155
            '<scanner_params />'
1156
            '<targets><target>'
1157
            '<hosts>a</hosts>'
1158
            '<ports>22</ports>'
1159
            '</target></targets>'
1160
            '</start_scan>',
1161
            fs,
1162
        )
1163
        self.daemon.start_queued_scans()
1164
        response = fs.get_response()
1165
1166
        scan_id = response.findtext('id')
1167
1168
        self.daemon.add_scan_log(scan_id, host='a', name='a')
1169
        self.daemon.add_scan_log(scan_id, host='c', name='c')
1170
        self.daemon.add_scan_log(scan_id, host='b', name='b')
1171
        hosts = ['a', 'c', 'b']
1172
1173
        fs = FakeStream()
1174
        self.daemon.handle_command(
1175
            '<get_scans scan_id="%s" details="1"/>' % scan_id, fs
1176
        )
1177
        response = fs.get_response()
1178
1179
        results = response.findall("scan/results/")
1180
1181
        for idx, res in enumerate(results):
1182
            att_dict = res.attrib
1183
            self.assertEqual(hosts[idx], att_dict['name'])
1184
1185
    def test_batch_result(self):
1186
        reslist = ResultList()
1187
        fs = FakeStream()
1188
        self.daemon.handle_command(
1189
            '<start_scan parallel="1">'
1190
            '<scanner_params />'
1191
            '<targets><target>'
1192
            '<hosts>a</hosts>'
1193
            '<ports>22</ports>'
1194
            '</target></targets>'
1195
            '</start_scan>',
1196
            fs,
1197
        )
1198
        self.daemon.start_queued_scans()
1199
        response = fs.get_response()
1200
1201
        scan_id = response.findtext('id')
1202
        reslist.add_scan_log_to_list(host='a', name='a')
1203
        reslist.add_scan_log_to_list(host='c', name='c')
1204
        reslist.add_scan_log_to_list(host='b', name='b')
1205
        self.daemon.scan_collection.add_result_list(scan_id, reslist)
1206
1207
        hosts = ['a', 'c', 'b']
1208
1209
        fs = FakeStream()
1210
        self.daemon.handle_command(
1211
            '<get_scans scan_id="%s" details="1"/>' % scan_id, fs
1212
        )
1213
        response = fs.get_response()
1214
1215
        results = response.findall("scan/results/")
1216
1217
        for idx, res in enumerate(results):
1218
            att_dict = res.attrib
1219
            self.assertEqual(hosts[idx], att_dict['name'])
1220
1221
    def test_is_new_scan_allowed_false(self):
1222
        self.daemon.scan_processes = {  # pylint: disable=protected-access
1223
            'a': 1,
1224
            'b': 2,
1225
        }
1226
        self.daemon.max_scans = 1
1227
1228
        self.assertFalse(self.daemon.is_new_scan_allowed())
1229
1230
    def test_is_new_scan_allowed_true(self):
1231
        self.daemon.scan_processes = {  # pylint: disable=protected-access
1232
            'a': 1,
1233
            'b': 2,
1234
        }
1235
        self.daemon.max_scans = 3
1236
1237
        self.assertTrue(self.daemon.is_new_scan_allowed())
1238
1239
    def test_start_queue_scan_daemon_not_init(self):
1240
        self.daemon.get_count_queued_scans = MagicMock(return_value=10)
1241
        self.daemon.initialized = False
1242
        logging.Logger.info = Mock()
1243
        self.daemon.start_queued_scans()
1244
1245
        logging.Logger.info.assert_called_with(  # pylint: disable=no-member
1246
            "Queued task can not be started because a "
1247
            "feed update is being performed."
1248
        )
1249
1250
    @patch("ospd.ospd.psutil")
1251
    def test_free_memory_true(self, mock_psutil):
1252
        self.daemon.min_free_mem_scan_queue = 1000
1253
        # 1.5 GB free
1254
        mock_psutil.virtual_memory.return_value = FakePsutil(
1255
            available=1500000000
1256
        )
1257
1258
        self.assertTrue(self.daemon.is_enough_free_memory())
1259
1260
    @patch("ospd.ospd.psutil")
1261
    def test_free_memory_false(self, mock_psutil):
1262
        self.daemon.min_free_mem_scan_queue = 2000
1263
        # 1.5 GB free
1264
        mock_psutil.virtual_memory.return_value = FakePsutil(
1265
            available=1500000000
1266
        )
1267
1268
        self.assertFalse(self.daemon.is_enough_free_memory())
1269
1270
    def test_count_queued_scans(self):
1271
        fs = FakeStream()
1272
        self.daemon.handle_command(
1273
            '<start_scan>'
1274
            '<scanner_params /><vts><vt id="1.2.3.4" />'
1275
            '</vts>'
1276
            '<targets><target>'
1277
            '<hosts>localhosts,192.168.0.0/24</hosts>'
1278
            '<ports>80,443</ports>'
1279
            '</target></targets>'
1280
            '</start_scan>',
1281
            fs,
1282
        )
1283
1284
        self.assertEqual(self.daemon.get_count_queued_scans(), 1)
1285
        self.daemon.start_queued_scans()
1286
        self.assertEqual(self.daemon.get_count_queued_scans(), 0)
1287