Completed
Push — master ( 1d2395...fedb21 )
by Juan José
14s queued 10s
created

tests.test_scan_and_result.Result.__init__()   A

Complexity

Conditions 2

Size

Total Lines 12
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 12
nop 3
dl 0
loc 12
rs 9.8
c 0
b 0
f 0
1
# Copyright (C) 2015-2018 Greenbone Networks GmbH
#
# SPDX-License-Identifier: GPL-2.0-or-later
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
19
# pylint: disable=too-many-lines
20
21
""" Test module for scan runs
"""
23
24
import time
25
import unittest
26
27
import xml.etree.ElementTree as ET
28
import defusedxml.lxml as secET
29
30
from defusedxml.common import EntitiesForbidden
31
32
from ospd.ospd import OSPDaemon
33
from ospd.errors import OspdCommandError
34
35
36
class Result:
    """Plain attribute holder describing one result to be replayed in a scan.

    All known fields default to the empty string; keyword arguments passed
    to the constructor override (or add) attributes of the same name.
    """

    def __init__(self, type_, **kwargs):
        """Store the result type and apply any keyword overrides.

        Args:
            type_: Kind of result, stored as ``result_type``.
            **kwargs: Attribute name/value pairs set on the instance after
                the defaults, so they take precedence over them.
        """
        self.result_type = type_
        self.host = ''
        self.hostname = ''
        self.name = ''
        self.value = ''
        self.port = ''
        self.test_id = ''
        self.severity = ''
        self.qod = ''
        for name, value in kwargs.items():
            setattr(self, name, value)
49
50
51
class DummyWrapper(OSPDaemon):
    """OSPDaemon subclass for tests that replays a fixed list of results.

    The ``get_*_vt_as_xml_str`` hooks ignore their arguments and return
    canned XML fragments, except for the creation/modification time hooks,
    which embed the value they are given.
    """

    def __init__(self, results, checkresult=True):
        """Create the wrapper.

        Args:
            results: Iterable of ``Result``-like objects that ``exec_scan``
                replays.
            checkresult: Value returned by ``check``.
        """
        super().__init__('cert', 'key', 'ca')
        self.checkresult = checkresult
        self.results = results

    def check(self):
        """Return the ``checkresult`` value given at construction time."""
        return self.checkresult

    @staticmethod
    def get_custom_vt_as_xml_str(vt_id, custom):
        """Return a fixed ``<custom>`` fragment; arguments are ignored."""
        return '<custom><mytest>static test</mytest></custom>'

    @staticmethod
    def get_params_vt_as_xml_str(vt_id, vt_params):
        """Return a fixed ``<params>`` fragment holding two ``<param>``s."""
        return (
            '<params><param id="abc" type="string">'
            '<name>ABC</name><description>Test ABC</description>'
            '<default>yes</default></param>'
            '<param id="def" type="string">'
            '<name>DEF</name><description>Test DEF</description>'
            '<default>no</default></param></params>'
        )

    @staticmethod
    def get_refs_vt_as_xml_str(vt_id, vt_refs):
        """Return a fixed ``<refs>`` fragment holding two ``<ref>``s."""
        return (
            '<refs><ref type="cve" id="CVE-2010-4480"/>'
            '<ref type="url" id="http://example.com"/></refs>'
        )

    @staticmethod
    def get_dependencies_vt_as_xml_str(vt_id, vt_dependencies):
        """Return a fixed ``<dependencies>`` fragment with two entries."""
        return (
            '<dependencies>'
            '<dependency vt_id="1.3.6.1.4.1.25623.1.0.50282" />'
            '<dependency vt_id="1.3.6.1.4.1.25623.1.0.50283" />'
            '</dependencies>'
        )

    @staticmethod
    def get_severities_vt_as_xml_str(vt_id, severities):
        """Return a fixed ``<severities>`` fragment with one ``<severity>``."""
        return (
            '<severities><severity cvss_base="5.0" cvss_'
            'type="cvss_base_v2">AV:N/AC:L/Au:N/C:N/I:N/'
            'A:P</severity></severities>'
        )

    @staticmethod
    def get_detection_vt_as_xml_str(
        vt_id, detection=None, qod_type=None, qod=None
    ):
        """Return a fixed ``<detection>`` fragment; arguments are ignored."""
        return '<detection qod_type="package">some detection</detection>'

    @staticmethod
    def get_summary_vt_as_xml_str(vt_id, summary):
        """Return a fixed ``<summary>`` fragment; arguments are ignored."""
        return '<summary>Some summary</summary>'

    @staticmethod
    def get_affected_vt_as_xml_str(vt_id, affected):
        """Return a fixed ``<affected>`` fragment; arguments are ignored."""
        return '<affected>Some affected</affected>'

    @staticmethod
    def get_impact_vt_as_xml_str(vt_id, impact):
        """Return a fixed ``<impact>`` fragment; arguments are ignored."""
        return '<impact>Some impact</impact>'

    @staticmethod
    def get_insight_vt_as_xml_str(vt_id, insight):
        """Return a fixed ``<insight>`` fragment; arguments are ignored."""
        return '<insight>Some insight</insight>'

    @staticmethod
    def get_solution_vt_as_xml_str(vt_id, solution, solution_type=None):
        """Return a fixed ``<solution>`` fragment; arguments are ignored."""
        return '<solution>Some solution</solution>'

    @staticmethod
    def get_creation_time_vt_as_xml_str(
        vt_id, creation_time
    ):  # pylint: disable=arguments-differ
        """Return ``creation_time`` wrapped in a ``<creation_time>`` tag."""
        return '<creation_time>%s</creation_time>' % creation_time

    @staticmethod
    def get_modification_time_vt_as_xml_str(
        vt_id, modification_time
    ):  # pylint: disable=arguments-differ
        """Return ``modification_time`` wrapped in its XML tag."""
        return '<modification_time>%s</modification_time>' % modification_time

    def exec_scan(self, scan_id, target):
        """Replay ``self.results`` into the scan identified by ``scan_id``.

        Each result is dispatched on its ``result_type`` to the matching
        ``add_scan_*`` method; ``target`` is used as the host whenever the
        result's own ``host`` is empty.

        Raises:
            ValueError: If a result has an unknown ``result_type``.
        """
        time.sleep(0.01)
        for res in self.results:
            host = res.host or target
            # A single if/elif chain: previously the 'error' test started a
            # second chain, so every 'log' result fell through to the final
            # ``else`` and raised ValueError after being logged.
            if res.result_type == 'log':
                self.add_scan_log(
                    scan_id, host, res.hostname, res.name, res.value, res.port
                )
            elif res.result_type == 'error':
                self.add_scan_error(
                    scan_id, host, res.hostname, res.name, res.value, res.port
                )
            elif res.result_type == 'host-detail':
                self.add_scan_host_detail(
                    scan_id, host, res.hostname, res.name, res.value
                )
            elif res.result_type == 'alarm':
                self.add_scan_alarm(
                    scan_id,
                    host,
                    res.hostname,
                    res.name,
                    res.value,
                    res.port,
                    res.test_id,
                    res.severity,
                    res.qod,
                )
            else:
                raise ValueError(res.result_type)
