Passed
Pull Request — master (#232)
by Juan José
01:24
created

ospd.vts.Vts.__init_vts()   A

Complexity

Conditions 2

Size

Total Lines 5
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 4
nop 1
dl 0
loc 5
rs 10
c 0
b 0
f 0
1
# Copyright (C) 2020 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: GPL-2.0-or-later
4
#
5
# This program is free software; you can redistribute it and/or
6
# modify it under the terms of the GNU General Public License
7
# as published by the Free Software Foundation; either version 2
8
# of the License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU General Public License for more details.
14
#
15
# You should have received a copy of the GNU General Public License
16
# along with this program; if not, write to the Free Software
17
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
19
""" Classes for storing VTs
20
"""
21
22
import multiprocessing
23
from hashlib import sha256
24
import re
25
26
from copy import deepcopy
27
from typing import (
28
    Dict,
29
    Any,
30
    Type,
31
    Iterator,
32
    Iterable,
33
    Tuple,
34
)
35
36
from ospd.errors import OspdError
37
38
DEFAULT_VT_ID_PATTERN = re.compile("[0-9a-zA-Z_\\-:.]{1,80}")
39
40
41
class Vts:
42
    def __init__(
43
        self, storage: Type[Dict] = None, vt_id_pattern=DEFAULT_VT_ID_PATTERN,
44
    ):
45
        self.storage = storage
46
47
        self.vt_id_pattern = vt_id_pattern
48
        self._vts = None
49
        self.sha256_hash = None
50
51
    def __contains__(self, key: str) -> bool:
52
        return key in self._vts
53
54
    def __iter__(self) -> Iterator[str]:
55
        if hasattr(self.vts, '__iter__'):
56
            return self.vts.__iter__()
57
58
        # Use iter because python3.5 has no support for
59
        # iteration over DictProxy.
60
        return iter(self.vts.keys())
61
62
    def __getitem__(self, key):
63
        return self.vts[key]
64
65
    def items(self) -> Iterator[Tuple[str, Dict]]:
66
        return iter(self.vts.items())
67
68
    def __len__(self) -> int:
69
        return len(self.vts)
70
71
    def __init_vts(self):
72
        if self.storage:
73
            self._vts = self.storage()
74
        else:
75
            self._vts = multiprocessing.Manager().dict()
76
77
    @property
78
    def vts(self) -> Dict[str, Any]:
79
        if self._vts is None:
80
            self.__init_vts()
81
82
        return self._vts
83
84
    def add(
85
        self,
86
        vt_id: str,
87
        name: str = None,
88
        vt_params: str = None,
89
        vt_refs: str = None,
90
        custom: str = None,
91
        vt_creation_time: str = None,
92
        vt_modification_time: str = None,
93
        vt_dependencies: str = None,
94
        summary: str = None,
95
        impact: str = None,
96
        affected: str = None,
97
        insight: str = None,
98
        solution: str = None,
99
        solution_t: str = None,
100
        solution_m: str = None,
101
        detection: str = None,
102
        qod_t: str = None,
103
        qod_v: str = None,
104
        severities: str = None,
105
    ) -> None:
106
        """ Add a vulnerability test information.
107
108
        IMPORTANT: The VT's Data Manager will store the vts collection.
109
        If the collection is considerably big and it will be consultated
110
        intensible during a routine, consider to do a deepcopy(), since
111
        accessing the shared memory in the data manager is very expensive.
112
        At the end of the routine, the temporal copy must be set to None
113
        and deleted.
114
        """
115
        if not vt_id:
116
            raise OspdError('Invalid vt_id {}'.format(vt_id))
117
118
        if self.vt_id_pattern.fullmatch(vt_id) is None:
119
            raise OspdError('Invalid vt_id {}'.format(vt_id))
120
121
        if vt_id in self.vts:
122
            raise OspdError('vt_id {} already exists'.format(vt_id))
123
124
        if name is None:
125
            name = ''
126
127
        vt = {'name': name}
128
        if custom is not None:
129
            vt["custom"] = custom
130
        if vt_params is not None:
131
            vt["vt_params"] = vt_params
132
        if vt_refs is not None:
133
            vt["vt_refs"] = vt_refs
134
        if vt_dependencies is not None:
135
            vt["vt_dependencies"] = vt_dependencies
136
        if vt_creation_time is not None:
137
            vt["creation_time"] = vt_creation_time
138
        if vt_modification_time is not None:
139
            vt["modification_time"] = vt_modification_time
140
        if summary is not None:
141
            vt["summary"] = summary
142
        if impact is not None:
143
            vt["impact"] = impact
144
        if affected is not None:
145
            vt["affected"] = affected
146
        if insight is not None:
147
            vt["insight"] = insight
148
149
        if solution is not None:
150
            vt["solution"] = solution
151
            if solution_t is not None:
152
                vt["solution_type"] = solution_t
153
            if solution_m is not None:
154
                vt["solution_method"] = solution_m
155
156
        if detection is not None:
157
            vt["detection"] = detection
158
159
        if qod_t is not None:
160
            vt["qod_type"] = qod_t
161
        elif qod_v is not None:
162
            vt["qod"] = qod_v
163
164
        if severities is not None:
165
            vt["severities"] = severities
166
167
        self.vts[vt_id] = vt
168
169
    def get(self, vt_id: str) -> Dict[str, Any]:
170
        return self.vts.get(vt_id)
171
172
    def keys(self) -> Iterable[str]:
173
        return self.vts.keys()
174
175
    def clear(self) -> None:
176
        self._vts.clear()
177
        self._vts = None
178
179
    def copy(self) -> "Vts":
180
        copy = Vts(self.storage, vt_id_pattern=self.vt_id_pattern)
181
        copy._vts = deepcopy(self._vts)  # pylint: disable=protected-access
182
        return copy
183
184
    def calculate_vts_collection_hash(self):
185
        """ Calculate the vts collection sha256 hash. """
186
        m = sha256()
187
188
        for vt_id, vt in sorted(self._vts.items()):
189
            m.update((vt_id + vt.get('modification_time')).encode('utf-8'))
190
191
        self.sha256_hash = m.hexdigest()
192