ssg.xml.XMLCPEPlatform.__init__()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 2
dl 0
loc 2
ccs 0
cts 2
cp 0
crap 2
rs 10
c 0
b 0
f 0
1
from __future__ import absolute_import
2
from __future__ import print_function
3
import collections
4
5
import platform
6
import re
7
import xml.etree.ElementTree as ET
8
9
from .constants import (
10
    xml_version, oval_header, timestamp, PREFIX_TO_NS, XCCDF11_NS, XCCDF12_NS)
11
from .constants import (
12
    datastream_namespace,
13
    oval_namespace,
14
    stig_ns,
15
    cat_namespace,
16
    xlink_namespace,
17
    ocil_namespace,
18
    cpe_language_namespace,
19
)
20
21
22
try:
23
    from xml.etree import cElementTree as ElementTree
24
except ImportError:
25
    from xml.etree import ElementTree as ElementTree
26
27
28
def oval_generated_header(product_name, schema_version, ssg_version):
    """
    Build the leading part of an OVAL document: the XML declaration, the OVAL
    header and a filled-in ``<generator>`` element.

    The generator element records the product name, the ssg version together
    with the running Python version, the OVAL schema version and the module
    level ``timestamp`` constant.

    Returns the assembled text as a string.
    """
    generator_values = (
        product_name,
        ssg_version,
        platform.python_version(),
        schema_version,
        timestamp,
    )
    generator_block = """
    <generator>
        <oval:product_name>%s from SCAP Security Guide</oval:product_name>
        <oval:product_version>ssg: %s, python: %s</oval:product_version>
        <oval:schema_version>%s</oval:schema_version>
        <oval:timestamp>%s</oval:timestamp>
    </generator>""" % generator_values
    return xml_version + oval_header + generator_block
38
39
40
def register_namespaces(ns=None):
    """
    Register namespace prefixes with ElementTree.

    ``ns`` is a mapping of prefix -> namespace URI; when it is None the
    module-wide PREFIX_TO_NS mapping is used instead.

    Registration is best-effort: any exception raised along the way is
    ignored, and prefixes registered before the failure stay registered.
    """
    try:
        namespaces = PREFIX_TO_NS if ns is None else ns
        for prefix, uri in namespaces.items():
            ElementTree.register_namespace(prefix, uri)
    except Exception:
        # Probably an old version of Python
        # Doesn't matter, as this is non-essential.
        return
53
54
55
def get_namespaces_from(file):
    """
    Return dictionary of namespaces in file. Return empty dictionary in case of error.

    ``file`` is passed straight to ``ElementTree.iterparse``. The result maps
    each declared prefix to its URI.

    Only ``Exception`` subclasses (parse errors, I/O errors, ...) are treated
    as "error"; ``KeyboardInterrupt`` and ``SystemExit`` propagate to the
    caller. The earlier ``return`` inside ``finally`` silently discarded those
    as well.
    """
    try:
        return {
            prefix: uri
            for _, (prefix, uri) in ElementTree.iterparse(file, events=["start-ns"])
        }
    except Exception:
        # Namespace discovery is non-essential; report "nothing found" instead
        # of failing the caller.
        return {}
71
72
73
def open_xml(filename):
    """
    Parse ``filename`` into an ElementTree.

    The default namespace prefixes are registered (best-effort) before the
    file is parsed. Returns the tree produced by ``ElementTree.parse``.
    """
    register_namespaces()
    tree = ElementTree.parse(filename)
    return tree
79
80
81
def parse_file(filename):
    """
    Parse ``filename`` with open_xml() and return the root element of the
    resulting tree.
    """
    return open_xml(filename).getroot()
87
88
89
def map_elements_to_their_ids(tree, xpath_expr):
    """
    Build an id -> element dictionary out of the elements of ``tree`` that
    match the XPath expression ``xpath_expr``.

    When several matching elements share an ``id``, the last one in match
    order wins.

    Raises AssertionError if a matching element doesn't have the ``id``
    attribute.

    Returns mapping as a dictionary
    """
    matches = tree.findall(xpath_expr)
    ids = [matched.get("id") for matched in matches]
    assert all(element_id is not None for element_id in ids)
    return dict(zip(ids, matches))
