Passed
Push — master ( 42467a...9d011f )
by Matěj
03:19 queued 11s
created

utils.compare_ds.parse_args()   A

Complexity

Conditions 1

Size

Total Lines 20
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
eloc 16
nop 0
dl 0
loc 20
ccs 0
cts 7
cp 0
crap 2
rs 9.6
c 0
b 0
f 0
1
#!/usr/bin/python3
2
3
import argparse
4
import sys
5
import xml.etree.ElementTree as ET
6
import difflib
7
8
import ssg.constants
9
10
# Prefix -> namespace URI map handed to ElementTree's find()/findall() so the
# query paths in this file can use short prefixes such as "xccdf:Rule".
ns = dict(
    ds=ssg.constants.datastream_namespace,
    xccdf=ssg.constants.XCCDF12_NS,
    oval=ssg.constants.oval_namespace,
    catalog=ssg.constants.cat_namespace,
    xlink=ssg.constants.xlink_namespace,
)
17
# Remediation type name -> value of the "system" attribute used to select the
# matching xccdf:fix element of a rule.
remediation_type_to_uri = dict(
    bash=ssg.constants.bash_system,
    ansible=ssg.constants.ansible_system,
    puppet=ssg.constants.puppet_system,
    anaconda=ssg.constants.anaconda_system,
)
23
24
25
def parse_args(argv=None):
    """Parse the command-line arguments of the datastream comparison tool.

    Args:
        argv: Optional list of argument strings to parse. When None (the
            default) argparse reads the arguments from sys.argv.

    Returns:
        The argparse namespace with the attributes ``old``, ``new``,
        ``rule`` (None unless --rule is given) and ``no_diffs``.
    """
    parser = argparse.ArgumentParser(
        # The trailing space matters: the two literals are concatenated
        # implicitly and would otherwise read "presence ofOVAL checks".
        description="Compares two datastreams with regards to presence of "
        "OVAL checks and all remediations")
    parser.add_argument(
        "old", metavar="OLD_DS_PATH",
        help="Path to the old datastream")
    parser.add_argument(
        "new", metavar="NEW_DS_PATH",
        help="Path to the new datastream")
    parser.add_argument(
        "--rule", metavar="RULE_ID",
        help="Compare only the rule specified by given RULE_ID"
    )
    parser.add_argument(
        "--no-diffs", action="store_true",
        help="Do not perform detailed comparison of checks and "
        "remediations contents."
    )
    return parser.parse_args(argv)
45
46
47
def get_benchmarks(root):
    """Yield each xccdf:Benchmark that is a direct child of a ds:component
    directly under root."""
    for component in root.findall("ds:component", ns):
        yield from component.findall("xccdf:Benchmark", ns)
51
52
53
def find_benchmark(root, id_):
    """Return the first xccdf:Benchmark with the given id found in root's
    ds:component children, or None when no component holds one."""
    candidates = (
        component.find("xccdf:Benchmark[@id='%s']" % (id_), ns)
        for component in root.findall("ds:component", ns)
    )
    # The generator is lazy, so the search stops at the first hit.
    return next((found for found in candidates if found is not None), None)
59
60
61
def find_oval_definition(oval_doc, def_id):
    """Return the oval:definition with id def_id from the oval:definitions
    child of oval_doc, or None when no definition has that id."""
    container = oval_doc.find("oval:definitions", ns)
    return container.find("oval:definition[@id='%s']" % (def_id), ns)
65
66
67
def find_oval_test(oval_doc, test_id):
    """Return the child of oval_doc's oval:tests element whose id is test_id,
    whatever its tag, or None when there is no such child."""
    container = oval_doc.find("oval:tests", ns)
    # The wildcard matches any tag, so only the id attribute is compared.
    return container.find("*[@id='%s']" % (test_id))
71
72
73
def definition_to_elements(definition):
    """Flatten the criteria tree of an OVAL definition into a list of pairs.

    Args:
        definition: The oval:definition element to inspect.

    Returns:
        A list of (kind, value) tuples in the order criteria.iter() visits
        the elements. kind is "criteria", "criterion" or "extend_definition"
        and value is that element's "operator", "test_ref" or
        "definition_ref" attribute respectively (None when the attribute is
        absent). The list is empty when the definition has no oval:criteria
        child.
    """
    criteria = definition.find("oval:criteria", ns)
    if criteria is None:
        # No criteria child: there is nothing to flatten, and calling
        # iter() on None would raise AttributeError.
        return []
    oval_prefix = "{%s}" % (ns["oval"])
    # Fully qualified tag -> (reported kind, attribute holding its value).
    attribute_of = {
        oval_prefix + "criteria": ("criteria", "operator"),
        oval_prefix + "criterion": ("criterion", "test_ref"),
        oval_prefix + "extend_definition": (
            "extend_definition", "definition_ref"),
    }
    elements = []
    # iter() walks the whole subtree, starting with the criteria element
    # itself, so nested criteria are reported too.
    for child in criteria.iter():
        known = attribute_of.get(child.tag)
        if known is not None:
            kind, attribute = known
            elements.append((kind, child.get(attribute)))
    return elements
