Completed
Push — master ( dc472d...f45e49 )
by Juan José
14s queued 12s
created

ospd.vts.Vts.get()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 2
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
# Copyright (C) 2020 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: GPL-2.0-or-later
4
#
5
# This program is free software; you can redistribute it and/or
6
# modify it under the terms of the GNU General Public License
7
# as published by the Free Software Foundation; either version 2
8
# of the License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU General Public License for more details.
14
#
15
# You should have received a copy of the GNU General Public License
16
# along with this program; if not, write to the Free Software
17
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
19
""" Classes for storing VTs
20
"""
21
import logging
22
import multiprocessing
23
from hashlib import sha256
24
import re
25
26
from copy import deepcopy
27
from typing import (
28
    Dict,
29
    Any,
30
    Type,
31
    Iterator,
32
    Iterable,
33
    Tuple,
34
)
35
36
from ospd.errors import OspdError
37
38
logger = logging.getLogger(__name__)

# Pattern a VT id must fully match to be accepted by Vts.add().
DEFAULT_VT_ID_PATTERN = re.compile(r"[0-9a-zA-Z_\-:.]{1,80}")


class Vts:
    """Collection of vulnerability tests (VTs), keyed by VT id.

    The backing mapping is created lazily on first access through the
    ``vts`` property: either by calling ``storage()`` or, when no storage
    factory was given, as a ``multiprocessing.Manager().dict()``.
    """

    def __init__(
        self, storage: Type[Dict] = None, vt_id_pattern=DEFAULT_VT_ID_PATTERN,
    ):
        """Prepare an empty collection.

        Arguments:
            storage: Optional zero-argument callable returning the mapping
                used to hold the VTs (for example ``dict``).
            vt_id_pattern: Compiled regular expression every VT id must
                fully match.
        """
        self.storage = storage

        self.vt_id_pattern = vt_id_pattern
        # Backing mapping; stays None until first access via ``self.vts``.
        self._vts = None
        # Set by calculate_vts_collection_hash().
        self.sha256_hash = None

    def __contains__(self, key: str) -> bool:
        # Go through the property so a membership test on a fresh or
        # cleared collection answers False instead of failing on None.
        return key in self.vts

    def __iter__(self) -> Iterator[str]:
        if hasattr(self.vts, '__iter__'):
            return self.vts.__iter__()

        # Use iter because python3.5 has no support for
        # iteration over DictProxy.
        return iter(self.vts.keys())

    def __getitem__(self, key):
        return self.vts[key]

    def items(self) -> Iterator[Tuple[str, Dict]]:
        """Return an iterator over ``(vt_id, vt)`` pairs."""
        return iter(self.vts.items())

    def __len__(self) -> int:
        return len(self.vts)

    def __init_vts(self):
        """Create the backing mapping."""
        if self.storage:
            self._vts = self.storage()
        else:
            self._vts = multiprocessing.Manager().dict()

    @property
    def vts(self) -> Dict[str, Any]:
        """The backing mapping, created on first access."""
        if self._vts is None:
            self.__init_vts()

        return self._vts

    def add(
        self,
        vt_id: str,
        name: str = None,
        vt_params: Dict[str, Any] = None,
        vt_refs: str = None,
        custom: str = None,
        vt_creation_time: str = None,
        vt_modification_time: str = None,
        vt_dependencies: str = None,
        summary: str = None,
        impact: str = None,
        affected: str = None,
        insight: str = None,
        solution: str = None,
        solution_t: str = None,
        solution_m: str = None,
        detection: str = None,
        qod_t: str = None,
        qod_v: str = None,
        severities: str = None,
    ) -> None:
        """ Add a vulnerability test information.

        Only arguments that are not None are stored. ``solution_t`` and
        ``solution_m`` are stored only together with ``solution``, and
        ``qod_v`` is stored only when ``qod_t`` is None.

        IMPORTANT: The VT's Data Manager will store the vts collection.
        If the collection is considerably big and it will be consulted
        intensively during a routine, consider doing a deepcopy(), since
        accessing the shared memory in the data manager is very expensive.
        At the end of the routine, the temporary copy must be set to None
        and deleted.

        Raises:
            OspdError: If ``vt_id`` is empty, does not fully match the
                configured id pattern, or is already in the collection.
        """
        if not vt_id or self.vt_id_pattern.fullmatch(vt_id) is None:
            raise OspdError('Invalid vt_id {}'.format(vt_id))

        if vt_id in self.vts:
            raise OspdError('vt_id {} already exists'.format(vt_id))

        vt = {'name': '' if name is None else name}

        # (stored key, value) pairs, in the order they are inserted.
        optional_fields = (
            ("custom", custom),
            ("vt_params", vt_params),
            ("vt_refs", vt_refs),
            ("vt_dependencies", vt_dependencies),
            ("creation_time", vt_creation_time),
            ("modification_time", vt_modification_time),
            ("summary", summary),
            ("impact", impact),
            ("affected", affected),
            ("insight", insight),
        )
        for key, value in optional_fields:
            if value is not None:
                vt[key] = value

        # Solution type/method are only kept alongside a solution.
        if solution is not None:
            vt["solution"] = solution
            if solution_t is not None:
                vt["solution_type"] = solution_t
            if solution_m is not None:
                vt["solution_method"] = solution_m

        if detection is not None:
            vt["detection"] = detection

        # The QoD type takes precedence over the QoD value.
        if qod_t is not None:
            vt["qod_type"] = qod_t
        elif qod_v is not None:
            vt["qod"] = qod_v

        if severities is not None:
            vt["severities"] = severities

        self.vts[vt_id] = vt

    def get(self, vt_id: str) -> Dict[str, Any]:
        """Return the VT stored under ``vt_id``, or None if absent."""
        return self.vts.get(vt_id)

    def keys(self) -> Iterable[str]:
        """Return the stored VT ids."""
        return self.vts.keys()

    def clear(self) -> None:
        """Empty the collection and drop the backing mapping.

        Safe to call on a collection whose storage was never created or
        has already been cleared.
        """
        if self._vts is not None:
            self._vts.clear()
        self._vts = None

    def copy(self) -> "Vts":
        """Return a new Vts holding a deep copy of the stored VTs."""
        duplicate = Vts(self.storage, vt_id_pattern=self.vt_id_pattern)
        # pylint: disable=protected-access
        duplicate._vts = deepcopy(self._vts)
        return duplicate

    def calculate_vts_collection_hash(
        self, include_vt_params: bool = True
    ) -> None:
        """ Calculate the vts collection sha256 hash.

        VTs are processed sorted by id. Each one contributes its id, its
        modification time and, when ``include_vt_params`` is true, the
        id, name and default of each of its params (sorted by param key).
        Missing values contribute an empty string. The hex digest is
        stored in ``self.sha256_hash``; nothing is changed when the
        collection is empty.
        """
        if not self._vts:
            logger.debug(
                "Error calculating VTs collection hash. Cache is empty"
            )
            return

        m = sha256()

        for vt_id, vt in sorted(self._vts.items()):
            param_chain = ""
            vt_params = vt.get('vt_params')
            if include_vt_params and vt_params:
                param_chain = "".join(
                    (param.get('id') or '')
                    + (param.get('name') or '')
                    + (param.get('default') or '')
                    for _, param in sorted(vt_params.items())
                )

            # add() allows a VT without modification time; treat it as ''.
            modification_time = vt.get('modification_time') or ''
            m.update(
                (vt_id + modification_time).encode('utf-8')
                + param_chain.encode('utf-8')
            )

        self.sha256_hash = m.hexdigest()
213