Completed
Push — master ( a7ad2f...bd2755 )
by Jaspar
22s queued 14s
created

GmpV9Mixin.modify_policy_set_family_selection()   A

Complexity

Conditions 1

Size

Total Lines 24
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 12
nop 6
dl 0
loc 24
rs 9.8
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 - 2019 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
# pylint: disable=arguments-differ, redefined-builtin, too-many-lines
20
21
"""
22
Module for communication with gvmd in `Greenbone Management Protocol version 9`_
23
24
.. _Greenbone Management Protocol version 9:
25
    https://docs.greenbone.net/API/GMP/gmp-9.0.html
26
"""
27
import collections
28
import numbers
29
30
from typing import Any, List, Optional, Callable
31
32
from gvm.errors import InvalidArgument, InvalidArgumentType, RequiredArgument
33
from gvm.utils import deprecation
34
from gvm.xml import XmlCommand
35
36
from gvm.protocols.base import GvmProtocol
37
from gvm.connections import GvmConnection
38
from gvm.protocols.gmpv7.gmpv7 import (
39
    _to_bool,
40
    _add_filter,
41
    _is_list_like,
42
    _to_comma_list,
43
)
44
45
from . import types
46
from .types import *  # pylint: disable=unused-wildcard-import, wildcard-import
47
from .types import _UsageType as UsageType
48
49
# UUID passed as the base config by create_policy() when the caller does not
# supply a policy_id (the "empty policy" mentioned in its docstring).
_EMPTY_POLICY_ID = '085569ce-73ed-11df-83c3-002264764cea'
50
51
52
def _check_event(
    event: AlertEvent, condition: AlertCondition, method: AlertMethod
):
    """Validate a condition and method against an alert event

    Nothing is checked if event is None. For every known event both a
    condition and a method must be given and the condition must be one of the
    conditions allowed for that event. For the SecInfo and ticket events the
    method is additionally restricted to a fixed set.

    Arguments:
        event: The alert event to validate against
        condition: The condition intended to be used with the event
        method: The method intended to be used with the event

    Raises:
        RequiredArgument if condition or method is missing and InvalidArgument
        if the event is unknown or the condition/method is not allowed for it.
    """
    if event is None:
        return

    # Pick the allowed conditions/methods for the event first and run the
    # shared checks afterwards. allowed_methods = None means "any method".
    if event == AlertEvent.TASK_RUN_STATUS_CHANGED:
        allowed_conditions = (
            AlertCondition.ALWAYS,
            AlertCondition.FILTER_COUNT_CHANGED,
            AlertCondition.FILTER_COUNT_AT_LEAST,
            AlertCondition.SEVERITY_AT_LEAST,
            AlertCondition.SEVERITY_CHANGED,
        )
        allowed_methods = None
    elif event in (
        AlertEvent.NEW_SECINFO_ARRIVED,
        AlertEvent.UPDATED_SECINFO_ARRIVED,
    ):
        allowed_conditions = (AlertCondition.ALWAYS,)
        allowed_methods = (
            AlertMethod.SCP,
            AlertMethod.SEND,
            AlertMethod.SMB,
            AlertMethod.SNMP,
            AlertMethod.SYSLOG,
            AlertMethod.EMAIL,
        )
    elif event in (
        AlertEvent.TICKET_RECEIVED,
        AlertEvent.OWNED_TICKET_CHANGED,
        AlertEvent.ASSIGNED_TICKET_CHANGED,
    ):
        allowed_conditions = (AlertCondition.ALWAYS,)
        allowed_methods = (
            AlertMethod.EMAIL,
            AlertMethod.START_TASK,
            AlertMethod.SYSLOG,
        )
    else:
        raise InvalidArgument('Invalid event "{}"'.format(event.name))

    if not condition:
        raise RequiredArgument(
            "condition is required for event {}".format(event.name)
        )

    if not method:
        raise RequiredArgument(
            "method is required for event {}".format(event.name)
        )

    if condition not in allowed_conditions:
        raise InvalidArgument(
            "Invalid condition {} for event {}".format(
                condition.name, event.name
            )
        )

    if allowed_methods is not None and method not in allowed_methods:
        raise InvalidArgument(
            "Invalid method {} for event {}".format(method.name, event.name)
        )
139
140
141
class GmpV9Mixin(GvmProtocol):
142
143
    types = types
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable types does not seem to be defined.
Loading history...
144
145
    def __init__(
        self,
        connection: GvmConnection,
        *,
        transform: Optional[Callable[[str], Any]] = None
    ):
        """Set up the protocol on top of a connection

        Arguments:
            connection: Connection passed on to the base protocol class
            transform: Optional callable passed on to the base protocol class
                as its ``transform`` argument
        """
        super().__init__(connection, transform=transform)

        # A fresh instance starts out as not authenticated on gvmd
        self._authenticated = False
