Passed
Pull Request — master (#8)
by
unknown
01:09
created

config._check_url()   A

Complexity

Conditions 3

Size

Total Lines 9
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 9
dl 0
loc 9
rs 9.95
c 0
b 0
f 0
cc 3
nop 1
1
import logging
2
import os
3
from urllib.parse import urlsplit
4
5
from requests.utils import get_netrc_auth
6
7
from _version import VERSION
8
9
DEFAULT_PORT = 9563
10
11
LOG_LEVELS = ["DEBUG", "INFO", "WARN", "WARNING", "ERROR", "CRITICAL", "FATAL"]
12
13
14
class Config:
15
    def __init__(self):
16
        kibana_url = os.getenv("KIBANA_URL")
17
        listen_port = os.getenv("LISTEN_PORT", DEFAULT_PORT)
18
        log_level = os.getenv("LOG_LEVEL", "INFO")
19
        cert_path = os.getenv("CERT_PATH", "true")
20
        kibana_login = os.getenv("KIBANA_LOGIN")
21
        kibana_password = os.getenv("KIBANA_PASSWORD")
22
23
        self.version = VERSION
24
        self.log_level = _check_log_level(log_level)
25
        logging.basicConfig(level=self.log_level)
26
        self.kibana_url = _check_url(kibana_url)
27
        self.listen_port = _check_port(listen_port)
28
        self.cert_path = _check_cert(cert_path)
29
        self.kibana_login = kibana_login
30
        self.kibana_password = kibana_password
31
32
        if not self.kibana_url:
33
            raise ValueError("The Kibana URL cannot be empty.")
34
35
    def description(self):
36
        config_list = [
37
            ("Listen port:", self.listen_port),
38
            ("Log level:", self.log_level),
39
            ("Kibana URL:", self.kibana_url),
40
        ]
41
        # check if netrc is available
42
        netrc_auth = get_netrc_auth(self.kibana_url)
43
        if netrc_auth:
44
            config_list.append(("Kibana login (from netrc):", netrc_auth[0]))
45
            config_list.append(("Kibana password (from netrc):", "***"))
46
        elif self.kibana_login:
47
            config_list.append(("Kibana login:", self.kibana_login))
48
            config_list.append(("Kibana password:", "***"))
49
50
        if os.path.isfile(self.cert_path):
51
            config_list.append(("Cert path:", self.cert_path))
52
        elif self.cert_path == "false":
53
            config_list.append("SSL verification disabled")
54
        else:
55
            config_list.append("SSL verification enabled")
56
57
        max_length = max(map(lambda x: len(x[0]), config_list))
58
        desc = "== CONFIGURATION ==\n"
59
        line_template = "%-" + str(max_length) + "s\t%s\n"
60
        for line in config_list:
61
            desc += line_template % line
62
        return desc
63
64
65
def _check_url(url: str) -> str:
66
    split_url = urlsplit(url)
67
    if not all(split_url[:2]):
68
        raise ValueError("URL is malformed.")
69
    try:
70
        split_url.port
71
    except ValueError as e:
72
        raise ValueError("URL is malformed: %s" % e)
73
    return url
74
75
76
def _check_cert(path: str) -> str:
77
    if path == "true":
78
        return True
79
    elif path == "false":
80
        return False
81
    else:
82
        if not os.path.isfile(path):
83
            raise ValueError("Cert file must exists")
84
        return path
85
86
87
def _check_port(port: str) -> int:
88
    if type(port) not in (str, int):
89
        raise ValueError("Listen port must be an integer. Got: %s" % (port,))
90
    try:
91
        # Avoid converting types that can be represented as an int but are not int-like, such as IPs
92
        port = int(port)
93
    except (OverflowError, TypeError, ValueError) as e:
94
        raise ValueError("Listen port must be an integer: %s" % e)
95
    if 0 <= port <= 65535:
96
        return port
97
    else:
98
        raise ValueError("Listen port must be between 1 and 65535")
99
100
101
def _check_log_level(log_level: str) -> int:
102
    try:
103
        return getattr(logging, log_level.upper())
104
    except (AttributeError, TypeError):
105
        raise ValueError("Invalid log level: %s. Must be one of %s." % (", ".join(LOG_LEVELS), log_level))
106