Issues (69)

scripts/create-consolidated-report.gmp.py (3 issues)

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
from uuid import UUID
20
from typing import List, Tuple
21
from datetime import date
22
from argparse import ArgumentParser, Namespace, RawTextHelpFormatter
23
from lxml import etree as e
24
from gvm.protocols.gmp import Gmp
25
from gvm.errors import GvmError
26
27
from gvmtools.helper import generate_uuid, error_and_exit
28
29
HELP_TEXT = (
30
    'This script creates a consolidated report and imports it to the GSM. '
31
    'You are able to set a time period. Within this period the last report'
32
    'of all tasks will be consolidated. You can additionally filter the '
33
    'tasks by one or more tags and the results with a filter id or filter '
34
    'term.\n'
35
    ' Usable with gvm-script (gvm-tools). Help: gvm-script -h'
36
)
37
38
39
def parse_tags(tags: List[str]) -> List[str]:
40
    """Parsing and validating the given tags
41
42
    tags (List): A list containing tags:
43
                 name, tag-id, name=value
44
45
    Returns a list containing tag="name", tag_id="id" ...
46
    """
47
    filter_tags = []
48
    for tag in tags:
49
        try:
50
            UUID(tag, version=4)
51
            filter_tags.append('tag_id="{}"'.format(tag))
52
        except ValueError:
53
            filter_tags.append('tag="{}"'.format(tag))
54
55
    return filter_tags
56
57
58
def parse_period(period: List[str]) -> Tuple[date, date]:
59
    """Parsing and validating the given time period
60
61
    period (List): A list with two entries containing
62
                   dates in the format yyyy/mm/dd
63
64
    Returns two date-objects containing the passed dates
65
    """
66
    try:
67
        s_year, s_month, s_day = map(int, period[0].split('/'))
68
    except ValueError as e:
69
        error_and_exit(
70
            'Start date [{}] is not a correct date format:\n{}'.format(
71
                period[0], e.args[0]
72
            )
73
        )
74
    try:
75
        e_year, e_month, e_day = map(int, period[1].split('/'))
76
    except ValueError as e:
77
        error_and_exit(
78
            'End date [{}] is not a correct date format:\n{}'.format(
79
                period[1], e.args[0]
80
            )
81
        )
82
83
    try:
84
        period_start = date(s_year, s_month, s_day)
85
    except ValueError as e:
86
        error_and_exit('Start date: {}'.format(e.args[0]))
87
88
    try:
89
        period_end = date(e_year, e_month, e_day)
90
    except ValueError as e:
91
        error_and_exit('End date: {}'.format(e.args[0]))
92
93
    if period_end < period_start:
94
        error_and_exit('The start date seems to after the end date.')
95
96
    return period_start, period_end
97
98
99 View Code Duplication
def parse_args(args: Namespace) -> Namespace:  # pylint: disable=unused-argument
0 ignored issues
show
This code seems to be duplicated in your project.
Loading history...
100
    """ Parsing args ... """
101
102
    parser = ArgumentParser(
103
        prefix_chars='+',
104
        add_help=False,
105
        formatter_class=RawTextHelpFormatter,
106
        description=HELP_TEXT,
107
    )
108
109
    parser.add_argument(
110
        '+h',
111
        '++help',
112
        action='help',
113
        help='Show this help message and exit.',
114
    )
115
116
    parser.add_argument(
117
        '+p',
118
        '++period',
119
        nargs=2,
120
        type=str,
121
        required=True,
122
        dest='period',
123
        help=(
124
            'Choose a time period that is filtering the tasks.\n'
125
            'Use the date format YYYY/MM/DD.'
126
        ),
127
    )
128
129
    parser.add_argument(
130
        '+t',
131
        '++tags',
132
        nargs='+',
133
        type=str,
134
        dest='tags',
135
        help=(
136
            'Filter the tasks by given tag(s).\n'
137
            'If you pass more than on tag, they will be concatenated with '
138
            or '\n'
139
            'You can pass tag names, tag ids or tag name=value to this argument'
140
        ),
141
    )
142
143
    filter_args = parser.add_mutually_exclusive_group()
144
145
    filter_args.add_argument(
146
        '++filter-terms',
147
        nargs='+',
148
        type=str,
149
        dest='filter_term',
150
        help='Filter the results by given filter terms.',
151
    )
152
153
    filter_args.add_argument(
154
        '++filter-id',
155
        type=str,
156
        dest='filter_id',
157
        help='Filter the results by given filter id.',
158
    )
159
160
    script_args, _ = parser.parse_known_args()
161
    return script_args
162
163
164
def generate_task_filter(
165
    period_start: date, period_end: date, tags: List[str]
166
) -> str:
167
    """Generate the tasks filter
168
169
    period_start: the start date
170
    period_end: the end date
171
    tags: list of tags for the filter
172
173
    Returns an task filter string
174
    """
175
    task_filter = 'rows=-1 '
176
177
    # last is for the timestamp of the last report in that task
178
    period_filter = 'last>{0} and last<{1}'.format(
179
        period_start.isoformat(), period_end.isoformat()
180
    )
181
    filter_parts = []
182
    if tags:
183
        for tag in tags:
184
            filter_parts.append('{} and {}'.format(period_filter, tag))
185
186
        tags_filter = ' or '.join(filter_parts)
187
        task_filter += tags_filter
188
    else:
189
        task_filter += period_filter
190
191
    return task_filter
