extract_ansible_tasks()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
eloc 5
nop 1
dl 0
loc 5
ccs 0
cts 5
cp 0
crap 2
rs 10
c 0
b 0
f 0
1
#!/usr/bin/python3
2
3
import argparse
4
import collections
5
import os
6
import re
7
import xml.etree.ElementTree as ET
8
9
import ssg.ansible
10
from ssg.constants import (
11
    ansible_system,
12
    bash_system,
13
    datastream_namespace,
14
    OSCAP_PROFILE,
15
    OSCAP_RULE,
16
    XCCDF12_NS,
17
)
18
19
20
# Key under which a <value> element without a "selector" attribute is stored.
DEFAULT_SELECTOR = "__DEFAULT"

# Separator row of 79 hash characters used in generated file headers.
HASH_ROW = 79 * "#"

# Per-language lookup tables, keyed by the value of the --language option.
LANGUAGE_TO_SYSTEM = dict(ansible=ansible_system, bash=bash_system)
LANGUAGE_TO_TARGET = dict(ansible="playbook", bash="script")
LANGUAGE_TO_EXTENSION = dict(ansible="yml", bash="sh")

# Matches a "set_fact" task that promotes an XCCDF Value to an Ansible
# variable; group 1 is the variable name, group 2 is its value.
ANSIBLE_VAR_PATTERN = re.compile(
    "- name: XCCDF Value [^ ]+ # promote to variable\n  set_fact:\n"
    "    ([^:]+): (.+)\n  tags:\n    - always\n"
)
28
29
30
def parse_args():
    """Parse the command line of this script.

    All four options are mandatory: ``--data-stream``, ``--output-dir``,
    ``--product`` and ``--language`` (the latter restricted to ``bash`` or
    ``ansible``).

    Returns:
        The ``argparse.Namespace`` with the parsed option values.
    """
    # (flag, extra keyword arguments) in the order they are registered.
    option_specs = (
        ("--data-stream", {
            "help": "Path of a SCAP source data stream file"}),
        ("--output-dir", {
            "help": "Path of the output directory to save generated files"}),
        ("--product", {
            "help": "Product ID, eg. 'rhel9'"}),
        ("--language", {
            "choices": ["bash", "ansible"],
            "help": "Remediation language, either 'bash' or 'ansible'"}),
    )
    parser = argparse.ArgumentParser()
    for flag, extra in option_specs:
        parser.add_argument(flag, required=True, **extra)
    return parser.parse_args()
50
51
52
def extract_ansible_vars(string):
    """Collect the variables promoted by "set_fact" tasks in a remediation.

    Every match of ``ANSIBLE_VAR_PATTERN`` in ``string`` contributes one
    entry; when a variable name occurs more than once the last match wins.

    Returns:
        A dict mapping variable name to its value (both strings).
    """
    return {
        found.group(1): found.group(2)
        for found in ANSIBLE_VAR_PATTERN.finditer(string)
    }
58
59
60
def indent(string, level):
    """Prefix every line of ``string`` with ``level`` spaces.

    Each output line is terminated by a newline, including the last one;
    an empty input yields an empty string.
    """
    padding = " " * level
    return "".join(
        padding + row + "\n" for row in string.splitlines())
65
66
67
def extract_ansible_tasks(string):
    """Turn an expanded Ansible remediation into a list of task strings.

    The variable-promoting "set_fact" tasks (``ANSIBLE_VAR_PATTERN``) are
    replaced by a single newline, the remainder is indented by four spaces
    and passed through ``ssg.ansible.remove_trailing_whitespace``.

    Returns:
        A one-element list holding the processed text.
    """
    without_var_tasks = ANSIBLE_VAR_PATTERN.sub("\n", string)
    indented = indent(without_var_tasks, 4)
    return [ssg.ansible.remove_trailing_whitespace(indented)]
72
73
74
def comment(text):
    """Render ``text`` as a block of ``#`` comment lines.

    Lines are split on ``"\\n"``, stripped of surrounding whitespace, and
    lines that end up empty are dropped. Every remaining line is emitted as
    ``"# <line>\\n"``.
    """
    stripped_lines = (raw.strip() for raw in text.split("\n"))
    return "".join(
        "# " + row + "\n" for row in stripped_lines if row != "")
