1
|
|
|
import logging |
2
|
|
|
from kafka_influxdb.encoder.escape_functions import influxdb_tag_escaper |
3
|
|
|
|
4
|
|
|
|
5
|
|
|
class Encoder(object): |
6
|
|
|
""" |
7
|
|
|
An encoder for the Collectd Graphite ASCII format |
8
|
|
|
See https://collectd.org/wiki/index.php/Graphite |
9
|
|
|
|
10
|
|
|
Sample measurements this encoder can handle: |
11
|
|
|
[prefix.]host.plugin.measurement[.postfix] value timestamp |
12
|
|
|
|
13
|
|
|
26f2fc918f50.load.load.shortterm 0.05 1436357630 |
14
|
|
|
26f2fc918f50.load.load.midterm 0.05 1436357630 |
15
|
|
|
26f2fc918f50.load.load.longterm 0.05 1436357630 |
16
|
|
|
|
17
|
|
|
26f2fc918f50.cpu-0.cpu-user 30364 1436357630 |
18
|
|
|
|
19
|
|
|
26f2fc918f50.memory.memory-buffered 743657472 1436357630 |
20
|
|
|
|
21
|
|
|
The optional prefix and postifx can be set in the collectd plugin: |
22
|
|
|
<Plugin write_kafka> |
23
|
|
|
Property "metadata.broker.list" "kafka:9092" |
24
|
|
|
<Topic "metrics"> |
25
|
|
|
Format Graphite |
26
|
|
|
GraphitePrefix "myprefix" |
27
|
|
|
GraphitePostfix "mypostfix" |
28
|
|
|
</Topic> |
29
|
|
|
</Plugin> |
30
|
|
|
""" |
31
|
|
|
|
32
|
|
|
def __init__(self): |
33
|
|
|
self.escape_tag = influxdb_tag_escaper() |
34
|
|
|
|
35
|
|
|
def encode(self, |
36
|
|
|
msg, |
37
|
|
|
delimiter='.', |
38
|
|
|
prefix='', |
39
|
|
|
prefix_tag=None, |
40
|
|
|
postfix='', |
41
|
|
|
postfix_tag=None, |
42
|
|
|
): |
43
|
|
|
""" |
44
|
|
|
:param msg: Payload from reader |
45
|
|
|
:param delimiter: Delimiter between Graphite series parts |
46
|
|
|
:param prefix: Graphite prefix string |
47
|
|
|
:param prefix_tag: Tag to use for Graphite prefix |
48
|
|
|
:param postfix: Graphite postfix string |
49
|
|
|
:param postfix_tag: Tag to use for Graphite postfix |
50
|
|
|
:return: A list of encoded messages |
51
|
|
|
""" |
52
|
|
|
# One message could consist of several measurements |
53
|
|
|
measurements = [] |
54
|
|
|
|
55
|
|
|
for line in msg.decode().split("\n"): |
56
|
|
|
try: |
57
|
|
|
series, value, timestamp = line.split() |
58
|
|
|
except ValueError as e: |
59
|
|
|
logging.debug("Error in encoder: %s", e) |
60
|
|
|
continue |
61
|
|
|
# Strip prefix and postfix: |
62
|
|
|
series = series[len(prefix):len(series) - len(postfix)] |
63
|
|
|
# Split into tags |
64
|
|
|
hostname, measurement = series.split(delimiter, 1) |
65
|
|
|
measurement = measurement.replace(delimiter, '_') |
66
|
|
|
|
67
|
|
|
tags = { |
68
|
|
|
"host": hostname |
69
|
|
|
} |
70
|
|
|
if prefix_tag: |
71
|
|
|
if prefix.endswith(delimiter): |
72
|
|
|
prefix = prefix[:-len(delimiter)] |
73
|
|
|
tags[prefix_tag] = prefix |
74
|
|
|
if postfix_tag: |
75
|
|
|
if postfix.endswith(delimiter): |
76
|
|
|
postfix = postfix[:-len(delimiter)] |
77
|
|
|
tags[postfix_tag] = postfix |
78
|
|
|
|
79
|
|
|
encoded = ''.join([ |
80
|
|
|
str(measurement), |
81
|
|
|
',', |
82
|
|
|
','.join('{}={}'.format(self.escape_tag(k), self.escape_tag(tags[k])) for k in tags), |
83
|
|
|
' value=', |
84
|
|
|
str(value), |
85
|
|
|
' ', |
86
|
|
|
timestamp |
87
|
|
|
]) |
88
|
|
|
measurements.append(encoded) |
89
|
|
|
return measurements |
90
|
|
|
|