Completed
Push — master ( da3b73...3279ec )
by Kenny
01:17
created

GraphiteRenderPickle.process()   C

Complexity

Conditions 7

Size

Total Lines 39

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 1 Features 1
Metric Value
cc 7
c 1
b 1
f 1
dl 0
loc 39
rs 5.5
1
 # -*- coding: utf-8 -*-
2
3
__author__ = 'Kenny Freeman'
4
__email__ = '[email protected]'
5
__license__ = "ISCL"
6
__docformat__ = 'reStructuredText'
7
8
import sys
9
import time
10
import struct
11
import socket
12
13
import plumd
14
import plumd.util
15
import plumd.plugins
16
17
18
def format_meta(meta, sep):
19
    """Return a formatted metadata string from the Meta object.
20
21
    :param meta: :class:`plumd.Meta` object that describes a metrics metadata.
22
    :type meta: plumd.Meta
23
    :param sep: seperator character for metadata values (eg. =).
24
    :type sep: str
25
    :rtype: str
26
    """
27
    # generate metadata string
28
    mmeta_dat = []
29
    for mkey, mval in meta.items:
30
        # eg. nic, metric(name='nic', value='eth0')
31
        mkey = plumd.util.replace_chars(mkey)
32
        mval = mval[1]
33
        if isinstance(mval, str):
34
            mval = plumd.util.replace_chars(mval)
35
        elif isinstance(mval, bool):
36
            mval = "true" if mval else "false"
37
        mmeta_dat.append("{0}{1}{2}".format(mkey, sep, mval))
38
    return ".".join(mmeta_dat)
39
40
41
def get_metric_str(prefix, hmeta_str, mmeta_str, metric, rname):
42
    """Return a string representation of a metric.
43
44
    :param prefix: the metric prefix to use (eg. servers)
45
    :type prefix: str
46
    :param hmeta_str: the host metadata string.
47
    :type hmeta_str: str
48
    :param mmeta_str: the metric metadata string.
49
    :type mmeta_str: str
50
    :param metric: the metric.
51
    :type metric: plumd.Integer or plumd.Float or plumd.String
52
    :param rname: the result set name (eg. cpu).
53
    :type rname: str
54
    :rtype: str
55
    """
56
    args = []
57
    # both host and metric metadata exists
58
    if hmeta_str:
59
        args = [ prefix, hmeta_str, rname, metric[0] ]
60
    else:
61
        args = [ prefix, rname, metric[0] ]
62
    if mmeta_str:
63
        args.append(mmeta_str)
64
    return ".".join(args)
65
66
67
class GraphiteRenderPlain(plumd.Render):
68
    """Renders metrics in graphite plain text format:
69
70
    <prefix>.<hmeta=hval>.<measurement>.<metric>.<mmeta=mval>.value <value> <ts>
71
72
    where host and metric metadata keys are sorted before being added to the
73
    metric name and the seperator character is configurable. eg.
74
75
    servers.env=test.region=test.cpu.idle.value 0.0 1452240380
76
    servers.env=test.region=test.disk.free.dev=sda1.value 1213234232 1452240380
77
78
    or with a . seperator character:
79
80
    servers.env.test.region.test.cpu.idle.value 0.0 1452240380
81
    servers.env.test.region.test.disk.free.dev.sda1.value 1213234232 1452240380
82
83
    :param rconfig: A Conf object initialized with the plugin configuration.
84
    :type rconfig: plumd.conf.Conf
85
    :raises: plumd.ConfigError
86
    """
87
    def __init__(self, rconfig):
88
        super(GraphiteRenderPlain, self).__init__(rconfig)
89
        # get the configured values from the plugin configuration
90
        self.prefix = rconfig.get("prefix")
91
        # record rendered metrics here, when bytes > max_bytes put into deque
92
        self.buff = ""
93
        self.buff_bytes = 0
94
        self.seperator = rconfig.get("seperator")
95
        if self.seperator and isinstance(self.seperator, str):
96
            self.seperator = self.seperator[0]
97
98
        # generate the host meta data string - assumes static host metadata
99
        self.hmeta_str = format_meta(self.meta, self.seperator)
100
101
102
    def process(self, results):
103
        """Process the metrics in the result set into the graphite plain text
104
        format.
105
106
        :param results: a generator that returns metrics in a standard format
107
        :type results: generator
108
        """
109
        # render each metric in the graphite plaintext format
110
        for (ts, rname, rmeta, metrics) in results:
111
            # get the metadata for this metric
112
            mmeta_str = format_meta(rmeta, self.seperator)
113
114
            # generate a metric string for each metric
115
            for metric in metrics:
116
                # silently ignore strings
117
                if metric.__class__ == plumd.String:
118
                    continue
119
120
                # booleans are either really big or 0
121
                mval = metric[1]
122
                if metric.__class__ == plumd.Boolean:
123
                    mval = sys.maxint if mval else 0
124
125
                # get the full metric string
126
                met_str = get_metric_str(self.prefix, self.hmeta_str,
127
                                         mmeta_str, metric, rname)
