Completed
Push — master ( 8451c1...a45803 )
by Kenny
01:18
created

format_meta()   D

Complexity

Conditions 8

Size

Total Lines 37

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 8
c 1
b 0
f 1
dl 0
loc 37
rs 4
1
# -*- coding: utf-8 -*-
2
3
__author__ = 'Kenny Freeman'
4
__email__ = '[email protected]'
5
__license__ = "ISCL"
6
__docformat__ = 'reStructuredText'
7
8
import sys
9
10
PY3 = sys.version_info > (3,)
11
12
import time
13
import socket
14
15
import plumd
16
import plumd.plugins
17
18
19
def format_meta(meta, sep, meta_key, filter_chars):
    """Return a formatted metadata string from the Meta object.

    Every metadata entry becomes one component of the result and the
    components are joined with ``.``. A component is either
    ``<key><sep><value>`` (``meta_key`` true) or just ``<value>``. All
    characters listed in ``filter_chars`` are removed from keys and from
    string values; booleans are rendered as ``true``/``false``.

    :param meta: :class:`plumd.Meta` object that describes a metrics metadata.
        Its ``items`` attribute is iterated directly (not called) and must
        yield ``(key, metric)`` pairs where ``metric[1]`` is the value.
    :type meta: plumd.Meta
    :param sep: separator character for metadata values (eg. =).
    :type sep: str
    :param meta_key: flag to include meta data key values or not
    :type meta_key: bool
    :param filter_chars: string value consisting of the characters to filter out
    :type filter_chars: str
    :rtype: str
    """
    if PY3:
        # Python 3 str.translate wants a {code point: replacement} mapping;
        # build it once here instead of once per key and once per value.
        drop_table = dict.fromkeys(map(ord, filter_chars), None)

        def strip(text):
            return text.translate(drop_table)
    else:
        def strip(text):
            return text.translate(None, filter_chars)

    # generate metadata string
    mmeta_dat = []
    for mkey, mval in meta.items:
        mkey = strip(mkey)
        # mval is one of Int, Float, String, etc metric types
        # and so mval[1] is the actual value where mval[0] is the name of it
        mval = mval[1]
        if isinstance(mval, str):
            mval = strip(mval)
        elif isinstance(mval, bool):
            mval = "true" if mval else "false"
        if meta_key:
            # include the metadata key in the metric eg. .region_uswest.
            mmeta_dat.append("{0}{1}{2}".format(mkey, sep, mval))
        else:
            # put the metadata value in the metric only eg. .uswest.
            # str() so that numeric values do not break the join below
            mmeta_dat.append(mval if isinstance(mval, str) else str(mval))
    return ".".join(mmeta_dat)
