1
|
|
|
# -*- coding: utf-8 -*- |
2
|
|
|
# Copyright (C) 2021 Greenbone Networks GmbH |
3
|
|
|
# |
4
|
|
|
# SPDX-License-Identifier: GPL-3.0-or-later |
5
|
|
|
# |
6
|
|
|
# This program is free software: you can redistribute it and/or modify |
7
|
|
|
# it under the terms of the GNU General Public License as published by |
8
|
|
|
# the Free Software Foundation, either version 3 of the License, or |
9
|
|
|
# (at your option) any later version. |
10
|
|
|
# |
11
|
|
|
# This program is distributed in the hope that it will be useful, |
12
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
13
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
14
|
|
|
# GNU General Public License for more details. |
15
|
|
|
# |
16
|
|
|
# You should have received a copy of the GNU General Public License |
17
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
18
|
|
|
|
19
|
|
|
from uuid import UUID |
20
|
|
|
from typing import List, Tuple |
21
|
|
|
from datetime import date |
22
|
|
|
from argparse import ArgumentParser, Namespace, RawTextHelpFormatter |
23
|
|
|
from lxml import etree as e |
24
|
|
|
from gvm.protocols.gmp import Gmp |
25
|
|
|
|
26
|
|
|
from gvmtools.helper import generate_uuid, error_and_exit |
27
|
|
|
|
28
|
|
|
# Description text handed to the script's ArgumentParser.
HELP_TEXT = (
    "This script creates a consolidated report and imports it to the GSM. "
    "Usable with gvm-script (gvm-tools)"
)
32
|
|
|
|
33
|
|
|
|
34
|
|
|
def parse_tags(tags: List[str]) -> List[str]:
    """Turn the raw tag arguments into filter terms

    tags (List): Tag arguments as passed by the user; each entry is a
                 tag name, a tag id or a name=value pair

    Returns a list of filter terms: tag_id="<entry>" for every entry the
    UUID constructor accepts, tag="<entry>" for all other entries
    """

    def _is_uuid(candidate: str) -> bool:
        # UUID() raises ValueError for strings it cannot parse
        try:
            UUID(candidate, version=4)
        except ValueError:
            return False
        return True

    return [
        ('tag_id="{}"' if _is_uuid(entry) else 'tag="{}"').format(entry)
        for entry in tags
    ]
51
|
|
|
|
52
|
|
|
|
53
|
|
|
def _split_date(value: str, label: str) -> Tuple[int, int, int]:
    """Split a yyyy/mm/dd string into integer year, month and day.

    value: the raw date string
    label: 'Start' or 'End', used as prefix of the error message
    """
    try:
        year, month, day = map(int, value.split('/'))
    except ValueError as err:
        # Raised for non-numeric parts as well as for a wrong number of parts
        error_and_exit(
            '{} date [{}] is not a correct date format:\n{}'.format(
                label, value, err.args[0]
            )
        )
    return year, month, day


def _build_date(parts: Tuple[int, int, int], label: str) -> date:
    """Create a date from (year, month, day), reporting invalid values.

    parts: year, month and day as integers
    label: 'Start' or 'End', used as prefix of the error message
    """
    try:
        return date(*parts)
    except ValueError as err:
        # e.g. month 13 or day 31 in a 30-day month
        error_and_exit('{} date: {}'.format(label, err.args[0]))


def parse_period(period: List[str]) -> Tuple[date, date]:
    """Parsing and validating the given time period

    period (List): A list with two entries containing
                   dates in the format yyyy/mm/dd

    Returns two date-objects containing the passed dates

    Every validation problem is reported through error_and_exit
    (presumably terminating the script -- it is defined outside this file).
    """
    # Check the textual format of both dates first, then their calendar
    # validity, so problems are reported in the same order as before.
    start_parts = _split_date(period[0], 'Start')
    end_parts = _split_date(period[1], 'End')

    period_start = _build_date(start_parts, 'Start')
    period_end = _build_date(end_parts, 'End')

    if period_end < period_start:
        error_and_exit('The start date seems to be after the end date.')

    return period_start, period_end
