1 | # -*- coding: utf-8 -*- |
||
2 | # Copyright (C) 2021 Greenbone Networks GmbH |
||
3 | # |
||
4 | # SPDX-License-Identifier: GPL-3.0-or-later |
||
5 | # |
||
6 | # This program is free software: you can redistribute it and/or modify |
||
7 | # it under the terms of the GNU General Public License as published by |
||
8 | # the Free Software Foundation, either version 3 of the License, or |
||
9 | # (at your option) any later version. |
||
10 | # |
||
11 | # This program is distributed in the hope that it will be useful, |
||
12 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
||
13 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||
14 | # GNU General Public License for more details. |
||
15 | # |
||
16 | # You should have received a copy of the GNU General Public License |
||
17 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
||
18 | |||
19 | from uuid import UUID |
||
20 | from typing import List, Tuple |
||
21 | from datetime import date |
||
22 | from argparse import ArgumentParser, Namespace, RawTextHelpFormatter |
||
23 | from lxml import etree as e |
||
24 | from gvm.protocols.gmp import Gmp |
||
25 | from gvm.errors import GvmError |
||
26 | |||
27 | from gvmtools.helper import generate_uuid, error_and_exit |
||
28 | |||
29 | HELP_TEXT = ( |
||
30 | 'This script creates a consolidated report and imports it to the GSM. ' |
||
31 | 'You are able to set a time period. Within this period the last report' |
||
32 | 'of all tasks will be consolidated. You can additionally filter the ' |
||
33 | 'tasks by one or more tags and the results with a filter id or filter ' |
||
34 | 'term.\n' |
||
35 | ' Usable with gvm-script (gvm-tools). Help: gvm-script -h' |
||
36 | ) |
||
37 | |||
38 | |||
39 | def parse_tags(tags: List[str]) -> List[str]: |
||
40 | """Parsing and validating the given tags |
||
41 | |||
42 | tags (List): A list containing tags: |
||
43 | name, tag-id, name=value |
||
44 | |||
45 | Returns a list containing tag="name", tag_id="id" ... |
||
46 | """ |
||
47 | filter_tags = [] |
||
48 | for tag in tags: |
||
49 | try: |
||
50 | UUID(tag, version=4) |
||
51 | filter_tags.append('tag_id="{}"'.format(tag)) |
||
52 | except ValueError: |
||
53 | filter_tags.append('tag="{}"'.format(tag)) |
||
54 | |||
55 | return filter_tags |
||
56 | |||
57 | |||
58 | def parse_period(period: List[str]) -> Tuple[date, date]: |
||
59 | """Parsing and validating the given time period |
||
60 | |||
61 | period (List): A list with two entries containing |
||
62 | dates in the format yyyy/mm/dd |
||
63 | |||
64 | Returns two date-objects containing the passed dates |
||
65 | """ |
||
66 | try: |
||
67 | s_year, s_month, s_day = map(int, period[0].split('/')) |
||
68 | except ValueError as e: |
||
69 | error_and_exit( |
||
70 | 'Start date [{}] is not a correct date format:\n{}'.format( |
||
71 | period[0], e.args[0] |
||
72 | ) |
||
73 | ) |
||
74 | try: |
||
75 | e_year, e_month, e_day = map(int, period[1].split('/')) |
||
76 | except ValueError as e: |
||
77 | error_and_exit( |
||
78 | 'End date [{}] is not a correct date format:\n{}'.format( |
||
79 | period[1], e.args[0] |
||
80 | ) |
||
81 | ) |
||
82 | |||
83 | try: |
||
84 | period_start = date(s_year, s_month, s_day) |
||
85 | except ValueError as e: |
||
86 | error_and_exit('Start date: {}'.format(e.args[0])) |
||
87 | |||
88 | try: |
||
89 | period_end = date(e_year, e_month, e_day) |
||
90 | except ValueError as e: |
||
91 | error_and_exit('End date: {}'.format(e.args[0])) |
||
92 | |||
93 | if period_end < period_start: |
||
94 | error_and_exit('The start date seems to after the end date.') |
||
95 | |||
96 | return period_start, period_end |
||
97 | |||
98 | |||
99 | View Code Duplication | def parse_args(args: Namespace) -> Namespace: # pylint: disable=unused-argument |
|
0 ignored issues
–
show
Duplication
introduced
by
![]() |
|||
100 | """ Parsing args ... """ |
||
101 | |||
102 | parser = ArgumentParser( |
||
103 | prefix_chars='+', |
||
104 | add_help=False, |
||
105 | formatter_class=RawTextHelpFormatter, |
||
106 | description=HELP_TEXT, |
||
107 | ) |
||
108 | |||
109 | parser.add_argument( |
||
110 | '+h', |
||
111 | '++help', |
||
112 | action='help', |
||
113 | help='Show this help message and exit.', |
||
114 | ) |
||
115 | |||
116 | parser.add_argument( |
||
117 | '+p', |
||
118 | '++period', |
||
119 | nargs=2, |
||
120 | type=str, |
||
121 | required=True, |
||
122 | dest='period', |
||
123 | help=( |
||
124 | 'Choose a time period that is filtering the tasks.\n' |
||
125 | 'Use the date format YYYY/MM/DD.' |
||
126 | ), |
||
127 | ) |
||
128 | |||
129 | parser.add_argument( |
||
130 | '+t', |
||
131 | '++tags', |
||
132 | nargs='+', |
||
133 | type=str, |
||
134 | dest='tags', |
||
135 | help=( |
||
136 | 'Filter the tasks by given tag(s).\n' |
||
137 | 'If you pass more than on tag, they will be concatenated with ' |
||
138 | or '\n' |
||
139 | 'You can pass tag names, tag ids or tag name=value to this argument' |
||
140 | ), |
||
141 | ) |
||
142 | |||
143 | filter_args = parser.add_mutually_exclusive_group() |
||
144 | |||
145 | filter_args.add_argument( |
||
146 | '++filter-terms', |
||
147 | nargs='+', |
||
148 | type=str, |
||
149 | dest='filter_term', |
||
150 | help='Filter the results by given filter terms.', |
||
151 | ) |
||
152 | |||
153 | filter_args.add_argument( |
||
154 | '++filter-id', |
||
155 | type=str, |
||
156 | dest='filter_id', |
||
157 | help='Filter the results by given filter id.', |
||
158 | ) |
||
159 | |||
160 | script_args, _ = parser.parse_known_args() |
||
161 | return script_args |
||
162 | |||
163 | |||
164 | def generate_task_filter( |
||
165 | period_start: date, period_end: date, tags: List[str] |
||
166 | ) -> str: |
||
167 | """Generate the tasks filter |
||
168 | |||
169 | period_start: the start date |
||
170 | period_end: the end date |
||
171 | tags: list of tags for the filter |
||
172 | |||
173 | Returns an task filter string |
||
174 | """ |
||
175 | task_filter = 'rows=-1 ' |
||
176 | |||
177 | # last is for the timestamp of the last report in that task |
||
178 | period_filter = 'last>{0} and last<{1}'.format( |
||
179 | period_start.isoformat(), period_end.isoformat() |
||
180 | ) |
||
181 | filter_parts = [] |
||
182 | if tags: |
||
183 | for tag in tags: |
||
184 | filter_parts.append('{} and {}'.format(period_filter, tag)) |
||
185 | |||
186 | tags_filter = ' or '.join(filter_parts) |
||
187 | task_filter += tags_filter |
||
188 | else: |
||
189 | task_filter += period_filter |
||
190 | |||
191 | return task_filter |
||
192 | |||
193 | |||
194 | def get_last_reports_from_tasks(gmp: Gmp, task_filter: str) -> List[str]: |
||
195 | """Get the last reports from the tasks in the given time period |
||
196 | |||
197 | gmp: the GMP object |
||
198 | task_filter: task filter string |
||
199 | |||
200 | """ |
||
201 | |||
202 | print('Filtering the task with the filter term [{}]'.format(task_filter)) |
||
203 | |||
204 | tasks_xml = gmp.get_tasks(filter=task_filter) |
||
205 | reports = [] |
||
206 | for report in tasks_xml.xpath('task/last_report/report/@id'): |
||
207 | reports.append(str(report)) |
||
208 | |||
209 | # remove duplicates ... just in case |
||
210 | reports = list(dict.fromkeys(reports)) |
||
211 | |||
212 | return reports |
||
213 | |||
214 | |||
215 | def combine_reports( |
||
216 | gmp: Gmp, reports: List[str], filter_term: str, filter_id: str |
||
217 | ) -> e.Element: |
||
218 | """Combining the filtered ports, results and hosts of the given |
||
219 | report ids into one new report. |
||
220 | |||
221 | gmp: the GMP object |
||
222 | reports (List): List of report_ids |
||
223 | filter_term (str): the result filter string |
||
224 | """ |
||
225 | |||
226 | new_uuid = generate_uuid() |
||
227 | combined_report = e.Element( |
||
228 | 'report', |
||
229 | { |
||
230 | 'id': new_uuid, |
||
231 | 'format_id': 'd5da9f67-8551-4e51-807b-b6a873d70e34', |
||
232 | 'extension': 'xml', |
||
233 | 'content_type': 'text/xml', |
||
234 | }, |
||
235 | ) |
||
236 | report_elem = e.Element('report', {'id': new_uuid}) |
||
237 | |||
238 | ports_elem = e.Element('ports', {'start': '1', 'max': '-1'}) |
||
239 | results_elem = e.Element('results', {'start': '1', 'max': '-1'}) |
||
240 | combined_report.append(report_elem) |
||
241 | report_elem.append(ports_elem) |
||
242 | report_elem.append(results_elem) |
||
243 | |||
244 | for report in reports: |
||
245 | try: |
||
246 | if filter_id: |
||
247 | current_report = gmp.get_report( |
||
248 | report, filter_id=filter_id, details=True |
||
249 | ).find('report') |
||
250 | else: |
||
251 | current_report = gmp.get_report( |
||
252 | report, filter=filter_term, details=True |
||
253 | ).find('report') |
||
254 | except GvmError: |
||
255 | print("Could not find the report [{}]".format(report)) |
||
256 | for port in current_report.xpath('report/ports/port'): |
||
257 | ports_elem.append(port) |
||
258 | for result in current_report.xpath('report/results/result'): |
||
259 | results_elem.append(result) |
||
260 | for host in current_report.xpath('report/host'): |
||
261 | report_elem.append(host) |
||
262 | |||
263 | return combined_report |
||
264 | |||
265 | |||
266 | def send_report( |
||
267 | gmp: Gmp, combined_report: e.Element, period_start: date, period_end: date |
||
268 | ) -> str: |
||
269 | """Creating a container task and sending the combined report to the GSM |
||
270 | |||
271 | gmp: the GMP object |
||
272 | combined_report: the combined report xml object |
||
273 | period_start: the start date |
||
274 | period_end: the end date |
||
275 | """ |
||
276 | |||
277 | task_name = 'Consolidated Report [{} - {}]'.format(period_start, period_end) |
||
278 | |||
279 | res = gmp.create_container_task( |
||
280 | name=task_name, comment='Created with gvm-tools.' |
||
281 | ) |
||
282 | |||
283 | task_id = res.xpath('//@id')[0] |
||
284 | |||
285 | combined_report = e.tostring(combined_report) |
||
286 | |||
287 | res = gmp.import_report(combined_report, task_id=task_id, in_assets=True) |
||
288 | |||
289 | return res.xpath('//@id')[0] |
||
290 | |||
291 | |||
292 | def main(gmp: Gmp, args: Namespace) -> None: |
||
293 | # pylint: disable=undefined-variable |
||
294 | |||
295 | parsed_args = parse_args(args=args) |
||
296 | |||
297 | period_start, period_end = parse_period(period=parsed_args.period) |
||
298 | |||
299 | print( |
||
300 | 'Combining reports from tasks within the time period [{}, {}]'.format( |
||
301 | period_start, period_end |
||
302 | ) |
||
303 | ) |
||
304 | |||
305 | # Generate Task Filter |
||
306 | filter_tags = None |
||
307 | if parsed_args.tags: |
||
308 | filter_tags = parse_tags(tags=parsed_args.tags) |
||
309 | |||
310 | task_filter = generate_task_filter( |
||
311 | period_start=period_start, |
||
312 | period_end=period_end, |
||
313 | tags=filter_tags, |
||
314 | ) |
||
315 | |||
316 | # Find reports |
||
317 | reports = get_last_reports_from_tasks(gmp=gmp, task_filter=task_filter) |
||
318 | |||
319 | print("Combining {} found reports.".format(len(reports))) |
||
320 | |||
321 | filter_term = '' |
||
322 | if parsed_args.filter_term: |
||
323 | filter_term = ' '.join(parsed_args.filter_term) |
||
324 | print( |
||
325 | 'Filtering the results by the following filter term [{}]'.format( |
||
326 | filter_term |
||
327 | ) |
||
328 | ) |
||
329 | elif parsed_args.filter_id: |
||
330 | try: |
||
331 | filter_xml = gmp.get_filter(filter_id=parsed_args.filter_id).find( |
||
332 | 'filter' |
||
333 | ) |
||
334 | print( |
||
335 | 'Filtering the results by the following filter term ' |
||
336 | '[{}]'.format(filter_xml.find('term').text) |
||
337 | ) |
||
338 | except GvmError: |
||
339 | print( |
||
340 | "Filter with the ID [{}] is not existing.".format( |
||
341 | parsed_args.filter_id |
||
342 | ) |
||
343 | ) |
||
344 | else: |
||
345 | print('No results filter given.') |
||
346 | |||
347 | # Combine the reports |
||
348 | combined_report = combine_reports( |
||
349 | gmp=gmp, |
||
350 | reports=reports, |
||
351 | filter_term=filter_term, |
||
352 | filter_id=parsed_args.filter_id, |
||
353 | ) |
||
354 | |||
355 | # Import the generated report to GSM |
||
356 | report = send_report( |
||
357 | gmp=gmp, |
||
358 | combined_report=combined_report, |
||
359 | period_start=period_start, |
||
360 | period_end=period_end, |
||
361 | ) |
||
362 | |||
363 | print("Successfully imported new consolidated report [{}]".format(report)) |
||
364 | |||
365 | |||
366 | if __name__ == '__gmp__': |
||
367 | main(gmp, args) |
||
0 ignored issues
–
show
Comprehensibility
Best Practice
introduced
by
Comprehensibility
Best Practice
introduced
by
|
|||
368 |