Passed
Pull Request — master (#374)
by Jaspar
01:21
created

create-consolidated-report.gmp.main()   B

Complexity

Conditions 5

Size

Total Lines 70
Code Lines 43

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 43
nop 2
dl 0
loc 70
rs 8.3813
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

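Commonly applied refactorings include Extract Method: move a coherent group of statements into a separate, well-named function and call it from the original method.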

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
from uuid import UUID
20
from typing import List, Tuple
21
from datetime import date
22
from argparse import ArgumentParser, Namespace, RawTextHelpFormatter
23
from lxml import etree as e
24
from gvm.protocols.gmp import Gmp
25
from gvm.errors import GvmError
26
27
from gvmtools.helper import generate_uuid, error_and_exit
28
29
# Description handed to the argument parser of this script.
HELP_TEXT = (
    'This script creates a consolidated report and imports it to the GSM. '
    'Usable with gvm-script (gvm-tools)'
)
33
34
35
def parse_tags(tags: List[str]) -> List[str]:
    """Translate the given tags into filter terms

    tags (List): A list of tag specifications, each one being a
                 tag name, a tag id or name=value

    Returns a list with one term per entry, in input order:
    tag_id="..." for entries accepted by the UUID constructor and
    tag="..." for everything else
    """

    def _to_term(spec: str) -> str:
        # Only the UUID constructor decides whether this is an id;
        # anything it rejects is passed on as a tag name
        try:
            UUID(spec, version=4)
        except ValueError:
            return 'tag="{}"'.format(spec)
        return 'tag_id="{}"'.format(spec)

    return [_to_term(spec) for spec in tags]
52
53
54
def _split_date(text: str, label: str) -> Tuple[int, int, int]:
    """Split a yyyy/mm/dd string into its year, month and day integers

    text: the date string as given by the user
    label: 'Start' or 'End', only used for the error message

    Calls error_and_exit if the string does not consist of exactly
    three integers separated by '/'.
    """
    try:
        year, month, day = map(int, text.split('/'))
    except ValueError as err:
        # covers non-numeric parts as well as too few / too many parts
        # (error_and_exit is expected to terminate the script)
        error_and_exit(
            '{} date [{}] is not a correct date format:\n{}'.format(
                label, text, err.args[0]
            )
        )
    return year, month, day


def _build_date(parts: Tuple[int, int, int], label: str) -> date:
    """Create a date from (year, month, day)

    parts: the integers returned by _split_date
    label: 'Start' or 'End', only used for the error message

    Calls error_and_exit if the values do not form a valid calendar date.
    """
    try:
        return date(*parts)
    except ValueError as err:
        error_and_exit('{} date: {}'.format(label, err.args[0]))


def parse_period(period: List[str]) -> Tuple[date, date]:
    """Parsing and validating the given time period

    period (List): A list with two entries containing
                   dates in the format yyyy/mm/dd

    Returns two date-objects containing the passed dates
    (start, end). Every validation failure is reported
    through error_and_exit.
    """
    # Both strings are split before any date is built, so a malformed
    # end date is reported before an out-of-range start date.
    start_parts = _split_date(period[0], 'Start')
    end_parts = _split_date(period[1], 'End')

    period_start = _build_date(start_parts, 'Start')
    period_end = _build_date(end_parts, 'End')

    # identical start and end dates are accepted
    if period_end < period_start:
        error_and_exit('The start date seems to be after the end date.')

    return period_start, period_end
93
94
95
def parse_args(args: Namespace) -> Namespace:  # pylint: disable=unused-argument
    """Parse the arguments of this script

    args: accepted for the caller's convenience but not used; since
          parse_known_args() is called without arguments, argparse
          reads the command line of the running process instead

    All options use '+' as prefix character. Unknown arguments are
    ignored.

    Returns the Namespace with the attributes period, tags,
    filter_term and filter_id
    """

    parser = ArgumentParser(
        prefix_chars='+',
        add_help=False,
        formatter_class=RawTextHelpFormatter,
        description=HELP_TEXT,
    )

    parser.add_argument(
        '+h',
        '++help',
        action='help',
        help='Show this help message and exit.',
    )

    parser.add_argument(
        '+p',
        '++period',
        nargs=2,
        type=str,
        required=True,
        dest='period',
        help=(
            'Choose a time period that is filtering the tasks.\n'
            'Use the date format YYYY/MM/DD.'
        ),
    )

    parser.add_argument(
        '+t',
        '++tags',
        nargs='+',
        type=str,
        dest='tags',
        # The word "or" has to be part of the string: a bare `or`
        # operator between the literals would cut the help text off
        # after the first (truthy) operand.
        help=(
            'Filter the tasks by given tag(s).\n'
            'If you pass more than one tag, they will be concatenated with '
            '"or".\n'
            'You can pass tag names, tag ids or tag name=value to this argument'
        ),
    )

    # a results filter is given either as terms or as id, never both
    filter_args = parser.add_mutually_exclusive_group()

    filter_args.add_argument(
        '++filter-terms',
        nargs='+',
        type=str,
        dest='filter_term',
        help='Filter the results by given filter terms.',
    )

    filter_args.add_argument(
        '++filter-id',
        type=str,
        dest='filter_id',
        help='Filter the results by given filter id.',
    )

    script_args, _ = parser.parse_known_args()
    return script_args
158
159
160
def generate_task_filter(
    period_start: date, period_end: date, tags: List[str]
) -> str:
    """Build the filter string used to select the tasks

    period_start: the start date
    period_end: the end date
    tags: list of tag terms for the filter, may be empty or None

    Returns the task filter string. It always starts with 'rows=-1 '.
    Without tags the period condition follows; with tags the period
    condition is repeated for every tag and the per-tag terms are
    joined with ' or '.
    """
    # "last" is the timestamp of the last report in a task
    time_window = 'last>{0} and last<{1}'.format(
        period_start.isoformat(), period_end.isoformat()
    )

    if not tags:
        return 'rows=-1 ' + time_window

    alternatives = ['{} and {}'.format(time_window, tag) for tag in tags]
    return 'rows=-1 ' + ' or '.join(alternatives)
188
189
190
def get_last_reports_from_tasks(gmp: Gmp, task_filter: str) -> List[str]:
    """Collect the ids of the last reports of the tasks matching the filter

    gmp: the GMP object
    task_filter: task filter string

    Returns the report ids as strings, without duplicates and in the
    order of their first appearance in the response
    """

    print('Filtering the task with the filter term [{}]'.format(task_filter))

    response = gmp.get_tasks(filter=task_filter)
    found_ids = response.xpath('task/last_report/report/@id')

    # dict keys are unique and keep their insertion order, so this
    # removes duplicates ... just in case
    return list(dict.fromkeys(str(found_id) for found_id in found_ids))
209
210
211
def combine_reports(
    gmp: Gmp, reports: List[str], filter_term: str, filter_id: str
) -> e.Element:
    """Merge the filtered ports, results and hosts of the given
    report ids into one new report.

    gmp: the GMP object
    reports (List): List of report_ids
    filter_term (str): the result filter string, only used when no
                       filter_id is given
    filter_id (str): id of a result filter, takes precedence over
                     filter_term

    Returns the outer report element; the merged content lives in the
    nested report element that shares the same, newly generated id
    """

    combined_id = generate_uuid()

    outer_report = e.Element(
        'report',
        {
            'id': combined_id,
            'format_id': 'd5da9f67-8551-4e51-807b-b6a873d70e34',
            'extension': 'xml',
            'content_type': 'text/xml',
        },
    )
    inner_report = e.SubElement(outer_report, 'report', {'id': combined_id})
    all_ports = e.SubElement(
        inner_report, 'ports', {'start': '1', 'max': '-1'}
    )
    all_results = e.SubElement(
        inner_report, 'results', {'start': '1', 'max': '-1'}
    )

    # the same results filter applies to every report
    if filter_id:
        selection = {'filter_id': filter_id}
    else:
        selection = {'filter': filter_term}

    for report_id in reports:
        fetched = gmp.get_report(report_id, details=True, **selection)[0]

        all_ports.extend(fetched.xpath('report/ports/port'))
        all_results.extend(fetched.xpath('report/results/result'))
        # NOTE(review): ports and results are read from below 'report/'
        # while hosts are looked up directly below the fetched element
        # -- confirm against the response layout that 'host' (and not
        # 'report/host') is the intended path.
        inner_report.extend(fetched.xpath('host'))

    return outer_report
257
258
259
def send_report(
    gmp: Gmp, combined_report: e.Element, period_start: date, period_end: date
) -> str:
    """Create a container task and import the combined report into it

    gmp: the GMP object
    combined_report: the combined report xml object
    period_start: the start date
    period_end: the end date

    Returns the first id attribute found in the import response
    """

    container_response = gmp.create_container_task(
        name='Consolidated Report [{} - {}]'.format(period_start, period_end),
        comment='Created with gvm-tools.',
    )
    container_id = container_response.xpath('//@id')[0]

    # the report is handed over in serialized form
    import_response = gmp.import_report(
        e.tostring(combined_report), task_id=container_id, in_assets=True
    )

    return import_response.xpath('//@id')[0]
283
284
285
def _describe_results_filter(gmp: Gmp, parsed_args: Namespace) -> str:
    """Print which results filter is in effect and return the filter term

    gmp: the GMP object, used to look up a filter given by id
    parsed_args: the parsed script arguments

    Returns the space-joined filter terms, or an empty string when a
    filter id or no results filter at all was given
    """
    if parsed_args.filter_term:
        filter_term = ' '.join(parsed_args.filter_term)
        print(
            'Filtering the results by the following filter term [{}]'.format(
                filter_term
            )
        )
        return filter_term

    if parsed_args.filter_id:
        try:
            filter_xml = gmp.get_filter(filter_id=parsed_args.filter_id)[0]
            print(
                'Filtering the results by the following filter term '
                '[{}]'.format(filter_xml.find('term').text)
            )
        except GvmError:
            # NOTE(review): the lookup failure is only reported; the
            # caller still passes the filter id on -- confirm this is
            # intended.
            print(
                "Filter with the ID [{}] is not existing.".format(
                    parsed_args.filter_id
                )
            )
        return ''

    print('No results filter given.')
    return ''


def main(gmp: Gmp, args: Namespace) -> None:
    """Create a consolidated report and import it

    gmp: the GMP object
    args: the arguments handed over by the caller, passed on to
          parse_args

    Steps: parse the arguments, build the task filter, collect the
    last reports of the matching tasks, combine them and import the
    result.
    """
    parsed_args = parse_args(args=args)

    period_start, period_end = parse_period(period=parsed_args.period)

    print(
        'Combining reports from tasks within the time period [{}, {}]'.format(
            period_start, period_end
        )
    )

    # Generate Task Filter
    filter_tags = (
        parse_tags(tags=parsed_args.tags) if parsed_args.tags else None
    )

    task_filter = generate_task_filter(
        period_start=period_start,
        period_end=period_end,
        tags=filter_tags,
    )

    # Find reports
    reports = get_last_reports_from_tasks(gmp=gmp, task_filter=task_filter)

    print("Combining {} found reports.".format(len(reports)))

    # Determine the results filter
    filter_term = _describe_results_filter(gmp, parsed_args)

    # Combine the reports
    combined_report = combine_reports(
        gmp=gmp,
        reports=reports,
        filter_term=filter_term,
        filter_id=parsed_args.filter_id,
    )

    # Import the generated report to GSM
    report = send_report(
        gmp=gmp,
        combined_report=combined_report,
        period_start=period_start,
        period_end=period_end,
    )

    print("Successfully imported new consolidated report [{}]".format(report))


# `gmp` and `args` are not defined anywhere in this file; presumably the
# gvm-script runner provides them and sets __name__ to '__gmp__'.
if __name__ == '__gmp__':
    main(gmp, args)  # pylint: disable=undefined-variable
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable gmp does not seem to be defined.
Loading history...
Comprehensibility Best Practice introduced by
The variable args does not seem to be defined.
Loading history...
359