Passed
Push — master ( dbd4af...e9c47c )
by Jan
02:33 queued 11s
created

ssg.templates.Builder.build_all_rules()   A

Complexity

Conditions 4

Size

Total Lines 14
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 0
Metric Value
cc 4
eloc 12
nop 1
dl 0
loc 14
ccs 0
cts 11
cp 0
crap 20
rs 9.8
c 0
b 0
f 0
1
from __future__ import absolute_import
from __future__ import print_function

import imp
import os
import sys

import ssg.build_yaml
import ssg.jinja
import ssg.utils
import ssg.yaml

try:
    from urllib.parse import quote
except ImportError:
    from urllib import quote
16
17
# Every language a template may provide an implementation for. The order is
# the order in which content is generated.
languages = [
    "anaconda",
    "ansible",
    "bash",
    "oval",
    "puppet",
    "ignition",
    "kubernetes",
]

# Optional per-template Python module providing a preprocess() hook.
preprocessing_file_name = "template.py"

# File extension of the generated output for each language.
lang_to_ext_map = {
    "anaconda": ".anaconda",
    "ansible": ".yml",
    "bash": ".sh",
    "oval": ".xml",
    "puppet": ".pp",
    "ignition": ".yml",
    "kubernetes": ".yml",
}

# Registry of known templates, keyed by template directory name.
# It is filled in by Builder.__init__.
templates = {}
31
32
33
class Template(object):
    """
    A single template directory and the languages it implements.

    The directory <template_root_directory>/<name> has to contain a
    "template.yml" file with a "supported_languages" list and, for every
    listed language, a "<lang>.template" file. A preprocessing module
    (see preprocessing_file_name) in the same directory is optional.
    """
    def __init__(self, template_root_directory, name):
        """
        Loads template.yml of the template and validates the declared
        languages.

        Raises ValueError if a declared language is not one of the known
        languages or if its "<lang>.template" file does not exist.
        """
        self.template_root_directory = template_root_directory
        self.name = name
        self.template_path = os.path.join(self.template_root_directory, self.name)
        self.template_yaml_path = os.path.join(self.template_path, "template.yml")
        self.preprocessing_file_path = os.path.join(
            self.template_path, preprocessing_file_name)
        if not os.path.exists(self.preprocessing_file_path):
            # None marks a template without a preprocessing module
            self.preprocessing_file_path = None
        self.langs = []
        template_yaml = ssg.yaml.open_raw(self.template_yaml_path)
        for lang in template_yaml["supported_languages"]:
            if lang not in languages:
                raise ValueError(
                    "The template {0} declares to support the {1} language, "
                    "but this language is not supported by the content.".format(
                        self.name, lang))
            langfilename = lang + ".template"
            if not os.path.exists(os.path.join(self.template_path, langfilename)):
                raise ValueError(
                    "The template {0} declares to support the {1} language, "
                    "but the implementation file is missing.".format(
                        self.name, lang))
            self.langs.append(lang)

    def preprocess(self, parameters, lang):
        """
        Returns the template parameters prepared for the given language.

        If the template ships a preprocessing module, its
        preprocess(parameters, lang) function is called with a copy of
        the parameters and its return value is used instead of them.
        The returned dictionary has all keys converted to uppercase.
        """
        # if no template.py file exists, skip this preprocessing part
        if self.preprocessing_file_path is not None:
            module_name = "tmpmod"  # dummy module name, we don't need it later
            # NOTE(review): the imp module is deprecated in Python 3 in
            # favour of importlib; revisit before dropping Python 2 support.
            preprocess_mod = imp.load_source(module_name, self.preprocessing_file_path)
            parameters = preprocess_mod.preprocess(parameters.copy(), lang)
        # TODO: Remove this right after the variables in templates are renamed
        # to lowercase
        return {key.upper(): value for key, value in parameters.items()}
