Test Failed
Push — master ( 8bef30...0a5b5b )
by Matěj
02:23 queued 12s
created

Remediation.associate_rule()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 2
dl 0
loc 3
ccs 3
cts 3
cp 1
crap 1
rs 10
c 0
b 0
f 0
1 2
from __future__ import absolute_import
2 2
from __future__ import print_function
3
4 2
import sys
5 2
import os
6 2
import os.path
7 2
import re
8 2
from collections import defaultdict, namedtuple, OrderedDict
9
10 2
import ssg.yaml
11 2
from . import build_yaml
12 2
from . import rules
13 2
from . import utils
14
15 2
from . import constants
16 2
from .jinja import process_file_with_macros as jinja_process_file
17
18 2
from .xml import ElementTree
19
20 2
REMEDIATION_TO_EXT_MAP = {
21
    'anaconda': '.anaconda',
22
    'ansible': '.yml',
23
    'bash': '.sh',
24
    'puppet': '.pp',
25
    'ignition': '.yml',
26
    'kubernetes': '.yml',
27
    'blueprint': '.toml'
28
}
29
30 2
PKG_MANAGER_TO_PACKAGE_CHECK_COMMAND = {
31
    'apt_get': "dpkg-query --show --showformat='${{db:Status-Status}}\\n' '{0}' 2>/dev/null " +
32
               "| grep -q installed",
33
    'dnf': 'rpm --quiet -q {0}',
34
    'yum': 'rpm --quiet -q {0}',
35
    'zypper': 'rpm --quiet -q {0}',
36
}
37
38 2
FILE_GENERATED_HASH_COMMENT = '# THIS FILE IS GENERATED'
39
40 2
REMEDIATION_CONFIG_KEYS = ['complexity', 'disruption', 'platform', 'reboot',
41
                           'strategy']
42 2
REMEDIATION_ELM_KEYS = ['complexity', 'disruption', 'reboot', 'strategy']
43
44
45 2
def get_fixgroup_for_type(fixcontent, remediation_type):
46
    """
47
    For a given remediation type, return a new subelement of that type.
48
49
    Exits if passed an unknown remediation type.
50
    """
51
    if remediation_type == 'anaconda':
52
        return ElementTree.SubElement(
53
            fixcontent, "fix-group", id="anaconda",
54
            system="urn:redhat:anaconda:pre",
55
            xmlns="http://checklists.nist.gov/xccdf/1.1")
56
57
    elif remediation_type == 'ansible':
58
        return ElementTree.SubElement(
59
            fixcontent, "fix-group", id="ansible",
60
            system="urn:xccdf:fix:script:ansible",
61
            xmlns="http://checklists.nist.gov/xccdf/1.1")
62
63
    elif remediation_type == 'bash':
64
        return ElementTree.SubElement(
65
            fixcontent, "fix-group", id="bash",
66
            system="urn:xccdf:fix:script:sh",
67
            xmlns="http://checklists.nist.gov/xccdf/1.1")
68
69
    elif remediation_type == 'puppet':
70
        return ElementTree.SubElement(
71
            fixcontent, "fix-group", id="puppet",
72
            system="urn:xccdf:fix:script:puppet",
73
            xmlns="http://checklists.nist.gov/xccdf/1.1")
74
75
    elif remediation_type == 'ignition':
76
        return ElementTree.SubElement(
77
            fixcontent, "fix-group", id="ignition",
78
            system="urn:xccdf:fix:script:ignition",
79
            xmlns="http://checklists.nist.gov/xccdf/1.1")
80
81
    elif remediation_type == 'kubernetes':
82
        return ElementTree.SubElement(
83
            fixcontent, "fix-group", id="kubernetes",
84
            system="urn:xccdf:fix:script:kubernetes",
85
            xmlns="http://checklists.nist.gov/xccdf/1.1")
86
87
    elif remediation_type == 'blueprint':
88
        return ElementTree.SubElement(
89
            fixcontent, "fix-group", id="blueprint",
90
            system="urn:redhat:osbuild:blueprint",
91
            xmlns="http://checklists.nist.gov/xccdf/1.1")
92
93
    sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
94
                     % (remediation_type))
95
    sys.exit(1)
96
97
98 2
def is_supported_filename(remediation_type, filename):
99
    """
100
    Checks if filename has a supported extension for remediation_type.
101
102
    Exits when remediation_type is of an unknown type.
103
    """
