Passed
Pull Request — master (#307)
by
unknown
01:05
created

PreferenceHandler.prepare_scan_params_for_openvas()   B

Complexity

Conditions 6

Size

Total Lines 30
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 17
nop 2
dl 0
loc 30
rs 8.6166
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
# pylint: disable=too-many-lines
21
22
""" Prepare the preferences to be used by OpenVAS. Get the data from the scan
23
collection and store the data in a redis KB in the right format to be used by
24
OpenVAS. """
25
26
import logging
27
import uuid
28
import binascii
29
30
from enum import IntEnum
31
from typing import Optional, Dict, List, Tuple
32
from base64 import b64decode
33
34
from ospd.scan import ScanCollection
35
from ospd.ospd import BASE_SCANNER_PARAMS
36
from ospd_openvas.openvas import Openvas
37
from ospd_openvas.db import KbDB
38
from ospd_openvas.nvticache import NVTICache
39
from ospd_openvas.vthelper import VtHelper
40
41
logger = logging.getLogger(__name__)
42
43
44
OID_SSH_AUTH = "1.3.6.1.4.1.25623.1.0.103591"
45
OID_SMB_AUTH = "1.3.6.1.4.1.25623.1.0.90023"
46
OID_ESXI_AUTH = "1.3.6.1.4.1.25623.1.0.105058"
47
OID_SNMP_AUTH = "1.3.6.1.4.1.25623.1.0.105076"
48
OID_PING_HOST = "1.3.6.1.4.1.25623.1.0.100315"
49
50
BOREAS_ALIVE_TEST = "ALIVE_TEST"
51
BOREAS_SETTING_NAME = "test_alive_hosts_only"
52
53
54
class AliveTest(IntEnum):
    """ Alive test methods.

    Every member except the scan config default occupies a bit of its
    own, so several methods can be combined in one integer and checked
    with a bitwise AND.
    """

    # Zero carries no bit at all.
    ALIVE_TEST_SCAN_CONFIG_DEFAULT = 0
    ALIVE_TEST_TCP_ACK_SERVICE = 1 << 0
    ALIVE_TEST_ICMP = 1 << 1
    ALIVE_TEST_ARP = 1 << 2
    ALIVE_TEST_CONSIDER_ALIVE = 1 << 3
    ALIVE_TEST_TCP_SYN_SERVICE = 1 << 4
63
64
65
def _from_bool_to_str(value: int) -> str:
66
    """ The OpenVAS scanner use yes and no as boolean values, whereas ospd
67
    uses 1 and 0."""
68
    return 'yes' if value == 1 else 'no'
69
70
71
class PreferenceHandler:
72
    def __init__(
73
        self,
74
        scan_id: str,
75
        kbdb: KbDB,
76
        scan_collection: ScanCollection,
77
        nvticache: NVTICache,
78
    ):
79
        self.scan_id = scan_id
80
        self.kbdb = kbdb
81
        self.scan_collection = scan_collection
82
83
        self._openvas_scan_id = None
84
85
        self._target_options = None
86
87
        self.nvti = nvticache
88
89
    def prepare_openvas_scan_id_for_openvas(self):
90
        """ Create the openvas scan id and store it in the redis kb.
91
        Return the openvas scan_id.
92
        """
93
        self._openvas_scan_id = str(uuid.uuid4())
94
        self.kbdb.add_scan_id(self.scan_id, self._openvas_scan_id)
95
96
        return self._openvas_scan_id
97
98
    @property
99
    def target_options(self) -> Dict:
100
        """ Return target options from Scan collection """
101
        if self._target_options is not None:
102
            return self._target_options
103
104
        self._target_options = self.scan_collection.get_target_options(
105
            self.scan_id
106
        )
107
        return self._target_options
108
109
    def _get_vts_in_groups(self, filters: List[str],) -> List[str]:
110
        """ Return a list of vts which match with the given filter.
111
112
        Arguments:
113
            filters A list of filters. Each filter has key, operator and
114
                    a value. They are separated by a space.
115
                    Supported keys: family
116
117
        Returns a list of vt oids which match with the given filter.
118
        """
119
        vts_list = list()
120
        families = dict()
121
122
        oids = self.nvti.get_oids()
123
124
        for _, oid in oids:
125
            family = self.nvti.get_nvt_family(oid)
126
            if family not in families:
127
                families[family] = list()
128
129
            families[family].append(oid)
130
131
        for elem in filters:
132
            key, value = elem.split('=')
133
            if key == 'family' and value in families:
134
                vts_list.extend(families[value])
135
136
        return vts_list
137
138
    def _get_vt_param_type(self, vt: Dict, vt_param_id: str) -> Optional[str]:
139
        """ Return the type of the vt parameter from the vts dictionary. """
140
141
        vt_params_list = vt.get("vt_params")
142
        if vt_params_list.get(vt_param_id):
143
            return vt_params_list[vt_param_id]["type"]
144
        return None
145
146
    def _get_vt_param_name(self, vt: Dict, vt_param_id: str) -> Optional[str]:
147
        """ Return the type of the vt parameter from the vts dictionary. """
148
149
        vt_params_list = vt.get("vt_params")
150
        if vt_params_list.get(vt_param_id):
151
            return vt_params_list[vt_param_id]["name"]
152
        return None
153
154
    @staticmethod
155
    def check_param_type(vt_param_value: str, param_type: str) -> Optional[int]:
156
        """ Check if the value of a vt parameter matches with
157
        the type founded.
158
        """
159
        if param_type in [
160
            'entry',
161
            'password',
162
            'radio',
163
            'sshlogin',
164
        ] and isinstance(vt_param_value, str):
165
            return None
166
        elif param_type == 'checkbox' and (
167
            vt_param_value == '0' or vt_param_value == '1'
168
        ):
169
            return None
170
        elif param_type == 'file':
171
            try:
172
                b64decode(vt_param_value.encode())
173
            except (binascii.Error, AttributeError, TypeError):
174
                return 1
175
            return None
176
        elif param_type == 'integer':
177
            try:
178
                int(vt_param_value)
179
            except ValueError:
180
                return 1
181
            return None
182
183
        return 1
184
185
    def _process_vts(
        self, vts: Dict[str, Dict[str, str]],
    ) -> Tuple[List[str], Dict[str, str]]:
        """ Add single VTs and their parameters.

        Arguments:
            vts: Dictionary mapping each selected vt id to its parameters
                 (parameter id -> value). An optional 'vt_groups' entry
                 holds the group filters; it is removed from the given
                 dictionary.

        Return:
            A tuple with the list of vt ids to be launched and a
            dictionary with the vt preferences. Each preference key has
            the format 'vtid:param_id:param_type:param_name', each value
            is the parameter value as string.
        """
        vts_list = []
        vts_params = {}
        # A selection without group filters is valid; do not fail with a
        # KeyError when the 'vt_groups' entry is absent.
        vtgroups = vts.pop('vt_groups', None)

        vthelper = VtHelper(self.nvti)

        if vtgroups:
            vts_list = self._get_vts_in_groups(vtgroups)

        for vtid, vt_params in vts.items():
            vt = vthelper.get_single_vt(vtid)
            if not vt:
                logger.warning(
                    'The VT %s was not found and it will not be added to the '
                    'plugin scheduler.',
                    vtid,
                )
                continue

            vts_list.append(vtid)
            for vt_param_id, vt_param_value in vt_params.items():
                param_type = self._get_vt_param_type(vt, vt_param_id)
                param_name = self._get_vt_param_name(vt, vt_param_id)

                if not param_type or not param_name:
                    logger.debug(
                        'Missing type or name for VT parameter %s of %s. '
                        'This VT parameter will not be set.',
                        vt_param_id,
                        vtid,
                    )
                    continue

                # Parameter id '0' is always validated as an integer,
                # whatever type is reported for it.
                type_aux = 'integer' if vt_param_id == '0' else param_type

                if self.check_param_type(vt_param_value, type_aux):
                    logger.debug(
                        'The VT parameter %s for %s could not be set. '
                        'Expected %s type for parameter value %s',
                        vt_param_id,
                        vtid,
                        type_aux,
                        str(vt_param_value),
                    )
                    continue

                if type_aux == 'checkbox':
                    vt_param_value = _from_bool_to_str(int(vt_param_value))

                # The key carries the reported type, not the type used
                # for the validation above.
                vts_params[
                    "{0}:{1}:{2}:{3}".format(
                        vtid, vt_param_id, param_type, param_name
                    )
                ] = str(vt_param_value)

        return vts_list, vts_params
248
249
    def prepare_plugins_for_openvas(self) -> bool:
250
        """ Get the plugin list to be launched from the Scan Collection
251
        and prepare the vts preferences. Store the data in the kb.
252
        """
253
        nvts = self.scan_collection.get_vts(self.scan_id)
254
        if nvts:
255
            nvts_list, nvts_params = self._process_vts(nvts)
256
            # Add nvts list
257
            separ = ';'
258
            plugin_list = 'plugin_set|||%s' % separ.join(nvts_list)
259
            self.kbdb.add_scan_preferences(self._openvas_scan_id, [plugin_list])
260
261
            # Add nvts parameters
262
            for key, val in nvts_params.items():
263
                item = '%s|||%s' % (key, val)
264
                self.kbdb.add_scan_preferences(self._openvas_scan_id, [item])
265
266
            nvts_params = None
267
            nvts_list = None
268
            item = None
269
            plugin_list = None
270
            nvts = None
271
272
            return True
273
274
        return False
275
276
    @staticmethod
277
    def build_alive_test_opt_as_prefs(
278
        target_options: Dict[str, str]
279
    ) -> List[str]:
280
        """ Parse the target options dictionary.
281
        Arguments:
282
            target_options: Dictionary with the target options.
283
284
        Return:
285
            A list with the target options related to alive test method
286
            in string format to be added to the redis KB.
287
        """
288
        target_opt_prefs_list = []
289
        if target_options and target_options.get('alive_test'):
290
            try:
291
                alive_test = int(target_options.get('alive_test'))
292
            except ValueError:
293
                logger.debug(
294
                    'Alive test settings not applied. '
295
                    'Invalid alive test value %s',
296
                    target_options.get('alive_test'),
297
                )
298
                return target_opt_prefs_list
299
300
            if alive_test < 1 or alive_test > 31:
301
                return target_opt_prefs_list
302
303
            if (
304
                alive_test & AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
305
                or alive_test & AliveTest.ALIVE_TEST_TCP_SYN_SERVICE
306
            ):
307
                value = "yes"
308
            else:
309
                value = "no"
310
            target_opt_prefs_list.append(
311
                OID_PING_HOST
312
                + ':1:checkbox:'
313
                + 'Do a TCP ping|||'
314
                + '{0}'.format(value)
315
            )
316
317
            if (
318
                alive_test & AliveTest.ALIVE_TEST_TCP_SYN_SERVICE
319
                and alive_test & AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
320
            ):
321
                value = "yes"
322
            else:
323
                value = "no"
324
            target_opt_prefs_list.append(
325
                OID_PING_HOST
326
                + ':2:checkbox:'
327
                + 'TCP ping tries also TCP-SYN ping|||'
328
                + '{0}'.format(value)
329
            )
330
331
            if (alive_test & AliveTest.ALIVE_TEST_TCP_SYN_SERVICE) and not (
332
                alive_test & AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
333
            ):
334
                value = "yes"
335
            else:
336
                value = "no"
337
            target_opt_prefs_list.append(
338
                OID_PING_HOST
339
                + ':7:checkbox:'
340
                + 'TCP ping tries only TCP-SYN ping|||'
341
                + '{0}'.format(value)
342
            )
343
344
            if alive_test & AliveTest.ALIVE_TEST_ICMP:
345
                value = "yes"
346
            else:
347
                value = "no"
348
            target_opt_prefs_list.append(
349
                OID_PING_HOST
350
                + ':3:checkbox:'
351
                + 'Do an ICMP ping|||'
352
                + '{0}'.format(value)
353
            )
354
355
            if alive_test & AliveTest.ALIVE_TEST_ARP:
356
                value = "yes"
357
            else:
358
                value = "no"
359
            target_opt_prefs_list.append(
360
                OID_PING_HOST
361
                + ':4:checkbox:'
362
                + 'Use ARP|||'
363
                + '{0}'.format(value)
364
            )
365
366
            if alive_test & AliveTest.ALIVE_TEST_CONSIDER_ALIVE:
367
                value = "no"
368
            else:
369
                value = "yes"
370
            target_opt_prefs_list.append(
371
                OID_PING_HOST
372
                + ':5:checkbox:'
373
                + 'Mark unrechable Hosts as dead (not scanning)|||'
374
                + '{0}'.format(value)
375
            )
376
377
            # Also select a method, otherwise Ping Host logs a warning.
378
            if alive_test == AliveTest.ALIVE_TEST_CONSIDER_ALIVE:
379
                target_opt_prefs_list.append(
380
                    OID_PING_HOST + ':1:checkbox:' + 'Do a TCP ping|||yes'
381
                )
382
        return target_opt_prefs_list
383
384
    def prepare_alive_test_option_for_openvas(self):
        """ Set alive test option. Overwrite the scan config settings.

        Nothing is stored if the OpenVAS settings are not available, if
        the target has no alive_test option or if that option does not
        translate into any preference.
        """
        settings = Openvas.get_settings()
        if not settings or not self.target_options.get('alive_test'):
            return

        alive_test_opt = self.build_alive_test_opt_as_prefs(
            self.target_options
        )
        # An invalid or out of range alive_test yields an empty list. Skip
        # the kb write then, like the other prepare methods which check
        # their preference list before storing it.
        if alive_test_opt:
            self.kbdb.add_scan_preferences(
                self._openvas_scan_id, alive_test_opt
            )
394
395
    def prepare_boreas_alive_test(self):
        """ Store the alive test for Boreas in the kb.

        Nothing is stored if the OpenVAS settings are not available, if
        the boreas scanner config (BOREAS_SETTING_NAME) is not set, or if
        the alive_test target option is not a number between 0 and 31.
        Without an alive_test target option, or with the value 0 (scan
        config default), the ICMP alive test is stored.
        """
        settings = Openvas.get_settings()
        if not settings or not settings.get(BOREAS_SETTING_NAME):
            return

        alive_test_str = self.target_options.get('alive_test')
        if alive_test_str is None:
            # ALIVE_TEST_SCAN_CONFIG_DEFAULT if no alive_test provided
            alive_test = AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT
        else:
            try:
                alive_test = int(alive_test_str)
            except ValueError:
                logger.debug(
                    'Alive test preference for Boreas not set. '
                    'Invalid alive test value %s.',
                    alive_test_str,
                )
                return

        if alive_test == AliveTest.ALIVE_TEST_SCAN_CONFIG_DEFAULT:
            alive_test = AliveTest.ALIVE_TEST_ICMP
        elif not 1 <= alive_test <= 31:
            # A valid bit mask lies between 1 (00001) and 31 (11111).
            return

        pref = "{pref_key}|||{pref_value}".format(
            pref_key=BOREAS_ALIVE_TEST, pref_value=alive_test
        )
        self.kbdb.add_scan_preferences(self._openvas_scan_id, [pref])
433
434
    def prepare_reverse_lookup_opt_for_openvas(self):
        """ Set reverse lookup options in the kb.

        Without target options nothing is stored. Otherwise both
        reverse_lookup_only and reverse_lookup_unify are stored as yes or
        no; an option missing in the target options counts as 0.
        """
        options = self.target_options
        if not options:
            return

        items = [
            '%s|||%s' % (name, _from_bool_to_str(int(options.get(name, '0'))))
            for name in ('reverse_lookup_only', 'reverse_lookup_unify')
        ]
        self.kbdb.add_scan_preferences(self._openvas_scan_id, items)
451
452
    def prepare_target_for_openvas(self):
453
        """ Get the target from the scan collection and set the target
454
        in the kb """
455
456
        target = self.scan_collection.get_host_list(self.scan_id)
457
        target_aux = 'TARGET|||%s' % target
458
        self.kbdb.add_scan_preferences(self._openvas_scan_id, [target_aux])
459
460
    def prepare_ports_for_openvas(self) -> str:
461
        """ Get the port list from the scan collection and store the list
462
        in the kb. """
463
        ports = self.scan_collection.get_ports(self.scan_id)
464
        port_range = 'port_range|||%s' % ports
465
        self.kbdb.add_scan_preferences(self._openvas_scan_id, [port_range])
466
467
        return ports
468
469
    def prepare_host_options_for_openvas(self):
470
        """ Get the excluded and finished hosts from the scan collection and
471
        stores the list of hosts that must not be scanned in the kb. """
472
        exclude_hosts = self.scan_collection.get_exclude_hosts(self.scan_id)
473
474
        if exclude_hosts:
475
            pref_val = "exclude_hosts|||" + exclude_hosts
476
            self.kbdb.add_scan_preferences(self._openvas_scan_id, [pref_val])
477
478
    def prepare_scan_params_for_openvas(self, ospd_params: Dict[str, Dict]):
479
        """ Get the scan parameters from the scan collection and store them
480
        in the kb.
481
        Arguments:
482
            ospd_params: Dictionary with the OSPD Params.
483
        """
484
        # Options which were supplied via the <scanner_params> XML element.
485
        options = self.scan_collection.get_options(self.scan_id)
486
        prefs_val = []
487
488
        for key, value in options.items():
489
            item_type = ''
490
            if key in ospd_params:
491
                item_type = ospd_params[key].get('type')
492
            else:
493
                if key not in BASE_SCANNER_PARAMS:
494
                    logger.debug(
495
                        "%s is a scanner only setting and should not be set "
496
                        "by the client. Setting needs to be included in "
497
                        "OpenVAS configuration file instead.",
498
                        key,
499
                    )
500
            if item_type == 'boolean':
501
                val = _from_bool_to_str(value)
502
            else:
503
                val = str(value)
504
            prefs_val.append(key + "|||" + val)
505
506
        if prefs_val:
507
            self.kbdb.add_scan_preferences(self._openvas_scan_id, prefs_val)
508
509
    @staticmethod
510
    def build_credentials_as_prefs(credentials: Dict) -> List[str]:
511
        """ Parse the credential dictionary.
512
        Arguments:
513
            credentials: Dictionary with the credentials.
514
515
        Return:
516
            A list with the credentials in string format to be
517
            added to the redis KB.
518
        """
519
        cred_prefs_list = []
520
        for credential in credentials.items():
521
            service = credential[0]
522
            cred_params = credentials.get(service)
523
            cred_type = cred_params.get('type', '')
524
            username = cred_params.get('username', '')
525
            password = cred_params.get('password', '')
526
527
            if service == 'ssh':
528
                port = cred_params.get('port', '')
529
                cred_prefs_list.append('auth_port_ssh|||' + '{0}'.format(port))
530
                cred_prefs_list.append(
531
                    OID_SSH_AUTH
532
                    + ':1:'
533
                    + 'entry:SSH login '
534
                    + 'name:|||{0}'.format(username)
535
                )
536
                if cred_type == 'up':
537
                    cred_prefs_list.append(
538
                        OID_SSH_AUTH
539
                        + ':3:'
540
                        + 'password:SSH password '
541
                        + '(unsafe!):|||{0}'.format(password)
542
                    )
543
                else:
544
                    private = cred_params.get('private', '')
545
                    cred_prefs_list.append(
546
                        OID_SSH_AUTH
547
                        + ':2:'
548
                        + 'password:SSH key passphrase:|||'
549
                        + '{0}'.format(password)
550
                    )
551
                    cred_prefs_list.append(
552
                        OID_SSH_AUTH
553
                        + ':4:'
554
                        + 'file:SSH private key:|||'
555
                        + '{0}'.format(private)
556
                    )
557
            if service == 'smb':
558
                cred_prefs_list.append(
559
                    OID_SMB_AUTH
560
                    + ':1:entry'
561
                    + ':SMB login:|||{0}'.format(username)
562
                )
563
                cred_prefs_list.append(
564
                    OID_SMB_AUTH
565
                    + ':2:'
566
                    + 'password:SMB password:|||'
567
                    + '{0}'.format(password)
568
                )
569
            if service == 'esxi':
570
                cred_prefs_list.append(
571
                    OID_ESXI_AUTH
572
                    + ':1:entry:'
573
                    + 'ESXi login name:|||'
574
                    + '{0}'.format(username)
575
                )
576
                cred_prefs_list.append(
577
                    OID_ESXI_AUTH
578
                    + ':2:'
579
                    + 'password:ESXi login password:|||'
580
                    + '{0}'.format(password)
581
                )
582
583
            if service == 'snmp':
584
                community = cred_params.get('community', '')
585
                auth_algorithm = cred_params.get('auth_algorithm', '')
586
                privacy_password = cred_params.get('privacy_password', '')
587
                privacy_algorithm = cred_params.get('privacy_algorithm', '')
588
589
                cred_prefs_list.append(
590
                    OID_SNMP_AUTH
591
                    + ':1:'
592
                    + 'password:SNMP Community:|||'
593
                    + '{0}'.format(community)
594
                )
595
                cred_prefs_list.append(
596
                    OID_SNMP_AUTH
597
                    + ':2:'
598
                    + 'entry:SNMPv3 Username:|||'
599
                    + '{0}'.format(username)
600
                )
601
                cred_prefs_list.append(
602
                    OID_SNMP_AUTH + ':3:'
603
                    'password:SNMPv3 Password:|||' + '{0}'.format(password)
604
                )
605
                cred_prefs_list.append(
606
                    OID_SNMP_AUTH
607
                    + ':4:'
608
                    + 'radio:SNMPv3 Authentication Algorithm:|||'
609
                    + '{0}'.format(auth_algorithm)
610
                )
611
                cred_prefs_list.append(
612
                    OID_SNMP_AUTH
613
                    + ':5:'
614
                    + 'password:SNMPv3 Privacy Password:|||'
615
                    + '{0}'.format(privacy_password)
616
                )
617
                cred_prefs_list.append(
618
                    OID_SNMP_AUTH
619
                    + ':6:'
620
                    + 'radio:SNMPv3 Privacy Algorithm:|||'
621
                    + '{0}'.format(privacy_algorithm)
622
                )
623
624
        return cred_prefs_list
625
626
    def prepare_credentials_for_openvas(self) -> bool:
627
        """ Get the credentials from the scan collection and store them
628
        in the kb. """
629
        credentials = self.scan_collection.get_credentials(self.scan_id)
630
        if credentials:
631
            cred_prefs = self.build_credentials_as_prefs(credentials)
632
            if cred_prefs:
633
                self.kbdb.add_credentials_to_scan_preferences(
634
                    self._openvas_scan_id, cred_prefs
635
                )
636
637
        if credentials and not cred_prefs:
0 ignored issues
show
introduced by
The variable cred_prefs does not seem to be defined in case credentials on line 630 is False. Are you sure this can never be the case?
Loading history...
638
            return False
639
640
        return True
641
642
    def prepare_main_kbindex_for_openvas(self):
643
        """ Store main_kbindex as global preference in the
644
        kb, used by OpenVAS"""
645
        ov_maindbid = 'ov_maindbid|||%d' % self.kbdb.index
646
        self.kbdb.add_scan_preferences(self._openvas_scan_id, [ov_maindbid])
647