Completed
Push — master ( e4d3cc...f37e7f )
by Björn
15s queued 12s
created

create-cve-report-from-json.gmp.parse_args()   A

Complexity

Conditions 1

Size

Total Lines 48
Code Lines 35

Duplication

Lines 48
Ratio 100 %

Importance

Changes 0
Metric Value
cc 1
eloc 35
nop 1
dl 48
loc 48
rs 9.0399
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
import json
20
import csv
21
import datetime
22
import time
23
from pathlib import Path
24
25
from typing import Dict, Tuple
26
from argparse import ArgumentParser, RawTextHelpFormatter, Namespace
27
from lxml import etree as e
28
from cpe import CPE
29
from gvm.protocols.gmp import Gmp
30
from gvm.protocols.latest import InfoType
31
from gvmtools.helper import generate_uuid, error_and_exit
32
33
34
# Description shown in the argparse help output. The argument parser in this
# file uses RawTextHelpFormatter, which prints the text verbatim, so every
# line of the sample document carries its own explicit newline.
HELP_TEXT = (
    'This script creates a cve report from a JSON document.\n'
    'The JSON document needs to be formatted like this:\n'
    '[\n'
    '    {\n'
    '        "headings": [\n'
    '            "name",\n'
    '            "IP Address",\n'
    '            "IP range",\n'
    '            "Operating System",\n'
    '            "CPE String 23",\n'
    '            "Name",\n'
    '            "Full Version (version)",\n'
    '            "CPE String 23"\n'
    '        ],\n'
    '        ...,\n'
    '        "results": [\n'
    '            [\n'
    '                "foo",\n'
    '                "127.0.0.1",\n'
    '                "127.0.0.1/32",\n'
    '                "Some Windows",\n'
    '                "cpe:2.3:o:microsoft:some_windows:-:*:*:*:*:*:*:*",\n'
    '                [\n'
    '                    "Some Microsoftware",\n'
    '                    ...\n'
    '                ],\n'
    '                [\n'
    '                    "0.1",\n'
    '                    ...\n'
    '                ],\n'
    '                [\n'
    '                    "cpe:2.3:a:microsoft:microsoftware:0.1:*:*:*:*:*:*:*",\n'
    '                    ...\n'
    '                ]\n'
    '            ],\n'
    '        ]\n'
    '    }\n'
    ']\n'
    'Usable with gvm-script (gvm-tools). Help: gvm-script -h'
)
75
76
77
class ProgressBar:
    """Single-line textual progress bar written to stdout.

    The bar is printed once on construction and redrawn in place (by
    emitting a carriage return first) on every update() call. done()
    prints the final state and terminates the line.
    """

    def __init__(self, length: int, count: int, pl_name: str):
        """
        Args:
            length: Width of the bar in characters.
            count: Total number of entities that will be processed.
            pl_name: Plural name of the entities, used in the status text.
        """
        self.length = length
        self.count = count
        self.current = 0
        self.start_time = datetime.datetime.now()
        self.entities = pl_name

        self.eta = '???'
        self.seq = ''
        self.end = ''

        self._print()
        # Every later print starts with a carriage return so that it
        # overwrites the previously printed bar.
        self.seq = '\r'

    def _leading_zeros(self) -> str:
        """Return the padding that right-aligns the current counter."""
        return (len(str(self.count)) - len(str(self.current))) * ' '

    def _bar(self) -> str:
        """Return the bar body, always exactly `length` characters wide."""
        if self.count > 0:
            fraction = self.current / self.count
        else:
            # Nothing to process: show a complete bar instead of dividing
            # by zero.
            fraction = 1.0
        # Clamp so that an out-of-range counter cannot distort the width.
        points = min(max(int(self.length * fraction), 0), self.length)
        return "·" * points + " " * (self.length - points)

    def _print(self):
        print(
            f'{self.seq}[{self._bar()}] | '
            f'{self._leading_zeros()}{str(self.current)}/{str(self.count)} '
            f'{self.entities} processed. | '
            f'ETA: {self.eta}',
            flush=True,
            end=self.end,
        )

    def update(self, progressed):
        """Redraw the bar for `progressed` processed entities.

        The ETA is extrapolated from the time elapsed since construction.
        It stays unknown while nothing has been processed yet.
        """
        self.current = progressed
        if self.current > 0:
            elapsed = datetime.datetime.now() - self.start_time
            self.eta = str(
                elapsed / self.current * (self.count - self.current)
            )
        else:
            self.eta = '???'
        self._print()

    def done(self):
        """Draw the completed bar and finish the output line."""
        self.current = self.count
        # Trailing blanks overwrite the remains of a longer ETA string.
        self.eta = 'Done!         '
        self.end = '\n'
        self._print()
