Completed
Pull Request — master (#6876)
by Alexander
104:47 queued 102:42
created

ssg.templates.Builder.process_product_vars()   A

Complexity

Conditions 4

Size

Total Lines 14
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 0
Metric Value
cc 4
eloc 8
nop 2
dl 0
loc 14
ccs 0
cts 8
cp 0
crap 20
rs 10
c 0
b 0
f 0
1
from __future__ import absolute_import
2
from __future__ import print_function
3
4
import os
5
import sys
6
import imp
7
import glob
8
9
import ssg.build_yaml
10
import ssg.utils
11
import ssg.yaml
12
13
try:
14
    from urllib.parse import quote
15
except ImportError:
16
    from urllib import quote
17
18
# Every output language that templated content can be generated for.
languages = [
    "anaconda",
    "ansible",
    "bash",
    "oval",
    "puppet",
    "ignition",
    "kubernetes",
]

# Name of the optional Python file inside a template directory that provides
# the preprocess() hook.
preprocessing_file_name = "template.py"

# File extension appended to the rule ID when writing generated output
# for each language.
lang_to_ext_map = {
    "anaconda": ".anaconda",
    "ansible": ".yml",
    "bash": ".sh",
    "oval": ".xml",
    "puppet": ".pp",
    "ignition": ".yml",
    "kubernetes": ".yml",
}

# Module-level registry of loaded Template objects keyed by template
# directory name; it is filled in by Builder.__init__.
templates = {}
32
33
34
class Template(object):
    """
    A single template stored as a directory below the templates root.

    The directory is expected to hold a "template.yml" file, one
    "<lang>.template" file per supported language and, optionally, a
    preprocessing Python file that defines a preprocess() function.
    """
    def __init__(self, template_root_directory, name):
        """
        Computes the paths that belong to the template called name located
        in template_root_directory. Nothing is read from disk here.
        """
        self.template_root_directory = template_root_directory
        self.name = name
        self.template_path = os.path.join(self.template_root_directory, self.name)
        self.template_yaml_path = os.path.join(self.template_path, "template.yml")
        self.preprocessing_file_path = os.path.join(self.template_path, preprocessing_file_name)

    def load(self):
        """
        Reads template.yml and stores the supported languages in self.langs.

        When the preprocessing file does not exist, preprocessing_file_path
        is reset to None so that preprocess() skips the hook.

        Raises ValueError if a declared language is unknown to the content
        or if its "<lang>.template" implementation file is missing.
        """
        # The path may already be None if load() has been called before;
        # os.path.exists() must not be handed None.
        if (self.preprocessing_file_path is not None
                and not os.path.exists(self.preprocessing_file_path)):
            self.preprocessing_file_path = None
        self.langs = []
        template_yaml = ssg.yaml.open_raw(self.template_yaml_path)
        for lang in template_yaml["supported_languages"]:
            if lang not in languages:
                raise ValueError(
                    "The template {0} declares to support the {1} language, "
                    "but this language is not supported by the content."
                    .format(self.name, lang))
            langfilename = lang + ".template"
            if not os.path.exists(os.path.join(self.template_path, langfilename)):
                raise ValueError(
                    "The template {0} declares to support the {1} language, "
                    "but the implementation file is missing."
                    .format(self.name, lang))
            self.langs.append(lang)

    def _load_preprocessing_module(self):
        """
        Imports the template's preprocessing file as a module and returns it.

        The "imp" module was removed in Python 3.12, so importlib is used
        whenever it offers the needed API; "imp" is only a fallback for old
        interpreters.
        NOTE(review): the module-level "import imp" of this file still has
        to be guarded or dropped for the file to import on Python 3.12+.
        """
        module_name = "template_" + self.name
        try:
            import importlib.util as importlib_util
            has_modern_api = hasattr(importlib_util, "module_from_spec")
        except ImportError:
            has_modern_api = False
        if not has_modern_api:
            import imp
            return imp.load_source(module_name, self.preprocessing_file_path)
        spec = importlib_util.spec_from_file_location(
            module_name, self.preprocessing_file_path)
        module = importlib_util.module_from_spec(spec)
        # imp.load_source() registered the module in sys.modules; keep doing so.
        sys.modules[module_name] = module
        spec.loader.exec_module(module)
        return module

    def preprocess(self, parameters, lang):
        """
        Returns the template parameters prepared for rendering in lang.

        If the template has a preprocessing file, its preprocess() function
        is called with a copy of parameters and with lang, and its return
        value replaces the parameters. The keys of the result are then
        converted to uppercase.

        Raises ValueError if the preprocessing file does not define
        the preprocess function.
        """
        # if no template.py file exists, skip this preprocessing part
        if self.preprocessing_file_path is not None:
            preprocess_mod = self._load_preprocessing_module()
            if not hasattr(preprocess_mod, "preprocess"):
                msg = (
                    "The '{name}' template's preprocessing file {preprocessing_file} "
                    "doesn't define the 'preprocess' function, which is probably an omission."
                    .format(name=self.name, preprocessing_file=self.preprocessing_file_path)
                )
                raise ValueError(msg)
            parameters = preprocess_mod.preprocess(parameters.copy(), lang)
        # TODO: Remove this right after the variables in templates are renamed
        # to lowercase
        return dict((key.upper(), value) for key, value in parameters.items())

    def looks_like_template(self):
        """
        Returns True if the item can be treated as a template, i.e. the
        templates root is a real (non-symlink) directory and the item has
        a template.yml file or at least one "*.template" file.
        """
        # NOTE(review): these two checks look at the templates root rather
        # than at the template's own directory - confirm this is intended.
        if not os.path.isdir(self.template_root_directory):
            return False
        if os.path.islink(self.template_root_directory):
            return False
        template_sources = glob.glob(os.path.join(self.template_path, "*.template"))
        if not os.path.isfile(self.template_yaml_path) and not template_sources:
            return False
        return True
87
88
89
class Builder(object):
    """
    Class for building all templated content for a given product.

    To generate content from templates, pass the env_yaml, path to the
    directory with resolved rule YAMLs, path to the directory that contains
    templates, path to the output directory for checks and a path to the
    output directory for remediations into the constructor. Then, call the
    method build() to perform a build.
    """
    def __init__(
            self, env_yaml, resolved_rules_dir, templates_dir,
            remediations_dir, checks_dir):
        """
        Stores the configuration, computes the per-language output
        directories and registers every template found in templates_dir
        in the module-level templates dictionary.
        """
        self.env_yaml = env_yaml
        self.resolved_rules_dir = resolved_rules_dir
        self.templates_dir = templates_dir
        self.remediations_dir = remediations_dir
        self.checks_dir = checks_dir
        self.output_dirs = dict()
        for lang in languages:
            if lang == "oval":
                # OVAL checks need to be put to a different directory because
                # they are processed differently than remediations later in the
                # build process
                output_dir = self.checks_dir
            else:
                output_dir = self.remediations_dir
            self.output_dirs[lang] = os.path.join(output_dir, lang)
        # scan directory structure and dynamically create list of templates
        for item in sorted(os.listdir(self.templates_dir)):
            maybe_template = Template(templates_dir, item)
            if maybe_template.looks_like_template():
                maybe_template.load()
                templates[item] = maybe_template

    def build_lang(
            self, rule_id, template_name, template_vars, lang, local_env_yaml):
        """
        Builds templated content for a given rule for a given language.
        Writes the output to the correct build directories.

        Does nothing if the template does not support the language.
        """
        # This file does not import ssg.jinja at module level, so make sure
        # the submodule is loaded before it is used below.
        import ssg.jinja

        template = templates[template_name]
        if lang not in template.langs:
            return
        template_file_path = os.path.join(
            self.templates_dir, template_name, lang + ".template")
        output_filepath = os.path.join(
            self.output_dirs[lang], rule_id + lang_to_ext_map[lang])
        template_parameters = template.preprocess(template_vars, lang)
        jinja_dict = ssg.utils.merge_dicts(local_env_yaml, template_parameters)
        filled_template = ssg.jinja.process_file_with_macros(
            template_file_path, jinja_dict)
        with open(output_filepath, "w") as f:
            f.write(filled_template)

    def get_langs_to_generate(self, rule):
        """
        For a given rule returns list of languages that should be generated
        from templates. This is controlled by the "backends" key of the
        rule's template: a language is generated unless its backend is set
        to something other than "on". Without that key, all languages are
        returned.

        Raises RuntimeError if an unknown language is listed in backends.
        """
        if "backends" not in rule.template:
            # Hand out a copy so that callers cannot modify the shared list.
            return list(languages)
        backends = rule.template["backends"]
        for lang in backends:
            if lang not in languages:
                raise RuntimeError(
                    "Rule {0} wants to generate unknown language '{1}' "
                    "from a template.".format(rule.id_, lang)
                )
        return [
            lang for lang in languages if backends.get(lang, "on") == "on"]

    def process_product_vars(self, all_variables):
        """
        Given a dictionary with the format key[@<product>]=value, filter out
        and only take keys that apply to this product (unqualified or qualified
        to exactly this product). Returns a new dict.

        A value qualified for this product takes precedence over the
        unqualified value of the same key.
        """
        processed = dict(
            (key, value) for key, value in all_variables.items()
            if '@' not in key)
        suffix = '@' + self.env_yaml['product']
        for variable, value in all_variables.items():
            if variable.endswith(suffix):
                processed[variable[:-len(suffix)]] = value

        return processed

    def build_rule(self, rule_id, rule_title, template, langs_to_generate):
        """
        Builds templated content for a given rule for selected languages,
        writing the output to the correct build directories.

        Raises ValueError if the template has no name, if the named template
        does not exist or if the template has no "vars" key.
        """
        try:
            template_name = template["name"]
        except KeyError:
            raise ValueError(
                "Rule {0} is missing template name under template key".format(
                    rule_id))
        if template_name not in templates:
            raise ValueError(
                "Rule {0} uses template {1} which does not exist.".format(
                    rule_id, template_name))
        # Only the lookup is guarded, so that a KeyError raised while the
        # variables are processed is not reported as a missing 'vars:' key.
        try:
            raw_vars = template["vars"]
        except KeyError:
            raise ValueError(
                "Rule {0} does not contain mandatory 'vars:' key under "
                "'template:' key.".format(rule_id))
        template_vars = self.process_product_vars(raw_vars)
        # Add the rule ID which will be reused in OVAL templates as OVAL
        # definition ID so that the build system matches the generated
        # check with the rule.
        template_vars["_rule_id"] = rule_id
        # checks and remediations are processed with a custom YAML dict
        local_env_yaml = self.env_yaml.copy()
        local_env_yaml["rule_id"] = rule_id
        local_env_yaml["rule_title"] = rule_title
        local_env_yaml["products"] = self.env_yaml["product"]
        for lang in langs_to_generate:
            self.build_lang(
                rule_id, template_name, template_vars, lang, local_env_yaml)

    def build_extra_ovals(self):
        """
        Builds the OVAL content declared in the "extra_ovals.yml" file
        located in the templates directory.
        """
        declaration_path = os.path.join(self.templates_dir, "extra_ovals.yml")
        declaration = ssg.yaml.open_raw(declaration_path)
        for oval_def_id, template in declaration.items():
            # Since OVAL definition ID in shorthand format is always the same
            # as rule ID, we can use it instead of the rule ID even if no rule
            # with that ID exists
            self.build_rule(
                oval_def_id, oval_def_id, template, ["oval"])

    def build_all_rules(self):
        """
        Builds templated content for every templated rule found in the
        directory with resolved rules.
        """
        for rule_file in sorted(os.listdir(self.resolved_rules_dir)):
            rule_path = os.path.join(self.resolved_rules_dir, rule_file)
            try:
                rule = ssg.build_yaml.Rule.from_yaml(rule_path, self.env_yaml)
            except ssg.build_yaml.DocumentationNotComplete:
                # Happens on non-debug build when a rule is "documentation-incomplete"
                continue
            if rule.template is None:
                # rule is not templated, skipping
                continue
            langs_to_generate = self.get_langs_to_generate(rule)
            self.build_rule(
                rule.id_, rule.title, rule.template, langs_to_generate)

    def build(self):
        """
        Builds all templated content for all languages, writing
        the output to the correct build directories.
        """

        # Make sure every per-language output directory exists first.
        for dir_ in self.output_dirs.values():
            if not os.path.exists(dir_):
                os.makedirs(dir_)

        self.build_extra_ovals()
        self.build_all_rules()
257