greenbone /
gvm-tools
| 1 | # -*- coding: utf-8 -*- |
||
| 2 | # Copyright (C) 2021 Greenbone Networks GmbH |
||
| 3 | # |
||
| 4 | # SPDX-License-Identifier: GPL-3.0-or-later |
||
| 5 | # |
||
| 6 | # This program is free software: you can redistribute it and/or modify |
||
| 7 | # it under the terms of the GNU General Public License as published by |
||
| 8 | # the Free Software Foundation, either version 3 of the License, or |
||
| 9 | # (at your option) any later version. |
||
| 10 | # |
||
| 11 | # This program is distributed in the hope that it will be useful, |
||
| 12 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
||
| 13 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||
| 14 | # GNU General Public License for more details. |
||
| 15 | # |
||
| 16 | # You should have received a copy of the GNU General Public License |
||
| 17 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
||
| 18 | |||
| 19 | from uuid import UUID |
||
| 20 | from typing import List, Tuple |
||
| 21 | from datetime import date |
||
| 22 | from argparse import ArgumentParser, Namespace, RawTextHelpFormatter |
||
| 23 | from lxml import etree as e |
||
| 24 | from gvm.protocols.gmp import Gmp |
||
| 25 | from gvm.errors import GvmError |
||
| 26 | |||
| 27 | from gvmtools.helper import generate_uuid, error_and_exit |
||
| 28 | |||
| 29 | HELP_TEXT = ( |
||
| 30 | 'This script creates a consolidated report and imports it to the GSM. ' |
||
| 31 | 'You are able to set a time period. Within this period the last report' |
||
| 32 | 'of all tasks will be consolidated. You can additionally filter the ' |
||
| 33 | 'tasks by one or more tags and the results with a filter id or filter ' |
||
| 34 | 'term.\n' |
||
| 35 | ' Usable with gvm-script (gvm-tools). Help: gvm-script -h' |
||
| 36 | ) |
||
| 37 | |||
| 38 | |||
| 39 | def parse_tags(tags: List[str]) -> List[str]: |
||
| 40 | """Parsing and validating the given tags |
||
| 41 | |||
| 42 | tags (List): A list containing tags: |
||
| 43 | name, tag-id, name=value |
||
| 44 | |||
| 45 | Returns a list containing tag="name", tag_id="id" ... |
||
| 46 | """ |
||
| 47 | filter_tags = [] |
||
| 48 | for tag in tags: |
||
| 49 | try: |
||
| 50 | UUID(tag, version=4) |
||
| 51 | filter_tags.append('tag_id="{}"'.format(tag)) |
||
| 52 | except ValueError: |
||
| 53 | filter_tags.append('tag="{}"'.format(tag)) |
||
| 54 | |||
| 55 | return filter_tags |
||
| 56 | |||
| 57 | |||
| 58 | def parse_period(period: List[str]) -> Tuple[date, date]: |
||
| 59 | """Parsing and validating the given time period |
||
| 60 | |||
| 61 | period (List): A list with two entries containing |
||
| 62 | dates in the format yyyy/mm/dd |
||
| 63 | |||
| 64 | Returns two date-objects containing the passed dates |
||
| 65 | """ |
||
| 66 | try: |
||
| 67 | s_year, s_month, s_day = map(int, period[0].split('/')) |
||
| 68 | except ValueError as e: |
||
| 69 | error_and_exit( |
||
| 70 | 'Start date [{}] is not a correct date format:\n{}'.format( |
||
| 71 | period[0], e.args[0] |
||
| 72 | ) |
||
| 73 | ) |
||
| 74 | try: |
||
| 75 | e_year, e_month, e_day = map(int, period[1].split('/')) |
||
| 76 | except ValueError as e: |
||
| 77 | error_and_exit( |
||
| 78 | 'End date [{}] is not a correct date format:\n{}'.format( |
||
| 79 | period[1], e.args[0] |
||
| 80 | ) |
||
| 81 | ) |
||
| 82 | |||
| 83 | try: |
||
| 84 | period_start = date(s_year, s_month, s_day) |
||
| 85 | except ValueError as e: |
||
| 86 | error_and_exit('Start date: {}'.format(e.args[0])) |
||
| 87 | |||
| 88 | try: |
||
| 89 | period_end = date(e_year, e_month, e_day) |
||
| 90 | except ValueError as e: |
||
| 91 | error_and_exit('End date: {}'.format(e.args[0])) |
||
| 92 | |||
| 93 | if period_end < period_start: |
||
| 94 | error_and_exit('The start date seems to after the end date.') |
||
| 95 | |||
| 96 | return period_start, period_end |
||
| 97 | |||
| 98 | |||
| 99 | View Code Duplication | def parse_args(args: Namespace) -> Namespace: # pylint: disable=unused-argument |
|
|
0 ignored issues
–
show
Duplication
introduced
by
Loading history...
|
|||
| 100 | """ Parsing args ... """ |
||
| 101 | |||
| 102 | parser = ArgumentParser( |
||
| 103 | prefix_chars='+', |
||
| 104 | add_help=False, |
||
| 105 | formatter_class=RawTextHelpFormatter, |
||
| 106 | description=HELP_TEXT, |
||
| 107 | ) |
||
| 108 | |||
| 109 | parser.add_argument( |
||
| 110 | '+h', |
||
| 111 | '++help', |
||
| 112 | action='help', |
||
| 113 | help='Show this help message and exit.', |
||
| 114 | ) |
||
| 115 | |||
| 116 | parser.add_argument( |
||
| 117 | '+p', |
||
| 118 | '++period', |
||
| 119 | nargs=2, |
||
| 120 | type=str, |
||
| 121 | required=True, |
||
| 122 | dest='period', |
||
| 123 | help=( |
||
| 124 | 'Choose a time period that is filtering the tasks.\n' |
||
| 125 | 'Use the date format YYYY/MM/DD.' |
||
| 126 | ), |
||
| 127 | ) |
||
| 128 | |||
| 129 | parser.add_argument( |
||
| 130 | '+t', |
||
| 131 | '++tags', |
||
| 132 | nargs='+', |
||
| 133 | type=str, |
||
| 134 | dest='tags', |
||
| 135 | help=( |
||
| 136 | 'Filter the tasks by given tag(s).\n' |
||
| 137 | 'If you pass more than on tag, they will be concatenated with ' |
||
| 138 | or '\n' |
||
| 139 | 'You can pass tag names, tag ids or tag name=value to this argument' |
||
| 140 | ), |
||
| 141 | ) |
||
| 142 | |||
| 143 | filter_args = parser.add_mutually_exclusive_group() |
||
| 144 | |||
| 145 | filter_args.add_argument( |
||
| 146 | '++filter-terms', |
||
| 147 | nargs='+', |
||
| 148 | type=str, |
||
| 149 | dest='filter_term', |
||
| 150 | help='Filter the results by given filter terms.', |
||
| 151 | ) |
||
| 152 | |||
| 153 | filter_args.add_argument( |
||
| 154 | '++filter-id', |
||
| 155 | type=str, |
||
| 156 | dest='filter_id', |
||
| 157 | help='Filter the results by given filter id.', |
||
| 158 | ) |
||
| 159 | |||
| 160 | script_args, _ = parser.parse_known_args() |
||
| 161 | return script_args |
||
| 162 | |||
| 163 | |||
| 164 | def generate_task_filter( |
||
| 165 | period_start: date, period_end: date, tags: List[str] |
||
| 166 | ) -> str: |
||
| 167 | """Generate the tasks filter |
||
| 168 | |||
| 169 | period_start: the start date |
||
| 170 | period_end: the end date |
||
| 171 | tags: list of tags for the filter |
||
| 172 | |||
| 173 | Returns an task filter string |
||
| 174 | """ |
||
| 175 | task_filter = 'rows=-1 ' |
||
| 176 | |||
| 177 | # last is for the timestamp of the last report in that task |
||
| 178 | period_filter = 'last>{0} and last<{1}'.format( |
||
| 179 | period_start.isoformat(), period_end.isoformat() |
||
| 180 | ) |
||
| 181 | filter_parts = [] |
||
| 182 | if tags: |
||
| 183 | for tag in tags: |
||
| 184 | filter_parts.append('{} and {}'.format(period_filter, tag)) |
||
| 185 | |||
| 186 | tags_filter = ' or '.join(filter_parts) |
||
| 187 | task_filter += tags_filter |
||
| 188 | else: |
||
| 189 | task_filter += period_filter |
||
| 190 | |||
| 191 | return task_filter |
||
| 192 | |||
| 193 | |||
| 194 | def get_last_reports_from_tasks(gmp: Gmp, task_filter: str) -> List[str]: |
||
| 195 | """Get the last reports from the tasks in the given time period |
||
| 196 | |||
| 197 | gmp: the GMP object |
||
| 198 | task_filter: task filter string |
||
| 199 | |||
| 200 | """ |
||
| 201 | |||
| 202 | print('Filtering the task with the filter term [{}]'.format(task_filter)) |
||
| 203 | |||
| 204 | tasks_xml = gmp.get_tasks(filter=task_filter) |
||
| 205 | reports = [] |
||
| 206 | for report in tasks_xml.xpath('task/last_report/report/@id'): |
||
| 207 | reports.append(str(report)) |
||
| 208 | |||
| 209 | # remove duplicates ... just in case |
||
| 210 | reports = list(dict.fromkeys(reports)) |
||
| 211 | |||
| 212 | return reports |
||
| 213 | |||
| 214 | |||
| 215 | def combine_reports( |
||
| 216 | gmp: Gmp, reports: List[str], filter_term: str, filter_id: str |
||
| 217 | ) -> e.Element: |
||
| 218 | """Combining the filtered ports, results and hosts of the given |
||
| 219 | report ids into one new report. |
||
| 220 | |||
| 221 | gmp: the GMP object |
||
| 222 | reports (List): List of report_ids |
||
| 223 | filter_term (str): the result filter string |
||
| 224 | """ |
||
| 225 | |||
| 226 | new_uuid = generate_uuid() |
||
| 227 | combined_report = e.Element( |
||
| 228 | 'report', |
||
| 229 | { |
||
| 230 | 'id': new_uuid, |
||
| 231 | 'format_id': 'd5da9f67-8551-4e51-807b-b6a873d70e34', |
||
| 232 | 'extension': 'xml', |
||
| 233 | 'content_type': 'text/xml', |
||
| 234 | }, |
||
| 235 | ) |
||
| 236 | report_elem = e.Element('report', {'id': new_uuid}) |
||
| 237 | |||
| 238 | ports_elem = e.Element('ports', {'start': '1', 'max': '-1'}) |
||
| 239 | results_elem = e.Element('results', {'start': '1', 'max': '-1'}) |
||
| 240 | combined_report.append(report_elem) |
||
| 241 | report_elem.append(ports_elem) |
||
| 242 | report_elem.append(results_elem) |
||
| 243 | |||
| 244 | for report in reports: |
||
| 245 | try: |
||
| 246 | if filter_id: |
||
| 247 | current_report = gmp.get_report( |
||
| 248 | report, filter_id=filter_id, details=True |
||
| 249 | ).find('report') |
||
| 250 | else: |
||
| 251 | current_report = gmp.get_report( |
||
| 252 | report, filter=filter_term, details=True |
||
| 253 | ).find('report') |
||
| 254 | except GvmError: |
||
| 255 | print("Could not find the report [{}]".format(report)) |
||
| 256 | for port in current_report.xpath('report/ports/port'): |
||
| 257 | ports_elem.append(port) |
||
| 258 | for result in current_report.xpath('report/results/result'): |
||
| 259 | results_elem.append(result) |
||
| 260 | for host in current_report.xpath('report/host'): |
||
| 261 | report_elem.append(host) |
||
| 262 | |||
| 263 | return combined_report |
||
| 264 | |||
| 265 | |||
| 266 | def send_report( |
||
| 267 | gmp: Gmp, combined_report: e.Element, period_start: date, period_end: date |
||
| 268 | ) -> str: |
||
| 269 | """Creating a container task and sending the combined report to the GSM |
||
| 270 | |||
| 271 | gmp: the GMP object |
||
| 272 | combined_report: the combined report xml object |
||
| 273 | period_start: the start date |
||
| 274 | period_end: the end date |
||
| 275 | """ |
||
| 276 | |||
| 277 | task_name = 'Consolidated Report [{} - {}]'.format(period_start, period_end) |
||
| 278 | |||
| 279 | res = gmp.create_container_task( |
||
| 280 | name=task_name, comment='Created with gvm-tools.' |
||
| 281 | ) |
||
| 282 | |||
| 283 | task_id = res.xpath('//@id')[0] |
||
| 284 | |||
| 285 | combined_report = e.tostring(combined_report) |
||
| 286 | |||
| 287 | res = gmp.import_report(combined_report, task_id=task_id, in_assets=True) |
||
| 288 | |||
| 289 | return res.xpath('//@id')[0] |
||
| 290 | |||
| 291 | |||
| 292 | def main(gmp: Gmp, args: Namespace) -> None: |
||
| 293 | # pylint: disable=undefined-variable |
||
| 294 | |||
| 295 | parsed_args = parse_args(args=args) |
||
| 296 | |||
| 297 | period_start, period_end = parse_period(period=parsed_args.period) |
||
| 298 | |||
| 299 | print( |
||
| 300 | 'Combining reports from tasks within the time period [{}, {}]'.format( |
||
| 301 | period_start, period_end |
||
| 302 | ) |
||
| 303 | ) |
||
| 304 | |||
| 305 | # Generate Task Filter |
||
| 306 | filter_tags = None |
||
| 307 | if parsed_args.tags: |
||
| 308 | filter_tags = parse_tags(tags=parsed_args.tags) |
||
| 309 | |||
| 310 | task_filter = generate_task_filter( |
||
| 311 | period_start=period_start, |
||
| 312 | period_end=period_end, |
||
| 313 | tags=filter_tags, |
||
| 314 | ) |
||
| 315 | |||
| 316 | # Find reports |
||
| 317 | reports = get_last_reports_from_tasks(gmp=gmp, task_filter=task_filter) |
||
| 318 | |||
| 319 | print("Combining {} found reports.".format(len(reports))) |
||
| 320 | |||
| 321 | filter_term = '' |
||
| 322 | if parsed_args.filter_term: |
||
| 323 | filter_term = ' '.join(parsed_args.filter_term) |
||
| 324 | print( |
||
| 325 | 'Filtering the results by the following filter term [{}]'.format( |
||
| 326 | filter_term |
||
| 327 | ) |
||
| 328 | ) |
||
| 329 | elif parsed_args.filter_id: |
||
| 330 | try: |
||
| 331 | filter_xml = gmp.get_filter(filter_id=parsed_args.filter_id).find( |
||
| 332 | 'filter' |
||
| 333 | ) |
||
| 334 | print( |
||
| 335 | 'Filtering the results by the following filter term ' |
||
| 336 | '[{}]'.format(filter_xml.find('term').text) |
||
| 337 | ) |
||
| 338 | except GvmError: |
||
| 339 | print( |
||
| 340 | "Filter with the ID [{}] is not existing.".format( |
||
| 341 | parsed_args.filter_id |
||
| 342 | ) |
||
| 343 | ) |
||
| 344 | else: |
||
| 345 | print('No results filter given.') |
||
| 346 | |||
| 347 | # Combine the reports |
||
| 348 | combined_report = combine_reports( |
||
| 349 | gmp=gmp, |
||
| 350 | reports=reports, |
||
| 351 | filter_term=filter_term, |
||
| 352 | filter_id=parsed_args.filter_id, |
||
| 353 | ) |
||
| 354 | |||
| 355 | # Import the generated report to GSM |
||
| 356 | report = send_report( |
||
| 357 | gmp=gmp, |
||
| 358 | combined_report=combined_report, |
||
| 359 | period_start=period_start, |
||
| 360 | period_end=period_end, |
||
| 361 | ) |
||
| 362 | |||
| 363 | print("Successfully imported new consolidated report [{}]".format(report)) |
||
| 364 | |||
| 365 | |||
| 366 | if __name__ == '__gmp__': |
||
| 367 | main(gmp, args) |
||
|
0 ignored issues
–
show
Comprehensibility
Best Practice
introduced
by
Comprehensibility
Best Practice
introduced
by
|
|||
| 368 |