import logging

from confluent_kafka import Consumer, KafkaError, KafkaException, TopicPartition

from kafka_influxdb.encoder.errors import EncoderError
from kafka_influxdb.reader.reader import ReaderAbstract
class Reader(ReaderAbstract):
    """
    A high-performance Kafka consumer based on confluent-kafka, which uses librdkafka internally.
    See: https://github.com/confluentinc/confluent-kafka-python
    """

    def _subscribe(self):
        """
        Subscribe to Kafka topics.

        A workaround for missing Zookeeper support in confluent-python is required here.
        Automatic partition rebalancing is not working with Kafka Versions < 0.9.0.
        Therefore we manually assign the partitions to the consumer for legacy Kafka versions.
        """
        legacy_broker = self.broker_version < self.KAFKA_VERSION_ZOOKEEPER_OPTIONAL
        if legacy_broker:
            # Manual assignment: pin the first ten partitions of the topic to this consumer.
            partitions = [TopicPartition(self.topic, index) for index in range(10)]
            self.consumer.assign(partitions)
        else:
            # Modern brokers support automatic group rebalancing.
            self.consumer.subscribe([self.topic])

    def _setup_connection(self):
        """
        Confluent-Kafka configuration
        """
        # TODO: Test async commit handling (self.consumer.commit(async=False))
        settings = {
            'bootstrap.servers': ":".join((self.host, self.port)),
            'group.id': self.group,
            'offset.store.method': 'broker',
            'default.topic.config': {
                # TODO: Make this configurable
                'auto.offset.reset': 'largest'  # smallest
            }
        }
        # Add additional flag based on the Kafka version.
        legacy_broker = self.broker_version < self.KAFKA_VERSION_ZOOKEEPER_OPTIONAL
        if legacy_broker:
            settings['broker.version.fallback'] = self.broker_version
        return settings

    def _connect(self):
        """
        Connect to Kafka and subscribe to the topic
        """
        config = self._setup_connection()
        logging.info("Connecting to Kafka with the following settings:\n %s...", config)
        self.consumer = Consumer(**config)
        self._subscribe()

    def _handle_read(self):
        """
        Read messages from Kafka.
        """
        # Generator: yields raw message payloads (trailing whitespace stripped)
        # until the caller stops iterating.
        while True:
            message = self.consumer.poll(timeout=1.0)
            if __debug__:
                logging.debug(message)
            if message is None:
                # Poll timed out without a message; try again.
                continue
            if message.error():
                self._handle_error(message)
                continue
            # Proper message
            if __debug__:
                logging.debug('%s [%d] at offset %d with key %s:\n',
                              message.topic(), message.partition(), message.offset(), str(message.key()))
            yield message.value().rstrip()

    @staticmethod
    def _handle_error(msg):
        # Distinguish benign partition-EOF events from real consumer errors.
        if not msg.error():
            return
        # Error or event
        error_code = msg.error().code()
        if error_code != KafkaError._PARTITION_EOF:
            raise EncoderError(msg.error())
        # End of partition event
        logging.info('%s [%d] reached end at offset %d with key %s\n',
                     msg.topic(), msg.partition(), msg.offset(), str(msg.key()))