Test Failed
Push — master ( 9b9e3a...2d11b6 )
by Jan
02:26 queued 13s
created

ssg.templates.Builder.build_rule()   B

Complexity

Conditions 5

Size

Total Lines 32
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 26.2588

Importance

Changes 0
Metric Value
cc 5
eloc 23
nop 7
dl 0
loc 32
ccs 1
cts 19
cp 0.0526
crap 26.2588
rs 8.8613
c 0
b 0
f 0
1 1
from __future__ import absolute_import
2 1
from __future__ import print_function
3
4 1
import os
5 1
import sys
6 1
import imp
7 1
import glob
8
9 1
import ssg.build_yaml
10 1
import ssg.utils
11 1
import ssg.yaml
12
13 1
try:
14 1
    from urllib.parse import quote
15
except ImportError:
16
    from urllib import quote
17
18 1
# Languages (template backends) known to the build system. Template.load()
# rejects any template that declares a language missing from this list.
languages = [
    "anaconda",
    "ansible",
    "bash",
    "oval",
    "puppet",
    "ignition",
    "kubernetes",
    "blueprint",
    "sce-bash",
]

# Name of the optional per-template Python file providing preprocess().
preprocessing_file_name = "template.py"

# File extension appended to the rule ID when content generated for a
# given language is written to disk.
lang_to_ext_map = {
    "anaconda": ".anaconda",
    "ansible": ".yml",
    "bash": ".sh",
    "oval": ".xml",
    "puppet": ".pp",
    "ignition": ".yml",
    "kubernetes": ".yml",
    "blueprint": ".toml",
    "sce-bash": ".sh",
}


# Module-level registry mapping template name -> loaded Template object.
templates = dict()
35
36
37 1
class Template(object):
    """
    A single template directory located under the templates root directory.

    A template directory is expected to hold a "template.yml" file listing
    the supported languages, one "<lang>.template" file per supported
    language and, optionally, a Python preprocessing file that defines
    a preprocess(parameters, lang) function.
    """
    def __init__(self, template_root_directory, name):
        self.template_root_directory = template_root_directory
        self.name = name
        self.template_path = os.path.join(self.template_root_directory, self.name)
        self.template_yaml_path = os.path.join(self.template_path, "template.yml")
        self.preprocessing_file_path = os.path.join(self.template_path, preprocessing_file_name)
        # Languages supported by this template; filled in by load().
        self.langs = []

    def load(self):
        """
        Reads template.yml and records the supported languages in self.langs.

        Sets self.preprocessing_file_path to None when the template has no
        preprocessing file.

        Raises ValueError if a declared language is unknown to the build
        system or if its "<lang>.template" file is missing.
        """
        # The None check keeps a repeated load() call from passing None
        # to os.path.exists().
        if (self.preprocessing_file_path is not None
                and not os.path.exists(self.preprocessing_file_path)):
            self.preprocessing_file_path = None
        self.langs = []
        template_yaml = ssg.yaml.open_raw(self.template_yaml_path)
        for lang in template_yaml["supported_languages"]:
            if lang not in languages:
                raise ValueError(
                    "The template {0} declares to support the {1} language, "
                    "but this language is not supported by the content."
                    .format(self.name, lang))
            langfilename = lang + ".template"
            if not os.path.exists(os.path.join(self.template_path, langfilename)):
                raise ValueError(
                    "The template {0} declares to support the {1} language, "
                    "but the implementation file is missing."
                    .format(self.name, lang))
            self.langs.append(lang)

    def _load_preprocessing_module(self):
        """
        Imports the template's preprocessing file as a uniquely named module.

        Prefers importlib because the imp module was removed in Python 3.12;
        falls back to imp.load_source on interpreters without importlib.util.
        """
        module_name = "template_" + self.name
        try:
            import importlib.util
        except ImportError:
            import imp
            return imp.load_source(module_name, self.preprocessing_file_path)
        spec = importlib.util.spec_from_file_location(
            module_name, self.preprocessing_file_path)
        module = importlib.util.module_from_spec(spec)
        # Register the module before executing it, as imp.load_source did.
        sys.modules[module_name] = module
        spec.loader.exec_module(module)
        return module

    def preprocess(self, parameters, lang):
        """
        Runs the template's preprocess() function, if any, on a copy of
        the parameters and returns a new dictionary whose keys are
        upper-cased.

        Raises ValueError if the preprocessing file exists but doesn't
        define the preprocess function.
        """
        # if no template.py file exists, skip this preprocessing part
        if self.preprocessing_file_path is not None:
            preprocess_mod = self._load_preprocessing_module()
            if not hasattr(preprocess_mod, "preprocess"):
                msg = (
                    "The '{name}' template's preprocessing file {preprocessing_file} "
                    "doesn't define the 'preprocess' function, which is probably an omission."
                    .format(name=self.name, preprocessing_file=self.preprocessing_file_path)
                )
                raise ValueError(msg)
            parameters = preprocess_mod.preprocess(parameters.copy(), lang)
        # TODO: Remove this right after the variables in templates are renamed
        # to lowercase
        return dict((key.upper(), value) for key, value in parameters.items())

    def looks_like_template(self):
        """
        Returns True if the templates root is a real (non-symlink) directory
        and the template path contains template.yml or at least one
        "*.template" file.
        """
        root = self.template_root_directory
        if not os.path.isdir(root) or os.path.islink(root):
            return False
        if os.path.isfile(self.template_yaml_path):
            return True
        return bool(glob.glob(os.path.join(self.template_path, "*.template")))
