tests.helper   A
last analyzed

Complexity

Total Complexity 30

Size/Duplication

Total Lines 223
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 30
eloc 138
dl 0
loc 223
rs 10
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
A assert_called() 0 11 3

22 Methods

Rating   Name   Duplication   Size   Complexity  
A FakeDataManager.__init__() 0 2 1
A DummyWrapper.get_summary_vt_as_xml_str() 0 5 1
A DummyWrapper.get_refs_vt_as_xml_str() 0 7 1
A DummyWrapper.get_insight_vt_as_xml_str() 0 5 1
A FakeDataManager.dict() 0 2 1
A DummyWrapper.get_dependencies_vt_as_xml_str() 0 10 1
A DummyWrapper.__init__() 0 7 1
A DummyWrapper.get_detection_vt_as_xml_str() 0 7 1
A FakeStream.get_response() 0 2 1
A DummyWrapper.get_creation_time_vt_as_xml_str() 0 7 1
A DummyWrapper.get_affected_vt_as_xml_str() 0 5 1
A DummyWrapper.get_severities_vt_as_xml_str() 0 9 1
A DummyWrapper.get_impact_vt_as_xml_str() 0 5 1
A FakeStream.__init__() 0 3 1
A DummyWrapper.get_params_vt_as_xml_str() 0 4 1
A DummyWrapper.get_modification_time_vt_as_xml_str() 0 9 1
A DummyWrapper.check() 0 2 1
A FakeStream.write() 0 3 1
A DummyWrapper.get_custom_vt_as_xml_str() 0 3 1
A DummyWrapper.get_solution_vt_as_xml_str() 0 7 1
A FakePsutil.__init__() 0 2 1
B DummyWrapper.exec_scan() 0 43 6
1
# Copyright (C) 2014-2021 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: AGPL-3.0-or-later
4
#
5
# This program is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Affero General Public License as
7
# published by the Free Software Foundation, either version 3 of the
8
# License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
import time
19
20
from unittest.mock import Mock
21
22
from xml.etree import ElementTree as et
23
24
from ospd.ospd import OSPDaemon
25
26
27
def assert_called(mock: Mock):
28
    if hasattr(mock, 'assert_called'):
29
        return mock.assert_called()
30
31
    if not mock.call_count == 1:
32
        msg = "Expected '%s' to have been called once. Called %s times.%s" % (
33
            mock._mock_name or 'mock',  # pylint: disable=protected-access
34
            mock.call_count,
35
            mock._calls_repr(),  # pylint: disable=protected-access
36
        )
37
        raise AssertionError(msg)
38
39
40
class FakePsutil:
41
    def __init__(self, available=None):
42
        self.available = available
43
44
45
class FakeStream:
46
    def __init__(self, return_value=True):
47
        self.response = b''
48
        self.return_value = return_value
49
50
    def write(self, data):
51
        self.response = self.response + data
52
        return self.return_value
53
54
    def get_response(self):
55
        return et.fromstring(self.response)
56
57
58
class FakeDataManager:
59
    def __init__(self):
60
        pass
61
62
    def dict(self):
63
        return dict()
64
65
66
class DummyWrapper(OSPDaemon):
67
    def __init__(self, results, checkresult=True):
68
        super().__init__()
69
        self.checkresult = checkresult
70
        self.results = results
71
        self.initialized = True
72
        self.scan_collection.data_manager = FakeDataManager()
73
        self.scan_collection.file_storage_dir = '/tmp'
74
75
    def check(self):
76
        return self.checkresult
77
78
    @staticmethod
79
    def get_custom_vt_as_xml_str(vt_id, custom):
80
        return '<custom><mytest>static test</mytest></custom>'
81
82
    @staticmethod
83
    def get_params_vt_as_xml_str(vt_id, vt_params):
84
        return (
85
            '<params><param id="abc" type="string">'
86
            '<name>ABC</name><description>Test ABC</description>'
87
            '<default>yes</default></param>'
88
            '<param id="def" type="string">'
89
            '<name>DEF</name><description>Test DEF</description>'
90
            '<default>no</default></param></params>'
91
        )
