Test Failed
Push — master ( 36ef11...81e955 )
by Jan
03:06 queued 24s
created

ssg.templates.Builder.__init__()   B

Complexity

Conditions 7

Size

Total Lines 30
Code Lines 25

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 49.8796

Importance

Changes 0
Metric Value
cc 7
eloc 25
nop 6
dl 0
loc 30
ccs 1
cts 23
cp 0.0434
crap 49.8796
rs 7.8799
c 0
b 0
f 0
1 1
from __future__ import absolute_import
2 1
from __future__ import print_function
3
4 1
import os
5 1
import sys
6 1
import imp
7 1
import glob
8
9 1
import ssg.build_yaml
10 1
import ssg.utils
11 1
import ssg.yaml
12 1
from ssg.build_cpe import ProductCPEs
13
14 1
try:
15 1
    from urllib.parse import quote
16
except ImportError:
17
    from urllib import quote
18
19 1
languages = ["anaconda", "ansible", "bash", "oval", "puppet", "ignition",
20
             "kubernetes", "blueprint", "sce-bash"]
21 1
preprocessing_file_name = "template.py"
22 1
lang_to_ext_map = {
23
    "anaconda": ".anaconda",
24
    "ansible": ".yml",
25
    "bash": ".sh",
26
    "oval": ".xml",
27
    "puppet": ".pp",
28
    "ignition": ".yml",
29
    "kubernetes": ".yml",
30
    "blueprint": ".toml",
31
    "sce-bash": ".sh",
32
}
33
34
35 1
templates = dict()
36
37
38 1
class Template():
39 1
    def __init__(self, template_root_directory, name):
40
        self.template_root_directory = template_root_directory
41
        self.name = name
42
        self.template_path = os.path.join(self.template_root_directory, self.name)
43
        self.template_yaml_path = os.path.join(self.template_path, "template.yml")
44
        self.preprocessing_file_path = os.path.join(self.template_path, preprocessing_file_name)
45
46 1
    def load(self):
47
        if not os.path.exists(self.preprocessing_file_path):
48
            self.preprocessing_file_path = None
49
        self.langs = []
50
        template_yaml = ssg.yaml.open_raw(self.template_yaml_path)
51
        for lang in template_yaml["supported_languages"]:
52
            if lang not in languages:
53
                raise ValueError("The template {0} declares to support the {1} language,"
54
                "but this language is not supported by the content.".format(self.name, lang))
55
            langfilename = lang + ".template"
56
            if not os.path.exists(os.path.join(self.template_path, langfilename)):
57
                raise ValueError("The template {0} declares to support the {1} language,"
58
                "but the implementation file is missing.".format(self.name, lang))
59
            self.langs.append(lang)
60
61 1
    def preprocess(self, parameters, lang):
62
        # if no template.py file exists, skip this preprocessing part
63
        if self.preprocessing_file_path is not None:
64
            unique_dummy_module_name = "template_" + self.name
65
            preprocess_mod = imp.load_source(
66
                unique_dummy_module_name, self.preprocessing_file_path)
67
            if not hasattr(preprocess_mod, "preprocess"):
68
                msg = (
69
                    "The '{name}' template's preprocessing file {preprocessing_file} "
70
                    "doesn't define the 'preprocess' function, which is probably an omission."
71
                    .format(name=self.name, preprocessing_file=self.preprocessing_file_path)
72
                )
73
                raise ValueError(msg)
74
            parameters = preprocess_mod.preprocess(parameters.copy(), lang)
75
        # TODO: Remove this right after the variables in templates are renamed
76
        # to lowercase
77
        uppercases = dict()
78
        for k, v in parameters.items():
79
            uppercases[k.upper()] = v
80
        return uppercases
81
82 1
    def looks_like_template(self):
83
        if not os.path.isdir(self.template_root_directory):
84
            return False
85
        if os.path.islink(self.template_root_directory):
86
            return False
87
        template_sources = sorted(glob.glob(os.path.join(self.template_path, "*.template")))
