Completed
Push — master ( a4dee7...a3f003 )
by Juan José
18s queued 14s
created

PreferenceHandler.prepare_nvt_preferences()   A

Complexity

Conditions 3

Size

Total Lines 9
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 6
nop 1
dl 0
loc 9
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
# pylint: disable=too-many-lines
21
22
""" Prepare the preferences to be used by OpenVAS. Get the data from the scan
23
collection and store the data in a redis KB in the right format to be used by
24
OpenVAS. """
25
26
import logging
27
import binascii
28
29
from enum import IntEnum
30
from typing import Optional, Dict, List, Tuple
31
from base64 import b64decode
32
33
from ospd.scan import ScanCollection
34
from ospd.ospd import BASE_SCANNER_PARAMS
35
from ospd_openvas.openvas import Openvas
36
from ospd_openvas.db import KbDB
37
from ospd_openvas.nvticache import NVTICache
38
from ospd_openvas.vthelper import VtHelper
39
from ospd_openvas.notus.metadata import NotusMetadataHandler
40
41
logger = logging.getLogger(__name__)

# OIDs of the VTs which receive the credential preferences.
OID_SSH_AUTH = "1.3.6.1.4.1.25623.1.0.103591"
OID_SMB_AUTH = "1.3.6.1.4.1.25623.1.0.90023"
OID_ESXI_AUTH = "1.3.6.1.4.1.25623.1.0.105058"
OID_SNMP_AUTH = "1.3.6.1.4.1.25623.1.0.105076"
# OID of the VT which receives the alive test preferences.
OID_PING_HOST = "1.3.6.1.4.1.25623.1.0.100315"

# Preference keys stored in the kb for Boreas.
BOREAS_ALIVE_TEST = "ALIVE_TEST"
BOREAS_ALIVE_TEST_PORTS = "ALIVE_TEST_PORTS"
# Scanner setting which tells whether the Boreas preferences must be set.
BOREAS_SETTING_NAME = "test_alive_hosts_only"
52
53
54
class AliveTest(IntEnum):
    """Alive test methods.

    Every method except the default uses its own bit, so that several
    methods can be combined with a bitwise OR into a single bit field
    with a value between 1 and 31.
    """

    # No explicit method selected.
    ALIVE_TEST_SCAN_CONFIG_DEFAULT = 0
    ALIVE_TEST_TCP_ACK_SERVICE = 1
    ALIVE_TEST_ICMP = 2
    ALIVE_TEST_ARP = 4
    ALIVE_TEST_CONSIDER_ALIVE = 8
    ALIVE_TEST_TCP_SYN_SERVICE = 16
63
64
65
def alive_test_methods_to_bit_field(
    icmp: bool, tcp_syn: bool, tcp_ack: bool, arp: bool, consider_alive: bool
) -> int:
    """Internally a bit field is used as alive test. This function creates
    such a bit field out of the supplied alive test methods.

    Arguments:
        icmp: Whether the ICMP method is selected.
        tcp_syn: Whether the TCP-SYN service method is selected.
        tcp_ack: Whether the TCP-ACK service method is selected.
        arp: Whether the ARP method is selected.
        consider_alive: Whether the consider alive method is selected.

    Return:
        The bitwise OR of the AliveTest values of all selected methods
        as a plain integer. 0 if no method is selected.
    """
    selected_methods = (
        (icmp, AliveTest.ALIVE_TEST_ICMP),
        (tcp_syn, AliveTest.ALIVE_TEST_TCP_SYN_SERVICE),
        (tcp_ack, AliveTest.ALIVE_TEST_TCP_ACK_SERVICE),
        (arp, AliveTest.ALIVE_TEST_ARP),
        (consider_alive, AliveTest.ALIVE_TEST_CONSIDER_ALIVE),
    )

    bit_field = 0
    for selected, method in selected_methods:
        if selected:
            # int() keeps the result a plain integer, as annotated.
            bit_field |= int(method)

    return bit_field
