Completed
Push — master ( f571c7...585a62 )
by
unknown
17s queued 12s
created

ospd.parser.CliParser.str2bool()   A

Complexity

Conditions 3

Size

Total Lines 7
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 6
nop 2
dl 0
loc 7
rs 10
c 0
b 0
f 0
1
# Copyright (C) 2014-2020 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: AGPL-3.0-or-later
4
#
5
# This program is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Affero General Public License as
7
# published by the Free Software Foundation, either version 3 of the
8
# License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
import argparse
19
import logging
20
from pathlib import Path
21
from typing import Union
22
23
from ospd.config import Config
24
25
# Default file locations as used by a OpenVAS default installation
26
DEFAULT_KEY_FILE = "/usr/var/lib/gvm/private/CA/serverkey.pem"
27
DEFAULT_CERT_FILE = "/usr/var/lib/gvm/CA/servercert.pem"
28
DEFAULT_CA_FILE = "/usr/var/lib/gvm/CA/cacert.pem"
29
30
DEFAULT_PORT = 0
31
DEFAULT_ADDRESS = "0.0.0.0"
32
DEFAULT_NICENESS = 10
33
DEFAULT_UNIX_SOCKET_MODE = "0o700"
34
DEFAULT_CONFIG_PATH = "~/.config/ospd.conf"
35
DEFAULT_UNIX_SOCKET_PATH = "/var/run/ospd/ospd.sock"
36
DEFAULT_PID_PATH = "/var/run/ospd.pid"
37
DEFAULT_LOCKFILE_DIR_PATH = "/var/run/ospd"
38
DEFAULT_STREAM_TIMEOUT = 10  # ten seconds
39
DEFAULT_SCANINFO_STORE_TIME = 0  # in hours
40
DEFAULT_MAX_SCAN = 0  # 0 = disable
41
42
ParserType = argparse.ArgumentParser
43
Arguments = argparse.Namespace
44
45
logger = logging.getLogger(__name__)
46
47
48
class CliParser:
49
    def __init__(self, description: str) -> None:
50
        """ Create a command-line arguments parser for OSPD. """
51
        self._name = description
52
        parser = argparse.ArgumentParser(description=description)
53
54
        parser.add_argument(
55
            '--version', action='store_true', help='Print version then exit.'
56
        )
57
58
        parser.add_argument(
59
            '-s',
60
            '--config',
61
            nargs='?',
62
            default=DEFAULT_CONFIG_PATH,
63
            help='Configuration file path (default: %(default)s)',
64
        )
65
66
        parser.add_argument(
67
            '-p',
68
            '--port',
69
            default=DEFAULT_PORT,
70
            type=self.network_port,
71
            help='TCP Port to listen on. Default: %(default)s',
72
        )
73
        parser.add_argument(
74
            '-b',
75
            '--bind-address',
76
            default=DEFAULT_ADDRESS,
77
            dest='address',
78
            help='Address to listen on. Default: %(default)s',
79
        )
80
        parser.add_argument(
81
            '-u',
82
            '--unix-socket',
83
            default=DEFAULT_UNIX_SOCKET_PATH,
84
            help='Unix file socket to listen on. Default: %(default)s',
85
        )
86
        parser.add_argument(
87
            '--pid-file',
88
            default=DEFAULT_PID_PATH,
89
            help='Location of the file for the process ID. '
90
            'Default: %(default)s',
91
        )
92
        parser.add_argument(
93
            '--lock-file-dir',
94
            default=DEFAULT_LOCKFILE_DIR_PATH,
95
            help='Directory where lock files are placed. Default: %(default)s',
96
        )
97
        parser.add_argument(
98
            '-m',
99
            '--socket-mode',
100
            default=DEFAULT_UNIX_SOCKET_MODE,
101
            help='Unix file socket mode. Default: %(default)s',
102
        )
103
        parser.add_argument(
104
            '-k',
105
            '--key-file',
106
            default=DEFAULT_KEY_FILE,
107
            help='Server key file. Default: %(default)s',
108
        )
109
        parser.add_argument(
110
            '-c',
111
            '--cert-file',
112
            default=DEFAULT_CERT_FILE,
113
            help='Server cert file. Default: %(default)s',
114
        )
115
        parser.add_argument(
116
            '--ca-file',
117
            default=DEFAULT_CA_FILE,
118
            help='CA cert file. Default: %(default)s',
119
        )
