Test Failed
Push — master ( 883760...c9156d )
by Matěj
01:33 queued 15s
created

ssg.build_yaml.XCCDFEntity.process_input_dict()   A

Complexity

Conditions 5

Size

Total Lines 27
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 5.0909

Importance

Changes 0
Metric Value
cc 5
eloc 16
nop 3
dl 0
loc 27
ccs 11
cts 13
cp 0.8462
crap 5.0909
rs 9.1333
c 0
b 0
f 0
1 2
from __future__ import absolute_import
2 2
from __future__ import print_function
3
4 2
from collections import defaultdict
5 2
from copy import deepcopy
6 2
import datetime
7 2
import json
8 2
import os
9 2
import os.path
10 2
import re
11 2
import sys
12 2
from xml.sax.saxutils import escape
13 2
import glob
14
15 2
import yaml
16
17 2
from .build_cpe import CPEDoesNotExist, parse_platform_definition
18 2
from .constants import (XCCDF_REFINABLE_PROPERTIES,
19
                        SCE_SYSTEM,
20
                        cce_uri,
21
                        dc_namespace,
22
                        ocil_cs,
23
                        ocil_namespace,
24
                        oval_namespace,
25
                        xhtml_namespace,
26
                        xsi_namespace,
27
                        timestamp,
28
                        SSG_BENCHMARK_LATEST_URI,
29
                        SSG_PROJECT_NAME,
30
                        SSG_REF_URIS
31
                        )
32 2
from .rules import get_rule_dir_id, get_rule_dir_yaml, is_rule_dir
33 2
from .rule_yaml import parse_prodtype
34
35 2
from .cce import is_cce_format_valid, is_cce_value_valid
36 2
from .yaml import DocumentationNotComplete, open_and_expand, open_and_macro_expand
37 2
from .utils import required_key, mkdir_p
38
39 2
from .xml import ElementTree as ET, add_xhtml_namespace, register_namespaces, parse_file
40 2
from .shims import unicode_func
41
42
43 2
def dump_yaml_preferably_in_original_order(dictionary, file_object):
    """
    Dump ``dictionary`` as YAML into ``file_object`` with an indent of 4,
    asking the dumper not to sort the keys.

    If the dumper rejects the ``sort_keys`` keyword with a TypeError,
    the dump is retried without it; any other TypeError is re-raised.
    Returns whatever ``yaml.dump`` returns.
    """
    common_kwargs = dict(indent=4)
    try:
        return yaml.dump(dictionary, file_object, sort_keys=False, **common_kwargs)
    except TypeError as exc:
        # Older versions of libyaml don't understand the sort_keys kwarg
        if "sort_keys" in str(exc):
            return yaml.dump(dictionary, file_object, **common_kwargs)
        raise exc
51
52
53 2
def add_sub_element(parent, tag, data):
    """
    Append a new child element named ``tag`` to ``parent``. The string
    ``data`` becomes the content of the child and is parsed as XML,
    so markup inside ``data`` turns into sub-elements of the child.

    If data should not be parsed as an XML tree, either escape the contents
    before passing into this function, or use ElementTree.SubElement().

    Raises RuntimeError if the assembled markup cannot be parsed.

    Returns the newly created subelement of type tag.
    """
    content = add_xhtml_namespace(data)
    # Our YAML data contain XML and XHTML elements. ET.SubElement() would
    # escape the < > characters by &lt; and &gt; and no child elements would
    # be created, so the element is assembled as text and parsed instead.
    # TODO: Remove this function after we move to Markdown everywhere in SSG
    template = unicode_func('<{0} xmlns:xhtml="{2}">{1}</{0}>')
    ustr = template.format(tag, content, xhtml_namespace)

    try:
        element = ET.fromstring(ustr.encode("utf-8"))
    except Exception:
        raise RuntimeError(
            "Error adding subelement to an element '{0}' from string: '{1}'"
            .format(parent.tag, ustr))

    parent.append(element)
    return element
82
83
84 2
def reorder_according_to_ordering(unordered, ordering, regex=None):
    """
    Return the items of ``unordered`` as a list in which prioritized items
    come first, followed by all remaining items in sorted order.

    An item is a candidate for prioritization if ``regex`` matches at its
    start; when ``regex`` is not given, it is an alternation of the
    ``ordering`` entries. Candidates are emitted in the order of
    ``ordering``, each one for the first entry that is its substring.
    """
    if regex is None:
        regex = "|".join("({0})".format(token) for token in ordering)
    pattern = re.compile(regex)

    candidates = [entry for entry in unordered if pattern.match(entry)]
    remaining = set(unordered)

    result = []
    for token in ordering:
        for entry in candidates:
            # Skip entries that were already placed or don't carry this token
            if token not in entry or entry not in remaining:
                continue
            result.append(entry)
            remaining.remove(entry)
    result.extend(sorted(remaining))
    return result
100
101
102 2
def add_warning_elements(element, warnings):
    """
    Add one ``warning`` sub-element to ``element`` for every dictionary
    in ``warnings``; the first key of the dictionary becomes the ``category``
    attribute and the first value becomes the content.
    """
    # Warnings come as a list of single-item dictionaries, so that
    # several warnings can share the same category, which is
    # valid in SCAP and our content:
    #
    # warnings:
    #     - general: Some general warning
    #     - general: Some other general warning
    #     - general: |-
    #         Some really long multiline general warning
    #
    # Each of the dictionaries should have only one key/value pair.
    for warning_dict in warnings:
        category, text = list(warning_dict.items())[0]
        warning = add_sub_element(element, "warning", text)
        warning.set("category", category)
117
118
119 2
def add_nondata_subelements(element, subelement, attribute, attr_data):
    """Add one ``subelement`` child per item of ``attr_data``, each carrying
       the item as its ``attribute`` and having no content.
       For example, <requires id="my_required_id"/>"""
    for attr_value in attr_data:
        ET.SubElement(element, subelement).set(attribute, attr_value)
125
126
127 2
def check_warnings(xccdf_structure):
    """
    Make sure that every entry of ``xccdf_structure.warnings``
    holds exactly one item; raise ValueError otherwise.
    """
    if any(len(entry) != 1 for entry in xccdf_structure.warnings):
        raise ValueError(
            "Only one key/value pair should exist for each warnings dictionary")
132
133
134 2
def add_reference_elements(element, references, ref_uri_dict):
    """
    Add ``reference`` sub-elements to ``element``.

    ``references`` maps a reference type to a comma-separated string
    of reference values. One sub-element is created per value; its ``href``
    attribute is looked up in ``ref_uri_dict`` by the reference type and its
    text is the value. Values of the ``srg`` type are looked up under
    ``os-srg`` or ``app-srg`` depending on their prefix.

    Raises ValueError when a ``srg`` value has an unknown prefix, or when
    the reference type has no entry in ``ref_uri_dict``.
    """
    for ref_type, ref_vals in references.items():
        for ref_val in ref_vals.split(","):
            # This assumes that a single srg key may have items from multiple SRG types
            if ref_type == 'srg':
                if ref_val.startswith('SRG-OS-'):
                    ref_href = ref_uri_dict['os-srg']
                elif ref_val.startswith('SRG-APP-'):
                    ref_href = ref_uri_dict['app-srg']
                else:
                    raise ValueError("SRG {0} doesn't have a URI defined.".format(ref_val))
            else:
                try:
                    ref_href = ref_uri_dict[ref_type]
                except KeyError:
                    # This is a plain function, there is no "self" to ask for
                    # the ID. Report the "id" attribute of the parent element
                    # instead - presumably the rule being built; verify
                    # against callers.
                    msg = (
                        "Error processing reference {0}: {1} in Rule {2}."
                        .format(ref_type, ref_vals, element.get("id", "<unknown>")))
                    raise ValueError(msg)

            ref = ET.SubElement(element, 'reference')
            ref.set("href", ref_href)
            ref.text = ref_val
157
158
159 2
def add_benchmark_metadata(element, contributors_file):
    """
    Add a ``metadata`` sub-element to ``element`` and fill it with
    Dublin Core children: publisher, creator, one contributor for every
    ``contributor`` element found in the parsed ``contributors_file``,
    and source.
    """
    metadata = ET.SubElement(element, "metadata")

    def add_dc_child(name, text):
        # Children live in the Dublin Core namespace
        child = ET.SubElement(metadata, "{%s}%s" % (dc_namespace, name))
        child.text = text

    add_dc_child("publisher", SSG_PROJECT_NAME)
    add_dc_child("creator", SSG_PROJECT_NAME)

    for entry in parse_file(contributors_file).iter('contributor'):
        add_dc_child("contributor", entry.text)

    add_dc_child("source", SSG_BENCHMARK_LATEST_URI)
175
176
177 2
class SelectionHandler(object):
    """
    Keeps track of selections that are written as strings:

    - ``rule_id`` selects a rule,
    - ``!rule_id`` unselects a rule,
    - ``variable=selector`` assigns a selector to a variable,
    - ``rule_id.property=value`` refines a property of a rule.

    The parsed state is kept in the ``selected`` and ``unselected`` lists,
    the ``variables`` dict, and the ``refine_rules`` dict of lists
    of (property, value) tuples.
    """
    def __init__(self):
        self.refine_rules = defaultdict(list)
        self.variables = dict()
        self.unselected = []
        self.selected = []

    @property
    def selections(self):
        """
        The state of the handler as a list of selection strings.
        Assigning an iterable of strings applies every one of them
        on top of the current state.
        """
        selections = [str(item) for item in self.selected]
        selections.extend("!" + str(item) for item in self.unselected)
        for varname, selector in self.variables.items():
            selections.append(varname + "=" + selector)
        for rule, refinements in self.refine_rules.items():
            for prop, val in refinements:
                selections.append("{rule}.{property}={value}"
                                  .format(rule=rule, property=prop, value=val))
        return selections

    @selections.setter
    def selections(self, entries):
        for item in entries:
            self.apply_selection(item)

    def apply_selection(self, item):
        """
        Parse one selection string and record it.

        Raises ValueError if the string is a refinement of a property
        that is not listed in XCCDF_REFINABLE_PROPERTIES.
        """
        if "." in item:
            rule, refinement = item.split(".", 1)
            property_, value = refinement.split("=", 1)
            if property_ not in XCCDF_REFINABLE_PROPERTIES:
                # This class doesn't define id_ itself, it is supplied
                # only by some of the classes that build on the handler.
                owner_id = getattr(self, "id_", "<unknown>")
                msg = ("Property '{property_}' cannot be refined. "
                       "Rule properties that can be refined are {refinables}. "
                       "Fix refinement '{rule_id}.{property_}={value}' in profile '{profile}'."
                       .format(property_=property_, refinables=XCCDF_REFINABLE_PROPERTIES,
                               rule_id=rule, value=value, profile=owner_id)
                       )
                raise ValueError(msg)
            self.refine_rules[rule].append((property_, value))
        elif "=" in item:
            varname, value = item.split("=", 1)
            self.variables[varname] = value
        elif item.startswith("!"):
            self.unselected.append(item[1:])
        else:
            self.selected.append(item)

    def _subtract_refinements(self, extended_refinements):
        """
        Given a dict of rule refinements from the extended profile,
        "undo" every refinement prefixed with '!' in this profile.

        The '!'-prefixed entries are removed from this handler and the
        modified ``extended_refinements`` is returned.
        """
        # Iterate over a copy, as entries are deleted on the go
        for rule, refinements in list(self.refine_rules.items()):
            if rule.startswith("!"):
                for prop, val in refinements:
                    extended_refinements[rule[1:]].remove((prop, val))
                del self.refine_rules[rule]
        return extended_refinements

    def update_with(self, rhs):
        """
        Merge the selections of ``rhs`` into this handler.

        Rules selected by ``rhs`` are added to the selected rules, variables
        and refinements of ``rhs`` are taken over unless this handler
        defines them as well - own definitions win.
        """
        extra_selections = set(rhs.selected).difference(self.selected)
        # Sort the additions, so the result doesn't depend on set ordering
        self.selected.extend(sorted(extra_selections))

        updated_variables = dict(rhs.variables)
        updated_variables.update(self.variables)
        self.variables = updated_variables

        # Work on a copy, rhs has to stay intact
        extended_refinements = deepcopy(rhs.refine_rules)
        updated_refinements = self._subtract_refinements(extended_refinements)
        updated_refinements.update(self.refine_rules)
        self.refine_rules = updated_refinements
