Completed
Push — master ( 802093...1e243f )
by Jaspar
29s queued 13s
created

AuditsMixin.clone_audit()   A

Complexity

Conditions 2

Size

Total Lines 17
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 7
nop 2
dl 0
loc 17
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
# pylint:  disable=redefined-builtin
20
21
from collections.abc import Mapping
22
from numbers import Integral
23
from typing import Any, List, Optional
24
25
from gvm.errors import InvalidArgument, InvalidArgumentType, RequiredArgument
26
from gvm.protocols.gmpv208.types import (
27
    HostsOrdering,
28
)  # if I use latest, I get circular import :/
29
from gvm.utils import add_filter, is_list_like, to_bool, to_comma_list
30
from gvm.xml import XmlCommand
31
32
33
class AuditsMixin:
34
    def clone_audit(self, audit_id: str) -> Any:
35
        """Clone an existing audit
36
37
        Arguments:
38
            audit_id: UUID of existing audit to clone from
39
40
        Returns:
41
            The response. See :py:meth:`send_command` for details.
42
        """
43
        if not audit_id:
44
            raise RequiredArgument(
45
                function=self.clone_audit.__name__, argument='audit_id'
46
            )
47
48
        cmd = XmlCommand("create_task")
49
        cmd.add_element("copy", audit_id)
50
        return self._send_xml_command(cmd)
51
52 View Code Duplication
    def create_audit(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
53
        self,
54
        name: str,
55
        policy_id: str,
56
        target_id: str,
57
        scanner_id: str,
58
        *,
59
        alterable: Optional[bool] = None,
60
        hosts_ordering: Optional[HostsOrdering] = None,
61
        schedule_id: Optional[str] = None,
62
        alert_ids: Optional[List[str]] = None,
63
        comment: Optional[str] = None,
64
        schedule_periods: Optional[int] = None,
65
        observers: Optional[List[str]] = None,
66
        preferences: Optional[dict] = None,
67
    ) -> Any:
68
        """Create a new audit task
69
70
        Arguments:
71
            name: Name of the new audit
72
            policy_id: UUID of policy to use by the audit
73
            target_id: UUID of target to be scanned
74
            scanner_id: UUID of scanner to use for scanning the target
75
            comment: Comment for the audit
76
            alterable: Whether the task should be alterable
77
            alert_ids: List of UUIDs for alerts to be applied to the audit
78
            hosts_ordering: The order hosts are scanned in
79
            schedule_id: UUID of a schedule when the audit should be run.
80
            schedule_periods: A limit to the number of times the audit will be
81
                scheduled, or 0 for no limit
82
            observers: List of names or ids of users which should be allowed to
83
                observe this audit
84
            preferences: Name/Value pairs of scanner preferences.
85
86
        Returns:
87
            The response. See :py:meth:`send_command` for details.
88
        """
89
90
        if not name:
91
            raise RequiredArgument(
92
                function=self.create_audit.__name__, argument='name'
93
            )
94
        if not policy_id:
95
            raise RequiredArgument(
96
                function=self.create_audit.__name__, argument='policy_id'
97
            )
98
99
        if not target_id:
100
            raise RequiredArgument(
101
                function=self.create_audit.__name__, argument='target_id'
102
            )
103
104
        if not scanner_id:
105
            raise RequiredArgument(
106
                function=self.create_audit.__name__, argument='scanner_id'
107
            )
108
109
        # don't allow to create a container task with create_task
110
        if target_id == '0':
111
            raise InvalidArgument(
112
                function=self.create_audit.__name__, argument='target_id'
113
            )
114
115
        cmd = XmlCommand("create_task")
116
        cmd.add_element("name", name)
117
        cmd.add_element("usage_type", "audit")
118
        cmd.add_element("config", attrs={"id": policy_id})
119
        cmd.add_element("target", attrs={"id": target_id})
120
        cmd.add_element("scanner", attrs={"id": scanner_id})
121
122
        if comment:
123
            cmd.add_element("comment", comment)
124
125
        if alterable is not None:
126
            cmd.add_element("alterable", to_bool(alterable))
127
128
        if hosts_ordering:
129
            if not isinstance(hosts_ordering, self.types.HostsOrdering):
130
                raise InvalidArgumentType(
131
                    function=self.create_audit.__name__,
132
                    argument='hosts_ordering',
133
                    arg_type=HostsOrdering.__name__,
134
                )
135
            cmd.add_element("hosts_ordering", hosts_ordering.value)
136
137
        if alert_ids:
138
            if is_list_like(alert_ids):
139
                # parse all given alert id's
140
                for alert in alert_ids:
141
                    cmd.add_element("alert", attrs={"id": str(alert)})
142
143
        if schedule_id:
144
            cmd.add_element("schedule", attrs={"id": schedule_id})
145
146
            if schedule_periods is not None:
147
                if (
148
                    not isinstance(schedule_periods, Integral)
149
                    or schedule_periods < 0
150
                ):
151
                    raise InvalidArgument(
152
                        "schedule_periods must be an integer greater or equal "
153
                        "than 0"
154
                    )
