Passed
Pull Request — master (#370)
by Jaspar
01:25
created

get_last_reports_from_tasks()   A

Complexity

Conditions 2

Size

Total Lines 19
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 8
nop 2
dl 0
loc 19
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
from uuid import UUID
20
from typing import List
21
from datetime import date
22
from argparse import ArgumentParser, RawTextHelpFormatter
23
from lxml import etree as e
24
25
from gvmtools.helper import generate_uuid, error_and_exit
26
27
# Description text handed to the ArgumentParser that parse_args() builds.
HELP_TEXT = (
    'This script creates a consolidated report and imports it to the GSM.'
    ' Usable with gvm-script (gvm-tools)'
)
31
32
33
def parse_tags(tags: List):
    """Turn the raw tag arguments into task filter terms.

    An entry that parses as a UUID is treated as a tag id; anything else
    (a plain name or a name=value pair) is treated as a tag name.

    tags (List): A list containing tags:
                 name, tag-id, name=value

    Returns a list with one term per input tag, in input order, each of
    the form tag_id="<id>" or tag="<name>".
    """

    def _as_filter_term(tag):
        # Only the UUID constructor can raise here, so keep the try minimal.
        try:
            UUID(tag, version=4)
        except ValueError:
            return 'tag="{}"'.format(tag)
        return 'tag_id="{}"'.format(tag)

    return [_as_filter_term(tag) for tag in tags]
50
51
52
def _split_date(raw, label):
    """Split a yyyy/mm/dd string into its (year, month, day) integers.

    A string that does not consist of exactly three '/'-separated integers
    is reported through error_and_exit, prefixed with the given label.
    """
    try:
        year, month, day = map(int, raw.split('/'))
    except ValueError as err:
        error_and_exit(
            '{} date [{}] is not a correct date format:\n{}'.format(
                label, raw, err.args[0]
            )
        )
    return year, month, day


def _build_date(parts, label):
    """Build a date from (year, month, day), reporting invalid values."""
    try:
        return date(*parts)
    except ValueError as err:
        error_and_exit('{} date: {}'.format(label, err.args[0]))


def parse_period(period: List):
    """Parsing and validating the given time period

    period (List): A list with two entries containing
                   dates in the format yyyy/mm/dd

    Both entries are first checked for the yyyy/mm/dd shape and only then
    converted into dates, so a malformed end date is reported before an
    out-of-range start date. Every problem is reported via error_and_exit.

    Returns two date-objects containing the passed dates
    """
    start_parts = _split_date(period[0], 'Start')
    end_parts = _split_date(period[1], 'End')

    period_start = _build_date(start_parts, 'Start')
    period_end = _build_date(end_parts, 'End')

    # Identical start and end dates are accepted; only a reversed period
    # is rejected.
    if period_end < period_start:
        error_and_exit('The start date seems to be after the end date.')

    return period_start, period_end
91
92
93
def parse_args(args):  # pylint: disable=unused-argument
    """Parse the script's own command line options.

    All options use '+' as prefix character ('+p', '++period', ...).

    args: not used; parse_known_args() is called without an argument
          list, so argparse reads the process command line itself.

    Returns the namespace with the attributes period (list of two
    strings, required), tags and filter (lists of strings, or None when
    the option is not given). Unknown arguments are ignored.
    """

    parser = ArgumentParser(
        prefix_chars='+',
        add_help=False,
        formatter_class=RawTextHelpFormatter,
        description=HELP_TEXT,
    )

    parser.add_argument(
        '+h',
        '++help',
        action='help',
        help='Show this help message and exit.',
    )

    parser.add_argument(
        '+p',
        '++period',
        nargs=2,
        type=str,
        required=True,
        dest='period',
        help=(
            'Choose a time period that is filtering the tasks.\n'
            'Use the date format YYYY/MM/DD.'
        ),
    )

    parser.add_argument(
        '+t',
        '++tags',
        nargs='+',
        type=str,
        dest='tags',
        # The word "or" belongs inside the help text. As a bare Python
        # operator between the literals it cut the message off after the
        # first sentence fragment.
        help=(
            'Filter the tasks by given tag(s).\n'
            'If you pass more than one tag, they will be concatenated with '
            '"or".\n'
            'You can pass tag names, tag ids or tag name=value to this argument'
        ),
    )

    parser.add_argument(
        '+f',
        '++filter',
        nargs='+',
        type=str,
        dest='filter',
        help='Filter the results by given filter(s).',
    )

    script_args, _ = parser.parse_known_args()
    return script_args
148
149
150
def generate_task_filter(period_start, period_end, tags: List):
    """Generate the tasks filter

    period_start: the start date
    period_end: the end date
    tags: list of tag filter terms, may be empty or None

    The filter always starts with 'rows=-1 '. Without tags it only
    restricts the creation date to the given period. With tags, the period
    restriction is repeated for every tag and the resulting terms are
    joined with ' or '.

    Returns a task filter string
    """
    period_filter = 'created>{0} and created<{1}'.format(
        period_start.isoformat(), period_end.isoformat()
    )

    if not tags:
        return 'rows=-1 ' + period_filter

    return 'rows=-1 ' + ' or '.join(
        '{} and {}'.format(period_filter, tag) for tag in tags
    )
174
175
176
def get_last_reports_from_tasks(gmp, task_filter: str):
    """Get the last reports from the tasks in the given time period

    gmp: the GMP object
    task_filter: task filter string

    Returns a list with the ids (as str) found under
    task/last_report/report of the tasks response, without duplicates
    and in the order they first appear.
    """

    print('Filtering the task with the filter term [{}]'.format(task_filter))

    tasks_xml = gmp.get_tasks(filter=task_filter)
    report_ids = (
        str(report_id)
        for report_id in tasks_xml.xpath('task/last_report/report/@id')
    )

    # dict.fromkeys removes duplicates while keeping first-seen order
    return list(dict.fromkeys(report_ids))
195
196
197
def combine_reports(gmp, reports: List, filter_term: str):
    """Combining the filtered ports, results and hosts of the given
    report ids into one new report.

    gmp: the GMP object
    reports (List): List of report_ids
    filter_term (str): the result filter string

    Returns the outer 'report' element. It wraps an inner 'report'
    element (same newly generated id) that holds one 'ports' element,
    one 'results' element and the collected 'host' elements.
    """

    new_uuid = generate_uuid()
    combined_report = e.Element(
        'report',
        {
            'id': new_uuid,
            'format_id': 'd5da9f67-8551-4e51-807b-b6a873d70e34',
            'extension': 'xml',
            'content_type': 'text/xml',
        },
    )
    report_elem = e.SubElement(combined_report, 'report', {'id': new_uuid})
    ports_elem = e.SubElement(report_elem, 'ports', {'start': '1', 'max': '-1'})
    results_elem = e.SubElement(
        report_elem, 'results', {'start': '1', 'max': '-1'}
    )

    for report_id in reports:
        # The first child of the response is taken as the report wrapper.
        current_report = gmp.get_report(
            report_id, filter=filter_term, details=True
        )[0]

        ports_elem.extend(current_report.xpath('report/ports/port'))
        results_elem.extend(current_report.xpath('report/results/result'))

        # NOTE(review): ports and results are read from the nested
        # <report>, so the hosts are presumably located there as well.
        # The original only looked at direct <host> children; both
        # locations are collected now. Confirm against the GMP
        # get_reports response schema.
        report_elem.extend(current_report.xpath('host | report/host'))

    return combined_report
236
237
238
def send_report(gmp, combined_report, period_start, period_end):
    """Creating a container task and sending the combined report to the GSM

    gmp: the GMP object
    combined_report: the combined report xml object
    period_start: the start date
    period_end: the end date

    Returns the first id attribute found in the import response.
    """

    task_name = 'Consolidated Report [{} - {}]'.format(period_start, period_end)

    task_response = gmp.create_container_task(
        name=task_name, comment='Created with gvm-tools.'
    )
    # The first id attribute anywhere in the response is used as task id.
    task_id = task_response.xpath('//@id')[0]

    # Serialize into a separate name instead of rebinding the parameter.
    report_xml = e.tostring(combined_report)
    import_response = gmp.import_report(report_xml, task_id=task_id)

    return import_response.xpath('//@id')[0]
260
261
262
def main(gmp, args):
    """Build a consolidated report for a time period and import it.

    Steps: parse the script options, build the task filter from period
    and tags, collect the last report of every matching task, merge
    them (optionally filtered by the result filter) and import the
    merged report into a new container task.

    gmp: the GMP object
    args: passed on to parse_args
    """

    parsed_args = parse_args(args=args)

    period_start, period_end = parse_period(period=parsed_args.period)

    print(
        'Combining reports from tasks within the time period [{}, {}]'.format(
            period_start, period_end
        )
    )

    filter_tags = (
        parse_tags(tags=parsed_args.tags) if parsed_args.tags else None
    )

    # Generate Task Filter
    task_filter = generate_task_filter(
        period_start=period_start,
        period_end=period_end,
        tags=filter_tags,
    )

    # Find reports
    reports = get_last_reports_from_tasks(gmp=gmp, task_filter=task_filter)

    print("Combining {} found reports.".format(len(reports)))

    # The individual result filter arguments are joined into one term.
    if parsed_args.filter:
        filter_term = ' '.join(parsed_args.filter)
        print(
            'Filtering the results by the following filter term [{}]'.format(
                filter_term
            )
        )
    else:
        filter_term = ''
        print('No result filter given.')

    # Combine the reports
    combined_report = combine_reports(
        gmp=gmp, reports=reports, filter_term=filter_term
    )

    # Import the generated report to GSM
    report = send_report(
        gmp=gmp,
        combined_report=combined_report,
        period_start=period_start,
        period_end=period_end,
    )

    print("Successfully imported new consolidated report [{}]".format(report))
316
317
318
if __name__ == '__gmp__':
    # `gmp` and `args` are not defined in this file; they are expected to
    # be supplied by the gvm-script runner that executes it under this
    # module name, hence the pylint pragma on the call itself.
    main(gmp, args)  # pylint: disable=undefined-variable
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable args does not seem to be defined.
Loading history...
Comprehensibility Best Practice introduced by
The variable gmp does not seem to be defined.
Loading history...
320