Passed
Pull Request — master (#370)
by Jaspar
01:15
created

create-monthly-report.gmp.combine_reports()   B

Complexity

Conditions 5

Size

Total Lines 39
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 26
nop 3
dl 0
loc 39
rs 8.7893
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
from uuid import UUID
20
from typing import List
21
from datetime import date
22
from argparse import ArgumentParser, RawTextHelpFormatter
23
from lxml import etree as e
24
from gvm.xml import pretty_print
25
26
from gvmtools.helper import generate_uuid, error_and_exit
27
28
# Description handed to the ArgumentParser of this script.
HELP_TEXT = (
    'This script creates a consolidated report and imports it to the GSM. '
    'Usable with gvm-script (gvm-tools)'
)
32
33
34
def get_last_reports_from_tasks(gmp, period_start, period_end, tags: List):
    """Collect the ids of the last reports of all tasks in a time period

    gmp: the GMP object
    period_start: the start date
    period_end: the end date
    tags: list of tag filter terms; may be empty or None

    Returns a list of report ids (str) without duplicates, in the order
    in which they were found.
    """

    period_filter = 'created>{0} and created<{1}'.format(
        period_start.isoformat(), period_end.isoformat()
    )

    if tags:
        # Every tag gets its own copy of the period condition; the
        # per-tag terms are then chained with "or".
        selection = ' or '.join(
            '{} and {}'.format(period_filter, tag) for tag in tags
        )
    else:
        selection = period_filter

    task_filter = 'rows=-1 ' + selection

    print('Filtering the task with the filter term [{}]'.format(task_filter))

    tasks_xml = gmp.get_tasks(filter=task_filter)
    report_ids = (
        str(report_id)
        for report_id in tasks_xml.xpath('task/last_report/report/@id')
    )

    # dict keys keep insertion order, so this drops duplicates while
    # preserving the order of first appearance
    return list(dict.fromkeys(report_ids))
69
70
71
def combine_reports(gmp, reports: List, filter_term: str):
    """Combining the filtered ports, results and hosts of the given
    report ids into one new report.

    gmp: the GMP object
    reports (List): List of report_ids
    filter_term (str): the result filter string

    Returns the combined report as an XML element: an outer <report>
    element carrying the format attributes, wrapping an inner <report>
    that holds the merged <ports>, <results> and <host> elements.
    """

    new_uuid = generate_uuid()
    combined_report = e.Element(
        'report',
        {
            'id': new_uuid,
            # NOTE(review): presumably the id of the report format used
            # for the import - confirm against the target GSM.
            'format_id': 'd5da9f67-8551-4e51-807b-b6a873d70e34',
            'extension': 'xml',
            'content_type': 'text/xml',
        },
    )
    report_elem = e.Element('report', {'id': new_uuid})

    ports_elem = e.Element('ports', {'start': '1', 'max': '-1'})
    results_elem = e.Element('results', {'start': '1', 'max': '-1'})
    combined_report.append(report_elem)
    # The ports container has to be attached as well; otherwise every
    # port collected in the loop below is lost from the combined report.
    report_elem.append(ports_elem)
    report_elem.append(results_elem)

    for report in reports:
        current_report = gmp.get_report(
            report, filter=filter_term, details=True
        )[0]
        # Show the result counts of the report that is being merged.
        pretty_print(current_report.find('report').find('result_count'))
        for port in current_report.xpath('report/ports/port'):
            ports_elem.append(port)
        for result in current_report.xpath('report/results/result'):
            results_elem.append(result)
        for host in current_report.xpath('report/host'):
            report_elem.append(host)

    return combined_report
110
111
112
def send_report(gmp, combined_report, period_start, period_end):
    """Create a container task and import the combined report into it

    gmp: the GMP object
    combined_report: the combined report xml object
    period_start: the start date
    period_end: the end date

    Returns the first id attribute found in the response of the import.
    """

    container = gmp.create_container_task(
        name='Consolidated Report [{} - {}]'.format(period_start, period_end),
        comment='Created with gvm-tools.',
    )
    # the first id attribute in the response is used as the task id
    container_id = container.xpath('//@id')[0]

    # the report is handed over in serialized form
    serialized_report = e.tostring(combined_report)
    response = gmp.import_report(serialized_report, task_id=container_id)

    return response.xpath('//@id')[0]
134
135
136
def parse_tags(tags: List):
    """Parsing and validating the given tags

    tags (List): A list containing tags:
                 name, tag-id, name=value

    Returns a list of filter terms: tag_id="<uuid>" for every entry that
    parses as a UUID, tag="<entry>" for every other entry.
    """
    filter_tags = []
    for tag in tags:
        try:
            # UUID() also accepts upper-case, braced, "urn:uuid:" prefixed
            # and un-hyphenated spellings. Emit the canonical lower-case,
            # hyphenated form instead of the raw input.
            # NOTE(review): assumes the GSM expects ids in canonical
            # form - confirm.
            tag_id = str(UUID(tag))
        except ValueError:
            # not a UUID, so the entry is a tag name or name=value
            filter_tags.append('tag="{}"'.format(tag))
        else:
            filter_tags.append('tag_id="{}"'.format(tag_id))

    return filter_tags
153
154
155
def _split_date(text, label):
    """Split a yyyy/mm/dd string into (year, month, day) integers."""
    try:
        year, month, day = map(int, text.split('/'))
    except ValueError as err:
        # Raised for non-numeric parts as well as for a wrong number of
        # parts. error_and_exit is expected to terminate the script.
        error_and_exit(
            '{} date [{}] is not a correct date format:\n{}'.format(
                label, text, err.args[0]
            )
        )
    return year, month, day


def _build_date(parts, label):
    """Turn (year, month, day) into a date object."""
    try:
        return date(*parts)
    except ValueError as err:
        # e.g. month 13 or day 31 in a 30-day month
        error_and_exit('{} date: {}'.format(label, err.args[0]))


def parse_period(period: List):
    """Parsing and validating the given time period

    period (List): A list with two entries containing
                   dates in the format yyyy/mm/dd

    Returns two date-objects containing the passed dates

    Every validation failure is reported through error_and_exit.
    """
    # Both strings are split before any date object is built, so a format
    # error in either entry is reported before a range error.
    start_parts = _split_date(period[0], 'Start')
    end_parts = _split_date(period[1], 'End')

    period_start = _build_date(start_parts, 'Start')
    period_end = _build_date(end_parts, 'End')

    # identical start and end dates are accepted
    if period_end < period_start:
        error_and_exit('The start date seems to be after the end date.')

    return period_start, period_end
194
195
196
def parse_args(args):  # pylint: disable=unused-argument
    """Parsing the script arguments

    args: not used; parse_known_args() is called without an argument list,
          so argparse reads the options from sys.argv

    All options use '+' as prefix character instead of '-'. Unknown
    arguments are ignored.

    Returns the namespace with the attributes period, tags and filter.
    """

    parser = ArgumentParser(
        prefix_chars='+',
        add_help=False,
        formatter_class=RawTextHelpFormatter,
        description=HELP_TEXT,
    )

    parser.add_argument(
        '+h',
        '++help',
        action='help',
        help='Show this help message and exit.',
    )

    parser.add_argument(
        '+p',
        '++period',
        nargs=2,
        type=str,
        required=True,
        dest='period',
        help=(
            'Choose a time period that is filtering the tasks.\n'
            'Use the date format YYYY/MM/DD.'
        ),
    )

    parser.add_argument(
        '+t',
        '++tags',
        nargs='+',
        type=str,
        dest='tags',
        # The "or" has to be part of the text: as a bare Python operator
        # between the literals it cut the help off after the first part.
        help=(
            'Filter the tasks by given tag(s).\n'
            'If you pass more than one tag, they will be concatenated with '
            '"or".\n'
            'You can pass tag names, tag ids or tag name=value to this argument'
        ),
    )

    parser.add_argument(
        '+f',
        '++filter',
        nargs='+',
        type=str,
        dest='filter',
        help='Filter the results by given filter(s).',
    )

    script_args, _ = parser.parse_known_args()
    return script_args
251
252
253
def main(gmp, args):
    """Build a consolidated report for a time period and import it

    gmp: the GMP object
    args: the arguments handed over by the caller, passed on to parse_args
    """

    options = parse_args(args=args)

    period_start, period_end = parse_period(period=options.period)

    print(
        'Combining reports from tasks within the time period [{}, {}]'.format(
            period_start, period_end
        )
    )

    # without tags the tasks are selected by the period alone
    filter_tags = parse_tags(tags=options.tags) if options.tags else None

    reports = get_last_reports_from_tasks(
        gmp=gmp,
        period_start=period_start,
        period_end=period_end,
        tags=filter_tags,
    )

    print("Combining {} found reports.".format(len(reports)))

    if options.filter:
        # the filter parts given on the command line form one filter term
        filter_term = ' '.join(options.filter)
        print(
            'Filtering the results by the following filter term [{}]'.format(
                filter_term
            )
        )
    else:
        filter_term = ''
        print('No result filter given.')

    combined_report = combine_reports(
        gmp=gmp, reports=reports, filter_term=filter_term
    )

    send_report(
        gmp=gmp,
        combined_report=combined_report,
        period_start=period_start,
        period_end=period_end,
    )
300
301
302
# NOTE(review): gmp and args are not defined in this file; presumably
# gvm-script runs the script with __name__ set to '__gmp__' and provides
# both names as globals - confirm. Static analysis cannot see them, hence
# the suppression on the call.
if __name__ == '__gmp__':
    main(gmp, args)  # pylint: disable=undefined-variable
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable gmp does not seem to be defined.
Loading history...
Comprehensibility Best Practice introduced by
The variable args does not seem to be defined.
Loading history...
304