GraphiteRenderPlain.__init__()   B
last analyzed

Complexity

Conditions 5

Size

Total Lines 22

Duplication

Lines 0
Ratio 0 %

Importance

Changes 4
Bugs 1 Features 2
Metric Value
c 4
b 1
f 2
dl 0
loc 22
rs 8.3411
cc 5
1
# -*- coding: utf-8 -*-
0 ignored issues
show
Bug introduced by
There seems to be a cyclic import (plumd -> plumd.util).

Cyclic imports may cause partly loaded modules to be returned. This might lead to unexpected runtime behavior which is hard to debug.

Loading history...
2
"""Built-in Plumd Graphite Writer supporting plain text tcp/udp."""
3
import sys
4
import time
5
import socket
6
7
import plumd
8
9
__author__ = 'Kenny Freeman'
10
__email__ = '[email protected]'
11
__license__ = "ISCL"
12
__docformat__ = 'reStructuredText'
13
14
15
# True when running on Python 3; used in this module to choose between the
# Python 2 and Python 3 forms of string handling (e.g. str.translate).
PY3 = sys.version_info > (3,)
16
17
18
def format_meta(meta, sep, meta_key, filter_chars):
    """Return a formatted metadata string from the Meta object.

    Every metadata entry becomes one dot-separated component of the result:
    ``<key><sep><value>`` when ``meta_key`` is true, otherwise just
    ``<value>``. Characters listed in ``filter_chars`` are removed from keys
    and from string values, booleans are rendered as ``true``/``false`` and
    any other non-text value (int, float, ...) is converted with ``str`` so
    the final join cannot fail on it.

    :param meta: :class:`plumd.Meta` object that describes a metrics metadata.
    :type meta: plumd.Meta
    :param sep: separator character for metadata values (eg. =).
    :type sep: str
    :param meta_key: flag to include meta data key values or not
    :type meta_key: bool
    :param filter_chars: string value consisting of the characters to filter
    :type filter_chars: str
    :rtype: str
    """
    # Python 3's str.translate wants a {codepoint: None} mapping; build it
    # once here instead of once per key and once per value.
    delete_table = None
    if PY3:
        delete_table = dict.fromkeys(map(ord, filter_chars), None)

    def _strip(text):
        """Return text with every character of filter_chars removed."""
        if PY3:
            return text.translate(delete_table)
        return text.translate(None, filter_chars)

    parts = []
    # NOTE(review): ``items`` is iterated as an attribute, not called -
    # presumably plumd.Meta exposes it as a property; confirm.
    for mkey, mval in meta.items:
        mkey = _strip(mkey)
        # mval is one of Int, Float, String, etc metric types
        # and so mval[1] is the actual value where mval[0] is the name of it
        mval = mval[1]
        if isinstance(mval, str):
            mval = _strip(mval)
        elif isinstance(mval, bool):
            mval = "true" if mval else "false"
        elif not isinstance(mval, type(u"")):
            # numbers and other objects must become text before the join
            # below; Python 2 unicode values are left untouched
            mval = str(mval)
        if meta_key:
            # include the metadata key in the metric, eg. .region=uswest.
            parts.append("{0}{1}{2}".format(mkey, sep, mval))
        else:
            # put the metadata value in the metric only, eg. .uswest.
            parts.append(mval)
    return ".".join(parts)
