Completed
Push — master ( d058c9...ca4b9c )
by Kenny
45s
created

plumd.plugins.writers.Graphite.push()   A

Complexity

Conditions 3

Size

Total Lines 17

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 3
dl 0
loc 17
rs 9.4285
1
# -*- coding: utf-8 -*-
2
3
__author__ = 'Kenny Freeman'
4
__email__ = '[email protected]'
5
__license__ = "ISCL"
6
__docformat__ = 'reStructuredText'
7
8
import pickle
import socket
import struct
import sys
import time

import plumd
import plumd.plugins
import plumd
15
16
17
class GraphiteRender(plumd.Render):
    """Renders metrics in graphite plain text or pickle format. Plaintext is:

    <prefix>.<hmeta=hval>.<measurement>.<metric>.<mmeta=mval>.value <value> <ts>

    where host and metric metadata are rendered as key/value pairs, in the
    order the metadata objects yield them, and the seperator character placed
    between each key and value is configurable. eg.

    servers.env=test.region=test.cpu.idle.value 0.0 1452240380
    servers.env=test.region=test.disk.free.dev=sda1.value 1213234232 1452240380

    or with a . seperator character:

    servers.env.test.region.test.cpu.idle.value 0.0 1452240380
    servers.env.test.region.test.disk.free.dev.sda1.value 1213234232 1452240380

    the pickle format uses tuples:

    (<metric>, (<timestamp>, <value>))

    and sends pickled lists of tuples.

    :param rconfig: A Conf object initialized with the plugin configuration.
    :type rconfig: plumd.conf.Conf
    :raises plumd.ConfigError: if the configured format is not plain or pickle
    """

    def __init__(self, rconfig):
        super(GraphiteRender, self).__init__(rconfig)
        # get the configured values from the plugin configuration
        self.prefix = rconfig.get("prefix")
        self.format = rconfig.get("format").lower()
        if self.format not in ('plain', 'pickle'):
            err = "unknown graphite format: {0}".format(self.format)
            raise plumd.ConfigError(err)
        # only the first character of a string seperator is used
        self.seperator = rconfig.get("seperator")
        if self.seperator and isinstance(self.seperator, str):
            self.seperator = self.seperator[0]

        # generate the host meta data string once - assumes static host
        # metadata (self.meta is expected to be provided by plumd.Render)
        self.hmeta_str = self._graphite_meta(self.meta.items, self.seperator)

    @staticmethod
    def _graphite_escape(text):
        """Replace characters that would split or break a graphite path.

        :param text: the string to escape
        :type text: str
        :rtype: str
        """
        return text.replace(" ", "_").replace(".", "_").replace("\n", "_")

    @classmethod
    def _graphite_meta(cls, items, sep):
        """Render (key, value) metadata pairs as a dotted graphite path.

        :param items: an iterable of (key, value) pairs
        :type items: iterable
        :param sep: the string placed between each key and its value
        :type sep: str
        :rtype: str
        """
        pairs = []
        for key, val in items:
            key = cls._graphite_escape(key)
            # bool is tested first so True/False never render as 1/0 or as
            # "True"/"False"
            if isinstance(val, bool):
                val = "true" if val else "false"
            else:
                # escape every rendered value, not just str instances: a
                # float such as 1.5 would otherwise add a path segment
                val = cls._graphite_escape("{0}".format(val))
            pairs.append("{0}{1}{2}".format(key, sep, val))
        return ".".join(pairs)

    def process(self, results):
        """Process the metrics in the result set into the configured graphite
        format and append them to self.metrics.

        Plain format appends "<path> <value> <timestamp>" strings, pickle
        format appends (<path>, (<timestamp>, <value>)) tuples. String
        metrics are silently skipped and Boolean metrics are sent as either a
        very large integer (true) or 0 (false).

        :param results: an iterable of (ts, name, meta, metrics) tuples where
            ts exposes a timestamp attribute and meta exposes an items
            attribute of (key, value) pairs
        :type results: generator
        """
        sep = self.seperator       # metadata key/value seperator character
        fmt = self.format          # format to render metrics to
        # sys.maxint only exists on python 2, fall back to sys.maxsize
        bool_true = getattr(sys, "maxint", sys.maxsize)

        # the leading path components are the same for every metric
        head = [self.prefix]
        if self.hmeta_str:
            head.append(self.hmeta_str)

        # (self._time, robj.name, robj.meta, robj.metrics)
        for (ts, rname, rmeta, metrics) in results:
            mmeta_str = self._graphite_meta(rmeta.items, sep)
            stamp = ts.timestamp

            for metric in metrics:
                # silently ignore strings
                if metric.__class__ == plumd.String:
                    continue

                mname = metric[0]
                mval = metric[1]

                # for now at least, booleans are either really big or 0
                if metric.__class__ == plumd.Boolean:
                    mval = bool_true if mval else 0

                # <pfx>.<hmet=val>.<meas>.<met>.<mmet=val>.value
                # the measurement (result name) comes before the metric name
                # as documented on the class; empty metadata is left out
                parts = head + [rname, mname]
                if mmeta_str:
                    parts.append(mmeta_str)
                parts.append("value")
                met_str = ".".join("{0}".format(part) for part in parts)

                # add the rendered metric to our deque of metrics
                if fmt == "plain":
                    line = "{0} {1} {2}".format(met_str, mval, stamp)
                    self.metrics.append(line)
                elif fmt == "pickle":
                    self.metrics.append((met_str, (stamp, mval)))
