ssg.build_ovals.OVALBuilder._process_oval_file()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 12
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 0
Metric Value
cc 4
eloc 12
nop 4
dl 0
loc 12
ccs 0
cts 11
cp 0
crap 20
rs 9.8
c 0
b 0
f 0
1
from __future__ import absolute_import
2
from __future__ import print_function
3
4
import os
5
import os.path
6
import sys
7
import re
8
from copy import deepcopy
9
import collections
10
11
from .build_yaml import Rule, DocumentationNotComplete
12
from .constants import oval_namespace as oval_ns
13
from .constants import oval_footer
14
from .constants import oval_header
15
from .constants import MULTI_PLATFORM_LIST
16
from .id_translate import IDTranslator
17
from .jinja import process_file_with_macros
18
from .rule_yaml import parse_prodtype
19
from .rules import get_rule_dir_id, get_rule_dir_ovals, find_rule_dirs_in_paths
20
from . import utils, products
21
from .utils import mkdir_p
22
from .xml import ElementTree, oval_generated_header
23
24
25
def _create_subtree(shorthand_tree, category):
    """Gather all shorthand entities of one category under a new parent.

    A ``<{category}s>`` element is created in the OVAL namespace and every
    child of a ``def-group`` found in ``shorthand_tree`` whose tag ends with
    ``category`` is added to it. Comment nodes are skipped. Children are
    added through ``append()``, so that helper decides what happens with
    repeated IDs.

    Returns the newly created parent element (possibly without children).
    """
    container = ElementTree.Element("{%s}%ss" % (oval_ns, category))
    group_children_xpath = ".//{%s}def-group/*" % oval_ns
    for entity in shorthand_tree.findall(group_children_xpath):
        # The tag of a comment node is not a string, so it has to be ruled
        # out before calling endswith() on it.
        is_comment = entity.tag is ElementTree.Comment
        if not is_comment and entity.tag.endswith(category):
            append(container, entity)
    return container
34
35
36
def expand_shorthand(shorthand_path, oval_path, env_yaml):
    """Turn a shorthand OVAL file into a standalone OVAL document.

    The file at ``shorthand_path`` is rendered by
    ``process_file_with_macros`` with ``env_yaml``, wrapped in the OVAL
    header and footer and parsed. Its entities are regrouped per category
    (definition, test, object, state, variable) into a freshly generated
    skeleton document; empty categories are left out. IDs are then
    translated by ``IDTranslator("test")`` and the result is written to
    ``oval_path``.
    """
    rendered = process_file_with_macros(shorthand_path, env_yaml)
    source_xml = oval_header + rendered + oval_footer
    source_tree = ElementTree.fromstring(source_xml.encode("utf-8"))

    skeleton_xml = oval_generated_header("test", "5.11", "1.0") + oval_footer
    document = ElementTree.fromstring(skeleton_xml.encode("utf-8"))

    categories = ("definition", "test", "object", "state", "variable")
    for category in categories:
        section = _create_subtree(source_tree, category)
        # Only attach category containers that actually hold entities.
        if len(section) > 0:
            document.append(section)

    translated = IDTranslator("test").translate(document)
    ElementTree.ElementTree(translated).write(oval_path)
51
52
53
def _check_is_applicable_for_product(oval_check_def, product):
    """Based on the <platform> specifier of the OVAL check determine if this
    OVAL check is applicable for this product. Return 'True' if so, 'False'
    otherwise.

    ``oval_check_def`` is the text of the OVAL check; matching is done by
    plain substring search on it. ``product`` is the product identifier,
    which ``utils.parse_name`` splits into a name and an optional version.
    """

    product, product_version = utils.parse_name(product)

    # Define general platforms
    multi_platforms = ['<platform>multi_platform_all',
                       '<platform>multi_platform_' + product]

    # First test if OVAL check isn't for 'multi_platform_all' or
    # 'multi_platform_' + product
    for multi_prod in multi_platforms:
        if multi_prod in oval_check_def and product in MULTI_PLATFORM_LIST:
            return True

    # Build the official (optionally versioned) product name exactly once.
    # Doing this inside the loop below re-applied the dot insertion on every
    # pass, so the second pass searched for e.g. "16..04" and never matched.
    official_name = utils.map_name(product)
    if product_version is not None:
        # Some product versions have a dot in between the numbers
        # While the prodtype doesn't have the dot, the full product name does
        if product == "ubuntu" or product == "macos":
            product_version = product_version[:2] + "." + product_version[2:]
        official_name += ' ' + product_version

    # Current SSG checks aren't unified which element of '<platform>'
    # and '<product>' to use as OVAL AffectedType metadata element,
    # e.g. Chromium content uses both of them across the various checks
    # Thus for now check both of them when checking concrete platform / product
    affected_type_elements = ['<platform>', '<product>']

    for afftype in affected_type_elements:
        # Test if this OVAL check is for the concrete product version
        if afftype + official_name in oval_check_def:
            return True

    # OVAL check is neither a multi platform one, nor is it applicable
    # for this product => return False to indicate that
    return False
