NVTICache.get_feed_version()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 10
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 4
nop 1
dl 0
loc 10
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
""" Provide functions to handle NVT Info Cache. """
21
22
import logging
23
24
from typing import List, Dict, Optional, Iterator, Tuple
25
from pathlib import Path
26
from time import time
27
28
from ospd.errors import RequiredArgument
29
from ospd_openvas.errors import OspdOpenvasError
30
from ospd_openvas.db import NVT_META_FIELDS, OpenvasDB, MainDB, BaseDB, RedisCtx
31
32
NVTI_CACHE_NAME = "nvticache"
33
34
logger = logging.getLogger(__name__)
35
36
LIST_FIRST_POS = 0
37
LIST_LAST_POS = -1
38
39
40
class NVTICache(BaseDB):
41
42
    QOD_TYPES = {
43
        'exploit': '100',
44
        'remote_vul': '99',
45
        'remote_app': '98',
46
        'package': '97',
47
        'registry': '97',
48
        'remote_active': '95',
49
        'remote_banner': '80',
50
        'executable_version': '80',
51
        'remote_analysis': '70',
52
        'remote_probe': '50',
53
        'remote_banner_unreliable': '30',
54
        'executable_version_unreliable': '30',
55
        'general_note': '1',
56
        'default': '70',
57
    }
58
59
    def __init__(  # pylint: disable=super-init-not-called
60
        self, main_db: MainDB
61
    ):
62
        self._ctx = None
63
        self.index = None
64
        self._main_db = main_db
65
66
    @property
67
    def ctx(self) -> Optional[RedisCtx]:
68
        if self._ctx is None:
69
            self._ctx, self.index = OpenvasDB.find_database_by_pattern(
70
                NVTI_CACHE_NAME, self._main_db.max_database_index
71
            )
72
        return self._ctx
73
74
    def get_feed_version(self) -> Optional[str]:
75
        """Get feed version of the nvti cache db.
76
77
        Returns the feed version or None if the nvt feed isn't available.
78
        """
79
        if not self.ctx:
80
            # no nvti cache db available yet
81
            return None
82
83
        return OpenvasDB.get_single_item(self.ctx, NVTI_CACHE_NAME)
84
85
    def get_oids(self) -> Iterator[Tuple[str, str]]:
86
        """Get the list of NVT file names and OIDs.
87
88
        Returns:
89
            A i. Each single list contains the filename
90
            as first element and the oid as second one.
91
        """
92
        return OpenvasDB.get_filenames_and_oids(self.ctx)
93
94
    def get_nvt_params(self, oid: str) -> Optional[Dict[str, str]]:
95
        """Get NVT's preferences.
96
97
        Arguments:
98
            oid: OID of VT from which to get the parameters.
99
100
        Returns:
101
            A dictionary with preferences and timeout.
102
        """
103
        prefs = self.get_nvt_prefs(oid)
104
105
        vt_params = {}
106
107
        if prefs:
108
            for nvt_pref in prefs:
109
                elem = nvt_pref.split('|||')
110
111
                param_id = elem[0]
112
                param_name = elem[1]
113
                param_type = elem[2]
114
115
                vt_params[param_id] = dict()
116
                vt_params[param_id]['id'] = param_id
117
                vt_params[param_id]['type'] = param_type
118
                vt_params[param_id]['name'] = param_name.strip()
119
                vt_params[param_id]['description'] = 'Description'
120
121
                if len(elem) > 3:
122
                    param_default = elem[3]
123
                    vt_params[param_id]['default'] = param_default
124
                else:
125
                    vt_params[param_id]['default'] = ''
126
127
        return vt_params
128
129
    @staticmethod
130
    def _parse_metadata_tags(tags_str: str, oid: str) -> Dict[str, str]:
131
        """Parse a string with multiple tags.
132
133
        Arguments:
134
            tags_str: String with tags separated by `|`.
135
            oid: VT OID. Only used for logging in error case.
136
137
        Returns:
138
            A dictionary with the tags.
139
        """
140
        tags_dict = dict()
141
        tags = tags_str.split('|')
142
        for tag in tags:
143
            try:
144
                _tag, _value = tag.split('=', 1)
145
            except ValueError:
146
                logger.error('Tag %s in %s has no value.', tag, oid)
147
                continue
148
            tags_dict[_tag] = _value
149
150
        return tags_dict
151
152
    def get_nvt_metadata(self, oid: str) -> Optional[Dict[str, str]]:
153
        """Get a full NVT. Returns an XML tree with the NVT metadata.
154
155
        Arguments:
156
            oid: OID of VT from which to get the metadata.
157
158
        Returns:
159
            A dictionary with the VT metadata.
160
        """
161
        resp = OpenvasDB.get_list_item(
162
            self.ctx,
163
            "nvt:%s" % oid,
164
            start=NVT_META_FIELDS.index("NVT_FILENAME_POS"),
165
            end=NVT_META_FIELDS.index("NVT_NAME_POS"),
166
        )
167
168
        if not isinstance(resp, list) or len(resp) == 0:
169
            return None
170
171
        subelem = [
172
            'filename',
173
            'required_keys',
174
            'mandatory_keys',
175
            'excluded_keys',
176
            'required_udp_ports',
177
            'required_ports',
178
            'dependencies',
179
            'tag',
180
            'cve',
181
            'bid',
182
            'xref',
183
            'category',
184
            'timeout',
185
            'family',
186
            'name',
187
        ]
188
189
        custom = dict()
190
        custom['refs'] = dict()
191
        custom['vt_params'] = dict()
192
        for child, res in zip(subelem, resp):
193
            if child not in ['cve', 'bid', 'xref', 'tag', 'timeout'] and res:
194
                custom[child] = res
195
            elif child == 'tag':
196
                custom.update(self._parse_metadata_tags(res, oid))
197
            elif child in ['cve', 'bid', 'xref'] and res:
198
                custom['refs'][child] = res.split(", ")
199
            elif child == 'timeout':
200
                if res is None:
201
                    continue
202
                vt_params = {}
203
                if int(res) > 0:
204
                    _param_id = '0'
205
                    vt_params[_param_id] = dict()
206
                    vt_params[_param_id]['id'] = _param_id
207
                    vt_params[_param_id]['type'] = 'entry'
208
                    vt_params[_param_id]['name'] = 'timeout'
209
                    vt_params[_param_id]['description'] = 'Script Timeout'
210
                    vt_params[_param_id]['default'] = res
211
                custom['vt_params'] = vt_params
212
                custom['vt_params'].update(self.get_nvt_params(oid))
213
214
        return custom
215
216
    def get_nvt_refs(self, oid: str) -> Optional[Dict[str, str]]:
217
        """Get a full NVT.
218
219
        Arguments:
220
            oid: OID of VT from which to get the VT references.
221
222
        Returns:
223
            A dictionary with the VT references.
224
        """
225
        resp = OpenvasDB.get_list_item(
226
            self.ctx,
227
            "nvt:%s" % oid,
228
            start=NVT_META_FIELDS.index("NVT_CVES_POS"),
229
            end=NVT_META_FIELDS.index("NVT_XREFS_POS"),
230
        )
231
232
        if not isinstance(resp, list) or len(resp) == 0:
233
            return None
234
235
        subelem = ['cve', 'bid', 'xref']
236
237
        refs = dict()
238
        for child, res in zip(subelem, resp):
239
            refs[child] = res.split(", ")
240
241
        return refs
242
243
    def get_nvt_family(self, oid: str) -> str:
244
        """Get NVT family
245
        Arguments:
246
            oid: OID of VT from which to get the VT family.
247
248
        Returns:
249
            A str with the VT family.
250
        """
251
        return OpenvasDB.get_single_item(
252
            self.ctx,
253
            'nvt:%s' % oid,
254
            index=NVT_META_FIELDS.index("NVT_FAMILY_POS"),
255
        )
256
257
    def get_nvt_prefs(self, oid: str) -> Optional[List[str]]:
258
        """Get NVT preferences.
259
260
        Arguments:
261
            ctx: Redis context to be used.
262
            oid: OID of VT from which to get the VT preferences.
263
264
        Returns:
265
            A list with the VT preferences.
266
        """
267
        key = 'oid:%s:prefs' % oid
268
        return OpenvasDB.get_list_item(self.ctx, key)
269
270
    def get_nvt_timeout(self, oid: str) -> Optional[str]:
271
        """Get NVT timeout
272
273
        Arguments:
274
            ctx: Redis context to be used.
275
            oid: OID of VT from which to get the script timeout.
276
277
        Returns:
278
            The timeout.
279
        """
280
        return OpenvasDB.get_single_item(
281
            self.ctx,
282
            'nvt:%s' % oid,
283
            index=NVT_META_FIELDS.index("NVT_TIMEOUT_POS"),
284
        )
285
286
    def get_nvt_tags(self, oid: str) -> Optional[Dict[str, str]]:
287
        """Get Tags of the given OID.
288
289
        Arguments:
290
            ctx: Redis context to be used.
291
            oid: OID of VT from which to get the VT tags.
292
293
        Returns:
294
            A dictionary with the VT tags.
295
        """
296
        tag = OpenvasDB.get_single_item(
297
            self.ctx,
298
            'nvt:%s' % oid,
299
            index=NVT_META_FIELDS.index('NVT_TAGS_POS'),
300
        )
301
        tags = tag.split('|')
302
303
        return dict([item.split('=', 1) for item in tags])
304
305
    def get_nvt_files_count(self) -> int:
306
        return OpenvasDB.get_key_count(self.ctx, "filename:*")
307
308
    def get_nvt_count(self) -> int:
309
        return OpenvasDB.get_key_count(self.ctx, "nvt:*")
310
311
    def force_reload(self):
312
        self._main_db.release_database(self)
313
314
    def add_vt_to_cache(self, vt_id: str, vt: List[str]):
315
        if not vt_id:
316
            raise RequiredArgument('add_vt_to_cache', 'vt_id')
317
        if not vt:
318
            raise RequiredArgument('add_vt_to_cache', 'vt')
319
        if not isinstance(vt, list) or len(vt) != 15:
320
            raise OspdOpenvasError(
321
                'Error trying to load the VT' ' {} in cache'.format(vt)
322
            )
323
324
        OpenvasDB.add_single_list(self.ctx, vt_id, vt)
325
326
        OpenvasDB.add_single_item(self.ctx, f'filename:{vt[0]}', [int(time())])
327
328
    def get_file_checksum(self, file_abs_path: Path) -> str:
329
        """Get file sha256 checksum or md5 checksum
330
331
        Arguments:
332
            file_abs_path: File to get the checksum
333
334
        Returns:
335
            The checksum
336
        """
337
        # Try to get first sha256 checksum
338
        sha256sum = OpenvasDB.get_single_item(
339
            self.ctx,
340
            f'sha256sums:{file_abs_path}',
341
        )
342
        if sha256sum:
343
            return sha256sum
344
345
        # Search for md5 checksum
346
        md5sum = OpenvasDB.get_single_item(
347
            self.ctx,
348
            f'md5sums:{file_abs_path}',
349
        )
350
        if md5sum:
351
            return md5sum
352