Test Failed
Push — master ( 578641...18d67a )
by Jan
02:39 queued 12s
created

utils.compare_ds.find_all_oval_defs()   B

Complexity

Conditions 8

Size

Total Lines 28
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 72

Importance

Changes 0
Metric Value
cc 8
eloc 28
nop 1
dl 0
loc 28
ccs 0
cts 28
cp 0
crap 72
rs 7.3333
c 0
b 0
f 0
1
#!/usr/bin/python3
2
3
import argparse
4
import sys
5
import xml.etree.ElementTree as ET
6
import difflib
7
8
import ssg.constants
9
10
ns = {
11
    "ds": ssg.constants.datastream_namespace,
12
    "xccdf": ssg.constants.XCCDF12_NS,
13
    "oval": ssg.constants.oval_namespace,
14
    "catalog": ssg.constants.cat_namespace,
15
    "xlink": ssg.constants.xlink_namespace,
16
}
17
remediation_type_to_uri = {
18
    "bash": ssg.constants.bash_system,
19
    "ansible": ssg.constants.ansible_system,
20
    "puppet": ssg.constants.puppet_system,
21
    "anaconda": ssg.constants.anaconda_system,
22
}
23
24
25
def parse_args():
26
    parser = argparse.ArgumentParser(
27
        description="Compares two datastreams with regards to presence of"
28
        "OVAL checks and all remediations")
29
    parser.add_argument(
30
        "old", metavar="OLD_DS_PATH",
31
        help="Path to the old datastream")
32
    parser.add_argument(
33
        "new", metavar="NEW_DS_PATH",
34
        help="Path to the new datastream")
35
    parser.add_argument(
36
        "--rule", metavar="RULE_ID",
37
        help="Compare only the rule specified by given RULE_ID"
38
    )
39
    parser.add_argument(
40
        "--no-diffs", action="store_true",
41
        help="Do not perform detailed comparison of checks and "
42
        "remediations contents."
43
    )
44
    parser.add_argument(
45
        "--only-rules", action="store_true",
46
        help="Print only removals from rule set."
47
    )
48
    return parser.parse_args()
49
50
51
def get_benchmarks(root):
52
    for component in root.findall("ds:component", ns):
53
        for benchmark in component.findall("xccdf:Benchmark", ns):
54
            yield benchmark
55
56
57
def find_benchmark(root, id_):
58
    for component in root.findall("ds:component", ns):
59
        benchmark = component.find("xccdf:Benchmark[@id='%s']" % (id_), ns)
60
        if benchmark is not None:
61
            return benchmark
62
    return None
63
64
65
def find_oval_definition(oval_doc, def_id):
66
    definitions = oval_doc.find("oval:definitions", ns)
67
    definition = definitions.find("oval:definition[@id='%s']" % (def_id), ns)
68
    return definition
69
70
71
def find_oval_test(oval_doc, test_id):
72
    tests = oval_doc.find("oval:tests", ns)
73
    test = tests.find("*[@id='%s']" % (test_id))
74
    return test
75
76
77
def definition_to_elements(definition):
78
    criteria = definition.find("oval:criteria", ns)
79
    elements = []
80
    for child in criteria.iter():  # iter recurses
81
        if child.tag == "{%s}criteria" % (ns["oval"]):
82
            operator = child.get("operator")
83
            elements.append(("criteria", operator))
84
        elif child.tag == "{%s}criterion" % (ns["oval"]):
85
            test_id = child.get("test_ref")
86
            elements.append(("criterion", test_id))
87
        elif child.tag == "{%s}extend_definition" % (ns["oval"]):
88
            extend_def_id = child.get("definition_ref")
89
            elements.append(("extend_definition", extend_def_id))
90
    return elements
91
92
93
def print_offending_elements(elements, sign):
94
    for thing, atrribute in elements:
95
        print("%s %s %s" % (sign, thing, atrribute))
96
97
98
def compare_oval_definitions(
99
        old_oval_def_doc, old_oval_def_id, new_oval_def_doc, new_oval_def_id):
100
    old_def = find_oval_definition(old_oval_def_doc, old_oval_def_id)
101
    new_def = find_oval_definition(new_oval_def_doc, new_oval_def_id)
102
    old_els = definition_to_elements(old_def)
103
    new_els = definition_to_elements(new_def)
104
    for x in old_els.copy():
105
        for y in new_els.copy():
106
            if x[0] == y[0] and x[1] == y[1]:
107
                old_els.remove(x)
