Passed
Pull Request — master (#366)
by Juan José
01:36
created

ospd_openvas.preferencehandler   F

Complexity

Total Complexity 108

Size/Duplication

Total Lines 733
Duplicated Lines 3.68 %

Importance

Changes 0
Metric Value
eloc 466
dl 27
loc 733
rs 2
c 0
b 0
f 0
wmc 108

20 Methods

Rating   Name   Duplication   Size   Complexity  
A PreferenceHandler.prepare_scan_id_for_openvas() 0 5 1
A PreferenceHandler.__init__() 0 14 1
A PreferenceHandler.target_options() 0 10 2
A PreferenceHandler.prepare_host_options_for_openvas() 0 8 2
A PreferenceHandler.prepare_target_for_openvas() 0 7 1
A PreferenceHandler.prepare_reverse_lookup_opt_for_openvas() 0 17 2
A PreferenceHandler.prepare_plugins_for_openvas() 0 26 3
C PreferenceHandler._process_vts() 0 64 10
A PreferenceHandler.prepare_ports_for_openvas() 0 8 1
C PreferenceHandler.build_credentials_as_prefs() 0 116 7
A PreferenceHandler._get_vt_param_name() 0 7 2
F PreferenceHandler.build_alive_test_opt_as_prefs() 13 124 19
C PreferenceHandler.prepare_boreas_alive_test() 14 68 11
A PreferenceHandler._get_vt_param_type() 0 7 2
C PreferenceHandler._get_vts_in_groups() 0 44 9
A PreferenceHandler.prepare_main_kbindex_for_openvas() 0 5 1
C PreferenceHandler.check_param_type() 0 34 10
A PreferenceHandler.prepare_credentials_for_openvas() 0 15 5
B PreferenceHandler.prepare_scan_params_for_openvas() 0 30 6
A PreferenceHandler.prepare_alive_test_option_for_openvas() 0 12 5

2 Functions

Rating   Name   Duplication   Size   Complexity  
A _from_bool_to_str() 0 4 2
B alive_test_methods_to_bit_field() 0 19 6

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
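
In this file, the two blocks flagged as duplicated (in build_alive_test_opt_as_prefs and prepare_boreas_alive_test) both resolve the alive test from the target options. A minimal sketch of moving that logic into one shared helper could look like the following. The helper name `resolve_alive_test` and the shortened enum member names are hypothetical and not part of ospd-openvas; the bit values are copied from the AliveTest enum in the listing.

```python
from enum import IntEnum
from typing import Dict, Optional, Union


class AliveTest(IntEnum):
    """Bit values copied from the AliveTest enum in the listing."""

    TCP_ACK_SERVICE = 1
    ICMP = 2
    ARP = 4
    CONSIDER_ALIVE = 8
    TCP_SYN_SERVICE = 16


def resolve_alive_test(
    target_options: Optional[Dict[str, str]],
) -> Union[str, int, None]:
    """Hypothetical helper: return the raw alive test setting.

    An explicit 'alive_test' value takes precedence. Otherwise, if
    'alive_test_methods' is set, the individual methods are combined
    into a bit field.
    """
    if not target_options:
        return None

    alive_test = target_options.get('alive_test')
    if alive_test is not None or not target_options.get('alive_test_methods'):
        return alive_test

    # Map each option key to the bit it switches on.
    flags = {
        'icmp': AliveTest.ICMP,
        'tcp_syn': AliveTest.TCP_SYN_SERVICE,
        'tcp_ack': AliveTest.TCP_ACK_SERVICE,
        'arp': AliveTest.ARP,
        'consider_alive': AliveTest.CONSIDER_ALIVE,
    }
    bit_field = 0
    for option, flag in flags.items():
        if target_options.get(option) == '1':
            bit_field |= flag
    return bit_field
```

Under this assumption, each call site would keep its own `int()` conversion and fallback handling, since the two methods react differently to an invalid value.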


Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like ospd_openvas.preferencehandler often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common way to find such a component is to look for fields and methods that share a prefix or suffix.
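
As a rough illustration of the prefix/suffix heuristic, the sketch below groups a subset of the method names from the metrics table by their last two underscore-separated tokens. The function `group_by_suffix` is a hypothetical helper written for this example, not a tool provided by the analysis.

```python
from collections import defaultdict
from typing import Dict, List

# A subset of the method names from the metrics table.
METHODS = [
    'prepare_target_for_openvas',
    'prepare_ports_for_openvas',
    'prepare_credentials_for_openvas',
    'build_credentials_as_prefs',
    'build_alive_test_opt_as_prefs',
    '_get_vt_param_name',
    '_get_vt_param_type',
    'check_param_type',
]


def group_by_suffix(names: List[str], tokens: int = 2) -> Dict[str, List[str]]:
    """Group names by their last `tokens` underscore-separated words.

    Only suffixes shared by more than one name are returned.
    """
    groups = defaultdict(list)
    for name in names:
        words = name.strip('_').split('_')
        groups['_'.join(words[-tokens:])].append(name)
    return {suffix: found for suffix, found in groups.items() if len(found) > 1}
```

In this file the `for_openvas` suffix is shared by most public methods, so it is a weak signal on its own. A shared word in the middle of the name, such as `credentials` or `alive_test`, may point to a more cohesive component.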

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
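
A minimal sketch of Extract Class applied to the credential handling is shown below. The class name `CredentialPrefsBuilder` and its layout are assumptions made for illustration; only the SMB case is shown, with the OID and preference strings copied from the listing.

```python
from typing import Dict, List

OID_SMB_AUTH = "1.3.6.1.4.1.25623.1.0.90023"  # copied from the listing


class CredentialPrefsBuilder:
    """Hypothetical extracted class: turns credentials into KB preferences."""

    def __init__(self, credentials: Dict[str, Dict[str, str]]):
        self.credentials = credentials

    @staticmethod
    def _smb(params: Dict[str, str]) -> List[str]:
        # Same strings as the 'smb' branch of build_credentials_as_prefs.
        return [
            '{0}:1:entry:SMB login:|||{1}'.format(
                OID_SMB_AUTH, params.get('username', '')
            ),
            '{0}:2:password:SMB password:|||{1}'.format(
                OID_SMB_AUTH, params.get('password', '')
            ),
        ]

    def build(self) -> List[str]:
        # One small handler per service replaces the long if-chain.
        handlers = {'smb': self._smb}
        prefs: List[str] = []
        for service, params in self.credentials.items():
            handler = handlers.get(service)
            if handler:
                prefs.extend(handler(params))
        return prefs
```

With this shape, each further service (ssh, esxi, snmp) would get its own handler, which keeps the complexity of any single method low. Unknown services are skipped, as in the original method.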

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
# pylint: disable=too-many-lines
21
22
""" Prepare the preferences to be used by OpenVAS. Get the data from the scan
23
collection and store the data in a redis KB in the right format to be used by
24
OpenVAS. """
25
26
import logging
27
import binascii
28
29
from enum import IntEnum
30
from typing import Optional, Dict, List, Tuple
31
from base64 import b64decode
32
33
from ospd.scan import ScanCollection
34
from ospd.ospd import BASE_SCANNER_PARAMS
35
from ospd_openvas.openvas import Openvas
36
from ospd_openvas.db import KbDB
37
from ospd_openvas.nvticache import NVTICache
38
from ospd_openvas.vthelper import VtHelper
39
from ospd_openvas.notus.translationtable import LSC_FAMILIES_AND_DRIVERS
40
41
logger = logging.getLogger(__name__)
42
43
OID_SSH_AUTH = "1.3.6.1.4.1.25623.1.0.103591"
44
OID_SMB_AUTH = "1.3.6.1.4.1.25623.1.0.90023"
45
OID_ESXI_AUTH = "1.3.6.1.4.1.25623.1.0.105058"
46
OID_SNMP_AUTH = "1.3.6.1.4.1.25623.1.0.105076"
47
OID_PING_HOST = "1.3.6.1.4.1.25623.1.0.100315"
48
49
BOREAS_ALIVE_TEST = "ALIVE_TEST"
50
BOREAS_ALIVE_TEST_PORTS = "ALIVE_TEST_PORTS"
51
BOREAS_SETTING_NAME = "test_alive_hosts_only"
52
53
54
class AliveTest(IntEnum):
55
    """ Alive Tests. """
56
57
    ALIVE_TEST_SCAN_CONFIG_DEFAULT = 0
58
    ALIVE_TEST_TCP_ACK_SERVICE = 1
59
    ALIVE_TEST_ICMP = 2
60
    ALIVE_TEST_ARP = 4
61
    ALIVE_TEST_CONSIDER_ALIVE = 8
62
    ALIVE_TEST_TCP_SYN_SERVICE = 16
63
64
65
def alive_test_methods_to_bit_field(
66
    icmp: bool, tcp_syn: bool, tcp_ack: bool, arp: bool, consider_alive: bool
67
) -> int:
68
    """Internally a bit field is used as alive test. This function creates
69
    such a bit field out of the supplied alive test methods.
70
    """
71
72
    icmp_enum = AliveTest.ALIVE_TEST_ICMP if icmp else 0
73
    tcp_syn_enum = AliveTest.ALIVE_TEST_TCP_SYN_SERVICE if tcp_syn else 0
74
    tcp_ack_enum = AliveTest.ALIVE_TEST_TCP_ACK_SERVICE if tcp_ack else 0
75
    arp_enum = AliveTest.ALIVE_TEST_ARP if arp else 0
76
    consider_alive_enum = (
77
        AliveTest.ALIVE_TEST_CONSIDER_ALIVE if consider_alive else 0
78
    )
79
80
    bit_field = (
81
        icmp_enum | tcp_syn_enum | tcp_ack_enum | arp_enum | consider_alive_enum
82
    )
83
    return bit_field
84
85
86
def _from_bool_to_str(value: int) -> str:
87
    """The OpenVAS scanner uses yes and no as boolean values, whereas ospd
88
    uses 1 and 0."""
89
    return 'yes' if value == 1 else 'no'
90
91
92
class PreferenceHandler:
93
    def __init__(
94
        self,
95
        scan_id: str,
96
        kbdb: KbDB,
97
        scan_collection: ScanCollection,
98
        nvticache: NVTICache,
99
    ):
100
        self.scan_id = scan_id
101
        self.kbdb = kbdb
102
        self.scan_collection = scan_collection
103
104
        self._target_options = None
105
106
        self.nvti = nvticache
107
108
    def prepare_scan_id_for_openvas(self):
109
        """Create the openvas scan id and store it in the redis kb.
110
        Nothing is returned.
111
        """
112
        self.kbdb.add_scan_id(self.scan_id)
113
114
    @property
115
    def target_options(self) -> Dict:
116
        """ Return target options from Scan collection """
117
        if self._target_options is not None:
118
            return self._target_options
119
120
        self._target_options = self.scan_collection.get_target_options(
121
            self.scan_id
122
        )
123
        return self._target_options
124
125
    def _get_vts_in_groups(
126
        self,
127
        filters: List[str],
128
    ) -> List[str]:
129
        """Return a list of vts which match with the given filter.
130
131
        Arguments:
132
            filters A list of filters. Each filter is a string of the
133
                    form key=value, for example family=<family name>.
134
                    Supported keys: family
135
136
        Returns a list of vt oids which match with the given filter.
137
        """
138
        settings = Openvas.get_settings()
139
        notus_enabled = settings.get("table_driven_lsc")
140
141
        vts_list = list()
142
        families = dict()
143
144
        oids = self.nvti.get_oids()
145
146
        for _, oid in oids:
147
            family = self.nvti.get_nvt_family(oid)
148
            if family not in families:
149
                families[family] = list()
150
151
            families[family].append(oid)
152
153
        for elem in filters:
154
            key, value = elem.split('=')
155
            # If the family is supported by Notus LSC and Notus
156
            # is enabled
157
            if (
158
                key == 'family'
159
                and notus_enabled
160
                and value in LSC_FAMILIES_AND_DRIVERS
161
            ):
162
                driver_oid = LSC_FAMILIES_AND_DRIVERS.get(value)
163
                vts_list.append(driver_oid)
164
            # If Notus is disabled or the family is not supported by Notus.
165
            elif key == 'family' and value in families:
166
                vts_list.extend(families[value])
167
168
        return vts_list
169
170
    def _get_vt_param_type(self, vt: Dict, vt_param_id: str) -> Optional[str]:
171
        """ Return the type of the vt parameter from the vts dictionary. """
172
173
        vt_params_list = vt.get("vt_params")
174
        if vt_params_list.get(vt_param_id):
175
            return vt_params_list[vt_param_id]["type"]
176
        return None
177
178
    def _get_vt_param_name(self, vt: Dict, vt_param_id: str) -> Optional[str]:
179
        """ Return the name of the vt parameter from the vts dictionary. """
180
181
        vt_params_list = vt.get("vt_params")
182
        if vt_params_list.get(vt_param_id):
183
            return vt_params_list[vt_param_id]["name"]
184
        return None
185
186
    @staticmethod
187
    def check_param_type(vt_param_value: str, param_type: str) -> Optional[int]:
188
        """Check if the value of a vt parameter matches
189
        the expected type. Return None on a match, 1 otherwise.
190
        """
191
        if (
192
            param_type
193
            in [
194
                'entry',
195
                'password',
196
                'radio',
197
                'sshlogin',
198
            ]
199
            and isinstance(vt_param_value, str)
200
        ):
201
            return None
202
        elif param_type == 'checkbox' and (
203
            vt_param_value == '0' or vt_param_value == '1'
204
        ):
205
            return None
206
        elif param_type == 'file':
207
            try:
208
                b64decode(vt_param_value.encode())
209
            except (binascii.Error, AttributeError, TypeError):
210
                return 1
211
            return None
212
        elif param_type == 'integer':
213
            try:
214
                int(vt_param_value)
215
            except ValueError:
216
                return 1
217
            return None
218
219
        return 1
220
221
    def _process_vts(
222
        self,
223
        vts: Dict[str, Dict[str, str]],
224
    ) -> Tuple[List[str], Dict[str, str]]:
225
        """ Add single VTs and their parameters. """
226
        vts_list = []
227
        vts_params = {}
228
        vtgroups = vts.pop('vt_groups')
229
230
        vthelper = VtHelper(self.nvti)
231
232
        if vtgroups:
233
            vts_list = self._get_vts_in_groups(vtgroups)
234
235
        for vtid, vt_params in vts.items():
236
            vt = vthelper.get_single_vt(vtid)
237
            if not vt:
238
                logger.warning(
239
                    'The VT %s was not found and it will not be added to the '
240
                    'plugin scheduler.',
241
                    vtid,
242
                )
243
                continue
244
245
            vts_list.append(vtid)
246
            for vt_param_id, vt_param_value in vt_params.items():
247
                param_type = self._get_vt_param_type(vt, vt_param_id)
248
                param_name = self._get_vt_param_name(vt, vt_param_id)
249
250
                if not param_type or not param_name:
251
                    logger.debug(
252
                        'Missing type or name for VT parameter %s of %s. '
253
                        'This VT parameter will not be set.',
254
                        vt_param_id,
255
                        vtid,
256
                    )
257
                    continue
258
259
                if vt_param_id == '0':
260
                    type_aux = 'integer'
261
                else:
262
                    type_aux = param_type
263
264
                if self.check_param_type(vt_param_value, type_aux):
265
                    logger.debug(
266
                        'The VT parameter %s for %s could not be set. '
267
                        'Expected %s type for parameter value %s',
268
                        vt_param_id,
269
                        vtid,
270
                        type_aux,
271
                        str(vt_param_value),
272
                    )
273
                    continue
274
275
                if type_aux == 'checkbox':
276
                    vt_param_value = _from_bool_to_str(int(vt_param_value))
277
278
                vts_params[
279
                    "{0}:{1}:{2}:{3}".format(
280
                        vtid, vt_param_id, param_type, param_name
281
                    )
282
                ] = str(vt_param_value)
283
284
        return vts_list, vts_params
285
286
    def prepare_plugins_for_openvas(self) -> bool:
287
        """Get the plugin list to be launched from the Scan Collection
288
        and prepare the vts preferences. Store the data in the kb.
289
        """
290
        nvts = self.scan_collection.get_vts(self.scan_id)
291
        if nvts:
292
            nvts_list, nvts_params = self._process_vts(nvts)
293
            # Add nvts list
294
            separ = ';'
295
            plugin_list = 'plugin_set|||%s' % separ.join(nvts_list)
296
            self.kbdb.add_scan_preferences(self.scan_id, [plugin_list])
297
298
            # Add nvts parameters
299
            for key, val in nvts_params.items():
300
                item = '%s|||%s' % (key, val)
301
                self.kbdb.add_scan_preferences(self.scan_id, [item])
302
303
            nvts_params = None
304
            nvts_list = None
305
            item = None
306
            plugin_list = None
307
            nvts = None
308
309
            return True
310
311
        return False
312
313
    @staticmethod
314
    def build_alive_test_opt_as_prefs(
315
        target_options: Dict[str, str]
316
    ) -> List[str]:
317
        """Parse the target options dictionary.
318
        Arguments:
319
            target_options: Dictionary with the target options.
320
321
        Return:
322
            A list with the target options related to alive test method
323
            in string format to be added to the redis KB.
324
        """
325
        target_opt_prefs_list = []
326
        alive_test = None
327
328
        if target_options:
Issue (Duplication): This code seems to be duplicated in your project.
329
            # Alive test specified as bit field.
330
            alive_test = target_options.get('alive_test')
331
            # Alive test specified as individual methods.
332
            alive_test_methods = target_options.get('alive_test_methods')
333
            # alive_test takes precedence over alive_test_methods
334
            if alive_test is None and alive_test_methods:
335
                alive_test = alive_test_methods_to_bit_field(
336
                    icmp=target_options.get('icmp') == '1',
337
                    tcp_syn=target_options.get('tcp_syn') == '1',
338
                    tcp_ack=target_options.get('tcp_ack') == '1',
339
                    arp=target_options.get('arp') == '1',
340
                    consider_alive=target_options.get('consider_alive') == '1',
341
                )
342
343
        if target_options and alive_test:
344
            try:
345
                alive_test = int(alive_test)
346
            except ValueError:
347
                logger.debug(
348
                    'Alive test settings not applied. '
349
                    'Invalid alive test value %s',
350
                    target_options.get('alive_test'),
351
                )
352
                return target_opt_prefs_list
353
354
            if alive_test < 1 or alive_test > 31:
355
                return target_opt_prefs_list
356
357
            if (
358
                alive_test & AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
359
                or alive_test & AliveTest.ALIVE_TEST_TCP_SYN_SERVICE
360
            ):
361
                value = "yes"
362
            else:
363
                value = "no"
364
            target_opt_prefs_list.append(
365
                OID_PING_HOST
366
                + ':1:checkbox:'
367
                + 'Do a TCP ping|||'
368
                + '{0}'.format(value)
369
            )
370
371
            if (
372
                alive_test & AliveTest.ALIVE_TEST_TCP_SYN_SERVICE
373
                and alive_test & AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
374
            ):
375
                value = "yes"
376
            else:
377
                value = "no"
378
            target_opt_prefs_list.append(
379
                OID_PING_HOST
380
                + ':2:checkbox:'
381
                + 'TCP ping tries also TCP-SYN ping|||'
382
                + '{0}'.format(value)
383
            )
384
385
            if (alive_test & AliveTest.ALIVE_TEST_TCP_SYN_SERVICE) and not (
386
                alive_test & AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
387
            ):
388
                value = "yes"
389
            else:
390
                value = "no"
391
            target_opt_prefs_list.append(
392
                OID_PING_HOST
393
                + ':7:checkbox:'
394
                + 'TCP ping tries only TCP-SYN ping|||'
395
                + '{0}'.format(value)
396
            )
397
398
            if alive_test & AliveTest.ALIVE_TEST_ICMP:
399
                value = "yes"
400
            else:
401
                value = "no"
402
            target_opt_prefs_list.append(
403
                OID_PING_HOST
404
                + ':3:checkbox:'
405
                + 'Do an ICMP ping|||'
406
                + '{0}'.format(value)
407
            )
408
409
            if alive_test & AliveTest.ALIVE_TEST_ARP:
410
                value = "yes"
411
            else:
412
                value = "no"
413
            target_opt_prefs_list.append(
414
                OID_PING_HOST
415
                + ':4:checkbox:'
416
                + 'Use ARP|||'
417
                + '{0}'.format(value)
418
            )
419
420
            if alive_test & AliveTest.ALIVE_TEST_CONSIDER_ALIVE:
421
                value = "no"
422
            else:
423
                value = "yes"
424
            target_opt_prefs_list.append(
425
                OID_PING_HOST
426
                + ':5:checkbox:'
427
                + 'Mark unrechable Hosts as dead (not scanning)|||'
428
                + '{0}'.format(value)
429
            )
430
431
            # Also select a method, otherwise Ping Host logs a warning.
432
            if alive_test == AliveTest.ALIVE_TEST_CONSIDER_ALIVE:
433
                target_opt_prefs_list.append(
434
                    OID_PING_HOST + ':1:checkbox:' + 'Do a TCP ping|||yes'
435
                )
436
        return target_opt_prefs_list
437
438
    def prepare_alive_test_option_for_openvas(self):
439
        """ Set alive test option. Overwrite the scan config settings."""
440
        settings = Openvas.get_settings()
441
        if settings and (
442
            self.target_options.get('alive_test')
443
            or self.target_options.get('alive_test_methods')
444
        ):
445
            alive_test_opt = self.build_alive_test_opt_as_prefs(
446
                self.target_options
447
            )
448
            if alive_test_opt:
449
                self.kbdb.add_scan_preferences(self.scan_id, alive_test_opt)
450
451
    def prepare_boreas_alive_test(self):
452
        """Set alive_test for Boreas if boreas scanner config
453
        (BOREAS_SETTING_NAME) was set"""
454
        settings = Openvas.get_settings()
455
        alive_test = None
456
        alive_test_ports = None
457
        target_options = self.target_options
458
459
        if settings:
460
            boreas = settings.get(BOREAS_SETTING_NAME)
461
            if not boreas:
462
                return
463
        else:
464
            return
465
466
        if target_options:
Issue (Duplication): This code seems to be duplicated in your project.
467
            alive_test_ports = target_options.get('alive_test_ports')
468
            # Alive test was specified as bit field.
469
            alive_test = target_options.get('alive_test')
470
            # Alive test was specified as individual methods.
471
            alive_test_methods = target_options.get('alive_test_methods')
472
            # <alive_test> takes precedence over <alive_test_methods>
473
            if alive_test is None and alive_test_methods:
474
                alive_test = alive_test_methods_to_bit_field(
475
                    icmp=target_options.get('icmp') == '1',
476
                    tcp_syn=target_options.get('tcp_syn') == '1',
477
                    tcp_ack=target_options.get('tcp_ack') == '1',
478
                    arp=target_options.get('arp') == '1',
479
                    consider_alive=target_options.get('consider_alive') == '1',
480
                )
481
482
        if alive_test is not None:
483
            try:
484
                alive_test = int(alive_test)
485
            except ValueError:
486
                logger.debug(
487
                    'Alive test preference for Boreas not set. '
488
                    'Invalid alive test value %s.',
489
                    alive_test,
490
                )
491
                # Use default alive test as fall back
492
                alive_test = AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT
493
        # Use default alive test if no valid alive_test was provided
494
        else:
495
            alive_test = AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT
496
497
        # If a valid alive_test was set then the bit mask
498
        # has a value between 1 (00001) and 31 (11111)
499
        if 1 <= alive_test <= 31:
500
            pref = "{pref_key}|||{pref_value}".format(
501
                pref_key=BOREAS_ALIVE_TEST, pref_value=alive_test
502
            )
503
            self.kbdb.add_scan_preferences(self.scan_id, [pref])
504
505
        if alive_test == AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT:
506
            alive_test = AliveTest.ALIVE_TEST_ICMP
507
            pref = "{pref_key}|||{pref_value}".format(
508
                pref_key=BOREAS_ALIVE_TEST, pref_value=alive_test
509
            )
510
            self.kbdb.add_scan_preferences(self.scan_id, [pref])
511
512
        # Add portlist if present. Validity is checked on Boreas side.
513
        if alive_test_ports is not None:
514
            pref = "{pref_key}|||{pref_value}".format(
515
                pref_key=BOREAS_ALIVE_TEST_PORTS,
516
                pref_value=alive_test_ports,
517
            )
518
            self.kbdb.add_scan_preferences(self.scan_id, [pref])
519
520
    def prepare_reverse_lookup_opt_for_openvas(self):
521
        """ Set reverse lookup options in the kb"""
522
        if self.target_options:
523
            items = []
524
            _rev_lookup_only = int(
525
                self.target_options.get('reverse_lookup_only', '0')
526
            )
527
            rev_lookup_only = _from_bool_to_str(_rev_lookup_only)
528
            items.append('reverse_lookup_only|||%s' % (rev_lookup_only))
529
530
            _rev_lookup_unify = int(
531
                self.target_options.get('reverse_lookup_unify', '0')
532
            )
533
            rev_lookup_unify = _from_bool_to_str(_rev_lookup_unify)
534
            items.append('reverse_lookup_unify|||%s' % rev_lookup_unify)
535
536
            self.kbdb.add_scan_preferences(self.scan_id, items)
537
538
    def prepare_target_for_openvas(self):
539
        """Get the target from the scan collection and set the target
540
        in the kb"""
541
542
        target = self.scan_collection.get_host_list(self.scan_id)
543
        target_aux = 'TARGET|||%s' % target
544
        self.kbdb.add_scan_preferences(self.scan_id, [target_aux])
545
546
    def prepare_ports_for_openvas(self) -> str:
547
        """Get the port list from the scan collection and store the list
548
        in the kb."""
549
        ports = self.scan_collection.get_ports(self.scan_id)
550
        port_range = 'port_range|||%s' % ports
551
        self.kbdb.add_scan_preferences(self.scan_id, [port_range])
552
553
        return ports
554
555
    def prepare_host_options_for_openvas(self):
556
        """Get the excluded and finished hosts from the scan collection and
557
        store the list of hosts that must not be scanned in the kb."""
558
        exclude_hosts = self.scan_collection.get_exclude_hosts(self.scan_id)
559
560
        if exclude_hosts:
561
            pref_val = "exclude_hosts|||" + exclude_hosts
562
            self.kbdb.add_scan_preferences(self.scan_id, [pref_val])
563
564
    def prepare_scan_params_for_openvas(self, ospd_params: Dict[str, Dict]):
565
        """Get the scan parameters from the scan collection and store them
566
        in the kb.
567
        Arguments:
568
            ospd_params: Dictionary with the OSPD Params.
569
        """
570
        # Options which were supplied via the <scanner_params> XML element.
571
        options = self.scan_collection.get_options(self.scan_id)
572
        prefs_val = []
573
574
        for key, value in options.items():
575
            item_type = ''
576
            if key in ospd_params:
577
                item_type = ospd_params[key].get('type')
578
            else:
579
                if key not in BASE_SCANNER_PARAMS:
580
                    logger.debug(
581
                        "%s is a scanner only setting and should not be set "
582
                        "by the client. Setting needs to be included in "
583
                        "OpenVAS configuration file instead.",
584
                        key,
585
                    )
586
            if item_type == 'boolean':
587
                val = _from_bool_to_str(value)
588
            else:
589
                val = str(value)
590
            prefs_val.append(key + "|||" + val)
591
592
        if prefs_val:
593
            self.kbdb.add_scan_preferences(self.scan_id, prefs_val)
594
595
    @staticmethod
596
    def build_credentials_as_prefs(credentials: Dict) -> List[str]:
597
        """Parse the credential dictionary.
598
        Arguments:
599
            credentials: Dictionary with the credentials.
600
601
        Return:
602
            A list with the credentials in string format to be
603
            added to the redis KB.
604
        """
605
        cred_prefs_list = []
606
        for credential in credentials.items():
607
            service = credential[0]
608
            cred_params = credentials.get(service)
609
            cred_type = cred_params.get('type', '')
610
            username = cred_params.get('username', '')
611
            password = cred_params.get('password', '')
612
613
            if service == 'ssh':
614
                port = cred_params.get('port', '')
615
                cred_prefs_list.append('auth_port_ssh|||' + '{0}'.format(port))
616
                cred_prefs_list.append(
617
                    OID_SSH_AUTH
618
                    + ':1:'
619
                    + 'entry:SSH login '
620
                    + 'name:|||{0}'.format(username)
621
                )
622
                if cred_type == 'up':
623
                    cred_prefs_list.append(
624
                        OID_SSH_AUTH
625
                        + ':3:'
626
                        + 'password:SSH password '
627
                        + '(unsafe!):|||{0}'.format(password)
628
                    )
629
                else:
630
                    private = cred_params.get('private', '')
631
                    cred_prefs_list.append(
632
                        OID_SSH_AUTH
633
                        + ':2:'
634
                        + 'password:SSH key passphrase:|||'
635
                        + '{0}'.format(password)
636
                    )
637
                    cred_prefs_list.append(
638
                        OID_SSH_AUTH
639
                        + ':4:'
640
                        + 'file:SSH private key:|||'
641
                        + '{0}'.format(private)
642
                    )
643
            if service == 'smb':
644
                cred_prefs_list.append(
645
                    OID_SMB_AUTH
646
                    + ':1:entry'
647
                    + ':SMB login:|||{0}'.format(username)
648
                )
649
                cred_prefs_list.append(
650
                    OID_SMB_AUTH
651
                    + ':2:'
652
                    + 'password:SMB password:|||'
653
                    + '{0}'.format(password)
654
                )
655
            if service == 'esxi':
656
                cred_prefs_list.append(
657
                    OID_ESXI_AUTH
658
                    + ':1:entry:'
659
                    + 'ESXi login name:|||'
660
                    + '{0}'.format(username)
661
                )
662
                cred_prefs_list.append(
663
                    OID_ESXI_AUTH
664
                    + ':2:'
665
                    + 'password:ESXi login password:|||'
666
                    + '{0}'.format(password)
667
                )
668
669
            if service == 'snmp':
670
                community = cred_params.get('community', '')
671
                auth_algorithm = cred_params.get('auth_algorithm', '')
672
                privacy_password = cred_params.get('privacy_password', '')
673
                privacy_algorithm = cred_params.get('privacy_algorithm', '')
674
675
                cred_prefs_list.append(
676
                    OID_SNMP_AUTH
677
                    + ':1:'
678
                    + 'password:SNMP Community:|||'
679
                    + '{0}'.format(community)
680
                )
681
                cred_prefs_list.append(
682
                    OID_SNMP_AUTH
683
                    + ':2:'
684
                    + 'entry:SNMPv3 Username:|||'
685
                    + '{0}'.format(username)
686
                )
687
                cred_prefs_list.append(
688
                    OID_SNMP_AUTH + ':3:'
689
                    'password:SNMPv3 Password:|||' + '{0}'.format(password)
690
                )
691
                cred_prefs_list.append(
692
                    OID_SNMP_AUTH
693
                    + ':4:'
694
                    + 'radio:SNMPv3 Authentication Algorithm:|||'
695
                    + '{0}'.format(auth_algorithm)
696
                )
697
                cred_prefs_list.append(
698
                    OID_SNMP_AUTH
699
                    + ':5:'
700
                    + 'password:SNMPv3 Privacy Password:|||'
701
                    + '{0}'.format(privacy_password)
702
                )
703
                cred_prefs_list.append(
704
                    OID_SNMP_AUTH
705
                    + ':6:'
706
                    + 'radio:SNMPv3 Privacy Algorithm:|||'
707
                    + '{0}'.format(privacy_algorithm)
708
                )
709
710
        return cred_prefs_list
711
712
    def prepare_credentials_for_openvas(self) -> bool:
713
        """Get the credentials from the scan collection and store them
714
        in the kb."""
715
        credentials = self.scan_collection.get_credentials(self.scan_id)
716
        if credentials:
717
            cred_prefs = self.build_credentials_as_prefs(credentials)
718
            if cred_prefs:
719
                self.kbdb.add_credentials_to_scan_preferences(
720
                    self.scan_id, cred_prefs
721
                )
722
723
        if credentials and not cred_prefs:
Issue: The variable cred_prefs does not seem to be defined in case credentials on line 716 is False. Are you sure this can never be the case?
724
            return False
725
726
        return True
727
728
    def prepare_main_kbindex_for_openvas(self):
729
        """Store main_kbindex as global preference in the
730
        kb, used by OpenVAS"""
731
        ov_maindbid = 'ov_maindbid|||%d' % self.kbdb.index
732
        self.kbdb.add_scan_preferences(self.scan_id, [ov_maindbid])
733
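
The issue noted on prepare_credentials_for_openvas (cred_prefs possibly undefined) does not trigger at runtime, because the `and` short-circuits when credentials is empty. Still, the control flow can be written so that the variable is always defined before use. The following is a sketch only: `store_credentials` is a hypothetical free function, and the `build` and `store` callables stand in for build_credentials_as_prefs and the KbDB call from the listing.

```python
from typing import Callable, Dict, List


def store_credentials(
    credentials: Dict,
    build: Callable[[Dict], List[str]],
    store: Callable[[List[str]], None],
) -> bool:
    """Return False only if credentials were given but produced no prefs."""
    # No credentials supplied: nothing to store, not an error.
    if not credentials:
        return True

    # cred_prefs is always defined from here on.
    cred_prefs = build(credentials)
    if not cred_prefs:
        return False

    store(cred_prefs)
    return True
```

The early returns keep the same three outcomes as the original method while removing the second `if credentials` check.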