Passed
Pull Request — master (#154)
by
unknown
01:13
created

gvmtools.parser.CliParser.__init__()   A

Complexity

Conditions 1

Size

Total Lines 27
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 21
dl 0
loc 27
rs 9.376
c 0
b 0
f 0
cc 1
nop 4
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 - 2019 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
"""Command Line Interface Parser
20
"""
21
22
import argparse
23
import configparser
24
import logging
25
import os
26
27
from gvm import get_version as get_gvm_version
28
from gvm.connections import (
29
    DEFAULT_UNIX_SOCKET_PATH,
30
    DEFAULT_TIMEOUT,
31
    DEFAULT_GVM_PORT,
32
    SSHConnection,
33
    TLSConnection,
34
    UnixSocketConnection,
35
)
36
37
from gvmtools import get_version
38
39
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
40
41
# Version strings interpolated into the text printed by -V/--version.
__version__ = get_version()
42
__api_version__ = get_gvm_version()
43
44
# Config file used when -c/--config is not given; "~" is expanded when the
# file is loaded.
DEFAULT_CONFIG_PATH = '~/.config/gvm-tools.conf'
45
46
# Protocol names offered as choices of the --protocol option.
PROTOCOL_OSP = 'OSP'
47
PROTOCOL_GMP = 'GMP'
48
# Protocol selected when --protocol is not given.
DEFAULT_PROTOCOL = PROTOCOL_GMP
49
50
51
class CliParser:
    """Command line parser whose defaults can be taken from a config file.

    Parsing happens in two passes. A small bootstrap parser first extracts
    ``--config`` and ``--log`` so that logging can be set up and the config
    file can be loaded. Afterwards the complete parser (general options plus
    the ``ssh``, ``tls`` and ``socket`` connection sub-commands) is built
    with defaults read from that config, and the command line is parsed
    again with it.
    """

    def __init__(self, description, logfilename, ignore_config=False):
        """Create the bootstrap parser and remember the settings.

        Arguments:
            description: Description text of the bootstrap argument parser.
            logfilename: File name passed to ``logging.basicConfig()`` when
                ``--log`` is given on the command line.
            ignore_config: If true, no config file is read at all and the
                value of ``--config`` is ignored.
        """
        bootstrap = argparse.ArgumentParser(
            description=description,
            formatter_class=argparse.RawTextHelpFormatter,
            # The complete parser inherits these options via ``parents`` and
            # adds -h/--help itself.
            add_help=False,
        )

        bootstrap.add_argument(
            '-c',
            '--config',
            nargs='?',
            default=DEFAULT_CONFIG_PATH,
            help='Configuration file path (default: %(default)s)',
        )
        bootstrap.add_argument(
            '--log',
            nargs='?',
            dest='loglevel',
            const='INFO',
            choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
            # A bare "--log" selects the const value. The option has no
            # default, so %(default)s would render as "None" here.
            help='Activate logging (default level: %(const)s)',
        )

        self._root_parser = bootstrap

        self._logfilename = logfilename
        self._ignore_config = ignore_config

        # Argument definitions registered through add_argument(). The
        # complete parser is only built inside parse_args(), so they are
        # recorded here and replayed on every parse.
        self._extra_arguments = []

    def parse_args(self, args=None):
        """Parse the command line and return the resulting namespace.

        Arguments:
            args: List of argument strings. ``None`` lets argparse fall back
                to ``sys.argv``.

        Returns:
            The ``argparse.Namespace`` of the complete parser. A ``timeout``
            of -1 is replaced by ``None``.

        Raises:
            RuntimeError: If the config file could not be parsed.
        """
        # First pass: only --config and --log are of interest, everything
        # else is ignored for now.
        bootstrap_args, _ = self._root_parser.parse_known_args(args)

        if bootstrap_args.loglevel is not None:
            level = logging.getLevelName(bootstrap_args.loglevel)
            logging.basicConfig(filename=self._logfilename, level=level)

        self._config = self._load_config(
            None if self._ignore_config else bootstrap_args.config
        )

        parser = argparse.ArgumentParser(parents=[self._root_parser])

        parser.add_argument(
            '--timeout',
            required=False,
            default=DEFAULT_TIMEOUT,
            type=int,
            help='Response timeout in seconds, or -1 to wait '
            'indefinitely (default: %(default)s)',
        )
        parser.add_argument(
            '--gmp-username',
            help='Username for GMP service (default: %(default)r)',
        )
        parser.add_argument(
            '--gmp-password',
            help='Password for GMP service (default: %(default)r)',
        )
        parser.add_argument(
            '-V',
            '--version',
            action='version',
            version='%(prog)s {version} (API version {apiversion})'.format(
                version=__version__, apiversion=__api_version__
            ),
            help='Show version information and exit',
        )

        subparsers = parser.add_subparsers(
            metavar='CONNECTION_TYPE',
            title='connections',
            description='valid connection types',
            help="Connection type to use",
        )
        subparsers.required = True
        subparsers.dest = 'connection_type'

        self._parser = parser
        self._subparsers = subparsers

        self._add_subparsers()

        # Replay the arguments that were registered via add_argument().
        for extra_args, extra_kwargs in self._extra_arguments:
            parser.add_argument(*extra_args, **extra_kwargs)

        # Values of the config's default section ("main") become parser
        # defaults; the GMP credentials come from the "gmp" section.
        self._parser.set_defaults(
            gmp_username=self._config.get('gmp', 'username', fallback=''),
            gmp_password=self._config.get('gmp', 'password', fallback=''),
            **self._config.defaults()
        )

        parsed = self._parser.parse_args(args)

        # If timeout value is -1, then the socket should have no timeout
        if parsed.timeout == -1:
            parsed.timeout = None

        # NOTE(review): the namespace contains gmp_password (and
        # ssh_password for ssh connections), so credentials are written to
        # the debug log.
        logger.debug('Parsed arguments %r', parsed)

        return parsed

    def add_argument(self, *args, **kwargs):
        """Register an additional argument for the complete parser.

        Accepts the same arguments as ``ArgumentParser.add_argument()``.
        The definition is stored and applied each time parse_args() builds
        the parser, so this method can be called before parse_args().
        """
        self._extra_arguments.append((args, kwargs))

    def _load_config(self, configfile):
        """Read the config file and return the populated ConfigParser.

        Arguments:
            configfile: Path of the config file ("~" is expanded). A false
                value skips reading and yields an empty config.

        Returns:
            A ``configparser.ConfigParser`` using ``main`` as its default
            section.

        Raises:
            RuntimeError: If reading or parsing the file failed.
        """
        config = configparser.ConfigParser(default_section='main')

        if not configfile:
            return config

        try:
            path = os.path.expanduser(configfile)
            # read() skips files it cannot open and returns the list of
            # files that were actually parsed.
            loaded = config.read(path)
        except Exception as e:  # pylint: disable=broad-except
            raise RuntimeError(
                'Error while parsing config file {config}. Error was '
                '{message}'.format(config=configfile, message=e)
            ) from e

        if loaded:
            logger.debug('Loaded config %s', configfile)
        else:
            logger.debug('Config file %s could not be read', configfile)

        return config

    def _add_subparsers(self):
        """Add the ssh, tls and socket sub-commands to the parser.

        Defaults of the sub-command options are taken from the ``ssh``,
        ``tls`` and ``unixsocket`` sections of the loaded config.
        """
        parser_ssh = self._subparsers.add_parser(
            'ssh', help='Use SSH to connect to service'
        )

        parser_ssh.add_argument(
            '--hostname', required=True, help='Hostname or IP address'
        )
        parser_ssh.add_argument(
            '--port',
            required=False,
            help='SSH port (default: %(default)s)',
            type=int,
        )
        parser_ssh.add_argument(
            '--ssh-username', help='SSH username (default: %(default)r)'
        )
        parser_ssh.add_argument(
            '--ssh-password', help='SSH password (default: %(default)r)'
        )

        parser_ssh.set_defaults(
            port=int(self._config.get('ssh', 'port', fallback=22)),
            ssh_username=self._config.get('ssh', 'username', fallback='gmp'),
            ssh_password=self._config.get('ssh', 'password', fallback='gmp'),
        )

        parser_tls = self._subparsers.add_parser(
            'tls', help='Use TLS secured connection to connect to service'
        )
        parser_tls.add_argument(
            '--hostname', required=True, help='Hostname or IP address'
        )
        parser_tls.add_argument(
            '--port',
            required=False,
            help='GMP/OSP port (default: %(default)s)',
            type=int,
        )
        parser_tls.add_argument(
            '--certfile',
            required=False,
            help='Path to the certificate file for client authentication. '
            '(default: %(default)s)',
        )
        parser_tls.add_argument(
            '--keyfile',
            required=False,
            help='Path to key file for client authentication. '
            '(default: %(default)s)',
        )
        parser_tls.add_argument(
            '--cafile',
            required=False,
            help='Path to CA certificate for server authentication. '
            '(default: %(default)s)',
        )
        parser_tls.add_argument(
            '--no-credentials',
            required=False,
            default=False,
            action='store_true',
            help='Use only certificates for authentication',
        )
        parser_tls.set_defaults(
            port=int(
                self._config.get('tls', 'port', fallback=DEFAULT_GVM_PORT)
            ),
            certfile=self._config.get('tls', 'certfile', fallback=None),
            keyfile=self._config.get('tls', 'keyfile', fallback=None),
            cafile=self._config.get('tls', 'cafile', fallback=None),
        )

        parser_socket = self._subparsers.add_parser(
            'socket', help='Use UNIX Domain socket to connect to service'
        )

        # --sockpath and --socketpath cannot be given together.
        socketpath_group = parser_socket.add_mutually_exclusive_group()
        socketpath_group.add_argument(
            '--sockpath',
            nargs='?',
            default=None,
            help='Deprecated, use --socketpath instead',
        )
        socketpath_group.add_argument(
            '--socketpath',
            nargs='?',
            help='Path to UNIX Domain socket (default: %(default)s)',
        )

        parser_socket.set_defaults(
            socketpath=self._config.get(
                'unixsocket', 'socketpath', fallback=DEFAULT_UNIX_SOCKET_PATH
            )
        )

    def add_protocol_argument(self):
        """Register the ``--protocol`` option (GMP or OSP, default GMP)."""
        self.add_argument(
            '--protocol',
            required=False,
            default=DEFAULT_PROTOCOL,
            choices=[PROTOCOL_GMP, PROTOCOL_OSP],
            help='Service protocol to use (default: %(default)s)',
        )
280
281
282
def create_parser(description, logfilename):
    """Return a new CliParser for the given description and log file name.

    The config file is not ignored (``ignore_config`` keeps its default).
    """
    cli_parser = CliParser(description, logfilename)
    return cli_parser
284
285
286
def create_connection(
    connection_type,
    socketpath=None,
    timeout=None,
    hostname=None,
    port=None,
    certfile=None,
    keyfile=None,
    cafile=None,
    ssh_username=None,
    ssh_password=None,
    **kwargs  # pylint: disable=unused-argument
):
    """Create the connection object matching ``connection_type``.

    Arguments:
        connection_type: Name of the connection type. A value containing
            ``socket`` selects a UnixSocketConnection, a value containing
            ``tls`` a TLSConnection; anything else yields an SSHConnection.
        socketpath: Socket path, used for the UNIX socket connection.
        timeout: Timeout passed to every connection type.
        hostname: Remote host, used for TLS and SSH connections.
        port: Remote port, used for TLS and SSH connections.
        certfile: Client certificate file, used for TLS connections.
        keyfile: Client key file, used for TLS connections.
        cafile: CA certificate file, used for TLS connections.
        ssh_username: User name, used for SSH connections.
        ssh_password: Password, used for SSH connections.
        **kwargs: Accepted and ignored.

    Returns:
        The newly created connection object.
    """
    if 'socket' in connection_type:
        return UnixSocketConnection(timeout=timeout, path=socketpath)

    # Settings shared by both network based connection types.
    remote = {'timeout': timeout, 'hostname': hostname, 'port': port}

    if 'tls' in connection_type:
        return TLSConnection(
            certfile=certfile, keyfile=keyfile, cafile=cafile, **remote
        )

    return SSHConnection(
        username=ssh_username, password=ssh_password, **remote
    )
319