Passed
Push — master ( 1b83a1...99b2eb )
by Jan
02:30 queued 15s
created

AnsibleRemediation._get_cce()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 2
ccs 2
cts 2
cp 1
crap 1
rs 10
c 0
b 0
f 0
1 2
from __future__ import absolute_import
2 2
from __future__ import print_function
3
4 2
import sys
5 2
import os
6 2
import os.path
7 2
import re
8 2
import codecs
9 2
from collections import defaultdict, namedtuple
10
11
12 2
import ssg.yaml
13 2
from . import build_yaml
14 2
from . import rules
15 2
from . import utils
16
17 2
from . import constants
18 2
from .jinja import process_file_with_macros as jinja_process_file
19
20 2
from .xml import ElementTree
21
22 2
REMEDIATION_TO_EXT_MAP = {
23
    'anaconda': '.anaconda',
24
    'ansible': '.yml',
25
    'bash': '.sh',
26
    'puppet': '.pp',
27
    'ignition': '.yml',
28
    'kubernetes': '.yml'
29
}
30
31 2
FILE_GENERATED_HASH_COMMENT = '# THIS FILE IS GENERATED'
32
33 2
REMEDIATION_CONFIG_KEYS = ['complexity', 'disruption', 'platform', 'reboot',
34
                           'strategy']
35 2
REMEDIATION_ELM_KEYS = ['complexity', 'disruption', 'reboot', 'strategy']
36
37
38 2
def get_available_functions(build_dir):
39
    """Parse the content of "$CMAKE_BINARY_DIR/bash-remediation-functions.xml"
40
    XML file to obtain the list of currently known SCAP Security Guide internal
41
    remediation functions"""
42
43
    # If location of /shared directory is known
44
    if build_dir is None or not os.path.isdir(build_dir):
45
        sys.stderr.write("Expected '%s' to be the build directory. It doesn't "
46
                         "exist or is not a directory." % (build_dir))
47
        sys.exit(1)
48
49
    # Construct the final path of XML file with remediation functions
50
    xmlfilepath = \
51
        os.path.join(build_dir, "bash-remediation-functions.xml")
52
53
    if not os.path.isfile(xmlfilepath):
54
        sys.stderr.write("Expected '%s' to contain the remediation functions. "
55
                         "The file was not found!\n" % (xmlfilepath))
56
        sys.exit(1)
57
58
    remediation_functions = []
59
    with codecs.open(xmlfilepath, "r", encoding="utf-8") as xmlfile:
60
        filestring = xmlfile.read()
61
        # This regex looks implementation dependent but we can rely on the element attributes
62
        # being present. Beware, DOTALL means we go through the whole file at once.
63
        # We can't rely on ElementTree sorting XML attrs in any way since Python 3.7.
64
        remediation_functions = re.findall(
65
            r'<Value[^>]+id=\"function_(\S+)\"',
66
            filestring, re.DOTALL
67
        )
68
69
    return remediation_functions
70
71
72 2
def get_fixgroup_for_type(fixcontent, remediation_type):
73
    """
74
    For a given remediation type, return a new subelement of that type.
75
76
    Exits if passed an unknown remediation type.
77
    """
78
    if remediation_type == 'anaconda':
79
        return ElementTree.SubElement(
80
            fixcontent, "fix-group", id="anaconda",
81
            system="urn:redhat:anaconda:pre",
82
            xmlns="http://checklists.nist.gov/xccdf/1.1")
83
84
    elif remediation_type == 'ansible':
85
        return ElementTree.SubElement(
86
            fixcontent, "fix-group", id="ansible",
87
            system="urn:xccdf:fix:script:ansible",
88
            xmlns="http://checklists.nist.gov/xccdf/1.1")
89
90
    elif remediation_type == 'bash':
91
        return ElementTree.SubElement(
92
            fixcontent, "fix-group", id="bash",
93
            system="urn:xccdf:fix:script:sh",
94
            xmlns="http://checklists.nist.gov/xccdf/1.1")