92
|
|
|
|
93
|
|
|
|
94
|
|
|
def parse_args(args: Namespace) -> Namespace:  # pylint: disable=unused-argument
    """Parse the script specific command line arguments

    args: unused; the parser is run without an explicit argument list,
          so argparse falls back to the process command line

    All options use '+' as prefix character instead of '-'. Unknown
    arguments are ignored (parse_known_args).

    Returns the Namespace with the attributes period, tags and filter
    """

    parser = ArgumentParser(
        prefix_chars='+',
        add_help=False,
        formatter_class=RawTextHelpFormatter,
        description=HELP_TEXT,
    )

    parser.add_argument(
        '+h',
        '++help',
        action='help',
        help='Show this help message and exit.',
    )

    parser.add_argument(
        '+p',
        '++period',
        nargs=2,
        type=str,
        required=True,
        dest='period',
        help=(
            'Choose a time period that is filtering the tasks.\n'
            'Use the date format YYYY/MM/DD.'
        ),
    )

    parser.add_argument(
        '+t',
        '++tags',
        nargs='+',
        type=str,
        dest='tags',
        # The literals must be adjacent: a stray `or` operator between them
        # made the expression evaluate to its first operand only, which cut
        # the help text off after "concatenated with ".
        help=(
            'Filter the tasks by given tag(s).\n'
            'If you pass more than one tag, they will be concatenated with '
            '"or".\n'
            'You can pass tag names, tag ids or tag name=value to this argument'
        ),
    )

    parser.add_argument(
        '+f',
        '++filter',
        nargs='+',
        type=str,
        dest='filter',
        help='Filter the results by given filter(s).',
    )

    script_args, _ = parser.parse_known_args()
    return script_args
149
|
|
|
|
150
|
|
|
|
151
|
|
|
def generate_task_filter(
    period_start: date, period_end: date, tags: List[str]
) -> str:
    """Build the filter string that selects the tasks

    period_start: tasks must be created after this date
    period_end: tasks must be created before this date
    tags: tag filter terms; may be empty or None

    Returns the task filter string
    """
    created_between = 'created>{0} and created<{1}'.format(
        period_start.isoformat(), period_end.isoformat()
    )

    if not tags:
        return 'rows=-1 ' + created_between

    # Each tag term is paired with its own copy of the period restriction
    # and the resulting alternatives are joined with "or".
    alternatives = ['{} and {}'.format(created_between, tag) for tag in tags]
    return 'rows=-1 ' + ' or '.join(alternatives)
177
|
|
|
|
178
|
|
|
|
179
|
|
|
def get_last_reports_from_tasks(gmp: Gmp, task_filter: str) -> List[str]:
    """Collect the ids of the last reports of all tasks matching the filter

    gmp: the GMP object
    task_filter: task filter string

    Returns the report ids as strings, each id only once, in the order
    of their first occurrence in the response
    """

    print('Filtering the task with the filter term [{}]'.format(task_filter))

    response = gmp.get_tasks(filter=task_filter)
    report_ids = map(str, response.xpath('task/last_report/report/@id'))

    # dict keys are unique and keep their insertion order, which drops
    # duplicate ids without reordering the remaining ones
    return list(dict.fromkeys(report_ids))
