Passed
Push — master ( f8b4f2...b39f5b )
by Jaspar
142:01 queued 94:15
created

GmpV9Mixin.import_report()   B

Complexity

Conditions 5

Size

Total Lines 44
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 24
nop 5
dl 0
loc 44
rs 8.8373
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 - 2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
# pylint: disable=arguments-differ, redefined-builtin, too-many-lines
20
21
"""
22
Module for communication with gvmd in `Greenbone Management Protocol version 9`_
23
24
.. _Greenbone Management Protocol version 9:
25
    https://docs.greenbone.net/API/GMP/gmp-9.0.html
26
"""
27
import collections
28
import numbers
29
30
from typing import Any, List, Optional, Callable, Tuple
31
32
from lxml import etree
33
34
from gvm.errors import InvalidArgument, InvalidArgumentType, RequiredArgument
35
from gvm.utils import deprecation
36
from gvm.xml import XmlCommand
37
38
from gvm.protocols.base import GvmProtocol
39
from gvm.connections import GvmConnection
40
from gvm.protocols.gmpv7.gmpv7 import (
41
    _to_bool,
42
    _add_filter,
43
    _is_list_like,
44
    _to_comma_list,
45
)
46
47
from . import types
48
from .types import *  # pylint: disable=unused-wildcard-import, wildcard-import
49
from .types import _UsageType as UsageType
50
51
_EMPTY_POLICY_ID = '085569ce-73ed-11df-83c3-002264764cea'
52
53
54
def _check_event(
55
    event: AlertEvent, condition: AlertCondition, method: AlertMethod
56
):
57
    if event == AlertEvent.TASK_RUN_STATUS_CHANGED:
58
        if not condition:
59
            raise RequiredArgument(
60
                "condition is required for event {}".format(event.name)
61
            )
62
63
        if not method:
64
            raise RequiredArgument(
65
                "method is required for event {}".format(event.name)
66
            )
67
68
        if condition not in (
69
            AlertCondition.ALWAYS,
70
            AlertCondition.FILTER_COUNT_CHANGED,
71
            AlertCondition.FILTER_COUNT_AT_LEAST,
72
            AlertCondition.SEVERITY_AT_LEAST,
73
            AlertCondition.SEVERITY_CHANGED,
74
        ):
75
            raise InvalidArgument(
76
                "Invalid condition {} for event {}".format(
77
                    condition.name, event.name
78
                )
79
            )
80
    elif event in (
81
        AlertEvent.NEW_SECINFO_ARRIVED,
82
        AlertEvent.UPDATED_SECINFO_ARRIVED,
83
    ):
84
        if not condition:
85
            raise RequiredArgument(
86
                "condition is required for event {}".format(event.name)
87
            )
88
89
        if not method:
90
            raise RequiredArgument(
91
                "method is required for event {}".format(event.name)
92
            )
93
94
        if condition != AlertCondition.ALWAYS:
95
            raise InvalidArgument(
96
                "Invalid condition {} for event {}".format(
97
                    condition.name, event.name
98
                )
99
            )
100
        if method not in (
101
            AlertMethod.SCP,
102
            AlertMethod.SEND,
103
            AlertMethod.SMB,
104
            AlertMethod.SNMP,
105
            AlertMethod.SYSLOG,
106
            AlertMethod.EMAIL,
107
        ):
108
            raise InvalidArgument(
109
                "Invalid method {} for event {}".format(method.name, event.name)
110
            )
111
    elif event in (
112
        AlertEvent.TICKET_RECEIVED,
113
        AlertEvent.OWNED_TICKET_CHANGED,
114
        AlertEvent.ASSIGNED_TICKET_CHANGED,
115
    ):
116
        if not condition:
117
            raise RequiredArgument(
118
                "condition is required for event {}".format(event.name)
119
            )
120
121
        if not method:
122
            raise RequiredArgument(
123
                "method is required for event {}".format(event.name)
124
            )
125
        if condition != AlertCondition.ALWAYS:
126
            raise InvalidArgument(
127
                "Invalid condition {} for event {}".format(
128
                    condition.name, event.name
129
                )
130
            )
131
        if method not in (
132
            AlertMethod.EMAIL,
133
            AlertMethod.START_TASK,
134
            AlertMethod.SYSLOG,
135
        ):
136
            raise InvalidArgument(
137
                "Invalid method {} for event {}".format(method.name, event.name)
138
            )
139
    elif event is not None:
140
        raise InvalidArgument('Invalid event "{}"'.format(event.name))
141
142
143
class GmpV9Mixin(GvmProtocol):
144
145
    types = types
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable types does not seem to be defined.
Loading history...
146
147
    def __init__(
148
        self,
149
        connection: GvmConnection,
150
        *,
151
        transform: Optional[Callable[[str], Any]] = None,
152
    ):
153
        super().__init__(connection, transform=transform)
154
155
        # Is authenticated on gvmd
156
        self._authenticated = False
157
158 View Code Duplication
    def create_alert(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
159
        self,
160
        name: str,
161
        condition: AlertCondition,
162
        event: AlertEvent,
163
        method: AlertMethod,
164
        *,
165
        method_data: Optional[dict] = None,
166
        event_data: Optional[dict] = None,
167
        condition_data: Optional[dict] = None,
168
        filter_id: Optional[int] = None,
169
        comment: Optional[str] = None,
170
    ) -> Any:
171
        """Create a new alert
172
173
        Arguments:
174
            name: Name of the new Alert
175
            condition: The condition that must be satisfied for the alert
176
                to occur; if the event is either 'Updated SecInfo arrived' or
177
                'New SecInfo arrived', condition must be 'Always'. Otherwise,
178
                condition can also be on of 'Severity at least', 'Filter count
179
                changed' or 'Filter count at least'.
180
            event: The event that must happen for the alert to occur, one
181
                of 'Task run status changed', 'Updated SecInfo arrived' or 'New
182
                SecInfo arrived'
183
            method: The method by which the user is alerted, one of 'SCP',
184
                'Send', 'SMB', 'SNMP', 'Syslog' or 'Email'; if the event is
185
                neither 'Updated SecInfo arrived' nor 'New SecInfo arrived',
186
                method can also be one of 'Start Task', 'HTTP Get', 'Sourcefire
187
                Connector' or 'verinice Connector'.
188
            condition_data: Data that defines the condition
189
            event_data: Data that defines the event
190
            method_data: Data that defines the method
191
            filter_id: Filter to apply when executing alert
192
            comment: Comment for the alert
193
194
        Returns:
195
            The response. See :py:meth:`send_command` for details.
196
        """
197
        if not name:
198
            raise RequiredArgument(
199
                function=self.create_alert.__name__, argument='name'
200
            )
201
202
        if not condition:
203
            raise RequiredArgument(
204
                function=self.create_alert.__name__, argument='condition'
205
            )
206
207
        if not event:
208
            raise RequiredArgument(
209
                function=self.create_alert.__name__, argument='event'
210
            )
211
212
        if not method:
213
            raise RequiredArgument(
214
                function=self.create_alert.__name__, argument='method'
215
            )
216
217
        if not isinstance(condition, AlertCondition):
218
            raise InvalidArgumentType(
219
                function=self.create_alert.__name__,
220
                argument='condition',
221
                arg_type=AlertCondition.__name__,
222
            )
223
224
        if not isinstance(event, AlertEvent):
225
            raise InvalidArgumentType(
226
                function=self.create_alert.__name__,
227
                argument='even',
228
                arg_type=AlertEvent.__name__,
229
            )
230
231
        if not isinstance(method, AlertMethod):
232
            raise InvalidArgumentType(
233
                function=self.create_alert.__name__,
234
                argument='method',
235
                arg_type=AlertMethod.__name__,
236
            )
237
238
        _check_event(event, condition, method)
239
240
        cmd = XmlCommand("create_alert")
241
        cmd.add_element("name", name)
242
243
        conditions = cmd.add_element("condition", condition.value)
244
245
        if condition_data is not None:
246
            for key, value in condition_data.items():
247
                _data = conditions.add_element("data", value)
248
                _data.add_element("name", key)
249
250
        events = cmd.add_element("event", event.value)
251
252
        if event_data is not None:
253
            for key, value in event_data.items():
254
                _data = events.add_element("data", value)
255
                _data.add_element("name", key)
256
257
        methods = cmd.add_element("method", method.value)
258
259
        if method_data is not None:
260
            for key, value in method_data.items():
261
                _data = methods.add_element("data", value)
262
                _data.add_element("name", key)
263
264
        if filter_id:
265
            cmd.add_element("filter", attrs={"id": filter_id})
266
267
        if comment:
268
            cmd.add_element("comment", comment)
269
270
        return self._send_xml_command(cmd)
