Passed
Pull Request — master (#4216)
by Matěj
02:33 queued 10s
created

AnsibleRemediation.update_when_from_rule()   A

Complexity

Conditions 3

Size

Total Lines 11
Code Lines 9

Duplication

Lines 11
Ratio 100 %

Code Coverage

Tests 8
CRAP Score 3.0123

Importance

Changes 0
Metric Value
cc 3
eloc 9
nop 2
dl 11
loc 11
ccs 8
cts 9
cp 0.8889
crap 3.0123
rs 9.95
c 0
b 0
f 0
1 2
from __future__ import absolute_import
2 2
from __future__ import print_function
3
4 2
import sys
5 2
import os
6 2
import os.path
7 2
import re
8 2
import codecs
9 2
from collections import defaultdict, namedtuple
10
11
12 2
import ssg.yaml
13 2
from . import build_yaml
14 2
from . import rules
15 2
from . import utils
16 2
from .jinja import process_file as jinja_process_file
17 2
from .xml import ElementTree
18
19 2
# File extension used by remediation snippets of each supported type.
REMEDIATION_TO_EXT_MAP = dict(
    anaconda='.anaconda',
    ansible='.yml',
    bash='.sh',
    puppet='.pp',
)

# Lines starting with this marker are dropped when a remediation is parsed.
FILE_GENERATED_HASH_COMMENT = '# THIS FILE IS GENERATED'

# Keys recognised in "# key = value" header lines of a remediation file.
REMEDIATION_CONFIG_KEYS = [
    'complexity',
    'disruption',
    'platform',
    'reboot',
    'strategy',
]
# Config keys that are copied as attributes onto the generated <fix> element.
REMEDIATION_ELM_KEYS = [
    'complexity',
    'disruption',
    'reboot',
    'strategy',
]
31
32
33 2
def get_available_functions(build_dir):
    """Return the names of the known bash remediation functions.

    Reads "bash-remediation-functions.xml" from ``build_dir`` and collects
    the ``NAME`` part of every ``<Value hidden="true" id="function_NAME"``
    occurrence, in the order they appear in the file.

    Writes a message to stderr and exits with status 1 when ``build_dir`` is
    None or not an existing directory, or when the XML file is missing.
    """
    if build_dir is None or not os.path.isdir(build_dir):
        sys.stderr.write("Expected '%s' to be the build directory. It doesn't "
                         "exist or is not a directory.\n" % (build_dir))
        sys.exit(1)

    # Construct the final path of XML file with remediation functions
    xmlfilepath = os.path.join(build_dir, "bash-remediation-functions.xml")

    if not os.path.isfile(xmlfilepath):
        sys.stderr.write("Expected '%s' to contain the remediation functions. "
                         "The file was not found!\n" % (xmlfilepath))
        sys.exit(1)

    with codecs.open(xmlfilepath, "r", encoding="utf-8") as xmlfile:
        filestring = xmlfile.read()

    # This regex looks implementation dependent but we can rely on
    # ElementTree sorting XML attrs alphabetically. Hidden is guaranteed
    # to be the first attr and ID is guaranteed to be second.
    return re.findall(
        r'<Value hidden=\"true\" id=\"function_(\S+)\"',
        filestring, re.DOTALL
    )
65
66
67 2
def get_fixgroup_for_type(fixcontent, remediation_type):
    """
    Create and return the "fix-group" subelement of fixcontent that belongs
    to the given remediation type.

    Exits with status 1 if the remediation type is not known.
    """
    # (remediation type, value of the fix-group "system" attribute)
    known_systems = (
        ('anaconda', "urn:redhat:anaconda:pre"),
        ('ansible', "urn:xccdf:fix:script:ansible"),
        ('bash', "urn:xccdf:fix:script:sh"),
        ('puppet', "urn:xccdf:fix:script:puppet"),
    )
    for known_type, system_urn in known_systems:
        if remediation_type == known_type:
            return ElementTree.SubElement(
                fixcontent, "fix-group", id=known_type,
                system=system_urn,
                xmlns="http://checklists.nist.gov/xccdf/1.1")

    sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
                     % (remediation_type))
    sys.exit(1)
