Test Failed
Push — master ( 0ed6cc...8a1c32 )
by Matěj
01:17 queued 18s
created

ssg.build_yaml.BuildLoader._get_new_loader()   A

Complexity

Conditions 1

Size

Total Lines 6
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.4218

Importance

Changes 0
Metric Value
cc 1
eloc 5
nop 1
dl 0
loc 6
ccs 1
cts 4
cp 0.25
crap 1.4218
rs 10
c 0
b 0
f 0
1 2
from __future__ import absolute_import
2 2
from __future__ import print_function
3
4 2
from collections import defaultdict
5 2
from copy import deepcopy
6 2
import datetime
7 2
import json
8 2
import os
9 2
import os.path
10 2
import re
11 2
import sys
12 2
from xml.sax.saxutils import escape
13
14 2
import yaml
15
16 2
from .build_cpe import CPEDoesNotExist
17 2
from .constants import XCCDF_REFINABLE_PROPERTIES, SCE_SYSTEM
18 2
from .rules import get_rule_dir_id, get_rule_dir_yaml, is_rule_dir
19 2
from .rule_yaml import parse_prodtype
20
21 2
from .cce import is_cce_format_valid, is_cce_value_valid
22 2
from .yaml import DocumentationNotComplete, open_and_expand, open_and_macro_expand
23 2
from .utils import required_key, mkdir_p
24
25 2
from .xml import ElementTree as ET
26 2
from .shims import unicode_func
27
28
29 2
def add_sub_element(parent, tag, data):
    """
    Creates a new child element under parent with tag tag, and sets
    data as the content under the tag. In particular, data is a string
    to be parsed as an XML tree, allowing sub-elements of children to be
    added.

    If data should not be parsed as an XML tree, either escape the contents
    before passing into this function, or use ElementTree.SubElement().

    Returns the newly created subelement of type tag.

    Raises RuntimeError if the wrapped data cannot be turned into an element.
    """
    # This is used because our YAML data contain XML and XHTML elements
    # ET.SubElement() escapes the < > characters by &lt; and &gt;
    # and therefore it does not add child elements
    # we need to do a hack instead
    # TODO: Remove this function after we move to Markdown everywhere in SSG
    ustr = unicode_func("<{0}>{1}</{0}>").format(tag, data)

    try:
        element = ET.fromstring(ustr.encode("utf-8"))
    except Exception:
        # Report the offending markup together with the parent it was meant for.
        msg = ("Error adding subelement to an element '{0}' from string: '{1}'"
               .format(parent.tag, ustr))
        raise RuntimeError(msg)

    parent.append(element)
    return element
57
58
59 2
def reorder_according_to_ordering(unordered, ordering, regex=None):
    """
    Return the items of unordered as a list, prioritized items first.

    Items that match regex at their start are candidates for prioritization.
    When regex is None, it is built as an alternation of the entries of
    ordering. For each entry of ordering (in order), the candidates that
    contain that entry as a substring are emitted in their original relative
    order. All remaining items follow, sorted. Duplicates are emitted once.
    """
    if regex is None:
        regex = "|".join("({0})".format(item) for item in ordering)
    pattern = re.compile(regex)

    candidates = [item for item in unordered if pattern.match(item)]
    remaining = set(unordered)

    ordered = []
    for priority_type in ordering:
        for item in candidates:
            # "item in remaining" guards against emitting an item twice.
            if item in remaining and priority_type in item:
                ordered.append(item)
                remaining.remove(item)

    ordered.extend(sorted(remaining))
    return ordered
75
76
77 2
def add_warning_elements(element, warnings):
    """
    Add one <warning> child to element for every dictionary in warnings.

    The key of each dictionary becomes the "category" attribute and the
    value becomes the (XML-parsed) content of the warning.
    """
    # The use of [{dict}, {dict}] in warnings is to handle the following
    # scenario where multiple warnings have the same category which is
    # valid in SCAP and our content:
    #
    # warnings:
    #     - general: Some general warning
    #     - general: Some other general warning
    #     - general: |-
    #         Some really long multiline general warning
    #
    # Each of the {dict} should have only one key/value pair.
    for warning_dict in warnings:
        # Only the first key/value pair is used.
        category, text = list(warning_dict.items())[0]
        warning = add_sub_element(element, "warning", text)
        warning.set("category", category)
92
93
94 2
def add_nondata_subelements(element, subelement, attribute, attr_data):
    """Add multiple iterations of a subelement that contains an attribute but no data

       For example, <requires id="my_required_id"/>

       One child named subelement is appended to element for every entry
       of attr_data, and the entry is stored in the attribute of that child.
    """
    for data in attr_data:
        child = ET.SubElement(element, subelement)
        child.set(attribute, data)
