ssg.build_remediations   F
last analyzed

Complexity

Total Complexity 120

Size/Duplication

Total Lines 648
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
eloc 390
dl 0
loc 648
ccs 0
cts 337
cp 0
rs 2
c 0
b 0
f 0
wmc 120

9 Functions

Rating   Name   Duplication   Size   Complexity  
B split_remediation_content_and_metadata() 0 24 7
A is_supported_filename() 0 12 2
C expand_xccdf_subs() 0 84 11
A load_compiled_remediations() 0 17 5
A write_fix_to_file() 0 9 3
A parse_from_file_with_jinja() 0 13 1
A parse_from_file_without_jinja() 0 9 2
B process() 0 32 6
B get_rule_dir_remediations() 0 57 8

28 Methods

Rating   Name   Duplication   Size   Complexity  
A BashRemediation.__init__() 0 2 1
A Remediation.get_inherited_cpe_platform_names() 0 6 2
A Remediation.expand_env_yaml_from_rule() 0 7 2
A Remediation.get_stripped_conditionals() 0 12 3
A Remediation.parse_from_file_with_jinja() 0 2 1
C AnsibleRemediation.inject_package_facts_task() 0 30 9
B AnsibleRemediation.update_when_from_rule() 0 27 7
A BlueprintRemediation.__init__() 0 3 1
A AnsibleRemediation.parse_from_file_with_jinja() 0 20 2
A AnsibleRemediation._get_cce() 0 2 1
A Remediation._get_stripped_conditional() 0 7 3
A AnsibleRemediation.get_references() 0 9 3
B BashRemediation.parse_from_file_with_jinja() 0 42 7
A PuppetRemediation.__init__() 0 3 1
A Remediation.get_inherited_conditionals() 0 3 1
A AnsibleRemediation.from_snippet_and_rule() 0 11 4
A IgnitionRemediation.__init__() 0 3 1
A AnsibleRemediation.update_tags_from_rule() 0 15 3
A KubernetesRemediation.__init__() 0 3 1
B AnsibleRemediation.update_tags_from_config() 0 15 6
A Remediation.associate_rule() 0 3 1
A Remediation.get_rule_specific_conditionals() 0 3 1
A AnsibleRemediation._get_rule_reference() 0 6 2
A AnsibleRemediation.update() 0 18 5
A Remediation.__init__() 0 8 2
A AnacondaRemediation.__init__() 0 3 1
A AnsibleRemediation.__init__() 0 5 1
A Remediation.get_rule_specific_cpe_platform_names() 0 8 3

How to fix   Complexity   

Complexity

Complex classes like ssg.build_remediations often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from __future__ import absolute_import
2
from __future__ import print_function
3
4
import sys
5
import os
6
import os.path
7
import re
8
from collections import defaultdict, namedtuple, OrderedDict
9
10
import ssg.yaml
11
import ssg.build_yaml
12
from . import rules
13
from . import utils
14
15
from . import constants
16
from .jinja import process_file_with_macros as jinja_process_file
17
18
from .xml import ElementTree
19
from .constants import XCCDF12_NS
20
21
# File extension a remediation file must carry for each remediation type.
REMEDIATION_TO_EXT_MAP = {
    'anaconda': '.anaconda',
    'ansible': '.yml',
    'bash': '.sh',
    'puppet': '.pp',
    'ignition': '.yml',
    'kubernetes': '.yml',
    'blueprint': '.toml',
}

# Lines of a fix file starting with this marker are dropped during parsing.
FILE_GENERATED_HASH_COMMENT = '# THIS FILE IS GENERATED'

# Keys accepted in the "# key = value" header comments of a fix file.
REMEDIATION_CONFIG_KEYS = ['complexity', 'disruption', 'platform', 'reboot',
                           'strategy']
REMEDIATION_ELM_KEYS = ['complexity', 'disruption', 'reboot', 'strategy']

# A parsed fix: its text and the configuration read from its header comments.
RemediationObject = namedtuple('remediation', ['contents', 'config'])
39
40
41
def is_supported_filename(remediation_type, filename):
    """
    Checks if filename has a supported extension for remediation_type.

    Returns True when filename ends with the extension registered for
    remediation_type in REMEDIATION_TO_EXT_MAP, and False otherwise.

    Exits when remediation_type is of an unknown type.
    """
    expected_ext = REMEDIATION_TO_EXT_MAP.get(remediation_type)
    if expected_ext is not None:
        return filename.endswith(expected_ext)

    sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
                     % (remediation_type))
    sys.exit(1)
