Passed
Push — master ( 83ffd5...05d15c )
by
unknown
01:51 queued 33s
created

gvm.protocols.ospv1   A

Complexity

Total Complexity 29

Size/Duplication

Total Lines 223
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 29
eloc 103
dl 0
loc 223
rs 10
c 0
b 0
f 0

2 Functions

Rating   Name   Duplication   Size   Complexity  
A create_credentials_element() 0 15 2
B create_vt_selection_element() 0 15 6

9 Methods

Rating   Name   Duplication   Size   Complexity  
A Osp.get_scanner_details() 0 4 1
A Osp.help() 0 4 1
A Osp.get_vts() 0 14 2
A Osp.delete_scan() 0 12 2
A Osp.get_scans() 0 18 2
C Osp.start_scan() 0 71 9
A Osp.get_protocol_version() 0 8 1
A Osp.stop_scan() 0 13 2
A Osp.get_version() 0 4 1
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
"""
19
Module for communication with a daemon speaking the Open Scanner Protocol version 1
20
"""
21
import logging
22
23
from gvm.utils import get_version_string
24
from gvm.xml import XmlCommand
25
26
from .base import GvmProtocol
27
28
logger = logging.getLogger(__name__)
29
30
PROTOCOL_VERSION = (1, 2,)
31
32
33
def create_credentials_element(_xmlcredentials, credentials):
    """Add one <credential> child element per service to an element.

    Args:
        _xmlcredentials: Parent element; only its ``add_element`` method
            is used here.
        credentials (dict): Maps a service name to a dict from which the
            'type', 'port', 'username' and 'password' entries are read.
            Missing entries are passed on as None.

    Returns:
        The ``_xmlcredentials`` object that was passed in.
    """
    for service_name, entry in credentials.items():
        # type/port/service become attributes of <credential>, while
        # username/password become child elements of it.
        attributes = {
            'type': entry.get('type'),
            'port': entry.get('port'),
            'service': service_name,
        }
        node = _xmlcredentials.add_element('credential', attrs=attributes)
        node.add_element('username', entry.get('username'))
        node.add_element('password', entry.get('password'))
    return _xmlcredentials
48
49
def create_vt_selection_element(_xmlvtselection, vt_selection):
    """Fill an element with the selected vulnerability tests (VTs).

    Args:
        _xmlvtselection: Parent element; only its ``add_element`` method
            is used here.
        vt_selection (dict): Maps a VT id to a dict of VT values (may be
            empty). The special key 'vt_groups' instead holds an iterable
            of group filters.

    Returns:
        The ``_xmlvtselection`` object that was passed in.
    """
    for vt_id, vt_values in vt_selection.items():
        if vt_id == 'vt_groups':
            # Each entry of the group list becomes its own <vt_group>.
            for group_filter in vt_values:
                _xmlvtselection.add_element('vt_group',
                                            attrs={'filter': group_filter})
            continue

        single = _xmlvtselection.add_element('vt_single', attrs={'id': vt_id})
        if not vt_values:
            continue
        for value_id, value in vt_values.items():
            single.add_element('vt_value', value, attrs={'id': value_id})

    return _xmlvtselection
