1 | from __future__ import absolute_import |
||
2 | from __future__ import print_function |
||
3 | |||
4 | import sys |
||
5 | import os |
||
6 | import os.path |
||
7 | import re |
||
8 | from collections import defaultdict, namedtuple, OrderedDict |
||
9 | |||
10 | import ssg.yaml |
||
11 | import ssg.build_yaml |
||
12 | from . import rules |
||
13 | from . import utils |
||
14 | |||
15 | from . import constants |
||
16 | from .jinja import process_file_with_macros as jinja_process_file |
||
17 | |||
18 | from .xml import ElementTree |
||
19 | from .constants import XCCDF12_NS |
||
20 | |||
# Maps each remediation type to the filename extension its scripts use.
REMEDIATION_TO_EXT_MAP = {
    'anaconda': '.anaconda',
    'ansible': '.yml',
    'bash': '.sh',
    'puppet': '.pp',
    'ignition': '.yml',
    'kubernetes': '.yml',
    'blueprint': '.toml'
}


# Marker comment found in generated files; lines starting with it are
# skipped by split_remediation_content_and_metadata().
FILE_GENERATED_HASH_COMMENT = '# THIS FILE IS GENERATED'

# Keys recognized in '# key = value' metadata comments inside a
# remediation script.
REMEDIATION_CONFIG_KEYS = ['complexity', 'disruption', 'platform', 'reboot',
                           'strategy']
# NOTE(review): not referenced in this module — presumably consumed by
# other build code that serializes these keys; verify before removing.
REMEDIATION_ELM_KEYS = ['complexity', 'disruption', 'reboot', 'strategy']

# Parsed remediation: 'contents' is the script text, 'config' is a dict
# of metadata extracted from '# key = value' comments.
RemediationObject = namedtuple('remediation', ['contents', 'config'])
||
39 | |||
40 | |||
def is_supported_filename(remediation_type, filename):
    """
    Checks if filename has a supported extension for remediation_type.

    Exits when remediation_type is of an unknown type.
    """
    expected_ext = REMEDIATION_TO_EXT_MAP.get(remediation_type)
    if expected_ext is None:
        sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
                         % (remediation_type))
        sys.exit(1)
    return filename.endswith(expected_ext)
||
53 | |||
54 | |||
def split_remediation_content_and_metadata(fix_file):
    """
    Split a remediation script into its executable content and its
    '# key = value' metadata header.

    Returns a RemediationObject whose config dict maps the recognized
    metadata keys to their values; missing keys resolve to None.
    """
    config = defaultdict(lambda: None)
    kept_lines = []

    # Assignment automatically escapes shell characters for XML
    for line in fix_file.splitlines():
        # Drop the generated-file marker comment entirely.
        if line.startswith(FILE_GENERATED_HASH_COMMENT):
            continue

        if line.startswith('#') and line.count('=') == 1:
            key, value = line.strip('#').split('=')
            key = key.strip()
            if key in REMEDIATION_CONFIG_KEYS:
                config[key] = value.strip()
                continue

        # Anything that is not a recognized config item stays in the
        # returned contents — including '#' lines with an equals sign
        # whose "key" is not one of REMEDIATION_CONFIG_KEYS.
        kept_lines.append(line)

    return RemediationObject(contents="\n".join(kept_lines), config=config)
||
79 | |||
80 | |||
def parse_from_file_with_jinja(file_path, env_yaml):
    """
    Parses a remediation from a file. As remediations contain jinja macros,
    we need a env_yaml context to process these. In practice, no remediations
    use jinja in the configuration, so for extracting only the configuration,
    env_yaml can be an arbitrary product.yml dictionary.

    If the logic of configuration parsing changes significantly, please also
    update ssg.fixes.parse_platform(...).
    """
    expanded_text = jinja_process_file(file_path, env_yaml)
    return split_remediation_content_and_metadata(expanded_text)
||
94 | |||
95 | |||
def parse_from_file_without_jinja(file_path):
    """
    Parses a remediation from a file. Doesn't process the Jinja macros.
    This function is useful in build phases in which all the Jinja macros
    are already resolved.
    """
    with open(file_path, "r") as stream:
        raw_contents = stream.read()
    return split_remediation_content_and_metadata(raw_contents)