105
106
107
# Names of the XHTML elements that add_xhtml_namespace() prefixes with "xhtml:".
SSG_XHTML_TAGS = [
    'table', 'tr', 'th', 'td', 'ul', 'li', 'ol',
    'p', 'code', 'strong', 'b', 'em', 'i', 'pre', 'br', 'hr', 'small',
]


def add_xhtml_namespace(data):
    """
    Given a xml blob, adds the xhtml namespace to all relevant tags.

    ``<tt>`` elements are first turned into ``<code>``. Afterwards every
    opening, closing or self-closing tag whose name is listed in
    SSG_XHTML_TAGS gets the ``xhtml:`` prefix; attributes are kept as they are.

    The tag name has to be complete, i.e. followed by whitespace, ``/`` or
    ``>``. Without that check the short names ``p``, ``b`` and ``i`` also
    matched unrelated elements such as ``<param>``, ``<body>`` or ``<ident>``.

    Returns the transformed string.
    """
    # The use of lambda in the lines below is a workaround for https://bugs.python.org/issue1519638
    # I decided for this approach to avoid adding workarounds in the matching regex, this way only
    # the substituted part contains the workaround.
    # Transform <tt> in <code>
    data = re.sub(r'<(\/)?tt(\/)?>',
                  lambda m: r'<' + (m.group(1) or '') + 'code' + (m.group(2) or '') + '>', data)
    # Adds xhtml prefix to elements: <tag>, </tag>, <tag/>
    # The lookahead (?=[\s/>]) makes sure the alternation matched a whole tag name.
    return re.sub(r'<(\/)?((?:%s)(?=[\s/>]).*?)(\/)?>' % "|".join(SSG_XHTML_TAGS),
                  lambda m: r'<' + (m.group(1) or '') + 'xhtml:' +
                  (m.group(2) or '') + (m.group(3) or '') + '>',
                  data)
128
129
130
def determine_xccdf_tree_namespace(tree):
    """
    Return the XCCDF namespace (XCCDF11_NS or XCCDF12_NS) of the Benchmark
    element that is the root of ``tree``.

    Raises ValueError if the root is not a Benchmark in either namespace.
    """
    root_tag = tree.getroot().tag
    for candidate_ns in (XCCDF11_NS, XCCDF12_NS):
        if root_tag == "{%s}Benchmark" % candidate_ns:
            return candidate_ns
    raise ValueError("Unknown root element '%s'" % root_tag)
139
140
141
def get_element_tag_without_ns(xml_tag):
    """
    Strip the ``{namespace}`` prefix from an ElementTree tag name.

    A tag that carries no namespace prefix is returned unchanged; previously
    such a tag raised AttributeError because the regex did not match.
    """
    match = re.search(r'^{.*}(.*)', xml_tag)
    if match is None:
        return xml_tag
    return match.group(1)
143
144
145
def get_element_namespace(self):
    """
    Return the namespace URI found between the braces of ``self.root.tag``.

    NOTE(review): this is a module-level function even though its parameter
    is called ``self``; it works on any object exposing ``root.tag``.
    """
    namespace_match = re.match(r'^{(.*)}.*', self.root.tag)
    return namespace_match.group(1)
147
148
149
class XMLElement(object):
    '''
    Thin wrapper around a generic element read from an XML file.

    The wrapped element is kept in ``self.root``. On construction the XCCDF
    flavour is recorded in ``self.content_xccdf_ns`` as one of the prefixes
    "xccdf-1.1" or "xccdf-1.2".
    '''
    # Prefix -> namespace URI mapping handed to ElementTree find*() calls.
    ns = {
        "ds": datastream_namespace,
        "xccdf-1.1": XCCDF11_NS,
        "xccdf-1.2": XCCDF12_NS,
        "oval": oval_namespace,
        "catalog": cat_namespace,
        "xlink": xlink_namespace,
        "ocil": ocil_namespace,
        "cpe-lang": cpe_language_namespace,
    }

    def __init__(self, root):
        self.root = root
        self._determine_xccdf_version()

    def get_attr(self, attr):
        """Return the value of attribute ``attr`` of the wrapped element."""
        value = self.root.get(attr)
        return value

    def get_namespace(self):
        """Return the namespace URI embedded in the wrapped element's tag."""
        namespace_match = re.match(r'^{(.*)}.*', self.root.tag)
        return namespace_match.group(1)

    def _determine_xccdf_version(self):
        """
        Set ``self.content_xccdf_ns`` to "xccdf-1.1" when the wrapped element
        lives in the XCCDF 1.1 namespace and to "xccdf-1.2" in every other case.
        """
        is_xccdf_11 = self.get_namespace() == self.ns["xccdf-1.1"]
        self.content_xccdf_ns = "xccdf-1.1" if is_xccdf_11 else "xccdf-1.2"
