Completed
Push — master ( d5bef9...4758e6 )
by
unknown
13s queued 10s
created

gvmtools.parser.CliParser._load_config()   B

Complexity

Conditions 5

Size

Total Lines 27
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 20
dl 0
loc 27
rs 8.9332
c 0
b 0
f 0
cc 5
nop 2
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 - 2019 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
"""Command Line Interface Parser
20
"""
21
22
import argparse
23
import logging
24
25
from pathlib import Path
26
27
from gvm import get_version as get_gvm_version
28
from gvm.connections import (
29
    DEFAULT_TIMEOUT,
30
    SSHConnection,
31
    TLSConnection,
32
    UnixSocketConnection,
33
)
34
35
from gvmtools import get_version
36
from gvmtools.config import Config
37
38
logger = logging.getLogger(__name__)
39
40
__version__ = get_version()
41
__api_version__ = get_gvm_version()
42
43
DEFAULT_CONFIG_PATH = '~/.config/gvm-tools.conf'
44
45
PROTOCOL_OSP = 'OSP'
46
PROTOCOL_GMP = 'GMP'
47
DEFAULT_PROTOCOL = PROTOCOL_GMP
48
49
50
def _filter_actions(actions, actiontypes):
51
    return [action for action in actions if not isinstance(action, actiontypes)]
52
53
54
class Subparser(argparse.ArgumentParser):
55
    """An ArgumentParser child class to allow better Subparser help formatting
56
57
    This class overrides the format_help method of ArgumentParser.
58
59
    It adds the actions of a parent parser to the usage output by skipping the
60
    _SubParserActions.
61
    """
62
63
    def __init__(self, parent=None, **kwargs):
64
        super().__init__(**kwargs)
65
66
        self._parent = parent
67
68
    def format_help(self):
69
        # pylint: disable=protected-access
70
71
        # this code may break with changes in argparse
72
73
        formatter = self._get_formatter()
74
75
        if self._parent:
76
            actions = _filter_actions(
77
                self._parent._actions, argparse._SubParsersAction
78
            )
79
            actions.extend(_filter_actions(self._actions, argparse._HelpAction))
80
        else:
81
            actions = self._actions
82
83
        formatter.add_usage(
84
            self.usage, actions, self._mutually_exclusive_groups
85
        )
86
87
        for i, action_group in enumerate(self._action_groups):
88
            formatter.start_section(action_group.title)
89
            formatter.add_text(action_group.description)
90
91
            if self._parent and len(self._parent._action_groups) > i:
92
                parent_action_group = self._parent._action_groups[i]
93
                formatter.add_arguments(parent_action_group._group_actions)
94
95
            formatter.add_arguments(
96
                _filter_actions(
97
                    action_group._group_actions, argparse._HelpAction
98
                )
99
            )
100
            formatter.end_section()
101
102
        # description
103
        formatter.add_text(self.description)
104
105
        # epilog
106
        formatter.add_text(self.epilog)
107
108
        return formatter.format_help()
109
110
111
class CliParser:
112
    def __init__(
113
        self, description, logfilename, *, prog=None, ignore_config=False
114
    ):
115
        root_parser = argparse.ArgumentParser(
116
            prog=prog,
117
            description=description,
118
            formatter_class=argparse.RawTextHelpFormatter,
119
            # don't parse help initially. the args from parser wouldn't be shown
120
            add_help=False,
121
        )
122
123
        root_parser.add_argument(
124
            '-c',
125
            '--config',
126
            nargs='?',
127
            default=DEFAULT_CONFIG_PATH,
128
            help='Configuration file path (default: %(default)s)',
129
        )
130
        root_parser.add_argument(
131
            '--log',
132
            nargs='?',
133
            dest='loglevel',
134
            const='INFO',
135
            choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
136
            help='Activate logging (default level: %(default)s)',
137
        )
138
139
        parser = argparse.ArgumentParser(prog=prog, parents=[root_parser])
