AuditsMixin.resume_audit()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 18
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 7
nop 2
dl 0
loc 18
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
20
from collections.abc import Mapping
21
from numbers import Integral
22
from typing import Any, List, Optional
23
24
from gvm.errors import InvalidArgument, InvalidArgumentType, RequiredArgument
25
from gvm.protocols.gmpv208.entities.hosts import HostsOrdering
26
from gvm.utils import add_filter, is_list_like, to_bool, to_comma_list
27
from gvm.xml import XmlCommand
28
29
30
class AuditsMixin:
31
    def clone_audit(self, audit_id: str) -> Any:
32
        """Clone an existing audit
33
34
        Arguments:
35
            audit_id: UUID of existing audit to clone from
36
37
        Returns:
38
            The response. See :py:meth:`send_command` for details.
39
        """
40
        if not audit_id:
41
            raise RequiredArgument(
42
                function=self.clone_audit.__name__, argument='audit_id'
43
            )
44
45
        cmd = XmlCommand("create_task")
46
        cmd.add_element("copy", audit_id)
47
        return self._send_xml_command(cmd)
48
49 View Code Duplication
    def create_audit(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
50
        self,
51
        name: str,
52
        policy_id: str,
53
        target_id: str,
54
        scanner_id: str,
55
        *,
56
        alterable: Optional[bool] = None,
57
        hosts_ordering: Optional[HostsOrdering] = None,
58
        schedule_id: Optional[str] = None,
59
        alert_ids: Optional[List[str]] = None,
60
        comment: Optional[str] = None,
61
        schedule_periods: Optional[int] = None,
62
        observers: Optional[List[str]] = None,
63
        preferences: Optional[dict] = None,
64
    ) -> Any:
65
        """Create a new audit task
66
67
        Arguments:
68
            name: Name of the new audit
69
            policy_id: UUID of policy to use by the audit
70
            target_id: UUID of target to be scanned
71
            scanner_id: UUID of scanner to use for scanning the target
72
            comment: Comment for the audit
73
            alterable: Whether the task should be alterable
74
            alert_ids: List of UUIDs for alerts to be applied to the audit
75
            hosts_ordering: The order hosts are scanned in
76
            schedule_id: UUID of a schedule when the audit should be run.
77
            schedule_periods: A limit to the number of times the audit will be
78
                scheduled, or 0 for no limit
79
            observers: List of names or ids of users which should be allowed to
80
                observe this audit
81
            preferences: Name/Value pairs of scanner preferences.
82
83
        Returns:
84
            The response. See :py:meth:`send_command` for details.
85
        """
86
87
        if not name:
88
            raise RequiredArgument(
89
                function=self.create_audit.__name__, argument='name'
90
            )
91
        if not policy_id:
92
            raise RequiredArgument(
93
                function=self.create_audit.__name__, argument='policy_id'
94
            )
95
96
        if not target_id:
97
            raise RequiredArgument(
98
                function=self.create_audit.__name__, argument='target_id'
99
            )
100
101
        if not scanner_id:
102
            raise RequiredArgument(
103
                function=self.create_audit.__name__, argument='scanner_id'
104
            )
105
106
        # don't allow to create a container task with create_task
107
        if target_id == '0':
108
            raise InvalidArgument(
109
                function=self.create_audit.__name__, argument='target_id'
110
            )
111
112
        cmd = XmlCommand("create_task")
113
        cmd.add_element("name", name)
114
        cmd.add_element("usage_type", "audit")
115
        cmd.add_element("config", attrs={"id": policy_id})
116
        cmd.add_element("target", attrs={"id": target_id})
117
        cmd.add_element("scanner", attrs={"id": scanner_id})
118
119
        if comment:
120
            cmd.add_element("comment", comment)
121
122
        if alterable is not None:
123
            cmd.add_element("alterable", to_bool(alterable))
124
125
        if hosts_ordering:
126
            if not isinstance(hosts_ordering, HostsOrdering):
127
                raise InvalidArgumentType(
128
                    function=self.create_audit.__name__,
129
                    argument='hosts_ordering',
130
                    arg_type=HostsOrdering.__name__,
131
                )
132
            cmd.add_element("hosts_ordering", hosts_ordering.value)
133
134
        if alert_ids is not None:
135
            if not is_list_like(alert_ids):
136
                raise InvalidArgumentType(
137
                    function=self.modify_task.__name__,
138
                    argument='alert_ids',
139
                    arg_type='list',
140
                )
141
142
            if not len(alert_ids) == 0:
143
                for alert in alert_ids:
144
                    cmd.add_element("alert", attrs={"id": str(alert)})
145
146
        if schedule_id:
147
            cmd.add_element("schedule", attrs={"id": schedule_id})
148
149
            if schedule_periods is not None:
150
                if (
151
                    not isinstance(schedule_periods, Integral)
152
                    or schedule_periods < 0
153
                ):
154
                    raise InvalidArgument(
155
                        "schedule_periods must be an integer greater or equal "
156
                        "than 0"
157
                    )