||
105 | |||
106 | |||
class Remediation(object):
    """
    Base class representing a single remediation script of a given type
    (bash, ansible, ...), optionally associated with a Rule object that
    supplies titles, identifiers and platform information.
    """

    def __init__(self, file_path, remediation_type):
        self.file_path = file_path
        self.local_env_yaml = dict()

        # Parsed '# key = value' metadata; unknown keys resolve to None.
        self.metadata = defaultdict(lambda: None)

        self.remediation_type = remediation_type
        self.associated_rule = None

    def associate_rule(self, rule_obj):
        self.associated_rule = rule_obj
        self.expand_env_yaml_from_rule()

    def expand_env_yaml_from_rule(self):
        # Expose attributes of the associated rule to Jinja rendering.
        rule = self.associated_rule
        if not rule:
            return

        self.local_env_yaml["rule_title"] = rule.title
        self.local_env_yaml["rule_id"] = rule.id_
        self.local_env_yaml["cce_identifiers"] = rule.identifiers

    def parse_from_file_with_jinja(self, env_yaml, cpe_platforms):
        return parse_from_file_with_jinja(self.file_path, env_yaml)

    def get_inherited_cpe_platform_names(self):
        if not self.associated_rule:
            return set()
        # There can be repeated inherited platforms and rule platforms;
        # a set removes the duplicates.
        return set(self.associated_rule.inherited_cpe_platform_names)

    def get_rule_specific_cpe_platform_names(self):
        rule = self.associated_rule
        if not rule or rule.cpe_platform_names is None:
            return set()
        inherited = self.get_inherited_cpe_platform_names()
        return {name for name in rule.cpe_platform_names
                if name not in inherited}

    def _get_stripped_conditional(self, language, platform):
        conditional = platform.get_remediation_conditional(language)
        if conditional is None:
            return None
        stripped = conditional.strip()
        # An all-whitespace conditional counts as no conditional at all.
        return stripped if stripped else None

    def get_stripped_conditionals(self, language, cpe_platform_names, cpe_platforms):
        """
        collect conditionals of platforms defined by cpe_platform_names
        and strip them of white spaces
        """
        conditionals = []
        for name in cpe_platform_names:
            conditional = self._get_stripped_conditional(
                language, cpe_platforms[name])
            if conditional is not None:
                conditionals.append(conditional)
        return conditionals

    def get_rule_specific_conditionals(self, language, cpe_platforms):
        names = self.get_rule_specific_cpe_platform_names()
        return self.get_stripped_conditionals(language, names, cpe_platforms)

    def get_inherited_conditionals(self, language, cpe_platforms):
        names = self.get_inherited_cpe_platform_names()
        return self.get_stripped_conditionals(language, names, cpe_platforms)
||
176 | |||
177 | |||
def process(remediation, env_yaml, cpe_platforms):
    """
    Process a fix, and return the processed fix iff the file is of a valid
    extension for the remediation type and the fix is valid for the current
    product.

    Note that platform is a required field in the contents of the fix.
    """
    if not is_supported_filename(remediation.remediation_type, remediation.file_path):
        return

    result = remediation.parse_from_file_with_jinja(env_yaml, cpe_platforms)

    platforms = result.config['platform']
    if not platforms:
        raise RuntimeError(
            "The '%s' remediation script does not contain the "
            "platform identifier!" % (remediation.file_path))

    # Reject sloppy comma-separated platform lists up front.
    for single_platform in platforms.split(","):
        if single_platform != single_platform.strip():
            msg = (
                "Comma-separated '{platform}' platforms "
                "in '{remediation_file}' contains whitespace."
                .format(platform=platforms, remediation_file=remediation.file_path))
            raise ValueError(msg)

    if utils.is_applicable_for_product(platforms, env_yaml["product"]):
        return result
    return None