198
|
|
|
|
199
|
|
|
|
200
|
|
|
def combine_reports(
    gmp: Gmp, reports: List[str], filter_term: str
) -> e.Element:
    """Combining the filtered ports, results and hosts of the given
    report ids into one new report.

    gmp: the GMP object
    reports (List): List of report_ids
    filter_term (str): the result filter string

    Returns the outer report element; it wraps an inner report element
    (same id) that holds the collected ports, results and hosts
    """

    new_uuid = generate_uuid()
    combined_report = e.Element(
        'report',
        {
            'id': new_uuid,
            'format_id': 'd5da9f67-8551-4e51-807b-b6a873d70e34',
            'extension': 'xml',
            'content_type': 'text/xml',
        },
    )
    report_elem = e.Element('report', {'id': new_uuid})

    ports_elem = e.Element('ports', {'start': '1', 'max': '-1'})
    results_elem = e.Element('results', {'start': '1', 'max': '-1'})
    combined_report.append(report_elem)
    report_elem.append(ports_elem)
    report_elem.append(results_elem)

    # XPath (relative to the outer report element of a response) -> element
    # of the combined report that receives the matches.
    # NOTE(review): hosts were previously selected with 'host', one level
    # above where ports and results are read; they are now read from the
    # inner report as well -- confirm against the GMP get_reports response.
    collect = (
        ('report/ports/port', ports_elem),
        ('report/results/result', results_elem),
        ('report/host', report_elem),
    )

    for report in reports:
        # The first child of the response is used as the report element
        current_report = gmp.get_report(
            report, filter=filter_term, details=True
        )[0]
        for path, target in collect:
            for node in current_report.xpath(path):
                target.append(node)

    return combined_report
241
|
|
|
|
242
|
|
|
|
243
|
|
|
def send_report(
    gmp: Gmp, combined_report: e.Element, period_start: date, period_end: date
) -> str:
    """Create a container task and import the combined report into it

    gmp: the GMP object
    combined_report: the combined report xml object
    period_start: the start date, used for the task name
    period_end: the end date, used for the task name

    Returns the first id attribute found in the import response
    """

    container = gmp.create_container_task(
        name='Consolidated Report [{} - {}]'.format(period_start, period_end),
        comment='Created with gvm-tools.',
    )
    # First id attribute found anywhere in the create-task response
    task_id = container.xpath('//@id')[0]

    # The report is sent in its serialized form
    report_xml = e.tostring(combined_report)
    import_response = gmp.import_report(report_xml, task_id=task_id)

    return import_response.xpath('//@id')[0]
267
|
|
|
|
268
|
|
|
|
269
|
|
|
def main(gmp: Gmp, args: Namespace) -> None:
    """Run the script: select reports, combine them and import the result

    gmp: the GMP object
    args: the arguments namespace, handed on to parse_args
    """
    # pylint: disable=undefined-variable

    parsed_args = parse_args(args=args)

    period_start, period_end = parse_period(period=parsed_args.period)
    period_message = (
        'Combining reports from tasks within the time period [{}, {}]'
    )
    print(period_message.format(period_start, period_end))

    # Tags are optional; without them only the period restricts the tasks
    filter_tags = (
        parse_tags(tags=parsed_args.tags) if parsed_args.tags else None
    )

    # Generate Task Filter
    task_filter = generate_task_filter(
        period_start=period_start,
        period_end=period_end,
        tags=filter_tags,
    )

    # Find reports
    reports = get_last_reports_from_tasks(gmp=gmp, task_filter=task_filter)
    print("Combining {} found reports.".format(len(reports)))

    # The result filter is optional as well; several terms are joined
    # with blanks
    if parsed_args.filter:
        filter_term = ' '.join(parsed_args.filter)
        filter_message = (
            'Filtering the results by the following filter term [{}]'
        )
        print(filter_message.format(filter_term))
    else:
        filter_term = ''
        print('No result filter given.')

    # Combine the reports
    combined_report = combine_reports(
        gmp=gmp, reports=reports, filter_term=filter_term
    )

    # Import the generated report to GSM
    report = send_report(
        gmp=gmp,
        combined_report=combined_report,
        period_start=period_start,
        period_end=period_end,
    )

    print("Successfully imported new consolidated report [{}]".format(report))
323
|
|
|
|
324
|
|
|
|
325
|
|
|
# Script entry point. The module is expected to be executed with __name__
# set to '__gmp__' and with `gmp` and `args` available as globals --
# presumably provided by gvm-script (see HELP_TEXT); TODO confirm.
if __name__ == '__gmp__':
    main(gmp, args)
|
|
|
|
327
|
|
|
|