1
|
|
|
# -*- coding: utf-8 -*- |
2
|
|
|
# Copyright (C) 2021 Greenbone Networks GmbH |
3
|
|
|
# |
4
|
|
|
# SPDX-License-Identifier: GPL-3.0-or-later |
5
|
|
|
# |
6
|
|
|
# This program is free software: you can redistribute it and/or modify |
7
|
|
|
# it under the terms of the GNU General Public License as published by |
8
|
|
|
# the Free Software Foundation, either version 3 of the License, or |
9
|
|
|
# (at your option) any later version. |
10
|
|
|
# |
11
|
|
|
# This program is distributed in the hope that it will be useful, |
12
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
13
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
14
|
|
|
# GNU General Public License for more details. |
15
|
|
|
# |
16
|
|
|
# You should have received a copy of the GNU General Public License |
17
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
18
|
|
|
|
19
|
|
|
from enum import Enum |
20
|
|
|
from typing import Any, Optional |
21
|
|
|
|
22
|
|
|
from gvm.protocols.gmpv208.entities.entities import ( |
23
|
|
|
EntityType, |
24
|
|
|
) # if I use latest, I get circular import :/ |
25
|
|
|
from gvm.errors import RequiredArgument, InvalidArgument, InvalidArgumentType |
26
|
|
|
from gvm.utils import add_filter |
27
|
|
|
from gvm.xml import XmlCommand |
28
|
|
|
|
29
|
|
|
|
30
|
|
|
class AggregateStatistic(Enum): |
31
|
|
|
"""Enum for aggregate statistic types""" |
32
|
|
|
|
33
|
|
|
COUNT = "count" # Number of items |
34
|
|
|
C_COUNT = "c_count" # Cumulative number of items |
35
|
|
|
C_SUM = "c_sum" # Cumulative sum of values |
36
|
|
|
MAX = "max" # Maximum value |
37
|
|
|
MEAN = "mean" # Arithmetic mean of values |
38
|
|
|
MIN = "min" # Minimum value |
39
|
|
|
SUM = "sum" # Sum of values |
40
|
|
|
TEXT = "text" # Text column value |
41
|
|
|
VALUE = "value" # Group or subgroup column value |
42
|
|
|
|
43
|
|
|
|
44
|
|
|
def get_aggregate_statistic_from_string( |
45
|
|
|
aggregate_statistic: Optional[str], |
46
|
|
|
) -> Optional[AggregateStatistic]: |
47
|
|
|
""" |
48
|
|
|
Convert a aggregate statistic string to an actual AggregateStatistic |
49
|
|
|
instance. |
50
|
|
|
|
51
|
|
|
Arguments: |
52
|
|
|
aggregate_statistic: Aggregate statistic string to convert to a |
53
|
|
|
AggregateStatistic |
54
|
|
|
""" |
55
|
|
|
if not aggregate_statistic: |
56
|
|
|
return None |
57
|
|
|
|
58
|
|
|
try: |
59
|
|
|
return AggregateStatistic[aggregate_statistic.upper()] |
60
|
|
|
except KeyError: |
61
|
|
|
raise InvalidArgument( |
62
|
|
|
argument='aggregate_statistic', |
63
|
|
|
function=get_aggregate_statistic_from_string.__name__, |
64
|
|
|
) from None |
65
|
|
|
|
66
|
|
|
|
67
|
|
|
class SortOrder(Enum): |
68
|
|
|
"""Enum for sort order""" |
69
|
|
|
|
70
|
|
|
ASCENDING = "ascending" |
71
|
|
|
DESCENDING = "descending" |
72
|
|
|
|
73
|
|
|
|
74
|
|
|
def get_sort_order_from_string( |
75
|
|
|
sort_order: Optional[str], |
76
|
|
|
) -> Optional[SortOrder]: |
77
|
|
|
""" |
78
|
|
|
Convert a sort order string to an actual SortOrder instance. |
79
|
|
|
|
80
|
|
|
Arguments: |
81
|
|
|
sort_order: Sort order string to convert to a SortOrder |
82
|
|
|
""" |
83
|
|
|
if not sort_order: |
84
|
|
|
return None |
85
|
|
|
|
86
|
|
|
try: |
87
|
|
|
return SortOrder[sort_order.upper()] |
88
|
|
|
except KeyError: |
89
|
|
|
raise InvalidArgument( |
90
|
|
|
argument='sort_order', function=get_sort_order_from_string.__name__ |
91
|
|
|
) from None |
92
|
|
|
|
93
|
|
|
|
94
|
|
|
class AggregatesMixin: |
95
|
|
|
def get_aggregates( |
96
|
|
|
self, |
97
|
|
|
resource_type: EntityType, |
98
|
|
|
*, |
99
|
|
|
filter_string: Optional[str] = None, |
100
|
|
|
filter_id: Optional[str] = None, |
101
|
|
|
sort_criteria: Optional[list] = None, |
102
|
|
|
data_columns: Optional[list] = None, |
103
|
|
|
group_column: Optional[str] = None, |
104
|
|
|
subgroup_column: Optional[str] = None, |
105
|
|
|
text_columns: Optional[list] = None, |
106
|
|
|
first_group: Optional[int] = None, |
107
|
|
|
max_groups: Optional[int] = None, |
108
|
|
|
mode: Optional[int] = None, |
109
|
|
|
**kwargs, |
110
|
|
|
) -> Any: |
111
|
|
|
"""Request aggregated information on a resource / entity type |
112
|
|
|
|
113
|
|
|
Additional arguments can be set via the kwargs parameter for backward |
114
|
|
|
compatibility with older versions of python-gvm, but are not validated. |
115
|
|
|
|
116
|
|
|
Arguments: |
117
|
|
|
resource_type: The entity type to gather data from |
118
|
|
|
filter_string: Filter term to use for the query |
119
|
|
|
filter_id: UUID of an existing filter to use for the query |
120
|
|
|
sort_criteria: List of sort criteria (dicts that can contain |
121
|
|
|
a field, stat and order) |
122
|
|
|
data_columns: List of fields to aggregate data from |
123
|
|
|
group_column: The field to group the entities by |
124
|
|
|
subgroup_column: The field to further group the entities |
125
|
|
|
inside groups by |
126
|
|
|
text_columns: List of simple text columns which no statistics |
127
|
|
|
are calculated for |
128
|
|
|
first_group: The index of the first aggregate group to return |
129
|
|
|
max_groups: The maximum number of aggregate groups to return, |
130
|
|
|
-1 for all |
131
|
|
|
mode: Special mode for aggregation |
132
|
|
|
|
133
|
|
|
Returns: |
134
|
|
|
The response. See :py:meth:`send_command` for details. |
135
|
|
|
""" |
136
|
|
|
if not resource_type: |
137
|
|
|
raise RequiredArgument( |
138
|
|
|
function=self.get_aggregates.__name__, argument='resource_type' |
139
|
|
|
) |
140
|
|
|
|
141
|
|
|
if not isinstance(resource_type, EntityType): |
142
|
|
|
raise InvalidArgumentType( |
143
|
|
|
function=self.get_aggregates.__name__, |
144
|
|
|
argument='resource_type', |
145
|
|
|
arg_type=EntityType.__name__, |
146
|
|
|
) |
147
|
|
|
|
148
|
|
|
cmd = XmlCommand('get_aggregates') |
149
|
|
|
|
150
|
|
|
_actual_resource_type = resource_type |
151
|
|
|
if resource_type.value == EntityType.AUDIT.value: |
152
|
|
|
_actual_resource_type = EntityType.TASK |
153
|
|
|
cmd.set_attribute('usage_type', 'audit') |
154
|
|
|
elif resource_type.value == EntityType.POLICY.value: |
155
|
|
|
_actual_resource_type = EntityType.SCAN_CONFIG |
156
|
|
|
cmd.set_attribute('usage_type', 'policy') |
157
|
|
|
elif resource_type.value == EntityType.SCAN_CONFIG.value: |
158
|
|
|
cmd.set_attribute('usage_type', 'scan') |
159
|
|
|
elif resource_type.value == EntityType.TASK.value: |
160
|
|
|
cmd.set_attribute('usage_type', 'scan') |
161
|
|
|
cmd.set_attribute('type', _actual_resource_type.value) |
162
|
|
|
|
163
|
|
|
add_filter(cmd, filter_string, filter_id) |
164
|
|
|
|
165
|
|
|
if first_group is not None: |
166
|
|
|
if not isinstance(first_group, int): |
167
|
|
|
raise InvalidArgumentType( |
168
|
|
|
function=self.get_aggregates.__name__, |
169
|
|
|
argument='first_group', |
170
|
|
|
arg_type=int.__name__, |
171
|
|
|
) |
172
|
|
|
cmd.set_attribute('first_group', str(first_group)) |
173
|
|
|
|
174
|
|
|
if max_groups is not None: |
175
|
|
|
if not isinstance(max_groups, int): |
176
|
|
|
raise InvalidArgumentType( |
177
|
|
|
function=self.get_aggregates.__name__, |
178
|
|
|
argument='max_groups', |
179
|
|
|
arg_type=int.__name__, |
180
|
|
|
) |
181
|
|
|
cmd.set_attribute('max_groups', str(max_groups)) |
182
|
|
|
|
183
|
|
|
if sort_criteria is not None: |
184
|
|
|
if not isinstance(sort_criteria, list): |
185
|
|
|
raise InvalidArgumentType( |
186
|
|
|
function=self.get_aggregates.__name__, |
187
|
|
|
argument='sort_criteria', |
188
|
|
|
arg_type=list.__name__, |
189
|
|
|
) |
190
|
|
|
for sort in sort_criteria: |
191
|
|
|
if not isinstance(sort, dict): |
192
|
|
|
raise InvalidArgumentType( |
193
|
|
|
function=self.get_aggregates.__name__, |
194
|
|
|
argument='sort_criteria', |
195
|
|
|
) |
196
|
|
|
|
197
|
|
|
sort_elem = cmd.add_element('sort') |
198
|
|
|
if sort.get('field'): |
199
|
|
|
sort_elem.set_attribute('field', sort.get('field')) |
200
|
|
|
|
201
|
|
|
if sort.get('stat'): |
202
|
|
|
if isinstance(sort['stat'], AggregateStatistic): |
203
|
|
|
sort_elem.set_attribute('stat', sort['stat'].value) |
204
|
|
|
else: |
205
|
|
|
stat = get_aggregate_statistic_from_string(sort['stat']) |
206
|
|
|
sort_elem.set_attribute('stat', stat.value) |
207
|
|
|
|
208
|
|
|
if sort.get('order'): |
209
|
|
|
if isinstance(sort['order'], SortOrder): |
210
|
|
|
sort_elem.set_attribute('order', sort['order'].value) |
211
|
|
|
else: |
212
|
|
|
so = get_sort_order_from_string(sort['order']) |
213
|
|
|
sort_elem.set_attribute('order', so.value) |
214
|
|
|
|
215
|
|
|
if data_columns is not None: |
216
|
|
|
if not isinstance(data_columns, list): |
217
|
|
|
raise InvalidArgumentType( |
218
|
|
|
function=self.get_aggregates.__name__, |
219
|
|
|
argument='data_columns', |
220
|
|
|
arg_type=list.__name__, |
221
|
|
|
) |
222
|
|
|
for column in data_columns: |
223
|
|
|
cmd.add_element('data_column', column) |
224
|
|
|
|
225
|
|
|
if group_column is not None: |
226
|
|
|
cmd.set_attribute('group_column', group_column) |
227
|
|
|
|
228
|
|
|
if subgroup_column is not None: |
229
|
|
|
if not group_column: |
230
|
|
|
raise RequiredArgument( |
231
|
|
|
'{} requires a group_column argument' |
232
|
|
|
' if subgroup_column is given'.format( |
233
|
|
|
self.get_aggregates.__name__ |
234
|
|
|
), |
235
|
|
|
function=self.get_aggregates.__name__, |
236
|
|
|
argument='subgroup_column', |
237
|
|
|
) |
238
|
|
|
cmd.set_attribute('subgroup_column', subgroup_column) |
239
|
|
|
|
240
|
|
|
if text_columns is not None: |
241
|
|
|
if not isinstance(text_columns, list): |
242
|
|
|
raise InvalidArgumentType( |
243
|
|
|
function=self.get_aggregates.__name__, |
244
|
|
|
argument='text_columns', |
245
|
|
|
arg_type=list.__name__, |
246
|
|
|
) |
247
|
|
|
for column in text_columns: |
248
|
|
|
cmd.add_element('text_column', column) |
249
|
|
|
|
250
|
|
|
if mode is not None: |
251
|
|
|
cmd.set_attribute('mode', mode) |
252
|
|
|
|
253
|
|
|
# Add additional keyword args as attributes for backward compatibility. |
254
|
|
|
cmd.set_attributes(kwargs) |
255
|
|
|
|
256
|
|
|
return self._send_xml_command(cmd) |
257
|
|
|
|