Completed
Push — master ( a3f003...f81eb3 )
by Juan José
31s queued 21s
created

NotusMetadataHandler.get_family_driver_linkers()   A

Complexity

Conditions 5

Size

Total Lines 30
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 14
nop 1
dl 0
loc 30
rs 9.2333
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
""" Provide functions to upload Notus Metadata in the Redis Cache. """
21
22
import logging
23
24
import ast
25
import os
26
27
from glob import glob
28
from hashlib import sha256
29
from pathlib import Path
30
from csv import DictReader
31
from typing import List, Dict, Optional, IO
32
33
from ospd_openvas.db import MainDB
34
from ospd_openvas.nvticache import NVTICache
35
from ospd_openvas.errors import OspdOpenvasError
36
from ospd_openvas.openvas import Openvas
37
38
logger = logging.getLogger(__name__)
39
40
41
# The expected field names in CSV files
# Order matters: parsed CSV headers are compared against this list
# with a direct equality check (see _check_field_names_lsc).
EXPECTED_FIELD_NAMES_LIST = [
    "OID",
    "TITLE",
    "CREATION_DATE",
    "LAST_MODIFICATION",
    "SOURCE_PKGS",
    "ADVISORY_ID",
    "SEVERITY_ORIGIN",
    "SEVERITY_DATE",
    "SEVERITY_VECTOR",
    "ADVISORY_XREF",
    "DESCRIPTION",
    "INSIGHT",
    "AFFECTED",
    "CVE_LIST",
    "BINARY_PACKAGES_FOR_RELEASES",
    "XREFS",
    "FILENAME",
]

# Name of the directory below the plugins folder that holds the
# Notus metadata CSV files.
METADATA_DIRECTORY_NAME = "notus_metadata"

# Metadata constant field definitions
SCRIPT_CATEGORY = "3"  # ACT_GATHER_INFO
SCRIPT_TIMEOUT = "0"
# The following KB fields are uploaded as empty strings for every
# Notus advisory.
BIDS = ""
REQUIRED_KEYS = ""
MANDATORY_KEYS = ""
EXCLUDED_KEYS = ""
REQUIRED_UDP_PORTS = ""
REQUIRED_PORTS = ""
DEPENDENCIES = ""
74
75
76
class NotusMetadataHandler:
77
    """Class to perform checksum checks and upload metadata for
78
    CSV files that were created by the Notus Generator."""
79
80
    def __init__(
        self,
        nvti: Optional[NVTICache] = None,
        metadata_path: Optional[str] = None,
    ):
        """Initialize the handler.

        Arguments:
            nvti: Optional NVTICache instance. If not given, a
                  connection is created lazily by the nvti property.
            metadata_path: Optional path to the Notus metadata
                           directory. If not given, it is resolved
                           lazily by the metadata_path property.
        """
        # NVTICache connection; created on demand (see nvti property).
        self._nvti = nvti
        # Cached metadata directory path (see metadata_path property).
        self._metadata_path = metadata_path
        # Cached OpenVAS settings (see openvas_setting property).
        self._openvas_settings_dict = None
84
85
    @property
    def nvti(self) -> NVTICache:
        """NVTICache connection, created lazily on first access.

        Raises:
            OspdOpenvasError: If the connection to the Redis KB
                could not be established.
        """
        if self._nvti is None:
            try:
                maindb = MainDB()
                self._nvti = NVTICache(maindb)
            except SystemExit:
                # A SystemExit raised while connecting is converted
                # into an exception callers can actually handle.
                raise OspdOpenvasError(
                    "Could not connect to the Redis KB"
                ) from None

        return self._nvti
97
98
    @property
99
    def metadata_path(self) -> str:
100
        """Find out where the CSV files containing the metadata
101
        are on the file system, depending on whether this machine
102
        is a GSM or GVM in a development environment.
103
104
        Returns:
105
            A full path to the directory that contains all Notus
106
            metadata.
107
        """
108
        if self._metadata_path is None:
109
            # Openvas is installed and the plugins folder configured.
110
            plugins_folder = self.openvas_setting.get("plugins_folder")
111
            if plugins_folder:
112
                self._metadata_path = (
113
                    f'{plugins_folder}/{METADATA_DIRECTORY_NAME}/'
114
                )
115
                return self._metadata_path
116
117
            try:
118
                # From the development environment - Not used in production
119
                install_prefix = os.environ["INSTALL_PREFIX"]
120
            except KeyError:
121
                install_prefix = None
122
123
            if not install_prefix:
124
                # Fall back to the path used in production
125
                self._metadata_path = (
126
                    f'/opt/greenbone/feed/plugins/{METADATA_DIRECTORY_NAME}/'
127
                )