88
        if not os.path.isfile(self.template_yaml_path) and not template_sources:
89
            return False
90
        return True
91
92
93 1
class Builder(object):
94
    """
95
    Class for building all templated content for a given product.
96
97
    To generate content from templates, pass the env_yaml, path to the
98
    directory with resolved rule YAMLs, path to the directory that contains
99
    templates, path to the output directory for checks and a path to the
100
    output directory for remediations into the constructor. Then, call the
101
    method build() to perform a build.
102
    """
103 1
    def __init__(
104
            self, env_yaml, resolved_rules_dir, templates_dir,
105
            remediations_dir, checks_dir):
106
        self.env_yaml = env_yaml
107
        self.resolved_rules_dir = resolved_rules_dir
108
        self.templates_dir = templates_dir
109
        self.remediations_dir = remediations_dir
110
        self.checks_dir = checks_dir
111
        self.output_dirs = dict()
112
        for lang in languages:
113
            lang_dir = lang
114
            if lang == "oval" or lang.startswith("sce-"):
115
                # OVAL and SCE checks need to be put to a different directory
116
                # because they are processed differently than remediations
117
                # later in the build process
118
                output_dir = self.checks_dir
119
                if lang.startswith("sce-"):
120
                    lang_dir = "sce"
121
            else:
122
                output_dir = self.remediations_dir
123
            dir_ = os.path.join(output_dir, lang_dir)
124
            self.output_dirs[lang] = dir_
125
        # scan directory structure and dynamically create list of templates
126
        for item in sorted(os.listdir(self.templates_dir)):
127
            itempath = os.path.join(self.templates_dir, item)
128
            maybe_template = Template(templates_dir, item)
129
            if maybe_template.looks_like_template():
130
                maybe_template.load()
131
                templates[item] = maybe_template
132
        self.product_cpes = ProductCPEs(env_yaml)
133
134 1
    def build_lang_file(
135
            self, rule_id, template_name, template_vars, lang, local_env_yaml):
136
        """
137
        Builds and returns templated content for a given rule for a given
138
        language; does not write the output to disk.
139
        """
140
        if lang not in templates[template_name].langs:
141
            return None
142
143
        template_file_name = lang + ".template"
144
        template_file_path = os.path.join(self.templates_dir, template_name, template_file_name)
145
        template_parameters = templates[template_name].preprocess(template_vars, lang)
146
        jinja_dict = ssg.utils.merge_dicts(local_env_yaml, template_parameters)
147
        filled_template = ssg.jinja.process_file_with_macros(
148
            template_file_path, jinja_dict)
149
150
        return filled_template
151
152 1
    def get_all_tests(
153
            self, rule_id, rule_template, local_env_yaml, platforms=None):
154
        """
155
        Builds a dictionary of a test case path -> test case value mapping.
156
157
        Here, we want to know what the relative path on disk (under the tests/
158
        subdirectory) is (such as "installed.pass.sh"), along with the actual
159
        contents of the test case.
160
161
        Presumably, we'll find the test case we want (all of them when
162
        building a test case tarball) and write them to disk in the
163
        appropriate location.
164
        """
165
        template_name = rule_template['name']
166
        template_vars = rule_template['vars']
167
168
        base_dir = os.path.abspath(os.path.join(self.templates_dir, template_name, "tests"))
169
        results = dict()
170
171
        # If no test cases exist, return an empty dictionary.
172
        if not os.path.exists(base_dir):
173
            return results
174
175
        # Walk files; note that we don't need to do anything about directories
176
        # as only files are recorded in the mapping; directories can be
177
        # inferred from the path.
178
        for dirpath, _, filenames in os.walk(base_dir):
179
            if not filenames:
180
                continue
181
182
            for filename in filenames:
183
                # Relative path to the file becomes our results key.
184
                absolute_path = os.path.abspath(os.path.join(dirpath, filename))
185
                relative_path = os.path.relpath(absolute_path, base_dir)
