Passed
Pull Request — master (#374)
by
unknown
01:23
created

Openvas.get_gvm_libs_version()   A

Complexity

Conditions 3

Size

Total Lines 15
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 11
nop 1
dl 0
loc 15
rs 9.85
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
import logging
21
import subprocess
22
23
from typing import Optional, Dict, Any
24
25
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
26
27
# Maps the 'yes'/'no' setting values to integers; Openvas.get_settings()
# looks every parsed value up here and keeps any other value unchanged.
_BOOL_DICT = {'no': 0, 'yes': 1}
28
29
30
class Openvas:
    """Class for calling the openvas executable.

    Every method is a static or class method, so the class is used as a
    namespace and never needs to be instantiated. Failures to run the
    executable are logged and reported through the return value instead of
    being raised.
    """

    @staticmethod
    def _get_version_output() -> Optional[str]:
        """Return the raw output of ``openvas -V``.

        stderr is merged into the captured output.

        Returns:
            The decoded output, or None if the executable could not be
            called or exited with a non-zero status.
        """
        try:
            result = subprocess.check_output(
                ['openvas', '-V'], stderr=subprocess.STDOUT
            )
        except (subprocess.SubprocessError, OSError) as e:
            logger.debug(
                'It was not possible to call openvas to get the version '
                'information. Reason %s',
                e,
            )
            return None

        # A strict ASCII decode would raise UnicodeDecodeError (which is not
        # handled above) on any non-ASCII byte, e.g. in a message written to
        # the merged stderr. Replace such bytes instead of crashing.
        return result.decode('ascii', errors='replace')

    @staticmethod
    def _scan_command(
        action: str, scan_id: str, sudo: bool = False, niceness=None
    ) -> list:
        """Build the argument list for a scan related openvas call.

        Args:
            action: The openvas option to use, e.g. ``--scan-start``.
            scan_id: Identifier of the scan, passed after ``action``.
            sudo: If true, run openvas through ``sudo -n``.
            niceness: If truthy, run the command through ``nice -n`` with
                this value.

        Returns:
            A list of strings suitable for the subprocess functions.
        """
        cmd = []

        if niceness:
            # Every argv entry must be a string: passing an int makes
            # subprocess raise a TypeError.
            cmd += ['nice', '-n', str(niceness)]

        if sudo:
            cmd += ['sudo', '-n']

        cmd += ['openvas', action, scan_id]
        return cmd

    @staticmethod
    def check() -> bool:
        """Checks that openvas command line tool is found and
        is executable.

        Returns:
            True if ``openvas -V`` ran and exited with status zero,
            False otherwise.
        """
        try:
            subprocess.check_call(['openvas', '-V'], stdout=subprocess.DEVNULL)
        except (subprocess.SubprocessError, OSError) as e:
            logger.debug(
                'It was not possible to call the openvas executable. Reason %s',
                e,
            )
            return False

        return True

    @staticmethod
    def check_sudo() -> bool:
        """Checks if openvas can be run with sudo.

        Returns:
            True if ``sudo -n openvas -s`` exited with status zero,
            False otherwise.
        """
        try:
            subprocess.check_call(
                ['sudo', '-n', 'openvas', '-s'], stdout=subprocess.DEVNULL
            )
        except (subprocess.SubprocessError, OSError) as e:
            logger.debug(
                'It was not possible to call openvas with sudo. '
                'The scanner will run as non-root user. Reason %s',
                e,
            )
            return False

        return True

    @classmethod
    def get_version(cls) -> Optional[str]:
        """Returns the version string of the openvas executable.

        Returns:
            The first line of the ``openvas -V`` output if it contains
            'OpenVAS', otherwise None.
        """
        result = cls._get_version_output()

        if result is None:
            return None

        first_line = result.split('\n')[0]
        if 'OpenVAS' not in first_line:
            return None

        return first_line

    @staticmethod
    def get_settings() -> Dict[str, Any]:
        """Parses the current settings of the openvas executable.

        The output of ``openvas -s`` is read line by line as ``key = value``
        pairs. Settings with an empty value are skipped and the values
        'yes' and 'no' are converted to 1 and 0.

        Returns:
            A dict mapping setting names to their values. The dict is
            empty if openvas could not be called.
        """
        param_list = {}

        try:
            result = subprocess.check_output(['openvas', '-s'])
        except (subprocess.SubprocessError, OSError) as e:
            logger.warning('Could not gather openvas settings. Reason %s', e)
            return param_list

        # Do not let a stray non-ASCII byte abort the whole parsing with an
        # unhandled UnicodeDecodeError.
        for conf in result.decode('ascii', errors='replace').split('\n'):
            if not conf:
                continue

            key, separator, value = conf.partition('=')
            if not separator:
                logger.warning("Could not parse openvas setting '%s'", conf)
                continue

            value = value.strip()
            if value:
                param_list[key.strip()] = _BOOL_DICT.get(value, value)

        return param_list

    @staticmethod
    def load_vts_into_redis():
        """Loads all VTs into the redis database.

        Runs ``openvas --update-vt-info``. A failure is logged as an error
        and not raised.
        """
        logger.debug('Loading VTs into Redis DB...')

        try:
            subprocess.check_call(
                ['openvas', '--update-vt-info'], stdout=subprocess.DEVNULL
            )
            logger.debug('Finished loading VTs into Redis DB')
        except (subprocess.SubprocessError, OSError) as err:
            logger.error('OpenVAS Scanner failed to load VTs. %s', err)

    @staticmethod
    def start_scan(
        scan_id: str, sudo: bool = False, niceness: Optional[int] = None
    ) -> Optional[subprocess.Popen]:
        """Calls openvas to start a scan process.

        Args:
            scan_id: Identifier of the scan to start.
            sudo: If true, run openvas through ``sudo -n``.
            niceness: If truthy, start the process through ``nice -n`` with
                this value. It is converted to a string, so an int as well
                as a numeric string is accepted.

        Returns:
            The Popen object of the started process, or None if the
            process could not be started.
        """
        if niceness:
            logger.debug("Starting scan with niceness %s", niceness)

        cmd = Openvas._scan_command('--scan-start', scan_id, sudo, niceness)

        try:
            return subprocess.Popen(cmd, shell=False)
        except (subprocess.SubprocessError, OSError) as e:
            # the command is not available
            logger.warning("Could not start scan process. Reason %s", e)
            return None

    @staticmethod
    def stop_scan(scan_id: str, sudo: bool = False) -> bool:
        """Calls openvas to stop a scan process.

        Args:
            scan_id: Identifier of the scan to stop.
            sudo: If true, run openvas through ``sudo -n``.

        Returns:
            True if the openvas call exited with status zero,
            False otherwise.
        """
        cmd = Openvas._scan_command('--scan-stop', scan_id, sudo)

        try:
            subprocess.check_call(cmd)
        except (subprocess.SubprocessError, OSError) as e:
            # the command is not available
            logger.warning(
                'Not possible to stop scan: %s. Reason %s',
                scan_id,
                e,
            )
            return False

        return True

    @classmethod
    def get_gvm_libs_version(cls) -> Optional[str]:
        """Parse version of gvm-libs.

        Looks for the first line of the ``openvas -V`` output that mentions
        'gvm-libs' and returns the text following that name. The line is
        searched for instead of assuming a fixed position, so short or
        differently ordered output does not raise.

        Returns:
            The gvm-libs version string, or None if there is no output, no
            line mentions gvm-libs, or nothing follows the name.
        """
        result = cls._get_version_output()
        if not result:
            return None

        for line in result.splitlines():
            if 'gvm-libs' in line:
                version_string = line.partition('gvm-libs')[2].strip()
                return version_string or None

        return None
197