Passed
Push — master ( 8347ae...d9e1ba )
by Jan
07:11 queued 05:16
created

ssg.build_remediations.get_rule_dir_remediations()   B

Complexity

Conditions 7

Size

Total Lines 33
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 14
CRAP Score 7.2694

Importance

Changes 0
Metric Value
cc 7
eloc 17
nop 3
dl 0
loc 33
ccs 14
cts 17
cp 0.8235
crap 7.2694
rs 8
c 0
b 0
f 0
1 2
from __future__ import absolute_import
2 2
from __future__ import print_function
3
4 2
import sys
5 2
import os
6 2
import os.path
7 2
import re
8 2
import codecs
9 2
from collections import defaultdict, namedtuple, OrderedDict
10
11 2
import ssg.yaml
12 2
from . import build_yaml
13 2
from . import rules
14 2
from . import utils
15
16 2
from . import constants
17 2
from .jinja import process_file_with_macros as jinja_process_file
18
19 2
from .xml import ElementTree
20
21 2
# File extension expected for each known remediation type.
REMEDIATION_TO_EXT_MAP = {
    'anaconda': '.anaconda',
    'ansible': '.yml',
    'bash': '.sh',
    'puppet': '.pp',
    'ignition': '.yml',
    'kubernetes': '.yml',
}

# Lines starting with this marker are dropped when a remediation is parsed.
FILE_GENERATED_HASH_COMMENT = '# THIS FILE IS GENERATED'

# Keys accepted in the "# key = value" header lines of a remediation file.
REMEDIATION_CONFIG_KEYS = [
    'complexity',
    'disruption',
    'platform',
    'reboot',
    'strategy',
]

# Configuration keys that are copied as attributes onto the <fix> element.
REMEDIATION_ELM_KEYS = [
    'complexity',
    'disruption',
    'reboot',
    'strategy',
]
35
36
37 2
def get_available_functions(build_dir):
    """Return the names of the known bash remediation functions.

    Reads "bash-remediation-functions.xml" from build_dir and collects, for
    every <Value> element whose id attribute starts with "function_", the
    part of the id that follows that prefix.

    Writes an error to stderr and exits with status 1 when build_dir is None
    or not a directory, or when the XML file does not exist.
    """

    if build_dir is None or not os.path.isdir(build_dir):
        # Terminate the message with a newline, like every other error
        # message in this module, so it does not run into later output.
        sys.stderr.write("Expected '%s' to be the build directory. It doesn't "
                         "exist or is not a directory.\n" % (build_dir))
        sys.exit(1)

    # Construct the final path of XML file with remediation functions
    xmlfilepath = os.path.join(build_dir, "bash-remediation-functions.xml")

    if not os.path.isfile(xmlfilepath):
        sys.stderr.write("Expected '%s' to contain the remediation functions. "
                         "The file was not found!\n" % (xmlfilepath))
        sys.exit(1)

    with codecs.open(xmlfilepath, "r", encoding="utf-8") as xmlfile:
        filestring = xmlfile.read()

    # This regex looks implementation dependent but we can rely on the
    # element attributes being present. The whole file is scanned at once.
    # We can't rely on ElementTree sorting XML attrs in any way since
    # Python 3.7, hence the plain-text search.
    return re.findall(
        r'<Value[^>]+id=\"function_(\S+)\"',
        filestring, re.DOTALL
    )
69
70
71 2
def get_fixgroup_for_type(fixcontent, remediation_type):
    """
    Create and return a "fix-group" subelement of fixcontent for the given
    remediation type.

    Writes an error to stderr and exits with status 1 if the remediation
    type is not one of the known types.
    """
    # (remediation type, value of the "system" attribute of its fix-group)
    known_systems = (
        ('anaconda', "urn:redhat:anaconda:pre"),
        ('ansible', "urn:xccdf:fix:script:ansible"),
        ('bash', "urn:xccdf:fix:script:sh"),
        ('puppet', "urn:xccdf:fix:script:puppet"),
        ('ignition', "urn:xccdf:fix:script:ignition"),
        ('kubernetes', "urn:xccdf:fix:script:kubernetes"),
    )

    for known_type, system_urn in known_systems:
        if remediation_type == known_type:
            return ElementTree.SubElement(
                fixcontent, "fix-group", id=known_type,
                system=system_urn,
                xmlns="http://checklists.nist.gov/xccdf/1.1")

    sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
                     % (remediation_type))
    sys.exit(1)
