1 | #!/usr/bin/python3 |
||
2 | |||
3 | import argparse |
||
4 | import collections |
||
5 | import os |
||
6 | import re |
||
7 | import xml.etree.ElementTree as ET |
||
8 | |||
9 | import ssg.ansible |
||
10 | from ssg.constants import ( |
||
11 | ansible_system, |
||
12 | bash_system, |
||
13 | datastream_namespace, |
||
14 | OSCAP_PROFILE, |
||
15 | OSCAP_RULE, |
||
16 | XCCDF12_NS, |
||
17 | ) |
||
18 | |||
19 | |||
# Key under which get_variable_values() stores an XCCDF <value> element that
# carries no "selector" attribute.
DEFAULT_SELECTOR = "__DEFAULT"
# Row of 79 "#" characters used as a separator in generated files.
HASH_ROW = "#" * 79
# Lookup tables keyed by the remediation language ("ansible" or "bash"):
# the fix system identifier, the word used in output file names and the
# output file extension.
LANGUAGE_TO_SYSTEM = {"ansible": ansible_system, "bash": bash_system}
LANGUAGE_TO_TARGET = {"ansible": "playbook", "bash": "script"}
LANGUAGE_TO_EXTENSION = {"ansible": "yml", "bash": "sh"}
# Matches an Ansible task that promotes an XCCDF value to a variable via
# set_fact; group 1 captures the variable name, group 2 its value.
# NOTE(review): runs of spaces inside these literals may have been collapsed
# when this listing was extracted -- confirm the YAML indentation in the
# pattern against the canonical file.
ANSIBLE_VAR_PATTERN = re.compile(
    "- name: XCCDF Value [^ ]+ # promote to variable\n set_fact:\n"
    " ([^:]+): (.+)\n tags:\n - always\n")
||
28 | |||
29 | |||
def parse_args():
    """Parse the command line of the remediation generator.

    Returns:
        argparse.Namespace with the attributes ``data_stream``,
        ``output_dir``, ``product`` and ``language``. All four options are
        required; ``language`` is restricted to "bash" or "ansible".
    """
    # (flag, extra keyword arguments) in the order they are registered.
    option_specs = (
        ("--data-stream", {
            "help": "Path of a SCAP source data stream file"}),
        ("--output-dir", {
            "help": "Path of the output directory to save generated files"}),
        ("--product", {
            "help": "Product ID, eg. 'rhel9'"}),
        ("--language", {
            "choices": ["bash", "ansible"],
            "help": "Remediation language, either 'bash' or 'ansible'"}),
    )
    cli = argparse.ArgumentParser()
    for flag, options in option_specs:
        cli.add_argument(flag, required=True, **options)
    return cli.parse_args()
||
50 | |||
51 | |||
def extract_ansible_vars(string):
    """Collect the variables defined by set_fact tasks found in *string*.

    Every match of ANSIBLE_VAR_PATTERN contributes one entry that maps the
    captured variable name to the captured value. When the same name is
    matched more than once, the last match wins.

    Returns:
        dict mapping variable names to their values (both strings).
    """
    return {
        found.group(1): found.group(2)
        for found in ANSIBLE_VAR_PATTERN.finditer(string)
    }
||
58 | |||
59 | |||
def indent(string, level):
    """Prefix every line of *string* with *level* spaces.

    The input is split with str.splitlines(); each resulting line is
    emitted with the padding in front and a single newline after it, so the
    result always ends with a newline unless the input is empty.
    """
    padding = " " * level
    return "".join(padding + row + "\n" for row in string.splitlines())
||
65 | |||
66 | |||
def extract_ansible_tasks(string):
    """Turn the text of one Ansible remediation into a list of task text.

    Tasks matched by ANSIBLE_VAR_PATTERN are replaced by a newline, the
    remaining text is indented by four spaces and then passed through
    ssg.ansible.remove_trailing_whitespace.

    Returns:
        A list holding exactly one string with the processed tasks.
    """
    without_var_tasks = ANSIBLE_VAR_PATTERN.sub("\n", string)
    tasks_text = ssg.ansible.remove_trailing_whitespace(
        indent(without_var_tasks, 4))
    return [tasks_text]
||
72 | |||
73 | |||
def comment(text):
    """Convert *text* into a block of "# "-prefixed comment lines.

    The text is split on newlines, every line is stripped of surrounding
    whitespace, and lines that end up empty are dropped. Each remaining
    line is emitted as "# <line>" followed by a newline.
    """
    stripped_rows = (row.strip() for row in text.split("\n"))
    return "".join("# " + row + "\n" for row in stripped_rows if row != "")