53
54
55
def split_remediation_content_and_metadata(fix_file):
    """
    Splits the text of a fix into its body and its configuration.

    Lines starting with FILE_GENERATED_HASH_COMMENT are discarded. A line
    that starts with '#', contains exactly one '=' and whose key is listed
    in REMEDIATION_CONFIG_KEYS is stored in the configuration. Every other
    line is kept as part of the body.

    Returns a RemediationObject whose config is a defaultdict yielding None
    for keys that were not present in the fix.
    """
    remediation_contents = []
    config = defaultdict(lambda: None)

    for line in fix_file.splitlines():
        if line.startswith(FILE_GENERATED_HASH_COMMENT):
            continue

        if line.startswith('#') and line.count('=') == 1:
            key, value = line.strip('#').split('=')
            key = key.strip()
            if key in REMEDIATION_CONFIG_KEYS:
                config[key] = value.strip()
                continue

        # Not a config item, so the line belongs to the body. This includes
        # '#' lines with an equals sign whose "key" is not one of the known
        # keys from REMEDIATION_CONFIG_KEYS.
        remediation_contents.append(line)

    contents = "\n".join(remediation_contents)
    return RemediationObject(contents=contents, config=config)
79
80
81
def parse_from_file_with_jinja(file_path, env_yaml):
    """
    Parses a remediation from a file. As remediations contain jinja macros,
    we need an env_yaml context to process these. In practice, no remediations
    use jinja in the configuration, so for extracting only the configuration,
    env_yaml can be an arbitrary product.yml dictionary.

    Returns the RemediationObject produced by
    split_remediation_content_and_metadata from the rendered file.

    If the logic of configuration parsing changes significantly, please also
    update ssg.fixes.parse_platform(...).
    """
    rendered_fix = jinja_process_file(file_path, env_yaml)
    return split_remediation_content_and_metadata(rendered_fix)
94
95
96
def parse_from_file_without_jinja(file_path):
    """
    Parses a remediation from a file. Doesn't process the Jinja macros.
    This function is useful in build phases in which all the Jinja macros
    are already resolved.

    Returns the RemediationObject produced by
    split_remediation_content_and_metadata from the file's text.
    """
    with open(file_path, "r") as fix_fd:
        fix_text = fix_fd.read()
    # Parsing happens after the file has been closed; only the text is needed.
    return split_remediation_content_and_metadata(fix_text)
