Total Complexity | 70 |
Total Lines | 1287 |
Duplicated Lines | 12.67 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A commonly used rule is to restructure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like tests.test_scan_and_result often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | # Copyright (C) 2014-2020 Greenbone Networks GmbH |
||
2 | # |
||
3 | # SPDX-License-Identifier: AGPL-3.0-or-later |
||
4 | # |
||
5 | # This program is free software: you can redistribute it and/or modify |
||
6 | # it under the terms of the GNU Affero General Public License as |
||
7 | # published by the Free Software Foundation, either version 3 of the |
||
8 | # License, or (at your option) any later version. |
||
9 | # |
||
10 | # This program is distributed in the hope that it will be useful, |
||
11 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
||
12 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||
13 | # GNU Affero General Public License for more details. |
||
14 | # |
||
15 | # You should have received a copy of the GNU Affero General Public License |
||
16 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
||
17 | |||
18 | # pylint: disable=too-many-lines |
||
19 | |||
20 | """ Test module for scan runs |
||
21 | """ |
||
22 | |||
23 | import time |
||
24 | import unittest |
||
25 | |||
26 | from unittest.mock import patch, MagicMock, Mock |
||
27 | |||
28 | import logging |
||
29 | import xml.etree.ElementTree as ET |
||
30 | |||
31 | from defusedxml.common import EntitiesForbidden |
||
32 | |||
33 | from ospd.resultlist import ResultList |
||
34 | from ospd.errors import OspdCommandError |
||
35 | from ospd.scan import ScanStatus |
||
36 | |||
37 | from .helper import ( |
||
38 | DummyWrapper, |
||
39 | assert_called, |
||
40 | FakeStream, |
||
41 | FakeDataManager, |
||
42 | FakePsutil, |
||
43 | ) |
||
44 | |||
45 | |||
class FakeStartProcess:
    """Callable test double that records a target function and its arguments.

    Calling the instance stores ``func``, ``args`` and ``kwargs`` and hands
    back ``call_mock``; ``run`` invokes the stored function with the stored
    arguments and hands back ``run_mock``.
    """

    def __init__(self):
        self.run_mock = MagicMock()
        self.call_mock = MagicMock()

        # Populated by __call__.
        self.func = None
        self.args = None
        self.kwargs = None

    def __call__(self, func, *, args=None, kwargs=None):
        """Remember the callable and its arguments; return ``call_mock``."""
        self.func = func
        # Falsy values (None, empty containers) become fresh empty ones.
        self.args = args if args else []
        self.kwargs = kwargs if kwargs else {}
        return self.call_mock

    def run(self):
        """Invoke the remembered callable and return ``run_mock``."""
        positional, named = self.args, self.kwargs
        self.func(*positional, **named)
        return self.run_mock

    def __repr__(self):
        template = "<FakeProcess func={} args={} kwargs={}>"
        return template.format(self.func, self.args, self.kwargs)
||
69 | |||
70 | |||
class Result(object):
    """Plain attribute bag standing in for a scan result.

    ``type_`` is stored as ``result_type``. The remaining well-known fields
    start out as empty strings, and every keyword argument is then set as an
    attribute, overriding a default of the same name.
    """

    def __init__(self, type_, **kwargs):
        self.result_type = type_
        for field in (
            'host',
            'hostname',
            'name',
            'value',
            'port',
            'test_id',
            'severity',
            'qod',
            'uri',
        ):
            setattr(self, field, '')
        for key in kwargs:
            setattr(self, key, kwargs[key])
||
85 | |||
86 | |||
87 | class ScanTestCase(unittest.TestCase): |
||
def setUp(self):
    # Fresh daemon per test, wired to a fake data manager and storing
    # files under /tmp.
    daemon = DummyWrapper([])
    daemon.scan_collection.datamanager = FakeDataManager()
    daemon.scan_collection.file_storage_dir = '/tmp'
    self.daemon = daemon
||
92 | |||
def test_get_default_scanner_params(self):
    stream = FakeStream()
    self.daemon.handle_command('<get_scanner_details />', stream)
    reply = stream.get_response()

    # Required: success status (200), the correctly named root element,
    # and a nested 'scanner_params' element.
    self.assertEqual(reply.get('status'), '200')
    self.assertEqual(reply.tag, 'get_scanner_details_response')
    scanner_params = reply.find('scanner_params')
    self.assertIsNotNone(scanner_params)
||
105 | |||
def test_get_default_help(self):
    # Default help output.
    plain_stream = FakeStream()
    self.daemon.handle_command('<help />', plain_stream)
    plain_reply = plain_stream.get_response()
    self.assertEqual(plain_reply.get('status'), '200')

    # Help rendered as XML.
    xml_stream = FakeStream()
    self.daemon.handle_command('<help format="xml" />', xml_stream)
    xml_reply = xml_stream.get_response()
    self.assertEqual(xml_reply.get('status'), '200')
    self.assertEqual(xml_reply.tag, 'help_response')
||
119 | |||
def test_get_default_scanner_version(self):
    stream = FakeStream()
    self.daemon.handle_command('<get_version />', stream)
    reply = stream.get_response()

    # A successful reply must carry a 'protocol' element.
    self.assertEqual(reply.get('status'), '200')
    protocol = reply.find('protocol')
    self.assertIsNotNone(protocol)
||
127 | |||
def test_get_vts_no_vt(self):
    stream = FakeStream()
    self.daemon.handle_command('<get_vts />', stream)
    reply = stream.get_response()

    # Even without any VT added, the 'vts' container must be present.
    self.assertEqual(reply.get('status'), '200')
    vts_element = reply.find('vts')
    self.assertIsNotNone(vts_element)
||
136 | |||
def test_get_vt_xml_no_dict(self):
    # A (vt_id, None) pair must produce an element without a usable 'id'.
    vt_xml = self.daemon.get_vt_xml(('1234', None))
    self.assertFalse(vt_xml.get('id'))
||
141 | |||
def test_get_vts_single_vt(self):
    stream = FakeStream()
    self.daemon.add_vt('1.2.3.4', 'A vulnerability test')
    self.daemon.handle_command('<get_vts />', stream)
    reply = stream.get_response()

    self.assertEqual(reply.get('status'), '200')

    # The single VT added above must be listed under its id.
    vt_element = reply.find('vts').find('vt')
    self.assertIsNotNone(vt_element)
    self.assertEqual(vt_element.get('id'), '1.2.3.4')