116
117
118 2
def is_supported_filename(remediation_type, filename):
    """
    Tell whether filename ends with the extension registered for
    remediation_type in REMEDIATION_TO_EXT_MAP.

    Writes an error to stderr and exits with status 1 when remediation_type
    is not a key of that map.
    """
    if remediation_type not in REMEDIATION_TO_EXT_MAP:
        sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
                         % (remediation_type))
        sys.exit(1)

    expected_ext = REMEDIATION_TO_EXT_MAP[remediation_type]
    return filename.endswith(expected_ext)
130
131
132 2
def get_populate_replacement(remediation_type, text):
    """
    Return a (varname, fixtextcontribution) tuple for a 'populate' call.

    varname is the word following "populate" in text; fixtextcontribution is
    the opening part of the shell assignment for that variable.
    Only the 'bash' remediation type is handled; any other type writes an
    error to stderr and exits with status 1.
    """
    if remediation_type != 'bash':
        sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
                         % (remediation_type))
        sys.exit(1)

    # Extract variable name
    populate_call = re.search(r'\npopulate (\S+)\n', text, re.DOTALL)
    varname = populate_call.group(1)

    # Fix text part to contribute to the main fix text
    return (varname, '\n%s="' % varname)
148
149
150 2
def split_remediation_content_and_metadata(fix_file):
    """
    Split the text of a remediation into its body and its configuration.

    Lines starting with FILE_GENERATED_HASH_COMMENT are dropped. A line that
    starts with '#', contains exactly one '=' and whose key is listed in
    REMEDIATION_CONFIG_KEYS is stored in the configuration; every other line
    is kept as part of the body.

    Returns a namedtuple with fields 'contents' (the body lines joined by
    newlines) and 'config' (a defaultdict yielding None for missing keys).
    """
    config = defaultdict(lambda: None)
    kept_lines = []

    for line in fix_file.splitlines():
        if line.startswith(FILE_GENERATED_HASH_COMMENT):
            continue

        if line.startswith('#') and line.count('=') == 1:
            raw_key, _, raw_value = line.strip('#').partition('=')
            key = raw_key.strip()
            if key in REMEDIATION_CONFIG_KEYS:
                config[key] = raw_value.strip()
                continue

        # Not a recognised config item: the line belongs to the body. This
        # includes '#' lines with an equals sign whose key is unknown.
        kept_lines.append(line)

    remediation = namedtuple('remediation', ['contents', 'config'])
    return remediation(contents="\n".join(kept_lines), config=config)
175
176
177 2
def parse_from_file_with_jinja(file_path, env_yaml):
    """
    Parse a remediation from a file after expanding its Jinja macros.

    env_yaml is the context handed to the Jinja processing. In practice, no
    remediations use jinja in the configuration, so for extracting only the
    configuration, env_yaml can be an arbitrary product.yml dictionary.

    If the logic of configuration parsing changes significantly, please also
    update ssg.fixes.parse_platform(...).
    """
    expanded_text = jinja_process_file(file_path, env_yaml)
    parsed = split_remediation_content_and_metadata(expanded_text)
    return parsed
190
191
192 2
def parse_from_file_without_jinja(file_path):
    """
    Parse a remediation from a file without processing Jinja macros.

    This function is useful in build phases in which all the Jinja macros
    are already resolved.
    """
    with open(file_path, "r") as fix_fd:
        raw_text = fix_fd.read()
    return split_remediation_content_and_metadata(raw_text)
201
202
203 2
class Remediation(object):
    """A remediation file of a given type, optionally tied to a rule."""

    def __init__(self, file_path, remediation_type):
        self.file_path = file_path
        self.remediation_type = remediation_type

        # Rule loaded through load_rule_from(); None until then.
        self.associated_rule = None

        # Extra Jinja context derived from the associated rule.
        self.local_env_yaml = {}

        # Missing metadata keys read as None.
        self.metadata = defaultdict(lambda: None)

    def load_rule_from(self, rule_path):
        """Load the rule YAML at rule_path and expose its data in local_env_yaml."""
        self.associated_rule = build_yaml.Rule.from_yaml(rule_path)
        self.expand_env_yaml_from_rule()

    def expand_env_yaml_from_rule(self):
        """Copy the rule title and id into local_env_yaml, if a rule is set."""
        rule = self.associated_rule
        if rule:
            self.local_env_yaml["rule_title"] = rule.title
            self.local_env_yaml["rule_id"] = rule.id_

    def parse_from_file_with_jinja(self, env_yaml):
        """Parse this remediation's file with the module-level Jinja parser."""
        return parse_from_file_with_jinja(self.file_path, env_yaml)
