gvmtools.parser   A
last analyzed

Complexity

Total Complexity 21

Size/Duplication

Total Lines 342
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 231
dl 0
loc 342
rs 10
c 0
b 0
f 0
wmc 21

8 Methods

Rating   Name   Duplication   Size   Complexity  
A CliParser.add_protocol_argument() 0 7 1
B CliParser._add_subparsers() 0 80 1
B CliParser.__init__() 0 77 2
A CliParser._set_defaults() 0 24 1
A CliParser.add_argument() 0 4 1
A CliParser.parse_known_args() 0 18 4
A CliParser.parse_args() 0 7 2
B CliParser._load_config() 0 27 5

2 Functions

Rating   Name   Duplication   Size   Complexity  
A create_connection() 0 32 3
A create_parser() 0 2 1
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018-2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
"""Command Line Interface Parser
20
"""
21
22
import argparse
23
import logging
24
25
from pathlib import Path
26
27
from gvm import get_version as get_gvm_version
28
from gvm.connections import (
29
    DEFAULT_TIMEOUT,
30
    SSHConnection,
31
    TLSConnection,
32
    UnixSocketConnection,
33
)
34
35
from gvmtools import get_version
36
from gvmtools.config import Config
37
38
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
39
40
# Version of gvm-tools itself; printed by the -V/--version option.
__version__ = get_version()
41
# Version of the imported gvm package; printed as "API version" by -V/--version.
__api_version__ = get_gvm_version()
42
43
# Default value of the -c/--config option. The leading "~" is expanded with
# Path.expanduser() when the parser checks whether the file exists.
DEFAULT_CONFIG_PATH = '~/.config/gvm-tools.conf'
44
45
# Protocol names accepted by the optional --protocol argument.
PROTOCOL_OSP = 'OSP'
46
PROTOCOL_GMP = 'GMP'
47
# Protocol used when --protocol is not given on the command line.
DEFAULT_PROTOCOL = PROTOCOL_GMP
48
49
50
class CliParser:
    """Two-stage command line parser shared by the gvm-tools scripts.

    A small *bootstrap* parser only knows ``--config`` and ``--log``. It is
    run first so that logging can be set up and the configuration file can be
    loaded before the real parsing happens. The values read from the
    configuration are then installed as defaults on the main parser and on
    the ``ssh``, ``tls`` and ``socket`` connection sub-parsers.
    """

    def __init__(
        self, description, logfilename, *, prog=None, ignore_config=False
    ):
        """Create the bootstrap parser, the main parser and the sub-parsers.

        Arguments:
            description: Description text handed to the bootstrap parser.
            logfilename: File name passed to ``logging.basicConfig`` when
                logging gets activated via ``--log``.
            prog: Optional program name used in usage and version output.
            ignore_config: If true, no configuration file is read; only the
                values of an empty ``Config`` are used as defaults.
        """
        # NOTE(review): the description is only given to the bootstrap parser,
        # which is created without a help action, so it does not seem to be
        # printed anywhere - confirm whether that is intended.
        bootstrap_parser = argparse.ArgumentParser(
            prog=prog,
            description=description,
            formatter_class=argparse.RawTextHelpFormatter,
            # don't parse help initially. the args from parser wouldn't be shown
            add_help=False,
        )

        bootstrap_parser.add_argument(
            '-c',
            '--config',
            nargs='?',
            default=DEFAULT_CONFIG_PATH,
            help='Configuration file path (default: %(default)s)',
        )

        bootstrap_parser.add_argument(
            '--log',
            nargs='?',
            dest='loglevel',
            const='INFO',
            # Normalise the case only and let ``choices`` do the validation.
            # A dict lookup here would raise KeyError for unknown levels,
            # which argparse does not convert into a usage error.
            type=str.upper,
            choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
            # The level used for a bare ``--log`` is the const, not a default.
            help='Activate logging (default level: %(const)s)',
        )

        # The main parser inherits --config and --log from the bootstrap
        # parser so both show up in its help output.
        parser = argparse.ArgumentParser(prog=prog, parents=[bootstrap_parser])

        parser.add_argument(
            '--timeout',
            required=False,
            default=DEFAULT_TIMEOUT,
            type=int,
            help='Response timeout in seconds, or -1 to wait '
            'indefinitely (default: %(default)s)',
        )
        parser.add_argument(
            '--gmp-username',
            help='Username for GMP service (default: %(default)r)',
        )
        parser.add_argument(
            '--gmp-password',
            help='Password for GMP service (default: %(default)r)',
        )
        parser.add_argument(
            '-V',
            '--version',
            action='version',
            version='%(prog)s {version} (API version {apiversion})'.format(
                version=__version__, apiversion=__api_version__
            ),
            help='Show version information and exit',
        )

        subparsers = parser.add_subparsers(
            metavar='CONNECTION_TYPE',
            title='connections',
            description='valid connection types',
            help="Connection type to use",
        )
        # Set as attributes instead of keyword arguments to stay compatible
        # with Python versions whose add_subparsers() has no ``required``.
        subparsers.required = True
        subparsers.dest = 'connection_type'

        self._subparsers = subparsers

        self._parser = parser
        self._bootstrap_parser = bootstrap_parser

        self._logfilename = logfilename
        self._ignore_config = ignore_config

        self._add_subparsers()

    def parse_args(self, args=None):
        """Parse *args* and fail on anything that is not recognized.

        Arguments:
            args: List of argument strings; ``None`` lets argparse fall back
                to ``sys.argv``.

        Returns:
            The namespace produced by :meth:`parse_known_args`.

        Unknown arguments are reported through the main parser's ``error``
        method, which prints a usage message and exits.
        """
        parsed, unknown_args = self.parse_known_args(args)
        if unknown_args:
            self._parser.error(
                'unrecognized arguments {}'.format(' '.join(unknown_args))
            )
        return parsed

    def parse_known_args(self, args=None):
        """Parse *args* and return the namespace plus the unknown arguments.

        The bootstrap parser runs first: if ``--log`` was given, logging is
        configured to write to the log file passed to the constructor. Then
        the defaults are (re)loaded from the configuration file before the
        main parser is run.

        Arguments:
            args: List of argument strings; ``None`` lets argparse fall back
                to ``sys.argv``.

        Returns:
            A tuple ``(namespace, unknown_args)``. A timeout of ``-1`` is
            converted to ``None`` in the namespace.
        """
        args_before, _ = self._bootstrap_parser.parse_known_args(args)

        if args_before.loglevel is not None:
            level = logging.getLevelName(args_before.loglevel)
            logging.basicConfig(filename=self._logfilename, level=level)

        self._set_defaults(None if self._ignore_config else args_before.config)

        parsed, unknown_args = self._parser.parse_known_args(args)

        # If timeout value is -1, then the socket should have no timeout
        if parsed.timeout == -1:
            parsed.timeout = None

        # Use the module logger: the module-level logging.debug() would
        # implicitly call basicConfig() on the root logger when logging has
        # not been activated.
        # NOTE(review): the namespace includes the password options, so they
        # end up in the debug log.
        logger.debug('Parsed arguments %r', parsed)

        return parsed, unknown_args

    def add_argument(self, *args, **kwargs):
        """Add the same argument to the socket, ssh and tls sub-parsers.

        All positional and keyword arguments are forwarded unchanged to
        ``ArgumentParser.add_argument`` of each sub-parser.
        """
        for subparser in (
            self._parser_socket,
            self._parser_ssh,
            self._parser_tls,
        ):
            subparser.add_argument(*args, **kwargs)

    def add_protocol_argument(self):
        """Add the optional ``--protocol`` argument to the main parser."""
        self._parser.add_argument(
            '--protocol',
            required=False,
            default=DEFAULT_PROTOCOL,
            choices=[PROTOCOL_GMP, PROTOCOL_OSP],
            help='Service protocol to use (default: %(default)s)',
        )

    def _load_config(self, configfile):
        """Create a ``Config`` and load *configfile* into it if it exists.

        Arguments:
            configfile: Path of the configuration file, or a false value to
                skip loading.

        Returns:
            The ``Config`` instance. Nothing is loaded into it when no file
            name was given or the file does not exist.

        Raises:
            RuntimeError: If loading the existing file fails for any reason.
        """
        config = Config()

        if not configfile:
            return config

        configpath = Path(configfile)

        try:
            if not configpath.expanduser().resolve().exists():
                logger.debug('Ignoring non existing config file %s', configfile)
                return config
        except FileNotFoundError:
            # we are on python 3.5 and Path.resolve raised a FileNotFoundError
            logger.debug('Ignoring non existing config file %s', configfile)
            return config

        try:
            config.load(configpath)
            logger.debug('Loaded config %s', configfile)
        except Exception as e:  # pylint: disable=broad-except
            # Report every load problem uniformly; the original traceback is
            # suppressed on purpose with ``from None``.
            raise RuntimeError(
                'Error while parsing config file {config}. Error was '
                '{message}'.format(config=configfile, message=e)
            ) from None

        return config

    def _add_subparsers(self):
        """Create the ``ssh``, ``tls`` and ``socket`` connection sub-parsers.

        The created parsers are stored on the instance so that defaults and
        additional arguments can be applied to them later.
        """
        parser_ssh = self._subparsers.add_parser(
            'ssh', help='Use SSH to connect to service'
        )

        parser_ssh.add_argument(
            '--hostname', help='Hostname or IP address (default: %(default)s)'
        )
        parser_ssh.add_argument(
            '--port',
            required=False,
            help='SSH port (default: %(default)s)',
            type=int,
        )
        parser_ssh.add_argument(
            '--ssh-username', help='SSH username (default: %(default)r)'
        )
        parser_ssh.add_argument(
            '--ssh-password', help='SSH password (default: %(default)r)'
        )

        parser_tls = self._subparsers.add_parser(
            'tls', help='Use TLS secured connection to connect to service'
        )
        parser_tls.add_argument(
            '--hostname', help='Hostname or IP address (default: %(default)s)'
        )
        parser_tls.add_argument(
            '--port',
            required=False,
            help='GMP/OSP port (default: %(default)s)',
            type=int,
        )
        parser_tls.add_argument(
            '--certfile',
            required=False,
            help='Path to the certificate file for client authentication. '
            '(default: %(default)s)',
        )
        parser_tls.add_argument(
            '--keyfile',
            required=False,
            help='Path to key file for client authentication. '
            '(default: %(default)s)',
        )
        parser_tls.add_argument(
            '--cafile',
            required=False,
            help='Path to CA certificate for server authentication. '
            '(default: %(default)s)',
        )
        parser_tls.add_argument(
            '--no-credentials',
            required=False,
            default=False,
            action='store_true',
            help='Use only certificates for authentication',
        )

        parser_socket = self._subparsers.add_parser(
            'socket', help='Use UNIX Domain socket to connect to service'
        )

        # --sockpath is the deprecated spelling; both options write to the
        # same ``socketpath`` destination and must not be combined.
        socketpath_group = parser_socket.add_mutually_exclusive_group()
        socketpath_group.add_argument(
            '--sockpath',
            nargs='?',
            default=None,
            dest='socketpath',
            help='Deprecated, use --socketpath instead',
        )
        socketpath_group.add_argument(
            '--socketpath',
            nargs='?',
            help='Path to UNIX Domain socket (default: %(default)s)',
        )

        self._parser_ssh = parser_ssh
        self._parser_socket = parser_socket
        self._parser_tls = parser_tls

    def _set_defaults(self, configfilename=None):
        """Load the configuration and install its values as parser defaults.

        Arguments:
            configfilename: Path of the configuration file, or ``None`` to
                use the values of a ``Config`` without any file loaded.

        The loaded ``Config`` is kept on the instance as ``_config``.
        """
        self._config = self._load_config(configfilename)

        self._parser.set_defaults(
            gmp_username=self._config.get('gmp', 'username'),
            gmp_password=self._config.get('gmp', 'password'),
            **self._config.defaults(),
        )

        # NOTE(review): int() assumes Config always provides a port value for
        # the ssh and tls groups - confirm against gvmtools.config.
        self._parser_ssh.set_defaults(
            port=int(self._config.get('ssh', 'port')),
            ssh_username=self._config.get('ssh', 'username'),
            ssh_password=self._config.get('ssh', 'password'),
            hostname=self._config.get('ssh', 'hostname'),
        )
        self._parser_tls.set_defaults(
            port=int(self._config.get('tls', 'port')),
            certfile=self._config.get('tls', 'certfile'),
            keyfile=self._config.get('tls', 'keyfile'),
            cafile=self._config.get('tls', 'cafile'),
            hostname=self._config.get('tls', 'hostname'),
        )
        self._parser_socket.set_defaults(
            socketpath=self._config.get('unixsocket', 'socketpath')
        )