120
121
122
class ListGenerator:
    """
    Creates the CPE to CVE list used for the report generation
    in this script.
    """

    def __init__(self, gmp: Gmp, filename: Path, recreate: bool):
        """Open the list file for writing.

        Args:
            gmp: GMP connection used to request the CVE information.
            filename: Location of the list file to create.
            recreate: If True an already existing file is deleted first,
                      otherwise an existing file aborts the script.
        """
        self.gmp = gmp
        if filename.exists():
            if recreate:
                filename.unlink()
            else:
                # The list location is passed with +l/++list
                # (+f is the JSON file).
                error_and_exit(
                    f'The file "{filename}" already exists. '
                    'If you want to delete the old list and '
                    'recreate the list run with "++create-list '
                    f'recreate +l {filename}"'
                )
        # Closed again by create_cpe_list()
        self.file = open(filename, 'w')

    def _cpe_to_cve(self, resp):
        """Write the CPEs and CVEs of one response to the list.

        One line "'cpe','cve','cvss'" is written per CPE of every CVE.
        Entries without an id, a product list or a CVSS value are skipped.
        """
        # -1 because the last info tag is not a real entry
        for cve_tag in resp.findall('info')[:-1]:
            cve = cve_tag.attrib.get('id')
            cve_elem = cve_tag.find('cve')
            if cve is None or cve_elem is None:
                continue

            cpes = cve_elem.findtext('products')
            cvss = cve_elem.findtext('cvss')
            if not cpes or not cvss:
                continue

            # split() without argument copes with repeated or surrounding
            # whitespace and never yields empty CPE strings
            for cpe in cpes.split():
                print(f"'{cpe}','{cve}','{cvss}'", file=self.file)

    def create_cpe_list(self, step: int = 3000):
        """Creates a CPE to CVE list in a CSV format:
        'cpe', 'cve', 'cvss'
        The CPE's have a 1-to-1-relation to the CVE's
        so CPE's can appear more than once in this
        list

        The list file is closed when this method returns, also if a
        request fails.

        step(int): How many CVEs will be requested from the GSM
                   in one request. Be careful with higher values.
                   You will need to set the default timeout in
                   gvm-tools higher if you set step >3000. A higher
                   step will make the list generation faster.

        Raises:
            ValueError: if step is smaller than 1
        """
        try:
            if step < 1:
                # A non-positive step would never advance the loop below
                raise ValueError('step must be a positive number')

            resp = self.gmp.get_info_list(
                info_type=InfoType.CVE, filter='rows=1'
            )
            count = int(resp.find('info_count').text)
            print(f'Creating CPE to CVE list. Found {count} CVE\'s.')
            if count <= 0:
                return

            progress_bar = ProgressBar(length=100, count=count, pl_name='CVEs')
            first = 0
            while (first + step) < count:
                resp = self.gmp.get_info_list(
                    info_type=InfoType.CVE, filter=f'rows={step} first={first}'
                )
                self._cpe_to_cve(resp)
                first = first + step
                progress_bar.update(progressed=first)

            # find the rest
            resp = self.gmp.get_info_list(
                info_type=InfoType.CVE,
                filter=f'rows={count - first} first={first}',
            )
            self._cpe_to_cve(resp)
            progress_bar.done()
        finally:
            self.file.close()
