| Total Complexity | 43 |
| Total Lines | 935 |
| Duplicated Lines | 5.45 % |
| Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like tests.test_daemon often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | # -*- coding: utf-8 -*- |
||
| 2 | # Copyright (C) 2014-2021 Greenbone Networks GmbH |
||
| 3 | # |
||
| 4 | # SPDX-License-Identifier: AGPL-3.0-or-later |
||
| 5 | # |
||
| 6 | # This program is free software: you can redistribute it and/or modify |
||
| 7 | # it under the terms of the GNU Affero General Public License as |
||
| 8 | # published by the Free Software Foundation, either version 3 of the |
||
| 9 | # License, or (at your option) any later version. |
||
| 10 | # |
||
| 11 | # This program is distributed in the hope that it will be useful, |
||
| 12 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
||
| 13 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||
| 14 | # GNU Affero General Public License for more details. |
||
| 15 | # |
||
| 16 | # You should have received a copy of the GNU Affero General Public License |
||
| 17 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
||
| 18 | |||
| 19 | |||
| 20 | # pylint: disable=invalid-name,line-too-long,no-value-for-parameter |
||
| 21 | |||
| 22 | """ Unit Test for ospd-openvas """ |
||
| 23 | |||
| 24 | import io |
||
| 25 | import logging |
||
| 26 | |||
| 27 | from unittest import TestCase |
||
| 28 | from unittest.mock import patch, Mock, MagicMock |
||
| 29 | |||
| 30 | from ospd.vts import Vts |
||
| 31 | from ospd.protocol import OspRequest |
||
| 32 | |||
| 33 | from tests.dummydaemon import DummyDaemon |
||
| 34 | from tests.helper import assert_called_once |
||
| 35 | |||
| 36 | from ospd_openvas.daemon import OSPD_PARAMS, OpenVasVtsFilter |
||
| 37 | from ospd_openvas.openvas import Openvas |
||
| 38 | |||
| 39 | OSPD_PARAMS_OUT = { |
||
| 40 | 'auto_enable_dependencies': { |
||
| 41 | 'type': 'boolean', |
||
| 42 | 'name': 'auto_enable_dependencies', |
||
| 43 | 'default': 1, |
||
| 44 | 'mandatory': 1, |
||
| 45 | 'visible_for_client': True, |
||
| 46 | 'description': 'Automatically enable the plugins that are depended on', |
||
| 47 | }, |
||
| 48 | 'cgi_path': { |
||
| 49 | 'type': 'string', |
||
| 50 | 'name': 'cgi_path', |
||
| 51 | 'default': '/cgi-bin:/scripts', |
||
| 52 | 'mandatory': 1, |
||
| 53 | 'visible_for_client': True, |
||
| 54 | 'description': 'Look for default CGIs in /cgi-bin and /scripts', |
||
| 55 | }, |
||
| 56 | 'checks_read_timeout': { |
||
| 57 | 'type': 'integer', |
||
| 58 | 'name': 'checks_read_timeout', |
||
| 59 | 'default': 5, |
||
| 60 | 'mandatory': 1, |
||
| 61 | 'visible_for_client': True, |
||
| 62 | 'description': ( |
||
| 63 | 'Number of seconds that the security checks will ' |
||
| 64 | + 'wait for when doing a recv()' |
||
| 65 | ), |
||
| 66 | }, |
||
| 67 | 'non_simult_ports': { |
||
| 68 | 'type': 'string', |
||
| 69 | 'name': 'non_simult_ports', |
||
| 70 | 'default': '139, 445, 3389, Services/irc', |
||
| 71 | 'mandatory': 1, |
||
| 72 | 'visible_for_client': True, |
||
| 73 | 'description': ( |
||
| 74 | 'Prevent to make two connections on the same given ' |
||
| 75 | + 'ports at the same time.' |
||
| 76 | ), |
||
| 77 | }, |
||
| 78 | 'open_sock_max_attempts': { |
||
| 79 | 'type': 'integer', |
||
| 80 | 'name': 'open_sock_max_attempts', |
||
| 81 | 'default': 5, |
||
| 82 | 'mandatory': 0, |
||
| 83 | 'visible_for_client': True, |
||
| 84 | 'description': ( |
||
| 85 | 'Number of unsuccessful retries to open the socket ' |
||
| 86 | + 'before to set the port as closed.' |
||
| 87 | ), |
||
| 88 | }, |
||
| 89 | 'timeout_retry': { |
||
| 90 | 'type': 'integer', |
||
| 91 | 'name': 'timeout_retry', |
||
| 92 | 'default': 5, |
||
| 93 | 'mandatory': 0, |
||
| 94 | 'visible_for_client': True, |
||
| 95 | 'description': ( |
||
| 96 | 'Number of retries when a socket connection attempt ' + 'timesout.' |
||
| 97 | ), |
||
| 98 | }, |
||
| 99 | 'optimize_test': { |
||
| 100 | 'type': 'boolean', |
||
| 101 | 'name': 'optimize_test', |
||
| 102 | 'default': 1, |
||
| 103 | 'mandatory': 0, |
||
| 104 | 'visible_for_client': True, |
||
| 105 | 'description': ( |
||
| 106 | 'By default, optimize_test is enabled which means openvas does ' |
||
| 107 | + 'trust the remote host banners and is only launching plugins ' |
||
| 108 | + 'against the services they have been designed to check. ' |
||
| 109 | + 'For example it will check a web server claiming to be IIS only ' |
||
| 110 | + 'for IIS related flaws but will skip plugins testing for Apache ' |
||
| 111 | + 'flaws, and so on. This default behavior is used to optimize ' |
||
| 112 | + 'the scanning performance and to avoid false positives. ' |
||
| 113 | + 'If you are not sure that the banners of the remote host ' |
||
| 114 | + 'have been tampered with, you can disable this option.' |
||
| 115 | ), |
||
| 116 | }, |
||
| 117 | 'plugins_timeout': { |
||
| 118 | 'type': 'integer', |
||
| 119 | 'name': 'plugins_timeout', |
||
| 120 | 'default': 5, |
||
| 121 | 'mandatory': 0, |
||
| 122 | 'visible_for_client': True, |
||
| 123 | 'description': 'This is the maximum lifetime, in seconds of a plugin.', |
||
| 124 | }, |
||
| 125 | 'report_host_details': { |
||
| 126 | 'type': 'boolean', |
||
| 127 | 'name': 'report_host_details', |
||
| 128 | 'default': 1, |
||
| 129 | 'mandatory': 1, |
||
| 130 | 'visible_for_client': True, |
||
| 131 | 'description': '', |
||
| 132 | }, |
||
| 133 | 'safe_checks': { |
||
| 134 | 'type': 'boolean', |
||
| 135 | 'name': 'safe_checks', |
||
| 136 | 'default': 1, |
||
| 137 | 'mandatory': 1, |
||
| 138 | 'visible_for_client': True, |
||
| 139 | 'description': ( |
||
| 140 | 'Disable the plugins with potential to crash ' |
||
| 141 | + 'the remote services' |
||
| 142 | ), |
||
| 143 | }, |
||
| 144 | 'scanner_plugins_timeout': { |
||
| 145 | 'type': 'integer', |
||
| 146 | 'name': 'scanner_plugins_timeout', |
||
| 147 | 'default': 36000, |
||
| 148 | 'mandatory': 1, |
||
| 149 | 'visible_for_client': True, |
||
| 150 | 'description': 'Like plugins_timeout, but for ACT_SCANNER plugins.', |
||
| 151 | }, |
||
| 152 | 'time_between_request': { |
||
| 153 | 'type': 'integer', |
||
| 154 | 'name': 'time_between_request', |
||
| 155 | 'default': 0, |
||
| 156 | 'mandatory': 0, |
||
| 157 | 'visible_for_client': True, |
||
| 158 | 'description': ( |
||
| 159 | 'Allow to set a wait time between two actions ' |
||
| 160 | + '(open, send, close).' |
||
| 161 | ), |
||
| 162 | }, |
||
| 163 | 'unscanned_closed': { |
||
| 164 | 'type': 'boolean', |
||
| 165 | 'name': 'unscanned_closed', |
||
| 166 | 'default': 1, |
||
| 167 | 'mandatory': 1, |
||
| 168 | 'visible_for_client': True, |
||
| 169 | 'description': '', |
||
| 170 | }, |
||
| 171 | 'unscanned_closed_udp': { |
||
| 172 | 'type': 'boolean', |
||
| 173 | 'name': 'unscanned_closed_udp', |
||
| 174 | 'default': 1, |
||
| 175 | 'mandatory': 1, |
||
| 176 | 'visible_for_client': True, |
||
| 177 | 'description': '', |
||
| 178 | }, |
||
| 179 | 'expand_vhosts': { |
||
| 180 | 'type': 'boolean', |
||
| 181 | 'name': 'expand_vhosts', |
||
| 182 | 'default': 1, |
||
| 183 | 'mandatory': 0, |
||
| 184 | 'visible_for_client': True, |
||
| 185 | 'description': 'Whether to expand the target hosts ' |
||
| 186 | + 'list of vhosts with values gathered from sources ' |
||
| 187 | + 'such as reverse-lookup queries and VT checks ' |
||
| 188 | + 'for SSL/TLS certificates.', |
||
| 189 | }, |
||
| 190 | 'test_empty_vhost': { |
||
| 191 | 'type': 'boolean', |
||
| 192 | 'name': 'test_empty_vhost', |
||
| 193 | 'default': 0, |
||
| 194 | 'mandatory': 0, |
||
| 195 | 'visible_for_client': True, |
||
| 196 | 'description': 'If set to yes, the scanner will ' |
||
| 197 | + 'also test the target by using empty vhost value ' |
||
| 198 | + 'in addition to the targets associated vhost values.', |
||
| 199 | }, |
||
| 200 | 'max_hosts': { |
||
| 201 | 'type': 'integer', |
||
| 202 | 'name': 'max_hosts', |
||
| 203 | 'default': 30, |
||
| 204 | 'mandatory': 0, |
||
| 205 | 'visible_for_client': False, |
||
| 206 | 'description': ( |
||
| 207 | 'The maximum number of hosts to test at the same time which ' |
||
| 208 | + 'should be given to the client (which can override it). ' |
||
| 209 | + 'This value must be computed given your bandwidth, ' |
||
| 210 | + 'the number of hosts you want to test, your amount of ' |
||
| 211 | + 'memory and the performance of your processor(s).' |
||
| 212 | ), |
||
| 213 | }, |
||
| 214 | 'max_checks': { |
||
| 215 | 'type': 'integer', |
||
| 216 | 'name': 'max_checks', |
||
| 217 | 'default': 10, |
||
| 218 | 'mandatory': 0, |
||
| 219 | 'visible_for_client': False, |
||
| 220 | 'description': ( |
||
| 221 | 'The number of plugins that will run against each host being ' |
||
| 222 | + 'tested. Note that the total number of process will be max ' |
||
| 223 | + 'checks x max_hosts so you need to find a balance between ' |
||
| 224 | + 'these two options. Note that launching too many plugins at ' |
||
| 225 | + 'the same time may disable the remote host, either temporarily ' |
||
| 226 | + '(ie: inetd closes its ports) or definitely (the remote host ' |
||
| 227 | + 'crash because it is asked to do too many things at the ' |
||
| 228 | + 'same time), so be careful.' |
||
| 229 | ), |
||
| 230 | }, |
||
| 231 | 'port_range': { |
||
| 232 | 'type': 'string', |
||
| 233 | 'name': 'port_range', |
||
| 234 | 'default': '', |
||
| 235 | 'mandatory': 0, |
||
| 236 | 'visible_for_client': False, |
||
| 237 | 'description': ( |
||
| 238 | 'This is the default range of ports that the scanner plugins will ' |
||
| 239 | + 'probe. The syntax of this option is flexible, it can be a ' |
||
| 240 | + 'single range ("1-1500"), several ports ("21,23,80"), several ' |
||
| 241 | + 'ranges of ports ("1-1500,32000-33000"). Note that you can ' |
||
| 242 | + 'specify UDP and TCP ports by prefixing each range by T or U. ' |
||
| 243 | + 'For instance, the following range will make openvas scan UDP ' |
||
| 244 | + 'ports 1 to 1024 and TCP ports 1 to 65535 : ' |
||
| 245 | + '"T:1-65535,U:1-1024".' |
||
| 246 | ), |
||
| 247 | }, |
||
| 248 | 'test_alive_hosts_only': { |
||
| 249 | 'type': 'boolean', |
||
| 250 | 'name': 'test_alive_hosts_only', |
||
| 251 | 'default': 0, |
||
| 252 | 'mandatory': 0, |
||
| 253 | 'visible_for_client': False, |
||
| 254 | 'description': ( |
||
| 255 | 'If this option is set, openvas will scan the target list for ' |
||
| 256 | + 'alive hosts in a separate process while only testing those ' |
||
| 257 | + 'hosts which are identified as alive. This boosts the scan ' |
||
| 258 | + 'speed of target ranges with a high amount of dead hosts ' |
||
| 259 | + 'significantly.' |
||
| 260 | ), |
||
| 261 | }, |
||
| 262 | 'source_iface': { |
||
| 263 | 'type': 'string', |
||
| 264 | 'name': 'source_iface', |
||
| 265 | 'default': '', |
||
| 266 | 'mandatory': 0, |
||
| 267 | 'visible_for_client': False, |
||
| 268 | 'description': ( |
||
| 269 | 'Name of the network interface that will be used as the source ' |
||
| 270 | + 'of connections established by openvas. The scan won\'t be ' |
||
| 271 | + 'launched if the value isn\'t authorized according to ' |
||
| 272 | + '(sys_)ifaces_allow / (sys_)ifaces_deny if present.' |
||
| 273 | ), |
||
| 274 | }, |
||
| 275 | 'ifaces_allow': { |
||
| 276 | 'type': 'string', |
||
| 277 | 'name': 'ifaces_allow', |
||
| 278 | 'default': '', |
||
| 279 | 'mandatory': 0, |
||
| 280 | 'visible_for_client': False, |
||
| 281 | 'description': ( |
||
| 282 | 'Comma-separated list of interfaces names that are authorized ' |
||
| 283 | + 'as source_iface values.' |
||
| 284 | ), |
||
| 285 | }, |
||
| 286 | 'ifaces_deny': { |
||
| 287 | 'type': 'string', |
||
| 288 | 'name': 'ifaces_deny', |
||
| 289 | 'default': '', |
||
| 290 | 'mandatory': 0, |
||
| 291 | 'visible_for_client': False, |
||
| 292 | 'description': ( |
||
| 293 | 'Comma-separated list of interfaces names that are not ' |
||
| 294 | + 'authorized as source_iface values.' |
||
| 295 | ), |
||
| 296 | }, |
||
| 297 | 'hosts_allow': { |
||
| 298 | 'type': 'string', |
||
| 299 | 'name': 'hosts_allow', |
||
| 300 | 'default': '', |
||
| 301 | 'mandatory': 0, |
||
| 302 | 'visible_for_client': False, |
||
| 303 | 'description': ( |
||
| 304 | 'Comma-separated list of the only targets that are authorized ' |
||
| 305 | + 'to be scanned. Supports the same syntax as the list targets. ' |
||
| 306 | + 'Both target hostnames and the address to which they resolve ' |
||
| 307 | + 'are checked. Hostnames in hosts_allow list are not resolved ' |
||
| 308 | + 'however.' |
||
| 309 | ), |
||
| 310 | }, |
||
| 311 | 'hosts_deny': { |
||
| 312 | 'type': 'string', |
||
| 313 | 'name': 'hosts_deny', |
||
| 314 | 'default': '', |
||
| 315 | 'mandatory': 0, |
||
| 316 | 'visible_for_client': False, |
||
| 317 | 'description': ( |
||
| 318 | 'Comma-separated list of targets that are not authorized to ' |
||
| 319 | + 'be scanned. Supports the same syntax as the list targets. ' |
||
| 320 | + 'Both target hostnames and the address to which they resolve ' |
||
| 321 | + 'are checked. Hostnames in hosts_deny list are not ' |
||
| 322 | + 'resolved however.' |
||
| 323 | ), |
||
| 324 | }, |
||
| 325 | } |
||
| 326 | |||
| 327 | |||
| 328 | class TestOspdOpenvas(TestCase): |
||
@patch('ospd_openvas.daemon.Openvas')
def test_set_params_from_openvas_settings(self, mock_openvas: Openvas):
    """Run ``set_params_from_openvas_settings`` against mocked settings.

    The mocked ``Openvas.get_settings`` must be queried exactly once.
    Afterwards ``OSPD_PARAMS`` must equal ``OSPD_PARAMS_OUT`` and the
    ``plugins_folder`` entry must be readable from ``scan_only_params``.
    """
    openvas_settings = {
        'non_simult_ports': '139, 445, 3389, Services/irc',
        'plugins_folder': '/foo/bar',
    }
    mock_openvas.get_settings.return_value = openvas_settings

    daemon = DummyDaemon()
    daemon.set_params_from_openvas_settings()

    self.assertEqual(mock_openvas.get_settings.call_count, 1)
    self.assertEqual(OSPD_PARAMS, OSPD_PARAMS_OUT)

    stored_folder = daemon.scan_only_params.get('plugins_folder')
    self.assertEqual(stored_folder, '/foo/bar')