100
101
102 2
def is_supported_filename(remediation_type, filename):
    """
    Tell whether filename ends with the extension registered for
    remediation_type in REMEDIATION_TO_EXT_MAP.

    Exits with status 1 when remediation_type is not a known type.
    """
    if remediation_type not in REMEDIATION_TO_EXT_MAP:
        sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
                         % (remediation_type))
        sys.exit(1)

    expected_extension = REMEDIATION_TO_EXT_MAP[remediation_type]
    return filename.endswith(expected_extension)
114
115
116 2
def get_populate_replacement(remediation_type, text):
    """
    Return (varname, fixtextcontribution) for a bash 'populate' call.

    text is expected to contain a line of the form "populate VARNAME"
    enclosed by newlines. varname is VARNAME, and fixtextcontribution is the
    opening part of the shell assignment ('\\nVARNAME="') that goes into the
    fix text.

    Only the 'bash' remediation type is supported. Exits with status 1 for
    any other type, or when text contains no such populate line.
    """
    if remediation_type != 'bash':
        sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
                         % (remediation_type))
        sys.exit(1)

    # Extract variable name
    match = re.search(r'\npopulate (\S+)\n', text)
    if match is None:
        # Fail with a readable message instead of an AttributeError on None.
        sys.stderr.write("ERROR: No 'populate <variable>' call found in "
                         "remediation text %r!\n" % (text,))
        sys.exit(1)
    varname = match.group(1)

    # Define fix text part to contribute to main fix text
    fixtextcontribution = '\n%s="' % varname
    return (varname, fixtextcontribution)
132
133
134 2
def split_remediation_content_and_metadata(fix_file):
    """
    Split the text of a remediation into its body and its configuration.

    Lines starting with FILE_GENERATED_HASH_COMMENT are dropped. A line that
    starts with '#', contains exactly one '=' and whose key is listed in
    REMEDIATION_CONFIG_KEYS is stored in the config. All other lines are
    kept in the body.

    Returns a namedtuple (contents, config), where contents is the body
    joined by newlines and config is a defaultdict yielding None for
    missing keys.
    """
    config = defaultdict(lambda: None)
    body_lines = []

    for line in fix_file.splitlines():
        if line.startswith(FILE_GENERATED_HASH_COMMENT):
            continue

        looks_like_config = line.startswith('#') and line.count('=') == 1
        if looks_like_config:
            raw_key, raw_value = line.strip('#').split('=')
            name = raw_key.strip()
            if name in REMEDIATION_CONFIG_KEYS:
                config[name] = raw_value.strip()
                continue

        # Anything that is not a recognised config item belongs to the
        # body, including '#' lines with an '=' whose key is unknown.
        body_lines.append(line)

    remediation = namedtuple('remediation', ['contents', 'config'])
    return remediation(contents="\n".join(body_lines), config=config)
159
160
161 2
def parse_from_file_with_jinja(file_path, env_yaml):
    """
    Expand the Jinja macros of the remediation at file_path using env_yaml
    as context, then split the result into contents and configuration.

    In practice, no remediations use jinja in the configuration, so when
    only the configuration is needed, env_yaml can be an arbitrary
    product.yml dictionary.

    If the logic of configuration parsing changes significantly, please also
    update ssg.fixes.parse_platform(...).
    """
    return split_remediation_content_and_metadata(
        jinja_process_file(file_path, env_yaml))
174
175
176 2
def parse_from_file_without_jinja(file_path):
    """
    Read the remediation at file_path as-is and split it into contents and
    configuration. Jinja macros are not processed, which suits build phases
    in which all the Jinja macros are already resolved.
    """
    with open(file_path, "r") as remediation_file:
        raw_text = remediation_file.read()
    return split_remediation_content_and_metadata(raw_text)
