Test Failed
Push — master ( 36ef11...81e955 )
by Jan
03:06 queued 24s
created

parse_from_file_with_jinja()   A

Complexity

Conditions 1

Size

Total Lines 13
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 2
dl 0
loc 13
ccs 3
cts 3
cp 1
crap 1
rs 10
c 0
b 0
f 0
1 2
from __future__ import absolute_import
2 2
from __future__ import print_function
3
4 2
import sys
5 2
import os
6 2
import os.path
7 2
import re
8 2
from collections import defaultdict, namedtuple, OrderedDict
9
10 2
import ssg.yaml
11 2
from . import build_yaml
12 2
from . import rules
13 2
from . import utils
14
15 2
from . import constants
16 2
from .jinja import process_file_with_macros as jinja_process_file
17
18 2
from .xml import ElementTree
19
20 2
# File extension expected for the remediation files of each supported
# remediation type.
REMEDIATION_TO_EXT_MAP = dict(
    anaconda='.anaconda',
    ansible='.yml',
    bash='.sh',
    puppet='.pp',
    ignition='.yml',
    kubernetes='.yml',
    blueprint='.toml',
)

# Lines starting with this marker are dropped when a remediation file is
# split into contents and configuration.
FILE_GENERATED_HASH_COMMENT = '# THIS FILE IS GENERATED'

# Keys recognized in "# key = value" header lines of a remediation file.
REMEDIATION_CONFIG_KEYS = [
    'complexity', 'disruption', 'platform', 'reboot', 'strategy',
]

# Configuration keys that are copied onto the XML <fix> element as attributes.
REMEDIATION_ELM_KEYS = [
    'complexity', 'disruption', 'reboot', 'strategy',
]
36
37
38 2
def get_fixgroup_for_type(fixcontent, remediation_type):
    """
    Create the "fix-group" subelement of fixcontent for a remediation type.

    The new element carries the remediation type as its id, the "system"
    URN associated with that type and the XCCDF 1.1 namespace, and is
    returned to the caller.

    Writes an error to stderr and exits with status 1 when the remediation
    type is not one of the known types.
    """
    # (remediation type, value of the "system" attribute)
    known_systems = (
        ('anaconda', "urn:redhat:anaconda:pre"),
        ('ansible', "urn:xccdf:fix:script:ansible"),
        ('bash', "urn:xccdf:fix:script:sh"),
        ('puppet', "urn:xccdf:fix:script:puppet"),
        ('ignition', "urn:xccdf:fix:script:ignition"),
        ('kubernetes', "urn:xccdf:fix:script:kubernetes"),
        ('blueprint', "urn:redhat:osbuild:blueprint"),
    )

    for type_name, system_urn in known_systems:
        if remediation_type == type_name:
            return ElementTree.SubElement(
                fixcontent, "fix-group", id=type_name,
                system=system_urn,
                xmlns="http://checklists.nist.gov/xccdf/1.1")

    sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
                     % (remediation_type))
    sys.exit(1)
89
90
91 2
def is_supported_filename(remediation_type, filename):
    """
    Tell whether filename ends with the extension registered for
    remediation_type in REMEDIATION_TO_EXT_MAP.

    Writes an error to stderr and exits with status 1 when
    remediation_type is not a known remediation type.
    """
    try:
        expected_ext = REMEDIATION_TO_EXT_MAP[remediation_type]
    except KeyError:
        sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
                         % (remediation_type))
        sys.exit(1)

    return filename.endswith(expected_ext)
