Completed
Push — master ( 3279ec...8d12dc )
by Kenny
01:09
created

GraphiteRenderPlain.flush()   A

Complexity

Conditions 1

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 1
dl 0
loc 7
rs 9.4285
c 1
b 0
f 1
1
 # -*- coding: utf-8 -*-
2
3
__author__ = 'Kenny Freeman'
4
__email__ = '[email protected]'
5
__license__ = "ISCL"
6
__docformat__ = 'reStructuredText'
7
8
import sys
9
import time
10
import struct
11
import socket
12
13
import plumd
14
import plumd.util
15
import plumd.plugins
16
17
18
def format_meta(meta, sep, meta_key):
    """Return a formatted metadata string from the Meta object.

    Every entry yielded by ``meta.items`` is rendered either as
    ``<key><sep><value>`` (when ``meta_key`` is true) or as just ``<value>``
    and the rendered entries are joined with ``.``.

    Value handling:

    * booleans become the literal strings ``true`` / ``false``
    * ``str`` values are passed through :func:`plumd.util.replace_chars`
    * anything else (eg. ints, floats) is converted with ``str()`` so that
      the final ``join`` cannot fail on non-string metadata

    :param meta: :class:`plumd.Meta` object that describes a metrics metadata.
    :type meta: plumd.Meta
    :param sep: separator character for metadata values (eg. =).
    :type sep: str
    :param meta_key: flag to include meta data key values or not
    :type meta_key: bool
    :rtype: str
    """
    # type(u"") is ``unicode`` on Python 2 and ``str`` on Python 3; unicode
    # values are left untouched so Python 2 behaviour does not change.
    text_type = type(u"")

    rendered = []
    for mkey, entry in meta.items:
        # eg. nic, metric(name='nic', value='eth0') - index 1 is the value
        mval = entry[1]
        if isinstance(mval, bool):
            mval = "true" if mval else "false"
        elif isinstance(mval, str):
            mval = plumd.util.replace_chars(mval)
        elif not isinstance(mval, text_type):
            # numeric (or other) values must be strings before joining
            mval = str(mval)

        if meta_key:
            # the key is only sanitised when it is actually emitted
            mkey = plumd.util.replace_chars(mkey)
            rendered.append("{0}{1}{2}".format(mkey, sep, mval))
        else:
            rendered.append(mval)
    return ".".join(rendered)
44
45
46
def get_metric_str(prefix, hmeta_str, mmeta_str, metric, rname):
    """Build the dotted graphite path for a single metric.

    The path is assembled, in order, from: the prefix, the host metadata
    string (only if non-empty), the result set name, the metric name
    (``metric[0]``) and finally the metric metadata string (only if
    non-empty).

    :param prefix: the metric prefix to use (eg. servers)
    :type prefix: str
    :param hmeta_str: the host metadata string.
    :type hmeta_str: str
    :param mmeta_str: the metric metadata string.
    :type mmeta_str: str
    :param metric: the metric.
    :type metric: plumd.Integer or plumd.Float or plumd.String
    :param rname: the result set name (eg. cpu).
    :type rname: str
    :rtype: str
    """
    path = [prefix]
    # host metadata sits between the prefix and the result set name
    if hmeta_str:
        path.append(hmeta_str)
    path.extend((rname, metric[0]))
    # metric metadata always comes last
    if mmeta_str:
        path.append(mmeta_str)
    return ".".join(path)
