Completed
Push — master ( d1b721...16ca13 )
by Juan José
22s queued 12s
created

tests.testScanAndResult   F

Complexity

Total Complexity 66

Size/Duplication

Total Lines 815
Duplicated Lines 17.18 %

Importance

Changes 0
Metric Value
eloc 544
dl 140
loc 815
rs 3.12
c 0
b 0
f 0
wmc 66

55 Methods

Rating   Name   Duplication   Size   Complexity  
A FullTest.testStopScan() 0 17 1
A FullTest.testScanMultiTarget() 0 14 1
A FullTest.testScanWithVTs_and_param() 42 42 1
A FullTest.testScanGetTarget() 0 18 1
A FullTest.testBillonLaughs() 0 17 1
A FullTest.testScanWithVTs() 29 29 1
A FullTest.testMultiTargetWithCredentials() 0 29 1
A FullTest.testGetVTs_VTs_with_ctime() 0 12 1
A DummyWrapper.get_custom_vt_as_xml_str() 0 3 1
A FullTest.testGetVTs_VTs_with_detection_qodt() 0 12 1
B FullTest.testGetScanPop() 0 50 3
A DummyWrapper.get_creation_time_vt_as_xml_str() 0 5 1
A FullTest.testGetDefaultScannerVersion() 0 5 1
A FullTest.testGetVTs_VTs_with_impact() 0 11 1
A FullTest.testGetVTs_VTs_with_params() 16 18 1
A FullTest.testGetVTs_VTs_with_mtime() 0 12 1
A FullTest.testGetVTs_VTs_with_dependencies() 0 11 1
A DummyWrapper.get_dependencies_vt_as_xml_str() 0 8 1
A FullTest.testGetVTs_VTs_with_affected() 0 11 1
A DummyWrapper.get_insight_vt_as_xml_str() 0 5 1
A FullTest.testGetVTs_VTs_with_insight() 0 11 1
A FullTest.testGetVTs_VTs_with_solution() 0 12 1
A FullTest.testGetVTs_filter_negative() 16 16 1
A Result.__init__() 0 11 2
A FullTest.testScanWithError() 0 32 4
A DummyWrapper.get_severities_vt_as_xml_str() 0 7 1
A DummyWrapper.check() 0 2 1
A DummyWrapper.get_solution_vt_as_xml_str() 0 5 1
A FullTest.testGetVTs_multiple_VTs() 0 9 1
B DummyWrapper.exec_scan() 0 18 6
A FullTest.testGetVTs_filter_positive() 16 16 1
A DummyWrapper.get_detection_vt_as_xml_str() 0 6 1
A FullTest.testGetVTs_VTs_with_severities() 0 11 1
A FullTest.testGetVTs_no_VT() 0 5 1
A DummyWrapper.get_params_vt_as_xml_str() 0 3 1
A FullTest.testGetVTs_VTs_with_detection_qodv() 0 12 1
A FullTest.testGetDefaultHelp() 0 8 1
A DummyWrapper.get_impact_vt_as_xml_str() 0 5 1
A FullTest.testGetVTs_VTs_with_refs() 21 21 1
A FullTest.testGetVTs_VTs_with_summary() 0 11 1
A DummyWrapper.get_affected_vt_as_xml_str() 0 5 1
A DummyWrapper.get_summary_vt_as_xml_str() 0 5 1
A DummyWrapper.__init__() 0 4 1
A FullTest.testGetDefaultScannerParams() 0 10 1
A DummyWrapper.get_refs_vt_as_xml_str() 0 5 1
A FullTest.testGetVTs_multiple_VTs_with_custom() 0 10 1
A FullTest.testGetVTs_single_VT() 0 9 1
A DummyWrapper.get_modification_time_vt_as_xml_str() 0 6 1
A FullTest.testProgress() 0 17 1
A FullTest.testScanMultiTargetParallelWithError() 0 11 1
A FullTest.testScanMultiTargetParallel100() 0 12 1
A FullTest.testResumeTask() 0 43 1
A FullTest.testScanGetExcludeHosts() 0 20 1
A FullTest.testSetGetVtsVersion() 0 5 1
A FullTest.testSetGetVtsVersion_Error() 0 3 1

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like tests.testScanAndResult often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Copyright (C) 2015-2018 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: GPL-2.0-or-later
4
#
5
# This program is free software; you can redistribute it and/or
6
# modify it under the terms of the GNU General Public License
7
# as published by the Free Software Foundation; either version 2
8
# of the License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU General Public License for more details.
14
#
15
# You should have received a copy of the GNU General Public License
16
# along with this program; if not, write to the Free Software
17
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
19
""" Test module for scan runs
20
"""
21
22
from __future__ import print_function
23
24
import time
25
import unittest
26
import xml.etree.ElementTree as ET
27
import defusedxml.lxml as secET
28
from defusedxml.common import EntitiesForbidden
29
30
from ospd.ospd import OSPDaemon
31
from ospd.error import OSPDError
32
from ospd.xml import simple_response_str, get_result_xml
33
34
class Result(object):
35
    def __init__(self, type_, **kwargs):