100
101
102 2
class Profile(object):
    """Represents XCCDF profile

    Rule selections are kept in selected / unselected, variable selectors
    in variables and rule refinements in refine_rules.
    """

    def __init__(self, id_):
        self.id_ = id_
        self.title = ""
        self.description = ""
        self.extends = None
        self.selected = []
        self.unselected = []
        self.variables = dict()
        self.refine_rules = defaultdict(list)
        self.metadata = None
        self.reference = None
        # self.platforms is used further in the build system
        # self.platform is merged into self.platforms
        # it is here for backward compatibility
        self.platforms = set()
        self.cpe_names = set()
        self.platform = None

    def read_yaml_contents(self, yaml_contents):
        """
        Fill the profile from yaml_contents.

        Every key that is consumed is removed from yaml_contents, so that
        the caller can detect leftover (unparsed) data.
        """
        self.title = required_key(yaml_contents, "title")
        del yaml_contents["title"]
        self.description = required_key(yaml_contents, "description")
        del yaml_contents["description"]
        self.extends = yaml_contents.pop("extends", None)
        selection_entries = required_key(yaml_contents, "selections")
        if selection_entries:
            self._parse_selections(selection_entries)
        del yaml_contents["selections"]
        # A plain YAML sequence is loaded as a list; normalize to a set so
        # that from_yaml() can add the legacy "platform" entry to it.
        self.platforms = set(yaml_contents.pop("platforms", None) or ())
        self.platform = yaml_contents.pop("platform", None)

    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None):
        """
        Build a profile from yaml_file; the profile ID is the file base name.

        Returns None if the expanded YAML is None.
        Raises RuntimeError if the file contains keys that were not parsed,
        and re-raises CPEDoesNotExist for platforms unknown to env_yaml.
        """
        yaml_contents = open_and_expand(yaml_file, env_yaml)
        if yaml_contents is None:
            return None

        basename, _ = os.path.splitext(os.path.basename(yaml_file))

        profile = cls(basename)
        profile.read_yaml_contents(yaml_contents)

        profile.reference = yaml_contents.pop("reference", None)
        # ensure that content of profile.platform is in profile.platforms as
        # well
        if profile.platform is not None:
            profile.platforms.add(profile.platform)

        if env_yaml:
            for platform in profile.platforms:
                try:
                    profile.cpe_names.add(env_yaml["product_cpes"].get_cpe_name(platform))
                except CPEDoesNotExist:
                    print("Unsupported platform '%s' in profile '%s'." % (platform, profile.id_))
                    raise

        # At the moment, metadata is not used to build content
        yaml_contents.pop("metadata", None)

        if yaml_contents:
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                               % (yaml_file, yaml_contents))

        return profile

    def dump_yaml(self, file_name, documentation_complete=True):
        """Write the profile to file_name as YAML, using the selections syntax."""
        to_dump = {
            "documentation_complete": documentation_complete,
            "title": self.title,
            "description": self.description,
            "reference": self.reference,
        }
        if self.metadata is not None:
            to_dump["metadata"] = self.metadata

        if self.extends is not None:
            to_dump["extends"] = self.extends

        if self.platforms:
            to_dump["platforms"] = self.platforms

        selections = list(self.selected)
        selections.extend("!" + item for item in self.unselected)
        selections.extend(
            varname + "=" + value for varname, value in self.variables.items())
        for rule, refinements in self.refine_rules.items():
            for prop, val in refinements:
                selections.append("{rule}.{property}={value}"
                                  .format(rule=rule, property=prop, value=val))
        to_dump["selections"] = selections
        with open(file_name, "w+") as f:
            yaml.dump(to_dump, f, indent=4)

    def _parse_selections(self, entries):
        for item in entries:
            self.apply_selection(item)

    def apply_selection(self, item):
        """
        Record a single entry of the "selections" list.

        Recognized forms:
          <rule>.<property>=<value>  refinement of a rule property
          <variable>=<selector>      variable selector
          !<rule>                    unselected rule
          <rule>                     selected rule

        Raises ValueError for a refinement without a value, or of a property
        that is not in XCCDF_REFINABLE_PROPERTIES.
        """
        # Only a dot left of the first "=" marks a refinement; a dot in the
        # value (e.g. "var_timeout=1.5") must not turn a variable into one.
        key, has_value, value = item.partition("=")
        if "." in key:
            if not has_value:
                msg = ("Refinement '{item}' in profile '{profile}' has no value, "
                       "expected the form '<rule>.<property>=<value>'."
                       .format(item=item, profile=self.id_))
                raise ValueError(msg)
            rule, property_ = key.split(".", 1)
            if property_ not in XCCDF_REFINABLE_PROPERTIES:
                msg = ("Property '{property_}' cannot be refined. "
                       "Rule properties that can be refined are {refinables}. "
                       "Fix refinement '{rule_id}.{property_}={value}' in profile '{profile}'."
                       .format(property_=property_, refinables=XCCDF_REFINABLE_PROPERTIES,
                               rule_id=rule, value=value, profile=self.id_)
                       )
                raise ValueError(msg)
            self.refine_rules[rule].append((property_, value))
        elif has_value:
            self.variables[key] = value
        elif item.startswith("!"):
            self.unselected.append(item[1:])
        else:
            self.selected.append(item)

    def to_xml_element(self):
        """Return the profile as an XCCDF <Profile> element."""
        element = ET.Element('Profile')
        element.set("id", self.id_)
        if self.extends:
            element.set("extends", self.extends)
        title = add_sub_element(element, "title", self.title)
        title.set("override", "true")
        desc = add_sub_element(element, "description", self.description)
        desc.set("override", "true")

        if self.reference:
            add_sub_element(element, "reference", escape(self.reference))

        for cpe_name in self.cpe_names:
            plat = ET.SubElement(element, "platform")
            plat.set("idref", cpe_name)

        for selection in self.selected:
            select = ET.Element("select")
            select.set("idref", selection)
            select.set("selected", "true")
            element.append(select)

        for selection in self.unselected:
            unselect = ET.Element("select")
            unselect.set("idref", selection)
            unselect.set("selected", "false")
            element.append(unselect)

        for value_id, selector in self.variables.items():
            refine_value = ET.Element("refine-value")
            refine_value.set("idref", value_id)
            refine_value.set("selector", selector)
            element.append(refine_value)

        for refined_rule, refinement_list in self.refine_rules.items():
            refine_rule = ET.Element("refine-rule")
            refine_rule.set("idref", refined_rule)
            for property_, value in refinement_list:
                refine_rule.set(property_, value)
            element.append(refine_rule)

        return element

    def get_rule_selectors(self):
        return list(self.selected + self.unselected)

    def get_variable_selectors(self):
        return self.variables

    def validate_refine_rules(self, rules):
        """
        Check that every refined rule exists in rules and is selected
        (or unselected) by this profile. Raises ValueError otherwise.
        """
        existing_rule_ids = set(r.id_ for r in rules)
        rule_selectors = set(self.get_rule_selectors())
        for refine_rule, refinement_list in self.refine_rules.items():
            if not refinement_list:
                # Nothing is refined for this rule, so there is nothing to check.
                continue
            # Take first refinement to illustrate where the error is
            # all refinements in list are invalid, so it doesn't really matter
            a_refinement = refinement_list[0]

            if refine_rule not in existing_rule_ids:
                msg = (
                    "You are trying to refine a rule that doesn't exist. "
                    "Rule '{rule_id}' was not found in the benchmark. "
                    "Please check all rule refinements for rule: '{rule_id}', for example: "
                    "- {rule_id}.{property_}={value}' in profile {profile_id}."
                    .format(rule_id=refine_rule, profile_id=self.id_,
                            property_=a_refinement[0], value=a_refinement[1])
                    )
                raise ValueError(msg)

            if refine_rule not in rule_selectors:
                msg = ("- {rule_id}.{property_}={value}' in profile '{profile_id}' is refining "
                       "a rule that is not selected by it. The refinement will not have any "
                       "noticeable effect. Either select the rule or remove the rule refinement."
                       .format(rule_id=refine_rule, property_=a_refinement[0],
                               value=a_refinement[1], profile_id=self.id_)
                       )
                raise ValueError(msg)

    def validate_variables(self, variables):
        """
        Check that every variable used by the profile is among variables
        and that the chosen selector is one of its options.
        Raises ValueError otherwise.
        """
        variables_by_id = dict((var.id_, var) for var in variables)

        for var_id, our_val in self.variables.items():
            if var_id not in variables_by_id:
                all_vars_list = [" - %s" % v for v in variables_by_id.keys()]
                msg = (
                    "Value '{var_id}' in profile '{profile_name}' is not known. "
                    "We know only variables:\n{var_names}"
                    .format(
                        var_id=var_id, profile_name=self.id_,
                        var_names="\n".join(sorted(all_vars_list)))
                )
                raise ValueError(msg)

            allowed_selectors = [str(s) for s in variables_by_id[var_id].options.keys()]
            if our_val not in allowed_selectors:
                msg = (
                    "Value '{var_id}' in profile '{profile_name}' "
                    "uses the selector '{our_val}'. "
                    "This is not possible, as only selectors {all_selectors} are available. "
                    "Either change the selector used in the profile, or "
                    "add the selector-value pair to the variable definition."
                    .format(
                        var_id=var_id, profile_name=self.id_, our_val=our_val,
                        all_selectors=allowed_selectors,
                    )
                )
                raise ValueError(msg)

    def validate_rules(self, rules, groups):
        """
        Check that every rule selector refers to an existing rule and not
        to a group. Raises ValueError otherwise.
        """
        existing_rule_ids = set(r.id_ for r in rules)
        for id_ in self.get_rule_selectors():
            if id_ in groups:
                msg = (
                    "You have selected a group '{group_id}' instead of a "
                    "rule. Groups have no effect in the profile and are not "
                    "allowed to be selected. Please remove '{group_id}' "
                    "from profile '{profile_id}' before proceeding."
                    .format(group_id=id_, profile_id=self.id_)
                )
                raise ValueError(msg)
            if id_ not in existing_rule_ids:
                msg = (
                    "Rule '{rule_id}' was not found in the benchmark. Please "
                    "remove rule '{rule_id}' from profile '{profile_id}' "
                    "before proceeding."
                    .format(rule_id=id_, profile_id=self.id_)
                )
                raise ValueError(msg)

    def __sub__(self, other):
        """
        Return a new Profile holding what this profile has on top of other:
        the selections and unselections missing from other, and the variables
        that other does not set or sets to a different selector.
        """
        profile = Profile(self.id_)
        profile.title = self.title
        profile.description = self.description
        profile.extends = self.extends
        # Copy the set, so that changing the result leaves self untouched.
        profile.platforms = set(self.platforms)
        profile.platform = self.platform
        profile.selected = sorted(set(self.selected) - set(other.selected))
        # Sorted like "selected", so that the result is deterministic.
        profile.unselected = sorted(set(self.unselected) - set(other.unselected))
        profile.variables = dict(
            (k, v) for (k, v) in self.variables.items()
            if k not in other.variables or v != other.variables[k])
        return profile