104 2
    if remediation_type in REMEDIATION_TO_EXT_MAP:
105 2
        return filename.endswith(REMEDIATION_TO_EXT_MAP[remediation_type])
106
107
    sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
108
                     % (remediation_type))
109
    sys.exit(1)
110
111
112 2
def split_remediation_content_and_metadata(fix_file):
113 2
    remediation_contents = []
114 2
    config = defaultdict(lambda: None)
115
116
    # Assignment automatically escapes shell characters for XML
117 2
    for line in fix_file.splitlines():
118 2
        if line.startswith(FILE_GENERATED_HASH_COMMENT):
119
            continue
120
121 2
        if line.startswith('#') and line.count('=') == 1:
122 2
            (key, value) = line.strip('#').split('=')
123 2
            if key.strip() in REMEDIATION_CONFIG_KEYS:
124 2
                config[key.strip()] = value.strip()
125 2
                continue
126
127
        # If our parsed line wasn't a config item, add it to the
128
        # returned file contents. This includes when the line
129
        # begins with a '#' and contains an equals sign, but
130
        # the "key" isn't one of the known keys from
131
        # REMEDIATION_CONFIG_KEYS.
132 2
        remediation_contents.append(line)
133
134 2
    contents = "\n".join(remediation_contents)
135 2
    remediation = namedtuple('remediation', ['contents', 'config'])
136 2
    return remediation(contents=contents, config=config)
137
138
139 2
def parse_from_file_with_jinja(file_path, env_yaml):
140
    """
141
    Parses a remediation from a file. As remediations contain jinja macros,
142
    we need a env_yaml context to process these. In practice, no remediations
143
    use jinja in the configuration, so for extracting only the configuration,
144
    env_yaml can be an abritrary product.yml dictionary.
145
146
    If the logic of configuration parsing changes significantly, please also
147
    update ssg.fixes.parse_platform(...).
148
    """
149
150 2
    fix_file = jinja_process_file(file_path, env_yaml)
151 2
    return split_remediation_content_and_metadata(fix_file)
152
153
154 2
def parse_from_file_without_jinja(file_path):
155
    """
156
    Parses a remediation from a file. Doesn't process the Jinja macros.
157
    This function is useful in build phases in which all the Jinja macros
158
    are already resolved.
159
    """
160
    with open(file_path, "r") as f:
161
        f_str = f.read()
162
        return split_remediation_content_and_metadata(f_str)
163
164
165 2
class Remediation(object):
166 2
    def __init__(self, file_path, remediation_type):
167 2
        self.file_path = file_path
168 2
        self.local_env_yaml = dict()
169
170 2
        self.metadata = defaultdict(lambda: None)
171
172 2
        self.remediation_type = remediation_type
173 2
        self.associated_rule = None
174
175 2
    def associate_rule(self, rule_obj):
176 2
        self.associated_rule = rule_obj
177 2
        self.expand_env_yaml_from_rule()
178
179 2
    def expand_env_yaml_from_rule(self):
180 2
        if not self.associated_rule:
181
            return
182
183 2
        self.local_env_yaml["rule_title"] = self.associated_rule.title
184 2
        self.local_env_yaml["rule_id"] = self.associated_rule.id_
185 2
        self.local_env_yaml["cce_identifiers"] = self.associated_rule.identifiers
186
187 2
    def parse_from_file_with_jinja(self, env_yaml):
188 2
        return parse_from_file_with_jinja(self.file_path, env_yaml)
189
190
191 2
def process(remediation, env_yaml):
192
    """
193
    Process a fix, and return the processed fix iff the file is of a valid
194
    extension for the remediation type and the fix is valid for the current
195
    product.
196
197
    Note that platform is a required field in the contents of the fix.
198
    """
199 2
    if not is_supported_filename(remediation.remediation_type, remediation.file_path):
200
        return
201
202 2
    result = remediation.parse_from_file_with_jinja(env_yaml)
203 2
    platforms = result.config['platform']
204
205 2
    if not platforms:
206
        raise RuntimeError(
207
            "The '%s' remediation script does not contain the "
208
            "platform identifier!" % (remediation.file_path))
209
210 2
    for platform in platforms.split(","):
211 2
        if platform.strip() != platform:
212
            msg = (
213
                "Comma-separated '{platform}' platforms "
214
                "in '{remediation_file}' contains whitespace."
215
                .format(platform=platforms, remediation_file=remediation.file_path))