108
                new_els.remove(y)
109
                break
110
    if old_els or new_els:
111
        print("OVAL definition %s differs:" % (old_oval_def_id))
112
        print("--- old datastream")
113
        print("+++ new datastream")
114
        print_offending_elements(old_els, "-")
115
        print_offending_elements(new_els, "+")
116
117
118
def compare_ovals(
119
        old_rule, new_rule, old_oval_defs, new_oval_defs, show_diffs):
120
    old_oval_ref = old_rule.find(
121
        "xccdf:check[@system='%s']" % (ssg.constants.oval_namespace), ns)
122
    new_oval_ref = new_rule.find(
123
        "xccdf:check[@system='%s']" % (ssg.constants.oval_namespace), ns)
124
    rule_id = old_rule.get("id")
125
    if (old_oval_ref is None and new_oval_ref is not None):
126
        print("New datastream adds OVAL for rule '%s'." % (rule_id))
127
    elif (old_oval_ref is not None and new_oval_ref is None):
128
        print("New datastream is missing OVAL for rule '%s'." % (rule_id))
129
    elif (old_oval_ref is not None and new_oval_ref is not None):
130
        old_check_content_ref = old_oval_ref.find(
131
            "xccdf:check-content-ref", ns)
132
        new_check_content_ref = new_oval_ref.find(
133
            "xccdf:check-content-ref", ns)
134
        old_oval_def_id = old_check_content_ref.get("name")
135
        new_oval_def_id = new_check_content_ref.get("name")
136
        old_oval_file_name = old_check_content_ref.get("href")
137
        new_oval_file_name = new_check_content_ref.get("href")
138
        if old_oval_file_name != new_oval_file_name:
139
            print(
140
                "OVAL definition file for rule '%s' has changed from "
141
                "'%s' to '%s'." % (
142
                    rule_id, old_oval_file_name, new_oval_file_name)
143
            )
144
        if old_oval_def_id != new_oval_def_id:
145
            print(
146
                "OVAL definition ID for rule '%s' has changed from "
147
                "'%s' to '%s'." % (rule_id, old_oval_def_id, new_oval_def_id)
148
            )
149
        if show_diffs:
150
            try:
151
                old_oval_def_doc = old_oval_defs[old_oval_file_name]
152
            except KeyError:
153
                print(
154
                    "Rule '%s' points to '%s' which isn't a part of the "
155
                    "old datastream" % (rule_id, old_oval_file_name))
156
                return
157
            try:
158
                new_oval_def_doc = new_oval_defs[new_oval_file_name]
159
            except KeyError:
160
                print(
161
                    "Rule '%s' points to '%s' which isn't a part of the "
162
                    "new datastream" % (rule_id, new_oval_file_name))
163
                return
164
            compare_oval_definitions(
165
                old_oval_def_doc, old_oval_def_id, new_oval_def_doc,
166
                new_oval_def_id)
167
168
169
def compare_fix_texts(old_r, new_r):
170
    if old_r != new_r:
171
        diff = "".join(difflib.unified_diff(
172
            old_r.splitlines(keepends=True), new_r.splitlines(keepends=True),
173
            fromfile="old datastream", tofile="new datastream"))
174
        return diff
175
    return None
176
177
178
def compare_fix_elements(
179
        old_fix, new_fix, remediation_type, rule_id, show_diffs):
180
    old_fix_id = old_fix.get("id")
181
    new_fix_id = new_fix.get("id")
182
    if old_fix_id != new_fix_id:
183
        print(
184
            "%s remediation ID for rule '%s' has changed from "
185
            "'%s' to '%s'." % (
186
                remediation_type, rule_id, old_fix_id, new_fix_id)
187
        )
188
    if show_diffs:
189
        old_fix_text = "".join(old_fix.itertext())
190
        new_fix_text = "".join(new_fix.itertext())
191
        diff = compare_fix_texts(old_fix_text, new_fix_text)
192
        if diff:
193
            print("%s remediation for rule '%s' differs:\n%s" % (
194
                remediation_type, rule_id, diff))
195
196
197
def compare_remediations(old_rule, new_rule, remediation_type, show_diffs):
198
    system = remediation_type_to_uri[remediation_type]
199
    old_fix = old_rule.find("xccdf:fix[@system='%s']" % (system), ns)
200
    new_fix = new_rule.find("xccdf:fix[@system='%s']" % (system), ns)
201
    rule_id = old_rule.get("id")
202
    if (old_fix is None and new_fix is not None):