70
71
72
class GraphiteRenderPlain(plumd.Render):
    """Renders metrics in graphite plain text format:

    <prefix>.<hmeta>.<measurement>.<metric>.<mmeta> <value> <ts>

    where <hmeta> and <mmeta> are the host and metric metadata strings
    produced by :func:`format_meta` (omitted when empty) and <value> is
    written with ``{:f}`` formatting. With the ``meta_key`` option enabled
    and a ``=`` separator a line looks like:

    servers.env=test.region=test.cpu.idle 0.000000 1452240380
    servers.env=test.region=test.disk.free.dev=sda1 1213234232.000000 1452240380

    With ``meta_key`` disabled only the metadata values are included:

    servers.test.test.cpu.idle 0.000000 1452240380

    String metrics are skipped; boolean metrics are sent as a very large
    integer (true) or 0 (false).

    NOTE(review): ``self.meta``, ``self.max_bytes`` and ``self.metrics`` are
    not set in this class - presumably provided by :class:`plumd.Render`.

    :param rconfig: A Conf object initialized with the plugin configuration.
    :type rconfig: plumd.conf.Conf
    :raises: plumd.ConfigError
    """

    def __init__(self, rconfig):
        super(GraphiteRenderPlain, self).__init__(rconfig)
        # get the configured values from the plugin configuration
        self.prefix = rconfig.get("prefix")
        # partially filled buffer of rendered lines; once it would reach
        # max_bytes it is moved onto self.metrics and a new one is started
        self.buff = ""
        self.buff_bytes = 0
        self.seperator = rconfig.get("seperator")
        self.meta_key = rconfig.get("meta_key")
        # only the first character of a configured separator is used
        if self.seperator and isinstance(self.seperator, str):
            self.seperator = self.seperator[0]

        # generate the host meta data string - assumes static host metadata
        self.hmeta_str = format_meta(self.meta, self.seperator, self.meta_key)

    def flush(self):
        """Return and clear the partial metric buffer, also clear queue.

        :rtype: str
        """
        metrics = self.buff
        self.buff = ""
        self.buff_bytes = 0
        self.metrics.clear()
        return metrics

    def process(self, results):
        """Process the metrics in the result set into the graphite plain text
        format.

        Rendered lines are appended to the current buffer; when adding a line
        would bring the buffer to ``max_bytes`` or more, the current buffer is
        queued on ``self.metrics`` and the line starts a new buffer.

        :param results: a generator that returns metrics in a standard format
        :type results: generator
        """
        # sys.maxint was removed in Python 3 - fall back to sys.maxsize there
        true_value = getattr(sys, "maxint", sys.maxsize)

        # render each metric in the graphite plaintext format
        for (ts, rname, rmeta, metrics) in results:
            # get the metadata for this metric
            mmeta_str = format_meta(rmeta, self.seperator, self.meta_key)

            # generate a metric string for each metric
            for metric in metrics:
                # silently ignore strings
                if metric.__class__ == plumd.String:
                    continue

                # booleans are either really big or 0
                mval = metric[1]
                if metric.__class__ == plumd.Boolean:
                    mval = true_value if mval else 0

                # get the full metric string
                met_str = get_metric_str(self.prefix, self.hmeta_str,
                                         mmeta_str, metric, rname)

                # now combine the values
                fmstr = "{} {:f} {}\n".format(met_str, mval, int(ts))
                fmstr_len = len(fmstr)

                # can we append to the existing buffer?
                if len(self.buff) + fmstr_len < self.max_bytes:
                    self.buff += fmstr
                    self.buff_bytes += fmstr_len
                    continue

                # or do we need to queue and start a new buffer? An empty
                # buffer (single line >= max_bytes) is never queued since
                # there would be nothing to send.
                if self.buff:
                    self.metrics.append(self.buff)
                self.buff = fmstr
                self.buff_bytes = fmstr_len
