1
|
|
|
# -*- coding: utf-8 -*- |
2
|
|
|
|
3
|
|
|
__author__ = 'Kenny Freeman' |
4
|
|
|
__email__ = '[email protected]' |
5
|
|
|
__license__ = "ISCL" |
6
|
|
|
__docformat__ = 'reStructuredText' |
7
|
|
|
|
8
|
|
|
import sys |
9
|
|
|
import time |
10
|
|
|
import socket |
11
|
|
|
|
12
|
|
|
import plumd |
13
|
|
|
import plumd.util |
14
|
|
|
import plumd.plugins |
15
|
|
|
|
16
|
|
|
|
17
|
|
|
def format_meta(meta, sep):
    """Build the dotted ``key<sep>value`` string for a Meta object.

    Each entry yielded by ``meta.items`` contributes one ``key<sep>value``
    component; components are joined with ".". Keys and string values are
    passed through :func:`plumd.util.replace_chars`, booleans are written
    as "true"/"false" and any other value is formatted as-is.

    :param meta: :class:`plumd.Meta` object describing a metric's metadata.
    :type meta: plumd.Meta
    :param sep: separator placed between each key and its value (eg. =).
    :type sep: str
    :rtype: str
    """
    def _render(value):
        # strings are cleaned up, booleans spelled out, the rest untouched
        if isinstance(value, str):
            return plumd.util.replace_chars(value)
        if isinstance(value, bool):
            return "true" if value else "false"
        return value

    # each entry looks like: nic, metric(name='nic', value='eth0')
    # so index 1 of the entry is the actual metadata value
    return ".".join(
        "{0}{1}{2}".format(plumd.util.replace_chars(name), sep,
                           _render(entry[1]))
        for name, entry in meta.items
    )
38
|
|
|
|
39
|
|
|
|
40
|
|
|
def get_metric_str(prefix, hmeta_str, mmeta_str, metric, rname):
    """Return the dotted graphite path for a metric.

    The path is built from, in order: the prefix, the host metadata string,
    the result set name, the metric name (``metric[0]``) and the metric
    metadata string. Components that are empty or None are left out so the
    path never contains a leading, trailing or doubled ".".

    :param prefix: the metric prefix to use (eg. servers)
    :type prefix: str
    :param hmeta_str: the host metadata string.
    :type hmeta_str: str
    :param mmeta_str: the metric metadata string.
    :type mmeta_str: str
    :param metric: the metric; only its name at index 0 is used here.
    :type metric: plumd.Integer or plumd.Float or plumd.String
    :param rname: the result set name (eg. cpu).
    :type rname: str
    :rtype: str
    """
    # <prefix>.<host meta>.<result set>.<metric name>.<metric meta>
    parts = (prefix, hmeta_str, rname, metric[0], mmeta_str)
    return ".".join(part for part in parts if part)