84
85
86
def _from_bool_to_str(value: int) -> str:
    """The OpenVAS scanner uses yes and no as boolean values, whereas ospd
    uses 1 and 0.

    Arguments:
        value: The ospd boolean value.

    Return:
        'yes' if the value is equal to 1, 'no' for any other value.
    """
    if value == 1:
        return 'yes'
    return 'no'
90
91
92
class PreferenceHandler:
93
    def __init__(
        self,
        scan_id: str,
        kbdb: KbDB,
        scan_collection: ScanCollection,
        nvticache: NVTICache,
    ):
        """Store the objects needed to prepare the preferences of a scan.

        Arguments:
            scan_id: Id of the scan whose preferences are prepared.
            kbdb: The kb in which the preferences are stored.
            scan_collection: The scan collection from which the scan data
                             is read.
            nvticache: The NVTI cache used to look up the VTs.
        """
        self.scan_id = scan_id
        self.kbdb = kbdb
        self.scan_collection = scan_collection

        # Filled on first access of the target_options property.
        self._target_options = None
        # Filled by prepare_plugins_for_openvas().
        self._nvts_params = None

        self.nvti = nvticache
108
109
    def prepare_scan_id_for_openvas(self):
110
        """Create the openvas scan id and store it in the redis kb.
111
        Return the openvas scan_id.
112
        """
113
        self.kbdb.add_scan_id(self.scan_id)
114
115
    @property
116
    def target_options(self) -> Dict:
117
        """ Return target options from Scan collection """
118
        if self._target_options is not None:
119
            return self._target_options
120
121
        self._target_options = self.scan_collection.get_target_options(
122
            self.scan_id
123
        )
124
        return self._target_options
125
126
    def _get_vts_in_groups(
        self,
        filters: List[str],
    ) -> List[str]:
        """Return a list of vts which match with the given filter.

        Arguments:
            filters A list of filters. Each filter has a key and a value,
                    separated by '='. Filters without a '=' are ignored.
                    Supported keys: family

        Returns a list of vt oids which match with the given filter. For a
        family supported by Notus, and if Notus is enabled, only the oid of
        the Notus driver is added instead of the VTs of the family.
        """
        settings = Openvas.get_settings()
        # Missing settings are handled like a disabled Notus.
        notus_enabled = settings.get("table_driven_lsc") if settings else None

        notus = NotusMetadataHandler()
        lsc_families_and_drivers = notus.get_family_driver_linkers()

        # Group the oids of all known VTs by family.
        families = {}
        for _, oid in self.nvti.get_oids():
            family = self.nvti.get_nvt_family(oid)
            families.setdefault(family, []).append(oid)

        vts_list = []
        for elem in filters:
            # partition() never raises, no matter how many '=' are present.
            key, separator, value = elem.partition('=')
            if not separator:
                logger.debug('Ignoring malformed VT group filter "%s"', elem)
                continue

            if key != 'family':
                continue

            # If the family is supported by Notus LSC and Notus
            # is enabled
            if notus_enabled and value in lsc_families_and_drivers:
                driver_oid = lsc_families_and_drivers.get(value)
                vts_list.append(driver_oid)
                logger.debug('Add Notus driver for family "%s"', value)
            # If Notus is disabled or the family is not supported by Notus.
            elif value in families:
                vts_list.extend(families[value])
                logger.debug('Add VTs collection for family "%s"', value)

        return vts_list
175
176
    def _get_vt_param_type(self, vt: Dict, vt_param_id: str) -> Optional[str]:
177
        """ Return the type of the vt parameter from the vts dictionary. """
178
179
        vt_params_list = vt.get("vt_params")
180
        if vt_params_list.get(vt_param_id):
181
            return vt_params_list[vt_param_id]["type"]
182
        return None
183
184
    def _get_vt_param_name(self, vt: Dict, vt_param_id: str) -> Optional[str]:
185
        """ Return the type of the vt parameter from the vts dictionary. """
186
187
        vt_params_list = vt.get("vt_params")
188
        if vt_params_list.get(vt_param_id):
189
            return vt_params_list[vt_param_id]["name"]
190
        return None
191
192
    @staticmethod
193
    def check_param_type(vt_param_value: str, param_type: str) -> Optional[int]:
194
        """Check if the value of a vt parameter matches with
195
        the type founded.
196
        """
197
        if (
198
            param_type
199
            in [
200
                'entry',
201
                'password',
202
                'radio',
203
                'sshlogin',
204
            ]
205
            and isinstance(vt_param_value, str)
206
        ):
207
            return None
208
        elif param_type == 'checkbox' and (
209
            vt_param_value == '0' or vt_param_value == '1'
210
        ):
211
            return None
212
        elif param_type == 'file':
213
            try:
214
                b64decode(vt_param_value.encode())
215
            except (binascii.Error, AttributeError, TypeError):
216
                return 1
217
            return None
218
        elif param_type == 'integer':
219
            try:
220
                int(vt_param_value)
221
            except ValueError:
222
                return 1
223
            return None
224
225
        return 1
226
227
    def _process_vts(
        self,
        vts: Dict[str, Dict[str, str]],
    ) -> Tuple[List[str], Dict[str, str]]:
        """Add single VTs and their parameters.

        Arguments:
            vts: Dictionary which maps the VT oids to their parameters.
                 The filters of the VT groups are expected under the
                 'vt_groups' key. This key is removed from the passed
                 dictionary.

        Return:
            A tuple with the list of VT oids and a dictionary with the VT
            parameters. The keys of the dictionary have the format
            "oid:param_id:param_type:param_name".
        """
        vts_list = []
        vts_params = {}
        # VTs without a 'vt_groups' entry are handled like an empty group
        # list instead of failing with a KeyError.
        vtgroups = vts.pop('vt_groups', None)

        vthelper = VtHelper(self.nvti)

        if vtgroups:
            vts_list = self._get_vts_in_groups(vtgroups)

        for vtid, vt_params in vts.items():
            vt = vthelper.get_single_vt(vtid)
            if not vt:
                logger.warning(
                    'The VT %s was not found and it will not be added to the '
                    'plugin scheduler.',
                    vtid,
                )
                continue

            vts_list.append(vtid)
            for vt_param_id, vt_param_value in vt_params.items():
                param_type = self._get_vt_param_type(vt, vt_param_id)
                param_name = self._get_vt_param_name(vt, vt_param_id)

                if not param_type or not param_name:
                    logger.debug(
                        'Missing type or name for VT parameter %s of %s. '
                        'This VT parameter will not be set.',
                        vt_param_id,
                        vtid,
                    )
                    continue

                # The parameter with id '0' is always validated as an
                # integer, regardless of the type found for it.
                type_aux = 'integer' if vt_param_id == '0' else param_type

                if self.check_param_type(vt_param_value, type_aux):
                    logger.debug(
                        'The VT parameter %s for %s could not be set. '
                        'Expected %s type for parameter value %s',
                        vt_param_id,
                        vtid,
                        type_aux,
                        str(vt_param_value),
                    )
                    continue

                if type_aux == 'checkbox':
                    vt_param_value = _from_bool_to_str(int(vt_param_value))

                param_key = "{0}:{1}:{2}:{3}".format(
                    vtid, vt_param_id, param_type, param_name
                )
                vts_params[param_key] = str(vt_param_value)

        return vts_list, vts_params
291
292
    def prepare_plugins_for_openvas(self) -> bool:
293
        """Get the plugin list and it preferences from the Scan Collection.
294
        The plugin list is immediately stored in the kb.
295
        """
296
        nvts = self.scan_collection.get_vts(self.scan_id)
297
        if nvts:
298
            nvts_list, self._nvts_params = self._process_vts(nvts)
