Completed
Push — master ( 18f3e1...b5ff15 )
by Björn
24s queued 11s
created

ospd_openvas.preferencehandler._from_bool_to_str()   A

Complexity

Conditions 2

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 2
nop 1
dl 0
loc 4
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
# pylint: disable=too-many-lines
21
22
""" Prepare the preferences to be used by OpenVAS. Get the data from the scan
23
collection and store the data in a redis KB in the right format to be used by
24
OpenVAS. """
25
26
import logging
27
import binascii
28
29
from enum import IntEnum
30
from typing import Optional, Dict, List, Tuple
31
from base64 import b64decode
32
33
from ospd.scan import ScanCollection
34
from ospd.ospd import BASE_SCANNER_PARAMS
35
from ospd_openvas.openvas import Openvas
36
from ospd_openvas.db import KbDB
37
from ospd_openvas.nvticache import NVTICache
38
from ospd_openvas.vthelper import VtHelper
39
40
logger = logging.getLogger(__name__)
41
42
43
OID_SSH_AUTH = "1.3.6.1.4.1.25623.1.0.103591"
44
OID_SMB_AUTH = "1.3.6.1.4.1.25623.1.0.90023"
45
OID_ESXI_AUTH = "1.3.6.1.4.1.25623.1.0.105058"
46
OID_SNMP_AUTH = "1.3.6.1.4.1.25623.1.0.105076"
47
OID_PING_HOST = "1.3.6.1.4.1.25623.1.0.100315"
48
49
BOREAS_ALIVE_TEST = "ALIVE_TEST"
50
BOREAS_SETTING_NAME = "test_alive_hosts_only"
51
52
53
class AliveTest(IntEnum):
    """ Alive test methods.

    Apart from the scan config default (0) every method owns a single bit,
    so the methods can be tested against an integer with a bitwise AND.
    """

    ALIVE_TEST_SCAN_CONFIG_DEFAULT = 0
    ALIVE_TEST_TCP_ACK_SERVICE = 1 << 0
    ALIVE_TEST_ICMP = 1 << 1
    ALIVE_TEST_ARP = 1 << 2
    ALIVE_TEST_CONSIDER_ALIVE = 1 << 3
    ALIVE_TEST_TCP_SYN_SERVICE = 1 << 4
62
63
64
def _from_bool_to_str(value: int) -> str:
    """ Translate an ospd boolean into the notation used by OpenVAS.

    ospd uses 1 and 0 as boolean values, the OpenVAS scanner uses yes and
    no. Every value which is not equal to 1 is translated to 'no'.
    """
    if value == 1:
        return 'yes'
    return 'no'
68
69
70
class PreferenceHandler:
71
    def __init__(
        self,
        scan_id: str,
        kbdb: KbDB,
        scan_collection: ScanCollection,
        nvticache: NVTICache,
    ):
        """ Keep the objects needed to prepare the preferences of a scan.

        Arguments:
            scan_id: Id of the scan the preferences belong to.
            kbdb: KB object which receives the preferences.
            scan_collection: Scan collection the scan data is read from.
            nvticache: NVTI cache used to look up VT data.
        """
        # Filled on first access of the target_options property.
        self._target_options = None

        self.scan_id, self.kbdb = scan_id, kbdb
        self.scan_collection = scan_collection
        self.nvti = nvticache
85
86
    def prepare_scan_id_for_openvas(self):
87
        """ Create the openvas scan id and store it in the redis kb.
88
        Return the openvas scan_id.
89
        """
90
        self.kbdb.add_scan_id(self.scan_id)
91
92
    @property
93
    def target_options(self) -> Dict:
94
        """ Return target options from Scan collection """
95
        if self._target_options is not None:
96
            return self._target_options
97
98
        self._target_options = self.scan_collection.get_target_options(
99
            self.scan_id
100
        )
101
        return self._target_options
102
103
    def _get_vts_in_groups(self, filters: List[str],) -> List[str]:
104
        """ Return a list of vts which match with the given filter.
105
106
        Arguments:
107
            filters A list of filters. Each filter has key, operator and
108
                    a value. They are separated by a space.
109
                    Supported keys: family
110
111
        Returns a list of vt oids which match with the given filter.
112
        """
113
        vts_list = list()
114
        families = dict()
115
116
        oids = self.nvti.get_oids()
117
118
        for _, oid in oids:
119
            family = self.nvti.get_nvt_family(oid)
120
            if family not in families:
121
                families[family] = list()
122
123
            families[family].append(oid)
124
125
        for elem in filters:
126
            key, value = elem.split('=')
127
            if key == 'family' and value in families:
128
                vts_list.extend(families[value])
129
130
        return vts_list
131
132
    def _get_vt_param_type(self, vt: Dict, vt_param_id: str) -> Optional[str]:
133
        """ Return the type of the vt parameter from the vts dictionary. """
134
135
        vt_params_list = vt.get("vt_params")
136
        if vt_params_list.get(vt_param_id):
137
            return vt_params_list[vt_param_id]["type"]
138
        return None
139
140
    def _get_vt_param_name(self, vt: Dict, vt_param_id: str) -> Optional[str]:
141
        """ Return the type of the vt parameter from the vts dictionary. """
142
143
        vt_params_list = vt.get("vt_params")
144
        if vt_params_list.get(vt_param_id):
145
            return vt_params_list[vt_param_id]["name"]
146
        return None
147
148
    @staticmethod
149
    def check_param_type(vt_param_value: str, param_type: str) -> Optional[int]:
150
        """ Check if the value of a vt parameter matches with
151
        the type founded.
152
        """
153
        if param_type in [
154
            'entry',
155
            'password',
156
            'radio',
157
            'sshlogin',
158
        ] and isinstance(vt_param_value, str):
159
            return None
160
        elif param_type == 'checkbox' and (
161
            vt_param_value == '0' or vt_param_value == '1'
162
        ):
163
            return None
164
        elif param_type == 'file':
165
            try:
166
                b64decode(vt_param_value.encode())
167
            except (binascii.Error, AttributeError, TypeError):
168
                return 1
169
            return None
170
        elif param_type == 'integer':
171
            try:
172
                int(vt_param_value)
173
            except ValueError:
174
                return 1
175
            return None
176
177
        return 1
178
179
    def _process_vts(
        self, vts: Dict[str, Dict[str, str]],
    ) -> Tuple[List[str], Dict[str, str]]:
        """ Add single VTs and their parameters.

        Arguments:
            vts: Dictionary with the selected VTs. The VT id is the key and
                 a dictionary with parameter id and parameter value is the
                 value. The 'vt_groups' entry, if present, holds the group
                 filters. It is removed from the given dictionary.

        Return:
            A tuple with the list of VT ids and a dictionary with the VT
            parameters. The keys of the dictionary have the form
            vtid:param_id:param_type:param_name, the values are strings.
        """
        vts_list = []
        vts_params = {}
        # A selection without group filters must not raise a KeyError.
        vtgroups = vts.pop('vt_groups', None)

        vthelper = VtHelper(self.nvti)

        if vtgroups:
            vts_list = self._get_vts_in_groups(vtgroups)

        for vtid, vt_params in vts.items():
            vt = vthelper.get_single_vt(vtid)
            if not vt:
                logger.warning(
                    'The VT %s was not found and it will not be added to the '
                    'plugin scheduler.',
                    vtid,
                )
                continue

            vts_list.append(vtid)
            for vt_param_id, vt_param_value in vt_params.items():
                param_type = self._get_vt_param_type(vt, vt_param_id)
                param_name = self._get_vt_param_name(vt, vt_param_id)

                if not param_type or not param_name:
                    logger.debug(
                        'Missing type or name for VT parameter %s of %s. '
                        'This VT parameter will not be set.',
                        vt_param_id,
                        vtid,
                    )
                    continue

                # The parameter with id '0' is always checked as integer,
                # its stored type is still used to build the key.
                # NOTE(review): id '0' presumably is the VT timeout - confirm.
                type_aux = 'integer' if vt_param_id == '0' else param_type

                if self.check_param_type(vt_param_value, type_aux):
                    logger.debug(
                        'The VT parameter %s for %s could not be set. '
                        'Expected %s type for parameter value %s',
                        vt_param_id,
                        vtid,
                        type_aux,
                        str(vt_param_value),
                    )
                    continue

                if type_aux == 'checkbox':
                    vt_param_value = _from_bool_to_str(int(vt_param_value))

                param_key = "{0}:{1}:{2}:{3}".format(
                    vtid, vt_param_id, param_type, param_name
                )
                vts_params[param_key] = str(vt_param_value)

        return vts_list, vts_params
