Completed
Push — master ( 92689a...85289d )
by Björn
14s queued 11s
created

create-consolidated-report.gmp.main()   A

Complexity

Conditions 3

Size

Total Lines 54
Code Lines 30

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 30
nop 2
dl 0
loc 54
rs 9.16
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
from uuid import UUID
20
from typing import List, Tuple
21
from datetime import date
22
from argparse import ArgumentParser, Namespace, RawTextHelpFormatter
23
from lxml import etree as e
24
from gvm.protocols.gmp import Gmp
25
26
from gvmtools.helper import generate_uuid, error_and_exit
27
28
# Description text of the script; passed as `description` to the
# ArgumentParser that is built in parse_args().
HELP_TEXT = (
29
    'This script creates a consolidated report and imports it to the GSM.'
30
    ' Usable with gvm-script (gvm-tools)'
31
)
32
33
34
def parse_tags(tags: List[str]) -> List[str]:
    """Turn the raw tag arguments into filter terms

    tags (List): tag arguments as given by the user; each entry is a
                 tag name, a tag id or a name=value pair

    Returns a list with one term per entry, in input order:
    tag_id="..." for entries that parse as a UUID, tag="..." for
    everything else.
    """

    def _is_uuid(candidate: str) -> bool:
        # UUID() raises ValueError for strings that are not a UUID
        try:
            UUID(candidate, version=4)
        except ValueError:
            return False
        return True

    return [
        ('tag_id="{}"' if _is_uuid(entry) else 'tag="{}"').format(entry)
        for entry in tags
    ]
51
52
53
def _split_period_date(label: str, raw_date: str) -> Tuple[int, int, int]:
    """Split a yyyy/mm/dd string into integer year, month and day

    label: 'Start' or 'End', used as prefix of the error message
    raw_date: the date string as passed by the user

    Reports the problem through error_and_exit() if the string does not
    consist of exactly three integers separated by '/'.
    """
    try:
        year, month, day = map(int, raw_date.split('/'))
    except ValueError as err:
        # NOTE(review): error_and_exit() is expected to end the script;
        # its implementation is not part of this file.
        error_and_exit(
            '{} date [{}] is not a correct date format:\n{}'.format(
                label, raw_date, err.args[0]
            )
        )
    return year, month, day


def _build_period_date(label: str, parts: Tuple[int, int, int]) -> date:
    """Create a date from (year, month, day), reporting invalid dates

    label: 'Start' or 'End', used as prefix of the error message
    parts: year, month and day as integers
    """
    try:
        return date(*parts)
    except ValueError as err:
        error_and_exit('{} date: {}'.format(label, err.args[0]))


def parse_period(period: List[str]) -> Tuple[date, date]:
    """Parsing and validating the given time period

    period (List): A list with two entries containing
                   dates in the format yyyy/mm/dd

    Returns two date-objects containing the passed dates (start, end).
    Every validation problem is reported through error_and_exit():
    a malformed date string, an impossible calendar date or an end date
    that lies before the start date.
    """
    # Both strings are split first and only then turned into dates, so a
    # malformed end date is reported before an impossible start date.
    start_parts = _split_period_date('Start', period[0])
    end_parts = _split_period_date('End', period[1])

    period_start = _build_period_date('Start', start_parts)
    period_end = _build_period_date('End', end_parts)

    if period_end < period_start:
        error_and_exit('The start date seems to be after the end date.')

    return period_start, period_end
92
93
94
def parse_args(args: Namespace) -> Namespace:  # pylint: disable=unused-argument
    """Parse the command line arguments of this script

    args: the arguments namespace handed to the script. It is not used
          here: the parser is called without an explicit argument list,
          so argparse reads the arguments from sys.argv.

    Returns the namespace with the recognised arguments period, tags and
    filter. Arguments the parser does not know are ignored because
    parse_known_args() is used. argparse itself ends the script with a
    usage message if the required +p/++period argument is missing.
    """

    # All options of this script start with '+' instead of '-'.
    # NOTE(review): presumably to keep them apart from the '-' options of
    # the calling gvm-script - confirm.
    parser = ArgumentParser(
        prefix_chars='+',
        add_help=False,
        formatter_class=RawTextHelpFormatter,
        description=HELP_TEXT,
    )

    # add_help is disabled above, so the help option is registered by hand
    # with the '+' prefix.
    parser.add_argument(
        '+h',
        '++help',
        action='help',
        help='Show this help message and exit.',
    )

    parser.add_argument(
        '+p',
        '++period',
        nargs=2,
        type=str,
        required=True,
        dest='period',
        help=(
            'Choose a time period that is filtering the tasks.\n'
            'Use the date format YYYY/MM/DD.'
        ),
    )

    parser.add_argument(
        '+t',
        '++tags',
        nargs='+',
        type=str,
        dest='tags',
        # The word "or" has to be part of the string literal: a bare `or`
        # between the literals is the boolean operator and drops
        # everything after it from the help text.
        help=(
            'Filter the tasks by given tag(s).\n'
            'If you pass more than one tag, they will be concatenated with '
            '"or".\n'
            'You can pass tag names, tag ids or tag name=value to this argument'
        ),
    )

    parser.add_argument(
        '+f',
        '++filter',
        nargs='+',
        type=str,
        dest='filter',
        help='Filter the results by given filter(s).',
    )

    script_args, _ = parser.parse_known_args()
    return script_args
