1
|
|
|
import logging |
2
|
|
|
from confluent_kafka import Consumer, KafkaError, KafkaException, TopicPartition |
3
|
|
|
from kafka_influxdb.encoder.errors import EncoderError |
4
|
|
|
from kafka_influxdb.reader.reader import ReaderAbstract |
5
|
|
|
|
6
|
|
|
|
7
|
|
|
class Reader(ReaderAbstract):
    """
    A high-performance Kafka consumer based on confluent-kafka, which uses librdkafka internally.
    See: https://github.com/confluentinc/confluent-kafka-python
    """

    def __init__(self, host, port, group, topic,
                 broker_version=ReaderAbstract.KAFKA_VERSION_ZOOKEEPER_OPTIONAL):
        """
        Create a confluent-kafka based reader.

        :param host: Kafka broker hostname (string).
        :param port: Kafka broker port (string; concatenated into bootstrap.servers).
        :param group: Kafka consumer group id.
        :param topic: Topic to consume from.
        :param broker_version: Kafka broker version string; versions older than
            KAFKA_VERSION_ZOOKEEPER_OPTIONAL trigger legacy workarounds
            (manual partition assignment, broker.version.fallback).
        """
        # Bug fix: previously the constant was always forwarded to the base
        # class, silently discarding the caller-supplied broker_version, so
        # the legacy-broker code paths below could never be activated through
        # the constructor. Also, the default is now qualified via
        # ReaderAbstract — the bare name was not defined at module level.
        super().__init__(host, port, group, topic, broker_version=broker_version)

    def _subscribe(self):
        """
        Subscribe to Kafka topics.

        A workaround for missing Zookeeper support in confluent-python is required here.
        Automatic partition rebalancing is not working with Kafka Versions < 0.9.0.
        Therefore we manually assign the partitions to the consumer for legacy Kafka versions.
        """
        if self.broker_version < self.KAFKA_VERSION_ZOOKEEPER_OPTIONAL:
            # Legacy brokers: no group coordinator, so assign partitions by hand.
            # TODO: the partition count (10) is hard-coded; make it configurable.
            self.consumer.assign([TopicPartition(self.topic, p) for p in range(0, 10)])
        else:
            self.consumer.subscribe([self.topic])

    def _setup_connection(self):
        """
        Build the confluent-kafka (librdkafka) configuration dict.

        :return: dict of librdkafka configuration properties.
        """
        # TODO: Test async commit handling (self.consumer.commit(async=False))
        connection = {
            'bootstrap.servers': self.host + ":" + self.port,
            'group.id': self.group,
            'offset.store.method': 'broker',
            'default.topic.config': {
                # TODO: Make this configurable
                'auto.offset.reset': self.offset  # 'largest' # smallest
            }
        }
        # Add additional flag based on the Kafka version.
        if self.broker_version < self.KAFKA_VERSION_ZOOKEEPER_OPTIONAL:
            # librdkafka needs to know it talks to a pre-0.9 broker.
            connection['broker.version.fallback'] = self.broker_version
        return connection

    def _connect(self):
        """
        Connect to Kafka and subscribe to the topic.
        """
        connection = self._setup_connection()
        logging.info("Connecting to Kafka with the following settings:\n %s...", connection)
        self.consumer = Consumer(**connection)
        self._subscribe()

    def _handle_read(self):
        """
        Read messages from Kafka.

        Generator: yields False on a poll timeout (so the caller can flush/idle)
        and the stripped message payload (bytes) for every proper message.
        Errors are delegated to _handle_error, which may raise EncoderError.
        """
        while True:
            msg = self.consumer.poll(timeout=1.0)
            if __debug__:
                logging.debug(msg)
            if msg is None:
                # Poll timed out — signal the caller that nothing arrived.
                yield False
                continue
            if msg.error():
                self._handle_error(msg)
            else:
                # Proper message
                if __debug__:
                    logging.debug('%s [%d] at offset %d with key %s:\n',
                                  msg.topic(), msg.partition(), msg.offset(), str(msg.key()))
                yield msg.value().rstrip()

    @staticmethod
    def _handle_error(msg):
        """
        Handle an error/event message returned by poll().

        End-of-partition events are benign and only logged; any other error
        is re-raised as EncoderError for the caller to handle.

        :param msg: confluent_kafka Message with a non-None error().
        :raises EncoderError: for any error other than _PARTITION_EOF.
        """
        if not msg.error():
            return
        # Error or event
        if msg.error().code() == KafkaError._PARTITION_EOF:
            # End of partition event
            logging.info('%s [%d] reached end at offset %d with key %s\n',
                         msg.topic(), msg.partition(), msg.offset(), str(msg.key()))
        else:
            raise EncoderError(msg.error())