Passed
Pull Request — master (#211)
by Juan José
01:24
created

FakeStartProcess.__call__()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 5
nop 5
dl 0
loc 5
rs 10
c 0
b 0
f 0
1
# Copyright (C) 2015-2019 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: GPL-2.0-or-later
4
#
5
# This program is free software; you can redistribute it and/or
6
# modify it under the terms of the GNU General Public License
7
# as published by the Free Software Foundation; either version 2
8
# of the License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU General Public License for more details.
14
#
15
# You should have received a copy of the GNU General Public License
16
# along with this program; if not, write to the Free Software
17
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
19
# pylint: disable=too-many-lines
20
21
""" Test module for scan runs
22
"""
23
24
import time
25
import unittest
26
27
from unittest.mock import patch, MagicMock
28
29
import xml.etree.ElementTree as ET
30
import defusedxml.lxml as secET
31
32
from defusedxml.common import EntitiesForbidden
33
34
from .helper import DummyWrapper, assert_called, FakeStream
35
36
37
class FakeStartProcess:
38
    def __init__(self):
39
        self.run_mock = MagicMock()
40
        self.call_mock = MagicMock()
41
42
        self.func = None
43
        self.args = None
44
        self.kwargs = None
45
46
    def __call__(self, func, *, args=None, kwargs=None):
47
        self.func = func
48
        self.args = args or []
49
        self.kwargs = kwargs or {}
50
        return self.call_mock
51
52
    def run(self):
53
        self.func(*self.args, **self.kwargs)
54
        return self.run_mock
55
56
    def __repr__(self):
57
        return "<FakeProcess func={} args={} kwargs={}>".format(
58
            self.func, self.args, self.kwargs
59
        )
60
61
62
class Result(object):
63
    def __init__(self, type_, **kwargs):
64
        self.result_type = type_
65
        self.host = ''
66
        self.hostname = ''
67
        self.name = ''
68
        self.value = ''
69
        self.port = ''
70
        self.test_id = ''
71
        self.severity = ''
72
        self.qod = ''
73
        for name, value in kwargs.items():
74
            setattr(self, name, value)
75
76
77
class ScanTestCase(unittest.TestCase):
78
    def test_get_default_scanner_params(self):
79
        daemon = DummyWrapper([])
80
        fs = FakeStream()
81
82
        daemon.handle_command('<get_scanner_details />', fs)
83
        response = fs.get_response()
84
85
        # The status of the response must be success (i.e. 200)
86
        self.assertEqual(response.get('status'), '200')
87
        # The response root element must have the correct name
88
        self.assertEqual(response.tag, 'get_scanner_details_response')
89
        # The response must contain a 'scanner_params' element
90
        self.assertIsNotNone(response.find('scanner_params'))
91
92
    def test_get_default_help(self):
93
        daemon = DummyWrapper([])
94
        fs = FakeStream()
95
96
        daemon.handle_command('<help />', fs)
97
        response = fs.get_response()
98
        self.assertEqual(response.get('status'), '200')
99
100
        fs = FakeStream()
101
        daemon.handle_command('<help format="xml" />', fs)
102
        response = fs.get_response()
103
104
        self.assertEqual(response.get('status'), '200')
105
        self.assertEqual(response.tag, 'help_response')
106
107
    def test_get_default_scanner_version(self):
108
        daemon = DummyWrapper([])
109
        fs = FakeStream()
110
        daemon.handle_command('<get_version />', fs)
111
        response = fs.get_response()
112
113
        self.assertEqual(response.get('status'), '200')
114
        self.assertIsNotNone(response.find('protocol'))
115
116
    def test_get_vts_no_vt(self):
117
        daemon = DummyWrapper([])
118
        fs = FakeStream()
119
120
        daemon.handle_command('<get_vts />', fs)
121
        response = fs.get_response()
122
123
        self.assertEqual(response.get('status'), '200')
124
        self.assertIsNotNone(response.find('vts'))
125
126
    def test_get_vts_single_vt(self):
127
        daemon = DummyWrapper([])
128
        fs = FakeStream()
129
        daemon.add_vt('1.2.3.4', 'A vulnerability test')
130
        daemon.handle_command('<get_vts />', fs)
131
        response = fs.get_response()
132
133
        self.assertEqual(response.get('status'), '200')
134
135
        vts = response.find('vts')
136
        self.assertIsNotNone(vts.find('vt'))
137
138
        vt = vts.find('vt')
139
        self.assertEqual(vt.get('id'), '1.2.3.4')
140
141 View Code Duplication
    def test_get_vts_filter_positive(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
142
        daemon = DummyWrapper([])
143
        daemon.add_vt(
144
            '1.2.3.4',
145
            'A vulnerability test',
146
            vt_params="a",
147
            vt_modification_time='19000202',
148
        )
149
        fs = FakeStream()
150
151
        daemon.handle_command(
152
            '<get_vts filter="modification_time&gt;19000201"></get_vts>', fs
153
        )
154
        response = fs.get_response()
155
156
        self.assertEqual(response.get('status'), '200')
157
        vts = response.find('vts')
158
159
        vt = vts.find('vt')
160
        self.assertIsNotNone(vt)
161
        self.assertEqual(vt.get('id'), '1.2.3.4')
162
163
        modification_time = response.findall('vts/vt/modification_time')
164
        self.assertEqual(
165
            '<modification_time>19000202</modification_time>',
166
            ET.tostring(modification_time[0]).decode('utf-8'),
167
        )
168
169 View Code Duplication
    def test_get_vts_filter_negative(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
170
        daemon = DummyWrapper([])
171
        daemon.add_vt(
172
            '1.2.3.4',
173
            'A vulnerability test',
174
            vt_params="a",
175
            vt_modification_time='19000202',
176
        )
177
        fs = FakeStream()
178
        daemon.handle_command(
179
            '<get_vts filter="modification_time&lt;19000203"></get_vts>', fs,
180
        )
181
        response = fs.get_response()
182
183
        self.assertEqual(response.get('status'), '200')
184
185
        vts = response.find('vts')
186
187
        vt = vts.find('vt')
188
        self.assertIsNotNone(vt)
189
        self.assertEqual(vt.get('id'), '1.2.3.4')
190
191
        modification_time = response.findall('vts/vt/modification_time')
192
        self.assertEqual(
193
            '<modification_time>19000202</modification_time>',
194
            ET.tostring(modification_time[0]).decode('utf-8'),
195
        )
196
197
    def test_get_vtss_multiple_vts(self):
198
        daemon = DummyWrapper([])
199
        daemon.add_vt('1.2.3.4', 'A vulnerability test')
200
        daemon.add_vt('1.2.3.5', 'Another vulnerability test')
201
        daemon.add_vt('123456789', 'Yet another vulnerability test')
202
203
        fs = FakeStream()
204
205
        daemon.handle_command('<get_vts />', fs)
206
        response = fs.get_response()
207
        self.assertEqual(response.get('status'), '200')
208
209
        vts = response.find('vts')
210
        self.assertIsNotNone(vts.find('vt'))
211
212
    def test_get_vts_multiple_vts_with_custom(self):
213
        daemon = DummyWrapper([])
214
        daemon.add_vt('1.2.3.4', 'A vulnerability test', custom='b')
215
        daemon.add_vt(
216
            '4.3.2.1', 'Another vulnerability test with custom info', custom='b'
217
        )
218
        daemon.add_vt('123456789', 'Yet another vulnerability test', custom='b')
219
        fs = FakeStream()
220
221
        daemon.handle_command('<get_vts />', fs)
222
        response = fs.get_response()
223
224
        custom = response.findall('vts/vt/custom')
225
226
        self.assertEqual(3, len(custom))
227
228 View Code Duplication
    def test_get_vts_vts_with_params(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
229
        daemon = DummyWrapper([])
230
        daemon.add_vt(
231
            '1.2.3.4', 'A vulnerability test', vt_params="a", custom="b"
232
        )
233
        fs = FakeStream()
234
235
        daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
236
        response = fs.get_response()
237
238
        # The status of the response must be success (i.e. 200)
239
        self.assertEqual(response.get('status'), '200')
240
241
        # The response root element must have the correct name
242
        self.assertEqual(response.tag, 'get_vts_response')
243
        # The response must contain a 'scanner_params' element
244
        self.assertIsNotNone(response.find('vts'))
245
246
        vt_params = response[0][0].findall('params')
247
        self.assertEqual(1, len(vt_params))
248
249
        custom = response[0][0].findall('custom')
250
        self.assertEqual(1, len(custom))
251
252
        params = response.findall('vts/vt/params/param')
253
        self.assertEqual(2, len(params))
254
255 View Code Duplication
    def test_get_vts_vts_with_refs(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
256
        daemon = DummyWrapper([])
257
        daemon.add_vt(
258
            '1.2.3.4',
259
            'A vulnerability test',
260
            vt_params="a",
261
            custom="b",
262
            vt_refs="c",
263
        )
264
        fs = FakeStream()
265
266
        daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
267
        response = fs.get_response()
268
269
        # The status of the response must be success (i.e. 200)
270
        self.assertEqual(response.get('status'), '200')
271
272
        # The response root element must have the correct name
273
        self.assertEqual(response.tag, 'get_vts_response')
274
275
        # The response must contain a 'vts' element
276
        self.assertIsNotNone(response.find('vts'))
277
278
        vt_params = response[0][0].findall('params')
279
        self.assertEqual(1, len(vt_params))
280
281
        custom = response[0][0].findall('custom')
282
        self.assertEqual(1, len(custom))
283
284
        refs = response.findall('vts/vt/refs/ref')
285
        self.assertEqual(2, len(refs))
286
287
    def test_get_vts_vts_with_dependencies(self):
288
        daemon = DummyWrapper([])
289
        daemon.add_vt(
290
            '1.2.3.4',
291
            'A vulnerability test',
292
            vt_params="a",
293
            custom="b",
294
            vt_dependencies="c",
295
        )
296
        fs = FakeStream()
297
298
        daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
299
300
        response = fs.get_response()
301
302
        deps = response.findall('vts/vt/dependencies/dependency')
303
        self.assertEqual(2, len(deps))
304
305
    def test_get_vts_vts_with_severities(self):
306
        daemon = DummyWrapper([])
307
        daemon.add_vt(
308
            '1.2.3.4',
309
            'A vulnerability test',
310
            vt_params="a",
311
            custom="b",
312
            severities="c",
313
        )
314
        fs = FakeStream()
315
316
        daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
317
        response = fs.get_response()
318
319
        severity = response.findall('vts/vt/severities/severity')
320
        self.assertEqual(1, len(severity))
321
322
    def test_get_vts_vts_with_detection_qodt(self):
323
        daemon = DummyWrapper([])
324
        daemon.add_vt(
325
            '1.2.3.4',
326
            'A vulnerability test',
327
            vt_params="a",
328
            custom="b",
329
            detection="c",
330
            qod_t="d",
331
        )
332
        fs = FakeStream()
333
334
        daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
335
        response = fs.get_response()
336
337
        detection = response.findall('vts/vt/detection')
338
        self.assertEqual(1, len(detection))
339
340
    def test_get_vts_vts_with_detection_qodv(self):
341
        daemon = DummyWrapper([])
342
        daemon.add_vt(
343
            '1.2.3.4',
344
            'A vulnerability test',
345
            vt_params="a",
346
            custom="b",
347
            detection="c",
348
            qod_v="d",
349
        )
350
        fs = FakeStream()
351
352
        daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
353
        response = fs.get_response()
354
355
        detection = response.findall('vts/vt/detection')
356
        self.assertEqual(1, len(detection))
357
358
    def test_get_vts_vts_with_summary(self):
359
        daemon = DummyWrapper([])
360
        daemon.add_vt(
361
            '1.2.3.4',
362
            'A vulnerability test',
363
            vt_params="a",
364
            custom="b",
365
            summary="c",
366
        )
367
        fs = FakeStream()
368
369
        daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
370
        response = fs.get_response()
371
372
        summary = response.findall('vts/vt/summary')
373
        self.assertEqual(1, len(summary))
374
375
    def test_get_vts_vts_with_impact(self):
376
        daemon = DummyWrapper([])
377
        daemon.add_vt(
378
            '1.2.3.4',
379
            'A vulnerability test',
380
            vt_params="a",
381
            custom="b",
382
            impact="c",
383
        )
384
        fs = FakeStream()
385
386
        daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
387
        response = fs.get_response()
388
389
        impact = response.findall('vts/vt/impact')
390
        self.assertEqual(1, len(impact))
391
392
    def test_get_vts_vts_with_affected(self):
393
        daemon = DummyWrapper([])
394
        daemon.add_vt(
395
            '1.2.3.4',
396
            'A vulnerability test',
397
            vt_params="a",
398
            custom="b",
399
            affected="c",
400
        )
401
        fs = FakeStream()
402
403
        daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
404
        response = fs.get_response()
405
406
        affect = response.findall('vts/vt/affected')
407
        self.assertEqual(1, len(affect))
408
409
    def test_get_vts_vts_with_insight(self):
410
        daemon = DummyWrapper([])
411
        daemon.add_vt(
412
            '1.2.3.4',
413
            'A vulnerability test',
414
            vt_params="a",
415
            custom="b",
416
            insight="c",
417
        )
418
        fs = FakeStream()
419
420
        daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
421
        response = fs.get_response()
422
423
        insight = response.findall('vts/vt/insight')
424
        self.assertEqual(1, len(insight))
425
426
    def test_get_vts_vts_with_solution(self):
427
        daemon = DummyWrapper([])
428
        daemon.add_vt(
429
            '1.2.3.4',
430
            'A vulnerability test',
431
            vt_params="a",
432
            custom="b",
433
            solution="c",
434
            solution_t="d",
435
            solution_m="e",
436
        )
437
        fs = FakeStream()
438
439
        daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
440
        response = fs.get_response()
441
442
        solution = response.findall('vts/vt/solution')
443
        self.assertEqual(1, len(solution))
444
445
    def test_get_vts_vts_with_ctime(self):
446
        daemon = DummyWrapper([])
447
        daemon.add_vt(
448
            '1.2.3.4',
449
            'A vulnerability test',
450
            vt_params="a",
451
            vt_creation_time='01-01-1900',
452
        )
453
        fs = FakeStream()
454
455
        daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
456
        response = fs.get_response()
457
458
        creation_time = response.findall('vts/vt/creation_time')
459
        self.assertEqual(
460
            '<creation_time>01-01-1900</creation_time>',
461
            ET.tostring(creation_time[0]).decode('utf-8'),
462
        )
463
464
    def test_get_vts_vts_with_mtime(self):
465
        daemon = DummyWrapper([])
466
        daemon.add_vt(
467
            '1.2.3.4',
468
            'A vulnerability test',
469
            vt_params="a",
470
            vt_modification_time='02-01-1900',
471
        )
472
        fs = FakeStream()
473
474
        daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', fs)
475
        response = fs.get_response()
476
477
        modification_time = response.findall('vts/vt/modification_time')
478
        self.assertEqual(
479
            '<modification_time>02-01-1900</modification_time>',
480
            ET.tostring(modification_time[0]).decode('utf-8'),
481
        )
482
483
    def test_clean_forgotten_scans(self):
484
        daemon = DummyWrapper([])
485
        fs = FakeStream()
486
487
        daemon.handle_command(
488
            '<start_scan target="localhost" ports="80, '
489
            '443"><scanner_params /></start_scan>',
490
            fs,
491
        )
492
        response = fs.get_response()
493
494
        scan_id = response.findtext('id')
495
496
        finished = False
497
498 View Code Duplication
        while not finished:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
499
            fs = FakeStream()
500
            daemon.handle_command(
501
                '<get_scans scan_id="%s" details="1"/>' % scan_id, fs
502
            )
503
            response = fs.get_response()
504
505
            scans = response.findall('scan')
506
            self.assertEqual(1, len(scans))
507
508
            scan = scans[0]
509
            status = scan.get('status')
510
511
            if status == "init" or status == "running":
512
                self.assertEqual('0', scan.get('end_time'))
513
                time.sleep(0.010)
514
            else:
515
                finished = True
516
517
            fs = FakeStream()
518
            daemon.handle_command(
519
                '<get_scans scan_id="%s" details="1"/>' % scan_id, fs
520
            )
521
            response = fs.get_response()
522
523
        self.assertEqual(len(list(daemon.scan_collection.ids_iterator())), 1)
524
525
        # Set an old end_time
526
        daemon.scan_collection.scans_table[scan_id]['end_time'] = 123456
527
        # Run the check
528
        daemon.clean_forgotten_scans()
529
        # Not removed
530
        self.assertEqual(len(list(daemon.scan_collection.ids_iterator())), 1)
531
532
        # Set the max time and run again
533
        daemon.scaninfo_store_time = 1
534
        daemon.clean_forgotten_scans()
535
        # Now is removed
536
        self.assertEqual(len(list(daemon.scan_collection.ids_iterator())), 0)
537
538
    def test_scan_with_error(self):
539
        daemon = DummyWrapper([Result('error', value='something went wrong')])
540
        fs = FakeStream()
541
542
        daemon.handle_command(
543
            '<start_scan target="localhost" ports="80, '
544
            '443"><scanner_params /></start_scan>',
545
            fs,
546
        )
547
        response = fs.get_response()
548
        scan_id = response.findtext('id')
549
        print(scan_id)
550
        finished = False
551 View Code Duplication
        while not finished:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
552
            fs = FakeStream()
553
            daemon.handle_command(
554
                '<get_scans scan_id="%s" details="1"/>' % scan_id, fs
555
            )
556
            response = fs.get_response()
557
558
            scans = response.findall('scan')
559
            self.assertEqual(1, len(scans))
560
561
            scan = scans[0]
562
            status = scan.get('status')
563
564
            if status == "init" or status == "running":
565
                self.assertEqual('0', scan.get('end_time'))
566
                time.sleep(0.010)
567
            else:
568
                finished = True
569
570
            fs = FakeStream()
571
572
            daemon.handle_command(
573
                '<get_scans scan_id="%s" details="1"/>' % scan_id, fs
574
            )
575
            response = fs.get_response()
576
577
        self.assertEqual(
578
            response.findtext('scan/results/result'), 'something went wrong'
579
        )
580
        fs = FakeStream()
581
        daemon.handle_command('<delete_scan scan_id="%s" />' % scan_id, fs)
582
        response = fs.get_response()
583
584
        self.assertEqual(response.get('status'), '200')
585
586 View Code Duplication
    def test_get_scan_pop(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
587
        daemon = DummyWrapper([Result('host-detail', value='Some Host Detail')])
588
        fs = FakeStream()
589
590
        daemon.handle_command(
591
            '<start_scan target="localhost" ports="80, 443">'
592
            '<scanner_params /></start_scan>',
593
            fs,
594
        )
595
        response = fs.get_response()
596
597
        scan_id = response.findtext('id')
598
        time.sleep(1)
599
600
        fs = FakeStream()
601
        daemon.handle_command('<get_scans scan_id="%s"/>' % scan_id, fs)
602
        response = fs.get_response()
603
604
        self.assertEqual(
605
            response.findtext('scan/results/result'), 'Some Host Detail'
606
        )
607
        fs = FakeStream()
608
        daemon.handle_command(
609
            '<get_scans scan_id="%s" pop_results="1"/>' % scan_id, fs
610
        )
611
        response = fs.get_response()
612
613
        self.assertEqual(
614
            response.findtext('scan/results/result'), 'Some Host Detail'
615
        )
616
617
        fs = FakeStream()
618
        daemon.handle_command('<get_scans details="0" pop_results="1"/>', fs)
619
        response = fs.get_response()
620
621
        self.assertEqual(response.findtext('scan/results/result'), None)
622
623 View Code Duplication
    def test_get_scan_pop_max_res(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
624
        daemon = DummyWrapper(
625
            [
626
                Result('host-detail', value='Some Host Detail'),
627
                Result('host-detail', value='Some Host Detail1'),
628
                Result('host-detail', value='Some Host Detail2'),
629
            ]
630
        )
631
        fs = FakeStream()
632
        daemon.handle_command(
633
            '<start_scan target="localhost" ports="80, 443">'
634
            '<scanner_params /></start_scan>',
635
            fs,
636
        )
637
        response = fs.get_response()
638
639
        scan_id = response.findtext('id')
640
        time.sleep(1)
641
642
        fs = FakeStream()
643
        daemon.handle_command(
644
            '<get_scans scan_id="%s" pop_results="1" max_results="1"/>'
645
            % scan_id,
646
            fs,
647
        )
648
        response = fs.get_response()
649
650
        self.assertEqual(len(response.findall('scan/results/result')), 1)
651
652
        fs = FakeStream()
653
        daemon.handle_command(
654
            '<get_scans scan_id="%s" pop_results="1"/>' % scan_id, fs
655
        )
656
        response = fs.get_response()
657
658
        self.assertEqual(len(response.findall('scan/results/result')), 2)
659
660
    def test_billon_laughs(self):
661
        # pylint: disable=line-too-long
662
        daemon = DummyWrapper([])
663
        lol = (
664
            '<?xml version="1.0"?>'
665
            '<!DOCTYPE lolz ['
666
            ' <!ENTITY lol "lol">'
667
            ' <!ELEMENT lolz (#PCDATA)>'
668
            ' <!ENTITY lol1 "&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;">'
669
            ' <!ENTITY lol2 "&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;">'
670
            ' <!ENTITY lol3 "&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;">'
671
            ' <!ENTITY lol4 "&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;">'
672
            ' <!ENTITY lol5 "&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;">'
673
            ' <!ENTITY lol6 "&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;">'
674
            ' <!ENTITY lol7 "&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;">'
675
            ' <!ENTITY lol8 "&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;">'
676
            ' <!ENTITY lol9 "&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;">'
677
            ']>'
678
        )
679
        fs = FakeStream()
680
        self.assertRaises(EntitiesForbidden, daemon.handle_command, lol, fs)
681
682
    def test_target_with_credentials(self):
683
        daemon = DummyWrapper([])
684
        fs = FakeStream()
685
        daemon.handle_command(
686
            '<start_scan>'
687
            '<scanner_params /><vts><vt id="1.2.3.4" />'
688
            '</vts>'
689
            '<targets><target>'
690
            '<hosts>192.168.0.0/24</hosts><ports>22'
691
            '</ports><credentials>'
692
            '<credential type="up" service="ssh" port="22">'
693
            '<username>scanuser</username>'
694
            '<password>mypass</password>'
695
            '</credential><credential type="up" service="smb">'
696
            '<username>smbuser</username>'
697
            '<password>mypass</password></credential>'
698
            '</credentials>'
699
            '</target></targets>'
700
            '</start_scan>',
701
            fs,
702
        )
703
        response = fs.get_response()
704
705
        self.assertEqual(response.get('status'), '200')
706
707
        cred_dict = {
708
            'ssh': {
709
                'type': 'up',
710
                'password': 'mypass',
711
                'port': '22',
712
                'username': 'scanuser',
713
            },
714
            'smb': {'type': 'up', 'password': 'mypass', 'username': 'smbuser'},
715
        }
716
        scan_id = response.findtext('id')
717
        response = daemon.get_scan_credentials(scan_id)
718
        self.assertEqual(response, cred_dict)
719
720
    def test_scan_get_target(self):
721
        daemon = DummyWrapper([])
722
        fs = FakeStream()
723
        daemon.handle_command(
724
            '<start_scan>'
725
            '<scanner_params /><vts><vt id="1.2.3.4" />'
726
            '</vts>'
727
            '<targets><target>'
728
            '<hosts>localhosts,192.168.0.0/24</hosts>'
729
            '<ports>80,443</ports>'
730
            '</target></targets>'
731
            '</start_scan>',
732
            fs,
733
        )
734
        response = fs.get_response()
735
        scan_id = response.findtext('id')
736
737
        fs = FakeStream()
738
        daemon.handle_command('<get_scans scan_id="%s"/>' % scan_id, fs)
739
        response = fs.get_response()
740
741
        scan_res = response.find('scan')
742
        self.assertEqual(scan_res.get('target'), 'localhosts,192.168.0.0/24')
743
744
    def test_scan_get_target_options(self):
745
        daemon = DummyWrapper([])
746
        fs = FakeStream()
747
        daemon.handle_command(
748
            '<start_scan>'
749
            '<scanner_params /><vts><vt id="1.2.3.4" />'
750
            '</vts>'
751
            '<targets>'
752
            '<target><hosts>192.168.0.1</hosts>'
753
            '<ports>22</ports><alive_test>0</alive_test></target>'
754
            '</targets>'
755
            '</start_scan>',
756
            fs,
757
        )
758
        response = fs.get_response()
759
760
        scan_id = response.findtext('id')
761
        time.sleep(1)
762
        target_options = daemon.get_scan_target_options(scan_id)
763
        self.assertEqual(target_options, {'alive_test': '0'})
764
765
    def test_scan_get_finished_hosts(self):
766
        daemon = DummyWrapper([])
767
        fs = FakeStream()
768
        daemon.handle_command(
769
            '<start_scan>'
770
            '<scanner_params /><vts><vt id="1.2.3.4" />'
771
            '</vts>'
772
            '<targets><target>'
773
            '<hosts>192.168.10.20-25</hosts>'
774
            '<ports>80,443</ports>'
775
            '<finished_hosts>192.168.10.23-24'
776
            '</finished_hosts>'
777
            '</target>'
778
            '<target><hosts>192.168.0.0/24</hosts>'
779
            '<ports>22</ports></target>'
780
            '</targets>'
781
            '</start_scan>',
782
            fs,
783
        )
784
        response = fs.get_response()
785
786
        scan_id = response.findtext('id')
787
        time.sleep(1)
788
        finished = daemon.get_scan_finished_hosts(scan_id)
789
        self.assertEqual(finished, ['192.168.10.23', '192.168.10.24'])
790
791
    def test_progress(self):
792
        daemon = DummyWrapper([])
793
794
        fs = FakeStream()
795
        daemon.handle_command(
796
            '<start_scan parallel="2">'
797
            '<scanner_params />'
798
            '<targets><target>'
799
            '<hosts>localhost1, localhost2</hosts>'
800
            '<ports>22</ports>'
801
            '</target></targets>'
802
            '</start_scan>',
803
            fs,
804
        )
805
        response = fs.get_response()
806
807
        scan_id = response.findtext('id')
808
809
        daemon.set_scan_host_progress(scan_id, 'localhost1', 75)
810
        daemon.set_scan_host_progress(scan_id, 'localhost2', 25)
811
812
        self.assertEqual(daemon.calculate_progress(scan_id), 50)
813
814
    def test_set_get_vts_version(self):
815
        daemon = DummyWrapper([])
816
        daemon.set_vts_version('1234')
817
818
        version = daemon.get_vts_version()
819
        self.assertEqual('1234', version)
820
821
    def test_set_get_vts_version_error(self):
822
        daemon = DummyWrapper([])
823
        self.assertRaises(TypeError, daemon.set_vts_version)
824
825
    @patch("ospd.ospd.os")
826
    @patch("ospd.command.command.create_process")
827
    def test_resume_task(self, mock_create_process, _mock_os):
828
        daemon = DummyWrapper(
829
            [
830
                Result(
831
                    'host-detail', host='localhost', value='Some Host Detail'
832
                ),
833
                Result(
834
                    'host-detail', host='localhost', value='Some Host Detail2'
835
                ),
836
            ]
837
        )
838
839
        fp = FakeStartProcess()
840
        mock_create_process.side_effect = fp
841
        mock_process = fp.call_mock
842
        mock_process.start.side_effect = fp.run
843
        mock_process.is_alive.return_value = True
844
        mock_process.pid = "main-scan-process"
845
846
        fs = FakeStream()
847
        daemon.handle_command(
848
            '<start_scan>'
849
            '<scanner_params />'
850
            '<targets><target>'
851
            '<hosts>localhost</hosts>'
852
            '<ports>22</ports>'
853
            '</target></targets>'
854
            '</start_scan>',
855
            fs,
856
        )
857
        response = fs.get_response()
858
        scan_id = response.findtext('id')
859
860
        self.assertIsNotNone(scan_id)
861
862
        assert_called(mock_create_process)
863
        assert_called(mock_process.start)
864
865
        fs = FakeStream()
866
        daemon.handle_command('<stop_scan scan_id="%s" />' % scan_id, fs)
867
868
        fs = FakeStream()
869
        daemon.handle_command(
870
            '<get_scans scan_id="%s" details="1"/>' % scan_id, fs
871
        )
872
        response = fs.get_response()
873
874
        result = response.findall('scan/results/result')
875
        self.assertEqual(len(result), 2)
876
877
        # Resume the task
878
        cmd = (
879
            '<start_scan scan_id="{}" target="localhost" ports="80, 443">'
880
            '<scanner_params />'
881
            '</start_scan>'.format(scan_id)
882
        )
883
        fs = FakeStream()
884
        daemon.handle_command(cmd, fs)
885
        response = fs.get_response()
886
887
        # Check unfinished host
888
        self.assertEqual(response.findtext('id'), scan_id)
889
        self.assertEqual(
890
            daemon.get_scan_unfinished_hosts(scan_id), ['localhost']
891
        )
892
893
        # Finished the host and check unfinished again.
894
        daemon.set_scan_host_finished(scan_id, "localhost")
895
        self.assertEqual(len(daemon.get_scan_unfinished_hosts(scan_id)), 0)
896
897
        # Check finished hosts
898
        self.assertEqual(
899
            daemon.scan_collection.get_hosts_finished(scan_id), ['localhost']
900
        )
901
902
        # Check if the result was removed.
903
        fs = FakeStream()
904
        daemon.handle_command(
905
            '<get_scans scan_id="%s" details="1"/>' % scan_id, fs
906
        )
907
        response = fs.get_response()
908
        result = response.findall('scan/results/result')
909
910
        # current the response still contains the results
911
        # self.assertEqual(len(result), 0)
912
913
    def test_result_order(self):
914
        daemon = DummyWrapper([])
915
        fs = FakeStream()
916
        daemon.handle_command(
917
            '<start_scan parallel="1">'
918
            '<scanner_params />'
919
            '<targets><target>'
920
            '<hosts>a</hosts>'
921
            '<ports>22</ports>'
922
            '</target></targets>'
923
            '</start_scan>',
924
            fs,
925
        )
926
927
        response = fs.get_response()
928
929
        scan_id = response.findtext('id')
930
931
        daemon.add_scan_log(scan_id, host='a', name='a')
932
        daemon.add_scan_log(scan_id, host='c', name='c')
933
        daemon.add_scan_log(scan_id, host='b', name='b')
934
        hosts = ['a', 'c', 'b']
935
936
        fs = FakeStream()
937
        daemon.handle_command('<get_scans details="1"/>', fs)
938
        response = fs.get_response()
939
940
        results = response.findall("scan/results/")
941
942
        for idx, res in enumerate(results):
943
            att_dict = res.attrib
944
            self.assertEqual(hosts[idx], att_dict['name'])
945