Passed
Push — master ( 2c78a8...1e73f5 )
by Matěj
02:33 queued 11s
created

BashRemediation.parse_from_file_with_jinja()   B

Complexity

Conditions 8

Size

Total Lines 50
Code Lines 31

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 30.8772

Importance

Changes 0
Metric Value
cc 8
eloc 31
nop 2
dl 0
loc 50
ccs 9
cts 31
cp 0.2903
crap 30.8772
rs 7.2693
c 0
b 0
f 0
1 2
from __future__ import absolute_import
2 2
from __future__ import print_function
3
4 2
import sys
5 2
import os
6 2
import os.path
7 2
import re
8 2
import codecs
9 2
from collections import defaultdict, namedtuple, OrderedDict
10
11 2
import ssg.yaml
12 2
from . import build_yaml
13 2
from . import rules
14 2
from . import utils
15
16 2
from . import constants
17 2
from .jinja import process_file_with_macros as jinja_process_file
18
19 2
from .xml import ElementTree
20
21 2
# File extension expected for the remediation files of each remediation type.
REMEDIATION_TO_EXT_MAP = {
    'anaconda': '.anaconda',
    'ansible': '.yml',
    'bash': '.sh',
    'puppet': '.pp',
    'ignition': '.yml',
    'kubernetes': '.yml',
}

# Shell command template, per package manager, used to test whether a package
# is installed; '{0}' is replaced by the package name.
PKG_MANAGER_TO_PACKAGE_CHECK_COMMAND = {
    'apt_get': 'dpkg-query -s {0} &>/dev/null',
    'dnf': 'rpm --quiet -q {0}',
    'yum': 'rpm --quiet -q {0}',
    'zypper': 'rpm --quiet -q {0}',
}

# Lines starting with this marker are dropped when a remediation is parsed.
FILE_GENERATED_HASH_COMMENT = '# THIS FILE IS GENERATED'

# Keys recognized in the '# key = value' header lines of a remediation file.
REMEDIATION_CONFIG_KEYS = ['complexity', 'disruption', 'platform', 'reboot',
                           'strategy']
# Subset of the configuration keys that is copied onto the <fix> XML element.
REMEDIATION_ELM_KEYS = ['complexity', 'disruption', 'reboot', 'strategy']
42
43
44 2
def get_available_functions(build_dir):
    """
    Return the names of the known bash remediation functions.

    Reads "bash-remediation-functions.xml" from build_dir and collects, in
    document order, the part following "function_" in the id attribute of
    every <Value> element.

    Writes a message to stderr and exits with status 1 when build_dir is None
    or not a directory, or when the XML file is missing.
    """
    if build_dir is None or not os.path.isdir(build_dir):
        sys.stderr.write("Expected '%s' to be the build directory. It doesn't "
                         "exist or is not a directory.\n" % (build_dir))
        sys.exit(1)

    xmlfilepath = os.path.join(build_dir, "bash-remediation-functions.xml")
    if not os.path.isfile(xmlfilepath):
        sys.stderr.write("Expected '%s' to contain the remediation functions. "
                         "The file was not found!\n" % (xmlfilepath))
        sys.exit(1)

    with codecs.open(xmlfilepath, "r", encoding="utf-8") as xmlfile:
        filestring = xmlfile.read()

    # The XML is scanned with a regex instead of being parsed: the attribute
    # order produced by ElementTree can't be relied on since Python 3.7, but
    # the id attribute itself is always present. The whole file is searched
    # at once; '[^>]' also matches newlines, so elements whose attributes are
    # spread over several lines are found too.
    return re.findall(r'<Value[^>]+id=\"function_(\S+)\"', filestring)
76
77
78 2
def get_fixgroup_for_type(fixcontent, remediation_type):
    """
    For a given remediation type, return a new subelement of that type.

    The new "fix-group" element is appended to fixcontent; its id is the
    remediation type and its system attribute is the URN of that type.

    Exits if passed an unknown remediation type.
    """
    # Value of the 'system' attribute for each supported remediation type.
    fix_systems = {
        'anaconda': "urn:redhat:anaconda:pre",
        'ansible': "urn:xccdf:fix:script:ansible",
        'bash': "urn:xccdf:fix:script:sh",
        'puppet': "urn:xccdf:fix:script:puppet",
        'ignition': "urn:xccdf:fix:script:ignition",
        'kubernetes': "urn:xccdf:fix:script:kubernetes",
    }

    if remediation_type not in fix_systems:
        sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
                         % (remediation_type))
        sys.exit(1)

    return ElementTree.SubElement(
        fixcontent, "fix-group", id=remediation_type,
        system=fix_systems[remediation_type],
        xmlns="http://checklists.nist.gov/xccdf/1.1")
