Passed
Pull Request — master (#376)
by Jaspar
01:15
created

CPELookup.get_cves()   A

Complexity

Conditions 5

Size

Total Lines 18
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 16
nop 2
dl 0
loc 18
rs 9.1333
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
from uuid import UUID
20
from cpe import CPE
21
22
from typing import List, Dict, Tuple
23
from datetime import date
24
import datetime
25
import time
26
from argparse import ArgumentParser, RawTextHelpFormatter
27
from lxml import etree as e
28
from gvm.protocols.latest import InfoType
29
from gvm.errors import GvmResponseError, GvmError
30
from gvm.xml import pretty_print
31
from gvmtools.helper import generate_uuid, error_and_exit
32
import json
33
import csv
34
from pathlib import Path
35
36
37
HELP_TEXT = (
38
    'This script creates a cve report from a JSON document.'
39
    ' Usable with gvm-script (gvm-tools)'
40
)
41
42
43
class CPELookup:
44
    """Class handles the CPEs"""
45
46
    def __init__(self, filename):
47
        try:
48
            self.file = open(filename, 'r')
49
            self.reader = csv.reader(self.file)
50
        except FileNotFoundError:
51
            error_and_exit(
52
                f'There is no file "{filename}". '
53
                'Maybe you need to create a list first. Run with '
54
                f'argument "++create-list +f {filename}", to create '
55
                'a new list, or pass the correct location of an existing list.'
56
            )
57
58
    def get_cves(self, cpes):
59
        """Get CVEs for the CPEs"""
60
        d1 = datetime.datetime.now()
61
        print(f'Serching CVEs for {str(len(cpes))}:', end=None)
62
        vulns = {}
63
        i = 0
64
        for cpe in cpes:
65
            vulns[cpe] = {}
66
        for row in self.reader:  # O(n)
67
            for cpe in cpes:
68
                if cpe in row[0]:
69
                    vulns[cpe][row[1].strip("'")] = float(row[2].strip("'"))
70
                    i = i + 1
71
        self.file.seek(0)
72
        d2 = datetime.datetime.now()
73
        print(f'Found {str(i)} CVEs. Time consumed: {str(d2 - d1)}')
74
75
        return vulns
76
77
    def finish_lookup(self):
78
        self.file.close()
79
80
81
class ListGenerator:
82
    """
83
    Creating the initial lists for this script.
84
    """
85
86
    def __init__(self, gmp, filename="cpes.csv"):
87
        self.gmp = gmp
88
        self.file = open(filename, 'w')
89
90
    def get_cve_from_cpe_id(self, cpe_id):
91
        cves = []
92
        return cves
93
94
    def cpe_to_cve(self, resp):
95
        cve_tags = resp.findall('info')
96
        for cve_tag in cve_tags[:-1]:
97
            cve = None
98
            cpes = None
99
            if 'id' in cve_tag.attrib:
100
                cve = cve_tag.attrib['id']
101
                cpes = cve_tag.find('cve').find('products').text
102
                cvss = cve_tag.find('cve').find('cvss').text
103
                if cpes:
104
                    for cpe in cpes.strip().split(' '):
105
                        print(
106
                            f"'{cpe}','{cve}','{cvss}'",
107
                            file=self.file,
108
                            end='\n',
109
                        )
110
111
    def create_cve_list(self, step=1000):
112
        resp = self.gmp.get_info_list(info_type=InfoType.CVE, filter='rows=1')
113
        count = resp.find('info_count').text
114
115
        counter = int(count)
116
        print(f'Found {count} CVEs.')
117
118
        first = 0
119
        d1 = datetime.datetime.now()
120
        while counter > step:
121
            resp = self.gmp.get_info_list(
122
                info_type=InfoType.CVE, filter=f'rows={step} first={first}'
123
            )
124
            # refresh the counters
125
            counter = counter - step
126
            first = first + step
127
128
            self.cpe_to_cve(resp)
129
            print(
130
                f'CVEs left: {str(counter)}/{count} TIME CONSUMED: {str(datetime.datetime.now() - d1)}'
131
            )
132
133
        # find the rest
134
        resp = self.gmp.get_info_list(
135
            info_type=InfoType.CVE, filter=f'rows={counter} first={first}'
136
        )
137
        self.cpe_to_cve(resp)
138
139
    def finish_list(self):
140
        self.file.close()
141
142
143 View Code Duplication
def generate_host_detail_elem(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
144
    name, value, source_name=None, source_description=None, source_type=None
145
):
146
    host_detail_elem = e.Element('detail')
147
    e.SubElement(host_detail_elem, 'name').text = name
148
    e.SubElement(host_detail_elem, 'value').text = value
149
150
    if source_name:
151
        source_elem = e.SubElement(host_detail_elem, 'source')
152
        e.SubElement(source_elem, 'name').text = source_name
153
        if source_type:
154
            e.SubElement(source_elem, 'type').text = source_type
155
        if source_description:
156
            e.SubElement(source_elem, 'description').text = source_description
157
158
    return host_detail_elem
159
160
161
class Report:
162
    def __init__(self, gmp):
163
        self.results = e.Element('results', {'start': '1', 'max': '-1'})
164
        self.hosts = []
165
        self.report = None
166
167
        self.gmp = gmp
168
169
    def finish_report(self):
170
        report_format_id = 'd5da9f67-8551-4e51-807b-b6a873d70e34'
171
        self.report_id = generate_uuid()
172
        self.report = e.Element(
173
            'report',
174
            {
175
                'id': self.report_id,
176
                'format_id': report_format_id,
177
                'extension': 'xml',
178
                'content_type': 'text/xml',
179
            },
180
        )
181
        owner_elem = e.SubElement(self.report, 'owner')
182
        e.SubElement(owner_elem, 'name').text = ''
183
        e.SubElement(
184
            self.report, 'name'
185
        ).text = f'Report created from JSON-File'
186
187
        inner_report = e.SubElement(
188
            self.report, 'report', {'id': self.report_id}
189
        )
190
        ports_elem = e.SubElement(
191
            inner_report, 'ports', {'start': '1', 'max': '-1'}
192
        )
193
194
        inner_report.append(ports_elem)
195
        inner_report.append(self.results)
196
        for host in self.hosts:
197
            pretty_print(host)
198
            inner_report.append(host)
199
        self.report.append(inner_report)
200
201
    def send_report(self) -> str:
202
        the_time = time.strftime("%Y/%m/%d-%H:%M:%S")
203
        task_id = ''
204
        task_name = "CVE_Scan_Report_{}".format(the_time)
205
206
        res = self.gmp.create_container_task(
207
            name=task_name, comment="Created with gvm-tools."
208
        )
209
210
        task_id = res.xpath('//@id')[0]
211
212
        report = e.tostring(self.report)
213
214
        res = self.gmp.import_report(report, task_id=task_id, in_assets=True)
215
216
        return res.xpath('//@id')[0]
217
218
    def add_results(self, ip, hostname, cpes: Dict, cpeo, os, date_time):
219
        print("ADDING RESULTS")
220
        host_elem = e.Element('host')
221
        host_id = generate_uuid()
222
        e.SubElement(host_elem, 'ip').text = ip
223
        # e.SubElement(host_elem, 'asset', {'asset_id': host_id}).text = ''
224
225
        source_name = 'gvm-tools'
226
        print("ADDED HOST ELEM")
227
        host_elem.append(
228
            generate_host_detail_elem(
229
                name='hostname', value=hostname, source_name=source_name
230
            )
231
        )
232
        host_elem.append(
233
            generate_host_detail_elem(
234
                name='best_os_txt',
235
                value=os,
236
                source_name=source_name,
237
                source_description="Host Details",
238
            )
239
        )
240
        host_elem.append(
241
            generate_host_detail_elem(
242
                name='best_os_cpe',
243
                value=cpeo,
244
                source_name=source_name,
245
                source_description="Host Details",
246
            )
247
        )
248
249
        date_format = '%Y-%m-%dT%H:%M:%S'
250
        date_time = f'{date_time.strftime(date_format)}+01:00'
251
        print("SO FAR")
252
253
        for cpe, cves in cpes.items():
254
            print(f"UNPACKED CPE: {str(cpe)}:")
255
            print(cves)
256
            if cves:
257
                for cve, cvss in cves.items():
258
                    print("UNPACKED CVES")
259
                    result_id = generate_uuid()
260
                    result = e.Element('result', {'id': result_id})
261
                    print("RESULT")
262
                    e.SubElement(result, 'name').text = f'Result for host {ip}'
263
                    e.SubElement(
264
                        result, 'comment'
265
                    ).text = 'Imported with gvm-tools'
266
                    e.SubElement(result, 'modification_time').text = date_time
267
                    e.SubElement(result, 'creation_time').text = date_time
268
                    detect_elem = e.Element('detection')
269
                    detect_result_elem = e.SubElement(
270
                        detect_elem, 'result', {'id': result_id}
271
                    )
272
                    details_elem = e.SubElement(detect_result_elem, 'details')
273
274
                    result_host_elem = e.Element('host')
275
                    result_host_elem.text = ip
276
                    # e.SubElement(
277
                    #    host_elem, 'asset', {'asset_id': host_id}
278
                    # ).text = ''
279
                    e.SubElement(result_host_elem, 'hostname').text = hostname
280
                    result.append(result_host_elem)
281
282
                    nvt_elem = e.Element('nvt', {'oid': cve})
283
                    e.SubElement(nvt_elem, 'type').text = 'cve'
284
                    e.SubElement(nvt_elem, 'name').text = cve
285
                    e.SubElement(nvt_elem, 'cvss_base').text = str(cvss)
286
                    e.SubElement(nvt_elem, 'cve').text = cve
287
288
                    result.append(nvt_elem)
289
290
                    e.SubElement(result, 'severity').text = str(cvss)
291
292
                    host_elem.append(
293
                        generate_host_detail_elem(
294
                            name='App',
295
                            value=cpe,
296
                            source_type='cve',
297
                            source_name=cve,
298
                            source_description='CVE Scanner',
299
                        )
300
                    )
301
302
                    print(
303
                        e.tostring(result, pretty_print=True, with_tail=False)
304
                    )
305
                    self.results.append(result)
306
307
        self.hosts.append(host_elem)
308
309
310
class Hosts:
311
    """Class to store the host elements"""
312
313
    def __init__(self):
314
        self.hosts = []
315
316
    def add_host(self):
317
        pass
318
319
320
def convert_cpe23_to_cpe22(cpe):
321
    """Convert a CPE v2.3 to a CPE v2.2
322
    returns the CPE v2.2 and True if no product
323
    version is given
324
    """
325
    # MAKE ME BETTER!!!
326
    cpe = CPE(cpe)
327
    any_version = False
328
    if cpe.get_version()[0] == '*':
329
        any_version = True
330
    return (
331
        str(CPE(cpe.as_uri_2_3(), CPE.VERSION_2_2)).replace('CPE v2.2: ', ''),
332
        any_version,
333
    )
334
335
336
def parse_args(args):  # pylint: disable=unused-argument
337
    """ Parsing args ... """
338
339
    parser = ArgumentParser(
340
        prefix_chars='+',
341
        add_help=False,
342
        formatter_class=RawTextHelpFormatter,
343
        description=HELP_TEXT,
344
    )
345
346
    parser.add_argument(
347
        '+h',
348
        '++help',
349
        action='help',
350
        help='Show this help message and exit.',
351
    )
352
353
    parser.add_argument(
354
        '++create-list',
355
        action='store_true',
356
        dest="create_list",
357
        help="Create the CPE to CVE helper list",
358
    )
359
360
    parser.add_argument(
361
        '+l',
362
        '++list',
363
        type=str,
364
        dest="list",
365
        help="Create the CPE to CVE helper list",
366
    )
367
368
    parser.add_argument(
369
        '+f',
370
        '++file',
371
        type=str,
372
        dest="json_file",
373
        help="File that should be parsed",
374
    )
375
376
    args, _ = parser.parse_known_args()
377
378
    return args
379
380
381
def add_cpe_dict(cpe: e.Element) -> Dict:
382
    cves = []
383
    for cve in cpe.find('cpe').find('cves').findall('cve'):
384
        cves.append(cve.find('*').get('id'))
385
    return {cpe: [cves]}
386
387
388
def get_cpe(gmp, cpe):
389
    # print("GET")
390
    cpe = convert_cpe23_to_cpe22(cpe)
391
    # print(cpe)
392
    if cpe[1] is False:
393
        print(f'Found 1 CPE with version.')
394
        return [cpe[0]]
395
396
    cpes = []
397
    d2 = datetime.datetime.now()
398
    # print("Jo2")
399
    cpe_xml = gmp.get_info_list(
400
        info_type=InfoType.CPE, filter='rows=-1 uuid~"{}:"'.format(cpe[0])
401
    )
402
    infos = cpe_xml.findall('info')
403
    for cpe in infos[:-1]:
404
        cpes.append(cpe.get('id'))
405
    d3 = datetime.datetime.now()
406
    print(f'Found {str(len(infos[:-1]))} CPEs without version: {str(d3 - d2)}.')
407
    return cpes
408
409
410
def parse_json(gmp, hosts_dump, cpe_list):
411
    """Loads an JSON file and extracts host informations:
412
413
    Args:
414
        host_dump
415
416
    Returns:
417
        hosts:       The host list
418
    """
419
420
    report = Report(gmp=gmp)
421
422
    entries = []
423
    date_time = datetime.datetime.now()
424
425
    for entry in hosts_dump:
426
        if entry[3] is None:
427
            error_and_exit("The JSON format is not correct.")
428
        name = entry[0]
429
        print(f"Host {name}")
430
        ips = entry[1]
431
        ip_range = entry[2]
432
        os = entry[3]
433
        os_cpe = convert_cpe23_to_cpe22(entry[4])[0]
434
435
        objs = []
436
        # adding the app/apps in the host object (if there are any ...)
437
        # I hope this is not all to bad performing ...
438
        cpes = []
439
        # entry[7] should be cpes ...
440
        if entry[7] is not None:
441
            # print(entry[7])
442
            if isinstance(entry[7], str):
443
                cpes.extend(get_cpe(gmp, entry[7]))
444
            else:
445
                print(f'Entry CPEs {str(len(entry[7]))}')
446
                i = 0
447
                for cpe in entry[7]:
448
                    print(f'Entry {str(i)}: {cpe}')
449
                    if cpe:
450
                        cpes.extend(get_cpe(gmp, cpe))
451
                    i = i + 1
452
453
        vulns = cpe_list.get_cves(cpes)
454
        if vulns:
455
            print("WE GOT CPES")
456
            if isinstance(ips, str):
457
                ips = [ips]
458
            for ip in ips:
459
                report.add_results(
460
                    ip=ip,
461
                    hostname=name,
462
                    cpes=vulns,
463
                    cpeo=os_cpe,
464
                    os=os,
465
                    date_time=date_time,
466
                )
467
468
    report.finish_report()
469
    report_id = report.send_report()
470
    print(f"Sent report {report_id}.")
471
472
473
def main(gmp, args):
474
    # pylint: disable=undefined-variable
475
476
    parsed_args = parse_args(args=args)
477
478
    if parsed_args.create_list:
479
        print("Generating CVE to CPE list")
480
        list_generator = ListGenerator(
481
            gmp, filename=Path(parsed_args.list).absolute()
482
        )
483
        list_generator.create_cve_list()
484
        list_generator.finish_list()
485
        print("Done.")
486
    if parsed_args.json_file:
487
        cpe_list = CPELookup(parsed_args.list)
488
        print("Looking up hosts ...")
489
        with open(parsed_args.json_file, 'r') as fp:
490
            hosts = parse_json(gmp, json.load(fp)[0]['results'], cpe_list)
491
492
493
if __name__ == '__gmp__':
494
    main(gmp, args)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable args does not seem to be defined.
Loading history...
Comprehensibility Best Practice introduced by
The variable gmp does not seem to be defined.
Loading history...
495