Completed
Push — master ( 2d460f...5c88c5 )
by
unknown
12s
created

gvmtools.parser.CliParser._set_defaults()   A

Complexity

Conditions 1

Size

Total Lines 25
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 19
dl 0
loc 25
rs 9.45
c 0
b 0
f 0
cc 1
nop 2
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 - 2019 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
"""Command Line Interface Parser
20
"""
21
22
import argparse
23
import configparser
24
import logging
25
import os
26
27
from gvm import get_version as get_gvm_version
28
from gvm.connections import (
29
    DEFAULT_UNIX_SOCKET_PATH,
30
    DEFAULT_TIMEOUT,
31
    DEFAULT_GVM_PORT,
32
    SSHConnection,
33
    TLSConnection,
34
    UnixSocketConnection,
35
)
36
37
from gvmtools import get_version
38
39
# Logger used by this module
logger = logging.getLogger(__name__)

# Versions shown by the -V/--version argument: the one returned by
# gvmtools.get_version and the one returned by gvm.get_version
__version__ = get_version()
__api_version__ = get_gvm_version()

# Config file that is read if -c/--config is not passed
DEFAULT_CONFIG_PATH = '~/.config/gvm-tools.conf'

# Values accepted by the optional --protocol argument
PROTOCOL_GMP = 'GMP'
PROTOCOL_OSP = 'OSP'
DEFAULT_PROTOCOL = PROTOCOL_GMP
49
50
51
def _filter_actions(actions, actiontypes):
52
    return [action for action in actions if not isinstance(action, actiontypes)]
53
54
55
class Subparser(argparse.ArgumentParser):
    """ArgumentParser for sub commands that also documents its parent parser

    Only format_help is overridden. If a parent parser has been passed, the
    usage line and the argument sections of the generated help additionally
    list the arguments of the parent (without its sub parser action), while
    the help action of this parser itself is left out.
    """

    def __init__(self, parent=None, **kwargs):
        """Create a new Subparser

        Arguments:
            parent: Optional parser whose arguments should be included in the
                help output of this parser
            kwargs: Passed unchanged to argparse.ArgumentParser
        """
        super().__init__(**kwargs)

        self._parent = parent

    def format_help(self):
        """Return the help text including the arguments of the parent parser

        In contrast to argparse.ArgumentParser.format_help the description
        is put after the argument sections, directly before the epilog.
        """
        # pylint: disable=protected-access

        # private argparse attributes are accessed here. Therefore this
        # code may break with changes in argparse

        parent = self._parent
        help_formatter = self._get_formatter()

        # usage
        if parent:
            usage_actions = _filter_actions(
                parent._actions, argparse._SubParsersAction
            )
            usage_actions += _filter_actions(
                self._actions, argparse._HelpAction
            )
        else:
            usage_actions = self._actions

        help_formatter.add_usage(
            self.usage, usage_actions, self._mutually_exclusive_groups
        )

        # argument sections. The groups of the parent are matched by their
        # position to the groups of this parser
        parent_groups = parent._action_groups if parent else []

        for index, group in enumerate(self._action_groups):
            help_formatter.start_section(group.title)
            help_formatter.add_text(group.description)

            if index < len(parent_groups):
                help_formatter.add_arguments(
                    parent_groups[index]._group_actions
                )

            own_actions = _filter_actions(
                group._group_actions, argparse._HelpAction
            )
            help_formatter.add_arguments(own_actions)
            help_formatter.end_section()

        # description
        help_formatter.add_text(self.description)

        # epilog
        help_formatter.add_text(self.epilog)

        return help_formatter.format_help()
