Completed
Push — master ( 3b9245...f5377d )
by Jaspar
17s queued 13s
created

gvm.protocols.gmpv9.gmpv9.GmpV9Mixin.__get_task()   A

Complexity

Conditions 2

Size

Total Lines 13
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 9
nop 3
dl 0
loc 13
rs 9.95
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 - 2019 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
# pylint: disable=arguments-differ, redefined-builtin, too-many-lines
20
21
"""
22
Module for communication with gvmd in `Greenbone Management Protocol version 9`_
23
24
.. _Greenbone Management Protocol version 9:
25
    https://docs.greenbone.net/API/GMP/gmp-9.0.html
26
"""
27
import collections
28
import numbers
29
30
from typing import Any, List, Optional, Callable
31
32
from gvm.errors import InvalidArgument, InvalidArgumentType, RequiredArgument
33
from gvm.utils import deprecation
34
from gvm.xml import XmlCommand
35
36
from gvm.protocols.base import GvmProtocol
37
from gvm.connections import GvmConnection
38
from gvm.protocols.gmpv7.gmpv7 import (
39
    _to_bool,
40
    _add_filter,
41
    _is_list_like,
42
    _to_comma_list,
43
)
44
45
from . import types
46
from .types import *  # pylint: disable=unused-wildcard-import, wildcard-import
47
from .types import _UsageType as UsageType
48
49
_EMPTY_POLICY_ID = '085569ce-73ed-11df-83c3-002264764cea'
50
51
52
def _check_event(
    event: AlertEvent, condition: AlertCondition, method: AlertMethod
):
    """Validate the condition and method of an alert against its event

    Nothing is checked if event is None.

    Arguments:
        event: The alert event to validate against
        condition: The alert condition that should be used with the event
        method: The alert method that should be used with the event

    Raises:
        RequiredArgument: If event is a known event and condition or method
            is missing.
        InvalidArgument: If event is unknown, or if condition or method is
            not allowed for the event.
    """
    # Select the allowed conditions/methods for the event first. A value of
    # None for permitted_methods means that the method is not restricted.
    if event == AlertEvent.TASK_RUN_STATUS_CHANGED:
        permitted_conditions = (
            AlertCondition.ALWAYS,
            AlertCondition.FILTER_COUNT_CHANGED,
            AlertCondition.FILTER_COUNT_AT_LEAST,
            AlertCondition.SEVERITY_AT_LEAST,
            AlertCondition.SEVERITY_CHANGED,
        )
        permitted_methods = None
    elif event in (
        AlertEvent.NEW_SECINFO_ARRIVED,
        AlertEvent.UPDATED_SECINFO_ARRIVED,
    ):
        permitted_conditions = (AlertCondition.ALWAYS,)
        permitted_methods = (
            AlertMethod.SCP,
            AlertMethod.SEND,
            AlertMethod.SMB,
            AlertMethod.SNMP,
            AlertMethod.SYSLOG,
            AlertMethod.EMAIL,
        )
    elif event in (
        AlertEvent.TICKET_RECEIVED,
        AlertEvent.OWNED_TICKET_CHANGED,
        AlertEvent.ASSIGNED_TICKET_CHANGED,
    ):
        permitted_conditions = (AlertCondition.ALWAYS,)
        permitted_methods = (
            AlertMethod.EMAIL,
            AlertMethod.START_TASK,
            AlertMethod.SYSLOG,
        )
    elif event is None:
        return
    else:
        raise InvalidArgument('Invalid event "{}"'.format(event.name))

    # The checks below are shared by all known events
    if not condition:
        raise RequiredArgument(
            "condition is required for event {}".format(event.name)
        )

    if not method:
        raise RequiredArgument(
            "method is required for event {}".format(event.name)
        )

    if condition not in permitted_conditions:
        raise InvalidArgument(
            "Invalid condition {} for event {}".format(
                condition.name, event.name
            )
        )

    if permitted_methods is not None and method not in permitted_methods:
        raise InvalidArgument(
            "Invalid method {} for event {}".format(method.name, event.name)
        )
139
140
141
class GmpV9Mixin(GvmProtocol):
142
143
    types = types
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable types does not seem to be defined.
Loading history...
144
145
    def __init__(
        self,
        connection: GvmConnection,
        *,
        transform: Optional[Callable[[str], Any]] = None
    ):
        """Set up the protocol instance

        Arguments:
            connection: Connection object handed over to the base class
            transform: Optional callable handed over to the base class
        """
        super().__init__(connection, transform=transform)

        # A fresh instance starts out as not authenticated on gvmd
        self._authenticated = False
