Completed
Push — master ( 11b435...f96b64 )
by Matthias
53s
created

Worker.consume()   B

Complexity

Conditions 6

Size

Total Lines 25

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 6
c 1
b 0
f 0
dl 0
loc 25
rs 7.5384
1
"""
2
A worker handles the connections to both Kafka and InfluxDB, and handles the encoding in between.
3
"""
4
import logging
5
import time
6
import sys
7
from requests.exceptions import ConnectionError
8
from influxdb.exceptions import InfluxDBServerError, InfluxDBClientError
9
from kafka_influxdb.encoder.errors import EncoderError
10
11
12
class Worker(object):
    """
    Implementation of worker class that handles Kafka and InfluxDB
    connections and manages message encoding.

    The worker reads raw messages from ``reader``, converts them with
    ``encoder``, collects the results in an in-memory buffer and hands
    that buffer to ``writer`` in batches.
    """

    def __init__(self, reader, encoder, writer, config):
        """
        Setup

        :param reader: object whose ``read()`` returns an iterable of raw
            messages.
        :param encoder: object whose ``encode(raw_message)`` returns an
            iterable of encoded entries for one raw message.
        :param writer: object providing ``create_database(name)`` and
            ``write(entries)``.
        :param config: settings object. The worker reads ``kafka_topic``,
            ``kafka_host``, ``kafka_port``, ``buffer_size``,
            ``reconnect_wait_time_ms``, ``influxdb_dbname`` and
            ``statistics`` from it.
        """
        self.config = config
        self.reader = reader
        self.encoder = encoder
        self.writer = writer
        self.buffer = []

        # Field for time measurement; set when consume() starts and reset
        # after every statistics report.
        self.start_time = None

    def consume(self):
        """
        Run loop. Consume messages from reader, convert it to the
        output format using encoder and write to output with writer

        The buffer is flushed after every ``config.buffer_size`` messages
        read from the current ``reader.read()`` iteration. The loop only
        ends on ``KeyboardInterrupt``, after flushing what is left in the
        buffer. If ``reader.read()`` is exhausted without an error it is
        simply called again.
        """
        self.init_database()

        logging.info("Listening for messages on Kafka topic %s...", self.config.kafka_topic)
        self.start_time = time.time()
        while True:
            try:
                # The message counter starts at 1 again each time the read
                # loop is (re-)entered; the buffer itself is kept.
                for index, raw_message in enumerate(self.reader.read(), 1):
                    self.buffer.extend(self.encoder.encode(raw_message))
                    if index % self.config.buffer_size == 0:
                        self.flush()
            except KeyboardInterrupt:
                logging.info("Shutdown. Flushing remaining messages from buffer.")
                self.flush()
                break
            except EncoderError:
                logging.error("Encoder error. Trying to reconnect to %s:%s",
                              self.config.kafka_host, self.config.kafka_port)
                logging.debug("Sleeping for %d ms before reconnect",
                              self.config.reconnect_wait_time_ms)
                # Configuration value is in milliseconds, sleep() wants seconds
                time.sleep(self.config.reconnect_wait_time_ms / 1000.0)

    def init_database(self):
        """
        Initialize the InfluxDB database if it is not already there

        Terminates the process with exit status 2 when the writer reports
        a connection, server or client error.
        """
        try:
            logging.info("Creating InfluxDB database if not exists: %s",
                         self.config.influxdb_dbname)
            self.writer.create_database(self.config.influxdb_dbname)
        except (ConnectionError, InfluxDBServerError, InfluxDBClientError) as error:
            logging.error("Error while creating InfluxDB database: %s", error)
            sys.exit(2)

    def flush(self):
        """
        Flush values with writer

        Does nothing when the buffer is empty. InfluxDB server and client
        errors are logged and not re-raised. The buffer is emptied in every
        case, so entries of a failed write are dropped and not retried.
        """
        if not self.buffer:
            # Don't do anything when buffer empty
            return
        try:
            self.writer.write(self.buffer)
            if self.config.statistics:
                self.show_statistics()
        except (InfluxDBServerError, InfluxDBClientError) as influx_error:
            logging.error("Error while writing to InfluxDB: %s", influx_error)
        finally:
            # Rebind instead of clearing in place so a writer that kept a
            # reference to the old list is not affected.
            self.buffer = []

    def show_statistics(self):
        """
        Print performance metrics to stdout

        The rate is ``config.buffer_size`` divided by the time elapsed since
        the timer was last reset. Nothing is printed when no start time has
        been recorded yet or when the elapsed time is not positive, because
        no meaningful rate can be computed then. The timer is reset in all
        cases.
        """
        now = time.time()
        if self.start_time is not None:
            delta = now - self.start_time
            # The wall clock can stand still between two calls or be set
            # backwards; avoid dividing by zero or reporting a negative rate.
            if delta > 0:
                msg_per_sec = self.config.buffer_size / delta
                print("Flushing output buffer. {0:.2f} messages/s".format(msg_per_sec))
        # Reset timer
        self.start_time = now

    def set_reader(self, reader):
        """Replace the reader used by the run loop."""
        self.reader = reader

    def get_reader(self):
        """Return the current reader."""
        return self.reader

    def set_writer(self, writer):
        """Replace the writer used for database setup and flushing."""
        self.writer = writer

    def get_writer(self):
        """Return the current writer."""
        return self.writer

    def get_buffer(self):
        """Return the list of encoded entries that are not flushed yet."""
        return self.buffer

    def get_config(self):
        """Return the configuration object passed to the constructor."""
        return self.config
111