Completed
Push — master ( d058c9...ca4b9c )
by Kenny
45s
created

plumd.plugins.writers.Elastic.send()   F

Complexity

Conditions 14

Size

Total Lines 50

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 14
dl 0
loc 50
rs 2.6835

How to fix   Complexity   

Complexity

Complex methods like plumd.plugins.writers.Elastic.send() often do a lot of different things. To break such a method down, we need to identify a cohesive component within it. A common approach to find such a component is to look for locals/statements that share the same prefixes, or suffixes.

Once you have determined the statements that belong together, you can apply the Extract Method refactoring. If the extracted component makes sense as its own class, Extract Class is also a candidate.

1
# -*- coding: utf-8 -*-
2
3
__author__ = 'Kenny Freeman'
4
__email__ = '[email protected]'
5
__license__ = "ISCL"
6
__docformat__ = 'reStructuredText'
7
8
import itertools
9
import logging
10
import httplib
11
import json
12
13
import requests # pip install requests
14
15
import plumd
16
import plumd.plugins
17
18
# from: http://stackoverflow.com/questions/11029717/how-do-i-disable-log-messages-from-the-requests-library
19
# http requests were generating log entries.. were.
20
httplib.HTTPConnection.debuglevel = 0
21
rlog = logging.getLogger("requests.packages.urllib3")
22
rlog.addHandler(logging.NullHandler())
23
rlog.setLevel(logging.ERROR)
24
rlog.propagate = True
25
26
27
class ElasticRender(plumd.Render):
    """Renders metrics in an elasticsearch writable format which is simply
    a dictionary with the required bulk api parameters and metric details.

    NOTE: this class is considered experimental and is not fully functional.

    :param rconfig: A Conf object initialized with the plugin configuration.
    :type rconfig: plumd.conf.Conf
    """

    def __init__(self, rconfig):
        super(ElasticRender, self).__init__(rconfig)
        # get the configured values from the plugin configuration
        self.index = rconfig.get("index")  # es index bulk actions target
        self.type = rconfig.get("type")    # es document type for each metric

    def process(self, results):
        """Process the results into a format suitable for bulk api calls.

        Each processed result is appended to self.metrics as a two-line
        newline-terminated string: a bulk "index" action followed by the
        metric document (the es bulk api wants newline separated json).

        Results are in the format:

        ( time, result_name, result_meta, [metric, metric, metric] )

        :param results: iterable of result tuples in the format above
        :type results: iterable
        """
        # Maps metric class -> (metric_type label, document field name);
        # built once here to save a lookup in the loop below.
        field_map = { # this is here to save a lookup in the loop below
            plumd.Int:        ('int', 'int_val'),
            plumd.Counter:    ('counter', 'int_val'),
            plumd.Float:      ('float', 'float_val'),
            plumd.Gauge:      ('gauge', 'float_val'),
            plumd.Rate:       ('rate', 'float_val'),
            plumd.Timer:      ('timer', 'float_val'),
            plumd.String:     ('string', 'str_val'),
            plumd.Boolean:    ('bool', 'bool_val')
        }
        for (ts, rname, rmeta, metrics) in results:
            # NOTE(review): mname, name, mclass, val and meta are never bound
            # in this scope -- this loop raises NameError as written. It looks
            # like an inner "for metric in metrics" loop unpacking each metric
            # into (name, mclass, val) was lost, with mname/meta presumably
            # meant to be rname/rmeta; confirm against the Result/metric
            # structures before fixing. (The class is marked experimental.)
            rmetric = {
                'measurement': mname,
                'metric': name,
                'time': ts.isoformat()
            }
            if mclass not in field_map:
                # unrecognized metric class: keep the value but mark it unknown
                rmetric['unk_val'] = val
                rmetric['metric_type'] = "unknown"
            else:
                metric_type, metric_key = field_map[mclass]
                rmetric[metric_key] = val
                rmetric["metric_type"] = metric_type
            # todo: how to handle this better? (key collisions this way)
            # fold host meta data into metric document
            # NOTE(review): self.host_meta is not set in this class's
            # __init__; presumably provided by plumd.Render or the owning
            # writer -- verify.
            for key, val in self.host_meta.items():
                if key not in rmetric:
                    rmetric[key] = val
            # fold metric meta data into metric document
            for key, val in meta.items():
                if key not in rmetric:
                    rmetric[key] = val
            # es bulk api wants newline separated json strings
            index = { "index": {"_index" : self.index } }
            action_str = "{0}\n{1}\n".format(json.dumps(index),
                                             json.dumps(rmetric))
            # self.metrics is assumed to be initialized by plumd.Render
            self.metrics.append(action_str)