250
251
252 2
class XCCDFEntity(object):
    """
    This class can load itself from a YAML with Jinja macros,
    and it can also save itself to YAML.

    It is supposed to work with the content in the project,
    when entities are defined in the benchmark tree,
    and they are compiled into flat YAMLs to the build directory.

    Subclasses describe their attributes by the KEYS dictionary that maps
    attribute names to callables producing default values, and they name
    attributes that have to be defined in the MANDATORY_KEYS set.
    """
    KEYS = dict(
            id_=lambda: "",
            definition_location=lambda: "",
    )

    MANDATORY_KEYS = set()

    GENERIC_FILENAME = ""
    ID_LABEL = "id"

    def __init__(self, id_):
        super(XCCDFEntity, self).__init__()
        self._assign_defaults()
        self.id_ = id_

    def _assign_defaults(self):
        """Set every attribute listed in KEYS to its default value."""
        for key, make_default in self.KEYS.items():
            value = make_default()
            # A default that is a RuntimeError instance stands for "no value"
            if isinstance(value, RuntimeError):
                value = None
            setattr(self, key, value)

    @classmethod
    def get_instance_from_full_dict(cls, data):
        """
        Given a defining dictionary, produce an instance
        by treating all dict elements as attributes.

        Extend this if you want tight control over the instance creation process.
        """
        entity = cls(data["id_"])
        for attribute, value in data.items():
            setattr(entity, attribute, value)
        return entity

    @classmethod
    def process_input_dict(cls, input_contents, env_yaml):
        """
        Take the contents of the definition as a dictionary, and
        add defaults or raise errors if a required member is not present.

        Keys that are recognized are removed from ``input_contents``,
        keys with a None value don't make it to the result.

        Extend this if you want to add, remove or alter the result
        that will constitute the new instance.
        """
        processed = dict()

        for key, make_default in cls.KEYS.items():
            if key in input_contents:
                supplied = input_contents.pop(key)
                if supplied is not None:
                    processed[key] = supplied
            elif key in cls.MANDATORY_KEYS:
                raise ValueError(
                    "Key '{key}' is mandatory for definition of '{class_name}'."
                    .format(key=key, class_name=cls.__name__))
            else:
                processed[key] = make_default()

        return processed

    @classmethod
    def parse_yaml_into_processed_dict(cls, yaml_file, env_yaml=None):
        """
        Given yaml filename and environment info, produce a dictionary
        that defines the instance to be created.
        This wraps :meth:`process_input_dict` and it adds generic keys on the top:

        - `id_` as the entity ID that is deduced either from the filename,
          or from the parent directory name.
        - `definition_location` as the original location where the entity got defined.

        Raises ValueError if the contents can't be processed, and RuntimeError
        if some keys of the YAML data were left unprocessed.
        """
        file_basename = os.path.basename(yaml_file)
        if file_basename == cls.GENERIC_FILENAME:
            # Generically named files get their ID from the directory
            entity_id = os.path.basename(os.path.dirname(yaml_file))
        else:
            entity_id = file_basename.split(".")[0]

        if env_yaml:
            env_yaml[cls.ID_LABEL] = entity_id
        yaml_data = open_and_macro_expand(yaml_file, env_yaml)

        try:
            processed_data = cls.process_input_dict(yaml_data, env_yaml)
        except ValueError as exc:
            raise ValueError(
                "Error processing {yaml_file}: {exc}"
                .format(yaml_file=yaml_file, exc=str(exc)))

        # Processing consumes known keys, what is left is not understood
        if yaml_data:
            raise RuntimeError(
                "Unparsed YAML data in '{yaml_file}': {keys}"
                .format(yaml_file=yaml_file, keys=list(yaml_data.keys())))

        if not processed_data.get("definition_location", ""):
            processed_data["definition_location"] = yaml_file

        processed_data["id_"] = entity_id

        return processed_data

    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None):
        """
        Create an instance from the YAML file.

        The ``env_yaml`` is copied, so the caller's dictionary is not modified.
        DocumentationNotComplete exceptions pass through, all other
        exceptions raised during parsing are turned into RuntimeError.
        """
        yaml_file = os.path.normpath(yaml_file)

        local_env_yaml = dict(env_yaml) if env_yaml else None

        try:
            data_dict = cls.parse_yaml_into_processed_dict(yaml_file, local_env_yaml)
        except DocumentationNotComplete:
            raise
        except Exception as exc:
            raise RuntimeError(
                "Error loading a {class_name} from {filename}: {error}"
                .format(class_name=cls.__name__, filename=yaml_file, error=str(exc)))

        return cls.get_instance_from_full_dict(data_dict)

    def represent_as_dict(self):
        """
        Produce a dict representation of the class -
        all attributes listed in KEYS except for the ID.

        Extend this method if you need the representation to be different from the object.
        """
        data = {key: getattr(self, key) for key in self.KEYS}
        del data["id_"]
        return data

    def dump_yaml(self, file_name, documentation_complete=True):
        """Save the dict representation of the entity into a YAML file."""
        to_dump = self.represent_as_dict()
        to_dump["documentation_complete"] = documentation_complete
        with open(file_name, "w+") as f:
            dump_yaml_preferably_in_original_order(to_dump, f)

    def to_xml_element(self):
        raise NotImplementedError()
410
411
412 2
class Profile(XCCDFEntity, SelectionHandler):
    """Represents XCCDF profile

    The selections of the profile are handled by the SelectionHandler part,
    the rest of the attributes is described by KEYS.
    """
    KEYS = dict(
        title=lambda: "",
        description=lambda: "",
        extends=lambda: "",
        metadata=lambda: None,
        reference=lambda: None,
        selections=lambda: list(),
        platforms=lambda: set(),
        cpe_names=lambda: set(),
        platform=lambda: None,
        filter_rules=lambda: "",
        ** XCCDFEntity.KEYS
    )

    MANDATORY_KEYS = {
        "title",
        "description",
        "selections",
    }

    @classmethod
    def process_input_dict(cls, input_contents, env_yaml):
        """
        Extend the generic processing: the ``platform`` is added to
        ``platforms``, and if ``env_yaml`` is supplied, every platform
        is translated to a CPE name that is stored in ``cpe_names``.

        Raises CPEDoesNotExist if a platform can't be translated.
        """
        input_contents = super(Profile, cls).process_input_dict(input_contents, env_yaml)

        platform = input_contents.get("platform")
        if platform is not None:
            input_contents["platforms"].add(platform)

        if env_yaml:
            for platform in input_contents["platforms"]:
                try:
                    new_cpe_name = env_yaml["product_cpes"].get_cpe_name(platform)
                    input_contents["cpe_names"].add(new_cpe_name)
                except CPEDoesNotExist:
                    msg = (
                        "Unsupported platform '{platform}' in a profile."
                        .format(platform=platform))
                    raise CPEDoesNotExist(msg)

        return input_contents

    @property
    def rule_filter(self):
        """
        A callable built from ``filter_rules``, or the no-op filter
        if the profile doesn't define any rule filter.
        """
        if self.filter_rules:
            return rule_filter_from_def(self.filter_rules)
        else:
            return noop_rule_filterfunc

    def to_xml_element(self):
        """
        Build the ``Profile`` XML element with title, description, the
        optional reference, platforms, selected and unselected rules,
        variable refinements and rule refinements - in this order.
        """
        element = ET.Element('Profile')
        element.set("id", self.id_)
        if self.extends:
            element.set("extends", self.extends)
        title = add_sub_element(element, "title", self.title)
        title.set("override", "true")
        desc = add_sub_element(element, "description", self.description)
        desc.set("override", "true")

        if self.reference:
            add_sub_element(element, "reference", escape(self.reference))

        for cpe_name in self.cpe_names:
            plat = ET.SubElement(element, "platform")
            plat.set("idref", cpe_name)

        for selection in self.selected:
            select = ET.Element("select")
            select.set("idref", selection)
            select.set("selected", "true")
            element.append(select)

        for selection in self.unselected:
            unselect = ET.Element("select")
            unselect.set("idref", selection)
            unselect.set("selected", "false")
            element.append(unselect)

        for value_id, selector in self.variables.items():
            refine_value = ET.Element("refine-value")
            refine_value.set("idref", value_id)
            refine_value.set("selector", selector)
            element.append(refine_value)

        for refined_rule, refinement_list in self.refine_rules.items():
            refine_rule = ET.Element("refine-rule")
            refine_rule.set("idref", refined_rule)
            for refinement in refinement_list:
                refine_rule.set(refinement[0], refinement[1])
            element.append(refine_rule)

        return element

    def get_rule_selectors(self):
        """Return IDs of all selected and unselected rules as one list."""
        return self.selected + self.unselected

    def get_variable_selectors(self):
        """Return the dictionary that maps variable IDs to selectors."""
        return self.variables

    def validate_refine_rules(self, rules):
        """
        Check that every refined rule is among ``rules`` and that
        it is selected or unselected by the profile.

        Raises ValueError for the first refinement that fails the check.
        """
        # Sets are built once, so the lookups in the loop are cheap
        existing_rule_ids = set(r.id_ for r in rules)
        rule_selectors = set(self.get_rule_selectors())
        for refine_rule, refinement_list in self.refine_rules.items():
            if not refinement_list:
                # Subtracting refinements of an extended profile may leave
                # an empty list behind. There is nothing refined then,
                # and nothing to report either.
                continue

            # Take first refinement to illustrate where the error is
            # all refinements in list are invalid, so it doesn't really matter
            a_refinement = refinement_list[0]

            if refine_rule not in existing_rule_ids:
                msg = (
                    "You are trying to refine a rule that doesn't exist. "
                    "Rule '{rule_id}' was not found in the benchmark. "
                    "Please check all rule refinements for rule: '{rule_id}', for example: "
                    "- {rule_id}.{property_}={value}' in profile {profile_id}."
                    .format(rule_id=refine_rule, profile_id=self.id_,
                            property_=a_refinement[0], value=a_refinement[1])
                    )
                raise ValueError(msg)

            if refine_rule not in rule_selectors:
                msg = ("- {rule_id}.{property_}={value}' in profile '{profile_id}' is refining "
                       "a rule that is not selected by it. The refinement will not have any "
                       "noticeable effect. Either select the rule or remove the rule refinement."
                       .format(rule_id=refine_rule, property_=a_refinement[0],
                               value=a_refinement[1], profile_id=self.id_)
                       )
                raise ValueError(msg)

    def validate_variables(self, variables):
        """
        Check that every variable used by the profile is among ``variables``
        and that the selector used by the profile is one of its options.

        Raises ValueError for the first variable that fails the check.
        """
        variables_by_id = dict()
        for var in variables:
            variables_by_id[var.id_] = var

        for var_id, our_val in self.variables.items():
            if var_id not in variables_by_id:
                all_vars_list = [" - %s" % v for v in variables_by_id.keys()]
                msg = (
                    "Value '{var_id}' in profile '{profile_name}' is not known. "
                    "We know only variables:\n{var_names}"
                    .format(
                        var_id=var_id, profile_name=self.id_,
                        var_names="\n".join(sorted(all_vars_list)))
                )
                raise ValueError(msg)

            allowed_selectors = [str(s) for s in variables_by_id[var_id].options.keys()]
            if our_val not in allowed_selectors:
                msg = (
                    "Value '{var_id}' in profile '{profile_name}' "
                    "uses the selector '{our_val}'. "
                    "This is not possible, as only selectors {all_selectors} are available. "
                    "Either change the selector used in the profile, or "
                    "add the selector-value pair to the variable definition."
                    .format(
                        var_id=var_id, profile_name=self.id_, our_val=our_val,
                        all_selectors=allowed_selectors,
                    )
                )
                raise ValueError(msg)

    def validate_rules(self, rules, groups):
        """
        Check that every selected or unselected ID is a rule from ``rules``
        and not one of ``groups``.

        Raises ValueError for the first ID that fails the check.
        """
        # The set is built once, so the lookups in the loop are cheap
        existing_rule_ids = set(r.id_ for r in rules)
        rule_selectors = self.get_rule_selectors()
        for id_ in rule_selectors:
            if id_ in groups:
                msg = (
                    "You have selected a group '{group_id}' instead of a "
                    "rule. Groups have no effect in the profile and are not "
                    "allowed to be selected. Please remove '{group_id}' "
                    "from profile '{profile_id}' before proceeding."
                    .format(group_id=id_, profile_id=self.id_)
                )
                raise ValueError(msg)
            if id_ not in existing_rule_ids:
                msg = (
                    "Rule '{rule_id}' was not found in the benchmark. Please "
                    "remove rule '{rule_id}' from profile '{profile_id}' "
                    "before proceeding."
                    .format(rule_id=id_, profile_id=self.id_)
                )
                raise ValueError(msg)

    def __sub__(self, other):
        """
        Return a new profile with selected and unselected rules of this
        profile that ``other`` doesn't have, and with variables that ``other``
        either doesn't define, or defines with a different selector.
        Title, description, extends, platforms and platform are taken over.
        """
        profile = Profile(self.id_)
        profile.title = self.title
        profile.description = self.description
        profile.extends = self.extends
        profile.platforms = self.platforms
        profile.platform = self.platform
        profile.selected = list(set(self.selected) - set(other.selected))
        profile.selected.sort()
        profile.unselected = list(set(self.unselected) - set(other.unselected))
        profile.variables = dict((k, v) for (k, v) in self.variables.items()
                                 if k not in other.variables or v != other.variables[k])
        return profile
