Completed
Push — master ( f97307...dda958 )
by Juan José
13s
created

gvm.protocols.ospv1.Osp.start_scan()   C

Complexity

Conditions 9

Size

Total Lines 97
Code Lines 33

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 9
eloc 33
nop 8
dl 0
loc 97
rs 6.6666
c 0
b 0
f 0

How to fix   Long Method    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

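Commonly applied refactorings include: Extract Method and, when the method carries many parameters or temporary variables, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object.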

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists: Replace Parameter with Method Call, Preserve Whole Object, and Introduce Parameter Object.

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
"""
Module for communication with a daemon speaking Open Scanner Protocol version 1
"""
21
import logging
22
23
from gvm.errors import RequiredArgument
24
from gvm.utils import get_version_string
25
from gvm.xml import XmlCommand
26
27
from .base import GvmProtocol
28
29
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
30
31
# Version tuple passed to get_version_string() by Osp.get_protocol_version();
# presumably (major, minor) of the implemented OSP version -- TODO confirm.
PROTOCOL_VERSION = (1, 2,)
32
33
34
def create_credentials_element(_xmlcredentials, credentials):
    """Add one <credential> child per service to a credentials element.

    Arguments:
        _xmlcredentials: XML element offering ``add_element(name, text,
            attrs=...)``. It is extended in place.
        credentials (dict): Maps a service name to a dictionary that is
            read for the keys 'type', 'port', 'username' and 'password'.
            A missing key is passed on as None.

    Returns:
        The ``_xmlcredentials`` element that was passed in.
    """
    for service, credential in credentials.items():
        # 'type', 'port' and the service name become attributes of the
        # <credential> element; username and password become children.
        _xmlcredential = _xmlcredentials.add_element(
            'credential', attrs={
                'type': credential.get('type'),
                'port': credential.get('port'),
                'service': service,
            })
        _xmlcredential.add_element('username', credential.get('username'))
        _xmlcredential.add_element('password', credential.get('password'))
    return _xmlcredentials
50
51
def create_vt_selection_element(_xmlvtselection, vt_selection):
    """Fill a vt_selection element from a vulnerability test selection.

    Arguments:
        _xmlvtselection: XML element offering ``add_element(name, text,
            attrs=...)``. It is extended in place.
        vt_selection (dict): Every key except 'vt_groups' is a VT id and
            maps to a dictionary of value ids to values (may be empty or
            None). The key 'vt_groups' maps to an iterable of filter
            strings.

    Returns:
        The ``_xmlvtselection`` element that was passed in.
    """
    for vt_id, vt_values in vt_selection.items():
        if vt_id == 'vt_groups':
            # Each group becomes a <vt_group filter="...">. An empty or
            # missing list simply adds nothing.
            for group in vt_values or ():
                _xmlvtselection.add_element(
                    'vt_group', attrs={'filter': group})
            continue

        _xmlvt = _xmlvtselection.add_element('vt_single',
                                             attrs={'id': vt_id})
        if vt_values:
            for key, value in vt_values.items():
                _xmlvt.add_element('vt_value', value, attrs={'id': key})

    return _xmlvtselection
66
67
class Osp(GvmProtocol):
    """Client for the Open Scanner Protocol (OSP) version 1.

    Every public command method builds a single XML command and hands it
    to ``self._send_xml_command``, returning whatever that call returns.
    """

    @staticmethod
    def get_protocol_version():
        """Allow to determine the Open Scanner Protocol version.

        Returns:
            str: Implemented version of the Open Scanner Protocol
        """
        return get_version_string(PROTOCOL_VERSION)

    def _read(self):
        """Read a response, then disconnect.

        OSP is stateless. Therefore the connection is closed after each
        response and we must reset the connection.
        """
        data = super()._read()
        self.disconnect()
        return data

    def get_version(self):
        """Get the version of the OSPD server which is connected to."""
        cmd = XmlCommand('get_version')
        return self._send_xml_command(cmd)

    def help(self):
        """Get the help text."""
        cmd = XmlCommand('help')
        return self._send_xml_command(cmd)

    def get_scans(self, scan_id=None, details=True, pop_results=False):
        """Get the stored scans.

        Arguments:
            scan_id (str, optional): UUID identifier for a scan.
            details (boolean, optional): Whether to get full scan reports.
                Default: True
            pop_results (boolean, optional): Whether to remove the fetched
                results. Default: False

        Returns:
            str: Response from server.
        """
        cmd = XmlCommand('get_scans')
        if scan_id:
            cmd.set_attribute('scan_id', scan_id)

        # Both flags are always sent, encoded as '1' or '0'.
        cmd.set_attribute('details', '1' if details else '0')
        cmd.set_attribute('pop_results', '1' if pop_results else '0')

        return self._send_xml_command(cmd)

    def delete_scan(self, scan_id=None):
        """Delete a finished scan.

        Arguments:
            scan_id (str): UUID identifier for a finished scan.

        Returns:
            str: Response from server.

        Raises:
            ValueError: If scan_id is missing or empty.
        """
        if not scan_id:
            raise ValueError('delete_scan requires a scan_id element')

        cmd = XmlCommand('delete_scan')
        cmd.set_attribute('scan_id', scan_id)

        return self._send_xml_command(cmd)

    def get_scanner_details(self):
        """Return scanner description and parameters."""
        cmd = XmlCommand('get_scanner_details')
        return self._send_xml_command(cmd)

    def get_vts(self, vt_id=None):
        """Return information about vulnerability tests,
        if offered by scanner.

        Arguments:
            vt_id (str, optional): UUID identifier for a vulnerability test.

        Returns:
            str: Response from server.
        """
        cmd = XmlCommand('get_vts')
        if vt_id:
            cmd.set_attribute('vt_id', vt_id)

        return self._send_xml_command(cmd)

    @staticmethod
    def _add_targets_element(cmd, targets):
        """Append a <targets> element with one <target> child per entry.

        Arguments:
            cmd: Command the <targets> element is added to.
            targets (list): Dictionaries that are read for the keys
                'hosts', 'ports' and 'credentials'.
        """
        _xmltargets = cmd.add_element('targets')
        for entry in targets:
            _xmltarget = _xmltargets.add_element('target')
            _xmltarget.add_element('hosts', entry.get('hosts'))
            _xmltarget.add_element('ports', entry.get('ports'))

            credentials = entry.get('credentials')
            if credentials:
                create_credentials_element(
                    _xmltarget.add_element('credentials'), credentials)

    def start_scan(self, scan_id=None, parallel=1, target=None,
                   ports=None, targets=None, scanner_params=None,
                   vt_selection=None):
        """Start a new scan.

        Arguments:
            scan_id (str, optional): UUID identifier for a running scan.
            parallel (int, optional): Number of parallel scanned targets.
                Default 1.
            target (dict, optional): Deprecated. Please use targets instead.
            targets (list, optional): List of dictionaries. See example.
            ports (str, optional): Deprecated. Ports to use for target
                parameter.
            scanner_params (dict, optional): Dictionary of scanner
                parameters.
            vt_selection (dict, optional): Vulnerability tests to select.
                See example.

        Returns:
            str: Response from server.

        Raises:
            RequiredArgument: If neither targets nor target is given.

        Examples:

            Scanner Parameters::

                scanner_params = {
                    'scan_param1': 'scan_param1_value',
                    'scan_param2': 'scan_param2_value',
                }

            Targets::

                targets = [{
                    'hosts': 'localhost',
                    'ports': '80,43'
                }, {
                    'hosts': '192.168.0.0/24',
                    'ports': '22',
                }, {
                    'credentials': {
                        'smb': {
                            'password': 'pass',
                            'port': 'port',
                            'type': 'type',
                            'username': 'username',
                        }
                    }
                }]

            VT Selection::

                vt_selection = {
                    'vt1': {},
                    'vt2': {'value_id': 'value'},
                    'vt_groups': ['family=debian', 'family=general']
                }
        """
        cmd = XmlCommand('start_scan')

        if scan_id:
            cmd.set_attribute('scan_id', scan_id)

        cmd.set_attribute('parallel', str(parallel))

        # Add <scanner_params> even if it is empty, since it is mandatory
        _xmlscanparams = cmd.add_element('scanner_params')
        if scanner_params:
            _xmlscanparams.set_attributes(scanner_params)

        if targets:
            # Built in a helper so the loop does not rebind the deprecated
            # ``target`` and ``ports`` arguments.
            self._add_targets_element(cmd, targets)
        # Check target as attribute for legacy mode compatibility. Deprecated.
        elif target:
            cmd.set_attribute('target', target)
            if ports:
                cmd.set_attribute('ports', ports)
        else:
            raise RequiredArgument('start_scan requires a target. Please pass '
                                   'targets parameter.')

        if vt_selection:
            create_vt_selection_element(
                cmd.add_element('vt_selection'), vt_selection)

        return self._send_xml_command(cmd)

    def stop_scan(self, scan_id):
        """Stop a currently running scan.

        Arguments:
            scan_id (str): UUID identifier for a running scan.

        Returns:
            str: Response from server.

        Raises:
            RequiredArgument: If scan_id is missing or empty.
        """
        if not scan_id:
            raise RequiredArgument('stop_scan requires a scan_id argument')

        cmd = XmlCommand('stop_scan')
        cmd.set_attribute('scan_id', scan_id)

        return self._send_xml_command(cmd)
274