372
373
374 2
class ResolvableProfile(Profile):
    """
    Profile that can be resolved, i.e. have its controls, the profile it
    extends and its unselections merged into the final list of selections.
    """

    def __init__(self, * args, ** kwargs):
        super(ResolvableProfile, self).__init__(* args, ** kwargs)
        self.resolved = False
        self.resolved_selections = set()

    def _controls_ids_to_controls(self, controls_manager, policy_id, control_id_list):
        return [controls_manager.get_control(policy_id, cid) for cid in control_id_list]

    def _process_controls_ids_into_controls(self, controls_manager, policy_id, controls_ids):
        """
        Turn control IDs of the policy policy_id into a list of controls.

        An ID that does not start with "all" is looked up as a single control,
        "all:<level>" expands to all controls of that level, and any other ID
        starting with "all" expands to all controls of the policy.
        """
        # Shared by all subclasses, which used to carry identical copies.
        controls = []
        for cid in controls_ids:
            if not cid.startswith("all"):
                controls.extend(
                    self._controls_ids_to_controls(controls_manager, policy_id, [cid]))
            elif ":" in cid:
                _, level_id = cid.split(":", 1)
                controls.extend(
                    controls_manager.get_all_controls_of_level(policy_id, level_id))
            else:
                controls.extend(controls_manager.get_all_controls(policy_id))
        return controls

    def _merge_control(self, control):
        """Select the rules of control; its variables never override ours."""
        self.selected.extend(control.rules)
        for varname, value in control.variables.items():
            if varname not in self.variables:
                self.variables[varname] = value

    def resolve_controls(self, controls_manager):
        # Plain resolvable profiles have no controls; subclasses override this.
        pass

    def extend_by(self, extended_profile):
        """
        Merge selections, variables and refinements of extended_profile
        into this profile; values set by this profile take precedence.
        """
        extended_selects = set(extended_profile.selected)
        self.resolved_selections.update(extended_selects)

        updated_variables = dict(extended_profile.variables)
        updated_variables.update(self.variables)
        self.variables = updated_variables

        extended_refinements = deepcopy(extended_profile.refine_rules)
        updated_refinements = self._subtract_refinements(extended_refinements)
        updated_refinements.update(self.refine_rules)
        self.refine_rules = updated_refinements

    def resolve(self, all_profiles, controls_manager=None):
        """
        Resolve the profile in place; a no-op if it is already resolved.

        After resolution, selected holds the sorted final selections, and
        unselected and extends are cleared.
        Raises RuntimeError if the extended profile is not in all_profiles.
        """
        if self.resolved:
            return

        self.resolve_controls(controls_manager)

        self.resolved_selections = set(self.selected)

        if self.extends:
            if self.extends not in all_profiles:
                msg = (
                    "Profile {name} extends profile {extended}, but "
                    "only profiles {known_profiles} are available for resolution."
                    .format(name=self.id_, extended=self.extends,
                            known_profiles=list(all_profiles.keys())))
                raise RuntimeError(msg)
            extended_profile = all_profiles[self.extends]
            # The extended profile has to be resolved before it is merged in.
            extended_profile.resolve(all_profiles, controls_manager)

            self.extend_by(extended_profile)

        for uns in self.unselected:
            self.resolved_selections.discard(uns)

        self.unselected = []
        self.extends = None

        self.selected = sorted(self.resolved_selections)

        self.resolved = True

    def _subtract_refinements(self, extended_refinements):
        """
        Given a dict of rule refinements from the extended profile,
        "undo" every refinement prefixed with '!' in this profile.

        The '!'-prefixed entries are removed from this profile, and rules
        left without any refinement are dropped from the returned dict.
        """
        for rule, refinements in list(self.refine_rules.items()):
            if not rule.startswith("!"):
                continue
            target = rule[1:]
            for prop, val in refinements:
                extended_refinements[target].remove((prop, val))
            if not extended_refinements[target]:
                # Do not keep an empty refinement list around; code that
                # inspects the first refinement of a rule would fail on it.
                del extended_refinements[target]
            del self.refine_rules[rule]
        return extended_refinements


class ProfileWithSeparatePolicies(ResolvableProfile):
    """Profile that lists its controls under a separate "policies" key."""

    def __init__(self, * args, ** kwargs):
        super(ProfileWithSeparatePolicies, self).__init__(* args, ** kwargs)
        self.policies = {}

    def read_yaml_contents(self, yaml_contents):
        policies = yaml_contents.pop("policies", None)
        if policies:
            self._parse_policies(policies)
        super(ProfileWithSeparatePolicies, self).read_yaml_contents(yaml_contents)

    def _parse_policies(self, policies_yaml):
        """
        Store the controls of every policy, keyed by the policy ID.

        Raises ValueError if "controls" is neither a list nor the string "all".
        """
        for item in policies_yaml:
            id_ = required_key(item, "id")
            controls_ids = required_key(item, "controls")
            if not isinstance(controls_ids, list) and controls_ids != "all":
                msg = (
                    "Policy {id_} contains invalid controls list {controls}."
                    .format(id_=id_, controls=str(controls_ids)))
                raise ValueError(msg)
            self.policies[id_] = controls_ids

    def resolve_controls(self, controls_manager):
        for policy_id, controls_ids in self.policies.items():
            if isinstance(controls_ids, list):
                controls = self._process_controls_ids_into_controls(
                    controls_manager, policy_id, controls_ids)
            elif controls_ids.startswith("all"):
                controls = self._process_controls_ids_into_controls(
                    controls_manager, policy_id, [controls_ids])
            else:
                msg = (
                    "Unknown policy content {content} in profile {profile_id}"
                    .format(content=controls_ids, profile_id=self.id_))
                raise ValueError(msg)

            for c in controls:
                self._merge_control(c)

    def extend_by(self, extended_profile):
        self.policies.update(extended_profile.policies)
        super(ProfileWithSeparatePolicies, self).extend_by(extended_profile)


class ProfileWithInlinePolicies(ResolvableProfile):
    """Profile that lists its controls as "<policy>:<control>" selections."""

    def __init__(self, * args, ** kwargs):
        super(ProfileWithInlinePolicies, self).__init__(* args, ** kwargs)
        self.controls_by_policy = defaultdict(list)

    def apply_selection(self, item):
        # ":" is the delimiter for controls but not when the item is a variable
        if ":" in item and "=" not in item:
            policy_id, control_id = item.split(":", 1)
            self.controls_by_policy[policy_id].append(control_id)
        else:
            super(ProfileWithInlinePolicies, self).apply_selection(item)

    def resolve_controls(self, controls_manager):
        for policy_id, controls_ids in self.controls_by_policy.items():
            controls = self._process_controls_ids_into_controls(
                controls_manager, policy_id, controls_ids)

            for c in controls:
                self._merge_control(c)
546
547
548 2
class Value(object):
    """Represents XCCDF Value
    """

    def __init__(self, id_):
        self.id_ = id_
        self.title = ""
        self.description = ""
        self.type_ = "string"
        self.operator = "equals"
        self.interactive = False
        self.options = {}
        self.warnings = []

    @staticmethod
    def _parse_interactive(raw):
        """
        Interpret the "interactive" YAML entry as a boolean.

        YAML may deliver either a real boolean or a string, so the entry is
        converted to a string before the case-insensitive comparison.
        """
        return str(raw).lower() == "true"

    @staticmethod
    def from_yaml(yaml_file, env_yaml=None):
        """
        Build a Value from yaml_file; the value ID is the file base name.

        Returns None if the expanded YAML is None.
        Raises ValueError for an unknown operator or a malformed warning, and
        RuntimeError if the file contains keys that were not parsed.
        """
        yaml_contents = open_and_macro_expand(yaml_file, env_yaml)
        if yaml_contents is None:
            return None

        value_id, _ = os.path.splitext(os.path.basename(yaml_file))
        value = Value(value_id)
        value.title = required_key(yaml_contents, "title")
        del yaml_contents["title"]
        value.description = required_key(yaml_contents, "description")
        del yaml_contents["description"]
        value.type_ = required_key(yaml_contents, "type")
        del yaml_contents["type"]
        value.operator = yaml_contents.pop("operator", "equals")
        possible_operators = ["equals", "not equal", "greater than",
                              "less than", "greater than or equal",
                              "less than or equal", "pattern match"]

        if value.operator not in possible_operators:
            raise ValueError(
                "Found an invalid operator value '%s' in '%s'. "
                "Expected one of: %s"
                % (value.operator, yaml_file, ", ".join(possible_operators))
            )

        value.interactive = Value._parse_interactive(
            yaml_contents.pop("interactive", "false"))

        value.options = required_key(yaml_contents, "options")
        del yaml_contents["options"]
        value.warnings = yaml_contents.pop("warnings", [])

        for warning_list in value.warnings:
            if len(warning_list) != 1:
                raise ValueError("Only one key/value pair should exist for each dictionary")

        if yaml_contents:
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                               % (yaml_file, yaml_contents))

        return value

    def to_xml_element(self):
        """Return the value as an XCCDF <Value> element."""
        value = ET.Element('Value')
        value.set('id', self.id_)
        value.set('type', self.type_)
        if self.operator != "equals":  # equals is the default
            value.set('operator', self.operator)
        if self.interactive:  # False is the default
            value.set('interactive', 'true')
        title = ET.SubElement(value, 'title')
        title.text = self.title
        add_sub_element(value, 'description', self.description)
        add_warning_elements(value, self.warnings)

        for selector, option in self.options.items():
            # do not confuse Value with big V with value with small v
            # value is child element of Value
            value_small = ET.SubElement(value, 'value')
            # by XCCDF spec, default value is value without selector
            if selector != "default":
                value_small.set('selector', str(selector))
            value_small.text = str(option)

        return value

    def to_file(self, file_name):
        """Write the <Value> element to file_name as XML."""
        root = self.to_xml_element()
        tree = ET.ElementTree(root)
        tree.write(file_name)
