Completed
Push — master ( 22ac2f...f571c7 )
by Juan José
17s queued 12s
created

tests.test_scan_and_result   C

Complexity

Total Complexity 53

Size/Duplication

Total Lines 998
Duplicated Lines 22.95 %

Importance

Changes 0
Metric Value
eloc 630
dl 229
loc 998
rs 6.93
c 0
b 0
f 0
wmc 53

44 Methods

Rating   Name   Duplication   Size   Complexity  
A FakeStartProcess.run() 0 3 1
A FakeStartProcess.__repr__() 0 3 1
A Result.__init__() 0 12 2
A FakeStartProcess.__init__() 0 7 1
A FakeStartProcess.__call__() 0 5 1
A ScanTestCase.test_get_default_scanner_version() 0 8 1
A ScanTestCase.test_get_vts_single_vt() 0 14 1
A ScanTestCase.test_get_vts_no_vt() 0 9 1
A ScanTestCase.test_get_default_help() 0 14 1
A ScanTestCase.test_get_default_scanner_params() 0 13 1
A ScanTestCase.test_get_vts_vts_with_dependencies() 0 17 1
A ScanTestCase.test_get_vts_vts_with_params() 26 26 1
A ScanTestCase.test_get_vts_vts_with_impact() 0 16 1
A ScanTestCase.test_get_vts_vts_with_refs() 31 31 1
A ScanTestCase.test_get_vts_vts_with_ctime() 0 17 1
A ScanTestCase.test_get_vts_still_not_init() 0 8 1
A ScanTestCase.test_get_vts_vts_with_summary() 0 16 1
A ScanTestCase.test_get_vts_vts_with_severities() 0 16 1
A ScanTestCase.test_get_vts_vts_with_solution() 0 18 1
A ScanTestCase.test_get_vts_vts_with_affected() 0 16 1
A ScanTestCase.test_get_vts_multiple_vts_with_custom() 0 15 1
A ScanTestCase.test_get_scan_pop_max_res() 36 36 1
A ScanTestCase.test_billon_laughs() 0 21 1
A ScanTestCase.test_get_vts_filter_negative() 26 26 1
A ScanTestCase.test_scan_with_error() 25 46 4
A ScanTestCase.test_get_vts_filter_positive() 26 26 1
A ScanTestCase.test_get_vts_vts_with_detection_qodv() 0 17 1
A ScanTestCase.test_clean_forgotten_scans() 23 53 3
A ScanTestCase.test_get_vts_vts_with_insight() 0 16 1
A ScanTestCase.test_scan_get_target_options() 0 20 1
A ScanTestCase.test_target_with_credentials() 0 37 1
A ScanTestCase.test_get_vts_vts_with_mtime() 0 17 1
A ScanTestCase.test_scan_get_target() 0 23 1
A ScanTestCase.test_get_help_still_not_init() 0 8 1
A ScanTestCase.test_get_vtss_multiple_vts() 0 14 1
A ScanTestCase.test_get_vts_vts_with_detection_qodt() 0 17 1
A ScanTestCase.test_get_scan_pop() 36 36 1
A ScanTestCase.test_set_get_vts_version() 0 6 1
A ScanTestCase.test_result_order() 0 32 2
B ScanTestCase.test_resume_task() 0 84 1
A ScanTestCase.test_progress() 0 22 1
A ScanTestCase.test_scan_get_finished_hosts() 0 27 2
A ScanTestCase.test_set_get_vts_version_error() 0 3 1
A ScanTestCase.test_batch_result() 0 34 2

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like tests.test_scan_and_result often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Copyright (C) 2014-2020 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: AGPL-3.0-or-later
4
#
5
# This program is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Affero General Public License as
7
# published by the Free Software Foundation, either version 3 of the
8
# License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
# pylint: disable=too-many-lines
19
20
""" Test module for scan runs
21
"""
22
23
import time
24
import unittest
25
26
from unittest.mock import patch, MagicMock
27
28
import xml.etree.ElementTree as ET
29
import defusedxml.lxml as secET
30
31
from defusedxml.common import EntitiesForbidden
32
33
from .helper import DummyWrapper, assert_called, FakeStream
34
from ospd.resultlist import ResultList
35
36
37
class FakeStartProcess:
    """Test double standing in for a process-start callable.

    Calling the instance records the target function and its arguments
    and returns ``call_mock``; ``run()`` then invokes the recorded target
    synchronously and returns ``run_mock``.
    """

    def __init__(self):
        # Mocks handed back by run() and __call__() respectively.
        self.run_mock = MagicMock()
        self.call_mock = MagicMock()

        # Filled in by __call__().
        self.func = None
        self.args = None
        self.kwargs = None

    def __call__(self, func, *, args=None, kwargs=None):
        """Record ``func`` and its arguments, then return ``call_mock``."""
        self.func = func
        self.args = args or []
        self.kwargs = kwargs or {}
        return self.call_mock

    def run(self):
        """Invoke the recorded function and return ``run_mock``."""
        self.func(*self.args, **self.kwargs)
        return self.run_mock

    def __repr__(self):
        return "<FakeProcess func={} args={} kwargs={}>".format(
            self.func, self.args, self.kwargs
        )
