Completed
Push — master ( 52721c...da306e )
by Jaspar
18s queued 14s
created

gvm.protocols.gmpv208.entities.targets   B

Complexity

Total Complexity 49

Size/Duplication

Total Lines 385
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 202
dl 0
loc 385
rs 8.48
c 0
b 0
f 0
wmc 49

1 Function

Rating   Name   Duplication   Size   Complexity  
A get_alive_test_from_string() 0 22 3

6 Methods

Rating   Name   Duplication   Size   Complexity  
F TargetsMixin.modify_target() 0 100 17
A TargetsMixin.get_targets() 0 30 3
F TargetsMixin.create_target() 0 117 19
A TargetsMixin.get_target() 0 25 3
A TargetsMixin.delete_target() 0 19 2
A TargetsMixin.clone_target() 0 17 2

How to fix   Complexity   

Complexity

Complex classes like gvm.protocols.gmpv208.entities.targets often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
# pylint:  disable=redefined-builtin
20
21
from enum import Enum
22
from typing import Any, List, Optional
23
24
from gvm.errors import RequiredArgument, InvalidArgument, InvalidArgumentType
25
from gvm.utils import add_filter, to_bool, to_comma_list
26
from gvm.xml import XmlCommand
27
28
29
class AliveTest(Enum):
30
    """Enum for choosing an alive test"""
31
32
    SCAN_CONFIG_DEFAULT = 'Scan Config Default'
33
    ICMP_PING = 'ICMP Ping'
34
    TCP_ACK_SERVICE_PING = 'TCP-ACK Service Ping'
35
    TCP_SYN_SERVICE_PING = 'TCP-SYN Service Ping'
36
    ARP_PING = 'ARP Ping'
37
    APR_PING = 'ARP Ping'  # Alias for ARP_PING
38
    ICMP_AND_TCP_ACK_SERVICE_PING = 'ICMP & TCP-ACK Service Ping'
39
    ICMP_AND_ARP_PING = 'ICMP & ARP Ping'
40
    TCP_ACK_SERVICE_AND_ARP_PING = 'TCP-ACK Service & ARP Ping'
41
    ICMP_TCP_ACK_SERVICE_AND_ARP_PING = (  # pylint: disable=invalid-name
42
        'ICMP, TCP-ACK Service & ARP Ping'
43
    )
44
    CONSIDER_ALIVE = 'Consider Alive'
45
46
47
def get_alive_test_from_string(
48
    alive_test: Optional[str],
49
) -> Optional[AliveTest]:
50
    """Convert an alive test string into a AliveTest instance"""
51
    if not alive_test:
52
        return None
53
54
    alive_test = alive_test.lower()
55
56
    try:
57
        return AliveTest[
58
            alive_test.replace(',', '')
59
            .replace(' ', '_')
60
            .replace('-', '_')
61
            .replace('&', 'and')
62
            .upper()
63
        ]
64
    except KeyError:
65
        raise InvalidArgument(
66
            argument='alive_test',
67
            function=get_alive_test_from_string.__name__,
68
        ) from None
69
70
71
class TargetsMixin:
72
    def clone_target(self, target_id: str) -> Any:
73
        """Clone an existing target
74
75
        Arguments:
76
            target_id: UUID of an existing target to clone from
77
78
        Returns:
79
            The response. See :py:meth:`send_command` for details.
80
        """
81
        if not target_id:
82
            raise RequiredArgument(
83
                function=self.clone_target.__name__, argument='target_id'
84
            )
85
86
        cmd = XmlCommand("create_target")
87
        cmd.add_element("copy", target_id)
88
        return self._send_xml_command(cmd)
89
90
    def create_target(
91
        self,
92
        name: str,
93
        *,
94
        asset_hosts_filter: Optional[str] = None,
95
        hosts: Optional[List[str]] = None,
96
        comment: Optional[str] = None,
97
        exclude_hosts: Optional[List[str]] = None,
98
        ssh_credential_id: Optional[str] = None,
99
        ssh_credential_port: Optional[int] = None,
100
        smb_credential_id: Optional[str] = None,
101
        esxi_credential_id: Optional[str] = None,
102
        snmp_credential_id: Optional[str] = None,
103
        alive_test: Optional[AliveTest] = None,
104
        reverse_lookup_only: Optional[bool] = None,
105
        reverse_lookup_unify: Optional[bool] = None,
106
        port_range: Optional[str] = None,
107
        port_list_id: Optional[str] = None,
108
    ) -> Any:
109
        """Create a new target
110
111
        Arguments:
112
            name: Name of the target
113
            asset_hosts_filter: Filter to select target host from assets hosts
114
            hosts: List of hosts addresses to scan
115
            exclude_hosts: List of hosts addresses to exclude from scan
116
            comment: Comment for the target
117
            ssh_credential_id: UUID of a ssh credential to use on target
118
            ssh_credential_port: The port to use for ssh credential
119
            smb_credential_id: UUID of a smb credential to use on target
120
            snmp_credential_id: UUID of a snmp credential to use on target
121
            esxi_credential_id: UUID of a esxi credential to use on target
122
            alive_test: Which alive test to use
123
            reverse_lookup_only: Whether to scan only hosts that have names
124
            reverse_lookup_unify: Whether to scan only one IP when multiple IPs
125
                have the same name.
126
            port_range: Port range for the target
127
            port_list_id: UUID of the port list to use on target
128
129
        Returns:
130
            The response. See :py:meth:`send_command` for details.
131
        """
132
133
        cmd = XmlCommand("create_target")
134
        _xmlname = cmd.add_element("name", name)
135
136
        if not name:
137
            raise RequiredArgument(
138
                function=self.create_target.__name__, argument='name'
139
            )
140
141
        if asset_hosts_filter:
142
            cmd.add_element(
143
                "asset_hosts", attrs={"filter": str(asset_hosts_filter)}
144
            )
145
        elif hosts:
146
            cmd.add_element("hosts", to_comma_list(hosts))
147
        else:
148
            raise RequiredArgument(
149
                function=self.create_target.__name__,
150
                argument='hosts or asset_hosts_filter',
151
            )
152
153
        if comment:
154
            cmd.add_element("comment", comment)
155
156
        if exclude_hosts:
157
            cmd.add_element("exclude_hosts", to_comma_list(exclude_hosts))
158
159
        if ssh_credential_id:
160
            _xmlssh = cmd.add_element(
161
                "ssh_credential", attrs={"id": ssh_credential_id}
162
            )
163
            if ssh_credential_port:
164
                _xmlssh.add_element("port", str(ssh_credential_port))
165
166
        if smb_credential_id:
167
            cmd.add_element("smb_credential", attrs={"id": smb_credential_id})
168
169
        if esxi_credential_id:
170
            cmd.add_element("esxi_credential", attrs={"id": esxi_credential_id})
171
172
        if snmp_credential_id:
173
            cmd.add_element("snmp_credential", attrs={"id": snmp_credential_id})
174
175
        if alive_test:
176
            if not isinstance(alive_test, AliveTest):
177
                raise InvalidArgumentType(
178
                    function=self.create_target.__name__,
179
                    argument='alive_test',
180
                    arg_type=AliveTest.__name__,
181
                )
182
183
            cmd.add_element("alive_tests", alive_test.value)
184
185
        if reverse_lookup_only is not None:
186
            cmd.add_element("reverse_lookup_only", to_bool(reverse_lookup_only))
187
188
        if reverse_lookup_unify is not None:
189
            cmd.add_element(
190
                "reverse_lookup_unify", to_bool(reverse_lookup_unify)
191
            )
192
193
        # since 20.08 one of port_range or port_list_id is required!
194
        if not port_range and not port_list_id:
195
            raise RequiredArgument(
196
                function=self.create_target.__name__,
197
                argument='port_range or port_list_id',
198
            )
199
200
        if port_range:
201
            cmd.add_element("port_range", port_range)
202
203
        if port_list_id:
204
            cmd.add_element("port_list", attrs={"id": port_list_id})
205
206
        return self._send_xml_command(cmd)
207
208
    def delete_target(
209
        self, target_id: str, *, ultimate: Optional[bool] = False
210
    ) -> Any:
211
        """Deletes an existing target
212
213
        Arguments:
214
            target_id: UUID of the target to be deleted.
215
            ultimate: Whether to remove entirely, or to the trashcan.
216
        """
217
        if not target_id:
218
            raise RequiredArgument(
219
                function=self.delete_target.__name__, argument='target_id'
220
            )
221
222
        cmd = XmlCommand("delete_target")
223
        cmd.set_attribute("target_id", target_id)
224
        cmd.set_attribute("ultimate", to_bool(ultimate))
225
226
        return self._send_xml_command(cmd)
227
228
    def get_target(
229
        self, target_id: str, *, tasks: Optional[bool] = None
230
    ) -> Any:
231
        """Request a single target
232
233
        Arguments:
234
            target_id: UUID of an existing target
235
            tasks: Whether to include list of tasks that use the target
236
237
        Returns:
238
            The response. See :py:meth:`send_command` for details.
239
        """
240
        cmd = XmlCommand("get_targets")
241
242
        if not target_id:
243
            raise RequiredArgument(
244
                function=self.get_target.__name__, argument='target_id'
245
            )
246
247
        cmd.set_attribute("target_id", target_id)
248
249
        if tasks is not None:
250
            cmd.set_attribute("tasks", to_bool(tasks))
251
252
        return self._send_xml_command(cmd)