95
96
97
def finalize_affected_platforms(xml_tree, env_yaml):
    """Replace the content of every <affected> element with a single entry.

    When OVAL serves as the check engine for XCCDF benchmarks, the
    benchmark already carries platform information, which makes the OVAL
    <affected> listing redundant. For every <affected> element in
    ``xml_tree`` this function removes all direct <platform> and <product>
    children and adds one child whose tag is taken from the "type" key of
    ``env_yaml`` and whose text is taken from its "full_name" key.

    The tree is modified in place and also returned.
    """
    stale_xpaths = ("./{%s}platform" % (oval_ns), "./{%s}product" % (oval_ns))

    for affected in xml_tree.findall(".//{%s}affected" % (oval_ns)):
        for xpath in stale_xpaths:
            for stale in affected.findall(xpath):
                affected.remove(stale)

        final_tag = "{%s}%s" % (oval_ns, utils.required_key(env_yaml, "type"))
        final = ElementTree.SubElement(affected, final_tag)
        final.text = utils.required_key(env_yaml, "full_name")

    return xml_tree
118
119
120
def oval_entities_are_identical(firstelem, secondelem):
    """Check if OVAL entities represented by XML elements are identical

       Two elements are identical when their tag, text, tail, attributes
       and number of children agree and all children are pairwise
       identical in document order. Neither element is modified.

       Return: True if identical, False otherwise
       Based on: http://stackoverflow.com/a/24349916"""

    # Per https://github.com/ComplianceAsCode/content/pull/1343#issuecomment-234541909
    # and https://github.com/ComplianceAsCode/content/pull/1343#issuecomment-234545296
    # ignore the differences in 'comment', 'version', 'state_operator', and
    # 'deprecated' attributes, since they don't affect the semantics of the
    # OVAL entities. Namespace maps are never looked at.
    ignored_attributes = ("comment", "version", "state_operator", "deprecated")

    def semantic_attributes(elem):
        # Build a filtered copy instead of deleting keys from the element:
        # no attribute mapping is mutated while it is being iterated and no
        # deep copy of the subtree is needed at each recursion level.
        return {key: value for key, value in elem.items()
                if key not in ignored_attributes}

    if firstelem.tag != secondelem.tag:
        return False
    if firstelem.text != secondelem.text:
        return False
    if firstelem.tail != secondelem.tail:
        return False
    if semantic_attributes(firstelem) != semantic_attributes(secondelem):
        return False
    if len(firstelem) != len(secondelem):
        return False

    return all(oval_entities_are_identical(fchild, schild)
               for fchild, schild in zip(firstelem, secondelem))
159
160
161
def oval_entity_is_extvar(elem):
    """Tell whether an XML element is an OVAL <external_variable>.

       Return: True when the element's tag is 'external_variable' in the
       OVAL namespace, False otherwise"""
    extvar_tag = '{%s}external_variable' % oval_ns
    return elem.tag == extvar_tag
167
168
169
# For every parent element: map from the "id" attribute of a child that was
# added through append() to that child element.
element_child_cache = collections.defaultdict(dict)


