Passed
Push — master ( 2dc7a6...07575f )
by Matěj
01:33 queued 11s
created

XmlParser.parse_data_to_dict()   A

Complexity

Conditions 3

Size

Total Lines 4
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 4
nop 2
dl 0
loc 4
ccs 4
cts 4
cp 1
crap 3
rs 10
c 0
b 0
f 0
1
'''
2
    Module for creating node IDs and parsing XML
3
'''
4 1
import uuid
5 1
import os
6 1
import sys
7
8 1
from lxml import etree as ET
9
10 1
from .oval_node import OvalNode
11
12 1
# Prefix -> namespace URI table handed to every find()/findall() call in
# this module, so XPath expressions can use the short prefixes.
ns = {
    # OVAL results (note: the prefix is 'XMLSchema', the URI is oval-results).
    'XMLSchema': 'http://oval.mitre.org/XMLSchema/oval-results-5',
    'xccdf': 'http://checklists.nist.gov/xccdf/1.2',
    'arf': 'http://scap.nist.gov/schema/asset-reporting-format/1.1',
    'oval-definitions': 'http://oval.mitre.org/XMLSchema/oval-definitions-5',
    'scap': 'http://scap.nist.gov/schema/scap/source/1.2',
}
19
20
21 1
class XmlParser:
    """Load an ARF report and turn its OVAL results into ``OvalNode`` trees.

    On construction the report is parsed, checked against the bundled ARF 1.1
    schema (a failed check only prints a warning) and pre-processed into:

    * ``used_rules`` - rule results whose result is not ``notselected`` and
      that carry a ``check-content-ref``;
    * ``notselected_rules`` - rule results whose result is ``notselected``;
    * ``scan_definitions`` - definition dictionaries with comments filled in
      and ``extend_definition`` references expanded.
    """

    def __init__(self, src):
        """Parse the report at *src* and pre-compute rules and definitions.

        Raises:
            ValueError: if extracting the rules or definitions fails, i.e.
                the document does not have the expected ARF structure.
        """
        self.src = src
        self.tree = ET.parse(self.src)
        self.root = self.tree.getroot()
        if not self.validate(
                'schemas/arf/1.1/asset-reporting-format_1.1.0.xsd'):
            CRED = '\033[91m'
            CEND = '\033[0m'
            print(CRED + "Warning: This file is not valid arf report." + CEND, file=sys.stderr)
        try:
            self.used_rules = self._get_used_rules()
            self.notselected_rules = self._get_notselected_rules()
            self.scan_definitions = self._get_scan()
        except Exception as error:
            # Exception (not BaseException) so KeyboardInterrupt/SystemExit
            # still propagate; the original cause stays attached for debugging.
            raise ValueError("err- This is not arf report file.") from error

    def get_src(self, src):
        """Return *src* joined onto the directory that contains this module."""
        module_dir = os.path.dirname(os.path.realpath(__file__))
        return str(os.path.join(module_dir, src))

    def validate(self, xsd_path):
        """Validate the loaded report against the XSD at *xsd_path*.

        *xsd_path* is resolved relative to this module (see ``get_src``).
        Returns the boolean result of the schema validation.
        """
        xmlschema_doc = ET.parse(self.get_src(xsd_path))
        xmlschema = ET.XMLSchema(xmlschema_doc)
        return xmlschema.validate(self.tree)

    def get_data(self, href):
        """Return the OVAL ``definitions`` results element of one report.

        *href* is matched against ``"#" + id`` of every child of
        ``arf:reports``; when several children match, the last one is used.

        Raises:
            ValueError: if ``arf:reports`` is missing or no report matches.
        """
        reports = self.root.find('.//arf:reports', ns)
        if reports is None:
            raise ValueError('err- arf:reports element not found.')

        report_data = None
        for report in reports:
            if "#" + str(report.get("id")) == href:
                report_data = report
        if report_data is None:
            raise ValueError(
                'err- report "{}" not found.'.format(href))

        return report_data.find(
            ('.//XMLSchema:oval_results/XMLSchema:results/'
             'XMLSchema:system/XMLSchema:definitions'), ns)

    def _get_used_rules(self):
        """Collect rule results that were evaluated and reference a check.

        Each entry has the keys ``id_rule``, ``id_def``, ``href`` and
        ``result``. Rule results without a ``check-content-ref`` are skipped.
        """
        rules = []
        for rule_result in self.root.findall(
                './/xccdf:TestResult/xccdf:rule-result', ns):
            result = rule_result.find('.//xccdf:result', ns)
            if result.text == "notselected":
                continue
            check_content_ref = rule_result.find(
                './/xccdf:check/xccdf:check-content-ref', ns)
            if check_content_ref is None:
                continue
            rules.append(dict(
                id_rule=rule_result.get('idref'),
                id_def=check_content_ref.attrib.get('name'),
                href=check_content_ref.attrib.get('href'),
                result=result.text,
            ))
        return rules

    def _get_notselected_rules(self):
        """Collect ``dict(id_rule=...)`` for every ``notselected`` rule result."""
        rule_results = self.root.findall(
            './/xccdf:TestResult/xccdf:rule-result', ns)
        return [
            dict(id_rule=rule_result.get('idref'))
            for rule_result in rule_results
            if rule_result.find('.//xccdf:result', ns).text == "notselected"
        ]

    def _get_scan(self):
        """Build all definition dictionaries of the report.

        The report is located through the ``href`` of the first used rule.
        Comments are inserted before ``extend_definition`` references are
        expanded.
        """
        href = self.used_rules[0]['href']
        scan = dict(definitions=[
            self.build_graph(definition) for definition in self.get_data(href)
        ])
        self.insert_comments(scan)
        return self._fill_extend_definition(scan)

    def parse_data_to_dict(self, rule_id):
        """Return ``dict(rule_id=..., definition=...)`` for *rule_id*.

        Returns ``None`` when the rule maps to a definition id that is not
        among the scanned definitions.

        Raises:
            ValueError: propagated from ``get_def_id_by_rule_id`` when the
                rule was not selected or does not exist.
        """
        # The lookup does not depend on the loop variable, so do it once.
        definition_id = self.get_def_id_by_rule_id(rule_id)
        for definition in self.scan_definitions['definitions']:
            if definition['id'] == definition_id:
                return dict(rule_id=rule_id, definition=definition)
        return None

    def _xml_dict_to_node(self, dict_of_definition):
        """Convert a definition/operator dictionary into ``OvalNode`` objects.

        Children that have an ``operator`` key are converted recursively;
        all other children become ``value`` nodes. A dictionary with an
        ``id`` key is a definition wrapper: its first child is returned with
        ``node_id`` set to that id.
        """
        children = []
        for child in dict_of_definition['node']:
            # The former "and 'id'" operand was a constant truthy string and
            # never influenced this test.
            if 'operator' in child:
                children.append(self._xml_dict_to_node(child))
            else:
                children.append(
                    OvalNode(
                        child['value_id'],
                        'value',
                        child['value'],
                        child['negate'],
                        child['comment'],
                        child['tag']
                    ))

        if 'id' in dict_of_definition:
            children[0].node_id = dict_of_definition['id']
            return children[0]
        return OvalNode(
            str(uuid.uuid4()),
            'operator',
            dict_of_definition['operator'],
            dict_of_definition['negate'],
            dict_of_definition['comment'],
            dict_of_definition['tag'],
            children,
        )

    def get_def_id_by_rule_id(self, rule_id):
        """Return the definition id referenced by the used rule *rule_id*.

        Raises:
            ValueError: if the rule is listed as not selected, or is not
                found among the used rules.
        """
        for rule in self.notselected_rules:
            if rule['id_rule'] == rule_id:
                raise ValueError(
                    'err- rule "{}" was not selected, so there are no results.'
                    .format(rule_id))
        for rule in self.used_rules:
            if rule['id_rule'] == rule_id:
                return rule['id_def']
        raise ValueError('err- 404 rule not found!')

    def get_rule_dict(self, rule_id):
        """Alias of ``parse_data_to_dict``."""
        return self.parse_data_to_dict(rule_id)

    def xml_dict_of_rule_to_node(self, rule):
        """Wrap a rule dictionary into an ``and`` operator node tagged "Rule".

        *rule* is a dictionary as returned by ``parse_data_to_dict``.
        """
        dict_of_definition = rule['definition']
        return OvalNode(
            rule['rule_id'],
            'operator',
            'and',
            False,
            dict_of_definition['comment'],
            "Rule",
            [self._xml_dict_to_node(dict_of_definition)],
        )

    def get_oval_tree(self, rule_id=None):
        """Return the ``OvalNode`` tree of the rule identified by *rule_id*."""
        return self.xml_dict_of_rule_to_node(self.parse_data_to_dict(rule_id))

    def build_graph(self, tree_data):
        """Build the raw dictionary of one definition result element.

        The result holds the ``definition_id`` under ``id``, one entry in
        ``node`` per child of *tree_data* and, when there is at least one
        child, ``negate`` taken from the last child.
        """
        graph = dict(
            id=tree_data.get('definition_id'),
            node=[],
        )
        for tree in tree_data:
            # The attribute has to be read with get(); a membership test on
            # an element looks at its children, so it never saw 'negate'.
            graph['negate'] = self._negate_of(tree)
            graph['node'].append(self._build_node(tree, "Definition"))
        return graph

    def _str_to_bool(self, s):
        """Convert ``'true'``/``'false'`` to a bool; raise ValueError otherwise."""
        if s == 'true':
            return True
        if s == 'false':
            return False
        raise ValueError('err- negation is not bool')

    def _negate_of(self, element):
        """Return the ``negate`` attribute of *element* as a bool.

        A missing attribute counts as ``False``.
        """
        raw = element.get('negate')
        return False if raw is None else self._str_to_bool(raw)

    def _build_node(self, tree, tag):
        """Recursively convert an operator element into a dictionary.

        Children with an ``operator`` attribute become nested "Criteria"
        nodes, children with ``definition_ref`` become "Extend definition"
        entries and all remaining children become "Test" entries.
        """
        node = dict(
            operator=tree.get('operator'),
            negate=self._negate_of(tree),
            result=tree.get('result'),
            comment=None,
            tag=tag,
            node=[],
        )
        for child in tree:
            if child.get('operator') is not None:
                node['node'].append(self._build_node(child, "Criteria"))
                continue

            negate_status = self._negate_of(child)
            if child.get('definition_ref') is not None:
                node['node'].append(
                    dict(
                        extend_definition=child.get('definition_ref'),
                        result=child.get('result'),
                        negate=negate_status,
                        comment=None,
                        tag="Extend definition",
                    ))
            else:
                node['node'].append(
                    dict(
                        value_id=child.get('test_ref'),
                        value=child.get('result'),
                        negate=negate_status,
                        comment=None,
                        tag="Test",
                    ))
        return node

    def _fill_extend_definition(self, scan):
        """Return a copy of *scan* with ``extend_definition`` entries expanded."""
        out = dict(definitions=[])
        for definition in scan['definitions']:
            nodes = [
                self._operator_as_child(value, scan)
                for value in definition['node']
            ]
            out['definitions'].append(
                dict(
                    id=definition['id'],
                    comment=definition['comment'],
                    node=nodes,
                ))
        return out

    def _operator_as_child(self, value, scan):
        """Copy the operator dictionary *value*, expanding extended definitions.

        Raises:
            ValueError: if a child is neither an operator, an
                ``extend_definition`` reference nor a test value.
        """
        out = dict(
            operator=value['operator'],
            negate=value['negate'],
            result=value['result'],
            comment=value['comment'],
            tag=value['tag'],
            node=[],
        )
        for child in value['node']:
            if 'operator' in child:
                out['node'].append(self._operator_as_child(child, scan))
            elif 'extend_definition' in child:
                out['node'].append(
                    self._find_definition_by_id(
                        scan,
                        child['extend_definition'],
                        child['negate'],
                        child['comment'],
                        child['tag'],
                    ))
            elif 'value_id' in child:
                out['node'].append(child)
            else:
                raise ValueError('error - unknown child')
        return out

    def _find_definition_by_id(self, scan, id, negate_status, comment, tag):
        """Return the expanded root of definition *id* for an extend reference.

        The returned dictionary carries *negate_status*, *comment* and *tag*
        of the referencing entry. Returns ``None`` when no definition in
        *scan* has the given id.
        """
        for definition in scan['definitions']:
            if definition['id'] == id:
                # Override on a shallow copy: writing into the shared root
                # would also relabel the referenced definition itself, making
                # its own output depend on processing order.
                root = dict(
                    definition['node'][0],
                    negate=negate_status,
                    comment=comment,
                    tag=tag,
                )
                return self._operator_as_child(root, scan)
        return None

    def create_dict_form_criteria(self, criteria, description):
        """Build the comment dictionary of a ``criteria`` element.

        A missing ``operator`` attribute defaults to ``'AND'`` and a missing
        ``comment`` attribute falls back to *description*. Nested criteria
        are processed recursively with ``None`` as description.
        """
        operator = criteria.get('operator')
        comment = criteria.get('comment')
        comments = dict(
            operator='AND' if operator is None else operator,
            comment=description if comment is None else comment,
            node=[],
        )
        for criterion in criteria:
            if criterion.get('operator'):
                comments['node'].append(
                    self.create_dict_form_criteria(criterion, None))
            elif criterion.get('definition_ref'):
                comments['node'].append(
                    dict(
                        extend_definition=criterion.get('definition_ref'),
                        comment=criterion.get('comment'),
                    ))
            else:
                comments['node'].append(
                    dict(
                        value_id=criterion.get('test_ref'),
                        comment=criterion.get('comment'),
                    ))
        return comments

    def _prepare_definition_comments(self):
        """Collect the comment dictionaries of all OVAL definitions.

        For each definition the title text becomes its ``comment`` and the
        description text is the fallback comment of its top-level criteria.
        """
        oval_definitions = self.root.find(
            './/arf:report-requests/arf:report-request/'
            'arf:content/scap:data-stream-collection/'
            'scap:component/oval-definitions:oval_definitions/'
            'oval-definitions:definitions', ns)
        definitions = []
        for definition in oval_definitions:
            comment_definition = dict(
                id=definition.get('id'), comment=None, node=[])
            title = definition.find(
                './/oval-definitions:metadata/oval-definitions:title', ns)
            description = definition.find(
                './/oval-definitions:metadata/oval-definitions:description', ns)
            comment_definition['comment'] = title.text
            criteria = definition.find('.//oval-definitions:criteria', ns)
            comment_definition['node'].append(
                self.create_dict_form_criteria(criteria, description.text))
            definitions.append(comment_definition)
        return definitions

    def recursive_help_fill_comments(self, comments, nodes):
        """Copy comments from *comments* into *nodes*, in place.

        Children are paired by position; operator children are processed
        recursively.
        """
        nodes['comment'] = comments['comment']
        for node, comment in zip(nodes['node'], comments['node']):
            node['comment'] = comment['comment']
            if 'operator' in node:
                self.recursive_help_fill_comments(comment, node)

    def fill_comment(self, comment_definition, data_definition):
        """Copy the comments of one definition into its data dictionary."""
        comments = comment_definition['node'][0]
        nodes = data_definition['node'][0]
        data_definition['comment'] = comment_definition['comment']
        self.recursive_help_fill_comments(comments, nodes)

    def insert_comments(self, data):
        """Fill comments into every definition of *data* that has a match.

        Definitions are matched to comment definitions by ``id``.
        """
        comment_definitions = self._prepare_definition_comments()
        for data_definition in data['definitions']:
            for comment_definition in comment_definitions:
                if comment_definition['id'] == data_definition['id']:
                    self.fill_comment(comment_definition, data_definition)
341