56
57
58
59
60
class GraphiteRenderPlain(plumd.Render):
    """Renders metrics in graphite plain text format:

    <prefix>.<hmeta>.<measurement>.<metric>.<mmeta> <value> <ts>

    where <hmeta> (host metadata) and <mmeta> (metric metadata) are produced
    by :func:`format_meta` and are omitted when empty. Values are written
    with two decimals and the timestamp as an integer. With ``meta_key``
    enabled and ``=`` as the separator character this gives eg.

    servers.env=test.region=test.cpu.idle 0.00 1452240380
    servers.env=test.region=test.disk.free.dev=sda1 1213234232.00 1452240380

    or with a . separator character:

    servers.env.test.region.test.cpu.idle 0.00 1452240380
    servers.env.test.region.test.disk.free.dev.sda1 1213234232.00 1452240380

    NOTE(review): ``self.meta``, ``self.max_bytes`` and ``self.metrics`` are
    used below but not assigned in this class - presumably they are provided
    by :class:`plumd.Render`; confirm against the base class.

    :param rconfig: A Conf object initialized with the plugin configuration.
    :type rconfig: plumd.conf.Conf
    :raises: plumd.ConfigError
    """
    def __init__(self, rconfig):
        super(GraphiteRenderPlain, self).__init__(rconfig)
        # get the configured values from the plugin configuration
        self.prefix = rconfig.get("prefix")
        # rendered lines accumulate here; once adding a line would reach
        # max_bytes the buffer is moved onto self.metrics
        self.buff = []
        self.buff_bytes = 0
        self.seperator = rconfig.get("seperator")
        self.meta_key = rconfig.get("meta_key")
        # need to build a list of characters to remove from metric/meta strings
        # this method is significantly faster/easier on the cpu than re.sub
        # the "filter_chars" setting lists the characters to KEEP; everything
        # else in the first 256 code points is stripped
        keep_chars = rconfig.get("filter_chars")
        self.filter_chars = "".join(char for char in map(chr, range(256))
                                    if char not in keep_chars)
        # only a single separator character is supported
        if self.seperator and isinstance(self.seperator, str):
            self.seperator = self.seperator[0]

        # generate the host meta data string - assumes static host metadata
        self.hmeta_str = format_meta(self.meta, self.seperator, self.meta_key,
                                     self.filter_chars)

    def flush(self):
        """Return and clear the partial metric buffer, also clear queue.

        :rtype: str
        """
        metrics = self.buff
        self.buff = []
        self.buff_bytes = 0
        self.metrics.clear()
        return "".join(metrics)

    def process(self, results):
        """Process the metrics in the result set into the graphite plain text
        format.

        Rendered lines are collected in ``self.buff``; whenever the next line
        would bring the buffer to ``self.max_bytes`` the current buffer is
        appended to ``self.metrics`` and a new one is started.

        :param results: a generator that returns metrics in a standard format,
            ie. ``(ts, rname, rmeta, metrics)`` tuples
        :type results: generator
        """
        hmeta_str = self.hmeta_str          # host metadata string, if any
        prefix = self.prefix                # metric prefix string
        meta_key = self.meta_key            # bool: include metadata in metric?
        sep = self.seperator                # meta data separator character
        filter_chars = self.filter_chars    # remove these chars from strings
        max_bytes = self.max_bytes
        # sys.maxint only exists on Python 2, fall back to sys.maxsize
        bool_true = getattr(sys, "maxint", sys.maxsize)

        # render each metric in the graphite plaintext format
        for (ts, rname, rmeta, metrics) in results:
            # metadata for this result set, rendered by the same helper as the
            # host metadata so it works on both Python 2 and 3
            mmeta_str = format_meta(rmeta, sep, meta_key, filter_chars)
            if hmeta_str:
                base = [prefix, hmeta_str, rname]
            else:
                base = [prefix, rname]

            # generate a metric string for each metric
            for metric in metrics:
                mclass = metric.__class__
                # silently ignore strings
                if mclass == plumd.String:
                    continue
                # booleans are either really big or 0
                mval = metric[1]
                if mclass == plumd.Boolean:
                    mval = bool_true if mval else 0

                # get the full metric string
                parts = base + [metric[0]]
                if mmeta_str:
                    parts.append(mmeta_str)
                # the value is formatted as-is (no int() truncation) so that
                # metrics with and without metadata render identically
                fmstr = "{0} {1:.2f} {2}\n".format(".".join(parts), mval,
                                                   int(ts))
                fmstr_len = len(fmstr)

                # can we append to the existing buffer?
                if self.buff_bytes + fmstr_len < max_bytes:
                    self.buff.append(fmstr)
                    self.buff_bytes += fmstr_len
                    continue

                # or do we need to queue and start a new buffer?
                if self.buff:
                    self.metrics.append(self.buff)
                self.buff = [fmstr]
                self.buff_bytes = fmstr_len
