PreferenceHandler.build_alive_test_opt_as_prefs()   F
last analyzed

Complexity

Conditions 18

Size

Total Lines 109
Code Lines 73

Duplication

Lines 13
Ratio 11.93 %

Importance

Changes 0
Metric Value
cc 18
eloc 73
nop 1
dl 13
loc 109
rs 1.2
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like ospd_openvas.preferencehandler.PreferenceHandler.build_alive_test_opt_as_prefs() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
# pylint: disable=too-many-lines
21
22
""" Prepare the preferences to be used by OpenVAS. Get the data from the scan
23
collection and store the data in a redis KB in the right format to be used by
24
OpenVAS. """
25
26
import logging
27
import binascii
28
29
from enum import IntEnum
30
from typing import Optional, Dict, List, Tuple
31
from base64 import b64decode
32
33
from ospd.scan import ScanCollection
34
from ospd.ospd import BASE_SCANNER_PARAMS
35
from ospd.network import valid_port_list
36
from ospd_openvas.openvas import Openvas
37
from ospd_openvas.db import KbDB
38
from ospd_openvas.nvticache import NVTICache
39
from ospd_openvas.vthelper import VtHelper
40
41
logger = logging.getLogger(__name__)
42
43
44
OID_SSH_AUTH = "1.3.6.1.4.1.25623.1.0.103591"
45
OID_SMB_AUTH = "1.3.6.1.4.1.25623.1.0.90023"
46
OID_ESXI_AUTH = "1.3.6.1.4.1.25623.1.0.105058"
47
OID_SNMP_AUTH = "1.3.6.1.4.1.25623.1.0.105076"
48
OID_PING_HOST = "1.3.6.1.4.1.25623.1.0.100315"
49
50
BOREAS_ALIVE_TEST = "ALIVE_TEST"
51
BOREAS_ALIVE_TEST_PORTS = "ALIVE_TEST_PORTS"
52
BOREAS_SETTING_NAME = "test_alive_hosts_only"
53
54
55
class AliveTest(IntEnum):
56
    """Alive Tests."""
57
58
    ALIVE_TEST_SCAN_CONFIG_DEFAULT = 0
59
    ALIVE_TEST_TCP_ACK_SERVICE = 1
60
    ALIVE_TEST_ICMP = 2
61
    ALIVE_TEST_ARP = 4
62
    ALIVE_TEST_CONSIDER_ALIVE = 8
63
    ALIVE_TEST_TCP_SYN_SERVICE = 16
64
65
66
def alive_test_methods_to_bit_field(
67
    icmp: bool, tcp_syn: bool, tcp_ack: bool, arp: bool, consider_alive: bool
68
) -> int:
69
    """Internally a bit field is used as alive test. This function creates
70
    such a bit field out of the supplied alive test methods.
71
    """
72
73
    icmp_enum = AliveTest.ALIVE_TEST_ICMP if icmp else 0
74
    tcp_syn_enum = AliveTest.ALIVE_TEST_TCP_SYN_SERVICE if tcp_syn else 0
75
    tcp_ack_enum = AliveTest.ALIVE_TEST_TCP_ACK_SERVICE if tcp_ack else 0
76
    arp_enum = AliveTest.ALIVE_TEST_ARP if arp else 0
77
    consider_alive_enum = (
78
        AliveTest.ALIVE_TEST_CONSIDER_ALIVE if consider_alive else 0
79
    )
80
81
    bit_field = (
82
        icmp_enum | tcp_syn_enum | tcp_ack_enum | arp_enum | consider_alive_enum
83
    )
84
    return bit_field
85
86
87
def _from_bool_to_str(value: int) -> str:
88
    """The OpenVAS scanner use yes and no as boolean values, whereas ospd
89
    uses 1 and 0."""
90
    return 'yes' if value == 1 else 'no'
91
92
93
class PreferenceHandler:
94
    def __init__(
95
        self,
96
        scan_id: str,
97
        kbdb: KbDB,
98
        scan_collection: ScanCollection,
99
        nvticache: NVTICache,
100
    ):
101
        self.scan_id = scan_id
102
        self.kbdb = kbdb
103
        self.scan_collection = scan_collection
104
105
        self._target_options = None
106
        self._nvts_params = None
107
108
        self.nvti = nvticache
