Passed
Push — master ( 8c9545...f42925 )
by Jaspar
01:33 queued 10s
created

gvm.protocols.gmpv214.gmpv214   D

Complexity

Total Complexity 58

Size/Duplication

Total Lines 484
Duplicated Lines 85.95 %

Importance

Changes 0
Metric   Value
eloc     249
dl       416
loc      484
rs       4.5599
c        0
b        0
f        0
wmc      58

6 Methods

Rating   Name                             Duplication   Size   Complexity
A        GmpV214Mixin.__init__()          0             10     1
D        GmpV214Mixin.create_override()   89            89     12
C        GmpV214Mixin.create_note()       73            73     10
D        GmpV214Mixin.modify_user()       93            93     13
C        GmpV214Mixin.modify_note()       72            72     10
D        GmpV214Mixin.modify_override()   89            89     12

Duplicated Code

Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:
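One case visible in this report: create_note, create_override, modify_note and modify_override all append the same optional elements (active, hosts, port, result, severity, task). The following is a minimal sketch of moving that shared part into one helper. The names FakeCommand and add_common_elements are hypothetical and not part of python-gvm; FakeCommand is only a stand-in for gvm.xml.XmlCommand so the example runs on its own, and the host list is joined with a plain ",".join rather than the library's to_comma_list.

```python
from typing import List, Optional
from xml.etree import ElementTree as ET


class FakeCommand:
    """Stand-in for gvm.xml.XmlCommand, used only in this sketch."""

    def __init__(self, name: str):
        self.root = ET.Element(name)

    def add_element(self, name, text=None, *, attrs=None):
        element = ET.SubElement(self.root, name, attrib=attrs or {})
        element.text = text
        return element

    def to_string(self) -> str:
        return ET.tostring(self.root, encoding="unicode")


def add_common_elements(
    cmd,
    *,
    days_active: Optional[int] = None,
    hosts: Optional[List[str]] = None,
    port: Optional[int] = None,
    result_id: Optional[str] = None,
    severity: Optional[float] = None,
    task_id: Optional[str] = None,
) -> None:
    """Hypothetical helper: append the optional elements shared by the
    note and override commands, using the same conditions as the listing."""
    if days_active is not None:
        cmd.add_element("active", str(days_active))
    if hosts:
        cmd.add_element("hosts", ",".join(hosts))
    if port:
        cmd.add_element("port", str(port))
    if result_id:
        cmd.add_element("result", attrs={"id": result_id})
    if severity:
        cmd.add_element("severity", str(severity))
    if task_id:
        cmd.add_element("task", attrs={"id": task_id})


# Each public method would then only build its command-specific part.
cmd = FakeCommand("create_note")
cmd.add_element("text", "false positive")
add_common_elements(cmd, hosts=["10.0.0.1", "10.0.0.2"], port=443)
print(cmd.to_string())
```

With a helper of this shape, each of the four methods would keep only its required-argument checks and its command-specific elements, which is where most of the reported duplication sits.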

Complexity

 Tip: Before tackling complexity, eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like gvm.protocols.gmpv214.gmpv214 often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields or methods that share the same prefix or suffix.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
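As an illustration of the suffix heuristic, the methods in this report fall into _note, _override and _user groups. The sketch below shows how such groups could be split into separate mixins and recombined. Every class name here (RecordingProtocol, NotesMixin, OverridesMixin, Protocol) is hypothetical, and the methods are reduced to stubs that record the command instead of talking to gvmd.

```python
class RecordingProtocol:
    """Hypothetical base: records commands instead of sending them."""

    def __init__(self):
        self.sent = []

    def _send(self, name, **fields):
        self.sent.append((name, fields))
        return name


class NotesMixin:
    """Extracted component: everything that ends in _note."""

    def create_note(self, text, nvt_oid):
        return self._send("create_note", text=text, nvt_oid=nvt_oid)

    def modify_note(self, note_id, text):
        return self._send("modify_note", note_id=note_id, text=text)


class OverridesMixin:
    """Extracted component: everything that ends in _override."""

    def create_override(self, text, nvt_oid):
        return self._send("create_override", text=text, nvt_oid=nvt_oid)

    def modify_override(self, override_id, text):
        return self._send("modify_override", override_id=override_id, text=text)


class Protocol(NotesMixin, OverridesMixin, RecordingProtocol):
    """The public class keeps its interface but delegates to the mixins."""


# Finding a cohesive group by shared suffix.
note_methods = [name for name in dir(Protocol) if name.endswith("_note")]
print(note_methods)

protocol = Protocol()
protocol.create_note("false positive", "1.3.6.1.4.1.25623.1.0.1")
print(protocol.sent)
```

Callers still see a single class with the same method names, while each mixin stays small enough to be rated on its own.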

# -*- coding: utf-8 -*-
# Copyright (C) 2020-2021 Greenbone Networks GmbH
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

# pylint: disable=arguments-differ, redefined-builtin, too-many-lines

"""
Module for communication with gvmd in
`Greenbone Management Protocol version 21.04`_

.. _Greenbone Management Protocol version 21.04:
    https://docs.greenbone.net/API/GMP/gmp-21.04.html
"""

from typing import Any, List, Optional, Callable
import numbers

from gvm.utils import deprecation, to_comma_list, to_bool
from gvm.xml import XmlCommand

from gvm.connections import GvmConnection
from gvm.errors import RequiredArgument

from gvm.protocols.base import GvmProtocol

from . import types
from .types import *  # pylint: disable=unused-wildcard-import, wildcard-import

_EMPTY_POLICY_ID = '085569ce-73ed-11df-83c3-002264764cea'

PROTOCOL_VERSION = (21, 4)

Severity = numbers.Real


class GmpV214Mixin(GvmProtocol):
    # Scrutinizer (Comprehensibility, Best Practice): the variable types
    # does not seem to be defined.
    types = types

    def __init__(
        self,
        connection: GvmConnection,
        *,
        transform: Optional[Callable[[str], Any]] = None,
    ):
        super().__init__(connection, transform=transform)

        # Is authenticated on gvmd
        self._authenticated = False

    # Scrutinizer (Duplication): this code seems to be duplicated in your
    # project.
    def create_note(
        self,
        text: str,
        nvt_oid: str,
        *,
        days_active: Optional[int] = None,
        hosts: Optional[List[str]] = None,
        port: Optional[int] = None,
        result_id: Optional[str] = None,
        severity: Optional[Severity] = None,
        task_id: Optional[str] = None,
        threat: Optional[SeverityLevel] = None,
    ) -> Any:
        """Create a new note

        Arguments:
            text: Text of the new note
            nvt_oid: OID of the nvt to which note applies
            days_active: Days note will be active. -1 on
                always, 0 off
            hosts: A list of host addresses
            port: Port to which the note applies
            result_id: UUID of a result to which note applies
            severity: Severity to which note applies
            task_id: UUID of task to which note applies
            threat: Deprecated. The threat parameter has been removed in
                this GMP version; passing it only triggers a deprecation
                warning. Use severity instead.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not text:
            raise RequiredArgument(
                function=self.create_note.__name__, argument='text'
            )

        if not nvt_oid:
            raise RequiredArgument(
                function=self.create_note.__name__, argument='nvt_oid'
            )

        cmd = XmlCommand("create_note")
        cmd.add_element("text", text)
        cmd.add_element("nvt", attrs={"oid": nvt_oid})

        if days_active is not None:
            cmd.add_element("active", str(days_active))

        if hosts:
            cmd.add_element("hosts", to_comma_list(hosts))

        if port:
            cmd.add_element("port", str(port))

        if result_id:
            cmd.add_element("result", attrs={"id": result_id})

        if severity:
            cmd.add_element("severity", str(severity))

        if task_id:
            cmd.add_element("task", attrs={"id": task_id})

        # threat is accepted for backwards compatibility but never sent
        if threat is not None:
            deprecation(
                "The threat parameter has been removed in GMP"
                " version {}{}".format(
                    self.get_protocol_version()[0],
                    self.get_protocol_version()[1],
                )
            )

        return self._send_xml_command(cmd)

    # Scrutinizer (Duplication): this code seems to be duplicated in your
    # project.
    def create_override(
        self,
        text: str,
        nvt_oid: str,
        *,
        days_active: Optional[int] = None,
        hosts: Optional[List[str]] = None,
        port: Optional[int] = None,
        result_id: Optional[str] = None,
        severity: Optional[Severity] = None,
        new_severity: Optional[Severity] = None,
        task_id: Optional[str] = None,
        threat: Optional[SeverityLevel] = None,
        new_threat: Optional[SeverityLevel] = None,
    ) -> Any:
        """Create a new override

        Arguments:
            text: Text of the new override
            nvt_oid: OID of the nvt to which override applies
            days_active: Days override will be active. -1 on always, 0 off
            hosts: A list of host addresses
            port: Port to which the override applies
            result_id: UUID of a result to which override applies
            severity: Severity to which override applies
            new_severity: New severity for result
            task_id: UUID of task to which override applies
            threat: Deprecated. The threat parameter has been removed in
                this GMP version; passing it only triggers a deprecation
                warning. Use severity instead.
            new_threat: Deprecated. The new_threat parameter has been
                removed in this GMP version; passing it only triggers a
                deprecation warning. Use new_severity instead.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not text:
            raise RequiredArgument(
                function=self.create_override.__name__, argument='text'
            )

        if not nvt_oid:
            raise RequiredArgument(
                function=self.create_override.__name__, argument='nvt_oid'
            )

        cmd = XmlCommand("create_override")
        cmd.add_element("text", text)
        cmd.add_element("nvt", attrs={"oid": nvt_oid})

        if days_active is not None:
            cmd.add_element("active", str(days_active))

        if hosts:
            cmd.add_element("hosts", to_comma_list(hosts))

        if port:
            cmd.add_element("port", str(port))

        if result_id:
            cmd.add_element("result", attrs={"id": result_id})

        if severity:
            cmd.add_element("severity", str(severity))

        if new_severity:
            cmd.add_element("new_severity", str(new_severity))

        if task_id:
            cmd.add_element("task", attrs={"id": task_id})

        # threat and new_threat are accepted for backwards compatibility
        # but never sent
        if threat is not None:
            deprecation(
                "The threat parameter has been removed in GMP"
                " version {}{}".format(
                    self.get_protocol_version()[0],
                    self.get_protocol_version()[1],
                )
            )

        if new_threat is not None:
            deprecation(
                "The new_threat parameter has been removed in GMP"
                " version {}{}".format(
                    self.get_protocol_version()[0],
                    self.get_protocol_version()[1],
                )
            )

        return self._send_xml_command(cmd)

    # Scrutinizer (Duplication): this code seems to be duplicated in your
    # project.
    def modify_note(
        self,
        note_id: str,
        text: str,
        *,
        days_active: Optional[int] = None,
        hosts: Optional[List[str]] = None,
        port: Optional[int] = None,
        result_id: Optional[str] = None,
        severity: Optional[Severity] = None,
        task_id: Optional[str] = None,
        threat: Optional[SeverityLevel] = None,
    ) -> Any:
        """Modifies an existing note.

        Arguments:
            note_id: UUID of note to modify.
            text: The text of the note.
            days_active: Days note will be active. -1 on always, 0 off.
            hosts: A list of host addresses
            port: Port to which note applies.
            result_id: Result to which note applies.
            severity: Severity to which note applies.
            task_id: Task to which note applies.
            threat: Deprecated. The threat parameter has been removed in
                this GMP version; passing it only triggers a deprecation
                warning. Use severity instead.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not note_id:
            raise RequiredArgument(
                function=self.modify_note.__name__, argument='note_id'
            )

        if not text:
            raise RequiredArgument(
                function=self.modify_note.__name__, argument='text'
            )

        cmd = XmlCommand("modify_note")
        cmd.set_attribute("note_id", note_id)
        cmd.add_element("text", text)

        if days_active is not None:
            cmd.add_element("active", str(days_active))

        if hosts:
            cmd.add_element("hosts", to_comma_list(hosts))

        if port:
            cmd.add_element("port", str(port))

        if result_id:
            cmd.add_element("result", attrs={"id": result_id})

        if severity:
            cmd.add_element("severity", str(severity))

        if task_id:
            cmd.add_element("task", attrs={"id": task_id})

        # threat is accepted for backwards compatibility but never sent
        if threat is not None:
            deprecation(
                "The threat parameter has been removed in GMP"
                " version {}{}".format(
                    self.get_protocol_version()[0],
                    self.get_protocol_version()[1],
                )
            )

        return self._send_xml_command(cmd)

    # Scrutinizer (Duplication): this code seems to be duplicated in your
    # project.
    def modify_override(
        self,
        override_id: str,
        text: str,
        *,
        days_active: Optional[int] = None,
        hosts: Optional[List[str]] = None,
        port: Optional[int] = None,
        result_id: Optional[str] = None,
        severity: Optional[Severity] = None,
        new_severity: Optional[Severity] = None,
        task_id: Optional[str] = None,
        threat: Optional[SeverityLevel] = None,
        new_threat: Optional[SeverityLevel] = None,
    ) -> Any:
        """Modifies an existing override.

        Arguments:
            override_id: UUID of override to modify.
            text: The text of the override.
            days_active: Days override will be active. -1 on always,
                0 off.
            hosts: A list of host addresses
            port: Port to which override applies.
            result_id: Result to which override applies.
            severity: Severity to which override applies.
            new_severity: New severity score for result.
            task_id: Task to which override applies.
            threat: Deprecated. The threat parameter has been removed in
                this GMP version; passing it only triggers a deprecation
                warning. Use severity instead.
            new_threat: Deprecated. The new_threat parameter has been
                removed in this GMP version; passing it only triggers a
                deprecation warning. Use new_severity instead.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not override_id:
            raise RequiredArgument(
                function=self.modify_override.__name__, argument='override_id'
            )

        if not text:
            raise RequiredArgument(
                function=self.modify_override.__name__, argument='text'
            )

        cmd = XmlCommand("modify_override")
        cmd.set_attribute("override_id", override_id)
        cmd.add_element("text", text)

        if days_active is not None:
            cmd.add_element("active", str(days_active))

        if hosts:
            cmd.add_element("hosts", to_comma_list(hosts))

        if port:
            cmd.add_element("port", str(port))

        if result_id:
            cmd.add_element("result", attrs={"id": result_id})

        if severity:
            cmd.add_element("severity", str(severity))

        if new_severity:
            cmd.add_element("new_severity", str(new_severity))

        if task_id:
            cmd.add_element("task", attrs={"id": task_id})

        # threat and new_threat are accepted for backwards compatibility
        # but never sent
        if threat is not None:
            deprecation(
                "The threat parameter has been removed in GMP"
                " version {}{}".format(
                    self.get_protocol_version()[0],
                    self.get_protocol_version()[1],
                )
            )

        if new_threat is not None:
            deprecation(
                "The new_threat parameter has been removed in GMP"
                " version {}{}".format(
                    self.get_protocol_version()[0],
                    self.get_protocol_version()[1],
                )
            )

        return self._send_xml_command(cmd)

    # Scrutinizer (Duplication): this code seems to be duplicated in your
    # project.
    def modify_user(
        self,
        user_id: str = None,
        *,
        name: Optional[str] = None,
        comment: Optional[str] = None,
        password: Optional[str] = None,
        auth_source: Optional[UserAuthType] = None,
        role_ids: Optional[List[str]] = None,
        hosts: Optional[List[str]] = None,
        hosts_allow: Optional[bool] = False,
        ifaces: Optional[List[str]] = None,
        ifaces_allow: Optional[bool] = False,
        group_ids: Optional[List[str]] = None,
    ) -> Any:
        """Modifies an existing user. Most of the fields need to be supplied
        for changing a single field even if no change is wanted for those.
        Else empty values are inserted for the missing fields instead.

        Arguments:
            user_id: UUID of the user to be modified.
            name: The new name for the user.
            comment: Comment on the user.
            password: The password for the user.
            auth_source: Source allowed for authentication for this user.
            role_ids: List of role UUIDs for the user.
            hosts: User access rules: List of hosts.
            hosts_allow: Defines how the hosts list is to be interpreted.
                If False (default) the list is treated as a deny list.
                All hosts are allowed by default except those provided by
                the hosts parameter. If True the list is treated as an
                allow list. All hosts are denied by default except those
                provided by the hosts parameter.
            ifaces: User access rules: List of ifaces.
            ifaces_allow: Defines how the ifaces list is to be interpreted.
                If False (default) the list is treated as a deny list.
                All ifaces are allowed by default except those provided by
                the ifaces parameter. If True the list is treated as an
                allow list. All ifaces are denied by default except those
                provided by the ifaces parameter.
            group_ids: List of group UUIDs for the user.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not user_id:
            raise RequiredArgument(
                function=self.modify_user.__name__, argument='user_id'
            )

        cmd = XmlCommand("modify_user")

        if user_id:
            cmd.set_attribute("user_id", user_id)

        if name:
            cmd.add_element("new_name", name)

        if role_ids:
            for role in role_ids:
                cmd.add_element("role", attrs={"id": role})

        if hosts:
            cmd.add_element(
                "hosts",
                to_comma_list(hosts),
                attrs={"allow": to_bool(hosts_allow)},
            )

        if ifaces:
            cmd.add_element(
                "ifaces",
                to_comma_list(ifaces),
                attrs={"allow": to_bool(ifaces_allow)},
            )

        if comment:
            cmd.add_element("comment", comment)

        if password:
            cmd.add_element("password", password)

        if auth_source:
            _xmlauthsrc = cmd.add_element("sources")
            _xmlauthsrc.add_element("source", auth_source.value)

        if group_ids:
            _xmlgroups = cmd.add_element("groups")
            for group_id in group_ids:
                _xmlgroups.add_element("group", attrs={"id": group_id})

        return self._send_xml_command(cmd)
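The removal warning for threat and new_threat is written out six times in the listing. A small sketch of how it could be factored into one function follows. The name warn_removed_parameter is hypothetical, and the sketch assumes that gvm.utils.deprecation ultimately raises a DeprecationWarning through the standard warnings module; it calls warnings.warn directly so that it runs without python-gvm installed.

```python
import warnings
from typing import Tuple


def warn_removed_parameter(name: str, version: Tuple[int, int]) -> None:
    """Hypothetical helper: emit the removal warning for one parameter.

    The format string is copied unchanged from the listing.
    """
    warnings.warn(
        "The {} parameter has been removed in GMP version {}{}".format(
            name, version[0], version[1]
        ),
        DeprecationWarning,
        stacklevel=2,
    )


# Capture the warning so the resulting message can be inspected.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    warn_removed_parameter("threat", (21, 4))

message = str(caught[0].message)
print(message)
```

Because the original format string is "{}{}", the version tuple (21, 4) is rendered as "214" in the message. Whether that or "21.4" is intended is not stated in the source.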