Passed
Push — master ( 62317c...eaef71 )
by Juan José
01:52 queued 13s
created

PreferenceHandler._get_vts_in_groups()   B

Complexity

Conditions 6

Size

Total Lines 31
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 16
nop 2
dl 0
loc 31
rs 8.6666
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
# pylint: disable=too-many-lines
21
22
""" Prepare the preferences to be used by OpenVAS. Get the data from the scan
23
collection and store the data in a redis KB in the right format to be used by
24
OpenVAS. """
25
26
import logging
27
import binascii
28
29
from enum import IntEnum
30
from typing import Optional, Dict, List, Tuple
31
from base64 import b64decode
32
33
from ospd.scan import ScanCollection
34
from ospd.ospd import BASE_SCANNER_PARAMS
35
from ospd_openvas.openvas import Openvas
36
from ospd_openvas.db import KbDB
37
from ospd_openvas.nvticache import NVTICache
38
from ospd_openvas.vthelper import VtHelper
39
40
logger = logging.getLogger(__name__)
41
42
43
OID_SSH_AUTH = "1.3.6.1.4.1.25623.1.0.103591"
44
OID_SMB_AUTH = "1.3.6.1.4.1.25623.1.0.90023"
45
OID_ESXI_AUTH = "1.3.6.1.4.1.25623.1.0.105058"
46
OID_SNMP_AUTH = "1.3.6.1.4.1.25623.1.0.105076"
47
OID_PING_HOST = "1.3.6.1.4.1.25623.1.0.100315"
48
49
BOREAS_ALIVE_TEST = "ALIVE_TEST"
50
BOREAS_ALIVE_TEST_PORTS = "ALIVE_TEST_PORTS"
51
BOREAS_SETTING_NAME = "test_alive_hosts_only"
52
53
54
class AliveTest(IntEnum):
55
    """ Alive Tests. """
56
57
    ALIVE_TEST_SCAN_CONFIG_DEFAULT = 0
58
    ALIVE_TEST_TCP_ACK_SERVICE = 1
59
    ALIVE_TEST_ICMP = 2
60
    ALIVE_TEST_ARP = 4
61
    ALIVE_TEST_CONSIDER_ALIVE = 8
62
    ALIVE_TEST_TCP_SYN_SERVICE = 16
63
64
65
def alive_test_methods_to_bit_field(
66
    icmp: bool, tcp_syn: bool, tcp_ack: bool, arp: bool, consider_alive: bool
67
) -> int:
68
    """Internally a bit field is used as alive test. This function creates
69
    such a bit field out of the supplied alive test methods.
70
    """
71
72
    icmp_enum = AliveTest.ALIVE_TEST_ICMP if icmp else 0
73
    tcp_syn_enum = AliveTest.ALIVE_TEST_TCP_SYN_SERVICE if tcp_syn else 0
74
    tcp_ack_enum = AliveTest.ALIVE_TEST_TCP_ACK_SERVICE if tcp_ack else 0
75
    arp_enum = AliveTest.ALIVE_TEST_ARP if arp else 0
76
    consider_alive_enum = (
77
        AliveTest.ALIVE_TEST_CONSIDER_ALIVE if consider_alive else 0
78
    )
79
80
    bit_field = (
81
        icmp_enum | tcp_syn_enum | tcp_ack_enum | arp_enum | consider_alive_enum
82
    )
83
    return bit_field
84
85
86
def _from_bool_to_str(value: int) -> str:
87
    """The OpenVAS scanner use yes and no as boolean values, whereas ospd
88
    uses 1 and 0."""
89
    return 'yes' if value == 1 else 'no'
90
91
92
class PreferenceHandler:
93
    def __init__(
94
        self,
95
        scan_id: str,
96
        kbdb: KbDB,
97
        scan_collection: ScanCollection,
98
        nvticache: NVTICache,
99
    ):
100
        self.scan_id = scan_id
101
        self.kbdb = kbdb
102
        self.scan_collection = scan_collection
103
104
        self._target_options = None
105
        self._nvts_params = None
106
107
        self.nvti = nvticache
108
109
    def prepare_scan_id_for_openvas(self):
110
        """Create the openvas scan id and store it in the redis kb.
111
        Return the openvas scan_id.
112
        """
113
        self.kbdb.add_scan_id(self.scan_id)
114
115
    @property
116
    def target_options(self) -> Dict:
117
        """ Return target options from Scan collection """