95
96
    elif remediation_type == 'puppet':
97
        return ElementTree.SubElement(
98
            fixcontent, "fix-group", id="puppet",
99
            system="urn:xccdf:fix:script:puppet",
100
            xmlns="http://checklists.nist.gov/xccdf/1.1")
101
102
    elif remediation_type == 'ignition':
103
        return ElementTree.SubElement(
104
            fixcontent, "fix-group", id="ignition",
105
            system="urn:xccdf:fix:script:ignition",
106
            xmlns="http://checklists.nist.gov/xccdf/1.1")
107
108
    elif remediation_type == 'kubernetes':
109
        return ElementTree.SubElement(
110
            fixcontent, "fix-group", id="kubernetes",
111
            system="urn:xccdf:fix:script:kubernetes",
112
            xmlns="http://checklists.nist.gov/xccdf/1.1")
113
114
    sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
115
                     % (remediation_type))
116
    sys.exit(1)
117
118
119 2
def is_supported_filename(remediation_type, filename):
120
    """
121
    Checks if filename has a supported extension for remediation_type.
122
123
    Exits when remediation_type is of an unknown type.
124
    """
125 2
    if remediation_type in REMEDIATION_TO_EXT_MAP:
126 2
        return filename.endswith(REMEDIATION_TO_EXT_MAP[remediation_type])
127
128
    sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
129
                     % (remediation_type))
130
    sys.exit(1)
131
132
133 2
def get_populate_replacement(remediation_type, text):
134
    """
135
    Return varname, fixtextcontribution
136
    """
137
138
    if remediation_type == 'bash':
139
        # Extract variable name
140
        varname = re.search(r'\npopulate (\S+)\n',
141
                            text, re.DOTALL).group(1)
142
        # Define fix text part to contribute to main fix text
143
        fixtextcontribution = '\n%s="' % varname
144
        return (varname, fixtextcontribution)
145
146
    sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
147
                     % (remediation_type))
148
    sys.exit(1)
149
150
151 2
def split_remediation_content_and_metadata(fix_file):
152 2
    remediation_contents = []
153 2
    config = defaultdict(lambda: None)
154
155
    # Assignment automatically escapes shell characters for XML
156 2
    for line in fix_file.splitlines():
157 2
        if line.startswith(FILE_GENERATED_HASH_COMMENT):
158
            continue
159
160 2
        if line.startswith('#') and line.count('=') == 1:
161 2
            (key, value) = line.strip('#').split('=')
162 2
            if key.strip() in REMEDIATION_CONFIG_KEYS:
163 2
                config[key.strip()] = value.strip()
164 2
                continue
165
166
        # If our parsed line wasn't a config item, add it to the
167
        # returned file contents. This includes when the line
168
        # begins with a '#' and contains an equals sign, but
169
        # the "key" isn't one of the known keys from
170
        # REMEDIATION_CONFIG_KEYS.
171 2
        remediation_contents.append(line)
172
173 2
    contents = "\n".join(remediation_contents)
174 2
    remediation = namedtuple('remediation', ['contents', 'config'])
175 2
    return remediation(contents=contents, config=config)
176
177
178 2
def parse_from_file_with_jinja(file_path, env_yaml):
179
    """
180
    Parses a remediation from a file. As remediations contain jinja macros,
181
    we need a env_yaml context to process these. In practice, no remediations
182
    use jinja in the configuration, so for extracting only the configuration,
183
    env_yaml can be an abritrary product.yml dictionary.
184
185
    If the logic of configuration parsing changes significantly, please also
186
    update ssg.fixes.parse_platform(...).
187
    """
188
189 2
    fix_file = jinja_process_file(file_path, env_yaml)
190 2
    return split_remediation_content_and_metadata(fix_file)
191
192
193 2
def parse_from_file_without_jinja(file_path):
194
    """
195
    Parses a remediation from a file. Doesn't process the Jinja macros.
196
    This function is useful in build phases in which all the Jinja macros
197
    are already resolved.
198
    """
