Completed
Push — master ( 9d0301...629a56 )
by Jaspar
23s queued 15s
created

GmpV9Mixin.create_permission()   D

Complexity

Conditions 12

Size

Total Lines 83
Code Lines 49

Duplication

Lines 32
Ratio 38.55 %

Importance

Changes 0
Metric Value
cc 12
eloc 49
nop 8
dl 32
loc 83
rs 4.8
c 0
b 0
f 0

How to fix   Long Method    Complexity    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like gvm.protocols.gmpv9.gmpv9.GmpV9Mixin.create_permission() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 - 2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
# pylint: disable=arguments-differ, redefined-builtin, too-many-lines
20
21
"""
22
Module for communication with gvmd in `Greenbone Management Protocol version 9`_
23
24
.. _Greenbone Management Protocol version 9:
25
    https://docs.greenbone.net/API/GMP/gmp-9.0.html
26
"""
27
import collections
28
import numbers
29
30
from typing import Any, List, Optional, Callable
31
32
from gvm.errors import InvalidArgument, InvalidArgumentType, RequiredArgument
33
from gvm.utils import deprecation
34
from gvm.xml import XmlCommand
35
36
from gvm.protocols.base import GvmProtocol
37
from gvm.connections import GvmConnection
38
from gvm.protocols.gmpv7.gmpv7 import (
39
    _to_bool,
40
    _add_filter,
41
    _is_list_like,
42
    _to_comma_list,
43
)
44
45
from . import types
46
from .types import *  # pylint: disable=unused-wildcard-import, wildcard-import
47
from .types import _UsageType as UsageType
48
49
_EMPTY_POLICY_ID = '085569ce-73ed-11df-83c3-002264764cea'
50
51
52
def _check_event(
53
    event: AlertEvent, condition: AlertCondition, method: AlertMethod
54
):
55
    if event == AlertEvent.TASK_RUN_STATUS_CHANGED:
56
        if not condition:
57
            raise RequiredArgument(
58
                "condition is required for event {}".format(event.name)
59
            )
60
61
        if not method:
62
            raise RequiredArgument(
63
                "method is required for event {}".format(event.name)
64
            )
65
66
        if condition not in (
67
            AlertCondition.ALWAYS,
68
            AlertCondition.FILTER_COUNT_CHANGED,
69
            AlertCondition.FILTER_COUNT_AT_LEAST,
70
            AlertCondition.SEVERITY_AT_LEAST,
71
            AlertCondition.SEVERITY_CHANGED,
72
        ):
73
            raise InvalidArgument(
74
                "Invalid condition {} for event {}".format(
75
                    condition.name, event.name
76
                )
77
            )
78
    elif event in (
79
        AlertEvent.NEW_SECINFO_ARRIVED,
80
        AlertEvent.UPDATED_SECINFO_ARRIVED,
81
    ):
82
        if not condition:
83
            raise RequiredArgument(
84
                "condition is required for event {}".format(event.name)
85
            )
86
87
        if not method:
88
            raise RequiredArgument(
89
                "method is required for event {}".format(event.name)
90
            )
91
92
        if condition != AlertCondition.ALWAYS:
93
            raise InvalidArgument(
94
                "Invalid condition {} for event {}".format(
95
                    condition.name, event.name
96
                )
97
            )
98
        if method not in (
99
            AlertMethod.SCP,
100
            AlertMethod.SEND,
101
            AlertMethod.SMB,
102
            AlertMethod.SNMP,
103
            AlertMethod.SYSLOG,
104
            AlertMethod.EMAIL,
105
        ):
106
            raise InvalidArgument(
107
                "Invalid method {} for event {}".format(method.name, event.name)
108
            )
109
    elif event in (
110
        AlertEvent.TICKET_RECEIVED,
111
        AlertEvent.OWNED_TICKET_CHANGED,
112
        AlertEvent.ASSIGNED_TICKET_CHANGED,
113
    ):
114
        if not condition:
115
            raise RequiredArgument(
116
                "condition is required for event {}".format(event.name)
117
            )
118
119
        if not method:
120
            raise RequiredArgument(
121
                "method is required for event {}".format(event.name)
122
            )
123
        if condition != AlertCondition.ALWAYS:
124
            raise InvalidArgument(
125
                "Invalid condition {} for event {}".format(
126
                    condition.name, event.name
127
                )
128
            )
129
        if method not in (
130
            AlertMethod.EMAIL,
131
            AlertMethod.START_TASK,
132
            AlertMethod.SYSLOG,
133
        ):
134
            raise InvalidArgument(
135
                "Invalid method {} for event {}".format(method.name, event.name)
136
            )
137
    elif event is not None:
138
        raise InvalidArgument('Invalid event "{}"'.format(event.name))
139
140
141
class GmpV9Mixin(GvmProtocol):
142
143
    types = types
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable types does not seem to be defined.
Loading history...
144
145
    def __init__(
146
        self,
147
        connection: GvmConnection,
148
        *,
149
        transform: Optional[Callable[[str], Any]] = None
150
    ):
151
        super().__init__(connection, transform=transform)
152
153
        # Is authenticated on gvmd
154
        self._authenticated = False
155
156 View Code Duplication
    def create_alert(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
157
        self,
158
        name: str,
159
        condition: AlertCondition,
160
        event: AlertEvent,
161
        method: AlertMethod,
162
        *,
163
        method_data: Optional[dict] = None,
164
        event_data: Optional[dict] = None,
165
        condition_data: Optional[dict] = None,
166
        filter_id: Optional[int] = None,
167
        comment: Optional[str] = None
168
    ) -> Any:
169
        """Create a new alert
170
171
        Arguments:
172
            name: Name of the new Alert
173
            condition: The condition that must be satisfied for the alert
174
                to occur; if the event is either 'Updated SecInfo arrived' or
175
                'New SecInfo arrived', condition must be 'Always'. Otherwise,
176
                condition can also be on of 'Severity at least', 'Filter count
177
                changed' or 'Filter count at least'.
178
            event: The event that must happen for the alert to occur, one
179
                of 'Task run status changed', 'Updated SecInfo arrived' or 'New
180
                SecInfo arrived'
181
            method: The method by which the user is alerted, one of 'SCP',
182
                'Send', 'SMB', 'SNMP', 'Syslog' or 'Email'; if the event is
183
                neither 'Updated SecInfo arrived' nor 'New SecInfo arrived',
184
                method can also be one of 'Start Task', 'HTTP Get', 'Sourcefire
185
                Connector' or 'verinice Connector'.
186
            condition_data: Data that defines the condition
187
            event_data: Data that defines the event
188
            method_data: Data that defines the method
189
            filter_id: Filter to apply when executing alert
190
            comment: Comment for the alert
191
192
        Returns:
193
            The response. See :py:meth:`send_command` for details.
194
        """
195
        if not name:
196
            raise RequiredArgument(
197
                function=self.create_alert.__name__, argument='name'
198
            )
199
200
        if not condition:
201
            raise RequiredArgument(
202
                function=self.create_alert.__name__, argument='condition'
203
            )
204
205
        if not event:
206
            raise RequiredArgument(
207
                function=self.create_alert.__name__, argument='event'
208
            )
209
210
        if not method:
211
            raise RequiredArgument(
212
                function=self.create_alert.__name__, argument='method'
213
            )
214
215
        if not isinstance(condition, AlertCondition):
216
            raise InvalidArgumentType(
217
                function=self.create_alert.__name__,
218
                argument='condition',
219
                arg_type=AlertCondition.__name__,
220
            )
221
222
        if not isinstance(event, AlertEvent):
223
            raise InvalidArgumentType(
224
                function=self.create_alert.__name__,
225
                argument='even',
226
                arg_type=AlertEvent.__name__,
227
            )
228
229
        if not isinstance(method, AlertMethod):
230
            raise InvalidArgumentType(
231
                function=self.create_alert.__name__,
232
                argument='method',
233
                arg_type=AlertMethod.__name__,
234
            )
