Completed
Push — master ( f161ca...d2dc5c )
by Kenny
01:12
created

GraphiteWriteTCP.write()   B

Complexity

Conditions 5

Size

Total Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 2
Metric Value
cc 5
dl 0
loc 23
rs 8.2508
c 2
b 0
f 2
1
 # -*- coding: utf-8 -*-
2
3
__author__ = 'Kenny Freeman'
4
__email__ = '[email protected]'
5
__license__ = "ISCL"
6
__docformat__ = 'reStructuredText'
7
8
import re
9
import sys
10
11
PY3 = sys.version_info > (3,)
12
13
import time
14
import socket
15
16
import plumd
17
import plumd.plugins
18
19
20
def format_meta(meta, sep, meta_key):
21
    """Return a formatted metadata string from the Meta object.
22
23
    :param meta: :class:`plumd.Meta` object that describes a metrics metadata.
24
    :type meta: plumd.Meta
25
    :param sep: seperator character for metadata values (eg. =).
26
    :type sep: str
27
    :param meta_key: flag to include meta data key values or not
28
    :type meta_key: bool
29
    :rtype: str
30
    """
31
    # generate metadata string
32
    mmeta_dat = []
33
    for mkey, mval in meta.items:
34
        # eg. nic, metric(name='nic', value='eth0')
35
        mkey = re.sub('[^0-9a-zA-Z_\-]+', '', mkey)[0]
36
        mval = mval[1]
37
        if isinstance(mval, str):
38
            mkey = re.sub('[^0-9a-zA-Z_\-]+', '', mkey)[0]
39
        elif isinstance(mval, bool):
40
            mval = "true" if mval else "false"
41
        if meta_key:
42
            mmeta_dat.append("{0}{1}{2}".format(mkey, sep, mval))
43
        else:
44
            mmeta_dat.append(mval)
45
    return ".".join(mmeta_dat)
46
47
48
def get_metric_str(prefix, hmeta_str, mmeta_str, metric, rname):
49
    """Return a string representation of a metric.
50
51
    :param prefix: the metric prefix to use (eg. servers)
52
    :type prefix: str
53
    :param hmeta_str: the host metadata string.
54
    :type hmeta_str: str
55
    :param mmeta_str: the metric metadata string.
56
    :type mmeta_str: str
57
    :param metric: the metric.
58
    :type metric: plumd.Integer or plumd.Float or plumd.String
59
    :param rname: the result set name (eg. cpu).
60
    :type rname: str
61
    :rtype: str
62
    """
63
    args = []
64
    # both host and metric metadata exists
65
    if hmeta_str:
66
        args = [ prefix, hmeta_str, rname, metric[0] ]
67
    else:
68
        args = [ prefix, rname, metric[0] ]
69
    if mmeta_str:
70
        args.append(mmeta_str)
71
    return ".".join(args)
72
73
74
class GraphiteRenderPlain(plumd.Render):
75
    """Renders metrics in graphite plain text format:
76
77
    <prefix>.<hmeta=hval>.<measurement>.<metric>.<mmeta=mval>.value <value> <ts>
78
79
    where host and metric metadata keys are sorted before being added to the
80
    metric name and the seperator character is configurable. eg.
81
82
    servers.env=test.region=test.cpu.idle.value 0.0 1452240380
83
    servers.env=test.region=test.disk.free.dev=sda1.value 1213234232 1452240380
84
85
    or with a . seperator character:
86
87
    servers.env.test.region.test.cpu.idle.value 0.0 1452240380
88
    servers.env.test.region.test.disk.free.dev.sda1.value 1213234232 1452240380
89
90
    :param rconfig: A Conf object initialized with the plugin configuration.
91
    :type rconfig: plumd.conf.Conf
92
    :raises: plumd.ConfigError
93
    """
94
    def __init__(self, rconfig):
95
        super(GraphiteRenderPlain, self).__init__(rconfig)
96
        # get the configured values from the plugin configuration
97
        self.prefix = rconfig.get("prefix")
98
        # record rendered metrics here, when bytes > max_bytes put into deque
99
        self.buff = ""
100
        self.buff_bytes = 0
101
        self.seperator = rconfig.get("seperator")
102
        self.meta_key = rconfig.get("meta_key")
103
        if self.seperator and isinstance(self.seperator, str):
104
            self.seperator = self.seperator[0]
105
106
        # generate the host meta data string - assumes static host metadata
107
        self.hmeta_str = format_meta(self.meta, self.seperator, self.meta_key)
108
109
110
    def flush(self):
111
        """Return and clear the partial metric buffer, also clear queue."""
112
        metrics = self.buff
113
        self.buff = ""
114
        self.buff_bytes = 0
115
        self.metrics.clear()
116
        return metrics
