| Total Complexity | 7 | 
| Total Lines | 59 | 
| Duplicated Lines | 0 % | 
| Changes | 2 | ||
| Bugs | 0 | Features | 0 | 
| 1 | import logging | ||
| 5 | class ReaderAbstract(object): | ||
| 6 | """ | ||
| 7 | A Kafka consumer based on kafka-python | ||
| 8 | See: https://github.com/dpkp/kafka-python | ||
| 9 | """ | ||
| 10 | |||
| 11 | # We need to treat legacy Kafka Versions (< 0.9.0) a little different. | ||
| 12 | # First, they don't work without Zookeeper and confluent-kafka has no support for Zookeeper. | ||
| 13 | # Second, they don't support API discovery. | ||
| 14 | # The specific workarounds required are documented below. | ||
| 15 | KAFKA_VERSION_ZOOKEEPER_OPTIONAL = "0.9.0" | ||
| 16 | |||
| 17 | def __init__(self, host, port, group, topic, offset, broker_version=KAFKA_VERSION_ZOOKEEPER_OPTIONAL): | ||
| 18 | """ | ||
| 19 | Initialize Kafka reader | ||
| 20 | """ | ||
| 21 | self.host = host | ||
| 22 | self.port = str(port) | ||
| 23 | self.group = group | ||
| 24 | self.topic = topic | ||
| 25 | self.offset = offset | ||
| 26 | self.broker_version = broker_version | ||
| 27 | |||
| 28 | # Initialized on read | ||
| 29 | self.consumer = None | ||
| 30 | |||
| 31 | def read(self): | ||
| 32 | """ | ||
| 33 | Read from Kafka. Reconnect on error. | ||
| 34 | """ | ||
| 35 | try: | ||
| 36 | self._connect() | ||
| 37 | for msg in self._handle_read(): | ||
| 38 | yield msg | ||
| 39 | finally: | ||
| 40 |             logging.info("Performing cleanup before stopping.") | ||
| 41 | self._shutdown() | ||
| 42 | |||
| 43 | def _connect(self): | ||
| 44 | """ | ||
| 45 | Overwrite in child classes | ||
| 46 | """ | ||
| 47 | raise NotImplementedError | ||
| 48 | |||
| 49 | def _shutdown(self): | ||
| 50 | """ | ||
| 51 | Cleanup tasks (e.g. closing the Kafka connection). | ||
| 52 | Can be overwritten by specific readers if required. | ||
| 53 | """ | ||
| 54 | if self.consumer: | ||
| 55 | self.consumer.close() | ||
| 56 | |||
| 57 | def _handle_read(self): | ||
| 58 | """ | ||
| 59 | Read messages from Kafka. | ||
| 60 | Library-specific internal message handling. | ||
| 61 | Needs to be implemented by every reader | ||
| 62 | """ | ||
| 63 | raise NotImplementedError | ||
| 64 |