Passed
Pull Request — master (#367)
by Juan José
01:29
created

PreferenceHandler._get_vts_in_groups()   C

Complexity

Conditions 9

Size

Total Lines 47
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 9
eloc 26
nop 2
dl 0
loc 47
rs 6.6666
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
# pylint: disable=too-many-lines
21
22
""" Prepare the preferences to be used by OpenVAS. Get the data from the scan
23
collection and store the data in a redis KB in the right format to be used by
24
OpenVAS. """
25
26
import logging
27
import binascii
28
29
from enum import IntEnum
30
from typing import Optional, Dict, List, Tuple
31
from base64 import b64decode
32
33
from ospd.scan import ScanCollection
34
from ospd.ospd import BASE_SCANNER_PARAMS
35
from ospd_openvas.openvas import Openvas
36
from ospd_openvas.db import KbDB
37
from ospd_openvas.nvticache import NVTICache
38
from ospd_openvas.vthelper import VtHelper
39
from ospd_openvas.notus.metadata import NotusMetadataHandler
40
41
logger = logging.getLogger(__name__)
42
43
OID_SSH_AUTH = "1.3.6.1.4.1.25623.1.0.103591"
44
OID_SMB_AUTH = "1.3.6.1.4.1.25623.1.0.90023"
45
OID_ESXI_AUTH = "1.3.6.1.4.1.25623.1.0.105058"
46
OID_SNMP_AUTH = "1.3.6.1.4.1.25623.1.0.105076"
47
OID_PING_HOST = "1.3.6.1.4.1.25623.1.0.100315"
48
49
BOREAS_ALIVE_TEST = "ALIVE_TEST"
50
BOREAS_ALIVE_TEST_PORTS = "ALIVE_TEST_PORTS"
51
BOREAS_SETTING_NAME = "test_alive_hosts_only"
52
53
54
class AliveTest(IntEnum):
    """Bit values of the alive test methods.

    Every member except the scan config default occupies a single bit, so
    several methods can be OR-ed together into one bit field.
    """

    ALIVE_TEST_SCAN_CONFIG_DEFAULT = 0
    ALIVE_TEST_TCP_ACK_SERVICE = 1 << 0
    ALIVE_TEST_ICMP = 1 << 1
    ALIVE_TEST_ARP = 1 << 2
    ALIVE_TEST_CONSIDER_ALIVE = 1 << 3
    ALIVE_TEST_TCP_SYN_SERVICE = 1 << 4
63
64
65
def alive_test_methods_to_bit_field(
    icmp: bool, tcp_syn: bool, tcp_ack: bool, arp: bool, consider_alive: bool
) -> int:
    """Combine the selected alive test methods into a single bit field.

    Each argument switches one alive test method on or off. The bit of
    every enabled method (see AliveTest) is set in the returned integer;
    0 is returned if no method is enabled.
    """
    selected_methods = (
        (icmp, AliveTest.ALIVE_TEST_ICMP),
        (tcp_syn, AliveTest.ALIVE_TEST_TCP_SYN_SERVICE),
        (tcp_ack, AliveTest.ALIVE_TEST_TCP_ACK_SERVICE),
        (arp, AliveTest.ALIVE_TEST_ARP),
        (consider_alive, AliveTest.ALIVE_TEST_CONSIDER_ALIVE),
    )

    bit_field = 0
    for enabled, method_bit in selected_methods:
        if enabled:
            bit_field |= method_bit
    return bit_field
84
85
86
def _from_bool_to_str(value: int) -> str:
87
    """The OpenVAS scanner use yes and no as boolean values, whereas ospd
88
    uses 1 and 0."""
89
    return 'yes' if value == 1 else 'no'
90
91
92
class PreferenceHandler:
93
    def __init__(
        self,
        scan_id: str,
        kbdb: KbDB,
        scan_collection: ScanCollection,
        nvticache: NVTICache,
    ):
        """Keep the objects needed to prepare the preferences of a scan.

        Arguments:
            scan_id: Id of the scan the preferences are prepared for.
            kbdb: KB the scan preferences are added to.
            scan_collection: Scan collection the scan data is read from.
            nvticache: NVTI cache, kept as self.nvti.
        """
        # Collaborators handed in by the caller.
        self.kbdb = kbdb
        self.nvti = nvticache
        self.scan_collection = scan_collection
        self.scan_id = scan_id

        # Filled in by the target_options property on first access.
        self._target_options = None
107
108
    def prepare_scan_id_for_openvas(self):
109
        """Create the openvas scan id and store it in the redis kb.
110
        Return the openvas scan_id.
111
        """
112
        self.kbdb.add_scan_id(self.scan_id)
113
114
    @property
115
    def target_options(self) -> Dict:
116
        """ Return target options from Scan collection """
117
        if self._target_options is not None:
118
            return self._target_options
119
120
        self._target_options = self.scan_collection.get_target_options(
121
            self.scan_id
122
        )
123
        return self._target_options
124
125
    def _get_vts_in_groups(
        self,
        filters: List[str],
    ) -> List[str]:
        """Return a list of vts which match with the given filters.

        Arguments:
            filters: A list of filters. Each filter is a string of the
                     form '<key>=<value>'. Supported keys: family

        If the "table_driven_lsc" setting is enabled and the family has a
        driver in the Notus family/driver linkers, only the oid of that
        driver is added for the family. Otherwise all vts of the family
        are added. Filters with an unsupported key or an unknown family
        are ignored.

        Returns a list of vt oids which match with the given filters.
        """
        settings = Openvas.get_settings()
        # The settings may be unavailable; Notus counts as disabled then.
        notus_enabled = settings.get("table_driven_lsc") if settings else None

        notus = NotusMetadataHandler()
        lsc_families_and_drivers = notus.get_family_driver_linkers()

        # Group the oids of all known vts by their family.
        families = dict()
        for _, oid in self.nvti.get_oids():
            families.setdefault(self.nvti.get_nvt_family(oid), []).append(oid)

        vts_list = list()
        for elem in filters:
            # Split at the first '=' only, so a value containing '=' or a
            # filter without any '=' does not raise.
            key, _, value = elem.partition('=')
            if key != 'family':
                continue

            # If the family is supported by Notus LSC and Notus is enabled
            if notus_enabled and value in lsc_families_and_drivers:
                vts_list.append(lsc_families_and_drivers.get(value))
            # If Notus is disabled or the family is not supported by Notus.
            elif value in families:
                vts_list.extend(families[value])

        return vts_list
172
173
    def _get_vt_param_type(self, vt: Dict, vt_param_id: str) -> Optional[str]:
174
        """ Return the type of the vt parameter from the vts dictionary. """
175
176
        vt_params_list = vt.get("vt_params")
177
        if vt_params_list.get(vt_param_id):
178
            return vt_params_list[vt_param_id]["type"]
179
        return None
180
181
    def _get_vt_param_name(self, vt: Dict, vt_param_id: str) -> Optional[str]:
182
        """ Return the type of the vt parameter from the vts dictionary. """
183
184
        vt_params_list = vt.get("vt_params")
185
        if vt_params_list.get(vt_param_id):
186
            return vt_params_list[vt_param_id]["name"]
187
        return None
188
189
    @staticmethod
190
    def check_param_type(vt_param_value: str, param_type: str) -> Optional[int]:
191
        """Check if the value of a vt parameter matches with
192
        the type founded.
193
        """
194
        if (
195
            param_type
196
            in [
197
                'entry',
198
                'password',
199
                'radio',
200
                'sshlogin',
201
            ]
202
            and isinstance(vt_param_value, str)
203
        ):
204
            return None
205
        elif param_type == 'checkbox' and (
206
            vt_param_value == '0' or vt_param_value == '1'
207
        ):
208
            return None
209
        elif param_type == 'file':
210
            try:
211
                b64decode(vt_param_value.encode())
212
            except (binascii.Error, AttributeError, TypeError):
213
                return 1
214
            return None
215
        elif param_type == 'integer':
216
            try:
217
                int(vt_param_value)
218
            except ValueError:
219
                return 1
220
            return None
221
222
        return 1
223
224
    def _process_vts(
        self,
        vts: Dict[str, Dict[str, str]],
    ) -> Tuple[List[str], Dict[str, str]]:
        """Collect the VTs to be launched and their parameters.

        Arguments:
            vts: Selected VTs, mapping each vt oid to its parameters
                 (parameter id -> value). The 'vt_groups' entry holds the
                 group filters and is removed from the dictionary.

        Returns a tuple with the list of vt oids and a dictionary mapping
        '<oid>:<param id>:<param type>:<param name>' to the parameter value.
        VTs that are not found and parameters that are unknown or have a
        value of the wrong type are logged and left out.
        """
        group_filters = vts.pop('vt_groups')
        vthelper = VtHelper(self.nvti)

        selected_oids = []
        if group_filters:
            selected_oids = self._get_vts_in_groups(group_filters)

        preferences = {}
        for vtid, vt_params in vts.items():
            vt = vthelper.get_single_vt(vtid)
            if not vt:
                logger.warning(
                    'The VT %s was not found and it will not be added to the '
                    'plugin scheduler.',
                    vtid,
                )
                continue

            selected_oids.append(vtid)
            for vt_param_id, vt_param_value in vt_params.items():
                param_type = self._get_vt_param_type(vt, vt_param_id)
                param_name = self._get_vt_param_name(vt, vt_param_id)

                if not (param_type and param_name):
                    logger.debug(
                        'Missing type or name for VT parameter %s of %s. '
                        'This VT parameter will not be set.',
                        vt_param_id,
                        vtid,
                    )
                    continue

                # The parameter with id '0' is always checked as an integer,
                # whatever type the vt declares for it.
                expected_type = 'integer' if vt_param_id == '0' else param_type

                if self.check_param_type(vt_param_value, expected_type):
                    logger.debug(
                        'The VT parameter %s for %s could not be set. '
                        'Expected %s type for parameter value %s',
                        vt_param_id,
                        vtid,
                        expected_type,
                        str(vt_param_value),
                    )
                    continue

                if expected_type == 'checkbox':
                    vt_param_value = _from_bool_to_str(int(vt_param_value))

                # The key carries the declared type, not the checked one.
                pref_key = "{0}:{1}:{2}:{3}".format(
                    vtid, vt_param_id, param_type, param_name
                )
                preferences[pref_key] = str(vt_param_value)

        return selected_oids, preferences
288
289
    def prepare_plugins_for_openvas(self) -> bool:
290
        """Get the plugin list to be launched from the Scan Collection
291
        and prepare the vts preferences. Store the data in the kb.
292
        """
293
        nvts = self.scan_collection.get_vts(self.scan_id)
294
        if nvts:
295
            nvts_list, nvts_params = self._process_vts(nvts)
296
            # Add nvts list
297
            separ = ';'
298
            plugin_list = 'plugin_set|||%s' % separ.join(nvts_list)
299
            self.kbdb.add_scan_preferences(self.scan_id, [plugin_list])
300
301
            # Add nvts parameters
302
            for key, val in nvts_params.items():
303
                item = '%s|||%s' % (key, val)
304
                self.kbdb.add_scan_preferences(self.scan_id, [item])
305
306
            nvts_params = None
307
            nvts_list = None
308
            item = None
309
            plugin_list = None
310
            nvts = None
311
312
            return True
313
314
        return False
315
316
    @staticmethod
    def build_alive_test_opt_as_prefs(
        target_options: Dict[str, str]
    ) -> List[str]:
        """Translate the alive test target options into Ping Host preferences.

        Arguments:
            target_options: Dictionary with the target options.

        The alive test is taken from the 'alive_test' option (a bit field).
        If that option is missing and 'alive_test_methods' is set, the bit
        field is built from the individual method options instead.

        Return:
            A list with the target options related to alive test method
            in string format to be added to the redis KB. The list is
            empty if no alive test is given, if it is not a number or if
            it is not between 1 and 31.
        """
        prefs = []
        if not target_options:
            return prefs

        # Alive test specified as bit field.
        alive_test = target_options.get('alive_test')
        # Alive test specified as individual methods.
        alive_test_methods = target_options.get('alive_test_methods')
        # alive_test takes precedence over alive_test_methods
        if alive_test is None and alive_test_methods:
            alive_test = alive_test_methods_to_bit_field(
                icmp=target_options.get('icmp') == '1',
                tcp_syn=target_options.get('tcp_syn') == '1',
                tcp_ack=target_options.get('tcp_ack') == '1',
                arp=target_options.get('arp') == '1',
                consider_alive=target_options.get('consider_alive') == '1',
            )

        if not alive_test:
            return prefs

        try:
            alive_test = int(alive_test)
        except ValueError:
            logger.debug(
                'Alive test settings not applied. '
                'Invalid alive test value %s',
                target_options.get('alive_test'),
            )
            return prefs

        if not 1 <= alive_test <= 31:
            return prefs

        tcp_ack = bool(alive_test & AliveTest.ALIVE_TEST_TCP_ACK_SERVICE)
        tcp_syn = bool(alive_test & AliveTest.ALIVE_TEST_TCP_SYN_SERVICE)
        icmp = bool(alive_test & AliveTest.ALIVE_TEST_ICMP)
        arp = bool(alive_test & AliveTest.ALIVE_TEST_ARP)
        consider_alive = bool(alive_test & AliveTest.ALIVE_TEST_CONSIDER_ALIVE)

        # (preference id, preference name, checkbox state) in the order
        # the preferences are added.
        checkboxes = (
            ('1', 'Do a TCP ping', tcp_ack or tcp_syn),
            ('2', 'TCP ping tries also TCP-SYN ping', tcp_syn and tcp_ack),
            ('7', 'TCP ping tries only TCP-SYN ping', tcp_syn and not tcp_ack),
            ('3', 'Do an ICMP ping', icmp),
            ('4', 'Use ARP', arp),
            (
                '5',
                'Mark unrechable Hosts as dead (not scanning)',
                not consider_alive,
            ),
        )
        for pref_id, pref_name, checked in checkboxes:
            prefs.append(
                '{0}:{1}:checkbox:{2}|||{3}'.format(
                    OID_PING_HOST,
                    pref_id,
                    pref_name,
                    'yes' if checked else 'no',
                )
            )

        # Also select a method, otherwise Ping Host logs a warning.
        if alive_test == AliveTest.ALIVE_TEST_CONSIDER_ALIVE:
            prefs.append(
                '{0}:1:checkbox:Do a TCP ping|||yes'.format(OID_PING_HOST)
            )

        return prefs
440
441
    def prepare_alive_test_option_for_openvas(self):
        """Set alive test option. Overwrite the scan config settings.

        Nothing is stored if the OpenVAS settings are not available, if the
        target options contain neither 'alive_test' nor
        'alive_test_methods', or if no preferences could be built from
        them.
        """
        if not Openvas.get_settings():
            return

        target_options = self.target_options
        alive_test_given = target_options.get(
            'alive_test'
        ) or target_options.get('alive_test_methods')
        if not alive_test_given:
            return

        alive_test_opt = self.build_alive_test_opt_as_prefs(target_options)
        if alive_test_opt:
            self.kbdb.add_scan_preferences(self.scan_id, alive_test_opt)
453
454
    def prepare_boreas_alive_test(self):
        """Store the alive test preferences for Boreas in the kb.

        Nothing is stored unless the scanner setting BOREAS_SETTING_NAME is
        set. The alive test is taken from the 'alive_test' target option
        (a bit field) or, if that is missing, built from the individual
        alive test methods. A missing or non numeric alive test falls back
        to the scan config default, for which ICMP is stored. The
        'alive_test_ports' target option is stored as well if present.
        """
        settings = Openvas.get_settings()
        target_options = self.target_options

        if not settings or not settings.get(BOREAS_SETTING_NAME):
            return

        alive_test = None
        alive_test_ports = None
        if target_options:
            alive_test_ports = target_options.get('alive_test_ports')
            # Alive test was specified as bit field.
            alive_test = target_options.get('alive_test')
            # Alive test was specified as individual methods.
            alive_test_methods = target_options.get('alive_test_methods')
            # <alive_test> takes precedence over <alive_test_methods>
            if alive_test is None and alive_test_methods:
                alive_test = alive_test_methods_to_bit_field(
                    icmp=target_options.get('icmp') == '1',
                    tcp_syn=target_options.get('tcp_syn') == '1',
                    tcp_ack=target_options.get('tcp_ack') == '1',
                    arp=target_options.get('arp') == '1',
                    consider_alive=target_options.get('consider_alive') == '1',
                )

        def store_pref(pref_key, pref_value):
            """Add a single '<key>|||<value>' preference for the scan."""
            self.kbdb.add_scan_preferences(
                self.scan_id, ["{0}|||{1}".format(pref_key, pref_value)]
            )

        if alive_test is None:
            # Use default alive test if no alive_test was provided
            alive_test = AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT
        else:
            try:
                alive_test = int(alive_test)
            except ValueError:
                logger.debug(
                    'Alive test preference for Boreas not set. '
                    'Invalid alive test value %s.',
                    alive_test,
                )
                # Use default alive test as fall back
                alive_test = AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT

        if alive_test == AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT:
            # The scan config default is stored as ICMP.
            store_pref(BOREAS_ALIVE_TEST, AliveTest.ALIVE_TEST_ICMP)
        elif 1 <= alive_test <= 31:
            # A valid bit mask has a value between 1 (00001) and 31 (11111).
            store_pref(BOREAS_ALIVE_TEST, alive_test)

        # Add portlist if present. Validity is checked on Boreas side.
        if alive_test_ports is not None:
            store_pref(BOREAS_ALIVE_TEST_PORTS, alive_test_ports)
522
523
    def prepare_reverse_lookup_opt_for_openvas(self):
        """Set reverse lookup options in the kb.

        The 'reverse_lookup_only' and 'reverse_lookup_unify' target options
        (default '0') are stored as yes/no preferences. Nothing is stored
        if the scan has no target options.
        """
        target_options = self.target_options
        if not target_options:
            return

        items = []
        for option in ('reverse_lookup_only', 'reverse_lookup_unify'):
            enabled = int(target_options.get(option, '0'))
            items.append('%s|||%s' % (option, _from_bool_to_str(enabled)))

        self.kbdb.add_scan_preferences(self.scan_id, items)
540
541
    def prepare_target_for_openvas(self):
542
        """Get the target from the scan collection and set the target
543
        in the kb"""
544
545
        target = self.scan_collection.get_host_list(self.scan_id)
546
        target_aux = 'TARGET|||%s' % target
547
        self.kbdb.add_scan_preferences(self.scan_id, [target_aux])
548
549
    def prepare_ports_for_openvas(self) -> str:
550
        """Get the port list from the scan collection and store the list
551
        in the kb."""
552
        ports = self.scan_collection.get_ports(self.scan_id)
553
        port_range = 'port_range|||%s' % ports
554
        self.kbdb.add_scan_preferences(self.scan_id, [port_range])
555
556
        return ports
557
558
    def prepare_host_options_for_openvas(self):
559
        """Get the excluded and finished hosts from the scan collection and
560
        stores the list of hosts that must not be scanned in the kb."""
561
        exclude_hosts = self.scan_collection.get_exclude_hosts(self.scan_id)
562
563
        if exclude_hosts:
564
            pref_val = "exclude_hosts|||" + exclude_hosts
565
            self.kbdb.add_scan_preferences(self.scan_id, [pref_val])
566
567
    def prepare_scan_params_for_openvas(self, ospd_params: Dict[str, Dict]):
        """Store the scan parameters of the scan in the kb.

        The parameters are read from the scan collection. Values of
        parameters declared as 'boolean' in ospd_params are converted to
        yes/no, all other values are stored as strings. Parameters found
        neither in ospd_params nor in BASE_SCANNER_PARAMS are reported in
        the debug log but stored nevertheless.

        Arguments:
            ospd_params: Dictionary with the OSPD Params.
        """
        # Options which were supplied via the <scanner_params> XML element.
        options = self.scan_collection.get_options(self.scan_id)

        prefs_val = []
        for key, value in options.items():
            item_type = ''
            if key in ospd_params:
                item_type = ospd_params[key].get('type')
            elif key not in BASE_SCANNER_PARAMS:
                logger.debug(
                    "%s is a scanner only setting and should not be set "
                    "by the client. Setting needs to be included in "
                    "OpenVAS configuration file instead.",
                    key,
                )

            val = (
                _from_bool_to_str(value)
                if item_type == 'boolean'
                else str(value)
            )
            prefs_val.append(key + "|||" + val)

        if prefs_val:
            self.kbdb.add_scan_preferences(self.scan_id, prefs_val)
597
598
    @staticmethod
    def build_credentials_as_prefs(credentials: Dict) -> List[str]:
        """Translate the credential dictionary into preference strings.

        Arguments:
            credentials: Dictionary with the credentials, mapping a service
                         name to the dictionary of its credential
                         parameters. The services ssh, smb, esxi and snmp
                         are handled, any other service is ignored.

        Return:
            A list with the credentials in string format to be
            added to the redis KB.
        """
        cred_prefs_list = []

        def add_pref(oid, pref_id, pref_type, pref_name, value):
            """Append '<oid>:<id>:<type>:<name>:|||<value>' to the list."""
            cred_prefs_list.append(
                '{0}:{1}:{2}:{3}:|||{4}'.format(
                    oid, pref_id, pref_type, pref_name, value
                )
            )

        for service, cred_params in credentials.items():
            cred_type = cred_params.get('type', '')
            username = cred_params.get('username', '')
            password = cred_params.get('password', '')

            if service == 'ssh':
                port = cred_params.get('port', '')
                cred_prefs_list.append('auth_port_ssh|||{0}'.format(port))
                add_pref(OID_SSH_AUTH, '1', 'entry', 'SSH login name', username)
                if cred_type == 'up':
                    add_pref(
                        OID_SSH_AUTH,
                        '3',
                        'password',
                        'SSH password (unsafe!)',
                        password,
                    )
                else:
                    # Any other credential type is handled as key based.
                    private = cred_params.get('private', '')
                    add_pref(
                        OID_SSH_AUTH,
                        '2',
                        'password',
                        'SSH key passphrase',
                        password,
                    )
                    add_pref(
                        OID_SSH_AUTH, '4', 'file', 'SSH private key', private
                    )
            elif service == 'smb':
                add_pref(OID_SMB_AUTH, '1', 'entry', 'SMB login', username)
                add_pref(
                    OID_SMB_AUTH, '2', 'password', 'SMB password', password
                )
            elif service == 'esxi':
                add_pref(
                    OID_ESXI_AUTH, '1', 'entry', 'ESXi login name', username
                )
                add_pref(
                    OID_ESXI_AUTH,
                    '2',
                    'password',
                    'ESXi login password',
                    password,
                )
            elif service == 'snmp':
                community = cred_params.get('community', '')
                auth_algorithm = cred_params.get('auth_algorithm', '')
                privacy_password = cred_params.get('privacy_password', '')
                privacy_algorithm = cred_params.get('privacy_algorithm', '')

                add_pref(
                    OID_SNMP_AUTH, '1', 'password', 'SNMP Community', community
                )
                add_pref(
                    OID_SNMP_AUTH, '2', 'entry', 'SNMPv3 Username', username
                )
                add_pref(
                    OID_SNMP_AUTH, '3', 'password', 'SNMPv3 Password', password
                )
                add_pref(
                    OID_SNMP_AUTH,
                    '4',
                    'radio',
                    'SNMPv3 Authentication Algorithm',
                    auth_algorithm,
                )
                add_pref(
                    OID_SNMP_AUTH,
                    '5',
                    'password',
                    'SNMPv3 Privacy Password',
                    privacy_password,
                )
                add_pref(
                    OID_SNMP_AUTH,
                    '6',
                    'radio',
                    'SNMPv3 Privacy Algorithm',
                    privacy_algorithm,
                )

        return cred_prefs_list
714
715
    def prepare_credentials_for_openvas(self) -> bool:
716
        """Get the credentials from the scan collection and store them
717
        in the kb."""
718
        credentials = self.scan_collection.get_credentials(self.scan_id)
719
        if credentials:
720
            cred_prefs = self.build_credentials_as_prefs(credentials)
721
            if cred_prefs:
722
                self.kbdb.add_credentials_to_scan_preferences(
723
                    self.scan_id, cred_prefs
724
                )
725
726
        if credentials and not cred_prefs:
0 ignored issues
show
introduced by
The variable cred_prefs does not seem to be defined in case credentials on line 719 is False. Are you sure this can never be the case?
Loading history...
727
            return False
728
729
        return True
730
731
    def prepare_main_kbindex_for_openvas(self):
732
        """Store main_kbindex as global preference in the
733
        kb, used by OpenVAS"""
734
        ov_maindbid = 'ov_maindbid|||%d' % self.kbdb.index
735
        self.kbdb.add_scan_preferences(self.scan_id, [ov_maindbid])
736