36
        self.result_type = type_
37
        self.host = ''
38
        self.name = ''
39
        self.value = ''
40
        self.port = ''
41
        self.test_id = ''
42
        self.severity = ''
43
        self.qod = ''
44
        for name, value in kwargs.items():
45
            setattr(self, name, value)
46
47
48
class DummyWrapper(OSPDaemon):
49
    def __init__(self, results, checkresult=True):
50
        OSPDaemon.__init__(self, 'cert', 'key', 'ca')
51
        self.checkresult = checkresult
52
        self.results = results
53
54
    def check(self):
55
        return self.checkresults
56
57
    @staticmethod
58
    def get_custom_vt_as_xml_str(vt_id, custom):
59
        return '<custom><mytest>static test</mytest></custom>'
60
61
    @staticmethod
62
    def get_params_vt_as_xml_str(vt_id, vt_params):
63
        return '<params><param id="abc" type="string">' \
64
               '<name>ABC</name><description>Test ABC</description>' \
65
               '<default>yes</default></param>' \
66
               '<param id="def" type="string">' \
67
               '<name>DEF</name><description>Test DEF</description>' \
68
               '<default>no</default></param></params>'
69
70
    @staticmethod
71
    def get_refs_vt_as_xml_str(vt_id, vt_refs):
72
        response = '<refs><ref type="cve" id="CVE-2010-4480"/>' \
73
                    '<ref type="url" id="http://example.com"/></refs>'
74
        return response
75
76
    @staticmethod
77
    def get_dependencies_vt_as_xml_str(vt_id, vt_dependencies):
78
        response = '<dependencies>' \
79
                   '<dependency vt_id="1.3.6.1.4.1.25623.1.0.50282" />' \
80
                   '<dependency vt_id="1.3.6.1.4.1.25623.1.0.50283" />' \
81
                   '</dependencies>'
82
83
        return response
84
85
    @staticmethod
86
    def get_severities_vt_as_xml_str(vt_id, severities):
87
        response = '<severities><severity cvss_base="5.0" cvss_' \
88
                   'type="cvss_base_v2">AV:N/AC:L/Au:N/C:N/I:N/' \
89
                   'A:P</severity></severities>'
90
91
        return response
92
93
    @staticmethod
94
    def get_detection_vt_as_xml_str(vt_id, detection=None,
95
                                    qod_type=None, qod=None):
96
        response = '<detection qod_type="package">some detection</detection>'
97
98
        return response
99
100
    @staticmethod
101
    def get_summary_vt_as_xml_str(vt_id, summary):
102
        response = '<summary>Some summary</summary>'
103
104
        return response
105
106
    @staticmethod
107
    def get_affected_vt_as_xml_str(vt_id, affected):
108
        response = '<affected>Some affected</affected>'
109
110
        return response
111
112
    @staticmethod
113
    def get_impact_vt_as_xml_str(vt_id, impact):
114
        response = '<impact>Some impact</impact>'
115
116
        return response
117
118
    @staticmethod
119
    def get_insight_vt_as_xml_str(vt_id, insight):
120
        response = '<insight>Some insight</insight>'
121
122
        return response
123
124
    @staticmethod
125
    def get_solution_vt_as_xml_str(vt_id, solution, solution_type=None):
126
        response = '<solution>Some solution</solution>'
127
128
        return response
129
130
    @staticmethod
131
    def get_creation_time_vt_as_xml_str(vt_id, creation_time):
132
        response = '<creation_time>%s</creation_time>' % creation_time
133
134
        return response
135
136
    @staticmethod
137
    def get_modification_time_vt_as_xml_str(vt_id, modification_time):
138
        response = (
139
            '<modification_time>%s</modification_time>' % modification_time)
140
141
        return response
142
143
    def exec_scan(self, scan_id, target):
144
        time.sleep(0.01)
145
        for res in self.results:
146
            if res.result_type == 'log':
147
                self.add_scan_log(
148
                    scan_id, res.host or target, res.name, res.value, res.port)
149
            if res.result_type == 'error':
150
                self.add_scan_error(
151
                    scan_id, res.host or target, res.name, res.value, res.port)
152
            elif res.result_type == 'host-detail':
153
                self.add_scan_host_detail(
154
                    scan_id, res.host or target, res.name, res.value)
155
            elif res.result_type == 'alarm':
156
                self.add_scan_alarm(
157
                    scan_id, res.host or target, res.name, res.value,
158
                    res.port, res.test_id, res.severity, res.qod)
159
            else:
160
                raise ValueError(res.result_type)
