Test Failed
Push — master ( 9b9e3a...2d11b6 )
by Jan
02:26 queued 13s
created

AnsibleRemediation.update_when_from_rule()   B

Complexity

Conditions 6

Size

Total Lines 42
Code Lines 31

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 19
CRAP Score 6.1893

Importance

Changes 0
Metric Value
cc 6
eloc 31
nop 2
dl 0
loc 42
ccs 19
cts 23
cp 0.8261
crap 6.1893
rs 8.2026
c 0
b 0
f 0
1 2
from __future__ import absolute_import
2 2
from __future__ import print_function
3
4 2
import sys
5 2
import os
6 2
import os.path
7 2
import re
8 2
from collections import defaultdict, namedtuple, OrderedDict
9
10 2
import ssg.yaml
11 2
from . import build_yaml
12 2
from . import rules
13 2
from . import utils
14
15 2
from . import constants
16 2
from .jinja import process_file_with_macros as jinja_process_file
17
18 2
from .xml import ElementTree
19
20 2
# File extension used by the snippets of each supported remediation type.
REMEDIATION_TO_EXT_MAP = dict(
    anaconda='.anaconda',
    ansible='.yml',
    bash='.sh',
    puppet='.pp',
    ignition='.yml',
    kubernetes='.yml',
    blueprint='.toml',
)

# Shell command templates, keyed by package manager, that succeed when the
# package substituted for {0} is installed.
_RPM_PACKAGE_CHECK_COMMAND = 'rpm --quiet -q {0}'
PKG_MANAGER_TO_PACKAGE_CHECK_COMMAND = {
    'apt_get': (
        "dpkg-query --show --showformat='${{db:Status-Status}}\\n' '{0}' 2>/dev/null "
        "| grep -q installed"
    ),
    'dnf': _RPM_PACKAGE_CHECK_COMMAND,
    'yum': _RPM_PACKAGE_CHECK_COMMAND,
    'zypper': _RPM_PACKAGE_CHECK_COMMAND,
}

# Lines starting with this marker are dropped when a remediation is parsed.
FILE_GENERATED_HASH_COMMENT = '# THIS FILE IS GENERATED'

# Keys recognized in the "# key = value" header of a remediation file.
REMEDIATION_CONFIG_KEYS = [
    'complexity',
    'disruption',
    'platform',
    'reboot',
    'strategy',
]
# Header keys that are copied as attributes onto the generated <fix> element.
REMEDIATION_ELM_KEYS = [
    'complexity',
    'disruption',
    'reboot',
    'strategy',
]
43
44
45 2
def get_fixgroup_for_type(fixcontent, remediation_type):
    """
    Create and return the "fix-group" subelement for remediation_type.

    The new element is attached to fixcontent and carries the remediation
    type as its id, the system URN associated with that type and the
    XCCDF 1.1 namespace.

    Writes an error to stderr and exits with status 1 if remediation_type
    is not one of the known types.
    """
    system_by_type = {
        'anaconda': "urn:redhat:anaconda:pre",
        'ansible': "urn:xccdf:fix:script:ansible",
        'bash': "urn:xccdf:fix:script:sh",
        'puppet': "urn:xccdf:fix:script:puppet",
        'ignition': "urn:xccdf:fix:script:ignition",
        'kubernetes': "urn:xccdf:fix:script:kubernetes",
        'blueprint': "urn:redhat:osbuild:blueprint",
    }

    system_urn = system_by_type.get(remediation_type)
    if system_urn is not None:
        return ElementTree.SubElement(
            fixcontent, "fix-group", id=remediation_type,
            system=system_urn,
            xmlns="http://checklists.nist.gov/xccdf/1.1")

    sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
                     % (remediation_type))
    sys.exit(1)
96
97
98 2
def is_supported_filename(remediation_type, filename):
    """
    Tell whether filename ends with the extension registered for
    remediation_type in REMEDIATION_TO_EXT_MAP.

    Writes an error to stderr and exits with status 1 when
    remediation_type is not a known type.
    """
    if remediation_type not in REMEDIATION_TO_EXT_MAP:
        sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
                         % (remediation_type))
        sys.exit(1)

    expected_extension = REMEDIATION_TO_EXT_MAP[remediation_type]
    return filename.endswith(expected_extension)
