Completed
Push — master ( 42c846...16c1df )
by Matthias
01:08
created

Worker.consume()   F

Complexity

Conditions 11

Size

Total Lines 33

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 11
c 2
b 0
f 0
dl 0
loc 33
rs 3.1764

How to fix   Complexity   

Complexity

Complex classes like Worker.consume() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
A worker handles the connections to both Kafka and InfluxDB and takes care of the encoding in between.
3
"""
4
import logging
5
import time
6
import sys
7
from requests.exceptions import ConnectionError
8
from influxdb.exceptions import InfluxDBServerError, InfluxDBClientError
9
from kafka_influxdb.encoder.errors import EncoderError
10
11
12
class Worker(object):
    """
    Implementation of worker class that handles Kafka and InfluxDB
    connections and manages message encoding.

    Raw messages are pulled from ``reader``, converted with ``encoder`` and
    collected in an in-memory buffer that is handed to ``writer`` in batches.
    """

    def __init__(self, reader, encoder, writer, config):
        """
        Store the collaborators and start with an empty buffer.

        :param reader: object whose ``read()`` returns an iterable of raw messages
        :param encoder: object whose ``encode(raw_message)`` returns an
            iterable of encoded entries
        :param writer: object providing ``create_database(name)`` and
            ``write(entries)``
        :param config: settings object; this class reads ``kafka_topic``,
            ``kafka_host``, ``kafka_port``, ``reconnect_wait_time_ms``,
            ``buffer_size``, ``buffer_timeout``, ``influxdb_dbname`` and
            ``statistics`` from it
        """
        self.config = config
        self.reader = reader
        self.encoder = encoder
        self.writer = writer
        self.buffer = []

        # Fields for time measurement; both are set when consume() starts
        self.start_time = None
        self.last_flush_time = None

    def consume(self):
        """
        Run loop. Consume messages from reader, convert them to the
        output format using encoder and write to output with writer.

        The loop only ends on KeyboardInterrupt (after a final flush) or
        SystemExit. An EncoderError makes the worker wait for
        ``config.reconnect_wait_time_ms`` and then read again.
        """
        # Not wrapped in the try below: a SystemExit raised by
        # init_database() propagates to the caller.
        self.init_database()

        logging.info("Listening for messages on Kafka topic %s...", self.config.kafka_topic)
        self.start_time = self.last_flush_time = time.time()
        while True:
            try:
                for index, raw_message in enumerate(self.reader.read(), 1):
                    self._handle_message(index, raw_message)
            except EncoderError:
                self._wait_before_reconnect()
            except KeyboardInterrupt:
                logging.info("Shutdown. Flushing remaining messages from buffer.")
                self.flush()
                break
            except SystemExit:
                break

    def _handle_message(self, index, raw_message):
        """
        Buffer one message, flushing on every buffer_size-th read or on timeout.

        :param index: 1-based position of the message in the current read() run
        :param raw_message: message from the reader; a falsy value is not
            encoded and only triggers the buffer timeout check
            (presumably the reader yields it when nothing arrived in time)
        """
        if raw_message:
            self.buffer.extend(self.encoder.encode(raw_message))
            # The trigger counts messages read, not entries in the buffer
            if index % self.config.buffer_size == 0:
                self.flush()
        elif self._buffer_timed_out():
            logging.debug("Buffer timeout %ss. Flushing remaining %s messages from buffer.",
                          self.config.buffer_timeout, len(self.buffer))
            self.flush()

    def _buffer_timed_out(self):
        """
        Tell whether a non-empty buffer has waited at least buffer_timeout
        seconds since the last flush. Always false if no timeout is configured.
        """
        timeout = self.config.buffer_timeout
        if not timeout or len(self.buffer) == 0:
            return False
        return (time.time() - self.last_flush_time) >= timeout

    def _wait_before_reconnect(self):
        """
        Log the encoder failure and sleep for the configured reconnect delay.
        """
        logging.error("Encoder error. Trying to reconnect to %s:%s",
                      self.config.kafka_host, self.config.kafka_port)
        logging.debug("Sleeping for %d ms before reconnect",
                      self.config.reconnect_wait_time_ms)
        time.sleep(self.config.reconnect_wait_time_ms / 1000.0)

    def init_database(self):
        """
        Initialize the InfluxDB database if it is not already there.

        Exits the process with status 2 when the writer reports a
        connection, server or client error.
        """
        try:
            logging.info("Creating InfluxDB database if not exists: %s",
                         self.config.influxdb_dbname)
            self.writer.create_database(self.config.influxdb_dbname)
        except (ConnectionError, InfluxDBServerError, InfluxDBClientError) as error:
            logging.error("Error while creating InfluxDB datbase: %s", error)
            sys.exit(2)

    def flush(self):
        """
        Flush buffered values with writer.

        Does nothing for an empty buffer. InfluxDB server/client errors are
        logged, not raised. The buffer is emptied in every case, so entries
        of a failed write are dropped.
        """
        if not self.buffer:
            # Don't do anything when buffer empty
            return
        try:
            self.last_flush_time = time.time()
            self.writer.write(self.buffer)
            if self.config.statistics:
                self.show_statistics()
        except (InfluxDBServerError, InfluxDBClientError) as influx_error:
            logging.error("Error while writing to InfluxDB: %s", influx_error)
        finally:
            # Rebind instead of clearing in place, so a list handed out by
            # get_buffer() earlier is left untouched
            self.buffer = []

    def show_statistics(self):
        """
        Print performance metrics to stdout and restart the measurement.

        The rate is config.buffer_size divided by the seconds elapsed since
        the previous measurement.
        """
        delta = time.time() - self.start_time
        # time.time() may not advance between two calls (coarse or adjusted
        # clock); report an infinite rate rather than raising
        # ZeroDivisionError out of flush().
        if delta > 0:
            msg_per_sec = self.config.buffer_size / delta
        else:
            msg_per_sec = float("inf")
        print("Flushing output buffer. {0:.2f} messages/s".format(msg_per_sec))
        # Reset timer
        self.start_time = time.time()

    def set_reader(self, reader):
        """Replace the message reader."""
        self.reader = reader

    def get_reader(self):
        """Return the current message reader."""
        return self.reader

    def set_writer(self, writer):
        """Replace the output writer."""
        self.writer = writer

    def get_writer(self):
        """Return the current output writer."""
        return self.writer

    def get_buffer(self):
        """Return the list of encoded entries waiting to be flushed."""
        return self.buffer

    def get_config(self):
        """Return the configuration object."""
        return self.config
121