||
| 341 | |||
@patch('ospd_openvas.daemon.Openvas')
def test_sudo_available(self, mock_openvas):
    """``sudo_available`` is truthy when ``check_sudo`` returns True."""
    mock_openvas.check_sudo.return_value = True

    daemon = DummyDaemon()
    # Reset the stored value, then read the attribute once before asserting.
    daemon._sudo_available = None  # pylint: disable=protected-access
    daemon.sudo_available  # pylint: disable=pointless-statement

    self.assertTrue(daemon.sudo_available)
||
| 351 | |||
def test_get_custom_xml(self):
    """Custom XML of the sample VT has the same length as the expected XML.

    Only the lengths are compared, not the string contents.
    """
    oid = '1.3.6.1.4.1.25623.1.0.100061'
    expected = (
        '<custom>'
        '<required_ports>Services/www, 80</required_ports>'
        '<category>3</category>'
        '<excluded_keys>Settings/disable_cgi_scanning</excluded_keys>'
        '<family>Product detection</family>'
        '<filename>mantis_detect.nasl</filename>'
        '<timeout>0</timeout>'
        '</custom>'
    )
    daemon = DummyDaemon()

    custom = daemon.VTS[oid].get('custom')
    actual = daemon.get_custom_vt_as_xml_str(oid, custom)

    self.assertEqual(len(actual), len(expected))
