|
1
|
|
|
# -*- coding: utf-8 -*- |
|
2
|
|
|
# Copyright (C) 2021 Greenbone Networks GmbH |
|
3
|
|
|
# |
|
4
|
|
|
# SPDX-License-Identifier: GPL-3.0-or-later |
|
5
|
|
|
# |
|
6
|
|
|
# This program is free software: you can redistribute it and/or modify |
|
7
|
|
|
# it under the terms of the GNU General Public License as published by |
|
8
|
|
|
# the Free Software Foundation, either version 3 of the License, or |
|
9
|
|
|
# (at your option) any later version. |
|
10
|
|
|
# |
|
11
|
|
|
# This program is distributed in the hope that it will be useful, |
|
12
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
13
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
14
|
|
|
# GNU General Public License for more details. |
|
15
|
|
|
# |
|
16
|
|
|
# You should have received a copy of the GNU General Public License |
|
17
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
18
|
|
|
|
|
19
|
|
|
from uuid import UUID |
|
20
|
|
|
from typing import List, Tuple |
|
21
|
|
|
from datetime import date |
|
22
|
|
|
from argparse import ArgumentParser, Namespace, RawTextHelpFormatter |
|
23
|
|
|
from lxml import etree as e |
|
24
|
|
|
from gvm.protocols.gmp import Gmp |
|
25
|
|
|
|
|
26
|
|
|
from gvmtools.helper import generate_uuid, error_and_exit |
|
27
|
|
|
|
|
28
|
|
|
HELP_TEXT = ( |
|
29
|
|
|
'This script creates a consolidated report and imports it to the GSM.' |
|
30
|
|
|
' Usable with gvm-script (gvm-tools)' |
|
31
|
|
|
) |
|
32
|
|
|
|
|
33
|
|
|
|
|
34
|
|
|
def parse_tags(tags: List[str]) -> List[str]: |
|
35
|
|
|
"""Parsing and validating the given tags |
|
36
|
|
|
|
|
37
|
|
|
tags (List): A list containing tags: |
|
38
|
|
|
name, tag-id, name=value |
|
39
|
|
|
|
|
40
|
|
|
Returns a list containing tag="name", tag_id="id" ... |
|
41
|
|
|
""" |
|
42
|
|
|
filter_tags = [] |
|
43
|
|
|
for tag in tags: |
|
44
|
|
|
try: |
|
45
|
|
|
UUID(tag, version=4) |
|
46
|
|
|
filter_tags.append('tag_id="{}"'.format(tag)) |
|
47
|
|
|
except ValueError: |
|
48
|
|
|
filter_tags.append('tag="{}"'.format(tag)) |
|
49
|
|
|
|
|
50
|
|
|
return filter_tags |
|
51
|
|
|
|
|
52
|
|
|
|
|
53
|
|
|
def parse_period(period: List[str]) -> Tuple[date, date]: |
|
54
|
|
|
"""Parsing and validating the given time period |
|
55
|
|
|
|
|
56
|
|
|
period (List): A list with two entries containing |
|
57
|
|
|
dates in the format yyyy/mm/dd |
|
58
|
|
|
|
|
59
|
|
|
Returns two date-objects containing the passed dates |
|
60
|
|
|
""" |
|
61
|
|
|
try: |
|
62
|
|
|
s_year, s_month, s_day = map(int, period[0].split('/')) |
|
63
|
|
|
except ValueError as e: |
|
64
|
|
|
error_and_exit( |
|
65
|
|
|
'Start date [{}] is not a correct date format:\n{}'.format( |
|
66
|
|
|
period[0], e.args[0] |
|
67
|
|
|
) |
|
68
|
|
|
) |
|
69
|
|
|
try: |
|
70
|
|
|
e_year, e_month, e_day = map(int, period[1].split('/')) |
|
71
|
|
|
except ValueError as e: |
|
72
|
|
|
error_and_exit( |
|
73
|
|
|
'End date [{}] is not a correct date format:\n{}'.format( |
|
74
|
|
|
period[1], e.args[0] |
|
75
|
|
|
) |
|
76
|
|
|
) |
|
77
|
|
|
|
|
78
|
|
|
try: |
|
79
|
|
|
period_start = date(s_year, s_month, s_day) |
|
80
|
|
|
except ValueError as e: |
|
81
|
|
|
error_and_exit('Start date: {}'.format(e.args[0])) |
|
82
|
|
|
|
|
83
|
|
|
try: |
|
84
|
|
|
period_end = date(e_year, e_month, e_day) |
|
85
|
|
|
except ValueError as e: |
|
86
|
|
|
error_and_exit('End date: {}'.format(e.args[0])) |
|
87
|
|
|
|
|
88
|
|
|
if period_end < period_start: |
|
89
|
|
|
error_and_exit('The start date seems to after the end date.') |
|
90
|
|
|
|
|
91
|
|
|
return period_start, period_end |
|
92
|
|
|
|
|
93
|
|
|
|
|
94
|
|
|
def parse_args(args: Namespace) -> Namespace: # pylint: disable=unused-argument |
|
95
|
|
|
""" Parsing args ... """ |
|
96
|
|
|
|
|
97
|
|
|
parser = ArgumentParser( |
|
98
|
|
|
prefix_chars='+', |
|
99
|
|
|
add_help=False, |
|
100
|
|
|
formatter_class=RawTextHelpFormatter, |
|
101
|
|
|
description=HELP_TEXT, |
|
102
|
|
|
) |
|
103
|
|
|
|
|
104
|
|
|
parser.add_argument( |
|
105
|
|
|
'+h', |
|
106
|
|
|
'++help', |
|
107
|
|
|
action='help', |
|
108
|
|
|
help='Show this help message and exit.', |
|
109
|
|
|
) |
|
110
|
|
|
|
|
111
|
|
|
parser.add_argument( |
|
112
|
|
|
'+p', |
|
113
|
|
|
'++period', |
|
114
|
|
|
nargs=2, |
|
115
|
|
|
type=str, |
|
116
|
|
|
required=True, |
|
117
|
|
|
dest='period', |
|
118
|
|
|
help=( |
|
119
|
|
|
'Choose a time period that is filtering the tasks.\n' |
|
120
|
|
|
'Use the date format YYYY/MM/DD.' |
|
121
|
|
|
), |
|
122
|
|
|
) |
|
123
|
|
|
|
|
124
|
|
|
parser.add_argument( |
|
125
|
|
|
'+t', |
|
126
|
|
|
'++tags', |
|
127
|
|
|
nargs='+', |
|
128
|
|
|
type=str, |
|
129
|
|
|
dest='tags', |
|
130
|
|
|
help=( |
|
131
|
|
|
'Filter the tasks by given tag(s).\n' |
|
132
|
|
|
'If you pass more than on tag, they will be concatenated with ' |
|
133
|
|
|
or '\n' |
|
134
|
|
|
'You can pass tag names, tag ids or tag name=value to this argument' |
|
135
|
|
|
), |
|
136
|
|
|
) |
|
137
|
|
|
|
|
138
|
|
|
parser.add_argument( |
|
139
|
|
|
'+f', |
|
140
|
|
|
'++filter', |
|
141
|
|
|
nargs='+', |
|
142
|
|
|
type=str, |
|
143
|
|
|
dest='filter', |
|
144
|
|
|
help='Filter the results by given filter(s).', |
|
145
|
|
|
) |
|
146
|
|
|
|
|
147
|
|
|
script_args, _ = parser.parse_known_args() |
|
148
|
|
|
return script_args |
|
149
|
|
|
|
|
150
|
|
|
|
|
151
|
|
|
def generate_task_filter( |
|
152
|
|
|
period_start: date, period_end: date, tags: List[str] |
|
153
|
|
|
) -> str: |
|
154
|
|
|
"""Generate the tasks filter |
|
155
|
|
|
|
|
156
|
|
|
period_start: the start date |
|
157
|
|
|
period_end: the end date |
|
158
|
|
|
tags: list of tags for the filter |
|
159
|
|
|
|
|
160
|
|
|
Returns an task filter string |
|
161
|
|
|
""" |
|
162
|
|
|
task_filter = 'rows=-1 ' |
|
163
|
|
|
period_filter = 'created>{0} and created<{1}'.format( |
|
164
|
|
|
period_start.isoformat(), period_end.isoformat() |
|
165
|
|
|
) |
|
166
|
|
|
filter_parts = [] |
|
167
|
|
|
if tags: |
|
168
|
|
|
for tag in tags: |
|
169
|
|
|
filter_parts.append('{} and {}'.format(period_filter, tag)) |
|
170
|
|
|
|
|
171
|
|
|
tags_filter = ' or '.join(filter_parts) |
|
172
|
|
|
task_filter += tags_filter |
|
173
|
|
|
else: |
|
174
|
|
|
task_filter += period_filter |
|
175
|
|
|
|
|
176
|
|
|
return task_filter |
|
177
|
|
|
|
|
178
|
|
|
|
|
179
|
|
|
def get_last_reports_from_tasks(gmp: Gmp, task_filter: str) -> List[str]: |
|
180
|
|
|
"""Get the last reports from the tasks in the given time period |
|
181
|
|
|
|
|
182
|
|
|
gmp: the GMP object |
|
183
|
|
|
task_filter: task filter string |
|
184
|
|
|
|
|
185
|
|
|
""" |
|
186
|
|
|
|
|
187
|
|
|
print('Filtering the task with the filter term [{}]'.format(task_filter)) |
|
188
|
|
|
|
|
189
|
|
|
tasks_xml = gmp.get_tasks(filter=task_filter) |
|
190
|
|
|
reports = [] |
|
191
|
|
|
for report in tasks_xml.xpath('task/last_report/report/@id'): |
|
192
|
|
|
reports.append(str(report)) |
|
193
|
|
|
|
|
194
|
|
|
# remove duplicates ... just in case |
|
195
|
|
|
reports = list(dict.fromkeys(reports)) |
|
196
|
|
|
|
|
197
|
|
|
return reports |
|
198
|
|
|
|
|
199
|
|
|
|
|
200
|
|
|
def combine_reports( |
|
201
|
|
|
gmp: Gmp, reports: List[str], filter_term: str |
|
202
|
|
|
) -> e.Element: |
|
203
|
|
|
"""Combining the filtered ports, results and hosts of the given |
|
204
|
|
|
report ids into one new report. |
|
205
|
|
|
|
|
206
|
|
|
gmp: the GMP object |
|
207
|
|
|
reports (List): List of report_ids |
|
208
|
|
|
filter_term (str): the result filter string |
|
209
|
|
|
""" |
|
210
|
|
|
|
|
211
|
|
|
new_uuid = generate_uuid() |
|
212
|
|
|
combined_report = e.Element( |
|
213
|
|
|
'report', |
|
214
|
|
|
{ |
|
215
|
|
|
'id': new_uuid, |
|
216
|
|
|
'format_id': 'd5da9f67-8551-4e51-807b-b6a873d70e34', |
|
217
|
|
|
'extension': 'xml', |
|
218
|
|
|
'content_type': 'text/xml', |
|
219
|
|
|
}, |
|
220
|
|
|
) |
|
221
|
|
|
report_elem = e.Element('report', {'id': new_uuid}) |
|
222
|
|
|
|
|
223
|
|
|
ports_elem = e.Element('ports', {'start': '1', 'max': '-1'}) |
|
224
|
|
|
results_elem = e.Element('results', {'start': '1', 'max': '-1'}) |
|
225
|
|
|
combined_report.append(report_elem) |
|
226
|
|
|
report_elem.append(ports_elem) |
|
227
|
|
|
report_elem.append(results_elem) |
|
228
|
|
|
|
|
229
|
|
|
for report in reports: |
|
230
|
|
|
current_report = gmp.get_report( |
|
231
|
|
|
report, filter=filter_term, details=True |
|
232
|
|
|
)[0] |
|
233
|
|
|
for port in current_report.xpath('report/ports/port'): |
|
234
|
|
|
ports_elem.append(port) |
|
235
|
|
|
for result in current_report.xpath('report/results/result'): |
|
236
|
|
|
results_elem.append(result) |
|
237
|
|
|
for host in current_report.xpath('host'): |
|
238
|
|
|
report_elem.append(host) |
|
239
|
|
|
|
|
240
|
|
|
return combined_report |
|
241
|
|
|
|
|
242
|
|
|
|
|
243
|
|
|
def send_report( |
|
244
|
|
|
gmp: Gmp, combined_report: e.Element, period_start: date, period_end: date |
|
245
|
|
|
) -> str: |
|
246
|
|
|
"""Creating a container task and sending the combined report to the GSM |
|
247
|
|
|
|
|
248
|
|
|
gmp: the GMP object |
|
249
|
|
|
combined_report: the combined report xml object |
|
250
|
|
|
period_start: the start date |
|
251
|
|
|
period_end: the end date |
|
252
|
|
|
""" |
|
253
|
|
|
|
|
254
|
|
|
task_name = 'Consolidated Report [{} - {}]'.format(period_start, period_end) |
|
255
|
|
|
|
|
256
|
|
|
res = gmp.create_container_task( |
|
257
|
|
|
name=task_name, comment='Created with gvm-tools.' |
|
258
|
|
|
) |
|
259
|
|
|
|
|
260
|
|
|
task_id = res.xpath('//@id')[0] |
|
261
|
|
|
|
|
262
|
|
|
combined_report = e.tostring(combined_report) |
|
263
|
|
|
|
|
264
|
|
|
res = gmp.import_report(combined_report, task_id=task_id) |
|
265
|
|
|
|
|
266
|
|
|
return res.xpath('//@id')[0] |
|
267
|
|
|
|
|
268
|
|
|
|
|
269
|
|
|
def main(gmp: Gmp, args: Namespace) -> None: |
|
270
|
|
|
# pylint: disable=undefined-variable |
|
271
|
|
|
|
|
272
|
|
|
parsed_args = parse_args(args=args) |
|
273
|
|
|
|
|
274
|
|
|
period_start, period_end = parse_period(period=parsed_args.period) |
|
275
|
|
|
|
|
276
|
|
|
print( |
|
277
|
|
|
'Combining reports from tasks within the time period [{}, {}]'.format( |
|
278
|
|
|
period_start, period_end |
|
279
|
|
|
) |
|
280
|
|
|
) |
|
281
|
|
|
|
|
282
|
|
|
filter_tags = None |
|
283
|
|
|
if parsed_args.tags: |
|
284
|
|
|
filter_tags = parse_tags(tags=parsed_args.tags) |
|
285
|
|
|
|
|
286
|
|
|
# Generate Task Filter |
|
287
|
|
|
task_filter = generate_task_filter( |
|
288
|
|
|
period_start=period_start, |
|
289
|
|
|
period_end=period_end, |
|
290
|
|
|
tags=filter_tags, |
|
291
|
|
|
) |
|
292
|
|
|
|
|
293
|
|
|
# Find reports |
|
294
|
|
|
reports = get_last_reports_from_tasks(gmp=gmp, task_filter=task_filter) |
|
295
|
|
|
|
|
296
|
|
|
print("Combining {} found reports.".format(len(reports))) |
|
297
|
|
|
|
|
298
|
|
|
filter_term = '' |
|
299
|
|
|
if parsed_args.filter: |
|
300
|
|
|
filter_term = ' '.join(parsed_args.filter) |
|
301
|
|
|
print( |
|
302
|
|
|
'Filtering the results by the following filter term [{}]'.format( |
|
303
|
|
|
filter_term |
|
304
|
|
|
) |
|
305
|
|
|
) |
|
306
|
|
|
else: |
|
307
|
|
|
print('No result filter given.') |
|
308
|
|
|
|
|
309
|
|
|
# Combine the reports |
|
310
|
|
|
combined_report = combine_reports( |
|
311
|
|
|
gmp=gmp, reports=reports, filter_term=filter_term |
|
312
|
|
|
) |
|
313
|
|
|
|
|
314
|
|
|
# Import the generated report to GSM |
|
315
|
|
|
report = send_report( |
|
316
|
|
|
gmp=gmp, |
|
317
|
|
|
combined_report=combined_report, |
|
318
|
|
|
period_start=period_start, |
|
319
|
|
|
period_end=period_end, |
|
320
|
|
|
) |
|
321
|
|
|
|
|
322
|
|
|
print("Successfully imported new consolidated report [{}]".format(report)) |
|
323
|
|
|
|
|
324
|
|
|
|
|
325
|
|
|
if __name__ == '__gmp__': |
|
326
|
|
|
main(gmp, args) |
|
|
|
|
|
|
327
|
|
|
|