Passed
Push — master ( 6f2783...987229 )
by Matěj
01:45 queued 13s
created

XmlParser._fill_extend_definition()   A

Complexity

Conditions 3

Size

Total Lines 13
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 12
nop 2
dl 0
loc 13
ccs 8
cts 8
cp 1
crap 3
rs 9.8
c 0
b 0
f 0
1
'''
2
    Modules for create node IDs and parsing xml
3
'''
4 1
import uuid
5 1
import os
6
7 1
from lxml import etree as ET
8
9 1
from .oval_node import OvalNode
10
11 1
ns = {
12
    'XMLSchema': 'http://oval.mitre.org/XMLSchema/oval-results-5',
13
    'xccdf': 'http://checklists.nist.gov/xccdf/1.2',
14
    'arf': 'http://scap.nist.gov/schema/asset-reporting-format/1.1',
15
    'oval-definitions': 'http://oval.mitre.org/XMLSchema/oval-definitions-5',
16
    'scap': 'http://scap.nist.gov/schema/scap/source/1.2',
17
}
18
19
20 1
class XmlParser():
21 1
    def __init__(self, src):
22 1
        self.src = src
23 1
        self.tree = ET.parse(self.src)
24 1
        self.root = self.tree.getroot()
25 1
        if not self.validate(
26
                'schemas/arf/1.1/asset-reporting-format_1.1.0.xsd'):
27 1
            raise ValueError("err- This is not arf report file.")
28 1
        self.used_rules = self._get_used_rules()
29 1
        self.notselected_rules = self._get_notselected_rules()
30 1
        self.scan_definitions = self._get_scan()
31
32 1
    def get_src(self, src):
33 1
        _dir = os.path.dirname(os.path.realpath(__file__))
34 1
        FIXTURE_DIR = os.path.join(_dir, src)
35 1
        return str(FIXTURE_DIR)
36
37 1
    def validate(self, xsd_path):
38 1
        xsd_path = self.get_src(xsd_path)
39 1
        xmlschema_doc = ET.parse(xsd_path)
40 1
        xmlschema = ET.XMLSchema(xmlschema_doc)
41
42 1
        xml_doc = self.tree
43 1
        result = xmlschema.validate(xml_doc)
44
45 1
        return result
46
47 1
    def get_data(self, href):
48 1
        report_data = None
49 1
        reports = self.root.find('.//arf:reports', ns)
50 1
        for report in reports:
51 1
            if "#" + str(report.get("id")) == href:
52 1
                report_data = report
53
54 1
        trees_data = report_data.find(
55
            ('.//XMLSchema:oval_results/XMLSchema:results/'
56
             'XMLSchema:system/XMLSchema:definitions'), ns)
57 1
        return trees_data
58
59 1
    def _get_used_rules(self):
60 1
        rulesResults = self.root.findall(
61
            './/xccdf:TestResult/xccdf:rule-result', ns)
62 1
        rules = []
63 1
        for ruleResult in rulesResults:
64 1
            result = ruleResult.find('.//xccdf:result', ns)
65 1
            if result.text != "notselected":
66 1
                check_content_ref = ruleResult.find(
67
                    './/xccdf:check/xccdf:check-content-ref', ns)
68 1
                if check_content_ref is not None:
69 1
                    rules.append(dict(
70
                        id_rule=ruleResult.get('idref'),
71
                        id_def=check_content_ref.attrib.get('name'),
72
                        href=check_content_ref.attrib.get('href'),
73
                        result=result.text,
74
                    ))
75 1
        return rules
76
77 1
    def _get_notselected_rules(self):
78 1
        rulesResults = self.root.findall(
79
            './/xccdf:TestResult/xccdf:rule-result', ns)
80 1
        rules = []
81 1
        for ruleResult in rulesResults:
82 1
            result = ruleResult.find('.//xccdf:result', ns)
83 1
            if result.text == "notselected":
84 1
                rules.append(dict(id_rule=ruleResult.get('idref')))
85 1
        return rules
86
87 1
    def _get_scan(self):
88 1
        scan = dict(definitions=[])
89 1
        for i in self.get_data(self.used_rules[0]['href']):
90 1
            scan['definitions'].append(self.build_graph(i))
91 1
        self.insert_comments(scan)
92 1
        return self._fill_extend_definition(scan)