64
65
class Osp(GvmProtocol):
    """Commands of the Open Scanner Protocol version 1.

    Every command method builds an XmlCommand, serializes it with
    ``to_string()`` and hands it to ``self.send_command``; whatever
    ``send_command`` returns is passed back to the caller unchanged.
    """

    @staticmethod
    def get_protocol_version():
        """Allow to determine the Open Scanner Protocol version.

        Returns:
            str: Implemented version of the Open Scanner Protocol
        """
        return get_version_string(PROTOCOL_VERSION)

    def get_version(self):
        """Get the version of the OSPD server which is connected to."""
        cmd = XmlCommand('get_version')
        return self.send_command(cmd.to_string())

    def help(self):
        """Get the help text."""
        cmd = XmlCommand('help')
        return self.send_command(cmd.to_string())

    def get_scans(self, scan_id=None, details='1', pop_results='0'):
        """Get the stored scans.

        Args:
            scan_id (uuid): Identifier for a scan. Only sent when given.
            details (boolean): Whether to get full scan reports.
            pop_results (boolean): Whether to remove the fetched results.

        Returns:
            str: Response from server.
        """
        cmd = XmlCommand('get_scans')
        if scan_id:
            cmd.set_attribute('scan_id', scan_id)
        cmd.set_attribute('details', details)
        cmd.set_attribute('pop_results', pop_results)

        return self.send_command(cmd.to_string())

    def delete_scan(self, scan_id):
        """Delete a finished scan.

        Args:
            scan_id (uuid): Identifier for a finished scan. A falsy value
                is not sent; the command then goes out without a scan_id.

        Returns:
            str: Response from server.
        """
        cmd = XmlCommand('delete_scan')
        if scan_id:
            cmd.set_attribute('scan_id', scan_id)

        return self.send_command(cmd.to_string())

    def get_scanner_details(self):
        """Return scanner description and parameters."""
        cmd = XmlCommand('get_scanner_details')
        return self.send_command(cmd.to_string())

    def get_vts(self, vt_id=None):
        """Return information about vulnerability tests,
        if offered by scanner.

        Args:
            vt_id (uuid): Identifier for a vulnerability test. Only sent
                when given.

        Returns:
            str: Response from server.
        """
        cmd = XmlCommand('get_vts')
        if vt_id:
            cmd.set_attribute('vt_id', vt_id)

        return self.send_command(cmd.to_string())

    def start_scan(self, scan_id=None, parallel='1', target=None,
                   ports=None, **kwargs):
        """Start a new scan.

        Args:
            scan_id (uuid): Identifier for a running scan. Only sent when
                given.
            parallel (str): Value of the 'parallel' attribute of the
                command. Always sent.
            target (str): Deprecated legacy target, sent as an attribute
                of the command. Only used when kwargs has no 'targets'.
            ports (str): Deprecated legacy ports, sent as an attribute of
                the command. Only used together with ``target``.
            kwargs (dict): A dict with scanner parameters, targets and a
                           VT selection, read from the keys
                           'scanner_params', 'targets' and 'vt_selection'.

        Returns:
            str: Response from server.

        Raises:
            ValueError: If neither kwargs['targets'] nor ``target`` is
                given.

        kwargs example:
        {'scanner_params': {'scan_param1': 'scan_param1_value',
                            'scan_param2': 'scan_param2_value'},
         'targets': [{'hosts': 'localhost',
                      'ports': '80,43'},
                     {'hosts': '192.168.0.0/24',
                      'ports': '22',
                      'credentials': {'smb': {'password': 'pass',
                                              'port': 'port',
                                              'type': 'type',
                                              'username': 'username'}}},
                    ],
         'vt_selection': {'vt1': {},
                          'vt2': {'value_id': 'value'},
                          'vt_groups': ['family=debian', 'family=general']}
        }

        """
        cmd = XmlCommand('start_scan')
        if scan_id:
            cmd.set_attribute('scan_id', scan_id)
        cmd.set_attribute('parallel', parallel)

        # Add <scanner_params> even if it is empty, since it is mandatory
        _xmlscanparams = cmd.add_element('scanner_params')
        scanner_params = kwargs.get('scanner_params')
        if scanner_params:
            _xmlscanparams.set_attributes(scanner_params)

        targets = kwargs.get('targets')
        if targets:
            _xmltargets = cmd.add_element('targets')
            # The loop variable is deliberately not called 'target' so the
            # legacy 'target' and 'ports' arguments are never rebound.
            for target_entry in targets:
                _xmltarget = _xmltargets.add_element('target')
                _xmltarget.add_element('hosts', target_entry.get('hosts'))
                _xmltarget.add_element('ports', target_entry.get('ports'))
                credentials = target_entry.get('credentials')
                if credentials:
                    _xmlcredentials = _xmltarget.add_element('credentials')
                    create_credentials_element(_xmlcredentials, credentials)
        # Check target as attribute for legacy mode compatibility. Deprecated.
        elif target:
            cmd.set_attribute('target', target)
            if ports:
                cmd.set_attribute('ports', ports)
        else:
            raise ValueError('start_scan requires a target')

        # <vt_selection> is added even when no selection was passed.
        _xmlvtselection = cmd.add_element('vt_selection')
        vt_selection = kwargs.get('vt_selection')
        if vt_selection:
            create_vt_selection_element(_xmlvtselection, vt_selection)

        return self.send_command(cmd.to_string())

    def stop_scan(self, scan_id=None):
        """Stop a currently running scan.

        Args:
            scan_id (uuid): Identifier for a running scan. Only sent when
                given.

        Returns:
            str: Response from server.
        """
        cmd = XmlCommand('stop_scan')
        if scan_id:
            cmd.set_attribute('scan_id', scan_id)

        return self.send_command(cmd.to_string())
223