226
227
228 2
def process(remediation, env_yaml, fixes, rule_id):
    """
    Parse a remediation and register it in fixes under rule_id when it
    applies to the current product.

    Nothing is done (and None is returned) when the file extension is not
    the one expected for the remediation type. Otherwise the parsed result
    is returned, whether or not it was added to fixes.

    The 'platform' configuration item is mandatory: RuntimeError is raised
    when it is missing, and ValueError when any of its comma-separated
    entries carries surrounding whitespace.
    """
    if not is_supported_filename(remediation.remediation_type, remediation.file_path):
        return None

    result = remediation.parse_from_file_with_jinja(env_yaml)
    platforms = result.config['platform']

    if not platforms:
        raise RuntimeError(
            "The '%s' remediation script does not contain the "
            "platform identifier!" % (remediation.file_path))

    has_padded_entry = any(
        entry.strip() != entry for entry in platforms.split(","))
    if has_padded_entry:
        msg = (
            "Comma-separated '{platform}' platforms "
            "in '{remediation_file}' contains whitespace."
            .format(platform=platforms, remediation_file=remediation.file_path))
        raise ValueError(msg)

    if utils.is_applicable_for_product(platforms, env_yaml["product"]):
        fixes[rule_id] = result

    return result
259
260
261 2
class BashRemediation(Remediation):
    """Remediation whose type is "bash"."""

    def __init__(self, file_path):
        super(BashRemediation, self).__init__(
            file_path, "bash")