192
193
194
def get_last_reports_from_tasks(gmp: Gmp, task_filter: str) -> List[str]:
195
    """Get the last reports from the tasks in the given time period
196
197
    gmp: the GMP object
198
    task_filter: task filter string
199
200
    """
201
202
    print('Filtering the task with the filter term [{}]'.format(task_filter))
203
204
    tasks_xml = gmp.get_tasks(filter=task_filter)
205
    reports = []
206
    for report in tasks_xml.xpath('task/last_report/report/@id'):
207
        reports.append(str(report))
208
209
    # remove duplicates ... just in case
210
    reports = list(dict.fromkeys(reports))
211
212
    return reports
213
214
215
def combine_reports(
216
    gmp: Gmp, reports: List[str], filter_term: str, filter_id: str
217
) -> e.Element:
218
    """Combining the filtered ports, results and hosts of the given
219
    report ids into one new report.
220
221
    gmp: the GMP object
222
    reports (List): List of report_ids
223
    filter_term (str): the result filter string
224
    """
225
226
    new_uuid = generate_uuid()
227
    combined_report = e.Element(
228
        'report',
229
        {
230
            'id': new_uuid,
231
            'format_id': 'd5da9f67-8551-4e51-807b-b6a873d70e34',
232
            'extension': 'xml',
233
            'content_type': 'text/xml',
234
        },
235
    )
236
    report_elem = e.Element('report', {'id': new_uuid})
237
238
    ports_elem = e.Element('ports', {'start': '1', 'max': '-1'})
239
    results_elem = e.Element('results', {'start': '1', 'max': '-1'})
240
    combined_report.append(report_elem)
241
    report_elem.append(ports_elem)
242
    report_elem.append(results_elem)
243
244
    for report in reports:
245
        try:
246
            if filter_id:
247
                current_report = gmp.get_report(
248
                    report, filter_id=filter_id, details=True
249
                ).find('report')
250
            else:
251
                current_report = gmp.get_report(
252
                    report, filter=filter_term, details=True
253
                ).find('report')
254
        except GvmError:
255
            print("Could not find the report [{}]".format(report))
256
        for port in current_report.xpath('report/ports/port'):
257
            ports_elem.append(port)
258
        for result in current_report.xpath('report/results/result'):
259
            results_elem.append(result)
260
        for host in current_report.xpath('report/host'):
261
            report_elem.append(host)
262
263
    return combined_report
264
265
266
def send_report(
267
    gmp: Gmp, combined_report: e.Element, period_start: date, period_end: date
268
) -> str:
269
    """Creating a container task and sending the combined report to the GSM
270
271
    gmp: the GMP object
272
    combined_report: the combined report xml object
273
    period_start: the start date
274
    period_end: the end date
275
    """
276
277
    task_name = 'Consolidated Report [{} - {}]'.format(period_start, period_end)
278
279
    res = gmp.create_container_task(
280
        name=task_name, comment='Created with gvm-tools.'
281
    )
282
283
    task_id = res.xpath('//@id')[0]
284
285
    combined_report = e.tostring(combined_report)
286
287
    res = gmp.import_report(combined_report, task_id=task_id, in_assets=True)
288
289
    return res.xpath('//@id')[0]
290
291
292
def main(gmp: Gmp, args: Namespace) -> None:
293
    # pylint: disable=undefined-variable
294
295
    parsed_args = parse_args(args=args)
296
297
    period_start, period_end = parse_period(period=parsed_args.period)
298
299
    print(
300
        'Combining reports from tasks within the time period [{}, {}]'.format(
301
            period_start, period_end
302
        )
303
    )
304
305
    # Generate Task Filter
306
    filter_tags = None
307
    if parsed_args.tags:
308
        filter_tags = parse_tags(tags=parsed_args.tags)
309
310
    task_filter = generate_task_filter(
311
        period_start=period_start,
312
        period_end=period_end,
313
        tags=filter_tags,
314
    )
315
316
    # Find reports
317
    reports = get_last_reports_from_tasks(gmp=gmp, task_filter=task_filter)
318
319
    print("Combining {} found reports.".format(len(reports)))
320
321
    filter_term = ''
322
    if parsed_args.filter_term:
323
        filter_term = ' '.join(parsed_args.filter_term)
324
        print(
325
            'Filtering the results by the following filter term [{}]'.format(
326
                filter_term
327
            )
328
        )
329
    elif parsed_args.filter_id:
330
        try:
331
            filter_xml = gmp.get_filter(filter_id=parsed_args.filter_id).find(
332
                'filter'
333
            )
334
            print(
335
                'Filtering the results by the following filter term '
336
                '[{}]'.format(filter_xml.find('term').text)
337
            )
338
        except GvmError:
339
            print(
340
                "Filter with the ID [{}] is not existing.".format(
341
                    parsed_args.filter_id
342
                )
343
            )
344
    else:
345
        print('No results filter given.')
346
347
    # Combine the reports
348
    combined_report = combine_reports(
349
        gmp=gmp,
350
        reports=reports,
351
        filter_term=filter_term,
352
        filter_id=parsed_args.filter_id,
353
    )
354
355
    # Import the generated report to GSM
356
    report = send_report(
357
        gmp=gmp,
358
        combined_report=combined_report,
359
        period_start=period_start,
360
        period_end=period_end,
361
    )
362
363
    print("Successfully imported new consolidated report [{}]".format(report))
364
365
366
if __name__ == '__gmp__':
367
    main(gmp, args)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable gmp does not seem to be defined.
Loading history...
Comprehensibility Best Practice introduced by
The variable args does not seem to be defined.
Loading history...
368