Passed
Push — master ( 6551ad...b5b667 )
by Matěj
02:53 queued 12s
created

graph.xml_parser.xml_parser.insert_comments()   A

Complexity

Conditions 4

Size

Total Lines 12
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 4

Importance

Changes 0
Metric Value
cc 4
eloc 9
nop 2
dl 0
loc 12
ccs 7
cts 7
cp 1
crap 4
rs 9.95
c 0
b 0
f 0
1
'''
2
    Module for creating node IDs and parsing XML
3
'''
4
5 1
from lxml import etree as ET
6 1
import uuid
7 1
import graph.oval_graph
8
9 1
# Namespace prefix -> URI map passed to the XPath lookups (find/findall)
# made by the parser in this module.
ns = {
    'XMLSchema': 'http://oval.mitre.org/XMLSchema/oval-results-5',
    'xccdf': 'http://checklists.nist.gov/xccdf/1.2',
    'arf': 'http://scap.nist.gov/schema/asset-reporting-format/1.1',
    'oval-definitions': 'http://oval.mitre.org/XMLSchema/oval-definitions-5',
    'scap': 'http://scap.nist.gov/schema/scap/source/1.2',
}
16
17
18 1
class xml_parser():
    '''
    Reader of an ARF report that turns OVAL results into node graphs.

    The report is parsed and schema-validated on construction. The other
    methods read the XCCDF rule results, build plain-dict trees from the
    OVAL result criteria, attach comments taken from the OVAL definitions
    and convert the trees to ``graph.oval_graph.OvalNode`` objects.
    '''

    def __init__(self, src):
        '''
        Parse ``src`` and validate it against the ARF 1.1 schema.

        :param src: source of the XML report, handed to ``ET.parse``
        :raises ValueError: when the document does not validate
        '''
        self.src = src
        self.tree = ET.parse(self.src)
        self.root = self.tree.getroot()
        # NOTE: the schema path is relative, so it is resolved against the
        # current working directory of the process.
        if not self.validate(
                './schemas/arf/1.1/asset-reporting-format_1.1.0.xsd'):
            raise ValueError("err- This is not arf report file.")

    def validate(self, xsd_path):
        '''
        Validate the parsed report against the schema stored at ``xsd_path``.

        :param xsd_path: path of the XSD file
        :return: result of ``XMLSchema.validate`` for the parsed tree
        '''
        xmlschema = ET.XMLSchema(ET.parse(xsd_path))
        return xmlschema.validate(self.tree)

    def get_data(self, href):
        '''
        Return the OVAL result ``definitions`` element of one report.

        :param href: reference of the form ``"#<report id>"``
        :return: element found under ``oval_results/results/system`` of the
            matching report (``None`` when the report has no such element)
        :raises ValueError: when no report with that id exists
        '''
        report_data = None
        reports = self.root.find('.//arf:reports', ns)
        # A missing <arf:reports> is treated the same as "no report matches".
        for report in (reports if reports is not None else ()):
            if "#" + str(report.get("id")) == href:
                report_data = report  # the last matching report wins

        if report_data is None:
            # Previously this surfaced as an AttributeError on None.
            raise ValueError(
                'err- report "{}" was not found in the file.'.format(href))

        return report_data.find(
            ('.//XMLSchema:oval_results/XMLSchema:results/'
             'XMLSchema:system/XMLSchema:definitions'), ns)

    def _get_rule_results(self):
        '''Return all ``rule-result`` elements of the XCCDF test results.'''
        return self.root.findall(
            './/xccdf:TestResult/xccdf:rule-result', ns)

    def get_used_rules(self):
        '''
        List the rules that were selected and carry a check reference.

        :return: list of dicts with the keys ``id_rule``, ``id_def``,
            ``href`` and ``result``
        '''
        rules = []
        for rule_result in self._get_rule_results():
            result = rule_result.find('.//xccdf:result', ns)
            if result.text == "notselected":
                continue
            check_content_ref = rule_result.find(
                './/xccdf:check/xccdf:check-content-ref', ns)
            if check_content_ref is None:
                continue
            rules.append(dict(
                id_rule=rule_result.get('idref'),
                id_def=check_content_ref.attrib.get('name'),
                href=check_content_ref.attrib.get('href'),
                result=result.text,
            ))
        return rules

    def get_notselected_rules(self):
        '''
        List the rules whose result is ``notselected``.

        :return: list of dicts with the single key ``id_rule``
        '''
        return [
            dict(id_rule=rule_result.get('idref'))
            for rule_result in self._get_rule_results()
            if rule_result.find('.//xccdf:result', ns).text == "notselected"
        ]

    def parse_data_to_dict(self, rule_id):
        '''
        Build the dict tree of the definition that belongs to ``rule_id``.

        All definitions of the report referenced by the first used rule are
        built, commented and have their ``extend_definition`` references
        expanded; the one matching the rule is returned.

        :param rule_id: id of an XCCDF rule
        :return: ``dict(rule_id=..., definition=...)`` or ``None`` when no
            built definition matches the rule
        :raises ValueError: when the rule is unknown or was not selected
        :raises IndexError: when the report contains no used rules
        '''
        used_rules = self.get_used_rules()
        scan = dict(definitions=[
            self.build_graph(tree_data)
            for tree_data in self.get_data(used_rules[0]['href'])
        ])
        self.insert_comments(scan)
        definitions = self._fill_extend_definition(scan)['definitions']
        if not definitions:
            # Nothing to match against; the rule id is not looked up.
            return None

        # Resolved once: every lookup re-reads all rule results.
        wanted_id = self.get_def_id_by_rule_id(rule_id)
        for definition in definitions:
            if definition['id'] == wanted_id:
                return dict(rule_id=rule_id, definition=definition)
        return None

    def _xml_dict_to_node(self, dict_of_definition):
        '''
        Convert a dict tree to ``OvalNode`` objects recursively.

        :param dict_of_definition: definition dict (has the key ``id``) or
            operator dict (has ``operator``, ``negate``, ``comment``)
        :return: for a definition its first child node with ``node_id`` set
            to the definition id, otherwise a new operator node with a
            random UUID as id
        '''
        children = []
        for child in dict_of_definition['node']:
            if 'operator' in child:
                children.append(self._xml_dict_to_node(child))
            else:
                children.append(
                    graph.oval_graph.OvalNode(
                        child['value_id'],
                        'value',
                        child['value'],
                        child['negate'],
                        child['comment'],
                    ))

        if 'id' in dict_of_definition:
            children[0].node_id = dict_of_definition['id']
            return children[0]
        return graph.oval_graph.OvalNode(
            str(uuid.uuid4()),
            'operator',
            dict_of_definition['operator'],
            dict_of_definition['negate'],
            dict_of_definition['comment'],
            children,
        )

    def get_def_id_by_rule_id(self, rule_id):
        '''
        Return the OVAL definition id checked by the rule ``rule_id``.

        :raises ValueError: when the rule was not selected or is not present
            among the used rules
        '''
        used_rules = self.get_used_rules()
        notselected_rules = self.get_notselected_rules()
        if any(rule['id_rule'] == rule_id for rule in notselected_rules):
            raise ValueError(
                'err- rule "{}" was not selected, so there are no results.'
                .format(rule_id))
        for rule in used_rules:
            if rule['id_rule'] == rule_id:
                return rule['id_def']
        raise ValueError('err- 404 rule not found!')

    def get_rule_dict(self, rule_id):
        '''Return the result of ``parse_data_to_dict`` for ``rule_id``.'''
        return self.parse_data_to_dict(rule_id)

    def xml_dict_of_rule_to_node(self, rule):
        '''
        Wrap the definition tree of a rule in a root ``and`` operator node.

        :param rule: dict with the keys ``rule_id`` and ``definition``
        :return: ``OvalNode`` whose id is the rule id and whose only child
            is the converted definition
        '''
        dict_of_definition = rule['definition']
        return graph.oval_graph.OvalNode(
            rule['rule_id'],
            'operator',
            'and',
            False,
            None,
            [self._xml_dict_to_node(dict_of_definition)],
        )

    def get_oval_graph(self, rule_id=None):
        '''Return the ``OvalNode`` graph of the rule ``rule_id``.'''
        return self.xml_dict_of_rule_to_node(self.parse_data_to_dict(rule_id))

    def _get_negate(self, element):
        '''Return the ``negate`` attribute of ``element`` as bool.

        A missing attribute means ``False``.
        '''
        negate = element.get('negate')
        return False if negate is None else self._str_to_bool(negate)

    def build_graph(self, tree_data):
        '''
        Build the dict tree of one OVAL result definition.

        :param tree_data: definition element; its ``definition_id``
            attribute becomes the id and every child becomes one tree
        :return: dict with ``id``, ``node`` and, when the definition has at
            least one child, ``negate`` (taken from the last child)
        '''
        # Named "definition" so the imported "graph" package is not shadowed.
        definition = dict(
            id=tree_data.get('definition_id'),
            node=[],
        )
        for tree in tree_data:
            # The attribute must be read with get(); "'negate' in tree"
            # tests child membership and was therefore never true.
            definition['negate'] = self._get_negate(tree)
            definition['node'].append(self._build_node(tree))
        return definition

    def _str_to_bool(self, s):
        '''
        Convert an XML boolean string to ``bool``.

        :param s: ``'true'``/``'1'`` or ``'false'``/``'0'``
        :raises ValueError: for any other value
        '''
        if s in ('true', '1'):
            return True
        if s in ('false', '0'):
            return False
        raise ValueError('err- negation is not bool')

    def _build_node(self, tree):
        '''
        Build the dict of one result criteria element recursively.

        Children with an ``operator`` attribute become nested operator
        dicts, children with ``definition_ref`` become ``extend_definition``
        leaves and all remaining children become value leaves.

        :param tree: criteria element of the OVAL results
        :return: dict with ``operator``, ``negate``, ``result``, ``comment``
            (always ``None`` here) and ``node``
        '''
        node = dict(
            operator=tree.get('operator'),
            negate=self._get_negate(tree),
            result=tree.get('result'),
            comment=None,
            node=[],
        )
        for child in tree:
            if child.get('operator') is not None:
                node['node'].append(self._build_node(child))
                continue

            negate_status = self._get_negate(child)
            definition_ref = child.get('definition_ref')
            if definition_ref is not None:
                leaf = dict(
                    extend_definition=definition_ref,
                    result=child.get('result'),
                    negate=negate_status,
                    comment=None,
                )
            else:
                leaf = dict(
                    value_id=child.get('test_ref'),
                    value=child.get('result'),
                    negate=negate_status,
                    comment=None,
                )
            node['node'].append(leaf)
        return node

    def _fill_extend_definition(self, scan):
        '''
        Return a copy of ``scan`` with ``extend_definition`` leaves expanded.

        :param scan: ``dict(definitions=[...])`` as built by ``build_graph``
        :return: new ``dict(definitions=[...])``; each definition keeps only
            ``id`` and ``node``
        '''
        return dict(definitions=[
            dict(
                id=definition['id'],
                node=[self._operator_as_child(value, scan)
                      for value in definition['node']],
            )
            for definition in scan['definitions']
        ])

    def _operator_as_child(self, value, scan):
        '''
        Copy an operator dict, expanding referenced definitions inside it.

        :param value: operator dict to copy
        :param scan: all definitions, used to resolve references
        :return: new operator dict; value leaves are reused, not copied
        :raises ValueError: for a child that is neither an operator, an
            ``extend_definition`` nor a value
        '''
        out = dict(
            operator=value['operator'],
            negate=value['negate'],
            result=value['result'],
            comment=value['comment'],
            node=[],
        )
        for child in value['node']:
            if 'operator' in child:
                out['node'].append(self._operator_as_child(child, scan))
            elif 'extend_definition' in child:
                out['node'].append(
                    self._find_definition_by_id(
                        scan,
                        child['extend_definition'],
                        child['negate'],
                        child['comment'],
                    ))
            elif 'value_id' in child:
                out['node'].append(child)
            else:
                raise ValueError('error - unknown child')
        return out

    def _find_definition_by_id(self, scan, id, negate_status, comment):
        '''
        Return the expanded first tree of the definition with the given id.

        The ``negate`` and ``comment`` of the returned root are those of the
        referencing ``extend_definition``.

        :return: operator dict, or ``None`` when no definition has that id
        '''
        for definition in scan['definitions']:
            if definition['id'] == id:
                # Work on a shallow copy: writing into the shared definition
                # made its own tree depend on whether a referrer had been
                # processed before it.
                root = dict(definition['node'][0])
                root['negate'] = negate_status
                root['comment'] = comment
                return self._operator_as_child(root, scan)
        return None

    def create_dict_form_criteria(self, criteria):
        '''
        Build the comment tree of one ``criteria`` element recursively.

        :param criteria: criteria element of an OVAL definition
        :return: dict with ``operator`` (``'AND'`` when the attribute is
            missing), ``comment`` and ``node``; leaves hold ``comment`` and
            either ``extend_definition`` or ``value_id``
        '''
        comments = dict(
            operator=criteria.get('operator', 'AND'),
            comment=criteria.get('comment'),
            node=[],
        )
        for criterion in criteria:
            tag = criterion.tag
            if not isinstance(tag, str):
                # XML comments and processing instructions are not criteria.
                continue
            # The operator attribute is optional on <criteria>, so nested
            # criteria are also recognised by their tag name.
            is_criteria = tag.rpartition('}')[2] == 'criteria'
            if is_criteria or criterion.get('operator'):
                comments['node'].append(
                    self.create_dict_form_criteria(criterion))
            elif criterion.get('definition_ref'):
                comments['node'].append(
                    dict(
                        extend_definition=criterion.get('definition_ref'),
                        comment=criterion.get('comment'),
                    ))
            else:
                comments['node'].append(
                    dict(
                        value_id=criterion.get('test_ref'),
                        comment=criterion.get('comment'),
                    ))
        return comments

    def prepare_definition_comments(self, oval_definitions):
        '''
        Build the comment trees of all given OVAL definitions.

        Definitions without a ``criteria`` element are skipped.

        :param oval_definitions: iterable of definition elements
        :return: list of ``dict(id=..., node=[comment tree])``
        '''
        definitions = []
        for definition in oval_definitions:
            criteria = definition.find('.//oval-definitions:criteria', ns)
            if criteria is None:
                continue
            definitions.append(dict(
                id=definition.get('id'),
                node=[self.create_dict_form_criteria(criteria)],
            ))
        return definitions

    def is_definition_in_array(self, definition_, array):
        '''Return ``True`` when ``array`` holds a definition with the same id.'''
        return any(
            definition_['id'] == definition['id'] for definition in array)

    def recursive_help_fill_comments(self, comments, nodes):
        '''
        Copy comments from a comment tree into a result tree in place.

        Children are paired by position; surplus children on either side
        are ignored.

        :param comments: comment tree (see ``create_dict_form_criteria``)
        :param nodes: operator dict of the result tree, modified in place
        '''
        nodes['comment'] = comments['comment']
        for node, comment in zip(nodes['node'], comments['node']):
            node['comment'] = comment['comment']
            if 'operator' in node:
                self.recursive_help_fill_comments(comment, node)

    def fill_comment(self, comment_definition, data_definition):
        '''Fill the first tree of ``data_definition`` with comments from
        the first tree of ``comment_definition``.'''
        self.recursive_help_fill_comments(
            comment_definition['node'][0], data_definition['node'][0])

    def insert_comments(self, data):
        '''
        Add the comments of the OVAL definitions to the result trees.

        Definitions are matched by id. When the report embeds no OVAL
        definitions, ``data`` is left without comments.

        :param data: ``dict(definitions=[...])``, modified in place
        '''
        oval_def = self.root.find(
            './/arf:report-requests/arf:report-request/'
            'arf:content/scap:data-stream-collection/'
            'scap:component/oval-definitions:oval_definitions/'
            'oval-definitions:definitions', ns)
        if oval_def is None:
            return

        # Index once instead of scanning every comment per definition.
        comments_by_id = {
            comment_definition['id']: comment_definition
            for comment_definition in
            self.prepare_definition_comments(oval_def)
        }
        for data_definition in data['definitions']:
            comment_definition = comments_by_id.get(data_definition['id'])
            if comment_definition is not None:
                self.fill_comment(comment_definition, data_definition)
311