| Total Complexity | 40 |
| Total Lines | 723 |
| Duplicated Lines | 6.5 % |
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like tests.test_daemon often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | # -*- coding: utf-8 -*- |
||
| 2 | # Copyright (C) 2014-2020 Greenbone Networks GmbH |
||
| 3 | # |
||
| 4 | # SPDX-License-Identifier: AGPL-3.0-or-later |
||
| 5 | # |
||
| 6 | # This program is free software: you can redistribute it and/or modify |
||
| 7 | # it under the terms of the GNU Affero General Public License as |
||
| 8 | # published by the Free Software Foundation, either version 3 of the |
||
| 9 | # License, or (at your option) any later version. |
||
| 10 | # |
||
| 11 | # This program is distributed in the hope that it will be useful, |
||
| 12 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
||
| 13 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||
| 14 | # GNU Affero General Public License for more details. |
||
| 15 | # |
||
| 16 | # You should have received a copy of the GNU Affero General Public License |
||
| 17 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
||
| 18 | |||
| 19 | |||
| 20 | # pylint: disable=invalid-name,line-too-long,no-value-for-parameter |
||
| 21 | |||
| 22 | """ Unit Test for ospd-openvas """ |
||
| 23 | |||
| 24 | import io |
||
| 25 | import logging |
||
| 26 | |||
| 27 | from unittest import TestCase |
||
| 28 | from unittest.mock import patch, Mock, MagicMock |
||
| 29 | |||
| 30 | from ospd.vts import Vts |
||
| 31 | from ospd.protocol import OspRequest |
||
| 32 | |||
| 33 | from tests.dummydaemon import DummyDaemon |
||
| 34 | from tests.helper import assert_called_once |
||
| 35 | |||
| 36 | from ospd_openvas.daemon import OSPD_PARAMS, OpenVasVtsFilter |
||
| 37 | from ospd_openvas.openvas import Openvas |
||
| 38 | |||
def _expected_param(name, kind, default, mandatory, description=''):
    """Return one expected parameter entry for the setting called *name*.

    The entry repeats its own key under 'name'; *kind* is stored under
    'type', and *default*, *mandatory* and *description* are stored as given.
    """
    return {
        'type': kind,
        'name': name,
        'default': default,
        'mandatory': mandatory,
        'description': description,
    }


