TasksMixin.get_tasks()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 37
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 18
nop 7
dl 0
loc 37
rs 9.5
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
20
from collections.abc import Mapping
21
from numbers import Integral
22
from typing import Any, List, Optional
23
24
from gvm.errors import InvalidArgument, InvalidArgumentType, RequiredArgument
25
from gvm.protocols.gmpv208.entities.hosts import HostsOrdering
26
from gvm.utils import add_filter, is_list_like, to_bool, to_comma_list
27
from gvm.xml import XmlCommand
28
29
30
class TasksMixin:
31
    def clone_task(self, task_id: str) -> Any:
32
        """Clone an existing task
33
34
        Arguments:
35
            task_id: UUID of existing task to clone from
36
37
        Returns:
38
            The response. See :py:meth:`send_command` for details.
39
        """
40
        if not task_id:
41
            raise RequiredArgument(
42
                function=self.clone_task.__name__, argument='task_id'
43
            )
44
45
        cmd = XmlCommand("create_task")
46
        cmd.add_element("copy", task_id)
47
        return self._send_xml_command(cmd)
48
49
    def create_container_task(
50
        self, name: str, *, comment: Optional[str] = None
51
    ) -> Any:
52
        """Create a new container task
53
54
        A container task is a "meta" task to import and view reports from other
55
        systems.
56
57
        Arguments:
58
            name: Name of the task
59
            comment: Comment for the task
60
61
        Returns:
62
            The response. See :py:meth:`send_command` for details.
63
        """
64
        if not name:
65
            raise RequiredArgument(
66
                function=self.create_container_task.__name__, argument='name'
67
            )
68
69
        cmd = XmlCommand("create_task")
70
        cmd.add_element("name", name)
71
        cmd.add_element("target", attrs={"id": "0"})
72
73
        if comment:
74
            cmd.add_element("comment", comment)
75
76
        return self._send_xml_command(cmd)
77
78 View Code Duplication
    def create_task(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
79
        self,
80
        name: str,
81
        config_id: str,
82
        target_id: str,
83
        scanner_id: str,
84
        *,
85
        alterable: Optional[bool] = None,
86
        hosts_ordering: Optional[HostsOrdering] = None,
87
        schedule_id: Optional[str] = None,
88
        alert_ids: Optional[List[str]] = None,
89
        comment: Optional[str] = None,
90
        schedule_periods: Optional[int] = None,
91
        observers: Optional[List[str]] = None,
92
        preferences: Optional[dict] = None,
93
    ) -> Any:
94
        if not name:
95
            raise RequiredArgument(
96
                function=self.create_task.__name__, argument='name'
97
            )
98
99
        if not config_id:
100
            raise RequiredArgument(
101
                function=self.create_task.__name__, argument='config_id'
102
            )
103
104
        if not target_id:
105
            raise RequiredArgument(
106
                function=self.create_task.__name__, argument='target_id'
107
            )
108
109
        if not scanner_id:
110
            raise RequiredArgument(
111
                function=self.create_task.__name__, argument='scanner_id'
112
            )
113
114
        # don't allow to create a container task with create_task
115
        if target_id == '0':
116
            raise InvalidArgument(
117
                function=self.create_task.__name__, argument='target_id'
118
            )
119
120
        cmd = XmlCommand("create_task")
121
        cmd.add_element("name", name)
122
        cmd.add_element("usage_type", "scan")
123
        cmd.add_element("config", attrs={"id": config_id})
124
        cmd.add_element("target", attrs={"id": target_id})
125
        cmd.add_element("scanner", attrs={"id": scanner_id})
126
127
        if comment:
128
            cmd.add_element("comment", comment)
129
130
        if alterable is not None:
131
            cmd.add_element("alterable", to_bool(alterable))
132
133
        if hosts_ordering:
134
            if not isinstance(hosts_ordering, HostsOrdering):
