Test Failed
Push — master ( aa538d...df2cf1 )
by Jan
02:59 queued 10s
created

ssg.build_yaml.Group.to_file()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.4218

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 2
dl 0
loc 4
ccs 1
cts 4
cp 0.25
crap 1.4218
rs 10
c 0
b 0
f 0
1 2
from __future__ import absolute_import
2 2
from __future__ import print_function
3
4 2
from collections import defaultdict
5 2
from copy import deepcopy
6 2
import datetime
7 2
import json
8 2
import os
9 2
import os.path
10 2
import re
11 2
import sys
12 2
from xml.sax.saxutils import escape
13 2
import glob
14
15 2
import yaml
16
17 2
from .build_cpe import CPEDoesNotExist, parse_platform_definition
18 2
from .constants import (XCCDF_REFINABLE_PROPERTIES,
19
                        SCE_SYSTEM,
20
                        cce_uri,
21
                        dc_namespace,
22
                        ocil_cs,
23
                        ocil_namespace,
24
                        oval_namespace,
25
                        xhtml_namespace,
26
                        xsi_namespace,
27
                        timestamp,
28
                        SSG_BENCHMARK_LATEST_URI,
29
                        SSG_PROJECT_NAME,
30
                        SSG_REF_URIS
31
                        )
32 2
from .rules import get_rule_dir_id, get_rule_dir_yaml, is_rule_dir
33 2
from .rule_yaml import parse_prodtype
34
35 2
from .cce import is_cce_format_valid, is_cce_value_valid
36 2
from .yaml import DocumentationNotComplete, open_and_expand, open_and_macro_expand
37 2
from .utils import required_key, mkdir_p
38
39 2
from .xml import ElementTree as ET, add_xhtml_namespace, register_namespaces, parse_file
40 2
from .shims import unicode_func
41
42
43 2
def dump_yaml_preferably_in_original_order(dictionary, file_object):
    """
    Dump ``dictionary`` as YAML into ``file_object`` with 4-space indentation,
    asking the YAML library not to sort the keys whenever it supports that.

    Returns whatever ``yaml.dump`` returns. A ``TypeError`` that is not
    related to the ``sort_keys`` argument is propagated to the caller.
    """
    common_options = dict(indent=4)
    try:
        return yaml.dump(dictionary, file_object, sort_keys=False, **common_options)
    except TypeError as exc:
        # Older versions of libyaml don't understand the sort_keys kwarg
        if "sort_keys" in str(exc):
            return yaml.dump(dictionary, file_object, **common_options)
        raise exc
51
52
53 2
def add_sub_element(parent, tag, data):
    """
    Append a new ``tag`` child to ``parent`` whose content is ``data``.

    ``data`` is a string that gets parsed as an XML fragment, which makes it
    possible for the new child to carry sub-elements of its own.

    If ``data`` must not be interpreted as XML, either escape it before
    calling this function, or use ElementTree.SubElement() instead.

    Returns the newly created child element.
    Raises RuntimeError if the assembled string cannot be parsed.
    """
    namespaced_data = add_xhtml_namespace(data)
    # Our YAML data contain XML and XHTML elements. ET.SubElement() would
    # escape the < > characters into &lt; and &gt;, so no child elements would
    # be created. Building the markup as text and parsing it is a workaround.
    # TODO: Remove this function after we move to Markdown everywhere in SSG
    template = unicode_func('<{0} xmlns:xhtml="{2}">{1}</{0}>')
    ustr = template.format(tag, namespaced_data, xhtml_namespace)

    try:
        element = ET.fromstring(ustr.encode("utf-8"))
    except Exception:
        raise RuntimeError(
            "Error adding subelement to an element '{0}' from string: '{1}'"
            .format(parent.tag, ustr))

    parent.append(element)
    return element
82
83
84 2
def reorder_according_to_ordering(unordered, ordering, regex=None):
    """
    Return the items of ``unordered`` as a list, prioritized by ``ordering``.

    Items that match ``regex`` (tested with ``re.match``, i.e. anchored at the
    start of the item) and that contain an entry of ``ordering`` as
    a substring come first, grouped by the position of that entry in
    ``ordering``. All remaining items follow in sorted order. Duplicate items
    are returned only once.

    Args:
        unordered: Iterable of strings; one-shot iterators are supported.
        ordering: Sequence of substrings that defines the priority.
        regex: Optional pattern selecting the items eligible for
            prioritization. When omitted, an alternation of the ``ordering``
            entries is used (the entries are inserted as-is, not escaped).
    """
    # The input is traversed twice (filtering and set construction),
    # so materialize it first to avoid exhausting a one-shot iterator.
    all_items = list(unordered)

    if regex is None:
        regex = "|".join(["({0})".format(item) for item in ordering])
    regex = re.compile(regex)

    items_to_order = [item for item in all_items if regex.match(item)]
    remaining = set(all_items)

    ordered = []
    for priority_type in ordering:
        for item in items_to_order:
            if priority_type in item and item in remaining:
                ordered.append(item)
                remaining.remove(item)
    ordered.extend(sorted(remaining))
    return ordered
100
101
102 2
def add_warning_elements(element, warnings):
    """
    Add one ``warning`` child to ``element`` for every entry of ``warnings``.

    ``warnings`` is a list of dictionaries rather than a single dictionary
    so that several warnings can share the same category, which is valid
    in SCAP and in our content:

        warnings:
            - general: Some general warning
            - general: Some other general warning
            - general: |-
                Some really long multiline general warning

    Each dictionary should have only one key/value pair: the key becomes the
    ``category`` attribute and the value becomes the content of the warning.
    """
    for warning_dict in warnings:
        category, text = list(warning_dict.items())[0]
        warning = add_sub_element(element, "warning", text)
        warning.set("category", category)
117
118
119 2
def add_nondata_subelements(element, subelement, attribute, attr_data):
    """
    Append one empty ``subelement`` child to ``element`` per item of
    ``attr_data``, storing the item in the child's ``attribute``.

    For example, <requires id="my_required_id"/>
    """
    for attr_value in attr_data:
        ET.SubElement(element, subelement).set(attribute, attr_value)
125
126
127 2
def check_warnings(xccdf_structure):
    """
    Verify that every entry of ``xccdf_structure.warnings`` holds exactly
    one item.

    Raises ValueError as soon as an entry of a different length is found.
    """
    if any(len(entry) != 1 for entry in xccdf_structure.warnings):
        raise ValueError(
            "Only one key/value pair should exist for each warnings dictionary")
132
133
134 2
def add_reference_elements(element, references, ref_uri_dict):
    """
    Add ``reference`` children to ``element`` for all given references.

    Args:
        element: XML element that receives the ``reference`` children.
        references: Mapping of a reference type to a string of
            comma-separated reference values.
        ref_uri_dict: Mapping of a reference type to the URI that is stored
            in the ``href`` attribute. References of the ``srg`` type are
            looked up under ``os-srg`` or ``app-srg`` depending on the prefix
            of each individual value.

    Raises:
        ValueError: If an SRG value has an unknown prefix, or if the reference
            type has no URI in ``ref_uri_dict``.
    """
    for ref_type, ref_vals in references.items():
        for ref_val in ref_vals.split(","):
            # This assumes that a single srg key may have items from multiple SRG types
            if ref_type == 'srg':
                if ref_val.startswith('SRG-OS-'):
                    ref_href = ref_uri_dict['os-srg']
                elif ref_val.startswith('SRG-APP-'):
                    ref_href = ref_uri_dict['app-srg']
                else:
                    raise ValueError("SRG {0} doesn't have a URI defined.".format(ref_val))
            else:
                try:
                    ref_href = ref_uri_dict[ref_type]
                except KeyError:
                    # This is a module-level function, so there is no "self"
                    # to take the rule ID from; use the ID of the element
                    # that is being populated instead.
                    # NOTE(review): assumes callers set the "id" attribute
                    # before adding references; otherwise None is reported.
                    msg = (
                        "Error processing reference {0}: {1} in Rule {2}."
                        .format(ref_type, ref_vals, element.get("id")))
                    raise ValueError(msg)

            ref = ET.SubElement(element, 'reference')
            ref.set("href", ref_href)
            ref.text = ref_val
157
158
159 2
def add_benchmark_metadata(element, contributors_file):
    """
    Append a ``metadata`` child to ``element`` and fill it with Dublin Core
    entries: the publisher and the creator (both set to the project name),
    one contributor per ``contributor`` element found in
    ``contributors_file``, and the source URI of the benchmark.
    """
    metadata = ET.SubElement(element, "metadata")

    def add_dc_entry(name, text):
        # All entries live in the Dublin Core namespace.
        entry = ET.SubElement(metadata, "{%s}%s" % (dc_namespace, name))
        entry.text = text

    add_dc_entry("publisher", SSG_PROJECT_NAME)
    add_dc_entry("creator", SSG_PROJECT_NAME)

    contrib_tree = parse_file(contributors_file)
    for contributor in contrib_tree.iter('contributor'):
        add_dc_entry("contributor", contributor.text)

    add_dc_entry("source", SSG_BENCHMARK_LATEST_URI)
175
176
177 2
class SelectionHandler(object):
    """
    Keeps track of profile selections: selected and unselected rules,
    variable assignments and rule refinements.

    The textual form of the individual entries is:

    - ``rule_id`` selects a rule,
    - ``!rule_id`` unselects a rule,
    - ``variable=selector`` assigns a selector to a variable,
    - ``rule_id.property=value`` refines a property of a rule.
    """
    def __init__(self):
        self.refine_rules = defaultdict(list)
        self.variables = dict()
        self.unselected = []
        self.selected = []

    @property
    def selections(self):
        """
        All stored selections in their textual form, in this order:
        selected rules, unselected rules, variables, refinements.
        """
        entries = [str(item) for item in self.selected]
        entries.extend("!" + str(item) for item in self.unselected)
        entries.extend(
            name + "=" + selector for name, selector in self.variables.items())
        for rule, refinements in self.refine_rules.items():
            entries.extend(
                "{rule}.{property}={value}"
                .format(rule=rule, property=prop, value=val)
                for prop, val in refinements)
        return entries

    @selections.setter
    def selections(self, entries):
        # Entries are added on top of what is already stored,
        # nothing gets cleared by the assignment.
        for entry in entries:
            self.apply_selection(entry)

    def apply_selection(self, item):
        """
        Parse one textual selection and store it in the matching container.

        Raises ValueError if a refinement targets a property that is not
        listed in XCCDF_REFINABLE_PROPERTIES.
        """
        # The presence of a dot is tested first, so any item containing a dot
        # is treated as a rule refinement.
        if "." in item:
            rule, refinement = item.split(".", 1)
            property_, value = refinement.split("=", 1)
            if property_ in XCCDF_REFINABLE_PROPERTIES:
                self.refine_rules[rule].append((property_, value))
                return
            msg = ("Property '{property_}' cannot be refined. "
                   "Rule properties that can be refined are {refinables}. "
                   "Fix refinement '{rule_id}.{property_}={value}' in profile '{profile}'."
                   .format(property_=property_, refinables=XCCDF_REFINABLE_PROPERTIES,
                           rule_id=rule, value=value, profile=self.id_)
                   )
            raise ValueError(msg)

        if "=" in item:
            varname, value = item.split("=", 1)
            self.variables[varname] = value
            return

        if item.startswith("!"):
            self.unselected.append(item[1:])
            return

        self.selected.append(item)

    def _subtract_refinements(self, extended_refinements):
        """
        Given a dict of rule refinements from the extended profile,
        "undo" every refinement prefixed with '!' in this profile.

        The '!'-prefixed entries are removed from this profile, and the
        modified ``extended_refinements`` is returned.
        """
        negated_rules = [rule for rule in self.refine_rules if rule.startswith("!")]
        for negated in negated_rules:
            inherited = extended_refinements[negated[1:]]
            for refinement in self.refine_rules[negated]:
                inherited.remove(tuple(refinement))
            del self.refine_rules[negated]
        return extended_refinements

    def update_with(self, rhs):
        """
        Merge the selections of ``rhs`` into this object.

        Rules selected by ``rhs`` that are missing here are appended;
        variables and refinements of ``rhs`` are taken over unless this
        object defines them as well, in which case the local ones win.
        """
        missing_selections = set(rhs.selected).difference(set(self.selected))
        self.selected.extend(list(missing_selections))

        merged_variables = dict(rhs.variables)
        merged_variables.update(self.variables)
        self.variables = merged_variables

        # Work on a copy, so the refinements of rhs stay intact.
        merged_refinements = self._subtract_refinements(deepcopy(rhs.refine_rules))
        merged_refinements.update(self.refine_rules)
        self.refine_rules = merged_refinements