123
124
125 2
def is_supported_filename(remediation_type, filename):
    """
    Checks if filename has a supported extension for remediation_type.

    Returns True when filename ends with the extension registered for
    remediation_type in REMEDIATION_TO_EXT_MAP, False otherwise.

    Exits when remediation_type is of an unknown type.
    """
    try:
        extension = REMEDIATION_TO_EXT_MAP[remediation_type]
    except KeyError:
        sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
                         % (remediation_type))
        sys.exit(1)

    return filename.endswith(extension)
137
138
139 2
def get_populate_replacement(remediation_type, text):
    """
    Return varname, fixtextcontribution

    varname is the name following the first 'populate' call that occupies a
    whole line of text; fixtextcontribution is the opening part of the shell
    assignment for that variable (a newline followed by 'varname="').

    Only the 'bash' remediation type is supported; exits for any other type.
    Raises AttributeError when text contains no such 'populate' line.
    """
    if remediation_type != 'bash':
        sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
                         % (remediation_type))
        sys.exit(1)

    # Extract variable name
    varname = re.search(r'\npopulate (\S+)\n', text).group(1)
    # Define fix text part to contribute to main fix text
    fixtextcontribution = '\n%s="' % varname
    return (varname, fixtextcontribution)
155
156
157 2
def split_remediation_content_and_metadata(fix_file):
    """
    Split the text of a remediation into its body and its configuration.

    fix_file is the whole remediation as a single string. Lines of the form
    '# key = value' whose key is listed in REMEDIATION_CONFIG_KEYS are
    collected into the configuration, lines starting with
    FILE_GENERATED_HASH_COMMENT are dropped, and all other lines form the body.

    Returns a namedtuple (contents, config): contents is the body joined by
    newlines, config is a defaultdict yielding None for keys that were not set.
    """
    remediation_contents = []
    config = defaultdict(lambda: None)

    for line in fix_file.splitlines():
        if line.startswith(FILE_GENERATED_HASH_COMMENT):
            continue

        # Exactly one '=' is required so that the split below always
        # produces a (key, value) pair.
        if line.startswith('#') and line.count('=') == 1:
            (key, value) = line.strip('#').split('=')
            if key.strip() in REMEDIATION_CONFIG_KEYS:
                config[key.strip()] = value.strip()
                continue

        # If our parsed line wasn't a config item, add it to the
        # returned file contents. This includes when the line
        # begins with a '#' and contains an equals sign, but
        # the "key" isn't one of the known keys from
        # REMEDIATION_CONFIG_KEYS.
        remediation_contents.append(line)

    contents = "\n".join(remediation_contents)
    remediation = namedtuple('remediation', ['contents', 'config'])
    return remediation(contents=contents, config=config)
182
183
184 2
def parse_from_file_with_jinja(file_path, env_yaml):
    """
    Parses a remediation from a file. As remediations contain jinja macros,
    we need a env_yaml context to process these. In practice, no remediations
    use jinja in the configuration, so for extracting only the configuration,
    env_yaml can be an arbitrary product.yml dictionary.

    Returns the (contents, config) result of
    split_remediation_content_and_metadata() applied to the expanded file.

    If the logic of configuration parsing changes significantly, please also
    update ssg.fixes.parse_platform(...).
    """
    fix_file = jinja_process_file(file_path, env_yaml)
    return split_remediation_content_and_metadata(fix_file)
197
198
199 2
def parse_from_file_without_jinja(file_path):
    """
    Parses a remediation from a file. Doesn't process the Jinja macros.
    This function is useful in build phases in which all the Jinja macros
    are already resolved.

    Returns the (contents, config) result of
    split_remediation_content_and_metadata() applied to the file text.
    """
    with open(file_path, "r") as f:
        f_str = f.read()

    # The file is fully read at this point, so it can be closed before parsing.
    return split_remediation_content_and_metadata(f_str)
208
209
210 2
class Remediation(object):
    """
    A single remediation file of a given remediation type.

    Optionally associated with the rule it remediates, in which case the
    rule's title and id are made available to Jinja via local_env_yaml.
    """

    def __init__(self, file_path, remediation_type):
        """Remember the file path and type; no file is read at this point."""
        self.file_path = file_path
        # Extra Jinja context specific to this remediation.
        self.local_env_yaml = {}

        # Yields None for metadata keys that have not been set.
        self.metadata = defaultdict(lambda: None)

        self.remediation_type = remediation_type
        self.associated_rule = None

    def load_rule_from(self, rule_path):
        """Load the rule from the YAML file at rule_path and associate it."""
        self.associated_rule = build_yaml.Rule.from_yaml(rule_path)
        self.expand_env_yaml_from_rule()

    def expand_env_yaml_from_rule(self):
        """Copy the associated rule's title and id into local_env_yaml.

        Does nothing when no rule is associated.
        """
        if not self.associated_rule:
            return

        self.local_env_yaml["rule_title"] = self.associated_rule.title
        self.local_env_yaml["rule_id"] = self.associated_rule.id_

    def parse_from_file_with_jinja(self, env_yaml):
        """Parse this remediation's file, expanding Jinja with env_yaml.

        Delegates to the module-level parse_from_file_with_jinja().
        """
        return parse_from_file_with_jinja(self.file_path, env_yaml)