||
210 | |||
211 | |||
class BashRemediation(Remediation):
    """Remediation in the form of a Bash snippet."""

    def __init__(self, file_path):
        super(BashRemediation, self).__init__(file_path, "bash")

    def parse_from_file_with_jinja(self, env_yaml, cpe_platforms):
        self.local_env_yaml.update(env_yaml)
        result = super(BashRemediation, self).parse_from_file_with_jinja(
            self.local_env_yaml, cpe_platforms)

        # Avoid platform wrapping empty fix text
        # Remediations can be empty when a Jinja macro or conditional
        # renders no fix text for a product
        stripped_fix_text = result.contents.strip()
        if stripped_fix_text == "":
            return result

        inherited_conditionals = sorted(super(
            BashRemediation, self).get_inherited_conditionals("bash", cpe_platforms))
        rule_specific_conditionals = sorted(super(
            BashRemediation, self).get_rule_specific_conditionals("bash", cpe_platforms))
        if not inherited_conditionals and not rule_specific_conditionals:
            return result

        # Inherited conditionals are ANDed together; rule-specific ones are
        # ORed with each other and grouped so that any one of them suffices.
        all_conditions = " && ".join(inherited_conditionals)
        if rule_specific_conditionals:
            rule_specific_part = " || ".join(rule_specific_conditionals)
            if all_conditions:
                all_conditions += " && { " + rule_specific_part + "; }"
            else:
                all_conditions = rule_specific_part

        # It is possible to indent the original body of the remediation with
        # textwrap.indent(), however, it is not supported by python2, and
        # there is a risk of breaking remediations. For example, remediations
        # with a here-doc block could be affected.
        wrapped_fix_text = [
            "# Remediation is applicable only in certain platforms",
            "if {0}; then".format(all_conditions),
            "",
            "{0}".format(stripped_fix_text),
            "",
            "else",
            "    >&2 echo 'Remediation is not applicable, nothing was done'",
            "fi",
        ]

        return RemediationObject(
            contents="\n".join(wrapped_fix_text), config=result.config)