158
                cmd.add_element("schedule_periods", str(schedule_periods))
159
160
        if observers is not None:
161
            if not is_list_like(observers):
162
                raise InvalidArgumentType(
163
                    function=self.create_audit.__name__,
164
                    argument='observers',
165
                    arg_type='list',
166
                )
167
168
            # gvmd splits by comma and space
169
            # gvmd tries to lookup each value as user name and afterwards as
170
            # user id. So both user name and user id are possible
171
            cmd.add_element("observers", to_comma_list(observers))
172
173
        if preferences is not None:
174
            if not isinstance(preferences, Mapping):
175
                raise InvalidArgumentType(
176
                    function=self.create_audit.__name__,
177
                    argument='preferences',
178
                    arg_type=Mapping.__name__,
179
                )
180
181
            _xmlprefs = cmd.add_element("preferences")
182
            for pref_name, pref_value in preferences.items():
183
                _xmlpref = _xmlprefs.add_element("preference")
184
                _xmlpref.add_element("scanner_name", pref_name)
185
                _xmlpref.add_element("value", str(pref_value))
186
187
        return self._send_xml_command(cmd)
188
189
    def delete_audit(
190
        self, audit_id: str, *, ultimate: Optional[bool] = False
191
    ) -> Any:
192
        """Deletes an existing audit
193
194
        Arguments:
195
            audit_id: UUID of the audit to be deleted.
196
            ultimate: Whether to remove entirely, or to the trashcan.
197
        """
198
        if not audit_id:
199
            raise RequiredArgument(
200
                function=self.delete_audit.__name__, argument='audit_id'
201
            )
202
203
        cmd = XmlCommand("delete_task")
204
        cmd.set_attribute("task_id", audit_id)
205
        cmd.set_attribute("ultimate", to_bool(ultimate))
206
207
        return self._send_xml_command(cmd)
208
209
    def get_audits(
210
        self,
211
        *,
212
        filter_string: Optional[str] = None,
213
        filter_id: Optional[str] = None,
214
        trash: Optional[bool] = None,
215
        details: Optional[bool] = None,
216
        schedules_only: Optional[bool] = None,
217
    ) -> Any:
218
        """Request a list of audits
219
220
        Arguments:
221
            filter_string: Filter term to use for the query
222
            filter_id: UUID of an existing filter to use for the query
223
            trash: Whether to get the trashcan audits instead
224
            details: Whether to include full audit details
225
            schedules_only: Whether to only include id, name and schedule
226
                details
227
228
        Returns:
229
            The response. See :py:meth:`send_command` for details.
230
        """
231
        cmd = XmlCommand("get_tasks")
232
        cmd.set_attribute("usage_type", "audit")
233
234
        add_filter(cmd, filter_string, filter_id)
235
236
        if trash is not None:
237
            cmd.set_attribute("trash", to_bool(trash))
238
239
        if details is not None:
240
            cmd.set_attribute("details", to_bool(details))
241
242
        if schedules_only is not None:
243
            cmd.set_attribute("schedules_only", to_bool(schedules_only))
244
245
        return self._send_xml_command(cmd)
246
247
    def get_audit(self, audit_id: str) -> Any:
248
        """Request a single audit
249
250
        Arguments:
251
            audit_id: UUID of an existing audit
252
253
        Returns:
254
            The response. See :py:meth:`send_command` for details.
255
        """
256
        if not audit_id:
257
            raise RequiredArgument(
258
                function=self.get_task.__name__, argument='audit_id'
259
            )
260
261
        cmd = XmlCommand("get_tasks")
262
        cmd.set_attribute("task_id", audit_id)
263
        cmd.set_attribute("usage_type", "audit")
264
265
        # for single entity always request all details
266
        cmd.set_attribute("details", "1")
267
        return self._send_xml_command(cmd)
268
269 View Code Duplication
    def modify_audit(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
270
        self,
271
        audit_id: str,
272
        *,
273
        name: Optional[str] = None,
274
        policy_id: Optional[str] = None,
275
        target_id: Optional[str] = None,
276
        scanner_id: Optional[str] = None,
277
        alterable: Optional[bool] = None,
278
        hosts_ordering: Optional[HostsOrdering] = None,
279
        schedule_id: Optional[str] = None,
280
        schedule_periods: Optional[int] = None,
281
        comment: Optional[str] = None,
282
        alert_ids: Optional[List[str]] = None,
283
        observers: Optional[List[str]] = None,
284
        preferences: Optional[dict] = None,
285
    ) -> Any:
286
        """Modifies an existing task.
287
288
        Arguments:
289
            audit_id: UUID of audit to modify.
290
            name: The name of the audit.
291
            policy_id: UUID of policy to use by the audit
292
            target_id: UUID of target to be scanned
293
            scanner_id: UUID of scanner to use for scanning the target
294
            comment: The comment on the audit.
295
            alert_ids: List of UUIDs for alerts to be applied to the audit
296
            hosts_ordering: The order hosts are scanned in
297
            schedule_id: UUID of a schedule when the audit should be run.
298
            schedule_periods: A limit to the number of times the audit will be
299
                scheduled, or 0 for no limit.
300
            observers: List of names or ids of users which should be allowed to
301
                observe this audit
302
            preferences: Name/Value pairs of scanner preferences.
303
304
        Returns:
305
            The response. See :py:meth:`send_command` for details.
306
        """
