Passed
Push — master ( 24cca1...b681a3 )
by Vlad
03:58
created

kibana_collector.Metrics._response_times()   A

Complexity

Conditions 1

Size

Total Lines 19
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 12
dl 0
loc 19
rs 9.8
c 0
b 0
f 0
cc 1
nop 1
1
from datetime import datetime
2
import logging
3
from typing import Iterator
4
5
from prometheus_client.core import InfoMetricFamily, StateSetMetricFamily, GaugeMetricFamily, Metric
6
7
from requests import get
8
from requests.exceptions import ConnectionError, Timeout, HTTPError, RequestException
9
10
from .helpers import TimestampGaugeMetricFamily, TimestampCounterMetricFamily, url_join
11
12
13
logger = logging.getLogger(__name__)
14
15
16
def datestring_to_timestamp(date_str: str) -> float:
17
    return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S.%f%z").timestamp()
18
19
20
def _info(info: dict) -> InfoMetricFamily:
21
    info = {k: str(v) for k, v in info.items()}
22
    return InfoMetricFamily("kibana_version", "Kibana Version", value=info)
23
24
25
def _status(status: dict) -> StateSetMetricFamily:
26
    status_dict = {state: state == status["overall"]["level"] for state in ["available", "degraded"]}
27
    status = StateSetMetricFamily("kibana_status", "Kibana Status", value=status_dict)
28
    return status
29
30
31
class Metrics(object):
32
    def __init__(self, metrics_dict: dict):
33
        self._timestamp = datestring_to_timestamp(metrics_dict["last_updated"])
34
        self._metrics_dict = metrics_dict
35
36
    def __iter__(self):
37
        yield from self._response_times()
38
        yield from self._requests()
39
        yield from self._process()
40
        yield from self._os()
41
42
    def _os(self) -> Iterator[Metric]:
43
        os_dict = self._metrics_dict["os"]
44
45
        yield from (
46
            TimestampGaugeMetricFamily(
47
                "kibana_os_load_%s" % key, "Kibana OS load %s" % key, value=value, timestamp=self._timestamp
48
            )
49
            for key, value in os_dict["load"].items()
50
        )
51
52
        yield from (
53
            TimestampGaugeMetricFamily(
54
                "kibana_os_memory_%s_bytes" % key.split("_")[0],
55
                "Kibana %s OS memory" % key.split("_")[0],
56
                value=value,
57
                timestamp=self._timestamp,
58
            )
59
            for key, value in os_dict["memory"].items()
60
        )
61
62
        yield TimestampCounterMetricFamily(
63
            "kibana_os_uptime_seconds",
64
            "Kibana OS uptime in seconds",
65
            value=os_dict["uptime_in_millis"] / 1000,
66
            timestamp=self._timestamp,
67
        )
68
69
    def _response_times(self) -> Iterator[Metric]:
70
        rt_dict = self._metrics_dict["response_times"]
71
72
        yield TimestampGaugeMetricFamily(
73
            "kibana_response_time_max_seconds",
74
            "Kibana maximum response time in seconds",
75
            value=rt_dict["max_in_millis"] / 1000,
76
            timestamp=self._timestamp,
77
        )
78
79
        # Kibana statistics lib can sometimes return NaN for this value.
80
        # If that is the case, don't output the time series.
81
        if avg_in_millis := rt_dict.get("avg_in_millis"):
82
            yield TimestampGaugeMetricFamily(
83
                "kibana_response_time_avg_seconds",
84
                "Kibana average response time in seconds",
85
                value=avg_in_millis / 1000,
86
                timestamp=self._timestamp,
87
            )
88
89
    def _requests(self) -> Iterator[Metric]:
90
        req_dict = self._metrics_dict["requests"]
91
        yield TimestampGaugeMetricFamily(
92
            "kibana_requests_total", "Total requests serviced", value=req_dict["total"], timestamp=self._timestamp
93
        )