110
111
112
class CliParser:
    """Command line argument parser with config file support

    Three argparse layers are combined:

    * a root parser that only knows --config and --log. It is evaluated
      first (via parse_known_args) so that logging can be set up and the
      config file can be loaded before the actual parsing starts,
    * the main parser containing the common arguments and
    * one Subparser for each connection type (ssh, tls and socket).

    Settings read from the config file are applied as argument defaults.
    Therefore values passed at the command line take precedence.
    """

    def __init__(
        self, description, logfilename, *, prog=None, ignore_config=False
    ):
        """Create a new CliParser

        Arguments:
            description: Description shown in the help output
            logfilename: Passed as filename to logging.basicConfig if logging
                is activated via --log
            prog: Optional program name passed to argparse
            ignore_config: If True no config file is loaded, not even the
                one passed via --config
        """
        root_parser = argparse.ArgumentParser(
            prog=prog,
            description=description,
            formatter_class=argparse.RawTextHelpFormatter,
            # don't parse help initially. the args from parser wouldn't be shown
            add_help=False,
        )

        root_parser.add_argument(
            '-c',
            '--config',
            nargs='?',
            default=DEFAULT_CONFIG_PATH,
            help='Configuration file path (default: %(default)s)',
        )
        root_parser.add_argument(
            '--log',
            nargs='?',
            dest='loglevel',
            const='INFO',
            choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
            # the default is None (logging not activated). The level used
            # for a plain --log is the const value
            help='Activate logging (default level: %(const)s)',
        )

        parser = argparse.ArgumentParser(prog=prog, parents=[root_parser])

        parser.add_argument(
            '--timeout',
            required=False,
            default=DEFAULT_TIMEOUT,
            type=int,
            help='Response timeout in seconds, or -1 to wait '
            'indefinitely (default: %(default)s)',
        )
        parser.add_argument(
            '--gmp-username',
            help='Username for GMP service (default: %(default)r)',
        )
        parser.add_argument(
            '--gmp-password',
            help='Password for GMP service (default: %(default)r)',
        )
        parser.add_argument(
            '-V',
            '--version',
            action='version',
            version='%(prog)s {version} (API version {apiversion})'.format(
                version=__version__, apiversion=__api_version__
            ),
            help='Show version information and exit',
        )

        subparsers = parser.add_subparsers(
            metavar='CONNECTION_TYPE',
            title='connections',
            description='valid connection types',
            help="Connection type to use",
            parser_class=Subparser,
        )
        subparsers.required = True
        subparsers.dest = 'connection_type'

        self._subparsers = subparsers

        self._parser = parser
        self._root_parser = root_parser

        self._logfilename = logfilename
        self._ignore_config = ignore_config

        # set by _set_defaults when parse_args is called
        self._config = None

        self._add_subparsers()

    def parse_args(self, args=None):
        """Parse the command line arguments

        Logging is set up first if --log has been passed. Afterwards the
        config file is loaded and applied as defaults before all arguments
        are parsed.

        Arguments:
            args: Optional list of argument strings. It is passed unchanged
                to argparse.

        Returns:
            The argparse namespace of the parsed arguments. A timeout of -1
            is converted to None.
        """
        args_before, _ = self._root_parser.parse_known_args(args)

        if args_before.loglevel is not None:
            level = logging.getLevelName(args_before.loglevel)
            logging.basicConfig(filename=self._logfilename, level=level)

        self._set_defaults(None if self._ignore_config else args_before.config)

        args = self._parser.parse_args(args)

        # If timeout value is -1, then the socket should have no timeout
        if args.timeout == -1:
            args.timeout = None

        # Use the module logger. The module level logging.debug function
        # would implicitly call logging.basicConfig and install a stream
        # handler at the root logger if logging has not been set up.
        # NOTE(review): the namespace contains the passed passwords which
        # are written in plain text to the log at DEBUG level
        logger.debug('Parsed arguments %r', args)

        return args

    def add_argument(self, *args, **kwargs):
        """Add an additional argument to the main parser

        All arguments are forwarded to argparse's add_argument.
        """
        self._parser.add_argument(*args, **kwargs)

    def add_protocol_argument(self):
        """Add the optional --protocol argument to choose between GMP and OSP
        """
        self.add_argument(
            '--protocol',
            required=False,
            default=DEFAULT_PROTOCOL,
            choices=[PROTOCOL_GMP, PROTOCOL_OSP],
            help='Service protocol to use (default: %(default)s)',
        )

    def _load_config(self, configfile):
        """Load the settings of a config file

        Arguments:
            configfile: Path to the config file. A leading ~ is expanded.
                If the value is empty or None no file is read.

        Returns:
            A ConfigParser using 'main' as its default section. It contains
            no settings if no file could be read.

        Raises:
            RuntimeError: If reading or parsing the config file failed
        """
        config = configparser.ConfigParser(default_section='main')

        if not configfile:
            return config

        try:
            path = os.path.expanduser(configfile)
            # read returns the list of successfully read files. Files that
            # can't be opened are skipped silently
            loaded_files = config.read(path)
        except Exception as e:  # pylint: disable=broad-except
            raise RuntimeError(
                'Error while parsing config file {config}. Error was '
                '{message}'.format(config=configfile, message=e)
            ) from e

        if loaded_files:
            logger.debug('Loaded config %s', configfile)
        else:
            logger.debug(
                'Ignoring missing or unreadable config file %s', configfile
            )

        return config

    def _add_subparsers(self):
        """Create the sub parsers for the ssh, tls and socket connection types
        """
        parser_ssh = self._subparsers.add_parser(
            'ssh', help='Use SSH to connect to service', parent=self._parser
        )

        parser_ssh.add_argument(
            '--hostname', required=True, help='Hostname or IP address'
        )
        parser_ssh.add_argument(
            '--port',
            required=False,
            help='SSH port (default: %(default)s)',
            type=int,
        )
        parser_ssh.add_argument(
            '--ssh-username', help='SSH username (default: %(default)r)'
        )
        parser_ssh.add_argument(
            '--ssh-password', help='SSH password (default: %(default)r)'
        )

        parser_tls = self._subparsers.add_parser(
            'tls',
            help='Use TLS secured connection to connect to service',
            parent=self._parser,
        )
        parser_tls.add_argument(
            '--hostname', required=True, help='Hostname or IP address'
        )
        parser_tls.add_argument(
            '--port',
            required=False,
            help='GMP/OSP port (default: %(default)s)',
            type=int,
        )
        parser_tls.add_argument(
            '--certfile',
            required=False,
            help='Path to the certificate file for client authentication. '
            '(default: %(default)s)',
        )
        parser_tls.add_argument(
            '--keyfile',
            required=False,
            help='Path to key file for client authentication. '
            '(default: %(default)s)',
        )
        parser_tls.add_argument(
            '--cafile',
            required=False,
            help='Path to CA certificate for server authentication. '
            '(default: %(default)s)',
        )
        parser_tls.add_argument(
            '--no-credentials',
            required=False,
            default=False,
            action='store_true',
            help='Use only certificates for authentication',
        )

        parser_socket = self._subparsers.add_parser(
            'socket',
            help='Use UNIX Domain socket to connect to service',
            parent=self._parser,
        )

        socketpath_group = parser_socket.add_mutually_exclusive_group()
        socketpath_group.add_argument(
            '--sockpath',
            nargs='?',
            default=None,
            help='Deprecated, use --socketpath instead',
        )
        socketpath_group.add_argument(
            '--socketpath',
            nargs='?',
            help='Path to UNIX Domain socket (default: %(default)s)',
        )

        # the defaults of the sub parsers are applied later in _set_defaults
        self._parser_ssh = parser_ssh
        self._parser_socket = parser_socket
        self._parser_tls = parser_tls

    def _set_defaults(self, configfilename=None):
        """Apply the settings of the config file as defaults of the arguments

        Arguments:
            configfilename: Path to the config file. If empty or None only
                the built-in fallback values are applied.
        """
        self._config = self._load_config(configfilename)

        # all settings of the main section are applied as defaults too.
        # A setting named gmp_username or gmp_password in the main section
        # would collide with the explicit keyword arguments.
        self._parser.set_defaults(
            gmp_username=self._config.get('gmp', 'username', fallback=''),
            gmp_password=self._config.get('gmp', 'password', fallback=''),
            **self._config.defaults()
        )

        self._parser_ssh.set_defaults(
            port=int(self._config.get('ssh', 'port', fallback=22)),
            ssh_username=self._config.get('ssh', 'username', fallback='gmp'),
            ssh_password=self._config.get('ssh', 'password', fallback='gmp'),
        )
        self._parser_tls.set_defaults(
            port=int(
                self._config.get('tls', 'port', fallback=DEFAULT_GVM_PORT)
            ),
            certfile=self._config.get('tls', 'certfile', fallback=None),
            keyfile=self._config.get('tls', 'keyfile', fallback=None),
            cafile=self._config.get('tls', 'cafile', fallback=None),
        )
        self._parser_socket.set_defaults(
            socketpath=self._config.get(
                'unixsocket', 'socketpath', fallback=DEFAULT_UNIX_SOCKET_PATH
            )
        )