250
251
252 2
class XCCDFEntity(object):
    """
    This class can load itself from a YAML with Jinja macros,
    and it can also save itself to YAML.

    It is supposed to work with the content in the project,
    when entities are defined in the benchmark tree,
    and they are compiled into flat YAMLs to the build directory.
    """
    # Maps every supported key to a callable that produces its default value.
    KEYS = dict(
        id_=lambda: "",
        definition_location=lambda: "",
    )

    # Keys that have to be present in the definition of the entity.
    MANDATORY_KEYS = set()

    # Name of the definition file whose entity ID comes from its directory.
    GENERIC_FILENAME = ""
    # Key under which the entity ID is exposed to the macro environment.
    ID_LABEL = "id"

    def __init__(self, id_):
        super(XCCDFEntity, self).__init__()
        self._assign_defaults()
        self.id_ = id_

    def _assign_defaults(self):
        """Set every attribute listed in KEYS to its default value."""
        for key, make_default in self.KEYS.items():
            default_val = make_default()
            # A RuntimeError instance used as a default is stored as None.
            if isinstance(default_val, RuntimeError):
                default_val = None
            setattr(self, key, default_val)

    @classmethod
    def get_instance_from_full_dict(cls, data):
        """
        Given a defining dictionary, produce an instance
        by treating all dict elements as attributes.

        Extend this if you want tight control over the instance creation process.
        """
        entity = cls(data["id_"])
        for attribute, value in data.items():
            setattr(entity, attribute, value)
        return entity

    @classmethod
    def process_input_dict(cls, input_contents, env_yaml):
        """
        Take the contents of the definition as a dictionary, and
        add defaults or raise errors if a required member is not present.

        Keys that are consumed get removed from ``input_contents``.

        Extend this if you want to add, remove or alter the result
        that will constitute the new instance.
        """
        data = dict()

        for key, make_default in cls.KEYS.items():
            if key in input_contents:
                data[key] = input_contents.pop(key)
            elif key in cls.MANDATORY_KEYS:
                msg = (
                    "Key '{key}' is mandatory for definition of '{class_name}'."
                    .format(key=key, class_name=cls.__name__))
                raise ValueError(msg)
            else:
                data[key] = make_default()

        return data

    @classmethod
    def parse_yaml_into_processed_dict(cls, yaml_file, env_yaml=None):
        """
        Given yaml filename and environment info, produce a dictionary
        that defines the instance to be created.
        This wraps :meth:`process_input_dict` and it adds generic keys on the top:

        - `id_` as the entity ID that is deduced either from the filename,
          or from the parent directory name.
        - `definition_location` as the original location where the entity got defined.
        """
        file_basename = os.path.basename(yaml_file)
        if file_basename == cls.GENERIC_FILENAME:
            entity_id = os.path.basename(os.path.dirname(yaml_file))
        else:
            entity_id = file_basename.split(".")[0]

        if env_yaml:
            env_yaml[cls.ID_LABEL] = entity_id
        yaml_data = open_and_macro_expand(yaml_file, env_yaml)

        try:
            processed_data = cls.process_input_dict(yaml_data, env_yaml)
        except ValueError as exc:
            msg = (
                "Error processing {yaml_file}: {exc}"
                .format(yaml_file=yaml_file, exc=str(exc)))
            raise ValueError(msg)

        # Whatever has not been consumed by process_input_dict is unexpected.
        if yaml_data:
            msg = (
                "Unparsed YAML data in '{yaml_file}': {keys}"
                .format(yaml_file=yaml_file, keys=list(yaml_data.keys())))
            raise RuntimeError(msg)

        if not processed_data.get("definition_location", ""):
            processed_data["definition_location"] = yaml_file

        processed_data["id_"] = entity_id

        return processed_data

    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None):
        """
        Create an instance from the definition stored in ``yaml_file``.

        DocumentationNotComplete is propagated as it is, any other error
        is reported as a RuntimeError that mentions the class and the file.
        """
        yaml_file = os.path.normpath(yaml_file)

        # Parsing stores the entity ID in the environment,
        # so hand over a copy and keep the caller's dictionary intact.
        local_env_yaml = dict(env_yaml) if env_yaml else None

        try:
            data_dict = cls.parse_yaml_into_processed_dict(yaml_file, local_env_yaml)
        except DocumentationNotComplete:
            raise
        except Exception as exc:
            msg = (
                "Error loading a {class_name} from {filename}: {error}"
                .format(class_name=cls.__name__, filename=yaml_file, error=str(exc)))
            raise RuntimeError(msg)

        return cls.get_instance_from_full_dict(data_dict)

    def represent_as_dict(self):
        """
        Produce a dict representation of the class.

        The representation contains all attributes listed in KEYS
        with the exception of ``id_``.

        Extend this method if you need the representation to be different from the object.
        """
        data = dict((key, getattr(self, key)) for key in self.KEYS)
        del data["id_"]
        return data

    def dump_yaml(self, file_name, documentation_complete=True):
        """
        Write the dict representation of the entity as YAML to ``file_name``,
        together with the ``documentation_complete`` flag.
        """
        to_dump = self.represent_as_dict()
        to_dump["documentation_complete"] = documentation_complete
        with open(file_name, "w+") as output:
            dump_yaml_preferably_in_original_order(to_dump, output)

    def to_xml_element(self):
        """Not implemented here; this base implementation always raises."""
        raise NotImplementedError()
409
410
411 2
class Profile(XCCDFEntity, SelectionHandler):
    """Represents XCCDF profile
    """
    KEYS = dict(
        title=lambda: "",
        description=lambda: "",
        extends=lambda: "",
        metadata=lambda: None,
        reference=lambda: None,
        selections=lambda: list(),
        platforms=lambda: set(),
        cpe_names=lambda: set(),
        platform=lambda: None,
        filter_rules=lambda: "",
        ** XCCDFEntity.KEYS
    )

    MANDATORY_KEYS = {
        "title",
        "description",
        "selections",
    }

    @classmethod
    def process_input_dict(cls, input_contents, env_yaml):
        """
        Process the profile definition, and on top of the generic processing,
        merge the ``platform`` key into ``platforms`` and, when ``env_yaml``
        is available, translate the platforms into CPE names.

        Raises CPEDoesNotExist if a platform has no CPE name.
        """
        input_contents = super(Profile, cls).process_input_dict(input_contents, env_yaml)

        platform = input_contents.get("platform")
        if platform is not None:
            input_contents["platforms"].add(platform)

        if env_yaml:
            for platform in input_contents["platforms"]:
                try:
                    new_cpe_name = env_yaml["product_cpes"].get_cpe_name(platform)
                    input_contents["cpe_names"].add(new_cpe_name)
                except CPEDoesNotExist:
                    msg = (
                        "Unsupported platform '{platform}' in a profile."
                        .format(platform=platform))
                    raise CPEDoesNotExist(msg)

        return input_contents

    @property
    def rule_filter(self):
        """
        Filter function built from ``filter_rules``, or the no-op filter
        when the profile doesn't define any filter.
        """
        if self.filter_rules:
            return rule_filter_from_def(self.filter_rules)
        else:
            return noop_rule_filterfunc

    def to_xml_element(self):
        """
        Build the ``Profile`` XML element: title, description, optional
        reference, platforms, rule selections, value refinements
        and rule refinements.
        """
        element = ET.Element('Profile')
        element.set("id", self.id_)
        if self.extends:
            element.set("extends", self.extends)
        title = add_sub_element(element, "title", self.title)
        title.set("override", "true")
        desc = add_sub_element(element, "description", self.description)
        desc.set("override", "true")

        if self.reference:
            add_sub_element(element, "reference", escape(self.reference))

        for cpe_name in self.cpe_names:
            plat = ET.SubElement(element, "platform")
            plat.set("idref", cpe_name)

        for selection in self.selected:
            select = ET.Element("select")
            select.set("idref", selection)
            select.set("selected", "true")
            element.append(select)

        for selection in self.unselected:
            unselect = ET.Element("select")
            unselect.set("idref", selection)
            unselect.set("selected", "false")
            element.append(unselect)

        for value_id, selector in self.variables.items():
            refine_value = ET.Element("refine-value")
            refine_value.set("idref", value_id)
            refine_value.set("selector", selector)
            element.append(refine_value)

        for refined_rule, refinement_list in self.refine_rules.items():
            refine_rule = ET.Element("refine-rule")
            refine_rule.set("idref", refined_rule)
            for refinement in refinement_list:
                refine_rule.set(refinement[0], refinement[1])
            element.append(refine_rule)

        return element

    def get_rule_selectors(self):
        """Return IDs of all selected and unselected rules as one list."""
        return self.selected + self.unselected

    def get_variable_selectors(self):
        """Return the mapping of variable IDs to their selectors."""
        return self.variables

    def validate_refine_rules(self, rules):
        """
        Check that every refined rule exists among ``rules`` and that it is
        selected (or unselected) by the profile.

        Raises ValueError on the first refinement that violates this.
        """
        # Sets make the membership tests in the loop constant-time.
        existing_rule_ids = set(r.id_ for r in rules)
        rule_selectors = set(self.get_rule_selectors())

        for refine_rule, refinement_list in self.refine_rules.items():
            # Take first refinement to illustrate where the error is
            # all refinements in list are invalid, so it doesn't really matter.
            # The list may be empty if all inherited refinements of the rule
            # got undone; use a placeholder instead of failing on the index.
            if refinement_list:
                a_refinement = refinement_list[0]
            else:
                a_refinement = ("<none>", "<none>")

            if refine_rule not in existing_rule_ids:
                msg = (
                    "You are trying to refine a rule that doesn't exist. "
                    "Rule '{rule_id}' was not found in the benchmark. "
                    "Please check all rule refinements for rule: '{rule_id}', for example: "
                    "- {rule_id}.{property_}={value}' in profile {profile_id}."
                    .format(rule_id=refine_rule, profile_id=self.id_,
                            property_=a_refinement[0], value=a_refinement[1])
                    )
                raise ValueError(msg)

            if refine_rule not in rule_selectors:
                msg = ("- {rule_id}.{property_}={value}' in profile '{profile_id}' is refining "
                       "a rule that is not selected by it. The refinement will not have any "
                       "noticeable effect. Either select the rule or remove the rule refinement."
                       .format(rule_id=refine_rule, property_=a_refinement[0],
                               value=a_refinement[1], profile_id=self.id_)
                       )
                raise ValueError(msg)

    def validate_variables(self, variables):
        """
        Check that every variable set by the profile is among ``variables``
        and that the selector used is one of the options of that variable.

        Raises ValueError on the first variable that violates this.
        """
        variables_by_id = dict()
        for var in variables:
            variables_by_id[var.id_] = var

        for var_id, our_val in self.variables.items():
            if var_id not in variables_by_id:
                all_vars_list = [" - %s" % v for v in variables_by_id.keys()]
                msg = (
                    "Value '{var_id}' in profile '{profile_name}' is not known. "
                    "We know only variables:\n{var_names}"
                    .format(
                        var_id=var_id, profile_name=self.id_,
                        var_names="\n".join(sorted(all_vars_list)))
                )
                raise ValueError(msg)

            allowed_selectors = [str(s) for s in variables_by_id[var_id].options.keys()]
            if our_val not in allowed_selectors:
                msg = (
                    "Value '{var_id}' in profile '{profile_name}' "
                    "uses the selector '{our_val}'. "
                    "This is not possible, as only selectors {all_selectors} are available. "
                    "Either change the selector used in the profile, or "
                    "add the selector-value pair to the variable definition."
                    .format(
                        var_id=var_id, profile_name=self.id_, our_val=our_val,
                        all_selectors=allowed_selectors,
                    )
                )
                raise ValueError(msg)

    def validate_rules(self, rules, groups):
        """
        Check that all rule selectors of the profile refer to existing rules
        and that none of them is an ID of a group.

        Raises ValueError on the first selector that violates this.
        """
        # A set makes the membership test in the loop constant-time.
        existing_rule_ids = set(r.id_ for r in rules)
        rule_selectors = self.get_rule_selectors()
        for id_ in rule_selectors:
            if id_ in groups:
                msg = (
                    "You have selected a group '{group_id}' instead of a "
                    "rule. Groups have no effect in the profile and are not "
                    "allowed to be selected. Please remove '{group_id}' "
                    "from profile '{profile_id}' before proceeding."
                    .format(group_id=id_, profile_id=self.id_)
                )
                raise ValueError(msg)
            if id_ not in existing_rule_ids:
                msg = (
                    "Rule '{rule_id}' was not found in the benchmark. Please "
                    "remove rule '{rule_id}' from profile '{profile_id}' "
                    "before proceeding."
                    .format(rule_id=id_, profile_id=self.id_)
                )
                raise ValueError(msg)

    def __sub__(self, other):
        """
        Return a new profile with what this profile has on top of ``other``:
        rules selected or unselected only here, and variables that are either
        missing in ``other`` or that have a different selector there.

        Title, description, extends, platforms and platform are taken over
        from this profile. Rule refinements are not carried over.
        """
        profile = Profile(self.id_)
        profile.title = self.title
        profile.description = self.description
        profile.extends = self.extends
        profile.platforms = self.platforms
        profile.platform = self.platform
        profile.selected = list(set(self.selected) - set(other.selected))
        profile.selected.sort()
        profile.unselected = list(set(self.unselected) - set(other.unselected))
        profile.variables = dict(
            (k, v) for (k, v) in self.variables.items()
            if k not in other.variables or v != other.variables[k])
        return profile
