Completed
Push — master ( ee6aac...b90bd1 )
by Björn
28s queued 12s
created

ospd_openvas.nvticache.NVTICache.get_nvt_count()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
""" Provide functions to handle NVT Info Cache. """
21
22
import logging
23
24
from typing import List, Dict, Optional, Iterator, Tuple
25
from pathlib import Path
26
27
from ospd.errors import RequiredArgument
28
from ospd_openvas.errors import OspdOpenvasError
29
from ospd_openvas.db import NVT_META_FIELDS, OpenvasDB, MainDB, BaseDB, RedisCtx
30
31
# Name used both as the pattern for locating the NVTI cache database and as
# the key under which the feed version is read (see NVTICache below).
NVTI_CACHE_NAME = "nvticache"
32
33
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
34
35
# List positions; not referenced in the visible part of this module.
# NOTE(review): presumably redis list indices for first/last element - confirm.
LIST_FIRST_POS = 0
36
LIST_LAST_POS = -1
37
38
39
class NVTICache(BaseDB):
40
41
    # Maps a QoD type name to its value, kept as a string. The 'default'
    # entry is '70'. Not referenced by the methods visible in this class.
    # NOTE(review): presumably "quality of detection" percentages - confirm.
    QOD_TYPES = {
42
        'exploit': '100',
43
        'remote_vul': '99',
44
        'remote_app': '98',
45
        'package': '97',
46
        'registry': '97',
47
        'remote_active': '95',
48
        'remote_banner': '80',
49
        'executable_version': '80',
50
        'remote_analysis': '70',
51
        'remote_probe': '50',
52
        'remote_banner_unreliable': '30',
53
        'executable_version_unreliable': '30',
54
        'general_note': '1',
55
        'default': '70',
56
    }
57
58
    def __init__(  # pylint: disable=super-init-not-called
59
        self, main_db: MainDB
60
    ):
61
        self._ctx = None
62
        self.index = None
63
        self._main_db = main_db
64
65
    @property
66
    def ctx(self) -> Optional[RedisCtx]:
67
        if self._ctx is None:
68
            self._ctx, self.index = OpenvasDB.find_database_by_pattern(
69
                NVTI_CACHE_NAME, self._main_db.max_database_index
70
            )
71
        return self._ctx
72
73
    def get_feed_version(self) -> Optional[str]:
74
        """Get feed version of the nvti cache db.
75
76
        Returns the feed version or None if the nvt feed isn't available.
77
        """
78
        if not self.ctx:
79
            # no nvti cache db available yet
80
            return None
81
82
        return OpenvasDB.get_single_item(self.ctx, NVTI_CACHE_NAME)
83
84
    def get_oids(self) -> Iterator[Tuple[str, str]]:
        """Get the NVT file names together with their OIDs.

        Returns:
            An iterator over pairs. Each pair contains the filename
            as first element and the oid as second one.
        """
        filenames_and_oids = OpenvasDB.get_filenames_and_oids(self.ctx)
        return filenames_and_oids
92
93
    def get_nvt_params(self, oid: str) -> Optional[Dict[str, str]]:
94
        """Get NVT's preferences.
95
96
        Arguments:
97
            oid: OID of VT from which to get the parameters.
98
99
        Returns:
100
            A dictionary with preferences and timeout.
101
        """
102
        prefs = self.get_nvt_prefs(oid)
103
104
        vt_params = {}
105
106
        if prefs:
107
            for nvt_pref in prefs:
108
                elem = nvt_pref.split('|||')
109
110
                param_id = elem[0]
111
                param_name = elem[1]
112
                param_type = elem[2]
113
114
                vt_params[param_id] = dict()
115
                vt_params[param_id]['id'] = param_id
116
                vt_params[param_id]['type'] = param_type
117
                vt_params[param_id]['name'] = param_name.strip()
118
                vt_params[param_id]['description'] = 'Description'
119
120
                if len(elem) > 3:
121
                    param_default = elem[3]
122
                    vt_params[param_id]['default'] = param_default
123
                else:
124
                    vt_params[param_id]['default'] = ''
125
126
        return vt_params
127
128
    @staticmethod
129
    def _parse_metadata_tags(tags_str: str, oid: str) -> Dict[str, str]:
130
        """Parse a string with multiple tags.
131
132
        Arguments:
133
            tags_str: String with tags separated by `|`.
134
            oid: VT OID. Only used for logging in error case.
135
136
        Returns:
137
            A dictionary with the tags.
138
        """
139
        tags_dict = dict()
140
        tags = tags_str.split('|')
141
        for tag in tags:
142
            try:
143
                _tag, _value = tag.split('=', 1)
144
            except ValueError:
145
                logger.error('Tag %s in %s has no value.', tag, oid)
146
                continue
147
            tags_dict[_tag] = _value
148
149
        return tags_dict
150
151
    def get_nvt_metadata(self, oid: str) -> Optional[Dict[str, str]]:
        """Collect the metadata stored in the cache for one VT.

        Arguments:
            oid: OID of VT from which to get the metadata.

        Returns:
            A dictionary with the VT metadata, or None if the cache did not
            return a non-empty list for the VT. References are grouped under
            the 'refs' key and parameters under the 'vt_params' key; parsed
            tags are merged into the top level of the dictionary.
        """
        resp = OpenvasDB.get_list_item(
            self.ctx,
            "nvt:%s" % oid,
            start=NVT_META_FIELDS.index("NVT_FILENAME_POS"),
            end=NVT_META_FIELDS.index("NVT_NAME_POS"),
        )

        if not (isinstance(resp, list) and resp):
            return None

        # Names given to the returned list entries, in list order.
        field_names = (
            'filename',
            'required_keys',
            'mandatory_keys',
            'excluded_keys',
            'required_udp_ports',
            'required_ports',
            'dependencies',
            'tag',
            'cve',
            'bid',
            'xref',
            'category',
            'timeout',
            'family',
            'name',
        )
        ref_fields = ('cve', 'bid', 'xref')

        custom = {'refs': {}, 'vt_params': {}}
        for field, value in zip(field_names, resp):
            if field == 'tag':
                # Tags are parsed even when the stored string is empty.
                custom.update(self._parse_metadata_tags(value, oid))
            elif field == 'timeout':
                if value is None:
                    continue
                timeout_params = {}
                if int(value) > 0:
                    # A positive script timeout is exposed as parameter '0'.
                    timeout_params['0'] = {
                        'id': '0',
                        'type': 'entry',
                        'name': 'timeout',
                        'description': 'Script Timeout',
                        'default': value,
                    }
                timeout_params.update(self.get_nvt_params(oid))
                custom['vt_params'] = timeout_params
            elif not value:
                # Empty plain fields and empty references are left out.
                continue
            elif field in ref_fields:
                custom['refs'][field] = value.split(", ")
            else:
                custom[field] = value

        return custom
214
215
    def get_nvt_refs(self, oid: str) -> Optional[Dict[str, str]]:
        """Get the references of a VT.

        Arguments:
            oid: OID of VT from which to get the VT references.

        Returns:
            A dictionary with the VT references, keyed by 'cve', 'bid' and
            'xref', each holding the stored string split on ", ". None if
            the cache did not return a non-empty list for the VT.
        """
        resp = OpenvasDB.get_list_item(
            self.ctx,
            "nvt:%s" % oid,
            start=NVT_META_FIELDS.index("NVT_CVES_POS"),
            end=NVT_META_FIELDS.index("NVT_XREFS_POS"),
        )

        if not (isinstance(resp, list) and resp):
            return None

        ref_types = ('cve', 'bid', 'xref')
        return {
            ref_type: raw.split(", ") for ref_type, raw in zip(ref_types, resp)
        }
241
242
    def get_nvt_family(self, oid: str) -> str:
        """Get the family of a VT.

        Arguments:
            oid: OID of VT from which to get the VT family.

        Returns:
            A str with the VT family.
        """
        family_pos = NVT_META_FIELDS.index("NVT_FAMILY_POS")
        family = OpenvasDB.get_single_item(
            self.ctx, 'nvt:%s' % oid, index=family_pos
        )
        return family