235
236
        _check_event(event, condition, method)
237
238
        cmd = XmlCommand("create_alert")
239
        cmd.add_element("name", name)
240
241
        conditions = cmd.add_element("condition", condition.value)
242
243
        if condition_data is not None:
244
            for key, value in condition_data.items():
245
                _data = conditions.add_element("data", value)
246
                _data.add_element("name", key)
247
248
        events = cmd.add_element("event", event.value)
249
250
        if event_data is not None:
251
            for key, value in event_data.items():
252
                _data = events.add_element("data", value)
253
                _data.add_element("name", key)
254
255
        methods = cmd.add_element("method", method.value)
256
257
        if method_data is not None:
258
            for key, value in method_data.items():
259
                _data = methods.add_element("data", value)
260
                _data.add_element("name", key)
261
262
        if filter_id:
263
            cmd.add_element("filter", attrs={"id": filter_id})
264
265
        if comment:
266
            cmd.add_element("comment", comment)
267
268
        return self._send_xml_command(cmd)
269
270 View Code Duplication
    def create_audit(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
271
        self,
272
        name: str,
273
        policy_id: str,
274
        target_id: str,
275
        scanner_id: str,
276
        *,
277
        alterable: Optional[bool] = None,
278
        hosts_ordering: Optional[HostsOrdering] = None,
279
        schedule_id: Optional[str] = None,
280
        alert_ids: Optional[List[str]] = None,
281
        comment: Optional[str] = None,
282
        schedule_periods: Optional[int] = None,
283
        observers: Optional[List[str]] = None,
284
        preferences: Optional[dict] = None
285
    ) -> Any:
286
        """Create a new audit task
287
288
        Arguments:
289
            name: Name of the new audit
290
            policy_id: UUID of policy to use by the audit
291
            target_id: UUID of target to be scanned
292
            scanner_id: UUID of scanner to use for scanning the target
293
            comment: Comment for the audit
294
            alterable: Whether the task should be alterable
295
            alert_ids: List of UUIDs for alerts to be applied to the audit
296
            hosts_ordering: The order hosts are scanned in
297
            schedule_id: UUID of a schedule when the audit should be run.
298
            schedule_periods: A limit to the number of times the audit will be
299
                scheduled, or 0 for no limit
300
            observers: List of names or ids of users which should be allowed to
301
                observe this audit
302
            preferences: Name/Value pairs of scanner preferences.
303
304
        Returns:
305
            The response. See :py:meth:`send_command` for details.
306
        """
307
308
        return self.__create_task(
309
            name=name,
310
            config_id=policy_id,
311
            target_id=target_id,
312
            scanner_id=scanner_id,
313
            usage_type=UsageType.AUDIT,
314
            function=self.create_audit.__name__,
315
            alterable=alterable,
316
            hosts_ordering=hosts_ordering,
317
            schedule_id=schedule_id,
318
            alert_ids=alert_ids,
319
            comment=comment,
320
            schedule_periods=schedule_periods,
321
            observers=observers,
322
            preferences=preferences,
323
        )
324
325
    def create_config(
326
        self, config_id: str, name: str, *, comment: Optional[str] = None
327
    ) -> Any:
328
        """Create a new scan config
329
330
        Arguments:
331
            config_id: UUID of the existing scan config
332
            name: Name of the new scan config
333
            comment: A comment on the config
334
335
        Returns:
336
            The response. See :py:meth:`send_command` for details.
337
        """
338
        return self.__create_config(
339
            config_id=config_id,
340
            name=name,
341
            comment=comment,
342
            usage_type=UsageType.SCAN,
343
            function=self.create_config.__name__,
344
        )
345
346
    def create_config_from_osp_scanner(
347
        self, scanner_id: str, name: str, *, comment: Optional[str] = None
348
    ) -> Any:
349
        """Create a new scan config from an ospd scanner.
350
351
        Create config by retrieving the expected preferences from the given
352
        scanner via OSP.
353
354
        Arguments:
355
            scanner_id: UUID of an OSP scanner to get config data from
356
            name: Name of the new scan config
357
            comment: A comment on the config
358
359
        Returns:
360
            The response. See :py:meth:`send_command` for details.
361
        """
362
        return self.__create_config_from_osp_scanner(
363
            scanner_id=scanner_id,
364
            name=name,
365
            comment=comment,
366
            usage_type=UsageType.SCAN,
367
            function=self.create_config.__name__,
368
        )
369
370
    def create_permission(
371
        self,
372
        name: str,
373
        subject_id: str,
374
        subject_type: PermissionSubjectType,
375
        *,
376
        resource_id: Optional[str] = None,
377
        resource_type: Optional[EntityType] = None,
378
        comment: Optional[str] = None
379
    ) -> Any:
380
        """Create a new permission
381
382
        Arguments:
383
            name: Name of the new permission
384
            subject_id: UUID of subject to whom the permission is granted
385
            subject_type: Type of the subject user, group or role
386
            comment: Comment for the permission
387
            resource_id: UUID of entity to which the permission applies
388
            resource_type: Type of the resource. For Super permissions user,
389
                group or role
390
391
        Returns:
392
            The response. See :py:meth:`send_command` for details.
393
        """
394
        if not name:
395
            raise RequiredArgument(
396
                function=self.create_permission.__name__, argument='name'
397
            )
398
399
        if not subject_id:
400
            raise RequiredArgument(
401
                function=self.create_permission.__name__, argument='subject_id'
402
            )
403
404
        if not isinstance(subject_type, PermissionSubjectType):
405
            raise InvalidArgumentType(
406
                function=self.create_permission.__name__,
407
                argument='subject_type',
408
                arg_type=PermissionSubjectType.__name__,
409
            )
410
411
        cmd = XmlCommand("create_permission")
412
        cmd.add_element("name", name)
413
414
        _xmlsubject = cmd.add_element("subject", attrs={"id": subject_id})
415
        _xmlsubject.add_element("type", subject_type.value)
416
417
        if comment:
418
            cmd.add_element("comment", comment)
419
420 View Code Duplication
        if resource_id or resource_type:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
421
            if not resource_id:
422
                raise RequiredArgument(
423
                    function=self.create_permission.__name__,
424
                    argument='resource_id',
425
                )
426
427
            if not resource_type:
428
                raise RequiredArgument(
429
                    function=self.create_permission.__name__,
430
                    argument='resource_type',
431
                )
432
433
            if not isinstance(resource_type, self.types.EntityType):
434
                raise InvalidArgumentType(
435
                    function=self.create_permission.__name__,
436
                    argument='resource_type',
437
                    arg_type=self.types.EntityType.__name__,
438
                )
439
440
            _xmlresource = cmd.add_element(
441
                "resource", attrs={"id": resource_id}
442
            )
443
444
            _actual_resource_type = resource_type
445
            if resource_type.value == EntityType.AUDIT.value:
446
                _actual_resource_type = EntityType.TASK
447
            elif resource_type.value == EntityType.POLICY.value:
448
                _actual_resource_type = EntityType.SCAN_CONFIG
449
450
            _xmlresource.add_element("type", _actual_resource_type.value)
451
452
        return self._send_xml_command(cmd)
453
454
    def create_policy(
455
        self, name: str, *, policy_id: str = None, comment: Optional[str] = None
456
    ) -> Any:
457
        """Create a new policy config
458
459
        Arguments:
460
            name: Name of the new policy
461
            policy_id: UUID of an existing policy as base. By default the empty
462
                policy is used.
463
            comment: A comment on the policy
464
465
        Returns:
466
            The response. See :py:meth:`send_command` for details.
467
        """
468
        if policy_id is None:
469
            policy_id = _EMPTY_POLICY_ID
470
        return self.__create_config(
471
            config_id=policy_id,
472
            name=name,
473
            comment=comment,
474
            usage_type=UsageType.POLICY,
475
            function=self.create_policy.__name__,
476
        )