606
607
608 2
class ResolvableProfile(Profile):
    """
    Profile that can be resolved, i.e. turned into a standalone profile
    by merging the contents of the profile it extends into it.
    """
    def __init__(self, * args, ** kwargs):
        super(ResolvableProfile, self).__init__(* args, ** kwargs)
        self.resolved = False

    def _controls_ids_to_controls(self, controls_manager, policy_id, control_id_list):
        """Look up every control ID of the policy in the controls manager."""
        return [controls_manager.get_control(policy_id, cid) for cid in control_id_list]

    def resolve_controls(self, controls_manager):
        """Hook for subclasses; this implementation does nothing."""
        pass

    def extend_by(self, extended_profile):
        """Merge the selections of the extended profile into this one."""
        self.update_with(extended_profile)

    def resolve_selections_with_rules(self, rules_by_id):
        """
        Keep only those selected rules that are present in ``rules_by_id``
        and that pass the rule filter of the profile.
        """
        kept = set(
            rid for rid in self.selected
            if rid in rules_by_id and self.rule_filter(rules_by_id[rid]))
        self.selected = list(kept)

    def resolve(self, all_profiles, rules_by_id, controls_manager=None):
        """
        Resolve the profile in place; a repeated call is a no-op.

        Controls are resolved first (if a controls manager is given), then
        the selections get filtered, and the extended profile is resolved
        and merged in. Finally, unselected rules are dropped from the sorted
        selection, and ``unselected`` and ``extends`` are reset.

        Raises RuntimeError if the extended profile is not known, and
        ValueError if a selected rule is missing in ``rules_by_id``.
        """
        if self.resolved:
            return

        if controls_manager:
            self.resolve_controls(controls_manager)

        self.resolve_selections_with_rules(rules_by_id)

        parent_id = self.extends
        if parent_id:
            if parent_id not in all_profiles:
                msg = (
                    "Profile {name} extends profile {extended}, but "
                    "only profiles {known_profiles} are available for resolution."
                    .format(name=self.id_, extended=parent_id,
                            known_profiles=list(all_profiles.keys())))
                raise RuntimeError(msg)
            parent = all_profiles[parent_id]
            parent.resolve(all_profiles, rules_by_id, controls_manager)

            self.extend_by(parent)

        excluded = self.unselected
        self.selected = sorted(s for s in set(self.selected) if s not in excluded)

        self.unselected = []
        self.extends = None

        # Rules taken over from the extended profile have not been checked yet.
        for rid in self.selected:
            if rid in rules_by_id:
                continue
            msg = (
                "Rule {rid} is selected by {profile}, but the rule is not available. "
                "This may be caused by a discrepancy of prodtypes."
                .format(rid=rid, profile=self.id_))
            raise ValueError(msg)

        self.resolved = True
672
673
674 2
class ProfileWithInlinePolicies(ResolvableProfile):
    """
    Resolvable profile whose selections may reference policy controls
    using the ``policy_id:control_id`` notation.
    """
    def __init__(self, * args, ** kwargs):
        super(ProfileWithInlinePolicies, self).__init__(* args, ** kwargs)
        self.controls_by_policy = defaultdict(list)

    def apply_selection(self, item):
        """
        Store control references by their policy, and let the parent class
        handle all other kinds of selections.
        """
        # ":" is the delimiter for controls but not when the item is a variable
        is_control_reference = ":" in item and "=" not in item
        if not is_control_reference:
            super(ProfileWithInlinePolicies, self).apply_selection(item)
            return
        policy_id, control_id = item.split(":", 1)
        self.controls_by_policy[policy_id].append(control_id)

    def _process_controls_ids_into_controls(self, controls_manager, policy_id, controls_ids):
        """
        Translate control IDs of a policy into a list of controls.

        An ID that starts with "all" stands for all controls of the policy,
        or, in the ``all:level_id`` form, for all controls of that level.
        """
        controls = []
        for cid in controls_ids:
            if cid.startswith("all"):
                if ":" in cid:
                    _, level_id = cid.split(":", 1)
                    found = controls_manager.get_all_controls_of_level(policy_id, level_id)
                else:
                    found = controls_manager.get_all_controls(policy_id)
            else:
                found = self._controls_ids_to_controls(controls_manager, policy_id, [cid])
            controls.extend(found)
        return controls

    def resolve_controls(self, controls_manager):
        """Merge the selections of all referenced controls into the profile."""
        for policy_id, controls_ids in self.controls_by_policy.items():
            referenced_controls = self._process_controls_ids_into_controls(
                controls_manager, policy_id, controls_ids)

            for control in referenced_controls:
                self.update_with(control)
709
710
711 2
class Value(XCCDFEntity):
    """Represents XCCDF Value
    """
    KEYS = dict(
        title=lambda: "",
        description=lambda: "",
        type=lambda: "",
        operator=lambda: "equals",
        interactive=lambda: False,
        options=lambda: dict(),
        warnings=lambda: list(),
        ** XCCDFEntity.KEYS
    )

    MANDATORY_KEYS = {
        "title",
        "description",
        "type",
    }

    @classmethod
    def process_input_dict(cls, input_contents, env_yaml):
        """
        Process the value definition, and on top of the generic processing,
        normalize ``interactive`` to a boolean and validate ``operator``.

        Raises ValueError if the operator is not a supported one.
        """
        # The YAML parser turns an unquoted true/false into a native boolean,
        # whereas a quoted one arrives as a string; accept both forms.
        interactive = input_contents.get("interactive", "false")
        if not isinstance(interactive, bool):
            interactive = interactive.lower() == "true"
        input_contents["interactive"] = interactive

        data = super(Value, cls).process_input_dict(input_contents, env_yaml)

        possible_operators = ["equals", "not equal", "greater than",
                              "less than", "greater than or equal",
                              "less than or equal", "pattern match"]

        if data["operator"] not in possible_operators:
            raise ValueError(
                "Found an invalid operator value '%s'. "
                "Expected one of: %s"
                % (data["operator"], ", ".join(possible_operators))
            )

        return data

    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None):
        """
        Load the value from ``yaml_file`` and make sure that each of its
        warnings consists of a single key/value pair.
        """
        value = super(Value, cls).from_yaml(yaml_file, env_yaml)

        check_warnings(value)

        return value

    def to_xml_element(self):
        """
        Build the ``Value`` XML element with its title, description,
        warnings, and one ``value`` child per option.
        """
        value = ET.Element('Value')
        value.set('id', self.id_)
        value.set('type', self.type)
        if self.operator != "equals":  # equals is the default
            value.set('operator', self.operator)
        if self.interactive:  # False is the default
            value.set('interactive', 'true')
        title = ET.SubElement(value, 'title')
        title.text = self.title
        add_sub_element(value, 'description', self.description)
        add_warning_elements(value, self.warnings)

        for selector, option in self.options.items():
            # do not confuse Value with big V with value with small v
            # value is child element of Value
            value_small = ET.SubElement(value, 'value')
            # by XCCDF spec, default value is value without selector
            if selector != "default":
                value_small.set('selector', str(selector))
            value_small.text = str(option)

        return value

    def to_file(self, file_name):
        """Write the XML representation of the value to ``file_name``."""
        root = self.to_xml_element()
        tree = ET.ElementTree(root)
        tree.write(file_name)
