ospd_openvas.openvas   A
last analyzed

Complexity

Total Complexity 24

Size/Duplication

Total Lines 184
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 113
dl 0
loc 184
rs 10
c 0
b 0
f 0
wmc 24

8 Methods

Rating   Name   Duplication   Size   Complexity  
A Openvas._get_version_output() 0 14 2
A Openvas.check() 0 14 2
A Openvas.stop_scan() 0 21 3
A Openvas.get_version() 0 13 3
A Openvas.check_sudo() 0 15 2
A Openvas.start_scan() 0 24 4
A Openvas.load_vts_into_redis() 0 12 2
B Openvas.get_settings() 0 30 6
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
from typing import Optional, Dict, Any
20
21
import logging
22
import subprocess
23
import psutil
24
25
26
logger = logging.getLogger(__name__)
27
28
_BOOL_DICT = {'no': 0, 'yes': 1}
29
30
31
class Openvas:
32
    """Class for calling the openvas executable"""
33
34
    @staticmethod
35
    def _get_version_output() -> Optional[str]:
36
        try:
37
            result = subprocess.check_output(
38
                ['openvas', '-V'], stderr=subprocess.STDOUT
39
            )
40
            return result.decode('ascii')
41
        except (subprocess.SubprocessError, OSError) as e:
42
            logger.debug(
43
                'Is was not possible to call openvas to get the version '
44
                'information. Reason %s',
45
                e,
46
            )
47
            return None
48
49
    @staticmethod
50
    def check() -> bool:
51
        """Checks that openvas command line tool is found and
52
        is executable.
53
        """
54
        try:
55
            subprocess.check_call(['openvas', '-V'], stdout=subprocess.DEVNULL)
56
            return True
57
        except (subprocess.SubprocessError, OSError) as e:
58
            logger.debug(
59
                'It was not possible to call the openvas executable. Reason %s',
60
                e,
61
            )
62
            return False
63
64
    @staticmethod
65
    def check_sudo() -> bool:
66
        """Checks if openvas can be run with sudo"""
67
        try:
68
            subprocess.check_call(
69
                ['sudo', '-n', 'openvas', '-s'], stdout=subprocess.DEVNULL
70
            )
71
            return True
72
        except (subprocess.SubprocessError, OSError) as e:
73
            logger.debug(
74
                'It was not possible to call openvas with sudo. '
75
                'The scanner will run as non-root user. Reason %s',
76
                e,
77
            )
78
            return False
79
80
    @classmethod
81
    def get_version(cls) -> Optional[str]:
82
        """Returns the version string of the openvas executable"""
83
        result = cls._get_version_output()
84
85
        if result is None:
86
            return None
87
88
        version = result.split('\n')
89
        if version[0].find('OpenVAS') < 0:
90
            return None
91
92
        return version[0]
93
94
    @staticmethod
95
    def get_settings() -> Dict[str, Any]:
96
        """Parses the current settings of the openvas executable"""
97
        param_list = dict()
98
99
        try:
100
            result = subprocess.check_output(['openvas', '-s'])
101
            result = result.decode('ascii')
102
        except (subprocess.SubprocessError, OSError, UnicodeDecodeError) as e:
103
            logger.warning('Could not gather openvas settings. Reason %s', e)
104
            return param_list
105
106
        for conf in result.split('\n'):
107
            if not conf:
108
                continue
109
110
            try:
111
                key, value = conf.split('=', 1)
112
            except ValueError:
113
                logger.warning("Could not parse openvas setting '%s'", conf)
114
                continue
115
116
            key = key.strip()
117
            value = value.strip()
118
119
            if value:
120
                value = _BOOL_DICT.get(value, value)
121
                param_list[key] = value
122
123
        return param_list
124
125
    @staticmethod
126
    def load_vts_into_redis():
127
        """Loads all VTs into the redis database"""
128
        logger.debug('Loading VTs into Redis DB...')
129
130
        try:
131
            subprocess.check_call(
132
                ['openvas', '--update-vt-info'], stdout=subprocess.DEVNULL
133
            )
134
            logger.debug('Finished loading VTs into Redis DB')
135
        except (subprocess.SubprocessError, OSError) as err:
136
            logger.error('OpenVAS Scanner failed to load VTs. %s', err)
137
138
    @staticmethod
139
    def start_scan(
140
        scan_id: str,
141
        sudo: bool = False,
142
        niceness: int = None,
143
    ) -> Optional[psutil.Popen]:
144
        """Calls openvas to start a scan process"""
145
        cmd = []
146
147
        if niceness:
148
            cmd += ['nice', '-n', niceness]
149
            logger.debug("Starting scan with niceness %s", niceness)
150
151
        if sudo:
152
            cmd += ['sudo', '-n']
153
154
        cmd += ['openvas', '--scan-start', scan_id]
155
156
        try:
157
            return psutil.Popen(cmd, shell=False)
158
        except (psutil.Error, OSError, FileNotFoundError) as e:
159
            # the command is not available
160
            logger.warning("Could not start scan process. Reason %s", e)
161
            return None
162
163
    @staticmethod
164
    def stop_scan(scan_id: str, sudo: bool = False) -> bool:
165
        """Calls openvas to stop a scan process"""
166
        cmd = []
167
168
        if sudo:
169
            cmd += ['sudo', '-n']
170
171
        cmd += ['openvas', '--scan-stop', scan_id]
172
173
        try:
174
            subprocess.check_call(cmd)
175
            return True
176
        except (subprocess.SubprocessError, OSError) as e:
177
            # the command is not available
178
            logger.warning(
179
                'Not possible to stop scan: %s. Reason %s',
180
                scan_id,
181
                e,
182
            )
183
            return False
184