Passed
Pull Request — master (#359)
by
unknown
01:15
created

GmpV9Mixin.modify_audit()   A

Complexity

Conditions 1

Size

Total Lines 52
Code Lines 30

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 30
nop 15
dl 0
loc 52
rs 9.16
c 0
b 0
f 0

How to fix   Long Method    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 - 2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
# pylint: disable=arguments-differ, redefined-builtin, too-many-lines
20
21
"""
22
Module for communication with gvmd in `Greenbone Management Protocol version 9`_
23
24
.. _Greenbone Management Protocol version 9:
25
    https://docs.greenbone.net/API/GMP/gmp-9.0.html
26
"""
27
import collections
28
import numbers
29
30
from typing import Any, List, Optional, Callable, Tuple
31
32
from gvm.errors import InvalidArgument, InvalidArgumentType, RequiredArgument
33
from gvm.utils import deprecation
34
from gvm.xml import XmlCommand
35
36
from gvm.protocols.base import GvmProtocol
37
from gvm.connections import GvmConnection
38
from gvm.protocols.gmpv7.gmpv7 import (
39
    _to_bool,
40
    _add_filter,
41
    _is_list_like,
42
    _to_comma_list,
43
)
44
45
from . import types
46
from .types import *  # pylint: disable=unused-wildcard-import, wildcard-import
47
from .types import _UsageType as UsageType
48
49
_EMPTY_POLICY_ID = '085569ce-73ed-11df-83c3-002264764cea'
50
51
52
def _check_event(
    event: AlertEvent, condition: AlertCondition, method: AlertMethod
):
    """Validate that condition and method are allowed for an alert event

    Nothing is checked if event is None. For every known event both a
    condition and a method are required and the condition must be one of
    the conditions accepted for that event. For the SecInfo and ticket
    events the method is restricted to a fixed set as well.

    Arguments:
        event: The alert event to validate against, or None
        condition: The alert condition to be used with the event
        method: The alert method to be used with the event

    Raises:
        RequiredArgument: If condition or method is missing for a known
            event
        InvalidArgument: If the event is unknown or if the condition or
            method is not allowed for the event
    """
    if event is None:
        return

    if event == AlertEvent.TASK_RUN_STATUS_CHANGED:
        allowed_conditions = (
            AlertCondition.ALWAYS,
            AlertCondition.FILTER_COUNT_CHANGED,
            AlertCondition.FILTER_COUNT_AT_LEAST,
            AlertCondition.SEVERITY_AT_LEAST,
            AlertCondition.SEVERITY_CHANGED,
        )
        # the method is not restricted for this event
        allowed_methods = None
    elif event in (
        AlertEvent.NEW_SECINFO_ARRIVED,
        AlertEvent.UPDATED_SECINFO_ARRIVED,
    ):
        allowed_conditions = (AlertCondition.ALWAYS,)
        allowed_methods = (
            AlertMethod.SCP,
            AlertMethod.SEND,
            AlertMethod.SMB,
            AlertMethod.SNMP,
            AlertMethod.SYSLOG,
            AlertMethod.EMAIL,
        )
    elif event in (
        AlertEvent.TICKET_RECEIVED,
        AlertEvent.OWNED_TICKET_CHANGED,
        AlertEvent.ASSIGNED_TICKET_CHANGED,
    ):
        allowed_conditions = (AlertCondition.ALWAYS,)
        allowed_methods = (
            AlertMethod.EMAIL,
            AlertMethod.START_TASK,
            AlertMethod.SYSLOG,
        )
    else:
        raise InvalidArgument('Invalid event "{}"'.format(event.name))

    # The checks below are shared by all known events and run in the same
    # order for each of them: required arguments first, then allowed values.
    if not condition:
        raise RequiredArgument(
            "condition is required for event {}".format(event.name)
        )

    if not method:
        raise RequiredArgument(
            "method is required for event {}".format(event.name)
        )

    if condition not in allowed_conditions:
        raise InvalidArgument(
            "Invalid condition {} for event {}".format(
                condition.name, event.name
            )
        )

    if allowed_methods is not None and method not in allowed_methods:
        raise InvalidArgument(
            "Invalid method {} for event {}".format(method.name, event.name)
        )
139
140
141
class GmpV9Mixin(GvmProtocol):
142
143
    types = types
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable types does not seem to be defined.
Loading history...
144
145
    def __init__(
        self,
        connection: GvmConnection,
        *,
        transform: Optional[Callable[[str], Any]] = None
    ):
        """Create a new protocol instance

        Arguments:
            connection: Connection handed over to the base class
            transform: Optional callable handed over to the base class as
                its transform argument
        """
        super().__init__(connection, transform=transform)

        # A fresh instance is not authenticated on gvmd yet
        self._authenticated = False
155
156 View Code Duplication
    def create_alert(
        self,
        name: str,
        condition: AlertCondition,
        event: AlertEvent,
        method: AlertMethod,
        *,
        method_data: Optional[dict] = None,
        event_data: Optional[dict] = None,
        condition_data: Optional[dict] = None,
        filter_id: Optional[int] = None,
        comment: Optional[str] = None
    ) -> Any:
        """Create a new alert

        Arguments:
            name: Name of the new Alert
            condition: The condition that must be satisfied for the alert
                to occur; if the event is either 'Updated SecInfo arrived',
                'New SecInfo arrived' or one of the ticket events, condition
                must be 'Always'. Otherwise, condition can also be one of
                'Severity at least', 'Severity changed', 'Filter count
                changed' or 'Filter count at least'.
            event: The event that must happen for the alert to occur, one
                of 'Task run status changed', 'Updated SecInfo arrived',
                'New SecInfo arrived' or one of the ticket events
            method: The method by which the user is alerted. The methods
                accepted for an event are validated together with the
                condition.
            condition_data: Data that defines the condition
            event_data: Data that defines the event
            method_data: Data that defines the method
            filter_id: Filter to apply when executing alert
            comment: Comment for the alert

        Raises:
            RequiredArgument: If name, condition, event or method is missing
            InvalidArgumentType: If condition, event or method is not an
                instance of its enum type
            InvalidArgument: If the condition or method is not allowed for
                the given event

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not name:
            raise RequiredArgument(
                function=self.create_alert.__name__, argument='name'
            )

        if not condition:
            raise RequiredArgument(
                function=self.create_alert.__name__, argument='condition'
            )

        if not event:
            raise RequiredArgument(
                function=self.create_alert.__name__, argument='event'
            )

        if not method:
            raise RequiredArgument(
                function=self.create_alert.__name__, argument='method'
            )

        if not isinstance(condition, AlertCondition):
            raise InvalidArgumentType(
                function=self.create_alert.__name__,
                argument='condition',
                arg_type=AlertCondition.__name__,
            )

        if not isinstance(event, AlertEvent):
            # the argument name was misspelled as 'even' before
            raise InvalidArgumentType(
                function=self.create_alert.__name__,
                argument='event',
                arg_type=AlertEvent.__name__,
            )

        if not isinstance(method, AlertMethod):
            raise InvalidArgumentType(
                function=self.create_alert.__name__,
                argument='method',
                arg_type=AlertMethod.__name__,
            )

        _check_event(event, condition, method)

        cmd = XmlCommand("create_alert")
        cmd.add_element("name", name)

        # condition, event and method share the same XML layout: an element
        # holding the enum value with one <data> child per key/value pair.
        # The element order (condition, event, method) is kept.
        for element_name, setting, data in (
            ("condition", condition, condition_data),
            ("event", event, event_data),
            ("method", method, method_data),
        ):
            _xmlsetting = cmd.add_element(element_name, setting.value)

            if data is not None:
                for key, value in data.items():
                    _xmldata = _xmlsetting.add_element("data", value)
                    _xmldata.add_element("name", key)

        if filter_id:
            cmd.add_element("filter", attrs={"id": filter_id})

        if comment:
            cmd.add_element("comment", comment)

        return self._send_xml_command(cmd)
269
270 View Code Duplication
    def create_audit(
        self,
        name: str,
        policy_id: str,
        target_id: str,
        scanner_id: str,
        *,
        alterable: Optional[bool] = None,
        hosts_ordering: Optional[HostsOrdering] = None,
        schedule_id: Optional[str] = None,
        alert_ids: Optional[List[str]] = None,
        comment: Optional[str] = None,
        schedule_periods: Optional[int] = None,
        observers: Optional[List[str]] = None,
        preferences: Optional[dict] = None
    ) -> Any:
        """Create a new audit task

        The audit is created as a task with the audit usage type and the
        given policy as its config.

        Arguments:
            name: Name of the new audit
            policy_id: UUID of policy to use by the audit
            target_id: UUID of target to be scanned
            scanner_id: UUID of scanner to use for scanning the target
            comment: Comment for the audit
            alterable: Whether the task should be alterable
            alert_ids: List of UUIDs for alerts to be applied to the audit
            hosts_ordering: The order hosts are scanned in
            schedule_id: UUID of a schedule when the audit should be run.
            schedule_periods: A limit to the number of times the audit will be
                scheduled, or 0 for no limit
            observers: List of names or ids of users which should be allowed to
                observe this audit
            preferences: Name/Value pairs of scanner preferences.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        # optional settings are passed through unchanged
        optional_settings = dict(
            alterable=alterable,
            hosts_ordering=hosts_ordering,
            schedule_id=schedule_id,
            alert_ids=alert_ids,
            comment=comment,
            schedule_periods=schedule_periods,
            observers=observers,
            preferences=preferences,
        )

        return self.__create_task(
            name=name,
            config_id=policy_id,
            target_id=target_id,
            scanner_id=scanner_id,
            usage_type=UsageType.AUDIT,
            function=self.create_audit.__name__,
            **optional_settings
        )
324
325
    def create_config(
        self, config_id: str, name: str, *, comment: Optional[str] = None
    ) -> Any:
        """Create a new scan config

        The config is created with the scan usage type.

        Arguments:
            config_id: UUID of the existing scan config
            name: Name of the new scan config
            comment: A comment on the config

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        caller_name = self.create_config.__name__

        return self.__create_config(
            config_id=config_id,
            name=name,
            comment=comment,
            usage_type=UsageType.SCAN,
            function=caller_name,
        )
345
346
    def create_config_from_osp_scanner(
        self, scanner_id: str, name: str, *, comment: Optional[str] = None
    ) -> Any:
        """Create a new scan config from an ospd scanner.

        Create config by retrieving the expected preferences from the given
        scanner via OSP.

        Arguments:
            scanner_id: UUID of an OSP scanner to get config data from
            name: Name of the new scan config
            comment: A comment on the config

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        # Pass the name of this method, not of create_config, so the
        # function name handed to the helper identifies the real caller.
        return self.__create_config_from_osp_scanner(
            scanner_id=scanner_id,
            name=name,
            comment=comment,
            usage_type=UsageType.SCAN,
            function=self.create_config_from_osp_scanner.__name__,
        )
369
370
    def create_permission(
        self,
        name: str,
        subject_id: str,
        subject_type: PermissionSubjectType,
        *,
        resource_id: Optional[str] = None,
        resource_type: Optional[EntityType] = None,
        comment: Optional[str] = None
    ) -> Any:
        """Create a new permission

        If one of resource_id and resource_type is given, the other one is
        required too. The resource types audit and policy are sent as task
        and scan config.

        Arguments:
            name: Name of the new permission
            subject_id: UUID of subject to whom the permission is granted
            subject_type: Type of the subject user, group or role
            comment: Comment for the permission
            resource_id: UUID of entity to which the permission applies
            resource_type: Type of the resource. For Super permissions user,
                group or role

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        function_name = self.create_permission.__name__

        if not name:
            raise RequiredArgument(function=function_name, argument='name')

        if not subject_id:
            raise RequiredArgument(
                function=function_name, argument='subject_id'
            )

        if not isinstance(subject_type, PermissionSubjectType):
            raise InvalidArgumentType(
                function=function_name,
                argument='subject_type',
                arg_type=PermissionSubjectType.__name__,
            )

        cmd = XmlCommand("create_permission")
        cmd.add_element("name", name)

        subject_xml = cmd.add_element("subject", attrs={"id": subject_id})
        subject_xml.add_element("type", subject_type.value)

        if comment:
            cmd.add_element("comment", comment)

        # without any resource information the command is already complete
        if not (resource_id or resource_type):
            return self._send_xml_command(cmd)

        if not resource_id:
            raise RequiredArgument(
                function=function_name, argument='resource_id'
            )

        if not resource_type:
            raise RequiredArgument(
                function=function_name, argument='resource_type'
            )

        if not isinstance(resource_type, self.types.EntityType):
            raise InvalidArgumentType(
                function=function_name,
                argument='resource_type',
                arg_type=self.types.EntityType.__name__,
            )

        resource_xml = cmd.add_element("resource", attrs={"id": resource_id})

        # audits and policies are sent as their underlying entity types
        sent_type = resource_type
        for alias, actual in (
            (EntityType.AUDIT, EntityType.TASK),
            (EntityType.POLICY, EntityType.SCAN_CONFIG),
        ):
            if resource_type.value == alias.value:
                sent_type = actual
                break

        resource_xml.add_element("type", sent_type.value)

        return self._send_xml_command(cmd)
453
454
    def create_policy(
        self, name: str, *, policy_id: str = None, comment: Optional[str] = None
    ) -> Any:
        """Create a new policy config

        The policy is created as a config with the policy usage type.

        Arguments:
            name: Name of the new policy
            policy_id: UUID of an existing policy as base. By default the empty
                policy is used.
            comment: A comment on the policy

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        # fall back to the empty policy only if no base policy was passed
        base_policy_id = _EMPTY_POLICY_ID if policy_id is None else policy_id

        return self.__create_config(
            config_id=base_policy_id,
            name=name,
            comment=comment,
            usage_type=UsageType.POLICY,
            function=self.create_policy.__name__,
        )
477
478
    def create_tag(
        self,
        name: str,
        resource_type: EntityType,
        *,
        resource_filter: Optional[str] = None,
        resource_ids: Optional[List[str]] = None,
        value: Optional[str] = None,
        comment: Optional[str] = None,
        active: Optional[bool] = None
    ) -> Any:
        """Create a tag.

        The resource types audit and policy are sent as task and scan
        config.

        Arguments:
            name: Name of the tag. A full tag name consisting of namespace and
                predicate e.g. `foo:bar`.
            resource_type: Entity type the tag is to be attached to.
            resource_filter: Filter term to select resources the tag is to be
                attached to. Only one of resource_filter or resource_ids can be
                provided.
            resource_ids: IDs of the resources the tag is to be attached to.
                Only one of resource_filter or resource_ids can be provided.
            value: Value associated with the tag.
            comment: Comment for the tag.
            active: Whether the tag should be active.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        function_name = self.create_tag.__name__

        if not name:
            raise RequiredArgument(function=function_name, argument='name')

        if resource_filter and resource_ids:
            raise InvalidArgument(
                "create_tag accepts either resource_filter or resource_ids "
                "argument",
                function=function_name,
            )

        if not resource_type:
            raise RequiredArgument(
                function=function_name, argument='resource_type'
            )

        if not isinstance(resource_type, self.types.EntityType):
            raise InvalidArgumentType(
                function=function_name,
                argument='resource_type',
                arg_type=EntityType.__name__,
            )

        cmd = XmlCommand('create_tag')
        cmd.add_element('name', name)

        resources_xml = cmd.add_element("resources")
        if resource_filter is not None:
            resources_xml.set_attribute("filter", resource_filter)

        if resource_ids:
            for resource_id in resource_ids:
                resources_xml.add_element(
                    "resource", attrs={"id": str(resource_id)}
                )

        # audits and policies are sent as their underlying entity types
        sent_type = resource_type
        for alias, actual in (
            (EntityType.AUDIT, EntityType.TASK),
            (EntityType.POLICY, EntityType.SCAN_CONFIG),
        ):
            if resource_type.value == alias.value:
                sent_type = actual
                break

        resources_xml.add_element("type", sent_type.value)

        if comment:
            cmd.add_element("comment", comment)

        if value:
            cmd.add_element("value", value)

        # None means "not specified", so the element is only left out then
        if active is not None:
            cmd.add_element("active", "1" if active else "0")

        return self._send_xml_command(cmd)
563
564 View Code Duplication
    def create_task(
        self,
        name: str,
        config_id: str,
        target_id: str,
        scanner_id: str,
        *,
        alterable: Optional[bool] = None,
        hosts_ordering: Optional[HostsOrdering] = None,
        schedule_id: Optional[str] = None,
        alert_ids: Optional[List[str]] = None,
        comment: Optional[str] = None,
        schedule_periods: Optional[int] = None,
        observers: Optional[List[str]] = None,
        preferences: Optional[dict] = None
    ) -> Any:
        """Create a new scan task

        The task is created with the scan usage type.

        Arguments:
            name: Name of the task
            config_id: UUID of scan config to use by the task
            target_id: UUID of target to be scanned
            scanner_id: UUID of scanner to use for scanning the target
            comment: Comment for the task
            alterable: Whether the task should be alterable
            alert_ids: List of UUIDs for alerts to be applied to the task
            hosts_ordering: The order hosts are scanned in
            schedule_id: UUID of a schedule when the task should be run.
            schedule_periods: A limit to the number of times the task will be
                scheduled, or 0 for no limit
            observers: List of names or ids of users which should be allowed to
                observe this task
            preferences: Name/Value pairs of scanner preferences.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        # optional settings are passed through unchanged
        optional_settings = dict(
            alterable=alterable,
            hosts_ordering=hosts_ordering,
            schedule_id=schedule_id,
            alert_ids=alert_ids,
            comment=comment,
            schedule_periods=schedule_periods,
            observers=observers,
            preferences=preferences,
        )

        return self.__create_task(
            name=name,
            config_id=config_id,
            target_id=target_id,
            scanner_id=scanner_id,
            usage_type=UsageType.SCAN,
            function=self.create_task.__name__,
            **optional_settings
        )
617
618 View Code Duplication
    def create_tls_certificate(
        self,
        name: str,
        certificate: str,
        *,
        comment: Optional[str] = None,
        trust: Optional[bool] = None
    ) -> Any:
        """Create a new TLS certificate

        Arguments:
            name: Name of the TLS certificate. Required by this method.
            certificate: The Base64 encoded certificate data (x.509 DER or PEM).
            comment: Comment for the TLS certificate.
            trust: Whether the certificate is trusted. If None (the default)
                no trust element is added to the command.

        Raises:
            RequiredArgument: If name or certificate is missing

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not name:
            raise RequiredArgument(
                function=self.create_tls_certificate.__name__, argument='name'
            )
        if not certificate:
            raise RequiredArgument(
                function=self.create_tls_certificate.__name__,
                argument='certificate',
            )

        cmd = XmlCommand("create_tls_certificate")

        if comment:
            cmd.add_element("comment", comment)

        cmd.add_element("name", name)
        cmd.add_element("certificate", certificate)

        # Only None means "not specified". An explicit False must be sent
        # as well instead of being dropped silently by a truthiness check.
        if trust is not None:
            cmd.add_element("trust", _to_bool(trust))

        return self._send_xml_command(cmd)
660
661
    def get_aggregates(
662
        self,
663
        resource_type: EntityType,
664
        *,
665
        filter: Optional[str] = None,
666
        filter_id: Optional[str] = None,
667
        sort_criteria: Optional[list] = None,
668
        data_columns: Optional[list] = None,
669
        group_column: Optional[str] = None,
670
        subgroup_column: Optional[str] = None,
671
        text_columns: Optional[list] = None,
672
        first_group: Optional[int] = None,
673
        max_groups: Optional[int] = None,
674
        mode: Optional[int] = None,
675
        **kwargs
676
    ) -> Any:
677
        """Request aggregated information on a resource / entity type
678
679
        Additional arguments can be set via the kwargs parameter for backward
680
        compatibility with older versions of python-gvm, but are not validated.
681
682
        Arguments:
683
            resource_type: The entity type to gather data from
684
            filter: Filter term to use for the query
685
            filter_id: UUID of an existing filter to use for the query
686
            sort_criteria: List of sort criteria (dicts that can contain
687
                a field, stat and order)
688
            data_columns: List of fields to aggregate data from
689
            group_column: The field to group the entities by
690
            subgroup_column: The field to further group the entities
691
                inside groups by
692
            text_columns: List of simple text columns which no statistics
693
                are calculated for
694
            first_group: The index of the first aggregate group to return
695
            max_groups: The maximum number of aggregate groups to return,
696
                -1 for all
697
            mode: Special mode for aggregation
698
699
        Returns:
700
            The response. See :py:meth:`send_command` for details.
701
        """
702
        if not resource_type:
703
            raise RequiredArgument(
704
                function=self.get_aggregates.__name__, argument='resource_type'
705
            )
706
707
        if not isinstance(resource_type, self.types.EntityType):
708
            raise InvalidArgumentType(
709
                function=self.get_aggregates.__name__,
710
                argument='resource_type',
711
                arg_type=self.types.EntityType.__name__,
712
            )
713
714
        cmd = XmlCommand('get_aggregates')
715
716
        _actual_resource_type = resource_type
717
        if resource_type.value == EntityType.AUDIT.value:
718
            _actual_resource_type = EntityType.TASK
719
            cmd.set_attribute('usage_type', 'audit')
720
        elif resource_type.value == EntityType.POLICY.value:
721
            _actual_resource_type = EntityType.SCAN_CONFIG
722
            cmd.set_attribute('usage_type', 'policy')
723
        elif resource_type.value == EntityType.SCAN_CONFIG.value:
724
            cmd.set_attribute('usage_type', 'scan')
725
        elif resource_type.value == EntityType.TASK.value:
726
            cmd.set_attribute('usage_type', 'scan')
727
        cmd.set_attribute('type', _actual_resource_type.value)
728
729
        _add_filter(cmd, filter, filter_id)
730
731
        if first_group is not None:
732
            if not isinstance(first_group, int):
733
                raise InvalidArgumentType(
734
                    function=self.get_aggregates.__name__,
735
                    argument='first_group',
736
                    arg_type=int.__name__,
737
                )
738
            cmd.set_attribute('first_group', str(first_group))
739
740
        if max_groups is not None:
741
            if not isinstance(max_groups, int):
742
                raise InvalidArgumentType(
743
                    function=self.get_aggregates.__name__,
744
                    argument='max_groups',
745
                    arg_type=int.__name__,
746
                )
747
            cmd.set_attribute('max_groups', str(max_groups))
748
749
        if sort_criteria is not None:
750
            if not isinstance(sort_criteria, list):
751
                raise InvalidArgumentType(
752
                    function=self.get_aggregates.__name__,
753
                    argument='sort_criteria',
754
                    arg_type=list.__name__,
755
                )
756
            for sort in sort_criteria:
757
                if not isinstance(sort, dict):
758
                    raise InvalidArgumentType(
759
                        function=self.get_aggregates.__name__,
760
                        argument='sort_criteria',
761
                    )
762
763
                sort_elem = cmd.add_element('sort')
764
                if sort.get('field'):
765
                    sort_elem.set_attribute('field', sort.get('field'))
766
767
                if sort.get('stat'):
768
                    if isinstance(sort['stat'], AggregateStatistic):
769
                        sort_elem.set_attribute('stat', sort['stat'].value)
770
                    else:
771
                        stat = get_aggregate_statistic_from_string(sort['stat'])
772
                        sort_elem.set_attribute('stat', stat.value)
773
774
                if sort.get('order'):
775
                    if isinstance(sort['order'], SortOrder):
776
                        sort_elem.set_attribute('order', sort['order'].value)
777
                    else:
778
                        so = get_sort_order_from_string(sort['order'])
779
                        sort_elem.set_attribute('order', so.value)
780
781
        if data_columns is not None:
782
            if not isinstance(data_columns, list):
783
                raise InvalidArgumentType(
784
                    function=self.get_aggregates.__name__,
785
                    argument='data_columns',
786
                    arg_type=list.__name__,
787
                )
788
            for column in data_columns:
789
                cmd.add_element('data_column', column)
790
791
        if group_column is not None:
792
            cmd.set_attribute('group_column', group_column)
793
794
        if subgroup_column is not None:
795
            if not group_column:
796
                raise RequiredArgument(
797
                    '{} requires a group_column argument'
798
                    ' if subgroup_column is given'.format(
799
                        self.get_aggregates.__name__
800
                    ),
801
                    function=self.get_aggregates.__name__,
802
                    argument='subgroup_column',
803
                )
804
            cmd.set_attribute('subgroup_column', subgroup_column)
805
806
        if text_columns is not None:
807
            if not isinstance(text_columns, list):
808
                raise InvalidArgumentType(
809
                    function=self.get_aggregates.__name__,
810
                    argument='text_columns',
811
                    arg_type=list.__name__,
812
                )
813
            for column in text_columns:
814
                cmd.add_element('text_column', column)
815
816
        if mode is not None:
817
            cmd.set_attribute('mode', mode)
818
819
        # Add additional keyword args as attributes for backward compatibility.
820
        cmd.set_attributes(kwargs)
821
822
        return self._send_xml_command(cmd)
823
824 View Code Duplication
    def get_tls_certificates(
        self,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        include_certificate_data: Optional[bool] = None,
        details: Optional[bool] = None
    ) -> Any:
        """Request a list of TLS certificates

        Arguments:
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            include_certificate_data: Whether to include the certificate data in
                the response
            details: Value for the ``details`` attribute of the request
                (presumably whether full details of each certificate should
                be returned)

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        request = XmlCommand("get_tls_certificates")

        _add_filter(request, filter, filter_id)

        # Flags are only transmitted when the caller set them explicitly;
        # None leaves the attribute out of the request entirely.
        optional_flags = (
            ("details", details),
            ("include_certificate_data", include_certificate_data),
        )
        for attribute, flag in optional_flags:
            if flag is not None:
                request.set_attribute(attribute, _to_bool(flag))

        return self._send_xml_command(request)
857
858
    def get_tls_certificate(self, tls_certificate_id: str) -> Any:
        """Request a single TLS certificate

        The request always asks for the certificate data and for all details.

        Arguments:
            tls_certificate_id: UUID of an existing TLS certificate

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If tls_certificate_id is empty or None.
        """
        request = XmlCommand("get_tls_certificates")

        if not tls_certificate_id:
            raise RequiredArgument(
                function=self.get_tls_certificate.__name__,
                argument='tls_certificate_id',
            )

        fixed_attributes = (
            ("tls_certificate_id", tls_certificate_id),
            # for single tls certificate always request cert data
            ("include_certificate_data", "1"),
            # for single entity always request all details
            ("details", "1"),
        )
        for attribute, value in fixed_attributes:
            request.set_attribute(attribute, value)

        return self._send_xml_command(request)
884
885 View Code Duplication
    def modify_alert(
        self,
        alert_id: str,
        *,
        name: Optional[str] = None,
        comment: Optional[str] = None,
        filter_id: Optional[str] = None,
        event: Optional[AlertEvent] = None,
        event_data: Optional[dict] = None,
        condition: Optional[AlertCondition] = None,
        condition_data: Optional[dict] = None,
        method: Optional[AlertMethod] = None,
        method_data: Optional[dict] = None
    ) -> Any:
        """Modifies an existing alert.

        Arguments:
            alert_id: UUID of the alert to be modified.
            name: Name of the Alert.
            comment: Comment for the alert
            filter_id: Filter to apply when executing alert
            event: The event that must happen for the alert to occur, one of
                'Task run status changed', 'Updated SecInfo arrived' or
                'New SecInfo arrived'
            event_data: Data that defines the event
            condition: The condition that must be satisfied for the alert to
                occur. If the event is either 'Updated SecInfo
                arrived' or 'New SecInfo arrived', condition must be 'Always'.
                Otherwise, condition can also be one of 'Severity at least',
                'Filter count changed' or 'Filter count at least'.
            condition_data: Data that defines the condition
            method: The method by which the user is alerted, one of 'SCP',
                'Send', 'SMB', 'SNMP', 'Syslog' or 'Email';
                if the event is neither 'Updated SecInfo arrived' nor
                'New SecInfo arrived', method can also be one of 'Start Task',
                'HTTP Get', 'Sourcefire Connector' or 'verinice Connector'.
            method_data: Data that defines the method

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If alert_id is empty or None.
            InvalidArgumentType: If condition, method or event is given but
                is not an instance of the matching enum class.
        """

        def _append_data(parent, data):
            # Every key/value pair becomes a "data" element created with the
            # value, carrying a "name" child element created with the key.
            if data is None:
                return
            for key, value in data.items():
                entry = parent.add_element("data", value)
                entry.add_element("name", key)

        if not alert_id:
            raise RequiredArgument(
                function=self.modify_alert.__name__, argument='alert_id'
            )

        request = XmlCommand("modify_alert")
        request.set_attribute("alert_id", str(alert_id))

        if name:
            request.add_element("name", name)

        if comment:
            request.add_element("comment", comment)

        if filter_id:
            request.add_element("filter", attrs={"id": filter_id})

        # The *_data dictionaries are only used when the element they belong
        # to (condition, method, event) is given as well.
        if condition:
            if not isinstance(condition, AlertCondition):
                raise InvalidArgumentType(
                    function=self.modify_alert.__name__,
                    argument='condition',
                    arg_type=AlertCondition.__name__,
                )

            condition_elem = request.add_element("condition", condition.value)
            _append_data(condition_elem, condition_data)

        if method:
            if not isinstance(method, AlertMethod):
                raise InvalidArgumentType(
                    function=self.modify_alert.__name__,
                    argument='method',
                    arg_type=AlertMethod.__name__,
                )

            method_elem = request.add_element("method", method.value)
            _append_data(method_elem, method_data)

        if event:
            if not isinstance(event, AlertEvent):
                raise InvalidArgumentType(
                    function=self.modify_alert.__name__,
                    argument='event',
                    arg_type=AlertEvent.__name__,
                )

            # Cross-check of event against condition and method is delegated
            # to the module level helper.
            _check_event(event, condition, method)

            event_elem = request.add_element("event", event.value)
            _append_data(event_elem, event_data)

        return self._send_xml_command(request)
992
993
    def modify_audit(
        self,
        audit_id: str,
        *,
        name: Optional[str] = None,
        policy_id: Optional[str] = None,
        target_id: Optional[str] = None,
        scanner_id: Optional[str] = None,
        alterable: Optional[bool] = None,
        hosts_ordering: Optional[HostsOrdering] = None,
        schedule_id: Optional[str] = None,
        schedule_periods: Optional[int] = None,
        comment: Optional[str] = None,
        alert_ids: Optional[List[str]] = None,
        observers: Optional[List[str]] = None,
        preferences: Optional[dict] = None
    ) -> Any:
        """Modifies an existing audit.

        The call is delegated to :py:meth:`modify_task`; the audit id is
        passed as ``task_id`` and the policy id as ``config_id``.

        Arguments:
            audit_id: UUID of audit to modify.
            name: The name of the audit.
            policy_id: UUID of policy to use by the audit
            target_id: UUID of target to be scanned
            scanner_id: UUID of scanner to use for scanning the target
            alterable: Passed through as the ``alterable`` argument of
                :py:meth:`modify_task`
            hosts_ordering: The order hosts are scanned in
            schedule_id: UUID of a schedule when the audit should be run.
            schedule_periods: A limit to the number of times the audit will be
                scheduled, or 0 for no limit.
            comment: The comment on the audit.
            alert_ids: List of UUIDs for alerts to be applied to the audit
            observers: List of names or ids of users which should be allowed to
                observe this audit
            preferences: Name/Value pairs of scanner preferences.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        # The result of the delegated call must be handed back; without the
        # return the caller always received None.
        return self.modify_task(
            task_id=audit_id,
            name=name,
            config_id=policy_id,
            target_id=target_id,
            scanner_id=scanner_id,
            alterable=alterable,
            hosts_ordering=hosts_ordering,
            schedule_id=schedule_id,
            schedule_periods=schedule_periods,
            comment=comment,
            alert_ids=alert_ids,
            observers=observers,
            preferences=preferences,
        )
1046
1047
    def modify_permission(
        self,
        permission_id: str,
        *,
        comment: Optional[str] = None,
        name: Optional[str] = None,
        resource_id: Optional[str] = None,
        resource_type: Optional[EntityType] = None,
        subject_id: Optional[str] = None,
        subject_type: Optional[PermissionSubjectType] = None
    ) -> Any:
        """Modifies an existing permission.

        Arguments:
            permission_id: UUID of permission to be modified.
            comment: The comment on the permission.
            name: Permission name, currently the name of a command.
            resource_id: UUID of entity to which the permission applies
            resource_type: Type of the resource. For Super permissions user,
                group or role
            subject_id: UUID of subject to whom the permission is granted
            subject_type: Type of the subject user, group or role

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If permission_id is missing, if only one of
                resource_id/resource_type is given, or if subject_type is
                given without subject_id.
            InvalidArgumentType: If resource_type or subject_type is not an
                instance of the expected enum class.
        """
        if not permission_id:
            raise RequiredArgument(
                function=self.modify_permission.__name__,
                argument='permission_id',
            )

        request = XmlCommand("modify_permission")
        request.set_attribute("permission_id", permission_id)

        if comment:
            request.add_element("comment", comment)

        if name:
            request.add_element("name", name)

        # A resource is described by id and type together, so giving either
        # one makes the other mandatory.
        if resource_id or resource_type:
            for argument, given in (
                ('resource_id', resource_id),
                ('resource_type', resource_type),
            ):
                if not given:
                    raise RequiredArgument(
                        function=self.modify_permission.__name__,
                        argument=argument,
                    )

            if not isinstance(resource_type, self.types.EntityType):
                raise InvalidArgumentType(
                    function=self.modify_permission.__name__,
                    argument='resource_type',
                    arg_type=self.types.EntityType.__name__,
                )

            resource_elem = request.add_element(
                "resource", attrs={"id": resource_id}
            )

            # Audits and policies are sent with the type value of tasks and
            # scan configs respectively.
            if resource_type.value == EntityType.AUDIT.value:
                type_value = EntityType.TASK.value
            elif resource_type.value == EntityType.POLICY.value:
                type_value = EntityType.SCAN_CONFIG.value
            else:
                type_value = resource_type.value
            resource_elem.add_element("type", type_value)

        if subject_id or subject_type:
            if not subject_id:
                raise RequiredArgument(
                    function=self.modify_permission.__name__,
                    argument='subject_id',
                )

            # A missing subject_type (None) is rejected here as well, because
            # None is not a PermissionSubjectType instance.
            if not isinstance(subject_type, PermissionSubjectType):
                raise InvalidArgumentType(
                    function=self.modify_permission.__name__,
                    argument='subject_type',
                    arg_type=PermissionSubjectType.__name__,
                )

            subject_elem = request.add_element(
                "subject", attrs={"id": subject_id}
            )
            subject_elem.add_element("type", subject_type.value)

        return self._send_xml_command(request)
1136
1137
    def modify_policy_set_nvt_preference(
        self,
        policy_id: str,
        name: str,
        nvt_oid: str,
        *,
        value: Optional[str] = None
    ) -> Any:
        """Modifies the nvt preferences of an existing policy.

        The call is delegated to :py:meth:`modify_config_set_nvt_preference`
        with the policy id passed as ``config_id``.

        Arguments:
            policy_id: UUID of policy to modify.
            name: Name for preference to change.
            nvt_oid: OID of the NVT associated with preference to modify
            value: New value for the preference. None to delete the preference
                and to use the default instead.

        Returns:
            The result of :py:meth:`modify_config_set_nvt_preference`
            (presumably the response, see :py:meth:`send_command`).
        """
        # Hand the delegated result back instead of silently dropping it.
        return self.modify_config_set_nvt_preference(
            config_id=policy_id,
            name=name,
            nvt_oid=nvt_oid,
            value=value,
        )
1160
1161
    def modify_policy_set_name(self, policy_id: str, name: str) -> Any:
        """Modifies the name of an existing policy

        The call is delegated to :py:meth:`modify_config_set_name` with the
        policy id passed as ``config_id``.

        Arguments:
            policy_id: UUID of policy to modify.
            name: New name for the policy.

        Returns:
            The result of :py:meth:`modify_config_set_name` (presumably the
            response, see :py:meth:`send_command`).
        """
        # Hand the delegated result back instead of silently dropping it.
        return self.modify_config_set_name(
            config_id=policy_id,
            name=name,
        )
1172
1173
    def modify_policy_set_comment(
        self, policy_id: str, comment: Optional[str] = ""
    ) -> Any:
        """Modifies the comment of an existing policy

        The call is delegated to :py:meth:`modify_config_set_comment` with the
        policy id passed as ``config_id``.

        Arguments:
            policy_id: UUID of policy to modify.
            comment: Comment to set on a policy. Default: ''

        Returns:
            The result of :py:meth:`modify_config_set_comment` (presumably the
            response, see :py:meth:`send_command`).
        """
        # Hand the delegated result back instead of silently dropping it.
        return self.modify_config_set_comment(
            config_id=policy_id,
            comment=comment,
        )
1186
1187
    def modify_policy_set_scanner_preference(
        self, policy_id: str, name: str, *, value: Optional[str] = None
    ) -> Any:
        """Modifies the scanner preferences of an existing policy

        The call is delegated to
        :py:meth:`modify_config_set_scanner_preference` with the policy id
        passed as ``config_id``.

        Arguments:
            policy_id: UUID of policy to modify.
            name: Name of the scanner preference to change
            value: New value for the preference. None to delete the preference
                and to use the default instead.

        Returns:
            The result of :py:meth:`modify_config_set_scanner_preference`
            (presumably the response, see :py:meth:`send_command`).
        """
        # Hand the delegated result back instead of silently dropping it.
        return self.modify_config_set_scanner_preference(
            config_id=policy_id,
            name=name,
            value=value,
        )
1204
1205
    def modify_policy_set_nvt_selection(
        self, policy_id: str, family: str, nvt_oids: List[str]
    ) -> Any:
        """Modifies the selected nvts of an existing policy

        The manager updates the given family in the policy to include only the
        given NVTs. The call is delegated to
        :py:meth:`modify_config_set_nvt_selection` with the policy id passed
        as ``config_id``.

        Arguments:
            policy_id: UUID of policy to modify.
            family: Name of the NVT family to include NVTs from
            nvt_oids: List of NVTs to select for the family.

        Returns:
            The result of :py:meth:`modify_config_set_nvt_selection`
            (presumably the response, see :py:meth:`send_command`).
        """
        # Hand the delegated result back instead of silently dropping it.
        return self.modify_config_set_nvt_selection(
            config_id=policy_id,
            family=family,
            nvt_oids=nvt_oids,
        )
1223
1224
    def modify_policy_set_family_selection(
        self,
        policy_id: str,
        families: List[Tuple[str, bool]],
        *,
        auto_add_new_families: Optional[bool] = True
    ) -> Any:
        """
        Selects the NVTs of a policy at a family level.

        The call is delegated to
        :py:meth:`modify_config_set_family_selection` with the policy id
        passed as ``config_id``.

        Arguments:
            policy_id: UUID of policy to modify.
            families: A list of tuples with the first entry being the name
                of the NVT family selected, second entry a boolean indicating
                whether new NVTs should be added to the family automatically.
            auto_add_new_families: Whether new families should be added to the
                policy automatically. Default: True.

        Returns:
            The result of :py:meth:`modify_config_set_family_selection`
            (presumably the response, see :py:meth:`send_command`).
        """
        # Hand the delegated result back instead of silently dropping it.
        return self.modify_config_set_family_selection(
            config_id=policy_id,
            families=families,
            auto_add_new_families=auto_add_new_families,
        )
1247
1248
    def modify_tag(
        self,
        tag_id: str,
        *,
        comment: Optional[str] = None,
        name: Optional[str] = None,
        value=None,
        active=None,
        resource_action: Optional[str] = None,
        resource_type: Optional[EntityType] = None,
        resource_filter: Optional[str] = None,
        resource_ids: Optional[List[str]] = None
    ) -> Any:
        """Modifies an existing tag.

        Arguments:
            tag_id: UUID of the tag.
            comment: Comment to add to the tag.
            name: Name of the tag.
            value: Value of the tag.
            active: Whether the tag is active.
            resource_action: Whether to add or remove resources instead of
                overwriting. One of '', 'add', 'set' or 'remove'.
            resource_type: Type of the resources to which to attach the tag.
                Required if resource_filter is set.
            resource_filter: Filter term to select resources the tag is to be
                attached to.
            resource_ids: IDs of the resources to which to attach the tag.

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If tag_id is missing, or if resource_filter is
                given without resource_type.
            InvalidArgumentType: If resource_type is given but is not an
                EntityType instance.
        """
        if not tag_id:
            raise RequiredArgument(
                function=self.modify_tag.__name__, argument='tag_id'
            )

        request = XmlCommand("modify_tag")
        request.set_attribute("tag_id", str(tag_id))

        # Empty or None text values are left out of the request.
        text_elements = (
            ("comment", comment),
            ("name", name),
            ("value", value),
        )
        for element_name, text in text_elements:
            if text:
                request.add_element(element_name, text)

        if active is not None:
            request.add_element("active", _to_bool(active))

        # The "resources" element is only created if at least one of the
        # resource related arguments carries a truthy value.
        if any((resource_action, resource_filter, resource_ids, resource_type)):
            if resource_filter and not resource_type:
                raise RequiredArgument(
                    function=self.modify_tag.__name__, argument='resource_type'
                )

            resources_elem = request.add_element("resources")

            for attribute, attr_value in (
                ("action", resource_action),
                ("filter", resource_filter),
            ):
                if attr_value is not None:
                    resources_elem.set_attribute(attribute, attr_value)

            for resource_id in resource_ids or []:
                resources_elem.add_element(
                    "resource", attrs={"id": str(resource_id)}
                )

            if resource_type is not None:
                if not isinstance(resource_type, self.types.EntityType):
                    raise InvalidArgumentType(
                        function=self.modify_tag.__name__,
                        argument="resource_type",
                        arg_type=EntityType.__name__,
                    )

                # Audits and policies are sent with the type value of tasks
                # and scan configs respectively.
                if resource_type.value == EntityType.AUDIT.value:
                    type_value = EntityType.TASK.value
                elif resource_type.value == EntityType.POLICY.value:
                    type_value = EntityType.SCAN_CONFIG.value
                else:
                    type_value = resource_type.value
                resources_elem.add_element("type", type_value)

        return self._send_xml_command(request)
1333
1334 View Code Duplication
    def modify_tls_certificate(
        self,
        tls_certificate_id: str,
        *,
        name: Optional[str] = None,
        comment: Optional[str] = None,
        trust: Optional[bool] = None
    ) -> Any:
        """Modifies an existing TLS certificate.

        Arguments:
            tls_certificate_id: UUID of the TLS certificate to be modified.
            name: Name of the TLS certificate, defaulting to the MD5 fingerprint
            comment: Comment for the TLS certificate.
            trust: Whether the certificate is trusted. None (the default)
                leaves the trust setting untouched; both True and False are
                sent to the manager.

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If tls_certificate_id is empty or None.
        """
        if not tls_certificate_id:
            raise RequiredArgument(
                function=self.modify_tls_certificate.__name__,
                argument='tls_certificate_id',
            )

        cmd = XmlCommand("modify_tls_certificate")
        cmd.set_attribute("tls_certificate_id", str(tls_certificate_id))

        if comment:
            cmd.add_element("comment", comment)

        if name:
            cmd.add_element("name", name)

        # Compare against None instead of testing truthiness: with a plain
        # "if trust:" an explicit trust=False was dropped and a certificate
        # could never be marked as untrusted.
        if trust is not None:
            cmd.add_element("trust", _to_bool(trust))

        return self._send_xml_command(cmd)
1372
1373
    def clone_tls_certificate(self, tls_certificate_id: str) -> Any:
        """Clone an existing TLS certificate.

        Sends a ``create_tls_certificate`` command whose ``copy`` element
        holds the id of the certificate to clone.

        Arguments:
            tls_certificate_id: The UUID of an existing TLS certificate

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If tls_certificate_id is empty or None.
        """
        if not tls_certificate_id:
            raise RequiredArgument(
                function=self.clone_tls_certificate.__name__,
                argument='tls_certificate_id',
            )

        request = XmlCommand("create_tls_certificate")
        request.add_element("copy", tls_certificate_id)

        return self._send_xml_command(request)
1393
1394
    def get_configs(
        self,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        details: Optional[bool] = None,
        families: Optional[bool] = None,
        preferences: Optional[bool] = None,
        tasks: Optional[bool] = None
    ) -> Any:
        """Request a list of scan configs

        Arguments:
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            trash: Whether to get the trashcan scan configs instead
            details: Whether to get config families, preferences, nvt selectors
                and tasks.
            families: Whether to include the families if no details are
                requested
            preferences: Whether to include the preferences if no details are
                requested
            tasks: Whether to get tasks using this config

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        # All options are forwarded unchanged; only the usage type is fixed
        # to scan configs here.
        query_options = {
            "filter": filter,
            "filter_id": filter_id,
            "trash": trash,
            "details": details,
            "families": families,
            "preferences": preferences,
            "tasks": tasks,
        }
        return self.__get_configs(UsageType.SCAN, **query_options)
1432
1433
    def get_policies(
        self,
        *,
        audits: Optional[bool] = None,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        details: Optional[bool] = None,
        families: Optional[bool] = None,
        preferences: Optional[bool] = None,
        trash: Optional[bool] = None
    ) -> Any:
        """Request a list of policies

        Arguments:
            audits: Whether to get audits using the policy
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            details: Whether to get families, preferences, nvt selectors
                and tasks.
            families: Whether to include the families if no details are
                requested
            preferences: Whether to include the preferences if no details are
                requested
            trash: Whether to get the trashcan policies instead

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        # The shared config query is reused with the policy usage type; the
        # audits flag is passed on as its "tasks" option.
        query_options = {
            "filter": filter,
            "filter_id": filter_id,
            "details": details,
            "families": families,
            "preferences": preferences,
            "tasks": audits,
            "trash": trash,
        }
        return self.__get_configs(UsageType.POLICY, **query_options)
1471
1472
    def get_config(
        self, config_id: str, *, tasks: Optional[bool] = None
    ) -> Any:
        """Request a single scan config

        Arguments:
            config_id: UUID of an existing scan config
            tasks: Whether to get tasks using this config

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        # Same shared lookup as for policies, fixed to the scan usage type.
        return self.__get_config(config_id, UsageType.SCAN, tasks=tasks)
1487
1488
    def get_policy(
        self, policy_id: str, *, audits: Optional[bool] = None
    ) -> Any:
        """Request a single policy

        Arguments:
            policy_id: UUID of an existing policy
            audits: Whether to get audits using this policy

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        # Same shared lookup as for scan configs, fixed to the policy usage
        # type; the audits flag is passed on as its "tasks" option.
        return self.__get_config(
            config_id=policy_id, usage_type=UsageType.POLICY, tasks=audits
        )
1501
1502
    def get_tasks(
        self,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        details: Optional[bool] = None,
        schedules_only: Optional[bool] = None
    ) -> Any:
        """Request a list of tasks

        Arguments:
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            trash: Whether to get the trashcan tasks instead
            details: Whether to include full task details
            schedules_only: Whether to only include id, name and schedule
                details

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        # All options are forwarded unchanged; only the usage type is fixed
        # to scan tasks here.
        query_options = {
            "filter": filter,
            "filter_id": filter_id,
            "trash": trash,
            "details": details,
            "schedules_only": schedules_only,
        }
        return self.__get_tasks(UsageType.SCAN, **query_options)
1532
1533
    def get_audits(
        self,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        details: Optional[bool] = None,
        schedules_only: Optional[bool] = None
    ) -> Any:
        """Request a list of audits

        Arguments:
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            trash: Whether to get the trashcan audits instead
            details: Whether to include full audit details
            schedules_only: Whether to only include id, name and schedule
                details

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        # The shared task query is reused with the audit usage type.
        query_options = {
            "filter": filter,
            "filter_id": filter_id,
            "trash": trash,
            "details": details,
            "schedules_only": schedules_only,
        }
        return self.__get_tasks(UsageType.AUDIT, **query_options)
1563
1564
    def get_task(self, task_id: str) -> Any:
        """Request a single task

        The lookup is done with the usage type ``UsageType.SCAN``.

        Arguments:
            task_id: UUID of an existing task

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        return self.__get_task(task_id=task_id, usage_type=UsageType.SCAN)
1574
1575
    def get_audit(self, audit_id: str) -> Any:
        """Request a single audit

        The lookup is done with the usage type ``UsageType.AUDIT``.

        Arguments:
            audit_id: UUID of an existing audit

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        return self.__get_task(task_id=audit_id, usage_type=UsageType.AUDIT)
1585
1586
    def clone_audit(self, audit_id: str) -> Any:
        """Create a copy of an existing audit

        The clone is requested with a ``create_task`` command that carries
        the source audit id in its ``copy`` element.

        Arguments:
            audit_id: UUID of existing audit to clone from

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: if no audit_id is given
        """
        if audit_id:
            request = XmlCommand("create_task")
            request.add_element("copy", audit_id)
            return self._send_xml_command(request)

        raise RequiredArgument(
            function=self.clone_audit.__name__, argument='audit_id'
        )
1603
1604
    def clone_policy(self, policy_id: str) -> Any:
        """Create a copy of an existing policy

        The clone is requested with a ``create_config`` command that carries
        the source policy id in its ``copy`` element.

        Arguments:
            policy_id: UUID of the existing policy

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: if no policy_id is given
        """
        if policy_id:
            request = XmlCommand("create_config")
            request.add_element("copy", policy_id)
            return self._send_xml_command(request)

        raise RequiredArgument(
            function=self.clone_policy.__name__, argument='policy_id'
        )
1621
1622
    def delete_audit(
        self, audit_id: str, *, ultimate: Optional[bool] = False
    ) -> Any:
        """Delete an existing audit

        Sends a ``delete_task`` command with the audit id as ``task_id``
        attribute.

        Arguments:
            audit_id: UUID of the audit to be deleted.
            ultimate: Whether to remove entirely, or to the trashcan.

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: if no audit_id is given
        """
        if not audit_id:
            raise RequiredArgument(
                function=self.delete_audit.__name__, argument='audit_id'
            )

        request = XmlCommand("delete_task")
        request.set_attribute("task_id", audit_id)
        request.set_attribute("ultimate", _to_bool(ultimate))
        return self._send_xml_command(request)
1641
1642
    def delete_policy(
        self, policy_id: str, *, ultimate: Optional[bool] = False
    ) -> Any:
        """Delete an existing policy

        Sends a ``delete_config`` command with the policy id as
        ``config_id`` attribute.

        Arguments:
            policy_id: UUID of the policy to be deleted.
            ultimate: Whether to remove entirely, or to the trashcan.

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: if no policy_id is given
        """
        if not policy_id:
            raise RequiredArgument(
                function=self.delete_policy.__name__, argument='policy_id'
            )

        request = XmlCommand("delete_config")
        request.set_attribute("config_id", policy_id)
        request.set_attribute("ultimate", _to_bool(ultimate))
        return self._send_xml_command(request)
1661
1662
    def delete_tls_certificate(self, tls_certificate_id: str) -> Any:
        """Delete an existing tls certificate

        Arguments:
            tls_certificate_id: UUID of the tls certificate to be deleted.

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: if no tls_certificate_id is given
        """
        if tls_certificate_id:
            request = XmlCommand("delete_tls_certificate")
            request.set_attribute("tls_certificate_id", tls_certificate_id)
            return self._send_xml_command(request)

        raise RequiredArgument(
            function=self.delete_tls_certificate.__name__,
            argument='tls_certificate_id',
        )
1678
1679 View Code Duplication
    def __create_task(
        self,
        name: str,
        config_id: str,
        target_id: str,
        scanner_id: str,
        usage_type: UsageType,
        function: str,
        *,
        alterable: Optional[bool] = None,
        hosts_ordering: Optional[HostsOrdering] = None,
        schedule_id: Optional[str] = None,
        alert_ids: Optional[List[str]] = None,
        comment: Optional[str] = None,
        schedule_periods: Optional[int] = None,
        observers: Optional[List[str]] = None,
        preferences: Optional[dict] = None
    ) -> Any:
        """Build and send a ``create_task`` command for a usage type

        Arguments:
            name: Name of the new entity (required)
            config_id: UUID sent as id of the ``config`` element (required)
            target_id: UUID sent as id of the ``target`` element (required).
                The value ``'0'`` is rejected.
            scanner_id: UUID sent as id of the ``scanner`` element (required)
            usage_type: Its value is sent as ``usage_type`` element
            function: Function name reported in raised errors and in the
                deprecation message
            alterable: Sent as boolean ``alterable`` element if not None
            hosts_ordering: Must be an instance of the HostsOrdering type
                of this protocol version
            schedule_id: UUID sent as id of the ``schedule`` element
            alert_ids: List of alert UUIDs. A single string is still
                accepted but deprecated.
            comment: Comment text, only sent if not empty
            schedule_periods: Integer greater or equal than 0. Only
                evaluated if a schedule_id is given.
            observers: List of user names or ids, sent as a comma list
            preferences: Mapping of scanner preference names to values.
                Values are converted to strings.

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: if one of the required arguments is missing
            InvalidArgument: if target_id is ``'0'`` or schedule_periods is
                not an integer greater or equal than 0
            InvalidArgumentType: if hosts_ordering, observers or preferences
                have an unexpected type
        """
        # checked in this order so the first missing argument is reported
        required_arguments = (
            ('name', name),
            ('config_id', config_id),
            ('target_id', target_id),
            ('scanner_id', scanner_id),
        )
        for argument_name, argument_value in required_arguments:
            if not argument_value:
                raise RequiredArgument(
                    function=function, argument=argument_name
                )

        # don't allow to create a container task with create_task
        if target_id == '0':
            raise InvalidArgument(function=function, argument='target_id')

        request = XmlCommand("create_task")
        request.add_element("name", name)
        request.add_element("usage_type", usage_type.value)
        for tag, entity_id in (
            ("config", config_id),
            ("target", target_id),
            ("scanner", scanner_id),
        ):
            request.add_element(tag, attrs={"id": entity_id})

        if comment:
            request.add_element("comment", comment)

        if alterable is not None:
            request.add_element("alterable", _to_bool(alterable))

        if hosts_ordering:
            if isinstance(hosts_ordering, self.types.HostsOrdering):
                request.add_element("hosts_ordering", hosts_ordering.value)
            else:
                raise InvalidArgumentType(
                    function=function,
                    argument='hosts_ordering',
                    arg_type=HostsOrdering.__name__,
                )

        if alert_ids:
            if isinstance(alert_ids, str):
                deprecation_message = (
                    "Please pass a list as alert_ids parameter to {}. "
                    "Passing a string is deprecated and will be removed in "
                    "future.".format(function)
                )
                deprecation(deprecation_message)

                # a single id given as a string is handled as one item list
                alert_ids = [alert_ids]

            if _is_list_like(alert_ids):
                # one alert element per given id
                for alert_id in alert_ids:
                    request.add_element("alert", attrs={"id": str(alert_id)})

        if schedule_id:
            request.add_element("schedule", attrs={"id": schedule_id})

            # schedule_periods is only looked at together with a schedule
            if schedule_periods is not None:
                is_integral = isinstance(schedule_periods, numbers.Integral)
                if not is_integral or schedule_periods < 0:
                    raise InvalidArgument(
                        "schedule_periods must be an integer greater or equal "
                        "than 0"
                    )
                request.add_element("schedule_periods", str(schedule_periods))

        if observers is not None:
            if not _is_list_like(observers):
                raise InvalidArgumentType(
                    function=function, argument='observers', arg_type='list'
                )

            # gvmd splits by comma and space
            # gvmd tries to lookup each value as user name and afterwards as
            # user id. So both user name and user id are possible
            request.add_element("observers", _to_comma_list(observers))

        if preferences is not None:
            if not isinstance(preferences, collections.abc.Mapping):
                raise InvalidArgumentType(
                    function=function,
                    argument='preferences',
                    arg_type=collections.abc.Mapping.__name__,
                )

            preferences_element = request.add_element("preferences")
            for scanner_pref_name, scanner_pref_value in preferences.items():
                preference_element = preferences_element.add_element(
                    "preference"
                )
                preference_element.add_element(
                    "scanner_name", scanner_pref_name
                )
                preference_element.add_element(
                    "value", str(scanner_pref_value)
                )

        return self._send_xml_command(request)
1790
1791
    def __create_config(
        self,
        config_id: str,
        name: str,
        usage_type: UsageType,
        function: str,
        *,
        comment: Optional[str] = None
    ) -> Any:
        """Send a ``create_config`` command that copies an existing config

        Arguments:
            config_id: UUID of the config to copy from (required)
            name: Name of the new config (required)
            usage_type: Its value is sent as ``usage_type`` element
            function: Function name reported in raised errors
            comment: Comment text, sent if not None

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: if name or config_id is missing
        """
        # name is checked first, matching the order errors are reported in
        for argument_name, argument_value in (
            ('name', name),
            ('config_id', config_id),
        ):
            if not argument_value:
                raise RequiredArgument(
                    function=function, argument=argument_name
                )

        request = XmlCommand("create_config")
        if comment is not None:
            request.add_element("comment", comment)

        request.add_element("copy", config_id)
        request.add_element("name", name)
        request.add_element("usage_type", usage_type.value)
        return self._send_xml_command(request)
1813
1814
    def __create_config_from_osp_scanner(
        self,
        scanner_id: str,
        name: str,
        usage_type: UsageType,
        function: str,
        *,
        comment: Optional[str] = None
    ) -> Any:
        """Send a ``create_config`` command that references a scanner

        Arguments:
            scanner_id: UUID sent as text of the ``scanner`` element
                (required)
            name: Name of the new config (required)
            usage_type: Its value is sent as ``usage_type`` element
            function: Function name reported in raised errors
            comment: Comment text, sent if not None

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: if name or scanner_id is missing
        """
        # name is checked first, matching the order errors are reported in
        for argument_name, argument_value in (
            ('name', name),
            ('scanner_id', scanner_id),
        ):
            if not argument_value:
                raise RequiredArgument(
                    function=function, argument=argument_name
                )

        request = XmlCommand("create_config")
        if comment is not None:
            request.add_element("comment", comment)

        request.add_element("scanner", scanner_id)
        request.add_element("name", name)
        request.add_element("usage_type", usage_type.value)
        return self._send_xml_command(request)
1836
1837 View Code Duplication
    def __get_configs(
        self,
        usage_type: UsageType,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        details: Optional[bool] = None,
        families: Optional[bool] = None,
        preferences: Optional[bool] = None,
        tasks: Optional[bool] = None
    ) -> Any:
        """Send a ``get_configs`` command restricted to a usage type

        Every boolean option is only sent as attribute if it is not None.

        Arguments:
            usage_type: Its value is sent as ``usage_type`` attribute
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            trash: Value for the ``trash`` attribute
            details: Value for the ``details`` attribute
            families: Value for the ``families`` attribute
            preferences: Value for the ``preferences`` attribute
            tasks: Value for the ``tasks`` attribute

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        request = XmlCommand("get_configs")
        request.set_attribute("usage_type", usage_type.value)

        _add_filter(request, filter, filter_id)

        optional_flags = (
            ("trash", trash),
            ("details", details),
            ("families", families),
            ("preferences", preferences),
            ("tasks", tasks),
        )
        for attribute_name, flag in optional_flags:
            if flag is not None:
                request.set_attribute(attribute_name, _to_bool(flag))

        return self._send_xml_command(request)
1870
1871
    def __get_config(
        self,
        config_id: str,
        usage_type: UsageType,
        *,
        tasks: Optional[bool] = None
    ) -> Any:
        """Send a ``get_configs`` command for a single config

        Arguments:
            config_id: UUID of the requested config (required)
            usage_type: Its value is sent as ``usage_type`` attribute
            tasks: Value for the ``tasks`` attribute, sent if not None

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: if no config_id is given
        """
        if not config_id:
            # NOTE(review): the error always names get_config/config_id,
            # independent of the passed usage_type
            raise RequiredArgument(
                function=self.get_config.__name__, argument='config_id'
            )

        request = XmlCommand("get_configs")
        request.set_attribute("config_id", config_id)
        request.set_attribute("usage_type", usage_type.value)

        if tasks is not None:
            request.set_attribute("tasks", _to_bool(tasks))

        # for single entity always request all details
        request.set_attribute("details", "1")
        return self._send_xml_command(request)
1895
1896
    def __get_tasks(
        self,
        usage_type: UsageType,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        details: Optional[bool] = None,
        schedules_only: Optional[bool] = None
    ) -> Any:
        """Send a ``get_tasks`` command restricted to a usage type

        Every boolean option is only sent as attribute if it is not None.

        Arguments:
            usage_type: Its value is sent as ``usage_type`` attribute
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            trash: Value for the ``trash`` attribute
            details: Value for the ``details`` attribute
            schedules_only: Value for the ``schedules_only`` attribute

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        request = XmlCommand("get_tasks")
        request.set_attribute("usage_type", usage_type.value)

        _add_filter(request, filter, filter_id)

        optional_flags = (
            ("trash", trash),
            ("details", details),
            ("schedules_only", schedules_only),
        )
        for attribute_name, flag in optional_flags:
            if flag is not None:
                request.set_attribute(attribute_name, _to_bool(flag))

        return self._send_xml_command(request)
1921
1922
    def __get_task(self, task_id: str, usage_type: UsageType) -> Any:
        """Send a ``get_tasks`` command for a single task or audit

        Arguments:
            task_id: UUID of the requested task or audit (required)
            usage_type: Its value is sent as ``usage_type`` attribute

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: if no task_id is given. The error names the
                public method and argument matching the usage type.
        """
        if not task_id:
            # report the public entry point the caller actually used, so an
            # empty audit id is not blamed on get_task/task_id
            if usage_type == UsageType.AUDIT:
                raise RequiredArgument(
                    function=self.get_audit.__name__, argument='audit_id'
                )

            raise RequiredArgument(
                function=self.get_task.__name__, argument='task_id'
            )

        cmd = XmlCommand("get_tasks")
        cmd.set_attribute("task_id", task_id)
        cmd.set_attribute("usage_type", usage_type.value)

        # for single entity always request all details
        cmd.set_attribute("details", "1")
        return self._send_xml_command(cmd)
1935
1936
    def resume_audit(self, audit_id: str) -> Any:
        """Resume an existing stopped audit

        Sends a ``resume_task`` command with the audit id as ``task_id``
        attribute.

        Arguments:
            audit_id: UUID of the audit to be resumed

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: if no audit_id is given
        """
        if audit_id:
            request = XmlCommand("resume_task")
            request.set_attribute("task_id", audit_id)
            return self._send_xml_command(request)

        raise RequiredArgument(
            function=self.resume_audit.__name__, argument='audit_id'
        )
1954
1955
    def start_audit(self, audit_id: str) -> Any:
        """Start an existing audit

        Sends a ``start_task`` command with the audit id as ``task_id``
        attribute.

        Arguments:
            audit_id: UUID of the audit to be started

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: if no audit_id is given
        """
        if audit_id:
            request = XmlCommand("start_task")
            request.set_attribute("task_id", audit_id)
            return self._send_xml_command(request)

        raise RequiredArgument(
            function=self.start_audit.__name__, argument='audit_id'
        )
1973
1974
    def stop_audit(self, audit_id: str) -> Any:
        """Stop an existing running audit

        Sends a ``stop_task`` command with the audit id as ``task_id``
        attribute.

        Arguments:
            audit_id: UUID of the audit to be stopped

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: if no audit_id is given
        """
        if audit_id:
            request = XmlCommand("stop_task")
            request.set_attribute("task_id", audit_id)
            return self._send_xml_command(request)

        raise RequiredArgument(
            function=self.stop_audit.__name__, argument='audit_id'
        )
1992