Completed
Push — master ( 21f330...5cbcf8 )
by Matthias
01:02
created

kafka_influxdb.encoder.Encoder.format_measurement_name()   A

Complexity

Conditions 4

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 4
dl 0
loc 9
rs 9.2
1
# PyPy does not have ultrajson
2
# See https://github.com/esnme/ultrajson/issues/98
3
try:
4
    import ujson as json
5
except ImportError:
6
    import json
7
8
import logging
9
10
11
class Encoder(object):
    """
    An encoder for the Collectd JSON format
    See https://collectd.org/wiki/index.php/JSON

    Each input line is expected to hold a JSON array of collectd
    measurement objects. Every object is turned into one string of the
    form ``<measurement>,<tags> value=<value> <time>``.

    Sample measurements:

    [{"values":[0],"dstypes":["derive"],"dsnames":["value"],"time":1436372292.412,"interval":10.000,"host":"26f2fc918f50","plugin":"cpu","plugin_instance":"1","type":"cpu","type_instance":"interrupt"}]

    [
       {
         "values":  [1901474177],
         "dstypes":  ["counter"],
         "dsnames":    ["value"],
         "time":      1280959128,
         "interval":          10,
         "host":            "leeloo.octo.it",
         "plugin":          "cpu",
         "plugin_instance": "0",
         "type":            "cpu",
         "type_instance":   "idle"
       }
    ]
    """

    def encode(self, msg):
        """
        Convert a raw message into a list of formatted measurement strings.

        :param msg: bytes-like payload; it is decoded and split on newlines,
            and every line is parsed as JSON on its own.
        :return: list of strings, one per measurement entry found.

        Lines that fail to parse as JSON are logged at debug level and
        skipped, so one bad line does not discard the rest of the message.
        """
        measurements = []

        for line in msg.decode().split("\n"):
            try:
                json_object = self.parse_line(line)
            except ValueError as e:
                logging.debug("Error in encoder: %s", e)
                continue
            for entry in json_object:
                # The measurement name and the tag set are built from
                # configurable key lists; e.g. pass
                # ['plugin', 'plugin_instance'] to name the measurement
                # after those two fields only.
                measurement = self.format_measurement_name(
                    entry, ['plugin', 'plugin_instance', 'type'])
                tags = self.format_tags(entry, ['host', 'type_instance'])
                value = self.format_value(entry)
                timestamp = self.format_time(entry)
                measurements.append(
                    self.compose_data(measurement, tags, value, timestamp))
        return measurements

    @staticmethod
    def parse_line(line):
        """
        Parse one line of input as JSON.

        :raises ValueError: if the line is not valid JSON.
        """
        # No float-precision flag is passed: for influxdb version > 0.9
        # the timestamp is emitted as an integer (see format_time).
        return json.loads(line)

    # The following methods support customizing the measurement name and
    # tags in a flexible way.
    @staticmethod
    def compose_data(measurement, tags, value, time):
        """
        Assemble the final ``<measurement>[,<tags>] value=<value> <time>``
        string.

        When ``tags`` is empty the separating comma is left out, so the
        result never contains a dangling ``,`` before the field set.
        """
        if tags:
            return "%s,%s value=%s %s" % (measurement, tags, value, time)
        return "%s value=%s %s" % (measurement, value, time)

    @staticmethod
    def format_measurement_name(entry, args):
        """
        Build the measurement name by joining selected entry values with '_'.

        :param entry: mapping holding one collectd measurement.
        :param args: keys whose values make up the name, in order.
        :return: the joined name; keys that are missing, None or empty
            are skipped so no extra '_' is produced.
        """
        name = []
        for arg in args:
            if arg not in entry:
                continue
            part = entry[arg]
            # Skip both None and '' so they add neither a stray '_' nor
            # a TypeError in join().
            if part is None or part == '':
                continue
            # %-formatting coerces non-string values and keeps unicode
            # intact on Python 2.
            name.append("%s" % (part,))
        return '_'.join(name)

    @staticmethod
    def format_tags(entry, args):
        """
        Build the comma separated ``key=value`` tag string.

        :param entry: mapping holding one collectd measurement.
        :param args: keys to emit as tags, in order.
        :return: the tag string; keys that are missing, None or empty
            are skipped.
        """
        tag = []
        for arg in args:
            if arg not in entry:
                continue
            tag_value = entry[arg]
            # Never emit 'key=None' or 'key=' as a tag.
            if tag_value is None or tag_value == '':
                continue
            tag.append("%s=%s" % (arg, tag_value))
        return ','.join(tag)

    @staticmethod
    def format_time(entry):
        """Return the entry's 'time' truncated to an integer."""
        return int(float(entry['time']))

    @staticmethod
    def format_value(entry):
        """
        Return the value part of a measurement.

        A single-element 'values' list yields that element unchanged; any
        other length yields the values joined by spaces and wrapped in
        double quotes.
        """
        values = entry['values']
        if len(values) == 1:
            return values[0]
        # Support multiple values by emitting them as one quoted string.
        joined = ' '.join(str(item) for item in values)
        return '"%s"' % joined
101
102
103