Completed: push to master (b1214c...aea294) by Björn

AggregatesMixin.get_aggregates()   F

Complexity

Conditions 30

Size

Total Lines 162
Code Lines 106

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric  Value
cc      30
eloc    106
nop     14
dl      0
loc     162
rs      0
c       0
b       0
f       0

How to fix: Long Method, Complexity, Many Parameters

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

A commonly applied refactoring for this is Extract Method.
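As a minimal sketch of Extract Method, the two nearly identical validation blocks for first_group and max_groups in get_aggregates() could be pulled into one helper. The names _int_attribute and build_group_attributes are hypothetical and not part of python-gvm, and InvalidArgumentType is a local stand-in for the class in gvm.errors so that the example runs on its own.

```python
from typing import Dict, Optional


class InvalidArgumentType(TypeError):
    """Local stand-in for gvm.errors.InvalidArgumentType (assumption)."""


def _int_attribute(name: str, value: Optional[int]) -> Dict[str, str]:
    """Validate one optional integer argument.

    Hypothetical helper: returns an empty dict if the argument was not
    given, otherwise a one-entry dict with the value converted to str.
    """
    if value is None:
        return {}
    if not isinstance(value, int):
        raise InvalidArgumentType(f"{name} must be of type int")
    return {name: str(value)}


def build_group_attributes(
    first_group: Optional[int] = None,
    max_groups: Optional[int] = None,
) -> Dict[str, str]:
    """Collect the paging attributes through the extracted helper."""
    attributes: Dict[str, str] = {}
    attributes.update(_int_attribute("first_group", first_group))
    attributes.update(_int_attribute("max_groups", max_groups))
    return attributes


print(build_group_attributes(first_group=1, max_groups=-1))
# {'first_group': '1', 'max_groups': '-1'}
```

Each extracted call removes one nested condition from the long method, and the helper name documents what the block was doing.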

Complexity

Complex units like gvm.protocols.gmpv208.system.aggregates.AggregatesMixin.get_aggregates() often do a lot of different things. To break such a unit down, we need to identify a cohesive component within it. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes. In get_aggregates(), for instance, the parameters data_columns, group_column, subgroup_column and text_columns share a suffix, and first_group and max_groups share another.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
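As a rough sketch of Extract Class applied to this method, the handling of sort criteria (currently plain dicts with field, stat and order keys) could move into a small class of its own. SortCriterion, from_dict, to_attributes and _coerce are hypothetical names that do not exist in python-gvm. The enums are trimmed copies of the ones in the listing, and unknown names raise a plain KeyError here instead of InvalidArgument.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Optional


class AggregateStatistic(Enum):
    """Trimmed copy of the enum from the listing."""

    COUNT = "count"
    MAX = "max"
    MIN = "min"


class SortOrder(Enum):
    ASCENDING = "ascending"
    DESCENDING = "descending"


def _coerce(enum_cls, value):
    """Accept an enum member, a case-insensitive member name, or nothing."""
    if not value:
        return None
    if isinstance(value, enum_cls):
        return value
    return enum_cls[value.upper()]  # raises KeyError for unknown names


@dataclass(frozen=True)
class SortCriterion:
    """Hypothetical class replacing the loose sort dict."""

    field: Optional[str] = None
    stat: Optional[AggregateStatistic] = None
    order: Optional[SortOrder] = None

    @classmethod
    def from_dict(cls, raw: dict) -> "SortCriterion":
        """Build a criterion from the dict format get_aggregates() accepts."""
        return cls(
            field=raw.get("field") or None,
            stat=_coerce(AggregateStatistic, raw.get("stat")),
            order=_coerce(SortOrder, raw.get("order")),
        )

    def to_attributes(self) -> Dict[str, str]:
        """Return only the attributes that were actually set."""
        attributes: Dict[str, str] = {}
        if self.field:
            attributes["field"] = self.field
        if self.stat:
            attributes["stat"] = self.stat.value
        if self.order:
            attributes["order"] = self.order.value
        return attributes


criterion = SortCriterion.from_dict(
    {"field": "severity", "stat": "max", "order": SortOrder.DESCENDING}
)
print(criterion.to_attributes())
# {'field': 'severity', 'stat': 'max', 'order': 'descending'}
```

With such a class, the loop in get_aggregates() would shrink to creating one sort element per criterion and copying its attributes, which removes most of the nested isinstance branches from the method.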

Many Parameters

Methods with many parameters are not only hard to understand; their parameter lists also tend to become inconsistent when you need more or different data.

There are several approaches to avoid long parameter lists:
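One such approach is to introduce a parameter object that bundles arguments which belong together. The sketch below groups the four grouping-related arguments of get_aggregates(). AggregateGrouping and to_attributes are hypothetical names that do not exist in python-gvm, and a plain ValueError stands in for RequiredArgument from gvm.errors.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass(frozen=True)
class AggregateGrouping:
    """Hypothetical parameter object for the grouping arguments."""

    group_column: Optional[str] = None
    subgroup_column: Optional[str] = None
    first_group: Optional[int] = None
    max_groups: Optional[int] = None

    def __post_init__(self) -> None:
        # Same rule as in get_aggregates(): a subgroup needs a group.
        if self.subgroup_column is not None and not self.group_column:
            raise ValueError("subgroup_column requires group_column")

    def to_attributes(self) -> Dict[str, str]:
        """Return the set values as string attributes, skipping None."""
        values = {
            "group_column": self.group_column,
            "subgroup_column": self.subgroup_column,
            "first_group": self.first_group,
            "max_groups": self.max_groups,
        }
        return {
            name: str(value)
            for name, value in values.items()
            if value is not None
        }


grouping = AggregateGrouping(group_column="severity", max_groups=10)
print(grouping.to_attributes())
# {'group_column': 'severity', 'max_groups': '10'}
```

A method taking one grouping argument instead of four separate keywords has a shorter signature, and the consistency check lives next to the data it protects.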

# -*- coding: utf-8 -*-
# Copyright (C) 2021 Greenbone Networks GmbH
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from enum import Enum
from typing import Any, Optional

from gvm.protocols.gmpv208.entities.entities import (
    EntityType,
)  # if I use latest, I get circular import :/
from gvm.errors import RequiredArgument, InvalidArgument, InvalidArgumentType
from gvm.utils import add_filter
from gvm.xml import XmlCommand


class AggregateStatistic(Enum):
    """Enum for aggregate statistic types"""

    COUNT = "count"  # Number of items
    C_COUNT = "c_count"  # Cumulative number of items
    C_SUM = "c_sum"  # Cumulative sum of values
    MAX = "max"  # Maximum value
    MEAN = "mean"  # Arithmetic mean of values
    MIN = "min"  # Minimum value
    SUM = "sum"  # Sum of values
    TEXT = "text"  # Text column value
    VALUE = "value"  # Group or subgroup column value


def get_aggregate_statistic_from_string(
    aggregate_statistic: Optional[str],
) -> Optional[AggregateStatistic]:
    """
    Convert an aggregate statistic string to an actual AggregateStatistic
    instance.

    Arguments:
        aggregate_statistic: Aggregate statistic string to convert to an
            AggregateStatistic
    """
    if not aggregate_statistic:
        return None

    try:
        return AggregateStatistic[aggregate_statistic.upper()]
    except KeyError:
        raise InvalidArgument(
            argument='aggregate_statistic',
            function=get_aggregate_statistic_from_string.__name__,
        ) from None


class SortOrder(Enum):
    """Enum for sort order"""

    ASCENDING = "ascending"
    DESCENDING = "descending"


def get_sort_order_from_string(
    sort_order: Optional[str],
) -> Optional[SortOrder]:
    """
    Convert a sort order string to an actual SortOrder instance.

    Arguments:
        sort_order: Sort order string to convert to a SortOrder
    """
    if not sort_order:
        return None

    try:
        return SortOrder[sort_order.upper()]
    except KeyError:
        raise InvalidArgument(
            argument='sort_order', function=get_sort_order_from_string.__name__
        ) from None


class AggregatesMixin:
    def get_aggregates(
        self,
        resource_type: EntityType,
        *,
        filter_string: Optional[str] = None,
        filter_id: Optional[str] = None,
        sort_criteria: Optional[list] = None,
        data_columns: Optional[list] = None,
        group_column: Optional[str] = None,
        subgroup_column: Optional[str] = None,
        text_columns: Optional[list] = None,
        first_group: Optional[int] = None,
        max_groups: Optional[int] = None,
        mode: Optional[int] = None,
        **kwargs,
    ) -> Any:
        """Request aggregated information on a resource / entity type

        Additional arguments can be set via the kwargs parameter for backward
        compatibility with older versions of python-gvm, but are not validated.

        Arguments:
            resource_type: The entity type to gather data from
            filter_string: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            sort_criteria: List of sort criteria (dicts that can contain
                a field, stat and order)
            data_columns: List of fields to aggregate data from
            group_column: The field to group the entities by
            subgroup_column: The field to further group the entities
                inside groups by
            text_columns: List of simple text columns which no statistics
                are calculated for
            first_group: The index of the first aggregate group to return
            max_groups: The maximum number of aggregate groups to return,
                -1 for all
            mode: Special mode for aggregation

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not resource_type:
            raise RequiredArgument(
                function=self.get_aggregates.__name__, argument='resource_type'
            )

        if not isinstance(resource_type, EntityType):
            raise InvalidArgumentType(
                function=self.get_aggregates.__name__,
                argument='resource_type',
                arg_type=EntityType.__name__,
            )

        cmd = XmlCommand('get_aggregates')

        # Audits and policies are requested as tasks and scan configs with a
        # different usage_type.
        _actual_resource_type = resource_type
        if resource_type.value == EntityType.AUDIT.value:
            _actual_resource_type = EntityType.TASK
            cmd.set_attribute('usage_type', 'audit')
        elif resource_type.value == EntityType.POLICY.value:
            _actual_resource_type = EntityType.SCAN_CONFIG
            cmd.set_attribute('usage_type', 'policy')
        elif resource_type.value == EntityType.SCAN_CONFIG.value:
            cmd.set_attribute('usage_type', 'scan')
        elif resource_type.value == EntityType.TASK.value:
            cmd.set_attribute('usage_type', 'scan')
        cmd.set_attribute('type', _actual_resource_type.value)

        add_filter(cmd, filter_string, filter_id)

        if first_group is not None:
            if not isinstance(first_group, int):
                raise InvalidArgumentType(
                    function=self.get_aggregates.__name__,
                    argument='first_group',
                    arg_type=int.__name__,
                )
            cmd.set_attribute('first_group', str(first_group))

        if max_groups is not None:
            if not isinstance(max_groups, int):
                raise InvalidArgumentType(
                    function=self.get_aggregates.__name__,
                    argument='max_groups',
                    arg_type=int.__name__,
                )
            cmd.set_attribute('max_groups', str(max_groups))

        if sort_criteria is not None:
            if not isinstance(sort_criteria, list):
                raise InvalidArgumentType(
                    function=self.get_aggregates.__name__,
                    argument='sort_criteria',
                    arg_type=list.__name__,
                )
            for sort in sort_criteria:
                # Every criterion must be a dict with optional field, stat
                # and order keys.
                if not isinstance(sort, dict):
                    raise InvalidArgumentType(
                        function=self.get_aggregates.__name__,
                        argument='sort_criteria',
                        arg_type=dict.__name__,
                    )

                sort_elem = cmd.add_element('sort')
                if sort.get('field'):
                    sort_elem.set_attribute('field', sort.get('field'))

                # stat and order accept either the enum or its string name.
                if sort.get('stat'):
                    if isinstance(sort['stat'], AggregateStatistic):
                        sort_elem.set_attribute('stat', sort['stat'].value)
                    else:
                        stat = get_aggregate_statistic_from_string(sort['stat'])
                        sort_elem.set_attribute('stat', stat.value)

                if sort.get('order'):
                    if isinstance(sort['order'], SortOrder):
                        sort_elem.set_attribute('order', sort['order'].value)
                    else:
                        so = get_sort_order_from_string(sort['order'])
                        sort_elem.set_attribute('order', so.value)

        if data_columns is not None:
            if not isinstance(data_columns, list):
                raise InvalidArgumentType(
                    function=self.get_aggregates.__name__,
                    argument='data_columns',
                    arg_type=list.__name__,
                )
            for column in data_columns:
                cmd.add_element('data_column', column)

        if group_column is not None:
            cmd.set_attribute('group_column', group_column)

        if subgroup_column is not None:
            # A subgroup only makes sense inside a group.
            if not group_column:
                raise RequiredArgument(
                    '{} requires a group_column argument'
                    ' if subgroup_column is given'.format(
                        self.get_aggregates.__name__
                    ),
                    function=self.get_aggregates.__name__,
                    argument='subgroup_column',
                )
            cmd.set_attribute('subgroup_column', subgroup_column)

        if text_columns is not None:
            if not isinstance(text_columns, list):
                raise InvalidArgumentType(
                    function=self.get_aggregates.__name__,
                    argument='text_columns',
                    arg_type=list.__name__,
                )
            for column in text_columns:
                cmd.add_element('text_column', column)

        if mode is not None:
            cmd.set_attribute('mode', mode)

        # Add additional keyword args as attributes for backward compatibility.
        cmd.set_attributes(kwargs)

        return self._send_xml_command(cmd)
163
        add_filter(cmd, filter_string, filter_id)
164
165
        if first_group is not None:
166
            if not isinstance(first_group, int):
167
                raise InvalidArgumentType(
168
                    function=self.get_aggregates.__name__,
169
                    argument='first_group',
170
                    arg_type=int.__name__,
171
                )
172
            cmd.set_attribute('first_group', str(first_group))
173
174
        if max_groups is not None:
175
            if not isinstance(max_groups, int):
176
                raise InvalidArgumentType(
177
                    function=self.get_aggregates.__name__,
178
                    argument='max_groups',
179
                    arg_type=int.__name__,
180
                )
181
            cmd.set_attribute('max_groups', str(max_groups))
182
183
        if sort_criteria is not None:
184
            if not isinstance(sort_criteria, list):
185
                raise InvalidArgumentType(
186
                    function=self.get_aggregates.__name__,
187
                    argument='sort_criteria',
188
                    arg_type=list.__name__,
189
                )
190
            for sort in sort_criteria:
191
                if not isinstance(sort, dict):
192
                    raise InvalidArgumentType(
193
                        function=self.get_aggregates.__name__,
194
                        argument='sort_criteria',
195
                    )
196
197
                sort_elem = cmd.add_element('sort')
198
                if sort.get('field'):
199
                    sort_elem.set_attribute('field', sort.get('field'))
200
201
                if sort.get('stat'):
202
                    if isinstance(sort['stat'], AggregateStatistic):
203
                        sort_elem.set_attribute('stat', sort['stat'].value)
204
                    else:
205
                        stat = get_aggregate_statistic_from_string(sort['stat'])
206
                        sort_elem.set_attribute('stat', stat.value)
207
208
                if sort.get('order'):
209
                    if isinstance(sort['order'], SortOrder):
210
                        sort_elem.set_attribute('order', sort['order'].value)
211
                    else:
212
                        so = get_sort_order_from_string(sort['order'])
213
                        sort_elem.set_attribute('order', so.value)
214
215
        if data_columns is not None:
216
            if not isinstance(data_columns, list):
217
                raise InvalidArgumentType(
218
                    function=self.get_aggregates.__name__,
219
                    argument='data_columns',
220
                    arg_type=list.__name__,
221
                )
222
            for column in data_columns:
223
                cmd.add_element('data_column', column)
224
225
        if group_column is not None:
226
            cmd.set_attribute('group_column', group_column)
227
228
        if subgroup_column is not None:
229
            if not group_column:
230
                raise RequiredArgument(
231
                    '{} requires a group_column argument'
232
                    ' if subgroup_column is given'.format(
233
                        self.get_aggregates.__name__
234
                    ),
235
                    function=self.get_aggregates.__name__,
236
                    argument='subgroup_column',
237
                )
238
            cmd.set_attribute('subgroup_column', subgroup_column)
239
240
        if text_columns is not None:
241
            if not isinstance(text_columns, list):
242
                raise InvalidArgumentType(
243
                    function=self.get_aggregates.__name__,
244
                    argument='text_columns',
245
                    arg_type=list.__name__,
246
                )
247
            for column in text_columns:
248
                cmd.add_element('text_column', column)
249
250
        if mode is not None:
251
            cmd.set_attribute('mode', mode)
252
253
        # Add additional keyword args as attributes for backward compatibility.
254
        cmd.set_attributes(kwargs)
255
256
        return self._send_xml_command(cmd)
257