60
61
62
class Result:
    """Plain attribute container describing one scan result.

    Every known field defaults to an empty string; any keyword argument
    passed to the constructor is set as an attribute, overriding the
    default or adding a new attribute.
    """

    def __init__(self, type_, **kwargs):
        self.result_type = type_
        self.host = ''
        self.hostname = ''
        self.name = ''
        self.value = ''
        self.port = ''
        self.test_id = ''
        self.severity = ''
        self.qod = ''
        # Keyword arguments are applied last so they win over the defaults.
        for name, value in kwargs.items():
            setattr(self, name, value)
75
76
77
class ScanTestCase(unittest.TestCase):
78
    def test_get_default_scanner_params(self):
        """<get_scanner_details/> answers 200 and includes scanner_params."""
        daemon = DummyWrapper([])
        fs = FakeStream()

        daemon.handle_command('<get_scanner_details />', fs)
        response = fs.get_response()

        # The status of the response must be success (i.e. 200)
        self.assertEqual(response.get('status'), '200')
        # The response root element must have the correct name
        self.assertEqual(response.tag, 'get_scanner_details_response')
        # The response must contain a 'scanner_params' element
        self.assertIsNotNone(response.find('scanner_params'))
91
92
    def test_get_default_help(self):
        """<help/> answers 200 both in the default and in the XML format."""
        daemon = DummyWrapper([])
        fs = FakeStream()

        daemon.handle_command('<help />', fs)
        response = fs.get_response()
        self.assertEqual(response.get('status'), '200')

        # Same command again, explicitly requesting the XML format.
        fs = FakeStream()
        daemon.handle_command('<help format="xml" />', fs)
        response = fs.get_response()

        self.assertEqual(response.get('status'), '200')
        self.assertEqual(response.tag, 'help_response')
106
107
    def test_get_default_scanner_version(self):
        """<get_version/> answers 200 and includes a protocol element."""
        daemon = DummyWrapper([])
        fs = FakeStream()
        daemon.handle_command('<get_version />', fs)
        response = fs.get_response()

        self.assertEqual(response.get('status'), '200')
        self.assertIsNotNone(response.find('protocol'))
115
116
    def test_get_vts_no_vt(self):
        """<get_vts/> with no VT added still answers 200 with a vts element."""
        daemon = DummyWrapper([])
        fs = FakeStream()

        daemon.handle_command('<get_vts />', fs)
        response = fs.get_response()

        self.assertEqual(response.get('status'), '200')
        self.assertIsNotNone(response.find('vts'))
125
126
    def test_get_vts_single_vt(self):
        """A single added VT is listed by <get_vts/> under its id."""
        daemon = DummyWrapper([])
        fs = FakeStream()
        daemon.add_vt('1.2.3.4', 'A vulnerability test')
        daemon.handle_command('<get_vts />', fs)
        response = fs.get_response()

        self.assertEqual(response.get('status'), '200')

        vts = response.find('vts')
        # Look the element up once and reuse it for both checks.
        vt = vts.find('vt')
        self.assertIsNotNone(vt)
        self.assertEqual(vt.get('id'), '1.2.3.4')
140
141
    def test_get_vts_still_not_init(self):
        """<get_vts/> answers 400 while daemon.initialized is False."""
        daemon = DummyWrapper([])
        fs = FakeStream()
        daemon.initialized = False
        daemon.handle_command('<get_vts />', fs)
        response = fs.get_response()

        self.assertEqual(response.get('status'), '400')
149
150
    def test_get_help_still_not_init(self):
        """<help/> answers 200 even while daemon.initialized is False."""
        daemon = DummyWrapper([])
        fs = FakeStream()
        daemon.initialized = False
        daemon.handle_command('<help/>', fs)
        response = fs.get_response()

        self.assertEqual(response.get('status'), '200')
