Completed
Push — master ( ee38bb...21f330 )
by Matthias
01:45 queued 26s
created

kafka_influxdb.encoder.Encoder.parse_line()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 3
rs 10
1
# PyPy does not have ultrajson
2
# See https://github.com/esnme/ultrajson/issues/98
3
try:
4
    import ujson as json
5
except ImportError:
6
    import json
7
8
import logging
9
10
11
class Encoder(object):
    """
    An encoder for the Collectd JSON format
    See https://collectd.org/wiki/index.php/JSON

    Each input line is expected to hold a JSON array of collectd entries.
    Every entry is turned into one string of the form

        <plugin>[-<plugin_instance>]_<plugin>-<type_instance>,host=<host> value=<value> <timestamp>

    Sample measurements:

    [{"values":[0],"dstypes":["derive"],"dsnames":["value"],"time":1436372292.412,"interval":10.000,"host":"26f2fc918f50","plugin":"cpu","plugin_instance":"1","type":"cpu","type_instance":"interrupt"}]

    [
       {
         "values":  [1901474177],
         "dstypes":  ["counter"],
         "dsnames":    ["value"],
         "time":      1280959128,
         "interval":          10,
         "host":            "leeloo.octo.it",
         "plugin":          "cpu",
         "plugin_instance": "0",
         "type":            "cpu",
         "type_instance":   "idle"
       }
    ]
    """

    def encode(self, msg):
        """
        Convert a raw collectd JSON message into a list of measurement strings.

        :param msg: The message payload. Objects with a ``decode`` method
            (e.g. ``bytes``) are decoded first; anything else is used as
            text directly. The payload may contain several newline-separated
            JSON documents.
        :return: A list with one formatted string per collectd entry. Lines
            that are not valid JSON are logged at debug level and skipped.
        :raises KeyError: If an entry lacks ``plugin``, ``values``, ``time``,
            ``type_instance`` or ``host``.
        """
        # Accept already-decoded text as well as raw bytes.
        text = msg.decode() if hasattr(msg, 'decode') else msg

        measurements = []
        for line in text.split("\n"):
            try:
                json_object = self.parse_line(line)
            except ValueError as e:
                # Best effort: a broken (or empty) line must not drop the
                # remaining lines of the message.
                logging.debug("Error in encoder: %s", e)
                continue
            for entry in json_object:
                measurement = [entry['plugin']]
                # Check if plugin_instance is set and not empty
                if entry.get('plugin_instance'):
                    measurement.append('-')
                    measurement.append(entry['plugin_instance'])
                # Todo: Read all values from collect json message
                value = str(entry['values'][0])
                # Always use millisecond precision for the timestamp
                timestamp = "{:.3f}".format(entry['time'])
                # NOTE(review): the plugin name is emitted a second time
                # below; possibly entry['type'] was intended - confirm
                # before changing, since it alters the measurement names.
                measurement.extend([
                    '_',
                    entry['plugin'],
                    '-',
                    entry['type_instance'],
                    ',',
                    'host=',
                    entry['host'],
                    ' ',
                    'value',
                    '=',
                    value,
                    ' ',
                    timestamp,
                ])
                measurements.append(''.join(measurement))
        return measurements

    @staticmethod
    def parse_line(line):
        """
        Parse a single line of JSON text.

        :param line: The JSON document as text.
        :return: The decoded Python object.
        :raises ValueError: If the line is not valid JSON.
        """
        try:
            # Set flag for float precision to get the same
            # results for Python 2 and 3. This is a ujson option and has
            # to be passed by keyword.
            return json.loads(line, precise_float=True)
        except TypeError:
            # The standard library json module (used when ujson is not
            # available) rejects the ujson-only option; parse without it.
            return json.loads(line)
77