118
        if self._target_options is not None:
119
            return self._target_options
120
121
        self._target_options = self.scan_collection.get_target_options(
122
            self.scan_id
123
        )
124
        return self._target_options
125
126
    def _get_vts_in_groups(
127
        self,
128
        filters: List[str],
129
    ) -> List[str]:
130
        """Return a list of vts which match with the given filter.
131
132
        Arguments:
133
            filters A list of filters. Each filter has key, operator and
134
                    a value. They are separated by a space.
135
                    Supported keys: family
136
137
        Returns a list of vt oids which match with the given filter.
138
        """
139
        vts_list = list()
140
        families = dict()
141
142
        oids = self.nvti.get_oids()
143
144
        for _, oid in oids:
145
            family = self.nvti.get_nvt_family(oid)
146
            if family not in families:
147
                families[family] = list()
148
149
            families[family].append(oid)
150
151
        for elem in filters:
152
            key, value = elem.split('=')
153
            if key == 'family' and value in families:
154
                vts_list.extend(families[value])
155
156
        return vts_list
157
158
    def _get_vt_param_type(self, vt: Dict, vt_param_id: str) -> Optional[str]:
159
        """ Return the type of the vt parameter from the vts dictionary. """
160
161
        vt_params_list = vt.get("vt_params")
162
        if vt_params_list.get(vt_param_id):
163
            return vt_params_list[vt_param_id]["type"]
164
        return None
165
166
    def _get_vt_param_name(self, vt: Dict, vt_param_id: str) -> Optional[str]:
167
        """ Return the type of the vt parameter from the vts dictionary. """
168
169
        vt_params_list = vt.get("vt_params")
170
        if vt_params_list.get(vt_param_id):
171
            return vt_params_list[vt_param_id]["name"]
172
        return None
173
174
    @staticmethod
175
    def check_param_type(vt_param_value: str, param_type: str) -> Optional[int]:
176
        """Check if the value of a vt parameter matches with
177
        the type founded.
178
        """
179
        if (
180
            param_type
181
            in [
182
                'entry',
183
                'password',
184
                'radio',
185
                'sshlogin',
186
            ]
187
            and isinstance(vt_param_value, str)
188
        ):
189
            return None
190
        elif param_type == 'checkbox' and (
191
            vt_param_value == '0' or vt_param_value == '1'
192
        ):
193
            return None
194
        elif param_type == 'file':
195
            try:
196
                b64decode(vt_param_value.encode())
197
            except (binascii.Error, AttributeError, TypeError):
198
                return 1
199
            return None
200
        elif param_type == 'integer':
201
            try:
202
                int(vt_param_value)
203
            except ValueError:
204
                return 1
205
            return None
206
207
        return 1
208
209
    def _process_vts(
210
        self,
211
        vts: Dict[str, Dict[str, str]],
212
    ) -> Tuple[List[str], Dict[str, str]]:
213
        """ Add single VTs and their parameters. """
214
        vts_list = []
215
        vts_params = {}
216
        vtgroups = vts.pop('vt_groups')
217
218
        vthelper = VtHelper(self.nvti)
219
220
        if vtgroups:
221
            vts_list = self._get_vts_in_groups(vtgroups)
222
223
        for vtid, vt_params in vts.items():
224
            vt = vthelper.get_single_vt(vtid)
225
            if not vt:
226
                logger.warning(
227
                    'The VT %s was not found and it will not be added to the '
228
                    'plugin scheduler.',
229
                    vtid,
230
                )
231
                continue
232
233
            vts_list.append(vtid)
234
            for vt_param_id, vt_param_value in vt_params.items():
235
                param_type = self._get_vt_param_type(vt, vt_param_id)
236
                param_name = self._get_vt_param_name(vt, vt_param_id)
237
238
                if not param_type or not param_name:
239
                    logger.debug(
240
                        'Missing type or name for VT parameter %s of %s. '
241
                        'This VT parameter will not be set.',
242
                        vt_param_id,
243
                        vtid,
244
                    )
245
                    continue
246
247
                if vt_param_id == '0':
248
                    type_aux = 'integer'
249
                else:
250
                    type_aux = param_type
251
252
                if self.check_param_type(vt_param_value, type_aux):
253
                    logger.debug(
254
                        'The VT parameter %s for %s could not be set. '
255
                        'Expected %s type for parameter value %s',
256
                        vt_param_id,
257
                        vtid,
258
                        type_aux,
259
                        str(vt_param_value),
260
                    )
