ospd.vts   B
last analyzed

Complexity

Total Complexity 43

Size/Duplication

Total Lines 214
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 142
dl 0
loc 214
rs 8.96
c 0
b 0
f 0
wmc 43

14 Methods

Rating   Name   Duplication   Size   Complexity  
A Vts.items() 0 2 1
A Vts.keys() 0 2 1
A Vts.copy() 0 4 1
A Vts.__init_vts() 0 5 2
A Vts.__getitem__() 0 2 1
A Vts.clear() 0 3 1
A Vts.vts() 0 6 2
B Vts.calculate_vts_collection_hash() 0 29 6
A Vts.get() 0 2 1
A Vts.__len__() 0 2 1
F Vts.add() 0 84 22
A Vts.__init__() 0 12 1
A Vts.__contains__() 0 2 1
A Vts.__iter__() 0 3 2

How to fix   Complexity   

Complexity

Complex classes like ospd.vts often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Copyright (C) 2014-2021 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: AGPL-3.0-or-later
4
#
5
# This program is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Affero General Public License as
7
# published by the Free Software Foundation, either version 3 of the
8
# License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
""" Classes for storing VTs
19
"""
20
import logging
21
import multiprocessing
22
from hashlib import sha256
23
import re
24
25
from copy import deepcopy
26
from typing import (
27
    Dict,
28
    Any,
29
    Type,
30
    Iterator,
31
    Iterable,
32
    Tuple,
33
)
34
35
from ospd.errors import OspdError
36
37
logger = logging.getLogger(__name__)
38
39
DEFAULT_VT_ID_PATTERN = re.compile("[0-9a-zA-Z_\\-:.]{1,80}")
40
41
42
class Vts:
43
    def __init__(
44
        self,
45
        storage: Type[Dict] = None,
46
        vt_id_pattern=DEFAULT_VT_ID_PATTERN,
47
    ):
48
        self.storage = storage
49
50
        self.vt_id_pattern = vt_id_pattern
51
        self._vts = None
52
        self.sha256_hash = None
53
54
        self.is_cache_available = True
55
56
    def __contains__(self, key: str) -> bool:
57
        return key in self._vts
58
59
    def __iter__(self) -> Iterator[str]:
60
        if hasattr(self.vts, '__iter__'):
61
            return self.vts.__iter__()
62
63
    def __getitem__(self, key):
64
        return self.vts[key]
65
66
    def items(self) -> Iterator[Tuple[str, Dict]]:
67
        return iter(self.vts.items())
68
69
    def __len__(self) -> int:
70
        return len(self.vts)
71
72
    def __init_vts(self):
73
        if self.storage:
74
            self._vts = self.storage()
75
        else:
76
            self._vts = multiprocessing.Manager().dict()
77
78
    @property
79
    def vts(self) -> Dict[str, Any]:
80
        if self._vts is None:
81
            self.__init_vts()
82
83
        return self._vts
84
85
    def add(
86
        self,
87
        vt_id: str,
88
        name: str = None,
89
        vt_params: str = None,
90
        vt_refs: str = None,
91
        custom: str = None,
92
        vt_creation_time: str = None,
93
        vt_modification_time: str = None,
94
        vt_dependencies: str = None,
95
        summary: str = None,
96
        impact: str = None,
97
        affected: str = None,
98
        insight: str = None,
99
        solution: str = None,
100
        solution_t: str = None,
101
        solution_m: str = None,
102
        detection: str = None,
103
        qod_t: str = None,
104
        qod_v: str = None,
105
        severities: str = None,
106
    ) -> None:
107
        """Add a vulnerability test information.
108
109
        IMPORTANT: The VT's Data Manager will store the vts collection.
110
        If the collection is considerably big and it will be consultated
111
        intensible during a routine, consider to do a deepcopy(), since
112
        accessing the shared memory in the data manager is very expensive.
113
        At the end of the routine, the temporal copy must be set to None
114
        and deleted.
115
        """
116
        if not vt_id:
117
            raise OspdError('Invalid vt_id {}'.format(vt_id))
118
119
        if self.vt_id_pattern.fullmatch(vt_id) is None:
120
            raise OspdError('Invalid vt_id {}'.format(vt_id))
121
122
        if vt_id in self.vts:
123
            raise OspdError('vt_id {} already exists'.format(vt_id))
124
125
        if name is None:
126
            name = ''
127
128
        vt = {'name': name}
129
        if custom is not None:
130
            vt["custom"] = custom
131
        if vt_params is not None:
132
            vt["vt_params"] = vt_params
133
        if vt_refs is not None:
134
            vt["vt_refs"] = vt_refs
135
        if vt_dependencies is not None:
136
            vt["vt_dependencies"] = vt_dependencies
137
        if vt_creation_time is not None:
138
            vt["creation_time"] = vt_creation_time
139
        if vt_modification_time is not None:
140
            vt["modification_time"] = vt_modification_time
141
        if summary is not None:
142
            vt["summary"] = summary
143
        if impact is not None:
144
            vt["impact"] = impact
145
        if affected is not None:
146
            vt["affected"] = affected
147
        if insight is not None:
148
            vt["insight"] = insight
149
150
        if solution is not None:
151
            vt["solution"] = solution
152
            if solution_t is not None:
153
                vt["solution_type"] = solution_t
154
            if solution_m is not None:
155
                vt["solution_method"] = solution_m
156
157
        if detection is not None:
158
            vt["detection"] = detection
159
160
        if qod_t is not None:
161
            vt["qod_type"] = qod_t
162
        elif qod_v is not None:
163
            vt["qod"] = qod_v
164
165
        if severities is not None:
166
            vt["severities"] = severities
167
168
        self.vts[vt_id] = vt
169
170
    def get(self, vt_id: str) -> Dict[str, Any]:
171
        return self.vts.get(vt_id)
172
173
    def keys(self) -> Iterable[str]:
174
        return self.vts.keys()
175
176
    def clear(self) -> None:
177
        self._vts.clear()
178
        self._vts = None
179
180
    def copy(self) -> "Vts":
181
        copy = Vts(self.storage, vt_id_pattern=self.vt_id_pattern)
182
        copy._vts = deepcopy(self._vts)  # pylint: disable=protected-access
183
        return copy
184
185
    def calculate_vts_collection_hash(self, include_vt_params: bool = True):
186
        """Calculate the vts collection sha256 hash."""
187
        if not self._vts:
188
            logger.debug(
189
                "Error calculating VTs collection hash. Cache is empty"
190
            )
191
            return
192
193
        m = sha256()  # pylint: disable=invalid-name
194
195
        # for a reproducible hash calculation
196
        # the vts must already be sorted in the dictionary.
197
        for vt_id, vt in self.vts.items():
198
            param_chain = ""
199
            vt_params = vt.get('vt_params')
200
            if include_vt_params and vt_params:
201
                for _, param in sorted(vt_params.items()):
202
                    param_chain += (
203
                        param.get('id')
204
                        + param.get('name')
205
                        + param.get('default')
206
                    )
207
208
            m.update(
209
                (vt_id + vt.get('modification_time')).encode('utf-8')
210
                + param_chain.encode('utf-8')
211
            )
212
213
        self.sha256_hash = m.hexdigest()
214