Completed
Push — master ( 745830...54fe4d )
by Juan José
17s queued 13s
created

DummyWrapper.get_insight_vt_as_xml_str()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 2
dl 0
loc 5
rs 10
c 0
b 0
f 0
1
# Copyright (C) 2014-2020 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: AGPL-3.0-or-later
4
#
5
# This program is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Affero General Public License as
7
# published by the Free Software Foundation, either version 3 of the
8
# License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
import time
19
import multiprocessing
20
21
from unittest.mock import Mock
22
23
from xml.etree import ElementTree as et
24
25
from ospd.ospd import OSPDaemon
26
27
28
def assert_called(mock: Mock):
29
    if hasattr(mock, 'assert_called'):
30
        return mock.assert_called()
31
32
    if not mock.call_count == 1:
33
        msg = "Expected '%s' to have been called once. Called %s times.%s" % (
34
            mock._mock_name or 'mock',  # pylint: disable=protected-access
35
            mock.call_count,
36
            mock._calls_repr(),  # pylint: disable=protected-access
37
        )
38
        raise AssertionError(msg)
39
40
41
class FakeStream:
42
    def __init__(self):
43
        self.response = b''
44
45
    def write(self, data):
46
        self.response = self.response + data
47
48
    def get_response(self):
49
        return et.fromstring(self.response)
50
51
52
class FakeDataManager:
53
    def __init__(self):
54
        pass
55
56
    def dict(self):
57
        return dict()
58
59
60
class DummyWrapper(OSPDaemon):
61
    def __init__(self, results, checkresult=True):
62
        super().__init__()
63
        self.checkresult = checkresult
64
        self.results = results
65
        self.initialized = True
66
        self.scan_collection.data_manager = FakeDataManager()
67
68
    def check(self):
69
        return self.checkresult
70
71
    @staticmethod
72
    def get_custom_vt_as_xml_str(vt_id, custom):
73
        return '<custom><mytest>static test</mytest></custom>'
74
75
    @staticmethod
76
    def get_params_vt_as_xml_str(vt_id, vt_params):
77
        return (
78
            '<params><param id="abc" type="string">'
79
            '<name>ABC</name><description>Test ABC</description>'
80
            '<default>yes</default></param>'
81
            '<param id="def" type="string">'
82
            '<name>DEF</name><description>Test DEF</description>'
83
            '<default>no</default></param></params>'
84
        )
85
86
    @staticmethod
87
    def get_refs_vt_as_xml_str(vt_id, vt_refs):
88
        response = (
89
            '<refs><ref type="cve" id="CVE-2010-4480"/>'
90
            '<ref type="url" id="http://example.com"/></refs>'
91
        )
92
        return response
93
94
    @staticmethod
95
    def get_dependencies_vt_as_xml_str(vt_id, vt_dependencies):
96
        response = (
97
            '<dependencies>'
98
            '<dependency vt_id="1.3.6.1.4.1.25623.1.0.50282" />'
99
            '<dependency vt_id="1.3.6.1.4.1.25623.1.0.50283" />'
100
            '</dependencies>'
101
        )
102
103
        return response
104
105
    @staticmethod
106
    def get_severities_vt_as_xml_str(vt_id, severities):
107
        response = (
108
            '<severities><severity cvss_base="5.0" cvss_'
109
            'type="cvss_base_v2">AV:N/AC:L/Au:N/C:N/I:N/'
110
            'A:P</severity></severities>'
111
        )
112
113
        return response
114
115
    @staticmethod
116
    def get_detection_vt_as_xml_str(
117
        vt_id, detection=None, qod_type=None, qod=None
118
    ):
119
        response = '<detection qod_type="package">some detection</detection>'
120
121
        return response
122
123
    @staticmethod
124
    def get_summary_vt_as_xml_str(vt_id, summary):
125
        response = '<summary>Some summary</summary>'
126
127
        return response
128
129
    @staticmethod
130
    def get_affected_vt_as_xml_str(vt_id, affected):
131
        response = '<affected>Some affected</affected>'
132
133
        return response
134
135
    @staticmethod
136
    def get_impact_vt_as_xml_str(vt_id, impact):
137
        response = '<impact>Some impact</impact>'
138
139
        return response
140
141
    @staticmethod
142
    def get_insight_vt_as_xml_str(vt_id, insight):
143
        response = '<insight>Some insight</insight>'
144
145
        return response
146
147
    @staticmethod
148
    def get_solution_vt_as_xml_str(
149
        vt_id, solution, solution_type=None, solution_method=None
150
    ):
151
        response = '<solution>Some solution</solution>'
152
153
        return response
154
155
    @staticmethod
156
    def get_creation_time_vt_as_xml_str(
157
        vt_id, creation_time
158
    ):  # pylint: disable=arguments-differ
159
        response = '<creation_time>%s</creation_time>' % creation_time
160
161
        return response
162
163
    @staticmethod
164
    def get_modification_time_vt_as_xml_str(
165
        vt_id, modification_time
166
    ):  # pylint: disable=arguments-differ
167
        response = (
168
            '<modification_time>%s</modification_time>' % modification_time
169
        )
170
171
        return response
172
173
    def exec_scan(self, scan_id):
174
        time.sleep(0.01)
175
        for res in self.results:
176
            if res.result_type == 'log':
177
                self.add_scan_log(
178
                    scan_id,
179
                    res.host,
180
                    res.hostname,
181
                    res.name,
182
                    res.value,
183
                    res.port,
184
                )
185
            if res.result_type == 'error':
186
                self.add_scan_error(
187
                    scan_id,
188
                    res.host,
189
                    res.hostname,
190
                    res.name,
191
                    res.value,
192
                    res.port,
193
                )
194
            elif res.result_type == 'host-detail':
195
                self.add_scan_host_detail(
196
                    scan_id, res.host, res.hostname, res.name, res.value,
197
                )
198
            elif res.result_type == 'alarm':
199
                self.add_scan_alarm(
200
                    scan_id,
201
                    res.host,
202
                    res.hostname,
203
                    res.name,
204
                    res.value,
205
                    res.port,
206
                    res.test_id,
207
                    res.severity,
208
                    res.qod,
209
                )
210
            else:
211
                raise ValueError(res.result_type)
212