Completed
Push — master ( 822bc5...a4197e )
by Kenny
54s
created

plumd.plugins.writers.Graphite.disconnect_tcp()   B

Complexity

Conditions 5

Size

Total Lines 14

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 5
dl 0
loc 14
rs 8.5454
1
# -*- coding: utf-8 -*-
2
3
__author__ = 'Kenny Freeman'
4
__email__ = '[email protected]'
5
__license__ = "ISCL"
6
__docformat__ = 'reStructuredText'
7
8
import sys
9
import time
10
import socket
11
12
import plumd
13
import plumd.util
14
import plumd.plugins
15
16
17
def get_metric_meta(meta, sep):
    """Render a metadata object as a dotted ``key<sep>value`` string.

    Every entry yielded by ``meta.items`` (used as an attribute, not called)
    is unpacked as ``(key, item)`` and the value rendered is ``item[1]``.
    Keys are always passed through :func:`plumd.util.replace_chars`; string
    values are too, boolean values become ``"true"``/``"false"`` and any
    other value is rendered by :meth:`str.format`. The rendered pairs are
    joined with ``"."``; an empty metadata object yields ``""``.

    :param meta: :class:`plumd.Meta` object that describes a metrics metadata.
    :type meta: plumd.Meta
    :param sep: separator character placed between key and value (eg. =).
    :type sep: str
    :rtype: str
    """
    def _render_value(value):
        # strings are sanitized, booleans spelled out, everything else as-is
        if isinstance(value, str):
            return plumd.util.replace_chars(value)
        if isinstance(value, bool):
            return "true" if value else "false"
        return value

    pairs = []
    for key, item in meta.items:
        # eg. nic, metric(name='nic', value='eth0')
        clean_key = plumd.util.replace_chars(key)
        pairs.append("{0}{1}{2}".format(clean_key, sep, _render_value(item[1])))
    return ".".join(pairs)
38
39
40
def get_metric_str(prefix, hmeta_str, mmeta_str, metric, rname):
    """Return the dotted graphite path for a metric.

    The path is ``<prefix>.<hmeta_str>.<rname>.<metric name>.<mmeta_str>``.
    The prefix and both metadata strings are optional: any of them that is
    empty (or None) is left out, so no empty path segments are produced.
    The result set name and the metric name (``metric[0]``) are always
    included.

    :param prefix: the metric prefix to use (eg. servers); may be empty.
    :type prefix: str
    :param hmeta_str: the host metadata string; may be empty.
    :type hmeta_str: str
    :param mmeta_str: the metric metadata string; may be empty.
    :type mmeta_str: str
    :param metric: the metric; ``metric[0]`` is used as its name.
    :type metric: plumd.Integer or plumd.Float or plumd.String
    :param rname: the result set name (eg. cpu).
    :type rname: str
    :rtype: str
    """
    # an unset prefix would otherwise yield a leading "." (or a TypeError
    # from join when it is None)
    args = [prefix] if prefix else []
    if hmeta_str:
        args.append(hmeta_str)
    args.extend((rname, metric[0]))
    if mmeta_str:
        args.append(mmeta_str)
    return ".".join(args)