271
272 View Code Duplication
    def create_audit(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
273
        self,
274
        name: str,
275
        policy_id: str,
276
        target_id: str,
277
        scanner_id: str,
278
        *,
279
        alterable: Optional[bool] = None,
280
        hosts_ordering: Optional[HostsOrdering] = None,
281
        schedule_id: Optional[str] = None,
282
        alert_ids: Optional[List[str]] = None,
283
        comment: Optional[str] = None,
284
        schedule_periods: Optional[int] = None,
285
        observers: Optional[List[str]] = None,
286
        preferences: Optional[dict] = None,
287
    ) -> Any:
288
        """Create a new audit task
289
290
        Arguments:
291
            name: Name of the new audit
292
            policy_id: UUID of policy to use by the audit
293
            target_id: UUID of target to be scanned
294
            scanner_id: UUID of scanner to use for scanning the target
295
            comment: Comment for the audit
296
            alterable: Whether the task should be alterable
297
            alert_ids: List of UUIDs for alerts to be applied to the audit
298
            hosts_ordering: The order hosts are scanned in
299
            schedule_id: UUID of a schedule when the audit should be run.
300
            schedule_periods: A limit to the number of times the audit will be
301
                scheduled, or 0 for no limit
302
            observers: List of names or ids of users which should be allowed to
303
                observe this audit
304
            preferences: Name/Value pairs of scanner preferences.
305
306
        Returns:
307
            The response. See :py:meth:`send_command` for details.
308
        """
309
310
        return self.__create_task(
311
            name=name,
312
            config_id=policy_id,
313
            target_id=target_id,
314
            scanner_id=scanner_id,
315
            usage_type=UsageType.AUDIT,
316
            function=self.create_audit.__name__,
317
            alterable=alterable,
318
            hosts_ordering=hosts_ordering,
319
            schedule_id=schedule_id,
320
            alert_ids=alert_ids,
321
            comment=comment,
322
            schedule_periods=schedule_periods,
323
            observers=observers,
324
            preferences=preferences,
325
        )
326
327
    def create_config(
328
        self, config_id: str, name: str, *, comment: Optional[str] = None
329
    ) -> Any:
330
        """Create a new scan config
331
332
        Arguments:
333
            config_id: UUID of the existing scan config
334
            name: Name of the new scan config
335
            comment: A comment on the config
336
337
        Returns:
338
            The response. See :py:meth:`send_command` for details.
339
        """
340
        return self.__create_config(
341
            config_id=config_id,
342
            name=name,
343
            comment=comment,
344
            usage_type=UsageType.SCAN,
345
            function=self.create_config.__name__,
346
        )
347
348
    def create_config_from_osp_scanner(
349
        self, scanner_id: str, name: str, *, comment: Optional[str] = None
350
    ) -> Any:
351
        """Create a new scan config from an ospd scanner.
352
353
        Create config by retrieving the expected preferences from the given
354
        scanner via OSP.
355
356
        Arguments:
357
            scanner_id: UUID of an OSP scanner to get config data from
358
            name: Name of the new scan config
359
            comment: A comment on the config
360
361
        Returns:
362
            The response. See :py:meth:`send_command` for details.
363
        """
364
        return self.__create_config_from_osp_scanner(
365
            scanner_id=scanner_id,
366
            name=name,
367
            comment=comment,
368
            usage_type=UsageType.SCAN,
369
            function=self.create_config.__name__,
370
        )
371
372
    def create_permission(
373
        self,
374
        name: str,
375
        subject_id: str,
376
        subject_type: PermissionSubjectType,
377
        *,
378
        resource_id: Optional[str] = None,
379
        resource_type: Optional[EntityType] = None,
380
        comment: Optional[str] = None,
381
    ) -> Any:
382
        """Create a new permission
383
384
        Arguments:
385
            name: Name of the new permission
386
            subject_id: UUID of subject to whom the permission is granted
387
            subject_type: Type of the subject user, group or role
388
            comment: Comment for the permission
389
            resource_id: UUID of entity to which the permission applies
390
            resource_type: Type of the resource. For Super permissions user,
391
                group or role
392
393
        Returns:
394
            The response. See :py:meth:`send_command` for details.
395
        """
396
        if not name:
397
            raise RequiredArgument(
398
                function=self.create_permission.__name__, argument='name'
399
            )
400
401
        if not subject_id:
402
            raise RequiredArgument(
403
                function=self.create_permission.__name__, argument='subject_id'
404
            )
405
406
        if not isinstance(subject_type, PermissionSubjectType):
407
            raise InvalidArgumentType(
408
                function=self.create_permission.__name__,
409
                argument='subject_type',
410
                arg_type=PermissionSubjectType.__name__,
411
            )
412
413
        cmd = XmlCommand("create_permission")
414
        cmd.add_element("name", name)
415
416
        _xmlsubject = cmd.add_element("subject", attrs={"id": subject_id})
417
        _xmlsubject.add_element("type", subject_type.value)
418
419
        if comment:
420
            cmd.add_element("comment", comment)
421
422 View Code Duplication
        if resource_id or resource_type:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
423
            if not resource_id:
424
                raise RequiredArgument(
425
                    function=self.create_permission.__name__,
426
                    argument='resource_id',
427
                )
428
429
            if not resource_type:
430
                raise RequiredArgument(
431
                    function=self.create_permission.__name__,
432
                    argument='resource_type',
433
                )
434
435
            if not isinstance(resource_type, self.types.EntityType):
436
                raise InvalidArgumentType(
437
                    function=self.create_permission.__name__,
438
                    argument='resource_type',
439
                    arg_type=self.types.EntityType.__name__,
440
                )
441
442
            _xmlresource = cmd.add_element(
443
                "resource", attrs={"id": resource_id}
444
            )
445
446
            _actual_resource_type = resource_type
447
            if resource_type.value == EntityType.AUDIT.value:
448
                _actual_resource_type = EntityType.TASK
449
            elif resource_type.value == EntityType.POLICY.value:
450
                _actual_resource_type = EntityType.SCAN_CONFIG
451
452
            _xmlresource.add_element("type", _actual_resource_type.value)
453
454
        return self._send_xml_command(cmd)
455
456
    def create_policy(
457
        self, name: str, *, policy_id: str = None, comment: Optional[str] = None
458
    ) -> Any:
459
        """Create a new policy config
460
461
        Arguments:
462
            name: Name of the new policy
463
            policy_id: UUID of an existing policy as base. By default the empty
464
                policy is used.
465
            comment: A comment on the policy
466
467
        Returns:
468
            The response. See :py:meth:`send_command` for details.
469
        """
470
        if policy_id is None:
471
            policy_id = _EMPTY_POLICY_ID
472
        return self.__create_config(
473
            config_id=policy_id,
474
            name=name,
475
            comment=comment,
476
            usage_type=UsageType.POLICY,
477
            function=self.create_policy.__name__,
478
        )
479
480
    def create_tag(
481
        self,
482
        name: str,
483
        resource_type: EntityType,
484
        *,
485
        resource_filter: Optional[str] = None,
486
        resource_ids: Optional[List[str]] = None,
487
        value: Optional[str] = None,
488
        comment: Optional[str] = None,
489
        active: Optional[bool] = None,
490
    ) -> Any:
491
        """Create a tag.
492
493
        Arguments:
494
            name: Name of the tag. A full tag name consisting of namespace and
495
                predicate e.g. `foo:bar`.
496
            resource_type: Entity type the tag is to be attached to.
497
            resource_filter: Filter term to select resources the tag is to be
498
                attached to. Only one of resource_filter or resource_ids can be
499
                provided.
500
            resource_ids: IDs of the resources the tag is to be attached to.
501
                Only one of resource_filter or resource_ids can be provided.
502
            value: Value associated with the tag.
503
            comment: Comment for the tag.
504
            active: Whether the tag should be active.
505
506
        Returns:
507
            The response. See :py:meth:`send_command` for details.
508
        """
509
        if not name:
510
            raise RequiredArgument(
511
                function=self.create_tag.__name__, argument='name'
512
            )
513
514
        if resource_filter and resource_ids:
515
            raise InvalidArgument(
516
                "create_tag accepts either resource_filter or resource_ids "
517
                "argument",
518
                function=self.create_tag.__name__,
519
            )
520
521
        if not resource_type:
522
            raise RequiredArgument(
523
                function=self.create_tag.__name__, argument='resource_type'
524
            )
525
526
        if not isinstance(resource_type, self.types.EntityType):
527
            raise InvalidArgumentType(
528
                function=self.create_tag.__name__,
529
                argument='resource_type',
530
                arg_type=EntityType.__name__,
531
            )
532
533
        cmd = XmlCommand('create_tag')
534
        cmd.add_element('name', name)
535
536
        _xmlresources = cmd.add_element("resources")
537
        if resource_filter is not None:
538
            _xmlresources.set_attribute("filter", resource_filter)