633
634
635 2
class Benchmark(object):
    """Represents XCCDF Benchmark

    Keeps the benchmark metadata together with the profiles, values,
    groups and rules attached to it, and serializes all of them into
    a single 'Benchmark' XML element.
    """
    def __init__(self, id_):
        self.id_ = id_
        self.version = "0.1"

        # Textual metadata, normally filled in by from_yaml()
        self.title = ""
        self.status = ""
        self.description = ""
        self.notice_id = ""
        self.notice_description = ""
        self.front_matter = ""
        self.rear_matter = ""

        # Content attached to the benchmark
        self.cpes = []
        self.product_cpe_names = []
        self.profiles = []
        self.values = {}
        self.groups = {}
        self.rules = {}
        self.bash_remediation_fns_group = None

        # This is required for OCIL clauses
        placeholder = Value("conditional_clause")
        placeholder.title = "A conditional clause for check statements."
        placeholder.description = placeholder.title
        placeholder.type_ = "string"
        placeholder.options = {"": "This is a placeholder"}
        self.add_value(placeholder)

    @classmethod
    def from_yaml(cls, yaml_file, id_, env_yaml=None):
        """
        Creates a benchmark with ID id_ from the contents of yaml_file.

        Returns None if the expanded YAML is None. Raises RuntimeError
        if the file contains keys that are not consumed here.
        """
        contents = open_and_macro_expand(yaml_file, env_yaml)
        if contents is None:
            return None

        def consume(mapping, key):
            # Fetch a mandatory key and drop it from the mapping, so that
            # whatever remains at the end can be reported as unparsed.
            found = required_key(mapping, key)
            del mapping[key]
            return found

        benchmark = cls(id_)
        benchmark.title = consume(contents, "title")
        benchmark.status = consume(contents, "status")
        benchmark.description = consume(contents, "description")

        notice = required_key(contents, "notice")
        benchmark.notice_id = consume(notice, "id")
        benchmark.notice_description = consume(notice, "description")
        # The notice is dropped only when fully consumed; leftover keys
        # are reported by the unparsed-data check below.
        if not notice:
            del contents["notice"]

        benchmark.front_matter = consume(contents, "front-matter")
        benchmark.rear_matter = consume(contents, "rear-matter")
        benchmark.version = str(consume(contents, "version"))

        if env_yaml:
            benchmark.product_cpe_names = env_yaml["product_cpes"].get_product_cpe_names()

        if contents:
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                               % (yaml_file, contents))

        return benchmark

    def add_profiles_from_dir(self, dir_, env_yaml):
        """
        Loads every '*.profile' file found directly in dir_ (in sorted
        order) and appends the resulting profiles to self.profiles.

        Files with another extension are reported on stderr and skipped,
        profiles raising DocumentationNotComplete are skipped silently,
        any other failure is re-raised as RuntimeError.
        """
        for entry in sorted(os.listdir(dir_)):
            entry_path = os.path.join(dir_, entry)
            if not os.path.isfile(entry_path):
                continue

            extension = os.path.splitext(entry)[1]
            if extension != '.profile':
                sys.stderr.write(
                    "Encountered file '%s' while looking for profiles, "
                    "extension '%s' is unknown. Skipping..\n"
                    % (entry, extension)
                )
                continue

            try:
                loaded = ProfileWithInlinePolicies.from_yaml(entry_path, env_yaml)
            except DocumentationNotComplete:
                continue
            except Exception as exc:
                raise RuntimeError(
                    "Error building profile from '{fname}': '{error}'"
                    .format(fname=entry_path, error=str(exc)))

            if loaded is not None:
                self.profiles.append(loaded)

    def add_bash_remediation_fns_from_file(self, file_):
        """
        Parses file_ as XML and keeps its root element; does nothing
        when file_ is empty or None.
        """
        # A falsy file_ means bash-remediation-functions.xml doesn't exist
        if file_:
            self.bash_remediation_fns_group = ET.parse(file_).getroot()

    def to_xml_element(self):
        """
        Serializes the benchmark and everything attached to it into
        a 'Benchmark' XML element and returns that element.
        """
        root = ET.Element('Benchmark')
        root_attributes = (
            ('xmlns:xsi', 'http://www.w3.org/2001/XMLSchema-instance'),
            ('xmlns:xhtml', 'http://www.w3.org/1999/xhtml'),
            ('xmlns:dc', 'http://purl.org/dc/elements/1.1/'),
            ('id', 'product-name'),
            ('xsi:schemaLocation',
             'http://checklists.nist.gov/xccdf/1.1 xccdf-1.1.4.xsd'),
            ('style', 'SCAP_1.1'),
            ('resolved', 'false'),
            ('xml:lang', 'en-US'),
        )
        for attr_name, attr_value in root_attributes:
            root.set(attr_name, attr_value)

        status_el = ET.SubElement(root, 'status')
        status_el.set('date', datetime.date.today().strftime("%Y-%m-%d"))
        status_el.text = self.status

        add_sub_element(root, "title", self.title)
        add_sub_element(root, "description", self.description)
        notice_el = add_sub_element(root, "notice", self.notice_description)
        notice_el.set('id', self.notice_id)
        add_sub_element(root, "front-matter", self.front_matter)
        add_sub_element(root, "rear-matter", self.rear_matter)

        # The Benchmark applicability is determined by the CPEs
        # defined in the product.yml
        for cpe_name in self.product_cpe_names:
            ET.SubElement(root, "platform").set("idref", cpe_name)

        ET.SubElement(root, 'version').text = self.version
        ET.SubElement(root, "metadata")

        for profile in self.profiles:
            root.append(profile.to_xml_element())

        for value in self.values.values():
            root.append(value.to_xml_element())

        if self.bash_remediation_fns_group is not None:
            root.append(self.bash_remediation_fns_group)

        # Make system group the first, followed by services group
        ordered_group_ids = reorder_according_to_ordering(
            list(self.groups.keys()), ["system", "services"])
        for group_id in ordered_group_ids:
            group = self.groups.get(group_id)
            # Products using application benchmark don't have system or services group
            if group is not None:
                root.append(group.to_xml_element())

        for rule in self.rules.values():
            root.append(rule.to_xml_element())

        return root

    def to_file(self, file_name):
        """
        Writes the XML form of the benchmark (see to_xml_element()) to file_name.
        """
        ET.ElementTree(self.to_xml_element()).write(file_name)

    def add_value(self, value):
        """Registers value under its ID; None is ignored."""
        if value is not None:
            self.values[value.id_] = value

    # The benchmark is also considered a group, so this function signature needs to match
    # Group()'s add_group()
    def add_group(self, group, env_yaml=None):
        """Registers group under its ID; None is ignored. env_yaml is not used."""
        if group is not None:
            self.groups[group.id_] = group

    def add_rule(self, rule):
        """Registers rule under its ID; None is ignored."""
        if rule is not None:
            self.rules[rule.id_] = rule

    def to_xccdf(self):
        """We can easily extend this script to generate a valid XCCDF instead
        of SSG SHORTHAND.
        """
        raise NotImplementedError

    def __str__(self):
        return self.id_
