Completed
Push — master ( bac420...8e9778 )
by
unknown
18s queued 11s
created

ospd_openvas.nvticache.NVTICache.get_nvt_timeout()   A

Complexity

Conditions 1

Size

Total Lines 15
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 6
nop 3
dl 0
loc 15
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-2.0-or-later
5
#
6
# This program is free software; you can redistribute it and/or
7
# modify it under the terms of the GNU General Public License
8
# as published by the Free Software Foundation; either version 2
9
# of the License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
20
""" Provide functions to handle NVT Info Cache. """
21
22
import logging
23
import subprocess
24
import sys
25
26
from pkg_resources import parse_version
27
28
from ospd_openvas.db import NVT_META_FIELDS
29
from ospd_openvas.errors import OspdOpenvasError
30
31
32
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
33
34
# List index constants (first / last element). They are not referenced in
# the visible part of this module.
LIST_FIRST_POS = 0
35
LIST_LAST_POS = -1
36
37
# gvm-libs versions accepted by NVTICache.set_nvticache_str(): the installed
# version must be >= an entry here and share its leading version component.
SUPPORTED_NVTICACHE_VERSIONS = ('20.4',)
38
39
40
class NVTICache(object):
    """Accessor for the NVT info cache kept in the openvas database.

    All data is read through the ``openvas_db`` object handed to the
    constructor; this class only builds the keys, selects the metadata
    fields and converts the raw answers into Python structures.
    """

    # QoD type name -> value (as a string). Presumably quality-of-detection
    # percentages; not referenced by the methods of this class.
    QOD_TYPES = {
        'exploit': '100',
        'remote_vul': '99',
        'remote_app': '98',
        'package': '97',
        'registry': '97',
        'remote_active': '95',
        'remote_banner': '80',
        'executable_version': '80',
        'remote_analysis': '70',
        'remote_probe': '50',
        'remote_banner_unreliable': '30',
        'executable_version_unreliable': '30',
        'general_note': '1',
        'default': '70',
    }

    # Name of the nvticache key, "nvticache<gvm-libs version>". Stays None
    # until set_nvticache_str() succeeds; shared by all instances.
    NVTICACHE_STR = None

    def __init__(self, openvas_db):
        """Store the database accessor used by every other method.

        Arguments:
            openvas_db (object): Object providing the db_find(),
                get_kb_context(), get_single_item(), get_list_item() and
                get_elem_pattern_by_index() calls used below.
        """
        self._openvas_db = openvas_db

    def set_nvticache_str(self):
        """Set nvticache name from the installed gvm-libs version.

        Asks ``pkg-config`` for the version of ``libgvm_util`` and, if it is
        compatible with one of SUPPORTED_NVTICACHE_VERSIONS (same leading
        version component and not older), stores
        "nvticache<version>" in the class attribute NVTICACHE_STR.

        Raises:
            OspdOpenvasError: If pkg-config fails or cannot be executed.

        Exits the process with status 1 if the installed version is not
        compatible with any supported version.
        """
        try:
            result = subprocess.check_output(
                ['pkg-config', '--modversion', 'libgvm_util'],
                stderr=subprocess.STDOUT,
            )
        # OSError covers PermissionError as well as FileNotFoundError, which
        # is what a missing pkg-config binary produces.
        except (subprocess.CalledProcessError, OSError) as e:
            raise OspdOpenvasError(
                "Error setting nvticache. "
                "Not possible to get the installed "
                "gvm-libs version. %s" % e
            ) from e

        # Decode once; pkg-config terminates its output with a newline.
        installed_str = result.decode('utf-8').rstrip()
        installed_lib = parse_version(installed_str)

        for supported_item in SUPPORTED_NVTICACHE_VERSIONS:
            supported_lib = parse_version(supported_item)
            if (
                installed_lib >= supported_lib
                and installed_lib.base_version.split('.')[0]
                == supported_lib.base_version.split('.')[0]
            ):
                NVTICache.NVTICACHE_STR = "nvticache" + installed_str
                return

        logger.error(
            "Error setting nvticache. Incompatible nvticache version."
        )
        sys.exit(1)

    def get_feed_version(self):
        """ Get feed version.

        Looks up the context holding the NVTICACHE_STR key and returns the
        single item stored under that key.
        """
        ctx = self._openvas_db.db_find(self.NVTICACHE_STR)
        return self._openvas_db.get_single_item(self.NVTICACHE_STR, ctx=ctx)

    def get_oids(self):
        """ Get the list of NVT OIDs.

        Returns:
            list: A list of lists. Each single list contains the filename
            as first element and the oid as second one.
        """
        return self._openvas_db.get_elem_pattern_by_index('filename:*')

    def get_nvt_params(self, oid):
        """ Get NVT's preferences.

        Arguments:
            oid (str) OID of VT from which to get the parameters.

        Returns:
            dict: A dictionary with preferences and timeout, keyed by
            parameter id. None if no timeout is stored for the OID.
        """
        ctx = self._openvas_db.get_kb_context()
        prefs = self.get_nvt_prefs(ctx, oid)
        timeout = self.get_nvt_timeout(ctx, oid)

        if timeout is None:
            return None

        vt_params = {}

        # The script timeout is exposed as parameter '0', and only when it
        # is a positive number.
        if int(timeout) > 0:
            vt_params['0'] = {
                'id': '0',
                'type': 'entry',
                'name': 'timeout',
                'description': 'Script Timeout',
                'default': timeout,
            }

        # Each preference is "<id>|||<name>|||<type>|||<default>".
        for nvt_pref in prefs or []:
            elem = nvt_pref.split('|||')
            _param_id = elem[0]
            # A preference may come without the default field; fall back to
            # an empty default instead of failing with IndexError.
            if elem[2] and len(elem) > 3:
                _default = elem[3]
            else:
                _default = ''
            vt_params[_param_id] = {
                'id': _param_id,
                'type': elem[2],
                'name': elem[1].strip(),
                'description': 'Description',
                'default': _default,
            }

        return vt_params

    @staticmethod
    def _parse_metadata_tags(tags_str, oid):
        """ Parse a string with multiple tags.

        Arguments:
            tags_str (str) String with tags separated by `|`.
            oid (str) VT OID. Only used for logging in error case.

        Returns:
            dict: A dictionary with the tags. Empty if tags_str is empty
            or None. Tags without a `=` are logged and skipped.
        """
        tags_dict = dict()
        # Nothing to parse: avoid logging a bogus "has no value" error.
        if not tags_str:
            return tags_dict

        for tag in tags_str.split('|'):
            try:
                # Only the first '=' separates name and value.
                _tag, _value = tag.split('=', 1)
            except ValueError:
                logger.error('Tag %s in %s has no value.', tag, oid)
                continue
            tags_dict[_tag] = _value

        return tags_dict

    def get_nvt_metadata(self, oid):
        """ Get the metadata of a VT.

        Arguments:
            oid (str) OID of VT from which to get the metadata.

        Returns:
            dict: A dictionary with the VT metadata, or None if nothing is
            stored for the OID. The individual tags are merged into the
            dictionary; references (cve, bid, xref) are left out, see
            get_nvt_refs().
        """
        ctx = self._openvas_db.get_kb_context()
        resp = self._openvas_db.get_list_item(
            "nvt:%s" % oid,
            ctx=ctx,
            start=NVT_META_FIELDS.index("NVT_FILENAME_POS"),
            end=NVT_META_FIELDS.index("NVT_NAME_POS"),
        )

        if not isinstance(resp, list) or not resp:
            return None

        # Field names, in the order the values are returned.
        subelem = [
            'filename',
            'required_keys',
            'mandatory_keys',
            'excluded_keys',
            'required_udp_ports',
            'required_ports',
            'dependencies',
            'tag',
            'cve',
            'bid',
            'xref',
            'category',
            'timeout',
            'family',
            'name',
        ]

        custom = dict()
        for child, res in zip(subelem, resp):
            if child == 'tag':
                custom.update(self._parse_metadata_tags(res, oid))
            elif child not in ('cve', 'bid', 'xref') and res:
                custom[child] = res

        return custom

    def get_nvt_refs(self, oid):
        """ Get the references of a VT.

        Arguments:
            oid (str) OID of VT from which to get the VT references.

        Returns:
            dict: A dictionary with the VT references. The keys are 'cve',
            'bid' and 'xref', each holding a list of strings. None if
            nothing is stored for the OID.
        """
        ctx = self._openvas_db.get_kb_context()
        resp = self._openvas_db.get_list_item(
            "nvt:%s" % oid,
            ctx=ctx,
            start=NVT_META_FIELDS.index("NVT_CVES_POS"),
            end=NVT_META_FIELDS.index("NVT_XREFS_POS"),
        )

        if not isinstance(resp, list) or not resp:
            return None

        subelem = ['cve', 'bid', 'xref']

        # Each field is a single ", "-separated string.
        return {child: res.split(", ") for child, res in zip(subelem, resp)}

    def get_nvt_prefs(self, ctx, oid):
        """ Get NVT preferences.

        Arguments:
            ctx (object): Redis context to be used.
            oid (str) OID of VT from which to get the VT preferences.

        Returns:
            list: A list with the VT preferences.
        """
        return self._openvas_db.get_list_item('oid:%s:prefs' % oid, ctx=ctx)

    def get_nvt_timeout(self, ctx, oid):
        """ Get NVT timeout

        Arguments:
            ctx (object): Redis context to be used.
            oid (str) OID of VT from which to get the script timeout.

        Returns:
            str: The timeout.
        """
        return self._openvas_db.get_single_item(
            'nvt:%s' % oid,
            ctx=ctx,
            index=NVT_META_FIELDS.index("NVT_TIMEOUT_POS"),
        )

    def get_nvt_tag(self, ctx, oid):
        """ Get Tags of the given OID.

        Arguments:
            ctx (object): Redis context to be used.
            oid (str) OID of VT from which to get the VT tags.

        Returns:
            dict: A dictionary with the VT tags. Empty if the VT has no
            tags; tags without a value are logged and skipped.
        """
        tag = self._openvas_db.get_single_item(
            'nvt:%s' % oid, ctx=ctx, index=NVT_META_FIELDS.index('NVT_TAGS_POS')
        )
        # Same parser as get_nvt_metadata(), so a missing or malformed tag
        # string no longer raises.
        return self._parse_metadata_tags(tag, oid)
287