155
156 View Code Duplication
    def create_alert(
        self,
        name: str,
        condition: AlertCondition,
        event: AlertEvent,
        method: AlertMethod,
        *,
        method_data: Optional[dict] = None,
        event_data: Optional[dict] = None,
        condition_data: Optional[dict] = None,
        filter_id: Optional[str] = None,
        comment: Optional[str] = None
    ) -> Any:
        """Create a new alert

        The combination of event, condition and method is validated with
        ``_check_event`` before the command is sent.

        Arguments:
            name: Name of the new Alert
            condition: The condition that must be satisfied for the alert
                to occur; for the SecInfo and ticket events the condition
                must be 'Always'. For 'Task run status changed' it can also
                be one of 'Severity at least', 'Severity changed', 'Filter
                count changed' or 'Filter count at least'.
            event: The event that must happen for the alert to occur, a
                member of AlertEvent
            method: The method by which the user is alerted, a member of
                AlertMethod. The SecInfo and ticket events only accept a
                subset of the methods.
            condition_data: Data that defines the condition
            event_data: Data that defines the event
            method_data: Data that defines the method
            filter_id: Filter to apply when executing alert
            comment: Comment for the alert

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument if name, condition, event or method is missing,
            InvalidArgumentType if condition, event or method is not a member
            of its enum and InvalidArgument if the combination is not allowed.
        """
        function = self.create_alert.__name__

        for argument, value in (
            ('name', name),
            ('condition', condition),
            ('event', event),
            ('method', method),
        ):
            if not value:
                raise RequiredArgument(function=function, argument=argument)

        # The reported argument name must match the real parameter name
        # (the event check previously reported 'even').
        for argument, value, enum_type in (
            ('condition', condition, AlertCondition),
            ('event', event, AlertEvent),
            ('method', method, AlertMethod),
        ):
            if not isinstance(value, enum_type):
                raise InvalidArgumentType(
                    function=function,
                    argument=argument,
                    arg_type=enum_type.__name__,
                )

        _check_event(event, condition, method)

        cmd = XmlCommand("create_alert")
        cmd.add_element("name", name)

        # Elements are emitted in the order condition, event, method; each
        # one gets a <data> child with a nested <name> per data entry.
        for tag, selected, data in (
            ("condition", condition, condition_data),
            ("event", event, event_data),
            ("method", method, method_data),
        ):
            element = cmd.add_element(tag, selected.value)

            if data is not None:
                for key, value in data.items():
                    _data = element.add_element("data", value)
                    _data.add_element("name", key)

        if filter_id:
            cmd.add_element("filter", attrs={"id": filter_id})

        if comment:
            cmd.add_element("comment", comment)

        return self._send_xml_command(cmd)
269
270 View Code Duplication
    def create_audit(
        self,
        name: str,
        policy_id: str,
        target_id: str,
        scanner_id: str,
        *,
        alterable: Optional[bool] = None,
        hosts_ordering: Optional[HostsOrdering] = None,
        schedule_id: Optional[str] = None,
        alert_ids: Optional[List[str]] = None,
        comment: Optional[str] = None,
        schedule_periods: Optional[int] = None,
        observers: Optional[List[str]] = None,
        preferences: Optional[dict] = None
    ) -> Any:
        """Create a new audit task

        Delegates to the shared task creation helper with the audit usage
        type; the policy takes the place of the scan config.

        Arguments:
            name: Name of the new audit
            policy_id: UUID of policy to use by the audit
            target_id: UUID of target to be scanned
            scanner_id: UUID of scanner to use for scanning the target
            comment: Comment for the audit
            alterable: Whether the task should be alterable
            alert_ids: List of UUIDs for alerts to be applied to the audit
            hosts_ordering: The order hosts are scanned in
            schedule_id: UUID of a schedule when the audit should be run.
            schedule_periods: A limit to the number of times the audit will be
                scheduled, or 0 for no limit
            observers: List of names or ids of users which should be allowed to
                observe this audit
            preferences: Name/Value pairs of scanner preferences.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        # Optional settings are passed through unchanged
        optional_settings = dict(
            alterable=alterable,
            hosts_ordering=hosts_ordering,
            schedule_id=schedule_id,
            alert_ids=alert_ids,
            comment=comment,
            schedule_periods=schedule_periods,
            observers=observers,
            preferences=preferences,
        )

        return self.__create_task(
            name=name,
            config_id=policy_id,
            target_id=target_id,
            scanner_id=scanner_id,
            usage_type=UsageType.AUDIT,
            function=self.create_audit.__name__,
            **optional_settings
        )
324
325
    def create_config(
        self, config_id: str, name: str, *, comment: Optional[str] = None
    ) -> Any:
        """Create a new scan config

        Delegates to the shared config creation helper with the scan usage
        type.

        Arguments:
            config_id: UUID of the existing scan config
            name: Name of the new scan config
            comment: A comment on the config

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        caller = self.create_config.__name__

        return self.__create_config(
            config_id=config_id,
            name=name,
            comment=comment,
            usage_type=UsageType.SCAN,
            function=caller,
        )
345
346
    def create_config_from_osp_scanner(
        self, scanner_id: str, name: str, *, comment: Optional[str] = None
    ) -> Any:
        """Create a new scan config from an ospd scanner.

        Create config by retrieving the expected preferences from the given
        scanner via OSP.

        Arguments:
            scanner_id: UUID of an OSP scanner to get config data from
            name: Name of the new scan config
            comment: A comment on the config

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        return self.__create_config_from_osp_scanner(
            scanner_id=scanner_id,
            name=name,
            comment=comment,
            usage_type=UsageType.SCAN,
            # Pass this method's own name so the helper attributes errors to
            # the right function (it used to pass create_config's name).
            function=self.create_config_from_osp_scanner.__name__,
        )
369
370
    def create_policy(
        self, name: str, *, policy_id: str = None, comment: Optional[str] = None
    ) -> Any:
        """Create a new policy config

        Delegates to the shared config creation helper with the policy usage
        type.

        Arguments:
            name: Name of the new policy
            policy_id: UUID of an existing policy as base. By default the empty
                policy is used.
            comment: A comment on the policy

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        # Fall back to the empty policy when no base policy was requested
        base_policy_id = _EMPTY_POLICY_ID if policy_id is None else policy_id

        return self.__create_config(
            config_id=base_policy_id,
            name=name,
            comment=comment,
            usage_type=UsageType.POLICY,
            function=self.create_policy.__name__,
        )