155
156 View Code Duplication
    def create_alert(
        self,
        name: str,
        condition: AlertCondition,
        event: AlertEvent,
        method: AlertMethod,
        *,
        method_data: Optional[dict] = None,
        event_data: Optional[dict] = None,
        condition_data: Optional[dict] = None,
        filter_id: Optional[str] = None,
        comment: Optional[str] = None
    ) -> Any:
        """Create a new alert

        Arguments:
            name: Name of the new Alert
            condition: The condition that must be satisfied for the alert
                to occur; if the event is either 'Updated SecInfo arrived' or
                'New SecInfo arrived', condition must be 'Always'. Otherwise,
                condition can also be one of 'Severity at least', 'Filter
                count changed' or 'Filter count at least'.
            event: The event that must happen for the alert to occur, one
                of 'Task run status changed', 'Updated SecInfo arrived' or 'New
                SecInfo arrived'
            method: The method by which the user is alerted, one of 'SCP',
                'Send', 'SMB', 'SNMP', 'Syslog' or 'Email'; if the event is
                neither 'Updated SecInfo arrived' nor 'New SecInfo arrived',
                method can also be one of 'Start Task', 'HTTP Get', 'Sourcefire
                Connector' or 'verinice Connector'.
            condition_data: Data that defines the condition
            event_data: Data that defines the event
            method_data: Data that defines the method
            filter_id: Filter to apply when executing alert
            comment: Comment for the alert

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If name, condition, event or method is missing.
            InvalidArgumentType: If condition, event or method is not an
                instance of its enum class.
            InvalidArgument: If the combination of event, condition and
                method is rejected by the event check.
        """
        function_name = self.create_alert.__name__

        # All four positional arguments are mandatory
        for argument, value in (
            ('name', name),
            ('condition', condition),
            ('event', event),
            ('method', method),
        ):
            if not value:
                raise RequiredArgument(
                    function=function_name, argument=argument
                )

        # condition, event and method have to be members of their enum types
        for argument, value, enum_class in (
            ('condition', condition, AlertCondition),
            ('event', event, AlertEvent),
            ('method', method, AlertMethod),
        ):
            if not isinstance(value, enum_class):
                raise InvalidArgumentType(
                    function=function_name,
                    argument=argument,
                    arg_type=enum_class.__name__,
                )

        _check_event(event, condition, method)

        cmd = XmlCommand("create_alert")
        cmd.add_element("name", name)

        # condition, event and method share the same layout: the enum value
        # as element text and one <data> child (containing a <name> child)
        # for every entry of the corresponding data dict
        for tag, selected, data in (
            ("condition", condition, condition_data),
            ("event", event, event_data),
            ("method", method, method_data),
        ):
            element = cmd.add_element(tag, selected.value)

            if data is not None:
                for key, value in data.items():
                    _data = element.add_element("data", value)
                    _data.add_element("name", key)

        if filter_id:
            cmd.add_element("filter", attrs={"id": filter_id})

        if comment:
            cmd.add_element("comment", comment)

        return self._send_xml_command(cmd)
269
270 View Code Duplication
    def create_audit(
        self,
        name: str,
        policy_id: str,
        target_id: str,
        scanner_id: str,
        *,
        alterable: Optional[bool] = None,
        hosts_ordering: Optional[HostsOrdering] = None,
        schedule_id: Optional[str] = None,
        alert_ids: Optional[List[str]] = None,
        comment: Optional[str] = None,
        schedule_periods: Optional[int] = None,
        observers: Optional[List[str]] = None,
        preferences: Optional[dict] = None
    ) -> Any:
        """Create a new audit task

        The audit is created via the shared task creation helper using the
        audit usage type; the policy is passed on as the config of the task.

        Arguments:
            name: Name of the new audit
            policy_id: UUID of policy to use by the audit
            target_id: UUID of target to be scanned
            scanner_id: UUID of scanner to use for scanning the target
            comment: Comment for the audit
            alterable: Whether the task should be alterable
            alert_ids: List of UUIDs for alerts to be applied to the audit
            hosts_ordering: The order hosts are scanned in
            schedule_id: UUID of a schedule when the audit should be run.
            schedule_periods: A limit to the number of times the audit will be
                scheduled, or 0 for no limit
            observers: List of names or ids of users which should be allowed to
                observe this audit
            preferences: Name/Value pairs of scanner preferences.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        optional_settings = {
            "alterable": alterable,
            "hosts_ordering": hosts_ordering,
            "schedule_id": schedule_id,
            "alert_ids": alert_ids,
            "comment": comment,
            "schedule_periods": schedule_periods,
            "observers": observers,
            "preferences": preferences,
        }

        return self.__create_task(
            name=name,
            config_id=policy_id,
            target_id=target_id,
            scanner_id=scanner_id,
            usage_type=UsageType.AUDIT,
            function=self.create_audit.__name__,
            **optional_settings
        )
324
325
    def create_config(
        self, config_id: str, name: str, *, comment: Optional[str] = None
    ) -> Any:
        """Create a new scan config

        Delegates to the shared config creation helper using the scan usage
        type.

        Arguments:
            config_id: UUID of the existing scan config
            name: Name of the new scan config
            comment: A comment on the config

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        request = {
            "config_id": config_id,
            "name": name,
            "comment": comment,
            "usage_type": UsageType.SCAN,
            "function": self.create_config.__name__,
        }
        return self.__create_config(**request)