158
159 View Code Duplication
    def test_get_vts_filter_positive(self):
        """A 'modification_time > 19000201' filter returns the 19000202 VT."""
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            vt_modification_time='19000202',
        )
        fs = FakeStream()

        daemon.handle_command(
            '<get_vts filter="modification_time&gt;19000201"></get_vts>', fs
        )
        response = fs.get_response()

        self.assertEqual(response.get('status'), '200')
        vts = response.find('vts')

        vt = vts.find('vt')
        self.assertIsNotNone(vt)
        self.assertEqual(vt.get('id'), '1.2.3.4')

        modification_time = response.findall('vts/vt/modification_time')
        self.assertEqual(
            '<modification_time>19000202</modification_time>',
            ET.tostring(modification_time[0]).decode('utf-8'),
        )
186
187 View Code Duplication
    def test_get_vts_filter_negative(self):
        """A 'modification_time < 19000203' filter returns the 19000202 VT."""
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            vt_modification_time='19000202',
        )
        fs = FakeStream()
        daemon.handle_command(
            '<get_vts filter="modification_time&lt;19000203"></get_vts>', fs,
        )
        response = fs.get_response()

        self.assertEqual(response.get('status'), '200')

        vts = response.find('vts')

        vt = vts.find('vt')
        self.assertIsNotNone(vt)
        self.assertEqual(vt.get('id'), '1.2.3.4')

        modification_time = response.findall('vts/vt/modification_time')
        self.assertEqual(
            '<modification_time>19000202</modification_time>',
            ET.tostring(modification_time[0]).decode('utf-8'),
        )
214
215
    def test_get_vtss_multiple_vts(self):
        """With three VTs added, <get_vts/> answers 200 and lists a vt."""
        daemon = DummyWrapper([])
        daemon.add_vt('1.2.3.4', 'A vulnerability test')
        daemon.add_vt('1.2.3.5', 'Another vulnerability test')
        daemon.add_vt('123456789', 'Yet another vulnerability test')

        fs = FakeStream()

        daemon.handle_command('<get_vts />', fs)
        response = fs.get_response()
        self.assertEqual(response.get('status'), '200')

        vts = response.find('vts')
        self.assertIsNotNone(vts.find('vt'))
229
230
    def test_get_vts_multiple_vts_with_custom(self):
        """Each of three VTs added with custom data yields a custom element."""
        daemon = DummyWrapper([])
        daemon.add_vt('1.2.3.4', 'A vulnerability test', custom='b')
        daemon.add_vt(
            '4.3.2.1', 'Another vulnerability test with custom info', custom='b'
        )
        daemon.add_vt('123456789', 'Yet another vulnerability test', custom='b')
        fs = FakeStream()

        daemon.handle_command('<get_vts />', fs)
        response = fs.get_response()

        custom = response.findall('vts/vt/custom')

        self.assertEqual(3, len(custom))
245
246 View Code Duplication
    def test_get_vts_vts_with_params(self):
        """A VT added with vt_params is returned with params/param children."""
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4', 'A vulnerability test', vt_params="a", custom="b"
        )
        fs = FakeStream()

        daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
        response = fs.get_response()

        # The status of the response must be success (i.e. 200)
        self.assertEqual(response.get('status'), '200')

        # The response root element must have the correct name
        self.assertEqual(response.tag, 'get_vts_response')
        # The response must contain a 'vts' element
        self.assertIsNotNone(response.find('vts'))

        # response[0][0] is the first child of the response's first child.
        vt_params = response[0][0].findall('params')
        self.assertEqual(1, len(vt_params))

        custom = response[0][0].findall('custom')
        self.assertEqual(1, len(custom))

        params = response.findall('vts/vt/params/param')
        self.assertEqual(2, len(params))
272
273 View Code Duplication
    def test_get_vts_vts_with_refs(self):
        """A VT added with vt_refs is returned with refs/ref children."""
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            vt_refs="c",
        )
        fs = FakeStream()

        daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
        response = fs.get_response()

        # The status of the response must be success (i.e. 200)
        self.assertEqual(response.get('status'), '200')

        # The response root element must have the correct name
        self.assertEqual(response.tag, 'get_vts_response')

        # The response must contain a 'vts' element
        self.assertIsNotNone(response.find('vts'))

        # response[0][0] is the first child of the response's first child.
        vt_params = response[0][0].findall('params')
        self.assertEqual(1, len(vt_params))

        custom = response[0][0].findall('custom')
        self.assertEqual(1, len(custom))

        refs = response.findall('vts/vt/refs/ref')
        self.assertEqual(2, len(refs))
