Completed
Push — master ( 7300a2...9e6edc )
by
unknown
27s queued 19s
created

gvm.protocols.gmpv9.Gmp.clone_audit()   A

Complexity

Conditions 2

Size

Total Lines 17
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 7
nop 2
dl 0
loc 17
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 - 2019 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
# pylint: disable=arguments-differ, redefined-builtin, too-many-lines
20
21
"""
22
Module for communication with gvmd in `Greenbone Management Protocol version 9`_
23
24
.. _Greenbone Management Protocol version 9:
25
    https://docs.greenbone.net/API/GMP/gmp-9.0.html
26
"""
27
import collections
28
import numbers
29
30
from typing import Any, List, Optional
31
32
from gvm.errors import InvalidArgument, InvalidArgumentType, RequiredArgument
33
from gvm.utils import deprecation
34
from gvm.xml import XmlCommand
35
36
from gvm.protocols.gmpv8 import Gmp as Gmpv8, _to_bool, _add_filter
37
from gvm.protocols.gmpv7 import _is_list_like, _to_comma_list
38
39
from . import types
40
from .types import *
41
from .types import _UsageType as UsageType
42
43
_EMPTY_POLICY_ID = '085569ce-73ed-11df-83c3-002264764cea'
44
45
PROTOCOL_VERSION = (9,)
46
47
48
class Gmp(Gmpv8):
49
50
    types = types
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable types does not seem to be defined.
Loading history...
51
52
    @staticmethod
    def get_protocol_version() -> tuple:
        """Return the implemented Greenbone Management Protocol version.

        Returns:
            tuple: The module-level ``PROTOCOL_VERSION`` constant
        """
        version = PROTOCOL_VERSION
        return version
60
61 View Code Duplication
    def create_audit(
        self,
        name: str,
        policy_id: str,
        target_id: str,
        scanner_id: str,
        *,
        alterable: Optional[bool] = None,
        hosts_ordering: Optional[HostsOrdering] = None,
        schedule_id: Optional[str] = None,
        alert_ids: Optional[List[str]] = None,
        comment: Optional[str] = None,
        schedule_periods: Optional[int] = None,
        observers: Optional[List[str]] = None,
        preferences: Optional[dict] = None
    ) -> Any:
        """Create a new audit task

        Arguments:
            name: Name of the new audit
            policy_id: UUID of policy to use by the audit
            target_id: UUID of target to be scanned
            scanner_id: UUID of scanner to use for scanning the target
            comment: Comment for the audit
            alterable: Whether the task should be alterable
            alert_ids: List of UUIDs for alerts to be applied to the audit
            hosts_ordering: The order hosts are scanned in
            schedule_id: UUID of a schedule when the audit should be run.
            schedule_periods: A limit to the number of times the audit will be
                scheduled, or 0 for no limit
            observers: List of names or ids of users which should be allowed to
                observe this audit
            preferences: Name/Value pairs of scanner preferences.

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If ``policy_id`` is missing. All remaining
                argument validation is done by the shared task helper.
        """
        # The shared helper would report a missing policy as 'config_id',
        # which is not a parameter of this method. A missing name is left to
        # the helper so the order in which arguments are validated (name
        # first) stays the same as before.
        if name and not policy_id:
            raise RequiredArgument(
                function=self.create_audit.__name__, argument='policy_id'
            )

        return self.__create_task(
            name=name,
            config_id=policy_id,
            target_id=target_id,
            scanner_id=scanner_id,
            usage_type=UsageType.AUDIT,
            function=self.create_audit.__name__,
            alterable=alterable,
            hosts_ordering=hosts_ordering,
            schedule_id=schedule_id,
            alert_ids=alert_ids,
            comment=comment,
            schedule_periods=schedule_periods,
            observers=observers,
            preferences=preferences,
        )
115
116
    def create_config(self, config_id: str, name: str) -> Any:
        """Create a new scan config as a copy of an existing one

        Arguments:
            config_id: UUID of the scan config to copy
            name: Name to give the new scan config

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        caller = self.create_config.__name__
        return self.__create_config(
            function=caller,
            usage_type=UsageType.SCAN,
            name=name,
            config_id=config_id,
        )
132
133
    def create_policy(
        self, name: str, *, policy_id: Optional[str] = None
    ) -> Any:
        """Create a new policy config

        Arguments:
            name: Name of the new policy
            policy_id: UUID of an existing policy as base. By default the empty
                policy is used.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        # Only ``None`` selects the built-in empty policy; any other falsy
        # value (e.g. an empty string) is passed on and rejected by the
        # shared helper as a missing id.
        if policy_id is None:
            policy_id = _EMPTY_POLICY_ID

        return self.__create_config(
            config_id=policy_id,
            name=name,
            usage_type=UsageType.POLICY,
            function=self.create_policy.__name__,
        )
