|
1
|
|
|
import logging |
|
2
|
|
|
from kafka_influxdb.encoder.escape_functions import influxdb_tag_escaper |
|
3
|
|
|
|
|
4
|
|
|
|
|
5
|
|
|
class Encoder(object): |
|
6
|
|
|
""" |
|
7
|
|
|
An encoder for the Collectd Graphite ASCII format |
|
8
|
|
|
See https://collectd.org/wiki/index.php/Graphite |
|
9
|
|
|
|
|
10
|
|
|
Sample measurements this encoder can handle: |
|
11
|
|
|
[prefix.]host.plugin.measurement[.postfix] value timestamp |
|
12
|
|
|
|
|
13
|
|
|
26f2fc918f50.load.load.shortterm 0.05 1436357630 |
|
14
|
|
|
26f2fc918f50.load.load.midterm 0.05 1436357630 |
|
15
|
|
|
26f2fc918f50.load.load.longterm 0.05 1436357630 |
|
16
|
|
|
|
|
17
|
|
|
26f2fc918f50.cpu-0.cpu-user 30364 1436357630 |
|
18
|
|
|
|
|
19
|
|
|
26f2fc918f50.memory.memory-buffered 743657472 1436357630 |
|
20
|
|
|
|
|
21
|
|
|
The optional prefix and postifx can be set in the collectd plugin: |
|
22
|
|
|
<Plugin write_kafka> |
|
23
|
|
|
Property "metadata.broker.list" "kafka:9092" |
|
24
|
|
|
<Topic "metrics"> |
|
25
|
|
|
Format Graphite |
|
26
|
|
|
GraphitePrefix "myprefix" |
|
27
|
|
|
GraphitePostfix "mypostfix" |
|
28
|
|
|
</Topic> |
|
29
|
|
|
</Plugin> |
|
30
|
|
|
""" |
|
31
|
|
|
|
|
32
|
|
|
def __init__(self): |
|
33
|
|
|
self.escape_tag = influxdb_tag_escaper() |
|
34
|
|
|
|
|
35
|
|
|
def encode(self, |
|
36
|
|
|
msg, |
|
37
|
|
|
delimiter='.', |
|
38
|
|
|
prefix='', |
|
39
|
|
|
prefix_tag=None, |
|
40
|
|
|
postfix='', |
|
41
|
|
|
postfix_tag=None, |
|
42
|
|
|
): |
|
43
|
|
|
""" |
|
44
|
|
|
:param msg: Payload from reader |
|
45
|
|
|
:param delimiter: Delimiter between Graphite series parts |
|
46
|
|
|
:param prefix: Graphite prefix string |
|
47
|
|
|
:param prefix_tag: Tag to use for Graphite prefix |
|
48
|
|
|
:param postfix: Graphite postfix string |
|
49
|
|
|
:param postfix_tag: Tag to use for Graphite postfix |
|
50
|
|
|
:return: A list of encoded messages |
|
51
|
|
|
""" |
|
52
|
|
|
# One message could consist of several measurements |
|
53
|
|
|
measurements = [] |
|
54
|
|
|
|
|
55
|
|
|
for line in msg.decode().split("\n"): |
|
56
|
|
|
try: |
|
57
|
|
|
series, value, timestamp = line.split() |
|
58
|
|
|
except ValueError as e: |
|
59
|
|
|
logging.debug("Error in encoder: %s", e) |
|
60
|
|
|
continue |
|
61
|
|
|
# Strip prefix and postfix: |
|
62
|
|
|
series = series[len(prefix):len(series) - len(postfix)] |
|
63
|
|
|
# Split into tags |
|
64
|
|
|
hostname, measurement = series.split(delimiter, 1) |
|
65
|
|
|
measurement = measurement.replace(delimiter, '_') |
|
66
|
|
|
|
|
67
|
|
|
tags = { |
|
68
|
|
|
"host": hostname |
|
69
|
|
|
} |
|
70
|
|
|
if prefix_tag: |
|
71
|
|
|
if prefix.endswith(delimiter): |
|
72
|
|
|
prefix = prefix[:-len(delimiter)] |
|
73
|
|
|
tags[prefix_tag] = prefix |
|
74
|
|
|
if postfix_tag: |
|
75
|
|
|
if postfix.endswith(delimiter): |
|
76
|
|
|
postfix = postfix[:-len(delimiter)] |
|
77
|
|
|
tags[postfix_tag] = postfix |
|
78
|
|
|
|
|
79
|
|
|
encoded = ''.join([ |
|
80
|
|
|
str(measurement), |
|
81
|
|
|
',', |
|
82
|
|
|
','.join('{}={}'.format(self.escape_tag(k), self.escape_tag(tags[k])) for k in tags), |
|
83
|
|
|
' value=', |
|
84
|
|
|
str(value), |
|
85
|
|
|
' ', |
|
86
|
|
|
timestamp |
|
87
|
|
|
]) |
|
88
|
|
|
measurements.append(encoded) |
|
89
|
|
|
return measurements |
|
90
|
|
|
|