103
104
105 2
def split_remediation_content_and_metadata(fix_file):
    """
    Separate the text of a remediation into its body and its configuration.

    fix_file is the whole remediation as a single string. Lines of the form
    "# key = value" whose key is listed in REMEDIATION_CONFIG_KEYS are
    collected into the configuration; lines starting with
    FILE_GENERATED_HASH_COMMENT are discarded; every other line is kept as
    part of the body.

    Returns a namedtuple with fields "contents" (the body lines joined by
    newlines) and "config" (a defaultdict yielding None for missing keys).
    """
    body_lines = []
    config = defaultdict(lambda: None)

    for raw_line in fix_file.splitlines():
        if raw_line.startswith(FILE_GENERATED_HASH_COMMENT):
            continue

        is_candidate = raw_line.startswith('#') and raw_line.count('=') == 1
        if is_candidate:
            # Exactly one '=' is present, so this yields the key and the value.
            name, _, setting = raw_line.strip('#').partition('=')
            name = name.strip()
            if name in REMEDIATION_CONFIG_KEYS:
                config[name] = setting.strip()
                continue

        # Anything that is not a recognized configuration item stays in the
        # body, including "# something = value" comments with unknown keys.
        body_lines.append(raw_line)

    remediation = namedtuple('remediation', ['contents', 'config'])
    return remediation(contents="\n".join(body_lines), config=config)
130
131
132 2
def parse_from_file_with_jinja(file_path, env_yaml):
    """
    Parse a remediation from a file after expanding its Jinja macros.

    Remediations contain Jinja macros, so an env_yaml context is needed to
    process them. In practice no remediation uses Jinja in its
    configuration, so when only the configuration is of interest, env_yaml
    can be an arbitrary product.yml dictionary.

    Returns the result of split_remediation_content_and_metadata() applied
    to the expanded text.

    If the logic of configuration parsing changes significantly, please also
    update ssg.fixes.parse_platform(...).
    """
    expanded_text = jinja_process_file(file_path, env_yaml)
    return split_remediation_content_and_metadata(expanded_text)
145
146
147 2
def parse_from_file_without_jinja(file_path):
    """
    Parse a remediation from a file without processing Jinja macros.

    Useful in build phases in which all the Jinja macros have already been
    resolved. Returns the result of
    split_remediation_content_and_metadata() applied to the file's text.
    """
    with open(file_path, "r") as remediation_file:
        raw_text = remediation_file.read()

    return split_remediation_content_and_metadata(raw_text)
156
157
158 2
class Remediation(object):
    """
    Base class for a remediation stored in a file.

    Keeps the path of the file, the remediation type, an optional
    associated rule and a local environment dictionary that is enriched
    with data of that rule.
    """

    def __init__(self, file_path, remediation_type):
        self.file_path = file_path
        self.remediation_type = remediation_type
        self.associated_rule = None

        # Extra template variables derived from the associated rule.
        self.local_env_yaml = dict()
        # Missing metadata keys read as None.
        self.metadata = defaultdict(lambda: None)

    def associate_rule(self, rule_obj):
        """Remember rule_obj and refresh the local environment from it."""
        self.associated_rule = rule_obj
        self.expand_env_yaml_from_rule()

    def expand_env_yaml_from_rule(self):
        """
        Copy title, id and identifiers of the associated rule into the
        local environment. Does nothing when no rule is associated.
        """
        rule = self.associated_rule
        if rule:
            self.local_env_yaml["rule_title"] = rule.title
            self.local_env_yaml["rule_id"] = rule.id_
            self.local_env_yaml["cce_identifiers"] = rule.identifiers

    def parse_from_file_with_jinja(self, env_yaml, cpe_platforms):
        """
        Parse this remediation's file with the given env_yaml.

        cpe_platforms is accepted for interface compatibility with the
        subclasses and is not used here.
        """
        return parse_from_file_with_jinja(self.file_path, env_yaml)
182
183
184 2
def process(remediation, env_yaml, cpe_platforms):
    """
    Process a fix and return the parsed result if and only if the file has
    a valid extension for the remediation type and the fix applies to the
    current product (env_yaml["product"]). Returns None otherwise.

    Note that platform is a required field in the contents of the fix:
    RuntimeError is raised when it is missing, and ValueError is raised
    when the comma-separated platform list contains whitespace around
    any of its items.
    """
    if not is_supported_filename(remediation.remediation_type, remediation.file_path):
        return None

    result = remediation.parse_from_file_with_jinja(env_yaml, cpe_platforms)
    platforms = result.config['platform']

    if not platforms:
        raise RuntimeError(
            "The '%s' remediation script does not contain the "
            "platform identifier!" % (remediation.file_path))

    has_padded_item = any(
        item != item.strip() for item in platforms.split(","))
    if has_padded_item:
        raise ValueError(
            "Comma-separated '{platform}' platforms "
            "in '{remediation_file}' contains whitespace."
            .format(platform=platforms, remediation_file=remediation.file_path))

    if utils.is_applicable_for_product(platforms, env_yaml["product"]):
        return result

    return None
