Passed
Push — master ( b63135...f278ac )
by
unknown
02:20 queued 24s
created

gvm.protocols.ospv1.create_vt_selection_element()   C

Complexity

Conditions 9

Size

Total Lines 19
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 9
eloc 16
nop 2
dl 0
loc 19
rs 6.6666
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
"""
19
Module for communication to a daemon speaking `Open Scanner Protocol version 1`_
20
21
.. _Open Scanner Protocol version 1:
22
    https://docs.greenbone.net/API/OSP/osp-1.2.html
23
"""
24
import logging

from gvm.errors import InvalidArgument, RequiredArgument
from gvm.utils import get_version_string
from gvm.xml import XmlCommand

from .base import GvmProtocol
31
32
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
33
34
# Version tuple of the implemented Open Scanner Protocol; it is turned into a
# string by Osp.get_protocol_version() via get_version_string().
PROTOCOL_VERSION = (1, 2,)
35
36
37
def create_credentials_element(_xmlcredentials, credentials):
    """Add one ``credential`` child element per service.

    Arguments:
        _xmlcredentials: Element that receives the children through its
            ``add_element`` method.
        credentials (dict): Maps a service name to a dict that is queried
            for the keys ``type``, ``port``, ``username`` and ``password``.
            Missing keys are passed on as ``None``.

    Returns:
        The ``_xmlcredentials`` element that was passed in.
    """
    for service in credentials:
        entry = credentials[service]
        kind, port, user, secret = (
            entry.get(field)
            for field in ('type', 'port', 'username', 'password')
        )

        attributes = {'type': kind, 'port': port, 'service': service}
        node = _xmlcredentials.add_element('credential', attrs=attributes)

        # Username and password are child elements, not attributes.
        node.add_element('username', user)
        node.add_element('password', secret)

    return _xmlcredentials
53
54
def create_vt_selection_element(_xmlvtselection, vt_selection):
    """Generates an xml element with a selection of Vulnerability tests.

    Arguments:
        _xmlvtselection: Element that receives the generated children
            through its ``add_element`` method.
        vt_selection (dict): Maps a VT id to a dict of VT values (which may
            be empty), or the special key ``'vt_groups'`` to a list of
            group filters.

    Returns:
        The ``_xmlvtselection`` element that was passed in.

    Raises:
        InvalidArgument: If an entry is neither a VT id mapped to a dict nor
            ``'vt_groups'`` mapped to a list. Children generated for earlier
            entries have already been added when this is raised.
    """
    for vt_id, vt_values in vt_selection.items():
        is_group_entry = vt_id == 'vt_groups'

        if not is_group_entry and isinstance(vt_values, dict):
            _xmlvt = _xmlvtselection.add_element(
                'vt_single', attrs={'id': vt_id})
            # An empty dict simply yields a <vt_single> without values.
            for key, value in vt_values.items():
                _xmlvt.add_element('vt_value', value, attrs={'id': key})
        elif is_group_entry and isinstance(vt_values, list):
            for group in vt_values:
                _xmlvtselection.add_element(
                    'vt_group', attrs={'filter': group})
        else:
            # InvalidArgument comes from gvm.errors (module-level import).
            raise InvalidArgument(
                'It was not possible to add {} to the VTs '
                'selection.'.format(vt_id))

    return _xmlvtselection
