Passed
Pull Request — master (#374)
by
unknown
01:23
created

PreferenceHandler.prepare_boreas_alive_test()   B

Complexity

Conditions 7

Size

Total Lines 38
Code Lines 25

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
eloc 25
nop 1
dl 0
loc 38
rs 7.8799
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
# pylint: disable=too-many-lines
21
22
""" Prepare the preferences to be used by OpenVAS. Get the data from the scan
23
collection and store the data in a redis KB in the right format to be used by
24
OpenVAS. """
25
26
import logging
27
import uuid
28
import binascii
29
30
from enum import IntEnum
31
from typing import Optional, Dict, List, Tuple
32
from base64 import b64decode
33
34
from ospd.scan import ScanCollection
35
from ospd.ospd import BASE_SCANNER_PARAMS
36
from ospd_openvas.openvas import Openvas
37
from ospd_openvas.db import KbDB
38
from ospd_openvas.nvticache import NVTICache
39
from ospd_openvas.vthelper import VtHelper
40
41
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
42
43
44
# OIDs used as prefix of the preference keys built in
# PreferenceHandler.build_credentials_as_prefs() for the 'ssh', 'smb',
# 'esxi' and 'snmp' credentials respectively.
OID_SSH_AUTH = "1.3.6.1.4.1.25623.1.0.103591"
45
OID_SMB_AUTH = "1.3.6.1.4.1.25623.1.0.90023"
46
OID_ESXI_AUTH = "1.3.6.1.4.1.25623.1.0.105058"
47
OID_SNMP_AUTH = "1.3.6.1.4.1.25623.1.0.105076"
48
# OID used as prefix of the alive test preference keys built in
# PreferenceHandler.build_alive_test_opt_as_prefs().
OID_PING_HOST = "1.3.6.1.4.1.25623.1.0.100315"
49
50
# Key of the preference stored by
# PreferenceHandler.prepare_boreas_alive_test().
BOREAS_ALIVE_TEST = "ALIVE_TEST"
51
# Name of the setting looked up in Openvas.get_settings() to decide whether
# the BOREAS_ALIVE_TEST preference is stored at all.
BOREAS_SETTING_NAME = "test_alive_hosts_only"
52
53
54
class AliveTest(IntEnum):
    """Alive test methods.

    Every method is a distinct bit, so several methods can be combined
    into one bit mask. 0 stands for the default of the scan config.
    """

    ALIVE_TEST_SCAN_CONFIG_DEFAULT = 0
    ALIVE_TEST_TCP_ACK_SERVICE = 1 << 0
    ALIVE_TEST_ICMP = 1 << 1
    ALIVE_TEST_ARP = 1 << 2
    ALIVE_TEST_CONSIDER_ALIVE = 1 << 3
    ALIVE_TEST_TCP_SYN_SERVICE = 1 << 4
63
64
65
def _from_bool_to_str(value: int) -> str:
66
    """The OpenVAS scanner use yes and no as boolean values, whereas ospd
67
    uses 1 and 0."""
68
    return 'yes' if value == 1 else 'no'
69
70
71
class PreferenceHandler:
72
    def __init__(
73
        self,
74
        scan_id: str,
75
        kbdb: KbDB,
76
        scan_collection: ScanCollection,
77
        nvticache: NVTICache,
78
    ):
79
        self.scan_id = scan_id
80
        self.kbdb = kbdb
81
        self.scan_collection = scan_collection
82
83
        self._openvas_scan_id = None
84
85
        self._target_options = None
86
        self._nvts_params = None
87
88
        self.nvti = nvticache
89
90
    def prepare_openvas_scan_id_for_openvas(self):
91
        """Create the openvas scan id and store it in the redis kb.
92
        Return the openvas scan_id.
93
        """
94
        self._openvas_scan_id = str(uuid.uuid4())
95
        self.kbdb.add_scan_id(self.scan_id, self._openvas_scan_id)
96
97
        return self._openvas_scan_id
98
99
    @property
100
    def target_options(self) -> Dict:
101
        """ Return target options from Scan collection """
102
        if self._target_options is not None:
103
            return self._target_options
104
105
        self._target_options = self.scan_collection.get_target_options(
106
            self.scan_id
107
        )
108
        return self._target_options
109
110
    def _get_vts_in_groups(
111
        self,
112
        filters: List[str],
113
    ) -> List[str]:
114
        """Return a list of vts which match with the given filter.
115
116
        Arguments:
117
            filters A list of filters. Each filter has key, operator and
118
                    a value. They are separated by a space.
119
                    Supported keys: family
120
121
        Returns a list of vt oids which match with the given filter.
122
        """
123
        vts_list = list()
124
        families = dict()
125
126
        oids = self.nvti.get_oids()
127
128
        for _, oid in oids:
129
            family = self.nvti.get_nvt_family(oid)
130
            if family not in families:
131
                families[family] = list()
132
133
            families[family].append(oid)
134
135
        for elem in filters:
136
            key, value = elem.split('=')
137
            if key == 'family' and value in families:
138
                vts_list.extend(families[value])
139
140
        return vts_list
141
142
    def _get_vt_param_type(self, vt: Dict, vt_param_id: str) -> Optional[str]:
143
        """ Return the type of the vt parameter from the vts dictionary. """
144
145
        vt_params_list = vt.get("vt_params")
146
        if vt_params_list.get(vt_param_id):
147
            return vt_params_list[vt_param_id]["type"]
148
        return None
149
150
    def _get_vt_param_name(self, vt: Dict, vt_param_id: str) -> Optional[str]:
151
        """ Return the type of the vt parameter from the vts dictionary. """
152
153
        vt_params_list = vt.get("vt_params")
154
        if vt_params_list.get(vt_param_id):
155
            return vt_params_list[vt_param_id]["name"]
156
        return None
157
158
    @staticmethod
159
    def check_param_type(vt_param_value: str, param_type: str) -> Optional[int]:
160
        """Check if the value of a vt parameter matches with
161
        the type founded.
162
        """
163
        if (
164
            param_type
165
            in [
166
                'entry',
167
                'password',
168
                'radio',
169
                'sshlogin',
170
            ]
171
            and isinstance(vt_param_value, str)
172
        ):
173
            return None
174
        elif param_type == 'checkbox' and (
175
            vt_param_value == '0' or vt_param_value == '1'
176
        ):
177
            return None
178
        elif param_type == 'file':
179
            try:
180
                b64decode(vt_param_value.encode())
181
            except (binascii.Error, AttributeError, TypeError):
182
                return 1
183
            return None
184
        elif param_type == 'integer':
185
            try:
186
                int(vt_param_value)
187
            except ValueError:
188
                return 1
189
            return None
190
191
        return 1
192
193
    def _process_vts(
        self,
        vts: Dict[str, Dict[str, str]],
    ) -> Tuple[List[str], Dict[str, str]]:
        """Collect the VTs to be run and their validated parameters.

        Arguments:
            vts: Dictionary which maps each VT oid to its parameters. The
                 'vt_groups' entry is removed from it and resolved into
                 the VTs matching the group filters.

        Return a tuple with the list of VT oids and a dictionary which
        maps 'oid:param_id:param_type:param_name' to the parameter value.
        """
        groups = vts.pop('vt_groups')
        helper = VtHelper(self.nvti)

        selected = self._get_vts_in_groups(groups) if groups else []
        params = {}

        for oid, requested_params in vts.items():
            vt = helper.get_single_vt(oid)
            if not vt:
                logger.warning(
                    'The VT %s was not found and it will not be added to the '
                    'plugin scheduler.',
                    oid,
                )
                continue

            selected.append(oid)
            for param_id, value in requested_params.items():
                param_type = self._get_vt_param_type(vt, param_id)
                param_name = self._get_vt_param_name(vt, param_id)

                if not (param_type and param_name):
                    logger.debug(
                        'Missing type or name for VT parameter %s of %s. '
                        'This VT parameter will not be set.',
                        param_id,
                        oid,
                    )
                    continue

                # The parameter with id '0' is always validated as integer,
                # whatever type the VT declares for it.
                expected_type = 'integer' if param_id == '0' else param_type

                if self.check_param_type(value, expected_type):
                    logger.debug(
                        'The VT parameter %s for %s could not be set. '
                        'Expected %s type for parameter value %s',
                        param_id,
                        oid,
                        expected_type,
                        str(value),
                    )
                    continue

                if expected_type == 'checkbox':
                    value = _from_bool_to_str(int(value))

                pref_key = "{0}:{1}:{2}:{3}".format(
                    oid, param_id, param_type, param_name
                )
                params[pref_key] = str(value)

        return selected, params
257
258
    def prepare_plugins_for_openvas(self) -> bool:
259
        """Get the plugin list and it preferences from the Scan Collection.
260
        The plugin list is immediately stored in the kb.
261
        """
262
        nvts = self.scan_collection.get_vts(self.scan_id)
263
        if nvts:
264
            nvts_list, self._nvts_params = self._process_vts(nvts)
265
            # Add nvts list
266
            separ = ';'
267
            plugin_list = 'plugin_set|||%s' % separ.join(nvts_list)
268
            self.kbdb.add_scan_preferences(self._openvas_scan_id, [plugin_list])
269
270
            nvts_list = None
271
            plugin_list = None
272
            nvts = None
273
274
            return True
275
276
        return False
277
278
    def prepare_nvt_preferences(self):
279
        """Prepare the vts preferences. Store the data in the kb."""
280
281
        items_list = []
282
        for key, val in self._nvts_params.items():
283
            items_list.append('%s|||%s' % (key, val))
284
285
        if items_list:
286
            self.kbdb.add_scan_preferences(self._openvas_scan_id, items_list)
287
288
    @staticmethod
289
    def build_alive_test_opt_as_prefs(
290
        target_options: Dict[str, str]
291
    ) -> List[str]:
292
        """Parse the target options dictionary.
293
        Arguments:
294
            target_options: Dictionary with the target options.
295
296
        Return:
297
            A list with the target options related to alive test method
298
            in string format to be added to the redis KB.
299
        """
300
        target_opt_prefs_list = {}
301
302
        if target_options and target_options.get('alive_test'):
303
            try:
304
                alive_test = int(target_options.get('alive_test'))
305
            except ValueError:
306
                logger.debug(
307
                    'Alive test settings not applied. '
308
                    'Invalid alive test value %s',
309
                    target_options.get('alive_test'),
310
                )
311
                return target_opt_prefs_list
312
313
            # No alive test or wrong value, uses the default
314
            # preferences sent by the client.
315
            if alive_test < 1 or alive_test > 31:
316
                return target_opt_prefs_list
317
318
            if (
319
                alive_test & AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
320
                or alive_test & AliveTest.ALIVE_TEST_TCP_SYN_SERVICE
321
            ):
322
                value = "yes"
323
            else:
324
                value = "no"
325
            target_opt_prefs_list[
326
                OID_PING_HOST + ':1:checkbox:' + 'Do a TCP ping'
327
            ] = value
328
329
            if (
330
                alive_test & AliveTest.ALIVE_TEST_TCP_SYN_SERVICE
331
                and alive_test & AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
332
            ):
333
                value = "yes"
334
            else:
335
                value = "no"
336
            target_opt_prefs_list[
337
                OID_PING_HOST
338
                + ':2:checkbox:'
339
                + 'TCP ping tries also TCP-SYN ping'
340
            ] = value
341
342
            if (alive_test & AliveTest.ALIVE_TEST_TCP_SYN_SERVICE) and not (
343
                alive_test & AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
344
            ):
345
                value = "yes"
346
            else:
347
                value = "no"
348
            target_opt_prefs_list[
349
                OID_PING_HOST
350
                + ':7:checkbox:'
351
                + 'TCP ping tries only TCP-SYN ping'
352
            ] = value
353
354
            if alive_test & AliveTest.ALIVE_TEST_ICMP:
355
                value = "yes"
356
            else:
357
                value = "no"
358
            target_opt_prefs_list[
359
                OID_PING_HOST + ':3:checkbox:' + 'Do an ICMP ping'
360
            ] = value
361
362
            if alive_test & AliveTest.ALIVE_TEST_ARP:
363
                value = "yes"
364
            else:
365
                value = "no"
366
            target_opt_prefs_list[
367
                OID_PING_HOST + ':4:checkbox:' + 'Use ARP'
368
            ] = value
369
370
            if alive_test & AliveTest.ALIVE_TEST_CONSIDER_ALIVE:
371
                value = "no"
372
            else:
373
                value = "yes"
374
            target_opt_prefs_list[
375
                OID_PING_HOST
376
                + ':5:checkbox:'
377
                + 'Mark unrechable Hosts as dead (not scanning)'
378
            ] = value
379
380
            # Also select a method, otherwise Ping Host logs a warning.
381
            if alive_test == AliveTest.ALIVE_TEST_CONSIDER_ALIVE:
382
                target_opt_prefs_list[
383
                    OID_PING_HOST + ':1:checkbox:' + 'Do a TCP ping'
384
                ] = 'yes'
385
386
        return target_opt_prefs_list
387
388
    def prepare_alive_test_option_for_openvas(self):
        """Add the alive test preferences to the VT parameters,
        overwriting the values which come from the scan config.

        Nothing is done if Openvas.get_settings() returns nothing or if
        the target options hold no alive_test value.
        """
        if not Openvas.get_settings():
            return

        if not self.target_options.get('alive_test'):
            return

        self._nvts_params.update(
            self.build_alive_test_opt_as_prefs(self.target_options)
        )
396
397
    def prepare_boreas_alive_test(self):
        """Store the alive test to be used by Boreas in the kb.

        This is only done if the boreas scanner setting
        (BOREAS_SETTING_NAME) is set. Without an alive_test target
        option, or with the value 0, ALIVE_TEST_ICMP is stored. Values
        which are not integers or not between 1 and 31 store nothing.
        """
        settings = Openvas.get_settings()
        if not settings:
            return

        if not settings.get(BOREAS_SETTING_NAME):
            return

        alive_test_str = self.target_options.get('alive_test')
        if alive_test_str is None:
            # ALIVE_TEST_SCAN_CONFIG_DEFAULT if no alive_test provided
            alive_test = AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT
        else:
            try:
                alive_test = int(alive_test_str)
            except ValueError:
                logger.debug(
                    'Alive test preference for Boreas not set. '
                    'Invalid alive test value %s.',
                    alive_test_str,
                )
                return

        if alive_test == AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT:
            alive_test = AliveTest.ALIVE_TEST_ICMP
        elif not 1 <= alive_test <= 31:
            # A valid bit mask has a value between 1 (00001) and
            # 31 (11111).
            return

        pref = "{pref_key}|||{pref_value}".format(
            pref_key=BOREAS_ALIVE_TEST, pref_value=alive_test
        )
        self.kbdb.add_scan_preferences(self._openvas_scan_id, [pref])
435
436
    def prepare_reverse_lookup_opt_for_openvas(self):
        """Store the reverse_lookup_only and reverse_lookup_unify target
        options in the kb as 'yes' / 'no' preferences. Options which are
        not given count as '0'. Nothing is stored without target options.
        """
        target_options = self.target_options
        if not target_options:
            return

        items = []
        for option in ('reverse_lookup_only', 'reverse_lookup_unify'):
            enabled = int(target_options.get(option, '0'))
            items.append('%s|||%s' % (option, _from_bool_to_str(enabled)))

        self.kbdb.add_scan_preferences(self._openvas_scan_id, items)
453
454
    def prepare_target_for_openvas(self):
455
        """Get the target from the scan collection and set the target
456
        in the kb"""
457
458
        target = self.scan_collection.get_host_list(self.scan_id)
459
        target_aux = 'TARGET|||%s' % target
460
        self.kbdb.add_scan_preferences(self._openvas_scan_id, [target_aux])
461
462
    def prepare_ports_for_openvas(self) -> str:
463
        """Get the port list from the scan collection and store the list
464
        in the kb."""
465
        ports = self.scan_collection.get_ports(self.scan_id)
466
        port_range = 'port_range|||%s' % ports
467
        self.kbdb.add_scan_preferences(self._openvas_scan_id, [port_range])
468
469
        return ports
470
471
    def prepare_host_options_for_openvas(self):
472
        """Get the excluded and finished hosts from the scan collection and
473
        stores the list of hosts that must not be scanned in the kb."""
474
        exclude_hosts = self.scan_collection.get_exclude_hosts(self.scan_id)
475
476
        if exclude_hosts:
477
            pref_val = "exclude_hosts|||" + exclude_hosts
478
            self.kbdb.add_scan_preferences(self._openvas_scan_id, [pref_val])
479
480
    def prepare_scan_params_for_openvas(self, ospd_params: Dict[str, Dict]):
481
        """Get the scan parameters from the scan collection and store them
482
        in the kb.
483
        Arguments:
484
            ospd_params: Dictionary with the OSPD Params.
485
        """
486
        # Options which were supplied via the <scanner_params> XML element.
487
        options = self.scan_collection.get_options(self.scan_id)
488
        prefs_val = []
489
490
        for key, value in options.items():
491
            item_type = ''
492
            if key in ospd_params:
493
                item_type = ospd_params[key].get('type')
494
            else:
495
                if key not in BASE_SCANNER_PARAMS:
496
                    logger.debug(
497
                        "%s is a scanner only setting and should not be set "
498
                        "by the client. Setting needs to be included in "
499
                        "OpenVAS configuration file instead.",
500
                        key,
501
                    )
502
            if item_type == 'boolean':
503
                val = _from_bool_to_str(value)
504
            else:
505
                val = str(value)
506
            prefs_val.append(key + "|||" + val)
507
508
        if prefs_val:
509
            self.kbdb.add_scan_preferences(self._openvas_scan_id, prefs_val)
510
511
    @staticmethod
512
    def build_credentials_as_prefs(credentials: Dict) -> List[str]:
513
        """Parse the credential dictionary.
514
        Arguments:
515
            credentials: Dictionary with the credentials.
516
517
        Return:
518
            A list with the credentials in string format to be
519
            added to the redis KB.
520
        """
521
        cred_prefs_list = []
522
        for credential in credentials.items():
523
            service = credential[0]
524
            cred_params = credentials.get(service)
525
            cred_type = cred_params.get('type', '')
526
            username = cred_params.get('username', '')
527
            password = cred_params.get('password', '')
528
529
            if service == 'ssh':
530
                port = cred_params.get('port', '')
531
                cred_prefs_list.append('auth_port_ssh|||' + '{0}'.format(port))
532
                cred_prefs_list.append(
533
                    OID_SSH_AUTH
534
                    + ':1:'
535
                    + 'entry:SSH login '
536
                    + 'name:|||{0}'.format(username)
537
                )
538
                if cred_type == 'up':
539
                    cred_prefs_list.append(
540
                        OID_SSH_AUTH
541
                        + ':3:'
542
                        + 'password:SSH password '
543
                        + '(unsafe!):|||{0}'.format(password)
544
                    )
545
                else:
546
                    private = cred_params.get('private', '')
547
                    cred_prefs_list.append(
548
                        OID_SSH_AUTH
549
                        + ':2:'
550
                        + 'password:SSH key passphrase:|||'
551
                        + '{0}'.format(password)
552
                    )
553
                    cred_prefs_list.append(
554
                        OID_SSH_AUTH
555
                        + ':4:'
556
                        + 'file:SSH private key:|||'
557
                        + '{0}'.format(private)
558
                    )
559
            if service == 'smb':
560
                cred_prefs_list.append(
561
                    OID_SMB_AUTH
562
                    + ':1:entry'
563
                    + ':SMB login:|||{0}'.format(username)
564
                )
565
                cred_prefs_list.append(
566
                    OID_SMB_AUTH
567
                    + ':2:'
568
                    + 'password:SMB password:|||'
569
                    + '{0}'.format(password)
570
                )
571
            if service == 'esxi':
572
                cred_prefs_list.append(
573
                    OID_ESXI_AUTH
574
                    + ':1:entry:'
575
                    + 'ESXi login name:|||'
576
                    + '{0}'.format(username)
577
                )
578
                cred_prefs_list.append(
579
                    OID_ESXI_AUTH
580
                    + ':2:'
581
                    + 'password:ESXi login password:|||'
582
                    + '{0}'.format(password)
583
                )
584
585
            if service == 'snmp':
586
                community = cred_params.get('community', '')
587
                auth_algorithm = cred_params.get('auth_algorithm', '')
588
                privacy_password = cred_params.get('privacy_password', '')
589
                privacy_algorithm = cred_params.get('privacy_algorithm', '')
590
591
                cred_prefs_list.append(
592
                    OID_SNMP_AUTH
593
                    + ':1:'
594
                    + 'password:SNMP Community:|||'
595
                    + '{0}'.format(community)
596
                )
597
                cred_prefs_list.append(
598
                    OID_SNMP_AUTH
599
                    + ':2:'
600
                    + 'entry:SNMPv3 Username:|||'
601
                    + '{0}'.format(username)
602
                )
603
                cred_prefs_list.append(
604
                    OID_SNMP_AUTH + ':3:'
605
                    'password:SNMPv3 Password:|||' + '{0}'.format(password)
606
                )
607
                cred_prefs_list.append(
608
                    OID_SNMP_AUTH
609
                    + ':4:'
610
                    + 'radio:SNMPv3 Authentication Algorithm:|||'
611
                    + '{0}'.format(auth_algorithm)
612
                )
613
                cred_prefs_list.append(
614
                    OID_SNMP_AUTH
615
                    + ':5:'
616
                    + 'password:SNMPv3 Privacy Password:|||'
617
                    + '{0}'.format(privacy_password)
618
                )
619
                cred_prefs_list.append(
620
                    OID_SNMP_AUTH
621
                    + ':6:'
622
                    + 'radio:SNMPv3 Privacy Algorithm:|||'
623
                    + '{0}'.format(privacy_algorithm)
624
                )
625
626
        return cred_prefs_list
627
628
    def prepare_credentials_for_openvas(self) -> bool:
629
        """Get the credentials from the scan collection and store them
630
        in the kb."""
631
        credentials = self.scan_collection.get_credentials(self.scan_id)
632
        if credentials:
633
            cred_prefs = self.build_credentials_as_prefs(credentials)
634
            if cred_prefs:
635
                self.kbdb.add_credentials_to_scan_preferences(
636
                    self._openvas_scan_id, cred_prefs
637
                )
638
639
        if credentials and not cred_prefs:
0 ignored issues
show
introduced by
The variable cred_prefs does not seem to be defined in case credentials on line 632 is False. Are you sure this can never be the case?
Loading history...
640
            return False
641
642
        return True
643
644
    def prepare_main_kbindex_for_openvas(self):
645
        """Store main_kbindex as global preference in the
646
        kb, used by OpenVAS"""
647
        ov_maindbid = 'ov_maindbid|||%d' % self.kbdb.index
648
        self.kbdb.add_scan_preferences(self._openvas_scan_id, [ov_maindbid])
649