Passed
Pull Request — master (#102)
by Juan José
01:15
created

DummyWrapper.get_summary_vt_as_xml_str()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 2
dl 0
loc 5
rs 10
c 0
b 0
f 0
1
# Copyright (C) 2015-2018 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: GPL-2.0-or-later
4
#
5
# This program is free software; you can redistribute it and/or
6
# modify it under the terms of the GNU General Public License
7
# as published by the Free Software Foundation; either version 2
8
# of the License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU General Public License for more details.
14
#
15
# You should have received a copy of the GNU General Public License
16
# along with this program; if not, write to the Free Software
17
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
19
""" Test module for scan runs
20
"""
21
22
from __future__ import print_function
23
24
import time
25
import unittest
26
import xml.etree.ElementTree as ET
27
import defusedxml.lxml as secET
28
from defusedxml.common import EntitiesForbidden
29
30
from ospd.ospd import OSPDaemon
31
from ospd.error import OSPDError
32
from ospd.xml import simple_response_str, get_result_xml
33
34
35
class Result(object):
36
    def __init__(self, type_, **kwargs):
37
        self.result_type = type_
38
        self.host = ''
39
        self.name = ''
40
        self.value = ''
41
        self.port = ''
42
        self.test_id = ''
43
        self.severity = ''
44
        self.qod = ''
45
        for name, value in kwargs.items():
46
            setattr(self, name, value)
47
48
49
class DummyWrapper(OSPDaemon):
50
    def __init__(self, results, checkresult=True):
51
        OSPDaemon.__init__(self, 'cert', 'key', 'ca')
52
        self.checkresult = checkresult
53
        self.results = results
54
55
    def check(self):
56
        return self.checkresults
57
58
    @staticmethod
59
    def get_custom_vt_as_xml_str(vt_id, custom):
60
        return '<custom><mytest>static test</mytest></custom>'
61
62
    @staticmethod
63
    def get_params_vt_as_xml_str(vt_id, vt_params):
64
        return '<vt_params><vt_param id="abc" type="string">' \
65
               '<name>ABC</name><description>Test ABC</description>' \
66
               '<default>yes</default></vt_param>' \
67
               '<vt_param id="def" type="string">' \
68
               '<name>DEF</name><description>Test DEF</description>' \
69
               '<default>no</default></vt_param></vt_params>'
70
71
    @staticmethod
72
    def get_refs_vt_as_xml_str(vt_id, vt_refs):
73
        response = '<refs><ref type="cve" id="CVE-2010-4480"/>' \
74
                    '<ref type="url" id="http://example.com"/></refs>'
75
        return response
76
77
    @staticmethod
78
    def get_dependencies_vt_as_xml_str(vt_id, vt_dependencies):
79
        response = '<dependencies>' \
80
                   '<dependency vt_id="1.3.6.1.4.1.25623.1.0.50282" />' \
81
                   '<dependency vt_id="1.3.6.1.4.1.25623.1.0.50283" />' \
82
                   '</dependencies>'
83
84
        return response
85
86
    @staticmethod
87
    def get_severities_vt_as_xml_str(vt_id, severities):
88
        response = '<severities><severity cvss_base="5.0" cvss_' \
89
                   'type="cvss_base_v2">AV:N/AC:L/Au:N/C:N/I:N/' \
90
                   'A:P</severity></severities>'
91
92
        return response
93
94
    @staticmethod
95
    def get_detection_vt_as_xml_str(vt_id, detection=None,
96
                                    qod_type=None, qod=None):
97
        response = '<detection qod_type="package">some detection</detection>'
98
99
        return response
100
101
    @staticmethod
102
    def get_summary_vt_as_xml_str(vt_id, summary):
103
        response = '<summary>Some summary</summary>'
104
105
        return response
106
107
    @staticmethod
108
    def get_affected_vt_as_xml_str(vt_id, affected):
109
        response = '<affected>Some affected</affected>'
110
111
        return response
112
113
    @staticmethod
114
    def get_impact_vt_as_xml_str(vt_id, impact):
115
        response = '<impact>Some impact</impact>'
116
117
        return response
118
119
    @staticmethod
120
    def get_insight_vt_as_xml_str(vt_id, insight):
121
        response = '<insight>Some insight</insight>'
122
123
        return response
124
125
    @staticmethod
126
    def get_solution_vt_as_xml_str(vt_id, solution, solution_type=None):
127
        response = '<solution>Some solution</solution>'
128
129
        return response
130
131
    @staticmethod
132
    def get_creation_time_vt_as_xml_str(vt_id, creation_time):
133
        response = '<creation_time>%s</creation_time>' % creation_time
134
135
        return response
136
137
    @staticmethod
138
    def get_modification_time_vt_as_xml_str(vt_id, modification_time):
139
        response = (
140
            '<modification_time>%s</modification_time>' % modification_time)
141
142
        return response
143
144
    def exec_scan(self, scan_id, target):
145
        time.sleep(0.01)
146
        for res in self.results:
147
            if res.result_type == 'log':
148
                self.add_scan_log(
149
                    scan_id, res.host or target, res.name, res.value, res.port)
150
            if res.result_type == 'error':
151
                self.add_scan_error(
152
                    scan_id, res.host or target, res.name, res.value, res.port)
153
            elif res.result_type == 'host-detail':
154
                self.add_scan_host_detail(
155
                    scan_id, res.host or target, res.name, res.value)
156
            elif res.result_type == 'alarm':
157
                self.add_scan_alarm(
158
                    scan_id, res.host or target, res.name, res.value,
159
                    res.port, res.test_id, res.severity, res.qod)
160
            else:
161
                raise ValueError(res.result_type)
162
163
164
class FullTest(unittest.TestCase):
165
    # TODO: There should be a lot more assert in there !
166
167
    def testGetDefaultScannerParams(self):
168
        daemon = DummyWrapper([])
169
        response = secET.fromstring(
170
            daemon.handle_command('<get_scanner_details />'))
171
        # The status of the response must be success (i.e. 200)
172
        self.assertEqual(response.get('status'), '200')
173
        # The response root element must have the correct name
174
        self.assertEqual(response.tag, 'get_scanner_details_response')
175
        # The response must contain a 'scanner_params' element
176
        self.assertIsNotNone(response.find('scanner_params'))
177
178
    def testGetDefaultHelp(self):
179
        daemon = DummyWrapper([])
180
        response = secET.fromstring(daemon.handle_command('<help />'))
181
        self.assertEqual(response.get('status'), '200')
182
        response = secET.fromstring(
183
            daemon.handle_command('<help format="xml" />'))
184
        self.assertEqual(response.get('status'), '200')
185
        self.assertEqual(response.tag, 'help_response')
186
187
    def testGetDefaultScannerVersion(self):
188
        daemon = DummyWrapper([])
189
        response = secET.fromstring(daemon.handle_command('<get_version />'))
190
        self.assertEqual(response.get('status'), '200')
191
        self.assertIsNotNone(response.find('protocol'))
192
193
    def testGetVTs_no_VT(self):
194
        daemon = DummyWrapper([])
195
        response = secET.fromstring(daemon.handle_command('<get_vts />'))
196
        self.assertEqual(response.get('status'), '200')
197
        self.assertIsNotNone(response.find('vts'))
198
199
    def testGetVTs_single_VT(self):
200
        daemon = DummyWrapper([])
201
        daemon.add_vt('1.2.3.4', 'A vulnerability test')
202
        response = secET.fromstring(daemon.handle_command('<get_vts />'))
203
        self.assertEqual(response.get('status'), '200')
204
        vts = response.find('vts')
205
        self.assertIsNotNone(vts.find('vt'))
206
        vt = vts.find('vt')
207
        self.assertEqual(vt.get('id'), '1.2.3.4')
208
209 View Code Duplication
    def testGetVTs_filter_positive(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
210
        daemon = DummyWrapper([])
211
        daemon.add_vt('1.2.3.4',
212
                      'A vulnerability test',
213
                      vt_params="a",
214
                      vt_modification_time='19000202')
215
        response = secET.fromstring(
216
            daemon.handle_command('<get_vts filter="modification_time&gt;19000201"></get_vts>'))
217
        self.assertEqual(response.get('status'), '200')
218
        vts = response.find('vts')
219
        self.assertIsNotNone(vts.find('vt'))
220
        vt = vts.find('vt')
221
        self.assertEqual(vt.get('id'), '1.2.3.4')
222
        modification_time = response.findall('vts/vt/modification_time')
223
        self.assertEqual('<modification_time>19000202</modification_time>',
224
                         ET.tostring(modification_time[0]).decode('utf-8'))
225
226 View Code Duplication
    def testGetVTs_filter_negative(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
227
        daemon = DummyWrapper([])
228
        daemon.add_vt('1.2.3.4',
229
                      'A vulnerability test',
230
                      vt_params="a",
231
                      vt_modification_time='19000202')
232
        response = secET.fromstring(
233
            daemon.handle_command('<get_vts filter="modification_time&lt;19000203"></get_vts>'))
234
        self.assertEqual(response.get('status'), '200')
235
        vts = response.find('vts')
236
        self.assertIsNotNone(vts.find('vt'))
237
        vt = vts.find('vt')
238
        self.assertEqual(vt.get('id'), '1.2.3.4')
239
        modification_time = response.findall('vts/vt/modification_time')
240
        self.assertEqual('<modification_time>19000202</modification_time>',
241
                         ET.tostring(modification_time[0]).decode('utf-8'))
242
243
    def testGetVTs_multiple_VTs(self):
244
        daemon = DummyWrapper([])
245
        daemon.add_vt('1.2.3.4', 'A vulnerability test')
246
        daemon.add_vt('some id', 'Another vulnerability test')
247
        daemon.add_vt('123456789', 'Yet another vulnerability test')
248
        response = secET.fromstring(daemon.handle_command('<get_vts />'))
249
        self.assertEqual(response.get('status'), '200')
250
        vts = response.find('vts')
251
        self.assertIsNotNone(vts.find('vt'))
252
253
    def testGetVTs_multiple_VTs_with_custom(self):
254
        daemon = DummyWrapper([])
255
        daemon.add_vt('1.2.3.4', 'A vulnerability test', custom='b')
256
        daemon.add_vt('4.3.2.1', 'Another vulnerability test with custom info',
257
                      custom='b')
258
        daemon.add_vt('123456789', 'Yet another vulnerability test',
259
                      custom='b')
260
        response = secET.fromstring(daemon.handle_command('<get_vts />'))
261
        custom = response.findall('vts/vt/custom')
262
        self.assertEqual(3, len(custom))
263
264 View Code Duplication
    def testGetVTs_VTs_with_params(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
265
        daemon = DummyWrapper([])
266
        daemon.add_vt('1.2.3.4', 'A vulnerability test',
267
                      vt_params="a", custom="b")
268
        response = secET.fromstring(
269
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
270
        # The status of the response must be success (i.e. 200)
271
        self.assertEqual(response.get('status'), '200')
272
        # The response root element must have the correct name
273
        self.assertEqual(response.tag, 'get_vts_response')
274
        # The response must contain a 'scanner_params' element
275
        self.assertIsNotNone(response.find('vts'))
276
        vt_params = response[0][0].findall('vt_params')
277
        self.assertEqual(1, len(vt_params))
278
        custom = response[0][0].findall('custom')
279
        self.assertEqual(1, len(custom))
280
        params = response.findall('vts/vt/vt_params/vt_param')
281
        self.assertEqual(2, len(params))
282
283 View Code Duplication
    def testGetVTs_VTs_with_refs(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
284
        daemon = DummyWrapper([])
285
        daemon.add_vt('1.2.3.4',
286
                      'A vulnerability test',
287
                      vt_params="a",
288
                      custom="b",
289
                      vt_refs="c")
290
        response = secET.fromstring(
291
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
292
        # The status of the response must be success (i.e. 200)
293
        self.assertEqual(response.get('status'), '200')
294
        # The response root element must have the correct name
295
        self.assertEqual(response.tag, 'get_vts_response')
296
        # The response must contain a 'vts' element
297
        self.assertIsNotNone(response.find('vts'))
298
        vt_params = response[0][0].findall('vt_params')
299
        self.assertEqual(1, len(vt_params))
300
        custom = response[0][0].findall('custom')
301
        self.assertEqual(1, len(custom))
302
        refs = response.findall('vts/vt/refs/ref')
303
        self.assertEqual(2, len(refs))
304
305
    def testGetVTs_VTs_with_dependencies(self):
306
        daemon = DummyWrapper([])
307
        daemon.add_vt('1.2.3.4',
308
                      'A vulnerability test',
309
                      vt_params="a",
310
                      custom="b",
311
                      vt_dependencies="c",)
312
        response = secET.fromstring(
313
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
314
        deps = response.findall('vts/vt/dependencies/dependency')
315
        self.assertEqual(2, len(deps))
316
317
    def testGetVTs_VTs_with_severities(self):
318
        daemon = DummyWrapper([])
319
        daemon.add_vt('1.2.3.4',
320
                      'A vulnerability test',
321
                      vt_params="a",
322
                      custom="b",
323
                      severities="c",)
324
        response = secET.fromstring(
325
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
326
        severity = response.findall('vts/vt/severities/severity')
327
        self.assertEqual(1, len(severity))
328
329
    def testGetVTs_VTs_with_detection_qodt(self):
330
        daemon = DummyWrapper([])
331
        daemon.add_vt('1.2.3.4',
332
                      'A vulnerability test',
333
                      vt_params="a",
334
                      custom="b",
335
                      detection="c",
336
                      qod_t="d")
337
        response = secET.fromstring(
338
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
339
        detection = response.findall('vts/vt/detection')
340
        self.assertEqual(1, len(detection))
341
342
    def testGetVTs_VTs_with_detection_qodv(self):
343
        daemon = DummyWrapper([])
344
        daemon.add_vt('1.2.3.4',
345
                      'A vulnerability test',
346
                      vt_params="a",
347
                      custom="b",
348
                      detection="c",
349
                      qod_v="d")
350
        response = secET.fromstring(
351
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
352
        detection = response.findall('vts/vt/detection')
353
        self.assertEqual(1, len(detection))
354
355
    def testGetVTs_VTs_with_summary(self):
356
        daemon = DummyWrapper([])
357
        daemon.add_vt('1.2.3.4',
358
                      'A vulnerability test',
359
                      vt_params="a",
360
                      custom="b",
361
                      summary="c",)
362
        response = secET.fromstring(
363
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
364
        summary = response.findall('vts/vt/summary')
365
        self.assertEqual(1, len(summary))
366
367
    def testGetVTs_VTs_with_impact(self):
368
        daemon = DummyWrapper([])
369
        daemon.add_vt('1.2.3.4',
370
                      'A vulnerability test',
371
                      vt_params="a",
372
                      custom="b",
373
                      impact="c",)
374
        response = secET.fromstring(
375
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
376
        impact = response.findall('vts/vt/impact')
377
        self.assertEqual(1, len(impact))
378
379
    def testGetVTs_VTs_with_affected(self):
380
        daemon = DummyWrapper([])
381
        daemon.add_vt('1.2.3.4',
382
                      'A vulnerability test',
383
                      vt_params="a",
384
                      custom="b",
385
                      affected="c",)
386
        response = secET.fromstring(
387
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
388
        affect = response.findall('vts/vt/affected')
389
        self.assertEqual(1, len(affect))
390
391
    def testGetVTs_VTs_with_insight(self):
392
        daemon = DummyWrapper([])
393
        daemon.add_vt('1.2.3.4',
394
                      'A vulnerability test',
395
                      vt_params="a",
396
                      custom="b",
397
                      insight="c",)
398
        response = secET.fromstring(
399
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
400
        insight = response.findall('vts/vt/insight')
401
        self.assertEqual(1, len(insight))
402
403
    def testGetVTs_VTs_with_solution(self):
404
        daemon = DummyWrapper([])
405
        daemon.add_vt('1.2.3.4',
406
                      'A vulnerability test',
407
                      vt_params="a",
408
                      custom="b",
409
                      solution="c",
410
                      solution_t="d")
411
        response = secET.fromstring(
412
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
413
        solution = response.findall('vts/vt/solution')
414
        self.assertEqual(1, len(solution))
415
416
    def testGetVTs_VTs_with_ctime(self):
417
        daemon = DummyWrapper([])
418
        daemon.add_vt('1.2.3.4',
419
                      'A vulnerability test',
420
                      vt_params="a",
421
                      vt_creation_time='01-01-1900')
422
        response = secET.fromstring(
423
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
424
        creation_time = response.findall('vts/vt/creation_time')
425
426
        self.assertEqual('<creation_time>01-01-1900</creation_time>',
427
                         ET.tostring(creation_time[0]).decode('utf-8'))
428
429
    def testGetVTs_VTs_with_mtime(self):
430
        daemon = DummyWrapper([])
431
        daemon.add_vt('1.2.3.4',
432
                      'A vulnerability test',
433
                      vt_params="a",
434
                      vt_modification_time='02-01-1900')
435
        response = secET.fromstring(
436
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
437
        modification_time = response.findall('vts/vt/modification_time')
438
439
        self.assertEqual('<modification_time>02-01-1900</modification_time>',
440
                         ET.tostring(modification_time[0]).decode('utf-8'))
441
442
    def testScanWithError(self):
443
        daemon = DummyWrapper([
444
            Result('error', value='something went wrong'),
445
        ])
446
447
        response = secET.fromstring(
448
            daemon.handle_command('<start_scan target="localhost" ports="80, '
449
                                  '443"><scanner_params /></start_scan>'))
450
        scan_id = response.findtext('id')
451
452
        finished = False
453
        while not finished:
454
            response = secET.fromstring(
455
                daemon.handle_command(
456
                    '<get_scans scan_id="%s" details="1"/>' % scan_id))
457
            scans = response.findall('scan')
458
            self.assertEqual(1, len(scans))
459
            scan = scans[0]
460
            status = scan.get('status')
461
            if status == "init" or status == "running":
462
                self.assertEqual('0', scan.get('end_time'))
463
                time.sleep(.010)
464
            else:
465
                finished = True
466
        response = secET.fromstring(daemon.handle_command(
467
            '<get_scans scan_id="%s" details="1"/>' % scan_id))
468
        self.assertEqual(response.findtext('scan/results/result'),
469
                         'something went wrong')
470
471
        response = secET.fromstring(
472
            daemon.handle_command('<delete_scan scan_id="%s" />' % scan_id))
473
        self.assertEqual(response.get('status'), '200')
474
475
    def testGetScanPop(self):
476
        daemon = DummyWrapper([
477
            Result('host-detail', value='Some Host Detail'),
478
        ])
479
480
        response = secET.fromstring(daemon.handle_command(
481
            '<start_scan target="localhost" ports="80, 443">'
482
            '<scanner_params /></start_scan>'))
483
        scan_id = response.findtext('id')
484
        time.sleep(1)
485
486
        response = secET.fromstring(
487
            daemon.handle_command('<get_scans scan_id="%s"/>' % scan_id))
488
        self.assertEqual(response.findtext('scan/results/result'),
489
                         'Some Host Detail')
490
491
        response = secET.fromstring(
492
            daemon.handle_command(
493
                '<get_scans scan_id="%s" pop_results="1"/>' % scan_id))
494
        self.assertEqual(response.findtext('scan/results/result'),
495
                         'Some Host Detail')
496
497
        response = secET.fromstring(
498
            daemon.handle_command(
499
                '<get_scans details="0" pop_results="1"/>'))
500
        self.assertEqual(response.findtext('scan/results/result'),
501
                         None)
502
503
        response = secET.fromstring(
504
            daemon.handle_command(
505
                '<get_scans scan_id="%s" pop_results="1"/>' % scan_id))
506
507
        while True:
508
            response = secET.fromstring(
509
                daemon.handle_command(
510
                    '<get_scans scan_id="%s" details="1"/>' % scan_id))
511
            scans = response.findall('scan')
512
            self.assertEqual(1, len(scans))
513
            scan = scans[0]
514
            if scan.get('status') == "stopped":
515
                break
516
517
        scans = response.findall('scan')
518
        scan = scans[0]
519
        self.assertTrue(response.findtext('scan/results/result') in
520
                            ['Scan process failure.', 'Scan stopped.'])
521
522
        response = secET.fromstring(
523
            daemon.handle_command('<delete_scan scan_id="%s" />' % scan_id))
524
        self.assertEqual(response.get('status'), '200')
525
526
    def testStopScan(self):
527
        daemon = DummyWrapper([])
528
        response = secET.fromstring(
529
            daemon.handle_command('<start_scan '
530
                                  'target="localhost" ports="80, 443">'
531
                                  '<scanner_params /></start_scan>'))
532
        scan_id = response.findtext('id')
533
534
        # Depending on the sistem this test can end with a race condition
535
        # because the scanner is already stopped when the <stop_scan>
536
        # command is run.
537
        time.sleep(3)
538
        cmd = secET.fromstring('<stop_scan scan_id="%s" />' % scan_id)
539
        self.assertRaises(OSPDError, daemon.handle_stop_scan_command, cmd)
540
541
        cmd = secET.fromstring('<stop_scan />')
542
        self.assertRaises(OSPDError, daemon.handle_stop_scan_command, cmd)
543
544 View Code Duplication
    def testScanWithVTs(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
545
        daemon = DummyWrapper([])
546
        cmd = secET.fromstring('<start_scan '
547
                               'target="localhost" ports="80, 443">'
548
                               '<scanner_params /><vt_selection />'
549
                               '</start_scan>')
550
        self.assertRaises(OSPDError, daemon.handle_start_scan_command, cmd)
551
552
        # With one VT, without params
553
        response = secET.fromstring(
554
            daemon.handle_command('<start_scan '
555
                                  'target="localhost" ports="80, 443">'
556
                                  '<scanner_params /><vt_selection>'
557
                                  '<vt_single id="1.2.3.4" />'
558
                                  '</vt_selection></start_scan>'))
559
        scan_id = response.findtext('id')
560
        time.sleep(0.01)
561
        self.assertEqual(
562
            daemon.get_scan_vts(scan_id), {'1.2.3.4': {}, 'vt_groups': []})
563
        self.assertNotEqual(daemon.get_scan_vts(scan_id), {'1.2.3.6': {}})
564
565
        # With out VTS
566
        response = secET.fromstring(
567
            daemon.handle_command('<start_scan '
568
                                  'target="localhost" ports="80, 443">'
569
                                  '<scanner_params /></start_scan>'))
570
        scan_id = response.findtext('id')
571
        time.sleep(0.01)
572
        self.assertEqual(daemon.get_scan_vts(scan_id), {})
573
574 View Code Duplication
    def testScanWithVTs_and_param(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
575
        daemon = DummyWrapper([])
576
577
        # Raise because no vt_param id attribute
578
        cmd = secET.fromstring('<start_scan '
579
                               'target="localhost" ports="80, 443">'
580
                               '<scanner_params /><vt_selection><vt_si'
581
                               'ngle id="1234"><vt_value>200</vt_value>'
582
                               '</vt_single></vt_selection></start_scan>')
583
        self.assertRaises(OSPDError, daemon.handle_start_scan_command, cmd)
584
585
        # No error
586
        response = secET.fromstring(
587
            daemon.handle_command('<start_scan '
588
                                  'target="localhost" ports="80, 443">'
589
                                  '<scanner_params /><vt_selection><vt'
590
                                  '_single id="1234"><vt_value id="ABC">200'
591
                                  '</vt_value></vt_single></vt_selection>'
592
                                  '</start_scan>'))
593
        scan_id = response.findtext('id')
594
        time.sleep(0.01)
595
        self.assertEqual(daemon.get_scan_vts(scan_id),
596
                         {'1234': {'ABC': '200'}, 'vt_groups': []})
597
598
        # Raise because no vtgroup filter attribute
599
        cmd = secET.fromstring('<start_scan '
600
                               'target="localhost" ports="80, 443">'
601
                               '<scanner_params /><vt_selection><vt_group/>'
602
                               '</vt_selection></start_scan>')
603
        self.assertRaises(OSPDError, daemon.handle_start_scan_command, cmd)
604
605
        # No error
606
        response = secET.fromstring(
607
            daemon.handle_command('<start_scan '
608
                                  'target="localhost" ports="80, 443">'
609
                                  '<scanner_params /><vt_selection>'
610
                                  '<vt_group filter="a"/>'
611
                                  '</vt_selection></start_scan>'))
612
        scan_id = response.findtext('id')
613
        time.sleep(0.01)
614
        self.assertEqual(daemon.get_scan_vts(scan_id),
615
                         {'vt_groups': ['a']})
616
617
    def testBillonLaughs(self):
618
        daemon = DummyWrapper([])
619
        lol = '<?xml version="1.0"?>' \
620
              '<!DOCTYPE lolz [' \
621
              ' <!ENTITY lol "lol">' \
622
              ' <!ELEMENT lolz (#PCDATA)>' \
623
              ' <!ENTITY lol1 "&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;">' \
624
              ' <!ENTITY lol2 "&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;">' \
625
              ' <!ENTITY lol3 "&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;">' \
626
              ' <!ENTITY lol4 "&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;">' \
627
              ' <!ENTITY lol5 "&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;">' \
628
              ' <!ENTITY lol6 "&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;">' \
629
              ' <!ENTITY lol7 "&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;">' \
630
              ' <!ENTITY lol8 "&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;">' \
631
              ' <!ENTITY lol9 "&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;">' \
632
              ']>'
633
        self.assertRaises(EntitiesForbidden, daemon.handle_command, lol)
634
635
    def testScanMultiTarget(self):
636
        daemon = DummyWrapper([])
637
        response = secET.fromstring(
638
            daemon.handle_command('<start_scan>'
639
                                  '<scanner_params /><vts><vt id="1.2.3.4" />'
640
                                  '</vts>'
641
                                  '<targets><target>'
642
                                  '<hosts>localhosts</hosts>'
643
                                  '<ports>80,443</ports>'
644
                                  '</target>'
645
                                  '<target><hosts>192.168.0.0/24</hosts>'
646
                                  '<ports>22</ports></target></targets>'
647
                                  '</start_scan>'))
648
        self.assertEqual(response.get('status'), '200')
649
650
    def testMultiTargetWithCredentials(self):
651
        daemon = DummyWrapper([])
652
        response = secET.fromstring(
653
            daemon.handle_command(
654
                '<start_scan>'
655
                '<scanner_params /><vts><vt id="1.2.3.4" />'
656
                '</vts>'
657
                '<targets><target><hosts>localhosts</hosts>'
658
                '<ports>80,443</ports></target><target>'
659
                '<hosts>192.168.0.0/24</hosts><ports>22'
660
                '</ports><credentials>'
661
                '<credential type="up" service="ssh" port="22">'
662
                '<username>scanuser</username>'
663
                '<password>mypass</password>'
664
                '</credential><credential type="up" service="smb">'
665
                '<username>smbuser</username>'
666
                '<password>mypass</password></credential>'
667
                '</credentials>'
668
                '</target></targets>'
669
                '</start_scan>'))
670
671
        self.assertEqual(response.get('status'), '200')
672
        cred_dict = {'ssh': {'type': 'up', 'password':
673
                             'mypass', 'port': '22', 'username': 'scanuser'},
674
                     'smb': {'type': 'up', 'password': 'mypass',
675
                             'username': 'smbuser'}}
676
        scan_id = response.findtext('id')
677
        response = daemon.get_scan_credentials(scan_id, "192.168.0.0/24")
678
        self.assertEqual(response, cred_dict)
679
680
    def testScanGetTarget(self):
681
        daemon = DummyWrapper([])
682
        response = secET.fromstring(
683
            daemon.handle_command('<start_scan>'
684
                                  '<scanner_params /><vts><vt id="1.2.3.4" />'
685
                                  '</vts>'
686
                                  '<targets><target>'
687
                                  '<hosts>localhosts</hosts>'
688
                                  '<ports>80,443</ports>'
689
                                  '</target>'
690
                                  '<target><hosts>192.168.0.0/24</hosts>'
691
                                  '<ports>22</ports></target></targets>'
692
                                  '</start_scan>'))
693
        scan_id = response.findtext('id')
694
        response = secET.fromstring(
695
            daemon.handle_command('<get_scans scan_id="%s"/>' % scan_id))
696
        scan_res = response.find('scan')
697
        self.assertEqual(scan_res.get('target'), 'localhosts,192.168.0.0/24')
698
699
    def testScanMultiTargetParallelWithError(self):
700
        daemon = DummyWrapper([])
701
        cmd = secET.fromstring('<start_scan parallel="100a">'
702
                               '<scanner_params />'
703
                               '<targets><target>'
704
                               '<hosts>localhosts</hosts>'
705
                               '<ports>22</ports>'
706
                               '</target></targets>'
707
                               '</start_scan>')
708
        time.sleep(1)
709
        self.assertRaises(OSPDError, daemon.handle_start_scan_command, cmd)
710
711
    def testScanMultiTargetParallel100(self):
712
        daemon = DummyWrapper([])
713
        response = secET.fromstring(
714
            daemon.handle_command('<start_scan parallel="100">'
715
                                  '<scanner_params />'
716
                                  '<targets><target>'
717
                                  '<hosts>localhosts</hosts>'
718
                                  '<ports>22</ports>'
719
                                  '</target></targets>'
720
                                  '</start_scan>'))
721
        time.sleep(1)
722
        self.assertEqual(response.get('status'), '200')
723
724
    def testProgress(self):
725
        daemon = DummyWrapper([])
726
        response = secET.fromstring(
727
            daemon.handle_command('<start_scan parallel="2">'
728
                                  '<scanner_params />'
729
                                  '<targets><target>'
730
                                  '<hosts>localhost1</hosts>'
731
                                  '<ports>22</ports>'
732
                                  '</target><target>'
733
                                  '<hosts>localhost2</hosts>'
734
                                  '<ports>22</ports>'
735
                                  '</target></targets>'
736
                                  '</start_scan>'))
737
        scan_id = response.findtext('id')
738
        daemon.set_scan_target_progress(scan_id, 'localhost1', 'localhost1', 75)
739
        daemon.set_scan_target_progress(scan_id, 'localhost2', 'localhost2', 25)
740
        self.assertEqual(daemon.calculate_progress(scan_id), 50)
741
742
    def testSetGetVtsVersion(self):
743
        daemon = DummyWrapper([])
744
        daemon.set_vts_version('1234')
745
        version = daemon.get_vts_version()
746
        self.assertEqual('1234', version)
747
748
    def testSetGetVtsVersion_Error(self):
749
        daemon = DummyWrapper([])
750
        self.assertRaises(TypeError, daemon.set_vts_version)
751
752
    def testResumeTask(self):
753
        daemon = DummyWrapper([])
754
        response = secET.fromstring(
755
            daemon.handle_command('<start_scan '
756
                                  'target="localhost" ports="80, 443">'
757
                                  '<scanner_params /></start_scan>'))
758
        scan_id = response.findtext('id')
759
        time.sleep(3)
760
        cmd = secET.fromstring('<stop_scan scan_id="%s" />' % scan_id)
761
        self.assertRaises(OSPDError, daemon.handle_stop_scan_command, cmd)
762
        time.sleep(3)
763
        cmd = '<start_scan scan_id="%s" target="localhost" ports="80, 443"><scanner_params /></start_scan>' % scan_id
764
        response = secET.fromstring(
765
            daemon.handle_command(cmd))
766
        self.assertEqual(response.findtext('id'), scan_id)
767
        self.assertEqual(daemon.scan_collection.get_hosts_unfinished(scan_id), ['localhost'])
768