216
217
218 2
class BashRemediation(Remediation):
    """
    Bash remediation. After parsing, a non-empty fix is wrapped in a shell
    "if" statement built from the platform conditionals of the associated
    rule.
    """

    def __init__(self, file_path):
        super(BashRemediation, self).__init__(file_path, "bash")

    def parse_from_file_with_jinja(self, env_yaml, cpe_platforms):
        """
        Parse the remediation and guard its body with platform checks.

        cpe_platforms maps platform names to objects providing
        to_bash_conditional(). Returns a namedtuple with "contents" and
        "config"; the contents are wrapped only when at least one
        non-empty conditional exists.
        """
        self.local_env_yaml.update(env_yaml)
        parsed = super(BashRemediation, self).parse_from_file_with_jinja(
            self.local_env_yaml, cpe_platforms)

        # Avoid platform wrapping empty fix text. A remediation can be empty
        # when a Jinja macro or conditional renders no fix text for a product.
        body = parsed.contents.strip()
        if body == "":
            return parsed

        inherited_names = set()
        rule_only_names = set()
        if self.associated_rule:
            # Inherited platforms and rule platforms can repeat each other;
            # a platform that is inherited is not counted as rule specific.
            inherited_names.update(self.associated_rule.inherited_cpe_platform_names)
            if self.associated_rule.cpe_platform_names is not None:
                rule_only_names = {
                    name for name in self.associated_rule.cpe_platform_names
                    if name not in inherited_names}

        inherited_raw = [
            cpe_platforms[name].to_bash_conditional()
            for name in inherited_names]
        rule_only_raw = [
            cpe_platforms[name].to_bash_conditional()
            for name in rule_only_names]

        # Drop empty conditionals and make the order deterministic.
        inherited = sorted([cond for cond in inherited_raw if cond != ''])
        rule_only = sorted([cond for cond in rule_only_raw if cond != ''])

        if not (inherited or rule_only):
            return parsed

        # Inherited conditionals must all hold; of the rule specific ones,
        # any single one is sufficient.
        condition = ""
        if inherited:
            condition += " && ".join(inherited)
        if rule_only:
            alternatives = " || ".join(rule_only)
            if condition:
                condition += " && { " + alternatives + "; }"
            else:
                condition = alternatives

        # The original body is deliberately not indented: textwrap.indent()
        # is not available on python2, and indenting could break
        # remediations, e.g. those containing a here-doc block.
        wrapped_lines = [
            "# Remediation is applicable only in certain platforms",
            "if {0}; then".format(condition),
            "",
            "{0}".format(body),
            "",
            "else",
            "    >&2 echo 'Remediation is not applicable, nothing was done'",
            "fi",
        ]

        remediation = namedtuple('remediation', ['contents', 'config'])
        return remediation(contents="\n".join(wrapped_lines), config=parsed.config)
