Test Failed
Push — master ( b41a4b...a8c94d )
by
unknown
26s
created

DummyWrapper.get_params_vt_as_xml_str()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 2
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
from __future__ import print_function
2
3
4
import time
5
import unittest
6
import xml.etree.ElementTree as ET
7
8
from ospd.ospd import OSPDaemon, OSPDError
9
10
class Result(object):
11
    def __init__(self, type_, **kwargs):
12
        self.result_type=type_
13
        self.host=''
14
        self.name=''
15
        self.value=''
16
        self.port=''
17
        self.test_id=''
18
        self.severity=''
19
        self.qod=''
20
        for name, value in kwargs.items():
21
            setattr(self, name, value)
22
23
class DummyWrapper(OSPDaemon):
24
    def __init__(self, results, checkresult=True):
25
        OSPDaemon.__init__(self, 'cert', 'key', 'ca')
26
        self.checkresult = checkresult
27
        self.results = results
28
29
    def check(self):
30
        return self.checkresults
31
32
    def get_custom_vt_as_xml_str(self, custom):
33
        return '<mytest>static test</mytest>'
34
35
    def get_params_vt_as_xml_str(self, vt_param):
36
        return ('<vt_param id="abc" type="string">'
37
                '<name>ABC</name><description>Test ABC</description><default>yes</default>'
38
                '</vt_param>'
39
                '<vt_param id="def" type="string">'
40
                '<name>DEF</name><description>Test DEF</description><default>no</default>'
41
                '</vt_param>')
42
43
    def exec_scan(self, scan_id, target):
44
        time.sleep(0.01)
45
        for res in self.results:
46
            if res.result_type=='log':
47
                self.add_scan_log(scan_id, res.host or target, res.name, res.value, res.port)
48
            if res.result_type == 'error':
49
                self.add_scan_error(scan_id, res.host or target, res.name, res.value, res.port)
50
            elif res.result_type == 'host-detail':
51
                self.add_scan_error(scan_id, res.host  or target, res.name, res.value)
52
            elif res.result_type == 'alarm':
53
                self.add_scan_alarm(scan_id, res.host or target, res.name, res.value, res.port, res.test_id, res.severity, res.qod)
54
            else:
55
                raise ValueError(res.result_type)
56
57
class FullTest(unittest.TestCase):
58
    # TODO: There should be a lot more assert in there !
59
60
    def testGetDefaultScannerParams(self):
61
        daemon = DummyWrapper([])
62
        response = ET.fromstring(daemon.handle_command('<get_scanner_details />'))
63
        # The status of the response must be success (i.e. 200)
64
        self.assertEqual(response.get('status'), '200')
65
        # The response root element must have the correct name
66
        self.assertEqual(response.tag, 'get_scanner_details_response')
67
        # The response must contain a 'scanner_params' element
68
        self.assertIsNotNone(response.find('scanner_params'))
69
70
    def testGetDefaultHelp(self):
71
        daemon = DummyWrapper([])
72
        response = ET.fromstring(daemon.handle_command('<help />'))
73
        print(ET.tostring(response))
74
        response = ET.fromstring(daemon.handle_command('<help format="xml" />'))
75
        print(ET.tostring(response))
76
77
    def testGetDefaultScannerVersion(self):
78
        daemon = DummyWrapper([])
79
        response = ET.fromstring(daemon.handle_command('<get_version />'))
80
        print(ET.tostring(response))
81
82
    def testGetVTs_no_VT(self):
83
        daemon = DummyWrapper([])
84
        response = ET.fromstring(daemon.handle_command('<get_vts />'))
85
        print(ET.tostring(response))
86
87
    def testGetVTs_single_VT(self):
88
        daemon = DummyWrapper([])
89
        daemon.add_vt('1.2.3.4', 'A vulnerability test')
90
        response = ET.fromstring(daemon.handle_command('<get_vts />'))
91
        print(ET.tostring(response))
92
93
    def testGetVTs_multiple_VTs(self):
94
        daemon = DummyWrapper([])
95
        daemon.add_vt('1.2.3.4', 'A vulnerability test')
96
        daemon.add_vt('some id', 'Another vulnerability test')
97
        daemon.add_vt('123456789', 'Yet another vulnerability test')
98
        response = ET.fromstring(daemon.handle_command('<get_vts />'))
99
        print(ET.tostring(response))
100
101
    def testGetVTs_multiple_VTs_with_custom(self):
102
        daemon = DummyWrapper([])
103
        daemon.add_vt('1.2.3.4', 'A vulnerability test')
104
        daemon.add_vt('some id', 'Another vulnerability test with custom info', { 'depencency': '1.2.3.4' })
105
        daemon.add_vt('123456789', 'Yet another vulnerability test')
106
        response = ET.fromstring(daemon.handle_command('<get_vts />'))
107
        print(ET.tostring(response))
108
109
    def testGetVTs_VTs_with_params(self):
110
        daemon = DummyWrapper([])
111
        daemon.add_vt('1.2.3.4', 'A vulnerability test', vt_params="a", custom="b")
