Completed
Push — master ( fd2ed8...4d2b48 )
by Björn
15s queued 11s
created

GmpV9Mixin.modify_audit()   A

Complexity

Conditions 1

Size

Total Lines 52
Code Lines 30

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 30
nop 15
dl 0
loc 52
rs 9.16
c 0
b 0
f 0

How to fix   Long Method    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 - 2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
# pylint: disable=arguments-differ, redefined-builtin, too-many-lines
20
21
"""
22
Module for communication with gvmd in `Greenbone Management Protocol version 9`_
23
24
.. _Greenbone Management Protocol version 9:
25
    https://docs.greenbone.net/API/GMP/gmp-9.0.html
26
"""
27
import collections
28
import numbers
29
30
from typing import Any, List, Optional, Callable
31
32
from gvm.errors import InvalidArgument, InvalidArgumentType, RequiredArgument
33
from gvm.utils import deprecation
34
from gvm.xml import XmlCommand
35
36
from gvm.protocols.base import GvmProtocol
37
from gvm.connections import GvmConnection
38
from gvm.protocols.gmpv7.gmpv7 import (
39
    _to_bool,
40
    _add_filter,
41
    _is_list_like,
42
    _to_comma_list,
43
)
44
45
from . import types
46
from .types import *  # pylint: disable=unused-wildcard-import, wildcard-import
47
from .types import _UsageType as UsageType
48
49
_EMPTY_POLICY_ID = '085569ce-73ed-11df-83c3-002264764cea'
50
51
52
def _check_event(
53
    event: AlertEvent, condition: AlertCondition, method: AlertMethod
54
):
55
    if event == AlertEvent.TASK_RUN_STATUS_CHANGED:
56
        if not condition:
57
            raise RequiredArgument(
58
                "condition is required for event {}".format(event.name)
59
            )
60
61
        if not method:
62
            raise RequiredArgument(
63
                "method is required for event {}".format(event.name)
64
            )
65
66
        if condition not in (
67
            AlertCondition.ALWAYS,
68
            AlertCondition.FILTER_COUNT_CHANGED,
69
            AlertCondition.FILTER_COUNT_AT_LEAST,
70
            AlertCondition.SEVERITY_AT_LEAST,
71
            AlertCondition.SEVERITY_CHANGED,
72
        ):
73
            raise InvalidArgument(
74
                "Invalid condition {} for event {}".format(
75
                    condition.name, event.name
76
                )
77
            )
78
    elif event in (
79
        AlertEvent.NEW_SECINFO_ARRIVED,
80
        AlertEvent.UPDATED_SECINFO_ARRIVED,
81
    ):
82
        if not condition:
83
            raise RequiredArgument(
84
                "condition is required for event {}".format(event.name)
85
            )
86
87
        if not method:
88
            raise RequiredArgument(
89
                "method is required for event {}".format(event.name)
90
            )
91
92
        if condition != AlertCondition.ALWAYS:
93
            raise InvalidArgument(
94
                "Invalid condition {} for event {}".format(
95
                    condition.name, event.name
96
                )
97
            )
98
        if method not in (
99
            AlertMethod.SCP,
100
            AlertMethod.SEND,
101
            AlertMethod.SMB,
102
            AlertMethod.SNMP,
103
            AlertMethod.SYSLOG,
104
            AlertMethod.EMAIL,
105
        ):
106
            raise InvalidArgument(
107
                "Invalid method {} for event {}".format(method.name, event.name)
108
            )
109
    elif event in (
110
        AlertEvent.TICKET_RECEIVED,
111
        AlertEvent.OWNED_TICKET_CHANGED,
112
        AlertEvent.ASSIGNED_TICKET_CHANGED,
113
    ):
114
        if not condition:
115
            raise RequiredArgument(
116
                "condition is required for event {}".format(event.name)
117
            )
118
119
        if not method:
120
            raise RequiredArgument(
121
                "method is required for event {}".format(event.name)
122
            )
123
        if condition != AlertCondition.ALWAYS:
124
            raise InvalidArgument(
125
                "Invalid condition {} for event {}".format(
126
                    condition.name, event.name
127
                )
128
            )
129
        if method not in (
130
            AlertMethod.EMAIL,
131
            AlertMethod.START_TASK,
132
            AlertMethod.SYSLOG,
133
        ):
134
            raise InvalidArgument(
135
                "Invalid method {} for event {}".format(method.name, event.name)
136
            )
137
    elif event is not None:
138
        raise InvalidArgument('Invalid event "{}"'.format(event.name))
139
140
141
class GmpV9Mixin(GvmProtocol):
142
143
    types = types
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable types does not seem to be defined.
Loading history...
144
145
    def __init__(
146
        self,
147
        connection: GvmConnection,
148
        *,
149
        transform: Optional[Callable[[str], Any]] = None
150
    ):
151
        super().__init__(connection, transform=transform)
152
153
        # Is authenticated on gvmd
154
        self._authenticated = False
155
156 View Code Duplication
    def create_alert(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
157
        self,
158
        name: str,
159
        condition: AlertCondition,
160
        event: AlertEvent,
161
        method: AlertMethod,
162
        *,
163
        method_data: Optional[dict] = None,
164
        event_data: Optional[dict] = None,
165
        condition_data: Optional[dict] = None,
166
        filter_id: Optional[int] = None,
167
        comment: Optional[str] = None
168
    ) -> Any:
169
        """Create a new alert
170
171
        Arguments:
172
            name: Name of the new Alert
173
            condition: The condition that must be satisfied for the alert
174
                to occur; if the event is either 'Updated SecInfo arrived' or
175
                'New SecInfo arrived', condition must be 'Always'. Otherwise,
176
                condition can also be on of 'Severity at least', 'Filter count
177
                changed' or 'Filter count at least'.
178
            event: The event that must happen for the alert to occur, one
179
                of 'Task run status changed', 'Updated SecInfo arrived' or 'New
180
                SecInfo arrived'
181
            method: The method by which the user is alerted, one of 'SCP',
182
                'Send', 'SMB', 'SNMP', 'Syslog' or 'Email'; if the event is
183
                neither 'Updated SecInfo arrived' nor 'New SecInfo arrived',
184
                method can also be one of 'Start Task', 'HTTP Get', 'Sourcefire
185
                Connector' or 'verinice Connector'.
186
            condition_data: Data that defines the condition
187
            event_data: Data that defines the event
188
            method_data: Data that defines the method
189
            filter_id: Filter to apply when executing alert
190
            comment: Comment for the alert
191
192
        Returns:
193
            The response. See :py:meth:`send_command` for details.
194
        """
