Passed
Pull Request — master (#374)
by
unknown
01:23
created

ospd_openvas.nvticache.NVTICache.get_nvt_timeout()   A

Complexity

Conditions 1

Size

Total Lines 14
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 5
nop 2
dl 0
loc 14
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
""" Provide functions to handle NVT Info Cache. """
21
22
import logging
23
24
from typing import List, Dict, Optional, Iterator, Tuple
25
26
from packaging.specifiers import SpecifierSet
27
from packaging.version import parse as parse_version
28
29
from ospd_openvas.db import NVT_META_FIELDS, OpenvasDB, MainDB, BaseDB, RedisCtx
30
from ospd_openvas.errors import OspdOpenvasError
31
from ospd_openvas.openvas import Openvas
32
33
34
logger = logging.getLogger(__name__)
35
36
LIST_FIRST_POS = 0
37
LIST_LAST_POS = -1
38
39
# The nvti cache of gvm-libs 10 should be compatible too, but openvas was only
40
# introduced with GVM 11 and gvm-libs 11
41
SUPPORTED_NVTICACHE_VERSIONS_SPECIFIER = SpecifierSet('>=11.0')
42
43
44
class NVTICache(BaseDB):
    """Accessor for the NVT info cache (nvticache) database.

    The cache database is located lazily: the redis context is looked up
    the first time the `ctx` property is read, using a key name derived
    from the installed gvm-libs version.
    """

    # Maps a QOD type name to its value. The values are kept as strings.
    QOD_TYPES = {
        'exploit': '100',
        'remote_vul': '99',
        'remote_app': '98',
        'package': '97',
        'registry': '97',
        'remote_active': '95',
        'remote_banner': '80',
        'executable_version': '80',
        'remote_analysis': '70',
        'remote_probe': '50',
        'remote_banner_unreliable': '30',
        'executable_version_unreliable': '30',
        'general_note': '1',
        'default': '70',
    }

    def __init__(  # pylint: disable=super-init-not-called
        self, main_db: MainDB
    ):
        """Create the cache accessor without touching the database yet.

        Arguments:
            main_db: Main database, used later to search for the nvti cache
                database and to release it again.
        """
        # Context and index are resolved on first access of `ctx`.
        self._ctx = None
        self.index = None
        self._main_db = main_db
        # Computed on demand by _set_nvti_cache_name().
        self._nvti_cache_name = None

    def _get_nvti_cache_name(self) -> str:
        """Return the nvticache key name, computing it on first use."""
        if not self._nvti_cache_name:
            self._set_nvti_cache_name()

        return self._nvti_cache_name

    def _is_compatible_version(self, version: str) -> bool:
        """Check a gvm-libs version against the supported specifier set.

        Arguments:
            version: Version string without pre-release or git suffix.

        Returns:
            True if the version is contained in
            SUPPORTED_NVTICACHE_VERSIONS_SPECIFIER.
        """
        installed_version = parse_version(version)
        return installed_version in SUPPORTED_NVTICACHE_VERSIONS_SPECIFIER

    def _set_nvti_cache_name(self):
        """Set the nvticache name from the installed gvm-libs version.

        Raises:
            OspdOpenvasError: If the gvm-libs version can not be obtained or
                is not a supported one.
        """
        version_string = Openvas.get_gvm_libs_version()
        if not version_string:
            raise OspdOpenvasError(
                "Not possible to get the installed gvm-libs version. "
                "Outdated openvas version. openvas version needs to be at "
                "least 7.0.1."
            )
        # Remove pre-release suffix and git revision if they exist,
        # as the gvm-libs version has the format
        # e.g. "20.8+beta1~git-123-somefix" for beta version from sources
        # or "20.8.0~git-123-hotfix" for a stable version from sources (no beta)
        version_string = version_string.split("+")[0]
        version_string = version_string.split("~")[0]

        if not self._is_compatible_version(version_string):
            raise OspdOpenvasError(
                "Error setting nvticache. Incompatible nvticache "
                "version {}. Supported versions are {}.".format(
                    version_string,
                    ", ".join(
                        str(spec)
                        for spec in SUPPORTED_NVTICACHE_VERSIONS_SPECIFIER
                    ),
                )
            )

        self._nvti_cache_name = "nvticache{}".format(version_string)

    @property
    def ctx(self) -> Optional[RedisCtx]:
        """Redis context of the nvti cache database.

        On first access the database is searched with
        OpenvasDB.find_database_by_pattern() using the nvticache name; the
        lookup is repeated on every access as long as no context was found.
        """
        if self._ctx is None:
            self._ctx, self.index = OpenvasDB.find_database_by_pattern(
                self._get_nvti_cache_name(), self._main_db.max_database_index
            )
        return self._ctx

    def get_feed_version(self) -> Optional[str]:
        """Get feed version of the nvti cache db.

        Returns the feed version or None if the nvt feed isn't available.
        """
        if not self.ctx:
            # no nvti cache db available yet
            return None

        return OpenvasDB.get_single_item(self.ctx, self._get_nvti_cache_name())

    def get_oids(self) -> Iterator[Tuple[str, str]]:
        """Get the NVT file names and OIDs.

        Returns:
            An iterator of tuples. Each tuple contains the filename
            as first element and the oid as second one.
        """
        return OpenvasDB.get_filenames_and_oids(self.ctx)

    def get_nvt_params(self, oid: str) -> Optional[Dict[str, str]]:
        """Get NVT's preferences.

        Each stored preference is a string of the form
        `id|||name|||type[|||default]`. Entries with fewer than three
        fields are logged and skipped instead of aborting the lookup.

        Arguments:
            oid: OID of VT from which to get the parameters.

        Returns:
            A dictionary mapping each preference id to a dictionary with
            the keys `id`, `type`, `name`, `description` and `default`.
            Empty if the VT has no preferences.
        """
        vt_params = {}

        prefs = self.get_nvt_prefs(oid)
        if not prefs:
            return vt_params

        for nvt_pref in prefs:
            elem = nvt_pref.split('|||')
            if len(elem) < 3:
                logger.error(
                    'Preference %s of %s is malformed.', nvt_pref, oid
                )
                continue

            param_id, param_name, param_type = elem[0], elem[1], elem[2]

            vt_params[param_id] = {
                'id': param_id,
                'type': param_type,
                'name': param_name.strip(),
                'description': 'Description',
                # The default value is optional in the stored preference.
                'default': elem[3] if len(elem) > 3 else '',
            }

        return vt_params

    @staticmethod
    def _parse_metadata_tags(tags_str: str, oid: str) -> Dict[str, str]:
        """Parse a string with multiple tags.

        Arguments:
            tags_str: String with tags separated by `|`. Each tag has the
                form `name=value`.
            oid: VT OID. Only used for logging in error case.

        Returns:
            A dictionary with the tags. Tags without a `=` are logged and
            left out.
        """
        tags_dict = dict()
        for tag in tags_str.split('|'):
            try:
                # Split only on the first `=`, the value may contain more.
                _tag, _value = tag.split('=', 1)
            except ValueError:
                logger.error('Tag %s in %s has no value.', tag, oid)
                continue
            tags_dict[_tag] = _value

        return tags_dict

    def get_nvt_metadata(self, oid: str) -> Optional[Dict[str, str]]:
        """Get the metadata of a full NVT.

        Arguments:
            oid: OID of VT from which to get the metadata.

        Returns:
            A dictionary with the VT metadata or None if nothing is stored
            for the OID. Besides the plain metadata fields and the parsed
            tags, the dictionary contains `refs` (cve, bid and xref lists)
            and `vt_params` (script timeout and preferences).
        """
        resp = OpenvasDB.get_list_item(
            self.ctx,
            "nvt:%s" % oid,
            start=NVT_META_FIELDS.index("NVT_FILENAME_POS"),
            end=NVT_META_FIELDS.index("NVT_NAME_POS"),
        )

        if not isinstance(resp, list) or len(resp) == 0:
            return None

        # Names for the fetched fields, in the order they are stored.
        subelem = [
            'filename',
            'required_keys',
            'mandatory_keys',
            'excluded_keys',
            'required_udp_ports',
            'required_ports',
            'dependencies',
            'tag',
            'cve',
            'bid',
            'xref',
            'category',
            'timeout',
            'family',
            'name',
        ]

        custom = dict()
        custom['refs'] = dict()
        custom['vt_params'] = dict()
        for child, res in zip(subelem, resp):
            if child == 'tag':
                # An unset tag field must not be parsed: None has no
                # split() and '' would only yield a bogus error log.
                if res:
                    custom.update(self._parse_metadata_tags(res, oid))
            elif child in ('cve', 'bid', 'xref'):
                if res:
                    custom['refs'][child] = res.split(", ")
            elif child == 'timeout':
                if res is None:
                    continue
                vt_params = {}
                try:
                    timeout = int(res)
                except (TypeError, ValueError):
                    # Do not lose the whole VT because of a bad timeout;
                    # treat it like a VT without script timeout.
                    logger.error('Invalid timeout %s in %s.', res, oid)
                    timeout = 0
                if timeout > 0:
                    _param_id = '0'
                    vt_params[_param_id] = {
                        'id': _param_id,
                        'type': 'entry',
                        'name': 'timeout',
                        'description': 'Script Timeout',
                        'default': res,
                    }
                custom['vt_params'] = vt_params
                custom['vt_params'].update(self.get_nvt_params(oid))
            elif res:
                custom[child] = res

        return custom

    def get_nvt_refs(self, oid: str) -> Optional[Dict[str, str]]:
        """Get the references of a NVT.

        Arguments:
            oid: OID of VT from which to get the VT references.

        Returns:
            A dictionary with the VT references (keys `cve`, `bid` and
            `xref`, each holding a list) or None if nothing is stored for
            the OID.
        """
        resp = OpenvasDB.get_list_item(
            self.ctx,
            "nvt:%s" % oid,
            start=NVT_META_FIELDS.index("NVT_CVES_POS"),
            end=NVT_META_FIELDS.index("NVT_XREFS_POS"),
        )

        if not isinstance(resp, list) or len(resp) == 0:
            return None

        subelem = ['cve', 'bid', 'xref']

        return {child: res.split(", ") for child, res in zip(subelem, resp)}

    def get_nvt_family(self, oid: str) -> str:
        """Get NVT family

        Arguments:
            oid: OID of VT from which to get the VT family.

        Returns:
            A str with the VT family.
        """
        return OpenvasDB.get_single_item(
            self.ctx,
            'nvt:%s' % oid,
            index=NVT_META_FIELDS.index("NVT_FAMILY_POS"),
        )

    def get_nvt_prefs(self, oid: str) -> Optional[List[str]]:
        """Get NVT preferences.

        Arguments:
            oid: OID of VT from which to get the VT preferences.

        Returns:
            A list with the VT preferences.
        """
        key = 'oid:%s:prefs' % oid
        return OpenvasDB.get_list_item(self.ctx, key)

    def get_nvt_timeout(self, oid: str) -> Optional[str]:
        """Get NVT timeout

        Arguments:
            oid: OID of VT from which to get the script timeout.

        Returns:
            The timeout.
        """
        return OpenvasDB.get_single_item(
            self.ctx,
            'nvt:%s' % oid,
            index=NVT_META_FIELDS.index("NVT_TIMEOUT_POS"),
        )

    def get_nvt_tags(self, oid: str) -> Optional[Dict[str, str]]:
        """Get Tags of the given OID.

        Arguments:
            oid: OID of VT from which to get the VT tags.

        Returns:
            A dictionary with the VT tags or None if no tags are stored
            for the OID. Tags without a value are logged and left out.
        """
        tag = OpenvasDB.get_single_item(
            self.ctx,
            'nvt:%s' % oid,
            index=NVT_META_FIELDS.index('NVT_TAGS_POS'),
        )
        if not tag:
            # Nothing stored: avoid calling split() on None or ''.
            return None

        return self._parse_metadata_tags(tag, oid)

    def get_nvt_files_count(self) -> int:
        """Return the number of keys matching `filename:*` in the cache."""
        return OpenvasDB.get_key_count(self.ctx, "filename:*")

    def get_nvt_count(self) -> int:
        """Return the number of keys matching `nvt:*` in the cache."""
        return OpenvasDB.get_key_count(self.ctx, "nvt:*")

    def force_reload(self):
        """Release the nvti cache database through the main database."""
        self._main_db.release_database(self)
360