Issues (70)

ssg/yaml.py (1 issue)

1
from __future__ import absolute_import
2
from __future__ import print_function
3
4
import codecs
5
import re
6
import sys
7
import yaml
8
9
from collections import OrderedDict
10
11
from .jinja import load_macros, process_file
12
13
try:
14
    from yaml import CSafeLoader as yaml_SafeLoader
15
except ImportError:
16
    from yaml import SafeLoader as yaml_SafeLoader
17
18
try:
19
    from yaml import CLoader as yaml_Loader
20
except ImportError:
21
    from yaml import Loader as yaml_Loader
22
23
try:
24
    from yaml import CDumper as yaml_Dumper
25
except ImportError:
26
    from yaml import Dumper as yaml_Dumper
27
28
def _bool_constructor(self, node):
    """
    Construct a YAML bool node as its plain scalar value.

    The node is passed straight to ``construct_scalar`` and the result is
    returned unchanged, i.e. no conversion to a Python bool is performed here.
    """
    scalar_value = self.construct_scalar(node)
    return scalar_value
30
31
32
def _unicode_constructor(self, node):
    """
    Construct a ``python/unicode`` tagged node as a native ``str``.

    The scalar obtained from ``construct_scalar`` is passed through ``str()``
    before being returned.
    """
    return str(self.construct_scalar(node))
35
36
37
# Don't follow python bool case: bool-tagged scalars are handed to
# _bool_constructor, which returns the scalar value as-is.
yaml_SafeLoader.add_constructor(
    u'tag:yaml.org,2002:bool',
    _bool_constructor)
# Python2-relevant - become able to resolve "unicode strings" by routing
# the python/unicode tag through _unicode_constructor.
yaml_SafeLoader.add_constructor(
    u'tag:yaml.org,2002:python/unicode',
    _unicode_constructor)
41
42
43
class DocumentationNotComplete(Exception):
    """
    Raised when a YAML mapping has "documentation_complete" set to "false"
    and the build type is not "Debug".
    """
45
46
47
def _save_rename(result, stem, prefix):
    """
    Record ``stem`` in ``result`` under the key "<prefix>_<stem>".

    ``result`` is modified in place; nothing is returned.
    """
    renamed_key = "{0}_{1}".format(prefix, stem)
    result[renamed_key] = stem
49
50
51
def _get_yaml_contents_without_documentation_complete(parsed_yaml, substitutions_dict):
    """
    Strip the "documentation_complete" key from a parsed YAML mapping.

    Anything that is not a dict (an empty document, a list, ...) is returned
    untouched. For a dict, the key is removed in place; if its value was the
    string "false" and substitutions_dict["cmake_build_type"] is not "Debug",
    DocumentationNotComplete is raised instead of returning.
    """
    if not isinstance(parsed_yaml, dict):
        return parsed_yaml

    # A missing key counts as "true", i.e. the documentation is complete.
    marked_incomplete = parsed_yaml.pop("documentation_complete", "true") == "false"
    if marked_incomplete and substitutions_dict.get("cmake_build_type") != "Debug":
        raise DocumentationNotComplete("documentation not complete and not a debug build")
    return parsed_yaml
64
65
66
def _exit_if_tab_indented(path):
    """
    Report the first tab-indented line of the file at ``path`` and exit(1).

    Returns silently when ``path`` cannot be opened or read as a text file
    (for example when it is a file object or YAML text rather than a path),
    or when no line starts with whitespace containing a tab.
    """
    try:
        with open(path, "r") as e_file:
            lines = e_file.readlines()
    except (IOError, OSError, TypeError, ValueError):
        # Not a readable path: there is nothing to scan, and failing here
        # would hide the parse error the caller is about to re-raise.
        return

    for count, line in enumerate(lines, 1):
        if re.match(r"^\s*\t+\s*", line):
            print("Exception while handling file: %s" % path, file=sys.stderr)
            print("TabIndentationError: Line %s contains tabs instead of spaces:" % (count), file=sys.stderr)
            print("%s\n\n" % repr(line.strip("\n")), file=sys.stderr)
            sys.exit(1)


def _open_yaml(stream, original_file=None, substitutions_dict=None):
    """
    Open given file-like object and parse it as YAML.

    Optionally, pass the path to the original_file for better error handling
    when the file contents are passed.

    substitutions_dict is consulted for "cmake_build_type" when the document
    is marked as not complete; it defaults to an empty dict.

    Raise DocumentationNotComplete if the document contains the
    "documentation_complete" key set to "false" (outside of Debug builds).
    On any other failure the offending file is named on stderr and the
    original exception is re-raised; if the file turns out to contain
    tab-indented lines, the first one is reported and the process exits
    with status 1.
    """
    # Avoid a shared mutable default argument.
    if substitutions_dict is None:
        substitutions_dict = {}

    try:
        yaml_contents = yaml.load(stream, Loader=yaml_SafeLoader)

        return _get_yaml_contents_without_documentation_complete(yaml_contents, substitutions_dict)
    except DocumentationNotComplete:
        raise
    except Exception:
        _file = original_file if original_file else stream
        # The scan lives in a helper so that a failure to open _file is
        # handled there and cannot replace the exception re-raised below.
        _exit_if_tab_indented(_file)

        print("Exception while handling file: %s" % _file, file=sys.stderr)
        raise
