Passed
Pull Request — master (#185)
by Juan José
02:12
created

gvmtools.parser.CliParser.parse_args()   B

Complexity

Conditions 6

Size

Total Lines 23
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 15
dl 0
loc 23
rs 8.6666
c 0
b 0
f 0
cc 6
nop 3
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 - 2019 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
"""Command Line Interface Parser
20
"""
21
22
import argparse
23
import logging
24
25
from pathlib import Path
26
27
from gvm import get_version as get_gvm_version
28
from gvm.connections import (
29
    DEFAULT_TIMEOUT,
30
    SSHConnection,
31
    TLSConnection,
32
    UnixSocketConnection,
33
)
34
35
from gvmtools import get_version
36
from gvmtools.config import Config
37
38
logger = logging.getLogger(__name__)
39
40
__version__ = get_version()
41
__api_version__ = get_gvm_version()
42
43
DEFAULT_CONFIG_PATH = '~/.config/gvm-tools.conf'
44
45
PROTOCOL_OSP = 'OSP'
46
PROTOCOL_GMP = 'GMP'
47
DEFAULT_PROTOCOL = PROTOCOL_GMP
48
49
50
def _filter_actions(actions, actiontypes):
51
    return [action for action in actions if not isinstance(action, actiontypes)]
52
53
54
class Subparser(argparse.ArgumentParser):
55
    """An ArgumentParser child class to allow better Subparser help formatting
56
57
    This class overrides the format_help method of ArgumentParser.
58
59
    It adds the actions of a parent parser to the usage output by skipping the
60
    _SubParserActions.
61
    """
62
63
    def __init__(self, parent=None, **kwargs):
64
        super().__init__(**kwargs)
65
66
        self._parent = parent
67
68
    def format_help(self):
69
        # pylint: disable=protected-access
70
71
        # this code may break with changes in argparse
72
73
        formatter = self._get_formatter()
74
75
        if self._parent:
76
            actions = _filter_actions(
77
                self._parent._actions, argparse._SubParsersAction
78
            )
79
            actions.extend(_filter_actions(self._actions, argparse._HelpAction))
80
        else:
81
            actions = self._actions
82
83
        formatter.add_usage(
84
            self.usage, actions, self._mutually_exclusive_groups
85
        )
86
87
        for i, action_group in enumerate(self._action_groups):
88
            formatter.start_section(action_group.title)
89
            formatter.add_text(action_group.description)
90
91
            if self._parent and len(self._parent._action_groups) > i:
92
                parent_action_group = self._parent._action_groups[i]
93
                formatter.add_arguments(parent_action_group._group_actions)
94
95
            formatter.add_arguments(
96
                _filter_actions(
97
                    action_group._group_actions, argparse._HelpAction
98
                )
99
            )
100
            formatter.end_section()
101
102
        # description
103
        formatter.add_text(self.description)
104
105
        # epilog
106
        formatter.add_text(self.epilog)
107
108
        return formatter.format_help()
109
110
111
class CliParser:
112
    def __init__(
113
        self, description, logfilename, *, prog=None, ignore_config=False
114
    ):
115
        root_parser = argparse.ArgumentParser(
116
            prog=prog,
117
            description=description,
118
            formatter_class=argparse.RawTextHelpFormatter,
119
            # don't parse help initially. the args from parser wouldn't be shown
120
            add_help=False,
121
        )
122
123
        root_parser.add_argument(
124
            '-c',
125
            '--config',
126
            nargs='?',
127
            default=DEFAULT_CONFIG_PATH,
128
            help='Configuration file path (default: %(default)s)',
129
        )
130
        root_parser.add_argument(
131
            '--log',
132
            nargs='?',
133
            dest='loglevel',
134
            const='INFO',
135
            choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
136
            help='Activate logging (default level: %(default)s)',
137
        )
138
139
        parser = argparse.ArgumentParser(prog=prog, parents=[root_parser])
140
141
        parser.add_argument(
142
            '--timeout',
143
            required=False,
144
            default=DEFAULT_TIMEOUT,
145
            type=int,
146
            help='Response timeout in seconds, or -1 to wait '
147
            'indefinitely (default: %(default)s)',
148
        )
149
        parser.add_argument(
150
            '--gmp-username',
151
            help='Username for GMP service (default: %(default)r)',
152
        )
153
        parser.add_argument(
154
            '--gmp-password',
155
            help='Password for GMP service (default: %(default)r)',
156
        )
157
        parser.add_argument(
158
            '-V',
159
            '--version',
160
            action='version',
161
            version='%(prog)s {version} (API version {apiversion})'.format(
162
                version=__version__, apiversion=__api_version__
163
            ),
164
            help='Show version information and exit',
165
        )
166
167
        subparsers = parser.add_subparsers(
168
            metavar='CONNECTION_TYPE',
169
            title='connections',
170
            description='valid connection types',
171
            help="Connection type to use",
172
            parser_class=Subparser,
173
        )
174
        subparsers.required = True
175
        subparsers.dest = 'connection_type'
176
177
        self._subparsers = subparsers
178
179
        self._parser = parser
180
        self._root_parser = root_parser
181
182
        self._logfilename = logfilename
183
        self._ignore_config = ignore_config
184
185
        self._add_subparsers()
186
187
    def parse_args(self, args=None, only_known=False):
188
        args_before, _ = self._root_parser.parse_known_args(args)
189
190
        if args_before.loglevel is not None:
191
            level = logging.getLevelName(args_before.loglevel)
192
            logging.basicConfig(filename=self._logfilename, level=level)
193
194
        self._set_defaults(None if self._ignore_config else args_before.config)
195
196
        if only_known:
197
            args, script_args = self._parser.parse_known_args(args)
198
        else:
199
            args = self._parser.parse_args(args)
200
201
        # If timeout value is -1, then the socket should have no timeout
202
        if args.timeout == -1:
203
            args.timeout = None
204
205
        logging.debug('Parsed arguments %r', args)
206
207
        if only_known:
208
            return args, script_args
0 ignored issues
show
introduced by
The variable script_args does not seem to be defined for all execution paths.
Loading history...
209
        return args
210
211
    def add_argument(self, *args, **kwargs):
212
        self._parser_socket.add_argument(*args, **kwargs)
213
        self._parser_ssh.add_argument(*args, **kwargs)
214
        self._parser_tls.add_argument(*args, **kwargs)
215
216
    def add_protocol_argument(self):
217
        self._parser.add_argument(
218
            '--protocol',
219
            required=False,
220
            default=DEFAULT_PROTOCOL,
221
            choices=[PROTOCOL_GMP, PROTOCOL_OSP],
222
            help='Service protocol to use (default: %(default)s)',
223
        )
224
225
    def _load_config(self, configfile):
226
        config = Config()
227
228
        if not configfile:
229
            return config
230
231
        configpath = Path(configfile)
232
233
        try:
234
            if not configpath.expanduser().resolve().exists():
235
                logger.debug('Ignoring non existing config file %s', configfile)
236
                return config
237
        except FileNotFoundError:
238
            # we are on python 3.5 and Path.resolve raised a FileNotFoundError
239
            logger.debug('Ignoring non existing config file %s', configfile)
240
            return config
241
242
        try:
243
            config.load(configpath)
244
            logger.debug('Loaded config %s', configfile)
245
        except Exception as e:  # pylint: disable=broad-except
246
            raise RuntimeError(
247
                'Error while parsing config file {config}. Error was '
248
                '{message}'.format(config=configfile, message=e)
249
            )
250
251
        return config
252
253
    def _add_subparsers(self):
254
        parser_ssh = self._subparsers.add_parser(
255
            'ssh', help='Use SSH to connect to service', parent=self._parser
256
        )
257
258
        parser_ssh.add_argument(
259
            '--hostname', required=True, help='Hostname or IP address'
260
        )
261
        parser_ssh.add_argument(
262
            '--port',
263
            required=False,
264
            help='SSH port (default: %(default)s)',
265
            type=int,
266
        )
267
        parser_ssh.add_argument(
268
            '--ssh-username', help='SSH username (default: %(default)r)'
269
        )
270
        parser_ssh.add_argument(
271
            '--ssh-password', help='SSH password (default: %(default)r)'
272
        )
273
274
        parser_tls = self._subparsers.add_parser(
275
            'tls',
276
            help='Use TLS secured connection to connect to service',
277
            parent=self._parser,
278
        )
279
        parser_tls.add_argument(
280
            '--hostname', required=True, help='Hostname or IP address'
281
        )
282
        parser_tls.add_argument(
283
            '--port',
284
            required=False,
285
            help='GMP/OSP port (default: %(default)s)',
286
            type=int,
287
        )
288
        parser_tls.add_argument(
289
            '--certfile',
290
            required=False,
291
            help='Path to the certificate file for client authentication. '
292
            '(default: %(default)s)',
293
        )
294
        parser_tls.add_argument(
295
            '--keyfile',
296
            required=False,
297
            help='Path to key file for client authentication. '
298
            '(default: %(default)s)',
299
        )
300
        parser_tls.add_argument(
301
            '--cafile',
302
            required=False,
303
            help='Path to CA certificate for server authentication. '
304
            '(default: %(default)s)',
305
        )
306
        parser_tls.add_argument(
307
            '--no-credentials',
308
            required=False,
309
            default=False,
310
            action='store_true',
311
            help='Use only certificates for authentication',
312
        )
313
314
        parser_socket = self._subparsers.add_parser(
315
            'socket',
316
            help='Use UNIX Domain socket to connect to service',
317
            parent=self._parser,
318
        )
319
320
        socketpath_group = parser_socket.add_mutually_exclusive_group()
321
        socketpath_group.add_argument(
322
            '--sockpath',
323
            nargs='?',
324
            default=None,
325
            help='Deprecated, use --socketpath instead',
326
        )
327
        socketpath_group.add_argument(
328
            '--socketpath',
329
            nargs='?',
330
            help='Path to UNIX Domain socket (default: %(default)s)',
331
        )
332
333
        self._parser_ssh = parser_ssh
334
        self._parser_socket = parser_socket
335
        self._parser_tls = parser_tls
336
337
    def _set_defaults(self, configfilename=None):
338
        self._config = self._load_config(configfilename)
339
340
        self._parser.set_defaults(
341
            gmp_username=self._config.get('gmp', 'username'),
342
            gmp_password=self._config.get('gmp', 'password'),
343
            **self._config.defaults()
344
        )
345
346
        self._parser_ssh.set_defaults(
347
            port=int(self._config.get('ssh', 'port')),
348
            ssh_username=self._config.get('ssh', 'username'),
349
            ssh_password=self._config.get('ssh', 'password'),
350
        )
351
        self._parser_tls.set_defaults(
352
            port=int(self._config.get('tls', 'port')),
353
            certfile=self._config.get('tls', 'certfile'),
354
            keyfile=self._config.get('tls', 'keyfile'),
355
            cafile=self._config.get('tls', 'cafile'),
356
        )
357
        self._parser_socket.set_defaults(
358
            socketpath=self._config.get('unixsocket', 'socketpath')
359
        )
360
361
362
def create_parser(description, logfilename):
363
    return CliParser(description, logfilename)
364
365
366
def create_connection(
367
    connection_type,
368
    socketpath=None,
369
    timeout=None,
370
    hostname=None,
371
    port=None,
372
    certfile=None,
373
    keyfile=None,
374
    cafile=None,
375
    ssh_username=None,
376
    ssh_password=None,
377
    **kwargs  # pylint: disable=unused-argument
378
):
379
    if 'socket' in connection_type:
380
        return UnixSocketConnection(timeout=timeout, path=socketpath)
381
382
    if 'tls' in connection_type:
383
        return TLSConnection(
384
            timeout=timeout,
385
            hostname=hostname,
386
            port=port,
387
            certfile=certfile,
388
            keyfile=keyfile,
389
            cafile=cafile,
390
        )
391
392
    return SSHConnection(
393
        timeout=timeout,
394
        hostname=hostname,
395
        port=port,
396
        username=ssh_username,
397
        password=ssh_password,
398
    )
399