393
394 View Code Duplication
    def create_task(
        self,
        name: str,
        config_id: str,
        target_id: str,
        scanner_id: str,
        *,
        alterable: Optional[bool] = None,
        hosts_ordering: Optional[HostsOrdering] = None,
        schedule_id: Optional[str] = None,
        alert_ids: Optional[List[str]] = None,
        comment: Optional[str] = None,
        schedule_periods: Optional[int] = None,
        observers: Optional[List[str]] = None,
        preferences: Optional[dict] = None
    ) -> Any:
        """Create a new scan task

        Delegates to the shared task creation helper with the scan usage type.

        Arguments:
            name: Name of the task
            config_id: UUID of scan config to use by the task
            target_id: UUID of target to be scanned
            scanner_id: UUID of scanner to use for scanning the target
            comment: Comment for the task
            alterable: Whether the task should be alterable
            alert_ids: List of UUIDs for alerts to be applied to the task
            hosts_ordering: The order hosts are scanned in
            schedule_id: UUID of a schedule when the task should be run.
            schedule_periods: A limit to the number of times the task will be
                scheduled, or 0 for no limit
            observers: List of names or ids of users which should be allowed to
                observe this task
            preferences: Name/Value pairs of scanner preferences.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        # Optional settings are passed through unchanged
        optional_settings = dict(
            alterable=alterable,
            hosts_ordering=hosts_ordering,
            schedule_id=schedule_id,
            alert_ids=alert_ids,
            comment=comment,
            schedule_periods=schedule_periods,
            observers=observers,
            preferences=preferences,
        )

        return self.__create_task(
            name=name,
            config_id=config_id,
            target_id=target_id,
            scanner_id=scanner_id,
            usage_type=UsageType.SCAN,
            function=self.create_task.__name__,
            **optional_settings
        )
447
448 View Code Duplication
    def create_tls_certificate(
        self,
        name: str,
        certificate: str,
        *,
        comment: Optional[str] = None,
        trust: Optional[bool] = None
    ) -> Any:
        """Create a new TLS certificate

        Arguments:
            name: Name of the TLS certificate, defaulting to the MD5
                fingerprint.
            certificate: The Base64 encoded certificate data (x.509 DER or PEM).
            comment: Comment for the TLS certificate.
            trust: Whether the certificate is trusted. The trust element is
                only added to the command for a truthy value.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        for argument, value in (('name', name), ('certificate', certificate)):
            if not value:
                raise RequiredArgument(
                    function=self.create_tls_certificate.__name__,
                    argument=argument,
                )

        cmd = XmlCommand("create_tls_certificate")

        # Element order: optional comment, name, certificate, optional trust
        if comment:
            cmd.add_element("comment", comment)

        for tag, text in (("name", name), ("certificate", certificate)):
            cmd.add_element(tag, text)

        if trust:
            cmd.add_element("trust", _to_bool(trust))

        return self._send_xml_command(cmd)
490
491 View Code Duplication
    def get_tls_certificates(
        self,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        include_certificate_data: Optional[bool] = None,
        details: Optional[bool] = None
    ) -> Any:
        """Request a list of TLS certificates

        Arguments:
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            include_certificate_data: Whether to include the certificate data in
                the response
            details: Value for the ``details`` attribute of the command; the
                attribute is only set if this is not None

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        cmd = XmlCommand("get_tls_certificates")

        _add_filter(cmd, filter, filter_id)

        # Boolean flags are only sent when the caller set them explicitly
        for attribute, flag in (
            ("details", details),
            ("include_certificate_data", include_certificate_data),
        ):
            if flag is not None:
                cmd.set_attribute(attribute, _to_bool(flag))

        return self._send_xml_command(cmd)
524
525
    def get_tls_certificate(self, tls_certificate_id: str) -> Any:
        """Request a single TLS certificate

        The certificate data and all details are always requested.

        Arguments:
            tls_certificate_id: UUID of an existing TLS certificate

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not tls_certificate_id:
            raise RequiredArgument(
                function=self.get_tls_certificate.__name__,
                argument='tls_certificate_id',
            )

        cmd = XmlCommand("get_tls_certificates")

        for attribute, value in (
            ("tls_certificate_id", tls_certificate_id),
            # for single tls certificate always request cert data
            ("include_certificate_data", "1"),
            # for single entity always request all details
            ("details", "1"),
        ):
            cmd.set_attribute(attribute, value)

        return self._send_xml_command(cmd)