299
            # Add nvts list
300
            separ = ';'
301
            plugin_list = 'plugin_set|||%s' % separ.join(nvts_list)
302
            self.kbdb.add_scan_preferences(self.scan_id, [plugin_list])
303
304
            nvts_list = None
305
            plugin_list = None
306
            nvts = None
307
308
            return True
309
310
        return False
311
312
    def prepare_nvt_preferences(self):
313
        """Prepare the vts preferences. Store the data in the kb."""
314
315
        items_list = []
316
        for key, val in self._nvts_params.items():
317
            items_list.append('%s|||%s' % (key, val))
318
319
        if items_list:
320
            self.kbdb.add_scan_preferences(self.scan_id, items_list)
321
322
    @staticmethod
    def build_alive_test_opt_as_prefs(
        target_options: Dict[str, str]
    ) -> Dict[str, str]:
        """Parse the target options dictionary.
        Arguments:
            target_options: Dictionary with the target options.

        Return:
            A dict with the target options related to alive test method
            in string format to be added to the redis KB. The dict is
            empty if no alive test is given or if its value is not an
            integer between 1 and 31.
        """
        target_opt_prefs_list = {}
        if not target_options:
            return target_opt_prefs_list

        # Alive test specified as bit field.
        alive_test = target_options.get('alive_test')
        # Alive test specified as individual methods.
        alive_test_methods = target_options.get('alive_test_methods')
        # alive_test takes precedence over alive_test_methods
        if alive_test is None and alive_test_methods:
            alive_test = alive_test_methods_to_bit_field(
                icmp=target_options.get('icmp') == '1',
                tcp_syn=target_options.get('tcp_syn') == '1',
                tcp_ack=target_options.get('tcp_ack') == '1',
                arp=target_options.get('arp') == '1',
                consider_alive=target_options.get('consider_alive') == '1',
            )

        if not alive_test:
            return target_opt_prefs_list

        try:
            alive_test = int(alive_test)
        except (ValueError, TypeError):
            logger.debug(
                'Alive test settings not applied. '
                'Invalid alive test value %s',
                target_options.get('alive_test'),
            )
            return target_opt_prefs_list

        # No alive test or wrong value, uses the default
        # preferences sent by the client.
        if alive_test < 1 or alive_test > 31:
            return target_opt_prefs_list

        tcp_ack = bool(alive_test & AliveTest.ALIVE_TEST_TCP_ACK_SERVICE)
        tcp_syn = bool(alive_test & AliveTest.ALIVE_TEST_TCP_SYN_SERVICE)
        icmp = bool(alive_test & AliveTest.ALIVE_TEST_ICMP)
        arp = bool(alive_test & AliveTest.ALIVE_TEST_ARP)
        consider_alive = bool(alive_test & AliveTest.ALIVE_TEST_CONSIDER_ALIVE)

        # Preference id, preference name and whether it is enabled.
        # NOTE(review): the names, including the spelling "unrechable", are
        # presumably the ones expected by the Ping Host VT - confirm before
        # changing them.
        checkboxes = (
            ('1', 'Do a TCP ping', tcp_ack or tcp_syn),
            ('2', 'TCP ping tries also TCP-SYN ping', tcp_syn and tcp_ack),
            ('7', 'TCP ping tries only TCP-SYN ping', tcp_syn and not tcp_ack),
            ('3', 'Do an ICMP ping', icmp),
            ('4', 'Use ARP', arp),
            (
                '5',
                'Mark unrechable Hosts as dead (not scanning)',
                not consider_alive,
            ),
        )
        for pref_id, pref_name, enabled in checkboxes:
            pref_key = '{0}:{1}:checkbox:{2}'.format(
                OID_PING_HOST, pref_id, pref_name
            )
            target_opt_prefs_list[pref_key] = 'yes' if enabled else 'no'

        # Also select a method, otherwise Ping Host logs a warning.
        if alive_test == AliveTest.ALIVE_TEST_CONSIDER_ALIVE:
            target_opt_prefs_list[
                OID_PING_HOST + ':1:checkbox:' + 'Do a TCP ping'
            ] = 'yes'

        return target_opt_prefs_list