242
243
    def prepare_plugins_for_openvas(self) -> bool:
244
        """ Get the plugin list to be launched from the Scan Collection
245
        and prepare the vts preferences. Store the data in the kb.
246
        """
247
        nvts = self.scan_collection.get_vts(self.scan_id)
248
        if nvts:
249
            nvts_list, nvts_params = self._process_vts(nvts)
250
            # Add nvts list
251
            separ = ';'
252
            plugin_list = 'plugin_set|||%s' % separ.join(nvts_list)
253
            self.kbdb.add_scan_preferences(self.scan_id, [plugin_list])
254
255
            # Add nvts parameters
256
            for key, val in nvts_params.items():
257
                item = '%s|||%s' % (key, val)
258
                self.kbdb.add_scan_preferences(self.scan_id, [item])
259
260
            nvts_params = None
261
            nvts_list = None
262
            item = None
263
            plugin_list = None
264
            nvts = None
265
266
            return True
267
268
        return False
269
270
    @staticmethod
271
    def build_alive_test_opt_as_prefs(
272
        target_options: Dict[str, str]
273
    ) -> List[str]:
274
        """ Parse the target options dictionary.
275
        Arguments:
276
            target_options: Dictionary with the target options.
277
278
        Return:
279
            A list with the target options related to alive test method
280
            in string format to be added to the redis KB.
281
        """
282
        target_opt_prefs_list = []
283
        if target_options and target_options.get('alive_test'):
284
            try:
285
                alive_test = int(target_options.get('alive_test'))
286
            except ValueError:
287
                logger.debug(
288
                    'Alive test settings not applied. '
289
                    'Invalid alive test value %s',
290
                    target_options.get('alive_test'),
291
                )
292
                return target_opt_prefs_list
293
294
            if alive_test < 1 or alive_test > 31:
295
                return target_opt_prefs_list
296
297
            if (
298
                alive_test & AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
299
                or alive_test & AliveTest.ALIVE_TEST_TCP_SYN_SERVICE
300
            ):
301
                value = "yes"
302
            else:
303
                value = "no"
304
            target_opt_prefs_list.append(
305
                OID_PING_HOST
306
                + ':1:checkbox:'
307
                + 'Do a TCP ping|||'
308
                + '{0}'.format(value)
309
            )
310
311
            if (
312
                alive_test & AliveTest.ALIVE_TEST_TCP_SYN_SERVICE
313
                and alive_test & AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
314
            ):
315
                value = "yes"
316
            else:
317
                value = "no"
318
            target_opt_prefs_list.append(
319
                OID_PING_HOST
320
                + ':2:checkbox:'
321
                + 'TCP ping tries also TCP-SYN ping|||'
322
                + '{0}'.format(value)
323
            )
324
325
            if (alive_test & AliveTest.ALIVE_TEST_TCP_SYN_SERVICE) and not (
326
                alive_test & AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
327
            ):
328
                value = "yes"
329
            else:
330
                value = "no"
331
            target_opt_prefs_list.append(
332
                OID_PING_HOST
333
                + ':7:checkbox:'
334
                + 'TCP ping tries only TCP-SYN ping|||'
335
                + '{0}'.format(value)
336
            )
337
338
            if alive_test & AliveTest.ALIVE_TEST_ICMP:
339
                value = "yes"
340
            else:
341
                value = "no"
342
            target_opt_prefs_list.append(
343
                OID_PING_HOST
344
                + ':3:checkbox:'
345
                + 'Do an ICMP ping|||'
346
                + '{0}'.format(value)
347
            )
348
349
            if alive_test & AliveTest.ALIVE_TEST_ARP:
350
                value = "yes"
351
            else:
352
                value = "no"
353
            target_opt_prefs_list.append(
354
                OID_PING_HOST
355
                + ':4:checkbox:'
356
                + 'Use ARP|||'
357
                + '{0}'.format(value)
358
            )
359
360
            if alive_test & AliveTest.ALIVE_TEST_CONSIDER_ALIVE:
361
                value = "no"
362
            else:
363
                value = "yes"
364
            target_opt_prefs_list.append(
365
                OID_PING_HOST
366
                + ':5:checkbox:'
367
                + 'Mark unrechable Hosts as dead (not scanning)|||'
368
                + '{0}'.format(value)
369
            )
370
371
            # Also select a method, otherwise Ping Host logs a warning.
372
            if alive_test == AliveTest.ALIVE_TEST_CONSIDER_ALIVE:
373
                target_opt_prefs_list.append(
374
                    OID_PING_HOST + ':1:checkbox:' + 'Do a TCP ping|||yes'
375
                )
376
        return target_opt_prefs_list
377
378
    def prepare_alive_test_option_for_openvas(self):
        """ Set alive test option. Overwrite the scan config settings.

        Nothing is stored in the kb if the OpenVAS settings are not
        available, if the target options have no alive_test or if no
        preference could be built from the given alive_test.
        """
        settings = Openvas.get_settings()
        if not settings or not self.target_options.get('alive_test'):
            return

        alive_test_opt = self.build_alive_test_opt_as_prefs(
            self.target_options
        )
        # An invalid alive_test produces an empty list. Like for the scan
        # parameters and the credentials, an empty list is not passed to
        # the kb.
        if alive_test_opt:
            self.kbdb.add_scan_preferences(self.scan_id, alive_test_opt)
386
387
    def prepare_boreas_alive_test(self):
        """ Set alive_test for Boreas if boreas scanner config
        (BOREAS_SETTING_NAME) was set.

        If the target options have no alive_test, ALIVE_TEST_ICMP is
        stored. Nothing is stored if the OpenVAS settings are not
        available, if the boreas setting is not set or if the alive_test
        is not a valid value.
        """
        settings = Openvas.get_settings()
        # Stays -1 as long as no alive test could be determined.
        alive_test = -1

        if settings:
            if not settings.get(BOREAS_SETTING_NAME):
                return

            alive_test_str = self.target_options.get('alive_test')
            if alive_test_str is None:
                # ALIVE_TEST_SCAN_CONFIG_DEFAULT if no alive_test provided
                alive_test = AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT
            else:
                try:
                    alive_test = int(alive_test_str)
                except ValueError:
                    logger.debug(
                        'Alive test preference for Boreas not set. '
                        'Invalid alive test value %s.',
                        alive_test_str,
                    )

        if alive_test == AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT:
            # The scan config default is replaced by the ICMP method.
            alive_test = AliveTest.ALIVE_TEST_ICMP
        elif not 1 <= alive_test <= 31:
            # A valid alive_test is a bit mask with a value between
            # 1 (00001) and 31 (11111).
            return

        pref = "{pref_key}|||{pref_value}".format(
            pref_key=BOREAS_ALIVE_TEST, pref_value=alive_test
        )
        self.kbdb.add_scan_preferences(self.scan_id, [pref])
425
426
    def prepare_reverse_lookup_opt_for_openvas(self):
        """ Set reverse lookup options in the kb.

        The options reverse_lookup_only and reverse_lookup_unify are read
        from the target options, a missing option counts as '0'. Nothing
        is stored if there are no target options.
        """
        target_options = self.target_options
        if not target_options:
            return

        items = []
        for option in ('reverse_lookup_only', 'reverse_lookup_unify'):
            enabled = int(target_options.get(option, '0'))
            items.append('%s|||%s' % (option, _from_bool_to_str(enabled)))

        self.kbdb.add_scan_preferences(self.scan_id, items)
