Completed
Push — master ( da320a...e2fa58 )
by Björn
15s queued 12s
created

create-consolidated-report.gmp.main()   B

Complexity

Conditions 5

Size

Total Lines 72
Code Lines 44

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 44
nop 2
dl 0
loc 72
rs 8.3573
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
from uuid import UUID
20
from typing import List, Tuple
21
from datetime import date
22
from argparse import ArgumentParser, Namespace, RawTextHelpFormatter
23
from lxml import etree as e
24
from gvm.protocols.gmp import Gmp
25
from gvm.errors import GvmError
26
27
from gvmtools.helper import generate_uuid, error_and_exit
28
29
# Description text handed to the argument parser (shown in the help output).
HELP_TEXT = ' '.join(
    [
        'This script creates a consolidated report and imports it to the GSM.',
        'Usable with gvm-script (gvm-tools)',
    ]
)
33
34
35
def parse_tags(tags: List[str]) -> List[str]:
    """Turn the given tags into filter expressions

    tags (List): A list containing tags:
                 name, tag-id, name=value

    Returns a list with one entry per tag, in the same order:
    tag_id="..." for entries that parse as a UUID, tag="..." otherwise
    """

    def to_filter_expression(tag: str) -> str:
        # An entry that can be read as a UUID is treated as a tag id,
        # everything else as a tag name (or name=value pair).
        try:
            UUID(tag, version=4)
        except ValueError:
            return 'tag="{}"'.format(tag)
        return 'tag_id="{}"'.format(tag)

    return [to_filter_expression(tag) for tag in tags]
52
53
54
def _parse_period_date(raw: str, label: str) -> date:
    """Convert a single yyyy/mm/dd string into a date-object

    raw (str): the date string as passed by the user
    label (str): 'Start' or 'End'; used as prefix of the error messages

    Returns the date-object. On invalid input error_and_exit() is called
    with a descriptive message (presumably terminating the script).
    """
    try:
        year, month, day = map(int, raw.split('/'))
    except ValueError as err:
        # raised for non-numeric parts as well as a wrong number of parts
        error_and_exit(
            '{} date [{}] is not a correct date format:\n{}'.format(
                label, raw, err.args[0]
            )
        )

    try:
        return date(year, month, day)
    except ValueError as err:
        # raised for out-of-range values, e.g. month 13 or day 31 in June
        error_and_exit('{} date: {}'.format(label, err.args[0]))


def parse_period(period: List[str]) -> Tuple[date, date]:
    """Parsing and validating the given time period

    period (List): A list with two entries containing
                   dates in the format yyyy/mm/dd

    Returns two date-objects containing the passed dates
    (start, end). Invalid dates, or an end date that lies before
    the start date, are reported through error_and_exit().
    """
    period_start = _parse_period_date(period[0], 'Start')
    period_end = _parse_period_date(period[1], 'End')

    if period_end < period_start:
        error_and_exit('The start date seems to be after the end date.')

    return period_start, period_end
93
94
95
def parse_args(args: Namespace) -> Namespace:  # pylint: disable=unused-argument
    """Parsing the script specific command line arguments

    args: accepted for interface compatibility but not used; the
          parser is run without explicit arguments and therefore
          reads the process command line

    Returns the Namespace with period, tags, filter_term and filter_id.
    Unknown arguments are ignored.
    """

    # '+' is used as prefix character instead of '-' (see the option
    # names below); the built-in help is replaced by an explicit +h.
    parser = ArgumentParser(
        prefix_chars='+',
        add_help=False,
        formatter_class=RawTextHelpFormatter,
        description=HELP_TEXT,
    )

    parser.add_argument(
        '+h',
        '++help',
        action='help',
        help='Show this help message and exit.',
    )

    parser.add_argument(
        '+p',
        '++period',
        nargs=2,
        type=str,
        required=True,
        dest='period',
        help=(
            'Choose a time period that is filtering the tasks.\n'
            'Use the date format YYYY/MM/DD.'
        ),
    )

    parser.add_argument(
        '+t',
        '++tags',
        nargs='+',
        type=str,
        dest='tags',
        # The word "or" must be part of the string literal: a bare `or`
        # between the literals is evaluated as an operator and silently
        # cuts off everything after it.
        help=(
            'Filter the tasks by given tag(s).\n'
            'If you pass more than one tag, they will be concatenated with '
            '"or".\n'
            'You can pass tag names, tag ids or tag name=value to this argument'
        ),
    )

    # a results filter is given either as terms or as a filter id
    filter_args = parser.add_mutually_exclusive_group()

    filter_args.add_argument(
        '++filter-terms',
        nargs='+',
        type=str,
        dest='filter_term',
        help='Filter the results by given filter terms.',
    )

    filter_args.add_argument(
        '++filter-id',
        type=str,
        dest='filter_id',
        help='Filter the results by given filter id.',
    )

    script_args, _ = parser.parse_known_args()
    return script_args