787
788
789 2
class Benchmark(XCCDFEntity):
    """Represents XCCDF Benchmark

    The benchmark holds the profiles plus the top-level values, groups and
    rules, and renders itself as an XCCDF ``Benchmark`` element.
    """
    KEYS = dict(
        title=lambda: "",
        status=lambda: "",
        description=lambda: "",
        notice_id=lambda: "",
        notice_description=lambda: "",
        front_matter=lambda: "",
        rear_matter=lambda: "",
        cpes=lambda: list(),
        version=lambda: "",
        profiles=lambda: list(),
        values=lambda: dict(),
        groups=lambda: dict(),
        rules=lambda: dict(),
        product_cpe_names=lambda: list(),
        ** XCCDFEntity.KEYS
    )

    MANDATORY_KEYS = {
        "title",
        "status",
        "description",
        "front_matter",
        "rear_matter",
    }

    GENERIC_FILENAME = "benchmark.yml"

    def load_entities(self, rules_by_id, values_by_id, groups_by_id):
        """Replace empty placeholders with the actual loaded entities.

        Every falsy entry of ``self.rules``, ``self.values`` and
        ``self.groups`` is replaced by the object stored under the same ID
        in the corresponding ``*_by_id`` mapping. A missing ID raises
        ``KeyError``.
        """
        for rid, val in self.rules.items():
            if not val:
                self.rules[rid] = rules_by_id[rid]

        for vid, val in self.values.items():
            if not val:
                self.values[vid] = values_by_id[vid]

        for gid, val in self.groups.items():
            if not val:
                self.groups[gid] = groups_by_id[gid]

    @classmethod
    def process_input_dict(cls, input_contents, env_yaml):
        """Convert the raw benchmark YAML dictionary into entity data.

        The dashed ``front-matter`` / ``rear-matter`` keys are renamed to
        their underscore form before the parent class processes the input,
        and the nested ``notice`` mapping is flattened into ``notice_id``
        and ``notice_description``. ``input_contents`` is modified in place.
        """
        input_contents["front_matter"] = input_contents["front-matter"]
        del input_contents["front-matter"]
        input_contents["rear_matter"] = input_contents["rear-matter"]
        del input_contents["rear-matter"]

        data = super(Benchmark, cls).process_input_dict(input_contents, env_yaml)

        notice_contents = required_key(input_contents, "notice")
        del input_contents["notice"]

        data["notice_id"] = required_key(notice_contents, "id")
        del notice_contents["id"]

        data["notice_description"] = required_key(notice_contents, "description")
        del notice_contents["description"]

        return data

    def represent_as_dict(self):
        """Return the dictionary form of the benchmark.

        This is the inverse of the key renaming done in
        ``process_input_dict``: the underscore keys produced by the parent
        class are turned back into ``rear-matter`` and ``front-matter``.
        """
        # This is an instance method, so super() must be bound to ``self``;
        # the previous ``cls`` reference was undefined here and raised
        # NameError on every call.
        data = super(Benchmark, self).represent_as_dict()
        data["rear-matter"] = data["rear_matter"]
        del data["rear_matter"]

        data["front-matter"] = data["front_matter"]
        del data["front_matter"]
        return data

    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None):
        """Load the benchmark from ``yaml_file``.

        With an ``env_yaml`` the product CPE names, the CPE platform
        specification, the benchmark ID and the version are taken from it;
        without one, placeholder ID and version values are used.
        """
        benchmark = super(Benchmark, cls).from_yaml(yaml_file, env_yaml)
        if env_yaml:
            benchmark.product_cpe_names = env_yaml["product_cpes"].get_product_cpe_names()
            benchmark.cpe_platform_spec = env_yaml["product_cpes"].cpe_platform_specification
            benchmark.id_ = env_yaml["benchmark_id"]
            benchmark.version = env_yaml["ssg_version_str"]
        else:
            benchmark.id_ = "product-name"
            benchmark.version = "0.0"

        return benchmark

    def add_profiles_from_dir(self, dir_, env_yaml):
        """Load every ``*.profile`` file found directly in ``dir_``.

        Non-files are ignored, files with another extension are reported on
        stderr and skipped, and profiles whose documentation is not complete
        are skipped silently. Any other loading error is re-raised as
        ``RuntimeError`` naming the offending file.
        """
        for dir_item in sorted(os.listdir(dir_)):
            dir_item_path = os.path.join(dir_, dir_item)
            if not os.path.isfile(dir_item_path):
                continue

            _, ext = os.path.splitext(os.path.basename(dir_item_path))
            if ext != '.profile':
                sys.stderr.write(
                    "Encountered file '%s' while looking for profiles, "
                    "extension '%s' is unknown. Skipping..\n"
                    % (dir_item, ext)
                )
                continue

            try:
                new_profile = ProfileWithInlinePolicies.from_yaml(dir_item_path, env_yaml)
            except DocumentationNotComplete:
                continue
            except Exception as exc:
                msg = ("Error building profile from '{fname}': '{error}'"
                       .format(fname=dir_item_path, error=str(exc)))
                raise RuntimeError(msg)
            if new_profile is None:
                continue

            self.profiles.append(new_profile)

    def to_xml_element(self, env_yaml=None):
        """Build and return the XCCDF ``Benchmark`` element.

        ``env_yaml`` is forwarded to the groups and rules when they are
        rendered.
        """
        root = ET.Element('Benchmark')
        root.set('id', self.id_)
        root.set('xmlns', "http://checklists.nist.gov/xccdf/1.1")
        root.set('xmlns:xsi', 'http://www.w3.org/2001/XMLSchema-instance')
        root.set('xsi:schemaLocation',
                 'http://checklists.nist.gov/xccdf/1.1 xccdf-1.1.4.xsd')
        root.set('style', 'SCAP_1.1')
        root.set('resolved', 'false')
        root.set('xml:lang', 'en-US')
        status = ET.SubElement(root, 'status')
        status.set('date', datetime.date.today().strftime("%Y-%m-%d"))
        status.text = self.status
        add_sub_element(root, "title", self.title)
        add_sub_element(root, "description", self.description)
        notice = add_sub_element(root, "notice", self.notice_description)
        notice.set('id', self.notice_id)
        add_sub_element(root, "front-matter", self.front_matter)
        add_sub_element(root, "rear-matter", self.rear_matter)
        # Within this class cpe_platform_spec is assigned only by from_yaml()
        # when an env_yaml is supplied, so tolerate its absence instead of
        # failing with AttributeError.
        cpe_platform_spec = getattr(self, "cpe_platform_spec", None)
        # if there are no platforms, do not output platform-specification at all
        if cpe_platform_spec is not None and len(cpe_platform_spec.platforms) > 0:
            root.append(cpe_platform_spec.to_xml_element())

        # The Benchmark applicability is determined by the CPEs
        # defined in the product.yml
        for cpe_name in self.product_cpe_names:
            plat = ET.SubElement(root, "platform")
            plat.set("idref", cpe_name)

        version = ET.SubElement(root, 'version')
        version.text = self.version
        version.set('update', SSG_BENCHMARK_LATEST_URI)

        contributors_file = os.path.join(os.path.dirname(__file__), "../Contributors.xml")
        add_benchmark_metadata(root, contributors_file)

        for profile in self.profiles:
            root.append(profile.to_xml_element())

        for value in self.values.values():
            root.append(value.to_xml_element())

        groups_in_bench = list(self.groups.keys())
        priority_order = ["system", "services"]
        groups_in_bench = reorder_according_to_ordering(groups_in_bench, priority_order)

        # Make system group the first, followed by services group
        for group_id in groups_in_bench:
            group = self.groups.get(group_id)
            # Products using application benchmark don't have system or services group
            if group is not None:
                root.append(group.to_xml_element(env_yaml))

        for rule in self.rules.values():
            root.append(rule.to_xml_element(env_yaml))

        return root

    def to_file(self, file_name, env_yaml=None):
        """Write the benchmark, rendered as an XML element tree, to ``file_name``."""
        root = self.to_xml_element(env_yaml)
        tree = ET.ElementTree(root)
        tree.write(file_name)

    def add_value(self, value):
        """Register ``value`` under its ID; ``None`` is ignored."""
        if value is None:
            return
        self.values[value.id_] = value

    # The benchmark is also considered a group, so this function signature needs to match
    # Group()'s add_group()
    def add_group(self, group, env_yaml=None):
        """Register ``group`` under its ID; ``None`` is ignored.

        ``env_yaml`` is accepted only for signature compatibility and is not
        used here.
        """
        if group is None:
            return
        self.groups[group.id_] = group

    def add_rule(self, rule):
        """Register ``rule`` under its ID; ``None`` is ignored."""
        if rule is None:
            return
        self.rules[rule.id_] = rule

    def to_xccdf(self):
        """We can easily extend this script to generate a valid XCCDF instead
        of SSG SHORTHAND.
        """
        raise NotImplementedError

    def __str__(self):
        return self.id_