233
234
235 2
def process(remediation, env_yaml, fixes, rule_id):
    """
    Process a fix, adding it to fixes iff the file is of a valid extension
    for the remediation type and the fix is valid for the current product.

    Note that platform is a required field in the contents of the fix.

    Returns the parsed remediation, or None when the file extension doesn't
    match the remediation type. The parsed remediation is returned even when
    it isn't applicable to the product and therefore wasn't added to fixes.

    Raises RuntimeError when the fix has no platform, and ValueError when an
    entry of the comma-separated platform list has surrounding whitespace.
    """
    if not is_supported_filename(remediation.remediation_type, remediation.file_path):
        return

    result = remediation.parse_from_file_with_jinja(env_yaml)
    platforms = result.config['platform']

    if not platforms:
        raise RuntimeError(
            "The '%s' remediation script does not contain the "
            "platform identifier!" % (remediation.file_path))

    for platform in platforms.split(","):
        if platform.strip() != platform:
            # The message reports the whole platform list, not just the
            # offending entry.
            msg = (
                "Comma-separated '{platform}' platforms "
                "in '{remediation_file}' contains whitespace."
                .format(platform=platforms, remediation_file=remediation.file_path))
            raise ValueError(msg)

    product = env_yaml["product"]
    if utils.is_applicable_for_product(platforms, product):
        fixes[rule_id] = result

    return result
266
267
268 2
class BashRemediation(Remediation):
    """
    Bash remediation that is wrapped into a platform applicability check
    when its associated rule declares platforms.
    """

    def __init__(self, file_path):
        super(BashRemediation, self).__init__(file_path, "bash")

    def parse_from_file_with_jinja(self, env_yaml):
        """
        Parse the remediation and guard it with the rule's platform checks.

        env_yaml is merged into local_env_yaml before the file is expanded.
        When the associated rule has platforms, the script body is wrapped in
        an 'if <conditions>; then ... else ... fi' block; otherwise the parsed
        remediation is returned unchanged.

        Raises KeyError when a package platform has to be checked and
        'platform_package_overrides' or 'pkg_manager' is missing from the
        environment, or the package manager is unknown.
        """
        self.local_env_yaml.update(env_yaml)
        result = super(BashRemediation, self).parse_from_file_with_jinja(self.local_env_yaml)

        rule_platforms = set()
        if self.associated_rule:
            # There can be repeated inherited platforms and rule platforms
            rule_platforms.update(self.associated_rule.inherited_platforms)
            rule_platforms.add(self.associated_rule.platform)

        platform_conditionals = []
        # Sorted so that the generated condition doesn't depend on the
        # iteration order of the set, which may differ between runs.
        for platform in sorted(p for p in rule_platforms if p is not None):
            if platform == "machine":
                # Based on check installed_env_is_a_container
                platform_conditionals.append('[ ! -f /.dockerenv -a ! -f /run/.containerenv ]')
                continue

            # Assume any other platform is a Package CPE

            # Some package names are different from the platform names
            overrides = self.local_env_yaml["platform_package_overrides"]
            if platform in overrides:
                platform = overrides.get(platform)

            # Adjust package check command according to the pkg_manager
            pkg_manager = self.local_env_yaml["pkg_manager"]
            pkg_check_command = PKG_MANAGER_TO_PACKAGE_CHECK_COMMAND[pkg_manager]
            platform_conditionals.append(pkg_check_command.format(platform))

        if not platform_conditionals:
            return result

        wrapped_fix_text = ["# Remediation is applicable only in certain platforms"]

        all_conditions = " && ".join(platform_conditionals)
        wrapped_fix_text.append("if {0}; then".format(all_conditions))

        # Avoid adding extra blank line
        if not result.contents.startswith("\n"):
            wrapped_fix_text.append("")

        # It is possible to indent the original body of the remediation with textwrap.indent(),
        # however, it is not supported by python2, and there is a risk of breaking remediations
        # For example, remediations with a here-doc block could be affected.
        wrapped_fix_text.append("{0}".format(result.contents))
        wrapped_fix_text.append("")
        wrapped_fix_text.append("else")
        wrapped_fix_text.append("    >&2 echo 'Remediation is not applicable, nothing was done'")
        wrapped_fix_text.append("fi")

        return result._replace(contents="\n".join(wrapped_fix_text))