105
106
107
class Remediation(object):
    """
    Base class of a remediation stored in a file.

    It keeps the path and type of the remediation, an optional associated
    rule, and a local Jinja context (local_env_yaml) that is extended with
    data of the associated rule.
    """

    def __init__(self, file_path, remediation_type):
        self.file_path = file_path
        self.local_env_yaml = dict()

        # Unknown metadata keys read as None instead of raising KeyError.
        self.metadata = defaultdict(lambda: None)

        self.remediation_type = remediation_type
        self.associated_rule = None

    def associate_rule(self, rule_obj):
        """Binds rule_obj to the remediation and refreshes the local context."""
        self.associated_rule = rule_obj
        self.expand_env_yaml_from_rule()

    def expand_env_yaml_from_rule(self):
        """Copies title, id and identifiers of the rule into local_env_yaml."""
        rule = self.associated_rule
        if not rule:
            return

        self.local_env_yaml["rule_title"] = rule.title
        self.local_env_yaml["rule_id"] = rule.id_
        self.local_env_yaml["cce_identifiers"] = rule.identifiers

    def parse_from_file_with_jinja(self, env_yaml, cpe_platforms):
        """
        Parses the remediation file using env_yaml as the Jinja context.

        cpe_platforms is not used here; it is part of the signature that
        subclasses override.
        """
        return parse_from_file_with_jinja(self.file_path, env_yaml)

    def get_inherited_cpe_platform_names(self):
        """Returns a new set of platform names inherited by the rule."""
        inherited_cpe_platform_names = set()
        if self.associated_rule:
            # There can be repeated inherited platforms and rule platforms
            inherited_cpe_platform_names.update(
                self.associated_rule.inherited_cpe_platform_names)
        return inherited_cpe_platform_names

    def get_rule_specific_cpe_platform_names(self):
        """Returns the rule's platform names that are not inherited ones."""
        rule = self.associated_rule
        if not rule or rule.cpe_platform_names is None:
            return set()

        inherited_cpe_platform_names = self.get_inherited_cpe_platform_names()
        return {
            name for name in rule.cpe_platform_names
            if name not in inherited_cpe_platform_names}

    def _get_stripped_conditional(self, language, platform):
        """
        Returns the platform's conditional for language without surrounding
        whitespace, or None if there is no conditional or it is blank.
        """
        conditional = platform.get_remediation_conditional(language)
        if conditional is None:
            return None
        return conditional.strip() or None

    def get_stripped_conditionals(self, language, cpe_platform_names, cpe_platforms):
        """
        collect conditionals of platforms defined by cpe_platform_names
        and strip them of white spaces

        Platforms without a non-blank conditional are skipped. Raises
        KeyError when a name is missing from cpe_platforms.
        """
        stripped_conditionals = []
        for name in cpe_platform_names:
            conditional = self._get_stripped_conditional(language, cpe_platforms[name])
            if conditional is not None:
                stripped_conditionals.append(conditional)
        return stripped_conditionals

    def get_rule_specific_conditionals(self, language, cpe_platforms):
        """Returns stripped conditionals of the rule-specific platforms."""
        cpe_platform_names = self.get_rule_specific_cpe_platform_names()
        return self.get_stripped_conditionals(language, cpe_platform_names, cpe_platforms)

    def get_inherited_conditionals(self, language, cpe_platforms):
        """Returns stripped conditionals of the inherited platforms."""
        cpe_platform_names = self.get_inherited_cpe_platform_names()
        return self.get_stripped_conditionals(language, cpe_platform_names, cpe_platforms)
176
177
178
def process(remediation, env_yaml, cpe_platforms):
    """
    Process a fix, and return the processed fix iff the file is of a valid
    extension for the remediation type and the fix is valid for the current
    product. Otherwise None is returned.

    Note that platform is a required field in the contents of the fix.

    Raises RuntimeError when the fix has no platform, and ValueError when
    the comma-separated platform list contains whitespace around an item.
    """
    if not is_supported_filename(remediation.remediation_type, remediation.file_path):
        return None

    result = remediation.parse_from_file_with_jinja(env_yaml, cpe_platforms)
    platforms = result.config['platform']

    if not platforms:
        raise RuntimeError(
            "The '%s' remediation script does not contain the "
            "platform identifier!" % (remediation.file_path))

    if any(item.strip() != item for item in platforms.split(",")):
        msg = (
            "Comma-separated '{platform}' platforms "
            "in '{remediation_file}' contains whitespace."
            .format(platform=platforms, remediation_file=remediation.file_path))
        raise ValueError(msg)

    product = env_yaml["product"]
    if utils.is_applicable_for_product(platforms, product):
        return result

    return None
210
211
212
class BashRemediation(Remediation):
    """
    Bash remediation. When the associated rule has platform conditionals,
    the parsed fix is wrapped into an "if ...; then ... fi" block.
    """

    def __init__(self, file_path):
        super(BashRemediation, self).__init__(file_path, "bash")

    def parse_from_file_with_jinja(self, env_yaml, cpe_platforms):
        """
        Parses the fix and wraps its text into the platform check.

        Inherited conditionals are joined by "&&"; rule-specific ones are
        joined by "||" and, if there are inherited ones too, grouped in
        braces. The fix is returned unchanged when it renders to an empty
        text or when there are no conditionals.
        """
        self.local_env_yaml.update(env_yaml)
        result = super(BashRemediation, self).parse_from_file_with_jinja(
            self.local_env_yaml, cpe_platforms)

        # Avoid platform wrapping empty fix text
        # Remediations can be empty when a Jinja macro or conditional
        # renders no fix text for a product
        stripped_fix_text = result.contents.strip()
        if stripped_fix_text == "":
            return result

        inherited_conditionals = sorted(super(
            BashRemediation, self).get_inherited_conditionals("bash", cpe_platforms))
        rule_specific_conditionals = sorted(super(
            BashRemediation, self).get_rule_specific_conditionals("bash", cpe_platforms))
        if not (inherited_conditionals or rule_specific_conditionals):
            return result

        all_conditions = " && ".join(inherited_conditionals)
        if rule_specific_conditionals:
            any_rule_specific = " || ".join(rule_specific_conditionals)
            if all_conditions:
                all_conditions += " && { " + any_rule_specific + "; }"
            else:
                all_conditions = any_rule_specific

        # It is possible to indent the original body of the remediation with
        # textwrap.indent(), however, it is not supported by python2, and
        # there is a risk of breaking remediations. For example, remediations
        # with a here-doc block could be affected.
        wrapped_fix_text = [
            "# Remediation is applicable only in certain platforms",
            "if {0}; then".format(all_conditions),
            "",
            stripped_fix_text,
            "",
            "else",
            "    >&2 echo 'Remediation is not applicable, nothing was done'",
            "fi",
        ]
        return RemediationObject(
            contents="\n".join(wrapped_fix_text), config=result.config)
