Completed
Pull Request — master (#6402)
by Matěj
103:34 queued 101:25
created

ssg.build_renumber.drop_oval_definitions()   A

Complexity

Conditions 2

Size

Total Lines 6
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
cc 2
eloc 6
nop 4
dl 0
loc 6
ccs 0
cts 6
cp 0
crap 6
rs 10
c 0
b 0
f 0
1
from __future__ import absolute_import
2
from __future__ import print_function
3
import sys
4
import collections
5
6
7
from .constants import oval_namespace, XCCDF11_NS, cce_uri, ocil_cs, ocil_namespace
8
from .constants import OVAL_TO_XCCDF_DATATYPE_CONSTRAINTS
9
from .parse_oval import resolve_definition, find_extending_defs, get_container_groups
10
from .xml import parse_file, map_elements_to_their_ids
11
12
13
from .checks import get_content_ref_if_exists_and_not_remote, is_cce_value_valid, is_cce_format_valid
14
from .utils import SSGError
15
from .xml import ElementTree as ET
16
# The OVAL namespace URI plays two roles in this module: it is the XML
# namespace of OVAL elements (oval_ns) and the value of the XCCDF <check>
# "system" attribute that marks a check as an OVAL check (oval_cs).
oval_ns = oval_cs = oval_namespace
18
19
20
class FileLinker(object):
    """
    Base class which represents the linking of checks to their identifiers.

    Subclasses are expected to set CHECK_SYSTEM and CHECK_NAMESPACE and to
    implement _get_checkid_string().
    """

    # Value of the check's "system" attribute that selects the checks a
    # subclass is responsible for.
    CHECK_SYSTEM = None
    # Namespace used to build the qualified element names of the check language.
    CHECK_NAMESPACE = None

    def __init__(self, translator, xccdftree, checks):
        """
        Remember the collaborators and work out the input and output file names.

        translator is used through its generate_id() method here; xccdftree
        is only stored; checks is an iterable of check elements that are
        filtered by their "system" attribute.
        """
        self.translator = translator
        self.checks_related_to_us = self._get_related_checks(checks)
        self.fname = self._get_input_fname()
        self.tree = None
        # _get_input_fname() returns None when no check of this check system
        # references a local file. Calling .replace() on None would raise
        # AttributeError, so there is no linked file name in that case either.
        if self.fname is None:
            self.linked_fname = None
        else:
            self.linked_fname = self.fname.replace("unlinked", "linked")
        self.xccdftree = xccdftree

    def _get_related_checks(self, checks):
        """
        Returns a list of checks which have the same check system as this
        class.
        """
        return [ch for ch in checks if ch.get("system") == self.CHECK_SYSTEM]

    def _get_fnames_from_related_checks(self):
        """
        Returns a set of filenames from non-remote check content href
        attributes.
        """
        checkfiles = set()
        for check in self.checks_related_to_us:
            # Include the file in the particular check system only if it's NOT
            # a remotely located file (to allow OVAL checks to reference http://
            # and https:// formatted URLs)
            checkcontentref = get_content_ref_if_exists_and_not_remote(check)
            if checkcontentref is not None:
                checkfiles.add(checkcontentref.get("href"))
        return checkfiles

    def _get_input_fname(self):
        """
        Returns the input filename referenced from the related checks, or None
        if the related checks don't reference any local file.

        Raises SSGError if there is more than one filename related to
        this check system.
        """
        fnames = self._get_fnames_from_related_checks()
        if len(fnames) > 1:
            msg = ("referencing more than one file per check system "
                   "is not yet supported by this script.")
            raise SSGError(msg)
        return fnames.pop() if fnames else None

    def save_linked_tree(self):
        """
        Write internal tree to the file in self.linked_fname.
        """
        assert self.tree is not None, \
            "There is no tree to save, you have probably skipped the linking phase"
        ET.ElementTree(self.tree).write(self.linked_fname)

    def _get_checkid_string(self):
        """
        Returns the qualified element name whose IDs get translated.
        Has to be implemented by subclasses.
        """
        raise NotImplementedError()

    def add_missing_check_exports(self, check, checkcontentref):
        """
        Hook called by link_xccdf() for every local check before its exports
        are collected. Does nothing in the base class.
        """
        pass

    def link_xccdf(self):
        """
        Rewrite the content references and exports of all related local checks
        so that they use translated IDs and point to the linked file.
        """
        for check in self.checks_related_to_us:
            checkcontentref = get_content_ref_if_exists_and_not_remote(check)
            if checkcontentref is None:
                continue

            self.add_missing_check_exports(check, checkcontentref)

            # Collected after the hook ran, so exports added by the hook are
            # renamed as well.
            checkexports = check.findall("./{%s}check-export" % XCCDF11_NS)

            self._link_xccdf_checkcontentref(checkcontentref, checkexports)

    def _link_xccdf_checkcontentref(self, checkcontentref, checkexports):
        """
        Replace the "name" and "href" attributes of checkcontentref and
        the "export-name" attribute of each element in checkexports with
        their linked counterparts.
        """
        checkid = self.translator.generate_id(
            self._get_checkid_string(), checkcontentref.get("name"))
        checkcontentref.set("name", checkid)
        checkcontentref.set("href", self.linked_fname)

        variable_str = "{%s}variable" % self.CHECK_NAMESPACE
        for checkexport in checkexports:
            newexportname = self.translator.generate_id(
                variable_str, checkexport.get("export-name"))
            checkexport.set("export-name", newexportname)
109
110
111
class OVALFileLinker(FileLinker):
    """
    Linker of the checks whose check system is OVAL.
    """

    CHECK_SYSTEM = oval_cs
    CHECK_NAMESPACE = oval_ns

    def __init__(self, translator, xccdftree, checks):
        super(OVALFileLinker, self).__init__(translator, xccdftree, checks)
        # Set by link(); get_nested_definitions() and
        # add_missing_check_exports() index it with the "definitions" key.
        self.oval_groups = None

    def _get_checkid_string(self):
        return "{%s}definition" % self.CHECK_NAMESPACE

    def link(self):
        """
        Parse the input OVAL file, cross-check it against the XCCDF tree and
        translate it.

        An SSGError raised while linking is re-raised as an SSGError whose
        message is prefixed with the name of the processed file.
        """
        self.oval_groups = get_container_groups(self.fname)
        self.tree = parse_file(self.fname)
        try:
            self._link_oval_tree()

            # Verify if CCE identifiers present in the XCCDF follow the required form
            # (either CCE-XXXX-X, or CCE-XXXXX-X). Drop from XCCDF those who don't follow it
            verify_correct_form_of_referenced_cce_identifiers(self.xccdftree)
        except SSGError as exc:
            raise SSGError(
                "Error processing {0}: {1}"
                .format(self.fname, str(exc)))
        self.tree = self.translator.translate(self.tree, store_defname=True)

    def _link_oval_tree(self):
        """
        Run the consistency checks and corrections between the OVAL tree and
        the XCCDF tree.

        Raises RuntimeError if some definition extends a definition that is
        not present in the OVAL tree.
        """
        xccdf_to_cce_id_mapping = create_xccdf_id_to_cce_id_mapping(self.xccdftree)

        indexed_oval_defs = map_elements_to_their_ids(
            self.tree, ".//{0}".format(self._get_checkid_string()))

        defs_miss = get_oval_checks_extending_non_existing_checks(self.tree, indexed_oval_defs)
        if defs_miss:
            msg = ["Following extending definitions are missing:"]
            # Report per missing definition, listing the IDs of definitions that need it.
            for missing, broken in transpose_dict_with_sets(defs_miss).items():
                broken = [b.get("id") for b in broken]
                msg.append("\t'{missing}' needed by: {broken}"
                           .format(missing=missing, broken=broken))
            raise RuntimeError("\n".join(msg))

        self._add_cce_id_refs_to_oval_checks(xccdf_to_cce_id_mapping)

        # Verify all by XCCDF referenced (local) OVAL checks are defined in OVAL file
        # If not drop the <check-content> OVAL checksystem reference from XCCDF
        self._ensure_by_xccdf_referenced_oval_def_is_defined_in_oval_file(
            indexed_oval_defs)

        check_and_correct_xccdf_to_oval_data_export_matching_constraints(self.xccdftree, self.tree)

    def _add_cce_id_refs_to_oval_checks(self, idmappingdict):
        """
        For each XCCDF rule ID having <ident> CCE set and
        having OVAL check implemented (remote OVAL isn't sufficient!)
        add a new <reference> element into the OVAL definition having the
        following form:

        <reference source="CCE" ref_id="CCE-ID" />

        where "CCE-ID" is the CCE identifier for that particular rule
        retrieved from the XCCDF file.

        idmappingdict maps rule IDs to CCE identifiers; OVAL definitions
        whose ID is not among its keys are left untouched.
        """
        ovalrules = self.tree.findall(".//{0}".format(self._get_checkid_string()))
        for rule in ovalrules:
            ovalid = rule.get("id")
            assert ovalid is not None, \
                "An OVAL rule doesn't have an ID"

            if ovalid not in idmappingdict:
                continue

            ovaldesc = rule.find(".//{%s}description" % self.CHECK_NAMESPACE)
            assert ovaldesc is not None, \
                "OVAL rule '{0}' doesn't have a description, which is mandatory".format(ovalid)

            xccdfcceid = idmappingdict[ovalid]
            if is_cce_format_valid(xccdfcceid) and is_cce_value_valid(xccdfcceid):
                # Append the <reference source="CCE" ref_id="CCE-ID" /> element
                # as the last child of the <metadata> element of the OVAL check
                ccerefelem = ET.Element('{%s}reference' % self.CHECK_NAMESPACE, ref_id=xccdfcceid,
                                        source="CCE")
                metadata = rule.find(".//{%s}metadata" % self.CHECK_NAMESPACE)
                metadata.append(ccerefelem)

    def get_nested_definitions(self, oval_def_id):
        """
        Returns the set of IDs consisting of oval_def_id and of everything
        that find_extending_defs() reports for it, transitively.

        IDs of definitions that can't be found are part of the result as well,
        a warning is printed to stderr for each of them.
        """
        processed_def_ids = set()
        queue = set([oval_def_id])
        while queue:
            def_id = queue.pop()
            processed_def_ids.add(def_id)
            definition_tree = self.oval_groups["definitions"].get(def_id)
            if definition_tree is None:
                print("WARNING: Definition '%s' was not found, can't figure "
                      "out what depends on it." % (def_id), file=sys.stderr)
                continue
            extensions = find_extending_defs(self.oval_groups, definition_tree)
            if not extensions:
                continue
            # Subtracting the processed IDs guarantees termination on cyclic references.
            queue |= extensions - processed_def_ids
        return processed_def_ids

    def add_missing_check_exports(self, check, checkcontentref):
        """
        Insert a <check-export> element into check for every variable
        that resolve_definition() reports for the referenced definition and
        for the definitions nested in it.

        Nothing is done if the content reference has no name or the named
        definition is not known.
        """
        check_name = checkcontentref.get("name")
        if check_name is None:
            return
        oval_def = self.oval_groups["definitions"].get(check_name)
        if oval_def is None:
            return
        all_vars = set()
        for def_id in self.get_nested_definitions(check_name):
            extended_def = self.oval_groups["definitions"].get(def_id)
            if extended_def is None:
                print("WARNING: Definition '%s' was not found, can't figure "
                      "out which variables it needs." % (def_id), file=sys.stderr)
                continue
            all_vars |= resolve_definition(self.oval_groups, extended_def)
        # Iteration order of a set is not stable across interpreter runs, so
        # sort the names to keep the generated XCCDF reproducible.
        for varname in sorted(all_vars):
            export = ET.Element("{%s}check-export" % XCCDF11_NS)
            export.attrib["export-name"] = varname
            export.attrib["value-id"] = varname
            check.insert(0, export)

    def _ensure_by_xccdf_referenced_oval_def_is_defined_in_oval_file(
            self, indexed_oval_defs):
        """
        Remove local OVAL <check> elements from those XCCDF rules whose ID is
        not among the keys of indexed_oval_defs.
        """
        # Ensure all OVAL checks referenced by XCCDF are implemented in OVAL file
        # Drop the reference from XCCDF to OVAL definition if:
        # * Particular OVAL definition isn't present in OVAL file,
        # * That OVAL definition doesn't constitute a remote OVAL
        #   (@href of <check-content-ref> doesn't start with 'http')

        for xccdfid, rule in rules_with_ids_generator(self.xccdftree):
            # Search OVAL ID in OVAL document
            ovalid = indexed_oval_defs.get(xccdfid)
            if ovalid is not None:
                # The OVAL check was found, we can continue
                continue

            for check in rule.findall(".//{%s}check" % (XCCDF11_NS)):
                if check.get("system") != oval_cs:
                    continue

                if get_content_ref_if_exists_and_not_remote(check) is None:
                    continue

                # For local OVAL drop the reference to OVAL definition from XCCDF document
                # in the case:
                # * OVAL definition is referenced from XCCDF file,
                # * But not defined in OVAL file
                # NOTE(review): findall() above also matches nested <check>
                # elements, remove() only accepts direct children of the rule.
                rule.remove(check)
260
261
262
class OCILFileLinker(FileLinker):
    """
    Linker of the checks whose check system is OCIL.
    """

    CHECK_SYSTEM = ocil_cs
    CHECK_NAMESPACE = ocil_namespace

    def _get_checkid_string(self):
        """
        Returns the qualified name of the OCIL element whose IDs are translated.
        """
        return "{%s}questionnaire" % self.CHECK_NAMESPACE

    def link(self):
        """
        Parse the input OCIL file and store its translated form in self.tree.
        """
        # The parsed tree is stored first, so it stays available in self.tree
        # even if the translation fails.
        self.tree = parse_file(self.fname)
        self.tree = self.translator.translate(self.tree, store_defname=True)
272
273
274
def _find_identcce(rule):
    """
    Returns the first direct <ident> child of the rule whose "system"
    attribute equals the CCE URI, or None if the rule has no such child.
    """
    cce_idents = (
        candidate
        for candidate in rule.findall("./{%s}ident" % XCCDF11_NS)
        if candidate.get("system") == cce_uri
    )
    return next(cce_idents, None)
279
280
281
def rules_with_ids_generator(xccdftree):
    """
    Yields (rule ID, rule element) pairs for all <Rule> elements found
    anywhere below xccdftree. Rules without the "id" attribute are skipped.
    """
    for rule_element in xccdftree.findall(".//{%s}Rule" % XCCDF11_NS):
        rule_id = rule_element.get("id")
        if rule_id is not None:
            yield rule_id, rule_element
288
289
290
def create_xccdf_id_to_cce_id_mapping(xccdftree):
    """
    Returns a dictionary having the form of

        'XCCDF ID' : 'CCE ID'

    with an entry for each XCCDF rule that has an ID and a CCE <ident>
    element (as located by _find_identcce). The value is the text of
    that <ident> element.
    """
    rules_and_idents = (
        (rule_id, _find_identcce(rule))
        for rule_id, rule in rules_with_ids_generator(xccdftree)
    )
    # Rules without a CCE <ident> don't get an entry.
    return {
        rule_id: ident.text
        for rule_id, ident in rules_and_idents
        if ident is not None
    }
308
309
310
def get_nonexisting_check_definition_extends(definition, indexed_oval_defs):
    """
    Returns the "definition_ref" of the first <extend_definition> below
    definition that has no entry in indexed_oval_defs, or None if every
    reference can be resolved.
    """
    # TODO: handle multiple levels of referrals.
    # OVAL checks that go beyond one level of extend_definition won't be properly identified
    referred_ids = (
        extension.get("definition_ref")
        for extension in definition.findall(".//{%s}extend_definition" % oval_ns)
    )
    # Keep only the references that no known OVAL definition satisfies.
    unresolved_ids = (
        referred_id
        for referred_id in referred_ids
        if indexed_oval_defs.get(referred_id) is None
    )
    return next(unresolved_ids, None)
324
325
326
def get_oval_checks_extending_non_existing_checks(ovaltree, indexed_oval_defs):
    """
    Returns a mapping of definition element -> set of definition IDs that the
    definition extends, but that are not present in indexed_oval_defs.

    Only definitions with an unresolved reference are present in the mapping,
    and at most one unresolved reference is recorded per definition, because
    get_nonexisting_check_definition_extends() reports only the first one.
    The mapping is empty if the tree contains no <definitions> element.
    """
    # Incomplete OVAL checks are as useful as non existing checks
    # Here we check if all extend_definition refs from a definition exists in local OVAL file
    definitions_misses = collections.defaultdict(set)
    definitions = ovaltree.find(".//{%s}definitions" % oval_ns)
    if definitions is None:
        # find() returns None when nothing matches; iterating None would
        # raise TypeError. No definitions means nothing can be missing.
        return definitions_misses

    for definition in definitions:
        nonexisting_ref = get_nonexisting_check_definition_extends(definition, indexed_oval_defs)
        if nonexisting_ref is not None:
            definitions_misses[definition].add(nonexisting_ref)

    return definitions_misses
337
338
339
def transpose_dict_with_sets(dict_in):
    """
    Invert a mapping X: key -> collection of values.

    The result Y is a defaultdict of sets such that b is in Y[a] exactly for
    those a, b for which a is in X[b]. Keys of X that map to an empty
    collection don't show up in Y at all.
    """
    transposed = collections.defaultdict(set)
    for source_key in dict_in:
        for member in dict_in[source_key]:
            transposed[member].add(source_key)
    return transposed
349
350
351
def drop_oval_definitions(ovaltree, defstoremove, oval_groups, indexed_oval_defs):
    """
    Remove each definition element in defstoremove from the <definitions>
    element of ovaltree, and delete the entry keyed by its ID from both
    oval_groups["definitions"] and indexed_oval_defs.

    Raises KeyError if either of the two mappings lacks the ID.
    """
    definitions_container = ovaltree.find(".//{%s}definitions" % oval_ns)
    for doomed in defstoremove:
        doomed_id = doomed.get("id")
        # Bookkeeping goes first, the element leaves the tree only if both
        # mappings knew about it.
        del oval_groups["definitions"][doomed_id]
        del indexed_oval_defs[doomed_id]
        definitions_container.remove(doomed)
357
358
359
def check_and_correct_xccdf_to_oval_data_export_matching_constraints(xccdftree, ovaltree):
    """
    Verify that the <xccdf:Value> 'type' to corresponding OVAL variable
    'datatype' export matching constraint:

    http://csrc.nist.gov/publications/nistpubs/800-126-rev2/SP800-126r2.pdf#page=30&zoom=auto,69,313

    is met. Also correct the 'type' attribute of those <xccdf:Value> elements where necessary
    in order for the produced content to meet this constraint.

    To correct the constraint we use the simpler approach - prefer to fix the
    'type' attribute of <xccdf:Value> rather than the 'datatype' attribute
    of the corresponding OVAL variable since there might be additional
    OVAL variables, derived from the affected OVAL variable, and in that
    case we would need to fix the 'datatype' attribute in each of them.

    The required 'type' is looked up in OVAL_TO_XCCDF_DATATYPE_CONSTRAINTS,
    the <xccdf:Value> 'type' to OVAL variable 'datatype' export matching
    constraints mapping as specified in Table 16 of XCCDF v1.2 standard:

    http://csrc.nist.gov/publications/nistpubs/800-126-rev2/SP800-126r2.pdf#page=30&zoom=auto,69,313

    External variables without a <xccdf:Value> of the same ID are skipped.

    Raises SSGError if an <external_variable> lacks 'id' or 'datatype', or if
    a matching <xccdf:Value> lacks 'type'.
    """
    indexed_xccdf_values = map_elements_to_their_ids(
        xccdftree, ".//{%s}Value" % (XCCDF11_NS))

    # Loop through all <external_variables> in the OVAL document
    # (findall() returns an empty list if there are none)
    for ovalextvar in ovaltree.findall(".//{%s}external_variable" % oval_ns):
        # Verify the found external variable has both 'id' and 'datatype' set
        if 'id' not in ovalextvar.attrib or 'datatype' not in ovalextvar.attrib:
            msg = "Invalid OVAL <external_variable> found - either without 'id' or 'datatype'."
            raise SSGError(msg)

        ovalvarid = ovalextvar.get('id')
        ovalvartype = ovalextvar.get('datatype')

        # Locate the corresponding <xccdf:Value> with the same ID in the XCCDF
        xccdfvar = indexed_xccdf_values.get(ovalvarid)

        if xccdfvar is None:
            # Nothing to compare this variable against, but the remaining
            # variables still have to be verified.
            continue

        xccdfvartype = xccdfvar.get('type')
        # Verify the found value has 'type' attribute set
        if xccdfvartype is None:
            # 'attrib' is a mapping, not a callable - read the ID with get()
            msg = (
                "Invalid XCCDF variable '{0}': Missing the 'type' attribute."
                .format(xccdfvar.get("id")))
            raise SSGError(msg)

        # This is the required XCCDF 'type' for <xccdf:Value> derived
        # from OVAL variable 'datatype' and mapping above
        reqxccdftype = OVAL_TO_XCCDF_DATATYPE_CONSTRAINTS[ovalvartype]
        # Compare the actual value of 'type' of <xccdf:Value> with the requirement
        if xccdfvartype != reqxccdftype:
            # If discrepancy is found, issue a warning
            sys.stderr.write(
                "Warning: XCCDF 'type' of \"%s\" value does "
                "not meet the XCCDF value 'type' to OVAL "
                "variable 'datatype' export matching "
                "constraint! Got: \"%s\", Expected: \"%s\". "
                "Resetting it! Set 'type' of \"%s\" "
                "<xccdf:value> to '%s' directly in the XCCDF "
                "content to dismiss this warning!\n" %
                (ovalvarid, xccdfvartype, reqxccdftype,
                 ovalvarid, reqxccdftype)
            )
            # And reset the 'type' attribute of such a <xccdf:Value> to the required type
            xccdfvar.attrib['type'] = reqxccdftype
430
431
432
def verify_correct_form_of_referenced_cce_identifiers(xccdftree):
    """
    Check the CCE identifier of every <Rule> below xccdftree with
    is_cce_format_valid().

    In SSG benchmarks, the CCEs that are not assigned yet have the form of
    e.g. "RHEL7-CCE-TBD" (or any other format possibly not matching the
    required one).

    On the first identifier in an invalid format, a warning is printed to
    stderr, the <ident> element is removed from its rule and the process
    exits with status 1.
    """
    xccdfrules = xccdftree.findall(".//{%s}Rule" % XCCDF11_NS)
    for rule in xccdfrules:
        identcce = _find_identcce(rule)
        if identcce is not None:
            cceid = identcce.text
            if not is_cce_format_valid(cceid):
                # 'file' is an argument of print(), not of str.format() - as
                # a format() argument it is ignored and the warning ends up
                # on stdout.
                print("Warning: CCE '{0}' is invalid for rule '{1}'. Removing CCE..."
                      .format(cceid, rule.get("id")), file=sys.stderr)
                rule.remove(identcce)
                sys.exit(1)
450
451
452
def assert_that_check_ids_match_rule_id(checks, xccdf_rule):
    """
    Go through the items of checks and compare the "name" of each of
    them with the rule ID xccdf_rule.

    An item passes if it has no name, or if its name is either the rule ID
    itself (the case of OVAL) or the rule ID followed by '_ocil' (the case
    of OCIL). Everything passes for the rule ID 'sample_rule'.

    Raises SSGError on the first item that doesn't pass.
    """
    row_template = " {0:>14}: {1}"
    for item in checks:
        item_name = item.get("name")
        if item_name is None or item_name == xccdf_rule:
            continue
        if item_name == xccdf_rule + '_ocil' or xccdf_rule == 'sample_rule':
            continue

        # The offending ID is reported as an OCIL one if it looks like it.
        id_kind = "OCIL ID" if '_ocil' in item_name else "OVAL ID"
        report = [
            "The OVAL / OCIL ID does not match the XCCDF Rule ID!",
            row_template.format(id_kind, item_name),
            row_template.format("XCCDF Rule ID", xccdf_rule),
        ]
        raise SSGError("\n".join(report))
469
470
471
def check_that_oval_and_rule_id_match(xccdftree):
    """
    For every rule with an ID, verify by means of
    assert_that_check_ids_match_rule_id() that the names used in its check
    match the ID of the rule.

    Only the first direct <check> child of a rule is examined, and what gets
    compared are the "name" attributes of the children of that <check>
    element. Rules without a <check> child are reported on stderr.
    """
    check_path = "./{%s}check" % XCCDF11_NS
    for rule_id, rule in rules_with_ids_generator(xccdftree):
        # find() gives a single element; the callee iterates over it, i.e.
        # over its child elements.
        first_check = rule.find(check_path)
        if first_check is not None:
            assert_that_check_ids_match_rule_id(first_check, rule_id)
            continue

        print("Rule {0} doesn't have checks."
              .format(rule_id), file=sys.stderr)
480