322
323 2
class AnsibleRemediation(Remediation):
    """
    Ansible remediation whose tasks are enriched with tags and 'when'
    conditions derived from the associated rule.
    """

    def __init__(self, file_path):
        super(AnsibleRemediation, self).__init__(
            file_path, "ansible")

        # Parsed YAML of the snippet, set by parse_from_file_with_jinja()
        # once a rule is associated.
        self.body = None

    def parse_from_file_with_jinja(self, env_yaml):
        """
        Parse the snippet and, if a rule is associated, update its tasks.

        Without an associated rule the parsed remediation is returned as is.
        Otherwise the contents are loaded as YAML, updated by update(), dumped
        back to text, and stored in self.body / self.metadata.
        """
        self.local_env_yaml.update(env_yaml)
        result = super(AnsibleRemediation, self).parse_from_file_with_jinja(self.local_env_yaml)

        if not self.associated_rule:
            return result

        parsed = ssg.yaml.ordered_load(result.contents)

        self.update(parsed, result.config)

        updated_yaml_text = ssg.yaml.ordered_dump(
            parsed, None, default_flow_style=False)
        result = result._replace(contents=updated_yaml_text)

        self.body = parsed
        self.metadata = result.config

        return result

    def update_tags_from_config(self, to_update, config):
        """Add tags derived from the remediation config to the task to_update.

        The resulting tag list is stored sorted under to_update["tags"].
        """
        tags = to_update.get("tags", [])
        for key in ("strategy", "complexity", "disruption"):
            if key in config:
                tags.append("{0}_{1}".format(config[key], key))
        if "reboot" in config:
            if config["reboot"] == "true":
                reboot_tag = "reboot_required"
            else:
                reboot_tag = "no_reboot_needed"
            tags.append(reboot_tag)
        to_update["tags"] = sorted(tags)

    def update_tags_from_rule(self, to_update):
        """Add the rule id, severity, CCE and reference tags to to_update.

        The resulting tag list is stored sorted under to_update["tags"].
        Raises RuntimeError when no rule is associated.
        """
        if not self.associated_rule:
            raise RuntimeError("The Ansible snippet has no rule loaded.")

        tags = to_update.get("tags", [])
        tags.insert(0, "{0}_severity".format(self.associated_rule.severity))
        tags.insert(0, self.associated_rule.id_)

        cce_num = self._get_cce()
        if cce_num:
            tags.append("{0}".format(cce_num))

        refs = self.get_references()
        tags.extend(refs)
        to_update["tags"] = sorted(tags)

    def _get_cce(self):
        """Return the rule's "cce" identifier, or None if it has none."""
        return self.associated_rule.identifiers.get("cce", None)

    def get_references(self):
        """Return "<prefix>-<value>" strings for the rule's references.

        Covers every reference class listed in constants.REF_PREFIX_MAP.
        Raises RuntimeError when no rule is associated.
        """
        if not self.associated_rule:
            raise RuntimeError("The Ansible snippet has no rule loaded.")

        result = []
        for ref_class, prefix in constants.REF_PREFIX_MAP.items():
            refs = self._get_rule_reference(ref_class)
            result.extend(["{prefix}-{value}".format(prefix=prefix, value=v) for v in refs])
        return result

    def _get_rule_reference(self, ref_class):
        """Return the rule's comma-separated ref_class references as a list."""
        refs = self.associated_rule.references.get(ref_class, "")
        if refs:
            return refs.split(",")
        return []

    def inject_package_facts_task(self, parsed_snippet):
        """ Injects a package_facts task only if
            the snippet has a task with a when clause with ansible_facts.packages,
            and the snippet doesn't already have a package_facts task
        """
        has_package_facts_task = False
        has_ansible_facts_packages_clause = False

        for p_task in parsed_snippet:
            # We are only interested in the OrderedDicts, which represent Ansible tasks
            if not isinstance(p_task, dict):
                continue

            if "package_facts" in p_task:
                has_package_facts_task = True

            # When clause of the task can be string or a list, lets normalize to list
            task_when = p_task.get("when", "")
            if isinstance(task_when, str):
                task_when = [task_when]
            for when in task_when:
                if "ansible_facts.packages" in when:
                    has_ansible_facts_packages_clause = True

        if has_ansible_facts_packages_clause and not has_package_facts_task:
            # Built from a list of pairs: key order of a dict literal is not
            # guaranteed on older Python versions.
            facts_task = OrderedDict([('name', 'Gather the package facts'),
                                      ('package_facts', {'manager': 'auto'})])
            parsed_snippet.insert(0, facts_task)

    def update_when_from_rule(self, to_update):
        """Extend the 'when' clause of to_update with the rule's platforms.

        The 'when' key is removed again when the resulting clause is empty.
        """
        additional_when = []

        # There can be repeated inherited platforms and rule platforms
        rule_platforms = set(self.associated_rule.inherited_platforms)
        rule_platforms.add(self.associated_rule.platform)

        # Sorted so that the generated conditions don't depend on the
        # iteration order of the set, which may differ between runs.
        for platform in sorted(p for p in rule_platforms if p is not None):
            if platform == "machine":
                additional_when.append('ansible_virtualization_type not in ["docker", "lxc", "openvz"]')
                continue

            # Assume any other platform is a Package CPE

            # It doesn't make sense to add a conditional on the task that
            # gathers data for the conditional
            if "package_facts" in to_update:
                continue

            overrides = self.local_env_yaml["platform_package_overrides"]
            if platform in overrides:
                platform = overrides.get(platform)

            additional_when.append('"' + platform + '" in ansible_facts.packages')
            # After adding the conditional, we need to make sure package_facts are collected.
            # This is done via inject_package_facts_task()

        to_update.setdefault("when", "")
        # NOTE(review): presumably merges the existing clause with the
        # additional conditions - confirm in ssg.yaml.
        new_when = ssg.yaml.update_yaml_list_or_string(to_update["when"], additional_when)
        if not new_when:
            to_update.pop("when")
        else:
            to_update["when"] = new_when

    def update(self, parsed, config):
        """Update all tasks of the parsed snippet in place."""
        # We split the remediation update in three steps

        # 1. Update the when clause
        for p in parsed:
            if not isinstance(p, dict):
                continue
            self.update_when_from_rule(p)

        # 2. Inject any extra task necessary
        self.inject_package_facts_task(parsed)

        # 3. Add tags to all tasks, including the ones we have injected
        for p in parsed:
            if not isinstance(p, dict):
                continue
            self.update_tags_from_config(p, config)
            self.update_tags_from_rule(p)

    @classmethod
    def from_snippet_and_rule(cls, snippet_fname, rule_fname):
        """Create a remediation from a snippet file and its rule file.

        Returns None when either file doesn't exist or when the rule is
        documentation-incomplete.
        """
        if not (os.path.isfile(snippet_fname) and os.path.isfile(rule_fname)):
            return None

        result = cls(snippet_fname)
        try:
            result.load_rule_from(rule_fname)
        except ssg.yaml.DocumentationNotComplete:
            # Happens on non-debug build when a rule is "documentation-incomplete"
            return None
        return result