264
265
266 2
class AnsibleRemediation(Remediation):
    """Remediation whose type is "ansible".

    When a rule has been associated with the snippet, parsing also rewrites
    the snippet's tasks: "when" conditions derived from the rule platforms
    are added, a package_facts task is injected if needed, and tags derived
    from the remediation config and from the rule are attached.
    """

    def __init__(self, file_path):
        super(AnsibleRemediation, self).__init__(
            file_path, "ansible")

        # Parsed snippet as returned by ssg.yaml.ordered_load(); stays None
        # until parse_from_file_with_jinja() runs with an associated rule.
        self.body = None

    def parse_from_file_with_jinja(self, env_yaml):
        """Parse the snippet and, if a rule is associated, enrich its tasks.

        env_yaml is merged into local_env_yaml before the Jinja expansion.
        Without an associated rule the plain parse result is returned.
        Otherwise the returned result carries the re-dumped, updated YAML as
        its contents, and self.body / self.metadata are set accordingly.
        """
        self.local_env_yaml.update(env_yaml)
        result = super(AnsibleRemediation, self).parse_from_file_with_jinja(
            self.local_env_yaml)

        if not self.associated_rule:
            return result

        parsed = ssg.yaml.ordered_load(result.contents)

        self.update(parsed, result.config)

        updated_yaml_text = ssg.yaml.ordered_dump(
            parsed, None, default_flow_style=False)
        result = result._replace(contents=updated_yaml_text)

        self.body = parsed
        self.metadata = result.config

        return result

    def update_tags_from_config(self, to_update, config):
        """Add tags derived from the remediation config to one task.

        The task's "tags" entry is replaced by the sorted list of the
        existing tags plus the new ones.
        """
        tags = to_update.get("tags", [])
        for key in ("strategy", "complexity", "disruption"):
            if key in config:
                tags.append("{0}_{1}".format(config[key], key))
        if "reboot" in config:
            if config["reboot"] == "true":
                reboot_tag = "reboot_required"
            else:
                reboot_tag = "no_reboot_needed"
            tags.append(reboot_tag)
        to_update["tags"] = sorted(tags)

    def update_tags_from_rule(self, to_update):
        """Add tags derived from the associated rule to one task.

        Adds the rule id, the "<severity>_severity" tag, the CCE identifier
        (when the rule has one) and the prefixed references, then stores the
        sorted tag list back on the task.

        Raises RuntimeError when no rule is associated.
        """
        if not self.associated_rule:
            raise RuntimeError("The Ansible snippet has no rule loaded.")

        tags = to_update.get("tags", [])
        tags.insert(0, "{0}_severity".format(self.associated_rule.severity))
        tags.insert(0, self.associated_rule.id_)

        cce_num = self._get_cce()
        if cce_num:
            tags.append("{0}".format(cce_num))

        refs = self.get_references()
        tags.extend(refs)
        to_update["tags"] = sorted(tags)

    def _get_cce(self):
        """Return the "cce" identifier of the associated rule, or None."""
        return self.associated_rule.identifiers.get("cce", None)

    def get_references(self):
        """Return the rule references as "<prefix>-<value>" strings.

        One entry is produced per comma-separated reference value, for every
        reference class listed in constants.REF_PREFIX_MAP.

        Raises RuntimeError when no rule is associated.
        """
        if not self.associated_rule:
            raise RuntimeError("The Ansible snippet has no rule loaded.")

        result = []
        for ref_class, prefix in constants.REF_PREFIX_MAP.items():
            refs = self._get_rule_reference(ref_class)
            result.extend(["{prefix}-{value}".format(prefix=prefix, value=v) for v in refs])
        return result

    def _get_rule_reference(self, ref_class):
        """Return the rule's references of ref_class split on commas."""
        refs = self.associated_rule.references.get(ref_class, "")
        if refs:
            return refs.split(",")
        else:
            return []

    def inject_package_facts_task(self, parsed_snippet):
        """ Injects a package_facts task only if
            the snippet has a task with a when clause with ansible_facts.packages,
            and the snippet doesn't already have a package_facts task

            The task is inserted at the beginning of parsed_snippet.
        """
        has_package_facts_task = False
        has_ansible_facts_packages_clause = False

        for p_task in parsed_snippet:
            # We are only interested in the OrderedDicts, which represent Ansible tasks
            if not isinstance(p_task, dict):
                continue

            if "package_facts" in p_task:
                has_package_facts_task = True

            # When clause of the task can be string or a list, lets normalize to list
            task_when = p_task.get("when", "")
            if isinstance(task_when, str):
                task_when = [task_when]
            for when in task_when:
                if "ansible_facts.packages" in when:
                    has_ansible_facts_packages_clause = True

        if has_ansible_facts_packages_clause and not has_package_facts_task:
            # Build from a list of pairs: a plain dict literal does not
            # guarantee that 'name' comes first on interpreters older
            # than Python 3.7.
            facts_task = OrderedDict([
                ('name', 'Gather the package facts'),
                ('package_facts', {'manager': 'auto'}),
            ])
            parsed_snippet.insert(0, facts_task)

    def update_when_from_rule(self, to_update):
        """Extend the "when" clause of one task with rule platform conditions.

        The "machine" platform adds a virtualization-type condition; any
        other non-None platform adds a package-presence condition. If the
        resulting clause is empty, the "when" key is removed from the task.
        """
        additional_when = []

        # There can be repeated inherited platforms and rule platforms
        rule_platforms = set(self.associated_rule.inherited_platforms)
        rule_platforms.add(self.associated_rule.platform)

        for platform in rule_platforms:
            if platform == "machine":
                additional_when.append('ansible_virtualization_type not in ["docker", "lxc", "openvz"]')
            elif platform is not None:
                # Assume any other platform is a Package CPE

                # It doesn't make sense to add a conditional on the task that
                # gathers data for the conditional
                if "package_facts" in to_update:
                    continue

                additional_when.append('"' + platform + '" in ansible_facts.packages')
                # After adding the conditional, we need to make sure package_facts are collected.
                # This is done via inject_package_facts_task()

        to_update.setdefault("when", "")
        new_when = ssg.yaml.update_yaml_list_or_string(to_update["when"], additional_when)
        if not new_when:
            to_update.pop("when")
        else:
            to_update["when"] = new_when

    def update(self, parsed, config):
        """Apply all rule- and config-derived changes to the parsed snippet.

        Entries of parsed that are not dicts are left untouched.
        """
        # We split the remediation update in three steps

        # 1. Update the when clause
        for p in parsed:
            if not isinstance(p, dict):
                continue
            self.update_when_from_rule(p)

        # 2. Inject any extra task necessary
        self.inject_package_facts_task(parsed)

        # 3. Add tags to all tasks, including the ones we have injected
        for p in parsed:
            if not isinstance(p, dict):
                continue
            self.update_tags_from_config(p, config)
            self.update_tags_from_rule(p)

    @classmethod
    def from_snippet_and_rule(cls, snippet_fname, rule_fname):
        """Build a remediation from a snippet file and load its rule.

        Returns None when either file does not exist, or when loading the
        rule raises ssg.yaml.DocumentationNotComplete.
        """
        if os.path.isfile(snippet_fname) and os.path.isfile(rule_fname):
            result = cls(snippet_fname)
            try:
                result.load_rule_from(rule_fname)
            except ssg.yaml.DocumentationNotComplete:
                # Happens on non-debug build when a rule is "documentation-incomplete"
                return None
            return result
        return None
