ssg.content_diff.StandardContentDiffer.__init__()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 20
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
cc 2
eloc 15
nop 8
dl 0
loc 20
ccs 0
cts 12
cp 0
crap 6
rs 9.65
c 0
b 0
f 0

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
#!/usr/bin/python3
2
3
import difflib
4
import re
5
import sys
6
import xml.etree.ElementTree as ET
7
8
import ssg.xml
9
from ssg.constants import FIX_TYPE_TO_SYSTEM, XCCDF12_NS
10
from ssg.utils import mkdir_p
11
12
13
class StandardContentDiffer(object):
14
15
    def __init__(self, old_content, new_content, rule_id, show_diffs, rule_diffs,
16
                 only_rules, output_dir):
17
        self.old_content = old_content
18
        self.new_content = new_content
19
20
        self.rule_id = rule_id
21
        self.show_diffs = show_diffs
22
        self.rule_diffs = rule_diffs
23
        self.only_rules = only_rules
24
25
        self.context_lines = 3
26
27
        self.check_system_map = {
28
            "OVAL": {"uri": ssg.constants.oval_namespace, "comp_func": self.compare_ovals},
29
            "OCIL": {"uri": ssg.constants.ocil_cs, "comp_func": self.compare_ocils}
30
        }
31
32
        self.output_dir = output_dir
33
        if self.rule_diffs:
34
            self._ensure_output_dir_exists()
35
36
    def _ensure_output_dir_exists(self):
37
        try:
38
            mkdir_p(self.output_dir)
39
        except OSError:
40
            print("Output path '%s' exists and it is not a directory." % self.output_dir)
41
            sys.exit(1)
42
43
    def output_diff(self, identifier, diff, mode="a"):
44
        if not diff:
45
            return
46
47
        if self.rule_diffs:
48
            with open("%s/%s" % (self.output_dir, identifier), mode) as f:
49
                f.write(diff)
50
        else:
51
            print(diff)
52
53
    def _get_rules_to_compare(self, benchmark):
54
        rule_to_find = self.rule_id
55
        if self.rule_id:
56
            if not self.rule_id.startswith(ssg.constants.OSCAP_RULE):
57
                rule_to_find = ssg.constants.OSCAP_RULE + self.rule_id
58
        rules = benchmark.find_rules(rule_to_find)
59
        return rules
60
61
    def compare_rules(self, old_benchmark, new_benchmark):
62
        missing_rules = []
63
        try:
64
            rules_in_old_benchmark = self._get_rules_to_compare(old_benchmark)
65
        except ValueError as e:
66
            print(str(e))
67
            return
68
69
        for old_rule in rules_in_old_benchmark:
70
            rule_id = old_rule.get_attr("id")
71
            new_rule = new_benchmark.find_rule(rule_id)
72
            if new_rule is None:
73
                missing_rules.append(rule_id)
74
                print("%s is missing in new datastream." % (rule_id))
75
                continue
76
            if self.only_rules:
77
                continue
78
            self.compare_rule(old_rule, new_rule, rule_id)
79
            self.compare_platforms(old_rule, new_rule,
80
                                   old_benchmark, new_benchmark, rule_id)
81
82
        if self.rule_diffs:
83
            print("Diff files saved at %s." % self.output_dir)
84
85
    def compare_rule(self, old_rule, new_rule, identifier):
86
        self.compare_rule_texts(old_rule, new_rule, identifier)
87
        self.compare_checks(old_rule, new_rule, "OVAL", identifier)
88
        self.compare_checks(old_rule, new_rule, "OCIL", identifier)
89
        for remediation_type in FIX_TYPE_TO_SYSTEM.keys():
90
            self.compare_remediations(old_rule, new_rule, remediation_type, identifier)
91
92
    def _get_list_of_platforms(self, cpe_platforms, idref):
93
        cpe_list = []
94
        if len(cpe_platforms) == 0:
95
            print("Platform {0} not defined in platform specification".format(idref))
96
            return cpe_list
97
        elif len(cpe_platforms) > 1:
98
            print("Platform {0} defined more than once".format(idref))
99
            for cpe_p in cpe_platforms:
100
                ET.dump(cpe_p)
101
        for cpe_platform in cpe_platforms:
102
            check_fact_refs = cpe_platform.find_all_check_fact_ref_elements()
103
            for check_fact_ref in check_fact_refs:
104
                cpe_list.append(check_fact_ref.get("id-ref"))
105
        return cpe_list
106
107
    def compare_platforms(self, old_rule, new_rule, old_benchmark, new_benchmark, identifier):
108
        entries = [{
109
                "benchmark": old_benchmark,
110
                "rule": old_rule,
111
                "cpe": []
112
            }, {
113
                "benchmark": new_benchmark,
114
                "rule": new_rule,
115
                "cpe": []
116
            }]
117
118
        for entry in entries:
119
            for platform in entry["rule"].get_all_platform_elements():
120
                idref = platform.get("idref")
121
                if idref.startswith("#"):
122
                    cpe_platforms = entry["benchmark"].find_all_cpe_platforms(idref)
123
                    entry["cpe"] += self._get_list_of_platforms(cpe_platforms, idref)
124
                else:
125
                    entry["cpe"].append(idref)
126
127
        if entries[0]["cpe"] != entries[1]["cpe"]:
128
            print("Platform has been changed for rule '{0}'".format(identifier))
129
130
            if self.show_diffs:
131
                diff = self.generate_diff_text("\n".join(entries[0]["cpe"])+"\n",
132
                                               "\n".join(entries[1]["cpe"])+"\n",
133
                                               fromfile=identifier, tofile=identifier)
134
                self.output_diff(identifier, diff)
135
136
    def compare_rule_texts(self, old_rule, new_rule, identifier):
137
        old_rule_text = old_rule.join_text_elements()
138
        new_rule_text = new_rule.join_text_elements()
139
140
        if old_rule_text == new_rule_text:
141
            return
142
143
        if old_rule_text != "":
144
            print(
145
                "New content has different text for rule '%s'." % (identifier))
146
147
        if self.show_diffs:
148
            diff = self.generate_diff_text(old_rule_text, new_rule_text,
149
                                           fromfile=identifier, tofile=identifier,
150
                                           n=self.context_lines)
151
152
            self.output_diff(identifier, diff, mode="w")
153
154
    def compare_check_ids(self, system, identifier, old_check_id, new_check_id):
155
        if old_check_id != new_check_id:
156
            print(
157
                "%s definition ID for rule '%s' has changed from "
158
                "'%s' to '%s'." % (
159
                    system, identifier, old_check_id, new_check_id)
160
            )
161
162
    def compare_check_file_names(self, system, identifier,
163
                                 old_check_file_name, new_check_file_name):
164
        if old_check_file_name != new_check_file_name:
165
            print(
166
                "%s definition file for rule '%s' has changed from "
167
                "'%s' to '%s'." % (
168
                    system, identifier, old_check_file_name, new_check_file_name)
169
            )
170
171
    def get_check_docs(self, system, identifier, old_check_file_name, new_check_file_name):
172
        try:
173
            old_check_doc = self.old_content.components.get(system)[old_check_file_name]
174
        except (KeyError, TypeError):
175
            print(
176
                "Rule '%s' points to '%s' which isn't a part of the "
177
                "old datastream" % (identifier, old_check_file_name))
178
            old_check_doc = None
179
        try:
180
            new_check_doc = self.new_content.components.get(system)[new_check_file_name]
181
        except (KeyError, TypeError):
182
            print(
183
                "Rule '%s' points to '%s' which isn't a part of the "
184
                "new datastream" % (identifier, new_check_file_name))
185
            new_check_doc = None
186
        return old_check_doc, new_check_doc
187
188
    def compare_checks(self, old_rule, new_rule, system, identifier):
189
        check_system_uri = self.check_system_map[system]["uri"]
190
        old_check = old_rule.get_check_element(check_system_uri)
191
        new_check = new_rule.get_check_element(check_system_uri)
192
        if (old_check is None and new_check is not None):
193
            print("New datastream adds %s for rule '%s'." % (system, identifier))
194
        elif (old_check is not None and new_check is None):
195
            print(
196
                "New datastream is missing %s for rule '%s'." % (system, identifier))
197
        elif (old_check is not None and new_check is not None):
198
            old_check_content_ref = old_rule.get_check_content_ref_element(old_check)
199
            new_check_content_ref = new_rule.get_check_content_ref_element(new_check)
200
            old_check_id = old_check_content_ref.get("name")
201
            new_check_id = new_check_content_ref.get("name")