64
|
|
|
|
65
|
|
|
|
66
|
|
|
class GraphiteRender(plumd.Render):
    """Render metrics in the graphite plain text or pickle format.

    Plaintext entries are strings of the form:

    <prefix>.<hmeta=hval>.<measurement>.<metric>.<mmeta=mval> <value> <ts>

    where the character between a metadata key and its value is
    configurable ("seperator" option, first character only). eg.

    servers.env=test.region=test.cpu.idle 0.0 1452240380
    servers.env=test.region=test.disk.free.dev=sda1 1213234232 1452240380

    or with a . separator character:

    servers.env.test.region.test.cpu.idle 0.0 1452240380
    servers.env.test.region.test.disk.free.dev.sda1 1213234232 1452240380

    Metadata components appear in the order ``Meta.items`` yields them
    (presumably sorted by key - confirm against plumd.Meta).

    Pickle entries are tuples:

    (<metric>, (<timestamp>, <value>))

    String metrics are skipped. Boolean metrics are rendered as a very
    large integer for true and 0 for false.

    :param rconfig: A Conf object initialized with the plugin configuration.
    :type rconfig: plumd.conf.Conf
    :raises: plumd.ConfigError if the configured format is unknown
    """

    def __init__(self, rconfig):
        super(GraphiteRender, self).__init__(rconfig)
        # get the configured values from the plugin configuration
        self.prefix = rconfig.get("prefix")
        self.format = rconfig.get("format").lower()
        if self.format not in ('plain', 'pickle'):
            err = "unknown graphite format: {0}".format(self.format)
            raise plumd.ConfigError(err)
        # only the first character of a string separator is used
        self.seperator = rconfig.get("seperator")
        if self.seperator and isinstance(self.seperator, str):
            self.seperator = self.seperator[0]

        # generate the host meta data string once - assumes static host
        # metadata (self.meta is presumably set by plumd.Render - confirm)
        self.hmeta_str = format_meta(self.meta, self.seperator)

    def process(self, results):
        """Render every metric in results and queue it on self.metrics.

        Depending on the configured format each queued entry is either a
        plain text line or a ``(path, (timestamp, value))`` tuple.

        :param results: an iterable of (timestamp, result set name,
            result set metadata, metrics) tuples.
        :type results: generator
        """
        # sys.maxint only exists on Python 2; fall back to sys.maxsize so
        # boolean metrics do not raise AttributeError on Python 3
        bool_true = getattr(sys, "maxint", sys.maxsize)

        # (self._time, robj.name, robj.meta, robj.metrics)
        for (ts, rname, rmeta, metrics) in results:
            # metadata string shared by every metric of this result set
            mmeta_str = format_meta(rmeta, self.seperator)

            for metric in metrics:
                # silently ignore strings
                if metric.__class__ == plumd.String:
                    continue

                mval = metric[1]

                # for now at least, booleans are either really big or 0
                if metric.__class__ == plumd.Boolean:
                    mval = bool_true if mval else 0

                # <pfx>.<hmet=val>.<meas>.<met>.<mmet=val>
                met_str = get_metric_str(self.prefix, self.hmeta_str,
                                         mmeta_str, metric, rname)

                # queue the rendered metric (self.metrics is presumably the
                # queue provided by plumd.Render - confirm)
                if self.format == "plain":
                    fmstr = "{0} {1} {2}".format(met_str, mval, ts.timestamp)
                    self.metrics.append(fmstr)
                elif self.format == "pickle":
                    self.metrics.append((met_str, (ts.timestamp, mval)))