||
| 370 | |||
def test_get_custom_xml_failed(self):
    """Expect one logged warning for a custom value containing U+0006.

    ``Logger.warning`` is swapped out with ``patch.object`` so that the
    original method is restored when the ``with`` block exits; assigning a
    mock directly to ``logging.Logger.warning`` would leave it installed
    for every test that runs afterwards.
    """
    w = DummyDaemon()
    custom = {'a': u"\u0006"}

    with patch.object(logging.Logger, 'warning') as mock_warning:
        w.get_custom_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', custom=custom
        )

    assert_called_once(mock_warning)
||
| 381 | |||
def test_get_severities_xml(self):
    """Severities of the sample VT serialize to the expected XML string."""
    oid = '1.3.6.1.4.1.25623.1.0.100061'
    daemon = DummyDaemon()
    expected = (
        '<severities>'
        '<severity type="cvss_base_v2">'
        'AV:N/AC:L/Au:N/C:N/I:N/A:N'
        '</severity>'
        '</severities>'
    )

    severities = daemon.VTS[oid].get('severities')
    actual = daemon.get_severities_vt_as_xml_str(oid, severities)

    self.assertEqual(actual, expected)
||
| 399 | |||
def test_get_severities_xml_failed(self):
    """Expect one logged warning for a severity vector containing U+0006.

    ``patch.object`` is used instead of assigning a mock to
    ``logging.Logger.warning`` so the real method is restored afterwards
    and the mock cannot leak into other tests.
    """
    w = DummyDaemon()
    sever = {'severity_base_vector': u"\u0006"}

    with patch.object(logging.Logger, 'warning') as mock_warning:
        w.get_severities_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', severities=sever
        )

    assert_called_once(mock_warning)