258
259
260
class AnsibleRemediation(Remediation):
    """
    Ansible remediation. When a rule is associated, the parsed tasks are
    extended by "when" clauses derived from the rule's platforms and by tags
    derived from the fix configuration and the rule.
    """

    def __init__(self, file_path):
        super(AnsibleRemediation, self).__init__(
            file_path, "ansible")

        # Parsed YAML of the snippet, set by parse_from_file_with_jinja().
        self.body = None

    def parse_from_file_with_jinja(self, env_yaml, cpe_platforms):
        """
        Parses the snippet and, if a rule is associated, updates its tasks.

        Without an associated rule the plain parse result is returned.
        Otherwise the contents are loaded as YAML, updated by update() and
        dumped back; self.body and self.metadata are set as a side effect.
        """
        self.local_env_yaml.update(env_yaml)
        result = super(AnsibleRemediation, self).parse_from_file_with_jinja(
            self.local_env_yaml, cpe_platforms)

        if not self.associated_rule:
            return result

        parsed = ssg.yaml.ordered_load(result.contents)

        self.update(parsed, result.config, cpe_platforms)

        updated_yaml_text = ssg.yaml.ordered_dump(
            parsed, None, default_flow_style=False)
        result = result._replace(contents=updated_yaml_text)

        self.body = parsed
        self.metadata = result.config

        return result

    def update_tags_from_config(self, to_update, config):
        """Adds tags for the keys present in config to the task to_update."""
        tags = to_update.get("tags", [])
        for key in ("strategy", "complexity", "disruption"):
            if key in config:
                tags.append("{0}_{1}".format(config[key], key))
        if "reboot" in config:
            if config["reboot"] == "true":
                tags.append("reboot_required")
            else:
                tags.append("no_reboot_needed")
        to_update["tags"] = sorted(tags)

    def update_tags_from_rule(self, to_update):
        """
        Adds the rule id, severity, CCE and reference tags to the task.

        Raises RuntimeError when no rule is associated.
        """
        if not self.associated_rule:
            raise RuntimeError("The Ansible snippet has no rule loaded.")

        tags = to_update.get("tags", [])
        tags.insert(0, "{0}_severity".format(self.associated_rule.severity))
        tags.insert(0, self.associated_rule.id_)

        cce_num = self._get_cce()
        if cce_num:
            tags.append("{0}".format(cce_num))

        tags.extend(self.get_references())
        to_update["tags"] = sorted(tags)

    def _get_cce(self):
        """Returns the "cce" identifier of the associated rule, or None."""
        return self.associated_rule.identifiers.get("cce", None)

    def get_references(self):
        """
        Returns "<prefix>-<value>" strings for the rule's references whose
        class is listed in constants.REF_PREFIX_MAP.

        Raises RuntimeError when no rule is associated.
        """
        if not self.associated_rule:
            raise RuntimeError("The Ansible snippet has no rule loaded.")

        result = []
        for ref_class, prefix in constants.REF_PREFIX_MAP.items():
            refs = self._get_rule_reference(ref_class)
            result.extend(["{prefix}-{value}".format(prefix=prefix, value=v) for v in refs])
        return result

    def _get_rule_reference(self, ref_class):
        """Returns the rule's comma-separated ref_class references as a list."""
        refs = self.associated_rule.references.get(ref_class, "")
        if not refs:
            return []
        return refs.split(",")

    @staticmethod
    def _references_package_facts(condition):
        """Tells whether a single "when" condition uses ansible_facts.packages."""
        # A "when" entry parsed from YAML is not necessarily text (it can be
        # a boolean or null); such a condition cannot reference the facts.
        try:
            return "ansible_facts.packages" in condition
        except TypeError:
            return False

    def inject_package_facts_task(self, parsed_snippet):
        """ Injects a package_facts task only if
            the snippet has a task with a when clause with ansible_facts.packages,
            and the snippet doesn't already have a package_facts task
        """
        has_package_facts_task = False
        has_ansible_facts_packages_clause = False

        for p_task in parsed_snippet:
            # We are only interested in the OrderedDicts, which represent Ansible tasks
            if not isinstance(p_task, dict):
                continue

            if "package_facts" in p_task:
                has_package_facts_task = True

            # When clause of the task can be string or a list, lets normalize to list
            task_when = p_task.get("when", "")
            if not isinstance(task_when, (list, tuple)):
                task_when = [task_when]
            if any(self._references_package_facts(when) for when in task_when):
                has_ansible_facts_packages_clause = True

        if has_ansible_facts_packages_clause and not has_package_facts_task:
            facts_task = OrderedDict([
                ('name', 'Gather the package facts'),
                ('package_facts', {'manager': 'auto'})
            ])
            parsed_snippet.insert(0, facts_task)

    def update_when_from_rule(self, to_update, cpe_platforms):
        """
        Prepends platform conditionals of the rule to the task's "when".

        Inherited conditionals are added one by one; rule-specific ones are
        combined into a single condition joined by " or ". The "when" key is
        removed if the resulting clause is empty.
        """
        additional_when = []
        inherited_conditionals = sorted(super(
            AnsibleRemediation, self).get_inherited_conditionals("ansible", cpe_platforms))
        rule_specific_conditionals = sorted(super(
            AnsibleRemediation, self).get_rule_specific_conditionals("ansible", cpe_platforms))
        # Remove conditionals related to package CPEs if the updated task collects package facts
        if "package_facts" in to_update:
            # Real lists are needed here: a filter() object is always truthy
            # on Python 3, which made the emptiness checks below pass and
            # appended an empty condition.
            inherited_conditionals = [
                c for c in inherited_conditionals
                if "in ansible_facts.packages" not in c]
            rule_specific_conditionals = [
                c for c in rule_specific_conditionals
                if "in ansible_facts.packages" not in c]

        if inherited_conditionals:
            additional_when.extend(inherited_conditionals)

        if rule_specific_conditionals:
            additional_when.append(" or ".join(rule_specific_conditionals))

        to_update.setdefault("when", "")
        new_when = ssg.yaml.update_yaml_list_or_string(to_update["when"], additional_when,
                                                       prepend=True)
        if not new_when:
            to_update.pop("when")
        else:
            to_update["when"] = new_when

    def update(self, parsed, config, cpe_platforms):
        """Updates all tasks (dict items) of the parsed snippet in place."""
        # We split the remediation update in three steps

        # 1. Update the when clause
        for task in parsed:
            if isinstance(task, dict):
                self.update_when_from_rule(task, cpe_platforms)

        # 2. Inject any extra task necessary
        self.inject_package_facts_task(parsed)

        # 3. Add tags to all tasks, including the ones we have injected
        for task in parsed:
            if not isinstance(task, dict):
                continue
            self.update_tags_from_config(task, config)
            self.update_tags_from_rule(task)

    @classmethod
    def from_snippet_and_rule(cls, snippet_fname, rule_fname):
        """
        Creates the remediation from a snippet file and a rule file.

        Returns None when either file does not exist or when loading the
        rule raises ssg.yaml.DocumentationNotComplete.
        """
        if not (os.path.isfile(snippet_fname) and os.path.isfile(rule_fname)):
            return None

        result = cls(snippet_fname)
        try:
            rule_obj = ssg.build_yaml.Rule.from_yaml(rule_fname)
            result.associate_rule(rule_obj)
        except ssg.yaml.DocumentationNotComplete:
            # Happens on non-debug build when a rule is "documentation-incomplete"
            return None
        return result
