Total Complexity | 40 |
Total Lines | 242 |
Duplicated Lines | 5.79 % |
Coverage | 60.42% |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like ssg.yaml often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | 2 | from __future__ import absolute_import |
|
2 | 2 | from __future__ import print_function |
|
3 | |||
4 | 2 | import codecs |
|
5 | 2 | import yaml |
|
6 | 2 | import os |
|
7 | 2 | import sys |
|
8 | 2 | import re |
|
9 | |||
10 | 2 | from collections import OrderedDict |
|
11 | |||
12 | 2 | from .jinja import load_macros, process_file |
|
13 | 2 | from .constants import (PKG_MANAGER_TO_SYSTEM, |
|
14 | PKG_MANAGER_TO_CONFIG_FILE, |
||
15 | XCCDF_PLATFORM_TO_PACKAGE) |
||
16 | 2 | from .constants import DEFAULT_UID_MIN |
|
17 | 2 | from .utils import merge_dicts |
|
18 | |||
19 | 2 | try: |
|
20 | 2 | from yaml import CSafeLoader as yaml_SafeLoader |
|
21 | except ImportError: |
||
22 | from yaml import SafeLoader as yaml_SafeLoader |
||
23 | |||
24 | |||
25 | 2 | def _bool_constructor(self, node): |
|
26 | 2 | return self.construct_scalar(node) |
|
27 | |||
28 | |||
29 | # Don't follow python bool case |
||
30 | 2 | yaml_SafeLoader.add_constructor(u'tag:yaml.org,2002:bool', _bool_constructor) |
|
31 | |||
32 | |||
class DocumentationNotComplete(Exception):
    """
    Raised when a YAML document is marked with documentation_complete
    set to "false" and the build is not a Debug build.
    """
35 | |||
36 | |||
37 | 2 | def _save_rename(result, stem, prefix): |
|
38 | result["{0}_{1}".format(prefix, stem)] = stem |
||
39 | |||
40 | |||
def _report_tab_indentation(path):
    """
    Scan the file at path for tab-indented lines and abort if one is found.

    On the first line whose leading whitespace contains a tab, a diagnostic
    is printed to stderr and the process exits with status 1.

    If path cannot be opened or read as a file (for example because it is a
    file-like object or raw YAML text rather than a path), the scan is
    skipped so that the caller can still report the original error.
    """
    try:
        with open(path, "r") as e_file:
            lines = e_file.readlines()
    except (IOError, OSError, TypeError, ValueError):
        # Not a readable path; there is nothing to scan.
        return

    for count, line in enumerate(lines, 1):
        if re.match(r"^\s*\t+\s*", line):
            print("Exception while handling file: %s" % path, file=sys.stderr)
            print("TabIndentationError: Line %s contains tabs instead of spaces:" % (count), file=sys.stderr)
            print("%s\n\n" % repr(line.strip("\n")), file=sys.stderr)
            sys.exit(1)


def _open_yaml(stream, original_file=None, substitutions_dict=None):
    """
    Open given file-like object and parse it as YAML.

    Optionally, pass the path to the original_file for better error handling
    when the file contents are passed.

    Raise DocumentationNotComplete if the document contains the
    "documentation_complete" key set to "false" and substitutions_dict does
    not declare "cmake_build_type" as "Debug". The key is removed from the
    returned contents in every case.

    On any other error, the source file is checked for tab indentation
    (which terminates the process with exit status 1 if found), the file
    name is printed to stderr and the original exception is re-raised.
    """
    # Avoid a shared mutable default argument.
    if substitutions_dict is None:
        substitutions_dict = {}

    try:
        yaml_contents = yaml.load(stream, Loader=yaml_SafeLoader)

        if yaml_contents.pop("documentation_complete", "true") == "false" and \
                substitutions_dict.get("cmake_build_type") != "Debug":
            raise DocumentationNotComplete("documentation not complete and not a debug build")

        return yaml_contents
    except DocumentationNotComplete:
        raise
    except Exception:
        _file = original_file if original_file else stream
        # The helper swallows its own I/O errors, so a stream that is not a
        # path can no longer mask the parsing error being handled here.
        _report_tab_indentation(_file)

        print("Exception while handling file: %s" % _file, file=sys.stderr)
        # Bare raise keeps the original traceback intact.
        raise
