Passed
Pull Request — master (#266)
by Juan José
01:20
created

ScanTestCase.test_result_order()   A

Complexity

Conditions 2

Size

Total Lines 32
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 19
nop 1
dl 0
loc 32
rs 9.45
c 0
b 0
f 0
1
# Copyright (C) 2014-2020 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: AGPL-3.0-or-later
4
#
5
# This program is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Affero General Public License as
7
# published by the Free Software Foundation, either version 3 of the
8
# License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
# pylint: disable=too-many-lines
19
20
""" Test module for scan runs
21
"""
22
23
import time
24
import unittest
25
26
from unittest.mock import patch, MagicMock
27
28
import xml.etree.ElementTree as ET
29
30
from defusedxml.common import EntitiesForbidden
31
32
from ospd.resultlist import ResultList
33
from ospd.errors import OspdCommandError
34
35
from .helper import DummyWrapper, assert_called, FakeStream
36
37
38
class FakeStartProcess:
    """Callable test double that records a process target and runs it inline.

    Calling the instance stores the target function together with its
    arguments and hands back ``call_mock``; ``run`` then invokes the stored
    target directly and returns ``run_mock``.
    """

    def __init__(self):
        # Mocks handed out by run() and __call__() respectively.
        self.run_mock = MagicMock()
        self.call_mock = MagicMock()

        # Nothing is recorded until the instance has been called.
        self.func = self.args = self.kwargs = None

    def __call__(self, func, *, args=None, kwargs=None):
        """Remember ``func`` and its arguments, then return ``call_mock``."""
        self.func = func
        self.args = args if args else []
        self.kwargs = kwargs if kwargs else {}
        return self.call_mock

    def run(self):
        """Invoke the recorded function and return ``run_mock``."""
        target, positional, named = self.func, self.args, self.kwargs
        target(*positional, **named)
        return self.run_mock

    def __repr__(self):
        template = "<FakeProcess func={} args={} kwargs={}>"
        return template.format(self.func, self.args, self.kwargs)
61
62
63
class Result:
    """Plain attribute container describing one scan result for the tests.

    ``type_`` is stored as ``result_type``. Every other known field starts
    out as an empty string; keyword arguments are applied afterwards as
    attributes and may therefore override those defaults.
    """

    def __init__(self, type_, **kwargs):
        self.result_type = type_

        # Known result fields, all empty unless overridden below.
        for field in (
            'host',
            'hostname',
            'name',
            'value',
            'port',
            'test_id',
            'severity',
            'qod',
        ):
            setattr(self, field, '')

        for attribute, content in kwargs.items():
            setattr(self, attribute, content)
76
77
78
class ScanTestCase(unittest.TestCase):
79
    def test_get_default_scanner_params(self):
        """get_scanner_details answers 200 and carries scanner_params."""
        wrapper = DummyWrapper([])
        stream = FakeStream()
        wrapper.handle_command('<get_scanner_details />', stream)
        reply = stream.get_response()

        # Success status, expected root tag and a 'scanner_params' child.
        self.assertEqual(reply.get('status'), '200')
        self.assertEqual(reply.tag, 'get_scanner_details_response')
        self.assertIsNotNone(reply.find('scanner_params'))
92
93
    def test_get_default_help(self):
        """help succeeds in its default form and with format="xml"."""
        wrapper = DummyWrapper([])

        # Default help output.
        plain_stream = FakeStream()
        wrapper.handle_command('<help />', plain_stream)
        plain_reply = plain_stream.get_response()
        self.assertEqual(plain_reply.get('status'), '200')

        # Same command again, asking for the XML format.
        xml_stream = FakeStream()
        wrapper.handle_command('<help format="xml" />', xml_stream)
        xml_reply = xml_stream.get_response()
        self.assertEqual(xml_reply.get('status'), '200')
        self.assertEqual(xml_reply.tag, 'help_response')
107
108
    def test_get_default_scanner_version(self):
        """get_version answers 200 and contains a protocol element."""
        wrapper = DummyWrapper([])
        stream = FakeStream()
        wrapper.handle_command('<get_version />', stream)
        reply = stream.get_response()

        self.assertEqual(reply.get('status'), '200')
        self.assertIsNotNone(reply.find('protocol'))
116
117
    def test_get_vts_no_vt(self):
        """get_vts still returns a vts element when no VT was added."""
        wrapper = DummyWrapper([])
        stream = FakeStream()
        wrapper.handle_command('<get_vts />', stream)
        reply = stream.get_response()

        self.assertEqual(reply.get('status'), '200')
        self.assertIsNotNone(reply.find('vts'))
126
127
    def test_get_vt_xml_no_dict(self):
        """get_vt_xml gives an element with no id when the VT data is None."""
        wrapper = DummyWrapper([])
        vt_element = wrapper.get_vt_xml(('1234', None))
        self.assertFalse(vt_element.get('id'))
132
133
    def test_get_vts_single_vt(self):
        """get_vts lists the single registered VT under its id."""
        wrapper = DummyWrapper([])
        stream = FakeStream()
        wrapper.add_vt('1.2.3.4', 'A vulnerability test')
        wrapper.handle_command('<get_vts />', stream)
        reply = stream.get_response()
        self.assertEqual(reply.get('status'), '200')

        vt_element = reply.find('vts').find('vt')
        self.assertIsNotNone(vt_element)
        self.assertEqual(vt_element.get('id'), '1.2.3.4')
147
148
    def test_get_vts_still_not_init(self):
        """get_vts answers 400 while the daemon is flagged uninitialized."""
        wrapper = DummyWrapper([])
        stream = FakeStream()
        wrapper.initialized = False

        wrapper.handle_command('<get_vts />', stream)
        reply = stream.get_response()
        self.assertEqual(reply.get('status'), '400')
156
157
    def test_get_help_still_not_init(self):
        """help answers 200 even while the daemon is flagged uninitialized."""
        wrapper = DummyWrapper([])
        stream = FakeStream()
        wrapper.initialized = False

        wrapper.handle_command('<help/>', stream)
        reply = stream.get_response()
        self.assertEqual(reply.get('status'), '200')
165
166 View Code Duplication
    def test_get_vts_filter_positive(self):
        """A "greater than" modification_time filter returns the VT."""
        wrapper = DummyWrapper([])
        wrapper.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            vt_modification_time='19000202',
        )
        stream = FakeStream()

        # The filter asks for VTs modified after 19000201.
        command = '<get_vts filter="modification_time&gt;19000201"></get_vts>'
        wrapper.handle_command(command, stream)
        reply = stream.get_response()
        self.assertEqual(reply.get('status'), '200')

        vt_element = reply.find('vts').find('vt')
        self.assertIsNotNone(vt_element)
        self.assertEqual(vt_element.get('id'), '1.2.3.4')

        mtime_elements = reply.findall('vts/vt/modification_time')
        serialized = ET.tostring(mtime_elements[0]).decode('utf-8')
        self.assertEqual(
            '<modification_time>19000202</modification_time>', serialized
        )