304
305
    def test_get_vts_vts_with_dependencies(self):
        """A VT added with vt_dependencies lists two dependency elements."""
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            vt_dependencies="c",
        )
        fs = FakeStream()

        daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)

        response = fs.get_response()

        deps = response.findall('vts/vt/dependencies/dependency')
        self.assertEqual(2, len(deps))
322
323
    def test_get_vts_vts_with_severities(self):
        """A VT added with severities is returned with one severity element."""
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            severities="c",
        )
        fs = FakeStream()

        daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
        response = fs.get_response()

        severity = response.findall('vts/vt/severities/severity')
        self.assertEqual(1, len(severity))
339
340
    def test_get_vts_vts_with_detection_qodt(self):
        """A VT added with detection and qod_t has one detection element."""
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            detection="c",
            qod_t="d",
        )
        fs = FakeStream()

        daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
        response = fs.get_response()

        detection = response.findall('vts/vt/detection')
        self.assertEqual(1, len(detection))
357
358
    def test_get_vts_vts_with_detection_qodv(self):
        """A VT added with detection and qod_v has one detection element."""
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            detection="c",
            qod_v="d",
        )
        fs = FakeStream()

        daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
        response = fs.get_response()

        detection = response.findall('vts/vt/detection')
        self.assertEqual(1, len(detection))
375
376
    def test_get_vts_vts_with_summary(self):
        """A VT added with a summary is returned with one summary element."""
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            summary="c",
        )
        fs = FakeStream()

        daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
        response = fs.get_response()

        summary = response.findall('vts/vt/summary')
        self.assertEqual(1, len(summary))
392
393
    def test_get_vts_vts_with_impact(self):
        """A VT added with an impact is returned with one impact element."""
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            impact="c",
        )
        fs = FakeStream()

        daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
        response = fs.get_response()

        impact = response.findall('vts/vt/impact')
        self.assertEqual(1, len(impact))
409
410
    def test_get_vts_vts_with_affected(self):
        """A VT added with affected is returned with one affected element."""
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            affected="c",
        )
        fs = FakeStream()

        daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
        response = fs.get_response()

        affect = response.findall('vts/vt/affected')
        self.assertEqual(1, len(affect))
426
427
    def test_get_vts_vts_with_insight(self):
        """A VT added with an insight is returned with one insight element."""
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            insight="c",
        )
        fs = FakeStream()

        daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
        response = fs.get_response()

        insight = response.findall('vts/vt/insight')
        self.assertEqual(1, len(insight))
443
444
    def test_get_vts_vts_with_solution(self):
        """A VT added with solution data is returned with one solution."""
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            solution="c",
            solution_t="d",
            solution_m="e",
        )
        fs = FakeStream()

        daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
        response = fs.get_response()

        solution = response.findall('vts/vt/solution')
        self.assertEqual(1, len(solution))
462
463
    def test_get_vts_vts_with_ctime(self):
        """The vt_creation_time given to add_vt is echoed as creation_time."""
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            vt_creation_time='01-01-1900',
        )
        fs = FakeStream()

        daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
        response = fs.get_response()

        creation_time = response.findall('vts/vt/creation_time')
        self.assertEqual(
            '<creation_time>01-01-1900</creation_time>',
            ET.tostring(creation_time[0]).decode('utf-8'),
        )
481
482
    def test_get_vts_vts_with_mtime(self):
        """The vt_modification_time is echoed as modification_time."""
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            vt_modification_time='02-01-1900',
        )
        fs = FakeStream()

        daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
        response = fs.get_response()

        modification_time = response.findall('vts/vt/modification_time')
        self.assertEqual(
            '<modification_time>02-01-1900</modification_time>',
            ET.tostring(modification_time[0]).decode('utf-8'),
        )