255
256
    def get_nvt_prefs(self, oid: str) -> Optional[List[str]]:
        """Get the preferences of a VT.

        Arguments:
            oid: OID of VT from which to get the VT preferences.

        Returns:
            A list with the VT preferences, read from the
            ``oid:<oid>:prefs`` key.
        """
        prefs = OpenvasDB.get_list_item(self.ctx, 'oid:%s:prefs' % oid)
        return prefs
268
269
    def get_nvt_timeout(self, oid: str) -> Optional[str]:
        """Get the script timeout of a VT.

        Arguments:
            oid: OID of VT from which to get the script timeout.

        Returns:
            The timeout.
        """
        timeout_pos = NVT_META_FIELDS.index("NVT_TIMEOUT_POS")
        timeout = OpenvasDB.get_single_item(
            self.ctx, 'nvt:%s' % oid, index=timeout_pos
        )
        return timeout
284
285
    def get_nvt_tags(self, oid: str) -> Optional[Dict[str, str]]:
        """Get Tags of the given OID.

        Arguments:
            oid: OID of VT from which to get the VT tags.

        Returns:
            A dictionary with the VT tags, or None if no tag string is
            stored for the VT. Tags without a `=` are logged as errors and
            left out instead of aborting the whole lookup.
        """
        tag = OpenvasDB.get_single_item(
            self.ctx,
            'nvt:%s' % oid,
            index=NVT_META_FIELDS.index('NVT_TAGS_POS'),
        )
        if not tag:
            # Nothing stored for this VT: honour the Optional return type
            # instead of failing on None.split().
            return None

        # Same parsing rules as for the tags inside the full metadata.
        return self._parse_metadata_tags(tag, oid)
303
304
    def get_nvt_files_count(self) -> int:
        """Count the keys matching ``filename:*`` in the cache database."""
        files_count = OpenvasDB.get_key_count(self.ctx, "filename:*")
        return files_count
306
307
    def get_nvt_count(self) -> int:
        """Count the keys matching ``nvt:*`` in the cache database."""
        nvt_count = OpenvasDB.get_key_count(self.ctx, "nvt:*")
        return nvt_count
309
310
    def force_reload(self):
311
        self._main_db.release_database(self)
312
313
    def add_vt_to_cache(self, vt_id: str, vt: List[str]):
        """Store a VT in the cache database after validating it.

        Arguments:
            vt_id: Key under which the VT is stored.
            vt: The VT data; must be a list with exactly 15 entries.

        Raises:
            RequiredArgument: If vt_id or vt is missing or empty.
            OspdOpenvasError: If vt is not a list of 15 entries.
        """
        for arg_name, arg_value in (('vt_id', vt_id), ('vt', vt)):
            if not arg_value:
                raise RequiredArgument('add_vt_to_cache', arg_name)

        has_expected_shape = isinstance(vt, list) and len(vt) == 15
        if not has_expected_shape:
            raise OspdOpenvasError(
                'Error trying to load the VT {} in cache'.format(vt)
            )

        OpenvasDB.add_single_list(self.ctx, vt_id, vt)
324
325
    def get_file_checksum(self, file_abs_path: Path) -> Optional[str]:
        """Get file sha256 checksum or md5 checksum

        Arguments:
            file_abs_path: File to get the checksum

        Returns:
            The sha256 checksum if one is stored for the file, otherwise
            the md5 checksum, or None if neither is stored.
        """
        # Try to get first sha256 checksum
        sha256sum = OpenvasDB.get_single_item(
            self.ctx,
            f'sha256sums:{file_abs_path}',
        )
        if sha256sum:
            return sha256sum

        # Search for md5 checksum
        md5sum = OpenvasDB.get_single_item(
            self.ctx,
            f'md5sums:{file_abs_path}',
        )
        if md5sum:
            return md5sum

        # No checksum of either kind is stored for this file.
        return None
349