109
110
        self.errors = []
111
112
    def prepare_scan_id_for_openvas(self):
113
        """Create the openvas scan id and store it in the redis kb.
114
        Return the openvas scan_id.
115
        """
116
        self.kbdb.add_scan_id(self.scan_id)
117
118
    def get_error_messages(self) -> List:
119
        """Returns the Error List and reset it"""
120
        ret = self.errors
121
        self.errors = []
122
        return ret
123
124
    @property
125
    def target_options(self) -> Dict:
126
        """Return target options from Scan collection"""
127
        if self._target_options is not None:
128
            return self._target_options
129
130
        self._target_options = self.scan_collection.get_target_options(
131
            self.scan_id
132
        )
133
        return self._target_options
134
135
    def _get_vts_in_groups(
136
        self,
137
        filters: List[str],
138
    ) -> List[str]:
139
        """Return a list of vts which match with the given filter.
140
141
        Arguments:
142
            filters A list of filters. Each filter has key, operator and
143
                    a value. They are separated by a space.
144
                    Supported keys: family
145
146
        Returns a list of vt oids which match with the given filter.
147
        """
148
        vts_list = list()
149
        families = dict()
150
151
        oids = self.nvti.get_oids()
152
153
        for _, oid in oids:
154
            family = self.nvti.get_nvt_family(oid)
155
            if family not in families:
156
                families[family] = list()
157
158
            families[family].append(oid)
159
160
        for elem in filters:
161
            key, value = elem.split('=')
162
            if key == 'family' and value in families:
163
                vts_list.extend(families[value])
164
165
        return vts_list
166
167
    def _get_vt_param_type(self, vt: Dict, vt_param_id: str) -> Optional[str]:
168
        """Return the type of the vt parameter from the vts dictionary."""
169
170
        vt_params_list = vt.get("vt_params")
171
        if vt_params_list.get(vt_param_id):
172
            return vt_params_list[vt_param_id]["type"]
173
        return None
174
175
    def _get_vt_param_name(self, vt: Dict, vt_param_id: str) -> Optional[str]:
176
        """Return the type of the vt parameter from the vts dictionary."""
177
178
        vt_params_list = vt.get("vt_params")
179
        if vt_params_list.get(vt_param_id):
180
            return vt_params_list[vt_param_id]["name"]
181
        return None
182
183
    @staticmethod
184
    def check_param_type(vt_param_value: str, param_type: str) -> Optional[int]:
185
        """Check if the value of a vt parameter matches with
186
        the type founded.
187
        """
188
        if (
189
            param_type
190
            in [
191
                'entry',
192
                'password',
193
                'radio',
194
                'sshlogin',
195
            ]
196
            and isinstance(vt_param_value, str)
197
        ):
198
            return None
199
        elif param_type == 'checkbox' and (
200
            vt_param_value == '0' or vt_param_value == '1'
201
        ):
202
            return None
203
        elif param_type == 'file':
204
            try:
205
                b64decode(vt_param_value.encode())
206
            except (binascii.Error, AttributeError, TypeError):
207
                return 1
208
            return None
209
        elif param_type == 'integer':
210
            try:
211
                int(vt_param_value)
212
            except ValueError:
213
                return 1
214
            return None
215
216
        return 1
217
218
    def _process_vts(
219
        self,
220
        vts: Dict[str, Dict[str, str]],
221
    ) -> Tuple[List[str], Dict[str, str]]:
222
        """Add single VTs and their parameters."""
223
        vts_list = []
224
        vts_params = {}
225
        vtgroups = vts.pop('vt_groups')
226
227
        vthelper = VtHelper(self.nvti)
228
229
        if vtgroups:
230
            vts_list = self._get_vts_in_groups(vtgroups)
231
232
        for vtid, vt_params in vts.items():
233
            vt = vthelper.get_single_vt(vtid)
234
            if not vt:
235
                logger.warning(
236
                    'The VT %s was not found and it will not be added to the '
237
                    'plugin scheduler.',
238
                    vtid,
239
                )
240
                continue
241
242
            vts_list.append(vtid)
243
            for vt_param_id, vt_param_value in vt_params.items():
244
                param_type = self._get_vt_param_type(vt, vt_param_id)
245
                param_name = self._get_vt_param_name(vt, vt_param_id)
