1
|
|
|
# -*- coding: utf-8 -*- |
2
|
|
|
|
3
|
|
|
__author__ = 'Kenny Freeman' |
4
|
|
|
__email__ = '[email protected]' |
5
|
|
|
__license__ = "ISCL" |
6
|
|
|
__docformat__ = 'reStructuredText' |
7
|
|
|
|
8
|
|
|
import socket |
9
|
|
|
import logging |
10
|
|
|
import httplib |
11
|
|
|
|
12
|
|
|
import requests # pip install requests |
13
|
|
|
|
14
|
|
|
import plumd |
15
|
|
|
import plumd.util |
16
|
|
|
import plumd.plugins |
17
|
|
|
|
18
|
|
|
# Silence the per-request log entries the HTTP stack would otherwise emit
# for every write. Adapted from:
# http://stackoverflow.com/questions/11029717/how-do-i-disable-log-messages-from-the-requests-library
httplib.HTTPConnection.debuglevel = 0
# Older requests releases vendor urllib3 as "requests.packages.urllib3";
# newer ones use the standalone urllib3 package, whose logger is "urllib3".
# Configure both so the suppression works with either layout. The vendored
# name is handled last so that ``rlog`` stays bound to that logger, as before.
for _rlog_name in ("urllib3", "requests.packages.urllib3"):
    rlog = logging.getLogger(_rlog_name)
    rlog.addHandler(logging.NullHandler())
    rlog.setLevel(logging.ERROR)
    rlog.propagate = True
del _rlog_name
25
|
|
|
|
26
|
|
|
|
27
|
|
|
def format_key_meta(meta):
    """Render metadata as the tag portion of an influxdb measurement key.

    Every metadata entry becomes ``key=value`` and the entries are joined
    with commas. Keys and string values are escaped with
    :func:`plumd.util.escape_tag`; booleans are written as ``t``/``f``;
    any other value is formatted unchanged.

    :param meta: :class:`plumd.Meta` object.
    :type meta: plumd.Meta
    :rtype: str
    """
    def _render_value(value):
        # strings are tested first, then booleans; everything else passes
        # through to str.format untouched
        if isinstance(value, str):
            return plumd.util.escape_tag(value)
        if isinstance(value, bool):
            return "t" if value else "f"
        return value

    # the value of each entry is read from index 1 (entries presumably are
    # (name, value) pairs - confirm against plumd.Meta)
    pairs = ["{0}={1}".format(plumd.util.escape_tag(key),
                              _render_value(entry[1]))
             for key, entry in meta.items]
    return ",".join(pairs)
46
|
|
|
|
47
|
|
|
|
48
|
|
|
def format_field_metrics(metrics):
    """Render metrics as the field portion of an influxdb line.

    Each metric becomes a ``name=value`` string:

    * the name is escaped with :func:`plumd.util.escape_tag`
    * string values go through :func:`plumd.util.escape_field`
    * booleans are written as ``t``/``f``
    * ints get an ``i`` suffix
    * any other value (e.g. floats) is formatted unchanged

    NOTE(review): under Python 2 a ``long`` is not an ``int``, so such
    values get no ``i`` suffix - confirm this is intended.

    :param metrics: The metrics to format; each is indexed as
        ``metric[0]`` (name) and ``metric[1]`` (value).
    :type metrics: list
    :rtype: list
    """
    def _field_value(value):
        if isinstance(value, str):
            return plumd.util.escape_field(value)
        # bool is a subclass of int, so it must be recognised before int
        if isinstance(value, bool):
            return "t" if value else "f"
        if isinstance(value, int):
            return "{0}i".format(value)
        return value

    return ["{0}={1}".format(plumd.util.escape_tag(metric[0]),
                             _field_value(metric[1]))
            for metric in metrics]
