Completed
Push — master ( ee6aac...b90bd1 )
by Björn
28s queued 12s
created

NotusMetadataHandler.upload_lsc_from_csv_reader()   B

Complexity

Conditions 4

Size

Total Lines 112
Code Lines 59

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 59
nop 4
dl 0
loc 112
rs 8.3417
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
""" Provide functions to upload Notus Metadata in the Redis Cache. """
21
22
import logging
23
24
import ast
25
import os
26
27
from glob import glob
28
from hashlib import sha256
29
from pathlib import Path
30
from csv import DictReader
31
from typing import List, Dict
32
33
from ospd_openvas.db import MainDB
34
from ospd_openvas.nvticache import NVTICache
35
from ospd_openvas.errors import OspdOpenvasError
36
from ospd_openvas.openvas import Openvas
37
38
logger = logging.getLogger(__name__)
39
40
41
# The expected field names in CSV files, in the exact order produced by
# the Notus Generator (compared verbatim in _check_field_names_lsc).
EXPECTED_FIELD_NAMES_LIST = [
    "OID",
    "TITLE",
    "CREATION_DATE",
    "LAST_MODIFICATION",
    "SOURCE_PKGS",
    "ADVISORY_ID",
    "CVSS_BASE_VECTOR",
    "CVSS_BASE",
    "ADVISORY_XREF",
    "DESCRIPTION",
    "INSIGHT",
    "AFFECTED",
    "CVE_LIST",
    "BINARY_PACKAGES_FOR_RELEASES",
    "XREFS",
]

# Sub-directory of the plugins folder that contains the Notus CSV files
METADATA_DIRECTORY_NAME = "notus_metadata"

# Metadata constant field definitions
SCRIPT_CATEGORY = "3"  # ACT_GATHER_INFO
SCRIPT_TIMEOUT = "0"
SCRIPT_FAMILY = "Notus_LSC_Metadata"
# The following KB fields are intentionally empty for Notus LSC metadata
BIDS = ""
REQUIRED_KEYS = ""
MANDATORY_KEYS = ""
EXCLUDED_KEYS = ""
REQUIRED_UDP_PORTS = ""
REQUIRED_PORTS = ""
DEPENDENCIES = ""
75
class NotusMetadataHandler:
76
    """Class to perform checksum checks and upload metadata for
77
    CSV files that were created by the Notus Generator."""
78
79
    def __init__(self, nvti: NVTICache = None, metadata_path: str = None):
80
        self._nvti = nvti
81
        self._metadata_path = metadata_path
82
        self._openvas_settings_dict = None
83
84
    @property
85
    def nvti(self) -> NVTICache:
86
        if self._nvti is None:
87
            try:
88
                maindb = MainDB()
89
                self._nvti = NVTICache(maindb)
90
            except SystemExit:
91
                raise OspdOpenvasError(
92
                    "Could not connect to the Redis KB"
93
                ) from None
94
95
        return self._nvti
96
97
    @property
98
    def metadata_path(self) -> str:
99
        """Find out where the CSV files containing the metadata
100
        are on the file system, depending on whether this machine
101
        is a GSM or GVM in a development environment.
102
103
        Returns:
104
            A full path to the directory that contains all Notus
105
            metadata.
106
        """
107
        if self._metadata_path is None:
108
            # Openvas is installed and the plugins folder configured.
109
            plugins_folder = self.openvas_setting.get("plugins_folder")
110
            if plugins_folder:
111
                self._metadata_path = (
112
                    f'{plugins_folder}/{METADATA_DIRECTORY_NAME}/'
113
                )
114
                return self._metadata_path
115
116
            try:
117
                # From the development environment - Not used in production
118
                install_prefix = os.environ["INSTALL_PREFIX"]
119
            except KeyError:
120
                install_prefix = None
121
122
            if not install_prefix:
123
                # Fall back to the path used in production