82
83
84
def get_selected_rules(profile):
    """Return the set of rule IDs that ``profile`` explicitly selects.

    Only direct ``select`` children with ``selected="true"`` whose ``idref``
    starts with ``OSCAP_RULE`` are considered.
    """
    select_xpath = "./{%s}select" % XCCDF12_NS
    enabled_ids = (
        el.get("idref")
        for el in profile.findall(select_xpath)
        if el.get("selected") == "true"
    )
    return {idref for idref in enabled_ids if idref.startswith(OSCAP_RULE)}
92
93
94
def get_value_refinenements(profile):
    """Map each refined value ID in ``profile`` to its chosen selector.

    Reads the ``idref`` and ``selector`` attributes of every direct
    ``refine-value`` child; a later element with the same ``idref``
    overrides an earlier one.

    The misspelled function name is kept as is because callers use it.
    """
    refine_xpath = "./{%s}refine-value" % XCCDF12_NS
    return {
        el.get("idref"): el.get("selector")
        for el in profile.findall(refine_xpath)
    }
101
102
103
def get_remediation(rule, system):
    """Return the first ``fix`` child of ``rule`` for the given ``system``.

    Returns:
        The matching ``fix`` element, or ``None`` when the rule has no fix
        whose ``system`` attribute equals ``system``.
    """
    candidates = rule.findall("./{%s}fix" % XCCDF12_NS)
    return next(
        (fix for fix in candidates if fix.get("system") == system), None)
108
109
110
def _get_all_itms(benchmark, element_name, callback):
    """Apply ``callback`` to every ``element_name`` element in ``benchmark``.

    The search covers all descendants of ``benchmark`` in the XCCDF 1.2
    namespace.

    Returns:
        A dict mapping each element's ``id`` attribute to the value that
        ``callback`` returned for that element.
    """
    xpath = ".//{%s}%s" % (XCCDF12_NS, element_name)
    return {
        element.get("id"): callback(element)
        for element in benchmark.findall(xpath)
    }
117
118
119
def get_variable_values(variable):
    """Collect the selectable values of one XCCDF ``Value`` element.

    Args:
        variable: The ``Value`` element whose direct ``value`` children are
            read.

    Returns:
        A dict mapping selector name to value text. A ``value`` child
        without a ``selector`` attribute is stored under
        ``DEFAULT_SELECTOR``; a child without text is stored as an empty
        string.
    """
    values = {}
    for value in variable.findall("./{%s}value" % (XCCDF12_NS)):
        selector = value.get("selector")
        if selector is None:
            selector = DEFAULT_SELECTOR
        # An empty value must be stored under its own selector; the literal
        # key "selector" used previously misfiled it, so lookups by the
        # real selector missed it.
        values[selector] = value.text if value.text is not None else ""
    return values
130
131
132
def get_all_variables(benchmark):
    """Return a dict of every ``Value`` in ``benchmark`` keyed by its ID.

    Each entry holds the selector-to-text mapping produced by
    ``get_variable_values`` for that ``Value`` element.
    """
    variables_by_id = _get_all_itms(benchmark, "Value", get_variable_values)
    return variables_by_id
134
135
136
def expand_variables(fix_el, refinements, variables):
    """Substitute the ``sub`` references of a fix with concrete values.

    Args:
        fix_el: The ``fix`` element whose text and direct ``sub`` children
            are expanded.
        refinements: Dict mapping value ID to the selector chosen by the
            profile; IDs that are absent fall back to ``DEFAULT_SELECTOR``.
        variables: Dict mapping value ID to a dict of selector -> text.

    Returns:
        The fix text with every ``sub`` replaced by the selected value, or
        ``None`` when a referenced value is refined to the selector named
        ``"default"``.

    Raises:
        KeyError: If a ``sub`` references an ID missing from ``variables``.
        IndexError: If the referenced variable has no values at all.
    """
    # ElementTree reports missing character data as None, e.g. when the fix
    # starts directly with a <sub> or nothing follows one; treat as empty.
    parts = [fix_el.text or ""]
    for sub_el in fix_el.findall("./{%s}sub" % XCCDF12_NS):
        variable_id = sub_el.get("idref")
        values = variables[variable_id]
        selector = refinements.get(variable_id, DEFAULT_SELECTOR)
        # NOTE(review): a selector literally named "default" makes the whole
        # fix count as unavailable; the reason is not visible here.
        if selector == "default":
            return None
        if selector in values:
            value = values[selector]
        else:
            # Unknown selector: fall back to the first value defined.
            value = list(values.values())[0]
        parts.append(value)
        parts.append(sub_el.tail or "")
    return "".join(parts)