||
| 410 | |||
def test_get_params_xml(self):
    """Params XML of the sample VT has the same length as the expected XML.

    Only the lengths are compared, not the string contents.
    """
    oid = '1.3.6.1.4.1.25623.1.0.100061'
    daemon = DummyDaemon()
    expected = (
        '<params>'
        '<param type="checkbox" id="2">'
        '<name>Do not randomize the order in which ports are '
        'scanned</name>'
        '<default>no</default>'
        '</param>'
        '<param type="entry" id="1">'
        '<name>Data length :</name>'
        '</param>'
        '</params>'
    )

    vt_params = daemon.VTS[oid].get('vt_params')
    actual = daemon.get_params_vt_as_xml_str(oid, vt_params)

    self.assertEqual(len(actual), len(expected))
||
| 431 | |||
def test_get_params_xml_failed(self):
    """Expect one logged warning for a param default containing U+0006.

    ``patch.object`` restores ``Logger.warning`` when the ``with`` block
    exits, so the mock does not stay installed on ``logging.Logger``.
    """
    w = DummyDaemon()
    params = {
        '1': {
            'id': '1',
            'type': 'entry',
            'default': u'\u0006',
            'name': 'dns-fuzz.timelimit',
            'description': 'Description',
        }
    }

    with patch.object(logging.Logger, 'warning') as mock_warning:
        w.get_params_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', params)

    assert_called_once(mock_warning)
