Passed
Push — develop ( 38fd55...d2cab0 )
by Vlad
03:38
created

kibana_collector   A

Complexity

Total Complexity 20

Size/Duplication

Total Lines 199
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 20
eloc 139
dl 0
loc 199
rs 10
c 0
b 0
f 0

9 Methods

Rating   Name   Duplication   Size   Complexity  
A Metrics.__iter__() 0 5 1
A Metrics.__init__() 0 3 1
A Metrics._response_times() 0 19 1
A Metrics._os() 0 25 1
A Metrics._requests() 0 21 2
A Metrics._process() 0 35 1
A KibanaCollector.__init__() 0 12 1
B KibanaCollector.collect() 0 19 6
A KibanaCollector._fetch_stats() 0 13 3

3 Functions

Rating   Name   Duplication   Size   Complexity  
A datestring_to_timestamp() 0 2 1
A _info() 0 3 1
A _status() 0 6 1
1
from datetime import datetime
2
import logging
3
from typing import Iterator
4
5
from prometheus_client.core import InfoMetricFamily, StateSetMetricFamily, GaugeMetricFamily, Metric
6
7
from requests import get
8
from requests.exceptions import ConnectionError, Timeout, HTTPError, RequestException
9
10
from .helpers import TimestampGaugeMetricFamily, TimestampCounterMetricFamily, url_join
11
12
13
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
14
15
16
def datestring_to_timestamp(date_str: str) -> float:
    """Convert a timestamp string into seconds since the Unix epoch.

    The string must match ``%Y-%m-%dT%H:%M:%S.%f%z``: a date and a time separated
    by ``T``, with fractional seconds and a UTC offset, for example
    ``2019-01-01T00:00:00.000+0000``.

    Raises:
        ValueError: if ``date_str`` does not match that format.
    """
    layout = "%Y-%m-%dT%H:%M:%S.%f%z"
    parsed = datetime.strptime(date_str, layout)
    return parsed.timestamp()
18
19
20
def _info(info: dict) -> InfoMetricFamily:
    """Build the ``kibana_version`` info metric from a dict of version details.

    Every value is passed through ``str`` before being handed to
    ``InfoMetricFamily``; the keys are used unchanged.
    """
    # keys() and values() of the same dict iterate in matching order.
    stringified = dict(zip(info.keys(), map(str, info.values())))
    return InfoMetricFamily("kibana_version", "Kibana Version", value=stringified)
23
24
25
def _status(status: dict) -> (StateSetMetricFamily, GaugeMetricFamily):
    """Translate the ``overall`` status entry into two metric families.

    Returns a pair ``(state_metric, since_metric)``:

    * ``kibana_status`` -- a state set over ``red``/``yellow``/``green`` in which
      only the state equal to ``status["overall"]["state"]`` is true.
    * ``kibana_status_since`` -- ``status["overall"]["since"]`` converted to
      seconds since the epoch.
    """
    overall = status["overall"]
    current_state = overall["state"]
    state_flags = {colour: colour == current_state for colour in ("red", "yellow", "green")}
    changed_at = datestring_to_timestamp(overall["since"])

    state_metric = StateSetMetricFamily("kibana_status", "Kibana Status", value=state_flags)
    since_metric = GaugeMetricFamily(
        "kibana_status_since", "Last change of status, in seconds since epoch", value=changed_at
    )
    return state_metric, since_metric