||
155 | |||
def test_get_vts_version(self):
    stream = FakeStream()
    self.daemon.add_vt('1.2.3.4', 'A vulnerability test')
    self.daemon.set_vts_version('today')
    self.daemon.handle_command('<get_vts />', stream)
    reply = stream.get_response()

    self.assertEqual(reply.get('status'), '200')

    # The version attribute on <vts> must match what the daemon reports.
    reported_version = reply.find('vts').attrib['vts_version']
    self.assertEqual(reported_version, self.daemon.get_vts_version())

    # The VT itself is still listed.
    vt_element = reply.find('vts').find('vt')
    self.assertIsNotNone(vt_element)
    self.assertEqual(vt_element.get('id'), '1.2.3.4')
||
173 | |||
def test_get_vts_version_only(self):
    stream = FakeStream()
    self.daemon.add_vt('1.2.3.4', 'A vulnerability test')
    self.daemon.set_vts_version('today')
    self.daemon.handle_command('<get_vts version_only="1"/>', stream)
    reply = stream.get_response()

    self.assertEqual(reply.get('status'), '200')

    reported_version = reply.find('vts').attrib['vts_version']
    self.assertEqual(reported_version, self.daemon.get_vts_version())

    # With version_only="1" no VT element may be included.
    vt_element = reply.find('vts').find('vt')
    self.assertIsNone(vt_element)
||
188 | |||
def test_get_vts_still_not_init(self):
    # <get_vts/> is answered with 400 while the daemon is uninitialized.
    stream = FakeStream()
    self.daemon.initialized = False
    self.daemon.handle_command('<get_vts />', stream)

    status = stream.get_response().get('status')
    self.assertEqual(status, '400')
||
196 | |||
def test_get_help_still_not_init(self):
    # <help/> is answered with 200 even while the daemon is uninitialized.
    stream = FakeStream()
    self.daemon.initialized = False
    self.daemon.handle_command('<help/>', stream)

    status = stream.get_response().get('status')
    self.assertEqual(status, '200')
||
204 | |||
def test_get_vts_filter_positive(self):
    vt_options = {'vt_params': "a", 'vt_modification_time': '19000202'}
    self.daemon.add_vt('1.2.3.4', 'A vulnerability test', **vt_options)
    stream = FakeStream()

    command = '<get_vts filter="modification_time>19000201"></get_vts>'
    self.daemon.handle_command(command, stream)
    reply = stream.get_response()

    self.assertEqual(reply.get('status'), '200')

    # The VT passes the "greater than" filter and is listed.
    vt_element = reply.find('vts').find('vt')
    self.assertIsNotNone(vt_element)
    self.assertEqual(vt_element.get('id'), '1.2.3.4')

    mtime_elements = reply.findall('vts/vt/modification_time')
    serialized = ET.tostring(mtime_elements[0]).decode('utf-8')
    self.assertEqual(
        '<modification_time>19000202</modification_time>', serialized
    )
||
231 | |||
def test_get_vts_filter_negative(self):
    vt_options = {'vt_params': "a", 'vt_modification_time': '19000202'}
    self.daemon.add_vt('1.2.3.4', 'A vulnerability test', **vt_options)
    stream = FakeStream()

    command = '<get_vts filter="modification_time<19000203"></get_vts>'
    self.daemon.handle_command(command, stream)
    reply = stream.get_response()

    self.assertEqual(reply.get('status'), '200')

    # The VT passes the "less than" filter and is listed.
    vt_element = reply.find('vts').find('vt')
    self.assertIsNotNone(vt_element)
    self.assertEqual(vt_element.get('id'), '1.2.3.4')

    mtime_elements = reply.findall('vts/vt/modification_time')
    serialized = ET.tostring(mtime_elements[0]).decode('utf-8')
    self.assertEqual(
        '<modification_time>19000202</modification_time>', serialized
    )
||
258 | |||
def test_get_vts_bad_filter(self):
    stream = FakeStream()
    command = '<get_vts filter="modification_time"/>'

    # A filter without operator and value is rejected ...
    with self.assertRaises(OspdCommandError):
        self.daemon.handle_command(command, stream)

    # ... and the VT cache is flagged as available afterwards.
    self.assertTrue(self.daemon.vts.is_cache_available)
||
265 | |||
def test_get_vtss_multiple_vts(self):
    vt_definitions = (
        ('1.2.3.4', 'A vulnerability test'),
        ('1.2.3.5', 'Another vulnerability test'),
        ('123456789', 'Yet another vulnerability test'),
    )
    for vt_id, vt_name in vt_definitions:
        self.daemon.add_vt(vt_id, vt_name)

    stream = FakeStream()
    self.daemon.handle_command('<get_vts />', stream)
    reply = stream.get_response()
    self.assertEqual(reply.get('status'), '200')

    # At least one VT element must be listed.
    first_vt = reply.find('vts').find('vt')
    self.assertIsNotNone(first_vt)
||
279 | |||
def test_get_vts_multiple_vts_with_custom(self):
    vt_definitions = (
        ('1.2.3.4', 'A vulnerability test'),
        ('4.3.2.1', 'Another vulnerability test with custom info'),
        ('123456789', 'Yet another vulnerability test'),
    )
    for vt_id, vt_name in vt_definitions:
        self.daemon.add_vt(vt_id, vt_name, custom='b')
    stream = FakeStream()

    self.daemon.handle_command('<get_vts />', stream)
    reply = stream.get_response()

    # One 'custom' element per VT is expected.
    custom_elements = reply.findall('vts/vt/custom')
    self.assertEqual(3, len(custom_elements))