551
552 View Code Duplication
    def modify_alert(
        self,
        alert_id: str,
        *,
        name: Optional[str] = None,
        comment: Optional[str] = None,
        filter_id: Optional[str] = None,
        event: Optional[AlertEvent] = None,
        event_data: Optional[dict] = None,
        condition: Optional[AlertCondition] = None,
        condition_data: Optional[dict] = None,
        method: Optional[AlertMethod] = None,
        method_data: Optional[dict] = None
    ) -> Any:
        """Modifies an existing alert.

        Only the settings that are passed with a truthy value are added to
        the command. If an event is given, the event/condition/method
        combination is validated with ``_check_event``.

        Arguments:
            alert_id: UUID of the alert to be modified.
            name: Name of the Alert.
            condition: The condition that must be satisfied for the alert to
                occur. If the event is either 'Updated SecInfo
                arrived' or 'New SecInfo arrived', condition must be 'Always'.
                Otherwise, condition can also be one of 'Severity at least',
                'Filter count changed' or 'Filter count at least'.
            condition_data: Data that defines the condition; only used when
                a condition is given
            event: The event that must happen for the alert to occur, a
                member of AlertEvent
            event_data: Data that defines the event; only used when an event
                is given
            method: The method by which the user is alerted, a member of
                AlertMethod
            method_data: Data that defines the method; only used when a
                method is given
            filter_id: Filter to apply when executing alert
            comment: Comment for the alert

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not alert_id:
            raise RequiredArgument(
                function=self.modify_alert.__name__, argument='alert_id'
            )

        def _require_enum(value, enum_type, argument):
            # Reject values that are not members of the expected enum
            if not isinstance(value, enum_type):
                raise InvalidArgumentType(
                    function=self.modify_alert.__name__,
                    argument=argument,
                    arg_type=enum_type.__name__,
                )

        def _attach_data(parent, data):
            # Add one <data> child with a nested <name> per dict entry
            if data is None:
                return

            for key, value in data.items():
                parent.add_element("data", value).add_element("name", key)

        cmd = XmlCommand("modify_alert")
        cmd.set_attribute("alert_id", str(alert_id))

        if name:
            cmd.add_element("name", name)

        if comment:
            cmd.add_element("comment", comment)

        if filter_id:
            cmd.add_element("filter", attrs={"id": filter_id})

        if condition:
            _require_enum(condition, AlertCondition, 'condition')
            _attach_data(
                cmd.add_element("condition", condition.value), condition_data
            )

        if method:
            _require_enum(method, AlertMethod, 'method')
            _attach_data(cmd.add_element("method", method.value), method_data)

        if event:
            _require_enum(event, AlertEvent, 'event')

            _check_event(event, condition, method)

            _attach_data(cmd.add_element("event", event.value), event_data)

        return self._send_xml_command(cmd)
659
660
    def modify_audit(
        self,
        audit_id: str,
        *,
        name: Optional[str] = None,
        policy_id: Optional[str] = None,
        target_id: Optional[str] = None,
        scanner_id: Optional[str] = None,
        alterable: Optional[bool] = None,
        hosts_ordering: Optional[HostsOrdering] = None,
        schedule_id: Optional[str] = None,
        schedule_periods: Optional[int] = None,
        comment: Optional[str] = None,
        alert_ids: Optional[List[str]] = None,
        observers: Optional[List[str]] = None,
        preferences: Optional[dict] = None
    ) -> Any:
        """Modifies an existing audit.

        Forwards all arguments to :py:meth:`modify_task`, passing the audit
        as the task and the policy as the config.

        Arguments:
            audit_id: UUID of audit to modify.
            name: The name of the audit.
            policy_id: UUID of policy to use by the audit
            target_id: UUID of target to be scanned
            scanner_id: UUID of scanner to use for scanning the target
            comment: The comment on the audit.
            alterable: Whether the audit should be alterable
            alert_ids: List of UUIDs for alerts to be applied to the audit
            hosts_ordering: The order hosts are scanned in
            schedule_id: UUID of a schedule when the audit should be run.
            schedule_periods: A limit to the number of times the audit will be
                scheduled, or 0 for no limit.
            observers: List of names or ids of users which should be allowed to
                observe this audit
            preferences: Name/Value pairs of scanner preferences.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        # Return the result of modify_task; it used to be dropped, so callers
        # always received None despite the documented response.
        return self.modify_task(
            task_id=audit_id,
            name=name,
            config_id=policy_id,
            target_id=target_id,
            scanner_id=scanner_id,
            alterable=alterable,
            hosts_ordering=hosts_ordering,
            schedule_id=schedule_id,
            schedule_periods=schedule_periods,
            comment=comment,
            alert_ids=alert_ids,
            observers=observers,
            preferences=preferences,
        )
713
714
    def modify_policy_set_nvt_preference(
        self,
        policy_id: str,
        name: str,
        nvt_oid: str,
        *,
        value: Optional[str] = None
    ) -> Any:
        """Modifies the nvt preferences of an existing policy.

        Forwards to ``modify_config_set_nvt_preference`` with the policy as
        the config.

        Arguments:
            policy_id: UUID of policy to modify.
            name: Name for preference to change.
            nvt_oid: OID of the NVT associated with preference to modify
            value: New value for the preference. None to delete the preference
                and to use the default instead.

        Returns:
            Whatever ``modify_config_set_nvt_preference`` returns.
        """
        # The result used to be dropped; return it to match the annotation
        return self.modify_config_set_nvt_preference(
            config_id=policy_id,
            name=name,
            nvt_oid=nvt_oid,
            value=value,
        )
737
738
    def modify_policy_set_name(self, policy_id: str, name: str) -> Any:
        """Modifies the name of an existing policy

        Forwards to ``modify_config_set_name`` with the policy as the config.

        Arguments:
            policy_id: UUID of policy to modify.
            name: New name for the policy.

        Returns:
            Whatever ``modify_config_set_name`` returns.
        """
        # The result used to be dropped; return it to match the annotation
        return self.modify_config_set_name(
            config_id=policy_id,
            name=name,
        )
749
750
    def modify_policy_set_comment(
        self, policy_id: str, comment: Optional[str] = ""
    ) -> Any:
        """Modifies the comment of an existing policy

        Forwards to ``modify_config_set_comment`` with the policy as the
        config.

        Arguments:
            policy_id: UUID of policy to modify.
            comment: Comment to set on a policy. Default: ''

        Returns:
            Whatever ``modify_config_set_comment`` returns.
        """
        # The result used to be dropped; return it to match the annotation
        return self.modify_config_set_comment(
            config_id=policy_id,
            comment=comment,
        )
763
764
    def modify_policy_set_scanner_preference(
        self, policy_id: str, name: str, *, value: Optional[str] = None
    ) -> Any:
        """Modifies the scanner preferences of an existing policy

        Forwards to ``modify_config_set_scanner_preference`` with the policy
        as the config.

        Arguments:
            policy_id: UUID of policy to modify.
            name: Name of the scanner preference to change
            value: New value for the preference. None to delete the preference
                and to use the default instead.

        Returns:
            Whatever ``modify_config_set_scanner_preference`` returns.
        """
        # The result used to be dropped; return it to match the annotation
        return self.modify_config_set_scanner_preference(
            config_id=policy_id,
            name=name,
            value=value,
        )