117
118
119
    def process(self, results):
120
        """Process the metrics in the result set into the graphite plain text
121
        format.
122
123
        :param results: a generator that returns metrics in a standard format
124
        :type results: generator
125
        """
126
        hmeta_str = self.hmeta_str
127
        prefix = self.prefix
128
        # render each metric in the graphite plaintext format
129
        for (ts, rname, rmeta, metrics) in results:
130
            # get the metadata for this metric
131
            mmeta_str = format_meta(rmeta, self.seperator, self.meta_key)
132
133
            # generate a metric string for each metric
134
            for metric in metrics:
135
                # silently ignore strings
136
                if metric.__class__ == plumd.String:
137
                    continue
138
139
                # booleans are either really big or 0
140
                mval = metric[1]
141
                if metric.__class__ == plumd.Boolean:
142
                    mval = sys.maxint if mval else 0
143
144
                # get the full metric string
145
                # adding this here instead of a function call to reduce cost
146
                if hmeta_str:
147
                    args = [ prefix, hmeta_str, rname, metric[0] ]
148
                else:
149
                    args = [ prefix, rname, metric[0] ]
150
                if mmeta_str:
151
                    args.append(mmeta_str)
152
                met_str = ".".join(args)
153
                #met_str = get_metric_str(self.prefix, self.hmeta_str,
154
                #                         mmeta_str, metric, rname)
155
156
                # now combine the values
157
                fmstr = "{} {:f} {}\n".format(met_str, mval, int(ts))
158
                fmstr_len = len(fmstr)
159
160
                # can we append to the existing buffer?
161
                if len(self.buff) + fmstr_len < self.max_bytes:
162
                    # todo: try using a list/deque here (or something else)
163
                    # as += is a tad expensive afaik
164
                    self.buff += fmstr
165
                    self.buff_bytes += fmstr_len
166
                    continue
167
168
                # or do we need to queue and start a new buffer?
169
                self.metrics.append(self.buff)
170
                self.buff = fmstr
171
                self.buff_bytes = fmstr_len
172
173
174
class Graphite(plumd.plugins.Writer):
175
    """Graphite sender."""
176
    ## todo: move things to render, writers, remove from here
177
    ## also todo: print warning for queue size in render not in Graphite
178
    defaults = {
179
        'prefix':           "servers",
180
        'host':             '127.0.0.1',
181
        'port':             2003,
182
        'protocol':         "tcp",
183
        'format':           "plain", # plain or pickle
184
        'seperator':        "=",     # metadata seperator
185
        'meta_key':         False,   # include metadata keys in metrics
186
        'batch':            64,      # send batches of this size
187
        'retry_time':       30,      # delay between connection attempts
188
        'timeout':          5,       # timeout for socket operations
189
        'maxqueue':         8192,    # maximum number of metrics to queue
190
        'warnqueue':        1024     # print warning if queue size > this
191
    }
192
193
    def __init__(self, log, config):
194
        """Graphite sender.
195
196
        :param log: A logger
197
        :type log: logging.RootLogger
198
        :param config: a plumd.config.Conf configuration helper instance.
199
        :type config: plumd.config.Conf
200
        """
201
        super(Graphite, self).__init__(log, config)
202
        config.defaults(Graphite.defaults)
203
        self.render = None
204
        fmt = config.get('format')
205
        fmts = {
206
            'plain': GraphiteRenderPlain
207
        }
208
        if fmt in fmts:
209
            self.render = fmts[fmt](self.config)
210
        else:
211
            raise plumd.ConfigError("unkown format: {0}".format(fmt))
212
        proto = config.get('protocol')
213
        self.writer = None
214
        writers = {
215
            'tcp': GraphiteWriteTCP,
216
            'udp': GraphiteWriteUDP
217
        }
218
        if proto in writers:
219
            self.writer = writers[proto](self.log, config)
220
        else:
221
            raise plumd.ConfigError("unkown protocol: {0}".format(proto))
222
223
224
    def onstop(self):
225
        """Flush remaining metrics and disconnect socket on shutdown."""
226
        msg = "graphite: onstop: {0}: flushing metrics"
227
        self.log.info(msg.format(self.config.get('name')))
228
        # if we're not connected then drop all metrics and return
229
        if not self.writer.socket:
230
            msg = "graphite: onstop: not connected, dropping all queued metrics"
231
            self.log.error(msg)
232
            self.render.flush()
233
            return
234
235
        # ensure we don't take too long trying to flush metrics on shutdown
236
        start = time.time()
237
        for mbuff in self.render:
238
            self.writer.write(mbuff)
239
            if time.time() - start > 1.0:
240
                self.render.flush()
241
                msg = "graphite: onstop: {0}: unable to send metrics, dropping"