283
284
285 2
class AnsibleRemediation(Remediation):
    """
    Ansible remediation snippet.

    When a rule is associated, parsing also rewrites the snippet: platform
    conditionals are merged into the "when" clause of every task, a
    package_facts task is injected when needed, and tags derived from the
    remediation configuration and from the rule are added to every task.
    """

    def __init__(self, file_path):
        super(AnsibleRemediation, self).__init__(
            file_path, "ansible")

        # Parsed snippet; filled in by parse_from_file_with_jinja() when a
        # rule is associated.
        self.body = None

    def parse_from_file_with_jinja(self, env_yaml, cpe_platforms):
        """
        Parse the snippet and, if a rule is associated, update its tasks.

        Returns a namedtuple with "contents" and "config". Without an
        associated rule the parsed result is returned unchanged; otherwise
        the contents are replaced by the re-serialized, updated snippet and
        self.body / self.metadata are set.
        """
        self.local_env_yaml.update(env_yaml)
        result = super(AnsibleRemediation, self).parse_from_file_with_jinja(
            self.local_env_yaml, cpe_platforms)

        if not self.associated_rule:
            return result

        parsed = ssg.yaml.ordered_load(result.contents)

        self.update(parsed, result.config, cpe_platforms)

        updated_yaml_text = ssg.yaml.ordered_dump(
            parsed, None, default_flow_style=False)
        result = result._replace(contents=updated_yaml_text)

        self.body = parsed
        self.metadata = result.config

        return result

    def update_tags_from_config(self, to_update, config):
        """
        Add tags derived from the remediation configuration to a task.

        Only keys actually present in config contribute a tag. The
        resulting tag list is stored sorted in to_update["tags"].
        """
        tags = to_update.get("tags", [])
        if "strategy" in config:
            tags.append("{0}_strategy".format(config["strategy"]))
        if "complexity" in config:
            tags.append("{0}_complexity".format(config["complexity"]))
        if "disruption" in config:
            tags.append("{0}_disruption".format(config["disruption"]))
        if "reboot" in config:
            if config["reboot"] == "true":
                reboot_tag = "reboot_required"
            else:
                reboot_tag = "no_reboot_needed"
            tags.append(reboot_tag)
        to_update["tags"] = sorted(tags)

    def update_tags_from_rule(self, to_update):
        """
        Add the rule id, severity, CCE and reference tags to a task.

        Raises RuntimeError when no rule is associated. The resulting tag
        list is stored sorted in to_update["tags"].
        """
        if not self.associated_rule:
            raise RuntimeError("The Ansible snippet has no rule loaded.")

        tags = to_update.get("tags", [])
        tags.insert(0, "{0}_severity".format(self.associated_rule.severity))
        tags.insert(0, self.associated_rule.id_)

        cce_num = self._get_cce()
        if cce_num:
            tags.append("{0}".format(cce_num))

        refs = self.get_references()
        tags.extend(refs)
        to_update["tags"] = sorted(tags)

    def _get_cce(self):
        """Return the "cce" identifier of the associated rule, or None."""
        return self.associated_rule.identifiers.get("cce", None)

    def get_references(self):
        """
        Return the rule's references as "<prefix>-<value>" strings for
        every reference class listed in constants.REF_PREFIX_MAP.

        Raises RuntimeError when no rule is associated.
        """
        if not self.associated_rule:
            raise RuntimeError("The Ansible snippet has no rule loaded.")

        result = []
        for ref_class, prefix in constants.REF_PREFIX_MAP.items():
            refs = self._get_rule_reference(ref_class)
            result.extend(["{prefix}-{value}".format(prefix=prefix, value=v) for v in refs])
        return result

    def _get_rule_reference(self, ref_class):
        """
        Return the comma-separated references of class ref_class of the
        associated rule as a list; an empty list when there are none.
        """
        refs = self.associated_rule.references.get(ref_class, "")
        if refs:
            return refs.split(",")
        else:
            return []

    def inject_package_facts_task(self, parsed_snippet):
        """ Injects a package_facts task only if
            the snippet has a task with a when clause with ansible_facts.packages,
            and the snippet doesn't already have a package_facts task
        """
        has_package_facts_task = False
        has_ansible_facts_packages_clause = False

        for p_task in parsed_snippet:
            # We are only interested in the dicts, which represent Ansible tasks
            if not isinstance(p_task, dict):
                continue

            if "package_facts" in p_task:
                has_package_facts_task = True

            # The when clause of a task can be a single condition or a list
            # of conditions; normalize to a list. Checking for "not a list"
            # rather than "exactly str" keeps scalar conditions of other
            # string types from being iterated character by character.
            task_when = p_task.get("when", "")
            if not isinstance(task_when, (list, tuple)):
                task_when = [task_when]
            for when in task_when:
                if "ansible_facts.packages" in when:
                    has_ansible_facts_packages_clause = True

        if has_ansible_facts_packages_clause and not has_package_facts_task:
            # Built from a sequence of pairs so that the key order is
            # guaranteed; a dict literal does not guarantee it on every
            # interpreter this module supports.
            facts_task = OrderedDict([
                ('name', 'Gather the package facts'),
                ('package_facts', {'manager': 'auto'}),
            ])
            parsed_snippet.insert(0, facts_task)

    def update_when_from_rule(self, to_update, cpe_platforms):
        """
        Merge the platform conditionals of the associated rule into the
        "when" clause of the task to_update.

        cpe_platforms maps platform names to objects providing
        to_ansible_conditional(). Each inherited conditional becomes its
        own entry; the rule specific ones are combined with " or " into a
        single entry. The "when" key is removed if the merged clause ends
        up empty.
        """
        additional_when = []
        rule_specific_cpe_platform_names = set()
        inherited_cpe_platform_names = set()
        if self.associated_rule:
            # There can be repeated inherited platforms and rule platforms
            inherited_cpe_platform_names.update(self.associated_rule.inherited_cpe_platform_names)
            if self.associated_rule.cpe_platform_names is not None:
                rule_specific_cpe_platform_names = {
                    p for p in self.associated_rule.cpe_platform_names
                    if p not in inherited_cpe_platform_names}

        inherited_conditionals = [
            cpe_platforms[p].to_ansible_conditional()
            for p in inherited_cpe_platform_names]
        rule_specific_conditionals = [
            cpe_platforms[p].to_ansible_conditional()
            for p in rule_specific_cpe_platform_names]
        # remove empty conditionals from the lists
        inherited_conditionals = sorted([
            p for p in inherited_conditionals if p != ''])
        rule_specific_conditionals = sorted([
            p for p in rule_specific_conditionals if p != ''])

        # remove conditionals related to package CPEs if the updated task
        # collects package facts
        if "package_facts" in to_update:
            inherited_conditionals = [
                c for c in inherited_conditionals if "in ansible_facts.packages" not in c]
            rule_specific_conditionals = [
                c for c in rule_specific_conditionals if "in ansible_facts.packages" not in c]

        if inherited_conditionals:
            additional_when = additional_when + inherited_conditionals

        if rule_specific_conditionals:
            additional_when.append(" or ".join(rule_specific_conditionals))

        to_update.setdefault("when", "")
        new_when = ssg.yaml.update_yaml_list_or_string(to_update["when"], additional_when,
                                                       prepend=True)
        if not new_when:
            to_update.pop("when")
        else:
            to_update["when"] = new_when

    def update(self, parsed, config, cpe_platforms):
        """
        Update every task (dict entry) of the parsed snippet in place.

        Entries of parsed that are not dicts are left untouched.
        """
        # We split the remediation update in three steps

        # 1. Update the when clause
        for p in parsed:
            if not isinstance(p, dict):
                continue
            self.update_when_from_rule(p, cpe_platforms)

        # 2. Inject any extra task necessary
        self.inject_package_facts_task(parsed)

        # 3. Add tags to all tasks, including the ones we have injected
        for p in parsed:
            if not isinstance(p, dict):
                continue
            self.update_tags_from_config(p, config)
            self.update_tags_from_rule(p)

    @classmethod
    def from_snippet_and_rule(cls, snippet_fname, rule_fname):
        """
        Build a remediation for snippet_fname associated with the rule
        loaded from rule_fname.

        Returns None when either file does not exist, or when loading the
        rule raises ssg.yaml.DocumentationNotComplete.
        """
        if not (os.path.isfile(snippet_fname) and os.path.isfile(rule_fname)):
            return None

        result = cls(snippet_fname)
        try:
            rule_obj = build_yaml.Rule.from_yaml(rule_fname)
            result.associate_rule(rule_obj)
        except ssg.yaml.DocumentationNotComplete:
            # Happens on non-debug build when a rule is "documentation-incomplete"
            return None
        return result