87
88
89
def print_offending_elements(elements, sign):
    """Print one "<sign> <kind> <attribute>" line for each (kind, attribute)
    pair in elements."""
    for kind, attribute in elements:
        # print() joins its arguments with single spaces, which gives the
        # same text as the "%s %s %s" template.
        print(sign, kind, attribute)
92
93
94
def compare_oval_definitions(
        old_oval_def_doc, old_oval_def_id, new_oval_def_doc, new_oval_def_id):
    """Print how the criteria of two OVAL definitions differ.

    Both definitions are flattened with definition_to_elements() and the
    pairs present on one side only are printed with "-" (old only) or "+"
    (new only). Nothing is printed when the two lists hold the same pairs.
    A definition that cannot be found in its document is reported and the
    comparison is skipped.

    Args:
        old_oval_def_doc: OVAL document of the old datastream.
        old_oval_def_id: ID of the definition to look up in the old document.
        new_oval_def_doc: OVAL document of the new datastream.
        new_oval_def_id: ID of the definition to look up in the new document.
    """
    old_def = find_oval_definition(old_oval_def_doc, old_oval_def_id)
    new_def = find_oval_definition(new_oval_def_doc, new_oval_def_id)
    # find_oval_definition() returns None for an unknown ID; report that
    # instead of failing with AttributeError while flattening.
    if old_def is None:
        print(
            "OVAL definition '%s' was not found in the old datastream."
            % (old_oval_def_id))
        return
    if new_def is None:
        print(
            "OVAL definition '%s' was not found in the new datastream."
            % (new_oval_def_id))
        return
    old_els = definition_to_elements(old_def)
    new_els = definition_to_elements(new_def)
    # Cancel out pairs present on both sides, one occurrence at a time, so
    # duplicates are handled as a multiset. Iterate over a copy because
    # old_els shrinks during the loop.
    for element in list(old_els):
        if element in new_els:
            old_els.remove(element)
            new_els.remove(element)
    if old_els or new_els:
        print("OVAL definition %s differs:" % (old_oval_def_id))
        print("--- old datastream")
        print("+++ new datastream")
        print_offending_elements(old_els, "-")
        print_offending_elements(new_els, "+")
112
113
114
def compare_ovals(
        old_rule, new_rule, old_oval_defs, new_oval_defs, show_diffs):
    """Report differences between the OVAL checks of two versions of a rule.

    Prints whether the OVAL check was added or removed, and whether the
    referenced file name or definition ID changed. With show_diffs set, the
    referenced definitions are looked up in old_oval_defs / new_oval_defs
    (keyed by file name) and compared in detail.
    """
    check_query = "xccdf:check[@system='%s']" % (ssg.constants.oval_namespace)
    old_check = old_rule.find(check_query, ns)
    new_check = new_rule.find(check_query, ns)
    rule_id = old_rule.get("id")

    # At least one side has no OVAL check: report an addition or a removal
    # (or nothing when both lack it) and stop.
    if old_check is None or new_check is None:
        if new_check is not None:
            print("New datastream adds OVAL for rule '%s'." % (rule_id))
        elif old_check is not None:
            print("New datastream is missing OVAL for rule '%s'." % (rule_id))
        return

    old_ref = old_check.find("xccdf:check-content-ref", ns)
    new_ref = new_check.find("xccdf:check-content-ref", ns)
    old_def_id = old_ref.get("name")
    new_def_id = new_ref.get("name")
    old_file = old_ref.get("href")
    new_file = new_ref.get("href")

    if old_file != new_file:
        print(
            "OVAL definition file for rule '%s' has changed from "
            "'%s' to '%s'." % (rule_id, old_file, new_file)
        )
    if old_def_id != new_def_id:
        print(
            "OVAL definition ID for rule '%s' has changed from "
            "'%s' to '%s'." % (rule_id, old_def_id, new_def_id)
        )

    if not show_diffs:
        return

    try:
        old_doc = old_oval_defs[old_file]
    except KeyError:
        print(
            "Rule '%s' points to '%s' which isn't a part of the "
            "old datastream" % (rule_id, old_file))
        return
    try:
        new_doc = new_oval_defs[new_file]
    except KeyError:
        print(
            "Rule '%s' points to '%s' which isn't a part of the "
            "new datastream" % (rule_id, new_file))
        return
    compare_oval_definitions(old_doc, old_def_id, new_doc, new_def_id)