607
608
609 2
class ResolvableProfile(Profile):
    """
    Profile that can be resolved - its selections are filtered, merged with
    the selections of the profile it extends, and checked against known rules.
    """
    def __init__(self, * args, ** kwargs):
        super(ResolvableProfile, self).__init__(* args, ** kwargs)
        self.resolved = False

    def _controls_ids_to_controls(self, controls_manager, policy_id, control_id_list):
        """Look every control ID of the policy up in the controls manager."""
        controls = []
        for cid in control_id_list:
            controls.append(controls_manager.get_control(policy_id, cid))
        return controls

    def resolve_controls(self, controls_manager):
        # Nothing to do here, subclasses supply the implementation
        pass

    def extend_by(self, extended_profile):
        self.update_with(extended_profile)

    def resolve_selections_with_rules(self, rules_by_id):
        """
        Keep only those selected rules that are present in ``rules_by_id``
        and that pass the rule filter of the profile.
        """
        self.selected = list({
            rid for rid in self.selected
            if rid in rules_by_id and self.rule_filter(rules_by_id[rid])
        })

    def resolve(self, all_profiles, rules_by_id, controls_manager=None):
        """
        Resolve the profile in place; a profile that is already resolved
        is left as it is.

        The extended profile is resolved first and its selections are merged
        in. Unselected rules are then dropped from the selected ones, which
        end up sorted, and the ``unselected`` and ``extends`` attributes
        are reset.

        Raises RuntimeError if the extended profile is not in ``all_profiles``,
        and ValueError if a selected rule is not in ``rules_by_id``.
        """
        if self.resolved:
            return

        if controls_manager:
            self.resolve_controls(controls_manager)

        self.resolve_selections_with_rules(rules_by_id)

        parent_id = self.extends
        if parent_id:
            if parent_id not in all_profiles:
                raise RuntimeError(
                    "Profile {name} extends profile {extended}, but "
                    "only profiles {known_profiles} are available for resolution."
                    .format(name=self.id_, extended=parent_id,
                            known_profiles=list(all_profiles.keys())))
            parent = all_profiles[parent_id]
            parent.resolve(all_profiles, rules_by_id, controls_manager)

            self.extend_by(parent)

        self.selected = sorted(
            rid for rid in set(self.selected) if rid not in self.unselected)

        self.unselected = []
        self.extends = None

        for rid in self.selected:
            if rid in rules_by_id:
                continue
            raise ValueError(
                "Rule {rid} is selected by {profile}, but the rule is not available. "
                "This may be caused by a discrepancy of prodtypes."
                .format(rid=rid, profile=self.id_))

        self.resolved = True
673
674
675 2
class ProfileWithInlinePolicies(ResolvableProfile):
    """
    Resolvable profile that understands selections of the
    ``policy_id:control_id`` form, which refer to controls of a policy.
    """
    def __init__(self, * args, ** kwargs):
        super(ProfileWithInlinePolicies, self).__init__(* args, ** kwargs)
        self.controls_by_policy = defaultdict(list)

    def apply_selection(self, item):
        """
        Record control selections by their policy,
        and pass everything else to the parent class.
        """
        # ":" is the delimiter for controls but not when the item is a variable
        is_control = ":" in item and "=" not in item
        if not is_control:
            super(ProfileWithInlinePolicies, self).apply_selection(item)
            return
        policy_id, control_id = item.split(":", 1)
        self.controls_by_policy[policy_id].append(control_id)

    def _process_controls_ids_into_controls(self, controls_manager, policy_id, controls_ids):
        """
        Translate control IDs of a policy to controls.

        An ID that starts with "all" stands for all controls of the policy,
        or, when it is followed by a colon and a level, for all controls
        of that level. Other IDs are looked up one by one.
        """
        controls = []
        for cid in controls_ids:
            if cid.startswith("all"):
                if ":" in cid:
                    level_id = cid.split(":", 1)[1]
                    found = controls_manager.get_all_controls_of_level(policy_id, level_id)
                else:
                    found = controls_manager.get_all_controls(policy_id)
            else:
                found = self._controls_ids_to_controls(controls_manager, policy_id, [cid])
            controls.extend(found)
        return controls

    def resolve_controls(self, controls_manager):
        """Merge selections of all recorded controls into the profile."""
        for policy_id, controls_ids in self.controls_by_policy.items():
            resolved = self._process_controls_ids_into_controls(
                controls_manager, policy_id, controls_ids)

            for control in resolved:
                self.update_with(control)
710
711
712 2
class Value(XCCDFEntity):
    """Represents XCCDF Value
    """
    KEYS = dict(
        title=lambda: "",
        description=lambda: "",
        type=lambda: "",
        operator=lambda: "equals",
        interactive=lambda: False,
        options=lambda: dict(),
        warnings=lambda: list(),
        ** XCCDFEntity.KEYS
    )

    MANDATORY_KEYS = {
        "title",
        "description",
        "type",
    }

    @classmethod
    def process_input_dict(cls, input_contents, env_yaml):
        """
        Extend the generic processing: ``interactive`` is converted
        to a boolean, and the ``operator`` is checked.

        Raises ValueError if the operator is not one of the supported ones.
        """
        # The value is converted to a string first, so a real boolean
        # is handled the same way as the "true" / "false" strings.
        input_contents["interactive"] = (
            str(input_contents.get("interactive", "false")).lower() == "true")

        data = super(Value, cls).process_input_dict(input_contents, env_yaml)

        possible_operators = ["equals", "not equal", "greater than",
                              "less than", "greater than or equal",
                              "less than or equal", "pattern match"]

        if data["operator"] not in possible_operators:
            raise ValueError(
                "Found an invalid operator value '%s'. "
                "Expected one of: %s"
                % (data["operator"], ", ".join(possible_operators))
            )

        return data

    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None):
        """
        Create the Value from a YAML file and check that each of its
        warnings holds exactly one key/value pair.
        """
        value = super(Value, cls).from_yaml(yaml_file, env_yaml)

        check_warnings(value)

        return value

    def to_xml_element(self):
        """
        Build the ``Value`` XML element with title, description, warnings,
        and one ``value`` sub-element for each option.
        """
        value = ET.Element('Value')
        value.set('id', self.id_)
        value.set('type', self.type)
        if self.operator != "equals":  # equals is the default
            value.set('operator', self.operator)
        if self.interactive:  # False is the default
            value.set('interactive', 'true')
        title = ET.SubElement(value, 'title')
        title.text = self.title
        add_sub_element(value, 'description', self.description)
        add_warning_elements(value, self.warnings)

        for selector, option in self.options.items():
            # do not confuse Value with big V with value with small v
            # value is child element of Value
            value_small = ET.SubElement(value, 'value')
            # by XCCDF spec, default value is value without selector
            if selector != "default":
                value_small.set('selector', str(selector))
            value_small.text = str(option)

        return value

    def to_file(self, file_name):
        """Write the XML representation of the Value to a file."""
        root = self.to_xml_element()
        tree = ET.ElementTree(root)
        tree.write(file_name)