||
| 448 | |||
def test_get_refs_xml(self):
    """References of the sample VT serialize to the expected XML string."""
    oid = '1.3.6.1.4.1.25623.1.0.100061'
    daemon = DummyDaemon()
    expected = '<refs><ref type="url" id="http://www.mantisbt.org/"/></refs>'

    vt_refs = daemon.VTS[oid].get('vt_refs')
    actual = daemon.get_refs_vt_as_xml_str(oid, vt_refs)

    self.assertEqual(actual, expected)
||
| 458 | |||
def test_get_dependencies_xml(self):
    """Two OID dependencies produce two ``<dependency>`` elements."""
    daemon = DummyDaemon()
    dependencies = [
        '1.3.6.1.4.1.25623.1.2.3.4',
        '1.3.6.1.4.1.25623.4.3.2.1',
    ]
    expected = (
        '<dependencies>'
        '<dependency vt_id="1.3.6.1.4.1.25623.1.2.3.4"/>'
        '<dependency vt_id="1.3.6.1.4.1.25623.4.3.2.1"/>'
        '</dependencies>'
    )

    actual = daemon.get_dependencies_vt_as_xml_str(
        '1.3.6.1.4.1.25623.1.0.100061', dependencies
    )

    self.assertEqual(actual, expected)
||
| 474 | |||
def test_get_dependencies_xml_missing_dep(self):
    """Only the OID entry is expected in the XML; the file name is not."""
    daemon = DummyDaemon()
    dependencies = ['1.3.6.1.4.1.25623.1.2.3.4', 'file_name.nasl']
    expected = (
        '<dependencies>'
        '<dependency vt_id="1.3.6.1.4.1.25623.1.2.3.4"/>'
        '</dependencies>'
    )

    actual = daemon.get_dependencies_vt_as_xml_str(
        '1.3.6.1.4.1.25623.1.0.100061', dependencies
    )

    self.assertEqual(actual, expected)
||
| 489 | |||
def test_get_dependencies_xml_failed(self):
    """Expect one logged error for a dependency entry containing U+0006.

    ``patch.object`` restores ``Logger.error`` when the ``with`` block
    exits, so the mock does not stay installed on ``logging.Logger``.
    """
    w = DummyDaemon()
    dep = [u"\u0006"]

    with patch.object(logging.Logger, 'error') as mock_error:
        w.get_dependencies_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', vt_dependencies=dep
        )

    assert_called_once(mock_error)
||
| 500 | |||
def test_get_ctime_xml(self):
    """Creation time of the sample VT serializes to the expected XML."""
    oid = '1.3.6.1.4.1.25623.1.0.100061'
    daemon = DummyDaemon()
    expected = '<creation_time>1237458156</creation_time>'

    creation_time = daemon.VTS[oid].get('creation_time')
    actual = daemon.get_creation_time_vt_as_xml_str(oid, creation_time)

    self.assertEqual(actual, expected)
||
| 512 | |||
def test_get_ctime_xml_failed(self):
    """Expect one logged warning for a creation time containing U+0006.

    ``patch.object`` restores ``Logger.warning`` when the ``with`` block
    exits, so the mock does not stay installed on ``logging.Logger``.
    """
    w = DummyDaemon()
    ctime = u'\u0006'

    with patch.object(logging.Logger, 'warning') as mock_warning:
        w.get_creation_time_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', vt_creation_time=ctime
        )

    assert_called_once(mock_warning)
||
| 523 | |||
def test_get_mtime_xml(self):
    """Modification time of the sample VT serializes to the expected XML."""
    oid = '1.3.6.1.4.1.25623.1.0.100061'
    daemon = DummyDaemon()
    expected = '<modification_time>1533906565</modification_time>'

    modification_time = daemon.VTS[oid].get('modification_time')
    actual = daemon.get_modification_time_vt_as_xml_str(
        oid, modification_time
    )

    self.assertEqual(actual, expected)
||
| 535 | |||
def test_get_mtime_xml_failed(self):
    """Expect one logged warning for a modification time containing U+0006.

    ``patch.object`` restores ``Logger.warning`` when the ``with`` block
    exits, so the mock does not stay installed on ``logging.Logger``.
    """
    w = DummyDaemon()
    mtime = u'\u0006'

    with patch.object(logging.Logger, 'warning') as mock_warning:
        w.get_modification_time_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', mtime
        )

    assert_called_once(mock_warning)
||
| 546 | |||
def test_get_summary_xml(self):
    """Summary of the sample VT serializes to the expected XML string."""
    oid = '1.3.6.1.4.1.25623.1.0.100061'
    daemon = DummyDaemon()
    expected = '<summary>some summary</summary>'

    summary_text = daemon.VTS[oid].get('summary')
    actual = daemon.get_summary_vt_as_xml_str(oid, summary_text)

    self.assertEqual(actual, expected)
||
| 558 | |||
def test_get_summary_xml_failed(self):
    """Expect one logged warning for a summary containing U+0006.

    ``patch.object`` restores ``Logger.warning`` when the ``with`` block
    exits, so the mock does not stay installed on ``logging.Logger``.
    """
    w = DummyDaemon()
    summary = u'\u0006'

    with patch.object(logging.Logger, 'warning') as mock_warning:
        w.get_summary_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', summary)

    assert_called_once(mock_warning)
||
| 567 | |||
def test_get_impact_xml(self):
    """Impact of the sample VT serializes to the expected XML string."""
    oid = '1.3.6.1.4.1.25623.1.0.100061'
    daemon = DummyDaemon()
    expected = '<impact>some impact</impact>'

    impact_text = daemon.VTS[oid].get('impact')
    actual = daemon.get_impact_vt_as_xml_str(oid, impact_text)

    self.assertEqual(actual, expected)