152
153 View Code Duplication
    def create_task(
        self,
        name: str,
        config_id: str,
        target_id: str,
        scanner_id: str,
        *,
        alterable: Optional[bool] = None,
        hosts_ordering: Optional[HostsOrdering] = None,
        schedule_id: Optional[str] = None,
        alert_ids: Optional[List[str]] = None,
        comment: Optional[str] = None,
        schedule_periods: Optional[int] = None,
        observers: Optional[List[str]] = None,
        preferences: Optional[dict] = None
    ) -> Any:
        """Create a new task with the scan usage type

        Arguments:
            name: Name of the task
            config_id: UUID of the scan config the task should use
            target_id: UUID of the target to scan
            scanner_id: UUID of the scanner used to scan the target
            comment: Comment for the task
            alterable: Whether the task should be alterable
            alert_ids: List of UUIDs of alerts to apply to the task
            hosts_ordering: The order hosts are scanned in
            schedule_id: UUID of a schedule when the task should be run.
            schedule_periods: A limit to the number of times the task will be
                scheduled, or 0 for no limit
            observers: List of names or ids of users which should be allowed to
                observe this task
            preferences: Name/Value pairs of scanner preferences.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        optional_settings = {
            'alterable': alterable,
            'hosts_ordering': hosts_ordering,
            'schedule_id': schedule_id,
            'alert_ids': alert_ids,
            'comment': comment,
            'schedule_periods': schedule_periods,
            'observers': observers,
            'preferences': preferences,
        }
        return self.__create_task(
            name=name,
            config_id=config_id,
            target_id=target_id,
            scanner_id=scanner_id,
            usage_type=UsageType.SCAN,
            function=self.create_task.__name__,
            **optional_settings
        )
206
207 View Code Duplication
    def create_tls_certificate(
        self,
        name: str,
        certificate: str,
        *,
        comment: Optional[str] = None,
        trust: Optional[bool] = None
    ) -> Any:
        """Create a new TLS certificate

        Arguments:
            name: Name of the TLS certificate, defaulting to the MD5
                fingerprint.
            certificate: The Base64 encoded certificate data (x.509 DER or PEM).
            comment: Comment for the TLS certificate.
            trust: Whether the certificate is trusted. ``None`` (the default)
                omits the element from the request; an explicit ``True`` or
                ``False`` is always transmitted.

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If ``name`` or ``certificate`` is missing.
        """
        if not name:
            raise RequiredArgument(
                function=self.create_tls_certificate.__name__, argument='name'
            )
        if not certificate:
            raise RequiredArgument(
                function=self.create_tls_certificate.__name__,
                argument='certificate',
            )

        cmd = XmlCommand("create_tls_certificate")

        if comment:
            cmd.add_element("comment", comment)

        cmd.add_element("name", name)
        cmd.add_element("certificate", certificate)

        # Compare against None rather than truthiness so that an explicit
        # trust=False is sent instead of being silently dropped.
        if trust is not None:
            cmd.add_element("trust", _to_bool(trust))

        return self._send_xml_command(cmd)
249
250
    def get_tls_certificates(
        self,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        include_certificate_data: Optional[bool] = None
    ) -> Any:
        """Request a list of TLS certificates

        Arguments:
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            include_certificate_data: Whether the response should contain the
                certificate data. Left out of the request when ``None``.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        request = XmlCommand("get_tls_certificates")
        _add_filter(request, filter, filter_id)

        with_data = include_certificate_data
        if with_data is not None:
            request.set_attribute(
                "include_certificate_data", _to_bool(with_data)
            )

        return self._send_xml_command(request)