470
471
472 2
class AnacondaRemediation(Remediation):
    """Remediation whose remediation type is "anaconda"."""

    def __init__(self, file_path):
        super(AnacondaRemediation, self).__init__(file_path, "anaconda")
476
477
478 2
class PuppetRemediation(Remediation):
    """Remediation whose remediation type is "puppet"."""

    def __init__(self, file_path):
        super(PuppetRemediation, self).__init__(file_path, "puppet")
482
483
484 2
class IgnitionRemediation(Remediation):
    """Remediation whose remediation type is "ignition"."""

    def __init__(self, file_path):
        super(IgnitionRemediation, self).__init__(file_path, "ignition")
488
489
490 2
class KubernetesRemediation(Remediation):
    """Remediation whose remediation type is "kubernetes"."""

    def __init__(self, file_path):
        super(KubernetesRemediation, self).__init__(file_path, "kubernetes")
494
495
496 2
class BlueprintRemediation(Remediation):
    """
    Remediation class for OSBuild Blueprint remediations
    (remediation type "blueprint").
    """

    def __init__(self, file_path):
        super(BlueprintRemediation, self).__init__(file_path, "blueprint")
503
504
505 2
# Remediation class that handles each supported remediation type.
REMEDIATION_TO_CLASS = dict(
    anaconda=AnacondaRemediation,
    ansible=AnsibleRemediation,
    bash=BashRemediation,
    puppet=PuppetRemediation,
    ignition=IgnitionRemediation,
    kubernetes=KubernetesRemediation,
    blueprint=BlueprintRemediation,
)
514
515
516 2
def write_fixes_to_xml(remediation_type, build_dir, output_path, fixes):
    """
    Build a fix-content XML tree from the contents of fixes and write it
    to output_path.

    fixes maps a rule name to a (contents, config) pair. Every pair
    becomes a <fix> element inside the fix-group of remediation_type; the
    truthy config values of the keys in REMEDIATION_ELM_KEYS are set as
    attributes. build_dir is not used by this function.
    """
    root = ElementTree.Element(
        "fix-content", system="urn:xccdf:fix:script:sh",
        xmlns="http://checklists.nist.gov/xccdf/1.1")
    group = get_fixgroup_for_type(root, remediation_type)

    for rule_name in fixes:
        body, config = fixes[rule_name]

        fix_elm = ElementTree.SubElement(group, "fix")
        fix_elm.set("rule", rule_name)

        for attr_name in REMEDIATION_ELM_KEYS:
            attr_value = config[attr_name]
            if attr_value:
                fix_elm.set(attr_name, attr_value)

        fix_elm.text = body + "\n"

        # Expand shell variables and remediation functions
        # into corresponding XCCDF <sub> elements
        expand_xccdf_subs(fix_elm, remediation_type)

    ElementTree.ElementTree(root).write(output_path)