110
111
112 2
def split_remediation_content_and_metadata(fix_file):
    """
    Separate the configuration header of a remediation from its body.

    fix_file is the full text of a remediation. Lines starting with
    FILE_GENERATED_HASH_COMMENT are dropped. A line that starts with '#',
    contains exactly one '=' and whose key is listed in
    REMEDIATION_CONFIG_KEYS is stored in the configuration; every other
    line is kept as part of the body.

    Returns a namedtuple with 'contents' (the body lines joined by
    newlines) and 'config' (a defaultdict yielding None for unset keys).
    """
    config = defaultdict(lambda: None)
    body_lines = []

    for line in fix_file.splitlines():
        if line.startswith(FILE_GENERATED_HASH_COMMENT):
            continue

        looks_like_config = line.startswith('#') and line.count('=') == 1
        if looks_like_config:
            raw_key, raw_value = line.strip('#').split('=')
            key = raw_key.strip()
            if key in REMEDIATION_CONFIG_KEYS:
                config[key] = raw_value.strip()
                continue

        # Anything that is not a recognized config item belongs to the
        # body, including '#' lines with an '=' but an unknown key.
        body_lines.append(line)

    remediation = namedtuple('remediation', ['contents', 'config'])
    return remediation(contents="\n".join(body_lines), config=config)
137
138
139 2
def parse_from_file_with_jinja(file_path, env_yaml):
    """
    Render the remediation at file_path with Jinja and parse the result.

    Remediations contain Jinja macros, so an env_yaml context is needed to
    process them. In practice, no remediations use Jinja in the
    configuration, so when only the configuration is of interest, env_yaml
    can be an arbitrary product.yml dictionary.

    If the logic of configuration parsing changes significantly, please also
    update ssg.fixes.parse_platform(...).
    """
    rendered_text = jinja_process_file(file_path, env_yaml)
    parsed = split_remediation_content_and_metadata(rendered_text)
    return parsed
152
153
154 2
def parse_from_file_without_jinja(file_path):
    """
    Read the remediation at file_path and parse it without expanding
    Jinja macros.

    Useful in build phases in which all the Jinja macros are already
    resolved.
    """
    with open(file_path, "r") as snippet_file:
        raw_text = snippet_file.read()
    return split_remediation_content_and_metadata(raw_text)
163
164
165 2
class Remediation(object):
    """
    A remediation snippet file of a given type, optionally tied to a rule.

    local_env_yaml collects the rule-derived values that are made available
    when the snippet is rendered; metadata yields None for unknown keys.
    """

    def __init__(self, file_path, remediation_type):
        self.file_path = file_path
        self.remediation_type = remediation_type
        self.associated_rule = None

        self.local_env_yaml = {}
        self.metadata = defaultdict(lambda: None)

    def load_rule_from(self, rule_path):
        """Load the rule stored at rule_path and expose its data for rendering."""
        self.associated_rule = build_yaml.Rule.from_yaml(rule_path)
        self.expand_env_yaml_from_rule()

    def expand_env_yaml_from_rule(self):
        """Copy title, id and identifiers of the associated rule, if any,
        into local_env_yaml."""
        rule = self.associated_rule
        if not rule:
            return

        env = self.local_env_yaml
        env["rule_title"] = rule.title
        env["rule_id"] = rule.id_
        env["cce_identifiers"] = rule.identifiers

    def parse_from_file_with_jinja(self, env_yaml):
        """Render and parse this remediation's file using env_yaml."""
        return parse_from_file_with_jinja(self.file_path, env_yaml)
189
190
191 2
def process(remediation, env_yaml, fixes, rule_id):
    """
    Parse a remediation and register it in fixes under rule_id when it is
    applicable to the product given by env_yaml["product"].

    Nothing is done (and None is returned) when the file extension does
    not match the remediation type. Otherwise the parsed result is
    returned, whether or not it was added to fixes.

    Raises RuntimeError when the remediation declares no platform, which
    is a required field, and ValueError when an entry of the
    comma-separated platform list has surrounding whitespace.
    """
    if not is_supported_filename(remediation.remediation_type, remediation.file_path):
        return

    result = remediation.parse_from_file_with_jinja(env_yaml)
    platforms = result.config['platform']

    if not platforms:
        raise RuntimeError(
            "The '%s' remediation script does not contain the "
            "platform identifier!" % (remediation.file_path))

    has_padded_entry = any(
        entry != entry.strip() for entry in platforms.split(","))
    if has_padded_entry:
        msg = (
            "Comma-separated '{platform}' platforms "
            "in '{remediation_file}' contains whitespace."
            .format(platform=platforms, remediation_file=remediation.file_path))
        raise ValueError(msg)

    if utils.is_applicable_for_product(platforms, env_yaml["product"]):
        fixes[rule_id] = result

    return result
