Passed
Pull Request — master (#289)
by Juan José
01:26
created

ospd_openvas.nvticache.NVTICache.force_reload()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
""" Provide functions to handle NVT Info Cache. """
21
22
import logging
23
24
from typing import List, Dict, Optional, Iterator, Tuple
25
26
from packaging.specifiers import SpecifierSet
27
from packaging.version import parse as parse_version
28
29
from ospd_openvas.db import NVT_META_FIELDS, OpenvasDB, MainDB, BaseDB, RedisCtx
30
from ospd_openvas.errors import OspdOpenvasError
31
from ospd_openvas.openvas import Openvas
32
33
34
logger = logging.getLogger(__name__)
35
36
LIST_FIRST_POS = 0
37
LIST_LAST_POS = -1
38
39
# actually the nvti cache with gvm-libs 10 should fit too but openvas was only
40
# introduced with GVM 11 and gvm-libs 11
41
SUPPORTED_NVTICACHE_VERSIONS_SPECIFIER = SpecifierSet('>=11.0')
42
43
44
class NVTICache(BaseDB):
45
46
    QOD_TYPES = {
47
        'exploit': '100',
48
        'remote_vul': '99',
49
        'remote_app': '98',
50
        'package': '97',
51
        'registry': '97',
52
        'remote_active': '95',
53
        'remote_banner': '80',
54
        'executable_version': '80',
55
        'remote_analysis': '70',
56
        'remote_probe': '50',
57
        'remote_banner_unreliable': '30',
58
        'executable_version_unreliable': '30',
59
        'general_note': '1',
60
        'default': '70',
61
    }
62
63
    def __init__(  # pylint: disable=super-init-not-called
64
        self, main_db: MainDB
65
    ):
66
        self._ctx = None
67
        self.index = None
68
        self._main_db = main_db
69
        self._nvti_cache_name = None
70
71
    def _get_nvti_cache_name(self) -> str:
72
        if not self._nvti_cache_name:
73
            self._set_nvti_cache_name()
74
75
        return self._nvti_cache_name
76
77
    def _is_compatible_version(self, version: str) -> bool:
78
        installed_version = parse_version(version)
79
        return installed_version in SUPPORTED_NVTICACHE_VERSIONS_SPECIFIER
80
81
    def _set_nvti_cache_name(self):
82
        """Set nvticache name"""
83
        version_string = Openvas.get_gvm_libs_version()
84
        if not version_string:
85
            raise OspdOpenvasError(
86
                "Not possible to get the installed gvm-libs version. "
87
                "Outdated openvas version. openvas version needs to be at "
88
                "least 7.0.1."
89
            )
90
        # Remove pre-release sufix and git revision if exists
91
        # as the gvm-libs version has the  format
92
        # e.g "20.8+beta1-git-a41b140d-zero-padding"
93
        version_string = version_string.split("+")[0]
94
95
        if self._is_compatible_version(version_string):
96
            self._nvti_cache_name = "nvticache{}".format(version_string)
97
        else:
98
            raise OspdOpenvasError(
99
                "Error setting nvticache. Incompatible nvticache "
100
                "version {}. Supported versions are {}.".format(
101
                    version_string,
102
                    ", ".join(
103
                        [
104
                            str(spec)
105
                            for spec in SUPPORTED_NVTICACHE_VERSIONS_SPECIFIER
106
                        ]
107
                    ),
108
                )
109
            )
110
111
    @property
112
    def ctx(self) -> Optional[RedisCtx]:
113
        if self._ctx is None:
114
            self._ctx, self.index = OpenvasDB.find_database_by_pattern(
115
                self._get_nvti_cache_name(), self._main_db.max_database_index
116
            )
117
        return self._ctx
118
119
    def get_feed_version(self) -> Optional[str]:
120
        """ Get feed version of the nvti cache db.
121
122
        Returns the feed version or None if the nvt feed isn't available.
123
        """
124
        if not self.ctx:
125
            # no nvti cache db available yet
126
            return None
127
128
        return OpenvasDB.get_single_item(self.ctx, self._get_nvti_cache_name())
129
130
    def get_oids(self) -> Iterator[Tuple[str, str]]:
131
        """ Get the list of NVT file names and OIDs.
132
133
        Returns:
134
            A i. Each single list contains the filename
135
            as first element and the oid as second one.
136
        """
137
        return OpenvasDB.get_filenames_and_oids(self.ctx)
138
139
    def get_nvt_params(self, oid: str) -> Optional[Dict[str, str]]:
140
        """ Get NVT's preferences.
141
142
        Arguments:
143
            oid: OID of VT from which to get the parameters.
144
145
        Returns:
146
            A dictionary with preferences and timeout.
147
        """
148
        prefs = self.get_nvt_prefs(oid)
149
150
        vt_params = {}
151
152
        if prefs:
153
            for nvt_pref in prefs:
154
                elem = nvt_pref.split('|||')
155
156
                param_id = elem[0]
157
                param_name = elem[1]
158
                param_type = elem[2]
159
160
                vt_params[param_id] = dict()
161
                vt_params[param_id]['id'] = param_id
162
                vt_params[param_id]['type'] = param_type
163
                vt_params[param_id]['name'] = param_name.strip()
164
                vt_params[param_id]['description'] = 'Description'
165
166
                if len(elem) > 3:
167
                    param_default = elem[3]
168
                    vt_params[param_id]['default'] = param_default
169
                else:
170
                    vt_params[param_id]['default'] = ''
171
172
        return vt_params
173
174
    @staticmethod
175
    def _parse_metadata_tags(tags_str: str, oid: str) -> Dict[str, str]:
176
        """ Parse a string with multiple tags.
177
178
        Arguments:
179
            tags_str: String with tags separated by `|`.
180
            oid: VT OID. Only used for logging in error case.
181
182
        Returns:
183
            A dictionary with the tags.
184
        """
185
        tags_dict = dict()
186
        tags = tags_str.split('|')
187
        for tag in tags:
188
            try:
189
                _tag, _value = tag.split('=', 1)
190
            except ValueError:
191
                logger.error('Tag %s in %s has no value.', tag, oid)
192
                continue
193
            tags_dict[_tag] = _value
194
195
        return tags_dict
196
197
    def get_nvt_metadata(self, oid: str) -> Optional[Dict[str, str]]:
198
        """ Get a full NVT. Returns an XML tree with the NVT metadata.
199
200
        Arguments:
201
            oid: OID of VT from which to get the metadata.
202
203
        Returns:
204
            A dictionary with the VT metadata.
205
        """
206
        resp = OpenvasDB.get_list_item(
207
            self.ctx,
208
            "nvt:%s" % oid,
209
            start=NVT_META_FIELDS.index("NVT_FILENAME_POS"),
210
            end=NVT_META_FIELDS.index("NVT_NAME_POS"),
211
        )
212
213
        if not isinstance(resp, list) or len(resp) == 0:
214
            return None
215
216
        subelem = [
217
            'filename',
218
            'required_keys',
219
            'mandatory_keys',
220
            'excluded_keys',
221
            'required_udp_ports',
222
            'required_ports',
223
            'dependencies',
224
            'tag',
225
            'cve',
226
            'bid',
227
            'xref',
228
            'category',
229
            'timeout',
230
            'family',
231
            'name',
232
        ]
233
234
        custom = dict()
235
        custom['refs'] = dict()
236
        custom['vt_params'] = dict()
237
        for child, res in zip(subelem, resp):
238
            if child not in ['cve', 'bid', 'xref', 'tag', 'timeout'] and res:
239
                custom[child] = res
240
            elif child == 'tag':
241
                custom.update(self._parse_metadata_tags(res, oid))
242
            elif child in ['cve', 'bid', 'xref'] and res:
243
                custom['refs'][child] = res.split(", ")
244
            elif child == 'timeout':
245
                if res is None:
246
                    continue
247
                vt_params = {}
248
                if int(res) > 0:
249
                    _param_id = '0'
250
                    vt_params[_param_id] = dict()
251
                    vt_params[_param_id]['id'] = _param_id
252
                    vt_params[_param_id]['type'] = 'entry'
253
                    vt_params[_param_id]['name'] = 'timeout'
254
                    vt_params[_param_id]['description'] = 'Script Timeout'
255
                    vt_params[_param_id]['default'] = res
256
                custom['vt_params'] = vt_params
257
                custom['vt_params'].update(self.get_nvt_params(oid))
258
259
        return custom
260
261
    def get_nvt_refs(self, oid: str) -> Optional[Dict[str, str]]:
262
        """ Get a full NVT.
263
264
        Arguments:
265
            oid: OID of VT from which to get the VT references.
266
267
        Returns:
268
            A dictionary with the VT references.
269
        """
270
        resp = OpenvasDB.get_list_item(
271
            self.ctx,
272
            "nvt:%s" % oid,
273
            start=NVT_META_FIELDS.index("NVT_CVES_POS"),
274
            end=NVT_META_FIELDS.index("NVT_XREFS_POS"),
275
        )
276
277
        if not isinstance(resp, list) or len(resp) == 0:
278
            return None
279
280
        subelem = ['cve', 'bid', 'xref']
281
282
        refs = dict()
283
        for child, res in zip(subelem, resp):
284
            refs[child] = res.split(", ")
285
286
        return refs
287
288
    def get_nvt_family(self, oid: str) -> str:
289
        """ Get NVT family
290
        Arguments:
291
            oid: OID of VT from which to get the VT family.
292
293
        Returns:
294
            A str with the VT family.
295
        """
296
        return OpenvasDB.get_single_item(
297
            self.ctx,
298
            'nvt:%s' % oid,
299
            index=NVT_META_FIELDS.index("NVT_FAMILY_POS"),
300
        )
301
302
    def get_nvt_prefs(self, oid: str) -> Optional[List[str]]:
303
        """ Get NVT preferences.
304
305
        Arguments:
306
            ctx: Redis context to be used.
307
            oid: OID of VT from which to get the VT preferences.
308
309
        Returns:
310
            A list with the VT preferences.
311
        """
312
        key = 'oid:%s:prefs' % oid
313
        return OpenvasDB.get_list_item(self.ctx, key)
314
315
    def get_nvt_timeout(self, oid: str) -> Optional[str]:
316
        """ Get NVT timeout
317
318
        Arguments:
319
            ctx: Redis context to be used.
320
            oid: OID of VT from which to get the script timeout.
321
322
        Returns:
323
            The timeout.
324
        """
325
        return OpenvasDB.get_single_item(
326
            self.ctx,
327
            'nvt:%s' % oid,
328
            index=NVT_META_FIELDS.index("NVT_TIMEOUT_POS"),
329
        )
330
331
    def get_nvt_tags(self, oid: str) -> Optional[Dict[str, str]]:
332
        """ Get Tags of the given OID.
333
334
        Arguments:
335
            ctx: Redis context to be used.
336
            oid: OID of VT from which to get the VT tags.
337
338
        Returns:
339
            A dictionary with the VT tags.
340
        """
341
        tag = OpenvasDB.get_single_item(
342
            self.ctx,
343
            'nvt:%s' % oid,
344
            index=NVT_META_FIELDS.index('NVT_TAGS_POS'),
345
        )
346
        tags = tag.split('|')
347
348
        return dict([item.split('=', 1) for item in tags])
349
350
    def get_nvt_files_count(self) -> int:
351
        return OpenvasDB.get_key_count(self.ctx, "filename:*")
352
353
    def get_nvt_count(self) -> int:
354
        return OpenvasDB.get_key_count(self.ctx, "nvt:*")
355
356
    def force_reload(self):
357
        self._main_db.release_database(self)
358