Passed
Pull Request — master (#364)
by Juan José
01:33
created

NotusMetadataHandler.update_metadata()   C

Complexity

Conditions 9

Size

Total Lines 51
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 9
eloc 28
nop 1
dl 0
loc 51
rs 6.6666
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
""" Provide functions to upload Notus Metadata in the Redis Cache. """
21
22
import logging
23
24
import ast
25
import os
26
27
from glob import glob
28
from hashlib import sha256
29
from pathlib import Path
30
from csv import DictReader
31
from typing import List, Dict
32
33
from ospd_openvas.db import MainDB
34
from ospd_openvas.nvticache import NVTICache
35
from ospd_openvas.errors import OspdOpenvasError
36
from ospd_openvas.openvas import Openvas
37
38
logger = logging.getLogger(__name__)


# The expected field names in CSV files.
# _check_field_names_lsc() compares a parsed CSV header against this
# list verbatim, so both the names and their order matter.
EXPECTED_FIELD_NAMES_LIST = [
    "OID",
    "TITLE",
    "CREATION_DATE",
    "LAST_MODIFICATION",
    "SOURCE_PKGS",
    "ADVISORY_ID",
    "SEVERITY_ORIGIN",
    "SEVERITY_DATE",
    "SEVERITY_VECTOR",
    "ADVISORY_XREF",
    "DESCRIPTION",
    "INSIGHT",
    "AFFECTED",
    "CVE_LIST",
    "BINARY_PACKAGES_FOR_RELEASES",
    "XREFS",
]

# Sub-directory (below the plugins folder) that holds the Notus
# metadata CSV files.
METADATA_DIRECTORY_NAME = "notus_metadata"

# Metadata constant field definitions
SCRIPT_CATEGORY = "3"  # ACT_GATHER_INFO
SCRIPT_TIMEOUT = "0"
SCRIPT_FAMILY = "Notus_LSC_Metadata"
# The following KB metadata fields are intentionally left empty for
# Notus advisories; they are still written to the KB as placeholders.
BIDS = ""
REQUIRED_KEYS = ""
MANDATORY_KEYS = ""
EXCLUDED_KEYS = ""
REQUIRED_UDP_PORTS = ""
REQUIRED_PORTS = ""
DEPENDENCIES = ""
74
75
76
class NotusMetadataHandler:
77
    """Class to perform checksum checks and upload metadata for
78
    CSV files that were created by the Notus Generator."""
79
80
    def __init__(self, nvti: NVTICache = None, metadata_path: str = None):
        """Initialize the handler.

        Arguments:
            nvti: Optional NVTICache instance. If omitted, one is
                  created lazily by the ``nvti`` property on first use.
            metadata_path: Optional path to the directory containing
                           the Notus metadata CSV files. If omitted,
                           it is resolved lazily by the
                           ``metadata_path`` property.
        """
        self._nvti = nvti
        self._metadata_path = metadata_path
        # Cached OpenVAS settings; filled lazily by the
        # ``openvas_setting`` property.
        self._openvas_settings_dict = None
84
85
    @property
    def nvti(self) -> NVTICache:
        """Lazily connect to the Redis KB and return the NVTI cache.

        Raises:
            OspdOpenvasError: If a connection to the Redis KB could
                              not be established.
        """
        if self._nvti is not None:
            return self._nvti

        try:
            # MainDB() exits on connection failure; translate that
            # into a catchable error for our callers.
            self._nvti = NVTICache(MainDB())
        except SystemExit:
            raise OspdOpenvasError(
                "Could not connect to the Redis KB"
            ) from None

        return self._nvti
97
98
    @property
    def metadata_path(self) -> str:
        """Locate the directory that contains the Notus metadata CSV
        files on the file system, depending on whether this machine is
        a GSM or a GVM development environment.

        Returns:
            A full path to the directory that contains all Notus
            metadata, always ending with a slash.
        """
        if self._metadata_path is not None:
            return self._metadata_path

        # Openvas is installed and the plugins folder configured.
        plugins_folder = self.openvas_setting.get("plugins_folder")
        if plugins_folder:
            self._metadata_path = (
                f'{plugins_folder}/{METADATA_DIRECTORY_NAME}/'
            )
            return self._metadata_path

        # Set in development environments only - not used in production.
        install_prefix = os.environ.get("INSTALL_PREFIX")
        if install_prefix:
            self._metadata_path = (
                f'{install_prefix}/var/lib/openvas/plugins/'
                f'{METADATA_DIRECTORY_NAME}/'
            )
        else:
            # Fall back to the path used in production.
            self._metadata_path = (
                f'/opt/greenbone/feed/plugins/{METADATA_DIRECTORY_NAME}/'
            )

        return self._metadata_path
132
133
    @property
    def openvas_setting(self):
        """Return the OpenVAS settings, loading them on first access."""
        if self._openvas_settings_dict is None:
            self._openvas_settings_dict = Openvas().get_settings()
        return self._openvas_settings_dict
140
141
    def _get_csv_filepaths(self) -> List[Path]:
        """Collect the absolute file paths of all CSV files detected
        in the Notus metadata directory.

        Returns:
            A list of Path objects, one per detected CSV file.
        """
        search_pattern = f'{self.metadata_path}*.csv'
        return [Path(match).resolve() for match in glob(search_pattern)]
152
153
    def _check_field_names_lsc(self, field_names_list: list) -> bool:
        """Check whether the field names of the parsed CSV file match
        the expected Notus CSV format exactly (same names, same order),
        confirming that this version of the format is supported.

        Arguments:
            field_names_list: A list of field names such as
                              ["OID", "TITLE", ...].

        Returns:
            Whether the parsed CSV file conforms to the expected format.
        """
        return field_names_list == EXPECTED_FIELD_NAMES_LIST
167
168
    def _check_advisory_dict(self, advisory_dict: dict) -> bool:
        """Check a row of the parsed CSV file to confirm that
        no field is missing. Also check if any lists are empty
        that should never be empty. This should avoid unexpected
        runtime errors when the CSV file is incomplete. The QA-check
        in the Notus Generator should already catch something like this
        before it happens, but this is another check just to be sure.

        Arguments:
            advisory_dict: Metadata for one vendor advisory
                           in the form of a dict.

        Returns:
            Whether this advisory_dict is as expected or not.
        """
        # Check if there are any empty fields that shouldn't be empty.
        # Skip those that are incorrect.
        for key, value in advisory_dict.items():
            # The value is missing entirely
            if not value:
                return False
            if key == "SOURCE_PKGS":
                try:
                    source_pkgs = ast.literal_eval(value)
                except (
                    ValueError,
                    TypeError,
                    SyntaxError,
                    MemoryError,
                    RecursionError,
                ):
                    # Expected a list literal, but parsing failed.
                    # literal_eval raises SyntaxError (and, in extreme
                    # cases, MemoryError/RecursionError) for malformed
                    # input, not only ValueError/TypeError - catching
                    # them all keeps a broken row from crashing the
                    # whole upload.
                    return False
                # A list is empty when it shouldn't be
                if len(source_pkgs) == 0:
                    return False
        return True
197
198
    def _format_xrefs(self, advisory_xref_string: str, xrefs_list: list) -> str:
        """Create a string that contains all links for this advisory, to be
        inserted into the Redis KB.

        Arguments:
            advisory_xref_string: A link to the official advisory page.
            xrefs_list: A list of URLs that were mentioned
                        in the advisory itself.

        Returns:
            All URLs, each prefixed with "URL:", separated by ", ".
            Example: URL:www.example.com, URL:www.example2.com
        """
        all_urls = [advisory_xref_string] + list(xrefs_list)
        return ", ".join(f'URL:{url}' for url in all_urls)
218
219
    def is_checksum_correct(self, file_abs_path: Path) -> bool:
        """Perform a checksum check on a specific file, unless
        signature checks have been disabled in OpenVAS.

        Arguments:
            file_abs_path: A Path object that points to the
                           absolute path of a file.

        Returns:
            Whether the checksum check was successful or not.
            Also returns true if the checksum check is disabled.
        """
        if self.openvas_setting.get("nasl_no_signature_check"):
            # Signature checking is disabled - treat every file as valid.
            return True

        sha256_object = sha256()
        with file_abs_path.open("rb") as file_bytes:
            # Read chunks of 4096 bytes sequentially to avoid
            # filling up the RAM if the file is extremely large
            for byte_block in iter(lambda: file_bytes.read(4096), b""):
                sha256_object.update(byte_block)

        # Calculate the checksum for this file
        file_calculated_checksum_string = sha256_object.hexdigest()
        # Extract the downloaded checksum for this file
        # from the Redis KB
        file_downloaded_checksum_string = self.nvti.get_file_checksum(
            file_abs_path
        )

        # Checksum check
        return (
            file_calculated_checksum_string == file_downloaded_checksum_string
        )
257
258
    def _build_advisory_metadata_list(
        self,
        file_name: str,
        general_metadata_dict: Dict,
        advisory_dict: Dict,
    ) -> List[str]:
        """Build the 15-entry metadata list for a single advisory in
        the exact order expected by the Redis KB. Refer to:
        https://github.com/greenbone/ospd-openvas/blob/232d04e72d2af0199d60324e8820d9e73498a831/ospd_openvas/db.py#L39 # pylint: disable=C0321

        Arguments:
            file_name: CSV file name the advisory came from.
            general_metadata_dict: Metadata common to all advisories
                                   in the CSV file.
            advisory_dict: Metadata for one vendor advisory.

        Returns:
            The ordered metadata list for the KB entry.
        """
        # Tags
        tags_string = (
            "severity_origin={}|severity_date={}|"
            "severity_vector={}|last_modification={}|"
            "creation_date={}|summary={}|vuldetect={}|"
            "insight={}|affected={}|solution={}|"
            "solution_type={}|qod_type={}"
        )
        tags_string = tags_string.format(
            advisory_dict["SEVERITY_ORIGIN"],
            advisory_dict["SEVERITY_DATE"],
            advisory_dict["SEVERITY_VECTOR"],
            advisory_dict["LAST_MODIFICATION"],
            advisory_dict["CREATION_DATE"],
            advisory_dict["DESCRIPTION"],
            general_metadata_dict["VULDETECT"],
            advisory_dict["INSIGHT"],
            advisory_dict["AFFECTED"],
            general_metadata_dict["SOLUTION"],
            general_metadata_dict["SOLUTION_TYPE"],
            general_metadata_dict["QOD_TYPE"],
        )
        return [
            # File name
            f'{METADATA_DIRECTORY_NAME}/{file_name}',
            REQUIRED_KEYS,
            MANDATORY_KEYS,
            EXCLUDED_KEYS,
            REQUIRED_UDP_PORTS,
            REQUIRED_PORTS,
            DEPENDENCIES,
            tags_string,
            # CVEs
            ", ".join(ast.literal_eval(advisory_dict["CVE_LIST"])),
            BIDS,
            # XREFS
            self._format_xrefs(
                advisory_dict["ADVISORY_XREF"],
                ast.literal_eval(advisory_dict["XREFS"]),
            ),
            SCRIPT_CATEGORY,
            SCRIPT_TIMEOUT,
            SCRIPT_FAMILY,
            # Script Name / Title
            advisory_dict["TITLE"],
        ]

    def upload_lsc_from_csv_reader(
        self,
        file_name: str,
        general_metadata_dict: Dict,
        csv_reader: DictReader,
    ) -> bool:
        """For each advisory_dict, write its contents to the
        Redis KB as metadata.

        Arguments:
            file_name: CSV file name with metadata to be uploaded
            general_metadata_dict: General metadata common for all advisories
                                   in the CSV file.
            csv_reader: DictReader iterator to access the advisories

        Return True if success, False otherwise
        """

        loaded = 0
        total = 0
        for advisory_dict in csv_reader:
            # Make sure that no element is missing in the advisory_dict,
            # else skip that advisory
            total += 1
            if not self._check_advisory_dict(advisory_dict):
                continue

            # For each advisory_dict, write its contents to the
            # Redis KB as metadata, overwriting any existing values.
            advisory_metadata_list = self._build_advisory_metadata_list(
                file_name, general_metadata_dict, advisory_dict
            )
            oid = advisory_dict["OID"]
            kb_key_string = f'nvt:{oid}'
            try:
                self.nvti.add_vt_to_cache(
                    vt_id=kb_key_string, vt=advisory_metadata_list
                )
            except OspdOpenvasError:
                logger.warning(
                    "LSC will not be loaded. The advisory_metadata_"
                    "list was either not a list or does not include "
                    "15 entries"
                )
                continue
            loaded += 1

        logger.debug(
            "Loaded %d/%d advisories from %s", loaded, total, file_name
        )
        return loaded == total
373
374
    def _parse_and_upload_csv_file(self, csv_abs_path: Path) -> None:
        """Parse a single (already checksum-verified) CSV file and
        upload its advisories to the Redis KB.

        Arguments:
            csv_abs_path: Absolute path of the CSV file to process.
        """
        with csv_abs_path.open("r") as csv_file:
            # Skip the license header, so the actual content
            # can be parsed by the DictReader. The general metadata
            # is a dict literal on the first line starting with "{".
            general_metadata_dict = dict()
            for line_string in csv_file:
                if line_string.startswith("{"):
                    general_metadata_dict = ast.literal_eval(line_string)
                    break
            # Check if the file can be parsed by the CSV module
            reader = DictReader(csv_file)
            # Check if the CSV file has the expected field names,
            # else skip the file
            if not self._check_field_names_lsc(reader.fieldnames):
                logger.warning(
                    'Field names check for %s failed', csv_abs_path
                )
                return

            file_name = csv_abs_path.name
            if not self.upload_lsc_from_csv_reader(
                file_name, general_metadata_dict, reader
            ):
                # Typo fix: message previously read "advaisory".
                logger.warning(
                    "Some advisory was not loaded from %s", file_name
                )

    def update_metadata(self) -> None:
        """Parse all CSV files that are present in the
        Notus metadata directory, perform a checksum check,
        read their metadata, format some fields
        and write this information to the Redis KB.
        """

        # Check if Notus is enabled
        if not self.openvas_setting.get("table_driven_lsc"):
            return

        logger.debug("Starting the Notus metadata load up")
        # Get a list of all CSV files in that directory with their
        # absolute path, and process each one in turn.
        for csv_abs_path in self._get_csv_filepaths():
            # Check the checksums, unless they have been disabled
            if not self.is_checksum_correct(csv_abs_path):
                # Skip this file if the checksum does not match
                logger.warning('Checksum for %s failed', csv_abs_path)
                continue
            logger.debug("Checksum check for %s successful", csv_abs_path)
            self._parse_and_upload_csv_file(csv_abs_path)

        logger.debug("Notus metadata load up finished.")
425