Completed
Push — master ( dc752d...7abd25 )
by
unknown
14s queued 11s
created

gvmtools.parser.create_connection()   A

Complexity

Conditions 3

Size

Total Lines 32
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 28
dl 0
loc 32
rs 9.208
c 0
b 0
f 0
cc 3
nop 11

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand; their parameters also often become inconsistent when you need more or different data.

There are several approaches to avoid long parameter lists:

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 - 2019 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
"""Command Line Interface Parser
20
"""
21
22
import argparse
23
import logging
24
25
from pathlib import Path
26
27
from gvm import get_version as get_gvm_version
28
from gvm.connections import (
29
    DEFAULT_TIMEOUT,
30
    SSHConnection,
31
    TLSConnection,
32
    UnixSocketConnection,
33
)
34
35
from gvmtools import get_version
36
from gvmtools.config import Config
37
38
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Version strings shown by the ``--version`` command line option:
# the gvm-tools version and the version reported by the gvm package.
__version__ = get_version()
__api_version__ = get_gvm_version()

# Config file used when ``--config`` is not passed on the command line.
DEFAULT_CONFIG_PATH = '~/.config/gvm-tools.conf'

# Values accepted by the optional ``--protocol`` argument.
PROTOCOL_OSP = 'OSP'
PROTOCOL_GMP = 'GMP'
DEFAULT_PROTOCOL = PROTOCOL_GMP
48
49
50
class CliParser:
    """Command line parser with config file backed defaults.

    Parsing happens in two stages. A small "bootstrap" parser only knows
    ``--config`` and ``--log``; it is run first so that logging can be set up
    and the config file can be loaded. The values from the config file are
    then installed as defaults on the main parser and its ``ssh``, ``tls``
    and ``socket`` sub-parsers before the full command line is parsed.
    """

    def __init__(
        self, description, logfilename, *, prog=None, ignore_config=False
    ):
        """Build the bootstrap parser, the main parser and the sub-parsers.

        Arguments:
            description: Text describing the program, shown in the help.
            logfilename: File name passed to ``logging.basicConfig`` when
                logging is activated via ``--log``.
            prog: Program name handed to argparse (argparse picks its own
                default if None).
            ignore_config: If True, no config file is read and the defaults
                are taken from a freshly created ``Config`` instance.
        """
        bootstrap_parser = argparse.ArgumentParser(
            prog=prog,
            description=description,
            formatter_class=argparse.RawTextHelpFormatter,
            # Don't handle help in the bootstrap stage: the arguments of the
            # main parser wouldn't be shown.
            add_help=False,
        )

        bootstrap_parser.add_argument(
            '-c',
            '--config',
            nargs='?',
            default=DEFAULT_CONFIG_PATH,
            help='Configuration file path (default: %(default)s)',
        )
        bootstrap_parser.add_argument(
            '--log',
            nargs='?',
            dest='loglevel',
            const='INFO',
            choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
            help='Activate logging (default level: %(default)s)',
        )

        # ``parents`` only copies arguments and defaults. The description and
        # formatter must be passed again, otherwise the description never
        # shows up in the ``--help`` output printed by the main parser.
        parser = argparse.ArgumentParser(
            prog=prog,
            description=description,
            formatter_class=argparse.RawTextHelpFormatter,
            parents=[bootstrap_parser],
        )

        parser.add_argument(
            '--timeout',
            required=False,
            default=DEFAULT_TIMEOUT,
            type=int,
            help='Response timeout in seconds, or -1 to wait '
            'indefinitely (default: %(default)s)',
        )
        parser.add_argument(
            '--gmp-username',
            help='Username for GMP service (default: %(default)r)',
        )
        parser.add_argument(
            '--gmp-password',
            help='Password for GMP service (default: %(default)r)',
        )
        parser.add_argument(
            '-V',
            '--version',
            action='version',
            version='%(prog)s {version} (API version {apiversion})'.format(
                version=__version__, apiversion=__api_version__
            ),
            help='Show version information and exit',
        )

        subparsers = parser.add_subparsers(
            metavar='CONNECTION_TYPE',
            title='connections',
            description='valid connection types',
            help="Connection type to use",
        )
        # Set after creation instead of via keyword arguments; the result is
        # a mandatory sub-command stored as ``connection_type``.
        subparsers.required = True
        subparsers.dest = 'connection_type'

        self._subparsers = subparsers

        self._parser = parser
        self._bootstrap_parser = bootstrap_parser

        self._logfilename = logfilename
        self._ignore_config = ignore_config

        self._add_subparsers()

    def parse_args(self, args=None):
        """Parse the command line and reject unknown arguments.

        Arguments:
            args: List of argument strings; None lets argparse fall back to
                ``sys.argv``.

        Returns:
            The namespace with the parsed arguments. If unknown arguments
            remain, the error is reported via ``ArgumentParser.error``.
        """
        args, unknown_args = self.parse_known_args(args)
        if unknown_args:
            self._parser.error(
                'unrecognized arguments {}'.format(' '.join(unknown_args))
            )
        return args

    def parse_known_args(self, args=None):
        """Parse the command line and return unknown arguments separately.

        Runs the bootstrap parser first to set up logging (only if ``--log``
        was passed) and to load the config defaults, then runs the main
        parser.

        Arguments:
            args: List of argument strings; None lets argparse fall back to
                ``sys.argv``.

        Returns:
            A tuple of the parsed namespace and the list of argument strings
            that were not recognized.
        """
        args_before, _ = self._bootstrap_parser.parse_known_args(args)

        if args_before.loglevel is not None:
            level = logging.getLevelName(args_before.loglevel)
            logging.basicConfig(filename=self._logfilename, level=level)

        self._set_defaults(None if self._ignore_config else args_before.config)

        args, unknown_args = self._parser.parse_known_args(args)

        # If timeout value is -1, then the socket should have no timeout
        if args.timeout == -1:
            args.timeout = None

        # Use the module logger: the module-level logging.debug() would
        # implicitly call basicConfig() on the root logger when logging has
        # not been activated via --log.
        # NOTE(review): the namespace includes the password arguments.
        logger.debug('Parsed arguments %r', args)

        return args, unknown_args

    def add_argument(self, *args, **kwargs):
        """Add an argument to each of the socket, ssh and tls sub-parsers.

        All positional and keyword arguments are forwarded unchanged to
        ``ArgumentParser.add_argument``.
        """
        self._parser_socket.add_argument(*args, **kwargs)
        self._parser_ssh.add_argument(*args, **kwargs)
        self._parser_tls.add_argument(*args, **kwargs)

    def add_protocol_argument(self):
        """Add the optional ``--protocol`` argument to the main parser."""
        self._parser.add_argument(
            '--protocol',
            required=False,
            default=DEFAULT_PROTOCOL,
            choices=[PROTOCOL_GMP, PROTOCOL_OSP],
            help='Service protocol to use (default: %(default)s)',
        )

    def _load_config(self, configfile):
        """Create a Config and load ``configfile`` into it if it exists.

        Arguments:
            configfile: Path of the config file. A false value (None, empty
                string) or a non-existing file yields an unloaded Config.

        Returns:
            The Config instance.

        Raises:
            RuntimeError: If loading an existing config file fails. The
                original exception is attached as the cause.
        """
        config = Config()

        if not configfile:
            return config

        configpath = Path(configfile)

        try:
            config_exists = configpath.expanduser().resolve().exists()
        except FileNotFoundError:
            # we are on python 3.5 and Path.resolve raised a FileNotFoundError
            config_exists = False

        if not config_exists:
            logger.debug('Ignoring non existing config file %s', configfile)
            return config

        try:
            config.load(configpath)
            logger.debug('Loaded config %s', configfile)
        except Exception as e:  # pylint: disable=broad-except
            raise RuntimeError(
                'Error while parsing config file {config}. Error was '
                '{message}'.format(config=configfile, message=e)
            ) from e

        return config

    def _add_subparsers(self):
        """Create the ``ssh``, ``tls`` and ``socket`` sub-parsers.

        The sub-parsers are stored on the instance so that defaults and
        additional arguments can be applied to them later.
        """
        parser_ssh = self._subparsers.add_parser(
            'ssh', help='Use SSH to connect to service'
        )

        parser_ssh.add_argument(
            '--hostname', required=True, help='Hostname or IP address'
        )
        parser_ssh.add_argument(
            '--port',
            required=False,
            help='SSH port (default: %(default)s)',
            type=int,
        )
        parser_ssh.add_argument(
            '--ssh-username', help='SSH username (default: %(default)r)'
        )
        parser_ssh.add_argument(
            '--ssh-password', help='SSH password (default: %(default)r)'
        )

        parser_tls = self._subparsers.add_parser(
            'tls', help='Use TLS secured connection to connect to service'
        )
        parser_tls.add_argument(
            '--hostname', required=True, help='Hostname or IP address'
        )
        parser_tls.add_argument(
            '--port',
            required=False,
            help='GMP/OSP port (default: %(default)s)',
            type=int,
        )
        parser_tls.add_argument(
            '--certfile',
            required=False,
            help='Path to the certificate file for client authentication. '
            '(default: %(default)s)',
        )
        parser_tls.add_argument(
            '--keyfile',
            required=False,
            help='Path to key file for client authentication. '
            '(default: %(default)s)',
        )
        parser_tls.add_argument(
            '--cafile',
            required=False,
            help='Path to CA certificate for server authentication. '
            '(default: %(default)s)',
        )
        parser_tls.add_argument(
            '--no-credentials',
            required=False,
            default=False,
            action='store_true',
            help='Use only certificates for authentication',
        )

        parser_socket = self._subparsers.add_parser(
            'socket', help='Use UNIX Domain socket to connect to service'
        )

        # --sockpath is the deprecated spelling; only one of the two options
        # may be given on the command line.
        socketpath_group = parser_socket.add_mutually_exclusive_group()
        socketpath_group.add_argument(
            '--sockpath',
            nargs='?',
            default=None,
            help='Deprecated, use --socketpath instead',
        )
        socketpath_group.add_argument(
            '--socketpath',
            nargs='?',
            help='Path to UNIX Domain socket (default: %(default)s)',
        )

        self._parser_ssh = parser_ssh
        self._parser_socket = parser_socket
        self._parser_tls = parser_tls

    def _set_defaults(self, configfilename=None):
        """Load the config and install its values as parser defaults.

        Arguments:
            configfilename: Path of the config file to load, or None to use
                the values of a freshly created Config.
        """
        self._config = self._load_config(configfilename)

        self._parser.set_defaults(
            gmp_username=self._config.get('gmp', 'username'),
            gmp_password=self._config.get('gmp', 'password'),
            **self._config.defaults()
        )

        self._parser_ssh.set_defaults(
            port=int(self._config.get('ssh', 'port')),
            ssh_username=self._config.get('ssh', 'username'),
            ssh_password=self._config.get('ssh', 'password'),
        )
        self._parser_tls.set_defaults(
            port=int(self._config.get('tls', 'port')),
            certfile=self._config.get('tls', 'certfile'),
            keyfile=self._config.get('tls', 'keyfile'),
            cafile=self._config.get('tls', 'cafile'),
        )
        self._parser_socket.set_defaults(
            socketpath=self._config.get('unixsocket', 'socketpath')
        )