120
        parser.add_argument(
121
            '-L',
122
            '--log-level',
123
            default='WARNING',
124
            type=self.log_level,
125
            help='Wished level of logging. Default: %(default)s',
126
        )
127
        parser.add_argument(
128
            '-f',
129
            '--foreground',
130
            action='store_true',
131
            help='Run in foreground and logs all messages to console.',
132
        )
133
        parser.add_argument(
134
            '-t',
135
            '--stream-timeout',
136
            default=DEFAULT_STREAM_TIMEOUT,
137
            type=int,
138
            help='Stream timeout. Default: %(default)s',
139
        )
140
        parser.add_argument(
141
            '-l', '--log-file', help='Path to the logging file.'
142
        )
143
        parser.add_argument(
144
            '--niceness',
145
            default=DEFAULT_NICENESS,
146
            type=int,
147
            help='Start the scan with the given niceness. Default %(default)s',
148
        )
149
        parser.add_argument(
150
            '--scaninfo-store-time',
151
            default=DEFAULT_SCANINFO_STORE_TIME,
152
            type=int,
153
            help='Time in hours a scan is stored before being considered '
154
            'forgotten and being delete from the scan table. '
155
            'Default %(default)s, disabled.',
156
        )
157
        parser.add_argument(
158
            '--list-commands',
159
            action='store_true',
160
            help='Display all protocol commands',
161
        )
162
        parser.add_argument(
163
            '--max-scans',
164
            default=DEFAULT_MAX_SCAN,
165
            type=int,
166
            help='Max. amount of parallel task that can be started. '
167
            'Default %(default)s, disabled',
168
        )
169
        parser.add_argument(
170
            '--check-free-memory',
171
            default=False,
172
            type=self.str2bool,
173
            help='Check if there is enough free memory to run the scan. '
174
            'This is an experimental feature. '
175
            'Default %(default)s, disabled',
176
        )
177
178
        self.parser = parser
179
180
    def network_port(self, string: str) -> int:
181
        """ Check if provided string is a valid network port. """
182
183
        value = int(string)
184
        if not 0 < value <= 65535:
185
            raise argparse.ArgumentTypeError(
186
                'port must be in ]0,65535] interval'
187
            )
188
        return value
189
190
    def str2bool(self, value: Union[int, str, bool]) -> bool:
191
        """ Check if provided string is a valid bool value. """
192
        if isinstance(value, bool):
193
            return value
194
        if value.lower() in ('yes', 'true', 't', 'y', '1'):
195
            return True
196
        return False
197
198
    def log_level(self, string: str) -> int:
199
        """ Check if provided string is a valid log level. """
200
201
        value = getattr(logging, string.upper(), None)
202
        if not isinstance(value, int):
203
            raise argparse.ArgumentTypeError(
204
                'log level must be one of {debug,info,warning,error,critical}'
205
            )
206
        return value
207
208
    def _set_defaults(self, configfilename=None) -> None:
209
        self._config = self._load_config(configfilename)
210
        self.parser.set_defaults(**self._config.defaults())
211
212
    def _load_config(self, configfile: str) -> Config:
213
        config = Config()
214
215
        if not configfile:
216
            return config
217
218
        configpath = Path(configfile)
219
220
        try:
221
            if not configpath.expanduser().resolve().exists():
222
                logger.debug('Ignoring non existing config file %s', configfile)
223
                return config
224
        except FileNotFoundError:
225
            # we are on python 3.5 and Path.resolve raised a FileNotFoundError
226
            logger.debug('Ignoring non existing config file %s', configfile)
227
            return config
228
229
        try:
230
            config.load(configpath, def_section=self._name)
231
            logger.debug('Loaded config %s', configfile)
232
        except Exception as e:  # pylint: disable=broad-except
233
            raise RuntimeError(
234
                'Error while parsing config file {config}. Error was '
235
                '{message}'.format(config=configfile, message=e)
236
            )
237
238
        return config
239
240
    def parse_arguments(self, args=None):
241
        # Parse args to get the config file path passed as option
242
        _args, _ = self.parser.parse_known_args(args)
243
244
        # Load the defaults from the config file if it exists.
245
        # This override also what it was passed as cmd option.
246
        self._set_defaults(_args.config)
247
        args, _ = self.parser.parse_known_args(args)
248
249
        return args
250
251
252
def create_parser(description: str) -> CliParser:
253
    return CliParser(description)
254