544
545
546 2
def write_fix_to_file(fix, file_path):
    """
    Write a single fix to the given file path.

    fix is a (contents, config) pair. Every config item is written first
    as a "# key = value" line, followed by the contents.
    """
    body, config = fix
    header_lines = ["# %s = %s\n" % (key, value) for key, value in config.items()]

    with open(file_path, "w") as out:
        out.writelines(header_lines)
        out.write(body)
555
556
557 2
def write_fixes_to_dir(fixes, remediation_type, output_dir):
    """
    Write fixes as files to output_dir, each fix as a separate file named
    after the fix with the extension of remediation_type.

    output_dir is created when it does not exist. Raises ValueError when
    remediation_type is unknown.
    """
    if remediation_type not in REMEDIATION_TO_EXT_MAP:
        raise ValueError("Unknown remediation type %s." % remediation_type)
    extension = REMEDIATION_TO_EXT_MAP[remediation_type]

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    for fix_name, fix in fixes.items():
        write_fix_to_file(fix, os.path.join(output_dir, fix_name + extension))
571
572
573 2
def get_rule_dir_remediations(dir_path, remediation_type, product=None):
    """
    Get the paths of the remediations of type remediation_type contained
    in a rule directory. If product is None, all such remediations are
    returned. If product is not None, the applicable remediations are
    returned in order of priority:

        {{{ product }}}.ext -> shared.ext

    Only remediations which exist are returned; the result is an empty
    list when dir_path is not a rule directory or has no subdirectory for
    the remediation type.
    """
    if not rules.is_rule_dir(dir_path):
        return []

    remediations_dir = os.path.join(dir_path, remediation_type)
    dir_exists = os.path.isdir(remediations_dir)
    expected_ext = REMEDIATION_TO_EXT_MAP[remediation_type]
    if not dir_exists:
        return []

    # Three priority classes, from highest to lowest:
    #   1. files named exactly after the product (product holds
    #      <product><version>, e.g. ubuntu2004),
    #   2. files named after something else that still applies to the
    #      product (e.g. "ubuntu"),
    #   3. files named "shared".
    exact_product = []
    product_family = []
    shared = []
    for entry in sorted(os.listdir(remediations_dir)):
        stem, suffix = os.path.splitext(entry)
        if suffix != expected_ext or not rules.applies_to_product(stem, product):
            continue

        entry_path = os.path.join(remediations_dir, entry)
        if stem == 'shared':
            shared.append(entry_path)
        elif stem != product:
            product_family.append(entry_path)
        else:
            exact_product.append(entry_path)

    # Family-wide files are listed latest-seen first, ahead of the shared
    # ones, and everything product specific comes before both.
    return exact_product + product_family[::-1] + shared