128
129
                # now combine the values
130
                fmstr = "{0} {1} {2}\n".format(met_str, mval, int(ts))
131
                fmstr_len = len(fmstr)
132
133
                # can we append to the existing buffer?
134
                if len(self.buff) + fmstr_len < self.max_bytes:
135
                    self.buff += fmstr
136
                    self.buff_bytes += fmstr_len
137
                    continue
138
139
                # or do we need to queue and start a new buffer?
140
                self.metrics.append(self.buff)
141
                self.buff = fmstr
142
                self.buff_bytes = fmstr_len
143
144
145
class GraphiteRenderPickle(plumd.Render):
146
    """Renders metrics in graphite pickle format:
147
148
    [ (<metric>, (<timestamp>, <value>)), ... ]
149
150
    where host and metric metadata keys are sorted before being added to the
151
    metric name and the seperator character is configurable - aside from
152
    rendering metrics in the pickle format the functionality is the same as the
153
    GraphiteRenderPlain class.
154
155
    :param rconfig: A Conf object initialized with the plugin configuration.
156
    :type rconfig: plumd.conf.Conf
157
    :raises: plumd.ConfigError
158
    """
159
    def __init__(self, rconfig):
160
        super(GraphiteRenderPickle, self).__init__(rconfig)
161
        # get the configured values from the plugin configuration
162
        self.prefix = rconfig.get("prefix")
163
        # record metric tuples here, pickle and queue once max_items reached
164
        self.buff = []
165
        self.seperator = rconfig.get("seperator")
166
        if self.seperator and isinstance(self.seperator, str):
167
            self.seperator = self.seperator[0]
168
169
        # generate the host meta data string - assumes static host metadata
170
        self.hmeta_str = format_meta(self.meta, self.seperator)
171
172
173
    def process(self, results):
174
        """Process the metrics in the result set into the graphite pickle
175
        format.
176
177
        :param results: a generator that returns metrics in a standard format
178
        :type results: generator
179
        """
180
        # render each metric in the graphite plaintext format
181
        for (ts, rname, rmeta, metrics) in results:
182
            # get the metadata for this metric
183
            mmeta_str = format_meta(rmeta, self.seperator)
184
185
            # generate a metric string for each metric
186
            for metric in metrics:
187
                # silently ignore strings
188
                if metric.__class__ == plumd.String:
189
                    continue
190
191
                # booleans are either really big or 0
192
                mval = metric[1]
193
                if metric.__class__ == plumd.Boolean:
194
                    mval = sys.maxint if mval else 0
195
196
                # get the full metric string
197
                met_str = get_metric_str(self.prefix, self.hmeta_str,
198
                                         mmeta_str, metric, rname)
199
200
                # now combine the values
201
                fmstr = "{0} {1} {2}\n".format(met_str, mval, int(ts))
202
                pmetric = ( fmstr, (ts.timestamp, mval) )
203
                if len(self.buff) < self.max_items:
204
                    self.buff.append(pmetric)
205
                    continue
206
207
                # or do we need to queue and start a new buffer?
208
                pvals = pickle.dumps(self.buff, protocol=2)
209
                pmetrics = struct.pack("!L", len(pvals)) + pvals
210
                self.metrics.append(pmetrics)
211
                self.buff = []
212
213
214
class Graphite(plumd.plugins.Writer):
215
    """Graphite sender."""
216
    ## todo: move things to render, writers, remove from here
217
    ## also todo: print warning for queue size in render not in Graphite
218
    defaults = {
219
        'prefix':           "servers",
220
        'host':             '127.0.0.1',
221
        'port':             2003,
222
        'protocol':         "tcp",
223
        'format':           "plain", # plain or pickle
224
        'seperator':        "=",     # metadata seperator
225
        'batch':            64,      # send batches of this size
226
        'retry':            30,      # delay between connection attempts
227
        'retries':          3,       # retry sends this many times
228
        'timeout':          5,       # new connection timeout
229
        'maxqueue':         8192,    # maximum number of metrics to queue
230
        'warnqueue':        1024     # print warning if queue size > this
231
    }
232
233
    def __init__(self, log, config):
234
        """Graphite sender. WIP
235
236
        :param log: A logger
237
        :type log: logging.RootLogger
238
        :param config: a plumd.config.Conf configuration helper instance.
239
        :type config: plumd.config.Conf
240
        """
241
        super(Graphite, self).__init__(log, config)
242
        config.defaults(Graphite.defaults)
243
        self.render = None
244
        fmt = config.get('format')
245
        fmts = {
246
            'plain': GraphiteRenderPlain,
247
            'pickle': GraphiteRenderPickle
248
        }
249
        if fmt in fmts:
250
            self.render = fmts[fmt](self.config)
251
        else:
252
            raise plumd.ConfigError("unkown format: {0}".format(fmt))
253
        proto = config.get('protocol')
254
        host = config.get('host')
255
        port = config.get('port')
256
        self.writer = None
257
        writers = {
258
            'tcp': GraphiteWriteTCP,
259
            'udp': GraphiteWriteUDP
260
        }