199
    with open(file_path, "r") as f:
200
        f_str = f.read()
201
        return split_remediation_content_and_metadata(f_str)
202
203
204 2
class Remediation(object):
205 2
    def __init__(self, file_path, remediation_type):
206 2
        self.file_path = file_path
207 2
        self.local_env_yaml = dict()
208
209 2
        self.metadata = defaultdict(lambda: None)
210
211 2
        self.remediation_type = remediation_type
212 2
        self.associated_rule = None
213
214 2
    def load_rule_from(self, rule_path):
215 2
        self.associated_rule = build_yaml.Rule.from_yaml(rule_path)
216 2
        self.expand_env_yaml_from_rule()
217
218 2
    def expand_env_yaml_from_rule(self):
219 2
        if not self.associated_rule:
220
            return
221
222 2
        self.local_env_yaml["rule_title"] = self.associated_rule.title
223 2
        self.local_env_yaml["rule_id"] = self.associated_rule.id_
224
225 2
    def parse_from_file_with_jinja(self, env_yaml):
226 2
        return parse_from_file_with_jinja(self.file_path, env_yaml)
227
228
229 2
def process(remediation, env_yaml, fixes, rule_id):
230
    """
231
    Process a fix, adding it to fixes iff the file is of a valid extension
232
    for the remediation type and the fix is valid for the current product.
233
234
    Note that platform is a required field in the contents of the fix.
235
    """
236 2
    if not is_supported_filename(remediation.remediation_type, remediation.file_path):
237
        return
238
239 2
    result = remediation.parse_from_file_with_jinja(env_yaml)
240 2
    platforms = result.config['platform']
241
242 2
    if not platforms:
243
        raise RuntimeError(
244
            "The '%s' remediation script does not contain the "
245
            "platform identifier!" % (remediation.file_path))
246
247 2
    for platform in platforms.split(","):
248 2
        if platform.strip() != platform:
249
            msg = (
250
                "Comma-separated '{platform}' platforms "
251
                "in '{remediation_file}' contains whitespace."
252
                .format(platform=platforms, remediation_file=remediation.file_path))
253
            raise ValueError(msg)
254
255 2
    product = env_yaml["product"]
256 2
    if utils.is_applicable_for_product(platforms, product):
257 2
        fixes[rule_id] = result
258
259 2
    return result
260
261
262 2
class BashRemediation(Remediation):
263 2
    def __init__(self, file_path):
264 2
        super(BashRemediation, self).__init__(file_path, "bash")
265
266
267 2
class AnsibleRemediation(Remediation):
268 2
    def __init__(self, file_path):
269 2
        super(AnsibleRemediation, self).__init__(
270
            file_path, "ansible")
271
272 2
        self.body = None
273
274 2
    def parse_from_file_with_jinja(self, env_yaml):
275 2
        self.local_env_yaml.update(env_yaml)
276 2
        result = super(AnsibleRemediation, self).parse_from_file_with_jinja(self.local_env_yaml)
277
278 2
        if not self.associated_rule:
279
            return result
280
281 2
        parsed = ssg.yaml.ordered_load(result.contents)
282
283 2
        self.update(parsed, result.config)
284
285 2
        updated_yaml_text = ssg.yaml.ordered_dump(
286
            parsed, None, default_flow_style=False)
287 2
        result = result._replace(contents=updated_yaml_text)
288
289 2
        self.body = parsed
290 2
        self.metadata = result.config
291
292 2
        return result
293
294 2
    def update_tags_from_config(self, to_update, config):
295 2
        tags = to_update.get("tags", [])
296 2
        if "strategy" in config:
297 2
            tags.append("{0}_strategy".format(config["strategy"]))
298 2
        if "complexity" in config:
299 2
            tags.append("{0}_complexity".format(config["complexity"]))
300 2
        if "disruption" in config:
301 2
            tags.append("{0}_disruption".format(config["disruption"]))
302 2
        if "reboot" in config:
303 2
            if config["reboot"] == "true":
304
                reboot_tag = "reboot_required"
305
            else:
306 2
                reboot_tag = "no_reboot_needed"
307 2
            tags.append(reboot_tag)
308 2
        to_update["tags"] = sorted(tags)
309
310 2
    def update_tags_from_rule(self, to_update):
311 2
        if not self.associated_rule:
312
            raise RuntimeError("The Ansible snippet has no rule loaded.")
313
314 2
        tags = to_update.get("tags", [])
315 2
        tags.insert(0, "{0}_severity".format(self.associated_rule.severity))
316 2
        tags.insert(0, self.associated_rule.id_)
317
318 2
        cce_num = self._get_cce()
319 2
        if cce_num:
320 2
            tags.append("CCE-{0}".format(cce_num))
321
322 2
        refs = self.get_references()
323 2
        tags.extend(refs)
324 2
        to_update["tags"] = sorted(tags)
325
326 2
    def _get_cce(self):
327 2
        return self.associated_rule.identifiers.get("cce", None)
328
329 2
    def get_references(self):
330 2
        if not self.associated_rule:
331
            raise RuntimeError("The Ansible snippet has no rule loaded.")
332
333 2
        result = []
334 2
        for ref_class, prefix in constants.REF_PREFIX_MAP.items():
335 2
            refs = self._get_rule_reference(ref_class)
336 2
            result.extend(["{prefix}-{value}".format(prefix=prefix, value=v) for v in refs])
337 2
        return result
338
339 2
    def _get_rule_reference(self, ref_class):
340 2
        refs = self.associated_rule.references.get(ref_class, "")
341 2
        if refs:
342 2
            return refs.split(",")
343
        else:
344 2
            return []
345
346 2
    def update_when_from_rule(self, to_update):
347 2
        additional_when = ""
348 2
        if self.associated_rule.platform == "machine":
349 2
            additional_when = ('ansible_virtualization_role != "guest" '
350
                               'or ansible_virtualization_type != "docker"')
351 2
        to_update.setdefault("when", "")
352 2
        new_when = ssg.yaml.update_yaml_list_or_string(to_update["when"], additional_when)
353 2
        if not new_when:
354
            to_update.pop("when")
355
        else:
356 2
            to_update["when"] = new_when
357
358 2
    def update(self, parsed, config):
359 2
        for p in parsed:
360 2
            if not isinstance(p, dict):
361
                continue
362 2
            self.update_when_from_rule(p)
363 2
            self.update_tags_from_config(p, config)
364 2
            self.update_tags_from_rule(p)
365
366 2
    @classmethod
367
    def from_snippet_and_rule(cls, snippet_fname, rule_fname):
368 2
        if os.path.isfile(snippet_fname) and os.path.isfile(rule_fname):
369 2
            result = cls(snippet_fname)
370 2
            try:
371 2
                result.load_rule_from(rule_fname)
372
            except ssg.yaml.DocumentationNotComplete:
373
                # Happens on non-debug build when a rule is "documentation-incomplete"
374
                return None
375 2
            return result
376
377
378 2
class AnacondaRemediation(Remediation):
379 2
    def __init__(self, file_path):
380
        super(AnacondaRemediation, self).__init__(
381
            file_path, "anaconda")
382
383
384 2
class PuppetRemediation(Remediation):
385 2
    def __init__(self, file_path):
386
        super(PuppetRemediation, self).__init__(
387
            file_path, "puppet")
388
389
390 2
class IgnitionRemediation(Remediation):
391 2
    def __init__(self, file_path):
392
        super(IgnitionRemediation, self).__init__(
393
            file_path, "ignition")
394
395
396 2
class KubernetesRemediation(Remediation):
397 2
    def __init__(self, file_path):
398
        super(KubernetesRemediation, self).__init__(
399
              file_path, "kubernetes")
