Passed
Pull Request — master (#185)
by Juan José
01:09
created

gvmtools.parser.create_connection()   A

Complexity

Conditions 3

Size

Total Lines 32
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 28
dl 0
loc 32
rs 9.208
c 0
b 0
f 0
cc 3
nop 11

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

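There are several approaches to avoid long parameter lists: group related parameters into a single parameter object, pass a whole object instead of several of its attributes, or let the method obtain a value itself instead of receiving it as a parameter.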

# -*- coding: utf-8 -*-
# Copyright (C) 2018 - 2019 Greenbone Networks GmbH
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
"""Command Line Interface Parser
20
"""
21
22
import argparse
23
import logging
24
25
from pathlib import Path
26
27
from gvm import get_version as get_gvm_version
28
from gvm.connections import (
29
    DEFAULT_TIMEOUT,
30
    SSHConnection,
31
    TLSConnection,
32
    UnixSocketConnection,
33
)
34
35
from gvmtools import get_version
36
from gvmtools.config import Config
37
38
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Version strings interpolated into the ``--version`` output of CliParser.
__version__ = get_version()
__api_version__ = get_gvm_version()

# Config file used when ``-c/--config`` is not passed on the command line.
DEFAULT_CONFIG_PATH = '~/.config/gvm-tools.conf'

# Values accepted by the optional ``--protocol`` argument.
PROTOCOL_OSP = 'OSP'
PROTOCOL_GMP = 'GMP'
DEFAULT_PROTOCOL = PROTOCOL_GMP
48
49
50
def _filter_actions(actions, actiontypes):
    """Return the actions that are not instances of ``actiontypes``.

    Arguments:
        actions: Iterable of objects to filter (argparse actions in this
            module).
        actiontypes: A class or tuple of classes to exclude, in any form
            accepted by ``isinstance``.

    Returns:
        A new list with the remaining items in their original order.
    """
    return [action for action in actions if not isinstance(action, actiontypes)]
52
53
54
class Subparser(argparse.ArgumentParser):
    """An ArgumentParser child class to allow better Subparser help formatting

    This class overrides the format_help method of ArgumentParser.

    It adds the actions of a parent parser to the usage output by skipping the
    _SubParserActions.
    """

    def __init__(self, parent=None, **kwargs):
        """Create the sub parser.

        Arguments:
            parent: Optional parser whose actions should be included in the
                help output of this parser.
            **kwargs: Passed through unchanged to argparse.ArgumentParser.
        """
        super().__init__(**kwargs)

        self._parent = parent

    def format_help(self):
        """Return the help text, including the arguments of the parent parser.

        Without a parent only the own actions are listed (the help action is
        still dropped from the argument sections).
        """
        # pylint: disable=protected-access

        # this code may break with changes in argparse

        formatter = self._get_formatter()

        if self._parent:
            # usage line: parent arguments (without the subparser selector)
            # followed by the own arguments (without a second -h/--help)
            actions = _filter_actions(
                self._parent._actions, argparse._SubParsersAction
            )
            actions.extend(_filter_actions(self._actions, argparse._HelpAction))
        else:
            actions = self._actions

        formatter.add_usage(
            self.usage, actions, self._mutually_exclusive_groups
        )

        for i, action_group in enumerate(self._action_groups):
            formatter.start_section(action_group.title)
            formatter.add_text(action_group.description)

            # action groups of parent and child are matched by position
            if self._parent and len(self._parent._action_groups) > i:
                parent_action_group = self._parent._action_groups[i]
                formatter.add_arguments(parent_action_group._group_actions)

            formatter.add_arguments(
                _filter_actions(
                    action_group._group_actions, argparse._HelpAction
                )
            )
            formatter.end_section()

        # description
        formatter.add_text(self.description)

        # epilog
        formatter.add_text(self.epilog)

        return formatter.format_help()
109
110
111
class CliParser:
    """Command line parser with ssh, tls and socket connection sub commands.

    Defaults for several arguments are read from a config file before the
    command line is parsed completely.
    """

    def __init__(
        self, description, logfilename, *, prog=None, ignore_config=False
    ):
        """Build the root parser, the main parser and the connection parsers.

        Arguments:
            description: Description text of the root parser.
            logfilename: File name passed to logging.basicConfig if logging
                gets activated via --log.
            prog: Program name passed to the argparse parsers.
            ignore_config: If True no config file is loaded and the -c/--config
                argument has no effect on the defaults.
        """
        root_parser = argparse.ArgumentParser(
            prog=prog,
            description=description,
            formatter_class=argparse.RawTextHelpFormatter,
            # don't parse help initially. the args from parser wouldn't be shown
            add_help=False,
        )

        root_parser.add_argument(
            '-c',
            '--config',
            nargs='?',
            default=DEFAULT_CONFIG_PATH,
            help='Configuration file path (default: %(default)s)',
        )
        root_parser.add_argument(
            '--log',
            nargs='?',
            dest='loglevel',
            const='INFO',
            choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
            help='Activate logging (default level: %(default)s)',
        )

        parser = argparse.ArgumentParser(prog=prog, parents=[root_parser])

        parser.add_argument(
            '--timeout',
            required=False,
            default=DEFAULT_TIMEOUT,
            type=int,
            help='Response timeout in seconds, or -1 to wait '
            'indefinitely (default: %(default)s)',
        )
        parser.add_argument(
            '--gmp-username',
            help='Username for GMP service (default: %(default)r)',
        )
        parser.add_argument(
            '--gmp-password',
            help='Password for GMP service (default: %(default)r)',
        )
        parser.add_argument(
            '-V',
            '--version',
            action='version',
            version='%(prog)s {version} (API version {apiversion})'.format(
                version=__version__, apiversion=__api_version__
            ),
            help='Show version information and exit',
        )

        subparsers = parser.add_subparsers(
            metavar='CONNECTION_TYPE',
            title='connections',
            description='valid connection types',
            help="Connection type to use",
            parser_class=Subparser,
        )
        subparsers.required = True
        subparsers.dest = 'connection_type'

        self._subparsers = subparsers

        self._parser = parser
        self._root_parser = root_parser

        self._logfilename = logfilename
        self._ignore_config = ignore_config

        self._add_subparsers()

    def _prepare_parsing(self, args):
        """Run the steps required before the main parser can be used.

        Only the root arguments (--config and --log) are evaluated here to set
        up logging and to load the defaults from the config file.
        """
        args_before, _ = self._root_parser.parse_known_args(args)

        if args_before.loglevel is not None:
            level = logging.getLevelName(args_before.loglevel)
            logging.basicConfig(filename=self._logfilename, level=level)

        self._set_defaults(None if self._ignore_config else args_before.config)

    @staticmethod
    def _finalize_args(args):
        """Normalize the parsed arguments, log them and return them."""
        # If timeout value is -1, then the socket should have no timeout
        if args.timeout == -1:
            args.timeout = None

        logging.debug('Parsed arguments %r', args)

        return args

    def parse_args(self, args=None):
        """Parse the arguments and return the resulting namespace.

        Arguments:
            args: List of argument strings. With None argparse falls back to
                sys.argv.

        Returns:
            The namespace created by argparse. A timeout of -1 is replaced by
            None.
        """
        self._prepare_parsing(args)

        return self._finalize_args(self._parser.parse_args(args))

    def parse_known_args(self, args=None):
        """Parse the known arguments and hand back the remaining ones.

        Arguments:
            args: List of argument strings. With None argparse falls back to
                sys.argv.

        Returns:
            A tuple of the namespace created by argparse (a timeout of -1 is
            replaced by None) and the list of arguments not known to the
            parser.
        """
        self._prepare_parsing(args)

        args, script_args = self._parser.parse_known_args(args)

        return self._finalize_args(args), script_args

    def add_argument(self, *args, **kwargs):
        """Add an argument to each of the socket, ssh and tls parsers."""
        self._parser_socket.add_argument(*args, **kwargs)
        self._parser_ssh.add_argument(*args, **kwargs)
        self._parser_tls.add_argument(*args, **kwargs)

    def add_protocol_argument(self):
        """Add the optional --protocol argument to the main parser."""
        self._parser.add_argument(
            '--protocol',
            required=False,
            default=DEFAULT_PROTOCOL,
            choices=[PROTOCOL_GMP, PROTOCOL_OSP],
            help='Service protocol to use (default: %(default)s)',
        )

    def _load_config(self, configfile):
        """Create a Config and load configfile into it if the file exists.

        Arguments:
            configfile: Path of the config file. A false value or a path to a
                not existing file results in a Config without loaded file.

        Raises:
            RuntimeError: If loading the existing config file fails. The
                original exception is attached as the cause.
        """
        config = Config()

        if not configfile:
            return config

        configpath = Path(configfile)

        try:
            if not configpath.expanduser().resolve().exists():
                logger.debug('Ignoring non existing config file %s', configfile)
                return config
        except FileNotFoundError:
            # we are on python 3.5 and Path.resolve raised a FileNotFoundError
            logger.debug('Ignoring non existing config file %s', configfile)
            return config

        try:
            config.load(configpath)
            logger.debug('Loaded config %s', configfile)
        except Exception as e:  # pylint: disable=broad-except
            # chain the original error to keep its traceback available
            raise RuntimeError(
                'Error while parsing config file {config}. Error was '
                '{message}'.format(config=configfile, message=e)
            ) from e

        return config

    def _add_subparsers(self):
        """Create the ssh, tls and socket parsers with their arguments."""
        parser_ssh = self._subparsers.add_parser(
            'ssh', help='Use SSH to connect to service', parent=self._parser
        )

        parser_ssh.add_argument(
            '--hostname', required=True, help='Hostname or IP address'
        )
        parser_ssh.add_argument(
            '--port',
            required=False,
            help='SSH port (default: %(default)s)',
            type=int,
        )
        parser_ssh.add_argument(
            '--ssh-username', help='SSH username (default: %(default)r)'
        )
        parser_ssh.add_argument(
            '--ssh-password', help='SSH password (default: %(default)r)'
        )

        parser_tls = self._subparsers.add_parser(
            'tls',
            help='Use TLS secured connection to connect to service',
            parent=self._parser,
        )
        parser_tls.add_argument(
            '--hostname', required=True, help='Hostname or IP address'
        )
        parser_tls.add_argument(
            '--port',
            required=False,
            help='GMP/OSP port (default: %(default)s)',
            type=int,
        )
        parser_tls.add_argument(
            '--certfile',
            required=False,
            help='Path to the certificate file for client authentication. '
            '(default: %(default)s)',
        )
        parser_tls.add_argument(
            '--keyfile',
            required=False,
            help='Path to key file for client authentication. '
            '(default: %(default)s)',
        )
        parser_tls.add_argument(
            '--cafile',
            required=False,
            help='Path to CA certificate for server authentication. '
            '(default: %(default)s)',
        )
        parser_tls.add_argument(
            '--no-credentials',
            required=False,
            default=False,
            action='store_true',
            help='Use only certificates for authentication',
        )

        parser_socket = self._subparsers.add_parser(
            'socket',
            help='Use UNIX Domain socket to connect to service',
            parent=self._parser,
        )

        # --sockpath and --socketpath can not be used at the same time
        socketpath_group = parser_socket.add_mutually_exclusive_group()
        socketpath_group.add_argument(
            '--sockpath',
            nargs='?',
            default=None,
            help='Deprecated, use --socketpath instead',
        )
        socketpath_group.add_argument(
            '--socketpath',
            nargs='?',
            help='Path to UNIX Domain socket (default: %(default)s)',
        )

        self._parser_ssh = parser_ssh
        self._parser_socket = parser_socket
        self._parser_tls = parser_tls

    def _set_defaults(self, configfilename=None):
        """Load the config and use its values as defaults of the parsers.

        Arguments:
            configfilename: Path of the config file to load. With None only the
                values provided by an unloaded Config are used.
        """
        self._config = self._load_config(configfilename)

        self._parser.set_defaults(
            gmp_username=self._config.get('gmp', 'username'),
            gmp_password=self._config.get('gmp', 'password'),
            **self._config.defaults()
        )

        self._parser_ssh.set_defaults(
            port=int(self._config.get('ssh', 'port')),
            ssh_username=self._config.get('ssh', 'username'),
            ssh_password=self._config.get('ssh', 'password'),
        )
        self._parser_tls.set_defaults(
            port=int(self._config.get('tls', 'port')),
            certfile=self._config.get('tls', 'certfile'),
            keyfile=self._config.get('tls', 'keyfile'),
            cafile=self._config.get('tls', 'cafile'),
        )
        self._parser_socket.set_defaults(
            socketpath=self._config.get('unixsocket', 'socketpath')
        )
374
375
376
def create_parser(description, logfilename):
    """Return a CliParser for the given description and log file name.

    Arguments:
        description: Description text shown by the parser.
        logfilename: File name used for logging if --log is passed.
    """
    return CliParser(description, logfilename)
378
379
380
def create_connection(
    connection_type,
    socketpath=None,
    timeout=None,
    hostname=None,
    port=None,
    certfile=None,
    keyfile=None,
    cafile=None,
    ssh_username=None,
    ssh_password=None,
    **kwargs  # pylint: disable=unused-argument
):
    """Create the connection object matching connection_type.

    The signature allows to pass all parsed arguments as keyword arguments,
    not used ones are ignored.

    Arguments:
        connection_type: Name of the connection. If it contains 'socket' a
            UnixSocketConnection is created, if it contains 'tls' a
            TLSConnection. Every other value results in a SSHConnection.
        socketpath: Path passed to the UnixSocketConnection.
        timeout: Timeout passed to the created connection.
        hostname: Hostname passed to the TLS or SSH connection.
        port: Port passed to the TLS or SSH connection.
        certfile: Certificate file passed to the TLSConnection.
        keyfile: Key file passed to the TLSConnection.
        cafile: CA file passed to the TLSConnection.
        ssh_username: Username passed to the SSHConnection.
        ssh_password: Password passed to the SSHConnection.
        **kwargs: Ignored.

    Returns:
        A UnixSocketConnection, TLSConnection or SSHConnection instance.
    """
    if 'socket' in connection_type:
        return UnixSocketConnection(timeout=timeout, path=socketpath)

    if 'tls' in connection_type:
        return TLSConnection(
            timeout=timeout,
            hostname=hostname,
            port=port,
            certfile=certfile,
            keyfile=keyfile,
            cafile=cafile,
        )

    # no explicit check for 'ssh': it is the fallback for all other values
    return SSHConnection(
        timeout=timeout,
        hostname=hostname,
        port=port,
        username=ssh_username,
        password=ssh_password,
    )
413