158
159
160
class Graphite(plumd.plugins.Writer):
    """Graphite sender.

    Combines a renderer (selected by the ``format`` option) that turns result
    sets into buffers of graphite lines with a writer (selected by the
    ``protocol`` option) that sends those buffers to graphite.
    """
    ## todo: move things to render, writers, remove from here
    ## also todo: print warning for queue size in render not in Graphite
    defaults = {
        'prefix':           "servers",
        'host':             '127.0.0.1',
        'port':             2003,
        'protocol':         "tcp",
        'format':           "plain", # plain or pickle
        'seperator':        "=",     # metadata seperator
        'meta_key':         False,   # include metadata keys in metrics
        'batch':            64,      # send batches of this size
        'retry_time':       30,      # delay between connection attempts
        'timeout':          5,       # timeout for socket operations
        'maxqueue':         8192,    # maximum number of metrics to queue
        'warnqueue':        1024     # print warning if queue size > this
    }

    def __init__(self, log, config):
        """Graphite sender. WIP

        :param log: A logger
        :type log: logging.RootLogger
        :param config: a plumd.config.Conf configuration helper instance.
        :type config: plumd.config.Conf
        :raises: plumd.ConfigError if the configured format or protocol is
            not supported.
        """
        super(Graphite, self).__init__(log, config)
        config.defaults(Graphite.defaults)
        self.render = None
        self.writer = None

        # pick the renderer for the configured format
        fmts = {
            'plain': GraphiteRenderPlain
        }
        fmt = config.get('format')
        if fmt not in fmts:
            raise plumd.ConfigError("unkown format: {0}".format(fmt))
        self.render = fmts[fmt](self.config)

        # pick the writer for the configured protocol; each writer reads
        # host/port from the config itself
        writers = {
            'tcp': GraphiteWriteTCP,
            'udp': GraphiteWriteUDP
        }
        proto = config.get('protocol')
        if proto not in writers:
            raise plumd.ConfigError("unkown protocol: {0}".format(proto))
        self.writer = writers[proto](self.log, config)

    def onstop(self):
        """Flush remaining metrics and disconnect socket on shutdown."""
        # if we're not connected then drop all metrics and return
        if not self.writer.socket:
            self.render.flush()
            msg = "graphite: onstop: not connected, dropping all queued metrics"
            self.log.error(msg)
            return

        # ensure we don't take too long trying to flush metrics on shutdown:
        # give up once sending queued buffers has taken more than a second
        start = time.time()
        for mbuff in self.render:
            self.writer.write(mbuff)
            if time.time() - start > 1.0:
                self.render.flush()
                msg = "graphite: onstop: unable to send metrics, dropping"
                self.log.error(msg)
                self.writer.disconnect()
                return

        # finally send whatever is left in the partially filled buffer
        mbuff = self.render.flush()
        if mbuff:
            self.writer.write(mbuff)
        self.writer.disconnect()

    def push(self, rset):
        """Render the result set and send all queued metric buffers.

        The results of ``rset`` (if any) are handed to the renderer, then
        every buffer obtained by iterating the renderer is passed to the
        writer. Sending stops at the first buffer the writer hands back, and
        that buffer is appended to the renderer queue again.

        :param rset: the result set to render; a falsy value renders nothing
            but queued buffers are still sent. Its ``results`` attribute is
            passed to the renderer.
        """
        # record this batch of metrics
        if rset:
            self.render.process(rset.results)

        # send each queued buffer - iterating the renderer presumably removes
        # the buffers from its queue (TODO confirm in plumd.Render)
        resend = None
        for mbuff in self.render:
            metrics = self.writer.write(mbuff)
            if metrics:
                resend = metrics
                # do not continue if we're unable to send
                # this can cause significant delays on shutdown
                break

        # add metrics back onto queue if any
        if resend:
            self.log.debug("graphite: write: requeuing metrics")
            self.render.metrics.append(resend)
