Test Failed
Push — master ( d9aba9...ea04b2 )
by Milan
05:02 queued 02:43
created

ssg.yaml   A

Complexity

Total Complexity 40

Size/Duplication

Total Lines 242
Duplicated Lines 5.79 %

Test Coverage

Coverage 60.42%

Importance

Changes 0
Metric Value
eloc 151
dl 14
loc 242
ccs 84
cts 139
cp 0.6042
rs 9.2
c 0
b 0
f 0
wmc 40

13 Functions

Rating   Name   Duplication   Size   Complexity  
A open_and_macro_expand() 0 7 1
A open_environment() 0 11 1
A ordered_dump() 0 32 3
A _bool_constructor() 0 2 1
A open_and_expand() 0 21 3
A open_raw() 0 10 2
A ordered_load() 0 14 1
C _open_yaml() 0 36 9
A _save_rename() 0 2 1
A _validate_product_oval_feed_url() 0 10 3
A _strings_to_list() 0 9 2
B _get_implied_properties() 0 17 7
B update_yaml_list_or_string() 14 14 6

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like ssg.yaml often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 2
from __future__ import absolute_import
2 2
from __future__ import print_function
3
4 2
import codecs
5 2
import yaml
6 2
import os
7 2
import sys
8 2
import re
9
10 2
from collections import OrderedDict
11
12 2
from .jinja import load_macros, process_file
13 2
from .constants import (PKG_MANAGER_TO_SYSTEM,
14
                        PKG_MANAGER_TO_CONFIG_FILE,
15
                        XCCDF_PLATFORM_TO_PACKAGE)
16 2
from .constants import DEFAULT_UID_MIN
17 2
from .utils import merge_dicts
18
19 2
try:
20 2
    from yaml import CSafeLoader as yaml_SafeLoader
21
except ImportError:
22
    from yaml import SafeLoader as yaml_SafeLoader
23
24
25 2
def _bool_constructor(self, node):
26 2
    return self.construct_scalar(node)
27
28
29
# Don't follow python bool case
30 2
yaml_SafeLoader.add_constructor(u'tag:yaml.org,2002:bool', _bool_constructor)
31
32
33 2
class DocumentationNotComplete(Exception):
34 2
    pass
35
36
37 2
def _save_rename(result, stem, prefix):
38
    result["{0}_{1}".format(prefix, stem)] = stem
39
40
41 2
def _open_yaml(stream, original_file=None, substitutions_dict={}):
42
    """
43
    Open given file-like object and parse it as YAML.
44
45
    Optionally, pass the path to the original_file for better error handling
46
    when the file contents are passed.
47
48
    Return None if it contains "documentation_complete" key set to "false".
49
    """
50 2
    try:
51 2
        yaml_contents = yaml.load(stream, Loader=yaml_SafeLoader)
52
53 2
        if yaml_contents.pop("documentation_complete", "true") == "false" and \
54
                substitutions_dict.get("cmake_build_type") != "Debug":
55
            raise DocumentationNotComplete("documentation not complete and not a debug build")
56
57 2
        return yaml_contents
58
    except DocumentationNotComplete as e:
59
        raise e
60
    except Exception as e:
61
        count = 0
62
        _file = original_file
63
        if not _file:
64
            _file = stream
65
        with open(_file, "r") as e_file:
66
            lines = e_file.readlines()
67
            for line in lines:
68
                count = count + 1
69
                if re.match(r"^\s*\t+\s*", line):
70
                    print("Exception while handling file: %s" % _file, file=sys.stderr)
71
                    print("TabIndentationError: Line %s contains tabs instead of spaces:" % (count), file=sys.stderr)
72
                    print("%s\n\n" % repr(line.strip("\n")), file=sys.stderr)
73
                    sys.exit(1)
74
75
        print("Exception while handling file: %s" % _file, file=sys.stderr)
76
        raise e
77
78
79 2
def _get_implied_properties(existing_properties):
80
    result = dict()
81
    if "pkg_manager" in existing_properties:
82
        pkg_manager = existing_properties["pkg_manager"]
83
        if "pkg_system" not in existing_properties:
84
            result["pkg_system"] = PKG_MANAGER_TO_SYSTEM[pkg_manager]
85
        if "pkg_manager_config_file" not in existing_properties:
86
            if pkg_manager in PKG_MANAGER_TO_CONFIG_FILE:
87
                result["pkg_manager_config_file"] = PKG_MANAGER_TO_CONFIG_FILE[pkg_manager]
88
89
    if "uid_min" not in existing_properties:
90
        result["uid_min"] = DEFAULT_UID_MIN
91
92
    if "auid" not in existing_properties:
93
        result["auid"] = existing_properties.get("uid_min", DEFAULT_UID_MIN)
94
95
    return result
96
97
98 2
def open_and_expand(yaml_file, substitutions_dict=None):
99
    """
100
    Process the file as a template, using substitutions_dict to perform
101
    expansion. Then, process the expansion result as a YAML content.
102
103
    See also: _open_yaml
104
    """
105 2
    if substitutions_dict is None:
106 2
        substitutions_dict = dict()
107
108 2
    expanded_template = process_file(yaml_file, substitutions_dict)
109 2
    try:
110 2
        yaml_contents = _open_yaml(expanded_template, yaml_file, substitutions_dict)
111
    except yaml.scanner.ScannerError as e:
112
        print("A Jinja template expansion can mess up the indentation.")
113
        print("Please, check if the contents below are correctly expanded:")
114
        print("Source yaml: {}".format(yaml_file))
115
        print("Expanded yaml:\n{}".format(expanded_template))
116
        sys.exit(1)
117
118 2
    return yaml_contents
119
120
121 2
def open_and_macro_expand(yaml_file, substitutions_dict=None):
122
    """
123
    Do the same as open_and_expand, but load definitions of macros
124
    so they can be expanded in the template.
125
    """
126 2
    substitutions_dict = load_macros(substitutions_dict)
127 2
    return open_and_expand(yaml_file, substitutions_dict)
128
129
130 2
def open_raw(yaml_file):
131
    """
132
    Open given file-like object and parse it as YAML
133
    without performing any kind of template processing
134
135
    See also: _open_yaml
136
    """
137 2
    with codecs.open(yaml_file, "r", "utf8") as stream:
138 2
        yaml_contents = _open_yaml(stream, original_file=yaml_file)
139 2
    return yaml_contents
140
141
142 2
def _validate_product_oval_feed_url(contents):
143
    if "oval_feed_url" not in contents:
144
        return
145
    url = contents["oval_feed_url"]
146
    if not url.startswith("https"):
147
        msg = (
148
            "OVAL feed of product '{product}' is not available through an encrypted channel: {url}"
149
            .format(product=contents["product"], url=url)
150
        )
151
        raise ValueError(msg)
152
153
154 2
def open_environment(build_config_yaml, product_yaml):
155 2
    contents = open_raw(build_config_yaml)
156
    contents.update(open_raw(product_yaml))
157
    contents["product_dir"] = os.path.dirname(product_yaml)
158
    _validate_product_oval_feed_url(contents)
159
    platform_package_overrides = contents.get("platform_package_overrides", {})
160
    # Merge common platform package mappings, while keeping product specific mappings
161
    contents["platform_package_overrides"] = merge_dicts(XCCDF_PLATFORM_TO_PACKAGE,
162
                                                         platform_package_overrides)
163
    contents.update(_get_implied_properties(contents))
164
    return contents
165
166
167 2
def ordered_load(stream, Loader=yaml.Loader, object_pairs_hook=OrderedDict):
168
    """
169
    Drop-in replacement for yaml.load(), but preserves order of dictionaries
170
    """
171 2
    class OrderedLoader(Loader):
172 2
        pass
173
174 2
    def construct_mapping(loader, node):
175 2
        loader.flatten_mapping(node)
176 2
        return object_pairs_hook(loader.construct_pairs(node))
177 2
    OrderedLoader.add_constructor(
178
        yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
179
        construct_mapping)
180 2
    return yaml.load(stream, OrderedLoader)
181
182
183 2
def ordered_dump(data, stream=None, Dumper=yaml.Dumper, **kwds):
184
    """
185
    Drop-in replacement for yaml.dump(), but preserves order of dictionaries
186
    """
187 2
    class OrderedDumper(Dumper):
188
        # fix tag indentations
189 2
        def increase_indent(self, flow=False, indentless=False):
190 2
            return super(OrderedDumper, self).increase_indent(flow, False)
191
192 2
    def _dict_representer(dumper, data):
193 2
        return dumper.represent_mapping(
194
            yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
195
            data.items())
196
197 2
    def _str_representer(dumper, data):
198 2
        if '\n' in data:
199
            return dumper.represent_scalar(u'tag:yaml.org,2002:str', data,
200
                                           style='|')
201
        else:
202 2
            return dumper.represent_str(data)
203
204 2
    OrderedDumper.add_representer(OrderedDict, _dict_representer)
205 2
    OrderedDumper.add_representer(str, _str_representer)
206
207
    # Fix formatting by adding a space in between tasks
208 2
    unformatted_yaml = yaml.dump(data, None, OrderedDumper, **kwds)
209 2
    formatted_yaml = re.sub(r"[\n]+([\s]*)- name", r"\n\n\1- name", unformatted_yaml)
210
211 2
    if stream is not None:
212 2
        return stream.write(formatted_yaml)
213
    else:
214 2
        return formatted_yaml
215
216
217 2
def _strings_to_list(one_or_more_strings):
218
    """
219
    Output a list, that either contains one string, or a list of strings.
220
    In Python, strings can be cast to lists without error, but with unexpected result.
221
    """
222 2
    if isinstance(one_or_more_strings, str):
223 2
        return [one_or_more_strings]
224
    else:
225 2
        return list(one_or_more_strings)
226
227
228 2 View Code Duplication
def update_yaml_list_or_string(current_contents, new_contents, prepend=False):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
229 2
    result = []
230 2
    if current_contents:
231 2
        result += _strings_to_list(current_contents)
232 2
    if new_contents:
233 2
        if prepend:
234 2
            result = _strings_to_list(new_contents) + result
235
        else:
236 2
            result += _strings_to_list(new_contents)
237 2
    if not result:
238 2
        result = ""
239 2
    if len(result) == 1:
240 2
        result = result[0]
241
    return result
242