432
433
434 2
class AnacondaRemediation(Remediation):
    """Remediation whose type is "anaconda"."""

    def __init__(self, file_path):
        super(AnacondaRemediation, self).__init__(file_path, "anaconda")
438
439
440 2
class PuppetRemediation(Remediation):
    """Remediation whose type is "puppet"."""

    def __init__(self, file_path):
        super(PuppetRemediation, self).__init__(file_path, "puppet")
444
445
446 2
class IgnitionRemediation(Remediation):
    """Remediation whose type is "ignition"."""

    def __init__(self, file_path):
        super(IgnitionRemediation, self).__init__(file_path, "ignition")
450
451
452 2
class KubernetesRemediation(Remediation):
    """Remediation whose type is "kubernetes"."""

    def __init__(self, file_path):
        super(KubernetesRemediation, self).__init__(file_path, "kubernetes")
456
457
458 2
# Remediation class to instantiate for each remediation type name.
REMEDIATION_TO_CLASS = {
    'anaconda': AnacondaRemediation,
    'ansible': AnsibleRemediation,
    'bash': BashRemediation,
    'puppet': PuppetRemediation,
    'ignition': IgnitionRemediation,
    'kubernetes': KubernetesRemediation,
}
466
467
468 2
def write_fixes_to_xml(remediation_type, build_dir, output_path, fixes):
    """
    Build a fix-content XML tree out of fixes and write it to output_path.

    fixes maps a rule name to a (contents, config) pair. Each pair becomes a
    <fix> element inside the fix-group of remediation_type, with the truthy
    REMEDIATION_ELM_KEYS entries of config set as attributes.
    build_dir is only consulted for the "bash" type, to look up the known
    remediation functions.
    """
    fixcontent = ElementTree.Element(
        "fix-content", system="urn:xccdf:fix:script:sh",
        xmlns="http://checklists.nist.gov/xccdf/1.1")
    fixgroup = get_fixgroup_for_type(fixcontent, remediation_type)

    # (bash_)remediation_functions are really only used for bash
    remediation_functions = None
    if remediation_type == "bash":
        remediation_functions = get_available_functions(build_dir)

    for fix_name, (fix_contents, config) in fixes.items():
        fix_elm = ElementTree.SubElement(fixgroup, "fix")
        fix_elm.set("rule", fix_name)

        for key in REMEDIATION_ELM_KEYS:
            value = config[key]
            if value:
                fix_elm.set(key, value)

        fix_elm.text = fix_contents + "\n"

        # Expand shell variables and remediation functions
        # into corresponding XCCDF <sub> elements
        expand_xccdf_subs(fix_elm, remediation_type, remediation_functions)

    ElementTree.ElementTree(fixcontent).write(output_path)
502
503
504 2
def write_fixes_to_dir(fixes, remediation_type, output_dir):
    """
    Write every fix to its own file in output_dir.

    The file is named after the fix plus the extension registered for
    remediation_type. It starts with one "# key = value" line per config
    item, followed by the fix contents. output_dir is created if missing.

    Raises ValueError for an unknown remediation type.
    """
    try:
        extension = REMEDIATION_TO_EXT_MAP[remediation_type]
    except KeyError:
        raise ValueError("Unknown remediation type %s." % remediation_type)

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    for fix_name, (fix_contents, config) in fixes.items():
        target_path = os.path.join(output_dir, fix_name + extension)
        with open(target_path, "w") as out:
            out.writelines(
                "# %s = %s\n" % (key, value) for key, value in config.items())
            out.write(fix_contents)
