1
|
|
|
import logging |
2
|
|
|
import os |
3
|
|
|
from urllib.parse import urlsplit |
4
|
|
|
|
5
|
|
|
from requests.utils import get_netrc_auth |
6
|
|
|
|
7
|
|
|
from ._version import VERSION |
8
|
|
|
|
9
|
|
|
# Port used when the LISTEN_PORT environment variable is not set.
DEFAULT_PORT = 9563

# Level names listed in the error message raised for an invalid LOG_LEVEL.
LOG_LEVELS = ["DEBUG", "INFO", "WARN", "WARNING", "ERROR", "CRITICAL", "FATAL"]
12
|
|
|
|
13
|
|
|
|
14
|
|
|
class Config:
    """Exporter settings read from environment variables and validated.

    Environment variables read: ``KIBANA_URL``, ``LISTEN_PORT``,
    ``LOG_LEVEL``, ``KIBANA_LOGIN``, ``KIBANA_PASSWORD``, ``IGNORE_SSL`` and
    ``REQUESTS_CA_BUNDLE``.

    Raises:
        ValueError: if ``KIBANA_URL`` is unset or empty, or if any value is
            rejected by its validation helper.
    """

    def __init__(self):
        kibana_url = os.getenv("KIBANA_URL")
        listen_port = os.getenv("LISTEN_PORT", DEFAULT_PORT)
        log_level = os.getenv("LOG_LEVEL", "INFO")
        kibana_login = os.getenv("KIBANA_LOGIN")
        kibana_password = os.getenv("KIBANA_PASSWORD")
        ignore_ssl = os.getenv("IGNORE_SSL", "FALSE")
        requests_ca_bundle = os.getenv("REQUESTS_CA_BUNDLE", None)

        self.version = VERSION
        self.log_level = _check_log_level(log_level)
        # Logging is configured as soon as the level is known, before the
        # remaining settings are validated.
        logging.basicConfig(level=self.log_level)

        # The emptiness check has to come before _check_url: that helper
        # rejects an empty/missing URL as "malformed", which would make this
        # more specific message unreachable.
        if not kibana_url:
            raise ValueError("The Kibana URL cannot be empty.")
        self.kibana_url = _check_url(kibana_url)
        self.listen_port = _check_port(listen_port)
        self.kibana_login = kibana_login
        self.kibana_password = kibana_password
        self.ignore_ssl = _check_ssl(ignore_ssl)
        self.requests_ca_bundle = _check_bundle(requests_ca_bundle)

    def description(self):
        """Return a multi-line, column-aligned summary of the configuration.

        Passwords are never included; they are shown as ``***``.
        """
        rows = [
            ("Listen port:", self.listen_port),
            ("Log level:", logging.getLevelName(self.log_level)),
            ("Kibana URL:", self.kibana_url),
        ]

        # Credentials found in netrc for the Kibana URL are reported in
        # preference to the login taken from the environment.
        netrc_auth = get_netrc_auth(self.kibana_url)
        if netrc_auth:
            rows.append(("Kibana login (from netrc):", netrc_auth[0]))
            rows.append(("Kibana password (from netrc):", "***"))
        elif self.kibana_login:
            rows.append(("Kibana login:", self.kibana_login))
            rows.append(("Kibana password:", "***"))

        ssl_state = "disabled" if self.ignore_ssl else "enabled"
        rows.append(("SSL verification:", ssl_state))

        if self.requests_ca_bundle:
            rows.append(("Requests CA bundle path:", self.requests_ca_bundle))

        # Left-justify every label to the width of the longest one.
        label_width = max(len(label) for label, _ in rows)
        line_template = "%-" + str(label_width) + "s\t%s\n"
        return "== CONFIGURATION ==\n" + "".join(line_template % row for row in rows)
66
|
|
|
|
67
|
|
|
|
68
|
|
|
def _check_bundle(requests_ca_bundle: str) -> str:
    """Validate the CA bundle path and return it unchanged.

    A falsy value (``None`` or an empty string) means no bundle was
    configured and is returned as-is without touching the filesystem.

    Raises:
        ValueError: if a non-empty path does not refer to an existing file.
    """
    if requests_ca_bundle and not os.path.isfile(requests_ca_bundle):
        raise ValueError("REQUESTS_CA_BUNDLE should point to existing certificate")
    return requests_ca_bundle
72
|
|
|
|
73
|
|
|
|
74
|
|
|
def _check_url(url: str) -> str:
    """Validate that ``url`` has a scheme, a network location and a sane port.

    Returns:
        The URL exactly as it was passed in.

    Raises:
        ValueError: if the URL cannot be parsed, lacks a scheme or network
            location, or carries an invalid port.
    """
    try:
        split_url = urlsplit(url)
    except ValueError as e:
        # urlsplit itself rejects some inputs (e.g. unbalanced IPv6
        # brackets); report those with the same wording as other failures.
        raise ValueError("URL is malformed: %s" % e) from e

    if not (split_url.scheme and split_url.netloc):
        raise ValueError("URL is malformed.")

    try:
        # The port is only parsed when the attribute is read; an invalid
        # port surfaces here as ValueError.
        split_url.port
    except ValueError as e:
        raise ValueError("URL is malformed: %s" % e) from e
    return url
83
|
|
|
|
84
|
|
|
|
85
|
|
|
def _check_ssl(ignore_ssl: str) -> bool:
    """Convert the IGNORE_SSL setting into a boolean.

    The comparison is case-insensitive; only ``TRUE`` and ``FALSE`` are
    accepted.

    Raises:
        ValueError: for any other value.
    """
    recognised = {"TRUE": True, "FALSE": False}
    normalised = ignore_ssl.upper()
    if normalised not in recognised:
        raise ValueError("IGNORE_SSL should be `True` or `False`")
    return recognised[normalised]
92
|
|
|
|
93
|
|
|
|
94
|
|
|
def _check_port(port: str) -> int:
    """Validate a listen port and return it as an ``int``.

    ``port`` may be a string or an int (the built-in default is an int).

    Raises:
        ValueError: if ``port`` is not exactly a ``str`` or ``int``, cannot
            be converted to an integer, or lies outside 0-65535.
    """
    # Avoid converting types that can be represented as an int but are not
    # int-like, such as IPs: only exact str/int values reach int() below.
    if type(port) not in (str, int):
        raise ValueError("Listen port must be an integer. Got: %s" % (port,))
    try:
        port = int(port)
    except (OverflowError, TypeError, ValueError) as e:
        raise ValueError("Listen port must be an integer: %s" % e) from e
    if not 0 <= port <= 65535:
        # The message states the range that is actually accepted (0 included).
        raise ValueError("Listen port must be between 0 and 65535")
    return port
106
|
|
|
|
107
|
|
|
|
108
|
|
|
def _check_log_level(log_level: str) -> int:
    """Translate a level name into the numeric ``logging`` level.

    The name is upper-cased and looked up as an attribute of the ``logging``
    module.

    Raises:
        ValueError: if ``log_level`` is not a string, names no attribute of
            ``logging``, or names an attribute that is not an integer level.
    """
    try:
        level = getattr(logging, log_level.upper())
    except (AttributeError, TypeError):
        level = None
    # The lookup can also hit non-level attributes of the logging module
    # (e.g. BASIC_FORMAT, a string); only integers are valid levels.
    if not isinstance(level, int):
        raise ValueError(
            "Invalid log level: %s. Must be one of %s." % (log_level, ", ".join(LOG_LEVELS))
        )
    return level
113
|
|
|
|