Passed
Push — master ( 4c4a55...2073cf )
by Vlad
01:31
created

kibana_collector.Metrics._process()   A

Complexity

Conditions 1

Size

Total Lines 26
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 22
dl 0
loc 26
rs 9.352
c 0
b 0
f 0
cc 1
nop 1
1
from datetime import datetime
2
import logging
3
from typing import Iterator
4
5
from prometheus_client.core import InfoMetricFamily, StateSetMetricFamily, GaugeMetricFamily, Metric
6
7
from requests import get
8
from requests.compat import urljoin
9
from requests.exceptions import ConnectionError, Timeout, HTTPError, RequestException
10
11
from helpers import TimestampGaugeMetricFamily, TimestampCounterMetricFamily
12
13
14
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
15
16
17
def datestring_to_timestamp(date_str: str) -> float:
    """Convert a date/time string into seconds since the Unix epoch.

    The string must match ``%Y-%m-%dT%H:%M:%S.%f%z``: date and time separated
    by ``T``, fractional seconds, and a UTC offset.

    :param date_str: the date/time string to parse
    :return: the POSIX timestamp of the parsed moment
    :raises ValueError: if ``date_str`` does not match the expected format
    """
    parsed = datetime.strptime(date_str, '%Y-%m-%dT%H:%M:%S.%f%z')
    return parsed.timestamp()
19
20
21
def _info(info: dict) -> InfoMetricFamily:
    """Build the ``kibana_version`` info metric from a version mapping.

    Every value of ``info`` is converted with ``str`` before it is handed to
    the metric family; the keys are passed through unchanged.

    :param info: mapping describing the Kibana version
    :return: the populated ``kibana_version`` info metric family
    """
    stringified = {}
    for name, raw_value in info.items():
        stringified[name] = str(raw_value)
    return InfoMetricFamily('kibana_version', 'Kibana Version', value=stringified)
24
25
26
def _status(status: dict) -> (StateSetMetricFamily, GaugeMetricFamily):
    """Build the overall-status metrics from the ``status`` section of a response.

    Reads ``status['overall']['state']`` and ``status['overall']['since']``.

    :param status: the status mapping; must contain an ``overall`` entry
    :return: a pair ``(kibana_status, kibana_status_since)`` where the first
        is a state set over ``red``/``yellow``/``green`` and the second is a
        gauge holding the time of the last status change as an epoch timestamp
    """
    overall = status['overall']
    current_state = overall['state']

    # Exactly the states equal to the reported one are flagged True.
    states = {}
    for candidate in ('red', 'yellow', 'green'):
        states[candidate] = candidate == current_state

    changed_at = datestring_to_timestamp(overall['since'])

    state_metric = StateSetMetricFamily('kibana_status', 'Kibana Status', value=states)
    since_metric = GaugeMetricFamily('kibana_status_since',
                                     'Last change of status, in seconds since epoch',
                                     value=changed_at)
    return state_metric, since_metric