69
|
|
|
|
70
|
|
|
|
71
|
|
|
class InfluxdbRender(plumd.Render):
    """Renders metrics in an influxdb writable format.

    As of this writing the latest influxdb line protocol documentation is at:
    https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/

    Plumd renders metrics like:

    cpu,host=some,meta=values idle=90.0,user=5.0,system=5.0 <ts>\n
    disk,host=some,meta=values,dev=/dev/sda free=95.0,used=5.0 <ts>\n

    where host and metric metadata map to measurement tags and the metric
    values for a single result get comma separated as field values.

    Each result in a result set generates a new metric line and each metric
    line is queued in a deque for sending. Results that carry no metrics are
    skipped, since the line protocol requires at least one field per line.

    :param rconfig: A Conf object initialized with the plugin configuration.
    :type rconfig: plumd.conf.Conf
    """
    def __init__(self, rconfig):
        super(InfluxdbRender, self).__init__(rconfig)

        # generate host metadata string once on init
        self.meta_str = format_key_meta(self.meta)

    def process(self, results):
        """Render results into the influxdb line protocol and queue them.

        :param results: iterable of ``(timestamp, name, meta, metrics)``
            tuples; ``timestamp`` must provide an arrow-style ``format()``.
        :type results: iterable
        """
        for (ts, rname, rmeta, metrics) in results:
            # get the result metadata string, if any, for the key part
            rmeta_str = format_key_meta(rmeta)

            # get a list of formatted metric strings
            met_strs = format_field_metrics(metrics)
            if not met_strs:
                # a line without any field is invalid line protocol and
                # influxdb rejects it with a parse error - don't queue it
                continue

            # since each metric in metrics is from the same result
            # we can include them in the same line
            field_str = ",".join(met_strs)

            # build the key: measurement name, host tags, then result tags
            key_str = rname
            if self.meta_str:
                key_str = "{0},{1}".format(key_str, self.meta_str)
            if rmeta_str:
                key_str = "{0},{1}".format(key_str, rmeta_str)

            # influx uses nanosecond timestamps: X is the epoch timestamp and
            # SSS the milliseconds (see http://crsmithdev.com/arrow/); the six
            # trailing zeros pad milliseconds out to nanoseconds
            ts_str = ts.format("XSSS000000")

            # add the rendered line to our deque of metrics; the senders
            # combine queued lines with "\n".join()
            self.metrics.append("{0} {1} {2}".format(key_str, field_str,
                                                     ts_str))