155
                cmd.add_element("schedule_periods", str(schedule_periods))
156
157
        if observers is not None:
158
            if not is_list_like(observers):
159
                raise InvalidArgumentType(
160
                    function=self.create_audit.__name__,
161
                    argument='observers',
162
                    arg_type='list',
163
                )
164
165
            # gvmd splits by comma and space
166
            # gvmd tries to lookup each value as user name and afterwards as
167
            # user id. So both user name and user id are possible
168
            cmd.add_element("observers", to_comma_list(observers))
169
170
        if preferences is not None:
171
            if not isinstance(preferences, Mapping):
172
                raise InvalidArgumentType(
173
                    function=self.create_audit.__name__,
174
                    argument='preferences',
175
                    arg_type=Mapping.__name__,
176
                )
177
178
            _xmlprefs = cmd.add_element("preferences")
179
            for pref_name, pref_value in preferences.items():
180
                _xmlpref = _xmlprefs.add_element("preference")
181
                _xmlpref.add_element("scanner_name", pref_name)
182
                _xmlpref.add_element("value", str(pref_value))
183
184
        return self._send_xml_command(cmd)
185
186
    def delete_audit(
187
        self, audit_id: str, *, ultimate: Optional[bool] = False
188
    ) -> Any:
189
        """Deletes an existing audit
190
191
        Arguments:
192
            audit_id: UUID of the audit to be deleted.
193
            ultimate: Whether to remove entirely, or to the trashcan.
194
        """
195
        if not audit_id:
196
            raise RequiredArgument(
197
                function=self.delete_audit.__name__, argument='audit_id'
198
            )
199
200
        cmd = XmlCommand("delete_task")
201
        cmd.set_attribute("task_id", audit_id)
202
        cmd.set_attribute("ultimate", to_bool(ultimate))
203
204
        return self._send_xml_command(cmd)
205
206 View Code Duplication
    def get_audits(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
207
        self,
208
        *,
209
        filter: Optional[str] = None,
210
        filter_id: Optional[str] = None,
211
        trash: Optional[bool] = None,
212
        details: Optional[bool] = None,
213
        schedules_only: Optional[bool] = None,
214
    ) -> Any:
215
        """Request a list of audits
216
217
        Arguments:
218
            filter: Filter term to use for the query
219
            filter_id: UUID of an existing filter to use for the query
220
            trash: Whether to get the trashcan audits instead
221
            details: Whether to include full audit details
222
            schedules_only: Whether to only include id, name and schedule
223
                details
224
225
        Returns:
226
            The response. See :py:meth:`send_command` for details.
227
        """
228
        cmd = XmlCommand("get_tasks")
229
        cmd.set_attribute("usage_type", "audit")
230
231
        add_filter(cmd, filter, filter_id)
232
233
        if trash is not None:
234
            cmd.set_attribute("trash", to_bool(trash))
235
236
        if details is not None:
237
            cmd.set_attribute("details", to_bool(details))
238
239
        if schedules_only is not None:
240
            cmd.set_attribute("schedules_only", to_bool(schedules_only))
241
242
        return self._send_xml_command(cmd)
243
244
    def get_audit(self, audit_id: str) -> Any:
245
        """Request a single audit
246
247
        Arguments:
248
            audit_id: UUID of an existing audit
249
250
        Returns:
251
            The response. See :py:meth:`send_command` for details.
252
        """
253
        if not audit_id:
254
            raise RequiredArgument(
255
                function=self.get_task.__name__, argument='audit_id'
256
            )
257
258
        cmd = XmlCommand("get_tasks")
259
        cmd.set_attribute("task_id", audit_id)
260
        cmd.set_attribute("usage_type", "audit")
261
262
        # for single entity always request all details
263
        cmd.set_attribute("details", "1")
264
        return self._send_xml_command(cmd)
265
266 View Code Duplication
    def modify_audit(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
267
        self,
268
        audit_id: str,
269
        *,
270
        name: Optional[str] = None,
271
        policy_id: Optional[str] = None,
272
        target_id: Optional[str] = None,
273
        scanner_id: Optional[str] = None,
274
        alterable: Optional[bool] = None,
275
        hosts_ordering: Optional[HostsOrdering] = None,
276
        schedule_id: Optional[str] = None,
277
        schedule_periods: Optional[int] = None,
278
        comment: Optional[str] = None,
279
        alert_ids: Optional[List[str]] = None,
280
        observers: Optional[List[str]] = None,
281
        preferences: Optional[dict] = None,
282
    ) -> Any:
283
        """Modifies an existing task.
284
285
        Arguments:
286
            audit_id: UUID of audit to modify.
287
            name: The name of the audit.
288
            policy_id: UUID of policy to use by the audit
289
            target_id: UUID of target to be scanned
290
            scanner_id: UUID of scanner to use for scanning the target
291
            comment: The comment on the audit.
292
            alert_ids: List of UUIDs for alerts to be applied to the audit
293
            hosts_ordering: The order hosts are scanned in
294
            schedule_id: UUID of a schedule when the audit should be run.
295
            schedule_periods: A limit to the number of times the audit will be
296
                scheduled, or 0 for no limit.
297
            observers: List of names or ids of users which should be allowed to
298
                observe this audit
299
            preferences: Name/Value pairs of scanner preferences.
300
301
        Returns:
302
            The response. See :py:meth:`send_command` for details.
303
        """
