Completed
Push — master ( 11b435...f96b64 )
by Matthias
53s
created

Reader._handle_read()   A

Complexity

Conditions 3

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 6
Bugs 0 Features 0
Metric Value
cc 3
c 6
b 0
f 0
dl 0
loc 11
rs 9.4285
1
# -*- coding: utf-8 -*-
2
import logging
3
from kafka import KafkaConsumer
4
from kafka.common import ConsumerTimeout, KafkaUnavailableError
5
6
from kafka_influxdb.encoder.errors import EncoderError
7
from kafka_influxdb.reader.reader import ReaderAbstract
8
9
10
class Reader(ReaderAbstract):
    """
    Kafka reader implemented on top of the kafka-python client.
    Project page: https://github.com/dpkp/kafka-python
    """

    def _connect(self):
        """
        Create the KafkaConsumer for the configured topic and group.

        The broker address is built from ``self.host`` and ``self.port``.
        A KafkaUnavailableError raised while creating the consumer is
        re-raised as an EncoderError.
        """
        broker = "{0}:{1}".format(self.host, self.port)
        servers = [broker]
        logging.info("Connecting to Kafka at %s...", broker)
        try:
            self.consumer = KafkaConsumer(
                self.topic,
                group_id=self.group,
                bootstrap_servers=servers
            )
        except KafkaUnavailableError as unavailable:
            raise EncoderError(unavailable)

    def _handle_read(self):
        """
        Generator that yields the value of every message taken from the
        Kafka consumer.

        A ConsumerTimeout raised during iteration is logged and then
        re-raised as an EncoderError.
        """
        try:
            for record in self.consumer:
                yield record.value
        except ConsumerTimeout as timed_out:
            logging.error("Kafka error: %s.", timed_out)
            # Reconnecting is left to the caller of this generator.
            raise EncoderError(timed_out)
38