781
782
    def modify_policy_set_nvt_selection(
        self, policy_id: str, family: str, nvt_oids: List[str]
    ) -> Any:
        """Modifies the selected nvts of an existing policy

        The manager updates the given family in the policy to include only the
        given NVTs. Forwards to ``modify_config_set_nvt_selection`` with the
        policy as the config.

        Arguments:
            policy_id: UUID of policy to modify.
            family: Name of the NVT family to include NVTs from
            nvt_oids: List of NVTs to select for the family.

        Returns:
            Whatever ``modify_config_set_nvt_selection`` returns.
        """
        # The result used to be dropped; return it to match the annotation
        return self.modify_config_set_nvt_selection(
            config_id=policy_id,
            family=family,
            nvt_oids=nvt_oids,
        )
800
801
    def modify_policy_set_family_selection(
        self,
        policy_id: str,
        families: List[str],
        *,
        auto_add_new_families: Optional[bool] = True,
        auto_add_new_nvts: Optional[bool] = True
    ) -> Any:
        """Selects the NVTs of a policy at a family level.

        Forwards to ``modify_config_set_family_selection`` with the policy as
        the config.

        Arguments:
            policy_id: UUID of policy to modify.
            families: List of NVT family names to select.
            auto_add_new_families: Whether new families should be added to the
                policy automatically. Default: True.
            auto_add_new_nvts: Whether new NVTs in the selected families should
                be added to the policy automatically. Default: True.

        Returns:
            Whatever ``modify_config_set_family_selection`` returns.
        """
        # The result used to be dropped; return it to match the annotation
        return self.modify_config_set_family_selection(
            config_id=policy_id,
            families=families,
            auto_add_new_families=auto_add_new_families,
            auto_add_new_nvts=auto_add_new_nvts,
        )
826
827 View Code Duplication
    def modify_tls_certificate(
        self,
        tls_certificate_id: str,
        *,
        name: Optional[str] = None,
        comment: Optional[str] = None,
        trust: Optional[bool] = None
    ) -> Any:
        """Modifies an existing TLS certificate.

        Arguments:
            tls_certificate_id: UUID of the TLS certificate to be modified.
            name: Name of the TLS certificate, defaulting to the MD5 fingerprint
            comment: Comment for the TLS certificate.
            trust: Whether the certificate is trusted. None (the default)
                leaves the trust element out of the command; True and False
                are both sent explicitly.

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If tls_certificate_id is empty or None.
        """
        if not tls_certificate_id:
            raise RequiredArgument(
                function=self.modify_tls_certificate.__name__,
                argument='tls_certificate_id',
            )

        cmd = XmlCommand("modify_tls_certificate")
        cmd.set_attribute("tls_certificate_id", str(tls_certificate_id))

        if comment:
            cmd.add_element("comment", comment)

        if name:
            cmd.add_element("name", name)

        # Compare against None instead of relying on truthiness, otherwise
        # trust=False would never be transmitted and a certificate could not
        # be marked as untrusted.
        if trust is not None:
            cmd.add_element("trust", _to_bool(trust))

        return self._send_xml_command(cmd)
865
866
    def clone_tls_certificate(self, tls_certificate_id: str) -> Any:
        """Clones an existing TLS certificate.

        Sends a ``create_tls_certificate`` command whose ``copy`` element
        carries the UUID of the certificate to clone.

        Arguments:
            tls_certificate_id: The UUID of an existing TLS certificate

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If tls_certificate_id is empty or None.
        """
        if not tls_certificate_id:
            # report this method, not modify_tls_certificate, as the origin
            raise RequiredArgument(
                function=self.clone_tls_certificate.__name__,
                argument='tls_certificate_id',
            )

        cmd = XmlCommand("create_tls_certificate")

        cmd.add_element("copy", tls_certificate_id)

        return self._send_xml_command(cmd)
886
887
    def get_configs(
        self,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        details: Optional[bool] = None,
        families: Optional[bool] = None,
        preferences: Optional[bool] = None,
        tasks: Optional[bool] = None
    ) -> Any:
        """Request a list of scan configs

        All arguments are forwarded unchanged to the shared config query
        helper together with the ``SCAN`` usage type.

        Arguments:
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            trash: Whether to get the trashcan scan configs instead
            details: Whether to get config families, preferences, nvt selectors
                and tasks.
            families: Whether to include the families if no details are
                requested
            preferences: Whether to include the preferences if no details are
                requested
            tasks: Whether to get tasks using this config

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        query_options = dict(
            filter=filter,
            filter_id=filter_id,
            trash=trash,
            details=details,
            families=families,
            preferences=preferences,
            tasks=tasks,
        )
        return self.__get_configs(UsageType.SCAN, **query_options)
925
926
    def get_policies(
        self,
        *,
        audits: Optional[bool] = None,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        details: Optional[bool] = None,
        families: Optional[bool] = None,
        preferences: Optional[bool] = None,
        trash: Optional[bool] = None
    ) -> Any:
        """Request a list of policies

        All arguments are forwarded to the shared config query helper together
        with the ``POLICY`` usage type; ``audits`` is passed on as the helper's
        ``tasks`` flag.

        Arguments:
            audits: Whether to get audits using the policy
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            details: Whether to get families, preferences, nvt selectors
                and tasks.
            families: Whether to include the families if no details are
                requested
            preferences: Whether to include the preferences if no details are
                requested
            trash: Whether to get the trashcan policies instead

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        query_options = dict(
            filter=filter,
            filter_id=filter_id,
            details=details,
            families=families,
            preferences=preferences,
            tasks=audits,
            trash=trash,
        )
        return self.__get_configs(UsageType.POLICY, **query_options)
