Passed
Pull Request — master (#55)
by
unknown
01:51
created

gvm.protocols.ospv1.Osp.start_scan()   C

Complexity

Conditions 9

Size

Total Lines 97
Code Lines 33

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 9
eloc 33
nop 8
dl 0
loc 97
rs 6.6666
c 0
b 0
f 0

How to fix   Long Method    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
"""
19
Module for communication to a daemon speaking `Open Scanner Protocol version 1`_
20
21
.. _Open Scanner Protocol version 1:
22
    https://docs.greenbone.net/API/GMP/gmp-7.0.html
23
"""
24
import logging
25
26
from gvm.errors import RequiredArgument
27
from gvm.utils import get_version_string
28
from gvm.xml import XmlCommand
29
30
from .base import GvmProtocol
31
32
logger = logging.getLogger(__name__)
33
34
PROTOCOL_VERSION = (1, 2,)
35
36
37
def create_credentials_element(_xmlcredentials, credentials):
38
    """Generates an xml element with credentials."""
39
    for service, credential in credentials.items():
40
        cred_type = credential.get('type')
41
        serv_port = credential.get('port')
42
        username = credential.get('username')
43
        password = credential.get('password')
44
        _xmlcredential = _xmlcredentials.add_element(
45
            'credential', attrs={
46
                'type': cred_type,
47
                'port': serv_port,
48
                'service': service,
49
            })
50
        _xmlcredential.add_element('username', username)
51
        _xmlcredential.add_element('password', password)
52
    return _xmlcredentials
53
54
def create_vt_selection_element(_xmlvtselection, vt_selection):
55
    """Generates an xml element with a selection of Vulnerability tests."""
56
    for vt_id, vt_values in vt_selection.items():
57
        if vt_id != 'vt_groups':
58
            _xmlvt = _xmlvtselection.add_element('vt_single',
59
                                                 attrs={'id': vt_id})
60
            if vt_values:
61
                for key, value in vt_values.items():
62
                    _xmlvt.add_element('vt_value', value, attrs={'id': key})
63
        else:
64
            for group in vt_values:
65
                _xmlvt = _xmlvtselection.add_element(
66
                    'vt_group', attrs={'filter': group})
67
68
    return _xmlvtselection
69
70
class Osp(GvmProtocol):
71
72
    @staticmethod
73
    def get_protocol_version():
74
        """Allow to determine the Open Scanner Protocol version.
75
76
            Returns:
77
                str: Implemented version of the Open Scanner Protocol
78
        """
79
        return get_version_string(PROTOCOL_VERSION)
80
81
    def _read(self):
82
        # OSP is stateless. Therefore the connection is closed after each
83
        # response and we must reset the connection
84
        data = super()._read()
85
        self.disconnect()
86
        return data
87
88
    def _send(self, data):
89
        # OSP is stateless. Therefore we can shutdown the socket if we are done
90
        # with sending
91
        super()._send(data)
92
        self._connection.finish_send()
93
94
    def get_version(self):
95
        """Get the version of the OSPD server which is connected to."""
96
        cmd = XmlCommand('get_version')
97
        return self._send_xml_command(cmd)
98
99
    def help(self):
100
        """Get the help text."""
101
        cmd = XmlCommand('help')
102
        return self._send_xml_command(cmd)
103
104
    def get_scans(self, scan_id=None, details=True, pop_results=False):
105
        """Get the stored scans.
106
107
        Arguments:
108
            scan_id (str, optional): UUID identifier for a scan.
109
            details (boolean, optional): Whether to get full scan reports.
110
                Default: True
111
            pop_results (boolean, optional) Whether to remove the fetched
112
                results. Default: False
113
114
        Returns:
115
            str: Response from server.
116
        """
117
        cmd = XmlCommand('get_scans')
118
        if scan_id:
119
            cmd.set_attribute('scan_id', scan_id)
120
        if details:
121
            cmd.set_attribute('details', '1')
122
        else:
123
            cmd.set_attribute('details', '0')
124
125
        if pop_results:
126
            cmd.set_attribute('pop_results', '1')
127
        else:
128
            cmd.set_attribute('pop_results', '0')
129
130
        return self._send_xml_command(cmd)
131
132
    def delete_scan(self, scan_id=None):
133
        """Delete a finished scan.
134
135
        Arguments:
136
            scan_id (str): UUID identifier for a finished scan.
137
138
        Returns:
139
            str: Response from server.
140
        """
141
        if not scan_id:
142
            raise ValueError('delete_scan requires a scan_id element')
143
        cmd = XmlCommand('delete_scan')
144
        cmd.set_attribute('scan_id', scan_id)
145
146
        return self._send_xml_command(cmd)
147
148
    def get_scanner_details(self):
149
        """Return scanner description and parameters."""
150
        cmd = XmlCommand('get_scanner_details')
151
        return self._send_xml_command(cmd)
152
153
    def get_vts(self, vt_id=None):
154
        """Return information about vulnerability tests,
155
        if offered by scanner.
156
157
        Arguments:
158
            vt_id (str, optional): UUID identifier for a vulnerability test.
159
160
        Returns:
161
            str: Response from server.
162
        """
163
        cmd = XmlCommand('get_vts')
164
        if vt_id:
165
            cmd.set_attribute('vt_id', vt_id)
166
167
        return self._send_xml_command(cmd)
168
169
    def start_scan(self, scan_id=None, parallel=1, target=None,
0 ignored issues
show
Comprehensibility introduced by
This function exceeds the maximum number of variables (17/15).
Loading history...
170
                   ports=None, targets=None, scanner_params=None,
171
                   vt_selection=None):
172
        """Start a new scan.
173
174
        Arguments:
175
            scan_id (str, optional): UUID identifier for a running scan.
176
            parallel (int, optional): Number of parallel scanned targets.
177
                Default 1.
178
            target (dict, optional): Deprecated. Please use targets instead.
179
            targets (list, optional): List of dictionaries. See example.
180
            ports (str, optional): Deprecated. Ports to use for target
181
                parameter.
182
            scanner_params: (dict, optional): Dictionary of scanner parameters.
183
            vt_selection: (dict, optional): Vulnerability tests to select. See
184
                example.
185
186
        Returns:
187
            str: Response from server.
188
189
190
        Examples:
191
192
            Scanner Parameters::
193
194
                scanner_parameters = {
195
                    'scan_param1': 'scan_param1_value',
196
                    'scan_param2': 'scan_param2_value',
197
                }
198
199
            Targets::
200
201
                targets = [{
202
                    'hosts': 'localhost',
203
                    'ports': '80,43'
204
                }, {
205
                    'hosts': '192.168.0.0/24',
206
                    'ports': '22',
207
                }, {
208
                    'credentials': {
209
                        'smb': {
210
                            'password': 'pass',
211
                            'port': 'port',
212
                            'type': 'type',
213
                            'username': 'username',
214
                        }
215
                    }
216
                }]
217
218
            VT Selection::
219
220
                vt_selection = {
221
                    'vt1': {},
222
                    'vt2': {'value_id': 'value'},
223
                    'vt_groups': ['family=debian', 'family=general']
224
                }
225
        """
226
        cmd = XmlCommand('start_scan')
227
228
        if scan_id:
229
            cmd.set_attribute('scan_id', scan_id)
230
231
        cmd.set_attribute('parallel', str(parallel))
232
233
        # Add <scanner_params> even if it is empty, since it is mandatory
234
        _xmlscanparams = cmd.add_element('scanner_params')
235
        if scanner_params:
236
            _xmlscanparams.set_attributes(scanner_params)
237
238
        if targets:
239
            _xmltargets = cmd.add_element('targets')
240
            for target in targets:
0 ignored issues
show
unused-code introduced by
Redefining argument with the local name 'target'
Loading history...
241
                _xmltarget = _xmltargets.add_element('target')
242
                hosts = target.get('hosts')
243
                ports = target.get('ports')
244
                credentials = target.get('credentials')
245
                _xmltarget.add_element('hosts', hosts)
246
                _xmltarget.add_element('ports', ports)
247
                if credentials:
248
                    _xmlcredentials = _xmltarget.add_element('credentials')
249
                    _xmlcredentials = (create_credentials_element(
250
                        _xmlcredentials, credentials))
251
        # Check target as attribute for legacy mode compatibility. Deprecated.
252
        elif target:
253
            cmd.set_attribute('target', target)
254
            if ports:
255
                cmd.set_attribute('ports', ports)
256
        else:
257
            raise RequiredArgument('start_scan requires a target. Please pass '
258
                                   'targets parameter.')
259
260
        if vt_selection:
261
            _xmlvtselection = cmd.add_element('vt_selection')
262
            _xmlvtselection = create_vt_selection_element(
263
                _xmlvtselection, vt_selection)
264
265
        return self._send_xml_command(cmd)
266
267
    def stop_scan(self, scan_id):
268
        """Stop a currently running scan.
269
270
        Args:
271
            scan_id (str): UUID identifier for a running scan.
272
273
        Returns:
274
            str: Response from server.
275
        """
276
        if not scan_id:
277
            raise RequiredArgument('stop_scan requires a scan_id argument')
278
279
        cmd = XmlCommand('stop_scan')
280
        cmd.set_attribute('scan_id', scan_id)
281
282
        return self._send_xml_command(cmd)
283