||
296 | |||
def test_get_vts_vts_with_params(self):
    vt_options = {'vt_params': "a", 'custom': "b"}
    self.daemon.add_vt('1.2.3.4', 'A vulnerability test', **vt_options)
    stream = FakeStream()

    self.daemon.handle_command(
        '<get_vts vt_id="1.2.3.4"></get_vts>', stream
    )
    reply = stream.get_response()

    # Success status, correct root element name, and a 'vts' element.
    self.assertEqual(reply.get('status'), '200')
    self.assertEqual(reply.tag, 'get_vts_response')
    self.assertIsNotNone(reply.find('vts'))

    # First child of the first child of the reply.
    first_vt = reply[0][0]
    self.assertEqual(1, len(first_vt.findall('params')))
    self.assertEqual(1, len(reply[0][0].findall('custom')))

    param_elements = reply.findall('vts/vt/params/param')
    self.assertEqual(2, len(param_elements))
||
322 | |||
def test_get_vts_vts_with_refs(self):
    vt_options = {'vt_params': "a", 'custom': "b", 'vt_refs': "c"}
    self.daemon.add_vt('1.2.3.4', 'A vulnerability test', **vt_options)
    stream = FakeStream()

    self.daemon.handle_command(
        '<get_vts vt_id="1.2.3.4"></get_vts>', stream
    )
    reply = stream.get_response()

    # Success status, correct root element name, and a 'vts' element.
    self.assertEqual(reply.get('status'), '200')
    self.assertEqual(reply.tag, 'get_vts_response')
    self.assertIsNotNone(reply.find('vts'))

    # First child of the first child of the reply.
    first_vt = reply[0][0]
    self.assertEqual(1, len(first_vt.findall('params')))
    self.assertEqual(1, len(reply[0][0].findall('custom')))

    ref_elements = reply.findall('vts/vt/refs/ref')
    self.assertEqual(2, len(ref_elements))
||
353 | |||
def test_get_vts_vts_with_dependencies(self):
    vt_options = {'vt_params': "a", 'custom': "b", 'vt_dependencies': "c"}
    self.daemon.add_vt('1.2.3.4', 'A vulnerability test', **vt_options)
    stream = FakeStream()

    self.daemon.handle_command(
        '<get_vts vt_id="1.2.3.4"></get_vts>', stream
    )
    reply = stream.get_response()

    # Two dependency entries are expected in the reply.
    dependency_elements = reply.findall('vts/vt/dependencies/dependency')
    self.assertEqual(2, len(dependency_elements))
||
370 | |||
def test_get_vts_vts_with_severities(self):
    vt_options = {'vt_params': "a", 'custom': "b", 'severities': "c"}
    self.daemon.add_vt('1.2.3.4', 'A vulnerability test', **vt_options)
    stream = FakeStream()

    self.daemon.handle_command(
        '<get_vts vt_id="1.2.3.4"></get_vts>', stream
    )
    reply = stream.get_response()

    # Exactly one severity entry is expected.
    severity_elements = reply.findall('vts/vt/severities/severity')
    self.assertEqual(1, len(severity_elements))
||
386 | |||
def test_get_vts_vts_with_detection_qodt(self):
    vt_options = {
        'vt_params': "a",
        'custom': "b",
        'detection': "c",
        'qod_t': "d",
    }
    self.daemon.add_vt('1.2.3.4', 'A vulnerability test', **vt_options)
    stream = FakeStream()

    self.daemon.handle_command(
        '<get_vts vt_id="1.2.3.4"></get_vts>', stream
    )
    reply = stream.get_response()

    # Exactly one detection element is expected.
    detection_elements = reply.findall('vts/vt/detection')
    self.assertEqual(1, len(detection_elements))
||
403 | |||
def test_get_vts_vts_with_detection_qodv(self):
    vt_options = {
        'vt_params': "a",
        'custom': "b",
        'detection': "c",
        'qod_v': "d",
    }
    self.daemon.add_vt('1.2.3.4', 'A vulnerability test', **vt_options)
    stream = FakeStream()

    self.daemon.handle_command(
        '<get_vts vt_id="1.2.3.4"></get_vts>', stream
    )
    reply = stream.get_response()

    # Exactly one detection element is expected.
    detection_elements = reply.findall('vts/vt/detection')
    self.assertEqual(1, len(detection_elements))
||
420 | |||
def test_get_vts_vts_with_summary(self):
    vt_options = {'vt_params': "a", 'custom': "b", 'summary': "c"}
    self.daemon.add_vt('1.2.3.4', 'A vulnerability test', **vt_options)
    stream = FakeStream()

    self.daemon.handle_command(
        '<get_vts vt_id="1.2.3.4"></get_vts>', stream
    )
    reply = stream.get_response()

    # Exactly one summary element is expected.
    summary_elements = reply.findall('vts/vt/summary')
    self.assertEqual(1, len(summary_elements))
||
436 | |||
def test_get_vts_vts_with_impact(self):
    vt_options = {'vt_params': "a", 'custom': "b", 'impact': "c"}
    self.daemon.add_vt('1.2.3.4', 'A vulnerability test', **vt_options)
    stream = FakeStream()

    self.daemon.handle_command(
        '<get_vts vt_id="1.2.3.4"></get_vts>', stream
    )
    reply = stream.get_response()

    # Exactly one impact element is expected.
    impact_elements = reply.findall('vts/vt/impact')
    self.assertEqual(1, len(impact_elements))
||
452 | |||
def test_get_vts_vts_with_affected(self):
    vt_options = {'vt_params': "a", 'custom': "b", 'affected': "c"}
    self.daemon.add_vt('1.2.3.4', 'A vulnerability test', **vt_options)
    stream = FakeStream()

    self.daemon.handle_command(
        '<get_vts vt_id="1.2.3.4"></get_vts>', stream
    )
    reply = stream.get_response()

    # Exactly one affected element is expected.
    affected_elements = reply.findall('vts/vt/affected')
    self.assertEqual(1, len(affected_elements))
||
468 | |||
def test_get_vts_vts_with_insight(self):
    vt_options = {'vt_params': "a", 'custom': "b", 'insight': "c"}
    self.daemon.add_vt('1.2.3.4', 'A vulnerability test', **vt_options)
    stream = FakeStream()

    self.daemon.handle_command(
        '<get_vts vt_id="1.2.3.4"></get_vts>', stream
    )
    reply = stream.get_response()

    # Exactly one insight element is expected.
    insight_elements = reply.findall('vts/vt/insight')
    self.assertEqual(1, len(insight_elements))
