Passed
Push — master ( 9a837c...ad9af7 )
by
unknown
02:35 queued 23s
created

gvm.protocols.ospv1.Osp._send()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 2
dl 0
loc 5
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 - 2019 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
"""
19
Module for communication to a daemon speaking `Open Scanner Protocol version 1`_
20
21
.. _Open Scanner Protocol version 1:
22
    https://docs.greenbone.net/API/GMP/gmp-7.0.html
23
"""
24
import logging
25
26
from gvm.errors import RequiredArgument, InvalidArgument
27
from gvm.xml import XmlCommand
28
29
from .base import GvmProtocol
30
31
logger = logging.getLogger(__name__)
32
33
PROTOCOL_VERSION = (1, 2)
34
35
36
def create_credentials_element(_xmlcredentials, credentials):
37
    """Generates an xml element with credentials."""
38
    for service, credential in credentials.items():
39
        cred_type = credential.get("type")
40
        serv_port = credential.get("port")
41
        username = credential.get("username")
42
        password = credential.get("password")
43
44
        _xmlcredential = _xmlcredentials.add_element("credential")
45
        _xmlcredential.set_attribute("type", cred_type)
46
        _xmlcredential.set_attribute("port", serv_port)
47
        _xmlcredential.set_attribute("service", service)
48
49
        _xmlcredential.add_element("username", username)
50
        _xmlcredential.add_element("password", password)
51
    return _xmlcredentials
52
53
54
def create_vt_selection_element(_xmlvtselection, vt_selection):
55
    """Generates an xml element with a selection of Vulnerability tests."""
56
    for vt_id, vt_values in vt_selection.items():
57
        if vt_id != "vt_groups" and isinstance(vt_values, dict):
58
            _xmlvt = _xmlvtselection.add_element(
59
                "vt_single", attrs={"id": vt_id}
60
            )
61
            if vt_values:
62
                for key, value in vt_values.items():
63
                    _xmlvt.add_element("vt_value", value, attrs={"id": key})
64
        elif vt_id == "vt_groups" and isinstance(vt_values, list):
65
            for group in vt_values:
66
                _xmlvt = _xmlvtselection.add_element(
67
                    "vt_group", attrs={"filter": group}
68
                )
69
        else:
70
            raise InvalidArgument(
71
                "It was not possible to add {} to the VTs "
72
                "selection.".format(vt_id)
73
            )
74
75
    return _xmlvtselection
76
77
78
class Osp(GvmProtocol):
79
    @staticmethod
80
    def get_protocol_version():
81
        """Determine the Open Scanner Protocol version.
82
83
        Returns:
84
            tuple: Implemented version of the Open Scanner Protocol
85
        """
86
        return PROTOCOL_VERSION
87
88
    def _read(self):
89
        # OSP is stateless. Therefore the connection is closed after each
90
        # response and we must reset the connection
91
        data = super()._read()
92
        self.disconnect()
93
        return data
94
95
    def get_version(self):
96
        """Get the version of the OSPD server which is connected to."""
97
        cmd = XmlCommand("get_version")
98
        return self._send_xml_command(cmd)
99
100
    def help(self):
101
        """Get the help text."""
102
        cmd = XmlCommand("help")
103
        return self._send_xml_command(cmd)
104
105
    def get_scans(self, scan_id=None, details=True, pop_results=False):
106
        """Get the stored scans.
107
108
        Arguments:
109
            scan_id (str, optional): UUID identifier for a scan.
110
            details (boolean, optional): Whether to get full scan reports.
111
                Default: True
112
            pop_results (boolean, optional) Whether to remove the fetched
113
                results. Default: False
114
115
        Returns:
116
            str: Response from server.
117
        """
118
        cmd = XmlCommand("get_scans")
119
        if scan_id:
120
            cmd.set_attribute("scan_id", scan_id)
121
        if details:
122
            cmd.set_attribute("details", "1")
123
        else:
124
            cmd.set_attribute("details", "0")
125
126
        if pop_results:
127
            cmd.set_attribute("pop_results", "1")
128
        else:
129
            cmd.set_attribute("pop_results", "0")
130
131
        return self._send_xml_command(cmd)
132
133
    def delete_scan(self, scan_id=None):
134
        """Delete a finished scan.
135
136
        Arguments:
137
            scan_id (str): UUID identifier for a finished scan.
138
139
        Returns:
140
            str: Response from server.
141
        """
142
        if not scan_id:
143
            raise ValueError("delete_scan requires a scan_id element")
144
        cmd = XmlCommand("delete_scan")
145
        cmd.set_attribute("scan_id", scan_id)
146
147
        return self._send_xml_command(cmd)