185
186
187 2
class Remediation(object):
    """A remediation snippet file, optionally tied to the rule it fixes."""

    def __init__(self, file_path, remediation_type):
        self.file_path = file_path
        self.remediation_type = remediation_type
        # Set by load_rule_from() once the rule YAML has been loaded.
        self.associated_rule = None
        # Extra Jinja context derived from the associated rule.
        self.local_env_yaml = {}
        self.metadata = defaultdict(lambda: None)

    def load_associated_rule(self, resolved_rules_dir, rule_id):
        """Load the rule stored as "<rule_id>.yml" in resolved_rules_dir."""
        rule_fname = rule_id + ".yml"
        return self.load_rule_from(
            os.path.join(resolved_rules_dir, rule_fname))

    def load_rule_from(self, rule_path):
        """
        Load the rule from rule_path and expose its data to Jinja.

        If rule_path is not a file, a message is written to stderr and
        nothing is loaded.
        """
        if os.path.isfile(rule_path):
            self.associated_rule = build_yaml.Rule.from_yaml(rule_path)
            self.expand_env_yaml_from_rule()
            return

        msg = ("{lang} remediation snippet can't load the "
               "respective rule YML at {rule_fname}"
               .format(rule_fname=rule_path,
                       lang=self.remediation_type))
        sys.stderr.write(msg + "\n")

    def expand_env_yaml_from_rule(self):
        """Copy title and ID of the associated rule into local_env_yaml."""
        rule = self.associated_rule
        if rule:
            self.local_env_yaml["rule_title"] = rule.title
            self.local_env_yaml["rule_id"] = rule.id_

    def parse_from_file_with_jinja(self, env_yaml):
        """Parse this remediation's file, expanding Jinja with env_yaml."""
        return parse_from_file_with_jinja(self.file_path, env_yaml)
222
223
224 2
def process(remediation, env_yaml, fixes, rule_id):
    """
    Parse a fix and store it in fixes under rule_id iff its file has a valid
    extension for the remediation type and the fix applies to the current
    product (env_yaml["product"]).

    Returns the parsed fix, or None if the file extension is not supported.
    Raises RuntimeError if the fix does not declare a platform, as platform
    is a required field in the contents of the fix.
    """
    supported = is_supported_filename(
        remediation.remediation_type, remediation.file_path)
    if not supported:
        return None

    result = remediation.parse_from_file_with_jinja(env_yaml)

    platform = result.config['platform']
    if not platform:
        raise RuntimeError(
            "The '%s' remediation script does not contain the "
            "platform identifier!" % (remediation.file_path))

    if utils.is_applicable_for_product(platform, env_yaml["product"]):
        fixes[rule_id] = result

    return result
246
247
248 2
class BashRemediation(Remediation):
    """Remediation of the "bash" type."""

    def __init__(self, file_path):
        super(BashRemediation, self).__init__(
            file_path, "bash")

    def load_associated_rule(self, resolved_rules_dir, rule_id):
        """Do nothing: no point in loading rule for this type as of now."""
        return None