132
|
|
|
|
133
|
|
|
|
134
|
|
|
|
135
|
|
|
class Influxdb(plumd.plugins.Writer):
    """Influxdb sender.

    Renders result sets into the influxdb line protocol with
    :class:`InfluxdbRender` and writes them in batches, either as HTTP POSTs
    to the ``/write`` endpoint or as UDP datagrams.
    """
    defaults = {
        'username': 'admin',
        'password': 'admin',
        'database': 'metrics',
        'host': '127.0.0.1',
        'port': 8086,
        'protocol': "http",  # http or udp
        'batch': 64,         # send batches of this size
        'retries': 3,        # total http attempts per batch
        'timeout': 5,        # timeout http posts in 5 seconds
        'maxqueue': 8192,    # maximum number of metrics to queue
        'warnqueue': 1024    # print warning if queue size > this
    }

    def __init__(self, log, config):
        """Influxdb sender.

        :param log: a structlog logger instance.
        :type log: structlog logger
        :param config: a plumd.config.Conf configuration helper instance.
        :type config: plumd.config.Conf
        :raises plumd.ConfigError: if ``protocol`` is neither http nor udp.
        """
        super(Influxdb, self).__init__(log, config)
        self.config = config
        proto = config.get('protocol')
        self.batch = config.get('batch')
        self.warn = config.get('warnqueue')
        # only the http sender uses these, but they are always set so that
        # no code path can hit an AttributeError on them
        self.retries = config.get('retries')
        self.timeout = config.get('timeout')
        self.socket = None
        self.session = None
        pname = proto.lower()
        if pname == "http":
            self.session = requests.Session()
            self.session.auth = (config.get('username'),
                                 config.get('password'))
            content_type = {'Content-type': 'application/octet-stream'}
            self.session.headers.update(content_type)
            self.send = self.send_http
            self.url = "http://{0}:{1}/write?db={2}".format(
                config.get('host'), config.get('port'), config.get('database'))
            self.log.debug("__init__", timeout=self.timeout, url=self.url)
        elif pname == "udp":
            # udp has no auth or db (configured on receiver side)
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.addr = (config.get('host'), config.get('port'))
            self.send = self.send_udp
            self.log.debug("__init__", address=self.addr)
        else:
            raise plumd.ConfigError("unknown protocol: {0}".format(proto))
        self.render = InfluxdbRender(self.config)

    def send_udp(self, metrics):
        """Send the batch of metrics to influxdb via UDP protocol.

        Delivery is best effort: if the send fails the error is logged and
        the batch is dropped.

        :param metrics: The list of pre-formatted metrics from our renderer.
        :type metrics: list
        """
        mstr = "\n".join(metrics) + "\n"
        try:
            self.socket.sendto(mstr, self.addr)
        except Exception as e:
            # keep the writer alive, but make the dropped batch visible
            self.log.error("send_udp", exception=e)

    def send_http(self, metrics):
        """Send the batch of metrics to influxdb.

        Makes up to ``retries`` attempts. Connection errors and non-2xx,
        non-4xx responses are retried. 4xx responses are client errors that
        would fail identically on every attempt, so they are not retried.

        :param metrics: The list of pre-formatted metrics from our renderer.
        :type metrics: list
        """
        rdat = "\n".join(metrics) + "\n"
        attempts = self.retries
        while attempts > 0:
            attempts -= 1
            try:
                resp = self.session.post(self.url, data=rdat,
                                         timeout=self.timeout)
            except Exception as e:
                self.log.error("send_http", exception=e,
                               msg="exception during post")
                continue
            # influxdb acknowledges writes with 204 No Content, so accept
            # the whole 2xx range
            if 200 <= resp.status_code < 300:
                return
            if 400 <= resp.status_code < 500:
                # e.g. malformed line protocol, bad credentials or unknown
                # database - retrying the same payload cannot succeed
                self.log.error("send_http", action="failed",
                               response=resp.status_code, text=resp.text)
                return
            self.log.debug("send_http", action="retrying",
                           response=resp.status_code, text=resp.text)
        self.log.error("send_http", action="failed",
                       reason="too many retries")

    def _send_batches(self, event, flush):
        """Send queued metrics batch by batch until none are returned.

        :param event: log event name of the caller (``"onstop"``/``"push"``).
        :type event: str
        :param flush: passed through to ``render.next_batch``; ``onstop``
            uses True to flush what remains, ``push`` uses False to send
            only full batches.
        :type flush: bool
        """
        qsize, nmetrics, bmetrics = self.render.next_batch(self.batch, flush)
        if qsize > self.warn:
            self.log.warn(event, qsize=qsize, warn=self.warn)
        while nmetrics > 0:
            self.log.debug(event, action="sending", nmetrics=nmetrics)
            self.send(bmetrics)
            qsize, nmetrics, bmetrics = self.render.next_batch(self.batch,
                                                               flush)

    def onstop(self):
        """Flush any remaining metrics, then release the transport."""
        self.log.debug("onstop", action="flushing")
        self._send_batches("onstop", True)
        self.log.debug("onstop", action="flushed")
        # close whichever transport __init__ opened
        if self.session is not None:
            self.session.close()
        if self.socket is not None:
            self.socket.close()

    def push(self, rset):
        """Render metrics onto our deque and send all full batches.

        :param rset: result set whose ``results`` are rendered by
            :class:`InfluxdbRender`.
        """
        # record this batch of metrics
        self.render.process(rset.results)
        # send every full batch that is now queued
        self._send_batches("push", False)
267
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.