992
993
994 2
class Group(XCCDFEntity):
    """Represents XCCDF Group

    A group contains values, rules and nested groups, and renders itself as
    an XCCDF ``Group`` element.
    """
    # Attributes handed down to a child by _pass_our_properties_on_to().
    ATTRIBUTES_TO_PASS_ON = (
        "platforms",
        "cpe_platform_names",
    )

    GENERIC_FILENAME = "group.yml"

    KEYS = dict(
        prodtype=lambda: "all",
        title=lambda: "",
        description=lambda: "",
        warnings=lambda: list(),
        requires=lambda: list(),
        conflicts=lambda: list(),
        values=lambda: dict(),
        groups=lambda: dict(),
        rules=lambda: dict(),
        platform=lambda: "",
        platforms=lambda: set(),
        cpe_platform_names=lambda: set(),
        ** XCCDFEntity.KEYS
    )

    # NOTE(review): "status", "front_matter" and "rear_matter" are not among
    # the keys declared in KEYS above; this set looks copied from Benchmark.
    # Left unchanged - confirm how the parent class uses MANDATORY_KEYS.
    MANDATORY_KEYS = {
        "title",
        "status",
        "description",
        "front_matter",
        "rear_matter",
    }

    @classmethod
    def process_input_dict(cls, input_contents, env_yaml):
        """Convert the raw group YAML dictionary into entity data.

        Non-empty ``rules``, ``groups`` and ``values`` collections are
        turned into ``{id: None}`` placeholder dictionaries (filled in later
        by ``load_entities``). The singular ``platform`` is merged into
        ``platforms``, and every platform is parsed into a CPE platform that
        is recorded in ``cpe_platform_names`` and registered with the
        product's platform specification taken from ``env_yaml``.
        """
        data = super(Group, cls).process_input_dict(input_contents, env_yaml)
        if data["rules"]:
            rule_ids = data["rules"]
            data["rules"] = {rid: None for rid in rule_ids}

        if data["groups"]:
            group_ids = data["groups"]
            data["groups"] = {gid: None for gid in group_ids}

        if data["values"]:
            value_ids = data["values"]
            data["values"] = {vid: None for vid in value_ids}

        if data["platform"]:
            data["platforms"].add(data["platform"])

        # parse platform definition and get CPEAL platform
        if data["platforms"]:
            for platform in data["platforms"]:
                cpe_platform = parse_platform_definition(platform, env_yaml["product_cpes"])
                data["cpe_platform_names"].add(cpe_platform.id)
                # add platform to platform specification
                env_yaml["product_cpes"].cpe_platform_specification.add_platform(cpe_platform)
        return data

    def load_entities(self, rules_by_id, values_by_id, groups_by_id):
        """Replace empty placeholders with the actual loaded entities.

        Rules and values must be present in their ``*_by_id`` mapping
        (``KeyError`` otherwise). Sub-groups that are missing from
        ``groups_by_id`` are dropped from this group instead.
        """
        for rid, val in self.rules.items():
            if not val:
                self.rules[rid] = rules_by_id[rid]

        for vid, val in self.values.items():
            if not val:
                self.values[vid] = values_by_id[vid]

        # Iterate over a snapshot of the keys because entries may be deleted.
        for gid in list(self.groups):
            val = self.groups.get(gid, None)
            if not val:
                try:
                    self.groups[gid] = groups_by_id[gid]
                except KeyError:
                    # Add only the groups we have compiled and loaded
                    del self.groups[gid]

    def represent_as_dict(self):
        """Return the dictionary form of the group.

        Non-empty rules, groups and values are represented by sorted lists
        of their IDs rather than by the objects themselves.
        """
        yaml_contents = super(Group, self).represent_as_dict()

        if self.rules:
            yaml_contents["rules"] = sorted(self.rules)
        if self.groups:
            yaml_contents["groups"] = sorted(self.groups)
        if self.values:
            yaml_contents["values"] = sorted(self.values)

        return yaml_contents

    def validate_prodtype(self, yaml_file):
        """Raise ``ValueError`` if any comma-separated prodtype item has
        leading or trailing whitespace. ``yaml_file`` is used only in the
        error message.
        """
        for ptype in self.prodtype.split(","):
            if ptype.strip() != ptype:
                msg = (
                    "Comma-separated '{prodtype}' prodtype "
                    "in {yaml_file} contains whitespace."
                    .format(prodtype=self.prodtype, yaml_file=yaml_file))
                raise ValueError(msg)

    def to_xml_element(self, env_yaml=None):
        """Build and return the XCCDF ``Group`` element.

        Children are emitted in this order: title, description, warnings,
        platforms, requires/conflicts, values, rules (in priority order) and
        finally sub-groups (in priority order). ``env_yaml`` is forwarded to
        the rules and sub-groups.
        """
        group = ET.Element('Group')
        group.set('id', self.id_)
        title = ET.SubElement(group, 'title')
        title.text = self.title
        add_sub_element(group, 'description', self.description)
        add_warning_elements(group, self.warnings)

        # This is where references should be put if there are any
        # This is where rationale should be put if there are any

        for cpe_platform_name in self.cpe_platform_names:
            platform_el = ET.SubElement(group, "platform")
            platform_el.set("idref", "#"+cpe_platform_name)

        add_nondata_subelements(group, "requires", "idref", self.requires)
        add_nondata_subelements(group, "conflicts", "idref", self.conflicts)

        for _value in self.values.values():
            group.append(_value.to_xml_element())

        # Rules that install or remove packages affect remediation
        # of other rules.
        # When packages installed/removed rules come first:
        # The Rules are ordered in more logical way, and
        # remediation order is natural, first the package is installed, then configured.
        rules_in_group = list(self.rules)
        regex = (r'(package_.*_(installed|removed))|' +
                 r'(service_.*_(enabled|disabled))|' +
                 r'install_smartcard_packages$')
        priority_order = ["installed", "install_smartcard_packages", "removed",
                          "enabled", "disabled"]
        rules_in_group = reorder_according_to_ordering(rules_in_group, priority_order, regex)

        # Add rules in priority order, first all packages installed, then removed,
        # followed by services enabled, then disabled
        for rule_id in rules_in_group:
            group.append(self.rules.get(rule_id).to_xml_element(env_yaml))

        # Add the sub groups after any current level group rules.
        # As package installed/removed and service enabled/disabled rules are usually in
        # top level group, this ensures groups that further configure a package or service
        # are after rules that install or remove it.
        groups_in_group = list(self.groups)
        priority_order = [
            # Make sure rpm_verify_(hashes|permissions|ownership) are run before any other rule.
            # Due to conflicts between rules rpm_verify_* rules and any rule that configures
            # stricter settings, like file_permissions_grub2_cfg and sudo_dedicated_group,
            # the rules deviating from the system default should be evaluated later.
            # So that in the end the system has contents, permissions and ownership reset, and
            # any deviations or stricter settings are applied by the rules in the profile.
            "software", "integrity", "integrity-software", "rpm_verification",

            # The account group has to precede audit group because
            # the rule package_screen_installed is desired to be executed before the rule
            # audit_rules_privileged_commands, otherwise the rule
            # does not catch newly installed screen binary during remediation
            # and report fail
            "accounts", "auditing",


            # The FIPS group should come before Crypto,
            # if we want to set a different (stricter) Crypto Policy than FIPS.
            "fips", "crypto",

            # The firewalld_activation must come before ruleset_modifications, otherwise
            # remediations for ruleset_modifications won't work
            "firewalld_activation", "ruleset_modifications",

            # Rules from group disabling_ipv6 must precede rules from configuring_ipv6,
            # otherwise the remediation prints error although it is successful
            "disabling_ipv6", "configuring_ipv6"
        ]
        groups_in_group = reorder_according_to_ordering(groups_in_group, priority_order)
        for group_id in groups_in_group:
            _group = self.groups[group_id]
            group.append(_group.to_xml_element(env_yaml))

        return group

    def to_file(self, file_name, env_yaml=None):
        """Write the group, rendered as an XML element tree, to ``file_name``.

        ``env_yaml`` is optional and forwarded to ``to_xml_element``, the
        same way ``Benchmark.to_file`` does; omitting it keeps the previous
        behaviour.
        """
        root = self.to_xml_element(env_yaml)
        tree = ET.ElementTree(root)
        tree.write(file_name)

    def add_value(self, value):
        """Register ``value`` under its ID; ``None`` is ignored."""
        if value is None:
            return
        self.values[value.id_] = value

    def add_group(self, group, env_yaml=None):
        """Attach ``group`` as a sub-group; ``None`` is ignored.

        A sub-group without platforms of its own takes over this group's
        platforms (the very same set object, not a copy). With an
        ``env_yaml``, the sub-group's platforms are then parsed into CPE
        platforms and registered with the product's platform specification.
        """
        if group is None:
            return
        if self.platforms and not group.platforms:
            group.platforms = self.platforms
        self.groups[group.id_] = group
        self._pass_our_properties_on_to(group)

        # Once the group has inherited properties, update cpe_names
        if env_yaml:
            for platform in group.platforms:
                cpe_platform = parse_platform_definition(
                    platform, env_yaml["product_cpes"])
                group.cpe_platform_names.add(cpe_platform.id)
                env_yaml["product_cpes"].cpe_platform_specification.add_platform(
                    cpe_platform)

    def _pass_our_properties_on_to(self, obj):
        """Copy each attribute listed in ``ATTRIBUTES_TO_PASS_ON`` to ``obj``
        when ``obj`` has that attribute and its value is ``None``.
        """
        for attr in self.ATTRIBUTES_TO_PASS_ON:
            if hasattr(obj, attr) and getattr(obj, attr) is None:
                setattr(obj, attr, getattr(self, attr))

    def add_rule(self, rule, env_yaml=None):
        """Attach ``rule`` to this group; ``None`` is ignored.

        A rule without platforms of its own takes over this group's
        platforms (the very same set object, not a copy). With an
        ``env_yaml``, the rule's platforms are then parsed into CPE
        platforms and registered with the product's platform specification.
        """
        if rule is None:
            return
        if self.platforms and not rule.platforms:
            rule.platforms = self.platforms
        self.rules[rule.id_] = rule
        self._pass_our_properties_on_to(rule)

        # Once the rule has inherited properties, update cpe_platform_names
        if env_yaml:
            for platform in rule.platforms:
                cpe_platform = parse_platform_definition(
                    platform, env_yaml["product_cpes"])
                rule.cpe_platform_names.add(cpe_platform.id)
                env_yaml["product_cpes"].cpe_platform_specification.add_platform(
                    cpe_platform)

    def __str__(self):
        return self.id_
1226
1227
1228 2
def noop_rule_filterfunc(rule):
    """Rule filter that accepts every rule: always returns ``True``."""
    return True
1230
1231 2
def rule_filter_from_def(filterdef):
    """Turn a filter definition into a callable that tests a rule.

    ``None`` or an empty string yields ``noop_rule_filterfunc``. Any other
    value is treated as a Python expression that is evaluated against the
    attributes of the rule each time the returned function is called.
    """
    if filterdef is None or filterdef == "":
        return noop_rule_filterfunc

    def filterfunc(rule):
        # SECURITY: this evaluates ``filterdef`` with eval(). Builtins are
        # withheld and only the rule's own attributes are exposed as names,
        # but that is not a real sandbox - filter definitions must come
        # from trusted content only.
        restricted_globals = {"__builtins__": None}
        rule_attributes = rule.__dict__
        return eval(filterdef, restricted_globals, rule_attributes)

    return filterfunc
1240
1241
1242 2
class Rule(XCCDFEntity):
1243
    """Represents XCCDF Rule
1244
    """
1245 2
    KEYS = dict(
1246
        prodtype=lambda: "all",
1247
        title=lambda: "",
1248
        description=lambda: "",
1249
        rationale=lambda: "",
1250
        severity=lambda: "",
1251
        references=lambda: dict(),
1252
        identifiers=lambda: dict(),
1253
        ocil_clause=lambda: None,
1254
        ocil=lambda: None,
1255
        oval_external_content=lambda: None,
1256
        warnings=lambda: list(),
1257
        conflicts=lambda: list(),
1258
        requires=lambda: list(),
1259
        platform=lambda: None,
1260
        platforms=lambda: set(),
1261
        inherited_platforms=lambda: list(),
1262
        template=lambda: None,
1263
        cpe_platform_names=lambda: set(),
1264
        ** XCCDFEntity.KEYS
1265
    )
1266
1267 2
    MANDATORY_KEYS = {
1268
        "title",
1269
        "description",
1270
        "rationale",
1271
        "severity",
1272
    }
1273
1274 2
    GENERIC_FILENAME = "rule.yml"
1275 2
    ID_LABEL = "rule_id"
1276
1277 2
    PRODUCT_REFERENCES = ("stigid", "cis",)
1278 2
    GLOBAL_REFERENCES = ("srg", "vmmsrg", "disa", "cis-csc",)
1279
1280 2
    def __init__(self, id_):
        """Initialise the rule with ``id_``; no SCE metadata is attached yet."""
        super(Rule, self).__init__(id_)
        # Populated by from_yaml() when SCE metadata exists for this rule.
        self.sce_metadata = None
1283
1284 2
    def __deepcopy__(self, memo):
1285
        cls = self.__class__
1286
        result = cls.__new__(cls)
1287
        memo[id(self)] = result
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable id does not seem to be defined.
Loading history...
1288
        for k, v in self.__dict__.items():
1289
            # These are difficult to deep copy, so let's just re-use them.
1290
            if k != "template" and k != "local_env_yaml":
1291
                setattr(result, k, deepcopy(v, memo))
1292
            else:
1293
                setattr(result, k, v)
1294
        return result