216
            raise ValueError(msg)
217
218 2
    product = env_yaml["product"]
219 2
    if utils.is_applicable_for_product(platforms, product):
220 2
        return result
221
222
    return None
223
224
225 2
class BashRemediation(Remediation):
226 2
    def __init__(self, file_path):
227 2
        super(BashRemediation, self).__init__(file_path, "bash")
228
229 2
    def parse_from_file_with_jinja(self, env_yaml):
230 2
        self.local_env_yaml.update(env_yaml)
231 2
        result = super(BashRemediation, self).parse_from_file_with_jinja(self.local_env_yaml)
232
233
        # Avoid platform wrapping empty fix text
234
        # Remediations can be empty when a Jinja macro or conditional
235
        # renders no fix text for a product
236 2
        stripped_fix_text = result.contents.strip()
237 2
        if stripped_fix_text == "":
238
            return result
239
240 2
        rule_specific_platforms = set()
241 2
        inherited_platforms = set()
242 2
        if self.associated_rule:
243
            # There can be repeated inherited platforms and rule platforms
244
            inherited_platforms.update(self.associated_rule.inherited_platforms)
245
            if self.associated_rule.platforms is not None:
246
                rule_specific_platforms = {
247
                    p for p in self.associated_rule.platforms if p not in inherited_platforms}
248
249 2
        inherited_conditionals = [
250
            self.generate_platform_conditional(p) for p in inherited_platforms]
251 2
        rule_specific_conditionals = [
252
            self.generate_platform_conditional(p) for p in rule_specific_platforms]
253
        # remove potential "None" from lists
254 2
        inherited_conditionals = sorted([
255
            p for p in inherited_conditionals if p is not None])
256 2
        rule_specific_conditionals = sorted([
257
            p for p in rule_specific_conditionals if p is not None])
258
259 2
        if inherited_conditionals or rule_specific_conditionals:
260
            wrapped_fix_text = ["# Remediation is applicable only in certain platforms"]
261
262
            all_conditions = ""
263
            if inherited_conditionals:
264
                all_conditions += " && ".join(inherited_conditionals)
265
            if rule_specific_conditionals:
266
                if all_conditions:
267
                    all_conditions += " && { " + " || ".join(rule_specific_conditionals) + "; }"
268
                else:
269
                    all_conditions = " || ".join(rule_specific_conditionals)
270
            wrapped_fix_text.append("if {0}; then".format(all_conditions))
271
            wrapped_fix_text.append("")
272
            # It is possible to indent the original body of the remediation with textwrap.indent(),
273
            # however, it is not supported by python2, and there is a risk of breaking remediations
274
            # For example, remediations with a here-doc block could be affected.
275
            wrapped_fix_text.append("{0}".format(stripped_fix_text))
276
            wrapped_fix_text.append("")
277
            wrapped_fix_text.append("else")
278
            wrapped_fix_text.append(
279
                "    >&2 echo 'Remediation is not applicable, nothing was done'")
280
            wrapped_fix_text.append("fi")
281
282
            remediation = namedtuple('remediation', ['contents', 'config'])
283
            result = remediation(contents="\n".join(wrapped_fix_text), config=result.config)
284
285 2
        return result
286
287 2
    def generate_platform_conditional(self, platform):
288
        if platform == "machine":
289
            # Based on check installed_env_is_a_container
290
            return '[ ! -f /.dockerenv ] && [ ! -f /run/.containerenv ]'
291
        elif platform is not None:
292
            # Assume any other platform is a Package CPE
293
294
            # Some package names are different from the platform names
295
            if platform in self.local_env_yaml["platform_package_overrides"]:
296
                platform = self.local_env_yaml["platform_package_overrides"].get(platform)
297
298
                # Workaround for platforms that are not Package CPEs
299
                # Skip platforms that are not about packages installed
300
                # These should be handled in the remediation itself
301
                if not platform:
302
                    return
303
304
            # Adjust package check command according to the pkg_manager
305
            pkg_manager = self.local_env_yaml["pkg_manager"]
306
            pkg_check_command = PKG_MANAGER_TO_PACKAGE_CHECK_COMMAND[pkg_manager]
307
            return pkg_check_command.format(platform)
308
309
310 2
class AnsibleRemediation(Remediation):
311 2
    def __init__(self, file_path):
312 2
        super(AnsibleRemediation, self).__init__(
313
            file_path, "ansible")