630
631
632 2
def expand_xccdf_subs(fix, remediation_type):
    """Expand the populate keywords of a remediation into <sub> elements.

    This routine translates any instance of the '`type`-populate' keyword in
    the text of the fix element, in the form of:

            (`type`-populate variable_name)

    where `type` is one of puppet, anaconda or bash, into

            <sub idref="variable_name"/>

    For ansible, each "- (xccdf-var variable_name)" item is first rewritten
    into a set_fact task whose value is then substituted the same way;
    using "(ansible-populate variable_name)" directly raises RuntimeError
    because it has been deprecated.

    Nothing is done when fix is None, when the fix element has no text, or
    for the ignition, kubernetes and blueprint remediation types. An
    unknown remediation type is reported on stderr and the process exits
    with status 1.
    """
    if fix is None:
        return

    # These remediation types do not use XCCDF variable substitution.
    if remediation_type in ("ignition", "kubernetes", "blueprint"):
        return

    # Validate up front so that the pattern below is always defined.
    if remediation_type not in ("ansible", "puppet", "anaconda", "bash"):
        sys.stderr.write("Unknown remediation type '%s'\n" % (remediation_type))
        sys.exit(1)

    fix_text = fix.text
    if fix_text is None:
        # An element without text has nothing to expand; searching None
        # would raise TypeError.
        return

    if remediation_type == "ansible":
        if "(ansible-populate " in fix_text:
            raise RuntimeError(
                "(ansible-populate VAR) has been deprecated. Please use "
                "(xccdf-var VAR) instead. Keep in mind that the latter will "
                "make an ansible variable out of XCCDF Value as opposed to "
                "substituting directly."
            )

        # If you change this string make sure it still matches the pattern
        # defined in OpenSCAP. Otherwise you break variable handling in
        # 'oscap xccdf generate fix' and the variables won't be customizable!
        # https://github.com/OpenSCAP/openscap/blob/1.2.17/src/XCCDF_POLICY/xccdf_policy_remediate.c#L588
        #   const char *pattern =
        #     "- name: XCCDF Value [^ ]+ # promote to variable\n  set_fact:\n"
        #     "    ([^:]+): (.+)\n  tags:\n    - always\n";
        # We use !!str typecast to prevent treating values as different types
        # eg. yes as a bool or 077 as an octal number
        fix_text = re.sub(
            r"- \(xccdf-var\s+(\S+)\)",
            r"- name: XCCDF Value \1 # promote to variable\n"
            r"  set_fact:\n"
            r"    \1: !!str (ansible-populate \1)\n"
            r"  tags:\n"
            r"    - always",
            fix_text
        )

    pattern = r'\(%s-populate\s*(\S+)\)' % remediation_type

    # The pattern has one capturing group, so the split yields
    # [text, varname, text, varname, ..., text]
    parts = re.split(pattern, fix_text)

    fix.text = parts[0]  # the text before the first variable
    for varname, text_after_var in zip(parts[1::2], parts[2::2]):
        # Elements and text cannot be interleaved directly, so the text
        # following a variable is stored in the ".tail" of its element.
        xccdfvarsub = ElementTree.SubElement(fix, "sub", idref=varname)
        xccdfvarsub.tail = text_after_var
714