Passed
Pull Request — master (#55)
by
unknown
01:51
created

gvm.protocols.ospv1.Osp.get_version()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 1
dl 0
loc 4
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
"""
19
Module for communication to a daemon speaking `Open Scanner Protocol version 1`_
20
21
.. _Open Scanner Protocol version 1:
22
    https://docs.greenbone.net/API/GMP/gmp-7.0.html
23
"""
24
import logging
25
26
from gvm.errors import RequiredArgument
27
from gvm.utils import get_version_string
28
from gvm.xml import XmlCommand
29
30
from .base import GvmProtocol
31
32
logger = logging.getLogger(__name__)
33
34
PROTOCOL_VERSION = (1, 2,)
35
36
37
def create_credentials_element(_xmlcredentials, credentials):
    """Append one ``credential`` child per service to an xml element.

    Arguments:
        _xmlcredentials: Element offering ``add_element``; it receives the
            new ``credential`` children.
        credentials (dict): Maps a service name to a dict that is queried
            for the keys ``type``, ``port``, ``username`` and ``password``.
            Keys that are absent are passed on as ``None``.

    Returns:
        The ``_xmlcredentials`` element that was passed in.
    """
    for service_name, settings in credentials.items():
        attributes = {
            'type': settings.get('type'),
            'port': settings.get('port'),
            'service': service_name,
        }
        login = settings.get('username')
        secret = settings.get('password')

        # The login data are child elements, not attributes.
        entry = _xmlcredentials.add_element('credential', attrs=attributes)
        entry.add_element('username', login)
        entry.add_element('password', secret)

    return _xmlcredentials
53
54
def create_vt_selection_element(_xmlvtselection, vt_selection):
    """Append the selected vulnerability tests to an xml element.

    Arguments:
        _xmlvtselection: Element offering ``add_element``; it receives the
            ``vt_single`` and ``vt_group`` children.
        vt_selection (dict): The key ``vt_groups`` holds an iterable of
            filter strings, each added as a ``vt_group`` element. Every
            other key is used as the id of a ``vt_single`` element whose
            value, if not empty, is a dict of value id to value.

    Returns:
        The ``_xmlvtselection`` element that was passed in.
    """
    for vt_id, vt_values in vt_selection.items():
        if vt_id == 'vt_groups':
            for group_filter in vt_values:
                _xmlvtselection.add_element(
                    'vt_group', attrs={'filter': group_filter})
            continue

        single = _xmlvtselection.add_element('vt_single', attrs={'id': vt_id})
        if not vt_values:
            # No values given for this test, the bare element is enough.
            continue

        for value_id, value in vt_values.items():
            single.add_element('vt_value', value, attrs={'id': value_id})

    return _xmlvtselection
69
70
class Osp(GvmProtocol):
    """Python interface for the Open Scanner Protocol (OSP) version 1.

    Every public command method builds a single XML command and returns
    the result of passing it to ``_send_xml_command``.
    """

    @staticmethod
    def get_protocol_version():
        """Allow to determine the Open Scanner Protocol version.

        Returns:
            str: Implemented version of the Open Scanner Protocol
        """
        return get_version_string(PROTOCOL_VERSION)

    def _read(self):
        # OSP is stateless. Therefore the connection is closed after each
        # response and we must reset the connection
        data = super()._read()
        self.disconnect()
        return data

    def _send(self, data):
        # OSP is stateless. Therefore we can shutdown the socket if we are done
        # with sending
        super()._send(data)
        self._connection.finish_send()

    def get_version(self):
        """Get the version of the OSPD server which is connected to.

        Returns:
            str: Response from server.
        """
        cmd = XmlCommand('get_version')
        return self._send_xml_command(cmd)

    def help(self):
        """Get the help text.

        Returns:
            str: Response from server.
        """
        cmd = XmlCommand('help')
        return self._send_xml_command(cmd)

    def get_scans(self, scan_id=None, details=True, pop_results=False):
        """Get the stored scans.

        Arguments:
            scan_id (str, optional): UUID identifier for a scan.
            details (boolean, optional): Whether to get full scan reports.
                Default: True
            pop_results (boolean, optional): Whether to remove the fetched
                results. Default: False

        Returns:
            str: Response from server.
        """
        cmd = XmlCommand('get_scans')
        if scan_id:
            cmd.set_attribute('scan_id', scan_id)

        # Both flags are always sent, encoded as '1' or '0'.
        cmd.set_attribute('details', '1' if details else '0')
        cmd.set_attribute('pop_results', '1' if pop_results else '0')

        return self._send_xml_command(cmd)

    def delete_scan(self, scan_id=None):
        """Delete a finished scan.

        Arguments:
            scan_id (str): UUID identifier for a finished scan.

        Returns:
            str: Response from server.

        Raises:
            ValueError: If scan_id is missing or empty.
        """
        if not scan_id:
            raise ValueError('delete_scan requires a scan_id element')

        cmd = XmlCommand('delete_scan')
        cmd.set_attribute('scan_id', scan_id)

        return self._send_xml_command(cmd)

    def get_scanner_details(self):
        """Return scanner description and parameters.

        Returns:
            str: Response from server.
        """
        cmd = XmlCommand('get_scanner_details')
        return self._send_xml_command(cmd)

    def get_vts(self, vt_id=None):
        """Return information about vulnerability tests,
        if offered by scanner.

        Arguments:
            vt_id (str, optional): UUID identifier for a vulnerability test.

        Returns:
            str: Response from server.
        """
        cmd = XmlCommand('get_vts')
        if vt_id:
            cmd.set_attribute('vt_id', vt_id)

        return self._send_xml_command(cmd)

    def start_scan(self, scan_id=None, parallel=1, target=None,
                   ports=None, targets=None, scanner_params=None,
                   vt_selection=None):
        """Start a new scan.

        Arguments:
            scan_id (str, optional): UUID identifier for a running scan.
            parallel (int, optional): Number of parallel scanned targets.
                Default 1.
            target (dict, optional): Deprecated. Please use targets instead.
            targets (list, optional): List of dictionaries. See example.
            ports (str, optional): Deprecated. Ports to use for target
                parameter.
            scanner_params (dict, optional): Dictionary of scanner parameters.
            vt_selection (dict, optional): Vulnerability tests to select. See
                example.

        Returns:
            str: Response from server.

        Raises:
            RequiredArgument: If neither targets nor target is given.

        Examples:

            Scanner Parameters::

                scanner_params = {
                    'scan_param1': 'scan_param1_value',
                    'scan_param2': 'scan_param2_value',
                }

            Targets::

                targets = [{
                    'hosts': 'localhost',
                    'ports': '80,43'
                }, {
                    'hosts': '192.168.0.0/24',
                    'ports': '22',
                }, {
                    'credentials': {
                        'smb': {
                            'password': 'pass',
                            'port': 'port',
                            'type': 'type',
                            'username': 'username',
                        }
                    }
                }]

            VT Selection::

                vt_selection = {
                    'vt1': {},
                    'vt2': {'value_id': 'value'},
                    'vt_groups': ['family=debian', 'family=general']
                }
        """
        cmd = XmlCommand('start_scan')

        if scan_id:
            cmd.set_attribute('scan_id', scan_id)

        cmd.set_attribute('parallel', str(parallel))

        # Add <scanner_params> even if it is empty, since it is mandatory
        _xmlscanparams = cmd.add_element('scanner_params')
        if scanner_params:
            _xmlscanparams.set_attributes(scanner_params)

        if targets:
            _xmltargets = cmd.add_element('targets')
            # The loop variable must not reuse the names of the deprecated
            # 'target' and 'ports' arguments.
            for target_spec in targets:
                _xmltarget = _xmltargets.add_element('target')
                _xmltarget.add_element('hosts', target_spec.get('hosts'))
                _xmltarget.add_element('ports', target_spec.get('ports'))

                credentials = target_spec.get('credentials')
                if credentials:
                    create_credentials_element(
                        _xmltarget.add_element('credentials'), credentials)
        # Check target as attribute for legacy mode compatibility. Deprecated.
        elif target:
            cmd.set_attribute('target', target)
            if ports:
                cmd.set_attribute('ports', ports)
        else:
            raise RequiredArgument('start_scan requires a target. Please pass '
                                   'targets parameter.')

        if vt_selection:
            create_vt_selection_element(
                cmd.add_element('vt_selection'), vt_selection)

        return self._send_xml_command(cmd)

    def stop_scan(self, scan_id):
        """Stop a currently running scan.

        Arguments:
            scan_id (str): UUID identifier for a running scan.

        Returns:
            str: Response from server.

        Raises:
            RequiredArgument: If scan_id is missing or empty.
        """
        if not scan_id:
            raise RequiredArgument('stop_scan requires a scan_id argument')

        cmd = XmlCommand('stop_scan')
        cmd.set_attribute('scan_id', scan_id)

        return self._send_xml_command(cmd)
283