179
180
181
class XMLContent(XMLElement):
    '''
    Can represent a Data Stream or an XCCDF Benchmark read from an XML file.

    On construction three lookups are prepared:
    ``component_refs`` (xlink href -> component-ref id),
    ``uris`` (catalog uri -> catalog name) and
    ``components`` (check engine id -> {catalog name -> XMLComponent}).
    '''

    # (check engine id, tag of the document element to look for in a component)
    check_engines = [("OVAL", "oval:oval_definitions"), ("OCIL", "ocil:ocil")]

    def __init__(self, root):
        super(XMLContent, self).__init__(root)
        self.component_refs = self.get_component_refs()
        self.uris = self.get_uris()
        self.components = self._find_all_component_contents()

    def get_component_refs(self):
        """
        Map the xlink href of every ``ds:component-ref`` found under
        ``ds:checks`` to the id of that component-ref.

        Data streams without a ``ds:checks`` element are skipped.
        """
        component_refs = dict()
        href_attr = "{%s}href" % (self.ns["xlink"])
        for ds in self.root.findall("ds:data-stream", self.ns):
            checks = ds.find("ds:checks", self.ns)
            if checks is None:
                # Nothing to collect from this data stream.
                continue
            for component_ref in checks.findall("ds:component-ref", self.ns):
                component_refs[component_ref.get(href_attr)] = component_ref.get("id")
        return component_refs

    def get_uris(self):
        """
        Map the ``uri`` attribute of each ``catalog:uri`` entry to its ``name``.

        For every data stream only the first catalog found below
        ``ds:checklists`` is read. Data streams lacking ``ds:checklists`` or a
        catalog are skipped.
        """
        uris = dict()
        for ds in self.root.findall("ds:data-stream", self.ns):
            checklists = ds.find("ds:checklists", self.ns)
            if checklists is None:
                continue
            catalog = checklists.find(".//catalog:catalog", self.ns)
            if catalog is None:
                continue
            for uri in catalog.findall("catalog:uri", self.ns):
                uris[uri.get("uri")] = uri.get("name")
        return uris

    def is_benchmark(self):
        """
        Return True if the wrapped element itself is an XCCDF Benchmark
        (1.1 or 1.2), False otherwise.

        As a side effect ``content_xccdf_ns`` is set to "xccdf-1.1" when an
        XCCDF 1.1 Benchmark is recognized.
        """
        if self.root.tag == "{%s}Benchmark" % (self.ns["xccdf-1.2"]):
            return True
        if self.root.tag == "{%s}Benchmark" % (self.ns["xccdf-1.1"]):
            self.content_xccdf_ns = "xccdf-1.1"
            return True
        return False

    def get_benchmarks(self):
        """
        Yield an XMLBenchmark for each Benchmark of the content.

        If there is no ``ds:component`` child, the content itself is yielded
        provided it is a Benchmark.
        """
        ds_components = self.root.findall("ds:component", self.ns)
        if not ds_components:
            # The content is not a DS, maybe it is just an XCCDF Benchmark
            if self.is_benchmark():
                yield XMLBenchmark(self.root)
        for component in ds_components:
            for benchmark in component.findall("%s:Benchmark" % self.content_xccdf_ns, self.ns):
                yield XMLBenchmark(benchmark)

    def find_benchmark(self, id_):
        """
        Return the XMLBenchmark whose id is ``id_``, or None if there is none.

        If there is no ``ds:component`` child and the content itself is a
        Benchmark, that Benchmark is returned without comparing its id.
        """
        ds_components = self.root.findall("ds:component", self.ns)
        if not ds_components:
            # The content is not a DS, maybe it is just an XCCDF Benchmark
            if self.is_benchmark():
                return XMLBenchmark(self.root)
        for component in ds_components:
            benchmark = component.find("%s:Benchmark[@id='%s']"
                                       % (self.content_xccdf_ns, id_), self.ns)
            if benchmark is not None:
                return XMLBenchmark(benchmark)
        return None

    def _find_all_component_contents(self):
        """
        Collect the check documents (see ``check_engines``) of all components.

        Returns a defaultdict: check engine id -> {catalog name -> XMLComponent}.
        Components that cannot be resolved through ``component_refs`` and
        ``uris`` are left out.
        """
        component_doc_dict = collections.defaultdict(dict)
        for component in self.root.findall("ds:component", self.ns):
            for check_id, check_tag in self.check_engines:
                def_doc = component.find(check_tag, self.ns)
                if def_doc is None:
                    continue
                comp_id = component.get("id")
                comp_href = "#" + comp_id
                try:
                    filename = self.uris["#" + self.component_refs[comp_href]]
                except KeyError:
                    continue
                xml_component = XMLComponent(def_doc)
                component_doc_dict[check_id][filename] = xml_component
        return component_doc_dict
