Passed
Push — master ( 4c49b2...31e49c )
by Matěj
04:23 queued 11s
created

ssg.ansible.remove_trailing_whitespace()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.125

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 5
rs 10
c 0
b 0
f 0
ccs 1
cts 2
cp 0.5
crap 1.125
1
"""
2
Common functions for processing Ansible in SSG
3
"""
4
5 2
from __future__ import absolute_import
6 2
from __future__ import print_function
7
8 2
import re
9
10 2
from collections import OrderedDict
11
12 2
from .constants import ansible_version_requirement_pre_task_name
13 2
from .constants import min_ansible_version
14 2
from ssg import build_yaml
15 2
from ssg import build_remediations
16 2
from ssg import yaml
17
18
19 2
def add_minimum_version(ansible_src):
    """
    Insert a minimum-Ansible-version assertion into an Ansible script.

    The first " - hosts: all" play header is replaced by the same header
    followed by a pre_tasks entry asserting that ansible_version.full is at
    least min_ansible_version.

    The source is returned unchanged when it contains no such play header,
    or when it already mentions both 'pre_task' and the version check.

    Raises ValueError when the source mentions 'pre_task' but not the
    version check.
    """
    play_header = " - hosts: all"
    version_check_marker = 'ansible_version.full is version_compare'

    # The template starts with the play header itself, so substituting it
    # for the header keeps the header and appends the pre_tasks section.
    header_with_check = (""" - hosts: all
   pre_tasks:
     - name: %s
       assert:
         that: "ansible_version.full is version_compare('%s', '>=')"
         msg: >
           "You must update Ansible to at least version %s to use this role."
          """ % (ansible_version_requirement_pre_task_name,
                 min_ansible_version, min_ansible_version))

    if play_header not in ansible_src:
        return ansible_src

    if 'pre_task' in ansible_src:
        if version_check_marker not in ansible_src:
            raise ValueError(
                "A pre_task already exists in ansible_src; failing to process: %s" %
                ansible_src)
        # The version check is already present; nothing to add.
        return ansible_src

    # Only the first play header receives the assertion.
    return ansible_src.replace(play_header, header_with_check, 1)
45
46
47 2
def remove_multiple_blank_lines(ansible_src):
    """
    Collapses runs of blank lines in an Ansible script

    Every stretch made of a newline, any amount of whitespace (further
    newlines included) and a closing newline is replaced by exactly two
    newlines, i.e. a single blank line.
    """
    blank_run = re.compile(r'\n\s*\n')
    return blank_run.sub('\n\n', ansible_src)
52
53
54 2
def remove_trailing_whitespace(ansible_src):
    """
    Removes trailing whitespace in an Ansible script

    Spaces and tabs that sit directly before the end of any line (or the
    end of the string) are dropped; newlines themselves are kept.
    """
    # `count` is left at its default (0 == replace all occurrences). Passing
    # it positionally is deprecated since Python 3.13, so only the flags
    # keyword is given.
    return re.sub(r'[ \t]+$', '', ansible_src, flags=re.M)
59
60
61 2
def _strings_to_list(one_or_more_strings):
62
    """
63
    Output a list, that either contains one string, or a list of strings.
64
    In Python, strings can be cast to lists without error, but with unexpected result.
65
    """
66 2
    if isinstance(one_or_more_strings, str):
67 2
        return [one_or_more_strings]
68
    else:
69 2
        return list(one_or_more_strings)
70
71
72 2
def update_yaml_list_or_string(current_contents, new_contents):
    """
    Combine two values, each a string or a collection of strings, into one.

    Falsy arguments are skipped. The remaining items are concatenated,
    current contents first. Returns an empty string when nothing is left,
    the bare item when exactly one is left, and a list otherwise.
    """
    merged = []
    for contents in (current_contents, new_contents):
        if contents:
            merged.extend(_strings_to_list(contents))

    if not merged:
        return ""
    if len(merged) == 1:
        # A lone entry is returned as itself rather than a one-element list.
        return merged[0]
    return merged