140
141
        parser.add_argument(
142
            '--timeout',
143
            required=False,
144
            default=DEFAULT_TIMEOUT,
145
            type=int,
146
            help='Response timeout in seconds, or -1 to wait '
147
            'indefinitely (default: %(default)s)',
148
        )
149
        parser.add_argument(
150
            '--gmp-username',
151
            help='Username for GMP service (default: %(default)r)',
152
        )
153
        parser.add_argument(
154
            '--gmp-password',
155
            help='Password for GMP service (default: %(default)r)',
156
        )
157
        parser.add_argument(
158
            '-V',
159
            '--version',
160
            action='version',
161
            version='%(prog)s {version} (API version {apiversion})'.format(
162
                version=__version__, apiversion=__api_version__
163
            ),
164
            help='Show version information and exit',
165
        )
166
167
        subparsers = parser.add_subparsers(
168
            metavar='CONNECTION_TYPE',
169
            title='connections',
170
            description='valid connection types',
171
            help="Connection type to use",
172
            parser_class=Subparser,
173
        )
174
        subparsers.required = True
175
        subparsers.dest = 'connection_type'
176
177
        self._subparsers = subparsers
178
179
        self._parser = parser
180
        self._root_parser = root_parser
181
182
        self._logfilename = logfilename
183
        self._ignore_config = ignore_config
184
185
        self._add_subparsers()
186
187
    def parse_args(self, args=None):
188
        args_before, _ = self._root_parser.parse_known_args(args)
189
190
        if args_before.loglevel is not None:
191
            level = logging.getLevelName(args_before.loglevel)
192
            logging.basicConfig(filename=self._logfilename, level=level)
193
194
        self._set_defaults(None if self._ignore_config else args_before.config)
195
196
        args = self._parser.parse_args(args)
197
198
        # If timeout value is -1, then the socket should have no timeout
199
        if args.timeout == -1:
200
            args.timeout = None
201
202
        logging.debug('Parsed arguments %r', args)
203
204
        return args
205
206
    def add_argument(self, *args, **kwargs):
207
        self._parser_socket.add_argument(*args, **kwargs)
208
        self._parser_ssh.add_argument(*args, **kwargs)
209
        self._parser_tls.add_argument(*args, **kwargs)
210
211
    def add_protocol_argument(self):
212
        self._parser.add_argument(
213
            '--protocol',
214
            required=False,
215
            default=DEFAULT_PROTOCOL,
216
            choices=[PROTOCOL_GMP, PROTOCOL_OSP],
217
            help='Service protocol to use (default: %(default)s)',
218
        )
219
220
    def _load_config(self, configfile):
221
        config = Config()
222
223
        if not configfile:
224
            return config
225
226
        configpath = Path(configfile)
227
228
        try:
229
            if not configpath.expanduser().resolve().exists():
230
                logger.debug('Ignoring non existing config file %s', configfile)
231
                return config
232
        except FileNotFoundError:
233
            # we are on python 3.5 and Path.resolve raised a FileNotFoundError
234
            logger.debug('Ignoring non existing config file %s', configfile)
235
            return config
236
237
        try:
238
            config.load(configpath)
239
            logger.debug('Loaded config %s', configfile)
240
        except Exception as e:  # pylint: disable=broad-except
241
            raise RuntimeError(
242
                'Error while parsing config file {config}. Error was '
243
                '{message}'.format(config=configfile, message=e)
244
            )
245
246
        return config
247
248
    def _add_subparsers(self):
249
        parser_ssh = self._subparsers.add_parser(
250
            'ssh', help='Use SSH to connect to service', parent=self._parser
251
        )
252
253
        parser_ssh.add_argument(
254
            '--hostname', required=True, help='Hostname or IP address'
255
        )
256
        parser_ssh.add_argument(
257
            '--port',
258
            required=False,
259
            help='SSH port (default: %(default)s)',
260
            type=int,
261
        )
262
        parser_ssh.add_argument(
263
            '--ssh-username', help='SSH username (default: %(default)r)'
264
        )
265
        parser_ssh.add_argument(
266
            '--ssh-password', help='SSH password (default: %(default)r)'
267
        )
