Completed
Push — master ( 5cbcf8...d3d059 )
by Matthias
01:08
created

kafka_influxdb.config.load_config()   B

Complexity

Conditions 5

Size

Total Lines 36

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 5
dl 0
loc 36
rs 8.0894
1
from . import default_config
2
import yaml
3
import logging
4
import argparse
5
import collections
6
import sys
7
8
9
class ObjectView(object):
    """
    Attribute-style view on a dictionary

    The dictionary passed in is used directly as the instance
    namespace (it is not copied), so every key becomes an attribute
    and later changes to the dictionary are visible through the view.
    """

    def __init__(self, d):
        # Rebind the instance namespace to the given dict itself
        setattr(self, '__dict__', d)
12
13
14
def load_config():
    """
    Load settings from default config and optionally
    overwrite with config file and commandline parameters
    (in that order).

    Note: Commandline parameters are of the form
    --kafka_host="localhost"
    to make them easy to enter from the cli
    while the config file parameters are stored in a dict
    {kafka: { host: localhost }}
    to avoid redundancy in the key name.
    So to merge them, we flatten all keys.

    Side effect: the root logger level is set to INFO when the merged
    'verbose' setting equals 1 and to DEBUG when it is greater than 1.

    Returns an ObjectView wrapping the merged, flattened settings.
    """
    # We start with the default config
    config = flatten(default_config.DEFAULT_CONFIG)

    # Read commandline arguments
    cli_config = flatten(parse_args())

    if "configfile" in cli_config:
        print("Reading config file {}".format(cli_config['configfile']))
        file_settings = parse_configfile(cli_config['configfile'])
        if file_settings is None:
            # An empty YAML document loads as None; treat it as
            # "no settings" instead of failing inside flatten()
            file_settings = {}
        configfile = flatten(file_settings)
        config = overwrite_config(config, configfile)

    # Parameters from commandline take precedence over all others
    config = overwrite_config(config, cli_config)

    # Set verbosity level
    if 'verbose' in config:
        if config['verbose'] == 1:
            logging.getLogger().setLevel(logging.INFO)
        elif config['verbose'] > 1:
            logging.getLogger().setLevel(logging.DEBUG)

    return ObjectView(config)
50
51
52
def overwrite_config(old_values, new_values):
    """
    Return a copy of old_values updated with new_values

    Neither argument is modified. Keys present in both
    take their value from new_values.
    """
    merged = old_values.copy()
    merged.update(new_values)
    return merged
56
57
58
def parse_configfile(configfile):
    """
    Read settings from file

    The file is parsed with yaml.safe_load() and the result is
    returned unchanged. If loading fails, the error is logged and
    the process exits with status -1. Errors raised while opening
    the file are not handled here and propagate to the caller.
    """
    with open(configfile) as f:
        try:
            return yaml.safe_load(f)
        except Exception as e:
            # The message needs a %s placeholder; without it logging
            # cannot format the record and the cause is never shown
            logging.fatal("Could not load default config file: %s", e)
            # sys.exit() rather than the exit() helper injected by the
            # site module, which is not available in every environment
            sys.exit(-1)
