Completed
Push — master ( 1ed431...35dbab )
by
unknown
17s queued 13s
created

ospd.parser.create_parser()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
# Copyright (C) 2014-2020 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: AGPL-3.0-or-later
4
#
5
# This program is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Affero General Public License as
7
# published by the Free Software Foundation, either version 3 of the
8
# License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
import argparse
19
import logging
20
from pathlib import Path
21
22
from ospd.config import Config
23
24
# Default file locations as used by a OpenVAS default installation
25
DEFAULT_KEY_FILE = "/usr/var/lib/gvm/private/CA/serverkey.pem"
26
DEFAULT_CERT_FILE = "/usr/var/lib/gvm/CA/servercert.pem"
27
DEFAULT_CA_FILE = "/usr/var/lib/gvm/CA/cacert.pem"
28
29
DEFAULT_PORT = 0
30
DEFAULT_ADDRESS = "0.0.0.0"
31
DEFAULT_NICENESS = 10
32
DEFAULT_UNIX_SOCKET_MODE = "0o700"
33
DEFAULT_CONFIG_PATH = "~/.config/ospd.conf"
34
DEFAULT_UNIX_SOCKET_PATH = "/var/run/ospd/ospd.sock"
35
DEFAULT_PID_PATH = "/var/run/ospd.pid"
36
DEFAULT_LOCKFILE_DIR_PATH = "/var/run/ospd"
37
DEFAULT_STREAM_TIMEOUT = 10  # ten seconds
38
DEFAULT_SCANINFO_STORE_TIME = 0  # in hours
39
DEFAULT_MAX_SCAN = 0  # 0 = disable
40
DEFAULT_MIN_FREE_MEM_SCAN_QUEUE = 0  # 0 = Disable
41
DEFAULT_MAX_QUEUED_SCANS = 0  # 0 = Disable
42
43
ParserType = argparse.ArgumentParser
44
Arguments = argparse.Namespace
45
46
logger = logging.getLogger(__name__)
47
48
49
class CliParser:
50
    def __init__(self, description: str) -> None:
51
        """ Create a command-line arguments parser for OSPD. """
52
        self._name = description
53
        parser = argparse.ArgumentParser(description=description)
54
55
        parser.add_argument(
56
            '--version', action='store_true', help='Print version then exit.'
57
        )
58
59
        parser.add_argument(
60
            '-s',
61
            '--config',
62
            nargs='?',
63
            default=DEFAULT_CONFIG_PATH,
64
            help='Configuration file path (default: %(default)s)',
65
        )
66
67
        parser.add_argument(
68
            '-p',
69
            '--port',
70
            default=DEFAULT_PORT,
71
            type=self.network_port,
72
            help='TCP Port to listen on. Default: %(default)s',
73
        )
74
        parser.add_argument(
75
            '-b',
76
            '--bind-address',
77
            default=DEFAULT_ADDRESS,
78
            dest='address',
79
            help='Address to listen on. Default: %(default)s',
80
        )
81
        parser.add_argument(
82
            '-u',
83
            '--unix-socket',
84
            default=DEFAULT_UNIX_SOCKET_PATH,
85
            help='Unix file socket to listen on. Default: %(default)s',
86
        )
87
        parser.add_argument(
88
            '--pid-file',
89
            default=DEFAULT_PID_PATH,
90
            help='Location of the file for the process ID. '
91
            'Default: %(default)s',
92
        )
93
        parser.add_argument(
94
            '--lock-file-dir',
95
            default=DEFAULT_LOCKFILE_DIR_PATH,
96
            help='Directory where lock files are placed. Default: %(default)s',
97
        )
98
        parser.add_argument(
99
            '-m',
100
            '--socket-mode',
101
            default=DEFAULT_UNIX_SOCKET_MODE,
102
            help='Unix file socket mode. Default: %(default)s',
103
        )
104
        parser.add_argument(
105
            '-k',
106
            '--key-file',
107
            default=DEFAULT_KEY_FILE,
108
            help='Server key file. Default: %(default)s',
109
        )
110
        parser.add_argument(
111
            '-c',
112
            '--cert-file',
113
            default=DEFAULT_CERT_FILE,
114
            help='Server cert file. Default: %(default)s',
115
        )
116
        parser.add_argument(
117
            '--ca-file',
118
            default=DEFAULT_CA_FILE,
119
            help='CA cert file. Default: %(default)s',
120
        )
