Passed
Pull Request — master (#436)
by Jaspar
02:22
created

PreferenceHandler.check_ssh()   C

Complexity

Conditions 8

Size

Total Lines 75
Code Lines 64

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 8
eloc 64
nop 3
dl 0
loc 75
rs 6.3115
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: move a cohesive part of the method body into a new, well-named method.

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
# pylint: disable=too-many-lines
21
22
""" Prepare the preferences to be used by OpenVAS. Get the data from the scan
23
collection and store the data in a redis KB in the right format to be used by
24
OpenVAS. """
25
26
import logging
27
import binascii
28
29
from enum import IntEnum
30
from typing import Optional, Dict, List, Tuple
31
from base64 import b64decode
32
33
from ospd.scan import ScanCollection
34
from ospd.ospd import BASE_SCANNER_PARAMS
35
from ospd.network import valid_port_list
36
from ospd_openvas.openvas import Openvas
37
from ospd_openvas.db import KbDB
38
from ospd_openvas.nvticache import NVTICache
39
from ospd_openvas.vthelper import VtHelper
40
41
# Module-level logger used by the functions and methods of this module.
logger = logging.getLogger(__name__)
42
43
44
# OIDs used as prefix of the preference keys which are built for the
# ssh, smb, esxi and snmp credentials.
OID_SSH_AUTH = "1.3.6.1.4.1.25623.1.0.103591"
45
OID_SMB_AUTH = "1.3.6.1.4.1.25623.1.0.90023"
46
OID_ESXI_AUTH = "1.3.6.1.4.1.25623.1.0.105058"
47
OID_SNMP_AUTH = "1.3.6.1.4.1.25623.1.0.105076"
48
# OID used as prefix of the alive test checkbox preference keys.
OID_PING_HOST = "1.3.6.1.4.1.25623.1.0.100315"
49
50
# Preference keys under which the alive test bit field and the alive test
# port list are stored for Boreas.
BOREAS_ALIVE_TEST = "ALIVE_TEST"
51
BOREAS_ALIVE_TEST_PORTS = "ALIVE_TEST_PORTS"
52
# Name of the scanner setting which has to be set for the Boreas
# preferences to be stored.
BOREAS_SETTING_NAME = "test_alive_hosts_only"
53
54
55
class AliveTest(IntEnum):
    """Bit values of the individual alive test methods.

    Every method occupies its own bit, so several methods can be combined
    into a single integer bit field.
    """

    ALIVE_TEST_SCAN_CONFIG_DEFAULT = 0b00000
    ALIVE_TEST_TCP_ACK_SERVICE = 0b00001
    ALIVE_TEST_ICMP = 0b00010
    ALIVE_TEST_ARP = 0b00100
    ALIVE_TEST_CONSIDER_ALIVE = 0b01000
    ALIVE_TEST_TCP_SYN_SERVICE = 0b10000
64
65
66
def alive_test_methods_to_bit_field(
    icmp: bool, tcp_syn: bool, tcp_ack: bool, arp: bool, consider_alive: bool
) -> int:
    """Combine the selected alive test methods into one bit field.

    Every argument which is true sets the bit of the matching AliveTest
    value. Without any selected method the result is 0.
    """
    selection = (
        (icmp, AliveTest.ALIVE_TEST_ICMP),
        (tcp_syn, AliveTest.ALIVE_TEST_TCP_SYN_SERVICE),
        (tcp_ack, AliveTest.ALIVE_TEST_TCP_ACK_SERVICE),
        (arp, AliveTest.ALIVE_TEST_ARP),
        (consider_alive, AliveTest.ALIVE_TEST_CONSIDER_ALIVE),
    )

    bit_field = 0
    for selected, method in selection:
        if selected:
            bit_field |= method
    return bit_field
85
86
87
def _from_bool_to_str(value: int) -> str:
88
    """The OpenVAS scanner use yes and no as boolean values, whereas ospd
89
    uses 1 and 0."""
90
    return 'yes' if value == 1 else 'no'
91
92
93
def _get_vt_param_type(vt: Dict, vt_param_id: str) -> Optional[str]:
94
    """Return the type of the vt parameter from the vts dictionary."""
95
96
    vt_params_list = vt.get("vt_params")
97
    if vt_params_list.get(vt_param_id):
98
        return vt_params_list[vt_param_id]["type"]
99
    return None
100
101
102
def _get_vt_param_name(vt: Dict, vt_param_id: str) -> Optional[str]:
103
    """Return the type of the vt parameter from the vts dictionary."""
104
105
    vt_params_list = vt.get("vt_params")
106
    if vt_params_list.get(vt_param_id):
107
        return vt_params_list[vt_param_id]["name"]
108
    return None
109
110
111
def set_alive_test_preference_list(alive_test) -> Dict:
    """Translate an alive test bit field into checkbox preferences.

    Arguments:
        alive_test: Bit field made of the AliveTest values.

    Return:
        A dict which maps the checkbox preference keys, all prefixed with
        OID_PING_HOST, to "yes" or "no".
    """
    tcp_ack = alive_test & AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
    tcp_syn = alive_test & AliveTest.ALIVE_TEST_TCP_SYN_SERVICE
    icmp = alive_test & AliveTest.ALIVE_TEST_ICMP
    arp = alive_test & AliveTest.ALIVE_TEST_ARP
    consider_alive = alive_test & AliveTest.ALIVE_TEST_CONSIDER_ALIVE

    # Preference name (without the OID prefix) and whether it is switched
    # on. The order of the entries is the order of the returned dict.
    checkboxes = (
        ('1:checkbox:Do a TCP ping', tcp_ack or tcp_syn),
        ('2:checkbox:TCP ping tries also TCP-SYN ping', tcp_syn and tcp_ack),
        (
            '7:checkbox:TCP ping tries only TCP-SYN ping',
            tcp_syn and not tcp_ack,
        ),
        ('3:checkbox:Do an ICMP ping', icmp),
        ('4:checkbox:Use ARP', arp),
        # Inverted: hosts are only marked as dead when they are not
        # considered alive anyway.
        (
            '5:checkbox:Mark unrechable Hosts as dead (not scanning)',
            not consider_alive,
        ),
    )

    return {
        f'{OID_PING_HOST}:{name}': 'yes' if enabled else 'no'
        for name, enabled in checkboxes
    }
166
167
168
class PreferenceHandler:
169
    def __init__(
        self,
        scan_id: str,
        kbdb: KbDB,
        scan_collection: ScanCollection,
        nvticache: NVTICache,
    ):
        """Keep the scan id and the objects the preferences are read from
        and written to.

        Arguments:
            scan_id: Id of the scan the preferences are prepared for.
            kbdb: The kb the preferences are stored in.
            scan_collection: Collection the scan data is read from.
            nvticache: Cache used to look up the VTs.
        """
        self.scan_id, self.kbdb = scan_id, kbdb
        self.scan_collection = scan_collection

        # Both are filled later: the target options on first access of
        # the property, the VT parameters when the plugins are prepared.
        self._target_options = self._nvts_params = None

        self.nvti = nvticache

        # Messages collected until get_error_messages() hands them out.
        self.errors = []
186
187
    def prepare_scan_id_for_openvas(self):
188
        """Create the openvas scan id and store it in the redis kb.
189
        Return the openvas scan_id.
190
        """
191
        self.kbdb.add_scan_id(self.scan_id)
192
193
    def get_error_messages(self) -> List:
194
        """Returns the Error List and reset it"""
195
        ret = self.errors
196
        self.errors = []
197
        return ret
198
199
    @property
200
    def target_options(self) -> Dict:
201
        """Return target options from Scan collection"""
202
        if self._target_options is not None:
203
            return self._target_options
204
205
        self._target_options = self.scan_collection.get_target_options(
206
            self.scan_id
207
        )
208
        return self._target_options
209
210
    def _get_vts_in_groups(
211
        self,
212
        filters: List[str],
213
    ) -> List[str]:
214
        """Return a list of vts which match with the given filter.
215
216
        Arguments:
217
            filters A list of filters. Each filter has key, operator and
218
                    a value. They are separated by a space.
219
                    Supported keys: family
220
221
        Returns a list of vt oids which match with the given filter.
222
        """
223
        vts_list = list()
224
        families = dict()
225
226
        oids = self.nvti.get_oids()
227
228
        for _, oid in oids:
229
            family = self.nvti.get_nvt_family(oid)
230
            if family not in families:
231
                families[family] = list()
232
233
            families[family].append(oid)
234
235
        for elem in filters:
236
            key, value = elem.split('=')
237
            if key == 'family' and value in families:
238
                vts_list.extend(families[value])
239
240
        return vts_list
241
242
    @staticmethod
243
    def check_param_type(vt_param_value: str, param_type: str) -> bool:
244
        """Check if the value of a vt parameter matches with
245
        the type founded.
246
        """
247
        if (
248
            param_type
249
            in [
250
                'entry',
251
                'password',
252
                'radio',
253
                'sshlogin',
254
            ]
255
            and isinstance(vt_param_value, str)
256
        ) or (param_type == 'checkbox' and vt_param_value in ['0', '1']):
257
            return True
258
        if param_type == 'file':
259
            try:
260
                b64decode(vt_param_value.encode())
261
            except (binascii.Error, AttributeError, TypeError):
262
                return False
263
            return True
264
        if param_type == 'integer':
265
            try:
266
                int(vt_param_value)
267
            except ValueError:
268
                return False
269
            return True
270
271
        return False
272
273
    def _process_vts(
        self,
        vts: Dict[str, Dict[str, str]],
    ) -> Tuple[List[str], Dict[str, str]]:
        """Collect the VTs to be run and the preferences of their
        parameters.

        Arguments:
            vts: Dictionary which maps VT ids to a dictionary with the
                 parameter ids and values. An optional 'vt_groups' entry
                 holds the group filters; it is removed from the
                 dictionary.

        Return:
            A tuple with the list of VT oids and a dictionary which maps
            "oid:param_id:param_type:param_name" to the parameter value.
        """
        vts_list = []
        vts_params = {}
        # The default avoids a KeyError if the entry is missing, e.g.
        # because an earlier call already removed it from the same dict.
        vtgroups = vts.pop('vt_groups', None)

        vthelper = VtHelper(self.nvti)

        if vtgroups:
            vts_list = self._get_vts_in_groups(vtgroups)

        for vtid, vt_params in vts.items():
            vt = vthelper.get_single_vt(vtid)
            if not vt:
                logger.warning(
                    'The VT %s was not found and it will not be added to the '
                    'plugin scheduler.',
                    vtid,
                )
                continue

            vts_list.append(vtid)
            for vt_param_id, vt_param_value in vt_params.items():
                param_type = _get_vt_param_type(vt, vt_param_id)
                param_name = _get_vt_param_name(vt, vt_param_id)

                if not param_type or not param_name:
                    logger.debug(
                        'Missing type or name for VT parameter %s of %s. '
                        'This VT parameter will not be set.',
                        vt_param_id,
                        vtid,
                    )
                    continue

                # The parameter with id 0 is always checked as integer,
                # whatever type is stored for it.
                type_aux = 'integer' if vt_param_id == '0' else param_type

                if not self.check_param_type(vt_param_value, type_aux):
                    logger.debug(
                        'The VT parameter %s for %s could not be set. '
                        'Expected %s type for parameter value %s',
                        vt_param_id,
                        vtid,
                        type_aux,
                        str(vt_param_value),
                    )
                    continue

                # Checkbox values arrive as '0' or '1', the scanner
                # expects no or yes.
                if type_aux == 'checkbox':
                    vt_param_value = _from_bool_to_str(int(vt_param_value))

                pref_key = f"{vtid}:{vt_param_id}:{param_type}:{param_name}"
                vts_params[pref_key] = str(vt_param_value)

        return vts_list, vts_params
337
338
    def prepare_plugins_for_openvas(self) -> bool:
339
        """Get the plugin list and it preferences from the Scan Collection.
340
        The plugin list is immediately stored in the kb.
341
        """
342
        nvts = self.scan_collection.get_vts(self.scan_id)
343
        if nvts:
344
            nvts_list, self._nvts_params = self._process_vts(nvts)
345
            # Add nvts list
346
            separ = ';'
347
            plugin_list = 'plugin_set|||%s' % separ.join(nvts_list)
348
            self.kbdb.add_scan_preferences(self.scan_id, [plugin_list])
349
350
            nvts_list = None
351
            plugin_list = None
352
            nvts = None
353
354
            return True
355
356
        return False
357
358
    def prepare_nvt_preferences(self):
359
        """Prepare the vts preferences. Store the data in the kb."""
360
361
        items_list = []
362
        for key, val in self._nvts_params.items():
363
            items_list.append('%s|||%s' % (key, val))
364
365
        if items_list:
366
            self.kbdb.add_scan_preferences(self.scan_id, items_list)
367
368
    @staticmethod
369
    def build_alive_test_opt_as_prefs(
370
        target_options: Dict[str, str]
371
    ) -> Dict[str, str]:
372
        """Parse the target options dictionary.
373
        Arguments:
374
            target_options: Dictionary with the target options.
375
376
        Return:
377
            A dict with the target options related to alive test method
378
            in string format to be added to the redis KB.
379
        """
380
381
        if not target_options:
382
            return {}
383
384
        alive_test = None
385
        # Alive test specified as bit field.
386
        alive_test = target_options.get('alive_test')
387
        # Alive test specified as individual methods.
388
        alive_test_methods = target_options.get('alive_test_methods')
389
        # alive_test takes precedence over alive_test_methods
390
        if alive_test is None and alive_test_methods:
391
            alive_test = alive_test_methods_to_bit_field(
392
                icmp=target_options.get('icmp') == '1',
393
                tcp_syn=target_options.get('tcp_syn') == '1',
394
                tcp_ack=target_options.get('tcp_ack') == '1',
395
                arp=target_options.get('arp') == '1',
396
                consider_alive=target_options.get('consider_alive') == '1',
397
            )
398
399
        if not alive_test:
400
            return {}
401
402
        try:
403
            alive_test = int(alive_test)
404
        except ValueError:
405
            logger.debug(
406
                'Alive test settings not applied. '
407
                'Invalid alive test value %s',
408
                target_options.get('alive_test'),
409
            )
410
            return {}
411
412
        # No alive test or wrong value, uses the default
413
        # preferences sent by the client.
414
        if alive_test < 1 or alive_test > 31:
415
            return {}
416
417
        return set_alive_test_preference_list(alive_test=alive_test)
418
419
    def prepare_alive_test_option_for_openvas(self):
        """Set alive test option. Overwrite the scan config settings.

        The alive test preferences are added to the vts preferences kept
        in the handler. Nothing is done without scanner settings or
        without an alive test in the target options.
        """
        settings = Openvas.get_settings()
        if not settings:
            return

        # The other methods of this class guard against missing target
        # options as well.
        target_options = self.target_options
        if not target_options:
            return

        if not (
            target_options.get('alive_test')
            or target_options.get('alive_test_methods')
        ):
            return

        alive_test_opt = self.build_alive_test_opt_as_prefs(target_options)

        # The vts preferences are still None if no plugins were prepared
        # before.
        if self._nvts_params is None:
            self._nvts_params = {}
        self._nvts_params.update(alive_test_opt)
430
431
    def prepare_boreas_alive_test(self):
        """Store the alive test and its port list for Boreas in the kb.

        Nothing is stored unless the scanner setting BOREAS_SETTING_NAME
        is set.
        """
        settings = Openvas.get_settings()
        target_options = self.target_options

        if not settings or not settings.get(BOREAS_SETTING_NAME):
            return

        alive_test = None
        alive_test_ports = None
        if target_options:
            alive_test_ports = target_options.get('alive_test_ports')
            # The bit field <alive_test> takes precedence over the
            # individual methods announced by <alive_test_methods>.
            alive_test = target_options.get('alive_test')
            if alive_test is None and target_options.get('alive_test_methods'):
                alive_test = alive_test_methods_to_bit_field(
                    icmp=target_options.get('icmp') == '1',
                    tcp_syn=target_options.get('tcp_syn') == '1',
                    tcp_ack=target_options.get('tcp_ack') == '1',
                    arp=target_options.get('arp') == '1',
                    consider_alive=target_options.get('consider_alive') == '1',
                )

        if alive_test is None:
            # No alive test was provided, use the default.
            alive_test = AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT
        else:
            try:
                alive_test = int(alive_test)
            except ValueError:
                logger.debug(
                    'Alive test preference for Boreas not set. '
                    'Invalid alive test value %s.',
                    alive_test,
                )
                # Use default alive test as fall back
                alive_test = AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT

        # The default is stored as ICMP alive test.
        if alive_test == AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT:
            alive_test = AliveTest.ALIVE_TEST_ICMP

        # A valid bit field has a value between 1 (00001) and 31 (11111).
        # Other values are not stored.
        if 1 <= alive_test <= 31:
            self.kbdb.add_scan_preferences(
                self.scan_id, [f"{BOREAS_ALIVE_TEST}|||{alive_test}"]
            )

        # Add portlist if present. Validity is checked on Boreas side.
        if alive_test_ports is not None:
            self.kbdb.add_scan_preferences(
                self.scan_id,
                [f"{BOREAS_ALIVE_TEST_PORTS}|||{alive_test_ports}"],
            )
499
500
    def prepare_reverse_lookup_opt_for_openvas(self):
        """Store the reverse lookup options of the target in the kb.

        Both options default to '0'. Nothing is stored without target
        options.
        """
        target_options = self.target_options
        if not target_options:
            return

        items = []
        for option in ('reverse_lookup_only', 'reverse_lookup_unify'):
            enabled = int(target_options.get(option, '0'))
            items.append('%s|||%s' % (option, _from_bool_to_str(enabled)))

        self.kbdb.add_scan_preferences(self.scan_id, items)
517
518
    def prepare_target_for_openvas(self):
519
        """Get the target from the scan collection and set the target
520
        in the kb"""
521
522
        target = self.scan_collection.get_host_list(self.scan_id)
523
        target_aux = 'TARGET|||%s' % target
524
        self.kbdb.add_scan_preferences(self.scan_id, [target_aux])
525
526
    def prepare_ports_for_openvas(self) -> str:
        """Read the port list of the scan from the scan collection and
        store it as port_range preference in the kb.

        Return:
            The port list. False if the port list is not valid; nothing
            is stored in that case.
        """
        ports = self.scan_collection.get_ports(self.scan_id)
        if valid_port_list(ports):
            self.kbdb.add_scan_preferences(
                self.scan_id, ['port_range|||%s' % ports]
            )
            return ports

        return False
537
538
    def prepare_host_options_for_openvas(self):
539
        """Get the excluded and finished hosts from the scan collection and
540
        stores the list of hosts that must not be scanned in the kb."""
541
        exclude_hosts = self.scan_collection.get_exclude_hosts(self.scan_id)
542
543
        if exclude_hosts:
544
            pref_val = "exclude_hosts|||" + exclude_hosts
545
            self.kbdb.add_scan_preferences(self.scan_id, [pref_val])
546
547
    def prepare_scan_params_for_openvas(self, ospd_params: Dict[str, Dict]):
        """Read the scan parameters from the scan collection and store
        them in the kb.

        Arguments:
            ospd_params: Dictionary with the OSPD Params. The 'type' of a
                         param decides how its value is formatted.
        """
        # Options which were supplied via the <scanner_params> XML element.
        options = self.scan_collection.get_options(self.scan_id)

        prefs_val = []
        for key, value in options.items():
            if key in ospd_params:
                item_type = ospd_params[key].get('type')
            else:
                item_type = ''
                if key not in BASE_SCANNER_PARAMS:
                    logger.debug(
                        "%s is a scanner only setting and should not be set "
                        "by the client. Setting needs to be included in "
                        "OpenVAS configuration file instead.",
                        key,
                    )

            # Only boolean params are translated to yes and no.
            val = (
                _from_bool_to_str(value)
                if item_type == 'boolean'
                else str(value)
            )
            prefs_val.append(key + "|||" + val)

        if prefs_val:
            self.kbdb.add_scan_preferences(self.scan_id, prefs_val)
577
578
    def check_ssh(self, cred_params, cred_prefs_list) -> None:
579
        port = cred_params.get('port', '22')
580
        cred_type = cred_params.get('type', '')
581
        username = cred_params.get('username', '')
582
        password = cred_params.get('password', '')
583
        priv_username = cred_params.get('priv_username', '')
584
        priv_password = cred_params.get('priv_password', '')
585
        if not port:
586
            port = '22'
587
            warning = (
588
                "Missing port number for ssh credentials."
589
                + " Using default port 22."
590
            )
591
            logger.warning(warning)
592
        elif not port.isnumeric():
593
            self.errors.append(f"Port for SSH '{port}' is not a valid number.")
594
            return
595
        elif int(port) > 65535 or int(port) < 1:
596
            self.errors.append(
597
                f"Port for SSH is out of range (1-65535): {port}"
598
            )
599
            return
600
        # For ssh check the credential type
601
        if cred_type == 'up':
602
            cred_prefs_list.append(
603
                OID_SSH_AUTH
604
                + ':3:'
605
                + 'password:SSH password '
606
                + '(unsafe!):|||{0}'.format(password)
607
            )
608
        elif cred_type == 'usk':
609
            private = cred_params.get('private', '')
610
            cred_prefs_list.append(
611
                OID_SSH_AUTH
612
                + ':2:'
613
                + 'password:SSH key passphrase:|||'
614
                + '{0}'.format(password)
615
            )
616
            cred_prefs_list.append(
617
                OID_SSH_AUTH
618
                + ':4:'
619
                + 'file:SSH private key:|||'
620
                + '{0}'.format(private)
621
            )
622
        elif cred_type:
623
            self.errors.append(
624
                "Unknown Credential Type for SSH: "
625
                + cred_type
626
                + ". Use 'up' for Username + Password"
627
                + " or 'usk' for Username + SSH Key."
628
            )
629
            return
630
        else:
631
            self.errors.append(
632
                "Missing Credential Type for SSH."
633
                + " Use 'up' for Username + Password"
634
                + " or 'usk' for Username + SSH Key."
635
            )
636
            return
637
        cred_prefs_list.append('auth_port_ssh|||' + '{0}'.format(port))
638
        cred_prefs_list.append(
639
            OID_SSH_AUTH
640
            + ':1:'
641
            + 'entry:SSH login '
642
            + 'name:|||{0}'.format(username)
643
        )
644
        cred_prefs_list.append(
645
            OID_SSH_AUTH
646
            + ':7:'
647
            + 'entry:SSH privilege login name:|||{0}'.format(priv_username)
648
        )
649
        cred_prefs_list.append(
650
            OID_SSH_AUTH
651
            + ':8:'
652
            + 'password:SSH privilege password:|||{0}'.format(priv_password)
653
        )
654
655
    def check_snmp(self, cred_params, cred_prefs_list) -> None:
656
        community = cred_params.get('community', '')
657
        auth_algorithm = cred_params.get('auth_algorithm', '')
658
        privacy_password = cred_params.get('privacy_password', '')
659
        privacy_algorithm = cred_params.get('privacy_algorithm', '')
660
        username = cred_params.get('username', '')
661
        password = cred_params.get('password', '')
662
663
        if not privacy_algorithm and privacy_password:
664
            self.errors.append(
665
                "When no privacy algorithm is used, the privacy"
666
                + " password also has to be empty."
667
            )
668
            return
669
        if privacy_algorithm not in ("aes", "des", ''):
670
            self.errors.append(
671
                "Unknown privacy algorithm used: "
672
                + privacy_algorithm
673
                + ". Use 'aes', 'des' or '' (none)."
674
            )
675
            return
676
677
        if not auth_algorithm:
678
            self.errors.append(
679
                "Missing authentication algorithm for SNMP."
680
                + " Use 'md5' or 'sha1'."
681
            )
682
            return
683
        if auth_algorithm not in ["md5", "sha1"]:
684
            self.errors.append(
685
                "Unknown authentication algorithm: "
686
                + auth_algorithm
687
                + ". Use 'md5' or 'sha1'."
688
            )
689
            return
690
691
        cred_prefs_list.append(
692
            OID_SNMP_AUTH
693
            + ':1:'
694
            + 'password:SNMP Community:|||'
695
            + '{0}'.format(community)
696
        )
697
        cred_prefs_list.append(
698
            OID_SNMP_AUTH
699
            + ':2:'
700
            + 'entry:SNMPv3 Username:|||'
701
            + '{0}'.format(username)
702
        )
703
        cred_prefs_list.append(
704
            OID_SNMP_AUTH + ':3:'
705
            'password:SNMPv3 Password:|||' + '{0}'.format(password)
706
        )
707
        cred_prefs_list.append(
708
            OID_SNMP_AUTH
709
            + ':4:'
710
            + 'radio:SNMPv3 Authentication Algorithm:|||'
711
            + '{0}'.format(auth_algorithm)
712
        )
713
        cred_prefs_list.append(
714
            OID_SNMP_AUTH
715
            + ':5:'
716
            + 'password:SNMPv3 Privacy Password:|||'
717
            + '{0}'.format(privacy_password)
718
        )
719
        cred_prefs_list.append(
720
            OID_SNMP_AUTH
721
            + ':6:'
722
            + 'radio:SNMPv3 Privacy Algorithm:|||'
723
            + '{0}'.format(privacy_algorithm)
724
        )
725
726
    def build_credentials_as_prefs(self, credentials: Dict) -> List[str]:
727
        """Parse the credential dictionary.
728
        Arguments:
729
            credentials: Dictionary with the credentials.
730
731
        Return:
732
            A list with the credentials in string format to be
733
            added to the redis KB.
734
        """
735
        cred_prefs_list = []
736
        for credential in credentials.items():
737
            service = credential[0]
738
            cred_params = credentials.get(service)
739
            username = cred_params.get('username', '')
740
            password = cred_params.get('password', '')
741
742
            # Check service ssh
743
            if service == 'ssh':
744
                # For ssh check the Port
745
                self.check_ssh(
746
                    cred_params=cred_params, cred_prefs_list=cred_prefs_list
747
                )
748
            # Check servic smb
749
            elif service == 'smb':
750
                cred_prefs_list.append(
751
                    OID_SMB_AUTH
752
                    + ':1:entry'
753
                    + ':SMB login:|||{0}'.format(username)
754
                )
755
                cred_prefs_list.append(
756
                    OID_SMB_AUTH
757
                    + ':2:'
758
                    + 'password:SMB password:|||'
759
                    + '{0}'.format(password)
760
                )
761
            # Check service esxi
762
            elif service == 'esxi':
763
                cred_prefs_list.append(
764
                    OID_ESXI_AUTH
765
                    + ':1:entry:'
766
                    + 'ESXi login name:|||'
767
                    + '{0}'.format(username)
768
                )
769
                cred_prefs_list.append(
770
                    OID_ESXI_AUTH
771
                    + ':2:'
772
                    + 'password:ESXi login password:|||'
773
                    + '{0}'.format(password)
774
                )
775
            # Check service snmp
776
            elif service == 'snmp':
777
                self.check_snmp(
778
                    cred_params=cred_params, cred_prefs_list=cred_prefs_list
779
                )
780
781
            elif service:
782
                self.errors.append(
783
                    "Unknown service type for credential: " + service
784
                )
785
            else:
786
                self.errors.append("Missing service type for credential.")
787
788
        return cred_prefs_list
789
790
    def prepare_credentials_for_openvas(self) -> bool:
        """Get the credentials from the scan collection and store them
        in the kb.

        Return:
            False if credentials were given, but none of them could be
            turned into a preference. True otherwise, also if there are
            no credentials at all.
        """
        logger.debug("Looking for given Credentials...")
        credentials = self.scan_collection.get_credentials(self.scan_id)
        if not credentials:
            logger.debug("No credentials found.")
            return True

        # With the early return above the preferences are defined on
        # every path which reads them.
        cred_prefs = self.build_credentials_as_prefs(credentials)
        if not cred_prefs:
            return False

        self.kbdb.add_credentials_to_scan_preferences(self.scan_id, cred_prefs)
        logger.debug("Credentials added to the kb.")
        return True
808
809
    def prepare_main_kbindex_for_openvas(self):
810
        """Store main_kbindex as global preference in the
811
        kb, used by OpenVAS"""
812
        ov_maindbid = 'ov_maindbid|||%d' % self.kbdb.index
813
        self.kbdb.add_scan_preferences(self.scan_id, [ov_maindbid])
814