Passed
Push — master ( 9a837c...ad9af7 )
by
unknown
02:35 queued 23s
created

gvm.protocols.ospv1.Osp.start_scan()   C

Complexity

Conditions 9

Size

Total Lines 108
Code Lines 40

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 9
eloc 40
nop 8
dl 0
loc 108
rs 6.5866
c 0
b 0
f 0

How to fix   Long Method    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 - 2019 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
"""
19
Module for communication to a daemon speaking `Open Scanner Protocol version 1`_
20
21
.. _Open Scanner Protocol version 1:
22
    https://docs.greenbone.net/API/GMP/gmp-7.0.html
23
"""
24
import logging
25
26
from gvm.errors import RequiredArgument, InvalidArgument
27
from gvm.xml import XmlCommand
28
29
from .base import GvmProtocol
30
31
logger = logging.getLogger(__name__)
32
33
PROTOCOL_VERSION = (1, 2)
34
35
36
def create_credentials_element(_xmlcredentials, credentials):
37
    """Generates an xml element with credentials."""
38
    for service, credential in credentials.items():
39
        cred_type = credential.get("type")
40
        serv_port = credential.get("port")
41
        username = credential.get("username")
42
        password = credential.get("password")
43
44
        _xmlcredential = _xmlcredentials.add_element("credential")
45
        _xmlcredential.set_attribute("type", cred_type)
46
        _xmlcredential.set_attribute("port", serv_port)
47
        _xmlcredential.set_attribute("service", service)
48
49
        _xmlcredential.add_element("username", username)
50
        _xmlcredential.add_element("password", password)
51
    return _xmlcredentials
52
53
54
def create_vt_selection_element(_xmlvtselection, vt_selection):
55
    """Generates an xml element with a selection of Vulnerability tests."""
56
    for vt_id, vt_values in vt_selection.items():
57
        if vt_id != "vt_groups" and isinstance(vt_values, dict):
58
            _xmlvt = _xmlvtselection.add_element(
59
                "vt_single", attrs={"id": vt_id}
60
            )
61
            if vt_values:
62
                for key, value in vt_values.items():
63
                    _xmlvt.add_element("vt_value", value, attrs={"id": key})
64
        elif vt_id == "vt_groups" and isinstance(vt_values, list):
65
            for group in vt_values:
66
                _xmlvt = _xmlvtselection.add_element(
67
                    "vt_group", attrs={"filter": group}
68
                )
69
        else:
70
            raise InvalidArgument(
71
                "It was not possible to add {} to the VTs "
72
                "selection.".format(vt_id)
73
            )
74
75
    return _xmlvtselection
76
77
78
class Osp(GvmProtocol):
79
    @staticmethod
80
    def get_protocol_version():
81
        """Determine the Open Scanner Protocol version.
82
83
        Returns:
84
            tuple: Implemented version of the Open Scanner Protocol
85
        """
86
        return PROTOCOL_VERSION
87
88
    def _read(self):
89
        # OSP is stateless. Therefore the connection is closed after each
90
        # response and we must reset the connection
91
        data = super()._read()
92
        self.disconnect()
93
        return data
94
95
    def get_version(self):
96
        """Get the version of the OSPD server which is connected to."""
97
        cmd = XmlCommand("get_version")
98
        return self._send_xml_command(cmd)
99
100
    def help(self):
101
        """Get the help text."""
102
        cmd = XmlCommand("help")
103
        return self._send_xml_command(cmd)
104
105
    def get_scans(self, scan_id=None, details=True, pop_results=False):
106
        """Get the stored scans.
107
108
        Arguments:
109
            scan_id (str, optional): UUID identifier for a scan.
110
            details (boolean, optional): Whether to get full scan reports.
111
                Default: True
112
            pop_results (boolean, optional) Whether to remove the fetched
113
                results. Default: False
114
115
        Returns:
116
            str: Response from server.
117
        """
118
        cmd = XmlCommand("get_scans")
119
        if scan_id:
120
            cmd.set_attribute("scan_id", scan_id)
121
        if details:
122
            cmd.set_attribute("details", "1")
123
        else:
124
            cmd.set_attribute("details", "0")
125
126
        if pop_results:
127
            cmd.set_attribute("pop_results", "1")
128
        else:
129
            cmd.set_attribute("pop_results", "0")
130
131
        return self._send_xml_command(cmd)
132
133
    def delete_scan(self, scan_id=None):
134
        """Delete a finished scan.
135
136
        Arguments:
137
            scan_id (str): UUID identifier for a finished scan.
138
139
        Returns:
140
            str: Response from server.
141
        """
142
        if not scan_id:
143
            raise ValueError("delete_scan requires a scan_id element")
144
        cmd = XmlCommand("delete_scan")