193
194 View Code Duplication
    def test_get_vts_filter_negative(self):
        """A "less than" modification_time filter returns the VT."""
        wrapper = DummyWrapper([])
        wrapper.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            vt_modification_time='19000202',
        )
        stream = FakeStream()

        # The filter asks for VTs modified before 19000203.
        command = '<get_vts filter="modification_time&lt;19000203"></get_vts>'
        wrapper.handle_command(command, stream)
        reply = stream.get_response()
        self.assertEqual(reply.get('status'), '200')

        vt_element = reply.find('vts').find('vt')
        self.assertIsNotNone(vt_element)
        self.assertEqual(vt_element.get('id'), '1.2.3.4')

        mtime_elements = reply.findall('vts/vt/modification_time')
        serialized = ET.tostring(mtime_elements[0]).decode('utf-8')
        self.assertEqual(
            '<modification_time>19000202</modification_time>', serialized
        )
221
222
    def test_get_vts_bad_filter(self):
        """A filter with only a field name raises OspdCommandError."""
        wrapper = DummyWrapper([])
        stream = FakeStream()

        with self.assertRaises(OspdCommandError):
            wrapper.handle_command(
                '<get_vts filter="modification_time"/>', stream
            )

        # The VT cache must still be reported as available afterwards.
        self.assertTrue(wrapper.vts.is_cache_available)
229
230
    def test_get_vtss_multiple_vts(self):
        """get_vts succeeds and lists a vt when three VTs are registered."""
        wrapper = DummyWrapper([])
        for vt_id, vt_name in (
            ('1.2.3.4', 'A vulnerability test'),
            ('1.2.3.5', 'Another vulnerability test'),
            ('123456789', 'Yet another vulnerability test'),
        ):
            wrapper.add_vt(vt_id, vt_name)

        stream = FakeStream()
        wrapper.handle_command('<get_vts />', stream)
        reply = stream.get_response()
        self.assertEqual(reply.get('status'), '200')

        vts_element = reply.find('vts')
        self.assertIsNotNone(vts_element.find('vt'))