261
262
263
class XMLBenchmark(XMLElement):
    '''
    Represents an XCCDF Benchmark read from an XML file.
    '''

    def __init__(self, root):
        # XMLElement.__init__ stores ``root`` in ``self.root``.
        super(XMLBenchmark, self).__init__(root)

    def find_rules(self, rule_id):
        """
        Return a list of XMLRule objects.

        With a truthy ``rule_id`` only the rules with that id are returned and
        ValueError is raised if there is none. With a falsy ``rule_id`` all
        rules of the benchmark are returned (possibly an empty list).
        """
        if rule_id:
            rules = [XMLRule(r) for r in self.root.iterfind(
                ".//%s:Rule[@id='%s']" % (self.content_xccdf_ns, rule_id), self.ns)]
            if not rules:
                raise ValueError("Can't find rule %s" % (rule_id))
        else:
            rules = [XMLRule(r) for r in self.root.iterfind(
                ".//%s:Rule" % (self.content_xccdf_ns), self.ns)]
        return rules

    def find_rule(self, rule_id):
        """
        Return the first rule with id ``rule_id`` as XMLRule, or None if the
        benchmark has no such rule.
        """
        rule = self.root.find(
            ".//%s:Rule[@id='%s']" % (self.content_xccdf_ns, rule_id), self.ns)
        # Compare with None explicitly: an Element without children is falsy,
        # so a truthiness test would report an existing, childless Rule as
        # missing.
        if rule is None:
            return None
        return XMLRule(rule)

    def find_all_cpe_platforms(self, idref):
        """
        Return a list of XMLCPEPlatform objects for the ``cpe-lang:platform``
        elements whose id equals ``idref`` with every "#" removed.
        """
        platform_id = idref.replace("#", "")
        cpes = [XMLCPEPlatform(p) for p in self.root.iterfind(
            ".//cpe-lang:platform[@id='{0}']".format(platform_id), self.ns)]
        return cpes
