ospd.protocol.OspRequest.process_target_element()   F
last analyzed

Complexity

Conditions 14

Size

Total Lines 99
Code Lines 40

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 14
eloc 40
nop 2
dl 0
loc 99
rs 3.6
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like ospd.protocol.OspRequest.process_target_element() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Copyright (C) 2014-2021 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: AGPL-3.0-or-later
4
#
5
# This program is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Affero General Public License as
7
# published by the Free Software Foundation, either version 3 of the
8
# License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
""" Helper classes for parsing and creating OSP XML requests and responses
19
"""
20
21
from typing import Dict, Union, List, Any
22
23
from xml.etree.ElementTree import SubElement, Element, XMLPullParser
24
25
from ospd.errors import OspdError
26
27
28
class RequestParser:
29
    def __init__(self):
30
        self._parser = XMLPullParser(['start', 'end'])
31
        self._root_element = None
32
33
    def has_ended(self, data: bytes) -> bool:
34
        self._parser.feed(data)
35
36
        for event, element in self._parser.read_events():
37
            if event == 'start' and self._root_element is None:
38
                self._root_element = element
39
            elif event == 'end' and self._root_element is not None:
40
                if element.tag == self._root_element.tag:
41
                    return True
42
43
        return False
44
45
46
class OspRequest:
47
    @staticmethod
48
    def process_vts_params(
49
        scanner_vts: Element,
50
    ) -> Dict[str, Union[Dict[str, str], List]]:
51
        """Receive an XML object with the Vulnerability Tests an their
52
        parameters to be use in a scan and return a dictionary.
53
54
        @param: XML element with vt subelements. Each vt has an
55
                id attribute. Optional parameters can be included
56
                as vt child.
57
                Example form:
58
                <vt_selection>
59
                  <vt_single id='vt1' />
60
                  <vt_single id='vt2'>
61
                    <vt_value id='param1'>value</vt_value>
62
                  </vt_single>
63
                  <vt_group filter='family=debian'/>
64
                  <vt_group filter='family=general'/>
65
                </vt_selection>
66
67
        @return: Dictionary containing the vts attribute and subelements,
68
                 like the VT's id and VT's parameters.
69
                 Example form:
70
                 {'vt1': {},
71
                  'vt2': {'value_id': 'value'},
72
                  'vt_groups': ['family=debian', 'family=general']}
73
        """
74
        vt_selection = {}  # type: Dict
75
        filters = []
76
77
        for vt in scanner_vts:
78
            if vt.tag == 'vt_single':
79
                vt_id = vt.attrib.get('id')
80
                vt_selection[vt_id] = {}
81
82
                for vt_value in vt:
83
                    if not vt_value.attrib.get('id'):
84
                        raise OspdError(
85
                            'Invalid VT preference. No attribute id'
86
                        )
87
88
                    vt_value_id = vt_value.attrib.get('id')
89
                    vt_value_value = vt_value.text if vt_value.text else ''
90
                    vt_selection[vt_id][vt_value_id] = vt_value_value
91
92
            if vt.tag == 'vt_group':
93
                vts_filter = vt.attrib.get('filter', None)
94
95
                if vts_filter is None:
96
                    raise OspdError('Invalid VT group. No filter given.')
97
98
                filters.append(vts_filter)
99
100
        vt_selection['vt_groups'] = filters
101
102
        return vt_selection
103
104
    @staticmethod
105
    def process_credentials_elements(cred_tree: Element) -> Dict:
106
        """Receive an XML object with the credentials to run
107
        a scan against a given target.
108
109
        @param:
110
        <credentials>
111
          <credential type="up" service="ssh" port="22">
112
            <username>scanuser</username>
113
            <password>mypass</password>
114
          </credential>
115
          <credential type="up" service="smb">
116
            <username>smbuser</username>
117
            <password>mypass</password>
118
          </credential>
119
        </credentials>
120
121
        @return: Dictionary containing the credentials for a given target.
122
                 Example form:
123
                 {'ssh': {'type': type,
124
                          'port': port,
125
                          'username': username,
126
                          'password': pass,
127
                        },
128
                  'smb': {'type': type,
129
                          'username': username,
130
                          'password': pass,
131
                         },
132
                   }
133
        """
134
        credentials = {}  # type: Dict
135
136
        for credential in cred_tree:
137
            service = credential.attrib.get('service')
138
            credentials[service] = {}
139
            credentials[service]['type'] = credential.attrib.get('type')
140
141
            if service == 'ssh':
142
                credentials[service]['port'] = credential.attrib.get('port')
143
144
            for param in credential:
145
                credentials[service][param.tag] = (
146
                    param.text if param.text else ""
147
                )
148
149
        return credentials
150
151
    @staticmethod
152
    def process_alive_test_methods(
153
        alive_test_tree: Element, options: Dict
154
    ) -> None:
155
        """Receive an XML object with the alive test methods to run
156
        a scan with. Methods are added to the options Dict.
157
158
        @param
159
        <alive_test_methods>
160
            </icmp>boolean(1 or 0)</icmp>
161
            </tcp_ack>boolean(1 or 0)</tcp_ack>
162
            </tcp_syn>boolean(1 or 0)</tcp_syn>
163
            </arp>boolean(1 or 0)</arp>
164
            </consider_alive>boolean(1 or 0)</consider_alive>
165
        </alive_test_methods>
166
        """
