openscap_report.scap_results_parser.parsers.rule_parser   C
last analyzed

Complexity

Total Complexity 55

Size/Duplication

Total Lines 222
Duplicated Lines 0 %

Test Coverage

Coverage 95.73%

Importance

Changes 0
Metric Value
eloc 182
dl 0
loc 222
ccs 157
cts 164
cp 0.9573
rs 6
c 0
b 0
f 0
wmc 55

21 Methods

Rating   Name   Duplication   Size   Complexity  
B RuleParser._insert_rules_results() 0 23 5
A RuleParser._create_new_multi_check_rule() 0 10 1
A RuleParser._get_remediation_exit_code() 0 7 2
A RuleParser.get_oval_check_href_name() 0 6 2
A RuleParser._get_multi_check() 0 6 3
A RuleParser._add_platforms() 0 7 3
A RuleParser._add_oval_definition_id() 0 3 1
B RuleParser._get_references() 0 14 6
A RuleParser._evaluate_and_set_result() 0 7 5
A RuleParser._get_identifiers() 0 6 2
A RuleParser._add_message_about_oval() 0 5 2
A RuleParser._get_warnings() 0 9 2
A RuleParser._get_check_content_refs_dict() 0 12 3
A RuleParser.set_oval_definition_id_if_is_none() 0 4 2
A RuleParser.process_rule() 0 19 1
A RuleParser._improve_result_of_remedied_rule() 0 14 5
A RuleParser.__init__() 0 8 1
A RuleParser._add_title() 0 5 2
A RuleParser.get_rules() 0 7 2
A RuleParser._get_remediations() 0 8 3
A RuleParser._get_check_engine_result() 0 5 2

How to fix   Complexity   

Complexity

Complex classes like openscap_report.scap_results_parser.parsers.rule_parser often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Copyright 2022, Red Hat, Inc.
2
# SPDX-License-Identifier: LGPL-2.1-or-later
3
4 1
import collections
5
6 1
from openscap_report.dataclasses import replace
7
8 1
from ..data_structures import Identifier, Reference, Rule, RuleWarning
9 1
from ..data_structures.remediation import HIDDEN_REMEDIATION_TYPES
10 1
from ..namespaces import NAMESPACES
11 1
from .full_text_parser import FullTextParser
12 1
from .known_references import KNOWN_REFERENCES, update_references
13 1
from .remediation_parser import RemediationParser
14
15
16 1
class RuleParser():
17 1
    def __init__(self, root, test_results, ref_values):
18 1
        self.root = root
19 1
        self.test_results = test_results
20 1
        self.full_text_parser = FullTextParser(ref_values)
21 1
        self.remediation_parser = RemediationParser(ref_values)
22 1
        self.to_select_rule_ids = set()
23 1
        self.to_deselect_rule_ids = set()
24 1
        update_references(self.root)
25
26 1
    @staticmethod
27 1
    def _get_references(rule):
28 1
        url_to_ref_ids = collections.defaultdict(list)
29 1
        for reference_el in rule.findall(".//xccdf:reference", NAMESPACES):
30 1
            url = reference_el.get("href")
31 1
            if url is None or url == "":
32 1
                url = "UNKNOWN"
33 1
            ref_id = reference_el.text
34 1
            url_to_ref_ids[url].append(ref_id)
35 1
        references = []
36 1
        for url, ref_ids in url_to_ref_ids.items():
37 1
            name = KNOWN_REFERENCES.get(url, url)
38 1
            references.append(Reference(name, url, sorted(ref_ids)))
39 1
        return sorted(references, key=lambda x: x.name)
40
41 1
    @staticmethod
42 1
    def _get_identifiers(rule):
43 1
        identifiers = []
44 1
        for identifier in rule.findall(".//xccdf:ident", NAMESPACES):
45 1
            identifiers.append(Identifier(identifier.get("system"), identifier.text))
46 1
        return identifiers
47
48 1
    def _get_warnings(self, rule):
