1 | from __future__ import absolute_import |
||
2 | from __future__ import print_function |
||
3 | |||
4 | import codecs |
||
5 | import re |
||
6 | import sys |
||
7 | import yaml |
||
8 | |||
9 | from collections import OrderedDict |
||
10 | |||
11 | from .jinja import load_macros, process_file |
||
12 | |||
13 | try: |
||
14 | from yaml import CSafeLoader as yaml_SafeLoader |
||
15 | except ImportError: |
||
16 | from yaml import SafeLoader as yaml_SafeLoader |
||
17 | |||
18 | try: |
||
19 | from yaml import CLoader as yaml_Loader |
||
20 | except ImportError: |
||
21 | from yaml import Loader as yaml_Loader |
||
22 | |||
23 | try: |
||
24 | from yaml import CDumper as yaml_Dumper |
||
25 | except ImportError: |
||
26 | from yaml import Dumper as yaml_Dumper |
||
27 | |||
28 | def _bool_constructor(self, node): |
||
29 | return self.construct_scalar(node) |
||
30 | |||
31 | |||
32 | def _unicode_constructor(self, node): |
||
33 | string_like = self.construct_scalar(node) |
||
34 | return str(string_like) |
||
35 | |||
36 | |||
# Don't follow python bool case: nodes tagged as YAML bool are built by
# _bool_constructor, which returns the scalar as-is instead of True/False.
yaml_SafeLoader.add_constructor(u'tag:yaml.org,2002:bool', _bool_constructor)
# Python2-relevant - become able to resolve "unicode strings":
# nodes tagged python/unicode are built by _unicode_constructor via str().
yaml_SafeLoader.add_constructor(u'tag:yaml.org,2002:python/unicode', _unicode_constructor)
||
41 | |||
42 | |||
class DocumentationNotComplete(Exception):
    """
    Raised when a YAML mapping has "documentation_complete" set to "false"
    and the build is not a Debug build.
    """
||
45 | |||
46 | |||
47 | def _save_rename(result, stem, prefix): |
||
48 | result["{0}_{1}".format(prefix, stem)] = stem |
||
49 | |||
50 | |||
51 | def _get_yaml_contents_without_documentation_complete(parsed_yaml, substitutions_dict): |
||
52 | """ |
||
53 | If the YAML is a mapping, then handle the documentation_complete accordingly, |
||
54 | and take that key-value out. |
||
55 | Otherwise, if YAML is empty, or it is a list, pass it on. |
||
56 | """ |
||
57 | if isinstance(parsed_yaml, dict): |
||
58 | documentation_incomplete_content_and_not_debug_build = ( |
||
59 | parsed_yaml.pop("documentation_complete", "true") == "false" |
||
60 | and substitutions_dict.get("cmake_build_type") != "Debug") |
||
61 | if documentation_incomplete_content_and_not_debug_build: |
||
62 | raise DocumentationNotComplete("documentation not complete and not a debug build") |
||
63 | return parsed_yaml |
||
64 | |||
65 | |||
def _open_yaml(stream, original_file=None, substitutions_dict=None):
    """
    Open given file-like object and parse it as YAML.

    Optionally, pass the path to the original_file for better error handling
    when the file contents are passed.

    Args:
        stream: YAML text or file-like object handed to yaml.load.
        original_file: Path of the file the contents came from. When given,
            the file is re-read after a parsing failure to look for
            tab-indented lines.
        substitutions_dict: Mapping consulted for the "cmake_build_type" key;
            defaults to an empty mapping.

    Returns:
        The parsed YAML contents, without the "documentation_complete" key
        when the contents are a mapping.

    Raises:
        DocumentationNotComplete: If the contents contain the
            "documentation_complete" key set to "false" (and this is not a
            Debug build).
        Exception: Any other parsing error is re-raised after the name of
            the offending file has been printed to stderr.

    If parsing fails and original_file contains a tab-indented line, the
    line is reported on stderr and the process exits with status 1.
    """
    if substitutions_dict is None:
        # A fresh dict per call instead of a shared mutable default argument.
        substitutions_dict = {}

    try:
        yaml_contents = yaml.load(stream, Loader=yaml_SafeLoader)

        return _get_yaml_contents_without_documentation_complete(yaml_contents, substitutions_dict)
    except DocumentationNotComplete:
        # Not a parsing problem; let it propagate untouched.
        raise
    except Exception:
        # Name used in the diagnostics. Without original_file there is no
        # path to report, so use the stream's name if it has one.
        _file = original_file if original_file else getattr(stream, "name", "<stream>")

        lines = []
        if original_file:
            # Only a real path can be re-read; the stream itself may be YAML
            # text or an already consumed file object. A failure to read the
            # file must not hide the original parsing error.
            try:
                with open(original_file, "r") as e_file:
                    lines = e_file.readlines()
            except (IOError, OSError):
                lines = []

        for count, line in enumerate(lines, 1):
            # Tabs in the leading whitespace are a common cause of YAML errors.
            if re.match(r"^\s*\t+\s*", line):
                print("Exception while handling file: %s" % _file, file=sys.stderr)
                print("TabIndentationError: Line %s contains tabs instead of spaces:" % (count), file=sys.stderr)
                print("%s\n\n" % repr(line.strip("\n")), file=sys.stderr)
                sys.exit(1)

        print("Exception while handling file: %s" % _file, file=sys.stderr)
        raise
