Encoder.format_measurement_name()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 4
dl 0
loc 9
rs 9.2
c 1
b 0
f 1
1
try:
2
    import ujson as json
3
except ImportError:
4
    import json
5
6
import logging
7
8
try:
9
    # Test for mypy support (requires Python 3)
10
    from typing import List, Text
11
except:
12
    pass
13
14
15
class Encoder(object):
    """
    An encoder for the Collectd JSON format
    See https://collectd.org/wiki/index.php/JSON

    Each input line is expected to hold a JSON array of sample objects.
    Every sample is turned into one text record of the form

        <measurement>[,<tags>] <fields> <time>

    Sample measurements:

    [{"values":[0],"dstypes":["derive"],"dsnames":["value"],"time":1436372292.412,"interval":10.000,"host":"26f2fc918f50","plugin":"cpu","plugin_instance":"1","type":"cpu","type_instance":"interrupt"}]

    [
       {
         "values":  [1901474177],
         "dstypes":  ["counter"],
         "dsnames":    ["value"],
         "time":      1280959128,
         "interval":          10,
         "host":            "leeloo.octo.it",
         "plugin":          "cpu",
         "plugin_instance": "0",
         "type":            "cpu",
         "type_instance":   "idle"
       }
    ]

    The following measurement format is also supported, which has more than one value for each sample.
    [{"values":[0.2, 0.3],"dstypes":["derive"],"dsnames":["cpu_usage", "mem_usage"],"time":1436372292.412,"interval":10.000,"host":"26f2fc918f50","plugin":"cpu","plugin_instance":"1","type":"cpu","type_instance":"interrupt"}]
    """

    def encode(self, msg):
        # type: (bytes) -> List[Text]
        """
        Convert a newline-separated batch of Collectd JSON lines into records.

        :param msg: raw bytes; each line should contain a JSON array of samples
        :return: list of encoded records, one per valid sample

        Lines that are not valid JSON, lines whose top-level value is not an
        array, and samples that cannot be encoded are logged at debug level
        and skipped; they never abort the rest of the batch.
        """
        measurements = []

        for line in msg.decode().split("\n"):
            try:
                json_object = self.parse_line(line)
            except ValueError as e:
                logging.debug("Error in encoder: %s", e)
                continue

            # A scalar such as 5 or null is valid JSON but cannot be iterated;
            # without this guard the loop below raises an uncaught TypeError.
            if not isinstance(json_object, list):
                logging.debug(
                    "Error in encoder: expected a JSON array, got %s",
                    type(json_object).__name__)
                continue

            for entry in json_object:
                try:
                    # to set plugin, plugin_instance as the measurement name, just need pass ['plugin', 'plugin_instance']
                    measurement = Encoder.format_measurement_name(
                        entry, ['plugin', 'plugin_instance', 'type'])
                    tags = Encoder.format_tags(
                        entry, ['host', 'type_instance'])
                    value = Encoder.format_value(entry)
                    timestamp = Encoder.format_time(entry)
                    # A record without a name or without any field is not
                    # writable, so route it through the skip path below.
                    if not measurement:
                        raise ValueError("entry has no measurement name")
                    if not value:
                        raise ValueError("entry has no field values")
                    measurements.append(Encoder.compose_data(
                        measurement, tags, value, timestamp))
                except Exception as e:
                    logging.debug("Error in input data: %s. Skipping.", e)
                    continue
        return measurements

    @staticmethod
    def parse_line(line):
        """
        Parse one line of JSON text.

        The decoder is called without a precise-float option: for influxdb
        version > 0.9 the timestamp is written as an integer, so the exact
        float representation of "time" no longer matters.

        :raises ValueError: if the line is not valid JSON
        """
        return json.loads(line)

    # following methods are added to support customizing measurement name, tags much more flexible
    @staticmethod
    def compose_data(measurement, tags, value, time):
        """
        Assemble one record from its already formatted parts.

        :param measurement: measurement name
        :param tags: comma-separated "key=value" pairs, may be empty
        :param value: comma-separated "field=value" pairs
        :param time: timestamp
        :return: "<measurement>,<tags> <value> <time>", or
                 "<measurement> <value> <time>" when there are no tags
        """
        if tags:
            return "{0!s},{1!s} {2!s} {3!s}".format(
                measurement, tags, value, time)
        # No tags: leave out the comma so the name is not followed by a
        # dangling separator.
        return "{0!s} {1!s} {2!s}".format(measurement, value, time)

    @staticmethod
    def format_measurement_name(entry, args):
        """
        Build the measurement name by joining selected entry values with '_'.

        :param entry: one decoded sample (a dict)
        :param args: keys to look up, in the order they should appear
        :return: the joined name; keys that are missing, None or '' are left
                 out so that no extra '_' is produced
        """
        parts = []
        for key in args:
            if key not in entry:
                continue
            part = entry[key]
            # avoid to add extra _ if some entry value is None or empty
            if part is None or part == '':
                continue
            parts.append(part)
        return '_'.join(parts)

    @staticmethod
    def format_tags(entry, args):
        """
        Build the tag set as comma-separated "key=value" pairs.

        :param entry: one decoded sample (a dict)
        :param args: keys to emit as tags, in output order
        :return: the tag string; keys that are missing, None or '' are left
                 out, so the result may be empty
        """
        pairs = []
        for key in args:
            if key not in entry:
                continue
            tag_value = entry[key]
            # to avoid add None (or an empty string) as tag value
            if tag_value is None or tag_value == '':
                continue
            pairs.append("{0!s}={1!s}".format(key, tag_value))
        return ','.join(pairs)

    @staticmethod
    def format_time(entry):
        """Return entry['time'] truncated to an integer."""
        return int(float(entry['time']))

    @staticmethod
    def format_value(entry):
        """
        Build the field set of a sample.

        A sample with exactly one value is written as "value=<v>". Otherwise
        each value is paired with the name at the same position in
        entry['dsnames'] and the pairs are joined with ','.

        :param entry: one decoded sample with 'values' (and 'dsnames' unless
                      there is exactly one value)
        :return: the field string; empty when there are no values
        """
        values = entry['values']
        if len(values) == 1:
            return "value={0!s}".format(values[0])

        # influxdb supports writing a record with multiple field values.
        # e.g: 'cpu_load_short,host=server01,region=us-west mem=0.1,cpu=0.2 1422568543702900257'
        return ','.join(
            "{0!s}={1!s}".format(key, value)
            for key, value in zip(entry['dsnames'], values))
119