492
493
494 2
class AnacondaRemediation(Remediation):
    """Remediation of the "anaconda" type."""

    def __init__(self, file_path):
        super(AnacondaRemediation, self).__init__(file_path, "anaconda")
498
499
500 2
class PuppetRemediation(Remediation):
    """Remediation of the "puppet" type."""

    def __init__(self, file_path):
        super(PuppetRemediation, self).__init__(file_path, "puppet")
504
505
506 2
class IgnitionRemediation(Remediation):
    """Remediation of the "ignition" type."""

    def __init__(self, file_path):
        super(IgnitionRemediation, self).__init__(file_path, "ignition")
510
511
512 2
class KubernetesRemediation(Remediation):
    """Remediation of the "kubernetes" type."""

    def __init__(self, file_path):
        super(KubernetesRemediation, self).__init__(file_path, "kubernetes")
516
517
518 2
# Remediation class implementing each supported remediation type.
REMEDIATION_TO_CLASS = {
    'anaconda': AnacondaRemediation,
    'ansible': AnsibleRemediation,
    'bash': BashRemediation,
    'puppet': PuppetRemediation,
    'ignition': IgnitionRemediation,
    'kubernetes': KubernetesRemediation,
}
526
527
528 2
def write_fixes_to_xml(remediation_type, build_dir, output_path, fixes):
    """
    Builds a fix-content XML tree from the contents of fixes
    and writes it to output_path.

    fixes maps rule ids to (contents, config) pairs. Each pair becomes a
    <fix> element inside the fix-group of remediation_type; the config keys
    listed in REMEDIATION_ELM_KEYS that have a value become its attributes.
    build_dir is only used for bash, to look up the remediation functions.
    """
    fixcontent = ElementTree.Element(
        "fix-content", system="urn:xccdf:fix:script:sh",
        xmlns="http://checklists.nist.gov/xccdf/1.1")
    fixgroup = get_fixgroup_for_type(fixcontent, remediation_type)

    # (bash_)remediation_functions are really only used for bash
    remediation_functions = None
    if remediation_type == "bash":
        remediation_functions = get_available_functions(build_dir)

    for fix_name, fix in fixes.items():
        fix_contents, config = fix

        fix_elm = ElementTree.SubElement(fixgroup, "fix")
        fix_elm.set("rule", fix_name)

        for key in REMEDIATION_ELM_KEYS:
            if config[key]:
                fix_elm.set(key, config[key])

        fix_elm.text = fix_contents + "\n"

        # Expand shell variables and remediation functions
        # into corresponding XCCDF <sub> elements
        expand_xccdf_subs(fix_elm, remediation_type, remediation_functions)

    tree = ElementTree.ElementTree(fixcontent)
    tree.write(output_path)
