Passed
Pull Request — master (#236)
by Juan José
01:49
created

ospd_openvas.nvticache.NVTICache.get_nvt_family()   A

Complexity

Conditions 1

Size

Total Lines 12
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 5
nop 2
dl 0
loc 12
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
""" Provide functions to handle NVT Info Cache. """
21
22
import logging
23
24
from typing import List, Dict, Optional, Iterator, Tuple
25
26
from packaging.specifiers import SpecifierSet
27
from packaging.version import parse as parse_version
28
29
from ospd_openvas.db import NVT_META_FIELDS, OpenvasDB, MainDB, BaseDB, RedisCtx
30
from ospd_openvas.errors import OspdOpenvasError
31
from ospd_openvas.openvas import Openvas
32
33
34
logger = logging.getLogger(__name__)
35
36
LIST_FIRST_POS = 0
37
LIST_LAST_POS = -1
38
39
# actually the nvti cache with gvm-libs 10 should fit too but openvas was only
40
# introduced with GVM 11 and gvm-libs 11
41
SUPPORTED_NVTICACHE_VERSIONS_SPECIFIER = SpecifierSet('>=11.0')
42
43
44
class NVTICache(BaseDB):
45
46
    QOD_TYPES = {
47
        'exploit': '100',
48
        'remote_vul': '99',
49
        'remote_app': '98',
50
        'package': '97',
51
        'registry': '97',
52
        'remote_active': '95',
53
        'remote_banner': '80',
54
        'executable_version': '80',
55
        'remote_analysis': '70',
56
        'remote_probe': '50',
57
        'remote_banner_unreliable': '30',
58
        'executable_version_unreliable': '30',
59
        'general_note': '1',
60
        'default': '70',
61
    }
62
63
    def __init__(  # pylint: disable=super-init-not-called
64
        self, main_db: MainDB
65
    ):
66
        self._ctx = None
67
        self.index = None
68
        self._main_db = main_db
69
        self._nvti_cache_name = None
70
71
    def _get_nvti_cache_name(self) -> str:
72
        if not self._nvti_cache_name:
73
            self._set_nvti_cache_name()
74
75
        return self._nvti_cache_name
76
77
    def _is_compatible_version(self, version: str) -> bool:
78
        installed_version = parse_version(version)
79
        return installed_version in SUPPORTED_NVTICACHE_VERSIONS_SPECIFIER
80
81
    def _set_nvti_cache_name(self):
82
        """Set nvticache name"""
83
        version_string = Openvas.get_gvm_libs_version()
84
        if not version_string:
85
            raise OspdOpenvasError(
86
                "Not possible to get the installed gvm-libs version. "
87
                "Outdated openvas version. openvas version needs to be at "
88
                "least 7.0.1."
89
            )
90
91
        if self._is_compatible_version(version_string):
92
            self._nvti_cache_name = "nvticache{}".format(version_string)
93
        else:
94
            raise OspdOpenvasError(
95
                "Error setting nvticache. Incompatible nvticache "
96
                "version {}. Supported versions are {}.".format(
97
                    version_string,
98
                    ", ".join(
99
                        [
100
                            str(spec)
101
                            for spec in SUPPORTED_NVTICACHE_VERSIONS_SPECIFIER
102
                        ]
103
                    ),
104
                )
105
            )
106
107
    @property
108
    def ctx(self) -> Optional[RedisCtx]:
109
        if self._ctx is None:
110
            self._ctx, self.index = OpenvasDB.find_database_by_pattern(
111
                self._get_nvti_cache_name(), self._main_db.max_database_index
112
            )
113
        return self._ctx
114
115
    def get_feed_version(self) -> Optional[str]:
116
        """ Get feed version of the nvti cache db.
117
118
        Returns the feed version or None if the nvt feed isn't available.
119
        """
120
        if not self.ctx:
121
            # no nvti cache db available yet
122
            return None
123
124
        return OpenvasDB.get_single_item(self.ctx, self._get_nvti_cache_name())
125
126
    def get_oids(self) -> Iterator[Tuple[str, str]]:
127
        """ Get the list of NVT file names and OIDs.
128
129
        Returns:
130
            A i. Each single list contains the filename
131
            as first element and the oid as second one.
132
        """
133
        return OpenvasDB.get_filenames_and_oids(self.ctx)
134
135
    def get_nvt_params(self, oid: str) -> Optional[Dict[str, str]]:
136
        """ Get NVT's preferences.
137
138
        Arguments:
139
            oid: OID of VT from which to get the parameters.
140
141
        Returns:
142
            A dictionary with preferences and timeout.
143
        """
144
        prefs = self.get_nvt_prefs(oid)
145
146
        vt_params = {}
147
148
        if prefs:
149
            for nvt_pref in prefs:
150
                elem = nvt_pref.split('|||')
151
152
                param_id = elem[0]
153
                param_name = elem[1]
154
                param_type = elem[2]
155
156
                vt_params[param_id] = dict()
157
                vt_params[param_id]['id'] = param_id
158
                vt_params[param_id]['type'] = param_type
159
                vt_params[param_id]['name'] = param_name.strip()
160
                vt_params[param_id]['description'] = 'Description'
161
162
                if len(elem) > 3:
163
                    param_default = elem[3]
164
                    vt_params[param_id]['default'] = param_default
165
                else:
166
                    vt_params[param_id]['default'] = ''
167
168
        return vt_params
169
170
    @staticmethod
171
    def _parse_metadata_tags(tags_str: str, oid: str) -> Dict[str, str]:
172
        """ Parse a string with multiple tags.
173
174
        Arguments:
175
            tags_str: String with tags separated by `|`.
176
            oid: VT OID. Only used for logging in error case.
177
178
        Returns:
179
            A dictionary with the tags.
180
        """
181
        tags_dict = dict()
182
        tags = tags_str.split('|')
183
        for tag in tags:
184
            try:
185
                _tag, _value = tag.split('=', 1)
186
            except ValueError:
187
                logger.error('Tag %s in %s has no value.', tag, oid)
188
                continue
189
            tags_dict[_tag] = _value
190
191
        return tags_dict
192
193
    def get_nvt_metadata(self, oid: str) -> Optional[Dict[str, str]]:
194
        """ Get a full NVT. Returns an XML tree with the NVT metadata.
195
196
        Arguments:
197
            oid: OID of VT from which to get the metadata.
198
199
        Returns:
200
            A dictonary with the VT metadata.
201
        """
202
        resp = OpenvasDB.get_list_item(
203
            self.ctx,
204
            "nvt:%s" % oid,
205
            start=NVT_META_FIELDS.index("NVT_FILENAME_POS"),
206
            end=NVT_META_FIELDS.index("NVT_NAME_POS"),
207
        )
208
209
        if not isinstance(resp, list) or len(resp) == 0:
210
            return None
211
212
        subelem = [
213
            'filename',
214
            'required_keys',
215
            'mandatory_keys',
216
            'excluded_keys',
217
            'required_udp_ports',
218
            'required_ports',
219
            'dependencies',
220
            'tag',
221
            'cve',
222
            'bid',
223
            'xref',
224
            'category',
225
            'timeout',
226
            'family',
227
            'name',
228
        ]
229
230
        custom = dict()
231
        custom['refs'] = dict()
232
        custom['vt_params'] = dict()
233
        for child, res in zip(subelem, resp):
234
            if child not in ['cve', 'bid', 'xref', 'tag', 'timeout'] and res:
235
                custom[child] = res
236
            elif child == 'tag':
237
                custom.update(self._parse_metadata_tags(res, oid))
238
            elif child in ['cve', 'bid', 'xref'] and res:
239
                custom['refs'][child] = res.split(", ")
240
            elif child == 'timeout':
241
                if res is None:
242
                    continue
243
                vt_params = {}
244
                if int(res) > 0:
245
                    _param_id = '0'
246
                    vt_params[_param_id] = dict()
247
                    vt_params[_param_id]['id'] = _param_id
248
                    vt_params[_param_id]['type'] = 'entry'
249
                    vt_params[_param_id]['name'] = 'timeout'
250
                    vt_params[_param_id]['description'] = 'Script Timeout'
251
                    vt_params[_param_id]['default'] = res
252
                custom['vt_params'] = vt_params
253
                custom['vt_params'].update(self.get_nvt_params(oid))
254
255
        return custom
256
257
    def get_nvt_refs(self, oid: str) -> Optional[Dict[str, str]]:
258
        """ Get a full NVT.
259
260
        Arguments:
261
            oid: OID of VT from which to get the VT references.
262
263
        Returns:
264
            A dictionary with the VT references.
265
        """
266
        resp = OpenvasDB.get_list_item(
267
            self.ctx,
268
            "nvt:%s" % oid,
269
            start=NVT_META_FIELDS.index("NVT_CVES_POS"),
270
            end=NVT_META_FIELDS.index("NVT_XREFS_POS"),
271
        )
272
273
        if not isinstance(resp, list) or len(resp) == 0:
274
            return None
275
276
        subelem = ['cve', 'bid', 'xref']
277
278
        refs = dict()
279
        for child, res in zip(subelem, resp):
280
            refs[child] = res.split(", ")
281
282
        return refs
283
284
    def get_nvt_family(self, oid: str) -> str:
285
        """ Get NVT family
286
        Arguments:
287
            oid: OID of VT from which to get the VT family.
288
289
        Returns:
290
            A str with the VT family.
291
        """
292
        return OpenvasDB.get_single_item(
293
            self.ctx,
294
            'nvt:%s' % oid,
295
            index=NVT_META_FIELDS.index("NVT_FAMILY_POS"),
296
        )
297
298
    def get_nvt_prefs(self, oid: str) -> Optional[List[str]]:
299
        """ Get NVT preferences.
300
301
        Arguments:
302
            ctx: Redis context to be used.
303
            oid: OID of VT from which to get the VT preferences.
304
305
        Returns:
306
            A list with the VT preferences.
307
        """
308
        key = 'oid:%s:prefs' % oid
309
        return OpenvasDB.get_list_item(self.ctx, key)
310
311
    def get_nvt_timeout(self, oid: str) -> Optional[str]:
312
        """ Get NVT timeout
313
314
        Arguments:
315
            ctx: Redis context to be used.
316
            oid: OID of VT from which to get the script timeout.
317
318
        Returns:
319
            The timeout.
320
        """
321
        return OpenvasDB.get_single_item(
322
            self.ctx,
323
            'nvt:%s' % oid,
324
            index=NVT_META_FIELDS.index("NVT_TIMEOUT_POS"),
325
        )
326
327
    def get_nvt_tags(self, oid: str) -> Optional[Dict[str, str]]:
328
        """ Get Tags of the given OID.
329
330
        Arguments:
331
            ctx: Redis context to be used.
332
            oid: OID of VT from which to get the VT tags.
333
334
        Returns:
335
            A dictionary with the VT tags.
336
        """
337
        tag = OpenvasDB.get_single_item(
338
            self.ctx,
339
            'nvt:%s' % oid,
340
            index=NVT_META_FIELDS.index('NVT_TAGS_POS'),
341
        )
342
        tags = tag.split('|')
343
344
        return dict([item.split('=', 1) for item in tags])
345
346
    def get_nvt_files_count(self) -> int:
347
        return OpenvasDB.get_key_count(self.ctx, "filename:*")
348
349
    def get_nvt_count(self) -> int:
350
        return OpenvasDB.get_key_count(self.ctx, "nvt:*")
351
352
    def force_reload(self):
353
        self._main_db.release_database(self)
354