163
164
165
def compare_fix_texts(old_r, new_r):
    """Return a unified diff of two remediation texts.

    Args:
        old_r: Remediation text from the old datastream.
        new_r: Remediation text from the new datastream.

    Returns:
        The diff as a single string labelled "old datastream" /
        "new datastream", or None when the texts are equal. Every line of
        the returned diff ends with a newline.
    """
    if old_r == new_r:
        return None
    pieces = []
    for line in difflib.unified_diff(
            old_r.splitlines(keepends=True), new_r.splitlines(keepends=True),
            fromfile="old datastream", tofile="new datastream"):
        # The last line of a text may lack a line terminator; joined as-is
        # it would fuse with the next diff line (e.g. "-b+c"). splitlines()
        # strips a trailing terminator, so the result equals [line] only
        # for an unterminated line. Terminate it and mark it as GNU diff
        # does.
        if line.splitlines() == [line]:
            line += "\n\\ No newline at end of file\n"
        pieces.append(line)
    return "".join(pieces)
172
173
174
def compare_fix_elements(
        old_fix, new_fix, remediation_type, rule_id, show_diffs):
    """Report differences between two fix elements of the same rule.

    Prints a message when the "id" attributes differ. With show_diffs set,
    it also prints a diff of the elements' concatenated text content when
    that text differs.
    """
    old_fix_id, new_fix_id = old_fix.get("id"), new_fix.get("id")
    if old_fix_id != new_fix_id:
        print(
            "%s remediation ID for rule '%s' has changed from "
            "'%s' to '%s'." % (
                remediation_type, rule_id, old_fix_id, new_fix_id)
        )
    if not show_diffs:
        return
    # itertext() also yields the text of nested elements, so the whole
    # remediation body is compared, not just the leading text node.
    diff = compare_fix_texts(
        "".join(old_fix.itertext()), "".join(new_fix.itertext()))
    if diff:
        print("%s remediation for rule '%s' differs:\n%s" % (
            remediation_type, rule_id, diff))
191
192
193
def compare_remediations(old_rule, new_rule, remediation_type, show_diffs):
    """Report differences in one remediation type between two versions of a
    rule: added, removed, or (when present in both) changed."""
    fix_query = "xccdf:fix[@system='%s']" % (
        remediation_type_to_uri[remediation_type])
    old_fix = old_rule.find(fix_query, ns)
    new_fix = new_rule.find(fix_query, ns)
    rule_id = old_rule.get("id")

    if old_fix is not None and new_fix is not None:
        compare_fix_elements(
            old_fix, new_fix, remediation_type, rule_id, show_diffs)
    elif new_fix is not None:
        # Present only in the new datastream.
        print("New datastream adds %s remediation for rule '%s'." % (
            remediation_type, rule_id))
    elif old_fix is not None:
        # Present only in the old datastream.
        print("New datastream is missing %s remediation for rule '%s'." % (
            remediation_type, rule_id))
207
208
209
def get_rules_to_compare(benchmark, rule_id):
    """Return the xccdf:Rule elements of benchmark that should be compared.

    Without a rule_id every rule in the benchmark is returned. Otherwise
    only rules with that ID are returned; ssg.constants.OSCAP_RULE is
    prepended to the ID when it does not already start with it.

    Raises:
        ValueError: A rule_id was given but no rule in the benchmark has it.
    """
    if not rule_id:
        return benchmark.findall(".//xccdf:Rule", ns)

    full_id = rule_id
    if not full_id.startswith(ssg.constants.OSCAP_RULE):
        full_id = ssg.constants.OSCAP_RULE + full_id
    matches = benchmark.findall(".//xccdf:Rule[@id='%s']" % (full_id), ns)
    if not matches:
        raise ValueError("Can't find rule %s" % (full_id))
    return matches