500
501
    def test_clean_forgotten_scans(self):
        """clean_forgotten_scans() drops an old scan once a store time is set.

        Starts a scan, polls until it reports an end time, back-dates that
        end time, and checks the scan is only removed after
        ``scaninfo_store_time`` has been set on the daemon.
        """
        daemon = DummyWrapper([])
        fs = FakeStream()

        daemon.handle_command(
            '<start_scan target="localhost" ports="80, '
            '443"><scanner_params /></start_scan>',
            fs,
        )
        response = fs.get_response()

        scan_id = response.findtext('id')

        # Poll until the scan reports a non-zero end time.
        finished = False
        while not finished:
            fs = FakeStream()
            daemon.handle_command(
                '<get_scans scan_id="%s" details="1"/>' % scan_id, fs
            )
            response = fs.get_response()

            scans = response.findall('scan')
            self.assertEqual(1, len(scans))

            scan = scans[0]

            if scan.get('end_time') != '0':
                finished = True
            else:
                time.sleep(0.01)

            # NOTE(review): the response of this second query is never
            # inspected; kept in case handle_command has side effects the
            # test relies on -- confirm before removing.
            fs = FakeStream()
            daemon.handle_command(
                '<get_scans scan_id="%s" details="1"/>' % scan_id, fs
            )
            response = fs.get_response()

        self.assertEqual(len(list(daemon.scan_collection.ids_iterator())), 1)

        # Set an old end_time
        daemon.scan_collection.scans_table[scan_id]['end_time'] = 123456
        # Run the check
        daemon.clean_forgotten_scans()
        # Not removed
        self.assertEqual(len(list(daemon.scan_collection.ids_iterator())), 1)

        # Set the max time and run again
        daemon.scaninfo_store_time = 1
        daemon.clean_forgotten_scans()
        # Now is removed
        self.assertEqual(len(list(daemon.scan_collection.ids_iterator())), 0)
554
555
    def test_scan_with_error(self):
        """An 'error' result is reported by get_scans; the scan is deletable.

        Polls the scan until its status is neither "init" nor "running",
        checks the error text in the results, then deletes the scan.
        """
        daemon = DummyWrapper([Result('error', value='something went wrong')])
        fs = FakeStream()

        daemon.handle_command(
            '<start_scan target="localhost" ports="80, '
            '443"><scanner_params /></start_scan>',
            fs,
        )
        response = fs.get_response()
        scan_id = response.findtext('id')
        finished = False
        while not finished:
            fs = FakeStream()
            daemon.handle_command(
                '<get_scans scan_id="%s" details="1"/>' % scan_id, fs
            )
            response = fs.get_response()

            scans = response.findall('scan')
            self.assertEqual(1, len(scans))

            scan = scans[0]
            status = scan.get('status')

            if status in ("init", "running"):
                # While still in progress no end time may be reported.
                self.assertEqual('0', scan.get('end_time'))
                time.sleep(0.010)
            else:
                finished = True

            # This response is the one inspected after the loop ends.
            fs = FakeStream()

            daemon.handle_command(
                '<get_scans scan_id="%s" details="1"/>' % scan_id, fs
            )
            response = fs.get_response()

        self.assertEqual(
            response.findtext('scan/results/result'), 'something went wrong'
        )
        fs = FakeStream()
        daemon.handle_command('<delete_scan scan_id="%s" />' % scan_id, fs)
        response = fs.get_response()

        self.assertEqual(response.get('status'), '200')
601
602 View Code Duplication
    def test_get_scan_pop(self):
        """Results stay available until a get_scans with pop_results="1".

        A plain get_scans and the first popping get_scans both return the
        result; a later popping get_scans finds none.
        """
        daemon = DummyWrapper([Result('host-detail', value='Some Host Detail')])
        fs = FakeStream()

        daemon.handle_command(
            '<start_scan target="localhost" ports="80, 443">'
            '<scanner_params /></start_scan>',
            fs,
        )
        response = fs.get_response()

        scan_id = response.findtext('id')
        time.sleep(1)

        fs = FakeStream()
        daemon.handle_command('<get_scans scan_id="%s"/>' % scan_id, fs)
        response = fs.get_response()

        self.assertEqual(
            response.findtext('scan/results/result'), 'Some Host Detail'
        )
        fs = FakeStream()
        daemon.handle_command(
            '<get_scans scan_id="%s" pop_results="1"/>' % scan_id, fs
        )
        response = fs.get_response()

        self.assertEqual(
            response.findtext('scan/results/result'), 'Some Host Detail'
        )

        fs = FakeStream()
        daemon.handle_command('<get_scans details="0" pop_results="1"/>', fs)
        response = fs.get_response()

        self.assertIsNone(response.findtext('scan/results/result'))