145
        cmd.set_attribute("scan_id", scan_id)
146
147
        return self._send_xml_command(cmd)
148
149
    def get_scanner_details(self):
150
        """Return scanner description and parameters."""
151
        cmd = XmlCommand("get_scanner_details")
152
        return self._send_xml_command(cmd)
153
154
    def get_vts(self, vt_id=None):
155
        """Return information about vulnerability tests,
156
        if offered by scanner.
157
158
        Arguments:
159
            vt_id (str, optional): UUID identifier for a vulnerability test.
160
161
        Returns:
162
            str: Response from server.
163
        """
164
        cmd = XmlCommand("get_vts")
165
        if vt_id:
166
            cmd.set_attribute("vt_id", vt_id)
167
168
        return self._send_xml_command(cmd)
169
170
    def start_scan(
171
        self,
172
        scan_id=None,
173
        parallel=1,
174
        target=None,
175
        ports=None,
176
        targets=None,
177
        scanner_params=None,
178
        vt_selection=None,
179
    ):
180
        """Start a new scan.
181
182
        Arguments:
183
            scan_id (str, optional): UUID identifier for a running scan.
184
            parallel (int, optional): Number of parallel scanned targets.
185
                Default 1.
186
            target (dict, optional): Deprecated. Please use targets instead.
187
            targets (list, optional): List of dictionaries. See example.
188
            ports (str, optional): Deprecated. Ports to use for target
189
                parameter.
190
            scanner_params: (dict, optional): Dictionary of scanner parameters.
191
            vt_selection: (dict, optional): Vulnerability tests to select. See
192
                example.
193
194
        Returns:
195
            str: Response from server.
196
197
198
        Examples:
199
200
            Scanner Parameters::
201
202
                scanner_parameters = {
203
                    'scan_param1': 'scan_param1_value',
204
                    'scan_param2': 'scan_param2_value',
205
                }
206
207
            Targets::
208
209
                targets = [{
210
                    'hosts': 'localhost',
211
                    'ports': '80,43'
212
                }, {
213
                    'hosts': '192.168.0.0/24',
214
                    'ports': '22',
215
                }, {
216
                    'credentials': {
217
                        'smb': {
218
                            'password': 'pass',
219
                            'port': 'port',
220
                            'type': 'type',
221
                            'username': 'username',
222
                        }
223
                    }
224
                }]
225
226
            VT Selection::
227
228
                vt_selection = {
229
                    'vt1': {},
230
                    'vt2': {'value_id': 'value'},
231
                    'vt_groups': ['family=debian', 'family=general']
232
                }
233
        """
234
        cmd = XmlCommand("start_scan")
235
236
        if scan_id:
237
            cmd.set_attribute("scan_id", scan_id)
238
239
        cmd.set_attribute("parallel", str(parallel))
240
241
        # Add <scanner_params> even if it is empty, since it is mandatory
242
        _xmlscanparams = cmd.add_element("scanner_params")
243
        if scanner_params:
244
            _xmlscanparams.set_attributes(scanner_params)
245
246
        if targets:
247
            _xmltargets = cmd.add_element("targets")
248
            for target in targets:
249
                _xmltarget = _xmltargets.add_element("target")
250
                hosts = target.get("hosts")
251
                ports = target.get("ports")
252
                credentials = target.get("credentials")
253
                _xmltarget.add_element("hosts", hosts)
254
                _xmltarget.add_element("ports", ports)
255
                if credentials:
256
                    _xmlcredentials = _xmltarget.add_element("credentials")
257
                    _xmlcredentials = create_credentials_element(
258
                        _xmlcredentials, credentials
259
                    )
260
        # Check target as attribute for legacy mode compatibility. Deprecated.
261
        elif target:
262
            cmd.set_attribute("target", target)
263
            if ports:
264
                cmd.set_attribute("ports", ports)
265
        else:
266
            raise RequiredArgument(
267
                "start_scan requires a target. Please pass "
268
                "targets parameter."
269
            )
270
271
        if vt_selection:
272
            _xmlvtselection = cmd.add_element("vt_selection")
273
            _xmlvtselection = create_vt_selection_element(
274
                _xmlvtselection, vt_selection
275
            )
276
277
        return self._send_xml_command(cmd)
278
279
    def stop_scan(self, scan_id):
280
        """Stop a currently running scan.
281
282
        Args:
283
            scan_id (str): UUID identifier for a running scan.
284
285
        Returns:
286
            str: Response from server.
287
        """
288
        if not scan_id:
289
            raise RequiredArgument("stop_scan requires a scan_id argument")
290
291
        cmd = XmlCommand("stop_scan")
292
        cmd.set_attribute("scan_id", scan_id)
293
294
        return self._send_xml_command(cmd)
295