203
        print("New datastream adds %s remediation for rule '%s'." % (
204
            remediation_type, rule_id))
205
    elif (old_fix is not None and new_fix is None):
206
        print("New datastream is missing %s remediation for rule '%s'." % (
207
            remediation_type, rule_id))
208
    elif (old_fix is not None and new_fix is not None):
209
        compare_fix_elements(
210
            old_fix, new_fix, remediation_type, rule_id, show_diffs)
211
212
213
def get_rules_to_compare(benchmark, rule_id):
214
    if rule_id:
215
        if not rule_id.startswith(ssg.constants.OSCAP_RULE):
216
            rule_id = ssg.constants.OSCAP_RULE + rule_id
217
        rules = benchmark.findall(
218
            ".//xccdf:Rule[@id='%s']" % (rule_id), ns)
219
        if len(rules) == 0:
220
            raise ValueError("Can't find rule %s" % (rule_id))
221
    else:
222
        rules = benchmark.findall(".//xccdf:Rule", ns)
223
    return rules
224
225
226
def compare_rules(
227
        old_rule, new_rule, old_oval_defs, new_oval_defs, show_diffs):
228
    compare_ovals(
229
        old_rule, new_rule, old_oval_defs, new_oval_defs, show_diffs)
230
    for remediation_type in remediation_type_to_uri.keys():
231
        compare_remediations(old_rule, new_rule, remediation_type, show_diffs)
232
233
234
def process_benchmarks(
235
        old_benchmark, new_benchmark, old_oval_defs, new_oval_defs,
236
        rule_id, show_diffs, only_rules):
237
    missing_rules = []
238
    try:
239
        rules_in_old_benchmark = get_rules_to_compare(old_benchmark, rule_id)
240
    except ValueError as e:
241
        print(str(e))
242
        return
243
    for old_rule in rules_in_old_benchmark:
244
        rule_id = old_rule.get("id")
245
        new_rule = new_benchmark.find(
246
            ".//xccdf:Rule[@id='%s']" % (rule_id), ns)
247
        if new_rule is None:
248
            missing_rules.append(rule_id)
249
            print("%s is missing in new datastream." % (rule_id))
250
            continue
251
        if only_rules:
252
            continue
253
        compare_rules(
254
            old_rule, new_rule, old_oval_defs, new_oval_defs, show_diffs)
255
256
257
def find_all_oval_defs(root):
258
    component_refs = dict()
259
    for ds in root.findall("ds:data-stream", ns):
260
        checks = ds.find("ds:checks", ns)
261
        for component_ref in checks.findall("ds:component-ref", ns):
262
            component_ref_href = component_ref.get("{%s}href" % (ns["xlink"]))
263
            component_ref_id = component_ref.get("id")
264
            component_refs[component_ref_href] = component_ref_id
265
    uris = dict()
266
    for ds in root.findall("ds:data-stream", ns):
267
        checklists = ds.find("ds:checklists", ns)
268
        catalog = checklists.find(".//catalog:catalog", ns)
269
        for uri in catalog.findall("catalog:uri", ns):
270
            uri_uri = uri.get("uri")
271
            uri_name = uri.get("name")
272
            uris[uri_uri] = uri_name
273
    def_doc_dict = dict()
274
    for component in root.findall("ds:component", ns):
275
        oval_def_doc = component.find("oval:oval_definitions", ns)
276
        if oval_def_doc is not None:
277
            comp_id = component.get("id")
278
            comp_href = "#" + comp_id
279
            try:
280
                filename = uris["#" + component_refs[comp_href]]
281
            except KeyError:
282
                continue
283
            def_doc_dict[filename] = oval_def_doc
284
    return def_doc_dict
285
286
287
def main():
288
    args = parse_args()
289
    old_tree = ET.parse(args.old)
290
    old_root = old_tree.getroot()
291
    new_tree = ET.parse(args.new)
292
    new_root = new_tree.getroot()
293
    old_oval_defs = find_all_oval_defs(old_root)
294
    new_oval_defs = find_all_oval_defs(new_root)
295
    for old_benchmark in get_benchmarks(old_root):
296
        new_benchmark = find_benchmark(new_root, old_benchmark.get("id"))
297
        process_benchmarks(
298
            old_benchmark, new_benchmark, old_oval_defs, new_oval_defs,
299
            args.rule, not args.no_diffs, args.only_rules)
300
    return 0
301
302
303
if __name__ == "__main__":
304
    sys.exit(main())
305