186
187
                # Load template parameters and apply it to the test case.
188
                template_parameters = templates[template_name].preprocess(template_vars, "tests")
189
                jinja_dict = ssg.utils.merge_dicts(local_env_yaml, template_parameters)
190
                filled_template = ssg.jinja.process_file_with_macros(
191
                    absolute_path, jinja_dict)
192
193
                # Save the results under the relative path.
194
                results[relative_path] = filled_template
195
196
        return results
197
198 1
    def build_lang(
199
            self, rule_id, template_name, template_vars, lang, local_env_yaml, platforms=None):
200
        """
201
        Builds templated content for a given rule for a given language.
202
        Writes the output to the correct build directories.
203
        """
204
        if lang not in templates[template_name].langs or lang.startswith("sce-"):
205
            return
206
207
        filled_template = self.build_lang_file(rule_id, template_name,
208
                                               template_vars, lang,
209
                                               local_env_yaml)
210
211
        ext = lang_to_ext_map[lang]
212
        output_file_name = rule_id + ext
213
        output_filepath = os.path.join(
214
            self.output_dirs[lang], output_file_name)
215
216
        with open(output_filepath, "w") as f:
217
            f.write(filled_template)
218
219 1
    def get_langs_to_generate(self, rule):
220
        """
221
        For a given rule returns list of languages that should be generated
222
        from templates. This is controlled by "template_backends" in rule.yml.
223
        """
224
        if "backends" in rule.template:
225
            backends = rule.template["backends"]
226
            for lang in backends:
227
                if lang not in languages:
228
                    raise RuntimeError(
229
                        "Rule {0} wants to generate unknown language '{1}"
230
                        "from a template.".format(rule.id_, lang)
231
                    )
232
            langs_to_generate = []
233
            for lang in languages:
234
                backend = backends.get(lang, "on")
235
                if backend == "on":
236
                    langs_to_generate.append(lang)
237
            return langs_to_generate
238
        else:
239
            return languages
240
241 1
    def get_template_name(self, template):
242
        """
243
        Given a template dictionary from a Rule instance, determine the name
244
        of the template (from templates) this rule uses.
245
        """
246
        try:
247
            template_name = template["name"]
248
        except KeyError:
249
            raise ValueError(
250
                "Rule {0} is missing template name under template key".format(
251
                    rule_id))
252
        if template_name not in templates.keys():
253
            raise ValueError(
254
                "Rule {0} uses template {1} which does not exist.".format(
255
                    rule_id, template_name))
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable rule_id does not seem to be defined.
Loading history...
256
        return template_name
257
258 1
    def get_resolved_langs_to_generate(self, rule):
259
        """
260
        Given a specific Rule instance, determine which languages are
261
        generated by the combination of the rule's template_backends AND
262
        the rule's template keys.
263
        """
264
        if rule.template is None:
265
            return None
266
267
        rule_langs = set(self.get_langs_to_generate(rule))
268
        template_name = self.get_template_name(rule.template)
269
        template_langs = set(templates[template_name].langs)
270
        return rule_langs.intersection(template_langs)
271
272 1
    def process_product_vars(self, all_variables):
273
        """
274
        Given a dictionary with the format key[@<product>]=value, filter out
275
        and only take keys that apply to this product (unqualified or qualified
276
        to exactly this product). Returns a new dict.
277
        """
278
        processed = dict(filter(lambda item: '@' not in item[0], all_variables.items()))
279
        suffix = '@' + self.env_yaml['product']
280
        for variable in filter(lambda key: key.endswith(suffix), all_variables):
281
            new_variable = variable[:-len(suffix)]
282
            value = all_variables[variable]
283
            processed[new_variable] = value
284
285
        return processed
286
287 1
    def build_rule(self, rule_id, rule_title, template, langs_to_generate, identifiers,
288
                   platforms=None):
289
        """
290
        Builds templated content for a given rule for selected languages,
291
        writing the output to the correct build directories.
292
        """
293
        template_name = self.get_template_name(template)
