Completed
Push — master ( 8d751b...7e5eed )
by Kenny
58s
created

plumd.plugins.writers.Graphite.send_tcp()   B

Complexity

Conditions 4

Size

Total Lines 22

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 4
dl 0
loc 22
rs 8.9197
1
# -*- coding: utf-8 -*-
2
3
__author__ = 'Kenny Freeman'
4
__email__ = '[email protected]'
5
__license__ = "ISCL"
6
__docformat__ = 'reStructuredText'
7
8
import sys
9
import time
10
import socket
11
12
import plumd
13
import plumd.util
14
import plumd.plugins
15
16
17
def format_meta(meta, sep):
    """Build the dotted metadata portion of a graphite metric name.

    Every entry yielded by ``meta.items`` is rendered as
    ``<key><sep><value>`` and the rendered entries are joined with ``.``.
    Keys and string values are passed through
    :func:`plumd.util.replace_chars`, booleans are written as ``true`` or
    ``false`` and any other value is formatted unchanged.

    :param meta: :class:`plumd.Meta` object that describes a metrics metadata.
    :type meta: plumd.Meta
    :param sep: separator placed between each key and its value (eg. =).
    :type sep: str
    :rtype: str
    """
    def render(name, entry):
        # entry is indexable and its second element holds the value,
        # eg. nic -> metric(name='nic', value='eth0')
        name = plumd.util.replace_chars(name)
        value = entry[1]
        if isinstance(value, bool):
            value = "true" if value else "false"
        elif isinstance(value, str):
            value = plumd.util.replace_chars(value)
        return "{0}{1}{2}".format(name, sep, value)

    # build the complete list first, then join it into a single string
    return ".".join([render(name, entry) for name, entry in meta.items])
38
39
40
def get_metric_str(prefix, hmeta_str, mmeta_str, metric, rname):
    """Assemble the dotted graphite name for a single metric.

    The name is made of, in order: the prefix, the host metadata (only when
    non-empty), the result set name, the metric name (``metric[0]``) and the
    metric metadata (only when non-empty).

    :param prefix: the metric prefix to use (eg. servers)
    :type prefix: str
    :param hmeta_str: the host metadata string.
    :type hmeta_str: str
    :param mmeta_str: the metric metadata string.
    :type mmeta_str: str
    :param metric: the metric.
    :type metric: plumd.Integer or plumd.Float or plumd.String
    :param rname: the result set name (eg. cpu).
    :type rname: str
    :rtype: str
    """
    parts = [prefix]
    # host metadata sits between the prefix and the result set name
    if hmeta_str:
        parts.append(hmeta_str)
    parts.extend((rname, metric[0]))
    # metric metadata, when present, always comes last
    if mmeta_str:
        parts.append(mmeta_str)
    return ".".join(parts)
64
65
66
class GraphiteRender(plumd.Render):
    """Renders metrics in graphite plain text or pickle format. Plaintext is:

    <prefix>.<hmeta=hval>.<measurement>.<metric>.<mmeta=mval> <value> <ts>

    where the seperator character between metadata keys and values is
    configurable. Metadata entries appear in the order the Meta object
    yields them (this class does not sort them itself). eg.

    servers.env=test.region=test.cpu.idle 0.0 1452240380
    servers.env=test.region=test.disk.free.dev=sda1 1213234232 1452240380

    or with a . seperator character:

    servers.env.test.region.test.cpu.idle 0.0 1452240380
    servers.env.test.region.test.disk.free.dev.sda1 1213234232 1452240380

    the pickle format uses tuples:

    (<metric>, (<timestamp>, <value>))

    which are queued as-is so that the sender can pickle lists of them.

    raises:
        plumd.ConfigError if the configured format is unknown

    :param rconfig: A Conf object initialized with the plugin configuration.
    :type rconfig: plumd.conf.Conf
    :raises: plumd.ConfigError
    """

    # Value reported for a True boolean metric. sys.maxint only exists on
    # Python 2; fall back to sys.maxsize where it is missing so that process()
    # does not fail with AttributeError on Python 3.
    _true_value = getattr(sys, "maxint", sys.maxsize)

    def __init__(self, rconfig):
        super(GraphiteRender, self).__init__(rconfig)
        # get the configured values from the plugin configuration
        self.prefix = rconfig.get("prefix")
        # normalised once here; every later comparison uses the lower case form
        self.format = rconfig.get("format").lower()
        if self.format not in ('plain', 'pickle'):
            err = "unknown graphite format: {0}".format(self.format)
            raise plumd.ConfigError(err)
        self.seperator = rconfig.get("seperator")
        # only the first character of a string seperator is used
        if self.seperator and isinstance(self.seperator, str):
            self.seperator = self.seperator[0]

        # generate the host meta data string - assumes static host metadata
        self.hmeta_str = format_meta(self.meta, self.seperator)

    def process(self, results):
        """Render every metric in the results and queue it on self.metrics.

        Plain format queues one ``<name> <value> <timestamp>`` string per
        metric; pickle format queues ``(<name>, (<timestamp>, <value>))``
        tuples. String metrics are skipped and boolean metrics are reported
        as a very large integer (True) or 0 (False).

        :param results: an iterable of (ts, name, meta, metrics) tuples
        :type results: generator
        """
        # (self._time, robj.name, robj.meta, robj.metrics)
        for (ts, rname, rmeta, metrics) in results:
            # the metadata string is shared by every metric of this result
            mmeta_str = format_meta(rmeta, self.seperator)

            for metric in metrics:
                # silently ignore strings
                if metric.__class__ == plumd.String:
                    continue

                mval = metric[1]

                # for now at least, booleans are either really big or 0
                if metric.__class__ == plumd.Boolean:
                    mval = self._true_value if mval else 0

                # <pfx>.<hmet=val>.<meas>.<met>.<mmet=val>
                met_str = get_metric_str(self.prefix, self.hmeta_str,
                                         mmeta_str, metric, rname)

                # add the rendered metric to our deque of metrics
                if self.format == "plain":
                    fmstr = "{0} {1} {2}".format(met_str, mval, ts.timestamp)
                    self.metrics.append(fmstr)
                elif self.format == "pickle":
                    self.metrics.append((met_str, (ts.timestamp, mval)))