788
789
790 2
class Benchmark(XCCDFEntity):
    """Represents XCCDF Benchmark
    """
    KEYS = dict(
        title=lambda: "",
        status=lambda: "",
        description=lambda: "",
        notice_id=lambda: "",
        notice_description=lambda: "",
        front_matter=lambda: "",
        rear_matter=lambda: "",
        cpes=lambda: list(),
        version=lambda: "",
        profiles=lambda: list(),
        values=lambda: dict(),
        groups=lambda: dict(),
        rules=lambda: dict(),
        product_cpe_names=lambda: list(),
        ** XCCDFEntity.KEYS
    )

    MANDATORY_KEYS = {
        "title",
        "status",
        "description",
        "front_matter",
        "rear_matter",
    }

    GENERIC_FILENAME = "benchmark.yml"

    def load_entities(self, rules_by_id, values_by_id, groups_by_id):
        """Replace empty placeholders in rules/values/groups by real objects.

        Every entry whose value is falsy is looked up by its ID in the
        corresponding ``*_by_id`` mapping. A missing ID raises ``KeyError``.
        """
        for rid, val in self.rules.items():
            if not val:
                self.rules[rid] = rules_by_id[rid]

        for vid, val in self.values.items():
            if not val:
                self.values[vid] = values_by_id[vid]

        for gid, val in self.groups.items():
            if not val:
                self.groups[gid] = groups_by_id[gid]

    @classmethod
    def process_input_dict(cls, input_contents, env_yaml):
        """Convert raw benchmark YAML contents into the attribute dictionary.

        The dashed YAML keys ``front-matter`` and ``rear-matter`` are renamed
        to their underscored attribute names before the parent class
        processes the input, and the nested ``notice`` mapping is flattened
        into ``notice_id`` and ``notice_description``. Consumed keys are
        removed from ``input_contents``.

        Raises ``KeyError`` if ``front-matter`` or ``rear-matter`` is missing.
        """
        input_contents["front_matter"] = input_contents.pop("front-matter")
        input_contents["rear_matter"] = input_contents.pop("rear-matter")

        data = super(Benchmark, cls).process_input_dict(input_contents, env_yaml)

        notice_contents = required_key(input_contents, "notice")
        del input_contents["notice"]

        data["notice_id"] = required_key(notice_contents, "id")
        del notice_contents["id"]

        data["notice_description"] = required_key(notice_contents, "description")
        del notice_contents["description"]

        return data

    def represent_as_dict(self):
        """Return the dictionary form of the benchmark with dashed YAML keys.

        This is the inverse of the key renaming done in
        ``process_input_dict``: ``rear_matter`` and ``front_matter`` are
        stored back as ``rear-matter`` and ``front-matter``.
        """
        # This is an instance method, so the bound super() must use self;
        # the name "cls" is not defined here.
        data = super(Benchmark, self).represent_as_dict()
        data["rear-matter"] = data.pop("rear_matter")
        data["front-matter"] = data.pop("front_matter")
        return data

    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None):
        """Load a benchmark from ``yaml_file``.

        With ``env_yaml``, the product CPE names, the CPE platform
        specification, the benchmark ID and the version are taken from it.
        Without it, the ID and version are set to the placeholders
        "product-name" and "0.0".
        """
        benchmark = super(Benchmark, cls).from_yaml(yaml_file, env_yaml)
        if env_yaml:
            benchmark.product_cpe_names = env_yaml["product_cpes"].get_product_cpe_names()
            benchmark.cpe_platform_spec = env_yaml["product_cpes"].cpe_platform_specification
            benchmark.id_ = env_yaml["benchmark_id"]
            benchmark.version = env_yaml["ssg_version_str"]
        else:
            benchmark.id_ = "product-name"
            benchmark.version = "0.0"

        return benchmark

    def add_profiles_from_dir(self, dir_, env_yaml):
        """Load every ``*.profile`` file found directly in ``dir_``.

        Directory entries are visited in sorted order. Non-files are
        ignored, files with another extension are reported on stderr and
        skipped, and profiles raising ``DocumentationNotComplete`` are
        skipped silently. Any other load error is re-raised as
        ``RuntimeError`` naming the offending file.
        """
        for dir_item in sorted(os.listdir(dir_)):
            dir_item_path = os.path.join(dir_, dir_item)
            if not os.path.isfile(dir_item_path):
                continue

            _, ext = os.path.splitext(os.path.basename(dir_item_path))
            if ext != '.profile':
                sys.stderr.write(
                    "Encountered file '%s' while looking for profiles, "
                    "extension '%s' is unknown. Skipping..\n"
                    % (dir_item, ext)
                )
                continue

            try:
                new_profile = ProfileWithInlinePolicies.from_yaml(dir_item_path, env_yaml)
            except DocumentationNotComplete:
                continue
            except Exception as exc:
                msg = ("Error building profile from '{fname}': '{error}'"
                       .format(fname=dir_item_path, error=str(exc)))
                raise RuntimeError(msg)
            if new_profile is None:
                continue

            self.profiles.append(new_profile)

    def to_xml_element(self, env_yaml=None):
        """Build and return the XCCDF ``Benchmark`` root element.

        Child elements are appended in this order: status, title,
        description, notice, front/rear matter, platform specification,
        product platforms, version, metadata, profiles, values, groups and
        finally rules.

        NOTE(review): within this class ``cpe_platform_spec`` is only
        assigned by ``from_yaml`` when an ``env_yaml`` is supplied.
        """
        root = ET.Element('Benchmark')
        root.set('id', self.id_)
        root.set('xmlns', "http://checklists.nist.gov/xccdf/1.1")
        root.set('xmlns:xsi', 'http://www.w3.org/2001/XMLSchema-instance')
        root.set('xsi:schemaLocation',
                 'http://checklists.nist.gov/xccdf/1.1 xccdf-1.1.4.xsd')
        root.set('style', 'SCAP_1.1')
        root.set('resolved', 'false')
        root.set('xml:lang', 'en-US')
        status = ET.SubElement(root, 'status')
        status.set('date', datetime.date.today().strftime("%Y-%m-%d"))
        status.text = self.status
        add_sub_element(root, "title", self.title)
        add_sub_element(root, "description", self.description)
        notice = add_sub_element(root, "notice", self.notice_description)
        notice.set('id', self.notice_id)
        add_sub_element(root, "front-matter", self.front_matter)
        add_sub_element(root, "rear-matter", self.rear_matter)
        # if there are no platforms, do not output platform-specification at all
        if len(self.cpe_platform_spec.platforms) > 0:
            root.append(self.cpe_platform_spec.to_xml_element())

        # The Benchmark applicability is determined by the CPEs
        # defined in the product.yml
        for cpe_name in self.product_cpe_names:
            plat = ET.SubElement(root, "platform")
            plat.set("idref", cpe_name)

        version = ET.SubElement(root, 'version')
        version.text = self.version
        version.set('update', SSG_BENCHMARK_LATEST_URI)

        contributors_file = os.path.join(os.path.dirname(__file__), "../Contributors.xml")
        add_benchmark_metadata(root, contributors_file)

        for profile in self.profiles:
            root.append(profile.to_xml_element())

        for value in self.values.values():
            root.append(value.to_xml_element())

        groups_in_bench = list(self.groups.keys())
        priority_order = ["system", "services"]
        groups_in_bench = reorder_according_to_ordering(groups_in_bench, priority_order)

        # Make system group the first, followed by services group
        for group_id in groups_in_bench:
            group = self.groups.get(group_id)
            # Products using application benchmark don't have system or services group
            if group is not None:
                root.append(group.to_xml_element(env_yaml))

        for rule in self.rules.values():
            root.append(rule.to_xml_element(env_yaml))

        return root

    def to_file(self, file_name, env_yaml=None):
        """Write the XML form of the benchmark to ``file_name``."""
        root = self.to_xml_element(env_yaml)
        tree = ET.ElementTree(root)
        tree.write(file_name)

    def add_value(self, value):
        """Register ``value`` under its ID; ``None`` is ignored."""
        if value is None:
            return
        self.values[value.id_] = value

    # The benchmark is also considered a group, so this function signature needs to match
    # Group()'s add_group()
    def add_group(self, group, env_yaml=None):
        """Register ``group`` under its ID; ``None`` is ignored.

        ``env_yaml`` is accepted only for signature compatibility and is
        not used here.
        """
        if group is None:
            return
        self.groups[group.id_] = group

    def add_rule(self, rule):
        """Register ``rule`` under its ID; ``None`` is ignored."""
        if rule is None:
            return
        self.rules[rule.id_] = rule

    def to_xccdf(self):
        """We can easily extend this script to generate a valid XCCDF instead
        of SSG SHORTHAND.
        """
        raise NotImplementedError

    def __str__(self):
        return self.id_