158
159
160
def generate_task_filter(
    period_start: date, period_end: date, tags: List[str]
) -> str:
    """Build the filter string used to select the tasks

    period_start: the start date
    period_end: the end date
    tags: list of tag filter expressions; may be empty or None

    Returns the task filter string. Without tags it only restricts the
    period; with tags the period restriction is repeated for every tag
    and the parts are joined with ' or '.
    """
    # "last" refers to the timestamp of the last report in a task
    period_filter = 'last>{0} and last<{1}'.format(
        period_start.isoformat(), period_end.isoformat()
    )

    if not tags:
        return 'rows=-1 ' + period_filter

    return 'rows=-1 ' + ' or '.join(
        '{} and {}'.format(period_filter, tag) for tag in tags
    )
188
189
190
def get_last_reports_from_tasks(gmp: Gmp, task_filter: str) -> List[str]:
    """Collect the ids of the last reports of all tasks matching the filter

    gmp: the GMP object
    task_filter: task filter string

    Returns the report ids as strings, without duplicates, in the
    order in which they appear in the response
    """

    print('Filtering the task with the filter term [{}]'.format(task_filter))

    tasks_xml = gmp.get_tasks(filter=task_filter)
    report_ids = [
        str(report_id)
        for report_id in tasks_xml.xpath('task/last_report/report/@id')
    ]

    # dict keys are unique and keep their insertion order, so this drops
    # duplicates (just in case) without reordering the ids
    return list(dict.fromkeys(report_ids))
209
210
211
def combine_reports(
    gmp: Gmp, reports: List[str], filter_term: str, filter_id: str
) -> e.Element:
    """Combining the filtered ports, results and hosts of the given
    report ids into one new report.

    gmp: the GMP object
    reports (List): List of report_ids
    filter_term (str): the result filter string; only used when no
                       filter_id is given
    filter_id (str): id of a stored results filter; takes precedence
                     over filter_term

    Returns the outer report element of the combined report. Reports
    that cannot be fetched are reported on stdout and skipped.
    """

    new_uuid = generate_uuid()
    combined_report = e.Element(
        'report',
        {
            'id': new_uuid,
            'format_id': 'd5da9f67-8551-4e51-807b-b6a873d70e34',
            'extension': 'xml',
            'content_type': 'text/xml',
        },
    )
    # the actual content lives in a nested report element with the same id
    report_elem = e.Element('report', {'id': new_uuid})

    ports_elem = e.Element('ports', {'start': '1', 'max': '-1'})
    results_elem = e.Element('results', {'start': '1', 'max': '-1'})
    combined_report.append(report_elem)
    report_elem.append(ports_elem)
    report_elem.append(results_elem)

    for report in reports:
        try:
            if filter_id:
                response = gmp.get_report(
                    report, filter_id=filter_id, details=True
                )
            else:
                response = gmp.get_report(
                    report, filter=filter_term, details=True
                )
        except GvmError:
            print("Could not find the report [{}]".format(report))
            # Skip this report: without the continue the code below would
            # run on an unbound name (first iteration) or re-add the data
            # of the previously fetched report.
            continue

        current_report = response.find('report')
        if current_report is None:
            # response without a report element; nothing to merge
            print("Could not find the report [{}]".format(report))
            continue

        for port in current_report.xpath('report/ports/port'):
            ports_elem.append(port)
        for result in current_report.xpath('report/results/result'):
            results_elem.append(result)
        # NOTE(review): ports and results are read from the nested
        # 'report' element, hosts from the outer one - confirm that
        # 'host' (and not 'report/host') is the intended path.
        for host in current_report.xpath('host'):
            report_elem.append(host)

    return combined_report