91
92
93
class Elastic(plumd.plugins.Writer):
    """Elasticsearch sender: renders metrics into bulk-api actions and
    posts them in batches, rotating through the configured hosts with
    retries on failure.

    NOTE: this class is considered experimental and is not fully functional.
    """
    defaults = {
        'proto':    'http',
        'hosts':    ['127.0.0.1:9200'],     # default list of hosts
        'index':    'metrics',              # name of es index for metrics
        'type':     'metric',               # type of document to create
        'batch':    64,                     # number of metrics per bulk req
        'retries':  3,                      # retry failed requests
        # NOTE: a duplicate 'timeout': 30 entry used to precede this one and
        # was silently clobbered; 5 was the effective value, so it is kept.
        'timeout':  5,                      # timeout http post requests
        'maxqueue': 8192,                   # maximum number of metrics to queue
        'warnqueue': 256                    # print a warning if the queue > this
    }

    def __init__(self, log, config):
        """Elasticsearch sender.

        NOTE: this class is considered experimental and is not fully functional.

        :param log: a structlog logger instance.
        :type log: structlog logger
        :param config: a plumd.config.Conf configuration helper instance.
        :type config: plumd.config.Conf
        """
        super(Elastic, self).__init__(log, config)
        self.host_meta = config.get('meta')    # host level metadata
        self.hosts = config.get('hosts')
        self.index = config.get('index')
        self.dtype = self.config.get('type')
        self.batch = self.config.get('batch')
        self.warn = self.config.get('warnqueue')
        self.retries = config.get('retries')
        self.timeout = config.get('timeout')
        # the documents have the index in them, so the url only routes
        url = "{0}://{1}/{2}/{3}/_bulk/"
        proto = self.config.get('proto')
        urls = [url.format(proto, h, self.index, self.dtype) for h in self.hosts]
        sessions = [requests.Session() for _ in self.hosts]
        # endless stream of (url, session) pairs to rotate through on retry
        self.sessions = itertools.cycle(zip(urls, sessions))
        self.render = ElasticRender(self.config)
        self.log.debug("__init__", timeout=self.timeout)

    def _parse_response(self, resp):
        """Extract bulk-api result details from a successful response.

        :param resp: a requests Response with an es bulk api json body.
        :return: (took, errors_flag, failed_items) -- defaults of
                 ("unknown", True, []) are kept if the body cannot be parsed.
        """
        (took, errs, failed) = ("unknown", True, [])
        try:
            robj = resp.json()
            took = robj.get('took', took)
            errs = robj.get('errors', errs)
            if errs and 'items' in robj:
                failed = list(robj['items'])
        except Exception as e:
            self.log.debug("send", exception=e, msg="error parsing response")
        return took, errs, failed

    def send(self, metrics):
        """Post one batch of pre-rendered bulk actions to elasticsearch.

        Rotates through the configured host urls, retrying up to
        self.retries times before giving up on the batch.

        :param metrics: The list of pre-formated metrics from our renderer.
        :type metrics: list
        """
        # Each rendered action string already ends in "\n" (see
        # ElasticRender.process); joining with "\n" used to insert blank
        # lines, which the es bulk api rejects.
        rdat = "".join(metrics)
        self.log.debug("send", index=self.index, action="posting")
        # cycle through each url, retry up to self.retries count
        nretries = self.retries
        timeout = self.timeout
        success = False
        for url, sess in self.sessions:
            nretries -= 1
            if nretries < 0:
                break
            resp = None
            try:
                self.log.debug("send", request=url, data=rdat, timeout=timeout)
                # pass the configured timeout (it was previously ignored)
                resp = sess.post(url, data=rdat, timeout=timeout)
            except Exception as e:
                rtxt = "None"
                if resp is not None and hasattr(resp, "text"):
                    rtxt = resp.text
                self.log.error("send", exception=e, msg="exception during post",
                               response=rtxt)
                continue
            if resp.status_code == requests.codes.ok:
                took, errs, failed = self._parse_response(resp)
                self.log.debug("send", action="SUCCESS", took=took,
                               errors=errs, failed=failed)
                success = True
                break
            self.log.debug("send", action="retrying",
                           response=resp.status_code)
        # Track success explicitly: the old "nretries > 0" check misreported
        # a success achieved on the final allowed attempt as a failure.
        if success:
            self.log.debug("send", action="complete")
        else:
            self.log.debug("send", action="failed", reason="too many retries")

    def _flush(self, caller, drain):
        """Send batches from the render queue until no batch remains.

        :param caller: log event name ("push" or "onstop").
        :param drain: passed to next_batch; True also flushes partial batches.
        """
        qsize, nmetrics, bmetrics = self.render.next_batch(self.batch, drain)
        if qsize > self.warn:
            # queue is growing faster than we can drain it
            self.log.warn(caller, qsize=qsize, warn=self.warn)
        while nmetrics > 0:
            self.log.debug("push", action="sending", metrics=bmetrics)
            self.send(bmetrics)
            # and the next batch until there are no more batches to send
            qsize, nmetrics, bmetrics = self.render.next_batch(self.batch, drain)

    def onstop(self):
        """Flush any remaining metrics, including partial batches."""
        self.log.debug("onstop", action="flushing")
        self._flush("onstop", True)
        self.log.debug("onstop", action="flushed")

    def push(self, rset):
        """Render a result set and send all full batches to elasticsearch.

        :param rset: a result set whose .results feed the renderer.
        """
        self.log.debug("push", action="processing", index=self.index,
                       dtype=self.dtype)
        # record this batch of metrics
        self.render.process(rset.results)
        # only full batches are sent here; onstop drains the remainder
        self._flush("push", False)
        self.log.debug("push", action="processed")
220