Passed
Pull Request — master (#281)
by Juan José
01:13
created

ospd_openvas.nvticache.NVTICache.get_nvt_refs()   A

Complexity

Conditions 4

Size

Total Lines 26
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 13
nop 2
dl 0
loc 26
rs 9.75
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
""" Provide functions to handle NVT Info Cache. """
21
22
import logging
23
24
from typing import List, Dict, Optional, Iterator, Tuple
25
26
from packaging.specifiers import SpecifierSet
27
from packaging.version import parse as parse_version
28
from packaging.version import Version
29
30
from ospd_openvas.db import NVT_META_FIELDS, OpenvasDB, MainDB, BaseDB, RedisCtx
31
from ospd_openvas.errors import OspdOpenvasError
32
from ospd_openvas.openvas import Openvas
33
34
35
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
36
37
# NOTE(review): presumably the indexes of the first and last element of a
# list; neither constant is referenced in the visible part of this file.
LIST_FIRST_POS = 0
38
LIST_LAST_POS = -1
39
40
# Version specifier that NVTICache._is_compatible_version() checks the
# installed gvm-libs version against.
# actually the nvti cache with gvm-libs 10 should fit too but openvas was only
41
# introduced with GVM 11 and gvm-libs 11
42
SUPPORTED_NVTICACHE_VERSIONS_SPECIFIER = SpecifierSet('>=11.0')
43
44
45
class NVTICache(BaseDB):
46
47
    # NOTE(review): presumably maps a QoD type name to its QoD value; the
    # values are kept as strings. Not referenced in the visible part of this
    # file -- confirm the exact meaning against the callers.
    QOD_TYPES = {
47
        'exploit': '100',
48
        'remote_vul': '99',
49
        'remote_app': '98',
50
        'package': '97',
51
        'registry': '97',
52
        'remote_active': '95',
53
        'remote_banner': '80',
54
        'executable_version': '80',
55
        'remote_analysis': '70',
56
        'remote_probe': '50',
57
        'remote_banner_unreliable': '30',
58
        'executable_version_unreliable': '30',
59
        'general_note': '1',
60
        'default': '70',
61
    }
63
64
    def __init__(  # pylint: disable=super-init-not-called
65
        self, main_db: MainDB
66
    ):
67
        self._ctx = None
68
        self.index = None
69
        self._main_db = main_db
70
        self._nvti_cache_name = None
71
72
    def _get_nvti_cache_name(self) -> str:
73
        if not self._nvti_cache_name:
74
            self._set_nvti_cache_name()
75
76
        return self._nvti_cache_name
77
78
    def _is_compatible_version(self, version: str) -> bool:
        """ Tell whether `version` is within the supported versions.

        Arguments:
            version: Version string to be parsed and checked.

        Returns:
            True if the parsed version is contained in
            SUPPORTED_NVTICACHE_VERSIONS_SPECIFIER, False otherwise.
        """
        candidate = parse_version(version)
        is_supported = candidate in SUPPORTED_NVTICACHE_VERSIONS_SPECIFIER
        return is_supported
81
82
    def _set_nvti_cache_name(self):
        """ Derive the nvticache name from the installed gvm-libs version.

        The resulting name has the form "nvticache<major>.<minor>" and is
        stored in self._nvti_cache_name.

        Raises:
            OspdOpenvasError: If the gvm-libs version can not be obtained
                or if it is not one of the supported versions.
        """
        raw_version = Openvas.get_gvm_libs_version()
        if not raw_version:
            raise OspdOpenvasError(
                "Not possible to get the installed gvm-libs version. "
                "Outdated openvas version. openvas version needs to be at "
                "least 7.0.1."
            )

        # Drop the symbols listed as unsupported before the version string
        # gets parsed.
        for symbol in ('~',):
            raw_version = raw_version.replace(symbol, "")

        if not self._is_compatible_version(raw_version):
            supported = ", ".join(
                str(spec) for spec in SUPPORTED_NVTICACHE_VERSIONS_SPECIFIER
            )
            raise OspdOpenvasError(
                "Error setting nvticache. Incompatible nvticache "
                "version {}. Supported versions are {}.".format(
                    raw_version, supported
                )
            )

        parsed = Version(raw_version)
        self._nvti_cache_name = "nvticache{}.{}".format(
            parsed.major, parsed.minor
        )
117
118
    @property
119
    def ctx(self) -> Optional[RedisCtx]:
120
        if self._ctx is None:
121
            self._ctx, self.index = OpenvasDB.find_database_by_pattern(
122
                self._get_nvti_cache_name(), self._main_db.max_database_index
123
            )
124
        return self._ctx
125
126
    def get_feed_version(self) -> Optional[str]:
127
        """ Get feed version of the nvti cache db.
128
129
        Returns the feed version or None if the nvt feed isn't available.
130
        """
131
        if not self.ctx:
132
            # no nvti cache db available yet
133
            return None
134
135
        return OpenvasDB.get_single_item(self.ctx, self._get_nvti_cache_name())
136
137
    def get_oids(self) -> Iterator[Tuple[str, str]]:
        """ Get the NVT file names and OIDs.

        Returns:
            An iterator. Each single item contains the filename as first
            element and the oid as second one.
        """
        filenames_and_oids = OpenvasDB.get_filenames_and_oids(self.ctx)
        return filenames_and_oids
145
146
    def get_nvt_params(self, oid: str) -> Optional[Dict[str, str]]:
147
        """ Get NVT's preferences.
148
149
        Arguments:
150
            oid: OID of VT from which to get the parameters.
151
152
        Returns:
153
            A dictionary with preferences and timeout.
154
        """
155
        prefs = self.get_nvt_prefs(oid)
156
157
        vt_params = {}
158
159
        if prefs:
160
            for nvt_pref in prefs:
161
                elem = nvt_pref.split('|||')
162
163
                param_id = elem[0]
164
                param_name = elem[1]
165
                param_type = elem[2]
166
167
                vt_params[param_id] = dict()
168
                vt_params[param_id]['id'] = param_id
169
                vt_params[param_id]['type'] = param_type
170
                vt_params[param_id]['name'] = param_name.strip()
171
                vt_params[param_id]['description'] = 'Description'
172
173
                if len(elem) > 3:
174
                    param_default = elem[3]
175
                    vt_params[param_id]['default'] = param_default
176
                else:
177
                    vt_params[param_id]['default'] = ''
178
179
        return vt_params
180
181
    @staticmethod
182
    def _parse_metadata_tags(tags_str: str, oid: str) -> Dict[str, str]:
183
        """ Parse a string with multiple tags.
184
185
        Arguments:
186
            tags_str: String with tags separated by `|`.
187
            oid: VT OID. Only used for logging in error case.
188
189
        Returns:
190
            A dictionary with the tags.
191
        """
192
        tags_dict = dict()
193
        tags = tags_str.split('|')
194
        for tag in tags:
195
            try:
196
                _tag, _value = tag.split('=', 1)
197
            except ValueError:
198
                logger.error('Tag %s in %s has no value.', tag, oid)
199
                continue
200
            tags_dict[_tag] = _value
201
202
        return tags_dict
203
204
    def get_nvt_metadata(self, oid: str) -> Optional[Dict[str, str]]:
        """ Get the metadata of a VT as a dictionary.

        The fields from the filename up to the name are read from the
        `nvt:<oid>` list. Tags are merged into the top level of the
        result, non-empty CVE/BID/xref fields are split on ", " and stored
        under `refs`, and the script timeout (if greater than zero)
        together with the VT preferences ends up under `vt_params`. Other
        empty fields are left out.

        Arguments:
            oid: OID of VT from which to get the metadata.

        Returns:
            A dictionary with the VT metadata, or None if nothing was
            found for the given OID.
        """
        raw_fields = OpenvasDB.get_list_item(
            self.ctx,
            "nvt:%s" % oid,
            start=NVT_META_FIELDS.index("NVT_FILENAME_POS"),
            end=NVT_META_FIELDS.index("NVT_NAME_POS"),
        )

        if not (isinstance(raw_fields, list) and raw_fields):
            return None

        # Names are paired with the fetched fields by position.
        field_names = (
            'filename',
            'required_keys',
            'mandatory_keys',
            'excluded_keys',
            'required_udp_ports',
            'required_ports',
            'dependencies',
            'tag',
            'cve',
            'bid',
            'xref',
            'category',
            'timeout',
            'family',
            'name',
        )
        reference_fields = ('cve', 'bid', 'xref')

        metadata = {'refs': {}, 'vt_params': {}}
        for field, value in zip(field_names, raw_fields):
            if field == 'tag':
                metadata.update(self._parse_metadata_tags(value, oid))
            elif field in reference_fields:
                if value:
                    metadata['refs'][field] = value.split(", ")
            elif field == 'timeout':
                if value is None:
                    continue
                params = {}
                if int(value) > 0:
                    # The script timeout is exposed as parameter '0'.
                    params['0'] = {
                        'id': '0',
                        'type': 'entry',
                        'name': 'timeout',
                        'description': 'Script Timeout',
                        'default': value,
                    }
                params.update(self.get_nvt_params(oid))
                metadata['vt_params'] = params
            elif value:
                metadata[field] = value

        return metadata
267
268
    def get_nvt_refs(self, oid: str) -> Optional[Dict[str, str]]:
        """ Get the references of a VT.

        The fields from the CVEs up to the xrefs are read from the
        `nvt:<oid>` list and each of them is split on ", ".

        Arguments:
            oid: OID of VT from which to get the VT references.

        Returns:
            A dictionary with the VT references, using the keys cve, bid
            and xref, or None if nothing was found for the given OID.
        """
        raw_refs = OpenvasDB.get_list_item(
            self.ctx,
            "nvt:%s" % oid,
            start=NVT_META_FIELDS.index("NVT_CVES_POS"),
            end=NVT_META_FIELDS.index("NVT_XREFS_POS"),
        )

        if not (isinstance(raw_refs, list) and raw_refs):
            return None

        ref_types = ('cve', 'bid', 'xref')
        return {
            ref_type: ref_value.split(", ")
            for ref_type, ref_value in zip(ref_types, raw_refs)
        }
294
295
    def get_nvt_family(self, oid: str) -> str:
        """ Get the family of a VT.

        Arguments:
            oid: OID of VT from which to get the VT family.

        Returns:
            A str with the VT family.
        """
        family_pos = NVT_META_FIELDS.index("NVT_FAMILY_POS")
        key = 'nvt:%s' % oid
        return OpenvasDB.get_single_item(self.ctx, key, index=family_pos)
308
309
    def get_nvt_prefs(self, oid: str) -> Optional[List[str]]:
        """ Get the preferences of a VT.

        The preferences are read from the `oid:<oid>:prefs` list.

        Arguments:
            oid: OID of VT from which to get the VT preferences.

        Returns:
            A list with the VT preferences.
        """
        prefs = OpenvasDB.get_list_item(self.ctx, 'oid:%s:prefs' % oid)
        return prefs
321
322
    def get_nvt_timeout(self, oid: str) -> Optional[str]:
        """ Get the script timeout of a VT.

        Arguments:
            oid: OID of VT from which to get the script timeout.

        Returns:
            The timeout.
        """
        timeout_pos = NVT_META_FIELDS.index("NVT_TIMEOUT_POS")
        key = 'nvt:%s' % oid
        return OpenvasDB.get_single_item(self.ctx, key, index=timeout_pos)
337
338
    def get_nvt_tags(self, oid: str) -> Optional[Dict[str, str]]:
        """ Get Tags of the given OID.

        The tags string is read from the `nvt:<oid>` list and parsed with
        the same helper that get_nvt_metadata() uses, so a tag without a
        value is logged and skipped instead of raising.

        Arguments:
            oid: OID of VT from which to get the VT tags.

        Returns:
            A dictionary with the VT tags, or None if no tags string is
            stored for the given OID.
        """
        tag = OpenvasDB.get_single_item(
            self.ctx,
            'nvt:%s' % oid,
            index=NVT_META_FIELDS.index('NVT_TAGS_POS'),
        )

        # The lookup may yield nothing (the return type is Optional); calling
        # split() on such a value used to raise instead of returning None.
        if not tag:
            return None

        return self._parse_metadata_tags(tag, oid)
356
357
    def get_nvt_files_count(self) -> int:
        """ Return the number of keys matching `filename:*` in the cache. """
        pattern = "filename:*"
        return OpenvasDB.get_key_count(self.ctx, pattern)
359
360
    def get_nvt_count(self) -> int:
        """ Return the number of keys matching `nvt:*` in the cache. """
        pattern = "nvt:*"
        return OpenvasDB.get_key_count(self.ctx, pattern)
362
363
    def force_reload(self):
364
        self._main_db.release_database(self)
365