562
563
564 2
def write_fixes_to_dir(fixes, remediation_type, output_dir):
    """
    Writes fixes as files to output_dir, each fix as a separate file

    fixes maps fix names to (contents, config) pairs. Every file is named
    after the fix with the extension of remediation_type, and starts with one
    '# key = value' line per config item followed by the contents.
    output_dir is created when it doesn't exist.

    Raises ValueError for an unknown remediation type.
    """
    try:
        extension = REMEDIATION_TO_EXT_MAP[remediation_type]
    except KeyError:
        raise ValueError("Unknown remediation type %s." % remediation_type)

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    for fix_name, fix in fixes.items():
        fix_contents, config = fix
        fix_path = os.path.join(output_dir, fix_name + extension)
        with open(fix_path, "w") as f:
            for k, v in config.items():
                f.write("# %s = %s\n" % (k, v))
            f.write(fix_contents)
582
583
584 2
def get_rule_dir_remediations(dir_path, remediation_type, product=None):
    """
    Gets a list of remediations of type remediation_type contained in a
    rule directory. If product is None, returns all such remediations.
    If product is not None, returns applicable remediations in order of
    priority:

        {{{ product }}}.ext -> shared.ext

    Only returns remediations which exist.

    Returns an empty list when dir_path is not a rule directory or has no
    subdirectory named after remediation_type. Raises KeyError for an unknown
    remediation type once dir_path is known to be a rule directory.
    """
    if not rules.is_rule_dir(dir_path):
        return []

    remediations_dir = os.path.join(dir_path, remediation_type)
    has_remediations_dir = os.path.isdir(remediations_dir)
    # Looked up before the directory check so that an unknown remediation
    # type is reported even when the directory doesn't exist.
    ext = REMEDIATION_TO_EXT_MAP[remediation_type]
    if not has_remediations_dir:
        return []

    results = []
    for remediation_file in os.listdir(remediations_dir):
        file_name, file_ext = os.path.splitext(remediation_file)
        remediation_path = os.path.join(remediations_dir, remediation_file)

        if file_ext != ext or not rules.applies_to_product(file_name, product):
            continue

        # The shared remediation has the lowest priority, so it goes last;
        # any other applicable remediation is put in front of it.
        if file_name == 'shared':
            results.append(remediation_path)
        else:
            results.insert(0, remediation_path)

    return results
617
618
619 2
def expand_xccdf_subs(fix, remediation_type, remediation_functions):
620
    """For those remediation scripts utilizing some of the internal SCAP
621
    Security Guide remediation functions expand the selected shell variables
622
    and remediation functions calls with <xccdf:sub> element
623
624
    This routine translates any instance of the 'populate' function call in
625
    the form of:
626
627
            populate variable_name
628
629
    into
630
631
            variable_name="<sub idref="variable_name"/>"
632
633
    Also transforms any instance of the 'ansible-populate' function call in the
634
    form of:
635
            (ansible-populate variable_name)
636
    into
637
638
            <sub idref="variable_name"/>
639
640
    Also transforms any instance of some other known remediation function (e.g.
641
    'replace_or_append' etc.) from the form of:
642
643
            function_name "arg1" "arg2" ... "argN"
644
645
    into:
646
647
            <sub idref="function_function_name"/>
648
            function_name "arg1" "arg2" ... "argN"
649
    """
650
651
    if remediation_type == "ignition":
652
        return
653
    if remediation_type == "kubernetes":
654
        return
655
    elif remediation_type == "ansible":
656
        fix_text = fix.text
657
658
        if "(ansible-populate " in fix_text:
659
            raise RuntimeError(
660
                "(ansible-populate VAR) has been deprecated. Please use "
661
                "(xccdf-var VAR) instead. Keep in mind that the latter will "
662
                "make an ansible variable out of XCCDF Value as opposed to "
663
                "substituting directly."
664
            )
665
666
        # If you change this string make sure it still matches the pattern
667
        # defined in OpenSCAP. Otherwise you break variable handling in
668
        # 'oscap xccdf generate fix' and the variables won't be customizable!
669
        # https://github.com/OpenSCAP/openscap/blob/1.2.17/src/XCCDF_POLICY/xccdf_policy_remediate.c#L588
670
        #   const char *pattern =
671
        #     "- name: XCCDF Value [^ ]+ # promote to variable\n  set_fact:\n"
672
        #     "    ([^:]+): (.+)\n  tags:\n    - always\n";
673
        # We use !!str typecast to prevent treating values as different types
674
        # eg. yes as a bool or 077 as an octal number
675
        fix_text = re.sub(
676
            r"- \(xccdf-var\s+(\S+)\)",
677
            r"- name: XCCDF Value \1 # promote to variable\n"
678
            r"  set_fact:\n"
679
            r"    \1: !!str (ansible-populate \1)\n"
680
            r"  tags:\n"
681
            r"    - always",
682
            fix_text
683
        )