||
258 | |||
259 | |||
class AnsibleRemediation(Remediation):
    """
    Remediation in the form of an Ansible snippet (a list of tasks).

    Besides plain parsing, this class post-processes the parsed tasks:
    it extends their 'when' clauses with platform conditionals, injects a
    package_facts task where needed and decorates every task with tags.
    """

    def __init__(self, file_path):
        super(AnsibleRemediation, self).__init__(
            file_path, "ansible")

        # Parsed YAML body of the snippet; set by parse_from_file_with_jinja.
        self.body = None

    def parse_from_file_with_jinja(self, env_yaml, cpe_platforms):
        """
        Parse the snippet with Jinja and, when a rule is associated,
        post-process the resulting YAML (when clauses, tags, extra tasks).
        """
        self.local_env_yaml.update(env_yaml)
        result = super(AnsibleRemediation, self).parse_from_file_with_jinja(
            self.local_env_yaml, cpe_platforms)

        # Without an associated rule we cannot derive tags or platforms.
        if not self.associated_rule:
            return result

        parsed = ssg.yaml.ordered_load(result.contents)

        self.update(parsed, result.config, cpe_platforms)

        updated_yaml_text = ssg.yaml.ordered_dump(
            parsed, None, default_flow_style=False)
        result = result._replace(contents=updated_yaml_text)

        self.body = parsed
        self.metadata = result.config

        return result

    def update_tags_from_config(self, to_update, config):
        # Translate remediation metadata into Ansible tags, e.g.
        # 'strategy = restrict' becomes the 'restrict_strategy' tag.
        tags = to_update.get("tags", [])
        if "strategy" in config:
            tags.append("{0}_strategy".format(config["strategy"]))
        if "complexity" in config:
            tags.append("{0}_complexity".format(config["complexity"]))
        if "disruption" in config:
            tags.append("{0}_disruption".format(config["disruption"]))
        if "reboot" in config:
            if config["reboot"] == "true":
                reboot_tag = "reboot_required"
            else:
                reboot_tag = "no_reboot_needed"
            tags.append(reboot_tag)
        to_update["tags"] = sorted(tags)

    def update_tags_from_rule(self, to_update):
        # Prepend rule-derived tags (rule id, severity), then append the CCE
        # identifier and reference-based tags; the final list is sorted.
        if not self.associated_rule:
            raise RuntimeError("The Ansible snippet has no rule loaded.")

        tags = to_update.get("tags", [])
        tags.insert(0, "{0}_severity".format(self.associated_rule.severity))
        tags.insert(0, self.associated_rule.id_)

        cce_num = self._get_cce()
        if cce_num:
            tags.append("{0}".format(cce_num))

        refs = self.get_references()
        tags.extend(refs)
        to_update["tags"] = sorted(tags)

    def _get_cce(self):
        # CCE identifier of the associated rule, or None when it has none.
        return self.associated_rule.identifiers.get("cce", None)

    def get_references(self):
        """
        Return reference-based tags ('<prefix>-<value>') built from the
        associated rule's references and constants.REF_PREFIX_MAP.
        """
        if not self.associated_rule:
            raise RuntimeError("The Ansible snippet has no rule loaded.")

        result = []
        for ref_class, prefix in constants.REF_PREFIX_MAP.items():
            refs = self._get_rule_reference(ref_class)
            result.extend(["{prefix}-{value}".format(prefix=prefix, value=v) for v in refs])
        return result

    def _get_rule_reference(self, ref_class):
        # References are stored as a comma-separated string; return a list.
        refs = self.associated_rule.references.get(ref_class, "")
        if refs:
            return refs.split(",")
        else:
            return []

    def inject_package_facts_task(self, parsed_snippet):
        """ Injects a package_facts task only if
        the snippet has a task with a when clause with ansible_facts.packages,
        and the snippet doesn't already have a package_facts task
        """
        has_package_facts_task = False
        has_ansible_facts_packages_clause = False

        for p_task in parsed_snippet:
            # We are only interested in the OrderedDicts, which represent Ansible tasks
            if not isinstance(p_task, dict):
                continue

            if "package_facts" in p_task:
                has_package_facts_task = True

            # When clause of the task can be string or a list, lets normalize to list
            task_when = p_task.get("when", "")
            if isinstance(task_when, str):
                task_when = [task_when]
            for when in task_when:
                if "ansible_facts.packages" in when:
                    has_ansible_facts_packages_clause = True

        if has_ansible_facts_packages_clause and not has_package_facts_task:
            facts_task = OrderedDict([
                ('name', 'Gather the package facts'),
                ('package_facts', {'manager': 'auto'})
            ])
            parsed_snippet.insert(0, facts_task)

    def update_when_from_rule(self, to_update, cpe_platforms):
        # Build additional 'when' conditions from the rule's platforms:
        # inherited conditionals are ANDed (each is its own list entry),
        # rule-specific conditionals are ORed into a single entry.
        additional_when = []
        inherited_conditionals = sorted(super(
            AnsibleRemediation, self).get_inherited_conditionals("ansible", cpe_platforms))
        rule_specific_conditionals = sorted(super(
            AnsibleRemediation, self).get_rule_specific_conditionals("ansible", cpe_platforms))
        # Remove conditionals related to package CPEs if the updated task collects package facts
        # NB: materialize as lists, not filter() objects — on Python 3 a filter
        # object is always truthy, which would defeat the emptiness checks
        # below and could append an empty "" condition to additional_when.
        if "package_facts" in to_update:
            inherited_conditionals = [
                c for c in inherited_conditionals
                if "in ansible_facts.packages" not in c]
            rule_specific_conditionals = [
                c for c in rule_specific_conditionals
                if "in ansible_facts.packages" not in c]

        if inherited_conditionals:
            additional_when.extend(inherited_conditionals)

        if rule_specific_conditionals:
            additional_when.append(" or ".join(rule_specific_conditionals))

        to_update.setdefault("when", "")
        new_when = ssg.yaml.update_yaml_list_or_string(to_update["when"], additional_when,
                                                      prepend=True)
        if not new_when:
            to_update.pop("when")
        else:
            to_update["when"] = new_when

    def update(self, parsed, config, cpe_platforms):
        # We split the remediation update in three steps

        # 1. Update the when clause
        for p in parsed:
            if not isinstance(p, dict):
                continue
            self.update_when_from_rule(p, cpe_platforms)

        # 2. Inject any extra task necessary
        self.inject_package_facts_task(parsed)

        # 3. Add tags to all tasks, including the ones we have injected
        for p in parsed:
            if not isinstance(p, dict):
                continue
            self.update_tags_from_config(p, config)
            self.update_tags_from_rule(p)

    @classmethod
    def from_snippet_and_rule(cls, snippet_fname, rule_fname):
        """
        Build a remediation from a snippet file and its rule file.

        Returns None when either file is missing, or when the rule is
        "documentation-incomplete" (raised by non-debug builds).
        """
        if os.path.isfile(snippet_fname) and os.path.isfile(rule_fname):
            result = cls(snippet_fname)
            try:
                rule_obj = ssg.build_yaml.Rule.from_yaml(rule_fname)
                result.associate_rule(rule_obj)
            except ssg.yaml.DocumentationNotComplete:
                # Happens on non-debug build when a rule is "documentation-incomplete"
                return None
            return result
