Passed
Pull Request — master (#367)
by Juan José
01:29
created

ospd_openvas.vthelper   B

Complexity

Total Complexity 51

Size/Duplication

Total Lines 214
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 145
dl 0
loc 214
rs 7.92
c 0
b 0
f 0
wmc 51

5 Methods

Rating   Name   Duplication   Size   Complexity  
A VtHelper.__init__() 0 2 1
A VtHelper.calculate_vts_collection_hash() 0 28 5
F VtHelper.get_single_vt() 0 120 37
B VtHelper.get_vt_iterator() 0 22 7
A VtHelper.get_notus_driver_oids() 0 8 1

How to fix   Complexity   

Complexity

Complex classes like ospd_openvas.vthelper often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
""" Provide functions to handle VT Info. """
21
22
from hashlib import sha256
23
from typing import Optional, Dict, List, Tuple, Iterator
24
25
from ospd_openvas.nvticache import NVTICache
26
from ospd_openvas.notus.metadata import NotusMetadataHandler
27
28
29
class VtHelper:
30
    def __init__(self, nvticache: NVTICache):
31
        self.nvti = nvticache
32
33
    def get_single_vt(self, vt_id: str, oids=None) -> Optional[Dict[str, any]]:
34
        custom = self.nvti.get_nvt_metadata(vt_id)
35
36
        if not custom:
37
            return None
38
39
        vt_params = custom.pop('vt_params')
40
        vt_refs = custom.pop('refs')
41
        name = custom.pop('name')
42
        vt_creation_time = custom.pop('creation_date')
43
        vt_modification_time = custom.pop('last_modification')
44
45
        if oids:
46
            vt_dependencies = list()
47
            if 'dependencies' in custom:
48
                deps = custom.pop('dependencies')
49
                deps_list = deps.split(', ')
50
                for dep in deps_list:
51
                    vt_dependencies.append(oids.get(dep))
52
        else:
53
            vt_dependencies = None
54
55
        summary = None
56
        impact = None
57
        affected = None
58
        insight = None
59
        solution = None
60
        solution_t = None
61
        solution_m = None
62
        vuldetect = None
63
        qod_t = None
64
        qod_v = None
65
66
        if 'summary' in custom:
67
            summary = custom.pop('summary')
68
        if 'impact' in custom:
69
            impact = custom.pop('impact')
70
        if 'affected' in custom:
71
            affected = custom.pop('affected')
72
        if 'insight' in custom:
73
            insight = custom.pop('insight')
74
        if 'solution' in custom:
75
            solution = custom.pop('solution')
76
            if 'solution_type' in custom:
77
                solution_t = custom.pop('solution_type')
78
            if 'solution_method' in custom:
79
                solution_m = custom.pop('solution_method')
80
81
        if 'vuldetect' in custom:
82
            vuldetect = custom.pop('vuldetect')
83
        if 'qod_type' in custom:
84
            qod_t = custom.pop('qod_type')
85
        elif 'qod' in custom:
86
            qod_v = custom.pop('qod')
87
88
        severity = dict()
89
        if 'severity_vector' in custom:
90
            severity_vector = custom.pop('severity_vector')
91
        else:
92
            severity_vector = custom.pop('cvss_base_vector')
93
        severity['severity_base_vector'] = severity_vector
94
95
        if "CVSS:3" in severity_vector:
96
            severity_type = 'cvss_base_v3'
97
        else:
98
            severity_type = 'cvss_base_v2'
99
        severity['severity_type'] = severity_type
100
101
        if 'severity_date' in custom:
102
            severity['severity_date'] = custom.pop('severity_date')
103
        else:
104
            severity['severity_date'] = vt_creation_time
105
106
        if 'severity_origin' in custom:
107
            severity['severity_origin'] = custom.pop('severity_origin')
108
109
        if name is None:
110
            name = ''
111
112
        vt = {'name': name}
113
        if custom is not None:
114
            vt["custom"] = custom
115
        if vt_params is not None:
116
            vt["vt_params"] = vt_params
117
        if vt_refs is not None:
118
            vt["vt_refs"] = vt_refs
119
        if vt_dependencies is not None:
120
            vt["vt_dependencies"] = vt_dependencies
121
        if vt_creation_time is not None:
122
            vt["creation_time"] = vt_creation_time
123
        if vt_modification_time is not None:
124
            vt["modification_time"] = vt_modification_time
125
        if summary is not None:
126
            vt["summary"] = summary
127
        if impact is not None:
128
            vt["impact"] = impact
129
        if affected is not None:
130
            vt["affected"] = affected
131
        if insight is not None:
132
            vt["insight"] = insight
133
134
        if solution is not None:
135
            vt["solution"] = solution
136
            if solution_t is not None:
137
                vt["solution_type"] = solution_t
138
            if solution_m is not None:
139
                vt["solution_method"] = solution_m
140
141
        if vuldetect is not None:
142
            vt["detection"] = vuldetect
143
144
        if qod_t is not None:
145
            vt["qod_type"] = qod_t
146
        elif qod_v is not None:
147
            vt["qod"] = qod_v
148
149
        if severity is not None:
150
            vt["severities"] = severity
151
152
        return vt
153
154
    def get_notus_driver_oids(self) -> List[str]:
155
        """Return a list of oid corresponding to notus driver which
156
        are considered backend entities and must not be exposed to
157
        the OSP client."""
158
        notus = NotusMetadataHandler()
159
        lsc_families_and_drivers = notus.get_family_driver_linkers()
160
161
        return lsc_families_and_drivers.values()
162
163
    def get_vt_iterator(
164
        self, vt_selection: List[str] = None, details: bool = True
165
    ) -> Iterator[Tuple[str, Dict]]:
166
        """ Yield the vts from the Redis NVTicache. """
167
168
        oids = None
169
        if not vt_selection or details:
170
            vt_collection = dict(self.nvti.get_oids())
171
172
            if not vt_selection:
173
                vt_selection = vt_collection.values()
174
175
            if details:
176
                oids = vt_collection
177
178
        # Notus driver oid list which are not sent.
179
        drivers = self.get_notus_driver_oids()
180
        for vt_id in vt_selection:
181
            if vt_id in drivers:
182
                continue
183
            vt = self.get_single_vt(vt_id, oids)
184
            yield (vt_id, vt)
185
186
    def calculate_vts_collection_hash(self) -> str:
187
        """ Calculate the vts collection sha256 hash. """
188
        m = sha256()  # pylint: disable=invalid-name
189
190
        # Notus driver oid list which are not sent.
191
        drivers = self.get_notus_driver_oids()
192
193
        # for a reproducible hash calculation
194
        # the vts must already be sorted in the dictionary.
195
        for vt_id, vt in self.get_vt_iterator(details=False):
196
            if vt_id in drivers:
197
                continue
198
            param_chain = ""
199
            vt_params = vt.get('vt_params')
200
            if vt_params:
201
                for _, param in sorted(vt_params.items()):
202
                    param_chain += (
203
                        param.get('id')
204
                        + param.get('name')
205
                        + param.get('default')
206
                    )
207
208
            m.update(
209
                (vt_id + vt.get('modification_time')).encode('utf-8')
210
                + param_chain.encode('utf-8')
211
            )
212
213
        return m.hexdigest()
214