| Total Complexity | 65 |
| Total Lines | 847 |
| Duplicated Lines | 8.74 % |
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like tests.test_daemon often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | # -*- coding: utf-8 -*- |
||
| 2 | # Copyright (C) 2018-2019 Greenbone Networks GmbH |
||
| 3 | # |
||
| 4 | # SPDX-License-Identifier: GPL-2.0-or-later |
||
| 5 | # |
||
| 6 | # This program is free software; you can redistribute it and/or |
||
| 7 | # modify it under the terms of the GNU General Public License |
||
| 8 | # as published by the Free Software Foundation; either version 2 |
||
| 9 | # of the License, or (at your option) any later version. |
||
| 10 | # |
||
| 11 | # This program is distributed in the hope that it will be useful, |
||
| 12 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
||
| 13 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||
| 14 | # GNU General Public License for more details. |
||
| 15 | # |
||
| 16 | # You should have received a copy of the GNU General Public License |
||
| 17 | # along with this program; if not, write to the Free Software |
||
| 18 | # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA |
||
| 19 | |||
| 20 | # pylint: disable=invalid-name,line-too-long |
||
| 21 | |||
| 22 | """ Unit Test for ospd-openvas """ |
||
| 23 | |||
| 24 | from pathlib import Path |
||
| 25 | |||
| 26 | from unittest import TestCase |
||
| 27 | from unittest.mock import patch, Mock, MagicMock |
||
| 28 | |||
| 29 | from ospd.vts import Vts |
||
| 30 | |||
| 31 | import io |
||
| 32 | import logging |
||
| 33 | |||
| 34 | from tests.dummydaemon import DummyDaemon |
||
| 35 | |||
| 36 | from ospd_openvas.daemon import OSPD_PARAMS, OpenVasVtsFilter |
||
| 37 | from ospd_openvas.openvas import Openvas |
||
| 38 | |||
| 39 | OSPD_PARAMS_OUT = { |
||
| 40 | 'auto_enable_dependencies': { |
||
| 41 | 'type': 'boolean', |
||
| 42 | 'name': 'auto_enable_dependencies', |
||
| 43 | 'default': 1, |
||
| 44 | 'mandatory': 1, |
||
| 45 | 'description': 'Automatically enable the plugins that are depended on', |
||
| 46 | }, |
||
| 47 | 'cgi_path': { |
||
| 48 | 'type': 'string', |
||
| 49 | 'name': 'cgi_path', |
||
| 50 | 'default': '/cgi-bin:/scripts', |
||
| 51 | 'mandatory': 1, |
||
| 52 | 'description': 'Look for default CGIs in /cgi-bin and /scripts', |
||
| 53 | }, |
||
| 54 | 'checks_read_timeout': { |
||
| 55 | 'type': 'integer', |
||
| 56 | 'name': 'checks_read_timeout', |
||
| 57 | 'default': 5, |
||
| 58 | 'mandatory': 1, |
||
| 59 | 'description': 'Number of seconds that the security checks will ' |
||
| 60 | 'wait for when doing a recv()', |
||
| 61 | }, |
||
| 62 | 'drop_privileges': { |
||
| 63 | 'type': 'boolean', |
||
| 64 | 'name': 'drop_privileges', |
||
| 65 | 'default': 0, |
||
| 66 | 'mandatory': 1, |
||
| 67 | 'description': '', |
||
| 68 | }, |
||
| 69 | 'network_scan': { |
||
| 70 | 'type': 'boolean', |
||
| 71 | 'name': 'network_scan', |
||
| 72 | 'default': 0, |
||
| 73 | 'mandatory': 1, |
||
| 74 | 'description': '', |
||
| 75 | }, |
||
| 76 | 'non_simult_ports': { |
||
| 77 | 'type': 'string', |
||
| 78 | 'name': 'non_simult_ports', |
||
| 79 | 'default': '22', |
||
| 80 | 'mandatory': 1, |
||
| 81 | 'description': 'Prevent to make two connections on the same given ' |
||
| 82 | 'ports at the same time.', |
||
| 83 | }, |
||
| 84 | 'open_sock_max_attempts': { |
||
| 85 | 'type': 'integer', |
||
| 86 | 'name': 'open_sock_max_attempts', |
||
| 87 | 'default': 5, |
||
| 88 | 'mandatory': 0, |
||
| 89 | 'description': 'Number of unsuccessful retries to open the socket ' |
||
| 90 | 'before to set the port as closed.', |
||
| 91 | }, |
||
| 92 | 'timeout_retry': { |
||
| 93 | 'type': 'integer', |
||
| 94 | 'name': 'timeout_retry', |
||
| 95 | 'default': 5, |
||
| 96 | 'mandatory': 0, |
||
| 97 | 'description': 'Number of retries when a socket connection attempt ' |
||
| 98 | 'timesout.', |
||
| 99 | }, |
||
| 100 | 'optimize_test': { |
||
| 101 | 'type': 'integer', |
||
| 102 | 'name': 'optimize_test', |
||
| 103 | 'default': 5, |
||
| 104 | 'mandatory': 0, |
||
| 105 | 'description': 'By default, openvas does not trust the remote ' |
||
| 106 | 'host banners.', |
||
| 107 | }, |
||
| 108 | 'plugins_timeout': { |
||
| 109 | 'type': 'integer', |
||
| 110 | 'name': 'plugins_timeout', |
||
| 111 | 'default': 5, |
||
| 112 | 'mandatory': 0, |
||
| 113 | 'description': 'This is the maximum lifetime, in seconds of a plugin.', |
||
| 114 | }, |
||
| 115 | 'report_host_details': { |
||
| 116 | 'type': 'boolean', |
||
| 117 | 'name': 'report_host_details', |
||
| 118 | 'default': 1, |
||
| 119 | 'mandatory': 1, |
||
| 120 | 'description': '', |
||
| 121 | }, |
||
| 122 | 'safe_checks': { |
||
| 123 | 'type': 'boolean', |
||
| 124 | 'name': 'safe_checks', |
||
| 125 | 'default': 1, |
||
| 126 | 'mandatory': 1, |
||
| 127 | 'description': 'Disable the plugins with potential to crash ' |
||
| 128 | 'the remote services', |
||
| 129 | }, |
||
| 130 | 'scanner_plugins_timeout': { |
||
| 131 | 'type': 'integer', |
||
| 132 | 'name': 'scanner_plugins_timeout', |
||
| 133 | 'default': 36000, |
||
| 134 | 'mandatory': 1, |
||
| 135 | 'description': 'Like plugins_timeout, but for ACT_SCANNER plugins.', |
||
| 136 | }, |
||
| 137 | 'time_between_request': { |
||
| 138 | 'type': 'integer', |
||
| 139 | 'name': 'time_between_request', |
||
| 140 | 'default': 0, |
||
| 141 | 'mandatory': 0, |
||
| 142 | 'description': 'Allow to set a wait time between two actions ' |
||
| 143 | '(open, send, close).', |
||
| 144 | }, |
||
| 145 | 'unscanned_closed': { |
||
| 146 | 'type': 'boolean', |
||
| 147 | 'name': 'unscanned_closed', |
||
| 148 | 'default': 1, |
||
| 149 | 'mandatory': 1, |
||
| 150 | 'description': '', |
||
| 151 | }, |
||
| 152 | 'unscanned_closed_udp': { |
||
| 153 | 'type': 'boolean', |
||
| 154 | 'name': 'unscanned_closed_udp', |
||
| 155 | 'default': 1, |
||
| 156 | 'mandatory': 1, |
||
| 157 | 'description': '', |
||
| 158 | }, |
||
| 159 | 'expand_vhosts': { |
||
| 160 | 'type': 'boolean', |
||
| 161 | 'name': 'expand_vhosts', |
||
| 162 | 'default': 1, |
||
| 163 | 'mandatory': 0, |
||
| 164 | 'description': 'Whether to expand the target hosts ' |
||
| 165 | + 'list of vhosts with values gathered from sources ' |
||
| 166 | + 'such as reverse-lookup queries and VT checks ' |
||
| 167 | + 'for SSL/TLS certificates.', |
||
| 168 | }, |
||
| 169 | 'test_empty_vhost': { |
||
| 170 | 'type': 'boolean', |
||
| 171 | 'name': 'test_empty_vhost', |
||
| 172 | 'default': 0, |
||
| 173 | 'mandatory': 0, |
||
| 174 | 'description': 'If set to yes, the scanner will ' |
||
| 175 | + 'also test the target by using empty vhost value ' |
||
| 176 | + 'in addition to the targets associated vhost values.', |
||
| 177 | }, |
||
| 178 | } |
||
| 179 | |||
| 180 | |||
| 181 | @patch('ospd_openvas.db.OpenvasDB') |
||
| 182 | @patch('ospd_openvas.nvticache.NVTICache') |
||
| 183 | class TestOspdOpenvas(TestCase): |
||
| 184 | @patch('ospd_openvas.daemon.Openvas') |
||
| 185 | def test_set_params_from_openvas_settings( |
||
| 186 | self, mock_openvas: Openvas, mock_nvti: MagicMock, mock_db: MagicMock |
||
| 187 | ): |
||
| 188 | mock_openvas.get_settings.return_value = { |
||
| 189 | 'non_simult_ports': '22', |
||
| 190 | 'plugins_folder': '/foo/bar', |
||
| 191 | } |
||
| 192 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 193 | w.set_params_from_openvas_settings() |
||
| 194 | |||
| 195 | self.assertEqual(mock_openvas.get_settings.call_count, 1) |
||
| 196 | self.assertEqual(OSPD_PARAMS, OSPD_PARAMS_OUT) |
||
| 197 | self.assertEqual(w.scan_only_params.get('plugins_folder'), '/foo/bar') |
||
| 198 | |||
| 199 | @patch('ospd_openvas.daemon.Openvas') |
||
| 200 | def test_sudo_available(self, mock_openvas, mock_nvti, mock_db): |
||
| 201 | mock_openvas.check_sudo.return_value = True |
||
| 202 | |||
| 203 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 204 | w._sudo_available = None # pylint: disable=protected-access |
||
| 205 | w.sudo_available # pylint: disable=pointless-statement |
||
| 206 | |||
| 207 | self.assertTrue(w.sudo_available) |
||
| 208 | |||
| 209 | def test_load_vts(self, mock_nvti, mock_db): |
||
| 210 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 211 | w.load_vts() |
||
| 212 | self.maxDiff = None |
||
| 213 | self.assertIsInstance(w.vts, type(Vts())) |
||
| 214 | self.assertEqual(len(w.vts), len(w.VT)) |
||
| 215 | |||
| 216 | def test_get_custom_xml(self, mock_nvti, mock_db): |
||
| 217 | out = ( |
||
| 218 | '<custom><required_ports>Services/www, 80</re' |
||
| 219 | 'quired_ports><category>3</category><' |
||
| 220 | 'excluded_keys>Settings/disable_cgi_s' |
||
| 221 | 'canning</excluded_keys><family>Produ' |
||
| 222 | 'ct detection</family><filename>manti' |
||
| 223 | 's_detect.nasl</filename><timeout>0</' |
||
| 224 | 'timeout></custom>' |
||
| 225 | ) |
||
| 226 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 227 | vt = w.VT['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 228 | res = w.get_custom_vt_as_xml_str( |
||
| 229 | '1.3.6.1.4.1.25623.1.0.100061', vt.get('custom') |
||
| 230 | ) |
||
| 231 | self.assertEqual(len(res), len(out)) |
||
| 232 | |||
| 233 | def test_get_custom_xml_failed(self, mock_nvti, mock_db): |
||
| 234 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 235 | custom = {'a': u"\u0006"} |
||
| 236 | logging.Logger.warning = Mock() |
||
| 237 | w.get_custom_vt_as_xml_str( |
||
| 238 | '1.3.6.1.4.1.25623.1.0.100061', custom=custom |
||
| 239 | ) |
||
| 240 | if hasattr(Mock, 'assert_called_once'): |
||
| 241 | logging.Logger.warning.assert_called_once() |
||
| 242 | |||
| 243 | def test_get_severities_xml(self, mock_nvti, mock_db): |
||
| 244 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 245 | out = ( |
||
| 246 | '<severities><severity type="cvss_base_v2">' |
||
| 247 | 'AV:N/AC:L/Au:N/C:N/I:N/A:N</severity></severities>' |
||
| 248 | ) |
||
| 249 | vt = w.VT['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 250 | severities = vt.get('severities') |
||
| 251 | res = w.get_severities_vt_as_xml_str( |
||
| 252 | '1.3.6.1.4.1.25623.1.0.100061', severities |
||
| 253 | ) |
||
| 254 | |||
| 255 | self.assertEqual(res, out) |
||
| 256 | |||
| 257 | def test_get_severities_xml_failed(self, mock_nvti, mock_db): |
||
| 258 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 259 | vt = w.VT['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 260 | sever = {'severity_base_vector': u"\u0006"} |
||
| 261 | logging.Logger.warning = Mock() |
||
| 262 | w.get_severities_vt_as_xml_str( |
||
| 263 | '1.3.6.1.4.1.25623.1.0.100061', severities=sever |
||
| 264 | ) |
||
| 265 | if hasattr(Mock, 'assert_called_once'): |
||
| 266 | logging.Logger.warning.assert_called_once() |
||
| 267 | |||
| 268 | def test_get_params_xml(self, mock_nvti, mock_db): |
||
| 269 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 270 | out = ( |
||
| 271 | '<params><param type="checkbox" id="2"><name>Do ' |
||
| 272 | 'not randomize the order in which ports are scanned</name' |
||
| 273 | '><default>no</default></param><param type="ent' |
||
| 274 | 'ry" id="1"><name>Data length :</name><' |
||
| 275 | '/param></params>' |
||
| 276 | ) |
||
| 277 | |||
| 278 | vt = w.VT['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 279 | params = vt.get('vt_params') |
||
| 280 | res = w.get_params_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', params) |
||
| 281 | self.assertEqual(len(res), len(out)) |
||
| 282 | |||
| 283 | def test_get_params_xml_failed(self, mock_nvti, mock_db): |
||
| 284 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 285 | vt = w.VT['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 286 | params = { |
||
| 287 | '1': { |
||
| 288 | 'id': '1', |
||
| 289 | 'type': 'entry', |
||
| 290 | 'default': u'\u0006', |
||
| 291 | 'name': 'dns-fuzz.timelimit', |
||
| 292 | 'description': 'Description', |
||
| 293 | } |
||
| 294 | } |
||
| 295 | logging.Logger.warning = Mock() |
||
| 296 | w.get_params_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', params) |
||
| 297 | if hasattr(Mock, 'assert_called_once'): |
||
| 298 | logging.Logger.warning.assert_called_once() |
||
| 299 | |||
| 300 | def test_get_refs_xml(self, mock_nvti, mock_db): |
||
| 301 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 302 | out = '<refs><ref type="url" id="http://www.mantisbt.org/"/>' '</refs>' |
||
| 303 | vt = w.VT['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 304 | refs = vt.get('vt_refs') |
||
| 305 | res = w.get_refs_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', refs) |
||
| 306 | |||
| 307 | self.assertEqual(res, out) |
||
| 308 | |||
| 309 | def test_get_dependencies_xml(self, mock_nvti, mock_db): |
||
| 310 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 311 | out = ( |
||
| 312 | '<dependencies><dependency vt_id="1.2.3.4"/><dependency vt' |
||
| 313 | '_id="4.3.2.1"/></dependencies>' |
||
| 314 | ) |
||
| 315 | dep = ['1.2.3.4', '4.3.2.1'] |
||
| 316 | res = w.get_dependencies_vt_as_xml_str( |
||
| 317 | '1.3.6.1.4.1.25623.1.0.100061', dep |
||
| 318 | ) |
||
| 319 | |||
| 320 | self.assertEqual(res, out) |
||
| 321 | |||
| 322 | def test_get_dependencies_xml_failed(self, mock_nvti, mock_db): |
||
| 323 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 324 | vt = w.VT['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 325 | dep = [u"\u0006"] |
||
| 326 | logging.Logger.error = Mock() |
||
| 327 | w.get_dependencies_vt_as_xml_str( |
||
| 328 | '1.3.6.1.4.1.25623.1.0.100061', vt_dependencies=dep |
||
| 329 | ) |
||
| 330 | if hasattr(Mock, 'assert_called_once'): |
||
| 331 | logging.Logger.error.assert_called_once() |
||
| 332 | |||
| 333 | def test_get_ctime_xml(self, mock_nvti, mock_db): |
||
| 334 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 335 | out = '<creation_time>1237458156</creation_time>' |
||
| 336 | vt = w.VT['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 337 | ctime = vt.get('creation_time') |
||
| 338 | res = w.get_creation_time_vt_as_xml_str( |
||
| 339 | '1.3.6.1.4.1.25623.1.0.100061', ctime |
||
| 340 | ) |
||
| 341 | |||
| 342 | self.assertEqual(res, out) |
||
| 343 | |||
| 344 | def test_get_ctime_xml_failed(self, mock_nvti, mock_db): |
||
| 345 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 346 | vt = w.VT['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 347 | ctime = u'\u0006' |
||
| 348 | logging.Logger.warning = Mock() |
||
| 349 | w.get_creation_time_vt_as_xml_str( |
||
| 350 | '1.3.6.1.4.1.25623.1.0.100061', vt_creation_time=ctime |
||
| 351 | ) |
||
| 352 | if hasattr(Mock, 'assert_called_onc'): |
||
| 353 | logging.Logger.warning.assert_called_once() |
||
| 354 | |||
| 355 | def test_get_mtime_xml(self, mock_nvti, mock_db): |
||
| 356 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 357 | out = '<modification_time>1533906565</modification_time>' |
||
| 358 | vt = w.VT['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 359 | mtime = vt.get('modification_time') |
||
| 360 | res = w.get_modification_time_vt_as_xml_str( |
||
| 361 | '1.3.6.1.4.1.25623.1.0.100061', mtime |
||
| 362 | ) |
||
| 363 | |||
| 364 | self.assertEqual(res, out) |
||
| 365 | |||
| 366 | def test_get_mtime_xml_failed(self, mock_nvti, mock_db): |
||
| 367 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 368 | vt = w.VT['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 369 | mtime = u'\u0006' |
||
| 370 | logging.Logger.warning = Mock() |
||
| 371 | w.get_modification_time_vt_as_xml_str( |
||
| 372 | '1.3.6.1.4.1.25623.1.0.100061', mtime |
||
| 373 | ) |
||
| 374 | if hasattr(Mock, 'assert_called_once'): |
||
| 375 | logging.Logger.warning.assert_called_once() |
||
| 376 | |||
| 377 | def test_get_summary_xml(self, mock_nvti, mock_db): |
||
| 378 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 379 | out = '<summary>some summary</summary>' |
||
| 380 | vt = w.VT['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 381 | summary = vt.get('summary') |
||
| 382 | res = w.get_summary_vt_as_xml_str( |
||
| 383 | '1.3.6.1.4.1.25623.1.0.100061', summary |
||
| 384 | ) |
||
| 385 | |||
| 386 | self.assertEqual(res, out) |
||
| 387 | |||
| 388 | def test_get_summary_xml_failed(self, mock_nvti, mock_db): |
||
| 389 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 390 | vt = w.VT['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 391 | summary = u'\u0006' |
||
| 392 | logging.Logger.warning = Mock() |
||
| 393 | w.get_summary_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', summary) |
||
| 394 | if hasattr(Mock, 'assert_calledonce'): |
||
| 395 | logging.Logger.warning.assert_called_once() |
||
| 396 | |||
| 397 | def test_get_impact_xml(self, mock_nvti, mock_db): |
||
| 398 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 399 | out = '<impact>some impact</impact>' |
||
| 400 | vt = w.VT['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 401 | impact = vt.get('impact') |
||
| 402 | res = w.get_impact_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', impact) |
||
| 403 | |||
| 404 | self.assertEqual(res, out) |
||
| 405 | |||
| 406 | def test_get_impact_xml_failed(self, mock_nvti, mock_db): |
||
| 407 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 408 | vt = w.VT['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 409 | impact = u'\u0006' |
||
| 410 | logging.Logger.warning = Mock() |
||
| 411 | w.get_impact_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', impact) |
||
| 412 | if hasattr(Mock, 'assert_called_once'): |
||
| 413 | logging.Logger.warning.assert_called_once() |
||
| 414 | |||
| 415 | def test_get_insight_xml(self, mock_nvti, mock_db): |
||
| 416 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 417 | out = '<insight>some insight</insight>' |
||
| 418 | vt = w.VT['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 419 | insight = vt.get('insight') |
||
| 420 | res = w.get_insight_vt_as_xml_str( |
||
| 421 | '1.3.6.1.4.1.25623.1.0.100061', insight |
||
| 422 | ) |
||
| 423 | |||
| 424 | self.assertEqual(res, out) |
||
| 425 | |||
| 426 | def test_get_insight_xml_failed(self, mock_nvti, mock_db): |
||
| 427 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 428 | vt = w.VT['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 429 | insight = u'\u0006' |
||
| 430 | logging.Logger.warning = Mock() |
||
| 431 | w.get_insight_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', insight) |
||
| 432 | if hasattr(Mock, 'assert_called_once'): |
||
| 433 | logging.Logger.warning.assert_called_once() |
||
| 434 | |||
| 435 | def test_get_solution_xml(self, mock_nvti, mock_db): |
||
| 436 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 437 | out = '<solution type="WillNotFix" method="DebianAPTUpgrade">some solution</solution>' |
||
| 438 | vt = w.VT['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 439 | solution = vt.get('solution') |
||
| 440 | solution_type = vt.get('solution_type') |
||
| 441 | solution_method = vt.get('solution_method') |
||
| 442 | |||
| 443 | res = w.get_solution_vt_as_xml_str( |
||
| 444 | '1.3.6.1.4.1.25623.1.0.100061', |
||
| 445 | solution, |
||
| 446 | solution_type, |
||
| 447 | solution_method, |
||
| 448 | ) |
||
| 449 | |||
| 450 | self.assertEqual(res, out) |
||
| 451 | |||
| 452 | def test_get_solution_xml_failed(self, mock_nvti, mock_db): |
||
| 453 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 454 | vt = w.VT['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 455 | solution = u'\u0006' |
||
| 456 | logging.Logger.warning = Mock() |
||
| 457 | w.get_solution_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', solution) |
||
| 458 | if hasattr(Mock, 'assert_called_once'): |
||
| 459 | logging.Logger.warning.assert_called_once() |
||
| 460 | |||
| 461 | def test_get_detection_xml(self, mock_nvti, mock_db): |
||
| 462 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 463 | out = '<detection qod_type="remote_banner"/>' |
||
| 464 | vt = w.VT['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 465 | detection_type = vt.get('qod_type') |
||
| 466 | |||
| 467 | res = w.get_detection_vt_as_xml_str( |
||
| 468 | '1.3.6.1.4.1.25623.1.0.100061', qod_type=detection_type |
||
| 469 | ) |
||
| 470 | |||
| 471 | self.assertEqual(res, out) |
||
| 472 | |||
| 473 | def test_get_detection_xml_failed(self, mock_nvti, mock_db): |
||
| 474 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 475 | detection = u'\u0006' |
||
| 476 | logging.Logger.warning = Mock() |
||
| 477 | w.get_detection_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', detection) |
||
| 478 | if hasattr(Mock, 'assert_called_once'): |
||
| 479 | logging.Logger.warning.assert_called_once() |
||
| 480 | |||
| 481 | def test_get_affected_xml(self, mock_nvti, mock_db): |
||
| 482 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 483 | out = '<affected>some affection</affected>' |
||
| 484 | vt = w.VT['1.3.6.1.4.1.25623.1.0.100061'] |
||
| 485 | affected = vt.get('affected') |
||
| 486 | |||
| 487 | res = w.get_affected_vt_as_xml_str( |
||
| 488 | '1.3.6.1.4.1.25623.1.0.100061', affected=affected |
||
| 489 | ) |
||
| 490 | |||
| 491 | self.assertEqual(res, out) |
||
| 492 | |||
| 493 | def test_get_affected_xml_failed(self, mock_nvti, mock_db): |
||
| 494 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 495 | affected = u"\u0006" + "affected" |
||
| 496 | logging.Logger.warning = Mock() |
||
| 497 | w.get_affected_vt_as_xml_str( |
||
| 498 | '1.3.6.1.4.1.25623.1.0.100061', affected=affected |
||
| 499 | ) |
||
| 500 | if hasattr(Mock, 'assert_called_once'): |
||
| 501 | logging.Logger.warning.assert_called_once() |
||
| 502 | |||
| 503 | def test_build_credentials(self, mock_nvti, mock_db): |
||
| 504 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 505 | |||
| 506 | cred_out = [ |
||
| 507 | '1.3.6.1.4.1.25623.1.0.105058:1:entry:ESXi login name:|||username', |
||
| 508 | '1.3.6.1.4.1.25623.1.0.105058:2:password:ESXi login password:|||pass', |
||
| 509 | 'auth_port_ssh|||22', |
||
| 510 | '1.3.6.1.4.1.25623.1.0.103591:1:entry:SSH login name:|||username', |
||
| 511 | '1.3.6.1.4.1.25623.1.0.103591:2:password:SSH key passphrase:|||pass', |
||
| 512 | '1.3.6.1.4.1.25623.1.0.103591:4:file:SSH private key:|||', |
||
| 513 | '1.3.6.1.4.1.25623.1.0.90023:1:entry:SMB login:|||username', |
||
| 514 | '1.3.6.1.4.1.25623.1.0.90023:2:password]:SMB password :|||pass', |
||
| 515 | '1.3.6.1.4.1.25623.1.0.105076:1:password:SNMP Community:some comunity', |
||
| 516 | '1.3.6.1.4.1.25623.1.0.105076:2:entry:SNMPv3 Username:username', |
||
| 517 | '1.3.6.1.4.1.25623.1.0.105076:3:password:SNMPv3 Password:pass', |
||
| 518 | '1.3.6.1.4.1.25623.1.0.105076:4:radio:SNMPv3 Authentication Algorithm:some auth algo', |
||
| 519 | '1.3.6.1.4.1.25623.1.0.105076:5:password:SNMPv3 Privacy Password:privacy pass', |
||
| 520 | '1.3.6.1.4.1.25623.1.0.105076:6:radio:SNMPv3 Privacy Algorithm:privacy algo', |
||
| 521 | ] |
||
| 522 | cred_dict = { |
||
| 523 | 'ssh': { |
||
| 524 | 'type': 'ssh', |
||
| 525 | 'port': '22', |
||
| 526 | 'username': 'username', |
||
| 527 | 'password': 'pass', |
||
| 528 | }, |
||
| 529 | 'smb': {'type': 'smb', 'username': 'username', 'password': 'pass'}, |
||
| 530 | 'esxi': { |
||
| 531 | 'type': 'esxi', |
||
| 532 | 'username': 'username', |
||
| 533 | 'password': 'pass', |
||
| 534 | }, |
||
| 535 | 'snmp': { |
||
| 536 | 'type': 'snmp', |
||
| 537 | 'username': 'username', |
||
| 538 | 'password': 'pass', |
||
| 539 | 'community': 'some comunity', |
||
| 540 | 'auth_algorithm': 'some auth algo', |
||
| 541 | 'privacy_password': 'privacy pass', |
||
| 542 | 'privacy_algorithm': 'privacy algo', |
||
| 543 | }, |
||
| 544 | } |
||
| 545 | self.maxDiff = None |
||
| 546 | ret = w.build_credentials_as_prefs(cred_dict) |
||
| 547 | self.assertEqual(len(ret), len(cred_out)) |
||
| 548 | self.assertIn('auth_port_ssh|||22', cred_out) |
||
| 549 | self.assertIn( |
||
| 550 | '1.3.6.1.4.1.25623.1.0.90023:1:entry:SMB login:|||username', |
||
| 551 | cred_out, |
||
| 552 | ) |
||
| 553 | |||
| 554 | def test_build_credentials_ssh_up(self, mock_nvti, mock_db): |
||
| 555 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 556 | cred_out = [ |
||
| 557 | 'auth_port_ssh|||22', |
||
| 558 | '1.3.6.1.4.1.25623.1.0.103591:1:entry:SSH login name:|||username', |
||
| 559 | '1.3.6.1.4.1.25623.1.0.103591:3:password:SSH password (unsafe!):|||pass', |
||
| 560 | ] |
||
| 561 | cred_dict = { |
||
| 562 | 'ssh': { |
||
| 563 | 'type': 'up', |
||
| 564 | 'port': '22', |
||
| 565 | 'username': 'username', |
||
| 566 | 'password': 'pass', |
||
| 567 | } |
||
| 568 | } |
||
| 569 | self.maxDiff = None |
||
| 570 | ret = w.build_credentials_as_prefs(cred_dict) |
||
| 571 | self.assertEqual(ret, cred_out) |
||
| 572 | |||
| 573 | def test_build_alive_test_opt_empty(self, mock_nvti, mock_db): |
||
| 574 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 575 | target_options_dict = {'alive_test': '0'} |
||
| 576 | |||
| 577 | self.maxDiff = None |
||
| 578 | ret = w.build_alive_test_opt_as_prefs(target_options_dict) |
||
| 579 | self.assertEqual(ret, []) |
||
| 580 | |||
| 581 | def test_build_alive_test_opt(self, mock_nvti, mock_db): |
||
| 582 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 583 | alive_test_out = [ |
||
| 584 | "1.3.6.1.4.1.25623.1.0.100315:1:checkbox:Do a TCP ping|||no", |
||
| 585 | "1.3.6.1.4.1.25623.1.0.100315:2:checkbox:TCP ping tries also TCP-SYN ping|||no", |
||
| 586 | "1.3.6.1.4.1.25623.1.0.100315:7:checkbox:TCP ping tries only TCP-SYN ping|||no", |
||
| 587 | "1.3.6.1.4.1.25623.1.0.100315:3:checkbox:Do an ICMP ping|||yes", |
||
| 588 | "1.3.6.1.4.1.25623.1.0.100315:4:checkbox:Use ARP|||no", |
||
| 589 | "1.3.6.1.4.1.25623.1.0.100315:5:checkbox:Mark unrechable Hosts as dead (not scanning)|||yes", |
||
| 590 | ] |
||
| 591 | target_options_dict = {'alive_test': '2'} |
||
| 592 | |||
| 593 | self.maxDiff = None |
||
| 594 | ret = w.build_alive_test_opt_as_prefs(target_options_dict) |
||
| 595 | self.assertEqual(ret, alive_test_out) |
||
| 596 | |||
| 597 | def test_build_alive_test_opt_fail_1(self, mock_nvti, mock_db): |
||
| 598 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 599 | target_options_dict = {'alive_test': 'a'} |
||
| 600 | |||
| 601 | self.maxDiff = None |
||
| 602 | logging.Logger.debug = Mock() |
||
| 603 | ret = w.build_alive_test_opt_as_prefs(target_options_dict) |
||
| 604 | if hasattr(Mock, 'assert_called_once'): |
||
| 605 | logging.Logger.debug.assert_called_once() |
||
| 606 | |||
| 607 | def test_process_vts(self, mock_nvti, mock_db): |
||
| 608 | vts = { |
||
| 609 | '1.3.6.1.4.1.25623.1.0.100061': {'1': 'new value'}, |
||
| 610 | 'vt_groups': ['family=debian', 'family=general'], |
||
| 611 | } |
||
| 612 | vt_out = ( |
||
| 613 | ['1.3.6.1.4.1.25623.1.0.100061'], |
||
| 614 | {'1.3.6.1.4.1.25623.1.0.100061:1:entry:Data length :': 'new value'}, |
||
| 615 | ) |
||
| 616 | |||
| 617 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 618 | w.load_vts() |
||
| 619 | w.temp_vts_dict = w.vts |
||
| 620 | ret = w.process_vts(vts) |
||
| 621 | self.assertEqual(ret, vt_out) |
||
| 622 | |||
| 623 | def test_process_vts_bad_param_id(self, mock_nvti, mock_db): |
||
| 624 | vts = { |
||
| 625 | '1.3.6.1.4.1.25623.1.0.100061': {'3': 'new value'}, |
||
| 626 | 'vt_groups': ['family=debian', 'family=general'], |
||
| 627 | } |
||
| 628 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 629 | w.load_vts() |
||
| 630 | w.temp_vts_dict = w.vts |
||
| 631 | ret = w.process_vts(vts) |
||
| 632 | self.assertFalse(ret[1]) |
||
| 633 | |||
| 634 | def test_process_vts_not_found(self, mock_nvti, mock_db): |
||
| 635 | vts = { |
||
| 636 | '1.3.6.1.4.1.25623.1.0.100065': {'3': 'new value'}, |
||
| 637 | 'vt_groups': ['family=debian', 'family=general'], |
||
| 638 | } |
||
| 639 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 640 | w.load_vts() |
||
| 641 | w.temp_vts_dict = w.vts |
||
| 642 | logging.Logger.warning = Mock() |
||
| 643 | ret = w.process_vts(vts) |
||
| 644 | if hasattr(Mock, 'assert_called_once'): |
||
| 645 | logging.Logger.warning.assert_called_once() |
||
| 646 | |||
| 647 | View Code Duplication | def test_get_openvas_timestamp_scan_host_end(self, mock_nvti, mock_db): |
|
|
|
|||
| 648 | mock_db.get_host_scan_scan_end_time.return_value = '12345' |
||
| 649 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 650 | |||
| 651 | target_list = w.create_xml_target() |
||
| 652 | targets = w.process_targets_element(target_list) |
||
| 653 | |||
| 654 | w.create_scan('123-456', targets, None, []) |
||
| 655 | w.get_openvas_timestamp_scan_host('123-456', '192.168.0.1') |
||
| 656 | for result in w.scan_collection.results_iterator('123-456', False): |
||
| 657 | self.assertEqual(result.get('value'), '12345') |
||
| 658 | |||
| 659 | View Code Duplication | def test_get_openvas_timestamp_scan_host_start(self, mock_nvti, mock_db): |
|
| 660 | mock_db.get_host_scan_scan_end_time.return_value = None |
||
| 661 | mock_db.get_host_scan_scan_end_time.return_value = '54321' |
||
| 662 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 663 | |||
| 664 | target_list = w.create_xml_target() |
||
| 665 | targets = w.process_targets_element(target_list) |
||
| 666 | |||
| 667 | w.create_scan('123-456', targets, None, []) |
||
| 668 | w.get_openvas_timestamp_scan_host('123-456', '192.168.0.1') |
||
| 669 | for result in w.scan_collection.results_iterator('123-456', False): |
||
| 670 | self.assertEqual(result.get('value'), '54321') |
||
| 671 | |||
| 672 | def test_host_is_finished(self, mock_nvti, mock_db): |
||
| 673 | mock_db.get_single_item.return_value = 'finished' |
||
| 674 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 675 | ret = w.host_is_finished('123-456') |
||
| 676 | self.assertEqual(ret, True) |
||
| 677 | |||
| 678 | def test_scan_is_stopped(self, mock_nvti, mock_db): |
||
| 679 | mock_db.get_single_item.return_value = 'stop_all' |
||
| 680 | mock_db.kb_connect_item.return_value = mock_db |
||
| 681 | mock_db.set_redisctx.return_value = None |
||
| 682 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 683 | ret = w.scan_is_stopped('123-456') |
||
| 684 | self.assertEqual(ret, True) |
||
| 685 | |||
| 686 | def test_feed_is_healthy_true( |
||
| 687 | self, mock_nvti: MagicMock, mock_db: MagicMock, |
||
| 688 | ): |
||
| 689 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 690 | |||
| 691 | mock_nvti.get_redix_context.return_value = None |
||
| 692 | mock_db.get_key_count.side_effect = [2, 2] |
||
| 693 | w.vts = ["a", "b"] |
||
| 694 | |||
| 695 | ret = w.feed_is_healthy() |
||
| 696 | self.assertTrue(ret) |
||
| 697 | |||
| 698 | def test_feed_is_healthy_false( |
||
| 699 | self, mock_nvti: MagicMock, mock_db: MagicMock, |
||
| 700 | ): |
||
| 701 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 702 | |||
| 703 | mock_nvti.get_redix_context.return_value = None |
||
| 704 | mock_db.get_key_count.side_effect = [1, 2] |
||
| 705 | w.vts = ["a", "b"] |
||
| 706 | |||
| 707 | ret = w.feed_is_healthy() |
||
| 708 | self.assertFalse(ret) |
||
| 709 | |||
| 710 | @patch('ospd_openvas.daemon.Path.exists') |
||
| 711 | @patch('ospd_openvas.daemon.OSPDopenvas.set_params_from_openvas_settings') |
||
| 712 | def test_feed_is_outdated_none( |
||
| 713 | self, |
||
| 714 | mock_set_params: MagicMock, |
||
| 715 | mock_path_exists: MagicMock, |
||
| 716 | mock_nvti: MagicMock, |
||
| 717 | mock_db: MagicMock, |
||
| 718 | ): |
||
| 719 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 720 | w.scan_only_params['plugins_folder'] = '/foo/bar' |
||
| 721 | |||
| 722 | mock_path_exists.return_value = False |
||
| 723 | |||
| 724 | # Return None |
||
| 725 | ret = w.feed_is_outdated('1234') |
||
| 726 | self.assertIsNone(ret) |
||
| 727 | |||
| 728 | self.assertEqual(mock_set_params.call_count, 1) |
||
| 729 | self.assertEqual(mock_path_exists.call_count, 1) |
||
| 730 | |||
| 731 | View Code Duplication | @patch('ospd_openvas.daemon.Path.exists') |
|
| 732 | @patch('ospd_openvas.daemon.Path.open') |
||
| 733 | def test_feed_is_outdated_true( |
||
| 734 | self, |
||
| 735 | mock_path_open: MagicMock, |
||
| 736 | mock_path_exists: MagicMock, |
||
| 737 | mock_nvti: MagicMock, |
||
| 738 | mock_db: MagicMock, |
||
| 739 | ): |
||
| 740 | read_data = 'PLUGIN_SET = "1235";' |
||
| 741 | |||
| 742 | mock_path_exists.return_value = True |
||
| 743 | mock_read = MagicMock(name='Path open context manager') |
||
| 744 | mock_read.__enter__ = MagicMock(return_value=io.StringIO(read_data)) |
||
| 745 | mock_path_open.return_value = mock_read |
||
| 746 | |||
| 747 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 748 | |||
| 749 | # Return True |
||
| 750 | w.scan_only_params['plugins_folder'] = '/foo/bar' |
||
| 751 | |||
| 752 | ret = w.feed_is_outdated('1234') |
||
| 753 | self.assertTrue(ret) |
||
| 754 | |||
| 755 | self.assertEqual(mock_path_exists.call_count, 1) |
||
| 756 | self.assertEqual(mock_path_open.call_count, 1) |
||
| 757 | |||
| 758 | View Code Duplication | @patch('ospd_openvas.daemon.Path.exists') |
|
| 759 | @patch('ospd_openvas.daemon.Path.open') |
||
| 760 | def test_feed_is_outdated_false( |
||
| 761 | self, |
||
| 762 | mock_path_open: MagicMock, |
||
| 763 | mock_path_exists: MagicMock, |
||
| 764 | mock_nvti: MagicMock, |
||
| 765 | mock_db: MagicMock, |
||
| 766 | ): |
||
| 767 | mock_path_exists.return_value = True |
||
| 768 | |||
| 769 | read_data = 'PLUGIN_SET = "1234"' |
||
| 770 | mock_path_exists.return_value = True |
||
| 771 | mock_read = MagicMock(name='Path open context manager') |
||
| 772 | mock_read.__enter__ = MagicMock(return_value=io.StringIO(read_data)) |
||
| 773 | mock_path_open.return_value = mock_read |
||
| 774 | |||
| 775 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 776 | w.scan_only_params['plugins_folder'] = '/foo/bar' |
||
| 777 | |||
| 778 | ret = w.feed_is_outdated('1234') |
||
| 779 | self.assertFalse(ret) |
||
| 780 | |||
| 781 | self.assertEqual(mock_path_exists.call_count, 1) |
||
| 782 | self.assertEqual(mock_path_open.call_count, 1) |
||
| 783 | |||
| 784 | @patch('ospd_openvas.daemon.OSPDaemon.add_scan_log') |
||
| 785 | def test_get_openvas_result(self, mock_ospd, mock_nvti, mock_db): |
||
| 786 | results = ["LOG||| |||general/Host_Details||| |||Host dead", None] |
||
| 787 | mock_db.get_result.side_effect = results |
||
| 788 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 789 | w.load_vts() |
||
| 790 | mock_ospd.return_value = None |
||
| 791 | w.get_openvas_result('123-456', 'localhost') |
||
| 792 | mock_ospd.assert_called_with( |
||
| 793 | '123-456', |
||
| 794 | host='localhost', |
||
| 795 | hostname='', |
||
| 796 | name='', |
||
| 797 | port='general/Host_Details', |
||
| 798 | qod='', |
||
| 799 | test_id='', |
||
| 800 | value='Host dead', |
||
| 801 | ) |
||
| 802 | |||
| 803 | @patch('ospd_openvas.daemon.OSPDaemon.add_scan_log') |
||
| 804 | def test_get_openvas_result_escaped(self, mock_ospd, mock_nvti, mock_db): |
||
| 805 | results = [ |
||
| 806 | "LOG||| |||general/Host_Details|||1.3.6.1.4.1.25623.1.0.100061|||Alive", |
||
| 807 | None, |
||
| 808 | ] |
||
| 809 | mock_db.get_result.side_effect = results |
||
| 810 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 811 | w.load_vts() |
||
| 812 | mock_ospd.return_value = None |
||
| 813 | mock_nvti.QOD_TYPES.__getitem__.return_value = '' |
||
| 814 | w.get_openvas_result('123-456', 'localhost') |
||
| 815 | |||
| 816 | mock_ospd.assert_called_with( |
||
| 817 | '123-456', |
||
| 818 | host='localhost', |
||
| 819 | hostname='', |
||
| 820 | name='Mantis Detection & foo', |
||
| 821 | port='general/Host_Details', |
||
| 822 | qod='', |
||
| 823 | test_id='1.3.6.1.4.1.25623.1.0.100061', |
||
| 824 | value='Alive', |
||
| 825 | ) |
||
| 826 | |||
| 827 | @patch('ospd_openvas.daemon.OSPDaemon.set_scan_host_progress') |
||
| 828 | def test_update_progress(self, mock_ospd, mock_nvti, mock_db): |
||
| 829 | msg = '0/-1' |
||
| 830 | w = DummyDaemon(mock_nvti, mock_db) |
||
| 831 | target_list = w.create_xml_target() |
||
| 832 | targets = w.process_targets_element(target_list) |
||
| 833 | |||
| 834 | w.create_scan('123-456', targets, None, []) |
||
| 835 | |||
| 836 | mock_ospd.return_value = None |
||
| 837 | w.update_progress('123-456', 'localhost', 'localhost', msg) |
||
| 838 | mock_ospd.assert_called_with('123-456', 'localhost', 'localhost', 100) |
||
| 839 | |||
| 840 | |||
| 841 | class TestFilters(TestCase): |
||
| 842 | def test_format_vt_modification_time(self): |
||
| 843 | ovformat = OpenVasVtsFilter() |
||
| 844 | td = '1517443741' |
||
| 845 | formatted = ovformat.format_vt_modification_time(td) |
||
| 846 | self.assertEqual(formatted, "20180201000901") |
||
| 847 |