Passed
Pull Request — master (#24)
by
unknown
03:00
created

tests.testScanAndResult.FullTest.testStopScan()   A

Complexity

Conditions 1

Size

Total Lines 17
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 15
nop 1
dl 0
loc 17
rs 9.65
c 0
b 0
f 0
1
from __future__ import print_function
2
3
4
import time
5
import unittest
6
import xml.etree.ElementTree as ET
7
8
from ospd.ospd import OSPDaemon
9
10
class Result(object):
    """Simple record describing one canned scan result.

    ``type_`` is stored as ``result_type``.  The remaining descriptive
    fields start out as empty strings; any keyword argument is then set
    as an attribute, so it can override a default (``result_type``
    included) or attach an additional attribute.
    """

    def __init__(self, type_, **kwargs):
        self.result_type = type_
        # Blank defaults, assigned in a fixed order before the overrides.
        blank_fields = (
            'host', 'name', 'value', 'port', 'test_id', 'severity', 'qod',
        )
        for field in blank_fields:
            setattr(self, field, '')
        # Caller-supplied values win over the defaults set above.
        for key in kwargs:
            setattr(self, key, kwargs[key])
22
23
class DummyWrapper(OSPDaemon):
    """OSPDaemon subclass for tests that replays a fixed list of results.

    Instead of running a real scanner, ``exec_scan`` walks ``self.results``
    and records each entry through the ``add_scan_*`` method matching its
    ``result_type``.
    """

    def __init__(self, results, checkresult=True):
        """Set up the daemon with placeholder certificate arguments.

        :param results: iterable of objects exposing ``result_type``,
            ``host``, ``name``, ``value``, ``port``, ``test_id``,
            ``severity`` and ``qod`` attributes.
        :param checkresult: value that ``check()`` will return.
        """
        OSPDaemon.__init__(self, 'cert', 'key', 'ca')
        self.checkresult = checkresult
        self.results = results

    def check(self):
        """Return the ``checkresult`` value given at construction."""
        # The attribute is ``checkresult`` (singular); reading
        # ``checkresults`` raised AttributeError.
        return self.checkresult

    def get_custom_vt_as_xml_str(self, custom):
        """Return a fixed XML fragment; ``custom`` is ignored."""
        return '<mytest>static test</mytest>'

    def exec_scan(self, scan_id, target):
        """Record every canned result against ``scan_id``.

        A result without a host of its own is attributed to ``target``.

        :raises ValueError: if a result has an unknown ``result_type``.
        """
        time.sleep(0.01)
        for res in self.results:
            host = res.host or target
            # One single if/elif chain: with a separate ``if`` for 'log',
            # log results also fell into the final ``else`` and raised.
            if res.result_type == 'log':
                self.add_scan_log(
                    scan_id, host, res.name, res.value, res.port)
            elif res.result_type == 'error':
                self.add_scan_error(
                    scan_id, host, res.name, res.value, res.port)
            elif res.result_type == 'host-detail':
                # Host details have their own recorder (no port argument).
                self.add_scan_host_detail(
                    scan_id, host, res.name, res.value)
            elif res.result_type == 'alarm':
                self.add_scan_alarm(
                    scan_id, host, res.name, res.value, res.port,
                    res.test_id, res.severity, res.qod)
            else:
                raise ValueError(res.result_type)
48
49
class FullTest(unittest.TestCase):
    """Drive a DummyWrapper daemon through OSP commands end to end.

    Every test builds its own daemon, sends XML through ``handle_command``
    and parses the reply with ElementTree.  Several tests only print the
    reply; see the TODO below.
    """
    # TODO: There should be a lot more assert in there !

    # Upper bound, in seconds, on how long a test polls for a scan to end.
    SCAN_TIMEOUT = 30

    def testGetDefaultScannerParams(self):
        daemon = DummyWrapper([])
        response = ET.fromstring(
            daemon.handle_command('<get_scanner_details />'))
        # The status of the response must be success (i.e. 200)
        self.assertEqual(response.get('status'), '200')
        # The response root element must have the correct name
        self.assertEqual(response.tag, 'get_scanner_details_response')
        # The response must contain a 'scanner_params' element
        self.assertIsNotNone(response.find('scanner_params'))

    def testGetDefaultHelp(self):
        # Requests the help in its default format and then as XML.
        daemon = DummyWrapper([])
        response = ET.fromstring(daemon.handle_command('<help />'))
        print(ET.tostring(response))
        response = ET.fromstring(
            daemon.handle_command('<help format="xml" />'))
        print(ET.tostring(response))

    def testGetDefaultScannerVersion(self):
        daemon = DummyWrapper([])
        response = ET.fromstring(daemon.handle_command('<get_version />'))
        print(ET.tostring(response))

    def testGetVTs_no_VT(self):
        daemon = DummyWrapper([])
        response = ET.fromstring(daemon.handle_command('<get_vts />'))
        print(ET.tostring(response))

    def testGetVTs_single_VT(self):
        daemon = DummyWrapper([])
        daemon.add_vt('1.2.3.4', 'A vulnerability test')
        response = ET.fromstring(daemon.handle_command('<get_vts />'))
        print(ET.tostring(response))

    def testGetVTs_multiple_VTs(self):
        daemon = DummyWrapper([])
        daemon.add_vt('1.2.3.4', 'A vulnerability test')
        daemon.add_vt('some id', 'Another vulnerability test')
        daemon.add_vt('123456789', 'Yet another vulnerability test')
        response = ET.fromstring(daemon.handle_command('<get_vts />'))
        print(ET.tostring(response))

    def testGetVTs_multiple_VTs_with_custom(self):
        # The second VT carries a custom-data dict as third argument.
        daemon = DummyWrapper([])
        daemon.add_vt('1.2.3.4', 'A vulnerability test')
        daemon.add_vt(
            'some id',
            'Another vulnerability test with custom info',
            {'depencency': '1.2.3.4'})
        daemon.add_vt('123456789', 'Yet another vulnerability test')
        response = ET.fromstring(daemon.handle_command('<get_vts />'))
        print(ET.tostring(response))

    def testiScanWithError(self):
        # Starts a scan whose only result is an error, polls until it
        # reports 100% progress, then reads the result and deletes the scan.
        daemon = DummyWrapper([
            Result('error', value='something went wrong'),
        ])

        response = ET.fromstring(daemon.handle_command(
            '<start_scan target="localhost" ports="80, 443">'
            '<scanner_params /></start_scan>'))
        print(ET.tostring(response))
        scan_id = response.findtext('id')

        # Bound the polling so a scan that never completes fails the test
        # instead of hanging the whole run.
        deadline = time.time() + self.SCAN_TIMEOUT
        finished = False
        while not finished:
            response = ET.fromstring(daemon.handle_command(
                '<get_scans scan_id="%s" details="0"/>' % scan_id))
            print(ET.tostring(response))
            scans = response.findall('scan')
            self.assertEqual(1, len(scans))
            scan = scans[0]
            if int(scan.get('progress')) != 100:
                # A scan still in progress must not have an end time yet.
                self.assertEqual('0', scan.get('end_time'))
                self.assertLess(
                    time.time(), deadline,
                    'scan %s did not reach 100%% progress within %s seconds'
                    % (scan_id, self.SCAN_TIMEOUT))
                time.sleep(.010)
            else:
                finished = True

        response = ET.fromstring(daemon.handle_command(
            '<get_scans scan_id="%s"/>' % scan_id))
        print(ET.tostring(response))
        response = ET.fromstring(daemon.handle_command('<get_scans />'))
        print(ET.tostring(response))
        response = ET.fromstring(daemon.handle_command(
            '<get_scans scan_id="%s" details="1"/>' % scan_id))
        self.assertEqual(
            response.findtext('scan/results/result'),
            'something went wrong')
        print(ET.tostring(response))

        response = ET.fromstring(daemon.handle_command(
            '<delete_scan scan_id="%s" />' % scan_id))
        self.assertEqual(response.get('status'), '200')
        print(ET.tostring(response))

    def testStopScan(self):
        # Stops a running scan twice: once through the Python API and once
        # through the <stop_scan> command.
        daemon = DummyWrapper([])
        response = ET.fromstring(daemon.handle_command(
            '<start_scan '
            'target="localhost" ports="80, 443">'
            '<scanner_params /></start_scan>'))
        print(ET.tostring(response))
        scan_id = response.findtext('id')
        time.sleep(0.01)

        response = daemon.stop_scan(scan_id)
        self.assertIsNone(response)

        response = ET.fromstring(daemon.handle_command(
            '<stop_scan scan_id="%s" />' % scan_id))
        self.assertEqual(response.get('status'), '200')
        print(ET.tostring(response))
150