73
74
class Osp(GvmProtocol):
    """Client side of the Open Scanner Protocol (OSP) version 1.

    Each public command method builds an ``XmlCommand`` and returns the
    result of ``_send_xml_command``. OSP is stateless, so sending is finished
    after every request and the connection is dropped after every response.
    """

    @staticmethod
    def get_protocol_version():
        """Allow to determine the Open Scanner Protocol version.

        Returns:
            str: Implemented version of the Open Scanner Protocol
        """
        return get_version_string(PROTOCOL_VERSION)

    def _read(self):
        """Read a response and disconnect afterwards.

        OSP is stateless. Therefore the connection is closed after each
        response and we must reset the connection.
        """
        data = super()._read()
        self.disconnect()
        return data

    def _send(self, data):
        """Send data and signal the connection that sending is finished.

        OSP is stateless. Therefore we can shutdown the socket if we are
        done with sending.
        """
        super()._send(data)
        self._connection.finish_send()

    def get_version(self):
        """Get the version of the OSPD server which is connected to."""
        cmd = XmlCommand('get_version')
        return self._send_xml_command(cmd)

    def help(self):
        """Get the help text."""
        cmd = XmlCommand('help')
        return self._send_xml_command(cmd)

    def get_scans(self, scan_id=None, details=True, pop_results=False):
        """Get the stored scans.

        Arguments:
            scan_id (str, optional): UUID identifier for a scan.
            details (boolean, optional): Whether to get full scan reports.
                Default: True
            pop_results (boolean, optional): Whether to remove the fetched
                results. Default: False

        Returns:
            str: Response from server.
        """
        cmd = XmlCommand('get_scans')
        if scan_id:
            cmd.set_attribute('scan_id', scan_id)

        # Both flags are always sent, encoded as '1' or '0'.
        cmd.set_attribute('details', '1' if details else '0')
        cmd.set_attribute('pop_results', '1' if pop_results else '0')

        return self._send_xml_command(cmd)

    def delete_scan(self, scan_id=None):
        """Delete a finished scan.

        Arguments:
            scan_id (str): UUID identifier for a finished scan.

        Returns:
            str: Response from server.

        Raises:
            ValueError: If scan_id is missing or empty. Note that stop_scan
                raises RequiredArgument for the same condition; the
                exception type is kept here for backward compatibility.
        """
        if not scan_id:
            raise ValueError('delete_scan requires a scan_id element')

        cmd = XmlCommand('delete_scan')
        cmd.set_attribute('scan_id', scan_id)

        return self._send_xml_command(cmd)

    def get_scanner_details(self):
        """Return scanner description and parameters."""
        cmd = XmlCommand('get_scanner_details')
        return self._send_xml_command(cmd)

    def get_vts(self, vt_id=None):
        """Return information about vulnerability tests,
        if offered by scanner.

        Arguments:
            vt_id (str, optional): UUID identifier for a vulnerability test.

        Returns:
            str: Response from server.
        """
        cmd = XmlCommand('get_vts')
        if vt_id:
            cmd.set_attribute('vt_id', vt_id)

        return self._send_xml_command(cmd)

    def start_scan(self, scan_id=None, parallel=1, target=None,
                   ports=None, targets=None, scanner_params=None,
                   vt_selection=None):
        """Start a new scan.

        Arguments:
            scan_id (str, optional): UUID identifier for a running scan.
            parallel (int, optional): Number of parallel scanned targets.
                Default 1.
            target (dict, optional): Deprecated. Please use targets instead.
            targets (list, optional): List of dictionaries. See example.
            ports (str, optional): Deprecated. Ports to use for target
                parameter.
            scanner_params (dict, optional): Dictionary of scanner
                parameters.
            vt_selection (dict, optional): Vulnerability tests to select. See
                example.

        Returns:
            str: Response from server.

        Raises:
            RequiredArgument: If neither targets nor target is given.

        Examples:

            Scanner Parameters::

                scanner_parameters = {
                    'scan_param1': 'scan_param1_value',
                    'scan_param2': 'scan_param2_value',
                }

            Targets::

                targets = [{
                    'hosts': 'localhost',
                    'ports': '80,43'
                }, {
                    'hosts': '192.168.0.0/24',
                    'ports': '22',
                }, {
                    'credentials': {
                        'smb': {
                            'password': 'pass',
                            'port': 'port',
                            'type': 'type',
                            'username': 'username',
                        }
                    }
                }]

            VT Selection::

                vt_selection = {
                    'vt1': {},
                    'vt2': {'value_id': 'value'},
                    'vt_groups': ['family=debian', 'family=general']
                }
        """
        cmd = XmlCommand('start_scan')

        if scan_id:
            cmd.set_attribute('scan_id', scan_id)

        cmd.set_attribute('parallel', str(parallel))

        # Add <scanner_params> even if it is empty, since it is mandatory
        _xmlscanparams = cmd.add_element('scanner_params')
        if scanner_params:
            _xmlscanparams.set_attributes(scanner_params)

        if targets:
            _xmltargets = cmd.add_element('targets')
            # Distinct local names keep the deprecated ``target`` and
            # ``ports`` arguments from being overwritten by the loop.
            for target_entry in targets:
                _xmltarget = _xmltargets.add_element('target')
                _xmltarget.add_element('hosts', target_entry.get('hosts'))
                _xmltarget.add_element('ports', target_entry.get('ports'))

                credentials = target_entry.get('credentials')
                if credentials:
                    create_credentials_element(
                        _xmltarget.add_element('credentials'), credentials)
        # Check target as attribute for legacy mode compatibility. Deprecated.
        elif target:
            cmd.set_attribute('target', target)
            if ports:
                cmd.set_attribute('ports', ports)
        else:
            raise RequiredArgument('start_scan requires a target. Please pass '
                                   'targets parameter.')

        if vt_selection:
            create_vt_selection_element(
                cmd.add_element('vt_selection'), vt_selection)

        return self._send_xml_command(cmd)

    def stop_scan(self, scan_id):
        """Stop a currently running scan.

        Args:
            scan_id (str): UUID identifier for a running scan.

        Returns:
            str: Response from server.

        Raises:
            RequiredArgument: If scan_id is missing or empty.
        """
        if not scan_id:
            raise RequiredArgument('stop_scan requires a scan_id argument')

        cmd = XmlCommand('stop_scan')
        cmd.set_attribute('scan_id', scan_id)

        return self._send_xml_command(cmd)
287