stakkr.configreader   A
last analyzed

Complexity

Total Complexity 15

Size/Duplication

Total Lines 106
Duplicated Lines 0 %

Test Coverage

Coverage 98.15%

Importance

Changes 0
Metric Value
eloc 59
dl 0
loc 106
ccs 53
cts 54
cp 0.9815
rs 10
c 0
b 0
f 0
wmc 15

5 Methods

Rating   Name   Duplication   Size   Complexity  
A Config.__init__() 0 9 1
A Config.display_errors() 0 9 1
A Config._build_config_files_list() 0 7 1
A Config.read() 0 32 5
A Config._build_config_schemas_list() 0 5 1

2 Functions

Rating   Name   Duplication   Size   Complexity  
A get_config_and_project_dir() 0 7 2
A _merge_dictionaries() 0 11 4
1
# coding: utf-8
2 1
"""Simple Config Reader."""
3
4 1
from os import path
5 1
from sys import stderr
6 1
import anyconfig
7 1
from stakkr.file_utils import get_file, find_project_dir
8 1
from yaml import FullLoader
9
10
11 1
class Config:
12
    """
13
    Parser of Stakkr.
14
15
    Set default values and validate stakkr.yml with specs
16
    """
17
18 1
    def __init__(self, config_file: str):
19
        """
20
        Build list of files to validate a config, set default values
21
        Then the given config file
22
        """
23 1
        self.config_file, self.project_dir = get_config_and_project_dir(config_file)
24 1
        self._build_config_files_list()
25 1
        self._build_config_schemas_list()
26 1
        self.error = ''
27
28 1
    def display_errors(self):
29
        """Display errors in STDERR."""
30 1
        from click import style
31
32 1
        msg = 'Failed validating config ('
33 1
        msg += ', '.join(self.config_files)
34 1
        msg += '):\n    - {}\n'.format(self.error)
35 1
        msg += '\nMake sure you have the right services.\n'
36 1
        stderr.write(style(msg, fg='red'))
37
38 1
    def read(self):
39
        """
40
        Parse the configs and validate it.
41
42
        It could be either local or from a local services
43
        (first local then packages by alphabetical order).
44
        """
45 1
        spec = {}
46 1
        for filepath in self.spec_files:
47 1
            yaml = anyconfig.load(filepath, Loader=FullLoader)
48 1
            spec = _merge_dictionaries(spec, yaml)
49
        # Waiting anyconfig to work for that :
50
        # schema = anyconfig.load(self.spec_files, Loader=FullLoader)
51
52 1
        conf = {}
53 1
        for filepath in self.config_files:
54 1
            yaml = anyconfig.load(filepath, Loader=FullLoader)
55 1
            conf = _merge_dictionaries(conf, yaml)
56
        # Waiting anyconfig to work for that :
57
        # config = anyconfig.load(self.spec_files, Loader=FullLoader)
58
59
        # Make sure the compiled configuration is valid
60 1
        valid, errs = anyconfig.validate(conf, spec, ac_schema_safe=False, ac_schema_errors=True)
61 1
        if valid is False:
62 1
            self.error = errs[0]
63 1
            return False
64
65 1
        conf['project_dir'] = path.realpath(path.dirname(self.config_file))
66 1
        if conf['project_name'] == '':
67 1
            conf['project_name'] = path.basename(conf['project_dir'])
68
69 1
        return conf
70
71 1
    def _build_config_files_list(self):
72 1
        self.config_files = [
73
            # Stakkr default config
74
            get_file('static', 'config_default.yml'),
75
            '{}/services/*/config_default.yml'.format(self.project_dir)]
76
        # Stakkr main config file finally with user's values
77 1
        self.config_files += [self.config_file]
78
79 1
    def _build_config_schemas_list(self):
80 1
        self.spec_files = [
81
            # Stakkr config validation
82
            get_file('static', 'config_schema.yml'),
83
            '{}/services/*/config_schema.yml'.format(self.project_dir)]
84
85
86 1
def get_config_and_project_dir(config_file: str):
87
    """Guess config file name and project dir"""
88 1
    if config_file is not None:
89 1
        return path.abspath(config_file), path.dirname(config_file)
90
91 1
    project_dir = find_project_dir()
92
    return '{}/stakkr.yml'.format(project_dir), project_dir
93
94
95 1
def _merge_dictionaries(dict1, dict2):
96 1
    for key, val in dict1.items():
97 1
        if isinstance(val, dict):
98 1
            dict2_node = dict2.setdefault(key, {})
99 1
            _merge_dictionaries(val, dict2_node)
100
101
        else:
102 1
            if key not in dict2:
103 1
                dict2[key] = val
104
105
    return dict2
106