||
484 | |||
def test_get_vts_vts_with_solution(self):
    vt_options = {
        'vt_params': "a",
        'custom': "b",
        'solution': "c",
        'solution_t': "d",
        'solution_m': "e",
    }
    self.daemon.add_vt('1.2.3.4', 'A vulnerability test', **vt_options)
    stream = FakeStream()

    self.daemon.handle_command(
        '<get_vts vt_id="1.2.3.4"></get_vts>', stream
    )
    reply = stream.get_response()

    # Exactly one solution element is expected.
    solution_elements = reply.findall('vts/vt/solution')
    self.assertEqual(1, len(solution_elements))
||
502 | |||
def test_get_vts_vts_with_ctime(self):
    vt_options = {'vt_params': "a", 'vt_creation_time': '01-01-1900'}
    self.daemon.add_vt('1.2.3.4', 'A vulnerability test', **vt_options)
    stream = FakeStream()

    self.daemon.handle_command(
        '<get_vts vt_id="1.2.3.4"></get_vts>', stream
    )
    reply = stream.get_response()

    # The creation time must be echoed back verbatim.
    ctime_elements = reply.findall('vts/vt/creation_time')
    serialized = ET.tostring(ctime_elements[0]).decode('utf-8')
    self.assertEqual(
        '<creation_time>01-01-1900</creation_time>', serialized
    )
||
520 | |||
def test_get_vts_vts_with_mtime(self):
    vt_options = {'vt_params': "a", 'vt_modification_time': '02-01-1900'}
    self.daemon.add_vt('1.2.3.4', 'A vulnerability test', **vt_options)
    stream = FakeStream()

    self.daemon.handle_command(
        '<get_vts vt_id="1.2.3.4"></get_vts>', stream
    )
    reply = stream.get_response()

    # The modification time must be echoed back verbatim.
    mtime_elements = reply.findall('vts/vt/modification_time')
    serialized = ET.tostring(mtime_elements[0]).decode('utf-8')
    self.assertEqual(
        '<modification_time>02-01-1900</modification_time>', serialized
    )
||
538 | |||
def test_clean_forgotten_scans(self):
    def stored_scan_count():
        # Number of scan ids currently held by the scan collection.
        return len(list(self.daemon.scan_collection.ids_iterator()))

    start_stream = FakeStream()
    self.daemon.handle_command(
        '<start_scan target="localhost" ports="80, '
        '443"><scanner_params /></start_scan>',
        start_stream,
    )
    scan_id = start_stream.get_response().findtext('id')

    self.daemon.start_queued_scans()

    # Poll until the scan reports an end time other than '0'.
    while True:
        poll_stream = FakeStream()
        self.daemon.handle_command(
            '<get_scans scan_id="%s" details="1"/>' % scan_id, poll_stream
        )
        scans = poll_stream.get_response().findall('scan')
        self.assertEqual(1, len(scans))

        if scans[0].get('end_time') != '0':
            break
        time.sleep(0.01)

    final_stream = FakeStream()
    self.daemon.handle_command(
        '<get_scans scan_id="%s" details="1"/>' % scan_id, final_stream
    )
    final_stream.get_response()

    self.assertEqual(stored_scan_count(), 1)

    # An old end_time alone is not enough for the scan to be removed.
    self.daemon.scan_collection.scans_table[scan_id]['end_time'] = 123456
    self.daemon.clean_forgotten_scans()
    self.assertEqual(stored_scan_count(), 1)

    # Once the maximum store time is set, the next check removes it.
    self.daemon.scaninfo_store_time = 1
    self.daemon.clean_forgotten_scans()
    self.assertEqual(stored_scan_count(), 0)
||
597 | |||
def test_scan_with_error(self):
    start_stream = FakeStream()
    self.daemon.handle_command(
        '<start_scan target="localhost" ports="80, '
        '443"><scanner_params /></start_scan>',
        start_stream,
    )
    scan_id = start_stream.get_response().findtext('id')

    self.daemon.start_queued_scans()
    self.daemon.add_scan_error(
        scan_id, host='a', value='something went wrong'
    )

    # Poll until the scan has left the "init"/"running" states.
    while True:
        poll_stream = FakeStream()
        self.daemon.handle_command(
            '<get_scans scan_id="%s" details="1"/>' % scan_id, poll_stream
        )
        scans = poll_stream.get_response().findall('scan')
        self.assertEqual(1, len(scans))

        current = scans[0]
        if current.get('status') not in ("init", "running"):
            break
        # An unfinished scan must not report an end time yet.
        self.assertEqual('0', current.get('end_time'))
        time.sleep(0.010)

    result_stream = FakeStream()
    self.daemon.handle_command(
        '<get_scans scan_id="%s" details="1"/>' % scan_id, result_stream
    )
    result_reply = result_stream.get_response()
    self.assertEqual(
        result_reply.findtext('scan/results/result'),
        'something went wrong',
    )

    delete_stream = FakeStream()
    self.daemon.handle_command(
        '<delete_scan scan_id="%s" />' % scan_id, delete_stream
    )
    delete_reply = delete_stream.get_response()
    self.assertEqual(delete_reply.get('status'), '200')
||
649 | |||
def test_get_scan_pop(self):
    result_path = 'scan/results/result'

    start_stream = FakeStream()
    self.daemon.handle_command(
        '<start_scan target="localhost" ports="80, 443">'
        '<scanner_params /></start_scan>',
        start_stream,
    )
    self.daemon.start_queued_scans()
    scan_id = start_stream.get_response().findtext('id')

    self.daemon.add_scan_host_detail(
        scan_id, host='a', value='Some Host Detail'
    )

    time.sleep(1)

    # A plain get_scans shows the result ...
    peek_stream = FakeStream()
    self.daemon.handle_command(
        '<get_scans scan_id="%s"/>' % scan_id, peek_stream
    )
    peek_reply = peek_stream.get_response()
    self.assertEqual(peek_reply.findtext(result_path), 'Some Host Detail')

    # ... a popping request still returns it ...
    pop_stream = FakeStream()
    self.daemon.handle_command(
        '<get_scans scan_id="%s" pop_results="1"/>' % scan_id, pop_stream
    )
    pop_reply = pop_stream.get_response()
    self.assertEqual(pop_reply.findtext(result_path), 'Some Host Detail')

    # ... and a second popping request, with details="0", finds none.
    empty_stream = FakeStream()
    self.daemon.handle_command(
        '<get_scans scan_id="%s" details="0" pop_results="1"/>' % scan_id,
        empty_stream,
    )
    empty_reply = empty_stream.get_response()
    self.assertIsNone(empty_reply.findtext(result_path))
