Passed
Pull Request — master (#236)
by Juan José
01:49
created

ospd_openvas.vthelper.VtHelper.__init__()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 2
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
""" Provide functions to handle VT Info. """
21
22
from hashlib import sha256
from typing import Optional, Dict, List, Tuple, Iterator, Any

from ospd_openvas.nvticache import NVTICache
26
27
28
class VtHelper:
    """Build VT dictionaries from the metadata held in an NVTI cache.

    All lookups go through the cache object handed to the constructor;
    the helper only reshapes what ``get_nvt_metadata()`` and
    ``get_oids()`` return.
    """

    def __init__(self, nvticache: 'NVTICache'):
        """Store the cache used for every metadata lookup.

        Arguments:
            nvticache: Object providing ``get_nvt_metadata(vt_id)`` and
                ``get_oids()``.
        """
        self.nvti = nvticache

    def get_single_vt(self, vt_id: str, oids=None) -> Optional[Dict[str, Any]]:
        """Return the VT dictionary for ``vt_id``.

        Arguments:
            vt_id: Identifier passed to the cache's ``get_nvt_metadata()``.
            oids: Optional mapping used to translate each entry of the
                ``dependencies`` metadata field. When it is empty or
                None, dependencies are not resolved and the field is
                left untouched inside ``custom``.

        Returns:
            None when the cache has no metadata for ``vt_id``. Otherwise
            a dictionary that always holds ``name``, ``custom`` (the
            metadata left over after all known fields were extracted)
            and ``severities``, plus every optional field whose value is
            not None.

        Raises:
            KeyError: If one of the mandatory metadata keys
                (``vt_params``, ``refs``, ``name``, ``creation_date``,
                ``last_modification``) is missing, or if neither
                ``severity_base_vector`` nor ``cvss_base_vector`` is
                present.
        """
        custom = self.nvti.get_nvt_metadata(vt_id)
        if not custom:
            return None

        # Mandatory fields: a missing key raises KeyError on purpose.
        vt_params = custom.pop('vt_params')
        vt_refs = custom.pop('refs')
        name = custom.pop('name')
        vt_creation_time = custom.pop('creation_date')
        vt_modification_time = custom.pop('last_modification')

        vt_dependencies = None
        if oids:
            vt_dependencies = []
            if 'dependencies' in custom:
                for dep_name in custom.pop('dependencies').split(', '):
                    dep_oid = oids.get(dep_name)
                    # A name that cannot be translated is kept as is, so
                    # the list never contains None entries.
                    vt_dependencies.append(
                        dep_name if dep_oid is None else dep_oid
                    )

        summary = custom.pop('summary', None)
        impact = custom.pop('impact', None)
        affected = custom.pop('affected', None)
        insight = custom.pop('insight', None)

        # Solution type and method are only consumed together with a
        # solution; without one they stay in ``custom``.
        solution = solution_t = solution_m = None
        if 'solution' in custom:
            solution = custom.pop('solution')
            solution_t = custom.pop('solution_type', None)
            solution_m = custom.pop('solution_method', None)

        vuldetect = custom.pop('vuldetect', None)

        # ``qod_type`` takes precedence; ``qod`` is only consumed when
        # ``qod_type`` is absent.
        qod_t = qod_v = None
        if 'qod_type' in custom:
            qod_t = custom.pop('qod_type')
        elif 'qod' in custom:
            qod_v = custom.pop('qod')

        if 'severity_base_vector' in custom:
            severity_vector = custom.pop('severity_base_vector')
        else:
            severity_vector = custom.pop('cvss_base_vector')
        severity = {
            'severity_base_vector': severity_vector,
            'severity_type': custom.pop('severity_type', 'cvss_base_v2'),
        }
        if 'severity_origin' in custom:
            severity['severity_origin'] = custom.pop('severity_origin')

        vt = {'name': '' if name is None else name, 'custom': custom}

        # Listed in output order; entries whose value is None are dropped.
        fields = [
            ('vt_params', vt_params),
            ('vt_refs', vt_refs),
            ('vt_dependencies', vt_dependencies),
            ('creation_time', vt_creation_time),
            ('modification_time', vt_modification_time),
            ('summary', summary),
            ('impact', impact),
            ('affected', affected),
            ('insight', insight),
        ]
        if solution is not None:
            fields += [
                ('solution', solution),
                ('solution_type', solution_t),
                ('solution_method', solution_m),
            ]
        fields.append(('detection', vuldetect))
        if qod_t is not None:
            fields.append(('qod_type', qod_t))
        else:
            fields.append(('qod', qod_v))

        vt.update((key, value) for key, value in fields if value is not None)
        vt['severities'] = severity

        return vt

    def get_vt_iterator(
        self, vt_selection: Optional[List[str]] = None, details: bool = True
    ) -> Iterator[Tuple[str, Optional[Dict]]]:
        """Yield ``(vt_id, vt)`` pairs built from the NVTI cache.

        Arguments:
            vt_selection: VT ids to yield. When empty or None, every
                value of the mapping built from ``get_oids()`` is used.
            details: When True, the mapping built from ``get_oids()`` is
                handed to ``get_single_vt()`` so that dependencies get
                resolved.

        Yields:
            Tuples of the VT id and the result of ``get_single_vt()``,
            which is None for ids without metadata in the cache.
        """
        oids = None
        if not vt_selection or details:
            vt_collection = dict(self.nvti.get_oids())

            if not vt_selection:
                vt_selection = vt_collection.values()

            if details:
                oids = vt_collection

        for vt_id in vt_selection:
            yield (vt_id, self.get_single_vt(vt_id, oids))

    def calculate_vts_collection_hash(self) -> str:
        """Calculate the sha256 hash of the whole VT collection.

        For each VT the id, the modification time and the id, name and
        default value of every parameter (sorted by parameter key) are
        fed into the digest.

        Returns:
            The hexadecimal digest string.
        """
        digest = sha256()

        # For a reproducible hash calculation the vts must already be
        # sorted in the dictionary.
        for vt_id, vt in self.get_vt_iterator(details=False):
            if vt is None:
                # get_single_vt() found no metadata for this id, so
                # there is nothing to hash.
                continue

            param_chain = ''
            vt_params = vt.get('vt_params')
            if vt_params:
                param_chain = ''.join(
                    param.get('id') + param.get('name') + param.get('default')
                    for _, param in sorted(vt_params.items())
                )

            digest.update(
                (vt_id + vt.get('modification_time')).encode('utf-8')
                + param_chain.encode('utf-8')
            )

        return digest.hexdigest()
188