Completed
Push — master ( 494987...e33655 )
by Juan José
23s queued 10s
created

tests.test_command   A

Complexity

Total Complexity 25

Size/Duplication

Total Lines 299
Duplicated Lines 15.72 %

Importance

Changes 0
Metric Value
eloc 152
dl 47
loc 299
rs 10
c 0
b 0
f 0
wmc 25

16 Methods

Rating   Name   Duplication   Size   Complexity  
A StartScanTestCase.test_scan_with_vts_and_param_missing_vt_param_id() 0 16 2
A StartScanTestCase.test_scan_with_vts_and_param_missing_vt_group_filter() 0 14 2
A GetPerformanceTestCase.test_get_performance_fail_int() 0 8 2
A GetPerformanceTestCase.test_get_performance_fail_cmd() 0 8 2
A StartScanTestCase.test_scan_with_vts_empty_vt_list() 0 11 2
A GetPerformanceTestCase.test_get_performance() 0 14 1
A StartScanTestCase.test_scan_with_vts() 0 24 1
A StopCommandTestCase.test_stop_scan() 0 27 1
A StopCommandTestCase.test_missing_scan_id() 0 6 2
A GetPerformanceTestCase.test_get_performance_fail_regex() 0 8 2
A StartScanTestCase.test_scan_multi_target_parallel_100() 0 24 1
A StopCommandTestCase.test_unknown_scan_id() 0 7 2
A StartScanTestCase.test_scan_multi_target_parallel_with_error() 0 17 2
A StartScanTestCase.test_scan_with_vts_and_param_with_vt_group_filter() 22 22 1
A StartScanTestCase.test_scan_without_vts() 0 18 1
A StartScanTestCase.test_scan_with_vts_and_param() 25 25 1

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
# Copyright (C) 2020 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: GPL-2.0-or-later
4
#
5
# This program is free software; you can redistribute it and/or
6
# modify it under the terms of the GNU General Public License
7
# as published by the Free Software Foundation; either version 2
8
# of the License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU General Public License for more details.
14
#
15
# You should have received a copy of the GNU General Public License
16
# along with this program; if not, write to the Free Software
17
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
19
from unittest import TestCase
20
from unittest.mock import patch
21
22
from xml.etree import ElementTree as et
23
24
from ospd.command.command import GetPerformance, StartScan, StopScan
25
from ospd.errors import OspdCommandError
26
27
from .helper import DummyWrapper, assert_called
28
29
30
class GetPerformanceTestCase(TestCase):
31
    @patch('ospd.command.command.subprocess')
32
    def test_get_performance(self, mock_subproc):
33
        cmd = GetPerformance(None)
34
        mock_subproc.check_output.return_value = b'foo'
35
        response = et.fromstring(
36
            cmd.handle_xml(
37
                et.fromstring(
38
                    '<get_performance start="0" end="0" titles="mem"/>'
39
                )
40
            )
41
        )
42
43
        self.assertEqual(response.get('status'), '200')
44
        self.assertEqual(response.tag, 'get_performance_response')
45
46
    def test_get_performance_fail_int(self):
47
        cmd = GetPerformance(None)
48
        request = et.fromstring(
49
            '<get_performance start="a" end="0" titles="mem"/>'
50
        )
51
52
        with self.assertRaises(OspdCommandError):
53
            cmd.handle_xml(request)
54
55
    def test_get_performance_fail_regex(self):
56
        cmd = GetPerformance(None)
57
        request = et.fromstring(
58
            '<get_performance start="0" end="0" titles="mem|bar"/>'
59
        )
60
61
        with self.assertRaises(OspdCommandError):
62
            cmd.handle_xml(request)
63
64
    def test_get_performance_fail_cmd(self):
65
        cmd = GetPerformance(None)
66
        request = et.fromstring(
67
            '<get_performance start="0" end="0" titles="mem1"/>'
68
        )
69
70
        with self.assertRaises(OspdCommandError):
71
            cmd.handle_xml(request)
72
73
74
class StartScanTestCase(TestCase):
75
    def test_scan_with_vts_empty_vt_list(self):