539
540
        for resource_id in resource_ids or []:
541
            _xmlresources.add_element(
542
                "resource", attrs={"id": str(resource_id)}
543
            )
544
545
        _actual_resource_type = resource_type
546
        if resource_type.value == EntityType.AUDIT.value:
547
            _actual_resource_type = EntityType.TASK
548
        elif resource_type.value == EntityType.POLICY.value:
549
            _actual_resource_type = EntityType.SCAN_CONFIG
550
        _xmlresources.add_element("type", _actual_resource_type.value)
551
552
        if comment:
553
            cmd.add_element("comment", comment)
554
555
        if value:
556
            cmd.add_element("value", value)
557
558
        if active is not None:
559
            if active:
560
                cmd.add_element("active", "1")
561
            else:
562
                cmd.add_element("active", "0")
563
564
        return self._send_xml_command(cmd)
565
566 View Code Duplication
    def create_task(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
567
        self,
568
        name: str,
569
        config_id: str,
570
        target_id: str,
571
        scanner_id: str,
572
        *,
573
        alterable: Optional[bool] = None,
574
        hosts_ordering: Optional[HostsOrdering] = None,
575
        schedule_id: Optional[str] = None,
576
        alert_ids: Optional[List[str]] = None,
577
        comment: Optional[str] = None,
578
        schedule_periods: Optional[int] = None,
579
        observers: Optional[List[str]] = None,
580
        preferences: Optional[dict] = None,
581
    ) -> Any:
582
        """Create a new scan task
583
584
        Arguments:
585
            name: Name of the task
586
            config_id: UUID of scan config to use by the task
587
            target_id: UUID of target to be scanned
588
            scanner_id: UUID of scanner to use for scanning the target
589
            comment: Comment for the task
590
            alterable: Whether the task should be alterable
591
            alert_ids: List of UUIDs for alerts to be applied to the task
592
            hosts_ordering: The order hosts are scanned in
593
            schedule_id: UUID of a schedule when the task should be run.
594
            schedule_periods: A limit to the number of times the task will be
595
                scheduled, or 0 for no limit
596
            observers: List of names or ids of users which should be allowed to
597
                observe this task
598
            preferences: Name/Value pairs of scanner preferences.
599
600
        Returns:
601
            The response. See :py:meth:`send_command` for details.
602
        """
603
        return self.__create_task(
604
            name=name,
605
            config_id=config_id,
606
            target_id=target_id,
607
            scanner_id=scanner_id,
608
            usage_type=UsageType.SCAN,
609
            function=self.create_task.__name__,
610
            alterable=alterable,
611
            hosts_ordering=hosts_ordering,
612
            schedule_id=schedule_id,
613
            alert_ids=alert_ids,
614
            comment=comment,
615
            schedule_periods=schedule_periods,
616
            observers=observers,
617
            preferences=preferences,
618
        )
619
620 View Code Duplication
    def create_tls_certificate(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
621
        self,
622
        name: str,
623
        certificate: str,
624
        *,
625
        comment: Optional[str] = None,
626
        trust: Optional[bool] = None,
627
    ) -> Any:
628
        """Create a new TLS certificate
629
630
        Arguments:
631
            name: Name of the TLS certificate, defaulting to the MD5
632
                fingerprint.
633
            certificate: The Base64 encoded certificate data (x.509 DER or PEM).
634
            comment: Comment for the TLS certificate.
635
            trust: Whether the certificate is trusted.
636
637
        Returns:
638
            The response. See :py:meth:`send_command` for details.
639
        """
640
        if not name:
641
            raise RequiredArgument(
642
                function=self.create_tls_certificate.__name__, argument='name'
643
            )
644
        if not certificate:
645
            raise RequiredArgument(
646
                function=self.create_tls_certificate.__name__,
647
                argument='certificate',
648
            )
649
650
        cmd = XmlCommand("create_tls_certificate")
651
652
        if comment:
653
            cmd.add_element("comment", comment)
654
655
        cmd.add_element("name", name)
656
        cmd.add_element("certificate", certificate)
657
658
        if trust:
659
            cmd.add_element("trust", _to_bool(trust))
660
661
        return self._send_xml_command(cmd)
662
663
    def get_aggregates(
664
        self,
665
        resource_type: EntityType,
666
        *,
667
        filter: Optional[str] = None,
668
        filter_id: Optional[str] = None,
669
        sort_criteria: Optional[list] = None,
670
        data_columns: Optional[list] = None,
671
        group_column: Optional[str] = None,
672
        subgroup_column: Optional[str] = None,
673
        text_columns: Optional[list] = None,
674
        first_group: Optional[int] = None,
675
        max_groups: Optional[int] = None,
676
        mode: Optional[int] = None,
677
        **kwargs,
678
    ) -> Any:
679
        """Request aggregated information on a resource / entity type
680
681
        Additional arguments can be set via the kwargs parameter for backward
682
        compatibility with older versions of python-gvm, but are not validated.
683
684
        Arguments:
685
            resource_type: The entity type to gather data from
686
            filter: Filter term to use for the query
687
            filter_id: UUID of an existing filter to use for the query
688
            sort_criteria: List of sort criteria (dicts that can contain
689
                a field, stat and order)
690
            data_columns: List of fields to aggregate data from
691
            group_column: The field to group the entities by
692
            subgroup_column: The field to further group the entities
693
                inside groups by
694
            text_columns: List of simple text columns which no statistics
695
                are calculated for
696
            first_group: The index of the first aggregate group to return
697
            max_groups: The maximum number of aggregate groups to return,
698
                -1 for all
699
            mode: Special mode for aggregation
700
701
        Returns:
702
            The response. See :py:meth:`send_command` for details.
703
        """
704
        if not resource_type:
705
            raise RequiredArgument(
706
                function=self.get_aggregates.__name__, argument='resource_type'
707
            )
708
709
        if not isinstance(resource_type, self.types.EntityType):
710
            raise InvalidArgumentType(
711
                function=self.get_aggregates.__name__,
712
                argument='resource_type',
713
                arg_type=self.types.EntityType.__name__,
714
            )
715
716
        cmd = XmlCommand('get_aggregates')
717
718
        _actual_resource_type = resource_type
719
        if resource_type.value == EntityType.AUDIT.value:
720
            _actual_resource_type = EntityType.TASK
721
            cmd.set_attribute('usage_type', 'audit')
722
        elif resource_type.value == EntityType.POLICY.value:
723
            _actual_resource_type = EntityType.SCAN_CONFIG
724
            cmd.set_attribute('usage_type', 'policy')
725
        elif resource_type.value == EntityType.SCAN_CONFIG.value:
726
            cmd.set_attribute('usage_type', 'scan')
727
        elif resource_type.value == EntityType.TASK.value:
728
            cmd.set_attribute('usage_type', 'scan')
729
        cmd.set_attribute('type', _actual_resource_type.value)
730
731
        _add_filter(cmd, filter, filter_id)
732
733
        if first_group is not None:
734
            if not isinstance(first_group, int):
735
                raise InvalidArgumentType(
736
                    function=self.get_aggregates.__name__,
737
                    argument='first_group',
738
                    arg_type=int.__name__,
739
                )
740
            cmd.set_attribute('first_group', str(first_group))
741
742
        if max_groups is not None:
743
            if not isinstance(max_groups, int):
744
                raise InvalidArgumentType(
745
                    function=self.get_aggregates.__name__,
746
                    argument='max_groups',
747
                    arg_type=int.__name__,
748
                )
749
            cmd.set_attribute('max_groups', str(max_groups))
750
751
        if sort_criteria is not None:
752
            if not isinstance(sort_criteria, list):
753
                raise InvalidArgumentType(
754
                    function=self.get_aggregates.__name__,
755
                    argument='sort_criteria',
756
                    arg_type=list.__name__,
757
                )
758
            for sort in sort_criteria:
759
                if not isinstance(sort, dict):
760
                    raise InvalidArgumentType(
761
                        function=self.get_aggregates.__name__,
762
                        argument='sort_criteria',
763
                    )
764
765
                sort_elem = cmd.add_element('sort')
766
                if sort.get('field'):
767
                    sort_elem.set_attribute('field', sort.get('field'))
768
769
                if sort.get('stat'):
770
                    if isinstance(sort['stat'], AggregateStatistic):
771
                        sort_elem.set_attribute('stat', sort['stat'].value)
772
                    else:
773
                        stat = get_aggregate_statistic_from_string(sort['stat'])
774
                        sort_elem.set_attribute('stat', stat.value)
775
776
                if sort.get('order'):
777
                    if isinstance(sort['order'], SortOrder):
778
                        sort_elem.set_attribute('order', sort['order'].value)
779
                    else:
780
                        so = get_sort_order_from_string(sort['order'])
781
                        sort_elem.set_attribute('order', so.value)
782
783
        if data_columns is not None:
784
            if not isinstance(data_columns, list):
785
                raise InvalidArgumentType(
786
                    function=self.get_aggregates.__name__,
787
                    argument='data_columns',
788
                    arg_type=list.__name__,
789
                )
790
            for column in data_columns:
791
                cmd.add_element('data_column', column)
792
793
        if group_column is not None:
794
            cmd.set_attribute('group_column', group_column)
795
796
        if subgroup_column is not None:
797
            if not group_column:
798
                raise RequiredArgument(
799
                    '{} requires a group_column argument'
800
                    ' if subgroup_column is given'.format(
801
                        self.get_aggregates.__name__
802
                    ),
803
                    function=self.get_aggregates.__name__,
804
                    argument='subgroup_column',
805
                )
806
            cmd.set_attribute('subgroup_column', subgroup_column)
807
808
        if text_columns is not None:
809
            if not isinstance(text_columns, list):
810
                raise InvalidArgumentType(
811
                    function=self.get_aggregates.__name__,
812
                    argument='text_columns',
813
                    arg_type=list.__name__,
814
                )
815
            for column in text_columns:
816
                cmd.add_element('text_column', column)
817
818
        if mode is not None:
819
            cmd.set_attribute('mode', mode)
820
821
        # Add additional keyword args as attributes for backward compatibility.
822
        cmd.set_attributes(kwargs)
823
824
        return self._send_xml_command(cmd)
825
826 View Code Duplication
    def get_tls_certificates(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
827
        self,
828
        *,
829
        filter: Optional[str] = None,
830
        filter_id: Optional[str] = None,
831
        include_certificate_data: Optional[bool] = None,
832
        details: Optional[bool] = None,
833
    ) -> Any:
834
        """Request a list of TLS certificates
835
836
        Arguments:
837
            filter: Filter term to use for the query
838
            filter_id: UUID of an existing filter to use for the query
839
            include_certificate_data: Whether to include the certificate data in
840
                the response
841
842
        Returns:
843
            The response. See :py:meth:`send_command` for details.
844
        """
845
846
        cmd = XmlCommand("get_tls_certificates")
847
848
        _add_filter(cmd, filter, filter_id)
849
850
        if details is not None:
851
            cmd.set_attribute("details", _to_bool(details))
852
853
        if include_certificate_data is not None:
854
            cmd.set_attribute(
855
                "include_certificate_data", _to_bool(include_certificate_data)
856
            )
857
858
        return self._send_xml_command(cmd)
859
860
    def get_tls_certificate(self, tls_certificate_id: str) -> Any:
861
        """Request a single TLS certificate
862
863
        Arguments:
864
            tls_certificate_id: UUID of an existing TLS certificate
865
866
        Returns:
867
            The response. See :py:meth:`send_command` for details.
868
        """
869
        cmd = XmlCommand("get_tls_certificates")
870
871
        if not tls_certificate_id:
872
            raise RequiredArgument(
873
                function=self.get_tls_certificate.__name__,
874
                argument='tls_certificate_id',
875
            )
876
877
        cmd.set_attribute("tls_certificate_id", tls_certificate_id)
878
879
        # for single tls certificate always request cert data
880
        cmd.set_attribute("include_certificate_data", "1")
881
882
        # for single entity always request all details
883
        cmd.set_attribute("details", "1")
884
885
        return self._send_xml_command(cmd)
886
887 View Code Duplication
    def modify_alert(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
888
        self,
889
        alert_id: str,
890
        *,
891
        name: Optional[str] = None,
892
        comment: Optional[str] = None,
893
        filter_id: Optional[str] = None,
894
        event: Optional[AlertEvent] = None,
895
        event_data: Optional[dict] = None,
896
        condition: Optional[AlertCondition] = None,
897
        condition_data: Optional[dict] = None,
898
        method: Optional[AlertMethod] = None,
899
        method_data: Optional[dict] = None,
900
    ) -> Any:
901
        """Modifies an existing alert.
902
903
        Arguments:
904
            alert_id: UUID of the alert to be modified.
905
            name: Name of the Alert.
906
            condition: The condition that must be satisfied for the alert to
907
                occur. If the event is either 'Updated SecInfo
908
                arrived' or 'New SecInfo arrived', condition must be 'Always'.
909
                Otherwise, condition can also be on of 'Severity at least',
910
                'Filter count changed' or 'Filter count at least'.
911
            condition_data: Data that defines the condition
912
            event: The event that must happen for the alert to occur, one of
913
                'Task run status changed', 'Updated SecInfo arrived' or
914
                'New SecInfo arrived'
915
            event_data: Data that defines the event
916
            method: The method by which the user is alerted, one of 'SCP',
917
                'Send', 'SMB', 'SNMP', 'Syslog' or 'Email';
918
                if the event is neither 'Updated SecInfo arrived' nor
919
                'New SecInfo arrived', method can also be one of 'Start Task',
920
                'HTTP Get', 'Sourcefire Connector' or 'verinice Connector'.
921
            method_data: Data that defines the method
922
            filter_id: Filter to apply when executing alert
923
            comment: Comment for the alert
924
925
        Returns:
926
            The response. See :py:meth:`send_command` for details.
927
        """
928
929
        if not alert_id:
930
            raise RequiredArgument(
931
                function=self.modify_alert.__name__, argument='alert_id'
932
            )
933
934
        cmd = XmlCommand("modify_alert")
935
        cmd.set_attribute("alert_id", str(alert_id))
936
937
        if name:
938
            cmd.add_element("name", name)
939
940
        if comment:
941
            cmd.add_element("comment", comment)
942
943
        if filter_id:
944
            cmd.add_element("filter", attrs={"id": filter_id})
945
946
        if condition:
947
            if not isinstance(condition, AlertCondition):
948
                raise InvalidArgumentType(
949
                    function=self.modify_alert.__name__,
950
                    argument='condition',
951
                    arg_type=AlertCondition.__name__,
952
                )
953
954
            conditions = cmd.add_element("condition", condition.value)
955
956
            if condition_data is not None:
957
                for key, value in condition_data.items():
958
                    _data = conditions.add_element("data", value)
959
                    _data.add_element("name", key)
960
961
        if method:
962
            if not isinstance(method, AlertMethod):
963
                raise InvalidArgumentType(
964
                    function=self.modify_alert.__name__,
965
                    argument='method',
966
                    arg_type=AlertMethod.__name__,
967
                )
968
969
            methods = cmd.add_element("method", method.value)
970
971
            if method_data is not None:
972
                for key, value in method_data.items():
973
                    _data = methods.add_element("data", value)
974
                    _data.add_element("name", key)
975
976
        if event:
977
            if not isinstance(event, AlertEvent):
978
                raise InvalidArgumentType(
979
                    function=self.modify_alert.__name__,
980
                    argument='event',
981
                    arg_type=AlertEvent.__name__,
982
                )
983
984
            _check_event(event, condition, method)
985
986
            events = cmd.add_element("event", event.value)
987
988
            if event_data is not None:
989
                for key, value in event_data.items():
990
                    _data = events.add_element("data", value)
991
                    _data.add_element("name", key)
992
993
        return self._send_xml_command(cmd)
994
995
    def modify_audit(
996
        self,
997
        audit_id: str,
998
        *,
999
        name: Optional[str] = None,
1000
        policy_id: Optional[str] = None,
1001
        target_id: Optional[str] = None,
1002
        scanner_id: Optional[str] = None,
1003
        alterable: Optional[bool] = None,
1004
        hosts_ordering: Optional[HostsOrdering] = None,
1005
        schedule_id: Optional[str] = None,
1006
        schedule_periods: Optional[int] = None,
1007
        comment: Optional[str] = None,
1008
        alert_ids: Optional[List[str]] = None,
1009
        observers: Optional[List[str]] = None,
1010
        preferences: Optional[dict] = None,
1011
    ) -> Any:
1012
        """Modifies an existing task.
1013
1014
        Arguments:
1015
            audit_id: UUID of audit to modify.
1016
            name: The name of the audit.
1017
            policy_id: UUID of policy to use by the audit
1018
            target_id: UUID of target to be scanned
1019
            scanner_id: UUID of scanner to use for scanning the target
1020
            comment: The comment on the audit.
1021
            alert_ids: List of UUIDs for alerts to be applied to the audit
1022
            hosts_ordering: The order hosts are scanned in
1023
            schedule_id: UUID of a schedule when the audit should be run.
1024
            schedule_periods: A limit to the number of times the audit will be
1025
                scheduled, or 0 for no limit.
1026
            observers: List of names or ids of users which should be allowed to
1027
                observe this audit
1028
            preferences: Name/Value pairs of scanner preferences.
1029
1030
        Returns:
1031
            The response. See :py:meth:`send_command` for details.
1032
        """
