| Total Complexity | 41 |
| Total Lines | 737 |
| Duplicated Lines | 6.38 % |
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like tests.test_daemon often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | # -*- coding: utf-8 -*- |
||
| 2 | # Copyright (C) 2014-2020 Greenbone Networks GmbH |
||
| 3 | # |
||
| 4 | # SPDX-License-Identifier: AGPL-3.0-or-later |
||
| 5 | # |
||
| 6 | # This program is free software: you can redistribute it and/or modify |
||
| 7 | # it under the terms of the GNU Affero General Public License as |
||
| 8 | # published by the Free Software Foundation, either version 3 of the |
||
| 9 | # License, or (at your option) any later version. |
||
| 10 | # |
||
| 11 | # This program is distributed in the hope that it will be useful, |
||
| 12 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
||
| 13 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||
| 14 | # GNU Affero General Public License for more details. |
||
| 15 | # |
||
| 16 | # You should have received a copy of the GNU Affero General Public License |
||
| 17 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
||
| 18 | |||
| 19 | |||
| 20 | # pylint: disable=invalid-name,line-too-long,no-value-for-parameter |
||
| 21 | |||
| 22 | """ Unit Test for ospd-openvas """ |
||
| 23 | |||
| 24 | import io |
||
| 25 | import logging |
||
| 26 | |||
| 27 | from unittest import TestCase |
||
| 28 | from unittest.mock import patch, Mock, MagicMock |
||
| 29 | |||
| 30 | from ospd.vts import Vts |
||
| 31 | from ospd.protocol import OspRequest |
||
| 32 | |||
| 33 | from tests.dummydaemon import DummyDaemon |
||
| 34 | from tests.helper import assert_called_once |
||
| 35 | |||
| 36 | from ospd_openvas.daemon import OSPD_PARAMS, OpenVasVtsFilter |
||
| 37 | from ospd_openvas.openvas import Openvas |
||
| 38 | |||
| 39 | OSPD_PARAMS_OUT = { |
||
| 40 | 'auto_enable_dependencies': { |
||
| 41 | 'type': 'boolean', |
||
| 42 | 'name': 'auto_enable_dependencies', |
||
| 43 | 'default': 1, |
||
| 44 | 'mandatory': 1, |
||
| 45 | 'description': 'Automatically enable the plugins that are depended on', |
||
| 46 | }, |
||
| 47 | 'cgi_path': { |
||
| 48 | 'type': 'string', |
||
| 49 | 'name': 'cgi_path', |
||
| 50 | 'default': '/cgi-bin:/scripts', |
||
| 51 | 'mandatory': 1, |
||
| 52 | 'description': 'Look for default CGIs in /cgi-bin and /scripts', |
||
| 53 | }, |
||
| 54 | 'checks_read_timeout': { |
||
| 55 | 'type': 'integer', |
||
| 56 | 'name': 'checks_read_timeout', |
||
| 57 | 'default': 5, |
||
| 58 | 'mandatory': 1, |
||
| 59 | 'description': 'Number of seconds that the security checks will ' |
||
| 60 | 'wait for when doing a recv()', |
||
| 61 | }, |
||
| 62 | 'drop_privileges': { |
||
| 63 | 'type': 'boolean', |
||
| 64 | 'name': 'drop_privileges', |
||
| 65 | 'default': 0, |
||
| 66 | 'mandatory': 1, |
||
| 67 | 'description': '', |
||
| 68 | }, |
||
| 69 | 'non_simult_ports': { |
||
| 70 | 'type': 'string', |
||
| 71 | 'name': 'non_simult_ports', |
||
| 72 | 'default': '22', |
||
| 73 | 'mandatory': 1, |
||
| 74 | 'description': 'Prevent to make two connections on the same given ' |
||
| 75 | 'ports at the same time.', |
||
| 76 | }, |
||
| 77 | 'open_sock_max_attempts': { |
||
| 78 | 'type': 'integer', |
||
| 79 | 'name': 'open_sock_max_attempts', |
||
| 80 | 'default': 5, |
||
| 81 | 'mandatory': 0, |
||
| 82 | 'description': 'Number of unsuccessful retries to open the socket ' |
||
| 83 | 'before to set the port as closed.', |
||
| 84 | }, |
||
| 85 | 'timeout_retry': { |
||
| 86 | 'type': 'integer', |
||
| 87 | 'name': 'timeout_retry', |
||
| 88 | 'default': 5, |
||
| 89 | 'mandatory': 0, |
||
| 90 | 'description': 'Number of retries when a socket connection attempt ' |
||
| 91 | 'timesout.', |
||
| 92 | }, |
||
| 93 | 'optimize_test': { |
||
| 94 | 'type': 'integer', |
||
| 95 | 'name': 'optimize_test', |
||
| 96 | 'default': 5, |
||
| 97 | 'mandatory': 0, |
||
| 98 | 'description': 'By default, openvas does not trust the remote ' |
||
| 99 | 'host banners.', |
||
| 100 | }, |
||
| 101 | 'plugins_timeout': { |
||
| 102 | 'type': 'integer', |
||
| 103 | 'name': 'plugins_timeout', |
||
| 104 | 'default': 5, |
||
| 105 | 'mandatory': 0, |
||
| 106 | 'description': 'This is the maximum lifetime, in seconds of a plugin.', |
||
| 107 | }, |
||
| 108 | 'report_host_details': { |
||
| 109 | 'type': 'boolean', |
||
| 110 | 'name': 'report_host_details', |
||
| 111 | 'default': 1, |
||
| 112 | 'mandatory': 1, |
||
| 113 | 'description': '', |
||
| 114 | }, |
||
| 115 | 'safe_checks': { |
||
| 116 | 'type': 'boolean', |
||
| 117 | 'name': 'safe_checks', |
||
| 118 | 'default': 1, |
||
| 119 | 'mandatory': 1, |
||
| 120 | 'description': 'Disable the plugins with potential to crash ' |
||
| 121 | 'the remote services', |
||
| 122 | }, |
||
| 123 | 'scanner_plugins_timeout': { |
||
| 124 | 'type': 'integer', |
||
| 125 | 'name': 'scanner_plugins_timeout', |
||
| 126 | 'default': 36000, |
||
| 127 | 'mandatory': 1, |
||
| 128 | 'description': 'Like plugins_timeout, but for ACT_SCANNER plugins.', |
||
| 129 | }, |
||
| 130 | 'time_between_request': { |
||
| 131 | 'type': 'integer', |
||
| 132 | 'name': 'time_between_request', |
||
| 133 | 'default': 0, |
||
| 134 | 'mandatory': 0, |
||
| 135 | 'description': 'Allow to set a wait time between two actions ' |
||
| 136 | '(open, send, close).', |
||
| 137 | }, |
||
| 138 | 'unscanned_closed': { |
||
| 139 | 'type': 'boolean', |
||
| 140 | 'name': 'unscanned_closed', |
||
| 141 | 'default': 1, |
||
| 142 | 'mandatory': 1, |
||
| 143 | 'description': '', |
||
| 144 | }, |
||
| 145 | 'unscanned_closed_udp': { |
||
| 146 | 'type': 'boolean', |
||
| 147 | 'name': 'unscanned_closed_udp', |
||
| 148 | 'default': 1, |
||
| 149 | 'mandatory': 1, |
||
| 150 | 'description': '', |
||
| 151 | }, |
||
| 152 | 'expand_vhosts': { |
||
| 153 | 'type': 'boolean', |
||
| 154 | 'name': 'expand_vhosts', |
||
| 155 | 'default': 1, |
||
| 156 | 'mandatory': 0, |
||
| 157 | 'description': 'Whether to expand the target hosts ' |
||
| 158 | + 'list of vhosts with values gathered from sources ' |
||
| 159 | + 'such as reverse-lookup queries and VT checks ' |
||
| 160 | + 'for SSL/TLS certificates.', |
||
| 161 | }, |
||
| 162 | 'test_empty_vhost': { |
||
| 163 | 'type': 'boolean', |
||
| 164 | 'name': 'test_empty_vhost', |
||
| 165 | 'default': 0, |
||
| 166 | 'mandatory': 0, |
||
| 167 | 'description': 'If set to yes, the scanner will ' |
||
| 168 | + 'also test the target by using empty vhost value ' |
||
| 169 | + 'in addition to the targets associated vhost values.', |
||
| 170 | }, |
||
| 171 | } |
||
| 172 | |||
| 173 | |||
| 174 | class TestOspdOpenvas(TestCase): |
||
| 175 | @patch('ospd_openvas.daemon.Openvas') |
||
| 176 | def test_set_params_from_openvas_settings(self, mock_openvas: Openvas): |
||
| 177 | mock_openvas.get_settings.return_value = { |
||
| 178 | 'non_simult_ports': '22', |
||
| 179 | 'plugins_folder': '/foo/bar', |
||
| 180 | } |
||
| 181 | w = DummyDaemon() |
||
| 182 | w.set_params_from_openvas_settings() |
||
| 183 | |||
| 184 | self.assertEqual(mock_openvas.get_settings.call_count, 1) |
||
| 185 | self.assertEqual(OSPD_PARAMS, OSPD_PARAMS_OUT) |
||
| 186 | self.assertEqual(w.scan_only_params.get('plugins_folder'), '/foo/bar') |
||
| 187 | |||
| 188 | @patch('ospd_openvas.daemon.Openvas') |
||
| 189 | def test_sudo_available(self, mock_openvas): |
||
| 190 | mock_openvas.check_sudo.return_value = True |
||
| 191 | |||
| 192 | w = DummyDaemon() |
||
| 193 | w._sudo_available = None # pylint: disable=protected-access |
||
| 194 | w.sudo_available # pylint: disable=pointless-statement |
||
| 195 | |||
| 196 | self.assertTrue(w.sudo_available) |
||
| 197 | |||
| 198 | def test_get_custom_xml(self): |
||
| 199 | out = ( |
||
| 200 | '<custom>' |
||
| 201 | '<required_ports>Services/www, 80</required_ports>' |
||
| 202 | '<category>3</category>' |
||
| 203 | '<excluded_keys>Settings/disable_cgi_scanning</excluded_keys>' |
||
| 204 | '<family>Product detection</family>' |
||
| 205 | '<filename>mantis_detect.nasl</filename>' |
||
| 206 | '<timeout>0</timeout>' |
||
| 207 | '</custom>' |
||
| 208 | ) |
||
| 209 | w = DummyDaemon() |
||
| 210 | |||
| 211 | vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 212 | res = w.get_custom_vt_as_xml_str( |
||
| 213 | '1.3.6.1.4.1.25623.1.0.100061', vt.get('custom') |
||
| 214 | ) |
||
| 215 | self.assertEqual(len(res), len(out)) |
||
| 216 | |||
| 217 | def test_get_custom_xml_failed(self): |
||
| 218 | w = DummyDaemon() |
||
| 219 | logging.Logger.warning = Mock() |
||
| 220 | |||
| 221 | custom = {'a': u"\u0006"} |
||
| 222 | w.get_custom_vt_as_xml_str( |
||
| 223 | '1.3.6.1.4.1.25623.1.0.100061', custom=custom |
||
| 224 | ) |
||
| 225 | |||
| 226 | assert_called_once(logging.Logger.warning) |
||
| 227 | |||
| 228 | def test_get_severities_xml(self): |
||
| 229 | w = DummyDaemon() |
||
| 230 | |||
| 231 | out = ( |
||
| 232 | '<severities>' |
||
| 233 | '<severity type="cvss_base_v2">' |
||
| 234 | 'AV:N/AC:L/Au:N/C:N/I:N/A:N' |
||
| 235 | '</severity>' |
||
| 236 | '</severities>' |
||
| 237 | ) |
||
| 238 | vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 239 | severities = vt.get('severities') |
||
| 240 | res = w.get_severities_vt_as_xml_str( |
||
| 241 | '1.3.6.1.4.1.25623.1.0.100061', severities |
||
| 242 | ) |
||
| 243 | |||
| 244 | self.assertEqual(res, out) |
||
| 245 | |||
| 246 | def test_get_severities_xml_failed(self): |
||
| 247 | w = DummyDaemon() |
||
| 248 | logging.Logger.warning = Mock() |
||
| 249 | |||
| 250 | sever = {'severity_base_vector': u"\u0006"} |
||
| 251 | w.get_severities_vt_as_xml_str( |
||
| 252 | '1.3.6.1.4.1.25623.1.0.100061', severities=sever |
||
| 253 | ) |
||
| 254 | |||
| 255 | assert_called_once(logging.Logger.warning) |
||
| 256 | |||
| 257 | def test_get_params_xml(self): |
||
| 258 | w = DummyDaemon() |
||
| 259 | out = ( |
||
| 260 | '<params>' |
||
| 261 | '<param type="checkbox" id="2">' |
||
| 262 | '<name>Do not randomize the order in which ports are ' |
||
| 263 | 'scanned</name>' |
||
| 264 | '<default>no</default>' |
||
| 265 | '</param>' |
||
| 266 | '<param type="entry" id="1">' |
||
| 267 | '<name>Data length :</name>' |
||
| 268 | '</param>' |
||
| 269 | '</params>' |
||
| 270 | ) |
||
| 271 | |||
| 272 | vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 273 | params = vt.get('vt_params') |
||
| 274 | res = w.get_params_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', params) |
||
| 275 | |||
| 276 | self.assertEqual(len(res), len(out)) |
||
| 277 | |||
| 278 | def test_get_params_xml_failed(self): |
||
| 279 | w = DummyDaemon() |
||
| 280 | logging.Logger.warning = Mock() |
||
| 281 | |||
| 282 | params = { |
||
| 283 | '1': { |
||
| 284 | 'id': '1', |
||
| 285 | 'type': 'entry', |
||
| 286 | 'default': u'\u0006', |
||
| 287 | 'name': 'dns-fuzz.timelimit', |
||
| 288 | 'description': 'Description', |
||
| 289 | } |
||
| 290 | } |
||
| 291 | w.get_params_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', params) |
||
| 292 | |||
| 293 | assert_called_once(logging.Logger.warning) |
||
| 294 | |||
| 295 | def test_get_refs_xml(self): |
||
| 296 | w = DummyDaemon() |
||
| 297 | |||
| 298 | out = '<refs><ref type="url" id="http://www.mantisbt.org/"/></refs>' |
||
| 299 | vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 300 | refs = vt.get('vt_refs') |
||
| 301 | res = w.get_refs_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', refs) |
||
| 302 | |||
| 303 | self.assertEqual(res, out) |
||
| 304 | |||
| 305 | def test_get_dependencies_xml(self): |
||
| 306 | w = DummyDaemon() |
||
| 307 | |||
| 308 | out = ( |
||
| 309 | '<dependencies>' |
||
| 310 | '<dependency vt_id="1.2.3.4"/><dependency vt_id="4.3.2.1"/>' |
||
| 311 | '</dependencies>' |
||
| 312 | ) |
||
| 313 | dep = ['1.2.3.4', '4.3.2.1'] |
||
| 314 | res = w.get_dependencies_vt_as_xml_str( |
||
| 315 | '1.3.6.1.4.1.25623.1.0.100061', dep |
||
| 316 | ) |
||
| 317 | |||
| 318 | self.assertEqual(res, out) |
||
| 319 | |||
| 320 | def test_get_dependencies_xml_failed(self): |
||
| 321 | w = DummyDaemon() |
||
| 322 | logging.Logger.error = Mock() |
||
| 323 | |||
| 324 | dep = [u"\u0006"] |
||
| 325 | w.get_dependencies_vt_as_xml_str( |
||
| 326 | '1.3.6.1.4.1.25623.1.0.100061', vt_dependencies=dep |
||
| 327 | ) |
||
| 328 | |||
| 329 | assert_called_once(logging.Logger.error) |
||
| 330 | |||
| 331 | def test_get_ctime_xml(self): |
||
| 332 | w = DummyDaemon() |
||
| 333 | |||
| 334 | out = '<creation_time>1237458156</creation_time>' |
||
| 335 | vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 336 | ctime = vt.get('creation_time') |
||
| 337 | res = w.get_creation_time_vt_as_xml_str( |
||
| 338 | '1.3.6.1.4.1.25623.1.0.100061', ctime |
||
| 339 | ) |
||
| 340 | |||
| 341 | self.assertEqual(res, out) |
||
| 342 | |||
| 343 | def test_get_ctime_xml_failed(self): |
||
| 344 | w = DummyDaemon() |
||
| 345 | logging.Logger.warning = Mock() |
||
| 346 | |||
| 347 | ctime = u'\u0006' |
||
| 348 | w.get_creation_time_vt_as_xml_str( |
||
| 349 | '1.3.6.1.4.1.25623.1.0.100061', vt_creation_time=ctime |
||
| 350 | ) |
||
| 351 | |||
| 352 | assert_called_once(logging.Logger.warning) |
||
| 353 | |||
| 354 | def test_get_mtime_xml(self): |
||
| 355 | w = DummyDaemon() |
||
| 356 | |||
| 357 | out = '<modification_time>1533906565</modification_time>' |
||
| 358 | vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 359 | mtime = vt.get('modification_time') |
||
| 360 | res = w.get_modification_time_vt_as_xml_str( |
||
| 361 | '1.3.6.1.4.1.25623.1.0.100061', mtime |
||
| 362 | ) |
||
| 363 | |||
| 364 | self.assertEqual(res, out) |
||
| 365 | |||
| 366 | def test_get_mtime_xml_failed(self): |
||
| 367 | w = DummyDaemon() |
||
| 368 | logging.Logger.warning = Mock() |
||
| 369 | |||
| 370 | mtime = u'\u0006' |
||
| 371 | w.get_modification_time_vt_as_xml_str( |
||
| 372 | '1.3.6.1.4.1.25623.1.0.100061', mtime |
||
| 373 | ) |
||
| 374 | |||
| 375 | assert_called_once(logging.Logger.warning) |
||
| 376 | |||
| 377 | def test_get_summary_xml(self): |
||
| 378 | w = DummyDaemon() |
||
| 379 | |||
| 380 | out = '<summary>some summary</summary>' |
||
| 381 | vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 382 | summary = vt.get('summary') |
||
| 383 | res = w.get_summary_vt_as_xml_str( |
||
| 384 | '1.3.6.1.4.1.25623.1.0.100061', summary |
||
| 385 | ) |
||
| 386 | |||
| 387 | self.assertEqual(res, out) |
||
| 388 | |||
| 389 | def test_get_summary_xml_failed(self): |
||
| 390 | w = DummyDaemon() |
||
| 391 | |||
| 392 | summary = u'\u0006' |
||
| 393 | logging.Logger.warning = Mock() |
||
| 394 | w.get_summary_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', summary) |
||
| 395 | |||
| 396 | assert_called_once(logging.Logger.warning) |
||
| 397 | |||
| 398 | def test_get_impact_xml(self): |
||
| 399 | w = DummyDaemon() |
||
| 400 | |||
| 401 | out = '<impact>some impact</impact>' |
||
| 402 | vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 403 | impact = vt.get('impact') |
||
| 404 | res = w.get_impact_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', impact) |
||
| 405 | |||
| 406 | self.assertEqual(res, out) |
||
| 407 | |||
| 408 | def test_get_impact_xml_failed(self): |
||
| 409 | w = DummyDaemon() |
||
| 410 | logging.Logger.warning = Mock() |
||
| 411 | |||
| 412 | impact = u'\u0006' |
||
| 413 | w.get_impact_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', impact) |
||
| 414 | |||
| 415 | assert_called_once(logging.Logger.warning) |
||
| 416 | |||
| 417 | def test_get_insight_xml(self): |
||
| 418 | w = DummyDaemon() |
||
| 419 | |||
| 420 | out = '<insight>some insight</insight>' |
||
| 421 | vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 422 | insight = vt.get('insight') |
||
| 423 | res = w.get_insight_vt_as_xml_str( |
||
| 424 | '1.3.6.1.4.1.25623.1.0.100061', insight |
||
| 425 | ) |
||
| 426 | |||
| 427 | self.assertEqual(res, out) |
||
| 428 | |||
| 429 | def test_get_insight_xml_failed(self): |
||
| 430 | w = DummyDaemon() |
||
| 431 | logging.Logger.warning = Mock() |
||
| 432 | |||
| 433 | insight = u'\u0006' |
||
| 434 | w.get_insight_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', insight) |
||
| 435 | |||
| 436 | assert_called_once(logging.Logger.warning) |
||
| 437 | |||
| 438 | def test_get_solution_xml(self): |
||
| 439 | w = DummyDaemon() |
||
| 440 | |||
| 441 | out = ( |
||
| 442 | '<solution type="WillNotFix" method="DebianAPTUpgrade">' |
||
| 443 | 'some solution' |
||
| 444 | '</solution>' |
||
| 445 | ) |
||
| 446 | vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 447 | solution = vt.get('solution') |
||
| 448 | solution_type = vt.get('solution_type') |
||
| 449 | solution_method = vt.get('solution_method') |
||
| 450 | |||
| 451 | res = w.get_solution_vt_as_xml_str( |
||
| 452 | '1.3.6.1.4.1.25623.1.0.100061', |
||
| 453 | solution, |
||
| 454 | solution_type, |
||
| 455 | solution_method, |
||
| 456 | ) |
||
| 457 | |||
| 458 | self.assertEqual(res, out) |
||
| 459 | |||
| 460 | def test_get_solution_xml_failed(self): |
||
| 461 | w = DummyDaemon() |
||
| 462 | logging.Logger.warning = Mock() |
||
| 463 | |||
| 464 | solution = u'\u0006' |
||
| 465 | w.get_solution_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', solution) |
||
| 466 | |||
| 467 | assert_called_once(logging.Logger.warning) |
||
| 468 | |||
| 469 | def test_get_detection_xml(self): |
||
| 470 | w = DummyDaemon() |
||
| 471 | |||
| 472 | out = '<detection qod_type="remote_banner"/>' |
||
| 473 | vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 474 | detection_type = vt.get('qod_type') |
||
| 475 | |||
| 476 | res = w.get_detection_vt_as_xml_str( |
||
| 477 | '1.3.6.1.4.1.25623.1.0.100061', qod_type=detection_type |
||
| 478 | ) |
||
| 479 | |||
| 480 | self.assertEqual(res, out) |
||
| 481 | |||
| 482 | def test_get_detection_xml_failed(self): |
||
| 483 | w = DummyDaemon() |
||
| 484 | logging.Logger.warning = Mock() |
||
| 485 | |||
| 486 | detection = u'\u0006' |
||
| 487 | w.get_detection_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', detection) |
||
| 488 | |||
| 489 | assert_called_once(logging.Logger.warning) |
||
| 490 | |||
| 491 | def test_get_affected_xml(self): |
||
| 492 | w = DummyDaemon() |
||
| 493 | out = '<affected>some affection</affected>' |
||
| 494 | vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 495 | affected = vt.get('affected') |
||
| 496 | |||
| 497 | res = w.get_affected_vt_as_xml_str( |
||
| 498 | '1.3.6.1.4.1.25623.1.0.100061', affected=affected |
||
| 499 | ) |
||
| 500 | |||
| 501 | self.assertEqual(res, out) |
||
| 502 | |||
| 503 | def test_get_affected_xml_failed(self): |
||
| 504 | w = DummyDaemon() |
||
| 505 | logging.Logger.warning = Mock() |
||
| 506 | |||
| 507 | affected = u"\u0006" + "affected" |
||
| 508 | w.get_affected_vt_as_xml_str( |
||
| 509 | '1.3.6.1.4.1.25623.1.0.100061', affected=affected |
||
| 510 | ) |
||
| 511 | |||
| 512 | assert_called_once(logging.Logger.warning) |
||
| 513 | |||
| 514 | @patch('ospd_openvas.daemon.Path.exists') |
||
| 515 | @patch('ospd_openvas.daemon.OSPDopenvas.set_params_from_openvas_settings') |
||
| 516 | def test_feed_is_outdated_none( |
||
| 517 | self, mock_set_params: MagicMock, mock_path_exists: MagicMock |
||
| 518 | ): |
||
| 519 | w = DummyDaemon() |
||
| 520 | |||
| 521 | w.scan_only_params['plugins_folder'] = '/foo/bar' |
||
| 522 | |||
| 523 | # Return None |
||
| 524 | mock_path_exists.return_value = False |
||
| 525 | |||
| 526 | ret = w.feed_is_outdated('1234') |
||
| 527 | self.assertIsNone(ret) |
||
| 528 | |||
| 529 | self.assertEqual(mock_set_params.call_count, 1) |
||
| 530 | self.assertEqual(mock_path_exists.call_count, 1) |
||
| 531 | |||
| 532 | @patch('ospd_openvas.daemon.Path.exists') |
||
| 533 | @patch('ospd_openvas.daemon.Path.open') |
||
| 534 | def test_feed_is_outdated_true( |
||
| 535 | self, mock_path_open: MagicMock, mock_path_exists: MagicMock, |
||
| 536 | ): |
||
| 537 | read_data = 'PLUGIN_SET = "1235";' |
||
| 538 | |||
| 539 | mock_path_exists.return_value = True |
||
| 540 | mock_read = MagicMock(name='Path open context manager') |
||
| 541 | mock_read.__enter__ = MagicMock(return_value=io.StringIO(read_data)) |
||
| 542 | mock_path_open.return_value = mock_read |
||
| 543 | |||
| 544 | w = DummyDaemon() |
||
| 545 | |||
| 546 | # Return True |
||
| 547 | w.scan_only_params['plugins_folder'] = '/foo/bar' |
||
| 548 | |||
| 549 | ret = w.feed_is_outdated('1234') |
||
| 550 | self.assertTrue(ret) |
||
| 551 | |||
| 552 | self.assertEqual(mock_path_exists.call_count, 1) |
||
| 553 | self.assertEqual(mock_path_open.call_count, 1) |
||
| 554 | |||
| 555 | @patch('ospd_openvas.daemon.Path.exists') |
||
| 556 | @patch('ospd_openvas.daemon.Path.open') |
||
| 557 | def test_feed_is_outdated_false( |
||
| 558 | self, mock_path_open: MagicMock, mock_path_exists: MagicMock, |
||
| 559 | ): |
||
| 560 | mock_path_exists.return_value = True |
||
| 561 | |||
| 562 | read_data = 'PLUGIN_SET = "1234"' |
||
| 563 | mock_path_exists.return_value = True |
||
| 564 | mock_read = MagicMock(name='Path open context manager') |
||
| 565 | mock_read.__enter__ = MagicMock(return_value=io.StringIO(read_data)) |
||
| 566 | mock_path_open.return_value = mock_read |
||
| 567 | |||
| 568 | w = DummyDaemon() |
||
| 569 | w.scan_only_params['plugins_folder'] = '/foo/bar' |
||
| 570 | |||
| 571 | ret = w.feed_is_outdated('1234') |
||
| 572 | self.assertFalse(ret) |
||
| 573 | |||
| 574 | self.assertEqual(mock_path_exists.call_count, 1) |
||
| 575 | self.assertEqual(mock_path_open.call_count, 1) |
||
| 576 | |||
| 577 | def test_check_feed_cache_unavailable(self): |
||
| 578 | w = DummyDaemon() |
||
| 579 | w.vts.is_cache_available = False |
||
| 580 | w.feed_is_outdated = Mock() |
||
| 581 | res = w.check_feed() |
||
| 582 | |||
| 583 | self.assertFalse(res) |
||
| 584 | w.feed_is_outdated.assert_not_called() |
||
| 585 | |||
| 586 | View Code Duplication | @patch('ospd_openvas.daemon.ScanDB') |
|
|
|
|||
| 587 | @patch('ospd_openvas.daemon.ResultList.add_scan_log_to_list') |
||
| 588 | def test_get_openvas_result(self, mock_add_scan_log_to_list, MockDBClass): |
||
| 589 | w = DummyDaemon() |
||
| 590 | |||
| 591 | target_element = w.create_xml_target() |
||
| 592 | targets = OspRequest.process_target_element(target_element) |
||
| 593 | w.create_scan('123-456', targets, None, []) |
||
| 594 | |||
| 595 | results = ["LOG||| |||general/Host_Details||| |||Host dead", None] |
||
| 596 | mock_db = MockDBClass.return_value |
||
| 597 | mock_db.get_result.side_effect = results |
||
| 598 | mock_add_scan_log_to_list.return_value = None |
||
| 599 | |||
| 600 | w.report_openvas_results(mock_db, '123-456', 'localhost') |
||
| 601 | mock_add_scan_log_to_list.assert_called_with( |
||
| 602 | host='localhost', |
||
| 603 | hostname='', |
||
| 604 | name='', |
||
| 605 | port='general/Host_Details', |
||
| 606 | qod='', |
||
| 607 | test_id='', |
||
| 608 | value='Host dead', |
||
| 609 | ) |
||
| 610 | |||
| 611 | View Code Duplication | @patch('ospd_openvas.daemon.ScanDB') |
|
| 612 | @patch('ospd_openvas.daemon.ResultList.add_scan_error_to_list') |
||
| 613 | def test_get_openvas_result_host_deny( |
||
| 614 | self, mock_add_scan_error_to_list, MockDBClass |
||
| 615 | ): |
||
| 616 | w = DummyDaemon() |
||
| 617 | |||
| 618 | target_element = w.create_xml_target() |
||
| 619 | targets = OspRequest.process_target_element(target_element) |
||
| 620 | w.create_scan('123-456', targets, None, []) |
||
| 621 | |||
| 622 | results = ["ERRMSG|||127.0.0.1|||||| |||Host access denied.", None] |
||
| 623 | mock_db = MockDBClass.return_value |
||
| 624 | mock_db.get_result.side_effect = results |
||
| 625 | mock_add_scan_error_to_list.return_value = None |
||
| 626 | |||
| 627 | w.report_openvas_results(mock_db, '123-456', '') |
||
| 628 | mock_add_scan_error_to_list.assert_called_with( |
||
| 629 | host='127.0.0.1', |
||
| 630 | hostname='127.0.0.1', |
||
| 631 | name='', |
||
| 632 | port='', |
||
| 633 | test_id='', |
||
| 634 | value='Host access denied.', |
||
| 635 | ) |
||
| 636 | |||
| 637 | @patch('ospd_openvas.daemon.ScanDB') |
||
| 638 | def test_get_openvas_result_dead_hosts(self, MockDBClass): |
||
| 639 | w = DummyDaemon() |
||
| 640 | target_element = w.create_xml_target() |
||
| 641 | targets = OspRequest.process_target_element(target_element) |
||
| 642 | w.create_scan('123-456', targets, None, []) |
||
| 643 | |||
| 644 | results = ["DEADHOST||| ||| ||| |||4", None] |
||
| 645 | mock_db = MockDBClass.return_value |
||
| 646 | mock_db.get_result.side_effect = results |
||
| 647 | w.scan_collection.set_amount_dead_hosts = MagicMock() |
||
| 648 | |||
| 649 | w.report_openvas_results(mock_db, '123-456', 'localhost') |
||
| 650 | w.scan_collection.set_amount_dead_hosts.assert_called_with( |
||
| 651 | '123-456', total_dead=4, |
||
| 652 | ) |
||
| 653 | |||
| 654 | @patch('ospd_openvas.daemon.ScanDB') |
||
| 655 | @patch('ospd_openvas.daemon.ResultList.add_scan_log_to_list') |
||
| 656 | def test_result_without_vt_oid( |
||
| 657 | self, mock_add_scan_log_to_list, MockDBClass |
||
| 658 | ): |
||
| 659 | w = DummyDaemon() |
||
| 660 | logging.Logger.warning = Mock() |
||
| 661 | |||
| 662 | target_element = w.create_xml_target() |
||
| 663 | targets = OspRequest.process_target_element(target_element) |
||
| 664 | w.create_scan('123-456', targets, None, []) |
||
| 665 | w.scan_collection.scans_table['123-456']['results'] = list() |
||
| 666 | results = ["ALARM||| ||| ||| |||some alarm", None] |
||
| 667 | mock_db = MockDBClass.return_value |
||
| 668 | mock_db.get_result.side_effect = results |
||
| 669 | mock_add_scan_log_to_list.return_value = None |
||
| 670 | |||
| 671 | w.report_openvas_results(mock_db, '123-456', 'localhost') |
||
| 672 | |||
| 673 | assert_called_once(logging.Logger.warning) |
||
| 674 | |||
| 675 | @patch('ospd_openvas.db.KbDB') |
||
| 676 | def test_openvas_is_alive_already_stopped(self, mock_db): |
||
| 677 | w = DummyDaemon() |
||
| 678 | # mock_psutil = MockPsutil.return_value |
||
| 679 | mock_db.scan_is_stopped.return_value = True |
||
| 680 | ret = w.is_openvas_process_alive(mock_db, '1234', 'a1-b2-c3-d4') |
||
| 681 | |||
| 682 | self.assertTrue(ret) |
||
| 683 | |||
| 684 | @patch('ospd_openvas.db.KbDB') |
||
| 685 | def test_openvas_is_alive_still(self, mock_db): |
||
| 686 | w = DummyDaemon() |
||
| 687 | # mock_psutil = MockPsutil.return_value |
||
| 688 | mock_db.scan_is_stopped.return_value = False |
||
| 689 | ret = w.is_openvas_process_alive(mock_db, '1234', 'a1-b2-c3-d4') |
||
| 690 | |||
| 691 | self.assertFalse(ret) |
||
| 692 | |||
| 693 | @patch('ospd_openvas.daemon.OSPDaemon.set_scan_host_progress') |
||
| 694 | def test_update_progress(self, mock_set_scan_host_progress): |
||
| 695 | w = DummyDaemon() |
||
| 696 | |||
| 697 | mock_set_scan_host_progress.return_value = None |
||
| 698 | |||
| 699 | msg = '0/-1' |
||
| 700 | target_element = w.create_xml_target() |
||
| 701 | targets = OspRequest.process_target_element(target_element) |
||
| 702 | |||
| 703 | w.create_scan('123-456', targets, None, []) |
||
| 704 | w.update_progress('123-456', 'localhost', msg) |
||
| 705 | |||
| 706 | mock_set_scan_host_progress.assert_called_with( |
||
| 707 | '123-456', host='localhost', progress=-1 |
||
| 708 | ) |
||
| 709 | |||
| 710 | |||
| 711 | class TestFilters(TestCase): |
||
| 712 | def test_format_vt_modification_time(self): |
||
| 713 | ovformat = OpenVasVtsFilter(None) |
||
| 714 | td = '1517443741' |
||
| 715 | formatted = ovformat.format_vt_modification_time(td) |
||
| 716 | self.assertEqual(formatted, "20180201000901") |
||
| 717 | |||
| 718 | def test_get_filtered_vts_false(self): |
||
| 719 | w = DummyDaemon() |
||
| 720 | vts_collection = ['1234', '1.3.6.1.4.1.25623.1.0.100061'] |
||
| 721 | |||
| 722 | ovfilter = OpenVasVtsFilter(w.nvti) |
||
| 723 | res = ovfilter.get_filtered_vts_list( |
||
| 724 | vts_collection, "modification_time<10" |
||
| 725 | ) |
||
| 726 | self.assertNotIn('1.3.6.1.4.1.25623.1.0.100061', res) |
||
| 727 | |||
| 728 | def test_get_filtered_vts_true(self): |
||
| 729 | w = DummyDaemon() |
||
| 730 | vts_collection = ['1234', '1.3.6.1.4.1.25623.1.0.100061'] |
||
| 731 | |||
| 732 | ovfilter = OpenVasVtsFilter(w.nvti) |
||
| 733 | res = ovfilter.get_filtered_vts_list( |
||
| 734 | vts_collection, "modification_time>10" |
||
| 735 | ) |
||
| 736 | self.assertIn('1.3.6.1.4.1.25623.1.0.100061', res) |
||
| 737 |