261
        if proto in writers:
262
            self.writer = writers[proto](self.log, config)
263
        else:
264
            raise plumd.ConfigError("unkown protocol: {0}".format(proto))
265
266
267
    def onstop(self):
268
        """Flush remaining metrics and disconnect socket on shutdown."""
269
        # get the next full batch of metrics to send
270
        for mbuff in self.render:
271
            self.writer.write_nonblock(mbuff)
272
273
        # flush and close fd, socket
274
        self.writer.disconnect()
275
276
277
    def push(self, rset):
278
        """Render metrics onto our deque and send all full batches.
279
280
        :param metrics: The list of pre-formated metrics from our renderer.
281
        :type metrics: list
282
        """
283
        # record this batch of metrics
284
        if rset:
285
            self.render.process(rset.results)
286
287
        # get the next full batch of metrics to send - next_batch pop_left's
288
        for mbuff in self.render:
289
            self.writer.write(mbuff)
290
291
292
class GraphiteWriteTCP(object):
293
    """Graphite TCP writer."""
294
295
    def __init__(self, log, config):
296
        """Graphite TCP writer."""
297
        self.log = log
298
        self.retries = config.get('retries')
299
        self.timeout = config.get('timeout')
300
        self.retry = config.get('retry')
301
        self.addr = (config.get('host'), config.get('port'))
302
        self.socket = None
303
        self.fd = None
304
        self.connected = False
305
306
307
    def disconnect(self):
308
        """Severe the graphite connection."""
309
        if self.fd:
310
            try:
311
                self.fd.close()
312
            except Exception as e:
313
                pass
314
        if self.socket:
315
            try:
316
                self.socket.close()
317
            except Exception as e:
318
                pass
319
        self.connected = False
320
321
322
    def connect(self):
323
        """Connect to graphite and maintain a connection. Returns True if
324
        the connection was made or False if it failed.
325
326
        :param blocking: Wether to keep trying on failures or not.
327
        :type blocking: bool
328
        :rtype: bool
329
        """
330
        self.disconnect()
331
        # attempt to connect up to max number of attempts
332
        maxtries = self.retries
333
        while maxtries > 0:
334
            try:
335
                self.socket = socket.create_connection(self.addr, self.timeout)
336
                self.fd = self.socket.makefile("r+b", bufsize=0)
337
                self.connected = True
338
                msg = "graphite: connected: {0}"
339
                self.log.info(msg.format(self.addr))
340
                break
341
            except Exception as e:
342
                msg = "graphite: {0}: connect: excception: {1}"
343
                self.log.error(msg.format(self.addr, e))
344
                self.disconnect()
345
                time.sleep(self.retry)
346
            maxtries -= 1
347
        return self.connected
348
349
350
    def write_nonblock(self, metrics):
351
        """Send the metrics string to graphite.
352
353
        todo: add support for pickle format, currently assumes plain text
354
355
        :param metrics: the metrics string to send.
356
        :type metrics: str
357
        """
358
        retries = self.retries
359
        while ( not self.connected ) and ( retries > 1 ):
360
            self.connect()
361
            self.retries -= 1
362
363
        if not self.connected:
364
            self.log.error("graphite: max connect retries: dropping metrics")
365
            return
366
367
        sent = False
368
        while not sent:
369
            try:
370
                self.fd.write(metrics)
371
                self.fd.flush()
372
                sent = True
373
            except Exception as e:
374
                msg = "graphite: {0}: write: dropping metrics: exception: {1}"
375
                self.log.error(msg.format(self.addr, e))
376
                # reconnect
377
378
379
    def write(self, metrics):
380
        """Send the metrics string to graphite.
381
382
        todo: add support for pickle format, currently assumes plain text
383
384
        :param metrics: the metrics string to send.
385
        :type metrics: str
386
        """
387
        while not self.connected:
388
            self.connect()
389
390
        sent = False
391
        while not sent:
392
            try:
393
                self.fd.write(metrics)
394
                self.fd.flush()
395
                sent = True
396
            except Exception as e:
397
                msg = "graphite: {0}: write: exception: {1}"
398
                self.log.error(msg.format(self.addr, e))
399
                # reconnect
400
                self.connect()
401
                time.sleep(self.retry)
402
403
404
class GraphiteWriteUDP(object):
405
    """Graphite UDP writer."""
406
407
    def __init__(self, log, config):
408
        """Graphite UDP writer."""
409
        self.log = log
410
        self.addr = (config.get('host'), config.get('port'))
411
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
412
413
414
    def write_blocking(self, metrics):
415
        """Send the metrics to graphite and return number of bytes sent.
416
417
        :param metrics: the metrics string to send.
418
        :type metrics: str
419
        :rtype: int
420
        """
421
        return self.socket.sendto(metrics, self.addr)
422
423
424
    def write(self, metrics):
425
        """Send the metrics to graphite and return number of bytes sent.
426
427
        :param metrics: the metrics string to send.
428
        :type metrics: str
429
        :rtype: int
430
        """
431
        return self.socket.sendto(metrics, self.addr)
432