147
148
149
class Graphite(plumd.plugins.Writer):
    """Graphite sender.

    Renders pushed result sets with GraphiteRender and sends them to the
    configured host/port in batches, over either TCP or UDP.
    """
    defaults = {
        'prefix':           "servers",
        'host':             '127.0.0.1',
        'port':             2003,
        'protocol':         "tcp",
        'format':           "plain", # plain or pickle
        'seperator':        "=",     # metadata seperator
        'batch':            64,      # send batches of this size
        'retry':            30,      # delay between connection attempts
        'retries':          3,       # retry sends this many times
        'timeout':          5,       # new connection timeout
        'maxqueue':         8192,    # maximum number of metrics to queue
        'warnqueue':        1024     # print warning if queue size > this
    }

    def __init__(self, log, config):
        """Graphite sender. WIP

        :param log: a structlog logger instance.
        :type log: structlog logger
        :param config: a plumd.config.Conf configuration helper instance.
        :type config: plumd.config.Conf
        :raises plumd.ConfigError: if the protocol is neither tcp nor udp
        """
        super(Graphite, self).__init__(log, config)
        self.addr = (config.get('host'), config.get('port'))
        self.retry = config.get('retry')
        # NOTE(review): retries is read but nothing uses it yet - sends
        # without an explicit maxtries still retry until they succeed
        self.retries = config.get('retries')
        self.timeout = config.get('timeout')
        self.proto = config.get("protocol")
        self.batch = config.get('batch')
        self.warn = config.get('warnqueue')
        self.socket = None
        self.fd = None
        self.render = GraphiteRender(self.config)
        # bind the protocol specific connect/send implementations
        if self.proto == "tcp":
            self.connect = self.connect_tcp
            self.send = self.send_tcp
        elif self.proto == "udp":
            self.connect = self.connect_udp
            self.send = self.send_udp
        else:
            raise plumd.ConfigError("unkown protocol: {0}".format(self.proto))

    def _close_connection(self, event):
        """Close the file object and socket, if any, and forget them.

        Errors while closing are logged and otherwise ignored.

        :param event: the log event name to report close errors under.
        :type event: str
        """
        if self.fd is not None:
            try:
                self.fd.close()
            except Exception as e:
                self.log.error(event, action="close.fd", exception=e)
            self.fd = None
        if self.socket is not None:
            try:
                self.socket.close()
            except Exception as e:
                self.log.error(event, action="close.socket", exception=e)
            self.socket = None

    def _serialize(self, metrics, event):
        """Return the wire representation of a batch of rendered metrics.

        Plain batches are newline joined lines. Pickle batches are a
        protocol 2 pickle of the list of tuples prefixed with its length as
        a 4 byte big endian unsigned integer. Returns None (after logging)
        if the batch cannot be serialized, so callers can drop it instead of
        retrying a batch that can never be sent.

        :param metrics: the rendered metrics (strings or tuples).
        :type metrics: list
        :param event: the log event name to report errors under.
        :type event: str
        :rtype: bytes or None
        """
        try:
            if self.render.format == "pickle":
                body = pickle.dumps(list(metrics), protocol=2)
                return struct.pack("!L", len(body)) + body
            text = "\n".join(metrics) + "\n"
            # no-op for python 2 str, needed for unicode / python 3 str
            if not isinstance(text, bytes):
                text = text.encode("utf-8")
            return text
        except Exception as e:
            self.log.error(event, action="serialize", exception=e)
            return None

    def onstart(self):
        """Connect to the graphite server on startup."""
        self.connect(maxtries=1) # don't block the calling thread on startup!

    def onstop(self):
        """Flush remaining metrics and disconnect socket on shutdown."""
        self.log.debug("onstop", action="flushing")
        # presumably the True flag makes next_batch return partial batches
        # too - verify against plumd.Render.next_batch
        qsize, nmetrics, bmetrics = self.render.next_batch(self.batch, True)
        if qsize > self.warn:
            self.log.warn("onstop", qsize=qsize, warn=self.warn)
        while nmetrics > 0:
            self.log.debug("onstop", action="sending", metrics=bmetrics)
            # a single attempt per batch so shutdown is never held up
            self.send(bmetrics, maxtries=1)
            qsize, nmetrics, bmetrics = self.render.next_batch(self.batch, True)
        self.log.debug("onstop", action="flushed")
        # flush and close fd, socket
        if self.fd is not None:
            try:
                self.fd.flush()
            except Exception as e:
                self.log.error("onstop", action="close.fd", exception=e)
        self._close_connection("onstop")

    def push(self, rset):
        """Render metrics onto our deque and send all full batches.

        :param rset: a result set whose results attribute is handed to the
            renderer.
        :type rset: object
        """
        # record this batch of metrics
        self.render.process(rset.results)

        # get the next full batch of metrics to send
        qsize, nmetrics, bmetrics = self.render.next_batch(self.batch, False)
        if qsize > self.warn:
            self.log.warn("push", qsize=qsize)
        while nmetrics > 0:
            self.send(bmetrics)
            # and the next batch until there are no more full batches
            qsize, nmetrics, bmetrics = self.render.next_batch(self.batch, False)

    def connect_tcp(self, maxtries=None):
        """Connect to graphite and maintain a connection. Returns True if
        the connection was made or False if it failed.

        Any existing connection is closed first. Without maxtries this keeps
        trying, waiting retry seconds between attempts, until it connects.

        :param maxtries: If defined attempt to connect at most this many times.
        :type maxtries: int
        :rtype: bool
        """
        self.log.debug("connect_tcp", action="connecting")
        self._close_connection("connect_tcp")
        tries_left = maxtries
        while True:
            if tries_left is not None:
                if tries_left <= 0:
                    self.log.error("connect_tcp", action="failed",
                                   address=self.addr)
                    return False
                tries_left -= 1
            try:
                self.log.info("connect_tcp", action="connecting",
                              address=self.addr)
                self.socket = socket.create_connection(self.addr, self.timeout)
                self.fd = self.socket.makefile("r+b", bufsize=0)
                break
            except Exception as e:
                self.log.error("connect_tcp", exception=e)
                # the socket may exist even though makefile failed
                self._close_connection("connect_tcp")
            # only wait when another attempt will actually follow, so a
            # bounded call returns as soon as its last attempt has failed
            if tries_left is None or tries_left > 0:
                time.sleep(self.retry)

        self.log.debug("connect_tcp", action="connected")
        return True

    def send_tcp(self, metrics, maxtries=None):
        """Send a batch of rendered metrics to graphite over TCP.

        Connects first if there is no connection. Without maxtries a failed
        write is retried (reconnecting in between) until it succeeds; with
        maxtries the batch is dropped after that many failed attempts.

        :param metrics: the rendered metrics to send.
        :type metrics: list
        :param maxtries: If defined attempt to send at most this many times.
        :type maxtries: int
        """
        msg = "dropping metrics - connection failed"
        if self.socket is None or self.fd is None:
            if not self.connect_tcp(maxtries=maxtries):
                self.log.debug("send_tcp", action="fail", msg=msg,
                               metrics=metrics)
                return
        payload = self._serialize(metrics, "send_tcp")
        if payload is None:
            return
        tries_left = maxtries
        while True:
            try:
                self.fd.write(payload)
                self.fd.flush()
                return
            except Exception as e:
                self.log.error("send_tcp", exception=e)
            if tries_left is not None:
                tries_left -= 1
                if tries_left <= 0:
                    self.log.debug("send_tcp", action="fail", msg=msg,
                                   metrics=metrics)
                    return
            # reconnect
            if not self.connect_tcp(maxtries=maxtries):
                self.log.debug("send_tcp", action="fail", msg=msg,
                               metrics=metrics)
                return
            time.sleep(self.retry)

    def connect_udp(self, maxtries=None):
        """Setup a UDP socket, closing any existing socket first.

        :param maxtries: accepted for interface parity with connect_tcp,
            not used.
        :type maxtries: int
        :rtype: bool
        """
        self.log.debug("connect_udp", action="socket")
        if self.socket:
            try:
                self.socket.close()
            except Exception as e:
                self.log.error("connect_udp", action="close.socket",
                               exception=e)
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.log.debug("connect_udp", action="created")
        return True

    def send_udp(self, metrics, maxtries=None):
        """Send a batch of rendered metrics to graphite over UDP.

        Creates the socket first if there is none. Without maxtries a failed
        send is retried every retry seconds until it succeeds; with maxtries
        the batch is dropped after that many failed attempts.

        :param metrics: the rendered metrics to send.
        :type metrics: list
        :param maxtries: If defined attempt to send at most this many times.
        :type maxtries: int
        """
        payload = self._serialize(metrics, "send_udp")
        if payload is None:
            return
        tries_left = maxtries
        while True:
            try:
                if self.socket is None:
                    self.connect_udp()
                self.socket.sendto(payload, self.addr)
                return
            except Exception as e:
                self.log.debug("send_udp", exception=e)
            if tries_left is not None:
                tries_left -= 1
                if tries_left <= 0:
                    self.log.debug("send_udp", action="fail",
                                   msg="dropping metrics - send failed",
                                   metrics=metrics)
                    return
            time.sleep(self.retry)
357