203
204
205
class ScanTestCase(unittest.TestCase):
206
    def test_get_default_scanner_params(self):
        """<get_scanner_details/> succeeds and contains scanner_params."""
        daemon = DummyWrapper([])
        response = secET.fromstring(
            daemon.handle_command('<get_scanner_details />')
        )

        # The status of the response must be success (i.e. 200)
        self.assertEqual(response.get('status'), '200')
        # The response root element must have the correct name
        self.assertEqual(response.tag, 'get_scanner_details_response')
        # The response must contain a 'scanner_params' element
        self.assertIsNotNone(response.find('scanner_params'))
218
219
    def test_get_default_help(self):
        """<help/> succeeds both in its default form and with format="xml"."""
        daemon = DummyWrapper([])

        response = secET.fromstring(daemon.handle_command('<help />'))
        self.assertEqual(response.get('status'), '200')

        response = secET.fromstring(
            daemon.handle_command('<help format="xml" />')
        )
        self.assertEqual(response.get('status'), '200')
        self.assertEqual(response.tag, 'help_response')
231
232
    def test_get_default_scanner_version(self):
        """<get_version/> succeeds and contains a 'protocol' element."""
        daemon = DummyWrapper([])
        response = secET.fromstring(daemon.handle_command('<get_version />'))

        self.assertEqual(response.get('status'), '200')
        self.assertIsNotNone(response.find('protocol'))
238
239
    def test_get_vts_no_vt(self):
        """<get_vts/> succeeds and has a 'vts' element with no VT added."""
        daemon = DummyWrapper([])
        response = secET.fromstring(daemon.handle_command('<get_vts />'))

        self.assertEqual(response.get('status'), '200')
        self.assertIsNotNone(response.find('vts'))
245
246
    def test_get_vts_single_vt(self):
        """<get_vts/> lists the single VT that was added, with its id."""
        daemon = DummyWrapper([])
        daemon.add_vt('1.2.3.4', 'A vulnerability test')
        response = secET.fromstring(daemon.handle_command('<get_vts />'))

        self.assertEqual(response.get('status'), '200')

        # Look the element up once and reuse it for both assertions.
        vt = response.find('vts').find('vt')
        self.assertIsNotNone(vt)
        self.assertEqual(vt.get('id'), '1.2.3.4')
