Test Failed
Pull Request — master (#4843)
by Matěj
03:05 queued 53s
created

ssg.templates.Builder.get_langs_to_generate()   B

Complexity

Conditions 6

Size

Total Lines 21
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 42

Importance

Changes 0
Metric Value
cc 6
eloc 15
nop 2
dl 0
loc 21
ccs 0
cts 13
cp 0
crap 42
rs 8.6666
c 0
b 0
f 0
1
from __future__ import print_function
2
3
import os
4
import sys
5
import re
6
7
import ssg.build_yaml
8
9
# All languages (backends) for which content can be generated from templates.
languages = ["anaconda", "ansible", "bash", "oval", "puppet"]

# File name extension appended to the rule ID when the generated content
# for the given language is written out.
lang_to_ext_map = dict(
    anaconda=".anaconda",
    ansible=".yml",
    bash=".sh",
    oval=".xml",
    puppet=".pp",
)
18
19
# Callback functions for processing template parameters and/or validating them
20
21
22
def sysctl(data, lang):
    """
    Template callback that completes the parameters of the sysctl template.

    The passed dictionary is modified in place and also returned:

    - "sysctlid" is set to "sysctlvar" with every dash and dot replaced
      by an underscore,
    - "sysctlval" is set to an empty string if it is missing or falsy,
    - "flags" is set to "SRI" if the ID contains "ipv6", otherwise "SRP".

    The lang argument is not used by this callback.
    """
    sysctl_id = re.sub(r'[-\.]', '_', data["sysctlvar"])
    data["sysctlid"] = sysctl_id
    if not data.get("sysctlval"):
        data["sysctlval"] = ""
    data["flags"] = "SR" + ("I" if "ipv6" in sysctl_id else "P")
    return data
31
32
33
def package_installed(data, lang):
    """
    Template callback that validates parameters of the package_installed
    template.

    If a non-empty "evr" key is present, it has to start with text in the
    epoch:version-release format, otherwise RuntimeError is raised. The
    parameters are returned unchanged. The lang argument is not used.
    """
    evr = data.get("evr")
    if not evr:
        # nothing to validate
        return data
    if re.match(r'\d:\d[\d\w+.]*-\d[\d\w+.]*', evr) is None:
        raise RuntimeError(
            "ERROR: input violation: evr key should be in "
            "epoch:version-release format, but package {0} has set "
            "evr to {1}".format(data["pkgname"], evr))
    return data
42
43
44
# Registry of all known templates. The key is the template name, the value
# is either a callback used to preprocess and/or validate the template
# parameters, or None if the template has no such callback.
templates = dict.fromkeys((
    "accounts_password",
    "auditd_lineinfile",
    "audit_rules_dac_modification",
    "audit_rules_file_deletion_events",
    "audit_rules_login_events",
    "audit_rules_path_syscall",
    "audit_rules_privileged_commands",
    "audit_rules_unsuccessful_file_modification",
    "audit_rules_unsuccessful_file_modification_o_creat",
    "audit_rules_unsuccessful_file_modification_o_trunc_write",
    "audit_rules_unsuccessful_file_modification_rule_order",
    "audit_rules_usergroup_modification",
    "file_groupowner",
    "file_owner",
    "file_permissions",
    "file_regex_permissions",
    "grub2_bootloader_argument",
    "kernel_module_disabled",
    "mount",
    "mount_option",
    "mount_option_remote_filesystems",
    "mount_option_removable_partitions",
    "mount_option_var",
    "ocp_service_runtime_config",
    "package_installed",
    "package_removed",
    "permissions",
    "sebool",
    "sebool_var",
    "service_disabled",
    "service_enabled",
    "sshd_lineinfile",
    "sysctl",
    "timer_enabled",
))
# Templates that do have a callback; assigning to an existing key keeps
# its position in the dictionary.
templates["package_installed"] = package_installed
templates["sysctl"] = sysctl
80
81
82
class Builder(object):
    """
    Class for building all templated content for a given product.

    To generate content from templates, pass the env_yaml, path to the
    directory with resolved rule YAMLs, path to the directory that contains
    templates, path to the output directory for remediations and a path to
    the output directory for checks into the constructor. Then, call the
    method build() to perform a build.
    """
    def __init__(
            self, env_yaml, resolved_rules_dir, templates_dir,
            remediations_dir, checks_dir):
        self.env_yaml = env_yaml
        self.resolved_rules_dir = resolved_rules_dir
        self.templates_dir = templates_dir
        self.remediations_dir = remediations_dir
        self.checks_dir = checks_dir
        # Maps each language to the directory its generated files go to.
        self.output_dirs = dict()
        for lang in languages:
            if lang == "oval":
                # OVAL checks need to be put to a different directory because
                # they are processed differently than remediations later in the
                # build process
                output_dir = self.checks_dir
            else:
                output_dir = self.remediations_dir
            self.output_dirs[lang] = os.path.join(output_dir, lang)

    def preprocess_data(self, template, lang, raw_parameters):
        """
        Processes template data using a callback before the data will be
        substituted into the Jinja template.

        The callback registered for the template in the module-level
        "templates" dictionary is called with raw_parameters and lang; it
        may modify raw_parameters in place. Templates without a callback
        use the raw parameters as they are. Returns a new dictionary with
        all keys converted to uppercase. Raises KeyError if the template
        is not registered.
        """
        template_func = templates[template]
        if template_func is not None:
            parameters = template_func(raw_parameters, lang)
        else:
            parameters = raw_parameters
        # TODO: Remove this right after the variables in templates are renamed
        # to lowercase
        parameters = {k.upper(): v for k, v in parameters.items()}
        return parameters

    def build_lang(self, rule, template_name, lang):
        """
        Builds templated content for a given rule for a given language.
        Writes the output to the correct build directories.

        Nothing is generated if the template file for the given language
        does not exist in the templates directory. Raises ValueError if
        the rule has no 'vars' key under its 'template' key.
        """
        template_file_name = "template_{0}_{1}".format(
            lang.upper(), template_name)
        template_file_path = os.path.join(
            self.templates_dir, template_file_name)
        if not os.path.exists(template_file_path):
            # the template does not support this language
            return
        ext = lang_to_ext_map[lang]
        output_file_name = rule.id_ + ext
        output_filepath = os.path.join(
            self.output_dirs[lang], output_file_name)

        try:
            template_vars = rule.template["vars"]
        except KeyError:
            raise ValueError(
                "Rule {0} does not contain mandatory 'vars:' key under "
                "'template:' key.".format(rule.id_))
        template_parameters = self.preprocess_data(
            template_name, lang, template_vars)
        # NOTE(review): ssg.utils and ssg.jinja are not imported explicitly
        # in the visible part of this module; presumably they get loaded as
        # a side effect of importing ssg.build_yaml - confirm.
        jinja_dict = ssg.utils.merge_dicts(self.env_yaml, template_parameters)
        filled_template = ssg.jinja.process_file_with_macros(
            template_file_path, jinja_dict)
        with open(output_filepath, "w") as f:
            f.write(filled_template)

    def get_langs_to_generate(self, rule):
        """
        For a given rule returns list of languages that should be generated
        from templates. This is controlled by the "backends" key under the
        "template" key of the rule.

        A language is generated unless its entry in "backends" is set to
        something else than "on"; languages that are not listed are
        generated. Without the "backends" key all languages are generated.
        The returned list is always a new list, so callers can modify it
        freely. Raises RuntimeError if "backends" mentions an unknown
        language.
        """
        if "backends" not in rule.template:
            # return a copy so that the module-level list can't be changed
            # by accident through the returned value
            return list(languages)
        backends = rule.template["backends"]
        for lang in backends:
            if lang not in languages:
                raise RuntimeError(
                    "Rule {0} wants to generate unknown language '{1}' "
                    "from a template.".format(rule.id_, lang)
                )
        return [
            lang for lang in languages
            if backends.get(lang, "on") == "on"
        ]

    def build_rule(self, rule):
        """
        Builds templated content for a given rule for all languages, writing
        the output to the correct build directories.

        Rules that are not templated are skipped. Rules using a template
        that has no callback registered are skipped as well and a message
        is written to the standard error output. Raises ValueError if the
        rule doesn't specify the template name or if the template doesn't
        exist.
        """
        if rule.template is None:
            # rule is not templated, skipping
            return
        try:
            template_name = rule.template["name"]
        except KeyError:
            raise ValueError(
                "Rule {0} is missing template name under template key".format(
                    rule.id_))
        if template_name not in templates:
            raise ValueError(
                "Rule {0} uses template {1} which does not exist.".format(
                    rule.id_, template_name))
        if templates[template_name] is None:
            sys.stderr.write(
                "The template {0} has not been completely implemented, no "
                "content will be generated for rule {1}.\n".format(
                    template_name, rule.id_))
            return
        for lang in self.get_langs_to_generate(rule):
            self.build_lang(rule, template_name, lang)

    def build(self):
        """
        Builds all templated content for all languages, writing
        the output to the correct build directories.

        The output directories are created if they don't exist. Every file
        in the directory with resolved rules is loaded as a rule.
        """

        for dir_ in self.output_dirs.values():
            if not os.path.exists(dir_):
                os.makedirs(dir_)

        # os.listdir returns entries in arbitrary order, sort them so that
        # the build and its messages are deterministic
        for rule_file in sorted(os.listdir(self.resolved_rules_dir)):
            rule_path = os.path.join(self.resolved_rules_dir, rule_file)
            rule = ssg.build_yaml.Rule.from_yaml(rule_path, self.env_yaml)
            self.build_rule(rule)
221