161
162
163
class FullTest(unittest.TestCase):
164
    # TODO: There should be a lot more assert in there !
165
166
    def testGetDefaultScannerParams(self):
167
        daemon = DummyWrapper([])
168
        response = secET.fromstring(
169
            daemon.handle_command('<get_scanner_details />'))
170
        # The status of the response must be success (i.e. 200)
171
        self.assertEqual(response.get('status'), '200')
172
        # The response root element must have the correct name
173
        self.assertEqual(response.tag, 'get_scanner_details_response')
174
        # The response must contain a 'scanner_params' element
175
        self.assertIsNotNone(response.find('scanner_params'))
176
177
    def testGetDefaultHelp(self):
178
        daemon = DummyWrapper([])
179
        response = secET.fromstring(daemon.handle_command('<help />'))
180
        self.assertEqual(response.get('status'), '200')
181
        response = secET.fromstring(
182
            daemon.handle_command('<help format="xml" />'))
183
        self.assertEqual(response.get('status'), '200')
184
        self.assertEqual(response.tag, 'help_response')
185
186
    def testGetDefaultScannerVersion(self):
187
        daemon = DummyWrapper([])
188
        response = secET.fromstring(daemon.handle_command('<get_version />'))
189
        self.assertEqual(response.get('status'), '200')
190
        self.assertIsNotNone(response.find('protocol'))
191
192
    def testGetVTs_no_VT(self):
193
        daemon = DummyWrapper([])
194
        response = secET.fromstring(daemon.handle_command('<get_vts />'))
195
        self.assertEqual(response.get('status'), '200')
196
        self.assertIsNotNone(response.find('vts'))
197
198
    def testGetVTs_single_VT(self):
199
        daemon = DummyWrapper([])
200
        daemon.add_vt('1.2.3.4', 'A vulnerability test')
201
        response = secET.fromstring(daemon.handle_command('<get_vts />'))
202
        self.assertEqual(response.get('status'), '200')
203
        vts = response.find('vts')
204
        self.assertIsNotNone(vts.find('vt'))
205
        vt = vts.find('vt')
206
        self.assertEqual(vt.get('id'), '1.2.3.4')