||
429 | |||
430 | |||
class AnacondaRemediation(Remediation):
    """Remediation delivered as an Anaconda snippet ('.anaconda')."""
    def __init__(self, file_path):
        super(AnacondaRemediation, self).__init__(
            file_path, "anaconda")
||
435 | |||
436 | |||
class PuppetRemediation(Remediation):
    """Remediation delivered as a Puppet manifest ('.pp')."""
    def __init__(self, file_path):
        super(PuppetRemediation, self).__init__(
            file_path, "puppet")
||
441 | |||
442 | |||
class IgnitionRemediation(Remediation):
    """Remediation delivered as an Ignition snippet ('.yml')."""
    def __init__(self, file_path):
        super(IgnitionRemediation, self).__init__(
            file_path, "ignition")
||
447 | |||
448 | |||
class KubernetesRemediation(Remediation):
    """Remediation delivered as a Kubernetes snippet ('.yml')."""
    def __init__(self, file_path):
        super(KubernetesRemediation, self).__init__(
            file_path, "kubernetes")
||
453 | |||
454 | |||
class BlueprintRemediation(Remediation):
    """
    Remediation delivered as an OSBuild Blueprint snippet ('.toml').
    """
    def __init__(self, file_path):
        super(BlueprintRemediation, self).__init__(
            file_path, "blueprint")
||
462 | |||
463 | |||
# Maps each remediation type name to the class implementing it.
REMEDIATION_TO_CLASS = {
    'anaconda': AnacondaRemediation,
    'ansible': AnsibleRemediation,
    'bash': BashRemediation,
    'puppet': PuppetRemediation,
    'ignition': IgnitionRemediation,
    'kubernetes': KubernetesRemediation,
    'blueprint': BlueprintRemediation,
}
||
473 | |||
474 | |||
def write_fix_to_file(fix, file_path):
    """
    Writes a single fix to the given file path.

    The fix's config entries are serialized first, one '# key = value'
    comment line each, followed by the fix contents themselves.
    """
    fix_contents, config = fix
    with open(file_path, "w") as stream:
        header_lines = ["# %s = %s\n" % (key, value)
                        for key, value in config.items()]
        stream.writelines(header_lines)
        stream.write(fix_contents)
||
484 | |||
485 | |||
def get_rule_dir_remediations(dir_path, remediation_type, product=None):
    """
    Gets a list of remediations of type remediation_type contained in a
    rule directory. If product is None, returns all such remediations.
    If product is not None, returns applicable remediations in order of
    priority:

        {{{ product }}}.ext -> shared.ext

    Only returns remediations which exist.
    """

    if not rules.is_rule_dir(dir_path):
        return []

    remediations_dir = os.path.join(dir_path, remediation_type)
    if not os.path.isdir(remediations_dir):
        return []
    ext = REMEDIATION_TO_EXT_MAP[remediation_type]

    # Two categories of results: those for a product and those that are
    # shared to multiple products. Within common results, there's two types:
    # those shared to multiple versions of the same type (added up front) and
    # those shared across multiple product types (e.g., RHEL and Ubuntu).
    product_results = []
    common_results = []
    for remediation_file in sorted(os.listdir(remediations_dir)):
        file_name, file_ext = os.path.splitext(remediation_file)
        if file_ext != ext or not rules.applies_to_product(file_name, product):
            continue
        remediation_path = os.path.join(remediations_dir, remediation_file)

        # rules.applies_to_product leaves three possibilities for file_name:
        # 'shared', '<product>' (e.g. 'ubuntu'), or '<product><version>'
        # (e.g. 'ubuntu2004'). The product argument holds the latter form.
        if file_name == 'shared':
            # Shared remediations are the lowest priority; keep them at the
            # end of the common results.
            common_results.append(remediation_path)
        elif file_name != product:
            # A product-wide match without the version (e.g. 'ubuntu' when
            # product is 'ubuntu2004'): highest priority among the common
            # results, so it goes in front of any shared ones.
            common_results.insert(0, remediation_path)
        else:
            # An exact <product><version> match.
            product_results.append(remediation_path)

    # Product-specific results take priority over the common ones.
    return product_results + common_results
