Completed
Push — master ( 834fd2...202079 )
by Juan José
17s queued 13s
created

ScanTestCase.test_get_default_help()   A

Complexity

Conditions 1

Size

Total Lines 12
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 8
nop 1
dl 0
loc 12
rs 10
c 0
b 0
f 0
1
# Copyright (C) 2015-2018 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: GPL-2.0-or-later
4
#
5
# This program is free software; you can redistribute it and/or
6
# modify it under the terms of the GNU General Public License
7
# as published by the Free Software Foundation; either version 2
8
# of the License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU General Public License for more details.
14
#
15
# You should have received a copy of the GNU General Public License
16
# along with this program; if not, write to the Free Software
17
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
19
# pylint: disable=too-many-lines
20
21
""" Test module for scan runs
22
"""
23
24
import time
25
import unittest
26
27
import xml.etree.ElementTree as ET
28
import defusedxml.lxml as secET
29
30
from defusedxml.common import EntitiesForbidden
31
32
from ospd.ospd import OSPDaemon
33
from ospd.errors import OspdCommandError
34
35
36
class Result(object):
37
    def __init__(self, type_, **kwargs):
38
        self.result_type = type_
39
        self.host = ''
40
        self.hostname = ''
41
        self.name = ''
42
        self.value = ''
43
        self.port = ''
44
        self.test_id = ''
45
        self.severity = ''
46
        self.qod = ''
47
        for name, value in kwargs.items():
48
            setattr(self, name, value)
49
50
51
class DummyWrapper(OSPDaemon):
52
    def __init__(self, results, checkresult=True):
53
        super().__init__()
54
        self.checkresult = checkresult
55
        self.results = results
56
57
    def check(self):
58
        return self.checkresult
59
60
    @staticmethod
61
    def get_custom_vt_as_xml_str(vt_id, custom):
62
        return '<custom><mytest>static test</mytest></custom>'
63
64
    @staticmethod
65
    def get_params_vt_as_xml_str(vt_id, vt_params):
66
        return (
67
            '<params><param id="abc" type="string">'
68
            '<name>ABC</name><description>Test ABC</description>'
69
            '<default>yes</default></param>'
70
            '<param id="def" type="string">'
71
            '<name>DEF</name><description>Test DEF</description>'
72
            '<default>no</default></param></params>'
73
        )
74
75
    @staticmethod
76
    def get_refs_vt_as_xml_str(vt_id, vt_refs):
77
        response = (
78
            '<refs><ref type="cve" id="CVE-2010-4480"/>'
79
            '<ref type="url" id="http://example.com"/></refs>'
80
        )
81
        return response
82
83
    @staticmethod
84
    def get_dependencies_vt_as_xml_str(vt_id, vt_dependencies):
85
        response = (
86
            '<dependencies>'
87
            '<dependency vt_id="1.3.6.1.4.1.25623.1.0.50282" />'
88
            '<dependency vt_id="1.3.6.1.4.1.25623.1.0.50283" />'
89
            '</dependencies>'
90
        )
91
92
        return response
93
94
    @staticmethod
95
    def get_severities_vt_as_xml_str(vt_id, severities):
96
        response = (
97
            '<severities><severity cvss_base="5.0" cvss_'
98
            'type="cvss_base_v2">AV:N/AC:L/Au:N/C:N/I:N/'
99
            'A:P</severity></severities>'
100
        )
101
102
        return response
103
104
    @staticmethod
105
    def get_detection_vt_as_xml_str(
106
        vt_id, detection=None, qod_type=None, qod=None
107
    ):
108
        response = '<detection qod_type="package">some detection</detection>'
109
110
        return response
111
112
    @staticmethod
113
    def get_summary_vt_as_xml_str(vt_id, summary):
114
        response = '<summary>Some summary</summary>'
115
116
        return response
117
118
    @staticmethod
119
    def get_affected_vt_as_xml_str(vt_id, affected):
120
        response = '<affected>Some affected</affected>'
121
122
        return response
123
124
    @staticmethod
125
    def get_impact_vt_as_xml_str(vt_id, impact):
126
        response = '<impact>Some impact</impact>'
127
128
        return response
129
130
    @staticmethod
131
    def get_insight_vt_as_xml_str(vt_id, insight):
132
        response = '<insight>Some insight</insight>'
133
134
        return response
135
136
    @staticmethod
137
    def get_solution_vt_as_xml_str(vt_id, solution, solution_type=None):
138
        response = '<solution>Some solution</solution>'
139
140
        return response
141
142
    @staticmethod
143
    def get_creation_time_vt_as_xml_str(
144
        vt_id, creation_time
145
    ):  # pylint: disable=arguments-differ
