StandardContentDiffer.output_diff()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 9
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 0
Metric Value
cc 4
eloc 7
nop 4
dl 0
loc 9
ccs 0
cts 7
cp 0
crap 20
rs 10
c 0
b 0
f 0
1
#!/usr/bin/python3
2
3
import difflib
4
import re
5
import sys
6
import xml.etree.ElementTree as ET
7
8
import ssg.xml
9
from ssg.constants import FIX_TYPE_TO_SYSTEM, XCCDF12_NS
10
from ssg.utils import mkdir_p
11
12
13
class StandardContentDiffer(object):
14
15
    def __init__(self, old_content, new_content, rule_id, show_diffs, rule_diffs,
16
                 only_rules, output_dir):
17
        self.old_content = old_content
18
        self.new_content = new_content
19
20
        self.rule_id = rule_id
21
        self.show_diffs = show_diffs
22
        self.rule_diffs = rule_diffs
23
        self.only_rules = only_rules
24
25
        self.context_lines = 3
26
27
        self.check_system_map = {
28
            "OVAL": {"uri": ssg.constants.oval_namespace, "comp_func": self.compare_ovals},
29
            "OCIL": {"uri": ssg.constants.ocil_cs, "comp_func": self.compare_ocils}
30
        }
31
32
        self.output_dir = output_dir
33
        if self.rule_diffs:
34
            self._ensure_output_dir_exists()
35
36
    def _ensure_output_dir_exists(self):
37
        try:
38
            mkdir_p(self.output_dir)
39
        except OSError:
40
            print("Output path '%s' exists and it is not a directory." % self.output_dir)
41
            sys.exit(1)
42
43
    def output_diff(self, identifier, diff, mode="a"):
44
        if not diff:
45
            return
46
47
        if self.rule_diffs:
48
            with open("%s/%s" % (self.output_dir, identifier), mode) as f:
49
                f.write(diff)
50
        else:
51
            print(diff)
52
53
    def _get_rules_to_compare(self, benchmark):
54
        rule_to_find = self.rule_id
55
        if self.rule_id:
56
            if not self.rule_id.startswith(ssg.constants.OSCAP_RULE):
57
                rule_to_find = ssg.constants.OSCAP_RULE + self.rule_id
58
        rules = benchmark.find_rules(rule_to_find)
59
        return rules
60
61
    def compare_rules(self, old_benchmark, new_benchmark):
62
        missing_rules = []
63
        try:
64
            rules_in_old_benchmark = self._get_rules_to_compare(old_benchmark)
65
        except ValueError as e:
66
            print(str(e))
67
            return
68
69
        for old_rule in rules_in_old_benchmark:
70
            rule_id = old_rule.get_attr("id")
71
            new_rule = new_benchmark.find_rule(rule_id)
72
            if new_rule is None:
73
                missing_rules.append(rule_id)
74
                print("%s is missing in new datastream." % (rule_id))
75
                continue
76
            if self.only_rules:
77
                continue
78
            self.compare_rule(old_rule, new_rule, rule_id)
79
            self.compare_platforms(old_rule, new_rule,
80
                                   old_benchmark, new_benchmark, rule_id)
81
82
        if self.rule_diffs:
83
            print("Diff files saved at %s." % self.output_dir)
84
85
    def compare_rule(self, old_rule, new_rule, identifier):
86
        self.compare_rule_texts(old_rule, new_rule, identifier)
87
        self.compare_checks(old_rule, new_rule, "OVAL", identifier)
88
        self.compare_checks(old_rule, new_rule, "OCIL", identifier)
89
        for remediation_type in FIX_TYPE_TO_SYSTEM.keys():
90
            self.compare_remediations(old_rule, new_rule, remediation_type, identifier)
91
92
    def _get_list_of_platforms(self, cpe_platforms, idref):
93
        cpe_list = []
94
        if len(cpe_platforms) == 0:
95
            print("Platform {0} not defined in platform specification".format(idref))
96
            return cpe_list
97
        elif len(cpe_platforms) > 1:
98
            print("Platform {0} defined more than once".format(idref))
99
            for cpe_p in cpe_platforms:
100
                ET.dump(cpe_p)
101
        for cpe_platform in cpe_platforms:
102
            check_fact_refs = cpe_platform.find_all_check_fact_ref_elements()
103
            for check_fact_ref in check_fact_refs:
104
                cpe_list.append(check_fact_ref.get("id-ref"))
105
        return cpe_list
106
107
    def compare_platforms(self, old_rule, new_rule, old_benchmark, new_benchmark, identifier):
108
        entries = [{
109
                "benchmark": old_benchmark,
110
                "rule": old_rule,
111
                "cpe": []
112
            }, {
113
                "benchmark": new_benchmark,
114
                "rule": new_rule,
115
                "cpe": []
116
            }]
117
118
        for entry in entries:
119
            for platform in entry["rule"].get_all_platform_elements():
120
                idref = platform.get("idref")
121
                if idref.startswith("#"):
122
                    cpe_platforms = entry["benchmark"].find_all_cpe_platforms(idref)
123
                    entry["cpe"] += self._get_list_of_platforms(cpe_platforms, idref)
124
                else:
125
                    entry["cpe"].append(idref)
126
127
        if entries[0]["cpe"] != entries[1]["cpe"]:
128
            print("Platform has been changed for rule '{0}'".format(identifier))
129
130
            if self.show_diffs:
131
                diff = self.generate_diff_text("\n".join(entries[0]["cpe"])+"\n",
132
                                               "\n".join(entries[1]["cpe"])+"\n",
133
                                               fromfile=identifier, tofile=identifier)
134
                self.output_diff(identifier, diff)
135
136
    def compare_rule_texts(self, old_rule, new_rule, identifier):
137
        old_rule_text = old_rule.join_text_elements()
138
        new_rule_text = new_rule.join_text_elements()
139
140
        if old_rule_text == new_rule_text:
141
            return
142
143
        if old_rule_text != "":
144
            print(
145
                "New content has different text for rule '%s'." % (identifier))
146
147
        if self.show_diffs:
148
            diff = self.generate_diff_text(old_rule_text, new_rule_text,
149
                                           fromfile=identifier, tofile=identifier,
150
                                           n=self.context_lines)
151
152
            self.output_diff(identifier, diff, mode="w")
153
154
    def compare_check_ids(self, system, identifier, old_check_id, new_check_id):
155
        if old_check_id != new_check_id:
156
            print(
157
                "%s definition ID for rule '%s' has changed from "
158
                "'%s' to '%s'." % (
159
                    system, identifier, old_check_id, new_check_id)
160
            )
161
162
    def compare_check_file_names(self, system, identifier,
163
                                 old_check_file_name, new_check_file_name):
164
        if old_check_file_name != new_check_file_name:
165
            print(
166
                "%s definition file for rule '%s' has changed from "
167
                "'%s' to '%s'." % (
168
                    system, identifier, old_check_file_name, new_check_file_name)
169
            )
170
171
    def get_check_docs(self, system, identifier, old_check_file_name, new_check_file_name):
172
        try:
173
            old_check_doc = self.old_content.components.get(system)[old_check_file_name]
174
        except (KeyError, TypeError):
175
            print(
176
                "Rule '%s' points to '%s' which isn't a part of the "
177
                "old datastream" % (identifier, old_check_file_name))
178
            old_check_doc = None
179
        try:
180
            new_check_doc = self.new_content.components.get(system)[new_check_file_name]
181
        except (KeyError, TypeError):
182
            print(
183
                "Rule '%s' points to '%s' which isn't a part of the "
184
                "new datastream" % (identifier, new_check_file_name))
185
            new_check_doc = None
186
        return old_check_doc, new_check_doc
187
188
    def compare_checks(self, old_rule, new_rule, system, identifier):
189
        check_system_uri = self.check_system_map[system]["uri"]
190
        old_check = old_rule.get_check_element(check_system_uri)
191
        new_check = new_rule.get_check_element(check_system_uri)
192
        if (old_check is None and new_check is not None):
193
            print("New datastream adds %s for rule '%s'." % (system, identifier))
194
        elif (old_check is not None and new_check is None):
195
            print(
196
                "New datastream is missing %s for rule '%s'." % (system, identifier))
197
        elif (old_check is not None and new_check is not None):
198
            old_check_content_ref = old_rule.get_check_content_ref_element(old_check)
199
            new_check_content_ref = new_rule.get_check_content_ref_element(new_check)
200
            old_check_id = old_check_content_ref.get("name")
201
            new_check_id = new_check_content_ref.get("name")
202
            old_check_file_name = old_check_content_ref.get("href")