||
693 | |||
def test_get_scan_pop_max_res(self):
    start_stream = FakeStream()
    self.daemon.handle_command(
        '<start_scan target="localhost" ports="80, 443">'
        '<scanner_params /></start_scan>',
        start_stream,
    )
    self.daemon.start_queued_scans()
    scan_id = start_stream.get_response().findtext('id')

    for label in ('a', 'c', 'b'):
        self.daemon.add_scan_log(scan_id, host=label, name=label)

    # Popping with max_results="1" returns a single result ...
    limited_stream = FakeStream()
    self.daemon.handle_command(
        '<get_scans scan_id="%s" pop_results="1" max_results="1"/>'
        % scan_id,
        limited_stream,
    )
    limited_results = limited_stream.get_response().findall(
        'scan/results/result'
    )
    self.assertEqual(len(limited_results), 1)

    # ... and the next unrestricted pop returns the two that remain.
    rest_stream = FakeStream()
    self.daemon.handle_command(
        '<get_scans scan_id="%s" pop_results="1"/>' % scan_id, rest_stream
    )
    remaining_results = rest_stream.get_response().findall(
        'scan/results/result'
    )
    self.assertEqual(len(remaining_results), 2)
||
726 | |||
def test_get_scan_results_clean(self):
    start_stream = FakeStream()
    self.daemon.handle_command(
        '<start_scan target="localhost" ports="80, 443">'
        '<scanner_params /></start_scan>',
        start_stream,
    )
    self.daemon.start_queued_scans()
    scan_id = start_stream.get_response().findtext('id')

    for label in ('a', 'c', 'b'):
        self.daemon.add_scan_log(scan_id, host=label, name=label)

    pop_stream = FakeStream()
    self.daemon.handle_command(
        '<get_scans scan_id="%s" pop_results="1"/>' % scan_id, pop_stream
    )

    # After the pop, both the result list and the temporary list are
    # empty.
    stored = self.daemon.scan_collection.scans_table[scan_id]['results']
    self.assertEqual(len(stored), 0)

    pending = self.daemon.scan_collection.scans_table[scan_id][
        'temp_results'
    ]
    self.assertEqual(len(pending), 0)
||
756 | |||
def test_get_scan_results_restore(self):
    start_stream = FakeStream()
    self.daemon.handle_command(
        '<start_scan target="localhost" ports="80, 443">'
        '<scanner_params /></start_scan>',
        start_stream,
    )
    self.daemon.start_queued_scans()
    scan_id = start_stream.get_response().findtext('id')

    for label in ('a', 'c', 'b'):
        self.daemon.add_scan_log(scan_id, host=label, name=label)

    # This stream is built with return_value=False.
    failing_stream = FakeStream(return_value=False)
    self.daemon.handle_command(
        '<get_scans scan_id="%s" pop_results="1"/>' % scan_id,
        failing_stream,
    )

    # All three results are still stored and the temporary list is empty.
    stored = self.daemon.scan_collection.scans_table[scan_id]['results']
    self.assertEqual(len(stored), 3)

    pending = self.daemon.scan_collection.scans_table[scan_id][
        'temp_results'
    ]
    self.assertEqual(len(pending), 0)
||
786 | |||
def test_billon_laughs(self):
    # pylint: disable=line-too-long

    # Document declaring nested entities; the command handler is expected
    # to refuse it with EntitiesForbidden.
    payload_parts = (
        '<?xml version="1.0"?>',
        '<!DOCTYPE lolz [',
        ' <!ENTITY lol "lol">',
        ' <!ELEMENT lolz (#PCDATA)>',
        ' <!ENTITY lol1 "&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;">',
        ' <!ENTITY lol2 "&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;">',
        ' <!ENTITY lol3 "&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;">',
        ' <!ENTITY lol4 "&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;">',
        ' <!ENTITY lol5 "&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;">',
        ' <!ENTITY lol6 "&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;">',
        ' <!ENTITY lol7 "&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;">',
        ' <!ENTITY lol8 "&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;">',
        ' <!ENTITY lol9 "&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;">',
        ']>',
    )
    payload = ''.join(payload_parts)

    stream = FakeStream()
    with self.assertRaises(EntitiesForbidden):
        self.daemon.handle_command(payload, stream)
||
810 | |||
def test_target_with_credentials(self):
    start_command = (
        '<start_scan>'
        '<scanner_params /><vts><vt id="1.2.3.4" />'
        '</vts>'
        '<targets><target>'
        '<hosts>192.168.0.0/24</hosts><ports>22'
        '</ports><credentials>'
        '<credential type="up" service="ssh" port="22">'
        '<username>scanuser</username>'
        '<password>mypass</password>'
        '</credential><credential type="up" service="smb">'
        '<username>smbuser</username>'
        '<password>mypass</password></credential>'
        '</credentials>'
        '</target></targets>'
        '</start_scan>'
    )
    stream = FakeStream()
    self.daemon.handle_command(start_command, stream)
    self.daemon.start_queued_scans()
    reply = stream.get_response()

    self.assertEqual(reply.get('status'), '200')

    # Credentials are expected keyed by service name.
    expected_credentials = {
        'ssh': {
            'type': 'up',
            'password': 'mypass',
            'port': '22',
            'username': 'scanuser',
        },
        'smb': {
            'type': 'up',
            'password': 'mypass',
            'username': 'smbuser',
        },
    }
    scan_id = reply.findtext('id')
    stored_credentials = self.daemon.get_scan_credentials(scan_id)
    self.assertEqual(stored_credentials, expected_credentials)
