Passed
Push — master ( f2d895...53ef26 )
by Matěj
02:42 queued 10s
created

graph.xml_parser.xml_parser._operator_as_child()   B

Complexity

Conditions 5

Size

Total Lines 24
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 5.0187

Importance

Changes 0
Metric Value
cc 5
eloc 21
nop 3
dl 0
loc 24
ccs 10
cts 11
cp 0.9091
crap 5.0187
rs 8.9093
c 0
b 0
f 0
1
'''
2
    Module for creating node IDs and parsing XML
3
'''
4
5 1
from lxml import etree as ET
6 1
import uuid
7 1
import graph.oval_graph
8
9 1
# Namespace prefix -> URI map handed to every find()/findall() call in this
# module, so the XPath expressions can use short prefixes.
ns = {
    'XMLSchema': 'http://oval.mitre.org/XMLSchema/oval-results-5',
    'xccdf': 'http://checklists.nist.gov/xccdf/1.2',
    'arf': 'http://scap.nist.gov/schema/asset-reporting-format/1.1',
    'oval-definitions': 'http://oval.mitre.org/XMLSchema/oval-definitions-5',
}
15
16
17 1
class xml_parser():
    '''
    Parser of an ARF report file.

    The parser loads the report, validates it against the ARF schema and
    turns the OVAL results of one rule into a tree of
    graph.oval_graph.OvalNode objects.
    '''

    def __init__(self, src):
        '''
        Parse and validate the report.

        src -- source accepted by lxml's etree.parse().

        Raises ValueError when the document does not validate against the
        ARF schema. The schema path is relative to the current working
        directory.
        '''
        self.src = src
        self.tree = ET.parse(self.src)
        self.root = self.tree.getroot()
        if not self.validate(
                './schemas/arf/1.1/asset-reporting-format_1.1.0.xsd'):
            raise ValueError("err- This is not arf report file.")

    def validate(self, xsd_path):
        '''
        Return True when the parsed document validates against the XML
        schema stored at xsd_path, False otherwise.
        '''
        xmlschema = ET.XMLSchema(ET.parse(xsd_path))
        return xmlschema.validate(self.tree)

    def get_data(self, href):
        '''
        Return the OVAL result "definitions" element of the report that is
        referenced by href ("#" followed by the report id).

        Raises ValueError when the document has no reports element or no
        report with the requested id.
        '''
        reports = self.root.find('.//arf:reports', ns)
        if reports is None:
            raise ValueError('err- report file contains no reports.')

        report_data = None
        # No early exit: when several reports share the id the last one
        # wins.
        for report in reports:
            if "#" + str(report.get("id")) == href:
                report_data = report
        if report_data is None:
            raise ValueError(
                'err- report "{}" was not found.'.format(href))

        return report_data.find(
            ('.//XMLSchema:oval_results/XMLSchema:results/'
             'XMLSchema:system/XMLSchema:definitions'), ns)

    def _rule_results(self):
        '''
        Yield (rule-result element, text of its result element) for every
        rule-result found under a TestResult element.
        '''
        rule_results = self.root.findall(
            './/xccdf:TestResult/xccdf:rule-result', ns)
        for rule_result in rule_results:
            result = rule_result.find('.//xccdf:result', ns)
            yield rule_result, result.text

    def get_used_rules(self):
        '''
        Return a list of dicts (id_rule, id_def, href, result) for every
        rule whose result is not "notselected" and which has a
        check-content-ref element.
        '''
        rules = []
        for rule_result, result in self._rule_results():
            if result == "notselected":
                continue
            check_content_ref = rule_result.find(
                './/xccdf:check/xccdf:check-content-ref', ns)
            if check_content_ref is not None:
                rules.append(dict(
                    id_rule=rule_result.get('idref'),
                    id_def=check_content_ref.attrib.get('name'),
                    href=check_content_ref.attrib.get('href'),
                    result=result
                ))
        return rules

    def get_notselected_rules(self):
        '''
        Return a list of dicts (id_rule) for every rule whose result is
        "notselected".
        '''
        return [
            dict(id_rule=rule_result.get('idref'))
            for rule_result, result in self._rule_results()
            if result == "notselected"
        ]

    def parse_data_to_dict(self, rule_id):
        '''
        Return dict(rule_id=..., definition=...) holding the definition
        tree checked by rule_id, or None when no parsed definition has the
        id the rule refers to.

        Definitions are read from the report referenced by the first used
        rule. Raises ValueError when the report has no used rules, when
        the rule was not selected or when the rule is unknown.
        '''
        used_rules = self.get_used_rules()
        if not used_rules:
            raise ValueError('err- report contains no evaluated rules.')

        scan = dict(definitions=[
            self.build_graph(definition)
            for definition in self.get_data(used_rules[0]['href'])
        ])
        self.insert_comments(scan)
        definitions = self._fill_extend_definition(scan)

        # Resolved once; the lookup scans the whole document twice.
        definition_id = self.get_def_id_by_rule_id(rule_id)
        for definition in definitions['definitions']:
            if definition['id'] == definition_id:
                return dict(rule_id=rule_id, definition=definition)
        return None

    def _xml_dict_to_node(self, dict_of_definition):
        '''
        Convert a definition dict, or an operator dict nested in it, to an
        OvalNode tree.

        A dict with an 'id' key is a definition wrapper: its first child
        is returned with node_id set to the definition id. Any other dict
        becomes an operator node with a random UUID as its id.

        Raises ValueError when a child is None, i.e. an extended
        definition could not be resolved.
        '''
        children = []
        for child in dict_of_definition['node']:
            if child is None:
                raise ValueError('err- extended definition was not found.')
            if 'operator' in child:
                children.append(self._xml_dict_to_node(child))
            else:
                children.append(
                    graph.oval_graph.OvalNode(
                        child['value_id'],
                        'value',
                        child['value'],
                        child['negate']
                    ))

        if 'id' in dict_of_definition:
            children[0].node_id = dict_of_definition['id']
            return children[0]
        return graph.oval_graph.OvalNode(
            str(uuid.uuid4()),
            'operator',
            dict_of_definition['operator'],
            dict_of_definition['negate'],
            children
        )

    def get_def_id_by_rule_id(self, rule_id):
        '''
        Return the id of the definition referenced by rule_id.

        Raises ValueError when the rule was not selected or is not present
        in the report.
        '''
        used_rules = self.get_used_rules()
        for rule in self.get_notselected_rules():
            if rule['id_rule'] == rule_id:
                raise ValueError(
                    'err- rule "{}" was not selected, so there are no results.'
                    .format(rule_id))
        for rule in used_rules:
            if rule['id_rule'] == rule_id:
                return rule['id_def']
        raise ValueError('err- 404 rule not found!')

    def get_rule_dict(self, rule_id):
        '''Alias of parse_data_to_dict().'''
        return self.parse_data_to_dict(rule_id)

    def xml_dict_of_rule_to_node(self, rule):
        '''
        Wrap the definition tree of rule (a dict produced by
        parse_data_to_dict()) in a non-negated "and" operator node whose
        id is the rule id.
        '''
        return graph.oval_graph.OvalNode(
            rule['rule_id'],
            'operator',
            'and',
            False,
            [self._xml_dict_to_node(rule['definition'])]
        )

    def get_oval_graph(self, rule_id=None):
        '''Return the OvalNode tree of the rule with id rule_id.'''
        return self.xml_dict_of_rule_to_node(self.parse_data_to_dict(rule_id))

    def _element_children(self, parent):
        '''
        Return the children of parent that are real elements. Comments and
        processing instructions (their tag is not a string) are skipped.
        '''
        return [child for child in parent if isinstance(child.tag, str)]

    def _local_name(self, element):
        '''Return the tag of element without its "{namespace}" prefix.'''
        return element.tag.rsplit('}', 1)[-1]

    def _negate_status(self, element):
        '''
        Return the boolean value of the negate attribute of element, or
        False when the attribute is missing.
        '''
        negate = element.get('negate')
        if negate is None:
            return False
        return self._str_to_bool(negate)

    def build_graph(self, tree_data):
        '''
        Convert one result definition element to a dict with the keys id,
        node (one dict per child element) and negate (negate status of the
        last child; the key is absent when there are no children).
        '''
        definition = dict(
            id=tree_data.get('definition_id'),
            node=[]
        )
        for tree in self._element_children(tree_data):
            # The attribute has to be read with get(); "in" on an element
            # tests child membership and never matched.
            definition['negate'] = self._negate_status(tree)
            definition['node'].append(self._build_node(tree))
        return definition

    def _str_to_bool(self, s):
        '''
        Convert 'true' / 'false' to a bool.

        Raises ValueError for any other value.
        '''
        if s == 'true':
            return True
        elif s == 'false':
            return False
        else:
            raise ValueError('err- negation is not bool')

    def _build_node(self, tree):
        '''
        Recursively convert a result element to an operator dict
        (operator, negate, result, comment, node).

        A child with an operator attribute becomes a nested operator dict,
        a child with a definition_ref attribute an extend_definition dict
        and any other child a value dict. Comments are set to None here.
        '''
        node = dict(
            operator=tree.get('operator'),
            negate=self._negate_status(tree),
            result=tree.get('result'),
            comment=None,
            node=[]
        )
        for child in self._element_children(tree):
            if child.get('operator') is not None:
                node['node'].append(self._build_node(child))
                continue

            negate_status = self._negate_status(child)
            if child.get('definition_ref') is not None:
                node['node'].append(
                    dict(
                        extend_definition=child.get('definition_ref'),
                        result=child.get('result'),
                        negate=negate_status,
                        comment=None
                    ))
            else:
                node['node'].append(
                    dict(
                        value_id=child.get('test_ref'),
                        value=child.get('result'),
                        negate=negate_status,
                        comment=None
                    ))
        return node

    def _fill_extend_definition(self, scan):
        '''
        Return a copy of scan in which every extend_definition entry is
        replaced by the tree of the definition it refers to.
        '''
        out = dict(definitions=[])
        for definition in scan['definitions']:
            nodes = [
                self._operator_as_child(value, scan)
                for value in definition['node']
            ]
            out['definitions'].append(dict(id=definition['id'], node=nodes))
        return out

    def _operator_as_child(self, value, scan):
        '''
        Return a copy of the operator dict value with its
        extend_definition children resolved against scan.

        Value children are reused as they are. Raises ValueError for a
        child of unknown kind.
        '''
        out = dict(
            operator=value['operator'],
            negate=value['negate'],
            result=value['result'],
            comment=value['comment'],
            node=[]
        )
        for child in value['node']:
            if 'operator' in child:
                out['node'].append(self._operator_as_child(child, scan))
            elif 'extend_definition' in child:
                out['node'].append(
                    self._find_definition_by_id(
                        scan,
                        child['extend_definition'],
                        child['negate'],
                        child['comment']
                    ))
            elif 'value_id' in child:
                out['node'].append(child)
            else:
                raise ValueError('error - unknown child')
        return out

    def _find_definition_by_id(self, scan, id, negate_status, comment):
        '''
        Return the resolved tree of the definition with the given id, with
        negate and comment taken from the referencing extend_definition.

        Returns None when scan has no such definition, so an unresolved
        reference only breaks the rules that actually use it.
        '''
        for definition in scan['definitions']:
            if definition['id'] == id:
                # Work on a copy: writing into scan would change the
                # referenced definition for every later consumer.
                node = dict(definition['node'][0])
                node['negate'] = negate_status
                node['comment'] = comment
                return self._operator_as_child(node, scan)
        return None

    def create_dict_form_criteria(self, criteria):
        '''
        Recursively convert a criteria element to a dict (operator,
        comment, node) carrying the comments of the criteria and of its
        children. A missing operator attribute is reported as 'AND'.
        '''
        operator = criteria.get('operator')
        comments = dict(
            operator='AND' if operator is None else operator,
            comment=criteria.get('comment'),
            node=[])
        for criterion in self._element_children(criteria):
            # The operator attribute may be omitted on a nested criteria,
            # so the element name is checked as well.
            is_criteria = (
                self._local_name(criterion) == 'criteria'
                or criterion.get('operator') is not None)
            if is_criteria:
                comments['node'].append(
                    self.create_dict_form_criteria(criterion))
            elif criterion.get('definition_ref') is not None:
                comments['node'].append(
                    dict(
                        extend_definition=criterion.get('definition_ref'),
                        comment=criterion.get('comment')
                    ))
            else:
                comments['node'].append(
                    dict(
                        value_id=criterion.get('test_ref'),
                        comment=criterion.get('comment')
                    ))
        return comments

    def prepare_definition_comments(self, oval_definitions):
        '''
        Return a list of dicts (id, node) with the comment tree of every
        definition element in oval_definitions. Definitions without a
        criteria element carry no comments and are skipped.
        '''
        definitions = []
        for definition in oval_definitions:
            criteria = definition.find('.//oval-definitions:criteria', ns)
            if criteria is None:
                continue
            definitions.append(dict(
                id=definition.get('id'),
                node=[self.create_dict_form_criteria(criteria)]
            ))
        return definitions

    def is_definition_in_array(self, definition_, array):
        '''Return True when array holds a definition with the same id.'''
        return any(
            definition_['id'] == definition['id'] for definition in array)

    def recursive_help_fill_comments(self, comments, nodes):
        '''
        Copy the comments of the comment tree to the data tree nodes, in
        place. Children are paired by position.

        Returns a one-item list holding nodes.
        '''
        nodes['comment'] = comments['comment']
        if 'node' in comments:
            for node, comment in zip(nodes['node'], comments['node']):
                node['comment'] = comment['comment']
                if 'operator' in node:
                    self.recursive_help_fill_comments(comment, node)
        return [nodes]

    def fill_comment(self, comment_definition, data_definition):
        '''
        Fill the comments of comment_definition into the first node of
        data_definition and return that node in a one-item list.
        '''
        return self.recursive_help_fill_comments(
            comment_definition['node'][0], data_definition['node'][0])

    def insert_comments(self, data):
        '''
        Add the comments found in the OVAL definitions of the document to
        the matching definitions in data['definitions'], in place.

        When a definition id occurs more than once in the document, its
        first occurrence is used.
        '''
        oval_def = self.root.findall('.//oval-definitions:definition', ns)
        comments_by_id = {}
        for comment_definition in self.prepare_definition_comments(oval_def):
            comments_by_id.setdefault(
                comment_definition['id'], comment_definition)

        for data_definition in data['definitions']:
            comment_definition = comments_by_id.get(data_definition['id'])
            if comment_definition is not None:
                data_definition['node'] = self.fill_comment(
                    comment_definition, data_definition)
310