Passed
Push — master ( 3ac411...885497 )
by Matěj
02:58 queued 14s
created

ssg.yaml.ordered_dump()   A

Complexity

Conditions 3

Size

Total Lines 32
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 16
CRAP Score 3.0017

Importance

Changes 0
Metric Value
cc 3
eloc 20
nop 4
dl 0
loc 32
ccs 16
cts 17
cp 0.9412
crap 3.0017
rs 9.4
c 0
b 0
f 0
1 2
from __future__ import absolute_import
2 2
from __future__ import print_function
3
4 2
import codecs
5 2
import yaml
6 2
import sys
7 2
import re
8
9 2
from collections import OrderedDict
10
11 2
from .jinja import load_macros, process_file
12 2
from .constants import (PKG_MANAGER_TO_SYSTEM,
13
                        PKG_MANAGER_TO_CONFIG_FILE,
14
                        XCCDF_PLATFORM_TO_PACKAGE)
15 2
from .constants import DEFAULT_UID_MIN
16 2
from .utils import merge_dicts
17
18 2
try:
19 2
    from yaml import CSafeLoader as yaml_SafeLoader
20
except ImportError:
21
    from yaml import SafeLoader as yaml_SafeLoader
22
23
24 2
def _bool_constructor(self, node):
25 2
    return self.construct_scalar(node)
26
27
28
# Keep YAML booleans as their scalar text instead of converting them to Python bool
29 2
yaml_SafeLoader.add_constructor(u'tag:yaml.org,2002:bool', _bool_constructor)
30
31
32 2
class DocumentationNotComplete(Exception):
    """
    Raised by _open_yaml when the parsed YAML has "documentation_complete"
    set to "false" and the build is not a Debug build.
    """
34
35
36 2
def _save_rename(result, stem, prefix):
37
    result["{0}_{1}".format(prefix, stem)] = stem
38
39
40 2
def _open_yaml(stream, original_file=None, substitutions_dict=None):
    """
    Open given file-like object and parse it as YAML.

    Optionally, pass the path to the original_file for better error handling
    when the file contents are passed.

    Raise DocumentationNotComplete if the YAML contains the
    "documentation_complete" key set to "false", unless substitutions_dict
    has "cmake_build_type" set to "Debug".

    On any other failure, the file is reported on stderr. If one of its lines
    is indented with tabs, that line is reported and the process exits with
    status 1; otherwise the original exception is re-raised.
    """
    if substitutions_dict is None:
        substitutions_dict = {}

    try:
        yaml_contents = yaml.load(stream, Loader=yaml_SafeLoader)

        if yaml_contents.pop("documentation_complete", "true") == "false" and \
                substitutions_dict.get("cmake_build_type") != "Debug":
            raise DocumentationNotComplete("documentation not complete and not a debug build")

        return yaml_contents
    except DocumentationNotComplete:
        raise
    except Exception as e:
        _file = original_file
        if not _file:
            _file = stream

        # The tab check is best-effort: without original_file, the stream may
        # be an open file object or the YAML text itself, neither of which can
        # be re-opened by name. A failure to read it must not replace the
        # exception that is being handled.
        try:
            with open(_file, "r") as e_file:
                lines = e_file.readlines()
        except (IOError, OSError, TypeError, ValueError):
            lines = []

        for count, line in enumerate(lines, 1):
            if re.match(r"^\s*\t+\s*", line):
                print("Exception while handling file: %s" % _file, file=sys.stderr)
                print("TabIndentationError: Line %s contains tabs instead of spaces:" % (count), file=sys.stderr)
                print("%s\n\n" % repr(line.strip("\n")), file=sys.stderr)
                sys.exit(1)

        print("Exception while handling file: %s" % _file, file=sys.stderr)
        # Re-raise by name: under Python 2 a bare raise here would re-raise
        # whatever the nested try block above handled last.
        raise e
76
77
78 2
def _get_implied_properties(existing_properties):
    """
    Compute properties that can be derived from the ones already present.

    Only keys missing from existing_properties are put into the returned
    dictionary; existing_properties itself is not modified.
    """
    implied = {}

    if "pkg_manager" in existing_properties:
        manager = existing_properties["pkg_manager"]
        if "pkg_system" not in existing_properties:
            implied["pkg_system"] = PKG_MANAGER_TO_SYSTEM[manager]
        # The config file is optional: it is set only for known package managers.
        config_file_missing = "pkg_manager_config_file" not in existing_properties
        if config_file_missing and manager in PKG_MANAGER_TO_CONFIG_FILE:
            implied["pkg_manager_config_file"] = PKG_MANAGER_TO_CONFIG_FILE[manager]

    if "uid_min" not in existing_properties:
        implied["uid_min"] = DEFAULT_UID_MIN

    # auid follows an explicitly configured uid_min, else the default one.
    if "auid" not in existing_properties:
        implied["auid"] = existing_properties.get("uid_min", DEFAULT_UID_MIN)

    return implied
95
96
97 2
def open_and_expand(yaml_file, substitutions_dict=None):
    """
    Process the file as a template, using substitutions_dict to perform
    expansion. Then, process the expansion result as a YAML content.

    If the expanded content cannot be scanned as YAML, the source path and
    the expanded content are printed to stderr and the process exits with
    status 1.

    See also: _open_yaml
    """
    if substitutions_dict is None:
        substitutions_dict = dict()

    expanded_template = process_file(yaml_file, substitutions_dict)
    try:
        yaml_contents = _open_yaml(expanded_template, yaml_file, substitutions_dict)
    except yaml.scanner.ScannerError:
        # Diagnostics go to stderr, like the ones printed by _open_yaml, so
        # they do not get mixed into regular output.
        print("A Jinja template expansion can mess up the indentation.", file=sys.stderr)
        print("Please, check if the contents below are correctly expanded:", file=sys.stderr)
        print("Source yaml: {}".format(yaml_file), file=sys.stderr)
        print("Expanded yaml:\n{}".format(expanded_template), file=sys.stderr)
        sys.exit(1)

    return yaml_contents