179
180
181
class Graphite(plumd.plugins.Writer):
    """Graphite sender.

    Combines a renderer (turns result sets into graphite text batches) with
    a writer (TCP or UDP) that ships the batches.
    """
    ## todo: move things to render, writers, remove from here
    ## also todo: print warning for queue size in render not in Graphite
    defaults = {
        'prefix':           "servers",
        'host':             '127.0.0.1',
        'port':             2003,
        'protocol':         "tcp",
        'format':           "plain", # plain or pickle
        'seperator':        "=",     # metadata seperator
        # any characters not in this value are removed from metrics/meta data
        'filter_chars':     'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-',
        'meta_key':         False,   # include metadata keys in metrics
        'max_bytes':        1400,    # send metrics in chunks of this many bytes
        'retry_time':       30,      # delay between connection attempts
        'timeout':          5,       # timeout for socket operations
        'maxqueue':         8192,    # maximum number of metrics to queue
        'warnqueue':        1024,    # print warning if queue size > this
        'tcp_ka_idle':      60,      # send keepalives if idle (seconds)
        'tcp_ka_intvl':     5,       # send unacked keepalives (seconds)
        'tcp_ka_count':     5        # up to this many unacked keepalives
    }

    def __init__(self, log, config):
        """Graphite sender.

        :param log: A logger
        :type log: logging.RootLogger
        :param config: a plumd.config.Conf configuration helper instance.
        :type config: plumd.config.Conf
        :raises: plumd.ConfigError if the configured format or protocol is
            not supported
        """
        super(Graphite, self).__init__(log, config)
        config.defaults(Graphite.defaults)

        # pick the renderer for the configured output format
        self.render = None
        fmt = config.get('format')
        fmts = {
            'plain': GraphiteRenderPlain
        }
        if fmt not in fmts:
            raise plumd.ConfigError("unkown format: {0}".format(fmt))
        self.render = fmts[fmt](self.config)

        # pick the writer for the configured transport protocol
        self.writer = None
        proto = config.get('protocol')
        writers = {
            'tcp': GraphiteWriteTCP,
            'udp': GraphiteWriteUDP
        }
        if proto not in writers:
            raise plumd.ConfigError("unkown protocol: {0}".format(proto))
        self.writer = writers[proto](self.log, config)

    def onstop(self):
        """Flush remaining metrics and disconnect socket on shutdown."""
        msg = "graphite: onstop: {0}: flushing metrics"
        self.log.info(msg.format(self.config.get('name')))
        # if we're not connected then drop all metrics and return
        if not self.writer.socket:
            msg = "graphite: onstop: not connected, dropping all queued metrics"
            self.log.error(msg)
            self.render.flush()
            return

        # ensure we don't take too long trying to flush metrics on shutdown
        start = time.time()
        for mbuff in self.render:
            self.writer.write("".join(mbuff))
            if time.time() - start > 1.0:
                self.render.flush()
                msg = "graphite: onstop: {0}: unable to send metrics, dropping"
                self.log.error(msg.format(self.config.get('name')))
                self.writer.disconnect()
                return
        # whatever is left in the partially filled buffer goes out last
        mbuff = self.render.flush()
        if mbuff:
            self.writer.write("".join(mbuff))
        self.writer.disconnect()
        msg = "graphite: onstop: {0}: done"
        self.log.info(msg.format(self.config.get('name')))

    def push(self, rset):
        """Render metrics onto our deque and send all full batches.

        :param rset: the result set to render; its ``results`` attribute is
            handed to the renderer. A falsy value skips rendering and only
            sends what is already queued.
        """
        # record this batch of metrics
        if rset:
            self.render.process(rset.results)

        # send every full batch the renderer hands out
        # NOTE(review): iterating the renderer presumably pops batches off
        # its queue - confirm against plumd.Render
        resend = None
        for mbuff in self.render:
            unsent = self.writer.write("".join(mbuff))
            # the TCP writer hands the payload back when it could not be
            # sent; the UDP writer returns the byte count from sendto() on
            # success, which must never be put back on the queue
            if unsent and not isinstance(unsent, int):
                resend = unsent
                # do not continue if we're unable to send
                # this can cause significant delays on shutdown
                break

        # add metrics back onto queue if any
        if resend:
            self.log.debug("graphite: write: requeuing metrics")
            self.render.metrics.append(resend)