964
965
    def get_config(
        self, config_id: str, *, tasks: Optional[bool] = None
    ) -> Any:
        """Request a single scan config

        Forwards to the shared single-config helper with the ``SCAN`` usage
        type.

        Arguments:
            config_id: UUID of an existing scan config
            tasks: Whether to get tasks using this config

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        return self.__get_config(config_id, UsageType.SCAN, tasks=tasks)
980
981
    def get_policy(self, policy_id: str) -> Any:
        """Request a single policy

        Forwards to the shared single-config helper with the ``POLICY`` usage
        type.

        Arguments:
            policy_id: UUID of an existing policy

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        return self.__get_config(
            config_id=policy_id, usage_type=UsageType.POLICY
        )
991
992
    def get_tasks(
        self,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        details: Optional[bool] = None,
        schedules_only: Optional[bool] = None
    ) -> Any:
        """Request a list of tasks

        All arguments are forwarded unchanged to the shared task query helper
        together with the ``SCAN`` usage type.

        Arguments:
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            trash: Whether to get the trashcan tasks instead
            details: Whether to include full task details
            schedules_only: Whether to only include id, name and schedule
                details

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        query_options = dict(
            filter=filter,
            filter_id=filter_id,
            trash=trash,
            details=details,
            schedules_only=schedules_only,
        )
        return self.__get_tasks(UsageType.SCAN, **query_options)
1022
1023
    def get_audits(
        self,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        details: Optional[bool] = None,
        schedules_only: Optional[bool] = None
    ) -> Any:
        """Request a list of audits

        All arguments are forwarded unchanged to the shared task query helper
        together with the ``AUDIT`` usage type.

        Arguments:
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            trash: Whether to get the trashcan audits instead
            details: Whether to include full audit details
            schedules_only: Whether to only include id, name and schedule
                details

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        query_options = dict(
            filter=filter,
            filter_id=filter_id,
            trash=trash,
            details=details,
            schedules_only=schedules_only,
        )
        return self.__get_tasks(UsageType.AUDIT, **query_options)
1053
1054
    def get_task(self, task_id: str) -> Any:
        """Request a single task

        Forwards to the shared single-task helper with the ``SCAN`` usage type.

        Arguments:
            task_id: UUID of an existing task

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        return self.__get_task(task_id=task_id, usage_type=UsageType.SCAN)
1064
1065
    def get_audit(self, audit_id: str) -> Any:
        """Request a single audit

        Forwards to the shared single-task helper with the ``AUDIT`` usage
        type.

        Arguments:
            audit_id: UUID of an existing audit

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        return self.__get_task(task_id=audit_id, usage_type=UsageType.AUDIT)
1075
1076
    def clone_audit(self, audit_id: str) -> Any:
        """Clone an existing audit

        Sends a ``create_task`` command whose ``copy`` element carries the
        UUID of the audit to clone.

        Arguments:
            audit_id: UUID of existing audit to clone from

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If audit_id is empty or None.
        """
        if not audit_id:
            caller = self.clone_audit.__name__
            raise RequiredArgument(function=caller, argument='audit_id')

        copy_command = XmlCommand("create_task")
        copy_command.add_element("copy", audit_id)

        return self._send_xml_command(copy_command)
1093
1094
    def clone_policy(self, policy_id: str) -> Any:
        """Clone a policy from an existing one

        Sends a ``create_config`` command whose ``copy`` element carries the
        UUID of the policy to clone.

        Arguments:
            policy_id: UUID of the existing policy

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If policy_id is empty or None.
        """
        if not policy_id:
            caller = self.clone_policy.__name__
            raise RequiredArgument(function=caller, argument='policy_id')

        copy_command = XmlCommand("create_config")
        copy_command.add_element("copy", policy_id)

        return self._send_xml_command(copy_command)
1111
1112
    def delete_audit(
        self, audit_id: str, *, ultimate: Optional[bool] = False
    ) -> Any:
        """Deletes an existing audit

        Sends a ``delete_task`` command with the audit UUID as ``task_id``.

        Arguments:
            audit_id: UUID of the audit to be deleted.
            ultimate: Whether to remove entirely, or to the trashcan.

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If audit_id is empty or None.
        """
        if not audit_id:
            caller = self.delete_audit.__name__
            raise RequiredArgument(function=caller, argument='audit_id')

        delete_command = XmlCommand("delete_task")
        delete_command.set_attribute("task_id", audit_id)

        ultimate_flag = _to_bool(ultimate)
        delete_command.set_attribute("ultimate", ultimate_flag)

        return self._send_xml_command(delete_command)
1131
1132
    def delete_policy(
        self, policy_id: str, *, ultimate: Optional[bool] = False
    ) -> Any:
        """Deletes an existing policy

        Sends a ``delete_config`` command with the policy UUID as
        ``config_id``.

        Arguments:
            policy_id: UUID of the policy to be deleted.
            ultimate: Whether to remove entirely, or to the trashcan.

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If policy_id is empty or None.
        """
        if not policy_id:
            caller = self.delete_policy.__name__
            raise RequiredArgument(function=caller, argument='policy_id')

        delete_command = XmlCommand("delete_config")
        delete_command.set_attribute("config_id", policy_id)

        ultimate_flag = _to_bool(ultimate)
        delete_command.set_attribute("ultimate", ultimate_flag)

        return self._send_xml_command(delete_command)
