Completed
Push — master ( ff96d5...d0ddf5 )
by Björn
21s queued 14s
created

GmpV9Mixin.create_alert()   F

Complexity

Conditions 16

Size

Total Lines 113
Code Lines 62

Duplication

Lines 113
Ratio 100 %

Importance

Changes 0
Metric Value
cc 16
eloc 62
nop 11
dl 113
loc 113
rs 2.4
c 0
b 0
f 0

How to fix: Long Method, Complexity, Many Parameters

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:
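Extract Method is one such refactoring. As a minimal sketch, the three near-identical loops in create_alert() that attach data entries to the condition, event and method elements could be pulled into one helper. The sketch uses the standard library's xml.etree.ElementTree as a stand-in for gvm.xml.XmlCommand so that it runs on its own; the names add_data_elements and build_alert_xml are hypothetical and not part of python-gvm.

```python
import xml.etree.ElementTree as ET
from typing import Dict, Optional


def add_data_elements(parent: ET.Element, data: Optional[Dict[str, str]]) -> None:
    """Append one <data> child, with a nested <name>, per dict entry.

    Hypothetical helper: this is the loop that create_alert() repeats three
    times, extracted once and given a name.
    """
    if data is None:
        return
    for key, value in data.items():
        data_el = ET.SubElement(parent, "data")
        data_el.text = str(value)
        ET.SubElement(data_el, "name").text = str(key)


def build_alert_xml(
    name: str,
    condition: str,
    event: str,
    method: str,
    *,
    condition_data: Optional[Dict[str, str]] = None,
    event_data: Optional[Dict[str, str]] = None,
    method_data: Optional[Dict[str, str]] = None
) -> ET.Element:
    """Build a create_alert element tree (stand-in for XmlCommand)."""
    cmd = ET.Element("create_alert")
    ET.SubElement(cmd, "name").text = name

    # Same order as the original method: condition, event, method.
    for tag, value, data in (
        ("condition", condition, condition_data),
        ("event", event, event_data),
        ("method", method, method_data),
    ):
        element = ET.SubElement(cmd, tag)
        element.text = value
        add_data_elements(element, data)

    return cmd


if __name__ == "__main__":
    root = build_alert_xml(
        "demo",
        "Always",
        "Task run status changed",
        "Email",
        method_data={"to_address": "admin@example.com"},
    )
    print(ET.tostring(root, encoding="unicode"))
```

The helper's docstring plays the role that an inline comment would have played in the long method, which is the naming hint described above.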

Complexity

Complex code units like gvm.protocols.gmpv9.gmpv9.GmpV9Mixin.create_alert() often do a lot of different things. To break such a unit down, we need to identify a cohesive component within it. A common approach to find such a component is to look for fields or methods that share the same prefix or suffix.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
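One way to picture Extract Class here is to move the event validation rules into a small class of their own, so that the nested if/elif chain becomes a table lookup. The following is only a sketch under assumptions: the enums are reduced stand-ins for the real AlertEvent, AlertCondition and AlertMethod types, ValueError replaces gvm's own exception classes, and EventRule, RULES and check_event are hypothetical names.

```python
from dataclasses import dataclass
from enum import Enum
from typing import FrozenSet


class Event(Enum):  # reduced stand-in for AlertEvent
    TASK_RUN_STATUS_CHANGED = "Task run status changed"
    NEW_SECINFO_ARRIVED = "New SecInfo arrived"


class Condition(Enum):  # reduced stand-in for AlertCondition
    ALWAYS = "Always"
    SEVERITY_AT_LEAST = "Severity at least"


class Method(Enum):  # reduced stand-in for AlertMethod
    EMAIL = "Email"
    START_TASK = "Start Task"


@dataclass(frozen=True)
class EventRule:
    """The extracted component: what one event permits."""

    conditions: FrozenSet[Condition]
    methods: FrozenSet[Method]

    def check(self, event: Event, condition: Condition, method: Method) -> None:
        if condition not in self.conditions:
            raise ValueError(
                "Invalid condition {} for event {}".format(
                    condition.name, event.name
                )
            )
        if method not in self.methods:
            raise ValueError(
                "Invalid method {} for event {}".format(method.name, event.name)
            )


# One table entry per event replaces one branch of the if/elif chain.
RULES = {
    Event.TASK_RUN_STATUS_CHANGED: EventRule(
        conditions=frozenset(Condition), methods=frozenset(Method)
    ),
    Event.NEW_SECINFO_ARRIVED: EventRule(
        conditions=frozenset({Condition.ALWAYS}),
        methods=frozenset({Method.EMAIL}),
    ),
}


def check_event(event: Event, condition: Condition, method: Method) -> None:
    rule = RULES.get(event)
    if rule is None:
        raise ValueError('Invalid event "{}"'.format(event))
    rule.check(event, condition, method)
```

Adding a new event then means adding a table entry rather than another branch, which keeps the number of conditions in the checking function constant.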

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:
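Introduce Parameter Object is one such approach. The sketch below groups each value with its data dictionary and bundles the result into a single object, so a caller passes one argument instead of nine. It is illustrative only: AlertPart, AlertSpec and alert_payload are hypothetical names, plain strings stand in for the enum types, and the function builds a dictionary rather than talking to gvmd.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class AlertPart:
    """A value plus the data entries that belong to it (hypothetical)."""

    value: str
    data: Dict[str, str] = field(default_factory=dict)


@dataclass
class AlertSpec:
    """Parameter object bundling everything create_alert() takes."""

    name: str
    condition: AlertPart
    event: AlertPart
    method: AlertPart
    filter_id: Optional[str] = None
    comment: Optional[str] = None


def alert_payload(spec: AlertSpec) -> dict:
    """Turn the parameter object into a plain dict (stand-in for the XML)."""
    payload = {"name": spec.name}
    for key in ("condition", "event", "method"):
        part = getattr(spec, key)
        payload[key] = {"value": part.value, "data": dict(part.data)}
    # Optional fields are only emitted when set, as in the original method.
    if spec.filter_id:
        payload["filter"] = spec.filter_id
    if spec.comment:
        payload["comment"] = spec.comment
    return payload


if __name__ == "__main__":
    spec = AlertSpec(
        name="demo",
        condition=AlertPart("Always"),
        event=AlertPart("Task run status changed", {"status": "Done"}),
        method=AlertPart("Email", {"to_address": "admin@example.com"}),
    )
    print(alert_payload(spec))
```

Because condition and condition_data now travel together, they cannot drift apart when a new field is added later.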

# -*- coding: utf-8 -*-
# Copyright (C) 2018 - 2019 Greenbone Networks GmbH
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

# pylint: disable=arguments-differ, redefined-builtin, too-many-lines

"""
Module for communication with gvmd in `Greenbone Management Protocol version 9`_

.. _Greenbone Management Protocol version 9:
    https://docs.greenbone.net/API/GMP/gmp-9.0.html
"""
import collections
import numbers

from typing import Any, List, Optional, Callable

from gvm.errors import InvalidArgument, InvalidArgumentType, RequiredArgument
from gvm.utils import deprecation
from gvm.xml import XmlCommand

from gvm.protocols.base import GvmProtocol
from gvm.connections import GvmConnection
from gvm.protocols.gmpv7.gmpv7 import (
    _to_bool,
    _add_filter,
    _is_list_like,
    _to_comma_list,
)

from . import types
from .types import *  # pylint: disable=unused-wildcard-import, wildcard-import
from .types import _UsageType as UsageType

_EMPTY_POLICY_ID = '085569ce-73ed-11df-83c3-002264764cea'


def _check_event(
    event: AlertEvent, condition: AlertCondition, method: AlertMethod
):
    if event == AlertEvent.TASK_RUN_STATUS_CHANGED:
        if not condition:
            raise RequiredArgument(
                "condition is required for event {}".format(event.name),
            )

        if not method:
            raise RequiredArgument(
                "method is required for event {}".format(event.name),
            )

        if condition not in (
            AlertCondition.ALWAYS,
            AlertCondition.FILTER_COUNT_CHANGED,
            AlertCondition.FILTER_COUNT_AT_LEAST,
            AlertCondition.SEVERITY_AT_LEAST,
        ):
            raise InvalidArgument(
                "Invalid condition {} for event {}".format(
                    condition.name, event.name
                )
            )
        if method not in (
            AlertMethod.SCP,
            AlertMethod.SEND,
            AlertMethod.SMB,
            AlertMethod.SNMP,
            AlertMethod.SYSLOG,
            AlertMethod.EMAIL,
            AlertMethod.START_TASK,
            AlertMethod.HTTP_GET,
            AlertMethod.SOURCEFIRE_CONNECTOR,
            AlertMethod.VERINICE_CONNECTOR,
            AlertMethod.TIPPINGPOINT,
            AlertMethod.ALEMBA_VFIRE,
        ):
            raise InvalidArgument(
                "Invalid method {} for event {}".format(method.name, event.name)
            )
    elif event in (
        AlertEvent.NEW_SECINFO_ARRIVED,
        AlertEvent.UPDATED_SECINFO_ARRIVED,
    ):
        if not condition:
            raise RequiredArgument(
                "condition is required for event {}".format(event.name),
            )

        if not method:
            raise RequiredArgument(
                "method is required for event {}".format(event.name),
            )

        if condition not in (AlertCondition.ALWAYS,):
            raise InvalidArgument(
                "Invalid condition {} for event {}".format(
                    condition.name, event.name
                )
            )
        if method not in (
            AlertMethod.SCP,
            AlertMethod.SEND,
            AlertMethod.SMB,
            AlertMethod.SNMP,
            AlertMethod.SYSLOG,
            AlertMethod.EMAIL,
        ):
            raise InvalidArgument(
                "Invalid method {} for event {}".format(method.name, event.name)
            )
    elif event is not None:
        raise InvalidArgument('Invalid event "{}"'.format(event.name))


class GmpV9Mixin(GvmProtocol):

    types = types

    def __init__(
        self,
        connection: GvmConnection,
        *,
        transform: Optional[Callable[[str], Any]] = None
    ):
        super().__init__(connection, transform=transform)

        # Is authenticated on gvmd
        self._authenticated = False

    def create_alert(
        self,
        name: str,
        condition: AlertCondition,
        event: AlertEvent,
        method: AlertMethod,
        *,
        method_data: Optional[dict] = None,
        event_data: Optional[dict] = None,
        condition_data: Optional[dict] = None,
        filter_id: Optional[int] = None,
        comment: Optional[str] = None
    ) -> Any:
        """Create a new alert

        Arguments:
            name: Name of the new Alert
            condition: The condition that must be satisfied for the alert
                to occur; if the event is either 'Updated SecInfo arrived' or
                'New SecInfo arrived', condition must be 'Always'. Otherwise,
                condition can also be one of 'Severity at least', 'Filter count
                changed' or 'Filter count at least'.
            event: The event that must happen for the alert to occur, one
                of 'Task run status changed', 'Updated SecInfo arrived' or 'New
                SecInfo arrived'
            method: The method by which the user is alerted, one of 'SCP',
                'Send', 'SMB', 'SNMP', 'Syslog' or 'Email'; if the event is
                neither 'Updated SecInfo arrived' nor 'New SecInfo arrived',
                method can also be one of 'Start Task', 'HTTP Get', 'Sourcefire
                Connector' or 'verinice Connector'.
            condition_data: Data that defines the condition
            event_data: Data that defines the event
            method_data: Data that defines the method
            filter_id: Filter to apply when executing alert
            comment: Comment for the alert

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not name:
            raise RequiredArgument(
                function=self.create_alert.__name__, argument='name'
            )

        if not condition:
            raise RequiredArgument(
                function=self.create_alert.__name__, argument='condition'
            )

        if not event:
            raise RequiredArgument(
                function=self.create_alert.__name__, argument='event'
            )

        if not method:
            raise RequiredArgument(
                function=self.create_alert.__name__, argument='method'
            )

        if not isinstance(condition, AlertCondition):
            raise InvalidArgumentType(
                function=self.create_alert.__name__,
                argument='condition',
                arg_type=AlertCondition.__name__,
            )

        if not isinstance(event, AlertEvent):
            raise InvalidArgumentType(
                function=self.create_alert.__name__,
                argument='event',
                arg_type=AlertEvent.__name__,
            )

        if not isinstance(method, AlertMethod):
            raise InvalidArgumentType(
                function=self.create_alert.__name__,
                argument='method',
                arg_type=AlertMethod.__name__,
            )

        _check_event(event, condition, method)

        cmd = XmlCommand("create_alert")
        cmd.add_element("name", name)

        conditions = cmd.add_element("condition", condition.value)

        if condition_data is not None:
            for key, value in condition_data.items():
                _data = conditions.add_element("data", value)
                _data.add_element("name", key)

        events = cmd.add_element("event", event.value)

        if event_data is not None:
            for key, value in event_data.items():
                _data = events.add_element("data", value)
                _data.add_element("name", key)

        methods = cmd.add_element("method", method.value)

        if method_data is not None:
            for key, value in method_data.items():
                _data = methods.add_element("data", value)
                _data.add_element("name", key)

        if filter_id:
            cmd.add_element("filter", attrs={"id": filter_id})

        if comment:
            cmd.add_element("comment", comment)

        return self._send_xml_command(cmd)

    def create_audit(
        self,
        name: str,
        policy_id: str,
        target_id: str,
        scanner_id: str,
        *,
        alterable: Optional[bool] = None,
        hosts_ordering: Optional[HostsOrdering] = None,
        schedule_id: Optional[str] = None,
        alert_ids: Optional[List[str]] = None,
        comment: Optional[str] = None,
        schedule_periods: Optional[int] = None,
        observers: Optional[List[str]] = None,
        preferences: Optional[dict] = None
    ) -> Any:
        """Create a new audit task

        Arguments:
            name: Name of the new audit
            policy_id: UUID of policy to use by the audit
            target_id: UUID of target to be scanned
            scanner_id: UUID of scanner to use for scanning the target
            comment: Comment for the audit
            alterable: Whether the task should be alterable
            alert_ids: List of UUIDs for alerts to be applied to the audit
            hosts_ordering: The order hosts are scanned in
            schedule_id: UUID of a schedule when the audit should be run.
            schedule_periods: A limit to the number of times the audit will be
                scheduled, or 0 for no limit
            observers: List of names or ids of users which should be allowed to
                observe this audit
            preferences: Name/Value pairs of scanner preferences.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """

        return self.__create_task(
            name=name,
            config_id=policy_id,
            target_id=target_id,
            scanner_id=scanner_id,
            usage_type=UsageType.AUDIT,
            function=self.create_audit.__name__,
            alterable=alterable,
            hosts_ordering=hosts_ordering,
            schedule_id=schedule_id,
            alert_ids=alert_ids,
            comment=comment,
            schedule_periods=schedule_periods,
            observers=observers,
            preferences=preferences,
        )

    def create_config(self, config_id: str, name: str) -> Any:
        """Create a new scan config

        Arguments:
            config_id: UUID of the existing scan config
            name: Name of the new scan config

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        return self.__create_config(
            config_id=config_id,
            name=name,
            usage_type=UsageType.SCAN,
            function=self.create_config.__name__,
        )

    def create_policy(
        self, name: str, *, policy_id: Optional[str] = None
    ) -> Any:
        """Create a new policy config

        Arguments:
            name: Name of the new policy
            policy_id: UUID of an existing policy as base. By default the empty
                policy is used.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if policy_id is None:
            policy_id = _EMPTY_POLICY_ID
        return self.__create_config(
            config_id=policy_id,
            name=name,
            usage_type=UsageType.POLICY,
            function=self.create_policy.__name__,
        )

    def create_task(
        self,
        name: str,
        config_id: str,
        target_id: str,
        scanner_id: str,
        *,
        alterable: Optional[bool] = None,
        hosts_ordering: Optional[HostsOrdering] = None,
        schedule_id: Optional[str] = None,
        alert_ids: Optional[List[str]] = None,
        comment: Optional[str] = None,
        schedule_periods: Optional[int] = None,
        observers: Optional[List[str]] = None,
        preferences: Optional[dict] = None
    ) -> Any:
        """Create a new scan task

        Arguments:
            name: Name of the task
            config_id: UUID of scan config to use by the task
            target_id: UUID of target to be scanned
            scanner_id: UUID of scanner to use for scanning the target
            comment: Comment for the task
            alterable: Whether the task should be alterable
            alert_ids: List of UUIDs for alerts to be applied to the task
            hosts_ordering: The order hosts are scanned in
            schedule_id: UUID of a schedule when the task should be run.
            schedule_periods: A limit to the number of times the task will be
                scheduled, or 0 for no limit
            observers: List of names or ids of users which should be allowed to
                observe this task
            preferences: Name/Value pairs of scanner preferences.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        return self.__create_task(
            name=name,
            config_id=config_id,
            target_id=target_id,
            scanner_id=scanner_id,
            usage_type=UsageType.SCAN,
            function=self.create_task.__name__,
            alterable=alterable,
            hosts_ordering=hosts_ordering,
            schedule_id=schedule_id,
            alert_ids=alert_ids,
            comment=comment,
            schedule_periods=schedule_periods,
            observers=observers,
            preferences=preferences,
        )

    def create_tls_certificate(
        self,
        name: str,
        certificate: str,
        *,
        comment: Optional[str] = None,
        trust: Optional[bool] = None
    ) -> Any:
        """Create a new TLS certificate

        Arguments:
            name: Name of the TLS certificate, defaulting to the MD5
                fingerprint.
            certificate: The Base64 encoded certificate data (x.509 DER or PEM).
            comment: Comment for the TLS certificate.
            trust: Whether the certificate is trusted.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not name:
            raise RequiredArgument(
                function=self.create_tls_certificate.__name__, argument='name'
            )
        if not certificate:
            raise RequiredArgument(
                function=self.create_tls_certificate.__name__,
                argument='certificate',
            )

        cmd = XmlCommand("create_tls_certificate")

        if comment:
            cmd.add_element("comment", comment)

        cmd.add_element("name", name)
        cmd.add_element("certificate", certificate)

        if trust:
            cmd.add_element("trust", _to_bool(trust))

        return self._send_xml_command(cmd)

    def get_tls_certificates(
        self,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        include_certificate_data: Optional[bool] = None
    ) -> Any:
        """Request a list of TLS certificates

        Arguments:
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            include_certificate_data: Whether to include the certificate data in
                the response

        Returns:
            The response. See :py:meth:`send_command` for details.
        """

        cmd = XmlCommand("get_tls_certificates")

        _add_filter(cmd, filter, filter_id)

        if include_certificate_data is not None:
            cmd.set_attribute(
                "include_certificate_data", _to_bool(include_certificate_data)
            )

        return self._send_xml_command(cmd)

    def get_tls_certificate(self, tls_certificate_id: str) -> Any:
        """Request a single TLS certificate

        Arguments:
            tls_certificate_id: UUID of an existing TLS certificate

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not tls_certificate_id:
            raise RequiredArgument(
                function=self.get_tls_certificate.__name__,
                argument='tls_certificate_id',
            )

        cmd = XmlCommand("get_tls_certificates")
        cmd.set_attribute("tls_certificate_id", tls_certificate_id)

        # for single tls certificate always request cert data
        cmd.set_attribute("include_certificate_data", "1")
        return self._send_xml_command(cmd)

    def modify_alert(
        self,
        alert_id: str,
        *,
        name: Optional[str] = None,
        comment: Optional[str] = None,
        filter_id: Optional[str] = None,
        event: Optional[AlertEvent] = None,
        event_data: Optional[dict] = None,
        condition: Optional[AlertCondition] = None,
        condition_data: Optional[dict] = None,
        method: Optional[AlertMethod] = None,
        method_data: Optional[dict] = None
    ) -> Any:
        """Modifies an existing alert.

        Arguments:
            alert_id: UUID of the alert to be modified.
            name: Name of the Alert.
            condition: The condition that must be satisfied for the alert to
                occur. If the event is either 'Updated SecInfo
                arrived' or 'New SecInfo arrived', condition must be 'Always'.
                Otherwise, condition can also be one of 'Severity at least',
                'Filter count changed' or 'Filter count at least'.
            condition_data: Data that defines the condition
            event: The event that must happen for the alert to occur, one of
                'Task run status changed', 'Updated SecInfo arrived' or
                'New SecInfo arrived'
            event_data: Data that defines the event
            method: The method by which the user is alerted, one of 'SCP',
                'Send', 'SMB', 'SNMP', 'Syslog' or 'Email';
                if the event is neither 'Updated SecInfo arrived' nor
                'New SecInfo arrived', method can also be one of 'Start Task',
                'HTTP Get', 'Sourcefire Connector' or 'verinice Connector'.
            method_data: Data that defines the method
            filter_id: Filter to apply when executing alert
            comment: Comment for the alert

        Returns:
            The response. See :py:meth:`send_command` for details.
        """

        if not alert_id:
            raise RequiredArgument(
                function=self.modify_alert.__name__, argument='alert_id'
            )

        cmd = XmlCommand("modify_alert")
        cmd.set_attribute("alert_id", str(alert_id))

        if name:
            cmd.add_element("name", name)

        if comment:
            cmd.add_element("comment", comment)

        if filter_id:
            cmd.add_element("filter", attrs={"id": filter_id})

        if condition:
            if not isinstance(condition, AlertCondition):
                raise InvalidArgumentType(
                    function=self.modify_alert.__name__,
                    argument='condition',
                    arg_type=AlertCondition.__name__,
                )

            conditions = cmd.add_element("condition", condition.value)

            if condition_data is not None:
                for key, value in condition_data.items():
                    _data = conditions.add_element("data", value)
                    _data.add_element("name", key)

        if method:
            if not isinstance(method, AlertMethod):
                raise InvalidArgumentType(
                    function=self.modify_alert.__name__,
                    argument='method',
                    arg_type=AlertMethod.__name__,
                )

            methods = cmd.add_element("method", method.value)

            if method_data is not None:
                for key, value in method_data.items():
                    _data = methods.add_element("data", value)
                    _data.add_element("name", key)

        if event:
            if not isinstance(event, AlertEvent):
                raise InvalidArgumentType(
                    function=self.modify_alert.__name__,
                    argument='event',
                    arg_type=AlertEvent.__name__,
                )

            _check_event(event, condition, method)

            events = cmd.add_element("event", event.value)

            if event_data is not None:
                for key, value in event_data.items():
                    _data = events.add_element("data", value)
                    _data.add_element("name", key)

        return self._send_xml_command(cmd)

    def modify_tls_certificate(
        self,
        tls_certificate_id: str,
        *,
        name: Optional[str] = None,
        comment: Optional[str] = None,
        trust: Optional[bool] = None
    ) -> Any:
        """Modifies an existing TLS certificate.

        Arguments:
            tls_certificate_id: UUID of the TLS certificate to be modified.
            name: Name of the TLS certificate, defaulting to the MD5 fingerprint
            comment: Comment for the TLS certificate.
            trust: Whether the certificate is trusted.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not tls_certificate_id:
            raise RequiredArgument(
                function=self.modify_tls_certificate.__name__,
                argument='tls_certificate_id',
            )

        cmd = XmlCommand("modify_tls_certificate")
        cmd.set_attribute("tls_certificate_id", str(tls_certificate_id))

        if comment:
            cmd.add_element("comment", comment)

        if name:
            cmd.add_element("name", name)

        if trust:
            cmd.add_element("trust", _to_bool(trust))

        return self._send_xml_command(cmd)

    def clone_tls_certificate(self, tls_certificate_id: str) -> Any:
        """Clones an existing TLS certificate.

        Arguments:
            tls_certificate_id: The UUID of an existing TLS certificate

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not tls_certificate_id:
            raise RequiredArgument(
                function=self.clone_tls_certificate.__name__,
                argument='tls_certificate_id',
            )

        cmd = XmlCommand("create_tls_certificate")

        cmd.add_element("copy", tls_certificate_id)

        return self._send_xml_command(cmd)

    def get_configs(
        self,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        details: Optional[bool] = None,
        families: Optional[bool] = None,
        preferences: Optional[bool] = None,
        tasks: Optional[bool] = None
    ) -> Any:
        """Request a list of scan configs

        Arguments:
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            trash: Whether to get the trashcan scan configs instead
            details: Whether to get config families, preferences, nvt selectors
                and tasks.
            families: Whether to include the families if no details are
                requested
            preferences: Whether to include the preferences if no details are
                requested
            tasks: Whether to get tasks using this config

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        return self.__get_configs(
            UsageType.SCAN,
            filter=filter,
            filter_id=filter_id,
            trash=trash,
            details=details,
            families=families,
            preferences=preferences,
            tasks=tasks,
        )

    def get_policies(
        self,
        *,
        audits: Optional[bool] = None,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        details: Optional[bool] = None,
        families: Optional[bool] = None,
        preferences: Optional[bool] = None,
        trash: Optional[bool] = None
    ) -> Any:
        """Request a list of policies

        Arguments:
            audits: Whether to get audits using the policy
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            details: Whether to get families, preferences, nvt selectors
                and tasks.
            families: Whether to include the families if no details are
                requested
            preferences: Whether to include the preferences if no details are
                requested
            trash: Whether to get the trashcan policies instead

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        return self.__get_configs(
            UsageType.POLICY,
            filter=filter,
            filter_id=filter_id,
            details=details,
            families=families,
            preferences=preferences,
            tasks=audits,
            trash=trash,
        )

    def get_config(self, config_id: str) -> Any:
        """Request a single scan config

        Arguments:
            config_id: UUID of an existing scan config

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        return self.__get_config(config_id, UsageType.SCAN)

    def get_policy(self, policy_id: str) -> Any:
        """Request a single policy

        Arguments:
            policy_id: UUID of an existing policy

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        return self.__get_config(policy_id, UsageType.POLICY)

767
    def get_tasks(
768
        self,
769
        *,
770
        filter: Optional[str] = None,
771
        filter_id: Optional[str] = None,
772
        trash: Optional[bool] = None,
773
        details: Optional[bool] = None,
774
        schedules_only: Optional[bool] = None
775
    ) -> Any:
776
        """Request a list of tasks
777
778
        Arguments:
779
            filter: Filter term to use for the query
780
            filter_id: UUID of an existing filter to use for the query
781
            trash: Whether to get the trashcan tasks instead
782
            details: Whether to include full task details
783
            schedules_only: Whether to only include id, name and schedule
784
                details
785
786
        Returns:
787
            The response. See :py:meth:`send_command` for details.
788
        """
789
        return self.__get_tasks(
790
            UsageType.SCAN,
791
            filter=filter,
792
            filter_id=filter_id,
793
            trash=trash,
794
            details=details,
795
            schedules_only=schedules_only,
796
        )

    def get_audits(
        self,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        details: Optional[bool] = None,
        schedules_only: Optional[bool] = None
    ) -> Any:
        """Request a list of audits

        Arguments:
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            trash: Whether to get the trashcan audits instead
            details: Whether to include full audit details
            schedules_only: Whether to only include id, name and schedule
                details

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        return self.__get_tasks(
            UsageType.AUDIT,
            filter=filter,
            filter_id=filter_id,
            trash=trash,
            details=details,
            schedules_only=schedules_only,
        )

    def get_task(self, task_id: str) -> Any:
        """Request a single task

        Arguments:
            task_id: UUID of an existing task

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        return self.__get_task(task_id, UsageType.SCAN)

    def get_audit(self, audit_id: str) -> Any:
        """Request a single audit

        Arguments:
            audit_id: UUID of an existing audit

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        return self.__get_task(audit_id, UsageType.AUDIT)

    def clone_audit(self, audit_id: str) -> Any:
        """Clone an existing audit

        Arguments:
            audit_id: UUID of existing audit to clone from

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not audit_id:
            raise RequiredArgument(
                function=self.clone_audit.__name__, argument='audit_id'
            )

        cmd = XmlCommand("create_task")
        cmd.add_element("copy", audit_id)
        return self._send_xml_command(cmd)

    def clone_policy(self, policy_id: str) -> Any:
        """Clone a policy from an existing one

        Arguments:
            policy_id: UUID of the existing policy

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not policy_id:
            raise RequiredArgument(
                function=self.clone_policy.__name__, argument='policy_id'
            )

        cmd = XmlCommand("create_config")
        cmd.add_element("copy", policy_id)
        return self._send_xml_command(cmd)

    def delete_audit(
        self, audit_id: str, *, ultimate: Optional[bool] = False
    ) -> Any:
        """Delete an existing audit

        Arguments:
            audit_id: UUID of the audit to be deleted.
            ultimate: Whether to remove the audit entirely instead of moving
                it to the trashcan.
        """
        if not audit_id:
            raise RequiredArgument(
                function=self.delete_audit.__name__, argument='audit_id'
            )

        cmd = XmlCommand("delete_task")
        cmd.set_attribute("task_id", audit_id)
        cmd.set_attribute("ultimate", _to_bool(ultimate))

        return self._send_xml_command(cmd)

    def delete_policy(
        self, policy_id: str, *, ultimate: Optional[bool] = False
    ) -> Any:
        """Delete an existing policy

        Arguments:
            policy_id: UUID of the policy to be deleted.
            ultimate: Whether to remove the policy entirely instead of moving
                it to the trashcan.
        """
        if not policy_id:
            raise RequiredArgument(
                function=self.delete_policy.__name__, argument='policy_id'
            )

        cmd = XmlCommand("delete_config")
        cmd.set_attribute("config_id", policy_id)
        cmd.set_attribute("ultimate", _to_bool(ultimate))

        return self._send_xml_command(cmd)

    def __create_task(
        self,
        name: str,
        config_id: str,
        target_id: str,
        scanner_id: str,
        usage_type: UsageType,
        function: str,
        *,
        alterable: Optional[bool] = None,
        hosts_ordering: Optional[HostsOrdering] = None,
        schedule_id: Optional[str] = None,
        alert_ids: Optional[List[str]] = None,
        comment: Optional[str] = None,
        schedule_periods: Optional[int] = None,
        observers: Optional[List[str]] = None,
        preferences: Optional[dict] = None
    ) -> Any:
        # Shared helper that builds a create_task command for the given
        # usage type. ``function`` is the name reported in argument errors.
        if not name:
            raise RequiredArgument(function=function, argument='name')

        if not config_id:
            raise RequiredArgument(function=function, argument='config_id')

        if not target_id:
            raise RequiredArgument(function=function, argument='target_id')

        if not scanner_id:
            raise RequiredArgument(function=function, argument='scanner_id')

        # creating a container task (target_id '0') is not allowed here
        if target_id == '0':
            raise InvalidArgument(function=function, argument='target_id')

        cmd = XmlCommand("create_task")
        cmd.add_element("name", name)
        cmd.add_element("usage_type", usage_type.value)
        cmd.add_element("config", attrs={"id": config_id})
        cmd.add_element("target", attrs={"id": target_id})
        cmd.add_element("scanner", attrs={"id": scanner_id})

        if comment:
            cmd.add_element("comment", comment)

        if alterable is not None:
            cmd.add_element("alterable", _to_bool(alterable))

        if hosts_ordering:
            if not isinstance(hosts_ordering, self.types.HostsOrdering):
                raise InvalidArgumentType(
                    function=function,
                    argument='hosts_ordering',
                    arg_type=HostsOrdering.__name__,
                )
            cmd.add_element("hosts_ordering", hosts_ordering.value)

        if alert_ids:
            if isinstance(alert_ids, str):
                deprecation(
                    "Please pass a list as alert_ids parameter to {}. "
                    "Passing a string is deprecated and will be removed in "
                    "future.".format(function)
                )

                # if a single id is given as a string, wrap it in a list
                alert_ids = [alert_ids]
            if _is_list_like(alert_ids):
                # add one alert element per given alert id
                for alert in alert_ids:
                    cmd.add_element("alert", attrs={"id": str(alert)})

        if schedule_id:
            cmd.add_element("schedule", attrs={"id": schedule_id})

            if schedule_periods is not None:
                if (
                    not isinstance(schedule_periods, numbers.Integral)
                    or schedule_periods < 0
                ):
                    raise InvalidArgument(
                        "schedule_periods must be an integer greater than or "
                        "equal to 0"
                    )
                cmd.add_element("schedule_periods", str(schedule_periods))

        if observers is not None:
            if not _is_list_like(observers):
                raise InvalidArgumentType(
                    function=function, argument='observers', arg_type='list'
                )

            # gvmd splits by comma and space
            # gvmd tries to lookup each value as user name and afterwards as
            # user id. So both user name and user id are possible
            cmd.add_element("observers", _to_comma_list(observers))

        if preferences is not None:
            if not isinstance(preferences, collections.abc.Mapping):
                raise InvalidArgumentType(
                    function=function,
                    argument='preferences',
                    arg_type=collections.abc.Mapping.__name__,
                )

            _xmlprefs = cmd.add_element("preferences")
            for pref_name, pref_value in preferences.items():
                _xmlpref = _xmlprefs.add_element("preference")
                _xmlpref.add_element("scanner_name", pref_name)
                _xmlpref.add_element("value", str(pref_value))

        return self._send_xml_command(cmd)

    def __create_config(
        self, config_id: str, name: str, usage_type: UsageType, function: str
    ) -> Any:
        # Shared helper: create a config as a named copy of config_id
        if not name:
            raise RequiredArgument(function=function, argument='name')

        if not config_id:
            raise RequiredArgument(function=function, argument='config_id')

        cmd = XmlCommand("create_config")
        cmd.add_element("copy", config_id)
        cmd.add_element("name", name)
        cmd.add_element("usage_type", usage_type.value)
        return self._send_xml_command(cmd)

    def __get_configs(
        self,
        usage_type: UsageType,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        details: Optional[bool] = None,
        families: Optional[bool] = None,
        preferences: Optional[bool] = None,
        tasks: Optional[bool] = None
    ) -> Any:
        cmd = XmlCommand("get_configs")
        cmd.set_attribute("usage_type", usage_type.value)

        _add_filter(cmd, filter, filter_id)

        if trash is not None:
            cmd.set_attribute("trash", _to_bool(trash))

        if details is not None:
            cmd.set_attribute("details", _to_bool(details))

        if families is not None:
            cmd.set_attribute("families", _to_bool(families))

        if preferences is not None:
            cmd.set_attribute("preferences", _to_bool(preferences))

        if tasks is not None:
            cmd.set_attribute("tasks", _to_bool(tasks))

        return self._send_xml_command(cmd)

    def __get_config(self, config_id: str, usage_type: UsageType) -> Any:
        if not config_id:
            # note: the error always names get_config and config_id, even
            # when this helper is called from get_policy
            raise RequiredArgument(
                function=self.get_config.__name__, argument='config_id'
            )

        cmd = XmlCommand("get_configs")
        cmd.set_attribute("config_id", config_id)
        cmd.set_attribute("usage_type", usage_type.value)

        # for single entity always request all details
        cmd.set_attribute("details", "1")

        return self._send_xml_command(cmd)

    def __get_tasks(
        self,
        usage_type: UsageType,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        details: Optional[bool] = None,
        schedules_only: Optional[bool] = None
    ) -> Any:
        cmd = XmlCommand("get_tasks")
        cmd.set_attribute("usage_type", usage_type.value)

        _add_filter(cmd, filter, filter_id)

        if trash is not None:
            cmd.set_attribute("trash", _to_bool(trash))

        if details is not None:
            cmd.set_attribute("details", _to_bool(details))

        if schedules_only is not None:
            cmd.set_attribute("schedules_only", _to_bool(schedules_only))

        return self._send_xml_command(cmd)

    def __get_task(self, task_id: str, usage_type: UsageType) -> Any:
        if not task_id:
            # note: the error always names get_task and task_id, even when
            # this helper is called from get_audit
            raise RequiredArgument(
                function=self.get_task.__name__, argument='task_id'
            )

        cmd = XmlCommand("get_tasks")
        cmd.set_attribute("task_id", task_id)
        cmd.set_attribute("usage_type", usage_type.value)

        # for single entity always request all details
        cmd.set_attribute("details", "1")
        return self._send_xml_command(cmd)