Passed
Pull Request — master (#91)
by Juan José
02:45
created

DummyWrapper.get_impact_vt_as_xml_str()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 2
dl 0
loc 5
rs 10
c 0
b 0
f 0
1
# Copyright (C) 2015-2018 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: GPL-2.0-or-later
4
#
5
# This program is free software; you can redistribute it and/or
6
# modify it under the terms of the GNU General Public License
7
# as published by the Free Software Foundation; either version 2
8
# of the License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU General Public License for more details.
14
#
15
# You should have received a copy of the GNU General Public License
16
# along with this program; if not, write to the Free Software
17
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
19
""" Test module for scan runs
20
"""
21
22
from __future__ import print_function
23
24
import time
25
import unittest
26
import xml.etree.ElementTree as ET
27
import defusedxml.lxml as secET
28
from defusedxml.common import EntitiesForbidden
29
30
from ospd.ospd import OSPDaemon, OSPDError
31
32
33
class Result(object):
34
    def __init__(self, type_, **kwargs):
35
        self.result_type = type_
36
        self.host = ''
37
        self.name = ''
38
        self.value = ''
39
        self.port = ''
40
        self.test_id = ''
41
        self.severity = ''
42
        self.qod = ''
43
        for name, value in kwargs.items():
44
            setattr(self, name, value)
45
46
47
class DummyWrapper(OSPDaemon):
48
    def __init__(self, results, checkresult=True):
49
        OSPDaemon.__init__(self, 'cert', 'key', 'ca')
50
        self.checkresult = checkresult
51
        self.results = results
52
53
    def check(self):
54
        return self.checkresults
55
56
    @staticmethod
57
    def get_custom_vt_as_xml_str(vt_id, custom):
58
        return '<custom><mytest>static test</mytest></custom>'
59
60
    @staticmethod
61
    def get_params_vt_as_xml_str(vt_id, vt_params):
62
        return '<vt_params><vt_param id="abc" type="string">' \
63
               '<name>ABC</name><description>Test ABC</description>' \
64
               '<default>yes</default></vt_param>' \
65
               '<vt_param id="def" type="string">' \
66
               '<name>DEF</name><description>Test DEF</description>' \
67
               '<default>no</default></vt_param></vt_params>'
68
69
    @staticmethod
70
    def get_refs_vt_as_xml_str(vt_id, vt_refs):
71
        response = '<vt_refs><ref type="cve" id="CVE-2010-4480"/>' \
72
                    '<ref type="url" id="http://example.com"/></vt_refs>'
73
        return response
74
75
    @staticmethod
76
    def get_dependencies_vt_as_xml_str(vt_id, vt_dependencies):
77
        response = '<dependencies>' \
78
                   '<dependency vt_id="1.3.6.1.4.1.25623.1.0.50282" />' \
79
                   '<dependency vt_id="1.3.6.1.4.1.25623.1.0.50283" />' \
80
                   '</dependencies>'
81
82
        return response
83
84
    @staticmethod
85
    def get_severities_vt_as_xml_str(vt_id, severities):
86
        response = '<severities><severity cvss_base="5.0" cvss_' \
87
                   'type="cvss_base_v2">AV:N/AC:L/Au:N/C:N/I:N/' \
88
                   'A:P</severity></severities>'
89
90
        return response
91
92
    @staticmethod
93
    def get_detection_vt_as_xml_str(vt_id, detection=None,
94
                                    qod_type=None, qod=None):
95
        response = '<detection qod_type="package">some detection</detection>'
96
97
        return response
98
99
    @staticmethod
100
    def get_summary_vt_as_xml_str(vt_id, summary):
101
        response = '<summary>Some summary</summary>'
102
103
        return response
104
105
    @staticmethod
106
    def get_affected_vt_as_xml_str(vt_id, affected):
107
        response = '<affected>Some affected</affected>'
108
109
        return response
110
111
    @staticmethod
112
    def get_impact_vt_as_xml_str(vt_id, impact):
113
        response = '<impact>Some impact</impact>'
114
115
        return response
116
117
    @staticmethod
118
    def get_insight_vt_as_xml_str(vt_id, insight):
119
        response = '<insight>Some insight</insight>'
120
121
        return response
122
123
    @staticmethod
124
    def get_solution_vt_as_xml_str(vt_id, solution, solution_type=None):
125
        response = '<solution>Some solution</solution>'
126
127
        return response
128
129
    @staticmethod
130
    def get_creation_time_vt_as_xml_str(vt_id, creation_time):
131
        response = '<creation_time>%s</creation_time>' % creation_time
132
133
        return response
134
135
    @staticmethod
136
    def get_modification_time_vt_as_xml_str(vt_id, modification_time):
137
        response = (
138
            '<modification_time>%s</modification_time>' % modification_time)
139
140
        return response
141
142
    def exec_scan(self, scan_id, target):
143
        time.sleep(0.01)
144
        for res in self.results:
145
            if res.result_type == 'log':
146
                self.add_scan_log(
147
                    scan_id, res.host or target, res.name, res.value, res.port)
148
            if res.result_type == 'error':
149
                self.add_scan_error(
150
                    scan_id, res.host or target, res.name, res.value, res.port)
151
            elif res.result_type == 'host-detail':
152
                self.add_scan_host_detail(
153
                    scan_id, res.host or target, res.name, res.value)
154
            elif res.result_type == 'alarm':
155
                self.add_scan_alarm(
156
                    scan_id, res.host or target, res.name, res.value,
157
                    res.port, res.test_id, res.severity, res.qod)
158
            else:
159
                raise ValueError(res.result_type)
160
161
162
class FullTest(unittest.TestCase):
163
    # TODO: There should be a lot more assert in there !
164
165
    def testGetDefaultScannerParams(self):
166
        daemon = DummyWrapper([])
167
        response = secET.fromstring(
168
            daemon.handle_command('<get_scanner_details />'))
169
        # The status of the response must be success (i.e. 200)
170
        self.assertEqual(response.get('status'), '200')
171
        # The response root element must have the correct name
172
        self.assertEqual(response.tag, 'get_scanner_details_response')
173
        # The response must contain a 'scanner_params' element
174
        self.assertIsNotNone(response.find('scanner_params'))
175
176
    def testGetDefaultHelp(self):
177
        daemon = DummyWrapper([])
178
        response = secET.fromstring(daemon.handle_command('<help />'))
179
        self.assertEqual(response.get('status'), '200')
180
        response = secET.fromstring(
181
            daemon.handle_command('<help format="xml" />'))
182
        self.assertEqual(response.get('status'), '200')
183
        self.assertEqual(response.tag, 'help_response')
184
185
    def testGetDefaultScannerVersion(self):
186
        daemon = DummyWrapper([])
187
        response = secET.fromstring(daemon.handle_command('<get_version />'))
188
        self.assertEqual(response.get('status'), '200')
189
        self.assertIsNotNone(response.find('protocol'))
190
191
    def testGetVTs_no_VT(self):
192
        daemon = DummyWrapper([])
193
        response = secET.fromstring(daemon.handle_command('<get_vts />'))
194
        self.assertEqual(response.get('status'), '200')
195
        self.assertIsNotNone(response.find('vts'))
196
197
    def testGetVTs_single_VT(self):
198
        daemon = DummyWrapper([])
199
        daemon.add_vt('1.2.3.4', 'A vulnerability test')
200
        response = secET.fromstring(daemon.handle_command('<get_vts />'))
201
        self.assertEqual(response.get('status'), '200')
202
        vts = response.find('vts')
203
        self.assertIsNotNone(vts.find('vt'))
204
        vt = vts.find('vt')
205
        self.assertEqual(vt.get('id'), '1.2.3.4')
206
207
    def testGetVTs_multiple_VTs(self):
208
        daemon = DummyWrapper([])
209
        daemon.add_vt('1.2.3.4', 'A vulnerability test')
210
        daemon.add_vt('some id', 'Another vulnerability test')
211
        daemon.add_vt('123456789', 'Yet another vulnerability test')
212
        response = secET.fromstring(daemon.handle_command('<get_vts />'))
213
        self.assertEqual(response.get('status'), '200')
214
        vts = response.find('vts')
215
        self.assertIsNotNone(vts.find('vt'))
216
217
    def testGetVTs_multiple_VTs_with_custom(self):
218
        daemon = DummyWrapper([])
219
        daemon.add_vt('1.2.3.4', 'A vulnerability test', custom='b')
220
        daemon.add_vt('4.3.2.1', 'Another vulnerability test with custom info',
221
                      custom='b')
222
        daemon.add_vt('123456789', 'Yet another vulnerability test',
223
                      custom='b')
224
        response = secET.fromstring(daemon.handle_command('<get_vts />'))
225
        custom = response.findall('vts/vt/custom')
226
        self.assertEqual(3, len(custom))
227
228 View Code Duplication
    def testGetVTs_VTs_with_params(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
229
        daemon = DummyWrapper([])
230
        daemon.add_vt('1.2.3.4', 'A vulnerability test',
231
                      vt_params="a", custom="b")
232
        response = secET.fromstring(
233
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
234
        # The status of the response must be success (i.e. 200)
235
        self.assertEqual(response.get('status'), '200')
236
        # The response root element must have the correct name
237
        self.assertEqual(response.tag, 'get_vts_response')
238
        # The response must contain a 'scanner_params' element
239
        self.assertIsNotNone(response.find('vts'))
240
        vt_params = response[0][0].findall('vt_params')
241
        self.assertEqual(1, len(vt_params))
242
        custom = response[0][0].findall('custom')
243
        self.assertEqual(1, len(custom))
244
        params = response.findall('vts/vt/vt_params/vt_param')
245
        self.assertEqual(2, len(params))
246
247 View Code Duplication
    def testGetVTs_VTs_with_refs(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
248
        daemon = DummyWrapper([])
249
        daemon.add_vt('1.2.3.4',
250
                      'A vulnerability test',
251
                      vt_params="a",
252
                      custom="b",
253
                      vt_refs="c")
254
        response = secET.fromstring(
255
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
256
        # The status of the response must be success (i.e. 200)
257
        self.assertEqual(response.get('status'), '200')
258
        # The response root element must have the correct name
259
        self.assertEqual(response.tag, 'get_vts_response')
260
        # The response must contain a 'vts' element
261
        self.assertIsNotNone(response.find('vts'))
262
        vt_params = response[0][0].findall('vt_params')
263
        self.assertEqual(1, len(vt_params))
264
        custom = response[0][0].findall('custom')
265
        self.assertEqual(1, len(custom))
266
        refs = response.findall('vts/vt/vt_refs/ref')
267
        self.assertEqual(2, len(refs))
268
269
    def testGetVTs_VTs_with_dependencies(self):
270
        daemon = DummyWrapper([])
271
        daemon.add_vt('1.2.3.4',
272
                      'A vulnerability test',
273
                      vt_params="a",
274
                      custom="b",
275
                      vt_dependencies="c",)
276
        response = secET.fromstring(
277
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
278
        deps = response.findall('vts/vt/dependencies/dependency')
279
        self.assertEqual(2, len(deps))
280
281
    def testGetVTs_VTs_with_severities(self):
282
        daemon = DummyWrapper([])
283
        daemon.add_vt('1.2.3.4',
284
                      'A vulnerability test',
285
                      vt_params="a",
286
                      custom="b",
287
                      severities="c",)
288
        response = secET.fromstring(
289
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
290
        severity = response.findall('vts/vt/severities/severity')
291
        self.assertEqual(1, len(severity))
292
293
    def testGetVTs_VTs_with_detection_qodt(self):
294
        daemon = DummyWrapper([])
295
        daemon.add_vt('1.2.3.4',
296
                      'A vulnerability test',
297
                      vt_params="a",
298
                      custom="b",
299
                      detection="c",
300
                      qod_t="d")
301
        response = secET.fromstring(
302
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
303
        detection = response.findall('vts/vt/detection')
304
        self.assertEqual(1, len(detection))
305
306
    def testGetVTs_VTs_with_detection_qodv(self):
307
        daemon = DummyWrapper([])
308
        daemon.add_vt('1.2.3.4',
309
                      'A vulnerability test',
310
                      vt_params="a",
311
                      custom="b",
312
                      detection="c",
313
                      qod_v="d")
314
        response = secET.fromstring(
315
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
316
        detection = response.findall('vts/vt/detection')
317
        self.assertEqual(1, len(detection))
318
319
    def testGetVTs_VTs_with_summary(self):
320
        daemon = DummyWrapper([])
321
        daemon.add_vt('1.2.3.4',
322
                      'A vulnerability test',
323
                      vt_params="a",
324
                      custom="b",
325
                      summary="c",)
326
        response = secET.fromstring(
327
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
328
        summary = response.findall('vts/vt/summary')
329
        self.assertEqual(1, len(summary))
330
331
    def testGetVTs_VTs_with_impact(self):
332
        daemon = DummyWrapper([])
333
        daemon.add_vt('1.2.3.4',
334
                      'A vulnerability test',
335
                      vt_params="a",
336
                      custom="b",
337
                      impact="c",)
338
        response = secET.fromstring(
339
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
340
        impact = response.findall('vts/vt/impact')
341
        self.assertEqual(1, len(impact))
342
343
    def testGetVTs_VTs_with_affected(self):
344
        daemon = DummyWrapper([])
345
        daemon.add_vt('1.2.3.4',
346
                      'A vulnerability test',
347
                      vt_params="a",
348
                      custom="b",
349
                      affected="c",)
350
        response = secET.fromstring(
351
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
352
        affect = response.findall('vts/vt/affected')
353
        self.assertEqual(1, len(affect))
354
355
    def testGetVTs_VTs_with_insight(self):
356
        daemon = DummyWrapper([])
357
        daemon.add_vt('1.2.3.4',
358
                      'A vulnerability test',
359
                      vt_params="a",
360
                      custom="b",
361
                      insight="c",)
362
        response = secET.fromstring(
363
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
364
        insight = response.findall('vts/vt/insight')
365
        self.assertEqual(1, len(insight))
366
367
    def testGetVTs_VTs_with_solution(self):
368
        daemon = DummyWrapper([])
369
        daemon.add_vt('1.2.3.4',
370
                      'A vulnerability test',
371
                      vt_params="a",
372
                      custom="b",
373
                      solution="c",
374
                      solution_t="d")
375
        response = secET.fromstring(
376
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
377
        solution = response.findall('vts/vt/solution')
378
        self.assertEqual(1, len(solution))
379
380
    def testGetVTs_VTs_with_ctime(self):
381
        daemon = DummyWrapper([])
382
        daemon.add_vt('1.2.3.4',
383
                      'A vulnerability test',
384
                      vt_params="a",
385
                      vt_creation_time='01-01-1900')
386
        response = secET.fromstring(
387
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
388
        creation_time = response.findall('vts/vt/creation_time')
389
390
        self.assertEqual('<creation_time>01-01-1900</creation_time>',
391
                         ET.tostring(creation_time[0]).decode('utf-8'))
392
393
    def testGetVTs_VTs_with_mtime(self):
394
        daemon = DummyWrapper([])
395
        daemon.add_vt('1.2.3.4',
396
                      'A vulnerability test',
397
                      vt_params="a",
398
                      vt_modification_time='02-01-1900')
399
        response = secET.fromstring(
400
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
401
        modification_time = response.findall('vts/vt/modification_time')
402
403
        self.assertEqual('<modification_time>02-01-1900</modification_time>',
404
                         ET.tostring(modification_time[0]).decode('utf-8'))
405
406
    def testiScanWithError(self):
407
        daemon = DummyWrapper([
408
            Result('error', value='something went wrong'),
409
        ])
410
411
        response = secET.fromstring(
412
            daemon.handle_command('<start_scan target="localhost" ports="80, '
413
                                  '443"><scanner_params /></start_scan>'))
414
        scan_id = response.findtext('id')
415
        finished = False
416
        while not finished:
417
            response = secET.fromstring(
418
                daemon.handle_command(
419
                    '<get_scans scan_id="%s" details="0"/>' % scan_id))
420
            scans = response.findall('scan')
421
            self.assertEqual(1, len(scans))
422
            scan = scans[0]
423
            if int(scan.get('progress')) != 100:
424
                self.assertEqual('0', scan.get('end_time'))
425
                time.sleep(.010)
426
            else:
427
                finished = True
428
        response = secET.fromstring(
429
            daemon.handle_command('<get_scans scan_id="%s"/>' % scan_id))
430
        response = secET.fromstring(daemon.handle_command('<get_scans />'))
431
        response = secET.fromstring(
432
            daemon.handle_command(
433
                '<get_scans scan_id="%s" details="1"/>' % scan_id))
434
        self.assertEqual(response.findtext('scan/results/result'),
435
                         'something went wrong')
436
437
        response = secET.fromstring(
438
            daemon.handle_command('<delete_scan scan_id="%s" />' % scan_id))
439
        self.assertEqual(response.get('status'), '200')
440
441
    def testGetScanPop(self):
442
        daemon = DummyWrapper([
443
            Result('host-detail', value='Some Host Detail'),
444
        ])
445
446
        response = secET.fromstring(daemon.handle_command(
447
            '<start_scan target="localhost" ports="80, 443">'
448
            '<scanner_params /></start_scan>'))
449
        scan_id = response.findtext('id')
450
        time.sleep(1)
451
452
        response = secET.fromstring(
453
            daemon.handle_command('<get_scans scan_id="%s"/>' % scan_id))
454
        self.assertEqual(response.findtext('scan/results/result'),
455
                         'Some Host Detail')
456
457
        response = secET.fromstring(
458
            daemon.handle_command(
459
                '<get_scans details="0" pop_results="1"/>'))
460
        self.assertEqual(response.findtext('scan/results/result'),
461
                         None)
462
463
        response = secET.fromstring(
464
            daemon.handle_command(
465
                '<get_scans scan_id="%s" pop_results="1"/>' % scan_id))
466
        self.assertEqual(response.findtext('scan/results/result'),
467
                         'Some Host Detail')
468
469
        response = secET.fromstring(
470
            daemon.handle_command(
471
                '<get_scans scan_id="%s" pop_results="1"/>' % scan_id))
472
        self.assertNotEqual(response.findtext('scan/results/result'),
473
                            'Some Host Detail')
474
        self.assertEqual(response.findtext('scan/results/result'),
475
                         None)
476
477
        while True:
478
            response = secET.fromstring(
479
                daemon.handle_command(
480
                    '<get_scans scan_id="%s" details="0"/>' % scan_id))
481
            scans = response.findall('scan')
482
            self.assertEqual(1, len(scans))
483
            scan = scans[0]
484
            if int(scan.get('progress')) == 100:
485
                break
486
487
        response = secET.fromstring(
488
            daemon.handle_command('<delete_scan scan_id="%s" />' % scan_id))
489
        self.assertEqual(response.get('status'), '200')
490
491
    def testStopScan(self):
492
        daemon = DummyWrapper([])
493
        response = secET.fromstring(
494
            daemon.handle_command('<start_scan '
495
                                  'target="localhost" ports="80, 443">'
496
                                  '<scanner_params /></start_scan>'))
497
        scan_id = response.findtext('id')
498
499
        # Depending on the sistem this test can end with a race condition
500
        # because the scanner is already stopped when the <stop_scan>
501
        # command is run.
502
        time.sleep(3)
503
        cmd = secET.fromstring('<stop_scan scan_id="%s" />' % scan_id)
504
        self.assertRaises(OSPDError, daemon.handle_stop_scan_command, cmd)
505
506
        cmd = secET.fromstring('<stop_scan />')
507
        self.assertRaises(OSPDError, daemon.handle_stop_scan_command, cmd)
508
509 View Code Duplication
    def testScanWithVTs(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
510
        daemon = DummyWrapper([])
511
        cmd = secET.fromstring('<start_scan '
512
                               'target="localhost" ports="80, 443">'
513
                               '<scanner_params /><vt_selection />'
514
                               '</start_scan>')
515
        self.assertRaises(OSPDError, daemon.handle_start_scan_command, cmd)
516
517
        # With one VT, without params
518
        response = secET.fromstring(
519
            daemon.handle_command('<start_scan '
520
                                  'target="localhost" ports="80, 443">'
521
                                  '<scanner_params /><vt_selection>'
522
                                  '<vt_single id="1.2.3.4" />'
523
                                  '</vt_selection></start_scan>'))
524
        scan_id = response.findtext('id')
525
        time.sleep(0.01)
526
        self.assertEqual(
527
            daemon.get_scan_vts(scan_id), {'1.2.3.4': {}, 'vt_groups': []})
528
        self.assertNotEqual(daemon.get_scan_vts(scan_id), {'1.2.3.6': {}})
529
530
        # With out VTS
531
        response = secET.fromstring(
532
            daemon.handle_command('<start_scan '
533
                                  'target="localhost" ports="80, 443">'
534
                                  '<scanner_params /></start_scan>'))
535
        scan_id = response.findtext('id')
536
        time.sleep(0.01)
537
        self.assertEqual(daemon.get_scan_vts(scan_id), {})
538
539 View Code Duplication
    def testScanWithVTs_and_param(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
540
        daemon = DummyWrapper([])
541
542
        # Raise because no vt_param id attribute
543
        cmd = secET.fromstring('<start_scan '
544
                               'target="localhost" ports="80, 443">'
545
                               '<scanner_params /><vt_selection><vt_si'
546
                               'ngle id="1234"><vt_value>200</vt_value>'
547
                               '</vt_single></vt_selection></start_scan>')
548
        self.assertRaises(OSPDError, daemon.handle_start_scan_command, cmd)
549
550
        # No error
551
        response = secET.fromstring(
552
            daemon.handle_command('<start_scan '
553
                                  'target="localhost" ports="80, 443">'
554
                                  '<scanner_params /><vt_selection><vt'
555
                                  '_single id="1234"><vt_value id="ABC">200'
556
                                  '</vt_value></vt_single></vt_selection>'
557
                                  '</start_scan>'))
558
        scan_id = response.findtext('id')
559
        time.sleep(0.01)
560
        self.assertEqual(daemon.get_scan_vts(scan_id),
561
                         {'1234': {'ABC': '200'}, 'vt_groups': []})
562
563
        # Raise because no vtgroup filter attribute
564
        cmd = secET.fromstring('<start_scan '
565
                               'target="localhost" ports="80, 443">'
566
                               '<scanner_params /><vt_selection><vt_group/>'
567
                               '</vt_selection></start_scan>')
568
        self.assertRaises(OSPDError, daemon.handle_start_scan_command, cmd)
569
570
        # No error
571
        response = secET.fromstring(
572
            daemon.handle_command('<start_scan '
573
                                  'target="localhost" ports="80, 443">'
574
                                  '<scanner_params /><vt_selection>'
575
                                  '<vt_group filter="a"/>'
576
                                  '</vt_selection></start_scan>'))
577
        scan_id = response.findtext('id')
578
        time.sleep(0.01)
579
        self.assertEqual(daemon.get_scan_vts(scan_id),
580
                         {'vt_groups': ['a']})
581
582
    def testBillonLaughs(self):
583
        daemon = DummyWrapper([])
584
        lol = '<?xml version="1.0"?>' \
585
              '<!DOCTYPE lolz [' \
586
              ' <!ENTITY lol "lol">' \
587
              ' <!ELEMENT lolz (#PCDATA)>' \
588
              ' <!ENTITY lol1 "&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;">' \
589
              ' <!ENTITY lol2 "&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;">' \
590
              ' <!ENTITY lol3 "&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;">' \
591
              ' <!ENTITY lol4 "&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;">' \
592
              ' <!ENTITY lol5 "&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;">' \
593
              ' <!ENTITY lol6 "&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;">' \
594
              ' <!ENTITY lol7 "&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;">' \
595
              ' <!ENTITY lol8 "&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;">' \
596
              ' <!ENTITY lol9 "&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;">' \
597
              ']>'
598
        self.assertRaises(EntitiesForbidden, daemon.handle_command, lol)
599
600
    def testScanMultiTarget(self):
601
        daemon = DummyWrapper([])
602
        response = secET.fromstring(
603
            daemon.handle_command('<start_scan>'
604
                                  '<scanner_params /><vts><vt id="1.2.3.4" />'
605
                                  '</vts>'
606
                                  '<targets><target>'
607
                                  '<hosts>localhosts</hosts>'
608
                                  '<ports>80,443</ports>'
609
                                  '</target>'
610
                                  '<target><hosts>192.168.0.0/24</hosts>'
611
                                  '<ports>22</ports></target></targets>'
612
                                  '</start_scan>'))
613
        self.assertEqual(response.get('status'), '200')
614
615
    def testMultiTargetWithCredentials(self):
616
        daemon = DummyWrapper([])
617
        response = secET.fromstring(
618
            daemon.handle_command(
619
                '<start_scan>'
620
                '<scanner_params /><vts><vt id="1.2.3.4" />'
621
                '</vts>'
622
                '<targets><target><hosts>localhosts</hosts>'
623
                '<ports>80,443</ports></target><target>'
624
                '<hosts>192.168.0.0/24</hosts><ports>22'
625
                '</ports><credentials>'
626
                '<credential type="up" service="ssh" port="22">'
627
                '<username>scanuser</username>'
628
                '<password>mypass</password>'
629
                '</credential><credential type="up" service="smb">'
630
                '<username>smbuser</username>'
631
                '<password>mypass</password></credential>'
632
                '</credentials>'
633
                '</target></targets>'
634
                '</start_scan>'))
635
636
        self.assertEqual(response.get('status'), '200')
637
        cred_dict = {'ssh': {'type': 'up', 'password':
638
                             'mypass', 'port': '22', 'username': 'scanuser'},
639
                     'smb': {'type': 'up', 'password': 'mypass',
640
                             'username': 'smbuser'}}
641
        scan_id = response.findtext('id')
642
        response = daemon.get_scan_credentials(scan_id, "192.168.0.0/24")
643
        self.assertEqual(response, cred_dict)
644
645
    def testScanGetTarget(self):
646
        daemon = DummyWrapper([])
647
        response = secET.fromstring(
648
            daemon.handle_command('<start_scan>'
649
                                  '<scanner_params /><vts><vt id="1.2.3.4" />'
650
                                  '</vts>'
651
                                  '<targets><target>'
652
                                  '<hosts>localhosts</hosts>'
653
                                  '<ports>80,443</ports>'
654
                                  '</target>'
655
                                  '<target><hosts>192.168.0.0/24</hosts>'
656
                                  '<ports>22</ports></target></targets>'
657
                                  '</start_scan>'))
658
        scan_id = response.findtext('id')
659
        response = secET.fromstring(
660
            daemon.handle_command('<get_scans scan_id="%s"/>' % scan_id))
661
        scan_res = response.find('scan')
662
        self.assertEqual(scan_res.get('target'), 'localhosts,192.168.0.0/24')
663
664
    def testScanGetLegacyTarget(self):
665
        daemon = DummyWrapper([])
666
667
        response = secET.fromstring(
668
            daemon.handle_command(
669
                '<start_scan target="localhosts,192.168.0.0/24" ports="22">'
670
                '<scanner_params /><vts><vt id="1.2.3.4" />'
671
                '</vts>'
672
                '</start_scan>'))
673
        scan_id = response.findtext('id')
674
        response = secET.fromstring(
675
            daemon.handle_command('<get_scans scan_id="%s"/>' % scan_id))
676
        scan_res = response.find('scan')
677
        self.assertEqual(scan_res.get('target'), 'localhosts,192.168.0.0/24')
678
679
    def testScanMultiTargetParallelWithError(self):
680
        daemon = DummyWrapper([])
681
        cmd = secET.fromstring('<start_scan parallel="100a">'
682
                               '<scanner_params />'
683
                               '<targets><target>'
684
                               '<hosts>localhosts</hosts>'
685
                               '<ports>22</ports>'
686
                               '</target></targets>'
687
                               '</start_scan>')
688
        time.sleep(1)
689
        self.assertRaises(OSPDError, daemon.handle_start_scan_command, cmd)
690
691
    def testScanMultiTargetParallel100(self):
692
        daemon = DummyWrapper([])
693
        response = secET.fromstring(
694
            daemon.handle_command('<start_scan parallel="100">'
695
                                  '<scanner_params />'
696
                                  '<targets><target>'
697
                                  '<hosts>localhosts</hosts>'
698
                                  '<ports>22</ports>'
699
                                  '</target></targets>'
700
                                  '</start_scan>'))
701
        time.sleep(1)
702
        self.assertEqual(response.get('status'), '200')
703
704
    def testProgress(self):
705
        daemon = DummyWrapper([])
706
        response = secET.fromstring(
707
            daemon.handle_command('<start_scan parallel="2">'
708
                                  '<scanner_params />'
709
                                  '<targets><target>'
710
                                  '<hosts>localhost1</hosts>'
711
                                  '<ports>22</ports>'
712
                                  '</target><target>'
713
                                  '<hosts>localhost2</hosts>'
714
                                  '<ports>22</ports>'
715
                                  '</target></targets>'
716
                                  '</start_scan>'))
717
        scan_id = response.findtext('id')
718
        daemon.set_scan_target_progress(scan_id, 'localhost1', 75)
719
        daemon.set_scan_target_progress(scan_id, 'localhost2', 25)
720
        self.assertEqual(daemon.calculate_progress(scan_id), 50)
721
722
    def testSetGetVtsVersion(self):
723
        daemon = DummyWrapper([])
724
        daemon.set_vts_version('1234')
725
        version = daemon.get_vts_version()
726
        self.assertEqual('1234', version)
727
728
    def testSetGetVtsVersion_Error(self):
729
        daemon = DummyWrapper([])
730
        self.assertRaises(TypeError, daemon.set_vts_version)
731