start_consumer()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 21

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
c 0
b 0
f 0
dl 0
loc 21
rs 9.3142
1
import sys
2
import logging
3
from kafka_influxdb.worker import Worker
4
from kafka_influxdb.writer import influxdb_writer
5
from kafka_influxdb.encoder import load_encoder
6
from kafka_influxdb.reader import load_reader
7
from kafka_influxdb.config import loader
8
9
__title__ = 'kafka_influxdb'
10
__author__ = 'Matthias Endler'
11
__license__ = 'Apache License 2.0'
12
__copyright__ = 'Copyright 2015, Matthias Endler under Apache License, v2.0'
13
14
15
def main():
    """
    Entry point: load the configuration and run the consumer.

    If the loaded configuration has a truthy ``version`` attribute,
    ``show_version()`` is invoked before the consumer is started.
    """
    settings = loader.load_config()
    version_requested = settings.version
    if version_requested:
        show_version()
    start_consumer(settings)
23
24
25
def show_version():
    """
    Print the package name and version, then exit with status 0.
    """
    # Function-scope import: the version module is only imported when
    # this function actually runs.
    from .version import __version__ as current_version

    banner = "{} {}".format(__package__, current_version)
    print(banner)
    sys.exit(0)
32
33
34
def start_consumer(config):
    """
    Build the reader, writer and encoder, then start consuming.

    :param config: configuration object providing the ``kafka_*``,
        ``influxdb_*`` and ``encoder`` attributes read below
    """
    logging.debug("Initializing Kafka Consumer")
    # Passed positionally to load_reader, in exactly this order.
    kafka_settings = (
        config.kafka_reader,
        config.kafka_host,
        config.kafka_port,
        config.kafka_group,
        config.kafka_topic,
        config.kafka_offset,
    )
    source = load_reader(*kafka_settings)

    logging.debug("Initializing connection to InfluxDB at %s:%s",
                  config.influxdb_host, config.influxdb_port)
    sink = create_writer(config)

    logging.debug("Initializing message encoder: %s", config.encoder)
    codec = load_encoder(config.encoder)

    # The worker receives the reader, encoder, writer and the full config.
    worker = Worker(source, codec, sink, config)
    worker.consume()
55
56
57
def create_writer(config):
    """
    Build an ``InfluxDBWriter`` from the ``influxdb_*`` config attributes.

    :param config: configuration object carrying the InfluxDB settings
    :return: the constructed ``influxdb_writer.InfluxDBWriter``
    """
    # Passed positionally to the constructor, in exactly this order.
    writer_args = (
        config.influxdb_host,
        config.influxdb_port,
        config.influxdb_user,
        config.influxdb_password,
        config.influxdb_dbname,
        config.influxdb_use_ssl,
        config.influxdb_verify_ssl,
        config.influxdb_timeout,
        config.influxdb_use_udp,
        config.influxdb_retention_policy,
        config.influxdb_time_precision,
    )
    return influxdb_writer.InfluxDBWriter(*writer_args)
72
73
74
# Run the consumer only when this file is executed as a script.
if __name__ == '__main__':
    main()
76