993
994
995 2
class Group(XCCDFEntity):
    """Represents XCCDF Group
    """
    ATTRIBUTES_TO_PASS_ON = (
        "platforms",
        "cpe_platform_names",
    )

    GENERIC_FILENAME = "group.yml"

    KEYS = dict(
        prodtype=lambda: "all",
        title=lambda: "",
        description=lambda: "",
        warnings=lambda: list(),
        requires=lambda: list(),
        conflicts=lambda: list(),
        values=lambda: dict(),
        groups=lambda: dict(),
        rules=lambda: dict(),
        platform=lambda: "",
        platforms=lambda: set(),
        cpe_platform_names=lambda: set(),
        ** XCCDFEntity.KEYS
    )

    MANDATORY_KEYS = {
        "title",
        "status",
        "description",
        "front_matter",
        "rear_matter",
    }

    @classmethod
    def process_input_dict(cls, input_contents, env_yaml):
        """Convert raw group YAML contents into the attribute dictionary.

        Non-empty ``rules``, ``groups`` and ``values`` collections are
        turned into ``{id: None}`` placeholders, the single ``platform`` is
        merged into ``platforms``, and every platform is parsed into a CPE
        platform that is recorded both in ``cpe_platform_names`` and in the
        product's platform specification.
        """
        data = super(Group, cls).process_input_dict(input_contents, env_yaml)

        # Children are known only by ID at this point.
        for child_kind in ("rules", "groups", "values"):
            child_ids = data[child_kind]
            if child_ids:
                data[child_kind] = {child_id: None for child_id in child_ids}

        if data["platform"]:
            data["platforms"].add(data["platform"])

        # parse platform definition and get CPEAL platform
        for platform in data["platforms"] or ():
            product_cpes = env_yaml["product_cpes"]
            cpe_platform = parse_platform_definition(platform, product_cpes)
            data["cpe_platform_names"].add(cpe_platform.id)
            # add platform to platform specification
            product_cpes.cpe_platform_specification.add_platform(cpe_platform)
        return data

    def load_entities(self, rules_by_id, values_by_id, groups_by_id):
        """Replace empty placeholders in rules/values/groups by real objects.

        Unknown rule or value IDs raise ``KeyError``. Unknown group IDs are
        removed from ``self.groups`` instead.
        """
        for rid, val in self.rules.items():
            if not val:
                self.rules[rid] = rules_by_id[rid]

        for vid, val in self.values.items():
            if not val:
                self.values[vid] = values_by_id[vid]

        # Iterate over a snapshot of the keys: entries may be deleted below.
        for gid in list(self.groups):
            if self.groups.get(gid, None):
                continue
            try:
                loaded_group = groups_by_id[gid]
            except KeyError:
                # Add only the groups we have compiled and loaded
                del self.groups[gid]
            else:
                self.groups[gid] = loaded_group

    def represent_as_dict(self):
        """Return the dictionary form of the group.

        Non-empty rules, groups and values are represented by sorted lists
        of their IDs.
        """
        yaml_contents = super(Group, self).represent_as_dict()

        for child_kind in ("rules", "groups", "values"):
            children = getattr(self, child_kind)
            if children:
                yaml_contents[child_kind] = sorted(children.keys())

        return yaml_contents

    def validate_prodtype(self, yaml_file):
        """Raise ``ValueError`` if any comma-separated prodtype item has
        leading or trailing whitespace.
        """
        for ptype in self.prodtype.split(","):
            if ptype.strip() != ptype:
                msg = (
                    "Comma-separated '{prodtype}' prodtype "
                    "in {yaml_file} contains whitespace."
                    .format(prodtype=self.prodtype, yaml_file=yaml_file))
                raise ValueError(msg)

    def to_xml_element(self, env_yaml=None):
        """Build and return the XCCDF ``Group`` element, including children.

        Children are appended in this order: platforms, requires/conflicts,
        values, rules (reordered by priority) and sub-groups (reordered by
        priority).
        """
        group = ET.Element('Group')
        group.set('id', self.id_)
        title = ET.SubElement(group, 'title')
        title.text = self.title
        add_sub_element(group, 'description', self.description)
        add_warning_elements(group, self.warnings)

        # This is where references should be put if there are any
        # This is where rationale should be put if there are any

        # cpe_platform_names is a set; sort it so the generated XML is
        # stable across runs.
        for cpe_platform_name in sorted(self.cpe_platform_names):
            platform_el = ET.SubElement(group, "platform")
            platform_el.set("idref", "#"+cpe_platform_name)

        add_nondata_subelements(group, "requires", "idref", self.requires)
        add_nondata_subelements(group, "conflicts", "idref", self.conflicts)

        for _value in self.values.values():
            group.append(_value.to_xml_element())

        # Rules that install or remove packages affect remediation
        # of other rules.
        # When packages installed/removed rules come first:
        # The Rules are ordered in more logical way, and
        # remediation order is natural, first the package is installed, then configured.
        rules_in_group = list(self.rules.keys())
        regex = (r'(package_.*_(installed|removed))|' +
                 r'(service_.*_(enabled|disabled))|' +
                 r'install_smartcard_packages$')
        rule_priority_order = ["installed", "install_smartcard_packages", "removed",
                               "enabled", "disabled"]
        rules_in_group = reorder_according_to_ordering(
            rules_in_group, rule_priority_order, regex)

        # Add rules in priority order, first all packages installed, then removed,
        # followed by services enabled, then disabled
        for rule_id in rules_in_group:
            group.append(self.rules.get(rule_id).to_xml_element(env_yaml))

        # Add the sub groups after any current level group rules.
        # As package installed/removed and service enabled/disabled rules are usually in
        # top level group, this ensures groups that further configure a package or service
        # are after rules that install or remove it.
        groups_in_group = list(self.groups.keys())
        group_priority_order = [
            # Make sure rpm_verify_(hashes|permissions|ownership) are run before any other rule.
            # Due to conflicts between rules rpm_verify_* rules and any rule that configures
            # stricter settings, like file_permissions_grub2_cfg and sudo_dedicated_group,
            # the rules deviating from the system default should be evaluated later.
            # So that in the end the system has contents, permissions and ownership reset, and
            # any deviations or stricter settings are applied by the rules in the profile.
            "software", "integrity", "integrity-software", "rpm_verification",

            # The account group has to precede audit group because
            # the rule package_screen_installed is desired to be executed before the rule
            # audit_rules_privileged_commands, otherwise the rule
            # does not catch newly installed screen binary during remediation
            # and report fail
            "accounts", "auditing",


            # The FIPS group should come before Crypto,
            # if we want to set a different (stricter) Crypto Policy than FIPS.
            "fips", "crypto",

            # The firewalld_activation must come before ruleset_modifications, otherwise
            # remediations for ruleset_modifications won't work
            "firewalld_activation", "ruleset_modifications",

            # Rules from group disabling_ipv6 must precede rules from configuring_ipv6,
            # otherwise the remediation prints error although it is successful
            "disabling_ipv6", "configuring_ipv6"
        ]
        groups_in_group = reorder_according_to_ordering(groups_in_group, group_priority_order)
        for group_id in groups_in_group:
            _group = self.groups[group_id]
            group.append(_group.to_xml_element(env_yaml))

        return group

    def to_file(self, file_name):
        """Write the XML form of the group to ``file_name``."""
        root = self.to_xml_element()
        tree = ET.ElementTree(root)
        tree.write(file_name)

    def add_value(self, value):
        """Register ``value`` under its ID; ``None`` is ignored."""
        if value is None:
            return
        self.values[value.id_] = value

    def add_group(self, group, env_yaml=None):
        """Register ``group`` as a sub-group; ``None`` is ignored.

        A sub-group without platforms of its own receives this group's
        platforms. With ``env_yaml``, the sub-group's platforms are parsed
        into CPE platforms and recorded on the sub-group and in the
        product's platform specification.
        """
        if group is None:
            return
        if self.platforms and not group.platforms:
            group.platforms = self.platforms
        self.groups[group.id_] = group
        self._pass_our_properties_on_to(group)

        # Once the group has inherited properties, update cpe_names
        if env_yaml:
            for platform in group.platforms:
                cpe_platform = parse_platform_definition(
                    platform, env_yaml["product_cpes"])
                group.cpe_platform_names.add(cpe_platform.id)
                env_yaml["product_cpes"].cpe_platform_specification.add_platform(
                    cpe_platform)

    def _pass_our_properties_on_to(self, obj):
        """Copy each attribute in ``ATTRIBUTES_TO_PASS_ON`` to ``obj`` when
        ``obj`` has that attribute and its value is ``None``.
        """
        for attr in self.ATTRIBUTES_TO_PASS_ON:
            if hasattr(obj, attr) and getattr(obj, attr) is None:
                setattr(obj, attr, getattr(self, attr))

    def add_rule(self, rule, env_yaml=None):
        """Register ``rule`` in this group; ``None`` is ignored.

        A rule without platforms of its own receives this group's
        platforms. With ``env_yaml``, the rule's platforms are parsed into
        CPE platforms and recorded on the rule and in the product's
        platform specification.
        """
        if rule is None:
            return
        if self.platforms and not rule.platforms:
            rule.platforms = self.platforms
        self.rules[rule.id_] = rule
        self._pass_our_properties_on_to(rule)

        # Once the rule has inherited properties, update cpe_platform_names
        if env_yaml:
            for platform in rule.platforms:
                cpe_platform = parse_platform_definition(
                    platform, env_yaml["product_cpes"])
                rule.cpe_platform_names.add(cpe_platform.id)
                env_yaml["product_cpes"].cpe_platform_specification.add_platform(
                    cpe_platform)

    def __str__(self):
        return self.id_
1227
1228
1229 2
def noop_rule_filterfunc(rule):
    """Filter that accepts every rule."""
    return True


def rule_filter_from_def(filterdef):
    """Turn a filter expression into a predicate over rules.

    ``None`` or an empty string yields ``noop_rule_filterfunc``. Any other
    value is evaluated as a Python expression each time the returned
    function is called, with the rule's attributes as local variables.
    """
    no_filter_given = filterdef is None or filterdef == ""
    if no_filter_given:
        return noop_rule_filterfunc

    def filterfunc(rule):
        # Builtins are removed and only the rule's own attributes are
        # exposed to the expression.
        # NOTE(review): eval() without builtins is not a real sandbox;
        # filterdef should only come from trusted content.
        restricted_globals = {"__builtins__": None}
        rule_variables = rule.__dict__
        return eval(filterdef, restricted_globals, rule_variables)

    return filterfunc
1241
1242
1243 2
class Rule(XCCDFEntity):
1244
    """Represents XCCDF Rule
1245
    """
1246 2
    KEYS = dict(
1247
        prodtype=lambda: "all",
1248
        title=lambda: "",
1249
        description=lambda: "",
1250
        rationale=lambda: "",
1251
        severity=lambda: "",
1252
        references=lambda: dict(),
1253
        identifiers=lambda: dict(),
1254
        ocil_clause=lambda: None,
1255
        ocil=lambda: None,
1256
        oval_external_content=lambda: None,
1257
        fix=lambda: "",
1258
        warnings=lambda: list(),
1259
        conflicts=lambda: list(),
1260
        requires=lambda: list(),
1261
        platform=lambda: None,
1262
        platforms=lambda: set(),
1263
        sce_metadata=lambda: dict(),
1264
        inherited_platforms=lambda: list(),
1265
        template=lambda: None,
1266
        cpe_platform_names=lambda: set(),
1267
        ** XCCDFEntity.KEYS
1268
    )
1269
1270 2
    MANDATORY_KEYS = {
1271
        "title",
1272
        "description",
1273
        "rationale",
1274
        "severity",
1275
    }
1276
1277 2
    GENERIC_FILENAME = "rule.yml"
1278 2
    ID_LABEL = "rule_id"
1279
1280 2
    PRODUCT_REFERENCES = ("stigid", "cis",)
1281 2
    GLOBAL_REFERENCES = ("srg", "vmmsrg", "disa", "cis-csc",)
1282
1283 2
    def __init__(self, id_):
        """Initialize the rule through the parent class and reset
        ``sce_metadata`` to ``None``.
        """
        super(Rule, self).__init__(id_)
        # Filled in by from_yaml() when SCE metadata exists for this rule.
        self.sce_metadata = None
1286
1287 2
    def __deepcopy__(self, memo):
        """Deep-copy the rule, sharing ``template`` and ``local_env_yaml``.

        Those two attributes are difficult to deep copy, so the copy
        references the same objects as the original; every other attribute
        is deep-copied.
        """
        klass = self.__class__
        clone = klass.__new__(klass)
        # Register the copy first so that reference cycles resolve to it.
        memo[id(self)] = clone
        for attr_name, attr_value in self.__dict__.items():
            if attr_name == "template" or attr_name == "local_env_yaml":
                copied = attr_value
            else:
                copied = deepcopy(attr_value, memo)
            setattr(clone, attr_name, copied)
        return clone