||
| 577 | |||
def test_get_impact_xml_failed(self):
    """Expect one logged warning for an impact text containing U+0006.

    ``patch.object`` restores ``Logger.warning`` when the ``with`` block
    exits, so the mock does not stay installed on ``logging.Logger``.
    """
    w = DummyDaemon()
    impact = u'\u0006'

    with patch.object(logging.Logger, 'warning') as mock_warning:
        w.get_impact_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', impact)

    assert_called_once(mock_warning)
||
| 586 | |||
def test_get_insight_xml(self):
    """Insight of the sample VT serializes to the expected XML string."""
    oid = '1.3.6.1.4.1.25623.1.0.100061'
    daemon = DummyDaemon()
    expected = '<insight>some insight</insight>'

    insight_text = daemon.VTS[oid].get('insight')
    actual = daemon.get_insight_vt_as_xml_str(oid, insight_text)

    self.assertEqual(actual, expected)
||
| 598 | |||
def test_get_insight_xml_failed(self):
    """Expect one logged warning for an insight text containing U+0006.

    ``patch.object`` restores ``Logger.warning`` when the ``with`` block
    exits, so the mock does not stay installed on ``logging.Logger``.
    """
    w = DummyDaemon()
    insight = u'\u0006'

    with patch.object(logging.Logger, 'warning') as mock_warning:
        w.get_insight_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', insight)

    assert_called_once(mock_warning)
||
| 607 | |||
def test_get_solution_xml(self):
    """Solution text, type and method serialize to the expected XML."""
    oid = '1.3.6.1.4.1.25623.1.0.100061'
    daemon = DummyDaemon()
    expected = (
        '<solution type="WillNotFix" method="DebianAPTUpgrade">'
        'some solution'
        '</solution>'
    )

    sample_vt = daemon.VTS[oid]
    actual = daemon.get_solution_vt_as_xml_str(
        oid,
        sample_vt.get('solution'),
        sample_vt.get('solution_type'),
        sample_vt.get('solution_method'),
    )

    self.assertEqual(actual, expected)
||
| 629 | |||
def test_get_solution_xml_failed(self):
    """Expect one logged warning for a solution text containing U+0006.

    ``patch.object`` restores ``Logger.warning`` when the ``with`` block
    exits, so the mock does not stay installed on ``logging.Logger``.
    """
    w = DummyDaemon()
    solution = u'\u0006'

    with patch.object(logging.Logger, 'warning') as mock_warning:
        w.get_solution_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', solution)

    assert_called_once(mock_warning)
||
| 638 | |||
def test_get_detection_xml(self):
    """The ``qod_type`` of the sample VT serializes to ``<detection>``."""
    oid = '1.3.6.1.4.1.25623.1.0.100061'
    daemon = DummyDaemon()
    expected = '<detection qod_type="remote_banner"/>'

    qod_type = daemon.VTS[oid].get('qod_type')
    actual = daemon.get_detection_vt_as_xml_str(oid, qod_type=qod_type)

    self.assertEqual(actual, expected)
||
| 651 | |||
def test_get_detection_xml_failed(self):
    """Expect one logged warning for a detection value containing U+0006.

    ``patch.object`` restores ``Logger.warning`` when the ``with`` block
    exits, so the mock does not stay installed on ``logging.Logger``.
    """
    w = DummyDaemon()
    detection = u'\u0006'

    with patch.object(logging.Logger, 'warning') as mock_warning:
        w.get_detection_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', detection
        )

    assert_called_once(mock_warning)
||
| 660 | |||
def test_get_affected_xml(self):
    """Affected text of the sample VT serializes to the expected XML."""
    oid = '1.3.6.1.4.1.25623.1.0.100061'
    daemon = DummyDaemon()
    expected = '<affected>some affection</affected>'

    affected_text = daemon.VTS[oid].get('affected')
    actual = daemon.get_affected_vt_as_xml_str(oid, affected=affected_text)

    self.assertEqual(actual, expected)
||
| 672 | |||
def test_get_affected_xml_failed(self):
    """Expect one logged warning for an affected text starting with U+0006.

    ``patch.object`` restores ``Logger.warning`` when the ``with`` block
    exits, so the mock does not stay installed on ``logging.Logger``.
    """
    w = DummyDaemon()
    affected = u"\u0006" + "affected"

    with patch.object(logging.Logger, 'warning') as mock_warning:
        w.get_affected_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', affected=affected
        )

    assert_called_once(mock_warning)
||
| 683 | |||
@patch('ospd_openvas.daemon.Path.exists')
@patch('ospd_openvas.daemon.OSPDopenvas.set_params_from_openvas_settings')
def test_feed_is_outdated_none(
    self, mock_set_params: MagicMock, mock_path_exists: MagicMock
):
    """``feed_is_outdated`` yields None when ``Path.exists`` is False.

    Both patched callables are expected to have been invoked exactly once.
    """
    daemon = DummyDaemon()
    daemon.scan_only_params['plugins_folder'] = '/foo/bar'

    # Make the patched Path.exists report a missing path.
    mock_path_exists.return_value = False

    outdated = daemon.feed_is_outdated('1234')

    self.assertIsNone(outdated)
    self.assertEqual(mock_set_params.call_count, 1)
    self.assertEqual(mock_path_exists.call_count, 1)