400
401
402 2
REMEDIATION_TO_CLASS = {
403
    'anaconda': AnacondaRemediation,
404
    'ansible': AnsibleRemediation,
405
    'bash': BashRemediation,
406
    'puppet': PuppetRemediation,
407
    'ignition': IgnitionRemediation,
408
    'kubernetes': KubernetesRemediation,
409
}
410
411
412 2
def write_fixes_to_xml(remediation_type, build_dir, output_path, fixes):
413
    """
414
    Builds a fix-content XML tree from the contents of fixes
415
    and writes it to output_path.
416
    """
417
418
    fixcontent = ElementTree.Element("fix-content", system="urn:xccdf:fix:script:sh",
419
                                     xmlns="http://checklists.nist.gov/xccdf/1.1")
420
    fixgroup = get_fixgroup_for_type(fixcontent, remediation_type)
421
422
    if remediation_type == "bash":
423
        # (bash_)remediation_functions are really only used for bash
424
        remediation_functions = get_available_functions(build_dir)
425
    else:
426
        remediation_functions = None
427
428
    for fix_name in fixes:
429
        fix_contents, config = fixes[fix_name]
430
431
        fix_elm = ElementTree.SubElement(fixgroup, "fix")
432
        fix_elm.set("rule", fix_name)
433
434
        for key in REMEDIATION_ELM_KEYS:
435
            if config[key]:
436
                fix_elm.set(key, config[key])
437
438
        fix_elm.text = fix_contents + "\n"
439
440
        # Expand shell variables and remediation functions
441
        # into corresponding XCCDF <sub> elements
442
        expand_xccdf_subs(fix_elm, remediation_type, remediation_functions)
443
444
    tree = ElementTree.ElementTree(fixcontent)
445
    tree.write(output_path)
446
447
448 2
def write_fixes_to_dir(fixes, remediation_type, output_dir):
449
    """
450
    Writes fixes as files to output_dir, each fix as a separate file
451
    """
452
    try:
453
        extension = REMEDIATION_TO_EXT_MAP[remediation_type]
454
    except KeyError:
455
        raise ValueError("Unknown remediation type %s." % remediation_type)
456
457
    if not os.path.exists(output_dir):
458
        os.makedirs(output_dir)
459
    for fix_name, fix in fixes.items():
460
        fix_contents, config = fix
461
        fix_path = os.path.join(output_dir, fix_name + extension)
462
        with open(fix_path, "w") as f:
463
            for k, v in config.items():
464
                f.write("# %s = %s\n" % (k, v))
465
            f.write(fix_contents)
466
467
468 2
def get_rule_dir_remediations(dir_path, remediation_type, product=None):
469
    """
470
    Gets a list of remediations of type remediation_type contained in a
471
    rule directory. If product is None, returns all such remediations.
472
    If product is not None, returns applicable remediations in order of
473
    priority:
474
475
        {{{ product }}}.ext -> shared.ext
476
477
    Only returns remediations which exist.
478
    """
479
480 2
    if not rules.is_rule_dir(dir_path):
481
        return []
482
483 2
    remediations_dir = os.path.join(dir_path, remediation_type)
484 2
    has_remediations_dir = os.path.isdir(remediations_dir)
485 2
    ext = REMEDIATION_TO_EXT_MAP[remediation_type]
486 2
    if not has_remediations_dir:
487
        return []
488
489 2
    results = []
490 2
    for remediation_file in os.listdir(remediations_dir):
491 2
        file_name, file_ext = os.path.splitext(remediation_file)
492 2
        remediation_path = os.path.join(remediations_dir, remediation_file)
493
494 2
        if file_ext == ext and rules.applies_to_product(file_name, product):
495 2
            if file_name == 'shared':
496
                results.append(remediation_path)
497
            else:
498 2
                results.insert(0, remediation_path)
499
500 2
    return results