||
848 | |||
def test_scan_get_target(self):
    start_command = (
        '<start_scan>'
        '<scanner_params /><vts><vt id="1.2.3.4" />'
        '</vts>'
        '<targets><target>'
        '<hosts>localhosts,192.168.0.0/24</hosts>'
        '<ports>80,443</ports>'
        '</target></targets>'
        '</start_scan>'
    )
    start_stream = FakeStream()
    self.daemon.handle_command(start_command, start_stream)
    self.daemon.start_queued_scans()

    scan_id = start_stream.get_response().findtext('id')

    query_stream = FakeStream()
    self.daemon.handle_command(
        '<get_scans scan_id="%s"/>' % scan_id, query_stream
    )
    scan_element = query_stream.get_response().find('scan')

    # The hosts string is reported back unchanged as the scan target.
    self.assertEqual(
        scan_element.get('target'), 'localhosts,192.168.0.0/24'
    )
||
873 | |||
def test_scan_get_target_options(self):
    start_command = (
        '<start_scan>'
        '<scanner_params /><vts><vt id="1.2.3.4" />'
        '</vts>'
        '<targets>'
        '<target><hosts>192.168.0.1</hosts>'
        '<ports>22</ports><alive_test>0</alive_test></target>'
        '</targets>'
        '</start_scan>'
    )
    stream = FakeStream()
    self.daemon.handle_command(start_command, stream)
    self.daemon.start_queued_scans()

    scan_id = stream.get_response().findtext('id')
    time.sleep(1)

    # The alive_test element must show up in the target options.
    stored_options = self.daemon.get_scan_target_options(scan_id)
    self.assertEqual(stored_options, {'alive_test': '0'})
||
895 | |||
def test_progress(self):
    """Check the target progress computed from two per-host values.

    With host progress values of 75 and 25 on a two-host target,
    ``calculate_target_progress`` is expected to return 50.
    """
    start_stream = FakeStream()
    start_cmd = (
        '<start_scan parallel="2">'
        '<scanner_params />'
        '<targets><target>'
        '<hosts>localhost1, localhost2</hosts>'
        '<ports>22</ports>'
        '</target></targets>'
        '</start_scan>'
    )
    self.daemon.handle_command(start_cmd, start_stream)
    self.daemon.start_queued_scans()

    scan_id = start_stream.get_response().findtext('id')

    for host, host_progress in (('localhost1', 75), ('localhost2', 25)):
        self.daemon.set_scan_host_progress(scan_id, host, host_progress)

    collection = self.daemon.scan_collection
    target_progress = collection.calculate_target_progress(scan_id)
    self.assertEqual(target_progress, 50)
||
919 | |||
@patch('ospd.ospd.os')
def test_interrupted_scan(self, mock_os):
    """Check that a scan is eventually reported as interrupted.

    After starting a four-host scan and setting partial progress for two
    hosts, get_scans is polled for as long as the daemon reports the scan
    status as INIT. The status attribute from the last poll must be the
    lower-cased name of ``ScanStatus.INTERRUPTED``.
    """
    mock_os.setsid.return_value = None

    start_stream = FakeStream()
    start_cmd = (
        '<start_scan parallel="2">'
        '<scanner_params />'
        '<targets><target>'
        '<hosts>localhost1, localhost2, localhost3, localhost4</hosts>'
        '<ports>22</ports>'
        '</target></targets>'
        '</start_scan>'
    )
    self.daemon.handle_command(start_cmd, start_stream)
    self.daemon.start_queued_scans()

    scan_id = start_stream.get_response().findtext('id')

    self.daemon.exec_scan = Mock(return_value=None)
    self.daemon.set_scan_host_progress(scan_id, 'localhost1', 5)
    self.daemon.set_scan_host_progress(scan_id, 'localhost2', 14)

    poll_cmd = '<get_scans scan_id="%s" details="0" progress="0"/>' % scan_id

    # NOTE(review): ``status`` is only bound inside the loop, so this test
    # relies on the scan being in INIT state at least once here.
    while self.daemon.get_scan_status(scan_id) == ScanStatus.INIT:
        poll_stream = FakeStream()
        self.daemon.handle_command(poll_cmd, poll_stream)
        poll_response = poll_stream.get_response()
        status = poll_response.find('scan').attrib['status']

    expected_status = ScanStatus.INTERRUPTED.name.lower()
    self.assertEqual(status, expected_status)
||
952 | |||
def test_sort_host_finished(self):
    """Check target progress after two hosts are sorted as finished.

    Progress values -1, 75, 100 and 25 are set for four hosts, then
    localhost3 and localhost4 are passed to ``sort_host_finished``.
    ``calculate_target_progress`` is expected to return 66 afterwards.
    """
    start_stream = FakeStream()
    start_cmd = (
        '<start_scan parallel="2">'
        '<scanner_params />'
        '<targets><target>'
        '<hosts>localhost1, localhost2, localhost3, localhost4</hosts>'
        '<ports>22</ports>'
        '</target></targets>'
        '</start_scan>'
    )
    self.daemon.handle_command(start_cmd, start_stream)
    self.daemon.start_queued_scans()

    scan_id = start_stream.get_response().findtext('id')

    # Same call order as the progress updates are meant to arrive in.
    host_updates = (
        ('localhost3', -1),
        ('localhost1', 75),
        ('localhost4', 100),
        ('localhost2', 25),
    )
    for host, host_progress in host_updates:
        self.daemon.set_scan_host_progress(scan_id, host, host_progress)

    self.daemon.sort_host_finished(scan_id, ['localhost3', 'localhost4'])

    collection = self.daemon.scan_collection
    rounded_progress = collection.calculate_target_progress(scan_id)
    self.assertEqual(rounded_progress, 66)
||
982 | |||
def test_calculate_progress_without_current_hosts(self):
    """Check progress calculation when only finished hosts have values.

    ``set_scan_host_progress`` is first called with the scan id alone,
    then for localhost3 (-1) and localhost4 (100), which are afterwards
    sorted as finished. The calculated progress must truncate to 33, and
    after storing it with ``set_progress`` the daemon must report 33.
    """
    start_stream = FakeStream()
    start_cmd = (
        '<start_scan parallel="2">'
        '<scanner_params />'
        '<targets><target>'
        '<hosts>localhost1, localhost2, localhost3, localhost4</hosts>'
        '<ports>22</ports>'
        '</target></targets>'
        '</start_scan>'
    )
    self.daemon.handle_command(start_cmd, start_stream)
    self.daemon.start_queued_scans()

    scan_id = start_stream.get_response().findtext('id')

    # Call without host and progress arguments on purpose.
    self.daemon.set_scan_host_progress(scan_id)
    self.daemon.set_scan_host_progress(scan_id, 'localhost3', -1)
    self.daemon.set_scan_host_progress(scan_id, 'localhost4', 100)

    finished_hosts = ['localhost3', 'localhost4']
    self.daemon.sort_host_finished(scan_id, finished_hosts)

    collection = self.daemon.scan_collection
    float_progress = collection.calculate_target_progress(scan_id)
    self.assertEqual(int(float_progress), 33)

    collection.set_progress(scan_id, float_progress)
    stored_progress = self.daemon.get_scan_progress(scan_id)
    self.assertEqual(stored_progress, 33)