443
444
    def prepare_target_for_openvas(self):
445
        """ Get the target from the scan collection and set the target
446
        in the kb """
447
448
        target = self.scan_collection.get_host_list(self.scan_id)
449
        target_aux = 'TARGET|||%s' % target
450
        self.kbdb.add_scan_preferences(self.scan_id, [target_aux])
451
452
    def prepare_ports_for_openvas(self) -> str:
453
        """ Get the port list from the scan collection and store the list
454
        in the kb. """
455
        ports = self.scan_collection.get_ports(self.scan_id)
456
        port_range = 'port_range|||%s' % ports
457
        self.kbdb.add_scan_preferences(self.scan_id, [port_range])
458
459
        return ports
460
461
    def prepare_host_options_for_openvas(self):
462
        """ Get the excluded and finished hosts from the scan collection and
463
        stores the list of hosts that must not be scanned in the kb. """
464
        exclude_hosts = self.scan_collection.get_exclude_hosts(self.scan_id)
465
466
        if exclude_hosts:
467
            pref_val = "exclude_hosts|||" + exclude_hosts
468
            self.kbdb.add_scan_preferences(self.scan_id, [pref_val])
469
470
    def prepare_scan_params_for_openvas(self, ospd_params: Dict[str, Dict]):
471
        """ Get the scan parameters from the scan collection and store them
472
        in the kb.
473
        Arguments:
474
            ospd_params: Dictionary with the OSPD Params.
475
        """
476
        # Options which were supplied via the <scanner_params> XML element.
477
        options = self.scan_collection.get_options(self.scan_id)
478
        prefs_val = []
479
480
        for key, value in options.items():
481
            item_type = ''
482
            if key in ospd_params:
483
                item_type = ospd_params[key].get('type')
484
            else:
485
                if key not in BASE_SCANNER_PARAMS:
486
                    logger.debug(
487
                        "%s is a scanner only setting and should not be set "
488
                        "by the client. Setting needs to be included in "
489
                        "OpenVAS configuration file instead.",
490
                        key,
491
                    )
492
            if item_type == 'boolean':
493
                val = _from_bool_to_str(value)
494
            else:
495
                val = str(value)
496
            prefs_val.append(key + "|||" + val)
497
498
        if prefs_val:
499
            self.kbdb.add_scan_preferences(self.scan_id, prefs_val)
500
501
    @staticmethod
502
    def build_credentials_as_prefs(credentials: Dict) -> List[str]:
503
        """ Parse the credential dictionary.
504
        Arguments:
505
            credentials: Dictionary with the credentials.
506
507
        Return:
508
            A list with the credentials in string format to be
509
            added to the redis KB.
510
        """
511
        cred_prefs_list = []
512
        for credential in credentials.items():
513
            service = credential[0]
514
            cred_params = credentials.get(service)
515
            cred_type = cred_params.get('type', '')
516
            username = cred_params.get('username', '')
517
            password = cred_params.get('password', '')
518
519
            if service == 'ssh':
520
                port = cred_params.get('port', '')
521
                cred_prefs_list.append('auth_port_ssh|||' + '{0}'.format(port))
522
                cred_prefs_list.append(
523
                    OID_SSH_AUTH
524
                    + ':1:'
525
                    + 'entry:SSH login '
526
                    + 'name:|||{0}'.format(username)
527
                )
528
                if cred_type == 'up':
529
                    cred_prefs_list.append(
530
                        OID_SSH_AUTH
531
                        + ':3:'
532
                        + 'password:SSH password '
533
                        + '(unsafe!):|||{0}'.format(password)
534
                    )
535
                else:
536
                    private = cred_params.get('private', '')