||
| 701 | |||
@patch('ospd_openvas.daemon.Path.exists')
@patch('ospd_openvas.daemon.Path.open')
def test_feed_is_outdated_true(
    self,
    mock_path_open: MagicMock,
    mock_path_exists: MagicMock,
):
    """``feed_is_outdated('1234')`` is truthy when the file says 1235.

    ``Path.open`` is patched to return a context manager whose
    ``__enter__`` yields an in-memory stream with the PLUGIN_SET line.
    """
    feed_info = 'PLUGIN_SET = "1235";'

    mock_path_exists.return_value = True
    open_context = MagicMock(name='Path open context manager')
    open_context.__enter__ = MagicMock(return_value=io.StringIO(feed_info))
    mock_path_open.return_value = open_context

    daemon = DummyDaemon()
    daemon.scan_only_params['plugins_folder'] = '/foo/bar'

    outdated = daemon.feed_is_outdated('1234')

    self.assertTrue(outdated)
    self.assertEqual(mock_path_exists.call_count, 1)
    self.assertEqual(mock_path_open.call_count, 1)
||
| 726 | |||
@patch('ospd_openvas.daemon.Path.exists')
@patch('ospd_openvas.daemon.Path.open')
def test_feed_is_outdated_false(
    self,
    mock_path_open: MagicMock,
    mock_path_exists: MagicMock,
):
    """``feed_is_outdated('1234')`` is falsy when the file also says 1234.

    ``Path.open`` is patched to return a context manager whose
    ``__enter__`` yields an in-memory stream with the PLUGIN_SET line.
    """
    read_data = 'PLUGIN_SET = "1234"'

    # Set once; the original assigned this same value twice in a row.
    mock_path_exists.return_value = True
    mock_read = MagicMock(name='Path open context manager')
    mock_read.__enter__ = MagicMock(return_value=io.StringIO(read_data))
    mock_path_open.return_value = mock_read

    w = DummyDaemon()
    w.scan_only_params['plugins_folder'] = '/foo/bar'

    ret = w.feed_is_outdated('1234')
    self.assertFalse(ret)

    self.assertEqual(mock_path_exists.call_count, 1)
    self.assertEqual(mock_path_open.call_count, 1)
||
| 750 | |||
def test_check_feed_cache_unavailable(self):
    """``check_feed`` is falsy and skips ``feed_is_outdated`` without cache.

    ``vts.is_cache_available`` is forced to False and ``feed_is_outdated``
    is replaced by a mock that must never be called.
    """
    daemon = DummyDaemon()
    daemon.vts.is_cache_available = False
    daemon.feed_is_outdated = Mock()

    feed_ok = daemon.check_feed()

    self.assertFalse(feed_ok)
    daemon.feed_is_outdated.assert_not_called()
||
| 759 | |||
@patch('ospd_openvas.daemon.ScanDB')
@patch('ospd_openvas.daemon.ResultList.add_scan_log_to_list')
def test_get_openvas_result(self, mock_add_scan_log_to_list, MockDBClass):
    """A LOG result line is passed to ``add_scan_log_to_list``.

    The mocked database returns a single ``LOG`` entry; the test checks the
    keyword arguments of the resulting ``add_scan_log_to_list`` call.
    """
    scan_id = '123-456'
    daemon = DummyDaemon()

    targets = OspRequest.process_target_element(daemon.create_xml_target())
    daemon.create_scan(scan_id, targets, None, [])

    MockDBClass.get_result.return_value = [
        "LOG||||||general/Host_Details||||||Host dead",
    ]
    mock_add_scan_log_to_list.return_value = None

    daemon.report_openvas_results(MockDBClass, scan_id, 'localhost')

    expected_kwargs = {
        'host': 'localhost',
        'hostname': '',
        'name': '',
        'port': 'general/Host_Details',
        'qod': '',
        'test_id': '',
        'uri': '',
        'value': 'Host dead',
    }
    mock_add_scan_log_to_list.assert_called_with(**expected_kwargs)
||
| 786 | |||
@patch('ospd_openvas.daemon.ScanDB')
@patch('ospd_openvas.daemon.ResultList.add_scan_error_to_list')
def test_get_openvas_result_host_deny(
    self, mock_add_scan_error_to_list, MockDBClass
):
    """An ERRMSG result line is passed to ``add_scan_error_to_list``.

    The mocked database returns a single ``ERRMSG`` entry for 127.0.0.1;
    the test checks the keyword arguments of the resulting call.
    """
    scan_id = '123-456'
    daemon = DummyDaemon()

    targets = OspRequest.process_target_element(daemon.create_xml_target())
    daemon.create_scan(scan_id, targets, None, [])

    MockDBClass.get_result.return_value = [
        "ERRMSG|||127.0.0.1|||||||||Host access denied.",
    ]
    mock_add_scan_error_to_list.return_value = None

    daemon.report_openvas_results(MockDBClass, scan_id, '')

    expected_kwargs = {
        'host': '127.0.0.1',
        'hostname': '127.0.0.1',
        'name': '',
        'port': '',
        'test_id': '',
        'uri': '',
        'value': 'Host access denied.',
    }
    mock_add_scan_error_to_list.assert_called_with(**expected_kwargs)
||
| 814 | |||
@patch('ospd_openvas.daemon.ScanDB')
def test_get_openvas_result_dead_hosts(self, MockDBClass):
    # A DEADHOST entry must update the scan collection's dead host amount
    # with the integer count carried in the entry.
    scan_id = '123-456'
    daemon = DummyDaemon()
    xml_target = daemon.create_xml_target()
    daemon.create_scan(
        scan_id, OspRequest.process_target_element(xml_target), None, []
    )

    MockDBClass.get_result.return_value = ["DEADHOST||| ||| ||| |||4"]
    # Replace the collection method with a mock so the call can be inspected.
    dead_hosts_mock = MagicMock()
    daemon.scan_collection.set_amount_dead_hosts = dead_hosts_mock

    daemon.report_openvas_results(MockDBClass, scan_id, 'localhost')

    dead_hosts_mock.assert_called_with(scan_id, total_dead=4)