203
            new_check_file_name = new_check_content_ref.get("href")
204
205
            self.compare_check_ids(system, identifier, old_check_id, new_check_id)
206
            self.compare_check_file_names(system, identifier,
207
                                          old_check_file_name, new_check_file_name)
208
209
            if (self.show_diffs and
210
               identifier != "xccdf_org.ssgproject.content_rule_security_patches_up_to_date"):
211
212
                old_check_doc, new_check_doc = self.get_check_docs(system, identifier,
213
                                                                   old_check_file_name,
214
                                                                   new_check_file_name)
215
                if not old_check_doc or not new_check_doc:
216
                    return
217
218
                self.check_system_map[system]["comp_func"](old_check_doc, old_check_id,
219
                                                           new_check_doc, new_check_id, identifier)
220
221
    def compare_ovals(self, old_oval_def_doc, old_oval_def_id,
222
                      new_oval_def_doc, new_oval_def_id, identifier):
223
        old_def = old_oval_def_doc.find_oval_definition(old_oval_def_id)
224
        new_def = new_oval_def_doc.find_oval_definition(new_oval_def_id)
225
        old_els = old_def.get_elements()
226
        new_els = new_def.get_elements()
227
228
        old_els_text = self.serialize_elements(old_els)
229
        new_els_text = self.serialize_elements(new_els)
230
        diff = self.generate_diff_text(old_els_text, new_els_text,
231
                                       fromfile=old_oval_def_id, tofile=new_oval_def_id)
232
        if diff:
233
            print("OVAL for rule '%s' differs." % (identifier))
234
        self.output_diff(identifier, diff)
235
236
    def compare_ocils(self, old_ocil_doc, old_ocil_id, new_ocil_doc, new_ocil_id, identifier):
237
        try:
238
            old_question = old_ocil_doc.find_boolean_question(old_ocil_id)
239
            new_question = new_ocil_doc.find_boolean_question(new_ocil_id)
240
        except ValueError as e:
241
            print("Rule '%s' OCIL can't be found: %s" % (identifier, str(e)))
242
            return
243
        diff = self.generate_diff_text(old_question, new_question,
244
                                       fromfile=old_ocil_id, tofile=new_ocil_id)
245
        if diff:
246
            print("OCIL for rule '%s' differs." % identifier)
247
        self.output_diff(identifier, diff)
248
249
    def compare_remediations(self, old_rule, new_rule, remediation_type, identifier):
250
        system = FIX_TYPE_TO_SYSTEM[remediation_type]
251
        old_fix = old_rule.get_fix_element(system)
252
        new_fix = new_rule.get_fix_element(system)
253
        if (old_fix is None and new_fix is not None):
254
            print("New datastream adds %s remediation for rule '%s'." % (
255
                remediation_type, identifier))
256
        elif (old_fix is not None and new_fix is None):
257
            print("New datastream is missing %s remediation for rule '%s'." % (
258
                remediation_type, identifier))
259
        elif (old_fix is not None and new_fix is not None):
260
            self._compare_fix_elements(
261
                old_fix, new_fix, remediation_type, identifier)
262
263
    def _compare_fix_elements(self, old_fix, new_fix, remediation_type, identifier):
264
        old_fix_id = old_fix.get("id")
265
        new_fix_id = new_fix.get("id")
266
        if old_fix_id != new_fix_id:
267
            print(
268
                "%s remediation ID for rule '%s' has changed from "
269
                "'%s' to '%s'." % (
270
                    remediation_type, identifier, old_fix_id, new_fix_id)
271
            )
272
        if self.show_diffs:
273
            old_fix_text = "".join(old_fix.itertext())
274
            new_fix_text = "".join(new_fix.itertext())
275
            diff = self.generate_diff_text(old_fix_text, new_fix_text,
276
                                           fromfile=identifier, tofile=identifier)
277
            if diff:
278
                print("%s remediation for rule '%s' differs." % (remediation_type, identifier))
279
            self.output_diff(identifier, diff)
280
281
    def generate_diff_text(self, old_r, new_r,
282
                           fromfile="old datastream", tofile="new datastream", n=3):
283
        if old_r != new_r:
284
            diff = "".join(difflib.unified_diff(
285
                old_r.splitlines(keepends=True), new_r.splitlines(keepends=True),
286
                fromfile=fromfile, tofile=tofile, n=n))