314
315 2
        self.body = None
316
317 2
    def parse_from_file_with_jinja(self, env_yaml):
318 2
        self.local_env_yaml.update(env_yaml)
319 2
        result = super(AnsibleRemediation, self).parse_from_file_with_jinja(self.local_env_yaml)
320
321 2
        if not self.associated_rule:
322
            return result
323
324 2
        parsed = ssg.yaml.ordered_load(result.contents)
325
326 2
        self.update(parsed, result.config)
327
328 2
        updated_yaml_text = ssg.yaml.ordered_dump(
329
            parsed, None, default_flow_style=False)
330 2
        result = result._replace(contents=updated_yaml_text)
331
332 2
        self.body = parsed
333 2
        self.metadata = result.config
334
335 2
        return result
336
337 2
    def update_tags_from_config(self, to_update, config):
338 2
        tags = to_update.get("tags", [])
339 2
        if "strategy" in config:
340 2
            tags.append("{0}_strategy".format(config["strategy"]))
341 2
        if "complexity" in config:
342 2
            tags.append("{0}_complexity".format(config["complexity"]))
343 2
        if "disruption" in config:
344 2
            tags.append("{0}_disruption".format(config["disruption"]))
345 2
        if "reboot" in config:
346 2
            if config["reboot"] == "true":
347
                reboot_tag = "reboot_required"
348
            else:
349 2
                reboot_tag = "no_reboot_needed"
350 2
            tags.append(reboot_tag)
351 2
        to_update["tags"] = sorted(tags)
352
353 2
    def update_tags_from_rule(self, to_update):
354 2
        if not self.associated_rule:
355
            raise RuntimeError("The Ansible snippet has no rule loaded.")
356
357 2
        tags = to_update.get("tags", [])
358 2
        tags.insert(0, "{0}_severity".format(self.associated_rule.severity))
359 2
        tags.insert(0, self.associated_rule.id_)
360
361 2
        cce_num = self._get_cce()
362 2
        if cce_num:
363 2
            tags.append("{0}".format(cce_num))
364
365 2
        refs = self.get_references()
366 2
        tags.extend(refs)
367 2
        to_update["tags"] = sorted(tags)
368
369 2
    def _get_cce(self):
370 2
        return self.associated_rule.identifiers.get("cce", None)
371
372 2
    def get_references(self):
373 2
        if not self.associated_rule:
374
            raise RuntimeError("The Ansible snippet has no rule loaded.")
375
376 2
        result = []
377 2
        for ref_class, prefix in constants.REF_PREFIX_MAP.items():
378 2
            refs = self._get_rule_reference(ref_class)
379 2
            result.extend(["{prefix}-{value}".format(prefix=prefix, value=v) for v in refs])
380 2
        return result
381
382 2
    def _get_rule_reference(self, ref_class):
383 2
        refs = self.associated_rule.references.get(ref_class, "")
384 2
        if refs:
385 2
            return refs.split(",")
386
        else:
387 2
            return []
388
389 2
    def inject_package_facts_task(self, parsed_snippet):
390
        """ Injects a package_facts task only if
391
            the snippet has a task with a when clause with ansible_facts.packages,
392
            and the snippet doesn't already have a package_facts task
393
        """
394 2
        has_package_facts_task = False
395 2
        has_ansible_facts_packages_clause = False
396
397 2
        for p_task in parsed_snippet:
398
            # We are only interested in the OrderedDicts, which represent Ansible tasks
399 2
            if not isinstance(p_task, dict):
400
                continue
401
402 2
            if "package_facts" in p_task:
403
                has_package_facts_task = True
404
405
            # When clause of the task can be string or a list, lets normalize to list
406 2
            task_when = p_task.get("when", "")
407 2
            if type(task_when) is str:
408 2
                task_when = [task_when]
409 2
            for when in task_when:
410 2
                if "ansible_facts.packages" in when:
411
                    has_ansible_facts_packages_clause = True
412
413 2
        if has_ansible_facts_packages_clause and not has_package_facts_task:
414
            facts_task = OrderedDict({'name': 'Gather the package facts',
415
                                      'package_facts': {'manager': 'auto'}})
416
            parsed_snippet.insert(0, facts_task)
417
418 2
    def update_when_from_rule(self, to_update):
419 2
        additional_when = []
420
421
        # There can be repeated inherited platforms and rule platforms
