# -*- coding: utf-8 -*-

__author__ = 'Kenny Freeman'
__email__ = '[email protected]'
__license__ = "ISCL"
__docformat__ = 'reStructuredText'

import copy
import socket
import logging
import httplib

import requests  # pip install requests

import plumd
import plumd.plugins

# from: http://stackoverflow.com/questions/11029717/how-do-i-disable-log-messages-from-the-requests-library
# http requests were generating log entries; silence everything below ERROR.
httplib.HTTPConnection.debuglevel = 0
rlog = logging.getLogger("requests.packages.urllib3")
rlog.addHandler(logging.NullHandler())
rlog.setLevel(logging.ERROR)
rlog.propagate = True


class InfluxdbRender(plumd.Render):
    """Renders metrics in an influxdb writable format.

    As of this writing the latest influxdb line protocol documentation is at:
    https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/

    Plumd renders metrics like:

        cpu,host=some,meta=values idle=90.0,user=5.0,system=5.0 <ts>\n
        disk,host=some,meta=values,dev=/dev/sda free=95.0,used=5.0 <ts>\n

    where host and metric metadata map to measurement tags and the metric
    values for a single result are comma separated as field values.

    Each result in a result set generates a new metric line, and each metric
    line is queued in a deque for sending.

    :param rconfig: A Conf object initialized with the plugin configuration.
    :type rconfig: plumd.conf.Conf
    """
    def __init__(self, rconfig):
        super(InfluxdbRender, self).__init__(rconfig)

        # use a lambda for escaping spaces, commas and newlines
        escmd = lambda t: t.replace(" ", "\\ ").replace(",", "\\,").replace("\n", "\\n")

        # generate host metadata string once on init
        meta_dat = []
        for hmkey, hmval in self.meta.items():
            hmkey = escmd(hmkey)
            if isinstance(hmval, str):
                hmval = escmd(hmval)
            elif isinstance(hmval, bool):
                # bool must be checked before int: bool is a subclass of int
                hmval = "t" if hmval else "f"
            elif isinstance(hmval, int):
                hmval = "{0}i".format(hmval)
            meta_dat.append("{0}={1}".format(hmkey, hmval))
        self.meta_str = ",".join(meta_dat)

    def process(self, results):
        """Process the results into the influxdb line protocol (as of v0.9).

        :param results: a generator that yields results in a standard format
        :type results: generator
        """
        # use lambdas for string fixups in metadata/fields
        escmd = lambda t: t.replace(" ", "\\ ").replace(",", "\\,").replace("\n", "\\n")
        escmf = lambda f: '"{0}"'.format(f.replace('"', '\\"')).replace("\n", "\\n")
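        # e.g. escmd("dev 0") -> "dev\ 0" and escmf("busy") -> '"busy"'
        # (illustrative values; string field values are double quoted in the
        #  line protocol)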

        # render each metric in the influxdb line format, append to self.metrics
        # this is a long function as I don't want to call outside the loop
        # each result is (time, result_name, result_meta, [metric, metric, metric])
        for (ts, rname, rmeta, metrics) in results:
            # generate a metrics string, e.g. idle=0.0,user=90.0,sys=10.0
            met_str = ""
            for metric in metrics:
                mclass = metric.__class__
                key_str = escmd(metric[0])
                val_str = None
                # value => int: append i, bool: t or f, string: double quoted
                if mclass == plumd.Int or mclass == plumd.Counter:
                    val_str = "{0}i".format(metric[1])
                elif mclass == plumd.Boolean:
                    val_str = "t" if metric[1] else "f"
                elif mclass == plumd.String:
                    val_str = escmf(metric[1])
                else:
                    val_str = metric[1]
                if met_str:
                    met_str += ",{0}={1}".format(key_str, val_str)
                else:
                    met_str = "{0}={1}".format(key_str, val_str)

            # next add any metadata from the result
            rmdat = []
            # combine host and metric metadata
            for r_mkey, r_mval in rmeta.items():
                r_mkey = escmd(r_mkey)
                if isinstance(r_mval, str):
                    r_mval = escmd(r_mval)
                elif isinstance(r_mval, bool):
                    # bool must be checked before int: bool is a subclass of int
                    r_mval = "t" if r_mval else "f"
                elif isinstance(r_mval, int):
                    r_mval = "{0}i".format(r_mval)
                rmdat.append("{0}={1}".format(r_mkey, r_mval))
            rmstr = ",".join(rmdat)

            # finally create the combined metadata string
            meta_str = rname
            if self.meta_str:
                meta_str += "," + self.meta_str
            if rmstr:
                meta_str += "," + rmstr

            # now combine it all - note influx uses nanosecond timestamps
            # see http://crsmithdev.com/arrow/, X is timestamp and SSS is ms
            ts_fmt = "XSSS000000"
            mstr = "{0} {1} {2}".format(meta_str, met_str, ts.format(ts_fmt))
            # add the rendered metric string to our deque of metrics
            # this allows for "\n".join() when sending
            self.metrics.append(mstr)
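            # the complete rendered line (per the class docstring) looks like:
            #   cpu,host=some,meta=values idle=90.0,user=5.0,system=5.0 <ts in nanoseconds>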


class Influxdb(plumd.plugins.Writer):
    """Influxdb sender."""
    defaults = {
        'username': 'admin',
        'password': 'admin',
        'database': 'metrics',
        'host': '127.0.0.1',
        'port': 8086,
        'protocol': "http",  # http or udp : todo: udp
        'batch': 64,         # send batches of this size
        'retries': 3,        # retry failed requests
        'timeout': 5,        # timeout http posts in 5 seconds
        'maxqueue': 8192,    # maximum number of metrics to queue
        'warnqueue': 1024    # print warning if queue size > this
    }
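    # With these defaults the writer posts batches of 64 lines to
    # http://127.0.0.1:8086/write?db=metrics as user "admin" (see __init__
    # below); each value is read via config.get() and can presumably be
    # overridden by the plugin configuration.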

    def __init__(self, log, config):
        """Influxdb sender.

        :param log: a structlog logger instance.
        :type log: structlog logger
        :param config: a plumd.config.Conf configuration helper instance.
        :type config: plumd.config.Conf
        """
        super(Influxdb, self).__init__(log, config)
        self.config = config
        proto = config.get('protocol')
        self.batch = config.get('batch')
        self.warn = config.get('warnqueue')
        self.timeout = config.get('timeout')
        self.socket = None
        if proto.lower() == "http":
            self.session = requests.Session()
            self.session.auth = (config.get('username'), config.get('password'))
            content_type = {'Content-type': 'application/octet-stream'}
            self.session.headers.update(content_type)
            self.send = self.send_http
            args = [config.get('host'), config.get('port'),
                    config.get('database')]
            self.url = "http://{0}:{1}/write?db={2}".format(*args)
            self.retries = config.get('retries')
            self.log.debug("__init__", timeout=self.timeout, url=self.url)
        elif proto.lower() == "udp":
            # udp has no auth or db (configured on the receiver side)
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.addr = (config.get('host'), config.get('port'))
            self.send = self.send_udp
            self.log.debug("__init__", timeout=self.timeout, address=self.addr)
        else:
            raise plumd.ConfigError("unknown protocol: {0}".format(proto))
        self.render = InfluxdbRender(self.config)
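        # self.send is now bound to either send_http or send_udp above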

    def send_udp(self, metrics):
        """Send the batch of metrics to influxdb via the UDP protocol.

        :param metrics: The list of pre-formatted metrics from our renderer.
        :type metrics: list
        """
        mstr = "\n".join(metrics) + "\n"
        try:
            self.socket.sendto(mstr, self.addr)
        except Exception as e:
            self.log.debug("send_udp", exception=e)
        return

    def send_http(self, metrics):
        """Send the batch of metrics to influxdb via HTTP.

        :param metrics: The list of pre-formatted metrics from our renderer.
        :type metrics: list
        """
        rdat = "\n".join(metrics) + "\n"
        url = self.url
        nretries = self.retries
        timeout = self.timeout
        while nretries > 0:
            nretries -= 1
            resp = None
            try:
                resp = self.session.post(url, data=rdat, timeout=timeout)
            except Exception as e:
                msg = "exception during post"
                rtxt = "None"
                if resp is not None and hasattr(resp, "text"):
                    rtxt = resp.text
                self.log.error("send_http", exception=e, msg=msg, response=rtxt)
                continue
            # a successful write returns 204 No Content, so accept any 2xx
            if resp.status_code >= 200 and resp.status_code < 300:
                return
            else:
                self.log.debug("send_http", action="retrying",
                               response=resp.status_code, text=resp.text)
        self.log.debug("send_http", action="failed", reason="too many retries")

    def onstop(self):
        """Flush any remaining metrics."""
        # get the next batch of metrics to send
        self.log.debug("onstop", action="flushing")
        qsize, nmetrics, bmetrics = self.render.next_batch(self.batch, True)
        if qsize > self.warn:
            self.log.warn("onstop", qsize=qsize, warn=self.warn)
        while nmetrics > 0:
            self.log.debug("onstop", action="sending", metrics=bmetrics)
            self.send(bmetrics)
            # and the next batch, until there are no more batches
            qsize, nmetrics, bmetrics = self.render.next_batch(self.batch, True)
        self.log.debug("onstop", action="flushed")

    def push(self, rset):
        """Render metrics onto our deque and send all full batches.

        :param rset: a result set whose results will be rendered and queued.
        """
        # record this batch of metrics
        self.render.process(rset.results)

        # get the next full batch of metrics to send
        qsize, nmetrics, bmetrics = self.render.next_batch(self.batch, False)
        if qsize > self.warn:
            self.log.warn("push", qsize=qsize)
        while nmetrics > 0:
            self.send(bmetrics)
            # and the next batch until there are no more full batches
            qsize, nmetrics, bmetrics = self.render.next_batch(self.batch,
                                                               False)
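
# For reference, the payload handed to send_http()/send_udp() is just the
# newline-joined protocol lines, e.g. (values from the class docstring above):
#
#   cpu,host=some,meta=values idle=90.0,user=5.0,system=5.0 <ts>
#   disk,host=some,meta=values,dev=/dev/sda free=95.0,used=5.0 <ts>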