345
346
    def create_config_from_osp_scanner(
        self, scanner_id: str, name: str, *, comment: Optional[str] = None
    ) -> Any:
        """Create a new scan config from an ospd scanner.

        Create config by retrieving the expected preferences from the given
        scanner via OSP.

        Arguments:
            scanner_id: UUID of an OSP scanner to get config data from
            name: Name of the new scan config
            comment: A comment on the config

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        return self.__create_config_from_osp_scanner(
            scanner_id=scanner_id,
            name=name,
            comment=comment,
            usage_type=UsageType.SCAN,
            # Pass the name of this method (not create_config) so that it is
            # the function reported by the helper
            function=self.create_config_from_osp_scanner.__name__,
        )
369
370
    def create_policy(
        self, name: str, *, policy_id: str = None, comment: Optional[str] = None
    ) -> Any:
        """Create a new policy config

        Delegates to the shared config creation helper using the policy usage
        type.

        Arguments:
            name: Name of the new policy
            policy_id: UUID of an existing policy as base. By default the empty
                policy is used.
            comment: A comment on the policy

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        # Fall back to the empty policy if no base policy has been passed
        base_policy_id = _EMPTY_POLICY_ID if policy_id is None else policy_id

        return self.__create_config(
            config_id=base_policy_id,
            name=name,
            comment=comment,
            usage_type=UsageType.POLICY,
            function=self.create_policy.__name__,
        )
393
394 View Code Duplication
    def create_task(
        self,
        name: str,
        config_id: str,
        target_id: str,
        scanner_id: str,
        *,
        alterable: Optional[bool] = None,
        hosts_ordering: Optional[HostsOrdering] = None,
        schedule_id: Optional[str] = None,
        alert_ids: Optional[List[str]] = None,
        comment: Optional[str] = None,
        schedule_periods: Optional[int] = None,
        observers: Optional[List[str]] = None,
        preferences: Optional[dict] = None
    ) -> Any:
        """Create a new scan task

        The task is created via the shared task creation helper using the
        scan usage type.

        Arguments:
            name: Name of the task
            config_id: UUID of scan config to use by the task
            target_id: UUID of target to be scanned
            scanner_id: UUID of scanner to use for scanning the target
            comment: Comment for the task
            alterable: Whether the task should be alterable
            alert_ids: List of UUIDs for alerts to be applied to the task
            hosts_ordering: The order hosts are scanned in
            schedule_id: UUID of a schedule when the task should be run.
            schedule_periods: A limit to the number of times the task will be
                scheduled, or 0 for no limit
            observers: List of names or ids of users which should be allowed to
                observe this task
            preferences: Name/Value pairs of scanner preferences.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        optional_settings = {
            "alterable": alterable,
            "hosts_ordering": hosts_ordering,
            "schedule_id": schedule_id,
            "alert_ids": alert_ids,
            "comment": comment,
            "schedule_periods": schedule_periods,
            "observers": observers,
            "preferences": preferences,
        }

        return self.__create_task(
            name=name,
            config_id=config_id,
            target_id=target_id,
            scanner_id=scanner_id,
            usage_type=UsageType.SCAN,
            function=self.create_task.__name__,
            **optional_settings
        )
447
448 View Code Duplication
    def create_tls_certificate(
        self,
        name: str,
        certificate: str,
        *,
        comment: Optional[str] = None,
        trust: Optional[bool] = None
    ) -> Any:
        """Create a new TLS certificate

        Arguments:
            name: Name of the TLS certificate, defaulting to the MD5
                fingerprint.
            certificate: The Base64 encoded certificate data (x.509 DER or PEM).
            comment: Comment for the TLS certificate.
            trust: Whether the certificate is trusted.

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If name or certificate is missing.
        """
        for argument, value in (('name', name), ('certificate', certificate)):
            if not value:
                raise RequiredArgument(
                    function=self.create_tls_certificate.__name__,
                    argument=argument,
                )

        cmd = XmlCommand("create_tls_certificate")

        if comment:
            cmd.add_element("comment", comment)

        for tag, text in (("name", name), ("certificate", certificate)):
            cmd.add_element(tag, text)

        # The trust element is only added for a truthy trust value
        if trust:
            cmd.add_element("trust", _to_bool(trust))

        return self._send_xml_command(cmd)
490
491 View Code Duplication
    def get_tls_certificates(
        self,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        include_certificate_data: Optional[bool] = None,
        details: Optional[bool] = None
    ) -> Any:
        """Request a list of TLS certificates

        Arguments:
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            include_certificate_data: Whether to include the certificate data in
                the response
            details: Value for the ``details`` attribute of the request. The
                attribute is only set if a value other than None is passed.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        cmd = XmlCommand("get_tls_certificates")

        _add_filter(cmd, filter, filter_id)

        # Boolean attributes are only sent if they have been set explicitly
        for attribute, flag in (
            ("details", details),
            ("include_certificate_data", include_certificate_data),
        ):
            if flag is not None:
                cmd.set_attribute(attribute, _to_bool(flag))

        return self._send_xml_command(cmd)
