Completed
Push — master ( dc752d...7abd25 )
by
unknown
14s queued 11s
created

gvmtools.parser.CliParser._set_defaults()   A

Complexity

Conditions 1

Size

Total Lines 22
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 17
dl 0
loc 22
rs 9.55
c 0
b 0
f 0
cc 1
nop 2
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 - 2019 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
"""Command Line Interface Parser
20
"""
21
22
import argparse
23
import logging
24
25
from pathlib import Path
26
27
from gvm import get_version as get_gvm_version
28
from gvm.connections import (
29
    DEFAULT_TIMEOUT,
30
    SSHConnection,
31
    TLSConnection,
32
    UnixSocketConnection,
33
)
34
35
from gvmtools import get_version
36
from gvmtools.config import Config
37
38
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
39
40
# Version strings embedded in the ``--version`` output of CliParser:
# __version__ comes from gvmtools.get_version(), __api_version__ from
# gvm.get_version().
__version__ = get_version()
41
__api_version__ = get_gvm_version()
42
43
# Default value of the ``--config`` command line argument.
DEFAULT_CONFIG_PATH = '~/.config/gvm-tools.conf'
44
45
# Accepted values of the optional ``--protocol`` argument; GMP is the default.
PROTOCOL_OSP = 'OSP'
46
PROTOCOL_GMP = 'GMP'
47
DEFAULT_PROTOCOL = PROTOCOL_GMP
48
49
50
class CliParser:
    """Two-stage command line parser with config file support.

    A small bootstrap parser first extracts ``--config`` and ``--log`` from
    the command line. With these values logging is set up and the config file
    is loaded; the values read from the config are then installed as defaults
    on the main parser and on its ``ssh``, ``tls`` and ``socket`` sub-parsers
    before the complete command line is parsed.
    """

    def __init__(
        self, description, logfilename, *, prog=None, ignore_config=False
    ):
        """Create the bootstrap parser, the main parser and the sub-parsers.

        Arguments:
            description: Description text passed to argparse for the help
                output.
            logfilename: File name handed to ``logging.basicConfig`` when
                ``--log`` is given on the command line.
            prog: Program name passed to argparse (keyword only).
            ignore_config: If true, no config file is loaded and the value
                of ``--config`` is ignored (keyword only).
        """
        bootstrap_parser = argparse.ArgumentParser(
            prog=prog,
            description=description,
            formatter_class=argparse.RawTextHelpFormatter,
            # don't parse help initially. the args from parser wouldn't be shown
            add_help=False,
        )

        bootstrap_parser.add_argument(
            '-c',
            '--config',
            nargs='?',
            default=DEFAULT_CONFIG_PATH,
            help='Configuration file path (default: %(default)s)',
        )
        bootstrap_parser.add_argument(
            '--log',
            nargs='?',
            dest='loglevel',
            const='INFO',
            choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
            # A bare ``--log`` selects the const; the default is None (logging
            # stays off), so the help text has to reference the const.
            help='Activate logging (default level: %(const)s)',
        )

        # ``parents`` only copies the arguments of the bootstrap parser. The
        # description and formatter must be passed again, otherwise the
        # description never shows up in the ``--help`` output, which is
        # rendered by this parser (the bootstrap parser has add_help=False).
        parser = argparse.ArgumentParser(
            prog=prog,
            description=description,
            formatter_class=argparse.RawTextHelpFormatter,
            parents=[bootstrap_parser],
        )

        parser.add_argument(
            '--timeout',
            required=False,
            default=DEFAULT_TIMEOUT,
            type=int,
            help='Response timeout in seconds, or -1 to wait '
            'indefinitely (default: %(default)s)',
        )
        parser.add_argument(
            '--gmp-username',
            help='Username for GMP service (default: %(default)r)',
        )
        parser.add_argument(
            '--gmp-password',
            help='Password for GMP service (default: %(default)r)',
        )
        parser.add_argument(
            '-V',
            '--version',
            action='version',
            version='%(prog)s {version} (API version {apiversion})'.format(
                version=__version__, apiversion=__api_version__
            ),
            help='Show version information and exit',
        )

        subparsers = parser.add_subparsers(
            metavar='CONNECTION_TYPE',
            title='connections',
            description='valid connection types',
            help="Connection type to use",
        )
        # Set after creation so that a connection type is mandatory and is
        # stored as ``connection_type`` in the parsed namespace.
        subparsers.required = True
        subparsers.dest = 'connection_type'

        self._subparsers = subparsers

        self._parser = parser
        self._bootstrap_parser = bootstrap_parser

        self._logfilename = logfilename
        self._ignore_config = ignore_config

        self._add_subparsers()

    def parse_args(self, args=None):
        """Parse the command line and reject unknown arguments.

        Arguments:
            args: List of argument strings, or None to let argparse use
                ``sys.argv``.

        Returns:
            The parsed argparse namespace. If unknown arguments are left over
            the error handling of the main parser is invoked.
        """
        args, unknown_args = self.parse_known_args(args)
        if unknown_args:
            self._parser.error(
                'unrecognized arguments {}'.format(' '.join(unknown_args))
            )
        return args

    def parse_known_args(self, args=None):
        """Parse the command line and return unknown arguments separately.

        Arguments:
            args: List of argument strings, or None to let argparse use
                ``sys.argv``.

        Returns:
            A tuple of the parsed namespace and the list of arguments not
            known to the parser. A timeout of -1 is returned as None.
        """
        # first stage: only --config and --log are of interest
        args_before, _ = self._bootstrap_parser.parse_known_args(args)

        if args_before.loglevel is not None:
            level = logging.getLevelName(args_before.loglevel)
            logging.basicConfig(filename=self._logfilename, level=level)

        self._set_defaults(None if self._ignore_config else args_before.config)

        # second stage: full parsing with the defaults from the config
        args, unknown_args = self._parser.parse_known_args(args)

        # If timeout value is -1, then the socket should have no timeout
        if args.timeout == -1:
            args.timeout = None

        # Use the module logger: the module-level logging.debug() function
        # would implicitly call basicConfig() on the root logger if --log
        # was not given.
        # NOTE(review): the namespace contains gmp_password and ssh_password,
        # so they end up in the log in clear text at DEBUG level.
        logger.debug('Parsed arguments %r', args)

        return args, unknown_args

    def add_argument(self, *args, **kwargs):
        """Add an argument to each of the socket, ssh and tls sub-parsers.

        All arguments are passed unchanged to ``add_argument`` of the three
        sub-parsers.
        """
        self._parser_socket.add_argument(*args, **kwargs)
        self._parser_ssh.add_argument(*args, **kwargs)
        self._parser_tls.add_argument(*args, **kwargs)

    def add_protocol_argument(self):
        """Add the optional ``--protocol`` argument to the main parser."""
        self._parser.add_argument(
            '--protocol',
            required=False,
            default=DEFAULT_PROTOCOL,
            choices=[PROTOCOL_GMP, PROTOCOL_OSP],
            help='Service protocol to use (default: %(default)s)',
        )

    def _load_config(self, configfile):
        """Create a Config and load ``configfile`` into it if it exists.

        Arguments:
            configfile: Path of the config file. A false value or a path
                that does not exist results in a Config without loaded file.

        Returns:
            The Config instance.

        Raises:
            RuntimeError: If loading the existing config file fails. The
                original exception is chained as the cause.
        """
        config = Config()

        if not configfile:
            return config

        configpath = Path(configfile)

        try:
            if not configpath.expanduser().resolve().exists():
                logger.debug('Ignoring non existing config file %s', configfile)
                return config
        except FileNotFoundError:
            # we are on python 3.5 and Path.resolve raised a FileNotFoundError
            logger.debug('Ignoring non existing config file %s', configfile)
            return config

        try:
            config.load(configpath)
            logger.debug('Loaded config %s', configfile)
        except Exception as e:  # pylint: disable=broad-except
            raise RuntimeError(
                'Error while parsing config file {config}. Error was '
                '{message}'.format(config=configfile, message=e)
            ) from e

        return config

    def _add_subparsers(self):
        """Create the ssh, tls and socket sub-parsers with their arguments.

        The sub-parsers are stored on the instance so that further arguments
        and the defaults from the config can be applied to them later.
        """
        parser_ssh = self._subparsers.add_parser(
            'ssh', help='Use SSH to connect to service'
        )

        parser_ssh.add_argument(
            '--hostname', required=True, help='Hostname or IP address'
        )
        parser_ssh.add_argument(
            '--port',
            required=False,
            help='SSH port (default: %(default)s)',
            type=int,
        )
        parser_ssh.add_argument(
            '--ssh-username', help='SSH username (default: %(default)r)'
        )
        parser_ssh.add_argument(
            '--ssh-password', help='SSH password (default: %(default)r)'
        )

        parser_tls = self._subparsers.add_parser(
            'tls', help='Use TLS secured connection to connect to service'
        )
        parser_tls.add_argument(
            '--hostname', required=True, help='Hostname or IP address'
        )
        parser_tls.add_argument(
            '--port',
            required=False,
            help='GMP/OSP port (default: %(default)s)',
            type=int,
        )
        parser_tls.add_argument(
            '--certfile',
            required=False,
            help='Path to the certificate file for client authentication. '
            '(default: %(default)s)',
        )
        parser_tls.add_argument(
            '--keyfile',
            required=False,
            help='Path to key file for client authentication. '
            '(default: %(default)s)',
        )
        parser_tls.add_argument(
            '--cafile',
            required=False,
            help='Path to CA certificate for server authentication. '
            '(default: %(default)s)',
        )
        parser_tls.add_argument(
            '--no-credentials',
            required=False,
            default=False,
            action='store_true',
            help='Use only certificates for authentication',
        )

        parser_socket = self._subparsers.add_parser(
            'socket', help='Use UNIX Domain socket to connect to service'
        )

        # --sockpath and --socketpath can not be used at the same time
        socketpath_group = parser_socket.add_mutually_exclusive_group()
        socketpath_group.add_argument(
            '--sockpath',
            nargs='?',
            default=None,
            help='Deprecated, use --socketpath instead',
        )
        socketpath_group.add_argument(
            '--socketpath',
            nargs='?',
            help='Path to UNIX Domain socket (default: %(default)s)',
        )

        self._parser_ssh = parser_ssh
        self._parser_socket = parser_socket
        self._parser_tls = parser_tls

    def _set_defaults(self, configfilename=None):
        """Load the config and install its values as parser defaults.

        Arguments:
            configfilename: Path of the config file, or None to use a Config
                without a loaded file.

        The port values read from the ``ssh`` and ``tls`` sections are
        converted with ``int()``.
        """
        self._config = self._load_config(configfilename)

        self._parser.set_defaults(
            gmp_username=self._config.get('gmp', 'username'),
            gmp_password=self._config.get('gmp', 'password'),
            **self._config.defaults()
        )

        self._parser_ssh.set_defaults(
            port=int(self._config.get('ssh', 'port')),
            ssh_username=self._config.get('ssh', 'username'),
            ssh_password=self._config.get('ssh', 'password'),
        )
        self._parser_tls.set_defaults(
            port=int(self._config.get('tls', 'port')),
            certfile=self._config.get('tls', 'certfile'),
            keyfile=self._config.get('tls', 'keyfile'),
            cafile=self._config.get('tls', 'cafile'),
        )
        self._parser_socket.set_defaults(
            socketpath=self._config.get('unixsocket', 'socketpath')
        )
