Completed
Push — master ( 1ed431...35dbab )
by
unknown
17s queued 13s
created

tests.helper   A

Complexity

Total Complexity 30

Size/Duplication

Total Lines 218
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 133
dl 0
loc 218
rs 10
c 0
b 0
f 0
wmc 30

1 Function

Rating   Name   Duplication   Size   Complexity  
A assert_called() 0 11 3

22 Methods

Rating   Name   Duplication   Size   Complexity  
A FakeDataManager.__init__() 0 2 1
A DummyWrapper.get_summary_vt_as_xml_str() 0 5 1
A DummyWrapper.get_refs_vt_as_xml_str() 0 7 1
A DummyWrapper.get_insight_vt_as_xml_str() 0 5 1
A FakeDataManager.dict() 0 2 1
A DummyWrapper.get_dependencies_vt_as_xml_str() 0 10 1
A DummyWrapper.__init__() 0 7 1
A DummyWrapper.get_detection_vt_as_xml_str() 0 7 1
A FakeStream.get_response() 0 2 1
A DummyWrapper.get_creation_time_vt_as_xml_str() 0 7 1
A DummyWrapper.get_affected_vt_as_xml_str() 0 5 1
A DummyWrapper.get_severities_vt_as_xml_str() 0 9 1
A DummyWrapper.get_impact_vt_as_xml_str() 0 5 1
A FakeStream.__init__() 0 2 1
A DummyWrapper.get_params_vt_as_xml_str() 0 4 1
A DummyWrapper.get_modification_time_vt_as_xml_str() 0 9 1
A DummyWrapper.check() 0 2 1
A FakeStream.write() 0 2 1
A DummyWrapper.get_custom_vt_as_xml_str() 0 3 1
A DummyWrapper.get_solution_vt_as_xml_str() 0 7 1
A FakePsutil.__init__() 0 2 1
B DummyWrapper.exec_scan() 0 39 6
1
# Copyright (C) 2014-2020 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: AGPL-3.0-or-later
4
#
5
# This program is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Affero General Public License as
7
# published by the Free Software Foundation, either version 3 of the
8
# License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
import time
19
import multiprocessing
20
21
from unittest.mock import Mock
22
23
from xml.etree import ElementTree as et
24
25
from ospd.ospd import OSPDaemon
26
27
28
def assert_called(mock: Mock):
29
    if hasattr(mock, 'assert_called'):
30
        return mock.assert_called()
31
32
    if not mock.call_count == 1:
33
        msg = "Expected '%s' to have been called once. Called %s times.%s" % (
34
            mock._mock_name or 'mock',  # pylint: disable=protected-access
35
            mock.call_count,
36
            mock._calls_repr(),  # pylint: disable=protected-access
37
        )
38
        raise AssertionError(msg)
39
40
41
class FakePsutil:
42
    def __init__(self, free=None):
43
        self.free = free
44
45
46
class FakeStream:
47
    def __init__(self):
48
        self.response = b''
49
50
    def write(self, data):
51
        self.response = self.response + data
52
53
    def get_response(self):
54
        return et.fromstring(self.response)
55
56
57
class FakeDataManager:
58
    def __init__(self):
59
        pass
60
61
    def dict(self):
62
        return dict()
63
64
65
class DummyWrapper(OSPDaemon):
66
    def __init__(self, results, checkresult=True):
67
        super().__init__()
68
        self.checkresult = checkresult
69
        self.results = results
70
        self.initialized = True
71
        self.scan_collection.data_manager = FakeDataManager()
72
        self.scan_collection.file_storage_dir = '/tmp'
73
74
    def check(self):
75
        return self.checkresult
76
77
    @staticmethod
78
    def get_custom_vt_as_xml_str(vt_id, custom):
79
        return '<custom><mytest>static test</mytest></custom>'
80
81
    @staticmethod
82
    def get_params_vt_as_xml_str(vt_id, vt_params):
