Completed
Push — master ( 532169...df823d )
by Juan José
15s queued 11s
created

ospd.protocol.OspRequest.process_vts_params()   B

Complexity

Conditions 8

Size

Total Lines 56
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 8
eloc 23
nop 1
dl 0
loc 56
rs 7.3333
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
# Copyright (C) 2014-2020 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: AGPL-3.0-or-later
4
#
5
# This program is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Affero General Public License as
7
# published by the Free Software Foundation, either version 3 of the
8
# License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
""" Helper classes for parsing and creating OSP XML requests and responses
19
"""
20
21
from typing import Dict, Union, List, Any
22
23
from xml.etree.ElementTree import SubElement, Element, XMLPullParser
24
25
from ospd.errors import OspdError
26
27
28
class RequestParser:
    """Incremental detector for the end of a single XML document.

    Chunks of bytes are fed into an ``XMLPullParser``; the first element
    that is opened is remembered as the document root, and the document is
    considered complete once that very element is closed.
    """

    def __init__(self):
        # Only 'start' and 'end' events are needed to track the root element.
        self._parser = XMLPullParser(['start', 'end'])
        self._root_element = None

    def has_ended(self, data: bytes) -> bool:
        """Feed ``data`` to the parser and report whether the root closed.

        Arguments:
            data: Next chunk of the XML document.

        Return:
            True if the 'end' event of the root element was seen while
            processing this chunk, False otherwise.
        """
        self._parser.feed(data)

        for event, element in self._parser.read_events():
            if self._root_element is None:
                if event == 'start':
                    self._root_element = element
                continue

            # Compare identity, not tag names: the pull parser hands out the
            # same Element object for the 'start' and 'end' events of one
            # element, whereas a nested element may share the root's tag and
            # must not be mistaken for the end of the document.
            if event == 'end' and element is self._root_element:
                return True

        return False
