Passed
Push — master ( 7a880f...a8ac27 )
by
unknown
27:41 queued 21:44
created

openscap_report.scap_results_parser.parsers.rule_parser   B

Complexity

Total Complexity 44

Size/Duplication

Total Lines 167
Duplicated Lines 0 %

Test Coverage

Coverage 94.44%

Importance

Changes 0
Metric Value
eloc 136
dl 0
loc 167
ccs 119
cts 126
cp 0.9444
rs 8.8798
c 0
b 0
f 0
wmc 44

18 Methods

Rating   Name   Duplication   Size   Complexity  
A RuleParser.__init__() 0 5 1
A RuleParser._get_remediation_exit_code() 0 7 2
A RuleParser._get_multi_check() 0 6 3
A RuleParser._add_platforms() 0 7 3
A RuleParser._add_oval_definition_id() 0 3 1
A RuleParser._get_references() 0 6 2
A RuleParser._get_identifiers() 0 6 2
A RuleParser._get_warnings() 0 5 2
A RuleParser._get_check_content_refs_dict() 0 12 3
A RuleParser.process_rule() 0 19 1
A RuleParser._add_title() 0 5 2
A RuleParser._get_remediations() 0 6 2
A RuleParser._insert_rules_results() 0 14 4
A RuleParser._evaluate_and_set_result() 0 7 5
A RuleParser._add_message_about_oval() 0 5 2
A RuleParser._improve_result_of_remedied_rule() 0 14 5
A RuleParser.get_rules() 0 7 2
A RuleParser._get_check_engine_result() 0 5 2

How to fix   Complexity   

Complexity