||
1014 | |||
def test_get_scan_without_scanid(self):
    """Check that get_scans without a scan_id attribute is rejected.

    After a scan has been queued and started, a get_scans command that
    omits ``scan_id`` must raise ``OspdCommandError``.
    """
    start_stream = FakeStream()
    start_cmd = (
        '<start_scan parallel="2">'
        '<scanner_params />'
        '<targets><target>'
        '<hosts>localhost1, localhost2, localhost3, localhost4</hosts>'
        '<ports>22</ports>'
        '</target></targets>'
        '</start_scan>'
    )
    self.daemon.handle_command(start_cmd, start_stream)
    self.daemon.start_queued_scans()

    query_stream = FakeStream()
    with self.assertRaises(OspdCommandError):
        self.daemon.handle_command(
            '<get_scans details="0" progress="1"/>', query_stream
        )
||
1037 | |||
def test_get_scan_progress_xml(self):
    """Check the ``progress`` element returned by get_scans.

    Two of four hosts are marked finished (progress -1 and 100) and the
    other two get progress 75 and 25. With ``progress="1"`` the response
    must contain an overall value truncating to 66, ``count_alive`` and
    ``count_dead`` of '1', two ``host`` children and ``count_excluded``
    of '0'.
    """
    start_stream = FakeStream()
    start_cmd = (
        '<start_scan parallel="2">'
        '<scanner_params />'
        '<targets><target>'
        '<hosts>localhost1, localhost2, localhost3, localhost4</hosts>'
        '<ports>22</ports>'
        '</target></targets>'
        '</start_scan>'
    )
    self.daemon.handle_command(start_cmd, start_stream)
    self.daemon.start_queued_scans()

    scan_id = start_stream.get_response().findtext('id')

    # Hosts that are done: one with -1, one with 100.
    self.daemon.set_scan_host_progress(scan_id, 'localhost3', -1)
    self.daemon.set_scan_host_progress(scan_id, 'localhost4', 100)
    self.daemon.sort_host_finished(scan_id, ['localhost3', 'localhost4'])

    # Hosts that still carry an intermediate progress value.
    self.daemon.set_scan_host_progress(scan_id, 'localhost1', 75)
    self.daemon.set_scan_host_progress(scan_id, 'localhost2', 25)

    query_stream = FakeStream()
    query_cmd = (
        '<get_scans scan_id="{}" details="0" progress="1"/>'.format(scan_id)
    )
    self.daemon.handle_command(query_cmd, query_stream)

    progress = query_stream.get_response().find('scan/progress')

    overall = float(progress.findtext('overall'))
    self.assertEqual(int(overall), 66)

    self.assertEqual(progress.findtext('count_alive'), '1')
    self.assertEqual(progress.findtext('count_dead'), '1')

    current_hosts = progress.findall('host')
    self.assertEqual(len(current_hosts), 2)

    self.assertEqual(progress.findtext('count_excluded'), '0')
||
1085 | |||
def test_set_get_vts_version(self):
    """Check that a VTs version set on the daemon is read back unchanged."""
    expected_version = '1234'
    self.daemon.set_vts_version(expected_version)

    self.assertEqual(expected_version, self.daemon.get_vts_version())
||
1091 | |||
def test_set_get_vts_version_error(self):
    """Check that set_vts_version without an argument raises TypeError."""
    with self.assertRaises(TypeError):
        self.daemon.set_vts_version()
||
1094 | |||
@patch("ospd.ospd.os")
@patch("ospd.ospd.create_process")
def test_scan_exists(self, mock_create_process, _mock_os):
    """Check the response when start_scan reuses an existing scan id.

    A first scan is started through a faked process and answered with
    status text 'OK'. After a stop_scan for that id, a second start_scan
    carrying the same ``scan_id`` must be answered with 'Continue'.
    """
    fake_start = FakeStartProcess()
    mock_create_process.side_effect = fake_start
    mock_process = fake_start.call_mock
    mock_process.start.side_effect = fake_start.run
    mock_process.is_alive.return_value = True
    mock_process.pid = "main-scan-process"

    first_stream = FakeStream()
    first_cmd = (
        '<start_scan>'
        '<scanner_params />'
        '<targets><target>'
        '<hosts>localhost</hosts>'
        '<ports>22</ports>'
        '</target></targets>'
        '</start_scan>'
    )
    self.daemon.handle_command(first_cmd, first_stream)

    first_response = first_stream.get_response()
    scan_id = first_response.findtext('id')
    self.assertIsNotNone(scan_id)
    self.assertEqual(first_response.get('status_text'), 'OK')

    self.daemon.start_queued_scans()

    assert_called(mock_create_process)
    assert_called(mock_process.start)

    # The stop_scan reply goes to the already-used stream, as before.
    stop_cmd = '<stop_scan scan_id="%s" />' % scan_id
    self.daemon.handle_command(stop_cmd, first_stream)

    second_stream = FakeStream()
    second_cmd = (
        '<start_scan scan_id="{}">'
        '<scanner_params />'
        '<targets><target>'
        '<hosts>localhost</hosts>'
        '<ports>22</ports>'
        '</target></targets>'
        '</start_scan>'
    ).format(scan_id)

    self.daemon.handle_command(second_cmd, second_stream)
    self.daemon.start_queued_scans()

    second_status = second_stream.get_response().get('status_text')
    self.assertEqual(second_status, 'Continue')