261
                    continue
262
263
                if type_aux == 'checkbox':
264
                    vt_param_value = _from_bool_to_str(int(vt_param_value))
265
266
                vts_params[
267
                    "{0}:{1}:{2}:{3}".format(
268
                        vtid, vt_param_id, param_type, param_name
269
                    )
270
                ] = str(vt_param_value)
271
272
        return vts_list, vts_params
273
274
    def prepare_plugins_for_openvas(self) -> bool:
275
        """Get the plugin list and it preferences from the Scan Collection.
276
        The plugin list is immediately stored in the kb.
277
        """
278
        nvts = self.scan_collection.get_vts(self.scan_id)
279
        if nvts:
280
            nvts_list, self._nvts_params = self._process_vts(nvts)
281
            # Add nvts list
282
            separ = ';'
283
            plugin_list = 'plugin_set|||%s' % separ.join(nvts_list)
284
            self.kbdb.add_scan_preferences(self.scan_id, [plugin_list])
285
286
            nvts_list = None
287
            plugin_list = None
288
            nvts = None
289
290
            return True
291
292
        return False
293
294
    def prepare_nvt_preferences(self):
295
        """Prepare the vts preferences. Store the data in the kb."""
296
297
        items_list = []
298
        for key, val in self._nvts_params.items():
299
            items_list.append('%s|||%s' % (key, val))
300
301
        if items_list:
302
            self.kbdb.add_scan_preferences(self.scan_id, items_list)
303
304
    @staticmethod
305
    def build_alive_test_opt_as_prefs(
306
        target_options: Dict[str, str]
307
    ) -> Dict[str, str]:
308
        """Parse the target options dictionary.
309
        Arguments:
310
            target_options: Dictionary with the target options.
311
312
        Return:
313
            A dict with the target options related to alive test method
314
            in string format to be added to the redis KB.
315
        """
316
        target_opt_prefs_list = {}
317
        alive_test = None
318
319 View Code Duplication
        if target_options:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
320
            # Alive test speciefied as bit field.
321
            alive_test = target_options.get('alive_test')
322
            # Alive test speciefied as individual methods.
323
            alive_test_methods = target_options.get('alive_test_methods')
324
            # alive_test takes precedence over alive_test_methods
325
            if alive_test is None and alive_test_methods:
326
                alive_test = alive_test_methods_to_bit_field(
327
                    icmp=target_options.get('icmp') == '1',
328
                    tcp_syn=target_options.get('tcp_syn') == '1',
329
                    tcp_ack=target_options.get('tcp_ack') == '1',
330
                    arp=target_options.get('arp') == '1',
331
                    consider_alive=target_options.get('consider_alive') == '1',
332
                )
333
334
        if target_options and alive_test:
335
            try:
336
                alive_test = int(alive_test)
337
            except ValueError:
338
                logger.debug(
339
                    'Alive test settings not applied. '
340
                    'Invalid alive test value %s',
341
                    target_options.get('alive_test'),
342
                )
343
                return target_opt_prefs_list
344
345
            # No alive test or wrong value, uses the default
346
            # preferences sent by the client.
347
            if alive_test < 1 or alive_test > 31:
348
                return target_opt_prefs_list
349
350
            if (
351
                alive_test & AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
352
                or alive_test & AliveTest.ALIVE_TEST_TCP_SYN_SERVICE
353
            ):
354
                value = "yes"
355
            else:
356
                value = "no"
357
            target_opt_prefs_list[
358
                OID_PING_HOST + ':1:checkbox:' + 'Do a TCP ping'
359
            ] = value
360
361
            if (
362
                alive_test & AliveTest.ALIVE_TEST_TCP_SYN_SERVICE
363
                and alive_test & AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
364
            ):
365
                value = "yes"
366
            else:
367
                value = "no"
368
            target_opt_prefs_list[
369
                OID_PING_HOST
370
                + ':2:checkbox:'
371
                + 'TCP ping tries also TCP-SYN ping'
372
            ] = value
373
374
            if (alive_test & AliveTest.ALIVE_TEST_TCP_SYN_SERVICE) and not (
375
                alive_test & AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
376
            ):
377
                value = "yes"
378
            else:
379
                value = "no"
380
            target_opt_prefs_list[
381
                OID_PING_HOST
382
                + ':7:checkbox:'
383
                + 'TCP ping tries only TCP-SYN ping'
384
            ] = value