195
        if not name:
196
            raise RequiredArgument(
197
                function=self.create_alert.__name__, argument='name'
198
            )
199
200
        if not condition:
201
            raise RequiredArgument(
202
                function=self.create_alert.__name__, argument='condition'
203
            )
204
205
        if not event:
206
            raise RequiredArgument(
207
                function=self.create_alert.__name__, argument='event'
208
            )
209
210
        if not method:
211
            raise RequiredArgument(
212
                function=self.create_alert.__name__, argument='method'
213
            )
214
215
        if not isinstance(condition, AlertCondition):
216
            raise InvalidArgumentType(
217
                function=self.create_alert.__name__,
218
                argument='condition',
219
                arg_type=AlertCondition.__name__,
220
            )
221
222
        if not isinstance(event, AlertEvent):
223
            raise InvalidArgumentType(
224
                function=self.create_alert.__name__,
225
                argument='even',
226
                arg_type=AlertEvent.__name__,
227
            )
228
229
        if not isinstance(method, AlertMethod):
230
            raise InvalidArgumentType(
231
                function=self.create_alert.__name__,
232
                argument='method',
233
                arg_type=AlertMethod.__name__,
234
            )
235
236
        _check_event(event, condition, method)
237
238
        cmd = XmlCommand("create_alert")
239
        cmd.add_element("name", name)
240
241
        conditions = cmd.add_element("condition", condition.value)
242
243
        if condition_data is not None:
244
            for key, value in condition_data.items():
245
                _data = conditions.add_element("data", value)
246
                _data.add_element("name", key)
247
248
        events = cmd.add_element("event", event.value)
249
250
        if event_data is not None:
251
            for key, value in event_data.items():
252
                _data = events.add_element("data", value)
253
                _data.add_element("name", key)
254
255
        methods = cmd.add_element("method", method.value)
256
257
        if method_data is not None:
258
            for key, value in method_data.items():
259
                _data = methods.add_element("data", value)
260
                _data.add_element("name", key)
261
262
        if filter_id:
263
            cmd.add_element("filter", attrs={"id": filter_id})
264
265
        if comment:
266
            cmd.add_element("comment", comment)
267
268
        return self._send_xml_command(cmd)
269
270 View Code Duplication
    def create_audit(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
271
        self,
272
        name: str,
273
        policy_id: str,
274
        target_id: str,
275
        scanner_id: str,
276
        *,
277
        alterable: Optional[bool] = None,
278
        hosts_ordering: Optional[HostsOrdering] = None,
279
        schedule_id: Optional[str] = None,
280
        alert_ids: Optional[List[str]] = None,
281
        comment: Optional[str] = None,
282
        schedule_periods: Optional[int] = None,
283
        observers: Optional[List[str]] = None,
284
        preferences: Optional[dict] = None
285
    ) -> Any:
286
        """Create a new audit task
287
288
        Arguments:
289
            name: Name of the new audit
290
            policy_id: UUID of policy to use by the audit
291
            target_id: UUID of target to be scanned
292
            scanner_id: UUID of scanner to use for scanning the target
293
            comment: Comment for the audit
294
            alterable: Whether the task should be alterable
295
            alert_ids: List of UUIDs for alerts to be applied to the audit
296
            hosts_ordering: The order hosts are scanned in
297
            schedule_id: UUID of a schedule when the audit should be run.
298
            schedule_periods: A limit to the number of times the audit will be
299
                scheduled, or 0 for no limit
300
            observers: List of names or ids of users which should be allowed to
301
                observe this audit
302
            preferences: Name/Value pairs of scanner preferences.
303
304
        Returns:
305
            The response. See :py:meth:`send_command` for details.
306
        """
307
308
        return self.__create_task(
309
            name=name,
310
            config_id=policy_id,
311
            target_id=target_id,
312
            scanner_id=scanner_id,
313
            usage_type=UsageType.AUDIT,
314
            function=self.create_audit.__name__,
315
            alterable=alterable,
316
            hosts_ordering=hosts_ordering,
317
            schedule_id=schedule_id,
318
            alert_ids=alert_ids,
319
            comment=comment,
320
            schedule_periods=schedule_periods,
321
            observers=observers,
322
            preferences=preferences,
323
        )
324
325
    def create_config(
326
        self, config_id: str, name: str, *, comment: Optional[str] = None
327
    ) -> Any:
328
        """Create a new scan config
329
330
        Arguments:
331
            config_id: UUID of the existing scan config
332
            name: Name of the new scan config
333
            comment: A comment on the config
334
335
        Returns:
336
            The response. See :py:meth:`send_command` for details.
337
        """
338
        return self.__create_config(
339
            config_id=config_id,
340
            name=name,
341
            comment=comment,
342
            usage_type=UsageType.SCAN,
343
            function=self.create_config.__name__,
344
        )
345
346
    def create_config_from_osp_scanner(
347
        self, scanner_id: str, name: str, *, comment: Optional[str] = None
348
    ) -> Any:
349
        """Create a new scan config from an ospd scanner.
350
351
        Create config by retrieving the expected preferences from the given
352
        scanner via OSP.
353
354
        Arguments:
355
            scanner_id: UUID of an OSP scanner to get config data from
356
            name: Name of the new scan config
357
            comment: A comment on the config
358
359
        Returns:
360
            The response. See :py:meth:`send_command` for details.
361
        """
362
        return self.__create_config_from_osp_scanner(
363
            scanner_id=scanner_id,
364
            name=name,
365
            comment=comment,
366
            usage_type=UsageType.SCAN,
367
            function=self.create_config.__name__,
368
        )