246
247
                if not param_type or not param_name:
248
                    logger.debug(
249
                        'Missing type or name for VT parameter %s of %s. '
250
                        'This VT parameter will not be set.',
251
                        vt_param_id,
252
                        vtid,
253
                    )
254
                    continue
255
256
                if vt_param_id == '0':
257
                    type_aux = 'integer'
258
                else:
259
                    type_aux = param_type
260
261
                if self.check_param_type(vt_param_value, type_aux):
262
                    logger.debug(
263
                        'The VT parameter %s for %s could not be set. '
264
                        'Expected %s type for parameter value %s',
265
                        vt_param_id,
266
                        vtid,
267
                        type_aux,
268
                        str(vt_param_value),
269
                    )
270
                    continue
271
272
                if type_aux == 'checkbox':
273
                    vt_param_value = _from_bool_to_str(int(vt_param_value))
274
275
                vts_params[
276
                    "{0}:{1}:{2}:{3}".format(
277
                        vtid, vt_param_id, param_type, param_name
278
                    )
279
                ] = str(vt_param_value)
280
281
        return vts_list, vts_params
282
283
    def prepare_plugins_for_openvas(self) -> bool:
284
        """Get the plugin list and it preferences from the Scan Collection.
285
        The plugin list is immediately stored in the kb.
286
        """
287
        nvts = self.scan_collection.get_vts(self.scan_id)
288
        if nvts:
289
            nvts_list, self._nvts_params = self._process_vts(nvts)
290
            # Add nvts list
291
            separ = ';'
292
            plugin_list = 'plugin_set|||%s' % separ.join(nvts_list)
293
            self.kbdb.add_scan_preferences(self.scan_id, [plugin_list])
294
295
            nvts_list = None
296
            plugin_list = None
297
            nvts = None
298
299
            return True
300
301
        return False
302
303
    def prepare_nvt_preferences(self):
304
        """Prepare the vts preferences. Store the data in the kb."""
305
306
        items_list = []
307
        for key, val in self._nvts_params.items():
308
            items_list.append('%s|||%s' % (key, val))
309
310
        if items_list:
311
            self.kbdb.add_scan_preferences(self.scan_id, items_list)
312
313
    @staticmethod
314
    def build_alive_test_opt_as_prefs(
315
        target_options: Dict[str, str]
316
    ) -> Dict[str, str]:
317
        """Parse the target options dictionary.
318
        Arguments:
319
            target_options: Dictionary with the target options.
320
321
        Return:
322
            A dict with the target options related to alive test method
323
            in string format to be added to the redis KB.
324
        """
325
        target_opt_prefs_list = {}
326
        alive_test = None
327
328 View Code Duplication
        if target_options:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
329
            # Alive test specified as bit field.
330
            alive_test = target_options.get('alive_test')
331
            # Alive test specified as individual methods.
332
            alive_test_methods = target_options.get('alive_test_methods')
333
            # alive_test takes precedence over alive_test_methods
334
            if alive_test is None and alive_test_methods:
335
                alive_test = alive_test_methods_to_bit_field(
336
                    icmp=target_options.get('icmp') == '1',
337
                    tcp_syn=target_options.get('tcp_syn') == '1',
338
                    tcp_ack=target_options.get('tcp_ack') == '1',
339
                    arp=target_options.get('arp') == '1',
340
                    consider_alive=target_options.get('consider_alive') == '1',
341
                )
342
343
        if target_options and alive_test:
344
            try:
345
                alive_test = int(alive_test)
346
            except ValueError:
347
                logger.debug(
348
                    'Alive test settings not applied. '
349
                    'Invalid alive test value %s',
350
                    target_options.get('alive_test'),
351
                )
352
                return target_opt_prefs_list
353
354
            # No alive test or wrong value, uses the default
355
            # preferences sent by the client.
356
            if alive_test < 1 or alive_test > 31:
357
                return target_opt_prefs_list
358
359
            if (
360
                alive_test & AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
361
                or alive_test & AliveTest.ALIVE_TEST_TCP_SYN_SERVICE
362
            ):
363
                value = "yes"
364
            else:
365
                value = "no"
366
            target_opt_prefs_list[
367
                OID_PING_HOST + ':1:checkbox:' + 'Do a TCP ping'
368
            ] = value