220
221
222
def compare_rules(
        old_rule, new_rule, old_oval_defs, new_oval_defs, show_diffs):
    """Compare the OVAL check, then each known remediation type, of two
    versions of a rule."""
    compare_ovals(
        old_rule, new_rule, old_oval_defs, new_oval_defs, show_diffs)
    for kind in remediation_type_to_uri:
        compare_remediations(old_rule, new_rule, kind, show_diffs)
228
229
230
def process_benchmarks(
        old_benchmark, new_benchmark, old_oval_defs, new_oval_defs,
        rule_id, show_diffs):
    """Compare the rules of an old benchmark with their new counterparts.

    Args:
        old_benchmark: xccdf:Benchmark element of the old datastream.
        new_benchmark: Matching benchmark of the new datastream, or None
            when the new datastream has no benchmark with the same ID.
        old_oval_defs: OVAL documents of the old datastream, keyed by the
            file name that rules use to reference them.
        new_oval_defs: The same mapping for the new datastream.
        rule_id: Compare only this rule; a false value compares all rules.
        show_diffs: Whether to compare check and remediation contents too.
    """
    try:
        rules_in_old_benchmark = get_rules_to_compare(old_benchmark, rule_id)
    except ValueError as e:
        print(str(e))
        return
    if new_benchmark is None:
        # Without a counterpart there is nothing to compare against, and
        # the rule lookup below would fail with AttributeError on None.
        print("Benchmark '%s' is missing in new datastream." % (
            old_benchmark.get("id")))
        return
    for old_rule in rules_in_old_benchmark:
        # Use a separate name so the rule_id parameter is not overwritten.
        old_rule_id = old_rule.get("id")
        new_rule = new_benchmark.find(
            ".//xccdf:Rule[@id='%s']" % (old_rule_id), ns)
        if new_rule is None:
            print("%s is missing in new datastream." % (old_rule_id))
            continue
        compare_rules(
            old_rule, new_rule, old_oval_defs, new_oval_defs, show_diffs)
249
250
251
def find_all_oval_defs(root):
    """Map catalog names to the OVAL documents embedded under root.

    The result is built in three passes:
    1. ds:checks component-refs give xlink:href -> component-ref id;
    2. catalog:uri entries under ds:checklists give uri -> name;
    3. every ds:component holding oval:oval_definitions is resolved through
       both maps ("#<component id>" -> ref id, "#<ref id>" -> name).

    Components that cannot be resolved through both maps are skipped.

    Returns:
        A dict mapping each resolved name to its oval:oval_definitions
        element.
    """
    xlink_href = "{%s}href" % (ns["xlink"])

    ref_id_by_href = {}
    for stream in root.findall("ds:data-stream", ns):
        checks = stream.find("ds:checks", ns)
        for cref in checks.findall("ds:component-ref", ns):
            ref_id_by_href[cref.get(xlink_href)] = cref.get("id")

    name_by_uri = {}
    for stream in root.findall("ds:data-stream", ns):
        checklists = stream.find("ds:checklists", ns)
        # Only the first catalog found under the checklists is consulted.
        catalog = checklists.find(".//catalog:catalog", ns)
        for entry in catalog.findall("catalog:uri", ns):
            name_by_uri[entry.get("uri")] = entry.get("name")

    def_doc_dict = {}
    for component in root.findall("ds:component", ns):
        oval_def_doc = component.find("oval:oval_definitions", ns)
        if oval_def_doc is None:
            continue
        comp_href = "#" + component.get("id")
        try:
            filename = name_by_uri["#" + ref_id_by_href[comp_href]]
        except KeyError:
            # Not referenced by any check or catalog entry: skip it.
            continue
        def_doc_dict[filename] = oval_def_doc
    return def_doc_dict
279
280
281
def main():
    """Compare the two datastreams named on the command line and return 0.

    Each benchmark of the old datastream is compared with the benchmark of
    the same ID looked up in the new datastream.
    """
    args = parse_args()
    old_root = ET.parse(args.old).getroot()
    new_root = ET.parse(args.new).getroot()
    old_oval_defs = find_all_oval_defs(old_root)
    new_oval_defs = find_all_oval_defs(new_root)
    show_diffs = not args.no_diffs
    for old_benchmark in get_benchmarks(old_root):
        benchmark_id = old_benchmark.get("id")
        new_benchmark = find_benchmark(new_root, benchmark_id)
        process_benchmarks(
            old_benchmark, new_benchmark, old_oval_defs, new_oval_defs,
            args.rule, show_diffs)
    return 0
295
296
297
if __name__ == "__main__":
    # Use main()'s return value as the process exit status; raising
    # SystemExit is what sys.exit() does.
    raise SystemExit(main())
299