429
430
431
class AnacondaRemediation(Remediation):
    """Remediation of the "anaconda" type; adds no behavior of its own."""

    def __init__(self, file_path):
        super(AnacondaRemediation, self).__init__(
            file_path, "anaconda")
435
436
437
class PuppetRemediation(Remediation):
    """Remediation of the "puppet" type; adds no behavior of its own."""

    def __init__(self, file_path):
        super(PuppetRemediation, self).__init__(
            file_path, "puppet")
441
442
443
class IgnitionRemediation(Remediation):
    """Remediation of the "ignition" type; adds no behavior of its own."""

    def __init__(self, file_path):
        super(IgnitionRemediation, self).__init__(
            file_path, "ignition")
447
448
449
class KubernetesRemediation(Remediation):
    """Remediation of the "kubernetes" type; adds no behavior of its own."""

    def __init__(self, file_path):
        super(KubernetesRemediation, self).__init__(
            file_path, "kubernetes")
453
454
455
class BlueprintRemediation(Remediation):
    """
    This provides class for OSBuild Blueprint remediations

    The remediation type is "blueprint"; no behavior is added to the base
    class.
    """

    def __init__(self, file_path):
        super(BlueprintRemediation, self).__init__(
            file_path, "blueprint")
462
463
464
# Class implementing each remediation type; the keys are the same type names
# that REMEDIATION_TO_EXT_MAP uses.
REMEDIATION_TO_CLASS = {
    'anaconda': AnacondaRemediation,
    'ansible': AnsibleRemediation,
    'bash': BashRemediation,
    'puppet': PuppetRemediation,
    'ignition': IgnitionRemediation,
    'kubernetes': KubernetesRemediation,
    'blueprint': BlueprintRemediation,
}
473
474
475
def write_fix_to_file(fix, file_path):
    """
    Writes a single fix to the given file path.

    fix is a (contents, config) pair. Every config item is written first as
    a "# key = value" line, followed by the contents as they are.
    """
    fix_contents, config = fix
    with open(file_path, "w") as fix_fd:
        for key, value in config.items():
            fix_fd.write("# %s = %s\n" % (key, value))
        fix_fd.write(fix_contents)
