| Conditions | 3 |
| Total Lines | 11 |
| Lines | 0 |
| Ratio | 0 % |
| Changes | 6 | ||
| Bugs | 0 | Features | 0 |
| 1 | # -*- coding: utf-8 -*- |
||
def _handle_read(self):
    """
    Generator that yields the payload of each message from the consumer.

    Iterates over ``self.consumer`` and yields the ``value`` attribute of
    every message it produces. If a ``ConsumerTimeout`` is raised while
    reading, it is logged and re-raised as an ``EncoderError`` wrapping
    the original exception.
    """
    try:
        for record in self.consumer:
            payload = record.value
            yield payload
    except ConsumerTimeout as exc:
        # Only log and translate the exception here; reconnecting is
        # the responsibility of the caller.
        logging.error("Kafka error: %s.", exc)
        raise EncoderError(exc)
||
| 38 |