135
                raise InvalidArgumentType(
136
                    function=self.create_task.__name__,
137
                    argument='hosts_ordering',
138
                    arg_type=HostsOrdering.__name__,
139
                )
140
            cmd.add_element("hosts_ordering", hosts_ordering.value)
141
142
        if alert_ids is not None:
143
            if not is_list_like(alert_ids):
144
                raise InvalidArgumentType(
145
                    function=self.modify_task.__name__,
146
                    argument='alert_ids',
147
                    arg_type='list',
148
                )
149
150
            if not len(alert_ids) == 0:
151
                for alert in alert_ids:
152
                    cmd.add_element("alert", attrs={"id": str(alert)})
153
154
        if schedule_id:
155
            cmd.add_element("schedule", attrs={"id": schedule_id})
156
157
            if schedule_periods is not None:
158
                if (
159
                    not isinstance(schedule_periods, Integral)
160
                    or schedule_periods < 0
161
                ):
162
                    raise InvalidArgument(
163
                        "schedule_periods must be an integer greater or equal "
164
                        "than 0"
165
                    )
166
                cmd.add_element("schedule_periods", str(schedule_periods))
167
168
        if observers is not None:
169
            if not is_list_like(observers):
170
                raise InvalidArgumentType(
171
                    function=self.create_task.__name__,
172
                    argument='observers',
173
                    arg_type='list',
174
                )
175
176
            # gvmd splits by comma and space
177
            # gvmd tries to lookup each value as user name and afterwards as
178
            # user id. So both user name and user id are possible
179
            cmd.add_element("observers", to_comma_list(observers))
180
181
        if preferences is not None:
182
            if not isinstance(preferences, Mapping):
183
                raise InvalidArgumentType(
184
                    function=self.create_task.__name__,
185
                    argument='preferences',
186
                    arg_type=Mapping.__name__,
187
                )
188
189
            _xmlprefs = cmd.add_element("preferences")
190
            for pref_name, pref_value in preferences.items():
191
                _xmlpref = _xmlprefs.add_element("preference")
192
                _xmlpref.add_element("scanner_name", pref_name)
193
                _xmlpref.add_element("value", str(pref_value))
194
195
        return self._send_xml_command(cmd)
196
197
    def delete_task(
198
        self, task_id: str, *, ultimate: Optional[bool] = False
199
    ) -> Any:
200
        """Deletes an existing task
201
202
        Arguments:
203
            task_id: UUID of the task to be deleted.
204
            ultimate: Whether to remove entirely, or to the trashcan.
205
        """
206
        if not task_id:
207
            raise RequiredArgument(
208
                function=self.delete_task.__name__, argument='task_id'
209
            )
210
211
        cmd = XmlCommand("delete_task")
212
        cmd.set_attribute("task_id", task_id)
213
        cmd.set_attribute("ultimate", to_bool(ultimate))
214
215
        return self._send_xml_command(cmd)
216
217
    def get_tasks(
218
        self,
219
        *,
220
        filter_string: Optional[str] = None,
221
        filter_id: Optional[str] = None,
222
        trash: Optional[bool] = None,
223
        details: Optional[bool] = None,
224
        schedules_only: Optional[bool] = None,
225
    ) -> Any:
226
        """Request a list of tasks
227
228
        Arguments:
229
            filter_string: Filter term to use for the query
230
            filter_id: UUID of an existing filter to use for the query
231
            trash: Whether to get the trashcan tasks instead
232
            details: Whether to include full task details
233
            schedules_only: Whether to only include id, name and schedule
234
                details
235
236
        Returns:
237
            The response. See :py:meth:`send_command` for details.
238
        """
239
        cmd = XmlCommand("get_tasks")
240
        cmd.set_attribute("usage_type", "scan")
241
242
        add_filter(cmd, filter_string, filter_id)
243
244
        if trash is not None:
245
            cmd.set_attribute("trash", to_bool(trash))
246
247
        if details is not None:
248
            cmd.set_attribute("details", to_bool(details))