146
        response = '<creation_time>%s</creation_time>' % creation_time
147
148
        return response
149
150
    @staticmethod
151
    def get_modification_time_vt_as_xml_str(
152
        vt_id, modification_time
153
    ):  # pylint: disable=arguments-differ
154
        response = (
155
            '<modification_time>%s</modification_time>' % modification_time
156
        )
157
158
        return response
159
160
    def exec_scan(self, scan_id, target):
161
        time.sleep(0.01)
162
        for res in self.results:
163
            if res.result_type == 'log':
164
                self.add_scan_log(
165
                    scan_id,
166
                    res.host or target,
167
                    res.hostname,
168
                    res.name,
169
                    res.value,
170
                    res.port,
171
                )
172
            if res.result_type == 'error':
173
                self.add_scan_error(
174
                    scan_id,
175
                    res.host or target,
176
                    res.hostname,
177
                    res.name,
178
                    res.value,
179
                    res.port,
180
                )
181
            elif res.result_type == 'host-detail':
182
                self.add_scan_host_detail(
183
                    scan_id,
184
                    res.host or target,
185
                    res.hostname,
186
                    res.name,
187
                    res.value,
188
                )
189
            elif res.result_type == 'alarm':
190
                self.add_scan_alarm(
191
                    scan_id,
192
                    res.host or target,
193
                    res.hostname,
194
                    res.name,
195
                    res.value,
196
                    res.port,
197
                    res.test_id,
198
                    res.severity,
199
                    res.qod,
200
                )
201
            else:
202
                raise ValueError(res.result_type)
203
204
205
class ScanTestCase(unittest.TestCase):
206
    def test_get_default_scanner_params(self):
207
        daemon = DummyWrapper([])
208
        response = secET.fromstring(
209
            daemon.handle_command('<get_scanner_details />')
210
        )
211
212
        # The status of the response must be success (i.e. 200)
213
        self.assertEqual(response.get('status'), '200')
214
        # The response root element must have the correct name
215
        self.assertEqual(response.tag, 'get_scanner_details_response')
216
        # The response must contain a 'scanner_params' element
217
        self.assertIsNotNone(response.find('scanner_params'))
218
219
    def test_get_default_help(self):
220
        daemon = DummyWrapper([])
221
        response = secET.fromstring(daemon.handle_command('<help />'))
222
223
        self.assertEqual(response.get('status'), '200')
224
225
        response = secET.fromstring(
226
            daemon.handle_command('<help format="xml" />')
227
        )
228
229
        self.assertEqual(response.get('status'), '200')
230
        self.assertEqual(response.tag, 'help_response')
231
232
    def test_get_default_scanner_version(self):
233
        daemon = DummyWrapper([])
234
        response = secET.fromstring(daemon.handle_command('<get_version />'))
235
236
        self.assertEqual(response.get('status'), '200')
237
        self.assertIsNotNone(response.find('protocol'))
238
239
    def test_get_vts_no_vt(self):
240
        daemon = DummyWrapper([])
241
        response = secET.fromstring(daemon.handle_command('<get_vts />'))
242
243
        self.assertEqual(response.get('status'), '200')
244
        self.assertIsNotNone(response.find('vts'))
245
246
    def test_get_vts_single_vt(self):
247
        daemon = DummyWrapper([])
248
        daemon.add_vt('1.2.3.4', 'A vulnerability test')
249
        response = secET.fromstring(daemon.handle_command('<get_vts />'))
250
251
        self.assertEqual(response.get('status'), '200')
252
253
        vts = response.find('vts')
254
        self.assertIsNotNone(vts.find('vt'))
255
256
        vt = vts.find('vt')
257
        self.assertEqual(vt.get('id'), '1.2.3.4')