32
33
34
class Metrics(object):
    """Iterable of metric families built from the ``metrics`` section of a Kibana status response.

    Every family produced by this class carries the same timestamp, parsed
    from the ``last_updated`` entry of the mapping given to the constructor.
    """

    def __init__(self, metrics_dict: dict):
        """Store the metrics mapping and parse its ``last_updated`` timestamp.

        :param metrics_dict: the ``metrics`` mapping; must contain ``last_updated``
        :raises KeyError: if ``last_updated`` is missing
        :raises ValueError: if ``last_updated`` is not in the expected date format
        """
        self._timestamp = datestring_to_timestamp(metrics_dict['last_updated'])
        self._metrics_dict = metrics_dict

    def __iter__(self) -> Iterator[Metric]:
        """Yield all metric families: response times, requests, process, then OS."""
        yield from self._response_times()
        yield from self._requests()
        yield from self._process()
        yield from self._os()

    def _os(self) -> Iterator[Metric]:
        """Yield OS-level metrics: one gauge per load entry, one per memory entry, and uptime."""
        os_dict = self._metrics_dict['os']

        # One gauge per entry of the ``load`` mapping; the key becomes the metric name suffix.
        for key, value in os_dict['load'].items():
            yield TimestampGaugeMetricFamily('kibana_os_load_%s' % key,
                                             'Kibana OS load %s' % key,
                                             value=value,
                                             timestamp=self._timestamp)

        # One gauge per entry of the ``memory`` mapping; only the part of the key
        # before the first underscore is used in the metric name.
        for key, value in os_dict['memory'].items():
            kind = key.split('_')[0]
            yield TimestampGaugeMetricFamily('kibana_os_memory_%s_bytes' % kind,
                                             'Kibana %s OS memory' % kind,
                                             value=value,
                                             timestamp=self._timestamp)

        yield TimestampCounterMetricFamily('kibana_os_uptime_seconds',
                                           'Kibana OS uptime in seconds',
                                           value=os_dict['uptime_in_millis'] / 1000,
                                           timestamp=self._timestamp)

    def _response_times(self) -> Iterator[Metric]:
        """Yield the maximum and average response time gauges, converted from milliseconds to seconds."""
        rt_dict = self._metrics_dict['response_times']

        yield TimestampGaugeMetricFamily('kibana_response_time_max_seconds',
                                         'Kibana maximum response time in seconds',
                                         value=rt_dict['max_in_millis'] / 1000,
                                         timestamp=self._timestamp)

        # Kibana statistics lib can sometimes return NaN for this value.
        # If that is the case, this is set to 0 in order to avoid gaps in the time series.
        # Reference: https://github.com/elastic/kibana/blob/6.7/src/server/status/lib/metrics.js#L73
        # NaN is converted to `undefined` which then has the whole field removed from the response JSON
        #
        # ``get`` is used rather than ``setdefault`` so that the fallback value is
        # not written back into the response mapping owned by the caller.
        yield TimestampGaugeMetricFamily('kibana_response_time_avg_seconds',
                                         'Kibana average response time in seconds',
                                         value=rt_dict.get('avg_in_millis', 0) / 1000,
                                         timestamp=self._timestamp)

    def _requests(self) -> Iterator[Metric]:
        """Yield request gauges: total, disconnects, and one labelled sample per status code."""
        req_dict = self._metrics_dict['requests']
        yield TimestampGaugeMetricFamily('kibana_requests_total',
                                         'Total requests serviced',
                                         value=req_dict['total'],
                                         timestamp=self._timestamp)

        yield TimestampGaugeMetricFamily('kibana_requests_disconnects',
                                         'Total requests disconnected',
                                         value=req_dict['disconnects'],
                                         timestamp=self._timestamp)

        # A single family with a ``status_code`` label; one sample is added per
        # entry of the ``status_codes`` mapping.
        per_status = TimestampGaugeMetricFamily('kibana_requests',
                                                'Total requests by status code',
                                                labels=['status_code'],
                                                timestamp=self._timestamp)

        for code, count in req_dict['status_codes'].items():
            per_status.add_metric(labels=[code], value=count)

        yield per_status

    def _process(self) -> Iterator[Metric]:
        """Yield Kibana process metrics: heap total/used/limit, resident set size, and uptime."""
        process_dict = self._metrics_dict['process']

        yield TimestampGaugeMetricFamily('kibana_process_memory_heap_total_bytes',
                                         'Total heap size in bytes',
                                         value=process_dict['memory']['heap']['total_in_bytes'],
                                         timestamp=self._timestamp)
        yield TimestampGaugeMetricFamily('kibana_process_memory_heap_used_bytes',
                                         'Used heap size in bytes',
                                         value=process_dict['memory']['heap']['used_in_bytes'],
                                         timestamp=self._timestamp)

        yield TimestampGaugeMetricFamily('kibana_process_memory_heap_size_limit_bytes',
                                         'Heap size limit in bytes',
                                         value=process_dict['memory']['heap']['size_limit'],
                                         timestamp=self._timestamp)

        yield TimestampGaugeMetricFamily('kibana_process_memory_resident_set_size_bytes',
                                         'Memory resident set size',
                                         value=process_dict['memory']['resident_set_size_in_bytes'],
                                         timestamp=self._timestamp)

        yield TimestampCounterMetricFamily('kibana_process_uptime_seconds',
                                           'Kibana process uptime in seconds',
                                           value=process_dict['uptime_in_millis'] / 1000,
                                           timestamp=self._timestamp)
128
129
130
class KibanaCollector(object):
    """Collector that queries a Kibana status endpoint each time ``collect()`` is iterated."""

    def __init__(self, host: str, path: str = '/api/status', kibana_login: str = None, kibana_password: str = None,
                 timeout: float = 10.0):
        """Remember the endpoint URL, optional credentials, and request timeout.

        :param host: base URL of the Kibana instance
        :param path: path of the status endpoint, joined onto ``host`` with ``urljoin``
        :param kibana_login: user name for basic auth; when falsy no credentials are sent
        :param kibana_password: password sent together with ``kibana_login``
        :param timeout: seconds to wait for Kibana before giving up; passed straight
            to ``requests.get`` (``None`` means wait indefinitely)
        """
        self._url = urljoin(host, path)
        self._kibana_login = kibana_login
        self._kibana_password = kibana_password
        self._timeout = timeout

    def _fetch_stats(self) -> dict:
        """Fetch the status document and return the decoded JSON body.

        :raises requests.exceptions.RequestException: on connection problems,
            timeouts, or a non-success HTTP status (via ``raise_for_status``)
        """
        auth = (self._kibana_login, self._kibana_password) if self._kibana_login else None
        # Without a timeout a stalled Kibana would block the scrape forever.
        r = get(self._url, auth=auth, timeout=self._timeout)
        r.raise_for_status()
        return r.json()

    def collect(self):
        """Yield all Kibana metric families followed by the ``kibana_node_reachable`` gauge.

        When the status request fails with a ``RequestException`` the failure is
        logged as a warning and only ``kibana_node_reachable`` with value 0 is
        yielded. Otherwise the version, status and detailed metrics are yielded,
        followed by ``kibana_node_reachable`` with value 1.
        """
        try:
            stats = self._fetch_stats()
        except ConnectionError as e:
            logger.warning('Got a connection error while trying to contact Kibana:\n%s', e)
        except Timeout as e:
            logger.warning('Got a timeout while trying to contact Kibana:\n%s', e)
        except HTTPError as e:
            logger.warning('Got a HTTP error %s while trying to contact Kibana:\n%s', e.response.status_code, e)
        except RequestException as e:
            logger.warning('Got a RequestException while trying to contact Kibana:\n%s', e)
        else:
            yield _info(stats['version'])
            yield from _status(stats['status'])
            yield from Metrics(stats['metrics'])
            yield GaugeMetricFamily('kibana_node_reachable', 'Kibana node was reached', value=1)
            return

        # Reached only after one of the handlers above logged a failure.
        # The gauge is deliberately not yielded from a ``finally`` clause: a
        # generator that yields while being closed raises RuntimeError.
        yield GaugeMetricFamily('kibana_node_reachable', 'Kibana node was reached', value=0)
164