477
478
    def create_tag(
479
        self,
480
        name: str,
481
        resource_type: EntityType,
482
        *,
483
        resource_filter: Optional[str] = None,
484
        resource_ids: Optional[List[str]] = None,
485
        value: Optional[str] = None,
486
        comment: Optional[str] = None,
487
        active: Optional[bool] = None
488
    ) -> Any:
489
        """Create a tag.
490
491
        Arguments:
492
            name: Name of the tag. A full tag name consisting of namespace and
493
                predicate e.g. `foo:bar`.
494
            resource_type: Entity type the tag is to be attached to.
495
            resource_filter: Filter term to select resources the tag is to be
496
                attached to. Only one of resource_filter or resource_ids can be
497
                provided.
498
            resource_ids: IDs of the resources the tag is to be attached to.
499
                Only one of resource_filter or resource_ids can be provided.
500
            value: Value associated with the tag.
501
            comment: Comment for the tag.
502
            active: Whether the tag should be active.
503
504
        Returns:
505
            The response. See :py:meth:`send_command` for details.
506
        """
507
        if not name:
508
            raise RequiredArgument(
509
                function=self.create_tag.__name__, argument='name'
510
            )
511
512
        if resource_filter and resource_ids:
513
            raise InvalidArgument(
514
                "create_tag accepts either resource_filter or resource_ids "
515
                "argument",
516
                function=self.create_tag.__name__,
517
            )
518
519
        if not resource_type:
520
            raise RequiredArgument(
521
                function=self.create_tag.__name__, argument='resource_type'
522
            )
523
524
        if not isinstance(resource_type, self.types.EntityType):
525
            raise InvalidArgumentType(
526
                function=self.create_tag.__name__,
527
                argument='resource_type',
528
                arg_type=EntityType.__name__,
529
            )
530
531
        cmd = XmlCommand('create_tag')
532
        cmd.add_element('name', name)
533
534
        _xmlresources = cmd.add_element("resources")
535
        if resource_filter is not None:
536
            _xmlresources.set_attribute("filter", resource_filter)
537
538
        for resource_id in resource_ids or []:
539
            _xmlresources.add_element(
540
                "resource", attrs={"id": str(resource_id)}
541
            )
542
543
        _actual_resource_type = resource_type
544
        if resource_type.value == EntityType.AUDIT.value:
545
            _actual_resource_type = EntityType.TASK
546
        elif resource_type.value == EntityType.POLICY.value:
547
            _actual_resource_type = EntityType.SCAN_CONFIG
548
        _xmlresources.add_element("type", _actual_resource_type.value)
549
550
        if comment:
551
            cmd.add_element("comment", comment)
552
553
        if value:
554
            cmd.add_element("value", value)
555
556
        if active is not None:
557
            if active:
558
                cmd.add_element("active", "1")
559
            else:
560
                cmd.add_element("active", "0")
561
562
        return self._send_xml_command(cmd)
563
564 View Code Duplication
    def create_task(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
565
        self,
566
        name: str,
567
        config_id: str,
568
        target_id: str,
569
        scanner_id: str,
570
        *,
571
        alterable: Optional[bool] = None,
572
        hosts_ordering: Optional[HostsOrdering] = None,
573
        schedule_id: Optional[str] = None,
574
        alert_ids: Optional[List[str]] = None,
575
        comment: Optional[str] = None,
576
        schedule_periods: Optional[int] = None,
577
        observers: Optional[List[str]] = None,
578
        preferences: Optional[dict] = None
579
    ) -> Any:
580
        """Create a new scan task
581
582
        Arguments:
583
            name: Name of the task
584
            config_id: UUID of scan config to use by the task
585
            target_id: UUID of target to be scanned
586
            scanner_id: UUID of scanner to use for scanning the target
587
            comment: Comment for the task
588
            alterable: Whether the task should be alterable
589
            alert_ids: List of UUIDs for alerts to be applied to the task
590
            hosts_ordering: The order hosts are scanned in
591
            schedule_id: UUID of a schedule when the task should be run.
592
            schedule_periods: A limit to the number of times the task will be
593
                scheduled, or 0 for no limit
594
            observers: List of names or ids of users which should be allowed to
595
                observe this task
596
            preferences: Name/Value pairs of scanner preferences.
597
598
        Returns:
599
            The response. See :py:meth:`send_command` for details.
600
        """
601
        return self.__create_task(
602
            name=name,
603
            config_id=config_id,
604
            target_id=target_id,
605
            scanner_id=scanner_id,
606
            usage_type=UsageType.SCAN,
607
            function=self.create_task.__name__,
608
            alterable=alterable,
609
            hosts_ordering=hosts_ordering,
610
            schedule_id=schedule_id,
611
            alert_ids=alert_ids,
612
            comment=comment,
613
            schedule_periods=schedule_periods,
614
            observers=observers,
615
            preferences=preferences,
616
        )
617
618 View Code Duplication
    def create_tls_certificate(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
619
        self,
620
        name: str,
621
        certificate: str,
622
        *,
623
        comment: Optional[str] = None,
624
        trust: Optional[bool] = None
625
    ) -> Any:
626
        """Create a new TLS certificate
627
628
        Arguments:
629
            name: Name of the TLS certificate, defaulting to the MD5
630
                fingerprint.
631
            certificate: The Base64 encoded certificate data (x.509 DER or PEM).
632
            comment: Comment for the TLS certificate.
633
            trust: Whether the certificate is trusted.
634
635
        Returns:
636
            The response. See :py:meth:`send_command` for details.
637
        """
638
        if not name:
639
            raise RequiredArgument(
640
                function=self.create_tls_certificate.__name__, argument='name'
641
            )
642
        if not certificate:
643
            raise RequiredArgument(
644
                function=self.create_tls_certificate.__name__,
645
                argument='certificate',
646
            )
647
648
        cmd = XmlCommand("create_tls_certificate")
649
650
        if comment:
651
            cmd.add_element("comment", comment)
652
653
        cmd.add_element("name", name)
654
        cmd.add_element("certificate", certificate)
655
656
        if trust:
657
            cmd.add_element("trust", _to_bool(trust))
658
659
        return self._send_xml_command(cmd)
660
661 View Code Duplication
    def get_tls_certificates(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
662
        self,
663
        *,
664
        filter: Optional[str] = None,
665
        filter_id: Optional[str] = None,
666
        include_certificate_data: Optional[bool] = None,
667
        details: Optional[bool] = None
668
    ) -> Any:
669
        """Request a list of TLS certificates
670
671
        Arguments:
672
            filter: Filter term to use for the query
673
            filter_id: UUID of an existing filter to use for the query
674
            include_certificate_data: Whether to include the certificate data in
675
                the response
676
677
        Returns:
678
            The response. See :py:meth:`send_command` for details.
679
        """
680
681
        cmd = XmlCommand("get_tls_certificates")
682
683
        _add_filter(cmd, filter, filter_id)
684
685
        if details is not None:
686
            cmd.set_attribute("details", _to_bool(details))
687
688
        if include_certificate_data is not None:
689
            cmd.set_attribute(
690
                "include_certificate_data", _to_bool(include_certificate_data)
691
            )
692
693
        return self._send_xml_command(cmd)
694
695
    def get_tls_certificate(self, tls_certificate_id: str) -> Any:
696
        """Request a single TLS certificate
697
698
        Arguments:
699
            tls_certificate_id: UUID of an existing TLS certificate
700
701
        Returns:
702
            The response. See :py:meth:`send_command` for details.
703
        """
704
        cmd = XmlCommand("get_tls_certificates")
705
706
        if not tls_certificate_id:
707
            raise RequiredArgument(
708
                function=self.get_tls_certificate.__name__,
709
                argument='tls_certificate_id',
710
            )
711
712
        cmd.set_attribute("tls_certificate_id", tls_certificate_id)