258
259 View Code Duplication
    def test_get_vts_filter_positive(self):
        """A 'modification_time>19000201' filter returns the 19000202 VT."""
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            vt_modification_time='19000202',
        )

        response = secET.fromstring(
            daemon.handle_command(
                '<get_vts filter="modification_time&gt;19000201"></get_vts>'
            )
        )

        self.assertEqual(response.get('status'), '200')

        vt = response.find('vts').find('vt')
        self.assertIsNotNone(vt)
        self.assertEqual(vt.get('id'), '1.2.3.4')

        modification_time = response.findall('vts/vt/modification_time')
        self.assertEqual(
            '<modification_time>19000202</modification_time>',
            ET.tostring(modification_time[0]).decode('utf-8'),
        )
286
287 View Code Duplication
    def test_get_vts_filter_negative(self):
        """A 'modification_time<19000203' filter returns the 19000202 VT."""
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            vt_modification_time='19000202',
        )

        response = secET.fromstring(
            daemon.handle_command(
                '<get_vts filter="modification_time&lt;19000203"></get_vts>'
            )
        )

        self.assertEqual(response.get('status'), '200')

        vt = response.find('vts').find('vt')
        self.assertIsNotNone(vt)
        self.assertEqual(vt.get('id'), '1.2.3.4')

        modification_time = response.findall('vts/vt/modification_time')
        self.assertEqual(
            '<modification_time>19000202</modification_time>',
            ET.tostring(modification_time[0]).decode('utf-8'),
        )
314
315
    def test_get_vtss_multiple_vts(self):
        """<get_vts/> succeeds and lists a VT when three VTs were added."""
        daemon = DummyWrapper([])
        daemon.add_vt('1.2.3.4', 'A vulnerability test')
        daemon.add_vt('1.2.3.5', 'Another vulnerability test')
        daemon.add_vt('123456789', 'Yet another vulnerability test')

        response = secET.fromstring(daemon.handle_command('<get_vts />'))

        self.assertEqual(response.get('status'), '200')

        vts = response.find('vts')
        self.assertIsNotNone(vts.find('vt'))
327
328
    def test_get_vts_multiple_vts_with_custom(self):
        """Each of three VTs added with custom data has a 'custom' element."""
        daemon = DummyWrapper([])
        daemon.add_vt('1.2.3.4', 'A vulnerability test', custom='b')
        daemon.add_vt(
            '4.3.2.1', 'Another vulnerability test with custom info', custom='b'
        )
        daemon.add_vt('123456789', 'Yet another vulnerability test', custom='b')

        response = secET.fromstring(daemon.handle_command('<get_vts />'))
        custom = response.findall('vts/vt/custom')

        self.assertEqual(3, len(custom))
340
341 View Code Duplication
    def test_get_vts_vts_with_params(self):
        """A VT fetched by id exposes its params and custom elements."""
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4', 'A vulnerability test', vt_params="a", custom="b"
        )

        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )
        # The status of the response must be success (i.e. 200)
        self.assertEqual(response.get('status'), '200')

        # The response root element must have the correct name
        self.assertEqual(response.tag, 'get_vts_response')
        # The response must contain a 'vts' element
        self.assertIsNotNone(response.find('vts'))

        # response[0][0] is the first child of the first child of the root.
        vt_params = response[0][0].findall('params')
        self.assertEqual(1, len(vt_params))

        custom = response[0][0].findall('custom')
        self.assertEqual(1, len(custom))

        params = response.findall('vts/vt/params/param')
        self.assertEqual(2, len(params))
366
367 View Code Duplication
    def test_get_vts_vts_with_refs(self):
        """A VT fetched by id exposes its params, custom and refs elements."""
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            vt_refs="c",
        )

        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )
        # The status of the response must be success (i.e. 200)
        self.assertEqual(response.get('status'), '200')

        # The response root element must have the correct name
        self.assertEqual(response.tag, 'get_vts_response')

        # The response must contain a 'vts' element
        self.assertIsNotNone(response.find('vts'))

        # response[0][0] is the first child of the first child of the root.
        vt_params = response[0][0].findall('params')
        self.assertEqual(1, len(vt_params))

        custom = response[0][0].findall('custom')
        self.assertEqual(1, len(custom))

        refs = response.findall('vts/vt/refs/ref')
        self.assertEqual(2, len(refs))
397
398
    def test_get_vts_vts_with_dependencies(self):
        """A VT added with dependencies lists two 'dependency' elements."""
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            vt_dependencies="c",
        )

        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )

        deps = response.findall('vts/vt/dependencies/dependency')
        self.assertEqual(2, len(deps))
414
415
    def test_get_vts_vts_with_severities(self):
        """A VT added with severities lists one 'severity' element."""
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            severities="c",
        )

        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )

        severity = response.findall('vts/vt/severities/severity')
        self.assertEqual(1, len(severity))
