load_config()   B
last analyzed

Complexity

Conditions 5

Size

Total Lines 36

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
c 0
b 0
f 0
dl 0
loc 36
rs 8.0894
1
from . import default_config
2
import yaml
3
import logging
4
import argparse
5
import collections
6
import sys
7
8
9
class ObjectView(object):
10
    """
11
    Creates an object out of a dictionary
12
    """
13
    def __init__(self, d):
14
        self.__dict__ = d
15
16
17
def load_config():
18
    """
19
    Load settings from default config and optionally
20
    overwrite with config file and commandline parameters
21
    (in that order).
22
23
    Note: Commandline parameters are of the form
24
    --kafka_host="localhost"
25
    to make them easy to enter from the cli
26
    while the config file parameters are stored in a dict
27
    {kafka: { host: localhost }}
28
    to avoid redundancy in the key name.
29
    So to merge them, we flatten all keys.
30
    """
31
    # We start with the default config
32
    config = flatten(default_config.DEFAULT_CONFIG)
33
34
    # Read commandline arguments
35
    cli_config = flatten(parse_args())
36
37
    if "configfile" in cli_config:
38
        print("Reading config file {}".format(cli_config['configfile']))
39
        configfile = flatten(parse_configfile(cli_config['configfile']))
40
        config = overwrite_config(config, configfile)
41
42
    # Parameters from commandline take precedence over all others
43
    config = overwrite_config(config, cli_config)
44
45
    # Set verbosity level
46
    if 'verbose' in config:
47
        if config['verbose'] == 1:
48
            logging.getLogger().setLevel(logging.INFO)
49
        elif config['verbose'] > 1:
50
            logging.getLogger().setLevel(logging.DEBUG)
51
52
    return ObjectView(config)
53
54
55
def overwrite_config(old_values, new_values):
56
    config = old_values.copy()
57
    config.update(new_values)
58
    return config
59
60
61
def parse_configfile(configfile):
62
    """
63
    Read settings from file
64
    """
65
    with open(configfile) as f:
66
        try:
67
            return yaml.safe_load(f)
68
        except Exception as e:
69
            logging.fatal("Could not load default config file: ", e)
70
            exit(-1)
71
72
73
def flatten(d, parent_key='', sep='_'):
74
    """
75
    Flatten keys in a dictionary
76
    Example:
77
    flatten({'a': 1, 'c': {'a': 2, 'b': {'x': 5, 'y' : 10}}, 'd': [1, 2, 3]})
78
    => {'a': 1, 'c_a': 2, 'c_b_x': 5, 'd': [1, 2, 3], 'c_b_y': 10}
79
    """
80
    items = []
81
    for k, v in d.items():
82
        new_key = parent_key + sep + k if parent_key else k
83
        if isinstance(v, collections.MutableMapping):
84
            items.extend(flatten(v, new_key, sep=sep).items())
85
        else:
86
            items.append((new_key, v))
87
    return dict(items)
88
89
90
def parse_args(args=sys.argv[1:]):
91
    parser = argparse.ArgumentParser(description='A Kafka consumer for InfluxDB',
92
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
93
    parser.add_argument('--kafka_host', type=str, default=argparse.SUPPRESS,
94
                        help="Hostname or IP of Kafka message broker (default: localhost)")
95
    parser.add_argument('--kafka_port', type=int, default=argparse.SUPPRESS,
96
                        help="Port of Kafka message broker (default: 9092)")
97
    parser.add_argument('--kafka_topic', type=str, default=argparse.SUPPRESS,
98
                        help="Topic for metrics (default: my_topic)")
99
    parser.add_argument('--kafka_offset', type=str, default=argparse.SUPPRESS,
100
                        help="Kafka offset (default: largest)")
101
    parser.add_argument('--kafka_group', type=str, default=argparse.SUPPRESS,
102
                        help="Kafka consumer group (default: my_group)")
103
    parser.add_argument('--kafka_reconnect_wait_time_ms', type=int, default=argparse.SUPPRESS,
104
                        help="Kafka reconnect wait time [ms] (default: 1000)")
105
    parser.add_argument('--kafka_reader', type=str, default=argparse.SUPPRESS,
106
                        help="Kafka client library to use (e.g. kafka_python or confluent)"
107
                             "(default: kafka_influxdb.reader.confluent)")
108
    parser.add_argument('--influxdb_host', type=str, default=argparse.SUPPRESS,
109
                        help="InfluxDB hostname or IP (default: localhost)")
110
    parser.add_argument('--influxdb_port', type=int, default=argparse.SUPPRESS,
111
                        help="InfluxDB API port (default: 8086)")
112
    parser.add_argument('--influxdb_user', type=str, default=argparse.SUPPRESS,
113
                        help="InfluxDB username (default: root)")
114
    parser.add_argument('--influxdb_password', type=str, default=argparse.SUPPRESS,
115
                        help="InfluxDB password (default: root)")
116
    parser.add_argument('--influxdb_dbname', type=str, default=argparse.SUPPRESS,
117
                        help="InfluxDB database to write metrics into (default: metrics)")
118
    parser.add_argument('--influxdb_use_ssl', default=argparse.SUPPRESS, action="store_true",
119
                        help="Use SSL connection for InfluxDB (default: False)")
120
    parser.add_argument('--influxdb_verify_ssl', default=argparse.SUPPRESS, action="store_true",
121
                        help="Verify the SSL certificate before connecting (default: False)")
122
    parser.add_argument('--influxdb_timeout', type=int, default=argparse.SUPPRESS,
123
                        help="Max number of seconds to establish a connection to InfluxDB (default: 5)")
124
    parser.add_argument('--influxdb_use_udp', default=argparse.SUPPRESS, action="store_true",
125
                        help="Use UDP connection for InfluxDB (default: False)")
126
    parser.add_argument('--influxdb_retention_policy', type=str, default=argparse.SUPPRESS,
127
                        help="Retention policy for incoming metrics (default: default)")
128
    parser.add_argument('--influxdb_time_precision', type=str, default=argparse.SUPPRESS,
129
                        help="Precision of incoming metrics. Can be one of 's', 'm', 'ms', 'u' (default: s)")
130
    parser.add_argument('--encoder', type=str, default=argparse.SUPPRESS,
131
                        help="Input encoder which converts an incoming message to dictionary "
132
                             "(default: kafka_influxdb.encoder.collectd_graphite_encoder)")
133
    parser.add_argument('--buffer_size', type=int, default=argparse.SUPPRESS,
134
                        help="Maximum number of messages that will be collected before flushing to the backend "
135
                             "(default: 1000)")
136
    parser.add_argument('--buffer_timeout', type=int, default=argparse.SUPPRESS,
137
                        help="Maximum age (seconds) of messages in buffer before flushing to the backend "
138
                             "(default: False)")
139
    parser.add_argument('-c', '--configfile', type=str, default=argparse.SUPPRESS,
140
                        help="Configfile path (default: None)")
141
    parser.add_argument('-s', '--statistics', default=argparse.SUPPRESS, action="store_true",
142
                        help="Show performance statistics (default: True)")
143
    parser.add_argument('-v', '--verbose', action='count', default=argparse.SUPPRESS,
144
                        help="Set verbosity level. Increase verbosity by adding a v: -v -vv -vvv (default: 0)")
145
    parser.add_argument('--version', action="store_true", help="Show version")
146
    cli_args = parser.parse_args(args)
147
    # Convert config from argparse Namespace to dict
148
    return vars(cli_args)
149