437
438
    def prepare_alive_test_option_for_openvas(self):
        """Set alive test option. Overwrite the scan config settings.

        The alive test preferences are added to the vts preferences kept
        in the handler. Nothing is done without scanner settings or if
        the target options contain neither 'alive_test' nor
        'alive_test_methods'.
        """
        settings = Openvas.get_settings()
        if not settings:
            return

        target_options = self.target_options
        if not target_options:
            return

        if not (
            target_options.get('alive_test')
            or target_options.get('alive_test_methods')
        ):
            return

        alive_test_opt = self.build_alive_test_opt_as_prefs(target_options)

        # The vts preferences are still None if no VTs were processed yet.
        if self._nvts_params is None:
            self._nvts_params = {}
        self._nvts_params.update(alive_test_opt)
449
450
    def prepare_boreas_alive_test(self):
        """Set alive_test for Boreas if boreas scanner config
        (BOREAS_SETTING_NAME) was set.

        The alive test bit field is stored in the kb under the
        BOREAS_ALIVE_TEST key. ICMP is stored if no alive test or an
        invalid value was given. Nothing is stored for an integer out of
        the valid range. The alive test ports are stored under the
        BOREAS_ALIVE_TEST_PORTS key, if present in the target options.
        """
        settings = Openvas.get_settings()
        target_options = self.target_options

        if not settings or not settings.get(BOREAS_SETTING_NAME):
            return

        alive_test = None
        alive_test_ports = None

        if target_options:
            alive_test_ports = target_options.get('alive_test_ports')
            # Alive test was specified as bit field.
            alive_test = target_options.get('alive_test')
            # Alive test was specified as individual methods.
            alive_test_methods = target_options.get('alive_test_methods')
            # <alive_test> takes precedence over <alive_test_methods>
            if alive_test is None and alive_test_methods:
                alive_test = alive_test_methods_to_bit_field(
                    icmp=target_options.get('icmp') == '1',
                    tcp_syn=target_options.get('tcp_syn') == '1',
                    tcp_ack=target_options.get('tcp_ack') == '1',
                    arp=target_options.get('arp') == '1',
                    consider_alive=target_options.get('consider_alive') == '1',
                )

        if alive_test is None:
            # Use default alive test if no alive_test was provided
            alive_test = AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT
        else:
            try:
                alive_test = int(alive_test)
            except (ValueError, TypeError):
                logger.debug(
                    'Alive test preference for Boreas not set. '
                    'Invalid alive test value %s.',
                    alive_test,
                )
                # Use default alive test as fall back
                alive_test = AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT

        # The default alive test is sent to Boreas as ICMP.
        if alive_test == AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT:
            alive_test = AliveTest.ALIVE_TEST_ICMP

        # If a valid alive_test was set then the bit mask
        # has a value between 1 (00001) and 31 (11111)
        if 1 <= alive_test <= 31:
            pref = "{pref_key}|||{pref_value}".format(
                # int() so that an enum member is written as a number.
                pref_key=BOREAS_ALIVE_TEST,
                pref_value=int(alive_test),
            )
            self.kbdb.add_scan_preferences(self.scan_id, [pref])

        # Add portlist if present. Validity is checked on Boreas side.
        if alive_test_ports is not None:
            pref = "{pref_key}|||{pref_value}".format(
                pref_key=BOREAS_ALIVE_TEST_PORTS,
                pref_value=alive_test_ports,
            )
            self.kbdb.add_scan_preferences(self.scan_id, [pref])