385
386
            if alive_test & AliveTest.ALIVE_TEST_ICMP:
387
                value = "yes"
388
            else:
389
                value = "no"
390
            target_opt_prefs_list[
391
                OID_PING_HOST + ':3:checkbox:' + 'Do an ICMP ping'
392
            ] = value
393
394
            if alive_test & AliveTest.ALIVE_TEST_ARP:
395
                value = "yes"
396
            else:
397
                value = "no"
398
            target_opt_prefs_list[
399
                OID_PING_HOST + ':4:checkbox:' + 'Use ARP'
400
            ] = value
401
402
            if alive_test & AliveTest.ALIVE_TEST_CONSIDER_ALIVE:
403
                value = "no"
404
            else:
405
                value = "yes"
406
            target_opt_prefs_list[
407
                OID_PING_HOST
408
                + ':5:checkbox:'
409
                + 'Mark unrechable Hosts as dead (not scanning)'
410
            ] = value
411
412
            # Also select a method, otherwise Ping Host logs a warning.
413
            if alive_test == AliveTest.ALIVE_TEST_CONSIDER_ALIVE:
414
                target_opt_prefs_list[
415
                    OID_PING_HOST + ':1:checkbox:' + 'Do a TCP ping'
416
                ] = 'yes'
417
418
        return target_opt_prefs_list
419
420
    def prepare_alive_test_option_for_openvas(self):
421
        """ Set alive test option. Overwrite the scan config settings."""
422
        settings = Openvas.get_settings()
423
        if settings and (
424
            self.target_options.get('alive_test')
425
            or self.target_options.get('alive_test_methods')
426
        ):
427
            alive_test_opt = self.build_alive_test_opt_as_prefs(
428
                self.target_options
429
            )
430
            self._nvts_params.update(alive_test_opt)
431
432
    def prepare_boreas_alive_test(self):
433
        """Set alive_test for Boreas if boreas scanner config
434
        (BOREAS_SETTING_NAME) was set"""
435
        settings = Openvas.get_settings()
436
        alive_test = None
437
        alive_test_ports = None
438
        target_options = self.target_options
439
440
        if settings:
441
            boreas = settings.get(BOREAS_SETTING_NAME)
442
            if not boreas:
443
                return
444
        else:
445
            return
446
447 View Code Duplication
        if target_options:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
448
            alive_test_ports = target_options.get('alive_test_ports')
449
            # Alive test was speciefied as bit field.
450
            alive_test = target_options.get('alive_test')
451
            # Alive test was speciefied as individual methods.
452
            alive_test_methods = target_options.get('alive_test_methods')
453
            # <alive_test> takes precedence over <alive_test_methods>
454
            if alive_test is None and alive_test_methods:
455
                alive_test = alive_test_methods_to_bit_field(
456
                    icmp=target_options.get('icmp') == '1',
457
                    tcp_syn=target_options.get('tcp_syn') == '1',
458
                    tcp_ack=target_options.get('tcp_ack') == '1',
459
                    arp=target_options.get('arp') == '1',
460
                    consider_alive=target_options.get('consider_alive') == '1',
461
                )
462
463
        if alive_test is not None:
464
            try:
465
                alive_test = int(alive_test)
466
            except ValueError:
467
                logger.debug(
468
                    'Alive test preference for Boreas not set. '
469
                    'Invalid alive test value %s.',
470
                    alive_test,
471
                )
472
                # Use default alive test as fall back
473
                alive_test = AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT
474
        # Use default alive test if no valid alive_test was provided
475
        else:
476
            alive_test = AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT
477
478
        # If a valid alive_test was set then the bit mask
479
        # has value between 31 (11111) and 1 (10000)
480
        if 1 <= alive_test <= 31:
481
            pref = "{pref_key}|||{pref_value}".format(
482
                pref_key=BOREAS_ALIVE_TEST, pref_value=alive_test
483
            )
484
            self.kbdb.add_scan_preferences(self.scan_id, [pref])
485
486
        if alive_test == AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT:
487
            alive_test = AliveTest.ALIVE_TEST_ICMP
488
            pref = "{pref_key}|||{pref_value}".format(
489
                pref_key=BOREAS_ALIVE_TEST, pref_value=alive_test
490
            )
491
            self.kbdb.add_scan_preferences(self.scan_id, [pref])
492
493
        # Add portlist if present. Validity is checked on Boreas side.