56
57
58
class GraphiteRenderPlain(plumd.Render):
    """Renders metrics in graphite plain text format:

    <prefix>.<host meta>.<measurement>.<metric>.<metric meta> <value> <ts>

    The host meta and metric meta components are produced by
    :func:`format_meta` and are omitted when empty. Values are written with
    two decimal places. With ``meta_key`` enabled and ``=`` as the seperator
    a rendered line looks like:

    servers.env=test.region=test.cpu.idle 0.00 1452240380
    servers.env=test.region=test.disk.free.dev=sda1 1213234232.00 1452240380

    or with a . seperator character:

    servers.env.test.region.test.cpu.idle 0.00 1452240380
    servers.env.test.region.test.disk.free.dev.sda1 1213234232.00 1452240380

    NOTE(review): ``self.meta``, ``self.max_bytes`` and ``self.metrics`` are
    used but not assigned in this class - presumably plumd.Render sets them
    up; confirm against the base class.

    :param rconfig: A Conf object initialized with the plugin configuration.
    :type rconfig: plumd.conf.Conf
    :raises: plumd.ConfigError
    """

    def __init__(self, rconfig):
        """Read the renderer settings and pre-render the host metadata.

        :param rconfig: A Conf object initialized with the plugin
            configuration.
        :type rconfig: plumd.conf.Conf
        """
        super(GraphiteRenderPlain, self).__init__(rconfig)
        # get the configured values from the plugin configuration
        self.prefix = rconfig.get("prefix")
        # rendered lines collect here until adding one more would reach
        # max_bytes, then the whole list is moved onto self.metrics
        self.buff = list()
        self.buff_bytes = 0
        self.seperator = rconfig.get("seperator")
        self.meta_key = rconfig.get("meta_key")
        # the config value lists the characters to KEEP; str.translate needs
        # the characters to REMOVE, so invert it over the 0-255 range
        # (translate is significantly cheaper than re.sub here)
        keep_chars = rconfig.get("filter_chars")
        self.filter_chars = "".join(char for char in map(chr, range(256))
                                    if char not in keep_chars)
        # only the first character of the configured seperator is used
        if self.seperator and isinstance(self.seperator, str):
            self.seperator = self.seperator[0]

        # generate the host meta data string - assumes static host metadata
        self.hmeta_str = format_meta(self.meta, self.seperator, self.meta_key,
                                     self.filter_chars)

    def flush(self):
        """Return and clear the partial metric buffer, also clear queue.

        :rtype: str
        """
        metrics = self.buff
        self.buff = []
        self.buff_bytes = 0
        self.metrics.clear()
        return "".join(metrics)

    def process(self, results):
        """Process the metrics in the result set into the graphite plain text
        format.

        Each rendered line is appended to the partial buffer; when a line
        would bring the buffer to ``max_bytes`` or beyond, the buffer is
        queued on ``self.metrics`` and a new one is started with that line.

        :param results: a generator that returns metrics in a standard format
        :type results: generator
        """
        hmeta_str = self.hmeta_str          # host metadata string, if any
        prefix = self.prefix                # metric prefix string
        # "true" booleans are rendered as a very large number; sys.maxint
        # only exists on Python 2, sys.maxsize is the Python 3 replacement
        bool_true = getattr(sys, "maxint", sys.maxsize)

        # render each metric in the graphite plaintext format
        for (ts, rname, rmeta, metrics) in results:
            # metadata for this result, rendered by the same helper (and so
            # with the same Python 2/3 handling) as the host metadata
            mmeta_str = format_meta(rmeta, self.seperator, self.meta_key,
                                    self.filter_chars)

            # generate a metric string for each metric
            for metric in metrics:
                mclass = metric.__class__
                # silently ignore strings
                if mclass == plumd.String:
                    continue
                # booleans are either really big or 0
                mval = metric[1]
                if mclass == plumd.Boolean:
                    mval = bool_true if mval else 0

                # assemble the dotted metric name, skipping empty components
                if hmeta_str:
                    args = [prefix, hmeta_str, rname, metric[0]]
                else:
                    args = [prefix, rname, metric[0]]
                if mmeta_str:
                    args.append(mmeta_str)
                # the value is formatted the same way with or without
                # metadata so float metrics are never truncated to integers
                fmstr = "{0} {1:.2f} {2}\n".format(".".join(args), mval,
                                                   int(ts))
                fmstr_len = len(fmstr)

                # can we append to the existing buffer? compare bytes
                # buffered so far, not the number of buffered lines
                if self.buff_bytes + fmstr_len < self.max_bytes:
                    self.buff.append(fmstr)
                    self.buff_bytes += fmstr_len
                    continue

                # or do we need to queue and start a new buffer?
                if self.buff:
                    self.metrics.append(self.buff)
                self.buff = [fmstr]
                self.buff_bytes = fmstr_len