118
119
120 2
def open_and_macro_expand(yaml_file, substitutions_dict=None):
    """
    Pass substitutions_dict through load_macros, then delegate to
    open_and_expand so the loaded macros can be expanded in the template.
    """
    substitutions_with_macros = load_macros(substitutions_dict)
    yaml_contents = open_and_expand(yaml_file, substitutions_with_macros)
    return yaml_contents
127
128
129 2
def open_raw(yaml_file):
    """
    Open the file at the yaml_file path as UTF-8 and parse it as YAML
    without performing any kind of template processing

    See also: _open_yaml
    """
    with codecs.open(yaml_file, "r", "utf8") as utf8_stream:
        return _open_yaml(utf8_stream, original_file=yaml_file)
139
140
141 2
def _validate_product_oval_feed_url(contents):
142
    if "oval_feed_url" not in contents:
143
        return
144
    url = contents["oval_feed_url"]
145
    if not url.startswith("https"):
146
        msg = (
147
            "OVAL feed of product '{product}' is not available through an encrypted channel: {url}"
148
            .format(product=contents["product"], url=url)
149
        )
150
        raise ValueError(msg)
151
152
153 2
def open_environment(build_config_yaml, product_yaml):
    """
    Build the environment dictionary out of two raw YAML files.

    The build configuration is loaded first and updated with the product
    YAML. The OVAL feed URL is then validated, the platform package
    overrides are merged with XCCDF_PLATFORM_TO_PACKAGE, and the implied
    properties are added.
    """
    environment = open_raw(build_config_yaml)
    product_contents = open_raw(product_yaml)
    environment.update(product_contents)

    _validate_product_oval_feed_url(environment)

    # Merge common platform package mappings, while keeping product specific mappings
    product_overrides = environment.get("platform_package_overrides", {})
    environment["platform_package_overrides"] = merge_dicts(
        XCCDF_PLATFORM_TO_PACKAGE, product_overrides)

    implied_properties = _get_implied_properties(environment)
    environment.update(implied_properties)
    return environment
163
164
165 2
def ordered_load(stream, Loader=yaml.Loader, object_pairs_hook=OrderedDict):
    """
    Drop-in replacement for yaml.load(), but preserves order of dictionaries

    Every YAML mapping is built by passing its key/value pairs to
    object_pairs_hook.

    NOTE(review): the default Loader is yaml.Loader, not a safe loader --
    confirm that callers only pass trusted input.
    """
    def _construct_ordered_mapping(loader, node):
        loader.flatten_mapping(node)
        pairs = loader.construct_pairs(node)
        return object_pairs_hook(pairs)

    class OrderedLoader(Loader):
        """Subclass of Loader so the constructor is registered on it only."""

    mapping_tag = yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG
    OrderedLoader.add_constructor(mapping_tag, _construct_ordered_mapping)

    return yaml.load(stream, OrderedLoader)
179
180
181 2
def ordered_dump(data, stream=None, Dumper=yaml.Dumper, **kwds):
    """
    Drop-in replacement for yaml.dump(), but preserves order of dictionaries

    Strings that contain a newline are emitted in the literal block
    style ('|'). In the dumped text, each run of newlines that precedes a
    "- name" entry is replaced by exactly two newlines.

    If stream is given, the text is written to it and the result of
    stream.write() is returned; otherwise the text itself is returned.
    """
    class OrderedDumper(Dumper):
        # fix tag indentations
        def increase_indent(self, flow=False, indentless=False):
            # The indentless argument is ignored; False is always passed on.
            return super(OrderedDumper, self).increase_indent(flow, False)

    def _represent_ordered_dict(dumper, mapping):
        mapping_tag = yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG
        return dumper.represent_mapping(mapping_tag, mapping.items())

    def _represent_str(dumper, text):
        if '\n' not in text:
            return dumper.represent_str(text)
        return dumper.represent_scalar(u'tag:yaml.org,2002:str', text,
                                       style='|')

    OrderedDumper.add_representer(OrderedDict, _represent_ordered_dict)
    OrderedDumper.add_representer(str, _represent_str)

    # Fix formatting by adding a space in between tasks
    raw_yaml = yaml.dump(data, None, OrderedDumper, **kwds)
    spaced_yaml = re.sub(r"[\n]+([\s]*)- name", r"\n\n\1- name", raw_yaml)

    if stream is None:
        return spaced_yaml
    return stream.write(spaced_yaml)
213
214
215 2
def _strings_to_list(one_or_more_strings):
216
    """
217
    Output a list, that either contains one string, or a list of strings.
218
    In Python, strings can be cast to lists without error, but with unexpected result.
219
    """
220 2
    if isinstance(one_or_more_strings, str):
221 2
        return [one_or_more_strings]
222
    else:
223 2
        return list(one_or_more_strings)
224
225
226 2 View Code Duplication
def update_yaml_list_or_string(current_contents, new_contents):
    """
    Combine current_contents and new_contents, each of which is either a
    string or a collection of strings; falsy values are skipped.

    Return "" when nothing was collected, the sole item when exactly one was
    collected, and the list of all items otherwise.
    """
    combined = []
    for contents in (current_contents, new_contents):
        if contents:
            combined.extend(_strings_to_list(contents))

    if not combined:
        return ""
    if len(combined) == 1:
        return combined[0]
    return combined
237