279
280
    def get_tls_certificate(self, tls_certificate_id: str) -> Any:
        """Request a single TLS certificate including its certificate data

        Arguments:
            tls_certificate_id: UUID of an existing TLS certificate

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not tls_certificate_id:
            raise RequiredArgument(
                argument='tls_certificate_id',
                function=self.get_tls_certificate.__name__,
            )

        request = XmlCommand("get_tls_certificates")
        # a single-certificate lookup always asks for the certificate data
        attributes = (
            ("tls_certificate_id", tls_certificate_id),
            ("include_certificate_data", "1"),
        )
        for key, value in attributes:
            request.set_attribute(key, value)

        return self._send_xml_command(request)
301
302 View Code Duplication
    def modify_tls_certificate(
        self,
        tls_certificate_id: str,
        *,
        name: Optional[str] = None,
        comment: Optional[str] = None,
        trust: Optional[bool] = None
    ) -> Any:
        """Modifies an existing TLS certificate.

        Arguments:
            tls_certificate_id: UUID of the TLS certificate to be modified.
            name: Name of the TLS certificate, defaulting to the MD5 fingerprint
            comment: Comment for the TLS certificate.
            trust: Whether the certificate is trusted. ``None`` (the default)
                leaves the trust setting out of the request; an explicit
                ``True`` or ``False`` is always transmitted.

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If ``tls_certificate_id`` is missing.
        """
        if not tls_certificate_id:
            raise RequiredArgument(
                function=self.modify_tls_certificate.__name__,
                argument='tls_certificate_id',
            )

        cmd = XmlCommand("modify_tls_certificate")
        cmd.set_attribute("tls_certificate_id", str(tls_certificate_id))

        if comment:
            cmd.add_element("comment", comment)

        if name:
            cmd.add_element("name", name)

        # A truthiness test would drop trust=False, making it impossible to
        # revoke trust through this method; only None means "not specified".
        if trust is not None:
            cmd.add_element("trust", _to_bool(trust))

        return self._send_xml_command(cmd)
340
341
    def clone_tls_certificate(self, tls_certificate_id: str) -> Any:
        """Clone an existing TLS certificate.

        Sends a ``create_tls_certificate`` command whose ``copy`` element
        carries the id of the certificate to duplicate.

        Arguments:
            tls_certificate_id: The UUID of an existing TLS certificate

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If ``tls_certificate_id`` is missing.
        """
        if not tls_certificate_id:
            # Name this method in the error, not modify_tls_certificate.
            raise RequiredArgument(
                function=self.clone_tls_certificate.__name__,
                argument='tls_certificate_id',
            )

        cmd = XmlCommand("create_tls_certificate")

        cmd.add_element("copy", tls_certificate_id)

        return self._send_xml_command(cmd)
361
362
    def get_configs(
        self,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        details: Optional[bool] = None,
        families: Optional[bool] = None,
        preferences: Optional[bool] = None,
        tasks: Optional[bool] = None
    ) -> Any:
        """Request a list of scan configs

        Arguments:
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            trash: Whether to get the trashcan scan configs instead
            details: Whether to get config families, preferences, nvt selectors
                and tasks.
            families: Whether to include the families if no details are
                requested
            preferences: Whether to include the preferences if no details are
                requested
            tasks: Whether to get tasks using this config

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        query_options = {
            'filter': filter,
            'filter_id': filter_id,
            'trash': trash,
            'details': details,
            'families': families,
            'preferences': preferences,
            'tasks': tasks,
        }
        return self.__get_configs(UsageType.SCAN, **query_options)
400
401
    def get_policies(
        self,
        *,
        audits: Optional[bool] = None,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        details: Optional[bool] = None,
        families: Optional[bool] = None,
        preferences: Optional[bool] = None,
        trash: Optional[bool] = None
    ) -> Any:
        """Request a list of policies

        Arguments:
            audits: Whether to get audits using the policy
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            details: Whether to get families, preferences, nvt selectors
                and tasks.
            families: Whether to include the families if no details are
                requested
            preferences: Whether to include the preferences if no details are
                requested
            trash: Whether to get the trashcan policies instead

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        query_options = {
            'filter': filter,
            'filter_id': filter_id,
            'details': details,
            'families': families,
            'preferences': preferences,
            # the shared config query calls this option "tasks"
            'tasks': audits,
            'trash': trash,
        }
        return self.__get_configs(UsageType.POLICY, **query_options)
439
440
    def get_config(self, config_id: str) -> Any:
        """Request a single scan config

        Arguments:
            config_id: UUID of an existing scan config

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        response = self.__get_config(config_id, UsageType.SCAN)
        return response