222
223
224 2
class BashRemediation(Remediation):
    """Bash remediation whose rendered script gets wrapped in platform checks."""

    def __init__(self, file_path):
        super(BashRemediation, self).__init__(file_path, "bash")

    def parse_from_file_with_jinja(self, env_yaml):
        """
        Render and parse the Bash snippet, then guard it with an "if" on the
        platform conditionals of the associated rule.

        Platforms inherited by the rule must all hold ("&&"); of the
        platforms specific to the rule at least one must hold ("||").
        The result is returned unchanged when the rendered body is empty or
        when no conditional could be generated.
        """
        self.local_env_yaml.update(env_yaml)
        result = super(BashRemediation, self).parse_from_file_with_jinja(self.local_env_yaml)

        # A Jinja macro or conditional may render no fix text for a
        # product; an empty fix must not be wrapped.
        fix_body = result.contents.strip()
        if not fix_body:
            return result

        inherited = set()
        rule_specific = set()
        rule = self.associated_rule
        if rule:
            # The same platform may be listed as inherited and on the rule;
            # it is then treated as inherited only.
            inherited.update(rule.inherited_platforms)
            if rule.platforms is not None:
                rule_specific = set(rule.platforms) - inherited

        # Platforms for which no conditional exists yield None; drop those.
        inherited_checks = sorted(
            check for check in map(self.generate_platform_conditional, inherited)
            if check is not None)
        rule_specific_checks = sorted(
            check for check in map(self.generate_platform_conditional, rule_specific)
            if check is not None)

        if not (inherited_checks or rule_specific_checks):
            return result

        condition = " && ".join(inherited_checks)
        if rule_specific_checks:
            any_rule_specific = " || ".join(rule_specific_checks)
            if condition:
                condition += " && { " + any_rule_specific + "; }"
            else:
                condition = any_rule_specific

        # The original body is deliberately not indented: textwrap.indent()
        # is unavailable on python2 and indenting could break remediations,
        # for example those containing a here-doc block.
        wrapped_lines = [
            "# Remediation is applicable only in certain platforms",
            "if {0}; then".format(condition),
            "",
            "{0}".format(fix_body),
            "",
            "else",
            "    >&2 echo 'Remediation is not applicable, nothing was done'",
            "fi",
        ]

        remediation = namedtuple('remediation', ['contents', 'config'])
        return remediation(contents="\n".join(wrapped_lines), config=result.config)

    def generate_platform_conditional(self, platform):
        """
        Return the shell test for platform, or None when there is none.

        "machine" maps to a check that no container marker file exists.
        Any other platform is assumed to be a Package CPE and maps to the
        package check command of the product's package manager.
        """
        if platform is None:
            return None

        if platform == "machine":
            # Based on check installed_env_is_a_container
            return '[ ! -f /.dockerenv ] && [ ! -f /run/.containerenv ]'

        # Some package names are different from the platform names
        overrides = self.local_env_yaml["platform_package_overrides"]
        if platform in overrides:
            platform = overrides.get(platform)

            # An empty override marks a platform that is not about an
            # installed package; it is skipped here and should be handled
            # in the remediation itself.
            if not platform:
                return None

        # Adjust package check command according to the pkg_manager
        pkg_manager = self.local_env_yaml["pkg_manager"]
        return PKG_MANAGER_TO_PACKAGE_CHECK_COMMAND[pkg_manager].format(platform)