167
        for child in alive_test_tree:
168
            if child.tag == 'icmp':
169
                if child.text is not None:
170
                    options['icmp'] = child.text
171
            if child.tag == 'tcp_ack':
172
                if child.text is not None:
173
                    options['tcp_ack'] = child.text
174
            if child.tag == 'tcp_syn':
175
                if child.text is not None:
176
                    options['tcp_syn'] = child.text
177
            if child.tag == 'arp':
178
                if child.text is not None:
179
                    options['arp'] = child.text
180
            if child.tag == 'consider_alive':
181
                if child.text is not None:
182
                    options['consider_alive'] = child.text
183
184
    @classmethod
185
    def process_target_element(cls, scanner_target: Element) -> Dict:
186
        """Receive an XML object with the target, ports and credentials to run
187
        a scan against.
188
189
        Arguments:
190
            Single XML target element. The target has <hosts> and <ports>
191
            subelements. Hosts can be a single host, a host range, a
192
            comma-separated host list or a network address.
193
            <ports> and  <credentials> are optional. Therefore each
194
            ospd-scanner should check for a valid ones if needed.
195
196
            Example form:
197
198
            <target>
199
                <hosts>192.168.0.0/24</hosts>
200
                <ports>22</ports>
201
                <credentials>
202
                    <credential type="up" service="ssh" port="22">
203
                    <username>scanuser</username>
204
                    <password>mypass</password>
205
                    </credential>
206
                    <credential type="up" service="smb">
207
                    <username>smbuser</username>
208
                    <password>mypass</password>
209
                    </credential>
210
                </credentials>
211
                <alive_test></alive_test>
212
                <alive_test_ports></alive_test_ports>
213
                <reverse_lookup_only>1</reverse_lookup_only>
214
                <reverse_lookup_unify>0</reverse_lookup_unify>
215
            </target>
216
217
        Return:
218
            A Dict  hosts, port, {credentials}, exclude_hosts, options].
219
220
            Example form:
221
222
            {
223
                'hosts': '192.168.0.0/24',
224
                'port': '22',
225
                'credentials': {'smb': {'type': type,
226
                                        'port': port,
227
                                        'username': username,
228
                                        'password': pass,
229
                                        }
230
                                },
231
232
                'exclude_hosts': '',
233
                'finished_hosts': '',
234
                'options': {'alive_test': 'ALIVE_TEST_CONSIDER_ALIVE',
235
                            'alive_test_ports: '22,80,123',
236
                            'reverse_lookup_only': '1',
237
                            'reverse_lookup_unify': '0',
238
                            },
239
            }
240
        """
241
        if scanner_target:
242
            exclude_hosts = ''
243
            finished_hosts = ''
244
            ports = ''
245
            hosts = None
246
            credentials = {}  # type: Dict
247
            options = {}
248
249
            for child in scanner_target:
250
                if child.tag == 'hosts':
251
                    hosts = child.text
252
                if child.tag == 'exclude_hosts':
253
                    exclude_hosts = child.text
254
                if child.tag == 'finished_hosts':
255
                    finished_hosts = child.text
256
                if child.tag == 'ports':
257
                    ports = child.text
258
                if child.tag == 'credentials':
259
                    credentials = cls.process_credentials_elements(child)
260
                if child.tag == 'alive_test_methods':
261
                    options['alive_test_methods'] = '1'
262
                    cls.process_alive_test_methods(child, options)
263
                if child.tag == 'alive_test':
264
                    options['alive_test'] = child.text
265
                if child.tag == 'alive_test_ports':
266
                    options['alive_test_ports'] = child.text
267
                if child.tag == 'reverse_lookup_unify':
268
                    options['reverse_lookup_unify'] = child.text
269
                if child.tag == 'reverse_lookup_only':
270
                    options['reverse_lookup_only'] = child.text
271
272
            if hosts:
273
                return {
274
                    'hosts': hosts,
275
                    'ports': ports,
276
                    'credentials': credentials,
277
                    'exclude_hosts': exclude_hosts,
278
                    'finished_hosts': finished_hosts,
279
                    'options': options,
280
                }
281
            else:
282
                raise OspdError('No target to scan')
283
284
285
class OspResponse:
286
    @staticmethod
287
    def create_scanner_params_xml(scanner_params: Dict[str, Any]) -> Element:
288
        """Returns the OSP Daemon's scanner params in xml format."""
289
        scanner_params_xml = Element('scanner_params')
290
291
        for param_id, param in scanner_params.items():
292
            param_xml = SubElement(scanner_params_xml, 'scanner_param')
293
294
            for name, value in [('id', param_id), ('type', param['type'])]:
295
                param_xml.set(name, value)
296
297
            for name, value in [
298
                ('name', param['name']),
299
                ('description', param['description']),
300
                ('default', param['default']),
301
                ('mandatory', param['mandatory']),
302
            ]:
303
                elem = SubElement(param_xml, name)
304
                elem.text = str(value)
305
306
        return scanner_params_xml
307