| Metric | Value |
| --- | --- |
| Total Complexity | 116 |
| Total Lines | 756 |
| Duplicated Lines | 6.75% |
| Coverage | 50% |
| Changes | 0 |
Duplicate code is one of the most pungent code smells. A commonly used rule of thumb is to restructure code once it is duplicated in three or more places; the usual remedies are to extract the duplicated code into a shared method or into a common superclass.

Complex classes and modules like ssg.build_remediations often do a lot of different things. To break such a class down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined which fields belong together, you can apply the Extract Class refactoring, as sketched below. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
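A minimal sketch of Extract Class follows; the class and field names are hypothetical and do not come from ssg.build_remediations:

```python
# Hypothetical example of the Extract Class refactoring; all names are illustrative.

# Before: one class mixes header bookkeeping with unrelated work, and the
# "header_" prefix hints at a cohesive component.
class ReportBuilderBefore(object):
    def __init__(self, title, author):
        self.header_title = title
        self.header_author = author

    def header_text(self):
        return "%s by %s" % (self.header_title, self.header_author)


# After: the cohesive fields and the method that uses them move into their
# own class, and the original class delegates to it.
class ReportHeader(object):
    def __init__(self, title, author):
        self.title = title
        self.author = author

    def text(self):
        return "%s by %s" % (self.title, self.author)


class ReportBuilder(object):
    def __init__(self, title, author):
        self.header = ReportHeader(title, author)
```

The analyzed source of ssg.build_remediations follows.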
from __future__ import absolute_import
from __future__ import print_function

import sys
import os
import os.path
import re
import codecs
from collections import defaultdict, namedtuple


import ssg.yaml
from . import build_yaml
from . import rules
from . import utils
from .jinja import process_file as jinja_process_file
from .xml import ElementTree

REMEDIATION_TO_EXT_MAP = {
    'anaconda': '.anaconda',
    'ansible': '.yml',
    'bash': '.sh',
    'puppet': '.pp'
}

FILE_GENERATED_HASH_COMMENT = '# THIS FILE IS GENERATED'

REMEDIATION_CONFIG_KEYS = ['complexity', 'disruption', 'platform', 'reboot',
                           'strategy']
REMEDIATION_ELM_KEYS = ['complexity', 'disruption', 'reboot', 'strategy']


def get_available_functions(build_dir):
    """Parse the content of "$CMAKE_BINARY_DIR/bash-remediation-functions.xml"
    XML file to obtain the list of currently known SCAP Security Guide internal
    remediation functions"""

    # If location of /shared directory is known
    if build_dir is None or not os.path.isdir(build_dir):
        sys.stderr.write("Expected '%s' to be the build directory. It doesn't "
                         "exist or is not a directory." % (build_dir))
        sys.exit(1)

    # Construct the final path of XML file with remediation functions
    xmlfilepath = \
        os.path.join(build_dir, "bash-remediation-functions.xml")

    if not os.path.isfile(xmlfilepath):
        sys.stderr.write("Expected '%s' to contain the remediation functions. "
                         "The file was not found!\n" % (xmlfilepath))
        sys.exit(1)

    remediation_functions = []
    with codecs.open(xmlfilepath, "r", encoding="utf-8") as xmlfile:
        filestring = xmlfile.read()
        # This regex looks implementation dependent but we can rely on
        # ElementTree sorting XML attrs alphabetically. Hidden is guaranteed
        # to be the first attr and ID is guaranteed to be second.
        remediation_functions = re.findall(
            r'<Value hidden=\"true\" id=\"function_(\S+)\"',
            filestring, re.DOTALL
        )

    return remediation_functions


def get_fixgroup_for_type(fixcontent, remediation_type):
    """
    For a given remediation type, return a new subelement of that type.

    Exits if passed an unknown remediation type.
    """
    if remediation_type == 'anaconda':
        return ElementTree.SubElement(
            fixcontent, "fix-group", id="anaconda",
            system="urn:redhat:anaconda:pre",
            xmlns="http://checklists.nist.gov/xccdf/1.1")

    elif remediation_type == 'ansible':
        return ElementTree.SubElement(
            fixcontent, "fix-group", id="ansible",
            system="urn:xccdf:fix:script:ansible",
            xmlns="http://checklists.nist.gov/xccdf/1.1")

    elif remediation_type == 'bash':
        return ElementTree.SubElement(
            fixcontent, "fix-group", id="bash",
            system="urn:xccdf:fix:script:sh",
            xmlns="http://checklists.nist.gov/xccdf/1.1")

    elif remediation_type == 'puppet':
        return ElementTree.SubElement(
            fixcontent, "fix-group", id="puppet",
            system="urn:xccdf:fix:script:puppet",
            xmlns="http://checklists.nist.gov/xccdf/1.1")

    sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
                     % (remediation_type))
    sys.exit(1)


def is_supported_filename(remediation_type, filename):
    """
    Checks if filename has a supported extension for remediation_type.

    Exits when remediation_type is of an unknown type.
    """
    if remediation_type in REMEDIATION_TO_EXT_MAP:
        return filename.endswith(REMEDIATION_TO_EXT_MAP[remediation_type])

    sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
                     % (remediation_type))
    sys.exit(1)


def get_populate_replacement(remediation_type, text):
    """
    Return varname, fixtextcontribution
    """

    if remediation_type == 'bash':
        # Extract variable name
        varname = re.search(r'\npopulate (\S+)\n',
                            text, re.DOTALL).group(1)
        # Define fix text part to contribute to main fix text
        fixtextcontribution = '\n%s="' % varname
        return (varname, fixtextcontribution)

    sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
                     % (remediation_type))
    sys.exit(1)


def split_remediation_content_and_metadata(fix_file):
    """
    Split a fix file into the remediation script itself and its '# key = value'
    configuration header, returned as a (contents, config) namedtuple.
    """
    remediation_contents = []
    config = defaultdict(lambda: None)

    # Assignment automatically escapes shell characters for XML
    for line in fix_file.splitlines():
        if line.startswith(FILE_GENERATED_HASH_COMMENT):
            continue

        if line.startswith('#') and line.count('=') == 1:
            (key, value) = line.strip('#').split('=')
            if key.strip() in REMEDIATION_CONFIG_KEYS:
                config[key.strip()] = value.strip()
                continue

        # If our parsed line wasn't a config item, add it to the
        # returned file contents. This includes when the line
        # begins with a '#' and contains an equals sign, but
        # the "key" isn't one of the known keys from
        # REMEDIATION_CONFIG_KEYS.
        remediation_contents.append(line)

    contents = "\n".join(remediation_contents)
    remediation = namedtuple('remediation', ['contents', 'config'])
    return remediation(contents=contents, config=config)


def parse_from_file_with_jinja(file_path, env_yaml):
    """
    Parses a remediation from a file. As remediations contain jinja macros,
    we need an env_yaml context to process these. In practice, no remediations
    use jinja in the configuration, so for extracting only the configuration,
    env_yaml can be an arbitrary product.yml dictionary.

    If the logic of configuration parsing changes significantly, please also
    update ssg.fixes.parse_platform(...).
    """

    fix_file = jinja_process_file(file_path, env_yaml)
    return split_remediation_content_and_metadata(fix_file)


def parse_from_file_without_jinja(file_path):
    """
    Parses a remediation from a file. Doesn't process the Jinja macros.
    This function is useful in build phases in which all the Jinja macros
    are already resolved.
    """
    with open(file_path, "r") as f:
        f_str = f.read()
        return split_remediation_content_and_metadata(f_str)


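# Illustrative example (not part of the module): a remediation snippet whose
# header is consumed by split_remediation_content_and_metadata() above.
# Assuming a fix file such as:
#
#     # platform = multi_platform_rhel
#     # reboot = false
#     # strategy = restrict
#     # complexity = low
#     # disruption = low
#     echo "remediation payload"
#
# the '# key = value' lines whose keys appear in REMEDIATION_CONFIG_KEYS end up
# in the returned .config mapping, and every remaining line is preserved
# verbatim in .contents.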
class Remediation(object):
    def __init__(self, file_path, remediation_type):
        self.file_path = file_path
        self.local_env_yaml = dict()

        self.metadata = defaultdict(lambda: None)

        self.remediation_type = remediation_type
        self.associated_rule = None

    def load_associated_rule(self, resolved_rules_dir, rule_id):
        rule_path = os.path.join(
            resolved_rules_dir, rule_id + ".yml")
        return self.load_rule_from(rule_path)

    def load_rule_from(self, rule_path):
        if not os.path.isfile(rule_path):
            msg = ("{lang} remediation snippet can't load the "
                   "respective rule YML at {rule_fname}"
                   .format(rule_fname=rule_path,
                           lang=self.remediation_type))
            print(msg, file=sys.stderr)
        else:
            self.associated_rule = build_yaml.Rule.from_yaml(rule_path)
            self.expand_env_yaml_from_rule()

    def expand_env_yaml_from_rule(self):
        if not self.associated_rule:
            return

        self.local_env_yaml["rule_title"] = self.associated_rule.title
        self.local_env_yaml["rule_id"] = self.associated_rule.id_

    def parse_from_file_with_jinja(self, env_yaml):
        return parse_from_file_with_jinja(self.file_path, env_yaml)


def process(remediation, env_yaml, fixes, rule_id):
    """
    Process a fix, adding it to fixes iff the file is of a valid extension
    for the remediation type and the fix is valid for the current product.

    Note that platform is a required field in the contents of the fix.
    """
    if not is_supported_filename(remediation.remediation_type, remediation.file_path):
        return

    result = remediation.parse_from_file_with_jinja(env_yaml)

    if not result.config['platform']:
        raise RuntimeError(
            "The '%s' remediation script does not contain the "
            "platform identifier!" % (remediation.file_path))

    product = env_yaml["product"]
    if utils.is_applicable_for_product(result.config['platform'], product):
        fixes[rule_id] = result

    return result


class BashRemediation(Remediation):
    def __init__(self, file_path):
        super(BashRemediation, self).__init__(file_path, "bash")

    def load_associated_rule(self, resolved_rules_dir, rule_id):
        # No point in loading rule for this remediation type as of now
        pass


class AnsibleRemediation(Remediation):
    def __init__(self, file_path):
        super(AnsibleRemediation, self).__init__(
            file_path, "ansible")

        self.body = None

    def parse_from_file_with_jinja(self, env_yaml):
        self.local_env_yaml.update(env_yaml)
        result = super(AnsibleRemediation, self).parse_from_file_with_jinja(self.local_env_yaml)

        if not self.associated_rule:
            return result

        parsed = ssg.yaml.ordered_load(result.contents)

        current_product = env_yaml.get("product")
        if current_product:
            self.update(parsed, result.config, current_product)

        updated_yaml_text = ssg.yaml.ordered_dump(
            parsed, None, default_flow_style=False)
        result = result._replace(contents=updated_yaml_text)

        self.body = parsed
        self.metadata = result.config

        return result

    def update_tags_from_config(self, to_update, config):
        tags = to_update.get("tags", [])
        if "strategy" in config:
            tags.append("{0}_strategy".format(config["strategy"]))
        if "complexity" in config:
            tags.append("{0}_complexity".format(config["complexity"]))
        if "disruption" in config:
            tags.append("{0}_disruption".format(config["disruption"]))
        if "reboot" in config:
            if config["reboot"] == "true":
                reboot_tag = "reboot_required"
            else:
                reboot_tag = "no_reboot_needed"
            tags.append(reboot_tag)
        to_update["tags"] = tags

    def update_tags_from_rule(self, product, to_update):
        if not self.associated_rule:
            raise RuntimeError("The Ansible snippet has no rule loaded.")

        tags = to_update.get("tags", [])
        tags.insert(0, "{0}_severity".format(self.associated_rule.severity))
        tags.insert(0, self.associated_rule.id_)

        cce_num = self._get_cce(product)
        if cce_num:
            tags.append("CCE-{0}".format(cce_num))

        refs = self.get_references(product)
        tags.extend(refs)
        to_update["tags"] = tags

    def _get_cce(self, product):
        our_cce = None
        for cce, val in self.associated_rule.identifiers.items():
            if cce.endswith(product):
                our_cce = val
                break
        return our_cce
        return self.associated_rule.identifiers.get("cce", None)

    def get_references(self, product):
        if not self.associated_rule:
            raise RuntimeError("The Ansible snippet has no rule loaded.")
        # see xccdf-addremediations.xslt <- shared_constants.xslt <- shared_shorthand2xccdf.xslt
        # if you want to know how the map was constructed
        platform_id_map = {
            "rhel7": "DISA-STIG-RHEL-07",
            "rhel8": "DISA-STIG-RHEL-08",
        }
        # RHEL6 is a special case, in our content,
        # we have only stig IDs for RHEL6 that include the literal 'RHEL-06'
        stig_platform_id = platform_id_map.get(product, "DISA-STIG")

        ref_prefix_map = {
            "nist": "NIST-800-53",
            "cui": "NIST-800-171",
            "pcidss": "PCI-DSS",
            "cjis": "CJIS",
            "stigid@{product}".format(product=product): stig_platform_id,
        }
        result = []
        for ref_class, prefix in ref_prefix_map.items():
            refs = self._get_rule_reference(ref_class)
            result.extend(["{prefix}-{value}".format(prefix=prefix, value=v) for v in refs])
        return result

    def _get_rule_reference(self, ref_class):
        refs = self.associated_rule.references.get(ref_class, "")
        if refs:
            return refs.split(",")
        else:
            return []

    def update_when_from_rule(self, to_update):
        additional_when = ""
        if self.associated_rule.platform == "machine":
            additional_when = ('ansible_virtualization_role != "guest" '
                               'or ansible_virtualization_type != "docker"')
        to_update.setdefault("when", "")
        new_when = ssg.yaml.update_yaml_list_or_string(to_update["when"], additional_when)
        if not new_when:
            to_update.pop("when")
        else:
            to_update["when"] = new_when

    def update(self, parsed, config, product):
        for p in parsed:
            if not isinstance(p, dict):
                continue
            self.update_when_from_rule(p)
            self.update_tags_from_config(p, config)
            self.update_tags_from_rule(product, p)

    @classmethod
    def from_snippet_and_rule(cls, snippet_fname, rule_fname):
        result = cls(snippet_fname)
        result.load_rule_from(rule_fname)
        return result


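# Illustrative note (not part of the module): given a parsed snippet config of,
# say, {"strategy": "restrict", "complexity": "low", "disruption": "low",
# "reboot": "false"}, update_tags_from_config() above would append the tags
# restrict_strategy, low_complexity, low_disruption and no_reboot_needed to the
# Ansible task.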
class AnacondaRemediation(Remediation):
    def __init__(self, file_path):
        super(AnacondaRemediation, self).__init__(
            file_path, "anaconda")

    def load_associated_rule(self, resolved_rules_dir):
        # No point in loading rule for this remediation type as of now
        pass


class PuppetRemediation(Remediation):
    def __init__(self, file_path):
        super(PuppetRemediation, self).__init__(
            file_path, "puppet")

    def load_associated_rule(self, resolved_rules_dir):
        # No point in loading rule for this remediation type as of now
        pass


REMEDIATION_TO_CLASS = {
    'anaconda': AnacondaRemediation,
    'ansible': AnsibleRemediation,
    'bash': BashRemediation,
    'puppet': PuppetRemediation,
}


def write_fixes_to_xml(remediation_type, build_dir, output_path, fixes):
    """
    Builds a fix-content XML tree from the contents of fixes
    and writes it to output_path.
    """

    fixcontent = ElementTree.Element("fix-content", system="urn:xccdf:fix:script:sh",
                                     xmlns="http://checklists.nist.gov/xccdf/1.1")
    fixgroup = get_fixgroup_for_type(fixcontent, remediation_type)

    remediation_functions = get_available_functions(build_dir)

    for fix_name in fixes:
        fix_contents, config = fixes[fix_name]

        fix_elm = ElementTree.SubElement(fixgroup, "fix")
        fix_elm.set("rule", fix_name)

        for key in REMEDIATION_ELM_KEYS:
            if config[key]:
                fix_elm.set(key, config[key])

        fix_elm.text = fix_contents + "\n"

        # Expand shell variables and remediation functions
        # into corresponding XCCDF <sub> elements
        expand_xccdf_subs(fix_elm, remediation_type, remediation_functions)

    tree = ElementTree.ElementTree(fixcontent)
    tree.write(output_path)


def write_fixes_to_dir(fixes, remediation_type, output_dir):
    """
    Writes fixes as files to output_dir, each fix as a separate file
    """
    try:
        extension = REMEDIATION_TO_EXT_MAP[remediation_type]
    except KeyError:
        raise ValueError("Unknown remediation type %s." % remediation_type)

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    for fix_name, fix in fixes.items():
        fix_contents, config = fix
        fix_path = os.path.join(output_dir, fix_name + extension)
        with open(fix_path, "w") as f:
            for k, v in config.items():
                f.write("# %s = %s\n" % (k, v))
            f.write(fix_contents)


def get_rule_dir_remediations(dir_path, remediation_type, product=None):
    """
    Gets a list of remediations of type remediation_type contained in a
    rule directory. If product is None, returns all such remediations.
    If product is not None, returns applicable remediations in order of
    priority:

        {{{ product }}}.ext -> shared.ext

    Only returns remediations which exist.
    """

    if not rules.is_rule_dir(dir_path):
        return []

    remediations_dir = os.path.join(dir_path, remediation_type)
    has_remediations_dir = os.path.isdir(remediations_dir)
    ext = REMEDIATION_TO_EXT_MAP[remediation_type]
    if not has_remediations_dir:
        return []

    results = []
    for remediation_file in os.listdir(remediations_dir):
        file_name, file_ext = os.path.splitext(remediation_file)
        remediation_path = os.path.join(remediations_dir, remediation_file)

        if file_ext == ext and rules.applies_to_product(file_name, product):
            if file_name == 'shared':
                results.append(remediation_path)
            else:
                results.insert(0, remediation_path)

    return results


def expand_xccdf_subs(fix, remediation_type, remediation_functions):
    """For those remediation scripts utilizing some of the internal SCAP
    Security Guide remediation functions expand the selected shell variables
    and remediation functions calls with <xccdf:sub> element

    This routine translates any instance of the 'populate' function call in
    the form of:

        populate variable_name

    into

        variable_name="<sub idref="variable_name"/>"

    Also transforms any instance of the 'ansible-populate' function call in the
    form of:

        (ansible-populate variable_name)

    into

        <sub idref="variable_name"/>

    Also transforms any instance of some other known remediation function (e.g.
    'replace_or_append' etc.) from the form of:

        function_name "arg1" "arg2" ... "argN"

    into:

        <sub idref="function_function_name"/>
        function_name "arg1" "arg2" ... "argN"
    """

    if remediation_type == "ansible":
        fix_text = fix.text

        if "(ansible-populate " in fix_text:
            raise RuntimeError(
                "(ansible-populate VAR) has been deprecated. Please use "
                "(xccdf-var VAR) instead. Keep in mind that the latter will "
                "make an ansible variable out of XCCDF Value as opposed to "
                "substituting directly."
            )

        # If you change this string make sure it still matches the pattern
        # defined in OpenSCAP. Otherwise you break variable handling in
        # 'oscap xccdf generate fix' and the variables won't be customizable!
        # https://github.com/OpenSCAP/openscap/blob/1.2.17/src/XCCDF_POLICY/xccdf_policy_remediate.c#L588
        # const char *pattern =
        #     "- name: XCCDF Value [^ ]+ # promote to variable\n  set_fact:\n"
        #     "    ([^:]+): (.+)\n  tags:\n    - always\n";
        # We use !!str typecast to prevent treating values as different types
        # eg. yes as a bool or 077 as an octal number
        fix_text = re.sub(
            r"- \(xccdf-var\s+(\S+)\)",
            r"- name: XCCDF Value \1 # promote to variable\n"
            r"  set_fact:\n"
            r"    \1: !!str (ansible-populate \1)\n"
            r"  tags:\n"
            r"    - always",
            fix_text
        )

        pattern = r'\(ansible-populate\s*(\S+)\)'

        # we will get a list that looks like
        # [text, varname, text, varname, ..., text]
        parts = re.split(pattern, fix_text)

        fix.text = parts[0]  # add first "text"
        for index in range(1, len(parts), 2):
            varname = parts[index]
            text_between_vars = parts[index + 1]

            # we cannot combine elements and text easily
            # so text is in ".tail" of element
            xccdfvarsub = ElementTree.SubElement(fix, "sub", idref=varname)
            xccdfvarsub.tail = text_between_vars
        return

    elif remediation_type == "puppet":
        pattern = r'\(puppet-populate\s*(\S+)\)'

        # we will get a list that looks like
        # [text, varname, text, varname, ..., text]
        parts = re.split(pattern, fix.text)

        fix.text = parts[0]  # add first "text"
        for index in range(1, len(parts), 2):
            varname = parts[index]
            text_between_vars = parts[index + 1]

            # we cannot combine elements and text easily
            # so text is in ".tail" of element
            xccdfvarsub = ElementTree.SubElement(fix, "sub", idref=varname)
            xccdfvarsub.tail = text_between_vars
        return

    elif remediation_type == "anaconda":
        pattern = r'\(anaconda-populate\s*(\S+)\)'

        # we will get a list that looks like
        # [text, varname, text, varname, ..., text]
        parts = re.split(pattern, fix.text)

        fix.text = parts[0]  # add first "text"
        for index in range(1, len(parts), 2):
            varname = parts[index]
            text_between_vars = parts[index + 1]

            # we cannot combine elements and text easily
            # so text is in ".tail" of element
            xccdfvarsub = ElementTree.SubElement(fix, "sub", idref=varname)
            xccdfvarsub.tail = text_between_vars
        return

    elif remediation_type == "bash":
        # This remediation script doesn't utilize internal remediation functions
        # Skip it without any further processing
        if 'remediation_functions' not in fix.text:
            return

        # This remediation script utilizes some of internal remediation functions
        # Expand shell variables and remediation functions calls with <xccdf:sub>
        # elements
        pattern = r'\n+(\s*(?:' + r'|'.join(remediation_functions) + r')[^\n]*)\n'
        patcomp = re.compile(pattern, re.DOTALL)
        fixparts = re.split(patcomp, fix.text)
        if fixparts[0] is not None:
            # Split the portion of fix.text from fix start to first call of
            # remediation function, keeping only the third part:
            # * tail to hold part of the fix.text after inclusion,
            #   but before first call of remediation function
            try:
                rfpattern = '(.*remediation_functions)(.*)'
                rfpatcomp = re.compile(rfpattern, re.DOTALL)
                _, _, tail, _ = re.split(rfpatcomp, fixparts[0], maxsplit=2)
            except ValueError:
                sys.stderr.write("Processing fix.text for: %s rule\n"
                                 % fix.get('rule'))
                sys.stderr.write("Unable to extract part of the fix.text "
                                 "after inclusion of remediation functions."
                                 " Aborting..\n")
                sys.exit(1)
            # If the 'tail' is not empty, make it new fix.text.
            # Otherwise use ''
            fix.text = tail if tail is not None else ''
            # Drop the first element of 'fixparts' since it has been processed
            fixparts.pop(0)
            # Perform sanity check on new 'fixparts' list content (to continue
            # successfully 'fixparts' has to contain even count of elements)
            if len(fixparts) % 2 != 0:
                sys.stderr.write("Error performing XCCDF expansion on "
                                 "remediation script: %s\n"
                                 % fix.get("rule"))
                sys.stderr.write("Invalid count of elements. Exiting!\n")
                sys.exit(1)
            # Process remaining 'fixparts' elements in pairs
            # First pair element is remediation function to be XCCDF expanded
            # Second pair element (if not empty) is the portion of the original
            # fix text to be used in newly added subelement's tail
            for idx in range(0, len(fixparts), 2):
                # We previously removed enclosing newlines when creating
                # fixparts list. Add them back and reuse the above 'pattern'
                fixparts[idx] = "\n%s\n" % fixparts[idx]
                # Sanity check (verify the first field truly contains call of
                # some of the remediation functions)
                if re.match(pattern, fixparts[idx], re.DOTALL) is not None:
                    # This chunk contains call of 'populate' function
                    if "populate" in fixparts[idx]:
                        varname, fixtextcontrib = get_populate_replacement(remediation_type,
                                                                           fixparts[idx])
                        # Define new XCCDF <sub> element for the variable
                        xccdfvarsub = ElementTree.Element("sub", idref=varname)

                        # If this is first sub element,
                        # the textcontribution needs to go to fix text
                        # otherwise, append to last subelement
                        nfixchildren = len(list(fix))
                        if nfixchildren == 0:
                            fix.text += fixtextcontrib
                        else:
                            previouselem = fix[nfixchildren-1]
                            previouselem.tail += fixtextcontrib

                        # If second pair element is not empty, append it as
                        # tail for the subelement (prefixed with closing '"')
                        if fixparts[idx + 1] is not None:
                            xccdfvarsub.tail = '"' + '\n' + fixparts[idx + 1]
                        # Otherwise append just enclosing '"'
                        else:
                            xccdfvarsub.tail = '"' + '\n'
                        # Append the new subelement to the fix element
                        fix.append(xccdfvarsub)
                    # This chunk contains call of other remediation function
                    else:
                        # Extract remediation function name
                        funcname = re.search(r'\n\s*(\S+)(| .*)\n',
                                             fixparts[idx],
                                             re.DOTALL).group(1)
                        # Define new XCCDF <sub> element for the function
                        xccdffuncsub = ElementTree.Element(
                            "sub", idref='function_%s' % funcname)
                        # Append original function call into tail of the
                        # subelement
                        xccdffuncsub.tail = fixparts[idx]
                        # If the second element of the pair is not empty,
                        # append it to the tail of the subelement too
                        if fixparts[idx + 1] is not None:
                            xccdffuncsub.tail += fixparts[idx + 1]
                        # Append the new subelement to the fix element
                        fix.append(xccdffuncsub)
                        # Ensure the newly added <xccdf:sub> element for the
                        # function will be always inserted at newline
                        # If xccdffuncsub is the first <xccdf:sub> element
                        # being added as child of <fix> and fix.text doesn't
                        # end up with newline character, append the newline
                        # to the fix.text
                        if list(fix).index(xccdffuncsub) == 0:
                            if re.search(r'.*\n$', fix.text) is None:
                                fix.text += '\n'
                        # If xccdffuncsub isn't the first child (first
                        # <xccdf:sub> being added), and tail of previous
                        # child doesn't end up with newline, append the newline
                        # to the tail of previous child
                        else:
                            previouselem = fix[list(fix).index(xccdffuncsub) - 1]
                            if re.search(r'.*\n$', previouselem.tail) is None:
                                previouselem.tail += '\n'

        # Perform a sanity check if all known remediation function calls have been
        # properly XCCDF substituted. Exit with failure if some wasn't

        # First concat output form of modified fix text (including text appended
        # to all children of the fix)
        modfix = [fix.text]
        for child in fix.getchildren():
            if child is not None and child.text is not None:
                modfix.append(child.text)
        modfixtext = "".join(modfix)
        for func in remediation_functions:
            # Then define expected XCCDF sub element form for this function
            funcxccdfsub = "<sub idref=\"function_%s\"" % func
            # Finally perform the sanity check -- if function was properly XCCDF
            # substituted both the original function call and XCCDF <sub> element
            # for that function need to be present in the modified text of the fix
            # Otherwise something went wrong, thus exit with failure
            if func in modfixtext and funcxccdfsub not in modfixtext:
                sys.stderr.write("Error performing XCCDF <sub> substitution "
                                 "for function %s in %s fix. Exiting...\n"
                                 % (func, fix.get("rule")))
                sys.exit(1)
    else:
        sys.stderr.write("Unknown remediation type '%s'\n" % (remediation_type))
        sys.exit(1)
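For orientation, here is a hedged usage sketch of the helpers above; the snippet path, the rule id and the env_yaml contents are illustrative stand-ins, not real build inputs:

```python
# Hypothetical driver for the helpers above; the paths, rule id and env_yaml
# values are made-up examples, not actual ComplianceAsCode build inputs.
from ssg.build_remediations import BashRemediation, process, write_fixes_to_dir

env_yaml = {"product": "rhel8"}  # minimal stand-in for the product.yml context

# Wrap one snippet, parse it (resolving Jinja), and collect it if it applies
# to the product; process() skips files with an unsupported extension.
remediation = BashRemediation("some_rule/bash/shared.sh")
fixes = {}
process(remediation, env_yaml, fixes, rule_id="some_rule")

# Write every collected fix to its own file, prefixed with its '# key = value'
# configuration header.
write_fixes_to_dir(fixes, "bash", "build/fixes")
```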