255
256
257 2
class AnsibleRemediation(Remediation):
    """
    Remediation of the "ansible" type.

    When a rule is associated with the snippet, parsing it also enriches
    every task (dict item of the parsed YAML) with a "when" condition and
    with tags derived from the snippet configuration and from the rule.
    """

    def __init__(self, file_path):
        super(AnsibleRemediation, self).__init__(
            file_path, "ansible")

        # Parsed (and updated) YAML body; set by parse_from_file_with_jinja()
        # only when a rule is associated.
        self.body = None

    def parse_from_file_with_jinja(self, env_yaml):
        """
        Parse the snippet with env_yaml extended by the rule-derived context.

        Without an associated rule the plain parse result is returned.
        Otherwise the contents are loaded as YAML, updated (only if env_yaml
        defines a "product"), dumped back, and the result with the new
        contents is returned. self.body and self.metadata are set as well.
        """
        self.local_env_yaml.update(env_yaml)
        result = super(AnsibleRemediation, self).parse_from_file_with_jinja(self.local_env_yaml)

        if not self.associated_rule:
            return result

        parsed = ssg.yaml.ordered_load(result.contents)

        current_product = env_yaml.get("product")
        if current_product:
            self.update(parsed, result.config, current_product)

        updated_yaml_text = ssg.yaml.ordered_dump(
            parsed, None, default_flow_style=False)
        result = result._replace(contents=updated_yaml_text)

        self.body = parsed
        self.metadata = result.config

        return result

    def update_tags_from_config(self, to_update, config):
        """Append tags derived from the keys present in config to to_update."""
        tags = to_update.get("tags", [])
        for key in ("strategy", "complexity", "disruption"):
            if key in config:
                tags.append("{0}_{1}".format(config[key], key))
        if "reboot" in config:
            if config["reboot"] == "true":
                reboot_tag = "reboot_required"
            else:
                reboot_tag = "no_reboot_needed"
            tags.append(reboot_tag)
        to_update["tags"] = tags

    def update_tags_from_rule(self, product, to_update):
        """
        Add rule-derived tags to to_update: the rule ID and severity go to
        the front, the CCE (if any) and the references to the end.

        Raises RuntimeError if no rule is associated.
        """
        if not self.associated_rule:
            raise RuntimeError("The Ansible snippet has no rule loaded.")

        tags = to_update.get("tags", [])
        tags.insert(0, "{0}_severity".format(self.associated_rule.severity))
        tags.insert(0, self.associated_rule.id_)

        cce_num = self._get_cce(product)
        if cce_num:
            tags.append("CCE-{0}".format(cce_num))

        refs = self.get_references(product)
        tags.extend(refs)
        to_update["tags"] = tags

    def _get_cce(self, product):
        """
        Return the value of the first rule identifier whose key ends with
        product, or None if there is no such identifier.
        """
        for cce, val in self.associated_rule.identifiers.items():
            if cce.endswith(product):
                return val
        return None

    def get_references(self, product):
        """
        Return the rule references formatted as "<prefix>-<value>" strings.

        Raises RuntimeError if no rule is associated.
        """
        if not self.associated_rule:
            raise RuntimeError("The Ansible snippet has no rule loaded.")
        # see xccdf-addremediations.xslt <- shared_constants.xslt <- shared_shorthand2xccdf.xslt
        # if you want to know how the map was constructed
        platform_id_map = {
            "rhel7": "DISA-STIG-RHEL-07",
            "rhel8": "DISA-STIG-RHEL-08",
        }
        # RHEL6 is a special case, in our content,
        # we have only stig IDs for RHEL6 that include the literal 'RHEL-06'
        stig_platform_id = platform_id_map.get(product, "DISA-STIG")

        # A sequence of pairs rather than a dict, so that the order of the
        # generated references does not depend on dict ordering.
        ref_prefixes = (
            ("nist", "NIST-800-53"),
            ("cui", "NIST-800-171"),
            ("pcidss", "PCI-DSS"),
            ("cjis", "CJIS"),
            ("stigid@{product}".format(product=product), stig_platform_id),
        )
        result = []
        for ref_class, prefix in ref_prefixes:
            refs = self._get_rule_reference(ref_class)
            result.extend(["{prefix}-{value}".format(prefix=prefix, value=v) for v in refs])
        return result

    def _get_rule_reference(self, ref_class):
        """Return the comma-separated references of ref_class as a list."""
        refs = self.associated_rule.references.get(ref_class, "")
        if refs:
            return refs.split(",")
        else:
            return []

    def update_when_from_rule(self, to_update):
        """
        Merge the rule-derived condition into the "when" of to_update.

        An extra condition is added only for rules whose platform is
        "machine". If the merged "when" ends up empty, the key is removed.
        """
        additional_when = ""
        if self.associated_rule.platform == "machine":
            additional_when = ('ansible_virtualization_role != "guest" '
                               'or ansible_virtualization_type != "docker"')
        to_update.setdefault("when", "")
        new_when = ssg.yaml.update_yaml_list_or_string(to_update["when"], additional_when)
        if not new_when:
            to_update.pop("when")
        else:
            to_update["when"] = new_when

    def update(self, parsed, config, product):
        """Update "when" and "tags" of every dict item of parsed."""
        for p in parsed:
            if not isinstance(p, dict):
                continue
            self.update_when_from_rule(p)
            self.update_tags_from_config(p, config)
            self.update_tags_from_rule(product, p)

    @classmethod
    def from_snippet_and_rule(cls, snippet_fname, rule_fname):
        """Create the remediation for snippet_fname and load its rule."""
        result = cls(snippet_fname)
        result.load_rule_from(rule_fname)
        return result
