Passed
Pull Request — master (#185)
by Juan José
01:09
created

gvmtools.parser.CliParser.parse_known_args()   A

Complexity

Conditions 4

Size

Total Lines 18
Code Lines 11

Duplication

Lines 18
Ratio 100 %

Importance

Changes 0
Metric Value
eloc 11
dl 18
loc 18
rs 9.85
c 0
b 0
f 0
cc 4
nop 2
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 - 2019 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
"""Command Line Interface Parser
20
"""
21
22
import argparse
23
import logging
24
25
from pathlib import Path
26
27
from gvm import get_version as get_gvm_version
28
from gvm.connections import (
29
    DEFAULT_TIMEOUT,
30
    SSHConnection,
31
    TLSConnection,
32
    UnixSocketConnection,
33
)
34
35
from gvmtools import get_version
36
from gvmtools.config import Config
37
38
logger = logging.getLogger(__name__)
39
40
__version__ = get_version()
41
__api_version__ = get_gvm_version()
42
43
DEFAULT_CONFIG_PATH = '~/.config/gvm-tools.conf'
44
45
PROTOCOL_OSP = 'OSP'
46
PROTOCOL_GMP = 'GMP'
47
DEFAULT_PROTOCOL = PROTOCOL_GMP
48
49
50
def _filter_actions(actions, actiontypes):
51
    return [action for action in actions if not isinstance(action, actiontypes)]
52
53
54
class Subparser(argparse.ArgumentParser):
55
    """An ArgumentParser child class to allow better Subparser help formatting
56
57
    This class overrides the format_help method of ArgumentParser.
58
59
    It adds the actions of a parent parser to the usage output by skipping the
60
    _SubParserActions.
61
    """
62
63
    def __init__(self, parent=None, **kwargs):
64
        super().__init__(**kwargs)
65
66
        self._parent = parent
67
68
    def format_help(self):
69
        # pylint: disable=protected-access
70
71
        # this code may break with changes in argparse
72
73
        formatter = self._get_formatter()
74
75
        if self._parent:
76
            actions = _filter_actions(
77
                self._parent._actions, argparse._SubParsersAction
78
            )
79
            actions.extend(_filter_actions(self._actions, argparse._HelpAction))
80
        else:
81
            actions = self._actions
82
83
        formatter.add_usage(
84
            self.usage, actions, self._mutually_exclusive_groups
85
        )
86
87
        for i, action_group in enumerate(self._action_groups):
88
            formatter.start_section(action_group.title)
89
            formatter.add_text(action_group.description)
90
91
            if self._parent and len(self._parent._action_groups) > i:
92
                parent_action_group = self._parent._action_groups[i]
93
                formatter.add_arguments(parent_action_group._group_actions)
94
95
            formatter.add_arguments(
96
                _filter_actions(
97
                    action_group._group_actions, argparse._HelpAction
98
                )
99
            )
100
            formatter.end_section()
101
102
        # description
103
        formatter.add_text(self.description)
104
105
        # epilog
106
        formatter.add_text(self.epilog)
107
108
        return formatter.format_help()
109
110
111
class CliParser:
112
    def __init__(
113
        self, description, logfilename, *, prog=None, ignore_config=False
114
    ):
115
        root_parser = argparse.ArgumentParser(
116
            prog=prog,
117
            description=description,
118
            formatter_class=argparse.RawTextHelpFormatter,
119
            # don't parse help initially. the args from parser wouldn't be shown
120
            add_help=False,
121
        )
122
123
        root_parser.add_argument(
124
            '-c',
125
            '--config',
126
            nargs='?',
127
            default=DEFAULT_CONFIG_PATH,
128
            help='Configuration file path (default: %(default)s)',
129
        )
130
        root_parser.add_argument(
131
            '--log',
132
            nargs='?',
133
            dest='loglevel',
134
            const='INFO',
135
            choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
136
            help='Activate logging (default level: %(default)s)',
137
        )
138
139
        parser = argparse.ArgumentParser(prog=prog, parents=[root_parser])
140
141
        parser.add_argument(
142
            '--timeout',
143
            required=False,
144
            default=DEFAULT_TIMEOUT,
145
            type=int,
146
            help='Response timeout in seconds, or -1 to wait '
147
            'indefinitely (default: %(default)s)',
148
        )
149
        parser.add_argument(
150
            '--gmp-username',
151
            help='Username for GMP service (default: %(default)r)',
152
        )
153
        parser.add_argument(
154
            '--gmp-password',
155
            help='Password for GMP service (default: %(default)r)',
156
        )
157
        parser.add_argument(
158
            '-V',
159
            '--version',
160
            action='version',
161
            version='%(prog)s {version} (API version {apiversion})'.format(
162
                version=__version__, apiversion=__api_version__
163
            ),
164
            help='Show version information and exit',
165
        )
166
167
        subparsers = parser.add_subparsers(
168
            metavar='CONNECTION_TYPE',
169
            title='connections',
170
            description='valid connection types',
171
            help="Connection type to use",
172
            parser_class=Subparser,
173
        )
174
        subparsers.required = True
175
        subparsers.dest = 'connection_type'
176
177
        self._subparsers = subparsers
178
179
        self._parser = parser
180
        self._root_parser = root_parser
181
182
        self._logfilename = logfilename
183
        self._ignore_config = ignore_config
184
185
        self._add_subparsers()
186
187 View Code Duplication
    def parse_args(self, args=None):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
188
        args_before, _ = self._root_parser.parse_known_args(args)
189
190
        if args_before.loglevel is not None:
191
            level = logging.getLevelName(args_before.loglevel)
192
            logging.basicConfig(filename=self._logfilename, level=level)
193
194
        self._set_defaults(None if self._ignore_config else args_before.config)
195
196
        args = self._parser.parse_args(args)
197
198
        # If timeout value is -1, then the socket should have no timeout
199
        if args.timeout == -1:
200
            args.timeout = None
201
202
        logging.debug('Parsed arguments %r', args)
203
204
        return args
205
206 View Code Duplication
    def parse_known_args(self, args=None):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
207
        args_before, _ = self._root_parser.parse_known_args(args)
208
209
        if args_before.loglevel is not None:
210
            level = logging.getLevelName(args_before.loglevel)
211
            logging.basicConfig(filename=self._logfilename, level=level)
212
213
        self._set_defaults(None if self._ignore_config else args_before.config)
214
215
        args, script_args = self._parser.parse_known_args(args)
216
217
        # If timeout value is -1, then the socket should have no timeout
218
        if args.timeout == -1:
219
            args.timeout = None
220
221
        logging.debug('Parsed arguments %r', args)
222
223
        return args, script_args
224
225
    def add_argument(self, *args, **kwargs):
226
        self._parser_socket.add_argument(*args, **kwargs)
227
        self._parser_ssh.add_argument(*args, **kwargs)
228
        self._parser_tls.add_argument(*args, **kwargs)
229
230
    def add_protocol_argument(self):
231
        self._parser.add_argument(
232
            '--protocol',
233
            required=False,
234
            default=DEFAULT_PROTOCOL,
235
            choices=[PROTOCOL_GMP, PROTOCOL_OSP],
236
            help='Service protocol to use (default: %(default)s)',
237
        )
238
239
    def _load_config(self, configfile):
240
        config = Config()
241
242
        if not configfile:
243
            return config
244
245
        configpath = Path(configfile)
246
247
        try:
248
            if not configpath.expanduser().resolve().exists():
249
                logger.debug('Ignoring non existing config file %s', configfile)
250
                return config
251
        except FileNotFoundError:
252
            # we are on python 3.5 and Path.resolve raised a FileNotFoundError
253
            logger.debug('Ignoring non existing config file %s', configfile)
254
            return config
255
256
        try:
257
            config.load(configpath)
258
            logger.debug('Loaded config %s', configfile)
259
        except Exception as e:  # pylint: disable=broad-except
260
            raise RuntimeError(
261
                'Error while parsing config file {config}. Error was '
262
                '{message}'.format(config=configfile, message=e)
263
            )
264
265
        return config
266
267
    def _add_subparsers(self):
268
        parser_ssh = self._subparsers.add_parser(
269
            'ssh', help='Use SSH to connect to service', parent=self._parser
270
        )
271
272
        parser_ssh.add_argument(
273
            '--hostname', required=True, help='Hostname or IP address'
274
        )
275
        parser_ssh.add_argument(
276
            '--port',
277
            required=False,
278
            help='SSH port (default: %(default)s)',
279
            type=int,
280
        )
281
        parser_ssh.add_argument(
282
            '--ssh-username', help='SSH username (default: %(default)r)'
283
        )
284
        parser_ssh.add_argument(
285
            '--ssh-password', help='SSH password (default: %(default)r)'
286
        )
287
288
        parser_tls = self._subparsers.add_parser(
289
            'tls',
290
            help='Use TLS secured connection to connect to service',
291
            parent=self._parser,
292
        )
293
        parser_tls.add_argument(
294
            '--hostname', required=True, help='Hostname or IP address'
295
        )
296
        parser_tls.add_argument(
297
            '--port',
298
            required=False,
299
            help='GMP/OSP port (default: %(default)s)',
300
            type=int,
301
        )
302
        parser_tls.add_argument(
303
            '--certfile',
304
            required=False,
305
            help='Path to the certificate file for client authentication. '
306
            '(default: %(default)s)',
307
        )
308
        parser_tls.add_argument(
309
            '--keyfile',
310
            required=False,
311
            help='Path to key file for client authentication. '
312
            '(default: %(default)s)',
313
        )
314
        parser_tls.add_argument(
315
            '--cafile',
316
            required=False,
317
            help='Path to CA certificate for server authentication. '
318
            '(default: %(default)s)',
319
        )
320
        parser_tls.add_argument(
321
            '--no-credentials',
322
            required=False,
323
            default=False,
324
            action='store_true',
325
            help='Use only certificates for authentication',
326
        )
327
328
        parser_socket = self._subparsers.add_parser(
329
            'socket',
330
            help='Use UNIX Domain socket to connect to service',
331
            parent=self._parser,
332
        )
333
334
        socketpath_group = parser_socket.add_mutually_exclusive_group()
335
        socketpath_group.add_argument(
336
            '--sockpath',
337
            nargs='?',
338
            default=None,
339
            help='Deprecated, use --socketpath instead',
340
        )
341
        socketpath_group.add_argument(
342
            '--socketpath',
343
            nargs='?',
344
            help='Path to UNIX Domain socket (default: %(default)s)',
345
        )
346
347
        self._parser_ssh = parser_ssh
348
        self._parser_socket = parser_socket
349
        self._parser_tls = parser_tls
350
351
    def _set_defaults(self, configfilename=None):
352
        self._config = self._load_config(configfilename)
353
354
        self._parser.set_defaults(
355
            gmp_username=self._config.get('gmp', 'username'),
356
            gmp_password=self._config.get('gmp', 'password'),
357
            **self._config.defaults()
358
        )
359
360
        self._parser_ssh.set_defaults(
361
            port=int(self._config.get('ssh', 'port')),
362
            ssh_username=self._config.get('ssh', 'username'),
363
            ssh_password=self._config.get('ssh', 'password'),
364
        )
365
        self._parser_tls.set_defaults(
366
            port=int(self._config.get('tls', 'port')),
367
            certfile=self._config.get('tls', 'certfile'),
368
            keyfile=self._config.get('tls', 'keyfile'),
369
            cafile=self._config.get('tls', 'cafile'),
370
        )
371
        self._parser_socket.set_defaults(
372
            socketpath=self._config.get('unixsocket', 'socketpath')
373
        )
374
375
376
def create_parser(description, logfilename):
377
    return CliParser(description, logfilename)
378
379
380
def create_connection(
381
    connection_type,
382
    socketpath=None,
383
    timeout=None,
384
    hostname=None,
385
    port=None,
386
    certfile=None,
387
    keyfile=None,
388
    cafile=None,
389
    ssh_username=None,
390
    ssh_password=None,
391
    **kwargs  # pylint: disable=unused-argument
392
):
393
    if 'socket' in connection_type:
394
        return UnixSocketConnection(timeout=timeout, path=socketpath)
395
396
    if 'tls' in connection_type:
397
        return TLSConnection(
398
            timeout=timeout,
399
            hostname=hostname,
400
            port=port,
401
            certfile=certfile,
402
            keyfile=keyfile,
403
            cafile=cafile,
404
        )
405
406
    return SSHConnection(
407
        timeout=timeout,
408
        hostname=hostname,
409
        port=port,
410
        username=ssh_username,
411
        password=ssh_password,
412
    )
413