200
201
202
class Report:
    """Collects per-host CVE results and turns them into a report.

    Hosts and results are accumulated with add_results(), assembled into
    one XML document by finish_report() and imported via GMP with
    send_report().
    """

    def __init__(self, gmp):
        # <results> element that receives one <result> per host and CVE
        self.results = e.Element('results', {'start': '1', 'max': '-1'})
        # <host> elements, one per add_results() call
        self.hosts = []
        # Both are set by finish_report()
        self.report = None
        self.report_id = None

        self.gmp = gmp

    def finish_report(self):
        """Assemble the report document from the collected hosts/results."""
        report_format_id = 'd5da9f67-8551-4e51-807b-b6a873d70e34'
        self.report_id = generate_uuid()
        self.report = e.Element(
            'report',
            {
                'id': self.report_id,
                'format_id': report_format_id,
                'extension': 'xml',
                'content_type': 'text/xml',
            },
        )
        owner_elem = e.SubElement(self.report, 'owner')
        e.SubElement(owner_elem, 'name').text = ''
        e.SubElement(self.report, 'name').text = 'Report created from JSON-File'

        # SubElement() already attaches the new element to its parent, so
        # neither the inner report nor the ports element is appended again.
        inner_report = e.SubElement(
            self.report, 'report', {'id': self.report_id}
        )
        e.SubElement(inner_report, 'ports', {'start': '1', 'max': '-1'})

        inner_report.append(self.results)
        inner_report.extend(self.hosts)

    def send_report(self) -> str:
        """Import the report into a new container task.

        Returns:
            The id taken from the response of the report import.
        """
        if self.report is None:
            # Assemble first, so that no container task is created for a
            # report that cannot be serialized.
            self.finish_report()

        the_time = time.strftime("%Y/%m/%d-%H:%M:%S")
        task_name = "CVE_Scan_Report_{}".format(the_time)

        res = self.gmp.create_container_task(
            name=task_name, comment="Created with gvm-tools."
        )
        task_id = res.xpath('//@id')[0]

        report = e.tostring(self.report)
        res = self.gmp.import_report(report, task_id=task_id, in_assets=True)

        return res.xpath('//@id')[0]

    def generate_host_detail(
        self,
        name,
        value,
        source_name=None,
        source_description=None,
        source_type=None,
    ):
        """Generating a host details xml element

        The <source> child is only created if a source_name is given;
        type and description are added to it when they are set.
        """
        host_detail_elem = e.Element('detail')
        e.SubElement(host_detail_elem, 'name').text = name
        e.SubElement(host_detail_elem, 'value').text = value

        if source_name:
            source_elem = e.SubElement(host_detail_elem, 'source')
            e.SubElement(source_elem, 'name').text = source_name
            if source_type:
                e.SubElement(source_elem, 'type').text = source_type
            if source_description:
                e.SubElement(
                    source_elem, 'description'
                ).text = source_description

        return host_detail_elem

    def add_results(self, ip, hostname, cpes: Dict, cpeo, os, date_time):
        """Add one host and a result for each of its CVEs to the report.

        Args:
            ip: IP address of the host.
            hostname: Name of the host.
            cpes: Mapping of a CPE to a mapping of CVE id to CVSS score.
                  CPEs with an empty CVE mapping produce no results.
            cpeo: CPE of the operating system of the host.
            os: Name of the operating system of the host.
            date_time: datetime used for all timestamps of the host and
                       its results.
        """
        host_id = generate_uuid()
        source_name = 'gvm-tools'
        date_format = '%Y-%m-%dT%H:%M:%S'
        date_time = f'{date_time.strftime(date_format)}Z'

        host_elem = e.Element('host')
        e.SubElement(host_elem, 'ip').text = ip
        e.SubElement(host_elem, 'asset', {'asset_id': host_id})
        e.SubElement(host_elem, 'start').text = date_time
        e.SubElement(host_elem, 'end').text = date_time
        host_result_count_elem = e.SubElement(host_elem, 'result_count')
        host_elem.append(
            self.generate_host_detail(
                name='hostname', value=hostname, source_name=source_name
            )
        )
        host_elem.append(
            self.generate_host_detail(
                name='best_os_txt',
                value=os,
                source_name=source_name,
                source_description="Host Details",
            )
        )
        host_elem.append(
            self.generate_host_detail(
                name='best_os_cpe',
                value=cpeo,
                source_name=source_name,
                source_description="Host Details",
            )
        )

        host_details = 0
        for cpe, cves in cpes.items():
            if not cves:
                continue
            for cve, cvss in cves.items():
                result_id = generate_uuid()
                result = e.Element('result', {'id': result_id})
                e.SubElement(result, 'name').text = f'Result for host {ip}'
                e.SubElement(result, 'comment').text = 'Imported with gvm-tools'
                e.SubElement(result, 'modification_time').text = date_time
                e.SubElement(result, 'creation_time').text = date_time
                # NOTE: the previous code built a <detection> element here
                # but never attached it to the result, so it was dropped.

                result_host_elem = e.Element('host')
                result_host_elem.text = ip
                e.SubElement(result_host_elem, 'asset', {'asset_id': host_id})
                e.SubElement(result_host_elem, 'hostname').text = hostname
                result.append(result_host_elem)

                nvt_elem = e.Element('nvt', {'oid': cve})
                e.SubElement(nvt_elem, 'type').text = 'cve'
                e.SubElement(nvt_elem, 'name').text = cve
                e.SubElement(nvt_elem, 'cvss_base').text = str(cvss)
                e.SubElement(nvt_elem, 'cve').text = cve
                result.append(nvt_elem)

                e.SubElement(result, 'severity').text = str(cvss)

                host_elem.append(
                    self.generate_host_detail(
                        name='App',
                        value=cpe,
                        source_type='cve',
                        source_name=cve,
                        source_description='CVE Scanner',
                    )
                )
                host_details = host_details + 1

                self.results.append(result)
        e.SubElement(host_result_count_elem, 'page').text = str(host_details)
        self.hosts.append(host_elem)