def append(element, newchild):
    """Add ``newchild`` to ``element`` unless a child with its ID is known.

    Children are tracked per parent in ``element_child_cache`` under their
    "id" attribute. If no child with the same ID was added before, the new
    child is appended and remembered. Otherwise nothing is appended and:

    * identical entity that is an <external_variable>: silently ignored;
    * identical entity of any other kind: error, the process exits with 1;
    * different entities, neither an <external_variable>: error, the
      process exits with 1;
    * different entities, at least one an <external_variable>: ignored.
    """
    new_id = newchild.get("id")
    known_children = element_child_cache[element]
    previous = known_children.get(new_id, None)

    if previous is None:
        element.append(newchild)
        known_children[new_id] = newchild
        return

    if oval_entities_are_identical(previous, newchild):
        if oval_entity_is_extvar(newchild):
            # An identical <external_variable> may legitimately be repeated
            # (external variables are in multiple checks for clarity
            # reasons), so it is safe to ignore this one.
            return
        # An identical entity that is not an external variable means the
        # checks should be rewritten so that each entity is present once.
        sys.stderr.write("ERROR: OVAL ID '%s' is used multiple times "
                         "and should represent the same elements.\n"
                         % (new_id))
        sys.stderr.write("Rewrite the OVAL checks. Place the identical "
                         "IDs into their own definition and extend "
                         "this definition by it.\n")
        sys.exit(1)
    elif not (oval_entity_is_extvar(previous) or
              oval_entity_is_extvar(newchild)):
        # Same ID but semantically different entities: keeping only the
        # first implementation for both references could evaluate the wrong
        # requirement for the second entity, so fail instead.
        # See https://github.com/ComplianceAsCode/content/issues/1275
        # for a reproducer and what could happen in this case
        sys.stderr.write("ERROR: it's not possible to use the " +
                         "same ID: %s " % new_id + "for two " +
                         "semantically different OVAL entities:\n")
        sys.stderr.write("First entity  %s\n" % ElementTree.tostring(previous))
        sys.stderr.write("Second entity %s\n" % ElementTree.tostring(newchild))
        sys.stderr.write("Use different ID for the second entity!!!\n")
        sys.exit(1)
225
226
227
def check_oval_version(oval_version):
    """Exit with an error when ``oval_version`` is not a supported version.

    Not strictly necessary, but should help with typos. A supported
    version makes the function return normally; anything else writes a
    message to stderr and exits the process with status 1.
    """
    supported_versions = ["5.11"]
    if oval_version in supported_versions:
        return

    sys.stderr.write(
        "Suspicious oval version \"%s\", one of {%s} is "
        "expected.\n" % (oval_version, ", ".join(supported_versions)))
    sys.exit(1)
237
238
239
def _check_is_loaded(loaded_dict, filename, version):
240
    if filename in loaded_dict:
241
        if loaded_dict[filename] >= version:
242
            return True
243
244
        # Should rather fail, than override something unwanted
245
        sys.stderr.write(
246
            "You cannot override generic OVAL file in version '%s' "
247
            "by more specific one in older version '%s'" %
248
            (version, loaded_dict[filename])
249
        )
250
        sys.exit(1)
251
252
    return False
253
254
255
def _create_oval_tree_from_string(xml_content):
    """Parse shorthand OVAL content wrapped in the OVAL header and footer.

    Returns the parsed root element. On a parse error the document text up
    to and including the failing line is written to stderr, followed by a
    caret line pointing at the reported column, and the process exits with
    status 1.
    """
    document = oval_header + xml_content + oval_footer
    try:
        return ElementTree.fromstring(document)
    except ElementTree.ParseError as error:
        line, column = error.position
        preceding_text = '\n'.join(document.splitlines()[:line])
        caret_line = ' ' * (column - 1) + '^'
        sys.stderr.write(
            "%s\n%s\nError when parsing OVAL file.\n" %
            (preceding_text, caret_line))
        sys.exit(1)
269
270
271
def _check_oval_version_from_oval(oval_file_tree, oval_version):
    """Tell whether a check can be used with the target OVAL version.

    The version required by the check is read from the ``oval_version``
    attribute of the ``<def-group>`` children of ``oval_file_tree``; when
    several groups are present, the last one wins. A check without that
    attribute (or without any ``<def-group>``) is treated as requiring
    version 5.11.

    Returns True when ``oval_version`` is greater than or equal to the
    required version, False otherwise.
    """
    # Initialised up front so the name is bound even when the tree has no
    # <def-group> element and the loop body never runs.
    file_oval_version = None
    for defgroup in oval_file_tree.findall("./{%s}def-group" % oval_ns):
        file_oval_version = defgroup.get("oval_version")

    if file_oval_version is None:
        # oval_version does not exist in <def-group/>
        # which means the OVAL is supported for any version.
        # By default, that version is 5.11
        file_oval_version = "5.11"

    try:
        # Compare components numerically: as strings "9" sorts above "11",
        # which would order 5.9 after 5.11.
        target = tuple(int(part) for part in oval_version.split("."))
        required = tuple(int(part) for part in file_oval_version.split("."))
    except ValueError:
        # Non-numeric component: keep the plain string comparison rather
        # than failing on it.
        target = tuple(oval_version.split("."))
        required = tuple(file_oval_version.split("."))

    return target >= required
