Completed
Push — master ( 8082e9...516ce7 )

tests.test_scan_and_result   F

Complexity

Total Complexity 66

Size/Duplication

Total Lines 1001
Duplicated Lines 17.88 %

Importance

Changes 0
Metric Value
eloc 614
dl 179
loc 1001
rs 3.106
c 0
b 0
f 0
wmc 66

55 Methods

Rating   Name   Duplication   Size   Complexity  
A ScanTestCase.test_get_vts_vts_with_dependencies() 0 14 1
A DummyWrapper.get_custom_vt_as_xml_str() 0 3 1
A ScanTestCase.test_get_vts_vts_with_params() 20 20 1
A ScanTestCase.test_scan_multi_target_parallel_with_error() 0 13 1
A ScanTestCase.test_get_vts_single_vt() 0 9 1
A ScanTestCase.test_get_default_scanner_version() 0 5 1
A ScanTestCase.test_get_vts_vts_with_impact() 0 14 1
A DummyWrapper.get_creation_time_vt_as_xml_str() 0 5 1
A ScanTestCase.test_scan_get_exclude_hosts() 0 23 1
A ScanTestCase.test_get_vts_vts_with_refs() 24 24 1
A ScanTestCase.test_scan_multi_target() 0 17 1
A ScanTestCase.test_get_vts_vts_with_ctime() 0 16 1
A ScanTestCase.test_set_get_vts_version() 0 5 1
A ScanTestCase.test_get_vts_vts_with_summary() 0 14 1
A ScanTestCase.test_scan_with_vts_and_param() 53 53 1
A ScanTestCase.test_get_vts_vts_with_severities() 0 14 1
A ScanTestCase.test_get_vts_vts_with_solution() 0 15 1
A Result.__init__() 0 12 2
A ScanTestCase.test_get_vts_vts_with_affected() 0 14 1
A DummyWrapper.get_dependencies_vt_as_xml_str() 0 10 1
B DummyWrapper.exec_scan() 0 43 6
A ScanTestCase.test_multi_target_with_credentials() 0 36 1
A ScanTestCase.test_stop_scan() 0 20 1
A ScanTestCase.test_get_vts_multiple_vts_with_custom() 0 10 1
A ScanTestCase.test_get_vts_filter_negative() 22 22 1
A ScanTestCase.test_billon_laughs() 0 20 1
A ScanTestCase.test_get_vts_no_vt() 0 5 1
A ScanTestCase.test_scan_with_error() 0 40 4
A ScanTestCase.test_get_vts_filter_positive() 22 22 1
A ScanTestCase.test_scan_multi_target_parallel_100() 0 15 1
A ScanTestCase.test_get_vts_vts_with_detection_qodv() 0 15 1
A ScanTestCase.test_get_vts_vts_with_insight() 0 14 1
A DummyWrapper.get_modification_time_vt_as_xml_str() 0 7 1
A ScanTestCase.test_resume_task() 0 65 1
A DummyWrapper.get_params_vt_as_xml_str() 0 4 1
A DummyWrapper.get_impact_vt_as_xml_str() 0 5 1
A DummyWrapper.get_insight_vt_as_xml_str() 0 5 1
A DummyWrapper.__init__() 0 4 1
A ScanTestCase.test_progress() 0 20 1
A ScanTestCase.test_scan_get_target() 0 22 1
A ScanTestCase.test_get_vts_vts_with_mtime() 0 16 1
A ScanTestCase.test_get_default_help() 0 9 1
A DummyWrapper.get_summary_vt_as_xml_str() 0 5 1
A ScanTestCase.test_scan_with_vts() 38 38 1
A ScanTestCase.test_set_get_vts_version_error() 0 3 1
A DummyWrapper.get_severities_vt_as_xml_str() 0 9 1
A DummyWrapper.check() 0 2 1
A DummyWrapper.get_solution_vt_as_xml_str() 0 5 1
A DummyWrapper.get_affected_vt_as_xml_str() 0 5 1
A ScanTestCase.test_get_default_scanner_params() 0 11 1
A DummyWrapper.get_detection_vt_as_xml_str() 0 7 1
A ScanTestCase.test_get_vtss_multiple_vts() 0 9 1
A DummyWrapper.get_refs_vt_as_xml_str() 0 7 1
A ScanTestCase.test_get_vts_vts_with_detection_qodt() 0 15 1
B ScanTestCase.test_get_scan_pop() 0 62 3

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:
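One remedy that often fits test code is to pull the repeated assertions into a shared helper and drive it with `subTest`. The sketch below is only an illustration: `fake_handle_command`, `FilterTestCase` and `assert_single_vt` are hypothetical names, and the canned XML stands in for what `DummyWrapper.handle_command` might return, so the example runs without ospd installed.

```python
import unittest
import xml.etree.ElementTree as ET


def fake_handle_command(command):
    """Hypothetical stand-in for DummyWrapper.handle_command (not ospd code)."""
    return (
        '<get_vts_response status="200"><vts><vt id="1.2.3.4">'
        '<modification_time>19000202</modification_time>'
        '</vt></vts></get_vts_response>'
    )


class FilterTestCase(unittest.TestCase):
    def assert_single_vt(self, command, vt_id, mtime):
        """Shared assertions that the two filter tests would otherwise repeat."""
        response = ET.fromstring(fake_handle_command(command))
        self.assertEqual(response.get('status'), '200')
        vt = response.find('vts/vt')
        self.assertIsNotNone(vt)
        self.assertEqual(vt.get('id'), vt_id)
        self.assertEqual(response.findtext('vts/vt/modification_time'), mtime)

    def test_get_vts_filter(self):
        # One test body covers both filter directions.
        filters = (
            'modification_time&gt;19000201',
            'modification_time&lt;19000203',
        )
        for vt_filter in filters:
            with self.subTest(vt_filter=vt_filter):
                self.assert_single_vt(
                    '<get_vts filter="%s"></get_vts>' % vt_filter,
                    '1.2.3.4',
                    '19000202',
                )
```

With a helper like this, each additional filter case costs one tuple entry instead of a copied 22-line method.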

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like tests.test_scan_and_result often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
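As a rough illustration of Extract Class, the `get_..._vt_as_xml_str` methods of `DummyWrapper` share a prefix and suffix, which suggests they form one cohesive component. The sketch below moves a few of them into a separate class and delegates to it. `VtXmlStubs` and `SlimDummyWrapper` are hypothetical names; the real `DummyWrapper` derives from `OSPDaemon`, which is left out here so the example stays self-contained.

```python
class VtXmlStubs:
    """Hypothetical extracted class: canned XML fragments for a VT."""

    @staticmethod
    def summary():
        return '<summary>Some summary</summary>'

    @staticmethod
    def impact():
        return '<impact>Some impact</impact>'

    @staticmethod
    def creation_time(creation_time):
        return '<creation_time>%s</creation_time>' % creation_time


class SlimDummyWrapper:
    """Stand-in for DummyWrapper (assumption: no OSPDaemon base class here)."""

    def __init__(self, results, checkresult=True, stubs=VtXmlStubs):
        self.results = results
        self.checkresult = checkresult
        self.stubs = stubs  # the extracted component

    def check(self):
        return self.checkresult

    # The wrapper keeps the method names callers expect and only delegates.
    def get_summary_vt_as_xml_str(self, vt_id, summary):
        return self.stubs.summary()

    def get_impact_vt_as_xml_str(self, vt_id, impact):
        return self.stubs.impact()

    def get_creation_time_vt_as_xml_str(self, vt_id, creation_time):
        return self.stubs.creation_time(creation_time)
```

Whether this split pays off for a test double is a judgment call; it mainly helps when several wrappers need the same canned fragments.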

# Copyright (C) 2015-2018 Greenbone Networks GmbH
#
# SPDX-License-Identifier: GPL-2.0-or-later
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.

""" Test module for scan runs
"""

import time
import unittest

import xml.etree.ElementTree as ET
import defusedxml.lxml as secET

from defusedxml.common import EntitiesForbidden

from ospd.ospd import OSPDaemon
from ospd.error import OSPDError


class Result(object):
    def __init__(self, type_, **kwargs):
        self.result_type = type_
        self.host = ''
        self.hostname = ''
        self.name = ''
        self.value = ''
        self.port = ''
        self.test_id = ''
        self.severity = ''
        self.qod = ''
        for name, value in kwargs.items():
            setattr(self, name, value)


class DummyWrapper(OSPDaemon):
    def __init__(self, results, checkresult=True):
        OSPDaemon.__init__(self, 'cert', 'key', 'ca', '10')
        self.checkresult = checkresult
        self.results = results

    def check(self):
        return self.checkresult

    @staticmethod
    def get_custom_vt_as_xml_str(vt_id, custom):
        return '<custom><mytest>static test</mytest></custom>'

    @staticmethod
    def get_params_vt_as_xml_str(vt_id, vt_params):
        return (
            '<params><param id="abc" type="string">'
            '<name>ABC</name><description>Test ABC</description>'
            '<default>yes</default></param>'
            '<param id="def" type="string">'
            '<name>DEF</name><description>Test DEF</description>'
            '<default>no</default></param></params>'
        )

    @staticmethod
    def get_refs_vt_as_xml_str(vt_id, vt_refs):
        response = (
            '<refs><ref type="cve" id="CVE-2010-4480"/>'
            '<ref type="url" id="http://example.com"/></refs>'
        )
        return response

    @staticmethod
    def get_dependencies_vt_as_xml_str(vt_id, vt_dependencies):
        response = (
            '<dependencies>'
            '<dependency vt_id="1.3.6.1.4.1.25623.1.0.50282" />'
            '<dependency vt_id="1.3.6.1.4.1.25623.1.0.50283" />'
            '</dependencies>'
        )

        return response

    @staticmethod
    def get_severities_vt_as_xml_str(vt_id, severities):
        response = (
            '<severities><severity cvss_base="5.0" cvss_'
            'type="cvss_base_v2">AV:N/AC:L/Au:N/C:N/I:N/'
            'A:P</severity></severities>'
        )

        return response

    @staticmethod
    def get_detection_vt_as_xml_str(
        vt_id, detection=None, qod_type=None, qod=None
    ):
        response = '<detection qod_type="package">some detection</detection>'

        return response

    @staticmethod
    def get_summary_vt_as_xml_str(vt_id, summary):
        response = '<summary>Some summary</summary>'

        return response

    @staticmethod
    def get_affected_vt_as_xml_str(vt_id, affected):
        response = '<affected>Some affected</affected>'

        return response

    @staticmethod
    def get_impact_vt_as_xml_str(vt_id, impact):
        response = '<impact>Some impact</impact>'

        return response

    @staticmethod
    def get_insight_vt_as_xml_str(vt_id, insight):
        response = '<insight>Some insight</insight>'

        return response

    @staticmethod
    def get_solution_vt_as_xml_str(vt_id, solution, solution_type=None):
        response = '<solution>Some solution</solution>'

        return response

    @staticmethod
    def get_creation_time_vt_as_xml_str(vt_id, creation_time):  # pylint: disable=arguments-differ
        response = '<creation_time>%s</creation_time>' % creation_time

        return response

    @staticmethod
    def get_modification_time_vt_as_xml_str(vt_id, modification_time):  # pylint: disable=arguments-differ
        response = (
            '<modification_time>%s</modification_time>' % modification_time
        )

        return response

    def exec_scan(self, scan_id, target):
        time.sleep(0.01)
        for res in self.results:
            if res.result_type == 'log':
                self.add_scan_log(
                    scan_id,
                    res.host or target,
                    res.hostname,
                    res.name,
                    res.value,
                    res.port,
                )
            # 'elif' keeps 'log' results from falling through to the
            # ValueError in the final 'else' branch.
            elif res.result_type == 'error':
                self.add_scan_error(
                    scan_id,
                    res.host or target,
                    res.hostname,
                    res.name,
                    res.value,
                    res.port,
                )
            elif res.result_type == 'host-detail':
                self.add_scan_host_detail(
                    scan_id,
                    res.host or target,
                    res.hostname,
                    res.name,
                    res.value,
                )
            elif res.result_type == 'alarm':
                self.add_scan_alarm(
                    scan_id,
                    res.host or target,
                    res.hostname,
                    res.name,
                    res.value,
                    res.port,
                    res.test_id,
                    res.severity,
                    res.qod,
                )
            else:
                raise ValueError(res.result_type)


class ScanTestCase(unittest.TestCase):

    def test_get_default_scanner_params(self):
        daemon = DummyWrapper([])
        response = secET.fromstring(
            daemon.handle_command('<get_scanner_details />')
        )
        # The status of the response must be success (i.e. 200)
        self.assertEqual(response.get('status'), '200')
        # The response root element must have the correct name
        self.assertEqual(response.tag, 'get_scanner_details_response')
        # The response must contain a 'scanner_params' element
        self.assertIsNotNone(response.find('scanner_params'))

    def test_get_default_help(self):
        daemon = DummyWrapper([])
        response = secET.fromstring(daemon.handle_command('<help />'))
        self.assertEqual(response.get('status'), '200')
        response = secET.fromstring(
            daemon.handle_command('<help format="xml" />')
        )
        self.assertEqual(response.get('status'), '200')
        self.assertEqual(response.tag, 'help_response')

    def test_get_default_scanner_version(self):
        daemon = DummyWrapper([])
        response = secET.fromstring(daemon.handle_command('<get_version />'))
        self.assertEqual(response.get('status'), '200')
        self.assertIsNotNone(response.find('protocol'))

    def test_get_vts_no_vt(self):
        daemon = DummyWrapper([])
        response = secET.fromstring(daemon.handle_command('<get_vts />'))
        self.assertEqual(response.get('status'), '200')
        self.assertIsNotNone(response.find('vts'))

    def test_get_vts_single_vt(self):
        daemon = DummyWrapper([])
        daemon.add_vt('1.2.3.4', 'A vulnerability test')
        response = secET.fromstring(daemon.handle_command('<get_vts />'))
        self.assertEqual(response.get('status'), '200')
        vts = response.find('vts')
        self.assertIsNotNone(vts.find('vt'))
        vt = vts.find('vt')
        self.assertEqual(vt.get('id'), '1.2.3.4')

    # Issue (Code Duplication): This code seems to be duplicated in your project.
    def test_get_vts_filter_positive(self):
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            vt_modification_time='19000202',
        )
        response = secET.fromstring(
            daemon.handle_command(
                '<get_vts filter="modification_time&gt;19000201"></get_vts>'
            )
        )
        self.assertEqual(response.get('status'), '200')
        vts = response.find('vts')
        self.assertIsNotNone(vts.find('vt'))
        vt = vts.find('vt')
        self.assertEqual(vt.get('id'), '1.2.3.4')
        modification_time = response.findall('vts/vt/modification_time')
        self.assertEqual(
            '<modification_time>19000202</modification_time>',
            ET.tostring(modification_time[0]).decode('utf-8'),
        )

    # Issue (Code Duplication): This code seems to be duplicated in your project.
    def test_get_vts_filter_negative(self):
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            vt_modification_time='19000202',
        )
        response = secET.fromstring(
            daemon.handle_command(
                '<get_vts filter="modification_time&lt;19000203"></get_vts>'
            )
        )
        self.assertEqual(response.get('status'), '200')
        vts = response.find('vts')
        self.assertIsNotNone(vts.find('vt'))
        vt = vts.find('vt')
        self.assertEqual(vt.get('id'), '1.2.3.4')
        modification_time = response.findall('vts/vt/modification_time')
        self.assertEqual(
            '<modification_time>19000202</modification_time>',
            ET.tostring(modification_time[0]).decode('utf-8'),
        )

    def test_get_vtss_multiple_vts(self):
        daemon = DummyWrapper([])
        daemon.add_vt('1.2.3.4', 'A vulnerability test')
        daemon.add_vt('some id', 'Another vulnerability test')
        daemon.add_vt('123456789', 'Yet another vulnerability test')
        response = secET.fromstring(daemon.handle_command('<get_vts />'))
        self.assertEqual(response.get('status'), '200')
        vts = response.find('vts')
        self.assertIsNotNone(vts.find('vt'))

    def test_get_vts_multiple_vts_with_custom(self):
        daemon = DummyWrapper([])
        daemon.add_vt('1.2.3.4', 'A vulnerability test', custom='b')
        daemon.add_vt(
            '4.3.2.1', 'Another vulnerability test with custom info', custom='b'
        )
        daemon.add_vt('123456789', 'Yet another vulnerability test', custom='b')
        response = secET.fromstring(daemon.handle_command('<get_vts />'))
        custom = response.findall('vts/vt/custom')
        self.assertEqual(3, len(custom))

    # Issue (Code Duplication): This code seems to be duplicated in your project.
    def test_get_vts_vts_with_params(self):
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4', 'A vulnerability test', vt_params="a", custom="b"
        )
        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )
        # The status of the response must be success (i.e. 200)
        self.assertEqual(response.get('status'), '200')
        # The response root element must have the correct name
        self.assertEqual(response.tag, 'get_vts_response')
        # The response must contain a 'vts' element
        self.assertIsNotNone(response.find('vts'))
        vt_params = response[0][0].findall('params')
        self.assertEqual(1, len(vt_params))
        custom = response[0][0].findall('custom')
        self.assertEqual(1, len(custom))
        params = response.findall('vts/vt/params/param')
        self.assertEqual(2, len(params))

    # Issue (Code Duplication): This code seems to be duplicated in your project.
    def test_get_vts_vts_with_refs(self):
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            vt_refs="c",
        )
        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )
        # The status of the response must be success (i.e. 200)
        self.assertEqual(response.get('status'), '200')
        # The response root element must have the correct name
        self.assertEqual(response.tag, 'get_vts_response')
        # The response must contain a 'vts' element
        self.assertIsNotNone(response.find('vts'))
        vt_params = response[0][0].findall('params')
        self.assertEqual(1, len(vt_params))
        custom = response[0][0].findall('custom')
        self.assertEqual(1, len(custom))
        refs = response.findall('vts/vt/refs/ref')
        self.assertEqual(2, len(refs))

    def test_get_vts_vts_with_dependencies(self):
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            vt_dependencies="c",
        )
        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )
        deps = response.findall('vts/vt/dependencies/dependency')
        self.assertEqual(2, len(deps))

    def test_get_vts_vts_with_severities(self):
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            severities="c",
        )
        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )
        severity = response.findall('vts/vt/severities/severity')
        self.assertEqual(1, len(severity))

    def test_get_vts_vts_with_detection_qodt(self):
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            detection="c",
            qod_t="d",
        )
        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )
        detection = response.findall('vts/vt/detection')
        self.assertEqual(1, len(detection))

    def test_get_vts_vts_with_detection_qodv(self):
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            detection="c",
            qod_v="d",
        )
        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )
        detection = response.findall('vts/vt/detection')
        self.assertEqual(1, len(detection))

    def test_get_vts_vts_with_summary(self):
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            summary="c",
        )
        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )
        summary = response.findall('vts/vt/summary')
        self.assertEqual(1, len(summary))

    def test_get_vts_vts_with_impact(self):
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            impact="c",
        )
        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )
        impact = response.findall('vts/vt/impact')
        self.assertEqual(1, len(impact))

    def test_get_vts_vts_with_affected(self):
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            affected="c",
        )
        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )
        affect = response.findall('vts/vt/affected')
        self.assertEqual(1, len(affect))

    def test_get_vts_vts_with_insight(self):
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            insight="c",
        )
        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )
        insight = response.findall('vts/vt/insight')
        self.assertEqual(1, len(insight))

    def test_get_vts_vts_with_solution(self):
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            solution="c",
            solution_t="d",
        )
        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )
        solution = response.findall('vts/vt/solution')
        self.assertEqual(1, len(solution))

    def test_get_vts_vts_with_ctime(self):
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            vt_creation_time='01-01-1900',
        )
        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )
        creation_time = response.findall('vts/vt/creation_time')

        self.assertEqual(
            '<creation_time>01-01-1900</creation_time>',
            ET.tostring(creation_time[0]).decode('utf-8'),
        )

    def test_get_vts_vts_with_mtime(self):
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            vt_modification_time='02-01-1900',
        )
        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )
        modification_time = response.findall('vts/vt/modification_time')

        self.assertEqual(
            '<modification_time>02-01-1900</modification_time>',
            ET.tostring(modification_time[0]).decode('utf-8'),
        )

    def test_scan_with_error(self):
        daemon = DummyWrapper([Result('error', value='something went wrong')])

        response = secET.fromstring(
            daemon.handle_command(
                '<start_scan target="localhost" ports="80, '
                '443"><scanner_params /></start_scan>'
            )
        )
        scan_id = response.findtext('id')

        finished = False
        while not finished:
            response = secET.fromstring(
                daemon.handle_command(
                    '<get_scans scan_id="%s" details="1"/>' % scan_id
                )
            )
            scans = response.findall('scan')
            self.assertEqual(1, len(scans))
            scan = scans[0]
            status = scan.get('status')
            if status == "init" or status == "running":
                self.assertEqual('0', scan.get('end_time'))
                time.sleep(0.010)
            else:
                finished = True
        response = secET.fromstring(
            daemon.handle_command(
                '<get_scans scan_id="%s" details="1"/>' % scan_id
            )
        )
        self.assertEqual(
            response.findtext('scan/results/result'), 'something went wrong'
        )

        response = secET.fromstring(
            daemon.handle_command('<delete_scan scan_id="%s" />' % scan_id)
        )
        self.assertEqual(response.get('status'), '200')

    def test_get_scan_pop(self):
        daemon = DummyWrapper([Result('host-detail', value='Some Host Detail')])

        response = secET.fromstring(
            daemon.handle_command(
                '<start_scan target="localhost" ports="80, 443">'
                '<scanner_params /></start_scan>'
            )
        )
        scan_id = response.findtext('id')
        time.sleep(1)

        response = secET.fromstring(
            daemon.handle_command('<get_scans scan_id="%s"/>' % scan_id)
        )
        self.assertEqual(
            response.findtext('scan/results/result'), 'Some Host Detail'
        )

        response = secET.fromstring(
            daemon.handle_command(
                '<get_scans scan_id="%s" pop_results="1"/>' % scan_id
            )
        )
        self.assertEqual(
            response.findtext('scan/results/result'), 'Some Host Detail'
        )

        response = secET.fromstring(
            daemon.handle_command('<get_scans details="0" pop_results="1"/>')
        )
        self.assertEqual(response.findtext('scan/results/result'), None)

        response = secET.fromstring(
            daemon.handle_command(
                '<get_scans scan_id="%s" pop_results="1"/>' % scan_id
            )
        )

        while True:
            response = secET.fromstring(
                daemon.handle_command(
                    '<get_scans scan_id="%s" details="1"/>' % scan_id
                )
            )
            scans = response.findall('scan')
            self.assertEqual(1, len(scans))
            scan = scans[0]
            if scan.get('status') == "stopped":
                break

        scans = response.findall('scan')
        scan = scans[0]
        self.assertTrue(
            response.findtext('scan/results/result')
            in ['Scan process failure.', 'Scan stopped.']
        )

        response = secET.fromstring(
            daemon.handle_command('<delete_scan scan_id="%s" />' % scan_id)
        )
        self.assertEqual(response.get('status'), '200')

    def test_stop_scan(self):
        daemon = DummyWrapper([])
        response = secET.fromstring(
            daemon.handle_command(
                '<start_scan '
                'target="localhost" ports="80, 443">'
                '<scanner_params /></start_scan>'
            )
        )
        scan_id = response.findtext('id')

        # Depending on the system, this test can end with a race condition
        # because the scanner is already stopped when the <stop_scan>
        # command is run.
        time.sleep(3)
        cmd = secET.fromstring('<stop_scan scan_id="%s" />' % scan_id)
        self.assertRaises(OSPDError, daemon.handle_stop_scan_command, cmd)

        cmd = secET.fromstring('<stop_scan />')
        self.assertRaises(OSPDError, daemon.handle_stop_scan_command, cmd)

    # Issue (Code Duplication): This code seems to be duplicated in your project.
    def test_scan_with_vts(self):
        daemon = DummyWrapper([])
        cmd = secET.fromstring(
            '<start_scan '
            'target="localhost" ports="80, 443">'
            '<scanner_params /><vt_selection />'
            '</start_scan>'
        )
        self.assertRaises(OSPDError, daemon.handle_start_scan_command, cmd)

        # With one vt, without params
        response = secET.fromstring(
            daemon.handle_command(
                '<start_scan '
                'target="localhost" ports="80, 443">'
                '<scanner_params /><vt_selection>'
                '<vt_single id="1.2.3.4" />'
                '</vt_selection></start_scan>'
            )
        )
        scan_id = response.findtext('id')
        time.sleep(0.01)
        self.assertEqual(
            daemon.get_scan_vts(scan_id), {'1.2.3.4': {}, 'vt_groups': []}
        )
        self.assertNotEqual(daemon.get_scan_vts(scan_id), {'1.2.3.6': {}})

        # Without VTs
        response = secET.fromstring(
            daemon.handle_command(
                '<start_scan '
                'target="localhost" ports="80, 443">'
                '<scanner_params /></start_scan>'
            )
        )
        scan_id = response.findtext('id')
        time.sleep(0.01)
        self.assertEqual(daemon.get_scan_vts(scan_id), {})

698 View Code Duplication
    def test_scan_with_vts_and_param(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
699
        daemon = DummyWrapper([])
700
701
        # Raise because no vt_param id attribute
702
        cmd = secET.fromstring(
703
            '<start_scan '
704
            'target="localhost" ports="80, 443">'
705
            '<scanner_params /><vt_selection><vt_si'
706
            'ngle id="1234"><vt_value>200</vt_value>'
707
            '</vt_single></vt_selection></start_scan>'
708
        )
709
        self.assertRaises(OSPDError, daemon.handle_start_scan_command, cmd)
710
711
        # No error
712
        response = secET.fromstring(
713
            daemon.handle_command(
714
                '<start_scan '
715
                'target="localhost" ports="80, 443">'
716
                '<scanner_params /><vt_selection><vt'
717
                '_single id="1234"><vt_value id="ABC">200'
718
                '</vt_value></vt_single></vt_selection>'
719
                '</start_scan>'
720
            )
721
        )
722
        scan_id = response.findtext('id')
723
        time.sleep(0.01)
724
        self.assertEqual(
725
            daemon.get_scan_vts(scan_id),
726
            {'1234': {'ABC': '200'}, 'vt_groups': []},
727
        )
728
729
        # Raise because no vtgroup filter attribute
730
        cmd = secET.fromstring(
731
            '<start_scan '
732
            'target="localhost" ports="80, 443">'
733
            '<scanner_params /><vt_selection><vt_group/>'
734
            '</vt_selection></start_scan>'
735
        )
736
        self.assertRaises(OSPDError, daemon.handle_start_scan_command, cmd)
737
738
        # No error
739
        response = secET.fromstring(
740
            daemon.handle_command(
741
                '<start_scan '
742
                'target="localhost" ports="80, 443">'
743
                '<scanner_params /><vt_selection>'
744
                '<vt_group filter="a"/>'
745
                '</vt_selection></start_scan>'
746
            )
747
        )
748
        scan_id = response.findtext('id')
749
        time.sleep(0.01)
750
        self.assertEqual(daemon.get_scan_vts(scan_id), {'vt_groups': ['a']})

    def test_billon_laughs(self):
        # pylint: disable=line-too-long
        # A "billion laughs" entity-expansion payload must be rejected.
        daemon = DummyWrapper([])
        lol = (
            '<?xml version="1.0"?>'
            '<!DOCTYPE lolz ['
            ' <!ENTITY lol "lol">'
            ' <!ELEMENT lolz (#PCDATA)>'
            ' <!ENTITY lol1 "&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;">'
            ' <!ENTITY lol2 "&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;">'
            ' <!ENTITY lol3 "&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;">'
            ' <!ENTITY lol4 "&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;">'
            ' <!ENTITY lol5 "&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;">'
            ' <!ENTITY lol6 "&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;">'
            ' <!ENTITY lol7 "&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;">'
            ' <!ENTITY lol8 "&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;">'
            ' <!ENTITY lol9 "&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;">'
            ']>'
        )
        self.assertRaises(EntitiesForbidden, daemon.handle_command, lol)

    def test_scan_multi_target(self):
        daemon = DummyWrapper([])
        response = secET.fromstring(
            daemon.handle_command(
                '<start_scan>'
                '<scanner_params /><vts><vt id="1.2.3.4" />'
                '</vts>'
                '<targets><target>'
                '<hosts>localhosts</hosts>'
                '<ports>80,443</ports>'
                '</target>'
                '<target><hosts>192.168.0.0/24</hosts>'
                '<ports>22</ports></target></targets>'
                '</start_scan>'
            )
        )
        self.assertEqual(response.get('status'), '200')

    def test_multi_target_with_credentials(self):
        daemon = DummyWrapper([])
        response = secET.fromstring(
            daemon.handle_command(
                '<start_scan>'
                '<scanner_params /><vts><vt id="1.2.3.4" />'
                '</vts>'
                '<targets><target><hosts>localhosts</hosts>'
                '<ports>80,443</ports></target><target>'
                '<hosts>192.168.0.0/24</hosts><ports>22'
                '</ports><credentials>'
                '<credential type="up" service="ssh" port="22">'
                '<username>scanuser</username>'
                '<password>mypass</password>'
                '</credential><credential type="up" service="smb">'
                '<username>smbuser</username>'
                '<password>mypass</password></credential>'
                '</credentials>'
                '</target></targets>'
                '</start_scan>'
            )
        )

        self.assertEqual(response.get('status'), '200')
        cred_dict = {
            'ssh': {
                'type': 'up',
                'password': 'mypass',
                'port': '22',
                'username': 'scanuser',
            },
            'smb': {'type': 'up', 'password': 'mypass', 'username': 'smbuser'},
        }
        scan_id = response.findtext('id')
        response = daemon.get_scan_credentials(scan_id, "192.168.0.0/24")
        self.assertEqual(response, cred_dict)

    def test_scan_get_target(self):
        daemon = DummyWrapper([])
        response = secET.fromstring(
            daemon.handle_command(
                '<start_scan>'
                '<scanner_params /><vts><vt id="1.2.3.4" />'
                '</vts>'
                '<targets><target>'
                '<hosts>localhosts</hosts>'
                '<ports>80,443</ports>'
                '</target>'
                '<target><hosts>192.168.0.0/24</hosts>'
                '<ports>22</ports></target></targets>'
                '</start_scan>'
            )
        )
        scan_id = response.findtext('id')
        response = secET.fromstring(
            daemon.handle_command('<get_scans scan_id="%s"/>' % scan_id)
        )
        scan_res = response.find('scan')
        self.assertEqual(scan_res.get('target'), 'localhosts,192.168.0.0/24')

    def test_scan_get_exclude_hosts(self):
        daemon = DummyWrapper([])
        response = secET.fromstring(
            daemon.handle_command(
                '<start_scan>'
                '<scanner_params /><vts><vt id="1.2.3.4" />'
                '</vts>'
                '<targets><target>'
                '<hosts>192.168.10.20-25</hosts>'
                '<ports>80,443</ports>'
                '<exclude_hosts>192.168.10.23-24'
                '</exclude_hosts>'
                '</target>'
                '<target><hosts>192.168.0.0/24</hosts>'
                '<ports>22</ports></target>'
                '</targets>'
                '</start_scan>'
            )
        )
        scan_id = response.findtext('id')
        time.sleep(1)
        finished = daemon.get_scan_finished_hosts(scan_id)
        self.assertEqual(finished, ['192.168.10.23', '192.168.10.24'])

    def test_scan_multi_target_parallel_with_error(self):
        daemon = DummyWrapper([])
        cmd = secET.fromstring(
            '<start_scan parallel="100a">'
            '<scanner_params />'
            '<targets><target>'
            '<hosts>localhosts</hosts>'
            '<ports>22</ports>'
            '</target></targets>'
            '</start_scan>'
        )
        time.sleep(1)
        self.assertRaises(OSPDError, daemon.handle_start_scan_command, cmd)

    def test_scan_multi_target_parallel_100(self):
        daemon = DummyWrapper([])
        response = secET.fromstring(
            daemon.handle_command(
                '<start_scan parallel="100">'
                '<scanner_params />'
                '<targets><target>'
                '<hosts>localhosts</hosts>'
                '<ports>22</ports>'
                '</target></targets>'
                '</start_scan>'
            )
        )
        time.sleep(1)
        self.assertEqual(response.get('status'), '200')

    def test_progress(self):
        daemon = DummyWrapper([])
        response = secET.fromstring(
            daemon.handle_command(
                '<start_scan parallel="2">'
                '<scanner_params />'
                '<targets><target>'
                '<hosts>localhost1</hosts>'
                '<ports>22</ports>'
                '</target><target>'
                '<hosts>localhost2</hosts>'
                '<ports>22</ports>'
                '</target></targets>'
                '</start_scan>'
            )
        )
        scan_id = response.findtext('id')
        # Two targets at 75 and 25 are expected to yield an overall progress of 50.
        daemon.set_scan_target_progress(scan_id, 'localhost1', 'localhost1', 75)
        daemon.set_scan_target_progress(scan_id, 'localhost2', 'localhost2', 25)
        self.assertEqual(daemon.calculate_progress(scan_id), 50)

    def test_set_get_vts_version(self):
        daemon = DummyWrapper([])
        daemon.set_vts_version('1234')
        version = daemon.get_vts_version()
        self.assertEqual('1234', version)

    def test_set_get_vts_version_error(self):
        daemon = DummyWrapper([])
        self.assertRaises(TypeError, daemon.set_vts_version)

    def test_resume_task(self):
        daemon = DummyWrapper(
            [
                Result(
                    'host-detail', host='localhost', value='Some Host Detail'
                ),
                Result(
                    'host-detail', host='localhost', value='Some Host Detail2'
                ),
            ]
        )
        response = secET.fromstring(
            daemon.handle_command(
                '<start_scan parallel="2">'
                '<scanner_params />'
                '<targets><target>'
                '<hosts>localhost</hosts>'
                '<ports>22</ports>'
                '</target></targets>'
                '</start_scan>'
            )
        )
        scan_id = response.findtext('id')

        time.sleep(3)
        cmd = secET.fromstring('<stop_scan scan_id="%s" />' % scan_id)
        self.assertRaises(OSPDError, daemon.handle_stop_scan_command, cmd)

        response = secET.fromstring(
            daemon.handle_command(
                '<get_scans scan_id="%s" details="1"/>' % scan_id
            )
        )
        result = response.findall('scan/results/result')
        self.assertEqual(len(result), 4)

        # Resume the task
        cmd = (
            '<start_scan scan_id="%s" target="localhost" ports="80, 443">'
            '<scanner_params /></start_scan>'
            % scan_id
        )
        response = secET.fromstring(daemon.handle_command(cmd))

        # Check the unfinished hosts
        self.assertEqual(response.findtext('id'), scan_id)
        self.assertEqual(
            daemon.get_scan_unfinished_hosts(scan_id), ['localhost']
        )
        # Mark the host as finished and check the unfinished hosts again.
        daemon.set_scan_host_finished(scan_id, "localhost", "localhost")
        self.assertEqual(daemon.get_scan_unfinished_hosts(scan_id), [])
        # Check the finished hosts
        self.assertEqual(
            daemon.scan_collection.get_hosts_finished(scan_id), ['localhost']
        )

        # Check that the results were removed.
        response = secET.fromstring(
            daemon.handle_command(
                '<get_scans scan_id="%s" details="1"/>' % scan_id
            )
        )
        result = response.findall('scan/results/result')
        self.assertEqual(len(result), 2)