128
            else:
129
                self._metadata_path = f'{install_prefix}/var/lib/openvas/plugins/{METADATA_DIRECTORY_NAME}/'  # pylint: disable=C0301
130
131
        return self._metadata_path
132
133
    @property
    def openvas_setting(self):
        """OpenVAS settings dict, fetched once and cached."""
        if self._openvas_settings_dict is None:
            openvas_object = Openvas()
            self._openvas_settings_dict = openvas_object.get_settings()
        return self._openvas_settings_dict
140
141
    def _get_csv_filepaths(self) -> List[Path]:
142
        """Get a list of absolute file paths to all detected CSV files
143
        in the relevant directory.
144
145
        Returns:
146
            A Path object that contains the absolute file path.
147
        """
148
        return [
149
            Path(csv_file).resolve()
150
            for csv_file in glob(f'{self.metadata_path}*.csv')
151
        ]
152
153
    def _check_field_names_lsc(self, field_names_list: list) -> bool:
        """Check if the field names of the parsed CSV file are exactly
        as expected to confirm that this version of the CSV format for
        Notus is supported by this module.

        Arguments:
            field_names_list: A list of field names such as ["OID", "TITLE",...]

        Returns:
            Whether the parsed CSV file conforms to the expected format.
        """
        # Direct equality also enforces the field order; returning the
        # comparison avoids the redundant if/return-False/return-True.
        return field_names_list == EXPECTED_FIELD_NAMES_LIST
167
168
    def _check_advisory_dict(self, advisory_dict: dict) -> bool:
169
        """Check a row of the parsed CSV file to confirm that
170
        no field is missing. Also check if any lists are empty
171
        that should never be empty. This should avoid unexpected
172
        runtime errors when the CSV file is incomplete. The QA-check
173
        in the Notus Generator should already catch something like this
174
        before it happens, but this is another check just to be sure.
175
176
        Arguments:
177
            advisory_dict: Metadata for one vendor advisory
178
                           in the form of a dict.
179
180
        Returns:
181
            Whether this advisory_dict is as expected or not.
182
        """
183
        # Check if there are any empty fields that shouldn't be empty.
184
        # Skip those that are incorrect.
185
        for (key, value) in advisory_dict.items():
186
            # The value is missing entirely
187
            if not value:
188
                return False
189
            # A list is empty when it shouldn't be
190
            try:
191
                if key == "SOURCE_PKGS" and len(ast.literal_eval(value)) == 0:
192
                    return False
193
            except (ValueError, TypeError):
194
                # Expected a list, but this was not a list
195
                return False
196
        return True
197
198
    def _format_xrefs(self, advisory_xref_string: str, xrefs_list: list) -> str:
199
        """Create a string that contains all links for this advisory, to be
200
        inserted into the Redis KB.
201
202
        Arguments:
203
            advisory_xref_string: A link to the official advisory page.
204
            xrefs_list: A list of URLs that were mentioned
205
                        in the advisory itself.
206
207
        Returns:
208
            All URLs separated by ", ".
209
            Example: URL:www.example.com, URL:www.example2.com
210
        """
211
        formatted_list = list()
212
        advisory_xref_string = f'URL:{advisory_xref_string}'
213
        formatted_list.append(advisory_xref_string)
214
        for url_string in xrefs_list:
215
            url_string = f'URL:{url_string}'
216
            formatted_list.append(url_string)
217
        return ", ".join(formatted_list)
218
219
    def is_checksum_correct(self, file_abs_path: Path) -> bool:
220
        """Perform a checksum check on a specific file, if
221
        signature checks have been enabled in OpenVAS.
222
223
        Arguments:
224
            file_abs_path: A Path object that points to the
225
                           absolute path of a file.
226
227
        Returns:
228
            Whether the checksum check was successful or not.
229
            Also returns true if the checksum check is disabled.
230
        """
231
232
        no_signature_check = self.openvas_setting.get("nasl_no_signature_check")
233
        if not no_signature_check:
234
            with file_abs_path.open("rb") as file_file_bytes:
235
                sha256_object = sha256()
236
                # Read chunks of 4096 bytes sequentially to avoid
237
                # filling up the RAM if the file is extremely large
238
                for byte_block in iter(lambda: file_file_bytes.read(4096), b""):
0 ignored issues
show
introduced by
The variable file_file_bytes does not seem to be defined in case BooleanNotNode on line 233 is False. Are you sure this can never be the case?
Loading history...
239
                    sha256_object.update(byte_block)
240
241
                # Calculate the checksum for this file
242
                file_calculated_checksum_string = sha256_object.hexdigest()
