Completed
Push — master ( a3f003...f81eb3 )
by Juan José
31s queued 21s
created

VtHelper.get_notus_driver_oids()   A

Complexity

Conditions 1

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 1
dl 0
loc 8
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
""" Provide functions to handle VT Info. """
21
22
from hashlib import sha256
from typing import Any, Dict, Iterator, List, Optional, Tuple

from ospd_openvas.nvticache import NVTICache
from ospd_openvas.notus.metadata import NotusMetadataHandler
27
28
29
class VtHelper:
    """Build VT dictionaries from the metadata held in an NVTI cache."""

    def __init__(self, nvticache: "NVTICache"):
        """Keep a reference to the cache used for all lookups.

        Arguments:
            nvticache: Cache object; this class calls its
                ``get_nvt_metadata()`` and ``get_oids()`` methods.
        """
        self.nvti = nvticache

    @staticmethod
    def _resolve_dependencies(
        custom: Dict[str, Any], oids: Dict[str, str]
    ) -> List[str]:
        """Pop 'dependencies' from ``custom`` and translate it via ``oids``.

        The value is split on ', '. Every name found in ``oids`` is replaced
        by the mapped value, unknown names are kept as they are. Empty names
        (for example from an empty string) are dropped.
        """
        deps = custom.pop('dependencies', None)
        if not deps:
            return []
        return [
            oids.get(dep_name) or dep_name
            for dep_name in deps.split(', ')
            if dep_name
        ]

    @staticmethod
    def _extract_severity(
        custom: Dict[str, Any], fallback_date: Any
    ) -> Dict[str, Any]:
        """Pop the severity related entries from ``custom``.

        'severity_vector' takes precedence; 'cvss_base_vector' is only
        consumed when 'severity_vector' is absent. If no vector is available
        the vector and type entries are left out instead of failing.

        Arguments:
            custom: Metadata dictionary, modified in place.
            fallback_date: Used as 'severity_date' when ``custom`` has none.
        """
        if 'severity_vector' in custom:
            vector = custom.pop('severity_vector')
        else:
            vector = custom.pop('cvss_base_vector', None)

        severity = {}
        if vector is not None:
            severity['severity_base_vector'] = vector
            severity['severity_type'] = (
                'cvss_base_v3' if "CVSS:3" in vector else 'cvss_base_v2'
            )

        severity['severity_date'] = custom.pop('severity_date', fallback_date)

        if 'severity_origin' in custom:
            severity['severity_origin'] = custom.pop('severity_origin')

        return severity

    def get_single_vt(self, vt_id: str, oids=None) -> Optional[Dict[str, Any]]:
        """Return the dictionary describing a single VT.

        Arguments:
            vt_id: Identifier passed to the cache's ``get_nvt_metadata()``.
            oids: Optional mapping from dependency name to OID. When it is
                given (and not empty) the 'dependencies' entry is resolved
                into 'vt_dependencies'; otherwise it stays in 'custom'.

        Returns:
            None if the cache has no metadata for ``vt_id``. Otherwise a
            dictionary with 'name', 'custom' (every metadata entry not
            consumed here) and 'severities', plus the optional entries
            whose value is not None.
        """
        custom = self.nvti.get_nvt_metadata(vt_id)
        if not custom:
            return None

        # Missing entries are treated like entries set to None.
        vt_params = custom.pop('vt_params', None)
        vt_refs = custom.pop('refs', None)
        name = custom.pop('name', None)
        vt_creation_time = custom.pop('creation_date', None)
        vt_modification_time = custom.pop('last_modification', None)

        vt_dependencies = (
            self._resolve_dependencies(custom, oids) if oids else None
        )

        summary = custom.pop('summary', None)
        impact = custom.pop('impact', None)
        affected = custom.pop('affected', None)
        insight = custom.pop('insight', None)

        # The solution type and method are only consumed together with a
        # solution; without one they remain in 'custom'.
        solution = solution_t = solution_m = None
        if 'solution' in custom:
            solution = custom.pop('solution')
            solution_t = custom.pop('solution_type', None)
            solution_m = custom.pop('solution_method', None)

        vuldetect = custom.pop('vuldetect', None)

        # 'qod' is only consumed when there is no 'qod_type' entry.
        qod_t = qod_v = None
        if 'qod_type' in custom:
            qod_t = custom.pop('qod_type')
        elif 'qod' in custom:
            qod_v = custom.pop('qod')

        severity = self._extract_severity(custom, vt_creation_time)

        vt = {'name': '' if name is None else name, 'custom': custom}

        optional_entries = (
            ('vt_params', vt_params),
            ('vt_refs', vt_refs),
            ('vt_dependencies', vt_dependencies),
            ('creation_time', vt_creation_time),
            ('modification_time', vt_modification_time),
            ('summary', summary),
            ('impact', impact),
            ('affected', affected),
            ('insight', insight),
        )
        for key, value in optional_entries:
            if value is not None:
                vt[key] = value

        if solution is not None:
            vt['solution'] = solution
            if solution_t is not None:
                vt['solution_type'] = solution_t
            if solution_m is not None:
                vt['solution_method'] = solution_m

        if vuldetect is not None:
            vt['detection'] = vuldetect

        if qod_t is not None:
            vt['qod_type'] = qod_t
        elif qod_v is not None:
            vt['qod'] = qod_v

        vt['severities'] = severity

        return vt

    def get_notus_driver_oids(self) -> List[str]:
        """Return a list of oid corresponding to notus driver which
        are considered backend entities and must not be exposed to
        the OSP client."""
        notus = NotusMetadataHandler()
        lsc_families_and_drivers = notus.get_family_driver_linkers()

        # Materialize the dict view so the result matches the annotation.
        return list(lsc_families_and_drivers.values())

    def get_vt_iterator(
        self, vt_selection: Optional[List[str]] = None, details: bool = True
    ) -> Iterator[Tuple[str, Dict]]:
        """Yield ``(vt_id, vt)`` tuples for the VTs in the NVTI cache.

        Arguments:
            vt_selection: VT ids to yield. If empty or None, every OID
                returned by the cache's ``get_oids()`` is used.
            details: If True the dependencies of each VT are resolved
                (see ``get_single_vt()``).

        Notus driver OIDs are skipped. ``vt`` is None for ids the cache
        has no metadata for.
        """
        oids = None
        if not vt_selection or details:
            vt_collection = dict(self.nvti.get_oids())

            if not vt_selection:
                vt_selection = vt_collection.values()

            if details:
                oids = vt_collection

        # Notus driver oids are not sent. A set keeps the membership test
        # cheap for large selections.
        drivers = frozenset(self.get_notus_driver_oids())
        for vt_id in vt_selection:
            if vt_id in drivers:
                continue
            yield (vt_id, self.get_single_vt(vt_id, oids))

    def calculate_vts_collection_hash(self) -> str:
        """Calculate the vts collection sha256 hash.

        For each VT the id, the modification time and the 'id', 'name' and
        'default' of every parameter (in sorted key order) are fed into the
        hash. VTs without metadata are skipped; a missing modification time
        counts as an empty string.

        Returns:
            The hexadecimal digest.
        """
        m = sha256()  # pylint: disable=invalid-name

        # for a reproducible hash calculation
        # the vts must already be sorted in the dictionary.
        # The iterator already leaves out the notus driver oids.
        for vt_id, vt in self.get_vt_iterator(details=False):
            if vt is None:
                continue

            vt_params = vt.get('vt_params') or {}
            param_chain = "".join(
                vt_params[key].get('id')
                + vt_params[key].get('name')
                + vt_params[key].get('default')
                for key in sorted(vt_params)
            )

            modification_time = vt.get('modification_time') or ''
            m.update(
                (vt_id + modification_time + param_chain).encode('utf-8')
            )

        return m.hexdigest()
218