202
            old_check_file_name = old_check_content_ref.get("href")
203
            new_check_file_name = new_check_content_ref.get("href")
204
205
            self.compare_check_ids(system, identifier, old_check_id, new_check_id)
206
            self.compare_check_file_names(system, identifier,
207
                                          old_check_file_name, new_check_file_name)
208
209
            if (self.show_diffs and
210
               identifier != "xccdf_org.ssgproject.content_rule_security_patches_up_to_date"):
211
212
                old_check_doc, new_check_doc = self.get_check_docs(system, identifier,
213
                                                                   old_check_file_name,
214
                                                                   new_check_file_name)
215
                if not old_check_doc or not new_check_doc:
216
                    return
217
218
                self.check_system_map[system]["comp_func"](old_check_doc, old_check_id,
219
                                                           new_check_doc, new_check_id, identifier)
220
221
    def compare_ovals(self, old_oval_def_doc, old_oval_def_id,
222
                      new_oval_def_doc, new_oval_def_id, identifier):
223
        old_def = old_oval_def_doc.find_oval_definition(old_oval_def_id)
224
        new_def = new_oval_def_doc.find_oval_definition(new_oval_def_id)
225
        old_els = old_def.get_elements()
226
        new_els = new_def.get_elements()
227
228
        old_els_text = self.serialize_elements(old_els)
229
        new_els_text = self.serialize_elements(new_els)
230
        diff = self.generate_diff_text(old_els_text, new_els_text,
231
                                       fromfile=old_oval_def_id, tofile=new_oval_def_id)
232
        if diff:
233
            print("OVAL for rule '%s' differs." % (identifier))
234
        self.output_diff(identifier, diff)
235
236
    def compare_ocils(self, old_ocil_doc, old_ocil_id, new_ocil_doc, new_ocil_id, identifier):
237
        try:
238
            old_question = old_ocil_doc.find_boolean_question(old_ocil_id)
239
            new_question = new_ocil_doc.find_boolean_question(new_ocil_id)
240
        except ValueError as e:
241
            print("Rule '%s' OCIL can't be found: %s" % (identifier, str(e)))
242
            return
243
        diff = self.generate_diff_text(old_question, new_question,
244
                                       fromfile=old_ocil_id, tofile=new_ocil_id)
245
        if diff:
246
            print("OCIL for rule '%s' differs." % identifier)
247
        self.output_diff(identifier, diff)
248
249
    def compare_remediations(self, old_rule, new_rule, remediation_type, identifier):
250
        system = FIX_TYPE_TO_SYSTEM[remediation_type]
251
        old_fix = old_rule.get_fix_element(system)
252
        new_fix = new_rule.get_fix_element(system)
253
        if (old_fix is None and new_fix is not None):
254
            print("New datastream adds %s remediation for rule '%s'." % (
255
                remediation_type, identifier))
256
        elif (old_fix is not None and new_fix is None):
257
            print("New datastream is missing %s remediation for rule '%s'." % (
258
                remediation_type, identifier))
259
        elif (old_fix is not None and new_fix is not None):
260
            self._compare_fix_elements(
261
                old_fix, new_fix, remediation_type, identifier)
262
263
    def _compare_fix_elements(self, old_fix, new_fix, remediation_type, identifier):
264
        old_fix_id = old_fix.get("id")
265
        new_fix_id = new_fix.get("id")
266
        if old_fix_id != new_fix_id:
267
            print(
268
                "%s remediation ID for rule '%s' has changed from "
269
                "'%s' to '%s'." % (
270
                    remediation_type, identifier, old_fix_id, new_fix_id)
271
            )
272
        if self.show_diffs:
273
            old_fix_text = "".join(old_fix.itertext())
274
            new_fix_text = "".join(new_fix.itertext())
275
            diff = self.generate_diff_text(old_fix_text, new_fix_text,
276
                                           fromfile=identifier, tofile=identifier)
277
            if diff:
278
                print("%s remediation for rule '%s' differs." % (remediation_type, identifier))
279
            self.output_diff(identifier, diff)
280
281
    def generate_diff_text(self, old_r, new_r,
282
                           fromfile="old datastream", tofile="new datastream", n=3):
283
        if old_r != new_r:
284
            diff = "".join(difflib.unified_diff(
285
                old_r.splitlines(keepends=True), new_r.splitlines(keepends=True),
286
                fromfile=fromfile, tofile=tofile, n=n))