431
432
    def test_get_vts_vts_with_detection_qodt(self):
        """A VT added with detection and qod_t has one 'detection' element."""
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            detection="c",
            qod_t="d",
        )

        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )

        detection = response.findall('vts/vt/detection')
        self.assertEqual(1, len(detection))
449
450
    def test_get_vts_vts_with_detection_qodv(self):
        """A VT added with detection and qod_v has one 'detection' element."""
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            detection="c",
            qod_v="d",
        )

        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )

        detection = response.findall('vts/vt/detection')
        self.assertEqual(1, len(detection))
467
468
    def test_get_vts_vts_with_summary(self):
        """A VT added with a summary has one 'summary' element."""
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            summary="c",
        )

        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )

        summary = response.findall('vts/vt/summary')
        self.assertEqual(1, len(summary))
484
485
    def test_get_vts_vts_with_impact(self):
        """A VT added with an impact has one 'impact' element."""
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            impact="c",
        )

        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )

        impact = response.findall('vts/vt/impact')
        self.assertEqual(1, len(impact))
501
502
    def test_get_vts_vts_with_affected(self):
        """A VT added with affected info has one 'affected' element."""
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            affected="c",
        )

        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )

        affect = response.findall('vts/vt/affected')
        self.assertEqual(1, len(affect))
518
519
    def test_get_vts_vts_with_insight(self):
        """A VT added with an insight has one 'insight' element."""
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            insight="c",
        )

        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )

        insight = response.findall('vts/vt/insight')
        self.assertEqual(1, len(insight))
535
536
    def test_get_vts_vts_with_solution(self):
        """A VT added with solution and solution_t has one 'solution'."""
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            solution="c",
            solution_t="d",
        )

        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )

        solution = response.findall('vts/vt/solution')
        self.assertEqual(1, len(solution))
553
554
    def test_get_vts_vts_with_ctime(self):
        """The VT's creation time is echoed in a 'creation_time' element."""
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            vt_creation_time='01-01-1900',
        )

        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )

        creation_time = response.findall('vts/vt/creation_time')
        self.assertEqual(
            '<creation_time>01-01-1900</creation_time>',
            ET.tostring(creation_time[0]).decode('utf-8'),
        )
572
573
    def test_get_vts_vts_with_mtime(self):
        """The VT's modification time is echoed in 'modification_time'."""
        daemon = DummyWrapper([])
        daemon.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            vt_modification_time='02-01-1900',
        )

        response = secET.fromstring(
            daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>')
        )

        modification_time = response.findall('vts/vt/modification_time')
        self.assertEqual(
            '<modification_time>02-01-1900</modification_time>',
            ET.tostring(modification_time[0]).decode('utf-8'),
        )
591
592
    def test_scan_with_error(self):
        """An 'error' result shows up in <get_scans>; the scan is deletable."""
        daemon = DummyWrapper([Result('error', value='something went wrong')])

        response = secET.fromstring(
            daemon.handle_command(
                '<start_scan target="localhost" ports="80, '
                '443"><scanner_params /></start_scan>'
            )
        )
        scan_id = response.findtext('id')

        # Poll until the scan leaves the "init"/"running" states. The
        # deadline turns a scan that never finishes into a test failure
        # instead of an endless loop.
        deadline = time.monotonic() + 30
        while True:
            response = secET.fromstring(
                daemon.handle_command(
                    '<get_scans scan_id="%s" details="1"/>' % scan_id
                )
            )
            scans = response.findall('scan')
            self.assertEqual(1, len(scans))

            scan = scans[0]
            if scan.get('status') not in ('init', 'running'):
                break

            # An unfinished scan must not report an end time yet.
            self.assertEqual('0', scan.get('end_time'))
            self.assertLess(
                time.monotonic(), deadline, 'scan did not finish in time'
            )
            time.sleep(0.010)

        response = secET.fromstring(
            daemon.handle_command(
                '<get_scans scan_id="%s" details="1"/>' % scan_id
            )
        )

        self.assertEqual(
            response.findtext('scan/results/result'), 'something went wrong'
        )

        response = secET.fromstring(
            daemon.handle_command('<delete_scan scan_id="%s" />' % scan_id)
        )

        self.assertEqual(response.get('status'), '200')