# Expected parameter table, keyed by setting name. The comprehension keeps the
# entries in the order they are listed here.
OSPD_PARAMS_OUT = {
    entry['name']: entry
    for entry in (
        _expected_param(
            'auto_enable_dependencies',
            'boolean',
            1,
            1,
            'Automatically enable the plugins that are depended on',
        ),
        _expected_param(
            'cgi_path',
            'string',
            '/cgi-bin:/scripts',
            1,
            'Look for default CGIs in /cgi-bin and /scripts',
        ),
        _expected_param(
            'checks_read_timeout',
            'integer',
            5,
            1,
            'Number of seconds that the security checks will '
            'wait for when doing a recv()',
        ),
        _expected_param('drop_privileges', 'boolean', 0, 1),
        _expected_param('network_scan', 'boolean', 0, 1),
        _expected_param(
            'non_simult_ports',
            'string',
            '22',
            1,
            'Prevent to make two connections on the same given '
            'ports at the same time.',
        ),
        _expected_param(
            'open_sock_max_attempts',
            'integer',
            5,
            0,
            'Number of unsuccessful retries to open the socket '
            'before to set the port as closed.',
        ),
        _expected_param(
            'timeout_retry',
            'integer',
            5,
            0,
            'Number of retries when a socket connection attempt '
            'timesout.',
        ),
        _expected_param(
            'optimize_test',
            'integer',
            5,
            0,
            'By default, openvas does not trust the remote '
            'host banners.',
        ),
        _expected_param(
            'plugins_timeout',
            'integer',
            5,
            0,
            'This is the maximum lifetime, in seconds of a plugin.',
        ),
        _expected_param('report_host_details', 'boolean', 1, 1),
        _expected_param(
            'safe_checks',
            'boolean',
            1,
            1,
            'Disable the plugins with potential to crash '
            'the remote services',
        ),
        _expected_param(
            'scanner_plugins_timeout',
            'integer',
            36000,
            1,
            'Like plugins_timeout, but for ACT_SCANNER plugins.',
        ),
        _expected_param(
            'time_between_request',
            'integer',
            0,
            0,
            'Allow to set a wait time between two actions '
            '(open, send, close).',
        ),
        _expected_param('unscanned_closed', 'boolean', 1, 1),
        _expected_param('unscanned_closed_udp', 'boolean', 1, 1),
        _expected_param(
            'expand_vhosts',
            'boolean',
            1,
            0,
            'Whether to expand the target hosts '
            'list of vhosts with values gathered from sources '
            'such as reverse-lookup queries and VT checks '
            'for SSL/TLS certificates.',
        ),
        _expected_param(
            'test_empty_vhost',
            'boolean',
            0,
            0,
            'If set to yes, the scanner will '
            'also test the target by using empty vhost value '
            'in addition to the targets associated vhost values.',
        ),
    )
}
||
| 179 | |||
| 180 | |||
class TestOspdOpenvas(TestCase):
    """Unit tests for the ospd-openvas daemon, driven through DummyDaemon.

    The ``*_failed`` tests patch ``logging.Logger`` methods only for the
    duration of the call under test, so the mock does not leak into other
    tests running in the same process.
    """

    @staticmethod
    def _daemon_with_scan(scan_id='123-456'):
        """Return a DummyDaemon holding one scan created from its XML target."""
        w = DummyDaemon()
        target_element = w.create_xml_target()
        targets = OspRequest.process_target_element(target_element)
        w.create_scan(scan_id, targets, None, [])
        return w

    @patch('ospd_openvas.daemon.Openvas')
    def test_set_params_from_openvas_settings(self, mock_openvas: Openvas):
        """get_settings is read once; OSPD_PARAMS and scan_only_params are set."""
        mock_openvas.get_settings.return_value = {
            'non_simult_ports': '22',
            'plugins_folder': '/foo/bar',
        }
        w = DummyDaemon()
        w.set_params_from_openvas_settings()

        self.assertEqual(mock_openvas.get_settings.call_count, 1)
        self.assertEqual(OSPD_PARAMS, OSPD_PARAMS_OUT)
        self.assertEqual(w.scan_only_params.get('plugins_folder'), '/foo/bar')

    @patch('ospd_openvas.daemon.Openvas')
    def test_sudo_available(self, mock_openvas):
        """sudo_available is truthy when the patched check_sudo returns True."""
        mock_openvas.check_sudo.return_value = True

        w = DummyDaemon()
        # NOTE(review): presumably _sudo_available caches the property value;
        # it is cleared before the property is read - confirm in the daemon.
        w._sudo_available = None  # pylint: disable=protected-access
        w.sudo_available  # pylint: disable=pointless-statement

        self.assertTrue(w.sudo_available)

    def test_get_custom_xml(self):
        """The custom XML of the sample VT has the expected length."""
        out = (
            '<custom>'
            '<required_ports>Services/www, 80</required_ports>'
            '<category>3</category>'
            '<excluded_keys>Settings/disable_cgi_scanning</excluded_keys>'
            '<family>Product detection</family>'
            '<filename>mantis_detect.nasl</filename>'
            '<timeout>0</timeout>'
            '</custom>'
        )
        w = DummyDaemon()

        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
        res = w.get_custom_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', vt.get('custom')
        )
        # Only the lengths are compared, not the text itself.
        # NOTE(review): presumably because element order is not fixed - confirm.
        self.assertEqual(len(res), len(out))

    def test_get_custom_xml_failed(self):
        """A custom value containing U+0006 triggers exactly one warning."""
        w = DummyDaemon()
        custom = {'a': u"\u0006"}

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_custom_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', custom=custom
            )

        assert_called_once(mock_warning)

    def test_get_severities_xml(self):
        """The severities of the sample VT are rendered as <severities> XML."""
        w = DummyDaemon()

        out = (
            '<severities>'
            '<severity type="cvss_base_v2">'
            'AV:N/AC:L/Au:N/C:N/I:N/A:N'
            '</severity>'
            '</severities>'
        )
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
        severities = vt.get('severities')
        res = w.get_severities_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', severities
        )

        self.assertEqual(res, out)

    def test_get_severities_xml_failed(self):
        """A severity vector containing U+0006 triggers exactly one warning."""
        w = DummyDaemon()
        sever = {'severity_base_vector': u"\u0006"}

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_severities_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', severities=sever
            )

        assert_called_once(mock_warning)

    def test_get_params_xml(self):
        """The params XML of the sample VT has the expected length."""
        w = DummyDaemon()
        out = (
            '<params>'
            '<param type="checkbox" id="2">'
            '<name>Do not randomize the order in which ports are '
            'scanned</name>'
            '<default>no</default>'
            '</param>'
            '<param type="entry" id="1">'
            '<name>Data length :</name>'
            '</param>'
            '</params>'
        )

        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
        params = vt.get('vt_params')
        res = w.get_params_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', params)

        # Only the lengths are compared, not the text itself.
        self.assertEqual(len(res), len(out))

    def test_get_params_xml_failed(self):
        """A param default containing U+0006 triggers exactly one warning."""
        w = DummyDaemon()
        params = {
            '1': {
                'id': '1',
                'type': 'entry',
                'default': u'\u0006',
                'name': 'dns-fuzz.timelimit',
                'description': 'Description',
            }
        }

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_params_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', params)

        assert_called_once(mock_warning)

    def test_get_refs_xml(self):
        """The references of the sample VT are rendered as <refs> XML."""
        w = DummyDaemon()

        out = '<refs><ref type="url" id="http://www.mantisbt.org/"/></refs>'
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
        refs = vt.get('vt_refs')
        res = w.get_refs_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', refs)

        self.assertEqual(res, out)

    def test_get_dependencies_xml(self):
        """A list of VT ids is rendered as <dependencies> XML, in order."""
        w = DummyDaemon()

        out = (
            '<dependencies>'
            '<dependency vt_id="1.2.3.4"/><dependency vt_id="4.3.2.1"/>'
            '</dependencies>'
        )
        dep = ['1.2.3.4', '4.3.2.1']
        res = w.get_dependencies_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', dep
        )

        self.assertEqual(res, out)

    def test_get_dependencies_xml_failed(self):
        """A dependency id containing U+0006 triggers exactly one error log."""
        w = DummyDaemon()
        dep = [u"\u0006"]

        # Unlike the other failure tests, this one expects Logger.error.
        with patch.object(logging.Logger, 'error') as mock_error:
            w.get_dependencies_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', vt_dependencies=dep
            )

        assert_called_once(mock_error)

    def test_get_ctime_xml(self):
        """The creation time of the sample VT is rendered as XML."""
        w = DummyDaemon()

        out = '<creation_time>1237458156</creation_time>'
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
        ctime = vt.get('creation_time')
        res = w.get_creation_time_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', ctime
        )

        self.assertEqual(res, out)

    def test_get_ctime_xml_failed(self):
        """A creation time of U+0006 triggers exactly one warning."""
        w = DummyDaemon()
        ctime = u'\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_creation_time_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', vt_creation_time=ctime
            )

        assert_called_once(mock_warning)

    def test_get_mtime_xml(self):
        """The modification time of the sample VT is rendered as XML."""
        w = DummyDaemon()

        out = '<modification_time>1533906565</modification_time>'
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
        mtime = vt.get('modification_time')
        res = w.get_modification_time_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', mtime
        )

        self.assertEqual(res, out)

    def test_get_mtime_xml_failed(self):
        """A modification time of U+0006 triggers exactly one warning."""
        w = DummyDaemon()
        mtime = u'\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_modification_time_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', mtime
            )

        assert_called_once(mock_warning)

    def test_get_summary_xml(self):
        """The summary of the sample VT is rendered as <summary> XML."""
        w = DummyDaemon()

        out = '<summary>some summary</summary>'
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
        summary = vt.get('summary')
        res = w.get_summary_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', summary
        )

        self.assertEqual(res, out)

    def test_get_summary_xml_failed(self):
        """A summary of U+0006 triggers exactly one warning."""
        w = DummyDaemon()
        summary = u'\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_summary_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', summary)

        assert_called_once(mock_warning)

    def test_get_impact_xml(self):
        """The impact of the sample VT is rendered as <impact> XML."""
        w = DummyDaemon()

        out = '<impact>some impact</impact>'
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
        impact = vt.get('impact')
        res = w.get_impact_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', impact)

        self.assertEqual(res, out)

    def test_get_impact_xml_failed(self):
        """An impact of U+0006 triggers exactly one warning."""
        w = DummyDaemon()
        impact = u'\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_impact_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', impact)

        assert_called_once(mock_warning)

    def test_get_insight_xml(self):
        """The insight of the sample VT is rendered as <insight> XML."""
        w = DummyDaemon()

        out = '<insight>some insight</insight>'
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
        insight = vt.get('insight')
        res = w.get_insight_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', insight
        )

        self.assertEqual(res, out)

    def test_get_insight_xml_failed(self):
        """An insight of U+0006 triggers exactly one warning."""
        w = DummyDaemon()
        insight = u'\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_insight_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', insight)

        assert_called_once(mock_warning)

    def test_get_solution_xml(self):
        """Solution text, type and method are rendered as <solution> XML."""
        w = DummyDaemon()

        out = (
            '<solution type="WillNotFix" method="DebianAPTUpgrade">'
            'some solution'
            '</solution>'
        )
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
        solution = vt.get('solution')
        solution_type = vt.get('solution_type')
        solution_method = vt.get('solution_method')

        res = w.get_solution_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061',
            solution,
            solution_type,
            solution_method,
        )

        self.assertEqual(res, out)

    def test_get_solution_xml_failed(self):
        """A solution of U+0006 triggers exactly one warning."""
        w = DummyDaemon()
        solution = u'\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_solution_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', solution)

        assert_called_once(mock_warning)

    def test_get_detection_xml(self):
        """The qod_type of the sample VT is rendered as <detection> XML."""
        w = DummyDaemon()

        out = '<detection qod_type="remote_banner"/>'
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
        detection_type = vt.get('qod_type')

        res = w.get_detection_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', qod_type=detection_type
        )

        self.assertEqual(res, out)

    def test_get_detection_xml_failed(self):
        """A detection value of U+0006 triggers exactly one warning."""
        w = DummyDaemon()
        detection = u'\u0006'

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_detection_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', detection)

        assert_called_once(mock_warning)

    def test_get_affected_xml(self):
        """The affected text of the sample VT is rendered as <affected> XML."""
        w = DummyDaemon()
        out = '<affected>some affection</affected>'
        vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061']
        affected = vt.get('affected')

        res = w.get_affected_vt_as_xml_str(
            '1.3.6.1.4.1.25623.1.0.100061', affected=affected
        )

        self.assertEqual(res, out)

    def test_get_affected_xml_failed(self):
        """An affected text starting with U+0006 triggers exactly one warning."""
        w = DummyDaemon()
        affected = u"\u0006" + "affected"

        with patch.object(logging.Logger, 'warning') as mock_warning:
            w.get_affected_vt_as_xml_str(
                '1.3.6.1.4.1.25623.1.0.100061', affected=affected
            )

        assert_called_once(mock_warning)

    @patch('ospd_openvas.daemon.Path.exists')
    @patch('ospd_openvas.daemon.OSPDopenvas.set_params_from_openvas_settings')
    def test_feed_is_outdated_none(
        self, mock_set_params: MagicMock, mock_path_exists: MagicMock
    ):
        """feed_is_outdated returns None when Path.exists reports False."""
        w = DummyDaemon()

        w.scan_only_params['plugins_folder'] = '/foo/bar'

        # Return None
        mock_path_exists.return_value = False

        ret = w.feed_is_outdated('1234')
        self.assertIsNone(ret)

        self.assertEqual(mock_set_params.call_count, 1)
        self.assertEqual(mock_path_exists.call_count, 1)

    @patch('ospd_openvas.daemon.Path.exists')
    @patch('ospd_openvas.daemon.Path.open')
    def test_feed_is_outdated_true(
        self, mock_path_open: MagicMock, mock_path_exists: MagicMock,
    ):
        """PLUGIN_SET 1235 in the mocked file makes version 1234 outdated."""
        read_data = 'PLUGIN_SET = "1235";'

        mock_path_exists.return_value = True
        # Path.open() is used as a context manager; __enter__ yields the data.
        mock_read = MagicMock(name='Path open context manager')
        mock_read.__enter__ = MagicMock(return_value=io.StringIO(read_data))
        mock_path_open.return_value = mock_read

        w = DummyDaemon()

        # Return True
        w.scan_only_params['plugins_folder'] = '/foo/bar'

        ret = w.feed_is_outdated('1234')
        self.assertTrue(ret)

        self.assertEqual(mock_path_exists.call_count, 1)
        self.assertEqual(mock_path_open.call_count, 1)

    @patch('ospd_openvas.daemon.Path.exists')
    @patch('ospd_openvas.daemon.Path.open')
    def test_feed_is_outdated_false(
        self, mock_path_open: MagicMock, mock_path_exists: MagicMock,
    ):
        """PLUGIN_SET 1234 in the mocked file means version 1234 is current."""
        read_data = 'PLUGIN_SET = "1234"'

        mock_path_exists.return_value = True
        # Path.open() is used as a context manager; __enter__ yields the data.
        mock_read = MagicMock(name='Path open context manager')
        mock_read.__enter__ = MagicMock(return_value=io.StringIO(read_data))
        mock_path_open.return_value = mock_read

        w = DummyDaemon()
        w.scan_only_params['plugins_folder'] = '/foo/bar'

        ret = w.feed_is_outdated('1234')
        self.assertFalse(ret)

        self.assertEqual(mock_path_exists.call_count, 1)
        self.assertEqual(mock_path_open.call_count, 1)

    def test_check_feed_cache_unavailable(self):
        """check_feed is falsy and skips feed_is_outdated without a cache."""
        w = DummyDaemon()
        w.vts.is_cache_available = False
        w.feed_is_outdated = Mock()
        res = w.check_feed()

        self.assertFalse(res)
        w.feed_is_outdated.assert_not_called()

    @patch('ospd_openvas.daemon.ScanDB')
    @patch('ospd_openvas.daemon.ResultList.add_scan_log_to_list')
    def test_get_openvas_result(self, mock_add_scan_log_to_list, MockDBClass):
        """A LOG result line is forwarded to add_scan_log_to_list."""
        w = self._daemon_with_scan()

        # The trailing None is the second get_result() return value.
        results = ["LOG||| |||general/Host_Details||| |||Host dead", None]
        mock_db = MockDBClass.return_value
        mock_db.get_result.side_effect = results
        mock_add_scan_log_to_list.return_value = None

        w.report_openvas_results(mock_db, '123-456', 'localhost')
        mock_add_scan_log_to_list.assert_called_with(
            host='localhost',
            hostname='',
            name='',
            port='general/Host_Details',
            qod='',
            test_id='',
            value='Host dead',
        )

    @patch('ospd_openvas.daemon.ScanDB')
    @patch('ospd_openvas.daemon.ResultList.add_scan_error_to_list')
    def test_get_openvas_result_host_deny(
        self, mock_add_scan_error_to_list, MockDBClass
    ):
        """An ERRMSG result line is forwarded to add_scan_error_to_list."""
        w = self._daemon_with_scan()

        results = ["ERRMSG|||127.0.0.1|||||| |||Host access denied.", None]
        mock_db = MockDBClass.return_value
        mock_db.get_result.side_effect = results
        mock_add_scan_error_to_list.return_value = None

        w.report_openvas_results(mock_db, '123-456', '')
        mock_add_scan_error_to_list.assert_called_with(
            host='127.0.0.1',
            hostname='127.0.0.1',
            name='',
            port='',
            test_id='',
            value='Host access denied.',
        )

    @patch('ospd_openvas.daemon.ScanDB')
    def test_get_openvas_result_dead_hosts(self, MockDBClass):
        """A DEADHOST result line sets the scan's dead-host count to 4."""
        w = self._daemon_with_scan()

        results = ["DEADHOST||| ||| ||| |||4", None]
        mock_db = MockDBClass.return_value
        mock_db.get_result.side_effect = results
        w.scan_collection.set_amount_dead_hosts = MagicMock()

        w.report_openvas_results(mock_db, '123-456', 'localhost')
        w.scan_collection.set_amount_dead_hosts.assert_called_with(
            '123-456', total_dead=4,
        )

    @patch('ospd_openvas.db.KbDB')
    def test_openvas_is_alive_already_stopped(self, mock_db):
        """is_openvas_process_alive is truthy when scan_is_stopped is True."""
        w = DummyDaemon()
        mock_db.scan_is_stopped.return_value = True
        ret = w.is_openvas_process_alive(mock_db, '1234', 'a1-b2-c3-d4')

        self.assertTrue(ret)

    @patch('ospd_openvas.db.KbDB')
    def test_openvas_is_alive_still(self, mock_db):
        """is_openvas_process_alive is falsy when scan_is_stopped is False."""
        w = DummyDaemon()
        mock_db.scan_is_stopped.return_value = False
        ret = w.is_openvas_process_alive(mock_db, '1234', 'a1-b2-c3-d4')

        self.assertFalse(ret)

    @patch('ospd_openvas.daemon.OSPDaemon.set_scan_host_progress')
    def test_update_progress(self, mock_set_scan_host_progress):
        """The message '0/-1' is reported as progress=-1 for the host."""
        mock_set_scan_host_progress.return_value = None

        msg = '0/-1'
        w = self._daemon_with_scan()
        w.update_progress('123-456', 'localhost', msg)

        mock_set_scan_host_progress.assert_called_with(
            '123-456', host='localhost', progress=-1
        )
