Passed
Pull Request — master (#91)
by Jan
02:10
created

oval_graph.xml_parser.XmlParser._get_object_info()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 5
nop 2
dl 0
loc 5
ccs 4
cts 4
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
'''
2
    Module for creating node IDs and parsing XML
3
'''
4 1
import uuid
5 1
import os
6 1
import sys
7
8 1
from lxml import etree as ET
9
10 1
from .oval_node import OvalNode
11
12 1
# Prefix -> namespace URI map passed as the namespace argument to every
# find()/findall() call in this module.
ns = {
13
    'XMLSchema': 'http://oval.mitre.org/XMLSchema/oval-results-5',
14
    'xccdf': 'http://checklists.nist.gov/xccdf/1.2',
15
    'arf': 'http://scap.nist.gov/schema/asset-reporting-format/1.1',
16
    'oval-definitions': 'http://oval.mitre.org/XMLSchema/oval-definitions-5',
17
    'scap': 'http://scap.nist.gov/schema/scap/source/1.2',
18
    'oval-characteristics': 'http://oval.mitre.org/XMLSchema/oval-system-characteristics-5',
19
}
20
21
22 1
class XmlParser():
23 1
    def __init__(self, src):
        """Parse the report at ``src`` and pre-compute all lookup tables.

        The document is validated against the bundled ARF 1.1 schema; a
        failed validation only prints a warning to stderr and parsing is
        still attempted.

        Args:
            src: source handed unchanged to ``ET.parse``.

        Raises:
            ValueError: if any report section cannot be extracted. The
                original exception is attached as ``__cause__``.
        """
        self.src = src
        self.tree = ET.parse(self.src)
        self.root = self.tree.getroot()
        if not self.validate(
                'schemas/arf/1.1/asset-reporting-format_1.1.0.xsd'):
            # ANSI escape sequences: bright red on, then reset.
            CRED = '\033[91m'
            CEND = '\033[0m'
            print(
                CRED +
                "Warning: This file is not valid arf report." +
                CEND,
                file=sys.stderr)
        try:
            # Order matters: later extractors read attributes set by
            # earlier ones (e.g. report_data is needed by _get_scan).
            self.used_rules = self._get_used_rules()
            self.report_data = self._get_report_data(
                self.used_rules[0]['href'])
            self.notselected_rules = self._get_notselected_rules()
            self.scan_definitions = self._get_scan()
            self.oval_definitions = self._get_oval_definitions()
            self.tests = self._get_tests()
            self.objects = self._get_objects()
            self.collected_objects = self._get_collected_objects()
            self.system_data = self._get_system_data()
            self.tests_info = self._get_tests_info()
        except Exception as error:
            # Catch Exception rather than BaseException so that
            # KeyboardInterrupt / SystemExit are not turned into ValueError,
            # and chain the cause so the real failure stays visible.
            raise ValueError("err- This is not arf report file.") from error
50
51 1
    def get_src(self, src):
52 1
        _dir = os.path.dirname(os.path.realpath(__file__))
53 1
        FIXTURE_DIR = os.path.join(_dir, src)
54 1
        return str(FIXTURE_DIR)
55
56 1
    def validate(self, xsd_path):
        """Validate ``self.tree`` against the XSD found at ``xsd_path``.

        ``xsd_path`` is resolved through ``get_src`` (relative to this
        module). Returns whatever ``XMLSchema.validate`` returns.
        """
        schema_file = self.get_src(xsd_path)
        schema = ET.XMLSchema(ET.parse(schema_file))
        return schema.validate(self.tree)
65
66 1
    def _get_report_data(self, href):
        """Return the child of ``arf:reports`` whose ``'#' + id`` equals ``href``.

        All children are scanned, so the last match wins. Returns ``None``
        when nothing matches.
        """
        matched = None
        for candidate in self.root.find('.//arf:reports', ns):
            if href == "#" + str(candidate.get("id")):
                matched = candidate
        return matched
73
74 1
    def _get_definitions(self):
        """Return the results ``definitions`` element of the selected report."""
        xpath = ('.//XMLSchema:oval_results/XMLSchema:results/'
                 'XMLSchema:system/XMLSchema:definitions')
        return self.report_data.find(xpath, ns)
79
80 1
    def _get_oval_definitions(self):
        """Return the ``oval_definitions`` element nested in the OVAL results."""
        xpath = './/XMLSchema:oval_results/oval-definitions:oval_definitions'
        return self.report_data.find(xpath, ns)
84
85 1
    def _get_collected_objects(self):
        """Map the ``id`` attribute of each collected object to its element."""
        xpath = (
            './/XMLSchema:oval_results/XMLSchema:results/'
            'XMLSchema:system/oval-characteristics:oval_system_characteristics'
            '/oval-characteristics:collected_objects')
        section = self.report_data.find(xpath, ns)
        return {entry.attrib.get('id'): entry for entry in section}
94
95 1
    def _get_system_data(self):
        """Map the ``id`` attribute of each ``system_data`` child to its element."""
        xpath = (
            './/XMLSchema:oval_results/XMLSchema:results/'
            'XMLSchema:system/oval-characteristics:oval_system_characteristics'
            '/oval-characteristics:system_data')
        section = self.report_data.find(xpath, ns)
        return {entry.attrib.get('id'): entry for entry in section}
104
105 1
    def _get_tests(self):
        """Return the ``tests`` element found under ``self.oval_definitions``."""
        return self.oval_definitions.find('.//oval-definitions:tests', ns)
109
110 1
    def _get_objects(self):
        """Map the ``id`` attribute of each OVAL object definition to its element."""
        section = self.oval_definitions.find('.//oval-definitions:objects', ns)
        return {entry.attrib.get('id'): entry for entry in section}
117
118 1
    def _get_used_rules(self):
        """Collect every rule result that was selected and has a check reference.

        Returns a list of dicts with keys ``id_rule``, ``id_def``, ``href``
        and ``result``. Rules whose result text is ``"notselected"`` or that
        have no ``check-content-ref`` are skipped.
        """
        selected = []
        rule_results = self.root.findall(
            './/xccdf:TestResult/xccdf:rule-result', ns)
        for rule_result in rule_results:
            outcome = rule_result.find('.//xccdf:result', ns)
            if outcome.text == "notselected":
                continue
            content_ref = rule_result.find(
                './/xccdf:check/xccdf:check-content-ref', ns)
            if content_ref is None:
                continue
            selected.append({
                'id_rule': rule_result.get('idref'),
                'id_def': content_ref.attrib.get('name'),
                'href': content_ref.attrib.get('href'),
                'result': outcome.text,
            })
        return selected
135
136 1
    def _get_notselected_rules(self):
        """Return ``[{'id_rule': idref}, ...]`` for every rule with result ``notselected``."""
        rule_results = self.root.findall(
            './/xccdf:TestResult/xccdf:rule-result', ns)
        return [
            {'id_rule': rule_result.get('idref')}
            for rule_result in rule_results
            if rule_result.find('.//xccdf:result', ns).text == "notselected"
        ]
145
146 1
    def _get_scan(self):
        """Build the dict tree of all result definitions.

        Each definition is converted with ``build_graph``, comments are
        merged in place by ``insert_comments``, and the extend-definition
        references are then resolved by ``_fill_extend_definition``.
        """
        graphs = [
            self.build_graph(definition)
            for definition in self._get_definitions()
        ]
        scan = {'definitions': graphs}
        self.insert_comments(scan)
        return self._fill_extend_definition(scan)
152
153 1
    def parse_data_to_dict(self, rule_id):
154 1
        for definition in self.scan_definitions['definitions']:
155 1
            if self.get_def_id_by_rule_id(rule_id) == definition['id']:
156 1
                return dict(rule_id=rule_id, definition=definition)
157
158 1
    def _xml_dict_to_node(self, dict_of_definition):
        """Recursively convert a definition/criteria dict into ``OvalNode`` objects.

        Children carrying an ``'operator'`` key are converted recursively;
        all other children are treated as test leaves and built from their
        ``value_id``/``value``/``negate``/``comment``/``tag`` keys.

        When ``dict_of_definition`` has an ``'id'`` key (the definition
        level), its first child is returned with ``node_id`` overwritten by
        that id; otherwise a new operator node with a random UUID id wraps
        the children.
        """
        children = []
        for child in dict_of_definition['node']:
            # The original condition was "'operator' in child and 'id'";
            # the bare string was always truthy, so only the membership
            # test ever mattered.
            if 'operator' in child:
                children.append(self._xml_dict_to_node(child))
            else:
                children.append(
                    OvalNode(
                        child['value_id'],
                        'value',
                        child['value'],
                        child['negate'],
                        child['comment'],
                        child['tag'],
                        self.get_info_about_test(child['value_id']),
                        None
                    ))

        if 'id' in dict_of_definition:
            # NOTE(review): only children[0] is used here; any further
            # children of a definition are dropped, and an empty list
            # raises IndexError.
            children[0].node_id = dict_of_definition['id']
            return children[0]
        else:
            return OvalNode(
                str(uuid.uuid4()),
                'operator',
                dict_of_definition['operator'],
                dict_of_definition['negate'],
                dict_of_definition['comment'],
                dict_of_definition['tag'],
                None,
                children,
            )
190
191 1
    def get_def_id_by_rule_id(self, rule_id):
192 1
        for rule in self.notselected_rules:
193 1
            if rule['id_rule'] == rule_id:
194 1
                raise ValueError(
195
                    'err- rule "{}" was not selected, so there are no results.'
196
                    .format(rule_id))
197 1
        for rule in self.used_rules:
198 1
            if rule['id_rule'] == rule_id:
199 1
                return rule['id_def']
200 1
        raise ValueError('err- 404 rule not found!')
201
202 1
    def get_rule_dict(self, rule_id):
203 1
        return self.parse_data_to_dict(rule_id)
204
205 1
    def xml_dict_of_rule_to_node(self, rule):
        """Wrap the definition tree of ``rule`` in a top-level ``and`` node.

        ``rule`` is a dict with ``'rule_id'`` and ``'definition'`` keys, as
        produced by ``parse_data_to_dict``. The root node uses the rule id
        as its id, the definition comment as its comment and the tag
        ``"Rule"``.
        """
        definition = rule['definition']
        node_id = rule['rule_id']
        comment = definition['comment']
        children = [self._xml_dict_to_node(definition)]
        return OvalNode(
            node_id, 'operator', 'and', False, comment, "Rule", None, children)
217
218 1
    def get_oval_tree(self, rule_id=None):
        """Return the ``OvalNode`` tree for ``rule_id``."""
        rule_dict = self.parse_data_to_dict(rule_id)
        return self.xml_dict_of_rule_to_node(rule_dict)
220
221 1
    def build_graph(self, tree_data):
222 1
        graph = dict(
223
            id=tree_data.get('definition_id'),
224
            node=[],
225
        )
226 1
        for tree in tree_data:
227 1
            negate_status = False
228 1
            if 'negate' in tree:
229
                negate_status = self._str_to_bool(tree.get('negate'))
230 1
            graph['negate'] = negate_status
231 1
            graph['node'].append(self._build_node(tree, "Definition"))
232 1
        return graph
233
234 1
    def _str_to_bool(self, s):
235 1
        if s == 'true':
236 1
            return True
237 1
        elif s == 'false':
238 1
            return False
239
        else:
240 1
            raise ValueError('err- negation is not bool')
241
242 1
    def _build_node(self, tree, tag):
243 1
        negate_status = False
244 1
        if tree.get('negate') is not None:
245 1
            negate_status = self._str_to_bool(tree.get('negate'))
246
247 1
        node = dict(
248
            operator=tree.get('operator'),
249
            negate=negate_status,
250
            result=tree.get('result'),
251
            comment=None,
252
            tag=tag,
253
            node=[],
254
        )
255 1
        for child in tree:
256 1
            if child.get('operator') is not None:
257 1
                node['node'].append(self._build_node(child, "Criteria"))
258
            else:
259 1
                negate_status = False
260 1
                if child.get('negate') is not None:
261 1
                    negate_status = self._str_to_bool(child.get('negate'))
262
263 1
                if child.get('definition_ref') is not None:
264 1
                    node['node'].append(
265
                        dict(
266
                            extend_definition=child.get('definition_ref'),
267
                            result=child.get('result'),
268
                            negate=negate_status,
269
                            comment=None,
270
                            tag="Extend definition",
271
                        ))
272
                else:
273 1
                    node['node'].append(
274
                        dict(
275
                            value_id=child.get('test_ref'),
276
                            value=child.get('result'),
277
                            negate=negate_status,
278
                            comment=None,
279
                            tag="Test",
280
                        ))
281 1
        return node
282
283 1
    def _fill_extend_definition(self, scan):
284 1
        out = dict(definitions=[])
285 1
        for definition in scan['definitions']:
286 1
            nodes = []
287 1
            for value in definition['node']:
288 1
                nodes.append(self._operator_as_child(value, scan))
289 1
            out['definitions'].append(
290
                dict(
291
                    id=definition['id'],
292
                    comment=definition['comment'],
293
                    node=nodes,
294
                ))
295 1
        return out
296
297 1
    def _operator_as_child(self, value, scan):
298 1
        out = dict(
299
            operator=value['operator'],
300
            negate=value['negate'],
301
            result=value['result'],
302
            comment=value['comment'],
303
            tag=value['tag'],
304
            node=[],
305
        )
306 1
        for child in value['node']:
307 1
            if 'operator' in child:
308 1
                out['node'].append(self._operator_as_child(child, scan))
309 1
            elif 'extend_definition' in child:
310 1
                out['node'].append(
311
                    self._find_definition_by_id(
312
                        scan,
313
                        child['extend_definition'],
314
                        child['negate'],
315
                        child['comment'],
316
                        child['tag'],
317
                    ))
318 1
            elif 'value_id' in child:
319 1
                out['node'].append(child)
320
            else:
321
                raise ValueError('error - unknown child')
322 1
        return out
323
324 1
    def _find_definition_by_id(self, scan, id, negate_status, comment, tag):
325 1
        for definition in scan['definitions']:
326 1
            if definition['id'] == id:
327 1
                definition['node'][0]['negate'] = negate_status
328 1
                definition['node'][0]['comment'] = comment
329 1
                definition['node'][0]['tag'] = tag
330 1
                return self._operator_as_child(definition['node'][0], scan)
331
332 1
    def create_dict_form_criteria(self, criteria, description):
333 1
        comments = dict(
334
            operator='AND' if criteria.get('operator') is None else criteria.get('operator'),
335
            comment=description if criteria.get('comment') is None else criteria.get('comment'),
336
            node=[],
337
        )
338 1
        for criterion in criteria:
339 1
            if criterion.get('operator'):
340 1
                comments['node'].append(
341
                    self.create_dict_form_criteria(criterion, None))
342
            else:
343 1
                if criterion.get('definition_ref'):
344 1
                    comments['node'].append(
345
                        dict(
346
                            extend_definition=criterion.get('definition_ref'),
347
                            comment=criterion.get('comment'),
348
                        ))
349
                else:
350 1
                    comments['node'].append(
351
                        dict(
352
                            value_id=criterion.get('test_ref'),
353
                            comment=criterion.get('comment'),
354
                        ))
355 1
        return comments
356
357 1
    def _prepare_definition_comments(self):
        """Extract titles and criteria comments from the source OVAL definitions.

        Returns a list of ``{'id', 'comment', 'node'}`` dicts where
        ``comment`` is the definition title and ``node`` holds the single
        comment tree built by ``create_dict_form_criteria`` (with the
        definition description as fallback comment).
        """
        oval_definitions = self.root.find(
            './/arf:report-requests/arf:report-request/'
            'arf:content/scap:data-stream-collection/'
            'scap:component/oval-definitions:oval_definitions/'
            'oval-definitions:definitions', ns)
        prepared = []
        for definition in oval_definitions:
            definition_id = definition.get('id')
            title = definition.find(
                './/oval-definitions:metadata/oval-definitions:title', ns)
            description = definition.find(
                './/oval-definitions:metadata/oval-definitions:description', ns)
            title_text = title.text
            criteria = definition.find('.//oval-definitions:criteria', ns)
            comment_tree = self.create_dict_form_criteria(
                criteria, description.text)
            prepared.append({
                'id': definition_id,
                'comment': title_text,
                'node': [comment_tree],
            })
        return prepared
377
378 1
    def recursive_help_fill_comments(self, comments, nodes):
379 1
        out = nodes
380 1
        out['comment'] = comments['comment']
381 1
        for node, comment in zip(out['node'], comments['node']):
382 1
            node['comment'] = comment['comment']
383 1
            if 'operator' in node:
384 1
                self.recursive_help_fill_comments(comment, node)
385
386 1
    def fill_comment(self, comment_definition, data_definition):
387 1
        comments = comment_definition['node'][0]
388 1
        nodes = data_definition['node'][0]
389 1
        data_definition['comment'] = comment_definition['comment']
390 1
        self.recursive_help_fill_comments(comments, nodes)
391
392 1
    def insert_comments(self, data):
        """Fill comments into every definition of ``data`` that has a matching id."""
        comment_definitions = self._prepare_definition_comments()
        for data_definition in data['definitions']:
            matching = (
                candidate for candidate in comment_definitions
                if candidate['id'] == data_definition['id']
            )
            for comment_definition in matching:
                self.fill_comment(comment_definition, data_definition)
398
399 1
    def _get_key_for_element(self, element):
400 1
        return element.tag.split('}')[1] if '}' in element.tag else element.tag
401
402 1
    def _find_item_ref(self, object_):
403 1
        return list(
404
            filter(
405
                None, [
406
                    self._get_item_ref(item) for item in object_]))
407
408 1
    def _get_item_ref(self, item):
409 1
        return item.get('item_ref') if item.get('item_ref') else None
410
411 1
    def _get_unicate_key(self, key):
412 1
        return key + '@' + str(uuid.uuid4())
413
414 1
    def _get_unicate_id_in_dict(self, object_, dict_):
415 1
        if self._get_key_for_element(object_) in dict_:
416 1
            return self._get_unicate_key(self._get_key_for_element(object_))
417
        else:
418 1
            return self._get_key_for_element(object_)
419
420 1
    def _get_collected_objects_info(self, collected_object, object_):
        """Describe what was collected for ``object_``.

        When the collected object references items (``item_ref``), one entry
        per referenced item is returned. When it has no children or no
        references, a single entry built from the object definition itself
        is returned instead. Keys are derived from the tag of ``object_``.
        """
        out = {}
        has_children = len(collected_object) != 0
        item_refs = (
            self._find_item_ref(collected_object) if has_children else [])
        if not item_refs:
            out[self._get_unicate_id_in_dict(object_, out)] = \
                self._get_object_items(object_)
            return out
        for item_id in item_refs:
            out[self._get_unicate_id_in_dict(object_, out)] = \
                self._get_item(item_id)
        return out
435
436 1
    def _xml_element_to_dict(self, object_, collected_object):
        """Build the info dict for one OVAL object.

        With a collected object, the result maps the collected object's id
        to its ``flag`` attribute and is extended with the entries from
        ``_get_collected_objects_info``. Without one, the object's id maps
        to ``"does not exist"`` and the object definition's own child
        values are added.
        """
        result = {}
        if collected_object is not None:
            result[
                collected_object.attrib.get('id')
            ] = collected_object.attrib.get('flag')
            result.update(
                self._get_collected_objects_info(collected_object, object_))
        else:
            result[object_.attrib.get('id')] = "does not exist"
            result[self._get_unicate_id_in_dict(
                object_, result)] = self._get_object_items(object_)
        return result
450
451 1
    def _get_object_items(self, object_):
        """Map each child tag of ``object_`` to its text, or ``"no value"`` if blank.

        Repeated tag names get a UUID-suffixed key so no entry is lost.
        """
        out = {}
        for element in object_.iterchildren():
            text = element.text
            value = text if text and text.strip() else "no value"
            out[self._get_unicate_id_in_dict(element, out)] = value
        return out
459
460 1
    def _get_item(self, item_ref):
        """Map each non-blank child of the system-data item ``item_ref`` to its text.

        The item is looked up in ``self.system_data``; children with empty
        or whitespace-only text are skipped.
        """
        item = self._find_item_by_id(self.system_data, item_ref)
        out = {}
        for element in item.iterchildren():
            text = element.text
            if not (text and text.strip()):
                continue
            out[self._get_unicate_id_in_dict(element, out)] = text
        return out
467
468 1
    def _find_item_by_id(self, items, id):
469 1
        if id in items.keys():
470 1
            return items[id]
471 1
        return None
472
473 1
    def _get_object_info(self, id_object):
        """Return the info dict for the object ``id_object``.

        Both the object definition (``self.objects``) and its collected
        counterpart (``self.collected_objects``) are looked up by the same
        id and handed to ``_xml_element_to_dict``.
        """
        definition = self._find_item_by_id(self.objects, id_object)
        collected = self._find_item_by_id(self.collected_objects, id_object)
        return self._xml_element_to_dict(definition, collected)
478
479 1
    def _get_tests_info(self):
480 1
        out = []
481 1
        for test in self.tests:
482 1
            objects = []
483 1
            for item in test:
484 1
                object_id = item.attrib.get('object_ref')
485 1
                if object_id:
486 1
                    objects.append(self._get_object_info(object_id))
487 1
            out.append(
488
                dict(
489
                    id=test.attrib.get('id'),
490
                    comment=test.attrib.get('comment'),
491
                    objects=objects,
492
                ))
493 1
        return out
494
495 1
    def get_info_about_test(self, id):
496 1
        for test in self.tests_info:
497 1
            if test['id'] == id:
498
                return test
499