713
714
        # for single tls certificate always request cert data
715
        cmd.set_attribute("include_certificate_data", "1")
716
717
        # for single entity always request all details
718
        cmd.set_attribute("details", "1")
719
720
        return self._send_xml_command(cmd)
721
722 View Code Duplication
    def modify_alert(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
723
        self,
724
        alert_id: str,
725
        *,
726
        name: Optional[str] = None,
727
        comment: Optional[str] = None,
728
        filter_id: Optional[str] = None,
729
        event: Optional[AlertEvent] = None,
730
        event_data: Optional[dict] = None,
731
        condition: Optional[AlertCondition] = None,
732
        condition_data: Optional[dict] = None,
733
        method: Optional[AlertMethod] = None,
734
        method_data: Optional[dict] = None
735
    ) -> Any:
736
        """Modifies an existing alert.
737
738
        Arguments:
739
            alert_id: UUID of the alert to be modified.
740
            name: Name of the Alert.
741
            condition: The condition that must be satisfied for the alert to
742
                occur. If the event is either 'Updated SecInfo
743
                arrived' or 'New SecInfo arrived', condition must be 'Always'.
744
                Otherwise, condition can also be on of 'Severity at least',
745
                'Filter count changed' or 'Filter count at least'.
746
            condition_data: Data that defines the condition
747
            event: The event that must happen for the alert to occur, one of
748
                'Task run status changed', 'Updated SecInfo arrived' or
749
                'New SecInfo arrived'
750
            event_data: Data that defines the event
751
            method: The method by which the user is alerted, one of 'SCP',
752
                'Send', 'SMB', 'SNMP', 'Syslog' or 'Email';
753
                if the event is neither 'Updated SecInfo arrived' nor
754
                'New SecInfo arrived', method can also be one of 'Start Task',
755
                'HTTP Get', 'Sourcefire Connector' or 'verinice Connector'.
756
            method_data: Data that defines the method
757
            filter_id: Filter to apply when executing alert
758
            comment: Comment for the alert
759
760
        Returns:
761
            The response. See :py:meth:`send_command` for details.
762
        """
763
764
        if not alert_id:
765
            raise RequiredArgument(
766
                function=self.modify_alert.__name__, argument='alert_id'
767
            )
768
769
        cmd = XmlCommand("modify_alert")
770
        cmd.set_attribute("alert_id", str(alert_id))
771
772
        if name:
773
            cmd.add_element("name", name)
774
775
        if comment:
776
            cmd.add_element("comment", comment)
777
778
        if filter_id:
779
            cmd.add_element("filter", attrs={"id": filter_id})
780
781
        if condition:
782
            if not isinstance(condition, AlertCondition):
783
                raise InvalidArgumentType(
784
                    function=self.modify_alert.__name__,
785
                    argument='condition',
786
                    arg_type=AlertCondition.__name__,
787
                )
788
789
            conditions = cmd.add_element("condition", condition.value)
790
791
            if condition_data is not None:
792
                for key, value in condition_data.items():
793
                    _data = conditions.add_element("data", value)
794
                    _data.add_element("name", key)
795
796
        if method:
797
            if not isinstance(method, AlertMethod):
798
                raise InvalidArgumentType(
799
                    function=self.modify_alert.__name__,
800
                    argument='method',
801
                    arg_type=AlertMethod.__name__,
802
                )
803
804
            methods = cmd.add_element("method", method.value)
805
806
            if method_data is not None:
807
                for key, value in method_data.items():
808
                    _data = methods.add_element("data", value)
809
                    _data.add_element("name", key)
810
811
        if event:
812
            if not isinstance(event, AlertEvent):
813
                raise InvalidArgumentType(
814
                    function=self.modify_alert.__name__,
815
                    argument='event',
816
                    arg_type=AlertEvent.__name__,
817
                )
818
819
            _check_event(event, condition, method)
820
821
            events = cmd.add_element("event", event.value)
822
823
            if event_data is not None:
824
                for key, value in event_data.items():
825
                    _data = events.add_element("data", value)
826
                    _data.add_element("name", key)
827
828
        return self._send_xml_command(cmd)
829
830
    def modify_audit(
831
        self,
832
        audit_id: str,
833
        *,
834
        name: Optional[str] = None,
835
        policy_id: Optional[str] = None,
836
        target_id: Optional[str] = None,
837
        scanner_id: Optional[str] = None,
838
        alterable: Optional[bool] = None,
839
        hosts_ordering: Optional[HostsOrdering] = None,
840
        schedule_id: Optional[str] = None,
841
        schedule_periods: Optional[int] = None,
842
        comment: Optional[str] = None,
843
        alert_ids: Optional[List[str]] = None,
844
        observers: Optional[List[str]] = None,
845
        preferences: Optional[dict] = None
846
    ) -> Any:
847
        """Modifies an existing task.
848
849
        Arguments:
850
            audit_id: UUID of audit to modify.
851
            name: The name of the audit.
852
            policy_id: UUID of policy to use by the audit
853
            target_id: UUID of target to be scanned
854
            scanner_id: UUID of scanner to use for scanning the target
855
            comment: The comment on the audit.
856
            alert_ids: List of UUIDs for alerts to be applied to the audit
857
            hosts_ordering: The order hosts are scanned in
858
            schedule_id: UUID of a schedule when the audit should be run.
859
            schedule_periods: A limit to the number of times the audit will be
860
                scheduled, or 0 for no limit.
861
            observers: List of names or ids of users which should be allowed to
862
                observe this audit
863
            preferences: Name/Value pairs of scanner preferences.
864
865
        Returns:
866
            The response. See :py:meth:`send_command` for details.
867
        """
868
        self.modify_task(
869
            task_id=audit_id,
870
            name=name,
871
            config_id=policy_id,
872
            target_id=target_id,
873
            scanner_id=scanner_id,
874
            alterable=alterable,
875
            hosts_ordering=hosts_ordering,
876
            schedule_id=schedule_id,
877
            schedule_periods=schedule_periods,
878
            comment=comment,
879
            alert_ids=alert_ids,
880
            observers=observers,
881
            preferences=preferences,
882
        )
883
884
    def modify_permission(
885
        self,
886
        permission_id: str,
887
        *,
888
        comment: Optional[str] = None,
889
        name: Optional[str] = None,
890
        resource_id: Optional[str] = None,
891
        resource_type: Optional[EntityType] = None,
892
        subject_id: Optional[str] = None,
893
        subject_type: Optional[PermissionSubjectType] = None
894
    ) -> Any:
895
        """Modifies an existing permission.
896
897
        Arguments:
898
            permission_id: UUID of permission to be modified.
899
            comment: The comment on the permission.
900
            name: Permission name, currently the name of a command.
901
            subject_id: UUID of subject to whom the permission is granted
902
            subject_type: Type of the subject user, group or role
903
            resource_id: UUID of entity to which the permission applies
904
            resource_type: Type of the resource. For Super permissions user,
905
                group or role
906
907
        Returns:
908
            The response. See :py:meth:`send_command` for details.
909
        """
910
        if not permission_id:
911
            raise RequiredArgument(
912
                function=self.modify_permission.__name__,
913
                argument='permission_id',
914
            )
915
916
        cmd = XmlCommand("modify_permission")
917
        cmd.set_attribute("permission_id", permission_id)
918
919
        if comment:
920
            cmd.add_element("comment", comment)
921
922
        if name:
923
            cmd.add_element("name", name)
924
925 View Code Duplication
        if resource_id or resource_type:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
926
            if not resource_id:
927
                raise RequiredArgument(
928
                    function=self.modify_permission.__name__,
929
                    argument='resource_id',
930
                )
931
932
            if not resource_type:
933
                raise RequiredArgument(
934
                    function=self.modify_permission.__name__,
935
                    argument='resource_type',
936
                )
937
938
            if not isinstance(resource_type, self.types.EntityType):