524
525
    def get_tls_certificate(self, tls_certificate_id: str) -> Any:
        """Request a single TLS certificate

        The certificate data and all details are always requested.

        Arguments:
            tls_certificate_id: UUID of an existing TLS certificate

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If tls_certificate_id is missing.
        """
        if not tls_certificate_id:
            raise RequiredArgument(
                function=self.get_tls_certificate.__name__,
                argument='tls_certificate_id',
            )

        cmd = XmlCommand("get_tls_certificates")

        for attribute, value in (
            ("tls_certificate_id", tls_certificate_id),
            # for single tls certificate always request cert data
            ("include_certificate_data", "1"),
            # for single entity always request all details
            ("details", "1"),
        ):
            cmd.set_attribute(attribute, value)

        return self._send_xml_command(cmd)
551
552 View Code Duplication
    def modify_alert(
        self,
        alert_id: str,
        *,
        name: Optional[str] = None,
        comment: Optional[str] = None,
        filter_id: Optional[str] = None,
        event: Optional[AlertEvent] = None,
        event_data: Optional[dict] = None,
        condition: Optional[AlertCondition] = None,
        condition_data: Optional[dict] = None,
        method: Optional[AlertMethod] = None,
        method_data: Optional[dict] = None
    ) -> Any:
        """Modifies an existing alert.

        Arguments:
            alert_id: UUID of the alert to be modified.
            name: Name of the Alert.
            condition: The condition that must be satisfied for the alert to
                occur. If the event is either 'Updated SecInfo
                arrived' or 'New SecInfo arrived', condition must be 'Always'.
                Otherwise, condition can also be one of 'Severity at least',
                'Filter count changed' or 'Filter count at least'.
            condition_data: Data that defines the condition
            event: The event that must happen for the alert to occur, one of
                'Task run status changed', 'Updated SecInfo arrived' or
                'New SecInfo arrived'
            event_data: Data that defines the event
            method: The method by which the user is alerted, one of 'SCP',
                'Send', 'SMB', 'SNMP', 'Syslog' or 'Email';
                if the event is neither 'Updated SecInfo arrived' nor
                'New SecInfo arrived', method can also be one of 'Start Task',
                'HTTP Get', 'Sourcefire Connector' or 'verinice Connector'.
            method_data: Data that defines the method
            filter_id: Filter to apply when executing alert
            comment: Comment for the alert

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If alert_id is missing, or if an event is passed
                without a condition or method.
            InvalidArgumentType: If condition, method or event is passed but
                is not an instance of its enum class.
            InvalidArgument: If the combination of event, condition and
                method is rejected by the event check.
        """
        if not alert_id:
            raise RequiredArgument(
                function=self.modify_alert.__name__, argument='alert_id'
            )

        def _attach_data(parent, data):
            # Add one <data> child (with a nested <name>) per dict entry
            if data is None:
                return

            for key, value in data.items():
                entry = parent.add_element("data", value)
                entry.add_element("name", key)

        cmd = XmlCommand("modify_alert")
        cmd.set_attribute("alert_id", str(alert_id))

        if name:
            cmd.add_element("name", name)

        if comment:
            cmd.add_element("comment", comment)

        if filter_id:
            cmd.add_element("filter", attrs={"id": filter_id})

        if condition:
            if not isinstance(condition, AlertCondition):
                raise InvalidArgumentType(
                    function=self.modify_alert.__name__,
                    argument='condition',
                    arg_type=AlertCondition.__name__,
                )

            _attach_data(
                cmd.add_element("condition", condition.value), condition_data
            )

        if method:
            if not isinstance(method, AlertMethod):
                raise InvalidArgumentType(
                    function=self.modify_alert.__name__,
                    argument='method',
                    arg_type=AlertMethod.__name__,
                )

            _attach_data(cmd.add_element("method", method.value), method_data)

        if event:
            if not isinstance(event, AlertEvent):
                raise InvalidArgumentType(
                    function=self.modify_alert.__name__,
                    argument='event',
                    arg_type=AlertEvent.__name__,
                )

            # An event is only accepted together with a matching condition
            # and method
            _check_event(event, condition, method)

            _attach_data(cmd.add_element("event", event.value), event_data)

        return self._send_xml_command(cmd)