||
| 695 | |||
| 696 | |||
class TestFilters(TestCase):
    """Tests for OpenVasVtsFilter: time formatting and VT list filtering."""

    def test_format_vt_modification_time(self):
        """The string '1517443741' is formatted as '20180201000901'."""
        vts_filter = OpenVasVtsFilter(None)
        self.assertEqual(
            vts_filter.format_vt_modification_time('1517443741'),
            "20180201000901",
        )

    def test_get_filtered_vts_false(self):
        """The sample VT is excluded by the filter 'modification_time<10'."""
        daemon = DummyDaemon()
        candidates = ['1234', '1.3.6.1.4.1.25623.1.0.100061']

        filtered = OpenVasVtsFilter(daemon.nvti).get_filtered_vts_list(
            candidates, "modification_time<10"
        )
        self.assertNotIn('1.3.6.1.4.1.25623.1.0.100061', filtered)

    def test_get_filtered_vts_true(self):
        """The sample VT is kept by the filter 'modification_time>10'."""
        daemon = DummyDaemon()
        candidates = ['1234', '1.3.6.1.4.1.25623.1.0.100061']

        filtered = OpenVasVtsFilter(daemon.nvti).get_filtered_vts_list(
            candidates, "modification_time>10"
        )
        self.assertIn('1.3.6.1.4.1.25623.1.0.100061', filtered)
||
| 723 |