Completed
Push — master ( 8d751b...7e5eed )
by Kenny
58s
created

plumd.plugins.writers.InfluxdbRender.__init__()   A

Complexity

Conditions 1

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 5
rs 9.4285
1
# -*- coding: utf-8 -*-
2
3
__author__ = 'Kenny Freeman'
4
__email__ = '[email protected]'
5
__license__ = "ISCL"
6
__docformat__ = 'reStructuredText'
7
8
import socket
9
import logging
10
import httplib
11
12
import requests # pip install requests
13
14
import plumd
15
import plumd.util
16
import plumd.plugins
17
18
# The http requests made through the requests library were generating log
# entries; the settings below quieten them. Approach taken from:
# http://stackoverflow.com/questions/11029717/how-do-i-disable-log-messages-from-the-requests-library
rlog = logging.getLogger("requests.packages.urllib3")
rlog.setLevel(logging.ERROR)
rlog.propagate = True
rlog.addHandler(logging.NullHandler())
httplib.HTTPConnection.debuglevel = 0
25
26
27
def format_key_meta(meta):
    """Return a formatted metadata string for use as a string representing
    part of an influxdb measurement key.

    Every entry of ``meta.items`` is rendered as a ``key=value`` pair and the
    pairs are joined with commas. An empty string is returned when
    ``meta.items`` has no entries.

    :param meta: :class:`plumd.Meta` object.
    :type meta: plumd.Meta
    :rtype: str
    """
    # generate metadata string
    pairs = []
    for mkey, mval in meta.items:
        mkey = plumd.util.escape_tag(mkey)
        # the value to render is stored at index 1 of each entry
        mval = mval[1]
        if isinstance(mval, str):
            mval = plumd.util.escape_tag(mval)
        elif isinstance(mval, bool):
            # booleans are written in short form
            mval = "t" if mval else "f"
        pairs.append("{0}={1}".format(mkey, mval))
    return ",".join(pairs)
46
47
48
def format_field_metrics(metrics):
    """Return a list of formatted strings for use as a string representing
    part of an influxdb measurement field.

    Each metric is rendered as ``name=value``. String values are passed
    through ``plumd.util.escape_field``, booleans become ``t``/``f`` and
    integers get an ``i`` suffix; any other value is formatted as is.

    :param metrics: The list of metrics to format; index 0 of each metric is
        its name and index 1 is its value.
    :type metrics: list
    :returns: one ``name=value`` string per metric, in input order.
    :rtype: list
    """
    ret = []
    for metric in metrics:
        mname = plumd.util.escape_tag(metric[0])
        mval = metric[1]
        if isinstance(mval, str):
            mval = plumd.util.escape_field(mval)
        elif isinstance(mval, bool):
            # bool must be tested before int: bool is a subclass of int
            mval = "t" if mval else "f"
        elif isinstance(mval, int):
            mval = "{0}i".format(mval)
        ret.append("{0}={1}".format(mname, mval))
    return ret
69
70
71
class InfluxdbRender(plumd.Render):
    """Renders metrics in an influxdb writable format.

    As of this writing the latest influxdb line protocol documentation is at:
    https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/

    Plumd renders metrics like:

    cpu,host=some,meta=values idle=90.0,user=5.0,system=5.0 <ts>\n
    disk,host=some,meta=values,dev=/dev/sda free=95.0,used=5.0 <ts>\n

    where host and metric metadata map to measurement tags and the metric
    values for a single result get comma separated as field values.

    Each result in a result set generates a new metric line and each metric
    line is queued in a deque for sending.

    :param rconfig: A Conf object initialized with the plugin configuration.
    :type rconfig: plumd.conf.Conf
    """

    def __init__(self, rconfig):
        """Initialize the renderer and cache the host metadata string.

        :param rconfig: A Conf object initialized with the plugin
            configuration.
        :type rconfig: plumd.conf.Conf
        """
        super(InfluxdbRender, self).__init__(rconfig)

        # generate host metadata string once on init
        self.meta_str = format_key_meta(self.meta)

    def process(self, results):
        """Process the metrics into the influxdb line protocol (as of v0.9).

        One line is appended to ``self.metrics`` for each result that has at
        least one metric. Results without metrics are skipped, since a line
        without fields is not valid line protocol.

        :param results: an iterable of
            ``(timestamp, name, meta, metrics)`` tuples.
        :type results: generator
        """
        # render each metric in the influxdb line format, append to self.metrics
        # (self._time, robj.name, robj.meta, robj.metrics)
        for (ts, rname, rmeta, metrics) in results:
            # since each metric in metrics is from the same result
            # we can include them in the same line
            field_str = ",".join(format_field_metrics(metrics))
            if not field_str:
                # nothing to write for this result
                continue

            # build the key part: name, then host metadata, then the
            # result metadata, skipping whichever metadata is empty
            key_str = rname
            if self.meta_str:
                key_str = "{0},{1}".format(rname, self.meta_str)
            rmeta_str = format_key_meta(rmeta)
            if rmeta_str:
                key_str = "{0},{1}".format(key_str, rmeta_str)

            # now combine it all - note influx uses nanosecond timestamps
            # see http://crsmithdev.com/arrow/, X is timestamp and SSS is ms
            ts_str = ts.format("XSSS000000")

            # add the rendered metric string to our deque of metrics
            # this allows for "\n".join() when sending
            self.metrics.append(
                "{0} {1} {2}".format(key_str, field_str, ts_str))
