Passed
Pull Request — master (#435)
by Jaspar
01:34
created

_get_vt_param_name()   A

Complexity

Conditions 2

Size

Total Lines 7
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 5
nop 2
dl 0
loc 7
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
# pylint: disable=too-many-lines
21
22
""" Prepare the preferences to be used by OpenVAS. Get the data from the scan
23
collection and store the data in a redis KB in the right format to be used by
24
OpenVAS. """
25
26
import logging
27
import binascii
28
29
from enum import IntEnum
30
from typing import Optional, Dict, List, Tuple
31
from base64 import b64decode
32
33
from ospd.scan import ScanCollection
34
from ospd.ospd import BASE_SCANNER_PARAMS
35
from ospd.network import valid_port_list
36
from ospd_openvas.openvas import Openvas
37
from ospd_openvas.db import KbDB
38
from ospd_openvas.nvticache import NVTICache
39
from ospd_openvas.vthelper import VtHelper
40
41
# Logger shared by everything in this module.
logger = logging.getLogger(__name__)


# OIDs of the VTs whose preferences are written by this module
# (credentials for SSH, SMB, ESXi and SNMP, and the ping host settings).
OID_SSH_AUTH = "1.3.6.1.4.1.25623.1.0.103591"
OID_SMB_AUTH = "1.3.6.1.4.1.25623.1.0.90023"
OID_ESXI_AUTH = "1.3.6.1.4.1.25623.1.0.105058"
OID_SNMP_AUTH = "1.3.6.1.4.1.25623.1.0.105076"
OID_PING_HOST = "1.3.6.1.4.1.25623.1.0.100315"

# Preference keys stored in the kb for the Boreas alive test, and the name
# of the scanner setting which has to be set for them to be written.
BOREAS_ALIVE_TEST = "ALIVE_TEST"
BOREAS_ALIVE_TEST_PORTS = "ALIVE_TEST_PORTS"
BOREAS_SETTING_NAME = "test_alive_hosts_only"
53
54
55
class AliveTest(IntEnum):
    """Alive test methods, one bit per method.

    The members are meant to be combined with bitwise OR into a single
    integer bit field. The value 0 stands for the scan config default.
    """

    ALIVE_TEST_SCAN_CONFIG_DEFAULT = 0
    ALIVE_TEST_TCP_ACK_SERVICE = 1 << 0
    ALIVE_TEST_ICMP = 1 << 1
    ALIVE_TEST_ARP = 1 << 2
    ALIVE_TEST_CONSIDER_ALIVE = 1 << 3
    ALIVE_TEST_TCP_SYN_SERVICE = 1 << 4
64
65
66
def alive_test_methods_to_bit_field(
    icmp: bool, tcp_syn: bool, tcp_ack: bool, arp: bool, consider_alive: bool
) -> int:
    """Build the alive test bit field from individual method switches.

    Every enabled method contributes the bit of its AliveTest member.
    With no method enabled the result is 0.
    """
    bit_field = 0

    if icmp:
        bit_field |= AliveTest.ALIVE_TEST_ICMP
    if tcp_syn:
        bit_field |= AliveTest.ALIVE_TEST_TCP_SYN_SERVICE
    if tcp_ack:
        bit_field |= AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
    if arp:
        bit_field |= AliveTest.ALIVE_TEST_ARP
    if consider_alive:
        bit_field |= AliveTest.ALIVE_TEST_CONSIDER_ALIVE

    return bit_field
85
86
87
def _from_bool_to_str(value: int) -> str:
88
    """The OpenVAS scanner use yes and no as boolean values, whereas ospd
89
    uses 1 and 0."""
90
    return 'yes' if value == 1 else 'no'
91
92
def _get_vt_param_type(vt: Dict, vt_param_id: str) -> Optional[str]:
93
    """Return the type of the vt parameter from the vts dictionary."""
94
95
    vt_params_list = vt.get("vt_params")
96
    if vt_params_list.get(vt_param_id):
97
        return vt_params_list[vt_param_id]["type"]
98
    return None
99
100
def _get_vt_param_name(vt: Dict, vt_param_id: str) -> Optional[str]:
101
    """Return the type of the vt parameter from the vts dictionary."""
102
103
    vt_params_list = vt.get("vt_params")
104
    if vt_params_list.get(vt_param_id):
105
        return vt_params_list[vt_param_id]["name"]
106
    return None
107
108
def set_alive_test_preference_list(alive_test) -> Dict:
    """Translate an alive test bit field into ping host VT preferences.

    Arguments:
        alive_test: Bit field which is tested against the AliveTest
                    members.

    Return:
        A dict mapping the preference keys of the VT identified by
        OID_PING_HOST to either 'yes' or 'no'.
    """

    def as_yes_no(enabled) -> str:
        return "yes" if enabled else "no"

    tcp_ack = alive_test & AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
    tcp_syn = alive_test & AliveTest.ALIVE_TEST_TCP_SYN_SERVICE
    icmp = alive_test & AliveTest.ALIVE_TEST_ICMP
    arp = alive_test & AliveTest.ALIVE_TEST_ARP
    consider_alive = alive_test & AliveTest.ALIVE_TEST_CONSIDER_ALIVE

    return {
        f'{OID_PING_HOST}:1:checkbox:Do a TCP ping': as_yes_no(
            tcp_ack or tcp_syn
        ),
        (
            f'{OID_PING_HOST}:2:checkbox:'
            'TCP ping tries also TCP-SYN ping'
        ): as_yes_no(tcp_syn and tcp_ack),
        (
            f'{OID_PING_HOST}:7:checkbox:'
            'TCP ping tries only TCP-SYN ping'
        ): as_yes_no(tcp_syn and not tcp_ack),
        f'{OID_PING_HOST}:3:checkbox:Do an ICMP ping': as_yes_no(icmp),
        f'{OID_PING_HOST}:4:checkbox:Use ARP': as_yes_no(arp),
        # Considering hosts alive is the opposite of marking the
        # unreachable ones as dead, hence the negation.
        (
            f'{OID_PING_HOST}:5:checkbox:'
            'Mark unrechable Hosts as dead (not scanning)'
        ): as_yes_no(not consider_alive),
    }
171
172
class PreferenceHandler:
173
    def __init__(
174
        self,
175
        scan_id: str,
176
        kbdb: KbDB,
177
        scan_collection: ScanCollection,
178
        nvticache: NVTICache,
179
    ):
180
        self.scan_id = scan_id
181
        self.kbdb = kbdb
182
        self.scan_collection = scan_collection
183
184
        self._target_options = None
185
        self._nvts_params = None
186
187
        self.nvti = nvticache
188
189
        self.errors = []
190
191
    def prepare_scan_id_for_openvas(self):
192
        """Create the openvas scan id and store it in the redis kb.
193
        Return the openvas scan_id.
194
        """
195
        self.kbdb.add_scan_id(self.scan_id)
196
197
    def get_error_messages(self) -> List:
198
        """Returns the Error List and reset it"""
199
        ret = self.errors
200
        self.errors = []
201
        return ret
202
203
    @property
204
    def target_options(self) -> Dict:
205
        """Return target options from Scan collection"""
206
        if self._target_options is not None:
207
            return self._target_options
208
209
        self._target_options = self.scan_collection.get_target_options(
210
            self.scan_id
211
        )
212
        return self._target_options
213
214
    def _get_vts_in_groups(
215
        self,
216
        filters: List[str],
217
    ) -> List[str]:
218
        """Return a list of vts which match with the given filter.
219
220
        Arguments:
221
            filters A list of filters. Each filter has key, operator and
222
                    a value. They are separated by a space.
223
                    Supported keys: family
224
225
        Returns a list of vt oids which match with the given filter.
226
        """
227
        vts_list = list()
228
        families = dict()
229
230
        oids = self.nvti.get_oids()
231
232
        for _, oid in oids:
233
            family = self.nvti.get_nvt_family(oid)
234
            if family not in families:
235
                families[family] = list()
236
237
            families[family].append(oid)
238
239
        for elem in filters:
240
            key, value = elem.split('=')
241
            if key == 'family' and value in families:
242
                vts_list.extend(families[value])
243
244
        return vts_list
245
246
    @staticmethod
247
    def check_param_type(vt_param_value: str, param_type: str) -> bool:
248
        """Check if the value of a vt parameter matches with
249
        the type founded.
250
        """
251
        if (
252
            (param_type
253
            in [
254
                'entry',
255
                'password',
256
                'radio',
257
                'sshlogin',
258
            ]
259
            and isinstance(vt_param_value, str)) or
260
            (
261
            param_type == 'checkbox' and
262
            vt_param_value in ['0', '1'])
263
        ):
264
            return True
265
        if param_type == 'file':
266
            try:
267
                b64decode(vt_param_value.encode())
268
            except (binascii.Error, AttributeError, TypeError):
269
                return False
270
            return True
271
        if param_type == 'integer':
272
            try:
273
                int(vt_param_value)
274
            except ValueError:
275
                return False
276
            return True
277
278
        return False
279
280
    def _process_vts(
        self,
        vts: Dict[str, Dict[str, str]],
    ) -> Tuple[List[str], Dict[str, str]]:
        """Collect the VTs to be run together with their preferences.

        Arguments:
            vts: Maps VT ids to their parameters (parameter id to value).
                 The 'vt_groups' entry is removed from this dictionary
                 and used as filters to select whole groups of VTs.

        Return:
            A tuple with the list of selected VT ids and a dictionary
            mapping "<vt id>:<param id>:<type>:<name>" to the value.
        """
        vt_groups = vts.pop('vt_groups')
        helper = VtHelper(self.nvti)

        selected_vts = self._get_vts_in_groups(vt_groups) if vt_groups else []
        preferences = {}

        for vt_id, params in vts.items():
            vt = helper.get_single_vt(vt_id)
            if not vt:
                logger.warning(
                    'The VT %s was not found and it will not be added to the '
                    'plugin scheduler.',
                    vt_id,
                )
                continue

            selected_vts.append(vt_id)

            for param_id, param_value in params.items():
                param_type = _get_vt_param_type(vt, param_id)
                param_name = _get_vt_param_name(vt, param_id)

                if not (param_type and param_name):
                    logger.debug(
                        'Missing type or name for VT parameter %s of %s. '
                        'This VT parameter will not be set.',
                        param_id,
                        vt_id,
                    )
                    continue

                # The parameter with id '0' is always validated as an
                # integer, whatever type the VT declares for it.
                expected_type = 'integer' if param_id == '0' else param_type

                if not self.check_param_type(param_value, expected_type):
                    logger.debug(
                        'The VT parameter %s for %s could not be set. '
                        'Expected %s type for parameter value %s',
                        param_id,
                        vt_id,
                        expected_type,
                        str(param_value),
                    )
                    continue

                if expected_type == 'checkbox':
                    param_value = _from_bool_to_str(int(param_value))

                pref_key = f'{vt_id}:{param_id}:{param_type}:{param_name}'
                preferences[pref_key] = str(param_value)

        return selected_vts, preferences
344
345
    def prepare_plugins_for_openvas(self) -> bool:
346
        """Get the plugin list and it preferences from the Scan Collection.
347
        The plugin list is immediately stored in the kb.
348
        """
349
        nvts = self.scan_collection.get_vts(self.scan_id)
350
        if nvts:
351
            nvts_list, self._nvts_params = self._process_vts(nvts)
352
            # Add nvts list
353
            separ = ';'
354
            plugin_list = 'plugin_set|||%s' % separ.join(nvts_list)
355
            self.kbdb.add_scan_preferences(self.scan_id, [plugin_list])
356
357
            nvts_list = None
358
            plugin_list = None
359
            nvts = None
360
361
            return True
362
363
        return False
364
365
    def prepare_nvt_preferences(self):
366
        """Prepare the vts preferences. Store the data in the kb."""
367
368
        items_list = []
369
        for key, val in self._nvts_params.items():
370
            items_list.append('%s|||%s' % (key, val))
371
372
        if items_list:
373
            self.kbdb.add_scan_preferences(self.scan_id, items_list)
374
375
    @staticmethod
376
    def build_alive_test_opt_as_prefs(
377
        target_options: Dict[str, str]
378
    ) -> Dict[str, str]:
379
        """Parse the target options dictionary.
380
        Arguments:
381
            target_options: Dictionary with the target options.
382
383
        Return:
384
            A dict with the target options related to alive test method
385
            in string format to be added to the redis KB.
386
        """
387
388
        if not target_options:
389
            return {}
390
391
        alive_test = None
392
        # Alive test specified as bit field.
393
        alive_test = target_options.get('alive_test')
394
        # Alive test specified as individual methods.
395
        alive_test_methods = target_options.get('alive_test_methods')
396
        # alive_test takes precedence over alive_test_methods
397
        if alive_test is None and alive_test_methods:
398
            alive_test = alive_test_methods_to_bit_field(
399
                icmp=target_options.get('icmp') == '1',
400
                tcp_syn=target_options.get('tcp_syn') == '1',
401
                tcp_ack=target_options.get('tcp_ack') == '1',
402
                arp=target_options.get('arp') == '1',
403
                consider_alive=target_options.get('consider_alive') == '1',
404
            )
405
406
        if not alive_test:
407
            return {}
408
409
        try:
410
            alive_test = int(alive_test)
411
        except ValueError:
412
            logger.debug(
413
                'Alive test settings not applied. '
414
                'Invalid alive test value %s',
415
                target_options.get('alive_test'),
416
            )
417
            return {}
418
419
        # No alive test or wrong value, uses the default
420
        # preferences sent by the client.
421
        if alive_test < 1 or alive_test > 31:
422
            return {}
423
424
        return set_alive_test_preference_list(alive_test=alive_test)
425
426
    def prepare_alive_test_option_for_openvas(self):
        """Set alive test option. Overwrite the scan config settings.

        The alive test preferences are merged into the collected VT
        preferences. Nothing happens without scanner settings, without
        target options or if the target options request no alive test.
        """
        if not Openvas.get_settings():
            return

        target_options = self.target_options
        if not target_options:
            return

        if not (
            target_options.get('alive_test')
            or target_options.get('alive_test_methods')
        ):
            return

        alive_test_opt = self.build_alive_test_opt_as_prefs(target_options)
        if not alive_test_opt:
            return

        # The VT preferences are None until plugins have been processed.
        # Start with an empty dict instead of failing on None.update().
        if self._nvts_params is None:
            self._nvts_params = {}
        self._nvts_params.update(alive_test_opt)
437
438
    def prepare_boreas_alive_test(self):
        """Store the alive test settings for Boreas in the kb.

        This is only done if the scanner setting BOREAS_SETTING_NAME is
        set. Without a usable alive test in the target options the ICMP
        alive test is stored. The alive test port list is stored as well
        if the target options contain one.
        """
        settings = Openvas.get_settings()
        target_options = self.target_options

        # Nothing to do unless Boreas is enabled in the scanner settings.
        if not settings or not settings.get(BOREAS_SETTING_NAME):
            return

        alive_test = None
        alive_test_ports = None
        if target_options:
            alive_test_ports = target_options.get('alive_test_ports')
            # <alive_test> (bit field) takes precedence over
            # <alive_test_methods> (individual methods).
            alive_test = target_options.get('alive_test')
            if alive_test is None and target_options.get('alive_test_methods'):
                alive_test = alive_test_methods_to_bit_field(
                    icmp=target_options.get('icmp') == '1',
                    tcp_syn=target_options.get('tcp_syn') == '1',
                    tcp_ack=target_options.get('tcp_ack') == '1',
                    arp=target_options.get('arp') == '1',
                    consider_alive=target_options.get('consider_alive') == '1',
                )

        if alive_test is None:
            # No alive test was provided, use the default.
            alive_test = AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT
        else:
            try:
                alive_test = int(alive_test)
            except ValueError:
                logger.debug(
                    'Alive test preference for Boreas not set. '
                    'Invalid alive test value %s.',
                    alive_test,
                )
                # Use default alive test as fall back
                alive_test = AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT

        if alive_test == AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT:
            # The default is replaced by the ICMP alive test.
            alive_test = AliveTest.ALIVE_TEST_ICMP
            self.kbdb.add_scan_preferences(
                self.scan_id, [f'{BOREAS_ALIVE_TEST}|||{alive_test}']
            )
        elif 1 <= alive_test <= 31:
            # A valid bit mask lies between 1 (00001) and 31 (11111).
            # Any other value is not stored at all.
            self.kbdb.add_scan_preferences(
                self.scan_id, [f'{BOREAS_ALIVE_TEST}|||{alive_test}']
            )

        # Add portlist if present. Validity is checked on Boreas side.
        if alive_test_ports is not None:
            self.kbdb.add_scan_preferences(
                self.scan_id,
                [f'{BOREAS_ALIVE_TEST_PORTS}|||{alive_test_ports}'],
            )
506
507
    def prepare_reverse_lookup_opt_for_openvas(self):
508
        """Set reverse lookup options in the kb"""
509
        if self.target_options:
510
            items = []
511
            _rev_lookup_only = int(
512
                self.target_options.get('reverse_lookup_only', '0')
513
            )
514
            rev_lookup_only = _from_bool_to_str(_rev_lookup_only)
515
            items.append('reverse_lookup_only|||%s' % (rev_lookup_only))
516
517
            _rev_lookup_unify = int(
518
                self.target_options.get('reverse_lookup_unify', '0')
519
            )
520
            rev_lookup_unify = _from_bool_to_str(_rev_lookup_unify)
521
            items.append('reverse_lookup_unify|||%s' % rev_lookup_unify)
522
523
            self.kbdb.add_scan_preferences(self.scan_id, items)
524
525
    def prepare_target_for_openvas(self):
526
        """Get the target from the scan collection and set the target
527
        in the kb"""
528
529
        target = self.scan_collection.get_host_list(self.scan_id)
530
        target_aux = 'TARGET|||%s' % target
531
        self.kbdb.add_scan_preferences(self.scan_id, [target_aux])
532
533
    def prepare_ports_for_openvas(self) -> str:
        """Read the port list of the scan from the scan collection and
        store it in the kb.

        Return:
            The port list. If valid_port_list() rejects the port list,
            nothing is stored and False is returned instead.
        """
        ports = self.scan_collection.get_ports(self.scan_id)

        if valid_port_list(ports):
            self.kbdb.add_scan_preferences(
                self.scan_id, ['port_range|||%s' % ports]
            )
            return ports

        return False
544
545
    def prepare_host_options_for_openvas(self):
546
        """Get the excluded and finished hosts from the scan collection and
547
        stores the list of hosts that must not be scanned in the kb."""
548
        exclude_hosts = self.scan_collection.get_exclude_hosts(self.scan_id)
549
550
        if exclude_hosts:
551
            pref_val = "exclude_hosts|||" + exclude_hosts
552
            self.kbdb.add_scan_preferences(self.scan_id, [pref_val])
553
554
    def prepare_scan_params_for_openvas(self, ospd_params: Dict[str, Dict]):
555
        """Get the scan parameters from the scan collection and store them
556
        in the kb.
557
        Arguments:
558
            ospd_params: Dictionary with the OSPD Params.
559
        """
560
        # Options which were supplied via the <scanner_params> XML element.
561
        options = self.scan_collection.get_options(self.scan_id)
562
        prefs_val = []
563
564
        for key, value in options.items():
565
            item_type = ''
566
            if key in ospd_params:
567
                item_type = ospd_params[key].get('type')
568
            else:
569
                if key not in BASE_SCANNER_PARAMS:
570
                    logger.debug(
571
                        "%s is a scanner only setting and should not be set "
572
                        "by the client. Setting needs to be included in "
573
                        "OpenVAS configuration file instead.",
574
                        key,
575
                    )
576
            if item_type == 'boolean':
577
                val = _from_bool_to_str(value)
578
            else:
579
                val = str(value)
580
            prefs_val.append(key + "|||" + val)
581
582
        if prefs_val:
583
            self.kbdb.add_scan_preferences(self.scan_id, prefs_val)
584
585
    def check_ssh(self, cred_params, cred_prefs_list) -> None:
586
        port = cred_params.get('port', '22')
587
        cred_type = cred_params.get('type', '')
588
        username = cred_params.get('username', '')
589
        password = cred_params.get('password', '')
590
        priv_username = cred_params.get('priv_username', '')
591
        priv_password = cred_params.get('priv_password', '')
592
        if not port:
593
            port = '22'
594
            warning = (
595
                "Missing port number for ssh credentials."
596
                + " Using default port 22."
597
            )
598
            logger.warning(warning)
599
        elif not port.isnumeric():
600
            self.errors.append(
601
                f"Port for SSH '{port}' is not a valid number."
602
            )
603
            return
604
        elif int(port) > 65535 or int(port) < 1:
605
            self.errors.append(
606
                f"Port for SSH is out of range (1-65535): {port}"
607
            )
608
            return
609
        # For ssh check the credential type
610
        if cred_type == 'up':
611
            cred_prefs_list.append(
612
                OID_SSH_AUTH
613
                + ':3:'
614
                + 'password:SSH password '
615
                + '(unsafe!):|||{0}'.format(password)
616
            )
617
        elif cred_type == 'usk':
618
            private = cred_params.get('private', '')
619
            cred_prefs_list.append(
620
                OID_SSH_AUTH
621
                + ':2:'
622
                + 'password:SSH key passphrase:|||'
623
                + '{0}'.format(password)
624
            )
625
            cred_prefs_list.append(
626
                OID_SSH_AUTH
627
                + ':4:'
628
                + 'file:SSH private key:|||'
629
                + '{0}'.format(private)
630
            )
631
        elif cred_type:
632
            self.errors.append(
633
                "Unknown Credential Type for SSH: "
634
                + cred_type
635
                + ". Use 'up' for Username + Password"
636
                + " or 'usk' for Username + SSH Key."
637
            )
638
            return
639
        else:
640
            self.errors.append(
641
                "Missing Credential Type for SSH."
642
                + " Use 'up' for Username + Password"
643
                + " or 'usk' for Username + SSH Key."
644
            )
645
            return
646
        cred_prefs_list.append('auth_port_ssh|||' + '{0}'.format(port))
647
        cred_prefs_list.append(
648
            OID_SSH_AUTH
649
            + ':1:'
650
            + 'entry:SSH login '
651
            + 'name:|||{0}'.format(username)
652
        )
653
        cred_prefs_list.append(
654
            OID_SSH_AUTH
655
            + ':7:'
656
            + 'entry:SSH privilege login name:|||{0}'.format(
657
                priv_username
658
            )
659
        )
660
        cred_prefs_list.append(
661
            OID_SSH_AUTH
662
            + ':8:'
663
            + 'password:SSH privilege password:|||{0}'.format(
664
                priv_password
665
            )
666
        )
667
668
    def check_snmp(self, cred_params, cred_prefs_list) -> None:
669
        community = cred_params.get('community', '')
670
        auth_algorithm = cred_params.get('auth_algorithm', '')
671
        privacy_password = cred_params.get('privacy_password', '')
672
        privacy_algorithm = cred_params.get('privacy_algorithm', '')
673
        username = cred_params.get('username', '')
674
        password = cred_params.get('password', '')
675
676
        if not privacy_algorithm and privacy_password:
677
            self.errors.append(
678
                "When no privacy algorithm is used, the privacy"
679
                + " password also has to be empty."
680
            )
681
            return
682
        if (
683
            privacy_algorithm not in ("aes", "des", '')
684
        ):
685
            self.errors.append(
686
                "Unknown privacy algorithm used: "
687
                + privacy_algorithm
688
                + ". Use 'aes', 'des' or '' (none)."
689
            )
690
            return
691
692
        if not auth_algorithm:
693
            self.errors.append(
694
                "Missing authentication algorithm for SNMP."
695
                + " Use 'md5' or 'sha1'."
696
            )
697
            return
698
        if (
699
            auth_algorithm not in ["md5", "sha1"]
700
        ):
701
            self.errors.append(
702
                "Unknown authentication algorithm: "
703
                + auth_algorithm
704
                + ". Use 'md5' or 'sha1'."
705
            )
706
            return
707
708
        cred_prefs_list.append(
709
            OID_SNMP_AUTH
710
            + ':1:'
711
            + 'password:SNMP Community:|||'
712
            + '{0}'.format(community)
713
        )
714
        cred_prefs_list.append(
715
            OID_SNMP_AUTH
716
            + ':2:'
717
            + 'entry:SNMPv3 Username:|||'
718
            + '{0}'.format(username)
719
        )
720
        cred_prefs_list.append(
721
            OID_SNMP_AUTH + ':3:'
722
            'password:SNMPv3 Password:|||' + '{0}'.format(password)
723
        )
724
        cred_prefs_list.append(
725
            OID_SNMP_AUTH
726
            + ':4:'
727
            + 'radio:SNMPv3 Authentication Algorithm:|||'
728
            + '{0}'.format(auth_algorithm)
729
        )
730
        cred_prefs_list.append(
731
            OID_SNMP_AUTH
732
            + ':5:'
733
            + 'password:SNMPv3 Privacy Password:|||'
734
            + '{0}'.format(privacy_password)
735
        )
736
        cred_prefs_list.append(
737
            OID_SNMP_AUTH
738
            + ':6:'
739
            + 'radio:SNMPv3 Privacy Algorithm:|||'
740
            + '{0}'.format(privacy_algorithm)
741
        )
742
743
744
    def build_credentials_as_prefs(self, credentials: Dict) -> List[str]:
745
        """Parse the credential dictionary.
746
        Arguments:
747
            credentials: Dictionary with the credentials.
748
749
        Return:
750
            A list with the credentials in string format to be
751
            added to the redis KB.
752
        """
753
        cred_prefs_list = []
754
        for credential in credentials.items():
755
            service = credential[0]
756
            cred_params = credentials.get(service)
757
            username = cred_params.get('username', '')
758
            password = cred_params.get('password', '')
759
760
            # Check service ssh
761
            if service == 'ssh':
762
                # For ssh check the Port
763
                self.check_ssh(
764
                    cred_params=cred_params,
765
                    cred_prefs_list=cred_prefs_list)
766
            # Check servic smb
767
            elif service == 'smb':
768
                cred_prefs_list.append(
769
                    OID_SMB_AUTH
770
                    + ':1:entry'
771
                    + ':SMB login:|||{0}'.format(username)
772
                )
773
                cred_prefs_list.append(
774
                    OID_SMB_AUTH
775
                    + ':2:'
776
                    + 'password:SMB password:|||'
777
                    + '{0}'.format(password)
778
                )
779
            # Check service esxi
780
            elif service == 'esxi':
781
                cred_prefs_list.append(
782
                    OID_ESXI_AUTH
783
                    + ':1:entry:'
784
                    + 'ESXi login name:|||'
785
                    + '{0}'.format(username)
786
                )
787
                cred_prefs_list.append(
788
                    OID_ESXI_AUTH
789
                    + ':2:'
790
                    + 'password:ESXi login password:|||'
791
                    + '{0}'.format(password)
792
                )
793
            # Check service snmp
794
            elif service == 'snmp':
795
                self.check_snmp(
796
                    cred_params=cred_params,
797
                    cred_prefs_list=cred_prefs_list)
798
799
            elif service:
800
                self.errors.append(
801
                    "Unknown service type for credential: " + service
802
                )
803
            else:
804
                self.errors.append("Missing service type for credential.")
805
806
        return cred_prefs_list
807
808
    def prepare_credentials_for_openvas(self) -> bool:
        """Get the credentials from the scan collection and store them
        in the kb.

        Return:
            False if credentials were given but none of them could be
            turned into preferences, True otherwise (also if there are
            no credentials at all).
        """
        logger.debug("Looking for given Credentials...")
        credentials = self.scan_collection.get_credentials(self.scan_id)

        if not credentials:
            logger.debug("No credentials found.")
            return True

        # From here on cred_prefs is bound on every path.
        cred_prefs = self.build_credentials_as_prefs(credentials)
        if not cred_prefs:
            return False

        self.kbdb.add_credentials_to_scan_preferences(self.scan_id, cred_prefs)
        logger.debug("Credentials added to the kb.")

        return True
826
827
    def prepare_main_kbindex_for_openvas(self):
828
        """Store main_kbindex as global preference in the
829
        kb, used by OpenVAS"""
830
        ov_maindbid = 'ov_maindbid|||%d' % self.kbdb.index
831
        self.kbdb.add_scan_preferences(self.scan_id, [ov_maindbid])
832