659
660 View Code Duplication
    def modify_tls_certificate(
        self,
        tls_certificate_id: str,
        *,
        name: Optional[str] = None,
        comment: Optional[str] = None,
        trust: Optional[bool] = None
    ) -> Any:
        """Modifies an existing TLS certificate.

        Arguments:
            tls_certificate_id: UUID of the TLS certificate to be modified.
            name: Name of the TLS certificate, defaulting to the MD5 fingerprint
            comment: Comment for the TLS certificate.
            trust: Whether the certificate is trusted. If None (the default)
                no trust element is sent with the request.

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If tls_certificate_id is missing.
        """
        if not tls_certificate_id:
            raise RequiredArgument(
                function=self.modify_tls_certificate.__name__,
                argument='tls_certificate_id',
            )

        cmd = XmlCommand("modify_tls_certificate")
        cmd.set_attribute("tls_certificate_id", str(tls_certificate_id))

        if comment:
            cmd.add_element("comment", comment)

        if name:
            cmd.add_element("name", name)

        # Compare against None instead of testing for truthiness. Otherwise
        # an explicit trust=False would be dropped silently and a
        # certificate could never be marked as untrusted again.
        if trust is not None:
            cmd.add_element("trust", _to_bool(trust))

        return self._send_xml_command(cmd)
698
699
    def clone_tls_certificate(self, tls_certificate_id: str) -> Any:
        """Clone an existing TLS certificate.

        Arguments:
            tls_certificate_id: The UUID of an existing TLS certificate

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If tls_certificate_id is missing.
        """
        if not tls_certificate_id:
            raise RequiredArgument(
                function=self.clone_tls_certificate.__name__,
                argument='tls_certificate_id',
            )

        # A clone is requested by sending a create command that contains a
        # copy element referencing the existing certificate
        cmd = XmlCommand("create_tls_certificate")

        cmd.add_element("copy", tls_certificate_id)

        return self._send_xml_command(cmd)
719
720
    def get_configs(
        self,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        details: Optional[bool] = None,
        families: Optional[bool] = None,
        preferences: Optional[bool] = None,
        tasks: Optional[bool] = None
    ) -> Any:
        """Request a list of scan configs

        Delegates to the shared config query helper using the scan usage
        type.

        Arguments:
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            trash: Whether to get the trashcan scan configs instead
            details: Whether to get config families, preferences, nvt selectors
                and tasks.
            families: Whether to include the families if no details are
                requested
            preferences: Whether to include the preferences if no details are
                requested
            tasks: Whether to get tasks using this config

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        query = {
            "filter": filter,
            "filter_id": filter_id,
            "trash": trash,
            "details": details,
            "families": families,
            "preferences": preferences,
            "tasks": tasks,
        }
        return self.__get_configs(UsageType.SCAN, **query)
758
759
    def get_policies(
        self,
        *,
        audits: Optional[bool] = None,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        details: Optional[bool] = None,
        families: Optional[bool] = None,
        preferences: Optional[bool] = None,
        trash: Optional[bool] = None
    ) -> Any:
        """Request a list of policies

        Delegates to the shared config query helper using the policy usage
        type.

        Arguments:
            audits: Whether to get audits using the policy
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            details: Whether to get families, preferences, nvt selectors
                and tasks.
            families: Whether to include the families if no details are
                requested
            preferences: Whether to include the preferences if no details are
                requested
            trash: Whether to get the trashcan policies instead

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        query = {
            "filter": filter,
            "filter_id": filter_id,
            "details": details,
            "families": families,
            "preferences": preferences,
            # the audits flag is passed on as the tasks flag of the helper
            "tasks": audits,
            "trash": trash,
        }
        return self.__get_configs(UsageType.POLICY, **query)
797
798
    def get_config(
        self, config_id: str, *, tasks: Optional[bool] = None
    ) -> Any:
        """Request a single scan config

        Delegates to the shared single config helper using the scan usage
        type.

        Arguments:
            config_id: UUID of an existing scan config
            tasks: Whether to get tasks using this config

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        request = {
            "config_id": config_id,
            "usage_type": UsageType.SCAN,
            "tasks": tasks,
        }
        return self.__get_config(**request)
813
814
    def get_policy(self, policy_id: str) -> Any:
        """Request a single policy

        Arguments:
            policy_id: UUID of an existing policy

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If policy_id is empty or None.
        """
        # Validate here so the error names get_policy/policy_id. The shared
        # helper reports a missing id as get_config/config_id, which is
        # misleading for callers of this method.
        if not policy_id:
            raise RequiredArgument(
                function=self.get_policy.__name__, argument='policy_id'
            )

        return self.__get_config(policy_id, UsageType.POLICY)
824
825
    def get_tasks(
        self,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        details: Optional[bool] = None,
        schedules_only: Optional[bool] = None
    ) -> Any:
        """Request a list of tasks

        The query is delegated to the shared task query helper with the
        usage type fixed to scan.

        Arguments:
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            trash: Whether to get the trashcan tasks instead
            details: Whether to include full task details
            schedules_only: Whether to only include id, name and schedule
                details

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        task_query = {
            "filter": filter,
            "filter_id": filter_id,
            "trash": trash,
            "details": details,
            "schedules_only": schedules_only,
        }
        return self.__get_tasks(UsageType.SCAN, **task_query)