292
293
294
class XMLRule(XMLElement):
    '''
    Represents an XCCDF Rule read from an XML file.
    '''

    def __init__(self, root):
        # XMLElement.__init__ stores ``root`` in ``self.root``.
        super(XMLRule, self).__init__(root)

    def get_check_element(self, check_system_uri):
        """Return the direct ``check`` child with the given ``system``, or None."""
        return self.root.find(
            "%s:check[@system='%s']" % (self.content_xccdf_ns, check_system_uri), self.ns)

    def get_check_content_ref_element(self, check_element):
        """Return the ``check-content-ref`` child of ``check_element``, or None."""
        return check_element.find(
            "%s:check-content-ref" % (self.content_xccdf_ns), self.ns)

    def get_fix_element(self, fix_uri):
        """Return the direct ``fix`` child with the given ``system``, or None."""
        return self.root.find("%s:fix[@system='%s']" % (self.content_xccdf_ns, fix_uri), self.ns)

    def get_version_element(self):
        """Return the direct ``version`` child of the rule, or None."""
        return self.root.find("%s:version" % (self.content_xccdf_ns), self.ns)

    def get_all_platform_elements(self):
        """Return the list of all ``platform`` descendants of the rule."""
        return self.root.findall(".//%s:platform" % (self.content_xccdf_ns), self.ns)

    def _get_description_text(self, el):
        """
        Recursively collect the text of ``el``: its own text, the text of its
        descendants and its tail.

        A ``sub`` element contributes the quoted value of its ``idref``
        attribute (KeyError if the attribute is missing).
        """
        parts = [el.text or ""]
        # If a 'sub' element is found, lets replace it with the id of the variable it references
        if get_element_tag_without_ns(el.tag) == "sub":
            parts.append("'%s'" % el.attrib['idref'])
        parts.extend(self._get_description_text(desc_el) for desc_el in el)
        parts.append(el.tail or "")
        return "".join(parts)

    def get_element_text(self, el):
        """
        Return the text content of ``el``. ``description`` elements get the
        special handling of _get_description_text(); all others are flattened
        with itertext().
        """
        if get_element_tag_without_ns(el.tag) == "description":
            return self._get_description_text(el)
        return "".join(el.itertext())

    def join_text_elements(self):
        """
        This function collects the text of almost all subelements.
        Similar to what itertext() would do, except that this function skips some elements that
        are not relevant for comparison.

        This function also injects a line for each element whose text was collected, to
        facilitate tracking of where in the rule the text came from.
        """
        chunks = []
        for el in self.root:
            el_tag = get_element_tag_without_ns(el.tag)
            if el_tag == "fix":
                # We ignore the fix element because it has its own dedicated differ
                continue
            # The comparison has to happen on the attribute value. The former
            # el.get("href" == stig_ns) looked up the attribute named ``False``
            # and therefore never skipped anything.
            if el_tag == "reference" and el.get("href") == stig_ns:
                # We ignore references to DISA Benchmark Rules,
                # they have a format of SV-\d+r\d+_rule
                # and can change for non-text related changes
                continue
            el_text = self.get_element_text(el).strip()
            if el_text:
                chunks.append("\n[%s]:\n%s\n" % (el_tag, el_text))

        return "".join(chunks)
364
365
366
class XMLComponent(XMLElement):
    '''
    Represents the element of the Data stream component that has relevant content.

    This make it easier to access contents pertaining to a SCAP component.

    The ``find_*`` methods return None when the requested element (or the
    container it is expected in) does not exist. Previously they tried to
    wrap None and failed with AttributeError, which made the "doesn't exist"
    checks of find_boolean_question() unreachable.
    '''
    def __init__(self, root):
        super(XMLComponent, self).__init__(root)

    def _find_in_container(self, container_path, item_path, wrapper):
        """
        Look up ``item_path`` inside the ``container_path`` child of the
        wrapped element and return it wrapped by ``wrapper``; return None if
        either lookup finds nothing.
        """
        container = self.root.find(container_path, self.ns)
        if container is None:
            return None
        item = container.find(item_path, self.ns)
        if item is None:
            return None
        return wrapper(item)

    def find_oval_definition(self, def_id):
        """Return the OVAL definition ``def_id`` as XMLOvalDefinition, or None."""
        return self._find_in_container(
            "oval:definitions", "oval:definition[@id='%s']" % (def_id),
            XMLOvalDefinition)

    def find_ocil_questionnaire(self, def_id):
        """Return the OCIL questionnaire ``def_id`` as XMLOcilQuestionnaire, or None."""
        return self._find_in_container(
            "ocil:questionnaires", "ocil:questionnaire[@id='%s']" % def_id,
            XMLOcilQuestionnaire)

    def find_ocil_test_action(self, test_action_ref):
        """
        Return the OCIL boolean_question_test_action ``test_action_ref`` as
        XMLOcilTestAction, or None.
        """
        return self._find_in_container(
            "ocil:test_actions",
            "ocil:boolean_question_test_action[@id='%s']" % test_action_ref,
            XMLOcilTestAction)

    def find_ocil_boolean_question(self, question_id):
        """Return the OCIL boolean_question ``question_id`` as XMLOcilQuestion, or None."""
        return self._find_in_container(
            "ocil:questions", "ocil:boolean_question[@id='%s']" % question_id,
            XMLOcilQuestion)

    def find_boolean_question(self, ocil_id):
        """
        Follow questionnaire ``ocil_id`` -> test action -> boolean question and
        return the text of that question.

        Raises ValueError if the questionnaire, the test action or the
        question doesn't exist.
        """
        questionnaire = self.find_ocil_questionnaire(ocil_id)
        if questionnaire is None:
            raise ValueError("OCIL questionnaire %s doesn't exist" % ocil_id)
        test_action_ref = questionnaire.get_test_action_ref_element().text
        test_action = self.find_ocil_test_action(test_action_ref)
        if test_action is None:
            raise ValueError(
                "OCIL boolean_question_test_action %s doesn't exist" % (
                    test_action_ref))
        question_id = test_action.get_attr("question_ref")
        question = self.find_ocil_boolean_question(question_id)
        if question is None:
            raise ValueError(
                "OCIL boolean_question %s doesn't exist" % question_id)
        question_text = question.get_question_test_element()
        return question_text.text