92
93
    @staticmethod
94
    def get_refs_vt_as_xml_str(vt_id, vt_refs):
95
        response = (
96
            '<refs><ref type="cve" id="CVE-2010-4480"/>'
97
            '<ref type="url" id="http://example.com"/></refs>'
98
        )
99
        return response
100
101
    @staticmethod
102
    def get_dependencies_vt_as_xml_str(vt_id, vt_dependencies):
103
        response = (
104
            '<dependencies>'
105
            '<dependency vt_id="1.3.6.1.4.1.25623.1.0.50282" />'
106
            '<dependency vt_id="1.3.6.1.4.1.25623.1.0.50283" />'
107
            '</dependencies>'
108
        )
109
110
        return response
111
112
    @staticmethod
113
    def get_severities_vt_as_xml_str(vt_id, severities):
114
        response = (
115
            '<severities><severity cvss_base="5.0" cvss_'
116
            'type="cvss_base_v2">AV:N/AC:L/Au:N/C:N/I:N/'
117
            'A:P</severity></severities>'
118
        )
119
120
        return response
121
122
    @staticmethod
123
    def get_detection_vt_as_xml_str(
124
        vt_id, detection=None, qod_type=None, qod=None
125
    ):
126
        response = '<detection qod_type="package">some detection</detection>'
127
128
        return response
129
130
    @staticmethod
131
    def get_summary_vt_as_xml_str(vt_id, summary):
132
        response = '<summary>Some summary</summary>'
133
134
        return response
135
136
    @staticmethod
137
    def get_affected_vt_as_xml_str(vt_id, affected):
138
        response = '<affected>Some affected</affected>'
139
140
        return response
141
142
    @staticmethod
143
    def get_impact_vt_as_xml_str(vt_id, impact):
144
        response = '<impact>Some impact</impact>'
145
146
        return response
147
148
    @staticmethod
149
    def get_insight_vt_as_xml_str(vt_id, insight):
150
        response = '<insight>Some insight</insight>'
151
152
        return response
153
154
    @staticmethod
155
    def get_solution_vt_as_xml_str(
156
        vt_id, solution, solution_type=None, solution_method=None
157
    ):
158
        response = '<solution>Some solution</solution>'
159
160
        return response
161
162
    @staticmethod
163
    def get_creation_time_vt_as_xml_str(
164
        vt_id, vt_creation_time
165
    ):  # pylint: disable=arguments-differ
166
        response = '<creation_time>%s</creation_time>' % vt_creation_time
167
168
        return response
169
170
    @staticmethod
171
    def get_modification_time_vt_as_xml_str(
172
        vt_id, vt_modification_time
173
    ):  # pylint: disable=arguments-differ
174
        response = (
175
            '<modification_time>%s</modification_time>' % vt_modification_time
176
        )
177
178
        return response
179
180
    def exec_scan(self, scan_id):
181
        time.sleep(0.01)
182
        for res in self.results:
183
            if res.result_type == 'log':
184
                self.add_scan_log(
185
                    scan_id,
186
                    res.host,
187
                    res.hostname,
188
                    res.name,
189
                    res.value,
190
                    res.port,
191
                )
192
            if res.result_type == 'error':
193
                self.add_scan_error(
194
                    scan_id,
195
                    res.host,
196
                    res.hostname,
197
                    res.name,
198
                    res.value,
199
                    res.port,
200
                )
201
            elif res.result_type == 'host-detail':
202
                self.add_scan_host_detail(
203
                    scan_id,
204
                    res.host,
205
                    res.hostname,
206
                    res.name,
207
                    res.value,
208
                )
209
            elif res.result_type == 'alarm':
210
                self.add_scan_alarm(
211
                    scan_id,
212
                    res.host,
213
                    res.hostname,
214
                    res.name,
215
                    res.value,
216
                    res.port,
217
                    res.test_id,
218
                    res.severity,
219
                    res.qod,
220
                )
221
            else:
222
                raise ValueError(res.result_type)
223