537
                    cred_prefs_list.append(
538
                        OID_SSH_AUTH
539
                        + ':2:'
540
                        + 'password:SSH key passphrase:|||'
541
                        + '{0}'.format(password)
542
                    )
543
                    cred_prefs_list.append(
544
                        OID_SSH_AUTH
545
                        + ':4:'
546
                        + 'file:SSH private key:|||'
547
                        + '{0}'.format(private)
548
                    )
549
            if service == 'smb':
550
                cred_prefs_list.append(
551
                    OID_SMB_AUTH
552
                    + ':1:entry'
553
                    + ':SMB login:|||{0}'.format(username)
554
                )
555
                cred_prefs_list.append(
556
                    OID_SMB_AUTH
557
                    + ':2:'
558
                    + 'password:SMB password:|||'
559
                    + '{0}'.format(password)
560
                )
561
            if service == 'esxi':
562
                cred_prefs_list.append(
563
                    OID_ESXI_AUTH
564
                    + ':1:entry:'
565
                    + 'ESXi login name:|||'
566
                    + '{0}'.format(username)
567
                )
568
                cred_prefs_list.append(
569
                    OID_ESXI_AUTH
570
                    + ':2:'
571
                    + 'password:ESXi login password:|||'
572
                    + '{0}'.format(password)
573
                )
574
575
            if service == 'snmp':
576
                community = cred_params.get('community', '')
577
                auth_algorithm = cred_params.get('auth_algorithm', '')
578
                privacy_password = cred_params.get('privacy_password', '')
579
                privacy_algorithm = cred_params.get('privacy_algorithm', '')
580
581
                cred_prefs_list.append(
582
                    OID_SNMP_AUTH
583
                    + ':1:'
584
                    + 'password:SNMP Community:|||'
585
                    + '{0}'.format(community)
586
                )
587
                cred_prefs_list.append(
588
                    OID_SNMP_AUTH
589
                    + ':2:'
590
                    + 'entry:SNMPv3 Username:|||'
591
                    + '{0}'.format(username)
592
                )
593
                cred_prefs_list.append(
594
                    OID_SNMP_AUTH + ':3:'
595
                    'password:SNMPv3 Password:|||' + '{0}'.format(password)
596
                )
597
                cred_prefs_list.append(
598
                    OID_SNMP_AUTH
599
                    + ':4:'
600
                    + 'radio:SNMPv3 Authentication Algorithm:|||'
601
                    + '{0}'.format(auth_algorithm)
602
                )
603
                cred_prefs_list.append(
604
                    OID_SNMP_AUTH
605
                    + ':5:'
606
                    + 'password:SNMPv3 Privacy Password:|||'
607
                    + '{0}'.format(privacy_password)
608
                )
609
                cred_prefs_list.append(
610
                    OID_SNMP_AUTH
611
                    + ':6:'
612
                    + 'radio:SNMPv3 Privacy Algorithm:|||'
613
                    + '{0}'.format(privacy_algorithm)
614
                )
615
616
        return cred_prefs_list
617
618
    def prepare_credentials_for_openvas(self) -> bool:
619
        """ Get the credentials from the scan collection and store them
620
        in the kb. """
621
        credentials = self.scan_collection.get_credentials(self.scan_id)
622
        if credentials:
623
            cred_prefs = self.build_credentials_as_prefs(credentials)
624
            if cred_prefs:
625
                self.kbdb.add_credentials_to_scan_preferences(
626
                    self.scan_id, cred_prefs
627
                )
628
629
        if credentials and not cred_prefs:
0 ignored issues
show
introduced by
The variable cred_prefs does not seem to be defined in case credentials on line 622 is False. Are you sure this can never be the case?
Loading history...
630
            return False
631
632
        return True
633
634
    def prepare_main_kbindex_for_openvas(self):
635
        """ Store main_kbindex as global preference in the
636
        kb, used by OpenVAS"""
637
        ov_maindbid = 'ov_maindbid|||%d' % self.kbdb.index
638
        self.kbdb.add_scan_preferences(self.scan_id, [ov_maindbid])
639