177
178
179
class Graphite(plumd.Writer):
    """Writer plugin that renders metrics and ships them to Graphite.

    The configured ``format`` selects the renderer and the configured
    ``protocol`` selects the socket writer; both are created once in
    ``__init__``.
    """

    # todo: move things to render, writers, remove from here
    # also todo: print warning for queue size in render not in Graphite
    defaults = {
        'prefix':           "servers",
        'host':             '127.0.0.1',
        'port':             2003,
        'protocol':         "tcp",
        'format':           "plain",  # plain or pickle
        'seperator':        "=",     # metadata seperator
        # any characters not in this value are removed from metrics/meta data
        'filter_chars':     'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-',
        'meta_key':         False,   # include metadata keys in metrics
        'max_bytes':        1400,    # send metrics in chunks of this many bytes
        'retry_time':       30,      # delay between connection attempts
        'timeout':          5,       # timeout for socket operations
        'maxqueue':         8192,    # maximum number of metrics to queue
        'warnqueue':        1024,    # print warning if queue size > this
        'tcp_ka_idle':      60,      # send keepalives if idle (seconds)
        'tcp_ka_intvl':     5,       # send unacked keepalives (seconds)
        'tcp_ka_count':     5        # up to this many unacked keepalives
    }

    def __init__(self, log, config):
        """Create the renderer and socket writer named in the configuration.

        :param log: A logger
        :type log: logging.RootLogger
        :param config: a plumd.config.Conf configuration helper instance.
        :type config: plumd.config.Conf
        :raises: plumd.ConfigError if the format or protocol is not known
        """
        super(Graphite, self).__init__(log, config)
        config.defaults(Graphite.defaults)

        # pick the renderer class by its configured name
        self.render = None
        render_name = config.get('format')
        renderers = {'plain': GraphiteRenderPlain}
        if render_name not in renderers:
            raise plumd.ConfigError("unkown format: {0}".format(render_name))
        self.render = renderers[render_name](self.config)

        # pick the socket writer class by its configured name
        proto_name = config.get('protocol')
        self.writer = None
        transports = {'tcp': GraphiteWriteTCP, 'udp': GraphiteWriteUDP}
        if proto_name not in transports:
            raise plumd.ConfigError("unkown protocol: {0}".format(proto_name))
        self.writer = transports[proto_name](self.log, config)

    def onstop(self):
        """Send what is still queued, then disconnect, on shutdown.

        Queued metrics are dropped when the writer has no socket, or once
        sending has taken longer than one second.
        """
        plugin_name = self.config.get('name')
        self.log.info(
            "Graphite: onstop: {0}: flushing metrics".format(plugin_name))

        # without a socket there is nothing to send on: drop and leave
        if not self.writer.socket:
            self.log.error(
                "Graphite: onstop: not connected, dropping all queued metrics")
            self.render.flush()
            return

        # ensure we don't take too long trying to flush metrics on shutdown
        began = time.time()
        for batch in self.render:
            self.writer.write("".join(batch))
            if time.time() - began > 1.0:
                self.render.flush()
                too_slow = "Graphite: onstop: {0}: unable to send metrics, dropping"
                self.log.error(too_slow.format(self.config.get('name')))
                self.writer.disconnect()
                return

        # whatever was still sitting in the partial buffer goes out last
        leftover = self.render.flush()
        if leftover:
            self.writer.write("".join(leftover))
        self.writer.disconnect()
        self.log.info(
            "Graphite: onstop: {0}: done".format(self.config.get('name')))

    def push(self, rset):
        """Render the result set and send every full batch that is queued.

        :param rset: result set whose ``results`` are handed to the renderer;
            a false value skips rendering and only sends what is queued.
        """
        # record this batch of metrics
        if rset:
            self.render.process(rset.results)

        # send queued batches until the writer hands something back; a
        # truthy return value is treated as the payload that was not sent
        unsent = None
        for batch in self.render:
            returned = self.writer.write("".join(batch))
            if returned:
                unsent = returned
                # do not continue if we're unable to send
                # this can cause significant delays on shutdown
                break

        # add metrics back onto queue if any
        if unsent:
            self.log.debug("Graphite: write: requeuing metrics")
            self.render.metrics.append(unsent)