494
        if alive_test_ports is not None:
495
            pref = "{pref_key}|||{pref_value}".format(
496
                pref_key=BOREAS_ALIVE_TEST_PORTS,
497
                pref_value=alive_test_ports,
498
            )
499
            self.kbdb.add_scan_preferences(self.scan_id, [pref])
500
501
    def prepare_reverse_lookup_opt_for_openvas(self):
502
        """ Set reverse lookup options in the kb"""
503
        if self.target_options:
504
            items = []
505
            _rev_lookup_only = int(
506
                self.target_options.get('reverse_lookup_only', '0')
507
            )
508
            rev_lookup_only = _from_bool_to_str(_rev_lookup_only)
509
            items.append('reverse_lookup_only|||%s' % (rev_lookup_only))
510
511
            _rev_lookup_unify = int(
512
                self.target_options.get('reverse_lookup_unify', '0')
513
            )
514
            rev_lookup_unify = _from_bool_to_str(_rev_lookup_unify)
515
            items.append('reverse_lookup_unify|||%s' % rev_lookup_unify)
516
517
            self.kbdb.add_scan_preferences(self.scan_id, items)
518
519
    def prepare_target_for_openvas(self):
520
        """Get the target from the scan collection and set the target
521
        in the kb"""
522
523
        target = self.scan_collection.get_host_list(self.scan_id)
524
        target_aux = 'TARGET|||%s' % target
525
        self.kbdb.add_scan_preferences(self.scan_id, [target_aux])
526
527
    def prepare_ports_for_openvas(self) -> str:
528
        """Get the port list from the scan collection and store the list
529
        in the kb."""
530
        ports = self.scan_collection.get_ports(self.scan_id)
531
        port_range = 'port_range|||%s' % ports
532
        self.kbdb.add_scan_preferences(self.scan_id, [port_range])
533
534
        return ports
535
536
    def prepare_host_options_for_openvas(self):
537
        """Get the excluded and finished hosts from the scan collection and
538
        stores the list of hosts that must not be scanned in the kb."""
539
        exclude_hosts = self.scan_collection.get_exclude_hosts(self.scan_id)
540
541
        if exclude_hosts:
542
            pref_val = "exclude_hosts|||" + exclude_hosts
543
            self.kbdb.add_scan_preferences(self.scan_id, [pref_val])
544
545
    def prepare_scan_params_for_openvas(self, ospd_params: Dict[str, Dict]):
546
        """Get the scan parameters from the scan collection and store them
547
        in the kb.
548
        Arguments:
549
            ospd_params: Dictionary with the OSPD Params.
550
        """
551
        # Options which were supplied via the <scanner_params> XML element.
552
        options = self.scan_collection.get_options(self.scan_id)
553
        prefs_val = []
554
555
        for key, value in options.items():
556
            item_type = ''
557
            if key in ospd_params:
558
                item_type = ospd_params[key].get('type')
559
            else:
560
                if key not in BASE_SCANNER_PARAMS:
561
                    logger.debug(
562
                        "%s is a scanner only setting and should not be set "
563
                        "by the client. Setting needs to be included in "
564
                        "OpenVAS configuration file instead.",
565
                        key,
566
                    )
567
            if item_type == 'boolean':
568
                val = _from_bool_to_str(value)
569
            else:
570
                val = str(value)
571
            prefs_val.append(key + "|||" + val)
572
573
        if prefs_val:
574
            self.kbdb.add_scan_preferences(self.scan_id, prefs_val)
575
576
    @staticmethod
577
    def build_credentials_as_prefs(credentials: Dict) -> List[str]:
578
        """Parse the credential dictionary.
579
        Arguments:
580
            credentials: Dictionary with the credentials.
581
582
        Return:
583
            A list with the credentials in string format to be
584
            added to the redis KB.
585
        """
586
        cred_prefs_list = []
587
        for credential in credentials.items():
588
            service = credential[0]
589
            cred_params = credentials.get(service)
590
            cred_type = cred_params.get('type', '')
591
            username = cred_params.get('username', '')
592
            password = cred_params.get('password', '')
593
594
            if service == 'ssh':
595
                port = cred_params.get('port', '')
596
                cred_prefs_list.append('auth_port_ssh|||' + '{0}'.format(port))
597
                cred_prefs_list.append(
598
                    OID_SSH_AUTH
599
                    + ':1:'
600
                    + 'entry:SSH login '
601
                    + 'name:|||{0}'.format(username)
602
                )