518
519
    def prepare_reverse_lookup_opt_for_openvas(self):
        """Set reverse lookup options in the kb.

        Both 'reverse_lookup_only' and 'reverse_lookup_unify' are stored
        as yes or no. A missing or invalid option is stored as no.
        Nothing is stored if there are no target options.
        """
        target_options = self.target_options
        if not target_options:
            return

        items = []
        for option in ('reverse_lookup_only', 'reverse_lookup_unify'):
            option_value = target_options.get(option, '0')
            try:
                enabled = int(option_value)
            except (ValueError, TypeError):
                # Do not abort the scan preparation because of a wrong
                # value, use the default instead.
                logger.debug(
                    'Invalid value %s for the option %s. '
                    'The option will be disabled.',
                    option_value,
                    option,
                )
                enabled = 0
            items.append('%s|||%s' % (option, _from_bool_to_str(enabled)))

        self.kbdb.add_scan_preferences(self.scan_id, items)
536
537
    def prepare_target_for_openvas(self):
538
        """Get the target from the scan collection and set the target
539
        in the kb"""
540
541
        target = self.scan_collection.get_host_list(self.scan_id)
542
        target_aux = 'TARGET|||%s' % target
543
        self.kbdb.add_scan_preferences(self.scan_id, [target_aux])
544
545
    def prepare_ports_for_openvas(self) -> str:
546
        """Get the port list from the scan collection and store the list
547
        in the kb."""
548
        ports = self.scan_collection.get_ports(self.scan_id)
549
        port_range = 'port_range|||%s' % ports
550
        self.kbdb.add_scan_preferences(self.scan_id, [port_range])
551
552
        return ports
553
554
    def prepare_host_options_for_openvas(self):
555
        """Get the excluded and finished hosts from the scan collection and
556
        stores the list of hosts that must not be scanned in the kb."""
557
        exclude_hosts = self.scan_collection.get_exclude_hosts(self.scan_id)
558
559
        if exclude_hosts:
560
            pref_val = "exclude_hosts|||" + exclude_hosts
561
            self.kbdb.add_scan_preferences(self.scan_id, [pref_val])
562
563
    def prepare_scan_params_for_openvas(self, ospd_params: Dict[str, Dict]):
        """Get the scan parameters from the scan collection and store them
        in the kb.
        Arguments:
            ospd_params: Dictionary with the OSPD Params. The 'type' of a
                         param tells whether its value is a boolean which
                         must be stored as yes or no.
        """
        # Options which were supplied via the <scanner_params> XML element.
        options = self.scan_collection.get_options(self.scan_id)
        prefs_val = []

        for key, value in options.items():
            item_type = ''
            if key in ospd_params:
                item_type = ospd_params[key].get('type')
            elif key not in BASE_SCANNER_PARAMS:
                # NOTE(review): the setting is only logged here, it is
                # still stored in the kb below - confirm this is intended.
                logger.debug(
                    "%s is a scanner only setting and should not be set "
                    "by the client. Setting needs to be included in "
                    "OpenVAS configuration file instead.",
                    key,
                )

            if item_type == 'boolean':
                val = _from_bool_to_str(value)
            else:
                val = str(value)
            prefs_val.append(key + "|||" + val)

        if prefs_val:
            self.kbdb.add_scan_preferences(self.scan_id, prefs_val)
