Passed
Pull Request — master (#376)
by Jaspar
01:21
created

CPELookup.get_cves()   A

Complexity

Conditions 5

Size

Total Lines 18
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 16
nop 2
dl 0
loc 18
rs 9.1333
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
import json
20
import csv
21
import datetime
22
import time
23
24
from pathlib import Path
25
from typing import Dict, Tuple
26
from argparse import ArgumentParser, RawTextHelpFormatter, Namespace
27
from lxml import etree as e
28
from cpe import CPE
29
from gvm.protocols.gmp import Gmp
30
from gvm.protocols.latest import InfoType
31
from gvm.xml import pretty_print
32
from gvmtools.helper import generate_uuid, error_and_exit
33
34
35
HELP_TEXT = (
36
    'This script creates a cve report from a JSON document.\n'
37
    'The JSON document needs to be formatted like this: '
38
    '['
39
    '    {'
40
    '        "headings": ['
41
    '            "name",'
42
    '            "IP Address",'
43
    '            "IP range",'
44
    '            "Operating System",'
45
    '            "CPE String 23",'
46
    '            "Name",'
47
    '            "Full Version (version)",'
48
    '            "CPE String 23"'
49
    '        ],'
50
    '        ...,'
51
    '        "results": ['
52
    '            ['
53
    '                "foo",'
54
    '                "127.0.0.1",'
55
    '                "127.0.0.1/32",'
56
    '                "Some Windows",'
57
    '                "cpe:2.3:o:microsoft:some_windows:-:*:*:*:*:*:*:*",'
58
    '                ['
59
    '                    "Some Microsoftware",'
60
    '                    .'
61
    '                ],'
62
    '                ['
63
    '                    "0.1",'
64
    '                    ...'
65
    '                ],'
66
    '                ['
67
    '                    "cpe:2.3:a:microsoft:microsoftware:0.1:*:*:*:*:*:*:*",'
68
    '                    ...'
69
    '                ]'
70
    '            ],'
71
    '        ]'
72
    '    }'
73
    ']'
74
    ' Usable with gvm-script (gvm-tools). Help: gvm-script -h'
75
)
76
77
78
class CPELookup:
79
    """Class handles the CPEs"""
80
81
    def __init__(self, filename: Path):
82
        try:
83
            self.file = open(filename, 'r')
84
            self.reader = csv.reader(self.file)
85
        except FileNotFoundError:
86
            error_and_exit(
87
                f'There is no file "{filename}". '
88
                'Maybe you need to create a list first. Run with '
89
                f'argument "++create-list +f {filename}", to create '
90
                'a new list, or pass the correct location of an existing list.'
91
            )
92
93
    def get_cves(self, cpes):
94
        """Get CVEs for the CPEs from the CSV List"""
95
        d1 = datetime.datetime.now()
96
        print(f'Serching CVEs for {str(len(cpes))}:', end=None)
97
        vulns = {}
98
        i = 0
99
        for cpe in cpes:
100
            vulns[cpe] = {}
101
        for row in self.reader:  # O(n)
102
            for cpe in cpes:
103
                if cpe in row[0]:
104
                    vulns[cpe][row[1].strip("'")] = float(row[2].strip("'"))
105
                    i = i + 1
106
        self.file.seek(0)
107
        d2 = datetime.datetime.now()
108
        print(f'Found {str(i)} CVEs. Time consumed: {str(d2 - d1)}')
109
110
        return vulns
111
112
    def finish_lookup(self):
113
        self.file.close()
114
115
116
class ListGenerator:
117
    """
118
    Creating the CPE to CVE list used for the report generation
119
    in this this script.
120
    """
121
122
    def __init__(self, gmp: Gmp, filename: Path, recreate: bool):
123
        self.gmp = gmp
124
        if filename.exists():
125
            if recreate:
126
                filename.unlink()
127
            else:
128
                error_and_exit(
129
                    f'The file "{filename}" already exists. '
130
                    'If you want to delete the old list and '
131
                    'recreate the list run with "++create-list '
132
                    f'recreate +f {filename}"'
133
                )
134
        self.file = open(filename, 'w')
135
136
    def _cpe_to_cve(self, resp):
137
        """ Write the CPEs and CVEs to the list """
138
        cve_tags = resp.findall('info')
139
        for cve_tag in cve_tags[
140
            :-1
141
        ]:  # -1 because the last info tag is a wrongy. :D
142
            cve = None
143
            cpes = None
144
            if 'id' in cve_tag.attrib:
145
                cve = cve_tag.attrib['id']
146
                cpes = cve_tag.find('cve').find('products').text
147
                cvss = cve_tag.find('cve').find('cvss').text
148
                if cpes:
149
                    for cpe in cpes.strip().split(' '):
150
                        print(
151
                            f"'{cpe}','{cve}','{cvss}'",
152
                            file=self.file,
153
                            end='\n',
154
                        )
155
156
    def create_cve_list(self, step: int = 3000):
157
        """Creates a CPE to CVE list in a CSV format:
158
        'cpe', 'cve', 'cvss'
159
        The CPE's have a 1-to-1-relation to the CVE's
160
        so CPE's can appear more then once in this
161
        list
162
163
        step(int): How many CVEs will be requested from the GSM
164
                   in one request. Be careful with higher values.
165
                   You will need to set the default timeout in
166
                   gvm-tools higher if you set step >3000. A higher
167
                   step will make the list generation faster.
168
        """
169
        resp = self.gmp.get_info_list(info_type=InfoType.CVE, filter='rows=1')
170
        count = resp.find('info_count').text
171
172
        print(f'Creating CPE to CVE list. Found {count} CVE\'s.')
173
174
        first = 0
175
        counter = int(count)
176
        d1 = datetime.datetime.now()
177
        print(f'[{" " * 50}] | ({str(first)}/{count})', flush=True, end='')
178
        while counter > step:
179
            resp = self.gmp.get_info_list(
180
                info_type=InfoType.CVE, filter=f'rows={step} first={first}'
181
            )
182
            # refresh the counters
183
            counter = counter - step
184
            first = first + step
185
186
            self._cpe_to_cve(resp)
187
            points = int(((first / int(count)) * 100) / 2)
188
            percent = str("." * points + " " * (50 - points))
189
            print(
190
                f'\r[{percent}] | ({str(first)}/{count}) '
191
                f'TIME CONSUMED: {str(datetime.datetime.now() - d1)}',
192
                flush=True,
193
                end='',
194
            )
195
196
        # find the rest
197
        resp = self.gmp.get_info_list(
198
            info_type=InfoType.CVE, filter=f'rows={counter} first={first}'
199
        )
200
        self._cpe_to_cve(resp)
201
202
        self.file.close()
203
204
205
class Report:
206
    def __init__(self, gmp):
207
        self.results = e.Element('results', {'start': '1', 'max': '-1'})
208
        self.hosts = []
209
        self.report = None
210
211
        self.gmp = gmp
212
213
    def finish_report(self):
214
        report_format_id = 'd5da9f67-8551-4e51-807b-b6a873d70e34'
215
        self.report_id = generate_uuid()
216
        self.report = e.Element(
217
            'report',
218
            {
219
                'id': self.report_id,
220
                'format_id': report_format_id,
221
                'extension': 'xml',
222
                'content_type': 'text/xml',
223
            },
224
        )
225
        owner_elem = e.SubElement(self.report, 'owner')
226
        e.SubElement(owner_elem, 'name').text = ''
227
        e.SubElement(self.report, 'name').text = 'Report created from JSON-File'
228
229
        inner_report = e.SubElement(
230
            self.report, 'report', {'id': self.report_id}
231
        )
232
        ports_elem = e.SubElement(
233
            inner_report, 'ports', {'start': '1', 'max': '-1'}
234
        )
235
236
        inner_report.append(ports_elem)
237
        inner_report.append(self.results)
238
        for host in self.hosts:
239
            pretty_print(host)
240
            inner_report.append(host)
241
        self.report.append(inner_report)
242
243
    def send_report(self) -> str:
244
        the_time = time.strftime("%Y/%m/%d-%H:%M:%S")
245
        task_id = ''
246
        task_name = "CVE_Scan_Report_{}".format(the_time)
247
248
        res = self.gmp.create_container_task(
249
            name=task_name, comment="Created with gvm-tools."
250
        )
251
252
        task_id = res.xpath('//@id')[0]
253
254
        report = e.tostring(self.report)
255
256
        res = self.gmp.import_report(report, task_id=task_id, in_assets=True)
257
258
        return res.xpath('//@id')[0]
259
260
    def generate_host_detail(
261
        self,
262
        name,
263
        value,
264
        source_name=None,
265
        source_description=None,
266
        source_type=None,
267
    ):
268
        """ Generating a host details xml element """
269
        host_detail_elem = e.Element('detail')
270
        e.SubElement(host_detail_elem, 'name').text = name
271
        e.SubElement(host_detail_elem, 'value').text = value
272
273
        if source_name:
274
            source_elem = e.SubElement(host_detail_elem, 'source')
275
            e.SubElement(source_elem, 'name').text = source_name
276
            if source_type:
277
                e.SubElement(source_elem, 'type').text = source_type
278
            if source_description:
279
                e.SubElement(
280
                    source_elem, 'description'
281
                ).text = source_description
282
283
        return host_detail_elem
284
285
    def add_results(self, ip, hostname, cpes: Dict, cpeo, os, date_time):
286
        host_elem = e.Element('host')
287
        host_id = generate_uuid()
288
        e.SubElement(host_elem, 'ip').text = ip
289
        e.SubElement(host_elem, 'asset', {'asset_id': host_id})
290
291
        source_name = 'gvm-tools'
292
        host_elem.append(
293
            self.generate_host_detail(
294
                name='hostname', value=hostname, source_name=source_name
295
            )
296
        )
297
        host_elem.append(
298
            self.generate_host_detail(
299
                name='best_os_txt',
300
                value=os,
301
                source_name=source_name,
302
                source_description="Host Details",
303
            )
304
        )
305
        host_elem.append(
306
            self.generate_host_detail(
307
                name='best_os_cpe',
308
                value=cpeo,
309
                source_name=source_name,
310
                source_description="Host Details",
311
            )
312
        )
313
314
        date_format = '%Y-%m-%dT%H:%M:%S'
315
        date_time = f'{date_time.strftime(date_format)}+01:00'
316
317
        for cpe, cves in cpes.items():
318
            if cves:
319
                for cve, cvss in cves.items():
320
                    result_id = generate_uuid()
321
                    result = e.Element('result', {'id': result_id})
322
                    e.SubElement(result, 'name').text = f'Result for host {ip}'
323
                    e.SubElement(
324
                        result, 'comment'
325
                    ).text = 'Imported with gvm-tools'
326
                    e.SubElement(result, 'modification_time').text = date_time
327
                    e.SubElement(result, 'creation_time').text = date_time
328
                    detect_elem = e.Element('detection')
329
                    detect_result_elem = e.SubElement(
330
                        detect_elem, 'result', {'id': result_id}
331
                    )
332
                    details_elem = e.SubElement(detect_result_elem, 'details')
333
                    # We need to add the detection details here
334
                    # but actually they are not imported to GSM anyways ...
335
                    e.SubElement(details_elem, 'detail')
336
337
                    result_host_elem = e.Element('host')
338
                    result_host_elem.text = ip
339
                    e.SubElement(host_elem, 'asset', {'asset_id': host_id})
340
                    e.SubElement(result_host_elem, 'hostname').text = hostname
341
                    result.append(result_host_elem)
342
343
                    nvt_elem = e.Element('nvt', {'oid': cve})
344
                    e.SubElement(nvt_elem, 'type').text = 'cve'
345
                    e.SubElement(nvt_elem, 'name').text = cve
346
                    e.SubElement(nvt_elem, 'cvss_base').text = str(cvss)
347
                    e.SubElement(nvt_elem, 'cve').text = cve
348
349
                    result.append(nvt_elem)
350
351
                    e.SubElement(result, 'severity').text = str(cvss)
352
353
                    host_elem.append(
354
                        self.generate_host_detail(
355
                            name='App',
356
                            value=cpe,
357
                            source_type='cve',
358
                            source_name=cve,
359
                            source_description='CVE Scanner',
360
                        )
361
                    )
362
363
                    self.results.append(result)
364
365
        self.hosts.append(host_elem)
366
367
368
class Hosts:
369
    """Class to store the host elements"""
370
371
    def __init__(self):
372
        self.hosts = []
373
374
    def add_host(self):
375
        pass
376
377
378
def convert_cpe23_to_cpe22(cpe: str) -> Tuple[str, bool]:
379
    """Convert a CPE v2.3 to a CPE v2.2
380
    returns the CPE v2.2 and True if no product
381
    version is given
382
    """
383
    # MAKE ME BETTER!!!
384
    cpe = CPE(cpe)
385
    any_version = False
386
    if cpe.get_version()[0] == '*':
387
        any_version = True
388
    return (
389
        str(CPE(cpe.as_uri_2_3(), CPE.VERSION_2_2)).replace('CPE v2.2: ', ''),
390
        any_version,
391
    )
392
393
394 View Code Duplication
def parse_args(args: Namespace) -> Namespace:  # pylint: disable=unused-argument
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
395
    """ Parsing args ... """
396
397
    parser = ArgumentParser(
398
        prefix_chars='+',
399
        add_help=False,
400
        formatter_class=RawTextHelpFormatter,
401
        description=HELP_TEXT,
402
    )
403
404
    parser.add_argument(
405
        '+h',
406
        '++help',
407
        action='help',
408
        help='Show this help message and exit.',
409
    )
410
411
    parser.add_argument(
412
        '++create-list',
413
        nargs='?',
414
        type=str,
415
        choices=('no_creation', 'recreate', 'create'),
416
        const='create',
417
        default='no_creation',
418
        dest="create_list",
419
        help="Create the CPE to CVE helper list",
420
    )
421
422
    parser.add_argument(
423
        '+l',
424
        '++list',
425
        type=str,
426
        dest="list",
427
        required=True,
428
        help="Create the CPE to CVE helper list",
429
    )
430
431
    parser.add_argument(
432
        '+f',
433
        '++file',
434
        type=str,
435
        dest="json_file",
436
        help="File that should be parsed",
437
    )
438
439
    args, _ = parser.parse_known_args()
440
441
    return args
442
443
444
def get_cpe(gmp, cpe):
445
    """Parse and return the CPE's from the JSON.
446
    Convert the CPEs to v2.2 and check if they have a
447
    version part. If not get this CPE in all versions
448
    from the GSM and return them. This may result in
449
    a lot of false positives or false negatives.
450
    """
451
    cpe = convert_cpe23_to_cpe22(cpe)
452
    if cpe[1] is False:
453
        return [cpe[0]]
454
455
    cpes = []
456
    cpe_xml = gmp.get_info_list(
457
        info_type=InfoType.CPE, filter='rows=-1 uuid~"{}:"'.format(cpe[0])
458
    )
459
    infos = cpe_xml.findall('info')
460
    for cpe in infos[:-1]:  # -1 because the last info tag is a wrongy. :D
461
        cpes.append(cpe.get('id'))
462
    return cpes
463
464
465
def parse_json(gmp, hosts_dump, cpe_list):
466
    """Loads an JSON file and extracts host informations:
467
468
    Args:
469
        host_dump: the dumped json results, containing a hostname,
470
                   host_ip, host_ip_range, host_operating_system,
471
                   host_os_cpe, arrays of found_app, app_version,
472
                   app_cpe
473
    """
474
475
    report = Report(gmp=gmp)
476
477
    date_time = datetime.datetime.now()
478
479
    for entry in hosts_dump:
480
        if entry[3] is None:
481
            error_and_exit("The JSON format is not correct.")
482
        name = entry[0]
483
        print(f"Creating Results for the host {name}")
484
        ips = entry[1]
485
        if isinstance(ips, str):
486
            ips = [ips]
487
        os = entry[3]
488
        os_cpe = convert_cpe23_to_cpe22(entry[4])[0]
489
490
        cpes = []
491
        # entry[7] should be the CPEs ...
492
        if entry[7] is not None:
493
            if isinstance(entry[7], str):
494
                cpes.extend(get_cpe(gmp, entry[7]))
495
            else:
496
                for cpe in entry[7]:
497
                    if cpe:
498
                        cpes.extend(get_cpe(gmp, cpe))
499
500
        vulns = cpe_list.get_cves(cpes)
501
        if vulns:
502
            print("Found CVEs!")
503
            for ip in ips:
504
                report.add_results(
505
                    ip=ip,
506
                    hostname=name,
507
                    cpes=vulns,
508
                    cpeo=os_cpe,
509
                    os=os,
510
                    date_time=date_time,
511
                )
512
513
    report.finish_report()
514
    report_id = report.send_report()
515
    print(f"New CVE Scanner Report {report_id} imported.")
516
517
518
def main(gmp, args):
519
    # pylint: disable=undefined-variable
520
521
    parsed_args = parse_args(args=args)
522
523
    recreate = False
524
    if parsed_args.create_list == 'recreate':
525
        recreate = True
526
    if parsed_args.create_list != 'no_creation':
527
        print("Generating CVE to CPE list.")
528
        list_generator = ListGenerator(
529
            gmp, filename=Path(parsed_args.list).absolute(), recreate=recreate
530
        )
531
        list_generator.create_cve_list()
532
        print("Generation of CVE to CPE list done.")
533
    if parsed_args.json_file:
534
        cpe_list = CPELookup(parsed_args.list)
535
        print("Looking up hosts ...")
536
        with open(parsed_args.json_file, 'r') as fp:
537
            parse_json(gmp, json.load(fp)[0]['results'], cpe_list)
538
539
540
if __name__ == '__gmp__':
541
    main(gmp, args)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable args does not seem to be defined.
Loading history...
Comprehensibility Best Practice introduced by
The variable gmp does not seem to be defined.
Loading history...
542