44
45
46
class OspRequest:
    """Helpers that convert OSP request XML elements into plain Python data."""

    # Children of <alive_test_methods> whose text is copied into the options.
    _ALIVE_TEST_METHOD_TAGS = (
        'icmp',
        'tcp_ack',
        'tcp_syn',
        'arp',
        'consider_alive',
    )

    # Children of <target> whose text is stored in the options dictionary
    # under the tag name itself.
    _TARGET_OPTION_TAGS = (
        'alive_test',
        'alive_test_ports',
        'reverse_lookup_unify',
        'reverse_lookup_only',
    )

    @staticmethod
    def _collect_vt_values(vt: Element) -> Dict[str, str]:
        """Return the ``{id: text}`` mapping of the children of one VT.

        Raises:
            OspdError: If a child has no (or an empty) ``id`` attribute.
        """
        values = {}  # type: Dict[str, str]

        for vt_value in vt:
            value_id = vt_value.attrib.get('id')
            if not value_id:
                raise OspdError('Invalid VT preference. No attribute id')

            # A child without text is stored as an empty string.
            values[value_id] = vt_value.text or ''

        return values

    @staticmethod
    def process_vts_params(
        scanner_vts: Element,
    ) -> Dict[str, Union[Dict[str, str], List]]:
        """Receive an XML object with the Vulnerability Tests and their
        parameters to be used in a scan and return a dictionary.

        Arguments:
            scanner_vts: XML element with vt subelements. Each vt_single
                has an id attribute and may carry optional parameters as
                children; each vt_group has a filter attribute.

                Example form:

                <vt_selection>
                  <vt_single id='vt1' />
                  <vt_single id='vt2'>
                    <vt_value id='param1'>value</vt_value>
                  </vt_single>
                  <vt_group filter='family=debian'/>
                  <vt_group filter='family=general'/>
                </vt_selection>

        Return:
            Dictionary mapping each VT id to its parameters, plus the
            'vt_groups' key holding the list of group filters.

            Example form:

            {'vt1': {},
             'vt2': {'param1': 'value'},
             'vt_groups': ['family=debian', 'family=general']}

        Raises:
            OspdError: If a VT parameter has no id or a vt_group has no
                filter attribute.
        """
        vt_selection = {}  # type: Dict
        filters = []

        for vt in scanner_vts:
            if vt.tag == 'vt_single':
                vt_id = vt.attrib.get('id')
                vt_selection[vt_id] = OspRequest._collect_vt_values(vt)
            elif vt.tag == 'vt_group':
                vts_filter = vt.attrib.get('filter')
                if vts_filter is None:
                    raise OspdError('Invalid VT group. No filter given.')

                filters.append(vts_filter)

        vt_selection['vt_groups'] = filters

        return vt_selection

    @staticmethod
    def process_credentials_elements(cred_tree: Element) -> Dict:
        """Receive an XML object with the credentials to run
        a scan against a given target.

        Arguments:
            cred_tree: XML element with credential subelements.

                Example form:

                <credentials>
                  <credential type="up" service="ssh" port="22">
                    <username>scanuser</username>
                    <password>mypass</password>
                  </credential>
                  <credential type="up" service="smb">
                    <username>smbuser</username>
                    <password>mypass</password>
                  </credential>
                </credentials>

        Return:
            Dictionary keyed by the service attribute. Each entry holds the
            credential type, the port (only for the 'ssh' service) and one
            item per child element (tag -> text).

            Example form:

            {'ssh': {'type': type,
                     'port': port,
                     'username': username,
                     'password': pass,
                     },
             'smb': {'type': type,
                     'username': username,
                     'password': pass,
                     },
             }
        """
        credentials = {}  # type: Dict

        for credential in cred_tree:
            service = credential.attrib.get('service')
            entry = {'type': credential.attrib.get('type')}

            # The port attribute is only taken over for ssh credentials.
            if service == 'ssh':
                entry['port'] = credential.attrib.get('port')

            for param in credential:
                entry[param.tag] = param.text

            credentials[service] = entry

        return credentials

    @staticmethod
    def process_alive_test_methods(
        alive_test_tree: Element, options: Dict
    ) -> None:
        """Receive an XML object with the alive test methods to run
        a scan with. Methods are added to the options Dict.

        Arguments:
            alive_test_tree: XML element with the alive test methods.

                Example form:

                <alive_test_methods>
                    <icmp>boolean(1 or 0)</icmp>
                    <tcp_ack>boolean(1 or 0)</tcp_ack>
                    <tcp_syn>boolean(1 or 0)</tcp_syn>
                    <arp>boolean(1 or 0)</arp>
                    <consider_alive>boolean(1 or 0)</consider_alive>
                </alive_test_methods>

            options: Dictionary updated in place. Only the known methods
                that actually carry text are stored (tag -> text).
        """
        for child in alive_test_tree:
            known_method = child.tag in OspRequest._ALIVE_TEST_METHOD_TAGS
            if known_method and child.text is not None:
                options[child.tag] = child.text

    @classmethod
    def process_target_element(cls, scanner_target: Element) -> Dict:
        """Receive an XML object with the target, ports and credentials to run
        a scan against.

        Arguments:
            scanner_target: Single XML target element. The target has
                <hosts> and <ports> subelements. Hosts can be a single
                host, a host range, a comma-separated host list or a
                network address. <ports> and <credentials> are optional.
                Therefore each ospd-scanner should check for valid ones
                if needed.

                Example form:

                <target>
                    <hosts>192.168.0.0/24</hosts>
                    <ports>22</ports>
                    <credentials>
                        <credential type="up" service="ssh" port="22">
                        <username>scanuser</username>
                        <password>mypass</password>
                        </credential>
                        <credential type="up" service="smb">
                        <username>smbuser</username>
                        <password>mypass</password>
                        </credential>
                    </credentials>
                    <alive_test></alive_test>
                    <alive_test_ports></alive_test_ports>
                    <reverse_lookup_only>1</reverse_lookup_only>
                    <reverse_lookup_unify>0</reverse_lookup_unify>
                </target>

        Return:
            A Dict with the keys hosts, ports, credentials, exclude_hosts,
            finished_hosts and options. None is returned when
            scanner_target is None or has no subelements.

            Example form:

            {
                'hosts': '192.168.0.0/24',
                'ports': '22',
                'credentials': {'smb': {'type': type,
                                        'username': username,
                                        'password': pass,
                                        }
                                },
                'exclude_hosts': '',
                'finished_hosts': '',
                'options': {'alive_test': 'ALIVE_TEST_CONSIDER_ALIVE',
                            'alive_test_ports': '22,80,123',
                            'reverse_lookup_only': '1',
                            'reverse_lookup_unify': '0',
                            },
            }

        Raises:
            OspdError: If the target has subelements but no (non-empty)
                <hosts> element.
        """
        # Explicit None/len() test: relying on the truth value of an Element
        # is deprecated in xml.etree.ElementTree. The outcome is the same as
        # before: a missing or childless target yields None.
        if scanner_target is None or len(scanner_target) == 0:
            return None

        exclude_hosts = ''
        finished_hosts = ''
        ports = ''
        hosts = None
        credentials = {}  # type: Dict
        options = {}

        for child in scanner_target:
            tag = child.tag

            if tag == 'hosts':
                hosts = child.text
            elif tag == 'exclude_hosts':
                exclude_hosts = child.text
            elif tag == 'finished_hosts':
                finished_hosts = child.text
            elif tag == 'ports':
                ports = child.text
            elif tag == 'credentials':
                credentials = cls.process_credentials_elements(child)
            elif tag == 'alive_test_methods':
                # Flag that explicit methods were given, then add them.
                options['alive_test_methods'] = '1'
                cls.process_alive_test_methods(child, options)
            elif tag in cls._TARGET_OPTION_TAGS:
                options[tag] = child.text

        if not hosts:
            raise OspdError('No target to scan')

        return {
            'hosts': hosts,
            'ports': ports,
            'credentials': credentials,
            'exclude_hosts': exclude_hosts,
            'finished_hosts': finished_hosts,
            'options': options,
        }
281
282
283
class OspResponse:
    """Builders for the XML elements sent back in OSP responses."""

    @staticmethod
    def create_scanner_params_xml(scanner_params: Dict[str, Any]) -> Element:
        """Build the <scanner_params> element for the given parameters.

        Arguments:
            scanner_params: Mapping of parameter id to a dictionary with
                the keys 'type', 'name', 'description', 'default' and
                'mandatory'.

        Return:
            A 'scanner_params' element holding one 'scanner_param' child
            per entry. The id and type become attributes; the remaining
            keys become subelements whose text is the str() of the value.
        """
        root = Element('scanner_params')
        text_fields = ('name', 'description', 'default', 'mandatory')

        for param_id in scanner_params:
            details = scanner_params[param_id]
            node = SubElement(root, 'scanner_param')

            node.set('id', param_id)
            node.set('type', details['type'])

            for field in text_fields:
                SubElement(node, field).text = str(details[field])

        return root
305