294
        try:
295
            template_vars = self.process_product_vars(template["vars"])
296
        except KeyError:
297
            raise ValueError(
298
                "Rule {0} does not contain mandatory 'vars:' key under "
299
                "'template:' key.".format(rule_id))
300
        # Add the rule ID which will be reused in OVAL templates as OVAL
301
        # definition ID so that the build system matches the generated
302
        # check with the rule.
303
        template_vars["_rule_id"] = rule_id
304
        # checks and remediations are processed with a custom YAML dict
305
        local_env_yaml = self.env_yaml.copy()
306
        local_env_yaml["rule_id"] = rule_id
307
        local_env_yaml["rule_title"] = rule_title
308
        local_env_yaml["products"] = self.env_yaml["product"]
309
        if identifiers is not None:
310
            local_env_yaml["cce_identifiers"] = identifiers
311
312
        for lang in langs_to_generate:
313
            try:
314
                self.build_lang(
315
                    rule_id, template_name, template_vars, lang, local_env_yaml, platforms)
316
            except Exception as e:
317
                print("Error building templated {0} content for rule {1}".format(lang, rule_id), file=sys.stderr)
318
                raise e
319
320 1
    def get_lang_for_rule(self, rule_id, rule_title, template, language):
321
        """
322
        For the specified rule, build and return only the specified language
323
        content.
324
        """
325
        template_name = self.get_template_name(template)
326
        try:
327
            template_vars = self.process_product_vars(template["vars"])
328
        except KeyError:
329
            raise ValueError(
330
                "Rule {0} does not contain mandatory 'vars:' key under "
331
                "'template:' key.".format(rule_id))
332
        # Add the rule ID which will be reused in OVAL templates as OVAL
333
        # definition ID so that the build system matches the generated
334
        # check with the rule.
335
        template_vars["_rule_id"] = rule_id
336
        # checks and remediations are processed with a custom YAML dict
337
        local_env_yaml = self.env_yaml.copy()
338
        local_env_yaml["rule_id"] = rule_id
339
        local_env_yaml["rule_title"] = rule_title
340
        local_env_yaml["products"] = self.env_yaml["product"]
341
342
        return self.build_lang_file(rule_id, template_name, template_vars,
343
                                    language, local_env_yaml)
344
345 1
    def build_extra_ovals(self):
346
        declaration_path = os.path.join(self.templates_dir, "extra_ovals.yml")
347
        declaration = ssg.yaml.open_raw(declaration_path)
348
        for oval_def_id, template in declaration.items():
349
            langs_to_generate = ["oval"]
350
            # Since OVAL definition ID in shorthand format is always the same
351
            # as rule ID, we can use it instead of the rule ID even if no rule
352
            # with that ID exists
353
            self.build_rule(
354
                oval_def_id, oval_def_id, template, langs_to_generate, None)
355
356 1
    def build_all_rules(self):
357
        for rule_file in sorted(os.listdir(self.resolved_rules_dir)):
358
            rule_path = os.path.join(self.resolved_rules_dir, rule_file)
359
            try:
360
                rule = ssg.build_yaml.Rule.from_yaml(rule_path, self.env_yaml, self.product_cpes)
361
            except ssg.build_yaml.DocumentationNotComplete:
362
                # Happens on non-debug build when a rule is "documentation-incomplete"
363
                continue
364
            if rule.template is None:
365
                # rule is not templated, skipping
366
                continue
367
            langs_to_generate = self.get_langs_to_generate(rule)
368
            self.build_rule(rule.id_, rule.title, rule.template, langs_to_generate,
369
                            rule.identifiers, platforms=rule.platforms)
370
371 1
    def build(self):
372
        """
373
        Builds all templated content for all languages, writing
374
        the output to the correct build directories.
375
        """
376
377
        for dir_ in self.output_dirs.values():
378
            if not os.path.exists(dir_):
379
                os.makedirs(dir_)
380
381
        self.build_extra_ovals()
382
        self.build_all_rules()
383