Completed
Push — master ( 5cbcf8...d3d059 )
by Matthias
01:08
created

Encoder.__init__()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
import logging
2
from kafka_influxdb.encoder.escape_functions import influxdb_tag_escaper
3
4
5
class Encoder(object):
6
    """
7
    An encoder for the Collectd Graphite ASCII format
8
    See https://collectd.org/wiki/index.php/Graphite
9
10
    Sample measurements this encoder can handle:
11
    [prefix.]host.plugin.measurement[.postfix] value timestamp
12
13
    26f2fc918f50.load.load.shortterm 0.05 1436357630
14
    26f2fc918f50.load.load.midterm 0.05 1436357630
15
    26f2fc918f50.load.load.longterm 0.05 1436357630
16
17
    26f2fc918f50.cpu-0.cpu-user 30364 1436357630
18
19
    26f2fc918f50.memory.memory-buffered 743657472 1436357630
20
21
    The optional prefix and postifx can be set in the collectd plugin:
22
    <Plugin write_kafka>
23
        Property "metadata.broker.list" "kafka:9092"
24
        <Topic "metrics">
25
            Format Graphite
26
            GraphitePrefix "myprefix"
27
            GraphitePostfix "mypostfix"
28
        </Topic>
29
    </Plugin>
30
    """
31
32
    def __init__(self):
33
        self.escape_tag = influxdb_tag_escaper()
34
35
    def encode(self,
36
               msg,
37
               delimiter='.',
38
               prefix='',
39
               prefix_tag=None,
40
               postfix='',
41
               postfix_tag=None,
42
               ):
43
        """
44
        :param msg: Payload from reader
45
        :param delimiter: Delimiter between Graphite series parts
46
        :param prefix: Graphite prefix string
47
        :param prefix_tag: Tag to use for Graphite prefix
48
        :param postfix: Graphite postfix string
49
        :param postfix_tag: Tag to use for Graphite postfix
50
        :return: A list of encoded messages
51
        """
52
        # One message could consist of several measurements
53
        measurements = []
54
55
        for line in msg.decode().split("\n"):
56
            try:
57
                series, value, timestamp = line.split()
58
            except ValueError as e:
59
                logging.debug("Error in encoder: %s", e)
60
                continue
61
            # Strip prefix and postfix:
62
            series = series[len(prefix):len(series) - len(postfix)]
63
            # Split into tags
64
            hostname, measurement = series.split(delimiter, 1)
65
            measurement = measurement.replace(delimiter, '_')
66
67
            tags = {
68
                "host": hostname
69
            }
70
            if prefix_tag:
71
                if prefix.endswith(delimiter):
72
                    prefix = prefix[:-len(delimiter)]
73
                tags[prefix_tag] = prefix
74
            if postfix_tag:
75
                if postfix.endswith(delimiter):
76
                    postfix = postfix[:-len(delimiter)]
77
                tags[postfix_tag] = postfix
78
79
            encoded = ''.join([
80
                str(measurement),
81
                ',',
82
                ','.join('{}={}'.format(self.escape_tag(k), self.escape_tag(tags[k])) for k in tags),
83
                ' value=',
84
                str(value),
85
                ' ',
86
                timestamp
87
            ])
88
            measurements.append(encoded)
89
        return measurements
90