297
298
299
def create_parser(description, logfilename):
    """Return a new CliParser for the given description and log file name.

    Arguments:
        description: Description text handed to the CliParser.
        logfilename: Log file name handed to the CliParser.
    """
    cli_parser = CliParser(description=description, logfilename=logfilename)
    return cli_parser
301
302
303
def create_connection(
    connection_type,
    socketpath=None,
    timeout=None,
    hostname=None,
    port=None,
    certfile=None,
    keyfile=None,
    cafile=None,
    ssh_username=None,
    ssh_password=None,
    **kwargs  # pylint: disable=unused-argument
):
    """Create the connection object matching ``connection_type``.

    Arguments:
        connection_type: Value checked for containing 'socket' or 'tls'.
        socketpath: Path passed to the UnixSocketConnection.
        timeout: Timeout passed to every connection class.
        hostname: Host passed to the TLSConnection or SSHConnection.
        port: Port passed to the TLSConnection or SSHConnection.
        certfile: Certificate file passed to the TLSConnection.
        keyfile: Key file passed to the TLSConnection.
        cafile: CA file passed to the TLSConnection.
        ssh_username: Username passed to the SSHConnection.
        ssh_password: Password passed to the SSHConnection.
        **kwargs: Accepted and ignored, so that a complete set of parsed
            arguments can be passed in.

    Returns:
        A UnixSocketConnection if ``connection_type`` contains 'socket', a
        TLSConnection if it contains 'tls', otherwise an SSHConnection.
    """
    if 'socket' in connection_type:
        connection = UnixSocketConnection(path=socketpath, timeout=timeout)
    elif 'tls' in connection_type:
        tls_settings = dict(
            timeout=timeout,
            hostname=hostname,
            port=port,
            certfile=certfile,
            keyfile=keyfile,
            cafile=cafile,
        )
        connection = TLSConnection(**tls_settings)
    else:
        # every other connection type is handled as ssh
        ssh_settings = dict(
            timeout=timeout,
            hostname=hostname,
            port=port,
            username=ssh_username,
            password=ssh_password,
        )
        connection = SSHConnection(**ssh_settings)

    return connection
336