||
82 | |||
83 | |||
def get_selected_rules(profile):
    """Return the set of rule IDs selected by an XCCDF profile element.

    Only direct ``select`` children whose ``selected`` attribute equals
    "true" are considered, and of those only the ones whose ``idref``
    starts with OSCAP_RULE are kept.
    """
    select_xpath = "./{%s}select" % XCCDF12_NS
    enabled_refs = (
        sel.get("idref")
        for sel in profile.findall(select_xpath)
        if sel.get("selected") == "true"
    )
    return {ref for ref in enabled_refs if ref.startswith(OSCAP_RULE)}
||
92 | |||
93 | |||
def get_value_refinenements(profile):
    """Return the value refinements declared by an XCCDF profile element.

    Returns:
        dict mapping the ``idref`` of every direct ``refine-value`` child
        to its ``selector`` attribute (None when the attribute is absent).
        A later element overrides an earlier one with the same ``idref``.
    """
    refine_xpath = "./{%s}refine-value" % XCCDF12_NS
    return {
        node.get("idref"): node.get("selector")
        for node in profile.findall(refine_xpath)
    }
||
101 | |||
102 | |||
def get_remediation(rule, system):
    """Return the first ``fix`` child of *rule* written for *system*.

    Args:
        rule: XCCDF Rule element to search.
        system: value the ``system`` attribute of the fix must equal.

    Returns:
        The matching fix element, or None when the rule has no such fix.
    """
    candidates = rule.findall("./{%s}fix" % XCCDF12_NS)
    return next(
        (fix for fix in candidates if fix.get("system") == system), None)
||
108 | |||
109 | |||
def _get_all_itms(benchmark, element_name, callback):
    """Apply *callback* to every *element_name* descendant of *benchmark*.

    Args:
        benchmark: element searched recursively (".//" XPath).
        element_name: local name of the XCCDF 1.2 elements to visit.
        callback: function called with each matching element.

    Returns:
        dict mapping the ``id`` attribute of each element to the value
        returned by *callback* for it.
    """
    descendants_xpath = ".//{%s}%s" % (XCCDF12_NS, element_name)
    return {
        node.get("id"): callback(node)
        for node in benchmark.findall(descendants_xpath)
    }
||
117 | |||
118 | |||
def get_variable_values(variable):
    """Map every selector of an XCCDF Value element to its value text.

    Args:
        variable: XCCDF Value element whose direct ``value`` children are
            read.

    Returns:
        dict mapping selector -> text. A ``value`` child without a
        ``selector`` attribute is stored under DEFAULT_SELECTOR, and a
        child without text is stored as the empty string.
    """
    values = {}
    for value in variable.findall("./{%s}value" % XCCDF12_NS):
        selector = value.get("selector")
        if selector is None:
            selector = DEFAULT_SELECTOR
        # An element without text must still be filed under its own
        # selector; the previous code used the literal key "selector",
        # which lost the real selector and could clobber other entries.
        values[selector] = "" if value.text is None else value.text
    return values
||
130 | |||
131 | |||
def get_all_variables(benchmark):
    """Return the values of every XCCDF Value element below *benchmark*.

    Returns:
        dict mapping each Value element's ``id`` to the selector -> text
        dict produced by get_variable_values.
    """
    return _get_all_itms(
        benchmark, element_name="Value", callback=get_variable_values)
||
134 | |||
135 | |||
def expand_variables(fix_el, refinements, variables):
    """Return the text of a fix element with its ``sub`` children expanded.

    The result is the fix element's leading text followed, for every direct
    ``sub`` child in document order, by the referenced variable's value and
    the text trailing that child.

    Args:
        fix_el: XCCDF fix element to expand.
        refinements: dict mapping value ID -> selector chosen by the
            profile; IDs missing from it use DEFAULT_SELECTOR.
        variables: dict mapping value ID -> {selector: value text}.

    Returns:
        The expanded text, or None as soon as a referenced value is refined
        with the selector "default".

    Raises:
        KeyError: a ``sub`` element references an ID absent from
            *variables*.
        IndexError: the chosen selector is unknown and the variable has no
            values to fall back to.
    """
    # ElementTree reports absent text/tail as None (e.g. a fix that starts
    # or ends with a <sub>); treat that as empty text instead of failing.
    parts = [fix_el.text or ""]
    for sub_el in fix_el.findall("./{%s}sub" % XCCDF12_NS):
        variable_id = sub_el.get("idref")
        values = variables[variable_id]
        selector = refinements.get(variable_id, DEFAULT_SELECTOR)
        if selector == "default":
            return None
        if selector in values:
            value = values[selector]
        else:
            # Unknown selector: fall back to the first value defined.
            value = list(values.values())[0]
        parts.append(value)
        parts.append(sub_el.tail or "")
    return "".join(parts)
