Completed
Push — master ( 802093...1e243f )
by Jaspar
29s queued 13s
created

gvm.protocols.gmpv208.entities.tasks   F

Complexity

Total Complexity 67

Size/Duplication

Total Lines 480
Duplicated Lines 57.29 %

Importance

Changes 0
Metric Value
eloc 252
dl 275
loc 480
rs 3.04
c 0
b 0
f 0
wmc 67

11 Methods

Rating  Name                                 Duplication  Size  Complexity
A       TasksMixin.get_tasks()                        37    37           4
F       TasksMixin.modify_task()                     126   126          23
A       TasksMixin.clone_task()                        0    17           2
A       TasksMixin.delete_task()                       0    19           2
F       TasksMixin.create_task()                     112   112          22
A       TasksMixin.start_task()                        0    18           2
A       TasksMixin.move_task()                         0    22           3
A       TasksMixin.create_container_task()             0    28           3
A       TasksMixin.stop_task()                         0    18           2
A       TasksMixin.get_task()                          0    21           2
A       TasksMixin.resume_task()                       0    18           2


Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
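
As a rough illustration of that restructuring, the sketch below pulls two checks that appear in both create_task() and modify_task() (the schedule_periods validation and the preferences element building) into shared helpers. The helper names validate_schedule_periods, add_preferences and build_command are hypothetical, xml.etree.ElementTree stands in for gvm.xml.XmlCommand, and the built-in ValueError/TypeError stand in for the gvm.errors classes, so that the snippet runs without python-gvm installed.

```python
from collections.abc import Mapping
from numbers import Integral
from typing import Optional
import xml.etree.ElementTree as ET


def validate_schedule_periods(schedule_periods: Optional[int]) -> Optional[str]:
    """Return the value as a string, or None if it was not given."""
    if schedule_periods is None:
        return None
    if not isinstance(schedule_periods, Integral) or schedule_periods < 0:
        # Stand-in for gvm.errors.InvalidArgument (assumption for this sketch)
        raise ValueError(
            "schedule_periods must be an integer greater than or equal to 0"
        )
    return str(schedule_periods)


def add_preferences(cmd: ET.Element, preferences: Optional[Mapping]) -> None:
    """Append a <preferences> element with one <preference> per entry."""
    if preferences is None:
        return
    if not isinstance(preferences, Mapping):
        # Stand-in for gvm.errors.InvalidArgumentType (assumption)
        raise TypeError("preferences must be a mapping")
    xml_prefs = ET.SubElement(cmd, "preferences")
    for pref_name, pref_value in preferences.items():
        xml_pref = ET.SubElement(xml_prefs, "preference")
        ET.SubElement(xml_pref, "scanner_name").text = pref_name
        ET.SubElement(xml_pref, "value").text = str(pref_value)


def build_command(name: str, preferences=None, schedule_periods=None) -> str:
    """Both a create and a modify command could share the helpers above."""
    cmd = ET.Element(name)
    periods = validate_schedule_periods(schedule_periods)
    if periods is not None:
        ET.SubElement(cmd, "schedule_periods").text = periods
    add_preferences(cmd, preferences)
    return ET.tostring(cmd, encoding="unicode")
```

With helpers like these, each of the two F-rated methods would call one function per shared block instead of repeating the validation inline.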

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like gvm.protocols.gmpv208.entities.tasks often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
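
A minimal sketch of Extract Class applied to this module might look as follows. It assumes that the optional settings shared by create_task() and modify_task() form the cohesive component; the TaskOptions class and the modify_task_xml function are hypothetical names, xml.etree.ElementTree stands in for gvm.xml.XmlCommand, and the "1"/"0" and comma-joined encodings are assumptions about what to_bool and to_comma_list produce.

```python
from dataclasses import dataclass
from typing import List, Optional
import xml.etree.ElementTree as ET


@dataclass
class TaskOptions:
    """Hypothetical extracted class: optional settings shared by task commands."""

    comment: Optional[str] = None
    alterable: Optional[bool] = None
    schedule_id: Optional[str] = None
    observers: Optional[List[str]] = None

    def apply(self, cmd: ET.Element) -> None:
        """Append one child element per option that was set."""
        if self.comment:
            ET.SubElement(cmd, "comment").text = self.comment
        if self.alterable is not None:
            # Assumed encoding of booleans as "1"/"0"
            ET.SubElement(cmd, "alterable").text = "1" if self.alterable else "0"
        if self.schedule_id:
            ET.SubElement(cmd, "schedule", {"id": self.schedule_id})
        if self.observers is not None:
            # Assumed comma-separated encoding of the observer list
            ET.SubElement(cmd, "observers").text = ",".join(self.observers)


def modify_task_xml(task_id: str, options: TaskOptions) -> str:
    """Build a modify_task command; the branching now lives in TaskOptions."""
    cmd = ET.Element("modify_task", {"task_id": task_id})
    options.apply(cmd)
    return ET.tostring(cmd, encoding="unicode")
```

The command-building function shrinks to a few lines, and the per-option branching that drives the complexity score is counted once, in the extracted class.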

# -*- coding: utf-8 -*-
# Copyright (C) 2021 Greenbone Networks GmbH
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

# pylint:  disable=redefined-builtin

from collections.abc import Mapping
from numbers import Integral
from typing import Any, List, Optional

from gvm.errors import InvalidArgument, InvalidArgumentType, RequiredArgument
from gvm.protocols.gmpv208.types import (
    HostsOrdering,
)  # if I use latest, I get circular import :/
from gvm.utils import add_filter, is_list_like, to_bool, to_comma_list
from gvm.xml import XmlCommand


class TasksMixin:
    def clone_task(self, task_id: str) -> Any:
        """Clone an existing task

        Arguments:
            task_id: UUID of existing task to clone from

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not task_id:
            raise RequiredArgument(
                function=self.clone_task.__name__, argument='task_id'
            )

        cmd = XmlCommand("create_task")
        cmd.add_element("copy", task_id)
        return self._send_xml_command(cmd)

    def create_container_task(
        self, name: str, *, comment: Optional[str] = None
    ) -> Any:
        """Create a new container task

        A container task is a "meta" task to import and view reports from other
        systems.

        Arguments:
            name: Name of the task
            comment: Comment for the task

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not name:
            raise RequiredArgument(
                function=self.create_container_task.__name__, argument='name'
            )

        cmd = XmlCommand("create_task")
        cmd.add_element("name", name)
        cmd.add_element("target", attrs={"id": "0"})

        if comment:
            cmd.add_element("comment", comment)

        return self._send_xml_command(cmd)

    def create_task(
        self,
        name: str,
        config_id: str,
        target_id: str,
        scanner_id: str,
        *,
        alterable: Optional[bool] = None,
        hosts_ordering: Optional[HostsOrdering] = None,
        schedule_id: Optional[str] = None,
        alert_ids: Optional[List[str]] = None,
        comment: Optional[str] = None,
        schedule_periods: Optional[int] = None,
        observers: Optional[List[str]] = None,
        preferences: Optional[dict] = None,
    ) -> Any:
        """Create a new scan task

        Arguments:
            name: Name of the new task
            config_id: UUID of scan config to use by the task
            target_id: UUID of target to be scanned
            scanner_id: UUID of scanner to use for scanning the target
            alterable: Whether the task should be alterable
            hosts_ordering: The order hosts are scanned in
            schedule_id: UUID of a schedule when the task should be run.
            alert_ids: List of UUIDs for alerts to be applied to the task
            comment: Comment for the task
            schedule_periods: A limit to the number of times the task will be
                scheduled, or 0 for no limit
            observers: List of names or ids of users which should be allowed to
                observe this task
            preferences: Name/Value pairs of scanner preferences.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not name:
            raise RequiredArgument(
                function=self.create_task.__name__, argument='name'
            )

        if not config_id:
            raise RequiredArgument(
                function=self.create_task.__name__, argument='config_id'
            )

        if not target_id:
            raise RequiredArgument(
                function=self.create_task.__name__, argument='target_id'
            )

        if not scanner_id:
            raise RequiredArgument(
                function=self.create_task.__name__, argument='scanner_id'
            )

        # don't allow creating a container task with create_task
        if target_id == '0':
            raise InvalidArgument(
                function=self.create_task.__name__, argument='target_id'
            )

        cmd = XmlCommand("create_task")
        cmd.add_element("name", name)
        cmd.add_element("usage_type", "scan")
        cmd.add_element("config", attrs={"id": config_id})
        cmd.add_element("target", attrs={"id": target_id})
        cmd.add_element("scanner", attrs={"id": scanner_id})

        if comment:
            cmd.add_element("comment", comment)

        if alterable is not None:
            cmd.add_element("alterable", to_bool(alterable))

        if hosts_ordering:
            if not isinstance(hosts_ordering, self.types.HostsOrdering):
                raise InvalidArgumentType(
                    function=self.create_task.__name__,
                    argument='hosts_ordering',
                    arg_type=HostsOrdering.__name__,
                )
            cmd.add_element("hosts_ordering", hosts_ordering.value)

        if alert_ids:
            if is_list_like(alert_ids):
                # add one alert element per given alert id
                for alert in alert_ids:
                    cmd.add_element("alert", attrs={"id": str(alert)})

        if schedule_id:
            cmd.add_element("schedule", attrs={"id": schedule_id})

            if schedule_periods is not None:
                if (
                    not isinstance(schedule_periods, Integral)
                    or schedule_periods < 0
                ):
                    raise InvalidArgument(
                        "schedule_periods must be an integer greater than or "
                        "equal to 0"
                    )
                cmd.add_element("schedule_periods", str(schedule_periods))

        if observers is not None:
            if not is_list_like(observers):
                raise InvalidArgumentType(
                    function=self.create_task.__name__,
                    argument='observers',
                    arg_type='list',
                )

            # gvmd splits by comma and space
            # gvmd tries to lookup each value as user name and afterwards as
            # user id. So both user name and user id are possible
            cmd.add_element("observers", to_comma_list(observers))

        if preferences is not None:
            if not isinstance(preferences, Mapping):
                raise InvalidArgumentType(
                    function=self.create_task.__name__,
                    argument='preferences',
                    arg_type=Mapping.__name__,
                )

            _xmlprefs = cmd.add_element("preferences")
            for pref_name, pref_value in preferences.items():
                _xmlpref = _xmlprefs.add_element("preference")
                _xmlpref.add_element("scanner_name", pref_name)
                _xmlpref.add_element("value", str(pref_value))

        return self._send_xml_command(cmd)

    def delete_task(
        self, task_id: str, *, ultimate: Optional[bool] = False
    ) -> Any:
        """Deletes an existing task

        Arguments:
            task_id: UUID of the task to be deleted.
            ultimate: Whether to remove entirely, or to the trashcan.
        """
        if not task_id:
            raise RequiredArgument(
                function=self.delete_task.__name__, argument='task_id'
            )

        cmd = XmlCommand("delete_task")
        cmd.set_attribute("task_id", task_id)
        cmd.set_attribute("ultimate", to_bool(ultimate))

        return self._send_xml_command(cmd)

    def get_tasks(
        self,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        details: Optional[bool] = None,
        schedules_only: Optional[bool] = None,
    ) -> Any:
        """Request a list of tasks

        Arguments:
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            trash: Whether to get the trashcan tasks instead
            details: Whether to include full task details
            schedules_only: Whether to only include id, name and schedule
                details

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        cmd = XmlCommand("get_tasks")
        cmd.set_attribute("usage_type", "scan")

        add_filter(cmd, filter, filter_id)

        if trash is not None:
            cmd.set_attribute("trash", to_bool(trash))

        if details is not None:
            cmd.set_attribute("details", to_bool(details))

        if schedules_only is not None:
            cmd.set_attribute("schedules_only", to_bool(schedules_only))

        return self._send_xml_command(cmd)

    def get_task(self, task_id: str) -> Any:
        """Request a single task

        Arguments:
            task_id: UUID of an existing task

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not task_id:
            raise RequiredArgument(
                function=self.get_task.__name__, argument='task_id'
            )

        cmd = XmlCommand("get_tasks")
        cmd.set_attribute("task_id", task_id)
        cmd.set_attribute("usage_type", "scan")

        # for single entity always request all details
        cmd.set_attribute("details", "1")
        return self._send_xml_command(cmd)

    def modify_task(
        self,
        task_id: str,
        *,
        name: Optional[str] = None,
        config_id: Optional[str] = None,
        target_id: Optional[str] = None,
        scanner_id: Optional[str] = None,
        alterable: Optional[bool] = None,
        hosts_ordering: Optional[HostsOrdering] = None,
        schedule_id: Optional[str] = None,
        schedule_periods: Optional[int] = None,
        comment: Optional[str] = None,
        alert_ids: Optional[List[str]] = None,
        observers: Optional[List[str]] = None,
        preferences: Optional[dict] = None,
    ) -> Any:
        """Modifies an existing task.

        Arguments:
            task_id: UUID of task to modify.
            name: The name of the task.
            config_id: UUID of scan config to use by the task
            target_id: UUID of target to be scanned
            scanner_id: UUID of scanner to use for scanning the target
            alterable: Whether the task should be alterable
            comment: The comment on the task.
            alert_ids: List of UUIDs for alerts to be applied to the task
            hosts_ordering: The order hosts are scanned in
            schedule_id: UUID of a schedule when the task should be run.
            schedule_periods: A limit to the number of times the task will be
                scheduled, or 0 for no limit.
            observers: List of names or ids of users which should be allowed to
                observe this task
            preferences: Name/Value pairs of scanner preferences.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not task_id:
            raise RequiredArgument(
                function=self.modify_task.__name__, argument='task_id'
            )

        cmd = XmlCommand("modify_task")
        cmd.set_attribute("task_id", task_id)

        if name:
            cmd.add_element("name", name)

        if comment:
            cmd.add_element("comment", comment)

        if config_id:
            cmd.add_element("config", attrs={"id": config_id})

        if target_id:
            cmd.add_element("target", attrs={"id": target_id})

        if alterable is not None:
            cmd.add_element("alterable", to_bool(alterable))

        if hosts_ordering:
            if not isinstance(hosts_ordering, HostsOrdering):
                raise InvalidArgumentType(
                    function=self.modify_task.__name__,
                    argument='hosts_ordering',
                    arg_type=HostsOrdering.__name__,
                )
            cmd.add_element("hosts_ordering", hosts_ordering.value)

        if scanner_id:
            cmd.add_element("scanner", attrs={"id": scanner_id})

        if schedule_id:
            cmd.add_element("schedule", attrs={"id": schedule_id})

        if schedule_periods is not None:
            if (
                not isinstance(schedule_periods, Integral)
                or schedule_periods < 0
            ):
                raise InvalidArgument(
                    "schedule_periods must be an integer greater than or "
                    "equal to 0"
                )
            cmd.add_element("schedule_periods", str(schedule_periods))

        if alert_ids is not None:
            if not is_list_like(alert_ids):
                raise InvalidArgumentType(
                    function=self.modify_task.__name__,
                    argument='alert_ids',
                    arg_type='list',
                )

            # an empty list sends a single alert element with id "0"
            if len(alert_ids) == 0:
                cmd.add_element("alert", attrs={"id": "0"})
            else:
                for alert in alert_ids:
                    cmd.add_element("alert", attrs={"id": str(alert)})

        if observers is not None:
            if not is_list_like(observers):
                raise InvalidArgumentType(
                    function=self.modify_task.__name__,
                    argument='observers',
                    arg_type='list',
                )

            cmd.add_element("observers", to_comma_list(observers))

        if preferences is not None:
            if not isinstance(preferences, Mapping):
                raise InvalidArgumentType(
                    function=self.modify_task.__name__,
                    argument='preferences',
                    arg_type=Mapping.__name__,
                )

            _xmlprefs = cmd.add_element("preferences")
            for pref_name, pref_value in preferences.items():
                _xmlpref = _xmlprefs.add_element("preference")
                _xmlpref.add_element("scanner_name", pref_name)
                _xmlpref.add_element("value", str(pref_value))

        return self._send_xml_command(cmd)

    def move_task(self, task_id: str, *, slave_id: Optional[str] = None) -> Any:
        """Move an existing task to another GMP slave scanner or the master

        Arguments:
            task_id: UUID of the task to be moved
            slave_id: UUID of slave to reassign the task to, empty for master.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not task_id:
            raise RequiredArgument(
                function=self.move_task.__name__, argument='task_id'
            )

        cmd = XmlCommand("move_task")
        cmd.set_attribute("task_id", task_id)

        if slave_id is not None:
            cmd.set_attribute("slave_id", slave_id)

        return self._send_xml_command(cmd)

    def start_task(self, task_id: str) -> Any:
        """Start an existing task

        Arguments:
            task_id: UUID of the task to be started

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not task_id:
            raise RequiredArgument(
                function=self.start_task.__name__, argument='task_id'
            )

        cmd = XmlCommand("start_task")
        cmd.set_attribute("task_id", task_id)

        return self._send_xml_command(cmd)

    def resume_task(self, task_id: str) -> Any:
        """Resume an existing stopped task

        Arguments:
            task_id: UUID of the task to be resumed

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not task_id:
            raise RequiredArgument(
                function=self.resume_task.__name__, argument='task_id'
            )

        cmd = XmlCommand("resume_task")
        cmd.set_attribute("task_id", task_id)

        return self._send_xml_command(cmd)

    def stop_task(self, task_id: str) -> Any:
        """Stop an existing running task

        Arguments:
            task_id: UUID of the task to be stopped

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not task_id:
            raise RequiredArgument(
                function=self.stop_task.__name__, argument='task_id'
            )

        cmd = XmlCommand("stop_task")
        cmd.set_attribute("task_id", task_id)

        return self._send_xml_command(cmd)