283
284
285
def _check_rule_id(oval_file_tree, rule_id):
    """Tell whether the first OVAL definition carries the given rule ID.

    Only the first ``<definition>`` found directly under a ``<def-group>``
    child of ``oval_file_tree`` is inspected. Returns True when its "id"
    attribute equals ``rule_id``; False when it differs or when there is no
    such definition.
    """
    definitions = oval_file_tree.findall(
        "./{%s}def-group/{%s}definition" % (oval_ns, oval_ns))
    if not definitions:
        return False
    return definitions[0].get("id") == rule_id
291
292
293
def _list_full_paths(directory):
294
    full_paths = [os.path.join(directory, x) for x in os.listdir(directory)]
295
    return sorted(full_paths)
296
297
298
class OVALBuilder:
    """Collect the OVAL checks usable for one product into a shorthand body.

    Checks are read from the rule directories of the benchmark and from a
    list of shared directories. A check is kept only if it is an ``.xml``
    file, was not loaded before under the same key, declares an
    ``affected`` element in every definition, is applicable to the product
    and is compatible with the target OVAL version.
    """

    def __init__(
            self, env_yaml, product_yaml_path, shared_directories,
            build_ovals_dir):
        """Store the build configuration.

        ``env_yaml`` must provide the "target_oval_version_str" and
        "product" keys. ``build_ovals_dir`` may be falsy, in which case no
        intermediate files are written.
        """
        self.env_yaml = env_yaml
        self.product_yaml = products.Product(product_yaml_path)
        self.shared_directories = shared_directories
        self.build_ovals_dir = build_ovals_dir
        # Maps the key of every accepted check to the OVAL version it was
        # loaded with.
        self.already_loaded = dict()
        self.oval_version = utils.required_key(
            env_yaml, "target_oval_version_str")
        self.product = utils.required_key(env_yaml, "product")

    def build_shorthand(self, include_benchmark):
        """Return the concatenated text of all accepted checks.

        Benchmark checks (only when ``include_benchmark`` is true) come
        first, followed by the checks from the shared directories.
        """
        if self.build_ovals_dir:
            mkdir_p(self.build_ovals_dir)
        benchmark_checks = (
            self._get_checks_from_benchmark() if include_benchmark else [])
        shared_checks = self._get_checks_from_shared_directories()
        return "".join(benchmark_checks + shared_checks)

    def _get_checks_from_benchmark(self):
        """Return the checks found in the rule directories of the benchmark.

        The benchmark root and any "additional_content_directories" from
        ``env_yaml`` are resolved relative to the product directory.
        """
        product_dir = self.product_yaml["product_dir"]
        benchmark_root = utils.required_key(self.env_yaml, "benchmark_root")
        extra_dirs = self.env_yaml.get("additional_content_directories", [])
        dirs_to_scan = [
            os.path.abspath(os.path.join(product_dir, relative_dir))
            for relative_dir in [benchmark_root] + list(extra_dirs)]
        rule_dirs = list(find_rule_dirs_in_paths(dirs_to_scan))
        return self._process_directories(rule_dirs, True)

    def _get_checks_from_shared_directories(self):
        """Return the checks found in the shared directories.

        The directories are processed in reverse of their listed order.
        """
        # earlier directory has higher priority
        return self._process_directories(self.shared_directories[::-1], False)

    def _process_directories(self, directories, from_benchmark):
        """Return the checks of all existing directories, in order."""
        oval_checks = []
        for directory in directories:
            if os.path.exists(directory):
                oval_checks.extend(
                    self._process_directory(directory, from_benchmark))
        return oval_checks

    def _get_list_of_oval_files(self, directory, from_benchmark):
        """Return the candidate check files of one directory."""
        if not from_benchmark:
            return _list_full_paths(directory)
        return get_rule_dir_ovals(directory, self.product)

    def _process_directory(self, directory, from_benchmark):
        """Return the accepted checks of one directory.

        A directory whose context raises ``DocumentationNotComplete``
        contributes no checks.
        """
        try:
            context = self._get_context(directory, from_benchmark)
        except DocumentationNotComplete:
            return []
        candidates = self._get_list_of_oval_files(directory, from_benchmark)
        return self._get_directory_oval_checks(
            context, candidates, from_benchmark)

    def _get_directory_oval_checks(self, context, oval_files, from_benchmark):
        """Return the content of every file in ``oval_files`` that is kept."""
        processed = (
            self._process_oval_file(file_path, from_benchmark, context)
            for file_path in oval_files)
        return [content for content in processed if content is not None]

    def _read_oval_file(self, file_path, context, from_benchmark):
        """Return the text of a check file.

        Shared files whose path contains "checks_from_templates" are read
        verbatim; every other file goes through
        ``process_file_with_macros`` with ``context``.
        """
        read_verbatim = (
            not from_benchmark and "checks_from_templates" in file_path)
        if read_verbatim:
            with open(file_path, "r") as f:
                return f.read()
        return process_file_with_macros(file_path, context)

    def _create_key(self, file_path, from_benchmark):
        """Return the key under which a check is tracked as loaded.

        For benchmark checks it is "<name of the grandparent directory>.xml",
        otherwise the base name of the file.
        """
        if not from_benchmark:
            return os.path.basename(file_path)
        rule_dir = os.path.dirname(os.path.dirname(file_path))
        return "%s.xml" % os.path.basename(rule_dir)

    def _process_oval_file(self, file_path, from_benchmark, context):
        """Return the content of one check file, or None when it is skipped.

        A file is skipped when it is not an ``.xml`` file, when its key was
        already loaded, or when its content is rejected. Accepted files are
        recorded in ``already_loaded``.
        """
        if not file_path.endswith(".xml"):
            return None
        key = self._create_key(file_path, from_benchmark)
        if _check_is_loaded(self.already_loaded, key, self.oval_version):
            return None
        content = self._read_oval_file(file_path, context, from_benchmark)
        accepted = self._manage_oval_file_xml_content(
            file_path, content, from_benchmark)
        if not accepted:
            return None
        self.already_loaded[key] = self.oval_version
        return content

    def _check_affected(self, tree):
        """Raise ValueError for a definition without an 'affected' element."""
        affected_xpath = "./{%s}metadata/{%s}affected" % (oval_ns, oval_ns)
        for definition in tree.findall(".//{%s}definition" % (oval_ns)):
            if definition.findall(affected_xpath):
                continue
            raise ValueError(
                "Definition '%s' doesn't contain OVAL 'affected' element"
                % (definition.get("id")))

    def _manage_oval_file_xml_content(
            self, file_path, xml_content, from_benchmark):
        """Validate the content of a check and tell whether to keep it.

        Returns False when the check is not applicable to the product or
        not compatible with the target OVAL version, True otherwise. For
        benchmark checks the benchmark specific actions are run as well.
        """
        oval_file_tree = _create_oval_tree_from_string(xml_content)
        self._check_affected(oval_file_tree)
        usable = (
            _check_is_applicable_for_product(xml_content, self.product)
            and _check_oval_version_from_oval(
                oval_file_tree, self.oval_version))
        if not usable:
            return False
        if from_benchmark:
            self._benchmark_specific_actions(
                file_path, xml_content, oval_file_tree)
        return True

    def _benchmark_specific_actions(
            self, file_path, xml_content, oval_file_tree):
        """Store the check and verify that its definition matches the rule.

        The rule ID is the name of the grandparent directory of
        ``file_path``. A mismatch is reported on stderr and the process
        exits with status 1.
        """
        rule_dir = os.path.dirname(os.path.dirname(file_path))
        rule_id = os.path.basename(rule_dir)
        self._store_intermediate_file(rule_id, xml_content)
        if _check_rule_id(oval_file_tree, rule_id):
            return
        msg = "ERROR: OVAL definition in '%s' doesn't match rule ID '%s'." % (
            file_path, rule_id)
        print(msg, file=sys.stderr)
        sys.exit(1)

    def _get_context(self, directory, from_benchmark):
        """Return the macro context for the checks of one directory.

        Shared directories use ``env_yaml`` itself; benchmark directories
        use a copy extended with data of the rule in "rule.yml".
        """
        if not from_benchmark:
            return self.env_yaml
        rule = Rule.from_yaml(
            os.path.join(directory, "rule.yml"), self.env_yaml)
        return self._create_local_env_yaml_for_rule(rule)

    def _create_local_env_yaml_for_rule(self, rule):
        """Return a copy of ``env_yaml`` extended with rule specific keys."""
        local_env_yaml = dict(self.env_yaml)
        local_env_yaml['rule_id'] = rule.id_
        local_env_yaml['rule_title'] = rule.title
        # default is all
        local_env_yaml['products'] = parse_prodtype(rule.prodtype)
        return local_env_yaml

    def _store_intermediate_file(self, rule_id, xml_content):
        """Write the check to "<rule_id>.xml" in ``build_ovals_dir``, if set."""
        if self.build_ovals_dir:
            output_filepath = os.path.join(
                self.build_ovals_dir, rule_id + ".xml")
            with open(output_filepath, "w") as f:
                f.write(xml_content)
467