Passed
Pull Request — master (#154)
by
unknown
01:13
created

gvmtools.parser.create_connection()   A

Complexity

Conditions 3

Size

Total Lines 32
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 28
dl 0
loc 32
rs 9.208
c 0
b 0
f 0
cc 3
nop 11

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 - 2019 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
"""Command Line Interface Parser
20
"""
21
22
import argparse
23
import configparser
24
import logging
25
import os
26
27
from gvm import get_version as get_gvm_version
28
from gvm.connections import (
29
    DEFAULT_UNIX_SOCKET_PATH,
30
    DEFAULT_TIMEOUT,
31
    DEFAULT_GVM_PORT,
32
    SSHConnection,
33
    TLSConnection,
34
    UnixSocketConnection,
35
)
36
37
from gvmtools import get_version
38
39
logger = logging.getLogger(__name__)
40
41
__version__ = get_version()
42
__api_version__ = get_gvm_version()
43
44
DEFAULT_CONFIG_PATH = '~/.config/gvm-tools.conf'
45
46
PROTOCOL_OSP = 'OSP'
47
PROTOCOL_GMP = 'GMP'
48
DEFAULT_PROTOCOL = PROTOCOL_GMP
49
50
51
class CliParser:
52
    def __init__(self, description, logfilename, ignore_config=False):
53
        root_parser = argparse.ArgumentParser(
54
            description=description,
55
            formatter_class=argparse.RawTextHelpFormatter,
56
            add_help=False,
57
        )
58
59
        root_parser.add_argument(
60
            '-c',
61
            '--config',
62
            nargs='?',
63
            default=DEFAULT_CONFIG_PATH,
64
            help='Configuration file path (default: %(default)s)',
65
        )
66
        root_parser.add_argument(
67
            '--log',
68
            nargs='?',
69
            dest='loglevel',
70
            const='INFO',
71
            choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
72
            help='Activate logging (default level: %(default)s)',
73
        )
74
75
        self._root_parser = root_parser
76
77
        self._logfilename = logfilename
78
        self._ignore_config = ignore_config
79
80
    def parse_args(self, args=None):
81
        args_before, _ = self._root_parser.parse_known_args(args)
82
83
        if args_before.loglevel is not None:
84
            level = logging.getLevelName(args_before.loglevel)
85
            logging.basicConfig(filename=self._logfilename, level=level)
86
87
        self._config = self._load_config(
88
            None if self._ignore_config else args_before.config
89
        )
90
91
        parser = argparse.ArgumentParser(parents=[self._root_parser])
92
93
        parser.add_argument(
94
            '--timeout',
95
            required=False,
96
            default=DEFAULT_TIMEOUT,
97
            type=int,
98
            help='Response timeout in seconds, or -1 to wait '
99
            'indefinitely (default: %(default)s)',
100
        )
101
        parser.add_argument(
102
            '--gmp-username',
103
            help='Username for GMP service (default: %(default)r)',
104
        )
105
        parser.add_argument(
106
            '--gmp-password',
107
            help='Password for GMP service (default: %(default)r)',
108
        )
109
        parser.add_argument(
110
            '-V',
111
            '--version',
112
            action='version',
113
            version='%(prog)s {version} (API version {apiversion})'.format(
114
                version=__version__, apiversion=__api_version__
115
            ),
116
            help='Show version information and exit',
117
        )
118
119
        subparsers = parser.add_subparsers(
120
            metavar='CONNECTION_TYPE',
121
            title='connections',
122
            description='valid connection types',
123
            help="Connection type to use",
124
        )
125
        subparsers.required = True
126
        subparsers.dest = 'connection_type'
127
128
        self._parser = parser
129
        self._subparsers = subparsers
130
131
        self._add_subparsers()
132
133
        self._parser.set_defaults(
134
            gmp_username=self._config.get('gmp', 'username', fallback=''),
135
            gmp_password=self._config.get('gmp', 'password', fallback=''),
136
            **self._config.defaults()
137
        )
138
139
        args = self._parser.parse_args(args)
140
141
        # If timeout value is -1, then the socket should have no timeout
142
        if args.timeout == -1:
143
            args.timeout = None
144
145
        logging.debug('Parsed arguments %r', args)
146
147
        return args
148
149
    def add_argument(self, *args, **kwargs):
150
        self._parser.add_argument(*args, **kwargs)
151
152
    def _load_config(self, configfile):
153
        config = configparser.ConfigParser(default_section='main')
154
155
        if not configfile:
156
            return config
157
158
        try:
159
            path = os.path.expanduser(configfile)
160
            config.read(path)
161
            logger.debug('Loaded config %s', configfile)
162
        except Exception as e:  # pylint: disable=broad-except
163
            raise RuntimeError(
164
                'Error while parsing config file {config}. Error was '
165
                '{message}'.format(config=configfile, message=e)
166
            )
167
168
        return config
169
170
    def _add_subparsers(self):
171
        parser_ssh = self._subparsers.add_parser(
172
            'ssh',
173
            help='Use SSH to connect to service',
174
            # parents=[self._root_parser],
175
        )
176
177
        parser_ssh.add_argument(
178
            '--hostname', required=True, help='Hostname or IP address'
179
        )
180
        parser_ssh.add_argument(
181
            '--port',
182
            required=False,
183
            help='SSH port (default: %(default)s)',
184
            type=int,
185
        )
186
        parser_ssh.add_argument(
187
            '--ssh-username', help='SSH username (default: %(default)r)'
188
        )
189
        parser_ssh.add_argument(
190
            '--ssh-password', help='SSH password (default: %(default)r)'
191
        )
192
193
        parser_ssh.set_defaults(
194
            port=int(self._config.get('ssh', 'port', fallback=22)),
195
            ssh_username=self._config.get('ssh', 'username', fallback='gmp'),
196
            ssh_password=self._config.get('ssh', 'password', fallback='gmp'),
197
        )
198
199
        parser_tls = self._subparsers.add_parser(
200
            'tls',
201
            help='Use TLS secured connection to connect to service',
202
            # parents=[self._root_parser],
203
        )
204
        parser_tls.add_argument(
205
            '--hostname', required=True, help='Hostname or IP address'
206
        )
207
        parser_tls.add_argument(
208
            '--port',
209
            required=False,
210
            help='GMP/OSP port (default: %(default)s)',
211
            type=int,
212
        )
213
        parser_tls.add_argument(
214
            '--certfile',
215
            required=False,
216
            help='Path to the certificate file for client authentication. '
217
            '(default: %(default)s)',
218
        )
219
        parser_tls.add_argument(
220
            '--keyfile',
221
            required=False,
222
            help='Path to key file for client authentication. '
223
            '(default: %(default)s)',
224
        )
225
        parser_tls.add_argument(
226
            '--cafile',
227
            required=False,
228
            help='Path to CA certificate for server authentication. '
229
            '(default: %(default)s)',
230
        )
231
        parser_tls.add_argument(
232
            '--no-credentials',
233
            required=False,
234
            default=False,
235
            action='store_true',
236
            help='Use only certificates for authentication',
237
        )
238
        parser_tls.set_defaults(
239
            port=int(
240
                self._config.get('tls', 'port', fallback=DEFAULT_GVM_PORT)
241
            ),
242
            certfile=self._config.get('tls', 'certfile', fallback=None),
243
            keyfile=self._config.get('tls', 'keyfile', fallback=None),
244
            cafile=self._config.get('tls', 'cafile', fallback=None),
245
        )
246
247
        parser_socket = self._subparsers.add_parser(
248
            'socket',
249
            help='Use UNIX Domain socket to connect to service',
250
            # parents=[self._root_parser],
251
        )
252
253
        socketpath_group = parser_socket.add_mutually_exclusive_group()
254
        socketpath_group.add_argument(
255
            '--sockpath',
256
            nargs='?',
257
            default=None,
258
            help='Deprecated, use --socketpath instead',
259
        )
260
        socketpath_group.add_argument(
261
            '--socketpath',
262
            nargs='?',
263
            help='Path to UNIX Domain socket (default: %(default)s)',
264
        )
265
266
        parser_socket.set_defaults(
267
            socketpath=self._config.get(
268
                'unixsocket', 'socketpath', fallback=DEFAULT_UNIX_SOCKET_PATH
269
            )
270
        )
271
272
    def add_protocol_argument(self):
273
        self.add_argument(
274
            '--protocol',
275
            required=False,
276
            default=DEFAULT_PROTOCOL,
277
            choices=[PROTOCOL_GMP, PROTOCOL_OSP],
278
            help='Service protocol to use (default: %(default)s)',
279
        )
280
281
282
def create_parser(description, logfilename):
283
    return CliParser(description, logfilename)
284
285
286
def create_connection(
287
    connection_type,
288
    socketpath=None,
289
    timeout=None,
290
    hostname=None,
291
    port=None,
292
    certfile=None,
293
    keyfile=None,
294
    cafile=None,
295
    ssh_username=None,
296
    ssh_password=None,
297
    **kwargs  # pylint: disable=unused-argument
298
):
299
    if 'socket' in connection_type:
300
        return UnixSocketConnection(timeout=timeout, path=socketpath)
301
302
    if 'tls' in connection_type:
303
        return TLSConnection(
304
            timeout=timeout,
305
            hostname=hostname,
306
            port=port,
307
            certfile=certfile,
308
            keyfile=keyfile,
309
            cafile=cafile,
310
        )
311
312
    return SSHConnection(
313
        timeout=timeout,
314
        hostname=hostname,
315
        port=port,
316
        username=ssh_username,
317
        password=ssh_password,
318
    )
319