Completed
Branch master (2f3d56)
by Kenny
01:12
created

GraphiteRenderPlain   A

Complexity

Total Complexity 20

Size/Duplication

Total Lines 119
Duplicated Lines 0 %

Importance

Changes 2
Bugs 1 Features 2
Metric Value
c 2
b 1
f 2
dl 0
loc 119
rs 10
wmc 20

3 Methods

Rating   Name   Duplication   Size   Complexity  
B __init__() 0 21 5
F process() 0 67 14
A flush() 0 7 1
1
 # -*- coding: utf-8 -*-
2
3
__author__ = 'Kenny Freeman'
4
__email__ = '[email protected]'
5
__license__ = "ISCL"
6
__docformat__ = 'reStructuredText'
7
8
import sys
9
10
# True when running under Python 3; the socket writers in this module use it
# to decide whether a metrics string must be encoded to bytes before sending.
PY3 = sys.version_info > (3,)
11
12
import time
13
import socket
14
15
import plumd
16
import plumd.plugins
17
18
19
def format_meta(meta, sep, meta_key, filter_chars):
    """Return a formatted metadata string from the Meta object.

    Every metadata entry is rendered either as ``<key><sep><value>`` (when
    ``meta_key`` is true) or as ``<value>`` alone, and the rendered entries
    are joined with ``.``. Characters listed in ``filter_chars`` are removed
    from keys and from string values. Boolean values are rendered as
    ``true``/``false``; any other value is rendered with ``str()``.

    :param meta: :class:`plumd.Meta` object that describes a metrics metadata.
    :type meta: plumd.Meta
    :param sep: separator character for metadata values (eg. =).
    :type sep: str
    :param meta_key: flag to include meta data key values or not
    :type meta_key: bool
    :param filter_chars: string value consisting of the characters to filter out
    :type filter_chars: str
    :rtype: str
    """
    if sys.version_info[0] >= 3:
        # Python 3: str.translate() takes a single table argument, so the
        # deletion table is built once up front.
        table = str.maketrans("", "", filter_chars)

        def strip(text):
            return text.translate(table)
    else:
        # Python 2: str.translate(None, deletechars) deletes the characters.
        def strip(text):
            return text.translate(None, filter_chars)

    # The original code reads ``meta.items`` as an attribute (presumably a
    # property on plumd.Meta - confirm); a callable ``items`` (eg. a dict) is
    # accepted as well.
    entries = meta.items
    if callable(entries):
        entries = entries()

    parts = []
    for mkey, mval in entries:
        mkey = strip(mkey)
        # mval is one of Int, Float, String, etc metric types
        # and so mval[1] is the actual value where mval[0] is the name of it
        mval = mval[1]
        if isinstance(mval, str):
            mval = strip(mval)
        elif isinstance(mval, bool):
            mval = "true" if mval else "false"
        if meta_key:
            # include the metadata key in the metric eg. .region=uswest.
            parts.append("{0}{1}{2}".format(mkey, sep, mval))
        else:
            # put the metadata value in the metric only eg. .uswest.
            # str() keeps numeric values from breaking the join below
            parts.append(str(mval))
    return ".".join(parts)
