Passed
Pull Request — master (#9)
by Jan
03:42 queued 01:03
created

graph.xml_parser.xml_parser.get_oval_graph()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 2
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
nop 2
crap 1
1
'''
2
    Module for creating node IDs and parsing XML
3
'''
4
5 1
from lxml import etree as ET
6 1
import uuid
7 1
import graph.oval_graph
8
9
10 1
class xml_parser:
    '''Parse an ARF report file and build OVAL graphs for its rules.

    The source file is parsed and validated on construction.  The remaining
    methods extract the XCCDF rule results and the OVAL definition results
    from the parsed tree and convert them first to plain dicts and then to
    ``graph.oval_graph.OvalNode`` trees.
    '''

    def __init__(self, src):
        '''Parse ``src`` and check it against the ARF 1.1 schema.

        Raises:
            ValueError: if the document does not validate against the schema.
        '''
        self.src = src
        self.tree = ET.parse(self.src)
        self.root = self.tree.getroot()
        # NOTE(review): the schema path is relative, so it is looked up from
        # the current working directory - confirm this is intended.
        if not self.validate(
                './schemas/arf/1.1/asset-reporting-format_1.1.0.xsd'):
            raise ValueError("err- This is not arf report file.")

    def validate(self, xsd_path):
        '''Return whether the parsed tree validates against ``xsd_path``.'''
        xmlschema = ET.XMLSchema(ET.parse(xsd_path))
        return xmlschema.validate(self.tree)

    def get_data(self, href):
        '''Return the OVAL ``definitions`` element of the report ``href``.

        ``href`` is compared with ``"#" + id`` of every child of the ARF
        ``reports`` element; when several children match, the last one is
        used.  The result of the final ``find`` is returned as is, so it is
        ``None`` when the matched report contains no OVAL definitions.

        Raises:
            ValueError: if the file has no ``reports`` element or none of
                the reports matches ``href``.
        '''
        ns = {
            'ns0': 'http://oval.mitre.org/XMLSchema/oval-results-5',
            'ns1': 'http://scap.nist.gov/schema/asset-reporting-format/1.1'
        }

        reports = self.root.find('.//ns1:reports', ns)
        if reports is None:
            raise ValueError("err- In file is missing arf reports")

        report_data = None
        for report in reports:
            if "#" + str(report.get("id")) == href:
                report_data = report
        if report_data is None:
            # Without this guard the lookup below fails with an opaque
            # AttributeError on None.
            raise ValueError(
                "err- In file is missing arf report {}".format(href))

        return report_data.find(
            './/ns0:oval_results/ns0:results/ns0:system/ns0:definitions', ns)

    def get_used_rules(self):
        '''Return the rule results that were selected and have a check.

        Every XCCDF ``rule-result`` whose result text is not
        ``"notselected"`` and which contains a ``check-content-ref`` yields
        one dict with the keys ``id_rule``, ``id_def``, ``href`` and
        ``result``.
        '''
        ns = {
            'ns0': 'http://checklists.nist.gov/xccdf/1.2',
        }
        rules = []
        for rule_result in self.root.findall(
                './/ns0:TestResult/ns0:rule-result', ns):
            result = rule_result.find('.//ns0:result', ns)
            if result.text == "notselected":
                continue
            check_content_ref = rule_result.find(
                './/ns0:check/ns0:check-content-ref', ns)
            if check_content_ref is None:
                continue
            rules.append(dict(
                id_rule=rule_result.get('idref'),
                id_def=check_content_ref.attrib.get('name'),
                href=check_content_ref.attrib.get('href'),
                result=result.text))
        return rules

    def parse_data_to_dict(self, rule_id):
        '''Return ``dict(rule_id=..., definition=...)`` for ``rule_id``.

        The OVAL results are located through the ``href`` of the first used
        rule, so at least one used rule is required.  Returns ``None`` when
        no parsed definition carries the definition id of the rule.

        Raises:
            ValueError: if ``rule_id`` is not among the used rules.
        '''
        used_rules = self.get_used_rules()
        scan = dict(definitions=[
            self.build_graph(tree_data)
            for tree_data in self.get_data(used_rules[0]['href'])])

        definitions = self._fill_extend_definition(scan)
        # The lookup does not depend on the loop variable; doing it once
        # avoids re-scanning all rule results for every definition.
        def_id = self.get_def_id_by_rule_id(rule_id)
        for definition in definitions['definitions']:
            if definition['id'] == def_id:
                return dict(rule_id=rule_id, definition=definition)
        return None

    def _xml_dict_to_node(self, dict_of_definition):
        '''Convert a definition/operator dict into an ``OvalNode`` tree.

        Children holding an ``operator`` key are converted recursively, all
        other children become ``value`` nodes.  A dict with an ``id`` key
        (a definition) is represented by its first child, renamed to that
        id; any other dict becomes a new operator node with a random UUID.
        '''
        children = []
        for child in dict_of_definition['node']:
            if 'operator' in child:
                children.append(self._xml_dict_to_node(child))
            else:
                children.append(
                    graph.oval_graph.OvalNode(
                        child['value_id'],
                        'value',
                        child['value']))

        if 'id' in dict_of_definition:
            children[0].node_id = dict_of_definition['id']
            return children[0]
        return graph.oval_graph.OvalNode(
            str(uuid.uuid4()),
            'operator',
            dict_of_definition['operator'],
            children
        )

    def get_def_id_by_rule_id(self, rule_id):
        '''Return the OVAL definition id referenced by rule ``rule_id``.

        Raises:
            ValueError: if ``rule_id`` is not among the used rules.
        '''
        for rule in self.get_used_rules():
            if rule['id_rule'] == rule_id:
                return rule['id_def']
        raise ValueError('err- 404 rule not found!')

    def xml_dict_of_rule_to_node(self, rule):
        '''Wrap the definition of ``rule`` in an ``and`` node named by rule id.

        ``rule`` is a dict with the keys ``rule_id`` and ``definition``, as
        returned by ``parse_data_to_dict``.
        '''
        dict_of_definition = rule['definition']
        return graph.oval_graph.OvalNode(
            rule['rule_id'],
            'operator',
            'and',
            [self._xml_dict_to_node(dict_of_definition)]
        )

    def get_oval_graph(self, rule_id=None):
        '''Return the ``OvalNode`` tree describing rule ``rule_id``.'''
        return self.xml_dict_of_rule_to_node(self.parse_data_to_dict(rule_id))

    def build_graph(self, tree_data):
        '''Return ``dict(id=..., node=[...])`` for one definition element.

        ``id`` is the ``definition_id`` attribute and ``node`` holds one
        converted dict per child element.
        '''
        return dict(
            id=tree_data.get('definition_id'),
            node=[self._build_node(tree) for tree in tree_data])

    def _build_node(self, tree):
        '''Convert an operator element and its subtree into nested dicts.

        Children with an ``operator`` attribute are converted recursively,
        children with a ``definition_ref`` attribute become
        ``dict(extend_definition=...)`` and all others become
        ``dict(value_id=..., value=...)`` built from their ``test_ref`` and
        ``result`` attributes.
        '''
        node = dict(operator=tree.get('operator'), node=[])
        for child in tree:
            if child.get('operator') is not None:
                node['node'].append(self._build_node(child))
            elif child.get('definition_ref') is not None:
                node['node'].append(
                    dict(extend_definition=child.get('definition_ref')))
            else:
                node['node'].append(
                    dict(
                        value_id=child.get('test_ref'),
                        value=child.get('result')))
        return node

    def _fill_extend_definition(self, scan):
        '''Return a copy of ``scan`` with extend_definition references inlined.
        '''
        out = dict(definitions=[])
        for definition in scan['definitions']:
            nodes = [
                self._operator_as_child(value, scan)
                for value in definition['node']]
            out['definitions'].append(dict(id=definition['id'], node=nodes))
        return out

    def _operator_as_child(self, value, scan):
        '''Rebuild operator dict ``value`` with referenced definitions inlined.

        Raises:
            ValueError: if a child is neither an operator, an
                extend_definition reference nor a value.
        '''
        out = dict(operator=value['operator'], node=[])
        for child in value['node']:
            if 'operator' in child:
                out['node'].append(self._operator_as_child(child, scan))
            elif 'extend_definition' in child:
                out['node'].append(
                    self._find_definition_by_id(
                        scan, child['extend_definition']))
            elif 'value_id' in child:
                out['node'].append(child)
            else:
                raise ValueError('error - unknown child')
        return out

    def _find_definition_by_id(self, scan, id):
        '''Return the expanded first node of the definition with id ``id``.

        Returns ``None`` when ``scan`` holds no such definition.
        '''
        for definition in scan['definitions']:
            if definition['id'] == id:
                return self._operator_as_child(definition['node'][0], scan)
        return None
169