Completed
Push — master ( 7a7f2e...ca40a9 )
by Juan José
13s queued 11s
created

sion()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 2
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
""" Provide functions to handle NVT Info Cache. """
21
22
import logging
23
24
from typing import List, Dict, Optional, Iterator, Tuple
25
26
from ospd_openvas.db import NVT_META_FIELDS, OpenvasDB, MainDB, BaseDB, RedisCtx
27
28
# Name used both as the pattern to locate the NVT info cache database and as
# the key under which the feed version is read.
NVTI_CACHE_NAME = "nvticache"

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Index constants for the first and last position of a list.
# NOTE(review): not referenced by the code visible in this file.
LIST_FIRST_POS = 0
LIST_LAST_POS = -1
34
35
36
class NVTICache(BaseDB):
    """ Accessor for the NVT info cache stored in the OpenVAS Redis DB.

    The Redis context is resolved lazily through the `ctx` property, which
    searches for the database containing the NVTI_CACHE_NAME key.
    """

    # QoD type name -> QoD value (kept as strings).
    QOD_TYPES = {
        'exploit': '100',
        'remote_vul': '99',
        'remote_app': '98',
        'package': '97',
        'registry': '97',
        'remote_active': '95',
        'remote_banner': '80',
        'executable_version': '80',
        'remote_analysis': '70',
        'remote_probe': '50',
        'remote_banner_unreliable': '30',
        'executable_version_unreliable': '30',
        'general_note': '1',
        'default': '70',
    }

    def __init__(  # pylint: disable=super-init-not-called
        self, main_db: MainDB
    ):
        """ Create a cache accessor bound to the given main database.

        The base class initializer is not called; the context and the
        database index start as None and are filled in by `ctx`.

        Arguments:
            main_db: Main DB used to look up and release the cache database.
        """
        self._ctx = None
        self.index = None
        self._main_db = main_db

    @property
    def ctx(self) -> Optional[RedisCtx]:
        """ Redis context of the nvti cache db, looked up on first use.

        As long as no context has been stored, every access searches again
        for a database matching NVTI_CACHE_NAME and stores the resulting
        context and database index.
        """
        if self._ctx is None:
            self._ctx, self.index = OpenvasDB.find_database_by_pattern(
                NVTI_CACHE_NAME, self._main_db.max_database_index
            )
        return self._ctx

    def get_feed_version(self) -> Optional[str]:
        """ Get feed version of the nvti cache db.

        Returns the feed version or None if the nvt feed isn't available.
        """
        if not self.ctx:
            # no nvti cache db available yet
            return None

        return OpenvasDB.get_single_item(self.ctx, NVTI_CACHE_NAME)

    def get_oids(self) -> Iterator[Tuple[str, str]]:
        """ Get the NVT file names and OIDs.

        Returns:
            An iterator of pairs. Each pair contains the filename
            as first element and the oid as second one.
        """
        return OpenvasDB.get_filenames_and_oids(self.ctx)

    def get_nvt_params(self, oid: str) -> Optional[Dict[str, str]]:
        """ Get NVT's preferences.

        Each stored preference is a string of the form
        `id|||name|||type[|||default]`. Entries with fewer than three fields
        are logged and skipped instead of aborting the whole lookup.

        Arguments:
            oid: OID of VT from which to get the parameters.

        Returns:
            A dictionary mapping each preference id to a dictionary with the
            keys 'id', 'type', 'name', 'description' and 'default'. The
            dictionary is empty if the VT has no preferences.
        """
        vt_params = {}

        prefs = self.get_nvt_prefs(oid)
        if not prefs:
            return vt_params

        for nvt_pref in prefs:
            elem = nvt_pref.split('|||')
            if len(elem) < 3:
                # id, name and type are mandatory; do not crash on bad data.
                logger.error(
                    'Preference %s in %s is malformed.', nvt_pref, oid
                )
                continue

            param_id, param_name, param_type = elem[:3]

            vt_params[param_id] = {
                'id': param_id,
                'type': param_type,
                'name': param_name.strip(),
                'description': 'Description',
                # The default value is optional in the stored preference.
                'default': elem[3] if len(elem) > 3 else '',
            }

        return vt_params

    @staticmethod
    def _parse_metadata_tags(tags_str: str, oid: str) -> Dict[str, str]:
        """ Parse a string with multiple tags.

        Arguments:
            tags_str: String with `name=value` tags separated by `|`.
            oid: VT OID. Only used for logging in error case.

        Returns:
            A dictionary with the tags. Tags without a `=` are logged as
            errors and left out.
        """
        tags_dict = {}
        for tag in tags_str.split('|'):
            # Only the first '=' separates name and value.
            _tag, separator, _value = tag.partition('=')
            if not separator:
                logger.error('Tag %s in %s has no value.', tag, oid)
                continue
            tags_dict[_tag] = _value

        return tags_dict

    def get_nvt_metadata(self, oid: str) -> Optional[Dict[str, str]]:
        """ Get a full NVT. Returns a dictionary with the NVT metadata.

        Arguments:
            oid: OID of VT from which to get the metadata.

        Returns:
            A dictionary with the VT metadata, or None if nothing is stored
            for the OID. Besides the plain metadata fields and the parsed
            tags, it always contains the keys 'refs' (cve/bid/xref lists)
            and 'vt_params' (timeout and preferences).
        """
        resp = OpenvasDB.get_list_item(
            self.ctx,
            "nvt:%s" % oid,
            start=NVT_META_FIELDS.index("NVT_FILENAME_POS"),
            end=NVT_META_FIELDS.index("NVT_NAME_POS"),
        )

        if not isinstance(resp, list) or not resp:
            return None

        # Field names in the order in which the values are paired with resp.
        subelem = [
            'filename',
            'required_keys',
            'mandatory_keys',
            'excluded_keys',
            'required_udp_ports',
            'required_ports',
            'dependencies',
            'tag',
            'cve',
            'bid',
            'xref',
            'category',
            'timeout',
            'family',
            'name',
        ]

        custom = {'refs': {}, 'vt_params': {}}
        for child, res in zip(subelem, resp):
            if child == 'tag':
                # Skip missing/empty tag fields: parsing them would either
                # fail or only produce a misleading error log entry.
                if res:
                    custom.update(self._parse_metadata_tags(res, oid))
            elif child in ('cve', 'bid', 'xref'):
                if res:
                    custom['refs'][child] = res.split(", ")
            elif child == 'timeout':
                if res is None:
                    continue
                vt_params = {}
                if int(res) > 0:
                    # The script timeout is exposed as the preference '0'.
                    _param_id = '0'
                    vt_params[_param_id] = {
                        'id': _param_id,
                        'type': 'entry',
                        'name': 'timeout',
                        'description': 'Script Timeout',
                        'default': res,
                    }
                custom['vt_params'] = vt_params
                custom['vt_params'].update(self.get_nvt_params(oid))
            elif res:
                # Plain metadata fields are only stored when non-empty.
                custom[child] = res

        return custom

    def get_nvt_refs(self, oid: str) -> Optional[Dict[str, str]]:
        """ Get the references of a VT.

        Arguments:
            oid: OID of VT from which to get the VT references.

        Returns:
            A dictionary with the keys 'cve', 'bid' and 'xref', each holding
            the list obtained by splitting the stored value at ", ", or None
            if nothing is stored for the OID.
        """
        resp = OpenvasDB.get_list_item(
            self.ctx,
            "nvt:%s" % oid,
            start=NVT_META_FIELDS.index("NVT_CVES_POS"),
            end=NVT_META_FIELDS.index("NVT_XREFS_POS"),
        )

        if not isinstance(resp, list) or not resp:
            return None

        subelem = ['cve', 'bid', 'xref']

        return {child: res.split(", ") for child, res in zip(subelem, resp)}

    def get_nvt_family(self, oid: str) -> str:
        """ Get NVT family

        Arguments:
            oid: OID of VT from which to get the VT family.

        Returns:
            A str with the VT family.
        """
        return OpenvasDB.get_single_item(
            self.ctx,
            'nvt:%s' % oid,
            index=NVT_META_FIELDS.index("NVT_FAMILY_POS"),
        )

    def get_nvt_prefs(self, oid: str) -> Optional[List[str]]:
        """ Get NVT preferences.

        Arguments:
            oid: OID of VT from which to get the VT preferences.

        Returns:
            A list with the VT preferences.
        """
        key = 'oid:%s:prefs' % oid
        return OpenvasDB.get_list_item(self.ctx, key)

    def get_nvt_timeout(self, oid: str) -> Optional[str]:
        """ Get NVT timeout

        Arguments:
            oid: OID of VT from which to get the script timeout.

        Returns:
            The timeout.
        """
        return OpenvasDB.get_single_item(
            self.ctx,
            'nvt:%s' % oid,
            index=NVT_META_FIELDS.index("NVT_TIMEOUT_POS"),
        )

    def get_nvt_tags(self, oid: str) -> Optional[Dict[str, str]]:
        """ Get Tags of the given OID.

        Arguments:
            oid: OID of VT from which to get the VT tags.

        Returns:
            A dictionary with the VT tags. None if no tag field is stored
            for the OID, an empty dictionary if the tag field is empty.
            Tags without a value are logged and left out.
        """
        tag = OpenvasDB.get_single_item(
            self.ctx,
            'nvt:%s' % oid,
            index=NVT_META_FIELDS.index('NVT_TAGS_POS'),
        )
        if tag is None:
            return None
        if not tag:
            return {}

        # Share the tolerant parser used for the full metadata.
        return self._parse_metadata_tags(tag, oid)

    def get_nvt_files_count(self) -> int:
        """ Get the number of keys matching `filename:*` in the cache. """
        return OpenvasDB.get_key_count(self.ctx, "filename:*")

    def get_nvt_count(self) -> int:
        """ Get the number of keys matching `nvt:*` in the cache. """
        return OpenvasDB.get_key_count(self.ctx, "nvt:*")

    def force_reload(self):
        """ Hand this database to the main DB's `release_database`. """
        self._main_db.release_database(self)
309