285
286
287
class GraphiteWriteTCP(object):
    """Graphite TCP writer.

    Keeps one TCP connection to the configured host/port, connecting on the
    first write and reconnecting after a failed send.
    """

    def __init__(self, log, config):
        """Store the connection settings; no connection is made here.

        :param log: logger used for connection and error messages.
        :param config: configuration object providing ``get(name)`` for
            host, port, timeout, retry_time and the tcp_ka_* values.
        """
        self.log = log
        self.addr = (config.get('host'), config.get('port'))
        self.socket = None
        self.timeout = config.get('timeout')
        self.retry_time = config.get('retry_time')
        self.tcp_ka_idle = config.get('tcp_ka_idle')
        self.tcp_ka_intvl = config.get('tcp_ka_intvl')
        self.tcp_ka_count = config.get('tcp_ka_count')

    def connect(self):
        """Connect to graphite and maintain a connection. Returns True if
        the connection was made or False if it failed.

        On failure the error is logged, the socket is closed and the call
        sleeps for ``retry_time`` seconds before returning.

        :rtype: bool
        """
        # disconnect if we're already connected
        if self.socket:
            self.disconnect()
        try:
            # create the socket
            self.socket = socket.socket(socket.AF_INET,
                                        socket.SOCK_STREAM)
            # enable tcp keepalives with more aggresive values than default
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # the fine-grained keepalive options are not defined by the
            # socket module on every platform; apply the ones that exist
            # instead of failing every connection attempt
            keepalive_opts = (
                ("TCP_KEEPIDLE", self.tcp_ka_idle),
                ("TCP_KEEPINTVL", self.tcp_ka_intvl),
                ("TCP_KEEPCNT", self.tcp_ka_count),
            )
            for opt_name, opt_value in keepalive_opts:
                opt = getattr(socket, opt_name, None)
                if opt is not None:
                    self.socket.setsockopt(socket.SOL_TCP, opt, opt_value)
            # set timeout for socket operations
            self.socket.settimeout(self.timeout)
            # connect
            self.socket.connect(self.addr)
            # log and return
            msg = "Graphite: connected: {0}"
            self.log.info(msg.format(self.addr))
            return True
        except Exception as e:
            # deliberately broad: any failure here means "not connected",
            # is logged, and is retried by a later write
            msg = "Graphite: {0}: connect: excception: {1}"
            self.log.error(msg.format(self.addr, e))
            # release the half-open socket before pausing
            self.disconnect()
            # pause before reconnecting
            time.sleep(self.retry_time)
        return False

    def disconnect(self):
        """Sever the graphite connection.

        A failure while closing is logged; the socket reference is dropped
        either way.
        """
        if not self.socket:
            return
        try:
            self.socket.close()
        except Exception as e:
            msg = "Graphite: dicsonnect: exception {0}".format(e)
            self.log.error(msg)
        finally:
            self.socket = None

    def write(self, metrics):
        """Send the metrics string to graphite.

        :param metrics: the metrics string to send.
        :type metrics: str
        :returns: None when the data was handed to the socket, otherwise the
            unsent ``metrics`` value so the caller can queue it again.
        """
        if not self.socket:
            self.connect()
        if not self.socket:
            msg = "Graphite: write: connection failed"
            self.log.error(msg)
            return metrics  # resend them
        try:
            # sockets take bytes: text is encoded as utf8, values that are
            # already bytes (e.g. Python 2 str) are sent unchanged
            if isinstance(metrics, bytes):
                payload = metrics
            else:
                payload = metrics.encode('utf8')
            self.socket.sendall(payload)
        except Exception as e:
            msg = "Graphite: {0}: write: exception: {1}"
            self.log.error(msg.format(self.addr, e))
            # try to re-establish the connection for the next write
            self.connect()
            return metrics  # resend them
        return None
373
374
375
class GraphiteWriteUDP(object):
    """Graphite UDP writer.

    Offers the same ``write``/``disconnect``/``socket`` surface as the TCP
    writer so the two can be used interchangeably.
    """

    def __init__(self, log, config):
        """Create the UDP socket used for sending.

        :param log: logger used for error messages.
        :param config: configuration object providing ``get(name)`` for
            host and port.
        """
        self.log = log
        self.addr = (config.get('host'), config.get('port'))
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def disconnect(self):
        """Close the UDP socket; a later write opens a new one."""
        if not self.socket:
            return
        try:
            self.socket.close()
        except socket.error as e:
            msg = "Graphite: {0}: disconnect: exception: {1}"
            self.log.error(msg.format(self.addr, e))
        finally:
            self.socket = None

    def write_blocking(self, metrics):
        """Send the metrics to graphite and return number of bytes sent.

        :param metrics: the metrics string to send.
        :type metrics: str
        :rtype: int
        :raises: socket.error if the datagram cannot be sent
        """
        # recreate the socket if disconnect() dropped it
        if not self.socket:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # sockets take bytes: text is encoded as utf8, values that are
        # already bytes (e.g. Python 2 str) are sent unchanged
        if isinstance(metrics, bytes):
            payload = metrics
        else:
            payload = metrics.encode('utf8')
        return self.socket.sendto(payload, self.addr)

    def write(self, metrics):
        """Send the metrics to graphite.

        Follows the same convention as the TCP writer: a false return value
        means the data was sent, a returned string is the unsent payload.
        Returning the byte count here would be mistaken for unsent data;
        use :meth:`write_blocking` when the byte count is needed.

        :param metrics: the metrics string to send.
        :type metrics: str
        :returns: None when the datagram was sent, otherwise the unsent
            ``metrics`` value so the caller can queue it again.
        """
        try:
            self.write_blocking(metrics)
        except socket.error as e:
            msg = "Graphite: {0}: write: exception: {1}"
            self.log.error(msg.format(self.addr, e))
            return metrics  # resend them
        return None
407