369
370
    def create_policy(
371
        self, name: str, *, policy_id: str = None, comment: Optional[str] = None
372
    ) -> Any:
373
        """Create a new policy config
374
375
        Arguments:
376
            name: Name of the new policy
377
            policy_id: UUID of an existing policy as base. By default the empty
378
                policy is used.
379
            comment: A comment on the policy
380
381
        Returns:
382
            The response. See :py:meth:`send_command` for details.
383
        """
384
        if policy_id is None:
385
            policy_id = _EMPTY_POLICY_ID
386
        return self.__create_config(
387
            config_id=policy_id,
388
            name=name,
389
            comment=comment,
390
            usage_type=UsageType.POLICY,
391
            function=self.create_policy.__name__,
392
        )
393
394 View Code Duplication
    def create_task(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
395
        self,
396
        name: str,
397
        config_id: str,
398
        target_id: str,
399
        scanner_id: str,
400
        *,
401
        alterable: Optional[bool] = None,
402
        hosts_ordering: Optional[HostsOrdering] = None,
403
        schedule_id: Optional[str] = None,
404
        alert_ids: Optional[List[str]] = None,
405
        comment: Optional[str] = None,
406
        schedule_periods: Optional[int] = None,
407
        observers: Optional[List[str]] = None,
408
        preferences: Optional[dict] = None
409
    ) -> Any:
410
        """Create a new scan task
411
412
        Arguments:
413
            name: Name of the task
414
            config_id: UUID of scan config to use by the task
415
            target_id: UUID of target to be scanned
416
            scanner_id: UUID of scanner to use for scanning the target
417
            comment: Comment for the task
418
            alterable: Whether the task should be alterable
419
            alert_ids: List of UUIDs for alerts to be applied to the task
420
            hosts_ordering: The order hosts are scanned in
421
            schedule_id: UUID of a schedule when the task should be run.
422
            schedule_periods: A limit to the number of times the task will be
423
                scheduled, or 0 for no limit
424
            observers: List of names or ids of users which should be allowed to
425
                observe this task
426
            preferences: Name/Value pairs of scanner preferences.
427
428
        Returns:
429
            The response. See :py:meth:`send_command` for details.
430
        """
431
        return self.__create_task(
432
            name=name,
433
            config_id=config_id,
434
            target_id=target_id,
435
            scanner_id=scanner_id,
436
            usage_type=UsageType.SCAN,
437
            function=self.create_task.__name__,
438
            alterable=alterable,
439
            hosts_ordering=hosts_ordering,
440
            schedule_id=schedule_id,
441
            alert_ids=alert_ids,
442
            comment=comment,
443
            schedule_periods=schedule_periods,
444
            observers=observers,
445
            preferences=preferences,
446
        )
447
448 View Code Duplication
    def create_tls_certificate(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
449
        self,
450
        name: str,
451
        certificate: str,
452
        *,
453
        comment: Optional[str] = None,
454
        trust: Optional[bool] = None
455
    ) -> Any:
456
        """Create a new TLS certificate
457
458
        Arguments:
459
            name: Name of the TLS certificate, defaulting to the MD5
460
                fingerprint.
461
            certificate: The Base64 encoded certificate data (x.509 DER or PEM).
462
            comment: Comment for the TLS certificate.
463
            trust: Whether the certificate is trusted.
464
465
        Returns:
466
            The response. See :py:meth:`send_command` for details.
467
        """
468
        if not name:
469
            raise RequiredArgument(
470
                function=self.create_tls_certificate.__name__, argument='name'
471
            )
472
        if not certificate:
473
            raise RequiredArgument(
474
                function=self.create_tls_certificate.__name__,
475
                argument='certificate',
476
            )
477
478
        cmd = XmlCommand("create_tls_certificate")
479
480
        if comment:
481
            cmd.add_element("comment", comment)
482
483
        cmd.add_element("name", name)
484
        cmd.add_element("certificate", certificate)
485
486
        if trust:
487
            cmd.add_element("trust", _to_bool(trust))
488
489
        return self._send_xml_command(cmd)
490
491 View Code Duplication
    def get_tls_certificates(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
492
        self,
493
        *,
494
        filter: Optional[str] = None,
495
        filter_id: Optional[str] = None,
496
        include_certificate_data: Optional[bool] = None,
497
        details: Optional[bool] = None
498
    ) -> Any:
499
        """Request a list of TLS certificates
500
501
        Arguments:
502
            filter: Filter term to use for the query
503
            filter_id: UUID of an existing filter to use for the query
504
            include_certificate_data: Whether to include the certificate data in
505
                the response
506
507
        Returns:
508
            The response. See :py:meth:`send_command` for details.
509
        """
510
511
        cmd = XmlCommand("get_tls_certificates")
512
513
        _add_filter(cmd, filter, filter_id)
514
515
        if details is not None:
516
            cmd.set_attribute("details", _to_bool(details))
517
518
        if include_certificate_data is not None:
519
            cmd.set_attribute(
520
                "include_certificate_data", _to_bool(include_certificate_data)
521
            )
522
523
        return self._send_xml_command(cmd)
524
525
    def get_tls_certificate(self, tls_certificate_id: str) -> Any:
526
        """Request a single TLS certificate
527
528
        Arguments:
529
            tls_certificate_id: UUID of an existing TLS certificate
530
531
        Returns:
532
            The response. See :py:meth:`send_command` for details.
533
        """
534
        cmd = XmlCommand("get_tls_certificates")
535
536
        if not tls_certificate_id:
537
            raise RequiredArgument(
538
                function=self.get_tls_certificate.__name__,
539
                argument='tls_certificate_id',
540
            )
541
542
        cmd.set_attribute("tls_certificate_id", tls_certificate_id)
543
544
        # for single tls certificate always request cert data
545
        cmd.set_attribute("include_certificate_data", "1")
546
547
        # for single entity always request all details
548
        cmd.set_attribute("details", "1")
549
550
        return self._send_xml_command(cmd)