98
99
100
def open_and_expand(yaml_file, substitutions_dict=None):
    """
    Process the file as a template, using substitutions_dict to perform
    expansion. Then, process the expansion result as a YAML content.

    If parsing the expanded text fails with a YAML ScannerError, the source
    path and the expanded text are written to stderr and the process exits
    with status 1.

    See also: _open_yaml
    """
    if substitutions_dict is None:
        substitutions_dict = dict()

    expanded_template = process_file(yaml_file, substitutions_dict)
    try:
        return _open_yaml(expanded_template, yaml_file, substitutions_dict)
    except yaml.scanner.ScannerError:
        # Diagnostics go to stderr, like the ones emitted by _open_yaml,
        # so they do not mix with regular output on stdout.
        print("A Jinja template expansion can mess up the indentation.", file=sys.stderr)
        print("Please, check if the contents below are correctly expanded:", file=sys.stderr)
        print("Source yaml: {}".format(yaml_file), file=sys.stderr)
        print("Expanded yaml:\n{}".format(expanded_template), file=sys.stderr)
        sys.exit(1)
121
122
123
def open_and_macro_expand(yaml_file, substitutions_dict=None):
    """
    Same as open_and_expand, except that substitutions_dict is first passed
    through load_macros so macro definitions are available to the template.
    """
    with_macros = load_macros(substitutions_dict)
    return open_and_expand(yaml_file, with_macros)
130
131
132
def open_raw(yaml_file):
    """
    Open the file at path yaml_file as UTF-8 and parse it as YAML
    without performing any kind of template processing.

    See also: _open_yaml
    """
    with codecs.open(yaml_file, "r", "utf8") as utf8_stream:
        return _open_yaml(utf8_stream, original_file=yaml_file)
142
143
144
def ordered_load(stream, Loader=yaml_Loader, object_pairs_hook=OrderedDict):
    """
    Drop-in replacement for yaml.load(), but preserves order of dictionaries.

    Mappings are built by calling object_pairs_hook on the key/value pairs
    in document order. NOTE(review): the default Loader is the full
    (non-safe) loader - confirm callers only pass trusted input.
    """
    class OrderedLoader(Loader):
        pass

    def _build_ordered_mapping(loader, node):
        # Resolve merge keys first, then keep pairs in document order.
        loader.flatten_mapping(node)
        pairs = loader.construct_pairs(node)
        return object_pairs_hook(pairs)

    mapping_tag = yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG
    OrderedLoader.add_constructor(mapping_tag, _build_ordered_mapping)
    return yaml.load(stream, OrderedLoader)
158
159
160
def ordered_dump(data, stream=None, Dumper=yaml_Dumper, **kwds):
    """
    Drop-in replacement for yaml.dump(), but preserves order of dictionaries.

    The dumped text is post-processed (blank line before each "- name" item,
    document end marker "..." removed). When stream is given, the text is
    written to it and the result of stream.write() is returned; otherwise
    the text itself is returned.
    """
    class OrderedDumper(Dumper):
        # fix tag indentations: always call the parent with indentless=False,
        # whatever value was requested.
        def increase_indent(self, flow=False, indentless=False):
            return super(OrderedDumper, self).increase_indent(flow, False)

    def _represent_ordered_dict(dumper, mapping):
        mapping_tag = yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG
        return dumper.represent_mapping(mapping_tag, mapping.items())

    def _represent_string(dumper, text):
        # Single-line strings use the default representation; strings with
        # a newline are emitted in literal block style.
        if '\n' not in text:
            return dumper.represent_str(text)
        return dumper.represent_scalar(u'tag:yaml.org,2002:str', text, style='|')

    OrderedDumper.add_representer(OrderedDict, _represent_ordered_dict)
    OrderedDumper.add_representer(str, _represent_string)

    # Fix formatting by adding a space in between tasks
    raw_yaml = yaml.dump(data, None, OrderedDumper, **kwds)
    spaced_yaml = re.sub(r"[\n]+([\s]*)- name", r"\n\n\1- name", raw_yaml)

    # Fix CDumper issue where it adds yaml document ending '...'
    # in some templated ansible remediations
    cleaned_yaml = re.sub(r"\n\s*\.\.\.\s*", r"\n", spaced_yaml)

    if stream is None:
        return cleaned_yaml
    return stream.write(cleaned_yaml)
196
197
198
def _strings_to_list(one_or_more_strings):
    """
    Return the argument as a list of strings.

    A single str becomes a one-element list; any other iterable is converted
    with list(). The str case is special-cased because list("abc") would
    split the string into its characters.
    """
    if not isinstance(one_or_more_strings, str):
        return list(one_or_more_strings)
    return [one_or_more_strings]
207
208
209 View Code Duplication
def update_yaml_list_or_string(current_contents, new_contents, prepend=False):
    """
    Combine two "string or list of strings" values into one.

    Falsy arguments contribute nothing. new_contents goes after
    current_contents, or before it when prepend is true. The result is ""
    when nothing remains, the bare item when exactly one remains, and a list
    otherwise.
    """
    existing = _strings_to_list(current_contents) if current_contents else []
    incoming = _strings_to_list(new_contents) if new_contents else []
    merged = incoming + existing if prepend else existing + incoming

    if not merged:
        return ""
    if len(merged) == 1:
        return merged[0]
    return merged
223
224
225
def convert_string_to_bool(string):
    """
    Translate the text "true" / "false" (in any letter case) to a bool.

    Returns True for "true" and False for "false";
    raises ValueError for any other value.
    """
    normalized = string.lower()
    if normalized not in ("true", "false"):
        raise ValueError(
            "Invalid value %s while expecting boolean string" % string)
    return normalized == "true"
239