Passed
Pull Request — master (#201)
by Juan José
01:30
created

tests.helper.FakeStream.get_response()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
# Copyright (C) 2020 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: GPL-2.0-or-later
4
#
5
# This program is free software; you can redistribute it and/or
6
# modify it under the terms of the GNU General Public License
7
# as published by the Free Software Foundation; either version 2
8
# of the License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU General Public License for more details.
14
#
15
# You should have received a copy of the GNU General Public License
16
# along with this program; if not, write to the Free Software
17
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
19
import time
20
21
from unittest.mock import Mock
22
23
from xml.etree import ElementTree as et
24
25
from ospd.ospd import OSPDaemon
26
27
28
def assert_called(mock: Mock):
29
    if hasattr(mock, 'assert_called'):
30
        return mock.assert_called()
31
32
    if not mock.call_count == 1:
33
        msg = "Expected '%s' to have been called once. Called %s times.%s" % (
34
            mock._mock_name or 'mock',  # pylint: disable=protected-access
35
            mock.call_count,
36
            mock._calls_repr(),  # pylint: disable=protected-access
37
        )
38
        raise AssertionError(msg)
39
40
41
class FakeStream:
42
    def __init__(self):
43
        self.response = b''
44
45
    def write(self, data):
46
        self.response = self.response + data
47
48
    def get_response(self):
49
        return et.fromstring(self.response)
50
51
52
class DummyWrapper(OSPDaemon):
53
    def __init__(self, results, checkresult=True):
54
        super().__init__()
55
        self.checkresult = checkresult
56
        self.results = results
57
58
    def check(self):
59
        return self.checkresult
60
61
    @staticmethod
62
    def get_custom_vt_as_xml_str(vt_id, custom):
63
        return '<custom><mytest>static test</mytest></custom>'
64
65
    @staticmethod
66
    def get_params_vt_as_xml_str(vt_id, vt_params):
67
        return (
68
            '<params><param id="abc" type="string">'
69
            '<name>ABC</name><description>Test ABC</description>'
70
            '<default>yes</default></param>'
71
            '<param id="def" type="string">'
72
            '<name>DEF</name><description>Test DEF</description>'
73
            '<default>no</default></param></params>'
74
        )
75
76
    @staticmethod
77
    def get_refs_vt_as_xml_str(vt_id, vt_refs):
78
        response = (
79
            '<refs><ref type="cve" id="CVE-2010-4480"/>'
80
            '<ref type="url" id="http://example.com"/></refs>'
81
        )
82
        return response
83
84
    @staticmethod
85
    def get_dependencies_vt_as_xml_str(vt_id, vt_dependencies):
86
        response = (
87
            '<dependencies>'
88
            '<dependency vt_id="1.3.6.1.4.1.25623.1.0.50282" />'
89
            '<dependency vt_id="1.3.6.1.4.1.25623.1.0.50283" />'
90
            '</dependencies>'
91
        )
92
93
        return response
94
95
    @staticmethod
96
    def get_severities_vt_as_xml_str(vt_id, severities):
97
        response = (
98
            '<severities><severity cvss_base="5.0" cvss_'
99
            'type="cvss_base_v2">AV:N/AC:L/Au:N/C:N/I:N/'
100
            'A:P</severity></severities>'
101
        )
102
103
        return response
104
105
    @staticmethod
106
    def get_detection_vt_as_xml_str(
107
        vt_id, detection=None, qod_type=None, qod=None
108
    ):
109
        response = '<detection qod_type="package">some detection</detection>'
110
111
        return response
112
113
    @staticmethod
114
    def get_summary_vt_as_xml_str(vt_id, summary):
115
        response = '<summary>Some summary</summary>'
116
117
        return response
118
119
    @staticmethod
120
    def get_affected_vt_as_xml_str(vt_id, affected):
121
        response = '<affected>Some affected</affected>'
122
123
        return response
124
125
    @staticmethod
126
    def get_impact_vt_as_xml_str(vt_id, impact):
127
        response = '<impact>Some impact</impact>'
128
129
        return response
130
131
    @staticmethod
132
    def get_insight_vt_as_xml_str(vt_id, insight):
133
        response = '<insight>Some insight</insight>'
134
135
        return response
136
137
    @staticmethod
138
    def get_solution_vt_as_xml_str(
139
        vt_id, solution, solution_type=None, solution_method=None
140
    ):
141
        response = '<solution>Some solution</solution>'
142
143
        return response
144
145
    @staticmethod
146
    def get_creation_time_vt_as_xml_str(
147
        vt_id, creation_time
148
    ):  # pylint: disable=arguments-differ
149
        response = '<creation_time>%s</creation_time>' % creation_time
150
151
        return response
152
153
    @staticmethod
154
    def get_modification_time_vt_as_xml_str(
155
        vt_id, modification_time
156
    ):  # pylint: disable=arguments-differ
157
        response = (
158
            '<modification_time>%s</modification_time>' % modification_time
159
        )
160
161
        return response
162
163
    def exec_scan(self, scan_id, target):
164
        time.sleep(0.01)
165
        for res in self.results:
166
            if res.result_type == 'log':
167
                self.add_scan_log(
168
                    scan_id,
169
                    res.host or target,
170
                    res.hostname,
171
                    res.name,
172
                    res.value,
173
                    res.port,
174
                )
175
            if res.result_type == 'error':
176
                self.add_scan_error(
177
                    scan_id,
178
                    res.host or target,
179
                    res.hostname,
180
                    res.name,
181
                    res.value,
182
                    res.port,
183
                )
184
            elif res.result_type == 'host-detail':
185
                self.add_scan_host_detail(
186
                    scan_id,
187
                    res.host or target,
188
                    res.hostname,
189
                    res.name,
190
                    res.value,
191
                )
192
            elif res.result_type == 'alarm':
193
                self.add_scan_alarm(
194
                    scan_id,
195
                    res.host or target,
196
                    res.hostname,
197
                    res.name,
198
                    res.value,
199
                    res.port,
200
                    res.test_id,
201
                    res.severity,
202
                    res.qod,
203
                )
204
            else:
205
                raise ValueError(res.result_type)
206