304
        if not audit_id:
305
            raise RequiredArgument(
306
                function=self.modify_task.__name__, argument='task_id argument'
307
            )
308
309
        cmd = XmlCommand("modify_task")
310
        cmd.set_attribute("task_id", audit_id)
311
312
        if name:
313
            cmd.add_element("name", name)
314
315
        if comment:
316
            cmd.add_element("comment", comment)
317
318
        if policy_id:
319
            cmd.add_element("config", attrs={"id": policy_id})
320
321
        if target_id:
322
            cmd.add_element("target", attrs={"id": target_id})
323
324
        if alterable is not None:
325
            cmd.add_element("alterable", to_bool(alterable))
326
327
        if hosts_ordering:
328
            if not isinstance(hosts_ordering, HostsOrdering):
329
                raise InvalidArgumentType(
330
                    function=self.modify_task.__name__,
331
                    argument='hosts_ordering',
332
                    arg_type=HostsOrdering.__name__,
333
                )
334
            cmd.add_element("hosts_ordering", hosts_ordering.value)
335
336
        if scanner_id:
337
            cmd.add_element("scanner", attrs={"id": scanner_id})
338
339
        if schedule_id:
340
            cmd.add_element("schedule", attrs={"id": schedule_id})
341
342
        if schedule_periods is not None:
343
            if (
344
                not isinstance(schedule_periods, Integral)
345
                or schedule_periods < 0
346
            ):
347
                raise InvalidArgument(
348
                    "schedule_periods must be an integer greater or equal "
349
                    "than 0"
350
                )
351
            cmd.add_element("schedule_periods", str(schedule_periods))
352
353
        if alert_ids is not None:
354
            if not is_list_like(alert_ids):
355
                raise InvalidArgumentType(
356
                    function=self.modify_task.__name__,
357
                    argument='alert_ids',
358
                    arg_type='list',
359
                )
360
361
            if len(alert_ids) == 0:
362
                cmd.add_element("alert", attrs={"id": "0"})
363
            else:
364
                for alert in alert_ids:
365
                    cmd.add_element("alert", attrs={"id": str(alert)})
366
367
        if observers is not None:
368
            if not is_list_like(observers):
369
                raise InvalidArgumentType(
370
                    function=self.modify_task.__name__,
371
                    argument='observers',
372
                    arg_type='list',
373
                )
374
375
            cmd.add_element("observers", to_comma_list(observers))
376
377
        if preferences is not None:
378
            if not isinstance(preferences, Mapping):
379
                raise InvalidArgumentType(
380
                    function=self.modify_task.__name__,
381
                    argument='preferences',
382
                    arg_type=Mapping.__name__,
383
                )
384
385
            _xmlprefs = cmd.add_element("preferences")
386
            for pref_name, pref_value in preferences.items():
387
                _xmlpref = _xmlprefs.add_element("preference")
388
                _xmlpref.add_element("scanner_name", pref_name)
389
                _xmlpref.add_element("value", str(pref_value))
390
391
        return self._send_xml_command(cmd)
392
393
    def resume_audit(self, audit_id: str) -> Any:
394
        """Resume an existing stopped audit
395
396
        Arguments:
397
            audit_id: UUID of the audit to be resumed
398
399
        Returns:
400
            The response. See :py:meth:`send_command` for details.
401
        """
402
        if not audit_id:
403
            raise RequiredArgument(
404
                function=self.resume_audit.__name__, argument='audit_id'
405
            )
406
407
        cmd = XmlCommand("resume_task")
408
        cmd.set_attribute("task_id", audit_id)
409
410
        return self._send_xml_command(cmd)
411
412
    def start_audit(self, audit_id: str) -> Any:
413
        """Start an existing audit
414
415
        Arguments:
416
            audit_id: UUID of the audit to be started
417
418
        Returns:
419
            The response. See :py:meth:`send_command` for details.
420
        """
421
        if not audit_id:
422
            raise RequiredArgument(
423
                function=self.start_audit.__name__, argument='audit_id'
424
            )
425
426
        cmd = XmlCommand("start_task")
427
        cmd.set_attribute("task_id", audit_id)
428
429
        return self._send_xml_command(cmd)
430
431
    def stop_audit(self, audit_id: str) -> Any:
432
        """Stop an existing running audit
433
434
        Arguments:
435
            audit_id: UUID of the audit to be stopped
436
437
        Returns:
438
            The response. See :py:meth:`send_command` for details.
439
        """
440
        if not audit_id:
441
            raise RequiredArgument(
442
                function=self.stop_audit.__name__, argument='audit_id'
443
            )
444
445
        cmd = XmlCommand("stop_task")
446
        cmd.set_attribute("task_id", audit_id)
447
448
        return self._send_xml_command(cmd)
449