637
638
    def test_get_scan_pop(self):
        """Exercise the pop_results attribute of <get_scans>.

        The result is visible in a plain <get_scans> and in the first
        request with pop_results="1"; a later request with details="0"
        reports no result text.
        """
        daemon = DummyWrapper([Result('host-detail', value='Some Host Detail')])

        response = secET.fromstring(
            daemon.handle_command(
                '<start_scan target="localhost" ports="80, 443">'
                '<scanner_params /></start_scan>'
            )
        )

        scan_id = response.findtext('id')
        time.sleep(1)

        response = secET.fromstring(
            daemon.handle_command('<get_scans scan_id="%s"/>' % scan_id)
        )
        self.assertEqual(
            response.findtext('scan/results/result'), 'Some Host Detail'
        )

        response = secET.fromstring(
            daemon.handle_command(
                '<get_scans scan_id="%s" pop_results="1"/>' % scan_id
            )
        )
        self.assertEqual(
            response.findtext('scan/results/result'), 'Some Host Detail'
        )

        response = secET.fromstring(
            daemon.handle_command('<get_scans details="0" pop_results="1"/>')
        )
        self.assertEqual(response.findtext('scan/results/result'), None)

        # Issue one more pop request; its response is only parsed, not
        # inspected.
        secET.fromstring(
            daemon.handle_command(
                '<get_scans scan_id="%s" pop_results="1"/>' % scan_id
            )
        )

        # Wait for the scan to report "stopped". The deadline and the short
        # sleep replace an unbounded busy loop, so a scan that never reaches
        # that state fails the test instead of hanging it.
        deadline = time.monotonic() + 30
        while True:
            response = secET.fromstring(
                daemon.handle_command(
                    '<get_scans scan_id="%s" details="1"/>' % scan_id
                )
            )
            scans = response.findall('scan')

            self.assertEqual(1, len(scans))

            scan = scans[0]
            if scan.get('status') == "stopped":
                time.sleep(1)  # avoid race condition by waiting
                break

            self.assertLess(
                time.monotonic(), deadline, 'scan was never reported stopped'
            )
            time.sleep(0.01)

        response = secET.fromstring(
            daemon.handle_command(
                '<get_scans scan_id="%s" details="1"/>' % scan_id
            )
        )

        self.assertIn(
            response.findtext('scan/results/result'),
            ['Scan process failure.', 'Scan stopped.'],
        )

        response = secET.fromstring(
            daemon.handle_command('<delete_scan scan_id="%s" />' % scan_id)
        )
        self.assertEqual(response.get('status'), '200')
708
709
    def test_stop_scan(self):
        """<stop_scan> raises OspdCommandError in both cases exercised here.

        One request names the scan started above (issued after a 3 second
        wait); the other has no scan_id attribute at all.
        """
        daemon = DummyWrapper([])
        response = secET.fromstring(
            daemon.handle_command(
                '<start_scan '
                'target="localhost" ports="80, 443">'
                '<scanner_params /></start_scan>'
            )
        )
        scan_id = response.findtext('id')

        # Depending on the system this test can end with a race condition
        # because the scanner is already stopped when the <stop_scan>
        # command is run.
        time.sleep(3)

        cmd = secET.fromstring('<stop_scan scan_id="%s" />' % scan_id)
        with self.assertRaises(OspdCommandError):
            daemon.handle_stop_scan_command(cmd)

        cmd = secET.fromstring('<stop_scan />')
        with self.assertRaises(OspdCommandError):
            daemon.handle_stop_scan_command(cmd)
734
735 View Code Duplication
    def test_scan_with_vts(self):
        """VT selections in <start_scan> are reflected by get_scan_vts."""
        daemon = DummyWrapper([])

        # An empty <vt_selection/> is rejected.
        cmd = secET.fromstring(
            '<start_scan '
            'target="localhost" ports="80, 443">'
            '<scanner_params /><vt_selection />'
            '</start_scan>'
        )

        with self.assertRaises(OspdCommandError):
            daemon.handle_start_scan_command(cmd)

        # With one vt, without params
        response = secET.fromstring(
            daemon.handle_command(
                '<start_scan '
                'target="localhost" ports="80, 443">'
                '<scanner_params /><vt_selection>'
                '<vt_single id="1.2.3.4" />'
                '</vt_selection></start_scan>'
            )
        )
        scan_id = response.findtext('id')
        time.sleep(0.01)

        self.assertEqual(
            daemon.get_scan_vts(scan_id), {'1.2.3.4': {}, 'vt_groups': []}
        )
        self.assertNotEqual(daemon.get_scan_vts(scan_id), {'1.2.3.6': {}})

        # Without any VT selection
        response = secET.fromstring(
            daemon.handle_command(
                '<start_scan '
                'target="localhost" ports="80, 443">'
                '<scanner_params /></start_scan>'
            )
        )

        scan_id = response.findtext('id')
        time.sleep(0.01)
        self.assertEqual(daemon.get_scan_vts(scan_id), {})