1295
1296 2
    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None, sce_metadata=None):
        """Load a rule from ``yaml_file`` and post-process it.

        After the parent class has loaded the rule, its platforms are
        normalised to a set (including the singular ``platform``), platform
        names are resolved to CPE platforms when an ``env_yaml`` is given
        and the rule's prodtype covers the product being built, SCE
        metadata is attached if available, and the prodtype, identifiers
        and references are validated.
        """
        new_rule = super(Rule, cls).from_yaml(yaml_file, env_yaml)

        # platforms are read as list from the yaml file
        # we need them to convert to set again
        new_rule.platforms = set(new_rule.platforms)

        # rule.platforms.update(set(rule.inherited_platforms))

        check_warnings(new_rule)

        # ensure that content of rule.platform is in rule.platforms as
        # well
        if new_rule.platform is not None:
            new_rule.platforms.add(new_rule.platform)

        # Convert the platform names to CPE names
        # But only do it if an env_yaml was specified (otherwise there would be no product CPEs
        # to lookup), and the rule's prodtype matches the product being built
        resolve_platforms = env_yaml and (
            env_yaml["product"] in parse_prodtype(new_rule.prodtype)
            or new_rule.prodtype == "all")
        if resolve_platforms:
            # parse platform definition and get CPEAL platform
            for platform_name in new_rule.platforms:
                cpe_platform = parse_platform_definition(
                    platform_name, env_yaml["product_cpes"])
                new_rule.cpe_platform_names.add(cpe_platform.id)
                # add platform to platform specification
                env_yaml["product_cpes"].cpe_platform_specification.add_platform(
                    cpe_platform)

        if sce_metadata and new_rule.id_ in sce_metadata:
            new_rule.sce_metadata = sce_metadata[new_rule.id_]
            new_rule.sce_metadata["relative_path"] = os.path.join(
                env_yaml["product"], "checks/sce", new_rule.sce_metadata['filename'])

        new_rule.validate_prodtype(yaml_file)
        new_rule.validate_identifiers(yaml_file)
        new_rule.validate_references(yaml_file)
        return new_rule
1338
1339 2
    def _verify_stigid_format(self, product):
1340 2
        stig_id = self.references.get("stigid", None)
1341 2
        if not stig_id:
1342 2
            return
1343 2
        if "," in stig_id:
1344 2
            raise ValueError("Rules can not have multiple STIG IDs.")
1345
1346 2
    def _verify_disa_cci_format(self):
1347 2
        cci_id = self.references.get("disa", None)
1348 2
        if not cci_id:
1349 2
            return
1350
        cci_ex = re.compile(r'^CCI-[0-9]{6}$')
1351
        for cci in cci_id.split(","):
1352
            if not cci_ex.match(cci):
1353
                raise ValueError("CCI '{}' is in the wrong format! "
1354
                                 "Format should be similar to: "
1355
                                 "CCI-XXXXXX".format(cci))
1356
        self.references["disa"] = cci_id
1357
1358 2
    def normalize(self, product):
1359 2
        try:
1360 2
            self.make_refs_and_identifiers_product_specific(product)
1361 2
            self.make_template_product_specific(product)
1362 2
        except Exception as exc:
1363 2
            msg = (
1364
                "Error normalizing '{rule}': {msg}"
1365
                .format(rule=self.id_, msg=str(exc))
1366
            )
1367 2
            raise RuntimeError(msg)
1368
1369 2
    def _get_product_only_references(self):
1370 2
        product_references = dict()
1371
1372 2
        for ref in Rule.PRODUCT_REFERENCES:
1373 2
            start = "{0}@".format(ref)
1374 2
            for gref, gval in self.references.items():
1375 2
                if ref == gref or gref.startswith(start):
1376 2
                    product_references[gref] = gval
1377 2
        return product_references
1378
1379 2
    def make_template_product_specific(self, product):
1380 2
        product_suffix = "@{0}".format(product)
1381
1382 2
        if not self.template:
1383
            return
1384
1385 2
        not_specific_vars = self.template.get("vars", dict())
1386 2
        specific_vars = self._make_items_product_specific(
1387
            not_specific_vars, product_suffix, True)
1388 2
        self.template["vars"] = specific_vars
1389
1390 2
        not_specific_backends = self.template.get("backends", dict())
1391 2
        specific_backends = self._make_items_product_specific(
1392
            not_specific_backends, product_suffix, True)
1393 2
        self.template["backends"] = specific_backends
1394
1395 2
    def make_refs_and_identifiers_product_specific(self, product):
1396 2
        product_suffix = "@{0}".format(product)
1397
1398 2
        product_references = self._get_product_only_references()
1399 2
        general_references = self.references.copy()
1400 2
        for todel in product_references:
1401 2
            general_references.pop(todel)
1402 2
        for ref in Rule.PRODUCT_REFERENCES:
1403 2
            if ref in general_references:
1404
                msg = "Unexpected reference identifier ({0}) without "
1405
                msg += "product qualifier ({0}@{1}) while building rule "
1406
                msg += "{2}"
1407
                msg = msg.format(ref, product, self.id_)
1408
                raise ValueError(msg)
1409
1410 2
        to_set = dict(
1411
            identifiers=(self.identifiers, False),
1412
            general_references=(general_references, True),
1413
            product_references=(product_references, False),
1414
        )
1415 2
        for name, (dic, allow_overwrites) in to_set.items():
1416 2
            try:
1417 2
                new_items = self._make_items_product_specific(
1418
                    dic, product_suffix, allow_overwrites)
1419 2
            except ValueError as exc:
1420 2
                msg = (
1421
                    "Error processing {what} for rule '{rid}': {msg}"
1422
                    .format(what=name, rid=self.id_, msg=str(exc))
1423
                )
1424 2
                raise ValueError(msg)
1425 2
            dic.clear()
1426 2
            dic.update(new_items)
1427
1428 2
        self.references = general_references
1429 2
        self._verify_disa_cci_format()
1430 2
        self.references.update(product_references)
1431
1432 2
        self._verify_stigid_format(product)
1433
1434 2
    def _make_items_product_specific(self, items_dict, product_suffix, allow_overwrites=False):
1435 2
        new_items = dict()
1436 2
        for full_label, value in items_dict.items():
1437 2
            if "@" not in full_label and full_label not in new_items:
1438 2
                new_items[full_label] = value
1439 2
                continue
1440
1441 2
            label = full_label.split("@")[0]
1442
1443
            # this test should occur before matching product_suffix with the product qualifier
1444
            # present in the reference, so it catches problems even for products that are not
1445
            # being built at the moment
1446 2
            if label in Rule.GLOBAL_REFERENCES:
1447
                msg = (
1448
                    "You cannot use product-qualified for the '{item_u}' reference. "
1449
                    "Please remove the product-qualifier and merge values with the "
1450
                    "existing reference if there is any. Original line: {item_q}: {value_q}"
1451
                    .format(item_u=label, item_q=full_label, value_q=value)
1452
                )
1453
                raise ValueError(msg)
1454
1455 2
            if not full_label.endswith(product_suffix):
1456 2
                continue
1457
1458 2
            if label in items_dict and not allow_overwrites and value != items_dict[label]:
1459 2
                msg = (
1460
                    "There is a product-qualified '{item_q}' item, "
1461
                    "but also an unqualified '{item_u}' item "
1462
                    "and those two differ in value - "
1463
                    "'{value_q}' vs '{value_u}' respectively."
1464
                    .format(item_q=full_label, item_u=label,
1465
                            value_q=value, value_u=items_dict[label])
1466
                )
1467 2
                raise ValueError(msg)
1468 2
            new_items[label] = value
1469 2
        return new_items
1470
1471 2
    def validate_identifiers(self, yaml_file):
1472 2
        if self.identifiers is None:
1473
            raise ValueError("Empty identifier section in file %s" % yaml_file)
1474
1475
        # Validate all identifiers are non-empty:
1476 2
        for ident_type, ident_val in self.identifiers.items():
1477 2
            if not isinstance(ident_type, str) or not isinstance(ident_val, str):
1478
                raise ValueError("Identifiers and values must be strings: %s in file %s"
1479
                                 % (ident_type, yaml_file))
1480 2
            if ident_val.strip() == "":
1481
                raise ValueError("Identifiers must not be empty: %s in file %s"
1482
                                 % (ident_type, yaml_file))
1483 2
            if ident_type[0:3] == 'cce':
1484 2
                if not is_cce_format_valid(ident_val):
1485
                    raise ValueError("CCE Identifier format must be valid: invalid format '%s' for CEE '%s'"
1486
                                     " in file '%s'" % (ident_val, ident_type, yaml_file))
1487 2
                if not is_cce_value_valid("CCE-" + ident_val):
1488
                    raise ValueError("CCE Identifier value is not a valid checksum: invalid value '%s' for CEE '%s'"
1489
                                     " in file '%s'" % (ident_val, ident_type, yaml_file))
1490
1491 2
    def validate_references(self, yaml_file):
1492 2
        if self.references is None:
1493
            raise ValueError("Empty references section in file %s" % yaml_file)
1494
1495 2
        for ref_type, ref_val in self.references.items():
1496 2
            if not isinstance(ref_type, str) or not isinstance(ref_val, str):
1497
                raise ValueError("References and values must be strings: %s in file %s"
1498
                                 % (ref_type, yaml_file))
1499 2
            if ref_val.strip() == "":
1500
                raise ValueError("References must not be empty: %s in file %s"
1501
                                 % (ref_type, yaml_file))
1502
1503 2
        for ref_type, ref_val in self.references.items():
1504 2
            for ref in ref_val.split(","):
1505 2
                if ref.strip() != ref:
1506
                    msg = (
1507
                        "Comma-separated '{ref_type}' reference "
1508
                        "in {yaml_file} contains whitespace."
1509
                        .format(ref_type=ref_type, yaml_file=yaml_file))
1510
                    raise ValueError(msg)
1511
1512 2
    def validate_prodtype(self, yaml_file):
        """Check that the comma-separated ``prodtype`` has no padded items.

        Args:
            yaml_file: Name of the source file, used only in the error message.

        Raises:
            ValueError: If any comma-separated item of ``self.prodtype`` has
                leading or trailing whitespace.
        """
        padded_items = [
            item for item in self.prodtype.split(",") if item != item.strip()
        ]
        if padded_items:
            raise ValueError(
                "Comma-separated '{prodtype}' prodtype "
                "in {yaml_file} contains whitespace."
                .format(prodtype=self.prodtype, yaml_file=yaml_file))
