Passed
Pull Request — master (#331)
by
unknown
01:20
created

alive_test_methods_to_bit_field()   B

Complexity

Conditions 6

Size

Total Lines 19
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 11
nop 5
dl 0
loc 19
rs 8.6666
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
# pylint: disable=too-many-lines
21
22
""" Prepare the preferences to be used by OpenVAS. Get the data from the scan
23
collection and store the data in a redis KB in the right format to be used by
24
OpenVAS. """
25
26
import logging
27
import binascii
28
29
from enum import IntEnum
30
from typing import Optional, Dict, List, Tuple
31
from base64 import b64decode
32
33
from ospd.scan import ScanCollection
34
from ospd.ospd import BASE_SCANNER_PARAMS
35
from ospd_openvas.openvas import Openvas
36
from ospd_openvas.db import KbDB
37
from ospd_openvas.nvticache import NVTICache
38
from ospd_openvas.vthelper import VtHelper
39
40
logger = logging.getLogger(__name__)
41
42
43
OID_SSH_AUTH = "1.3.6.1.4.1.25623.1.0.103591"
44
OID_SMB_AUTH = "1.3.6.1.4.1.25623.1.0.90023"
45
OID_ESXI_AUTH = "1.3.6.1.4.1.25623.1.0.105058"
46
OID_SNMP_AUTH = "1.3.6.1.4.1.25623.1.0.105076"
47
OID_PING_HOST = "1.3.6.1.4.1.25623.1.0.100315"
48
49
BOREAS_ALIVE_TEST = "ALIVE_TEST"
50
BOREAS_ALIVE_TEST_PORTS = "ALIVE_TEST_PORTS"
51
BOREAS_SETTING_NAME = "test_alive_hosts_only"
52
53
54
class AliveTest(IntEnum):
55
    """ Alive Tests. """
56
57
    ALIVE_TEST_SCAN_CONFIG_DEFAULT = 0
58
    ALIVE_TEST_TCP_ACK_SERVICE = 1
59
    ALIVE_TEST_ICMP = 2
60
    ALIVE_TEST_ARP = 4
61
    ALIVE_TEST_CONSIDER_ALIVE = 8
62
    ALIVE_TEST_TCP_SYN_SERVICE = 16
63
64
65
def alive_test_methods_to_bit_field(
66
    icmp: bool, tcp_syn: bool, tcp_ack: bool, arp: bool, consider_alive: bool
67
) -> int:
68
    """Internally a bit field is used as alive test. This function creates
69
    such a bit field out of the supplied alive test methods.
70
    """
71
72
    icmp_enum = AliveTest.ALIVE_TEST_ICMP if icmp else 0
73
    tcp_syn_enum = AliveTest.ALIVE_TEST_TCP_SYN_SERVICE if tcp_syn else 0
74
    tcp_ack_enum = AliveTest.ALIVE_TEST_TCP_ACK_SERVICE if tcp_ack else 0
75
    arp_enum = AliveTest.ALIVE_TEST_ARP if arp else 0
76
    consider_alive_enum = (
77
        AliveTest.ALIVE_TEST_CONSIDER_ALIVE if consider_alive else 0
78
    )
79
80
    bit_field = (
81
        icmp_enum | tcp_syn_enum | tcp_ack_enum | arp_enum | consider_alive_enum
82
    )
83
    return bit_field
84
85
86
def _from_bool_to_str(value: int) -> str:
87
    """The OpenVAS scanner use yes and no as boolean values, whereas ospd
88
    uses 1 and 0."""
89
    return 'yes' if value == 1 else 'no'
90
91
92
class PreferenceHandler:
93
    def __init__(
94
        self,
95
        scan_id: str,
96
        kbdb: KbDB,
97
        scan_collection: ScanCollection,
98
        nvticache: NVTICache,
99
    ):
100
        self.scan_id = scan_id
101
        self.kbdb = kbdb
102
        self.scan_collection = scan_collection
103
104
        self._target_options = None
105
106
        self.nvti = nvticache
107
108
    def prepare_scan_id_for_openvas(self):
109
        """Create the openvas scan id and store it in the redis kb.
110
        Return the openvas scan_id.
111
        """
112
        self.kbdb.add_scan_id(self.scan_id)
113
114
    @property
115
    def target_options(self) -> Dict:
116
        """ Return target options from Scan collection """
117
        if self._target_options is not None:
118
            return self._target_options
119
120
        self._target_options = self.scan_collection.get_target_options(
121
            self.scan_id
122
        )
123
        return self._target_options
124
125
    def _get_vts_in_groups(
126
        self,
127
        filters: List[str],
128
    ) -> List[str]:
129
        """Return a list of vts which match with the given filter.
130
131
        Arguments:
132
            filters A list of filters. Each filter has key, operator and
133
                    a value. They are separated by a space.
134
                    Supported keys: family
135
136
        Returns a list of vt oids which match with the given filter.
137
        """
138
        vts_list = list()
139
        families = dict()
140
141
        oids = self.nvti.get_oids()
142
143
        for _, oid in oids:
144
            family = self.nvti.get_nvt_family(oid)
145
            if family not in families:
146
                families[family] = list()
147
148
            families[family].append(oid)
149
150
        for elem in filters:
151
            key, value = elem.split('=')
152
            if key == 'family' and value in families:
153
                vts_list.extend(families[value])
154
155
        return vts_list
156
157
    def _get_vt_param_type(self, vt: Dict, vt_param_id: str) -> Optional[str]:
158
        """ Return the type of the vt parameter from the vts dictionary. """
159
160
        vt_params_list = vt.get("vt_params")
161
        if vt_params_list.get(vt_param_id):
162
            return vt_params_list[vt_param_id]["type"]
163
        return None
164
165
    def _get_vt_param_name(self, vt: Dict, vt_param_id: str) -> Optional[str]:
166
        """ Return the type of the vt parameter from the vts dictionary. """
167
168
        vt_params_list = vt.get("vt_params")
169
        if vt_params_list.get(vt_param_id):
170
            return vt_params_list[vt_param_id]["name"]
171
        return None
172
173
    @staticmethod
174
    def check_param_type(vt_param_value: str, param_type: str) -> Optional[int]:
175
        """Check if the value of a vt parameter matches with
176
        the type founded.
177
        """
178
        if (
179
            param_type
180
            in [
181
                'entry',
182
                'password',
183
                'radio',
184
                'sshlogin',
185
            ]
186
            and isinstance(vt_param_value, str)
187
        ):
188
            return None
189
        elif param_type == 'checkbox' and (
190
            vt_param_value == '0' or vt_param_value == '1'
191
        ):
192
            return None
193
        elif param_type == 'file':
194
            try:
195
                b64decode(vt_param_value.encode())
196
            except (binascii.Error, AttributeError, TypeError):
197
                return 1
198
            return None
199
        elif param_type == 'integer':
200
            try:
201
                int(vt_param_value)
202
            except ValueError:
203
                return 1
204
            return None
205
206
        return 1
207
208
    def _process_vts(
209
        self,
210
        vts: Dict[str, Dict[str, str]],
211
    ) -> Tuple[List[str], Dict[str, str]]:
212
        """ Add single VTs and their parameters. """
213
        vts_list = []
214
        vts_params = {}
215
        vtgroups = vts.pop('vt_groups')
216
217
        vthelper = VtHelper(self.nvti)
218
219
        if vtgroups:
220
            vts_list = self._get_vts_in_groups(vtgroups)
221
222
        for vtid, vt_params in vts.items():
223
            vt = vthelper.get_single_vt(vtid)
224
            if not vt:
225
                logger.warning(
226
                    'The VT %s was not found and it will not be added to the '
227
                    'plugin scheduler.',
228
                    vtid,
229
                )
230
                continue
231
232
            vts_list.append(vtid)
233
            for vt_param_id, vt_param_value in vt_params.items():
234
                param_type = self._get_vt_param_type(vt, vt_param_id)
235
                param_name = self._get_vt_param_name(vt, vt_param_id)
236
237
                if not param_type or not param_name:
238
                    logger.debug(
239
                        'Missing type or name for VT parameter %s of %s. '
240
                        'This VT parameter will not be set.',
241
                        vt_param_id,
242
                        vtid,
243
                    )
244
                    continue
245
246
                if vt_param_id == '0':
247
                    type_aux = 'integer'
248
                else:
249
                    type_aux = param_type
250
251
                if self.check_param_type(vt_param_value, type_aux):
252
                    logger.debug(
253
                        'The VT parameter %s for %s could not be set. '
254
                        'Expected %s type for parameter value %s',
255
                        vt_param_id,
256
                        vtid,
257
                        type_aux,
258
                        str(vt_param_value),
259
                    )
260
                    continue
261
262
                if type_aux == 'checkbox':
263
                    vt_param_value = _from_bool_to_str(int(vt_param_value))
264
265
                vts_params[
266
                    "{0}:{1}:{2}:{3}".format(
267
                        vtid, vt_param_id, param_type, param_name
268
                    )
269
                ] = str(vt_param_value)
270
271
        return vts_list, vts_params
272
273
    def prepare_plugins_for_openvas(self) -> bool:
274
        """Get the plugin list to be launched from the Scan Collection
275
        and prepare the vts preferences. Store the data in the kb.
276
        """
277
        nvts = self.scan_collection.get_vts(self.scan_id)
278
        if nvts:
279
            nvts_list, nvts_params = self._process_vts(nvts)
280
            # Add nvts list
281
            separ = ';'
282
            plugin_list = 'plugin_set|||%s' % separ.join(nvts_list)
283
            self.kbdb.add_scan_preferences(self.scan_id, [plugin_list])
284
285
            # Add nvts parameters
286
            for key, val in nvts_params.items():
287
                item = '%s|||%s' % (key, val)
288
                self.kbdb.add_scan_preferences(self.scan_id, [item])
289
290
            nvts_params = None
291
            nvts_list = None
292
            item = None
293
            plugin_list = None
294
            nvts = None
295
296
            return True
297
298
        return False
299
300
    @staticmethod
301
    def build_alive_test_opt_as_prefs(
302
        target_options: Dict[str, str]
303
    ) -> List[str]:
304
        """Parse the target options dictionary.
305
        Arguments:
306
            target_options: Dictionary with the target options.
307
308
        Return:
309
            A list with the target options related to alive test method
310
            in string format to be added to the redis KB.
311
        """
312
        target_opt_prefs_list = []
313
        alive_test = None
314
315 View Code Duplication
        if target_options:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
316
            # Alive test speciefied as bit field.
317
            alive_test = target_options.get('alive_test')
318
            # Alive test speciefied as individual methods.
319
            alive_test_methods = target_options.get('alive_test_methods')
320
            # alive_test takes precedence over alive_test_methods
321
            if alive_test is None and alive_test_methods:
322
                alive_test = alive_test_methods_to_bit_field(
323
                    icmp=target_options.get('icmp') == '1',
324
                    tcp_syn=target_options.get('tcp_syn') == '1',
325
                    tcp_ack=target_options.get('tcp_ack') == '1',
326
                    arp=target_options.get('arp') == '1',
327
                    consider_alive=target_options.get('consider_alive') == '1',
328
                )
329
330
        if target_options and alive_test:
331
            try:
332
                alive_test = int(alive_test)
333
            except ValueError:
334
                logger.debug(
335
                    'Alive test settings not applied. '
336
                    'Invalid alive test value %s',
337
                    target_options.get('alive_test'),
338
                )
339
                return target_opt_prefs_list
340
341
            if alive_test < 1 or alive_test > 31:
342
                return target_opt_prefs_list
343
344
            if (
345
                alive_test & AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
346
                or alive_test & AliveTest.ALIVE_TEST_TCP_SYN_SERVICE
347
            ):
348
                value = "yes"
349
            else:
350
                value = "no"
351
            target_opt_prefs_list.append(
352
                OID_PING_HOST
353
                + ':1:checkbox:'
354
                + 'Do a TCP ping|||'
355
                + '{0}'.format(value)
356
            )
357
358
            if (
359
                alive_test & AliveTest.ALIVE_TEST_TCP_SYN_SERVICE
360
                and alive_test & AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
361
            ):
362
                value = "yes"
363
            else:
364
                value = "no"
365
            target_opt_prefs_list.append(
366
                OID_PING_HOST
367
                + ':2:checkbox:'
368
                + 'TCP ping tries also TCP-SYN ping|||'
369
                + '{0}'.format(value)
370
            )
371
372
            if (alive_test & AliveTest.ALIVE_TEST_TCP_SYN_SERVICE) and not (
373
                alive_test & AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
374
            ):
375
                value = "yes"
376
            else:
377
                value = "no"
378
            target_opt_prefs_list.append(
379
                OID_PING_HOST
380
                + ':7:checkbox:'
381
                + 'TCP ping tries only TCP-SYN ping|||'
382
                + '{0}'.format(value)
383
            )
384
385
            if alive_test & AliveTest.ALIVE_TEST_ICMP:
386
                value = "yes"
387
            else:
388
                value = "no"
389
            target_opt_prefs_list.append(
390
                OID_PING_HOST
391
                + ':3:checkbox:'
392
                + 'Do an ICMP ping|||'
393
                + '{0}'.format(value)
394
            )
395
396
            if alive_test & AliveTest.ALIVE_TEST_ARP:
397
                value = "yes"
398
            else:
399
                value = "no"
400
            target_opt_prefs_list.append(
401
                OID_PING_HOST
402
                + ':4:checkbox:'
403
                + 'Use ARP|||'
404
                + '{0}'.format(value)
405
            )
406
407
            if alive_test & AliveTest.ALIVE_TEST_CONSIDER_ALIVE:
408
                value = "no"
409
            else:
410
                value = "yes"
411
            target_opt_prefs_list.append(
412
                OID_PING_HOST
413
                + ':5:checkbox:'
414
                + 'Mark unrechable Hosts as dead (not scanning)|||'
415
                + '{0}'.format(value)
416
            )
417
418
            # Also select a method, otherwise Ping Host logs a warning.
419
            if alive_test == AliveTest.ALIVE_TEST_CONSIDER_ALIVE:
420
                target_opt_prefs_list.append(
421
                    OID_PING_HOST + ':1:checkbox:' + 'Do a TCP ping|||yes'
422
                )
423
        return target_opt_prefs_list
424
425
    def prepare_alive_test_option_for_openvas(self):
426
        """ Set alive test option. Overwrite the scan config settings."""
427
        settings = Openvas.get_settings()
428
        if settings and (
429
            self.target_options.get('alive_test')
430
            or self.target_options.get('alive_test_methods')
431
        ):
432
            alive_test_opt = self.build_alive_test_opt_as_prefs(
433
                self.target_options
434
            )
435
            if alive_test_opt:
436
                self.kbdb.add_scan_preferences(self.scan_id, alive_test_opt)
437
438
    def prepare_boreas_alive_test(self):
439
        """Set alive_test for Boreas if boreas scanner config
440
        (BOREAS_SETTING_NAME) was set"""
441
        settings = Openvas.get_settings()
442
        alive_test = None
443
        alive_test_ports = None
444
        target_options = self.target_options
445
446
        if settings:
447
            boreas = settings.get(BOREAS_SETTING_NAME)
448
            if not boreas:
449
                return
450
        else:
451
            return
452
453 View Code Duplication
        if target_options:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
454
            alive_test_ports = target_options.get('alive_test_ports')
455
            # Alive test was speciefied as bit field.
456
            alive_test = target_options.get('alive_test')
457
            # Alive test was speciefied as individual methods.
458
            alive_test_methods = target_options.get('alive_test_methods')
459
            # <alive_test> takes precedence over <alive_test_methods>
460
            if alive_test is None and alive_test_methods:
461
                alive_test = alive_test_methods_to_bit_field(
462
                    icmp=target_options.get('icmp') == '1',
463
                    tcp_syn=target_options.get('tcp_syn') == '1',
464
                    tcp_ack=target_options.get('tcp_ack') == '1',
465
                    arp=target_options.get('arp') == '1',
466
                    consider_alive=target_options.get('consider_alive') == '1',
467
                )
468
469
        if alive_test is not None:
470
            try:
471
                alive_test = int(alive_test)
472
            except ValueError:
473
                logger.debug(
474
                    'Alive test preference for Boreas not set. '
475
                    'Invalid alive test value %s.',
476
                    alive_test,
477
                )
478
                # Use default alive test as fall back
479
                alive_test = AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT
480
        # Use default alive test if no valid alive_test was provided
481
        else:
482
            alive_test = AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT
483
484
        # If a valid alive_test was set then the bit mask
485
        # has value between 31 (11111) and 1 (10000)
486
        if 1 <= alive_test <= 31:
487
            pref = "{pref_key}|||{pref_value}".format(
488
                pref_key=BOREAS_ALIVE_TEST, pref_value=alive_test
489
            )
490
            self.kbdb.add_scan_preferences(self.scan_id, [pref])
491
492
        if alive_test == AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT:
493
            alive_test = AliveTest.ALIVE_TEST_ICMP
494
            pref = "{pref_key}|||{pref_value}".format(
495
                pref_key=BOREAS_ALIVE_TEST, pref_value=alive_test
496
            )
497
            self.kbdb.add_scan_preferences(self.scan_id, [pref])
498
499
        # Add portlist if present. Validity is checked on Boreas side.
500
        if alive_test_ports is not None:
501
            pref = "{pref_key}|||{pref_value}".format(
502
                pref_key=BOREAS_ALIVE_TEST_PORTS,
503
                pref_value=alive_test_ports,
504
            )
505
            self.kbdb.add_scan_preferences(self.scan_id, [pref])
506
507
    def prepare_reverse_lookup_opt_for_openvas(self):
508
        """ Set reverse lookup options in the kb"""
509
        if self.target_options:
510
            items = []
511
            _rev_lookup_only = int(
512
                self.target_options.get('reverse_lookup_only', '0')
513
            )
514
            rev_lookup_only = _from_bool_to_str(_rev_lookup_only)
515
            items.append('reverse_lookup_only|||%s' % (rev_lookup_only))
516
517
            _rev_lookup_unify = int(
518
                self.target_options.get('reverse_lookup_unify', '0')
519
            )
520
            rev_lookup_unify = _from_bool_to_str(_rev_lookup_unify)
521
            items.append('reverse_lookup_unify|||%s' % rev_lookup_unify)
522
523
            self.kbdb.add_scan_preferences(self.scan_id, items)
524
525
    def prepare_target_for_openvas(self):
526
        """Get the target from the scan collection and set the target
527
        in the kb"""
528
529
        target = self.scan_collection.get_host_list(self.scan_id)
530
        target_aux = 'TARGET|||%s' % target
531
        self.kbdb.add_scan_preferences(self.scan_id, [target_aux])
532
533
    def prepare_ports_for_openvas(self) -> str:
534
        """Get the port list from the scan collection and store the list
535
        in the kb."""
536
        ports = self.scan_collection.get_ports(self.scan_id)
537
        port_range = 'port_range|||%s' % ports
538
        self.kbdb.add_scan_preferences(self.scan_id, [port_range])
539
540
        return ports
541
542
    def prepare_host_options_for_openvas(self):
543
        """Get the excluded and finished hosts from the scan collection and
544
        stores the list of hosts that must not be scanned in the kb."""
545
        exclude_hosts = self.scan_collection.get_exclude_hosts(self.scan_id)
546
547
        if exclude_hosts:
548
            pref_val = "exclude_hosts|||" + exclude_hosts
549
            self.kbdb.add_scan_preferences(self.scan_id, [pref_val])
550
551
    def prepare_scan_params_for_openvas(self, ospd_params: Dict[str, Dict]):
552
        """Get the scan parameters from the scan collection and store them
553
        in the kb.
554
        Arguments:
555
            ospd_params: Dictionary with the OSPD Params.
556
        """
557
        # Options which were supplied via the <scanner_params> XML element.
558
        options = self.scan_collection.get_options(self.scan_id)
559
        prefs_val = []
560
561
        for key, value in options.items():
562
            item_type = ''
563
            if key in ospd_params:
564
                item_type = ospd_params[key].get('type')
565
            else:
566
                if key not in BASE_SCANNER_PARAMS:
567
                    logger.debug(
568
                        "%s is a scanner only setting and should not be set "
569
                        "by the client. Setting needs to be included in "
570
                        "OpenVAS configuration file instead.",
571
                        key,
572
                    )
573
            if item_type == 'boolean':
574
                val = _from_bool_to_str(value)
575
            else:
576
                val = str(value)
577
            prefs_val.append(key + "|||" + val)
578
579
        if prefs_val:
580
            self.kbdb.add_scan_preferences(self.scan_id, prefs_val)
581
582
    @staticmethod
583
    def build_credentials_as_prefs(credentials: Dict) -> List[str]:
584
        """Parse the credential dictionary.
585
        Arguments:
586
            credentials: Dictionary with the credentials.
587
588
        Return:
589
            A list with the credentials in string format to be
590
            added to the redis KB.
591
        """
592
        cred_prefs_list = []
593
        for credential in credentials.items():
594
            service = credential[0]
595
            cred_params = credentials.get(service)
596
            cred_type = cred_params.get('type', '')
597
            username = cred_params.get('username', '')
598
            password = cred_params.get('password', '')
599
600
            if service == 'ssh':
601
                port = cred_params.get('port', '')
602
                cred_prefs_list.append('auth_port_ssh|||' + '{0}'.format(port))
603
                cred_prefs_list.append(
604
                    OID_SSH_AUTH
605
                    + ':1:'
606
                    + 'entry:SSH login '
607
                    + 'name:|||{0}'.format(username)
608
                )
609
                if cred_type == 'up':
610
                    cred_prefs_list.append(
611
                        OID_SSH_AUTH
612
                        + ':3:'
613
                        + 'password:SSH password '
614
                        + '(unsafe!):|||{0}'.format(password)
615
                    )
616
                else:
617
                    private = cred_params.get('private', '')
618
                    cred_prefs_list.append(
619
                        OID_SSH_AUTH
620
                        + ':2:'
621
                        + 'password:SSH key passphrase:|||'
622
                        + '{0}'.format(password)
623
                    )
624
                    cred_prefs_list.append(
625
                        OID_SSH_AUTH
626
                        + ':4:'
627
                        + 'file:SSH private key:|||'
628
                        + '{0}'.format(private)
629
                    )
630
            if service == 'smb':
631
                cred_prefs_list.append(
632
                    OID_SMB_AUTH
633
                    + ':1:entry'
634
                    + ':SMB login:|||{0}'.format(username)
635
                )
636
                cred_prefs_list.append(
637
                    OID_SMB_AUTH
638
                    + ':2:'
639
                    + 'password:SMB password:|||'
640
                    + '{0}'.format(password)
641
                )
642
            if service == 'esxi':
643
                cred_prefs_list.append(
644
                    OID_ESXI_AUTH
645
                    + ':1:entry:'
646
                    + 'ESXi login name:|||'
647
                    + '{0}'.format(username)
648
                )
649
                cred_prefs_list.append(
650
                    OID_ESXI_AUTH
651
                    + ':2:'
652
                    + 'password:ESXi login password:|||'
653
                    + '{0}'.format(password)
654
                )
655
656
            if service == 'snmp':
657
                community = cred_params.get('community', '')
658
                auth_algorithm = cred_params.get('auth_algorithm', '')
659
                privacy_password = cred_params.get('privacy_password', '')
660
                privacy_algorithm = cred_params.get('privacy_algorithm', '')
661
662
                cred_prefs_list.append(
663
                    OID_SNMP_AUTH
664
                    + ':1:'
665
                    + 'password:SNMP Community:|||'
666
                    + '{0}'.format(community)
667
                )
668
                cred_prefs_list.append(
669
                    OID_SNMP_AUTH
670
                    + ':2:'
671
                    + 'entry:SNMPv3 Username:|||'
672
                    + '{0}'.format(username)
673
                )
674
                cred_prefs_list.append(
675
                    OID_SNMP_AUTH + ':3:'
676
                    'password:SNMPv3 Password:|||' + '{0}'.format(password)
677
                )
678
                cred_prefs_list.append(
679
                    OID_SNMP_AUTH
680
                    + ':4:'
681
                    + 'radio:SNMPv3 Authentication Algorithm:|||'
682
                    + '{0}'.format(auth_algorithm)
683
                )
684
                cred_prefs_list.append(
685
                    OID_SNMP_AUTH
686
                    + ':5:'
687
                    + 'password:SNMPv3 Privacy Password:|||'
688
                    + '{0}'.format(privacy_password)
689
                )
690
                cred_prefs_list.append(
691
                    OID_SNMP_AUTH
692
                    + ':6:'
693
                    + 'radio:SNMPv3 Privacy Algorithm:|||'
694
                    + '{0}'.format(privacy_algorithm)
695
                )
696
697
        return cred_prefs_list
698
699
    def prepare_credentials_for_openvas(self) -> bool:
700
        """Get the credentials from the scan collection and store them
701
        in the kb."""
702
        credentials = self.scan_collection.get_credentials(self.scan_id)
703
        if credentials:
704
            cred_prefs = self.build_credentials_as_prefs(credentials)
705
            if cred_prefs:
706
                self.kbdb.add_credentials_to_scan_preferences(
707
                    self.scan_id, cred_prefs
708
                )
709
710
        if credentials and not cred_prefs:
0 ignored issues
show
introduced by
The variable cred_prefs does not seem to be defined in case credentials on line 703 is False. Are you sure this can never be the case?
Loading history...
711
            return False
712
713
        return True
714
715
    def prepare_main_kbindex_for_openvas(self):
716
        """Store main_kbindex as global preference in the
717
        kb, used by OpenVAS"""
718
        ov_maindbid = 'ov_maindbid|||%d' % self.kbdb.index
719
        self.kbdb.add_scan_preferences(self.scan_id, [ov_maindbid])
720