638
639 View Code Duplication
    def test_get_scan_pop_max_res(self):
        """max_results="1" pops one of three results; the next pop gets two."""
        daemon = DummyWrapper(
            [
                Result('host-detail', value='Some Host Detail'),
                Result('host-detail', value='Some Host Detail1'),
                Result('host-detail', value='Some Host Detail2'),
            ]
        )
        fs = FakeStream()
        daemon.handle_command(
            '<start_scan target="localhost" ports="80, 443">'
            '<scanner_params /></start_scan>',
            fs,
        )
        response = fs.get_response()

        scan_id = response.findtext('id')
        time.sleep(1)

        fs = FakeStream()
        daemon.handle_command(
            '<get_scans scan_id="%s" pop_results="1" max_results="1"/>'
            % scan_id,
            fs,
        )
        response = fs.get_response()

        self.assertEqual(len(response.findall('scan/results/result')), 1)

        fs = FakeStream()
        daemon.handle_command(
            '<get_scans scan_id="%s" pop_results="1"/>' % scan_id, fs
        )
        response = fs.get_response()

        self.assertEqual(len(response.findall('scan/results/result')), 2)
675
676
    def test_billon_laughs(self):
        """A document with nested entity definitions raises EntitiesForbidden."""
        # pylint: disable=line-too-long
        daemon = DummyWrapper([])
        lol = (
            '<?xml version="1.0"?>'
            '<!DOCTYPE lolz ['
            ' <!ENTITY lol "lol">'
            ' <!ELEMENT lolz (#PCDATA)>'
            ' <!ENTITY lol1 "&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;">'
            ' <!ENTITY lol2 "&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;">'
            ' <!ENTITY lol3 "&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;">'
            ' <!ENTITY lol4 "&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;">'
            ' <!ENTITY lol5 "&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;">'
            ' <!ENTITY lol6 "&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;">'
            ' <!ENTITY lol7 "&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;">'
            ' <!ENTITY lol8 "&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;">'
            ' <!ENTITY lol9 "&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;">'
            ']>'
        )
        fs = FakeStream()
        self.assertRaises(EntitiesForbidden, daemon.handle_command, lol, fs)
697
698
    def test_target_with_credentials(self):
        """Credentials sent with start_scan come back per service."""
        daemon = DummyWrapper([])
        fs = FakeStream()
        daemon.handle_command(
            '<start_scan>'
            '<scanner_params /><vts><vt id="1.2.3.4" />'
            '</vts>'
            '<targets><target>'
            '<hosts>192.168.0.0/24</hosts><ports>22'
            '</ports><credentials>'
            '<credential type="up" service="ssh" port="22">'
            '<username>scanuser</username>'
            '<password>mypass</password>'
            '</credential><credential type="up" service="smb">'
            '<username>smbuser</username>'
            '<password>mypass</password></credential>'
            '</credentials>'
            '</target></targets>'
            '</start_scan>',
            fs,
        )
        response = fs.get_response()

        self.assertEqual(response.get('status'), '200')

        # Expected mapping, keyed by the credential's service attribute.
        cred_dict = {
            'ssh': {
                'type': 'up',
                'password': 'mypass',
                'port': '22',
                'username': 'scanuser',
            },
            'smb': {'type': 'up', 'password': 'mypass', 'username': 'smbuser'},
        }
        scan_id = response.findtext('id')
        response = daemon.get_scan_credentials(scan_id)
        self.assertEqual(response, cred_dict)
735
736
    def test_scan_get_target(self):
        """get_scans reports the hosts string given in start_scan as target."""
        daemon = DummyWrapper([])
        fs = FakeStream()
        daemon.handle_command(
            '<start_scan>'
            '<scanner_params /><vts><vt id="1.2.3.4" />'
            '</vts>'
            '<targets><target>'
            '<hosts>localhosts,192.168.0.0/24</hosts>'
            '<ports>80,443</ports>'
            '</target></targets>'
            '</start_scan>',
            fs,
        )
        response = fs.get_response()
        scan_id = response.findtext('id')

        fs = FakeStream()
        daemon.handle_command('<get_scans scan_id="%s"/>' % scan_id, fs)
        response = fs.get_response()

        scan_res = response.find('scan')
        self.assertEqual(scan_res.get('target'), 'localhosts,192.168.0.0/24')