1520
1521 2
    def to_xml_element(self, env_yaml=None):
        """Build and return the ``Rule`` XML element describing this rule.

        The element carries the rule's ID, severity, title, description,
        warnings, references, rationale, platforms, requires/conflicts,
        identifiers and check elements: an SCE check when ``sce_metadata`` is
        set, an OVAL check always, and an OCIL check when the rule has
        ``ocil`` or ``ocil_clause``.

        Args:
            env_yaml: Optional mapping. When truthy, its ``'reference_uris'``
                entry supplies the reference URIs; otherwise
                ``SSG_REF_URIS`` is used.

        Returns:
            The newly created ``Rule`` element.
        """
        rule = ET.Element('Rule')
        # Rules are always emitted with selected="false".
        rule.set('selected', 'false')
        rule.set('id', self.id_)
        rule.set('severity', self.severity)
        add_sub_element(rule, 'title', self.title)
        add_sub_element(rule, 'description', self.description)
        add_warning_elements(rule, self.warnings)

        # Product-specific reference URIs win over the project-wide defaults.
        if env_yaml:
            ref_uri_dict = env_yaml['reference_uris']
        else:
            ref_uri_dict = SSG_REF_URIS
        add_reference_elements(rule, self.references, ref_uri_dict)

        add_sub_element(rule, 'rationale', self.rationale)

        for cpe_platform_name in self.cpe_platform_names:
            platform_el = ET.SubElement(rule, "platform")
            platform_el.set("idref", "#"+cpe_platform_name)

        add_nondata_subelements(rule, "requires", "idref", self.requires)
        add_nondata_subelements(rule, "conflicts", "idref", self.conflicts)

        # An <ident> element is created for every identifier, but only the
        # 'cce' one gets a system and text; any other identifier type yields
        # an empty <ident>.
        # NOTE(review): confirm that empty <ident> elements are intended.
        for ident_type, ident_val in self.identifiers.items():
            ident = ET.SubElement(rule, 'ident')
            if ident_type == 'cce':
                ident.set('system', cce_uri)
                ident.text = ident_val

        # Both parents start as the rule itself and are only redirected when
        # the SCE metadata asks for a complex-check.
        ocil_parent = rule
        check_parent = rule

        if self.sce_metadata:
            # TODO: This is pretty much another hack, just like the previous OVAL
            # one. However, we avoided the external SCE content as I'm not sure it
            # is generally useful (unlike say, CVE checking with external OVAL)
            #
            # Additionally, we build the content (check subelement) here rather
            # than in xslt due to the nature of our SCE metadata.
            #
            # Finally, before we begin, we might have an element with both SCE
            # and OVAL. We have no way of knowing (right here) whether that is
            # the case (due to a variety of issues, most notably, that linking
            # hasn't yet occurred). So we must rely on the content author's
            # good will, by annotating SCE content with a complex-check tag
            # if necessary.

            if 'complex-check' in self.sce_metadata:
                # Here we have an issue: XCCDF allows EITHER one or more check
                # elements OR a single complex-check. While we have an explicit
                # case handling the OVAL-and-SCE interaction, OCIL entries have
                # (historically) been alongside OVAL content and been in an
                # "OR" manner -- preferring OVAL to SCE. In order to accomplish
                # this, we thus need to add _yet another parent_ when OCIL data
                # is present, and update ocil_parent accordingly.
                if self.ocil or self.ocil_clause:
                    ocil_parent = ET.SubElement(ocil_parent, "complex-check")
                    ocil_parent.set('operator', 'OR')

                # The SCE/OVAL complex-check nests under ocil_parent, which is
                # either the rule or the "OR" complex-check created above.
                check_parent = ET.SubElement(ocil_parent, "complex-check")
                check_parent.set('operator', self.sce_metadata['complex-check'])

            # Now, add the SCE check element to the tree.
            check = ET.SubElement(check_parent, "check")
            check.set("system", SCE_SYSTEM)

            if 'check-import' in self.sce_metadata:
                # A single string is normalized to a one-element list; note
                # that this rewrites self.sce_metadata in place.
                if isinstance(self.sce_metadata['check-import'], str):
                    self.sce_metadata['check-import'] = [self.sce_metadata['check-import']]
                for entry in self.sce_metadata['check-import']:
                    check_import = ET.SubElement(check, 'check-import')
                    check_import.set('import-name', entry)
                    check_import.text = None

            if 'check-export' in self.sce_metadata:
                # Same in-place normalization as for 'check-import'.
                if isinstance(self.sce_metadata['check-export'], str):
                    self.sce_metadata['check-export'] = [self.sce_metadata['check-export']]
                for entry in self.sce_metadata['check-export']:
                    # Each entry has the form '<export-name>=<value-id>'; an
                    # entry without exactly one '=' makes the unpacking raise
                    # ValueError.
                    export, value = entry.split('=')
                    check_export = ET.SubElement(check, 'check-export')
                    check_export.set('value-id', value)
                    check_export.set('export-name', export)
                    check_export.text = None

            check_ref = ET.SubElement(check, "check-content-ref")
            href = self.sce_metadata['relative_path']
            check_ref.set("href", href)

        # The OVAL check is added unconditionally.
        check = ET.SubElement(check_parent, 'check')
        check.set("system", oval_namespace)
        check_content_ref = ET.SubElement(check, "check-content-ref")
        if self.oval_external_content:
            check_content_ref.set("href", self.oval_external_content)
        else:
            # TODO: This is pretty much a hack, oval ID will be the same as rule ID
            #       and we don't want the developers to have to keep them in sync.
            #       Therefore let's just add an OVAL ref of that ID.
            # TODO  Can we not add the check element if the rule doesn't have an OVAL check?
            #       At the moment, the check elements of rules without OVAL are removed by
            #       relabel_ids.py
            check_content_ref.set("href", "oval-unlinked.xml")
            check_content_ref.set("name", self.id_)

        # NOTE(review): the OCIL check is attached to check_parent, not to
        # ocil_parent, even though ocil_parent was prepared above as the "OR"
        # wrapper for the complex-check case. Confirm which parent is intended.
        if self.ocil or self.ocil_clause:
            ocil_check = ET.SubElement(check_parent, "check")
            ocil_check.set("system", ocil_cs)
            ocil_check_ref = ET.SubElement(ocil_check, "check-content-ref")
            ocil_check_ref.set("href", "ocil-unlinked.xml")
            ocil_check_ref.set("name", self.id_ + "_ocil")

        return rule
1633
1634 2
    def to_file(self, file_name):
        """Write the XML element of this object to ``file_name``.

        The element comes from ``to_xml_element()`` called without arguments.
        """
        ET.ElementTree(self.to_xml_element()).write(file_name)
1638
1639 2
    def to_ocil(self):
        """Build the OCIL elements describing this rule.

        Returns:
            A ``(questionnaire, action, boolean_question)`` tuple of elements.

        Raises:
            ValueError: If the rule has neither ``ocil`` nor ``ocil_clause``.
        """
        if not (self.ocil or self.ocil_clause):
            raise ValueError("Rule {0} doesn't have OCIL".format(self.id_))

        # <questionnaire> for the rule, pointing to its test action.
        questionnaire = ET.Element("questionnaire", id=self.id_ + "_ocil")
        ET.SubElement(questionnaire, "title").text = self.title
        actions = ET.SubElement(questionnaire, "actions")
        ET.SubElement(actions, "test_action_ref").text = self.id_ + "_action"

        # <boolean_question_test_action>: a true answer passes, false fails.
        action = ET.Element(
            "boolean_question_test_action",
            id=self.id_ + "_action",
            question_ref=self.id_ + "_question")
        for branch_tag, outcome in (("when_true", "PASS"), ("when_false", "FAIL")):
            branch = ET.SubElement(action, branch_tag)
            ET.SubElement(branch, "result").text = outcome

        # <boolean_question>
        boolean_question = ET.Element(
            "boolean_question", id=self.id_ + "_question")

        # TODO: The contents of <question_text> used to be broken in the
        # legacy XSLT implementation. The code below contains hacks that
        # reproduce the legacy results, which allowed a smooth transition to
        # the new OCIL generator without mass-editing rule YAML files.
        # Still to be solved:
        # TODO: using variables (aka XCCDF Values) in OCIL content
        # TODO: using HTML formatting tags eg. <pre> in OCIL content
        #
        # The "ocil" key of compiled rules contains HTML and XML elements,
        # but OCIL question texts should not, hence the tags are stripped.
        if self.ocil is None:
            stripped_ocil = ""
        else:
            stripped_ocil = re.sub(r"</?[^>]+>", "", self.ocil)

        # The "ocil" text contains XML entities that ET.SubElement() would
        # escape; add_sub_element() is used because they must stay as-is.
        question_text = add_sub_element(
            boolean_question, "question_text", stripped_ocil)

        # The "ocil_clause" key also contains HTML and XML elements, but here
        # the '<' and '>' characters are meant to be escaped.
        # An empty ocil_clause producing a broken question is in line with
        # the legacy XSLT implementation.
        clause = self.ocil_clause or ""
        if question_text.text is not None:
            leading_text = question_text.text
        else:
            leading_text = ""
        question_text.text = (
            "{0}\n      Is it the case that {1}?\n      ".format(
                leading_text, clause))

        return (questionnaire, action, boolean_question)
1695
1696 2
    def __hash__(self):
        """Hash the object by its ``id_``.

        IDs are meant to be unique, so using the ID alone should suffice.
        This matches ``__eq__``, which also compares ``id_``.
        """
        return hash(self.id_)
1700
1701 2
    def __eq__(self, other):
        """Return True when ``other`` is of this class and has the same ``id_``."""
        if not isinstance(other, self.__class__):
            return False
        return self.id_ == other.id_
1703
1704 2
    def __ne__(self, other):
        """Return the negation of ``self == other``.

        Defined explicitly because Python 2 does not derive ``__ne__`` from
        ``__eq__``. It must delegate to ``==``: evaluating ``self != other``
        here would call this very method again and recurse without end.
        """
        return not self == other
1706
1707 2
    def __lt__(self, other):
        """Order objects by comparing their ``id_`` values."""
        own_id = self.id_
        other_id = other.id_
        return own_id < other_id
1709
1710 2
    def __str__(self):
        """Return the object's ``id_`` as its string form."""
        return self.id_