124
                self._metadata_path = (
125
                    f'/opt/greenbone/feed/plugins/{METADATA_DIRECTORY_NAME}/'
126
                )
127
            else:
128
                self._metadata_path = f'{install_prefix}/var/lib/openvas/plugins/{METADATA_DIRECTORY_NAME}/'  # pylint: disable=C0301
129
130
        return self._metadata_path
131
132
    @property
133
    def openvas_setting(self):
134
        """Set OpenVAS option."""
135
        if self._openvas_settings_dict is None:
136
            openvas_object = Openvas()
137
            self._openvas_settings_dict = openvas_object.get_settings()
138
        return self._openvas_settings_dict
139
140
    def _get_csv_filepaths(self) -> List[Path]:
141
        """Get a list of absolute file paths to all detected CSV files
142
        in the relevant directory.
143
144
        Returns:
145
            A Path object that contains the absolute file path.
146
        """
147
        return [
148
            Path(csv_file).resolve()
149
            for csv_file in glob(f'{self.metadata_path}*.csv')
150
        ]
151
152
    def _check_field_names_lsc(self, field_names_list: list) -> bool:
153
        """Check if the field names of the parsed CSV file are exactly
154
        as expected to confirm that this version of the CSV format for
155
        Notus is supported by this module.
156
157
        Arguments:
158
            field_names_list: A list of field names such as ["OID", "TITLE",...]
159
160
        Returns:
161
            Whether the parsed CSV file conforms to the expected format.
162
        """
163
        if not EXPECTED_FIELD_NAMES_LIST == field_names_list:
164
            return False
165
        return True
166
167
    def _check_advisory_dict(self, advisory_dict: dict) -> bool:
168
        """Check a row of the parsed CSV file to confirm that
169
        no field is missing. Also check if any lists are empty
170
        that should never be empty. This should avoid unexpected
171
        runtime errors when the CSV file is incomplete. The QA-check
172
        in the Notus Generator should already catch something like this
173
        before it happens, but this is another check just to be sure.
174
175
        Arguments:
176
            advisory_dict: Metadata for one vendor advisory
177
                           in the form of a dict.
178
179
        Returns:
180
            Whether this advisory_dict is as expected or not.
181
        """
182
        # Check if there are any empty fields that shouldn't be empty.
183
        # Skip those that are incorrect.
184
        for (key, value) in advisory_dict.items():
185
            # The value is missing entirely
186
            if not value:
187
                return False
188
            # A list is empty when it shouldn't be
189
            try:
190
                if key == "SOURCE_PKGS" and len(ast.literal_eval(value)) == 0:
191
                    return False
192
            except (ValueError, TypeError):
193
                # Expected a list, but this was not a list
194
                return False
195
        return True
196
197
    def _format_xrefs(self, advisory_xref_string: str, xrefs_list: list) -> str:
198
        """Create a string that contains all links for this advisory, to be
199
        inserted into the Redis KB.
200
201
        Arguments:
202
            advisory_xref_string: A link to the official advisory page.
203
            xrefs_list: A list of URLs that were mentioned
204
                        in the advisory itself.
205
206
        Returns:
207
            All URLs separated by ", ".
208
            Example: URL:www.example.com, URL:www.example2.com
209
        """
210
        formatted_list = list()
211
        advisory_xref_string = f'URL:{advisory_xref_string}'
212
        formatted_list.append(advisory_xref_string)
213
        for url_string in xrefs_list:
214
            url_string = f'URL:{url_string}'
215
            formatted_list.append(url_string)
216
        return ", ".join(formatted_list)
217
218
    def is_checksum_correct(self, file_abs_path: Path) -> bool:
219
        """Perform a checksum check on a specific file, if
220
        signature checks have been enabled in OpenVAS.
221
222
        Arguments:
223
            file_abs_path: A Path object that points to the
224
                           absolute path of a file.
225
226
        Returns:
227
            Whether the checksum check was successful or not.
228
            Also returns true if the checksum check is disabled.
229
        """
