Passed
Pull Request — master (#351)
by
unknown
01:50 queued 17s
created

metadata.NotusMetadataHandler.update_metadata()   C

Complexity

Conditions 10

Size

Total Lines 117
Code Lines 63

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 10
eloc 63
nop 1
dl 0
loc 117
rs 5.4109
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

Complexity

Complex units like metadata.NotusMetadataHandler.update_metadata() often do a lot of different things. To break the enclosing class down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
""" Provide functions to upload Notus Metadata in the Redis Cache. """
21
22
import logging
23
24
import csv
25
import ast
26
import os
27
28
from glob import glob
29
from hashlib import sha256
30
from pathlib import Path
31
32
from ospd_openvas import db, nvticache
33
from ospd_openvas.errors import OspdOpenvasError
34
from ospd_openvas.openvas import Openvas
35
36
logger = logging.getLogger(__name__)


# The expected field names in CSV files.
# NotusMetadataHandler._check_field_names_lsc() compares this list for
# equality with the parsed header row, so both the names and their
# order matter. Keep it a list: a tuple would never compare equal to
# the list of field names it is checked against.
EXPECTED_FIELD_NAMES_LIST = [
    "OID",
    "TITLE",
    "CREATION_DATE",
    "LAST_MODIFICATION",
    "SOURCE_PKGS",
    "ADVISORY_ID",
    "CVSS_BASE_VECTOR",
    "CVSS_BASE",
    "ADVISORY_XREF",
    "DESCRIPTION",
    "INSIGHT",
    "AFFECTED",
    "CVE_LIST",
    "BINARY_PACKAGES_FOR_RELEASES",
    "XREFS",
]

# Name of the directory that holds the Notus metadata CSV files. It is
# appended to the plugins folder and also used as the relative path
# prefix of the file name stored with each advisory.
METADATA_DIRECTORY_NAME = "notus_metadata"
59
60
61
class NotusMetadataHandler:
    """Class to perform checksum checks and upload metadata for
    CSV files that were created by the Notus Generator.

    Every CSV file found in the metadata directory is (optionally)
    verified against the sha256 checksum stored in the Redis KB, parsed,
    and each valid advisory row is written to the NVTI cache under the
    key "nvt:<OID>".
    """

    # Keys of the general metadata dict (the "{"-line preceding the CSV
    # header) that are needed to build the tags of every advisory.
    _GENERAL_METADATA_KEYS = (
        "VULDETECT",
        "SOLUTION",
        "SOLUTION_TYPE",
        "QOD_TYPE",
    )

    def __init__(self, metadata_path: str = None):
        """Read the OpenVAS settings, locate the CSV files and connect
        to the Redis KB.

        Arguments:
            metadata_path: Directory that contains the Notus metadata
                           CSV files. If it is empty or None, the
                           directory is determined automatically.

        Raises:
            KeyError: If the OpenVAS settings do not contain
                      "nasl_no_signature_check".
            Exception: If setting up the Redis connection ends
                       in a SystemExit.
        """
        self.openvas_settings_dict = Openvas().get_settings()
        self.__no_signature_check = self.openvas_settings_dict[
            "nasl_no_signature_check"
        ]

        # Figure out the path to the metadata
        self.__metadata_path = metadata_path or self._get_metadata_path()

        self.__metadata_relative_path_string = "{}/".format(
            METADATA_DIRECTORY_NAME
        )

        # Get a list of all CSV files in that directory with their absolute path
        self.__csv_abs_filepaths_list = self._get_csv_filepaths()

        # Connect to the Redis KB
        try:
            self.__db_ctx = db.OpenvasDB.create_context(1)
            main_db = db.MainDB()
            self.__nvti_cache = nvticache.NVTICache(main_db)
        except SystemExit:
            # Maybe replace this with just a log message
            raise Exception("Could not connect to the Redis KB") from None

    def _get_metadata_path(self) -> str:
        """Find out where the CSV files containing the metadata
        are on the file system.

        The "plugins_folder" OpenVAS setting takes precedence. Without
        it, the INSTALL_PREFIX environment variable (development
        environment) is used, and finally the production default.

        Returns:
            A full path, ending in "/", to the directory that contains
            all Notus metadata.
        """
        # Openvas is installed and the plugins folder configured.
        plugins_folder = self.openvas_settings_dict.get("plugins_folder")
        if plugins_folder:
            return "{}/{}/".format(plugins_folder, METADATA_DIRECTORY_NAME)

        # From the development environment - Not used in production
        install_prefix = os.environ.get("INSTALL_PREFIX")
        if install_prefix:
            return "{}/var/lib/openvas/plugins/{}/".format(
                install_prefix, METADATA_DIRECTORY_NAME
            )

        # Fall back to the path used in production
        return "/opt/greenbone/feed/plugins/{}/".format(
            METADATA_DIRECTORY_NAME
        )

    def _get_csv_filepaths(self) -> list:
        """Get the absolute file path to all detected CSV files
        in the relevant directory.

        Returns:
            A list of resolved Path objects, one per "*.csv" file
            directly inside the metadata directory.
        """
        # Join instead of concatenating, so that a metadata path that
        # was passed in without a trailing slash still refers to the
        # directory itself and not to sibling paths sharing its prefix.
        pattern_string = os.path.join(self.__metadata_path, "*.csv")
        return [Path(csv_file).resolve() for csv_file in glob(pattern_string)]

    def _check_field_names_lsc(self, field_names_list: list) -> bool:
        """Check if the field names of the parsed CSV file are exactly
        as expected to confirm that this version of the CSV format for
        Notus is supported by this module.

        Arguments:
            field_names_list: A list of field names such as ["OID", "TITLE",...]

        Returns:
            Whether the parsed CSV file conforms to the expected format.
        """
        return EXPECTED_FIELD_NAMES_LIST == field_names_list

    def _check_advisory_dict(self, advisory_dict: dict) -> bool:
        """Check a row of the parsed CSV file to confirm that
        no field is missing. Also check if any lists are empty
        that should never be empty. This should avoid unexpected
        runtime errors when the CSV file is incomplete.

        Arguments:
            advisory_dict: Metadata for one vendor advisory
                           in the form of a dict.

        Returns:
            Whether this advisory_dict is as expected or not.
        """
        for key, value in advisory_dict.items():
            # The value is missing entirely
            if not value:
                return False
            if key != "SOURCE_PKGS":
                continue
            # A list is empty when it shouldn't be
            try:
                if len(ast.literal_eval(value)) == 0:
                    return False
            except (ValueError, TypeError, SyntaxError):
                # Expected a list, but this was not a list. A value
                # that is not even a valid Python literal makes
                # literal_eval raise SyntaxError rather than ValueError.
                return False
        return True

    def _format_xrefs(self, advisory_xref_string: str, xrefs_list: list) -> str:
        """Create a string that contains all links for this advisory, to be
        inserted into the Redis KB.

        Arguments:
            advisory_xref_string: A link to the official advisory page.
            xrefs_list: A list of URLs that were mentioned
                        in the advisory itself.

        Returns:
            All URLs separated by ", ", the advisory link first.
            Example: URL:www.example.com, URL:www.example2.com
        """
        return ", ".join(
            "URL:{}".format(url_string)
            for url_string in [advisory_xref_string, *xrefs_list]
        )

    def is_checksum_correct(self, file_abs_path: Path) -> bool:
        """Perform a checksum check on a specific file, if
        signature checks have been enabled in OpenVAS.

        Arguments:
            file_abs_path: A Path object that points to the
                           absolute path of a file.

        Returns:
            Whether the checksum check was successful or not.
            Also returns true if the checksum check is disabled.
        """
        if self.__no_signature_check:
            # Checksum check was skipped
            return True

        sha256_object = sha256()
        with file_abs_path.open("rb") as file_bytes:
            # Read chunks of 4096 bytes sequentially to avoid
            # filling up the RAM if the file is extremely large
            for byte_block in iter(lambda: file_bytes.read(4096), b""):
                sha256_object.update(byte_block)

        # Extract the downloaded checksum for this file
        # from the Redis KB
        file_downloaded_checksum_string = db.OpenvasDB.get_single_item(
            self.__db_ctx, "sha256sums:{}".format(str(file_abs_path))
        )
        return sha256_object.hexdigest() == file_downloaded_checksum_string

    def _read_general_metadata(self, csv_file):
        """Skip the license header of an opened CSV file and parse the
        general metadata line, which is the first line starting with "{".

        On return the file is positioned right after that line, so a CSV
        reader can continue with the header row. If no such line exists
        the whole file has been consumed.

        Arguments:
            csv_file: An opened text file (any iterable of lines).

        Returns:
            The general metadata dict, or None if the line is missing,
            is not a valid dict literal or lacks a required key.
        """
        for line_string in csv_file:
            if not line_string.startswith("{"):
                continue
            try:
                general_metadata_dict = ast.literal_eval(line_string)
            except (ValueError, SyntaxError):
                return None
            if not isinstance(general_metadata_dict, dict):
                return None
            if any(
                key not in general_metadata_dict
                for key in self._GENERAL_METADATA_KEYS
            ):
                return None
            return general_metadata_dict
        return None

    def _build_advisory_metadata_list(
        self,
        file_name_string: str,
        general_metadata_dict: dict,
        advisory_dict: dict,
    ) -> list:
        """Create the 15-entry metadata list for one advisory in the
        order expected by the NVTI cache. Refer to:
        https://github.com/greenbone/ospd-openvas/blob/232d04e72d2af0199d60324e8820d9e73498a831/ospd_openvas/db.py#L39

        Arguments:
            file_name_string: Relative file name of the CSV file.
            general_metadata_dict: Metadata shared by all advisories
                                   of the CSV file.
            advisory_dict: Metadata for one vendor advisory.

        Returns:
            The list of metadata strings.

        Raises:
            ValueError, SyntaxError, TypeError: If CVE_LIST or XREFS
                do not contain a literal list of strings.
        """
        tags_string = (
            "cvss_base_vector={}|last_modification={}|"
            "creation_date={}|summary={}|vuldetect={}|"
            "insight={}|affected={}|solution={}|"
            "solution_type={}|qod_type={}"
        ).format(
            advisory_dict["CVSS_BASE_VECTOR"],
            advisory_dict["LAST_MODIFICATION"],
            advisory_dict["CREATION_DATE"],
            advisory_dict["DESCRIPTION"],
            general_metadata_dict["VULDETECT"],
            advisory_dict["INSIGHT"],
            advisory_dict["AFFECTED"],
            general_metadata_dict["SOLUTION"],
            general_metadata_dict["SOLUTION_TYPE"],
            general_metadata_dict["QOD_TYPE"],
        )
        cves_string = ", ".join(ast.literal_eval(advisory_dict["CVE_LIST"]))
        xrefs_string = self._format_xrefs(
            advisory_dict["ADVISORY_XREF"],
            ast.literal_eval(advisory_dict["XREFS"]),
        )
        return [
            file_name_string,  # File name
            "",  # Required keys
            "",  # Mandatory keys
            "",  # Excluded keys
            "",  # Required UDP ports
            "",  # Required ports
            "",  # Dependencies
            tags_string,  # Tags
            cves_string,  # CVEs
            "",  # BIDs
            xrefs_string,  # XREFS
            "3",  # Script Category
            "0",  # Timeout
            "Notus_LSC_Metadata",  # Script Family
            advisory_dict["TITLE"],  # Script Name / Title
        ]

    def _upload_csv_file(self, csv_file) -> None:
        """Parse one opened CSV file and write every valid advisory
        to the Redis KB. Files and advisories that do not have the
        expected format are skipped.

        Arguments:
            csv_file: The opened CSV file. Its name attribute is used
                      to build the file name stored with each advisory.
        """
        general_metadata_dict = self._read_general_metadata(csv_file)
        if general_metadata_dict is None:
            logger.warning(
                "General metadata of %s is missing or malformed",
                csv_file.name,
            )
            return

        reader = csv.DictReader(csv_file)
        # Check if the CSV file has the expected field names,
        # else skip the file
        if not self._check_field_names_lsc(reader.fieldnames):
            logger.warning("Field names check for %s failed", csv_file.name)
            return

        # The same for every advisory of this file
        file_name_string = "{0}{1}".format(
            self.__metadata_relative_path_string,
            os.path.basename(csv_file.name),
        )

        for advisory_dict in reader:
            # Make sure that no element is missing in the advisory_dict,
            # else skip that advisory
            if not self._check_advisory_dict(advisory_dict):
                continue

            try:
                advisory_metadata_list = self._build_advisory_metadata_list(
                    file_name_string, general_metadata_dict, advisory_dict
                )
            except (ValueError, SyntaxError, TypeError):
                # CVE_LIST or XREFS is not a literal list of strings
                logger.warning(
                    "Skipping advisory %s of %s: malformed list field",
                    advisory_dict["OID"],
                    csv_file.name,
                )
                continue

            # Write the metadata list to the respective Redis KB key,
            # overwriting any existing values
            kb_key_string = "nvt:{}".format(advisory_dict["OID"])
            try:
                self.__nvti_cache.add_vt_to_cache(
                    vt_id=kb_key_string, vt=advisory_metadata_list
                )
            except OspdOpenvasError:
                # The advisory_metadata_list was either not
                # a list or does not include 15 entries
                logger.warning(
                    "Could not add %s to the cache, skipping the rest of %s",
                    kb_key_string,
                    csv_file.name,
                )
                break

    def update_metadata(self) -> None:
        """Parse all CSV files that are present in the
        Notus metadata directory, perform a checksum check,
        read their metadata, format some fields
        and write this information to the Redis KB.
        """
        logger.debug("Starting the Notus metadata load up")
        for csv_abs_path in self.__csv_abs_filepaths_list:
            # Check the checksums, unless they have been disabled
            if not self.is_checksum_correct(csv_abs_path):
                # Skip this file if the checksum does not match
                logger.debug("Checksum failed")
                continue

            logger.debug("Checksum check for %s successful", csv_abs_path)
            with csv_abs_path.open("r") as csv_file:
                self._upload_csv_file(csv_file)
362