94
95
        yield TimestampGaugeMetricFamily(
96
            "kibana_requests_disconnects",
97
            "Total requests disconnected",
98
            value=req_dict["disconnects"],
99
            timestamp=self._timestamp,
100
        )
101
102
        per_status = TimestampGaugeMetricFamily(
103
            "kibana_requests", "Total requests by status code", labels=["status_code"], timestamp=self._timestamp
104
        )
105
106
        for code, count in req_dict["status_codes"].items():
107
            per_status.add_metric(labels=[code], value=count)
108
109
        yield per_status
110
111
    def _process(self) -> Iterator[Metric]:
112
        process_dict = self._metrics_dict["process"]
113
114
        yield TimestampGaugeMetricFamily(
115
            "kibana_process_memory_heap_total_bytes",
116
            "Total heap size in bytes",
117
            value=process_dict["memory"]["heap"]["total_in_bytes"],
118
            timestamp=self._timestamp,
119
        )
120
        yield TimestampGaugeMetricFamily(
121
            "kibana_process_memory_heap_used_bytes",
122
            "Used heap size in bytes",
123
            value=process_dict["memory"]["heap"]["used_in_bytes"],
124
            timestamp=self._timestamp,
125
        )
126
127
        yield TimestampGaugeMetricFamily(
128
            "kibana_process_memory_heap_size_limit_bytes",
129
            "Heap size limit in bytes",
130
            value=process_dict["memory"]["heap"]["size_limit"],
131
            timestamp=self._timestamp,
132
        )
133
134
        yield TimestampGaugeMetricFamily(
135
            "kibana_process_memory_resident_set_size_bytes",
136
            "Memory resident set size",
137
            value=process_dict["memory"]["resident_set_size_in_bytes"],
138
            timestamp=self._timestamp,
139
        )
140
141
        yield TimestampCounterMetricFamily(
142
            "kibana_process_uptime_seconds",
143
            "Kibana process uptime in seconds",
144
            value=process_dict["uptime_in_millis"] / 1000,
145
            timestamp=self._timestamp,
146
        )
147
148
149
class KibanaCollector(object):
150
    def __init__(
151
        self,
152
        host: str,
153
        path: str = "/api/status",
154
        kibana_login: str = None,
155
        kibana_password: str = None,
156
        ignore_ssl: bool = False,
157
    ):
158
        self._url = url_join(host, path)
159
        self._kibana_login = kibana_login
160
        self._kibana_password = kibana_password
161
        self._ignore_ssl = ignore_ssl
162
163
    def _fetch_stats(self) -> dict:
164
        if self._kibana_login:
165
            auth = (self._kibana_login, self._kibana_password)
166
        else:
167
            auth = None
168
169
        if self._ignore_ssl is True:
170
            r = get(self._url, auth=auth, verify=not self._ignore_ssl)
171
        else:
172
            r = get(self._url, auth=auth)
173
174
        r.raise_for_status()
175
        return r.json()
176
177
    def collect(self):
178
        kibana_up = GaugeMetricFamily("kibana_node_reachable", "Kibana node was reached", value=0)
179
        try:
180
            stats = self._fetch_stats()
181
        except ConnectionError as e:
182
            logger.warning("Got a connection error while trying to contact Kibana:\n%s" % e)
183
        except Timeout as e:
184
            logger.warning("Got a timeout while trying to contact Kibana:\n%s" % e)
185
        except HTTPError as e:
186
            logger.warning("Got a HTTP error %s while trying to contact Kibana:\n%s" % (e.response.status_code, e))
187
        except RequestException as e:
188
            logger.warning("Got a RequestException while trying to contact Kibana:\n%s" % e)
189
        else:
190
            kibana_up = GaugeMetricFamily("kibana_node_reachable", "Kibana node was reached", value=1)
191
            yield _info(stats["version"])
192
            yield _status(stats["status"])
193
            yield from Metrics(stats["metrics"])
194
        finally:
195
            yield kibana_up
196