Completed
Push — master ( edf821...f5f7b9 )
by Juan José
14s queued 11s
created

ospd.protocol.RequestParser.__init__()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
# Copyright (C) 2020 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: GPL-2.0-or-later
4
#
5
# This program is free software; you can redistribute it and/or
6
# modify it under the terms of the GNU General Public License
7
# as published by the Free Software Foundation; either version 2
8
# of the License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU General Public License for more details.
14
#
15
# You should have received a copy of the GNU General Public License
16
# along with this program; if not, write to the Free Software
17
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
19
""" Helper classes for parsing and creating OSP XML requests and responses
20
"""
21
22
from typing import Dict, Union, List, Any
23
24
from xml.etree.ElementTree import SubElement, Element, XMLPullParser
25
26
from ospd.errors import OspdError
27
28
29
class RequestParser:
    """Incrementally detect when a chunked XML request is complete.

    Data is fed piece by piece through has_ended(); the first element that
    is opened is remembered as the root of the request.
    """

    def __init__(self):
        # Only 'start' and 'end' events are needed to track the root element.
        self._parser = XMLPullParser(['start', 'end'])
        self._root_element = None

    def has_ended(self, data: bytes) -> bool:
        """Feed a chunk of data and tell whether the root element was closed.

        Arguments:
            data: Next chunk of the XML request.

        Returns:
            True once the 'end' event of the root element has been seen
            while processing this chunk, False otherwise.

        Note: feed() is called on the underlying XMLPullParser, which may
        raise xml.etree.ElementTree.ParseError for malformed input.
        """
        self._parser.feed(data)

        for event, element in self._parser.read_events():
            if event == 'start' and self._root_element is None:
                self._root_element = element
            # Compare by identity, not by tag name: a nested element may
            # share the root's tag (<a><a></a></a>) and closing it must not
            # be mistaken for the end of the whole request.
            elif event == 'end' and element is self._root_element:
                return True

        return False
