Passed
Pull Request — master (#223)
by Juan José
01:56
created

ospd_openvas.preferencehandler   F

Complexity

Total Complexity 86

Size/Duplication

Total Lines 616
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 392
dl 0
loc 616
rs 2
c 0
b 0
f 0
wmc 86

19 Methods

Rating   Name   Duplication   Size   Complexity  
C PreferenceHandler.check_param_type() 0 30 10
A PreferenceHandler.prepare_host_options_for_openvas() 0 19 4
A PreferenceHandler.prepare_target_for_openvas() 0 7 1
A PreferenceHandler.__init__() 0 16 1
A PreferenceHandler.prepare_reverse_lookup_opt_for_openvas() 0 17 2
A PreferenceHandler.prepare_plugins_for_openvas() 0 27 3
C PreferenceHandler._process_vts() 0 58 10
A PreferenceHandler.prepare_ports_for_openvas() 0 8 1
A PreferenceHandler.prepare_openvas_scan_id_for_openvas() 0 8 1
C PreferenceHandler.build_credentials_as_prefs() 0 116 7
A PreferenceHandler._get_vt_param_name() 0 7 2
F PreferenceHandler.build_alive_test_opt_as_prefs() 0 107 16
A PreferenceHandler.target_options() 0 10 2
A PreferenceHandler._get_vt_param_type() 0 7 2
B PreferenceHandler._get_vts_in_groups() 0 26 6
A PreferenceHandler.prepare_main_kbindex_for_openvas() 0 5 1
A PreferenceHandler.prepare_credentials_for_openvas() 0 13 3
A PreferenceHandler.prepare_scan_params_for_openvas() 0 20 4
B PreferenceHandler.prepare_alive_test_option_for_openvas() 0 31 8

1 Function

Rating   Name   Duplication   Size   Complexity  
A _from_bool_to_str() 0 4 2

How to fix   Complexity   

Complexity

Complex classes like ospd_openvas.preferencehandler often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2019 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-2.0-or-later
5
#
6
# This program is free software; you can redistribute it and/or
7
# modify it under the terms of the GNU General Public License
8
# as published by the Free Software Foundation; either version 2
9
# of the License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
20
# pylint: disable=too-many-lines
21
22
""" Prepare the preferences to be used by OpenVAS. Get the data from the scan
23
collection and store the data in a redis KB in the right format to be used by
24
OpenVAS. """
25
26
import logging
27
import uuid
28
import binascii
29
30
from enum import IntEnum
31
from typing import Optional, Dict, List, Tuple
32
from base64 import b64decode
33
34
from ospd.scan import ScanCollection
35
from ospd_openvas.openvas import Openvas
36
from ospd_openvas.db import KbDB
37
38
logger = logging.getLogger(__name__)
39
40
41
OID_SSH_AUTH = "1.3.6.1.4.1.25623.1.0.103591"
42
OID_SMB_AUTH = "1.3.6.1.4.1.25623.1.0.90023"
43
OID_ESXI_AUTH = "1.3.6.1.4.1.25623.1.0.105058"
44
OID_SNMP_AUTH = "1.3.6.1.4.1.25623.1.0.105076"
45
OID_PING_HOST = "1.3.6.1.4.1.25623.1.0.100315"
46
47
48
class AliveTest(IntEnum):
    """ Alive Tests.

    Each member occupies a distinct bit, so every value is a power of two.
    """

    ALIVE_TEST_TCP_ACK_SERVICE = 1 << 0
    ALIVE_TEST_ICMP = 1 << 1
    ALIVE_TEST_ARP = 1 << 2
    ALIVE_TEST_CONSIDER_ALIVE = 1 << 3
    ALIVE_TEST_TCP_SYN_SERVICE = 1 << 4
56
57
58
def _from_bool_to_str(value: int) -> str:
59
    """ The OpenVAS scanner use yes and no as boolean values, whereas ospd
60
    uses 1 and 0."""
61
    return 'yes' if value == 1 else 'no'
62
63
64
class PreferenceHandler:
65
    def __init__(
        self,
        scan_id: str,
        kbdb: KbDB,
        scan_collection: ScanCollection,
        vts_cache: Dict[str, Dict],
    ):
        """ Store the collaborators needed to prepare the preferences
        of a single scan.

        Arguments:
            scan_id: Id of the scan whose preferences are prepared.
            kbdb: KB object the preferences are written to.
            scan_collection: Collection the scan data is read from.
            vts_cache: Dictionary of VTs, indexed by oid.
        """
        # Collaborators handed in by the caller.
        self.kbdb = kbdb
        self.scan_collection = scan_collection
        self.vts_cache = vts_cache
        self.scan_id = scan_id

        # Filled in later by prepare_openvas_scan_id_for_openvas().
        self._openvas_scan_id = None

        # Cache for the target_options property.
        self._target_options = None
81
82
    def prepare_openvas_scan_id_for_openvas(self):
        """ Generate a fresh openvas scan id (a random UUID string),
        remember it on the handler and register it in the kb together
        with the ospd scan id.

        Return the openvas scan_id.
        """
        self._openvas_scan_id = '{0}'.format(uuid.uuid4())
        ospd_scan_id = self.scan_id
        self.kbdb.add_scan_id(ospd_scan_id, self._openvas_scan_id)
        return self._openvas_scan_id
90
91
    @property
92
    def target_options(self) -> Dict:
93
        """ Return target options from Scan collection """
94
        if self._target_options is not None:
95
            return self._target_options
96
97
        self._target_options = self.scan_collection.get_target_options(
98
            self.scan_id
99
        )
100
        return self._target_options
101
102
    def _get_vts_in_groups(self, filters: List[str],) -> List[str]:
103
        """ Return a list of vts which match with the given filter.
104
105
        Arguments:
106
            filters A list of filters. Each filter has key, operator and
107
                    a value. They are separated by a space.
108
                    Supported keys: family
109
110
        Returns a list of vt oids which match with the given filter.
111
        """
112
        vts_list = list()
113
        families = dict()
114
115
        for oid in self.vts_cache:
116
            family = self.vts_cache[oid]['custom'].get('family')
117
            if family not in families:
118
                families[family] = list()
119
120
            families[family].append(oid)
121
122
        for elem in filters:
123
            key, value = elem.split('=')
124
            if key == 'family' and value in families:
125
                vts_list.extend(families[value])
126
127
        return vts_list
128
129
    def _get_vt_param_type(self, vtid: str, vt_param_id: str) -> Optional[str]:
130
        """ Return the type of the vt parameter from the vts dictionary. """
131
132
        vt_params_list = self.vts_cache[vtid].get("vt_params")
133
        if vt_params_list.get(vt_param_id):
134
            return vt_params_list[vt_param_id]["type"]
135
        return None
136
137
    def _get_vt_param_name(self, vtid: str, vt_param_id: str,) -> Optional[str]:
138
        """ Return the type of the vt parameter from the vts dictionary. """
139
140
        vt_params_list = self.vts_cache[vtid].get("vt_params")
141
        if vt_params_list.get(vt_param_id):
142
            return vt_params_list[vt_param_id]["name"]
143
        return None
144
145
    @staticmethod
146
    def check_param_type(vt_param_value: str, param_type: str) -> Optional[int]:
147
        """ Check if the value of a vt parameter matches with
148
        the type founded.
149
        """
150
        if param_type in [
151
            'entry',
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
152
            'password',
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
153
            'radio',
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
154
            'sshlogin',
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
155
        ] and isinstance(vt_param_value, str):
156
            return None
157
        elif param_type == 'checkbox' and (
158
            vt_param_value == '0' or vt_param_value == '1'
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
159
        ):
160
            return None
161
        elif param_type == 'file':
162
            try:
163
                b64decode(vt_param_value.encode())
164
            except (binascii.Error, AttributeError, TypeError):
165
                return 1
166
            return None
167
        elif param_type == 'integer':
168
            try:
169
                int(vt_param_value)
170
            except ValueError:
171
                return 1
172
            return None
173
174
        return 1
175
176
    def _process_vts(
177
        self, vts: Dict[str, Dict[str, str]],
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
178
    ) -> Tuple[List[str], Dict[str, str]]:
179
        """ Add single VTs and their parameters. """
180
        vts_list = []
181
        vts_params = {}
182
        vtgroups = vts.pop('vt_groups')
183
184
        if vtgroups:
185
            vts_list = self._get_vts_in_groups(vtgroups)
186
187
        for vtid, vt_params in vts.items():
188
            if vtid not in self.vts_cache:
189
                logger.warning(
190
                    'The VT %s was not found and it will not be loaded.', vtid
191
                )
192
                continue
193
194
            vts_list.append(vtid)
195
            for vt_param_id, vt_param_value in vt_params.items():
196
                param_type = self._get_vt_param_type(vtid, vt_param_id)
197
                param_name = self._get_vt_param_name(vtid, vt_param_id)
198
199
                if not param_type or not param_name:
200
                    logger.debug(
201
                        'Missing type or name for VT parameter %s of %s. '
202
                        'It could not be loaded.',
203
                        vt_param_id,
204
                        vtid,
205
                    )
206
                    continue
207
208
                if vt_param_id == '0':
209
                    type_aux = 'integer'
210
                else:
211
                    type_aux = param_type
212
213
                if self.check_param_type(vt_param_value, type_aux):
214
                    logger.debug(
215
                        'The VT parameter %s for %s could not be loaded. '
216
                        'Expected %s type for parameter value %s',
217
                        vt_param_id,
218
                        vtid,
219
                        type_aux,
220
                        str(vt_param_value),
221
                    )
222
                    continue
223
224
                if type_aux == 'checkbox':
225
                    vt_param_value = _from_bool_to_str(int(vt_param_value))
226
227
                vts_params[
228
                    "{0}:{1}:{2}:{3}".format(
229
                        vtid, vt_param_id, param_type, param_name
230
                    )
231
                ] = str(vt_param_value)
232
233
        return vts_list, vts_params
234
235
    def prepare_plugins_for_openvas(self) -> bool:
        """ Fetch the selected VTs from the Scan Collection, release the
        list in the collection and store the plugin set and the VT
        preferences in the kb.

        Return True if there were VTs to store, False otherwise.
        """
        selected_vts = self.scan_collection.get_vts(self.scan_id)
        self.scan_collection.release_vts_list(self.scan_id)
        if not selected_vts:
            return False

        oids, params = self._process_vts(selected_vts)

        # The plugin list is stored as a single ';' separated preference.
        plugin_set = 'plugin_set|||%s' % ';'.join(oids)
        self.kbdb.add_scan_preferences(self._openvas_scan_id, [plugin_set])

        # Each VT parameter is stored as its own preference.
        for pref_key, pref_value in params.items():
            self.kbdb.add_scan_preferences(
                self._openvas_scan_id, ['%s|||%s' % (pref_key, pref_value)]
            )

        return True
262
263
    @staticmethod
    def build_alive_test_opt_as_prefs(
        target_options: Dict[str, str]
    ) -> List[str]:
        """ Translate the 'alive_test' target option into preferences
        of the VT identified by OID_PING_HOST.

        Arguments:
            target_options: Dictionary with the target options.

        Return:
            A list with the target options in string format to be
            added to the redis KB. The list is empty if no alive test
            is set, or if its value is not an integer between 1 and 31.
        """
        prefs = []
        raw_value = target_options.get('alive_test') if target_options else None
        if not raw_value:
            return prefs

        try:
            alive_test = int(raw_value)
        except ValueError:
            logger.debug(
                'Alive test settings not applied. '
                'Invalid alive test value %s',
                raw_value,
            )
            return prefs

        if not 1 <= alive_test <= 31:
            return prefs

        # Decode the single bits of the alive test value.
        tcp_ack = bool(alive_test & AliveTest.ALIVE_TEST_TCP_ACK_SERVICE)
        tcp_syn = bool(alive_test & AliveTest.ALIVE_TEST_TCP_SYN_SERVICE)
        icmp = bool(alive_test & AliveTest.ALIVE_TEST_ICMP)
        arp = bool(alive_test & AliveTest.ALIVE_TEST_ARP)
        consider_alive = bool(
            alive_test & AliveTest.ALIVE_TEST_CONSIDER_ALIVE
        )

        # (preference id, preference name, enabled), in the order in
        # which the preferences are emitted.
        checkboxes = (
            ('1', 'Do a TCP ping', tcp_ack or tcp_syn),
            ('2', 'TCP ping tries also TCP-SYN ping', tcp_syn and tcp_ack),
            ('7', 'TCP ping tries only TCP-SYN ping', tcp_syn and not tcp_ack),
            ('3', 'Do an ICMP ping', icmp),
            ('4', 'Use ARP', arp),
            (
                '5',
                'Mark unrechable Hosts as dead (not scanning)',
                not consider_alive,
            ),
        )
        for pref_id, pref_name, enabled in checkboxes:
            prefs.append(
                '{0}:{1}:checkbox:{2}|||{3}'.format(
                    OID_PING_HOST,
                    pref_id,
                    pref_name,
                    'yes' if enabled else 'no',
                )
            )

        # Also select a method, otherwise Ping Host logs a warning.
        if alive_test == AliveTest.ALIVE_TEST_CONSIDER_ALIVE:
            prefs.append(OID_PING_HOST + ':1:checkbox:Do a TCP ping|||yes')

        return prefs
370
371
    def prepare_alive_test_option_for_openvas(self):
        """ Set alive test option. Overwrite the scan config settings.
        Check if test_alive_hosts_only feature of openvas is active.
        If active, put ALIVE_TEST enum in preferences. Otherwise the
        alive test is translated into preferences with
        build_alive_test_opt_as_prefs().

        Nothing is stored if openvas returns no settings, if the target
        has no alive test set or if the alive test value is invalid.
        """
        settings = Openvas.get_settings()
        if not settings:
            return

        raw_alive_test = self.target_options.get('alive_test')
        if not raw_alive_test:
            return

        if settings.get('test_alive_hosts_only'):
            try:
                alive_test = int(raw_alive_test)
            except ValueError:
                logger.debug(
                    'Alive test settings not applied. '
                    'Invalid alive test value %s',
                    raw_alive_test,
                )
                # Stop here: alive_test is unbound, so carrying on would
                # raise an UnboundLocalError below.
                return

            # Put ALIVE_TEST enum in db, this is then taken
            # by openvas to determine the method to use
            # for the alive test.
            if 1 <= alive_test <= 31:
                item = 'ALIVE_TEST|||%s' % str(alive_test)
                self.kbdb.add_scan_preferences(self._openvas_scan_id, [item])
            return

        alive_test_opt = self.build_alive_test_opt_as_prefs(
            self.target_options
        )
        # An invalid alive test yields an empty list; there is nothing
        # to store in that case.
        if alive_test_opt:
            self.kbdb.add_scan_preferences(
                self._openvas_scan_id, alive_test_opt
            )
403
404
    def prepare_reverse_lookup_opt_for_openvas(self):
        """ Store the reverse_lookup_only and reverse_lookup_unify
        target options in the kb as 'yes' / 'no' preferences.

        An option that is not set counts as '0'. Nothing is stored if
        there are no target options.
        """
        if not self.target_options:
            return

        prefs = []
        for option in ('reverse_lookup_only', 'reverse_lookup_unify'):
            enabled = int(self.target_options.get(option, '0'))
            prefs.append(
                '{0}|||{1}'.format(option, _from_bool_to_str(enabled))
            )

        self.kbdb.add_scan_preferences(self._openvas_scan_id, prefs)
421
422
    def prepare_target_for_openvas(self):
        """ Read the host list of the scan from the scan collection and
        store it in the kb as the TARGET preference. """
        hosts = self.scan_collection.get_host_list(self.scan_id)
        self.kbdb.add_scan_preferences(
            self._openvas_scan_id, ['TARGET|||%s' % hosts]
        )
429
430
    def prepare_ports_for_openvas(self) -> str:
        """ Read the port list of the scan from the scan collection and
        store it in the kb as the port_range preference.

        Return the port list as returned by the scan collection.
        """
        ports = self.scan_collection.get_ports(self.scan_id)
        self.kbdb.add_scan_preferences(
            self._openvas_scan_id, ['port_range|||%s' % ports]
        )
        return ports
438
439
    def prepare_host_options_for_openvas(self):
        """ Get the excluded and finished hosts from the scan collection and
        store the list of hosts that must not be scanned in the kb. """
        excluded = self.scan_collection.get_exclude_hosts(self.scan_id)

        # Finished hosts are appended to the excluded hosts, so they end
        # up in the exclude_hosts scan preference as well.
        finished = self.scan_collection.get_hosts_finished(self.scan_id)
        if finished:
            finished_str = ','.join(finished)
            if excluded:
                excluded = excluded + ',' + finished_str
            else:
                excluded = finished_str

        if not excluded:
            return

        self.kbdb.add_scan_preferences(
            self._openvas_scan_id, ["exclude_hosts|||" + excluded]
        )
458
459
    def prepare_scan_params_for_openvas(self, ospd_params: Dict[str, Dict]):
        """ Get the scan parameters from the scan collection and store them
        in the kb.

        Options declared with type 'boolean' in ospd_params are stored
        as 'yes' / 'no'; all other options are stored as their string
        representation.

        Arguments:
            ospd_params: Dictionary with the OSPD Params.
        """
        prefs_val = []
        scan_options = self.scan_collection.get_options(self.scan_id)

        for name, raw in scan_options.items():
            is_boolean = (
                name in ospd_params
                and ospd_params[name].get('type') == 'boolean'
            )
            text = _from_bool_to_str(raw) if is_boolean else str(raw)
            prefs_val.append(name + "|||" + text)

        self.kbdb.add_scan_preferences(self._openvas_scan_id, prefs_val)
479
480
    @staticmethod
    def build_credentials_as_prefs(credentials: Dict) -> List[str]:
        """ Parse the credential dictionary.

        Arguments:
            credentials: Dictionary with the credentials. Each key is a
                         service name (ssh, smb, esxi or snmp) and each
                         value a dictionary with the credential
                         parameters of that service. Other services
                         produce no preferences.

        Return:
            A list with the credentials in string format to be
            added to the redis KB.
        """

        def vt_pref(oid, pref_id, pref_type, pref_name, value):
            """ Build a single VT preference string. """
            return '{0}:{1}:{2}:{3}|||{4}'.format(
                oid, pref_id, pref_type, pref_name, value
            )

        cred_prefs_list = []
        for service, cred_params in credentials.items():
            cred_type = cred_params.get('type', '')
            username = cred_params.get('username', '')
            password = cred_params.get('password', '')

            if service == 'ssh':
                port = cred_params.get('port', '')
                cred_prefs_list.append('auth_port_ssh|||{0}'.format(port))
                cred_prefs_list.append(
                    vt_pref(
                        OID_SSH_AUTH, 1, 'entry', 'SSH login name:', username
                    )
                )
                if cred_type == 'up':
                    cred_prefs_list.append(
                        vt_pref(
                            OID_SSH_AUTH,
                            3,
                            'password',
                            'SSH password (unsafe!):',
                            password,
                        )
                    )
                else:
                    # Any other type is handled as a key based login.
                    private = cred_params.get('private', '')
                    cred_prefs_list.append(
                        vt_pref(
                            OID_SSH_AUTH,
                            2,
                            'password',
                            'SSH key passphrase:',
                            password,
                        )
                    )
                    cred_prefs_list.append(
                        vt_pref(
                            OID_SSH_AUTH,
                            4,
                            'file',
                            'SSH private key:',
                            private,
                        )
                    )
            elif service == 'smb':
                cred_prefs_list.append(
                    vt_pref(OID_SMB_AUTH, 1, 'entry', 'SMB login:', username)
                )
                cred_prefs_list.append(
                    vt_pref(
                        OID_SMB_AUTH, 2, 'password', 'SMB password:', password
                    )
                )
            elif service == 'esxi':
                cred_prefs_list.append(
                    vt_pref(
                        OID_ESXI_AUTH,
                        1,
                        'entry',
                        'ESXi login name:',
                        username,
                    )
                )
                cred_prefs_list.append(
                    vt_pref(
                        OID_ESXI_AUTH,
                        2,
                        'password',
                        'ESXi login password:',
                        password,
                    )
                )
            elif service == 'snmp':
                snmp_prefs = (
                    (
                        1,
                        'password',
                        'SNMP Community:',
                        cred_params.get('community', ''),
                    ),
                    (2, 'entry', 'SNMPv3 Username:', username),
                    (3, 'password', 'SNMPv3 Password:', password),
                    (
                        4,
                        'radio',
                        'SNMPv3 Authentication Algorithm:',
                        cred_params.get('auth_algorithm', ''),
                    ),
                    (
                        5,
                        'password',
                        'SNMPv3 Privacy Password:',
                        cred_params.get('privacy_password', ''),
                    ),
                    (
                        6,
                        'radio',
                        'SNMPv3 Privacy Algorithm:',
                        cred_params.get('privacy_algorithm', ''),
                    ),
                )
                for pref_id, pref_type, pref_name, value in snmp_prefs:
                    cred_prefs_list.append(
                        vt_pref(
                            OID_SNMP_AUTH, pref_id, pref_type, pref_name, value
                        )
                    )

        return cred_prefs_list
596
597
    def prepare_credentials_for_openvas(self) -> bool:
        """ Get the credentials from the scan collection and store them
        in the kb.

        Return True if credential preferences were stored, False if the
        scan has no credentials or none of them produced a preference.
        """
        credentials = self.scan_collection.get_credentials(self.scan_id)
        if not credentials:
            return False

        cred_prefs = self.build_credentials_as_prefs(credentials)
        if not cred_prefs:
            return False

        self.kbdb.add_scan_preferences(self._openvas_scan_id, cred_prefs)
        return True
610
611
    def prepare_main_kbindex_for_openvas(self):
        """ Store the index of the kb as the global preference
        ov_maindbid in the kb, used by OpenVAS. """
        kb_index = self.kbdb.index
        self.kbdb.add_scan_preferences(
            self._openvas_scan_id, ['ov_maindbid|||%d' % kb_index]
        )
616