385
386
387 2
class AnacondaRemediation(Remediation):
    """Remediation of the "anaconda" type."""

    def __init__(self, file_path):
        super(AnacondaRemediation, self).__init__(
            file_path, "anaconda")

    def load_associated_rule(self, resolved_rules_dir, rule_id=None):
        """
        Do nothing: no point in loading rule for this type as of now.

        rule_id is accepted (and ignored) so that the method can be called
        with the same arguments as Remediation.load_associated_rule.
        """
        pass
395
396
397 2
class PuppetRemediation(Remediation):
    """Remediation of the "puppet" type."""

    def __init__(self, file_path):
        super(PuppetRemediation, self).__init__(
            file_path, "puppet")

    def load_associated_rule(self, resolved_rules_dir, rule_id=None):
        """
        Do nothing: no point in loading rule for this type as of now.

        rule_id is accepted (and ignored) so that the method can be called
        with the same arguments as Remediation.load_associated_rule.
        """
        pass
405
406
407 2
# Class implementing each supported remediation type.
REMEDIATION_TO_CLASS = dict(
    anaconda=AnacondaRemediation,
    ansible=AnsibleRemediation,
    bash=BashRemediation,
    puppet=PuppetRemediation,
)
413
414
415 2
def write_fixes_to_xml(remediation_type, build_dir, output_path, fixes):
    """
    Builds a fix-content XML tree from the contents of fixes
    and writes it to output_path.

    fixes maps rule IDs to (contents, config) pairs. Every fix becomes
    a <fix> element of the fix-group of remediation_type. The non-empty
    config values of REMEDIATION_ELM_KEYS become its attributes, and its
    text is passed through expand_xccdf_subs().
    """
    fixcontent = ElementTree.Element("fix-content", system="urn:xccdf:fix:script:sh",
                                     xmlns="http://checklists.nist.gov/xccdf/1.1")
    fixgroup = get_fixgroup_for_type(fixcontent, remediation_type)

    remediation_functions = get_available_functions(build_dir)

    for fix_name, (fix_contents, config) in fixes.items():
        fix_elm = ElementTree.SubElement(fixgroup, "fix")
        fix_elm.set("rule", fix_name)

        for key in REMEDIATION_ELM_KEYS:
            # Use get() instead of indexing: indexing a defaultdict would
            # insert a None entry for every missing key into the config.
            value = config.get(key)
            if value:
                fix_elm.set(key, value)

        fix_elm.text = fix_contents + "\n"

        # Expand shell variables and remediation functions
        # into corresponding XCCDF <sub> elements
        expand_xccdf_subs(fix_elm, remediation_type, remediation_functions)

    tree = ElementTree.ElementTree(fixcontent)
    tree.write(output_path)