83
        return (
84
            '<params><param id="abc" type="string">'
85
            '<name>ABC</name><description>Test ABC</description>'
86
            '<default>yes</default></param>'
87
            '<param id="def" type="string">'
88
            '<name>DEF</name><description>Test DEF</description>'
89
            '<default>no</default></param></params>'
90
        )
91
92
    @staticmethod
93
    def get_refs_vt_as_xml_str(vt_id, vt_refs):
94
        response = (
95
            '<refs><ref type="cve" id="CVE-2010-4480"/>'
96
            '<ref type="url" id="http://example.com"/></refs>'
97
        )
98
        return response
99
100
    @staticmethod
101
    def get_dependencies_vt_as_xml_str(vt_id, vt_dependencies):
102
        response = (
103
            '<dependencies>'
104
            '<dependency vt_id="1.3.6.1.4.1.25623.1.0.50282" />'
105
            '<dependency vt_id="1.3.6.1.4.1.25623.1.0.50283" />'
106
            '</dependencies>'
107
        )
108
109
        return response
110
111
    @staticmethod
112
    def get_severities_vt_as_xml_str(vt_id, severities):
113
        response = (
114
            '<severities><severity cvss_base="5.0" cvss_'
115
            'type="cvss_base_v2">AV:N/AC:L/Au:N/C:N/I:N/'
116
            'A:P</severity></severities>'
117
        )
118
119
        return response
120
121
    @staticmethod
122
    def get_detection_vt_as_xml_str(
123
        vt_id, detection=None, qod_type=None, qod=None
124
    ):
125
        response = '<detection qod_type="package">some detection</detection>'
126
127
        return response
128
129
    @staticmethod
130
    def get_summary_vt_as_xml_str(vt_id, summary):
131
        response = '<summary>Some summary</summary>'
132
133
        return response
134
135
    @staticmethod
136
    def get_affected_vt_as_xml_str(vt_id, affected):
137
        response = '<affected>Some affected</affected>'
138
139
        return response
140
141
    @staticmethod
142
    def get_impact_vt_as_xml_str(vt_id, impact):
143
        response = '<impact>Some impact</impact>'
144
145
        return response
146
147
    @staticmethod
148
    def get_insight_vt_as_xml_str(vt_id, insight):
149
        response = '<insight>Some insight</insight>'
150
151
        return response
152
153
    @staticmethod
154
    def get_solution_vt_as_xml_str(
155
        vt_id, solution, solution_type=None, solution_method=None
156
    ):
157
        response = '<solution>Some solution</solution>'
158
159
        return response
160
161
    @staticmethod
162
    def get_creation_time_vt_as_xml_str(
163
        vt_id, creation_time
164
    ):  # pylint: disable=arguments-differ
165
        response = '<creation_time>%s</creation_time>' % creation_time
166
167
        return response
168
169
    @staticmethod
170
    def get_modification_time_vt_as_xml_str(
171
        vt_id, modification_time
172
    ):  # pylint: disable=arguments-differ
173
        response = (
174
            '<modification_time>%s</modification_time>' % modification_time
175
        )
176
177
        return response
178
179
    def exec_scan(self, scan_id):
180
        time.sleep(0.01)
181
        for res in self.results:
182
            if res.result_type == 'log':
183
                self.add_scan_log(
184
                    scan_id,
185
                    res.host,
186
                    res.hostname,
187
                    res.name,
188
                    res.value,
189
                    res.port,
190
                )
191
            if res.result_type == 'error':
192
                self.add_scan_error(
193
                    scan_id,
194
                    res.host,
195
                    res.hostname,
196
                    res.name,
197
                    res.value,
198
                    res.port,
199
                )
200
            elif res.result_type == 'host-detail':
201
                self.add_scan_host_detail(
202
                    scan_id, res.host, res.hostname, res.name, res.value,
203
                )
204
            elif res.result_type == 'alarm':
205
                self.add_scan_alarm(
206
                    scan_id,
207
                    res.host,
208
                    res.hostname,
209
                    res.name,
210
                    res.value,
211
                    res.port,
212
                    res.test_id,
213
                    res.severity,
214
                    res.qod,
215
                )
216
            else:
217
                raise ValueError(res.result_type)
218