ReaderAbstract._connect()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 5
rs 9.4285
1
import logging
2
from kafka_influxdb.encoder.errors import EncoderError
3
4
5
class ReaderAbstract(object):
    """
    Abstract base class for Kafka readers.

    Stores the connection settings and drives the read loop. Concrete
    readers supply the library-specific parts by overriding ``_connect``
    and ``_handle_read`` (and, if needed, ``_shutdown``).

    See: https://github.com/dpkp/kafka-python
    """

    # We need to treat legacy Kafka versions (< 0.9.0) a little differently.
    # First, they don't work without Zookeeper and confluent-kafka has no
    # support for Zookeeper.
    # Second, they don't support API discovery.
    # The specific workarounds required are documented below.
    KAFKA_VERSION_ZOOKEEPER_OPTIONAL = "0.9.0"

    def __init__(self, host, port, group, topic, offset,
                 broker_version=KAFKA_VERSION_ZOOKEEPER_OPTIONAL):
        """
        Initialize Kafka reader.

        :param host: Kafka host to connect to
        :param port: Kafka port; stored as a string
        :param group: consumer group
        :param topic: topic to read from
        :param offset: offset setting handed to the concrete reader
        :param broker_version: Kafka broker version, defaults to
            ``KAFKA_VERSION_ZOOKEEPER_OPTIONAL``
        """
        self.host = host
        self.port = str(port)
        self.group = group
        self.topic = topic
        self.offset = offset
        self.broker_version = broker_version

        # Initialized on read
        self.consumer = None

    def read(self):
        """
        Connect to Kafka and yield the messages produced by ``_handle_read``.

        This is a generator. ``_shutdown`` always runs when it finishes,
        whether the messages are exhausted, an error is raised (including
        a failing ``_connect``) or the generator is closed by the caller.
        Errors are not handled here; they propagate to the caller.
        """
        try:
            self._connect()
            for msg in self._handle_read():
                yield msg
        finally:
            logging.info("Performing cleanup before stopping.")
            self._shutdown()

    def _connect(self):
        """
        Establish the connection to Kafka.
        Overwrite in child classes.

        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError

    def _shutdown(self):
        """
        Cleanup tasks (e.g. closing the Kafka connection).
        Can be overwritten by specific readers if required.
        """
        # Compare against None explicitly: a consumer object that happens
        # to evaluate as falsy (e.g. it defines __len__ or __bool__) must
        # still be closed.
        if self.consumer is not None:
            self.consumer.close()

    def _handle_read(self):
        """
        Read messages from Kafka.
        Library-specific internal message handling.
        Needs to be implemented by every reader; ``read`` iterates over
        the returned value.

        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError
64