450
451
    def get_policy(self, policy_id: str) -> Any:
        """Request a single policy

        Arguments:
            policy_id: UUID of an existing policy

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If ``policy_id`` is missing.
        """
        # Validate here so the error names this method and its own
        # parameter; the shared helper reports get_config / config_id.
        if not policy_id:
            raise RequiredArgument(
                function=self.get_policy.__name__, argument='policy_id'
            )

        return self.__get_config(policy_id, UsageType.POLICY)
461
462
    def get_tasks(
        self,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        details: Optional[bool] = None,
        schedules_only: Optional[bool] = None
    ) -> Any:
        """Request a list of tasks with the scan usage type

        Arguments:
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            trash: Whether to get the trashcan tasks instead
            details: Whether to include full task details
            schedules_only: Whether to only include id, name and schedule
                details

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        query_options = {
            'filter': filter,
            'filter_id': filter_id,
            'trash': trash,
            'details': details,
            'schedules_only': schedules_only,
        }
        return self.__get_tasks(UsageType.SCAN, **query_options)
492
493
    def get_audits(
        self,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        details: Optional[bool] = None,
        schedules_only: Optional[bool] = None
    ) -> Any:
        """Request a list of tasks with the audit usage type

        Arguments:
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            trash: Whether to get the trashcan audits instead
            details: Whether to include full audit details
            schedules_only: Whether to only include id, name and schedule
                details

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        query_options = {
            'filter': filter,
            'filter_id': filter_id,
            'trash': trash,
            'details': details,
            'schedules_only': schedules_only,
        }
        return self.__get_tasks(UsageType.AUDIT, **query_options)
523
524
    def get_task(self, task_id: str) -> Any:
        """Request a single task with the scan usage type

        Arguments:
            task_id: UUID of an existing task

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        response = self.__get_task(task_id, UsageType.SCAN)
        return response
534
535
    def get_audit(self, audit_id: str) -> Any:
        """Request a single audit

        Arguments:
            audit_id: UUID of an existing audit

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If ``audit_id`` is missing.
        """
        # Validate here so the error names this method and its own
        # parameter; the shared helper reports get_task / task_id.
        if not audit_id:
            raise RequiredArgument(
                function=self.get_audit.__name__, argument='audit_id'
            )

        return self.__get_task(audit_id, UsageType.AUDIT)
545
546
    def clone_audit(self, audit_id: str) -> Any:
        """Create a copy of an existing audit

        Sends a ``create_task`` command whose ``copy`` element carries the
        id of the audit to duplicate.

        Arguments:
            audit_id: UUID of the audit to clone

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if audit_id:
            request = XmlCommand("create_task")
            request.add_element("copy", audit_id)
            return self._send_xml_command(request)

        raise RequiredArgument(
            function=self.clone_audit.__name__, argument='audit_id'
        )
563
564
    def clone_policy(self, policy_id: str) -> Any:
        """Create a copy of an existing policy

        Sends a ``create_config`` command whose ``copy`` element carries the
        id of the policy to duplicate.

        Arguments:
            policy_id: UUID of the policy to clone

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if policy_id:
            request = XmlCommand("create_config")
            request.add_element("copy", policy_id)
            return self._send_xml_command(request)

        raise RequiredArgument(
            function=self.clone_policy.__name__, argument='policy_id'
        )
581
582
    def delete_audit(
        self, audit_id: str, *, ultimate: Optional[bool] = False
    ) -> Any:
        """Delete an existing audit via the ``delete_task`` command

        Arguments:
            audit_id: UUID of the audit to be deleted.
            ultimate: Whether to remove entirely, or to the trashcan.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not audit_id:
            raise RequiredArgument(
                argument='audit_id', function=self.delete_audit.__name__
            )

        request = XmlCommand("delete_task")
        attributes = (
            ("task_id", audit_id),
            ("ultimate", _to_bool(ultimate)),
        )
        for key, value in attributes:
            request.set_attribute(key, value)

        return self._send_xml_command(request)
