Passed
Pull Request — master (#4216)
by Matěj
02:33 queued 10s
created

ssg.ansible.remove_multiple_blank_lines()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.125

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 5
rs 10
c 0
b 0
f 0
ccs 1
cts 2
cp 0.5
crap 1.125
1
"""
2
Common functions for processing Ansible in SSG
3
"""
4
5 2
from __future__ import absolute_import
6 2
from __future__ import print_function
7
8 2
import re
9
10 2
from .constants import ansible_version_requirement_pre_task_name
11 2
from .constants import min_ansible_version
12
13
14 2
def add_minimum_version(ansible_src):
    """
    Adds minimum ansible version to an Ansible script

    The first `` - hosts: all`` occurrence in ``ansible_src`` is replaced by
    the same line followed by a ``pre_tasks`` assertion on
    ``ansible_version.full``.

    Returns ``ansible_src`` unchanged when it contains no `` - hosts: all``
    or when it already contains the version assertion.

    Raises ValueError when ``ansible_src`` contains the text ``pre_task``
    but not the version assertion.
    """
    # Nothing to anchor the pre-task to.
    if ' - hosts: all' not in ansible_src:
        return ansible_src

    # Plain substring test, so it also matches "pre_tasks".
    if 'pre_task' in ansible_src:
        if 'ansible_version.full is version_compare' in ansible_src:
            # The version assertion is already present.
            return ansible_src

        raise ValueError(
            "A pre_task already exists in ansible_src; failing to process: %s" %
            ansible_src)

    pre_task = (""" - hosts: all
   pre_tasks:
     - name: %s
       assert:
         that: "ansible_version.full is version_compare('%s', '>=')"
         msg: >
           "You must update Ansible to at least version %s to use this role."
          """ % (ansible_version_requirement_pre_task_name,
                 min_ansible_version, min_ansible_version))

    # Only the first play header is rewritten.
    return ansible_src.replace(" - hosts: all", pre_task, 1)
40
41
42 2
def remove_multiple_blank_lines(ansible_src):
    """
    Removes multiple blank lines in an Ansible script

    Each run consisting of a newline, any amount of whitespace (including
    further newlines) and a closing newline is collapsed into exactly two
    newlines, i.e. a single blank line. Lines that contain only whitespace
    therefore count as blank.
    """
    return re.sub(r'\n\s*\n', '\n\n', ansible_src)
47
48
49 2
def remove_trailing_whitespace(ansible_src):
    """
    Removes trailing whitespace in an Ansible script

    Spaces and tabs directly before the end of every line are dropped;
    newlines themselves are left alone.
    """
    # re.M makes '$' match at the end of each line, not only at the end of
    # the string. The replacement count is left at its default (0 = replace
    # all) and flags is passed by keyword: passing count positionally is
    # deprecated in newer Python versions.
    return re.sub(r'[ \t]+$', '', ansible_src, flags=re.M)
54
55
56 2
def _strings_to_list(one_or_more_strings):
57
    """
58
    Output a list, that either contains one string, or a list of strings.
59
    In Python, strings can be cast to lists without error, but with unexpected result.
60
    """
61
    if isinstance(one_or_more_strings, str):
62
        return [one_or_more_strings]
63
    else:
64
        return list(one_or_more_strings)
65
66
67 2 View Code Duplication
def update_yaml_list_or_string(current_contents, new_contents):
    """
    Merge two YAML values that may each be a string or a list of strings.

    Falsy inputs are skipped. The remaining values are concatenated in
    order (current first, then new).

    Returns an empty string when nothing is left, the bare element when
    exactly one item is left, and the list of items otherwise.
    """
    merged = []
    for contents in (current_contents, new_contents):
        if contents:
            merged.extend(_strings_to_list(contents))

    if not merged:
        return ""
    if len(merged) == 1:
        # A single entry is returned as a scalar, not a one-item list.
        return merged[0]
    return merged
78
79
80 2
class AnsibleRemediation(object):
    """
    An Ansible remediation snippet together with its config and,
    optionally, the rule it belongs to.

    The snippet is parsed on construction; ``update()`` then enriches every
    mapping in the parsed snippet with ``when`` and ``tags`` entries derived
    from the config and the rule.
    """

    def __init__(self, contents, config):
        """
        Store the raw snippet and its config, and parse the snippet.

        ``self.rule`` starts out as None; ``from_snippet_and_rule`` sets it.
        """
        self.contents = contents
        self.config = config

        # NOTE(review): ``yaml`` is not imported in the visible part of this
        # module; confirm the import exists, otherwise this raises NameError.
        self.parsed = yaml.ordered_load(contents)

        self.rule = None

    def update_tags_from_config(self, to_update):
        """
        Append tags derived from ``self.config`` to ``to_update["tags"]``.

        For each of the "strategy", "complexity" and "disruption" config keys
        that is present, a ``<value>_<key>`` tag is added. If "reboot" is
        present, "reboot_required" is added when its value is the string
        "true", "no_reboot_needed" otherwise.
        """
        tags = to_update.get("tags", [])
        for key in ("strategy", "complexity", "disruption"):
            if key in self.config:
                tags.append("{0}_{1}".format(self.config[key], key))
        if "reboot" in self.config:
            if self.config["reboot"] == "true":
                tags.append("reboot_required")
            else:
                tags.append("no_reboot_needed")
        to_update["tags"] = tags

    def update_tags_from_rule(self, platform, to_update):
        """
        Add rule-derived tags to ``to_update["tags"]``.

        The rule ID and ``<severity>_severity`` are put at the front (in that
        order); the CCE for ``platform`` (if any) and the references returned
        by ``get_references`` are appended.

        Raises RuntimeError when no rule is loaded.
        """
        if not self.rule:
            raise RuntimeError("The Ansible snippet has no rule loaded.")

        tags = to_update.get("tags", [])
        # Two inserts at index 0: the second one (rule ID) ends up first.
        tags.insert(0, "{0}_severity".format(self.rule.severity))
        tags.insert(0, self.rule.id_)

        cce_num = self._get_cce(platform)
        if cce_num:
            tags.append("CCE-{0}".format(cce_num))

        tags.extend(self.get_references(platform))
        to_update["tags"] = tags

    def _get_cce(self, platform):
        """
        Return the value of the first rule identifier whose key ends with
        ``platform``, or None when there is no such identifier.
        """
        for cce, val in self.rule.identifiers.items():
            if cce.endswith(platform):
                return val
        return None

    def get_references(self, platform):
        """
        Return the rule's references as a list of ``<prefix>-<value>`` tags.

        The "nist", "cui", "pcidss" and "cjis" reference classes are always
        considered. The ``stigid@<platform>`` class is considered only for
        platforms that have a known DISA STIG ID; for other platforms it is
        skipped instead of producing a "DISA-STIG-None-..." tag.

        Raises RuntimeError when no rule is loaded.
        """
        if not self.rule:
            raise RuntimeError("The Ansible snippet has no rule loaded.")
        # see xccdf-addremediations.xslt <- shared_constants.xslt <- shared_shorthand2xccdf.xslt
        # if you want to know how the map was constructed
        platform_id_map = {
            "rhel6": "RHEL-06",
            "rhel7": "RHEL-07",
            "rhel8": "RHEL-08",
        }
        ref_prefix_map = {
            "nist": "NIST-800-53",
            "cui": "NIST-800-171",
            "pcidss": "PCI-DSS",
            "cjis": "CJIS",
        }
        stig_platform = platform_id_map.get(platform)
        if stig_platform is not None:
            stig_ref_class = "stigid@{platform}".format(platform=platform)
            ref_prefix_map[stig_ref_class] = "DISA-STIG-{id}".format(id=stig_platform)

        result = []
        for ref_class, prefix in ref_prefix_map.items():
            refs = self._get_rule_reference(ref_class)
            result.extend(["{prefix}-{value}".format(prefix=prefix, value=v) for v in refs])
        return result

    def _get_rule_reference(self, ref_class):
        """
        Return the rule's references of class ``ref_class`` as a list.

        The stored value is a comma-separated string; a missing or empty
        value yields an empty list.
        """
        refs = self.rule.references.get(ref_class, "")
        if not refs:
            return []
        return refs.split(",")

    def update_when_from_rule(self, to_update):
        """
        Extend ``to_update["when"]`` with a rule-derived condition.

        If the rule's platform is "machine", a condition on the Ansible
        virtualization facts is merged into the existing ``when`` value.
        If the resulting ``when`` is empty, the key is removed.

        Raises RuntimeError when no rule is loaded.
        """
        # Same guard as the other rule-dependent methods, so a missing rule
        # is reported clearly instead of as an AttributeError on None.
        if not self.rule:
            raise RuntimeError("The Ansible snippet has no rule loaded.")

        additional_when = ""
        if self.rule.platform == "machine":
            additional_when = ('ansible_virtualization_role != "guest" '
                               'or ansible_virtualization_type != "docker"')
        to_update.setdefault("when", "")
        new_when = update_yaml_list_or_string(to_update["when"], additional_when)
        if not new_when:
            to_update.pop("when")
        else:
            to_update["when"] = new_when

    def update(self, platform):
        """
        Apply the ``when`` and ``tags`` updates to every mapping in the
        parsed snippet; entries that are not dicts are skipped.
        """
        for p in self.parsed:
            if not isinstance(p, dict):
                continue
            self.update_when_from_rule(p)
            self.update_tags_from_config(p)
            self.update_tags_from_rule(platform, p)

    @classmethod
    def from_snippet_and_rule(cls, snippet_fname, rule_fname):
        """
        Build an instance from a snippet file and attach the rule loaded
        from ``rule_fname``.
        """
        # NOTE(review): ``build_yaml`` is not imported in the visible part of
        # this module; confirm the import exists.
        rule = build_yaml.Rule.from_yaml(rule_fname)
        result = cls.from_snippet(snippet_fname)
        result.rule = rule
        return result

    @classmethod
    def from_snippet(cls, snippet_fname):
        """
        Build an instance from a snippet file, using the ``contents`` and
        ``config`` of the parsed file.
        """
        # NOTE(review): ``build_remediations`` is not imported in the visible
        # part of this module; confirm the import exists.
        parsed = build_remediations.parse_from_file_without_jinja(snippet_fname)
        return cls(parsed.contents, parsed.config)
193