939
                raise InvalidArgumentType(
940
                    function=self.modify_permission.__name__,
941
                    argument='resource_type',
942
                    arg_type=self.types.EntityType.__name__,
943
                )
944
945
            _xmlresource = cmd.add_element(
946
                "resource", attrs={"id": resource_id}
947
            )
948
            _actual_resource_type = resource_type
949
            if resource_type.value == EntityType.AUDIT.value:
950
                _actual_resource_type = EntityType.TASK
951
            elif resource_type.value == EntityType.POLICY.value:
952
                _actual_resource_type = EntityType.SCAN_CONFIG
953
            _xmlresource.add_element("type", _actual_resource_type.value)
954
955 View Code Duplication
        if subject_id or subject_type:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
956
            if not subject_id:
957
                raise RequiredArgument(
958
                    function=self.modify_permission.__name__,
959
                    argument='subject_id',
960
                )
961
962
            if not isinstance(subject_type, PermissionSubjectType):
963
                raise InvalidArgumentType(
964
                    function=self.modify_permission.__name__,
965
                    argument='subject_type',
966
                    arg_type=PermissionSubjectType.__name__,
967
                )
968
969
            _xmlsubject = cmd.add_element("subject", attrs={"id": subject_id})
970
            _xmlsubject.add_element("type", subject_type.value)
971
972
        return self._send_xml_command(cmd)
973
974
    def modify_policy_set_nvt_preference(
975
        self,
976
        policy_id: str,
977
        name: str,
978
        nvt_oid: str,
979
        *,
980
        value: Optional[str] = None
981
    ) -> Any:
982
        """Modifies the nvt preferences of an existing policy.
983
984
        Arguments:
985
            policy_id: UUID of policy to modify.
986
            name: Name for preference to change.
987
            nvt_oid: OID of the NVT associated with preference to modify
988
            value: New value for the preference. None to delete the preference
989
                and to use the default instead.
990
        """
991
        self.modify_config_set_nvt_preference(
992
            config_id=policy_id,
993
            name=name,
994
            nvt_oid=nvt_oid,
995
            value=value,
996
        )
997
998
    def modify_policy_set_name(self, policy_id: str, name: str) -> Any:
999
        """Modifies the name of an existing policy
1000
1001
        Arguments:
1002
            config_id: UUID of policy to modify.
1003
            name: New name for the config.
1004
        """
1005
        self.modify_config_set_name(
1006
            config_id=policy_id,
1007
            name=name,
1008
        )
1009
1010
    def modify_policy_set_comment(
1011
        self, policy_id: str, comment: Optional[str] = ""
1012
    ) -> Any:
1013
        """Modifies the comment of an existing policy
1014
1015
        Arguments:
1016
            policy_id: UUID of policy to modify.
1017
            comment: Comment to set on a config. Default: ''
1018
        """
1019
        self.modify_config_set_comment(
1020
            config_id=policy_id,
1021
            comment=comment,
1022
        )
1023
1024
    def modify_policy_set_scanner_preference(
1025
        self, policy_id: str, name: str, *, value: Optional[str] = None
1026
    ) -> Any:
1027
        """Modifies the scanner preferences of an existing policy
1028
1029
        Arguments:
1030
            policy_id: UUID of policy to modify.
1031
            name: Name of the scanner preference to change
1032
            value: New value for the preference. None to delete the preference
1033
                and to use the default instead.
1034
1035
        """
1036
        self.modify_config_set_scanner_preference(
1037
            config_id=policy_id,
1038
            name=name,
1039
            value=value,
1040
        )
1041
1042
    def modify_policy_set_nvt_selection(
1043
        self, policy_id: str, family: str, nvt_oids: List[str]
1044
    ) -> Any:
1045
        """Modifies the selected nvts of an existing policy
1046
1047
        The manager updates the given family in the config to include only the
1048
        given NVTs.
1049
1050
        Arguments:
1051
            policy_id: UUID of policy to modify.
1052
            family: Name of the NVT family to include NVTs from
1053
            nvt_oids: List of NVTs to select for the family.
1054
        """
1055
        self.modify_config_set_nvt_selection(
1056
            config_id=policy_id,
1057
            family=family,
1058
            nvt_oids=nvt_oids,
1059
        )
1060
1061
    def modify_policy_set_family_selection(
1062
        self,
1063
        policy_id: str,
1064
        families: List[str],
1065
        *,
1066
        auto_add_new_families: Optional[bool] = True,
1067
        auto_add_new_nvts: Optional[bool] = True
1068
    ) -> Any:
1069
        """
1070
        Selected the NVTs of a policy at a family level.
1071
1072
        Arguments:
1073
            policy_id: UUID of policy to modify.
1074
            families: List of NVT family names to select.
1075
            auto_add_new_families: Whether new families should be added to the
1076
                policy automatically. Default: True.
1077
            auto_add_new_nvts: Whether new NVTs in the selected families should
1078
                be added to the policy automatically. Default: True.
1079
        """
1080
        self.modify_config_set_family_selection(
1081
            config_id=policy_id,
1082
            families=families,
1083
            auto_add_new_families=auto_add_new_families,
1084
            auto_add_new_nvts=auto_add_new_nvts,
1085
        )
1086
1087
    def modify_tag(
1088
        self,
1089
        tag_id: str,
1090
        *,
1091
        comment: Optional[str] = None,
1092
        name: Optional[str] = None,
1093
        value=None,
1094
        active=None,
1095
        resource_action: Optional[str] = None,
1096
        resource_type: Optional[EntityType] = None,
1097
        resource_filter: Optional[str] = None,
1098
        resource_ids: Optional[List[str]] = None
1099
    ) -> Any:
1100
        """Modifies an existing tag.
1101
1102
        Arguments:
1103
            tag_id: UUID of the tag.
1104
            comment: Comment to add to the tag.
1105
            name: Name of the tag.
1106
            value: Value of the tag.
1107
            active: Whether the tag is active.
1108
            resource_action: Whether to add or remove resources instead of
1109
                overwriting. One of '', 'add', 'set' or 'remove'.
1110
            resource_type: Type of the resources to which to attach the tag.
1111
                Required if resource_filter is set.
1112
            resource_filter: Filter term to select resources the tag is to be
1113
                attached to.
1114
            resource_ids: IDs of the resources to which to attach the tag.
1115
1116
        Returns:
1117
            The response. See :py:meth:`send_command` for details.
1118
        """
1119
        if not tag_id:
1120
            raise RequiredArgument(
1121
                function=self.modify_tag.__name__, argument='tag_id'
1122
            )
1123
1124
        cmd = XmlCommand("modify_tag")
1125
        cmd.set_attribute("tag_id", str(tag_id))
1126
1127
        if comment:
1128
            cmd.add_element("comment", comment)
1129
1130
        if name:
1131
            cmd.add_element("name", name)
1132
1133
        if value:
1134
            cmd.add_element("value", value)
1135
1136
        if active is not None:
1137
            cmd.add_element("active", _to_bool(active))
1138
1139
        if resource_action or resource_filter or resource_ids or resource_type:
1140
            if resource_filter and not resource_type:
1141
                raise RequiredArgument(
1142
                    function=self.modify_tag.__name__, argument='resource_type'
1143
                )
1144
1145
            _xmlresources = cmd.add_element("resources")
1146
            if resource_action is not None:
1147
                _xmlresources.set_attribute("action", resource_action)
1148
1149
            if resource_filter is not None:
1150
                _xmlresources.set_attribute("filter", resource_filter)
1151
1152
            for resource_id in resource_ids or []:
1153
                _xmlresources.add_element(
1154
                    "resource", attrs={"id": str(resource_id)}
1155
                )
1156
1157
            if resource_type is not None:
1158
                if not isinstance(resource_type, self.types.EntityType):
1159
                    raise InvalidArgumentType(
1160
                        function=self.modify_tag.__name__,
1161
                        argument="resource_type",
1162
                        arg_type=EntityType.__name__,
1163
                    )
