|
1
|
|
|
from __future__ import print_function

import os
import sys
import re

import ssg.build_yaml
import ssg.jinja
import ssg.utils
|
8
|
|
|
|
|
9
|
|
|
# File extension given to the generated file of each supported language.
lang_to_ext_map = dict(
    anaconda=".anaconda",
    ansible=".yml",
    bash=".sh",
    oval=".xml",
    puppet=".pp",
)

# Languages for which content can be generated from templates.
languages = [
    "anaconda",
    "ansible",
    "bash",
    "oval",
    "puppet",
]
|
18
|
|
|
|
|
19
|
|
|
# Callback functions for processing template parameters and/or validating them |
|
20
|
|
|
|
|
21
|
|
|
|
|
22
|
|
|
def sysctl(data, lang):
    """
    Fill in the derived parameters of the ``sysctl`` template.

    Adds ``sysctlid`` (``sysctlvar`` with every dash and dot replaced by an
    underscore), replaces a missing or falsy ``sysctlval`` with an empty
    string and sets ``flags`` to ``"SRI"`` when the identifier contains
    ``ipv6`` and to ``"SRP"`` otherwise.

    The dictionary is modified in place and is also returned. The ``lang``
    argument is accepted but not used.
    """
    identifier = re.sub(r'[-\.]', '_', data["sysctlvar"])
    data["sysctlid"] = identifier
    data["sysctlval"] = data.get("sysctlval") or ""
    data["flags"] = "SR" + ("I" if "ipv6" in identifier else "P")
    return data
|
31
|
|
|
|
|
32
|
|
|
|
|
33
|
|
|
def package_installed(data, lang):
    """
    Validate the parameters of the ``package_installed`` template.

    When ``data`` contains a non-empty ``evr`` value, it has to start with
    text in the ``epoch:version-release`` format. The epoch may consist of
    more than one digit.

    The ``lang`` argument is accepted but not used.

    Returns the unchanged ``data`` dictionary.

    Raises RuntimeError if ``evr`` is set but is not in the expected format.
    """
    evr = data.get("evr")
    # re.match anchors only at the start of the string, so trailing text
    # after the release part is tolerated.
    if evr and not re.match(r'\d+:\d[\d\w+.]*-\d[\d\w+.]*', evr):
        raise RuntimeError(
            "ERROR: input violation: evr key should be in "
            "epoch:version-release format, but package {0} has set "
            "evr to {1}".format(data["pkgname"], evr))
    return data
