Passed
Pull Request — master (#103)
by Jan
27:19 queued 21:18
created

openscap_report.scap_results_parser.parsers.rule_parser   B

Complexity

Total Complexity 50

Size/Duplication

Total Lines 182
Duplicated Lines 0 %

Test Coverage

Coverage 94.29%

Importance

Changes 0
Metric Value
eloc 150
dl 0
loc 182
ccs 132
cts 140
cp 0.9429
rs 8.4
c 0
b 0
f 0
wmc 50

18 Methods

Rating   Name   Duplication   Size   Complexity  
A RuleParser._get_multi_check() 0 6 3
A RuleParser._get_references() 0 8 3
A RuleParser._get_identifiers() 0 8 3
A RuleParser._get_warnings() 0 7 3
A RuleParser._get_check_content_refs_dict() 0 12 3
A RuleParser.__init__() 0 5 1
A RuleParser._get_remediations() 0 8 3
A RuleParser._insert_rules_results() 0 16 5
A RuleParser._get_remediation_exit_code() 0 7 2
A RuleParser._add_platforms() 0 10 4
A RuleParser._add_oval_definition_id() 0 3 1
A RuleParser._evaluate_and_set_result() 0 7 5
A RuleParser._add_message_about_oval() 0 5 2
A RuleParser.process_rule() 0 19 1
A RuleParser._improve_result_of_remedied_rule() 0 14 5
A RuleParser._add_title() 0 5 2
A RuleParser.get_rules() 0 7 2
A RuleParser._get_check_engine_result() 0 7 2

How to fix   Complexity   

Complexity

Complex classes like openscap_report.scap_results_parser.parsers.rule_parser often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Copyright 2022, Red Hat, Inc.
2
# SPDX-License-Identifier: LGPL-2.1-or-later
3
4 1
from ..data_structures import Identifier, Reference, Rule
5 1
from ..namespaces import NAMESPACES
6 1
from .full_text_parser import FullTextParser
7 1
from .remediation_parser import RemediationParser
8
9
10 1
class RuleParser():
    """Build ``Rule`` objects from XCCDF ``Rule`` elements and attach their results.

    Rule definitions are read from ``xccdf:Rule`` elements found under ``root``;
    the time, result and messages of each rule are read from the
    ``xccdf:rule-result`` elements found under ``test_results``.
    """

    def __init__(self, root, test_results, ref_values):
        """Store the XML elements and create the helper parsers.

        Args:
            root: Element searched for ``xccdf:Rule`` elements.
            test_results: Element searched for ``xccdf:rule-result`` elements.
            ref_values: Passed unchanged to ``FullTextParser`` and
                ``RemediationParser``.
        """
        self.root = root
        self.test_results = test_results
        self.full_text_parser = FullTextParser(ref_values)
        self.remediation_parser = RemediationParser(ref_values)

    @staticmethod
    def _get_references(rule):
        """Return a ``Reference`` per ``xccdf:reference`` under *rule*, or None if none."""
        references = [
            Reference(reference.get("href"), reference.text)
            for reference in rule.findall(".//xccdf:reference", NAMESPACES)
        ]
        return references or None

    @staticmethod
    def _get_identifiers(rule):
        """Return an ``Identifier`` per ``xccdf:ident`` under *rule*, or None if none."""
        identifiers = [
            Identifier(identifier.get("system"), identifier.text)
            for identifier in rule.findall(".//xccdf:ident", NAMESPACES)
        ]
        return identifiers or None

    def _get_warnings(self, rule):
        """Return the full text of every ``xccdf:warning`` under *rule*, or None if none."""
        warnings = [
            self.full_text_parser.get_full_warning(warning)
            for warning in rule.findall(".//xccdf:warning", NAMESPACES)
        ]
        return warnings or None

    def _get_remediations(self, rule):
        """Return the parsed remediation of every ``xccdf:fix`` under *rule*, or None."""
        remediations = [
            self.remediation_parser.get_remediation(fix)
            for fix in rule.findall(".//xccdf:fix", NAMESPACES)
        ]
        return remediations or None

    @staticmethod
    def _get_multi_check(rule):
        """Return True if any ``xccdf:check`` under *rule* has ``multi-check="true"``."""
        return any(
            check.get("multi-check") == "true"
            for check in rule.findall(".//xccdf:check", NAMESPACES)
        )

    @staticmethod
    def _get_check_content_refs_dict(rule):
        """Map the prefix of each ``xccdf:check-content-ref`` name to the full name.

        The prefix is the text before the first ``:`` in the ``name`` attribute.
        Names without a ``:`` (including a missing ``name`` attribute) have no
        prefix and are skipped; slicing them at ``find(":") == -1`` would silently
        drop their last character and store them under a mangled key.

        Returns:
            dict: prefix -> full name; empty when there is nothing to map.
        """
        check_content_refs_dict = {}
        # findall() always returns a list, so no None check is needed.
        for check_ref in rule.findall(".//xccdf:check-content-ref", NAMESPACES):
            name = check_ref.get("name", "")
            id_check, separator, _ = name.partition(":")
            if separator:
                check_content_refs_dict[id_check] = name
        return check_content_refs_dict

    def process_rule(self, rule):
        """Create a ``Rule`` from one ``xccdf:Rule`` element.

        ``title`` and ``platforms`` are passed to ``Rule`` only when the element
        provides them; ``oval_definition_id`` is always passed and may be None.
        """
        rule_dict = {
            "rule_id": rule.get("id"),
            "severity": rule.get("severity", "Unknown"),
            "description": self.full_text_parser.get_full_description_of_rule(rule),
            "references": self._get_references(rule),
            "identifiers": self._get_identifiers(rule),
            "warnings": self._get_warnings(rule),
            "remediations": self._get_remediations(rule),
            "multi_check": self._get_multi_check(rule),
            "rationale": self.full_text_parser.get_full_rationale(rule),
        }

        self._add_title(rule_dict, rule)
        self._add_platforms(rule_dict, rule)
        self._add_oval_definition_id(rule_dict, rule)
        return Rule(**rule_dict)

    @staticmethod
    def _add_title(rule_dict, rule):
        """Set ``rule_dict["title"]`` to the text of the first ``xccdf:title``, if any."""
        title = rule.find(".//xccdf:title", NAMESPACES)
        if title is not None:
            rule_dict["title"] = title.text

    @staticmethod
    def _add_platforms(rule_dict, rule):
        """Set ``rule_dict["platforms"]`` to the ``idref`` of each ``xccdf:platform``.

        The key is left unset when *rule* has no ``xccdf:platform`` element.
        """
        platforms = [
            platform.get("idref")
            for platform in rule.findall(".//xccdf:platform", NAMESPACES)
        ]
        if platforms:
            rule_dict["platforms"] = platforms

    def _add_oval_definition_id(self, rule_dict, rule):
        """Set ``rule_dict["oval_definition_id"]`` to the ``oval``-prefixed ref name or None."""
        check_content_refs_dict = self._get_check_content_refs_dict(rule)
        rule_dict["oval_definition_id"] = check_content_refs_dict.get("oval", None)

    @staticmethod
    def _get_remediation_exit_code(message):
        """Return the integer following the remediation prefix of *message*, or None.

        Raises:
            ValueError: If the text after the prefix is not an integer.
        """
        remediation_exit_code_prefix = "Fix execution completed and returned:"
        if message.startswith(remediation_exit_code_prefix):
            return int(message[len(remediation_exit_code_prefix):])
        return None

    @staticmethod
    def _get_check_engine_result(message):
        """Return *message* from ``"Checking engine returns:"`` onwards, or None.

        The returned text includes the prefix itself. None is returned when the
        prefix does not occur in *message*.
        """
        check_engine_result_prefix = "Checking engine returns:"
        index = message.find(check_engine_result_prefix)
        # str.find() signals "not found" with -1 (truthy) and a match at the very
        # start with 0 (falsy), so the index must be compared explicitly.
        if index != -1:
            return message[index:]
        return None

    @staticmethod
    def _evaluate_and_set_result(rule_id, rules, remediation_exit_code, check_engine_result):
        """Override the rule result according to the remediation outcome.

        A check engine result containing ``"fail"`` yields ``"fix unsuccessful"``;
        a positive remediation exit code yields ``"fix failed"`` and takes
        precedence because it is evaluated last.
        """
        if check_engine_result is not None and "fail" in check_engine_result:
            rules[rule_id].result = "fix unsuccessful"

        if remediation_exit_code is not None and remediation_exit_code > 0:
            rules[rule_id].result = "fix failed"

    def _improve_result_of_remedied_rule(self, rule_id, rules):
        """Refine the result of ``rules[rule_id]`` using its messages.

        The first message that yields a remediation exit code and the first
        message that yields a check engine result are used. Messages that are
        None (elements without text) are skipped. Nothing happens when the rule
        has no messages.
        """
        messages = rules[rule_id].messages
        if not messages:
            return

        remediation_exit_code = None
        check_engine_result = None
        for message in messages:
            if message is None:
                continue
            if remediation_exit_code is None:
                remediation_exit_code = self._get_remediation_exit_code(message)
            if check_engine_result is None:
                check_engine_result = self._get_check_engine_result(message)

        self._evaluate_and_set_result(rule_id, rules, remediation_exit_code, check_engine_result)

    @staticmethod
    def _add_message_about_oval(rule_id, rules):
        """Append a note about the OVAL graph when the rule result contains ``"fix"``.

        ``messages`` may be None for a rule without messages; a new list is
        created in that case so the note is not lost to an AttributeError.
        """
        rule = rules[rule_id]
        if rule.result is not None and "fix" in rule.result:
            msg = "The OVAL graph of the rule as it was displayed before the fix was performed."
            if rule.messages is None:
                rule.messages = []
            rule.messages.append(msg)

    def _insert_rules_results(self, rules):
        """Copy time, result and messages from every ``xccdf:rule-result`` into *rules*.

        Args:
            rules: dict mapping rule id to ``Rule``; modified in place.

        Raises:
            KeyError: If a rule-result refers to an id that is not in *rules*.
        """
        for rule_result in self.test_results.findall('.//xccdf:rule-result', NAMESPACES):
            rule_id = rule_result.get('idref')
            rule = rules[rule_id]
            rule.time = rule_result.get('time')
            rule.result = rule_result.find('.//xccdf:result', NAMESPACES).text

            messages = [
                message.text
                for message in rule_result.findall('.//xccdf:message', NAMESPACES)
            ]
            # An empty message list is stored as None.
            rule.messages = messages or None
            self._improve_result_of_remedied_rule(rule_id, rules)
            self._add_message_about_oval(rule_id, rules)

    def get_rules(self):
        """Return a dict mapping rule id to ``Rule``, with results filled in."""
        rules = {}
        for rule_element in self.root.findall(".//xccdf:Rule", NAMESPACES):
            rule = self.process_rule(rule_element)
            rules[rule.rule_id] = rule
        self._insert_rules_results(rules)
        return rules
182