Passed
Branch master (d3cca2)
by Jan
03:45
created

oval_graph.xml_parser.xml_parser.get_data()   A

Complexity

Conditions 3

Size

Total Lines 11
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 10
nop 2
dl 0
loc 11
ccs 8
cts 8
cp 1
crap 3
rs 9.9
c 0
b 0
f 0
1
'''
2
    Module for creating node IDs and parsing XML
3
'''
4
5 1
from lxml import etree as ET
6 1
import uuid
7 1
import oval_graph.oval_graph
8 1
import os
9
10 1
# Prefix -> namespace URI table passed to the namespaced find()/findall()
# lookups in this module.
ns = {
    # OVAL results
    "XMLSchema": "http://oval.mitre.org/XMLSchema/oval-results-5",
    # XCCDF rule results
    "xccdf": "http://checklists.nist.gov/xccdf/1.2",
    # ARF report container
    "arf": "http://scap.nist.gov/schema/asset-reporting-format/1.1",
    # OVAL definitions (titles and criteria comments)
    "oval-definitions": "http://oval.mitre.org/XMLSchema/oval-definitions-5",
    # SCAP source data stream
    "scap": "http://scap.nist.gov/schema/scap/source/1.2",
}
17
18
19 1
class xml_parser():
    '''Load an ARF report and turn the OVAL results of a rule into a tree
    of ``oval_graph.oval_graph.OvalNode`` objects.

    Attributes set by the constructor:
        src: the value handed to ``ET.parse``.
        tree: the parsed document.
        root: the root element of ``tree``.
    '''

    def __init__(self, src):
        '''Parse *src* and validate it against the bundled ARF 1.1 schema.

        Raises:
            ValueError: if the document does not validate.
        '''
        self.src = src
        self.tree = ET.parse(self.src)
        self.root = self.tree.getroot()
        if not self.validate(
                'schemas/arf/1.1/asset-reporting-format_1.1.0.xsd'):
            raise ValueError("err- This is not arf report file.")

    def get_src(self, src):
        '''Return *src* joined onto the directory containing this module.'''
        module_dir = os.path.dirname(os.path.realpath(__file__))
        return str(os.path.join(module_dir, src))

    def validate(self, xsd_path):
        '''Validate ``self.tree`` against an XSD located next to this module.

        Args:
            xsd_path: schema path relative to this module's directory.

        Returns:
            Whatever ``XMLSchema.validate`` returns for ``self.tree``.
        '''
        schema = ET.XMLSchema(ET.parse(self.get_src(xsd_path)))
        return schema.validate(self.tree)

    def get_data(self, href):
        '''Return the OVAL results ``definitions`` element of one report.

        Args:
            href: reference of the form ``'#' + <report id>``.

        Returns:
            The element found below the matching report at
            ``oval_results/results/system/definitions`` (``None`` when the
            report has no such element).

        Raises:
            ValueError: if no child of the first ``arf:reports`` element
                carries the requested id.
        '''
        report_data = None
        reports = self.root.find('.//arf:reports', ns)
        # No ``break``: when several reports match, the last one is used.
        for report in (reports if reports is not None else ()):
            if "#" + str(report.get("id")) == href:
                report_data = report
        if report_data is None:
            # Previously this surfaced as an AttributeError on ``None``.
            raise ValueError(
                'err- report "{}" was not found in the arf file.'
                .format(href))
        return report_data.find(
            ('.//XMLSchema:oval_results/XMLSchema:results/'
             'XMLSchema:system/XMLSchema:definitions'), ns)

    def get_used_rules(self):
        '''List the rule results that were evaluated and carry a check.

        Returns:
            A list of dicts with the keys ``id_rule``, ``id_def``, ``href``
            and ``result``; one per ``rule-result`` whose result text is not
            ``"notselected"`` and which has a ``check-content-ref``.
        '''
        rules = []
        for rule_result in self.root.findall(
                './/xccdf:TestResult/xccdf:rule-result', ns):
            result = rule_result.find('.//xccdf:result', ns)
            if result.text == "notselected":
                continue
            check_content_ref = rule_result.find(
                './/xccdf:check/xccdf:check-content-ref', ns)
            if check_content_ref is None:
                continue
            rules.append(dict(
                id_rule=rule_result.get('idref'),
                id_def=check_content_ref.attrib.get('name'),
                href=check_content_ref.attrib.get('href'),
                result=result.text,
            ))
        return rules

    def get_notselected_rules(self):
        '''List rule results whose result text is ``"notselected"``.

        Returns:
            A list of ``dict(id_rule=...)`` entries.
        '''
        return [
            dict(id_rule=rule_result.get('idref'))
            for rule_result in self.root.findall(
                './/xccdf:TestResult/xccdf:rule-result', ns)
            if rule_result.find('.//xccdf:result', ns).text == "notselected"
        ]

    def parse_data_to_dict(self, rule_id):
        '''Build the dict form of the definition that belongs to *rule_id*.

        Returns:
            ``dict(rule_id=..., definition=...)`` for the definition whose id
            matches the rule, or ``None`` when no built definition matches.

        Raises:
            ValueError: propagated from ``get_def_id_by_rule_id`` when the
                rule is unknown or was not selected.
        '''
        scan = dict(definitions=[])
        used_rules = self.get_used_rules()
        # NOTE(review): only the href of the first used rule is consulted;
        # presumably all rules reference the same report - confirm.
        for definition_data in self.get_data(used_rules[0]['href']):
            scan['definitions'].append(self.build_graph(definition_data))
        self.insert_comments(scan)
        definitions = self._fill_extend_definition(scan)
        # Resolve the id once; the lookup rescans every rule-result.
        wanted_id = self.get_def_id_by_rule_id(rule_id)
        for definition in definitions['definitions']:
            if definition['id'] == wanted_id:
                return dict(rule_id=rule_id, definition=definition)
        return None

    def _xml_dict_to_node(self, dict_of_definition):
        '''Convert a definition/operator dict into ``OvalNode`` objects.

        Children holding an ``operator`` key are converted recursively, all
        others become ``'value'`` nodes. When *dict_of_definition* has an
        ``id`` key, its first child is given that id and returned; otherwise
        a new ``'operator'`` node with a uuid4 id wraps the children.
        '''
        children = []
        for child in dict_of_definition['node']:
            # The original condition was ``'operator' in child and 'id'``;
            # the constant ``'id'`` is always truthy, so this is equivalent.
            if 'operator' in child:
                children.append(self._xml_dict_to_node(child))
            else:
                children.append(
                    oval_graph.oval_graph.OvalNode(
                        child['value_id'],
                        'value',
                        child['value'],
                        child['negate'],
                        child['comment'],
                    ))

        if 'id' in dict_of_definition:
            children[0].node_id = dict_of_definition['id']
            return children[0]
        return oval_graph.oval_graph.OvalNode(
            str(uuid.uuid4()),
            'operator',
            dict_of_definition['operator'],
            dict_of_definition['negate'],
            dict_of_definition['comment'],
            children,
        )

    def get_def_id_by_rule_id(self, rule_id):
        '''Return the definition id referenced by the rule *rule_id*.

        Raises:
            ValueError: if the rule was not selected, or is not present
                among the used rules.
        '''
        for rule in self.get_notselected_rules():
            if rule['id_rule'] == rule_id:
                raise ValueError(
                    'err- rule "{}" was not selected, so there are no results.'
                    .format(rule_id))
        for rule in self.get_used_rules():
            if rule['id_rule'] == rule_id:
                return rule['id_def']
        raise ValueError('err- 404 rule not found!')

    def get_rule_dict(self, rule_id):
        '''Alias of ``parse_data_to_dict``.'''
        return self.parse_data_to_dict(rule_id)

    def xml_dict_of_rule_to_node(self, rule):
        '''Wrap the definition of *rule* in a root ``'and'`` operator node.

        Args:
            rule: dict with the keys ``rule_id`` and ``definition``, as
                returned by ``parse_data_to_dict``.
        '''
        dict_of_definition = rule['definition']
        return oval_graph.oval_graph.OvalNode(
            rule['rule_id'],
            'operator',
            'and',
            False,
            dict_of_definition['comment'],
            [self._xml_dict_to_node(dict_of_definition)],
        )

    def get_oval_graph(self, rule_id=None):
        '''Return the ``OvalNode`` tree for *rule_id*.'''
        return self.xml_dict_of_rule_to_node(self.parse_data_to_dict(rule_id))

    def _negate_status(self, element):
        '''Return the boolean value of *element*'s ``negate`` attribute.

        A missing attribute counts as ``False``; any value other than
        ``'true'``/``'false'`` raises ``ValueError`` (see ``_str_to_bool``).
        '''
        negate = element.get('negate')
        if negate is None:
            return False
        return self._str_to_bool(negate)

    def build_graph(self, tree_data):
        '''Build the dict form of one result definition element.

        Returns:
            A dict with ``id`` (the ``definition_id`` attribute), ``node``
            (one ``_build_node`` result per child) and, when there is at
            least one child, ``negate`` taken from the last child.
        '''
        graph = dict(
            id=tree_data.get('definition_id'),
            node=[],
        )
        for tree in tree_data:
            # The attribute has to be read with ``get``; ``'negate' in tree``
            # tests child membership and therefore never matched.
            graph['negate'] = self._negate_status(tree)
            graph['node'].append(self._build_node(tree))
        return graph

    def _str_to_bool(self, s):
        '''Map ``'true'``/``'false'`` to a bool.

        Raises:
            ValueError: for any other value.
        '''
        if s == 'true':
            return True
        if s == 'false':
            return False
        raise ValueError('err- negation is not bool')

    def _build_node(self, tree):
        '''Recursively convert a result criteria element into a dict.

        Children with an ``operator`` attribute become nested operator
        dicts; children with ``definition_ref`` become ``extend_definition``
        leaves; everything else becomes a value leaf keyed by ``test_ref``.
        Comments are left as ``None`` here.
        '''
        node = dict(
            operator=tree.get('operator'),
            negate=self._negate_status(tree),
            result=tree.get('result'),
            comment=None,
            node=[],
        )
        for child in tree:
            if child.get('operator') is not None:
                node['node'].append(self._build_node(child))
                continue
            negate_status = self._negate_status(child)
            definition_ref = child.get('definition_ref')
            if definition_ref is not None:
                leaf = dict(
                    extend_definition=definition_ref,
                    result=child.get('result'),
                    negate=negate_status,
                    comment=None,
                )
            else:
                leaf = dict(
                    value_id=child.get('test_ref'),
                    value=child.get('result'),
                    negate=negate_status,
                    comment=None,
                )
            node['node'].append(leaf)
        return node

    def _fill_extend_definition(self, scan):
        '''Return a copy of *scan* with ``extend_definition`` leaves expanded.

        Each output definition keeps ``id``, ``comment`` and the expanded
        ``node`` list. ``comment`` is ``None`` for definitions that never
        received one (``build_graph`` does not create that key).
        '''
        out = dict(definitions=[])
        for definition in scan['definitions']:
            nodes = [
                self._operator_as_child(value, scan)
                for value in definition['node']
            ]
            out['definitions'].append(
                dict(
                    id=definition['id'],
                    comment=definition.get('comment'),
                    node=nodes,
                ))
        return out

    def _operator_as_child(self, value, scan):
        '''Copy the operator dict *value*, expanding referenced definitions.

        Raises:
            ValueError: if a child is neither an operator, an
                ``extend_definition`` leaf nor a value leaf.
        '''
        out = dict(
            operator=value['operator'],
            negate=value['negate'],
            result=value['result'],
            comment=value['comment'],
            node=[],
        )
        for child in value['node']:
            if 'operator' in child:
                out['node'].append(self._operator_as_child(child, scan))
            elif 'extend_definition' in child:
                out['node'].append(
                    self._find_definition_by_id(
                        scan,
                        child['extend_definition'],
                        child['negate'],
                        child['comment'],
                    ))
            elif 'value_id' in child:
                out['node'].append(child)
            else:
                raise ValueError('error - unknown child')
        return out

    def _find_definition_by_id(self, scan, id, negate_status, comment):
        '''Return the expanded root operator of the definition named *id*.

        The root takes *negate_status* and *comment* from the referencing
        ``extend_definition`` leaf. This is applied to a copy, so the
        definition stored in *scan* keeps its own values.

        Returns:
            The expanded operator dict, or ``None`` if *scan* has no
            definition with that id.
        '''
        # ``id`` shadows the builtin; the name is kept for keyword callers.
        for definition in scan['definitions']:
            if definition['id'] == id:
                root = dict(definition['node'][0])
                root['negate'] = negate_status
                root['comment'] = comment
                return self._operator_as_child(root, scan)
        return None

    def create_dict_form_criteria(self, criteria):
        '''Recursively collect the ``comment`` attributes of a criteria tree.

        Returns:
            ``dict(operator=..., comment=..., node=[...])``; ``operator``
            defaults to ``'AND'`` when the attribute is missing.
        '''
        operator = criteria.get('operator')
        comments = dict(
            operator='AND' if operator is None else operator,
            comment=criteria.get('comment'),
            node=[],
        )
        for criterion in criteria:
            # NOTE(review): a nested criteria element without an explicit
            # ``operator`` attribute takes the leaf branches below.
            if criterion.get('operator'):
                entry = self.create_dict_form_criteria(criterion)
            elif criterion.get('definition_ref'):
                entry = dict(
                    extend_definition=criterion.get('definition_ref'),
                    comment=criterion.get('comment'),
                )
            else:
                entry = dict(
                    value_id=criterion.get('test_ref'),
                    comment=criterion.get('comment'),
                )
            comments['node'].append(entry)
        return comments

    def prepare_definition_comments(self, oval_definitions):
        '''Build one comment dict per OVAL definition element.

        Each entry holds the definition ``id``, its metadata title text as
        ``comment`` and a one-element ``node`` list with the criteria
        comments.
        '''
        definitions = []
        for definition in oval_definitions:
            title = definition.find(
                './/oval-definitions:metadata/oval-definitions:title', ns)
            criteria = definition.find('.//oval-definitions:criteria', ns)
            definitions.append(dict(
                id=definition.get('id'),
                comment=title.text,
                node=[self.create_dict_form_criteria(criteria)],
            ))
        return definitions

    def is_definition_in_array(self, definition_, array):
        '''Return True when *array* has a definition with the same ``id``.'''
        return any(
            definition_['id'] == definition['id'] for definition in array)

    def recursive_help_fill_comments(self, comments, nodes):
        '''Copy comments from *comments* onto the parallel *nodes* tree.

        Both trees are walked pairwise with ``zip``, so surplus entries on
        either side are ignored. *nodes* is modified in place.
        '''
        nodes['comment'] = comments['comment']
        for node, comment in zip(nodes['node'], comments['node']):
            node['comment'] = comment['comment']
            if 'operator' in node:
                self.recursive_help_fill_comments(comment, node)

    def fill_comment(self, comment_definition, data_definition):
        '''Copy the title and criteria comments onto *data_definition*.'''
        data_definition['comment'] = comment_definition['comment']
        self.recursive_help_fill_comments(
            comment_definition['node'][0], data_definition['node'][0])

    def insert_comments(self, data):
        '''Fill comments into every definition of *data* with a matching id.

        The comments come from the first ``oval_definitions/definitions``
        element found in the embedded data-stream collection.
        '''
        oval_def = self.root.find(
            './/arf:report-requests/arf:report-request/'
            'arf:content/scap:data-stream-collection/'
            'scap:component/oval-definitions:oval_definitions/'
            'oval-definitions:definitions', ns)
        # Index by id once instead of rescanning the list per definition;
        # for a repeated id the last entry is the one applied.
        comments_by_id = {}
        for comment_definition in self.prepare_definition_comments(oval_def):
            comments_by_id[comment_definition['id']] = comment_definition

        for data_definition in data['definitions']:
            comment_definition = comments_by_id.get(data_definition['id'])
            if comment_definition is not None:
                self.fill_comment(comment_definition, data_definition)
328