244
245
    def test_get_vts_multiple_vts_with_custom(self):
        """Each of three VTs added with custom data yields a custom element."""
        wrapper = DummyWrapper([])
        for vt_id, vt_name in (
            ('1.2.3.4', 'A vulnerability test'),
            ('4.3.2.1', 'Another vulnerability test with custom info'),
            ('123456789', 'Yet another vulnerability test'),
        ):
            wrapper.add_vt(vt_id, vt_name, custom='b')

        stream = FakeStream()
        wrapper.handle_command('<get_vts />', stream)
        reply = stream.get_response()

        custom_elements = reply.findall('vts/vt/custom')
        self.assertEqual(3, len(custom_elements))
260
261 View Code Duplication
    def test_get_vts_vts_with_params(self):
        """A VT added with vt_params and custom exposes both in get_vts."""
        wrapper = DummyWrapper([])
        wrapper.add_vt(
            '1.2.3.4', 'A vulnerability test', vt_params="a", custom="b"
        )
        stream = FakeStream()
        wrapper.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', stream)
        reply = stream.get_response()

        # Success status, expected root tag and a 'vts' child.
        self.assertEqual(reply.get('status'), '200')
        self.assertEqual(reply.tag, 'get_vts_response')
        self.assertIsNotNone(reply.find('vts'))

        # First child of the first child of the response root.
        first_vt = reply[0][0]
        self.assertEqual(1, len(first_vt.findall('params')))
        self.assertEqual(1, len(first_vt.findall('custom')))

        param_elements = reply.findall('vts/vt/params/param')
        self.assertEqual(2, len(param_elements))
287
288 View Code Duplication
    def test_get_vts_vts_with_refs(self):
        """A VT added with vt_refs exposes two ref elements in get_vts."""
        wrapper = DummyWrapper([])
        wrapper.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            vt_refs="c",
        )
        stream = FakeStream()
        wrapper.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', stream)
        reply = stream.get_response()

        # Success status, expected root tag and a 'vts' child.
        self.assertEqual(reply.get('status'), '200')
        self.assertEqual(reply.tag, 'get_vts_response')
        self.assertIsNotNone(reply.find('vts'))

        # First child of the first child of the response root.
        first_vt = reply[0][0]
        self.assertEqual(1, len(first_vt.findall('params')))
        self.assertEqual(1, len(first_vt.findall('custom')))

        ref_elements = reply.findall('vts/vt/refs/ref')
        self.assertEqual(2, len(ref_elements))
319
320
    def test_get_vts_vts_with_dependencies(self):
        """A VT added with vt_dependencies exposes two dependency elements."""
        wrapper = DummyWrapper([])
        wrapper.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            vt_dependencies="c",
        )
        stream = FakeStream()
        wrapper.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', stream)
        reply = stream.get_response()

        dependency_elements = reply.findall('vts/vt/dependencies/dependency')
        self.assertEqual(2, len(dependency_elements))
337
338
    def test_get_vts_vts_with_severities(self):
        """A VT added with severities exposes one severity element."""
        wrapper = DummyWrapper([])
        wrapper.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            severities="c",
        )
        stream = FakeStream()
        wrapper.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', stream)
        reply = stream.get_response()

        severity_elements = reply.findall('vts/vt/severities/severity')
        self.assertEqual(1, len(severity_elements))
354
355
    def test_get_vts_vts_with_detection_qodt(self):
        """A VT added with detection and qod_t exposes one detection."""
        wrapper = DummyWrapper([])
        wrapper.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            detection="c",
            qod_t="d",
        )
        stream = FakeStream()
        wrapper.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', stream)
        reply = stream.get_response()

        detection_elements = reply.findall('vts/vt/detection')
        self.assertEqual(1, len(detection_elements))
372
373
    def test_get_vts_vts_with_detection_qodv(self):
        """A VT added with detection and qod_v exposes one detection."""
        wrapper = DummyWrapper([])
        wrapper.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            detection="c",
            qod_v="d",
        )
        stream = FakeStream()
        wrapper.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', stream)
        reply = stream.get_response()

        detection_elements = reply.findall('vts/vt/detection')
        self.assertEqual(1, len(detection_elements))
390
391
    def test_get_vts_vts_with_summary(self):
        """A VT added with a summary exposes one summary element."""
        wrapper = DummyWrapper([])
        wrapper.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            summary="c",
        )
        stream = FakeStream()
        wrapper.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', stream)
        reply = stream.get_response()

        summary_elements = reply.findall('vts/vt/summary')
        self.assertEqual(1, len(summary_elements))