369
370
            if (
371
                alive_test & AliveTest.ALIVE_TEST_TCP_SYN_SERVICE
372
                and alive_test & AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
373
            ):
374
                value = "yes"
375
            else:
376
                value = "no"
377
            target_opt_prefs_list[
378
                OID_PING_HOST
379
                + ':2:checkbox:'
380
                + 'TCP ping tries also TCP-SYN ping'
381
            ] = value
382
383
            if (alive_test & AliveTest.ALIVE_TEST_TCP_SYN_SERVICE) and not (
384
                alive_test & AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
385
            ):
386
                value = "yes"
387
            else:
388
                value = "no"
389
            target_opt_prefs_list[
390
                OID_PING_HOST
391
                + ':7:checkbox:'
392
                + 'TCP ping tries only TCP-SYN ping'
393
            ] = value
394
395
            if alive_test & AliveTest.ALIVE_TEST_ICMP:
396
                value = "yes"
397
            else:
398
                value = "no"
399
            target_opt_prefs_list[
400
                OID_PING_HOST + ':3:checkbox:' + 'Do an ICMP ping'
401
            ] = value
402
403
            if alive_test & AliveTest.ALIVE_TEST_ARP:
404
                value = "yes"
405
            else:
406
                value = "no"
407
            target_opt_prefs_list[
408
                OID_PING_HOST + ':4:checkbox:' + 'Use ARP'
409
            ] = value
410
411
            if alive_test & AliveTest.ALIVE_TEST_CONSIDER_ALIVE:
412
                value = "no"
413
            else:
414
                value = "yes"
415
            target_opt_prefs_list[
416
                OID_PING_HOST
417
                + ':5:checkbox:'
418
                + 'Mark unrechable Hosts as dead (not scanning)'
419
            ] = value
420
421
        return target_opt_prefs_list
422
423
    def prepare_alive_test_option_for_openvas(self):
424
        """Set alive test option. Overwrite the scan config settings."""
425
        settings = Openvas.get_settings()
426
        if settings and (
427
            self.target_options.get('alive_test')
428
            or self.target_options.get('alive_test_methods')
429
        ):
430
            alive_test_opt = self.build_alive_test_opt_as_prefs(
431
                self.target_options
432
            )
433
            self._nvts_params.update(alive_test_opt)
434
435
    def prepare_boreas_alive_test(self):
436
        """Set alive_test for Boreas if boreas scanner config
437
        (BOREAS_SETTING_NAME) was set"""
438
        settings = Openvas.get_settings()
439
        alive_test = None
440
        alive_test_ports = None
441
        target_options = self.target_options
442
443
        if settings:
444
            boreas = settings.get(BOREAS_SETTING_NAME)
445
            if not boreas:
446
                return
447
        else:
448
            return
449
450 View Code Duplication
        if target_options:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
451
            alive_test_ports = target_options.get('alive_test_ports')
452
            # Alive test was specified as bit field.
453
            alive_test = target_options.get('alive_test')
454
            # Alive test was specified as individual methods.
455
            alive_test_methods = target_options.get('alive_test_methods')
456
            # <alive_test> takes precedence over <alive_test_methods>
457
            if alive_test is None and alive_test_methods:
458
                alive_test = alive_test_methods_to_bit_field(
459
                    icmp=target_options.get('icmp') == '1',
460
                    tcp_syn=target_options.get('tcp_syn') == '1',
461
                    tcp_ack=target_options.get('tcp_ack') == '1',
462
                    arp=target_options.get('arp') == '1',
463
                    consider_alive=target_options.get('consider_alive') == '1',
464
                )
465
466
        if alive_test is not None:
467
            try:
468
                alive_test = int(alive_test)
469
            except ValueError:
470
                logger.debug(
471
                    'Alive test preference for Boreas not set. '
472
                    'Invalid alive test value %s.',
473
                    alive_test,
474
                )
475
                # Use default alive test as fall back
476
                alive_test = AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT
477
        # Use default alive test if no valid alive_test was provided
478
        else:
479
            alive_test = AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT
480
481
        # If a valid alive_test was set then the bit mask
482
        # has value between 31 (11111) and 1 (10000)
483
        if 1 <= alive_test <= 31:
484
            pref = "{pref_key}|||{pref_value}".format(
485
                pref_key=BOREAS_ALIVE_TEST, pref_value=alive_test
486
            )
487
            self.kbdb.add_scan_preferences(self.scan_id, [pref])
488
489
        if alive_test == AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT:
490
            alive_test = AliveTest.ALIVE_TEST_ICMP
491
            pref = "{pref_key}|||{pref_value}".format(
492
                pref_key=BOREAS_ALIVE_TEST, pref_value=alive_test
493
            )
494
            self.kbdb.add_scan_preferences(self.scan_id, [pref])
495
496
        # Add portlist if present. Validity is checked on Boreas side.
497
        if alive_test_ports is not None:
498
            pref = "{pref_key}|||{pref_value}".format(
499
                pref_key=BOREAS_ALIVE_TEST_PORTS,
500
                pref_value=alive_test_ports,
501
            )
502
            self.kbdb.add_scan_preferences(self.scan_id, [pref])
503
504
    def prepare_reverse_lookup_opt_for_openvas(self):
505
        """Set reverse lookup options in the kb"""
506
        if self.target_options:
507
            items = []
508
            _rev_lookup_only = int(
509
                self.target_options.get('reverse_lookup_only', '0')
510
            )
511
            rev_lookup_only = _from_bool_to_str(_rev_lookup_only)
512
            items.append('reverse_lookup_only|||%s' % (rev_lookup_only))
513
514
            _rev_lookup_unify = int(
515
                self.target_options.get('reverse_lookup_unify', '0')
516
            )
517
            rev_lookup_unify = _from_bool_to_str(_rev_lookup_unify)
518
            items.append('reverse_lookup_unify|||%s' % rev_lookup_unify)
519
520
            self.kbdb.add_scan_preferences(self.scan_id, items)
521
522
    def prepare_target_for_openvas(self):
523
        """Get the target from the scan collection and set the target
524
        in the kb"""
525
526
        target = self.scan_collection.get_host_list(self.scan_id)
527
        target_aux = 'TARGET|||%s' % target
528
        self.kbdb.add_scan_preferences(self.scan_id, [target_aux])
529
530
    def prepare_ports_for_openvas(self) -> str:
531
        """Get the port list from the scan collection and store the list
532
        in the kb."""
533
        ports = self.scan_collection.get_ports(self.scan_id)
534
        if not valid_port_list(ports):
535
            return False
536
537
        port_range = 'port_range|||%s' % ports
538
        self.kbdb.add_scan_preferences(self.scan_id, [port_range])
539
540
        return ports
541
542
    def prepare_host_options_for_openvas(self):
543
        """Get the excluded and finished hosts from the scan collection and
544
        stores the list of hosts that must not be scanned in the kb."""
545
        exclude_hosts = self.scan_collection.get_exclude_hosts(self.scan_id)
546
547
        if exclude_hosts:
548
            pref_val = "exclude_hosts|||" + exclude_hosts
549
            self.kbdb.add_scan_preferences(self.scan_id, [pref_val])
550
551
    def prepare_scan_params_for_openvas(self, ospd_params: Dict[str, Dict]):
552
        """Get the scan parameters from the scan collection and store them
553
        in the kb.
554
        Arguments:
555
            ospd_params: Dictionary with the OSPD Params.
556
        """
557
        # Options which were supplied via the <scanner_params> XML element.
558
        options = self.scan_collection.get_options(self.scan_id)
559
        prefs_val = []
560
561
        for key, value in options.items():
562
            item_type = ''
563
            if key in ospd_params:
564
                item_type = ospd_params[key].get('type')
565
            else:
566
                if key not in BASE_SCANNER_PARAMS:
567
                    logger.debug(
568
                        "%s is a scanner only setting and should not be set "
569
                        "by the client. Setting needs to be included in "
570
                        "OpenVAS configuration file instead.",
571
                        key,
572
                    )
573
            if item_type == 'boolean':
574
                val = _from_bool_to_str(value)
575
            else:
576
                val = str(value)
577
            prefs_val.append(key + "|||" + val)
578
579
        if prefs_val:
580
            self.kbdb.add_scan_preferences(self.scan_id, prefs_val)
581
582
    def build_credentials_as_prefs(self, credentials: Dict) -> List[str]:
583
        """Parse the credential dictionary.
584
        Arguments:
585
            credentials: Dictionary with the credentials.
586
587
        Return:
588
            A list with the credentials in string format to be
589
            added to the redis KB.
590
        """
591
        cred_prefs_list = []
592
        for credential in credentials.items():
593
            service = credential[0]
594
            cred_params = credentials.get(service)
595
            cred_type = cred_params.get('type', '')
596
            username = cred_params.get('username', '')
597
            password = cred_params.get('password', '')
598
599
            # Check service ssh
600
            if service == 'ssh':
601
                # For ssh check the Port
602
                port = cred_params.get('port', '22')
603
                priv_username = cred_params.get('priv_username', '')
604
                priv_password = cred_params.get('priv_password', '')
605
                if not port:
606
                    port = '22'
607
                    warning = (
608
                        "Missing port number for ssh credentials."
609
                        + " Using default port 22."
610
                    )
611
                    logger.warning(warning)
612
                elif not port.isnumeric():
613
                    self.errors.append(
614
                        "Port for SSH '" + port + "' is not a valid number."
615
                    )
616
                    continue
617
                elif int(port) > 65535 or int(port) < 1:
618
                    self.errors.append(
619
                        "Port for SSH is out of range (1-65535): " + port
620
                    )
621
                    continue
622
                # For ssh check the credential type
623
                if cred_type == 'up':
624
                    cred_prefs_list.append(
625
                        OID_SSH_AUTH
626
                        + ':3:'
627
                        + 'password:SSH password '
628
                        + '(unsafe!):|||{0}'.format(password)
629
                    )
630
                elif cred_type == 'usk':
631
                    private = cred_params.get('private', '')
632
                    cred_prefs_list.append(
633
                        OID_SSH_AUTH
634
                        + ':2:'
635
                        + 'password:SSH key passphrase:|||'
636
                        + '{0}'.format(password)
637
                    )
638
                    cred_prefs_list.append(
639
                        OID_SSH_AUTH
640
                        + ':4:'
641
                        + 'file:SSH private key:|||'
642
                        + '{0}'.format(private)
643
                    )
644
                elif cred_type:
645
                    self.errors.append(
646
                        "Unknown Credential Type for SSH: "
647
                        + cred_type
648
                        + ". Use 'up' for Username + Password"
649
                        + " or 'usk' for Username + SSH Key."
650
                    )
651
                    continue
652
                else:
653
                    self.errors.append(
654
                        "Missing Credential Type for SSH."
655
                        + " Use 'up' for Username + Password"
656
                        + " or 'usk' for Username + SSH Key."
657
                    )
658
                    continue
659
                cred_prefs_list.append('auth_port_ssh|||' + '{0}'.format(port))
660
                cred_prefs_list.append(
661
                    OID_SSH_AUTH
662
                    + ':1:'
663
                    + 'entry:SSH login '
664
                    + 'name:|||{0}'.format(username)
665
                )
666
                cred_prefs_list.append(
667
                    OID_SSH_AUTH
668
                    + ':7:'
669
                    + 'entry:SSH privilege login name:|||{0}'.format(
670
                        priv_username
671
                    )
672
                )
673
                cred_prefs_list.append(
674
                    OID_SSH_AUTH
675
                    + ':8:'
676
                    + 'password:SSH privilege password:|||{0}'.format(
677
                        priv_password
678
                    )
679
                )
680
            # Check servic smb
681
            elif service == 'smb':
682
                cred_prefs_list.append(
683
                    OID_SMB_AUTH
684
                    + ':1:entry'
685
                    + ':SMB login:|||{0}'.format(username)
686
                )
687
                cred_prefs_list.append(
688
                    OID_SMB_AUTH
689
                    + ':2:'
690
                    + 'password:SMB password:|||'
691
                    + '{0}'.format(password)
692
                )
693
            # Check service esxi
694
            elif service == 'esxi':
695
                cred_prefs_list.append(
696
                    OID_ESXI_AUTH
697
                    + ':1:entry:'
698
                    + 'ESXi login name:|||'
699
                    + '{0}'.format(username)
700
                )
701
                cred_prefs_list.append(
702
                    OID_ESXI_AUTH
703
                    + ':2:'
704
                    + 'password:ESXi login password:|||'
705
                    + '{0}'.format(password)
706
                )