148
|
|
|
|
149
|
|
|
|
150
|
|
|
class Graphite(plumd.plugins.Writer):
    """Writer plugin that sends rendered metrics to graphite over TCP or UDP.

    Result sets passed to :meth:`push` are rendered by a
    :class:`GraphiteRender` and sent in batches of ``batch`` metrics.
    ``self.connect`` and ``self.send`` are bound to the TCP or UDP
    implementation according to the ``protocol`` option.
    """

    # plugin configuration defaults
    defaults = {
        'prefix': "servers",
        'host': '127.0.0.1',
        'port': 2003,
        'protocol': "tcp",
        'format': "plain",      # plain or pickle
        'seperator': "=",       # metadata seperator
        'batch': 64,            # send batches of this size
        'retry': 30,            # delay between connection attempts
        'retries': 3,           # retry sends this many times
        'timeout': 5,           # new connection timeout
        'maxqueue': 8192,       # maximum number of metrics to queue
        'warnqueue': 1024       # print warning if queue size > this
    }

    def __init__(self, log, config):
        """Read the plugin configuration and select the transport.

        :param log: a structlog logger instance.
        :type log: structlog logger
        :param config: a plumd.config.Conf configuration helper instance.
        :type config: plumd.config.Conf
        :raises: plumd.ConfigError if the configured protocol is unknown
        """
        super(Graphite, self).__init__(log, config)
        self.addr = (config.get('host'), config.get('port'))
        self.retry = config.get('retry')
        # NOTE(review): retries is stored but not consulted by the send
        # methods; an unbounded send keeps retrying until it succeeds
        self.retries = config.get('retries')
        self.timeout = config.get('timeout')
        self.proto = config.get("protocol")
        self.batch = config.get('batch')
        self.warn = config.get('warnqueue')
        self.socket = None
        self.fd = None
        self.connected = False
        self.render = GraphiteRender(self.config)
        if self.proto == "tcp":
            self.connect = self.connect_tcp
            self.send = self.send_tcp
        elif self.proto == "udp":
            self.connect = self.connect_udp
            self.send = self.send_udp
        else:
            raise plumd.ConfigError("unkown protocol: {0}".format(self.proto))

    def onstart(self):
        """Connect to the graphite server on startup."""
        # a single attempt: don't block the calling thread on startup!
        self.connect(maxtries=1)

    def onstop(self):
        """Flush remaining metrics and disconnect socket on shutdown."""
        self.log.debug("onstop", action="flushing")
        qsize, nmetrics, bmetrics = self.render.next_batch(self.batch, True)
        if qsize > self.warn:
            self.log.warn("onstop", qsize=qsize, warn=self.warn)
        while nmetrics > 0:
            self.log.debug("onstop", action="sending", metrics=bmetrics)
            # one attempt per batch so shutdown cannot hang on a dead server
            if not self.send(bmetrics, maxtries=1):
                self.log.error("onstop", action="dropped", metrics=nmetrics)
            # and the next batch until the queue is drained
            qsize, nmetrics, bmetrics = self.render.next_batch(self.batch, True)
        self.log.debug("onstop", action="flushed")
        # flush and close fd, socket
        if self.fd is not None:
            try:
                try:
                    self.fd.flush()
                finally:
                    # close even when the final flush fails
                    self.fd.close()
            except Exception as e:
                self.log.error("onstop", action="close.fd", exception=e)
            self.fd = None
        if self.socket is not None:
            try:
                self.socket.close()
            except Exception as e:
                self.log.error("onstop", action="close.socket", exception=e)
            self.socket = None
        self.connected = False

    def push(self, rset):
        """Render the result set onto the queue and send all full batches.

        :param rset: result set whose ``results`` are passed to the renderer.
        """
        # record this batch of metrics
        self.render.process(rset.results)

        # get the next full batch of metrics to send - next_batch pop_left's
        qsize, nmetrics, bmetrics = self.render.next_batch(self.batch, False)
        if qsize > self.warn:
            self.log.warn("push", qsize=qsize)
        while nmetrics > 0:
            self.send(bmetrics)
            # and the next batch until there are no more full batches
            qsize, nmetrics, bmetrics = self.render.next_batch(self.batch, False)

    def _encode(self, metrics):
        """Return the wire payload for one batch of rendered metrics.

        Plain format: the metric lines joined by newlines, with a trailing
        newline. Pickle format: a protocol 2 pickle of the list of tuples
        preceded by its length as a 4 byte big-endian unsigned integer.

        :param metrics: one batch of entries produced by the renderer.
        :type metrics: list
        :rtype: bytes
        """
        if self.render.format == "pickle":
            # imported locally so the module import block stays untouched
            import pickle
            import struct
            body = pickle.dumps(list(metrics), protocol=2)
            return struct.pack("!L", len(body)) + body
        payload = "\n".join(metrics) + "\n"
        # sockets carry bytes: a no-op for Python 2 str, encodes text types
        if not isinstance(payload, bytes):
            payload = payload.encode("utf-8")
        return payload

    def disconnect_tcp(self):
        """Sever the graphite connection and forget the closed handles."""
        if self.fd is not None:
            try:
                self.fd.close()
            except Exception as e:
                self.log.error("disconnect_tcp", action="close.fd", exception=e)
            # drop the reference so a closed file is never written to again
            self.fd = None
        if self.socket is not None:
            try:
                self.socket.close()
            except Exception as e:
                self.log.error("disconnect_tcp", action="close.socket",
                               exception=e)
            self.socket = None
        self.connected = False

    def connect_tcp(self, maxtries=None):
        """Connect to graphite. Returns True if the connection was made or
        False if it failed.

        Any existing connection is closed first. Failed attempts are
        separated by ``retry`` seconds; there is no sleep after the last
        attempt.

        :param maxtries: attempt to connect at most this many times
            (a single attempt when None).
        :type maxtries: int
        :rtype: bool
        """
        self.log.debug("connect_tcp", action="connecting")
        self.disconnect_tcp()
        if maxtries is None:
            maxtries = 1
        while maxtries > 0:
            maxtries -= 1
            try:
                self.log.info("connect_tcp", action="connecting",
                              address=self.addr)
                self.socket = socket.create_connection(self.addr, self.timeout)
                self.fd = self.socket.makefile("r+b", bufsize=0)
                self.connected = True
                self.log.debug("connect_tcp", action="connected")
                break
            except Exception as e:
                self.log.error("connect_tcp", exception=e)
                self.disconnect_tcp()
                # only wait when another attempt will follow
                if maxtries > 0:
                    time.sleep(self.retry)
        return self.connected

    def send_tcp(self, metrics, maxtries=None):
        """Send one batch of metrics to graphite over TCP.

        Each attempt (re)connects if needed and writes the encoded batch.
        Failed attempts are separated by ``retry`` seconds.

        :param metrics: one batch of entries produced by the renderer.
        :type metrics: list
        :param maxtries: give up after this many attempts; when None keep
            trying until the batch has been sent.
        :type maxtries: int
        :returns: True if the batch was sent, False if attempts ran out.
        :rtype: bool
        """
        payload = self._encode(metrics)
        attempts = 0
        while maxtries is None or attempts < maxtries:
            attempts += 1
            if self.connected or self.connect(maxtries=1):
                try:
                    self.fd.write(payload)
                    self.fd.flush()
                    return True
                except Exception as e:
                    self.log.error("send_tcp", exception=e)
                    # force a fresh connection on the next attempt
                    self.disconnect_tcp()
            # back off before the next attempt, if there is one
            if maxtries is None or attempts < maxtries:
                time.sleep(self.retry)
        return False

    def connect_udp(self, maxtries=None):
        """Setup a UDP socket, closing any previous one.

        :param maxtries: accepted for symmetry with connect_tcp; unused.
        :type maxtries: int
        :rtype: bool
        """
        self.log.debug("connect_udp", action="socket")
        if self.socket:
            try:
                self.socket.close()
            except Exception as e:
                self.log.error("connect_udp", action="close.socket",
                               exception=e)
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.log.debug("connect_udp", action="created")
        return True

    def send_udp(self, metrics, maxtries=None):
        """Send one batch of metrics to graphite as a single UDP datagram.

        :param metrics: one batch of entries produced by the renderer.
        :type metrics: list
        :param maxtries: give up after this many attempts; when None keep
            trying until the datagram has been handed to the socket.
        :type maxtries: int
        :returns: True if the datagram was sent, False if attempts ran out.
        :rtype: bool
        """
        # NOTE(review): graphite's pickle listener is presumably TCP only -
        # confirm before combining protocol=udp with format=pickle
        payload = self._encode(metrics)
        attempts = 0
        while maxtries is None or attempts < maxtries:
            attempts += 1
            try:
                # create the socket lazily if onstart has not run
                if self.socket is None:
                    self.connect(maxtries=1)
                self.socket.sendto(payload, self.addr)
                return True
            except Exception as e:
                self.log.error("send_udp", exception=e)
            # back off before the next attempt, if there is one
            if maxtries is None or attempts < maxtries:
                time.sleep(self.retry)
        return False
353
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.