287
            return diff
288
        return None
289
290
    def serialize_elements(self, elements):
291
        text = ""
292
        for thing, atrribute in elements:
293
            text += "%s %s\n" % (thing, atrribute)
294
        return text
295
296
297
class StigContentDiffer(StandardContentDiffer):
298
299
    def __init__(self, old_content, new_content, rule_id, show_diffs, rule_diffs,
300
                 only_rules, output_dir):
301
        super(StigContentDiffer, self).__init__(old_content, new_content, rule_id,
302
                                                show_diffs, rule_diffs, only_rules, output_dir)
303
304
        self.context_lines = 200
305
306
    def _get_stig_id(self, element):
307
        return element.get_version_element().text
308
309
    def get_stig_rule_SV(self, sv_rule_id):
310
        stig_rule_id = re.search(r'(SV-\d+)r\d+_rule', sv_rule_id)
311
        if not stig_rule_id:
312
            print("The rule '%s' doesn't have the usual STIG format: 'SV-XXXXXXrXXXXXX_rule.\n"
313
                  "Please make sure the input contents are DISA STIG Benchmarks." % sv_rule_id)
314
            sys.exit(1)
315
        return stig_rule_id.group(1)
316
317
    def compare_rules(self, old_benchmark, new_benchmark):
318
        missing_rules = []
319
        try:
320
            rules_in_old_benchmark = self._get_rules_to_compare(old_benchmark)
321
            rules_in_new_benchmark = self._get_rules_to_compare(new_benchmark)
322
        except ValueError as e:
323
            print(str(e))
324
            return
325
326
        # The rules in STIG Benchmarks will have their IDS changed whenever there is an update.
327
        # However, only the release number changes, the SV number stays the same.
328
        # This creates map the SV numbers to their equivalent full IDs in the new Benchmark.
329
        new_rule_mapping = {self.get_stig_rule_SV(rule.get_attr("id")): rule.get_attr("id")
330
                            for rule in rules_in_new_benchmark}
331
        old_rule_mapping = {self.get_stig_rule_SV(rule.get_attr("id")): rule.get_attr("id")
332
                            for rule in rules_in_old_benchmark}
333
334
        self.compare_existing_rules(new_benchmark,
335
                                    old_benchmark, rules_in_old_benchmark, new_rule_mapping)
336
337
        self.check_for_new_rules(rules_in_new_benchmark, old_rule_mapping)
338
339
        if self.rule_diffs:
340
            print("Diff files saved at %s." % self.output_dir)
341
342
    def compare_existing_rules(self, new_benchmark, old_benchmark,
343
                               rules_in_old_benchmark, new_rule_mapping):
344
        for old_rule in rules_in_old_benchmark:
345
            old_sv_rule_id = self.get_stig_rule_SV(old_rule.get_attr("id"))
346
            old_stig_id = self._get_stig_id(old_rule)
347
            try:
348
                new_sv_rule_id = new_rule_mapping[old_sv_rule_id]
349
            except KeyError:
350
                missing_rules.append(old_sv_rule_id)
351
                print("%s is missing in new datastream." % old_stig_id)
352
                continue
353
            if self.only_rules:
354
                continue
355
356
            new_rule = new_benchmark.find_rule(new_sv_rule_id)
357
            new_stig_id = self._get_stig_id(new_rule)
358
359
            self.compare_rule(old_rule, new_rule, new_stig_id)
360
            self.compare_platforms(old_rule, new_rule,
361
                                   old_benchmark, new_benchmark, new_stig_id)
362
363
    def check_for_new_rules(self, rules_in_new_benchmark, old_rule_mapping):
364
        # Check for rules added in new content
365
        for new_rule in rules_in_new_benchmark:
366
            new_stig_id = self._get_stig_id(new_rule)
367
            new_sv_rule_id = self.get_stig_rule_SV(new_rule.get_attr("id"))
368
            try:
369
                old_sv_rule_id = old_rule_mapping[new_sv_rule_id]
370
            except KeyError:
371
                print("%s was added in new datastream." % (new_stig_id))
372
373
                # Compare against empty rule so that a diff is generated
374
                empty_rule = ssg.xml.XMLRule(ET.Element("{%s}Rule" % XCCDF12_NS))
375
                self.compare_rule(empty_rule, new_rule, new_stig_id)
376
377