207
208 View Code Duplication
    def testGetVTs_filter_positive(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
209
        daemon = DummyWrapper([])
210
        daemon.add_vt('1.2.3.4',
211
                      'A vulnerability test',
212
                      vt_params="a",
213
                      vt_modification_time='19000202')
214
        response = secET.fromstring(
215
            daemon.handle_command('<get_vts filter="modification_time&gt;19000201"></get_vts>'))
216
        self.assertEqual(response.get('status'), '200')
217
        vts = response.find('vts')
218
        self.assertIsNotNone(vts.find('vt'))
219
        vt = vts.find('vt')
220
        self.assertEqual(vt.get('id'), '1.2.3.4')
221
        modification_time = response.findall('vts/vt/modification_time')
222
        self.assertEqual('<modification_time>19000202</modification_time>',
223
                         ET.tostring(modification_time[0]).decode('utf-8'))
224
225 View Code Duplication
    def testGetVTs_filter_negative(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
226
        daemon = DummyWrapper([])
227
        daemon.add_vt('1.2.3.4',
228
                      'A vulnerability test',
229
                      vt_params="a",
230
                      vt_modification_time='19000202')
231
        response = secET.fromstring(
232
            daemon.handle_command('<get_vts filter="modification_time&lt;19000203"></get_vts>'))
233
        self.assertEqual(response.get('status'), '200')
234
        vts = response.find('vts')
235
        self.assertIsNotNone(vts.find('vt'))
236
        vt = vts.find('vt')
237
        self.assertEqual(vt.get('id'), '1.2.3.4')
238
        modification_time = response.findall('vts/vt/modification_time')
239
        self.assertEqual('<modification_time>19000202</modification_time>',
240
                         ET.tostring(modification_time[0]).decode('utf-8'))
241
242
    def testGetVTs_multiple_VTs(self):
243
        daemon = DummyWrapper([])
244
        daemon.add_vt('1.2.3.4', 'A vulnerability test')
245
        daemon.add_vt('some id', 'Another vulnerability test')
246
        daemon.add_vt('123456789', 'Yet another vulnerability test')
247
        response = secET.fromstring(daemon.handle_command('<get_vts />'))
248
        self.assertEqual(response.get('status'), '200')
249
        vts = response.find('vts')
250
        self.assertIsNotNone(vts.find('vt'))
251
252
    def testGetVTs_multiple_VTs_with_custom(self):
253
        daemon = DummyWrapper([])
254
        daemon.add_vt('1.2.3.4', 'A vulnerability test', custom='b')
255
        daemon.add_vt('4.3.2.1', 'Another vulnerability test with custom info',
256
                      custom='b')
257
        daemon.add_vt('123456789', 'Yet another vulnerability test',
258
                      custom='b')
259
        response = secET.fromstring(daemon.handle_command('<get_vts />'))
260
        custom = response.findall('vts/vt/custom')
261
        self.assertEqual(3, len(custom))
262
263 View Code Duplication
    def testGetVTs_VTs_with_params(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
264
        daemon = DummyWrapper([])
265
        daemon.add_vt('1.2.3.4', 'A vulnerability test',
266
                      vt_params="a", custom="b")
267
        response = secET.fromstring(
268
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
269
        # The status of the response must be success (i.e. 200)
270
        self.assertEqual(response.get('status'), '200')
271
        # The response root element must have the correct name
272
        self.assertEqual(response.tag, 'get_vts_response')
273
        # The response must contain a 'scanner_params' element
274
        self.assertIsNotNone(response.find('vts'))
275
        vt_params = response[0][0].findall('params')
276
        self.assertEqual(1, len(vt_params))
277
        custom = response[0][0].findall('custom')
278
        self.assertEqual(1, len(custom))
279
        params = response.findall('vts/vt/params/param')
280
        self.assertEqual(2, len(params))
281
282 View Code Duplication
    def testGetVTs_VTs_with_refs(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
283
        daemon = DummyWrapper([])
284
        daemon.add_vt('1.2.3.4',
285
                      'A vulnerability test',
286
                      vt_params="a",
287
                      custom="b",
288
                      vt_refs="c")
289
        response = secET.fromstring(
290
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
291
        # The status of the response must be success (i.e. 200)
292
        self.assertEqual(response.get('status'), '200')
293
        # The response root element must have the correct name
294
        self.assertEqual(response.tag, 'get_vts_response')
295
        # The response must contain a 'vts' element
296
        self.assertIsNotNone(response.find('vts'))
297
        vt_params = response[0][0].findall('params')
298
        self.assertEqual(1, len(vt_params))
299
        custom = response[0][0].findall('custom')
300
        self.assertEqual(1, len(custom))
301
        refs = response.findall('vts/vt/refs/ref')
302
        self.assertEqual(2, len(refs))
303
304
    def testGetVTs_VTs_with_dependencies(self):
305
        daemon = DummyWrapper([])
306
        daemon.add_vt('1.2.3.4',
307
                      'A vulnerability test',
308
                      vt_params="a",
309
                      custom="b",
310
                      vt_dependencies="c",)
311
        response = secET.fromstring(
312
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
313
        deps = response.findall('vts/vt/dependencies/dependency')
314
        self.assertEqual(2, len(deps))
315
316
    def testGetVTs_VTs_with_severities(self):
317
        daemon = DummyWrapper([])
318
        daemon.add_vt('1.2.3.4',
319
                      'A vulnerability test',
320
                      vt_params="a",
321
                      custom="b",
322
                      severities="c",)
323
        response = secET.fromstring(
324
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
325
        severity = response.findall('vts/vt/severities/severity')
326
        self.assertEqual(1, len(severity))
327
328
    def testGetVTs_VTs_with_detection_qodt(self):
329
        daemon = DummyWrapper([])
330
        daemon.add_vt('1.2.3.4',
331
                      'A vulnerability test',
332
                      vt_params="a",
333
                      custom="b",
334
                      detection="c",
335
                      qod_t="d")
336
        response = secET.fromstring(
337
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
338
        detection = response.findall('vts/vt/detection')
339
        self.assertEqual(1, len(detection))
340
341
    def testGetVTs_VTs_with_detection_qodv(self):
342
        daemon = DummyWrapper([])
343
        daemon.add_vt('1.2.3.4',
344
                      'A vulnerability test',
345
                      vt_params="a",
346
                      custom="b",
347
                      detection="c",
348
                      qod_v="d")
349
        response = secET.fromstring(
350
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
351
        detection = response.findall('vts/vt/detection')
352
        self.assertEqual(1, len(detection))
353
354
    def testGetVTs_VTs_with_summary(self):
355
        daemon = DummyWrapper([])
356
        daemon.add_vt('1.2.3.4',
357
                      'A vulnerability test',
358
                      vt_params="a",
359
                      custom="b",
360
                      summary="c",)
361
        response = secET.fromstring(
362
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
363
        summary = response.findall('vts/vt/summary')
364
        self.assertEqual(1, len(summary))
365
366
    def testGetVTs_VTs_with_impact(self):
367
        daemon = DummyWrapper([])
368
        daemon.add_vt('1.2.3.4',
369
                      'A vulnerability test',
370
                      vt_params="a",
371
                      custom="b",
372
                      impact="c",)
373
        response = secET.fromstring(
374
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
375
        impact = response.findall('vts/vt/impact')
376
        self.assertEqual(1, len(impact))
377
378
    def testGetVTs_VTs_with_affected(self):
379
        daemon = DummyWrapper([])
380
        daemon.add_vt('1.2.3.4',
381
                      'A vulnerability test',
382
                      vt_params="a",
383
                      custom="b",
384
                      affected="c",)
385
        response = secET.fromstring(
386
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
387
        affect = response.findall('vts/vt/affected')
388
        self.assertEqual(1, len(affect))
389
390
    def testGetVTs_VTs_with_insight(self):
391
        daemon = DummyWrapper([])
392
        daemon.add_vt('1.2.3.4',
393
                      'A vulnerability test',
394
                      vt_params="a",
395
                      custom="b",
396
                      insight="c",)
397
        response = secET.fromstring(
398
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
399
        insight = response.findall('vts/vt/insight')
400
        self.assertEqual(1, len(insight))
401
402
    def testGetVTs_VTs_with_solution(self):
403
        daemon = DummyWrapper([])
404
        daemon.add_vt('1.2.3.4',
405
                      'A vulnerability test',
406
                      vt_params="a",
407
                      custom="b",
408
                      solution="c",
409
                      solution_t="d")
410
        response = secET.fromstring(
411
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
412
        solution = response.findall('vts/vt/solution')
413
        self.assertEqual(1, len(solution))
414
415
    def testGetVTs_VTs_with_ctime(self):
416
        daemon = DummyWrapper([])
417
        daemon.add_vt('1.2.3.4',
418
                      'A vulnerability test',
419
                      vt_params="a",
420
                      vt_creation_time='01-01-1900')
421
        response = secET.fromstring(
422
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
423
        creation_time = response.findall('vts/vt/creation_time')
424
425
        self.assertEqual('<creation_time>01-01-1900</creation_time>',
426
                         ET.tostring(creation_time[0]).decode('utf-8'))
427
428
    def testGetVTs_VTs_with_mtime(self):
429
        daemon = DummyWrapper([])
430
        daemon.add_vt('1.2.3.4',
431
                      'A vulnerability test',
432
                      vt_params="a",
433
                      vt_modification_time='02-01-1900')
434
        response = secET.fromstring(
435
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
436
        modification_time = response.findall('vts/vt/modification_time')
437
438
        self.assertEqual('<modification_time>02-01-1900</modification_time>',
439
                         ET.tostring(modification_time[0]).decode('utf-8'))
440
441
    def testScanWithError(self):
442
        daemon = DummyWrapper([
443
            Result('error', value='something went wrong'),
444
        ])
445
446
        response = secET.fromstring(
447
            daemon.handle_command('<start_scan target="localhost" ports="80, '
448
                                  '443"><scanner_params /></start_scan>'))
449
        scan_id = response.findtext('id')
450
451
        finished = False
452
        while not finished:
453
            response = secET.fromstring(
454
                daemon.handle_command(
455
                    '<get_scans scan_id="%s" details="1"/>' % scan_id))
456
            scans = response.findall('scan')
457
            self.assertEqual(1, len(scans))
458
            scan = scans[0]
459
            status = scan.get('status')
460
            if status == "init" or status == "running":
461
                self.assertEqual('0', scan.get('end_time'))
462
                time.sleep(.010)
463
            else:
464
                finished = True
465
        response = secET.fromstring(daemon.handle_command(
466
            '<get_scans scan_id="%s" details="1"/>' % scan_id))
467
        self.assertEqual(response.findtext('scan/results/result'),
468
                         'something went wrong')
469
470
        response = secET.fromstring(
471
            daemon.handle_command('<delete_scan scan_id="%s" />' % scan_id))
472
        self.assertEqual(response.get('status'), '200')
473
474
    def testGetScanPop(self):
475
        daemon = DummyWrapper([
476
            Result('host-detail', value='Some Host Detail'),
477
        ])
478
479
        response = secET.fromstring(daemon.handle_command(
480
            '<start_scan target="localhost" ports="80, 443">'
481
            '<scanner_params /></start_scan>'))
482
        scan_id = response.findtext('id')
483
        time.sleep(1)
484
485
        response = secET.fromstring(
486
            daemon.handle_command('<get_scans scan_id="%s"/>' % scan_id))
487
        self.assertEqual(response.findtext('scan/results/result'),
488
                         'Some Host Detail')
489
490
        response = secET.fromstring(
491
            daemon.handle_command(
492
                '<get_scans scan_id="%s" pop_results="1"/>' % scan_id))
493
        self.assertEqual(response.findtext('scan/results/result'),
494
                         'Some Host Detail')
495
496
        response = secET.fromstring(
497
            daemon.handle_command(
498
                '<get_scans details="0" pop_results="1"/>'))
499
        self.assertEqual(response.findtext('scan/results/result'),
500
                         None)
501
502
        response = secET.fromstring(
503
            daemon.handle_command(
504
                '<get_scans scan_id="%s" pop_results="1"/>' % scan_id))
505
506
        while True:
507
            response = secET.fromstring(
508
                daemon.handle_command(
509
                    '<get_scans scan_id="%s" details="1"/>' % scan_id))
510
            scans = response.findall('scan')
511
            self.assertEqual(1, len(scans))
512
            scan = scans[0]
513
            if scan.get('status') == "stopped":
514
                break
515
516
        scans = response.findall('scan')
517
        scan = scans[0]
518
        self.assertTrue(response.findtext('scan/results/result') in
519
                            ['Scan process failure.', 'Scan stopped.'])
520
521
        response = secET.fromstring(
522
            daemon.handle_command('<delete_scan scan_id="%s" />' % scan_id))
523
        self.assertEqual(response.get('status'), '200')
524
525
    def testStopScan(self):
526
        daemon = DummyWrapper([])
527
        response = secET.fromstring(
528
            daemon.handle_command('<start_scan '
529
                                  'target="localhost" ports="80, 443">'
530
                                  '<scanner_params /></start_scan>'))
531
        scan_id = response.findtext('id')
532
533
        # Depending on the sistem this test can end with a race condition
534
        # because the scanner is already stopped when the <stop_scan>
535
        # command is run.
536
        time.sleep(3)
537
        cmd = secET.fromstring('<stop_scan scan_id="%s" />' % scan_id)
538
        self.assertRaises(OSPDError, daemon.handle_stop_scan_command, cmd)
539
540
        cmd = secET.fromstring('<stop_scan />')
541
        self.assertRaises(OSPDError, daemon.handle_stop_scan_command, cmd)
542
543 View Code Duplication
    def testScanWithVTs(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
544
        daemon = DummyWrapper([])
545
        cmd = secET.fromstring('<start_scan '
546
                               'target="localhost" ports="80, 443">'
547
                               '<scanner_params /><vt_selection />'
548
                               '</start_scan>')
549
        self.assertRaises(OSPDError, daemon.handle_start_scan_command, cmd)
550
551
        # With one VT, without params
552
        response = secET.fromstring(
553
            daemon.handle_command('<start_scan '
554
                                  'target="localhost" ports="80, 443">'
555
                                  '<scanner_params /><vt_selection>'
556
                                  '<vt_single id="1.2.3.4" />'
557
                                  '</vt_selection></start_scan>'))
558
        scan_id = response.findtext('id')
559
        time.sleep(0.01)
560
        self.assertEqual(
561
            daemon.get_scan_vts(scan_id), {'1.2.3.4': {}, 'vt_groups': []})
562
        self.assertNotEqual(daemon.get_scan_vts(scan_id), {'1.2.3.6': {}})
563
564
        # With out VTS
565
        response = secET.fromstring(
566
            daemon.handle_command('<start_scan '
567
                                  'target="localhost" ports="80, 443">'
568
                                  '<scanner_params /></start_scan>'))
569
        scan_id = response.findtext('id')
570
        time.sleep(0.01)
571
        self.assertEqual(daemon.get_scan_vts(scan_id), {})
572
573 View Code Duplication
    def testScanWithVTs_and_param(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
574
        daemon = DummyWrapper([])
575
576
        # Raise because no vt_param id attribute
577
        cmd = secET.fromstring('<start_scan '
578
                               'target="localhost" ports="80, 443">'
579
                               '<scanner_params /><vt_selection><vt_si'
580
                               'ngle id="1234"><vt_value>200</vt_value>'
581
                               '</vt_single></vt_selection></start_scan>')
582
        self.assertRaises(OSPDError, daemon.handle_start_scan_command, cmd)
583
584
        # No error
585
        response = secET.fromstring(
586
            daemon.handle_command('<start_scan '
587
                                  'target="localhost" ports="80, 443">'
588
                                  '<scanner_params /><vt_selection><vt'
589
                                  '_single id="1234"><vt_value id="ABC">200'
590
                                  '</vt_value></vt_single></vt_selection>'
591
                                  '</start_scan>'))
592
        scan_id = response.findtext('id')
593
        time.sleep(0.01)
594
        self.assertEqual(daemon.get_scan_vts(scan_id),
595
                         {'1234': {'ABC': '200'}, 'vt_groups': []})
596
597
        # Raise because no vtgroup filter attribute
598
        cmd = secET.fromstring('<start_scan '
599
                               'target="localhost" ports="80, 443">'
600
                               '<scanner_params /><vt_selection><vt_group/>'
601
                               '</vt_selection></start_scan>')
602
        self.assertRaises(OSPDError, daemon.handle_start_scan_command, cmd)
603
604
        # No error
605
        response = secET.fromstring(
606
            daemon.handle_command('<start_scan '
607
                                  'target="localhost" ports="80, 443">'
608
                                  '<scanner_params /><vt_selection>'
609
                                  '<vt_group filter="a"/>'
610
                                  '</vt_selection></start_scan>'))
611
        scan_id = response.findtext('id')
612
        time.sleep(0.01)
613
        self.assertEqual(daemon.get_scan_vts(scan_id),
614
                         {'vt_groups': ['a']})
615
616
    def testBillonLaughs(self):
617
        daemon = DummyWrapper([])
618
        lol = '<?xml version="1.0"?>' \
619
              '<!DOCTYPE lolz [' \
620
              ' <!ENTITY lol "lol">' \
621
              ' <!ELEMENT lolz (#PCDATA)>' \
622
              ' <!ENTITY lol1 "&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;">' \
623
              ' <!ENTITY lol2 "&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;">' \
624
              ' <!ENTITY lol3 "&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;">' \
625
              ' <!ENTITY lol4 "&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;">' \
626
              ' <!ENTITY lol5 "&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;">' \
627
              ' <!ENTITY lol6 "&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;">' \
628
              ' <!ENTITY lol7 "&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;">' \
629
              ' <!ENTITY lol8 "&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;">' \
630
              ' <!ENTITY lol9 "&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;">' \
631
              ']>'
632
        self.assertRaises(EntitiesForbidden, daemon.handle_command, lol)
633
634
    def testScanMultiTarget(self):
635
        daemon = DummyWrapper([])
636
        response = secET.fromstring(
637
            daemon.handle_command('<start_scan>'
638
                                  '<scanner_params /><vts><vt id="1.2.3.4" />'
639
                                  '</vts>'
640
                                  '<targets><target>'
641
                                  '<hosts>localhosts</hosts>'
642
                                  '<ports>80,443</ports>'
643
                                  '</target>'
644
                                  '<target><hosts>192.168.0.0/24</hosts>'
645
                                  '<ports>22</ports></target></targets>'
646
                                  '</start_scan>'))
647
        self.assertEqual(response.get('status'), '200')
648
649
    def testMultiTargetWithCredentials(self):
650
        daemon = DummyWrapper([])
651
        response = secET.fromstring(
652
            daemon.handle_command(
653
                '<start_scan>'
654
                '<scanner_params /><vts><vt id="1.2.3.4" />'
655
                '</vts>'
656
                '<targets><target><hosts>localhosts</hosts>'
657
                '<ports>80,443</ports></target><target>'
658
                '<hosts>192.168.0.0/24</hosts><ports>22'
659
                '</ports><credentials>'
660
                '<credential type="up" service="ssh" port="22">'
661
                '<username>scanuser</username>'
662
                '<password>mypass</password>'
663
                '</credential><credential type="up" service="smb">'
664
                '<username>smbuser</username>'
665
                '<password>mypass</password></credential>'
666
                '</credentials>'
667
                '</target></targets>'
668
                '</start_scan>'))
669
670
        self.assertEqual(response.get('status'), '200')
671
        cred_dict = {'ssh': {'type': 'up', 'password':
672
                             'mypass', 'port': '22', 'username': 'scanuser'},
673
                     'smb': {'type': 'up', 'password': 'mypass',
674
                             'username': 'smbuser'}}
675
        scan_id = response.findtext('id')
676
        response = daemon.get_scan_credentials(scan_id, "192.168.0.0/24")
677
        self.assertEqual(response, cred_dict)
678
679
    def testScanGetTarget(self):
680
        daemon = DummyWrapper([])
681
        response = secET.fromstring(
682
            daemon.handle_command('<start_scan>'
683
                                  '<scanner_params /><vts><vt id="1.2.3.4" />'
684
                                  '</vts>'
685
                                  '<targets><target>'
686
                                  '<hosts>localhosts</hosts>'
687
                                  '<ports>80,443</ports>'
688
                                  '</target>'
689
                                  '<target><hosts>192.168.0.0/24</hosts>'
690
                                  '<ports>22</ports></target></targets>'
691
                                  '</start_scan>'))
692
        scan_id = response.findtext('id')
693
        response = secET.fromstring(
694
            daemon.handle_command('<get_scans scan_id="%s"/>' % scan_id))
695
        scan_res = response.find('scan')
696
        self.assertEqual(scan_res.get('target'), 'localhosts,192.168.0.0/24')
697
698
    def testScanGetExcludeHosts(self):
699
        daemon = DummyWrapper([])
700
        response = secET.fromstring(
701
            daemon.handle_command('<start_scan>'
702
                                  '<scanner_params /><vts><vt id="1.2.3.4" />'
703
                                  '</vts>'
704
                                  '<targets><target>'
705
                                  '<hosts>192.168.10.20-25</hosts>'
706
                                  '<ports>80,443</ports>'
707
                                  '<exclude_hosts>192.168.10.23-24'
708
                                  '</exclude_hosts>'
709
                                  '</target>'
710
                                  '<target><hosts>192.168.0.0/24</hosts>'
711
                                  '<ports>22</ports></target>'
712
                                  '</targets>'
713
                                  '</start_scan>'))
714
        scan_id = response.findtext('id')
715
        time.sleep(1)
716
        finished = daemon.get_scan_finished_hosts(scan_id)
717
        self.assertEqual(finished, ['192.168.10.23', '192.168.10.24'])
718
719
    def testScanMultiTargetParallelWithError(self):
720
        daemon = DummyWrapper([])
721
        cmd = secET.fromstring('<start_scan parallel="100a">'
722
                               '<scanner_params />'
723
                               '<targets><target>'
724
                               '<hosts>localhosts</hosts>'
725
                               '<ports>22</ports>'
726
                               '</target></targets>'
727
                               '</start_scan>')
728
        time.sleep(1)
729
        self.assertRaises(OSPDError, daemon.handle_start_scan_command, cmd)
730
731
    def testScanMultiTargetParallel100(self):
732
        daemon = DummyWrapper([])
733
        response = secET.fromstring(
734
            daemon.handle_command('<start_scan parallel="100">'
735
                                  '<scanner_params />'
736
                                  '<targets><target>'
737
                                  '<hosts>localhosts</hosts>'
738
                                  '<ports>22</ports>'
739
                                  '</target></targets>'
740
                                  '</start_scan>'))
741
        time.sleep(1)
742
        self.assertEqual(response.get('status'), '200')
743
744
    def testProgress(self):
745
        daemon = DummyWrapper([])
746
        response = secET.fromstring(
747
            daemon.handle_command('<start_scan parallel="2">'
748
                                  '<scanner_params />'
749
                                  '<targets><target>'
750
                                  '<hosts>localhost1</hosts>'
751
                                  '<ports>22</ports>'
752
                                  '</target><target>'
753
                                  '<hosts>localhost2</hosts>'
754
                                  '<ports>22</ports>'
755
                                  '</target></targets>'
756
                                  '</start_scan>'))
757
        scan_id = response.findtext('id')
758
        daemon.set_scan_target_progress(scan_id, 'localhost1', 'localhost1', 75)
759
        daemon.set_scan_target_progress(scan_id, 'localhost2', 'localhost2', 25)
760
        self.assertEqual(daemon.calculate_progress(scan_id), 50)
761
762
    def testSetGetVtsVersion(self):
763
        daemon = DummyWrapper([])
764
        daemon.set_vts_version('1234')
765
        version = daemon.get_vts_version()
766
        self.assertEqual('1234', version)
767
768
    def testSetGetVtsVersion_Error(self):
769
        daemon = DummyWrapper([])
770
        self.assertRaises(TypeError, daemon.set_vts_version)
771
772
    def testResumeTask(self):
773
        daemon = DummyWrapper([
774
            Result('host-detail', host='localhost', value='Some Host Detail'),
775
            Result('host-detail', host='localhost', value='Some Host Detail2'),
776
        ])
777
        response = secET.fromstring(
778
            daemon.handle_command('<start_scan parallel="2">'
779
                                  '<scanner_params />'
780
                                  '<targets><target>'
781
                                  '<hosts>localhost</hosts>'
782
                                  '<ports>22</ports>'
783
                                  '</target></targets>'
784
                                  '</start_scan>'))
785
        scan_id = response.findtext('id')
786
787
        time.sleep(3)
788
        cmd = secET.fromstring('<stop_scan scan_id="%s" />' % scan_id)
789
        self.assertRaises(OSPDError, daemon.handle_stop_scan_command, cmd)
790
791
        response = secET.fromstring(daemon.handle_command(
792
            '<get_scans scan_id="%s" details="1"/>' % scan_id))
793
        result = response.findall('scan/results/result')
794
        self.assertEqual(len(result), 4)
795
796
        # Resume the task
797
        cmd = '<start_scan scan_id="%s" target="localhost" ports="80, 443"><scanner_params /></start_scan>' % scan_id
798
        response = secET.fromstring(
799
            daemon.handle_command(cmd))
800
801
        # Check unfinished host
802
        self.assertEqual(response.findtext('id'), scan_id)
803
        self.assertEqual(daemon.get_scan_unfinished_hosts(scan_id), ['localhost'])
804
        # Finished the host and check unfinished again.
805
        daemon.set_scan_host_finished(scan_id, "localhost", "localhost")
806
        self.assertEqual(daemon.get_scan_unfinished_hosts(scan_id), [])
807
        # Check finished hosts
808
        self.assertEqual(daemon.scan_collection.get_hosts_finished(scan_id), ['localhost'])
809
810
        # Check if the result was removed.
811
        response = secET.fromstring(daemon.handle_command(
812
            '<get_scans scan_id="%s" details="1"/>' % scan_id))
813
        result = response.findall('scan/results/result')
814
        self.assertEqual(len(result), 2)
815