Passed
Pull Request — master (#213)
by
unknown
01:45
created

StartScanTestCase.test_scan_ignore_multi_target()   A

Complexity

Conditions 1

Size

Total Lines 21
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 10
nop 3
dl 0
loc 21
rs 9.9
c 0
b 0
f 0
1
# Copyright (C) 2020 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: GPL-2.0-or-later
4
#
5
# This program is free software; you can redistribute it and/or
6
# modify it under the terms of the GNU General Public License
7
# as published by the Free Software Foundation; either version 2
8
# of the License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU General Public License for more details.
14
#
15
# You should have received a copy of the GNU General Public License
16
# along with this program; if not, write to the Free Software
17
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
19
import time
20
21
from unittest import TestCase
22
from unittest.mock import patch
23
24
from xml.etree import ElementTree as et
25
26
from ospd.command.command import (
27
    GetPerformance,
28
    StartScan,
29
    StopScan,
30
    GetMemoryUsage,
31
)
32
from ospd.errors import OspdCommandError, OspdError
33
from ospd.misc import create_process
34
35
from ..helper import DummyWrapper, assert_called, FakeStream
36
37
38
class GetPerformanceTestCase(TestCase):
39
    @patch('ospd.command.command.subprocess')
40
    def test_get_performance(self, mock_subproc):
41
        cmd = GetPerformance(None)
42
        mock_subproc.check_output.return_value = b'foo'
43
        response = et.fromstring(
44
            cmd.handle_xml(
45
                et.fromstring(
46
                    '<get_performance start="0" end="0" titles="mem"/>'
47
                )
48
            )
49
        )
50
51
        self.assertEqual(response.get('status'), '200')
52
        self.assertEqual(response.tag, 'get_performance_response')
53
54
    def test_get_performance_fail_int(self):
55
        cmd = GetPerformance(None)
56
        request = et.fromstring(
57
            '<get_performance start="a" end="0" titles="mem"/>'
58
        )
59
60
        with self.assertRaises(OspdCommandError):
61
            cmd.handle_xml(request)
62
63
    def test_get_performance_fail_regex(self):
64
        cmd = GetPerformance(None)
65
        request = et.fromstring(
66
            '<get_performance start="0" end="0" titles="mem|bar"/>'
67
        )
68
69
        with self.assertRaises(OspdCommandError):
70
            cmd.handle_xml(request)
71
72
    def test_get_performance_fail_cmd(self):
73
        cmd = GetPerformance(None)
74
        request = et.fromstring(
75
            '<get_performance start="0" end="0" titles="mem1"/>'
76
        )
77
78
        with self.assertRaises(OspdCommandError):
79
            cmd.handle_xml(request)
80
81
82
class StartScanTestCase(TestCase):
83
    def test_scan_with_vts_empty_vt_list(self):
84
        daemon = DummyWrapper([])
85
        cmd = StartScan(daemon)
86
        request = et.fromstring(
87
            '<start_scan>'
88
            '<targets>'
89
            '<target>'
90
            '<hosts>localhost</hosts>'
91
            '<ports>80, 443</ports>'
92
            '</target>'
93
            '</targets>'
94
            '<scanner_params /><vt_selection />'
95
            '</start_scan>'
96
        )
97
98
        with self.assertRaises(OspdCommandError):
99
            cmd.handle_xml(request)
100
101
    @patch("ospd.command.command.create_process")
102
    def test_scan_with_vts(self, mock_create_process):
103
        daemon = DummyWrapper([])
104
        cmd = StartScan(daemon)
105
106
        request = et.fromstring(
107
            '<start_scan>'
108
            '<targets>'
109
            '<target>'
110
            '<hosts>localhost</hosts>'
111
            '<ports>80, 443</ports>'
112
            '</target>'
113
            '</targets>'
114
            '<scanner_params />'
115
            '<vt_selection>'
116
            '<vt_single id="1.2.3.4" />'
117
            '</vt_selection>'
118
            '</start_scan>'
119
        )
120
121
        # With one vt, without params
122
        response = et.fromstring(cmd.handle_xml(request))
123
        scan_id = response.findtext('id')
124
125
        self.assertEqual(
126
            daemon.get_scan_vts(scan_id), {'1.2.3.4': {}, 'vt_groups': []}
127
        )
128
        self.assertNotEqual(daemon.get_scan_vts(scan_id), {'1.2.3.6': {}})
129
130
        assert_called(mock_create_process)
131
132
    @patch("ospd.command.command.create_process")
133
    def test_scan_without_vts(self, mock_create_process):
134
        daemon = DummyWrapper([])
135
        cmd = StartScan(daemon)
136
137
        # With out vts
138
        request = et.fromstring(
139
            '<start_scan>'
140
            '<targets>'
141
            '<target>'
142
            '<hosts>localhost</hosts>'
143
            '<ports>80, 443</ports>'
144
            '</target>'
145
            '</targets>'
146
            '<scanner_params />'
147
            '</start_scan>'
148
        )
149
        response = et.fromstring(cmd.handle_xml(request))
150
151
        scan_id = response.findtext('id')
152
153
        self.assertEqual(daemon.get_scan_vts(scan_id), {})
154
155
        assert_called(mock_create_process)
156
157
    def test_scan_with_vts_and_param_missing_vt_param_id(self):
158
        daemon = DummyWrapper([])
159
        cmd = StartScan(daemon)
160
161
        # Raise because no vt_param id attribute
162
        request = et.fromstring(
163
            '<start_scan>'
164
            '<targets>'
165
            '<target>'
166
            '<hosts>localhost</hosts>'
167
            '<ports>80, 443</ports>'
168
            '</target>'
169
            '</targets>'
170
            '<scanner_params />'
171
            '<vt_selection>'
172
            '<vt_single id="1234"><vt_value>200</vt_value></vt_single>'
173
            '</vt_selection>'
174
            '</start_scan>'
175
        )
176
177
        with self.assertRaises(OspdError):
178
            cmd.handle_xml(request)
179
180 View Code Duplication
    @patch("ospd.command.command.create_process")
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
181
    def test_scan_with_vts_and_param(self, mock_create_process):
182
        daemon = DummyWrapper([])
183
        cmd = StartScan(daemon)
184
185
        # No error
186
        request = et.fromstring(
187
            '<start_scan>'
188
            '<targets>'
189
            '<target>'
190
            '<hosts>localhost</hosts>'
191
            '<ports>80, 443</ports>'
192
            '</target>'
193
            '</targets>'
194
            '<scanner_params />'
195
            '<vt_selection>'
196
            '<vt_single id="1234">'
197
            '<vt_value id="ABC">200</vt_value>'
198
            '</vt_single>'
199
            '</vt_selection>'
200
            '</start_scan>'
201
        )
202
        response = et.fromstring(cmd.handle_xml(request))
203
        scan_id = response.findtext('id')
204
205
        self.assertEqual(
206
            daemon.get_scan_vts(scan_id),
207
            {'1234': {'ABC': '200'}, 'vt_groups': []},
208
        )
209
210
        assert_called(mock_create_process)
211
212
    def test_scan_with_vts_and_param_missing_vt_group_filter(self):
213
        daemon = DummyWrapper([])
214
        cmd = StartScan(daemon)
215
216
        # Raise because no vtgroup filter attribute
217
        request = et.fromstring(
218
            '<start_scan>'
219
            '<targets>'
220
            '<target>'
221
            '<hosts>localhost</hosts>'
222
            '<ports>80, 443</ports>'
223
            '</target>'
224
            '</targets>'
225
            '<scanner_params />'
226
            '<vt_selection><vt_group/></vt_selection>'
227
            '</start_scan>'
228
        )
229
230
        with self.assertRaises(OspdError):
231
            cmd.handle_xml(request)
232
233 View Code Duplication
    @patch("ospd.command.command.create_process")
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
234
    def test_scan_with_vts_and_param_with_vt_group_filter(
235
        self, mock_create_process
236
    ):
237
        daemon = DummyWrapper([])
238
        cmd = StartScan(daemon)
239
240
        # No error
241
        request = et.fromstring(
242
            '<start_scan>'
243
            '<targets>'
244
            '<target>'
245
            '<hosts>localhost</hosts>'
246
            '<ports>80, 443</ports>'
247
            '</target>'
248
            '</targets>'
249
            '<scanner_params />'
250
            '<vt_selection>'
251
            '<vt_group filter="a"/>'
252
            '</vt_selection>'
253
            '</start_scan>'
254
        )
255
        response = et.fromstring(cmd.handle_xml(request))
256
        scan_id = response.findtext('id')
257
258
        self.assertEqual(daemon.get_scan_vts(scan_id), {'vt_groups': ['a']})
259
260
        assert_called(mock_create_process)
261
262
    @patch("ospd.command.command.create_process")
263
    @patch("ospd.command.command.logger")
264
    def test_scan_ignore_multi_target(self, mock_logger, mock_create_process):
265
        daemon = DummyWrapper([])
266
        cmd = StartScan(daemon)
267
        request = et.fromstring(
268
            '<start_scan parallel="100a">'
269
            '<targets>'
270
            '<target>'
271
            '<hosts>localhosts</hosts>'
272
            '<ports>22</ports>'
273
            '</target>'
274
            '</targets>'
275
            '<scanner_params />'
276
            '</start_scan>'
277
        )
278
279
        cmd.handle_xml(request)
280
281
        assert_called(mock_logger.warning)
282
        assert_called(mock_create_process)
283
284
    @patch("ospd.command.command.create_process")
285
    @patch("ospd.command.command.logger")
286
    def test_scan_use_legacy_target_and_port(
287
        self, mock_logger, mock_create_process
288
    ):
289
        daemon = DummyWrapper([])
290
        cmd = StartScan(daemon)
291
        request = et.fromstring(
292
            '<start_scan target="localhost" ports="22">'
293
            '<scanner_params />'
294
            '</start_scan>'
295
        )
296
297
        response = et.fromstring(cmd.handle_xml(request))
298
        scan_id = response.findtext('id')
299
300
        self.assertIsNotNone(scan_id)
301
302
        self.assertEqual(daemon.get_scan_host(scan_id), 'localhost')
303
        self.assertEqual(daemon.get_scan_ports(scan_id), '22')
304
305
        assert_called(mock_logger.warning)
306
        assert_called(mock_create_process)
307
308
309
class StopCommandTestCase(TestCase):
310
    @patch("ospd.ospd.os")
311
    @patch("ospd.command.command.create_process")
312
    def test_stop_scan(self, mock_create_process, mock_os):
313
        mock_process = mock_create_process.return_value
314
        mock_process.is_alive.return_value = True
315
        mock_process.pid = "foo"
316
317
        fs = FakeStream()
318
        daemon = DummyWrapper([])
319
        request = (
320
            '<start_scan>'
321
            '<targets>'
322
            '<target>'
323
            '<hosts>localhosts</hosts>'
324
            '<ports>22</ports>'
325
            '</target>'
326
            '</targets>'
327
            '<scanner_params />'
328
            '</start_scan>'
329
        )
330
        daemon.handle_command(request, fs)
331
        response = fs.get_response()
332
333
        assert_called(mock_create_process)
334
        assert_called(mock_process.start)
335
336
        scan_id = response.findtext('id')
337
338
        request = et.fromstring('<stop_scan scan_id="%s" />' % scan_id)
339
        cmd = StopScan(daemon)
340
        cmd.handle_xml(request)
341
342
        assert_called(mock_process.terminate)
343
344
        mock_os.getpgid.assert_called_with('foo')
345
346
    def test_unknown_scan_id(self):
347
        daemon = DummyWrapper([])
348
        cmd = StopScan(daemon)
349
        request = et.fromstring('<stop_scan scan_id="foo" />')
350
351
        with self.assertRaises(OspdCommandError):
352
            cmd.handle_xml(request)
353
354
    def test_missing_scan_id(self):
355
        request = et.fromstring('<stop_scan />')
356
        cmd = StopScan(None)
357
358
        with self.assertRaises(OspdCommandError):
359
            cmd.handle_xml(request)
360
361
362
class GetMemoryUsageTestCase(TestCase):
363
    def test_with_main_process_only(self):
364
        cmd = GetMemoryUsage(None)
365
366
        request = et.fromstring('<get_memory_usage />')
367
368
        response = et.fromstring(cmd.handle_xml(request))
369
        processes_element = response.find('processes')
370
371
        process_elements = processes_element.findall('process')
372
373
        self.assertTrue(len(process_elements), 1)
374
375
        main_process_element = process_elements[0]
376
377
        rss_element = main_process_element.find('rss')
378
        vms_element = main_process_element.find('vms')
379
        shared_element = main_process_element.find('shared')
380
381
        self.assertIsNotNone(rss_element)
382
        self.assertIsNotNone(rss_element.text)
383
384
        self.assertIsNotNone(vms_element)
385
        self.assertIsNotNone(vms_element.text)
386
387
        self.assertIsNotNone(shared_element)
388
        self.assertIsNotNone(shared_element.text)
389
390 View Code Duplication
    def test_with_subprocess(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
391
        cmd = GetMemoryUsage(None)
392
393
        def foo():  # pylint: disable=blacklisted-name
394
            time.sleep(60)
395
396
        create_process(foo, args=[])
397
398
        request = et.fromstring('<get_memory_usage />')
399
400
        response = et.fromstring(cmd.handle_xml(request))
401
        processes_element = response.find('processes')
402
403
        process_elements = processes_element.findall('process')
404
405
        self.assertTrue(len(process_elements), 2)
406
407
        for process_element in process_elements:
408
            rss_element = process_element.find('rss')
409
            vms_element = process_element.find('vms')
410
            shared_element = process_element.find('shared')
411
412
            self.assertIsNotNone(rss_element)
413
            self.assertIsNotNone(rss_element.text)
414
415
            self.assertIsNotNone(vms_element)
416
            self.assertIsNotNone(vms_element.text)
417
418
            self.assertIsNotNone(shared_element)
419
            self.assertIsNotNone(shared_element.text)
420
421 View Code Duplication
    def test_with_subsubprocess(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
422
        cmd = GetMemoryUsage(None)
423
424
        def bar():  # pylint: disable=blacklisted-name
425
            create_process(foo, args=[])
426
427
        def foo():  # pylint: disable=blacklisted-name
428
            time.sleep(60)
429
430
        create_process(bar, args=[])
431
432
        request = et.fromstring('<get_memory_usage />')
433
434
        response = et.fromstring(cmd.handle_xml(request))
435
        processes_element = response.find('processes')
436
437
        process_elements = processes_element.findall('process')
438
439
        # sub-sub-processes aren't listed
440
        self.assertTrue(len(process_elements), 2)
441
442
        for process_element in process_elements:
443
            rss_element = process_element.find('rss')
444
            vms_element = process_element.find('vms')
445
            shared_element = process_element.find('shared')
446
447
            self.assertIsNotNone(rss_element)
448
            self.assertIsNotNone(rss_element.text)
449
450
            self.assertIsNotNone(vms_element)
451
            self.assertIsNotNone(vms_element.text)
452
453
            self.assertIsNotNone(shared_element)
454
            self.assertIsNotNone(shared_element.text)
455