Passed
Pull Request — master (#416)
by
unknown
01:26
created

PreferenceHandler.build_credentials_as_prefs()   F

Complexity

Conditions 20

Size

Total Lines 188
Code Lines 142

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 20
eloc 142
nop 2
dl 0
loc 188
rs 0
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

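Commonly applied refactorings include Extract Method and, if many parameters or temporary variables are present, Replace Temp with Query, Introduce Parameter Object and Preserve Whole Object.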

Complexity

Complex methods like ospd_openvas.preferencehandler.PreferenceHandler.build_credentials_as_prefs() often do a lot of different things. To break such a method (or its class) down, we need to identify a cohesive component within it. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
# pylint: disable=too-many-lines
21
22
""" Prepare the preferences to be used by OpenVAS. Get the data from the scan
23
collection and store the data in a redis KB in the right format to be used by
24
OpenVAS. """
25
26
import logging
27
import binascii
28
29
from enum import IntEnum
30
from typing import Optional, Dict, List, Tuple
31
from base64 import b64decode
32
33
from ospd.scan import ScanCollection
34
from ospd.ospd import BASE_SCANNER_PARAMS
35
from ospd.network import valid_port_list
36
from ospd_openvas.openvas import Openvas
37
from ospd_openvas.db import KbDB
38
from ospd_openvas.nvticache import NVTICache
39
from ospd_openvas.vthelper import VtHelper
40
41
logger = logging.getLogger(__name__)
42
43
44
OID_SSH_AUTH = "1.3.6.1.4.1.25623.1.0.103591"
45
OID_SMB_AUTH = "1.3.6.1.4.1.25623.1.0.90023"
46
OID_ESXI_AUTH = "1.3.6.1.4.1.25623.1.0.105058"
47
OID_SNMP_AUTH = "1.3.6.1.4.1.25623.1.0.105076"
48
OID_PING_HOST = "1.3.6.1.4.1.25623.1.0.100315"
49
50
BOREAS_ALIVE_TEST = "ALIVE_TEST"
51
BOREAS_ALIVE_TEST_PORTS = "ALIVE_TEST_PORTS"
52
BOREAS_SETTING_NAME = "test_alive_hosts_only"
53
54
55
class AliveTest(IntEnum):
    """Bit values of the individual alive test methods.

    Every method occupies its own bit, so that several methods can be
    combined into one bit field. The value 0 is used when the alive test
    of the scan config should apply.
    """

    ALIVE_TEST_SCAN_CONFIG_DEFAULT = 0
    ALIVE_TEST_TCP_ACK_SERVICE = 1 << 0
    ALIVE_TEST_ICMP = 1 << 1
    ALIVE_TEST_ARP = 1 << 2
    ALIVE_TEST_CONSIDER_ALIVE = 1 << 3
    ALIVE_TEST_TCP_SYN_SERVICE = 1 << 4
64
65
66
def alive_test_methods_to_bit_field(
    icmp: bool, tcp_syn: bool, tcp_ack: bool, arp: bool, consider_alive: bool
) -> int:
    """Combine the selected alive test methods into a bit field.

    Internally the alive test is handled as a bit field. Every method
    whose flag is set contributes its AliveTest bit to the result; if no
    flag is set the result is 0.
    """
    requested_methods = (
        (icmp, AliveTest.ALIVE_TEST_ICMP),
        (tcp_syn, AliveTest.ALIVE_TEST_TCP_SYN_SERVICE),
        (tcp_ack, AliveTest.ALIVE_TEST_TCP_ACK_SERVICE),
        (arp, AliveTest.ALIVE_TEST_ARP),
        (consider_alive, AliveTest.ALIVE_TEST_CONSIDER_ALIVE),
    )

    bit_field = 0
    for requested, method_bit in requested_methods:
        if requested:
            bit_field |= method_bit
    return bit_field
85
86
87
def _from_bool_to_str(value: int) -> str:
    """Translate an ospd boolean into the OpenVAS representation.

    ospd uses 1 and 0 as boolean values, whereas the OpenVAS scanner uses
    'yes' and 'no'. Every value that is not equal to 1 is mapped to 'no'.
    """
    if value == 1:
        return 'yes'
    return 'no'
91
92
93
class PreferenceHandler:
94
    def __init__(
95
        self,
96
        scan_id: str,
97
        kbdb: KbDB,
98
        scan_collection: ScanCollection,
99
        nvticache: NVTICache,
100
    ):
101
        self.scan_id = scan_id
102
        self.kbdb = kbdb
103
        self.scan_collection = scan_collection
104
105
        self._target_options = None
106
        self._nvts_params = None
107
108
        self.nvti = nvticache
109
110
        self.errors = []
111
112
    def prepare_scan_id_for_openvas(self):
113
        """Create the openvas scan id and store it in the redis kb.
114
        Return the openvas scan_id.
115
        """
116
        self.kbdb.add_scan_id(self.scan_id)
117
118
    @property
119
    def target_options(self) -> Dict:
120
        """Return target options from Scan collection"""
121
        if self._target_options is not None:
122
            return self._target_options
123
124
        self._target_options = self.scan_collection.get_target_options(
125
            self.scan_id
126
        )
127
        return self._target_options
128
129
    def _get_vts_in_groups(
130
        self,
131
        filters: List[str],
132
    ) -> List[str]:
133
        """Return a list of vts which match with the given filter.
134
135
        Arguments:
136
            filters A list of filters. Each filter has key, operator and
137
                    a value. They are separated by a space.
138
                    Supported keys: family
139
140
        Returns a list of vt oids which match with the given filter.
141
        """
142
        vts_list = list()
143
        families = dict()
144
145
        oids = self.nvti.get_oids()
146
147
        for _, oid in oids:
148
            family = self.nvti.get_nvt_family(oid)
149
            if family not in families:
150
                families[family] = list()
151
152
            families[family].append(oid)
153
154
        for elem in filters:
155
            key, value = elem.split('=')
156
            if key == 'family' and value in families:
157
                vts_list.extend(families[value])
158
159
        return vts_list
160
161
    def _get_vt_param_type(self, vt: Dict, vt_param_id: str) -> Optional[str]:
162
        """Return the type of the vt parameter from the vts dictionary."""
163
164
        vt_params_list = vt.get("vt_params")
165
        if vt_params_list.get(vt_param_id):
166
            return vt_params_list[vt_param_id]["type"]
167
        return None
168
169
    def _get_vt_param_name(self, vt: Dict, vt_param_id: str) -> Optional[str]:
170
        """Return the type of the vt parameter from the vts dictionary."""
171
172
        vt_params_list = vt.get("vt_params")
173
        if vt_params_list.get(vt_param_id):
174
            return vt_params_list[vt_param_id]["name"]
175
        return None
176
177
    @staticmethod
178
    def check_param_type(vt_param_value: str, param_type: str) -> Optional[int]:
179
        """Check if the value of a vt parameter matches with
180
        the type founded.
181
        """
182
        if (
183
            param_type
184
            in [
185
                'entry',
186
                'password',
187
                'radio',
188
                'sshlogin',
189
            ]
190
            and isinstance(vt_param_value, str)
191
        ):
192
            return None
193
        elif param_type == 'checkbox' and (
194
            vt_param_value == '0' or vt_param_value == '1'
195
        ):
196
            return None
197
        elif param_type == 'file':
198
            try:
199
                b64decode(vt_param_value.encode())
200
            except (binascii.Error, AttributeError, TypeError):
201
                return 1
202
            return None
203
        elif param_type == 'integer':
204
            try:
205
                int(vt_param_value)
206
            except ValueError:
207
                return 1
208
            return None
209
210
        return 1
211
212
    def _process_vts(
        self,
        vts: Dict[str, Dict[str, str]],
    ) -> Tuple[List[str], Dict[str, str]]:
        """Collect the selected VTs and the preferences of their parameters.

        Arguments:
            vts: Dictionary with the vt ids as keys and the parameters of
                 the vt as values. The entry 'vt_groups' holds the group
                 filters; it is removed from the dictionary.

        Returns a tuple with the list of selected vt ids and a dictionary
        which maps "<vt id>:<param id>:<param type>:<param name>" to the
        parameter value as string.
        """
        selected_vts = []
        preferences = {}
        group_filters = vts.pop('vt_groups')

        helper = VtHelper(self.nvti)

        if group_filters:
            selected_vts = self._get_vts_in_groups(group_filters)

        for vtid, vt_params in vts.items():
            vt = helper.get_single_vt(vtid)
            if not vt:
                logger.warning(
                    'The VT %s was not found and it will not be added to the '
                    'plugin scheduler.',
                    vtid,
                )
                continue

            selected_vts.append(vtid)
            for vt_param_id, vt_param_value in vt_params.items():
                param_type = self._get_vt_param_type(vt, vt_param_id)
                param_name = self._get_vt_param_name(vt, vt_param_id)

                if not (param_type and param_name):
                    logger.debug(
                        'Missing type or name for VT parameter %s of %s. '
                        'This VT parameter will not be set.',
                        vt_param_id,
                        vtid,
                    )
                    continue

                # The parameter with id '0' is always checked as integer,
                # whatever type is declared for it.
                type_aux = 'integer' if vt_param_id == '0' else param_type

                if self.check_param_type(vt_param_value, type_aux):
                    logger.debug(
                        'The VT parameter %s for %s could not be set. '
                        'Expected %s type for parameter value %s',
                        vt_param_id,
                        vtid,
                        type_aux,
                        str(vt_param_value),
                    )
                    continue

                if type_aux == 'checkbox':
                    vt_param_value = _from_bool_to_str(int(vt_param_value))

                # The key carries the declared type, not the checked one.
                pref_key = "{0}:{1}:{2}:{3}".format(
                    vtid, vt_param_id, param_type, param_name
                )
                preferences[pref_key] = str(vt_param_value)

        return selected_vts, preferences
276
277
    def prepare_plugins_for_openvas(self) -> bool:
278
        """Get the plugin list and it preferences from the Scan Collection.
279
        The plugin list is immediately stored in the kb.
280
        """
281
        nvts = self.scan_collection.get_vts(self.scan_id)
282
        if nvts:
283
            nvts_list, self._nvts_params = self._process_vts(nvts)
284
            # Add nvts list
285
            separ = ';'
286
            plugin_list = 'plugin_set|||%s' % separ.join(nvts_list)
287
            self.kbdb.add_scan_preferences(self.scan_id, [plugin_list])
288
289
            nvts_list = None
290
            plugin_list = None
291
            nvts = None
292
293
            return True
294
295
        return False
296
297
    def prepare_nvt_preferences(self):
298
        """Prepare the vts preferences. Store the data in the kb."""
299
300
        items_list = []
301
        for key, val in self._nvts_params.items():
302
            items_list.append('%s|||%s' % (key, val))
303
304
        if items_list:
305
            self.kbdb.add_scan_preferences(self.scan_id, items_list)
306
307
    @staticmethod
308
    def build_alive_test_opt_as_prefs(
309
        target_options: Dict[str, str]
310
    ) -> Dict[str, str]:
311
        """Parse the target options dictionary.
312
        Arguments:
313
            target_options: Dictionary with the target options.
314
315
        Return:
316
            A dict with the target options related to alive test method
317
            in string format to be added to the redis KB.
318
        """
319
        target_opt_prefs_list = {}
320
        alive_test = None
321
322 View Code Duplication
        if target_options:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
323
            # Alive test specified as bit field.
324
            alive_test = target_options.get('alive_test')
325
            # Alive test specified as individual methods.
326
            alive_test_methods = target_options.get('alive_test_methods')
327
            # alive_test takes precedence over alive_test_methods
328
            if alive_test is None and alive_test_methods:
329
                alive_test = alive_test_methods_to_bit_field(
330
                    icmp=target_options.get('icmp') == '1',
331
                    tcp_syn=target_options.get('tcp_syn') == '1',
332
                    tcp_ack=target_options.get('tcp_ack') == '1',
333
                    arp=target_options.get('arp') == '1',
334
                    consider_alive=target_options.get('consider_alive') == '1',
335
                )
336
337
        if target_options and alive_test:
338
            try:
339
                alive_test = int(alive_test)
340
            except ValueError:
341
                logger.debug(
342
                    'Alive test settings not applied. '
343
                    'Invalid alive test value %s',
344
                    target_options.get('alive_test'),
345
                )
346
                return target_opt_prefs_list
347
348
            # No alive test or wrong value, uses the default
349
            # preferences sent by the client.
350
            if alive_test < 1 or alive_test > 31:
351
                return target_opt_prefs_list
352
353
            if (
354
                alive_test & AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
355
                or alive_test & AliveTest.ALIVE_TEST_TCP_SYN_SERVICE
356
            ):
357
                value = "yes"
358
            else:
359
                value = "no"
360
            target_opt_prefs_list[
361
                OID_PING_HOST + ':1:checkbox:' + 'Do a TCP ping'
362
            ] = value
363
364
            if (
365
                alive_test & AliveTest.ALIVE_TEST_TCP_SYN_SERVICE
366
                and alive_test & AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
367
            ):
368
                value = "yes"
369
            else:
370
                value = "no"
371
            target_opt_prefs_list[
372
                OID_PING_HOST
373
                + ':2:checkbox:'
374
                + 'TCP ping tries also TCP-SYN ping'
375
            ] = value
376
377
            if (alive_test & AliveTest.ALIVE_TEST_TCP_SYN_SERVICE) and not (
378
                alive_test & AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
379
            ):
380
                value = "yes"
381
            else:
382
                value = "no"
383
            target_opt_prefs_list[
384
                OID_PING_HOST
385
                + ':7:checkbox:'
386
                + 'TCP ping tries only TCP-SYN ping'
387
            ] = value
388
389
            if alive_test & AliveTest.ALIVE_TEST_ICMP:
390
                value = "yes"
391
            else:
392
                value = "no"
393
            target_opt_prefs_list[
394
                OID_PING_HOST + ':3:checkbox:' + 'Do an ICMP ping'
395
            ] = value
396
397
            if alive_test & AliveTest.ALIVE_TEST_ARP:
398
                value = "yes"
399
            else:
400
                value = "no"
401
            target_opt_prefs_list[
402
                OID_PING_HOST + ':4:checkbox:' + 'Use ARP'
403
            ] = value
404
405
            if alive_test & AliveTest.ALIVE_TEST_CONSIDER_ALIVE:
406
                value = "no"
407
            else:
408
                value = "yes"
409
            target_opt_prefs_list[
410
                OID_PING_HOST
411
                + ':5:checkbox:'
412
                + 'Mark unrechable Hosts as dead (not scanning)'
413
            ] = value
414
415
        return target_opt_prefs_list
416
417
    def prepare_alive_test_option_for_openvas(self):
        """Set alive test option. Overwrite the scan config settings.

        The alive test preferences are added to the collected vt
        parameters. Nothing is done if there are no OpenVAS settings, no
        target options or no alive test among the target options.
        """
        if not Openvas.get_settings():
            return

        # The target options may be missing; there is nothing to set then.
        target_options = self.target_options
        if not target_options:
            return

        if not (
            target_options.get('alive_test')
            or target_options.get('alive_test_methods')
        ):
            return

        alive_test_opt = self.build_alive_test_opt_as_prefs(target_options)

        # The vt parameters are still None if no plugins were prepared.
        if self._nvts_params is None:
            self._nvts_params = {}
        self._nvts_params.update(alive_test_opt)
428
429
    def prepare_boreas_alive_test(self):
        """Store the alive test preferences for Boreas in the kb.

        Nothing is stored unless the scanner setting BOREAS_SETTING_NAME
        is set.
        """
        settings = Openvas.get_settings()
        target_options = self.target_options

        if not settings or not settings.get(BOREAS_SETTING_NAME):
            return

        alive_test = None
        alive_test_ports = None
        if target_options:
            alive_test_ports = target_options.get('alive_test_ports')
            # The bit field <alive_test> takes precedence over the
            # individual methods flagged by <alive_test_methods>.
            alive_test = target_options.get('alive_test')
            if alive_test is None and target_options.get(
                'alive_test_methods'
            ):
                alive_test = alive_test_methods_to_bit_field(
                    icmp=target_options.get('icmp') == '1',
                    tcp_syn=target_options.get('tcp_syn') == '1',
                    tcp_ack=target_options.get('tcp_ack') == '1',
                    arp=target_options.get('arp') == '1',
                    consider_alive=target_options.get('consider_alive') == '1',
                )

        if alive_test is None:
            # Use default alive test if no alive_test was provided
            alive_test = AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT
        else:
            try:
                alive_test = int(alive_test)
            except ValueError:
                logger.debug(
                    'Alive test preference for Boreas not set. '
                    'Invalid alive test value %s.',
                    alive_test,
                )
                # Use default alive test as fall back
                alive_test = AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT

        # The default alive test is replaced by the ICMP alive test.
        if alive_test == AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT:
            alive_test = AliveTest.ALIVE_TEST_ICMP

        # A valid alive_test bit mask has a value between
        # 1 (00001) and 31 (11111). Other values are not stored.
        if 1 <= alive_test <= 31:
            alive_test_pref = "{0}|||{1}".format(BOREAS_ALIVE_TEST, alive_test)
            self.kbdb.add_scan_preferences(self.scan_id, [alive_test_pref])

        # Add portlist if present. Validity is checked on Boreas side.
        if alive_test_ports is not None:
            ports_pref = "{0}|||{1}".format(
                BOREAS_ALIVE_TEST_PORTS, alive_test_ports
            )
            self.kbdb.add_scan_preferences(self.scan_id, [ports_pref])
497
498
    def prepare_reverse_lookup_opt_for_openvas(self):
499
        """Set reverse lookup options in the kb"""
500
        if self.target_options:
501
            items = []
502
            _rev_lookup_only = int(
503
                self.target_options.get('reverse_lookup_only', '0')
504
            )
505
            rev_lookup_only = _from_bool_to_str(_rev_lookup_only)
506
            items.append('reverse_lookup_only|||%s' % (rev_lookup_only))
507
508
            _rev_lookup_unify = int(
509
                self.target_options.get('reverse_lookup_unify', '0')
510
            )
511
            rev_lookup_unify = _from_bool_to_str(_rev_lookup_unify)
512
            items.append('reverse_lookup_unify|||%s' % rev_lookup_unify)
513
514
            self.kbdb.add_scan_preferences(self.scan_id, items)
515
516
    def prepare_target_for_openvas(self):
517
        """Get the target from the scan collection and set the target
518
        in the kb"""
519
520
        target = self.scan_collection.get_host_list(self.scan_id)
521
        target_aux = 'TARGET|||%s' % target
522
        self.kbdb.add_scan_preferences(self.scan_id, [target_aux])
523
524
    def prepare_ports_for_openvas(self) -> str:
        """Get the port list from the scan collection and store the list
        in the kb.

        Returns the port list. If the port list is not valid, nothing is
        stored and False is returned instead.
        """
        ports = self.scan_collection.get_ports(self.scan_id)
        if valid_port_list(ports):
            self.kbdb.add_scan_preferences(
                self.scan_id, ['port_range|||%s' % ports]
            )
            return ports

        return False
535
536
    def prepare_host_options_for_openvas(self):
537
        """Get the excluded and finished hosts from the scan collection and
538
        stores the list of hosts that must not be scanned in the kb."""
539
        exclude_hosts = self.scan_collection.get_exclude_hosts(self.scan_id)
540
541
        if exclude_hosts:
542
            pref_val = "exclude_hosts|||" + exclude_hosts
543
            self.kbdb.add_scan_preferences(self.scan_id, [pref_val])
544
545
    def prepare_scan_params_for_openvas(self, ospd_params: Dict[str, Dict]):
546
        """Get the scan parameters from the scan collection and store them
547
        in the kb.
548
        Arguments:
549
            ospd_params: Dictionary with the OSPD Params.
550
        """
551
        # Options which were supplied via the <scanner_params> XML element.
552
        options = self.scan_collection.get_options(self.scan_id)
553
        prefs_val = []
554
555
        for key, value in options.items():
556
            item_type = ''
557
            if key in ospd_params:
558
                item_type = ospd_params[key].get('type')
559
            else:
560
                if key not in BASE_SCANNER_PARAMS:
561
                    logger.debug(
562
                        "%s is a scanner only setting and should not be set "
563
                        "by the client. Setting needs to be included in "
564
                        "OpenVAS configuration file instead.",
565
                        key,
566
                    )
567
            if item_type == 'boolean':
568
                val = _from_bool_to_str(value)
569
            else:
570
                val = str(value)
571
            prefs_val.append(key + "|||" + val)
572
573
        if prefs_val:
574
            self.kbdb.add_scan_preferences(self.scan_id, prefs_val)
575
576
    def build_credentials_as_prefs(self, credentials: Dict) -> List[str]:
577
        """Parse the credential dictionary.
578
        Arguments:
579
            credentials: Dictionary with the credentials.
580
581
        Return:
582
            A list with the credentials in string format to be
583
            added to the redis KB.
584
        """
585
        cred_prefs_list = []
586
        for credential in credentials.items():
587
            service = credential[0]
588
            cred_params = credentials.get(service)
589
            cred_type = cred_params.get('type', '')
590
            username = cred_params.get('username', '')
591
            password = cred_params.get('password', '')
592
593
            # Check service ssh
594
            if service == 'ssh':
595
                # For ssh check the Port
596
                port = cred_params.get('port', '')
597
                if not port:
598
                    self.errors.append("Port for SSH is missing.")
599
                    continue
600
                elif not port.isnumeric():
601
                    self.errors.append(
602
                        "Port for SSH '" + port + "' is not a valid number."
603
                    )
604
                    continue
605
                elif int(port) > 65535:
606
                    self.errors.append(
607
                        "Port for SSH is out of range (0-65535): " + port
608
                    )
609
                    continue
610
                # For ssh check the credential type
611
                if cred_type == 'up':
612
                    cred_prefs_list.append(
613
                        OID_SSH_AUTH
614
                        + ':3:'
615
                        + 'password:SSH password '
616
                        + '(unsafe!):|||{0}'.format(password)
617
                    )
618
                elif cred_type == 'usk':
619
                    private = cred_params.get('private', '')
620
                    cred_prefs_list.append(
621
                        OID_SSH_AUTH
622
                        + ':2:'
623
                        + 'password:SSH key passphrase:|||'
624
                        + '{0}'.format(password)
625
                    )
626
                    cred_prefs_list.append(
627
                        OID_SSH_AUTH
628
                        + ':4:'
629
                        + 'file:SSH private key:|||'
630
                        + '{0}'.format(private)
631
                    )
632
                elif cred_type:
633
                    self.errors.append(
634
                        "Unknown Credential Type for SSH: "
635
                        + cred_type
636
                        + ". Use 'up' for Username + Password"
637
                        + " or 'usk' for Username + SSH Key."
638
                    )
639
                    continue
640
                else:
641
                    self.errors.append(
642
                        "Missing Credential Type for SSH."
643
                        + " Use 'up' for Username + Password"
644
                        + " or 'usk' for Username + SSH Key."
645
                    )
646
                    continue
647
                cred_prefs_list.append('auth_port_ssh|||' + '{0}'.format(port))
648
                cred_prefs_list.append(
649
                    OID_SSH_AUTH
650
                    + ':1:'
651
                    + 'entry:SSH login '
652
                    + 'name:|||{0}'.format(username)
653
                )
654
            # Check servic smb
655
            elif service == 'smb':
656
                cred_prefs_list.append(
657
                    OID_SMB_AUTH
658
                    + ':1:entry'
659
                    + ':SMB login:|||{0}'.format(username)
660
                )
661
                cred_prefs_list.append(
662
                    OID_SMB_AUTH
663
                    + ':2:'
664
                    + 'password:SMB password:|||'
665
                    + '{0}'.format(password)
666
                )
667
            # Check service esxi
668
            elif service == 'esxi':
669
                cred_prefs_list.append(
670
                    OID_ESXI_AUTH
671
                    + ':1:entry:'
672
                    + 'ESXi login name:|||'
673
                    + '{0}'.format(username)
674
                )
675
                cred_prefs_list.append(
676
                    OID_ESXI_AUTH
677
                    + ':2:'
678
                    + 'password:ESXi login password:|||'
679
                    + '{0}'.format(password)
680
                )
681
            # Check service snmp
682
            elif service == 'snmp':
683
                community = cred_params.get('community', '')
684
                auth_algorithm = cred_params.get('auth_algorithm', '')
685
                privacy_password = cred_params.get('privacy_password', '')
686
                privacy_algorithm = cred_params.get('privacy_algorithm', '')
687
688
                if not privacy_algorithm:
689
                    if privacy_password:
690
                        self.errors.append(
691
                            "When no privacy algorithm is used, the privacy"
692
                            + " password also has to be empty."
693
                        )
694
                        continue
695
                elif (
696
                    not privacy_algorithm == "aes"
697
                    and not privacy_algorithm == "des"
698
                ):
699
                    self.errors.append(
700
                        "Unknows privacy algorithm used: "
701
                        + privacy_algorithm
702
                        + ". Use 'aes', 'des' or '' (none)."
703
                    )
704
                    continue
705
706
                if not auth_algorithm:
707
                    self.errors.append(
708
                        "Missing authentification algorithm for SNMP."
709
                        + " Use 'md5' or 'sha1'."
710
                    )
711
                    continue
712
                elif (
713
                    not auth_algorithm == "md5" and not auth_algorithm == "sha1"
714
                ):
715
                    self.errors.append(
716
                        "Unknown authentification algorithm: "
717
                        + auth_algorithm
718
                        + ". Use 'md5' or 'sha1'."
719
                    )
720
                    continue
721
722
                cred_prefs_list.append(
723
                    OID_SNMP_AUTH
724
                    + ':1:'
725
                    + 'password:SNMP Community:|||'
726
                    + '{0}'.format(community)
727
                )
728
                cred_prefs_list.append(
729
                    OID_SNMP_AUTH
730
                    + ':2:'
731
                    + 'entry:SNMPv3 Username:|||'
732
                    + '{0}'.format(username)
733
                )
734
                cred_prefs_list.append(
735
                    OID_SNMP_AUTH + ':3:'
736
                    'password:SNMPv3 Password:|||' + '{0}'.format(password)
737
                )
738
                cred_prefs_list.append(
739
                    OID_SNMP_AUTH
740
                    + ':4:'
741
                    + 'radio:SNMPv3 Authentication Algorithm:|||'
742
                    + '{0}'.format(auth_algorithm)
743
                )
744
                cred_prefs_list.append(
745
                    OID_SNMP_AUTH
746
                    + ':5:'
747
                    + 'password:SNMPv3 Privacy Password:|||'
748
                    + '{0}'.format(privacy_password)
749
                )
750
                cred_prefs_list.append(
751
                    OID_SNMP_AUTH
752
                    + ':6:'
753
                    + 'radio:SNMPv3 Privacy Algorithm:|||'
754
                    + '{0}'.format(privacy_algorithm)
755
                )
756
            elif service:
757
                self.errors.append(
758
                    "Unknown service type for credential: " + service
759
                )
760
            else:
761
                self.errors.append("Missing service type for credential.")
762
763
        return cred_prefs_list
764
765
    def prepare_credentials_for_openvas(self) -> bool:
        """Get the credentials from the scan collection and store them
        in the kb.

        Returns False if there are credentials but none of them could be
        translated into preferences, True otherwise.
        """
        logger.debug("Looking for given Credentials...")
        credentials = self.scan_collection.get_credentials(self.scan_id)
        if not credentials:
            logger.debug("No credentials found.")
            return True

        cred_prefs = self.build_credentials_as_prefs(credentials)
        if not cred_prefs:
            return False

        self.kbdb.add_credentials_to_scan_preferences(self.scan_id, cred_prefs)
        logger.debug("Credentials added to the kb.")
        return True
783
784
    def prepare_main_kbindex_for_openvas(self):
785
        """Store main_kbindex as global preference in the
786
        kb, used by OpenVAS"""
787
        ov_maindbid = 'ov_maindbid|||%d' % self.kbdb.index
788
        self.kbdb.add_scan_preferences(self.scan_id, [ov_maindbid])
789