522
523
524 2
def get_rule_dir_remediations(dir_path, remediation_type, product=None):
    """
    List the remediation files of remediation_type found in a rule directory.

    Only files carrying the extension registered for remediation_type, and
    whose base name passes rules.applies_to_product() for product, are
    returned. Files named 'shared' are placed after all the others, so the
    result is ordered by priority:

        {{{ product }}}.ext -> shared.ext

    Returns an empty list when dir_path is not a rule directory or has no
    subdirectory for remediation_type.
    """
    if not rules.is_rule_dir(dir_path):
        return []

    remediations_dir = os.path.join(dir_path, remediation_type)
    ext = REMEDIATION_TO_EXT_MAP[remediation_type]
    if not os.path.isdir(remediations_dir):
        return []

    product_specific = []
    shared = []
    for entry in os.listdir(remediations_dir):
        base_name, entry_ext = os.path.splitext(entry)
        if entry_ext != ext or not rules.applies_to_product(base_name, product):
            continue

        entry_path = os.path.join(remediations_dir, entry)
        if base_name == 'shared':
            shared.append(entry_path)
        else:
            # Later directory entries end up ahead of earlier ones.
            product_specific.insert(0, entry_path)

    return product_specific + shared
557
558
559 2
def expand_xccdf_subs(fix, remediation_type, remediation_functions):
560
    """For those remediation scripts utilizing some of the internal SCAP
561
    Security Guide remediation functions expand the selected shell variables
562
    and remediation functions calls with <xccdf:sub> element
563
564
    This routine translates any instance of the 'populate' function call in
565
    the form of:
566
567
            populate variable_name
568
569
    into
570
571
            variable_name="<sub idref="variable_name"/>"
572
573
    Also transforms any instance of the 'ansible-populate' function call in the
574
    form of:
575
            (ansible-populate variable_name)
576
    into
577
578
            <sub idref="variable_name"/>
579
580
    Also transforms any instance of some other known remediation function (e.g.
581
    'replace_or_append' etc.) from the form of:
582
583
            function_name "arg1" "arg2" ... "argN"
584
585
    into:
586
587
            <sub idref="function_function_name"/>
588
            function_name "arg1" "arg2" ... "argN"
589
    """
590
591
    if remediation_type == "ignition":
592
        return
593
    if remediation_type == "kubernetes":
594
        return
595
    elif remediation_type == "ansible":
596
        fix_text = fix.text
597
598
        if "(ansible-populate " in fix_text:
599
            raise RuntimeError(
600
                "(ansible-populate VAR) has been deprecated. Please use "
601
                "(xccdf-var VAR) instead. Keep in mind that the latter will "
602
                "make an ansible variable out of XCCDF Value as opposed to "
603
                "substituting directly."
604
            )
605
606
        # If you change this string make sure it still matches the pattern
607
        # defined in OpenSCAP. Otherwise you break variable handling in
608
        # 'oscap xccdf generate fix' and the variables won't be customizable!
609
        # https://github.com/OpenSCAP/openscap/blob/1.2.17/src/XCCDF_POLICY/xccdf_policy_remediate.c#L588
610
        #   const char *pattern =
611
        #     "- name: XCCDF Value [^ ]+ # promote to variable\n  set_fact:\n"
612
        #     "    ([^:]+): (.+)\n  tags:\n    - always\n";
613
        # We use !!str typecast to prevent treating values as different types
614
        # eg. yes as a bool or 077 as an octal number
615
        fix_text = re.sub(
616
            r"- \(xccdf-var\s+(\S+)\)",
617
            r"- name: XCCDF Value \1 # promote to variable\n"
618
            r"  set_fact:\n"
619
            r"    \1: !!str (ansible-populate \1)\n"
620
            r"  tags:\n"
621
            r"    - always",
622
            fix_text
623
        )
624
625
        pattern = r'\(ansible-populate\s*(\S+)\)'
626
627
        # we will get list what looks like
628
        # [text, varname, text, varname, ..., text]
629
        parts = re.split(pattern, fix_text)
630
631
        fix.text = parts[0]  # add first "text"
632
        for index in range(1, len(parts), 2):
633
            varname = parts[index]
634
            text_between_vars = parts[index + 1]
635
636
            # we cannot combine elements and text easily
637
            # so text is in ".tail" of element
638
            xccdfvarsub = ElementTree.SubElement(fix, "sub", idref=varname)
639
            xccdfvarsub.tail = text_between_vars
