ospd.parser   A
last analyzed

Complexity

Total Complexity 12

Size/Duplication

Total Lines 255
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 173
dl 0
loc 255
rs 10
c 0
b 0
f 0
wmc 12

1 Function

Rating   Name   Duplication   Size   Complexity  
A create_parser() 0 2 1

6 Methods

Rating   Name   Duplication   Size   Complexity  
A CliParser.parse_arguments() 0 10 1
A CliParser.log_level() 0 8 2
A CliParser._set_defaults() 0 3 1
A CliParser.network_port() 0 9 2
B CliParser.__init__() 0 143 1
A CliParser._load_config() 0 22 4
1
# Copyright (C) 2014-2021 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: AGPL-3.0-or-later
4
#
5
# This program is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Affero General Public License as
7
# published by the Free Software Foundation, either version 3 of the
8
# License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
import argparse
19
import logging
20
from pathlib import Path
21
22
from ospd.config import Config
23
24
# Default file locations as used by an OpenVAS default installation.
# They are the defaults of the --key-file, --cert-file and --ca-file
# command-line options defined in CliParser.__init__().
25
DEFAULT_KEY_FILE = "/usr/var/lib/gvm/private/CA/serverkey.pem"
26
DEFAULT_CERT_FILE = "/usr/var/lib/gvm/CA/servercert.pem"
27
DEFAULT_CA_FILE = "/usr/var/lib/gvm/CA/cacert.pem"
28
29
# Defaults of the remaining command-line options.
# NOTE(review): 0 is outside the range accepted by CliParser.network_port()
# for user-supplied values; presumably it means "no TCP port configured" --
# confirm against the code consuming the parsed arguments.
DEFAULT_PORT = 0
30
DEFAULT_ADDRESS = "0.0.0.0"
31
DEFAULT_NICENESS = 10
32
# Kept as a string holding a Python-style octal literal, not as an int.
DEFAULT_UNIX_SOCKET_MODE = "0o700"
33
# The "~" in the two paths below is not expanded here; the config path is
# expanded in CliParser._load_config() before its existence is checked.
DEFAULT_CONFIG_PATH = "~/.config/ospd.conf"
34
DEFAULT_LOG_CONFIG_PATH = "~/.config/ospd-logging.conf"
35
DEFAULT_UNIX_SOCKET_PATH = "/var/run/ospd/ospd.sock"
36
DEFAULT_PID_PATH = "/var/run/ospd.pid"
37
DEFAULT_LOCKFILE_DIR_PATH = "/var/run/ospd"
38
DEFAULT_STREAM_TIMEOUT = 10  # ten seconds
39
DEFAULT_SCANINFO_STORE_TIME = 0  # in hours
40
DEFAULT_MAX_SCAN = 0  # 0 = disable
41
DEFAULT_MIN_FREE_MEM_SCAN_QUEUE = 0  # 0 = Disable
42
DEFAULT_MAX_QUEUED_SCANS = 0  # 0 = Disable
43
44
# Short aliases for the argparse parser and parsed-arguments types.
ParserType = argparse.ArgumentParser
45
Arguments = argparse.Namespace
46
47
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
48
49
50
class CliParser:
    """Command-line argument parser for OSPD.

    Wraps an ``argparse.ArgumentParser`` holding the OSPD options and adds
    support for a configuration file: values read from the file replace the
    built-in defaults of the options, while options given explicitly on the
    command line still take precedence.
    """

    def __init__(self, description: str) -> None:
        """Create a command-line arguments parser for OSPD.

        Args:
            description: Text used as the description of the underlying
                argparse parser. It is also stored and later passed as the
                ``def_section`` argument when loading the config file.
        """
        self._name = description
        parser = argparse.ArgumentParser(description=description)

        parser.add_argument(
            '--version', action='store_true', help='Print version then exit.'
        )

        parser.add_argument(
            '-s',
            '--config',
            nargs='?',
            default=DEFAULT_CONFIG_PATH,
            help='Configuration file path (default: %(default)s)',
        )
        parser.add_argument(
            '--log-config',
            nargs='?',
            default=DEFAULT_LOG_CONFIG_PATH,
            help='Log configuration file path (default: %(default)s)',
        )
        parser.add_argument(
            '-p',
            '--port',
            default=DEFAULT_PORT,
            type=self.network_port,
            help='TCP Port to listen on. Default: %(default)s',
        )
        parser.add_argument(
            '-b',
            '--bind-address',
            default=DEFAULT_ADDRESS,
            dest='address',
            help='Address to listen on. Default: %(default)s',
        )
        parser.add_argument(
            '-u',
            '--unix-socket',
            default=DEFAULT_UNIX_SOCKET_PATH,
            help='Unix file socket to listen on. Default: %(default)s',
        )
        parser.add_argument(
            '--pid-file',
            default=DEFAULT_PID_PATH,
            help='Location of the file for the process ID. '
            'Default: %(default)s',
        )
        parser.add_argument(
            '--lock-file-dir',
            default=DEFAULT_LOCKFILE_DIR_PATH,
            help='Directory where lock files are placed. Default: %(default)s',
        )
        parser.add_argument(
            '-m',
            '--socket-mode',
            default=DEFAULT_UNIX_SOCKET_MODE,
            help='Unix file socket mode. Default: %(default)s',
        )
        parser.add_argument(
            '-k',
            '--key-file',
            default=DEFAULT_KEY_FILE,
            help='Server key file. Default: %(default)s',
        )
        parser.add_argument(
            '-c',
            '--cert-file',
            default=DEFAULT_CERT_FILE,
            help='Server cert file. Default: %(default)s',
        )
        parser.add_argument(
            '--ca-file',
            default=DEFAULT_CA_FILE,
            help='CA cert file. Default: %(default)s',
        )
        parser.add_argument(
            '-L',
            '--log-level',
            default='INFO',
            type=self.log_level,
            help='Wished level of logging. Default: %(default)s',
        )
        parser.add_argument(
            '-f',
            '--foreground',
            action='store_true',
            help='Run in foreground and logs all messages to console.',
        )
        parser.add_argument(
            '-t',
            '--stream-timeout',
            default=DEFAULT_STREAM_TIMEOUT,
            type=int,
            help='Stream timeout. Default: %(default)s',
        )
        parser.add_argument(
            '-l', '--log-file', help='Path to the logging file.'
        )
        parser.add_argument(
            '--niceness',
            default=DEFAULT_NICENESS,
            type=int,
            help='Start the scan with the given niceness. Default %(default)s',
        )
        parser.add_argument(
            '--scaninfo-store-time',
            default=DEFAULT_SCANINFO_STORE_TIME,
            type=int,
            help='Time in hours a scan is stored before being considered '
            'forgotten and being delete from the scan table. '
            'Default %(default)s, disabled.',
        )
        parser.add_argument(
            '--list-commands',
            action='store_true',
            help='Display all protocol commands',
        )
        parser.add_argument(
            '--max-scans',
            default=DEFAULT_MAX_SCAN,
            type=int,
            help='Max. amount of parallel task that can be started. '
            'Default %(default)s, disabled',
        )
        parser.add_argument(
            '--min-free-mem-scan-queue',
            default=DEFAULT_MIN_FREE_MEM_SCAN_QUEUE,
            type=int,
            help='Minimum free memory in MB required to run the scan. '
            'If no enough free memory is available, the scan queued. '
            'Default %(default)s, disabled',
        )
        parser.add_argument(
            '--max-queued-scans',
            default=DEFAULT_MAX_QUEUED_SCANS,
            type=int,
            help='Maximum number allowed of queued scans before '
            'starting to reject new scans. '
            'Default %(default)s, disabled',
        )

        self.parser = parser

    def network_port(self, string: str) -> int:
        """Check if provided string is a valid network port.

        Args:
            string: Port number as text.

        Returns:
            The port as an integer in the range 1..65535.

        Raises:
            argparse.ArgumentTypeError: If the number is outside 1..65535.
            ValueError: If ``string`` is not an integer literal. When used as
                an argparse ``type=`` callable, argparse turns this into a
                regular usage error.
        """
        value = int(string)
        if not 0 < value <= 65535:
            raise argparse.ArgumentTypeError(
                'port must be in ]0,65535] interval'
            )
        return value

    def log_level(self, string: str) -> str:
        """Check if provided string is a valid log level.

        The comparison is case-insensitive.

        Args:
            string: Name of a logging level, e.g. ``"debug"``.

        Returns:
            The upper-cased level name.

        Raises:
            argparse.ArgumentTypeError: If ``string`` does not name a level
                constant of the ``logging`` module.
        """
        level = string.upper()
        # A plain hasattr() check would also accept unrelated module
        # attributes such as logging.BASIC_FORMAT; level constants are the
        # integer-valued ones.
        if not isinstance(getattr(logging, level, None), int):
            raise argparse.ArgumentTypeError(
                'log level must be one of {debug,info,warning,error,critical}'
            )
        return level

    def _set_defaults(self, configfilename=None) -> None:
        """Load the config file and use its values as parser defaults.

        Args:
            configfilename: Path of the config file; a falsy value results in
                an empty configuration being used.
        """
        self._config = self._load_config(configfilename)
        self.parser.set_defaults(**self._config.defaults())

    def _load_config(self, configfile: str) -> "Config":
        """Read the configuration file, if there is one.

        Args:
            configfile: Path of the config file. A leading ``~`` is expanded.

        Returns:
            A ``Config`` object. It is freshly created and left unloaded when
            ``configfile`` is falsy or the file does not exist.

        Raises:
            RuntimeError: If loading the existing file fails; the original
                exception is chained as the cause.
        """
        config = Config()

        if not configfile:
            return config

        # Expand and resolve once so that the existence check and the load
        # below operate on the very same path.
        configpath = Path(configfile).expanduser().resolve()

        if not configpath.exists():
            logger.debug('Ignoring non existing config file %s', configfile)
            return config

        try:
            config.load(configpath, def_section=self._name)
        except Exception as e:  # pylint: disable=broad-except
            raise RuntimeError(
                'Error while parsing config file {config}. Error was '
                '{message}'.format(config=configfile, message=e)
            ) from e

        logger.debug('Loaded config %s', configfile)
        return config

    def parse_arguments(self, args=None) -> argparse.Namespace:
        """Parse the command line, taking the config file into account.

        Parsing happens twice: the first pass is only used to find out which
        config file to read, the second pass yields the final values once the
        config file content has been installed as defaults. Arguments unknown
        to the parser are ignored in both passes.

        Args:
            args: List of argument strings; ``None`` lets argparse fall back
                to ``sys.argv``.

        Returns:
            The namespace holding the parsed arguments.
        """
        # First pass: only needed to get the config file path.
        first_pass, _ = self.parser.parse_known_args(args)

        # Use the values from the config file (if it exists) as defaults.
        # Options passed explicitly on the command line still win over them.
        self._set_defaults(first_pass.config)

        parsed, _ = self.parser.parse_known_args(args)
        return parsed
251
252
253
def create_parser(description: str) -> CliParser:
    """Build the command-line parser for OSPD.

    Args:
        description: Description text handed to the ``CliParser``.

    Returns:
        A new ``CliParser`` instance.
    """
    cli_parser = CliParser(description)
    return cli_parser
255