777
778 View Code Duplication
    def test_scan_with_vts_and_param(self):
        """VT values and VT groups in <start_scan> are validated and stored."""
        daemon = DummyWrapper([])

        # Raise because no vt_param id attribute
        cmd = secET.fromstring(
            '<start_scan '
            'target="localhost" ports="80, 443">'
            '<scanner_params /><vt_selection><vt_si'
            'ngle id="1234"><vt_value>200</vt_value>'
            '</vt_single></vt_selection></start_scan>'
        )

        with self.assertRaises(OspdCommandError):
            daemon.handle_start_scan_command(cmd)

        # No error
        response = secET.fromstring(
            daemon.handle_command(
                '<start_scan '
                'target="localhost" ports="80, 443">'
                '<scanner_params /><vt_selection><vt'
                '_single id="1234"><vt_value id="ABC">200'
                '</vt_value></vt_single></vt_selection>'
                '</start_scan>'
            )
        )
        scan_id = response.findtext('id')
        time.sleep(0.01)
        self.assertEqual(
            daemon.get_scan_vts(scan_id),
            {'1234': {'ABC': '200'}, 'vt_groups': []},
        )

        # Raise because no vtgroup filter attribute
        cmd = secET.fromstring(
            '<start_scan '
            'target="localhost" ports="80, 443">'
            '<scanner_params /><vt_selection><vt_group/>'
            '</vt_selection></start_scan>'
        )
        with self.assertRaises(OspdCommandError):
            daemon.handle_start_scan_command(cmd)

        # No error
        response = secET.fromstring(
            daemon.handle_command(
                '<start_scan '
                'target="localhost" ports="80, 443">'
                '<scanner_params /><vt_selection>'
                '<vt_group filter="a"/>'
                '</vt_selection></start_scan>'
            )
        )
        scan_id = response.findtext('id')
        time.sleep(0.01)
        self.assertEqual(daemon.get_scan_vts(scan_id), {'vt_groups': ['a']})
835
836
    def test_billon_laughs(self):
        """A document declaring nested entities raises EntitiesForbidden."""
        # pylint: disable=line-too-long
        daemon = DummyWrapper([])
        # Each entity expands to ten copies of the previous one.
        lol = (
            '<?xml version="1.0"?>'
            '<!DOCTYPE lolz ['
            ' <!ENTITY lol "lol">'
            ' <!ELEMENT lolz (#PCDATA)>'
            ' <!ENTITY lol1 "&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;">'
            ' <!ENTITY lol2 "&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;">'
            ' <!ENTITY lol3 "&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;">'
            ' <!ENTITY lol4 "&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;">'
            ' <!ENTITY lol5 "&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;">'
            ' <!ENTITY lol6 "&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;">'
            ' <!ENTITY lol7 "&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;">'
            ' <!ENTITY lol8 "&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;">'
            ' <!ENTITY lol9 "&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;">'
            ']>'
        )
        self.assertRaises(EntitiesForbidden, daemon.handle_command, lol)
856
857
    def test_scan_multi_target(self):
        """<start_scan> with two <target> elements returns status 200."""
        daemon = DummyWrapper([])
        response = secET.fromstring(
            daemon.handle_command(
                '<start_scan>'
                '<scanner_params /><vts><vt id="1.2.3.4" />'
                '</vts>'
                '<targets><target>'
                '<hosts>localhosts</hosts>'
                '<ports>80,443</ports>'
                '</target>'
                '<target><hosts>192.168.0.0/24</hosts>'
                '<ports>22</ports></target></targets>'
                '</start_scan>'
            )
        )
        self.assertEqual(response.get('status'), '200')
874
875
    def test_multi_target_with_credentials(self):
        """Credentials attached to one target can be read back for it.

        Two targets are submitted; only the second (192.168.0.0/24) carries
        an ssh and an smb credential. The test checks the start_scan status
        and then compares ``get_scan_credentials`` for that target with the
        submitted values.
        """
        start_scan_cmd = (
            '<start_scan>'
            '<scanner_params /><vts><vt id="1.2.3.4" />'
            '</vts>'
            '<targets><target><hosts>localhosts</hosts>'
            '<ports>80,443</ports></target><target>'
            '<hosts>192.168.0.0/24</hosts><ports>22'
            '</ports><credentials>'
            '<credential type="up" service="ssh" port="22">'
            '<username>scanuser</username>'
            '<password>mypass</password>'
            '</credential><credential type="up" service="smb">'
            '<username>smbuser</username>'
            '<password>mypass</password></credential>'
            '</credentials>'
            '</target></targets>'
            '</start_scan>'
        )
        scanner = DummyWrapper([])
        reply = secET.fromstring(scanner.handle_command(start_scan_cmd))

        self.assertEqual(reply.get('status'), '200')

        # Expected mapping is keyed by the credential's "service" attribute;
        # the smb credential was submitted without a port.
        expected_credentials = {
            'smb': {'username': 'smbuser', 'password': 'mypass', 'type': 'up'},
            'ssh': {
                'username': 'scanuser',
                'password': 'mypass',
                'port': '22',
                'type': 'up',
            },
        }
        scan_id = reply.findtext('id')
        stored = scanner.get_scan_credentials(scan_id, "192.168.0.0/24")
        self.assertEqual(stored, expected_credentials)