289
290
291
class GraphiteWriteTCP(object):
    """Graphite TCP writer."""

    def __init__(self, log, config):
        """Graphite TCP writer.

        No connection is made here; :meth:`write` connects on demand.

        :param log: A logger
        :param config: configuration object providing ``get(key)`` for host,
            port, timeout, retry_time and the tcp_ka_* settings
        """
        self.log = log
        self.addr = (config.get('host'), config.get('port'))
        self.socket = None
        self.timeout = config.get('timeout')
        self.retry_time = config.get('retry_time')
        self.tcp_ka_idle = config.get('tcp_ka_idle')
        self.tcp_ka_intvl = config.get('tcp_ka_intvl')
        self.tcp_ka_count = config.get('tcp_ka_count')

    def connect(self):
        """Connect to graphite and maintain a connection. Returns True if
        the connection was made or False if it failed.

        On failure the error is logged, the socket is released and the call
        blocks for ``retry_time`` seconds before returning.

        :rtype: bool
        """
        # disconnect if we're already connected
        if self.socket:
            self.disconnect()
        try:
            # create the socket
            self.socket = socket.socket(socket.AF_INET,
                                        socket.SOCK_STREAM)
            # enable tcp keepalives with more aggresive values than default
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # the fine grained keepalive options are not defined on every
            # platform - apply the ones that exist rather than failing the
            # whole connection attempt on an AttributeError
            tcp_level = getattr(socket, "SOL_TCP", socket.IPPROTO_TCP)
            keepalive_opts = (
                ("TCP_KEEPIDLE", self.tcp_ka_idle),
                ("TCP_KEEPINTVL", self.tcp_ka_intvl),
                ("TCP_KEEPCNT", self.tcp_ka_count),
            )
            for opt_name, opt_value in keepalive_opts:
                opt = getattr(socket, opt_name, None)
                if opt is not None:
                    self.socket.setsockopt(tcp_level, opt, opt_value)
            # set timeout for socket operations
            self.socket.settimeout(self.timeout)
            # connect
            self.socket.connect(self.addr)
            # log and return
            msg = "graphite: connected: {0}"
            self.log.info(msg.format(self.addr))
            return True
        except Exception as e:
            # deliberately broad: a failed connect must never take the
            # caller down, it is logged and reported as False instead
            msg = "graphite: {0}: connect: excception: {1}"
            self.log.error(msg.format(self.addr, e))
            # release the failed socket first, then pause before the caller
            # gets a chance to reconnect
            self.disconnect()
            time.sleep(self.retry_time)
        return False

    def disconnect(self):
        """Sever the graphite connection.

        Errors while closing are logged; ``self.socket`` is always reset to
        None afterwards.
        """
        if not self.socket:
            return
        try:
            self.socket.close()
        except Exception as e:
            msg = "graphite: dicsonnect: exception {0}".format(e)
            self.log.error(msg)
        finally:
            self.socket = None

    def write(self, metrics):
        """Send the metrics string to graphite.

        Connects first when there is no socket. On failure the payload is
        returned so the caller can send it again; on success nothing (None)
        is returned.

        :param metrics: the metrics string to send.
        :type metrics: str
        :returns: None on success, ``metrics`` if it could not be sent
        :rtype: str or None
        """
        if not self.socket:
            self.connect()
        if not self.socket:
            msg = "graphite: write: connection failed"
            self.log.error(msg)
            return metrics # resend them
        try:
            if PY3:
                # sockets take bytes on Python 3
                self.socket.sendall(bytes(metrics, 'utf8'))
            else:
                self.socket.sendall(metrics)
        except Exception as e:
            msg = "graphite: {0}: write: exception: {1}"
            self.log.error(msg.format(self.addr, e))
            # try to re-establish the connection for the next write
            self.connect()
            return metrics # resend them
        return None
380
381
382
class GraphiteWriteUDP(object):
    """Graphite UDP writer."""

    def __init__(self, log, config):
        """Graphite UDP writer.

        The datagram socket is created immediately; nothing is sent until
        one of the write methods is called.

        :param log: A logger
        :param config: configuration object providing ``get(key)`` for host
            and port
        """
        self.log = log
        self.addr = (config.get('host'), config.get('port'))
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def _send(self, metrics):
        """Send one datagram to ``self.addr`` and return the sendto() result."""
        # sockets take bytes on Python 3, plain str on Python 2
        payload = bytes(metrics, 'utf8') if PY3 else metrics
        return self.socket.sendto(payload, self.addr)

    def write_blocking(self, metrics):
        """Send the metrics to graphite and return number of bytes sent.

        :param metrics: the metrics string to send.
        :type metrics: str
        :rtype: int
        """
        return self._send(metrics)

    def write(self, metrics):
        """Send the metrics to graphite and return number of bytes sent.

        :param metrics: the metrics string to send.
        :type metrics: str
        :rtype: int
        """
        return self._send(metrics)
415