Passed
Pull Request — master (#329)
by
unknown
01:14
created

ospd.protocol.RequestParser.__init__()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
# Copyright (C) 2014-2020 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: AGPL-3.0-or-later
4
#
5
# This program is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Affero General Public License as
7
# published by the Free Software Foundation, either version 3 of the
8
# License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
""" Helper classes for parsing and creating OSP XML requests and responses
19
"""
20
21
from typing import Dict, Union, List, Any
22
23
from xml.etree.ElementTree import SubElement, Element, XMLPullParser
24
25
from ospd.errors import OspdError
26
27
28
class RequestParser:
29
    def __init__(self):
30
        self._parser = XMLPullParser(['start', 'end'])
31
        self._root_element = None
32
33
    def has_ended(self, data: bytes) -> bool:
34
        self._parser.feed(data)
35
36
        for event, element in self._parser.read_events():
37
            if event == 'start' and self._root_element is None:
38
                self._root_element = element
39
            elif event == 'end' and self._root_element is not None:
40
                if element.tag == self._root_element.tag:
41
                    return True
42
43
        return False
44
45
46
class OspRequest:
47
    @staticmethod
48
    def process_vts_params(
49
        scanner_vts: Element,
50
    ) -> Dict[str, Union[Dict[str, str], List]]:
51
        """Receive an XML object with the Vulnerability Tests an their
52
        parameters to be use in a scan and return a dictionary.
53
54
        @param: XML element with vt subelements. Each vt has an
55
                id attribute. Optional parameters can be included
56
                as vt child.
57
                Example form:
58
                <vt_selection>
59
                  <vt_single id='vt1' />
60
                  <vt_single id='vt2'>
61
                    <vt_value id='param1'>value</vt_value>
62
                  </vt_single>
63
                  <vt_group filter='family=debian'/>
64
                  <vt_group filter='family=general'/>
65
                </vt_selection>
66
67
        @return: Dictionary containing the vts attribute and subelements,
68
                 like the VT's id and VT's parameters.
69
                 Example form:
70
                 {'vt1': {},
71
                  'vt2': {'value_id': 'value'},
72
                  'vt_groups': ['family=debian', 'family=general']}
73
        """
74
        vt_selection = {}  # type: Dict
75
        filters = []
76
77
        for vt in scanner_vts:
78
            if vt.tag == 'vt_single':
79
                vt_id = vt.attrib.get('id')
80
                vt_selection[vt_id] = {}
81
82
                for vt_value in vt:
83
                    if not vt_value.attrib.get('id'):
84
                        raise OspdError(
85
                            'Invalid VT preference. No attribute id'
86
                        )
87
88
                    vt_value_id = vt_value.attrib.get('id')
89
                    vt_value_value = vt_value.text if vt_value.text else ''
90
                    vt_selection[vt_id][vt_value_id] = vt_value_value
91
92
            if vt.tag == 'vt_group':
93
                vts_filter = vt.attrib.get('filter', None)
94
95
                if vts_filter is None:
96
                    raise OspdError('Invalid VT group. No filter given.')
97
98
                filters.append(vts_filter)
99
100
        vt_selection['vt_groups'] = filters
101
102
        return vt_selection
103
104
    @staticmethod
105
    def process_credentials_elements(cred_tree: Element) -> Dict:
106
        """Receive an XML object with the credentials to run
107
        a scan against a given target.
108
109
        @param:
110
        <credentials>
111
          <credential type="up" service="ssh" port="22">
112
            <username>scanuser</username>
113
            <password>mypass</password>
114
          </credential>
115
          <credential type="up" service="smb">
116
            <username>smbuser</username>
117
            <password>mypass</password>
118
          </credential>
119
        </credentials>
120
121
        @return: Dictionary containing the credentials for a given target.
122
                 Example form:
123
                 {'ssh': {'type': type,
124
                          'port': port,
125
                          'username': username,
126
                          'password': pass,
127
                        },
128
                  'smb': {'type': type,
129
                          'username': username,
130
                          'password': pass,
131
                         },
132
                   }
133
        """
134
        credentials = {}  # type: Dict
135
136
        for credential in cred_tree:
137
            service = credential.attrib.get('service')
138
            credentials[service] = {}
139
            credentials[service]['type'] = credential.attrib.get('type')
140
141
            if service == 'ssh':
142
                credentials[service]['port'] = credential.attrib.get('port')
143
144
            for param in credential:
145
                credentials[service][param.tag] = param.text
146
147
        return credentials
148
149
    @staticmethod
150
    def process_alive_test_methods(cred_tree: Element, options: Dict) -> None:
151
        """ Receive an XML object with the alive test methods to run
152
        a scan with. Methods are added to the options Dict.
153
154
        @param
155
        <alive_test_methods>
156
            </icmp></icmp>
157
            </tcp_ack></tcp_ack>
158
            </tcp_syn></tcp_syn>
159
            </arp></arp>
160
            </consider_alive>0</consider_alive>
161
        </alive_test_methods>
162
        """
163
        for child in cred_tree:
164
            if child.tag == 'icmp':
165
                options['icmp'] = child.text
166
            if child.tag == 'tcp_ack':
167
                options['tcp_ack'] = child.text
168
            if child.tag == 'tcp_syn':
169
                options['tcp_syn'] = child.text
170
            if child.tag == 'arp':
171
                options['arp'] = child.text
172
            if child.tag == 'consider_alive':
173
                options['consider_alive'] = child.text
174
175
    @classmethod
176
    def process_target_element(cls, scanner_target: Element) -> Dict:
177
        """Receive an XML object with the target, ports and credentials to run
178
        a scan against.
179
180
        Arguments:
181
            Single XML target element. The target has <hosts> and <ports>
182
            subelements. Hosts can be a single host, a host range, a
183
            comma-separated host list or a network address.
184
            <ports> and  <credentials> are optional. Therefore each
185
            ospd-scanner should check for a valid ones if needed.
186
187
            Example form:
188
189
            <target>
190
                <hosts>192.168.0.0/24</hosts>
191
                <ports>22</ports>
192
                <credentials>
193
                    <credential type="up" service="ssh" port="22">
194
                    <username>scanuser</username>
195
                    <password>mypass</password>
196
                    </credential>
197
                    <credential type="up" service="smb">
198
                    <username>smbuser</username>
199
                    <password>mypass</password>
200
                    </credential>
201
                </credentials>
202
                <alive_test></alive_test>
203
                <alive_test_ports></alive_test_ports>
204
                <reverse_lookup_only>1</reverse_lookup_only>
205
                <reverse_lookup_unify>0</reverse_lookup_unify>
206
            </target>
207
208
        Return:
209
            A Dict  hosts, port, {credentials}, exclude_hosts, options].
210
211
            Example form:
212
213
            {
214
                'hosts': '192.168.0.0/24',
215
                'port': '22',
216
                'credentials': {'smb': {'type': type,
217
                                        'port': port,
218
                                        'username': username,
219
                                        'password': pass,
220
                                        }
221
                                },
222
223
                'exclude_hosts': '',
224
                'finished_hosts': '',
225
                'options': {'alive_test': 'ALIVE_TEST_CONSIDER_ALIVE',
226
                            'alive_test_ports: '22,80,123',
227
                            'reverse_lookup_only': '1',
228
                            'reverse_lookup_unify': '0',
229
                            },
230
            }
231
        """
232
        if scanner_target:
233
            exclude_hosts = ''
234
            finished_hosts = ''
235
            ports = ''
236
            hosts = None
237
            credentials = {}  # type: Dict
238
            options = {}
239
240
            for child in scanner_target:
241
                if child.tag == 'hosts':
242
                    hosts = child.text
243
                if child.tag == 'exclude_hosts':
244
                    exclude_hosts = child.text
245
                if child.tag == 'finished_hosts':
246
                    finished_hosts = child.text
247
                if child.tag == 'ports':
248
                    ports = child.text
249
                if child.tag == 'credentials':
250
                    credentials = cls.process_credentials_elements(child)
251
                if child.tag == 'alive_test_methods':
252
                    options['alive_test_methods'] = '1'
253
                    cls.process_alive_test_methods(child, options)
254
                if child.tag == 'alive_test':
255
                    options['alive_test'] = child.text
256
                if child.tag == 'alive_test_ports':
257
                    options['alive_test_ports'] = child.text
258
                if child.tag == 'reverse_lookup_unify':
259
                    options['reverse_lookup_unify'] = child.text
260
                if child.tag == 'reverse_lookup_only':
261
                    options['reverse_lookup_only'] = child.text
262
263
            if hosts:
264
                return {
265
                    'hosts': hosts,
266
                    'ports': ports,
267
                    'credentials': credentials,
268
                    'exclude_hosts': exclude_hosts,
269
                    'finished_hosts': finished_hosts,
270
                    'options': options,
271
                }
272
            else:
273
                raise OspdError('No target to scan')
274
275
276
class OspResponse:
277
    @staticmethod
278
    def create_scanner_params_xml(scanner_params: Dict[str, Any]) -> Element:
279
        """ Returns the OSP Daemon's scanner params in xml format. """
280
        scanner_params_xml = Element('scanner_params')
281
282
        for param_id, param in scanner_params.items():
283
            param_xml = SubElement(scanner_params_xml, 'scanner_param')
284
285
            for name, value in [('id', param_id), ('type', param['type'])]:
286
                param_xml.set(name, value)
287
288
            for name, value in [
289
                ('name', param['name']),
290
                ('description', param['description']),
291
                ('default', param['default']),
292
                ('mandatory', param['mandatory']),
293
            ]:
294
                elem = SubElement(param_xml, name)
295
                elem.text = str(value)
296
297
        return scanner_params_xml
298