249
250
        if schedules_only is not None:
251
            cmd.set_attribute("schedules_only", to_bool(schedules_only))
252
253
        return self._send_xml_command(cmd)
254
255
    def get_task(self, task_id: str) -> Any:
256
        """Request a single task
257
258
        Arguments:
259
            task_id: UUID of an existing task
260
261
        Returns:
262
            The response. See :py:meth:`send_command` for details.
263
        """
264
        if not task_id:
265
            raise RequiredArgument(
266
                function=self.get_task.__name__, argument='task_id'
267
            )
268
269
        cmd = XmlCommand("get_tasks")
270
        cmd.set_attribute("task_id", task_id)
271
        cmd.set_attribute("usage_type", "scan")
272
273
        # for single entity always request all details
274
        cmd.set_attribute("details", "1")
275
        return self._send_xml_command(cmd)
276
277 View Code Duplication
    def modify_task(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
278
        self,
279
        task_id: str,
280
        *,
281
        name: Optional[str] = None,
282
        config_id: Optional[str] = None,
283
        target_id: Optional[str] = None,
284
        scanner_id: Optional[str] = None,
285
        alterable: Optional[bool] = None,
286
        hosts_ordering: Optional[HostsOrdering] = None,
287
        schedule_id: Optional[str] = None,
288
        schedule_periods: Optional[int] = None,
289
        comment: Optional[str] = None,
290
        alert_ids: Optional[List[str]] = None,
291
        observers: Optional[List[str]] = None,
292
        preferences: Optional[dict] = None,
293
    ) -> Any:
294
        """Modifies an existing task.
295
296
        Arguments:
297
            task_id: UUID of task to modify.
298
            name: The name of the task.
299
            config_id: UUID of scan config to use by the task
300
            target_id: UUID of target to be scanned
301
            scanner_id: UUID of scanner to use for scanning the target
302
            comment: The comment on the task.
303
            alert_ids: List of UUIDs for alerts to be applied to the task
304
            hosts_ordering: The order hosts are scanned in
305
            schedule_id: UUID of a schedule when the task should be run.
306
            schedule_periods: A limit to the number of times the task will be
307
                scheduled, or 0 for no limit.
308
            observers: List of names or ids of users which should be allowed to
309
                observe this task
310
            preferences: Name/Value pairs of scanner preferences.
311
312
        Returns:
313
            The response. See :py:meth:`send_command` for details.
314
        """
315
        if not task_id:
316
            raise RequiredArgument(
317
                function=self.modify_task.__name__, argument='task_id argument'
318
            )
319
320
        cmd = XmlCommand("modify_task")
321
        cmd.set_attribute("task_id", task_id)
322
323
        if name:
324
            cmd.add_element("name", name)
325
326
        if comment:
327
            cmd.add_element("comment", comment)
328
329
        if config_id:
330
            cmd.add_element("config", attrs={"id": config_id})
331
332
        if target_id:
333
            cmd.add_element("target", attrs={"id": target_id})
334
335
        if alterable is not None:
336
            cmd.add_element("alterable", to_bool(alterable))
337
338
        if hosts_ordering:
339
            if not isinstance(hosts_ordering, HostsOrdering):
340
                raise InvalidArgumentType(
341
                    function=self.modify_task.__name__,
342
                    argument='hosts_ordering',
343
                    arg_type=HostsOrdering.__name__,
344
                )
345
            cmd.add_element("hosts_ordering", hosts_ordering.value)
346
347
        if scanner_id:
348
            cmd.add_element("scanner", attrs={"id": scanner_id})
349
350
        if schedule_id:
351
            cmd.add_element("schedule", attrs={"id": schedule_id})
352
353
        if schedule_periods is not None:
354
            if (
355
                not isinstance(schedule_periods, Integral)
356
                or schedule_periods < 0
357
            ):
358
                raise InvalidArgument(
359
                    "schedule_periods must be an integer greater or equal "
360
                    "than 0"
361
                )