121
        parser.add_argument(
122
            '-L',
123
            '--log-level',
124
            default='WARNING',
125
            type=self.log_level,
126
            help='Wished level of logging. Default: %(default)s',
127
        )
128
        parser.add_argument(
129
            '-f',
130
            '--foreground',
131
            action='store_true',
132
            help='Run in foreground and logs all messages to console.',
133
        )
134
        parser.add_argument(
135
            '-t',
136
            '--stream-timeout',
137
            default=DEFAULT_STREAM_TIMEOUT,
138
            type=int,
139
            help='Stream timeout. Default: %(default)s',
140
        )
141
        parser.add_argument(
142
            '-l', '--log-file', help='Path to the logging file.'
143
        )
144
        parser.add_argument(
145
            '--niceness',
146
            default=DEFAULT_NICENESS,
147
            type=int,
148
            help='Start the scan with the given niceness. Default %(default)s',
149
        )
150
        parser.add_argument(
151
            '--scaninfo-store-time',
152
            default=DEFAULT_SCANINFO_STORE_TIME,
153
            type=int,
154
            help='Time in hours a scan is stored before being considered '
155
            'forgotten and being delete from the scan table. '
156
            'Default %(default)s, disabled.',
157
        )
158
        parser.add_argument(
159
            '--list-commands',
160
            action='store_true',
161
            help='Display all protocol commands',
162
        )
163
        parser.add_argument(
164
            '--max-scans',
165
            default=DEFAULT_MAX_SCAN,
166
            type=int,
167
            help='Max. amount of parallel task that can be started. '
168
            'Default %(default)s, disabled',
169
        )
170
        parser.add_argument(
171
            '--min-free-mem-scan-queue',
172
            default=DEFAULT_MIN_FREE_MEM_SCAN_QUEUE,
173
            type=int,
174
            help='Minimum free memory in MB required to run the scan. '
175
            'If no enough free memory is available, the scan queued. '
176
            'Default %(default)s, disabled',
177
        )
178
        parser.add_argument(
179
            '--max-queued-scans',
180
            default=DEFAULT_MAX_QUEUED_SCANS,
181
            type=int,
182
            help='Maximum number allowed of queued scans before '
183
            'starting to reject new scans. '
184
            'Default %(default)s, disabled',
185
        )
186
187
        self.parser = parser
188
189
    def network_port(self, string: str) -> int:
190
        """ Check if provided string is a valid network port. """
191
192
        value = int(string)
193
        if not 0 < value <= 65535:
194
            raise argparse.ArgumentTypeError(
195
                'port must be in ]0,65535] interval'
196
            )
197
        return value
198
199
    def log_level(self, string: str) -> int:
200
        """ Check if provided string is a valid log level. """
201
202
        value = getattr(logging, string.upper(), None)
203
        if not isinstance(value, int):
204
            raise argparse.ArgumentTypeError(
205
                'log level must be one of {debug,info,warning,error,critical}'
206
            )
207
        return value
208
209
    def _set_defaults(self, configfilename=None) -> None:
210
        self._config = self._load_config(configfilename)
211
        self.parser.set_defaults(**self._config.defaults())
212
213
    def _load_config(self, configfile: str) -> Config:
214
        config = Config()
215
216
        if not configfile:
217
            return config
218
219
        configpath = Path(configfile)
220
221
        try:
222
            if not configpath.expanduser().resolve().exists():
223
                logger.debug('Ignoring non existing config file %s', configfile)
224
                return config
225
        except FileNotFoundError:
226
            # we are on python 3.5 and Path.resolve raised a FileNotFoundError
227
            logger.debug('Ignoring non existing config file %s', configfile)
228
            return config
229
230
        try:
231
            config.load(configpath, def_section=self._name)
232
            logger.debug('Loaded config %s', configfile)
233
        except Exception as e:  # pylint: disable=broad-except
234
            raise RuntimeError(
235
                'Error while parsing config file {config}. Error was '
236
                '{message}'.format(config=configfile, message=e)
237
            )
238
239
        return config
240
241
    def parse_arguments(self, args=None):
242
        # Parse args to get the config file path passed as option
243
        _args, _ = self.parser.parse_known_args(args)
244
245
        # Load the defaults from the config file if it exists.
246
        # This override also what it was passed as cmd option.
247
        self._set_defaults(_args.config)
248
        args, _ = self.parser.parse_known_args(args)
249
250
        return args
251
252
253
def create_parser(description: str) -> CliParser:
254
    return CliParser(description)
255