855
856
    def get_audits(
        self,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        details: Optional[bool] = None,
        schedules_only: Optional[bool] = None
    ) -> Any:
        """Request a list of audits

        The query is delegated to the shared task query helper with the
        usage type fixed to audit.

        Arguments:
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            trash: Whether to get the trashcan audits instead
            details: Whether to include full audit details
            schedules_only: Whether to only include id, name and schedule
                details

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        audit_query = {
            "filter": filter,
            "filter_id": filter_id,
            "trash": trash,
            "details": details,
            "schedules_only": schedules_only,
        }
        return self.__get_tasks(UsageType.AUDIT, **audit_query)
886
887
    def get_task(self, task_id: str) -> Any:
        """Request a single task

        The lookup is delegated to the shared single-task helper with the
        usage type fixed to scan.

        Arguments:
            task_id: UUID of an existing task

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        return self.__get_task(task_id=task_id, usage_type=UsageType.SCAN)
897
898
    def get_audit(self, audit_id: str) -> Any:
        """Request a single audit

        Arguments:
            audit_id: UUID of an existing audit

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If audit_id is empty or None.
        """
        # Validate here so the error names get_audit/audit_id. The shared
        # helper reports a missing id as get_task/task_id, which is
        # misleading for callers of this method.
        if not audit_id:
            raise RequiredArgument(
                function=self.get_audit.__name__, argument='audit_id'
            )

        return self.__get_task(audit_id, UsageType.AUDIT)
908
909
    def clone_audit(self, audit_id: str) -> Any:
        """Clone an existing audit

        Sends a create_task command whose copy element holds the id of the
        audit to clone.

        Arguments:
            audit_id: UUID of existing audit to clone from

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If audit_id is empty or None.
        """
        if not audit_id:
            raise RequiredArgument(
                function=self.clone_audit.__name__, argument='audit_id'
            )

        clone_request = XmlCommand("create_task")
        clone_request.add_element("copy", audit_id)

        return self._send_xml_command(clone_request)
926
927
    def clone_policy(self, policy_id: str) -> Any:
        """Clone a policy from an existing one

        Sends a create_config command whose copy element holds the id of the
        policy to clone.

        Arguments:
            policy_id: UUID of the existing policy

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If policy_id is empty or None.
        """
        if not policy_id:
            raise RequiredArgument(
                function=self.clone_policy.__name__, argument='policy_id'
            )

        clone_request = XmlCommand("create_config")
        clone_request.add_element("copy", policy_id)

        return self._send_xml_command(clone_request)
944
945
    def delete_audit(
        self, audit_id: str, *, ultimate: Optional[bool] = False
    ) -> Any:
        """Deletes an existing audit

        Sends a delete_task command carrying the audit id as task_id.

        Arguments:
            audit_id: UUID of the audit to be deleted.
            ultimate: Whether to remove entirely, or to the trashcan.

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If audit_id is empty or None.
        """
        if not audit_id:
            raise RequiredArgument(
                function=self.delete_audit.__name__, argument='audit_id'
            )

        delete_request = XmlCommand("delete_task")
        delete_request.set_attribute("task_id", audit_id)

        ultimate_flag = _to_bool(ultimate)
        delete_request.set_attribute("ultimate", ultimate_flag)

        return self._send_xml_command(delete_request)
964
965
    def delete_policy(
        self, policy_id: str, *, ultimate: Optional[bool] = False
    ) -> Any:
        """Deletes an existing policy

        Sends a delete_config command carrying the policy id as config_id.

        Arguments:
            policy_id: UUID of the policy to be deleted.
            ultimate: Whether to remove entirely, or to the trashcan.

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If policy_id is empty or None.
        """
        if not policy_id:
            raise RequiredArgument(
                function=self.delete_policy.__name__, argument='policy_id'
            )

        delete_request = XmlCommand("delete_config")
        delete_request.set_attribute("config_id", policy_id)

        ultimate_flag = _to_bool(ultimate)
        delete_request.set_attribute("ultimate", ultimate_flag)

        return self._send_xml_command(delete_request)