1151
1152 View Code Duplication
    def __create_task(
        self,
        name: str,
        config_id: str,
        target_id: str,
        scanner_id: str,
        usage_type: UsageType,
        function: str,
        *,
        alterable: Optional[bool] = None,
        hosts_ordering: Optional[HostsOrdering] = None,
        schedule_id: Optional[str] = None,
        alert_ids: Optional[List[str]] = None,
        comment: Optional[str] = None,
        schedule_periods: Optional[int] = None,
        observers: Optional[List[str]] = None,
        preferences: Optional[dict] = None
    ) -> Any:
        """Build and send a ``create_task`` command for the given usage type

        Arguments:
            name: Name of the new task
            config_id: UUID of the config to use, sent as ``config`` id
            target_id: UUID of the target, sent as ``target`` id. The value
                '0' is rejected.
            scanner_id: UUID of the scanner, sent as ``scanner`` id
            usage_type: Usage type whose value is sent as ``usage_type``
            function: Name of the public method to report in raised errors
            alterable: Sent as ``alterable`` element unless None
            hosts_ordering: Sent as ``hosts_ordering`` element if truthy
            schedule_id: UUID sent as ``schedule`` id if truthy
            alert_ids: List of alert UUIDs, one ``alert`` element each. A
                single string is still accepted but deprecated.
            comment: Sent as ``comment`` element if truthy
            schedule_periods: Non-negative integer, only evaluated when a
                schedule_id is given
            observers: List of user names or ids, sent comma separated
            preferences: Mapping of scanner preference names to values

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If name, config_id, target_id or scanner_id is
                missing.
            InvalidArgument: If target_id is '0' or schedule_periods is not a
                non-negative integer.
            InvalidArgumentType: If hosts_ordering, observers or preferences
                have an unexpected type.
        """
        # the checks run in exactly this order, the first failure wins
        required_arguments = (
            ('name', name),
            ('config_id', config_id),
            ('target_id', target_id),
            ('scanner_id', scanner_id),
        )
        for argument_name, argument_value in required_arguments:
            if not argument_value:
                raise RequiredArgument(
                    function=function, argument=argument_name
                )

        # don't allow to create a container task with create_task
        if target_id == '0':
            raise InvalidArgument(function=function, argument='target_id')

        task_cmd = XmlCommand("create_task")
        task_cmd.add_element("name", name)
        task_cmd.add_element("usage_type", usage_type.value)

        referenced_entities = (
            ("config", config_id),
            ("target", target_id),
            ("scanner", scanner_id),
        )
        for element_name, entity_id in referenced_entities:
            task_cmd.add_element(element_name, attrs={"id": entity_id})

        if comment:
            task_cmd.add_element("comment", comment)

        if alterable is not None:
            task_cmd.add_element("alterable", _to_bool(alterable))

        if hosts_ordering:
            if not isinstance(hosts_ordering, self.types.HostsOrdering):
                raise InvalidArgumentType(
                    function=function,
                    argument='hosts_ordering',
                    arg_type=HostsOrdering.__name__,
                )
            task_cmd.add_element("hosts_ordering", hosts_ordering.value)

        if alert_ids:
            if isinstance(alert_ids, str):
                deprecation(
                    "Please pass a list as alert_ids parameter to {}. "
                    "Passing a string is deprecated and will be removed in "
                    "future.".format(function)
                )

                # if a single id is given as a string wrap it into a list
                alert_ids = [alert_ids]

            # anything that is not list like is skipped without an error
            if _is_list_like(alert_ids):
                for alert_id in alert_ids:
                    task_cmd.add_element("alert", attrs={"id": str(alert_id)})

        if schedule_id:
            task_cmd.add_element("schedule", attrs={"id": schedule_id})

            # schedule_periods is only looked at together with a schedule
            if schedule_periods is not None:
                periods_valid = (
                    isinstance(schedule_periods, numbers.Integral)
                    and not schedule_periods < 0
                )
                if not periods_valid:
                    raise InvalidArgument(
                        "schedule_periods must be an integer greater or equal "
                        "than 0"
                    )
                task_cmd.add_element("schedule_periods", str(schedule_periods))

        if observers is not None:
            if not _is_list_like(observers):
                raise InvalidArgumentType(
                    function=function, argument='observers', arg_type='list'
                )

            # gvmd splits by comma and space
            # gvmd tries to lookup each value as user name and afterwards as
            # user id. So both user name and user id are possible
            task_cmd.add_element("observers", _to_comma_list(observers))

        if preferences is not None:
            if not isinstance(preferences, collections.abc.Mapping):
                raise InvalidArgumentType(
                    function=function,
                    argument='preferences',
                    arg_type=collections.abc.Mapping.__name__,
                )

            xml_preferences = task_cmd.add_element("preferences")
            for scanner_name, raw_value in preferences.items():
                xml_preference = xml_preferences.add_element("preference")
                xml_preference.add_element("scanner_name", scanner_name)
                xml_preference.add_element("value", str(raw_value))

        return self._send_xml_command(task_cmd)
1263
1264
    def __create_config(
        self,
        config_id: str,
        name: str,
        usage_type: UsageType,
        function: str,
        *,
        comment: Optional[str] = None
    ) -> Any:
        """Build and send a ``create_config`` command copying a config

        Arguments:
            config_id: UUID of the config to copy, sent as ``copy`` element
            name: Name of the new config
            usage_type: Usage type whose value is sent as ``usage_type``
            function: Name of the public method to report in raised errors
            comment: Sent as ``comment`` element unless None

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If name or config_id is missing.
        """
        # name is validated before config_id
        required_arguments = (('name', name), ('config_id', config_id))
        for argument_name, argument_value in required_arguments:
            if not argument_value:
                raise RequiredArgument(
                    function=function, argument=argument_name
                )

        config_cmd = XmlCommand("create_config")

        # an empty comment is still sent, only None leaves the element out
        if comment is not None:
            config_cmd.add_element("comment", comment)

        config_cmd.add_element("copy", config_id)
        config_cmd.add_element("name", name)
        config_cmd.add_element("usage_type", usage_type.value)

        return self._send_xml_command(config_cmd)