362
            cmd.add_element("schedule_periods", str(schedule_periods))
363
364
        if alert_ids is not None:
365
            if not is_list_like(alert_ids):
366
                raise InvalidArgumentType(
367
                    function=self.modify_task.__name__,
368
                    argument='alert_ids',
369
                    arg_type='list',
370
                )
371
372
            if len(alert_ids) == 0:
373
                cmd.add_element("alert", attrs={"id": "0"})
374
            else:
375
                for alert in alert_ids:
376
                    cmd.add_element("alert", attrs={"id": str(alert)})
377
378
        if observers is not None:
379
            if not is_list_like(observers):
380
                raise InvalidArgumentType(
381
                    function=self.modify_task.__name__,
382
                    argument='observers',
383
                    arg_type='list',
384
                )
385
386
            cmd.add_element("observers", to_comma_list(observers))
387
388
        if preferences is not None:
389
            if not isinstance(preferences, Mapping):
390
                raise InvalidArgumentType(
391
                    function=self.modify_task.__name__,
392
                    argument='preferences',
393
                    arg_type=Mapping.__name__,
394
                )
395
396
            _xmlprefs = cmd.add_element("preferences")
397
            for pref_name, pref_value in preferences.items():
398
                _xmlpref = _xmlprefs.add_element("preference")
399
                _xmlpref.add_element("scanner_name", pref_name)
400
                _xmlpref.add_element("value", str(pref_value))
401
402
        return self._send_xml_command(cmd)
403
404
    def move_task(self, task_id: str, *, slave_id: Optional[str] = None) -> Any:
405
        """Move an existing task to another GMP slave scanner or the master
406
407
        Arguments:
408
            task_id: UUID of the task to be moved
409
            slave_id: UUID of slave to reassign the task to, empty for master.
410
411
        Returns:
412
            The response. See :py:meth:`send_command` for details.
413
        """
414
        if not task_id:
415
            raise RequiredArgument(
416
                function=self.move_task.__name__, argument='task_id'
417
            )
418
419
        cmd = XmlCommand("move_task")
420
        cmd.set_attribute("task_id", task_id)
421
422
        if slave_id is not None:
423
            cmd.set_attribute("slave_id", slave_id)
424
425
        return self._send_xml_command(cmd)
426
427
    def start_task(self, task_id: str) -> Any:
428
        """Start an existing task
429
430
        Arguments:
431
            task_id: UUID of the task to be started
432
433
        Returns:
434
            The response. See :py:meth:`send_command` for details.
435
        """
436
        if not task_id:
437
            raise RequiredArgument(
438
                function=self.start_task.__name__, argument='task_id'
439
            )
440
441
        cmd = XmlCommand("start_task")
442
        cmd.set_attribute("task_id", task_id)
443
444
        return self._send_xml_command(cmd)
445
446
    def resume_task(self, task_id: str) -> Any:
447
        """Resume an existing stopped task
448
449
        Arguments:
450
            task_id: UUID of the task to be resumed
451
452
        Returns:
453
            The response. See :py:meth:`send_command` for details.
454
        """
455
        if not task_id:
456
            raise RequiredArgument(
457
                function=self.resume_task.__name__, argument='task_id'
458
            )
459
460
        cmd = XmlCommand("resume_task")
461
        cmd.set_attribute("task_id", task_id)
462
463
        return self._send_xml_command(cmd)
464
465
    def stop_task(self, task_id: str) -> Any:
466
        """Stop an existing running task
467
468
        Arguments:
469
            task_id: UUID of the task to be stopped
470
471
        Returns:
472
            The response. See :py:meth:`send_command` for details.
473
        """
474
        if not task_id:
475
            raise RequiredArgument(
476
                function=self.stop_task.__name__, argument='task_id'
477
            )
478
479
        cmd = XmlCommand("stop_task")
480
        cmd.set_attribute("task_id", task_id)
481
482
        return self._send_xml_command(cmd)
483