64
65
66
class GraphiteRender(plumd.Render):
    """Renders metrics in graphite plain text or pickle format. Plaintext is:

    <prefix>.<hmeta=hval>.<measurement>.<metric>.<mmeta=mval> <value> <ts>

    where the host and metric metadata pairs appear in the order yielded by
    their metadata objects (presumably sorted by plumd.Meta - confirm) and
    the key/value separator character is configurable. eg.

    servers.env=test.region=test.cpu.idle 0.0 1452240380
    servers.env=test.region=test.disk.free.dev=sda1 1213234232 1452240380

    or with a . separator character:

    servers.env.test.region.test.cpu.idle 0.0 1452240380
    servers.env.test.region.test.disk.free.dev.sda1 1213234232 1452240380

    the pickle format uses tuples:

    (<metric>, (<timestamp>, <value>))

    which the writer sends as pickled lists of tuples.

    plumd.String metrics are skipped; plumd.Boolean metrics are rendered as a
    very large integer when true and 0 when false.

    :param rconfig: A Conf object initialized with the plugin configuration.
    :type rconfig: plumd.conf.Conf
    :raises: plumd.ConfigError if the configured format is unknown
    """

    # Value rendered for a true plumd.Boolean. sys.maxint only exists on
    # Python 2; fall back to sys.maxsize on interpreters without it.
    _bool_true = getattr(sys, "maxint", sys.maxsize)

    def __init__(self, rconfig):
        super(GraphiteRender, self).__init__(rconfig)
        # get the configured values from the plugin configuration
        self.prefix = rconfig.get("prefix")
        # a missing format is reported as a ConfigError rather than an
        # AttributeError from None.lower()
        self.format = str(rconfig.get("format") or "").lower()
        if self.format not in ('plain', 'pickle'):
            err = "unknown graphite format: {0}".format(self.format)
            raise plumd.ConfigError(err)
        # only the first character of the configured separator is used
        self.seperator = rconfig.get("seperator")
        if self.seperator and isinstance(self.seperator, str):
            self.seperator = self.seperator[0]

        # generate the host meta data string - assumes static host metadata
        self.hmeta_str = get_metric_meta(self.meta, self.seperator)

    def process(self, results):
        """Render the metrics in the result set onto ``self.metrics``.

        Plain format appends ``"<path> <value> <ts>"`` strings; pickle format
        appends ``(<path>, (<ts>, <value>))`` tuples.

        :param results: yields ``(ts, rname, rmeta, metrics)`` tuples.
        :type results: generator
        """
        # (self._time, robj.name, robj.meta, robj.metrics)
        for (ts, rname, rmeta, metrics) in results:
            # get the metadata for this metric
            mmeta_str = get_metric_meta(rmeta, self.seperator)

            # generate a metric string for each metric
            for metric in metrics:
                mclass = metric.__class__
                # silently ignore strings
                if mclass == plumd.String:
                    continue

                mval = metric[1]

                # for now at least, booleans are either really big or 0
                if mclass == plumd.Boolean:
                    mval = self._bool_true if mval else 0

                # now combine all values
                # <pfx>.<hmet=val>.<meas>.<met>.<mmet=val> <value> <ts>
                met_str = get_metric_str(self.prefix, self.hmeta_str,
                                         mmeta_str, metric, rname)

                # add the rendered metric to our deque of metrics
                if self.format == "plain":
                    fmstr = "{0} {1} {2}".format(met_str, mval, ts.timestamp)
                    self.metrics.append(fmstr)
                elif self.format == "pickle":
                    self.metrics.append((met_str, (ts.timestamp, mval)))
