Completed
Push — master ( 11b435...f96b64 )
by Matthias
53s
created

Reader._subscribe()   A

Complexity

Conditions 3

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 3
c 3
b 0
f 0
dl 0
loc 12
rs 9.4285
1
import logging
2
from confluent_kafka import Consumer, KafkaError, KafkaException, TopicPartition
3
from kafka_influxdb.encoder.errors import EncoderError
4
from kafka_influxdb.reader.reader import ReaderAbstract
5
6
7
class Reader(ReaderAbstract):
    """
    A high-performance Kafka consumer based on confluent-kafka, which uses librdkafka internally.
    See: https://github.com/confluentinc/confluent-kafka-python
    """

    # Number of partitions manually assigned when talking to legacy brokers
    # (< KAFKA_VERSION_ZOOKEEPER_OPTIONAL), which cannot rebalance automatically.
    # Previously hard-coded inline in _subscribe; exposed as a class attribute so
    # deployments with a different partition count can override it.
    LEGACY_PARTITION_COUNT = 10

    def _subscribe(self):
        """
        Subscribe to the configured Kafka topic.

        A workaround for missing Zookeeper support in confluent-kafka-python is
        required here: automatic partition rebalancing does not work with Kafka
        versions < 0.9.0, so for legacy brokers we manually assign the first
        ``LEGACY_PARTITION_COUNT`` partitions to the consumer instead.
        """
        if self.broker_version < self.KAFKA_VERSION_ZOOKEEPER_OPTIONAL:
            # Manual assignment: no group coordination on legacy brokers.
            partitions = [TopicPartition(self.topic, partition)
                          for partition in range(self.LEGACY_PARTITION_COUNT)]
            self.consumer.assign(partitions)
        else:
            self.consumer.subscribe([self.topic])

    def _setup_connection(self):
        """
        Build the confluent-kafka consumer configuration.

        :return: configuration dict passed to ``Consumer(**connection)``
        """
        # TODO: Test async commit handling (self.consumer.commit(async=False))
        connection = {
            'bootstrap.servers': self.host + ":" + self.port,
            'group.id': self.group,
            'offset.store.method': 'broker',
            'default.topic.config': {
                # TODO: Make this configurable
                'auto.offset.reset': 'largest'  # smallest
            }
        }
        # Add additional flag based on the Kafka version.
        # librdkafka needs an explicit fallback version to speak the older
        # protocol to brokers that predate automatic API version detection.
        if self.broker_version < self.KAFKA_VERSION_ZOOKEEPER_OPTIONAL:
            connection['broker.version.fallback'] = self.broker_version

        return connection

    def _connect(self):
        """
        Connect to Kafka and subscribe to the topic.
        """
        connection = self._setup_connection()
        logging.info("Connecting to Kafka with the following settings:\n %s...", connection)
        self.consumer = Consumer(**connection)
        self._subscribe()

    def _handle_read(self):
        """
        Read messages from Kafka.

        Polls the consumer in an endless loop and yields the payload of each
        proper message with trailing whitespace stripped. Poll timeouts
        (``None`` results) are skipped; error events are delegated to
        ``_handle_error``.
        """
        while True:
            msg = self.consumer.poll(timeout=1.0)
            if __debug__:
                logging.debug(msg)
            if msg is None:
                # Poll timed out without a message; keep waiting.
                continue
            if msg.error():
                self._handle_error(msg)
            else:
                # Proper message
                if __debug__:
                    logging.debug('%s [%d] at offset %d with key %s:\n',
                                  msg.topic(), msg.partition(), msg.offset(), str(msg.key()))
                yield msg.value().rstrip()

    @staticmethod
    def _handle_error(msg):
        """
        Handle an error event returned by ``Consumer.poll``.

        End-of-partition events are informational and only logged; any other
        error is re-raised as an ``EncoderError``.

        :param msg: Kafka message whose ``error()`` is set
        :raises EncoderError: for every error other than partition EOF
        """
        if not msg.error():
            return
        # Error or event
        if msg.error().code() == KafkaError._PARTITION_EOF:
            # End of partition event: not fatal, the consumer keeps polling.
            logging.info('%s [%d] reached end at offset %d with key %s\n',
                         msg.topic(), msg.partition(), msg.offset(), str(msg.key()))
        else:
            raise EncoderError(msg.error())