90
91
92 1
class Builder(object):
    """
    Class for building all templated content for a given product.

    To generate content from templates, pass the env_yaml, path to the
    directory with resolved rule YAMLs, path to the directory that contains
    templates, path to the output directory for remediations and a path to
    the output directory for checks into the constructor (in this order).
    Then, call the method build() to perform a build.
    """
    def __init__(
            self, env_yaml, resolved_rules_dir, templates_dir,
            remediations_dir, checks_dir):
        self.env_yaml = env_yaml
        self.resolved_rules_dir = resolved_rules_dir
        self.templates_dir = templates_dir
        self.remediations_dir = remediations_dir
        self.checks_dir = checks_dir
        self.output_dirs = dict()
        for lang in languages:
            lang_dir = lang
            if lang == "oval" or lang.startswith("sce-"):
                # OVAL and SCE checks need to be put to a different directory
                # because they are processed differently than remediations
                # later in the build process
                output_dir = self.checks_dir
                if lang.startswith("sce-"):
                    lang_dir = "sce"
            else:
                output_dir = self.remediations_dir
            self.output_dirs[lang] = os.path.join(output_dir, lang_dir)
        # scan directory structure and dynamically create list of templates;
        # the loaded templates are stored in the module-level registry
        for item in sorted(os.listdir(self.templates_dir)):
            maybe_template = Template(templates_dir, item)
            if maybe_template.looks_like_template():
                maybe_template.load()
                templates[item] = maybe_template

    def build_lang_file(
            self, rule_id, template_name, template_vars, lang, local_env_yaml):
        """
        Builds and returns templated content for a given rule for a given
        language; does not write the output to disk.

        Returns None if the template does not support the language.
        """
        if lang not in templates[template_name].langs:
            return None

        template_file_name = lang + ".template"
        template_file_path = os.path.join(self.templates_dir, template_name, template_file_name)
        template_parameters = templates[template_name].preprocess(template_vars, lang)
        jinja_dict = ssg.utils.merge_dicts(local_env_yaml, template_parameters)
        filled_template = ssg.jinja.process_file_with_macros(
            template_file_path, jinja_dict)

        return filled_template

    def get_all_tests(
            self, rule_id, rule_template, local_env_yaml, platforms=None):
        """
        Builds a dictionary of a test case path -> test case value mapping.

        Here, we want to know what the relative path on disk (under the tests/
        subdirectory) is (such as "installed.pass.sh"), along with the actual
        contents of the test case.

        Presumably, we'll find the test case we want (all of them when
        building a test case tarball) and write them to disk in the
        appropriate location.
        """
        template_name = rule_template['name']
        template_vars = rule_template['vars']

        base_dir = os.path.abspath(os.path.join(self.templates_dir, template_name, "tests"))
        results = dict()

        # If no test cases exist, return an empty dictionary.
        if not os.path.exists(base_dir):
            return results

        # Walk files; note that we don't need to do anything about directories
        # as only files are recorded in the mapping; directories can be
        # inferred from the path.
        for dirpath, _, filenames in os.walk(base_dir):
            for filename in filenames:
                # Relative path to the file becomes our results key.
                absolute_path = os.path.abspath(os.path.join(dirpath, filename))
                relative_path = os.path.relpath(absolute_path, base_dir)

                # Load template parameters and apply it to the test case.
                template_parameters = templates[template_name].preprocess(template_vars, "tests")
                jinja_dict = ssg.utils.merge_dicts(local_env_yaml, template_parameters)
                filled_template = ssg.jinja.process_file_with_macros(
                    absolute_path, jinja_dict)

                # Save the results under the relative path.
                results[relative_path] = filled_template

        return results

    def build_lang(
            self, rule_id, template_name, template_vars, lang, local_env_yaml, platforms=None):
        """
        Builds templated content for a given rule for a given language.
        Writes the output to the correct build directories.

        Does nothing for languages the template does not support and for
        "sce-" languages.
        """
        if lang not in templates[template_name].langs or lang.startswith("sce-"):
            return

        filled_template = self.build_lang_file(rule_id, template_name,
                                               template_vars, lang,
                                               local_env_yaml)

        ext = lang_to_ext_map[lang]
        output_file_name = rule_id + ext
        output_filepath = os.path.join(
            self.output_dirs[lang], output_file_name)

        with open(output_filepath, "w") as f:
            f.write(filled_template)

    def get_langs_to_generate(self, rule):
        """
        For a given rule returns list of languages that should be generated
        from templates. This is controlled by "template_backends" in rule.yml.

        Raises RuntimeError if the rule names a backend that is not a known
        language.
        """
        if "backends" not in rule.template:
            # Return a copy so that callers can't mutate the shared list.
            return list(languages)

        backends = rule.template["backends"]
        for lang in backends:
            if lang not in languages:
                raise RuntimeError(
                    "Rule {0} wants to generate unknown language '{1}' "
                    "from a template.".format(rule.id_, lang)
                )
        # A language is generated unless its backend is switched off.
        return [lang for lang in languages if backends.get(lang, "on") == "on"]

    def get_template_name(self, template, rule_id=None):
        """
        Given a template dictionary from a Rule instance, determine the name
        of the template (from templates) this rule uses.

        The optional rule_id is only used to make error messages more
        helpful.

        Raises ValueError if the template dictionary has no "name" key or
        if the named template does not exist.
        """
        rule_label = "<unknown>" if rule_id is None else rule_id
        try:
            template_name = template["name"]
        except KeyError:
            raise ValueError(
                "Rule {0} is missing template name under template key".format(
                    rule_label))
        if template_name not in templates:
            raise ValueError(
                "Rule {0} uses template {1} which does not exist.".format(
                    rule_label, template_name))
        return template_name

    def get_resolved_langs_to_generate(self, rule):
        """
        Given a specific Rule instance, determine which languages are
        generated by the combination of the rule's template_backends AND
        the rule's template keys.

        Returns None if the rule is not templated.
        """
        if rule.template is None:
            return None

        rule_langs = set(self.get_langs_to_generate(rule))
        template_name = self.get_template_name(
            rule.template, getattr(rule, "id_", None))
        template_langs = set(templates[template_name].langs)
        return rule_langs.intersection(template_langs)

    def process_product_vars(self, all_variables):
        """
        Given a dictionary with the format key[@<product>]=value, filter out
        and only take keys that apply to this product (unqualified or qualified
        to exactly this product). Returns a new dict.

        A value qualified to this product overrides the unqualified value
        of the same key.
        """
        processed = dict(
            (key, value) for key, value in all_variables.items() if '@' not in key)
        suffix = '@' + self.env_yaml['product']
        for key, value in all_variables.items():
            if key.endswith(suffix):
                processed[key[:-len(suffix)]] = value

        return processed

    def _prepare_rule_context(self, rule_id, rule_title, template):
        """
        Resolves the template name, the product-specific template variables
        and the per-rule environment dictionary for a rule.

        Returns a (template_name, template_vars, local_env_yaml) tuple.
        Raises ValueError if the template has no name, names an unknown
        template, or lacks the "vars" key.
        """
        template_name = self.get_template_name(template, rule_id)
        # Only the lookup is guarded so that a KeyError raised while
        # processing the variables is not misreported as a missing key.
        try:
            raw_vars = template["vars"]
        except KeyError:
            raise ValueError(
                "Rule {0} does not contain mandatory 'vars:' key under "
                "'template:' key.".format(rule_id))
        template_vars = self.process_product_vars(raw_vars)
        # Add the rule ID which will be reused in OVAL templates as OVAL
        # definition ID so that the build system matches the generated
        # check with the rule.
        template_vars["_rule_id"] = rule_id
        # checks and remediations are processed with a custom YAML dict
        local_env_yaml = self.env_yaml.copy()
        local_env_yaml["rule_id"] = rule_id
        local_env_yaml["rule_title"] = rule_title
        local_env_yaml["products"] = self.env_yaml["product"]
        return template_name, template_vars, local_env_yaml

    def build_rule(self, rule_id, rule_title, template, langs_to_generate, identifiers,
                   platforms=None):
        """
        Builds templated content for a given rule for selected languages,
        writing the output to the correct build directories.
        """
        template_name, template_vars, local_env_yaml = self._prepare_rule_context(
            rule_id, rule_title, template)
        if identifiers is not None:
            local_env_yaml["cce_identifiers"] = identifiers

        for lang in langs_to_generate:
            try:
                self.build_lang(
                    rule_id, template_name, template_vars, lang, local_env_yaml, platforms)
            except Exception:
                print("Error building templated {0} content for rule {1}".format(lang, rule_id), file=sys.stderr)
                # Bare raise keeps the original traceback intact.
                raise

    def get_lang_for_rule(self, rule_id, rule_title, template, language):
        """
        For the specified rule, build and return only the specified language
        content.
        """
        template_name, template_vars, local_env_yaml = self._prepare_rule_context(
            rule_id, rule_title, template)

        return self.build_lang_file(rule_id, template_name, template_vars,
                                    language, local_env_yaml)

    def build_extra_ovals(self):
        """
        Builds the OVAL checks declared in the "extra_ovals.yml" file located
        in the templates directory.
        """
        declaration_path = os.path.join(self.templates_dir, "extra_ovals.yml")
        declaration = ssg.yaml.open_raw(declaration_path)
        for oval_def_id, template in declaration.items():
            langs_to_generate = ["oval"]
            # Since OVAL definition ID in shorthand format is always the same
            # as rule ID, we can use it instead of the rule ID even if no rule
            # with that ID exists
            self.build_rule(
                oval_def_id, oval_def_id, template, langs_to_generate, None)

    def build_all_rules(self):
        """
        Builds templated content for every templated rule found in the
        resolved rules directory.
        """
        for rule_file in sorted(os.listdir(self.resolved_rules_dir)):
            rule_path = os.path.join(self.resolved_rules_dir, rule_file)
            try:
                rule = ssg.build_yaml.Rule.from_yaml(rule_path, self.env_yaml)
            except ssg.build_yaml.DocumentationNotComplete:
                # Happens on non-debug build when a rule is "documentation-incomplete"
                continue
            if rule.template is None:
                # rule is not templated, skipping
                continue
            langs_to_generate = self.get_langs_to_generate(rule)
            self.build_rule(rule.id_, rule.title, rule.template, langs_to_generate,
                            rule.identifiers, platforms=rule.platforms)

    def build(self):
        """
        Builds all templated content for all languages, writing
        the output to the correct build directories.
        """

        for dir_ in self.output_dirs.values():
            if not os.path.exists(dir_):
                os.makedirs(dir_)

        self.build_extra_ovals()
        self.build_all_rules()
381