407
408
    def test_get_vts_vts_with_impact(self):
        """A VT added with an impact exposes one impact element."""
        wrapper = DummyWrapper([])
        wrapper.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            impact="c",
        )
        stream = FakeStream()
        wrapper.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', stream)
        reply = stream.get_response()

        impact_elements = reply.findall('vts/vt/impact')
        self.assertEqual(1, len(impact_elements))
424
425
    def test_get_vts_vts_with_affected(self):
        """A VT added with affected data exposes one affected element."""
        wrapper = DummyWrapper([])
        wrapper.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            affected="c",
        )
        stream = FakeStream()
        wrapper.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', stream)
        reply = stream.get_response()

        affected_elements = reply.findall('vts/vt/affected')
        self.assertEqual(1, len(affected_elements))
441
442
    def test_get_vts_vts_with_insight(self):
        """A VT added with an insight exposes one insight element."""
        wrapper = DummyWrapper([])
        wrapper.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            insight="c",
        )
        stream = FakeStream()
        wrapper.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', stream)
        reply = stream.get_response()

        insight_elements = reply.findall('vts/vt/insight')
        self.assertEqual(1, len(insight_elements))
458
459
    def test_get_vts_vts_with_solution(self):
        """A VT added with solution data exposes one solution element."""
        wrapper = DummyWrapper([])
        wrapper.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            custom="b",
            solution="c",
            solution_t="d",
            solution_m="e",
        )
        stream = FakeStream()
        wrapper.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', stream)
        reply = stream.get_response()

        solution_elements = reply.findall('vts/vt/solution')
        self.assertEqual(1, len(solution_elements))
477
478
    def test_get_vts_vts_with_ctime(self):
        """The creation time given to add_vt is returned verbatim."""
        wrapper = DummyWrapper([])
        wrapper.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            vt_creation_time='01-01-1900',
        )
        stream = FakeStream()
        wrapper.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', stream)
        reply = stream.get_response()

        ctime_elements = reply.findall('vts/vt/creation_time')
        serialized = ET.tostring(ctime_elements[0]).decode('utf-8')
        self.assertEqual(
            '<creation_time>01-01-1900</creation_time>', serialized
        )
496
497
    def test_get_vts_vts_with_mtime(self):
        """The modification time given to add_vt is returned verbatim."""
        wrapper = DummyWrapper([])
        wrapper.add_vt(
            '1.2.3.4',
            'A vulnerability test',
            vt_params="a",
            vt_modification_time='02-01-1900',
        )
        stream = FakeStream()
        wrapper.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', stream)
        reply = stream.get_response()

        mtime_elements = reply.findall('vts/vt/modification_time')
        serialized = ET.tostring(mtime_elements[0]).decode('utf-8')
        self.assertEqual(
            '<modification_time>02-01-1900</modification_time>', serialized
        )
515
516
    def test_clean_forgotten_scans(self):
        """A finished scan is removed only once scaninfo_store_time is set.

        The scan is started and polled until it reports an end time. Its
        end_time is then replaced by an old value and
        clean_forgotten_scans() is run twice: first with the wrapper's
        initial settings (the scan must stay), then with
        scaninfo_store_time set to 1 (the scan must be gone).
        """
        daemon = DummyWrapper([])
        fs = FakeStream()

        daemon.handle_command(
            '<start_scan target="localhost" ports="80, '
            '443"><scanner_params /></start_scan>',
            fs,
        )
        response = fs.get_response()

        scan_id = response.findtext('id')

        # Poll with a deadline so that a scan which never finishes fails
        # this test instead of hanging the whole test run.
        deadline = time.monotonic() + 10
        while True:
            fs = FakeStream()
            daemon.handle_command(
                '<get_scans scan_id="%s" details="1"/>' % scan_id, fs
            )
            response = fs.get_response()

            scans = response.findall('scan')
            self.assertEqual(1, len(scans))

            # An end_time other than '0' marks the scan as finished.
            if scans[0].get('end_time') != '0':
                break

            if time.monotonic() >= deadline:
                self.fail('scan %s did not finish in time' % scan_id)
            time.sleep(0.01)

        self.assertEqual(len(list(daemon.scan_collection.ids_iterator())), 1)

        # Set an old end_time
        daemon.scan_collection.scans_table[scan_id]['end_time'] = 123456
        # Run the check
        daemon.clean_forgotten_scans()
        # Not removed
        self.assertEqual(len(list(daemon.scan_collection.ids_iterator())), 1)

        # Set the max time and run again
        daemon.scaninfo_store_time = 1
        daemon.clean_forgotten_scans()
        # Now is removed
        self.assertEqual(len(list(daemon.scan_collection.ids_iterator())), 0)