149
150
151
def generate_task_filter(
    period_start: date, period_end: date, tags: List[str]
) -> str:
    """Build the filter string used to select the tasks

    period_start: the start date
    period_end: the end date
    tags: list of tag filter terms; may be empty or None

    Returns the task filter string. It always starts with 'rows=-1 '.
    Without tags it only restricts the creation date to the period; with
    tags it holds one "period and tag" term per tag, joined by ' or '.
    """
    created_between = 'created>{0} and created<{1}'.format(
        period_start.isoformat(), period_end.isoformat()
    )

    if not tags:
        return 'rows=-1 ' + created_between

    # The period is repeated for every tag so that each alternative of
    # the ' or ' chain carries the date restriction.
    alternatives = ' or '.join(
        '{} and {}'.format(created_between, tag) for tag in tags
    )
    return 'rows=-1 ' + alternatives
177
178
179
def get_last_reports_from_tasks(gmp: Gmp, task_filter: str) -> List[str]:
    """Collect the ids of the last reports of all tasks matching a filter

    gmp: the GMP object
    task_filter: task filter string

    Returns the report ids as strings, without duplicates and in the
    order in which they appear in the response.
    """

    print('Filtering the task with the filter term [{}]'.format(task_filter))

    response = gmp.get_tasks(filter=task_filter)
    found_ids = map(str, response.xpath('task/last_report/report/@id'))

    # dict keys are unique and keep their insertion order, which removes
    # duplicates ... just in case
    return list(dict.fromkeys(found_ids))
198
199
200
def combine_reports(
    gmp: Gmp, reports: List[str], filter_term: str
) -> e.Element:
    """Merge ports, results and hosts of several reports into a new report

    gmp: the GMP object
    reports (List): List of report_ids
    filter_term (str): the result filter string

    Returns the outer 'report' element. It wraps an inner 'report'
    element with the same, newly generated id, which holds a 'ports'
    element, a 'results' element and the collected elements.
    """

    report_id = generate_uuid()
    combined_report = e.Element(
        'report',
        {
            'id': report_id,
            'format_id': 'd5da9f67-8551-4e51-807b-b6a873d70e34',
            'extension': 'xml',
            'content_type': 'text/xml',
        },
    )
    inner_report = e.SubElement(combined_report, 'report', {'id': report_id})
    all_ports = e.SubElement(inner_report, 'ports', {'start': '1', 'max': '-1'})
    all_results = e.SubElement(
        inner_report, 'results', {'start': '1', 'max': '-1'}
    )

    # xpath (relative to the fetched report) -> element collecting the hits
    # NOTE(review): 'host' has no 'report/' prefix, unlike the other two
    # paths - confirm that hosts really are direct children of the
    # fetched element.
    collectors = (
        ('report/ports/port', all_ports),
        ('report/results/result', all_results),
        ('host', inner_report),
    )

    for report in reports:
        fetched = gmp.get_report(report, filter=filter_term, details=True)[0]
        for path, target in collectors:
            for node in fetched.xpath(path):
                target.append(node)

    return combined_report
241
242
243
def send_report(
    gmp: Gmp, combined_report: e.Element, period_start: date, period_end: date
) -> str:
    """Import the combined report into a newly created container task

    gmp: the GMP object
    combined_report: the combined report xml object
    period_start: the start date
    period_end: the end date

    Returns the first id found in the response of the report import.
    """

    container_task = gmp.create_container_task(
        name='Consolidated Report [{} - {}]'.format(period_start, period_end),
        comment='Created with gvm-tools.',
    )
    # '//@id' selects every id attribute of the response, the first one
    # is used
    container_id = container_task.xpath('//@id')[0]

    # the report is handed over in serialized form
    import_response = gmp.import_report(
        e.tostring(combined_report), task_id=container_id
    )
    imported_ids = import_response.xpath('//@id')

    return imported_ids[0]
267
268
269
def main(gmp: Gmp, args: Namespace) -> None:
    """Create a consolidated report for a time period and import it

    gmp: the GMP object
    args: the arguments namespace handed to the script

    Steps: parse the arguments, build the task filter from period and
    tags, collect the last reports of the matching tasks, combine them
    and import the result.
    """
    # pylint: disable=undefined-variable

    options = parse_args(args=args)

    period_start, period_end = parse_period(period=options.period)

    print(
        'Combining reports from tasks within the time period [{}, {}]'.format(
            period_start, period_end
        )
    )

    # Tags are optional, without them only the period restricts the tasks
    filter_tags = parse_tags(tags=options.tags) if options.tags else None

    task_filter = generate_task_filter(
        period_start=period_start,
        period_end=period_end,
        tags=filter_tags,
    )

    reports = get_last_reports_from_tasks(gmp=gmp, task_filter=task_filter)

    print("Combining {} found reports.".format(len(reports)))

    # The result filter is optional as well, an empty term means no filter
    if options.filter:
        filter_term = ' '.join(options.filter)
        print(
            'Filtering the results by the following filter term [{}]'.format(
                filter_term
            )
        )
    else:
        filter_term = ''
        print('No result filter given.')

    combined_report = combine_reports(
        gmp=gmp, reports=reports, filter_term=filter_term
    )

    # Import the generated report to GSM
    imported_report_id = send_report(
        gmp=gmp,
        combined_report=combined_report,
        period_start=period_start,
        period_end=period_end,
    )

    print(
        "Successfully imported new consolidated report [{}]".format(
            imported_report_id
        )
    )
323
324
325
# Entry point: main() is only run when the module name is '__gmp__'.
# NOTE(review): `gmp` and `args` are not defined anywhere in this file -
# presumably the gvm-script runner provides them as globals; confirm.
if __name__ == '__gmp__':
326
    main(gmp, args)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable args does not seem to be defined.
Loading history...
Comprehensibility Best Practice introduced by
The variable gmp does not seem to be defined.
Loading history...
327