Completed
Push — master ( 8945f9...62acd1 )
by Juan José
22s queued 14s
created

ospd.vts.Vts.__iter__()   A

Complexity

Conditions 2

Size

Total Lines 7
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 4
nop 1
dl 0
loc 7
rs 10
c 0
b 0
f 0
1
# Copyright (C) 2020 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: GPL-2.0-or-later
4
#
5
# This program is free software; you can redistribute it and/or
6
# modify it under the terms of the GNU General Public License
7
# as published by the Free Software Foundation; either version 2
8
# of the License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU General Public License for more details.
14
#
15
# You should have received a copy of the GNU General Public License
16
# along with this program; if not, write to the Free Software
17
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
19
""" Classes for storing VTs
20
"""
21
22
import multiprocessing
23
import re
24
25
from copy import deepcopy
26
from typing import Dict, Any, Type, Iterator, Iterable
27
28
from ospd.errors import OspdError
29
30
# Default pattern a VT identifier must fully match: 1 to 80 characters taken
# from ASCII letters, digits, '_', '-', ':' and '.'.
DEFAULT_VT_ID_PATTERN = re.compile("[0-9a-zA-Z_\\-:.]{1,80}")


class Vts:
    """Dictionary-like collection of vulnerability test (VT) information.

    The backing mapping is created lazily on first access through the
    ``vts`` property: ``storage()`` is called when a storage factory was
    given, otherwise a ``multiprocessing.Manager().dict()`` is used.
    """

    def __init__(
        self, storage: Type[Dict] = None, vt_id_pattern=DEFAULT_VT_ID_PATTERN,
    ):
        """Set up an empty collection without creating the backing mapping.

        Args:
            storage: Callable returning the mapping used to hold the VTs.
                When falsy, a multiprocessing manager dict is used instead.
            vt_id_pattern: Compiled pattern every added vt_id must fully
                match.
        """
        self.storage = storage

        self.vt_id_pattern = vt_id_pattern
        # Created on demand by the ``vts`` property; None means "no backing
        # mapping yet" (never used, or cleared).
        self._vts = None

    def __contains__(self, key: str) -> bool:
        """Return True if a VT with the given id is stored."""
        # A collection whose backing mapping was never created (or was
        # cleared) holds nothing. Answer directly instead of failing on None
        # or creating the mapping just for a membership test.
        if self._vts is None:
            return False

        return key in self._vts

    def __iter__(self) -> Iterator[str]:
        """Iterate over the stored VT ids."""
        if hasattr(self.vts, '__iter__'):
            return self.vts.__iter__()

        # Use iter because python3.5 has no support for
        # iteration over DictProxy.
        return iter(self.vts.keys())

    def __getitem__(self, key):
        """Return the VT stored under ``key`` (lookup is delegated to the
        backing mapping)."""
        return self.vts[key]

    def __init_vts(self):
        """Create the backing mapping from the storage factory, falling back
        to a multiprocessing manager dict."""
        if self.storage:
            self._vts = self.storage()
        else:
            self._vts = multiprocessing.Manager().dict()

    @property
    def vts(self) -> Dict[str, Any]:
        """The backing mapping of vt_id to VT data, created on first use."""
        if self._vts is None:
            self.__init_vts()

        return self._vts

    def add(
        self,
        vt_id: str,
        name: str = None,
        vt_params: str = None,
        vt_refs: str = None,
        custom: str = None,
        vt_creation_time: str = None,
        vt_modification_time: str = None,
        vt_dependencies: str = None,
        summary: str = None,
        impact: str = None,
        affected: str = None,
        insight: str = None,
        solution: str = None,
        solution_t: str = None,
        solution_m: str = None,
        detection: str = None,
        qod_t: str = None,
        qod_v: str = None,
        severities: str = None,
    ) -> None:
        """ Add a vulnerability test information.

        IMPORTANT: The VT's Data Manager will store the vts collection.
        If the collection is considerably big and it will be consulted
        intensively during a routine, consider doing a deepcopy(), since
        accessing the shared memory in the data manager is very expensive.
        At the end of the routine, the temporary copy must be set to None
        and deleted.

        Only arguments that are not None are stored. ``name`` defaults to
        an empty string. ``solution_t`` and ``solution_m`` are stored only
        together with ``solution``. ``qod_v`` is stored only when ``qod_t``
        is None.

        Raises:
            OspdError: If vt_id is empty, does not fully match the
                configured pattern, or is already present.
        """
        # An empty/None id is rejected before it reaches the pattern, which
        # only accepts strings.
        if not vt_id or self.vt_id_pattern.fullmatch(vt_id) is None:
            raise OspdError('Invalid vt_id {}'.format(vt_id))

        if vt_id in self.vts:
            raise OspdError('vt_id {} already exists'.format(vt_id))

        vt = {'name': '' if name is None else name}

        # Optional plain fields as (stored key, value), in storage order.
        optional_fields = (
            ("custom", custom),
            ("vt_params", vt_params),
            ("vt_refs", vt_refs),
            ("vt_dependencies", vt_dependencies),
            ("creation_time", vt_creation_time),
            ("modification_time", vt_modification_time),
            ("summary", summary),
            ("impact", impact),
            ("affected", affected),
            ("insight", insight),
        )
        for field, value in optional_fields:
            if value is not None:
                vt[field] = value

        # Solution type and method are only kept alongside a solution.
        if solution is not None:
            vt["solution"] = solution
            if solution_t is not None:
                vt["solution_type"] = solution_t
            if solution_m is not None:
                vt["solution_method"] = solution_m

        if detection is not None:
            vt["detection"] = detection

        # The QoD type takes precedence over an explicit QoD value.
        if qod_t is not None:
            vt["qod_type"] = qod_t
        elif qod_v is not None:
            vt["qod"] = qod_v

        if severities is not None:
            vt["severities"] = severities

        self.vts[vt_id] = vt

    def get(self, vt_id: str) -> Dict[str, Any]:
        """Return the VT stored under ``vt_id`` via the backing mapping's
        ``get()``."""
        return self.vts.get(vt_id)

    def keys(self) -> Iterable[str]:
        """Return the stored VT ids via the backing mapping's ``keys()``."""
        return self.vts.keys()

    def clear(self) -> None:
        """Empty the collection and drop the backing mapping.

        Safe to call on a collection that was never used or was already
        cleared. The mapping is re-created on the next access to ``vts``.
        """
        if self._vts is not None:
            self._vts.clear()
        self._vts = None

    def copy(self) -> "Vts":
        """Return a new Vts with the same storage factory and id pattern,
        holding a deepcopy of the current backing mapping."""
        copy = Vts(self.storage, vt_id_pattern=self.vt_id_pattern)
        copy._vts = deepcopy(self._vts)  # pylint: disable=protected-access
        return copy
168