Complex classes like openscap_report.scap_results_parser.parsers.rule_parser often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Copyright 2022, Red Hat, Inc.
2
# SPDX-License-Identifier: LGPL-2.1-or-later
3
4 1
from ..data_structures import Identifier, Reference, Rule
5 1
from ..namespaces import NAMESPACES
6 1
from .full_text_parser import FullTextParser
7 1
from .remediation_parser import RemediationParser
8
9
10 1
class RuleParser:
    """Build ``Rule`` objects from XCCDF ``Rule`` elements and attach results.

    The parser reads rule definitions from ``root`` and rule results from
    ``test_results``; both are expected to be ElementTree-like elements that
    support ``find``/``findall`` with a namespace mapping.
    """

    def __init__(self, root, test_results, ref_values):
        """Store the XML roots and create the helper parsers.

        Args:
            root: Element searched for ``xccdf:Rule`` definitions.
            test_results: Element searched for ``xccdf:rule-result`` elements.
            ref_values: Passed unchanged to ``FullTextParser`` and
                ``RemediationParser``.
        """
        self.root = root
        self.test_results = test_results
        self.full_text_parser = FullTextParser(ref_values)
        self.remediation_parser = RemediationParser(ref_values)

    @staticmethod
    def _get_references(rule):
        """Return a ``Reference`` for every ``xccdf:reference`` under *rule*."""
        return [
            Reference(reference.get("href"), reference.text)
            for reference in rule.findall(".//xccdf:reference", NAMESPACES)
        ]

    @staticmethod
    def _get_identifiers(rule):
        """Return an ``Identifier`` for every ``xccdf:ident`` under *rule*."""
        return [
            Identifier(identifier.get("system"), identifier.text)
            for identifier in rule.findall(".//xccdf:ident", NAMESPACES)
        ]

    def _get_warnings(self, rule):
        """Return the full text of every ``xccdf:warning`` under *rule*."""
        return [
            self.full_text_parser.get_full_warning(warning)
            for warning in rule.findall(".//xccdf:warning", NAMESPACES)
        ]

    def _get_remediations(self, rule):
        """Return the parsed remediation of every ``xccdf:fix`` under *rule*."""
        return [
            self.remediation_parser.get_remediation(fix)
            for fix in rule.findall(".//xccdf:fix", NAMESPACES)
        ]

    @staticmethod
    def _get_multi_check(rule):
        """Return True if any ``xccdf:check`` has ``multi-check="true"``."""
        return any(
            check.get("multi-check") == "true"
            for check in rule.findall(".//xccdf:check", NAMESPACES)
        )

    @staticmethod
    def _get_check_content_refs_dict(rule):
        """Map the prefix of each check-content-ref name to the full name.

        The key is the part of the ``name`` attribute before the first ``:``.
        A name without a colon is used whole as its own key (the previous
        slicing on ``str.find`` silently dropped its last character).

        Returns:
            dict: ``{prefix: name}``; empty when the rule has no
            ``xccdf:check-content-ref`` elements.
        """
        check_content_refs_dict = {}
        # findall() always returns a list, so no None check is needed.
        for check_ref in rule.findall(".//xccdf:check-content-ref", NAMESPACES):
            name = check_ref.get("name", "")
            id_check, _, _ = name.partition(":")
            check_content_refs_dict[id_check] = name
        return check_content_refs_dict

    def process_rule(self, rule):
        """Create a ``Rule`` from a single ``xccdf:Rule`` element.

        Args:
            rule: The rule element to read attributes and children from.

        Returns:
            Rule: Built from the collected keyword arguments.
        """
        rule_dict = {
            "rule_id": rule.get("id"),
            "severity": rule.get("severity", "Unknown"),
            "description": self.full_text_parser.get_full_description_of_rule(rule),
            "references": self._get_references(rule),
            "identifiers": self._get_identifiers(rule),
            "warnings": self._get_warnings(rule),
            "remediations": self._get_remediations(rule),
            "multi_check": self._get_multi_check(rule),
            "rationale": self.full_text_parser.get_full_rationale(rule),
        }

        self._add_title(rule_dict, rule)
        self._add_platforms(rule_dict, rule)
        self._add_oval_definition_id(rule_dict, rule)
        return Rule(**rule_dict)

    @staticmethod
    def _add_title(rule_dict, rule):
        """Set ``rule_dict["title"]`` when the rule has an ``xccdf:title``."""
        title = rule.find(".//xccdf:title", NAMESPACES)
        if title is not None:
            rule_dict["title"] = title.text

    @staticmethod
    def _add_platforms(rule_dict, rule):
        """Set ``rule_dict["platforms"]`` to the ``idref`` of each platform."""
        rule_dict["platforms"] = [
            platform.get("idref")
            for platform in rule.findall(".//xccdf:platform", NAMESPACES)
        ]

    def _add_oval_definition_id(self, rule_dict, rule):
        """Set ``rule_dict["oval_definition_id"]`` ("" when there is none)."""
        check_content_refs_dict = self._get_check_content_refs_dict(rule)
        rule_dict["oval_definition_id"] = check_content_refs_dict.get("oval", "")

    @staticmethod
    def _get_remediation_exit_code(message):
        """Extract the exit code from a remediation result message.

        Args:
            message: Message text; may be ``None`` or empty.

        Returns:
            int | None: The number following the
            ``"Fix execution completed and returned:"`` prefix, or ``None``
            when the message is empty or does not start with that prefix.

        Raises:
            ValueError: If the text after the prefix is not an integer.
        """
        remediation_exit_code_prefix = "Fix execution completed and returned:"
        # A message element without text yields None; treat it as "no code".
        if not message or not message.startswith(remediation_exit_code_prefix):
            return None
        exit_code = message.replace(remediation_exit_code_prefix, "")
        return int(exit_code)

    @staticmethod
    def _get_check_engine_result(message):
        """Return the ``"Checking engine returns:"`` part of *message*.

        Args:
            message: Message text; may be ``None`` or empty.

        Returns:
            str | None: The message from the prefix to its end, or ``None``
            when the prefix does not occur in the message.
        """
        check_engine_result_prefix = "Checking engine returns:"
        if not message:
            return None
        index = message.find(check_engine_result_prefix)
        # str.find() signals "not found" with -1 (truthy) and a match at the
        # start with 0 (falsy), so the index must be compared explicitly.
        return message[index:] if index != -1 else None

    @staticmethod
    def _evaluate_and_set_result(rule_id, rules, remediation_exit_code, check_engine_result):
        """Override the rule result based on the remediation outcome.

        A positive exit code takes precedence: it is evaluated last, so
        "fix failed" overwrites "fix unsuccessful" when both apply.
        """
        if check_engine_result is not None and "fail" in check_engine_result:
            rules[rule_id].result = "fix unsuccessful"

        if remediation_exit_code is not None and remediation_exit_code > 0:
            rules[rule_id].result = "fix failed"

    def _improve_result_of_remedied_rule(self, rule_id, rules):
        """Refine the result of ``rules[rule_id]`` using its messages.

        The first message yielding an exit code and the first message
        yielding a check engine result are used; a rule without messages is
        left untouched.
        """
        messages = rules[rule_id].messages
        if not messages:
            return

        remediation_exit_code = None
        check_engine_result = None
        for message in messages:
            if remediation_exit_code is None:
                remediation_exit_code = self._get_remediation_exit_code(message)
            if check_engine_result is None:
                check_engine_result = self._get_check_engine_result(message)

        self._evaluate_and_set_result(rule_id, rules, remediation_exit_code, check_engine_result)

    @staticmethod
    def _add_message_about_oval(rule_id, rules):
        """Append an explanatory message when the result mentions "fix"."""
        if "fix" in rules[rule_id].result:
            msg = "The OVAL graph of the rule as it was displayed before the fix was performed."
            rules[rule_id].messages.append(msg)

    def _insert_rules_results(self, rules):
        """Copy time, result and messages from each rule-result into *rules*.

        Args:
            rules: Mapping of rule id to rule object; updated in place.

        Raises:
            KeyError: If a rule-result references an id missing from *rules*.
        """
        for rule_result in self.test_results.findall('.//xccdf:rule-result', NAMESPACES):
            rule_id = rule_result.get('idref')
            rule = rules[rule_id]
            rule.time = rule_result.get('time')
            rule.result = rule_result.find('.//xccdf:result', NAMESPACES).text

            # findall() always returns a list, so messages is always reset.
            rule.messages = [
                message.text
                for message in rule_result.findall('.//xccdf:message', NAMESPACES)
            ]
            self._improve_result_of_remedied_rule(rule_id, rules)
            self._add_message_about_oval(rule_id, rules)

    def get_rules(self):
        """Parse all rules and their results.

        Returns:
            dict: Rule id mapped to the corresponding ``Rule`` object.
        """
        rules = {}
        for rule_element in self.root.findall(".//xccdf:Rule", NAMESPACES):
            rule = self.process_rule(rule_element)
            rules[rule.rule_id] = rule
        self._insert_rules_results(rules)
        return rules
167