242
                self.log.error(msg.format(self.config.get('name')))
243
                self.writer.disconnect()
244
                return
245
        mbuff = self.render.flush()
246
        if mbuff:
247
            self.writer.write(mbuff)
248
        self.writer.disconnect()
249
        msg = "graphite: onstop: {0}: done"
250
        self.log.info(msg.format(self.config.get('name')))
251
252
253
    def push(self, rset):
254
        """Render metrics onto our deque and send all full batches.
255
256
        :param metrics: The list of pre-formated metrics from our renderer.
257
        :type metrics: list
258
        """
259
        # record this batch of metrics
260
        if rset:
261
            self.render.process(rset.results)
262
263
        # get the next full batch of metrics to send - next_batch pop_left's
264
        resend = None
265
        for mbuff in self.render:
266
            metrics = self.writer.write(mbuff)
267
            if metrics:
268
                resend = metrics
269
                # do not continue if we're unable to send
270
                # this can cause significant delays on shutdown
271
                break
272
273
        # add metrics back onto queue if any
274
        if resend:
275
            self.log.debug("graphite: write: requeuing metrics")
276
            self.render.metrics.append(resend)
277
278
279
class GraphiteWriteTCP(object):
280
    """Graphite TCP writer."""
281
282
    def __init__(self, log, config):
283
        """Graphite TCP writer."""
284
        self.log = log
285
        self.addr = (config.get('host'), config.get('port'))
286
        self.socket = None
287
        self.timeout = config.get('timeout')
288
        self.retry_time = config.get('retry_time')
289
290
291
    def connect(self):
292
        """Connect to graphite and maintain a connection. Returns True if
293
        the connection was made or False if it failed.
294
295
        :rtype: bool
296
        """
297
        # disconnect if we're already connected
298
        if self.socket:
299
            self.disconnect()
300
        try:
301
            # create the socket
302
            self.socket = socket.socket(socket.AF_INET,
303
                                        socket.SOCK_STREAM)
304
            # set timeout for socket operations
305
            self.socket.settimeout(self.timeout)
306
            # connect
307
            self.socket.connect(self.addr)
308
            # log and return
309
            msg = "graphite: connected: {0}"
310
            self.log.info(msg.format(self.addr))
311
            return True
312
        except Exception as e:
313
            # log exception, ensure cleanup is done (disconnect)
314
            msg = "graphite: {0}: connect: excception: {1}"
315
            self.log.error(msg.format(self.addr, e))
316
            # pause before reconnecting
317
            time.sleep(self.retry_time)
318
            self.disconnect()
319
        return False
320
321
322
    def disconnect(self):
323
        """Severe the graphite connection."""
324
        if self.socket:
325
            try:
326
                self.socket.close()
327
                self.socket = None
328
            except Exception as e:
329
                msg = "graphite: dicsonnect: exception {0}".format(e)
330
                self.log.error(msg)
331
                self.socket = None
332
333
334
    def write(self, metrics):
335
        """Send the metrics string to graphite.
336
337
        :param metrics: the metrics string to send.
338
        :type metrics: str
339
        :rtype: bool
340
        """
341
        if not self.socket:
342
            self.connect()
343
        if not self.socket:
344
            msg = "graphite: write: connection failed"
345
            self.log.error(msg)
346
            return metrics # resend them
347
        try:
348
            if PY3:
349
                self.socket.sendall(bytes(metrics, 'utf8'))
350
            else:
351
                self.socket.sendall(metrics)
352
        except Exception as e:
353
            msg = "graphite: {0}: write: exception: {1}"
354
            self.log.error(msg.format(self.addr, e))
355
            self.connect()
356
            return metrics # resend them
357
358
359
class GraphiteWriteUDP(object):
360
    """Graphite UDP writer."""
361
362
    def __init__(self, log, config):
363
        """Graphite UDP writer."""
364
        self.log = log
365
        self.addr = (config.get('host'), config.get('port'))
366
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
367
368
369
    def write_blocking(self, metrics):
370
        """Send the metrics to graphite and return number of bytes sent.
371
372
        :param metrics: the metrics string to send.
373
        :type metrics: str
374
        :rtype: int
375
        """
376
        if PY3:
377
            return self.socket.sendto(bytes(metrics, 'utf8'), self.addr)
378
        else:
379
            return self.socket.sendto(metrics, self.addr)
380
381
    def write(self, metrics):
382
        """Send the metrics to graphite and return number of bytes sent.
383
384
        :param metrics: the metrics string to send.
385
        :type metrics: str
386
        :rtype: int
387
        """
388
        if PY3:
389
            return self.socket.sendto(bytes(metrics, 'utf8'), self.addr)
390
        else:
391
            return self.socket.sendto(metrics, self.addr)
392