50
51
52
53
54
class GraphiteRenderPlain(plumd.Render):
    """Renders metrics in graphite plain text format, one line per metric:

    <prefix>[.<host meta>].<result name>.<metric name>[.<metric meta>] <value> <ts>

    The value is written with two decimal places and the timestamp is
    truncated to an integer. Host and metric metadata are rendered by
    :func:`format_meta`, either as ``key<sep>value`` pairs or as bare values
    depending on the ``meta_key`` setting. eg. with keys and a ``=``
    seperator:

    servers.env=test.region=test.cpu.idle 0.00 1452240380
    servers.env=test.region=test.disk.free.dev=sda1 1213234232.00 1452240380

    Rendered lines are collected in ``self.buff``; once adding a line would
    reach ``max_bytes`` the buffer is appended to ``self.metrics`` and a new
    buffer is started.

    NOTE(review): ``self.meta``, ``self.metrics`` and ``self.max_bytes`` are
    not assigned in this class - presumably plumd.Render provides them;
    confirm.

    :param rconfig: A Conf object initialized with the plugin configuration.
    :type rconfig: plumd.conf.Conf
    :raises: plumd.ConfigError
    """

    def __init__(self, rconfig):
        super(GraphiteRenderPlain, self).__init__(rconfig)
        # get the configured values from the plugin configuration
        self.prefix = rconfig.get("prefix")
        # partial batch of rendered metric lines and its total length
        self.buff = []
        self.buff_bytes = 0
        self.seperator = rconfig.get("seperator")
        self.meta_key = rconfig.get("meta_key")
        # The "filter_chars" setting lists the characters to KEEP; invert it
        # into the characters to delete so they can be stripped with
        # str.translate, which is cheaper than re.sub.
        keep_chars = rconfig.get("filter_chars")
        self.filter_chars = "".join(char for char in map(chr, range(256))
                                    if char not in keep_chars)
        # only the first character of the configured seperator is used
        if self.seperator and isinstance(self.seperator, str):
            self.seperator = self.seperator[0]

        # generate the host meta data string - assumes static host metadata
        self.hmeta_str = format_meta(self.meta, self.seperator, self.meta_key,
                                     self.filter_chars)

    def flush(self):
        """Return and clear the partial metric buffer, also clear queue.

        :rtype: str
        """
        metrics = self.buff
        self.buff = []
        self.buff_bytes = 0
        self.metrics.clear()
        return "".join(metrics)

    def process(self, results):
        """Process the metrics in the result set into the graphite plain text
        format.

        :param results: a generator that returns metrics in a standard format
        :type results: generator
        """
        hmeta_str = self.hmeta_str          # host metadata string, if any
        prefix = self.prefix                # metric prefix string
        meta_key = self.meta_key            # bool: include metadata in metric?
        sep = self.seperator                # meta data seperator character
        filter_chars = self.filter_chars    # remove these chars from strings
        # sys.maxint only exists on Python 2; fall back to sys.maxsize
        bool_true = getattr(sys, "maxint", sys.maxsize)

        # render each metric in the graphite plaintext format
        for (ts, rname, rmeta, metrics) in results:
            # metadata string for this result, if any (same rendering as the
            # host metadata)
            mmeta_str = format_meta(rmeta, sep, meta_key, filter_chars)

            # generate a metric string for each metric
            for metric in metrics:
                mclass = metric.__class__
                # silently ignore strings
                if mclass == plumd.String:
                    continue
                # booleans are either really big or 0
                mval = metric[1]
                if mclass == plumd.Boolean:
                    mval = bool_true if mval else 0

                # get the full metric string
                if hmeta_str:
                    args = [prefix, hmeta_str, rname, metric[0]]
                else:
                    args = [prefix, rname, metric[0]]
                # now combine the values
                if mmeta_str:
                    fmstr = "{0}.{1} {2:.2f} {3}\n".format(".".join(args),
                                                           mmeta_str,
                                                           mval, int(ts))
                else:
                    fmstr = "{0} {1:.2f} {2}\n".format(".".join(args), mval,
                                                        int(ts))
                fmstr_len = len(fmstr)

                # can we append to the existing buffer? compare against the
                # buffered size, not the number of buffered lines
                if self.buff_bytes + fmstr_len < self.max_bytes:
                    self.buff.append(fmstr)
                    self.buff_bytes += fmstr_len
                    continue

                # or do we need to queue and start a new buffer?
                # (never queue an empty batch)
                if self.buff:
                    self.metrics.append(self.buff)
                self.buff = [fmstr]
                self.buff_bytes = fmstr_len
