Worker.consume()   F
last analyzed

Complexity

Conditions 11

Size

Total Lines 35

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 11
c 2
b 0
f 0
dl 0
loc 35
rs 3.1764

How to fix   Complexity   

Complexity

Complex classes like Worker.consume() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
A worker handles the connections to both Kafka and InfluxDB, and handles the encoding in between.
3
"""
4
import logging
5
import time
6
from requests.exceptions import ConnectionError
7
from influxdb.exceptions import InfluxDBServerError, InfluxDBClientError
8
from kafka_influxdb.encoder.errors import EncoderError
9
10
11
class Worker(object):
    """
    Implementation of worker class that handles Kafka and InfluxDB
    connections and manages message encoding.

    Messages obtained from ``reader`` are converted by ``encoder``,
    collected in ``self.buffer`` and handed to ``writer`` in batches.
    """

    def __init__(self, reader, encoder, writer, config):
        """
        Setup

        :param reader: object whose ``read()`` yields raw messages
        :param encoder: object whose ``encode(raw_message)`` returns an
            iterable of entries to append to the buffer
        :param writer: object providing ``create_database(name)`` and
            ``write(buffer)``
        :param config: settings object; this class reads ``kafka_topic``,
            ``kafka_host``, ``kafka_port``, ``reconnect_wait_time_ms``,
            ``buffer_size``, ``buffer_timeout``, ``statistics`` and
            ``influxdb_dbname`` from it
        """
        self.config = config
        self.reader = reader
        self.encoder = encoder
        self.writer = writer
        self.buffer = []

        # Fields for time measurement
        self.start_time = None
        self.last_flush_time = None
        # Seconds to wait between attempts to create the database
        self.db_create_delay = 1

    def consume(self):
        """
        Run loop. Consume messages from reader, convert it to the
        output format using encoder and write to output with writer

        Runs until interrupted: a KeyboardInterrupt flushes the remaining
        buffer and stops, a SystemExit stops without flushing. After an
        EncoderError the loop sleeps for ``reconnect_wait_time_ms`` and
        then starts reading again.
        """
        self.init_database()

        logging.info("Listening for messages on Kafka topic %s...",
                     self.config.kafka_topic)
        self.start_time = self.last_flush_time = time.time()
        while True:
            try:
                self._process_messages()
            except EncoderError:
                logging.error("Encoder error. Trying to reconnect to %s:%s",
                              self.config.kafka_host, self.config.kafka_port)
                logging.debug("Sleeping for %d ms before reconnect",
                              self.config.reconnect_wait_time_ms)
                time.sleep(self.config.reconnect_wait_time_ms / 1000.0)
            except KeyboardInterrupt:
                logging.info(
                    "Shutdown. Flushing remaining messages from buffer.")
                self.flush()
                break
            except SystemExit:
                break

    def _process_messages(self):
        """
        Drain one ``reader.read()`` iteration into the buffer, flushing
        whenever a full batch was read or the buffer timeout expired.
        """
        # The index counts every item yielded by the reader (empty ones
        # included) and restarts at 1 each time this method is entered.
        for index, raw_message in enumerate(self.reader.read(), 1):
            if raw_message:
                self.buffer.extend(self.encoder.encode(raw_message))
                if index % self.config.buffer_size == 0:
                    self.flush()
            elif self._buffer_timed_out():
                logging.debug("Buffer timeout %ss. Flushing remaining %s messages from buffer.",
                              self.config.buffer_timeout, len(self.buffer))
                self.flush()

    def _buffer_timed_out(self):
        """
        Tell whether a non-empty buffer has waited at least
        ``buffer_timeout`` seconds since the last flush.
        """
        timeout = self.config.buffer_timeout
        # A falsy timeout disables time-based flushing altogether
        if not timeout or not self.buffer:
            return False
        return (time.time() - self.last_flush_time) >= timeout

    def init_database(self):
        """
        Initialize the InfluxDB database if it is not already there

        Connection errors are retried every ``db_create_delay`` seconds
        until the call goes through. The retry is a loop rather than a
        recursive call so that a long outage cannot exhaust the
        interpreter's recursion limit. InfluxDB client/server errors are
        logged and taken to mean that the database already exists.
        """
        while True:
            try:
                logging.info("Creating InfluxDB database if not exists: %s",
                             self.config.influxdb_dbname)
                self.writer.create_database(self.config.influxdb_dbname)
                return
            except ConnectionError as error:
                logging.error(
                    "Connection error while trying to create InfluxDB database: %s. Waiting for retry...", error)
                time.sleep(self.db_create_delay)
            except (InfluxDBServerError, InfluxDBClientError) as error:
                logging.warning(
                    "Could not create InfluxDB database. Assuming it already exists: %s", error)
                return

    def flush(self):
        """
        Flush values with writer

        Does nothing when the buffer is empty. InfluxDB client/server
        errors raised by the writer are logged, not propagated. The buffer
        is emptied afterwards whether or not the write succeeded.
        """
        if not self.buffer:
            # Don't do anything when buffer empty
            return
        try:
            self.last_flush_time = time.time()
            self.writer.write(self.buffer)
            if self.config.statistics:
                self.show_statistics()
        except (InfluxDBServerError, InfluxDBClientError) as influx_error:
            logging.error("Error while writing to InfluxDB: %s", influx_error)
        finally:
            self.buffer = []

    def show_statistics(self):
        """
        Print performance metrics to stdout

        The rate is ``buffer_size`` divided by the seconds elapsed since
        the timer was last reset. It is reported as 0.0 when no positive
        interval is available (timer never started, or the clock did not
        advance), instead of raising.
        """
        now = time.time()
        # start_time is None until consume() runs or a first flush resets it
        delta = now - self.start_time if self.start_time is not None else 0.0
        # Guard against a zero (or negative) interval on coarse clocks
        msg_per_sec = self.config.buffer_size / delta if delta > 0 else 0.0
        print("Flushing output buffer. {0:.2f} messages/s".format(msg_per_sec))
        # Reset timer
        self.start_time = now

    def set_reader(self, reader):
        """Replace the reader used by the run loop."""
        self.reader = reader

    def get_reader(self):
        """Return the current reader."""
        return self.reader

    def set_writer(self, writer):
        """Replace the writer used for flushing and database creation."""
        self.writer = writer

    def get_writer(self):
        """Return the current writer."""
        return self.writer

    def get_buffer(self):
        """Return the list of buffered, not yet flushed entries."""
        return self.buffer

    def get_config(self):
        """Return the configuration object passed to the constructor."""
        return self.config
129