93
94 1
    def parse_data_to_dict(self, rule_id):
95 1
        for definition in self.scan_definitions['definitions']:
96 1
            if self.get_def_id_by_rule_id(rule_id) == definition['id']:
97 1
                return dict(rule_id=rule_id, definition=definition)
98
99 1
    def _xml_dict_to_node(self, dict_of_definition):
100 1
        children = []
101 1
        for child in dict_of_definition['node']:
102 1
            if 'operator' in child and 'id':
103 1
                children.append(self._xml_dict_to_node(child))
104
            else:
105 1
                children.append(
106
                    OvalNode(
107
                        child['value_id'],
108
                        'value',
109
                        child['value'],
110
                        child['negate'],
111
                        child['comment'],
112
                    ))
113
114 1
        if 'id' in dict_of_definition:
115 1
            children[0].node_id = dict_of_definition['id']
116 1
            return children[0]
117
        else:
118 1
            return OvalNode(
119
                str(uuid.uuid4()),
120
                'operator',
121
                dict_of_definition['operator'],
122
                dict_of_definition['negate'],
123
                dict_of_definition['comment'],
124
                children,
125
            )
126
127 1
    def get_def_id_by_rule_id(self, rule_id):
128 1
        for rule in self.notselected_rules:
129 1
            if rule['id_rule'] == rule_id:
130 1
                raise ValueError(
131
                    'err- rule "{}" was not selected, so there are no results.'
132
                    .format(rule_id))
133 1
        for rule in self.used_rules:
134 1
            if rule['id_rule'] == rule_id:
135 1
                return rule['id_def']
136 1
        raise ValueError('err- 404 rule not found!')
137
138 1
    def get_rule_dict(self, rule_id):
139 1
        return self.parse_data_to_dict(rule_id)
140
141 1
    def xml_dict_of_rule_to_node(self, rule):
142 1
        dict_of_definition = rule['definition']
143 1
        return OvalNode(
144
            rule['rule_id'],
145
            'operator',
146
            'and',
147
            False,
148
            dict_of_definition['comment'],
149
            [self._xml_dict_to_node(dict_of_definition)],
150
        )
151
152 1
    def get_oval_tree(self, rule_id=None):
153 1
        return self.xml_dict_of_rule_to_node(self.parse_data_to_dict(rule_id))
154
155 1
    def build_graph(self, tree_data):
156 1
        graph = dict(
157
            id=tree_data.get('definition_id'),
158
            node=[],
159
        )
160 1
        for tree in tree_data:
161 1
            negate_status = False
162 1
            if 'negate' in tree:
163
                negate_status = self._str_to_bool(tree.get('negate'))
164 1
            graph['negate'] = negate_status
165 1
            graph['node'].append(self._build_node(tree))
166 1
        return graph
167
168 1
    def _str_to_bool(self, s):
169 1
        if s == 'true':
170 1
            return True
171 1
        elif s == 'false':
172 1
            return False
173
        else:
174 1
            raise ValueError('err- negation is not bool')
175
176 1
    def _build_node(self, tree):
177 1
        negate_status = False
178 1
        if tree.get('negate') is not None:
179 1
            negate_status = self._str_to_bool(tree.get('negate'))
180
181 1
        node = dict(
182
            operator=tree.get('operator'),
183
            negate=negate_status,
184
            result=tree.get('result'),
185
            comment=None,
186
            node=[],
187
        )
188 1
        for child in tree:
189 1
            if child.get('operator') is not None:
190 1
                node['node'].append(self._build_node(child))
191
            else:
192 1
                negate_status = False
193 1
                if child.get('negate') is not None:
194 1
                    negate_status = self._str_to_bool(child.get('negate'))
195
196 1
                if child.get('definition_ref') is not None:
197 1
                    node['node'].append(
198
                        dict(
199
                            extend_definition=child.get('definition_ref'),
200
                            result=child.get('result'),
201
                            negate=negate_status,
202
                            comment=None,
203
                        ))
204
                else:
205 1
                    node['node'].append(
206
                        dict(
207
                            value_id=child.get('test_ref'),
208
                            value=child.get('result'),
209
                            negate=negate_status,
210
                            comment=None,
211
                        ))
212 1
        return node
213
214 1
    def _fill_extend_definition(self, scan):
215 1
        out = dict(definitions=[])
