Completed
Push — master ( b1214c...aea294 )
by Björn
28s queued 20s
created

get_aggregate_statistic_from_string()   A

Complexity

Conditions 3

Size

Total Lines 21
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 11
nop 1
dl 0
loc 21
rs 9.85
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
from enum import Enum
20
from typing import Any, Optional
21
22
from gvm.protocols.gmpv208.entities.entities import (
23
    EntityType,
24
)  # if I use latest, I get circular import :/
25
from gvm.errors import RequiredArgument, InvalidArgument, InvalidArgumentType
26
from gvm.utils import add_filter
27
from gvm.xml import XmlCommand
28
29
30
class AggregateStatistic(Enum):
    """Statistic types that can be requested for an aggregate.

    Members:
        COUNT: Number of items
        C_COUNT: Cumulative number of items
        C_SUM: Cumulative sum of values
        MAX: Maximum value
        MEAN: Arithmetic mean of values
        MIN: Minimum value
        SUM: Sum of values
        TEXT: Text column value
        VALUE: Group or subgroup column value
    """

    # Counting statistics
    COUNT = "count"
    C_COUNT = "c_count"

    # Numeric statistics
    C_SUM = "c_sum"
    MAX = "max"
    MEAN = "mean"
    MIN = "min"
    SUM = "sum"

    # Raw column values
    TEXT = "text"
    VALUE = "value"
42
43
44
def get_aggregate_statistic_from_string(
    aggregate_statistic: Optional[str],
) -> Optional[AggregateStatistic]:
    """
    Look up the AggregateStatistic member named by a string.

    The string is upper-cased and used as the member name, so the lookup
    is case-insensitive.

    Arguments:
        aggregate_statistic: Name of the aggregate statistic to convert.
            A falsy value (None or an empty string) yields None.

    Returns:
        The matching AggregateStatistic member, or None for falsy input.

    Raises:
        InvalidArgument: If the string does not name an AggregateStatistic
            member.
    """
    if aggregate_statistic:
        member_name = aggregate_statistic.upper()
        try:
            return AggregateStatistic[member_name]
        except KeyError:
            # Hide the KeyError; the caller only cares about the bad argument.
            raise InvalidArgument(
                argument='aggregate_statistic',
                function=get_aggregate_statistic_from_string.__name__,
            ) from None

    return None
65
66
67
class SortOrder(Enum):
    """Direction in which aggregate results can be sorted.

    Members:
        ASCENDING: Sort in ascending order
        DESCENDING: Sort in descending order
    """

    ASCENDING = "ascending"
    DESCENDING = "descending"
72
73
74
def get_sort_order_from_string(
    sort_order: Optional[str],
) -> Optional[SortOrder]:
    """
    Look up the SortOrder member named by a string.

    The string is upper-cased and used as the member name, so the lookup
    is case-insensitive.

    Arguments:
        sort_order: Name of the sort order to convert. A falsy value
            (None or an empty string) yields None.

    Returns:
        The matching SortOrder member, or None for falsy input.

    Raises:
        InvalidArgument: If the string does not name a SortOrder member.
    """
    if sort_order:
        member_name = sort_order.upper()
        try:
            return SortOrder[member_name]
        except KeyError:
            # Hide the KeyError; the caller only cares about the bad argument.
            raise InvalidArgument(
                argument='sort_order',
                function=get_sort_order_from_string.__name__,
            ) from None

    return None