1298
1299 2
    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None, sce_metadata=None):
        """Load a rule from ``yaml_file`` and validate it.

        After loading, platforms are normalized to a set (including the
        single ``platform``), CPE platforms are resolved when the rule
        applies to the product being built, SCE metadata is attached when
        present for this rule ID, and prodtype, identifiers and references
        are validated.
        """
        rule = super(Rule, cls).from_yaml(yaml_file, env_yaml)

        # The YAML file yields platforms as a list; turn them into a set again.
        rule.platforms = set(rule.platforms)

        # rule.platforms.update(set(rule.inherited_platforms))

        check_warnings(rule)

        # Whatever is in rule.platform has to be part of rule.platforms too.
        if rule.platform is not None:
            rule.platforms.add(rule.platform)

        # Platform names are converted to CPE names only when an env_yaml is
        # available (there are no product CPEs to look up otherwise) and the
        # rule's prodtype matches the product being built.
        applies_to_product = env_yaml and (
            env_yaml["product"] in parse_prodtype(rule.prodtype)
            or rule.prodtype == "all")
        if applies_to_product:
            product_cpes = env_yaml["product_cpes"]
            for platform in rule.platforms:
                # parse platform definition and get CPEAL platform
                cpe_platform = parse_platform_definition(platform, product_cpes)
                rule.cpe_platform_names.add(cpe_platform.id)
                # add platform to platform specification
                product_cpes.cpe_platform_specification.add_platform(cpe_platform)

        if sce_metadata and rule.id_ in sce_metadata:
            rule.sce_metadata = sce_metadata[rule.id_]
            sce_filename = rule.sce_metadata['filename']
            rule.sce_metadata["relative_path"] = os.path.join(
                env_yaml["product"], "checks/sce", sce_filename)

        rule.validate_prodtype(yaml_file)
        rule.validate_identifiers(yaml_file)
        rule.validate_references(yaml_file)
        return rule
1341
1342 2
    def _verify_stigid_format(self, product):
        """Raise ``ValueError`` if the ``stigid`` reference holds a
        comma-separated list. ``product`` is not used.
        """
        stig_id = self.references.get("stigid", None)
        if stig_id and "," in stig_id:
            raise ValueError("Rules can not have multiple STIG IDs.")
1348
1349 2
    def _verify_disa_cci_format(self):
        """Check every comma-separated item of the ``disa`` reference
        against the ``CCI-XXXXXX`` format (six digits).

        Raises ``ValueError`` for the first malformed item. Nothing happens
        when the reference is absent or empty.
        """
        cci_id = self.references.get("disa", None)
        if cci_id:
            cci_pattern = re.compile(r'^CCI-[0-9]{6}$')
            for cci in cci_id.split(","):
                if cci_pattern.match(cci) is None:
                    raise ValueError("CCI '{}' is in the wrong format! "
                                     "Format should be similar to: "
                                     "CCI-XXXXXX".format(cci))
            self.references["disa"] = cci_id
1360
1361 2
    def normalize(self, product):
        """Make references, identifiers and template data specific to
        ``product``.

        Any exception raised on the way is re-raised as ``RuntimeError``
        whose message names this rule.
        """
        try:
            self.make_refs_and_identifiers_product_specific(product)
            self.make_template_product_specific(product)
        except Exception as exc:
            raise RuntimeError(
                "Error normalizing '{rule}': {msg}"
                .format(rule=self.id_, msg=str(exc))
            )
1371
1372 2
    def _get_product_only_references(self):
        """Return the subset of ``self.references`` whose key is one of
        ``Rule.PRODUCT_REFERENCES``, either bare or qualified as
        ``<ref>@<product>``.
        """
        return {
            gref: gval
            for ref in Rule.PRODUCT_REFERENCES
            for gref, gval in self.references.items()
            if ref == gref or gref.startswith("{0}@".format(ref))
        }
1381
1382 2
    def make_template_product_specific(self, product):
        """Resolve product-qualified keys in the template's ``vars`` and
        ``backends`` for ``product``.

        Both sections are rewritten in place (a missing section becomes an
        empty dict). Nothing happens for rules without a template.
        """
        if not self.template:
            return

        product_suffix = "@{0}".format(product)
        for section in ("vars", "backends"):
            unqualified = self.template.get(section, dict())
            self.template[section] = self._make_items_product_specific(
                unqualified, product_suffix, True)
1397
1398 2
    def make_refs_and_identifiers_product_specific(self, product):
        """Resolve product-qualified identifiers and references for
        ``product``.

        References are split into product-only ones (see
        ``PRODUCT_REFERENCES``) and general ones; each collection, and the
        identifiers, is reduced in place to the entries valid for the
        product. The DISA CCI and STIG ID formats are verified afterwards.

        Raises ``ValueError`` when a collection cannot be resolved.
        """
        product_suffix = "@{0}".format(product)

        product_references = self._get_product_only_references()
        general_references = self.references.copy()
        for product_key in product_references:
            del general_references[product_key]
        for ref in Rule.PRODUCT_REFERENCES:
            if ref in general_references:
                raise ValueError(
                    "Unexpected reference identifier ({0}) without "
                    "product qualifier ({0}@{1}) while building rule "
                    "{2}".format(ref, product, self.id_))

        # (label used in error messages, collection, allow_overwrites)
        collections_to_resolve = (
            ("identifiers", self.identifiers, False),
            ("general_references", general_references, True),
            ("product_references", product_references, False),
        )
        for name, dic, allow_overwrites in collections_to_resolve:
            try:
                new_items = self._make_items_product_specific(
                    dic, product_suffix, allow_overwrites)
            except ValueError as exc:
                raise ValueError(
                    "Error processing {what} for rule '{rid}': {msg}"
                    .format(what=name, rid=self.id_, msg=str(exc))
                )
            # Replace the contents in place so existing aliases stay valid.
            dic.clear()
            dic.update(new_items)

        self.references = general_references
        self._verify_disa_cci_format()
        self.references.update(product_references)

        self._verify_stigid_format(product)
1436
1437 2
    def _make_items_product_specific(self, items_dict, product_suffix, allow_overwrites=False):
        """Return a new dict with product qualifiers resolved.

        Unqualified items are copied as they are. Items qualified with
        ``product_suffix`` are stored under their unqualified label, and
        items qualified for other products are dropped.

        Raises ``ValueError`` when a label from ``Rule.GLOBAL_REFERENCES``
        is product-qualified, or when a qualified item disagrees with its
        unqualified counterpart and ``allow_overwrites`` is false.
        """
        resolved = dict()
        for full_label, value in items_dict.items():
            is_unqualified = "@" not in full_label
            if is_unqualified and full_label not in resolved:
                resolved[full_label] = value
                continue

            label = full_label.split("@")[0]

            # This check comes before the product suffix comparison on
            # purpose, so that problems are caught even for products that
            # are not being built right now.
            if label in Rule.GLOBAL_REFERENCES:
                raise ValueError(
                    "You cannot use product-qualified for the '{item_u}' reference. "
                    "Please remove the product-qualifier and merge values with the "
                    "existing reference if there is any. Original line: {item_q}: {value_q}"
                    .format(item_u=label, item_q=full_label, value_q=value)
                )

            if not full_label.endswith(product_suffix):
                continue

            conflicts_with_unqualified = (
                label in items_dict and not allow_overwrites
                and value != items_dict[label])
            if conflicts_with_unqualified:
                raise ValueError(
                    "There is a product-qualified '{item_q}' item, "
                    "but also an unqualified '{item_u}' item "
                    "and those two differ in value - "
                    "'{value_q}' vs '{value_u}' respectively."
                    .format(item_q=full_label, item_u=label,
                            value_q=value, value_u=items_dict[label])
                )
            resolved[label] = value
        return resolved
1473
1474 2
    def validate_identifiers(self, yaml_file):
        """Validate the rule's identifiers.

        Raises ``ValueError`` when the identifiers section is ``None``,
        when a key or value is not a string, when a value is blank, or when
        an identifier whose type starts with "cce" fails the CCE format or
        value check. ``yaml_file`` is used only in the error messages.
        """
        if self.identifiers is None:
            raise ValueError("Empty identifier section in file %s" % yaml_file)

        # Validate all identifiers are non-empty:
        for ident_type, ident_val in self.identifiers.items():
            both_strings = isinstance(ident_type, str) and isinstance(ident_val, str)
            if not both_strings:
                raise ValueError("Identifiers and values must be strings: %s in file %s"
                                 % (ident_type, yaml_file))
            if ident_val.strip() == "":
                raise ValueError("Identifiers must not be empty: %s in file %s"
                                 % (ident_type, yaml_file))
            if not ident_type.startswith('cce'):
                continue
            if not is_cce_format_valid(ident_val):
                raise ValueError("CCE Identifier format must be valid: invalid format '%s' for CEE '%s'"
                                 " in file '%s'" % (ident_val, ident_type, yaml_file))
            if not is_cce_value_valid("CCE-" + ident_val):
                raise ValueError("CCE Identifier value is not a valid checksum: invalid value '%s' for CEE '%s'"
                                 " in file '%s'" % (ident_val, ident_type, yaml_file))
1493
1494 2
    def validate_references(self, yaml_file):
        """Validate the rule's references.

        Raises ``ValueError`` when the references section is ``None``, when
        a key or value is not a string, when a value is blank, or when an
        item of a comma-separated value has surrounding whitespace.
        ``yaml_file`` is used only in the error messages.
        """
        if self.references is None:
            raise ValueError("Empty references section in file %s" % yaml_file)

        # First pass: types and emptiness of every reference.
        for ref_type, ref_val in self.references.items():
            both_strings = isinstance(ref_type, str) and isinstance(ref_val, str)
            if not both_strings:
                raise ValueError("References and values must be strings: %s in file %s"
                                 % (ref_type, yaml_file))
            if ref_val.strip() == "":
                raise ValueError("References must not be empty: %s in file %s"
                                 % (ref_type, yaml_file))

        # Second pass: no whitespace around comma-separated items.
        for ref_type, ref_val in self.references.items():
            if any(ref != ref.strip() for ref in ref_val.split(",")):
                raise ValueError(
                    "Comma-separated '{ref_type}' reference "
                    "in {yaml_file} contains whitespace."
                    .format(ref_type=ref_type, yaml_file=yaml_file))
1514
1515 2
    def validate_prodtype(self, yaml_file):
        """Raise ``ValueError`` if any comma-separated prodtype item has
        leading or trailing whitespace.
        """
        product_types = self.prodtype.split(",")
        if any(ptype != ptype.strip() for ptype in product_types):
            raise ValueError(
                "Comma-separated '{prodtype}' prodtype "
                "in {yaml_file} contains whitespace."
                .format(prodtype=self.prodtype, yaml_file=yaml_file))
