Completed
Push — master ( 5cbcf8...d3d059 )
by Matthias
01:08
created

kafka_influxdb.Worker.consume()   A

Complexity

Conditions 4

Size

Total Lines 15

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 4
dl 0
loc 15
rs 9.2
1
import logging
2
import time
3
4
5
class Worker(object):
    """
    Move messages from a reader to a writer in batches.

    Raw messages are pulled from ``reader.read()``, converted with
    ``encoder.encode()`` and collected in an in-memory buffer. The buffer is
    handed to ``writer.write()`` every ``config.buffer_size`` messages and
    once more when the run loop ends.
    """

    def __init__(self, reader, encoder, writer, config):
        """
        Setup

        :param reader: object whose ``read()`` returns an iterable of raw messages
        :param encoder: object whose ``encode(raw_message)`` returns an iterable
            of encoded values
        :param writer: object providing ``create_database(name)`` and
            ``write(values)``
        :param config: object exposing ``kafka_topic``, ``influxdb_dbname``,
            ``buffer_size`` and ``statistics``
        """
        self.config = config
        self.reader = reader
        self.encoder = encoder
        self.writer = writer
        self.buffer = []

        # Field for time measurement
        self.start_time = None

    def consume(self):
        """
        Run loop. Consume messages from reader, convert it to the output format and write with writer

        The loop ends when the reader is exhausted or on KeyboardInterrupt.
        In both cases the messages still sitting in the buffer are flushed so
        that a partially filled batch is not lost.
        """
        self.init_database()

        logging.info("Listening for messages on Kafka topic %s...", self.config.kafka_topic)
        self.start_time = time.time()
        try:
            for index, raw_message in enumerate(self.reader.read(), 1):
                self.buffer.extend(self.encoder.encode(raw_message))
                if index % self.config.buffer_size == 0:
                    self.flush()
        except KeyboardInterrupt:
            logging.info("Shutdown")

        # Write whatever was collected since the last full batch; flush() is a
        # no-op when nothing is pending.
        self.flush()

    def init_database(self):
        """
        Initialize the InfluxDB database if it is not already there

        Best effort: any error raised by the writer is logged and swallowed
        so that consuming can still start.
        """
        try:
            logging.info("Creating InfluxDB database if not exists: %s", self.config.influxdb_dbname)
            self.writer.create_database(self.config.influxdb_dbname)
        except Exception as e:
            logging.info(e)

    def flush(self):
        """
        Flush values with writer

        Does nothing when the buffer is empty. A failing write is logged as a
        warning and the batch is dropped, so the buffer cannot grow without
        bound while the writer is failing.
        """
        if not self.buffer:
            # Nothing to write; avoid a pointless round trip to the writer
            return
        try:
            self.writer.write(self.buffer)
            if self.config.statistics:
                self.show_statistics()
        except Exception as e:
            logging.warning(e)
        self.buffer = []

    def show_statistics(self):
        """
        Print the message rate since the last flush and restart the timer

        The rate is computed from ``config.buffer_size``, so it is only an
        approximation for a partially filled batch. When no elapsed time can
        be measured (timer never started, or the clock did not advance) the
        rate is omitted instead of raising.
        """
        now = time.time()
        delta = 0 if self.start_time is None else now - self.start_time
        if delta > 0:
            msg_per_sec = self.config.buffer_size / delta
            print("Flushing output buffer. {0:.2f} messages/s".format(msg_per_sec))
        else:
            print("Flushing output buffer.")
        # Reset timer
        self.start_time = now

    def set_reader(self, reader):
        """ Replace the message source """
        self.reader = reader

    def get_reader(self):
        """ Return the current message source """
        return self.reader

    def set_writer(self, writer):
        """ Replace the output writer """
        self.writer = writer

    def get_writer(self):
        """ Return the current output writer """
        return self.writer

    def get_buffer(self):
        """ Return the list of encoded values waiting to be flushed """
        return self.buffer

    def get_config(self):
        """ Return the configuration object given at construction """
        return self.config
79