568
569
    def test_scan_with_error(self):
        """An 'error' result is reported by get_scans once the scan ends.

        The scan is polled until its status is neither "init" nor
        "running"; the error text must then be the first result, and the
        scan must be deletable.
        """
        daemon = DummyWrapper([Result('error', value='something went wrong')])
        fs = FakeStream()

        daemon.handle_command(
            '<start_scan target="localhost" ports="80, '
            '443"><scanner_params /></start_scan>',
            fs,
        )
        response = fs.get_response()
        scan_id = response.findtext('id')

        # Poll with a deadline so that a scan which never leaves the
        # init/running states fails this test instead of hanging the run.
        deadline = time.monotonic() + 10
        while True:
            fs = FakeStream()
            daemon.handle_command(
                '<get_scans scan_id="%s" details="1"/>' % scan_id, fs
            )
            response = fs.get_response()

            scans = response.findall('scan')
            self.assertEqual(1, len(scans))

            scan = scans[0]
            if scan.get('status') not in ('init', 'running'):
                break

            # While the scan is still active no end time may be reported.
            self.assertEqual('0', scan.get('end_time'))
            if time.monotonic() >= deadline:
                self.fail('scan %s did not finish in time' % scan_id)
            time.sleep(0.010)

        # Fetch the finished scan once more and check the reported result.
        fs = FakeStream()
        daemon.handle_command(
            '<get_scans scan_id="%s" details="1"/>' % scan_id, fs
        )
        response = fs.get_response()

        self.assertEqual(
            response.findtext('scan/results/result'), 'something went wrong'
        )

        fs = FakeStream()
        daemon.handle_command('<delete_scan scan_id="%s" />' % scan_id, fs)
        response = fs.get_response()

        self.assertEqual(response.get('status'), '200')
615
616 View Code Duplication
    def test_get_scan_pop(self):
        """pop_results="1" returns the result once; afterwards none is left."""
        wrapper = DummyWrapper(
            [Result('host-detail', value='Some Host Detail')]
        )
        stream = FakeStream()
        wrapper.handle_command(
            '<start_scan target="localhost" ports="80, 443">'
            '<scanner_params /></start_scan>',
            stream,
        )
        scan_id = stream.get_response().findtext('id')

        # Fixed wait before querying — presumably long enough for the
        # dummy scan to deliver its result.
        time.sleep(1)

        # Plain get_scans: the result is reported.
        stream = FakeStream()
        wrapper.handle_command('<get_scans scan_id="%s"/>' % scan_id, stream)
        reply = stream.get_response()
        self.assertEqual(
            reply.findtext('scan/results/result'), 'Some Host Detail'
        )

        # Popping get_scans: the result is reported once more.
        stream = FakeStream()
        wrapper.handle_command(
            '<get_scans scan_id="%s" pop_results="1"/>' % scan_id, stream
        )
        reply = stream.get_response()
        self.assertEqual(
            reply.findtext('scan/results/result'), 'Some Host Detail'
        )

        # A later query without details finds no result text at all.
        stream = FakeStream()
        wrapper.handle_command(
            '<get_scans details="0" pop_results="1"/>', stream
        )
        reply = stream.get_response()
        self.assertEqual(reply.findtext('scan/results/result'), None)
652
653 View Code Duplication
    def test_get_scan_pop_max_res(self):
        """max_results="1" pops one result; the next pop gets the other two."""
        canned_results = [
            Result('host-detail', value='Some Host Detail'),
            Result('host-detail', value='Some Host Detail1'),
            Result('host-detail', value='Some Host Detail2'),
        ]
        wrapper = DummyWrapper(canned_results)
        stream = FakeStream()
        wrapper.handle_command(
            '<start_scan target="localhost" ports="80, 443">'
            '<scanner_params /></start_scan>',
            stream,
        )
        scan_id = stream.get_response().findtext('id')

        # Fixed wait before querying — presumably long enough for the
        # dummy scan to deliver its results.
        time.sleep(1)

        # First pop is limited to a single result.
        stream = FakeStream()
        wrapper.handle_command(
            '<get_scans scan_id="%s" pop_results="1" max_results="1"/>'
            % scan_id,
            stream,
        )
        reply = stream.get_response()
        self.assertEqual(len(reply.findall('scan/results/result')), 1)

        # Second, unlimited pop returns the remaining two.
        stream = FakeStream()
        wrapper.handle_command(
            '<get_scans scan_id="%s" pop_results="1"/>' % scan_id, stream
        )
        reply = stream.get_response()
        self.assertEqual(len(reply.findall('scan/results/result')), 2)