603
                if cred_type == 'up':
604
                    cred_prefs_list.append(
605
                        OID_SSH_AUTH
606
                        + ':3:'
607
                        + 'password:SSH password '
608
                        + '(unsafe!):|||{0}'.format(password)
609
                    )
610
                else:
611
                    private = cred_params.get('private', '')
612
                    cred_prefs_list.append(
613
                        OID_SSH_AUTH
614
                        + ':2:'
615
                        + 'password:SSH key passphrase:|||'
616
                        + '{0}'.format(password)
617
                    )
618
                    cred_prefs_list.append(
619
                        OID_SSH_AUTH
620
                        + ':4:'
621
                        + 'file:SSH private key:|||'
622
                        + '{0}'.format(private)
623
                    )
624
            if service == 'smb':
625
                cred_prefs_list.append(
626
                    OID_SMB_AUTH
627
                    + ':1:entry'
628
                    + ':SMB login:|||{0}'.format(username)
629
                )
630
                cred_prefs_list.append(
631
                    OID_SMB_AUTH
632
                    + ':2:'
633
                    + 'password:SMB password:|||'
634
                    + '{0}'.format(password)
635
                )
636
            if service == 'esxi':
637
                cred_prefs_list.append(
638
                    OID_ESXI_AUTH
639
                    + ':1:entry:'
640
                    + 'ESXi login name:|||'
641
                    + '{0}'.format(username)
642
                )
643
                cred_prefs_list.append(
644
                    OID_ESXI_AUTH
645
                    + ':2:'
646
                    + 'password:ESXi login password:|||'
647
                    + '{0}'.format(password)
648
                )
649
650
            if service == 'snmp':
651
                community = cred_params.get('community', '')
652
                auth_algorithm = cred_params.get('auth_algorithm', '')
653
                privacy_password = cred_params.get('privacy_password', '')
654
                privacy_algorithm = cred_params.get('privacy_algorithm', '')
655
656
                cred_prefs_list.append(
657
                    OID_SNMP_AUTH
658
                    + ':1:'
659
                    + 'password:SNMP Community:|||'
660
                    + '{0}'.format(community)
661
                )
662
                cred_prefs_list.append(
663
                    OID_SNMP_AUTH
664
                    + ':2:'
665
                    + 'entry:SNMPv3 Username:|||'
666
                    + '{0}'.format(username)
667
                )
668
                cred_prefs_list.append(
669
                    OID_SNMP_AUTH + ':3:'
670
                    'password:SNMPv3 Password:|||' + '{0}'.format(password)
671
                )
672
                cred_prefs_list.append(
673
                    OID_SNMP_AUTH
674
                    + ':4:'
675
                    + 'radio:SNMPv3 Authentication Algorithm:|||'
676
                    + '{0}'.format(auth_algorithm)
677
                )
678
                cred_prefs_list.append(
679
                    OID_SNMP_AUTH
680
                    + ':5:'
681
                    + 'password:SNMPv3 Privacy Password:|||'
682
                    + '{0}'.format(privacy_password)
683
                )
684
                cred_prefs_list.append(
685
                    OID_SNMP_AUTH
686
                    + ':6:'
687
                    + 'radio:SNMPv3 Privacy Algorithm:|||'
688
                    + '{0}'.format(privacy_algorithm)
689
                )
690
691
        return cred_prefs_list
692
693
    def prepare_credentials_for_openvas(self) -> bool:
694
        """Get the credentials from the scan collection and store them
695
        in the kb."""
696
        credentials = self.scan_collection.get_credentials(self.scan_id)
697
        if credentials:
698
            cred_prefs = self.build_credentials_as_prefs(credentials)
699
            if cred_prefs:
700
                self.kbdb.add_credentials_to_scan_preferences(
701
                    self.scan_id, cred_prefs
702
                )
703
704
        if credentials and not cred_prefs:
0 ignored issues
show
introduced by
The variable cred_prefs does not seem to be defined in case credentials on line 697 is False. Are you sure this can never be the case?
Loading history...
705
            return False
706
707
        return True
708
709
    def prepare_main_kbindex_for_openvas(self):
710
        """Store main_kbindex as global preference in the
711
        kb, used by OpenVAS"""
712
        ov_maindbid = 'ov_maindbid|||%d' % self.kbdb.index
713
        self.kbdb.add_scan_preferences(self.scan_id, [ov_maindbid])
714