253
254
    def get_targets(
255
        self,
256
        *,
257
        filter: Optional[str] = None,
258
        filter_id: Optional[str] = None,
259
        trash: Optional[bool] = None,
260
        tasks: Optional[bool] = None,
261
    ) -> Any:
262
        """Request a list of targets
263
264
        Arguments:
265
            filter: Filter term to use for the query
266
            filter_id: UUID of an existing filter to use for the query
267
            trash: Whether to get the trashcan targets instead
268
            tasks: Whether to include list of tasks that use the target
269
270
        Returns:
271
            The response. See :py:meth:`send_command` for details.
272
        """
273
        cmd = XmlCommand("get_targets")
274
275
        add_filter(cmd, filter, filter_id)
276
277
        if trash is not None:
278
            cmd.set_attribute("trash", to_bool(trash))
279
280
        if tasks is not None:
281
            cmd.set_attribute("tasks", to_bool(tasks))
282
283
        return self._send_xml_command(cmd)
284
285
    def modify_target(
286
        self,
287
        target_id: str,
288
        *,
289
        name: Optional[str] = None,
290
        comment: Optional[str] = None,
291
        hosts: Optional[List[str]] = None,
292
        exclude_hosts: Optional[List[str]] = None,
293
        ssh_credential_id: Optional[str] = None,
294
        ssh_credential_port: Optional[bool] = None,
295
        smb_credential_id: Optional[str] = None,
296
        esxi_credential_id: Optional[str] = None,
297
        snmp_credential_id: Optional[str] = None,
298
        alive_test: Optional[AliveTest] = None,
299
        reverse_lookup_only: Optional[bool] = None,
300
        reverse_lookup_unify: Optional[bool] = None,
301
        port_list_id: Optional[str] = None,
302
    ) -> Any:
303
        """Modifies an existing target.
304
305
        Arguments:
306
            target_id: ID of target to modify.
307
            comment: Comment on target.
308
            name: Name of target.
309
            hosts: List of target hosts.
310
            exclude_hosts: A list of hosts to exclude.
311
            ssh_credential_id: UUID of SSH credential to use on target.
312
            ssh_credential_port: The port to use for ssh credential
313
            smb_credential_id: UUID of SMB credential to use on target.
314
            esxi_credential_id: UUID of ESXi credential to use on target.
315
            snmp_credential_id: UUID of SNMP credential to use on target.
316
            port_list_id: UUID of port list describing ports to scan.
317
            alive_test: Which alive tests to use.
318
            reverse_lookup_only: Whether to scan only hosts that have names.
319
            reverse_lookup_unify: Whether to scan only one IP when multiple IPs
320
                have the same name.
321
322
        Returns:
323
            The response. See :py:meth:`send_command` for details.
324
        """
325
        if not target_id:
326
            raise RequiredArgument(
327
                function=self.modify_target.__name__, argument='target_id'
328
            )
329
330
        cmd = XmlCommand("modify_target")
331
        cmd.set_attribute("target_id", target_id)
332
333
        if comment:
334
            cmd.add_element("comment", comment)
335
336
        if name:
337
            cmd.add_element("name", name)
338
339
        if hosts:
340
            cmd.add_element("hosts", to_comma_list(hosts))
341
            if exclude_hosts is None:
342
                exclude_hosts = ['']
343
344
        if exclude_hosts:
345
            cmd.add_element("exclude_hosts", to_comma_list(exclude_hosts))
346
347
        if alive_test:
348
            if not isinstance(alive_test, AliveTest):
349
                raise InvalidArgumentType(
350
                    function=self.modify_target.__name__,
351
                    argument='alive_test',
352
                    arg_type=AliveTest.__name__,
353
                )
354
            cmd.add_element("alive_tests", alive_test.value)
355
356
        if ssh_credential_id:
357
            _xmlssh = cmd.add_element(
358
                "ssh_credential", attrs={"id": ssh_credential_id}
359
            )
360
361
            if ssh_credential_port:
362
                _xmlssh.add_element("port", str(ssh_credential_port))
363
364
        if smb_credential_id:
365
            cmd.add_element("smb_credential", attrs={"id": smb_credential_id})
366
367
        if esxi_credential_id:
368
            cmd.add_element("esxi_credential", attrs={"id": esxi_credential_id})
369
370
        if snmp_credential_id:
371
            cmd.add_element("snmp_credential", attrs={"id": snmp_credential_id})
372
373
        if reverse_lookup_only is not None:
374
            cmd.add_element("reverse_lookup_only", to_bool(reverse_lookup_only))
375
376
        if reverse_lookup_unify is not None:
377
            cmd.add_element(
378
                "reverse_lookup_unify", to_bool(reverse_lookup_unify)
379
            )
380
381
        if port_list_id:
382
            cmd.add_element("port_list", attrs={"id": port_list_id})
383
384
        return self._send_xml_command(cmd)
385