Passed
Pull Request — master (#154)
by
unknown
01:16
created

gvmtools.parser.create_connection()   A

Complexity

Conditions 3

Size

Total Lines 32
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 28
dl 0
loc 32
rs 9.208
c 0
b 0
f 0
cc 3
nop 11

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 - 2019 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
"""Command Line Interface Parser
20
"""
21
22
import argparse
23
import configparser
24
import logging
25
import os
26
27
from gvm import get_version as get_gvm_version
28
from gvm.connections import (
29
    DEFAULT_UNIX_SOCKET_PATH,
30
    DEFAULT_TIMEOUT,
31
    DEFAULT_GVM_PORT,
32
    SSHConnection,
33
    TLSConnection,
34
    UnixSocketConnection,
35
)
36
37
from gvmtools import get_version
38
39
logger = logging.getLogger(__name__)
40
41
__version__ = get_version()
42
__api_version__ = get_gvm_version()
43
44
DEFAULT_CONFIG_PATH = '~/.config/gvm-tools.conf'
45
46
PROTOCOL_OSP = 'OSP'
47
PROTOCOL_GMP = 'GMP'
48
DEFAULT_PROTOCOL = PROTOCOL_GMP
49
50
51
def _filter_actions(actions, actiontypes):
52
    return [action for action in actions if not isinstance(action, actiontypes)]
53
54
55
class Subparser(argparse.ArgumentParser):
56
    """An ArgumentParser child class to allow better Subparser help formatting
57
58
    This class overrides the format_help method of ArgumentParser.
59
60
    It adds the actions of a parent parser to the usage output by skipping the
61
    _SubParserActions.
62
    """
63
64
    def __init__(self, parent=None, **kwargs):
65
        super().__init__(**kwargs)
66
67
        self._parent = parent
68
69
    def format_help(self):
70
        # pylint: disable=protected-access
71
72
        # this code may break with changes in argparse
73
74
        formatter = self._get_formatter()
75
76
        if self._parent:
77
            actions = _filter_actions(
78
                self._parent._actions, argparse._SubParsersAction
79
            )
80
            actions.extend(_filter_actions(self._actions, argparse._HelpAction))
81
        else:
82
            actions = self._actions
83
84
        formatter.add_usage(
85
            self.usage, actions, self._mutually_exclusive_groups
86
        )
87
88
        for i, action_group in enumerate(self._action_groups):
89
            formatter.start_section(action_group.title)
90
            formatter.add_text(action_group.description)
91
92
            if self._parent and len(self._parent._action_groups) > i:
93
                parent_action_group = self._parent._action_groups[i]
94
                formatter.add_arguments(parent_action_group._group_actions)
95
96
            formatter.add_arguments(
97
                _filter_actions(
98
                    action_group._group_actions, argparse._HelpAction
99
                )
100
            )
101
            formatter.end_section()
102
103
        # description
104
        formatter.add_text(self.description)
105
106
        # epilog
107
        formatter.add_text(self.epilog)
108
109
        return formatter.format_help()
110
111
112
class CliParser:
113
    def __init__(
114
        self, description, logfilename, *, prog=None, ignore_config=False
115
    ):
116
        root_parser = argparse.ArgumentParser(
117
            prog=prog,
118
            description=description,
119
            formatter_class=argparse.RawTextHelpFormatter,
120
            # don't parse help initially. the args from parser wouldn't be shown
121
            add_help=False,
122
        )
123
124
        root_parser.add_argument(
125
            '-c',
126
            '--config',
127
            nargs='?',
128
            default=DEFAULT_CONFIG_PATH,
129
            help='Configuration file path (default: %(default)s)',
130
        )
131
        root_parser.add_argument(
132
            '--log',
133
            nargs='?',
134
            dest='loglevel',
135
            const='INFO',
136
            choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
137
            help='Activate logging (default level: %(default)s)',
138
        )
139
140
        parser = argparse.ArgumentParser(prog=prog, parents=[root_parser])
141
142
        parser.add_argument(
143
            '--timeout',
144
            required=False,
145
            default=DEFAULT_TIMEOUT,
146
            type=int,
147
            help='Response timeout in seconds, or -1 to wait '
148
            'indefinitely (default: %(default)s)',
149
        )
150
        parser.add_argument(
151
            '--gmp-username',
152
            help='Username for GMP service (default: %(default)r)',
153
        )
154
        parser.add_argument(
155
            '--gmp-password',
156
            help='Password for GMP service (default: %(default)r)',
157
        )
158
        parser.add_argument(
159
            '-V',
160
            '--version',
161
            action='version',
162
            version='%(prog)s {version} (API version {apiversion})'.format(
163
                version=__version__, apiversion=__api_version__
164
            ),
165
            help='Show version information and exit',
166
        )
167
168
        subparsers = parser.add_subparsers(
169
            metavar='CONNECTION_TYPE',
170
            title='connections',
171
            description='valid connection types',
172
            help="Connection type to use",
173
            parser_class=Subparser,
174
        )
175
        subparsers.required = True
176
        subparsers.dest = 'connection_type'
177
178
        self._subparsers = subparsers
179
180
        self._parser = parser
181
        self._root_parser = root_parser
182
183
        self._logfilename = logfilename
184
        self._ignore_config = ignore_config
185
186
        self._add_subparsers()
187
188
    def parse_args(self, args=None):
189
        args_before, _ = self._root_parser.parse_known_args(args)
190
191
        if args_before.loglevel is not None:
192
            level = logging.getLevelName(args_before.loglevel)
193
            logging.basicConfig(filename=self._logfilename, level=level)
194
195
        self._set_defaults(None if self._ignore_config else args_before.config)
196
197
        args = self._parser.parse_args(args)
198
199
        # If timeout value is -1, then the socket should have no timeout
200
        if args.timeout == -1:
201
            args.timeout = None
202
203
        logging.debug('Parsed arguments %r', args)
204
205
        return args
206
207
    def add_argument(self, *args, **kwargs):
208
        self._parser.add_argument(*args, **kwargs)
209
210
    def add_protocol_argument(self):
211
        self.add_argument(
212
            '--protocol',
213
            required=False,
214
            default=DEFAULT_PROTOCOL,
215
            choices=[PROTOCOL_GMP, PROTOCOL_OSP],
216
            help='Service protocol to use (default: %(default)s)',
217
        )
218
219
    def _load_config(self, configfile):
220
        config = configparser.ConfigParser(default_section='main')
221
222
        if not configfile:
223
            return config
224
225
        try:
226
            path = os.path.expanduser(configfile)
227
            config.read(path)
228
            logger.debug('Loaded config %s', configfile)
229
        except Exception as e:  # pylint: disable=broad-except
230
            raise RuntimeError(
231
                'Error while parsing config file {config}. Error was '
232
                '{message}'.format(config=configfile, message=e)
233
            )
234
235
        return config
236
237
    def _add_subparsers(self):
238
        parser_ssh = self._subparsers.add_parser(
239
            'ssh', help='Use SSH to connect to service', parent=self._parser
240
        )
241
242
        parser_ssh.add_argument(
243
            '--hostname', required=True, help='Hostname or IP address'
244
        )
245
        parser_ssh.add_argument(
246
            '--port',
247
            required=False,
248
            help='SSH port (default: %(default)s)',
249
            type=int,
250
        )
251
        parser_ssh.add_argument(
252
            '--ssh-username', help='SSH username (default: %(default)r)'
253
        )
254
        parser_ssh.add_argument(
255
            '--ssh-password', help='SSH password (default: %(default)r)'
256
        )
257
258
        parser_tls = self._subparsers.add_parser(
259
            'tls',
260
            help='Use TLS secured connection to connect to service',
261
            parent=self._parser,
262
        )
263
        parser_tls.add_argument(
264
            '--hostname', required=True, help='Hostname or IP address'
265
        )
266
        parser_tls.add_argument(
267
            '--port',
268
            required=False,
269
            help='GMP/OSP port (default: %(default)s)',
270
            type=int,
271
        )
272
        parser_tls.add_argument(
273
            '--certfile',
274
            required=False,
275
            help='Path to the certificate file for client authentication. '
276
            '(default: %(default)s)',
277
        )
278
        parser_tls.add_argument(
279
            '--keyfile',
280
            required=False,
281
            help='Path to key file for client authentication. '
282
            '(default: %(default)s)',
283
        )
284
        parser_tls.add_argument(
285
            '--cafile',
286
            required=False,
287
            help='Path to CA certificate for server authentication. '
288
            '(default: %(default)s)',
289
        )
290
        parser_tls.add_argument(
291
            '--no-credentials',
292
            required=False,
293
            default=False,
294
            action='store_true',
295
            help='Use only certificates for authentication',
296
        )
297
298
        parser_socket = self._subparsers.add_parser(
299
            'socket',
300
            help='Use UNIX Domain socket to connect to service',
301
            parent=self._parser,
302
        )
303
304
        socketpath_group = parser_socket.add_mutually_exclusive_group()
305
        socketpath_group.add_argument(
306
            '--sockpath',
307
            nargs='?',
308
            default=None,
309
            help='Deprecated, use --socketpath instead',
310
        )
311
        socketpath_group.add_argument(
312
            '--socketpath',
313
            nargs='?',
314
            help='Path to UNIX Domain socket (default: %(default)s)',
315
        )
316
317
        self._parser_ssh = parser_ssh
318
        self._parser_socket = parser_socket
319
        self._parser_tls = parser_tls
320
321
    def _set_defaults(self, configfilename=None):
322
        self._config = self._load_config(configfilename)
323
324
        self._parser.set_defaults(
325
            gmp_username=self._config.get('gmp', 'username', fallback=''),
326
            gmp_password=self._config.get('gmp', 'password', fallback=''),
327
            **self._config.defaults()
328
        )
329
330
        self._parser_ssh.set_defaults(
331
            port=int(self._config.get('ssh', 'port', fallback=22)),
332
            ssh_username=self._config.get('ssh', 'username', fallback='gmp'),
333
            ssh_password=self._config.get('ssh', 'password', fallback='gmp'),
334
        )
335
        self._parser_tls.set_defaults(
336
            port=int(
337
                self._config.get('tls', 'port', fallback=DEFAULT_GVM_PORT)
338
            ),
339
            certfile=self._config.get('tls', 'certfile', fallback=None),
340
            keyfile=self._config.get('tls', 'keyfile', fallback=None),
341
            cafile=self._config.get('tls', 'cafile', fallback=None),
342
        )
343
        self._parser_socket.set_defaults(
344
            socketpath=self._config.get(
345
                'unixsocket', 'socketpath', fallback=DEFAULT_UNIX_SOCKET_PATH
346
            )
347
        )
348
349
350
def create_parser(description, logfilename):
351
    return CliParser(description, logfilename)
352
353
354
def create_connection(
355
    connection_type,
356
    socketpath=None,
357
    timeout=None,
358
    hostname=None,
359
    port=None,
360
    certfile=None,
361
    keyfile=None,
362
    cafile=None,
363
    ssh_username=None,
364
    ssh_password=None,
365
    **kwargs  # pylint: disable=unused-argument
366
):
367
    if 'socket' in connection_type:
368
        return UnixSocketConnection(timeout=timeout, path=socketpath)
369
370
    if 'tls' in connection_type:
371
        return TLSConnection(
372
            timeout=timeout,
373
            hostname=hostname,
374
            port=port,
375
            certfile=certfile,
376
            keyfile=keyfile,
377
            cafile=cafile,
378
        )
379
380
    return SSHConnection(
381
        timeout=timeout,
382
        hostname=hostname,
383
        port=port,
384
        username=ssh_username,
385
        password=ssh_password,
386
    )
387