484
485
486
def get_rule_dir_remediations(dir_path, remediation_type, product=None):
    """
    Gets a list of remediations of type remediation_type contained in a
    rule directory. If product is None, returns all such remediations.
    If product is not None, returns applicable remediations in order of
    priority:

        {{{ product }}}.ext -> shared.ext

    Only returns remediations which exist. An empty list is returned when
    dir_path is not a rule directory or has no remediation_type
    subdirectory. Raises KeyError for a remediation_type that is missing
    from REMEDIATION_TO_EXT_MAP.
    """
    if not rules.is_rule_dir(dir_path):
        return []

    remediations_dir = os.path.join(dir_path, remediation_type)
    has_remediations_dir = os.path.isdir(remediations_dir)
    # The extension is looked up before the directory check on purpose: an
    # unknown remediation type is reported even if the directory is absent.
    ext = REMEDIATION_TO_EXT_MAP[remediation_type]
    if not has_remediations_dir:
        return []

    # Two categories of results: those for a product and those that are
    # shared to multiple products. Within common results, there's two types:
    # those shared to multiple versions of the same type (added up front) and
    # those shared across multiple product types (e.g., RHEL and Ubuntu).
    product_results = []
    common_results = []
    for remediation_file in sorted(os.listdir(remediations_dir)):
        file_name, file_ext = os.path.splitext(remediation_file)
        if file_ext != ext or not rules.applies_to_product(file_name, product):
            continue

        remediation_path = os.path.join(remediations_dir, remediation_file)

        # rules.applies_to_product ensures we only have three entries:
        # 1. shared
        # 2. <product>
        # 3. <product><version>
        #
        # Note that the product variable holds <product><version>.
        if file_name == 'shared':
            # Shared are the lowest priority items, add them to the end
            # of the common results.
            common_results.append(remediation_path)
        elif file_name != product:
            # Here, the filename is a subset of the product, but isn't
            # the full product. Product here is both the product name
            # (e.g., ubuntu) and its version (2004). Filename could be
            # either "ubuntu" or "ubuntu2004" so we want this branch
            # to trigger when it is the former, not the latter. It is
            # the highest priority of common results, so insert it
            # before any shared ones.
            common_results.insert(0, remediation_path)
        else:
            # Finally, this must be product-specific result.
            product_results.append(remediation_path)

    # Combine the two sets in priority order.
    return product_results + common_results
