Completed
Push — master ( d058c9...ca4b9c )
by Kenny
45s
created

plumd.plugins.writers.InfluxdbRender   A

Complexity

Total Complexity 25

Size/Duplication

Total Lines 102
Duplicated Lines 0 %
Metric Value
dl 0
loc 102
rs 10
wmc 25

2 Methods

Rating   Name   Duplication   Size   Complexity  
F process() 0 62 18
B __init__() 0 18 7
1
# -*- coding: utf-8 -*-
2
3
__author__ = 'Kenny Freeman'
4
__email__ = '[email protected]'
5
__license__ = "ISCL"
6
__docformat__ = 'reStructuredText'
7
8
import copy
9
import socket
10
import logging
11
import httplib
12
13
import requests # pip install requests
14
15
import plumd
16
import plumd.plugins
17
import plumd
18
19
# Silence the per-request log lines emitted by requests/urllib3, see:
# http://stackoverflow.com/questions/11029717/how-do-i-disable-log-messages-from-the-requests-library
# Only ERROR and above from these loggers still propagate to our handlers.
# Older requests releases vendor urllib3 (logger "requests.packages.urllib3");
# newer ones use the standalone package, which logs under "urllib3", so both
# loggers are configured.
httplib.HTTPConnection.debuglevel = 0
rlog = logging.getLogger("requests.packages.urllib3")
for _quiet_log in (rlog, logging.getLogger("urllib3")):
    _quiet_log.addHandler(logging.NullHandler())
    _quiet_log.setLevel(logging.ERROR)
    _quiet_log.propagate = True
del _quiet_log
26
27
28
def _escape_measurement(name):
    """Escape a measurement name for the influxdb line protocol.

    Spaces and commas in measurement names must be backslash-escaped.
    Raw newlines cannot appear in line protocol, so they are replaced with
    the two characters backslash + ``n``.

    :param name: The measurement (result) name.
    :type name: str
    :rtype: str
    """
    return name.replace(" ", r"\ ").replace(",", r"\,").replace("\n", r"\n")


def _escape_key(text):
    """Escape a tag key, tag value or field key for the line protocol.

    In these positions spaces, commas and equals signs must be
    backslash-escaped. Raw newlines are replaced with backslash + ``n``.

    :param text: The key or tag value to escape.
    :type text: str
    :rtype: str
    """
    return (text.replace(" ", r"\ ")
                .replace(",", r"\,")
                .replace("=", r"\=")
                .replace("\n", r"\n"))


def _escape_field_string(value):
    """Render a string field value for the line protocol.

    String field values are wrapped in a single pair of double quotes.
    Inside the quotes only backslashes and double quotes are escaped;
    spaces and commas are kept literally. Raw newlines are replaced with
    backslash + ``n``.

    :param value: The string field value.
    :rtype: str
    """
    text = "{0}".format(value)
    text = text.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")
    return '"{0}"'.format(text)


def _format_tags(items):
    """Render metadata (key, value) pairs as a comma-separated tag string.

    Keys are escaped. String values are escaped, booleans become ``t``/``f``
    and integers get an ``i`` suffix. Any other value is inserted with
    ``str.format``.

    :param items: Iterable of (key, value) pairs.
    :returns: e.g. ``host=web1,dev=/dev/sda``, or ``""`` when there are none.
    :rtype: str
    """
    tags = []
    for key, val in items:
        if isinstance(val, str):
            val = _escape_key(val)
        elif isinstance(val, bool):
            # must be tested before int: bool is a subclass of int, so an
            # int-first check would render True as "Truei"
            val = "t" if val else "f"
        elif isinstance(val, int):
            # NOTE(review): influxdb tag values are always strings, so this
            # stores the literal text "<n>i"; kept to avoid changing series.
            val = "{0}i".format(val)
        tags.append("{0}={1}".format(_escape_key(key), val))
    return ",".join(tags)


