import logging

from confluent_kafka import Consumer, KafkaError, KafkaException, TopicPartition

from kafka_influxdb.encoder.errors import EncoderError
from kafka_influxdb.reader.reader import ReaderAbstract

class Reader(ReaderAbstract):
    """
    A high-performance Kafka consumer based on confluent-kafka, which uses librdkafka internally.
    See: https://github.com/confluentinc/confluent-kafka-python
    """

    def _subscribe(self):
        """
        Subscribe to Kafka topics.

        A workaround for missing Zookeeper support in confluent-kafka-python is required here.
        Automatic partition rebalancing does not work with Kafka versions < 0.9.0.
        Therefore, we manually assign the partitions to the consumer for legacy Kafka versions.
        """
        if self.broker_version < self.KAFKA_VERSION_ZOOKEEPER_OPTIONAL:
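            # Manual assignment below hardcodes partitions 0-9; this assumes the
            # topic has at most 10 partitions (any further partitions would go unread).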
            self.consumer.assign([TopicPartition(self.topic, p) for p in range(0, 10)])
        else:
            self.consumer.subscribe([self.topic])

    def _setup_connection(self):
        """
        Confluent-Kafka configuration.
        """
        # TODO: Test async commit handling (self.consumer.commit(async=False))
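        # Note: 'offset.store.method' and the nested 'default.topic.config' are
        # older librdkafka-style settings; newer confluent-kafka releases accept
        # topic properties such as 'auto.offset.reset' at the top level instead.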
        connection = {
            'bootstrap.servers': self.host + ":" + self.port,
            'group.id': self.group,
            'offset.store.method': 'broker',
            'default.topic.config': {
                # TODO: Make this configurable
                'auto.offset.reset': 'largest'  # or 'smallest' to start from the beginning
            }
        }
        # Add a fallback flag for legacy Kafka versions.
        if self.broker_version < self.KAFKA_VERSION_ZOOKEEPER_OPTIONAL:
            connection['broker.version.fallback'] = self.broker_version

        return connection

    def _connect(self):
        """
        Connect to Kafka and subscribe to the topic.
        """
        connection = self._setup_connection()
        logging.info("Connecting to Kafka with the following settings:\n %s...", connection)
        self.consumer = Consumer(**connection)
        self._subscribe()

    def _handle_read(self):
        """
        Read messages from Kafka.
        """
        while True:
            msg = self.consumer.poll(timeout=1.0)
            if __debug__:
                logging.debug(msg)
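            # poll() returns None if no message arrived within the timeout.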
            if msg is None:
                continue
            if msg.error():
                self._handle_error(msg)
            else:
                # Proper message
                if __debug__:
                    logging.debug('%s [%d] at offset %d with key %s:\n',
                                  msg.topic(), msg.partition(), msg.offset(), str(msg.key()))
                yield msg.value().rstrip()

    @staticmethod
    def _handle_error(msg):
        if not msg.error():
            return
        # Error or event
        if msg.error().code() == KafkaError._PARTITION_EOF:
            # End of partition event
            logging.info('%s [%d] reached end at offset %d with key %s\n',
                         msg.topic(), msg.partition(), msg.offset(), str(msg.key()))
        else:
            raise EncoderError(msg.error())
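

# --- Usage sketch (illustrative, not part of the upstream module) ---
# A minimal sketch of how this Reader might be driven. The constructor
# arguments below are assumptions: the actual parameters are defined by
# ReaderAbstract, which is not shown here. _connect() and _handle_read()
# are internal methods and are called directly only for illustration; the
# public entry point of ReaderAbstract presumably wraps them.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    # Hypothetical constructor signature; adjust to the actual ReaderAbstract API.
    reader = Reader('localhost', '9092', 'kafka-influxdb', 'metrics')
    reader._connect()
    for raw_message in reader._handle_read():
        print(raw_message)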