Completed
Pull Request — master (#49)
by
unknown
01:11
created

Worker.consume()   F

Complexity

Conditions 10

Size

Total Lines 31

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 10
c 2
b 0
f 0
dl 0
loc 31
rs 3.1304

How to fix   Complexity   

Complexity

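Complex methods like Worker.consume() often do a lot of different things. To break such a method down, identify cohesive steps within it (for example, handling a single message, checking the buffer timeout, or waiting before a reconnect) and move each into a well-named helper method (Extract Method). For classes, a common approach to find such a cohesive component is to look for fields/methods that share the same prefixes or suffixes.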

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
A worker handles the connection to both, Kafka and InfluxDB and handles encoding in between.
3
"""
4
import logging
5
import time
6
import sys
7
from requests.exceptions import ConnectionError
8
from influxdb.exceptions import InfluxDBServerError, InfluxDBClientError
9
from kafka_influxdb.encoder.errors import EncoderError
10
11
12
class Worker(object):
    """
    Implementation of worker class that handles Kafka and InfluxDB
    connections and manages message encoding.

    Raw messages from ``reader`` are converted with ``encoder`` and
    collected in ``buffer``. The buffer is handed to ``writer`` once
    ``config.buffer_size`` non-empty messages were buffered since the last
    flush, once ``config.buffer_timeout`` seconds (when set) have passed
    since the last flush, and on shutdown (KeyboardInterrupt).
    """

    def __init__(self, reader, encoder, writer, config):
        """
        Setup

        :param reader: object whose ``read()`` returns an iterable of raw
            messages; falsy items are treated as "no message"
        :param encoder: object whose ``encode(raw_message)`` returns an
            iterable of output lines
        :param writer: object providing ``create_database(name)`` and
            ``write(lines)``
        :param config: settings object; the attributes read are
            ``kafka_topic``, ``kafka_host``, ``kafka_port``,
            ``buffer_size``, ``buffer_timeout``, ``statistics``,
            ``influxdb_dbname`` and ``reconnect_wait_time_ms``
        """
        self.config = config
        self.reader = reader
        self.encoder = encoder
        self.writer = writer
        self.buffer = []
        # Non-empty raw messages encoded into ``buffer`` since the last
        # flush. Size-based flushing and the throughput statistic use this
        # rather than the reader's position, which also counts empty reads
        # and restarts after every reconnect.
        self.messages_since_flush = 0

        # Field for time measurement
        self.start_time = None
        self.last_flush_time = None

    def consume(self):
        """
        Run loop. Consume messages from the reader, convert them to the
        output format using the encoder and write them with the writer.

        The database is created first (the process exits on failure, see
        ``init_database``). An ``EncoderError`` raised while reading or
        encoding is logged and followed by a pause of
        ``config.reconnect_wait_time_ms`` before ``reader.read()`` is
        called again. ``KeyboardInterrupt`` flushes the buffer and returns.
        """
        self.init_database()

        logging.info("Listening for messages on Kafka topic %s...", self.config.kafka_topic)
        self.start_time = self.last_flush_time = time.time()
        while True:
            try:
                for raw_message in self.reader.read():
                    self._handle_message(raw_message)
            except EncoderError:
                self._wait_before_reconnect()
            except KeyboardInterrupt:
                logging.info("Shutdown. Flushing remaining messages from buffer.")
                self.flush()
                break

    def _handle_message(self, raw_message):
        """Buffer one raw message (if any) and flush when a size or age limit is hit."""
        if raw_message:
            self.buffer.extend(self.encoder.encode(raw_message))
            self.messages_since_flush += 1
            if self.messages_since_flush >= self.config.buffer_size:
                self.flush()
                return
        # Checked for every item, not only empty reads, so a slow but steady
        # stream of messages still gets flushed once the buffer is too old.
        if self._buffer_timed_out():
            logging.debug("Buffer timeout %ss. Flushing remaining %s messages from buffer.",
                          self.config.buffer_timeout, len(self.buffer))
            self.flush()

    def _buffer_timed_out(self):
        """Return True if a buffer timeout is configured, data is buffered and it has expired."""
        return bool(self.config.buffer_timeout and self.buffer and
                    (time.time() - self.last_flush_time) >= self.config.buffer_timeout)

    def _wait_before_reconnect(self):
        """Log an encoder error and sleep ``config.reconnect_wait_time_ms`` milliseconds."""
        logging.error("Encoder error. Trying to reconnect to %s:%s",
                      self.config.kafka_host, self.config.kafka_port)
        logging.debug("Sleeping for %d ms before reconnect",
                      self.config.reconnect_wait_time_ms)
        time.sleep(self.config.reconnect_wait_time_ms / 1000.0)

    def init_database(self):
        """
        Ask the writer to create ``config.influxdb_dbname`` (the log message
        says "if not exists"; whether an existing database is tolerated
        depends on the writer).

        Connection and InfluxDB client/server errors are logged and
        terminate the process with exit status 2.
        """
        try:
            logging.info("Creating InfluxDB database if not exists: %s",
                         self.config.influxdb_dbname)
            self.writer.create_database(self.config.influxdb_dbname)
        except (ConnectionError, InfluxDBServerError, InfluxDBClientError) as error:
            logging.error("Error while creating InfluxDB database: %s", error)
            sys.exit(2)

    def flush(self):
        """
        Flush values with writer.

        Writes the buffered lines, optionally prints statistics, and always
        empties the buffer afterwards. InfluxDB client/server errors are
        logged and the buffered lines are dropped; other exceptions
        propagate (after the buffer has been emptied).
        """
        if not self.buffer:
            # Nothing to write, but restart the batch count so messages that
            # encoded to no output cannot trigger a flush on every later message.
            self.messages_since_flush = 0
            return
        try:
            self.last_flush_time = time.time()
            self.writer.write(self.buffer)
            if self.config.statistics:
                self.show_statistics(self.messages_since_flush)
        except (InfluxDBServerError, InfluxDBClientError) as influx_error:
            logging.error("Error while writing to InfluxDB: %s", influx_error)
        finally:
            self.buffer = []
            self.messages_since_flush = 0

    def show_statistics(self, message_count=None):
        """
        Print performance metrics to stdout and restart the timer.

        :param message_count: number of messages written since the timer was
            last (re)started; defaults to ``config.buffer_size``, which is
            what earlier versions always assumed
        """
        if message_count is None:
            message_count = self.config.buffer_size
        now = time.time()
        elapsed = now - self.start_time if self.start_time is not None else 0.0
        if elapsed > 0:
            print("Flushing output buffer. {0:.2f} messages/s".format(message_count / elapsed))
        else:
            # No measurable interval (timer never started, or coarse clock):
            # a rate would be meaningless or divide by zero.
            print("Flushing output buffer.")
        # Reset timer
        self.start_time = now

    def set_reader(self, reader):
        """Replace the reader used by ``consume``."""
        self.reader = reader

    def get_reader(self):
        """Return the current reader."""
        return self.reader

    def set_writer(self, writer):
        """Replace the writer used by ``init_database`` and ``flush``."""
        self.writer = writer

    def get_writer(self):
        """Return the current writer."""
        return self.writer

    def get_buffer(self):
        """Return the list of encoded lines waiting to be flushed."""
        return self.buffer

    def get_config(self):
        """Return the configuration object."""
        return self.config
119