261
262
263
class GraphiteWriteTCP(object):
    """Graphite TCP writer."""

    def __init__(self, log, config):
        """Graphite TCP writer.

        :param log: A logger
        :param config: configuration object; ``host``, ``port``, ``timeout``
            and ``retry_time`` are read from it via ``config.get``.
        """
        self.log = log
        self.addr = (config.get('host'), config.get('port'))
        self.socket = None
        self.timeout = config.get('timeout')
        self.retry_time = config.get('retry_time')

    def connect(self):
        """Connect to graphite and maintain a connection. Returns True if
        the connection was made or False if it failed.

        On failure the socket is released and the call then sleeps for
        ``retry_time`` seconds before returning.

        :rtype: bool
        """
        # disconnect if we're already connected
        if self.socket:
            self.disconnect()
        try:
            # create the socket
            self.socket = socket.socket(socket.AF_INET,
                                        socket.SOCK_STREAM)
            # set timeout for socket operations
            self.socket.settimeout(self.timeout)
            # connect
            self.socket.connect(self.addr)
            # log and return
            msg = "graphite: connected: {0}"
            self.log.info(msg.format(self.addr))
            return True
        except Exception as e:
            # log exception, ensure cleanup is done (disconnect)
            msg = "graphite: {0}: connect: excception: {1}"
            self.log.error(msg.format(self.addr, e))
            # release the failed socket first so it is not held (and not
            # seen as connected) while we pause before the next attempt
            self.disconnect()
            time.sleep(self.retry_time)
        return False

    def disconnect(self):
        """Sever the graphite connection."""
        if not self.socket:
            return
        try:
            self.socket.close()
        except Exception as e:
            msg = "graphite: dicsonnect: exception {0}".format(e)
            self.log.error(msg)
        finally:
            # the socket is forgotten whether or not close() succeeded
            self.socket = None

    def write(self, metrics):
        """Send the metrics string to graphite.

        Connects first if there is no socket. Returns None when the metrics
        were sent, or the unsent metrics when the connection or the send
        failed so that the caller can queue them again.

        :param metrics: the metrics string to send.
        :type metrics: str
        :rtype: str or None
        """
        if not self.socket:
            self.connect()
        if not self.socket:
            msg = "graphite: write: connection failed"
            self.log.error(msg)
            return metrics # resend them

        # sockets need bytes: encode text (a no-op for Python 2 str)
        payload = metrics
        if not isinstance(payload, bytes):
            payload = payload.encode("utf-8")

        try:
            self.socket.sendall(payload)
        except Exception as e:
            msg = "graphite: {0}: write: exception: {1}"
            self.log.error(msg.format(self.addr, e))
            self.connect()
            return metrics # resend them
        return None
338
339
340
class GraphiteWriteUDP(object):
    """Graphite UDP writer.

    Offers the same ``socket`` attribute, ``write`` and ``disconnect``
    members as :class:`GraphiteWriteTCP` so either writer can be used by
    :class:`Graphite`.
    """

    def __init__(self, log, config):
        """Graphite UDP writer.

        :param log: A logger
        :param config: configuration object; ``host`` and ``port`` are read
            from it via ``config.get``.
        """
        self.log = log
        self.addr = (config.get('host'), config.get('port'))
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def _sendto(self, metrics):
        """Send metrics as one datagram and return the number of bytes sent."""
        # a new socket is needed if disconnect() was called earlier
        if not self.socket:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # sockets need bytes: encode text (a no-op for Python 2 str)
        payload = metrics
        if not isinstance(payload, bytes):
            payload = payload.encode("utf-8")
        return self.socket.sendto(payload, self.addr)

    def disconnect(self):
        """Close the UDP socket, if any."""
        if not self.socket:
            return
        try:
            self.socket.close()
        except Exception as e:
            msg = "graphite: dicsonnect: exception {0}".format(e)
            self.log.error(msg)
        finally:
            self.socket = None

    def write_blocking(self, metrics):
        """Send the metrics to graphite and return number of bytes sent.

        :param metrics: the metrics string to send.
        :type metrics: str
        :rtype: int
        """
        return self._sendto(metrics)

    def write(self, metrics):
        """Send the metrics to graphite.

        Follows the same contract as :meth:`GraphiteWriteTCP.write`: None is
        returned when the metrics were sent and the unsent metrics are
        returned on a socket error, because :meth:`Graphite.push` queues any
        truthy return value for resending. Use :meth:`write_blocking` when
        the number of bytes sent is needed.

        :param metrics: the metrics string to send.
        :type metrics: str
        :rtype: str or None
        """
        try:
            self._sendto(metrics)
        except socket.error as e:
            msg = "graphite: {0}: write: exception: {1}"
            self.log.error(msg.format(self.addr, e))
            return metrics # resend them
        return None
368