543
544
545
def expand_xccdf_subs(fix, remediation_type):
    """Expand the respective populate keywords of each
    remediation type with an <xccdf:sub> element

    This routine translates any instance of the '`type`-populate' keyword in
    the form of:

            (`type`-populate variable_name)

    where `type` can be either ansible, puppet, anaconda, bash or blueprint,
    into

            <sub idref="variable_name"/>

    The fix element is modified in place. Nothing is done when fix is None,
    when it has no text, or for ignition and kubernetes remediations.

    In Ansible fixes, "- (xccdf-var VAR)" is first turned into a set_fact
    task; a literal "(ansible-populate VAR)" in the input is deprecated and
    raises RuntimeError. Exits when remediation_type is of an unknown type.
    """
    if fix is None:
        return
    if remediation_type in ("ignition", "kubernetes"):
        return
    if remediation_type not in ("blueprint", "ansible", "puppet", "anaconda", "bash"):
        sys.stderr.write("Unknown remediation type '%s'\n" % (remediation_type))
        sys.exit(1)

    fix_text = fix.text
    if fix_text is None:
        # An element without text has nothing to expand; the substring test
        # and re.split() below would raise TypeError on None.
        return

    if remediation_type == "ansible":
        if "(ansible-populate " in fix_text:
            raise RuntimeError(
                "(ansible-populate VAR) has been deprecated. Please use "
                "(xccdf-var VAR) instead. Keep in mind that the latter will "
                "make an ansible variable out of XCCDF Value as opposed to "
                "substituting directly."
            )

        # If you change this string make sure it still matches the pattern
        # defined in OpenSCAP. Otherwise you break variable handling in
        # 'oscap xccdf generate fix' and the variables won't be customizable!
        # https://github.com/OpenSCAP/openscap/blob/1.2.17/src/XCCDF_POLICY/xccdf_policy_remediate.c#L588
        #   const char *pattern =
        #     "- name: XCCDF Value [^ ]+ # promote to variable\n  set_fact:\n"
        #     "    ([^:]+): (.+)\n  tags:\n    - always\n";
        # We use !!str typecast to prevent treating values as different types
        # eg. yes as a bool or 077 as an octal number
        fix_text = re.sub(
            r"- \(xccdf-var\s+(\S+)\)",
            r"- name: XCCDF Value \1 # promote to variable\n"
            r"  set_fact:\n"
            r"    \1: !!str (ansible-populate \1)\n"
            r"  tags:\n"
            r"    - always",
            fix_text
        )

    # Every remaining type uses the same keyword shape, e.g.
    # "(bash-populate variable_name)".
    pattern = r'\(%s-populate\s*(\S+)\)' % remediation_type

    # we will get list what looks like
    # [text, varname, text, varname, ..., text]
    parts = re.split(pattern, fix_text)

    fix.text = parts[0]  # add first "text"
    for index in range(1, len(parts), 2):
        varname = parts[index]
        text_between_vars = parts[index + 1]

        # we cannot combine elements and text easily
        # so text is in ".tail" of element
        xccdfvarsub = ElementTree.SubElement(
            fix, "{%s}sub" % XCCDF12_NS, idref=constants.OSCAP_VALUE + varname)
        xccdfvarsub.tail = text_between_vars
        xccdfvarsub.set("use", "legacy")
629
630
631
def load_compiled_remediations(fixes_dir):
    """
    Loads all compiled fixes stored as <fixes_dir>/<language>/<file>.

    Returns a mapping of rule ID (the file name without its extension) to
    a dictionary that maps the language to the parsed remediation.

    Raises RuntimeError when fixes_dir is not a directory or when it
    contains an entry which is not a directory.
    """
    if not os.path.isdir(fixes_dir):
        raise RuntimeError(
            "Directory with compiled fixes '%s' does not exist" % fixes_dir)

    all_remediations = defaultdict(dict)
    for language in os.listdir(fixes_dir):
        language_dir = os.path.join(fixes_dir, language)
        if not os.path.isdir(language_dir):
            raise RuntimeError(
                "Can't find the '%s' directory with fixes for %s" %
                (language_dir, language))

        for filename in sorted(os.listdir(language_dir)):
            rule_id, _ = os.path.splitext(filename)
            file_path = os.path.join(language_dir, filename)
            all_remediations[rule_id][language] = parse_from_file_without_jinja(file_path)
    return all_remediations
648