689
690
    def test_billon_laughs(self):
        """A nested-entity ("billion laughs") document is rejected."""
        # pylint: disable=line-too-long
        wrapper = DummyWrapper([])
        payload = (
            '<?xml version="1.0"?>'
            '<!DOCTYPE lolz ['
            ' <!ENTITY lol "lol">'
            ' <!ELEMENT lolz (#PCDATA)>'
            ' <!ENTITY lol1 "&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;">'
            ' <!ENTITY lol2 "&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;">'
            ' <!ENTITY lol3 "&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;">'
            ' <!ENTITY lol4 "&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;">'
            ' <!ENTITY lol5 "&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;">'
            ' <!ENTITY lol6 "&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;">'
            ' <!ENTITY lol7 "&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;">'
            ' <!ENTITY lol8 "&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;">'
            ' <!ENTITY lol9 "&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;">'
            ']>'
        )
        stream = FakeStream()

        # Handling the command must raise instead of expanding entities.
        with self.assertRaises(EntitiesForbidden):
            wrapper.handle_command(payload, stream)
711
712
    def test_target_with_credentials(self):
        """Credentials sent with start_scan come back keyed by service."""
        wrapper = DummyWrapper([])
        stream = FakeStream()
        start_scan_cmd = (
            '<start_scan>'
            '<scanner_params /><vts><vt id="1.2.3.4" />'
            '</vts>'
            '<targets><target>'
            '<hosts>192.168.0.0/24</hosts><ports>22'
            '</ports><credentials>'
            '<credential type="up" service="ssh" port="22">'
            '<username>scanuser</username>'
            '<password>mypass</password>'
            '</credential><credential type="up" service="smb">'
            '<username>smbuser</username>'
            '<password>mypass</password></credential>'
            '</credentials>'
            '</target></targets>'
            '</start_scan>'
        )
        wrapper.handle_command(start_scan_cmd, stream)
        reply = stream.get_response()
        self.assertEqual(reply.get('status'), '200')

        # Only the ssh credential was sent with a port attribute.
        expected_credentials = {
            'ssh': {
                'type': 'up',
                'password': 'mypass',
                'port': '22',
                'username': 'scanuser',
            },
            'smb': {'type': 'up', 'password': 'mypass', 'username': 'smbuser'},
        }
        scan_id = reply.findtext('id')
        stored_credentials = wrapper.get_scan_credentials(scan_id)
        self.assertEqual(stored_credentials, expected_credentials)
749
750
    def test_scan_get_target(self):
        """get_scans reports the hosts string given to start_scan."""
        wrapper = DummyWrapper([])
        stream = FakeStream()
        start_scan_cmd = (
            '<start_scan>'
            '<scanner_params /><vts><vt id="1.2.3.4" />'
            '</vts>'
            '<targets><target>'
            '<hosts>localhosts,192.168.0.0/24</hosts>'
            '<ports>80,443</ports>'
            '</target></targets>'
            '</start_scan>'
        )
        wrapper.handle_command(start_scan_cmd, stream)
        scan_id = stream.get_response().findtext('id')

        stream = FakeStream()
        wrapper.handle_command('<get_scans scan_id="%s"/>' % scan_id, stream)
        scan_element = stream.get_response().find('scan')

        self.assertEqual(
            scan_element.get('target'), 'localhosts,192.168.0.0/24'
        )
773
774
    def test_scan_get_target_options(self):
        """The alive_test value of a target is kept as a target option."""
        wrapper = DummyWrapper([])
        stream = FakeStream()
        start_scan_cmd = (
            '<start_scan>'
            '<scanner_params /><vts><vt id="1.2.3.4" />'
            '</vts>'
            '<targets>'
            '<target><hosts>192.168.0.1</hosts>'
            '<ports>22</ports><alive_test>0</alive_test></target>'
            '</targets>'
            '</start_scan>'
        )
        wrapper.handle_command(start_scan_cmd, stream)
        scan_id = stream.get_response().findtext('id')

        # Fixed wait before the options are read back.
        time.sleep(1)

        stored_options = wrapper.get_scan_target_options(scan_id)
        self.assertEqual(stored_options, {'alive_test': '0'})