92
93
94
class AggregatesMixin:
    """Mixin adding the ``get_aggregates`` command.

    The class this mixin is combined with must provide a
    ``_send_xml_command`` method, which is used to send the built command.
    """

    def get_aggregates(
        self,
        resource_type: EntityType,
        *,
        filter_string: Optional[str] = None,
        filter_id: Optional[str] = None,
        sort_criteria: Optional[list] = None,
        data_columns: Optional[list] = None,
        group_column: Optional[str] = None,
        subgroup_column: Optional[str] = None,
        text_columns: Optional[list] = None,
        first_group: Optional[int] = None,
        max_groups: Optional[int] = None,
        mode: Optional[int] = None,
        **kwargs,
    ) -> Any:
        """Request aggregated information on a resource / entity type

        Additional arguments can be set via the kwargs parameter for backward
        compatibility with older versions of python-gvm, but are not validated.

        Arguments:
            resource_type: The entity type to gather data from
            filter_string: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            sort_criteria: List of sort criteria (dicts that can contain
                a field, stat and order). stat may be an AggregateStatistic
                or its name as a string; order may be a SortOrder or its
                name as a string.
            data_columns: List of fields to aggregate data from
            group_column: The field to group the entities by
            subgroup_column: The field to further group the entities
                inside groups by
            text_columns: List of simple text columns which no statistics
                are calculated for
            first_group: The index of the first aggregate group to return
            max_groups: The maximum number of aggregate groups to return,
                -1 for all
            mode: Special mode for aggregation

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If resource_type is missing, or if
                subgroup_column is given without a group_column.
            InvalidArgumentType: If resource_type is not an EntityType,
                first_group or max_groups is not an int, sort_criteria,
                data_columns or text_columns is not a list, or an entry of
                sort_criteria is not a dict.
            InvalidArgument: If a stat or order string in sort_criteria
                does not name a known AggregateStatistic / SortOrder.
        """
        if not resource_type:
            raise RequiredArgument(
                function=self.get_aggregates.__name__, argument='resource_type'
            )

        if not isinstance(resource_type, EntityType):
            raise InvalidArgumentType(
                function=self.get_aggregates.__name__,
                argument='resource_type',
                arg_type=EntityType.__name__,
            )

        cmd = XmlCommand('get_aggregates')

        # Audits and policies are requested as tasks / scan configs that are
        # told apart by the usage_type attribute. Members are compared by
        # value rather than by identity.
        _actual_resource_type = resource_type
        if resource_type.value == EntityType.AUDIT.value:
            _actual_resource_type = EntityType.TASK
            cmd.set_attribute('usage_type', 'audit')
        elif resource_type.value == EntityType.POLICY.value:
            _actual_resource_type = EntityType.SCAN_CONFIG
            cmd.set_attribute('usage_type', 'policy')
        elif resource_type.value == EntityType.SCAN_CONFIG.value:
            cmd.set_attribute('usage_type', 'scan')
        elif resource_type.value == EntityType.TASK.value:
            cmd.set_attribute('usage_type', 'scan')
        cmd.set_attribute('type', _actual_resource_type.value)

        add_filter(cmd, filter_string, filter_id)

        if first_group is not None:
            if not isinstance(first_group, int):
                raise InvalidArgumentType(
                    function=self.get_aggregates.__name__,
                    argument='first_group',
                    arg_type=int.__name__,
                )
            cmd.set_attribute('first_group', str(first_group))

        if max_groups is not None:
            if not isinstance(max_groups, int):
                raise InvalidArgumentType(
                    function=self.get_aggregates.__name__,
                    argument='max_groups',
                    arg_type=int.__name__,
                )
            cmd.set_attribute('max_groups', str(max_groups))

        if sort_criteria is not None:
            if not isinstance(sort_criteria, list):
                raise InvalidArgumentType(
                    function=self.get_aggregates.__name__,
                    argument='sort_criteria',
                    arg_type=list.__name__,
                )
            for sort in sort_criteria:
                if not isinstance(sort, dict):
                    # Name the expected type, like every other type check
                    # in this method does.
                    raise InvalidArgumentType(
                        function=self.get_aggregates.__name__,
                        argument='sort_criteria',
                        arg_type=dict.__name__,
                    )

                sort_elem = cmd.add_element('sort')

                field = sort.get('field')
                if field:
                    sort_elem.set_attribute('field', field)

                stat = sort.get('stat')
                if stat:
                    # Accept either the enum member or its name as a string.
                    if not isinstance(stat, AggregateStatistic):
                        stat = get_aggregate_statistic_from_string(stat)
                    sort_elem.set_attribute('stat', stat.value)

                order = sort.get('order')
                if order:
                    # Accept either the enum member or its name as a string.
                    if not isinstance(order, SortOrder):
                        order = get_sort_order_from_string(order)
                    sort_elem.set_attribute('order', order.value)

        if data_columns is not None:
            if not isinstance(data_columns, list):
                raise InvalidArgumentType(
                    function=self.get_aggregates.__name__,
                    argument='data_columns',
                    arg_type=list.__name__,
                )
            for column in data_columns:
                cmd.add_element('data_column', column)

        if group_column is not None:
            cmd.set_attribute('group_column', group_column)

        if subgroup_column is not None:
            if not group_column:
                raise RequiredArgument(
                    '{} requires a group_column argument'
                    ' if subgroup_column is given'.format(
                        self.get_aggregates.__name__
                    ),
                    function=self.get_aggregates.__name__,
                    argument='subgroup_column',
                )
            cmd.set_attribute('subgroup_column', subgroup_column)

        if text_columns is not None:
            if not isinstance(text_columns, list):
                raise InvalidArgumentType(
                    function=self.get_aggregates.__name__,
                    argument='text_columns',
                    arg_type=list.__name__,
                )
            for column in text_columns:
                cmd.add_element('text_column', column)

        if mode is not None:
            # Convert to text like first_group / max_groups above, since the
            # signature allows an int here. A no-op for string values.
            cmd.set_attribute('mode', str(mode))

        # Add additional keyword args as attributes for backward compatibility.
        cmd.set_attributes(kwargs)

        return self._send_xml_command(cmd)
257