216 1
        for definition in scan['definitions']:
217 1
            nodes = []
218 1
            for value in definition['node']:
219 1
                nodes.append(self._operator_as_child(value, scan))
220 1
            out['definitions'].append(
221
                dict(
222
                    id=definition['id'],
223
                    comment=definition['comment'],
224
                    node=nodes,
225
                ))
226 1
        return out
227
228 1
    def _operator_as_child(self, value, scan):
229 1
        out = dict(
230
            operator=value['operator'],
231
            negate=value['negate'],
232
            result=value['result'],
233
            comment=value['comment'],
234
            node=[],
235
        )
236 1
        for child in value['node']:
237 1
            if 'operator' in child:
238 1
                out['node'].append(self._operator_as_child(child, scan))
239 1
            elif 'extend_definition' in child:
240 1
                out['node'].append(
241
                    self._find_definition_by_id(
242
                        scan,
243
                        child['extend_definition'],
244
                        child['negate'],
245
                        child['comment'],
246
                    ))
247 1
            elif 'value_id' in child:
248 1
                out['node'].append(child)
249
            else:
250
                raise ValueError('error - unknown child')
251 1
        return out
252
253 1
    def _find_definition_by_id(self, scan, id, negate_status, comment):
254 1
        for definition in scan['definitions']:
255 1
            if definition['id'] == id:
256 1
                definition['node'][0]['negate'] = negate_status
257 1
                definition['node'][0]['comment'] = comment
258 1
                return self._operator_as_child(definition['node'][0], scan)
259
260 1
    def create_dict_form_criteria(self, criteria):
261 1
        comments = dict(
262
            operator='AND' if criteria.get('operator') is None else criteria.get('operator'),
263
            comment=criteria.get('comment'),
264
            node=[],
265
        )
266 1
        for criterion in criteria:
267 1
            if criterion.get('operator'):
268 1
                comments['node'].append(
269
                    self.create_dict_form_criteria(criterion))
270
            else:
271 1
                if criterion.get('definition_ref'):
272 1
                    comments['node'].append(
273
                        dict(
274
                            extend_definition=criterion.get('definition_ref'),
275
                            comment=criterion.get('comment'),
276
                        ))
277
                else:
278 1
                    comments['node'].append(
279
                        dict(
280
                            value_id=criterion.get('test_ref'),
281
                            comment=criterion.get('comment'),
282
                        ))
283 1
        return comments
284
285 1
    def _prepare_definition_comments(self):
286 1
        oval_definitions = self.root.find(
287
            './/arf:report-requests/arf:report-request/'
288
            'arf:content/scap:data-stream-collection/'
289
            'scap:component/oval-definitions:oval_definitions/'
290
            'oval-definitions:definitions', ns)
291 1
        definitions = []
292 1
        for definition in oval_definitions:
293 1
            comment_definition = dict(
294
                id=definition.get('id'), comment=None, node=[])
295 1
            title = definition.find(
296
                './/oval-definitions:metadata/oval-definitions:title', ns)
297 1
            comment_definition['comment'] = title.text
298 1
            criteria = definition.find('.//oval-definitions:criteria', ns)
299 1
            comment_definition['node'].append(
300
                self.create_dict_form_criteria(criteria))
301 1
            definitions.append(comment_definition)
302 1
        return definitions
303
304 1
    def recursive_help_fill_comments(self, comments, nodes):
305 1
        out = nodes
306 1
        out['comment'] = comments['comment']
307 1
        for node, comment in zip(out['node'], comments['node']):
308 1
            node['comment'] = comment['comment']
309 1
            if 'operator' in node:
310 1
                self.recursive_help_fill_comments(comment, node)
311
312 1
    def fill_comment(self, comment_definition, data_definition):
313 1
        comments = comment_definition['node'][0]
314 1
        nodes = data_definition['node'][0]
315 1
        data_definition['comment'] = comment_definition['comment']
316 1
        self.recursive_help_fill_comments(comments, nodes)
317
318 1
    def insert_comments(self, data):
319 1
        comment_definitions = self._prepare_definition_comments()
320 1
        for data_definition in data['definitions']:
321 1
            for comment_definition in comment_definitions:
322 1
                if comment_definition['id'] == data_definition['id']:
323
                    self.fill_comment(comment_definition, data_definition)
324