ospd_openvas.vthelper.VtHelper.get_single_vt()   F
last analyzed

Complexity

Conditions 38

Size

Total Lines 124
Code Lines 102

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 38
eloc 102
nop 3
dl 0
loc 124
rs 0
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

Complexity

Complex code such as ospd_openvas.vthelper.VtHelper.get_single_vt() often does a lot of different things. To break it down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
""" Provide functions to handle VT Info. """
21
22
from hashlib import sha256
23
from typing import Any, Dict, Iterator, List, Optional, Tuple
24
25
from ospd.cvss import CVSS
26
from ospd_openvas.nvticache import NVTICache
27
28
29
class VtHelper:
    """Build VT dictionaries from the metadata held by an NVTI cache.

    The helper keeps a single attribute, ``nvti``, and only calls its
    ``get_nvt_metadata()`` and ``get_oids()`` methods.
    """

    # The annotation is quoted so that it is not evaluated when the class
    # body is executed.
    def __init__(self, nvticache: "NVTICache"):
        """Store the cache object used for all metadata lookups."""
        self.nvti = nvticache

    @staticmethod
    def _resolve_dependencies(
        custom: Dict[str, Any], oids: Optional[Dict[str, str]]
    ) -> Optional[List[str]]:
        """Pop 'dependencies' from custom and map its entries through oids.

        Returns None when oids is empty or None; in that case custom is
        left untouched. An entry without a (truthy) match in oids is kept
        as it is.
        """
        if not oids:
            return None
        if 'dependencies' not in custom:
            return []
        dep_names = custom.pop('dependencies').split(', ')
        return [oids.get(dep_name) or dep_name for dep_name in dep_names]

    @staticmethod
    def _move_tag(
        custom: Dict[str, Any],
        vt: Dict[str, Any],
        source_key: str,
        target_key: Optional[str] = None,
    ) -> None:
        """Pop source_key from custom and store it in vt unless it is None."""
        value = custom.pop(source_key, None)
        if value is not None:
            vt[target_key or source_key] = value

    @staticmethod
    def _pop_severity(
        custom: Dict[str, Any], creation_time: Any
    ) -> Dict[str, Any]:
        """Pop the severity related tags from custom into a new dict.

        'severity_vector' takes precedence over 'cvss_base_vector'; when the
        former is present the latter stays in custom. A missing
        'severity_date' falls back to creation_time.

        Raises:
            KeyError: neither vector key is present (assuming custom
                behaves like a plain dict).
        """
        if 'severity_vector' in custom:
            vector = custom.pop('severity_vector')
        else:
            vector = custom.pop('cvss_base_vector')

        severity = {
            'severity_base_vector': vector,
            'severity_type': (
                'cvss_base_v3' if "CVSS:3" in vector else 'cvss_base_v2'
            ),
            'severity_date': custom.pop('severity_date', creation_time),
        }
        # A present origin is copied even when its value is None.
        if 'severity_origin' in custom:
            severity['severity_origin'] = custom.pop('severity_origin')
        return severity

    def get_single_vt(self, vt_id: str, oids=None) -> Optional[Dict[str, Any]]:
        """Return the VT dictionary for vt_id, or None without metadata.

        Arguments:
            vt_id: Identifier passed to ``self.nvti.get_nvt_metadata()``.
            oids: Optional mapping used to translate the entries of the
                'dependencies' tag (presumably script names to OIDs; verify
                against the cache). When empty or None, dependencies are
                not extracted and stay inside 'custom'.

        Returns:
            A dictionary with 'name', 'custom' (the metadata left over after
            the known tags were removed) and 'severities', plus every
            optional entry whose value is not None. None if the cache
            returns no metadata.

        Raises:
            KeyError: a mandatory tag ('vt_params', 'refs', 'name',
                'creation_date', 'last_modification' or a severity vector)
                is missing, assuming the metadata is a plain dict.
        """
        custom = self.nvti.get_nvt_metadata(vt_id)
        if not custom:
            return None

        vt_params = custom.pop('vt_params')
        vt_refs = custom.pop('refs')
        name = custom.pop('name')
        vt_creation_time = custom.pop('creation_date')
        vt_modification_time = custom.pop('last_modification')

        vt_dependencies = self._resolve_dependencies(custom, oids)
        severity = self._pop_severity(custom, vt_creation_time)

        # 'custom' is stored by reference, so the tags popped below are
        # removed from the stored dictionary as well.
        vt = {'name': '' if name is None else name, 'custom': custom}

        optional_entries = (
            ('vt_params', vt_params),
            ('vt_refs', vt_refs),
            ('vt_dependencies', vt_dependencies),
            ('creation_time', vt_creation_time),
            ('modification_time', vt_modification_time),
        )
        for key, value in optional_entries:
            if value is not None:
                vt[key] = value

        for tag in ('summary', 'impact', 'affected', 'insight'):
            self._move_tag(custom, vt, tag)

        # Solution type and method are only consumed together with a
        # solution; without one they remain in 'custom'.
        if 'solution' in custom:
            solution = custom.pop('solution')
            solution_type = custom.pop('solution_type', None)
            solution_method = custom.pop('solution_method', None)
            if solution is not None:
                vt['solution'] = solution
                if solution_type is not None:
                    vt['solution_type'] = solution_type
                if solution_method is not None:
                    vt['solution_method'] = solution_method

        self._move_tag(custom, vt, 'vuldetect', 'detection')

        # 'qod' is only looked at when there is no 'qod_type' key at all.
        if 'qod_type' in custom:
            self._move_tag(custom, vt, 'qod_type')
        else:
            self._move_tag(custom, vt, 'qod')

        vt['severities'] = severity

        return vt

    def get_vt_iterator(
        self, vt_selection: Optional[List[str]] = None, details: bool = True
    ) -> Iterator[Tuple[str, Optional[Dict]]]:
        """Yield (vt_id, vt) pairs built with get_single_vt().

        Arguments:
            vt_selection: VT ids to yield. When empty or None, the values of
                ``dict(self.nvti.get_oids())`` are used instead.
            details: When true, the mapping from ``get_oids()`` is handed to
                get_single_vt() so that dependencies are resolved.

        Yields:
            The VT id together with its dictionary; the dictionary is None
            for an id the cache has no metadata for.
        """
        oids = None
        if details or not vt_selection:
            vt_collection = dict(self.nvti.get_oids())
            if not vt_selection:
                vt_selection = vt_collection.values()
            if details:
                oids = vt_collection

        for vt_id in vt_selection:
            yield (vt_id, self.get_single_vt(vt_id, oids))

    def calculate_vts_collection_hash(self) -> str:
        """Calculate the vts collection sha256 hash.

        For every VT the id, its modification time and the id, name and
        default of each parameter (ordered by sorted parameter key) are fed
        into the digest. VTs without metadata are skipped and missing (None)
        fields count as empty strings instead of raising.

        Returns:
            The hexadecimal digest.
        """

        def text(value: Optional[str]) -> str:
            return '' if value is None else value

        digest = sha256()

        # for a reproducible hash calculation
        # the vts must already be sorted in the dictionary.
        for vt_id, vt in self.get_vt_iterator(details=False):
            if vt is None:
                continue

            vt_params = vt.get('vt_params') or {}
            param_chain = ''.join(
                text(param.get('id'))
                + text(param.get('name'))
                + text(param.get('default'))
                for _, param in sorted(vt_params.items())
            )

            digest.update(
                (vt_id + text(vt.get('modification_time'))).encode('utf-8')
                + param_chain.encode('utf-8')
            )

        return digest.hexdigest()

    def get_severity_score(self, vt_aux: dict) -> Optional[float]:
        """Return the severity score of the given VT dictionary.

        Arguments:
            vt_aux: VT element from which to get the severity vector.

        Returns:
            The value calculated by ``CVSS.cvss_base_v2_value()`` or
            ``CVSS.cvss_base_v3_value()``, chosen by the stored severity
            type. None if vt_aux is empty, has no severity vector, or the
            severity type is neither 'cvss_base_v2' nor 'cvss_base_v3'.
        """
        if not vt_aux:
            return None

        severities = vt_aux.get('severities') or {}
        severity_vector = severities.get('severity_base_vector')
        if not severity_vector:
            return None

        severity_type = severities.get('severity_type')
        if severity_type == "cvss_base_v2":
            return CVSS.cvss_base_v2_value(severity_vector)
        if severity_type == "cvss_base_v3":
            return CVSS.cvss_base_v3_value(severity_vector)

        return None
219