|
42
|
|
|
|
|
43
|
|
|
|
|
44
|
|
|
# Known template names mapped to the callback that preprocesses and/or
# validates their parameters. A template without a callback maps to None.
templates = dict.fromkeys((
    "accounts_password",
    "auditd_lineinfile",
    "audit_rules_dac_modification",
    "audit_rules_file_deletion_events",
    "audit_rules_login_events",
    "audit_rules_path_syscall",
    "audit_rules_privileged_commands",
    "audit_rules_unsuccessful_file_modification",
    "audit_rules_unsuccessful_file_modification_o_creat",
    "audit_rules_unsuccessful_file_modification_o_trunc_write",
    "audit_rules_unsuccessful_file_modification_rule_order",
    "audit_rules_usergroup_modification",
    "file_groupowner",
    "file_owner",
    "file_permissions",
    "file_regex_permissions",
    "grub2_bootloader_argument",
    "kernel_module_disabled",
    "mount",
    "mount_option",
    "mount_option_remote_filesystems",
    "mount_option_removable_partitions",
    "mount_option_var",
    "ocp_service_runtime_config",
    "package_installed",
    "package_removed",
    "permissions",
    "sebool",
    "sebool_var",
    "service_disabled",
    "service_enabled",
    "sshd_lineinfile",
    "sysctl",
    "timer_enabled",
))
# Templates that have a callback.
templates["package_installed"] = package_installed
templates["sysctl"] = sysctl
|
80
|
|
|
|
|
81
|
|
|
|
|
82
|
|
|
class Builder(object):
    """
    Class for building all templated content for a given product.

    To generate content from templates, pass the env_yaml, path to the
    directory with resolved rule YAMLs, path to the directory that contains
    templates, path to the output directory for remediations and a path to
    the output directory for checks into the constructor. Then, call the
    method build() to perform a build.
    """
    def __init__(
            self, env_yaml, resolved_rules_dir, templates_dir,
            remediations_dir, checks_dir):
        """
        Stores the configuration and computes the output directory of each
        language. No directory is created here, that is done by build().
        """
        self.env_yaml = env_yaml
        self.resolved_rules_dir = resolved_rules_dir
        self.templates_dir = templates_dir
        self.remediations_dir = remediations_dir
        self.checks_dir = checks_dir
        self.output_dirs = dict()
        for lang in languages:
            if lang == "oval":
                # OVAL checks need to be put to a different directory because
                # they are processed differently than remediations later in
                # the build process
                output_dir = self.checks_dir
            else:
                output_dir = self.remediations_dir
            self.output_dirs[lang] = os.path.join(output_dir, lang)

    def preprocess_data(self, template, lang, raw_parameters):
        """
        Processes template data using a callback before the data will be
        substituted into the Jinja template.

        The callback registered for the template in ``templates`` is called
        with the raw parameters and the language; when the template has no
        callback, the raw parameters are used as they are. Returns a new
        dictionary whose keys are the upper-cased parameter names.

        Raises KeyError if the template is not present in ``templates``.
        """
        template_func = templates[template]
        if template_func is not None:
            parameters = template_func(raw_parameters, lang)
        else:
            parameters = raw_parameters
        # TODO: Remove this right after the variables in templates are renamed
        # to lowercase
        parameters = {k.upper(): v for k, v in parameters.items()}
        return parameters

    def build_lang(self, rule, template_name, lang):
        """
        Builds templated content for a given rule for a given language.
        Writes the output to the correct build directories.

        The template is read from the file "template_<LANG>_<template_name>"
        in the templates directory. If that file does not exist, nothing is
        generated and the method returns silently. The output file is named
        after the rule ID with the extension of the language appended.

        Raises ValueError if the rule has no "vars" key under "template".
        """
        template_file_name = "template_{0}_{1}".format(
            lang.upper(), template_name)
        template_file_path = os.path.join(
            self.templates_dir, template_file_name)
        if not os.path.exists(template_file_path):
            # the template does not provide content for this language
            return
        ext = lang_to_ext_map[lang]
        output_file_name = rule.id_ + ext
        output_filepath = os.path.join(
            self.output_dirs[lang], output_file_name)

        try:
            template_vars = rule.template["vars"]
        except KeyError:
            raise ValueError(
                "Rule {0} does not contain mandatory 'vars:' key under "
                "'template:' key.".format(rule.id_))
        template_parameters = self.preprocess_data(
            template_name, lang, template_vars)
        # The Jinja template gets the product environment combined with the
        # parameters of this particular rule.
        jinja_dict = ssg.utils.merge_dicts(self.env_yaml, template_parameters)
        filled_template = ssg.jinja.process_file_with_macros(
            template_file_path, jinja_dict)
        with open(output_filepath, "w") as f:
            f.write(filled_template)

    def get_langs_to_generate(self, rule):
        """
        For a given rule returns list of languages that should be generated
        from templates. This is controlled by the "backends" key under the
        "template" key of the rule.

        Every language is generated unless its value under "backends" is
        something other than "on". When the rule has no "backends" key, all
        languages are returned. The returned list is always a new list, so
        the caller can modify it safely.

        Raises RuntimeError if "backends" mentions an unknown language.
        """
        if "backends" not in rule.template:
            return list(languages)
        backends = rule.template["backends"]
        for lang in backends:
            if lang not in languages:
                raise RuntimeError(
                    "Rule {0} wants to generate unknown language '{1}' "
                    "from a template.".format(rule.id_, lang)
                )
        # NOTE(review): the value is compared with the string "on"; if the
        # YAML loader converts an unquoted on/off to a boolean, such a value
        # would not match - confirm that rule files quote these values.
        return [
            lang for lang in languages
            if backends.get(lang, "on") == "on"
        ]

    def build_rule(self, rule):
        """
        Builds templated content for a given rule for all languages, writing
        the output to the correct build directories.

        Rules that are not templated are skipped. Rules that use a template
        which has no callback in ``templates`` are skipped as well, and a
        message is written to the standard error output.

        Raises ValueError if the rule does not specify the template name or
        if the template is not known.
        """
        if rule.template is None:
            # rule is not templated, skipping
            return
        try:
            template_name = rule.template["name"]
        except KeyError:
            raise ValueError(
                "Rule {0} is missing template name under template key".format(
                    rule.id_))
        if template_name not in templates:
            raise ValueError(
                "Rule {0} uses template {1} which does not exist.".format(
                    rule.id_, template_name))
        if templates[template_name] is None:
            sys.stderr.write(
                "The template {0} has not been completely implemented, no "
                "content will be generated for rule {1}.\n".format(
                    template_name, rule.id_))
            return
        langs_to_generate = self.get_langs_to_generate(rule)
        for lang in langs_to_generate:
            self.build_lang(rule, template_name, lang)

    def build(self):
        """
        Builds all templated content for all languages, writing
        the output to the correct build directories.

        Creates the missing output directories first, then loads every file
        from the directory with resolved rules as a rule and builds it.
        """

        for dir_ in self.output_dirs.values():
            if not os.path.exists(dir_):
                os.makedirs(dir_)

        for rule_file in os.listdir(self.resolved_rules_dir):
            rule_path = os.path.join(self.resolved_rules_dir, rule_file)
            rule = ssg.build_yaml.Rule.from_yaml(rule_path, self.env_yaml)
            self.build_rule(rule)
|
221
|
|
|
|