173
174
175
class Graphite(plumd.plugins.Writer):
    """Writer plugin that renders result sets and sends them to graphite."""
    ## todo: move things to render, writers, remove from here
    ## also todo: print warning for queue size in render not in Graphite
    defaults = {
        'prefix':           "servers",
        'host':             '127.0.0.1',
        'port':             2003,
        'protocol':         "tcp",
        'format':           "plain", # plain or pickle
        'seperator':        "=",     # metadata seperator
        # any characters not in this value are removed from metrics/meta data
        'filter_chars':     'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-',
        'meta_key':         False,   # include metadata keys in metrics
        'max_bytes':        1400,    # send metrics in chunks of this many bytes
        'retry_time':       30,      # delay between connection attempts
        'timeout':          5,       # timeout for socket operations
        'maxqueue':         8192,    # maximum number of metrics to queue
        'warnqueue':        1024     # print warning if queue size > this
    }

    def __init__(self, log, config):
        """Set up the renderer and the transport named in the configuration.

        :param log: A logger
        :type log: logging.RootLogger
        :param config: a plumd.config.Conf configuration helper instance.
        :type config: plumd.config.Conf
        :raises: plumd.ConfigError if the format or protocol is not known
        """
        super(Graphite, self).__init__(log, config)
        config.defaults(Graphite.defaults)

        # choose the renderer class for the configured output format
        self.render = None
        fmt = config.get('format')
        renderers = {'plain': GraphiteRenderPlain}
        if fmt not in renderers:
            raise plumd.ConfigError("unkown format: {0}".format(fmt))
        self.render = renderers[fmt](self.config)

        # choose the transport class for the configured protocol
        proto = config.get('protocol')
        self.writer = None
        transports = {'tcp': GraphiteWriteTCP, 'udp': GraphiteWriteUDP}
        if proto not in transports:
            raise plumd.ConfigError("unkown protocol: {0}".format(proto))
        self.writer = transports[proto](self.log, config)

    def onstop(self):
        """Send whatever is still queued, then disconnect, on shutdown.

        Queued metrics are dropped instead when the writer has no socket or
        when sending takes longer than one second.
        """
        self.log.info("graphite: onstop: {0}: flushing metrics".format(
            self.config.get('name')))

        # without a connection there is nothing we can send: drop and leave
        if not self.writer.socket:
            self.log.error(
                "graphite: onstop: not connected, dropping all queued metrics")
            self.render.flush()
            return

        # send the queued batches, giving up once shutdown takes too long
        began = time.time()
        for batch in self.render:
            self.writer.write("".join(batch))
            elapsed = time.time() - began
            if elapsed > 1.0:
                self.render.flush()
                self.log.error(
                    "graphite: onstop: {0}: unable to send metrics, dropping"
                    .format(self.config.get('name')))
                self.writer.disconnect()
                return

        # finally send the partially filled buffer, if it holds anything
        leftover = self.render.flush()
        if leftover:
            self.writer.write("".join(leftover))
        self.writer.disconnect()
        self.log.info("graphite: onstop: {0}: done".format(
            self.config.get('name')))

    def push(self, rset):
        """Render a result set and send every batch the renderer yields.

        Sending stops at the first batch for which the writer returns a
        truthy value; that returned value is appended back onto the
        renderer's queue.

        :param rset: the result set to render; its ``results`` attribute is
            handed to the renderer. Falsy values are skipped.
        """
        # record this batch of metrics
        if rset:
            self.render.process(rset.results)

        # send batches until the writer hands one back
        unsent = None
        for batch in self.render:
            unsent = self.writer.write("".join(batch))
            if unsent:
                # do not continue if we're unable to send
                # this can cause significant delays on shutdown
                break

        # add metrics back onto queue if any
        if unsent:
            self.log.debug("graphite: write: requeuing metrics")
            self.render.metrics.append(unsent)