307
308
309 2
class AnsibleRemediation(Remediation):
    """
    Ansible remediation: a YAML list of tasks that is enriched with "when"
    clauses and tags derived from the associated rule and the snippet
    configuration.
    """

    def __init__(self, file_path):
        super(AnsibleRemediation, self).__init__(
            file_path, "ansible")

        # Parsed task list; set by parse_from_file_with_jinja() once a rule
        # is associated.
        self.body = None

    def parse_from_file_with_jinja(self, env_yaml):
        """
        Render and parse the snippet; when a rule is associated, update the
        tasks from it and re-serialize them.

        Without an associated rule the parsed result is returned untouched
        and neither body nor metadata are changed.
        """
        self.local_env_yaml.update(env_yaml)
        result = super(AnsibleRemediation, self).parse_from_file_with_jinja(self.local_env_yaml)

        if not self.associated_rule:
            return result

        parsed = ssg.yaml.ordered_load(result.contents)

        self.update(parsed, result.config)

        updated_yaml_text = ssg.yaml.ordered_dump(
            parsed, None, default_flow_style=False)
        result = result._replace(contents=updated_yaml_text)

        self.body = parsed
        self.metadata = result.config

        return result

    def update_tags_from_config(self, to_update, config):
        """Add strategy/complexity/disruption/reboot tags found in config to
        the task to_update and store the tags sorted."""
        tags = to_update.get("tags", [])
        if "strategy" in config:
            tags.append("{0}_strategy".format(config["strategy"]))
        if "complexity" in config:
            tags.append("{0}_complexity".format(config["complexity"]))
        if "disruption" in config:
            tags.append("{0}_disruption".format(config["disruption"]))
        if "reboot" in config:
            if config["reboot"] == "true":
                reboot_tag = "reboot_required"
            else:
                reboot_tag = "no_reboot_needed"
            tags.append(reboot_tag)
        to_update["tags"] = sorted(tags)

    def update_tags_from_rule(self, to_update):
        """
        Add the rule id, severity, CCE (if any) and reference tags of the
        associated rule to the task to_update and store the tags sorted.

        Raises RuntimeError when no rule is associated.
        """
        if not self.associated_rule:
            raise RuntimeError("The Ansible snippet has no rule loaded.")

        tags = to_update.get("tags", [])
        tags.insert(0, "{0}_severity".format(self.associated_rule.severity))
        tags.insert(0, self.associated_rule.id_)

        cce_num = self._get_cce()
        if cce_num:
            tags.append("{0}".format(cce_num))

        refs = self.get_references()
        tags.extend(refs)
        to_update["tags"] = sorted(tags)

    def _get_cce(self):
        """Return the "cce" identifier of the associated rule, or None."""
        return self.associated_rule.identifiers.get("cce", None)

    def get_references(self):
        """
        Return the "<prefix>-<value>" tags for every reference class listed
        in constants.REF_PREFIX_MAP that the associated rule defines.

        Raises RuntimeError when no rule is associated.
        """
        if not self.associated_rule:
            raise RuntimeError("The Ansible snippet has no rule loaded.")

        result = []
        for ref_class, prefix in constants.REF_PREFIX_MAP.items():
            refs = self._get_rule_reference(ref_class)
            result.extend(["{prefix}-{value}".format(prefix=prefix, value=v) for v in refs])
        return result

    def _get_rule_reference(self, ref_class):
        """Return the comma-separated references of ref_class as a list
        (empty when the rule has none)."""
        refs = self.associated_rule.references.get(ref_class, "")
        if refs:
            return refs.split(",")
        else:
            return []

    def inject_package_facts_task(self, parsed_snippet):
        """ Injects a package_facts task only if
            the snippet has a task with a when clause with ansible_facts.packages,
            and the snippet doesn't already have a package_facts task
        """
        # Text types on both python2 (str, unicode) and python3 (str, str).
        string_types = (str, type(u""))

        has_package_facts_task = False
        has_ansible_facts_packages_clause = False

        for p_task in parsed_snippet:
            # We are only interested in the OrderedDicts, which represent Ansible tasks
            if not isinstance(p_task, dict):
                continue

            if "package_facts" in p_task:
                has_package_facts_task = True

            # The when clause can be a single condition or a list of them;
            # normalize to a list. Non-text conditions (for example a YAML
            # boolean) cannot mention ansible_facts.packages and are skipped
            # instead of raising TypeError.
            task_when = p_task.get("when", "")
            if not isinstance(task_when, (list, tuple)):
                task_when = [task_when]
            for when in task_when:
                if isinstance(when, string_types) and "ansible_facts.packages" in when:
                    has_ansible_facts_packages_clause = True

        if has_ansible_facts_packages_clause and not has_package_facts_task:
            # Built from a list of pairs: an OrderedDict created from a dict
            # literal only keeps the literal's order where plain dicts are
            # ordered themselves, which is not the case before python 3.7.
            facts_task = OrderedDict([
                ('name', 'Gather the package facts'),
                ('package_facts', {'manager': 'auto'}),
            ])
            parsed_snippet.insert(0, facts_task)

    def update_when_from_rule(self, to_update):
        """
        Prepend the platform conditionals of the associated rule to the
        "when" clause of the task to_update.

        Every inherited platform adds its own condition; the platforms
        specific to the rule are combined into a single "or" condition.
        The "when" key is removed when the resulting clause is empty.
        """
        additional_when = []

        # There can be repeated inherited platforms and rule platforms
        inherited_platforms = set()
        rule_specific_platforms = set()
        inherited_platforms.update(self.associated_rule.inherited_platforms)
        if self.associated_rule.platforms is not None:
            rule_specific_platforms = {
                p for p in self.associated_rule.platforms if p not in inherited_platforms}

        inherited_conditionals = [
            self.generate_platform_conditional(p) for p in inherited_platforms]
        rule_specific_conditionals = [
            self.generate_platform_conditional(p) for p in rule_specific_platforms]
        # remove potential "None" from lists
        inherited_conditionals = sorted([
            p for p in inherited_conditionals if p is not None])
        rule_specific_conditionals = sorted([
            p for p in rule_specific_conditionals if p is not None])

        # remove conditionals related to package CPEs if the updated task
        # collects package facts
        if "package_facts" in to_update:
            inherited_conditionals = [
                c for c in inherited_conditionals if "in ansible_facts.packages" not in c]
            rule_specific_conditionals = [
                c for c in rule_specific_conditionals if "in ansible_facts.packages" not in c]

        if inherited_conditionals:
            additional_when = additional_when + inherited_conditionals

        if rule_specific_conditionals:
            additional_when.append(" or ".join(rule_specific_conditionals))

        to_update.setdefault("when", "")
        new_when = ssg.yaml.update_yaml_list_or_string(to_update["when"], additional_when,
                                                       prepend=True)
        if not new_when:
            to_update.pop("when")
        else:
            to_update["when"] = new_when

    def update(self, parsed, config):
        """Apply the rule- and config-derived updates to every task (dict
        entry) of the parsed snippet, in place."""
        # We split the remediation update in three steps

        # 1. Update the when clause
        for p in parsed:
            if not isinstance(p, dict):
                continue
            self.update_when_from_rule(p)

        # 2. Inject any extra task necessary
        self.inject_package_facts_task(parsed)

        # 3. Add tags to all tasks, including the ones we have injected
        for p in parsed:
            if not isinstance(p, dict):
                continue
            self.update_tags_from_config(p, config)
            self.update_tags_from_rule(p)

    @classmethod
    def from_snippet_and_rule(cls, snippet_fname, rule_fname):
        """
        Build a remediation for snippet_fname with the rule from rule_fname
        loaded.

        Returns None when either file does not exist or when loading the
        rule raises ssg.yaml.DocumentationNotComplete.
        """
        if os.path.isfile(snippet_fname) and os.path.isfile(rule_fname):
            result = cls(snippet_fname)
            try:
                result.load_rule_from(rule_fname)
            except ssg.yaml.DocumentationNotComplete:
                # Happens on non-debug build when a rule is "documentation-incomplete"
                return None
            return result
        return None

    def generate_platform_conditional(self, platform):
        """
        Return the Ansible condition for platform, or None when there is
        none.

        "machine" maps to a check of ansible_virtualization_type; any other
        platform is assumed to be a Package CPE and maps to a membership
        test in ansible_facts.packages.
        """
        if platform == "machine":
            return 'ansible_virtualization_type not in '\
                '["docker", "lxc", "openvz", "podman", "container"]'
        elif platform is not None:
            # Assume any other platform is a Package CPE

            if platform in self.local_env_yaml["platform_package_overrides"]:
                platform = self.local_env_yaml["platform_package_overrides"].get(platform)

                # Workaround for platforms that are not Package CPEs
                # Skip platforms that are not about packages installed
                # These should be handled in the remediation itself
                if not platform:
                    return

            return '"' + platform + '" in ansible_facts.packages'