1712
1713
1714 2
class DirectoryLoader(object):
    """Walk a guide directory tree and collect benchmarks, groups, rules and values.

    This base class only discovers files and recurses into subdirectories;
    subclasses must implement ``_get_new_loader``, ``_process_values`` and
    ``_process_rules`` to decide what happens with the discovered items.
    """

    def __init__(self, profiles_dir, env_yaml):
        """Set up an empty loader.

        Args:
            profiles_dir: Directory with profiles; passed to the benchmark
                when one is loaded and the value is truthy.
            env_yaml: Environment mapping; must contain the ``"product"`` key.
        """
        # Items discovered in the directory currently being processed.
        self.benchmark_file = None
        self.group_file = None
        self.loaded_group = None
        self.rule_files = []
        self.value_files = []
        self.subdirectories = []

        # Entities accumulated from this directory and all its descendants.
        self.all_values = {}
        self.all_rules = {}
        self.all_groups = {}

        self.profiles_dir = profiles_dir
        self.env_yaml = env_yaml
        self.product = env_yaml["product"]

        self.parent_group = None

    def _collect_items_to_load(self, guide_directory):
        """Sort the entries of ``guide_directory`` into the loader's work lists.

        Raises:
            ValueError: If a benchmark or a group file was already recorded
                when another one is encountered.
        """
        for entry in sorted(os.listdir(guide_directory)):
            entry_path = os.path.join(guide_directory, entry)
            extension = os.path.splitext(entry)[1]

            if extension == '.var':
                self.value_files.append(entry_path)
                continue

            if entry == "benchmark.yml":
                if self.benchmark_file:
                    raise ValueError("Multiple benchmarks in one directory")
                self.benchmark_file = entry_path
                continue

            if entry == "group.yml":
                if self.group_file:
                    raise ValueError("Multiple groups in one directory")
                self.group_file = entry_path
                continue

            if extension == '.rule':
                self.rule_files.append(entry_path)
                continue

            if is_rule_dir(entry_path):
                self.rule_files.append(get_rule_dir_yaml(entry_path))
                continue

            # Entries named "tests" are neither recursed into nor reported.
            if entry == "tests":
                continue

            if os.path.isdir(entry_path):
                self.subdirectories.append(entry_path)
                continue

            sys.stderr.write(
                "Encountered file '%s' while recursing, extension '%s' "
                "is unknown. Skipping..\n"
                % (entry, extension)
            )

    def load_benchmark_or_group(self, guide_directory):
        """
        Loads a given benchmark or group from the specified benchmark_file or
        group_file, in the context of guide_directory, profiles_dir and env_yaml.

        Returns the loaded group or benchmark, or None when the directory
        contains neither. Raises ValueError when it contains both.
        """
        if self.group_file and self.benchmark_file:
            raise ValueError("A .benchmark file and a .group file were found in "
                             "the same directory '%s'" % guide_directory)

        loaded = None

        # A benchmark is treated as a special form of group from here on.
        if self.benchmark_file:
            loaded = Benchmark.from_yaml(self.benchmark_file, self.env_yaml)
            if self.profiles_dir:
                loaded.add_profiles_from_dir(self.profiles_dir, self.env_yaml)

        if self.group_file:
            loaded = Group.from_yaml(self.group_file, self.env_yaml)
            applicable_products = parse_prodtype(loaded.prodtype)
            # Only groups applicable to the current product get registered.
            if "all" in applicable_products or self.product in applicable_products:
                self.all_groups[loaded.id_] = loaded

        return loaded

    def _load_group_process_and_recurse(self, guide_directory):
        """Load the group of ``guide_directory`` and process everything below it."""
        self.loaded_group = self.load_benchmark_or_group(guide_directory)
        if not self.loaded_group:
            return

        if self.parent_group:
            self.parent_group.add_group(self.loaded_group, env_yaml=self.env_yaml)

        self._process_values()
        self._recurse_into_subdirs()
        self._process_rules()

    def process_directory_tree(self, start_dir, extra_group_dirs=None):
        """Process ``start_dir``, treating ``extra_group_dirs`` as additional subdirectories."""
        self._collect_items_to_load(start_dir)
        if extra_group_dirs:
            self.subdirectories += extra_group_dirs
        self._load_group_process_and_recurse(start_dir)

    def process_directory_trees(self, directories):
        """Process the first directory as the root and the rest as extra group dirs."""
        root_dir, additional_dirs = directories[0], directories[1:]
        return self.process_directory_tree(root_dir, additional_dirs)

    def _recurse_into_subdirs(self):
        """Process each subdirectory with a fresh loader and merge its results."""
        for subdirectory in self.subdirectories:
            child_loader = self._get_new_loader()
            child_loader.parent_group = self.loaded_group
            child_loader.process_directory_tree(subdirectory)

            self.all_values.update(child_loader.all_values)
            self.all_rules.update(child_loader.all_rules)
            self.all_groups.update(child_loader.all_groups)

    def _get_new_loader(self):
        """Return a new loader for a subdirectory; to be provided by subclasses."""
        raise NotImplementedError()

    def _process_values(self):
        """Handle the collected value files; to be provided by subclasses."""
        raise NotImplementedError()

    def _process_rules(self):
        """Handle the collected rule files; to be provided by subclasses."""
        raise NotImplementedError()

    def save_all_entities(self, base_dir):
        """Dump all rules, groups and values below ``base_dir``.

        The ``rules``, ``groups`` and ``values`` directories are created even
        when there is nothing to store in them.
        """
        entity_kinds = (
            ("rules", self.all_rules),
            ("groups", self.all_groups),
            ("values", self.all_values),
        )
        for dirname, entities_by_id in entity_kinds:
            destdir = os.path.join(base_dir, dirname)
            mkdir_p(destdir)
            if entities_by_id:
                self.save_entities(entities_by_id.values(), destdir)

    def save_entities(self, entities, destdir):
        """Dump every entity into ``destdir`` as ``<id>.yml``."""
        if not entities:
            return
        for entity in entities:
            target = os.path.join(destdir, entity.id_ + ".yml")
            entity.dump_yaml(target)
1854
1855
1856 2
class BuildLoader(DirectoryLoader):
    """Directory loader that builds Value and Rule objects from the found files."""

    def __init__(self, profiles_dir, env_yaml,
                 sce_metadata_path=None):
        """Set up the loader and optionally read the SCE metadata.

        Args:
            profiles_dir: Passed on to ``DirectoryLoader``.
            env_yaml: Passed on to ``DirectoryLoader``.
            sce_metadata_path: Optional path to a JSON file with SCE metadata.
                It is parsed only when the path is truthy and the file is
                not empty; otherwise ``sce_metadata`` stays ``None``.
        """
        super(BuildLoader, self).__init__(profiles_dir, env_yaml)

        self.sce_metadata = None
        if sce_metadata_path and os.path.getsize(sce_metadata_path):
            # Use a context manager so the file handle is closed right away
            # instead of being left for the garbage collector.
            with open(sce_metadata_path, 'r') as sce_metadata_file:
                self.sce_metadata = json.load(sce_metadata_file)

    def _process_values(self):
        """Load every collected value file and add it to the loaded group."""
        for value_yaml in self.value_files:
            value = Value.from_yaml(value_yaml, self.env_yaml)
            self.all_values[value.id_] = value
            self.loaded_group.add_value(value)

    def _process_rules(self):
        """Load every collected rule file that applies to the current product.

        Rules raising ``DocumentationNotComplete`` and rules whose prodtype
        lists neither ``"all"`` nor the current product are skipped.
        """
        for rule_yaml in self.rule_files:
            try:
                rule = Rule.from_yaml(rule_yaml, self.env_yaml, self.sce_metadata)
            except DocumentationNotComplete:
                # Happens on non-debug build when a rule is "documentation-incomplete"
                continue
            prodtypes = parse_prodtype(rule.prodtype)
            if "all" not in prodtypes and self.product not in prodtypes:
                continue
            self.all_rules[rule.id_] = rule
            self.loaded_group.add_rule(rule, env_yaml=self.env_yaml)

            # Platforms of the enclosing group are inherited by the rule.
            if self.loaded_group.platforms:
                rule.inherited_platforms += self.loaded_group.platforms

            rule.normalize(self.env_yaml["product"])

    def _get_new_loader(self):
        """Return a new ``BuildLoader`` sharing this loader's SCE metadata."""
        loader = BuildLoader(
            self.profiles_dir, self.env_yaml)
        # Do it this way so we only have to parse the SCE metadata once.
        loader.sce_metadata = self.sce_metadata
        return loader

    def export_group_to_file(self, filename):
        """Write the loaded group to ``filename`` via its ``to_file`` method."""
        return self.loaded_group.to_file(filename)
1898
1899
1900 2
class LinearLoader(object):
    """Load already resolved rules, groups, profiles and values from flat directories.

    The entities are read from the ``rules``, ``profiles``, ``groups`` and
    ``values`` subdirectories of the resolved path and can be exported
    as a benchmark file or as an OCIL file.
    """

    def __init__(self, env_yaml, resolved_path):
        """Remember the resolved directories and start with empty collections."""
        self.resolved_rules_dir = os.path.join(resolved_path, "rules")
        self.rules = {}

        self.resolved_profiles_dir = os.path.join(resolved_path, "profiles")
        self.profiles = {}

        self.resolved_groups_dir = os.path.join(resolved_path, "groups")
        self.groups = {}

        self.resolved_values_dir = os.path.join(resolved_path, "values")
        self.values = {}

        self.benchmark = None
        self.env_yaml = env_yaml

    def find_first_groups_ids(self, start_dir):
        """Return names of the direct subdirectories of ``start_dir`` holding a ``group.yml``."""
        pattern = os.path.join(start_dir, "*", "group.yml")
        group_ids = []
        for group_file in glob.glob(pattern):
            # The directory name right above group.yml serves as the group ID.
            path_parts = group_file.split(os.path.sep)
            group_ids.append(path_parts[-2])
        return group_ids

    def load_entities_by_id(self, filenames, destination, cls):
        """Load each file with ``cls.from_yaml`` and store the result under its ``id_``."""
        for filename in filenames:
            loaded = cls.from_yaml(filename, self.env_yaml)
            destination[loaded.id_] = loaded

    def load_benchmark(self, directory):
        """Load ``benchmark.yml`` from ``directory`` and attach profiles and top-level groups."""
        benchmark_path = os.path.join(directory, "benchmark.yml")
        self.benchmark = Benchmark.from_yaml(benchmark_path, self.env_yaml)

        self.benchmark.add_profiles_from_dir(self.resolved_profiles_dir, self.env_yaml)

        for gid in self.find_first_groups_ids(directory):
            try:
                self.benchmark.add_group(self.groups[gid], self.env_yaml)
            except KeyError:
                # Add only the groups we have compiled and loaded
                continue

    def load_compiled_content(self):
        """Load all resolved rules, groups, profiles and values, then link the groups."""
        sources = (
            (self.resolved_rules_dir, self.rules, Rule),
            (self.resolved_groups_dir, self.groups, Group),
            (self.resolved_profiles_dir, self.profiles, Profile),
            (self.resolved_values_dir, self.values, Value),
        )
        for directory, destination, entity_cls in sources:
            yaml_files = glob.glob(os.path.join(directory, "*.yml"))
            self.load_entities_by_id(yaml_files, destination, entity_cls)

        for group in self.groups.values():
            group.load_entities(self.rules, self.values, self.groups)

    def export_benchmark_to_file(self, filename):
        """Register XML namespaces and write the benchmark to ``filename``."""
        register_namespaces()
        return self.benchmark.to_file(filename, self.env_yaml)

    def export_ocil_to_file(self, filename):
        """Write an OCIL document covering every rule that has OCIL content.

        Rules with neither ``ocil`` nor ``ocil_clause`` are left out.
        """
        root = ET.Element('ocil')
        namespace_attributes = (
            ('xmlns:xsi', xsi_namespace),
            ("xmlns", ocil_namespace),
            ("xmlns:xhtml", xhtml_namespace),
        )
        for attribute, namespace in namespace_attributes:
            root.set(attribute, namespace)
        tree = ET.ElementTree(root)

        # <generator> metadata block.
        generator = ET.SubElement(root, "generator")
        generator_fields = (
            ("product_name", "build_shorthand.py from SCAP Security Guide"),
            ("product_version", "ssg: " + self.env_yaml["ssg_version_str"]),
            ("schema_version", "2.0"),
            ("timestamp", timestamp),
        )
        for tag, text in generator_fields:
            ET.SubElement(generator, tag).text = text

        # Containers for the three kinds of elements each rule contributes.
        questionnaires = ET.SubElement(root, "questionnaires")
        test_actions = ET.SubElement(root, "test_actions")
        questions = ET.SubElement(root, "questions")

        for rule in self.rules.values():
            if not (rule.ocil or rule.ocil_clause):
                continue
            questionnaire, action, boolean_question = rule.to_ocil()
            questionnaires.append(questionnaire)
            test_actions.append(action)
            questions.append(boolean_question)

        tree.write(filename)
1987