Completed
Push — master ( f97307...dda958 )
by Juan José
13s
created

gvm.protocols.ospv1.Osp._read()   A

Complexity

Conditions 1

Size

Total Lines 6
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 1
dl 0
loc 6
rs 10
c 0
b 0
f 0
# -*- coding: utf-8 -*-
# Copyright (C) 2018 Greenbone Networks GmbH
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
Module for communication to a daemon speaking Open Scanner Protocol version 1
"""
21
import logging
22
23
from gvm.errors import RequiredArgument
24
from gvm.utils import get_version_string
25
from gvm.xml import XmlCommand
26
27
from .base import GvmProtocol
28
29
logger = logging.getLogger(__name__)

# Version of the Open Scanner Protocol implemented by this module, as a
# tuple of integers; Osp.get_protocol_version() renders it as a string.
PROTOCOL_VERSION = (1, 2,)
32
33
34
def create_credentials_element(_xmlcredentials, credentials):
    """Generates an xml element with credentials.

    For every service in ``credentials`` a ``<credential>`` child is added
    to ``_xmlcredentials``. The child carries the attributes ``type``,
    ``port`` and ``service`` and gets a ``<username>`` and a ``<password>``
    sub-element.

    Arguments:
        _xmlcredentials: Parent element the credentials are appended to.
            It must provide an ``add_element(name, text, attrs=...)``
            method returning the newly created child element.
        credentials (dict): Maps a service name to a dictionary which is
            read for the keys ``type``, ``port``, ``username`` and
            ``password``. Missing keys are passed on as ``None``.

    Returns:
        The ``_xmlcredentials`` element that was passed in.
    """
    for service, credential in credentials.items():
        _xmlcredential = _xmlcredentials.add_element(
            'credential', attrs={
                'type': credential.get('type'),
                'port': credential.get('port'),
                'service': service,
            })
        _xmlcredential.add_element('username', credential.get('username'))
        _xmlcredential.add_element('password', credential.get('password'))

    return _xmlcredentials
50
51
def create_vt_selection_element(_xmlvtselection, vt_selection):
    """Generates an xml element with a selection of Vulnerability tests.

    Every key of ``vt_selection`` other than ``'vt_groups'`` becomes a
    ``<vt_single id=KEY>`` child; if its value is a non-empty dictionary,
    each item is added below it as ``<vt_value id=NAME>VALUE</vt_value>``.
    The special key ``'vt_groups'`` holds an iterable of filter strings and
    yields one ``<vt_group filter=...>`` child per entry.

    Arguments:
        _xmlvtselection: Parent element the selection is appended to. It
            must provide an ``add_element(name, text, attrs=...)`` method
            returning the newly created child element.
        vt_selection (dict): Vulnerability tests and groups to select.

    Returns:
        The ``_xmlvtselection`` element that was passed in.
    """
    for vt_id, vt_values in vt_selection.items():
        if vt_id != 'vt_groups':
            _xmlvt = _xmlvtselection.add_element('vt_single',
                                                 attrs={'id': vt_id})
            if vt_values:
                for key, value in vt_values.items():
                    _xmlvt.add_element('vt_value', value, attrs={'id': key})
        else:
            for group in vt_values:
                _xmlvtselection.add_element(
                    'vt_group', attrs={'filter': group})

    return _xmlvtselection
66
67
class Osp(GvmProtocol):
    """Client side implementation of the Open Scanner Protocol version 1.

    Each public command method builds a single :class:`XmlCommand` and
    hands it to ``_send_xml_command``, returning whatever that call
    returns.

    NOTE(review): ``_send_xml_command`` and ``disconnect`` are not defined
    in this class; presumably they are inherited from ``GvmProtocol``.
    """

    @staticmethod
    def get_protocol_version():
        """Allow to determine the Open Scanner Protocol version.

            Returns:
                str: Implemented version of the Open Scanner Protocol
        """
        return get_version_string(PROTOCOL_VERSION)

    def _read(self):
        """Read a response and drop the connection afterwards.

        Returns:
            The data returned by the parent class' ``_read``.
        """
        # OSP is stateless. Therefore the connection is closed after each
        # response and we must reset the connection
        data = super()._read()
        self.disconnect()
        return data

    def get_version(self):
        """Get the version of the OSPD server which is connected to."""
        cmd = XmlCommand('get_version')
        return self._send_xml_command(cmd)

    def help(self):
        """Get the help text."""
        cmd = XmlCommand('help')
        return self._send_xml_command(cmd)

    def get_scans(self, scan_id=None, details=True, pop_results=False):
        """Get the stored scans.

        Arguments:
            scan_id (str, optional): UUID identifier for a scan.
            details (boolean, optional): Whether to get full scan reports.
                Default: True
            pop_results (boolean, optional): Whether to remove the fetched
                results. Default: False

        Returns:
            str: Response from server.
        """
        cmd = XmlCommand('get_scans')
        if scan_id:
            cmd.set_attribute('scan_id', scan_id)

        # Both flags are always sent, encoded as '1' (true) or '0' (false)
        cmd.set_attribute('details', '1' if details else '0')
        cmd.set_attribute('pop_results', '1' if pop_results else '0')

        return self._send_xml_command(cmd)

    def delete_scan(self, scan_id=None):
        """Delete a finished scan.

        Arguments:
            scan_id (str): UUID identifier for a finished scan.

        Returns:
            str: Response from server.

        Raises:
            ValueError: If scan_id is missing or empty.
        """
        if not scan_id:
            raise ValueError('delete_scan requires a scan_id element')
        cmd = XmlCommand('delete_scan')
        cmd.set_attribute('scan_id', scan_id)

        return self._send_xml_command(cmd)

    def get_scanner_details(self):
        """Return scanner description and parameters."""
        cmd = XmlCommand('get_scanner_details')
        return self._send_xml_command(cmd)

    def get_vts(self, vt_id=None):
        """Return information about vulnerability tests,
        if offered by scanner.

        Arguments:
            vt_id (str, optional): UUID identifier for a vulnerability test.

        Returns:
            str: Response from server.
        """
        cmd = XmlCommand('get_vts')
        if vt_id:
            cmd.set_attribute('vt_id', vt_id)

        return self._send_xml_command(cmd)

    def start_scan(self, scan_id=None, parallel=1, target=None,
                   ports=None, targets=None, scanner_params=None,
                   vt_selection=None):
        """Start a new scan.

        Arguments:
            scan_id (str, optional): UUID identifier for a running scan.
            parallel (int, optional): Number of parallel scanned targets.
                Default 1.
            target (dict, optional): Deprecated. Please use targets instead.
            targets (list, optional): List of dictionaries. See example.
            ports (str, optional): Deprecated. Ports to use for target
                parameter.
            scanner_params: (dict, optional): Dictionary of scanner parameters.
            vt_selection: (dict, optional): Vulnerability tests to select. See
                example.

        Returns:
            str: Response from server.

        Raises:
            RequiredArgument: If neither targets nor target is passed.

        Examples:

            Scanner Parameters::

                scanner_parameters = {
                    'scan_param1': 'scan_param1_value',
                    'scan_param2': 'scan_param2_value',
                }

            Targets::

                targets = [{
                    'hosts': 'localhost',
                    'ports': '80,43'
                }, {
                    'hosts': '192.168.0.0/24',
                    'ports': '22',
                }, {
                    'credentials': {
                        'smb': {
                            'password': 'pass',
                            'port': 'port',
                            'type': 'type',
                            'username': 'username',
                        }
                    }
                }]

            VT Selection::

                vt_selection = {
                    'vt1': {},
                    'vt2': {'value_id': 'value'},
                    'vt_groups': ['family=debian', 'family=general']
                }
        """
        cmd = XmlCommand('start_scan')

        if scan_id:
            cmd.set_attribute('scan_id', scan_id)

        cmd.set_attribute('parallel', str(parallel))

        # Add <scanner_params> even if it is empty, since it is mandatory
        _xmlscanparams = cmd.add_element('scanner_params')
        if scanner_params:
            _xmlscanparams.set_attributes(scanner_params)

        if targets:
            _xmltargets = cmd.add_element('targets')
            # Use a distinct loop name so the deprecated ``target`` and
            # ``ports`` arguments are not overwritten while iterating.
            for target_spec in targets:
                _xmltarget = _xmltargets.add_element('target')
                _xmltarget.add_element('hosts', target_spec.get('hosts'))
                _xmltarget.add_element('ports', target_spec.get('ports'))
                credentials = target_spec.get('credentials')
                if credentials:
                    create_credentials_element(
                        _xmltarget.add_element('credentials'), credentials)
        # Check target as attribute for legacy mode compatibility. Deprecated.
        elif target:
            cmd.set_attribute('target', target)
            if ports:
                cmd.set_attribute('ports', ports)
        else:
            raise RequiredArgument('start_scan requires a target. Please pass '
                                   'targets parameter.')

        if vt_selection:
            create_vt_selection_element(
                cmd.add_element('vt_selection'), vt_selection)

        return self._send_xml_command(cmd)

    def stop_scan(self, scan_id):
        """Stop a currently running scan.

        Arguments:
            scan_id (str): UUID identifier for a running scan.

        Returns:
            str: Response from server.

        Raises:
            RequiredArgument: If scan_id is missing or empty.
        """
        if not scan_id:
            raise RequiredArgument('stop_scan requires a scan_id argument')

        cmd = XmlCommand('stop_scan')
        cmd.set_attribute('scan_id', scan_id)

        return self._send_xml_command(cmd)
274