|
1
|
|
|
import logging |
|
2
|
|
|
import os |
|
3
|
|
|
from urllib.parse import urlsplit |
|
4
|
|
|
|
|
5
|
|
|
from requests.utils import get_netrc_auth |
|
6
|
|
|
|
|
7
|
|
|
from ._version import VERSION |
|
8
|
|
|
|
|
9
|
|
|
# Fallback listen port, used when the LISTEN_PORT environment variable is unset.
DEFAULT_PORT = 9563

# Log level names listed in the error raised for an invalid LOG_LEVEL value.
LOG_LEVELS = [
    "DEBUG",
    "INFO",
    "WARN",
    "WARNING",
    "ERROR",
    "CRITICAL",
    "FATAL",
]
|
12
|
|
|
|
|
13
|
|
|
|
|
14
|
|
|
class Config:
    """Exporter configuration read from environment variables and validated.

    Environment variables read: KIBANA_URL (required), LISTEN_PORT
    (defaults to ``DEFAULT_PORT``), LOG_LEVEL (defaults to ``INFO``),
    KIBANA_LOGIN, KIBANA_PASSWORD, IGNORE_SSL (defaults to ``FALSE``) and
    REQUESTS_CA_BUNDLE.
    """

    def __init__(self):
        """Load, validate and store every setting.

        As a side effect, configures logging through ``logging.basicConfig``
        using the validated log level.

        Raises:
            ValueError: If KIBANA_URL is missing/empty, or if any setting
                fails its validation helper.
        """
        kibana_url = os.getenv("KIBANA_URL")
        listen_port = os.getenv("LISTEN_PORT", DEFAULT_PORT)
        log_level = os.getenv("LOG_LEVEL", "INFO")
        kibana_login = os.getenv("KIBANA_LOGIN")
        kibana_password = os.getenv("KIBANA_PASSWORD")
        ignore_ssl = os.getenv("IGNORE_SSL", "FALSE")
        requests_ca_bundle = os.getenv("REQUESTS_CA_BUNDLE", None)

        self.version = VERSION
        self.log_level = _check_log_level(log_level)
        logging.basicConfig(level=self.log_level)

        # Reject a missing URL before validating it: _check_url reports an
        # empty value as "malformed", which would make this clearer message
        # unreachable if the check came afterwards.
        if not kibana_url:
            raise ValueError("The Kibana URL cannot be empty.")
        self.kibana_url = _check_url(kibana_url)

        self.listen_port = _check_port(listen_port)
        self.kibana_login = kibana_login
        self.kibana_password = kibana_password
        self.ignore_ssl = _check_ssl(ignore_ssl)
        self.requests_ca_bundle = _check_bundle(requests_ca_bundle)

    def description(self):
        """Return a multi-line, human-readable summary of the configuration.

        Labels are left-aligned and padded to the longest label, followed by
        a tab and the value. Passwords are never included; they are rendered
        as ``***``.
        """
        config_list = [
            ("Listen port:", self.listen_port),
            ("Log level:", logging.getLevelName(self.log_level)),
            ("Kibana URL:", self.kibana_url),
        ]

        # Credentials found in netrc are shown in preference to the ones
        # supplied through KIBANA_LOGIN / KIBANA_PASSWORD.
        netrc_auth = get_netrc_auth(self.kibana_url)
        if netrc_auth:
            config_list.append(("Kibana login (from netrc):", netrc_auth[0]))
            config_list.append(("Kibana password (from netrc):", "***"))
        elif self.kibana_login:
            config_list.append(("Kibana login:", self.kibana_login))
            config_list.append(("Kibana password:", "***"))

        ssl_state = "disabled" if self.ignore_ssl else "enabled"
        config_list.append(("SSL verification:", ssl_state))

        if self.requests_ca_bundle:
            config_list.append(("Requests CA bundle path:", self.requests_ca_bundle))

        max_length = max(len(label) for label, _ in config_list)
        line_template = "%-" + str(max_length) + "s\t%s\n"
        body = "".join(line_template % entry for entry in config_list)
        return "== CONFIGURATION ==\n" + body