297
298
299
def create_parser(description, logfilename):
    """Return a new CliParser for the given description and log file name."""
    return CliParser(description=description, logfilename=logfilename)
301
302
303
def create_connection(
    connection_type,
    socketpath=None,
    timeout=None,
    hostname=None,
    port=None,
    certfile=None,
    keyfile=None,
    cafile=None,
    ssh_username=None,
    ssh_password=None,
    **kwargs  # pylint: disable=unused-argument
):
    """Create the connection object matching ``connection_type``.

    Arguments:
        connection_type: A value containing 'socket' selects a
            UnixSocketConnection, one containing 'tls' selects a
            TLSConnection; everything else results in an SSHConnection.
        socketpath: Socket path, used for the socket connection only.
        timeout: Timeout handed to whichever connection is created.
        hostname: Host name, used for tls and ssh connections.
        port: Port, used for tls and ssh connections.
        certfile: Certificate file, used for the tls connection only.
        keyfile: Key file, used for the tls connection only.
        cafile: CA file, used for the tls connection only.
        ssh_username: User name, used for the ssh connection only.
        ssh_password: Password, used for the ssh connection only.
        **kwargs: Accepted and ignored; presumably so that a complete set of
            parsed arguments can be passed in - verify against callers.

    Returns:
        The newly created connection instance.
    """
    if 'socket' in connection_type:
        connection = UnixSocketConnection(timeout=timeout, path=socketpath)
    elif 'tls' in connection_type:
        connection = TLSConnection(
            timeout=timeout,
            hostname=hostname,
            port=port,
            certfile=certfile,
            keyfile=keyfile,
            cafile=cafile,
        )
    else:
        # Any other connection type falls back to SSH.
        connection = SSHConnection(
            timeout=timeout,
            hostname=hostname,
            port=port,
            username=ssh_username,
            password=ssh_password,
        )

    return connection
336