49 1
        warnings = []
50 1
        for warning in rule.findall(".//xccdf:warning", NAMESPACES):
51 1
            warnings.append(
52
                RuleWarning(
53
                    self.full_text_parser.get_full_warning(warning),
54
                    warning.get("category")
55
                ))
56 1
        return warnings
57
58 1
    def _get_remediations(self, rule):
59 1
        output = []
60 1
        for fix in rule.findall(".//xccdf:fix", NAMESPACES):
61 1
            remediation = self.remediation_parser.get_remediation(fix)
62 1
            if remediation.system in HIDDEN_REMEDIATION_TYPES:
63 1
                continue
64 1
            output.append(remediation)
65 1
        return output
66
67 1
    @staticmethod
68 1
    def _get_multi_check(rule):
69 1
        for check in rule.findall(".//xccdf:check", NAMESPACES):
70 1
            if check.get("multi-check") == "true":
71 1
                return True
72 1
        return False
73
74 1
    @staticmethod
75 1
    def _get_check_content_refs_dict(rule):
76 1
        check_content_refs = rule.findall(".//xccdf:check-content-ref", NAMESPACES)
77 1
        check_content_refs_dict = {}
78 1
        if check_content_refs is None:
79
            return check_content_refs_dict
80
81 1
        for check_ref in check_content_refs:
82 1
            name = check_ref.get("name", "")
83 1
            id_check = name[:name.find(":")]
84 1
            check_content_refs_dict[id_check] = name
85 1
        return check_content_refs_dict
86
87 1
    def process_rule(self, rule):
88 1
        rule_id = rule.get("id")
89
90 1
        rule_dict = {
91
            "rule_id": rule_id,
92
            "severity": rule.get("severity", "Unknown"),
93
            "description": self.full_text_parser.get_full_description_of_rule(rule),
94
            "references": self._get_references(rule),
95
            "identifiers": self._get_identifiers(rule),
96
            "warnings": self._get_warnings(rule),
97
            "remediations": self._get_remediations(rule),
98
            "multi_check": self._get_multi_check(rule),
99
            "rationale": self.full_text_parser.get_full_rationale(rule),
100
        }
101
102 1
        self._add_title(rule_dict, rule)
103 1
        self._add_platforms(rule_dict, rule)
104 1
        self._add_oval_definition_id(rule_dict, rule)
105 1
        return Rule(**rule_dict)
106
107 1
    @staticmethod
108 1
    def _add_title(rule_dict, rule):
109 1
        title = rule.find(".//xccdf:title", NAMESPACES)
110 1
        if title is not None:
111 1
            rule_dict["title"] = title.text
112
113 1
    @staticmethod
114 1
    def _add_platforms(rule_dict, rule):
115 1
        platforms = rule.findall(".//xccdf:platform", NAMESPACES)
116 1
        rule_dict["platforms"] = []
117 1
        if platforms is not None:
118 1
            for platform in platforms:
119 1
                rule_dict["platforms"].append(platform.get("idref"))
120
121 1
    def _add_oval_definition_id(self, rule_dict, rule):
122 1
        check_content_refs_dict = self._get_check_content_refs_dict(rule)
123 1
        rule_dict["oval_definition_id"] = check_content_refs_dict.get("oval")
124
125 1
    @staticmethod
126 1
    def _get_remediation_exit_code(message):
127 1
        remediation_exit_code_prefix = "Fix execution completed and returned:"
128 1
        if message.startswith(remediation_exit_code_prefix):
129
            exit_code = message.replace(remediation_exit_code_prefix, "")
130
            return int(exit_code)
131 1
        return None
132
133 1
    @staticmethod
134 1
    def _get_check_engine_result(message):
135 1
        check_engine_result_prefix = "Checking engine returns:"
136 1
        index = message.find(check_engine_result_prefix)
137 1
        return message[index:] if index > -1 else None
138
139 1
    @staticmethod
140 1
    def _evaluate_and_set_result(rule_id, rules, remediation_exit_code, check_engine_result):