1033
        self.modify_task(
1034
            task_id=audit_id,
1035
            name=name,
1036
            config_id=policy_id,
1037
            target_id=target_id,
1038
            scanner_id=scanner_id,
1039
            alterable=alterable,
1040
            hosts_ordering=hosts_ordering,
1041
            schedule_id=schedule_id,
1042
            schedule_periods=schedule_periods,
1043
            comment=comment,
1044
            alert_ids=alert_ids,
1045
            observers=observers,
1046
            preferences=preferences,
1047
        )
1048
1049
    def modify_permission(
1050
        self,
1051
        permission_id: str,
1052
        *,
1053
        comment: Optional[str] = None,
1054
        name: Optional[str] = None,
1055
        resource_id: Optional[str] = None,
1056
        resource_type: Optional[EntityType] = None,
1057
        subject_id: Optional[str] = None,
1058
        subject_type: Optional[PermissionSubjectType] = None,
1059
    ) -> Any:
1060
        """Modifies an existing permission.
1061
1062
        Arguments:
1063
            permission_id: UUID of permission to be modified.
1064
            comment: The comment on the permission.
1065
            name: Permission name, currently the name of a command.
1066
            subject_id: UUID of subject to whom the permission is granted
1067
            subject_type: Type of the subject user, group or role
1068
            resource_id: UUID of entity to which the permission applies
1069
            resource_type: Type of the resource. For Super permissions user,
1070
                group or role
1071
1072
        Returns:
1073
            The response. See :py:meth:`send_command` for details.
1074
        """
1075
        if not permission_id:
1076
            raise RequiredArgument(
1077
                function=self.modify_permission.__name__,
1078
                argument='permission_id',
1079
            )
1080
1081
        cmd = XmlCommand("modify_permission")
1082
        cmd.set_attribute("permission_id", permission_id)
1083
1084
        if comment:
1085
            cmd.add_element("comment", comment)
1086
1087
        if name:
1088
            cmd.add_element("name", name)
1089
1090 View Code Duplication
        if resource_id or resource_type:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1091
            if not resource_id:
1092
                raise RequiredArgument(
1093
                    function=self.modify_permission.__name__,
1094
                    argument='resource_id',
1095
                )
1096
1097
            if not resource_type:
1098
                raise RequiredArgument(
1099
                    function=self.modify_permission.__name__,
1100
                    argument='resource_type',
1101
                )
1102
1103
            if not isinstance(resource_type, self.types.EntityType):
1104
                raise InvalidArgumentType(
1105
                    function=self.modify_permission.__name__,
1106
                    argument='resource_type',
1107
                    arg_type=self.types.EntityType.__name__,
1108
                )
1109
1110
            _xmlresource = cmd.add_element(
1111
                "resource", attrs={"id": resource_id}
1112
            )
1113
            _actual_resource_type = resource_type
1114
            if resource_type.value == EntityType.AUDIT.value:
1115
                _actual_resource_type = EntityType.TASK
1116
            elif resource_type.value == EntityType.POLICY.value:
1117
                _actual_resource_type = EntityType.SCAN_CONFIG
1118
            _xmlresource.add_element("type", _actual_resource_type.value)
1119
1120 View Code Duplication
        if subject_id or subject_type:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1121
            if not subject_id:
1122
                raise RequiredArgument(
1123
                    function=self.modify_permission.__name__,
1124
                    argument='subject_id',
1125
                )
1126
1127
            if not isinstance(subject_type, PermissionSubjectType):
1128
                raise InvalidArgumentType(
1129
                    function=self.modify_permission.__name__,
1130
                    argument='subject_type',
1131
                    arg_type=PermissionSubjectType.__name__,
1132
                )
1133
1134
            _xmlsubject = cmd.add_element("subject", attrs={"id": subject_id})
1135
            _xmlsubject.add_element("type", subject_type.value)
1136
1137
        return self._send_xml_command(cmd)
1138
1139
    def modify_policy_set_nvt_preference(
1140
        self,
1141
        policy_id: str,
1142
        name: str,
1143
        nvt_oid: str,
1144
        *,
1145
        value: Optional[str] = None,
1146
    ) -> Any:
1147
        """Modifies the nvt preferences of an existing policy.
1148
1149
        Arguments:
1150
            policy_id: UUID of policy to modify.
1151
            name: Name for preference to change.
1152
            nvt_oid: OID of the NVT associated with preference to modify
1153
            value: New value for the preference. None to delete the preference
1154
                and to use the default instead.
1155
        """
1156
        self.modify_config_set_nvt_preference(
1157
            config_id=policy_id, name=name, nvt_oid=nvt_oid, value=value
1158
        )
1159
1160
    def modify_policy_set_name(self, policy_id: str, name: str) -> Any:
1161
        """Modifies the name of an existing policy
1162
1163
        Arguments:
1164
            config_id: UUID of policy to modify.
1165
            name: New name for the config.
1166
        """
1167
        self.modify_config_set_name(config_id=policy_id, name=name)
1168
1169
    def modify_policy_set_comment(
1170
        self, policy_id: str, comment: Optional[str] = ""
1171
    ) -> Any:
1172
        """Modifies the comment of an existing policy
1173
1174
        Arguments:
1175
            policy_id: UUID of policy to modify.
1176
            comment: Comment to set on a config. Default: ''
1177
        """
1178
        self.modify_config_set_comment(config_id=policy_id, comment=comment)
1179
1180
    def modify_policy_set_scanner_preference(
1181
        self, policy_id: str, name: str, *, value: Optional[str] = None
1182
    ) -> Any:
1183
        """Modifies the scanner preferences of an existing policy
1184
1185
        Arguments:
1186
            policy_id: UUID of policy to modify.
1187
            name: Name of the scanner preference to change
1188
            value: New value for the preference. None to delete the preference
1189
                and to use the default instead.
1190
1191
        """
1192
        self.modify_config_set_scanner_preference(
1193
            config_id=policy_id, name=name, value=value
1194
        )
1195
1196
    def modify_policy_set_nvt_selection(
1197
        self, policy_id: str, family: str, nvt_oids: List[str]
1198
    ) -> Any:
1199
        """Modifies the selected nvts of an existing policy
1200
1201
        The manager updates the given family in the config to include only the
1202
        given NVTs.
1203
1204
        Arguments:
1205
            policy_id: UUID of policy to modify.
1206
            family: Name of the NVT family to include NVTs from
1207
            nvt_oids: List of NVTs to select for the family.
1208
        """
1209
        self.modify_config_set_nvt_selection(
1210
            config_id=policy_id, family=family, nvt_oids=nvt_oids
1211
        )
1212
1213
    def modify_policy_set_family_selection(
1214
        self,
1215
        policy_id: str,
1216
        families: List[Tuple[str, bool, bool]],
1217
        *,
1218
        auto_add_new_families: Optional[bool] = True,
1219
    ) -> Any:
1220
        """
1221
        Selected the NVTs of a policy at a family level.
1222
1223
        Arguments:
1224
            policy_id: UUID of policy to modify.
1225
            families: A list of tuples with the first entry being the name
1226
                of the NVT family selected, second entry a boolean indicating
1227
                whether new NVTs should be added to the family automatically,
1228
                and third entry a boolean indicating whether all nvts from
1229
                the family should be included.
1230
            auto_add_new_families: Whether new families should be added to the
1231
                policy automatically. Default: True.
1232
        """
1233
        self.modify_config_set_family_selection(
1234
            config_id=policy_id,
1235
            families=families,
1236
            auto_add_new_families=auto_add_new_families,
1237
        )
1238
1239
    def modify_tag(
1240
        self,
1241
        tag_id: str,
1242
        *,
1243
        comment: Optional[str] = None,
1244
        name: Optional[str] = None,
1245
        value=None,
1246
        active=None,
1247
        resource_action: Optional[str] = None,
1248
        resource_type: Optional[EntityType] = None,
1249
        resource_filter: Optional[str] = None,
1250
        resource_ids: Optional[List[str]] = None,
1251
    ) -> Any:
1252
        """Modifies an existing tag.
1253
1254
        Arguments:
1255
            tag_id: UUID of the tag.
1256
            comment: Comment to add to the tag.
1257
            name: Name of the tag.
1258
            value: Value of the tag.
1259
            active: Whether the tag is active.
1260
            resource_action: Whether to add or remove resources instead of
1261
                overwriting. One of '', 'add', 'set' or 'remove'.
1262
            resource_type: Type of the resources to which to attach the tag.
1263
                Required if resource_filter is set.
1264
            resource_filter: Filter term to select resources the tag is to be
1265
                attached to.
1266
            resource_ids: IDs of the resources to which to attach the tag.
1267
1268
        Returns:
1269
            The response. See :py:meth:`send_command` for details.
1270
        """
1271
        if not tag_id:
1272
            raise RequiredArgument(
1273
                function=self.modify_tag.__name__, argument='tag_id'
1274
            )