367
368
369
class Parser:
    """Class handles the Parsing from JSON to a Report"""

    def __init__(self, gmp: Gmp, json_file: Path, cpe_list: Path) -> None:
        """Open the CPE to CVE list and load the hosts from the JSON file.

        Args:
            gmp: GMP connection used for CPE lookups and for the report.
            json_file: JSON document containing the hosts.
            cpe_list: CSV list as written by the list generation.

        The script is aborted via error_and_exit() if one of the files is
        missing or the JSON document can not be used.
        """
        try:
            # newline='' as recommended for files handed to the csv module
            self.cpe_list = open(cpe_list, 'r', newline='')
            self.reader = csv.reader(self.cpe_list)
        except FileNotFoundError:
            # The list location is passed with +l/++list
            # (+f is the JSON file).
            error_and_exit(
                f'There is no file "{cpe_list}". '
                'Maybe you need to create a list first. Run with '
                f'argument "++create-list +l {cpe_list}", to create '
                'a new list, or pass the correct location of an existing list.'
            )
        self.gmp = gmp
        try:
            self.json_fp = open(json_file)
            self.json_dump = json.load(self.json_fp)[0]['results']
        except FileNotFoundError:
            error_and_exit(f'There is no file "{json_file}".')
        except json.JSONDecodeError as err:
            error_and_exit(f'The JSON seems to be invalid: {err.args[0]}')
        except (IndexError, KeyError, TypeError):
            # Valid JSON, but not a list whose first item has "results"
            error_and_exit("The JSON format is not correct.")

    def parse(self) -> Report:
        """Create a Report from the hosts of the loaded JSON document.

        Every entry of the results is expected to contain, in this order,
        a hostname, host_ip, host_ip_range, host_operating_system,
        host_os_cpe and the arrays found_app, app_version and app_cpe.

        Returns:
            The Report filled with the results of all hosts.
        """

        report = Report(gmp=self.gmp)

        date_time = datetime.datetime.now()

        count = len(self.json_dump)
        progressed = 0
        print(f'Found {str(count)} hosts:')
        if count == 0:
            # Nothing to do; also avoids a progress bar over zero items
            return report

        progressbar = ProgressBar(length=100, count=count, pl_name="Hosts")

        for entry in self.json_dump:
            if len(entry) < 8 or entry[3] is None:
                error_and_exit("The JSON format is not correct.")
            name = entry[0]
            ips = entry[1]
            if isinstance(ips, str):
                ips = [ips]
            elif not ips:
                ips = []
            os = entry[3]
            os_cpe = convert_cpe23_to_cpe22(entry[4])[0]

            cpes = []
            # entry[7] should be the CPEs ...
            if entry[7] is not None:
                if isinstance(entry[7], str):
                    cpes.extend(self._get_cpes(entry[7]))
                else:
                    for cpe in entry[7]:
                        if cpe:
                            cpes.extend(self._get_cpes(cpe))

            vulns = self._get_cves(cpes)
            if vulns:
                for ip in ips:
                    report.add_results(
                        ip=ip,
                        hostname=name,
                        cpes=vulns,
                        cpeo=os_cpe,
                        os=os,
                        date_time=date_time,
                    )

            progressed += 1
            progressbar.update(progressed=progressed)

        progressbar.done()
        print("Nice ...")
        print(report.results)
        return report

    def _get_cpes(self, cpe):
        """Parse and return the CPE's from the JSON.
        Convert the CPEs to v2.2 and check if they have a
        version part. If not get this CPE in all versions
        from the GSM and return them. This may result in
        a lot of false positives or false negatives.
        """
        cpe22, any_version = convert_cpe23_to_cpe22(cpe)
        if any_version is False:
            return [cpe22]

        cpe_xml = self.gmp.get_info_list(
            info_type=InfoType.CPE, filter='rows=-1 uuid~"{}:"'.format(cpe22)
        )
        infos = cpe_xml.findall('info')
        # -1 because the last info tag is not a real entry
        return [info.get('id') for info in infos[:-1]]

    def _get_cves(self, cpes):
        """Get CVEs for the CPEs from the CSV List

        Returns:
            A dict with one key per given CPE. Each value maps the ids of
            the CVEs listed for that CPE to their CVSS score and is empty
            if the list contains no CVE for the CPE.
        """
        # Create the per-CPE dicts once. Creating them inside the row loop
        # would throw away the matches of all earlier rows.
        vulns = {cpe: {} for cpe in cpes}
        for row in self.reader:  # O(n)
            if len(row) < 3:
                continue  # blank or incomplete line
            for cpe in cpes:
                if cpe in row[0]:
                    vulns[cpe][row[1].strip("'")] = float(row[2].strip("'"))
        # Rewind so that the list can be searched again for the next host
        self.cpe_list.seek(0)

        return vulns

    def finish_lookup(self):
        """Close the JSON file and the CPE to CVE list."""
        self.json_fp.close()
        self.cpe_list.close()