501
502
503 2
def expand_xccdf_subs(fix, remediation_type, remediation_functions):
504
    """For those remediation scripts utilizing some of the internal SCAP
505
    Security Guide remediation functions expand the selected shell variables
506
    and remediation functions calls with <xccdf:sub> element
507
508
    This routine translates any instance of the 'populate' function call in
509
    the form of:
510
511
            populate variable_name
512
513
    into
514
515
            variable_name="<sub idref="variable_name"/>"
516
517
    Also transforms any instance of the 'ansible-populate' function call in the
518
    form of:
519
            (ansible-populate variable_name)
520
    into
521
522
            <sub idref="variable_name"/>
523
524
    Also transforms any instance of some other known remediation function (e.g.
525
    'replace_or_append' etc.) from the form of:
526
527
            function_name "arg1" "arg2" ... "argN"
528
529
    into:
530
531
            <sub idref="function_function_name"/>
532
            function_name "arg1" "arg2" ... "argN"
533
    """
534
535
    if remediation_type == "ignition":
536
        return
537
    if remediation_type == "kubernetes":
538
        return
539
    elif remediation_type == "ansible":
540
        fix_text = fix.text
541
542
        if "(ansible-populate " in fix_text:
543
            raise RuntimeError(
544
                "(ansible-populate VAR) has been deprecated. Please use "
545
                "(xccdf-var VAR) instead. Keep in mind that the latter will "
546
                "make an ansible variable out of XCCDF Value as opposed to "
547
                "substituting directly."
548
            )
549
550
        # If you change this string make sure it still matches the pattern
551
        # defined in OpenSCAP. Otherwise you break variable handling in
552
        # 'oscap xccdf generate fix' and the variables won't be customizable!
553
        # https://github.com/OpenSCAP/openscap/blob/1.2.17/src/XCCDF_POLICY/xccdf_policy_remediate.c#L588
554
        #   const char *pattern =
555
        #     "- name: XCCDF Value [^ ]+ # promote to variable\n  set_fact:\n"
556
        #     "    ([^:]+): (.+)\n  tags:\n    - always\n";
557
        # We use !!str typecast to prevent treating values as different types
558
        # eg. yes as a bool or 077 as an octal number
559
        fix_text = re.sub(
560
            r"- \(xccdf-var\s+(\S+)\)",
561
            r"- name: XCCDF Value \1 # promote to variable\n"
562
            r"  set_fact:\n"
563
            r"    \1: !!str (ansible-populate \1)\n"
564
            r"  tags:\n"
565
            r"    - always",
566
            fix_text
567
        )
568
569
        pattern = r'\(ansible-populate\s*(\S+)\)'
570
571
        # we will get list what looks like
572
        # [text, varname, text, varname, ..., text]
573
        parts = re.split(pattern, fix_text)
574
575
        fix.text = parts[0]  # add first "text"
576
        for index in range(1, len(parts), 2):
577
            varname = parts[index]
578
            text_between_vars = parts[index + 1]
579
580
            # we cannot combine elements and text easily
581
            # so text is in ".tail" of element
582
            xccdfvarsub = ElementTree.SubElement(fix, "sub", idref=varname)
583
            xccdfvarsub.tail = text_between_vars
584
        return
585
586
    elif remediation_type == "puppet":
587
        pattern = r'\(puppet-populate\s*(\S+)\)'
588
589
        # we will get list what looks like
590
        # [text, varname, text, varname, ..., text]
591
        parts = re.split(pattern, fix.text)
592
593
        fix.text = parts[0]  # add first "text"
594
        for index in range(1, len(parts), 2):
595
            varname = parts[index]
596
            text_between_vars = parts[index + 1]
597
598
            # we cannot combine elements and text easily
599
            # so text is in ".tail" of element
600
            xccdfvarsub = ElementTree.SubElement(fix, "sub", idref=varname)
601
            xccdfvarsub.tail = text_between_vars
602
        return
603
604
    elif remediation_type == "anaconda":
605
        pattern = r'\(anaconda-populate\s*(\S+)\)'