348
349
350
def create_parser(description, logfilename):
    """Create a CliParser using its default settings

    Arguments:
        description: Description passed to the CliParser
        logfilename: Log file name passed to the CliParser

    Returns:
        A new CliParser instance
    """
    cli_parser = CliParser(description=description, logfilename=logfilename)
    return cli_parser
352
353
354
def create_connection(
    connection_type,
    socketpath=None,
    timeout=None,
    hostname=None,
    port=None,
    certfile=None,
    keyfile=None,
    cafile=None,
    ssh_username=None,
    ssh_password=None,
    **kwargs  # pylint: disable=unused-argument
):
    """Create a connection object for the passed connection type

    Arguments:
        connection_type: String naming the connection. If it contains
            'socket' a UnixSocketConnection is created, if it contains 'tls'
            a TLSConnection is created. For every other value a
            SSHConnection is created.
        socketpath: Path passed to the UnixSocketConnection
        timeout: Timeout passed to the created connection
        hostname: Hostname passed to the TLSConnection or SSHConnection
        port: Port passed to the TLSConnection or SSHConnection
        certfile: Certificate file passed to the TLSConnection
        keyfile: Key file passed to the TLSConnection
        cafile: CA file passed to the TLSConnection
        ssh_username: Username passed to the SSHConnection
        ssh_password: Password passed to the SSHConnection
        kwargs: Additional keyword arguments are accepted and ignored

    Returns:
        The created connection instance
    """
    if 'socket' in connection_type:
        return UnixSocketConnection(path=socketpath, timeout=timeout)

    # settings shared by both network based connection types
    remote = dict(timeout=timeout, hostname=hostname, port=port)

    if 'tls' in connection_type:
        return TLSConnection(
            certfile=certfile, keyfile=keyfile, cafile=cafile, **remote
        )

    # everything else is handled as a ssh connection
    return SSHConnection(
        username=ssh_username, password=ssh_password, **remote
    )
387