1523
1524 2
    def to_xml_element(self, env_yaml=None):
        """
        Build the XCCDF ``Rule`` element for this rule and return it.

        Children are appended in this order: title, description, warnings,
        references, rationale, platform references, requires/conflicts,
        identifiers and finally the checks (SCE when SCE metadata is
        present, OVAL always, OCIL when the rule has OCIL data).

        env_yaml -- when truthy, its 'reference_uris' entry supplies the
                    reference URIs; otherwise SSG_REF_URIS is used.
        """
        rule_el = ET.Element('Rule')
        rule_el.set('selected', 'false')
        rule_el.set('id', self.id_)
        rule_el.set('severity', self.severity)
        add_sub_element(rule_el, 'title', self.title)
        add_sub_element(rule_el, 'description', self.description)
        add_warning_elements(rule_el, self.warnings)

        ref_uri_dict = env_yaml['reference_uris'] if env_yaml else SSG_REF_URIS
        add_reference_elements(rule_el, self.references, ref_uri_dict)

        add_sub_element(rule_el, 'rationale', self.rationale)

        for platform_name in sorted(self.cpe_platform_names):
            ET.SubElement(rule_el, "platform").set("idref", "#" + platform_name)

        add_nondata_subelements(rule_el, "requires", "idref", self.requires)
        add_nondata_subelements(rule_el, "conflicts", "idref", self.conflicts)

        for ident_type, ident_val in self.identifiers.items():
            ident_el = ET.SubElement(rule_el, 'ident')
            # NOTE(review): an <ident> is created for every identifier, but
            # only 'cce' ones get a system and text -- confirm that empty
            # <ident> elements for other types are intended.
            if ident_type != 'cce':
                continue
            ident_el.set('system', cce_uri)
            ident_el.text = ident_val

        ocil_parent = rule_el
        check_parent = rule_el

        sce = self.sce_metadata
        if sce:
            # TODO: This is pretty much another hack, just like the OVAL one
            # further below. External SCE content is avoided because it is
            # not clear that it is generally useful (unlike, say, CVE
            # checking with external OVAL).
            #
            # The check subelement is built here rather than in XSLT due to
            # the nature of the SCE metadata.
            #
            # A rule may have both SCE and OVAL. That cannot be detected at
            # this point (most notably because linking hasn't occurred yet),
            # so the content author is relied upon to annotate SCE content
            # with a complex-check tag if necessary.

            if 'complex-check' in sce:
                # XCCDF allows EITHER one or more check elements OR a single
                # complex-check. OCIL entries have historically been
                # alongside OVAL content in an "OR" manner, so when OCIL data
                # is present an additional "OR" complex-check parent is
                # inserted and ocil_parent is updated to point at it.
                if self.ocil or self.ocil_clause:
                    ocil_parent = ET.SubElement(ocil_parent, "complex-check")
                    ocil_parent.set('operator', 'OR')

                check_parent = ET.SubElement(ocil_parent, "complex-check")
                check_parent.set('operator', sce['complex-check'])

            # Now, add the SCE check element to the tree.
            sce_check = ET.SubElement(check_parent, "check")
            sce_check.set("system", SCE_SYSTEM)

            if 'check-import' in sce:
                # A single string is normalized to a one-item list; the
                # normalized value is stored back into the metadata.
                if isinstance(sce['check-import'], str):
                    sce['check-import'] = [sce['check-import']]
                for import_name in sce['check-import']:
                    import_el = ET.SubElement(sce_check, 'check-import')
                    import_el.set('import-name', import_name)
                    import_el.text = None

            if 'check-export' in sce:
                if isinstance(sce['check-export'], str):
                    sce['check-export'] = [sce['check-export']]
                for export_spec in sce['check-export']:
                    # Each entry has the form "<export-name>=<value-id>".
                    export_name, value_id = export_spec.split('=')
                    export_el = ET.SubElement(sce_check, 'check-export')
                    export_el.set('value-id', value_id)
                    export_el.set('export-name', export_name)
                    export_el.text = None

            sce_ref = ET.SubElement(sce_check, "check-content-ref")
            sce_ref.set("href", sce['relative_path'])

        oval_check = ET.SubElement(check_parent, 'check')
        oval_check.set("system", oval_namespace)
        oval_ref = ET.SubElement(oval_check, "check-content-ref")
        if not self.oval_external_content:
            # TODO: This is pretty much a hack, oval ID will be the same as rule ID
            #       and we don't want the developers to have to keep them in sync.
            #       Therefore let's just add an OVAL ref of that ID.
            # TODO  Can we not add the check element if the rule doesn't have an OVAL check?
            #       At the moment, the check elements of rules without OVAL are removed by
            #       relabel_ids.py
            oval_ref.set("href", "oval-unlinked.xml")
            oval_ref.set("name", self.id_)
        else:
            oval_ref.set("href", self.oval_external_content)

        if self.ocil or self.ocil_clause:
            # NOTE(review): the OCIL check is attached to check_parent, not
            # to ocil_parent -- confirm this is the intended parent.
            ocil_check = ET.SubElement(check_parent, "check")
            ocil_check.set("system", ocil_cs)
            ocil_ref = ET.SubElement(ocil_check, "check-content-ref")
            ocil_ref.set("href", "ocil-unlinked.xml")
            ocil_ref.set("name", self.id_ + "_ocil")

        return rule_el
1636
1637 2
    def to_file(self, file_name):
        """
        Serialize the element produced by to_xml_element() into file_name.
        """
        ET.ElementTree(self.to_xml_element()).write(file_name)
1641
1642 2
    def to_ocil(self):
        """
        Build the OCIL elements for this rule.

        Returns a (questionnaire, boolean_question_test_action,
        boolean_question) tuple of elements.

        Raises ValueError when the rule has neither ocil nor ocil_clause.
        """
        if not (self.ocil or self.ocil_clause):
            raise ValueError("Rule {0} doesn't have OCIL".format(self.id_))

        rule_id = self.id_

        # <questionnaire> for the rule
        questionnaire = ET.Element("questionnaire", id=rule_id + "_ocil")
        ET.SubElement(questionnaire, "title").text = self.title
        actions = ET.SubElement(questionnaire, "actions")
        ET.SubElement(actions, "test_action_ref").text = rule_id + "_action"

        # <boolean_question_test_action> for the rule: a "true" answer
        # yields PASS, a "false" answer yields FAIL.
        action = ET.Element(
            "boolean_question_test_action",
            id=rule_id + "_action",
            question_ref=rule_id + "_question")
        when_true = ET.SubElement(action, "when_true")
        ET.SubElement(when_true, "result").text = "PASS"
        when_false = ET.SubElement(action, "when_false")
        ET.SubElement(when_false, "result").text = "FAIL"

        # <boolean_question>
        boolean_question = ET.Element(
            "boolean_question", id=rule_id + "_question")

        # TODO: The contents of the <question_text> element used to be broken
        # in the legacy XSLT implementation. The code below contains hacks
        # that reproduce the legacy results, which allowed a smooth
        # transition to the new OCIL generator without mass-editing rule
        # YAML files.
        # Still to be solved:
        # TODO: using variables (aka XCCDF Values) in OCIL content
        # TODO: using HTML formatting tags, e.g. <pre>, in OCIL content
        #
        # The "ocil" key in compiled rules contains HTML and XML elements,
        # but OCIL question texts shouldn't contain them, so they are
        # stripped here.
        ocil_without_tags = (
            "" if self.ocil is None else re.sub(r"</?[^>]+>", "", self.ocil))

        # The "ocil" key in compiled rules contains XML entities which would
        # be escaped by ET.SubElement(), so add_sub_element() is used instead
        # because they must not be escaped.
        question_text = add_sub_element(
            boolean_question, "question_text", ocil_without_tags)

        # The "ocil_clause" key also contains HTML and XML elements, but
        # unlike "ocil" its '<' and '>' characters are meant to be escaped.
        # An empty ocil_clause producing a broken question is in line with
        # the legacy XSLT implementation.
        clause = self.ocil_clause or ""
        existing_text = question_text.text
        if existing_text is None:
            existing_text = ""
        question_text.text = (
            u"{0}\n      Is it the case that {1}?\n      ".format(
                existing_text, clause))

        return (questionnaire, action, boolean_question)
1698
1699 2
    def __hash__(self):
        """ Hash the object by its ID, the same attribute that
        __eq__ compares, so equal objects hash equally."""
        identifier = self.id_
        return hash(identifier)
1703
1704 2
    def __eq__(self, other):
        """ Two objects are equal when other is an instance of
        this object's class and both carry the same ID."""
        if not isinstance(other, self.__class__):
            return False
        return self.id_ == other.id_
1706
1707 2
    def __ne__(self, other):
        """ Inequality is the negation of __eq__.

        This must delegate to ``==``: evaluating ``self != other`` here
        would call __ne__ again and recurse until RecursionError."""
        return not (self == other)
1709
1710 2
    def __lt__(self, other):
        """ Order objects by comparing their IDs."""
        own_id, other_id = self.id_, other.id_
        return own_id < other_id
1712
1713 2
    def __str__(self):
        """ The string form of the object is its ID."""
        return self.id_