707
            # Check service snmp
708
            elif service == 'snmp':
709
                community = cred_params.get('community', '')
710
                auth_algorithm = cred_params.get('auth_algorithm', '')
711
                privacy_password = cred_params.get('privacy_password', '')
712
                privacy_algorithm = cred_params.get('privacy_algorithm', '')
713
714
                if not privacy_algorithm:
715
                    if privacy_password:
716
                        self.errors.append(
717
                            "When no privacy algorithm is used, the privacy"
718
                            + " password also has to be empty."
719
                        )
720
                        continue
721
                elif (
722
                    not privacy_algorithm == "aes"
723
                    and not privacy_algorithm == "des"
724
                ):
725
                    self.errors.append(
726
                        "Unknown privacy algorithm used: "
727
                        + privacy_algorithm
728
                        + ". Use 'aes', 'des' or '' (none)."
729
                    )
730
                    continue
731
732
                if not auth_algorithm:
733
                    self.errors.append(
734
                        "Missing authentication algorithm for SNMP."
735
                        + " Use 'md5' or 'sha1'."
736
                    )
737
                    continue
738
                elif (
739
                    not auth_algorithm == "md5" and not auth_algorithm == "sha1"
740
                ):
741
                    self.errors.append(
742
                        "Unknown authentication algorithm: "
743
                        + auth_algorithm
744
                        + ". Use 'md5' or 'sha1'."
745
                    )
746
                    continue
747
748
                cred_prefs_list.append(
749
                    OID_SNMP_AUTH
750
                    + ':1:'
751
                    + 'password:SNMP Community:|||'
752
                    + '{0}'.format(community)
753
                )
754
                cred_prefs_list.append(
755
                    OID_SNMP_AUTH
756
                    + ':2:'
757
                    + 'entry:SNMPv3 Username:|||'
758
                    + '{0}'.format(username)
759
                )
760
                cred_prefs_list.append(
761
                    OID_SNMP_AUTH + ':3:'
762
                    'password:SNMPv3 Password:|||' + '{0}'.format(password)
763
                )
764
                cred_prefs_list.append(
765
                    OID_SNMP_AUTH
766
                    + ':4:'
767
                    + 'radio:SNMPv3 Authentication Algorithm:|||'
768
                    + '{0}'.format(auth_algorithm)
769
                )
770
                cred_prefs_list.append(
771
                    OID_SNMP_AUTH
772
                    + ':5:'
773
                    + 'password:SNMPv3 Privacy Password:|||'
774
                    + '{0}'.format(privacy_password)
775
                )
776
                cred_prefs_list.append(
777
                    OID_SNMP_AUTH
778
                    + ':6:'
779
                    + 'radio:SNMPv3 Privacy Algorithm:|||'
780
                    + '{0}'.format(privacy_algorithm)
781
                )
782
            elif service:
783
                self.errors.append(
784
                    "Unknown service type for credential: " + service
785
                )
786
            else:
787
                self.errors.append("Missing service type for credential.")
788
789
        return cred_prefs_list
790
791
    def prepare_credentials_for_openvas(self) -> bool:
792
        """Get the credentials from the scan collection and store them
793
        in the kb."""
794
        logger.debug("Looking for given Credentials...")
795
        credentials = self.scan_collection.get_credentials(self.scan_id)
796
        if credentials:
797
            cred_prefs = self.build_credentials_as_prefs(credentials)
798
            if cred_prefs:
799
                self.kbdb.add_credentials_to_scan_preferences(
800
                    self.scan_id, cred_prefs
801
                )
802
                logger.debug("Credentials added to the kb.")
803
        else:
804
            logger.debug("No credentials found.")
805
        if credentials and not cred_prefs:
0 ignored issues
show
introduced by
The variable cred_prefs does not seem to be defined for all execution paths.
Loading history...
806
            return False
807
808
        return True
809
810
    def prepare_main_kbindex_for_openvas(self):
811
        """Store main_kbindex as global preference in the
812
        kb, used by OpenVAS"""
813
        ov_maindbid = 'ov_maindbid|||%d' % self.kbdb.index
814
        self.kbdb.add_scan_preferences(self.scan_id, [ov_maindbid])
815