Passed
Pull Request — master (#376)
by Jaspar
01:26
created

create_cve_report_from_json.gmp.Parser.parse()   C

Complexity

Conditions 10

Size

Total Lines 60
Code Lines 39

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 10
eloc 39
nop 1
dl 0
loc 60
rs 5.9999
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

Complexity

Complex classes like create_cve_report_from_json.gmp.Parser.parse() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
import json
20
import csv
21
import datetime
22
import time
23
24
from pathlib import Path
25
from typing import Dict, Tuple
26
from argparse import ArgumentParser, RawTextHelpFormatter, Namespace
27
from lxml import etree as e
28
from cpe import CPE
29
from gvm.protocols.gmp import Gmp
30
from gvm.protocols.latest import InfoType
31
from gvm.xml import pretty_print
32
from gvmtools.helper import generate_uuid, error_and_exit
33
34
35
# Description shown by "+h/++help". The argument parser in this file uses
# RawTextHelpFormatter, which prints the text verbatim, so every line of the
# JSON example carries its own explicit newline.
HELP_TEXT = (
    'This script creates a cve report from a JSON document.\n'
    'The JSON document needs to be formatted like this:\n'
    '[\n'
    '    {\n'
    '        "headings": [\n'
    '            "name",\n'
    '            "IP Address",\n'
    '            "IP range",\n'
    '            "Operating System",\n'
    '            "CPE String 23",\n'
    '            "Name",\n'
    '            "Full Version (version)",\n'
    '            "CPE String 23"\n'
    '        ],\n'
    '        ...,\n'
    '        "results": [\n'
    '            [\n'
    '                "foo",\n'
    '                "127.0.0.1",\n'
    '                "127.0.0.1/32",\n'
    '                "Some Windows",\n'
    '                "cpe:2.3:o:microsoft:some_windows:-:*:*:*:*:*:*:*",\n'
    '                [\n'
    '                    "Some Microsoftware",\n'
    '                    ...\n'
    '                ],\n'
    '                [\n'
    '                    "0.1",\n'
    '                    ...\n'
    '                ],\n'
    '                [\n'
    '                    "cpe:2.3:a:microsoft:microsoftware:0.1:*:*:*:*:*:*:*",\n'
    '                    ...\n'
    '                ]\n'
    '            ],\n'
    '        ]\n'
    '    }\n'
    ']\n'
    '\n'
    'Usable with gvm-script (gvm-tools). Help: gvm-script -h'
)
76
77
78
class ProgressBar:
    """Single-line console progress bar with a simple ETA estimate.

    The bar is printed once on construction and re-printed in place (using a
    leading carriage return) on every ``update()`` / ``done()`` call.

    Args:
        length: Width of the bar in characters.
        count: Total number of entities to process. ``0`` is allowed and is
            rendered as an already completed bar.
        pl_name: Plural name of the processed entities, used in the label.
    """

    def __init__(self, length: int, count: int, pl_name: str):
        self.length = length
        self.count = count
        self.current = 0
        self.start_time = datetime.datetime.now()
        self.entities = pl_name

        self.eta = '???'
        self.seq = ''
        self.end = ''

        self._print()
        # Every later print starts with '\r' so it overwrites the first one.
        self.seq = '\r'

    def _leading_zeros(self) -> str:
        """Return the spaces needed to right-align current against count."""
        return (len(str(self.count)) - len(str(self.current))) * ' '

    def _bar(self) -> str:
        """Return the bar body: filled points followed by padding."""
        if self.count <= 0:
            # Nothing to process: avoid dividing by zero, show a full bar.
            fraction = 1.0
        else:
            # Clamp so an overshooting counter cannot overflow the bar.
            fraction = min(1.0, max(0.0, self.current / self.count))
        points = int(self.length * fraction)
        return "·" * points + " " * (self.length - points)

    def _print(self):
        """Print the bar, the processed counter and the ETA on one line."""
        print(
            f'{self.seq}[{self._bar()}] | '
            f'{self._leading_zeros()}{str(self.current)}/{str(self.count)} '
            f'{self.entities} processed. | '
            f'ETA: {self.eta}',
            flush=True,
            end=self.end,
        )

    def update(self, progressed):
        """Set the number of processed entities and redraw the bar.

        The ETA is the average time per processed entity multiplied by the
        number of remaining entities. It stays unchanged while nothing has
        been processed yet, because no average can be computed then.
        """
        self.current = progressed
        if self.current > 0:
            elapsed = datetime.datetime.now() - self.start_time
            remaining = max(self.count - self.current, 0)
            self.eta = str(elapsed / self.current * remaining)
        self._print()

    def done(self):
        """Mark everything as processed and finish the line with a newline."""
        self.current = self.count
        # Trailing blanks overwrite the remains of a longer ETA string.
        self.eta = 'Done!         '
        self.end = '\n'
        self._print()
121
122
123
class ListGenerator:
    """
    Creating the CPE to CVE list used for the report generation
    in this script.

    Args:
        gmp: GMP connection used to request the CVE information.
        filename: Location of the list file that will be written.
        recreate: If True an already existing file is deleted first,
            otherwise an existing file makes the script exit with an error.
    """

    def __init__(self, gmp: Gmp, filename: Path, recreate: bool):
        self.gmp = gmp
        if filename.exists():
            if recreate:
                filename.unlink()
            else:
                # The list location is passed with "+l"; "+f" is the JSON file.
                error_and_exit(
                    f'The file "{filename}" already exists. '
                    'If you want to delete the old list and '
                    'recreate the list run with "++create-list '
                    f'recreate +l {filename}"'
                )
        # Stays open until create_cve_list() has finished writing.
        self.file = open(filename, 'w')

    def _cpe_to_cve(self, resp):
        """ Write the CPEs and CVEs of one response to the list """
        # The last <info> tag is skipped: the original author notes that it
        # is not a real entry.
        for cve_tag in resp.findall('info')[:-1]:
            if 'id' not in cve_tag.attrib:
                continue
            cve = cve_tag.attrib['id']
            cve_elem = cve_tag.find('cve')
            cpes = cve_elem.find('products').text
            cvss = cve_elem.find('cvss').text
            if not cpes:
                continue
            for cpe in cpes.strip().split(' '):
                print(
                    f"'{cpe}','{cve}','{cvss}'",
                    file=self.file,
                    end='\n',
                )

    def create_cve_list(self, step: int = 3000):
        """Creates a CPE to CVE list in a CSV format:
        'cpe', 'cve', 'cvss'
        The CPE's have a 1-to-1-relation to the CVE's
        so CPE's can appear more then once in this
        list

        The list file is closed when this method returns, also if one of
        the requests fails.

        step(int): How many CVEs will be requested from the GSM
                   in one request. Be careful with higher values.
                   You will need to set the default timeout in
                   gvm-tools higher if you set step >3000. A higher
                   step will make the list generation faster.
        """
        try:
            # A one row request is enough to learn the total number of CVEs.
            resp = self.gmp.get_info_list(
                info_type=InfoType.CVE, filter='rows=1'
            )
            count = int(resp.find('info_count').text)

            print(f'Creating CPE to CVE list. Found {count} CVE\'s.')
            if count == 0:
                # Nothing to request; leave an empty list behind.
                return

            first = 0
            progress_bar = ProgressBar(length=100, count=count, pl_name='CVEs')
            while (first + step) < count:
                resp = self.gmp.get_info_list(
                    info_type=InfoType.CVE, filter=f'rows={step} first={first}'
                )
                first = first + step

                self._cpe_to_cve(resp)
                progress_bar.update(progressed=first)

            # find the rest
            resp = self.gmp.get_info_list(
                info_type=InfoType.CVE,
                filter=f'rows={count - first} first={first}',
            )
            self._cpe_to_cve(resp)
            progress_bar.done()
        finally:
            self.file.close()
204
205
206
class Report:
    """Collects hosts and CVE results and imports them as a GMP report.

    Typical use: call ``add_results()`` once per host address, then
    ``finish_report()`` to assemble the XML document and ``send_report()``
    to import it into a newly created container task.

    Args:
        gmp: GMP connection used to create the task and import the report.
    """

    def __init__(self, gmp):
        self.results = e.Element('results', {'start': '1', 'max': '-1'})
        self.hosts = []
        self.report = None
        self.report_id = None

        self.gmp = gmp

    def finish_report(self):
        """Assemble the report document from the collected results/hosts.

        The document is stored in ``self.report`` and its freshly generated
        id in ``self.report_id``.
        """
        report_format_id = 'd5da9f67-8551-4e51-807b-b6a873d70e34'
        self.report_id = generate_uuid()
        self.report = e.Element(
            'report',
            {
                'id': self.report_id,
                'format_id': report_format_id,
                'extension': 'xml',
                'content_type': 'text/xml',
            },
        )
        owner_elem = e.SubElement(self.report, 'owner')
        e.SubElement(owner_elem, 'name').text = ''
        e.SubElement(self.report, 'name').text = 'Report created from JSON-File'

        # SubElement already attaches the new element to its parent, so no
        # additional append of inner_report or the ports element is needed.
        inner_report = e.SubElement(
            self.report, 'report', {'id': self.report_id}
        )
        e.SubElement(inner_report, 'ports', {'start': '1', 'max': '-1'})
        inner_report.append(self.results)
        inner_report.extend(self.hosts)

    def send_report(self) -> str:
        """Create a container task, import the report and return its id.

        If ``finish_report()`` has not been called yet it is called here, so
        that a document always exists before it is serialized.

        Returns:
            The first ``id`` attribute found in the import response.
        """
        if self.report is None:
            self.finish_report()

        the_time = time.strftime("%Y/%m/%d-%H:%M:%S")
        task_name = "CVE_Scan_Report_{}".format(the_time)

        res = self.gmp.create_container_task(
            name=task_name, comment="Created with gvm-tools."
        )
        task_id = res.xpath('//@id')[0]

        report = e.tostring(self.report)
        res = self.gmp.import_report(report, task_id=task_id, in_assets=True)

        return res.xpath('//@id')[0]

    def generate_host_detail(
        self,
        name,
        value,
        source_name=None,
        source_description=None,
        source_type=None,
    ):
        """Generating a host details xml element

        The <source> child (with optional <type> and <description>) is only
        created when a source_name is given.
        """
        host_detail_elem = e.Element('detail')
        e.SubElement(host_detail_elem, 'name').text = name
        e.SubElement(host_detail_elem, 'value').text = value

        if source_name:
            source_elem = e.SubElement(host_detail_elem, 'source')
            e.SubElement(source_elem, 'name').text = source_name
            if source_type:
                e.SubElement(source_elem, 'type').text = source_type
            if source_description:
                e.SubElement(
                    source_elem, 'description'
                ).text = source_description

        return host_detail_elem

    def _build_host(self, ip, hostname, cpeo, os, host_id, timestamp):
        """Create the <host> element with its three base host details.

        Returns:
            Tuple of the host element and its (still empty) <result_count>
            child.
        """
        source_name = 'gvm-tools'

        host_elem = e.Element('host')
        e.SubElement(host_elem, 'ip').text = ip
        e.SubElement(host_elem, 'asset', {'asset_id': host_id})
        e.SubElement(host_elem, 'start').text = timestamp
        e.SubElement(host_elem, 'end').text = timestamp
        result_count_elem = e.SubElement(host_elem, 'result_count')
        host_elem.append(
            self.generate_host_detail(
                name='hostname', value=hostname, source_name=source_name
            )
        )
        host_elem.append(
            self.generate_host_detail(
                name='best_os_txt',
                value=os,
                source_name=source_name,
                source_description="Host Details",
            )
        )
        host_elem.append(
            self.generate_host_detail(
                name='best_os_cpe',
                value=cpeo,
                source_name=source_name,
                source_description="Host Details",
            )
        )
        return host_elem, result_count_elem

    def _build_result(self, ip, hostname, host_id, cve, cvss, timestamp):
        """Create the <result> element for one CVE found on one host."""
        result = e.Element('result', {'id': generate_uuid()})
        e.SubElement(result, 'name').text = f'Result for host {ip}'
        e.SubElement(result, 'comment').text = 'Imported with gvm-tools'
        e.SubElement(result, 'modification_time').text = timestamp
        e.SubElement(result, 'creation_time').text = timestamp
        # NOTE: no <detection> element is added to the result. The original
        # author remarks that detection details are not imported anyway.

        result_host_elem = e.SubElement(result, 'host')
        result_host_elem.text = ip
        e.SubElement(result_host_elem, 'asset', {'asset_id': host_id})
        e.SubElement(result_host_elem, 'hostname').text = hostname

        nvt_elem = e.SubElement(result, 'nvt', {'oid': cve})
        e.SubElement(nvt_elem, 'type').text = 'cve'
        e.SubElement(nvt_elem, 'name').text = cve
        e.SubElement(nvt_elem, 'cvss_base').text = str(cvss)
        e.SubElement(nvt_elem, 'cve').text = cve

        e.SubElement(result, 'severity').text = str(cvss)
        return result

    def add_results(self, ip, hostname, cpes: Dict, cpeo, os, date_time):
        """Add one host and a result for each of its CVEs to the report.

        Args:
            ip: Address of the host.
            hostname: Name of the host.
            cpes: Mapping of CPE to a mapping of CVE id to CVSS score.
                CPEs with an empty mapping produce no results.
            cpeo: CPE of the operating system of the host.
            os: Name of the operating system of the host.
            date_time: datetime used for all timestamps of the host and
                its results.
        """
        host_id = generate_uuid()
        timestamp = f'{date_time.strftime("%Y-%m-%dT%H:%M:%S")}Z'

        host_elem, result_count_elem = self._build_host(
            ip, hostname, cpeo, os, host_id, timestamp
        )

        host_details = 0
        for cpe, cves in cpes.items():
            if not cves:
                continue
            for cve, cvss in cves.items():
                # Every CVE is recorded twice: as a host detail naming the
                # affected CPE and as a result of the report.
                host_elem.append(
                    self.generate_host_detail(
                        name='App',
                        value=cpe,
                        source_type='cve',
                        source_name=cve,
                        source_description='CVE Scanner',
                    )
                )
                self.results.append(
                    self._build_result(
                        ip, hostname, host_id, cve, cvss, timestamp
                    )
                )
                host_details = host_details + 1

        e.SubElement(result_count_elem, 'page').text = str(host_details)
        self.hosts.append(host_elem)
371
372
373
class Parser:
    """Class handles the Parsing from JSON to a Report

    Args:
        gmp: GMP connection, used to look up CPEs and passed to the Report.
        json_file: JSON document with the hosts. The "results" of its first
            element are parsed.
        cpe_list: CPE to CVE list in CSV format. It stays open until
            finish_lookup() is called.
    """

    def __init__(self, gmp: Gmp, json_file: Path, cpe_list: Path) -> None:
        try:
            self.cpe_list = open(cpe_list, 'r')
            self.reader = csv.reader(self.cpe_list)
        except FileNotFoundError:
            # The list location is passed with "+l"; "+f" is the JSON file.
            error_and_exit(
                f'There is no file "{cpe_list}". '
                'Maybe you need to create a list first. Run with '
                f'argument "++create-list +l {cpe_list}", to create '
                'a new list, or pass the correct location of an existing list.'
            )
        self.gmp = gmp
        try:
            # The document is loaded completely, so the file can be closed
            # right away.
            with open(json_file) as json_fp:
                self.json_dump = json.load(json_fp)[0]['results']
        except FileNotFoundError:
            error_and_exit(f'There is no file "{json_file}".')
        except json.JSONDecodeError as err:
            error_and_exit(f'The JSON seems to be invalid: {err.args[0]}')
        except (IndexError, KeyError, TypeError):
            # Valid JSON, but not a list whose first item has "results".
            error_and_exit("The JSON format is not correct.")

    def parse(self) -> Report:
        """Creates a Report from the hosts of the loaded JSON document.

        Every entry of the results is expected to be a list with:
        hostname (0), one IP address or a list of them (1), IP range (2),
        operating system (3), CPE v2.3 of the operating system (4) and the
        CPEs v2.3 of the found applications (7).

        Returns:
            The Report with the results added. It still needs to be
            finished and sent by the caller.
        """
        report = Report(gmp=self.gmp)

        date_time = datetime.datetime.now()

        count = len(self.json_dump)
        print(f'Found {str(count)} hosts:')
        if count == 0:
            # Without hosts there is nothing to show progress for.
            return report

        progressbar = ProgressBar(length=100, count=count, pl_name="Hosts")

        for progressed, entry in enumerate(self.json_dump, start=1):
            if entry[3] is None:
                error_and_exit("The JSON format is not correct.")
            name = entry[0]
            ips = entry[1]
            if isinstance(ips, str):
                ips = [ips]
            os_name = entry[3]
            os_cpe = convert_cpe23_to_cpe22(entry[4])[0]

            # entry[7] should be the CPEs ...
            vulns = self._get_cves(self._collect_cpes(entry[7]))
            if vulns:
                for ip in ips:
                    report.add_results(
                        ip=ip,
                        hostname=name,
                        cpes=vulns,
                        cpeo=os_cpe,
                        os=os_name,
                        date_time=date_time,
                    )

            progressbar.update(progressed=progressed)

        progressbar.done()
        print("Nice ...")
        return report

    def _collect_cpes(self, raw_cpes):
        """Return the CPEs v2.2 for the application CPE field of an entry.

        The field may be None, a single CPE string or a list of CPE strings
        in which empty items are ignored.
        """
        if raw_cpes is None:
            return []
        if isinstance(raw_cpes, str):
            return list(self._get_cpes(raw_cpes))

        cpes = []
        for raw_cpe in raw_cpes:
            if raw_cpe:
                cpes.extend(self._get_cpes(raw_cpe))
        return cpes

    def _get_cpes(self, cpe):
        """Parse and return the CPE's from the JSON.
        Convert the CPEs to v2.2 and check if they have a
        version part. If not get this CPE in all versions
        from the GSM and return them. This may result in
        a lot of false positives or false negatives.
        """
        cpe22, any_version = convert_cpe23_to_cpe22(cpe)
        if any_version is False:
            return [cpe22]

        cpe_xml = self.gmp.get_info_list(
            info_type=InfoType.CPE, filter='rows=-1 uuid~"{}:"'.format(cpe22)
        )
        # The last <info> tag is skipped: the original author notes that it
        # is not a real entry.
        return [info.get('id') for info in cpe_xml.findall('info')[:-1]]

    def _get_cves(self, cpes):
        """Get CVEs for the CPEs from the CSV List

        Returns:
            Mapping of every given CPE to a mapping of CVE id to CVSS
            score. CPEs without a match map to an empty dict.
        """
        # Create the per-CPE dicts once, before reading the list. Resetting
        # them per row would throw away all matches of earlier rows.
        vulns = {cpe: {} for cpe in cpes}
        for row in self.reader:  # O(n)
            if len(row) < 3:
                # Blank or incomplete line
                continue
            for cpe in cpes:
                if cpe in row[0]:
                    vulns[cpe][row[1].strip("'")] = float(row[2].strip("'"))
        # Rewind, so the next lookup reads the list from the start again.
        self.cpe_list.seek(0)

        return vulns

    def finish_lookup(self):
        """Close the CPE to CVE list. No lookups are possible afterwards."""
        self.cpe_list.close()
497
498
499
def convert_cpe23_to_cpe22(cpe: str) -> Tuple[str, bool]:
    """Translate a CPE v2.3 string into its CPE v2.2 form.

    Returns:
        A tuple of the CPE v2.2 string and a flag that is True when the
        first version of the given CPE is the wildcard '*', i.e. no product
        version is given.
    """
    # MAKE ME BETTER!!!
    parsed = CPE(cpe)
    any_version = parsed.get_version()[0] == '*'
    cpe22 = CPE(parsed.as_uri_2_3(), CPE.VERSION_2_2)
    # The string form of the object starts with a 'CPE v2.2: ' label that
    # is not part of the CPE itself.
    return str(cpe22).replace('CPE v2.2: ', ''), any_version
513
514
515 View Code Duplication
def parse_args(args: Namespace) -> Namespace:  # pylint: disable=unused-argument
    """Parse the options of this script.

    The options use '+' as prefix character. parse_known_args() is called
    without arguments, so the command line of the process is parsed and
    unknown options are ignored; the given args are not used.

    Returns:
        Namespace with create_list, list and json_file.
    """

    parser = ArgumentParser(
        prefix_chars='+',
        add_help=False,
        formatter_class=RawTextHelpFormatter,
        description=HELP_TEXT,
    )

    parser.add_argument(
        '+h',
        '++help',
        action='help',
        help='Show this help message and exit.',
    )

    parser.add_argument(
        '++create-list',
        nargs='?',
        type=str,
        choices=('no_creation', 'recreate', 'create'),
        const='create',
        default='no_creation',
        dest="create_list",
        help="Create the CPE to CVE helper list",
    )

    parser.add_argument(
        '+l',
        '++list',
        type=str,
        dest="list",
        required=True,
        help="Location of the CPE to CVE helper list",
    )

    parser.add_argument(
        '+f',
        '++file',
        type=str,
        dest="json_file",
        help="File that should be parsed",
    )

    script_args, _ = parser.parse_known_args()

    return script_args
563
564
565
def main(gmp, args):
    """Optionally create the CPE to CVE list, then import a JSON report.

    The list is (re)created unless ++create-list is 'no_creation'. A report
    is only created and imported when a JSON file was given.
    """
    # pylint: disable=undefined-variable

    options = parse_args(args=args)
    list_mode = options.create_list

    if list_mode != 'no_creation':
        print("Generating CVE to CPE list.")
        list_path = Path(options.list).absolute()
        generator = ListGenerator(
            gmp, filename=list_path, recreate=(list_mode == 'recreate')
        )
        generator.create_cve_list()
        print("Generation of CVE to CPE list done.")

    if not options.json_file:
        return

    json_parser = Parser(
        gmp=gmp, json_file=options.json_file, cpe_list=options.list
    )
    report = json_parser.parse()
    report.finish_report()
    imported_id = report.send_report()
    print(f"Imported Report [{imported_id}]")
588
589
590
# Entry point. Presumably gvm-script runs this file with __name__ set to
# '__gmp__' and provides the names gmp and args - TODO confirm against the
# gvm-tools documentation.
if __name__ == '__gmp__':
    main(gmp, args)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable args does not seem to be defined.
Loading history...
Comprehensibility Best Practice introduced by
The variable gmp does not seem to be defined.
Loading history...
592