Passed
Pull Request — master (#170)
by
unknown
01:08
created

gvmtools.parser.create_connection()   A

Complexity

Conditions 3

Size

Total Lines 32
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 28
dl 0
loc 32
rs 9.208
c 0
b 0
f 0
cc 3
nop 11

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand; their parameter lists also tend to become inconsistent when you need more or different data.

There are several approaches to avoid long parameter lists:

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 - 2019 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
"""Command Line Interface Parser
20
"""
21
22
import argparse
23
import logging
24
25
from pathlib import Path
26
27
from gvm import get_version as get_gvm_version
28
from gvm.connections import (
29
    DEFAULT_TIMEOUT,
30
    SSHConnection,
31
    TLSConnection,
32
    UnixSocketConnection,
33
)
34
35
from gvmtools import get_version
36
from gvmtools.config import Config
37
38
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
39
40
# Version strings of gvm-tools and of the gvm library; both are shown by the
# -V/--version command line option.
__version__ = get_version()
41
__api_version__ = get_gvm_version()
42
43
# Default value of the -c/--config command line option.
DEFAULT_CONFIG_PATH = '~/.config/gvm-tools.conf'
44
45
# Allowed values of the optional --protocol argument and its default.
PROTOCOL_OSP = 'OSP'
46
PROTOCOL_GMP = 'GMP'
47
DEFAULT_PROTOCOL = PROTOCOL_GMP
48
49
50
def _filter_actions(actions, actiontypes):
51
    return [action for action in actions if not isinstance(action, actiontypes)]
52
53
54
class Subparser(argparse.ArgumentParser):
    """ArgumentParser for sub-commands whose help also covers its parent

    Only the format_help method of ArgumentParser is overridden.

    If a parent parser is set, the actions of the parent (except its
    _SubParsersAction) are added to the usage line and to the argument
    sections of the generated help.
    """

    def __init__(self, parent=None, **kwargs):
        """Create the parser

        Arguments:
            parent: Optional parser whose actions should also be listed in
                the help output of this parser.
            **kwargs: Passed on unchanged to argparse.ArgumentParser.
        """
        super().__init__(**kwargs)

        self._parent = parent

    def format_help(self):
        """Return the help text, including the arguments of the parent"""
        # pylint: disable=protected-access

        # relies on argparse internals and may break when argparse changes

        parent = self._parent
        formatter = self._get_formatter()

        # usage line
        if parent:
            usage_actions = _filter_actions(
                parent._actions, argparse._SubParsersAction
            )
            usage_actions += _filter_actions(
                self._actions, argparse._HelpAction
            )
        else:
            usage_actions = self._actions

        formatter.add_usage(
            self.usage, usage_actions, self._mutually_exclusive_groups
        )

        # one section per action group; the parent's group at the same
        # position (if there is one) is listed first
        for index, group in enumerate(self._action_groups):
            formatter.start_section(group.title)
            formatter.add_text(group.description)

            if parent and index < len(parent._action_groups):
                parent_group = parent._action_groups[index]
                formatter.add_arguments(parent_group._group_actions)

            own_actions = _filter_actions(
                group._group_actions, argparse._HelpAction
            )
            formatter.add_arguments(own_actions)
            formatter.end_section()

        # description
        formatter.add_text(self.description)

        # epilog
        formatter.add_text(self.epilog)

        return formatter.format_help()
109
110
111
class CliParser:
    """Command line interface parser of the gvm tools

    Two argparse parsers are used: a root parser that only knows --config and
    --log and is evaluated first, and the main parser that inherits these
    options and adds the common options plus one sub-parser per connection
    type (ssh, tls and socket). Defaults of the main parser and of the
    sub-parsers are taken from the config file.
    """

    def __init__(
        self, description, logfilename, *, prog=None, ignore_config=False
    ):
        """Set up the parsers and the connection type sub-parsers

        Arguments:
            description: Description text passed to the root parser.
            logfilename: File name passed to logging.basicConfig if logging
                is activated with --log.
            prog: Program name passed to the argparse parsers.
            ignore_config: If True, no config file is read when setting the
                defaults in parse_args.
        """
        root_parser = argparse.ArgumentParser(
            prog=prog,
            description=description,
            formatter_class=argparse.RawTextHelpFormatter,
            # don't parse help initially. the args from parser wouldn't be shown
            add_help=False,
        )

        root_parser.add_argument(
            '-c',
            '--config',
            nargs='?',
            default=DEFAULT_CONFIG_PATH,
            help='Configuration file path (default: %(default)s)',
        )
        root_parser.add_argument(
            '--log',
            nargs='?',
            dest='loglevel',
            const='INFO',
            choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
            # the level used for a bare --log is the const, not the default
            # (which is None and means that logging stays deactivated)
            help='Activate logging (default level: %(const)s)',
        )

        parser = argparse.ArgumentParser(prog=prog, parents=[root_parser])

        parser.add_argument(
            '--timeout',
            required=False,
            default=DEFAULT_TIMEOUT,
            type=int,
            help='Response timeout in seconds, or -1 to wait '
            'indefinitely (default: %(default)s)',
        )
        parser.add_argument(
            '--gmp-username',
            help='Username for GMP service (default: %(default)r)',
        )
        parser.add_argument(
            '--gmp-password',
            help='Password for GMP service (default: %(default)r)',
        )
        parser.add_argument(
            '-V',
            '--version',
            action='version',
            version='%(prog)s {version} (API version {apiversion})'.format(
                version=__version__, apiversion=__api_version__
            ),
            help='Show version information and exit',
        )

        subparsers = parser.add_subparsers(
            metavar='CONNECTION_TYPE',
            title='connections',
            description='valid connection types',
            help="Connection type to use",
            parser_class=Subparser,
        )
        # set as attributes instead of passing them to add_subparsers
        subparsers.required = True
        subparsers.dest = 'connection_type'

        self._subparsers = subparsers

        self._parser = parser
        self._root_parser = root_parser

        self._logfilename = logfilename
        self._ignore_config = ignore_config

        self._add_subparsers()

    def parse_args(self, args=None):
        """Parse the command line arguments

        The root parser is evaluated first to set up logging and to find the
        config file. Afterwards the defaults are loaded from the config and
        the full command line is parsed.

        Arguments:
            args: List of argument strings. Passed to argparse, which uses
                sys.argv if it is None.

        Returns:
            The argparse namespace. A timeout of -1 is replaced by None.

        Raises:
            RuntimeError: If the config file exists but could not be loaded.
        """
        args_before, _ = self._root_parser.parse_known_args(args)

        if args_before.loglevel is not None:
            level = logging.getLevelName(args_before.loglevel)
            logging.basicConfig(filename=self._logfilename, level=level)

        self._set_defaults(None if self._ignore_config else args_before.config)

        args = self._parser.parse_args(args)

        # If timeout value is -1, then the socket should have no timeout
        if args.timeout == -1:
            args.timeout = None

        # NOTE(review): a value given with the deprecated --sockpath option
        # ends up in args.sockpath and is not copied to args.socketpath here;
        # presumably the calling scripts handle it - TODO confirm.

        # use the module logger like the rest of this class. The module level
        # logging.debug() would implicitly call logging.basicConfig() if
        # logging has not been activated with --log.
        logger.debug('Parsed arguments %r', args)

        return args

    def add_argument(self, *args, **kwargs):
        """Add an argument to each of the socket, ssh and tls sub-parsers

        All arguments are passed on unchanged to the add_argument method of
        the three sub-parsers.
        """
        self._parser_socket.add_argument(*args, **kwargs)
        self._parser_ssh.add_argument(*args, **kwargs)
        self._parser_tls.add_argument(*args, **kwargs)

    def add_protocol_argument(self):
        """Add the optional --protocol argument to the main parser"""
        self._parser.add_argument(
            '--protocol',
            required=False,
            default=DEFAULT_PROTOCOL,
            choices=[PROTOCOL_GMP, PROTOCOL_OSP],
            help='Service protocol to use (default: %(default)s)',
        )

    def _load_config(self, configfile):
        """Create a Config and load the passed config file into it

        Arguments:
            configfile: Path of the config file. If it is empty or None, or
                if the file does not exist, a freshly created Config is
                returned without loading anything.

        Returns:
            The Config instance.

        Raises:
            RuntimeError: If loading the existing config file failed. The
                original exception is chained as its cause.
        """
        config = Config()

        if not configfile:
            return config

        configpath = Path(configfile)

        try:
            if not configpath.expanduser().resolve().exists():
                logger.debug('Ignoring non existing config file %s', configfile)
                return config
        except FileNotFoundError:
            # we are on python 3.5 and Path.resolve raised a FileNotFoundError
            logger.debug('Ignoring non existing config file %s', configfile)
            return config

        try:
            config.load(configpath)
            logger.debug('Loaded config %s', configfile)
        except Exception as e:  # pylint: disable=broad-except
            raise RuntimeError(
                'Error while parsing config file {config}. Error was '
                '{message}'.format(config=configfile, message=e)
            ) from e

        return config

    def _add_subparsers(self):
        """Create the ssh, tls and socket sub-parsers with their arguments

        The sub-parsers are stored as _parser_ssh, _parser_tls and
        _parser_socket.
        """
        parser_ssh = self._subparsers.add_parser(
            'ssh', help='Use SSH to connect to service', parent=self._parser
        )

        parser_ssh.add_argument(
            '--hostname', required=True, help='Hostname or IP address'
        )
        parser_ssh.add_argument(
            '--port',
            required=False,
            help='SSH port (default: %(default)s)',
            type=int,
        )
        parser_ssh.add_argument(
            '--ssh-username', help='SSH username (default: %(default)r)'
        )
        parser_ssh.add_argument(
            '--ssh-password', help='SSH password (default: %(default)r)'
        )

        parser_tls = self._subparsers.add_parser(
            'tls',
            help='Use TLS secured connection to connect to service',
            parent=self._parser,
        )
        parser_tls.add_argument(
            '--hostname', required=True, help='Hostname or IP address'
        )
        parser_tls.add_argument(
            '--port',
            required=False,
            help='GMP/OSP port (default: %(default)s)',
            type=int,
        )
        parser_tls.add_argument(
            '--certfile',
            required=False,
            help='Path to the certificate file for client authentication. '
            '(default: %(default)s)',
        )
        parser_tls.add_argument(
            '--keyfile',
            required=False,
            help='Path to key file for client authentication. '
            '(default: %(default)s)',
        )
        parser_tls.add_argument(
            '--cafile',
            required=False,
            help='Path to CA certificate for server authentication. '
            '(default: %(default)s)',
        )
        parser_tls.add_argument(
            '--no-credentials',
            required=False,
            default=False,
            action='store_true',
            help='Use only certificates for authentication',
        )

        parser_socket = self._subparsers.add_parser(
            'socket',
            help='Use UNIX Domain socket to connect to service',
            parent=self._parser,
        )

        # --sockpath and --socketpath can't be used at the same time
        socketpath_group = parser_socket.add_mutually_exclusive_group()
        socketpath_group.add_argument(
            '--sockpath',
            nargs='?',
            default=None,
            help='Deprecated, use --socketpath instead',
        )
        socketpath_group.add_argument(
            '--socketpath',
            nargs='?',
            help='Path to UNIX Domain socket (default: %(default)s)',
        )

        self._parser_ssh = parser_ssh
        self._parser_socket = parser_socket
        self._parser_tls = parser_tls

    def _set_defaults(self, configfilename=None):
        """Load the config and use its values as argument defaults

        The loaded Config is stored as _config. Its values become the
        defaults of the main parser and of the ssh, tls and socket
        sub-parsers.

        Arguments:
            configfilename: Path of the config file, or None to not read a
                config file.

        Raises:
            RuntimeError: If the config file exists but could not be loaded.
        """
        self._config = self._load_config(configfilename)

        self._parser.set_defaults(
            gmp_username=self._config.get('gmp', 'username'),
            gmp_password=self._config.get('gmp', 'password'),
            **self._config.defaults()
        )

        # NOTE(review): the int() conversions assume that the config always
        # provides a port value for ssh and tls - verify against Config.
        self._parser_ssh.set_defaults(
            port=int(self._config.get('ssh', 'port')),
            ssh_username=self._config.get('ssh', 'username'),
            ssh_password=self._config.get('ssh', 'password'),
        )
        self._parser_tls.set_defaults(
            port=int(self._config.get('tls', 'port')),
            certfile=self._config.get('tls', 'certfile'),
            keyfile=self._config.get('tls', 'keyfile'),
            cafile=self._config.get('tls', 'cafile'),
        )
        self._parser_socket.set_defaults(
            socketpath=self._config.get('unixsocket', 'socketpath')
        )
355
356
357
def create_parser(description, logfilename):
    """Create the command line parser

    Arguments:
        description: Description text for the parser.
        logfilename: Name of the file to log to if logging is activated.

    Returns:
        A new CliParser instance.
    """
    cli_parser = CliParser(description, logfilename)
    return cli_parser
359
360
361
def create_connection(
    connection_type,
    socketpath=None,
    timeout=None,
    hostname=None,
    port=None,
    certfile=None,
    keyfile=None,
    cafile=None,
    ssh_username=None,
    ssh_password=None,
    **kwargs  # pylint: disable=unused-argument
):
    """Create the connection object for the requested connection type

    Arguments:
        connection_type: Selects the connection class. If it contains
            'socket' a UnixSocketConnection is created, if it contains 'tls'
            a TLSConnection is created and in all other cases an
            SSHConnection is created.
        socketpath: Path passed to the UnixSocketConnection.
        timeout: Timeout passed to all connection classes.
        hostname: Hostname passed to the TLS and SSH connection.
        port: Port passed to the TLS and SSH connection.
        certfile: Certificate file passed to the TLSConnection.
        keyfile: Key file passed to the TLSConnection.
        cafile: CA file passed to the TLSConnection.
        ssh_username: Username passed to the SSHConnection.
        ssh_password: Password passed to the SSHConnection.
        **kwargs: Accepted and ignored.

    Returns:
        The created connection instance.
    """
    if 'socket' in connection_type:
        connection = UnixSocketConnection(path=socketpath, timeout=timeout)
    elif 'tls' in connection_type:
        connection = TLSConnection(
            cafile=cafile,
            keyfile=keyfile,
            certfile=certfile,
            port=port,
            hostname=hostname,
            timeout=timeout,
        )
    else:
        # every other connection type falls back to SSH
        connection = SSHConnection(
            password=ssh_password,
            username=ssh_username,
            port=port,
            hostname=hostname,
            timeout=timeout,
        )

    return connection
394