260
261
262
def send_report(
    gmp: Gmp, combined_report: e.Element, period_start: date, period_end: date
) -> str:
    """Create a container task and import the combined report into it

    gmp: the GMP object
    combined_report: the combined report xml object
    period_start: the start date
    period_end: the end date

    Returns the first id attribute found in the import response
    """

    container_task = gmp.create_container_task(
        name='Consolidated Report [{} - {}]'.format(period_start, period_end),
        comment='Created with gvm-tools.',
    )
    # the first id attribute of the response is used as the task id
    task_id = container_task.xpath('//@id')[0]

    # the report is sent in serialized form
    import_response = gmp.import_report(
        e.tostring(combined_report), task_id=task_id, in_assets=True
    )

    return import_response.xpath('//@id')[0]
286
287
288
def _announce_results_filter(gmp: Gmp, parsed_args: Namespace) -> str:
    """Print which results filter is going to be used

    gmp: the GMP object
    parsed_args: the parsed script arguments (filter_term, filter_id)

    Returns the filter term string built from the ++filter-terms
    argument, or an empty string if a filter id or no filter was given
    """
    if parsed_args.filter_term:
        filter_term = ' '.join(parsed_args.filter_term)
        print(
            'Filtering the results by the following filter term [{}]'.format(
                filter_term
            )
        )
        return filter_term

    if parsed_args.filter_id:
        # look the stored filter up only to show its term to the user
        try:
            filter_xml = gmp.get_filter(filter_id=parsed_args.filter_id).find(
                'filter'
            )
            print(
                'Filtering the results by the following filter term '
                '[{}]'.format(filter_xml.find('term').text)
            )
        except GvmError:
            print(
                "Filter with the ID [{}] is not existing.".format(
                    parsed_args.filter_id
                )
            )
        return ''

    print('No results filter given.')
    return ''


def main(gmp: Gmp, args: Namespace) -> None:
    """Create a consolidated report for a time period and import it

    gmp: the GMP object
    args: the arguments namespace handed over by the caller

    Steps: parse the arguments, build the task filter, collect the last
    reports of the matching tasks, merge them into one report and
    import that report into a new container task.
    """

    parsed_args = parse_args(args=args)

    period_start, period_end = parse_period(period=parsed_args.period)

    print(
        'Combining reports from tasks within the time period [{}, {}]'.format(
            period_start, period_end
        )
    )

    # Generate Task Filter
    filter_tags = None
    if parsed_args.tags:
        filter_tags = parse_tags(tags=parsed_args.tags)

    task_filter = generate_task_filter(
        period_start=period_start,
        period_end=period_end,
        tags=filter_tags,
    )

    # Find reports
    reports = get_last_reports_from_tasks(gmp=gmp, task_filter=task_filter)

    print("Combining {} found reports.".format(len(reports)))

    filter_term = _announce_results_filter(gmp=gmp, parsed_args=parsed_args)

    # Combine the reports
    combined_report = combine_reports(
        gmp=gmp,
        reports=reports,
        filter_term=filter_term,
        filter_id=parsed_args.filter_id,
    )

    # Import the generated report to GSM
    report = send_report(
        gmp=gmp,
        combined_report=combined_report,
        period_start=period_start,
        period_end=period_end,
    )

    print("Successfully imported new consolidated report [{}]".format(report))
360
361
362
# `gmp` and `args` are not defined anywhere in this file; presumably
# gvm-script runs the script with __name__ set to '__gmp__' and provides
# both names as globals - TODO confirm. The lint suppression belongs on
# the line that actually uses them.
if __name__ == '__gmp__':
    main(gmp, args)  # pylint: disable=undefined-variable
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable gmp does not seem to be defined.
Loading history...
Comprehensibility Best Practice introduced by
The variable args does not seem to be defined.
Loading history...
364