826
827
828 2
class Group(object):
    """Represents XCCDF Group

    A group owns values, rules and nested groups. Platforms set on
    a group are handed down to the rules and groups added to it, as long
    as they do not define platforms of their own.
    """
    ATTRIBUTES_TO_PASS_ON = (
        "platforms",
    )

    def __init__(self, id_):
        self.id_ = id_
        self.prodtype = "all"
        self.title = ""
        self.description = ""
        self.warnings = []
        self.requires = []
        self.conflicts = []
        self.values = {}
        self.groups = {}
        self.rules = {}
        # self.platforms is used further in the build system
        # self.platform is merged into self.platforms
        # it is here for backward compatibility
        self.platforms = set()
        self.cpe_names = set()
        self.platform = None

    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None):
        """
        Creates a group from yaml_file; the group ID is the name of the
        directory that contains the file.

        Returns None if the expanded YAML is None.

        Raises ValueError for a malformed warning entry or a prodtype
        containing whitespace, RuntimeError for keys that are not
        consumed here, and re-raises CPEDoesNotExist for a platform
        unknown to env_yaml["product_cpes"].
        """
        yaml_contents = open_and_macro_expand(yaml_file, env_yaml)
        if yaml_contents is None:
            return None

        group_id = os.path.basename(os.path.dirname(yaml_file))
        group = cls(group_id)
        group.prodtype = yaml_contents.pop("prodtype", "all")
        group.title = required_key(yaml_contents, "title")
        del yaml_contents["title"]
        group.description = required_key(yaml_contents, "description")
        del yaml_contents["description"]
        group.warnings = yaml_contents.pop("warnings", [])
        group.conflicts = yaml_contents.pop("conflicts", [])
        group.requires = yaml_contents.pop("requires", [])
        group.platform = yaml_contents.pop("platform", None)
        # platforms are read as a list from the yaml file, they have to be
        # converted to a set (the same way Rule.from_yaml() does it),
        # otherwise the .add() below fails. An empty 'platforms:' key
        # is treated as no platforms.
        group.platforms = set(yaml_contents.pop("platforms", None) or ())
        # ensure that content of group.platform is in group.platforms as
        # well
        if group.platform is not None:
            group.platforms.add(group.platform)

        if env_yaml:
            cls._add_cpe_names(group, env_yaml, "group")

        for warning_list in group.warnings:
            if len(warning_list) != 1:
                raise ValueError("Only one key/value pair should exist for each dictionary")

        if yaml_contents:
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                               % (yaml_file, yaml_contents))
        group.validate_prodtype(yaml_file)
        return group

    @staticmethod
    def _add_cpe_names(obj, env_yaml, kind):
        """
        Converts every platform of obj (a group or a rule) to a CPE name
        and adds it to obj.cpe_names.

        kind ("group" or "rule") is only used in the message printed
        before CPEDoesNotExist is re-raised for an unknown platform.
        """
        for platform in obj.platforms:
            try:
                obj.cpe_names.add(env_yaml["product_cpes"].get_cpe_name(platform))
            except CPEDoesNotExist:
                print("Unsupported platform '%s' in %s '%s'." % (platform, kind, obj.id_))
                raise

    def validate_prodtype(self, yaml_file):
        """
        Raises ValueError if any item of the comma-separated prodtype has
        leading or trailing whitespace. yaml_file is used in the message only.
        """
        for ptype in self.prodtype.split(","):
            if ptype.strip() != ptype:
                msg = (
                    "Comma-separated '{prodtype}' prodtype "
                    "in {yaml_file} contains whitespace."
                    .format(prodtype=self.prodtype, yaml_file=yaml_file))
                raise ValueError(msg)

    def to_xml_element(self):
        """
        Serializes the group into a 'Group' XML element: metadata first,
        then values, then rules and finally nested groups, the last two
        in the priority order explained below.

        Returns the newly built element.
        """
        group = ET.Element('Group')
        group.set('id', self.id_)
        if self.prodtype != "all":
            group.set("prodtype", self.prodtype)
        title = ET.SubElement(group, 'title')
        title.text = self.title
        add_sub_element(group, 'description', self.description)
        add_warning_elements(group, self.warnings)
        add_nondata_subelements(group, "requires", "id", self.requires)
        add_nondata_subelements(group, "conflicts", "id", self.conflicts)

        for cpe_name in self.cpe_names:
            platform_el = ET.SubElement(group, "platform")
            platform_el.set("idref", cpe_name)

        for _value in self.values.values():
            group.append(_value.to_xml_element())

        # Rules that install or remove packages affect remediation
        # of other rules.
        # When packages installed/removed rules come first:
        # The Rules are ordered in more logical way, and
        # remediation order is natural, first the package is installed, then configured.
        rules_in_group = list(self.rules.keys())
        regex = (r'(package_.*_(installed|removed))|' +
                 r'(service_.*_(enabled|disabled))|' +
                 r'install_smartcard_packages$')
        priority_order = ["installed", "install_smartcard_packages", "removed",
                          "enabled", "disabled"]
        rules_in_group = reorder_according_to_ordering(rules_in_group, priority_order, regex)

        # Add rules in priority order, first all packages installed, then removed,
        # followed by services enabled, then disabled
        for rule_id in rules_in_group:
            group.append(self.rules.get(rule_id).to_xml_element())

        # Add the sub groups after any current level group rules.
        # As package installed/removed and service enabled/disabled rules are usually in
        # top level group, this ensures groups that further configure a package or service
        # are after rules that install or remove it.
        groups_in_group = list(self.groups.keys())
        priority_order = [
            # Make sure rpm_verify_(hashes|permissions|ownership) are run before any other rule.
            # Due to conflicts between rules rpm_verify_* rules and any rule that configures
            # stricter settings, like file_permissions_grub2_cfg and sudo_dedicated_group,
            # the rules deviating from the system default should be evaluated later.
            # So that in the end the system has contents, permissions and ownership reset, and
            # any deviations or stricter settings are applied by the rules in the profile.
            "software", "integrity", "integrity-software", "rpm_verification",

            # The account group has to precede audit group because
            # the rule package_screen_installed is desired to be executed before the rule
            # audit_rules_privileged_commands, otherwise the rule
            # does not catch newly installed screen binary during remediation
            # and report fail
            "accounts", "auditing",


            # The FIPS group should come before Crypto,
            # if we want to set a different (stricter) Crypto Policy than FIPS.
            "fips", "crypto",

            # The firewalld_activation must come before ruleset_modifications, otherwise
            # remediations for ruleset_modifications won't work
            "firewalld_activation", "ruleset_modifications",

            # Rules from group disabling_ipv6 must precede rules from configuring_ipv6,
            # otherwise the remediation prints error although it is successful
            "disabling_ipv6", "configuring_ipv6"
        ]
        groups_in_group = reorder_according_to_ordering(groups_in_group, priority_order)
        for group_id in groups_in_group:
            _group = self.groups[group_id]
            group.append(_group.to_xml_element())

        return group

    def to_file(self, file_name):
        """
        Writes the XML form of the group (see to_xml_element()) to file_name.
        """
        root = self.to_xml_element()
        tree = ET.ElementTree(root)
        tree.write(file_name)

    def add_value(self, value):
        """Registers value under its ID; None is ignored."""
        if value is None:
            return
        self.values[value.id_] = value

    def add_group(self, group, env_yaml=None):
        """
        Registers a nested group under its ID; None is ignored.

        A group without platforms inherits the platforms of this group.
        If env_yaml is given, the CPE names of the nested group are
        updated afterwards; CPEDoesNotExist is re-raised for a platform
        unknown to env_yaml["product_cpes"].
        """
        if group is None:
            return
        if self.platforms and not group.platforms:
            # Hand over a copy, so that a later change of the child's
            # platforms cannot leak back into this group.
            group.platforms = set(self.platforms)
        self.groups[group.id_] = group
        self._pass_our_properties_on_to(group)

        # Once the group has inherited properties, update cpe_names
        if env_yaml:
            self._add_cpe_names(group, env_yaml, "group")

    def _pass_our_properties_on_to(self, obj):
        """
        Copies each attribute listed in ATTRIBUTES_TO_PASS_ON to obj,
        but only if obj has that attribute and its value is None.
        """
        for attr in self.ATTRIBUTES_TO_PASS_ON:
            if hasattr(obj, attr) and getattr(obj, attr) is None:
                setattr(obj, attr, getattr(self, attr))

    def add_rule(self, rule, env_yaml=None):
        """
        Registers a rule under its ID; None is ignored.

        A rule without platforms inherits the platforms of this group.
        If env_yaml is given, the CPE names of the rule are updated
        afterwards; CPEDoesNotExist is re-raised for a platform unknown
        to env_yaml["product_cpes"].
        """
        if rule is None:
            return
        if self.platforms and not rule.platforms:
            # Hand over a copy, see add_group()
            rule.platforms = set(self.platforms)
        self.rules[rule.id_] = rule
        self._pass_our_properties_on_to(rule)

        # Once the rule has inherited properties, update cpe_names
        if env_yaml:
            self._add_cpe_names(rule, env_yaml, "rule")

    def __str__(self):
        return self.id_
1033
1034
1035 2
class Rule(object):
1036
    """Represents XCCDF Rule
1037
    """
1038 2
    # Every key accepted in a rule YAML file, mapped to a callable without
    # arguments that produces the value used when the key is missing.
    # A callable returning an exception instance marks the key as mandatory:
    # _set_attributes_from_dict() raises the returned exception.
    YAML_KEYS_DEFAULTS = {
        "prodtype": lambda: "all",
        "title": lambda: RuntimeError("Missing key 'title'"),
        "description": lambda: RuntimeError("Missing key 'description'"),
        "rationale": lambda: RuntimeError("Missing key 'rationale'"),
        "severity": lambda: RuntimeError("Missing key 'severity'"),
        "references": dict,
        "identifiers": dict,
        "ocil_clause": lambda: None,
        "ocil": lambda: None,
        "oval_external_content": lambda: None,
        "warnings": list,
        "conflicts": list,
        "requires": list,
        "platform": lambda: None,
        "platforms": set,
        "inherited_platforms": list,
        "template": lambda: None,
        "definition_location": lambda: None,
    }

    # Reference types collected by _get_product_only_references()
    PRODUCT_REFERENCES = ("stigid", "cis")
    # Reference types that must not carry a product qualifier
    GLOBAL_REFERENCES = ("srg", "vmmsrg", "disa", "cis-csc")
1061
1062 2
    def __init__(self, id_):
1063 2
        self.id_ = id_
1064 2
        self.prodtype = "all"
1065 2
        self.title = ""
1066 2
        self.description = ""
1067 2
        self.definition_location = ""
1068 2
        self.rationale = ""
1069 2
        self.severity = "unknown"
1070 2
        self.references = {}
1071 2
        self.identifiers = {}
1072 2
        self.ocil_clause = None
1073 2
        self.ocil = None
1074 2
        self.oval_external_content = None
1075 2
        self.warnings = []
1076 2
        self.requires = []
1077 2
        self.conflicts = []