1275
1276
        cmd = XmlCommand("modify_tag")
1277
        cmd.set_attribute("tag_id", str(tag_id))
1278
1279
        if comment:
1280
            cmd.add_element("comment", comment)
1281
1282
        if name:
1283
            cmd.add_element("name", name)
1284
1285
        if value:
1286
            cmd.add_element("value", value)
1287
1288
        if active is not None:
1289
            cmd.add_element("active", _to_bool(active))
1290
1291
        if resource_action or resource_filter or resource_ids or resource_type:
1292
            if resource_filter and not resource_type:
1293
                raise RequiredArgument(
1294
                    function=self.modify_tag.__name__, argument='resource_type'
1295
                )
1296
1297
            _xmlresources = cmd.add_element("resources")
1298
            if resource_action is not None:
1299
                _xmlresources.set_attribute("action", resource_action)
1300
1301
            if resource_filter is not None:
1302
                _xmlresources.set_attribute("filter", resource_filter)
1303
1304
            for resource_id in resource_ids or []:
1305
                _xmlresources.add_element(
1306
                    "resource", attrs={"id": str(resource_id)}
1307
                )
1308
1309
            if resource_type is not None:
1310
                if not isinstance(resource_type, self.types.EntityType):
1311
                    raise InvalidArgumentType(
1312
                        function=self.modify_tag.__name__,
1313
                        argument="resource_type",
1314
                        arg_type=EntityType.__name__,
1315
                    )
1316
                _actual_resource_type = resource_type
1317
                if resource_type.value == EntityType.AUDIT.value:
1318
                    _actual_resource_type = EntityType.TASK
1319
                elif resource_type.value == EntityType.POLICY.value:
1320
                    _actual_resource_type = EntityType.SCAN_CONFIG
1321
                _xmlresources.add_element("type", _actual_resource_type.value)
1322
1323
        return self._send_xml_command(cmd)
1324
1325 View Code Duplication
    def modify_tls_certificate(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1326
        self,
1327
        tls_certificate_id: str,
1328
        *,
1329
        name: Optional[str] = None,
1330
        comment: Optional[str] = None,
1331
        trust: Optional[bool] = None,
1332
    ) -> Any:
1333
        """Modifies an existing TLS certificate.
1334
1335
        Arguments:
1336
            tls_certificate_id: UUID of the TLS certificate to be modified.
1337
            name: Name of the TLS certificate, defaulting to the MD5 fingerprint
1338
            comment: Comment for the TLS certificate.
1339
            trust: Whether the certificate is trusted.
1340
1341
        Returns:
1342
            The response. See :py:meth:`send_command` for details.
1343
        """
1344
        if not tls_certificate_id:
1345
            raise RequiredArgument(
1346
                function=self.modify_tls_certificate.__name__,
1347
                argument='tls_certificate_id',
1348
            )
1349
1350
        cmd = XmlCommand("modify_tls_certificate")
1351
        cmd.set_attribute("tls_certificate_id", str(tls_certificate_id))
1352
1353
        if comment:
1354
            cmd.add_element("comment", comment)
1355
1356
        if name:
1357
            cmd.add_element("name", name)
1358
1359
        if trust:
1360
            cmd.add_element("trust", _to_bool(trust))
1361
1362
        return self._send_xml_command(cmd)
1363
1364
    def clone_tls_certificate(self, tls_certificate_id: str) -> Any:
1365
        """Modifies an existing TLS certificate.
1366
1367
        Arguments:
1368
            tls_certificate_id: The UUID of an existing TLS certificate
1369
1370
        Returns:
1371
            The response. See :py:meth:`send_command` for details.
1372
        """
1373
        if not tls_certificate_id:
1374
            raise RequiredArgument(
1375
                function=self.clone_tls_certificate.__name__,
1376
                argument='tls_certificate_id',
1377
            )
1378
1379
        cmd = XmlCommand("create_tls_certificate")
1380
1381
        cmd.add_element("copy", tls_certificate_id)
1382
1383
        return self._send_xml_command(cmd)
1384
1385
    def get_configs(
1386
        self,
1387
        *,
1388
        filter: Optional[str] = None,
1389
        filter_id: Optional[str] = None,
1390
        trash: Optional[bool] = None,
1391
        details: Optional[bool] = None,
1392
        families: Optional[bool] = None,
1393
        preferences: Optional[bool] = None,
1394
        tasks: Optional[bool] = None,
1395
    ) -> Any:
1396
        """Request a list of scan configs
1397
1398
        Arguments:
1399
            filter: Filter term to use for the query
1400
            filter_id: UUID of an existing filter to use for the query
1401
            trash: Whether to get the trashcan scan configs instead
1402
            details: Whether to get config families, preferences, nvt selectors
1403
                and tasks.
1404
            families: Whether to include the families if no details are
1405
                requested
1406
            preferences: Whether to include the preferences if no details are
1407
                requested
1408
            tasks: Whether to get tasks using this config
1409
1410
        Returns:
1411
            The response. See :py:meth:`send_command` for details.
1412
        """
1413
        return self.__get_configs(
1414
            UsageType.SCAN,
1415
            filter=filter,
1416
            filter_id=filter_id,
1417
            trash=trash,
1418
            details=details,
1419
            families=families,
1420
            preferences=preferences,
1421
            tasks=tasks,
1422
        )
1423
1424
    def get_policies(
1425
        self,
1426
        *,
1427
        audits: Optional[bool] = None,
1428
        filter: Optional[str] = None,
1429
        filter_id: Optional[str] = None,
1430
        details: Optional[bool] = None,
1431
        families: Optional[bool] = None,
1432
        preferences: Optional[bool] = None,
1433
        trash: Optional[bool] = None,
1434
    ) -> Any:
1435
        """Request a list of policies
1436
1437
        Arguments:
1438
            audits: Whether to get audits using the policy
1439
            filter: Filter term to use for the query
1440
            filter_id: UUID of an existing filter to use for the query
1441
            details: Whether to get  families, preferences, nvt selectors
1442
                and tasks.
1443
            families: Whether to include the families if no details are
1444
                requested
1445
            preferences: Whether to include the preferences if no details are
1446
                requested
1447
            trash: Whether to get the trashcan audits instead
1448
1449
        Returns:
1450
            The response. See :py:meth:`send_command` for details.
1451
        """
1452
        return self.__get_configs(
1453
            UsageType.POLICY,
1454
            filter=filter,
1455
            filter_id=filter_id,
1456
            details=details,
1457
            families=families,
1458
            preferences=preferences,
1459
            tasks=audits,
1460
            trash=trash,
1461
        )
1462
1463
    def get_config(
1464
        self, config_id: str, *, tasks: Optional[bool] = None
1465
    ) -> Any:
1466
        """Request a single scan config
1467
1468
        Arguments:
1469
            config_id: UUID of an existing scan config
1470
            tasks: Whether to get tasks using this config
1471
1472
        Returns:
1473
            The response. See :py:meth:`send_command` for details.
1474
        """
1475
        return self.__get_config(
1476
            config_id=config_id, usage_type=UsageType.SCAN, tasks=tasks
1477
        )
1478
1479
    def get_policy(
1480
        self, policy_id: str, *, audits: Optional[bool] = None
1481
    ) -> Any:
1482
        """Request a single policy
1483
1484
        Arguments:
1485
            policy_id: UUID of an existing policy
1486
            audits: Whether to get audits using this config
1487
1488
        Returns:
1489
            The response. See :py:meth:`send_command` for details.
1490
        """
1491
        return self.__get_config(policy_id, UsageType.POLICY, tasks=audits)
1492
1493
    def get_tasks(
1494
        self,
1495
        *,
1496
        filter: Optional[str] = None,
1497
        filter_id: Optional[str] = None,
1498
        trash: Optional[bool] = None,
1499
        details: Optional[bool] = None,
1500
        schedules_only: Optional[bool] = None,
1501
    ) -> Any:
1502
        """Request a list of tasks
1503
1504
        Arguments:
1505
            filter: Filter term to use for the query
1506
            filter_id: UUID of an existing filter to use for the query
1507
            trash: Whether to get the trashcan tasks instead
1508
            details: Whether to include full task details
1509
            schedules_only: Whether to only include id, name and schedule
1510
                details
1511
1512
        Returns:
1513
            The response. See :py:meth:`send_command` for details.
1514
        """
1515
        return self.__get_tasks(
1516
            UsageType.SCAN,
1517
            filter=filter,
1518
            filter_id=filter_id,
1519
            trash=trash,
1520
            details=details,
1521
            schedules_only=schedules_only,
1522
        )
1523
1524
    def get_audits(
1525
        self,
1526
        *,
1527
        filter: Optional[str] = None,
1528
        filter_id: Optional[str] = None,
1529
        trash: Optional[bool] = None,
1530
        details: Optional[bool] = None,
1531
        schedules_only: Optional[bool] = None,
1532
    ) -> Any:
1533
        """Request a list of audits
1534
1535
        Arguments:
1536
            filter: Filter term to use for the query
1537
            filter_id: UUID of an existing filter to use for the query
1538
            trash: Whether to get the trashcan audits instead
1539
            details: Whether to include full audit details
1540
            schedules_only: Whether to only include id, name and schedule
1541
                details
1542
1543
        Returns:
1544
            The response. See :py:meth:`send_command` for details.
1545
        """
1546
        return self.__get_tasks(
1547
            UsageType.AUDIT,
1548
            filter=filter,
1549
            filter_id=filter_id,
1550
            trash=trash,
1551
            details=details,
1552
            schedules_only=schedules_only,
1553
        )
1554
1555
    def get_task(self, task_id: str) -> Any:
1556
        """Request a single task
1557
1558
        Arguments:
1559
            task_id: UUID of an existing task
1560
1561
        Returns:
1562
            The response. See :py:meth:`send_command` for details.
1563
        """
1564
        return self.__get_task(task_id, UsageType.SCAN)
1565
1566
    def get_audit(self, audit_id: str) -> Any:
1567
        """Request a single audit
1568
1569
        Arguments:
1570
            audit_id: UUID of an existing audit
1571
1572
        Returns:
1573
            The response. See :py:meth:`send_command` for details.
1574
        """
1575
        return self.__get_task(audit_id, UsageType.AUDIT)
1576
1577
    def clone_audit(self, audit_id: str) -> Any:
1578
        """Clone an existing audit
1579
1580
        Arguments:
1581
            audit_id: UUID of existing audit to clone from
1582
1583
        Returns:
1584
            The response. See :py:meth:`send_command` for details.
1585
        """
1586
        if not audit_id:
1587
            raise RequiredArgument(
1588
                function=self.clone_audit.__name__, argument='audit_id'
1589
            )
1590
1591
        cmd = XmlCommand("create_task")
1592
        cmd.add_element("copy", audit_id)
1593
        return self._send_xml_command(cmd)
1594
1595
    def clone_policy(self, policy_id: str) -> Any:
1596
        """Clone a policy from an existing one
1597
1598
        Arguments:
1599
            policy_id: UUID of the existing policy
1600
1601
        Returns:
1602
            The response. See :py:meth:`send_command` for details.
1603
        """
1604
        if not policy_id:
1605
            raise RequiredArgument(
1606
                function=self.clone_policy.__name__, argument='policy_id'
1607
            )
1608
1609
        cmd = XmlCommand("create_config")
1610
        cmd.add_element("copy", policy_id)
1611
        return self._send_xml_command(cmd)
1612
1613
    def delete_audit(
1614
        self, audit_id: str, *, ultimate: Optional[bool] = False
1615
    ) -> Any:
1616
        """Deletes an existing audit
1617
1618
        Arguments:
1619
            audit_id: UUID of the audit to be deleted.
1620
            ultimate: Whether to remove entirely, or to the trashcan.
1621
        """
1622
        if not audit_id:
1623
            raise RequiredArgument(
1624
                function=self.delete_audit.__name__, argument='audit_id'
1625
            )
1626
1627
        cmd = XmlCommand("delete_task")
1628
        cmd.set_attribute("task_id", audit_id)
1629
        cmd.set_attribute("ultimate", _to_bool(ultimate))
1630
1631
        return self._send_xml_command(cmd)
1632
1633
    def delete_policy(
1634
        self, policy_id: str, *, ultimate: Optional[bool] = False
1635
    ) -> Any:
1636
        """Deletes an existing policy
1637
1638
        Arguments:
1639
            policy_id: UUID of the policy to be deleted.
1640
            ultimate: Whether to remove entirely, or to the trashcan.
1641
        """
1642
        if not policy_id:
1643
            raise RequiredArgument(
1644
                function=self.delete_policy.__name__, argument='policy_id'
1645
            )
1646
1647
        cmd = XmlCommand("delete_config")
1648
        cmd.set_attribute("config_id", policy_id)
1649
        cmd.set_attribute("ultimate", _to_bool(ultimate))
1650
1651
        return self._send_xml_command(cmd)
1652
1653
    def delete_tls_certificate(self, tls_certificate_id: str) -> Any:
1654
        """Deletes an existing tls certificate
1655
1656
        Arguments:
1657
            tls_certificate_id: UUID of the tls certificate to be deleted.
1658
        """
1659
        if not tls_certificate_id:
1660
            raise RequiredArgument(
1661
                function=self.delete_tls_certificate.__name__,
1662
                argument='tls_certificate_id',
1663
            )
1664
1665
        cmd = XmlCommand("delete_tls_certificate")
1666
        cmd.set_attribute("tls_certificate_id", tls_certificate_id)
1667
1668
        return self._send_xml_command(cmd)
1669
1670 View Code Duplication
    def __create_task(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1671
        self,
1672
        name: str,
1673
        config_id: str,
1674
        target_id: str,
1675
        scanner_id: str,
1676
        usage_type: UsageType,
1677
        function: str,
1678
        *,
1679
        alterable: Optional[bool] = None,
1680
        hosts_ordering: Optional[HostsOrdering] = None,
1681
        schedule_id: Optional[str] = None,
1682
        alert_ids: Optional[List[str]] = None,
1683
        comment: Optional[str] = None,
1684
        schedule_periods: Optional[int] = None,
1685
        observers: Optional[List[str]] = None,
1686
        preferences: Optional[dict] = None,
1687
    ) -> Any:
1688
        if not name:
1689
            raise RequiredArgument(function=function, argument='name')
1690
1691
        if not config_id:
1692
            raise RequiredArgument(function=function, argument='config_id')
1693
1694
        if not target_id:
1695
            raise RequiredArgument(function=function, argument='target_id')
1696
1697
        if not scanner_id:
1698
            raise RequiredArgument(function=function, argument='scanner_id')
1699
1700
        # don't allow to create a container task with create_task
1701
        if target_id == '0':
1702
            raise InvalidArgument(function=function, argument='target_id')
1703
1704
        cmd = XmlCommand("create_task")
1705
        cmd.add_element("name", name)
1706
        cmd.add_element("usage_type", usage_type.value)
1707
        cmd.add_element("config", attrs={"id": config_id})
1708
        cmd.add_element("target", attrs={"id": target_id})
1709
        cmd.add_element("scanner", attrs={"id": scanner_id})
1710
1711
        if comment:
1712
            cmd.add_element("comment", comment)
1713
1714
        if alterable is not None:
1715
            cmd.add_element("alterable", _to_bool(alterable))
1716
1717
        if hosts_ordering:
1718
            if not isinstance(hosts_ordering, self.types.HostsOrdering):
1719
                raise InvalidArgumentType(
1720
                    function=function,
1721
                    argument='hosts_ordering',
1722
                    arg_type=HostsOrdering.__name__,
1723
                )
1724
            cmd.add_element("hosts_ordering", hosts_ordering.value)
1725
1726
        if alert_ids:
1727
            if isinstance(alert_ids, str):
1728
                deprecation(
1729
                    "Please pass a list as alert_ids parameter to {}. "
1730
                    "Passing a string is deprecated and will be removed in "
1731
                    "future.".format(function)
1732
                )
1733
1734
                # if a single id is given as a string wrap it into a list
1735
                alert_ids = [alert_ids]
1736
            if _is_list_like(alert_ids):
1737
                # parse all given alert id's
1738
                for alert in alert_ids:
1739
                    cmd.add_element("alert", attrs={"id": str(alert)})
1740
1741
        if schedule_id:
1742
            cmd.add_element("schedule", attrs={"id": schedule_id})
1743
1744
            if schedule_periods is not None:
1745
                if (
1746
                    not isinstance(schedule_periods, numbers.Integral)
1747
                    or schedule_periods < 0
1748
                ):
1749
                    raise InvalidArgument(
1750
                        "schedule_periods must be an integer greater or equal "
1751
                        "than 0"
1752
                    )
1753
                cmd.add_element("schedule_periods", str(schedule_periods))
1754
1755
        if observers is not None:
1756
            if not _is_list_like(observers):
1757
                raise InvalidArgumentType(
1758
                    function=function, argument='observers', arg_type='list'
1759
                )
1760
1761
            # gvmd splits by comma and space
1762
            # gvmd tries to lookup each value as user name and afterwards as
1763
            # user id. So both user name and user id are possible
1764
            cmd.add_element("observers", _to_comma_list(observers))
1765
1766
        if preferences is not None:
1767
            if not isinstance(preferences, collections.abc.Mapping):
1768
                raise InvalidArgumentType(
1769
                    function=function,
1770
                    argument='preferences',
1771
                    arg_type=collections.abc.Mapping.__name__,
1772
                )