77 | |||
78 | |||
79 | 2 | def _get_implied_properties(existing_properties): |
|
80 | result = dict() |
||
81 | if "pkg_manager" in existing_properties: |
||
82 | pkg_manager = existing_properties["pkg_manager"] |
||
83 | if "pkg_system" not in existing_properties: |
||
84 | result["pkg_system"] = PKG_MANAGER_TO_SYSTEM[pkg_manager] |
||
85 | if "pkg_manager_config_file" not in existing_properties: |
||
86 | if pkg_manager in PKG_MANAGER_TO_CONFIG_FILE: |
||
87 | result["pkg_manager_config_file"] = PKG_MANAGER_TO_CONFIG_FILE[pkg_manager] |
||
88 | |||
89 | if "uid_min" not in existing_properties: |
||
90 | result["uid_min"] = DEFAULT_UID_MIN |
||
91 | |||
92 | if "auid" not in existing_properties: |
||
93 | result["auid"] = existing_properties.get("uid_min", DEFAULT_UID_MIN) |
||
94 | |||
95 | return result |
||
96 | |||
97 | |||
def open_and_expand(yaml_file, substitutions_dict=None):
    """
    Process the file as a template, using substitutions_dict to perform
    expansion. Then, process the expansion result as a YAML content.

    If the expanded content triggers a YAML scanner error, the source path,
    the scanner error and the expanded text are printed to stderr and the
    process exits with status 1.

    See also: _open_yaml
    """
    if substitutions_dict is None:
        substitutions_dict = dict()

    expanded_template = process_file(yaml_file, substitutions_dict)
    try:
        yaml_contents = _open_yaml(expanded_template, yaml_file, substitutions_dict)
    except yaml.scanner.ScannerError as e:
        # Diagnostics go to stderr, like the other error reports in this
        # module, and include the scanner message itself.
        print("A Jinja template expansion can mess up the indentation.", file=sys.stderr)
        print("Please, check if the contents below are correctly expanded:", file=sys.stderr)
        print("Source yaml: {}".format(yaml_file), file=sys.stderr)
        print("Scanner error: {}".format(e), file=sys.stderr)
        print("Expanded yaml:\n{}".format(expanded_template), file=sys.stderr)
        sys.exit(1)

    return yaml_contents
119 | |||
120 | |||
def open_and_macro_expand(yaml_file, substitutions_dict=None):
    """
    Variant of open_and_expand that first passes substitutions_dict through
    load_macros, so that macro definitions can be expanded in the template.
    """
    with_macros = load_macros(substitutions_dict)
    expanded_contents = open_and_expand(yaml_file, with_macros)
    return expanded_contents
128 | |||
129 | |||
def open_raw(yaml_file):
    """
    Open the file at the given path as UTF-8 and parse it as YAML
    without performing any kind of template processing.

    See also: _open_yaml
    """
    with codecs.open(yaml_file, "r", "utf8") as utf8_stream:
        return _open_yaml(utf8_stream, original_file=yaml_file)
140 | |||
141 | |||
142 | 2 | def _validate_product_oval_feed_url(contents): |
|
143 | if "oval_feed_url" not in contents: |
||
144 | return |
||
145 | url = contents["oval_feed_url"] |
||
146 | if not url.startswith("https"): |
||
147 | msg = ( |
||
148 | "OVAL feed of product '{product}' is not available through an encrypted channel: {url}" |
||
149 | .format(product=contents["product"], url=url) |
||
150 | ) |
||
151 | raise ValueError(msg) |
||
152 | |||
153 | |||
def open_environment(build_config_yaml, product_yaml):
    """
    Build the environment dict from the build configuration and product
    YAML files.

    Product values override build configuration values. The result also
    gets "product_dir" (the directory of product_yaml), merged
    "platform_package_overrides" and the implied properties. The OVAL feed
    URL is validated along the way.
    """
    environment = open_raw(build_config_yaml)
    product_contents = open_raw(product_yaml)
    environment.update(product_contents)
    environment["product_dir"] = os.path.dirname(product_yaml)
    _validate_product_oval_feed_url(environment)

    # Merge common platform package mappings, while keeping product specific mappings
    product_overrides = environment.get("platform_package_overrides", {})
    environment["platform_package_overrides"] = merge_dicts(
        XCCDF_PLATFORM_TO_PACKAGE, product_overrides)

    implied = _get_implied_properties(environment)
    environment.update(implied)
    return environment