606
607
        # we will get list what looks like
608
        # [text, varname, text, varname, ..., text]
609
        parts = re.split(pattern, fix.text)
610
611
        fix.text = parts[0]  # add first "text"
612
        for index in range(1, len(parts), 2):
613
            varname = parts[index]
614
            text_between_vars = parts[index + 1]
615
616
            # we cannot combine elements and text easily
617
            # so text is in ".tail" of element
618
            xccdfvarsub = ElementTree.SubElement(fix, "sub", idref=varname)
619
            xccdfvarsub.tail = text_between_vars
620
        return
621
622
    elif remediation_type == "bash":
623
        # This remediation script doesn't utilize internal remediation functions
624
        # Skip it without any further processing
625
        if 'remediation_functions' not in fix.text:
626
            return
627
628
        # This remediation script utilizes some of internal remediation functions
629
        # Expand shell variables and remediation functions calls with <xccdf:sub>
630
        # elements
631
        pattern = r'\n+(\s*(?:' + r'|'.join(remediation_functions) + r')[^\n]*)\n'
632
        patcomp = re.compile(pattern, re.DOTALL)
633
        fixparts = re.split(patcomp, fix.text)
634
        if fixparts[0] is not None:
635
            # Split the portion of fix.text from fix start to first call of
636
            # remediation function, keeping only the third part:
637
            # * tail        to hold part of the fix.text after inclusion,
638
            #               but before first call of remediation function
639
            try:
640
                rfpattern = '(.*remediation_functions)(.*)'
641
                rfpatcomp = re.compile(rfpattern, re.DOTALL)
642
                _, _, tail, _ = re.split(rfpatcomp, fixparts[0], maxsplit=2)
643
            except ValueError:
644
                sys.stderr.write("Processing fix.text for: %s rule\n"
645
                                 % fix.get('rule'))
646
                sys.stderr.write("Unable to extract part of the fix.text "
647
                                 "after inclusion of remediation functions."
648
                                 " Aborting..\n")
649
                sys.exit(1)
650
            # If the 'tail' is not empty, make it new fix.text.
651
            # Otherwise use ''
652
            fix.text = tail if tail is not None else ''
0 ignored issues
show
introduced by
The variable tail does not seem to be defined for all execution paths.
Loading history...
653
            # Drop the first element of 'fixparts' since it has been processed
654
            fixparts.pop(0)
655
            # Perform sanity check on new 'fixparts' list content (to continue
656
            # successfully 'fixparts' has to contain even count of elements)
657
            if len(fixparts) % 2 != 0:
658
                sys.stderr.write("Error performing XCCDF expansion on "
659
                                 "remediation script: %s\n"
660
                                 % fix.get("rule"))
661
                sys.stderr.write("Invalid count of elements. Exiting!\n")
662
                sys.exit(1)
663
            # Process remaining 'fixparts' elements in pairs
664
            # First pair element is remediation function to be XCCDF expanded
665
            # Second pair element (if not empty) is the portion of the original
666
            # fix text to be used in newly added sublement's tail
667
            for idx in range(0, len(fixparts), 2):
668
                # We previously removed enclosing newlines when creating
669
                # fixparts list. Add them back and reuse the above 'pattern'
670
                fixparts[idx] = "\n%s\n" % fixparts[idx]
671
                # Sanity check (verify the first field truly contains call of
672
                # some of the remediation functions)
673
                if re.match(pattern, fixparts[idx], re.DOTALL) is not None:
674
                    # This chunk contains call of 'populate' function
675
                    if "populate" in fixparts[idx]:
676
                        varname, fixtextcontrib = get_populate_replacement(remediation_type,
677
                                                                           fixparts[idx])
678
                        # Define new XCCDF <sub> element for the variable
679
                        xccdfvarsub = ElementTree.Element("sub", idref=varname)
680
681
                        # If this is first sub element,
682
                        # the textcontribution needs to go to fix text