76
        daemon = DummyWrapper([])
77
        cmd = StartScan(daemon)
78
        request = et.fromstring(
79
            '<start_scan target="localhost" ports="80, 443">'
80
            '<scanner_params /><vt_selection />'
81
            '</start_scan>'
82
        )
83
84
        with self.assertRaises(OspdCommandError):
85
            cmd.handle_xml(request)
86
87
    @patch("ospd.command.command.create_process")
88
    def test_scan_with_vts(self, mock_create_process):
89
        daemon = DummyWrapper([])
90
        cmd = StartScan(daemon)
91
92
        request = et.fromstring(
93
            '<start_scan target="localhost" ports="80, 443">'
94
            '<scanner_params />'
95
            '<vt_selection>'
96
            '<vt_single id="1.2.3.4" />'
97
            '</vt_selection>'
98
            '</start_scan>'
99
        )
100
101
        # With one vt, without params
102
        response = et.fromstring(cmd.handle_xml(request))
103
        scan_id = response.findtext('id')
104
105
        self.assertEqual(
106
            daemon.get_scan_vts(scan_id), {'1.2.3.4': {}, 'vt_groups': []}
107
        )
108
        self.assertNotEqual(daemon.get_scan_vts(scan_id), {'1.2.3.6': {}})
109
110
        assert_called(mock_create_process)
111
112
    @patch("ospd.command.command.create_process")
113
    def test_scan_without_vts(self, mock_create_process):
114
        daemon = DummyWrapper([])
115
        cmd = StartScan(daemon)
116
117
        # With out vtS
118
        request = et.fromstring(
119
            '<start_scan target="localhost" ports="80, 443">'
120
            '<scanner_params />'
121
            '</start_scan>'
122
        )
123
        response = et.fromstring(cmd.handle_xml(request))
124
125
        scan_id = response.findtext('id')
126
127
        self.assertEqual(daemon.get_scan_vts(scan_id), {})
128
129
        assert_called(mock_create_process)
130
131
    def test_scan_with_vts_and_param_missing_vt_param_id(self):
132
        daemon = DummyWrapper([])
133
        cmd = StartScan(daemon)
134
135
        # Raise because no vt_param id attribute
136
        request = et.fromstring(
137
            '<start_scan target="localhost" ports="80, 443">'
138
            '<scanner_params />'
139
            '<vt_selection>'
140
            '<vt_single id="1234"><vt_value>200</vt_value></vt_single>'
141
            '</vt_selection>'
142
            '</start_scan>'
143
        )
144
145
        with self.assertRaises(OspdCommandError):
146
            cmd.handle_xml(request)
147
148 View Code Duplication
    @patch("ospd.command.command.create_process")
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
149
    def test_scan_with_vts_and_param(self, mock_create_process):
150
        daemon = DummyWrapper([])
151
        cmd = StartScan(daemon)
152
153
        # No error
154
        request = et.fromstring(
155
            '<start_scan target="localhost" ports="80, 443">'
156
            '<scanner_params />'
157
            '<vt_selection>'
158
            '<vt_single id="1234">'
159
            '<vt_value id="ABC">200</vt_value>'
160
            '</vt_single>'
161
            '</vt_selection>'
162
            '</start_scan>'
163
        )
164
        response = et.fromstring(cmd.handle_xml(request))
165
        scan_id = response.findtext('id')
166
167
        self.assertEqual(
168
            daemon.get_scan_vts(scan_id),
169
            {'1234': {'ABC': '200'}, 'vt_groups': []},
170
        )
171
172
        assert_called(mock_create_process)
173
174
    def test_scan_with_vts_and_param_missing_vt_group_filter(self):
175
        daemon = DummyWrapper([])
176
        cmd = StartScan(daemon)
177
178
        # Raise because no vtgroup filter attribute
179
        request = et.fromstring(
180
            '<start_scan target="localhost" ports="80, 443">'
181
            '<scanner_params />'
182
            '<vt_selection><vt_group/></vt_selection>'
183
            '</start_scan>'
184
        )