243
                # Extract the downloaded checksum for this file
244
                # from the Redis KB
245
                file_downloaded_checksum_string = self.nvti.get_file_checksum(
246
                    file_abs_path
247
                )
248
249
                # Checksum check
250
                if (
251
                    not file_calculated_checksum_string
252
                    == file_downloaded_checksum_string
253
                ):
254
                    return False
255
        # Checksum check was either successful or it was skipped
256
        return True
257
258
    def upload_lsc_from_csv_reader(
        self,
        file_name: str,
        family: str,
        general_metadata_dict: Dict,
        csv_reader: DictReader,
    ) -> bool:
        """For each advisory_dict, write its contents to the
        Redis KB as metadata.

        Arguments:
            file_name: CSV file name with metadata to be uploaded
            family: Script family the advisories in this file belong to.
            general_metadata_dict: General metadata common for all advisories
                                   in the CSV file.
            csv_reader: DictReader iterator to access the advisories

        Return True if success, False otherwise
        """

        loaded = 0
        total = 0
        for advisory_dict in csv_reader:
            # Make sure that no element is missing in the advisory_dict,
            # else skip that advisory
            total += 1
            is_correct = self._check_advisory_dict(advisory_dict)
            if not is_correct:
                continue
            # For each advisory_dict,
            # write its contents to the Redis KB as metadata.
            # Create a list with all the metadata. Refer to:
            # https://github.com/greenbone/ospd-openvas/blob/232d04e72d2af0199d60324e8820d9e73498a831/ospd_openvas/db.py#L39 # pylint: disable=C0321
            advisory_metadata_list = list()

            oid = advisory_dict["OID"]

            # Advisory virtual location below the plugins folder.
            # Bug fix: use the advisory's own FILENAME here - the
            # previous code appended a literal "(unknown)" and left
            # the filename variable unused.
            filename = advisory_dict["FILENAME"]
            advisory_metadata_list.append(
                f'{METADATA_DIRECTORY_NAME}/{filename}'
            )

            # Required keys
            advisory_metadata_list.append(REQUIRED_KEYS)
            # Mandatory keys
            advisory_metadata_list.append(MANDATORY_KEYS)
            # Excluded keys
            advisory_metadata_list.append(EXCLUDED_KEYS)
            # Required UDP ports
            advisory_metadata_list.append(REQUIRED_UDP_PORTS)
            # Required ports
            advisory_metadata_list.append(REQUIRED_PORTS)
            # Dependencies
            advisory_metadata_list.append(DEPENDENCIES)
            # Tags
            tags_string = (
                "severity_origin={}|severity_date={}|"
                "severity_vector={}|last_modification={}|"
                "creation_date={}|summary={}|vuldetect={}|"
                "insight={}|affected={}|solution={}|"
                "solution_type={}|qod_type={}"
            )
            tags_string = tags_string.format(
                advisory_dict["SEVERITY_ORIGIN"],
                advisory_dict["SEVERITY_DATE"],
                advisory_dict["SEVERITY_VECTOR"],
                advisory_dict["LAST_MODIFICATION"],
                advisory_dict["CREATION_DATE"],
                advisory_dict["DESCRIPTION"],
                general_metadata_dict["VULDETECT"],
                advisory_dict["INSIGHT"],
                advisory_dict["AFFECTED"],
                general_metadata_dict["SOLUTION"],
                general_metadata_dict["SOLUTION_TYPE"],
                general_metadata_dict["QOD_TYPE"],
            )
            advisory_metadata_list.append(tags_string)
            # CVEs
            advisory_metadata_list.append(
                ", ".join(ast.literal_eval(advisory_dict["CVE_LIST"]))
            )

            advisory_metadata_list.append(BIDS)
            # XREFS
            advisory_metadata_list.append(
                self._format_xrefs(
                    advisory_dict["ADVISORY_XREF"],
                    ast.literal_eval(advisory_dict["XREFS"]),
                )
            )

            # Script category
            advisory_metadata_list.append(SCRIPT_CATEGORY)
            # Script timeout
            advisory_metadata_list.append(SCRIPT_TIMEOUT)
            # Script family
            advisory_metadata_list.append(family)
            # Script Name / Title
            advisory_metadata_list.append(advisory_dict["TITLE"])

            # Write the metadata list to the respective Redis KB key,
            # overwriting any existing values
            kb_key_string = f'nvt:{oid}'
            try:
                self.nvti.add_vt_to_cache(
                    vt_id=kb_key_string, vt=advisory_metadata_list
                )
            except OspdOpenvasError:
                logger.warning(
                    "LSC will not be loaded. The advisory_metadata_"
                    "list was either not a list or does not include "
                    "15 entries"
                )
                continue
            loaded += 1

        logger.debug(
            "Loaded %d/%d advisories from %s", loaded, total, file_name
        )
        return loaded == total