304
305
306
def create_parser(description, logfilename):
    """Return a new CliParser built from *description* and *logfilename*.

    Both values are forwarded to the CliParser constructor; all its other
    options keep their defaults.
    """
    return CliParser(description=description, logfilename=logfilename)
308
309
310
def create_connection(
    connection_type,
    socketpath=None,
    timeout=None,
    hostname=None,
    port=None,
    certfile=None,
    keyfile=None,
    cafile=None,
    ssh_username=None,
    ssh_password=None,
    **kwargs  # pylint: disable=unused-argument
):
    """Create the connection object matching *connection_type*.

    Arguments:
        connection_type: Connection type name. It is tested with ``in``, so
            any value containing ``'socket'`` selects a UNIX socket
            connection and any value containing ``'tls'`` a TLS connection;
            everything else results in an SSH connection.
        socketpath: Socket path, only used for socket connections.
        timeout: Timeout passed to every connection type.
        hostname: Remote host, used for TLS and SSH connections.
        port: Remote port, used for TLS and SSH connections.
        certfile: Client certificate file, only used for TLS connections.
        keyfile: Client key file, only used for TLS connections.
        cafile: CA certificate file, only used for TLS connections.
        ssh_username: User name, only used for SSH connections.
        ssh_password: Password, only used for SSH connections.
        **kwargs: Accepted and ignored, so additional keyword arguments can
            be passed along without filtering.

    Returns:
        A UnixSocketConnection, TLSConnection or SSHConnection instance.
    """
    if 'socket' in connection_type:
        connection = UnixSocketConnection(path=socketpath, timeout=timeout)
    elif 'tls' in connection_type:
        connection = TLSConnection(
            hostname=hostname,
            port=port,
            certfile=certfile,
            keyfile=keyfile,
            cafile=cafile,
            timeout=timeout,
        )
    else:
        # Every other connection type falls back to SSH.
        connection = SSHConnection(
            hostname=hostname,
            port=port,
            username=ssh_username,
            password=ssh_password,
            timeout=timeout,
        )

    return connection
343