422 2
        inherited_platforms = set()
423 2
        rule_specific_platforms = set()
424 2
        inherited_platforms.update(self.associated_rule.inherited_platforms)
425 2
        if self.associated_rule.platforms is not None:
426 2
            rule_specific_platforms = {
427
                p for p in self.associated_rule.platforms if p not in inherited_platforms}
428
429 2
        inherited_conditionals = [
430
            self.generate_platform_conditional(p) for p in inherited_platforms]
431 2
        rule_specific_conditionals = [
432
            self.generate_platform_conditional(p) for p in rule_specific_platforms]
433
        # remove potential "None" from lists
434 2
        inherited_conditionals = sorted([
435
            p for p in inherited_conditionals if p is not None])
436 2
        rule_specific_conditionals = sorted([
437
            p for p in rule_specific_conditionals if p is not None])
438
439
        # remove conditionals related to package CPEs if the updated task
440
        # collects package facts
441 2
        if "package_facts" in to_update:
442
            inherited_conditionals = [
443
                c for c in inherited_conditionals if "in ansible_facts.packages" not in c]
444
            rule_specific_conditionals = [
445
                c for c in rule_specific_conditionals if "in ansible_facts.packages" not in c]
446
447 2
        if inherited_conditionals:
448
            additional_when = additional_when + inherited_conditionals
449
450 2
        if rule_specific_conditionals:
451 2
            additional_when.append(" or ".join(rule_specific_conditionals))
452
453 2
        to_update.setdefault("when", "")
454 2
        new_when = ssg.yaml.update_yaml_list_or_string(to_update["when"], additional_when,
455
                                                       prepend=True)
456 2
        if not new_when:
457
            to_update.pop("when")
458
        else:
459 2
            to_update["when"] = new_when
460
461 2
    def update(self, parsed, config):
462
        # We split the remediation update in three steps
463
464
        # 1. Update the when clause
465 2
        for p in parsed:
466 2
            if not isinstance(p, dict):
467
                continue
468 2
            self.update_when_from_rule(p)
469
470
        # 2. Inject any extra task necessary
471 2
        self.inject_package_facts_task(parsed)
472
473
        # 3. Add tags to all tasks, including the ones we have injected
474 2
        for p in parsed:
475 2
            if not isinstance(p, dict):
476
                continue
477 2
            self.update_tags_from_config(p, config)
478 2
            self.update_tags_from_rule(p)
479
480 2
    @classmethod
481
    def from_snippet_and_rule(cls, snippet_fname, rule_fname):
482 2
        if os.path.isfile(snippet_fname) and os.path.isfile(rule_fname):
483 2
            result = cls(snippet_fname)
484 2
            try:
485 2
                rule_obj = build_yaml.Rule.from_yaml(rule_fname)
486 2
                result.associate_rule(rule_obj)
487
            except ssg.yaml.DocumentationNotComplete:
488
                # Happens on non-debug build when a rule is "documentation-incomplete"
489
                return None
490 2
            return result
491
492 2
    def generate_platform_conditional(self, platform):
493 2
        if platform == "machine":
494 2
            return 'ansible_virtualization_type not in '\
495
                '["docker", "lxc", "openvz", "podman", "container"]'
496
        elif platform is not None:
497
            # Assume any other platform is a Package CPE
498
499
            if platform in self.local_env_yaml["platform_package_overrides"]:
500
                platform = self.local_env_yaml["platform_package_overrides"].get(platform)
501
502
                # Workaround for platforms that are not Package CPEs
503
                # Skip platforms that are not about packages installed
504
                # These should be handled in the remediation itself
505
                if not platform:
506
                    return
507
508
            return '"' + platform + '" in ansible_facts.packages'
509
510
511 2
class AnacondaRemediation(Remediation):
512 2
    def __init__(self, file_path):
513
        super(AnacondaRemediation, self).__init__(
514
            file_path, "anaconda")
515
516
517 2
class PuppetRemediation(Remediation):
518 2
    def __init__(self, file_path):
519
        super(PuppetRemediation, self).__init__(
520
            file_path, "puppet")
521
522
523 2
class IgnitionRemediation(Remediation):
524 2
    def __init__(self, file_path):
525
        super(IgnitionRemediation, self).__init__(
526
            file_path, "ignition")
527
528
529 2
class KubernetesRemediation(Remediation):
530 2
    def __init__(self, file_path):