148
149
    def get_scanner_details(self):
150
        """Return scanner description and parameters."""
151
        cmd = XmlCommand("get_scanner_details")
152
        return self._send_xml_command(cmd)
153
154
    def get_vts(self, vt_id=None):
155
        """Return information about vulnerability tests,
156
        if offered by scanner.
157
158
        Arguments:
159
            vt_id (str, optional): UUID identifier for a vulnerability test.
160
161
        Returns:
162
            str: Response from server.
163
        """
164
        cmd = XmlCommand("get_vts")
165
        if vt_id:
166
            cmd.set_attribute("vt_id", vt_id)
167
168
        return self._send_xml_command(cmd)
169
170
    def start_scan(
171
        self,
172
        scan_id=None,
173
        parallel=1,
174
        target=None,
175
        ports=None,
176
        targets=None,
177
        scanner_params=None,
178
        vt_selection=None,
179
    ):
180
        """Start a new scan.
181
182
        Arguments:
183
            scan_id (str, optional): UUID identifier for a running scan.
184
            parallel (int, optional): Number of parallel scanned targets.
185
                Default 1.
186
            target (dict, optional): Deprecated. Please use targets instead.
187
            targets (list, optional): List of dictionaries. See example.
188
            ports (str, optional): Deprecated. Ports to use for target
189
                parameter.
190
            scanner_params: (dict, optional): Dictionary of scanner parameters.
191
            vt_selection: (dict, optional): Vulnerability tests to select. See
192
                example.
193
194
        Returns:
195
            str: Response from server.
196
197
198
        Examples:
199
200
            Scanner Parameters::
201
202
                scanner_parameters = {
203
                    'scan_param1': 'scan_param1_value',
204
                    'scan_param2': 'scan_param2_value',
205
                }
206
207
            Targets::
208
209
                targets = [{
210
                    'hosts': 'localhost',
211
                    'ports': '80,43'
212
                }, {
213
                    'hosts': '192.168.0.0/24',
214
                    'ports': '22',
215
                }, {
216
                    'credentials': {
217
                        'smb': {
218
                            'password': 'pass',
219
                            'port': 'port',
220
                            'type': 'type',
221
                            'username': 'username',
222
                        }
223
                    }
224
                }]
225
226
            VT Selection::
227
228
                vt_selection = {
229
                    'vt1': {},
230
                    'vt2': {'value_id': 'value'},
231
                    'vt_groups': ['family=debian', 'family=general']
232
                }
233
        """
234
        cmd = XmlCommand("start_scan")
235
236
        if scan_id:
237
            cmd.set_attribute("scan_id", scan_id)
238
239
        cmd.set_attribute("parallel", str(parallel))
240
241
        # Add <scanner_params> even if it is empty, since it is mandatory
242
        _xmlscanparams = cmd.add_element("scanner_params")
243
        if scanner_params:
244
            _xmlscanparams.set_attributes(scanner_params)
245
246
        if targets:
247
            _xmltargets = cmd.add_element("targets")
248
            for target in targets:
249
                _xmltarget = _xmltargets.add_element("target")
250
                hosts = target.get("hosts")
251
                ports = target.get("ports")
252
                credentials = target.get("credentials")
253
                _xmltarget.add_element("hosts", hosts)
254
                _xmltarget.add_element("ports", ports)
255
                if credentials:
256
                    _xmlcredentials = _xmltarget.add_element("credentials")
257
                    _xmlcredentials = create_credentials_element(
258
                        _xmlcredentials, credentials
259
                    )
260
        # Check target as attribute for legacy mode compatibility. Deprecated.
261
        elif target:
262
            cmd.set_attribute("target", target)
263
            if ports:
264
                cmd.set_attribute("ports", ports)
265
        else:
266
            raise RequiredArgument(
267
                "start_scan requires a target. Please pass "
268
                "targets parameter."
269
            )
270
271
        if vt_selection:
272
            _xmlvtselection = cmd.add_element("vt_selection")
273
            _xmlvtselection = create_vt_selection_element(
274
                _xmlvtselection, vt_selection
275
            )
276
277
        return self._send_xml_command(cmd)
278
279
    def stop_scan(self, scan_id):
280
        """Stop a currently running scan.
281
282
        Args:
283
            scan_id (str): UUID identifier for a running scan.
284
285
        Returns:
286
            str: Response from server.
287
        """
288
        if not scan_id:
289
            raise RequiredArgument("stop_scan requires a scan_id argument")
290
291
        cmd = XmlCommand("stop_scan")
292
        cmd.set_attribute("scan_id", scan_id)
293
294
        return self._send_xml_command(cmd)
295