112
        response = ET.fromstring(daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>'))
113
        print(ET.tostring(response))
114
        # The status of the response must be success (i.e. 200)
115
        self.assertEqual(response.get('status'), '200')
116
        # The response root element must have the correct name
117
        self.assertEqual(response.tag, 'get_vts_response')
118
        # The response must contain a 'scanner_params' element
119
        self.assertIsNotNone(response.find('vts'))
120
        vt_params =  response[0][0].findall('vt_params')
121
        self.assertEqual(1, len(vt_params))
122
        custom =  response[0][0].findall('custom')
123
        self.assertEqual(1, len(custom))
124
        params = response.findall('vts/vt/vt_params/vt_param')
125
        self.assertEqual(2, len(params))
126
127
128
    def testiScanWithError(self):
129
        daemon = DummyWrapper([
130
            Result('error', value='something went wrong'),
131
        ])
132
133
        response = ET.fromstring(daemon.handle_command('<start_scan target="localhost" ports="80, 443"><scanner_params /></start_scan>'))
134
        print(ET.tostring(response))
135
        scan_id = response.findtext('id')
136
        finished = False
137
        while not finished:
138
            response = ET.fromstring(daemon.handle_command('<get_scans scan_id="%s" details="0"/>' % scan_id))
139
            print(ET.tostring(response))
140
            scans = response.findall('scan')
141
            self.assertEqual(1, len(scans))
142
            scan = scans[0]
143
            if int(scan.get('progress')) != 100:
144
                self.assertEqual('0', scan.get('end_time'))
145
                time.sleep(.010)
146
            else:
147
                finished = True
148
        response = ET.fromstring(daemon.handle_command('<get_scans scan_id="%s"/>' % scan_id))
149
        print(ET.tostring(response))
150
        response = ET.fromstring(daemon.handle_command('<get_scans />'))
151
        print(ET.tostring(response))
152
        response = ET.fromstring(daemon.handle_command('<get_scans scan_id="%s" details="1"/>' % scan_id))
153
        self.assertEqual(response.findtext('scan/results/result'), 'something went wrong')
154
        print(ET.tostring(response))
155
156
        response = ET.fromstring(daemon.handle_command('<delete_scan scan_id="%s" />' % scan_id))
157
        self.assertEqual(response.get('status'), '200')
158
        print(ET.tostring(response))
159
160
    def testStopScan(self):
161
        daemon = DummyWrapper([])
162
        response = ET.fromstring(
163
            daemon.handle_command('<start_scan ' +
164
                                  'target="localhost" ports="80, 443">' +
165
                                  '<scanner_params /></start_scan>'))
166
        print(ET.tostring(response))
167
        scan_id = response.findtext('id')
168
        time.sleep(0.01)
169
170
        response = daemon.stop_scan(scan_id)
171
        self.assertEqual(response, None)
172
173
        response = ET.fromstring(daemon.handle_command(
174
            '<stop_scan scan_id="%s" />' % scan_id))
175
        self.assertEqual(response.get('status'), '200')
176
        print(ET.tostring(response))
177
178
    def testScanWithVTs(self):
179
        daemon = DummyWrapper([])
180
        cmd = ET.fromstring('<start_scan ' +
181
                            'target="localhost" ports="80, 443">' +
182
                            '<scanner_params /><vts /></start_scan>')
183
        print(ET.tostring(cmd))
184
        self.assertRaises(OSPDError, daemon.handle_start_scan_command, cmd)
185
186
        # With one VT, without params
187
        response = ET.fromstring(
188
            daemon.handle_command('<start_scan ' +
189
                                  'target="localhost" ports="80, 443">' +
190
                                  '<scanner_params /><vts><vt id="1.2.3.4" />' +
191
                                  '</vts></start_scan>'))
192
        print(ET.tostring(response))
193
        scan_id = response.findtext('id')
194
        time.sleep(0.01)
195
        self.assertEqual(daemon.get_scan_vts(scan_id), {'1.2.3.4': {}})
196
        self.assertNotEqual(daemon.get_scan_vts(scan_id), {'1.2.3.6': {}})
197
198
        # With out VTS
199
        response = ET.fromstring(
200
            daemon.handle_command('<start_scan ' +
201
                                  'target="localhost" ports="80, 443">' +
202
                                  '<scanner_params /></start_scan>'))
203
        print(ET.tostring(response))
204
        scan_id = response.findtext('id')
205
        time.sleep(0.01)
206
        self.assertEqual(daemon.get_scan_vts(scan_id), {})
207
208
    def testScanWithVTs_and_param(self):
209
        daemon = DummyWrapper([])
210
211
        # Raise because no vt_param name attribute
212
        cmd = ET.fromstring('<start_scan ' +
213
                            'target="localhost" ports="80, 443">' +
214
                            '<scanner_params /><vts><vt id="1234">' +
215
                            '<vt_param type="entry">200</vt_param>' +
216
                            '</vt></vts></start_scan>')
217
        print(ET.tostring(cmd))
218
        self.assertRaises(OSPDError, daemon.handle_start_scan_command, cmd)
219
220
        # No error
221
        response = ET.fromstring(
222
            daemon.handle_command('<start_scan ' +
223
                                  'target="localhost" ports="80, 443">' +
224
                                  '<scanner_params /><vts><vt id="1234">' +
225
                                  '<vt_param name="ABC" type="entry">200' +
226
                                  '</vt_param></vt></vts></start_scan>'))
227
        print(ET.tostring(response))
228
        scan_id = response.findtext('id')
229
        time.sleep(0.01)
230
        self.assertEqual(daemon.get_scan_vts(scan_id),
231
                         {'1234': {'ABC': {'type': 'entry', 'value': '200'}}})
232