759
760
    def test_scan_get_target_options(self):
        """An <alive_test> element is returned by get_scan_target_options."""
        daemon = DummyWrapper([])
        fs = FakeStream()
        daemon.handle_command(
            '<start_scan>'
            '<scanner_params /><vts><vt id="1.2.3.4" />'
            '</vts>'
            '<targets>'
            '<target><hosts>192.168.0.1</hosts>'
            '<ports>22</ports><alive_test>0</alive_test></target>'
            '</targets>'
            '</start_scan>',
            fs,
        )
        response = fs.get_response()

        scan_id = response.findtext('id')
        time.sleep(1)
        target_options = daemon.get_scan_target_options(scan_id)
        self.assertEqual(target_options, {'alive_test': '0'})
780
781
    def test_scan_get_finished_hosts(self):
        """Check that <finished_hosts> of a target is expanded and reported.

        The first target declares the range 192.168.10.23-24 as finished;
        exactly those two addresses must be returned for the scan.
        """
        daemon = DummyWrapper([])
        stream = FakeStream()

        start_cmd = (
            '<start_scan>'
            '<scanner_params /><vts><vt id="1.2.3.4" />'
            '</vts>'
            '<targets><target>'
            '<hosts>192.168.10.20-25</hosts>'
            '<ports>80,443</ports>'
            '<finished_hosts>192.168.10.23-24'
            '</finished_hosts>'
            '</target>'
            '<target><hosts>192.168.0.0/24</hosts>'
            '<ports>22</ports></target>'
            '</targets>'
            '</start_scan>'
        )
        daemon.handle_command(start_cmd, stream)
        scan_id = stream.get_response().findtext('id')

        # NOTE(review): presumably gives the started scan time to settle
        # before the finished hosts are read -- confirm before shortening.
        time.sleep(1)

        finished_hosts = daemon.get_scan_finished_hosts(scan_id)
        for expected_host in ('192.168.10.23', '192.168.10.24'):
            self.assertIn(expected_host, finished_hosts)
        # No other host may be flagged as finished.
        self.assertEqual(len(finished_hosts), 2)
808
809
    def test_progress(self):
        """Check calculate_progress for two hosts reporting 75 and 25.

        The expected overall value asserted by this test is 50.
        """
        daemon = DummyWrapper([])
        stream = FakeStream()

        start_cmd = (
            '<start_scan parallel="2">'
            '<scanner_params />'
            '<targets><target>'
            '<hosts>localhost1, localhost2</hosts>'
            '<ports>22</ports>'
            '</target></targets>'
            '</start_scan>'
        )
        daemon.handle_command(start_cmd, stream)
        scan_id = stream.get_response().findtext('id')

        # Report a different progress value for each host of the target.
        for host, host_progress in (('localhost1', 75), ('localhost2', 25)):
            daemon.set_scan_host_progress(scan_id, host, host_progress)

        self.assertEqual(daemon.calculate_progress(scan_id), 50)
831
832
    def test_set_get_vts_version(self):
        """Check that get_vts_version returns what set_vts_version stored."""
        daemon = DummyWrapper([])
        expected_version = '1234'

        daemon.set_vts_version(expected_version)

        self.assertEqual(expected_version, daemon.get_vts_version())
838
839
    def test_set_get_vts_version_error(self):
        """Check that set_vts_version raises TypeError when called bare."""
        daemon = DummyWrapper([])

        # Calling without the version argument must be rejected.
        with self.assertRaises(TypeError):
            daemon.set_vts_version()