||
98 | |||
99 | |||
def open_and_expand(yaml_file, substitutions_dict=None):
    """
    Process the file as a template, using substitutions_dict to perform
    expansion. Then, process the expansion result as a YAML content.

    Args:
        yaml_file: Path of the template file to expand and parse.
        substitutions_dict: Substitutions passed to the template expansion;
            defaults to an empty dict.

    Returns:
        The parsed YAML contents as returned by _open_yaml.

    If the expanded template cannot be scanned as YAML, the expanded text is
    printed to stderr for inspection and the process exits with status 1.

    See also: _open_yaml
    """
    if substitutions_dict is None:
        substitutions_dict = dict()

    expanded_template = process_file(yaml_file, substitutions_dict)
    try:
        yaml_contents = _open_yaml(expanded_template, yaml_file, substitutions_dict)
    except yaml.scanner.ScannerError:
        # Diagnostics go to stderr, like the ones emitted by _open_yaml,
        # so they do not get mixed into regular output on stdout.
        print("A Jinja template expansion can mess up the indentation.", file=sys.stderr)
        print("Please, check if the contents below are correctly expanded:", file=sys.stderr)
        print("Source yaml: {}".format(yaml_file), file=sys.stderr)
        print("Expanded yaml:\n{}".format(expanded_template), file=sys.stderr)
        sys.exit(1)

    return yaml_contents
||
121 | |||
122 | |||
def open_and_macro_expand(yaml_file, substitutions_dict=None):
    """
    Same as open_and_expand, except that the substitutions are first passed
    through load_macros so macro definitions can be expanded in the template.
    """
    substitutions_with_macros = load_macros(substitutions_dict)
    return open_and_expand(yaml_file, substitutions_with_macros)
||
130 | |||
131 | |||
def open_raw(yaml_file):
    """
    Read the file at the given path as UTF-8 and parse it as YAML,
    without performing any kind of template processing.

    See also: _open_yaml
    """
    with codecs.open(yaml_file, "r", "utf8") as utf8_stream:
        # The file is still closed when leaving the with block on return.
        return _open_yaml(utf8_stream, original_file=yaml_file)
||
142 | |||
143 | |||
def ordered_load(stream, Loader=yaml_Loader, object_pairs_hook=OrderedDict):
    """
    Drop-in replacement for yaml.load(), but preserves order of dictionaries

    Mappings are built by passing their key/value pairs to object_pairs_hook.
    NOTE(review): the default Loader is not the safe loader - confirm that
    callers only feed trusted input.
    """
    class OrderedLoader(Loader):
        """Private loader subclass, so the given Loader class is not modified."""

    def build_ordered_mapping(loader, node):
        loader.flatten_mapping(node)
        key_value_pairs = loader.construct_pairs(node)
        return object_pairs_hook(key_value_pairs)

    mapping_tag = yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG
    OrderedLoader.add_constructor(mapping_tag, build_ordered_mapping)
    return yaml.load(stream, OrderedLoader)
||
158 | |||
159 | |||
def ordered_dump(data, stream=None, Dumper=yaml_Dumper, **kwds):
    """
    Drop-in replacement for yaml.dump(), but preserves order of dictionaries

    The dumped text is post-processed (blank line before every "- name"
    item, document end markers removed). It is written to stream when one
    is given, and returned otherwise.
    """
    class OrderedDumper(Dumper):
        # fix tag indentations
        def increase_indent(self, flow=False, indentless=False):
            # The indentless argument is deliberately replaced by False.
            return super(OrderedDumper, self).increase_indent(flow, False)

    def represent_ordered_mapping(dumper, mapping):
        mapping_tag = yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG
        return dumper.represent_mapping(mapping_tag, mapping.items())

    def represent_text(dumper, text):
        # Multi-line strings are emitted in literal block style.
        if '\n' not in text:
            return dumper.represent_str(text)
        return dumper.represent_scalar(u'tag:yaml.org,2002:str', text,
                                       style='|')

    OrderedDumper.add_representer(OrderedDict, represent_ordered_mapping)
    OrderedDumper.add_representer(str, represent_text)

    raw_dump = yaml.dump(data, None, OrderedDumper, **kwds)

    # Fix formatting by adding a space in between tasks
    spaced_dump = re.sub(r"[\n]+([\s]*)- name", r"\n\n\1- name", raw_dump)

    # Fix CDumper issue where it adds yaml document ending '...'
    # in some templated ansible remediations
    cleaned_dump = re.sub(r"\n\s*\.\.\.\s*", r"\n", spaced_dump)

    if stream is None:
        return cleaned_dump
    return stream.write(cleaned_dump)
||
196 | |||
197 | |||
198 | def _strings_to_list(one_or_more_strings): |
||
199 | """ |
||
200 | Output a list, that either contains one string, or a list of strings. |
||
201 | In Python, strings can be cast to lists without error, but with unexpected result. |
||
202 | """ |
||
203 | if isinstance(one_or_more_strings, str): |
||
204 | return [one_or_more_strings] |
||
205 | else: |
||
206 | return list(one_or_more_strings) |
||
207 | |||
208 | |||
def update_yaml_list_or_string(current_contents, new_contents, prepend=False):
    """
    Combine current_contents and new_contents, each being a string or
    a collection of strings.

    The new items are placed before the current ones if prepend is true,
    after them otherwise. Returns "" when there is nothing to combine,
    the sole item when there is exactly one, and a list otherwise.
    """
    current_items = _strings_to_list(current_contents) if current_contents else []
    new_items = _strings_to_list(new_contents) if new_contents else []

    if prepend:
        combined = new_items + current_items
    else:
        combined = current_items + new_items

    if not combined:
        return ""
    if len(combined) == 1:
        return combined[0]
    return combined
||
223 | |||
224 | |||
def convert_string_to_bool(string):
    """
    Translate the string "true" or "false" (in any letter case)
    to the corresponding bool.

    Raises ValueError for any other string.
    """
    known_values = {"true": True, "false": False}
    normalized = string.lower()
    if normalized not in known_values:
        raise ValueError(
            "Invalid value %s while expecting boolean string" % string)
    return known_values[normalized]
||
239 |