141 1
        if check_engine_result is not None and "fail" in check_engine_result:
142
            rules[rule_id].result = "fix unsuccessful"
143
144 1
        if remediation_exit_code is not None and remediation_exit_code > 0:
145
            rules[rule_id].result = "fix failed"
146
147 1
    def _improve_result_of_remedied_rule(self, rule_id, rules):
148 1
        if not rules[rule_id].messages:
149 1
            return
150 1
        remediation_exit_code = None
151 1
        check_engine_result = None
152
153 1
        for message in rules[rule_id].messages:
154 1
            if remediation_exit_code is None:
155 1
                remediation_exit_code = self._get_remediation_exit_code(message)
156
157 1
            if check_engine_result is None:
158 1
                check_engine_result = self._get_check_engine_result(message)
159
160 1
        self._evaluate_and_set_result(rule_id, rules, remediation_exit_code, check_engine_result)
161
162 1
    @staticmethod
163 1
    def _add_message_about_oval(rule_id, rules):
164 1
        if "fix" in rules[rule_id].result:
165
            msg = "The OVAL graph of the rule as it was displayed before the fix was performed."
166
            rules[rule_id].messages.append(msg)
167
168 1
    @staticmethod
169 1
    def set_oval_definition_id_if_is_none(rule, check_name):
170 1
        if rule.oval_definition_id is None:
171 1
            rule.oval_definition_id = check_name
172
173 1
    @staticmethod
174 1
    def get_oval_check_href_name(rule_result_el):
175 1
        check_ref = rule_result_el.find('.//xccdf:check/xccdf:check-content-ref', NAMESPACES)
176 1
        if check_ref is None:
177 1
            return None, None
178 1
        return check_ref.get("href").lstrip("#"), check_ref.get("name")
179
180 1
    def _insert_rules_results(self, rules):
181 1
        rules_results = self.test_results.findall('.//xccdf:rule-result', NAMESPACES)
182 1
        for rule_result in rules_results:
183 1
            rule_id = rule_result.get('idref')
184 1
            rules[rule_id].time = rule_result.get('time')
185 1
            rules[rule_id].result = rule_result.find('.//xccdf:result', NAMESPACES).text
186 1
            rules[rule_id].weight = float(rule_result.get('weight'))
187
188 1
            rules[rule_id].oval_reference, check_name = self.get_oval_check_href_name(
189
                rule_result
190
            )
191 1
            self.set_oval_definition_id_if_is_none(rules[rule_id], check_name)
192
193 1
            messages = rule_result.findall('.//xccdf:message', NAMESPACES)
194 1
            if messages is not None:
195 1
                rules[rule_id].messages = []
196 1
                for message in messages:
197 1
                    rules[rule_id].messages.append(message.text)
198 1
                self._improve_result_of_remedied_rule(rule_id, rules)
199 1
            self._add_message_about_oval(rule_id, rules)
200
201 1
            if rules[rule_id].multi_check:
202 1
                self._create_new_multi_check_rule(rules, rule_id, check_name)
203
204 1
    def _create_new_multi_check_rule(self, rules, rule_id, check_name):
205 1
        self.to_deselect_rule_ids.add(rule_id)
206 1
        new_rule_id = f"{rule_id}-{check_name}"
207 1
        changes = {
208
            "rule_id": new_rule_id,
209
            "title": f"{rules[rule_id].title} ({check_name})",
210
            "oval_definition_id": check_name,
211
        }
212 1
        rules[new_rule_id] = replace(rules[rule_id], **changes)
213 1
        self.to_select_rule_ids.add(new_rule_id)
214
215 1
    def get_rules(self):
216 1
        rules = {}
217 1
        for rule_el in self.root.findall(".//xccdf:Rule", NAMESPACES):
218 1
            rule = self.process_rule(rule_el)
219 1
            rules[rule.rule_id] = rule
220 1
        self._insert_rules_results(rules)
221
        return rules
222