280
281
282
class GraphiteWriteTCP(object):
    """Sends metrics strings to graphite over a TCP connection."""

    def __init__(self, log, config):
        """Read the connection settings; no connection is made here.

        :param log: logger used for connection and send messages
        :param config: configuration object providing ``host``, ``port``,
            ``timeout`` and ``retry_time`` through ``get()``
        """
        self.log = log
        host = config.get('host')
        port = config.get('port')
        self.addr = (host, port)
        self.socket = None
        self.timeout = config.get('timeout')
        self.retry_time = config.get('retry_time')

    def connect(self):
        """Open a fresh connection to graphite, closing any existing one.

        On failure the error is logged, the call sleeps for ``retry_time``
        seconds and the socket is discarded.

        :rtype: bool
        :returns: True if the connection was made, False if it failed.
        """
        # never keep two connections around
        if self.socket:
            self.disconnect()
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.socket = sock
            # the timeout applies to connect and to later sends
            sock.settimeout(self.timeout)
            sock.connect(self.addr)
            self.log.info("graphite: connected: {0}".format(self.addr))
        except Exception as e:
            self.log.error(
                "graphite: {0}: connect: excception: {1}".format(self.addr, e))
            # pause before the caller gets a chance to reconnect
            time.sleep(self.retry_time)
            self.disconnect()
            return False
        else:
            return True

    def disconnect(self):
        """Close the graphite connection, if any, and forget the socket."""
        sock = self.socket
        if not sock:
            return
        try:
            sock.close()
        except Exception as e:
            self.log.error("graphite: dicsonnect: exception {0}".format(e))
        self.socket = None

    def write(self, metrics):
        """Send the metrics string to graphite.

        :param metrics: the metrics string to send.
        :type metrics: str
        :returns: None when the metrics were sent, otherwise the unsent
            metrics string so the caller can queue it again.
        """
        if not self.socket:
            self.connect()
            if not self.socket:
                self.log.error("graphite: write: connection failed")
                return metrics # resend them
        try:
            # Python 3 sockets only accept bytes
            payload = bytes(metrics, 'utf8') if PY3 else metrics
            self.socket.sendall(payload)
        except Exception as e:
            self.log.error(
                "graphite: {0}: write: exception: {1}".format(self.addr, e))
            # start over with a new connection for the next attempt
            self.connect()
            return metrics # resend them
        return None
360
361
362
class GraphiteWriteUDP(object):
    """Graphite UDP writer.

    ``write`` follows the same contract as the TCP writer's ``write`` (None
    on success, the unsent metrics on failure) because ``Graphite.push``
    treats any truthy return value as metrics that must be queued again.
    """

    def __init__(self, log, config):
        """Graphite UDP writer.

        :param log: logger used to report send errors
        :param config: configuration object providing ``host`` and ``port``
            through ``get()``
        """
        self.log = log
        self.addr = (config.get('host'), config.get('port'))
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    @staticmethod
    def _encode(metrics):
        """Return metrics as bytes, encoding text as UTF-8."""
        if isinstance(metrics, bytes):
            return metrics
        return metrics.encode('utf8')

    def disconnect(self):
        """Close the UDP socket, if any, and forget it.

        Present so the writer can be shut down the same way as the TCP
        writer (``Graphite.onstop`` calls ``disconnect()`` on its writer).
        """
        sock = self.socket
        if not sock:
            return
        try:
            sock.close()
        except Exception as e:
            self.log.error("graphite: dicsonnect: exception {0}".format(e))
        self.socket = None

    def write_blocking(self, metrics):
        """Send the metrics to graphite and return number of bytes sent.

        :param metrics: the metrics string to send.
        :type metrics: str
        :rtype: int
        """
        # recreate the socket if it was closed by disconnect()
        if self.socket is None:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        return self.socket.sendto(self._encode(metrics), self.addr)

    def write(self, metrics):
        """Send the metrics to graphite.

        :param metrics: the metrics string to send.
        :type metrics: str
        :returns: None when the datagram was handed to the socket, otherwise
            the unsent metrics string so the caller can queue it again.
        """
        try:
            self.write_blocking(metrics)
        except socket.error as e:
            msg = "graphite: {0}: write: exception: {1}"
            self.log.error(msg.format(self.addr, e))
            return metrics # resend them
        return None
395