Passed
Pull Request — master (#14)
by Jan
03:36
created

oscap_report.scap_results_parser.data_structures   B

Complexity

Total Complexity 45

Size/Duplication

Total Lines 286
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 247
dl 0
loc 286
rs 8.8
c 0
b 0
f 0
wmc 45

13 Methods

Rating   Name   Duplication   Size   Complexity  
B Report.get_severity_of_failed_rules_stats() 0 15 6
A Report.as_dict() 0 18 1
B Report.get_rule_results_stats() 0 21 5
A OvalObject.as_dict() 0 6 1
A OvalObject.get_header_from_object_data() 0 7 4
A OvalObject.filtr_object_data_item() 0 8 4
B Remediation.get_type() 0 12 6
A OvalTest.as_dict() 0 6 1
A OvalNode.log_oval_tree() 0 10 4
A Rule.as_dict() 0 18 1
C OvalObject.filter_permissions() 0 35 9
A OvalNode.as_dict() 0 21 2
A Remediation.as_dict() 0 8 1

How to fix   Complexity   

Complexity

Complex classes like oscap_report.scap_results_parser.data_structures often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import logging
2
from dataclasses import dataclass
3
4
5
@dataclass
class Report:  # pylint: disable=R0902
    """Metadata of a scan report together with the rules it contains.

    ``rules`` is used as a mapping: the statistics methods only read its
    ``values()``, and every value must expose the string attributes
    ``result`` and ``severity``.
    """

    title: str = ""
    identity: str = ""
    profile_name: str = ""
    target: str = ""
    cpe_platforms: str = ""
    scanner: str = ""
    scanner_version: str = ""
    benchmark_url: str = ""
    benchmark_id: str = ""
    benchmark_version: str = ""
    start_time: str = ""
    end_time: str = ""
    test_system: str = ""
    score: float = 0.0
    score_max: float = 0.0
    rules: dict = None

    def as_dict(self):
        """Return all fields of the report as a plain dictionary."""
        return {
            "title": self.title,
            "profile_name": self.profile_name,
            "target": self.target,
            "identity": self.identity,
            "cpe_platforms": self.cpe_platforms,
            "scanner": self.scanner,
            "scanner_version": self.scanner_version,
            "benchmark_url": self.benchmark_url,
            "benchmark_id": self.benchmark_id,
            "benchmark_version": self.benchmark_version,
            "start_time": self.start_time,
            "end_time": self.end_time,
            "test_system": self.test_system,
            "score": self.score,
            "score_max": self.score_max,
            "rules": self.rules,
        }

    def get_rule_results_stats(self):
        """Count rules by result and express the counts as percentages.

        Results are compared case-insensitively. Rules whose result is
        ``notselected`` or ``notapplicable`` are left out of ``other`` and
        of the percentage base.

        Returns:
            dict: ``fail``, ``pass`` and ``unknown_error`` (results ``error``
            or ``unknown``) counts; ``other`` (not ignored and neither failed
            nor passed); and ``fail_percent``, ``pass_percent``,
            ``other_percent``. Every value is zero when ``rules`` is unset,
            empty, or contains only ignored rules.
        """
        # Collect the normalised results once instead of rescanning the rules
        # for every statistic.
        results = [rule.result.lower() for rule in (self.rules or {}).values()]
        results_stats = {
            "fail": results.count("fail"),
            "pass": results.count("pass"),
            "unknown_error": results.count("error") + results.count("unknown"),
        }
        not_ignored_rules = sum(
            result not in ("notselected", "notapplicable") for result in results)
        # Without any evaluated rule there is nothing to divide by; report 0 %.
        percent_per_rule = 100 / not_ignored_rules if not_ignored_rules else 0.0
        results_stats["other"] = (
            not_ignored_rules - results_stats["fail"] - results_stats["pass"])
        results_stats["fail_percent"] = results_stats["fail"] * percent_per_rule
        results_stats["pass_percent"] = results_stats["pass"] * percent_per_rule
        results_stats["other_percent"] = results_stats["other"] * percent_per_rule
        return results_stats

    def get_severity_of_failed_rules_stats(self):
        """Count failed rules by severity and express the counts as percentages.

        Only rules whose result is ``fail`` (case-insensitive) are considered;
        severities are compared case-insensitively as well.

        Returns:
            dict: counts under ``low``, ``medium``, ``high`` and ``unknown``
            followed by the matching ``*_percent`` shares of all failed rules.
            Every value is zero when no rule failed.
        """
        severities = ("low", "medium", "high", "unknown")
        failed_severities = [
            rule.severity.lower()
            for rule in (self.rules or {}).values()
            if rule.result.lower() == "fail"
        ]
        # No failed rule means no base for the percentages; report 0 %.
        percent_per_rule = 100 / len(failed_severities) if failed_severities else 0.0
        severity_stats = {
            severity: failed_severities.count(severity) for severity in severities
        }
        for severity in severities:
            severity_stats[severity + "_percent"] = (
                severity_stats[severity] * percent_per_rule)
        return severity_stats
81
82
83
@dataclass
class OvalObject():
    """An OVAL object and the data items collected for it.

    NOTE(review): ``object_data`` is annotated as ``dict`` but
    ``get_header_from_object_data`` iterates it and treats every element as a
    mapping, which looks like a sequence of dicts -- confirm with the parser.
    """

    object_id: str = ""
    flag: str = ""
    object_type: str = ""
    object_data: dict = None

    def as_dict(self):
        """Return the fields of the object as a plain dictionary."""
        return {
            "object_id": self.object_id,
            "flag": self.flag,
            "object_type": self.object_type,
            "object_data": self.object_data,
        }

    def get_header_from_object_data(self):
        """Return the distinct keys of all filtered data items.

        Every element of ``object_data`` is passed through
        ``filtr_object_data_item``; the keys of the results are collected in
        order of first appearance. An unset or empty ``object_data`` yields
        an empty list.
        """
        out = []
        for item in self.object_data or ():
            for key in self.filtr_object_data_item(item):
                if key not in out:
                    out.append(key)
        return out

    def filtr_object_data_item(self, item):
        """Reduce one data item to the entries that should be presented.

        For ``textfilecontent54_object`` objects that carry both ``filepath``
        and ``text``, only those two values are returned (``text`` under the
        key ``content``). Every other item goes through
        ``filter_permissions``.
        """
        if self.object_type == "textfilecontent54_object":
            if "filepath" in item and "text" in item:
                return {
                    "filepath": item["filepath"],
                    "content": item["text"]
                }
        return self.filter_permissions(item)

    @staticmethod
    def filter_permissions(item):
        """Collapse the nine permission flags of *item* into one entry.

        The keys ``uread`` ... ``oexec`` are removed from the result. When all
        nine are present and not ``None`` a ``permission`` entry is added,
        e.g. ``<code>rw-r--r--</code>``; otherwise no ``permission`` entry is
        produced. All remaining entries of *item* are copied unchanged.

        NOTE(review): flags are tested by truthiness, so a string such as
        ``"false"`` would count as granted -- confirm the caller passes
        booleans.
        """
        permission = dict.fromkeys((
            "uread", "uwrite", "uexec",
            "gread", "gwrite", "gexec",
            "oread", "owrite", "oexec",
        ))
        out = {}
        for key, value in item.items():
            if key in permission:
                permission[key] = value
            else:
                out[key] = value
        if None in permission.values():
            return out
        symbols = {"read": "r", "write": "w", "exec": "x"}
        # A denied flag must still occupy its position, otherwise the string
        # loses its fixed nine-character layout.
        flags = [
            symbols[key[1:]] if value else "-"
            for key, value in permission.items()
        ]
        out["permission"] = "<code>" + "".join(flags) + "</code>"
        return out
150
151
152
@dataclass
class OvalTest():
    """Description of an OVAL test and the object it refers to."""

    test_id: str = ""
    test_type: str = ""
    comment: str = ""
    oval_object: OvalObject = None

    def as_dict(self):
        """Return the fields of the test as a plain dictionary.

        ``oval_object`` is stored as-is; it is not converted to a dictionary.
        """
        return {
            "test_id": self.test_id,
            "test_type": self.test_type,
            "comment": self.comment,
            "oval_object": self.oval_object,
        }
166
167
168
@dataclass
class OvalNode:  # pylint: disable=R0902
    """One node of an OVAL tree; ``children`` holds its child nodes."""

    node_id: str
    node_type: str
    value: str
    negation: bool = False
    comment: str = ""
    tag: str = ""
    test_result_details: dict = None
    children: list = None
    test_info: OvalTest = None

    def as_dict(self):
        """Return the node and, recursively, its children as dictionaries.

        The ``child`` entry is ``None`` when the node has no children (unset
        or empty ``children``), otherwise a list with one dictionary per
        child. ``test_info`` is not part of the result.
        """
        child = None
        if self.children:
            child = [node.as_dict() for node in self.children]
        return {
            'node_id': self.node_id,
            'type': self.node_type,
            'value': self.value,
            'negation': self.negation,
            'comment': self.comment,
            'tag': self.tag,
            'test_result_details': self.test_result_details,
            'child': child
        }

    def log_oval_tree(self, level=0):
        """Log this node and all its descendants with ``logging.info``.

        Each line is indented by two spaces per *level*. Nodes of type
        ``value`` are labelled with their ``node_id``, all others with their
        ``node_type``.
        """
        label = self.node_id if self.node_type == "value" else self.node_type
        logging.info("%s%s = %s", "  " * level, label, self.value)
        for child in self.children or ():
            child.log_oval_tree(level + 1)
213
214
215
@dataclass
class Rule:  # pylint: disable=R0902
    """A single rule with its result, descriptive texts and attached data."""

    rule_id: str = ""
    title: str = ""
    result: str = ""
    multi_check: bool = False
    time: str = ""
    severity: str = ""
    identifiers: list = None
    references: list = None
    description: str = ""
    rationale: str = ""
    warnings: list = None
    platform: str = ""
    oval_definition_id: str = ""
    message: str = ""
    remediations: list = None
    oval_tree: OvalNode = None

    def as_dict(self):
        """Return all fields of the rule as a plain dictionary.

        ``remediations`` and ``oval_tree`` are stored as-is; they are not
        converted to dictionaries.
        """
        return {
            "rule_id": self.rule_id,
            "title": self.title,
            "result": self.result,
            "multi_check": self.multi_check,
            "time": self.time,
            "severity": self.severity,
            "identifiers": self.identifiers,
            "references": self.references,
            "description": self.description,
            "rationale": self.rationale,
            "warnings": self.warnings,
            "platform": self.platform,
            "oval_definition_id": self.oval_definition_id,
            "message": self.message,
            "remediations": self.remediations,
            "oval_tree": self.oval_tree,
        }
253
254
255
@dataclass
class Remediation:
    """A remediation (fix) together with its attributes."""

    remediation_id: str = ""
    system: str = ""
    complexity: str = ""
    disruption: str = ""
    strategy: str = ""
    fix: str = ""

    # Display names of the known ``system`` identifiers. The name carries no
    # annotation, so the dataclass machinery does not turn it into a field.
    _TYPE_NAMES = {
        "urn:xccdf:fix:script:sh": "Shell script",
        "urn:xccdf:fix:script:ansible": "Ansible snippet",
        "urn:xccdf:fix:script:puppet": "Puppet snippet",
        "urn:redhat:anaconda:pre": "Anaconda snippet",
        "urn:xccdf:fix:script:kubernetes": "Kubernetes snippet",
    }

    def as_dict(self):
        """Return the fields of the remediation as a plain dictionary."""
        return {
            "remediation_id": self.remediation_id,
            "system": self.system,
            "complexity": self.complexity,
            "disruption": self.disruption,
            "strategy": self.strategy,
            "fix": self.fix,
        }

    def get_type(self):
        """Return a display name for ``system``.

        Known identifiers map to a fixed name such as ``Shell script``; any
        other value of ``system`` is returned unchanged.
        """
        return self._TYPE_NAMES.get(self.system, self.system)
286