1164
                _actual_resource_type = resource_type
1165
                if resource_type.value == EntityType.AUDIT.value:
1166
                    _actual_resource_type = EntityType.TASK
1167
                elif resource_type.value == EntityType.POLICY.value:
1168
                    _actual_resource_type = EntityType.SCAN_CONFIG
1169
                _xmlresources.add_element("type", _actual_resource_type.value)
1170
1171
        return self._send_xml_command(cmd)
1172
1173 View Code Duplication
    def modify_tls_certificate(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1174
        self,
1175
        tls_certificate_id: str,
1176
        *,
1177
        name: Optional[str] = None,
1178
        comment: Optional[str] = None,
1179
        trust: Optional[bool] = None
1180
    ) -> Any:
1181
        """Modifies an existing TLS certificate.
1182
1183
        Arguments:
1184
            tls_certificate_id: UUID of the TLS certificate to be modified.
1185
            name: Name of the TLS certificate, defaulting to the MD5 fingerprint
1186
            comment: Comment for the TLS certificate.
1187
            trust: Whether the certificate is trusted.
1188
1189
        Returns:
1190
            The response. See :py:meth:`send_command` for details.
1191
        """
1192
        if not tls_certificate_id:
1193
            raise RequiredArgument(
1194
                function=self.modify_tls_certificate.__name__,
1195
                argument='tls_certificate_id',
1196
            )
1197
1198
        cmd = XmlCommand("modify_tls_certificate")
1199
        cmd.set_attribute("tls_certificate_id", str(tls_certificate_id))
1200
1201
        if comment:
1202
            cmd.add_element("comment", comment)
1203
1204
        if name:
1205
            cmd.add_element("name", name)
1206
1207
        if trust:
1208
            cmd.add_element("trust", _to_bool(trust))
1209
1210
        return self._send_xml_command(cmd)
1211
1212
    def clone_tls_certificate(self, tls_certificate_id: str) -> Any:
1213
        """Modifies an existing TLS certificate.
1214
1215
        Arguments:
1216
            tls_certificate_id: The UUID of an existing TLS certificate
1217
1218
        Returns:
1219
            The response. See :py:meth:`send_command` for details.
1220
        """
1221
        if not tls_certificate_id:
1222
            raise RequiredArgument(
1223
                function=self.clone_tls_certificate.__name__,
1224
                argument='tls_certificate_id',
1225
            )
1226
1227
        cmd = XmlCommand("create_tls_certificate")
1228
1229
        cmd.add_element("copy", tls_certificate_id)
1230
1231
        return self._send_xml_command(cmd)
1232
1233
    def get_configs(
1234
        self,
1235
        *,
1236
        filter: Optional[str] = None,
1237
        filter_id: Optional[str] = None,
1238
        trash: Optional[bool] = None,
1239
        details: Optional[bool] = None,
1240
        families: Optional[bool] = None,
1241
        preferences: Optional[bool] = None,
1242
        tasks: Optional[bool] = None
1243
    ) -> Any:
1244
        """Request a list of scan configs
1245
1246
        Arguments:
1247
            filter: Filter term to use for the query
1248
            filter_id: UUID of an existing filter to use for the query
1249
            trash: Whether to get the trashcan scan configs instead
1250
            details: Whether to get config families, preferences, nvt selectors
1251
                and tasks.
1252
            families: Whether to include the families if no details are
1253
                requested
1254
            preferences: Whether to include the preferences if no details are
1255
                requested
1256
            tasks: Whether to get tasks using this config
1257
1258
        Returns:
1259
            The response. See :py:meth:`send_command` for details.
1260
        """
1261
        return self.__get_configs(
1262
            UsageType.SCAN,
1263
            filter=filter,
1264
            filter_id=filter_id,
1265
            trash=trash,
1266
            details=details,
1267
            families=families,
1268
            preferences=preferences,
1269
            tasks=tasks,
1270
        )
1271
1272
    def get_policies(
1273
        self,
1274
        *,
1275
        audits: Optional[bool] = None,
1276
        filter: Optional[str] = None,
1277
        filter_id: Optional[str] = None,
1278
        details: Optional[bool] = None,
1279
        families: Optional[bool] = None,
1280
        preferences: Optional[bool] = None,
1281
        trash: Optional[bool] = None
1282
    ) -> Any:
1283
        """Request a list of policies
1284
1285
        Arguments:
1286
            audits: Whether to get audits using the policy
1287
            filter: Filter term to use for the query
1288
            filter_id: UUID of an existing filter to use for the query
1289
            details: Whether to get  families, preferences, nvt selectors
1290
                and tasks.
1291
            families: Whether to include the families if no details are
1292
                requested
1293
            preferences: Whether to include the preferences if no details are
1294
                requested
1295
            trash: Whether to get the trashcan audits instead
1296
1297
        Returns:
1298
            The response. See :py:meth:`send_command` for details.
1299
        """
1300
        return self.__get_configs(
1301
            UsageType.POLICY,
1302
            filter=filter,
1303
            filter_id=filter_id,
1304
            details=details,
1305
            families=families,
1306
            preferences=preferences,
1307
            tasks=audits,
1308
            trash=trash,
1309
        )
1310
1311
    def get_config(
1312
        self, config_id: str, *, tasks: Optional[bool] = None
1313
    ) -> Any:
1314
        """Request a single scan config
1315
1316
        Arguments:
1317
            config_id: UUID of an existing scan config
1318
            tasks: Whether to get tasks using this config
1319
1320
        Returns:
1321
            The response. See :py:meth:`send_command` for details.
1322
        """
1323
        return self.__get_config(
1324
            config_id=config_id, usage_type=UsageType.SCAN, tasks=tasks
1325
        )
1326
1327
    def get_policy(
1328
        self, policy_id: str, *, audits: Optional[bool] = None
1329
    ) -> Any:
1330
        """Request a single policy
1331
1332
        Arguments:
1333
            policy_id: UUID of an existing policy
1334
            audits: Whether to get audits using this config
1335
1336
        Returns:
1337
            The response. See :py:meth:`send_command` for details.
1338
        """
1339
        return self.__get_config(policy_id, UsageType.POLICY, tasks=audits)
1340
1341
    def get_tasks(
1342
        self,
1343
        *,
1344
        filter: Optional[str] = None,
1345
        filter_id: Optional[str] = None,
1346
        trash: Optional[bool] = None,
1347
        details: Optional[bool] = None,
1348
        schedules_only: Optional[bool] = None
1349
    ) -> Any:
1350
        """Request a list of tasks
1351
1352
        Arguments:
1353
            filter: Filter term to use for the query
1354
            filter_id: UUID of an existing filter to use for the query
1355
            trash: Whether to get the trashcan tasks instead
1356
            details: Whether to include full task details
1357
            schedules_only: Whether to only include id, name and schedule
1358
                details
1359
1360
        Returns:
1361
            The response. See :py:meth:`send_command` for details.
1362
        """
1363
        return self.__get_tasks(
1364
            UsageType.SCAN,
1365
            filter=filter,
1366
            filter_id=filter_id,
1367
            trash=trash,
1368
            details=details,
1369
            schedules_only=schedules_only,
1370
        )
1371
1372
    def get_audits(
1373
        self,
1374
        *,
1375
        filter: Optional[str] = None,
1376
        filter_id: Optional[str] = None,
1377
        trash: Optional[bool] = None,
1378
        details: Optional[bool] = None,
1379
        schedules_only: Optional[bool] = None
1380
    ) -> Any:
1381
        """Request a list of audits
1382
1383
        Arguments:
1384
            filter: Filter term to use for the query
1385
            filter_id: UUID of an existing filter to use for the query
1386
            trash: Whether to get the trashcan audits instead
1387
            details: Whether to include full audit details
1388
            schedules_only: Whether to only include id, name and schedule
1389
                details
1390
1391
        Returns:
1392
            The response. See :py:meth:`send_command` for details.
1393
        """