794
795
    def test_progress(self):
        """Host progress values of 75 and 25 give an overall progress of 50."""
        wrapper = DummyWrapper([])
        stream = FakeStream()
        start_scan_cmd = (
            '<start_scan parallel="2">'
            '<scanner_params />'
            '<targets><target>'
            '<hosts>localhost1, localhost2</hosts>'
            '<ports>22</ports>'
            '</target></targets>'
            '</start_scan>'
        )
        wrapper.handle_command(start_scan_cmd, stream)
        scan_id = stream.get_response().findtext('id')

        for host, percent in (('localhost1', 75), ('localhost2', 25)):
            wrapper.set_scan_host_progress(scan_id, host, percent)

        self.assertEqual(wrapper.calculate_progress(scan_id), 50)
816
817
    def test_sort_host_finished(self):
        """Overall progress truncates to 66 after two hosts are finished."""
        wrapper = DummyWrapper([])
        stream = FakeStream()
        start_scan_cmd = (
            '<start_scan parallel="2">'
            '<scanner_params />'
            '<targets><target>'
            '<hosts>localhost1, localhost2, localhost3, localhost4</hosts>'
            '<ports>22</ports>'
            '</target></targets>'
            '</start_scan>'
        )
        wrapper.handle_command(start_scan_cmd, stream)
        scan_id = stream.get_response().findtext('id')

        # Progress is reported per host in this exact order.
        for host, percent in (
            ('localhost3', -1),
            ('localhost1', 75),
            ('localhost4', 100),
            ('localhost2', 25),
        ):
            wrapper.set_scan_host_progress(scan_id, host, percent)

        wrapper.sort_host_finished(scan_id, ['localhost3', 'localhost4'])

        truncated_progress = int(wrapper.calculate_progress(scan_id))
        self.assertEqual(truncated_progress, 66)
843
844
    def test_get_scan_progress_xml(self):
        """get_scans with progress="1" returns the progress breakdown."""
        wrapper = DummyWrapper([])
        stream = FakeStream()
        start_scan_cmd = (
            '<start_scan parallel="2">'
            '<scanner_params />'
            '<targets><target>'
            '<hosts>localhost1, localhost2, localhost3, localhost4</hosts>'
            '<ports>22</ports>'
            '</target></targets>'
            '</start_scan>'
        )
        wrapper.handle_command(start_scan_cmd, stream)
        scan_id = stream.get_response().findtext('id')

        # localhost3 is reported with -1 and localhost4 with 100; both are
        # then handed to sort_host_finished.
        wrapper.set_scan_host_progress(scan_id, 'localhost3', -1)
        wrapper.set_scan_host_progress(scan_id, 'localhost4', 100)
        wrapper.sort_host_finished(scan_id, ['localhost3', 'localhost4'])

        # The two remaining hosts are still in progress.
        wrapper.set_scan_host_progress(scan_id, 'localhost1', 75)
        wrapper.set_scan_host_progress(scan_id, 'localhost2', 25)

        stream = FakeStream()
        wrapper.handle_command(
            '<get_scans details="0" progress="1"/>', stream,
        )
        progress = stream.get_response().find('scan/progress')

        overall = float(progress.findtext('overall'))
        self.assertEqual(int(overall), 66)
        self.assertEqual(progress.findtext('count_alive'), '1')
        self.assertEqual(progress.findtext('count_dead'), '1')
        self.assertEqual(len(progress.findall('host')), 2)
        self.assertEqual(progress.findtext('count_excluded'), '0')
890
891
    def test_set_get_vts_version(self):
        """A VTs version stored with the setter is returned by the getter."""
        expected_version = '1234'

        daemon = DummyWrapper([])
        daemon.set_vts_version(expected_version)

        self.assertEqual(expected_version, daemon.get_vts_version())
897
898
    def test_set_get_vts_version_error(self):
        """Calling set_vts_version without an argument raises TypeError."""
        daemon = DummyWrapper([])

        with self.assertRaises(TypeError):
            daemon.set_vts_version()