148
149
150
class Graphite(plumd.plugins.Writer):
    """Writer plugin that sends rendered metrics to graphite.

    Metrics are rendered and queued by :class:`GraphiteRender` and sent in
    batches over TCP (plain text or pickle format) or UDP (plain text only).
    """
    defaults = {
        'prefix':           "servers",
        'host':             '127.0.0.1',
        'port':             2003,
        'protocol':         "tcp",
        'format':           "plain", # plain or pickle
        'seperator':        "=",     # metadata seperator
        'batch':            64,      # send batches of this size
        'retry':            30,      # delay between connection attempts
        'retries':          3,       # retry sends this many times
        'timeout':          5,       # new connection timeout
        'maxqueue':         8192,    # maximum number of metrics to queue
        'warnqueue':        1024     # print warning if queue size > this
    }

    def __init__(self, log, config):
        """Read the plugin configuration and select the transport.

        :param log: a structlog logger instance.
        :type log: structlog logger
        :param config: a plumd.config.Conf configuration helper instance.
        :type config: plumd.config.Conf
        :raises: plumd.ConfigError if the protocol is unknown or if the
            pickle format is combined with the udp protocol.
        """
        super(Graphite, self).__init__(log, config)
        self.addr = (config.get('host'), config.get('port'))
        self.retry = config.get('retry')
        # NOTE(review): 'retries' is read but not applied anywhere in this
        # class; sends without an explicit maxtries retry until they succeed.
        self.retries = config.get('retries')
        self.timeout = config.get('timeout')
        self.proto = config.get("protocol")
        self.batch = config.get('batch')
        self.warn = config.get('warnqueue')
        self.socket = None
        self.fd = None
        self.connected = False
        self.render = GraphiteRender(self.config)
        if self.proto == "tcp":
            self.connect = self.connect_tcp
            self.send = self.send_tcp
        elif self.proto == "udp":
            # send_udp can only join plain text lines; fail at configuration
            # time instead of raising TypeError on the first batch.
            if self.render.format == "pickle":
                raise plumd.ConfigError("pickle format requires protocol tcp")
            self.connect = self.connect_udp
            self.send = self.send_udp
        else:
            raise plumd.ConfigError("unkown protocol: {0}".format(self.proto))

    def onstart(self):
        """Connect to the graphite server on startup."""
        self.connect(maxtries=1) # don't block the calling thread on startup!

    def onstop(self):
        """Flush remaining metrics and disconnect socket on shutdown.

        Each batch is attempted once; the flush stops at the first batch
        that cannot be sent so that shutdown is not held up by an
        unreachable server.
        """
        self.log.debug("onstop", action="flushing")
        # presumably the True flag also returns a final partial batch -
        # confirm against plumd.Render.next_batch
        qsize, nmetrics, bmetrics = self.render.next_batch(self.batch, True)
        if qsize > self.warn:
            self.log.warn("onstop", qsize=qsize, warn=self.warn)
        while nmetrics > 0:
            self.log.debug("onstop", action="sending", metrics=bmetrics)
            if not self.send(bmetrics, maxtries=1):
                self.log.error("onstop", action="flush.aborted", qsize=qsize)
                break
            # and the next batch until there are no more batches
            qsize, nmetrics, bmetrics = self.render.next_batch(self.batch, True)
        else:
            self.log.debug("onstop", action="flushed")
        # flush and close fd, socket
        if self.fd is not None:
            try:
                self.fd.flush()
                self.fd.close()
            except Exception as e:
                self.log.error("onstop", action="close.fd", exception=e)
            self.fd = None
        if self.socket is not None:
            try:
                self.socket.close()
            except Exception as e:
                self.log.error("onstop", action="close.socket", exception=e)
            self.socket = None
        self.connected = False

    def push(self, rset):
        """Render metrics onto our deque and send all full batches.

        :param rset: the result set whose ``results`` are rendered and queued.
        :type rset: object exposing a ``results`` attribute
        """
        # record this batch of metrics
        self.render.process(rset.results)

        # get the next full batch of metrics to send - next_batch pop_left's
        qsize, nmetrics, bmetrics = self.render.next_batch(self.batch, False)
        if qsize > self.warn:
            self.log.warn("push", qsize=qsize)
        while nmetrics > 0:
            self.send(bmetrics)
            # and the next batch until there are no more full batches
            qsize, nmetrics, bmetrics = self.render.next_batch(self.batch, False)

    def disconnect_tcp(self):
        """Sever the graphite connection and forget the closed handles."""
        if self.fd is not None:
            try:
                self.fd.close()
            except Exception as e:
                self.log.error("disconnect_tcp", action="close.fd", exception=e)
            self.fd = None
        if self.socket:
            try:
                self.socket.close()
            except Exception as e:
                self.log.error("disconnect_tcp", action="close.socket",
                               exception=e)
            self.socket = None
        self.connected = False

    def connect_tcp(self, maxtries=None):
        """Connect to graphite and maintain a connection. Returns True if
        the connection was made or False if it failed.

        Any existing connection is closed first. The retry delay is only
        slept between attempts, never after the last one, so a single
        attempt returns as soon as it fails.

        :param maxtries: If defined attempt to connect at most this many
            times, otherwise a single attempt is made.
        :type maxtries: int
        :rtype: bool
        """
        self.log.debug("connect_tcp", action="connecting")
        self.disconnect_tcp()
        remaining = 1 if maxtries is None else maxtries
        while remaining > 0:
            remaining -= 1
            try:
                self.log.info("connect_tcp", action="connecting",
                              address=self.addr)
                self.socket = socket.create_connection(self.addr, self.timeout)
                self.fd = self.socket.makefile("r+b", bufsize=0)
                self.connected = True
                self.log.debug("connect_tcp", action="connected")
                break
            except Exception as e:
                self.log.error("connect_tcp", exception=e)
                self.disconnect_tcp()
                if remaining > 0:
                    time.sleep(self.retry)
        return self.connected

    def _encode(self, metrics):
        """Serialise one batch of rendered metrics for the TCP connection.

        Plain format batches become newline terminated lines. Pickle format
        batches become a protocol 2 pickle of the list of tuples preceded by
        its length as a 4 byte big endian unsigned integer.

        :param metrics: the batch of rendered metrics.
        :type metrics: list
        :rtype: str
        """
        if self.render.format == "pickle":
            # imported here so the module level imports stay untouched
            import pickle
            import struct
            payload = pickle.dumps(list(metrics), protocol=2)
            return struct.pack("!L", len(payload)) + payload
        return "\n".join(metrics) + "\n"

    def send_tcp(self, metrics, maxtries=None):
        """Send a batch of rendered metrics to graphite over TCP.

        The connection is (re)established as needed and the retry delay is
        slept between attempts.

        :param metrics: the batch of rendered metrics to send.
        :type metrics: list
        :param maxtries: give up after this many attempts; when None keep
            retrying until the batch has been written.
        :type maxtries: int
        :returns: True if the batch was written, False if it was given up on.
        :rtype: bool
        """
        # encode outside the retry loop: an encoding error is a programming
        # error and must surface instead of triggering endless reconnects
        payload = self._encode(metrics)
        tries = 0
        while maxtries is None or tries < maxtries:
            if tries:
                time.sleep(self.retry)
            tries += 1
            if not self.connected and not self.connect(maxtries=1):
                continue
            try:
                self.fd.write(payload)
                self.fd.flush()
                return True
            except Exception as e:
                self.log.error("send_tcp", exception=e)
                # drop the broken connection; the next attempt reconnects
                self.disconnect_tcp()
        self.log.error("send_tcp", action="giving.up", tries=tries)
        return False

    def connect_udp(self, maxtries=None):
        """Setup a UDP socket, closing any socket that is already open.

        :param maxtries: accepted for interface parity with connect_tcp;
            not used.
        :type maxtries: int
        :rtype: bool
        """
        self.log.debug("connect_udp", action="socket")
        if self.socket:
            try:
                self.socket.close()
            except Exception as e:
                self.log.error("connect_udp", action="close.socket",
                               exception=e)
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.log.debug("connect_udp", action="created")
        return True

    def send_udp(self, metrics, maxtries=None):
        """Send a batch of plain text metrics to graphite in one datagram.

        The retry delay is slept between attempts.

        :param metrics: the batch of rendered plain text metrics to send.
        :type metrics: list
        :param maxtries: give up after this many attempts; when None keep
            retrying until the datagram has been handed to the socket.
        :type maxtries: int
        :returns: True if the datagram was sent, False if it was given up on.
        :rtype: bool
        """
        mstr = "\n".join(metrics) + "\n"
        tries = 0
        while maxtries is None or tries < maxtries:
            if tries:
                time.sleep(self.retry)
            tries += 1
            try:
                # create the socket on demand if onstart has not done so
                if self.socket is None:
                    self.connect_udp()
                self.socket.sendto(mstr, self.addr)
                return True
            except Exception as e:
                self.log.error("send_udp", exception=e)
        self.log.error("send_udp", action="giving.up", tries=tries)
        return False
353