1394
        return self.__get_tasks(
1395
            UsageType.AUDIT,
1396
            filter=filter,
1397
            filter_id=filter_id,
1398
            trash=trash,
1399
            details=details,
1400
            schedules_only=schedules_only,
1401
        )
1402
1403
    def get_task(self, task_id: str) -> Any:
1404
        """Request a single task
1405
1406
        Arguments:
1407
            task_id: UUID of an existing task
1408
1409
        Returns:
1410
            The response. See :py:meth:`send_command` for details.
1411
        """
1412
        return self.__get_task(task_id, UsageType.SCAN)
1413
1414
    def get_audit(self, audit_id: str) -> Any:
1415
        """Request a single audit
1416
1417
        Arguments:
1418
            audit_id: UUID of an existing audit
1419
1420
        Returns:
1421
            The response. See :py:meth:`send_command` for details.
1422
        """
1423
        return self.__get_task(audit_id, UsageType.AUDIT)
1424
1425
    def clone_audit(self, audit_id: str) -> Any:
1426
        """Clone an existing audit
1427
1428
        Arguments:
1429
            audit_id: UUID of existing audit to clone from
1430
1431
        Returns:
1432
            The response. See :py:meth:`send_command` for details.
1433
        """
1434
        if not audit_id:
1435
            raise RequiredArgument(
1436
                function=self.clone_audit.__name__, argument='audit_id'
1437
            )
1438
1439
        cmd = XmlCommand("create_task")
1440
        cmd.add_element("copy", audit_id)
1441
        return self._send_xml_command(cmd)
1442
1443
    def clone_policy(self, policy_id: str) -> Any:
1444
        """Clone a policy from an existing one
1445
1446
        Arguments:
1447
            policy_id: UUID of the existing policy
1448
1449
        Returns:
1450
            The response. See :py:meth:`send_command` for details.
1451
        """
1452
        if not policy_id:
1453
            raise RequiredArgument(
1454
                function=self.clone_policy.__name__, argument='policy_id'
1455
            )
1456
1457
        cmd = XmlCommand("create_config")
1458
        cmd.add_element("copy", policy_id)
1459
        return self._send_xml_command(cmd)
1460
1461
    def delete_audit(
1462
        self, audit_id: str, *, ultimate: Optional[bool] = False
1463
    ) -> Any:
1464
        """Deletes an existing audit
1465
1466
        Arguments:
1467
            audit_id: UUID of the audit to be deleted.
1468
            ultimate: Whether to remove entirely, or to the trashcan.
1469
        """
1470
        if not audit_id:
1471
            raise RequiredArgument(
1472
                function=self.delete_audit.__name__, argument='audit_id'
1473
            )
1474
1475
        cmd = XmlCommand("delete_task")
1476
        cmd.set_attribute("task_id", audit_id)
1477
        cmd.set_attribute("ultimate", _to_bool(ultimate))
1478
1479
        return self._send_xml_command(cmd)
1480
1481
    def delete_policy(
1482
        self, policy_id: str, *, ultimate: Optional[bool] = False
1483
    ) -> Any:
1484
        """Deletes an existing policy
1485
1486
        Arguments:
1487
            policy_id: UUID of the policy to be deleted.
1488
            ultimate: Whether to remove entirely, or to the trashcan.
1489
        """
1490
        if not policy_id:
1491
            raise RequiredArgument(
1492
                function=self.delete_policy.__name__, argument='policy_id'
1493
            )
1494
1495
        cmd = XmlCommand("delete_config")
1496
        cmd.set_attribute("config_id", policy_id)
1497
        cmd.set_attribute("ultimate", _to_bool(ultimate))
1498
1499
        return self._send_xml_command(cmd)
1500
1501
    def delete_tls_certificate(self, tls_certificate_id: str) -> Any:
1502
        """Deletes an existing tls certificate
1503
1504
        Arguments:
1505
            tls_certificate_id: UUID of the tls certificate to be deleted.
1506
        """
1507
        if not tls_certificate_id:
1508
            raise RequiredArgument(
1509
                function=self.delete_tls_certificate.__name__,
1510
                argument='tls_certificate_id',
1511
            )
1512
1513
        cmd = XmlCommand("delete_tls_certificate")
1514
        cmd.set_attribute("tls_certificate_id", tls_certificate_id)
1515
1516
        return self._send_xml_command(cmd)
1517
1518 View Code Duplication
    def __create_task(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1519
        self,
1520
        name: str,
1521
        config_id: str,
1522
        target_id: str,
1523
        scanner_id: str,
1524
        usage_type: UsageType,
1525
        function: str,
1526
        *,
1527
        alterable: Optional[bool] = None,
1528
        hosts_ordering: Optional[HostsOrdering] = None,
1529
        schedule_id: Optional[str] = None,
1530
        alert_ids: Optional[List[str]] = None,
1531
        comment: Optional[str] = None,
1532
        schedule_periods: Optional[int] = None,
1533
        observers: Optional[List[str]] = None,
1534
        preferences: Optional[dict] = None
1535
    ) -> Any:
1536
        if not name:
1537
            raise RequiredArgument(function=function, argument='name')
1538
1539
        if not config_id:
1540
            raise RequiredArgument(function=function, argument='config_id')
1541
1542
        if not target_id:
1543
            raise RequiredArgument(function=function, argument='target_id')
1544
1545
        if not scanner_id:
1546
            raise RequiredArgument(function=function, argument='scanner_id')
1547
1548
        # don't allow to create a container task with create_task
1549
        if target_id == '0':
1550
            raise InvalidArgument(function=function, argument='target_id')
1551
1552
        cmd = XmlCommand("create_task")
1553
        cmd.add_element("name", name)
1554
        cmd.add_element("usage_type", usage_type.value)
1555
        cmd.add_element("config", attrs={"id": config_id})
1556
        cmd.add_element("target", attrs={"id": target_id})
1557
        cmd.add_element("scanner", attrs={"id": scanner_id})
1558
1559
        if comment:
1560
            cmd.add_element("comment", comment)
1561
1562
        if alterable is not None:
1563
            cmd.add_element("alterable", _to_bool(alterable))
1564
1565
        if hosts_ordering:
1566
            if not isinstance(hosts_ordering, self.types.HostsOrdering):
1567
                raise InvalidArgumentType(
1568
                    function=function,
1569
                    argument='hosts_ordering',
1570
                    arg_type=HostsOrdering.__name__,
1571
                )
1572
            cmd.add_element("hosts_ordering", hosts_ordering.value)
1573
1574
        if alert_ids:
1575
            if isinstance(alert_ids, str):
1576
                deprecation(
1577
                    "Please pass a list as alert_ids parameter to {}. "
1578
                    "Passing a string is deprecated and will be removed in "
1579
                    "future.".format(function)
1580
                )
1581
1582
                # if a single id is given as a string wrap it into a list
1583
                alert_ids = [alert_ids]
1584
            if _is_list_like(alert_ids):
1585
                # parse all given alert id's
1586
                for alert in alert_ids:
1587
                    cmd.add_element("alert", attrs={"id": str(alert)})
1588
1589
        if schedule_id:
1590
            cmd.add_element("schedule", attrs={"id": schedule_id})
1591
1592
            if schedule_periods is not None:
1593
                if (
1594
                    not isinstance(schedule_periods, numbers.Integral)
1595
                    or schedule_periods < 0
1596
                ):
1597
                    raise InvalidArgument(
1598
                        "schedule_periods must be an integer greater or equal "
1599
                        "than 0"
1600
                    )
1601
                cmd.add_element("schedule_periods", str(schedule_periods))
1602
1603
        if observers is not None:
1604
            if not _is_list_like(observers):
1605
                raise InvalidArgumentType(
1606
                    function=function, argument='observers', arg_type='list'
1607
                )
