Completed
Push — master ( 42a2db...5f94f7 )
by Kenny
01:14
created

GraphiteRenderPlain.process()   D

Complexity

Conditions 9

Size

Total Lines 53

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 1 Features 2
Metric Value
cc 9
c 2
b 1
f 2
dl 0
loc 53
rs 4.9852

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

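Commonly applied refactorings include:

- Extract Method
- If many parameters/temporary variables are present: Replace Temp with Query, Introduce Parameter Object, Preserve Whole Object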

1
 # -*- coding: utf-8 -*-
2
3
__author__ = 'Kenny Freeman'
4
__email__ = '[email protected]'
5
__license__ = "ISCL"
6
__docformat__ = 'reStructuredText'
7
8
import re
9
import sys
10
import time
11
import socket
12
13
import plumd
14
#import plumd.util
15
import plumd.plugins
16
17
18
def format_meta(meta, sep, meta_key):
    """Return a formatted metadata string from the Meta object.

    Every metadata key, and every string value, has all characters outside
    ``[0-9a-zA-Z_-]`` removed so that it cannot introduce extra ``.``
    separated components into a metric name. Boolean values are rendered as
    ``true`` / ``false``; any other value is converted with ``str``. The
    resulting entries are joined with ``.``.

    :param meta: :class:`plumd.Meta` object that describes a metrics metadata.
    :type meta: plumd.Meta
    :param sep: seperator character for metadata values (eg. =).
    :type sep: str
    :param meta_key: flag to include meta data key values or not
    :type meta_key: bool
    :rtype: str
    """
    unsafe_chars = re.compile(r'[^0-9a-zA-Z_\-]+')
    mmeta_dat = []
    # ``items`` is read as an attribute (not called) and each entry is
    # unpacked as (key, metric) where metric[1] is the metadata value,
    # eg. nic, metric(name='nic', value='eth0')
    for mkey, mval in meta.items:
        # keep the whole sanitised key, not just its first character
        mkey = unsafe_chars.sub('', mkey)
        mval = mval[1]
        if isinstance(mval, str):
            # sanitise the value itself, not the key a second time
            mval = unsafe_chars.sub('', mval)
        elif isinstance(mval, bool):
            mval = "true" if mval else "false"
        if meta_key:
            mmeta_dat.append("{0}{1}{2}".format(mkey, sep, mval))
        else:
            # str.join only accepts strings, numeric values must be converted
            mmeta_dat.append(str(mval))
    return ".".join(mmeta_dat)
46
47
48
def get_metric_str(prefix, hmeta_str, mmeta_str, metric, rname):
    """Build the dotted graphite path for a single metric.

    The path is ``prefix[.hmeta_str].rname.<metric name>[.mmeta_str]`` where
    the bracketed parts are only present when the corresponding string is
    non-empty.

    :param prefix: the metric prefix to use (eg. servers)
    :type prefix: str
    :param hmeta_str: the host metadata string.
    :type hmeta_str: str
    :param mmeta_str: the metric metadata string.
    :type mmeta_str: str
    :param metric: the metric; its first element is used as the metric name.
    :type metric: plumd.Integer or plumd.Float or plumd.String
    :param rname: the result set name (eg. cpu).
    :type rname: str
    :rtype: str
    """
    # host metadata, when present, sits between the prefix and the result name
    head = [prefix, hmeta_str] if hmeta_str else [prefix]
    # metric metadata, when present, is the last path component
    tail = [mmeta_str] if mmeta_str else []
    return ".".join(head + [rname, metric[0]] + tail)
72
73
74
class GraphiteRenderPlain(plumd.Render):
    """Renders metrics in graphite plain text format:

    <prefix>.<hmeta=hval>.<measurement>.<metric>.<mmeta=mval> <value> <ts>

    The host and metric metadata components are optional, the key/value
    seperator character is configurable and metadata keys are only included
    when the ``meta_key`` option is enabled. Values are written with ``{:f}``
    formatting. eg.

    servers.env=test.region=test.cpu.idle 0.000000 1452240380
    servers.env=test.region=test.disk.free.dev=sda1 1213234232.000000 1452240380

    or with a . seperator character:

    servers.env.test.region.test.cpu.idle 0.000000 1452240380
    servers.env.test.region.test.disk.free.dev.sda1 1213234232.000000 1452240380

    :param rconfig: A Conf object initialized with the plugin configuration.
    :type rconfig: plumd.conf.Conf
    :raises: plumd.ConfigError
    """
    def __init__(self, rconfig):
        super(GraphiteRenderPlain, self).__init__(rconfig)
        # get the configured values from the plugin configuration
        self.prefix = rconfig.get("prefix")
        # rendered metrics accumulate here; process() moves the buffer onto
        # self.metrics once adding another line would reach max_bytes
        self.buff = ""
        self.buff_bytes = 0
        self.seperator = rconfig.get("seperator")
        self.meta_key = rconfig.get("meta_key")
        # only the first character of a string seperator is used
        if self.seperator and isinstance(self.seperator, str):
            self.seperator = self.seperator[0]

        # generate the host meta data string - assumes static host metadata
        # NOTE(review): self.meta is not set in this class, presumably it is
        # provided by plumd.Render - confirm
        self.hmeta_str = format_meta(self.meta, self.seperator, self.meta_key)

    def flush(self):
        """Return and clear the partial metric buffer, also clear queue.

        :rtype: str
        """
        metrics = self.buff
        self.buff = ""
        self.buff_bytes = 0
        self.metrics.clear()
        return metrics

    def process(self, results):
        """Process the metrics in the result set into the graphite plain text
        format.

        Each rendered line is appended to the partial buffer; when a line
        would make the buffer reach ``max_bytes`` the current buffer is queued
        on ``self.metrics`` and the line starts a new buffer. String metrics
        are skipped and booleans are sent as a very large integer or 0.

        :param results: a generator that returns metrics in a standard format
        :type results: generator
        """
        hmeta_str = self.hmeta_str
        prefix = self.prefix
        # sys.maxint only exists on python 2, fall back to sys.maxsize
        bool_true = getattr(sys, "maxint", sys.maxsize)
        # render each metric in the graphite plaintext format
        for (ts, rname, rmeta, metrics) in results:
            # get the metadata for this result set
            mmeta_str = format_meta(rmeta, self.seperator, self.meta_key)

            # the leading path components are the same for every metric in
            # this result set so build them once
            if hmeta_str:
                head = [prefix, hmeta_str, rname]
            else:
                head = [prefix, rname]

            # generate a metric string for each metric
            for metric in metrics:
                # silently ignore strings
                if metric.__class__ == plumd.String:
                    continue

                # booleans are either really big or 0
                mval = metric[1]
                if metric.__class__ == plumd.Boolean:
                    mval = bool_true if mval else 0

                # get the full metric string
                # built inline instead of a function call to reduce cost
                args = head + [metric[0]]
                if mmeta_str:
                    args.append(mmeta_str)
                met_str = ".".join(args)

                # now combine the values
                fmstr = "{} {:f} {}\n".format(met_str, mval, int(ts))
                fmstr_len = len(fmstr)

                # can we append to the existing buffer?
                if len(self.buff) + fmstr_len < self.max_bytes:
                    # todo: try using a list/deque here (or something else)
                    # as += is a tad expensive afaik
                    self.buff += fmstr
                    self.buff_bytes += fmstr_len
                    continue

                # or do we need to queue and start a new buffer?
                # never queue an empty buffer - that happens when a single
                # line is already as large as max_bytes
                if self.buff:
                    self.metrics.append(self.buff)
                self.buff = fmstr
                self.buff_bytes = fmstr_len
172
173
174
class Graphite(plumd.plugins.Writer):
    """Writer plugin that renders result sets and sends them to graphite."""
    ## todo: move things to render, writers, remove from here
    ## also todo: print warning for queue size in render not in Graphite
    defaults = {
        'prefix':           "servers",
        'host':             '127.0.0.1',
        'port':             2003,
        'protocol':         "tcp",
        'format':           "plain", # plain or pickle
        'seperator':        "=",     # metadata seperator
        'meta_key':         False,   # include metadata keys in metrics
        'batch':            64,      # send batches of this size
        'retry_time':       30,      # delay between connection attempts
        'timeout':          5,       # timeout for socket operations
        'maxqueue':         8192,    # maximum number of metrics to queue
        'warnqueue':        1024     # print warning if queue size > this
    }

    def __init__(self, log, config):
        """Create the renderer and the network writer from the configuration.

        :param log: A logger
        :type log: logging.RootLogger
        :param config: a plumd.config.Conf configuration helper instance.
        :type config: plumd.config.Conf
        :raises: plumd.ConfigError if the format or protocol is not supported
        """
        super(Graphite, self).__init__(log, config)
        config.defaults(Graphite.defaults)

        # select the renderer for the configured output format
        self.render = None
        fmt = config.get('format')
        renderers = {
            'plain': GraphiteRenderPlain
        }
        if fmt not in renderers:
            raise plumd.ConfigError("unkown format: {0}".format(fmt))
        self.render = renderers[fmt](self.config)

        # select the writer for the configured transport protocol
        proto = config.get('protocol')
        self.writer = None
        transports = {
            'tcp': GraphiteWriteTCP,
            'udp': GraphiteWriteUDP
        }
        if proto not in transports:
            raise plumd.ConfigError("unkown protocol: {0}".format(proto))
        self.writer = transports[proto](self.log, config)

    def onstop(self):
        """Send what is still queued, then disconnect; called on shutdown.

        Queued metrics are dropped when the writer has no socket or when
        sending has taken longer than one second.
        """
        writer = self.writer

        # without a socket there is nothing we can send - drop everything
        if not writer.socket:
            self.log.error(
                "graphite: onstop: not connected, dropping all queued metrics")
            self.render.flush()
            return

        # bound the time spent flushing so shutdown is not held up
        began = time.time()
        for chunk in self.render:
            writer.write(chunk)
            if time.time() - began > 1.0:
                self.render.flush()
                self.log.error(
                    "graphite: onstop: unable to send metrics, dropping")
                writer.disconnect()
                return

        # finally send the partially filled buffer, if any
        leftover = self.render.flush()
        if leftover:
            writer.write(leftover)
        writer.disconnect()

    def push(self, rset):
        """Render a result set and send every buffer the renderer yields.

        Sending stops at the first buffer the writer hands back; that buffer
        is appended to the renderer queue again.

        :param rset: the result set to render; falsy values are not rendered.
        """
        # record this batch of metrics
        if rset:
            self.render.process(rset.results)

        # send buffers until the writer returns one it was unable to send
        unsent = None
        for chunk in self.render:
            returned = self.writer.write(chunk)
            if not returned:
                continue
            # do not continue if we're unable to send
            # this can cause significant delays on shutdown
            unsent = returned
            break

        # add metrics back onto queue if any
        if unsent:
            self.log.debug("graphite: write: requeuing metrics")
            self.render.metrics.append(unsent)
273
274
275
class GraphiteWriteTCP(object):
    """Graphite TCP writer."""

    def __init__(self, log, config):
        """Graphite TCP writer.

        :param log: a logger providing ``info`` and ``error``.
        :param config: configuration object; ``get`` is called for the
            ``host``, ``port``, ``timeout`` and ``retry_time`` values.
        """
        self.log = log
        self.addr = (config.get('host'), config.get('port'))
        self.socket = None
        self.timeout = config.get('timeout')
        self.retry_time = config.get('retry_time')

    def connect(self):
        """Connect to graphite and maintain a connection. Returns True if
        the connection was made or False if it failed.

        On failure the socket is released and the call then blocks for
        ``retry_time`` seconds before returning.

        :rtype: bool
        """
        # disconnect if we're already connected
        if self.socket:
            self.disconnect()
        try:
            # create the socket
            self.socket = socket.socket(socket.AF_INET,
                                        socket.SOCK_STREAM)
            # set timeout for socket operations
            self.socket.settimeout(self.timeout)
            # connect
            self.socket.connect(self.addr)
        except Exception as e:
            msg = "graphite: {0}: connect: excception: {1}"
            self.log.error(msg.format(self.addr, e))
            # release the failed socket first so it is not held open
            # during the pause
            self.disconnect()
            # pause before reconnecting
            time.sleep(self.retry_time)
            return False
        # only the socket calls are guarded above, a logging problem is not
        # a connection failure
        msg = "graphite: connected: {0}"
        self.log.info(msg.format(self.addr))
        return True

    def disconnect(self):
        """Sever the graphite connection.

        Errors raised while closing are logged; the socket reference is
        always cleared.
        """
        if not self.socket:
            return
        try:
            self.socket.close()
        except Exception as e:
            msg = "graphite: dicsonnect: exception {0}".format(e)
            self.log.error(msg)
        finally:
            self.socket = None

    def write(self, metrics):
        """Send the metrics string to graphite.

        Returns None when the metrics were sent. When there is no connection
        or sending fails, the metrics are returned unchanged so the caller
        can queue them again.

        :param metrics: the metrics string to send.
        :type metrics: str
        :rtype: str or None
        """
        if not self.socket:
            self.connect()
        if not self.socket:
            msg = "graphite: write: connection failed"
            self.log.error(msg)
            return metrics # resend them
        try:
            # sockets need bytes on python 3; python 2 str is already bytes
            if isinstance(metrics, bytes):
                data = metrics
            else:
                data = metrics.encode("utf-8")
            self.socket.sendall(data)
        except Exception as e:
            msg = "graphite: {0}: write: exception: {1}"
            self.log.error(msg.format(self.addr, e))
            self.connect()
            return metrics # resend them
        return None
350
351
352
class GraphiteWriteUDP(object):
    """Graphite UDP writer."""

    def __init__(self, log, config):
        """Graphite UDP writer.

        :param log: a logger providing ``error``.
        :param config: configuration object; ``get`` is called for the
            ``host`` and ``port`` values.
        """
        self.log = log
        self.addr = (config.get('host'), config.get('port'))
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def disconnect(self):
        """Close the UDP socket.

        Provided so this writer offers the same ``disconnect`` call as the
        TCP writer. Errors raised while closing are logged; the socket
        reference is always cleared.
        """
        sock, self.socket = self.socket, None
        if sock is None:
            return
        try:
            sock.close()
        except Exception as e:
            self.log.error("graphite: disconnect: exception {0}".format(e))

    def write_blocking(self, metrics):
        """Send the metrics to graphite and return number of bytes sent.

        :param metrics: the metrics string to send.
        :type metrics: str
        :rtype: int
        """
        # recreate the socket if disconnect() was called earlier
        if self.socket is None:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # sockets need bytes on python 3; python 2 str is already bytes
        if isinstance(metrics, bytes):
            data = metrics
        else:
            data = metrics.encode("utf-8")
        return self.socket.sendto(data, self.addr)

    def write(self, metrics):
        """Send the metrics to graphite.

        Follows the same return convention as the TCP writer: None means
        the metrics were sent and nothing has to be queued again. A byte
        count must not be returned here, because a truthy return value is
        what signals "resend this" in that convention. Use
        :meth:`write_blocking` when the number of bytes sent is needed.

        :param metrics: the metrics string to send.
        :type metrics: str
        :rtype: None
        """
        self.write_blocking(metrics)
        return None
380