984
985 View Code Duplication
    def __create_task(
        self,
        name: str,
        config_id: str,
        target_id: str,
        scanner_id: str,
        usage_type: UsageType,
        function: str,
        *,
        alterable: Optional[bool] = None,
        hosts_ordering: Optional[HostsOrdering] = None,
        schedule_id: Optional[str] = None,
        alert_ids: Optional[List[str]] = None,
        comment: Optional[str] = None,
        schedule_periods: Optional[int] = None,
        observers: Optional[List[str]] = None,
        preferences: Optional[dict] = None
    ) -> Any:
        """Build and send a create_task command

        Arguments:
            name: Name of the new task
            config_id: UUID of the config to use, sent as the config element
            target_id: UUID of the target, sent as the target element
            scanner_id: UUID of the scanner, sent as the scanner element
            usage_type: Usage type whose value is sent as usage_type element
            function: Name of the calling method. It is used in raised
                errors and in the deprecation message
            alterable: Sent as alterable element if not None
            hosts_ordering: Sent as hosts_ordering element if set
            schedule_id: UUID of a schedule, sent as schedule element if set
            alert_ids: List of alert UUIDs. A single string is still
                accepted but deprecated
            comment: Comment for the task, only sent if not empty
            schedule_periods: Non-negative integer. It is only checked and
                sent if schedule_id is set
            observers: List of names or ids of users, sent as a comma
                separated list
            preferences: Mapping of scanner preference names to values

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If name, config_id, target_id or scanner_id
                is empty or None.
            InvalidArgument: If target_id is '0' or schedule_periods is not
                a non-negative integer.
            InvalidArgumentType: If hosts_ordering, observers or preferences
                has an unsupported type.
        """
        # checked in this order so the first missing argument is reported
        required_arguments = (
            ('name', name),
            ('config_id', config_id),
            ('target_id', target_id),
            ('scanner_id', scanner_id),
        )
        for argument, value in required_arguments:
            if not value:
                raise RequiredArgument(function=function, argument=argument)

        # don't allow to create a container task with create_task
        if target_id == '0':
            raise InvalidArgument(function=function, argument='target_id')

        cmd = XmlCommand("create_task")
        cmd.add_element("name", name)
        cmd.add_element("usage_type", usage_type.value)
        cmd.add_element("config", attrs={"id": config_id})
        cmd.add_element("target", attrs={"id": target_id})
        cmd.add_element("scanner", attrs={"id": scanner_id})

        if comment:
            cmd.add_element("comment", comment)

        if alterable is not None:
            cmd.add_element("alterable", _to_bool(alterable))

        if hosts_ordering:
            if not isinstance(hosts_ordering, self.types.HostsOrdering):
                raise InvalidArgumentType(
                    function=function,
                    argument='hosts_ordering',
                    arg_type=HostsOrdering.__name__,
                )
            cmd.add_element("hosts_ordering", hosts_ordering.value)

        if alert_ids:
            if isinstance(alert_ids, str):
                deprecation(
                    "Please pass a list as alert_ids parameter to {}. "
                    "Passing a string is deprecated and will be removed in "
                    "future.".format(function)
                )

                # if a single id is given as a string wrap it into a list
                alert_ids = [alert_ids]
            if _is_list_like(alert_ids):
                # parse all given alert id's
                for alert in alert_ids:
                    cmd.add_element("alert", attrs={"id": str(alert)})

        if schedule_id:
            cmd.add_element("schedule", attrs={"id": schedule_id})

            if schedule_periods is not None:
                if (
                    not isinstance(schedule_periods, numbers.Integral)
                    or schedule_periods < 0
                ):
                    # carry function/argument like every other error raised
                    # here; the message text itself is unchanged
                    raise InvalidArgument(
                        "schedule_periods must be an integer greater or equal "
                        "than 0",
                        function=function,
                        argument='schedule_periods',
                    )
                cmd.add_element("schedule_periods", str(schedule_periods))

        if observers is not None:
            if not _is_list_like(observers):
                raise InvalidArgumentType(
                    function=function, argument='observers', arg_type='list'
                )

            # gvmd splits by comma and space
            # gvmd tries to lookup each value as user name and afterwards as
            # user id. So both user name and user id are possible
            cmd.add_element("observers", _to_comma_list(observers))

        if preferences is not None:
            if not isinstance(preferences, collections.abc.Mapping):
                raise InvalidArgumentType(
                    function=function,
                    argument='preferences',
                    arg_type=collections.abc.Mapping.__name__,
                )

            _xmlprefs = cmd.add_element("preferences")
            for pref_name, pref_value in preferences.items():
                _xmlpref = _xmlprefs.add_element("preference")
                _xmlpref.add_element("scanner_name", pref_name)
                _xmlpref.add_element("value", str(pref_value))

        return self._send_xml_command(cmd)
1096
1097
    def __create_config(
        self,
        config_id: str,
        name: str,
        usage_type: UsageType,
        function: str,
        *,
        comment: Optional[str] = None
    ) -> Any:
        """Build and send a create_config command copying an existing config

        Arguments:
            config_id: UUID of the config to copy, sent as the copy element
            name: Name of the new config
            usage_type: Usage type whose value is sent as usage_type element
            function: Name of the calling method, used in raised errors
            comment: Comment for the new config, sent if not None

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If name or config_id is empty or None.
        """
        # name is checked first, then config_id
        for argument, value in (('name', name), ('config_id', config_id)):
            if not value:
                raise RequiredArgument(function=function, argument=argument)

        request = XmlCommand("create_config")

        if comment is not None:
            request.add_element("comment", comment)

        request.add_element("copy", config_id)
        request.add_element("name", name)
        request.add_element("usage_type", usage_type.value)

        return self._send_xml_command(request)