1608
1609
            # gvmd splits by comma and space
1610
            # gvmd tries to lookup each value as user name and afterwards as
1611
            # user id. So both user name and user id are possible
1612
            cmd.add_element("observers", _to_comma_list(observers))
1613
1614
        if preferences is not None:
1615
            if not isinstance(preferences, collections.abc.Mapping):
1616
                raise InvalidArgumentType(
1617
                    function=function,
1618
                    argument='preferences',
1619
                    arg_type=collections.abc.Mapping.__name__,
1620
                )
1621
1622
            _xmlprefs = cmd.add_element("preferences")
1623
            for pref_name, pref_value in preferences.items():
1624
                _xmlpref = _xmlprefs.add_element("preference")
1625
                _xmlpref.add_element("scanner_name", pref_name)
1626
                _xmlpref.add_element("value", str(pref_value))
1627
1628
        return self._send_xml_command(cmd)
1629
1630
    def __create_config(
1631
        self,
1632
        config_id: str,
1633
        name: str,
1634
        usage_type: UsageType,
1635
        function: str,
1636
        *,
1637
        comment: Optional[str] = None
1638
    ) -> Any:
1639
        if not name:
1640
            raise RequiredArgument(function=function, argument='name')
1641
1642
        if not config_id:
1643
            raise RequiredArgument(function=function, argument='config_id')
1644
1645
        cmd = XmlCommand("create_config")
1646
        if comment is not None:
1647
            cmd.add_element("comment", comment)
1648
        cmd.add_element("copy", config_id)
1649
        cmd.add_element("name", name)
1650
        cmd.add_element("usage_type", usage_type.value)
1651
        return self._send_xml_command(cmd)
1652
1653
    def __create_config_from_osp_scanner(
1654
        self,
1655
        scanner_id: str,
1656
        name: str,
1657
        usage_type: UsageType,
1658
        function: str,
1659
        *,
1660
        comment: Optional[str] = None
1661
    ) -> Any:
1662
        if not name:
1663
            raise RequiredArgument(function=function, argument='name')
1664
1665
        if not scanner_id:
1666
            raise RequiredArgument(function=function, argument='scanner_id')
1667
1668
        cmd = XmlCommand("create_config")
1669
        if comment is not None:
1670
            cmd.add_element("comment", comment)
1671
        cmd.add_element("scanner", scanner_id)
1672
        cmd.add_element("name", name)
1673
        cmd.add_element("usage_type", usage_type.value)
1674
        return self._send_xml_command(cmd)
1675
1676 View Code Duplication
    def __get_configs(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1677
        self,
1678
        usage_type: UsageType,
1679
        *,
1680
        filter: Optional[str] = None,
1681
        filter_id: Optional[str] = None,
1682
        trash: Optional[bool] = None,
1683
        details: Optional[bool] = None,
1684
        families: Optional[bool] = None,
1685
        preferences: Optional[bool] = None,
1686
        tasks: Optional[bool] = None
1687
    ) -> Any:
1688
        cmd = XmlCommand("get_configs")
1689
        cmd.set_attribute("usage_type", usage_type.value)
1690
1691
        _add_filter(cmd, filter, filter_id)
1692
1693
        if trash is not None:
1694
            cmd.set_attribute("trash", _to_bool(trash))
1695
1696
        if details is not None:
1697
            cmd.set_attribute("details", _to_bool(details))
1698
1699
        if families is not None:
1700
            cmd.set_attribute("families", _to_bool(families))
1701
1702
        if preferences is not None:
1703
            cmd.set_attribute("preferences", _to_bool(preferences))
1704
1705
        if tasks is not None:
1706
            cmd.set_attribute("tasks", _to_bool(tasks))
1707
1708
        return self._send_xml_command(cmd)
1709
1710
    def __get_config(
1711
        self,
1712
        config_id: str,
1713
        usage_type: UsageType,
1714
        *,
1715
        tasks: Optional[bool] = None
1716
    ) -> Any:
1717
        if not config_id:
1718
            raise RequiredArgument(
1719
                function=self.get_config.__name__, argument='config_id'
1720
            )
1721
1722
        cmd = XmlCommand("get_configs")
1723
        cmd.set_attribute("config_id", config_id)
1724
1725
        cmd.set_attribute("usage_type", usage_type.value)
1726
1727
        if tasks is not None:
1728
            cmd.set_attribute("tasks", _to_bool(tasks))
1729
1730
        # for single entity always request all details
1731
        cmd.set_attribute("details", "1")
1732
1733
        return self._send_xml_command(cmd)
1734
1735
    def __get_tasks(
1736
        self,
1737
        usage_type: UsageType,
1738
        *,
1739
        filter: Optional[str] = None,
1740
        filter_id: Optional[str] = None,
1741
        trash: Optional[bool] = None,
1742
        details: Optional[bool] = None,
1743
        schedules_only: Optional[bool] = None
1744
    ) -> Any:
1745
        cmd = XmlCommand("get_tasks")
1746
        cmd.set_attribute("usage_type", usage_type.value)
1747
1748
        _add_filter(cmd, filter, filter_id)
1749
1750
        if trash is not None:
1751
            cmd.set_attribute("trash", _to_bool(trash))
1752
1753
        if details is not None:
1754
            cmd.set_attribute("details", _to_bool(details))
1755
1756
        if schedules_only is not None:
1757
            cmd.set_attribute("schedules_only", _to_bool(schedules_only))
1758
1759
        return self._send_xml_command(cmd)
1760
1761
    def __get_task(self, task_id: str, usage_type: UsageType) -> Any:
1762
        if not task_id:
1763
            raise RequiredArgument(
1764
                function=self.get_task.__name__, argument='task_id'
1765
            )
1766
1767
        cmd = XmlCommand("get_tasks")
1768
        cmd.set_attribute("task_id", task_id)
1769
        cmd.set_attribute("usage_type", usage_type.value)
1770
1771
        # for single entity always request all details
1772
        cmd.set_attribute("details", "1")
1773
        return self._send_xml_command(cmd)
1774
1775
    def resume_audit(self, audit_id: str) -> Any:
1776
        """Resume an existing stopped audit
1777
1778
        Arguments:
1779
            audit_id: UUID of the audit to be resumed
1780
1781
        Returns:
1782
            The response. See :py:meth:`send_command` for details.
1783
        """
1784
        if not audit_id:
1785
            raise RequiredArgument(
1786
                function=self.resume_audit.__name__, argument='audit_id'
1787
            )
1788
1789
        cmd = XmlCommand("resume_task")
1790
        cmd.set_attribute("task_id", audit_id)
1791
1792
        return self._send_xml_command(cmd)
1793
1794
    def start_audit(self, audit_id: str) -> Any:
1795
        """Start an existing audit
1796
1797
        Arguments:
1798
            audit_id: UUID of the audit to be started
1799
1800
        Returns:
1801
            The response. See :py:meth:`send_command` for details.
1802
        """
1803
        if not audit_id:
1804
            raise RequiredArgument(
1805
                function=self.start_audit.__name__, argument='audit_id'
1806
            )
1807
1808
        cmd = XmlCommand("start_task")
1809
        cmd.set_attribute("task_id", audit_id)
1810
1811
        return self._send_xml_command(cmd)
1812
1813
    def stop_audit(self, audit_id: str) -> Any:
1814
        """Stop an existing running audit
1815
1816
        Arguments:
1817
            audit_id: UUID of the audit to be stopped
1818
1819
        Returns:
1820
            The response. See :py:meth:`send_command` for details.
1821
        """
1822
        if not audit_id:
1823
            raise RequiredArgument(
1824
                function=self.stop_audit.__name__, argument='audit_id'
1825
            )
1826
1827
        cmd = XmlCommand("stop_task")
1828
        cmd.set_attribute("task_id", audit_id)
1829
1830
        return self._send_xml_command(cmd)
1831