31
32
33
class Metrics(object):
    """Iterable of metric families built from a Kibana ``metrics`` dict.

    Iterating an instance yields, in order, the response-time, request, process
    and OS metric families. Every family carries the timestamp parsed from the
    dict's ``last_updated`` entry. The dict passed in is only read, never modified.
    """

    def __init__(self, metrics_dict: dict):
        """Keep a reference to ``metrics_dict`` and parse its ``last_updated`` time.

        Raises:
            KeyError: if ``metrics_dict`` has no ``last_updated`` entry.
        """
        self._timestamp = datestring_to_timestamp(metrics_dict["last_updated"])
        self._metrics_dict = metrics_dict

    def __iter__(self) -> Iterator[Metric]:
        """Yield every metric family, grouped by section of the metrics dict."""
        yield from self._response_times()
        yield from self._requests()
        yield from self._process()
        yield from self._os()

    def _os(self) -> Iterator[Metric]:
        """Yield the metrics derived from the ``os`` section (load, memory, uptime)."""
        os_dict = self._metrics_dict["os"]

        # One gauge per entry of the load dict; the key becomes the metric name suffix.
        for key, value in os_dict["load"].items():
            yield TimestampGaugeMetricFamily(
                "kibana_os_load_%s" % key, "Kibana OS load %s" % key, value=value, timestamp=self._timestamp
            )

        # One gauge per entry of the memory dict; only the part of the key before
        # the first underscore is used in the metric name and help text.
        for key, value in os_dict["memory"].items():
            kind = key.split("_")[0]
            yield TimestampGaugeMetricFamily(
                "kibana_os_memory_%s_bytes" % kind,
                "Kibana %s OS memory" % kind,
                value=value,
                timestamp=self._timestamp,
            )

        yield TimestampCounterMetricFamily(
            "kibana_os_uptime_seconds",
            "Kibana OS uptime in seconds",
            value=os_dict["uptime_in_millis"] / 1000,
            timestamp=self._timestamp,
        )

    def _response_times(self) -> Iterator[Metric]:
        """Yield the maximum and average response time gauges, converted to seconds."""
        rt_dict = self._metrics_dict["response_times"]

        yield TimestampGaugeMetricFamily(
            "kibana_response_time_max_seconds",
            "Kibana maximum response time in seconds",
            value=rt_dict["max_in_millis"] / 1000,
            timestamp=self._timestamp,
        )

        # Kibana statistics lib can sometimes return NaN for this value.
        # NaN is converted to `undefined`, which removes the whole field from the response JSON.
        # Reference: https://github.com/elastic/kibana/blob/6.7/src/server/status/lib/metrics.js#L73
        # A missing field is reported as 0 to avoid gaps in the time series. `.get` is used
        # rather than `.setdefault` so that the caller's dict is not modified.
        yield TimestampGaugeMetricFamily(
            "kibana_response_time_avg_seconds",
            "Kibana average response time in seconds",
            value=rt_dict.get("avg_in_millis", 0) / 1000,
            timestamp=self._timestamp,
        )

    def _requests(self) -> Iterator[Metric]:
        """Yield the request totals, disconnects and the per-status-code breakdown."""
        req_dict = self._metrics_dict["requests"]

        yield TimestampGaugeMetricFamily(
            "kibana_requests_total", "Total requests serviced", value=req_dict["total"], timestamp=self._timestamp
        )

        yield TimestampGaugeMetricFamily(
            "kibana_requests_disconnects",
            "Total requests disconnected",
            value=req_dict["disconnects"],
            timestamp=self._timestamp,
        )

        # A single labelled family: one sample per status code found in the stats.
        per_status = TimestampGaugeMetricFamily(
            "kibana_requests", "Total requests by status code", labels=["status_code"], timestamp=self._timestamp
        )
        for code, count in req_dict["status_codes"].items():
            per_status.add_metric(labels=[code], value=count)

        yield per_status

    def _process(self) -> Iterator[Metric]:
        """Yield the process memory gauges and the process uptime counter."""
        process_dict = self._metrics_dict["process"]
        memory = process_dict["memory"]
        heap = memory["heap"]

        yield TimestampGaugeMetricFamily(
            "kibana_process_memory_heap_total_bytes",
            "Total heap size in bytes",
            value=heap["total_in_bytes"],
            timestamp=self._timestamp,
        )

        yield TimestampGaugeMetricFamily(
            "kibana_process_memory_heap_used_bytes",
            "Used heap size in bytes",
            value=heap["used_in_bytes"],
            timestamp=self._timestamp,
        )

        yield TimestampGaugeMetricFamily(
            "kibana_process_memory_heap_size_limit_bytes",
            "Heap size limit in bytes",
            value=heap["size_limit"],
            timestamp=self._timestamp,
        )

        yield TimestampGaugeMetricFamily(
            "kibana_process_memory_resident_set_size_bytes",
            "Memory resident set size",
            value=memory["resident_set_size_in_bytes"],
            timestamp=self._timestamp,
        )

        yield TimestampCounterMetricFamily(
            "kibana_process_uptime_seconds",
            "Kibana process uptime in seconds",
            value=process_dict["uptime_in_millis"] / 1000,
            timestamp=self._timestamp,
        )
150
151
152
class KibanaCollector(object):
    """Collector that fetches a Kibana status document and turns it into metrics.

    ``collect`` performs one HTTP GET per call and always ends with a
    ``kibana_node_reachable`` gauge: 1 when the status document was fetched,
    0 when the request failed.
    """

    def __init__(
        self,
        host: str,
        path: str = "/api/status",
        kibana_login: str = None,
        kibana_password: str = None,
        ignore_ssl: bool = False,
        timeout: float = None,
    ):
        """Store the connection settings.

        Args:
            host: Base address of Kibana; joined with ``path`` via ``url_join``.
            path: Path of the status endpoint.
            kibana_login: User name for basic auth. Auth is only sent when this
                is truthy.
            kibana_password: Password used together with ``kibana_login``.
            ignore_ssl: When true, TLS certificate verification is disabled.
            timeout: Seconds to wait for Kibana, passed straight to
                ``requests.get``. ``None`` (the default) places no limit, which
                is what happened before this parameter existed.
        """
        self._url = url_join(host, path)
        self._kibana_login = kibana_login
        self._kibana_password = kibana_password
        self._ignore_ssl = ignore_ssl
        self._timeout = timeout

    def _fetch_stats(self) -> dict:
        """GET the status URL and return the decoded JSON body.

        Raises:
            requests.exceptions.RequestException: on connection problems,
                timeouts, or a non-success HTTP status (via ``raise_for_status``).
        """
        auth = (self._kibana_login, self._kibana_password) if self._kibana_login else None

        # A single call covers both cases: ``verify`` is True unless SSL checks
        # were explicitly switched off.
        r = get(self._url, auth=auth, verify=not self._ignore_ssl, timeout=self._timeout)
        r.raise_for_status()
        return r.json()

    def collect(self):
        """Yield the Kibana metric families followed by ``kibana_node_reachable``.

        Request failures are logged as warnings and reported as
        ``kibana_node_reachable`` = 0; they are not raised. No ``yield`` is placed
        inside a ``finally`` block, so the generator can be closed early without
        raising ``RuntimeError``.
        """
        try:
            stats = self._fetch_stats()
        except ConnectionError as e:
            logger.warning("Got a connection error while trying to contact Kibana:\n%s", e)
        except Timeout as e:
            logger.warning("Got a timeout while trying to contact Kibana:\n%s", e)
        except HTTPError as e:
            logger.warning("Got a HTTP error %s while trying to contact Kibana:\n%s", e.response.status_code, e)
        except RequestException as e:
            logger.warning("Got a RequestException while trying to contact Kibana:\n%s", e)
        else:
            yield _info(stats["version"])
            yield from _status(stats["status"])
            yield from Metrics(stats["metrics"])
            yield GaugeMetricFamily("kibana_node_reachable", "Kibana node was reached", value=1)
            return

        # Only reached when one of the handlers above ran.
        yield GaugeMetricFamily("kibana_node_reachable", "Kibana node was reached", value=0)
199