1715
1716
1717 2
class DirectoryLoader(object):
    """
    Walks a guide directory tree and collects the benchmark, group, rule and
    value definitions found in it.

    This class only does the directory traversal and bookkeeping.
    _get_new_loader(), _process_values() and _process_rules() raise
    NotImplementedError here and have to be supplied by subclasses.
    """

    def __init__(self, profiles_dir, env_yaml):
        self.profiles_dir = profiles_dir
        self.env_yaml = env_yaml
        self.product = env_yaml["product"]

        # Items discovered directly in the directory being processed.
        self.benchmark_file = None
        self.group_file = None
        self.rule_files = []
        self.value_files = []
        self.subdirectories = []

        # Group or benchmark loaded from this directory, and the group of the
        # directory above (set by the parent loader while recursing).
        self.loaded_group = None
        self.parent_group = None

        # Everything loaded from this directory and below, keyed by ID.
        self.all_values = dict()
        self.all_rules = dict()
        self.all_groups = dict()

    def _collect_items_to_load(self, guide_directory):
        """
        Sort the entries of guide_directory into value files, rule files,
        the benchmark/group file and subdirectories to recurse into.

        Raises ValueError when a benchmark or group file was already
        recorded. Unknown plain files are reported on stderr and skipped;
        an entry named "tests" is ignored silently.
        """
        for entry_name in sorted(os.listdir(guide_directory)):
            entry_path = os.path.join(guide_directory, entry_name)
            extension = os.path.splitext(entry_name)[1]

            if extension == '.var':
                self.value_files.append(entry_path)
                continue

            if entry_name == "benchmark.yml":
                if self.benchmark_file:
                    raise ValueError("Multiple benchmarks in one directory")
                self.benchmark_file = entry_path
                continue

            if entry_name == "group.yml":
                if self.group_file:
                    raise ValueError("Multiple groups in one directory")
                self.group_file = entry_path
                continue

            if extension == '.rule':
                self.rule_files.append(entry_path)
                continue

            if is_rule_dir(entry_path):
                self.rule_files.append(get_rule_dir_yaml(entry_path))
                continue

            if entry_name == "tests":
                continue

            if os.path.isdir(entry_path):
                self.subdirectories.append(entry_path)
                continue

            sys.stderr.write(
                "Encountered file '%s' while recursing, extension '%s' "
                "is unknown. Skipping..\n"
                % (entry_name, extension)
            )

    def load_benchmark_or_group(self, guide_directory):
        """
        Load the benchmark or group from the collected benchmark_file or
        group_file, in the context of profiles_dir and env_yaml.

        guide_directory is used only in the error message.

        Returns the loaded benchmark or group, or None when the directory
        had neither. Raises ValueError when it had both.
        """
        if self.group_file and self.benchmark_file:
            raise ValueError("A .benchmark file and a .group file were found in "
                             "the same directory '%s'" % (guide_directory))

        loaded = None

        # A benchmark is treated as a special form of group from here on.
        if self.benchmark_file:
            loaded = Benchmark.from_yaml(self.benchmark_file, self.env_yaml)
            if self.profiles_dir:
                loaded.add_profiles_from_dir(self.profiles_dir, self.env_yaml)

        if self.group_file:
            loaded = Group.from_yaml(self.group_file, self.env_yaml)
            prodtypes = parse_prodtype(loaded.prodtype)
            # The group is registered only when it applies to this product;
            # it is returned to the caller either way.
            if any(tag in prodtypes for tag in ("all", self.product)):
                self.all_groups[loaded.id_] = loaded

        return loaded

    def _load_group_process_and_recurse(self, guide_directory):
        """
        Load this directory's group, attach it to the parent group and then
        process values, subdirectories and rules, in that order.

        Nothing is processed when the directory yields no group.
        """
        self.loaded_group = self.load_benchmark_or_group(guide_directory)
        if not self.loaded_group:
            return

        if self.parent_group:
            self.parent_group.add_group(self.loaded_group, env_yaml=self.env_yaml)

        self._process_values()
        self._recurse_into_subdirs()
        self._process_rules()

    def process_directory_tree(self, start_dir, extra_group_dirs=None):
        """
        Process start_dir recursively; extra_group_dirs, when given, are
        handled as additional subdirectories of start_dir.
        """
        self._collect_items_to_load(start_dir)
        if extra_group_dirs:
            self.subdirectories.extend(extra_group_dirs)
        self._load_group_process_and_recurse(start_dir)

    def process_directory_trees(self, directories):
        """
        Process the first directory as the root of the tree and the
        remaining ones as its extra group directories.
        """
        start_dir, extra_group_dirs = directories[0], directories[1:]
        return self.process_directory_tree(start_dir, extra_group_dirs)

    def _recurse_into_subdirs(self):
        """
        Process every collected subdirectory with a fresh loader and merge
        what it loaded into this loader's dictionaries.
        """
        for subdirectory in self.subdirectories:
            child = self._get_new_loader()
            child.parent_group = self.loaded_group
            child.process_directory_tree(subdirectory)
            self.all_values.update(child.all_values)
            self.all_rules.update(child.all_rules)
            self.all_groups.update(child.all_groups)

    def _get_new_loader(self):
        """Return a loader for a subdirectory; supplied by subclasses."""
        raise NotImplementedError()

    def _process_values(self):
        """Load the collected value files; supplied by subclasses."""
        raise NotImplementedError()

    def _process_rules(self):
        """Load the collected rule files; supplied by subclasses."""
        raise NotImplementedError()

    def save_all_entities(self, base_dir):
        """
        Dump all loaded rules, groups and values as YAML files into the
        "rules", "groups" and "values" subdirectories of base_dir.

        Each subdirectory is created even if there is nothing to save.
        """
        loaded_by_dirname = (
            ("rules", self.all_rules),
            ("groups", self.all_groups),
            ("values", self.all_values),
        )
        for dirname, loaded in loaded_by_dirname:
            destdir = os.path.join(base_dir, dirname)
            mkdir_p(destdir)
            if loaded:
                self.save_entities(loaded.values(), destdir)

    def save_entities(self, entities, destdir):
        """
        Dump every entity into destdir as "<entity id>.yml".
        """
        if not entities:
            return
        for entity in entities:
            entity.dump_yaml(os.path.join(destdir, entity.id_ + ".yml"))
1857
1858
1859 2
class BuildLoader(DirectoryLoader):
    """
    DirectoryLoader that builds Value and Rule objects from the collected
    YAML files and attaches them to the loaded group.
    """

    def __init__(self, profiles_dir, env_yaml,
                 sce_metadata_path=None):
        """
        sce_metadata_path -- optional path to a JSON file with SCE metadata;
                             a missing path or a zero-sized file leaves
                             self.sce_metadata set to None.
        """
        super(BuildLoader, self).__init__(profiles_dir, env_yaml)

        self.sce_metadata = None
        if sce_metadata_path and os.path.getsize(sce_metadata_path):
            # Use a context manager so the file handle is closed promptly
            # instead of being left for the garbage collector.
            with open(sce_metadata_path, 'r') as sce_metadata_file:
                self.sce_metadata = json.load(sce_metadata_file)

    def _process_values(self):
        """Load every collected value file and add it to the loaded group."""
        for value_yaml in self.value_files:
            value = Value.from_yaml(value_yaml, self.env_yaml)
            self.all_values[value.id_] = value
            self.loaded_group.add_value(value)

    def _process_rules(self):
        """
        Load every collected rule file, skip rules that do not apply to the
        current product, and add the remaining ones to the loaded group.
        """
        for rule_yaml in self.rule_files:
            try:
                rule = Rule.from_yaml(rule_yaml, self.env_yaml, self.sce_metadata)
            except DocumentationNotComplete:
                # Happens on non-debug build when a rule is "documentation-incomplete"
                continue
            prodtypes = parse_prodtype(rule.prodtype)
            if "all" not in prodtypes and self.product not in prodtypes:
                continue
            self.all_rules[rule.id_] = rule
            self.loaded_group.add_rule(rule, env_yaml=self.env_yaml)

            # Platforms of the enclosing group also apply to the rule.
            if self.loaded_group.platforms:
                rule.inherited_platforms += self.loaded_group.platforms

            rule.normalize(self.env_yaml["product"])

    def _get_new_loader(self):
        """Return a BuildLoader for a subdirectory, sharing the SCE metadata."""
        loader = BuildLoader(
            self.profiles_dir, self.env_yaml)
        # Do it this way so we only have to parse the SCE metadata once.
        loader.sce_metadata = self.sce_metadata
        return loader

    def export_group_to_file(self, filename):
        """Write the loaded group to filename via its to_file() method."""
        return self.loaded_group.to_file(filename)
1901
1902
1903 2
class LinearLoader(object):
    """
    Loads already compiled (resolved) rules, groups, values and profiles
    from the "rules", "groups", "values" and "profiles" subdirectories of
    resolved_path and exports them as a benchmark or as OCIL.
    """

    def __init__(self, env_yaml, resolved_path):
        self.resolved_rules_dir = os.path.join(resolved_path, "rules")
        self.rules = dict()

        self.resolved_profiles_dir = os.path.join(resolved_path, "profiles")
        self.profiles = dict()

        self.resolved_groups_dir = os.path.join(resolved_path, "groups")
        self.groups = dict()

        self.resolved_values_dir = os.path.join(resolved_path, "values")
        self.values = dict()

        self.benchmark = None
        self.env_yaml = env_yaml

    def find_first_groups_ids(self, start_dir):
        """
        Return the names of the direct subdirectories of start_dir that
        contain a group.yml file, in the order glob reports them.
        """
        group_files = glob.glob(os.path.join(start_dir, "*", "group.yml"))
        # The ID is the name of the directory holding group.yml; os.path
        # helpers are used instead of splitting on os.path.sep by hand.
        return [os.path.basename(os.path.dirname(fname)) for fname in group_files]

    def load_entities_by_id(self, filenames, destination, cls):
        """
        Load each file with cls.from_yaml() and store the resulting entity
        in the destination dictionary under its id_.
        """
        for fname in filenames:
            entity = cls.from_yaml(fname, self.env_yaml)
            destination[entity.id_] = entity

    def load_benchmark(self, directory):
        """
        Load benchmark.yml from directory, add the resolved profiles and
        attach the top-level groups of directory that have been loaded.
        """
        self.benchmark = Benchmark.from_yaml(
            os.path.join(directory, "benchmark.yml"), self.env_yaml)

        self.benchmark.add_profiles_from_dir(self.resolved_profiles_dir, self.env_yaml)

        for gid in self.find_first_groups_ids(directory):
            # Add only the groups we have compiled and loaded. Only the
            # lookup is guarded, so that a KeyError raised inside add_group()
            # is not silently swallowed.
            if gid not in self.groups:
                continue
            self.benchmark.add_group(self.groups[gid], self.env_yaml)

    def load_compiled_content(self):
        """
        Load all resolved rules, groups, profiles and values, then let every
        group pick up its entities from the loaded dictionaries.
        """
        filenames = glob.glob(os.path.join(self.resolved_rules_dir, "*.yml"))
        self.load_entities_by_id(filenames, self.rules, Rule)

        filenames = glob.glob(os.path.join(self.resolved_groups_dir, "*.yml"))
        self.load_entities_by_id(filenames, self.groups, Group)

        filenames = glob.glob(os.path.join(self.resolved_profiles_dir, "*.yml"))
        self.load_entities_by_id(filenames, self.profiles, Profile)

        filenames = glob.glob(os.path.join(self.resolved_values_dir, "*.yml"))
        self.load_entities_by_id(filenames, self.values, Value)

        for g in self.groups.values():
            g.load_entities(self.rules, self.values, self.groups)

    def export_benchmark_to_file(self, filename):
        """Register the XML namespaces and write the benchmark to filename."""
        register_namespaces()
        return self.benchmark.to_file(filename, self.env_yaml)

    def export_ocil_to_file(self, filename):
        """
        Write an OCIL document to filename, containing a questionnaire, a
        test action and a question for every loaded rule that has OCIL data.
        """
        root = ET.Element('ocil')
        root.set('xmlns:xsi', xsi_namespace)
        root.set("xmlns", ocil_namespace)
        root.set("xmlns:xhtml", xhtml_namespace)
        tree = ET.ElementTree(root)
        generator = ET.SubElement(root, "generator")
        product_name = ET.SubElement(generator, "product_name")
        product_name.text = "build_shorthand.py from SCAP Security Guide"
        product_version = ET.SubElement(generator, "product_version")
        product_version.text = "ssg: " + self.env_yaml["ssg_version_str"]
        schema_version = ET.SubElement(generator, "schema_version")
        schema_version.text = "2.0"
        timestamp_el = ET.SubElement(generator, "timestamp")
        timestamp_el.text = timestamp
        questionnaires = ET.SubElement(root, "questionnaires")
        test_actions = ET.SubElement(root, "test_actions")
        questions = ET.SubElement(root, "questions")
        for rule in self.rules.values():
            # Rules without OCIL data would make to_ocil() raise.
            if not rule.ocil and not rule.ocil_clause:
                continue
            questionnaire, action, boolean_question = rule.to_ocil()
            questionnaires.append(questionnaire)
            test_actions.append(action)
            questions.append(boolean_question)
        tree.write(filename)
1990