531
        super(KubernetesRemediation, self).__init__(
532
              file_path, "kubernetes")
533
534
535 2
class BlueprintRemediation(Remediation):
536
    """
537
    This provides class for OSBuild Blueprint remediations
538
    """
539 2
    def __init__(self, file_path):
540
        super(BlueprintRemediation, self).__init__(
541
            file_path, "blueprint")
542
543
544 2
REMEDIATION_TO_CLASS = {
545
    'anaconda': AnacondaRemediation,
546
    'ansible': AnsibleRemediation,
547
    'bash': BashRemediation,
548
    'puppet': PuppetRemediation,
549
    'ignition': IgnitionRemediation,
550
    'kubernetes': KubernetesRemediation,
551
    'blueprint': BlueprintRemediation,
552
}
553
554
555 2
def write_fixes_to_xml(remediation_type, build_dir, output_path, fixes):
556
    """
557
    Builds a fix-content XML tree from the contents of fixes
558
    and writes it to output_path.
559
    """
560
561
    fixcontent = ElementTree.Element("fix-content", system="urn:xccdf:fix:script:sh",
562
                                     xmlns="http://checklists.nist.gov/xccdf/1.1")
563
    fixgroup = get_fixgroup_for_type(fixcontent, remediation_type)
564
565
    for fix_name in fixes:
566
        fix_contents, config = fixes[fix_name]
567
568
        fix_elm = ElementTree.SubElement(fixgroup, "fix")
569
        fix_elm.set("rule", fix_name)
570
571
        for key in REMEDIATION_ELM_KEYS:
572
            if config[key]:
573
                fix_elm.set(key, config[key])
574
575
        fix_elm.text = fix_contents + "\n"
576
577
        # Expand shell variables and remediation functions
578
        # into corresponding XCCDF <sub> elements
579
        expand_xccdf_subs(fix_elm, remediation_type)
580
581
    tree = ElementTree.ElementTree(fixcontent)
582
    tree.write(output_path)
583
584
585 2
def write_fix_to_file(fix, file_path):
586
    """
587
    Writes a single fix to the given file path.
588
    """
589
    fix_contents, config = fix
590
    with open(file_path, "w") as f:
591
        for k, v in config.items():
592
            f.write("# %s = %s\n" % (k, v))
593
        f.write(fix_contents)
594
595
596 2
def write_fixes_to_dir(fixes, remediation_type, output_dir):
597
    """
598
    Writes fixes as files to output_dir, each fix as a separate file
599
    """
600
    try:
601
        extension = REMEDIATION_TO_EXT_MAP[remediation_type]
602
    except KeyError:
603
        raise ValueError("Unknown remediation type %s." % remediation_type)
604
605
    if not os.path.exists(output_dir):
606
        os.makedirs(output_dir)
607
    for fix_name, fix in fixes.items():
608
        fix_path = os.path.join(output_dir, fix_name + extension)
609
        write_fix_to_file(fix, fix_path)
610
611
612 2
def get_rule_dir_remediations(dir_path, remediation_type, product=None):
613
    """
614
    Gets a list of remediations of type remediation_type contained in a
615
    rule directory. If product is None, returns all such remediations.
616
    If product is not None, returns applicable remediations in order of
617
    priority:
618
619
        {{{ product }}}.ext -> shared.ext
620
621
    Only returns remediations which exist.
622
    """
623
624 2
    if not rules.is_rule_dir(dir_path):
625
        return []
626
627 2
    remediations_dir = os.path.join(dir_path, remediation_type)
628 2
    has_remediations_dir = os.path.isdir(remediations_dir)
629 2
    ext = REMEDIATION_TO_EXT_MAP[remediation_type]
630 2
    if not has_remediations_dir:
631
        return []
632
633
    # Two categories of results: those for a product and those that are
634
    # shared to multiple products. Within common results, there's two types:
635
    # those shared to multiple versions of the same type (added up front) and
636
    # those shared across multiple product types (e.g., RHEL and Ubuntu).
637 2
    product_results = []
638 2
    common_results = []
639 2
    for remediation_file in sorted(os.listdir(remediations_dir)):
640 2
        file_name, file_ext = os.path.splitext(remediation_file)
641 2
        remediation_path = os.path.join(remediations_dir, remediation_file)
642
643 2
        if file_ext == ext and rules.applies_to_product(file_name, product):
644
            # rules.applies_to_product ensures we only have three entries:
645
            # 1. shared
646
            # 2. <product>
647
            # 3. <product><version>
648
            #
649
            # Note that the product variable holds <product><version>.
650 2
            if file_name == 'shared':
651
                # Shared are the lowest priority items, add them to the end
652
                # of the common results.
653
                common_results.append(remediation_path)
654 2
            elif file_name != product:
655
                # Here, the filename is a subset of the product, but isn't
656
                # the full product. Product here is both the product name
657
                # (e.g., ubuntu) and its version (2004). Filename could be
658
                # either "ubuntu" or "ubuntu2004" so we want this branch
659
                # to trigger when it is the former, not the latter. It is
660
                # the highest priority of common results, so insert it
661
                # before any shared ones.
662 2
                common_results.insert(0, remediation_path)
663
            else:
664
                # Finally, this must be product-specific result.
665 2
                product_results.append(remediation_path)
666
667
    # Combine the two sets in priority order.
668 2
    return product_results + common_results
669
670
671 2
def expand_xccdf_subs(fix, remediation_type):
672
    """Expand the respective populate keywords of each
673
    remediation type with an <xccdf:sub> element
674
675
    This routine translates any instance of the '`type`-populate' keyword in
676
    the form of:
677
678
            (`type`-populate variable_name)
679
680
    where `type` can be either ansible, puppet, anaconda or bash, into
681
682
            <sub idref="variable_name"/>
683
684
    """
685
686
    if fix is not None:
687
        fix_text = fix.text
688
    else:
689
        return
690
    if remediation_type == "ignition":
691
        return
692
    elif remediation_type == "kubernetes":
693
        return
694
    elif remediation_type == "blueprint":
695
        return
696
    elif remediation_type == "ansible":
697
698
        if "(ansible-populate " in fix_text:
699
            raise RuntimeError(
700
                "(ansible-populate VAR) has been deprecated. Please use "
701
                "(xccdf-var VAR) instead. Keep in mind that the latter will "
702
                "make an ansible variable out of XCCDF Value as opposed to "
703
                "substituting directly."
704
            )
705
706
        # If you change this string make sure it still matches the pattern
707
        # defined in OpenSCAP. Otherwise you break variable handling in
708
        # 'oscap xccdf generate fix' and the variables won't be customizable!
709
        # https://github.com/OpenSCAP/openscap/blob/1.2.17/src/XCCDF_POLICY/xccdf_policy_remediate.c#L588
710
        #   const char *pattern =
711
        #     "- name: XCCDF Value [^ ]+ # promote to variable\n  set_fact:\n"
712
        #     "    ([^:]+): (.+)\n  tags:\n    - always\n";
713
        # We use !!str typecast to prevent treating values as different types
714
        # eg. yes as a bool or 077 as an octal number
715
        fix_text = re.sub(
716
            r"- \(xccdf-var\s+(\S+)\)",
717
            r"- name: XCCDF Value \1 # promote to variable\n"
718
            r"  set_fact:\n"
719
            r"    \1: !!str (ansible-populate \1)\n"
720
            r"  tags:\n"
721
            r"    - always",
722
            fix_text
723
        )
724
725
        pattern = r'\(ansible-populate\s*(\S+)\)'
726
727
    elif remediation_type == "puppet":
728
        pattern = r'\(puppet-populate\s*(\S+)\)'
729
730
    elif remediation_type == "anaconda":
731
        pattern = r'\(anaconda-populate\s*(\S+)\)'
732
733
    elif remediation_type == "bash":
734
        pattern = r'\(bash-populate\s*(\S+)\)'
735
736
    else:
737
        sys.stderr.write("Unknown remediation type '%s'\n" % (remediation_type))
738
        sys.exit(1)
739
740
    # we will get list what looks like
741
    # [text, varname, text, varname, ..., text]
742
    parts = re.split(pattern, fix_text)
0 ignored issues
show
introduced by
The variable pattern does not seem to be defined for all execution paths.
Loading history...
743
744
    fix.text = parts[0]  # add first "text"
745
    for index in range(1, len(parts), 2):
746
        varname = parts[index]
747
        text_between_vars = parts[index + 1]
748
749
        # we cannot combine elements and text easily
750
        # so text is in ".tail" of element
751
        xccdfvarsub = ElementTree.SubElement(fix, "sub", idref=varname)
752
        xccdfvarsub.tail = text_between_vars
753