683
                        # otherwise, append to last subelement
684
                        nfixchildren = len(list(fix))
685
                        if nfixchildren == 0:
686
                            fix.text += fixtextcontrib
687
                        else:
688
                            previouselem = fix[nfixchildren-1]
689
                            previouselem.tail += fixtextcontrib
690
691
                        # If second pair element is not empty, append it as
692
                        # tail for the subelement (prefixed with closing '"')
693
                        if fixparts[idx + 1] is not None:
694
                            xccdfvarsub.tail = '"' + '\n' + fixparts[idx + 1]
695
                        # Otherwise append just enclosing '"'
696
                        else:
697
                            xccdfvarsub.tail = '"' + '\n'
698
                        # Append the new subelement to the fix element
699
                        fix.append(xccdfvarsub)
700
                    # This chunk contains call of other remediation function
701
                    else:
702
                        # Extract remediation function name
703
                        funcname = re.search(r'\n\s*(\S+)(| .*)\n',
704
                                             fixparts[idx],
705
                                             re.DOTALL).group(1)
706
                        # Define new XCCDF <sub> element for the function
707
                        xccdffuncsub = ElementTree.Element(
708
                            "sub", idref='function_%s' % funcname)
709
                        # Append original function call into tail of the
710
                        # subelement
711
                        xccdffuncsub.tail = fixparts[idx]
712
                        # If the second element of the pair is not empty,
713
                        # append it to the tail of the subelement too
714
                        if fixparts[idx + 1] is not None:
715
                            xccdffuncsub.tail += fixparts[idx + 1]
716
                        # Append the new subelement to the fix element
717
                        fix.append(xccdffuncsub)
718
                        # Ensure the newly added <xccdf:sub> element for the
719
                        # function will be always inserted at newline
720
                        # If xccdffuncsub is the first <xccdf:sub> element
721
                        # being added as child of <fix> and fix.text doesn't
722
                        # end up with newline character, append the newline
723
                        # to the fix.text
724
                        if list(fix).index(xccdffuncsub) == 0:
725
                            if re.search(r'.*\n$', fix.text) is None:
726
                                fix.text += '\n'
727
                        # If xccdffuncsub isn't the first child (first
728
                        # <xccdf:sub> being added), and tail of previous
729
                        # child doesn't end up with newline, append the newline
730
                        # to the tail of previous child
731
                        else:
732
                            previouselem = fix[list(fix).index(xccdffuncsub) - 1]
733
                            if re.search(r'.*\n$', previouselem.tail) is None:
734
                                previouselem.tail += '\n'
735
736
        # Perform a sanity check if all known remediation function calls have been
737
        # properly XCCDF substituted. Exit with failure if some wasn't
738
739
        # First concat output form of modified fix text (including text appended
740
        # to all children of the fix)
741
        modfix = [fix.text]
742
        for child in list(fix):
743
            if child is not None and child.text is not None:
744
                modfix.append(child.text)
745
        modfixtext = "".join(modfix)
746
        # Don't perform sanity check at bash comments because they are not substituted
747
        modfixtext = re.sub(r'#.*', '', modfixtext)
748
        for func in remediation_functions:
749
            # Then efine expected XCCDF sub element form for this function
750
            funcxccdfsub = "<sub idref=\"function_%s\"" % func
751
            # Finally perform the sanity check -- if function was properly XCCDF
752
            # substituted both the original function call and XCCDF <sub> element
753
            # for that function need to be present in the modified text of the fix
754
            # Otherwise something went wrong, thus exit with failure
755
            if func in modfixtext and funcxccdfsub not in modfixtext:
756
                sys.stderr.write("Error performing XCCDF <sub> substitution "
757
                                 "for function %s in %s fix. Exiting...\n"
758
                                 % (func, fix.get("rule")))
759
                sys.exit(1)
760
    else:
761
        sys.stderr.write("Unknown remediation type '%s'\n" % (remediation_type))
762
        sys.exit(1)
763