148
149
150
class Graphite(plumd.plugins.Writer):
    """Graphite sender.

    Renders result sets with :class:`GraphiteRender` and writes full batches
    to a carbon server over TCP (plain or pickle format) or UDP (plain
    format only).
    """
    defaults = {
        'prefix':           "servers",
        'host':             '127.0.0.1',
        'port':             2003,
        'protocol':         "tcp",
        'format':           "plain", # plain or pickle
        'seperator':        "=",     # metadata seperator
        'batch':            64,      # send batches of this size
        'retry':            30,      # delay between connection attempts
        'retries':          3,       # retry sends this many times
        'timeout':          5,       # new connection timeout
        'maxqueue':         8192,    # maximum number of metrics to queue
        'warnqueue':        1024     # print warning if queue size > this
    }

    def __init__(self, log, config):
        """Graphite sender. WIP

        :param log: a structlog logger instance.
        :type log: structlog logger
        :param config: a plumd.config.Conf configuration helper instance.
        :type config: plumd.config.Conf
        :raises: plumd.ConfigError for an unknown protocol or format, or for
            the pickle format combined with the udp protocol.
        """
        super(Graphite, self).__init__(log, config)
        self.addr = (config.get('host'), config.get('port'))
        self.retry = config.get('retry')
        # NOTE(review): 'retries' is stored but not consulted by send_tcp or
        # send_udp (they use their maxtries argument); confirm intent.
        self.retries = config.get('retries')
        self.timeout = config.get('timeout')
        self.proto = config.get("protocol")
        self.batch = config.get('batch')
        self.warn = config.get('warnqueue')
        self.socket = None
        self.fd = None
        self.connected = False
        self.render = GraphiteRender(self.config)
        if self.proto == "tcp":
            self.connect = self.connect_tcp
            self.send = self.send_tcp
        elif self.proto == "udp":
            # carbon only accepts the pickle protocol over TCP, and pickle
            # tuples cannot be joined into a plaintext datagram
            if self.render.format == "pickle":
                raise plumd.ConfigError(
                    "pickle format requires the tcp protocol")
            self.connect = self.connect_udp
            self.send = self.send_udp
        else:
            raise plumd.ConfigError("unknown protocol: {0}".format(self.proto))

    def onstart(self):
        """Connect to the graphite server on startup."""
        self.connect(maxtries=1) # don't block the calling thread on startup!

    def onstop(self):
        """Flush remaining metrics and disconnect socket on shutdown.

        Each remaining batch gets a single send attempt so shutdown does not
        block indefinitely when the server is unreachable.
        """
        # get the next batch of metrics to send (flush partial batches too)
        self.log.debug("onstop", action="flushing")
        qsize, nmetrics, bmetrics = self.render.next_batch(self.batch, True)
        if qsize > self.warn:
            self.log.warn("onstop", qsize=qsize, warn=self.warn)
        while nmetrics > 0:
            self.log.debug("onstop", action="sending", metrics=bmetrics)
            self.send(bmetrics, maxtries=1)
            # and the next batch until there are no more batches
            qsize, nmetrics, bmetrics = self.render.next_batch(self.batch, True)
        self.log.debug("onstop", action="flushed")
        # flush and close fd, socket
        if self.fd is not None:
            try:
                self.fd.flush()
                self.fd.close()
            except Exception as e:
                self.log.error("onstop", action="close.fd", exception=e)
            self.fd = None
        if self.socket is not None:
            try:
                self.socket.close()
            except Exception as e:
                self.log.error("onstop", action="close.socket", exception=e)
            self.socket = None
        self.connected = False

    def push(self, rset):
        """Render metrics onto our deque and send all full batches.

        :param rset: a result set whose ``results`` are passed to the renderer.
        :type rset: object
        """
        # record this batch of metrics
        self.render.process(rset.results)

        # get the next full batch of metrics to send
        qsize, nmetrics, bmetrics = self.render.next_batch(self.batch, False)
        if qsize > self.warn:
            self.log.warn("push", qsize=qsize)
        while nmetrics > 0:
            self.send(bmetrics)
            # and the next batch until there are no more full batches
            qsize, nmetrics, bmetrics = self.render.next_batch(self.batch, False)

    def disconnect_tcp(self):
        """Sever the graphite connection, closing the file object and socket.

        Both references are cleared so later calls never touch closed objects.
        """
        if self.fd is not None:
            try:
                self.fd.close()
            except Exception as e:
                self.log.error("disconnect_tcp", action="close.fd", exception=e)
            self.fd = None
        if self.socket is not None:
            try:
                self.socket.close()
            except Exception as e:
                self.log.error("disconnect_tcp", action="close.socket",
                               exception=e)
            self.socket = None
        self.connected = False

    def connect_tcp(self, maxtries=None):
        """Connect to graphite. Returns True if the connection was made or
        False if every attempt failed.

        Waits ``retry`` seconds between attempts, but not after the last
        one, so ``maxtries=1`` never sleeps.

        :param maxtries: attempt to connect at most this many times
            (None means once).
        :type maxtries: int
        :rtype: bool
        """
        self.log.debug("connect_tcp", action="connecting")
        self.disconnect_tcp()
        if maxtries is None:
            maxtries = 1
        attempt = 0
        while attempt < maxtries:
            if attempt > 0:
                time.sleep(self.retry)
            attempt += 1
            try:
                self.log.info("connect_tcp", action="connecting",
                              address=self.addr)
                self.socket = socket.create_connection(self.addr, self.timeout)
                self.fd = self.socket.makefile("r+b", bufsize=0)
                self.connected = True
                self.log.debug("connect_tcp", action="connected")
                break
            except Exception as e:
                self.log.error("connect_tcp", exception=e)
                self.disconnect_tcp()
        return self.connected

    def _serialize(self, metrics):
        """Return the wire payload for one batch of rendered metrics.

        plain: newline separated and terminated lines.
        pickle: the graphite pickle protocol framing - a 4 byte big-endian
        length header followed by the pickled list of
        ``(path, (timestamp, value))`` tuples.

        :param metrics: rendered metrics (strings or tuples, per format).
        :type metrics: list
        :rtype: str
        """
        if self.render.format == "pickle":
            import pickle
            import struct
            # NOTE(review): carbon unpickles with a restricted unpickler;
            # assumes ts.timestamp and values are plain int/float - confirm.
            payload = pickle.dumps(list(metrics), protocol=2)
            return struct.pack("!L", len(payload)) + payload
        return "\n".join(metrics) + "\n"

    def send_tcp(self, metrics, maxtries=None):
        """Send a batch of rendered metrics to graphite over TCP.

        (Re)connects as needed and waits ``retry`` seconds between failed
        attempts. Returns True once the batch is written, or False if it was
        dropped after ``maxtries`` attempts.

        :param metrics: the rendered metrics to send.
        :type metrics: list
        :param maxtries: give up after this many attempts (at least one is
            always made); None retries until the batch is sent.
        :type maxtries: int
        :rtype: bool
        """
        data = self._serialize(metrics)
        attempt = 0
        while True:
            attempt += 1
            if self.connected or self.connect(maxtries=1):
                try:
                    self.fd.write(data)
                    self.fd.flush()
                    return True
                except Exception as e:
                    self.log.error("send_tcp", exception=e)
                    # drop the broken connection; reconnect on the next try
                    self.disconnect_tcp()
            if maxtries is not None and attempt >= maxtries:
                self.log.error("send_tcp", action="dropped")
                return False
            time.sleep(self.retry)

    def connect_udp(self, maxtries=None):
        """Setup a UDP socket, closing any previous one first.

        :param maxtries: unused; accepted for interface parity with
            connect_tcp.
        :type maxtries: int
        :rtype: bool
        """
        self.log.debug("connect_udp", action="socket")
        if self.socket is not None:
            try:
                self.socket.close()
            except Exception as e:
                self.log.error("connect_udp", action="close.socket",
                               exception=e)
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.log.debug("connect_udp", action="created")
        return True

    def send_udp(self, metrics, maxtries=None):
        """Send a batch of plaintext metrics to graphite as one datagram.

        Waits ``retry`` seconds between failed attempts. Returns True once
        sent, or False if dropped after ``maxtries`` attempts.

        :param metrics: the rendered plaintext metric strings.
        :type metrics: list
        :param maxtries: give up after this many attempts (at least one is
            always made); None retries until the datagram is sent.
        :type maxtries: int
        :rtype: bool
        """
        mstr = self._serialize(metrics)
        attempt = 0
        while True:
            attempt += 1
            try:
                self.socket.sendto(mstr, self.addr)
                return True
            except Exception as e:
                self.log.debug("send_udp", exception=e)
            if maxtries is not None and attempt >= maxtries:
                return False
            time.sleep(self.retry)
351