507
508
509 2
class AnacondaRemediation(Remediation):
    """Remediation whose type is fixed to "anaconda"."""

    def __init__(self, file_path):
        super(AnacondaRemediation, self).__init__(file_path, "anaconda")
513
514
515 2
class PuppetRemediation(Remediation):
    """Remediation whose type is fixed to "puppet"."""

    def __init__(self, file_path):
        super(PuppetRemediation, self).__init__(file_path, "puppet")
519
520
521 2
class IgnitionRemediation(Remediation):
    """Remediation whose type is fixed to "ignition"."""

    def __init__(self, file_path):
        super(IgnitionRemediation, self).__init__(file_path, "ignition")
525
526
527 2
class KubernetesRemediation(Remediation):
    """Remediation whose type is fixed to "kubernetes"."""

    def __init__(self, file_path):
        super(KubernetesRemediation, self).__init__(file_path, "kubernetes")
531
532
533 2
class BlueprintRemediation(Remediation):
    """
    Remediation class for OSBuild Blueprint snippets; the type is fixed to
    "blueprint".
    """

    def __init__(self, file_path):
        super(BlueprintRemediation, self).__init__(file_path, "blueprint")
540
541
542 2
# Remediation class implementing each supported remediation type.
REMEDIATION_TO_CLASS = dict(
    anaconda=AnacondaRemediation,
    ansible=AnsibleRemediation,
    bash=BashRemediation,
    puppet=PuppetRemediation,
    ignition=IgnitionRemediation,
    kubernetes=KubernetesRemediation,
    blueprint=BlueprintRemediation,
)
551
552
553 2
def write_fixes_to_xml(remediation_type, build_dir, output_path, fixes):
    """
    Build a fix-content XML tree from fixes and write it to output_path.

    fixes maps a rule name to a (contents, config) pair. Every pair becomes
    a <fix> element inside the fix-group of remediation_type; the config
    values listed in REMEDIATION_ELM_KEYS that are set become attributes.
    build_dir is accepted but not used by this function.
    """
    root = ElementTree.Element(
        "fix-content", system="urn:xccdf:fix:script:sh",
        xmlns="http://checklists.nist.gov/xccdf/1.1")
    group = get_fixgroup_for_type(root, remediation_type)

    for rule_name, (contents, config) in fixes.items():
        fix_elm = ElementTree.SubElement(group, "fix")
        fix_elm.set("rule", rule_name)

        for key in REMEDIATION_ELM_KEYS:
            value = config[key]
            if value:
                fix_elm.set(key, value)

        fix_elm.text = contents + "\n"

        # Expand shell variables and remediation functions
        # into corresponding XCCDF <sub> elements
        expand_xccdf_subs(fix_elm, remediation_type)

    ElementTree.ElementTree(root).write(output_path)