||
| 833 | |||
@patch('ospd_openvas.daemon.ScanDB')
def test_get_openvas_result_hosts_count(self, MockDBClass):
    # A HOSTS_COUNT entry must be handed to set_scan_total_hosts as an
    # integer, together with the scan id.
    scan_id = '123-456'
    daemon = DummyDaemon()
    xml_target = daemon.create_xml_target()
    daemon.create_scan(
        scan_id, OspRequest.process_target_element(xml_target), None, []
    )

    MockDBClass.get_result.return_value = ["HOSTS_COUNT||| ||| ||| |||4"]
    # Replace the daemon method with a mock so the call can be inspected.
    total_hosts_mock = MagicMock()
    daemon.set_scan_total_hosts = total_hosts_mock

    daemon.report_openvas_results(MockDBClass, scan_id, 'localhost')

    total_hosts_mock.assert_called_with(scan_id, 4)
||
| 852 | |||
@patch('ospd_openvas.daemon.ScanDB')
@patch('ospd_openvas.daemon.ResultList.add_scan_alarm_to_list')
def test_result_without_vt_oid(
    self, mock_add_scan_alarm_to_list, MockDBClass
):
    # Reporting the ALARM entry below is expected to log a warning once
    # (per the test name, because the result has no usable VT OID).
    w = DummyDaemon()

    # Patch Logger.warning only for the duration of this test. Assigning a
    # Mock directly to logging.Logger.warning would never be undone and
    # would silently disable warnings for every test that runs afterwards.
    with patch.object(
        logging.Logger, 'warning', new_callable=Mock
    ) as mock_warning:
        target_element = w.create_xml_target()
        targets = OspRequest.process_target_element(target_element)
        w.create_scan('123-456', targets, None, [])
        w.scan_collection.scans_table['123-456']['results'] = []

        # The trailing None is part of the data returned by the mocked DB.
        results = ["ALARM||| ||| ||| |||some alarm|||path", None]
        MockDBClass.get_result.return_value = results
        mock_add_scan_alarm_to_list.return_value = None

        w.report_openvas_results(MockDBClass, '123-456', 'localhost')

        assert_called_once(mock_warning)
||
| 872 | |||
@patch('ospd_openvas.db.KbDB')
def test_openvas_is_alive_already_stopped(self, mock_db):
    # With the KB reporting the scan as stopped, the check is expected to
    # return True for the given pid / scan id.
    mock_db.scan_is_stopped.return_value = True
    daemon = DummyDaemon()

    alive = daemon.is_openvas_process_alive(mock_db, '1234', 'a1-b2-c3-d4')

    self.assertTrue(alive)
||
| 881 | |||
@patch('ospd_openvas.db.KbDB')
def test_openvas_is_alive_still(self, mock_db):
    # With the KB reporting the scan as not stopped, the check is expected
    # to return False for the given pid / scan id.
    mock_db.scan_is_stopped.return_value = False
    daemon = DummyDaemon()

    alive = daemon.is_openvas_process_alive(mock_db, '1234', 'a1-b2-c3-d4')

    self.assertFalse(alive)
||
| 890 | |||
@patch('ospd_openvas.daemon.OSPDaemon.set_scan_host_progress')
def test_update_progress(self, mock_set_scan_host_progress):
    # The progress message '0/-1' must be forwarded to
    # OSPDaemon.set_scan_host_progress as progress=-1 for the given host.
    scan_id = '123-456'
    daemon = DummyDaemon()
    mock_set_scan_host_progress.return_value = None

    xml_target = daemon.create_xml_target()
    daemon.create_scan(
        scan_id, OspRequest.process_target_element(xml_target), None, []
    )
    daemon.update_progress(scan_id, 'localhost', '0/-1')

    mock_set_scan_host_progress.assert_called_with(
        scan_id, host='localhost', progress=-1
    )
||
| 907 | |||
| 908 | |||
class TestFilters(TestCase):
    # Tests for OpenVasVtsFilter: timestamp formatting and filtering a VT
    # collection by modification time.

    def test_format_vt_modification_time(self):
        # An epoch-seconds string must be rendered as a 14-digit timestamp.
        vts_filter = OpenVasVtsFilter(None)
        self.assertEqual(
            vts_filter.format_vt_modification_time('1517443741'),
            "20180201000901",
        )

    def test_get_filtered_vts_false(self):
        # The known VT must be excluded by "modification_time<10".
        daemon = DummyDaemon()
        known_oid = '1.3.6.1.4.1.25623.1.0.100061'
        candidates = ['1234', known_oid]

        vts_filter = OpenVasVtsFilter(daemon.nvti)
        matched = vts_filter.get_filtered_vts_list(
            candidates, "modification_time<10"
        )

        self.assertNotIn('1.3.6.1.4.1.25623.1.0.100061', matched)

    def test_get_filtered_vts_true(self):
        # The known VT must be selected by "modification_time>10".
        daemon = DummyDaemon()
        known_oid = '1.3.6.1.4.1.25623.1.0.100061'
        candidates = ['1234', known_oid]

        vts_filter = OpenVasVtsFilter(daemon.nvti)
        matched = vts_filter.get_filtered_vts_list(
            candidates, "modification_time>10"
        )

        self.assertIn('1.3.6.1.4.1.25623.1.0.100061', matched)
||
| 935 |