551
552 View Code Duplication
    def modify_alert(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
553
        self,
554
        alert_id: str,
555
        *,
556
        name: Optional[str] = None,
557
        comment: Optional[str] = None,
558
        filter_id: Optional[str] = None,
559
        event: Optional[AlertEvent] = None,
560
        event_data: Optional[dict] = None,
561
        condition: Optional[AlertCondition] = None,
562
        condition_data: Optional[dict] = None,
563
        method: Optional[AlertMethod] = None,
564
        method_data: Optional[dict] = None
565
    ) -> Any:
566
        """Modifies an existing alert.
567
568
        Arguments:
569
            alert_id: UUID of the alert to be modified.
570
            name: Name of the Alert.
571
            condition: The condition that must be satisfied for the alert to
572
                occur. If the event is either 'Updated SecInfo
573
                arrived' or 'New SecInfo arrived', condition must be 'Always'.
574
                Otherwise, condition can also be on of 'Severity at least',
575
                'Filter count changed' or 'Filter count at least'.
576
            condition_data: Data that defines the condition
577
            event: The event that must happen for the alert to occur, one of
578
                'Task run status changed', 'Updated SecInfo arrived' or
579
                'New SecInfo arrived'
580
            event_data: Data that defines the event
581
            method: The method by which the user is alerted, one of 'SCP',
582
                'Send', 'SMB', 'SNMP', 'Syslog' or 'Email';
583
                if the event is neither 'Updated SecInfo arrived' nor
584
                'New SecInfo arrived', method can also be one of 'Start Task',
585
                'HTTP Get', 'Sourcefire Connector' or 'verinice Connector'.
586
            method_data: Data that defines the method
587
            filter_id: Filter to apply when executing alert
588
            comment: Comment for the alert
589
590
        Returns:
591
            The response. See :py:meth:`send_command` for details.
592
        """
593
594
        if not alert_id:
595
            raise RequiredArgument(
596
                function=self.modify_alert.__name__, argument='alert_id'
597
            )
598
599
        cmd = XmlCommand("modify_alert")
600
        cmd.set_attribute("alert_id", str(alert_id))
601
602
        if name:
603
            cmd.add_element("name", name)
604
605
        if comment:
606
            cmd.add_element("comment", comment)
607
608
        if filter_id:
609
            cmd.add_element("filter", attrs={"id": filter_id})
610
611
        if condition:
612
            if not isinstance(condition, AlertCondition):
613
                raise InvalidArgumentType(
614
                    function=self.modify_alert.__name__,
615
                    argument='condition',
616
                    arg_type=AlertCondition.__name__,
617
                )
618
619
            conditions = cmd.add_element("condition", condition.value)
620
621
            if condition_data is not None:
622
                for key, value in condition_data.items():
623
                    _data = conditions.add_element("data", value)
624
                    _data.add_element("name", key)
625
626
        if method:
627
            if not isinstance(method, AlertMethod):
628
                raise InvalidArgumentType(
629
                    function=self.modify_alert.__name__,
630
                    argument='method',
631
                    arg_type=AlertMethod.__name__,
632
                )
633
634
            methods = cmd.add_element("method", method.value)
635
636
            if method_data is not None:
637
                for key, value in method_data.items():
638
                    _data = methods.add_element("data", value)
639
                    _data.add_element("name", key)
640
641
        if event:
642
            if not isinstance(event, AlertEvent):
643
                raise InvalidArgumentType(
644
                    function=self.modify_alert.__name__,
645
                    argument='event',
646
                    arg_type=AlertEvent.__name__,
647
                )
648
649
            _check_event(event, condition, method)
650
651
            events = cmd.add_element("event", event.value)
652
653
            if event_data is not None:
654
                for key, value in event_data.items():
655
                    _data = events.add_element("data", value)
656
                    _data.add_element("name", key)
657
658
        return self._send_xml_command(cmd)
659
660
    def modify_audit(
661
        self,
662
        audit_id: str,
663
        *,
664
        name: Optional[str] = None,
665
        policy_id: Optional[str] = None,
666
        target_id: Optional[str] = None,
667
        scanner_id: Optional[str] = None,
668
        alterable: Optional[bool] = None,
669
        hosts_ordering: Optional[HostsOrdering] = None,
670
        schedule_id: Optional[str] = None,
671
        schedule_periods: Optional[int] = None,
672
        comment: Optional[str] = None,
673
        alert_ids: Optional[List[str]] = None,
674
        observers: Optional[List[str]] = None,
675
        preferences: Optional[dict] = None
676
    ) -> Any:
677
        """Modifies an existing task.
678
679
        Arguments:
680
            audit_id: UUID of audit to modify.
681
            name: The name of the audit.
682
            policy_id: UUID of policy to use by the audit
683
            target_id: UUID of target to be scanned
684
            scanner_id: UUID of scanner to use for scanning the target
685
            comment: The comment on the audit.
686
            alert_ids: List of UUIDs for alerts to be applied to the audit
687
            hosts_ordering: The order hosts are scanned in
688
            schedule_id: UUID of a schedule when the audit should be run.
689
            schedule_periods: A limit to the number of times the audit will be
690
                scheduled, or 0 for no limit.
691
            observers: List of names or ids of users which should be allowed to
692
                observe this audit
693
            preferences: Name/Value pairs of scanner preferences.
694
695
        Returns:
696
            The response. See :py:meth:`send_command` for details.
697
        """
698
        self.modify_task(
699
            task_id=audit_id,
700
            name=name,
701
            config_id=policy_id,
702
            target_id=target_id,
703
            scanner_id=scanner_id,
704
            alterable=alterable,
705
            hosts_ordering=hosts_ordering,
706
            schedule_id=schedule_id,
707
            schedule_periods=schedule_periods,
708
            comment=comment,
709
            alert_ids=alert_ids,
710
            observers=observers,
711
            preferences=preferences,
712
        )
713
714
    def modify_policy_set_nvt_preference(
715
        self,
716
        policy_id: str,
717
        name: str,
718
        nvt_oid: str,
719
        *,
720
        value: Optional[str] = None
721
    ) -> Any:
722
        """Modifies the nvt preferences of an existing policy.
723
724
        Arguments:
725
            policy_id: UUID of policy to modify.
726
            name: Name for preference to change.
727
            nvt_oid: OID of the NVT associated with preference to modify
728
            value: New value for the preference. None to delete the preference
729
                and to use the default instead.
730
        """
731
        self.modify_config_set_nvt_preference(
732
            config_id=policy_id,
733
            name=name,
734
            nvt_oid=nvt_oid,
735
            value=value,
736
        )
737
738
    def modify_policy_set_name(self, policy_id: str, name: str) -> Any:
739
        """Modifies the name of an existing policy
740
741
        Arguments:
742
            config_id: UUID of policy to modify.
743
            name: New name for the config.
744
        """
745
        self.modify_config_set_name(
746
            config_id=policy_id,
747
            name=name,
748
        )
749
750
    def modify_policy_set_comment(
751
        self, policy_id: str, comment: Optional[str] = ""
752
    ) -> Any:
753
        """Modifies the comment of an existing policy
754
755
        Arguments:
756
            policy_id: UUID of policy to modify.
757
            comment: Comment to set on a config. Default: ''
758
        """
759
        self.modify_config_set_comment(
760
            config_id=policy_id,
761
            comment=comment,
762
        )
763
764
    def modify_policy_set_scanner_preference(
765
        self, policy_id: str, name: str, *, value: Optional[str] = None
766
    ) -> Any:
767
        """Modifies the scanner preferences of an existing policy
768
769
        Arguments:
770
            policy_id: UUID of policy to modify.
771
            name: Name of the scanner preference to change
772
            value: New value for the preference. None to delete the preference
773
                and to use the default instead.
774
775
        """
776
        self.modify_config_set_scanner_preference(
777
            config_id=policy_id,
778
            name=name,
779
            value=value,
780
        )
781
782
    def modify_policy_set_nvt_selection(
783
        self, policy_id: str, family: str, nvt_oids: List[str]
784
    ) -> Any:
785
        """Modifies the selected nvts of an existing policy
786
787
        The manager updates the given family in the config to include only the
788
        given NVTs.
789
790
        Arguments:
791
            policy_id: UUID of policy to modify.
792
            family: Name of the NVT family to include NVTs from
793
            nvt_oids: List of NVTs to select for the family.
794
        """
795
        self.modify_config_set_nvt_selection(
796
            config_id=policy_id,
797
            family=family,
798
            nvt_oids=nvt_oids,
799
        )
800
801
    def modify_policy_set_family_selection(
802
        self,
803
        policy_id: str,
804
        families: List[str],
805
        *,
806
        auto_add_new_families: Optional[bool] = True,
807
        auto_add_new_nvts: Optional[bool] = True
808
    ) -> Any:
809
        """
810
        Selected the NVTs of a policy at a family level.
811
812
        Arguments:
813
            policy_id: UUID of policy to modify.
814
            families: List of NVT family names to select.
815
            auto_add_new_families: Whether new families should be added to the
816
                policy automatically. Default: True.
817
            auto_add_new_nvts: Whether new NVTs in the selected families should
818
                be added to the policy automatically. Default: True.
819
        """
820
        self.modify_config_set_family_selection(
821
            config_id=policy_id,
822
            families=families,
823
            auto_add_new_families=auto_add_new_families,
824
            auto_add_new_nvts=auto_add_new_nvts,
825
        )
826
827 View Code Duplication
    def modify_tls_certificate(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
828
        self,
829
        tls_certificate_id: str,
830
        *,
831
        name: Optional[str] = None,
832
        comment: Optional[str] = None,
833
        trust: Optional[bool] = None
834
    ) -> Any:
835
        """Modifies an existing TLS certificate.
836
837
        Arguments:
838
            tls_certificate_id: UUID of the TLS certificate to be modified.
839
            name: Name of the TLS certificate, defaulting to the MD5 fingerprint
840
            comment: Comment for the TLS certificate.
841
            trust: Whether the certificate is trusted.
842
843
        Returns:
844
            The response. See :py:meth:`send_command` for details.
845
        """
846
        if not tls_certificate_id:
847
            raise RequiredArgument(
848
                function=self.modify_tls_certificate.__name__,
849
                argument='tls_certificate_id',
850
            )
851
852
        cmd = XmlCommand("modify_tls_certificate")
853
        cmd.set_attribute("tls_certificate_id", str(tls_certificate_id))
854
855
        if comment:
856
            cmd.add_element("comment", comment)
857
858
        if name:
859
            cmd.add_element("name", name)
860
861
        if trust:
862
            cmd.add_element("trust", _to_bool(trust))
863
864
        return self._send_xml_command(cmd)
865
866
    def clone_tls_certificate(self, tls_certificate_id: str) -> Any:
867
        """Modifies an existing TLS certificate.
868
869
        Arguments:
870
            tls_certificate_id: The UUID of an existing TLS certificate
871
872
        Returns:
873
            The response. See :py:meth:`send_command` for details.
874
        """
875
        if not tls_certificate_id:
876
            raise RequiredArgument(
877
                function=self.modify_tls_certificate.__name__,
878
                argument='tls_certificate_id',
879
            )
880
881
        cmd = XmlCommand("create_tls_certificate")
882
883
        cmd.add_element("copy", tls_certificate_id)
884
885
        return self._send_xml_command(cmd)
886
887
    def get_configs(
888
        self,
889
        *,
890
        filter: Optional[str] = None,
891
        filter_id: Optional[str] = None,
892
        trash: Optional[bool] = None,
893
        details: Optional[bool] = None,
894
        families: Optional[bool] = None,
895
        preferences: Optional[bool] = None,
896
        tasks: Optional[bool] = None
897
    ) -> Any:
898
        """Request a list of scan configs
899
900
        Arguments:
901
            filter: Filter term to use for the query
902
            filter_id: UUID of an existing filter to use for the query
903
            trash: Whether to get the trashcan scan configs instead
904
            details: Whether to get config families, preferences, nvt selectors
905
                and tasks.
906
            families: Whether to include the families if no details are
907
                requested
908
            preferences: Whether to include the preferences if no details are
909
                requested
910
            tasks: Whether to get tasks using this config
911
912
        Returns:
913
            The response. See :py:meth:`send_command` for details.
914
        """
915
        return self.__get_configs(
916
            UsageType.SCAN,
917
            filter=filter,
918
            filter_id=filter_id,
919
            trash=trash,
920
            details=details,
921
            families=families,
922
            preferences=preferences,
923
            tasks=tasks,
924
        )
925
926
    def get_policies(
927
        self,
928
        *,
929
        audits: Optional[bool] = None,
930
        filter: Optional[str] = None,
931
        filter_id: Optional[str] = None,
932
        details: Optional[bool] = None,
933
        families: Optional[bool] = None,
934
        preferences: Optional[bool] = None,
935
        trash: Optional[bool] = None
936
    ) -> Any:
937
        """Request a list of policies
938
939
        Arguments:
940
            audits: Whether to get audits using the policy
941
            filter: Filter term to use for the query
942
            filter_id: UUID of an existing filter to use for the query
943
            details: Whether to get  families, preferences, nvt selectors
944
                and tasks.
945
            families: Whether to include the families if no details are
946
                requested
947
            preferences: Whether to include the preferences if no details are
948
                requested
949
            trash: Whether to get the trashcan audits instead
950
951
        Returns:
952
            The response. See :py:meth:`send_command` for details.
953
        """
954
        return self.__get_configs(
955
            UsageType.POLICY,
956
            filter=filter,
957
            filter_id=filter_id,
958
            details=details,
959
            families=families,
960
            preferences=preferences,
961
            tasks=audits,
962
            trash=trash,
963
        )
964
965
    def get_config(
966
        self, config_id: str, *, tasks: Optional[bool] = None
967
    ) -> Any:
968
        """Request a single scan config
969
970
        Arguments:
971
            config_id: UUID of an existing scan config
972
            tasks: Whether to get tasks using this config
973
974
        Returns:
975
            The response. See :py:meth:`send_command` for details.
976
        """
977
        return self.__get_config(
978
            config_id=config_id, usage_type=UsageType.SCAN, tasks=tasks
979
        )
980
981
    def get_policy(self, policy_id: str) -> Any:
982
        """Request a single policy
983
984
        Arguments:
985
            policy_id: UUID of an existing policy
986
987
        Returns:
988
            The response. See :py:meth:`send_command` for details.
989
        """
990
        return self.__get_config(policy_id, UsageType.POLICY)
991
992
    def get_tasks(
993
        self,
994
        *,
995
        filter: Optional[str] = None,
996
        filter_id: Optional[str] = None,
997
        trash: Optional[bool] = None,
998
        details: Optional[bool] = None,
999
        schedules_only: Optional[bool] = None
1000
    ) -> Any:
1001
        """Request a list of tasks
1002
1003
        Arguments:
1004
            filter: Filter term to use for the query
1005
            filter_id: UUID of an existing filter to use for the query
1006
            trash: Whether to get the trashcan tasks instead
1007
            details: Whether to include full task details
1008
            schedules_only: Whether to only include id, name and schedule
1009
                details
1010
1011
        Returns:
1012
            The response. See :py:meth:`send_command` for details.
1013
        """
1014
        return self.__get_tasks(
1015
            UsageType.SCAN,
1016
            filter=filter,
1017
            filter_id=filter_id,
1018
            trash=trash,
1019
            details=details,
1020
            schedules_only=schedules_only,
1021
        )
1022
1023
    def get_audits(
1024
        self,
1025
        *,
1026
        filter: Optional[str] = None,
1027
        filter_id: Optional[str] = None,
1028
        trash: Optional[bool] = None,
1029
        details: Optional[bool] = None,
1030
        schedules_only: Optional[bool] = None
1031
    ) -> Any:
1032
        """Request a list of audits
1033
1034
        Arguments:
1035
            filter: Filter term to use for the query
1036
            filter_id: UUID of an existing filter to use for the query
1037
            trash: Whether to get the trashcan audits instead
1038
            details: Whether to include full audit details
1039
            schedules_only: Whether to only include id, name and schedule
1040
                details
1041
1042
        Returns:
1043
            The response. See :py:meth:`send_command` for details.
1044
        """
1045
        return self.__get_tasks(
1046
            UsageType.AUDIT,
1047
            filter=filter,
1048
            filter_id=filter_id,
1049
            trash=trash,
1050
            details=details,
1051
            schedules_only=schedules_only,
1052
        )
1053
1054
    def get_task(self, task_id: str) -> Any:
1055
        """Request a single task
1056
1057
        Arguments:
1058
            task_id: UUID of an existing task
1059
1060
        Returns:
1061
            The response. See :py:meth:`send_command` for details.
1062
        """
1063
        return self.__get_task(task_id, UsageType.SCAN)
1064
1065
    def get_audit(self, audit_id: str) -> Any:
1066
        """Request a single audit
1067
1068
        Arguments:
1069
            audit_id: UUID of an existing audit
1070
1071
        Returns:
1072
            The response. See :py:meth:`send_command` for details.
1073
        """
1074
        return self.__get_task(audit_id, UsageType.AUDIT)
1075
1076
    def clone_audit(self, audit_id: str) -> Any:
1077
        """Clone an existing audit
1078
1079
        Arguments:
1080
            audit_id: UUID of existing audit to clone from
1081
1082
        Returns:
1083
            The response. See :py:meth:`send_command` for details.
1084
        """
1085
        if not audit_id:
1086
            raise RequiredArgument(
1087
                function=self.clone_audit.__name__, argument='audit_id'
1088
            )
1089
1090
        cmd = XmlCommand("create_task")
1091
        cmd.add_element("copy", audit_id)
1092
        return self._send_xml_command(cmd)
1093
1094
    def clone_policy(self, policy_id: str) -> Any:
1095
        """Clone a policy from an existing one
1096
1097
        Arguments:
1098
            policy_id: UUID of the existing policy
1099
1100
        Returns:
1101
            The response. See :py:meth:`send_command` for details.
1102
        """
1103
        if not policy_id:
1104
            raise RequiredArgument(
1105
                function=self.clone_policy.__name__, argument='policy_id'
1106
            )
1107
1108
        cmd = XmlCommand("create_config")
1109
        cmd.add_element("copy", policy_id)
1110
        return self._send_xml_command(cmd)
1111
1112
    def delete_audit(
1113
        self, audit_id: str, *, ultimate: Optional[bool] = False
1114
    ) -> Any:
1115
        """Deletes an existing audit
1116
1117
        Arguments:
1118
            audit_id: UUID of the audit to be deleted.
1119
            ultimate: Whether to remove entirely, or to the trashcan.
1120
        """
1121
        if not audit_id:
1122
            raise RequiredArgument(
1123
                function=self.delete_audit.__name__, argument='audit_id'
1124
            )
1125
1126
        cmd = XmlCommand("delete_task")
1127
        cmd.set_attribute("task_id", audit_id)
1128
        cmd.set_attribute("ultimate", _to_bool(ultimate))
1129
1130
        return self._send_xml_command(cmd)
1131
1132
    def delete_policy(
1133
        self, policy_id: str, *, ultimate: Optional[bool] = False
1134
    ) -> Any:
1135
        """Deletes an existing policy
1136
1137
        Arguments:
1138
            policy_id: UUID of the policy to be deleted.
1139
            ultimate: Whether to remove entirely, or to the trashcan.
1140
        """
1141
        if not policy_id:
1142
            raise RequiredArgument(
1143
                function=self.delete_policy.__name__, argument='policy_id'
1144
            )
1145
1146
        cmd = XmlCommand("delete_config")
1147
        cmd.set_attribute("config_id", policy_id)
1148
        cmd.set_attribute("ultimate", _to_bool(ultimate))
1149
1150
        return self._send_xml_command(cmd)
1151
1152
    def delete_tls_certificate(self, tls_certificate_id: str) -> Any:
1153
        """Deletes an existing tls certificate
1154
1155
        Arguments:
1156
            tls_certificate_id: UUID of the tls certificate to be deleted.
1157
        """
1158
        if not tls_certificate_id:
1159
            raise RequiredArgument(
1160
                function=self.delete_tls_certificate.__name__,
1161
                argument='policy_id',
1162
            )
1163
1164
        cmd = XmlCommand("delete_tls_certificate")
1165
        cmd.set_attribute("tls_certificate_id", tls_certificate_id)
1166
1167
        return self._send_xml_command(cmd)
1168
1169 View Code Duplication
    def __create_task(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1170
        self,
1171
        name: str,
1172
        config_id: str,
1173
        target_id: str,
1174
        scanner_id: str,
1175
        usage_type: UsageType,
1176
        function: str,
1177
        *,
1178
        alterable: Optional[bool] = None,
1179
        hosts_ordering: Optional[HostsOrdering] = None,
1180
        schedule_id: Optional[str] = None,
1181
        alert_ids: Optional[List[str]] = None,
1182
        comment: Optional[str] = None,
1183
        schedule_periods: Optional[int] = None,
1184
        observers: Optional[List[str]] = None,
1185
        preferences: Optional[dict] = None
1186
    ) -> Any:
1187
        if not name:
1188
            raise RequiredArgument(function=function, argument='name')
1189
1190
        if not config_id:
1191
            raise RequiredArgument(function=function, argument='config_id')
1192
1193
        if not target_id:
1194
            raise RequiredArgument(function=function, argument='target_id')
1195
1196
        if not scanner_id:
1197
            raise RequiredArgument(function=function, argument='scanner_id')
1198
1199
        # don't allow to create a container task with create_task
1200
        if target_id == '0':
1201
            raise InvalidArgument(function=function, argument='target_id')
1202
1203
        cmd = XmlCommand("create_task")
1204
        cmd.add_element("name", name)
1205
        cmd.add_element("usage_type", usage_type.value)
1206
        cmd.add_element("config", attrs={"id": config_id})
1207
        cmd.add_element("target", attrs={"id": target_id})
1208
        cmd.add_element("scanner", attrs={"id": scanner_id})
1209
1210
        if comment:
1211
            cmd.add_element("comment", comment)
1212
1213
        if alterable is not None:
1214
            cmd.add_element("alterable", _to_bool(alterable))
1215
1216
        if hosts_ordering:
1217
            if not isinstance(hosts_ordering, self.types.HostsOrdering):
1218
                raise InvalidArgumentType(
1219
                    function=function,
1220
                    argument='hosts_ordering',
1221
                    arg_type=HostsOrdering.__name__,
1222
                )
1223
            cmd.add_element("hosts_ordering", hosts_ordering.value)
1224
1225
        if alert_ids:
1226
            if isinstance(alert_ids, str):
1227
                deprecation(
1228
                    "Please pass a list as alert_ids parameter to {}. "
1229
                    "Passing a string is deprecated and will be removed in "
1230
                    "future.".format(function)
1231
                )
1232
1233
                # if a single id is given as a string wrap it into a list
1234
                alert_ids = [alert_ids]
1235
            if _is_list_like(alert_ids):
1236
                # parse all given alert id's
1237
                for alert in alert_ids:
1238
                    cmd.add_element("alert", attrs={"id": str(alert)})
1239
1240
        if schedule_id:
1241
            cmd.add_element("schedule", attrs={"id": schedule_id})
1242
1243
            if schedule_periods is not None:
1244
                if (
1245
                    not isinstance(schedule_periods, numbers.Integral)
1246
                    or schedule_periods < 0
1247
                ):
1248
                    raise InvalidArgument(
1249
                        "schedule_periods must be an integer greater or equal "
1250
                        "than 0"
1251
                    )
1252
                cmd.add_element("schedule_periods", str(schedule_periods))
1253
1254
        if observers is not None:
1255
            if not _is_list_like(observers):
1256
                raise InvalidArgumentType(
1257
                    function=function, argument='observers', arg_type='list'
1258
                )
