Completed
Push — master ( d58222...cd4f3f )
by Juan José
16s queued 12s
created

ospd.vts.Vts.keys()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
# Copyright (C) 2020 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: GPL-2.0-or-later
4
#
5
# This program is free software; you can redistribute it and/or
6
# modify it under the terms of the GNU General Public License
7
# as published by the Free Software Foundation; either version 2
8
# of the License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU General Public License for more details.
14
#
15
# You should have received a copy of the GNU General Public License
16
# along with this program; if not, write to the Free Software
17
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
19
""" Classes for storing VTs
20
"""
21
22
import multiprocessing
23
import re
24
25
from copy import deepcopy
26
from typing import Dict, Any, Type, Iterator, Iterable
27
28
from ospd.errors import OspdError
29
30
# Default pattern for VT ids: 1 to 80 characters drawn from ASCII letters,
# digits, "_", "-", ":" and ".".
DEFAULT_VT_ID_PATTERN = re.compile(r"[0-9a-zA-Z_\-:.]{1,80}")
31
32
33
class Vts:
    """ Dict-like container of vulnerability test (VT) entries keyed by VT id.

    The backing mapping is created lazily on first access through the
    `vts` property: an instance of `storage` if one was given, otherwise
    a dict proxy obtained from a new `multiprocessing.Manager()`.
    """

    def __init__(
        self, storage: Type[Dict] = None, vt_id_pattern=DEFAULT_VT_ID_PATTERN,
    ):
        """ Remember the storage factory and the VT id pattern.

        Arguments:
            storage: Callable returning the mapping used to hold the VTs.
                If not given, a multiprocessing manager dict is used.
            vt_id_pattern: Compiled regular expression every VT id must
                fully match to be accepted by add().
        """
        self.storage = storage

        self.vt_id_pattern = vt_id_pattern
        # Created on demand by the `vts` property.
        self._vts = None

    def __contains__(self, key: str) -> bool:
        # Go through the property so that a membership test on a fresh or
        # cleared collection answers False instead of failing on None.
        return key in self.vts

    def __iter__(self) -> Iterator[str]:
        if hasattr(self.vts, '__iter__'):
            return self.vts.__iter__()

        # Use iter because python3.5 has no support for
        # iteration over DictProxy.
        return iter(self.vts.keys())

    def __getitem__(self, key):
        return self.vts[key]

    def __len__(self) -> int:
        return len(self.vts)

    def __init_vts(self):
        """ Create the backing mapping for the VTs. """
        if self.storage:
            self._vts = self.storage()
        else:
            self._vts = multiprocessing.Manager().dict()

    @property
    def vts(self) -> Dict[str, Any]:
        """ The backing mapping of VT id to VT data, created on first use. """
        if self._vts is None:
            self.__init_vts()

        return self._vts

    def add(
        self,
        vt_id: str,
        name: str = None,
        vt_params: str = None,
        vt_refs: str = None,
        custom: str = None,
        vt_creation_time: str = None,
        vt_modification_time: str = None,
        vt_dependencies: str = None,
        summary: str = None,
        impact: str = None,
        affected: str = None,
        insight: str = None,
        solution: str = None,
        solution_t: str = None,
        solution_m: str = None,
        detection: str = None,
        qod_t: str = None,
        qod_v: str = None,
        severities: str = None,
    ) -> None:
        """ Add a vulnerability test information.

        Only arguments that are not None are stored in the VT entry; the
        name is always stored and defaults to an empty string.
        solution_t and solution_m are only stored together with a
        solution, and qod_v is only stored if no qod_t is given.

        IMPORTANT: The VT's Data Manager will store the vts collection.
        If the collection is considerably big and it will be consulted
        intensively during a routine, consider to do a deepcopy(), since
        accessing the shared memory in the data manager is very expensive.
        At the end of the routine, the temporary copy must be set to None
        and deleted.

        Raises:
            OspdError: If vt_id is empty, does not fully match the
                configured vt_id_pattern, or already exists.
        """
        # The emptiness check comes first, so None never reaches fullmatch().
        if not vt_id or self.vt_id_pattern.fullmatch(vt_id) is None:
            raise OspdError('Invalid vt_id {}'.format(vt_id))

        if vt_id in self.vts:
            raise OspdError('vt_id {} already exists'.format(vt_id))

        has_solution = solution is not None

        # (key, value) pairs in the order in which they are stored. A value
        # of None means "do not store this key".
        optional_fields = (
            ("custom", custom),
            ("vt_params", vt_params),
            ("vt_refs", vt_refs),
            ("vt_dependencies", vt_dependencies),
            ("creation_time", vt_creation_time),
            ("modification_time", vt_modification_time),
            ("summary", summary),
            ("impact", impact),
            ("affected", affected),
            ("insight", insight),
            ("solution", solution),
            # Solution details are ignored without a solution.
            ("solution_type", solution_t if has_solution else None),
            ("solution_method", solution_m if has_solution else None),
            ("detection", detection),
            # A QoD type takes precedence over a plain QoD value.
            ("qod_type", qod_t),
            ("qod", qod_v if qod_t is None else None),
            ("severities", severities),
        )

        vt = {'name': '' if name is None else name}
        for key, value in optional_fields:
            if value is not None:
                vt[key] = value

        self.vts[vt_id] = vt

    def get(self, vt_id: str) -> Dict[str, Any]:
        """ Return the data stored for vt_id, or None if it is unknown. """
        return self.vts.get(vt_id)

    def keys(self) -> Iterable[str]:
        """ Return the ids of all stored VTs. """
        return self.vts.keys()

    def clear(self) -> None:
        """ Drop all stored VTs and release the backing mapping.

        A new mapping is created on the next access. Calling this on a
        collection that was never used or is already cleared is a no-op.
        """
        if self._vts is not None:
            self._vts.clear()
            self._vts = None

    def copy(self) -> "Vts":
        """ Return a new Vts with the same settings and a deep copy of the
        current content (None if the backing mapping was not created yet).
        """
        duplicate = Vts(self.storage, vt_id_pattern=self.vt_id_pattern)
        # pylint: disable=protected-access
        duplicate._vts = deepcopy(self._vts)
        return duplicate
171