||
1149 | |||
def test_result_order(self):
    """Check that get_scans returns results in insertion order.

    Three log results named 'a', 'c' and 'b' are added one by one with
    ``add_scan_log``; each result element returned with ``details="1"``
    must carry the name found at the same position in that sequence.
    """
    start_stream = FakeStream()
    start_cmd = (
        '<start_scan parallel="1">'
        '<scanner_params />'
        '<targets><target>'
        '<hosts>a</hosts>'
        '<ports>22</ports>'
        '</target></targets>'
        '</start_scan>'
    )
    self.daemon.handle_command(start_cmd, start_stream)
    self.daemon.start_queued_scans()

    scan_id = start_stream.get_response().findtext('id')

    self.daemon.add_scan_log(scan_id, host='a', name='a')
    self.daemon.add_scan_log(scan_id, host='c', name='c')
    self.daemon.add_scan_log(scan_id, host='b', name='b')
    expected_names = ['a', 'c', 'b']

    query_stream = FakeStream()
    query_cmd = '<get_scans scan_id="{}" details="1"/>'.format(scan_id)
    self.daemon.handle_command(query_cmd, query_stream)

    results = query_stream.get_response().findall("scan/results/")

    for position, result in enumerate(results):
        self.assertEqual(expected_names[position], result.attrib['name'])
||
1184 | |||
def test_batch_result(self):
    """Check result order when results are added as one batch.

    Three log entries named 'a', 'c' and 'b' are collected in a
    ``ResultList`` and handed to the scan collection in a single
    ``add_result_list`` call; get_scans with ``details="1"`` must return
    them in that same order.
    """
    batch = ResultList()

    start_stream = FakeStream()
    start_cmd = (
        '<start_scan parallel="1">'
        '<scanner_params />'
        '<targets><target>'
        '<hosts>a</hosts>'
        '<ports>22</ports>'
        '</target></targets>'
        '</start_scan>'
    )
    self.daemon.handle_command(start_cmd, start_stream)
    self.daemon.start_queued_scans()

    scan_id = start_stream.get_response().findtext('id')

    batch.add_scan_log_to_list(host='a', name='a')
    batch.add_scan_log_to_list(host='c', name='c')
    batch.add_scan_log_to_list(host='b', name='b')
    self.daemon.scan_collection.add_result_list(scan_id, batch)

    expected_names = ['a', 'c', 'b']

    query_stream = FakeStream()
    query_cmd = '<get_scans scan_id="{}" details="1"/>'.format(scan_id)
    self.daemon.handle_command(query_cmd, query_stream)

    results = query_stream.get_response().findall("scan/results/")

    for position, result in enumerate(results):
        self.assertEqual(expected_names[position], result.attrib['name'])
||
1220 | |||
def test_is_new_scan_allowed_false(self):
    """Check that no new scan is allowed with two scans and max_scans 1."""
    running = {'a': 1, 'b': 2}
    self.daemon.scan_processes = running
    self.daemon.max_scans = 1

    allowed = self.daemon.is_new_scan_allowed()
    self.assertFalse(allowed)
||
1229 | |||
def test_is_new_scan_allowed_true(self):
    """Check that a new scan is allowed with two scans and max_scans 3."""
    running = {'a': 1, 'b': 2}
    self.daemon.scan_processes = running
    self.daemon.max_scans = 3

    allowed = self.daemon.is_new_scan_allowed()
    self.assertTrue(allowed)
||
1238 | |||
def test_start_queue_scan_daemon_not_init(self):
    """Check the log message emitted when the daemon is not initialized.

    With ten scans reported as queued and ``initialized`` set to False,
    ``start_queued_scans`` must log (at info level) that the queued task
    cannot be started because a feed update is being performed.
    """
    self.daemon.get_count_queued_scans = MagicMock(return_value=10)
    self.daemon.initialized = False

    # Replace Logger.info only for the duration of the call. Assigning a
    # Mock directly to logging.Logger.info would never be undone and would
    # silence/alter info logging for every test that runs afterwards.
    with patch.object(logging.Logger, 'info') as mock_info:
        self.daemon.start_queued_scans()

    mock_info.assert_called_with(
        "Queued task can not be started because a "
        "feed update is being performed."
    )
||
1249 | |||
@patch("ospd.ospd.psutil")
def test_free_memory_true(self, mock_psutil):
    """Check that 1.5 GB available satisfies a minimum setting of 1000.

    NOTE(review): ``min_free_mem_scan_queue`` is presumably expressed in
    MB -- confirm against the daemon implementation.
    """
    self.daemon.min_free_mem_scan_queue = 1000

    # 1.5 GB free
    fake_memory = FakePsutil(available=1500000000)
    mock_psutil.virtual_memory.return_value = fake_memory

    enough = self.daemon.is_enough_free_memory()
    self.assertTrue(enough)
||
1259 | |||
@patch("ospd.ospd.psutil")
def test_free_memory_false(self, mock_psutil):
    """Check that 1.5 GB available fails a minimum setting of 2000.

    NOTE(review): ``min_free_mem_scan_queue`` is presumably expressed in
    MB -- confirm against the daemon implementation.
    """
    self.daemon.min_free_mem_scan_queue = 2000

    # 1.5 GB free
    fake_memory = FakePsutil(available=1500000000)
    mock_psutil.virtual_memory.return_value = fake_memory

    enough = self.daemon.is_enough_free_memory()
    self.assertFalse(enough)
||
1269 | |||
def test_count_queued_scans(self):
    """Check the queued-scan counter before and after starting the queue.

    One start_scan command must leave exactly one queued scan; after
    ``start_queued_scans`` the counter must be back at zero.
    """
    start_stream = FakeStream()
    start_cmd = (
        '<start_scan>'
        '<scanner_params /><vts><vt id="1.2.3.4" />'
        '</vts>'
        '<targets><target>'
        '<hosts>localhosts,192.168.0.0/24</hosts>'
        '<ports>80,443</ports>'
        '</target></targets>'
        '</start_scan>'
    )
    self.daemon.handle_command(start_cmd, start_stream)

    queued_before = self.daemon.get_count_queued_scans()
    self.assertEqual(queued_before, 1)

    self.daemon.start_queued_scans()

    queued_after = self.daemon.get_count_queued_scans()
    self.assertEqual(queued_after, 0)
||
1287 |