1078
        # self.platforms is used further in the build system
1079
        # self.platform is merged into self.platforms
1080
        # it is here for backward compatibility
1081 2
        self.platform = None
1082 2
        self.platforms = set()
1083 2
        self.cpe_names = set()
1084 2
        self.inherited_platforms = [] # platforms inherited from the group
1085 2
        self.sce_metadata = None
1086 2
        self.template = None
1087 2
        self.local_env_yaml = None
1088 2
        self.current_product = None
1089
1090 2
    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None, sce_metadata=None):
        """
        Creates a rule from yaml_file.

        The rule ID is the file name without extension, except for
        'rule.yml', where get_rule_dir_id() determines it. The ID is
        exposed to macro expansion as 'rule_id' in a copy of env_yaml.

        Returns None if the expanded YAML is None.

        Raises RuntimeError for a missing mandatory key or for keys that
        are not consumed, ValueError when a validation fails, and
        re-raises CPEDoesNotExist for an unknown platform.
        """
        yaml_file = os.path.normpath(yaml_file)

        stem, extension = os.path.splitext(os.path.basename(yaml_file))
        if (stem, extension) == ("rule", ".yml"):
            rule_id = get_rule_dir_id(yaml_file)
        else:
            rule_id = stem

        # The caller's env_yaml stays untouched; the copy carries the rule ID
        local_env_yaml = None
        if env_yaml:
            local_env_yaml = dict(env_yaml)
            local_env_yaml["rule_id"] = rule_id

        yaml_contents = open_and_macro_expand(yaml_file, local_env_yaml)
        if yaml_contents is None:
            return None

        rule = cls(rule_id)
        if local_env_yaml:
            rule.local_env_yaml = local_env_yaml

        try:
            rule._set_attributes_from_dict(yaml_contents)
        except RuntimeError as exc:
            raise RuntimeError(
                "Error processing '{fname}': {err}"
                .format(fname=yaml_file, err=str(exc)))

        # platforms are read as list from the yaml file
        # we need them to convert to set again
        rule.platforms = set(rule.platforms)

        if any(len(warning) != 1 for warning in rule.warnings):
            raise ValueError("Only one key/value pair should exist for each dictionary")

        # ensure that content of rule.platform is in rule.platforms as
        # well
        if rule.platform is not None:
            rule.platforms.add(rule.platform)

        # Convert the platform names to CPE names
        # But only do it if an env_yaml was specified (otherwise there would be no product CPEs
        # to lookup), and the rule's prodtype matches the product being built
        # NOTE(review): 'product' is read unconditionally here, although it is
        # treated as optional when current_product is set below - confirm.
        if env_yaml and env_yaml["product"] in parse_prodtype(rule.prodtype):
            for platform in rule.platforms:
                try:
                    rule.cpe_names.add(env_yaml["product_cpes"].get_cpe_name(platform))
                except CPEDoesNotExist:
                    print("Unsupported platform '%s' in rule '%s'." % (platform, rule.id_))
                    raise

        if yaml_contents:
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                               % (yaml_file, yaml_contents))

        if not rule.definition_location:
            rule.definition_location = yaml_file

        if sce_metadata and rule_id in sce_metadata:
            rule.sce_metadata = sce_metadata[rule_id]

        if env_yaml and 'product' in env_yaml:
            rule.current_product = env_yaml['product']

        for validate in (rule.validate_prodtype,
                         rule.validate_identifiers,
                         rule.validate_references):
            validate(yaml_file)
        return rule
1161
1162 2
    def _verify_stigid_format(self, product):
1163 2
        stig_id = self.references.get("stigid", None)
1164 2
        if not stig_id:
1165 2
            return
1166 2
        if "," in stig_id:
1167 2
            raise ValueError("Rules can not have multiple STIG IDs.")
1168
1169 2
    def _verify_disa_cci_format(self):
1170 2
        cci_id = self.references.get("disa", None)
1171 2
        if not cci_id:
1172 2
            return
1173
        cci_ex = re.compile(r'^CCI-[0-9]{6}$')
1174
        for cci in cci_id.split(","):
1175
            if not cci_ex.match(cci):
1176
                raise ValueError("CCI '{}' is in the wrong format! "
1177
                                 "Format should be similar to: "
1178
                                 "CCI-XXXXXX".format(cci))
1179
        self.references["disa"] = cci_id
1180
1181 2
    def normalize(self, product):
1182 2
        try:
1183 2
            self.make_refs_and_identifiers_product_specific(product)
1184 2
            self.make_template_product_specific(product)
1185 2
        except Exception as exc:
1186 2
            msg = (
1187
                "Error normalizing '{rule}': {msg}"
1188
                .format(rule=self.id_, msg=str(exc))
1189
            )
1190 2
            raise RuntimeError(msg)
1191
1192 2
    def _get_product_only_references(self):
1193 2
        product_references = dict()
1194
1195 2
        for ref in Rule.PRODUCT_REFERENCES:
1196 2
            start = "{0}@".format(ref)
1197 2
            for gref, gval in self.references.items():
1198 2
                if ref == gref or gref.startswith(start):
1199 2
                    product_references[gref] = gval
1200 2
        return product_references
1201
1202 2
    def make_template_product_specific(self, product):
1203 2
        product_suffix = "@{0}".format(product)
1204
1205 2
        if not self.template:
1206
            return
1207
1208 2
        not_specific_vars = self.template.get("vars", dict())
1209 2
        specific_vars = self._make_items_product_specific(
1210
            not_specific_vars, product_suffix, True)
1211 2
        self.template["vars"] = specific_vars
1212
1213 2
        not_specific_backends = self.template.get("backends", dict())
1214 2
        specific_backends = self._make_items_product_specific(
1215
            not_specific_backends, product_suffix, True)
1216 2
        self.template["backends"] = specific_backends
1217
1218 2
    def make_refs_and_identifiers_product_specific(self, product):
1219 2
        product_suffix = "@{0}".format(product)
1220
1221 2
        product_references = self._get_product_only_references()
1222 2
        general_references = self.references.copy()
1223 2
        for todel in product_references:
1224 2
            general_references.pop(todel)
1225 2
        for ref in Rule.PRODUCT_REFERENCES:
1226 2
            if ref in general_references:
1227
                msg = "Unexpected reference identifier ({0}) without "
1228
                msg += "product qualifier ({0}@{1}) while building rule "
1229
                msg += "{2}"
1230
                msg = msg.format(ref, product, self.id_)
1231
                raise ValueError(msg)
1232
1233 2
        to_set = dict(
1234
            identifiers=(self.identifiers, False),
1235
            general_references=(general_references, True),
1236
            product_references=(product_references, False),
1237
        )
1238 2
        for name, (dic, allow_overwrites) in to_set.items():
1239 2
            try:
1240 2
                new_items = self._make_items_product_specific(
1241
                    dic, product_suffix, allow_overwrites)
1242 2
            except ValueError as exc:
1243 2
                msg = (
1244
                    "Error processing {what} for rule '{rid}': {msg}"
1245
                    .format(what=name, rid=self.id_, msg=str(exc))
1246
                )
1247 2
                raise ValueError(msg)
1248 2
            dic.clear()
1249 2
            dic.update(new_items)
1250
1251 2
        self.references = general_references
1252 2
        self._verify_disa_cci_format()
1253 2
        self.references.update(product_references)
1254
1255 2
        self._verify_stigid_format(product)
1256
1257 2
    def _make_items_product_specific(self, items_dict, product_suffix, allow_overwrites=False):
1258 2
        new_items = dict()
1259 2
        for full_label, value in items_dict.items():
1260 2
            if "@" not in full_label and full_label not in new_items:
1261 2
                new_items[full_label] = value
1262 2
                continue
1263
1264 2
            label = full_label.split("@")[0]
1265
1266
            # this test should occur before matching product_suffix with the product qualifier
1267
            # present in the reference, so it catches problems even for products that are not
1268
            # being built at the moment
1269 2
            if label in Rule.GLOBAL_REFERENCES:
1270
                msg = (
1271
                    "You cannot use product-qualified for the '{item_u}' reference. "
1272
                    "Please remove the product-qualifier and merge values with the "
1273
                    "existing reference if there is any. Original line: {item_q}: {value_q}"
1274
                    .format(item_u=label, item_q=full_label, value_q=value)
1275
                )
1276
                raise ValueError(msg)
1277
1278 2
            if not full_label.endswith(product_suffix):
1279 2
                continue
1280
1281 2
            if label in items_dict and not allow_overwrites and value != items_dict[label]:
1282 2
                msg = (
1283
                    "There is a product-qualified '{item_q}' item, "
1284
                    "but also an unqualified '{item_u}' item "
1285
                    "and those two differ in value - "
1286
                    "'{value_q}' vs '{value_u}' respectively."
1287
                    .format(item_q=full_label, item_u=label,
1288
                            value_q=value, value_u=items_dict[label])
1289
                )