581
582
583 2
def write_fixes_to_dir(fixes, remediation_type, output_dir):
    """
    Write every fix to its own file in output_dir, creating the directory
    if it does not exist.

    Each file is named after the fix plus the extension of
    remediation_type and starts with one "# key = value" line per config
    item, followed by the fix contents.

    Raises ValueError for an unknown remediation_type.
    """
    if remediation_type not in REMEDIATION_TO_EXT_MAP:
        raise ValueError("Unknown remediation type %s." % remediation_type)
    extension = REMEDIATION_TO_EXT_MAP[remediation_type]

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    for fix_name, (fix_contents, config) in fixes.items():
        fix_path = os.path.join(output_dir, fix_name + extension)
        with open(fix_path, "w") as fix_file:
            fix_file.writelines(
                "# %s = %s\n" % (key, value) for key, value in config.items())
            fix_file.write(fix_contents)
601
602
603 2
def get_rule_dir_remediations(dir_path, remediation_type, product=None):
    """
    List the remediation files of remediation_type found in a rule
    directory. If product is None, all such remediations are returned.
    If product is not None, the applicable remediations are returned in
    order of priority:

        {{{ product }}}.ext -> shared.ext

    Only remediations which exist are returned; the result is empty when
    dir_path is not a rule directory or has no subdirectory for
    remediation_type.
    """
    if not rules.is_rule_dir(dir_path):
        return []

    remediations_dir = os.path.join(dir_path, remediation_type)
    has_remediations_dir = os.path.isdir(remediations_dir)
    # Looked up before the directory check, so an unknown type raises
    # KeyError even when the directory is missing.
    ext = REMEDIATION_TO_EXT_MAP[remediation_type]
    if not has_remediations_dir:
        return []

    # Two categories of results: those for a product and those that are
    # shared to multiple products. Within common results, there's two types:
    # those shared to multiple versions of the same type (added up front) and
    # those shared across multiple product types (e.g., RHEL and Ubuntu).
    product_results = []
    common_results = []
    for entry in sorted(os.listdir(remediations_dir)):
        stem, suffix = os.path.splitext(entry)
        if suffix != ext or not rules.applies_to_product(stem, product):
            continue

        # rules.applies_to_product ensures we only have three entries:
        # 1. shared
        # 2. <product>
        # 3. <product><version>
        #
        # Note that the product variable holds <product><version>.
        entry_path = os.path.join(remediations_dir, entry)
        if stem == 'shared':
            # Shared are the lowest priority items, add them to the end
            # of the common results.
            common_results.append(entry_path)
        elif stem == product:
            # Exact <product><version> match: a product-specific result.
            product_results.append(entry_path)
        else:
            # The file name is the product name without its version
            # (e.g. "ubuntu" for product "ubuntu2004"). It is the highest
            # priority of common results, so insert it before any shared
            # ones.
            common_results.insert(0, entry_path)

    # Combine the two sets in priority order.
    return product_results + common_results