68
69
70
def flatten(d, parent_key='', sep='_'):
    """
    Flatten keys in a dictionary

    Nested mappings are merged into the top level; their keys are
    prefixed with the parent key and joined by sep. Values that are
    not mutable mappings (e.g. lists) are kept as they are.

    Example:
    flatten({'a': 1, 'c': {'a': 2, 'b': {'x': 5, 'y' : 10}}, 'd': [1, 2, 3]})
    => {'a': 1, 'c_a': 2, 'c_b_x': 5, 'd': [1, 2, 3], 'c_b_y': 10}
    """
    # The collections.MutableMapping alias was removed in Python 3.10;
    # the ABC lives in collections.abc (fallback keeps Python 2 working)
    try:
        from collections.abc import MutableMapping
    except ImportError:
        from collections import MutableMapping

    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, MutableMapping):
            items.extend(flatten(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)
85
86
87
def parse_args(args=None):
    """
    Parse commandline parameters

    args is the list of argument strings to parse. When it is None
    (the default), argparse reads sys.argv[1:] at call time, so changes
    made to sys.argv after this module was imported are respected.

    All options except --version use argparse.SUPPRESS as default, so
    only options that were actually given appear in the result and
    nothing else overrides values from other config sources.

    Returns the parsed options as a dict.
    """
    parser = argparse.ArgumentParser(description='A Kafka consumer for InfluxDB',
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--kafka_host', type=str, default=argparse.SUPPRESS,
                        help="Hostname or IP of Kafka message broker (default: localhost)")
    parser.add_argument('--kafka_port', type=int, default=argparse.SUPPRESS,
                        help="Port of Kafka message broker (default: 9092)")
    parser.add_argument('--kafka_topic', type=str, default=argparse.SUPPRESS,
                        help="Topic for metrics (default: my_topic)")
    parser.add_argument('--kafka_group', type=str, default=argparse.SUPPRESS,
                        help="Kafka consumer group (default: my_group)")
    parser.add_argument('--influxdb_host', type=str, default=argparse.SUPPRESS,
                        help="InfluxDB hostname or IP (default: localhost)")
    parser.add_argument('--influxdb_port', type=int, default=argparse.SUPPRESS,
                        help="InfluxDB API port (default: 8086)")
    parser.add_argument('--influxdb_user', type=str, default=argparse.SUPPRESS,
                        help="InfluxDB username (default: root)")
    parser.add_argument('--influxdb_password', type=str, default=argparse.SUPPRESS,
                        help="InfluxDB password (default: root)")
    parser.add_argument('--influxdb_dbname', type=str, default=argparse.SUPPRESS,
                        help="InfluxDB database to write metrics into (default: metrics)")
    parser.add_argument('--influxdb_use_ssl', default=argparse.SUPPRESS, action="store_true",
                        help="Use SSL connection for InfluxDB (default: False)")
    parser.add_argument('--influxdb_verify_ssl', default=argparse.SUPPRESS, action="store_true",
                        help="Verify the SSL certificate before connecting (default: False)")
    parser.add_argument('--influxdb_timeout', type=int, default=argparse.SUPPRESS,
                        help="Max number of seconds to establish a connection to InfluxDB (default: 5)")
    parser.add_argument('--influxdb_use_udp', default=argparse.SUPPRESS, action="store_true",
                        help="Use UDP connection for InfluxDB (default: False)")
    parser.add_argument('--influxdb_retention_policy', type=str, default=argparse.SUPPRESS,
                        help="Retention policy for incoming metrics (default: default)")
    parser.add_argument('--influxdb_time_precision', type=str, default=argparse.SUPPRESS,
                        help="Precision of incoming metrics. Can be one of 's', 'm', 'ms', 'u' (default: s)")
    parser.add_argument('--encoder', type=str, default=argparse.SUPPRESS,
                        help="Input encoder which converts an incoming message to dictionary "
                             "(default: collectd_graphite_encoder)")
    parser.add_argument('--buffer_size', type=int, default=argparse.SUPPRESS,
                        help="Maximum number of messages that will be collected before flushing to the backend "
                             "(default: 1000)")
    parser.add_argument('-c', '--configfile', type=str, default=argparse.SUPPRESS,
                        help="Configfile path (default: None)")
    parser.add_argument('-s', '--statistics', default=argparse.SUPPRESS, action="store_true",
                        help="Show performance statistics (default: True)")
    parser.add_argument('-b', '--benchmark', default=argparse.SUPPRESS, action="store_true",
                        help="Run benchmark (default: False)")
    parser.add_argument('-v', '--verbose', action='count', default=argparse.SUPPRESS,
                        help="Set verbosity level. Increase verbosity by adding a v: -v -vv -vvv (default: 0)")
    parser.add_argument('--version', action="store_true", help="Show version")
    cli_args = parser.parse_args(args)
    # Convert config from argparse Namespace to dict
    return vars(cli_args)
138