640
        return
641
642
    elif remediation_type == "puppet":
643
        pattern = r'\(puppet-populate\s*(\S+)\)'
644
645
        # we will get list what looks like
646
        # [text, varname, text, varname, ..., text]
647
        parts = re.split(pattern, fix.text)
648
649
        fix.text = parts[0]  # add first "text"
650
        for index in range(1, len(parts), 2):
651
            varname = parts[index]
652
            text_between_vars = parts[index + 1]
653
654
            # we cannot combine elements and text easily
655
            # so text is in ".tail" of element
656
            xccdfvarsub = ElementTree.SubElement(fix, "sub", idref=varname)
657
            xccdfvarsub.tail = text_between_vars
658
        return
659
660
    elif remediation_type == "anaconda":
661
        pattern = r'\(anaconda-populate\s*(\S+)\)'
662
663
        # we will get list what looks like
664
        # [text, varname, text, varname, ..., text]
665
        parts = re.split(pattern, fix.text)
666
667
        fix.text = parts[0]  # add first "text"
668
        for index in range(1, len(parts), 2):
669
            varname = parts[index]
670
            text_between_vars = parts[index + 1]
671
672
            # we cannot combine elements and text easily
673
            # so text is in ".tail" of element
674
            xccdfvarsub = ElementTree.SubElement(fix, "sub", idref=varname)
675
            xccdfvarsub.tail = text_between_vars
676
        return
677
678
    elif remediation_type == "bash":
679
        # This remediation script doesn't utilize internal remediation functions
680
        # Skip it without any further processing
681
        if 'remediation_functions' not in fix.text:
682
            return
683
684
        # This remediation script utilizes some of internal remediation functions
685
        # Expand shell variables and remediation functions calls with <xccdf:sub>
686
        # elements
687
        pattern = r'\n+(\s*(?:' + r'|'.join(remediation_functions) + r')[^\n]*)\n'
688
        patcomp = re.compile(pattern, re.DOTALL)
689
        fixparts = re.split(patcomp, fix.text)
690
        if fixparts[0] is not None:
691
            # Split the portion of fix.text from fix start to first call of
692
            # remediation function, keeping only the third part:
693
            # * tail        to hold part of the fix.text after inclusion,
694
            #               but before first call of remediation function
695
            try:
696
                rfpattern = '(.*remediation_functions)(.*)'
697
                rfpatcomp = re.compile(rfpattern, re.DOTALL)
698
                _, _, tail, _ = re.split(rfpatcomp, fixparts[0], maxsplit=2)
699
            except ValueError:
700
                sys.stderr.write("Processing fix.text for: %s rule\n"
701
                                 % fix.get('rule'))
702
                sys.stderr.write("Unable to extract part of the fix.text "
703
                                 "after inclusion of remediation functions."
704
                                 " Aborting..\n")
705
                sys.exit(1)
706
            # If the 'tail' is not empty, make it new fix.text.
707
            # Otherwise use ''
708
            fix.text = tail if tail is not None else ''
0 ignored issues
show
introduced by
The variable tail does not seem to be defined for all execution paths.
Loading history...
709
            # Drop the first element of 'fixparts' since it has been processed
710
            fixparts.pop(0)
711
            # Perform sanity check on new 'fixparts' list content (to continue
712
            # successfully 'fixparts' has to contain even count of elements)
713
            if len(fixparts) % 2 != 0:
714
                sys.stderr.write("Error performing XCCDF expansion on "
715
                                 "remediation script: %s\n"
716
                                 % fix.get("rule"))
717
                sys.stderr.write("Invalid count of elements. Exiting!\n")
718
                sys.exit(1)
719
            # Process remaining 'fixparts' elements in pairs
720
            # First pair element is remediation function to be XCCDF expanded
721
            # Second pair element (if not empty) is the portion of the original
722
            # fix text to be used in newly added sublement's tail
723
            for idx in range(0, len(fixparts), 2):
724
                # We previously removed enclosing newlines when creating
725
                # fixparts list. Add them back and reuse the above 'pattern'
726
                fixparts[idx] = "\n%s\n" % fixparts[idx]
727
                # Sanity check (verify the first field truly contains call of
728
                # some of the remediation functions)
729
                if re.match(pattern, fixparts[idx], re.DOTALL) is not None:
730
                    # This chunk contains call of 'populate' function