268
269
        parser_tls = self._subparsers.add_parser(
270
            'tls',
271
            help='Use TLS secured connection to connect to service',
272
            parent=self._parser,
273
        )
274
        parser_tls.add_argument(
275
            '--hostname', required=True, help='Hostname or IP address'
276
        )
277
        parser_tls.add_argument(
278
            '--port',
279
            required=False,
280
            help='GMP/OSP port (default: %(default)s)',
281
            type=int,
282
        )
283
        parser_tls.add_argument(
284
            '--certfile',
285
            required=False,
286
            help='Path to the certificate file for client authentication. '
287
            '(default: %(default)s)',
288
        )
289
        parser_tls.add_argument(
290
            '--keyfile',
291
            required=False,
292
            help='Path to key file for client authentication. '
293
            '(default: %(default)s)',
294
        )
295
        parser_tls.add_argument(
296
            '--cafile',
297
            required=False,
298
            help='Path to CA certificate for server authentication. '
299
            '(default: %(default)s)',
300
        )
301
        parser_tls.add_argument(
302
            '--no-credentials',
303
            required=False,
304
            default=False,
305
            action='store_true',
306
            help='Use only certificates for authentication',
307
        )
308
309
        parser_socket = self._subparsers.add_parser(
310
            'socket',
311
            help='Use UNIX Domain socket to connect to service',
312
            parent=self._parser,
313
        )
314
315
        socketpath_group = parser_socket.add_mutually_exclusive_group()
316
        socketpath_group.add_argument(
317
            '--sockpath',
318
            nargs='?',
319
            default=None,
320
            help='Deprecated, use --socketpath instead',
321
        )
322
        socketpath_group.add_argument(
323
            '--socketpath',
324
            nargs='?',
325
            help='Path to UNIX Domain socket (default: %(default)s)',
326
        )
327
328
        self._parser_ssh = parser_ssh
329
        self._parser_socket = parser_socket
330
        self._parser_tls = parser_tls
331
332
    def _set_defaults(self, configfilename=None):
333
        self._config = self._load_config(configfilename)
334
335
        self._parser.set_defaults(
336
            gmp_username=self._config.get('gmp', 'username'),
337
            gmp_password=self._config.get('gmp', 'password'),
338
            **self._config.defaults()
339
        )
340
341
        self._parser_ssh.set_defaults(
342
            port=int(self._config.get('ssh', 'port')),
343
            ssh_username=self._config.get('ssh', 'username'),
344
            ssh_password=self._config.get('ssh', 'password'),
345
        )
346
        self._parser_tls.set_defaults(
347
            port=int(self._config.get('tls', 'port')),
348
            certfile=self._config.get('tls', 'certfile'),
349
            keyfile=self._config.get('tls', 'keyfile'),
350
            cafile=self._config.get('tls', 'cafile'),
351
        )
352
        self._parser_socket.set_defaults(
353
            socketpath=self._config.get('unixsocket', 'socketpath')
354
        )
355
356
357
def create_parser(description, logfilename):
358
    return CliParser(description, logfilename)
359
360
361
def create_connection(
362
    connection_type,
363
    socketpath=None,
364
    timeout=None,
365
    hostname=None,
366
    port=None,
367
    certfile=None,
368
    keyfile=None,
369
    cafile=None,
370
    ssh_username=None,
371
    ssh_password=None,
372
    **kwargs  # pylint: disable=unused-argument
373
):
374
    if 'socket' in connection_type:
375
        return UnixSocketConnection(timeout=timeout, path=socketpath)
376
377
    if 'tls' in connection_type:
378
        return TLSConnection(
379
            timeout=timeout,
380
            hostname=hostname,
381
            port=port,
382
            certfile=certfile,
383
            keyfile=keyfile,
384
            cafile=cafile,
385
        )
386
387
    return SSHConnection(
388
        timeout=timeout,
389
        hostname=hostname,
390
        port=port,
391
        username=ssh_username,
392
        password=ssh_password,
393
    )
394