842
843
    @patch("ospd.ospd.os")
    @patch("ospd.command.command.create_process")
    def test_resume_task(self, mock_create_process, _mock_os):
        """Stop a scan, resume it with the same id, and check host tracking.

        The scan process creation is patched with a FakeStartProcess so the
        test controls when the scan "process" starts and whether it is alive.
        """
        host_details = [
            Result('host-detail', host='localhost', value='Some Host Detail'),
            Result('host-detail', host='localhost', value='Some Host Detail2'),
        ]
        daemon = DummyWrapper(host_details)

        # Route process creation through the fake and wire up its mock.
        fake_start = FakeStartProcess()
        mock_create_process.side_effect = fake_start
        scan_process = fake_start.call_mock
        scan_process.start.side_effect = fake_start.run
        scan_process.is_alive.return_value = True
        scan_process.pid = "main-scan-process"

        start_cmd = (
            '<start_scan>'
            '<scanner_params />'
            '<targets><target>'
            '<hosts>localhost</hosts>'
            '<ports>22</ports>'
            '</target></targets>'
            '</start_scan>'
        )
        start_stream = FakeStream()
        daemon.handle_command(start_cmd, start_stream)
        scan_id = start_stream.get_response().findtext('id')

        self.assertIsNotNone(scan_id)

        assert_called(mock_create_process)
        assert_called(scan_process.start)

        # Stop the scan; the stop response itself is not inspected.
        stop_stream = FakeStream()
        daemon.handle_command(
            '<stop_scan scan_id="%s" />' % scan_id, stop_stream
        )

        # Both host-detail results must be present after the stop.
        details_stream = FakeStream()
        daemon.handle_command(
            '<get_scans scan_id="%s" details="1"/>' % scan_id, details_stream
        )
        stopped_results = details_stream.get_response().findall(
            'scan/results/result'
        )
        self.assertEqual(len(stopped_results), 2)

        # Resume the task by starting a scan with the existing scan id.
        resume_template = (
            '<start_scan scan_id="{}" target="localhost" ports="80, 443">'
            '<scanner_params />'
            '</start_scan>'
        )
        resume_stream = FakeStream()
        daemon.handle_command(resume_template.format(scan_id), resume_stream)
        resume_response = resume_stream.get_response()

        # The resumed scan keeps its id and still has the host pending.
        self.assertEqual(resume_response.findtext('id'), scan_id)
        self.assertEqual(
            daemon.get_scan_unfinished_hosts(scan_id), ['localhost']
        )

        # Mark the host as finished; nothing should remain unfinished.
        daemon.set_scan_host_finished(scan_id, "localhost")
        self.assertEqual(len(daemon.get_scan_unfinished_hosts(scan_id)), 0)

        # The host must now be listed among the finished hosts.
        self.assertEqual(
            daemon.scan_collection.get_hosts_finished(scan_id), ['localhost']
        )

        # Query the results once more after the resume.
        final_stream = FakeStream()
        daemon.handle_command(
            '<get_scans scan_id="%s" details="1"/>' % scan_id, final_stream
        )
        result = final_stream.get_response().findall('scan/results/result')

        # Currently the response still contains the earlier results, so the
        # check that they were removed stays disabled:
        # self.assertEqual(len(result), 0)
930
931
    def test_result_order(self):
        """Check that logs added one by one are returned in insertion order.

        Only the results present in the response are compared, position by
        position; the number of results is not asserted.
        """
        daemon = DummyWrapper([])
        start_stream = FakeStream()

        start_cmd = (
            '<start_scan parallel="1">'
            '<scanner_params />'
            '<targets><target>'
            '<hosts>a</hosts>'
            '<ports>22</ports>'
            '</target></targets>'
            '</start_scan>'
        )
        daemon.handle_command(start_cmd, start_stream)
        scan_id = start_stream.get_response().findtext('id')

        # Deliberately not alphabetical, so a sorted output would be caught.
        expected_names = ['a', 'c', 'b']
        for log_name in expected_names:
            daemon.add_scan_log(scan_id, host=log_name, name=log_name)

        query_stream = FakeStream()
        daemon.handle_command('<get_scans details="1"/>', query_stream)
        result_elems = query_stream.get_response().findall("scan/results/")

        for position, result_elem in enumerate(result_elems):
            attributes = result_elem.attrib
            self.assertEqual(expected_names[position], attributes['name'])
963
964
    def test_batch_result(self):
        """Check that a ResultList added in one batch keeps its entry order.

        Only the results present in the response are compared, position by
        position; the number of results is not asserted.
        """
        daemon = DummyWrapper([])
        batch = ResultList()
        start_stream = FakeStream()

        start_cmd = (
            '<start_scan parallel="1">'
            '<scanner_params />'
            '<targets><target>'
            '<hosts>a</hosts>'
            '<ports>22</ports>'
            '</target></targets>'
            '</start_scan>'
        )
        daemon.handle_command(start_cmd, start_stream)
        scan_id = start_stream.get_response().findtext('id')

        # Deliberately not alphabetical, so a sorted output would be caught.
        expected_names = ['a', 'c', 'b']
        for log_name in expected_names:
            batch.add_scan_log_to_list(host=log_name, name=log_name)
        daemon.scan_collection.add_result_list(scan_id, batch)

        query_stream = FakeStream()
        daemon.handle_command('<get_scans details="1"/>', query_stream)
        result_elems = query_stream.get_response().findall("scan/results/")

        for position, result_elem in enumerate(result_elems):
            attributes = result_elem.attrib
            self.assertEqual(expected_names[position], attributes['name'])
998