def _format_field(metric):
    """Render one plumd metric as a line protocol ``key=value`` field.

    Int and Counter values get an ``i`` suffix, Booleans become ``t``/``f``,
    Strings are quoted/escaped and anything else (e.g. floats) is inserted
    with ``str.format``.

    :param metric: A plumd metric; index 0 is the name, index 1 the value.
    :rtype: str
    """
    mclass = metric.__class__
    if mclass == plumd.Int or mclass == plumd.Counter:
        value = "{0}i".format(metric[1])
    elif mclass == plumd.Boolean:
        value = "t" if metric[1] else "f"
    elif mclass == plumd.String:
        value = _escape_field_string(metric[1])
    else:
        value = metric[1]
    return "{0}={1}".format(_escape_key(metric[0]), value)


class InfluxdbRender(plumd.Render):
    """Renders metrics in an influxdb writable format.

    As of this writing the latest influxdb line protocol documentation is at:
    https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/

    Plumd renders metrics like:

    cpu,host=some,meta=values idle=90.0,user=5.0,system=5.0 <ts>
    disk,host=some,meta=values,dev=/dev/sda free=95.0,used=5.0 <ts>

    where host and metric metadata map to measurement tags and the metric
    values for a single result get comma separated as field values.

    Each result in a result set generates a new metric line and each metric
    line is appended to ``self.metrics`` for sending.

    :param rconfig: A Conf object initialized with the plugin configuration.
    :type rconfig: plumd.conf.Conf
    """

    def __init__(self, rconfig):
        super(InfluxdbRender, self).__init__(rconfig)
        # host metadata does not change, so render its tag string once
        self.meta_str = _format_tags(self.meta.items)

    def process(self, results):
        """Render results into influxdb line protocol strings.

        One line is appended to ``self.metrics`` per result. A result
        without any metrics is skipped, because a line with an empty field
        set is not valid line protocol.

        :param results: iterable of (ts, name, meta, [metric, ...]) tuples,
            where ts is an arrow timestamp.
        :type results: iterable
        """
        # influx uses nanosecond timestamps; in arrow formatting
        # (http://crsmithdev.com/arrow/) X is the unix timestamp and SSS
        # the milliseconds, so pad with six zeros
        ts_fmt = "XSSS000000"
        for (ts, rname, rmeta, metrics) in results:
            # field set, eg. idle=0.0,user=90.0,sys=10.0
            fields = ",".join(_format_field(metric) for metric in metrics)
            if not fields:
                continue

            # measurement name, then host tags, then result tags
            head = [_escape_measurement(rname)]
            if self.meta_str:
                head.append(self.meta_str)
            result_tags = _format_tags(rmeta.items)
            if result_tags:
                head.append(result_tags)

            line = "{0} {1} {2}".format(",".join(head), fields,
                                        ts.format(ts_fmt))
            # the sender "\n".join()s batches of these lines
            self.metrics.append(line)