378
379
    def update_metadata(self) -> None:
380
        """Parse all CSV files that are present in the
381
        Notus metadata directory, perform a checksum check,
382
        read their metadata, format some fields
383
        and write this information to the Redis KB.
384
        """
385
386
        # Check if Notus is enabled
387
        if not self.openvas_setting.get("table_driven_lsc"):
388
            return
389
390
        logger.debug("Starting the Notus metadata load up")
391
        # Get a list of all CSV files in that directory with their absolute path
392
        csv_abs_filepaths_list = self._get_csv_filepaths()
393
394
        # Read each CSV file
395
        for csv_abs_path in csv_abs_filepaths_list:
396
            # Check the checksums, unless they have been disabled
397
            if not self.is_checksum_correct(csv_abs_path):
398
                # Skip this file if the checksum does not match
399
                logger.warning('Checksum for %s failed', csv_abs_path)
400
                continue
401
            logger.debug("Checksum check for %s successful", csv_abs_path)
402
            with csv_abs_path.open("r") as csv_file:
403
                # Skip the license header, so the actual content
404
                # can be parsed by the DictReader
405
406
                # Get the family from the Notus metadata csv file.
407
                family_and_driver_dict = self.parse_family_driver_link(csv_file)
408
                family, _ = family_and_driver_dict.popitem()
409
410
                general_metadata_dict = dict()
411
                for line_string in csv_file:
412
                    if line_string.startswith("{"):
413
                        general_metadata_dict = ast.literal_eval(line_string)
414
                        break
415
416
                # Check if the file can be parsed by the CSV module
417
                reader = DictReader(csv_file)
418
                # Check if the CSV file has the expected field names,
419
                # else skip the file
420
                is_correct = self._check_field_names_lsc(reader.fieldnames)
421
                if not is_correct:
422
                    logger.warning(
423
                        'Field names check for %s failed', csv_abs_path
424
                    )
425
                    continue
426
427
                file_name = csv_abs_path.name
428
                if not self.upload_lsc_from_csv_reader(
429
                    file_name, family, general_metadata_dict, reader
430
                ):
431
                    logger.warning(
432
                        "Some advaisory was not loaded from %s", file_name
433
                    )
434
435
        logger.debug("Notus metadata load up finished.")
436
437
    def parse_family_driver_link(self, csv_file: IO) -> Optional[Dict]:
        """Return the dictionary from the Notus metadata csv file which
        holds the driver script OID for the corresponding LSC family.
        This dictionary has one entry:
            E.g. {'LSC family name': 'Driver script OID'}

        Arguments:
            csv_file: Opened file descriptor to Notus metadata csv file.

        Return: the dictionary if success. None otherwise.
        """
        link_prefix = "link = "
        # Find the family name - driver script linker
        csv_file.seek(0)
        # Bug fix: initialize link so that a file without a
        # "link = " line no longer raises NameError below.
        link = None
        for line_string in csv_file:
            if line_string.startswith(link_prefix):
                link = line_string
                break

        if not link:
            return None

        # Bug fix: slice off the prefix instead of str.strip("link = "),
        # which removes any of the characters "link= " from *both* ends
        # of the string and can corrupt the payload.
        return ast.literal_eval(link[len(link_prefix):])
456
457
    def get_family_driver_linkers(self) -> Optional[Dict]:
458
        """Get the a collection of advisory families supported
459
        by Notus and the linked OID of the driver script to run
460
        the Notus scanner for the given family
461
462
        This method always returns a dict with the supported families,
463
        even if Notus Scanner is disabled.
464
        """
465
466
        # Get a list of all CSV files in that directory with their absolute path
467
        csv_abs_filepaths_list = self._get_csv_filepaths()
468
469
        family_driver_linkers = {}
470
        # Read each CSV file
471
        for csv_abs_path in csv_abs_filepaths_list:
472
            # Check the checksums, unless they have been disabled
473
            if not self.is_checksum_correct(csv_abs_path):
474
                # Skip this file if the checksum does not match
475
                logger.warning('Checksum for %s failed', csv_abs_path)
476
                continue
477
            logger.debug("Checksum check for %s successful", csv_abs_path)
478
479
            dict_entry = None
480
            with csv_abs_path.open("r") as csv_file:
481
                dict_entry = self.parse_family_driver_link(csv_file)
482
483
            if dict_entry:
484
                family_driver_linkers.update(dict_entry)
485
486
        return family_driver_linkers
487