Completed
Push — master ( 11b435...f96b64 )
by Matthias
53s
created

ReaderAbstract.read()   A

Complexity

Conditions 2

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
c 1
b 0
f 0
dl 0
loc 11
rs 9.4285
1
import logging
2
from kafka_influxdb.encoder.errors import EncoderError
3
4
5
class ReaderAbstract(object):
6
    """
7
    A Kafka consumer based on kafka-python
8
    See: https://github.com/dpkp/kafka-python
9
    """
10
11
    # We need to treat legacy Kafka Versions (< 0.9.0) a little different.
12
    # First, they don't work without Zookeeper and confluent-kafka has no support for Zookeeper.
13
    # Second, they don't support API discovery.
14
    # The specific workarounds required are documented below.
15
    KAFKA_VERSION_ZOOKEEPER_OPTIONAL = "0.9.0"
16
17
    def __init__(self, host, port, group, topic, broker_version=KAFKA_VERSION_ZOOKEEPER_OPTIONAL):
18
        """
19
        Initialize Kafka reader
20
        """
21
        self.host = host
22
        self.port = str(port)
23
        self.group = group
24
        self.topic = topic
25
        self.broker_version = broker_version
26
27
        # Initialized on read
28
        self.consumer = None
29
30
    def read(self):
31
        """
32
        Read from Kafka. Reconnect on error.
33
        """
34
        try:
35
            self._connect()
36
            for msg in self._handle_read():
37
                yield msg
38
        finally:
39
            logging.info("Performing cleanup before stopping.")
40
            self._shutdown()
41
42
    def _connect(self):
43
        """
44
        Overwrite in child classes
45
        """
46
        raise NotImplementedError
47
48
    def _shutdown(self):
49
        """
50
        Cleanup tasks (e.g. closing the Kafka connection).
51
        Can be overwritten by specific readers if required.
52
        """
53
        if self.consumer:
54
            self.consumer.close()
55
56
    def _handle_read(self):
57
        """
58
        Read messages from Kafka.
59
        Library-specific internal message handling.
60
        Needs to be implemented by every reader
61
        """
62
        raise NotImplementedError
63