490
491
492
def convert_cpe23_to_cpe22(cpe: str) -> Tuple[str, bool]:
    """Translate a CPE v2.3 into its CPE v2.2 representation.

    Returns:
        A tuple of the CPE v2.2 string and a flag which is True
        if the given CPE names no product version ('*').
    """
    # TODO: this conversion could be improved
    parsed = CPE(cpe)
    any_version = parsed.get_version()[0] == '*'
    as_v22 = str(CPE(parsed.as_uri_2_3(), CPE.VERSION_2_2))
    return as_v22.replace('CPE v2.2: ', ''), any_version
506
507
508 View Code Duplication
def parse_args(args: Namespace) -> Namespace:  # pylint: disable=unused-argument
    """Parse the arguments of this script.

    The options use '+' as prefix character instead of '-'. The arguments
    are read from the command line of the process; the passed `args` are
    not evaluated. Unknown arguments are ignored.

    Returns:
        The Namespace with `create_list` ('no_creation', 'create' or
        'recreate'), `list` (location of the CPE to CVE list, required)
        and `json_file` (JSON file to parse, or None).
    """

    parser = ArgumentParser(
        prefix_chars='+',
        add_help=False,
        formatter_class=RawTextHelpFormatter,
        description=HELP_TEXT,
    )

    parser.add_argument(
        '+h',
        '++help',
        action='help',
        help='Show this help message and exit.',
    )

    # Without a value "++create-list" means 'create'
    parser.add_argument(
        '++create-list',
        nargs='?',
        type=str,
        choices=('no_creation', 'recreate', 'create'),
        const='create',
        default='no_creation',
        dest="create_list",
        help="Create the CPE to CVE helper list",
    )

    parser.add_argument(
        '+l',
        '++list',
        type=str,
        dest="list",
        required=True,
        help="Path to the CPE to CVE helper list",
    )

    parser.add_argument(
        '+f',
        '++file',
        type=str,
        dest="json_file",
        help="File that should be parsed",
    )

    args, _ = parser.parse_known_args()

    return args
556
557
558
def main(gmp, args):
    """Create the CPE to CVE list and/or import a report from a JSON file.

    Args:
        gmp: GMP connection used for all requests.
        args: Arguments handed to the script; passed on to parse_args().
    """

    parsed_args = parse_args(args=args)

    if parsed_args.create_list != 'no_creation':
        print("Generating CPE to CVE list.")
        list_generator = ListGenerator(
            gmp,
            filename=Path(parsed_args.list).absolute(),
            recreate=parsed_args.create_list == 'recreate',
        )
        list_generator.create_cpe_list()
        print("Generation of CPE to CVE list done.")
    if parsed_args.json_file:
        parser = Parser(
            gmp=gmp, json_file=parsed_args.json_file, cpe_list=parsed_args.list
        )
        try:
            report = parser.parse()
        finally:
            # The input files are not needed once the hosts are parsed;
            # close them even if parsing fails.
            parser.finish_lookup()

        report.finish_report()
        report_id = report.send_report()
        print(f"Imported Report [{report_id}]")
581
582
583
# Entry point: runs only when this module is loaded under the name '__gmp__'.
# `gmp` and `args` are not defined anywhere in this file; presumably
# gvm-script injects them as globals before executing the script
# -- TODO confirm.
if __name__ == '__gmp__':
    main(gmp, args)  # pylint: disable=undefined-variable
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable args does not seem to be defined.
Loading history...
Comprehensibility Best Practice introduced by
The variable gmp does not seem to be defined.
Loading history...
585