|
66
|
|
|
|
|
67
|
|
|
|
|
68
|
|
|
def _check_bundle(requests_ca_bundle: str) -> str:
    """Validate the optional CA bundle path.

    Args:
        requests_ca_bundle: Path taken from REQUESTS_CA_BUNDLE. ``None`` or
            an empty string means "not configured" and is passed through.

    Returns:
        The value unchanged when it is empty or names an existing file.

    Raises:
        ValueError: If a non-empty path does not point to an existing file.
    """
    if requests_ca_bundle and not os.path.isfile(requests_ca_bundle):
        raise ValueError("REQUESTS_CA_BUNDLE should point to existing certificate")
    return requests_ca_bundle
|
72
|
|
|
|
|
73
|
|
|
|
|
74
|
|
|
def _check_url(url: str) -> str:
    """Validate that *url* has a scheme, a network location and a sane port.

    Args:
        url: The URL to validate.

    Returns:
        The URL unchanged when it is well-formed.

    Raises:
        ValueError: If the URL cannot be parsed, lacks a scheme or network
            location, or carries an invalid port. Every message starts with
            "URL is malformed".
    """
    try:
        # urlsplit itself raises ValueError for some inputs (for example an
        # unbalanced IPv6 bracket); report those with the same prefix.
        split_url = urlsplit(url)
    except ValueError as e:
        raise ValueError("URL is malformed: %s" % e) from e

    # The first two fields of the split result are scheme and netloc.
    if not all(split_url[:2]):
        raise ValueError("URL is malformed.")

    try:
        # The port is parsed lazily; reading it triggers its validation.
        _ = split_url.port
    except ValueError as e:
        raise ValueError("URL is malformed: %s" % e) from e
    return url
|
83
|
|
|
|
|
84
|
|
|
|
|
85
|
|
|
def _check_ssl(ignore_ssl: str) -> bool:
    """Convert the IGNORE_SSL setting to a boolean.

    The comparison is case-insensitive; only "true" and "false" are accepted.

    Raises:
        ValueError: If the value is neither "true" nor "false".
    """
    normalized = ignore_ssl.upper()
    if normalized not in ("TRUE", "FALSE"):
        raise ValueError("IGNORE_SSL should be `True` or `False`")
    return normalized == "TRUE"
|
92
|
|
|
|
|
93
|
|
|
|
|
94
|
|
|
def _check_port(port: str) -> int:
    """Validate the listen port and return it as an integer.

    Args:
        port: The port as a string (from the environment) or an int (the
            built-in default).

    Returns:
        The port number, in the range 0..65535 inclusive.

    Raises:
        ValueError: If *port* is not a str/int, cannot be parsed as an
            integer, or lies outside 0..65535.
    """
    # Only str and int are converted, so that other types int() would accept
    # (floats, for instance) are rejected. bool is an int subclass and is
    # rejected explicitly.
    if isinstance(port, bool) or not isinstance(port, (str, int)):
        raise ValueError("Listen port must be an integer. Got: %s" % (port,))
    try:
        port = int(port)
    except (OverflowError, TypeError, ValueError) as e:
        raise ValueError("Listen port must be an integer: %s" % e) from e
    if not 0 <= port <= 65535:
        # The message states the range that is actually enforced.
        raise ValueError("Listen port must be between 0 and 65535")
    return port
|
106
|
|
|
|
|
107
|
|
|
|
|
108
|
|
|
def _check_log_level(log_level: str) -> int:
    """Translate a log level name into its numeric ``logging`` value.

    Args:
        log_level: Level name, matched case-insensitively against the
            constants of the ``logging`` module.

    Returns:
        The integer level (for example ``logging.INFO``).

    Raises:
        ValueError: If *log_level* is not a string or does not name an
            integer level constant of the ``logging`` module.
    """
    try:
        level = getattr(logging, log_level.upper())
    except (AttributeError, TypeError):
        # AttributeError covers both an unknown name and a value without
        # .upper() (such as None).
        level = None
    # The lookup can also resolve upper-case attributes that are not levels
    # (logging.BASIC_FORMAT is a string), so only integers are accepted.
    if isinstance(level, bool) or not isinstance(level, int):
        raise ValueError(
            "Invalid log level: %s. Must be one of %s."
            % (log_level, ", ".join(LOG_LEVELS))
        )
    return level
|
113
|
|
|
|