165 | |||
166 | |||
def ordered_load(stream, Loader=yaml.Loader, object_pairs_hook=OrderedDict):
    """
    Drop-in replacement for yaml.load(), but preserves order of dictionaries

    Mappings are built by passing their key/value pairs to
    object_pairs_hook. The constructor is registered on a private subclass
    of Loader, so the Loader class passed in is left untouched.

    NOTE(review): the default Loader is yaml.Loader, not a safe loader;
    callers handling untrusted input should pass a safe Loader explicitly.
    """
    class OrderedLoader(Loader):
        """Private Loader subclass carrying the ordered mapping constructor."""

    def _construct_ordered_mapping(loader, node):
        loader.flatten_mapping(node)
        key_value_pairs = loader.construct_pairs(node)
        return object_pairs_hook(key_value_pairs)

    mapping_tag = yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG
    OrderedLoader.add_constructor(mapping_tag, _construct_ordered_mapping)
    return yaml.load(stream, OrderedLoader)
181 | |||
182 | |||
def ordered_dump(data, stream=None, Dumper=yaml.Dumper, **kwds):
    """
    Drop-in replacement for yaml.dump(), but preserves order of dictionaries

    OrderedDict instances are emitted as plain mappings in their own order,
    and strings containing a newline are emitted in literal block style.
    The dumped text is post-processed so that every "- name" item is
    preceded by exactly one blank line.

    Returns the formatted text when stream is None; otherwise writes it to
    stream and returns whatever stream.write() returns.
    """
    class OrderedDumper(Dumper):
        # fix tag indentations
        def increase_indent(self, flow=False, indentless=False):
            # The caller's indentless value is ignored; False is always passed on.
            return super(OrderedDumper, self).increase_indent(flow, False)

    def _represent_ordered_dict(dumper, mapping):
        mapping_tag = yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG
        return dumper.represent_mapping(mapping_tag, mapping.items())

    def _represent_text(dumper, text):
        if '\n' not in text:
            return dumper.represent_str(text)
        return dumper.represent_scalar(u'tag:yaml.org,2002:str', text,
                                       style='|')

    OrderedDumper.add_representer(OrderedDict, _represent_ordered_dict)
    OrderedDumper.add_representer(str, _represent_text)

    # Fix formatting by adding a space in between tasks
    raw_yaml = yaml.dump(data, None, OrderedDumper, **kwds)
    spaced_yaml = re.sub(r"[\n]+([\s]*)- name", r"\n\n\1- name", raw_yaml)

    if stream is None:
        return spaced_yaml
    return stream.write(spaced_yaml)
215 | |||
216 | |||
217 | 2 | def _strings_to_list(one_or_more_strings): |
|
218 | """ |
||
219 | Output a list, that either contains one string, or a list of strings. |
||
220 | In Python, strings can be cast to lists without error, but with unexpected result. |
||
221 | """ |
||
222 | 2 | if isinstance(one_or_more_strings, str): |
|
223 | 2 | return [one_or_more_strings] |
|
224 | else: |
||
225 | 2 | return list(one_or_more_strings) |
|
226 | |||
227 | |||
def update_yaml_list_or_string(current_contents, new_contents, prepend=False):
    """
    Combine two values that are each either a string or a list of strings.

    Falsy inputs contribute nothing. new_contents is placed before
    current_contents when prepend is true, after it otherwise.

    Returns "" when the combination is empty, the sole element when it holds
    exactly one item, and the combined list otherwise.
    """
    existing = _strings_to_list(current_contents) if current_contents else []
    incoming = _strings_to_list(new_contents) if new_contents else []

    combined = incoming + existing if prepend else existing + incoming

    if not combined:
        return ""
    if len(combined) == 1:
        return combined[0]
    return combined
242 |