258
259 View Code Duplication
    def test_get_vts_filter_positive(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
260
        daemon = DummyWrapper([])
261
        daemon.add_vt(
262
            '1.2.3.4',
263
            'A vulnerability test',
264
            vt_params="a",
265
            vt_modification_time='19000202',
266
        )
267
268
        response = secET.fromstring(
269
            daemon.handle_command(
270
                '<get_vts filter="modification_time&gt;19000201"></get_vts>'
271
            )
272
        )
273
274
        self.assertEqual(response.get('status'), '200')
275
        vts = response.find('vts')
276
277
        vt = vts.find('vt')
278
        self.assertIsNotNone(vt)
279
        self.assertEqual(vt.get('id'), '1.2.3.4')
280
281
        modification_time = response.findall('vts/vt/modification_time')
282
        self.assertEqual(
283
            '<modification_time>19000202</modification_time>',
284
            ET.tostring(modification_time[0]).decode('utf-8'),
285
        )
286
287 View Code Duplication
    def test_get_vts_filter_negative(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
288
        daemon = DummyWrapper([])
289
        daemon.add_vt(
290
            '1.2.3.4',
291
            'A vulnerability test',
292
            vt_params="a",
293
            vt_modification_time='19000202',
294
        )
295
296
        response = secET.fromstring(
297
            daemon.handle_command(
298
                '<get_vts filter="modification_time&lt;19000203"></get_vts>'
299
            )
300
        )
301
        self.assertEqual(response.get('status'), '200')
302
303
        vts = response.find('vts')
304
305
        vt = vts.find('vt')
306
        self.assertIsNotNone(vt)
307
        self.assertEqual(vt.get('id'), '1.2.3.4')
308
309
        modification_time = response.findall('vts/vt/modification_time')
310
        self.assertEqual(
311
            '<modification_time>19000202</modification_time>',
312
            ET.tostring(modification_time[0]).decode('utf-8'),
313
        )
314
315
    def test_get_vtss_multiple_vts(self):
316
        daemon = DummyWrapper([])
317
        daemon.add_vt('1.2.3.4', 'A vulnerability test')
318
        daemon.add_vt('1.2.3.5', 'Another vulnerability test')
319
        daemon.add_vt('123456789', 'Yet another vulnerability test')
320
321
        response = secET.fromstring(daemon.handle_command('<get_vts />'))
322
323
        self.assertEqual(response.get('status'), '200')
324
325
        vts = response.find('vts')
326
        self.assertIsNotNone(vts.find('vt'))
327
328
    def test_get_vts_multiple_vts_with_custom(self):
329
        daemon = DummyWrapper([])
330
        daemon.add_vt('1.2.3.4', 'A vulnerability test', custom='b')
331
        daemon.add_vt(
332
            '4.3.2.1', 'Another vulnerability test with custom info', custom='b'
333
        )
334
        daemon.add_vt('123456789', 'Yet another vulnerability test', custom='b')
335
336
        response = secET.fromstring(daemon.handle_command('<get_vts />'))
337
        custom = response.findall('vts/vt/custom')
338
339
        self.assertEqual(3, len(custom))
340
341 View Code Duplication
    def test_get_vts_vts_with_params(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
342
        daemon = DummyWrapper([])
343
        daemon.add_vt(
344
            '1.2.3.4', 'A vulnerability test', vt_params="a", custom="b"
345
        )
346
347
        response = secET.fromstring(
348
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
349
        )
350
        # The status of the response must be success (i.e. 200)
351
        self.assertEqual(response.get('status'), '200')
352
353
        # The response root element must have the correct name
354
        self.assertEqual(response.tag, 'get_vts_response')
355
        # The response must contain a 'scanner_params' element
356
        self.assertIsNotNone(response.find('vts'))
357
358
        vt_params = response[0][0].findall('params')
359
        self.assertEqual(1, len(vt_params))
360
361
        custom = response[0][0].findall('custom')
362
        self.assertEqual(1, len(custom))
363
364
        params = response.findall('vts/vt/params/param')
365
        self.assertEqual(2, len(params))
366
367 View Code Duplication
    def test_get_vts_vts_with_refs(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
368
        daemon = DummyWrapper([])
369
        daemon.add_vt(
370
            '1.2.3.4',
371
            'A vulnerability test',
372
            vt_params="a",
373
            custom="b",
374
            vt_refs="c",
375
        )
376
377
        response = secET.fromstring(
378
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
379
        )
380
        # The status of the response must be success (i.e. 200)
381
        self.assertEqual(response.get('status'), '200')
382
383
        # The response root element must have the correct name
384
        self.assertEqual(response.tag, 'get_vts_response')
385
386
        # The response must contain a 'vts' element
387
        self.assertIsNotNone(response.find('vts'))
388
389
        vt_params = response[0][0].findall('params')
390
        self.assertEqual(1, len(vt_params))
391
392
        custom = response[0][0].findall('custom')
393
        self.assertEqual(1, len(custom))
394
395
        refs = response.findall('vts/vt/refs/ref')
396
        self.assertEqual(2, len(refs))
397
398
    def test_get_vts_vts_with_dependencies(self):
399
        daemon = DummyWrapper([])
400
        daemon.add_vt(
401
            '1.2.3.4',
402
            'A vulnerability test',
403
            vt_params="a",
404
            custom="b",
405
            vt_dependencies="c",
406
        )
407
408
        response = secET.fromstring(
409
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
410
        )
411
412
        deps = response.findall('vts/vt/dependencies/dependency')
413
        self.assertEqual(2, len(deps))
414
415
    def test_get_vts_vts_with_severities(self):
416
        daemon = DummyWrapper([])
417
        daemon.add_vt(
418
            '1.2.3.4',
419
            'A vulnerability test',
420
            vt_params="a",
421
            custom="b",
422
            severities="c",
423
        )
424
425
        response = secET.fromstring(
426
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
427
        )
428
429
        severity = response.findall('vts/vt/severities/severity')
430
        self.assertEqual(1, len(severity))
431
432
    def test_get_vts_vts_with_detection_qodt(self):
433
        daemon = DummyWrapper([])
434
        daemon.add_vt(
435
            '1.2.3.4',
436
            'A vulnerability test',
437
            vt_params="a",
438
            custom="b",
439
            detection="c",
440
            qod_t="d",
441
        )
442
443
        response = secET.fromstring(
444
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
445
        )
446
447
        detection = response.findall('vts/vt/detection')
448
        self.assertEqual(1, len(detection))
449
450
    def test_get_vts_vts_with_detection_qodv(self):
451
        daemon = DummyWrapper([])
452
        daemon.add_vt(
453
            '1.2.3.4',
454
            'A vulnerability test',
455
            vt_params="a",
456
            custom="b",
457
            detection="c",
458
            qod_v="d",
459
        )
460
461
        response = secET.fromstring(
462
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
463
        )
464
465
        detection = response.findall('vts/vt/detection')
466
        self.assertEqual(1, len(detection))
467
468
    def test_get_vts_vts_with_summary(self):
469
        daemon = DummyWrapper([])
470
        daemon.add_vt(
471
            '1.2.3.4',
472
            'A vulnerability test',
473
            vt_params="a",
474
            custom="b",
475
            summary="c",
476
        )
477
478
        response = secET.fromstring(
479
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
480
        )
481
482
        summary = response.findall('vts/vt/summary')
483
        self.assertEqual(1, len(summary))
484
485
    def test_get_vts_vts_with_impact(self):
486
        daemon = DummyWrapper([])
487
        daemon.add_vt(
488
            '1.2.3.4',
489
            'A vulnerability test',
490
            vt_params="a",
491
            custom="b",
492
            impact="c",
493
        )
494
495
        response = secET.fromstring(
496
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
497
        )
498
499
        impact = response.findall('vts/vt/impact')
500
        self.assertEqual(1, len(impact))
501
502
    def test_get_vts_vts_with_affected(self):
503
        daemon = DummyWrapper([])
504
        daemon.add_vt(
505
            '1.2.3.4',
506
            'A vulnerability test',
507
            vt_params="a",
508
            custom="b",
509
            affected="c",
510
        )
511
512
        response = secET.fromstring(
513
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
514
        )
515
516
        affect = response.findall('vts/vt/affected')
517
        self.assertEqual(1, len(affect))
518
519
    def test_get_vts_vts_with_insight(self):
520
        daemon = DummyWrapper([])
521
        daemon.add_vt(
522
            '1.2.3.4',
523
            'A vulnerability test',
524
            vt_params="a",
525
            custom="b",
526
            insight="c",
527
        )
528
529
        response = secET.fromstring(
530
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
531
        )
532
533
        insight = response.findall('vts/vt/insight')
534
        self.assertEqual(1, len(insight))
535
536
    def test_get_vts_vts_with_solution(self):
537
        daemon = DummyWrapper([])
538
        daemon.add_vt(
539
            '1.2.3.4',
540
            'A vulnerability test',
541
            vt_params="a",
542
            custom="b",
543
            solution="c",
544
            solution_t="d",
545
        )
546
547
        response = secET.fromstring(
548
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
549
        )
550
551
        solution = response.findall('vts/vt/solution')
552
        self.assertEqual(1, len(solution))
553
554
    def test_get_vts_vts_with_ctime(self):
555
        daemon = DummyWrapper([])
556
        daemon.add_vt(
557
            '1.2.3.4',
558
            'A vulnerability test',
559
            vt_params="a",
560
            vt_creation_time='01-01-1900',
561
        )
562
563
        response = secET.fromstring(
564
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
565
        )
566
567
        creation_time = response.findall('vts/vt/creation_time')
568
        self.assertEqual(
569
            '<creation_time>01-01-1900</creation_time>',
570
            ET.tostring(creation_time[0]).decode('utf-8'),
571
        )
572
573
    def test_get_vts_vts_with_mtime(self):
574
        daemon = DummyWrapper([])
575
        daemon.add_vt(
576
            '1.2.3.4',
577
            'A vulnerability test',
578
            vt_params="a",
579
            vt_modification_time='02-01-1900',
580
        )
581
582
        response = secET.fromstring(
583
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
584
        )
585
586
        modification_time = response.findall('vts/vt/modification_time')
587
        self.assertEqual(
588
            '<modification_time>02-01-1900</modification_time>',
589
            ET.tostring(modification_time[0]).decode('utf-8'),
590
        )
591
592
    def test_scan_with_error(self):
593
        daemon = DummyWrapper([Result('error', value='something went wrong')])
594
595
        response = secET.fromstring(
596
            daemon.handle_command(
597
                '<start_scan target="localhost" ports="80, '
598
                '443"><scanner_params /></start_scan>'
599
            )
600
        )
601
        scan_id = response.findtext('id')
602
603
        finished = False
604
        while not finished:
605
            response = secET.fromstring(
606
                daemon.handle_command(
607
                    '<get_scans scan_id="%s" details="1"/>' % scan_id
608
                )
609
            )
610
            scans = response.findall('scan')
611
            self.assertEqual(1, len(scans))
612
613
            scan = scans[0]
614
            status = scan.get('status')
615
616
            if status == "init" or status == "running":
617
                self.assertEqual('0', scan.get('end_time'))
618
                time.sleep(0.010)
619
            else:
620
                finished = True
621
622
        response = secET.fromstring(
623
            daemon.handle_command(
624
                '<get_scans scan_id="%s" details="1"/>' % scan_id
625
            )
626
        )
627
628
        self.assertEqual(
629
            response.findtext('scan/results/result'), 'something went wrong'
630
        )
631
632
        response = secET.fromstring(
633
            daemon.handle_command('<delete_scan scan_id="%s" />' % scan_id)
634
        )
635
636
        self.assertEqual(response.get('status'), '200')
637
638
    def test_get_scan_pop(self):
639
        daemon = DummyWrapper([Result('host-detail', value='Some Host Detail')])
640
641
        response = secET.fromstring(
642
            daemon.handle_command(
643
                '<start_scan target="localhost" ports="80, 443">'
644
                '<scanner_params /></start_scan>'
645
            )
646
        )
647
648
        scan_id = response.findtext('id')
649
        time.sleep(1)
650
651
        response = secET.fromstring(
652
            daemon.handle_command('<get_scans scan_id="%s"/>' % scan_id)
653
        )
654
        self.assertEqual(
655
            response.findtext('scan/results/result'), 'Some Host Detail'
656
        )
657
658
        response = secET.fromstring(
659
            daemon.handle_command(
660
                '<get_scans scan_id="%s" pop_results="1"/>' % scan_id
661
            )
662
        )
663
        self.assertEqual(
664
            response.findtext('scan/results/result'), 'Some Host Detail'
665
        )
666
667
        response = secET.fromstring(
668
            daemon.handle_command('<get_scans details="0" pop_results="1"/>')
669
        )
670
        self.assertEqual(response.findtext('scan/results/result'), None)
671
672
    def test_stop_scan(self):
673
        daemon = DummyWrapper([])
674
        response = secET.fromstring(
675
            daemon.handle_command(
676
                '<start_scan '
677
                'target="localhost" ports="80, 443">'
678
                '<scanner_params /></start_scan>'
679
            )
680
        )
681
        scan_id = response.findtext('id')
682
683
        # Depending on the sistem this test can end with a race condition
684
        # because the scanner is already stopped when the <stop_scan>
685
        # command is run.
686
        time.sleep(3)
687
688
        cmd = secET.fromstring('<stop_scan scan_id="%s" />' % scan_id)
689
        self.assertRaises(
690
            OspdCommandError, daemon.handle_stop_scan_command, cmd
691
        )
692
693
        cmd = secET.fromstring('<stop_scan />')
694
        self.assertRaises(
695
            OspdCommandError, daemon.handle_stop_scan_command, cmd
696
        )
697
698 View Code Duplication
    def test_scan_with_vts(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
699
        daemon = DummyWrapper([])
700
        cmd = secET.fromstring(
701
            '<start_scan '
702
            'target="localhost" ports="80, 443">'
703
            '<scanner_params /><vt_selection />'
704
            '</start_scan>'
705
        )
706
707
        with self.assertRaises(OspdCommandError):
708
            daemon.handle_start_scan_command(cmd)
709
710
        # With one vt, without params
711
        response = secET.fromstring(
712
            daemon.handle_command(
713
                '<start_scan '
714
                'target="localhost" ports="80, 443">'
715
                '<scanner_params /><vt_selection>'
716
                '<vt_single id="1.2.3.4" />'
717
                '</vt_selection></start_scan>'
718
            )
719
        )
720
        scan_id = response.findtext('id')
721
        time.sleep(0.01)
722
723
        self.assertEqual(
724
            daemon.get_scan_vts(scan_id), {'1.2.3.4': {}, 'vt_groups': []}
725
        )
726
        self.assertNotEqual(daemon.get_scan_vts(scan_id), {'1.2.3.6': {}})
727
728
        # With out vtS
729
        response = secET.fromstring(
730
            daemon.handle_command(
731
                '<start_scan '
732
                'target="localhost" ports="80, 443">'
733
                '<scanner_params /></start_scan>'
734
            )
735
        )
736
737
        scan_id = response.findtext('id')
738
        time.sleep(0.01)
739
        self.assertEqual(daemon.get_scan_vts(scan_id), {})
740
741 View Code Duplication
    def test_scan_with_vts_and_param(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
742
        daemon = DummyWrapper([])
743
744
        # Raise because no vt_param id attribute
745
        cmd = secET.fromstring(
746
            '<start_scan '
747
            'target="localhost" ports="80, 443">'
748
            '<scanner_params /><vt_selection><vt_si'
749
            'ngle id="1234"><vt_value>200</vt_value>'
750
            '</vt_single></vt_selection></start_scan>'
751
        )
752
753
        with self.assertRaises(OspdCommandError):
754
            daemon.handle_start_scan_command(cmd)
755
756
        # No error
757
        response = secET.fromstring(
758
            daemon.handle_command(
759
                '<start_scan '
760
                'target="localhost" ports="80, 443">'
761
                '<scanner_params /><vt_selection><vt'
762
                '_single id="1234"><vt_value id="ABC">200'
763
                '</vt_value></vt_single></vt_selection>'
764
                '</start_scan>'
765
            )
766
        )
767
        scan_id = response.findtext('id')
768
        time.sleep(0.01)
769
        self.assertEqual(
770
            daemon.get_scan_vts(scan_id),
771
            {'1234': {'ABC': '200'}, 'vt_groups': []},
772
        )
773
774
        # Raise because no vtgroup filter attribute
775
        cmd = secET.fromstring(
776
            '<start_scan '
777
            'target="localhost" ports="80, 443">'
778
            '<scanner_params /><vt_selection><vt_group/>'
779
            '</vt_selection></start_scan>'
780
        )
781
        self.assertRaises(
782
            OspdCommandError, daemon.handle_start_scan_command, cmd
783
        )
784
785
        # No error
786
        response = secET.fromstring(
787
            daemon.handle_command(
788
                '<start_scan '
789
                'target="localhost" ports="80, 443">'
790
                '<scanner_params /><vt_selection>'
791
                '<vt_group filter="a"/>'
792
                '</vt_selection></start_scan>'
793
            )
794
        )
795
        scan_id = response.findtext('id')
796
        time.sleep(0.01)
797
        self.assertEqual(daemon.get_scan_vts(scan_id), {'vt_groups': ['a']})
798
799
    def test_billon_laughs(self):
800
        # pylint: disable=line-too-long
801
        daemon = DummyWrapper([])
802
        lol = (
803
            '<?xml version="1.0"?>'
804
            '<!DOCTYPE lolz ['
805
            ' <!ENTITY lol "lol">'
806
            ' <!ELEMENT lolz (#PCDATA)>'
807
            ' <!ENTITY lol1 "&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;">'
808
            ' <!ENTITY lol2 "&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;">'
809
            ' <!ENTITY lol3 "&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;">'
810
            ' <!ENTITY lol4 "&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;">'
811
            ' <!ENTITY lol5 "&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;">'
812
            ' <!ENTITY lol6 "&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;">'
813
            ' <!ENTITY lol7 "&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;">'
814
            ' <!ENTITY lol8 "&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;">'
815
            ' <!ENTITY lol9 "&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;">'
816
            ']>'
817
        )
818
        self.assertRaises(EntitiesForbidden, daemon.handle_command, lol)
819
820
    def test_scan_multi_target(self):
821
        daemon = DummyWrapper([])
822
        response = secET.fromstring(
823
            daemon.handle_command(
824
                '<start_scan>'
825
                '<scanner_params /><vts><vt id="1.2.3.4" />'
826
                '</vts>'
827
                '<targets><target>'
828
                '<hosts>localhosts</hosts>'
829
                '<ports>80,443</ports>'
830
                '</target>'
831
                '<target><hosts>192.168.0.0/24</hosts>'
832
                '<ports>22</ports></target></targets>'
833
                '</start_scan>'
834
            )
835
        )
836
        self.assertEqual(response.get('status'), '200')
837
838
    def test_multi_target_with_credentials(self):
839
        daemon = DummyWrapper([])
840
        response = secET.fromstring(
841
            daemon.handle_command(
842
                '<start_scan>'
843
                '<scanner_params /><vts><vt id="1.2.3.4" />'
844
                '</vts>'
845
                '<targets><target><hosts>localhosts</hosts>'
846
                '<ports>80,443</ports></target><target>'
847
                '<hosts>192.168.0.0/24</hosts><ports>22'
848
                '</ports><credentials>'
849
                '<credential type="up" service="ssh" port="22">'
850
                '<username>scanuser</username>'
851
                '<password>mypass</password>'
852
                '</credential><credential type="up" service="smb">'
853
                '<username>smbuser</username>'
854
                '<password>mypass</password></credential>'
855
                '</credentials>'
856
                '</target></targets>'
857
                '</start_scan>'
858
            )
859
        )
860
861
        self.assertEqual(response.get('status'), '200')
862
863
        cred_dict = {
864
            'ssh': {
865
                'type': 'up',
866
                'password': 'mypass',
867
                'port': '22',
868
                'username': 'scanuser',
869
            },
870
            'smb': {'type': 'up', 'password': 'mypass', 'username': 'smbuser'},
871
        }
872
        scan_id = response.findtext('id')
873
        response = daemon.get_scan_credentials(scan_id, "192.168.0.0/24")
874
        self.assertEqual(response, cred_dict)
875
876
    def test_scan_get_target(self):
877
        daemon = DummyWrapper([])
878
        response = secET.fromstring(
879
            daemon.handle_command(
880
                '<start_scan>'
881
                '<scanner_params /><vts><vt id="1.2.3.4" />'
882
                '</vts>'
883
                '<targets><target>'
884
                '<hosts>localhosts</hosts>'
885
                '<ports>80,443</ports>'
886
                '</target>'
887
                '<target><hosts>192.168.0.0/24</hosts>'
888
                '<ports>22</ports></target></targets>'
889
                '</start_scan>'
890
            )
891
        )
892
        scan_id = response.findtext('id')
893
        response = secET.fromstring(
894
            daemon.handle_command('<get_scans scan_id="%s"/>' % scan_id)
895
        )
896
        scan_res = response.find('scan')
897
        self.assertEqual(scan_res.get('target'), 'localhosts,192.168.0.0/24')
898
899
    def test_scan_get_exclude_hosts(self):
900
        daemon = DummyWrapper([])
901
        response = secET.fromstring(
902
            daemon.handle_command(
903
                '<start_scan>'
904
                '<scanner_params /><vts><vt id="1.2.3.4" />'
905
                '</vts>'
906
                '<targets><target>'
907
                '<hosts>192.168.10.20-25</hosts>'
908
                '<ports>80,443</ports>'
909
                '<exclude_hosts>192.168.10.23-24'
910
                '</exclude_hosts>'
911
                '</target>'
912
                '<target><hosts>192.168.0.0/24</hosts>'
913
                '<ports>22</ports></target>'
914
                '</targets>'
915
                '</start_scan>'
916
            )
917
        )
918
        scan_id = response.findtext('id')
919
        time.sleep(1)
920
        finished = daemon.get_scan_finished_hosts(scan_id)
921
        self.assertEqual(finished, ['192.168.10.23', '192.168.10.24'])
922
923
    def test_scan_multi_target_parallel_with_error(self):
924
        daemon = DummyWrapper([])
925
        cmd = secET.fromstring(
926
            '<start_scan parallel="100a">'
927
            '<scanner_params />'
928
            '<targets><target>'
929
            '<hosts>localhosts</hosts>'
930
            '<ports>22</ports>'
931
            '</target></targets>'
932
            '</start_scan>'
933
        )
934
        time.sleep(1)
935
        self.assertRaises(
936
            OspdCommandError, daemon.handle_start_scan_command, cmd
937
        )
938
939
    def test_scan_multi_target_parallel_100(self):
940
        daemon = DummyWrapper([])
941
        response = secET.fromstring(
942
            daemon.handle_command(
943
                '<start_scan parallel="100">'
944
                '<scanner_params />'
945
                '<targets><target>'
946
                '<hosts>localhosts</hosts>'
947
                '<ports>22</ports>'
948
                '</target></targets>'
949
                '</start_scan>'
950
            )
951
        )
952
        time.sleep(1)
953
        self.assertEqual(response.get('status'), '200')
954
955
    def test_progress(self):
956
        daemon = DummyWrapper([])
957
958
        response = secET.fromstring(
959
            daemon.handle_command(
960
                '<start_scan parallel="2">'
961
                '<scanner_params />'
962
                '<targets><target>'
963
                '<hosts>localhost1</hosts>'
964
                '<ports>22</ports>'
965
                '</target><target>'
966
                '<hosts>localhost2</hosts>'
967
                '<ports>22</ports>'
968
                '</target></targets>'
969
                '</start_scan>'
970
            )
971
        )
972
973
        scan_id = response.findtext('id')
974
975
        daemon.set_scan_target_progress(scan_id, 'localhost1', 'localhost1', 75)
976
        daemon.set_scan_target_progress(scan_id, 'localhost2', 'localhost2', 25)
977
978
        self.assertEqual(daemon.calculate_progress(scan_id), 50)
979
980
    def test_set_get_vts_version(self):
981
        daemon = DummyWrapper([])
982
        daemon.set_vts_version('1234')
983
984
        version = daemon.get_vts_version()
985
        self.assertEqual('1234', version)
986
987
    def test_set_get_vts_version_error(self):
988
        daemon = DummyWrapper([])
989
        self.assertRaises(TypeError, daemon.set_vts_version)
990
991
    def test_resume_task(self):
992
        daemon = DummyWrapper(
993
            [
994
                Result(
995
                    'host-detail', host='localhost', value='Some Host Detail'
996
                ),
997
                Result(
998
                    'host-detail', host='localhost', value='Some Host Detail2'
999
                ),
1000
            ]
1001
        )
1002
1003
        response = secET.fromstring(
1004
            daemon.handle_command(
1005
                '<start_scan parallel="2">'
1006
                '<scanner_params />'
1007
                '<targets><target>'
1008
                '<hosts>localhost</hosts>'
1009
                '<ports>22</ports>'
1010
                '</target></targets>'
1011
                '</start_scan>'
1012
            )
1013
        )
1014
        scan_id = response.findtext('id')
1015
1016
        time.sleep(3)
1017
        cmd = secET.fromstring('<stop_scan scan_id="%s" />' % scan_id)
1018
1019
        with self.assertRaises(OspdCommandError):
1020
            daemon.handle_stop_scan_command(cmd)
1021
1022
        response = secET.fromstring(
1023
            daemon.handle_command(
1024
                '<get_scans scan_id="%s" details="1"/>' % scan_id
1025
            )
1026
        )
1027
1028
        result = response.findall('scan/results/result')
1029
        self.assertEqual(len(result), 2)
1030
1031
        # Resume the task
1032
        cmd = (
1033
            '<start_scan scan_id="%s" target="localhost" ports="80, 443">'
1034
            '<scanner_params /></start_scan>' % scan_id
1035
        )
1036
        response = secET.fromstring(daemon.handle_command(cmd))
1037
1038
        # Check unfinished host
1039
        self.assertEqual(response.findtext('id'), scan_id)
1040
        self.assertEqual(
1041
            daemon.get_scan_unfinished_hosts(scan_id), ['localhost']
1042
        )
1043
1044
        # Finished the host and check unfinished again.
1045
        daemon.set_scan_host_finished(scan_id, "localhost", "localhost")
1046
        self.assertEqual(daemon.get_scan_unfinished_hosts(scan_id), [])
1047
1048
        # Check finished hosts
1049
        self.assertEqual(
1050
            daemon.scan_collection.get_hosts_finished(scan_id), ['localhost']
1051
        )
1052
1053
        # Check if the result was removed.
1054
        response = secET.fromstring(
1055
            daemon.handle_command(
1056
                '<get_scans scan_id="%s" details="1"/>' % scan_id
1057
            )
1058
        )
1059
        result = response.findall('scan/results/result')
1060
        self.assertEqual(len(result), 0)
1061
1062
    def test_result_order (self):
1063
        daemon = DummyWrapper([])
1064
        response = secET.fromstring(
1065
            daemon.handle_command(
1066
                '<start_scan parallel="1">'
1067
                '<scanner_params />'
1068
                '<targets><target>'
1069
                '<hosts>a</hosts>'
1070
                '<ports>22</ports>'
1071
                '</target></targets>'
1072
                '</start_scan>'
1073
            )
1074
        )
1075
1076
        scan_id = response.findtext('id')
1077
1078
        daemon.add_scan_log(scan_id, host='a', name='a')
1079
        daemon.add_scan_log(scan_id, host='c', name='c')
1080
        daemon.add_scan_log(scan_id, host='b', name='b')
1081
        hosts = ['a','c','b']
1082
        response = secET.fromstring(
1083
            daemon.handle_command('<get_scans details="1"/>'
1084
            )
1085
        )
1086
        results = response.findall("scan/results/")
1087
1088
        for idx, res in enumerate(results):
1089
            att_dict = res.attrib
1090
            self.assertEqual(hosts[idx], att_dict['name'])
1091