445
446
447 2
def write_fixes_to_dir(fixes, remediation_type, output_dir):
    """
    Store every fix in its own file in output_dir, which is created if it
    doesn't exist. The file is named after the fix, with the extension of
    remediation_type. It starts with one "# key = value" line per config
    item, followed by the fix contents.

    Raises ValueError for an unknown remediation type.
    """
    try:
        extension = REMEDIATION_TO_EXT_MAP[remediation_type]
    except KeyError:
        raise ValueError("Unknown remediation type %s." % remediation_type)

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    for fix_name, (fix_contents, config) in fixes.items():
        fix_path = os.path.join(output_dir, fix_name + extension)
        with open(fix_path, "w") as out:
            out.writelines(
                "# %s = %s\n" % (key, value) for key, value in config.items())
            out.write(fix_contents)
465
466
467 2
def get_rule_dir_remediations(dir_path, remediation_type, product=None):
    """
    List paths of the remediations of remediation_type found in the rule
    directory dir_path. Only files accepted by rules.applies_to_product()
    for the given product are returned, ordered by priority:

        {{{ product }}}.ext -> shared.ext

    Returns an empty list if dir_path is not a rule directory or if it has
    no subdirectory for the remediation type.
    """
    if not rules.is_rule_dir(dir_path):
        return []

    remediations_dir = os.path.join(dir_path, remediation_type)
    has_remediations_dir = os.path.isdir(remediations_dir)
    ext = REMEDIATION_TO_EXT_MAP[remediation_type]
    if not has_remediations_dir:
        return []

    results = []
    for entry in os.listdir(remediations_dir):
        stem, suffix = os.path.splitext(entry)
        if suffix != ext or not rules.applies_to_product(stem, product):
            continue

        # The shared remediation goes last, anything else goes first.
        position = len(results) if stem == 'shared' else 0
        results.insert(position, os.path.join(remediations_dir, entry))

    return results
500
501
502 2
def _expand_populate_calls(fix, pattern, text):
    """Replace each match of pattern in text by a <sub> child of fix.

    pattern has to contain exactly one capturing group that matches the
    variable name, so that re.split() yields a list that looks like
    [text, varname, text, varname, ..., text].
    """
    parts = re.split(pattern, text)

    fix.text = parts[0]  # add first "text"
    for index in range(1, len(parts), 2):
        # we cannot combine elements and text easily
        # so text is in ".tail" of element
        xccdfvarsub = ElementTree.SubElement(fix, "sub", idref=parts[index])
        xccdfvarsub.tail = parts[index + 1]