601
602
    def delete_policy(
        self, policy_id: str, *, ultimate: Optional[bool] = False
    ) -> Any:
        """Delete an existing policy via the ``delete_config`` command

        Arguments:
            policy_id: UUID of the policy to be deleted.
            ultimate: Whether to remove entirely, or to the trashcan.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not policy_id:
            raise RequiredArgument(
                argument='policy_id', function=self.delete_policy.__name__
            )

        request = XmlCommand("delete_config")
        attributes = (
            ("config_id", policy_id),
            ("ultimate", _to_bool(ultimate)),
        )
        for key, value in attributes:
            request.set_attribute(key, value)

        return self._send_xml_command(request)
621
622 View Code Duplication
    def __create_task(
        self,
        name: str,
        config_id: str,
        target_id: str,
        scanner_id: str,
        usage_type: UsageType,
        function: str,
        *,
        alterable: Optional[bool] = None,
        hosts_ordering: Optional[HostsOrdering] = None,
        schedule_id: Optional[str] = None,
        alert_ids: Optional[List[str]] = None,
        comment: Optional[str] = None,
        schedule_periods: Optional[int] = None,
        observers: Optional[List[str]] = None,
        preferences: Optional[dict] = None
    ) -> Any:
        """Build and send the ``create_task`` command for tasks and audits

        Arguments:
            name: Name of the new task
            config_id: UUID of the config (or policy) to use
            target_id: UUID of the target to scan; ``'0'`` is rejected
            scanner_id: UUID of the scanner to use
            usage_type: Usage type whose value is sent as ``usage_type``
            function: Name of the public method, used in raised errors
            alterable: Whether the task should be alterable
            hosts_ordering: The order hosts are scanned in
            schedule_id: UUID of a schedule for the task
            alert_ids: List of alert UUIDs; a single string is still accepted
                but deprecated
            comment: Comment for the task
            schedule_periods: Non-negative limit of scheduled runs; only
                evaluated when ``schedule_id`` is given
            observers: List of user names or ids allowed to observe the task
            preferences: Mapping of scanner preference names to values

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        required = (
            ('name', name),
            ('config_id', config_id),
            ('target_id', target_id),
            ('scanner_id', scanner_id),
        )
        for arg_name, arg_value in required:
            if not arg_value:
                raise RequiredArgument(function=function, argument=arg_name)

        # don't allow to create a container task with create_task
        if target_id == '0':
            raise InvalidArgument(function=function, argument='target_id')

        request = XmlCommand("create_task")
        request.add_element("name", name)
        request.add_element("usage_type", usage_type.value)

        references = (
            ("config", config_id),
            ("target", target_id),
            ("scanner", scanner_id),
        )
        for tag, ref_id in references:
            request.add_element(tag, attrs={"id": ref_id})

        if comment:
            request.add_element("comment", comment)

        if alterable is not None:
            request.add_element("alterable", _to_bool(alterable))

        if hosts_ordering:
            if not isinstance(hosts_ordering, HostsOrdering):
                raise InvalidArgumentType(
                    function=function,
                    argument='hosts_ordering',
                    arg_type=HostsOrdering.__name__,
                )
            request.add_element("hosts_ordering", hosts_ordering.value)

        if alert_ids:
            if isinstance(alert_ids, str):
                notice = (
                    "Please pass a list as alert_ids parameter to {}. "
                    "Passing a string is deprecated and will be removed in "
                    "future."
                )
                deprecation(notice.format(function))

                # a lone id given as a string is treated as a one-item list
                alert_ids = [alert_ids]

            if _is_list_like(alert_ids):
                for alert_id in alert_ids:
                    request.add_element("alert", attrs={"id": str(alert_id)})

        if schedule_id:
            request.add_element("schedule", attrs={"id": schedule_id})

            if schedule_periods is not None:
                periods_ok = (
                    isinstance(schedule_periods, numbers.Integral)
                    and not schedule_periods < 0
                )
                if not periods_ok:
                    raise InvalidArgument(
                        "schedule_periods must be an integer greater or equal "
                        "than 0"
                    )
                request.add_element("schedule_periods", str(schedule_periods))

        if observers is not None:
            if not _is_list_like(observers):
                raise InvalidArgumentType(
                    function=function, argument='observers', arg_type='list'
                )

            # gvmd splits by comma and space
            # gvmd tries to lookup each value as user name and afterwards as
            # user id. So both user name and user id are possible
            request.add_element("observers", _to_comma_list(observers))

        if preferences is not None:
            mapping_type = collections.abc.Mapping
            if not isinstance(preferences, mapping_type):
                raise InvalidArgumentType(
                    function=function,
                    argument='preferences',
                    arg_type=mapping_type.__name__,
                )

            prefs_element = request.add_element("preferences")
            for key, value in preferences.items():
                pref_element = prefs_element.add_element("preference")
                pref_element.add_element("scanner_name", key)
                pref_element.add_element("value", str(value))

        return self._send_xml_command(request)
