Passed
Pull Request — master (#297)
by Jaspar
01:19
created

gvm.protocols.gmpv9.gmpv9._check_event()   F

Complexity

Conditions 16

Size

Total Lines 87
Code Lines 62

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 16
eloc 62
nop 3
dl 0
loc 87
rs 2.4
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex units like gvm.protocols.gmpv9.gmpv9._check_event() often do a lot of different things. Note that _check_event() is a function, not a class, so the first step is to identify cohesive parts within it, for example branches that repeat the same checks. For classes, a common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 - 2019 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
# pylint: disable=arguments-differ, redefined-builtin, too-many-lines
20
21
"""
22
Module for communication with gvmd in `Greenbone Management Protocol version 9`_
23
24
.. _Greenbone Management Protocol version 9:
25
    https://docs.greenbone.net/API/GMP/gmp-9.0.html
26
"""
27
import collections
28
import numbers
29
30
from typing import Any, List, Optional, Callable
31
32
from gvm.errors import InvalidArgument, InvalidArgumentType, RequiredArgument
33
from gvm.utils import deprecation
34
from gvm.xml import XmlCommand
35
36
from gvm.protocols.base import GvmProtocol
37
from gvm.connections import GvmConnection
38
from gvm.protocols.gmpv7.gmpv7 import (
39
    _to_bool,
40
    _add_filter,
41
    _is_list_like,
42
    _to_comma_list,
43
)
44
45
from . import types
46
from .types import *  # pylint: disable=unused-wildcard-import, wildcard-import
47
from .types import _UsageType as UsageType
48
49
_EMPTY_POLICY_ID = '085569ce-73ed-11df-83c3-002264764cea'
50
51
52
def _check_event(
    event: AlertEvent, condition: AlertCondition, method: AlertMethod
):
    """Validate the combination of event, condition and method of an alert

    Nothing is checked if event is None. For every known event a condition
    and a method are required. The condition must be one of the conditions
    allowed for the event and, for all events except
    TASK_RUN_STATUS_CHANGED, the method must be one of the methods allowed
    for the event.

    Arguments:
        event: The event of the alert or None to skip all checks
        condition: The condition to be used together with the event
        method: The method to be used together with the event

    Raises:
        RequiredArgument: If condition or method is missing for a known
            event
        InvalidArgument: If the event is unknown or if condition or method
            is not allowed for the event
    """
    if event is None:
        return

    # Select the rules for the event first. A value of None for
    # allowed_methods means that the method is not restricted.
    if event == AlertEvent.TASK_RUN_STATUS_CHANGED:
        allowed_conditions = (
            AlertCondition.ALWAYS,
            AlertCondition.FILTER_COUNT_CHANGED,
            AlertCondition.FILTER_COUNT_AT_LEAST,
            AlertCondition.SEVERITY_AT_LEAST,
            AlertCondition.SEVERITY_CHANGED,
        )
        allowed_methods = None
    elif event in (
        AlertEvent.NEW_SECINFO_ARRIVED,
        AlertEvent.UPDATED_SECINFO_ARRIVED,
    ):
        allowed_conditions = (AlertCondition.ALWAYS,)
        allowed_methods = (
            AlertMethod.SCP,
            AlertMethod.SEND,
            AlertMethod.SMB,
            AlertMethod.SNMP,
            AlertMethod.SYSLOG,
            AlertMethod.EMAIL,
        )
    elif event in (
        AlertEvent.TICKET_RECEIVED,
        AlertEvent.OWNED_TICKET_CHANGED,
        AlertEvent.ASSIGNED_TICKET_CHANGED,
    ):
        allowed_conditions = (AlertCondition.ALWAYS,)
        allowed_methods = (
            AlertMethod.EMAIL,
            AlertMethod.START_TASK,
            AlertMethod.SYSLOG,
        )
    else:
        raise InvalidArgument('Invalid event "{}"'.format(event.name))

    # The order of the checks matters for callers: missing arguments are
    # reported before invalid ones and the condition before the method.
    if not condition:
        raise RequiredArgument(
            "condition is required for event {}".format(event.name)
        )

    if not method:
        raise RequiredArgument(
            "method is required for event {}".format(event.name)
        )

    if condition not in allowed_conditions:
        raise InvalidArgument(
            "Invalid condition {} for event {}".format(
                condition.name, event.name
            )
        )

    if allowed_methods is not None and method not in allowed_methods:
        raise InvalidArgument(
            "Invalid method {} for event {}".format(method.name, event.name)
        )
139
140
141
class GmpV9Mixin(GvmProtocol):
142
143
    types = types
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable types does not seem to be defined.
Loading history...
144
145
    def __init__(
        self,
        connection: GvmConnection,
        *,
        transform: Optional[Callable[[str], Any]] = None
    ):
        """Create a new protocol instance

        Arguments:
            connection: Connection object handed over to the base class
            transform: Optional callable handed over to the base class
        """
        super().__init__(connection, transform=transform)

        # A new instance always starts as not authenticated on gvmd
        self._authenticated = False
155
156 View Code Duplication
    def create_alert(
        self,
        name: str,
        condition: AlertCondition,
        event: AlertEvent,
        method: AlertMethod,
        *,
        method_data: Optional[dict] = None,
        event_data: Optional[dict] = None,
        condition_data: Optional[dict] = None,
        filter_id: Optional[str] = None,
        comment: Optional[str] = None
    ) -> Any:
        """Create a new alert

        Arguments:
            name: Name of the new Alert
            condition: The condition that must be satisfied for the alert
                to occur; if the event is either 'Updated SecInfo arrived' or
                'New SecInfo arrived', condition must be 'Always'. Otherwise,
                condition can also be one of 'Severity at least', 'Filter
                count changed' or 'Filter count at least'.
            event: The event that must happen for the alert to occur, one
                of 'Task run status changed', 'Updated SecInfo arrived' or 'New
                SecInfo arrived'
            method: The method by which the user is alerted, one of 'SCP',
                'Send', 'SMB', 'SNMP', 'Syslog' or 'Email'; if the event is
                neither 'Updated SecInfo arrived' nor 'New SecInfo arrived',
                method can also be one of 'Start Task', 'HTTP Get', 'Sourcefire
                Connector' or 'verinice Connector'.
            condition_data: Data that defines the condition
            event_data: Data that defines the event
            method_data: Data that defines the method
            filter_id: Filter to apply when executing alert
            comment: Comment for the alert

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If name, condition, event or method is missing
            InvalidArgumentType: If condition, event or method is not a
                member of the corresponding enum class
            InvalidArgument: If the combination of event, condition and
                method is rejected by the event check
        """
        function = self.create_alert.__name__

        # Missing arguments are reported in the order of the signature
        required = (
            ('name', name),
            ('condition', condition),
            ('event', event),
            ('method', method),
        )
        for argument, value in required:
            if not value:
                raise RequiredArgument(function=function, argument=argument)

        expected_types = (
            ('condition', condition, AlertCondition),
            ('event', event, AlertEvent),
            ('method', method, AlertMethod),
        )
        for argument, value, enum_type in expected_types:
            if not isinstance(value, enum_type):
                raise InvalidArgumentType(
                    function=function,
                    argument=argument,
                    arg_type=enum_type.__name__,
                )

        _check_event(event, condition, method)

        cmd = XmlCommand("create_alert")
        cmd.add_element("name", name)

        # The elements are added as condition, event, method. Each one gets
        # its optional data entries as <data> children carrying a <name>.
        sections = (
            ("condition", condition, condition_data),
            ("event", event, event_data),
            ("method", method, method_data),
        )
        for tag, member, data in sections:
            element = cmd.add_element(tag, member.value)

            if data is not None:
                for key, value in data.items():
                    _data = element.add_element("data", value)
                    _data.add_element("name", key)

        if filter_id:
            cmd.add_element("filter", attrs={"id": filter_id})

        if comment:
            cmd.add_element("comment", comment)

        return self._send_xml_command(cmd)
269
270 View Code Duplication
    def create_audit(
        self,
        name: str,
        policy_id: str,
        target_id: str,
        scanner_id: str,
        *,
        alterable: Optional[bool] = None,
        hosts_ordering: Optional[HostsOrdering] = None,
        schedule_id: Optional[str] = None,
        alert_ids: Optional[List[str]] = None,
        comment: Optional[str] = None,
        schedule_periods: Optional[int] = None,
        observers: Optional[List[str]] = None,
        preferences: Optional[dict] = None
    ) -> Any:
        """Create a new audit task

        The audit is created as a task with the usage type AUDIT. The policy
        is passed on as the config of the task.

        Arguments:
            name: Name of the new audit
            policy_id: UUID of policy to use by the audit
            target_id: UUID of target to be scanned
            scanner_id: UUID of scanner to use for scanning the target
            comment: Comment for the audit
            alterable: Whether the task should be alterable
            alert_ids: List of UUIDs for alerts to be applied to the audit
            hosts_ordering: The order hosts are scanned in
            schedule_id: UUID of a schedule when the audit should be run.
            schedule_periods: A limit to the number of times the audit will be
                scheduled, or 0 for no limit
            observers: List of names or ids of users which should be allowed to
                observe this audit
            preferences: Name/Value pairs of scanner preferences.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        # Optional settings are forwarded unchanged
        optional_settings = {
            'alterable': alterable,
            'hosts_ordering': hosts_ordering,
            'schedule_id': schedule_id,
            'alert_ids': alert_ids,
            'comment': comment,
            'schedule_periods': schedule_periods,
            'observers': observers,
            'preferences': preferences,
        }

        return self.__create_task(
            name=name,
            config_id=policy_id,
            target_id=target_id,
            scanner_id=scanner_id,
            usage_type=UsageType.AUDIT,
            function=self.create_audit.__name__,
            **optional_settings
        )
324
325
    def create_config(
        self, config_id: str, name: str, *, comment: Optional[str] = None
    ) -> Any:
        """Create a new scan config

        The config is created with the usage type SCAN.

        Arguments:
            config_id: UUID of the existing scan config
            name: Name of the new scan config
            comment: A comment on the config

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        function_name = self.create_config.__name__

        return self.__create_config(
            config_id=config_id,
            name=name,
            comment=comment,
            usage_type=UsageType.SCAN,
            function=function_name,
        )
345
346
    def create_policy(
        self,
        name: str,
        *,
        policy_id: Optional[str] = None,
        comment: Optional[str] = None
    ) -> Any:
        """Create a new policy config

        The policy is created as a config with the usage type POLICY.

        Arguments:
            name: Name of the new policy
            policy_id: UUID of an existing policy as base. By default the empty
                policy is used.
            comment: A comment on the policy

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        # Only None selects the empty policy, any other value is passed on
        base_policy_id = _EMPTY_POLICY_ID if policy_id is None else policy_id

        return self.__create_config(
            config_id=base_policy_id,
            name=name,
            comment=comment,
            usage_type=UsageType.POLICY,
            function=self.create_policy.__name__,
        )
369
370 View Code Duplication
    def create_task(
        self,
        name: str,
        config_id: str,
        target_id: str,
        scanner_id: str,
        *,
        alterable: Optional[bool] = None,
        hosts_ordering: Optional[HostsOrdering] = None,
        schedule_id: Optional[str] = None,
        alert_ids: Optional[List[str]] = None,
        comment: Optional[str] = None,
        schedule_periods: Optional[int] = None,
        observers: Optional[List[str]] = None,
        preferences: Optional[dict] = None
    ) -> Any:
        """Create a new scan task

        The task is created with the usage type SCAN.

        Arguments:
            name: Name of the task
            config_id: UUID of scan config to use by the task
            target_id: UUID of target to be scanned
            scanner_id: UUID of scanner to use for scanning the target
            comment: Comment for the task
            alterable: Whether the task should be alterable
            alert_ids: List of UUIDs for alerts to be applied to the task
            hosts_ordering: The order hosts are scanned in
            schedule_id: UUID of a schedule when the task should be run.
            schedule_periods: A limit to the number of times the task will be
                scheduled, or 0 for no limit
            observers: List of names or ids of users which should be allowed to
                observe this task
            preferences: Name/Value pairs of scanner preferences.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        # Optional settings are forwarded unchanged
        optional_settings = {
            'alterable': alterable,
            'hosts_ordering': hosts_ordering,
            'schedule_id': schedule_id,
            'alert_ids': alert_ids,
            'comment': comment,
            'schedule_periods': schedule_periods,
            'observers': observers,
            'preferences': preferences,
        }

        return self.__create_task(
            name=name,
            config_id=config_id,
            target_id=target_id,
            scanner_id=scanner_id,
            usage_type=UsageType.SCAN,
            function=self.create_task.__name__,
            **optional_settings
        )
423
424 View Code Duplication
    def create_tls_certificate(
        self,
        name: str,
        certificate: str,
        *,
        comment: Optional[str] = None,
        trust: Optional[bool] = None
    ) -> Any:
        """Create a new TLS certificate

        Arguments:
            name: Name of the TLS certificate, defaulting to the MD5
                fingerprint.
            certificate: The Base64 encoded certificate data (x.509 DER or PEM).
            comment: Comment for the TLS certificate.
            trust: Whether the certificate is trusted.

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If name or certificate is missing
        """
        # A missing name is reported before a missing certificate
        for argument, value in (('name', name), ('certificate', certificate)):
            if not value:
                raise RequiredArgument(
                    function=self.create_tls_certificate.__name__,
                    argument=argument,
                )

        cmd = XmlCommand("create_tls_certificate")

        if comment:
            cmd.add_element("comment", comment)

        for tag, text in (("name", name), ("certificate", certificate)):
            cmd.add_element(tag, text)

        # The trust element is only added for a truthy value
        if trust:
            cmd.add_element("trust", _to_bool(trust))

        return self._send_xml_command(cmd)
466
467 View Code Duplication
    def get_tls_certificates(
        self,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        include_certificate_data: Optional[bool] = None,
        details: Optional[bool] = None
    ) -> Any:
        """Request a list of TLS certificates

        Arguments:
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            include_certificate_data: Whether to include the certificate data in
                the response
            details: Value for the details attribute of the request. The
                attribute is only set if a value is given.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        cmd = XmlCommand("get_tls_certificates")

        _add_filter(cmd, filter, filter_id)

        # Flags left at None are not sent, False is sent explicitly
        flags = (
            ("details", details),
            ("include_certificate_data", include_certificate_data),
        )
        for attribute, flag in flags:
            if flag is not None:
                cmd.set_attribute(attribute, _to_bool(flag))

        return self._send_xml_command(cmd)
500
501
    def get_tls_certificate(self, tls_certificate_id: str) -> Any:
        """Request a single TLS certificate

        The certificate data and all details are always requested.

        Arguments:
            tls_certificate_id: UUID of an existing TLS certificate

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If tls_certificate_id is missing
        """
        if not tls_certificate_id:
            raise RequiredArgument(
                function=self.get_tls_certificate.__name__,
                argument='tls_certificate_id',
            )

        cmd = XmlCommand("get_tls_certificates")

        attributes = (
            ("tls_certificate_id", tls_certificate_id),
            # a single certificate is always requested with its data
            ("include_certificate_data", "1"),
            # a single entity is always requested with all details
            ("details", "1"),
        )
        for attribute, value in attributes:
            cmd.set_attribute(attribute, value)

        return self._send_xml_command(cmd)
527
528 View Code Duplication
    def modify_alert(
        self,
        alert_id: str,
        *,
        name: Optional[str] = None,
        comment: Optional[str] = None,
        filter_id: Optional[str] = None,
        event: Optional[AlertEvent] = None,
        event_data: Optional[dict] = None,
        condition: Optional[AlertCondition] = None,
        condition_data: Optional[dict] = None,
        method: Optional[AlertMethod] = None,
        method_data: Optional[dict] = None
    ) -> Any:
        """Modifies an existing alert.

        Only the values passed are added to the command. If an event is
        passed, the combination of event, condition and method is checked
        before the event is added.

        Arguments:
            alert_id: UUID of the alert to be modified.
            name: Name of the Alert.
            condition: The condition that must be satisfied for the alert to
                occur. If the event is either 'Updated SecInfo
                arrived' or 'New SecInfo arrived', condition must be 'Always'.
                Otherwise, condition can also be one of 'Severity at least',
                'Filter count changed' or 'Filter count at least'.
            condition_data: Data that defines the condition
            event: The event that must happen for the alert to occur, one of
                'Task run status changed', 'Updated SecInfo arrived' or
                'New SecInfo arrived'
            event_data: Data that defines the event
            method: The method by which the user is alerted, one of 'SCP',
                'Send', 'SMB', 'SNMP', 'Syslog' or 'Email';
                if the event is neither 'Updated SecInfo arrived' nor
                'New SecInfo arrived', method can also be one of 'Start Task',
                'HTTP Get', 'Sourcefire Connector' or 'verinice Connector'.
            method_data: Data that defines the method
            filter_id: Filter to apply when executing alert
            comment: Comment for the alert

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not alert_id:
            raise RequiredArgument(
                function=self.modify_alert.__name__, argument='alert_id'
            )

        def _ensure_type(value, enum_type, argument):
            # Reject values that are not members of the expected enum class
            if not isinstance(value, enum_type):
                raise InvalidArgumentType(
                    function=self.modify_alert.__name__,
                    argument=argument,
                    arg_type=enum_type.__name__,
                )

        def _append_data(parent, data):
            # Add every entry as <data> element with a <name> child
            if data is None:
                return

            for key, value in data.items():
                entry = parent.add_element("data", value)
                entry.add_element("name", key)

        cmd = XmlCommand("modify_alert")
        cmd.set_attribute("alert_id", str(alert_id))

        for tag, text in (("name", name), ("comment", comment)):
            if text:
                cmd.add_element(tag, text)

        if filter_id:
            cmd.add_element("filter", attrs={"id": filter_id})

        if condition:
            _ensure_type(condition, AlertCondition, 'condition')
            _append_data(
                cmd.add_element("condition", condition.value), condition_data
            )

        if method:
            _ensure_type(method, AlertMethod, 'method')
            _append_data(cmd.add_element("method", method.value), method_data)

        if event:
            _ensure_type(event, AlertEvent, 'event')

            _check_event(event, condition, method)

            _append_data(cmd.add_element("event", event.value), event_data)

        return self._send_xml_command(cmd)
635
636 View Code Duplication
    def modify_tls_certificate(
        self,
        tls_certificate_id: str,
        *,
        name: Optional[str] = None,
        comment: Optional[str] = None,
        trust: Optional[bool] = None
    ) -> Any:
        """Modifies an existing TLS certificate.

        Arguments:
            tls_certificate_id: UUID of the TLS certificate to be modified.
            name: Name of the TLS certificate, defaulting to the MD5 fingerprint
            comment: Comment for the TLS certificate.
            trust: Whether the certificate is trusted. None leaves the trust
                unchanged, False is sent explicitly.

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If tls_certificate_id is missing
        """
        if not tls_certificate_id:
            raise RequiredArgument(
                function=self.modify_tls_certificate.__name__,
                argument='tls_certificate_id',
            )

        cmd = XmlCommand("modify_tls_certificate")
        cmd.set_attribute("tls_certificate_id", str(tls_certificate_id))

        if comment:
            cmd.add_element("comment", comment)

        if name:
            cmd.add_element("name", name)

        # Compare against None instead of using truthiness. Otherwise
        # trust=False would be dropped and a certificate could never be
        # set to untrusted.
        if trust is not None:
            cmd.add_element("trust", _to_bool(trust))

        return self._send_xml_command(cmd)
674
675
    def clone_tls_certificate(self, tls_certificate_id: str) -> Any:
        """Clone an existing TLS certificate.

        The clone is requested with a create_tls_certificate command that
        contains a copy element with the UUID of the existing certificate.

        Arguments:
            tls_certificate_id: The UUID of an existing TLS certificate

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If tls_certificate_id is missing
        """
        if not tls_certificate_id:
            # Report this method as the failing function, not
            # modify_tls_certificate
            raise RequiredArgument(
                function=self.clone_tls_certificate.__name__,
                argument='tls_certificate_id',
            )

        cmd = XmlCommand("create_tls_certificate")

        cmd.add_element("copy", tls_certificate_id)

        return self._send_xml_command(cmd)
695
696
    def get_configs(
        self,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        details: Optional[bool] = None,
        families: Optional[bool] = None,
        preferences: Optional[bool] = None,
        tasks: Optional[bool] = None
    ) -> Any:
        """Request a list of scan configs

        Only configs with the usage type SCAN are requested.

        Arguments:
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            trash: Whether to get the trashcan scan configs instead
            details: Whether to get config families, preferences, nvt selectors
                and tasks.
            families: Whether to include the families if no details are
                requested
            preferences: Whether to include the preferences if no details are
                requested
            tasks: Whether to get tasks using this config

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        # All options are forwarded unchanged
        query_options = {
            'filter': filter,
            'filter_id': filter_id,
            'trash': trash,
            'details': details,
            'families': families,
            'preferences': preferences,
            'tasks': tasks,
        }

        return self.__get_configs(UsageType.SCAN, **query_options)
734
735
    def get_policies(
        self,
        *,
        audits: Optional[bool] = None,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        details: Optional[bool] = None,
        families: Optional[bool] = None,
        preferences: Optional[bool] = None,
        trash: Optional[bool] = None
    ) -> Any:
        """Request a list of policies

        Only configs with the usage type POLICY are requested.

        Arguments:
            audits: Whether to get audits using the policy
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            details: Whether to get  families, preferences, nvt selectors
                and tasks.
            families: Whether to include the families if no details are
                requested
            preferences: Whether to include the preferences if no details are
                requested
            trash: Whether to get the trashcan audits instead

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        query_options = {
            'filter': filter,
            'filter_id': filter_id,
            'details': details,
            'families': families,
            'preferences': preferences,
            # the audits flag is passed on as the tasks option
            'tasks': audits,
            'trash': trash,
        }

        return self.__get_configs(UsageType.POLICY, **query_options)
773
774
    def get_config(
        self, config_id: str, *, tasks: Optional[bool] = None
    ) -> Any:
        """Request a single scan config

        The config is requested with the usage type SCAN.

        Arguments:
            config_id: UUID of an existing scan config
            tasks: Whether to get tasks using this config

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        request_args = {
            'config_id': config_id,
            'usage_type': UsageType.SCAN,
            'tasks': tasks,
        }

        return self.__get_config(**request_args)
789
790
    def get_policy(self, policy_id: str) -> Any:
        """Request a single policy

        Arguments:
            policy_id: UUID of an existing policy

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If ``policy_id`` is empty or ``None``.
        """
        # Validate here so that the error names this method and its own
        # argument; the shared helper would report get_config/config_id.
        if not policy_id:
            raise RequiredArgument(
                function=self.get_policy.__name__, argument='policy_id'
            )

        return self.__get_config(policy_id, UsageType.POLICY)
800
801
    def get_tasks(
        self,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        details: Optional[bool] = None,
        schedules_only: Optional[bool] = None
    ) -> Any:
        """Request a list of tasks

        The query is delegated to the shared task listing helper with the
        usage type fixed to ``UsageType.SCAN``.

        Arguments:
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            trash: Whether to get the trashcan tasks instead
            details: Whether to include full task details
            schedules_only: Whether to only include id, name and schedule
                details

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        query_options = {
            'filter': filter,
            'filter_id': filter_id,
            'trash': trash,
            'details': details,
            'schedules_only': schedules_only,
        }
        return self.__get_tasks(UsageType.SCAN, **query_options)
831
832
    def get_audits(
        self,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        details: Optional[bool] = None,
        schedules_only: Optional[bool] = None
    ) -> Any:
        """Request a list of audits

        Audits are tasks with the usage type ``UsageType.AUDIT``; the query
        is delegated to the shared task listing helper.

        Arguments:
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            trash: Whether to get the trashcan audits instead
            details: Whether to include full audit details
            schedules_only: Whether to only include id, name and schedule
                details

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        response = self.__get_tasks(
            UsageType.AUDIT,
            schedules_only=schedules_only,
            details=details,
            trash=trash,
            filter_id=filter_id,
            filter=filter,
        )
        return response
862
863
    def get_task(self, task_id: str) -> Any:
        """Request a single task

        The lookup is delegated to the shared single-task helper with the
        usage type fixed to ``UsageType.SCAN``.

        Arguments:
            task_id: UUID of an existing task

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        return self.__get_task(task_id=task_id, usage_type=UsageType.SCAN)
873
874
    def get_audit(self, audit_id: str) -> Any:
        """Request a single audit

        Arguments:
            audit_id: UUID of an existing audit

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If ``audit_id`` is empty or ``None``.
        """
        # Validate here so that the error names this method and its own
        # argument; the shared helper would report get_task/task_id.
        if not audit_id:
            raise RequiredArgument(
                function=self.get_audit.__name__, argument='audit_id'
            )

        return self.__get_task(audit_id, UsageType.AUDIT)
884
885
    def clone_audit(self, audit_id: str) -> Any:
        """Clone an existing audit

        Sends a ``create_task`` command whose ``copy`` element references
        the audit to duplicate.

        Arguments:
            audit_id: UUID of existing audit to clone from

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If ``audit_id`` is empty or ``None``.
        """
        if audit_id:
            clone_cmd = XmlCommand("create_task")
            clone_cmd.add_element("copy", audit_id)
            return self._send_xml_command(clone_cmd)

        raise RequiredArgument(
            function=self.clone_audit.__name__, argument='audit_id'
        )
902
903
    def clone_policy(self, policy_id: str) -> Any:
        """Clone a policy from an existing one

        Sends a ``create_config`` command whose ``copy`` element references
        the policy to duplicate.

        Arguments:
            policy_id: UUID of the existing policy

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If ``policy_id`` is empty or ``None``.
        """
        id_missing = not policy_id
        if id_missing:
            raise RequiredArgument(
                function=self.clone_policy.__name__, argument='policy_id'
            )

        copy_request = XmlCommand("create_config")
        copy_request.add_element("copy", policy_id)

        return self._send_xml_command(copy_request)
920
921
    def delete_audit(
        self, audit_id: str, *, ultimate: Optional[bool] = False
    ) -> Any:
        """Deletes an existing audit

        Sends a ``delete_task`` command for the given audit.

        Arguments:
            audit_id: UUID of the audit to be deleted.
            ultimate: Whether to remove entirely, or to the trashcan.

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If ``audit_id`` is empty or ``None``.
        """
        if audit_id:
            delete_cmd = XmlCommand("delete_task")
            delete_cmd.set_attribute("task_id", audit_id)
            delete_cmd.set_attribute("ultimate", _to_bool(ultimate))
            return self._send_xml_command(delete_cmd)

        raise RequiredArgument(
            function=self.delete_audit.__name__, argument='audit_id'
        )
940
941
    def delete_policy(
        self, policy_id: str, *, ultimate: Optional[bool] = False
    ) -> Any:
        """Deletes an existing policy

        Sends a ``delete_config`` command for the given policy.

        Arguments:
            policy_id: UUID of the policy to be deleted.
            ultimate: Whether to remove entirely, or to the trashcan.

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If ``policy_id`` is empty or ``None``.
        """
        id_missing = not policy_id
        if id_missing:
            raise RequiredArgument(
                function=self.delete_policy.__name__, argument='policy_id'
            )

        removal = XmlCommand("delete_config")
        removal.set_attribute("config_id", policy_id)
        removal.set_attribute("ultimate", _to_bool(ultimate))

        return self._send_xml_command(removal)
960
961 View Code Duplication
    def __create_task(
        self,
        name: str,
        config_id: str,
        target_id: str,
        scanner_id: str,
        usage_type: UsageType,
        function: str,
        *,
        alterable: Optional[bool] = None,
        hosts_ordering: Optional[HostsOrdering] = None,
        schedule_id: Optional[str] = None,
        alert_ids: Optional[List[str]] = None,
        comment: Optional[str] = None,
        schedule_periods: Optional[int] = None,
        observers: Optional[List[str]] = None,
        preferences: Optional[dict] = None
    ) -> Any:
        """Build and send a ``create_task`` command

        Shared implementation for creating task-like entities; the kind of
        entity is selected via ``usage_type``.

        Arguments:
            name: Name of the new entity
            config_id: UUID of the config to use
            target_id: UUID of the target to use. The value ``'0'`` is
                rejected.
            scanner_id: UUID of the scanner to use
            usage_type: Usage type written into the ``usage_type`` element
            function: Name of the calling public method, used in raised
                errors and in the deprecation message
            alterable: Value for the ``alterable`` element, sent only if not
                None
            hosts_ordering: A HostsOrdering value for the ``hosts_ordering``
                element
            schedule_id: UUID of a schedule to attach
            alert_ids: List of alert UUIDs. A single string is still
                accepted but triggers a deprecation notice.
            comment: Comment text, sent only if not empty
            schedule_periods: Non-negative integer; only evaluated when
                ``schedule_id`` is given
            observers: List of user names or ids, sent as a comma list
            preferences: Mapping of scanner preference name to value

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If name, config_id, target_id or scanner_id
                is missing.
            InvalidArgument: If target_id is ``'0'`` or schedule_periods is
                not a non-negative integer.
            InvalidArgumentType: If hosts_ordering, observers or preferences
                has an unexpected type.
        """
        mandatory = (
            ('name', name),
            ('config_id', config_id),
            ('target_id', target_id),
            ('scanner_id', scanner_id),
        )
        for arg_name, arg_value in mandatory:
            if not arg_value:
                raise RequiredArgument(function=function, argument=arg_name)

        # don't allow to create a container task with create_task
        if target_id == '0':
            raise InvalidArgument(function=function, argument='target_id')

        task_cmd = XmlCommand("create_task")
        task_cmd.add_element("name", name)
        task_cmd.add_element("usage_type", usage_type.value)
        for tag, entity_id in (
            ("config", config_id),
            ("target", target_id),
            ("scanner", scanner_id),
        ):
            task_cmd.add_element(tag, attrs={"id": entity_id})

        if comment:
            task_cmd.add_element("comment", comment)

        if alterable is not None:
            task_cmd.add_element("alterable", _to_bool(alterable))

        if hosts_ordering:
            ordering_ok = isinstance(hosts_ordering, self.types.HostsOrdering)
            if not ordering_ok:
                raise InvalidArgumentType(
                    function=function,
                    argument='hosts_ordering',
                    arg_type=HostsOrdering.__name__,
                )
            task_cmd.add_element("hosts_ordering", hosts_ordering.value)

        if alert_ids:
            if isinstance(alert_ids, str):
                deprecation(
                    "Please pass a list as alert_ids parameter to {}. "
                    "Passing a string is deprecated and will be removed in "
                    "future.".format(function)
                )

                # if a single id is given as a string wrap it into a list
                alert_ids = [alert_ids]

            if _is_list_like(alert_ids):
                # one <alert> element per given alert id
                for alert_id in alert_ids:
                    task_cmd.add_element("alert", attrs={"id": str(alert_id)})

        if schedule_id:
            task_cmd.add_element("schedule", attrs={"id": schedule_id})

            # schedule periods are only considered together with a schedule
            if schedule_periods is not None:
                is_integral = isinstance(schedule_periods, numbers.Integral)
                if not is_integral or schedule_periods < 0:
                    raise InvalidArgument(
                        "schedule_periods must be an integer greater or equal "
                        "than 0"
                    )
                task_cmd.add_element("schedule_periods", str(schedule_periods))

        if observers is not None:
            if not _is_list_like(observers):
                raise InvalidArgumentType(
                    function=function, argument='observers', arg_type='list'
                )

            # gvmd splits by comma and space
            # gvmd tries to lookup each value as user name and afterwards as
            # user id. So both user name and user id are possible
            task_cmd.add_element("observers", _to_comma_list(observers))

        if preferences is not None:
            if not isinstance(preferences, collections.abc.Mapping):
                raise InvalidArgumentType(
                    function=function,
                    argument='preferences',
                    arg_type=collections.abc.Mapping.__name__,
                )

            prefs_element = task_cmd.add_element("preferences")
            for pref_name, pref_value in preferences.items():
                pref_element = prefs_element.add_element("preference")
                pref_element.add_element("scanner_name", pref_name)
                pref_element.add_element("value", str(pref_value))

        return self._send_xml_command(task_cmd)
1072
1073
    def __create_config(
        self,
        config_id: str,
        name: str,
        usage_type: UsageType,
        function: str,
        *,
        comment: Optional[str] = None
    ) -> Any:
        """Build and send a ``create_config`` command copying a config

        Arguments:
            config_id: UUID of the config to copy from
            name: Name of the new config
            usage_type: Usage type written into the ``usage_type`` element
            function: Name of the calling public method, used in raised
                errors
            comment: Comment text, sent only if not None

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If name or config_id is missing.
        """
        # name is checked first, then config_id
        for arg_name, arg_value in (('name', name), ('config_id', config_id)):
            if not arg_value:
                raise RequiredArgument(function=function, argument=arg_name)

        copy_cmd = XmlCommand("create_config")

        # the optional comment precedes the other child elements
        if comment is not None:
            copy_cmd.add_element("comment", comment)

        children = (
            ("copy", config_id),
            ("name", name),
            ("usage_type", usage_type.value),
        )
        for tag, text in children:
            copy_cmd.add_element(tag, text)

        return self._send_xml_command(copy_cmd)
1095
1096 View Code Duplication
    def __get_configs(
        self,
        usage_type: UsageType,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        details: Optional[bool] = None,
        families: Optional[bool] = None,
        preferences: Optional[bool] = None,
        tasks: Optional[bool] = None
    ) -> Any:
        """Build and send a ``get_configs`` command for a list query

        Arguments:
            usage_type: Usage type set as the ``usage_type`` attribute
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            trash: Value for the ``trash`` attribute
            details: Value for the ``details`` attribute
            families: Value for the ``families`` attribute
            preferences: Value for the ``preferences`` attribute
            tasks: Value for the ``tasks`` attribute

        Each boolean option is only sent when it is not None.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        query = XmlCommand("get_configs")
        query.set_attribute("usage_type", usage_type.value)

        _add_filter(query, filter, filter_id)

        optional_flags = (
            ("trash", trash),
            ("details", details),
            ("families", families),
            ("preferences", preferences),
            ("tasks", tasks),
        )
        for attr_name, flag in optional_flags:
            if flag is not None:
                query.set_attribute(attr_name, _to_bool(flag))

        return self._send_xml_command(query)
1129
1130
    def __get_config(
        self,
        config_id: str,
        usage_type: UsageType,
        *,
        tasks: Optional[bool] = None
    ) -> Any:
        """Build and send a ``get_configs`` command for a single config

        Arguments:
            config_id: UUID of the config to request
            usage_type: Usage type set as the ``usage_type`` attribute
            tasks: Value for the ``tasks`` attribute, sent only if not None

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If ``config_id`` is empty or ``None``. The
                error always names ``get_config`` as the function.
        """
        id_missing = not config_id
        if id_missing:
            raise RequiredArgument(
                function=self.get_config.__name__, argument='config_id'
            )

        query = XmlCommand("get_configs")
        query.set_attribute("config_id", config_id)
        query.set_attribute("usage_type", usage_type.value)

        if tasks is not None:
            query.set_attribute("tasks", _to_bool(tasks))

        # for single entity always request all details
        query.set_attribute("details", "1")

        return self._send_xml_command(query)
1154
1155
    def __get_tasks(
        self,
        usage_type: UsageType,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        details: Optional[bool] = None,
        schedules_only: Optional[bool] = None
    ) -> Any:
        """Build and send a ``get_tasks`` command for a list query

        Arguments:
            usage_type: Usage type set as the ``usage_type`` attribute
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            trash: Value for the ``trash`` attribute
            details: Value for the ``details`` attribute
            schedules_only: Value for the ``schedules_only`` attribute

        Each boolean option is only sent when it is not None.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        query = XmlCommand("get_tasks")
        query.set_attribute("usage_type", usage_type.value)

        _add_filter(query, filter, filter_id)

        optional_flags = (
            ("trash", trash),
            ("details", details),
            ("schedules_only", schedules_only),
        )
        for attr_name, flag in optional_flags:
            if flag is not None:
                query.set_attribute(attr_name, _to_bool(flag))

        return self._send_xml_command(query)
1180
1181
    def __get_task(self, task_id: str, usage_type: UsageType) -> Any:
        """Build and send a ``get_tasks`` command for a single task

        Arguments:
            task_id: UUID of the task to request
            usage_type: Usage type set as the ``usage_type`` attribute

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If ``task_id`` is empty or ``None``. The error
                always names ``get_task`` as the function.
        """
        id_missing = not task_id
        if id_missing:
            raise RequiredArgument(
                function=self.get_task.__name__, argument='task_id'
            )

        query = XmlCommand("get_tasks")
        query.set_attribute("task_id", task_id)
        query.set_attribute("usage_type", usage_type.value)

        # for single entity always request all details
        query.set_attribute("details", "1")

        return self._send_xml_command(query)
1194