||
543 | |||
544 | |||
def expand_xccdf_subs(fix, remediation_type):
    """Expand the respective populate keywords of each
    remediation type with an <xccdf:sub> element

    This routine translates any instance of the '`type`-populate' keyword in
    the form of:

    (`type`-populate variable_name)

    where `type` can be either ansible, puppet, anaconda or bash, into

    <sub idref="variable_name"/>

    The fix element is modified in place; ignition and kubernetes
    remediations are left untouched.
    """

    if fix is not None:
        fix_text = fix.text
    else:
        # Nothing to expand.
        return
    if remediation_type == "ignition":
        return
    elif remediation_type == "kubernetes":
        return
    elif remediation_type == "blueprint":
        pattern = r'\(blueprint-populate\s*(\S+)\)'
    elif remediation_type == "ansible":

        if "(ansible-populate " in fix_text:
            raise RuntimeError(
                "(ansible-populate VAR) has been deprecated. Please use "
                "(xccdf-var VAR) instead. Keep in mind that the latter will "
                "make an ansible variable out of XCCDF Value as opposed to "
                "substituting directly."
            )

        # If you change this string make sure it still matches the pattern
        # defined in OpenSCAP. Otherwise you break variable handling in
        # 'oscap xccdf generate fix' and the variables won't be customizable!
        # https://github.com/OpenSCAP/openscap/blob/1.2.17/src/XCCDF_POLICY/xccdf_policy_remediate.c#L588
        # const char *pattern =
        #     "- name: XCCDF Value [^ ]+ # promote to variable\n  set_fact:\n"
        #     "    ([^:]+): (.+)\n  tags:\n    - always\n";
        # We use !!str typecast to prevent treating values as different types
        # eg. yes as a bool or 077 as an octal number
        fix_text = re.sub(
            r"- \(xccdf-var\s+(\S+)\)",
            r"- name: XCCDF Value \1 # promote to variable\n"
            r"  set_fact:\n"
            r"    \1: !!str (ansible-populate \1)\n"
            r"  tags:\n"
            r"    - always",
            fix_text
        )

        pattern = r'\(ansible-populate\s*(\S+)\)'

    elif remediation_type == "puppet":
        pattern = r'\(puppet-populate\s*(\S+)\)'

    elif remediation_type == "anaconda":
        pattern = r'\(anaconda-populate\s*(\S+)\)'

    elif remediation_type == "bash":
        pattern = r'\(bash-populate\s*(\S+)\)'

    else:
        sys.stderr.write("Unknown remediation type '%s'\n" % (remediation_type))
        sys.exit(1)

    # we will get list what looks like
    # [text, varname, text, varname, ..., text]
    # (the capturing group in the pattern makes re.split keep the varnames)
    parts = re.split(pattern, fix_text)

    fix.text = parts[0]  # add first "text"
    for index in range(1, len(parts), 2):
        varname = parts[index]
        text_between_vars = parts[index + 1]

        # we cannot combine elements and text easily
        # so text is in ".tail" of element
        xccdfvarsub = ElementTree.SubElement(
            fix, "{%s}sub" % XCCDF12_NS, idref=constants.OSCAP_VALUE + varname)
        xccdfvarsub.tail = text_between_vars
        xccdfvarsub.set("use", "legacy")
630 | |||
def load_compiled_remediations(fixes_dir):
    """
    Load every already-compiled remediation found below fixes_dir.

    The directory is expected to hold one subdirectory per language, each
    containing one fix file per rule. Returns a mapping of
    rule_id -> {language: RemediationObject}.

    Raises RuntimeError when fixes_dir (or a language entry that is not a
    directory) is missing.
    """
    if not os.path.isdir(fixes_dir):
        raise RuntimeError(
            "Directory with compiled fixes '%s' does not exist" % fixes_dir)

    all_remediations = defaultdict(dict)
    for language in os.listdir(fixes_dir):
        language_dir = os.path.join(fixes_dir, language)
        if not os.path.isdir(language_dir):
            raise RuntimeError(
                "Can't find the '%s' directory with fixes for %s" %
                (language_dir, language))

        for filename in sorted(os.listdir(language_dir)):
            rule_id, _ = os.path.splitext(filename)
            fix_path = os.path.join(language_dir, filename)
            all_remediations[rule_id][language] = \
                parse_from_file_without_jinja(fix_path)
    return all_remediations
||
648 |