Completed · Pull Request — master (#59) · by Matthias · 36s · created

Reader.__init__()   Grade: A

Complexity:   Conditions 1
Size:         Total Lines 2
Duplication:  Lines 0, Ratio 0 %
Importance:   Changes 3, Bugs 0, Features 0

Metric                          Value
cc   (cyclomatic complexity)    1
c    (changes)                  3
b    (bugs)                     0
f    (features)                 0
dl   (duplicated lines)         0
loc  (lines of code)            2
rs   (rating score)             10
import logging
from confluent_kafka import Consumer, KafkaError, KafkaException, TopicPartition
from kafka_influxdb.encoder.errors import EncoderError
from kafka_influxdb.reader.reader import ReaderAbstract


class Reader(ReaderAbstract):
    """
    A high-performance Kafka consumer based on confluent-kafka, which uses librdkafka internally.
    See: https://github.com/confluentinc/confluent-kafka-python
    """

    def __init__(self, host, port, group, topic,
                 broker_version=ReaderAbstract.KAFKA_VERSION_ZOOKEEPER_OPTIONAL):
        # Forward all connection settings, including the caller's
        # broker_version, to the abstract base reader.
        super().__init__(host, port, group, topic, broker_version=broker_version)

    def _subscribe(self):
        """
        Subscribe to Kafka topics.

        A workaround for the missing Zookeeper support in confluent-kafka-python is required here.
        Automatic partition rebalancing does not work with Kafka versions < 0.9.0.
        Therefore we manually assign the partitions to the consumer for legacy Kafka versions.
        """
        if self.broker_version < self.KAFKA_VERSION_ZOOKEEPER_OPTIONAL:
            # Note: this compares version strings lexicographically, which works
            # for 0.8.x vs. 0.9.0 but would misorder e.g. "0.10.0"; a safer
            # comparison is sketched after this method. The partition count (10)
            # is hardcoded here.
            self.consumer.assign([TopicPartition(self.topic, p) for p in range(0, 10)])
        else:
            self.consumer.subscribe([self.topic])
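
    # A safer version comparison, sketched as a suggestion (not part of the
    # original code): parse the version string into an integer tuple so that
    # "0.10.0" sorts after "0.9.0" instead of before it:
    #
    #   def _version_tuple(version):
    #       return tuple(int(part) for part in version.split("."))
    #
    #   if _version_tuple(self.broker_version) < _version_tuple(self.KAFKA_VERSION_ZOOKEEPER_OPTIONAL):
    #       ...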

    def _setup_connection(self):
        """
        Build the Confluent-Kafka consumer configuration.
        """
        # TODO: Test async commit handling (self.consumer.commit(async=False))
        connection = {
            'bootstrap.servers': self.host + ":" + self.port,
            'group.id': self.group,
            'offset.store.method': 'broker',
            'default.topic.config': {
                # TODO: Make this configurable
                'auto.offset.reset': self.offset  # e.g. 'largest' or 'smallest'
            }
        }
        # Add an additional flag based on the Kafka version.
        if self.broker_version < self.KAFKA_VERSION_ZOOKEEPER_OPTIONAL:
            connection['broker.version.fallback'] = self.broker_version

        return connection
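
    # For illustration (all values hypothetical): with host="localhost",
    # port="9092", group="metrics", offset="smallest", and a legacy
    # broker_version="0.8.2", the method above would return:
    #
    #   {
    #       'bootstrap.servers': 'localhost:9092',
    #       'group.id': 'metrics',
    #       'offset.store.method': 'broker',
    #       'default.topic.config': {'auto.offset.reset': 'smallest'},
    #       'broker.version.fallback': '0.8.2',
    #   }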

    def _connect(self):
        """
        Connect to Kafka and subscribe to the topic.
        """
        connection = self._setup_connection()
        logging.info("Connecting to Kafka with the following settings:\n %s...", connection)
        self.consumer = Consumer(**connection)
        self._subscribe()

    def _handle_read(self):
        """
        Read messages from Kafka.
        """
        while True:
            msg = self.consumer.poll(timeout=1.0)
            if __debug__:
                logging.debug(msg)
            if msg is None:
                # The poll timed out without a message;
                # signal the caller and keep polling.
                yield False
                continue
            if msg.error():
                self._handle_error(msg)
            else:
                # Proper message
                if __debug__:
                    logging.debug('%s [%d] at offset %d with key %s:\n',
                                  msg.topic(), msg.partition(), msg.offset(), str(msg.key()))
                yield msg.value().rstrip()

    @staticmethod
    def _handle_error(msg):
        if not msg.error():
            return
        # Error or event
        if msg.error().code() == KafkaError._PARTITION_EOF:
            # End of partition event: informational only, keep consuming.
            logging.info('%s [%d] reached end at offset %d with key %s\n',
                         msg.topic(), msg.partition(), msg.offset(), str(msg.key()))
        else:
            raise EncoderError(msg.error())
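
A minimal consumption sketch, based only on the methods shown above. The connection values and the module path are assumptions for illustration; the real application presumably drives the reader through ReaderAbstract's public interface rather than calling the internal methods directly.

    import logging

    from kafka_influxdb.reader.confluent import Reader  # assumed module path

    logging.basicConfig(level=logging.DEBUG)

    # port is a string because _setup_connection() concatenates host and port.
    reader = Reader(host="localhost", port="9092", group="kafka-influxdb", topic="my_topic")
    reader._connect()  # builds the config, creates the Consumer, subscribes

    for raw in reader._handle_read():
        if raw is False:
            continue  # poll() timed out without a message
        print(raw)  # message payload (bytes) with trailing whitespace stripped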