230
231
        no_signature_check = self.openvas_setting.get("nasl_no_signature_check")
232
        if not no_signature_check:
233
            with file_abs_path.open("rb") as file_file_bytes:
234
                sha256_object = sha256()
235
                # Read chunks of 4096 bytes sequentially to avoid
236
                # filling up the RAM if the file is extremely large
237
                for byte_block in iter(lambda: file_file_bytes.read(4096), b""):
0 ignored issues
show
introduced by
The variable file_file_bytes does not seem to be defined in case BooleanNotNode on line 232 is False. Are you sure this can never be the case?
Loading history...
238
                    sha256_object.update(byte_block)
239
240
                # Calculate the checksum for this file
241
                file_calculated_checksum_string = sha256_object.hexdigest()
242
                # Extract the downloaded checksum for this file
243
                # from the Redis KB
244
                file_downloaded_checksum_string = self.nvti.get_file_checksum(
245
                    file_abs_path
246
                )
247
248
                # Checksum check
249
                if (
250
                    not file_calculated_checksum_string
251
                    == file_downloaded_checksum_string
252
                ):
253
                    return False
254
        # Checksum check was either successful or it was skipped
255
        return True
256
257
    def upload_lsc_from_csv_reader(
258
        self,
259
        file_name: str,
260
        general_metadata_dict: Dict,
261
        csv_reader: DictReader,
262
    ) -> bool:
263
        """For each advisory_dict, write its contents to the
264
        Redis KB as metadata.
265
266
        Arguments:
267
            file_name: CSV file name with metadata to be uploaded
268
            general_metadata_dict: General metadata common for all advisories
269
                                   in the CSV file.
270
            csv_reader: DictReader iterator to access the advisories
271
272
        Return True if success, False otherwise
273
        """
274
275
        loaded = 0
276
        total = 0
277
        for advisory_dict in csv_reader:
278
            # Make sure that no element is missing in the advisory_dict,
279
            # else skip that advisory
280
            total += 1
281
            is_correct = self._check_advisory_dict(advisory_dict)
282
            if not is_correct:
283
                continue
284
            # For each advisory_dict,
285
            # write its contents to the Redis KB as metadata.
286
            # Create a list with all the metadata. Refer to:
287
            # https://github.com/greenbone/ospd-openvas/blob/232d04e72d2af0199d60324e8820d9e73498a831/ospd_openvas/db.py#L39 # pylint: disable=C0321
288
            advisory_metadata_list = list()
289
            # File name
290
            advisory_metadata_list.append(
291
                f'{METADATA_DIRECTORY_NAME}/{file_name}'
292
            )
293
            # Required keys
294
            advisory_metadata_list.append(REQUIRED_KEYS)
295
            # Mandatory keys
296
            advisory_metadata_list.append(MANDATORY_KEYS)
297
            # Excluded keys
298
            advisory_metadata_list.append(EXCLUDED_KEYS)
299
            # Required UDP ports
300
            advisory_metadata_list.append(REQUIRED_UDP_PORTS)
301
            # Required ports
302
            advisory_metadata_list.append(REQUIRED_PORTS)
303
            # Dependencies
304
            advisory_metadata_list.append(DEPENDENCIES)
305
            # Tags
306
            tags_string = (
307
                "cvss_base_vector={}|last_modification={}|"
308
                "creation_date={}|summary={}|vuldetect={}|"
309
                "insight={}|affected={}|solution={}|"
310
                "solution_type={}|qod_type={}"
311
            )
312
            tags_string = tags_string.format(
313
                advisory_dict["CVSS_BASE_VECTOR"],
314
                advisory_dict["LAST_MODIFICATION"],
315
                advisory_dict["CREATION_DATE"],
316
                advisory_dict["DESCRIPTION"],
317
                general_metadata_dict["VULDETECT"],
318
                advisory_dict["INSIGHT"],
319
                advisory_dict["AFFECTED"],
320
                general_metadata_dict["SOLUTION"],
321
                general_metadata_dict["SOLUTION_TYPE"],
322
                general_metadata_dict["QOD_TYPE"],
323
            )
