Conditions | 3 |
Total Lines | 11 |
Lines | 0 |
Ratio | 0 % |
Changes | 6 | ||
Bugs | 0 | Features | 0 |
1 | # -*- coding: utf-8 -*- |
||
27 | def _handle_read(self): |
||
28 | """ |
||
29 | Read messages from Kafka. |
||
30 | """ |
||
31 | try: |
||
32 | for message in self.consumer: |
||
33 | yield message.value |
||
34 | except ConsumerTimeout as timeout: |
||
35 | logging.error("Kafka error: %s.", timeout) |
||
36 | # The actual reconnect handling is done in the caller |
||
37 | raise EncoderError(timeout) |
||
38 |