307
        if not audit_id:
308
            raise RequiredArgument(
309
                function=self.modify_task.__name__, argument='task_id argument'
310
            )
311
312
        cmd = XmlCommand("modify_task")
313
        cmd.set_attribute("task_id", audit_id)
314
315
        if name:
316
            cmd.add_element("name", name)
317
318
        if comment:
319
            cmd.add_element("comment", comment)
320
321
        if policy_id:
322
            cmd.add_element("config", attrs={"id": policy_id})
323
324
        if target_id:
325
            cmd.add_element("target", attrs={"id": target_id})
326
327
        if alterable is not None:
328
            cmd.add_element("alterable", to_bool(alterable))
329
330
        if hosts_ordering:
331
            if not isinstance(hosts_ordering, HostsOrdering):
332
                raise InvalidArgumentType(
333
                    function=self.modify_task.__name__,
334
                    argument='hosts_ordering',
335
                    arg_type=HostsOrdering.__name__,
336
                )
337
            cmd.add_element("hosts_ordering", hosts_ordering.value)
338
339
        if scanner_id:
340
            cmd.add_element("scanner", attrs={"id": scanner_id})
341
342
        if schedule_id:
343
            cmd.add_element("schedule", attrs={"id": schedule_id})
344
345
        if schedule_periods is not None:
346
            if (
347
                not isinstance(schedule_periods, Integral)
348
                or schedule_periods < 0
349
            ):
350
                raise InvalidArgument(
351
                    "schedule_periods must be an integer greater or equal "
352
                    "than 0"
353
                )
354
            cmd.add_element("schedule_periods", str(schedule_periods))
355
356
        if alert_ids is not None:
357
            if not is_list_like(alert_ids):
358
                raise InvalidArgumentType(
359
                    function=self.modify_task.__name__,
360
                    argument='alert_ids',
361
                    arg_type='list',
362
                )
363
364
            if len(alert_ids) == 0:
365
                cmd.add_element("alert", attrs={"id": "0"})
366
            else:
367
                for alert in alert_ids:
368
                    cmd.add_element("alert", attrs={"id": str(alert)})
369
370
        if observers is not None:
371
            if not is_list_like(observers):
372
                raise InvalidArgumentType(
373
                    function=self.modify_task.__name__,
374
                    argument='observers',
375
                    arg_type='list',
376
                )
377
378
            cmd.add_element("observers", to_comma_list(observers))
379
380
        if preferences is not None:
381
            if not isinstance(preferences, Mapping):
382
                raise InvalidArgumentType(
383
                    function=self.modify_task.__name__,
384
                    argument='preferences',
385
                    arg_type=Mapping.__name__,
386
                )
387
388
            _xmlprefs = cmd.add_element("preferences")
389
            for pref_name, pref_value in preferences.items():
390
                _xmlpref = _xmlprefs.add_element("preference")
391
                _xmlpref.add_element("scanner_name", pref_name)
392
                _xmlpref.add_element("value", str(pref_value))
393
394
        return self._send_xml_command(cmd)
395
396
    def resume_audit(self, audit_id: str) -> Any:
397
        """Resume an existing stopped audit
398
399
        Arguments:
400
            audit_id: UUID of the audit to be resumed
401
402
        Returns:
403
            The response. See :py:meth:`send_command` for details.
404
        """
405
        if not audit_id:
406
            raise RequiredArgument(
407
                function=self.resume_audit.__name__, argument='audit_id'
408
            )
409
410
        cmd = XmlCommand("resume_task")
411
        cmd.set_attribute("task_id", audit_id)
412
413
        return self._send_xml_command(cmd)
414
415
    def start_audit(self, audit_id: str) -> Any:
416
        """Start an existing audit
417
418
        Arguments:
419
            audit_id: UUID of the audit to be started
420
421
        Returns:
422
            The response. See :py:meth:`send_command` for details.
423
        """
424
        if not audit_id:
425
            raise RequiredArgument(
426
                function=self.start_audit.__name__, argument='audit_id'
427
            )
428
429
        cmd = XmlCommand("start_task")
430
        cmd.set_attribute("task_id", audit_id)
431
432
        return self._send_xml_command(cmd)
433
434
    def stop_audit(self, audit_id: str) -> Any:
435
        """Stop an existing running audit
436
437
        Arguments:
438
            audit_id: UUID of the audit to be stopped
439
440
        Returns:
441
            The response. See :py:meth:`send_command` for details.
442
        """
443
        if not audit_id:
444
            raise RequiredArgument(
445
                function=self.stop_audit.__name__, argument='audit_id'
446
            )
447
448
        cmd = XmlCommand("stop_task")
449
        cmd.set_attribute("task_id", audit_id)
450
451
        return self._send_xml_command(cmd)
452