1290 2
                raise ValueError(msg)
1291 2
            new_items[label] = value
1292 2
        return new_items
1293
1294 2
    def _set_attributes_from_dict(self, yaml_contents):
1295 2
        for key, default_getter in self.YAML_KEYS_DEFAULTS.items():
1296 2
            if key not in yaml_contents:
1297 2
                value = default_getter()
1298 2
                if isinstance(value, Exception):
1299
                    raise value
1300
            else:
1301 2
                value = yaml_contents.pop(key)
1302
1303 2
            setattr(self, key, value)
1304
1305 2
    def to_contents_dict(self):
1306
        """
1307
        Returns a dictionary that is the same schema as the dict obtained when loading rule YAML.
1308
        """
1309
1310 2
        yaml_contents = dict()
1311 2
        for key in Rule.YAML_KEYS_DEFAULTS:
1312 2
            yaml_contents[key] = getattr(self, key)
1313
1314 2
        return yaml_contents
1315
1316 2
    def validate_identifiers(self, yaml_file):
1317 2
        if self.identifiers is None:
1318
            raise ValueError("Empty identifier section in file %s" % yaml_file)
1319
1320
        # Validate all identifiers are non-empty:
1321 2
        for ident_type, ident_val in self.identifiers.items():
1322 2
            if not isinstance(ident_type, str) or not isinstance(ident_val, str):
1323
                raise ValueError("Identifiers and values must be strings: %s in file %s"
1324
                                 % (ident_type, yaml_file))
1325 2
            if ident_val.strip() == "":
1326
                raise ValueError("Identifiers must not be empty: %s in file %s"
1327
                                 % (ident_type, yaml_file))
1328 2
            if ident_type[0:3] == 'cce':
1329 2
                if not is_cce_format_valid(ident_val):
1330
                    raise ValueError("CCE Identifier format must be valid: invalid format '%s' for CEE '%s'"
1331
                                     " in file '%s'" % (ident_val, ident_type, yaml_file))
1332 2
                if not is_cce_value_valid("CCE-" + ident_val):
1333
                    raise ValueError("CCE Identifier value is not a valid checksum: invalid value '%s' for CEE '%s'"
1334
                                     " in file '%s'" % (ident_val, ident_type, yaml_file))
1335
1336 2
    def validate_references(self, yaml_file):
        """
        Check that the references section of this rule is well formed.

        Every reference type and value has to be a string, no value may
        be blank, and the items of a comma-separated value must not carry
        leading or trailing whitespace.

        yaml_file is only used to build the error messages.

        Raises ValueError on the first violation found.
        """
        if self.references is None:
            raise ValueError("Empty references section in file %s" % yaml_file)

        # First pass: types and emptiness of every entry.
        for ref_type, ref_val in self.references.items():
            both_are_strings = isinstance(ref_type, str) and isinstance(ref_val, str)
            if not both_are_strings:
                raise ValueError("References and values must be strings: %s in file %s"
                                 % (ref_type, yaml_file))
            if not ref_val.strip():
                raise ValueError("References must not be empty: %s in file %s"
                                 % (ref_type, yaml_file))

        # Second pass: whitespace around the comma-separated items. It only
        # starts once every entry has passed the first pass.
        for ref_type, ref_val in self.references.items():
            if any(item != item.strip() for item in ref_val.split(",")):
                raise ValueError(
                    "Comma-separated '{ref_type}' reference "
                    "in {yaml_file} contains whitespace."
                    .format(ref_type=ref_type, yaml_file=yaml_file))
1356
1357 2
    def validate_prodtype(self, yaml_file):
        """
        Check that the comma-separated prodtype of this rule contains no
        whitespace around its items.

        yaml_file is only used to build the error message.

        Raises ValueError if any item has leading or trailing whitespace.
        """
        items = self.prodtype.split(",")
        if any(item != item.strip() for item in items):
            raise ValueError(
                "Comma-separated '{prodtype}' prodtype "
                "in {yaml_file} contains whitespace."
                .format(prodtype=self.prodtype, yaml_file=yaml_file))
1365
1366 2
    def to_xml_element(self):
        """
        Build and return the 'Rule' XML element describing this rule.

        Children are appended in this order: title, description,
        rationale, ident elements, ref elements, the OVAL check (or the
        'oval' placeholder), the optional SCE check, the optional ocil
        element, warnings, requires, conflicts and platform elements.

        Note: string values stored under the 'check-import' and
        'check-export' keys of self.sce_metadata are replaced in place by
        single-item lists.
        """
        rule_el = ET.Element('Rule')
        rule_el.set('id', self.id_)
        if self.prodtype != "all":
            rule_el.set("prodtype", self.prodtype)
        rule_el.set('severity', self.severity)
        for text_tag in ('title', 'description', 'rationale'):
            add_sub_element(rule_el, text_tag, getattr(self, text_tag))

        def append_keyed_elements(tag, mapping):
            # Keys of the form 'policy@product' (e.g. 'stigid@product') apply
            # to one product only and get a separate element carrying a
            # 'prodtype' attribute. All other keys become attributes of a
            # single shared element, appended after the product-specific
            # ones and only if it ended up with attributes.
            # Keys contain no '@' if the items were normalized.
            shared_el = ET.Element(tag)
            for key, val in mapping.items():
                if '@' not in key:
                    shared_el.set(key, val)
                    continue
                policy, product = key.split('@')
                product_el = ET.SubElement(rule_el, tag)
                product_el.set(policy, val)
                product_el.set('prodtype', product)
            if shared_el.attrib:
                rule_el.append(shared_el)

        append_keyed_elements('ident', self.identifiers)
        append_keyed_elements('ref', self.references)

        if not self.oval_external_content:
            # TODO: This is pretty much a hack, oval ID will be the same as rule ID
            #       and we don't want the developers to have to keep them in sync.
            #       Therefore let's just add an OVAL ref of that ID.
            oval_placeholder = ET.SubElement(rule_el, "oval")
            oval_placeholder.set("id", self.id_)
        else:
            oval_check = ET.SubElement(rule_el, 'check')
            oval_check.set("system", "http://oval.mitre.org/XMLSchema/oval-definitions-5")
            oval_content_ref = ET.SubElement(oval_check, "check-content-ref")
            oval_content_ref.set("href", self.oval_external_content)

        if self.sce_metadata:
            # TODO: This is pretty much another hack, just like the previous OVAL
            # one. However, we avoided the external SCE content as I'm not sure it
            # is generally useful (unlike say, CVE checking with external OVAL)
            #
            # Additionally, we build the content (check subelement) here rather
            # than in xslt due to the nature of our SCE metadata.
            metadata = self.sce_metadata
            sce_check = ET.SubElement(rule_el, "check")
            sce_check.set("system", SCE_SYSTEM)

            if 'check-import' in metadata:
                # A lone string is stored back as a one-item list.
                if isinstance(metadata['check-import'], str):
                    metadata['check-import'] = [metadata['check-import']]
                for import_name in metadata['check-import']:
                    import_el = ET.SubElement(sce_check, 'check-import')
                    import_el.set('import-name', import_name)
                    import_el.text = None

            if 'check-export' in metadata:
                # A lone string is stored back as a one-item list.
                if isinstance(metadata['check-export'], str):
                    metadata['check-export'] = [metadata['check-export']]
                for export_spec in metadata['check-export']:
                    # Each entry has the form '<value-id>=<export-name>'.
                    value_id, export_name = export_spec.split('=')
                    export_el = ET.SubElement(sce_check, 'check-export')
                    export_el.set('value-id', value_id)
                    export_el.set('export-name', export_name)
                    export_el.text = None

            sce_content_ref = ET.SubElement(sce_check, "check-content-ref")
            sce_href = self.current_product + "/checks/sce/" + metadata['filename']
            sce_content_ref.set("href", sce_href)

        if self.ocil or self.ocil_clause:
            # The ocil element is emitted even when only the clause is set;
            # its content is then empty.
            ocil_el = add_sub_element(rule_el, 'ocil', self.ocil or "")
            if self.ocil_clause:
                ocil_el.set("clause", self.ocil_clause)

        add_warning_elements(rule_el, self.warnings)
        add_nondata_subelements(rule_el, "requires", "id", self.requires)
        add_nondata_subelements(rule_el, "conflicts", "id", self.conflicts)

        for cpe_name in self.cpe_names:
            ET.SubElement(rule_el, "platform").set("idref", cpe_name)

        return rule_el
1468
1469 2
    def to_file(self, file_name):
        """
        Serialize this rule to XML and write it to file_name.
        """
        ET.ElementTree(self.to_xml_element()).write(file_name)