1119
1120
    def __create_config_from_osp_scanner(
        self,
        scanner_id: str,
        name: str,
        usage_type: UsageType,
        function: str,
        *,
        comment: Optional[str] = None
    ) -> Any:
        """Build and send a create_config command referencing a scanner

        Arguments:
            scanner_id: UUID of the scanner, sent as the scanner element
            name: Name of the new config
            usage_type: Usage type whose value is sent as usage_type element
            function: Name of the calling method, used in raised errors
            comment: Comment for the new config, sent if not None

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If name or scanner_id is empty or None.
        """
        # name is checked first, then scanner_id
        for argument, value in (('name', name), ('scanner_id', scanner_id)):
            if not value:
                raise RequiredArgument(function=function, argument=argument)

        request = XmlCommand("create_config")

        if comment is not None:
            request.add_element("comment", comment)

        request.add_element("scanner", scanner_id)
        request.add_element("name", name)
        request.add_element("usage_type", usage_type.value)

        return self._send_xml_command(request)
1142
1143 View Code Duplication
    def __get_configs(
        self,
        usage_type: UsageType,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        details: Optional[bool] = None,
        families: Optional[bool] = None,
        preferences: Optional[bool] = None,
        tasks: Optional[bool] = None
    ) -> Any:
        """Build and send a get_configs command for the given usage type

        Arguments:
            usage_type: Usage type whose value is sent as usage_type
                attribute
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            trash: Sent as trash attribute if not None
            details: Sent as details attribute if not None
            families: Sent as families attribute if not None
            preferences: Sent as preferences attribute if not None
            tasks: Sent as tasks attribute if not None

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        request = XmlCommand("get_configs")
        request.set_attribute("usage_type", usage_type.value)

        _add_filter(request, filter, filter_id)

        # flags left at None are omitted from the command entirely
        optional_flags = (
            ("trash", trash),
            ("details", details),
            ("families", families),
            ("preferences", preferences),
            ("tasks", tasks),
        )
        for attribute, flag in optional_flags:
            if flag is not None:
                request.set_attribute(attribute, _to_bool(flag))

        return self._send_xml_command(request)
1176
1177
    def __get_config(
        self,
        config_id: str,
        usage_type: UsageType,
        *,
        tasks: Optional[bool] = None
    ) -> Any:
        """Build and send a get_configs command for a single config

        Arguments:
            config_id: UUID of the config, sent as config_id attribute
            usage_type: Usage type whose value is sent as usage_type
                attribute
            tasks: Sent as tasks attribute if not None

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If config_id is empty or None. The error always
                names get_config and config_id.
        """
        if not config_id:
            raise RequiredArgument(
                function=self.get_config.__name__, argument='config_id'
            )

        request = XmlCommand("get_configs")
        request.set_attribute("config_id", config_id)
        request.set_attribute("usage_type", usage_type.value)

        if tasks is not None:
            tasks_flag = _to_bool(tasks)
            request.set_attribute("tasks", tasks_flag)

        # for single entity always request all details
        request.set_attribute("details", "1")

        return self._send_xml_command(request)
1201
1202
    def __get_tasks(
        self,
        usage_type: UsageType,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        details: Optional[bool] = None,
        schedules_only: Optional[bool] = None
    ) -> Any:
        """Build and send a get_tasks command for the given usage type

        Arguments:
            usage_type: Usage type whose value is sent as usage_type
                attribute
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            trash: Sent as trash attribute if not None
            details: Sent as details attribute if not None
            schedules_only: Sent as schedules_only attribute if not None

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        request = XmlCommand("get_tasks")
        request.set_attribute("usage_type", usage_type.value)

        _add_filter(request, filter, filter_id)

        # flags left at None are omitted from the command entirely
        optional_flags = (
            ("trash", trash),
            ("details", details),
            ("schedules_only", schedules_only),
        )
        for attribute, flag in optional_flags:
            if flag is not None:
                request.set_attribute(attribute, _to_bool(flag))

        return self._send_xml_command(request)
1227
1228
    def __get_task(self, task_id: str, usage_type: UsageType) -> Any:
        """Build and send a get_tasks command for a single task

        Arguments:
            task_id: UUID of the task, sent as task_id attribute
            usage_type: Usage type whose value is sent as usage_type
                attribute

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If task_id is empty or None. The error always
                names get_task and task_id.
        """
        if not task_id:
            raise RequiredArgument(
                function=self.get_task.__name__, argument='task_id'
            )

        request = XmlCommand("get_tasks")
        request.set_attribute("task_id", task_id)
        request.set_attribute("usage_type", usage_type.value)

        # for single entity always request all details
        request.set_attribute("details", "1")

        return self._send_xml_command(request)
1241