66
67
68
class Builder(object):
    """
    Class for building all templated content for a given product.

    To generate content from templates, pass the env_yaml, path to the
    directory with resolved rule YAMLs, path to the directory that contains
    templates, path to the output directory for remediations and a path to
    the output directory for checks into the constructor. Then, call the
    method build() to perform a build.
    """
    def __init__(
            self, env_yaml, resolved_rules_dir, templates_dir,
            remediations_dir, checks_dir):
        """
        Stores the build configuration, computes the output directory of
        every language and registers every template found in templates_dir
        in the module-level "templates" dictionary.
        """
        self.env_yaml = env_yaml
        self.resolved_rules_dir = resolved_rules_dir
        self.templates_dir = templates_dir
        self.remediations_dir = remediations_dir
        self.checks_dir = checks_dir
        self.output_dirs = dict()
        for lang in languages:
            if lang == "oval":
                # OVAL checks need to be put to a different directory because
                # they are processed differently than remediations later in the
                # build process
                output_dir = self.checks_dir
            else:
                output_dir = self.remediations_dir
            self.output_dirs[lang] = os.path.join(output_dir, lang)
        # scan directory structure and dynamically create list of templates
        for item in os.listdir(self.templates_dir):
            itempath = os.path.join(self.templates_dir, item)
            # only real directories are templates, symlinks are skipped
            if os.path.isdir(itempath) and not os.path.islink(itempath):
                templates[item] = Template(templates_dir, item)

    def build_lang(
            self, rule_id, template_name, template_vars, lang, local_env_yaml):
        """
        Builds templated content for a given rule for a given language.
        Writes the output to the correct build directories.

        Does nothing if the template does not implement the language.
        """
        if lang not in templates[template_name].langs:
            return
        template_file_name = lang + ".template"
        template_file_path = os.path.join(
            self.templates_dir, template_name, template_file_name)
        ext = lang_to_ext_map[lang]
        output_file_name = rule_id + ext
        output_filepath = os.path.join(
            self.output_dirs[lang], output_file_name)
        template_parameters = templates[template_name].preprocess(template_vars, lang)
        jinja_dict = ssg.utils.merge_dicts(local_env_yaml, template_parameters)
        filled_template = ssg.jinja.process_file_with_macros(
            template_file_path, jinja_dict)
        with open(output_filepath, "w") as f:
            f.write(filled_template)

    def get_langs_to_generate(self, rule):
        """
        For a given rule returns list of languages that should be generated
        from templates. This is controlled by the "backends" key in the
        template section of the rule; a language is generated unless its
        backend is set to something else than "on".

        Raises RuntimeError if "backends" mentions an unknown language.
        """
        if "backends" not in rule.template:
            # return a copy so that callers can't modify the shared list
            return list(languages)
        backends = rule.template["backends"]
        for lang in backends:
            if lang not in languages:
                raise RuntimeError(
                    "Rule {0} wants to generate unknown language '{1}' "
                    "from a template.".format(rule.id_, lang)
                )
        return [
            lang for lang in languages if backends.get(lang, "on") == "on"]

    def build_rule(self, rule_id, rule_title, template, langs_to_generate):
        """
        Builds templated content for a given rule for selected languages,
        writing the output to the correct build directories.

        Raises ValueError if the template section has no "name" or "vars"
        key or if it refers to a template that does not exist.
        """
        try:
            template_name = template["name"]
        except KeyError:
            raise ValueError(
                "Rule {0} is missing template name under template key".format(
                    rule_id))
        if template_name not in templates:
            raise ValueError(
                "Rule {0} uses template {1} which does not exist.".format(
                    rule_id, template_name))
        try:
            template_vars = template["vars"]
        except KeyError:
            raise ValueError(
                "Rule {0} does not contain mandatory 'vars:' key under "
                "'template:' key.".format(rule_id))
        # Work on a copy so that the dictionary passed by the caller is not
        # modified by the assignment below.
        template_vars = template_vars.copy()
        # Add the rule ID which will be reused in OVAL templates as OVAL
        # definition ID so that the build system matches the generated
        # check with the rule.
        template_vars["_rule_id"] = rule_id
        # checks and remediations are processed with a custom YAML dict
        local_env_yaml = self.env_yaml.copy()
        local_env_yaml["rule_id"] = rule_id
        local_env_yaml["rule_title"] = rule_title
        local_env_yaml["products"] = self.env_yaml["product"]
        for lang in langs_to_generate:
            self.build_lang(
                rule_id, template_name, template_vars, lang, local_env_yaml)

    def build_extra_ovals(self):
        """
        Builds OVAL checks declared in the "extra_ovals.yml" file located
        in the templates directory. Each entry maps an OVAL definition ID
        to a template section.
        """
        declaration_path = os.path.join(self.templates_dir, "extra_ovals.yml")
        declaration = ssg.yaml.open_raw(declaration_path)
        for oval_def_id, template in declaration.items():
            langs_to_generate = ["oval"]
            # Since OVAL definition ID in shorthand format is always the same
            # as rule ID, we can use it instead of the rule ID even if no rule
            # with that ID exists
            self.build_rule(
                oval_def_id, oval_def_id, template, langs_to_generate)

    def build_all_rules(self):
        """
        Builds templated content for every templated rule found in the
        directory with resolved rules.
        """
        for rule_file in os.listdir(self.resolved_rules_dir):
            rule_path = os.path.join(self.resolved_rules_dir, rule_file)
            try:
                rule = ssg.build_yaml.Rule.from_yaml(rule_path, self.env_yaml)
            except ssg.build_yaml.DocumentationNotComplete:
                # Happens on non-debug build when a rule is "documentation-incomplete"
                continue
            if rule.template is None:
                # rule is not templated, skipping
                continue
            langs_to_generate = self.get_langs_to_generate(rule)
            self.build_rule(
                rule.id_, rule.title, rule.template, langs_to_generate)

    def build(self):
        """
        Builds all templated content for all languages, writing
        the output to the correct build directories.
        """

        for dir_ in self.output_dirs.values():
            if not os.path.exists(dir_):
                os.makedirs(dir_)

        self.build_extra_ovals()
        self.build_all_rules()
219