130
131
132
class Influxdb(plumd.plugins.Writer):
    """Influxdb writer plugin.

    Renders each pushed result set with InfluxdbRender and sends the
    queued lines in batches of ``batch`` lines. Lines go either as an HTTP
    POST to the influxdb ``/write`` endpoint or as UDP datagrams.
    """
    defaults = {
        'username': 'admin',
        'password': 'admin',
        'database': 'metrics',
        'host':     '127.0.0.1',
        'port':     8086,
        'protocol': "http",  # http or udp
        'batch':    64,      # send batches of this size
        'retries':  3,       # total http post attempts per batch
        'timeout':  5,       # timeout http posts in 5 seconds
        'maxqueue': 8192,    # maximum number of metrics to queue
        'warnqueue': 1024    # print warning if queue size > this
    }

    def __init__(self, log, config):
        """Influxdb sender.

        :param log: a structlog logger instance.
        :type log: structlog logger
        :param config: a plumd.config.Conf configuration helper instance.
        :type config: plumd.config.Conf
        :raises plumd.ConfigError: if protocol is neither http nor udp.
        """
        super(Influxdb, self).__init__(log, config)
        self.config = config
        proto = config.get('protocol')
        self.batch = config.get('batch')
        self.warn = config.get('warnqueue')
        self.socket = None
        # str() so a missing/None protocol ends in ConfigError below
        proto_name = str(proto).lower()
        if proto_name == "http":
            self.session = requests.Session()
            self.session.auth = (config.get('username'),
                                 config.get('password'))
            self.session.headers.update(
                {'Content-type': 'application/octet-stream'})
            self.send = self.send_http
            self.url = "http://{0}:{1}/write?db={2}".format(
                config.get('host'), config.get('port'),
                config.get('database'))
            self.retries = config.get('retries')
            self.timeout = config.get('timeout')
            self.log.debug("__init__", timeout=self.timeout, url=self.url)
        elif proto_name == "udp":
            # udp has no auth or db (configured on receiver side) and no
            # timeout, so only the address is logged
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.addr = (config.get('host'), config.get('port'))
            self.send = self.send_udp
            self.log.debug("__init__", address=self.addr)
        else:
            raise plumd.ConfigError("unknown protocol: {0}".format(proto))
        self.render = InfluxdbRender(self.config)

    def send_udp(self, metrics):
        """Send the batch of metrics to influxdb via UDP protocol.

        Best effort: a send failure is logged and the batch is dropped.

        :param metrics: The list of pre-formated metrics from our renderer.
        :type metrics: list
        """
        payload = "\n".join(metrics) + "\n"
        try:
            self.socket.sendto(payload, self.addr)
        except Exception as exc:
            self.log.error("send_udp", exception=exc, nmetrics=len(metrics))

    def send_http(self, metrics):
        """Send the batch of metrics to influxdb via HTTP.

        The batch is posted up to ``self.retries`` times. It is logged and
        dropped if no attempt gets a 2xx response.

        :param metrics: The list of pre-formated metrics from our renderer.
        :type metrics: list
        """
        payload = "\n".join(metrics) + "\n"
        attempts = self.retries
        while attempts > 0:
            attempts -= 1
            try:
                resp = self.session.post(self.url, data=payload,
                                         timeout=self.timeout)
            except Exception as exc:
                # connection errors, timeouts, etc: log and try again
                self.log.error("send_http", exception=exc,
                               msg="exception during post")
                continue
            # requests' ok codes don't include 204, which is what a
            # successful influxdb write returns, so accept any 2xx
            if 200 <= resp.status_code < 300:
                return
            self.log.debug("send_http", action="retrying",
                           response=resp.status_code, text=resp.text)
        self.log.error("send_http", action="failed",
                       reason="too many retries", nmetrics=len(metrics))

    def _send_batches(self, caller, flush):
        """Send batches from the render queue until next_batch has none.

        :param caller: event name used for log entries ("push"/"onstop").
        :type caller: str
        :param flush: passed through to ``self.render.next_batch``; onstop
            passes True, push passes False.
        :type flush: bool
        """
        qsize, nmetrics, bmetrics = self.render.next_batch(self.batch, flush)
        if qsize > self.warn:
            self.log.warn(caller, qsize=qsize, warn=self.warn)
        while nmetrics > 0:
            self.log.debug(caller, action="sending", nmetrics=nmetrics)
            self.send(bmetrics)
            qsize, nmetrics, bmetrics = self.render.next_batch(self.batch,
                                                               flush)

    def onstop(self):
        """Flush any remaining metrics."""
        self.log.debug("onstop", action="flushing")
        self._send_batches("onstop", True)
        self.log.debug("onstop", action="flushed")

    def push(self, rset):
        """Render a result set onto our queue and send all full batches.

        :param rset: the result set whose ``results`` are rendered.
        :type rset: presumably plumd.ResultSet - confirm against caller
        """
        self.render.process(rset.results)
        self._send_batches("push", False)
264