Test Failed
Pull Request — master (#7613)
by Matěj
02:12
created

ssg.build_yaml.DirectoryLoader.save_all_entities()   A

Complexity

Conditions 4

Size

Total Lines 15
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 16.5853

Importance

Changes 0
Metric Value
cc 4
eloc 13
nop 2
dl 0
loc 15
ccs 1
cts 13
cp 0.0769
crap 16.5853
rs 9.75
c 0
b 0
f 0
1 2
from __future__ import absolute_import
2 2
from __future__ import print_function
3
4 2
from collections import defaultdict
5 2
from copy import deepcopy
6 2
import datetime
7 2
import json
8 2
import os
9 2
import os.path
10 2
import re
11 2
import sys
12 2
from xml.sax.saxutils import escape
13 2
import glob
14
15 2
import yaml
16
17 2
from .build_cpe import CPEDoesNotExist, parse_platform_definition
18 2
from .constants import XCCDF_REFINABLE_PROPERTIES, SCE_SYSTEM, ocil_cs, ocil_namespace, xhtml_namespace, xsi_namespace, timestamp
19 2
from .rules import get_rule_dir_id, get_rule_dir_yaml, is_rule_dir
20 2
from .rule_yaml import parse_prodtype
21
22 2
from .cce import is_cce_format_valid, is_cce_value_valid
23 2
from .yaml import DocumentationNotComplete, open_and_expand, open_and_macro_expand
24 2
from .utils import required_key, mkdir_p
25
26 2
from .xml import ElementTree as ET
27 2
from .shims import unicode_func
28
29
30 2
def dump_yaml_preferably_in_original_order(dictionary, file_object):
31
    try:
32
        return yaml.dump(dictionary, file_object, indent=4, sort_keys=False)
33
    except TypeError as exc:
34
        # Older versions of libyaml don't understand the sort_keys kwarg
35
        if "sort_keys" not in str(exc):
36
            raise exc
37
        return yaml.dump(dictionary, file_object, indent=4)
38
39
40 2
def add_sub_element(parent, tag, data):
41
    """
42
    Creates a new child element under parent with tag tag, and sets
43
    data as the content under the tag. In particular, data is a string
44
    to be parsed as an XML tree, allowing sub-elements of children to be
45
    added.
46
47
    If data should not be parsed as an XML tree, either escape the contents
48
    before passing into this function, or use ElementTree.SubElement().
49
50
    Returns the newly created subelement of type tag.
51
    """
52
    # This is used because our YAML data contain XML and XHTML elements
53
    # ET.SubElement() escapes the < > characters by &lt; and &gt;
54
    # and therefore it does not add child elements
55
    # we need to do a hack instead
56
    # TODO: Remove this function after we move to Markdown everywhere in SSG
57
    ustr = unicode_func("<{0}>{1}</{0}>").format(tag, data)
58
59
    try:
60
        element = ET.fromstring(ustr.encode("utf-8"))
61
    except Exception:
62
        msg = ("Error adding subelement to an element '{0}' from string: '{1}'"
63
               .format(parent.tag, ustr))
64
        raise RuntimeError(msg)
65
66
    parent.append(element)
67
    return element
68
69
70 2
def reorder_according_to_ordering(unordered, ordering, regex=None):
71 2
    ordered = []
72 2
    if regex is None:
73 2
        regex = "|".join(["({0})".format(item) for item in ordering])
74 2
    regex = re.compile(regex)
75
76 2
    items_to_order = list(filter(regex.match, unordered))
77 2
    unordered = set(unordered)
78
79 2
    for priority_type in ordering:
80 2
        for item in items_to_order:
81 2
            if priority_type in item and item in unordered:
82 2
                ordered.append(item)
83 2
                unordered.remove(item)
84 2
    ordered.extend(sorted(unordered))
85 2
    return ordered
86
87
88 2
def add_warning_elements(element, warnings):
89
    # The use of [{dict}, {dict}] in warnings is to handle the following
90
    # scenario where multiple warnings have the same category which is
91
    # valid in SCAP and our content:
92
    #
93
    # warnings:
94
    #     - general: Some general warning
95
    #     - general: Some other general warning
96
    #     - general: |-
97
    #         Some really long multiline general warning
98
    #
99
    # Each of the {dict} should have only one key/value pair.
100
    for warning_dict in warnings:
101
        warning = add_sub_element(element, "warning", list(warning_dict.values())[0])
102
        warning.set("category", list(warning_dict.keys())[0])
103
104
105 2
def add_nondata_subelements(element, subelement, attribute, attr_data):
106
    """Add multiple iterations of a sublement that contains an attribute but no data
107
       For example, <requires id="my_required_id"/>"""
108
    for data in attr_data:
109
        req = ET.SubElement(element, subelement)
110
        req.set(attribute, data)
111
112
113 2
def check_warnings(xccdf_structure):
114 2
    for warning_list in xccdf_structure.warnings:
115
        if len(warning_list) != 1:
116
            msg = "Only one key/value pair should exist for each warnings dictionary"
117
            raise ValueError(msg)
118
119
120 2
class SelectionHandler(object):
121 2
    def __init__(self):
122 2
        self.refine_rules = defaultdict(list)
123 2
        self.variables = dict()
124 2
        self.unselected = []
125 2
        self.selected = []
126
127 2
    @property
128
    def selections(self):
129
        selections = []
130
        for item in self.selected:
131
            selections.append(str(item))
132
        for item in self.unselected:
133
            selections.append("!"+str(item))
134
        for varname in self.variables.keys():
135
            selections.append(varname+"="+self.variables.get(varname))
136
        for rule, refinements in self.refine_rules.items():
137
            for prop, val in refinements:
138
                selections.append("{rule}.{property}={value}"
139
                                  .format(rule=rule, property=prop, value=val))
140
        return selections
141
142 2
    @selections.setter
143
    def selections(self, entries):
144 2
        for item in entries:
145 2
            self.apply_selection(item)
146
147 2
    def apply_selection(self, item):
148 2
        if "." in item:
149
            rule, refinement = item.split(".", 1)
150
            property_, value = refinement.split("=", 1)
151
            if property_ not in XCCDF_REFINABLE_PROPERTIES:
152
                msg = ("Property '{property_}' cannot be refined. "
153
                       "Rule properties that can be refined are {refinables}. "
154
                       "Fix refinement '{rule_id}.{property_}={value}' in profile '{profile}'."
155
                       .format(property_=property_, refinables=XCCDF_REFINABLE_PROPERTIES,
156
                               rule_id=rule, value=value, profile=self.id_)
157
                       )
158
                raise ValueError(msg)
159
            self.refine_rules[rule].append((property_, value))
160 2
        elif "=" in item:
161 2
            varname, value = item.split("=", 1)
162 2
            self.variables[varname] = value
163 2
        elif item.startswith("!"):
164
            self.unselected.append(item[1:])
165
        else:
166 2
            self.selected.append(item)
167
168 2
    def _subtract_refinements(self, extended_refinements):
169
        """
170
        Given a dict of rule refinements from the extended profile,
171
        "undo" every refinement prefixed with '!' in this profile.
172
        """
173
        for rule, refinements in list(self.refine_rules.items()):
174
            if rule.startswith("!"):
175
                for prop, val in refinements:
176
                    extended_refinements[rule[1:]].remove((prop, val))
177
                del self.refine_rules[rule]
178
        return extended_refinements
179
180 2
    def update_with(self, rhs):
181
        extended_selects = set(rhs.selected)
182
        extra_selections = extended_selects.difference(set(self.selected))
183
        self.selected.extend(list(extra_selections))
184
185
        updated_variables = dict(rhs.variables)
186
        updated_variables.update(self.variables)
187
        self.variables = updated_variables
188
189
        extended_refinements = deepcopy(rhs.refine_rules)
190
        updated_refinements = self._subtract_refinements(extended_refinements)
191
        updated_refinements.update(self.refine_rules)
192
        self.refine_rules = updated_refinements
193
194
195 2
class XCCDFEntity(object):
196
    """
197
    This class can load itself from a YAML with Jinja macros,
198
    and it can also save itself to YAML.
199
200
    It is supposed to work with the content in the project,
201
    when entities are defined in the benchmark tree,
202
    and they are compiled into flat YAMLs to the build directory.
203
    """
204 2
    KEYS = dict(
205
            id_=lambda: "",
206
            definition_location=lambda: "",
207
    )
208
209 2
    MANDATORY_KEYS = set()
210
211 2
    GENERIC_FILENAME = ""
212 2
    ID_LABEL = "id"
213
214 2
    def __init__(self, id_):
215 2
        super(XCCDFEntity, self).__init__()
216 2
        self._assign_defaults()
217 2
        self.id_ = id_
218
219 2
    def _assign_defaults(self):
220 2
        for key, default in self.KEYS.items():
221 2
            default_val = default()
222 2
            if isinstance(default_val, RuntimeError):
223
                default_val = None
224 2
            setattr(self, key, default_val)
225
226 2
    @classmethod
227
    def get_instance_from_full_dict(cls, data):
228
        """
229
        Given a defining dictionary, produce an instance
230
        by treating all dict elements as attributes.
231
232
        Extend this if you want tight control over the instance creation process.
233
        """
234 2
        entity = cls(data["id_"])
235 2
        for key, value in data.items():
236 2
            setattr(entity, key, value)
237 2
        return entity
238
239 2
    @classmethod
240
    def process_input_dict(cls, input_contents, env_yaml):
241
        """
242
        Take the contents of the definition as a dictionary, and
243
        add defaults or raise errors if a required member is not present.
244
245
        Extend this if you want to add, remove or alter the result
246
        that will constitute the new instance.
247
        """
248 2
        data = dict()
249
250 2
        for key, default in cls.KEYS.items():
251 2
            if key in input_contents:
252 2
                data[key] = input_contents[key]
253 2
                del input_contents[key]
254 2
                continue
255
256 2
            if key not in cls.MANDATORY_KEYS:
257 2
                data[key] = cls.KEYS[key]()
258
            else:
259
                msg = (
260
                    "Key '{key}' is mandatory for definition of '{class_name}'."
261
                    .format(key=key, class_name=cls.__name__))
262
                raise ValueError(msg)
263
264 2
        return data
265
266 2
    @classmethod
267 2
    def parse_yaml_into_processed_dict(cls, yaml_file, env_yaml=None):
268
        """
269
        Given yaml filename and environment info, produce a dictionary
270
        that defines the instance to be created.
271
        This wraps :meth:`process_input_dict` and it adds generic keys on the top:
272
273
        - `id_` as the entity ID that is deduced either from thefilename,
274
          or from the parent directory name.
275
        - `definition_location` as the original location whenre the entity got defined.
276
        """
277 2
        file_basename = os.path.basename(yaml_file)
278 2
        entity_id = file_basename.split(".")[0]
279 2
        if file_basename == cls.GENERIC_FILENAME:
280 2
            entity_id = os.path.basename(os.path.dirname(yaml_file))
281
282 2
        if env_yaml:
283 2
            env_yaml[cls.ID_LABEL] = entity_id
284 2
        yaml_data = open_and_macro_expand(yaml_file, env_yaml)
285
286 2
        try:
287 2
            processed_data = cls.process_input_dict(yaml_data, env_yaml)
288
        except ValueError as exc:
289
            msg = (
290
                "Error processing {yaml_file}: {exc}"
291
                .format(yaml_file=yaml_file, exc=str(exc)))
292
            raise ValueError(msg)
293
294 2
        if yaml_data:
295
            msg = (
296
                "Unparsed YAML data in '{yaml_file}': {keys}"
297
                .format(yaml_file=yaml_file, keys=list(yaml_data.keys())))
298
            raise RuntimeError(msg)
299
300 2
        if not processed_data.get("definition_location", ""):
301 2
            processed_data["definition_location"] = yaml_file
302
303 2
        processed_data["id_"] = entity_id
304
305 2
        return processed_data
306
307 2
    @classmethod
308 2
    def from_yaml(cls, yaml_file, env_yaml=None):
309 2
        yaml_file = os.path.normpath(yaml_file)
310
311 2
        local_env_yaml = None
312 2
        if env_yaml:
313 2
            local_env_yaml = dict()
314 2
            local_env_yaml.update(env_yaml)
315
316 2
        try:
317 2
            data_dict = cls.parse_yaml_into_processed_dict(yaml_file, local_env_yaml)
318
        except DocumentationNotComplete as exc:
319
            raise
320
        except Exception as exc:
321
            msg = (
322
                "Error loading a {class_name} from {filename}: {error}"
323
                .format(class_name=cls.__name__, filename=yaml_file, error=str(exc)))
324
            raise RuntimeError(msg)
325
326 2
        result = cls.get_instance_from_full_dict(data_dict)
327
328 2
        return result
329
330 2
    def represent_as_dict(self):
331
        """
332
        Produce a dict representation of the class.
333
334
        Extend this method if you need the representation to be different from the object.
335
        """
336 2
        data = dict()
337 2
        for key in self.KEYS:
338 2
            value = getattr(self, key)
339 2
            if value or True:
340 2
                data[key] = getattr(self, key)
341 2
        del data["id_"]
342 2
        return data
343
344 2
    def dump_yaml(self, file_name, documentation_complete=True):
345
        to_dump = self.represent_as_dict()
346
        to_dump["documentation_complete"] = documentation_complete
347
        with open(file_name, "w+") as f:
348
            dump_yaml_preferably_in_original_order(to_dump, f)
349
350 2
    def to_xml_element(self):
351
        raise NotImplementedError()
352
353
354 2
class Profile(XCCDFEntity, SelectionHandler):
355
    """Represents XCCDF profile
356
    """
357 2
    KEYS = dict(
358
        title=lambda: "",
359
        description=lambda: "",
360
        extends=lambda: "",
361
        metadata=lambda: None,
362
        reference=lambda: None,
363
        selections=lambda: list(),
364
        platforms=lambda: set(),
365
        cpe_names=lambda: set(),
366
        platform=lambda: None,
367
        filter_rules=lambda: "",
368
        ** XCCDFEntity.KEYS
369
    )
370
371 2
    MANDATORY_KEYS = {
372
        "title",
373
        "description",
374
        "selections",
375
    }
376
377 2
    @classmethod
378
    def process_input_dict(cls, input_contents, env_yaml):
379 2
        input_contents = super(Profile, cls).process_input_dict(input_contents, env_yaml)
380
381 2
        platform = input_contents.get("platform")
382 2
        if platform is not None:
383
            input_contents["platforms"].add(platform)
384
385 2
        if env_yaml:
386 2
            for platform in input_contents["platforms"]:
387
                try:
388
                    new_cpe_name = env_yaml["product_cpes"].get_cpe_name(platform)
389
                    input_contents["cpe_names"].add(new_cpe_name)
390
                except CPEDoesNotExist:
391
                    msg = (
392
                        "Unsupported platform '{platform}' in a profile."
393
                        .format(platform=platform))
394
                    raise CPEDoesNotExist(msg)
395
396 2
        return input_contents
397
398 2
    @property
399
    def rule_filter(self):
400
        if self.filter_rules:
401
            return rule_filter_from_def(self.filter_rules)
402
        else:
403
            return noop_rule_filterfunc
404
405 2
    def to_xml_element(self):
406
        element = ET.Element('Profile')
407
        element.set("id", self.id_)
408
        if self.extends:
409
            element.set("extends", self.extends)
410
        title = add_sub_element(element, "title", self.title)
411
        title.set("override", "true")
412
        desc = add_sub_element(element, "description", self.description)
413
        desc.set("override", "true")
414
415
        if self.reference:
416
            add_sub_element(element, "reference", escape(self.reference))
417
418
        for cpe_name in self.cpe_names:
419
            plat = ET.SubElement(element, "platform")
420
            plat.set("idref", cpe_name)
421
422
        for selection in self.selected:
423
            select = ET.Element("select")
424
            select.set("idref", selection)
425
            select.set("selected", "true")
426
            element.append(select)
427
428
        for selection in self.unselected:
429
            unselect = ET.Element("select")
430
            unselect.set("idref", selection)
431
            unselect.set("selected", "false")
432
            element.append(unselect)
433
434
        for value_id, selector in self.variables.items():
435
            refine_value = ET.Element("refine-value")
436
            refine_value.set("idref", value_id)
437
            refine_value.set("selector", selector)
438
            element.append(refine_value)
439
440
        for refined_rule, refinement_list in self.refine_rules.items():
441
            refine_rule = ET.Element("refine-rule")
442
            refine_rule.set("idref", refined_rule)
443
            for refinement in refinement_list:
444
                refine_rule.set(refinement[0], refinement[1])
445
            element.append(refine_rule)
446
447
        return element
448
449 2
    def get_rule_selectors(self):
450 2
        return self.selected + self.unselected
451
452 2
    def get_variable_selectors(self):
453 2
        return self.variables
454
455 2
    def validate_refine_rules(self, rules):
456
        existing_rule_ids = [r.id_ for r in rules]
457
        for refine_rule, refinement_list in self.refine_rules.items():
458
            # Take first refinement to ilustrate where the error is
459
            # all refinements in list are invalid, so it doesn't really matter
460
            a_refinement = refinement_list[0]
461
462
            if refine_rule not in existing_rule_ids:
463
                msg = (
464
                    "You are trying to refine a rule that doesn't exist. "
465
                    "Rule '{rule_id}' was not found in the benchmark. "
466
                    "Please check all rule refinements for rule: '{rule_id}', for example: "
467
                    "- {rule_id}.{property_}={value}' in profile {profile_id}."
468
                    .format(rule_id=refine_rule, profile_id=self.id_,
469
                            property_=a_refinement[0], value=a_refinement[1])
470
                    )
471
                raise ValueError(msg)
472
473
            if refine_rule not in self.get_rule_selectors():
474
                msg = ("- {rule_id}.{property_}={value}' in profile '{profile_id}' is refining "
475
                       "a rule that is not selected by it. The refinement will not have any "
476
                       "noticeable effect. Either select the rule or remove the rule refinement."
477
                       .format(rule_id=refine_rule, property_=a_refinement[0],
478
                               value=a_refinement[1], profile_id=self.id_)
479
                       )
480
                raise ValueError(msg)
481
482 2
    def validate_variables(self, variables):
483
        variables_by_id = dict()
484
        for var in variables:
485
            variables_by_id[var.id_] = var
486
487
        for var_id, our_val in self.variables.items():
488
            if var_id not in variables_by_id:
489
                all_vars_list = [" - %s" % v for v in variables_by_id.keys()]
490
                msg = (
491
                    "Value '{var_id}' in profile '{profile_name}' is not known. "
492
                    "We know only variables:\n{var_names}"
493
                    .format(
494
                        var_id=var_id, profile_name=self.id_,
495
                        var_names="\n".join(sorted(all_vars_list)))
496
                )
497
                raise ValueError(msg)
498
499
            allowed_selectors = [str(s) for s in variables_by_id[var_id].options.keys()]
500
            if our_val not in allowed_selectors:
501
                msg = (
502
                    "Value '{var_id}' in profile '{profile_name}' "
503
                    "uses the selector '{our_val}'. "
504
                    "This is not possible, as only selectors {all_selectors} are available. "
505
                    "Either change the selector used in the profile, or "
506
                    "add the selector-value pair to the variable definition."
507
                    .format(
508
                        var_id=var_id, profile_name=self.id_, our_val=our_val,
509
                        all_selectors=allowed_selectors,
510
                    )
511
                )
512
                raise ValueError(msg)
513
514 2
    def validate_rules(self, rules, groups):
515
        existing_rule_ids = [r.id_ for r in rules]
516
        rule_selectors = self.get_rule_selectors()
517
        for id_ in rule_selectors:
518
            if id_ in groups:
519
                msg = (
520
                    "You have selected a group '{group_id}' instead of a "
521
                    "rule. Groups have no effect in the profile and are not "
522
                    "allowed to be selected. Please remove '{group_id}' "
523
                    "from profile '{profile_id}' before proceeding."
524
                    .format(group_id=id_, profile_id=self.id_)
525
                )
526
                raise ValueError(msg)
527
            if id_ not in existing_rule_ids:
528
                msg = (
529
                    "Rule '{rule_id}' was not found in the benchmark. Please "
530
                    "remove rule '{rule_id}' from profile '{profile_id}' "
531
                    "before proceeding."
532
                    .format(rule_id=id_, profile_id=self.id_)
533
                )
534
                raise ValueError(msg)
535
536 2
    def __sub__(self, other):
537
        profile = Profile(self.id_)
538
        profile.title = self.title
539
        profile.description = self.description
540
        profile.extends = self.extends
541
        profile.platforms = self.platforms
542
        profile.platform = self.platform
543
        profile.selected = list(set(self.selected) - set(other.selected))
544
        profile.selected.sort()
545
        profile.unselected = list(set(self.unselected) - set(other.unselected))
546
        profile.variables = dict ((k, v) for (k, v) in self.variables.items()
547
                             if k not in other.variables or v != other.variables[k])
548
        return profile
549
550
551 2
class ResolvableProfile(Profile):
552 2
    def __init__(self, * args, ** kwargs):
553
        super(ResolvableProfile, self).__init__(* args, ** kwargs)
554
        self.resolved = False
555
556 2
    def _controls_ids_to_controls(self, controls_manager, policy_id, control_id_list):
557
        items = [controls_manager.get_control(policy_id, cid) for cid in control_id_list]
558
        return items
559
560 2
    def resolve_controls(self, controls_manager):
561
        pass
562
563 2
    def extend_by(self, extended_profile):
564
        self.update_with(extended_profile)
565
566 2
    def resolve_selections_with_rules(self, rules_by_id):
567
        selections = set()
568
        for rid in self.selected:
569
            if rid not in rules_by_id:
570
                continue
571
            rule = rules_by_id[rid]
572
            if not self.rule_filter(rule):
573
                continue
574
            selections.add(rid)
575
        self.selected = list(selections)
576
577 2
    def resolve(self, all_profiles, rules_by_id, controls_manager=None):
578
        if self.resolved:
579
            return
580
581
        if controls_manager:
582
            self.resolve_controls(controls_manager)
583
584
        self.resolve_selections_with_rules(rules_by_id)
585
586
        if self.extends:
587
            if self.extends not in all_profiles:
588
                msg = (
589
                    "Profile {name} extends profile {extended}, but "
590
                    "only profiles {known_profiles} are available for resolution."
591
                    .format(name=self.id_, extended=self.extends,
592
                            known_profiles=list(all_profiles.keys())))
593
                raise RuntimeError(msg)
594
            extended_profile = all_profiles[self.extends]
595
            extended_profile.resolve(all_profiles, rules_by_id, controls_manager)
596
597
            self.extend_by(extended_profile)
598
599
        self.selected = [s for s in set(self.selected) if s not in self.unselected]
600
601
        self.unselected = []
602
        self.extends = None
603
604
        self.selected = sorted(self.selected)
605
606
        for rid in self.selected:
607
            if rid not in rules_by_id:
608
                msg = (
609
                    "Rule {rid} is selected by {profile}, but the rule is not available. "
610
                    "This may be caused by a discrepancy of prodtypes."
611
                    .format(rid=rid, profile=self.id_))
612
                raise ValueError(msg)
613
614
        self.resolved = True
615
616
617 2
class ProfileWithInlinePolicies(ResolvableProfile):
618 2
    def __init__(self, * args, ** kwargs):
619
        super(ProfileWithInlinePolicies, self).__init__(* args, ** kwargs)
620
        self.controls_by_policy = defaultdict(list)
621
622 2
    def apply_selection(self, item):
623
        # ":" is the delimiter for controls but not when the item is a variable
624
        if ":" in item and "=" not in item:
625
            policy_id, control_id = item.split(":", 1)
626
            self.controls_by_policy[policy_id].append(control_id)
627
        else:
628
            super(ProfileWithInlinePolicies, self).apply_selection(item)
629
630 2
    def _process_controls_ids_into_controls(self, controls_manager, policy_id, controls_ids):
631
        controls = []
632
        for cid in controls_ids:
633
            if not cid.startswith("all"):
634
                controls.extend(
635
                    self._controls_ids_to_controls(controls_manager, policy_id, [cid]))
636
            elif ":" in cid:
637
                _, level_id = cid.split(":", 1)
638
                controls.extend(
639
                    controls_manager.get_all_controls_of_level(policy_id, level_id))
640
            else:
641
                controls.extend(
642
                    controls_manager.get_all_controls(policy_id))
643
        return controls
644
645 2
    def resolve_controls(self, controls_manager):
646
        for policy_id, controls_ids in self.controls_by_policy.items():
647
            controls = self._process_controls_ids_into_controls(
648
                controls_manager, policy_id, controls_ids)
649
650
            for c in controls:
651
                self.update_with(c)
652
653
654 2
class Value(XCCDFEntity):
655
    """Represents XCCDF Value
656
    """
657 2
    KEYS = dict(
658
        title=lambda: "",
659
        description=lambda: "",
660
        type=lambda: "",
661
        operator=lambda: "equals",
662
        interactive=lambda: False,
663
        options=lambda: dict(),
664
        warnings=lambda: list(),
665
        ** XCCDFEntity.KEYS
666
    )
667
668 2
    MANDATORY_KEYS = {
669
        "title",
670
        "description",
671
        "type",
672
    }
673
674 2
    @classmethod
675
    def process_input_dict(cls, input_contents, env_yaml):
676 2
        input_contents["interactive"] = (
677
            input_contents.get("interactive", "false").lower() == "true")
678
679 2
        data = super(Value, cls).process_input_dict(input_contents, env_yaml)
680
681 2
        possible_operators = ["equals", "not equal", "greater than",
682
                              "less than", "greater than or equal",
683
                              "less than or equal", "pattern match"]
684
685 2
        if data["operator"] not in possible_operators:
686
            raise ValueError(
687
                "Found an invalid operator value '%s'. "
688
                "Expected one of: %s"
689
                % (data["operator"], ", ".join(possible_operators))
690
            )
691
692 2
        return data
693
694 2
    @classmethod
695 2
    def from_yaml(cls, yaml_file, env_yaml=None):
696 2
        value = super(Value, cls).from_yaml(yaml_file, env_yaml)
697
698 2
        check_warnings(value)
699
700 2
        return value
701
702 2
    def to_xml_element(self):
703
        value = ET.Element('Value')
704
        value.set('id', self.id_)
705
        value.set('type', self.type)
706
        if self.operator != "equals":  # equals is the default
707
            value.set('operator', self.operator)
708
        if self.interactive:  # False is the default
709
            value.set('interactive', 'true')
710
        title = ET.SubElement(value, 'title')
711
        title.text = self.title
712
        add_sub_element(value, 'description', self.description)
713
        add_warning_elements(value, self.warnings)
714
715
        for selector, option in self.options.items():
716
            # do not confuse Value with big V with value with small v
717
            # value is child element of Value
718
            value_small = ET.SubElement(value, 'value')
719
            # by XCCDF spec, default value is value without selector
720
            if selector != "default":
721
                value_small.set('selector', str(selector))
722
            value_small.text = str(option)
723
724
        return value
725
726 2
    def to_file(self, file_name):
727
        root = self.to_xml_element()
728
        tree = ET.ElementTree(root)
729
        tree.write(file_name)
730
731
732 2
class Benchmark(XCCDFEntity):
733
    """Represents XCCDF Benchmark
734
    """
735 2
    KEYS = dict(
736
        title=lambda: "",
737
        status=lambda: "",
738
        description=lambda: "",
739
        notice_id=lambda: "",
740
        notice_description=lambda: "",
741
        front_matter=lambda: "",
742
        rear_matter=lambda: "",
743
        cpes=lambda: list(),
744
        version=lambda: "0",
745
        profiles=lambda: list(),
746
        values=lambda: dict(),
747
        groups=lambda: dict(),
748
        rules=lambda: dict(),
749
        product_cpe_names=lambda: list(),
750
        ** XCCDFEntity.KEYS
751
    )
752
753 2
    MANDATORY_KEYS = {
754
        "title",
755
        "status",
756
        "description",
757
        "front_matter",
758
        "rear_matter",
759
        "version",
760
    }
761
762 2
    GENERIC_FILENAME = "benchmark.yml"
763
764 2 View Code Duplication
    def load_entities(self, rules_by_id, values_by_id, groups_by_id):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
765
        for rid, val in self.rules.items():
766
            if not val:
767
                self.rules[rid] = rules_by_id[rid]
768
769
        for vid, val in self.values.items():
770
            if not val:
771
                self.values[vid] = values_by_id[vid]
772
773
        for gid, val in self.groups.items():
774
            if not val:
775
                self.groups[gid] = groups_by_id[gid]
776
777 2
    @classmethod
778
    def process_input_dict(cls, input_contents, env_yaml):
779
        input_contents["front_matter"] = input_contents["front-matter"]
780
        del input_contents["front-matter"]
781
        input_contents["rear_matter"] = input_contents["rear-matter"]
782
        del input_contents["rear-matter"]
783
784
        data = super(Benchmark, cls).process_input_dict(input_contents, env_yaml)
785
786
        notice_contents = required_key(input_contents, "notice")
787
        del input_contents["notice"]
788
789
        data["notice_id"] = required_key(notice_contents, "id")
790
        del notice_contents["id"]
791
792
        data["notice_description"] = required_key(notice_contents, "description")
793
        del notice_contents["description"]
794
795
        data["version"] = str(data["version"])
796
797
        return data
798
799 2
    def represent_as_dict(self):
800
        data = super(Benchmark, cls).represent_as_dict()
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable cls does not seem to be defined.
Loading history...
801
        data["rear-matter"] = data["rear_matter"]
802
        del data["rear_matter"]
803
804
        data["front-matter"] = data["front_matter"]
805
        del data["front_matter"]
806
        return data
807
808 2
    @classmethod
809 2
    def from_yaml(cls, yaml_file, env_yaml=None, benchmark_id="product-name"):
810
        benchmark = super(Benchmark, cls).from_yaml(yaml_file, env_yaml)
811
        if env_yaml:
812
            benchmark.product_cpe_names = env_yaml["product_cpes"].get_product_cpe_names()
813
            benchmark.cpe_platform_spec = env_yaml["product_cpes"].cpe_platform_specification
814
815
        benchmark.id_ = benchmark_id
816
817
        return benchmark
818
819 2
    def add_profiles_from_dir(self, dir_, env_yaml):
820
        for dir_item in sorted(os.listdir(dir_)):
821
            dir_item_path = os.path.join(dir_, dir_item)
822
            if not os.path.isfile(dir_item_path):
823
                continue
824
825
            _, ext = os.path.splitext(os.path.basename(dir_item_path))
826
            if ext != '.profile':
827
                sys.stderr.write(
828
                    "Encountered file '%s' while looking for profiles, "
829
                    "extension '%s' is unknown. Skipping..\n"
830
                    % (dir_item, ext)
831
                )
832
                continue
833
834
            try:
835
                new_profile = ProfileWithInlinePolicies.from_yaml(dir_item_path, env_yaml)
836
            except DocumentationNotComplete:
837
                continue
838
            except Exception as exc:
839
                msg = ("Error building profile from '{fname}': '{error}'"
840
                       .format(fname=dir_item_path, error=str(exc)))
841
                raise RuntimeError(msg)
842
            if new_profile is None:
843
                continue
844
845
            self.profiles.append(new_profile)
846
847 2
    def to_xml_element(self):
848
        root = ET.Element('Benchmark')
849
        root.set('xmlns:xsi', 'http://www.w3.org/2001/XMLSchema-instance')
850
        root.set('xmlns:xhtml', 'http://www.w3.org/1999/xhtml')
851
        root.set('xmlns:dc', 'http://purl.org/dc/elements/1.1/')
852
        root.set('xmlns:cpe-lang', 'http://cpe.mitre.org/language/2.0')
853
        root.set('id', 'product-name')
854
        root.set('xsi:schemaLocation',
855
                 'http://checklists.nist.gov/xccdf/1.1 xccdf-1.1.4.xsd')
856
        root.set('style', 'SCAP_1.1')
857
        root.set('resolved', 'false')
858
        root.set('xml:lang', 'en-US')
859
        status = ET.SubElement(root, 'status')
860
        status.set('date', datetime.date.today().strftime("%Y-%m-%d"))
861
        status.text = self.status
862
        add_sub_element(root, "title", self.title)
863
        add_sub_element(root, "description", self.description)
864
        notice = add_sub_element(root, "notice", self.notice_description)
865
        notice.set('id', self.notice_id)
866
        add_sub_element(root, "front-matter", self.front_matter)
867
        add_sub_element(root, "rear-matter", self.rear_matter)
868
        # if there are no platforms, do not output platform-specification at all
869
        if len(self.cpe_platform_spec.platforms) > 0:
870
            root.append(self.cpe_platform_spec.to_xml_element())
871
872
        # The Benchmark applicability is determined by the CPEs
873
        # defined in the product.yml
874
        for cpe_name in self.product_cpe_names:
875
            plat = ET.SubElement(root, "platform")
876
            plat.set("idref", cpe_name)
877
878
        version = ET.SubElement(root, 'version')
879
        version.text = self.version
880
        ET.SubElement(root, "metadata")
881
882
        for profile in self.profiles:
883
            root.append(profile.to_xml_element())
884
885
        for value in self.values.values():
886
            root.append(value.to_xml_element())
887
888
        groups_in_bench = list(self.groups.keys())
889
        priority_order = ["system", "services"]
890
        groups_in_bench = reorder_according_to_ordering(groups_in_bench, priority_order)
891
892
        # Make system group the first, followed by services group
893
        for group_id in groups_in_bench:
894
            group = self.groups.get(group_id)
895
            # Products using application benchmark don't have system or services group
896
            if group is not None:
897
                root.append(group.to_xml_element())
898
899
        for rule in self.rules.values():
900
            root.append(rule.to_xml_element())
901
902
        return root
903
904 2
    def to_file(self, file_name, ):
905
        root = self.to_xml_element()
906
        tree = ET.ElementTree(root)
907
        tree.write(file_name)
908
909 2
    def add_value(self, value):
910
        if value is None:
911
            return
912
        self.values[value.id_] = value
913
914
    # The benchmark is also considered a group, so this function signature needs to match
915
    # Group()'s add_group()
916 2
    def add_group(self, group, env_yaml=None):
917
        if group is None:
918
            return
919
        self.groups[group.id_] = group
920
921 2
    def add_rule(self, rule):
922
        if rule is None:
923
            return
924
        self.rules[rule.id_] = rule
925
926 2
    def to_xccdf(self):
927
        """We can easily extend this script to generate a valid XCCDF instead
928
        of SSG SHORTHAND.
929
        """
930
        raise NotImplementedError
931
932 2
    def __str__(self):
933
        return self.id_
934
935
936 2
class Group(XCCDFEntity):
937
    """Represents XCCDF Group
938
    """
939 2
    ATTRIBUTES_TO_PASS_ON = (
940
        "platforms",
941
        "cpe_platform_names",
942
    )
943
944 2
    GENERIC_FILENAME = "group.yml"
945
946 2
    KEYS = dict(
947
        prodtype=lambda: "all",
948
        title=lambda: "",
949
        description=lambda: "",
950
        warnings=lambda: list(),
951
        requires=lambda: list(),
952
        conflicts=lambda: list(),
953
        values=lambda: dict(),
954
        groups=lambda: dict(),
955
        rules=lambda: dict(),
956
        platform=lambda: "",
957
        platforms=lambda: set(),
958
        cpe_platform_names=lambda: set(),
959
        ** XCCDFEntity.KEYS
960
    )
961
962 2
    MANDATORY_KEYS = {
963
        "title",
964
        "status",
965
        "description",
966
        "front_matter",
967
        "rear_matter",
968
        "version",
969
    }
970
971 2
    @classmethod
972
    def process_input_dict(cls, input_contents, env_yaml):
973
        data = super(Group, cls).process_input_dict(input_contents, env_yaml)
974
        if data["rules"]:
975
            rule_ids = data["rules"]
976
            data["rules"] = {rid: None for rid in rule_ids}
977
978
        if data["groups"]:
979
            group_ids = data["groups"]
980
            data["groups"] = {gid: None for gid in group_ids}
981
982
        if data["values"]:
983
            value_ids = data["values"]
984
            data["values"] = {vid: None for vid in value_ids}
985
986
        if data["platform"]:
987
            data["platforms"].add(data["platform"])
988
989
        # parse platform definition and get CPEAL platform
990
        if data["platforms"]:
991
            for platform in data["platforms"]:
992
                cpe_platform = parse_platform_definition(platform, env_yaml["product_cpes"])
993
                data["cpe_platform_names"].add(cpe_platform.id)
994
                # add platform to platform specification
995
                env_yaml["product_cpes"].cpe_platform_specification.add_platform(cpe_platform)
996
        return data
997
998 2 View Code Duplication
    def load_entities(self, rules_by_id, values_by_id, groups_by_id):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
999
        for rid, val in self.rules.items():
1000
            if not val:
1001
                self.rules[rid] = rules_by_id[rid]
1002
1003
        for vid, val in self.values.items():
1004
            if not val:
1005
                self.values[vid] = values_by_id[vid]
1006
1007
        for gid, val in self.groups.items():
1008
            if not val:
1009
                self.groups[gid] = groups_by_id[gid]
1010
1011 2
    def represent_as_dict(self):
1012
        yaml_contents = super(Group, self).represent_as_dict()
1013
1014
        if self.rules:
1015
            yaml_contents["rules"] = sorted(list(self.rules.keys()))
1016
        if self.groups:
1017
            yaml_contents["groups"] = sorted(list(self.groups.keys()))
1018
        if self.values:
1019
            yaml_contents["values"] = sorted(list(self.values.keys()))
1020
1021
        return yaml_contents
1022
1023 2
    def validate_prodtype(self, yaml_file):
1024
        for ptype in self.prodtype.split(","):
1025
            if ptype.strip() != ptype:
1026
                msg = (
1027
                    "Comma-separated '{prodtype}' prodtype "
1028
                    "in {yaml_file} contains whitespace."
1029
                    .format(prodtype=self.prodtype, yaml_file=yaml_file))
1030
                raise ValueError(msg)
1031
1032 2
    def to_xml_element(self):
1033
        group = ET.Element('Group')
1034
        group.set('id', self.id_)
1035
        if self.prodtype != "all":
1036
            group.set("prodtype", self.prodtype)
1037
        title = ET.SubElement(group, 'title')
1038
        title.text = self.title
1039
        add_sub_element(group, 'description', self.description)
1040
        add_warning_elements(group, self.warnings)
1041
        add_nondata_subelements(group, "requires", "id", self.requires)
1042
        add_nondata_subelements(group, "conflicts", "id", self.conflicts)
1043
1044
        for cpe_platform_name in self.cpe_platform_names:
1045
            platform_el = ET.SubElement(group, "platform")
1046
            platform_el.set("idref", "#"+cpe_platform_name)
1047
1048
        for _value in self.values.values():
1049
            group.append(_value.to_xml_element())
1050
1051
        # Rules that install or remove packages affect remediation
1052
        # of other rules.
1053
        # When packages installed/removed rules come first:
1054
        # The Rules are ordered in more logical way, and
1055
        # remediation order is natural, first the package is installed, then configured.
1056
        rules_in_group = list(self.rules.keys())
1057
        regex = (r'(package_.*_(installed|removed))|' +
1058
                 r'(service_.*_(enabled|disabled))|' +
1059
                 r'install_smartcard_packages$')
1060
        priority_order = ["installed", "install_smartcard_packages", "removed",
1061
                          "enabled", "disabled"]
1062
        rules_in_group = reorder_according_to_ordering(rules_in_group, priority_order, regex)
1063
1064
        # Add rules in priority order, first all packages installed, then removed,
1065
        # followed by services enabled, then disabled
1066
        for rule_id in rules_in_group:
1067
            group.append(self.rules.get(rule_id).to_xml_element())
1068
1069
        # Add the sub groups after any current level group rules.
1070
        # As package installed/removed and service enabled/disabled rules are usuallly in
1071
        # top level group, this ensures groups that further configure a package or service
1072
        # are after rules that install or remove it.
1073
        groups_in_group = list(self.groups.keys())
1074
        priority_order = [
1075
            # Make sure rpm_verify_(hashes|permissions|ownership) are run before any other rule.
1076
            # Due to conflicts between rules rpm_verify_* rules and any rule that configures
1077
            # stricter settings, like file_permissions_grub2_cfg and sudo_dedicated_group,
1078
            # the rules deviating from the system default should be evaluated later.
1079
            # So that in the end the system has contents, permissions and ownership reset, and
1080
            # any deviations or stricter settings are applied by the rules in the profile.
1081
            "software", "integrity", "integrity-software", "rpm_verification",
1082
1083
            # The account group has to precede audit group because
1084
            # the rule package_screen_installed is desired to be executed before the rule
1085
            # audit_rules_privileged_commands, othervise the rule
1086
            # does not catch newly installed screen binary during remediation
1087
            # and report fail
1088
            "accounts", "auditing",
1089
1090
1091
            # The FIPS group should come before Crypto,
1092
            # if we want to set a different (stricter) Crypto Policy than FIPS.
1093
            "fips", "crypto",
1094
1095
            # The firewalld_activation must come before ruleset_modifications, othervise
1096
            # remediations for ruleset_modifications won't work
1097
            "firewalld_activation", "ruleset_modifications",
1098
1099
            # Rules from group disabling_ipv6 must precede rules from configuring_ipv6,
1100
            # otherwise the remediation prints error although it is successful
1101
            "disabling_ipv6", "configuring_ipv6"
1102
        ]
1103
        groups_in_group = reorder_according_to_ordering(groups_in_group, priority_order)
1104
        for group_id in groups_in_group:
1105
            _group = self.groups[group_id]
1106
            group.append(_group.to_xml_element())
1107
1108
        return group
1109
1110 2
    def to_file(self, file_name):
1111
        root = self.to_xml_element()
1112
        tree = ET.ElementTree(root)
1113
        tree.write(file_name)
1114
1115 2
    def add_value(self, value):
1116
        if value is None:
1117
            return
1118
        self.values[value.id_] = value
1119
1120 2
    def add_group(self, group, env_yaml=None):
1121
        if group is None:
1122
            return
1123
        if self.platforms and not group.platforms:
1124
            group.platforms = self.platforms
1125
        self.groups[group.id_] = group
1126
        self._pass_our_properties_on_to(group)
1127
1128
        # Once the group has inherited properties, update cpe_names
1129
        if env_yaml:
1130
            for platform in group.platforms:
1131
                cpe_platform = parse_platform_definition(
1132
                    platform, env_yaml["product_cpes"])
1133
                group.cpe_platform_names.add(cpe_platform.id)
1134
                env_yaml["product_cpes"].cpe_platform_specification.add_platform(
1135
                    cpe_platform)
1136
1137 2
    def _pass_our_properties_on_to(self, obj):
1138
        for attr in self.ATTRIBUTES_TO_PASS_ON:
1139
            if hasattr(obj, attr) and getattr(obj, attr) is None:
1140
                setattr(obj, attr, getattr(self, attr))
1141
1142 2
    def add_rule(self, rule, env_yaml=None):
1143
        if rule is None:
1144
            return
1145
        if self.platforms and not rule.platforms:
1146
            rule.platforms = self.platforms
1147
        self.rules[rule.id_] = rule
1148
        self._pass_our_properties_on_to(rule)
1149
1150
        # Once the rule has inherited properties, update cpe_platform_names
1151
        if env_yaml:
1152
            for platform in rule.platforms:
1153
                cpe_platform = parse_platform_definition(
1154
                    platform, env_yaml["product_cpes"])
1155
                rule.cpe_platform_names.add(cpe_platform.id)
1156
                env_yaml["product_cpes"].cpe_platform_specification.add_platform(
1157
                    cpe_platform)
1158
1159 2
    def __str__(self):
1160
        return self.id_
1161
1162
1163 2
def noop_rule_filterfunc(rule):
1164
    return True
1165
1166 2
def rule_filter_from_def(filterdef):
1167
    if filterdef is None or filterdef == "":
1168
        return noop_rule_filterfunc
1169
1170
    def filterfunc(rule):
1171
        # Remove globals for security and only expose
1172
        # variables relevant to the rule
1173
        return eval(filterdef, {"__builtins__": None}, rule.__dict__)
1174
    return filterfunc
1175
1176
1177 2
class Rule(XCCDFEntity):
1178
    """Represents XCCDF Rule
1179
    """
1180 2
    KEYS = dict(
1181
        prodtype=lambda: "all",
1182
        title=lambda: "",
1183
        description=lambda: "",
1184
        rationale=lambda: "",
1185
        severity=lambda: "",
1186
        references=lambda: dict(),
1187
        identifiers=lambda: dict(),
1188
        ocil_clause=lambda: None,
1189
        ocil=lambda: None,
1190
        oval_external_content=lambda: None,
1191
        warnings=lambda: list(),
1192
        conflicts=lambda: list(),
1193
        requires=lambda: list(),
1194
        platform=lambda: None,
1195
        platforms=lambda: set(),
1196
        inherited_platforms=lambda: list(),
1197
        template=lambda: None,
1198
        cpe_platform_names=lambda: set(),
1199
        ** XCCDFEntity.KEYS
1200
    )
1201
1202 2
    MANDATORY_KEYS = {
1203
        "title",
1204
        "description",
1205
        "rationale",
1206
        "severity",
1207
    }
1208
1209 2
    GENERIC_FILENAME = "rule.yml"
1210 2
    ID_LABEL = "rule_id"
1211
1212 2
    PRODUCT_REFERENCES = ("stigid", "cis",)
1213 2
    GLOBAL_REFERENCES = ("srg", "vmmsrg", "disa", "cis-csc",)
1214
1215 2
    def __init__(self, id_):
1216 2
        super(Rule, self).__init__(id_)
1217 2
        self.sce_metadata = None
1218
1219 2
    def __deepcopy__(self, memo):
1220
        cls = self.__class__
1221
        result = cls.__new__(cls)
1222
        memo[id(self)] = result
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable id does not seem to be defined.
Loading history...
1223
        for k, v in self.__dict__.items():
1224
            # These are difficult to deep copy, so let's just re-use them.
1225
            if k != "template" and k != "local_env_yaml":
1226
                setattr(result, k, deepcopy(v, memo))
1227
            else:
1228
                setattr(result, k, v)
1229
        return result
1230
1231 2
    @classmethod
1232 2
    def from_yaml(cls, yaml_file, env_yaml=None, sce_metadata=None):
1233 2
        rule = super(Rule, cls).from_yaml(yaml_file, env_yaml)
1234
1235
        # platforms are read as list from the yaml file
1236
        # we need them to convert to set again
1237 2
        rule.platforms = set(rule.platforms)
1238
1239
        # rule.platforms.update(set(rule.inherited_platforms))
1240
1241 2
        check_warnings(rule)
1242
1243
        # ensure that content of rule.platform is in rule.platforms as
1244
        # well
1245 2
        if rule.platform is not None:
1246 2
            rule.platforms.add(rule.platform)
1247
1248
        # Convert the platform names to CPE names
1249
        # But only do it if an env_yaml was specified (otherwise there would be no product CPEs
1250
        # to lookup), and the rule's prodtype matches the product being built
1251 2
        if (
1252
                env_yaml and env_yaml["product"] in parse_prodtype(rule.prodtype)
1253
                or env_yaml and rule.prodtype == "all"):
1254
            # parse platform definition and get CPEAL platform
1255
            for platform in rule.platforms:
1256
                cpe_platform = parse_platform_definition(
1257
                    platform, env_yaml["product_cpes"])
1258
                rule.cpe_platform_names.add(cpe_platform.id)
1259
                # add platform to platform specification
1260
                env_yaml["product_cpes"].cpe_platform_specification.add_platform(
1261
                    cpe_platform)
1262
1263
1264 2
        if sce_metadata and rule.id_ in sce_metadata:
1265
            rule.sce_metadata = sce_metadata[rule.id_]
1266
            rule.sce_metadata["relative_path"] = os.path.join(
1267
                env_yaml["product"], "checks/sce", rule.sce_metadata['filename'])
1268
1269 2
        rule.validate_prodtype(yaml_file)
1270 2
        rule.validate_identifiers(yaml_file)
1271 2
        rule.validate_references(yaml_file)
1272 2
        return rule
1273
1274 2
    def _verify_stigid_format(self, product):
1275 2
        stig_id = self.references.get("stigid", None)
1276 2
        if not stig_id:
1277 2
            return
1278 2
        if "," in stig_id:
1279 2
            raise ValueError("Rules can not have multiple STIG IDs.")
1280
1281 2
    def _verify_disa_cci_format(self):
1282 2
        cci_id = self.references.get("disa", None)
1283 2
        if not cci_id:
1284 2
            return
1285
        cci_ex = re.compile(r'^CCI-[0-9]{6}$')
1286
        for cci in cci_id.split(","):
1287
            if not cci_ex.match(cci):
1288
                raise ValueError("CCI '{}' is in the wrong format! "
1289
                                 "Format should be similar to: "
1290
                                 "CCI-XXXXXX".format(cci))
1291
        self.references["disa"] = cci_id
1292
1293 2
    def normalize(self, product):
1294 2
        try:
1295 2
            self.make_refs_and_identifiers_product_specific(product)
1296 2
            self.make_template_product_specific(product)
1297 2
        except Exception as exc:
1298 2
            msg = (
1299
                "Error normalizing '{rule}': {msg}"
1300
                .format(rule=self.id_, msg=str(exc))
1301
            )
1302 2
            raise RuntimeError(msg)
1303
1304 2
    def _get_product_only_references(self):
1305 2
        product_references = dict()
1306
1307 2
        for ref in Rule.PRODUCT_REFERENCES:
1308 2
            start = "{0}@".format(ref)
1309 2
            for gref, gval in self.references.items():
1310 2
                if ref == gref or gref.startswith(start):
1311 2
                    product_references[gref] = gval
1312 2
        return product_references
1313
1314 2
    def make_template_product_specific(self, product):
1315 2
        product_suffix = "@{0}".format(product)
1316
1317 2
        if not self.template:
1318
            return
1319
1320 2
        not_specific_vars = self.template.get("vars", dict())
1321 2
        specific_vars = self._make_items_product_specific(
1322
            not_specific_vars, product_suffix, True)
1323 2
        self.template["vars"] = specific_vars
1324
1325 2
        not_specific_backends = self.template.get("backends", dict())
1326 2
        specific_backends = self._make_items_product_specific(
1327
            not_specific_backends, product_suffix, True)
1328 2
        self.template["backends"] = specific_backends
1329
1330 2
    def make_refs_and_identifiers_product_specific(self, product):
1331 2
        product_suffix = "@{0}".format(product)
1332
1333 2
        product_references = self._get_product_only_references()
1334 2
        general_references = self.references.copy()
1335 2
        for todel in product_references:
1336 2
            general_references.pop(todel)
1337 2
        for ref in Rule.PRODUCT_REFERENCES:
1338 2
            if ref in general_references:
1339
                msg = "Unexpected reference identifier ({0}) without "
1340
                msg += "product qualifier ({0}@{1}) while building rule "
1341
                msg += "{2}"
1342
                msg = msg.format(ref, product, self.id_)
1343
                raise ValueError(msg)
1344
1345 2
        to_set = dict(
1346
            identifiers=(self.identifiers, False),
1347
            general_references=(general_references, True),
1348
            product_references=(product_references, False),
1349
        )
1350 2
        for name, (dic, allow_overwrites) in to_set.items():
1351 2
            try:
1352 2
                new_items = self._make_items_product_specific(
1353
                    dic, product_suffix, allow_overwrites)
1354 2
            except ValueError as exc:
1355 2
                msg = (
1356
                    "Error processing {what} for rule '{rid}': {msg}"
1357
                    .format(what=name, rid=self.id_, msg=str(exc))
1358
                )
1359 2
                raise ValueError(msg)
1360 2
            dic.clear()
1361 2
            dic.update(new_items)
1362
1363 2
        self.references = general_references
1364 2
        self._verify_disa_cci_format()
1365 2
        self.references.update(product_references)
1366
1367 2
        self._verify_stigid_format(product)
1368
1369 2
    def _make_items_product_specific(self, items_dict, product_suffix, allow_overwrites=False):
1370 2
        new_items = dict()
1371 2
        for full_label, value in items_dict.items():
1372 2
            if "@" not in full_label and full_label not in new_items:
1373 2
                new_items[full_label] = value
1374 2
                continue
1375
1376 2
            label = full_label.split("@")[0]
1377
1378
            # this test should occur before matching product_suffix with the product qualifier
1379
            # present in the reference, so it catches problems even for products that are not
1380
            # being built at the moment
1381 2
            if label in Rule.GLOBAL_REFERENCES:
1382
                msg = (
1383
                    "You cannot use product-qualified for the '{item_u}' reference. "
1384
                    "Please remove the product-qualifier and merge values with the "
1385
                    "existing reference if there is any. Original line: {item_q}: {value_q}"
1386
                    .format(item_u=label, item_q=full_label, value_q=value)
1387
                )
1388
                raise ValueError(msg)
1389
1390 2
            if not full_label.endswith(product_suffix):
1391 2
                continue
1392
1393 2
            if label in items_dict and not allow_overwrites and value != items_dict[label]:
1394 2
                msg = (
1395
                    "There is a product-qualified '{item_q}' item, "
1396
                    "but also an unqualified '{item_u}' item "
1397
                    "and those two differ in value - "
1398
                    "'{value_q}' vs '{value_u}' respectively."
1399
                    .format(item_q=full_label, item_u=label,
1400
                            value_q=value, value_u=items_dict[label])
1401
                )
1402 2
                raise ValueError(msg)
1403 2
            new_items[label] = value
1404 2
        return new_items
1405
1406 2
    def validate_identifiers(self, yaml_file):
1407 2
        if self.identifiers is None:
1408
            raise ValueError("Empty identifier section in file %s" % yaml_file)
1409
1410
        # Validate all identifiers are non-empty:
1411 2
        for ident_type, ident_val in self.identifiers.items():
1412 2
            if not isinstance(ident_type, str) or not isinstance(ident_val, str):
1413
                raise ValueError("Identifiers and values must be strings: %s in file %s"
1414
                                 % (ident_type, yaml_file))
1415 2
            if ident_val.strip() == "":
1416
                raise ValueError("Identifiers must not be empty: %s in file %s"
1417
                                 % (ident_type, yaml_file))
1418 2
            if ident_type[0:3] == 'cce':
1419 2
                if not is_cce_format_valid(ident_val):
1420
                    raise ValueError("CCE Identifier format must be valid: invalid format '%s' for CEE '%s'"
1421
                                     " in file '%s'" % (ident_val, ident_type, yaml_file))
1422 2
                if not is_cce_value_valid("CCE-" + ident_val):
1423
                    raise ValueError("CCE Identifier value is not a valid checksum: invalid value '%s' for CEE '%s'"
1424
                                     " in file '%s'" % (ident_val, ident_type, yaml_file))
1425
1426 2
    def validate_references(self, yaml_file):
1427 2
        if self.references is None:
1428
            raise ValueError("Empty references section in file %s" % yaml_file)
1429
1430 2
        for ref_type, ref_val in self.references.items():
1431 2
            if not isinstance(ref_type, str) or not isinstance(ref_val, str):
1432
                raise ValueError("References and values must be strings: %s in file %s"
1433
                                 % (ref_type, yaml_file))
1434 2
            if ref_val.strip() == "":
1435
                raise ValueError("References must not be empty: %s in file %s"
1436
                                 % (ref_type, yaml_file))
1437
1438 2
        for ref_type, ref_val in self.references.items():
1439 2
            for ref in ref_val.split(","):
1440 2
                if ref.strip() != ref:
1441
                    msg = (
1442
                        "Comma-separated '{ref_type}' reference "
1443
                        "in {yaml_file} contains whitespace."
1444
                        .format(ref_type=ref_type, yaml_file=yaml_file))
1445
                    raise ValueError(msg)
1446
1447 2
    def validate_prodtype(self, yaml_file):
1448 2
        for ptype in self.prodtype.split(","):
1449 2
            if ptype.strip() != ptype:
1450
                msg = (
1451
                    "Comma-separated '{prodtype}' prodtype "
1452
                    "in {yaml_file} contains whitespace."
1453
                    .format(prodtype=self.prodtype, yaml_file=yaml_file))
1454
                raise ValueError(msg)
1455
1456 2
    def to_xml_element(self):
1457
        rule = ET.Element('Rule')
1458
        rule.set('id', self.id_)
1459
        if self.prodtype != "all":
1460
            rule.set("prodtype", self.prodtype)
1461
        rule.set('severity', self.severity)
1462
        add_sub_element(rule, 'title', self.title)
1463
        add_sub_element(rule, 'description', self.description)
1464
        add_sub_element(rule, 'rationale', self.rationale)
1465
1466
        main_ident = ET.Element('ident')
1467
        for ident_type, ident_val in self.identifiers.items():
1468
            # This is not true if items were normalized
1469
            if '@' in ident_type:
1470
                # the ident is applicable only on some product
1471
                # format : 'policy@product', eg. 'stigid@product'
1472
                # for them, we create a separate <ref> element
1473
                policy, product = ident_type.split('@')
1474
                ident = ET.SubElement(rule, 'ident')
1475
                ident.set(policy, ident_val)
1476
                ident.set('prodtype', product)
1477
            else:
1478
                main_ident.set(ident_type, ident_val)
1479
1480
        if main_ident.attrib:
1481
            rule.append(main_ident)
1482
1483
        main_ref = ET.Element('ref')
1484
        for ref_type, ref_val in self.references.items():
1485
            # This is not true if items were normalized
1486
            if '@' in ref_type:
1487
                # the reference is applicable only on some product
1488
                # format : 'policy@product', eg. 'stigid@product'
1489
                # for them, we create a separate <ref> element
1490
                policy, product = ref_type.split('@')
1491
                ref = ET.SubElement(rule, 'ref')
1492
                ref.set(policy, ref_val)
1493
                ref.set('prodtype', product)
1494
            else:
1495
                main_ref.set(ref_type, ref_val)
1496
1497
        if main_ref.attrib:
1498
            rule.append(main_ref)
1499
1500
        ocil_parent = rule
1501
        check_parent = rule
1502
1503
        if self.sce_metadata:
1504
            # TODO: This is pretty much another hack, just like the previous OVAL
1505
            # one. However, we avoided the external SCE content as I'm not sure it
1506
            # is generally useful (unlike say, CVE checking with external OVAL)
1507
            #
1508
            # Additionally, we build the content (check subelement) here rather
1509
            # than in xslt due to the nature of our SCE metadata.
1510
            #
1511
            # Finally, before we begin, we might have an element with both SCE
1512
            # and OVAL. We have no way of knowing (right here) whether that is
1513
            # the case (due to a variety of issues, most notably, that linking
1514
            # hasn't yet occurred). So we must rely on the content author's
1515
            # good will, by annotating SCE content with a complex-check tag
1516
            # if necessary.
1517
1518
            if 'complex-check' in self.sce_metadata:
1519
                # Here we have an issue: XCCDF allows EITHER one or more check
1520
                # elements OR a single complex-check. While we have an explicit
1521
                # case handling the OVAL-and-SCE interaction, OCIL entries have
1522
                # (historically) been alongside OVAL content and been in an
1523
                # "OR" manner -- preferring OVAL to SCE. In order to accomplish
1524
                # this, we thus need to add _yet another parent_ when OCIL data
1525
                # is present, and add update ocil_parent accordingly.
1526
                if self.ocil or self.ocil_clause:
1527
                    ocil_parent = ET.SubElement(ocil_parent, "complex-check")
1528
                    ocil_parent.set('operator', 'OR')
1529
1530
                check_parent = ET.SubElement(ocil_parent, "complex-check")
1531
                check_parent.set('operator', self.sce_metadata['complex-check'])
1532
1533
            # Now, add the SCE check element to the tree.
1534
            check = ET.SubElement(check_parent, "check")
1535
            check.set("system", SCE_SYSTEM)
1536
1537
            if 'check-import' in self.sce_metadata:
1538
                if isinstance(self.sce_metadata['check-import'], str):
1539
                    self.sce_metadata['check-import'] = [self.sce_metadata['check-import']]
1540
                for entry in self.sce_metadata['check-import']:
1541
                    check_import = ET.SubElement(check, 'check-import')
1542
                    check_import.set('import-name', entry)
1543
                    check_import.text = None
1544
1545
            if 'check-export' in self.sce_metadata:
1546
                if isinstance(self.sce_metadata['check-export'], str):
1547
                    self.sce_metadata['check-export'] = [self.sce_metadata['check-export']]
1548
                for entry in self.sce_metadata['check-export']:
1549
                    export, value = entry.split('=')
1550
                    check_export = ET.SubElement(check, 'check-export')
1551
                    check_export.set('value-id', value)
1552
                    check_export.set('export-name', export)
1553
                    check_export.text = None
1554
1555
            check_ref = ET.SubElement(check, "check-content-ref")
1556
            href = self.sce_metadata['relative_path']
1557
            check_ref.set("href", href)
1558
1559
        if self.oval_external_content:
1560
            check = ET.SubElement(check_parent, 'check')
1561
            check.set("system", "http://oval.mitre.org/XMLSchema/oval-definitions-5")
1562
            external_content = ET.SubElement(check, "check-content-ref")
1563
            external_content.set("href", self.oval_external_content)
1564
        else:
1565
            # TODO: This is pretty much a hack, oval ID will be the same as rule ID
1566
            #       and we don't want the developers to have to keep them in sync.
1567
            #       Therefore let's just add an OVAL ref of that ID.
1568
            oval_ref = ET.SubElement(check_parent, "oval")
1569
            oval_ref.set("id", self.id_)
1570
1571
        if self.ocil or self.ocil_clause:
1572
            ocil_check = ET.SubElement(check_parent, "check")
1573
            ocil_check.set("system", ocil_cs)
1574
            ocil_check_ref = ET.SubElement(ocil_check, "check-content-ref")
1575
            ocil_check_ref.set("href", "ocil-unlinked.xml")
1576
            ocil_check_ref.set("name", self.id_ + "_ocil")
1577
1578
        add_warning_elements(rule, self.warnings)
1579
        add_nondata_subelements(rule, "requires", "id", self.requires)
1580
        add_nondata_subelements(rule, "conflicts", "id", self.conflicts)
1581
1582
        for cpe_platform_name in self.cpe_platform_names:
1583
            platform_el = ET.SubElement(rule, "platform")
1584
            platform_el.set("idref", "#"+cpe_platform_name)
1585
1586
        return rule
1587
1588 2
    def to_file(self, file_name):
1589
        root = self.to_xml_element()
1590
        tree = ET.ElementTree(root)
1591
        tree.write(file_name)
1592
1593 2
    def to_ocil(self):
1594
        if not self.ocil and not self.ocil_clause:
1595
            raise ValueError("Rule {0} doesn't have OCIL".format(self.id_))
1596
        # Create <questionnaire> for the rule
1597
        questionnaire = ET.Element("questionnaire", id=self.id_ + "_ocil")
1598
        title = ET.SubElement(questionnaire, "title")
1599
        title.text = self.title
1600
        actions = ET.SubElement(questionnaire, "actions")
1601
        test_action_ref = ET.SubElement(actions, "test_action_ref")
1602
        test_action_ref.text = self.id_ + "_action"
1603
        # Create <boolean_question_test_action> for the rule
1604
        action = ET.Element(
1605
            "boolean_question_test_action",
1606
            id=self.id_ + "_action",
1607
            question_ref=self.id_ + "_question")
1608
        when_true = ET.SubElement(action, "when_true")
1609
        result = ET.SubElement(when_true, "result")
1610
        result.text = "PASS"
1611
        when_true = ET.SubElement(action, "when_false")
1612
        result = ET.SubElement(when_true, "result")
1613
        result.text = "FAIL"
1614
        # Create <boolean_question>
1615
        boolean_question = ET.Element(
1616
            "boolean_question", id=self.id_ + "_question")
1617
        # TODO: The contents of <question_text> element used to be broken in
1618
        # the legacy XSLT implementation. The following code contains hacks
1619
        # to get the same results as in the legacy XSLT implementation.
1620
        # This enabled us a smooth transition to new OCIL generator
1621
        # without a need to mass-edit rule YAML files.
1622
        # We need to solve:
1623
        # TODO: using variables (aka XCCDF Values) in OCIL content
1624
        # TODO: using HTML formating tags eg. <pre> in OCIL content
1625
        #
1626
        # The "ocil" key in compiled rules contains HTML and XML elements
1627
        # but OCIL question texts shouldn't contain HTML or XML elements,
1628
        # therefore removing them.
1629
        if self.ocil is not None:
1630
            ocil_without_tags = re.sub(r"</?[^>]+>", "", self.ocil)
1631
        else:
1632
            ocil_without_tags = ""
1633
        # The "ocil" key in compiled rules contains XML entities which would
1634
        # be escaped by ET.Subelement() so we need to use add_sub_element()
1635
        # instead because we don't want to escape them.
1636
        question_text = add_sub_element(
1637
            boolean_question, "question_text", ocil_without_tags)
1638
        # The "ocil_clause" key in compiled rules also contains HTML and XML
1639
        # elements but unlike the "ocil" we want to escape the '<' and '>'
1640
        # characters.
1641
        # The empty ocil_clause causing broken question is in line with the
1642
        # legacy XSLT implementation.
1643
        ocil_clause = self.ocil_clause if self.ocil_clause else ""
1644
        question_text.text = (
1645
            "{0}\n      Is it the case that {1}?\n      ".format(
1646
                question_text.text if question_text.text is not None else "",
1647
                ocil_clause))
1648
        return (questionnaire, action, boolean_question)
1649
1650 2
    def __hash__(self):
1651
        """ Controls are meant to be unique, so using the
1652
        ID should suffice"""
1653
        return hash(self.id_)
1654
1655 2
    def __eq__(self, other):
1656
        return isinstance(other, self.__class__) and self.id_ == other.id_
1657
1658 2
    def __ne__(self, other):
1659
        return not self != other
1660
1661 2
    def __lt__(self, other):
1662
        return self.id_ < other.id_
1663
1664 2
    def __str__(self):
1665
        return self.id_
1666
1667
1668 2
class DirectoryLoader(object):
1669 2
    def __init__(self, profiles_dir, env_yaml):
1670
        self.benchmark_file = None
1671
        self.group_file = None
1672
        self.loaded_group = None
1673
        self.rule_files = []
1674
        self.value_files = []
1675
        self.subdirectories = []
1676
1677
        self.all_values = dict()
1678
        self.all_rules = dict()
1679
        self.all_groups = dict()
1680
1681
        self.profiles_dir = profiles_dir
1682
        self.env_yaml = env_yaml
1683
        self.product = env_yaml["product"]
1684
1685
        self.parent_group = None
1686
1687 2
    def _collect_items_to_load(self, guide_directory):
1688
        for dir_item in sorted(os.listdir(guide_directory)):
1689
            dir_item_path = os.path.join(guide_directory, dir_item)
1690
            _, extension = os.path.splitext(dir_item)
1691
1692
            if extension == '.var':
1693
                self.value_files.append(dir_item_path)
1694
            elif dir_item == "benchmark.yml":
1695
                if self.benchmark_file:
1696
                    raise ValueError("Multiple benchmarks in one directory")
1697
                self.benchmark_file = dir_item_path
1698
            elif dir_item == "group.yml":
1699
                if self.group_file:
1700
                    raise ValueError("Multiple groups in one directory")
1701
                self.group_file = dir_item_path
1702
            elif extension == '.rule':
1703
                self.rule_files.append(dir_item_path)
1704
            elif is_rule_dir(dir_item_path):
1705
                self.rule_files.append(get_rule_dir_yaml(dir_item_path))
1706
            elif dir_item != "tests":
1707
                if os.path.isdir(dir_item_path):
1708
                    self.subdirectories.append(dir_item_path)
1709
                else:
1710
                    sys.stderr.write(
1711
                        "Encountered file '%s' while recursing, extension '%s' "
1712
                        "is unknown. Skipping..\n"
1713
                        % (dir_item, extension)
1714
                    )
1715
1716 2
    def load_benchmark_or_group(self, guide_directory):
1717
        """
1718
        Loads a given benchmark or group from the specified benchmark_file or
1719
        group_file, in the context of guide_directory, profiles_dir and env_yaml.
1720
1721
        Returns the loaded group or benchmark.
1722
        """
1723
        group = None
1724
        if self.group_file and self.benchmark_file:
1725
            raise ValueError("A .benchmark file and a .group file were found in "
1726
                             "the same directory '%s'" % (guide_directory))
1727
1728
        # we treat benchmark as a special form of group in the following code
1729
        if self.benchmark_file:
1730
            group = Benchmark.from_yaml(
1731
                self.benchmark_file, self.env_yaml, 'product-name'
1732
            )
1733
            if self.profiles_dir:
1734
                group.add_profiles_from_dir(self.profiles_dir, self.env_yaml)
1735
1736
        if self.group_file:
1737
            group = Group.from_yaml(self.group_file, self.env_yaml)
1738
            self.all_groups[group.id_] = group
1739
1740
        return group
1741
1742 2
    def _load_group_process_and_recurse(self, guide_directory):
1743
        self.loaded_group = self.load_benchmark_or_group(guide_directory)
1744
1745
        if self.loaded_group:
1746
1747
            if self.parent_group:
1748
                self.parent_group.add_group(self.loaded_group, env_yaml=self.env_yaml)
1749
1750
            self._process_values()
1751
            self._recurse_into_subdirs()
1752
            self._process_rules()
1753
1754 2
    def process_directory_tree(self, start_dir, extra_group_dirs=None):
1755
        self._collect_items_to_load(start_dir)
1756
        if extra_group_dirs:
1757
            self.subdirectories += extra_group_dirs
1758
        self._load_group_process_and_recurse(start_dir)
1759
1760 2
    def process_directory_trees(self, directories):
1761
        start_dir = directories[0]
1762
        extra_group_dirs = directories[1:]
1763
        return self.process_directory_tree(start_dir, extra_group_dirs)
1764
1765 2
    def _recurse_into_subdirs(self):
1766
        for subdir in self.subdirectories:
1767
            loader = self._get_new_loader()
1768
            loader.parent_group = self.loaded_group
1769
            loader.process_directory_tree(subdir)
1770
            self.all_values.update(loader.all_values)
1771
            self.all_rules.update(loader.all_rules)
1772
            self.all_groups.update(loader.all_groups)
1773
1774 2
    def _get_new_loader(self):
1775
        raise NotImplementedError()
1776
1777 2
    def _process_values(self):
1778
        raise NotImplementedError()
1779
1780 2
    def _process_rules(self):
1781
        raise NotImplementedError()
1782
1783 2
    def save_all_entities(self, base_dir):
1784
        destdir = os.path.join(base_dir, "rules")
1785
        mkdir_p(destdir)
1786
        if self.all_rules:
1787
            self.save_entities(self.all_rules.values(), destdir)
1788
1789
        destdir = os.path.join(base_dir, "groups")
1790
        mkdir_p(destdir)
1791
        if self.all_groups:
1792
            self.save_entities(self.all_groups.values(), destdir)
1793
1794
        destdir = os.path.join(base_dir, "values")
1795
        mkdir_p(destdir)
1796
        if self.all_values:
1797
            self.save_entities(self.all_values.values(), destdir)
1798
1799 2
    def save_entities(self, entities, destdir):
1800
        if not entities:
1801
            return
1802
        for entity in entities:
1803
            basename = entity.id_ + ".yml"
1804
            dest_filename = os.path.join(destdir, basename)
1805
            entity.dump_yaml(dest_filename)
1806
1807
1808 2
class BuildLoader(DirectoryLoader):
1809 2
    def __init__(self, profiles_dir, env_yaml,
1810
                 sce_metadata_path=None):
1811
        super(BuildLoader, self).__init__(profiles_dir, env_yaml)
1812
1813
        self.sce_metadata = None
1814
        if sce_metadata_path and os.path.getsize(sce_metadata_path):
1815
            self.sce_metadata = json.load(open(sce_metadata_path, 'r'))
1816
1817 2
    def _process_values(self):
1818
        for value_yaml in self.value_files:
1819
            value = Value.from_yaml(value_yaml, self.env_yaml)
1820
            self.all_values[value.id_] = value
1821
            self.loaded_group.add_value(value)
1822
1823 2
    def _process_rules(self):
1824
        for rule_yaml in self.rule_files:
1825
            try:
1826
                rule = Rule.from_yaml(rule_yaml, self.env_yaml, self.sce_metadata)
1827
            except DocumentationNotComplete:
1828
                # Happens on non-debug build when a rule is "documentation-incomplete"
1829
                continue
1830
            prodtypes = parse_prodtype(rule.prodtype)
1831
            if "all" not in prodtypes and self.product not in prodtypes:
1832
                continue
1833
            self.all_rules[rule.id_] = rule
1834
            self.loaded_group.add_rule(rule, env_yaml=self.env_yaml)
1835
1836
            if self.loaded_group.platforms:
1837
                rule.inherited_platforms += self.loaded_group.platforms
1838
1839
            rule.normalize(self.env_yaml["product"])
1840
1841 2
    def _get_new_loader(self):
1842
        loader = BuildLoader(
1843
            self.profiles_dir, self.env_yaml)
1844
        # Do it this way so we only have to parse the SCE metadata once.
1845
        loader.sce_metadata = self.sce_metadata
1846
        return loader
1847
1848 2
    def export_group_to_file(self, filename):
1849
        return self.loaded_group.to_file(filename)
1850
1851
1852 2
class LinearLoader(object):
1853 2
    def __init__(self, env_yaml, resolved_path):
1854
        self.resolved_rules_dir = os.path.join(resolved_path, "rules")
1855
        self.rules = dict()
1856
1857
        self.resolved_profiles_dir = os.path.join(resolved_path, "profiles")
1858
        self.profiles = dict()
1859
1860
        self.resolved_groups_dir = os.path.join(resolved_path, "groups")
1861
        self.groups = dict()
1862
1863
        self.resolved_values_dir = os.path.join(resolved_path, "values")
1864
        self.values = dict()
1865
1866
        self.benchmark = None
1867
        self.env_yaml = env_yaml
1868
1869 2
    def find_first_groups_ids(self, start_dir):
1870
        group_files = glob.glob(os.path.join(start_dir, "*", "group.yml"))
1871
        group_ids = [fname.split(os.path.sep)[-2] for fname in group_files]
1872
        return group_ids
1873
1874 2
    def load_entities_by_id(self, filenames, destination, cls):
1875
        for fname in filenames:
1876
            entity = cls.from_yaml(fname, self.env_yaml)
1877
            destination[entity.id_] = entity
1878
1879 2
    def load_benchmark(self, directory):
1880
        self.benchmark = Benchmark.from_yaml(
1881
            os.path.join(directory, "benchmark.yml"), self.env_yaml, "product-name")
1882
1883
        self.benchmark.add_profiles_from_dir(self.resolved_profiles_dir, self.env_yaml)
1884
1885
        benchmark_first_groups = self.find_first_groups_ids(directory)
1886
        for gid in benchmark_first_groups:
1887
            self.benchmark.add_group(self.groups[gid], self.env_yaml)
1888
1889 2
    def load_compiled_content(self):
1890
        filenames = glob.glob(os.path.join(self.resolved_rules_dir, "*.yml"))
1891
        self.load_entities_by_id(filenames, self.rules, Rule)
1892
1893
        filenames = glob.glob(os.path.join(self.resolved_groups_dir, "*.yml"))
1894
        self.load_entities_by_id(filenames, self.groups, Group)
1895
1896
        filenames = glob.glob(os.path.join(self.resolved_profiles_dir, "*.yml"))
1897
        self.load_entities_by_id(filenames, self.profiles, Profile)
1898
1899
        filenames = glob.glob(os.path.join(self.resolved_values_dir, "*.yml"))
1900
        self.load_entities_by_id(filenames, self.values, Value)
1901
1902
        for g in self.groups.values():
1903
            g.load_entities(self.rules, self.values, self.groups)
1904
1905 2
    def export_benchmark_to_file(self, filename):
1906
        return self.benchmark.to_file(filename)
1907
1908 2
    def export_ocil_to_file(self, filename):
1909
        root = ET.Element('ocil')
1910
        root.set('xmlns:xsi', xsi_namespace)
1911
        root.set("xmlns", ocil_namespace)
1912
        root.set("xmlns:xhtml", xhtml_namespace)
1913
        tree = ET.ElementTree(root)
1914
        generator = ET.SubElement(root, "generator")
1915
        product_name = ET.SubElement(generator, "product_name")
1916
        product_name.text = "build_shorthand.py from SCAP Security Guide"
1917
        product_version = ET.SubElement(generator, "product_version")
1918
        product_version.text = "ssg: " + self.env_yaml["ssg_version_str"]
1919
        schema_version = ET.SubElement(generator, "schema_version")
1920
        schema_version.text = "2.0"
1921
        timestamp_el = ET.SubElement(generator, "timestamp")
1922
        timestamp_el.text = timestamp
1923
        questionnaires = ET.SubElement(root, "questionnaires")
1924
        test_actions = ET.SubElement(root, "test_actions")
1925
        questions = ET.SubElement(root, "questions")
1926
        for rule in self.rules.values():
1927
            if not rule.ocil and not rule.ocil_clause:
1928
                continue
1929
            questionnaire, action, boolean_question = rule.to_ocil()
1930
            questionnaires.append(questionnaire)
1931
            test_actions.append(action)
1932
            questions.append(boolean_question)
1933
        tree.write(filename)
1934