def expand_xccdf_subs(fix, remediation_type, remediation_functions):
    """Expand variable and function references in the text of the fix
    element into <xccdf:sub> child elements. The fix is modified in place.

    For "ansible", any task in the form of:

            - (xccdf-var variable_name)

    becomes a set_fact task that assigns <sub idref="variable_name"/> to
    the Ansible variable of the same name. The former
    '(ansible-populate variable_name)' form is deprecated and raises
    RuntimeError.

    For "puppet" and "anaconda", any instance of
    '(puppet-populate variable_name)', or
    '(anaconda-populate variable_name)' respectively, becomes:

            <sub idref="variable_name"/>

    For "bash", scripts that don't mention 'remediation_functions' are left
    untouched. Otherwise any instance of the 'populate' function call in
    the form of:

            populate variable_name

    becomes:

            variable_name="<sub idref="variable_name"/>"

    and any instance of some other known remediation function (one of
    remediation_functions, e.g. 'replace_or_append') in the form of:

            function_name "arg1" "arg2" ... "argN"

    becomes:

            <sub idref="function_function_name"/>
            function_name "arg1" "arg2" ... "argN"

    Exits with status 1 for an unknown remediation type, or when the bash
    expansion fails.
    """

    if remediation_type == "ansible":
        fix_text = fix.text

        if "(ansible-populate " in fix_text:
            raise RuntimeError(
                "(ansible-populate VAR) has been deprecated. Please use "
                "(xccdf-var VAR) instead. Keep in mind that the latter will "
                "make an ansible variable out of XCCDF Value as opposed to "
                "substituting directly."
            )

        # If you change this string make sure it still matches the pattern
        # defined in OpenSCAP. Otherwise you break variable handling in
        # 'oscap xccdf generate fix' and the variables won't be customizable!
        # https://github.com/OpenSCAP/openscap/blob/1.2.17/src/XCCDF_POLICY/xccdf_policy_remediate.c#L588
        #   const char *pattern =
        #     "- name: XCCDF Value [^ ]+ # promote to variable\n  set_fact:\n"
        #     "    ([^:]+): (.+)\n  tags:\n    - always\n";
        # We use !!str typecast to prevent treating values as different types
        # eg. yes as a bool or 077 as an octal number
        fix_text = re.sub(
            r"- \(xccdf-var\s+(\S+)\)",
            r"- name: XCCDF Value \1 # promote to variable\n"
            r"  set_fact:\n"
            r"    \1: !!str (ansible-populate \1)\n"
            r"  tags:\n"
            r"    - always",
            fix_text
        )

        # The substitution above produced (ansible-populate VAR) markers,
        # which are now turned into the actual <sub> elements.
        _expand_populate_calls(fix, r'\(ansible-populate\s*(\S+)\)', fix_text)
        return

    elif remediation_type == "puppet":
        _expand_populate_calls(fix, r'\(puppet-populate\s*(\S+)\)', fix.text)
        return

    elif remediation_type == "anaconda":
        _expand_populate_calls(fix, r'\(anaconda-populate\s*(\S+)\)', fix.text)
        return

    elif remediation_type == "bash":
        # This remediation script doesn't utilize internal remediation functions
        # Skip it without any further processing
        if 'remediation_functions' not in fix.text:
            return

        # This remediation script utilizes some of internal remediation functions
        # Expand shell variables and remediation functions calls with <xccdf:sub>
        # elements
        pattern = r'\n+(\s*(?:' + r'|'.join(remediation_functions) + r')[^\n]*)\n'
        patcomp = re.compile(pattern, re.DOTALL)
        fixparts = re.split(patcomp, fix.text)
        if fixparts[0] is not None:
            # Split the portion of fix.text from fix start to first call of
            # remediation function, keeping only the third part:
            # * tail        to hold part of the fix.text after inclusion,
            #               but before first call of remediation function
            try:
                rfpattern = '(.*remediation_functions)(.*)'
                rfpatcomp = re.compile(rfpattern, re.DOTALL)
                _, _, tail, _ = re.split(rfpatcomp, fixparts[0], maxsplit=2)
            except ValueError:
                sys.stderr.write("Processing fix.text for: %s rule\n"
                                 % fix.get('rule'))
                sys.stderr.write("Unable to extract part of the fix.text "
                                 "after inclusion of remediation functions."
                                 " Aborting..\n")
                sys.exit(1)
            # If the 'tail' is not empty, make it new fix.text.
            # Otherwise use ''
            fix.text = tail if tail is not None else ''
            # Drop the first element of 'fixparts' since it has been processed
            fixparts.pop(0)
            # Perform sanity check on new 'fixparts' list content (to continue
            # successfully 'fixparts' has to contain even count of elements)
            if len(fixparts) % 2 != 0:
                sys.stderr.write("Error performing XCCDF expansion on "
                                 "remediation script: %s\n"
                                 % fix.get("rule"))
                sys.stderr.write("Invalid count of elements. Exiting!\n")
                sys.exit(1)
            # Process remaining 'fixparts' elements in pairs
            # First pair element is remediation function to be XCCDF expanded
            # Second pair element (if not empty) is the portion of the original
            # fix text to be used in newly added sublement's tail
            for idx in range(0, len(fixparts), 2):
                # We previously removed enclosing newlines when creating
                # fixparts list. Add them back and reuse the above 'pattern'
                fixparts[idx] = "\n%s\n" % fixparts[idx]
                # Sanity check (verify the first field truly contains call of
                # some of the remediation functions)
                if re.match(pattern, fixparts[idx], re.DOTALL) is not None:
                    # This chunk contains call of 'populate' function
                    if "populate" in fixparts[idx]:
                        varname, fixtextcontrib = get_populate_replacement(remediation_type,
                                                                           fixparts[idx])
                        # Define new XCCDF <sub> element for the variable
                        xccdfvarsub = ElementTree.Element("sub", idref=varname)

                        # If this is first sub element,
                        # the textcontribution needs to go to fix text
                        # otherwise, append to last subelement
                        nfixchildren = len(list(fix))
                        if nfixchildren == 0:
                            fix.text += fixtextcontrib
                        else:
                            previouselem = fix[nfixchildren-1]
                            previouselem.tail += fixtextcontrib

                        # If second pair element is not empty, append it as
                        # tail for the subelement (prefixed with closing '"')
                        if fixparts[idx + 1] is not None:
                            xccdfvarsub.tail = '"' + '\n' + fixparts[idx + 1]
                        # Otherwise append just enclosing '"'
                        else:
                            xccdfvarsub.tail = '"' + '\n'
                        # Append the new subelement to the fix element
                        fix.append(xccdfvarsub)
                    # This chunk contains call of other remediation function
                    else:
                        # Extract remediation function name
                        funcname = re.search(r'\n\s*(\S+)(| .*)\n',
                                             fixparts[idx],
                                             re.DOTALL).group(1)
                        # Define new XCCDF <sub> element for the function
                        xccdffuncsub = ElementTree.Element(
                            "sub", idref='function_%s' % funcname)
                        # Append original function call into tail of the
                        # subelement
                        xccdffuncsub.tail = fixparts[idx]
                        # If the second element of the pair is not empty,
                        # append it to the tail of the subelement too
                        if fixparts[idx + 1] is not None:
                            xccdffuncsub.tail += fixparts[idx + 1]
                        # Append the new subelement to the fix element
                        fix.append(xccdffuncsub)
                        # Ensure the newly added <xccdf:sub> element for the
                        # function will be always inserted at newline
                        # If xccdffuncsub is the first <xccdf:sub> element
                        # being added as child of <fix> and fix.text doesn't
                        # end up with newline character, append the newline
                        # to the fix.text
                        if list(fix).index(xccdffuncsub) == 0:
                            if re.search(r'.*\n$', fix.text) is None:
                                fix.text += '\n'
                        # If xccdffuncsub isn't the first child (first
                        # <xccdf:sub> being added), and tail of previous
                        # child doesn't end up with newline, append the newline
                        # to the tail of previous child
                        else:
                            previouselem = fix[list(fix).index(xccdffuncsub) - 1]
                            if re.search(r'.*\n$', previouselem.tail) is None:
                                previouselem.tail += '\n'

        # Perform a sanity check if all known remediation function calls have been
        # properly XCCDF substituted. Exit with failure if some wasn't

        # First concat output form of modified fix text (including text appended
        # to all children of the fix)
        modfix = [fix.text]
        # Iterate the element directly; Element.getchildren() is gone since
        # Python 3.9.
        for child in fix:
            if child is not None and child.text is not None:
                modfix.append(child.text)
        modfixtext = "".join(modfix)
        for func in remediation_functions:
            # Then define expected XCCDF sub element form for this function
            funcxccdfsub = "<sub idref=\"function_%s\"" % func
            # Finally perform the sanity check -- if function was properly XCCDF
            # substituted both the original function call and XCCDF <sub> element
            # for that function need to be present in the modified text of the fix
            # Otherwise something went wrong, thus exit with failure
            if func in modfixtext and funcxccdfsub not in modfixtext:
                sys.stderr.write("Error performing XCCDF <sub> substitution "
                                 "for function %s in %s fix. Exiting...\n"
                                 % (func, fix.get("rule")))
                sys.exit(1)
    else:
        sys.stderr.write("Unknown remediation type '%s'\n" % (remediation_type))
        sys.exit(1)
756