593
594
    @staticmethod
    def build_credentials_as_prefs(credentials: Dict) -> List[str]:
        """Parse the credential dictionary.
        Arguments:
            credentials: Dictionary with the credentials. It maps the
                         service name (ssh, smb, esxi or snmp) to a
                         dictionary with the credential parameters.
                         Other services are ignored.

        Return:
            A list with the credentials in string format to be
            added to the redis KB.
        """

        def vt_pref(oid, pref_id, pref_type, pref_name, value):
            """Return a VT preference in the format used in the kb."""
            return '{0}:{1}:{2}:{3}:|||{4}'.format(
                oid, pref_id, pref_type, pref_name, value
            )

        cred_prefs_list = []
        for service, cred_params in credentials.items():
            cred_type = cred_params.get('type', '')
            username = cred_params.get('username', '')
            password = cred_params.get('password', '')

            if service == 'ssh':
                port = cred_params.get('port', '')
                cred_prefs_list.append('auth_port_ssh|||' + '{0}'.format(port))
                cred_prefs_list.append(
                    vt_pref(
                        OID_SSH_AUTH, 1, 'entry', 'SSH login name', username
                    )
                )
                # Type 'up' is a username + password credential. Any
                # other type is handled as a username + ssh key one.
                if cred_type == 'up':
                    cred_prefs_list.append(
                        vt_pref(
                            OID_SSH_AUTH,
                            3,
                            'password',
                            'SSH password (unsafe!)',
                            password,
                        )
                    )
                else:
                    private = cred_params.get('private', '')
                    cred_prefs_list.append(
                        vt_pref(
                            OID_SSH_AUTH,
                            2,
                            'password',
                            'SSH key passphrase',
                            password,
                        )
                    )
                    cred_prefs_list.append(
                        vt_pref(
                            OID_SSH_AUTH, 4, 'file', 'SSH private key', private
                        )
                    )
            elif service == 'smb':
                cred_prefs_list.append(
                    vt_pref(OID_SMB_AUTH, 1, 'entry', 'SMB login', username)
                )
                cred_prefs_list.append(
                    vt_pref(
                        OID_SMB_AUTH, 2, 'password', 'SMB password', password
                    )
                )
            elif service == 'esxi':
                cred_prefs_list.append(
                    vt_pref(
                        OID_ESXI_AUTH, 1, 'entry', 'ESXi login name', username
                    )
                )
                cred_prefs_list.append(
                    vt_pref(
                        OID_ESXI_AUTH,
                        2,
                        'password',
                        'ESXi login password',
                        password,
                    )
                )
            elif service == 'snmp':
                snmp_prefs = (
                    (
                        1,
                        'password',
                        'SNMP Community',
                        cred_params.get('community', ''),
                    ),
                    (2, 'entry', 'SNMPv3 Username', username),
                    (3, 'password', 'SNMPv3 Password', password),
                    (
                        4,
                        'radio',
                        'SNMPv3 Authentication Algorithm',
                        cred_params.get('auth_algorithm', ''),
                    ),
                    (
                        5,
                        'password',
                        'SNMPv3 Privacy Password',
                        cred_params.get('privacy_password', ''),
                    ),
                    (
                        6,
                        'radio',
                        'SNMPv3 Privacy Algorithm',
                        cred_params.get('privacy_algorithm', ''),
                    ),
                )
                for pref_id, pref_type, pref_name, value in snmp_prefs:
                    cred_prefs_list.append(
                        vt_pref(
                            OID_SNMP_AUTH, pref_id, pref_type, pref_name, value
                        )
                    )

        return cred_prefs_list
710
711
    def prepare_credentials_for_openvas(self) -> bool:
712
        """Get the credentials from the scan collection and store them
713
        in the kb."""
714
        credentials = self.scan_collection.get_credentials(self.scan_id)
715
        if credentials:
716
            cred_prefs = self.build_credentials_as_prefs(credentials)
717
            if cred_prefs:
718
                self.kbdb.add_credentials_to_scan_preferences(
719
                    self.scan_id, cred_prefs
720
                )
721
722
        if credentials and not cred_prefs:
0 ignored issues
show
introduced by
The variable cred_prefs does not seem to be defined in case credentials on line 715 is False. Are you sure this can never be the case?
Loading history...
723
            return False
724
725
        return True
726
727
    def prepare_main_kbindex_for_openvas(self):
728
        """Store main_kbindex as global preference in the
729
        kb, used by OpenVAS"""
730
        ov_maindbid = 'ov_maindbid|||%d' % self.kbdb.index
731
        self.kbdb.add_scan_preferences(self.scan_id, [ov_maindbid])
732