324
            advisory_metadata_list.append(tags_string)
325
            # CVEs
326
            advisory_metadata_list.append(
327
                ", ".join(ast.literal_eval(advisory_dict["CVE_LIST"]))
328
            )
329
330
            advisory_metadata_list.append(BIDS)
331
            # XREFS
332
            advisory_metadata_list.append(
333
                self._format_xrefs(
334
                    advisory_dict["ADVISORY_XREF"],
335
                    ast.literal_eval(advisory_dict["XREFS"]),
336
                )
337
            )
338
339
            # Script category
340
            advisory_metadata_list.append(SCRIPT_CATEGORY)
341
            # Script timeout
342
            advisory_metadata_list.append(SCRIPT_TIMEOUT)
343
            # Script family
344
            advisory_metadata_list.append(SCRIPT_FAMILY)
345
            # Script Name / Title
346
            advisory_metadata_list.append(advisory_dict["TITLE"])
347
348
            # Write the metadata list to the respective Redis KB key,
349
            # overwriting any existing values
350
            oid = advisory_dict["OID"]
351
            kb_key_string = f'nvt:{oid}'
352
            try:
353
                self.nvti.add_vt_to_cache(
354
                    vt_id=kb_key_string, vt=advisory_metadata_list
355
                )
356
            except OspdOpenvasError:
357
                logger.warning(
358
                    "LSC will not be loaded. The advisory_metadata_"
359
                    "list was either not a list or does not include "
360
                    "15 entries"
361
                )
362
                continue
363
            loaded += 1
364
365
        logger.debug(
366
            "Loaded %d/%d advisories from %s", loaded, total, file_name
367
        )
368
        return loaded == total
369
370
    def update_metadata(self) -> None:
371
        """Parse all CSV files that are present in the
372
        Notus metadata directory, perform a checksum check,
373
        read their metadata, format some fields
374
        and write this information to the Redis KB.
375
        """
376
377
        logger.debug("Starting the Notus metadata load up")
378
        # Get a list of all CSV files in that directory with their absolute path
379
        csv_abs_filepaths_list = self._get_csv_filepaths()
380
381
        # Read each CSV file
382
        for csv_abs_path in csv_abs_filepaths_list:
383
            # Check the checksums, unless they have been disabled
384
            if not self.is_checksum_correct(csv_abs_path):
385
                # Skip this file if the checksum does not match
386
                logger.warning('Checksum for %s failed', csv_abs_path)
387
                continue
388
            logger.debug("Checksum check for %s successful", csv_abs_path)
389
            with csv_abs_path.open("r") as csv_file:
390
                # Skip the license header, so the actual content
391
                # can be parsed by the DictReader
392
                general_metadata_dict = dict()
393
                for line_string in csv_file:
394
                    if line_string.startswith("{"):
395
                        general_metadata_dict = ast.literal_eval(line_string)
396
                        break
397
                # Check if the file can be parsed by the CSV module
398
                reader = DictReader(csv_file)
399
                # Check if the CSV file has the expected field names,
400
                # else skip the file
401
                is_correct = self._check_field_names_lsc(reader.fieldnames)
402
                if not is_correct:
403
                    logger.warning(
404
                        'Field names check for %s failed', csv_abs_path
405
                    )
406
                    continue
407
408
                file_name = csv_abs_path.name
409
                if not self.upload_lsc_from_csv_reader(
410
                    file_name, general_metadata_dict, reader
411
                ):
412
                    logger.warning(
413
                        "Some advaisory was not loaded from %s", file_name
414
                    )
415
416
        logger.debug("Notus metadata load up finished.")
417