287
            return diff
288
        return None
289
290
    def serialize_elements(self, elements):
291
        text = ""
292
        for thing, atrribute in elements:
293
            text += "%s %s\n" % (thing, atrribute)
294
        return text
295
296
297
class StigContentDiffer(StandardContentDiffer):
298
299
    def __init__(self, old_content, new_content, rule_id, show_diffs, rule_diffs,
300
                 only_rules, output_dir):
301
        super(StigContentDiffer, self).__init__(old_content, new_content, rule_id,
302
                                                show_diffs, rule_diffs, only_rules, output_dir)
303
304
        self.context_lines = 200
305
306
    def _get_stig_id(self, element):
307
        return element.get_version_element().text
308
309
    def get_stig_rule_SV(self, sv_rule_id):
310
        stig_rule_id = re.search(r'(SV-\d+)r\d+_rule', sv_rule_id)
311
        if not stig_rule_id:
312
            print("The rule '%s' doesn't have the usual STIG format: 'SV-XXXXXXrXXXXXX_rule.\n"
313
                  "Please make sure the input contents are DISA STIG Benchmarks." % sv_rule_id)
314
            sys.exit(1)
315
        return stig_rule_id.group(1)
316
317
    def compare_rules(self, old_benchmark, new_benchmark):
318
        missing_rules = []
319
        try:
320
            rules_in_old_benchmark = self._get_rules_to_compare(old_benchmark)
321
            rules_in_new_benchmark = self._get_rules_to_compare(new_benchmark)
322
        except ValueError as e:
323
            print(str(e))
324
            return
325
326
        # The rules in STIG Benchmarks will have their IDS changed whenever there is an update.
327
        # However, only the release number changes, the SV number stays the same.
328
        # This creates map the SV numbers to their equivalent full IDs in the new Benchmark.
329
        new_rule_mapping = {self.get_stig_rule_SV(rule.get_attr("id")): rule.get_attr("id")
330
                            for rule in rules_in_new_benchmark}
331
        old_rule_mapping = {self.get_stig_rule_SV(rule.get_attr("id")): rule.get_attr("id")
332
                            for rule in rules_in_old_benchmark}
333
334
        self.compare_existing_rules(new_benchmark,
335
                                    old_benchmark, rules_in_old_benchmark, new_rule_mapping)
336
337
        self.check_for_new_rules(rules_in_new_benchmark, old_rule_mapping)
338
339
        if self.rule_diffs:
340
            print("Diff files saved at %s." % self.output_dir)
341
342
    def compare_existing_rules(self, new_benchmark, old_benchmark,
343
                               rules_in_old_benchmark, new_rule_mapping):
344
        for old_rule in rules_in_old_benchmark:
345
            old_sv_rule_id = self.get_stig_rule_SV(old_rule.get_attr("id"))
346
            old_stig_id = self._get_stig_id(old_rule)
347
            try:
348
                new_sv_rule_id = new_rule_mapping[old_sv_rule_id]
349
            except KeyError:
350
                missing_rules.append(old_sv_rule_id)
351
                print("%s is missing in new datastream." % old_stig_id)
352
                continue
353
            if self.only_rules:
354
                continue
355
356
            new_rule = new_benchmark.find_rule(new_sv_rule_id)
357
            new_stig_id = self._get_stig_id(new_rule)
358
359
            self.compare_rule(old_rule, new_rule, new_stig_id)
360
            self.compare_platforms(old_rule, new_rule,
361
                                   old_benchmark, new_benchmark, new_stig_id)
362
363
    def check_for_new_rules(self, rules_in_new_benchmark, old_rule_mapping):
364
        # Check for rules added in new content
365
        for new_rule in rules_in_new_benchmark:
366
            new_stig_id = self._get_stig_id(new_rule)
367
            new_sv_rule_id = self.get_stig_rule_SV(new_rule.get_attr("id"))
368
            try:
369
                old_sv_rule_id = old_rule_mapping[new_sv_rule_id]
370
            except KeyError:
371
                print("%s was added in new datastream." % (new_stig_id))
372
373
                # Compare against empty rule so that a diff is generated
374
                empty_rule = ssg.xml.XMLRule(ET.Element("{%s}Rule" % XCCDF12_NS))
375
                self.compare_rule(empty_rule, new_rule, new_stig_id)
376
377