Passed
Pull Request — master (#185)
by Juan José
02:12
created

gvmtools.parser.create_connection()   A

Complexity

Conditions 3

Size

Total Lines 32
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 28
dl 0
loc 32
rs 9.208
c 0
b 0
f 0
cc 3
nop 11

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 - 2019 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
"""Command Line Interface Parser
20
"""
21
22
import argparse
23
import logging
24
25
from pathlib import Path
26
27
from gvm import get_version as get_gvm_version
28
from gvm.connections import (
29
    DEFAULT_TIMEOUT,
30
    SSHConnection,
31
    TLSConnection,
32
    UnixSocketConnection,
33
)
34
35
from gvmtools import get_version
36
from gvmtools.config import Config
37
38
logger = logging.getLogger(__name__)
39
40
__version__ = get_version()
41
__api_version__ = get_gvm_version()
42
43
DEFAULT_CONFIG_PATH = '~/.config/gvm-tools.conf'
44
45
PROTOCOL_OSP = 'OSP'
46
PROTOCOL_GMP = 'GMP'
47
DEFAULT_PROTOCOL = PROTOCOL_GMP
48
49
50
def _filter_actions(actions, actiontypes):
51
    return [action for action in actions if not isinstance(action, actiontypes)]
52
53
54
class Subparser(argparse.ArgumentParser):
55
    """An ArgumentParser child class to allow better Subparser help formatting
56
57
    This class overrides the format_help method of ArgumentParser.
58
59
    It adds the actions of a parent parser to the usage output by skipping the
60
    _SubParserActions.
61
    """
62
63
    def __init__(self, parent=None, **kwargs):
64
        super().__init__(**kwargs)
65
66
        self._parent = parent
67
68
    def format_help(self):
69
        # pylint: disable=protected-access
70
71
        # this code may break with changes in argparse
72
73
        formatter = self._get_formatter()
74
75
        if self._parent:
76
            actions = _filter_actions(
77
                self._parent._actions, argparse._SubParsersAction
78
            )
79
            actions.extend(_filter_actions(self._actions, argparse._HelpAction))
80
        else:
81
            actions = self._actions
82
83
        formatter.add_usage(
84
            self.usage, actions, self._mutually_exclusive_groups
85
        )
86
87
        for i, action_group in enumerate(self._action_groups):
88
            formatter.start_section(action_group.title)
89
            formatter.add_text(action_group.description)
90
91
            if self._parent and len(self._parent._action_groups) > i:
92
                parent_action_group = self._parent._action_groups[i]
93
                formatter.add_arguments(parent_action_group._group_actions)
94
95
            formatter.add_arguments(
96
                _filter_actions(
97
                    action_group._group_actions, argparse._HelpAction
98
                )
99
            )
100
            formatter.end_section()
101
102
        # description
103
        formatter.add_text(self.description)
104
105
        # epilog
106
        formatter.add_text(self.epilog)
107
108
        return formatter.format_help()
109
110
111
class CliParser:
112
    def __init__(
113
        self, description, logfilename, *, prog=None, ignore_config=False
114
    ):
115
        root_parser = argparse.ArgumentParser(
116
            prog=prog,
117
            description=description,
118
            formatter_class=argparse.RawTextHelpFormatter,
119
            # don't parse help initially. the args from parser wouldn't be shown
120
            add_help=False,
121
        )
122
123
        root_parser.add_argument(
124
            '-c',
125
            '--config',
126
            nargs='?',
127
            default=DEFAULT_CONFIG_PATH,
128
            help='Configuration file path (default: %(default)s)',
129
        )
130
        root_parser.add_argument(
131
            '--log',
132
            nargs='?',
133
            dest='loglevel',
134
            const='INFO',
135
            choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
136
            help='Activate logging (default level: %(default)s)',
137
        )
138
139
        parser = argparse.ArgumentParser(prog=prog, parents=[root_parser])
140
141
        parser.add_argument(
142
            '--timeout',
143
            required=False,
144
            default=DEFAULT_TIMEOUT,
145
            type=int,
146
            help='Response timeout in seconds, or -1 to wait '
147
            'indefinitely (default: %(default)s)',
148
        )
149
        parser.add_argument(
150
            '--gmp-username',
151
            help='Username for GMP service (default: %(default)r)',
152
        )
153
        parser.add_argument(
154
            '--gmp-password',
155
            help='Password for GMP service (default: %(default)r)',
156
        )
157
        parser.add_argument(
158
            '-V',
159
            '--version',
160
            action='version',
161
            version='%(prog)s {version} (API version {apiversion})'.format(
162
                version=__version__, apiversion=__api_version__
163
            ),
164
            help='Show version information and exit',
165
        )
166
167
        subparsers = parser.add_subparsers(
168
            metavar='CONNECTION_TYPE',
169
            title='connections',
170
            description='valid connection types',
171
            help="Connection type to use",
172
            parser_class=Subparser,
173
        )
174
        subparsers.required = True
175
        subparsers.dest = 'connection_type'
176
177
        self._subparsers = subparsers
178
179
        self._parser = parser
180
        self._root_parser = root_parser
181
182
        self._logfilename = logfilename
183
        self._ignore_config = ignore_config
184
185
        self._add_subparsers()
186
187
    def parse_args(self, args=None, only_known=False):
188
        args_before, _ = self._root_parser.parse_known_args(args)
189
190
        if args_before.loglevel is not None:
191
            level = logging.getLevelName(args_before.loglevel)
192
            logging.basicConfig(filename=self._logfilename, level=level)
193
194
        self._set_defaults(None if self._ignore_config else args_before.config)
195
196
        if only_known:
197
            args, script_args = self._parser.parse_known_args(args)
198
        else:
199
            args = self._parser.parse_args(args)
200
201
        # If timeout value is -1, then the socket should have no timeout
202
        if args.timeout == -1:
203
            args.timeout = None
204
205
        logging.debug('Parsed arguments %r', args)
206
207
        if only_known:
208
            return args, script_args
0 ignored issues
show
introduced by
The variable script_args does not seem to be defined for all execution paths.
Loading history...
209
        return args
210
211
    def add_argument(self, *args, **kwargs):
212
        self._parser_socket.add_argument(*args, **kwargs)
213
        self._parser_ssh.add_argument(*args, **kwargs)
214
        self._parser_tls.add_argument(*args, **kwargs)
215
216
    def add_protocol_argument(self):
217
        self._parser.add_argument(
218
            '--protocol',
219
            required=False,
220
            default=DEFAULT_PROTOCOL,
221
            choices=[PROTOCOL_GMP, PROTOCOL_OSP],
222
            help='Service protocol to use (default: %(default)s)',
223
        )
224
225
    def _load_config(self, configfile):
226
        config = Config()
227
228
        if not configfile:
229
            return config
230
231
        configpath = Path(configfile)
232
233
        try:
234
            if not configpath.expanduser().resolve().exists():
235
                logger.debug('Ignoring non existing config file %s', configfile)
236
                return config
237
        except FileNotFoundError:
238
            # we are on python 3.5 and Path.resolve raised a FileNotFoundError
239
            logger.debug('Ignoring non existing config file %s', configfile)
240
            return config
241
242
        try:
243
            config.load(configpath)
244
            logger.debug('Loaded config %s', configfile)
245
        except Exception as e:  # pylint: disable=broad-except
246
            raise RuntimeError(
247
                'Error while parsing config file {config}. Error was '
248
                '{message}'.format(config=configfile, message=e)
249
            )
250
251
        return config
252
253
    def _add_subparsers(self):
254
        parser_ssh = self._subparsers.add_parser(
255
            'ssh', help='Use SSH to connect to service', parent=self._parser
256
        )
257
258
        parser_ssh.add_argument(
259
            '--hostname', required=True, help='Hostname or IP address'
260
        )
261
        parser_ssh.add_argument(
262
            '--port',
263
            required=False,
264
            help='SSH port (default: %(default)s)',
265
            type=int,
266
        )
267
        parser_ssh.add_argument(
268
            '--ssh-username', help='SSH username (default: %(default)r)'
269
        )
270
        parser_ssh.add_argument(
271
            '--ssh-password', help='SSH password (default: %(default)r)'
272
        )
273
274
        parser_tls = self._subparsers.add_parser(
275
            'tls',
276
            help='Use TLS secured connection to connect to service',
277
            parent=self._parser,
278
        )
279
        parser_tls.add_argument(
280
            '--hostname', required=True, help='Hostname or IP address'
281
        )
282
        parser_tls.add_argument(
283
            '--port',
284
            required=False,
285
            help='GMP/OSP port (default: %(default)s)',
286
            type=int,
287
        )
288
        parser_tls.add_argument(
289
            '--certfile',
290
            required=False,
291
            help='Path to the certificate file for client authentication. '
292
            '(default: %(default)s)',
293
        )
294
        parser_tls.add_argument(
295
            '--keyfile',
296
            required=False,
297
            help='Path to key file for client authentication. '
298
            '(default: %(default)s)',
299
        )
300
        parser_tls.add_argument(
301
            '--cafile',
302
            required=False,
303
            help='Path to CA certificate for server authentication. '
304
            '(default: %(default)s)',
305
        )
306
        parser_tls.add_argument(
307
            '--no-credentials',
308
            required=False,
309
            default=False,
310
            action='store_true',
311
            help='Use only certificates for authentication',
312
        )
313
314
        parser_socket = self._subparsers.add_parser(
315
            'socket',
316
            help='Use UNIX Domain socket to connect to service',
317
            parent=self._parser,
318
        )
319
320
        socketpath_group = parser_socket.add_mutually_exclusive_group()
321
        socketpath_group.add_argument(
322
            '--sockpath',
323
            nargs='?',
324
            default=None,
325
            help='Deprecated, use --socketpath instead',
326
        )
327
        socketpath_group.add_argument(
328
            '--socketpath',
329
            nargs='?',
330
            help='Path to UNIX Domain socket (default: %(default)s)',
331
        )
332
333
        self._parser_ssh = parser_ssh
334
        self._parser_socket = parser_socket
335
        self._parser_tls = parser_tls
336
337
    def _set_defaults(self, configfilename=None):
338
        self._config = self._load_config(configfilename)
339
340
        self._parser.set_defaults(
341
            gmp_username=self._config.get('gmp', 'username'),
342
            gmp_password=self._config.get('gmp', 'password'),
343
            **self._config.defaults()
344
        )
345
346
        self._parser_ssh.set_defaults(
347
            port=int(self._config.get('ssh', 'port')),
348
            ssh_username=self._config.get('ssh', 'username'),
349
            ssh_password=self._config.get('ssh', 'password'),
350
        )
351
        self._parser_tls.set_defaults(
352
            port=int(self._config.get('tls', 'port')),
353
            certfile=self._config.get('tls', 'certfile'),
354
            keyfile=self._config.get('tls', 'keyfile'),
355
            cafile=self._config.get('tls', 'cafile'),
356
        )
357
        self._parser_socket.set_defaults(
358
            socketpath=self._config.get('unixsocket', 'socketpath')
359
        )
360
361
362
def create_parser(description, logfilename):
363
    return CliParser(description, logfilename)
364
365
366
def create_connection(
367
    connection_type,
368
    socketpath=None,
369
    timeout=None,
370
    hostname=None,
371
    port=None,
372
    certfile=None,
373
    keyfile=None,
374
    cafile=None,
375
    ssh_username=None,
376
    ssh_password=None,
377
    **kwargs  # pylint: disable=unused-argument
378
):
379
    if 'socket' in connection_type:
380
        return UnixSocketConnection(timeout=timeout, path=socketpath)
381
382
    if 'tls' in connection_type:
383
        return TLSConnection(
384
            timeout=timeout,
385
            hostname=hostname,
386
            port=port,
387
            certfile=certfile,
388
            keyfile=keyfile,
389
            cafile=cafile,
390
        )
391
392
    return SSHConnection(
393
        timeout=timeout,
394
        hostname=hostname,
395
        port=port,
396
        username=ssh_username,
397
        password=ssh_password,
398
    )
399