185
186
        with self.assertRaises(OspdCommandError):
187
            cmd.handle_xml(request)
188
189 View Code Duplication
    @patch("ospd.command.command.create_process")
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
190
    def test_scan_with_vts_and_param_with_vt_group_filter(
191
        self, mock_create_process
192
    ):
193
        daemon = DummyWrapper([])
194
        cmd = StartScan(daemon)
195
196
        # No error
197
        request = et.fromstring(
198
            '<start_scan target="localhost" ports="80, 443">'
199
            '<scanner_params />'
200
            '<vt_selection>'
201
            '<vt_group filter="a"/>'
202
            '</vt_selection>'
203
            '</start_scan>'
204
        )
205
        response = et.fromstring(cmd.handle_xml(request))
206
        scan_id = response.findtext('id')
207
208
        self.assertEqual(daemon.get_scan_vts(scan_id), {'vt_groups': ['a']})
209
210
        assert_called(mock_create_process)
211
212
    def test_scan_multi_target_parallel_with_error(self):
213
        daemon = DummyWrapper([])
214
        cmd = StartScan(daemon)
215
        request = et.fromstring(
216
            '<start_scan parallel="100a">'
217
            '<scanner_params />'
218
            '<targets>'
219
            '<target>'
220
            '<hosts>localhosts</hosts>'
221
            '<ports>22</ports>'
222
            '</target>'
223
            '</targets>'
224
            '</start_scan>'
225
        )
226
227
        with self.assertRaises(OspdCommandError):
228
            cmd.handle_xml(request)
229
230
    @patch("ospd.ospd.OSPDaemon")
231
    @patch("ospd.command.command.create_process")
232
    def test_scan_multi_target_parallel_100(
233
        self, mock_create_process, mock_daemon
234
    ):
235
        daemon = mock_daemon()
236
        daemon.create_scan.return_value = '1'
237
        cmd = StartScan(daemon)
238
        request = et.fromstring(
239
            '<start_scan parallel="100">'
240
            '<scanner_params />'
241
            '<targets>'
242
            '<target>'
243
            '<hosts>localhosts</hosts>'
244
            '<ports>22</ports>'
245
            '</target>'
246
            '</targets>'
247
            '</start_scan>'
248
        )
249
        response = et.fromstring(cmd.handle_xml(request))
250
251
        self.assertEqual(response.get('status'), '200')
252
253
        assert_called(mock_create_process)
254
255
256
class StopCommandTestCase(TestCase):
257
    @patch("ospd.ospd.os")
258
    @patch("ospd.command.command.create_process")
259
    def test_stop_scan(self, mock_create_process, mock_os):
260
        mock_process = mock_create_process.return_value
261
        mock_process.is_alive.return_value = True
262
        mock_process.pid = "foo"
263
264
        daemon = DummyWrapper([])
265
        request = (
266
            '<start_scan target="localhost" ports="80, 443">'
267
            '<scanner_params />'
268
            '</start_scan>'
269
        )
270
        response = et.fromstring(daemon.handle_command(request))
271
272
        assert_called(mock_create_process)
273
        assert_called(mock_process.start)
274
275
        scan_id = response.findtext('id')
276
277
        request = et.fromstring('<stop_scan scan_id="%s" />' % scan_id)
278
        cmd = StopScan(daemon)
279
        cmd.handle_xml(request)
280
281
        assert_called(mock_process.terminate)
282
283
        mock_os.getpgid.assert_called_with('foo')
284
285
    def test_unknown_scan_id(self):
286
        daemon = DummyWrapper([])
287
        cmd = StopScan(daemon)
288
        request = et.fromstring('<stop_scan scan_id="foo" />')
289
290
        with self.assertRaises(OspdCommandError):
291
            cmd.handle_xml(request)
292
293
    def test_missing_scan_id(self):
294
        request = et.fromstring('<stop_scan />')
295
        cmd = StopScan(None)
296
297
        with self.assertRaises(OspdCommandError):
298
            cmd.handle_xml(request)
299