1473
1474
1475 2
class DirectoryLoader(object):
    """
    Base class that walks a guide directory tree and loads the benchmark,
    groups, values and rules found in it.

    Subclasses have to implement _get_new_loader(), _process_values() and
    _process_rules().
    """

    def __init__(self, profiles_dir, bash_remediation_fns, env_yaml):
        """
        profiles_dir -- directory handed to the benchmark's
            add_profiles_from_dir(); skipped when falsy
        bash_remediation_fns -- handed to the benchmark's
            add_bash_remediation_fns_from_file()
        env_yaml -- mapping that must contain the "product" key
        """
        self.profiles_dir = profiles_dir
        self.bash_remediation_fns = bash_remediation_fns
        self.env_yaml = env_yaml
        self.product = env_yaml["product"]

        # What was found directly in the directory being processed.
        self.benchmark_file = None
        self.group_file = None
        self.rule_files = []
        self.value_files = []
        self.subdirectories = []

        # Group (or benchmark) loaded from this directory, and the group
        # of the directory above it.
        self.loaded_group = None
        self.parent_group = None

        # Accumulated over this directory and everything below it.
        self.all_values = set()
        self.all_rules = set()
        self.all_groups = set()

    def _collect_items_to_load(self, guide_directory):
        """
        Sort the entries of guide_directory into value files, rule files,
        the benchmark file, the group file and subdirectories.

        The "tests" entry is ignored; other files that match nothing are
        reported on stderr and skipped.

        Raises ValueError if a benchmark or group file is already recorded
        when another one is encountered.
        """
        for entry in sorted(os.listdir(guide_directory)):
            entry_path = os.path.join(guide_directory, entry)
            suffix = os.path.splitext(entry)[1]

            if suffix == '.var':
                self.value_files.append(entry_path)
                continue

            if entry == "benchmark.yml":
                if self.benchmark_file:
                    raise ValueError("Multiple benchmarks in one directory")
                self.benchmark_file = entry_path
                continue

            if entry == "group.yml":
                if self.group_file:
                    raise ValueError("Multiple groups in one directory")
                self.group_file = entry_path
                continue

            if suffix == '.rule':
                self.rule_files.append(entry_path)
                continue

            if is_rule_dir(entry_path):
                self.rule_files.append(get_rule_dir_yaml(entry_path))
                continue

            if entry == "tests":
                continue

            if os.path.isdir(entry_path):
                self.subdirectories.append(entry_path)
                continue

            sys.stderr.write(
                "Encountered file '%s' while recursing, extension '%s' "
                "is unknown. Skipping..\n"
                % (entry, suffix)
            )

    def load_benchmark_or_group(self, guide_directory):
        """
        Loads a given benchmark or group from the specified benchmark_file or
        group_file, in the context of guide_directory, profiles_dir,
        env_yaml, and bash_remediation_fns.

        Returns the loaded group or benchmark, or None if the directory
        has neither. Raises ValueError if it has both.
        """
        if self.group_file and self.benchmark_file:
            raise ValueError("A .benchmark file and a .group file were found in "
                             "the same directory '%s'" % guide_directory)

        # we treat benchmark as a special form of group in the following code
        if self.benchmark_file:
            benchmark = Benchmark.from_yaml(
                self.benchmark_file, 'product-name', self.env_yaml
            )
            if self.profiles_dir:
                benchmark.add_profiles_from_dir(self.profiles_dir, self.env_yaml)
            benchmark.add_bash_remediation_fns_from_file(self.bash_remediation_fns)
            return benchmark

        if self.group_file:
            group = Group.from_yaml(self.group_file, self.env_yaml)
            self.all_groups.add(group.id_)
            return group

        return None

    def _load_group_process_and_recurse(self, guide_directory):
        """
        Load the group of guide_directory, attach it to the parent group
        and process its values, subdirectories and rules, in that order.

        Nothing is processed when the directory yields no group.
        """
        self.loaded_group = self.load_benchmark_or_group(guide_directory)
        if not self.loaded_group:
            return

        if self.parent_group:
            self.parent_group.add_group(self.loaded_group, env_yaml=self.env_yaml)

        self._process_values()
        self._recurse_into_subdirs()
        self._process_rules()

    def process_directory_tree(self, start_dir, extra_group_dirs=None):
        """
        Load start_dir and everything below it.

        extra_group_dirs, if given, are processed as additional
        subdirectories of start_dir.
        """
        self._collect_items_to_load(start_dir)
        if extra_group_dirs is not None:
            self.subdirectories.extend(extra_group_dirs)
        self._load_group_process_and_recurse(start_dir)

    def _recurse_into_subdirs(self):
        """
        Process every collected subdirectory with a fresh loader and merge
        the values, rules and groups it found into this loader's sets.
        """
        for child_dir in self.subdirectories:
            child_loader = self._get_new_loader()
            child_loader.parent_group = self.loaded_group
            child_loader.process_directory_tree(child_dir)

            self.all_values.update(child_loader.all_values)
            self.all_rules.update(child_loader.all_rules)
            self.all_groups.update(child_loader.all_groups)

    def _get_new_loader(self):
        """Return a new loader for a subdirectory; subclasses implement this."""
        raise NotImplementedError()

    def _process_values(self):
        """Handle the collected value files; subclasses implement this."""
        raise NotImplementedError()

    def _process_rules(self):
        """Handle the collected rule files; subclasses implement this."""
        raise NotImplementedError()
1586
1587
1588 2
class BuildLoader(DirectoryLoader):
    """
    DirectoryLoader that builds Value and Rule objects from the collected
    YAML files, adds them to the loaded group and optionally dumps every
    processed rule into a directory of resolved rules.
    """

    def __init__(self, profiles_dir, bash_remediation_fns, env_yaml,
                 resolved_rules_dir=None, sce_metadata_path=None):
        """
        profiles_dir, bash_remediation_fns, env_yaml -- passed on to
            DirectoryLoader
        resolved_rules_dir -- if set, directory into which each processed
            rule is dumped as '<rule id>.yml'; created when missing
        sce_metadata_path -- if set and the file is not empty, path of a
            JSON file whose parsed contents are passed to Rule.from_yaml()
        """
        super(BuildLoader, self).__init__(profiles_dir, bash_remediation_fns, env_yaml)

        self.resolved_rules_dir = resolved_rules_dir
        if resolved_rules_dir and not os.path.isdir(resolved_rules_dir):
            os.mkdir(resolved_rules_dir)

        self.sce_metadata = None
        if sce_metadata_path and os.path.getsize(sce_metadata_path):
            # Use a context manager so the file handle is closed right after
            # parsing instead of being left to the garbage collector.
            with open(sce_metadata_path, 'r') as sce_metadata_file:
                self.sce_metadata = json.load(sce_metadata_file)

    def _process_values(self):
        """
        Load every collected value file and add the resulting Value to the
        loaded group and to all_values.
        """
        for value_yaml in self.value_files:
            value = Value.from_yaml(value_yaml, self.env_yaml)
            self.all_values.add(value)
            self.loaded_group.add_value(value)

    def _process_rules(self):
        """
        Load every collected rule file, skip rules that do not apply to the
        current product, add the remaining ones to the loaded group and to
        all_rules, and dump them to resolved_rules_dir if that is set.
        """
        for rule_yaml in self.rule_files:
            try:
                rule = Rule.from_yaml(rule_yaml, self.env_yaml, self.sce_metadata)
            except DocumentationNotComplete:
                # Happens on non-debug build when a rule is "documentation-incomplete"
                continue
            prodtypes = parse_prodtype(rule.prodtype)
            if "all" not in prodtypes and self.product not in prodtypes:
                continue
            self.all_rules.add(rule)
            self.loaded_group.add_rule(rule, env_yaml=self.env_yaml)

            # The rule inherits the platforms of the group it belongs to.
            if self.loaded_group.platforms:
                rule.inherited_platforms += self.loaded_group.platforms

            if self.resolved_rules_dir:
                output_for_rule = os.path.join(
                    self.resolved_rules_dir, "{id_}.yml".format(id_=rule.id_))
                mkdir_p(self.resolved_rules_dir)
                with open(output_for_rule, "w") as f:
                    rule.normalize(self.env_yaml["product"])
                    yaml.dump(rule.to_contents_dict(), f)

    def _get_new_loader(self):
        """
        Return a BuildLoader for a subdirectory that shares this loader's
        configuration and its already parsed SCE metadata.
        """
        loader = BuildLoader(
            self.profiles_dir, self.bash_remediation_fns, self.env_yaml, self.resolved_rules_dir)
        # Do it this way so we only have to parse the SCE metadata once.
        loader.sce_metadata = self.sce_metadata
        return loader

    def export_group_to_file(self, filename):
        """
        Write the loaded group to filename and return the result of its
        to_file() call.
        """
        return self.loaded_group.to_file(filename)
1640