733
734
    def __create_config(
        self, config_id: str, name: str, usage_type: UsageType, function: str
    ) -> Any:
        """Build and send a ``create_config`` command copying a config

        Arguments:
            config_id: UUID of the config to copy
            name: Name of the new config
            usage_type: Usage type whose value is sent as ``usage_type``
            function: Name of the public method, used in raised errors

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        for arg_name, arg_value in (('name', name), ('config_id', config_id)):
            if not arg_value:
                raise RequiredArgument(function=function, argument=arg_name)

        request = XmlCommand("create_config")
        elements = (
            ("copy", config_id),
            ("name", name),
            ("usage_type", usage_type.value),
        )
        for tag, text in elements:
            request.add_element(tag, text)

        return self._send_xml_command(request)
748
749 View Code Duplication
    def __get_configs(
        self,
        usage_type: UsageType,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        details: Optional[bool] = None,
        families: Optional[bool] = None,
        preferences: Optional[bool] = None,
        tasks: Optional[bool] = None
    ) -> Any:
        """Build and send a ``get_configs`` command for one usage type

        Arguments:
            usage_type: Usage type whose value is sent as ``usage_type``
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            trash: Sent as the ``trash`` attribute unless ``None``
            details: Sent as the ``details`` attribute unless ``None``
            families: Sent as the ``families`` attribute unless ``None``
            preferences: Sent as the ``preferences`` attribute unless ``None``
            tasks: Sent as the ``tasks`` attribute unless ``None``

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        request = XmlCommand("get_configs")
        request.set_attribute("usage_type", usage_type.value)

        _add_filter(request, filter, filter_id)

        # boolean switches are only transmitted when explicitly given
        switches = (
            ("trash", trash),
            ("details", details),
            ("families", families),
            ("preferences", preferences),
            ("tasks", tasks),
        )
        for attribute, flag in switches:
            if flag is not None:
                request.set_attribute(attribute, _to_bool(flag))

        return self._send_xml_command(request)
782
783
    def __get_config(self, config_id: str, usage_type: UsageType) -> Any:
        """Build and send a ``get_configs`` command for a single config

        Arguments:
            config_id: UUID of the config to request
            usage_type: Usage type whose value is sent as ``usage_type``

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not config_id:
            raise RequiredArgument(
                argument='config_id', function=self.get_config.__name__
            )

        request = XmlCommand("get_configs")
        attributes = (
            ("config_id", config_id),
            ("usage_type", usage_type.value),
            # for single entity always request all details
            ("details", "1"),
        )
        for key, value in attributes:
            request.set_attribute(key, value)

        return self._send_xml_command(request)
797
798
    def __get_tasks(
        self,
        usage_type: UsageType,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        details: Optional[bool] = None,
        schedules_only: Optional[bool] = None
    ) -> Any:
        """Build and send a ``get_tasks`` command for one usage type

        Arguments:
            usage_type: Usage type whose value is sent as ``usage_type``
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            trash: Sent as the ``trash`` attribute unless ``None``
            details: Sent as the ``details`` attribute unless ``None``
            schedules_only: Sent as the ``schedules_only`` attribute unless
                ``None``

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        request = XmlCommand("get_tasks")
        request.set_attribute("usage_type", usage_type.value)

        _add_filter(request, filter, filter_id)

        # boolean switches are only transmitted when explicitly given
        switches = (
            ("trash", trash),
            ("details", details),
            ("schedules_only", schedules_only),
        )
        for attribute, flag in switches:
            if flag is not None:
                request.set_attribute(attribute, _to_bool(flag))

        return self._send_xml_command(request)
823
824
    def __get_task(self, task_id: str, usage_type: UsageType) -> Any:
        """Build and send a ``get_tasks`` command for a single task

        Arguments:
            task_id: UUID of the task to request
            usage_type: Usage type whose value is sent as ``usage_type``

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not task_id:
            raise RequiredArgument(
                argument='task_id', function=self.get_task.__name__
            )

        request = XmlCommand("get_tasks")
        attributes = (
            ("task_id", task_id),
            ("usage_type", usage_type.value),
            # for single entity always request all details
            ("details", "1"),
        )
        for key, value in attributes:
            request.set_attribute(key, value)

        return self._send_xml_command(request)
837