151
152
153
def ansible_vars_to_string(variables):
    """Format ``variables`` as the body of a playbook ``vars:`` section.

    Each item becomes one ``"    <name>: <value>\\n"`` line (four-space
    indent), in the iteration order of ``variables``.
    """
    prefix = " " * 4
    return "".join(
        "%s%s: %s\n" % (prefix, name, value)
        for name, value in variables.items())
160
161
162
def ansible_tasks_to_string(tasks):
    """Join the task strings into the body of a playbook ``tasks:`` section.

    Tasks are joined with ``"\\n\\n"`` and the result is passed through
    ``ssg.ansible.remove_too_many_blank_lines``.
    """
    # tasks need to be separated by 2 empty lines
    separator = "\n\n"
    return ssg.ansible.remove_too_many_blank_lines(separator.join(tasks))
167
168
169
class ScriptGenerator:
    """Generate one remediation file per profile of a SCAP data stream.

    The data stream is parsed on construction. Calling
    ``generate_remediation_scripts`` then writes a bash script or an Ansible
    playbook (depending on ``language``) for every profile into
    ``output_dir``.
    """

    def __init__(self, language, product_id, ds_file_path, output_dir):
        """Store the settings and load the benchmark from ``ds_file_path``.

        Args:
            language: Remediation language, ``"bash"`` or ``"ansible"``.
            product_id: Product ID used as the output file name prefix.
            ds_file_path: Path of the SCAP source data stream file.
            output_dir: Directory the generated files are written to.
        """
        self.language = language
        self.product_id = product_id
        self.ds_file_path = ds_file_path
        self.output_dir = output_dir
        self.get_benchmark_data()

    def generate_remediation_scripts(self):
        """Write a remediation file for every profile in the data stream."""
        profile_xpath = "./{%s}component/{%s}Benchmark/{%s}Profile" % (
            datastream_namespace, XCCDF12_NS, XCCDF12_NS)
        for profile in self.ds.findall(profile_xpath):
            self.generate_profile_remediation_script(profile)

    def get_benchmark_data(self):
        """Parse the data stream and cache benchmark-level data.

        Sets ``ds``, ``ds_file_name``, ``benchmark_id``,
        ``benchmark_version``, ``remediations`` and ``variables``.
        """
        self.ds = ET.parse(self.ds_file_path)
        self.ds_file_name = os.path.basename(self.ds_file_path)
        benchmark_xpath = "./{%s}component/{%s}Benchmark" % (
            datastream_namespace, XCCDF12_NS)
        benchmark = self.ds.find(benchmark_xpath)
        self.benchmark_id = benchmark.get("id")
        self.benchmark_version = benchmark.find(
            "./{%s}version" % XCCDF12_NS).text
        self.load_all_remediations(benchmark)
        self.variables = get_all_variables(benchmark)

    def load_all_remediations(self, benchmark):
        """Store each rule's fix element for the configured language.

        ``self.remediations`` maps rule ID to the fix element, or to
        ``None`` when the rule has no fix for that language, in document
        order.
        """
        self.remediations = collections.OrderedDict()
        rule_xpath = ".//{%s}Rule" % (XCCDF12_NS)
        for rule_el in benchmark.findall(rule_xpath):
            rule_id = rule_el.get("id")
            system = LANGUAGE_TO_SYSTEM[self.language]
            remediation = get_remediation(rule_el, system)
            self.remediations[rule_id] = remediation

    def generate_profile_remediation_script(self, profile_el):
        """Build the remediation for ``profile_el`` and write it to disk.

        The file is written UTF-8 encoded to the path returned by
        ``get_output_file_path``.

        Raises:
            ValueError: If ``self.language`` is neither ``"ansible"`` nor
                ``"bash"``.
        """
        if self.language == "ansible":
            output = self.create_output_ansible(profile_el)
        elif self.language == "bash":
            output = self.create_output_bash(profile_el)
        else:
            # Previously ``output`` stayed unbound on this path.
            raise ValueError(
                "Unsupported remediation language: %r" % (self.language,))
        file_path = self.get_output_file_path(profile_el)
        with open(file_path, "wb") as f:
            f.write(output.encode("utf-8"))

    def get_output_file_path(self, profile_el):
        """Return the output path for the remediation of ``profile_el``.

        The file name has the form
        ``<product>-<playbook|script>-<short profile id>.<yml|sh>``.
        """
        short_profile_id = profile_el.get("id").replace(OSCAP_PROFILE, "")
        file_name = "%s-%s-%s.%s" % (
            self.product_id, LANGUAGE_TO_TARGET[self.language],
            short_profile_id, LANGUAGE_TO_EXTENSION[self.language])
        file_path = os.path.join(self.output_dir, file_name)
        return file_path

    def create_output_ansible(self, profile_el):
        """Return the complete Ansible playbook text for ``profile_el``."""
        ansible_vars, ansible_tasks = self.collect_ansible_vars_and_tasks(
            profile_el)
        header = self.create_header(profile_el)
        profile_id = profile_el.get("id")
        vars_str = ansible_vars_to_string(ansible_vars)
        tasks_str = ansible_tasks_to_string(ansible_tasks)
        output = (
            "%s\n"
            "- name: Ansible Playbook for %s\n"
            "  hosts: all\n"
            "  vars:\n"
            "%s"
            "  tasks:\n"
            "%s"
            % (header, profile_id, vars_str, tasks_str))
        return output

    def collect_ansible_vars_and_tasks(self, profile_el):
        """Gather variables and tasks of all rules selected by the profile.

        Returns:
            A tuple ``(all_vars, all_tasks)``: an ordered dict of variable
            name to value, and a list of task strings, both in the order
            the rules were loaded.
        """
        selected_rules = get_selected_rules(profile_el)
        refinements = get_value_refinenements(profile_el)
        all_vars = collections.OrderedDict()
        all_tasks = []
        for rule_id, fix_el in self.remediations.items():
            if rule_id not in selected_rules:
                continue
            rule_vars, rule_tasks = self.generate_ansible_rule_remediation(
                fix_el, refinements)
            all_vars.update(rule_vars)
            all_tasks.extend(rule_tasks)
        return (all_vars, all_tasks)

    def create_output_bash(self, profile):
        """Return the complete bash remediation script for ``profile``."""
        output = []
        selected_rules = get_selected_rules(profile)
        refinements = get_value_refinenements(profile)
        header = self.create_header(profile)
        output.append(header)
        total = len(selected_rules)
        current = 1
        for rule_id in self.remediations:
            if rule_id not in selected_rules:
                continue
            status = (current, total)
            rule_remediation = self.generate_bash_rule_remediation(
                rule_id, status, refinements)
            output.append(rule_remediation)
            current += 1
        return "".join(output)

    def create_header(self, profile):
        """Return the commented header placed at the top of the output.

        The header names the profile, benchmark and data stream file and
        explains how to apply the generated remediation.

        Raises:
            ValueError: If ``self.language`` is neither ``"ansible"`` nor
                ``"bash"``.
        """
        if self.language == "ansible":
            shebang_with_newline = "---\n"
            remediation_type = "Ansible Playbook"
            how_to_apply = (
                "# $ ansible-playbook -i \"localhost,\""
                " -c local playbook.yml\n"
                "# $ ansible-playbook -i \"192.168.1.155,\" playbook.yml\n"
                "# $ ansible-playbook -i inventory.ini playbook.yml\n"
            )
        elif self.language == "bash":
            shebang_with_newline = "#!/usr/bin/env bash\n"
            remediation_type = "Bash Remediation Script"
            how_to_apply = "# $ sudo ./remediation-script.sh\n"
        else:
            # Previously the three locals above stayed unbound on this path.
            raise ValueError(
                "Unsupported remediation language: %r" % (self.language,))
        profile_title = profile.find("./{%s}title" % XCCDF12_NS).text
        # A profile may have no description element or no text in it;
        # treat both as an empty description instead of crashing.
        description_el = profile.find("./{%s}description" % XCCDF12_NS)
        description = ""
        if description_el is not None and description_el.text is not None:
            description = description_el.text
        commented_profile_description = comment(description)
        xccdf_version_name = "1.2"
        profile_id = profile.get("id")
        fix_header = (
            "%s"
            "%s\n"
            "#\n"
            "# %s for %s\n"
            "#\n"
            "# Profile Description:\n"
            "%s"
            "#\n"
            "# Profile ID:  %s\n"
            "# Benchmark ID:  %s\n"
            "# Benchmark Version:  %s\n"
            "# XCCDF Version:  %s\n"
            "#\n"
            "# This file can be generated by OpenSCAP using:\n"
            "# $ oscap xccdf generate fix --profile %s --fix-type %s %s\n"
            "#\n"
            "# This %s is generated from an XCCDF profile without"
            " preliminary evaluation.\n"
            "# It attempts to fix every selected rule, even if the system is"
            " already compliant.\n"
            "#\n"
            "# How to apply this %s:\n"
            "%s"
            "#\n"
            "%s\n\n" % (
                shebang_with_newline, HASH_ROW, remediation_type,
                profile_title, commented_profile_description, profile_id,
                self.benchmark_id, self.benchmark_version, xccdf_version_name,
                profile_id, self.language, self.ds_file_name,
                remediation_type, remediation_type, how_to_apply, HASH_ROW))
        return fix_header

    def generate_bash_rule_remediation(self, rule_id, status, refinements):
        """Return the bash snippet that remediates a single rule.

        Args:
            rule_id: ID of the rule; must be a key of ``self.remediations``.
            status: Tuple ``(current, total)`` used in progress messages.
            refinements: Value refinements of the profile being generated.

        Returns:
            The BEGIN banner, a progress ``echo``, the expanded fix (or a
            "FIX ... IS MISSING" warning when no fix is available) and the
            END marker, concatenated.
        """
        current, total = status
        fix_el = self.remediations[rule_id]
        output = []
        header = (
            "%s\n"
            "# BEGIN fix (%s / %s) for '%s'\n"
            "%s\n" % (HASH_ROW, current, total, rule_id, HASH_ROW))
        output.append(header)
        begin_msg = "(>&2 echo \"Remediating rule %s/%s: '%s'\")\n" % (
            current, total, rule_id)
        output.append(begin_msg)
        expanded_remediation = None
        if fix_el is not None:
            expanded_remediation = expand_variables(
                fix_el, refinements, self.variables)
        if expanded_remediation is not None:
            output.append(expanded_remediation)
        else:
            warning = (
                "(>&2 echo \"FIX FOR THIS RULE '%s' IS MISSING!\")\n" %
                (rule_id))
            output.append(warning)
        end_msg = "\n# END fix for '%s'\n\n" % (rule_id)
        output.append(end_msg)
        return "".join(output)

    def generate_ansible_rule_remediation(self, fix_el, refinements):
        """Return the variables and tasks of one rule's Ansible fix.

        Returns:
            A tuple ``(rule_vars, tasks)``; both are empty when ``fix_el``
            is ``None`` or its variables cannot be expanded.
        """
        rule_vars = {}
        tasks = []
        expanded_remediation = None
        if fix_el is not None:
            expanded_remediation = expand_variables(
                fix_el, refinements, self.variables)
        if expanded_remediation is not None:
            rule_vars = extract_ansible_vars(expanded_remediation)
            tasks = extract_ansible_tasks(expanded_remediation)
        return rule_vars, tasks
361
362
363
def main():
    """Entry point: parse the command line and generate all remediations.

    Creates the output directory when needed, then writes one remediation
    file per profile of the given data stream.
    """
    args = parse_args()
    # makedirs with exist_ok avoids the check-then-create race and also
    # creates missing parent directories of a nested output path.
    os.makedirs(args.output_dir, exist_ok=True)
    sg = ScriptGenerator(
        args.language, args.product, args.data_stream, args.output_dir)
    sg.generate_remediation_scripts()
370
371
372
if __name__ == "__main__":
373
    main()
374