684
685
        pattern = r'\(ansible-populate\s*(\S+)\)'
686
687
        # we will get list what looks like
688
        # [text, varname, text, varname, ..., text]
689
        parts = re.split(pattern, fix_text)
690
691
        fix.text = parts[0]  # add first "text"
692
        for index in range(1, len(parts), 2):
693
            varname = parts[index]
694
            text_between_vars = parts[index + 1]
695
696
            # we cannot combine elements and text easily
697
            # so text is in ".tail" of element
698
            xccdfvarsub = ElementTree.SubElement(fix, "sub", idref=varname)
699
            xccdfvarsub.tail = text_between_vars
700
        return
701
702
    elif remediation_type == "puppet":
703
        pattern = r'\(puppet-populate\s*(\S+)\)'
704
705
        # we will get list what looks like
706
        # [text, varname, text, varname, ..., text]
707
        parts = re.split(pattern, fix.text)
708
709
        fix.text = parts[0]  # add first "text"
710
        for index in range(1, len(parts), 2):
711
            varname = parts[index]
712
            text_between_vars = parts[index + 1]
713
714
            # we cannot combine elements and text easily
715
            # so text is in ".tail" of element
716
            xccdfvarsub = ElementTree.SubElement(fix, "sub", idref=varname)
717
            xccdfvarsub.tail = text_between_vars
718
        return
719
720
    elif remediation_type == "anaconda":
721
        pattern = r'\(anaconda-populate\s*(\S+)\)'
722
723
        # we will get list what looks like
724
        # [text, varname, text, varname, ..., text]
725
        parts = re.split(pattern, fix.text)
726
727
        fix.text = parts[0]  # add first "text"
728
        for index in range(1, len(parts), 2):
729
            varname = parts[index]
730
            text_between_vars = parts[index + 1]
731
732
            # we cannot combine elements and text easily
733
            # so text is in ".tail" of element
734
            xccdfvarsub = ElementTree.SubElement(fix, "sub", idref=varname)
735
            xccdfvarsub.tail = text_between_vars
736
        return
737
738
    elif remediation_type == "bash":
739
        # This remediation script doesn't utilize internal remediation functions
740
        # Skip it without any further processing
741
        if 'remediation_functions' not in fix.text:
742
            return
743
744
        # This remediation script utilizes some of internal remediation functions
745
        # Expand shell variables and remediation functions calls with <xccdf:sub>
746
        # elements
747
        pattern = r'\n+(\s*(?:' + r'|'.join(remediation_functions) + r')[^\n]*)\n'
748
        patcomp = re.compile(pattern, re.DOTALL)
749
        fixparts = re.split(patcomp, fix.text)
750
        if fixparts[0] is not None:
751
            # Split the portion of fix.text at the string remediation_functions,
752
            # and remove preceeding comment whenever it is there.
753
            # * head        holds part of the fix.text before
754
            #               remediation_functions string
755
            # * tail        holds part of the fix.text after the
756
            #               remediation_functions string
757
            try:
758
                rfpattern = r'((?:# Include source function library\.\n)?.*remediation_functions)'
759
                rfpatcomp = re.compile(rfpattern)
760
                head, _, tail = re.split(rfpatcomp, fixparts[0], maxsplit=1)
761
            except ValueError:
762
                sys.stderr.write("Processing fix.text for: %s rule\n"
763
                                 % fix.get('rule'))
764
                sys.stderr.write("Unable to extract part of the fix.text "
765
                                 "after inclusion of remediation functions."
766
                                 " Aborting..\n")
767
                sys.exit(1)
768
            # If the 'head' is not empty, make it new fix.text.
769
            # Otherwise use ''
770
            fix.text = head if head is not None else ''
0 ignored issues
show
introduced by
The variable head does not seem to be defined for all execution paths.
Loading history...
771
            fix.text += tail if tail is not None else ''
0 ignored issues
show
introduced by
The variable tail does not seem to be defined for all execution paths.
Loading history...
772
            # Drop the first element of 'fixparts' since it has been processed
773
            fixparts.pop(0)
774
            # Perform sanity check on new 'fixparts' list content (to continue
775
            # successfully 'fixparts' has to contain even count of elements)
776
            if len(fixparts) % 2 != 0:
777
                sys.stderr.write("Error performing XCCDF expansion on "
778
                                 "remediation script: %s\n"
779
                                 % fix.get("rule"))
780
                sys.stderr.write("Invalid count of elements. Exiting!\n")
781
                sys.exit(1)
782
            # Process remaining 'fixparts' elements in pairs
783
            # First pair element is remediation function to be XCCDF expanded
784
            # Second pair element (if not empty) is the portion of the original
785
            # fix text to be used in the newly added subelement's tail