1259
1260
            # gvmd splits by comma and space
1261
            # gvmd tries to lookup each value as user name and afterwards as
1262
            # user id. So both user name and user id are possible
1263
            cmd.add_element("observers", _to_comma_list(observers))
1264
1265
        if preferences is not None:
1266
            if not isinstance(preferences, collections.abc.Mapping):
1267
                raise InvalidArgumentType(
1268
                    function=function,
1269
                    argument='preferences',
1270
                    arg_type=collections.abc.Mapping.__name__,
1271
                )
1272
1273
            _xmlprefs = cmd.add_element("preferences")
1274
            for pref_name, pref_value in preferences.items():
1275
                _xmlpref = _xmlprefs.add_element("preference")
1276
                _xmlpref.add_element("scanner_name", pref_name)
1277
                _xmlpref.add_element("value", str(pref_value))
1278
1279
        return self._send_xml_command(cmd)
1280
1281
    def __create_config(
1282
        self,
1283
        config_id: str,
1284
        name: str,
1285
        usage_type: UsageType,
1286
        function: str,
1287
        *,
1288
        comment: Optional[str] = None
1289
    ) -> Any:
1290
        if not name:
1291
            raise RequiredArgument(function=function, argument='name')
1292
1293
        if not config_id:
1294
            raise RequiredArgument(function=function, argument='config_id')
1295
1296
        cmd = XmlCommand("create_config")
1297
        if comment is not None:
1298
            cmd.add_element("comment", comment)
1299
        cmd.add_element("copy", config_id)
1300
        cmd.add_element("name", name)
1301
        cmd.add_element("usage_type", usage_type.value)
1302
        return self._send_xml_command(cmd)
1303
1304
    def __create_config_from_osp_scanner(
1305
        self,
1306
        scanner_id: str,
1307
        name: str,
1308
        usage_type: UsageType,
1309
        function: str,
1310
        *,
1311
        comment: Optional[str] = None
1312
    ) -> Any:
1313
        if not name:
1314
            raise RequiredArgument(function=function, argument='name')
1315
1316
        if not scanner_id:
1317
            raise RequiredArgument(function=function, argument='scanner_id')
1318
1319
        cmd = XmlCommand("create_config")
1320
        if comment is not None:
1321
            cmd.add_element("comment", comment)
1322
        cmd.add_element("scanner", scanner_id)
1323
        cmd.add_element("name", name)
1324
        cmd.add_element("usage_type", usage_type.value)
1325
        return self._send_xml_command(cmd)
1326
1327 View Code Duplication
    def __get_configs(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1328
        self,
1329
        usage_type: UsageType,
1330
        *,
1331
        filter: Optional[str] = None,
1332
        filter_id: Optional[str] = None,
1333
        trash: Optional[bool] = None,
1334
        details: Optional[bool] = None,
1335
        families: Optional[bool] = None,
1336
        preferences: Optional[bool] = None,
1337
        tasks: Optional[bool] = None
1338
    ) -> Any:
1339
        cmd = XmlCommand("get_configs")
1340
        cmd.set_attribute("usage_type", usage_type.value)
1341
1342
        _add_filter(cmd, filter, filter_id)
1343
1344
        if trash is not None:
1345
            cmd.set_attribute("trash", _to_bool(trash))
1346
1347
        if details is not None:
1348
            cmd.set_attribute("details", _to_bool(details))
1349
1350
        if families is not None:
1351
            cmd.set_attribute("families", _to_bool(families))
1352
1353
        if preferences is not None:
1354
            cmd.set_attribute("preferences", _to_bool(preferences))
1355
1356
        if tasks is not None:
1357
            cmd.set_attribute("tasks", _to_bool(tasks))
1358
1359
        return self._send_xml_command(cmd)
1360
1361
    def __get_config(
1362
        self,
1363
        config_id: str,
1364
        usage_type: UsageType,
1365
        *,
1366
        tasks: Optional[bool] = None
1367
    ) -> Any:
1368
        if not config_id:
1369
            raise RequiredArgument(
1370
                function=self.get_config.__name__, argument='config_id'
1371
            )
1372
1373
        cmd = XmlCommand("get_configs")
1374
        cmd.set_attribute("config_id", config_id)
1375
1376
        cmd.set_attribute("usage_type", usage_type.value)
1377
1378
        if tasks is not None:
1379
            cmd.set_attribute("tasks", _to_bool(tasks))
1380
1381
        # for single entity always request all details
1382
        cmd.set_attribute("details", "1")
1383
1384
        return self._send_xml_command(cmd)
1385
1386
    def __get_tasks(
1387
        self,
1388
        usage_type: UsageType,
1389
        *,
1390
        filter: Optional[str] = None,
1391
        filter_id: Optional[str] = None,
1392
        trash: Optional[bool] = None,
1393
        details: Optional[bool] = None,
1394
        schedules_only: Optional[bool] = None
1395
    ) -> Any:
1396
        cmd = XmlCommand("get_tasks")
1397
        cmd.set_attribute("usage_type", usage_type.value)
1398
1399
        _add_filter(cmd, filter, filter_id)
1400
1401
        if trash is not None:
1402
            cmd.set_attribute("trash", _to_bool(trash))
1403
1404
        if details is not None:
1405
            cmd.set_attribute("details", _to_bool(details))
1406
1407
        if schedules_only is not None:
1408
            cmd.set_attribute("schedules_only", _to_bool(schedules_only))
1409
1410
        return self._send_xml_command(cmd)
1411
1412
    def __get_task(self, task_id: str, usage_type: UsageType) -> Any:
1413
        if not task_id:
1414
            raise RequiredArgument(
1415
                function=self.get_task.__name__, argument='task_id'
1416
            )
1417
1418
        cmd = XmlCommand("get_tasks")
1419
        cmd.set_attribute("task_id", task_id)
1420
        cmd.set_attribute("usage_type", usage_type.value)
1421
1422
        # for single entity always request all details
1423
        cmd.set_attribute("details", "1")
1424
        return self._send_xml_command(cmd)
1425