415
416
417
class XMLOvalDefinition(XMLComponent):
    '''
    Wraps a single OVAL definition element.
    '''
    def __init__(self, root):
        super(XMLOvalDefinition, self).__init__(root)

    def get_criteria_element(self):
        """Return the direct ``oval:criteria`` child of the definition, or None."""
        criteria = self.root.find("oval:criteria", self.ns)
        return criteria

    def get_elements(self):
        """
        Flatten the criteria tree into a list of ``(kind, value)`` tuples in
        iteration order:

        - ("criteria", <operator attribute>)
        - ("criterion", <test_ref attribute>)
        - ("extend_definition", <definition_ref attribute>)

        Elements with any other tag are ignored.
        """
        # Tag name -> attribute carrying the value of interest.
        attribute_by_tag = {
            "criteria": "operator",
            "criterion": "test_ref",
            "extend_definition": "definition_ref",
        }
        collected = []
        # iter() walks the criteria element itself and all of its descendants
        for node in self.get_criteria_element().iter():
            tag = get_element_tag_without_ns(node.tag)
            attribute = attribute_by_tag.get(tag)
            if attribute is not None:
                collected.append((tag, node.get(attribute)))
        return collected
439
440
441
class XMLOcilQuestionnaire(XMLComponent):
    '''
    Wraps a single OCIL questionnaire element.
    '''
    def __init__(self, root):
        super(XMLOcilQuestionnaire, self).__init__(root)

    def get_test_action_ref_element(self):
        """
        Return the first ``test_action_ref`` found under the ``actions`` child
        of the questionnaire, or None if there is none.
        """
        test_action_ref = self.root.find(
            "ocil:actions/ocil:test_action_ref", self.ns)
        return test_action_ref
448
449
450
class XMLOcilTestAction(XMLComponent):
    '''
    Wraps a single OCIL test action element; adds nothing on top of
    XMLComponent.
    '''
    def __init__(self, root):
        super(XMLOcilTestAction, self).__init__(root)
453
454
455
class XMLOcilQuestion(XMLComponent):
    '''
    Wraps a single OCIL question element.
    '''
    def __init__(self, root):
        super(XMLOcilQuestion, self).__init__(root)

    def get_question_test_element(self):
        """Return the direct ``ocil:question_text`` child, or None if absent."""
        question_text = self.root.find("ocil:question_text", self.ns)
        return question_text
461
462
463
class XMLCPEPlatform(XMLElement):
    '''
    Wraps a single ``cpe-lang:platform`` element.
    '''
    def __init__(self, root):
        super(XMLCPEPlatform, self).__init__(root)

    def find_all_check_fact_ref_elements(self):
        """Return the list of all ``cpe-lang:check-fact-ref`` descendants."""
        check_fact_refs = self.root.findall(".//cpe-lang:check-fact-ref", self.ns)
        return check_fact_refs
469