731
                    if "populate" in fixparts[idx]:
732
                        varname, fixtextcontrib = get_populate_replacement(remediation_type,
733
                                                                           fixparts[idx])
734
                        # Define new XCCDF <sub> element for the variable
735
                        xccdfvarsub = ElementTree.Element("sub", idref=varname)
736
737
                        # If this is first sub element,
738
                        # the textcontribution needs to go to fix text
739
                        # otherwise, append to last subelement
740
                        nfixchildren = len(list(fix))
741
                        if nfixchildren == 0:
742
                            fix.text += fixtextcontrib
743
                        else:
744
                            previouselem = fix[nfixchildren-1]
745
                            previouselem.tail += fixtextcontrib
746
747
                        # If second pair element is not empty, append it as
748
                        # tail for the subelement (prefixed with closing '"')
749
                        if fixparts[idx + 1] is not None:
750
                            xccdfvarsub.tail = '"' + '\n' + fixparts[idx + 1]
751
                        # Otherwise append just enclosing '"'
752
                        else:
753
                            xccdfvarsub.tail = '"' + '\n'
754
                        # Append the new subelement to the fix element
755
                        fix.append(xccdfvarsub)
756
                    # This chunk contains call of other remediation function
757
                    else:
758
                        # Extract remediation function name
759
                        funcname = re.search(r'\n\s*(\S+)(| .*)\n',
760
                                             fixparts[idx],
761
                                             re.DOTALL).group(1)
762
                        # Define new XCCDF <sub> element for the function
763
                        xccdffuncsub = ElementTree.Element(
764
                            "sub", idref='function_%s' % funcname)
765
                        # Append original function call into tail of the
766
                        # subelement
767
                        xccdffuncsub.tail = fixparts[idx]
768
                        # If the second element of the pair is not empty,
769
                        # append it to the tail of the subelement too
770
                        if fixparts[idx + 1] is not None:
771
                            xccdffuncsub.tail += fixparts[idx + 1]
772
                        # Append the new subelement to the fix element
773
                        fix.append(xccdffuncsub)
774
                        # Ensure the newly added <xccdf:sub> element for the
775
                        # function will be always inserted at newline
776
                        # If xccdffuncsub is the first <xccdf:sub> element
777
                        # being added as child of <fix> and fix.text doesn't
778
                        # end up with newline character, append the newline
779
                        # to the fix.text
780
                        if list(fix).index(xccdffuncsub) == 0:
781
                            if re.search(r'.*\n$', fix.text) is None:
782
                                fix.text += '\n'
783
                        # If xccdffuncsub isn't the first child (first
784
                        # <xccdf:sub> being added), and tail of previous
785
                        # child doesn't end up with newline, append the newline
786
                        # to the tail of previous child
787
                        else:
788
                            previouselem = fix[list(fix).index(xccdffuncsub) - 1]
789
                            if re.search(r'.*\n$', previouselem.tail) is None:
790
                                previouselem.tail += '\n'
791
792
        # Perform a sanity check if all known remediation function calls have been
793
        # properly XCCDF substituted. Exit with failure if some wasn't
794
795
        # First concat output form of modified fix text (including text appended
796
        # to all children of the fix)
797
        modfix = [fix.text]
798
        for child in list(fix):
799
            if child is not None and child.text is not None:
800
                modfix.append(child.text)
801
        modfixtext = "".join(modfix)
802
        # Don't perform sanity check at bash comments because they are not substituted
803
        modfixtext = re.sub(r'#.*', '', modfixtext)
804
        for func in remediation_functions:
805
            # Then define the expected XCCDF sub element form for this function
806
            funcxccdfsub = "<sub idref=\"function_%s\"" % func
807
            # Finally perform the sanity check -- if function was properly XCCDF
808
            # substituted both the original function call and XCCDF <sub> element
809
            # for that function need to be present in the modified text of the fix
810
            # Otherwise something went wrong, thus exit with failure
811
            if func in modfixtext and funcxccdfsub not in modfixtext:
812
                sys.stderr.write("Error performing XCCDF <sub> substitution "
813
                                 "for function %s in %s fix. Exiting...\n"
814
                                 % (func, fix.get("rule")))
815
                sys.exit(1)
816
    else:
817
        sys.stderr.write("Unknown remediation type '%s'\n" % (remediation_type))
818
        sys.exit(1)
819