786
            for idx in range(0, len(fixparts), 2):
787
                # We previously removed enclosing newlines when creating
788
                # fixparts list. Add them back and reuse the above 'pattern'
789
                fixparts[idx] = "\n%s\n" % fixparts[idx]
790
                # Sanity check (verify the first field truly contains call of
791
                # some of the remediation functions)
792
                if re.match(pattern, fixparts[idx], re.DOTALL) is not None:
793
                    # This chunk contains call of 'populate' function
794
                    if "populate" in fixparts[idx]:
795
                        varname, fixtextcontrib = get_populate_replacement(remediation_type,
796
                                                                           fixparts[idx])
797
                        # Define new XCCDF <sub> element for the variable
798
                        xccdfvarsub = ElementTree.Element("sub", idref=varname)
799
800
                        # If this is first sub element,
801
                        # the textcontribution needs to go to fix text
802
                        # otherwise, append to last subelement
803
                        nfixchildren = len(list(fix))
804
                        if nfixchildren == 0:
805
                            fix.text += fixtextcontrib
806
                        else:
807
                            previouselem = fix[nfixchildren-1]
808
                            previouselem.tail += fixtextcontrib
809
810
                        # If second pair element is not empty, append it as
811
                        # tail for the subelement (prefixed with closing '"')
812
                        if fixparts[idx + 1] is not None:
813
                            xccdfvarsub.tail = '"' + '\n' + fixparts[idx + 1]
814
                        # Otherwise append just enclosing '"'
815
                        else:
816
                            xccdfvarsub.tail = '"' + '\n'
817
                        # Append the new subelement to the fix element
818
                        fix.append(xccdfvarsub)
819
                    # This chunk contains call of other remediation function
820
                    else:
821
                        # Extract remediation function name
822
                        funcname = re.search(r'\n\s*(\S+)(| .*)\n',
823
                                             fixparts[idx],
824
                                             re.DOTALL).group(1)
825
                        # Define new XCCDF <sub> element for the function
826
                        xccdffuncsub = ElementTree.Element(
827
                            "sub", idref='function_%s' % funcname)
828
                        # Append original function call into tail of the
829
                        # subelement
830
                        xccdffuncsub.tail = fixparts[idx]
831
                        # If the second element of the pair is not empty,
832
                        # append it to the tail of the subelement too
833
                        if fixparts[idx + 1] is not None:
834
                            xccdffuncsub.tail += fixparts[idx + 1]
835
                        # Append the new subelement to the fix element
836
                        fix.append(xccdffuncsub)
837
                        # Ensure the newly added <xccdf:sub> element for the
838
                        # function will be always inserted at newline
839
                        # If xccdffuncsub is the first <xccdf:sub> element
840
                        # being added as child of <fix> and fix.text doesn't
841
                        # end up with newline character, append the newline
842
                        # to the fix.text
843
                        if list(fix).index(xccdffuncsub) == 0:
844
                            if re.search(r'.*\n$', fix.text) is None:
845
                                fix.text += '\n'
846
                        # If xccdffuncsub isn't the first child (first
847
                        # <xccdf:sub> being added), and tail of previous
848
                        # child doesn't end up with newline, append the newline
849
                        # to the tail of previous child
850
                        else:
851
                            previouselem = fix[list(fix).index(xccdffuncsub) - 1]
852
                            if re.search(r'.*\n$', previouselem.tail) is None:
853
                                previouselem.tail += '\n'
854
855
        # Perform a sanity check if all known remediation function calls have been
856
        # properly XCCDF substituted. Exit with failure if any was not.
857
858
        # First concat output form of modified fix text (including text appended
859
        # to all children of the fix)
860
        modfix = [fix.text]
861
        for child in list(fix):
862
            if child is not None and child.text is not None:
863
                modfix.append(child.text)
864
        modfixtext = "".join(modfix)
865
        # Don't perform sanity check at bash comments because they are not substituted
866
        modfixtext = re.sub(r'#.*', '', modfixtext)
867
        for func in remediation_functions:
868
            # Then define expected XCCDF sub element form for this function
869
            funcxccdfsub = "<sub idref=\"function_%s\"" % func
870
            # Finally perform the sanity check -- if function was properly XCCDF
871
            # substituted both the original function call and XCCDF <sub> element
872
            # for that function need to be present in the modified text of the fix
873
            # Otherwise something went wrong, thus exit with failure
874
            if func in modfixtext and funcxccdfsub not in modfixtext:
875
                sys.stderr.write("Error performing XCCDF <sub> substitution "
876
                                 "for function %s in %s fix. Exiting...\n"
877
                                 % (func, fix.get("rule")))
878
                sys.exit(1)
879
    else:
880
        sys.stderr.write("Unknown remediation type '%s'\n" % (remediation_type))
881
        sys.exit(1)
882