912
913
    def test_scan_get_target(self):
        """get_scans reports both submitted hosts in the 'target' attribute.

        The expected value is the two <hosts> strings joined by a comma.
        """
        start_scan_cmd = (
            '<start_scan>'
            '<scanner_params /><vts><vt id="1.2.3.4" />'
            '</vts>'
            '<targets><target>'
            '<hosts>localhosts</hosts>'
            '<ports>80,443</ports>'
            '</target>'
            '<target><hosts>192.168.0.0/24</hosts>'
            '<ports>22</ports></target></targets>'
            '</start_scan>'
        )
        scanner = DummyWrapper([])
        start_reply = secET.fromstring(scanner.handle_command(start_scan_cmd))
        scan_id = start_reply.findtext('id')

        get_scans_cmd = '<get_scans scan_id="%s"/>' % scan_id
        scans_reply = secET.fromstring(scanner.handle_command(get_scans_cmd))
        scan_elem = scans_reply.find('scan')

        self.assertEqual(scan_elem.get('target'), 'localhosts,192.168.0.0/24')
935
936
    def test_scan_get_exclude_hosts(self):
        """Hosts listed in <exclude_hosts> show up as finished hosts.

        The first target covers 192.168.10.20-25 and excludes .23-.24; one
        second after starting the scan, ``get_scan_finished_hosts`` is
        expected to return exactly those two excluded addresses.
        """
        start_scan_cmd = (
            '<start_scan>'
            '<scanner_params /><vts><vt id="1.2.3.4" />'
            '</vts>'
            '<targets><target>'
            '<hosts>192.168.10.20-25</hosts>'
            '<ports>80,443</ports>'
            '<exclude_hosts>192.168.10.23-24'
            '</exclude_hosts>'
            '</target>'
            '<target><hosts>192.168.0.0/24</hosts>'
            '<ports>22</ports></target>'
            '</targets>'
            '</start_scan>'
        )
        scanner = DummyWrapper([])
        reply = secET.fromstring(scanner.handle_command(start_scan_cmd))
        scan_id = reply.findtext('id')

        # Wait before querying the scan state.
        # NOTE(review): presumably this gives the scan time to register the
        # excluded hosts -- confirm against the daemon implementation.
        time.sleep(1)

        finished_hosts = scanner.get_scan_finished_hosts(scan_id)
        self.assertEqual(finished_hosts, ['192.168.10.23', '192.168.10.24'])
959
960
    def test_scan_multi_target_parallel_with_error(self):
        """A non-numeric ``parallel`` attribute makes start_scan fail.

        The already-parsed <start_scan parallel="100a"> element is handed
        directly to ``handle_start_scan_command``, which is expected to
        raise ``OspdCommandError``.
        """
        daemon = DummyWrapper([])
        cmd = secET.fromstring(
            '<start_scan parallel="100a">'
            '<scanner_params />'
            '<targets><target>'
            '<hosts>localhosts</hosts>'
            '<ports>22</ports>'
            '</target></targets>'
            '</start_scan>'
        )
        # No sleep here: at this point the command has only been parsed and
        # nothing has been started, so there is nothing to wait for.
        with self.assertRaises(OspdCommandError):
            daemon.handle_start_scan_command(cmd)
975
976
    def test_scan_multi_target_parallel_100(self):
        """start_scan with parallel="100" is answered with status 200."""
        start_scan_cmd = (
            '<start_scan parallel="100">'
            '<scanner_params />'
            '<targets><target>'
            '<hosts>localhosts</hosts>'
            '<ports>22</ports>'
            '</target></targets>'
            '</start_scan>'
        )
        scanner = DummyWrapper([])
        reply = secET.fromstring(scanner.handle_command(start_scan_cmd))

        # The pause sits between starting the scan and checking the reply.
        # NOTE(review): presumably it lets the started scan run before the
        # test ends -- confirm.
        time.sleep(1)

        self.assertEqual(reply.get('status'), '200')