660
661
662 2
def expand_xccdf_subs(fix, remediation_type):
    """Expand the populate keywords of a remediation into <sub> elements.

    Every instance of the '`type`-populate' keyword in the text of fix,
    in the form of:

            (`type`-populate variable_name)

    where `type` is one of ansible, puppet, anaconda or bash, is turned into

            <sub idref="variable_name"/>

    as a child of fix. For ansible, "- (xccdf-var variable_name)" items are
    first rewritten into a set_fact task that uses the populate keyword.

    Nothing is done when fix is None, when its text is None, or for the
    ignition, kubernetes and blueprint types.

    Raises RuntimeError when an ansible fix uses the deprecated
    (ansible-populate VAR) form directly. Writes an error to stderr and
    exits with status 1 for an unknown remediation_type.
    """
    if fix is None:
        return

    if remediation_type in ("ignition", "kubernetes", "blueprint"):
        return

    if remediation_type not in ("ansible", "puppet", "anaconda", "bash"):
        sys.stderr.write("Unknown remediation type '%s'\n" % (remediation_type))
        sys.exit(1)

    fix_text = fix.text
    if fix_text is None:
        # An element without text has nothing to expand; searching or
        # splitting None would raise TypeError.
        return

    if remediation_type == "ansible":
        if "(ansible-populate " in fix_text:
            raise RuntimeError(
                "(ansible-populate VAR) has been deprecated. Please use "
                "(xccdf-var VAR) instead. Keep in mind that the latter will "
                "make an ansible variable out of XCCDF Value as opposed to "
                "substituting directly."
            )

        # If you change this string make sure it still matches the pattern
        # defined in OpenSCAP. Otherwise you break variable handling in
        # 'oscap xccdf generate fix' and the variables won't be customizable!
        # https://github.com/OpenSCAP/openscap/blob/1.2.17/src/XCCDF_POLICY/xccdf_policy_remediate.c#L588
        #   const char *pattern =
        #     "- name: XCCDF Value [^ ]+ # promote to variable\n  set_fact:\n"
        #     "    ([^:]+): (.+)\n  tags:\n    - always\n";
        # We use !!str typecast to prevent treating values as different types
        # eg. yes as a bool or 077 as an octal number
        fix_text = re.sub(
            r"- \(xccdf-var\s+(\S+)\)",
            r"- name: XCCDF Value \1 # promote to variable\n"
            r"  set_fact:\n"
            r"    \1: !!str (ansible-populate \1)\n"
            r"  tags:\n"
            r"    - always",
            fix_text
        )

    # The type was validated above, so the pattern is defined on every path
    # that reaches this point.
    pattern = r'\(' + remediation_type + r'-populate\s*(\S+)\)'

    # Splitting on a pattern with one group yields
    # [text, varname, text, varname, ..., text]
    parts = re.split(pattern, fix_text)

    fix.text = parts[0]  # add first "text"
    for varname, text_between_vars in zip(parts[1::2], parts[2::2]):
        # we cannot combine elements and text easily
        # so text is in ".tail" of element
        xccdfvarsub = ElementTree.SubElement(fix, "sub", idref=varname)
        xccdfvarsub.tail = text_between_vars
744