83
84
85 2
class AnsibleRemediation(object):
    """
    An Ansible remediation snippet, its configuration and (optionally) the
    rule it belongs to.

    The snippet is parsed on construction; the update_* methods then add
    "tags" and "when" entries to the parsed tasks in place.
    """

    def __init__(self, contents, config):
        """
        Store the snippet text and its config and parse the text as YAML.

        No rule is attached at this point; rule-dependent methods raise
        RuntimeError until `self.rule` is set.
        """
        self.contents = contents
        self.config = config

        self.parsed = yaml.ordered_load(contents)

        self.rule = None

    def _require_rule(self):
        """
        Raise RuntimeError unless a rule has been attached to the snippet.
        """
        if not self.rule:
            raise RuntimeError("The Ansible snippet has no rule loaded.")

    def update_tags_from_config(self, to_update):
        """
        Append tags derived from the snippet config to to_update["tags"].

        Adds "<value>_strategy", "<value>_complexity" and
        "<value>_disruption" for the keys present in the config, and a reboot
        tag when the config has a "reboot" key.
        """
        tags = to_update.get("tags", [])
        for key in ("strategy", "complexity", "disruption"):
            if key in self.config:
                tags.append("{0}_{1}".format(self.config[key], key))
        if "reboot" in self.config:
            # Only the literal string "true" counts as requiring a reboot.
            if self.config["reboot"] == "true":
                reboot_tag = "reboot_required"
            else:
                reboot_tag = "no_reboot_needed"
            tags.append(reboot_tag)
        to_update["tags"] = tags

    def update_tags_from_rule(self, platform, to_update):
        """
        Add rule-derived tags to to_update["tags"].

        The rule ID and "<severity>_severity" are placed at the front (ID
        first); the CCE for the platform, if any, and the rule's references
        are appended at the end.

        Raises RuntimeError when no rule is loaded.
        """
        self._require_rule()

        tags = to_update.get("tags", [])
        tags.insert(0, "{0}_severity".format(self.rule.severity))
        tags.insert(0, self.rule.id_)

        cce_num = self._get_cce(platform)
        if cce_num:
            tags.append("CCE-{0}".format(cce_num))

        refs = self.get_references(platform)
        tags.extend(refs)
        to_update["tags"] = tags

    def _get_cce(self, platform):
        """
        Return the value of the first rule identifier whose key ends with
        the platform name, or None when there is no such identifier.
        """
        for cce, val in self.rule.identifiers.items():
            if cce.endswith(platform):
                return val
        return None

    def get_references(self, platform):
        """
        Return the rule's references as a list of "<prefix>-<value>" strings.

        Reference classes are always emitted in the same order: nist, cui,
        pcidss, cjis, then the platform-specific stigid.

        Raises RuntimeError when no rule is loaded.
        """
        self._require_rule()
        # see xccdf-addremediations.xslt <- shared_constants.xslt <- shared_shorthand2xccdf.xslt
        # if you want to know how the map was constructed
        platform_id_map = {
            "rhel6": "RHEL-06",
            "rhel7": "RHEL-07",
            "rhel8": "RHEL-08",
        }
        # NOTE(review): a platform missing from the map yields the prefix
        # "DISA-STIG-None" -- confirm whether that is intended.
        stig_platform_id = "DISA-STIG-{id}".format(id=platform_id_map.get(platform, None))
        # A sequence of pairs rather than a dict, so that the order of the
        # generated tags does not depend on the interpreter's dict ordering.
        ref_prefixes = (
            ("nist", "NIST-800-53"),
            ("cui", "NIST-800-171"),
            ("pcidss", "PCI-DSS"),
            ("cjis", "CJIS"),
            ("stigid@{platform}".format(platform=platform), stig_platform_id),
        )
        result = []
        for ref_class, prefix in ref_prefixes:
            refs = self._get_rule_reference(ref_class)
            result.extend(["{prefix}-{value}".format(prefix=prefix, value=v) for v in refs])
        return result

    def _get_rule_reference(self, ref_class):
        """
        Return the rule's comma-separated references of the given class as a
        list; an absent or empty reference gives an empty list.
        """
        refs = self.rule.references.get(ref_class, "")
        return refs.split(",") if refs else []

    def update_when_from_rule(self, to_update):
        """
        Extend to_update["when"] with the condition implied by the rule.

        A rule whose platform is "machine" contributes a condition on the
        ansible_virtualization_* facts. If the resulting condition is empty,
        the "when" key is removed altogether.

        Raises RuntimeError when no rule is loaded.
        """
        # Checked before touching to_update so that a missing rule leaves
        # the task unmodified and fails the same way as the tag methods.
        self._require_rule()

        additional_when = ""
        if self.rule.platform == "machine":
            additional_when = ('ansible_virtualization_role != "guest" '
                               'or ansible_virtualization_type != "docker"')
        to_update.setdefault("when", "")
        new_when = update_yaml_list_or_string(to_update["when"], additional_when)
        if not new_when:
            to_update.pop("when")
        else:
            to_update["when"] = new_when

    def update(self, platform):
        """
        Apply the "when" and tag updates to every dict entry of the parsed
        snippet; non-dict entries are skipped.
        """
        for p in self.parsed:
            if not isinstance(p, dict):
                continue
            self.update_when_from_rule(p)
            self.update_tags_from_config(p)
            self.update_tags_from_rule(platform, p)

    @classmethod
    def from_snippet_and_rule(cls, snippet_fname, rule_fname):
        """
        Build a remediation from a snippet file and attach the rule loaded
        from rule_fname.
        """
        rule = build_yaml.Rule.from_yaml(rule_fname)
        result = cls.from_snippet(snippet_fname)
        result.rule = rule
        return result

    @classmethod
    def from_snippet(cls, snippet_fname):
        """
        Build a remediation (without a rule) from a snippet file, using its
        contents and config as parsed by build_remediations.
        """
        parsed = build_remediations.parse_from_file_without_jinja(snippet_fname)
        result = cls(parsed.contents, parsed.config)
        return result
198
199