Passed
Push — master ( d88eea...1dd774 )
by Juan José
01:47 queued 10s
created

ospd_openvas.nvticache.NVTICache.get_nvt_timeout()   A

Complexity

Conditions 1

Size

Total Lines 14
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 5
nop 2
dl 0
loc 14
rs 10
c 0
b 0
f 0
# -*- coding: utf-8 -*-
# Copyright (C) 2014-2020 Greenbone Networks GmbH
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
""" Provide functions to handle NVT Info Cache. """
21
22
import logging
23
24
from typing import List, Dict, Optional, Iterator, Tuple
25
26
from ospd.errors import RequiredArgument
27
from ospd_openvas.errors import OspdOpenvasError
28
from ospd_openvas.db import NVT_META_FIELDS, OpenvasDB, MainDB, BaseDB, RedisCtx
29
30
# Pattern used to locate the nvti cache database and key under which the
# feed version is read (see NVTICache.ctx and NVTICache.get_feed_version).
NVTI_CACHE_NAME = "nvticache"

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Positions of the first and the last element of a list.
# NOTE(review): not referenced in this module; presumably kept for other
# modules that import them - confirm before removing.
LIST_FIRST_POS = 0
LIST_LAST_POS = -1
36
37
38
class NVTICache(BaseDB):
    """Accessor for the NVT info cache database.

    The Redis context of the cache is looked up lazily through ``ctx``;
    all getters read from that context via ``OpenvasDB`` helpers.
    """

    # QOD type name -> value, kept as a string.
    # NOTE(review): 'default' is presumably the fallback used for unknown
    # types - confirm with the callers.
    QOD_TYPES = {
        'exploit': '100',
        'remote_vul': '99',
        'remote_app': '98',
        'package': '97',
        'registry': '97',
        'remote_active': '95',
        'remote_banner': '80',
        'executable_version': '80',
        'remote_analysis': '70',
        'remote_probe': '50',
        'remote_banner_unreliable': '30',
        'executable_version_unreliable': '30',
        'general_note': '1',
        'default': '70',
    }

    def __init__(  # pylint: disable=super-init-not-called
        self, main_db: MainDB
    ):
        """Store the main DB handle; the cache lookup is deferred to ``ctx``.

        Arguments:
            main_db: Main database object. It provides the maximum database
                index for the lookup and is used to release this database.
        """
        self._ctx = None
        self.index = None
        self._main_db = main_db

    @property
    def ctx(self) -> Optional[RedisCtx]:
        """Redis context of the nvti cache database.

        The database is searched on access for as long as no context has
        been stored yet; the database index is stored alongside it.
        """
        if self._ctx is None:
            self._ctx, self.index = OpenvasDB.find_database_by_pattern(
                NVTI_CACHE_NAME, self._main_db.max_database_index
            )
        return self._ctx

    def get_feed_version(self) -> Optional[str]:
        """Get feed version of the nvti cache db.

        Returns the feed version or None if the nvt feed isn't available.
        """
        if not self.ctx:
            # no nvti cache db available yet
            return None

        return OpenvasDB.get_single_item(self.ctx, NVTI_CACHE_NAME)

    def get_oids(self) -> Iterator[Tuple[str, str]]:
        """Get the NVT file names and OIDs.

        Returns:
            An iterator of tuples. Each tuple contains the filename
            as first element and the oid as second one.
        """
        return OpenvasDB.get_filenames_and_oids(self.ctx)

    def get_nvt_params(self, oid: str) -> Dict[str, Dict[str, str]]:
        """Get NVT's preferences.

        Each stored preference is a string of the form
        `id|||name|||type[|||default]`.

        Arguments:
            oid: OID of VT from which to get the parameters.

        Returns:
            A dictionary mapping each preference id to a dictionary with
            the keys `id`, `type`, `name`, `description` and `default`.
            It is empty if the VT has no preferences.
        """
        vt_params = {}

        for nvt_pref in self.get_nvt_prefs(oid) or []:
            elem = nvt_pref.split('|||')
            param_id, param_name, param_type = elem[0], elem[1], elem[2]

            vt_params[param_id] = {
                'id': param_id,
                'type': param_type,
                'name': param_name.strip(),
                'description': 'Description',
                # The default value is optional in the stored preference.
                'default': elem[3] if len(elem) > 3 else '',
            }

        return vt_params

    @staticmethod
    def _parse_metadata_tags(tags_str: str, oid: str) -> Dict[str, str]:
        """Parse a string with multiple tags.

        Arguments:
            tags_str: String with `name=value` tags separated by `|`.
            oid: VT OID. Only used for logging in error case.

        Returns:
            A dictionary with the tags. Tags without a `=` are logged as
            errors and left out.
        """
        tags_dict = {}
        for tag in tags_str.split('|'):
            # Only the first '=' separates name and value, so values may
            # contain further '=' characters.
            name, separator, value = tag.partition('=')
            if not separator:
                logger.error('Tag %s in %s has no value.', tag, oid)
                continue
            tags_dict[name] = value

        return tags_dict

    def get_nvt_metadata(self, oid: str) -> Optional[Dict[str, str]]:
        """Get the metadata of a full NVT.

        Arguments:
            oid: OID of VT from which to get the metadata.

        Returns:
            A dictionary with the VT metadata, or None if nothing is stored
            for the OID. Besides the plain string fields and the parsed
            tags, it holds the nested dictionaries `refs` (reference type
            to list of references) and `vt_params` (see get_nvt_params).
        """
        resp = OpenvasDB.get_list_item(
            self.ctx,
            "nvt:%s" % oid,
            start=NVT_META_FIELDS.index("NVT_FILENAME_POS"),
            end=NVT_META_FIELDS.index("NVT_NAME_POS"),
        )

        if not isinstance(resp, list) or not resp:
            return None

        # Names given to the returned fields, in the order they are stored.
        # NOTE(review): assumed to mirror NVT_META_FIELDS from
        # NVT_FILENAME_POS to NVT_NAME_POS - confirm against ospd_openvas.db.
        subelem = [
            'filename',
            'required_keys',
            'mandatory_keys',
            'excluded_keys',
            'required_udp_ports',
            'required_ports',
            'dependencies',
            'tag',
            'cve',
            'bid',
            'xref',
            'category',
            'timeout',
            'family',
            'name',
        ]

        custom = {'refs': {}, 'vt_params': {}}
        for child, res in zip(subelem, resp):
            if child == 'tag':
                # Every tag becomes a top-level entry of the metadata.
                custom.update(self._parse_metadata_tags(res, oid))
            elif child in ('cve', 'bid', 'xref'):
                if res:
                    custom['refs'][child] = res.split(", ")
            elif child == 'timeout':
                if res is None:
                    continue
                vt_params = {}
                if int(res) > 0:
                    # A positive script timeout is exposed as the
                    # preference with id '0'.
                    vt_params['0'] = {
                        'id': '0',
                        'type': 'entry',
                        'name': 'timeout',
                        'description': 'Script Timeout',
                        'default': res,
                    }
                vt_params.update(self.get_nvt_params(oid))
                custom['vt_params'] = vt_params
            elif res:
                custom[child] = res

        return custom

    def get_nvt_refs(self, oid: str) -> Optional[Dict[str, List[str]]]:
        """Get the references of a NVT.

        Arguments:
            oid: OID of VT from which to get the VT references.

        Returns:
            A dictionary with the VT references, mapping `cve`, `bid` and
            `xref` to the list of references of that type, or None if
            nothing is stored for the OID.
        """
        resp = OpenvasDB.get_list_item(
            self.ctx,
            "nvt:%s" % oid,
            start=NVT_META_FIELDS.index("NVT_CVES_POS"),
            end=NVT_META_FIELDS.index("NVT_XREFS_POS"),
        )

        if not isinstance(resp, list) or not resp:
            return None

        subelem = ['cve', 'bid', 'xref']

        return {child: res.split(", ") for child, res in zip(subelem, resp)}

    def get_nvt_family(self, oid: str) -> Optional[str]:
        """Get NVT family

        Arguments:
            oid: OID of VT from which to get the VT family.

        Returns:
            A str with the VT family.
            NOTE(review): presumably None if the OID is unknown, like
            get_nvt_timeout - confirm against OpenvasDB.get_single_item.
        """
        return OpenvasDB.get_single_item(
            self.ctx,
            'nvt:%s' % oid,
            index=NVT_META_FIELDS.index("NVT_FAMILY_POS"),
        )

    def get_nvt_prefs(self, oid: str) -> Optional[List[str]]:
        """Get NVT preferences.

        Arguments:
            oid: OID of VT from which to get the VT preferences.

        Returns:
            A list with the VT preferences.
        """
        key = 'oid:%s:prefs' % oid
        return OpenvasDB.get_list_item(self.ctx, key)

    def get_nvt_timeout(self, oid: str) -> Optional[str]:
        """Get NVT timeout

        Arguments:
            oid: OID of VT from which to get the script timeout.

        Returns:
            The timeout.
        """
        return OpenvasDB.get_single_item(
            self.ctx,
            'nvt:%s' % oid,
            index=NVT_META_FIELDS.index("NVT_TIMEOUT_POS"),
        )

    def get_nvt_tags(self, oid: str) -> Optional[Dict[str, str]]:
        """Get Tags of the given OID.

        Arguments:
            oid: OID of VT from which to get the VT tags.

        Returns:
            A dictionary with the VT tags, or None if no tag string is
            stored for the OID. Tags without a `=` are logged as errors
            and left out.
        """
        tag = OpenvasDB.get_single_item(
            self.ctx,
            'nvt:%s' % oid,
            index=NVT_META_FIELDS.index('NVT_TAGS_POS'),
        )
        if not tag:
            # Nothing to parse; splitting a missing value would raise.
            return None

        # Same parsing rules as used for the tags in get_nvt_metadata.
        return self._parse_metadata_tags(tag, oid)

    def get_nvt_files_count(self) -> int:
        """Get the number of keys matching `filename:*` in the cache."""
        return OpenvasDB.get_key_count(self.ctx, "filename:*")

    def get_nvt_count(self) -> int:
        """Get the number of keys matching `nvt:*` in the cache."""
        return OpenvasDB.get_key_count(self.ctx, "nvt:*")

    def force_reload(self):
        """Ask the main database to release this nvti cache database."""
        self._main_db.release_database(self)

    def add_vt_to_cache(self, vt_id: str, vt: List[str]):
        """Store the metadata list of a VT in the cache.

        Arguments:
            vt_id: Key under which the VT is stored.
            vt: List with the 15 metadata fields of the VT.

        Raises:
            RequiredArgument: If vt_id or vt is missing or empty.
            OspdOpenvasError: If vt is not a list of exactly 15 elements.
        """
        if not vt_id:
            raise RequiredArgument('add_vt_to_cache', 'vt_id')
        if not vt:
            raise RequiredArgument('add_vt_to_cache', 'vt')
        if not isinstance(vt, list) or len(vt) != 15:
            raise OspdOpenvasError(
                'Error trying to load the VT {} in cache'.format(vt)
            )

        OpenvasDB.add_single_list(self.ctx, vt_id, vt)
323