45
46
47
class OspRequest:
    """Helpers turning the XML elements of an OSP request into dictionaries."""

    @staticmethod
    def process_vts_params(
        scanner_vts: Element,
    ) -> Dict[str, Union[Dict, List]]:
        """ Receive an XML object with the Vulnerability Tests and their
        parameters to be used in a scan and return a dictionary.

        @param: XML element with vt subelements. Each vt has an
                id attribute. Optional parameters can be included
                as vt child.
                Example form:
                <vt_selection>
                  <vt_single id='vt1' />
                  <vt_single id='vt2'>
                    <vt_value id='param1'>value</vt_value>
                  </vt_single>
                  <vt_group filter='family=debian'/>
                  <vt_group filter='family=general'/>
                </vt_selection>

        @return: Dictionary containing the vts attribute and subelements,
                 like the VT's id and VT's parameters.
                 Example form:
                 {'vt1': {},
                  'vt2': {'param1': 'value'},
                  'vt_groups': ['family=debian', 'family=general']}

        @raise OspdError: if a vt_single child has no (or an empty) id
                          attribute, or a vt_group has no filter attribute.
        """
        vt_selection = {}  # type: Dict
        filters = []

        for vt in scanner_vts:
            if vt.tag == 'vt_single':
                # NOTE(review): a vt_single without an id attribute is stored
                # under the key None; kept as is for backward compatibility.
                vt_id = vt.attrib.get('id')
                vt_selection[vt_id] = {}

                for vt_value in vt:
                    vt_value_id = vt_value.attrib.get('id')
                    if not vt_value_id:
                        raise OspdError(
                            'Invalid VT preference. No attribute id'
                        )

                    # A parameter without text is stored as an empty string.
                    vt_selection[vt_id][vt_value_id] = vt_value.text or ''

            elif vt.tag == 'vt_group':
                vts_filter = vt.attrib.get('filter', None)

                if vts_filter is None:
                    raise OspdError('Invalid VT group. No filter given.')

                filters.append(vts_filter)

        vt_selection['vt_groups'] = filters

        return vt_selection

    @staticmethod
    def process_credentials_elements(cred_tree: Element) -> Dict:
        """ Receive an XML object with the credentials to run
        a scan against a given target.

        @param:
        <credentials>
          <credential type="up" service="ssh" port="22">
            <username>scanuser</username>
            <password>mypass</password>
          </credential>
          <credential type="up" service="smb">
            <username>smbuser</username>
            <password>mypass</password>
          </credential>
        </credentials>

        @return: Dictionary containing the credentials for a given target,
                 keyed by service. The port is only included for ssh.
                 Example form:
                 {'ssh': {'type': type,
                          'port': port,
                          'username': username,
                          'password': pass,
                         },
                  'smb': {'type': type,
                          'username': username,
                          'password': pass,
                         },
                 }
        """
        credentials = {}  # type: Dict

        for credential in cred_tree:
            # NOTE(review): a credential without a service attribute ends up
            # under the key None; kept as is for backward compatibility.
            service = credential.attrib.get('service')
            entry = {'type': credential.attrib.get('type')}

            if service == 'ssh':
                entry['port'] = credential.attrib.get('port')

            # Every child element (e.g. username, password) becomes an entry
            # named after its tag.
            for param in credential:
                entry[param.tag] = param.text

            credentials[service] = entry

        return credentials

    @classmethod
    def process_target_element(cls, scanner_target: Element) -> Dict:
        """ Receive an XML object with the target, ports and credentials to run
        a scan against.

        Arguments:
            Single XML target element. The target has <hosts> and <ports>
            subelements. Hosts can be a single host, a host range, a
            comma-separated host list or a network address.
            <ports> and <credentials> are optional. Therefore each
            ospd-scanner should check for valid ones if needed.

            Example form:

            <target>
                <hosts>192.168.0.0/24</hosts>
                <ports>22</ports>
                <credentials>
                    <credential type="up" service="ssh" port="22">
                    <username>scanuser</username>
                    <password>mypass</password>
                    </credential>
                    <credential type="up" service="smb">
                    <username>smbuser</username>
                    <password>mypass</password>
                    </credential>
                </credentials>
                <alive_test>ALIVE_TEST_CONSIDER_ALIVE</alive_test>
                <reverse_lookup_only>1</reverse_lookup_only>
                <reverse_lookup_unify>0</reverse_lookup_unify>
            </target>

        Return:
            A Dict with the keys hosts, ports, credentials, exclude_hosts,
            finished_hosts and options. None is returned if the target
            element is None or has no subelements at all.

            Example form:

            {
                'hosts': '192.168.0.0/24',
                'ports': '22',
                'credentials': {'ssh': {'type': type,
                                        'port': port,
                                        'username': username,
                                        'password': pass,
                                        }
                                },

                'exclude_hosts': '',
                'finished_hosts': '',
                'options': {'alive_test': 'ALIVE_TEST_CONSIDER_ALIVE',
                            'reverse_lookup_only': '1',
                            'reverse_lookup_unify': '0',
                            },
            }

        Raises:
            OspdError: if the target has subelements but no (or an empty)
                       <hosts> element.
        """
        # Explicit check instead of "if scanner_target:". Testing the truth
        # value of an Element is deprecated; this keeps the established
        # result (None) for a missing or childless target.
        if scanner_target is None or len(scanner_target) == 0:
            return None

        plain_tags = ('hosts', 'ports', 'exclude_hosts', 'finished_hosts')
        option_tags = (
            'alive_test',
            'reverse_lookup_unify',
            'reverse_lookup_only',
        )

        target = {
            'hosts': None,
            'ports': '',
            'credentials': {},
            'exclude_hosts': '',
            'finished_hosts': '',
            'options': {},
        }  # type: Dict

        for child in scanner_target:
            if child.tag in plain_tags:
                target[child.tag] = child.text
            elif child.tag == 'credentials':
                target['credentials'] = cls.process_credentials_elements(
                    child
                )
            elif child.tag in option_tags:
                target['options'][child.tag] = child.text

        if not target['hosts']:
            raise OspdError('No target to scan')

        return target
242
243
244
class OspResponse:
    """Helpers creating the XML elements used in OSP responses."""

    @staticmethod
    def create_scanner_params_xml(scanner_params: Dict[str, Any]) -> Element:
        """ Returns the OSP Daemon's scanner params in xml format.

        Arguments:
            scanner_params: Dictionary mapping each parameter id to a
                dictionary that provides the keys 'type', 'name',
                'description', 'default' and 'mandatory'.

        Return:
            A <scanner_params> element with one <scanner_param> child per
            parameter. The id and type are set as attributes; name,
            description, default and mandatory become subelements whose
            text is the string form of the value.

        Raises:
            KeyError: if a parameter dictionary lacks one of the keys
                      listed above.
        """
        scanner_params_xml = Element('scanner_params')

        for param_id, param in scanner_params.items():
            param_xml = SubElement(scanner_params_xml, 'scanner_param')

            # Attribute values are coerced to str like the subelement texts
            # below; ElementTree cannot serialize non-string attributes.
            param_xml.set('id', str(param_id))
            param_xml.set('type', str(param['type']))

            for name in ('name', 'description', 'default', 'mandatory'):
                SubElement(param_xml, name).text = str(param[name])

        return scanner_params_xml
266