# -*- coding: utf-8 -*-

__author__ = 'Kenny Freeman'
__email__ = '[email protected]'
__license__ = "ISCL"
__docformat__ = 'reStructuredText'

import itertools
import logging
import httplib
import json

import requests  # pip install requests

import plumd
import plumd.plugins

# from: http://stackoverflow.com/questions/11029717/how-do-i-disable-log-messages-from-the-requests-library
# The requests library was generating noisy log entries; silence them here.
httplib.HTTPConnection.debuglevel = 0
rlog = logging.getLogger("requests.packages.urllib3")
rlog.addHandler(logging.NullHandler())
rlog.setLevel(logging.ERROR)
rlog.propagate = True


class ElasticRender(plumd.Render):
    """Renders metrics into an elasticsearch-writable format: a dictionary
    with the required bulk api parameters and metric details.

    NOTE: this class is considered experimental and is not fully functional.

    :param rconfig: A Conf object initialized with the plugin configuration.
    :type rconfig: plumd.conf.Conf
    """

    def __init__(self, rconfig):
        super(ElasticRender, self).__init__(rconfig)
        # get the configured values from the plugin configuration
        self.index = rconfig.get("index")
        self.type = rconfig.get("type")

    def process(self, results):
        """Process the results into a format suitable for bulk api calls.

        Results are in the format:

        ( time, result_name, result_meta, [metric, metric, metric] )

        :param results: an iterable of result tuples in the format above
        :type results: iterable
        """
        field_map = {  # this is here to save a lookup in the loop below
            plumd.Int: ('int', 'int_val'),
            plumd.Counter: ('counter', 'int_val'),
            plumd.Float: ('float', 'float_val'),
            plumd.Gauge: ('gauge', 'float_val'),
            plumd.Rate: ('rate', 'float_val'),
            plumd.Timer: ('timer', 'float_val'),
            plumd.String: ('string', 'str_val'),
            plumd.Boolean: ('bool', 'bool_val')
        }
        for (ts, rname, rmeta, metrics) in results:
            # NOTE: the original loop referenced name/val/mclass without ever
            # defining them, and read the measurement from an undefined
            # mname. The per-metric unpacking below and the use of rname as
            # the measurement are assumptions about plumd's metric format and
            # may need adjusting.
            for (name, val, mclass) in metrics:
                rmetric = {
                    'measurement': rname,
                    'metric': name,
                    'time': ts.isoformat()
                }
                if mclass not in field_map:
                    rmetric['unk_val'] = val
                    rmetric['metric_type'] = "unknown"
                else:
                    metric_type, metric_key = field_map[mclass]
                    rmetric[metric_key] = val
                    rmetric["metric_type"] = metric_type
                # todo: how to handle this better? (key collisions this way)
                # fold host meta data into the metric document
                for key, mval in self.host_meta.items():
                    if key not in rmetric:
                        rmetric[key] = mval
                # fold result meta data into the metric document
                for key, mval in rmeta.items():
                    if key not in rmetric:
                        rmetric[key] = mval
                # the es bulk api wants newline separated json strings
                index = {"index": {"_index": self.index}}
                action_str = "{0}\n{1}\n".format(json.dumps(index),
                                                 json.dumps(rmetric))
                # append to the render queue (assumed to be maintained by the
                # plumd.Render base class)
                self.metrics.append(action_str)
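

# For illustration: one rendered action string (index "metrics", with an
# invented sample value) is the two-line, newline-terminated NDJSON fragment
# that the es bulk api expects:
#
#   {"index": {"_index": "metrics"}}
#   {"measurement": "cpu", "metric": "user", "time": "2016-01-01T00:00:00",
#    "metric_type": "gauge", "float_val": 0.42}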


class Elastic(plumd.plugins.Writer):
    """Elasticsearch sender."""
    defaults = {
        'proto': 'http',
        'hosts': ['127.0.0.1:9200'],  # default list of hosts
        'index': 'metrics',           # name of es index for metrics
        'type': 'metric',             # type of document to create
        'batch': 64,                  # number of metrics per bulk req
        'retries': 3,                 # retry failed requests
        'timeout': 5,                 # timeout for http post requests
        'maxqueue': 8192,             # maximum number of metrics to queue
        'warnqueue': 256              # print a warning if the queue > this
    }
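
    # Illustrative overrides of a few defaults (hypothetical values, shown as
    # the resulting config dict since the plumd config file syntax is not
    # covered here):
    #
    #   {'proto': 'http', 'hosts': ['es1:9200', 'es2:9200'],
    #    'index': 'metrics-prod', 'batch': 128, 'timeout': 10}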


    def __init__(self, log, config):
        """Elasticsearch sender.

        NOTE: this class is considered experimental and is not fully functional.

        :param log: a structlog logger instance.
        :type log: structlog logger
        :param config: a plumd.config.Conf configuration helper instance.
        :type config: plumd.config.Conf
        """
        super(Elastic, self).__init__(log, config)
        self.host_meta = config.get('meta')  # host level metadata
        self.hosts = config.get('hosts')
        self.index = config.get('index')
        self.dtype = self.config.get('type')
        self.batch = self.config.get('batch')
        self.warn = self.config.get('warnqueue')
        # each bulk action names its own index, so the index/type in the url
        # only act as defaults
        url = "{0}://{1}/{2}/{3}/_bulk"
        base = self.config.get('proto')
        urls = [url.format(base, h, self.index, self.dtype) for h in self.hosts]
        sessions = [requests.Session() for _ in self.hosts]
        self.sessions = itertools.cycle(zip(urls, sessions))
        self.retries = config.get('retries')
        self.timeout = config.get('timeout')
        self.render = ElasticRender(self.config)
        self.log.debug("__init__", timeout=self.timeout)


    def send(self, metrics):
        """Send one batch of rendered metrics using the bulk api.

        :param metrics: The list of pre-formatted metrics from our renderer.
        :type metrics: list
        """
        # each action string already ends in "\n", so plain concatenation
        # yields the newline separated json the es bulk api wants
        rdat = "".join(metrics)
        self.log.debug("send", index=self.index, action="posting")
        # cycle through each url, retrying up to self.retries times
        nretries = self.retries
        timeout = self.timeout
        success = False
        for url, sess in self.sessions:
            nretries -= 1
            if nretries < 0:
                break
            (resp, took, errs, failed) = (None, "unknown", True, [])
            try:
                self.log.debug("send", request=url, data=rdat, timeout=timeout)
                resp = sess.post(url, data=rdat, timeout=timeout)
            except Exception as e:
                msg = "exception during post"
                rtxt = "None"
                if resp is not None and hasattr(resp, "text"):
                    rtxt = resp.text
                self.log.error("send", exception=e, msg=msg, response=rtxt)
                continue
            if resp.status_code == requests.codes.ok:
                try:
                    robj = resp.json()
                    took = robj.get('took', took)
                    errs = robj.get('errors', errs)
                    if errs and 'items' in robj:
                        for item in robj['items']:
                            failed.append(item)
                except Exception as e:
                    msg = "error parsing response"
                    self.log.debug("send", exception=e, msg=msg)
                self.log.debug("send", action="SUCCESS", took=took,
                               errors=errs, failed=failed)
                success = True
                break
            else:
                self.log.debug("send", action="retrying",
                               response=resp.status_code)
        if success:
            self.log.debug("send", action="complete")
        else:
            self.log.debug("send", action="failed", reason="too many retries")


    def _drain(self, final):
        """Send every queued full batch; final is passed through to the
        renderer's next_batch so onstop can flush partial batches too."""
        qsize, nmetrics, bmetrics = self.render.next_batch(self.batch, final)
        if qsize > self.warn:
            self.log.warn("drain", qsize=qsize, warn=self.warn)
        while nmetrics > 0:
            self.log.debug("drain", action="sending", metrics=bmetrics)
            self.send(bmetrics)
            # and the next batch, until there are no more full batches
            qsize, nmetrics, bmetrics = self.render.next_batch(self.batch,
                                                               final)


    def onstop(self):
        """Flush any remaining metrics."""
        self.log.debug("onstop", action="flushing")
        self._drain(True)
        self.log.debug("onstop", action="flushed")


    def push(self, rset):
        """Send metrics to elasticsearch."""
        self.log.debug("push", action="processing", index=self.index,
                       dtype=self.dtype)
        # record this batch of metrics
        self.render.process(rset.results)
        # then send every full batch that is now queued
        self._drain(False)
        self.log.debug("push", action="processed")