||
151 | |||
152 | |||
def ansible_vars_to_string(variables):
    """Render *variables* as "name: value" lines indented by four spaces.

    Entries are emitted in the iteration order of the mapping, one per
    line, each terminated by a newline. An empty mapping yields "".
    """
    prefix = " " * 4
    return "".join(
        "%s%s: %s\n" % (prefix, name, value)
        for name, value in variables.items())
||
160 | |||
161 | |||
def ansible_tasks_to_string(tasks):
    """Join task texts into one string for the playbook's tasks section.

    The texts are joined with two newlines (tasks need to be separated by
    2 empty lines) and the result is passed through
    ssg.ansible.remove_too_many_blank_lines.
    """
    joined_tasks = "\n\n".join(tasks)
    return ssg.ansible.remove_too_many_blank_lines(joined_tasks)
||
167 | |||
168 | |||
class ScriptGenerator:
    """Generate one remediation file per profile of a SCAP data stream.

    Constructing an instance parses the data stream and caches the
    remediations and variables of its benchmark;
    generate_remediation_scripts() then writes the output files.

    Attributes set by __init__ / get_benchmark_data:
        language: remediation language, "ansible" or "bash".
        product_id: product ID used as the output file name prefix.
        ds_file_path: path of the data stream file.
        output_dir: directory the generated files are written to.
        ds: parsed ElementTree of the data stream.
        ds_file_name: base name of ds_file_path.
        benchmark_id: ``id`` attribute of the Benchmark element.
        benchmark_version: text of the Benchmark's ``version`` child.
        remediations: OrderedDict mapping rule ID -> fix element for the
            chosen language, or None when the rule has no such fix.
        variables: dict mapping value ID -> {selector: value text}.
    """

    def __init__(self, language, product_id, ds_file_path, output_dir):
        """Store the settings and load the benchmark from the data stream."""
        self.language = language
        self.product_id = product_id
        self.ds_file_path = ds_file_path
        self.output_dir = output_dir
        self.get_benchmark_data()

    def generate_remediation_scripts(self):
        """Write a remediation file for every Profile of the benchmark."""
        profile_xpath = "./{%s}component/{%s}Benchmark/{%s}Profile" % (
            datastream_namespace, XCCDF12_NS, XCCDF12_NS)
        for profile in self.ds.findall(profile_xpath):
            self.generate_profile_remediation_script(profile)

    def get_benchmark_data(self):
        """Parse the data stream and cache the benchmark's data on self."""
        self.ds = ET.parse(self.ds_file_path)
        self.ds_file_name = os.path.basename(self.ds_file_path)
        benchmark_xpath = "./{%s}component/{%s}Benchmark" % (
            datastream_namespace, XCCDF12_NS)
        benchmark = self.ds.find(benchmark_xpath)
        self.benchmark_id = benchmark.get("id")
        self.benchmark_version = benchmark.find(
            "./{%s}version" % XCCDF12_NS).text
        self.load_all_remediations(benchmark)
        self.variables = get_all_variables(benchmark)

    def load_all_remediations(self, benchmark):
        """Fill self.remediations with the fix of every Rule in *benchmark*.

        Rules are stored in document order; a rule without a fix for the
        chosen language maps to None.

        Raises:
            KeyError: self.language is not a key of LANGUAGE_TO_SYSTEM.
        """
        self.remediations = collections.OrderedDict()
        # The system does not depend on the rule, so look it up once. This
        # also rejects an unknown language even when there are no rules.
        system = LANGUAGE_TO_SYSTEM[self.language]
        rule_xpath = ".//{%s}Rule" % (XCCDF12_NS)
        for rule_el in benchmark.findall(rule_xpath):
            rule_id = rule_el.get("id")
            self.remediations[rule_id] = get_remediation(rule_el, system)

    def generate_profile_remediation_script(self, profile_el):
        """Build the remediation for *profile_el* and write it to its file.

        The file is written as UTF-8 in binary mode, so the generated line
        endings are stored unchanged.

        Raises:
            ValueError: self.language is neither "ansible" nor "bash".
        """
        if self.language == "ansible":
            output = self.create_output_ansible(profile_el)
        elif self.language == "bash":
            output = self.create_output_bash(profile_el)
        else:
            # Fail with a clear message instead of an UnboundLocalError.
            raise ValueError(
                "Unsupported remediation language: %r" % (self.language,))
        file_path = self.get_output_file_path(profile_el)
        with open(file_path, "wb") as f:
            f.write(output.encode("utf-8"))

    def get_output_file_path(self, profile_el):
        """Return the output path for the remediation of *profile_el*.

        The file name is "<product>-<target>-<profile>.<extension>" where
        <profile> is the profile ID with OSCAP_PROFILE removed.
        """
        short_profile_id = profile_el.get("id").replace(OSCAP_PROFILE, "")
        file_name = "%s-%s-%s.%s" % (
            self.product_id, LANGUAGE_TO_TARGET[self.language],
            short_profile_id, LANGUAGE_TO_EXTENSION[self.language])
        file_path = os.path.join(self.output_dir, file_name)
        return file_path

    def create_output_ansible(self, profile_el):
        """Return the complete Ansible playbook text for *profile_el*."""
        ansible_vars, ansible_tasks = self.collect_ansible_vars_and_tasks(
            profile_el)
        header = self.create_header(profile_el)
        profile_id = profile_el.get("id")
        vars_str = ansible_vars_to_string(ansible_vars)
        tasks_str = ansible_tasks_to_string(ansible_tasks)
        # NOTE(review): leading spaces inside these literals may have been
        # collapsed when this listing was extracted -- confirm the YAML
        # indentation against the canonical file.
        output = (
            "%s\n"
            "- name: Ansible Playbook for %s\n"
            " hosts: all\n"
            " vars:\n"
            "%s"
            " tasks:\n"
            "%s"
            % (header, profile_id, vars_str, tasks_str))
        return output

    def collect_ansible_vars_and_tasks(self, profile_el):
        """Gather the variables and tasks of all rules selected by a profile.

        Rules are visited in the order of self.remediations; variables of
        later rules override equally named variables of earlier ones.

        Returns:
            Tuple (OrderedDict of variables, list of task texts).
        """
        selected_rules = get_selected_rules(profile_el)
        refinements = get_value_refinenements(profile_el)
        all_vars = collections.OrderedDict()
        all_tasks = []
        for rule_id, fix_el in self.remediations.items():
            if rule_id not in selected_rules:
                continue
            rule_vars, rule_tasks = self.generate_ansible_rule_remediation(
                fix_el, refinements)
            all_vars.update(rule_vars)
            all_tasks.extend(rule_tasks)
        return (all_vars, all_tasks)

    def create_output_bash(self, profile):
        """Return the complete Bash remediation script text for *profile*.

        The script consists of the header followed by one section per
        selected rule, in the order of self.remediations. The total shown
        in each section is the number of rules the profile selects.
        """
        output = []
        selected_rules = get_selected_rules(profile)
        refinements = get_value_refinenements(profile)
        header = self.create_header(profile)
        output.append(header)
        total = len(selected_rules)
        current = 1
        for rule_id in self.remediations:
            if rule_id not in selected_rules:
                continue
            status = (current, total)
            rule_remediation = self.generate_bash_rule_remediation(
                rule_id, status, refinements)
            output.append(rule_remediation)
            current += 1
        return "".join(output)

    def create_header(self, profile):
        """Return the commented header placed at the top of a generated file.

        A profile without a ``description`` child, or whose description has
        no leading text, gets an empty description section.

        Raises:
            ValueError: self.language is neither "ansible" nor "bash".
        """
        if self.language == "ansible":
            shebang_with_newline = "---\n"
            remediation_type = "Ansible Playbook"
            how_to_apply = (
                "# $ ansible-playbook -i \"localhost,\""
                " -c local playbook.yml\n"
                "# $ ansible-playbook -i \"192.168.1.155,\" playbook.yml\n"
                "# $ ansible-playbook -i inventory.ini playbook.yml\n"
            )
        elif self.language == "bash":
            shebang_with_newline = "#!/usr/bin/env bash\n"
            remediation_type = "Bash Remediation Script"
            how_to_apply = "# $ sudo ./remediation-script.sh\n"
        else:
            # Fail with a clear message instead of an UnboundLocalError.
            raise ValueError(
                "Unsupported remediation language: %r" % (self.language,))
        profile_title = profile.find("./{%s}title" % XCCDF12_NS).text
        # findtext() yields "" both when the element is missing (via the
        # default) and when it has no text, so comment() never gets None.
        description = profile.findtext(
            "./{%s}description" % XCCDF12_NS, default="")
        commented_profile_description = comment(description)
        xccdf_version_name = "1.2"
        profile_id = profile.get("id")
        # NOTE(review): runs of spaces inside these literals may have been
        # collapsed when this listing was extracted -- confirm the column
        # alignment against the canonical file.
        fix_header = (
            "%s"
            "%s\n"
            "#\n"
            "# %s for %s\n"
            "#\n"
            "# Profile Description:\n"
            "%s"
            "#\n"
            "# Profile ID: %s\n"
            "# Benchmark ID: %s\n"
            "# Benchmark Version: %s\n"
            "# XCCDF Version: %s\n"
            "#\n"
            "# This file can be generated by OpenSCAP using:\n"
            "# $ oscap xccdf generate fix --profile %s --fix-type %s %s\n"
            "#\n"
            "# This %s is generated from an XCCDF profile without"
            " preliminary evaluation.\n"
            "# It attempts to fix every selected rule, even if the system is"
            " already compliant.\n"
            "#\n"
            "# How to apply this %s:\n"
            "%s"
            "#\n"
            "%s\n\n" % (
                shebang_with_newline, HASH_ROW, remediation_type,
                profile_title, commented_profile_description, profile_id,
                self.benchmark_id, self.benchmark_version, xccdf_version_name,
                profile_id, self.language, self.ds_file_name,
                remediation_type, remediation_type, how_to_apply, HASH_ROW))
        return fix_header

    def generate_bash_rule_remediation(self, rule_id, status, refinements):
        """Return the Bash section that remediates one rule.

        Args:
            rule_id: ID of the rule; must be a key of self.remediations.
            status: tuple (current, total) shown in the progress messages.
            refinements: value ID -> selector mapping of the profile.

        Returns:
            Text with a BEGIN banner, a progress message written to stderr,
            the expanded fix (or a warning written to stderr when the rule
            has no fix or the fix cannot be expanded) and an END marker.
        """
        current, total = status
        fix_el = self.remediations[rule_id]
        output = []
        header = (
            "%s\n"
            "# BEGIN fix (%s / %s) for '%s'\n"
            "%s\n" % (HASH_ROW, current, total, rule_id, HASH_ROW))
        output.append(header)
        begin_msg = "(>&2 echo \"Remediating rule %s/%s: '%s'\")\n" % (
            current, total, rule_id)
        output.append(begin_msg)
        expanded_remediation = None
        if fix_el is not None:
            expanded_remediation = expand_variables(
                fix_el, refinements, self.variables)
        if expanded_remediation is not None:
            output.append(expanded_remediation)
        else:
            warning = (
                "(>&2 echo \"FIX FOR THIS RULE '%s' IS MISSING!\")\n" %
                (rule_id))
            output.append(warning)
        end_msg = "\n# END fix for '%s'\n\n" % (rule_id)
        output.append(end_msg)
        return "".join(output)

    def generate_ansible_rule_remediation(self, fix_el, refinements):
        """Return the Ansible variables and tasks that remediate one rule.

        Args:
            fix_el: fix element of the rule, or None when it has none.
            refinements: value ID -> selector mapping of the profile.

        Returns:
            Tuple (dict of variables, list of task texts); both are empty
            when there is no fix or the fix cannot be expanded.
        """
        rule_vars = {}
        tasks = []
        expanded_remediation = None
        if fix_el is not None:
            expanded_remediation = expand_variables(
                fix_el, refinements, self.variables)
        if expanded_remediation is not None:
            rule_vars = extract_ansible_vars(expanded_remediation)
            tasks = extract_ansible_tasks(expanded_remediation)
        return rule_vars, tasks
||
361 | |||
362 | |||
def main():
    """Generate remediation files for every profile of a data stream.

    Reads the settings from the command line, makes sure the output
    directory exists and runs the generator.
    """
    args = parse_args()
    # makedirs(exist_ok=True) also creates missing parent directories and
    # avoids the race between checking for the directory and creating it.
    os.makedirs(args.output_dir, exist_ok=True)
    sg = ScriptGenerator(
        args.language, args.product, args.data_stream, args.output_dir)
    sg.generate_remediation_scripts()
||
370 | |||
371 | |||
# Run the generator only when this file is executed as a script.
if __name__ == "__main__":
    main()
||
374 |