Completed
Push — master ( 494987...e33655 )
by Juan José
23s queued 10s
created

tests.test_scan_and_result   B

Complexity

Total Complexity 50

Size/Duplication

Total Lines 921
Duplicated Lines 21.28 %

Importance

Changes 0
Metric Value
eloc 529
dl 196
loc 921
rs 8.4
c 0
b 0
f 0
wmc 50

42 Methods

Rating   Name   Duplication   Size   Complexity  
A ScanTestCase.test_get_vts_vts_with_dependencies() 0 16 1
A ScanTestCase.test_get_vts_vts_with_params() 20 25 1
A FakeStartProcess.run() 0 3 1
A ScanTestCase.test_get_default_scanner_version() 0 6 1
A ScanTestCase.test_get_vts_single_vt() 0 12 1
A ScanTestCase.test_get_vts_vts_with_impact() 0 16 1
A FakeStartProcess.__repr__() 0 3 1
A ScanTestCase.test_get_vts_vts_with_refs() 24 30 1
A ScanTestCase.test_scan_multi_target() 0 18 1
A ScanTestCase.test_get_vts_vts_with_ctime() 0 17 1
A ScanTestCase.test_set_get_vts_version() 0 6 1
A ScanTestCase.test_get_vts_vts_with_summary() 0 16 1
A ScanTestCase.test_get_vts_vts_with_severities() 0 16 1
A ScanTestCase.test_get_vts_vts_with_solution() 0 18 1
A Result.__init__() 0 12 2
A ScanTestCase.test_result_order() 0 28 2
A ScanTestCase.test_get_vts_vts_with_affected() 0 16 1
A ScanTestCase.test_multi_target_with_credentials() 0 37 1
A ScanTestCase.test_get_vts_multiple_vts_with_custom() 0 12 1
A ScanTestCase.test_get_scan_pop_max_res() 33 33 1
A ScanTestCase.test_billon_laughs() 0 20 1
A ScanTestCase.test_get_vts_filter_negative() 26 26 1
A ScanTestCase.test_get_vts_no_vt() 0 6 1
A ScanTestCase.test_get_vts_filter_positive() 26 26 1
A ScanTestCase.test_scan_with_error() 17 45 4
A ScanTestCase.test_get_vts_vts_with_detection_qodv() 0 17 1
A ScanTestCase.test_clean_forgotten_scans() 17 49 4
A ScanTestCase.test_get_vts_vts_with_insight() 0 16 1
A ScanTestCase.test_scan_get_target_options() 0 18 1
A FakeStartProcess.__init__() 0 7 1
B ScanTestCase.test_resume_task() 0 80 1
A FakeStartProcess.__call__() 0 5 1
A ScanTestCase.test_progress() 0 24 1
A ScanTestCase.test_get_vts_vts_with_mtime() 0 17 1
A ScanTestCase.test_scan_get_target() 0 22 1
A ScanTestCase.test_get_default_help() 0 12 1
A ScanTestCase.test_scan_get_finished_hosts() 0 23 1
A ScanTestCase.test_set_get_vts_version_error() 0 3 1
A ScanTestCase.test_get_default_scanner_params() 0 12 1
A ScanTestCase.test_get_vtss_multiple_vts() 0 12 1
A ScanTestCase.test_get_vts_vts_with_detection_qodt() 0 17 1
A ScanTestCase.test_get_scan_pop() 33 33 1

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
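As an illustration of that rule in this module, the polling loop flagged as duplicated in test_clean_forgotten_scans() and test_scan_with_error() could move into one shared helper. The sketch below is not the project's code: wait_for_scan_to_finish and FakeDaemon are hypothetical names, FakeDaemon merely stands in for DummyWrapper, and the standard-library ElementTree parser is used in place of defusedxml so the snippet runs on its own.

```python
import time
import xml.etree.ElementTree as ET


def wait_for_scan_to_finish(daemon, scan_id, poll_interval=0.01, max_polls=1000):
    """Poll get_scans until the scan leaves the 'init'/'running' states.

    Hypothetical helper: `daemon` only needs a handle_command(xml) method.
    Returns the final <scan> element.
    """
    for _ in range(max_polls):
        response = ET.fromstring(
            daemon.handle_command(
                '<get_scans scan_id="%s" details="1"/>' % scan_id
            )
        )
        scans = response.findall('scan')
        if len(scans) != 1:
            raise AssertionError('expected one scan, got %d' % len(scans))

        scan = scans[0]
        if scan.get('status') not in ('init', 'running'):
            return scan

        # Same check as the original loop: an unfinished scan has no end time.
        if scan.get('end_time') != '0':
            raise AssertionError('unfinished scan reports an end_time')
        time.sleep(poll_interval)

    raise AssertionError('scan %s did not finish' % scan_id)


class FakeDaemon:
    """Hypothetical stand-in for DummyWrapper: 'running' twice, then 'finished'."""

    def __init__(self):
        self.calls = 0

    def handle_command(self, command):
        self.calls += 1
        status = 'running' if self.calls < 3 else 'finished'
        return (
            '<get_scans_response status="200">'
            '<scan id="abc" status="%s" end_time="0"/>'
            '</get_scans_response>' % status
        )


daemon = FakeDaemon()
final_scan = wait_for_scan_to_finish(daemon, 'abc', poll_interval=0)
print(final_scan.get('status'), daemon.calls)  # prints: finished 3
```

With a helper of this shape, each test would keep only its own setup and final assertions, and the bounded loop (max_polls is an assumed safeguard) avoids hanging the test run if a scan never finishes.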

Common duplication problems, and corresponding solutions are:

Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like tests.test_scan_and_result often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields or methods that share the same prefix or suffix. In this module, for example, many methods share the test_get_vts_ prefix.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
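A rough sketch of what Extract Class could look like for the test_get_vts_* group follows. Everything here is an assumption made for illustration: GetVtsTestCase, query_vt and FakeVtDaemon are hypothetical names, FakeVtDaemon is a toy replacement for DummyWrapper that simply echoes VT fields back as XML, and the standard-library ElementTree parser is used so the snippet is self-contained.

```python
import unittest
import xml.etree.ElementTree as ET


class FakeVtDaemon:
    """Hypothetical stand-in for DummyWrapper; echoes VT fields back as XML."""

    def __init__(self):
        self.vts = {}

    def add_vt(self, vt_id, name, **fields):
        self.vts[vt_id] = dict(fields, name=name)

    def handle_command(self, command):
        request = ET.fromstring(command)
        response = ET.Element('get_vts_response', status='200')
        vts = ET.SubElement(response, 'vts')
        for vt_id, fields in self.vts.items():
            # Honour an optional vt_id filter on the request.
            if request.get('vt_id') not in (None, vt_id):
                continue
            vt = ET.SubElement(vts, 'vt', id=vt_id)
            for tag, text in fields.items():
                ET.SubElement(vt, tag).text = text
        return ET.tostring(response)


class GetVtsTestCase(unittest.TestCase):
    """Extracted class that holds only the get_vts tests."""

    def query_vt(self, **fields):
        # Shared setup that the original tests repeat in every method.
        daemon = FakeVtDaemon()
        daemon.add_vt('1.2.3.4', 'A vulnerability test', **fields)
        return ET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )

    def test_single_text_fields(self):
        # One table-driven test instead of four near-identical methods.
        for tag in ('summary', 'impact', 'affected', 'insight'):
            with self.subTest(tag=tag):
                response = self.query_vt(**{tag: 'c'})
                self.assertEqual(1, len(response.findall('vts/vt/' + tag)))


suite = unittest.defaultTestLoader.loadTestsFromTestCase(GetVtsTestCase)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```

The scan-related tests would stay in ScanTestCase, so each class covers one command family; whether a plain extracted class or a shared base class fits better depends on how much setup the two groups actually have in common.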

# Copyright (C) 2015-2019 Greenbone Networks GmbH
#
# SPDX-License-Identifier: GPL-2.0-or-later
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.

# pylint: disable=too-many-lines

""" Test module for scan runs
"""

import time
import unittest

from unittest.mock import patch, MagicMock

import xml.etree.ElementTree as ET
import defusedxml.lxml as secET

from defusedxml.common import EntitiesForbidden

from .helper import DummyWrapper, assert_called


class FakeStartProcess:
    def __init__(self):
        self.run_mock = MagicMock()
        self.call_mock = MagicMock()

        self.func = None
        self.args = None
        self.kwargs = None

    def __call__(self, func, *, args=None, kwargs=None):
        self.func = func
        self.args = args or []
        self.kwargs = kwargs or {}
        return self.call_mock

    def run(self):
        self.func(*self.args, **self.kwargs)
        return self.run_mock

    def __repr__(self):
        return "<FakeProcess func={} args={} kwargs={}>".format(
            self.func, self.args, self.kwargs
        )


class Result(object):
    def __init__(self, type_, **kwargs):
        self.result_type = type_
        self.host = ''
        self.hostname = ''
        self.name = ''
        self.value = ''
        self.port = ''
        self.test_id = ''
        self.severity = ''
        self.qod = ''
        for name, value in kwargs.items():
            setattr(self, name, value)


class ScanTestCase(unittest.TestCase):
    def test_get_default_scanner_params(self):
        daemon = DummyWrapper([])
        response = secET.fromstring(
            daemon.handle_command('<get_scanner_details />')
        )

        # The status of the response must be success (i.e. 200)
        self.assertEqual(response.get('status'), '200')
        # The response root element must have the correct name
        self.assertEqual(response.tag, 'get_scanner_details_response')
        # The response must contain a 'scanner_params' element
        self.assertIsNotNone(response.find('scanner_params'))

    def test_get_default_help(self):
        daemon = DummyWrapper([])
        response = secET.fromstring(daemon.handle_command('<help />'))

        self.assertEqual(response.get('status'), '200')

        response = secET.fromstring(
            daemon.handle_command('<help format="xml" />')
        )

        self.assertEqual(response.get('status'), '200')
        self.assertEqual(response.tag, 'help_response')

    def test_get_default_scanner_version(self):
        daemon = DummyWrapper([])
        response = secET.fromstring(daemon.handle_command('<get_version />'))

        self.assertEqual(response.get('status'), '200')
        self.assertIsNotNone(response.find('protocol'))

    def test_get_vts_no_vt(self):
        daemon = DummyWrapper([])
        response = secET.fromstring(daemon.handle_command('<get_vts />'))

        self.assertEqual(response.get('status'), '200')
        self.assertIsNotNone(response.find('vts'))

    def test_get_vts_single_vt(self):
        daemon = DummyWrapper([])
        daemon.add_vt('1.2.3.4', 'A vulnerability test')
        response = secET.fromstring(daemon.handle_command('<get_vts />'))

        self.assertEqual(response.get('status'), '200')

        vts = response.find('vts')
        self.assertIsNotNone(vts.find('vt'))

        vt = vts.find('vt')
        self.assertEqual(vt.get('id'), '1.2.3.4')

    def test_get_vts_filter_positive(self):
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            vt_modification_time='19000202',
        )

        response = secET.fromstring(
            daemon.handle_command(
                '<get_vts filter="modification_time&gt;19000201"></get_vts>'
            )
        )

        self.assertEqual(response.get('status'), '200')
        vts = response.find('vts')

        vt = vts.find('vt')
        self.assertIsNotNone(vt)
        self.assertEqual(vt.get('id'), '1.2.3.4')

        modification_time = response.findall('vts/vt/modification_time')
        self.assertEqual(
            '<modification_time>19000202</modification_time>',
            ET.tostring(modification_time[0]).decode('utf-8'),
        )

    def test_get_vts_filter_negative(self):
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            vt_modification_time='19000202',
        )

        response = secET.fromstring(
            daemon.handle_command(
                '<get_vts filter="modification_time&lt;19000203"></get_vts>'
            )
        )
        self.assertEqual(response.get('status'), '200')

        vts = response.find('vts')

        vt = vts.find('vt')
        self.assertIsNotNone(vt)
        self.assertEqual(vt.get('id'), '1.2.3.4')

        modification_time = response.findall('vts/vt/modification_time')
        self.assertEqual(
            '<modification_time>19000202</modification_time>',
            ET.tostring(modification_time[0]).decode('utf-8'),
        )

    def test_get_vtss_multiple_vts(self):
        daemon = DummyWrapper([])
        daemon.add_vt('1.2.3.4', 'A vulnerability test')
        daemon.add_vt('1.2.3.5', 'Another vulnerability test')
        daemon.add_vt('123456789', 'Yet another vulnerability test')

        response = secET.fromstring(daemon.handle_command('<get_vts />'))

        self.assertEqual(response.get('status'), '200')

        vts = response.find('vts')
        self.assertIsNotNone(vts.find('vt'))

    def test_get_vts_multiple_vts_with_custom(self):
        daemon = DummyWrapper([])
        daemon.add_vt('1.2.3.4', 'A vulnerability test', custom='b')
        daemon.add_vt(
            '4.3.2.1', 'Another vulnerability test with custom info', custom='b'
        )
        daemon.add_vt('123456789', 'Yet another vulnerability test', custom='b')

        response = secET.fromstring(daemon.handle_command('<get_vts />'))
        custom = response.findall('vts/vt/custom')

        self.assertEqual(3, len(custom))

    def test_get_vts_vts_with_params(self):
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4', 'A vulnerability test', vt_params="a", custom="b"
        )

        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )
        # The status of the response must be success (i.e. 200)
        self.assertEqual(response.get('status'), '200')

        # The response root element must have the correct name
        self.assertEqual(response.tag, 'get_vts_response')
        # The response must contain a 'vts' element
        self.assertIsNotNone(response.find('vts'))

        vt_params = response[0][0].findall('params')
        self.assertEqual(1, len(vt_params))

        custom = response[0][0].findall('custom')
        self.assertEqual(1, len(custom))

        params = response.findall('vts/vt/params/param')
        self.assertEqual(2, len(params))

    def test_get_vts_vts_with_refs(self):
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            vt_refs="c",
        )

        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )
        # The status of the response must be success (i.e. 200)
        self.assertEqual(response.get('status'), '200')

        # The response root element must have the correct name
        self.assertEqual(response.tag, 'get_vts_response')

        # The response must contain a 'vts' element
        self.assertIsNotNone(response.find('vts'))

        vt_params = response[0][0].findall('params')
        self.assertEqual(1, len(vt_params))

        custom = response[0][0].findall('custom')
        self.assertEqual(1, len(custom))

        refs = response.findall('vts/vt/refs/ref')
        self.assertEqual(2, len(refs))

    def test_get_vts_vts_with_dependencies(self):
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            vt_dependencies="c",
        )

        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )

        deps = response.findall('vts/vt/dependencies/dependency')
        self.assertEqual(2, len(deps))

    def test_get_vts_vts_with_severities(self):
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            severities="c",
        )

        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )

        severity = response.findall('vts/vt/severities/severity')
        self.assertEqual(1, len(severity))

    def test_get_vts_vts_with_detection_qodt(self):
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            detection="c",
            qod_t="d",
        )

        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )

        detection = response.findall('vts/vt/detection')
        self.assertEqual(1, len(detection))

    def test_get_vts_vts_with_detection_qodv(self):
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            detection="c",
            qod_v="d",
        )

        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )

        detection = response.findall('vts/vt/detection')
        self.assertEqual(1, len(detection))

    def test_get_vts_vts_with_summary(self):
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            summary="c",
        )

        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )

        summary = response.findall('vts/vt/summary')
        self.assertEqual(1, len(summary))

    def test_get_vts_vts_with_impact(self):
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            impact="c",
        )

        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )

        impact = response.findall('vts/vt/impact')
        self.assertEqual(1, len(impact))

    def test_get_vts_vts_with_affected(self):
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            affected="c",
        )

        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )

        affect = response.findall('vts/vt/affected')
        self.assertEqual(1, len(affect))

    def test_get_vts_vts_with_insight(self):
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            insight="c",
        )

        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )

        insight = response.findall('vts/vt/insight')
        self.assertEqual(1, len(insight))

    def test_get_vts_vts_with_solution(self):
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            solution="c",
            solution_t="d",
            solution_m="e",
        )

        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )

        solution = response.findall('vts/vt/solution')
        self.assertEqual(1, len(solution))

    def test_get_vts_vts_with_ctime(self):
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            vt_creation_time='01-01-1900',
        )

        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )

        creation_time = response.findall('vts/vt/creation_time')
        self.assertEqual(
            '<creation_time>01-01-1900</creation_time>',
            ET.tostring(creation_time[0]).decode('utf-8'),
        )

    def test_get_vts_vts_with_mtime(self):
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            vt_modification_time='02-01-1900',
        )

        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )

        modification_time = response.findall('vts/vt/modification_time')
        self.assertEqual(
            '<modification_time>02-01-1900</modification_time>',
            ET.tostring(modification_time[0]).decode('utf-8'),
        )

    def test_clean_forgotten_scans(self):
        daemon = DummyWrapper([])

        response = secET.fromstring(
            daemon.handle_command(
                '<start_scan target="localhost" ports="80, '
                '443"><scanner_params /></start_scan>'
            )
        )
        scan_id = response.findtext('id')

        finished = False
        while not finished:
            response = secET.fromstring(
                daemon.handle_command(
                    '<get_scans scan_id="%s" details="1"/>' % scan_id
                )
            )
            scans = response.findall('scan')
            self.assertEqual(1, len(scans))

            scan = scans[0]
            status = scan.get('status')

            if status == "init" or status == "running":
                self.assertEqual('0', scan.get('end_time'))
                time.sleep(0.010)
            else:
                finished = True

        response = secET.fromstring(
            daemon.handle_command(
                '<get_scans scan_id="%s" details="1"/>' % scan_id
            )
        )
        self.assertEqual(len(list(daemon.scan_collection.ids_iterator())), 1)

        # Set an old end_time
        daemon.scan_collection.scans_table[scan_id]['end_time'] = 123456
        # Run the check
        daemon.clean_forgotten_scans()
        # Not removed
        self.assertEqual(len(list(daemon.scan_collection.ids_iterator())), 1)

        # Set the max time and run again
        daemon.scaninfo_store_time = 1
        daemon.clean_forgotten_scans()
        # Now is removed
        self.assertEqual(len(list(daemon.scan_collection.ids_iterator())), 0)

    def test_scan_with_error(self):
        daemon = DummyWrapper([Result('error', value='something went wrong')])

        response = secET.fromstring(
            daemon.handle_command(
                '<start_scan target="localhost" ports="80, '
                '443"><scanner_params /></start_scan>'
            )
        )
        scan_id = response.findtext('id')

        finished = False
        while not finished:
            response = secET.fromstring(
                daemon.handle_command(
                    '<get_scans scan_id="%s" details="1"/>' % scan_id
                )
            )
            scans = response.findall('scan')
            self.assertEqual(1, len(scans))

            scan = scans[0]
            status = scan.get('status')

            if status == "init" or status == "running":
                self.assertEqual('0', scan.get('end_time'))
                time.sleep(0.010)
            else:
                finished = True

        response = secET.fromstring(
            daemon.handle_command(
                '<get_scans scan_id="%s" details="1"/>' % scan_id
            )
        )

        self.assertEqual(
            response.findtext('scan/results/result'), 'something went wrong'
        )

        response = secET.fromstring(
            daemon.handle_command('<delete_scan scan_id="%s" />' % scan_id)
        )

        self.assertEqual(response.get('status'), '200')

    def test_get_scan_pop(self):
        daemon = DummyWrapper([Result('host-detail', value='Some Host Detail')])

        response = secET.fromstring(
            daemon.handle_command(
                '<start_scan target="localhost" ports="80, 443">'
                '<scanner_params /></start_scan>'
            )
        )

        scan_id = response.findtext('id')
        time.sleep(1)

        response = secET.fromstring(
            daemon.handle_command('<get_scans scan_id="%s"/>' % scan_id)
        )
        self.assertEqual(
            response.findtext('scan/results/result'), 'Some Host Detail'
        )

        response = secET.fromstring(
            daemon.handle_command(
                '<get_scans scan_id="%s" pop_results="1"/>' % scan_id
            )
        )
        self.assertEqual(
            response.findtext('scan/results/result'), 'Some Host Detail'
        )

        response = secET.fromstring(
            daemon.handle_command('<get_scans details="0" pop_results="1"/>')
        )
        self.assertEqual(response.findtext('scan/results/result'), None)

    def test_get_scan_pop_max_res(self):
        daemon = DummyWrapper(
            [
                Result('host-detail', value='Some Host Detail'),
                Result('host-detail', value='Some Host Detail1'),
                Result('host-detail', value='Some Host Detail2'),
            ]
        )

        response = secET.fromstring(
            daemon.handle_command(
                '<start_scan target="localhost" ports="80, 443">'
                '<scanner_params /></start_scan>'
            )
        )

        scan_id = response.findtext('id')
        time.sleep(1)

        response = secET.fromstring(
            daemon.handle_command(
                '<get_scans scan_id="%s" pop_results="1" max_results="1"/>'
                % scan_id
            )
        )
        self.assertEqual(len(response.findall('scan/results/result')), 1)

        response = secET.fromstring(
            daemon.handle_command(
                '<get_scans scan_id="%s" pop_results="1"/>' % scan_id
            )
        )
        self.assertEqual(len(response.findall('scan/results/result')), 2)

    def test_billon_laughs(self):
        # pylint: disable=line-too-long
        daemon = DummyWrapper([])
        lol = (
            '<?xml version="1.0"?>'
            '<!DOCTYPE lolz ['
            ' <!ENTITY lol "lol">'
            ' <!ELEMENT lolz (#PCDATA)>'
            ' <!ENTITY lol1 "&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;">'
            ' <!ENTITY lol2 "&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;">'
            ' <!ENTITY lol3 "&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;">'
            ' <!ENTITY lol4 "&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;">'
            ' <!ENTITY lol5 "&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;">'
            ' <!ENTITY lol6 "&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;">'
            ' <!ENTITY lol7 "&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;">'
            ' <!ENTITY lol8 "&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;">'
            ' <!ENTITY lol9 "&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;">'
            ']>'
        )
        self.assertRaises(EntitiesForbidden, daemon.handle_command, lol)

    def test_scan_multi_target(self):
        daemon = DummyWrapper([])
        response = secET.fromstring(
            daemon.handle_command(
                '<start_scan>'
                '<scanner_params /><vts><vt id="1.2.3.4" />'
                '</vts>'
                '<targets><target>'
                '<hosts>localhosts</hosts>'
                '<ports>80,443</ports>'
                '<alive_test>0</alive_test>'
                '</target>'
                '<target><hosts>192.168.0.0/24</hosts>'
                '<ports>22</ports></target></targets>'
                '</start_scan>'
            )
        )
        self.assertEqual(response.get('status'), '200')

    def test_multi_target_with_credentials(self):
        daemon = DummyWrapper([])
        response = secET.fromstring(
            daemon.handle_command(
                '<start_scan>'
                '<scanner_params /><vts><vt id="1.2.3.4" />'
                '</vts>'
                '<targets><target><hosts>localhosts</hosts>'
                '<ports>80,443</ports></target><target>'
                '<hosts>192.168.0.0/24</hosts><ports>22'
                '</ports><credentials>'
                '<credential type="up" service="ssh" port="22">'
                '<username>scanuser</username>'
                '<password>mypass</password>'
                '</credential><credential type="up" service="smb">'
                '<username>smbuser</username>'
                '<password>mypass</password></credential>'
                '</credentials>'
                '</target></targets>'
                '</start_scan>'
            )
        )

        self.assertEqual(response.get('status'), '200')

        cred_dict = {
            'ssh': {
                'type': 'up',
                'password': 'mypass',
                'port': '22',
                'username': 'scanuser',
            },
            'smb': {'type': 'up', 'password': 'mypass', 'username': 'smbuser'},
        }
        scan_id = response.findtext('id')
        response = daemon.get_scan_credentials(scan_id, "192.168.0.0/24")
        self.assertEqual(response, cred_dict)

    def test_scan_get_target(self):
        daemon = DummyWrapper([])
        response = secET.fromstring(
            daemon.handle_command(
                '<start_scan>'
                '<scanner_params /><vts><vt id="1.2.3.4" />'
                '</vts>'
                '<targets><target>'
                '<hosts>localhosts</hosts>'
                '<ports>80,443</ports>'
                '</target>'
                '<target><hosts>192.168.0.0/24</hosts>'
                '<ports>22</ports></target></targets>'
                '</start_scan>'
            )
        )
        scan_id = response.findtext('id')
        response = secET.fromstring(
            daemon.handle_command('<get_scans scan_id="%s"/>' % scan_id)
        )
        scan_res = response.find('scan')
        self.assertEqual(scan_res.get('target'), 'localhosts,192.168.0.0/24')
729
730
    def test_scan_get_target_options(self):
731
        daemon = DummyWrapper([])
732
        response = secET.fromstring(
733
            daemon.handle_command(
734
                '<start_scan>'
735
                '<scanner_params /><vts><vt id="1.2.3.4" />'
736
                '</vts>'
737
                '<targets>'
738
                '<target><hosts>192.168.0.1</hosts>'
739
                '<ports>22</ports><alive_test>0</alive_test></target>'
740
                '</targets>'
741
                '</start_scan>'
742
            )
743
        )
744
        scan_id = response.findtext('id')
745
        time.sleep(1)
746
        target_options = daemon.get_scan_target_options(scan_id, '192.168.0.1')
747
        self.assertEqual(target_options, {'alive_test': '0'})
748
749
    def test_scan_get_finished_hosts(self):
        daemon = DummyWrapper([])
        response = secET.fromstring(
            daemon.handle_command(
                '<start_scan>'
                '<scanner_params /><vts><vt id="1.2.3.4" />'
                '</vts>'
                '<targets><target>'
                '<hosts>192.168.10.20-25</hosts>'
                '<ports>80,443</ports>'
                '<finished_hosts>192.168.10.23-24'
                '</finished_hosts>'
                '</target>'
                '<target><hosts>192.168.0.0/24</hosts>'
                '<ports>22</ports></target>'
                '</targets>'
                '</start_scan>'
            )
        )
        scan_id = response.findtext('id')
        time.sleep(1)
        finished = daemon.get_scan_finished_hosts(scan_id)
        self.assertEqual(finished, ['192.168.10.23', '192.168.10.24'])

    def test_progress(self):
        daemon = DummyWrapper([])

        response = secET.fromstring(
            daemon.handle_command(
                '<start_scan parallel="2">'
                '<scanner_params />'
                '<targets><target>'
                '<hosts>localhost1</hosts>'
                '<ports>22</ports>'
                '</target><target>'
                '<hosts>localhost2</hosts>'
                '<ports>22</ports>'
                '</target></targets>'
                '</start_scan>'
            )
        )

        scan_id = response.findtext('id')

        daemon.set_scan_host_progress(scan_id, 'localhost1', 'localhost1', 75)
        daemon.set_scan_host_progress(scan_id, 'localhost2', 'localhost2', 25)

        self.assertEqual(daemon.calculate_progress(scan_id), 50)

    def test_set_get_vts_version(self):
        daemon = DummyWrapper([])
        daemon.set_vts_version('1234')

        version = daemon.get_vts_version()
        self.assertEqual('1234', version)

    def test_set_get_vts_version_error(self):
        daemon = DummyWrapper([])
        self.assertRaises(TypeError, daemon.set_vts_version)

    @patch("ospd.ospd.os")
    @patch("ospd.command.command.create_process")
    def test_resume_task(self, mock_create_process, _mock_os):
        daemon = DummyWrapper(
            [
                Result(
                    'host-detail', host='localhost', value='Some Host Detail'
                ),
                Result(
                    'host-detail', host='localhost', value='Some Host Detail2'
                ),
            ]
        )

        fp = FakeStartProcess()
        mock_create_process.side_effect = fp
        mock_process = fp.call_mock
        mock_process.start.side_effect = fp.run
        mock_process.is_alive.return_value = True
        mock_process.pid = "main-scan-process"

        response = ET.fromstring(
            daemon.handle_command(
                '<start_scan>'
                '<scanner_params />'
                '<targets><target>'
                '<hosts>localhost</hosts>'
                '<ports>22</ports>'
                '</target></targets>'
                '</start_scan>'
            )
        )
        scan_id = response.findtext('id')

        self.assertIsNotNone(scan_id)

        assert_called(mock_create_process)
        assert_called(mock_process.start)

        daemon.handle_command('<stop_scan scan_id="%s" />' % scan_id)

        response = ET.fromstring(
            daemon.handle_command(
                '<get_scans scan_id="%s" details="1"/>' % scan_id
            )
        )

        result = response.findall('scan/results/result')
        self.assertEqual(len(result), 2)

        # Resume the task
        cmd = (
            '<start_scan scan_id="{}" target="localhost" ports="80, 443">'
            '<scanner_params />'
            '</start_scan>'.format(scan_id)
        )
        response = ET.fromstring(daemon.handle_command(cmd))

        # Check unfinished host
        self.assertEqual(response.findtext('id'), scan_id)
        self.assertEqual(
            daemon.get_scan_unfinished_hosts(scan_id), ['localhost']
        )

        # Finish the host and check the unfinished hosts again.
        daemon.set_scan_host_finished(scan_id, "localhost", "localhost")
        self.assertEqual(len(daemon.get_scan_unfinished_hosts(scan_id)), 0)

        # Check finished hosts
        self.assertEqual(
            daemon.scan_collection.get_hosts_finished(scan_id), ['localhost']
        )

        # Check if the result was removed.
        response = ET.fromstring(
            daemon.handle_command(
                '<get_scans scan_id="%s" details="1"/>' % scan_id
            )
        )
        result = response.findall('scan/results/result')

        # Currently the response still contains the results, so the
        # assertion below is disabled.
        # self.assertEqual(len(result), 0)

    def test_result_order(self):
        daemon = DummyWrapper([])
        response = secET.fromstring(
            daemon.handle_command(
                '<start_scan parallel="1">'
                '<scanner_params />'
                '<targets><target>'
                '<hosts>a</hosts>'
                '<ports>22</ports>'
                '</target></targets>'
                '</start_scan>'
            )
        )

        scan_id = response.findtext('id')

        daemon.add_scan_log(scan_id, host='a', name='a')
        daemon.add_scan_log(scan_id, host='c', name='c')
        daemon.add_scan_log(scan_id, host='b', name='b')
        hosts = ['a', 'c', 'b']
        response = secET.fromstring(
            daemon.handle_command('<get_scans details="1"/>')
        )
        results = response.findall("scan/results/")

        for idx, res in enumerate(results):
            att_dict = res.attrib
            self.assertEqual(hosts[idx], att_dict['name'])