991
992
    def test_progress(self):
        """Overall progress of two targets at 75 and 25 is reported as 50."""
        start_scan_cmd = (
            '<start_scan parallel="2">'
            '<scanner_params />'
            '<targets><target>'
            '<hosts>localhost1</hosts>'
            '<ports>22</ports>'
            '</target><target>'
            '<hosts>localhost2</hosts>'
            '<ports>22</ports>'
            '</target></targets>'
            '</start_scan>'
        )
        scanner = DummyWrapper([])
        reply = secET.fromstring(scanner.handle_command(start_scan_cmd))
        scan_id = reply.findtext('id')

        # Each target consists of a single host, so target and host names
        # are the same string in both calls.
        scanner.set_scan_target_progress(
            scan_id, 'localhost1', 'localhost1', 75
        )
        scanner.set_scan_target_progress(
            scan_id, 'localhost2', 'localhost2', 25
        )

        overall = scanner.calculate_progress(scan_id)
        self.assertEqual(overall, 50)
1016
1017
    def test_set_get_vts_version(self):
        """A VTs version stored with set_vts_version is returned unchanged."""
        scanner = DummyWrapper([])
        scanner.set_vts_version('1234')

        self.assertEqual('1234', scanner.get_vts_version())
1023
1024
    def test_set_get_vts_version_error(self):
        """Calling set_vts_version without an argument raises TypeError."""
        scanner = DummyWrapper([])
        with self.assertRaises(TypeError):
            scanner.set_vts_version()
1027
1028
    def test_resume_task(self):
        """Re-issuing start_scan with an existing scan_id resumes that scan.

        Steps checked, in order:
          * three seconds after the start, stop_scan raises
            OspdCommandError and get_scans lists 4 results;
          * start_scan with the same scan_id answers with that id and the
            host 'localhost' is reported as unfinished;
          * after marking the host finished, the unfinished list is empty
            and the finished list contains 'localhost';
          * get_scans now lists 2 results.
        """
        host_details = [
            Result('host-detail', host='localhost', value='Some Host Detail'),
            Result('host-detail', host='localhost', value='Some Host Detail2'),
        ]
        scanner = DummyWrapper(host_details)

        def count_results():
            # Fetch the scan with details and count its <result> elements.
            reply = secET.fromstring(
                scanner.handle_command(
                    '<get_scans scan_id="%s" details="1"/>' % scan_id
                )
            )
            return len(reply.findall('scan/results/result'))

        start_scan_cmd = (
            '<start_scan parallel="2">'
            '<scanner_params />'
            '<targets><target>'
            '<hosts>localhost</hosts>'
            '<ports>22</ports>'
            '</target></targets>'
            '</start_scan>'
        )
        start_reply = secET.fromstring(scanner.handle_command(start_scan_cmd))
        scan_id = start_reply.findtext('id')

        time.sleep(3)
        stop_cmd = secET.fromstring('<stop_scan scan_id="%s" />' % scan_id)

        with self.assertRaises(OspdCommandError):
            scanner.handle_stop_scan_command(stop_cmd)

        self.assertEqual(count_results(), 4)

        # Resume the task
        resume_cmd = (
            '<start_scan scan_id="%s" target="localhost" ports="80, 443">'
            '<scanner_params /></start_scan>' % scan_id
        )
        resume_reply = secET.fromstring(scanner.handle_command(resume_cmd))

        # Check unfinished host
        self.assertEqual(resume_reply.findtext('id'), scan_id)
        unfinished = scanner.get_scan_unfinished_hosts(scan_id)
        self.assertEqual(unfinished, ['localhost'])

        # Finish the host and check the unfinished list again.
        scanner.set_scan_host_finished(scan_id, "localhost", "localhost")
        self.assertEqual(scanner.get_scan_unfinished_hosts(scan_id), [])

        # Check finished hosts
        finished = scanner.scan_collection.get_hosts_finished(scan_id)
        self.assertEqual(finished, ['localhost'])

        # Check if the result was removed.
        self.assertEqual(count_results(), 2)
1098
1099
    def test_result_order(self):
        """Results are returned by get_scans in the order they were added.

        Three log results named 'a', 'c' and 'b' are added in that order;
        every result element found in the reply must carry the name at the
        same position of that list.
        """
        start_scan_cmd = (
            '<start_scan parallel="1">'
            '<scanner_params />'
            '<targets><target>'
            '<hosts>a</hosts>'
            '<ports>22</ports>'
            '</target></targets>'
            '</start_scan>'
        )
        scanner = DummyWrapper([])
        start_reply = secET.fromstring(scanner.handle_command(start_scan_cmd))
        scan_id = start_reply.findtext('id')

        # Deliberately not alphabetical, so a sorted reply would be caught.
        expected_names = ['a', 'c', 'b']
        for entry in expected_names:
            scanner.add_scan_log(scan_id, host=entry, name=entry)

        scans_reply = secET.fromstring(
            scanner.handle_command('<get_scans details="1"/>')
        )
        result_elems = scans_reply.findall("scan/results/")

        # Only the elements actually returned are compared; the number of
        # results is not asserted here.
        for position, elem in enumerate(result_elems):
            self.assertEqual(expected_names[position], elem.attrib['name'])
1128