Passed
Pull Request — master (#376)
by Jaspar
02:03
created

CPELookup.finish_lookup()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
import json
20
import csv
21
import datetime
22
import time
23
24
from pathlib import Path
25
from typing import Dict, Tuple
26
from argparse import ArgumentParser, RawTextHelpFormatter, Namespace
27
from lxml import etree as e
28
from cpe import CPE
29
from gvm.protocols.gmp import Gmp
30
from gvm.protocols.latest import InfoType
31
from gvm.xml import pretty_print
32
from gvmtools.helper import generate_uuid, error_and_exit
33
34
35
HELP_TEXT = (
36
    'This script creates a cve report from a JSON document.\n'
37
    'The JSON document needs to be formatted like this: '
38
    '['
39
    '    {'
40
    '        "headings": ['
41
    '            "name",'
42
    '            "IP Address",'
43
    '            "IP range",'
44
    '            "Operating System",'
45
    '            "CPE String 23",'
46
    '            "Name",'
47
    '            "Full Version (version)",'
48
    '            "CPE String 23"'
49
    '        ],'
50
    '        ...,'
51
    '        "results": ['
52
    '            ['
53
    '                "foo",'
54
    '                "127.0.0.1",'
55
    '                "127.0.0.1/32",'
56
    '                "Some Windows",'
57
    '                "cpe:2.3:o:microsoft:some_windows:-:*:*:*:*:*:*:*",'
58
    '                ['
59
    '                    "Some Microsoftware",'
60
    '                    .'
61
    '                ],'
62
    '                ['
63
    '                    "0.1",'
64
    '                    ...'
65
    '                ],'
66
    '                ['
67
    '                    "cpe:2.3:a:microsoft:microsoftware:0.1:*:*:*:*:*:*:*",'
68
    '                    ...'
69
    '                ]'
70
    '            ],'
71
    '        ]'
72
    '    }'
73
    ']'
74
    ' Usable with gvm-script (gvm-tools). Help: gvm-script -h'
75
)
76
77
78
class CPELookup:
79
    """Class handles the CPEs"""
80
81
    def __init__(self, filename: Path):
82
        try:
83
            self.file = open(filename, 'r')
84
            self.reader = csv.reader(self.file)
85
        except FileNotFoundError:
86
            error_and_exit(
87
                f'There is no file "{filename}". '
88
                'Maybe you need to create a list first. Run with '
89
                f'argument "++create-list +f {filename}", to create '
90
                'a new list, or pass the correct location of an existing list.'
91
            )
92
93
    def get_cves(self, cpes):
94
        """Get CVEs for the CPEs from the CSV List"""
95
        d1 = datetime.datetime.now()
96
        print(f'Serching CVEs for {str(len(cpes))}:', end=None)
97
        vulns = {}
98
        i = 0
99
        for cpe in cpes:
100
            vulns[cpe] = {}
101
        for row in self.reader:  # O(n)
102
            for cpe in cpes:
103
                if cpe in row[0]:
104
                    vulns[cpe][row[1].strip("'")] = float(row[2].strip("'"))
105
                    i = i + 1
106
        self.file.seek(0)
107
        d2 = datetime.datetime.now()
108
        print(f'Found {str(i)} CVEs. Time consumed: {str(d2 - d1)}')
109
110
        return vulns
111
112
    def finish_lookup(self):
113
        self.file.close()
114
115
116
class ListGenerator:
117
    """
118
    Creating the CPE to CVE list used for the report generation
119
    in this this script.
120
    """
121
122
    def __init__(self, gmp: Gmp, filename: Path, recreate: bool):
123
        self.gmp = gmp
124
        if filename.exists():
125
            if recreate:
126
                filename.unlink()
127
            else:
128
                error_and_exit(
129
                    f'The file "{filename}" already exists. '
130
                    'If you want to delete the old list and '
131
                    'recreate the list run with "++create-list '
132
                    f'recreate +f {filename}"'
133
                )
134
        self.file = open(filename, 'w')
135
136
    def _cpe_to_cve(self, resp):
137
        """ Write the CPEs and CVEs to the list """
138
        cve_tags = resp.findall('info')
139
        for cve_tag in cve_tags[
140
            :-1
141
        ]:  # -1 because the last info tag is a wrongy. :D
142
            cve = None
143
            cpes = None
144
            if 'id' in cve_tag.attrib:
145
                cve = cve_tag.attrib['id']
146
                cpes = cve_tag.find('cve').find('products').text
147
                cvss = cve_tag.find('cve').find('cvss').text
148
                if cpes:
149
                    for cpe in cpes.strip().split(' '):
150
                        print(
151
                            f"'{cpe}','{cve}','{cvss}'",
152
                            file=self.file,
153
                            end='\n',
154
                        )
155
156
    def create_cve_list(self, step: int = 3000):
157
        """Creates a CPE to CVE list in a CSV format:
158
        'cpe', 'cve', 'cvss'
159
        The CPE's have a 1-to-1-relation to the CVE's
160
        so CPE's can appear more then once in this
161
        list
162
163
        step(int): How many CVEs will be requested from the GSM
164
                   in one request. Be careful with higher values.
165
                   You will need to set the default timeout in
166
                   gvm-tools higher if you set step >3000. A higher
167
                   step will make the list generation faster.
168
        """
169
        resp = self.gmp.get_info_list(info_type=InfoType.CVE, filter='rows=1')
170
        count = resp.find('info_count').text
171
172
        print(f'Creating CPE to CVE list. Found {count} CVE\'s.')
173
174
        first = 0
175
        counter = int(count)
176
        d1 = datetime.datetime.now()
177
        print(f'[{" " * 50}] | ({str(first)}/{count})', flush=True, end='')
178
        while counter > step:
179
            resp = self.gmp.get_info_list(
180
                info_type=InfoType.CVE, filter=f'rows={step} first={first}'
181
            )
182
            # refresh the counters
183
            counter = counter - step
184
            first = first + step
185
186
            self._cpe_to_cve(resp)
187
            points = int(((first / int(count)) * 100) / 2)
188
            percent = str("." * points + " " * (50 - points))
189
            print(
190
                f'\r[{percent}] | ({str(first)}/{count}) '
191
                f'TIME CONSUMED: {str(datetime.datetime.now() - d1)}',
192
                flush=True,
193
                end='',
194
            )
195
196
        # find the rest
197
        resp = self.gmp.get_info_list(
198
            info_type=InfoType.CVE, filter=f'rows={counter} first={first}'
199
        )
200
        self._cpe_to_cve(resp)
201
202
        self.file.close()
203
204
205
class Report:
206
    def __init__(self, gmp):
207
        self.results = e.Element('results', {'start': '1', 'max': '-1'})
208
        self.hosts = []
209
        self.report = None
210
211
        self.gmp = gmp
212
213
    def finish_report(self):
214
        report_format_id = 'd5da9f67-8551-4e51-807b-b6a873d70e34'
215
        self.report_id = generate_uuid()
216
        self.report = e.Element(
217
            'report',
218
            {
219
                'id': self.report_id,
220
                'format_id': report_format_id,
221
                'extension': 'xml',
222
                'content_type': 'text/xml',
223
            },
224
        )
225
        owner_elem = e.SubElement(self.report, 'owner')
226
        e.SubElement(owner_elem, 'name').text = ''
227
        e.SubElement(self.report, 'name').text = 'Report created from JSON-File'
228
229
        inner_report = e.SubElement(
230
            self.report, 'report', {'id': self.report_id}
231
        )
232
        ports_elem = e.SubElement(
233
            inner_report, 'ports', {'start': '1', 'max': '-1'}
234
        )
235
236
        inner_report.append(ports_elem)
237
        inner_report.append(self.results)
238
        for host in self.hosts:
239
            inner_report.append(host)
240
        self.report.append(inner_report)
241
242
    def send_report(self) -> str:
243
        the_time = time.strftime("%Y/%m/%d-%H:%M:%S")
244
        task_id = ''
245
        task_name = "CVE_Scan_Report_{}".format(the_time)
246
247
        res = self.gmp.create_container_task(
248
            name=task_name, comment="Created with gvm-tools."
249
        )
250
251
        task_id = res.xpath('//@id')[0]
252
253
        report = e.tostring(self.report)
254
255
        res = self.gmp.import_report(report, task_id=task_id, in_assets=True)
256
257
        return res.xpath('//@id')[0]
258
259
    def generate_host_detail(
260
        self,
261
        name,
262
        value,
263
        source_name=None,
264
        source_description=None,
265
        source_type=None,
266
    ):
267
        """ Generating a host details xml element """
268
        host_detail_elem = e.Element('detail')
269
        e.SubElement(host_detail_elem, 'name').text = name
270
        e.SubElement(host_detail_elem, 'value').text = value
271
272
        if source_name:
273
            source_elem = e.SubElement(host_detail_elem, 'source')
274
            e.SubElement(source_elem, 'name').text = source_name
275
            if source_type:
276
                e.SubElement(source_elem, 'type').text = source_type
277
            if source_description:
278
                e.SubElement(
279
                    source_elem, 'description'
280
                ).text = source_description
281
282
        return host_detail_elem
283
284
    def add_results(self, ip, hostname, cpes: Dict, cpeo, os, date_time):
285
        host_id = generate_uuid()
286
        source_name = 'gvm-tools'
287
        date_format = '%Y-%m-%dT%H:%M:%S'
288
        date_time = f'{date_time.strftime(date_format)}Z'
289
290
        host_elem = e.Element('host')
291
        e.SubElement(host_elem, 'ip').text = ip
292
        e.SubElement(host_elem, 'asset', {'asset_id': host_id})
293
        e.SubElement(host_elem, 'start').text = date_time
294
        e.SubElement(host_elem, 'end').text = date_time
295
        host_result_count_elem = e.SubElement(host_elem, 'result_count')
296
        host_elem.append(
297
            self.generate_host_detail(
298
                name='hostname', value=hostname, source_name=source_name
299
            )
300
        )
301
        host_elem.append(
302
            self.generate_host_detail(
303
                name='best_os_txt',
304
                value=os,
305
                source_name=source_name,
306
                source_description="Host Details",
307
            )
308
        )
309
        host_elem.append(
310
            self.generate_host_detail(
311
                name='best_os_cpe',
312
                value=cpeo,
313
                source_name=source_name,
314
                source_description="Host Details",
315
            )
316
        )
317
318
        host_details = 0
319
        for cpe, cves in cpes.items():
320
            if cves:
321
                for cve, cvss in cves.items():
322
                    result_id = generate_uuid()
323
                    result = e.Element('result', {'id': result_id})
324
                    e.SubElement(result, 'name').text = f'Result for host {ip}'
325
                    e.SubElement(
326
                        result, 'comment'
327
                    ).text = 'Imported with gvm-tools'
328
                    e.SubElement(result, 'modification_time').text = date_time
329
                    e.SubElement(result, 'creation_time').text = date_time
330
                    detect_elem = e.Element('detection')
331
                    detect_result_elem = e.SubElement(
332
                        detect_elem, 'result', {'id': result_id}
333
                    )
334
                    details_elem = e.SubElement(detect_result_elem, 'details')
335
                    # We need to add the detection details here
336
                    # but actually they are not imported to GSM anyways ...
337
                    e.SubElement(details_elem, 'detail')
338
339
                    result_host_elem = e.Element('host')
340
                    result_host_elem.text = ip
341
                    e.SubElement(
342
                        result_host_elem, 'asset', {'asset_id': host_id}
343
                    )
344
                    e.SubElement(result_host_elem, 'hostname').text = hostname
345
                    result.append(result_host_elem)
346
347
                    nvt_elem = e.Element('nvt', {'oid': cve})
348
                    e.SubElement(nvt_elem, 'type').text = 'cve'
349
                    e.SubElement(nvt_elem, 'name').text = cve
350
                    e.SubElement(nvt_elem, 'cvss_base').text = str(cvss)
351
                    e.SubElement(nvt_elem, 'cve').text = cve
352
353
                    result.append(nvt_elem)
354
355
                    e.SubElement(result, 'severity').text = str(cvss)
356
357
                    host_elem.append(
358
                        self.generate_host_detail(
359
                            name='App',
360
                            value=cpe,
361
                            source_type='cve',
362
                            source_name=cve,
363
                            source_description='CVE Scanner',
364
                        )
365
                    )
366
                    host_details = host_details + 1
367
368
                    self.results.append(result)
369
        e.SubElement(host_result_count_elem, 'page').text = str(host_details)
370
        self.hosts.append(host_elem)
371
372
373
class Hosts:
374
    """Class to store the host elements"""
375
376
    def __init__(self):
377
        self.hosts = []
378
379
    def add_host(self):
380
        pass
381
382
383
def convert_cpe23_to_cpe22(cpe: str) -> Tuple[str, bool]:
384
    """Convert a CPE v2.3 to a CPE v2.2
385
    returns the CPE v2.2 and True if no product
386
    version is given
387
    """
388
    # MAKE ME BETTER!!!
389
    cpe = CPE(cpe)
390
    any_version = False
391
    if cpe.get_version()[0] == '*':
392
        any_version = True
393
    return (
394
        str(CPE(cpe.as_uri_2_3(), CPE.VERSION_2_2)).replace('CPE v2.2: ', ''),
395
        any_version,
396
    )
397
398
399 View Code Duplication
def parse_args(args: Namespace) -> Namespace:  # pylint: disable=unused-argument
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
400
    """ Parsing args ... """
401
402
    parser = ArgumentParser(
403
        prefix_chars='+',
404
        add_help=False,
405
        formatter_class=RawTextHelpFormatter,
406
        description=HELP_TEXT,
407
    )
408
409
    parser.add_argument(
410
        '+h',
411
        '++help',
412
        action='help',
413
        help='Show this help message and exit.',
414
    )
415
416
    parser.add_argument(
417
        '++create-list',
418
        nargs='?',
419
        type=str,
420
        choices=('no_creation', 'recreate', 'create'),
421
        const='create',
422
        default='no_creation',
423
        dest="create_list",
424
        help="Create the CPE to CVE helper list",
425
    )
426
427
    parser.add_argument(
428
        '+l',
429
        '++list',
430
        type=str,
431
        dest="list",
432
        required=True,
433
        help="Create the CPE to CVE helper list",
434
    )
435
436
    parser.add_argument(
437
        '+f',
438
        '++file',
439
        type=str,
440
        dest="json_file",
441
        help="File that should be parsed",
442
    )
443
444
    args, _ = parser.parse_known_args()
445
446
    return args
447
448
449
def get_cpe(gmp, cpe):
450
    """Parse and return the CPE's from the JSON.
451
    Convert the CPEs to v2.2 and check if they have a
452
    version part. If not get this CPE in all versions
453
    from the GSM and return them. This may result in
454
    a lot of false positives or false negatives.
455
    """
456
    cpe = convert_cpe23_to_cpe22(cpe)
457
    if cpe[1] is False:
458
        return [cpe[0]]
459
460
    cpes = []
461
    cpe_xml = gmp.get_info_list(
462
        info_type=InfoType.CPE, filter='rows=-1 uuid~"{}:"'.format(cpe[0])
463
    )
464
    infos = cpe_xml.findall('info')
465
    for cpe in infos[:-1]:  # -1 because the last info tag is a wrongy. :D
466
        cpes.append(cpe.get('id'))
467
    return cpes
468
469
470
def parse_json(gmp, hosts_dump, cpe_list):
471
    """Loads an JSON file and extracts host informations:
472
473
    Args:
474
        host_dump: the dumped json results, containing a hostname,
475
                   host_ip, host_ip_range, host_operating_system,
476
                   host_os_cpe, arrays of found_app, app_version,
477
                   app_cpe
478
    """
479
480
    report = Report(gmp=gmp)
481
482
    date_time = datetime.datetime.now()
483
484
    for entry in hosts_dump:
485
        if entry[3] is None:
486
            error_and_exit("The JSON format is not correct.")
487
        name = entry[0]
488
        print(f"Creating Results for the host {name}")
489
        ips = entry[1]
490
        if isinstance(ips, str):
491
            ips = [ips]
492
        os = entry[3]
493
        os_cpe = convert_cpe23_to_cpe22(entry[4])[0]
494
495
        cpes = []
496
        # entry[7] should be the CPEs ...
497
        if entry[7] is not None:
498
            if isinstance(entry[7], str):
499
                cpes.extend(get_cpe(gmp, entry[7]))
500
            else:
501
                for cpe in entry[7]:
502
                    if cpe:
503
                        cpes.extend(get_cpe(gmp, cpe))
504
505
        vulns = cpe_list.get_cves(cpes)
506
        if vulns:
507
            print("Found CVEs!")
508
            for ip in ips:
509
                report.add_results(
510
                    ip=ip,
511
                    hostname=name,
512
                    cpes=vulns,
513
                    cpeo=os_cpe,
514
                    os=os,
515
                    date_time=date_time,
516
                )
517
518
    report.finish_report()
519
    report_id = report.send_report()
520
    print(f"New CVE Scanner Report {report_id} imported.")
521
522
523
def main(gmp, args):
524
    # pylint: disable=undefined-variable
525
526
    parsed_args = parse_args(args=args)
527
528
    recreate = False
529
    if parsed_args.create_list == 'recreate':
530
        recreate = True
531
    if parsed_args.create_list != 'no_creation':
532
        print("Generating CVE to CPE list.")
533
        list_generator = ListGenerator(
534
            gmp, filename=Path(parsed_args.list).absolute(), recreate=recreate
535
        )
536
        list_generator.create_cve_list()
537
        print("Generation of CVE to CPE list done.")
538
    if parsed_args.json_file:
539
        cpe_list = CPELookup(parsed_args.list)
540
        print("Looking up hosts ...")
541
        with open(parsed_args.json_file, 'r') as fp:
542
            parse_json(gmp, json.load(fp)[0]['results'], cpe_list)
543
544
545
if __name__ == '__gmp__':
546
    main(gmp, args)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable gmp does not seem to be defined.
Loading history...
Comprehensibility Best Practice introduced by
The variable args does not seem to be defined.
Loading history...
547