901
902
    @patch("ospd.ospd.os")
    @patch("ospd.command.command.create_process")
    def test_scan_exists(self, mock_create_process, _mock_os):
        """Start a scan, stop it, and start it again with the same scan_id.

        The first start_scan is expected to answer with status_text 'OK'
        and to create and start a (mocked) process. After a stop_scan, a
        second start_scan carrying the existing scan_id is expected to
        answer with status_text 'Continue'.
        """
        daemon = DummyWrapper([])

        # Route process creation through the fake, so that "starting" the
        # process runs the target function via FakeStartProcess.run.
        fake_starter = FakeStartProcess()
        mock_create_process.side_effect = fake_starter
        mock_process = fake_starter.call_mock
        mock_process.start.side_effect = fake_starter.run
        mock_process.is_alive.return_value = True
        mock_process.pid = "main-scan-process"

        first_start_cmd = (
            '<start_scan>'
            '<scanner_params />'
            '<targets><target>'
            '<hosts>localhost</hosts>'
            '<ports>22</ports>'
            '</target></targets>'
            '</start_scan>'
        )
        first_stream = FakeStream()
        daemon.handle_command(first_start_cmd, first_stream)
        first_response = first_stream.get_response()

        scan_id = first_response.findtext('id')
        self.assertIsNotNone(scan_id)
        self.assertEqual(first_response.get('status_text'), 'OK')

        assert_called(mock_create_process)
        assert_called(mock_process.start)

        # The stop_scan reply goes to the stream that was already read; it
        # is not inspected by this test.
        daemon.handle_command(
            '<stop_scan scan_id="%s" />' % scan_id, first_stream
        )

        second_start_cmd = (
            '<start_scan scan_id="' + scan_id + '">'
            '<scanner_params />'
            '<targets><target>'
            '<hosts>localhost</hosts>'
            '<ports>22</ports>'
            '</target></targets>'
            '</start_scan>'
        )
        second_stream = FakeStream()
        daemon.handle_command(second_start_cmd, second_stream)

        second_status = second_stream.get_response().get('status_text')
        self.assertEqual(second_status, 'Continue')
954
955
    def test_result_order(self):
        """Results added one by one are reported in insertion order.

        Three log results named 'a', 'c' and 'b' are added to a scan in
        that order. Every child of scan/results in the get_scans response
        must carry the name found at the same position of that sequence.
        """
        daemon = DummyWrapper([])
        start_scan_cmd = (
            '<start_scan parallel="1">'
            '<scanner_params />'
            '<targets><target>'
            '<hosts>a</hosts>'
            '<ports>22</ports>'
            '</target></targets>'
            '</start_scan>'
        )

        start_stream = FakeStream()
        daemon.handle_command(start_scan_cmd, start_stream)
        scan_id = start_stream.get_response().findtext('id')

        daemon.add_scan_log(scan_id, host='a', name='a')
        daemon.add_scan_log(scan_id, host='c', name='c')
        daemon.add_scan_log(scan_id, host='b', name='b')
        expected_names = ['a', 'c', 'b']

        query_stream = FakeStream()
        daemon.handle_command('<get_scans details="1"/>', query_stream)
        scans_response = query_stream.get_response()

        # The trailing slash selects every child element of scan/results.
        reported = scans_response.findall("scan/results/")

        for position, result_elem in enumerate(reported):
            self.assertEqual(
                expected_names[position], result_elem.attrib['name']
            )
987
988
    def test_batch_result(self):
        """Results added as one ResultList are reported in insertion order.

        Three log results named 'a', 'c' and 'b' are collected in a
        ResultList and attached to the scan with a single
        ``add_result_list`` call. Every child of scan/results in the
        get_scans response must carry the name found at the same position
        of that sequence.
        """
        daemon = DummyWrapper([])
        batch = ResultList()
        start_scan_cmd = (
            '<start_scan parallel="1">'
            '<scanner_params />'
            '<targets><target>'
            '<hosts>a</hosts>'
            '<ports>22</ports>'
            '</target></targets>'
            '</start_scan>'
        )

        start_stream = FakeStream()
        daemon.handle_command(start_scan_cmd, start_stream)
        scan_id = start_stream.get_response().findtext('id')

        # Fill the batch first, then hand it to the scan collection at once.
        batch.add_scan_log_to_list(host='a', name='a')
        batch.add_scan_log_to_list(host='c', name='c')
        batch.add_scan_log_to_list(host='b', name='b')
        daemon.scan_collection.add_result_list(scan_id, batch)

        expected_names = ['a', 'c', 'b']

        query_stream = FakeStream()
        daemon.handle_command('<get_scans details="1"/>', query_stream)
        scans_response = query_stream.get_response()

        # The trailing slash selects every child element of scan/results.
        reported = scans_response.findall("scan/results/")

        for position, result_elem in enumerate(reported):
            self.assertEqual(
                expected_names[position], result_elem.attrib['name']
            )
1022