1773
1774
            _xmlprefs = cmd.add_element("preferences")
1775
            for pref_name, pref_value in preferences.items():
1776
                _xmlpref = _xmlprefs.add_element("preference")
1777
                _xmlpref.add_element("scanner_name", pref_name)
1778
                _xmlpref.add_element("value", str(pref_value))
1779
1780
        return self._send_xml_command(cmd)
1781
1782
    def __create_config(
1783
        self,
1784
        config_id: str,
1785
        name: str,
1786
        usage_type: UsageType,
1787
        function: str,
1788
        *,
1789
        comment: Optional[str] = None,
1790
    ) -> Any:
1791
        if not name:
1792
            raise RequiredArgument(function=function, argument='name')
1793
1794
        if not config_id:
1795
            raise RequiredArgument(function=function, argument='config_id')
1796
1797
        cmd = XmlCommand("create_config")
1798
        if comment is not None:
1799
            cmd.add_element("comment", comment)
1800
        cmd.add_element("copy", config_id)
1801
        cmd.add_element("name", name)
1802
        cmd.add_element("usage_type", usage_type.value)
1803
        return self._send_xml_command(cmd)
1804
1805
    def __create_config_from_osp_scanner(
1806
        self,
1807
        scanner_id: str,
1808
        name: str,
1809
        usage_type: UsageType,
1810
        function: str,
1811
        *,
1812
        comment: Optional[str] = None,
1813
    ) -> Any:
1814
        if not name:
1815
            raise RequiredArgument(function=function, argument='name')
1816
1817
        if not scanner_id:
1818
            raise RequiredArgument(function=function, argument='scanner_id')
1819
1820
        cmd = XmlCommand("create_config")
1821
        if comment is not None:
1822
            cmd.add_element("comment", comment)
1823
        cmd.add_element("scanner", scanner_id)
1824
        cmd.add_element("name", name)
1825
        cmd.add_element("usage_type", usage_type.value)
1826
        return self._send_xml_command(cmd)
1827
1828 View Code Duplication
    def __get_configs(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1829
        self,
1830
        usage_type: UsageType,
1831
        *,
1832
        filter: Optional[str] = None,
1833
        filter_id: Optional[str] = None,
1834
        trash: Optional[bool] = None,
1835
        details: Optional[bool] = None,
1836
        families: Optional[bool] = None,
1837
        preferences: Optional[bool] = None,
1838
        tasks: Optional[bool] = None,
1839
    ) -> Any:
1840
        cmd = XmlCommand("get_configs")
1841
        cmd.set_attribute("usage_type", usage_type.value)
1842
1843
        _add_filter(cmd, filter, filter_id)
1844
1845
        if trash is not None:
1846
            cmd.set_attribute("trash", _to_bool(trash))
1847
1848
        if details is not None:
1849
            cmd.set_attribute("details", _to_bool(details))
1850
1851
        if families is not None:
1852
            cmd.set_attribute("families", _to_bool(families))
1853
1854
        if preferences is not None:
1855
            cmd.set_attribute("preferences", _to_bool(preferences))
1856
1857
        if tasks is not None:
1858
            cmd.set_attribute("tasks", _to_bool(tasks))
1859
1860
        return self._send_xml_command(cmd)
1861
1862
    def __get_config(
1863
        self,
1864
        config_id: str,
1865
        usage_type: UsageType,
1866
        *,
1867
        tasks: Optional[bool] = None,
1868
    ) -> Any:
1869
        if not config_id:
1870
            raise RequiredArgument(
1871
                function=self.get_config.__name__, argument='config_id'
1872
            )
1873
1874
        cmd = XmlCommand("get_configs")
1875
        cmd.set_attribute("config_id", config_id)
1876
1877
        cmd.set_attribute("usage_type", usage_type.value)
1878
1879
        if tasks is not None:
1880
            cmd.set_attribute("tasks", _to_bool(tasks))
1881
1882
        # for single entity always request all details
1883
        cmd.set_attribute("details", "1")
1884
1885
        return self._send_xml_command(cmd)
1886
1887
    def __get_tasks(
1888
        self,
1889
        usage_type: UsageType,
1890
        *,
1891
        filter: Optional[str] = None,
1892
        filter_id: Optional[str] = None,
1893
        trash: Optional[bool] = None,
1894
        details: Optional[bool] = None,
1895
        schedules_only: Optional[bool] = None,
1896
    ) -> Any:
1897
        cmd = XmlCommand("get_tasks")
1898
        cmd.set_attribute("usage_type", usage_type.value)
1899
1900
        _add_filter(cmd, filter, filter_id)
1901
1902
        if trash is not None:
1903
            cmd.set_attribute("trash", _to_bool(trash))
1904
1905
        if details is not None:
1906
            cmd.set_attribute("details", _to_bool(details))
1907
1908
        if schedules_only is not None:
1909
            cmd.set_attribute("schedules_only", _to_bool(schedules_only))
1910
1911
        return self._send_xml_command(cmd)
1912
1913
    def __get_task(self, task_id: str, usage_type: UsageType) -> Any:
1914
        if not task_id:
1915
            raise RequiredArgument(
1916
                function=self.get_task.__name__, argument='task_id'
1917
            )
1918
1919
        cmd = XmlCommand("get_tasks")
1920
        cmd.set_attribute("task_id", task_id)
1921
        cmd.set_attribute("usage_type", usage_type.value)
1922
1923
        # for single entity always request all details
1924
        cmd.set_attribute("details", "1")
1925
        return self._send_xml_command(cmd)
1926
1927
    def resume_audit(self, audit_id: str) -> Any:
1928
        """Resume an existing stopped audit
1929
1930
        Arguments:
1931
            audit_id: UUID of the audit to be resumed
1932
1933
        Returns:
1934
            The response. See :py:meth:`send_command` for details.
1935
        """
1936
        if not audit_id:
1937
            raise RequiredArgument(
1938
                function=self.resume_audit.__name__, argument='audit_id'
1939
            )
1940
1941
        cmd = XmlCommand("resume_task")
1942
        cmd.set_attribute("task_id", audit_id)
1943
1944
        return self._send_xml_command(cmd)
1945
1946
    def start_audit(self, audit_id: str) -> Any:
1947
        """Start an existing audit
1948
1949
        Arguments:
1950
            audit_id: UUID of the audit to be started
1951
1952
        Returns:
1953
            The response. See :py:meth:`send_command` for details.
1954
        """
1955
        if not audit_id:
1956
            raise RequiredArgument(
1957
                function=self.start_audit.__name__, argument='audit_id'
1958
            )
1959
1960
        cmd = XmlCommand("start_task")
1961
        cmd.set_attribute("task_id", audit_id)
1962
1963
        return self._send_xml_command(cmd)
1964
1965
    def stop_audit(self, audit_id: str) -> Any:
1966
        """Stop an existing running audit
1967
1968
        Arguments:
1969
            audit_id: UUID of the audit to be stopped
1970
1971
        Returns:
1972
            The response. See :py:meth:`send_command` for details.
1973
        """
1974
        if not audit_id:
1975
            raise RequiredArgument(
1976
                function=self.stop_audit.__name__, argument='audit_id'
1977
            )
1978
1979
        cmd = XmlCommand("stop_task")
1980
        cmd.set_attribute("task_id", audit_id)
1981
1982
        return self._send_xml_command(cmd)
1983
1984
    def import_report(
1985
        self,
1986
        report: str,
1987
        *,
1988
        task_id: Optional[str] = None,
1989
        in_assets: Optional[bool] = None,
1990
    ) -> Any:
1991
        """Import a Report from XML
1992
1993
        Arguments:
1994
            report: Report XML as string to import. This XML must contain
1995
                a :code:`<report>` root element.
1996
            task_id: UUID of task to import report to
1997
            in_asset: Whether to create or update assets using the report
1998
1999
        Returns:
2000
            The response. See :py:meth:`send_command` for details.
2001
        """
2002
        if not report:
2003
            raise RequiredArgument(
2004
                function=self.import_report.__name__, argument='report'
2005
            )
2006
2007
        cmd = XmlCommand("create_report")
2008
2009
        if task_id:
2010
            cmd.add_element("task", attrs={"id": task_id})
2011
        else:
2012
            raise RequiredArgument(
2013
                function=self.import_report.__name__,
2014
                argument='task_id',
2015
            )
2016
2017
        if in_assets is not None:
2018
            cmd.add_element("in_assets", _to_bool(in_assets))
2019
2020
        try:
2021
            cmd.append_xml_str(report)
2022
        except etree.XMLSyntaxError as e:
2023
            raise InvalidArgument(
2024
                "Invalid xml passed as report to import_report {}".format(e)
2025
            ) from None
2026
2027
        return self._send_xml_command(cmd)
2028