132
133
134
135
class Influxdb(plumd.plugins.Writer):
    """Influxdb sender.

    Renders incoming result sets with :class:`InfluxdbRender` and sends them
    in batches over http or udp, depending on the ``protocol`` setting.
    """
    defaults = {
        'username': 'admin',
        'password': 'admin',
        'database': 'metrics',
        'host':     '127.0.0.1',
        'port':     8086,
        'protocol': "http", # http or udp : todo: udp
        'batch':    64,     # send batches of this size
        'retries':  3,      # retry failed requests
        'timeout':  5,      # timeout http posts in 5 seconds
        'maxqueue': 8192,   # maximum number of metrics to queue
        'warnqueue': 1024   # print warning if queue size > this
    }

    def __init__(self, log, config):
        """Influxdb sender.

        :param log: a structlog logger instance.
        :type log: structlog logger
        :param config: a plumd.config.Conf configuration helper instance.
        :type config: plumd.config.Conf
        :raises plumd.ConfigError: if the configured protocol is neither
            ``http`` nor ``udp``.
        """
        super(Influxdb, self).__init__(log, config)
        self.config = config
        proto = config.get('protocol')
        self.batch = config.get('batch')
        self.warn = config.get('warnqueue')
        # read for every protocol: the udp branch logs the timeout as well
        self.retries = config.get('retries')
        self.timeout = config.get('timeout')
        self.socket = None
        proto_name = proto.lower()
        if proto_name == "http":
            self.session = requests.Session()
            self.session.auth = (config.get('username'), config.get('password'))
            content_type = {'Content-type': 'application/octet-stream'}
            self.session.headers.update(content_type)
            self.send = self.send_http
            args = [config.get('host'), config.get('port'),
                    config.get('database')]
            self.url = "http://{0}:{1}/write?db={2}".format(*args)
            self.log.debug("__init__", timeout=self.timeout, url=self.url)
        elif proto_name == "udp":
            # udp has no auth or db (configured on receiver side)
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.addr = (config.get('host'), config.get('port'))
            self.send = self.send_udp
            self.log.debug("__init__", timeout=self.timeout, address=self.addr)
        else:
            # report the configured value as given
            raise plumd.ConfigError("unknown protocol: {0}".format(proto))
        self.render = InfluxdbRender(self.config)

    def send_udp(self, metrics):
        """Send the batch of metrics to influxdb via UDP protocol.

        Any exception raised while sending is logged at debug level and the
        batch is dropped.

        :param metrics: The list of pre-formated metrics from our renderer.
        :type metrics: list
        """
        mstr = "\n".join(metrics) + "\n"
        try:
            self.socket.sendto(mstr, self.addr)
        except Exception as e:
            self.log.debug("send_udp", exception=e)

    def send_http(self, metrics):
        """Send the batch of metrics to influxdb over http.

        The batch is posted up to ``self.retries`` times; the method returns
        on the first 2xx response. If every attempt fails the batch is
        dropped and the failure is logged.

        :param metrics: The list of pre-formated metrics from our renderer.
        :type metrics: list
        """
        rdat = "\n".join(metrics) + "\n"
        url = self.url
        timeout = self.timeout
        nretries = self.retries
        while nretries > 0:
            nretries -= 1
            try:
                resp = self.session.post(url, data=rdat, timeout=timeout)
            except Exception as e:
                # no response object exists when the post itself raised
                msg = "exception during post"
                self.log.error("send_http", exception=e, msg=msg,
                               response="None")
                continue
            # requests module ok codes doesn't include 204...
            if 200 <= resp.status_code < 300:
                return
            self.log.debug("send_http", action="retrying",
                           response=resp.status_code, text=resp.text)
        self.log.debug("send_http", action="failed", reason="too many retries")

    def onstop(self):
        """Flush any remaining metrics."""
        self.log.debug("onstop", action="flushing")
        # get the next batch of metrics to send
        qsize, nmetrics, bmetrics = self.render.next_batch(self.batch, True)
        if qsize > self.warn:
            self.log.warn("onstop", qsize=qsize, warn=self.warn)
        while nmetrics > 0:
            self.log.debug("onstop", action="sending", metrics=bmetrics)
            self.send(bmetrics)
            # and the next batch until there are no more batches
            qsize, nmetrics, bmetrics = self.render.next_batch(self.batch, True)
        self.log.debug("onstop", action="flushed")

    def push(self, rset):
        """Render metrics onto our deque and send all full batches.

        :param rset: the result set whose ``results`` are rendered and
            queued for sending.
        """
        # record this batch of metrics
        self.render.process(rset.results)

        # get the next full batch of metrics to send
        qsize, nmetrics, bmetrics = self.render.next_batch(self.batch, False)
        if qsize > self.warn:
            self.log.warn("push", qsize=qsize)
        while nmetrics > 0:
            self.send(bmetrics)
            # and the next batch until there are no more full batches
            qsize, nmetrics, bmetrics = self.render.next_batch(self.batch,
                                                               False)
267