1286
1287
    def __create_config_from_osp_scanner(
        self,
        scanner_id: str,
        name: str,
        usage_type: UsageType,
        function: str,
        *,
        comment: Optional[str] = None
    ) -> Any:
        """Build and send a ``create_config`` command based on a scanner

        Arguments:
            scanner_id: UUID of the scanner, sent as ``scanner`` element
            name: Name of the new config
            usage_type: Usage type whose value is sent as ``usage_type``
            function: Name of the public method to report in raised errors
            comment: Sent as ``comment`` element unless None

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If name or scanner_id is missing.
        """
        # name is validated before scanner_id
        required_arguments = (('name', name), ('scanner_id', scanner_id))
        for argument_name, argument_value in required_arguments:
            if not argument_value:
                raise RequiredArgument(
                    function=function, argument=argument_name
                )

        config_cmd = XmlCommand("create_config")

        # an empty comment is still sent, only None leaves the element out
        if comment is not None:
            config_cmd.add_element("comment", comment)

        config_cmd.add_element("scanner", scanner_id)
        config_cmd.add_element("name", name)
        config_cmd.add_element("usage_type", usage_type.value)

        return self._send_xml_command(config_cmd)
1309
1310 View Code Duplication
    def __get_configs(
        self,
        usage_type: UsageType,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        details: Optional[bool] = None,
        families: Optional[bool] = None,
        preferences: Optional[bool] = None,
        tasks: Optional[bool] = None
    ) -> Any:
        """Build and send a ``get_configs`` command for the given usage type

        Arguments:
            usage_type: Usage type whose value is sent as ``usage_type``
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            trash: Sent as ``trash`` attribute unless None
            details: Sent as ``details`` attribute unless None
            families: Sent as ``families`` attribute unless None
            preferences: Sent as ``preferences`` attribute unless None
            tasks: Sent as ``tasks`` attribute unless None

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        query = XmlCommand("get_configs")
        query.set_attribute("usage_type", usage_type.value)

        _add_filter(query, filter, filter_id)

        # a flag left at None is omitted from the command entirely
        optional_flags = (
            ("trash", trash),
            ("details", details),
            ("families", families),
            ("preferences", preferences),
            ("tasks", tasks),
        )
        for attribute, flag in optional_flags:
            if flag is not None:
                query.set_attribute(attribute, _to_bool(flag))

        return self._send_xml_command(query)
1343
1344
    def __get_config(
        self,
        config_id: str,
        usage_type: UsageType,
        *,
        tasks: Optional[bool] = None
    ) -> Any:
        """Build and send a ``get_configs`` command for a single entity

        Arguments:
            config_id: UUID of the config or policy to request
            usage_type: Usage type whose value is sent as ``usage_type``
            tasks: Sent as ``tasks`` attribute unless None

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If config_id is empty or None. The error names
                the public method and argument matching the usage type.
        """
        if not config_id:
            # report the public entry point the caller actually used instead
            # of always blaming get_config/config_id
            if usage_type == UsageType.POLICY:
                function = self.get_policy.__name__
                argument = 'policy_id'
            else:
                function = self.get_config.__name__
                argument = 'config_id'
            raise RequiredArgument(function=function, argument=argument)

        cmd = XmlCommand("get_configs")
        cmd.set_attribute("config_id", config_id)

        cmd.set_attribute("usage_type", usage_type.value)

        if tasks is not None:
            cmd.set_attribute("tasks", _to_bool(tasks))

        # for single entity always request all details
        cmd.set_attribute("details", "1")

        return self._send_xml_command(cmd)
1368
1369
    def __get_tasks(
        self,
        usage_type: UsageType,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        details: Optional[bool] = None,
        schedules_only: Optional[bool] = None
    ) -> Any:
        """Build and send a ``get_tasks`` command for the given usage type

        Arguments:
            usage_type: Usage type whose value is sent as ``usage_type``
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            trash: Sent as ``trash`` attribute unless None
            details: Sent as ``details`` attribute unless None
            schedules_only: Sent as ``schedules_only`` attribute unless None

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        query = XmlCommand("get_tasks")
        query.set_attribute("usage_type", usage_type.value)

        _add_filter(query, filter, filter_id)

        # a flag left at None is omitted from the command entirely
        optional_flags = (
            ("trash", trash),
            ("details", details),
            ("schedules_only", schedules_only),
        )
        for attribute, flag in optional_flags:
            if flag is not None:
                query.set_attribute(attribute, _to_bool(flag))

        return self._send_xml_command(query)
1394
1395
    def __get_task(self, task_id: str, usage_type: UsageType) -> Any:
        """Build and send a ``get_tasks`` command for a single entity

        Arguments:
            task_id: UUID of the task or audit to request
            usage_type: Usage type whose value is sent as ``usage_type``

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If task_id is empty or None. The error names
                the public method and argument matching the usage type.
        """
        if not task_id:
            # report the public entry point the caller actually used instead
            # of always blaming get_task/task_id
            if usage_type == UsageType.AUDIT:
                function = self.get_audit.__name__
                argument = 'audit_id'
            else:
                function = self.get_task.__name__
                argument = 'task_id'
            raise RequiredArgument(function=function, argument=argument)

        cmd = XmlCommand("get_tasks")
        cmd.set_attribute("task_id", task_id)
        cmd.set_attribute("usage_type", usage_type.value)

        # for single entity always request all details
        cmd.set_attribute("details", "1")
        return self._send_xml_command(cmd)
1408