Test Failed
Push — master ( 7d44fc...438145 )
by Jan
01:19 queued 16s
created

ssg.build_yaml.Benchmark.to_file()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.4218

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 2
dl 0
loc 4
ccs 1
cts 4
cp 0.25
crap 1.4218
rs 10
c 0
b 0
f 0
1 2
from __future__ import absolute_import
2 2
from __future__ import print_function
3
4 2
from collections import defaultdict
5 2
from copy import deepcopy
6 2
import datetime
7 2
import json
8 2
import os
9 2
import os.path
10 2
import re
11 2
import sys
12 2
from xml.sax.saxutils import escape
13
14 2
import yaml
15
16 2
from .build_cpe import CPEDoesNotExist
17 2
from .constants import XCCDF_REFINABLE_PROPERTIES, SCE_SYSTEM
18 2
from .rules import get_rule_dir_id, get_rule_dir_yaml, is_rule_dir
19 2
from .rule_yaml import parse_prodtype
20
21 2
from .cce import is_cce_format_valid, is_cce_value_valid
22 2
from .yaml import DocumentationNotComplete, open_and_expand, open_and_macro_expand
23 2
from .utils import required_key, mkdir_p
24
25 2
from .xml import ElementTree as ET
26 2
from .shims import unicode_func
27
28
29 2
def add_sub_element(parent, tag, data):
    """
    Append a new ``tag`` element, holding ``data``, as a child of ``parent``.

    ``data`` is a string that is parsed as XML markup, so it may itself
    contain child elements. Callers that want ``data`` taken literally
    should escape it first or use ElementTree.SubElement() instead.

    Returns the newly created child element.
    Raises RuntimeError when the resulting markup cannot be parsed.
    """
    # Our YAML data contain XML and XHTML elements. ET.SubElement() would
    # escape the < > characters as &lt; and &gt; and so would not create
    # child elements, hence this workaround of parsing a wrapped string.
    # TODO: Remove this function after we move to Markdown everywhere in SSG
    markup = unicode_func("<{0}>{1}</{0}>").format(tag, data)

    try:
        child = ET.fromstring(markup.encode("utf-8"))
    except Exception:
        raise RuntimeError(
            "Error adding subelement to an element '{0}' from string: '{1}'"
            .format(parent.tag, markup))

    parent.append(child)
    return child
57
58
59 2
def reorder_according_to_ordering(unordered, ordering, regex=None):
    """
    Return the items of ``unordered`` as a list arranged by ``ordering``.

    Items that match ``regex`` at their start and contain an entry of
    ``ordering`` as a substring come first, grouped by the position of that
    entry in ``ordering``. All remaining items follow in sorted order.

    When ``regex`` is not given, it is an alternation of the ``ordering``
    entries, each wrapped in a group.
    """
    if regex is None:
        regex = "|".join(["({0})".format(entry) for entry in ordering])
    pattern = re.compile(regex)

    # Only items accepted by the pattern are candidates for prioritisation.
    candidates = [entry for entry in unordered if pattern.match(entry)]
    remaining = set(unordered)

    result = []
    for priority_type in ordering:
        for entry in candidates:
            if entry in remaining and priority_type in entry:
                result.append(entry)
                remaining.remove(entry)

    result.extend(sorted(remaining))
    return result
75
76
77 2
def add_warning_elements(element, warnings):
    """
    Add one ``warning`` child to ``element`` for each entry of ``warnings``.

    ``warnings`` is a list of single-item dicts mapping a warning category
    to its text. A list of dicts (rather than one dict) is used so that
    several warnings may share a category, which is valid in SCAP and in
    our content:

        warnings:
            - general: Some general warning
            - general: Some other general warning
            - general: |-
                Some really long multiline general warning

    Only the first key/value pair of each dict is used.
    """
    for warning_dict in warnings:
        text = list(warning_dict.values())[0]
        category = list(warning_dict.keys())[0]
        warning = add_sub_element(element, "warning", text)
        warning.set("category", category)
92
93
94 2
def add_nondata_subelements(element, subelement, attribute, attr_data):
    """Add one empty ``subelement`` child to ``element`` per item of ``attr_data``.

    Each child carries a single attribute named ``attribute`` whose value is
    the item, and no content. For example, <requires id="my_required_id"/>
    """
    for value in attr_data:
        ET.SubElement(element, subelement).set(attribute, value)
100
101
102 2
class Profile(object):
103
    """Represents XCCDF profile
104
    """
105
106 2
    def __init__(self, id_):
107 2
        self.id_ = id_
108 2
        self.title = ""
109 2
        self.description = ""
110 2
        self.extends = None
111 2
        self.selected = []
112 2
        self.unselected = []
113 2
        self.variables = dict()
114 2
        self.refine_rules = defaultdict(list)
115 2
        self.metadata = None
116 2
        self.reference = None
117
        # self.platforms is used further in the build system
118
        # self.platform is merged into self.platforms
119
        # it is here for backward compatibility
120 2
        self.platforms = set()
121 2
        self.cpe_names = set()
122 2
        self.platform = None
123
124
125 2
    def read_yaml_contents(self, yaml_contents):
        """
        Populate this profile from the parsed YAML mapping ``yaml_contents``.

        The keys "title", "description" and "selections" are fetched with
        required_key(); "extends", "platforms" and "platform" are optional.
        Every key that is consumed is removed from ``yaml_contents``, so
        the caller can detect leftover, unrecognised keys.

        ``self.platforms`` always ends up as a set, whatever container the
        YAML document used, because callers add() further entries to it.
        """
        self.title = required_key(yaml_contents, "title")
        del yaml_contents["title"]
        self.description = required_key(yaml_contents, "description")
        del yaml_contents["description"]
        self.extends = yaml_contents.pop("extends", None)
        selection_entries = required_key(yaml_contents, "selections")
        if selection_entries:
            self._parse_selections(selection_entries)
        del yaml_contents["selections"]

        platforms = yaml_contents.pop("platforms", None)
        if platforms is None:
            platforms = set()
        elif isinstance(platforms, (type(""), type(u""))):
            # A lone scalar names exactly one platform; do not split it
            # into individual characters.
            platforms = set([platforms])
        elif not isinstance(platforms, set):
            # A YAML sequence is loaded as a list, which has no add().
            platforms = set(platforms)
        self.platforms = platforms
        self.platform = yaml_contents.pop("platform", None)
137
138 2
    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None):
        """
        Build a profile from the file ``yaml_file``.

        The profile ID is the file's base name without its extension.
        Returns None when open_and_expand() yields None for the file.

        When ``env_yaml`` is given, each platform of the profile is
        translated to a CPE name through ``env_yaml["product_cpes"]``;
        CPEDoesNotExist is re-raised after a diagnostic is printed.

        Raises RuntimeError if the file contains keys that were not consumed.
        """
        yaml_contents = open_and_expand(yaml_file, env_yaml)
        if yaml_contents is None:
            return None

        profile_id = os.path.splitext(os.path.basename(yaml_file))[0]
        profile = cls(profile_id)
        profile.read_yaml_contents(yaml_contents)
        profile.reference = yaml_contents.pop("reference", None)

        # ensure that content of profile.platform is in profile.platforms
        # as well
        if profile.platform is not None:
            profile.platforms.add(profile.platform)

        if env_yaml:
            for platform in profile.platforms:
                try:
                    cpe_name = env_yaml["product_cpes"].get_cpe_name(platform)
                    profile.cpe_names.add(cpe_name)
                except CPEDoesNotExist:
                    print("Unsupported platform '%s' in profile '%s'." % (platform, profile.id_))
                    raise

        # At the moment, metadata is not used to build content
        yaml_contents.pop("metadata", None)

        if yaml_contents:
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                               % (yaml_file, yaml_contents))

        return profile
172
173 2
    def dump_yaml(self, file_name, documentation_complete=True):
        """
        Write this profile to ``file_name`` as YAML.

        Selected rules, unselected rules (prefixed with "!"), variable
        selectors ("name=selector") and rule refinements
        ("rule.property=value") are all emitted under the "selections" key,
        in that order. "metadata", "extends" and "platforms" are written
        only when they are set.
        """
        to_dump = {
            "documentation_complete": documentation_complete,
            "title": self.title,
            "description": self.description,
            "reference": self.reference,
        }
        if self.metadata is not None:
            to_dump["metadata"] = self.metadata
        if self.extends is not None:
            to_dump["extends"] = self.extends
        if self.platforms:
            to_dump["platforms"] = self.platforms

        selections = list(self.selected)
        selections.extend("!"+rule_id for rule_id in self.unselected)
        selections.extend(
            name+"="+selector for name, selector in self.variables.items())
        for rule, refinements in self.refine_rules.items():
            selections.extend(
                "{rule}.{property}={value}"
                .format(rule=rule, property=prop, value=val)
                for prop, val in refinements)
        to_dump["selections"] = selections

        with open(file_name, "w+") as f:
            yaml.dump(to_dump, f, indent=4)
202
203 2
    def _parse_selections(self, entries):
204 2
        for item in entries:
205 2
            self.apply_selection(item)
206
207 2
    def apply_selection(self, item):
208 2
        if "." in item:
209
            rule, refinement = item.split(".", 1)
210
            property_, value = refinement.split("=", 1)
211
            if property_ not in XCCDF_REFINABLE_PROPERTIES:
212
                msg = ("Property '{property_}' cannot be refined. "
213
                       "Rule properties that can be refined are {refinables}. "
214
                       "Fix refinement '{rule_id}.{property_}={value}' in profile '{profile}'."
215
                       .format(property_=property_, refinables=XCCDF_REFINABLE_PROPERTIES,
216
                               rule_id=rule, value=value, profile=self.id_)
217
                       )
218
                raise ValueError(msg)
219
            self.refine_rules[rule].append((property_, value))
220 2
        elif "=" in item:
221 2
            varname, value = item.split("=", 1)
222 2
            self.variables[varname] = value
223 2
        elif item.startswith("!"):
224
            self.unselected.append(item[1:])
225
        else:
226 2
            self.selected.append(item)
227
228 2
    def to_xml_element(self):
        """
        Build and return the ``Profile`` XML element for this profile.

        Children are emitted in this order: title, description, optional
        reference, one ``platform`` per CPE name, ``select`` elements for
        selected and then unselected rules, ``refine-value`` elements for
        variables and ``refine-rule`` elements for rule refinements.
        """
        profile_el = ET.Element('Profile')
        profile_el.set("id", self.id_)
        if self.extends:
            profile_el.set("extends", self.extends)

        for tag, content in (("title", self.title),
                             ("description", self.description)):
            add_sub_element(profile_el, tag, content).set("override", "true")

        if self.reference:
            add_sub_element(profile_el, "reference", escape(self.reference))

        for cpe_name in self.cpe_names:
            ET.SubElement(profile_el, "platform").set("idref", cpe_name)

        for flag, rule_ids in (("true", self.selected),
                               ("false", self.unselected)):
            for rule_id in rule_ids:
                select = ET.SubElement(profile_el, "select")
                select.set("idref", rule_id)
                select.set("selected", flag)

        for value_id, selector in self.variables.items():
            refine_value = ET.SubElement(profile_el, "refine-value")
            refine_value.set("idref", value_id)
            refine_value.set("selector", selector)

        for refined_rule, refinement_list in self.refine_rules.items():
            refine_rule = ET.SubElement(profile_el, "refine-rule")
            refine_rule.set("idref", refined_rule)
            for property_, value in refinement_list:
                refine_rule.set(property_, value)

        return profile_el
271
272 2
    def get_rule_selectors(self):
273 2
        return list(self.selected + self.unselected)
274
275 2
    def get_variable_selectors(self):
276 2
        return self.variables
277
278 2
    def validate_refine_rules(self, rules):
279
        existing_rule_ids = [r.id_ for r in rules]
280
        for refine_rule, refinement_list in self.refine_rules.items():
281
            # Take first refinement to ilustrate where the error is
282
            # all refinements in list are invalid, so it doesn't really matter
283
            a_refinement = refinement_list[0]
284
285
            if refine_rule not in existing_rule_ids:
286
                msg = (
287
                    "You are trying to refine a rule that doesn't exist. "
288
                    "Rule '{rule_id}' was not found in the benchmark. "
289
                    "Please check all rule refinements for rule: '{rule_id}', for example: "
290
                    "- {rule_id}.{property_}={value}' in profile {profile_id}."
291
                    .format(rule_id=refine_rule, profile_id=self.id_,
292
                            property_=a_refinement[0], value=a_refinement[1])
293
                    )
294
                raise ValueError(msg)
295
296
            if refine_rule not in self.get_rule_selectors():
297
                msg = ("- {rule_id}.{property_}={value}' in profile '{profile_id}' is refining "
298
                       "a rule that is not selected by it. The refinement will not have any "
299
                       "noticeable effect. Either select the rule or remove the rule refinement."
300
                       .format(rule_id=refine_rule, property_=a_refinement[0],
301
                               value=a_refinement[1], profile_id=self.id_)
302
                       )
303
                raise ValueError(msg)
304
305 2
    def validate_variables(self, variables):
306
        variables_by_id = dict()
307
        for var in variables:
308
            variables_by_id[var.id_] = var
309
310
        for var_id, our_val in self.variables.items():
311
            if var_id not in variables_by_id:
312
                all_vars_list = [" - %s" % v for v in variables_by_id.keys()]
313
                msg = (
314
                    "Value '{var_id}' in profile '{profile_name}' is not known. "
315
                    "We know only variables:\n{var_names}"
316
                    .format(
317
                        var_id=var_id, profile_name=self.id_,
318
                        var_names="\n".join(sorted(all_vars_list)))
319
                )
320
                raise ValueError(msg)
321
322
            allowed_selectors = [str(s) for s in variables_by_id[var_id].options.keys()]
323
            if our_val not in allowed_selectors:
324
                msg = (
325
                    "Value '{var_id}' in profile '{profile_name}' "
326
                    "uses the selector '{our_val}'. "
327
                    "This is not possible, as only selectors {all_selectors} are available. "
328
                    "Either change the selector used in the profile, or "
329
                    "add the selector-value pair to the variable definition."
330
                    .format(
331
                        var_id=var_id, profile_name=self.id_, our_val=our_val,
332
                        all_selectors=allowed_selectors,
333
                    )
334
                )
335
                raise ValueError(msg)
336
337 2
    def validate_rules(self, rules, groups):
338
        existing_rule_ids = [r.id_ for r in rules]
339
        rule_selectors = self.get_rule_selectors()
340
        for id_ in rule_selectors:
341
            if id_ in groups:
342
                msg = (
343
                    "You have selected a group '{group_id}' instead of a "
344
                    "rule. Groups have no effect in the profile and are not "
345
                    "allowed to be selected. Please remove '{group_id}' "
346
                    "from profile '{profile_id}' before proceeding."
347
                    .format(group_id=id_, profile_id=self.id_)
348
                )
349
                raise ValueError(msg)
350
            if id_ not in existing_rule_ids:
351
                msg = (
352
                    "Rule '{rule_id}' was not found in the benchmark. Please "
353
                    "remove rule '{rule_id}' from profile '{profile_id}' "
354
                    "before proceeding."
355
                    .format(rule_id=id_, profile_id=self.id_)
356
                )
357
                raise ValueError(msg)
358
359 2
    def __sub__(self, other):
360
        profile = Profile(self.id_)
361
        profile.title = self.title
362
        profile.description = self.description
363
        profile.extends = self.extends
364
        profile.platforms = self.platforms
365
        profile.platform = self.platform
366
        profile.selected = list(set(self.selected) - set(other.selected))
367
        profile.selected.sort()
368
        profile.unselected = list(set(self.unselected) - set(other.unselected))
369
        profile.variables = dict ((k, v) for (k, v) in self.variables.items()
370
                             if k not in other.variables or v != other.variables[k])
371
        return profile
372
373
374 2
class ResolvableProfile(Profile):
    """
    Profile whose final rule selection can be resolved from its own
    selections, the controls it refers to and the profile it extends.
    """

    def __init__(self, * args, ** kwargs):
        super(ResolvableProfile, self).__init__(* args, ** kwargs)
        self.resolved = False
        self.resolved_selections = set()

    def _controls_ids_to_controls(self, controls_manager, policy_id, control_id_list):
        """Fetch the control of ``policy_id`` for each ID in ``control_id_list``."""
        items = [controls_manager.get_control(policy_id, cid) for cid in control_id_list]
        return items

    def _process_controls_ids_into_controls(self, controls_manager, policy_id, controls_ids):
        """
        Expand ``controls_ids`` of ``policy_id`` into a list of controls.

        - An ID that does not start with "all" names a single control.
        - "all:<level>" stands for all controls of the given level.
        - Any other ID starting with "all" stands for all controls of the
          policy.

        Shared by the subclasses, which only differ in where the control
        IDs come from.
        """
        # NOTE(review): the prefix test also treats a control ID such as
        # "allow_x" as "all controls"; confirm whether IDs starting with
        # "all" can occur before tightening it.
        controls = []
        for cid in controls_ids:
            if not cid.startswith("all"):
                controls.extend(
                    self._controls_ids_to_controls(controls_manager, policy_id, [cid]))
            elif ":" in cid:
                _, level_id = cid.split(":", 1)
                controls.extend(
                    controls_manager.get_all_controls_of_level(policy_id, level_id))
            else:
                controls.extend(controls_manager.get_all_controls(policy_id))
        return controls

    def _merge_control(self, control):
        """
        Select the rules of ``control`` and adopt its variables.

        A variable of the control is taken over only if this profile does
        not set that variable itself.
        """
        self.selected.extend(control.rules)
        for varname, value in control.variables.items():
            if varname not in self.variables:
                self.variables[varname] = value

    def resolve_controls(self, controls_manager):
        """Hook for subclasses that select rules through controls; no-op here."""
        pass

    def extend_by(self, extended_profile):
        """
        Merge selections, variables and refinements of ``extended_profile``.

        Variables and refinements set by this profile take precedence over
        those of the extended profile.
        """
        extended_selects = set(extended_profile.selected)
        self.resolved_selections.update(extended_selects)

        updated_variables = dict(extended_profile.variables)
        updated_variables.update(self.variables)
        self.variables = updated_variables

        extended_refinements = deepcopy(extended_profile.refine_rules)
        updated_refinements = self._subtract_refinements(extended_refinements)
        updated_refinements.update(self.refine_rules)
        self.refine_rules = updated_refinements

    def resolve(self, all_profiles, controls_manager=None):
        """
        Compute the final selection of this profile in place.

        Controls are resolved first, then the extended profile (looked up
        by ID in ``all_profiles`` and resolved recursively) is merged in,
        and finally the unselected rules are dropped. Afterwards
        ``selected`` is sorted, ``unselected`` is empty and ``extends`` is
        None. Calling resolve() again is a no-op.

        Raises RuntimeError if the extended profile is not in ``all_profiles``.
        """
        if self.resolved:
            return

        self.resolve_controls(controls_manager)

        self.resolved_selections = set(self.selected)

        if self.extends:
            if self.extends not in all_profiles:
                msg = (
                    "Profile {name} extends profile {extended}, but "
                    "only profiles {known_profiles} are available for resolution."
                    .format(name=self.id_, extended=self.extends,
                            known_profiles=list(all_profiles.keys())))
                raise RuntimeError(msg)
            extended_profile = all_profiles[self.extends]
            extended_profile.resolve(all_profiles, controls_manager)

            self.extend_by(extended_profile)

        for uns in self.unselected:
            self.resolved_selections.discard(uns)

        self.unselected = []
        self.extends = None

        self.selected = sorted(self.resolved_selections)

        self.resolved = True

    def _subtract_refinements(self, extended_refinements):
        """
        Given a dict of rule refinements from the extended profile,
        "undo" every refinement prefixed with '!' in this profile.

        The '!'-prefixed entries are removed from this profile, and a rule
        whose refinements have all been undone is removed from
        ``extended_refinements`` so that no empty refinement is left behind.
        Returns ``extended_refinements``.
        """
        for rule, refinements in list(self.refine_rules.items()):
            if not rule.startswith("!"):
                continue
            target = rule[1:]
            for prop, val in refinements:
                extended_refinements[target].remove((prop, val))
            if target in extended_refinements and not extended_refinements[target]:
                del extended_refinements[target]
            del self.refine_rules[rule]
        return extended_refinements


class ProfileWithSeparatePolicies(ResolvableProfile):
    """Resolvable profile that lists its controls under a "policies" key."""

    def __init__(self, * args, ** kwargs):
        super(ProfileWithSeparatePolicies, self).__init__(* args, ** kwargs)
        self.policies = {}

    def read_yaml_contents(self, yaml_contents):
        """Consume the optional "policies" key, then the regular profile keys."""
        policies = yaml_contents.pop("policies", None)
        if policies:
            self._parse_policies(policies)
        super(ProfileWithSeparatePolicies, self).read_yaml_contents(yaml_contents)

    def _parse_policies(self, policies_yaml):
        """
        Record the controls requested for each policy.

        Each item needs the keys "id" and "controls"; "controls" is either
        a list of control IDs or the string "all".

        Raises ValueError for any other value of "controls".
        """
        for item in policies_yaml:
            id_ = required_key(item, "id")
            controls_ids = required_key(item, "controls")
            if not isinstance(controls_ids, list) and controls_ids != "all":
                msg = (
                    "Policy {id_} contains invalid controls list {controls}."
                    .format(id_=id_, controls=str(controls_ids)))
                raise ValueError(msg)
            self.policies[id_] = controls_ids

    def resolve_controls(self, controls_manager):
        """
        Merge the controls of every recorded policy into this profile.

        Raises ValueError if a policy holds neither a list of control IDs
        nor a string starting with "all".
        """
        for policy_id, controls_ids in self.policies.items():
            controls = []

            if isinstance(controls_ids, list):
                controls = self._process_controls_ids_into_controls(
                    controls_manager, policy_id, controls_ids)
            elif controls_ids.startswith("all"):
                controls = self._process_controls_ids_into_controls(
                    controls_manager, policy_id, [controls_ids])
            else:
                msg = (
                    "Unknown policy content {content} in profile {profile_id}"
                    .format(content=controls_ids, profile_id=self.id_))
                raise ValueError(msg)

            for c in controls:
                self._merge_control(c)

    def extend_by(self, extended_profile):
        """Take over the policies of ``extended_profile``, then merge the rest."""
        self.policies.update(extended_profile.policies)
        super(ProfileWithSeparatePolicies, self).extend_by(extended_profile)


class ProfileWithInlinePolicies(ResolvableProfile):
    """Resolvable profile that names controls directly among its selections."""

    def __init__(self, * args, ** kwargs):
        super(ProfileWithInlinePolicies, self).__init__(* args, ** kwargs)
        self.controls_by_policy = defaultdict(list)

    def apply_selection(self, item):
        """
        Record "policy:control" entries; defer anything else to the parent.
        """
        # ":" is the delimiter for controls but not when the item is a variable
        if ":" in item and "=" not in item:
            policy_id, control_id = item.split(":", 1)
            self.controls_by_policy[policy_id].append(control_id)
        else:
            super(ProfileWithInlinePolicies, self).apply_selection(item)

    def resolve_controls(self, controls_manager):
        """Merge the controls referenced inline, policy by policy."""
        for policy_id, controls_ids in self.controls_by_policy.items():
            controls = self._process_controls_ids_into_controls(
                controls_manager, policy_id, controls_ids)

            for c in controls:
                self._merge_control(c)
546
547
548 2
class Value(object):
    """Represents XCCDF Value
    """

    def __init__(self, id_):
        """Create a string-typed Value ``id_`` with no options or warnings."""
        self.id_ = id_
        self.title = ""
        self.description = ""
        self.type_ = "string"
        self.operator = "equals"
        self.interactive = False
        self.options = {}
        self.warnings = []

    @staticmethod
    def from_yaml(yaml_file, env_yaml=None):
        """
        Build a Value from the file ``yaml_file``.

        The Value ID is the file's base name without its extension.
        Returns None when open_and_macro_expand() yields None for the file.

        "interactive" may be given either as a YAML boolean or as the
        string "true"/"false" (in any letter case).

        Raises ValueError for an unknown operator or a warning entry that
        does not hold exactly one key/value pair, and RuntimeError if the
        file contains keys that were not consumed.
        """
        yaml_contents = open_and_macro_expand(yaml_file, env_yaml)
        if yaml_contents is None:
            return None

        value_id, _ = os.path.splitext(os.path.basename(yaml_file))
        value = Value(value_id)
        value.title = required_key(yaml_contents, "title")
        del yaml_contents["title"]
        value.description = required_key(yaml_contents, "description")
        del yaml_contents["description"]
        value.type_ = required_key(yaml_contents, "type")
        del yaml_contents["type"]
        value.operator = yaml_contents.pop("operator", "equals")
        possible_operators = ["equals", "not equal", "greater than",
                              "less than", "greater than or equal",
                              "less than or equal", "pattern match"]

        if value.operator not in possible_operators:
            raise ValueError(
                "Found an invalid operator value '%s' in '%s'. "
                "Expected one of: %s"
                % (value.operator, yaml_file, ", ".join(possible_operators))
            )

        # An unquoted true/false in YAML is loaded as a bool, which has no
        # lower(); stringify first so both spellings are accepted.
        interactive = yaml_contents.pop("interactive", "false")
        value.interactive = str(interactive).lower() == "true"

        value.options = required_key(yaml_contents, "options")
        del yaml_contents["options"]
        value.warnings = yaml_contents.pop("warnings", [])

        for warning_dict in value.warnings:
            if len(warning_dict) != 1:
                raise ValueError("Only one key/value pair should exist for each dictionary")

        if yaml_contents:
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                               % (yaml_file, yaml_contents))

        return value

    def to_xml_element(self):
        """
        Build and return the ``Value`` XML element.

        The ``operator`` and ``interactive`` attributes are written only
        when they differ from their defaults ("equals" and False).
        """
        value = ET.Element('Value')
        value.set('id', self.id_)
        value.set('type', self.type_)
        if self.operator != "equals":  # equals is the default
            value.set('operator', self.operator)
        if self.interactive:  # False is the default
            value.set('interactive', 'true')
        title = ET.SubElement(value, 'title')
        title.text = self.title
        add_sub_element(value, 'description', self.description)
        add_warning_elements(value, self.warnings)

        for selector, option in self.options.items():
            # do not confuse Value with big V with value with small v
            # value is child element of Value
            value_small = ET.SubElement(value, 'value')
            # by XCCDF spec, default value is value without selector
            if selector != "default":
                value_small.set('selector', str(selector))
            value_small.text = str(option)

        return value

    def to_file(self, file_name):
        """Serialise the ``Value`` element to the file ``file_name``."""
        root = self.to_xml_element()
        tree = ET.ElementTree(root)
        tree.write(file_name)
633
634
635 2
class Benchmark(object):
    """Represents XCCDF Benchmark

    The benchmark carries descriptive metadata (title, status, notice,
    front/rear matter, version) together with the profiles, values, groups
    and rules that are serialized beneath it by ``to_xml_element()``.
    """
    def __init__(self, id_):
        """Create an empty benchmark identified by ``id_``.

        A placeholder ``conditional_clause`` Value is registered immediately.
        """
        self.id_ = id_
        self.title = ""
        self.status = ""
        self.description = ""
        self.notice_id = ""
        self.notice_description = ""
        self.front_matter = ""
        self.rear_matter = ""
        self.cpes = []
        self.version = "0.1"
        self.profiles = []
        self.values = {}
        self.groups = {}
        self.rules = {}
        self.product_cpe_names = []

        # This is required for OCIL clauses
        conditional_clause = Value("conditional_clause")
        conditional_clause.title = "A conditional clause for check statements."
        conditional_clause.description = conditional_clause.title
        conditional_clause.type_ = "string"
        conditional_clause.options = {"": "This is a placeholder"}

        self.add_value(conditional_clause)

    @classmethod
    def from_yaml(cls, yaml_file, id_, env_yaml=None):
        """Load a benchmark definition from ``yaml_file``.

        Returns None when ``open_and_macro_expand`` yields None. Every key is
        removed from the loaded mapping as it is consumed, so anything left
        over at the end is reported as an error.

        Raises:
            RuntimeError: if the YAML contains keys that were not consumed
                (this includes a ``notice`` mapping with keys other than
                ``id`` and ``description``).
        """
        yaml_contents = open_and_macro_expand(yaml_file, env_yaml)
        if yaml_contents is None:
            return None

        benchmark = cls(id_)
        benchmark.title = required_key(yaml_contents, "title")
        del yaml_contents["title"]
        benchmark.status = required_key(yaml_contents, "status")
        del yaml_contents["status"]
        benchmark.description = required_key(yaml_contents, "description")
        del yaml_contents["description"]
        notice_contents = required_key(yaml_contents, "notice")
        benchmark.notice_id = required_key(notice_contents, "id")
        del notice_contents["id"]
        benchmark.notice_description = required_key(notice_contents,
                                                    "description")
        del notice_contents["description"]
        # Only drop "notice" once it is fully consumed; leftovers stay in
        # yaml_contents and trigger the "Unparsed YAML data" error below.
        if not notice_contents:
            del yaml_contents["notice"]

        benchmark.front_matter = required_key(yaml_contents,
                                              "front-matter")
        del yaml_contents["front-matter"]
        benchmark.rear_matter = required_key(yaml_contents,
                                             "rear-matter")
        del yaml_contents["rear-matter"]
        benchmark.version = str(required_key(yaml_contents, "version"))
        del yaml_contents["version"]

        if env_yaml:
            benchmark.product_cpe_names = env_yaml["product_cpes"].get_product_cpe_names()

        if yaml_contents:
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                               % (yaml_file, yaml_contents))

        return benchmark

    def add_profiles_from_dir(self, dir_, env_yaml):
        """Append every ``*.profile`` file found directly in ``dir_``.

        Directory entries are processed in sorted order. Non-files are
        skipped silently; files with another extension are skipped with a
        message on stderr. Profiles raising ``DocumentationNotComplete`` or
        loading as None are skipped.

        Raises:
            RuntimeError: wrapping any other exception raised while a
                profile is being built.
        """
        for dir_item in sorted(os.listdir(dir_)):
            dir_item_path = os.path.join(dir_, dir_item)
            if not os.path.isfile(dir_item_path):
                continue

            _, ext = os.path.splitext(os.path.basename(dir_item_path))
            if ext != '.profile':
                sys.stderr.write(
                    "Encountered file '%s' while looking for profiles, "
                    "extension '%s' is unknown. Skipping..\n"
                    % (dir_item, ext)
                )
                continue

            try:
                new_profile = ProfileWithInlinePolicies.from_yaml(dir_item_path, env_yaml)
            except DocumentationNotComplete:
                continue
            except Exception as exc:
                msg = ("Error building profile from '{fname}': '{error}'"
                       .format(fname=dir_item_path, error=str(exc)))
                raise RuntimeError(msg)
            if new_profile is None:
                continue

            self.profiles.append(new_profile)

    def to_xml_element(self):
        """Build and return the ``<Benchmark>`` root element.

        Children are emitted in this order: status, title, description,
        notice, front-matter, rear-matter, platforms, version, metadata,
        profiles, values, groups, rules.
        """
        root = ET.Element('Benchmark')
        root.set('xmlns:xsi', 'http://www.w3.org/2001/XMLSchema-instance')
        root.set('xmlns:xhtml', 'http://www.w3.org/1999/xhtml')
        root.set('xmlns:dc', 'http://purl.org/dc/elements/1.1/')
        # NOTE: the id attribute is the literal 'product-name', not self.id_.
        root.set('id', 'product-name')
        root.set('xsi:schemaLocation',
                 'http://checklists.nist.gov/xccdf/1.1 xccdf-1.1.4.xsd')
        root.set('style', 'SCAP_1.1')
        root.set('resolved', 'false')
        root.set('xml:lang', 'en-US')
        status = ET.SubElement(root, 'status')
        # The status date is the build date, so output differs from day to day.
        status.set('date', datetime.date.today().strftime("%Y-%m-%d"))
        status.text = self.status
        add_sub_element(root, "title", self.title)
        add_sub_element(root, "description", self.description)
        notice = add_sub_element(root, "notice", self.notice_description)
        notice.set('id', self.notice_id)
        add_sub_element(root, "front-matter", self.front_matter)
        add_sub_element(root, "rear-matter", self.rear_matter)

        # The Benchmark applicability is determined by the CPEs
        # defined in the product.yml
        for cpe_name in self.product_cpe_names:
            plat = ET.SubElement(root, "platform")
            plat.set("idref", cpe_name)

        version = ET.SubElement(root, 'version')
        version.text = self.version
        ET.SubElement(root, "metadata")

        for profile in self.profiles:
            root.append(profile.to_xml_element())

        for value in self.values.values():
            root.append(value.to_xml_element())

        groups_in_bench = list(self.groups.keys())
        priority_order = ["system", "services"]
        groups_in_bench = reorder_according_to_ordering(groups_in_bench, priority_order)

        # Make system group the first, followed by services group
        for group_id in groups_in_bench:
            group = self.groups.get(group_id)
            # Products using application benchmark don't have system or services group
            if group is not None:
                root.append(group.to_xml_element())

        for rule in self.rules.values():
            root.append(rule.to_xml_element())

        return root

    def to_file(self, file_name):
        """Serialize the benchmark as XML and write it to ``file_name``."""
        root = self.to_xml_element()
        tree = ET.ElementTree(root)
        tree.write(file_name)

    def add_value(self, value):
        """Register ``value`` under its ``id_``; None is ignored."""
        if value is None:
            return
        self.values[value.id_] = value

    # The benchmark is also considered a group, so this function signature needs to match
    # Group()'s add_group()
    def add_group(self, group, env_yaml=None):
        """Register ``group`` under its ``id_``; None is ignored.

        ``env_yaml`` is accepted for signature parity with
        ``Group.add_group()`` and is not used here.
        """
        if group is None:
            return
        self.groups[group.id_] = group

    # Same reasoning as add_group(): keep the signature compatible with
    # Group()'s add_rule() so both containers can be called the same way.
    def add_rule(self, rule, env_yaml=None):
        """Register ``rule`` under its ``id_``; None is ignored.

        ``env_yaml`` is accepted for signature parity with
        ``Group.add_rule()`` and is not used here.
        """
        if rule is None:
            return
        self.rules[rule.id_] = rule

    def to_xccdf(self):
        """We can easily extend this script to generate a valid XCCDF instead
        of SSG SHORTHAND.
        """
        raise NotImplementedError

    def __str__(self):
        return self.id_
815
816
817 2
class Group(object):
    """Represents XCCDF Group

    A group holds its own values and rules plus nested sub-groups, and
    hands its platforms down to children that do not declare their own.
    """
    ATTRIBUTES_TO_PASS_ON = (
        "platforms",
    )

    def __init__(self, id_):
        """Create an empty group identified by ``id_``."""
        self.id_ = id_
        self.prodtype = "all"
        self.title = ""
        self.description = ""
        self.warnings = []
        self.requires = []
        self.conflicts = []
        self.values = {}
        self.groups = {}
        self.rules = {}
        # self.platforms is used further in the build system
        # self.platform is merged into self.platforms
        # it is here for backward compatibility
        self.platforms = set()
        self.cpe_names = set()
        self.platform = None

    @staticmethod
    def _add_cpe_names(obj, kind, env_yaml):
        """Resolve every platform of ``obj`` to a CPE name via ``env_yaml``.

        Does nothing when ``env_yaml`` is falsy. ``kind`` ("group" or "rule")
        is only used in the diagnostic printed before ``CPEDoesNotExist`` is
        re-raised.
        """
        if not env_yaml:
            return
        for platform in obj.platforms:
            try:
                obj.cpe_names.add(env_yaml["product_cpes"].get_cpe_name(platform))
            except CPEDoesNotExist:
                print("Unsupported platform '%s' in %s '%s'." % (platform, kind, obj.id_))
                raise

    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None):
        """Load a group definition from ``yaml_file``.

        The group id is the name of the directory containing ``yaml_file``.
        Returns None when ``open_and_macro_expand`` yields None.

        Raises:
            ValueError: if a warning entry does not have exactly one
                key/value pair, or the prodtype contains whitespace.
            RuntimeError: if the YAML contains keys that were not consumed.
            CPEDoesNotExist: if a platform cannot be resolved to a CPE name.
        """
        yaml_contents = open_and_macro_expand(yaml_file, env_yaml)
        if yaml_contents is None:
            return None

        group_id = os.path.basename(os.path.dirname(yaml_file))
        group = cls(group_id)
        group.prodtype = yaml_contents.pop("prodtype", "all")
        group.title = required_key(yaml_contents, "title")
        del yaml_contents["title"]
        group.description = required_key(yaml_contents, "description")
        del yaml_contents["description"]
        group.warnings = yaml_contents.pop("warnings", [])
        group.conflicts = yaml_contents.pop("conflicts", [])
        group.requires = yaml_contents.pop("requires", [])
        group.platform = yaml_contents.pop("platform", None)
        # platforms are read as a list from the yaml file; convert to a set
        # (as Rule.from_yaml does) so the .add() below works
        group.platforms = set(yaml_contents.pop("platforms", None) or ())
        # ensure that content of group.platform is in group.platforms as
        # well
        if group.platform is not None:
            group.platforms.add(group.platform)

        cls._add_cpe_names(group, "group", env_yaml)

        for warning_list in group.warnings:
            if len(warning_list) != 1:
                raise ValueError("Only one key/value pair should exist for each dictionary")

        if yaml_contents:
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                               % (yaml_file, yaml_contents))
        group.validate_prodtype(yaml_file)
        return group

    def validate_prodtype(self, yaml_file):
        """Raise ValueError if any comma-separated prodtype entry has
        leading or trailing whitespace. ``yaml_file`` is only used in the
        error message.
        """
        for ptype in self.prodtype.split(","):
            if ptype.strip() != ptype:
                msg = (
                    "Comma-separated '{prodtype}' prodtype "
                    "in {yaml_file} contains whitespace."
                    .format(prodtype=self.prodtype, yaml_file=yaml_file))
                raise ValueError(msg)

    def to_xml_element(self):
        """Build and return the ``<Group>`` element, including all nested
        values, rules and sub-groups (rules before sub-groups).
        """
        group = ET.Element('Group')
        group.set('id', self.id_)
        if self.prodtype != "all":
            group.set("prodtype", self.prodtype)
        title = ET.SubElement(group, 'title')
        title.text = self.title
        add_sub_element(group, 'description', self.description)
        add_warning_elements(group, self.warnings)
        add_nondata_subelements(group, "requires", "id", self.requires)
        add_nondata_subelements(group, "conflicts", "id", self.conflicts)

        for cpe_name in self.cpe_names:
            platform_el = ET.SubElement(group, "platform")
            platform_el.set("idref", cpe_name)

        for _value in self.values.values():
            group.append(_value.to_xml_element())

        # Rules that install or remove packages affect remediation
        # of other rules.
        # When packages installed/removed rules come first:
        # The Rules are ordered in more logical way, and
        # remediation order is natural, first the package is installed, then configured.
        rules_in_group = list(self.rules.keys())
        regex = (r'(package_.*_(installed|removed))|' +
                 r'(service_.*_(enabled|disabled))|' +
                 r'install_smartcard_packages$')
        priority_order = ["installed", "install_smartcard_packages", "removed",
                          "enabled", "disabled"]
        rules_in_group = reorder_according_to_ordering(rules_in_group, priority_order, regex)

        # Add rules in priority order, first all packages installed, then removed,
        # followed by services enabled, then disabled
        for rule_id in rules_in_group:
            group.append(self.rules.get(rule_id).to_xml_element())

        # Add the sub groups after any current level group rules.
        # As package installed/removed and service enabled/disabled rules are usually in
        # top level group, this ensures groups that further configure a package or service
        # are after rules that install or remove it.
        groups_in_group = list(self.groups.keys())
        priority_order = [
            # Make sure rpm_verify_(hashes|permissions|ownership) are run before any other rule.
            # Due to conflicts between rules rpm_verify_* rules and any rule that configures
            # stricter settings, like file_permissions_grub2_cfg and sudo_dedicated_group,
            # the rules deviating from the system default should be evaluated later.
            # So that in the end the system has contents, permissions and ownership reset, and
            # any deviations or stricter settings are applied by the rules in the profile.
            "software", "integrity", "integrity-software", "rpm_verification",

            # The account group has to precede audit group because
            # the rule package_screen_installed is desired to be executed before the rule
            # audit_rules_privileged_commands, otherwise the rule
            # does not catch newly installed screen binary during remediation
            # and report fail
            "accounts", "auditing",


            # The FIPS group should come before Crypto,
            # if we want to set a different (stricter) Crypto Policy than FIPS.
            "fips", "crypto",

            # The firewalld_activation must come before ruleset_modifications, otherwise
            # remediations for ruleset_modifications won't work
            "firewalld_activation", "ruleset_modifications",

            # Rules from group disabling_ipv6 must precede rules from configuring_ipv6,
            # otherwise the remediation prints error although it is successful
            "disabling_ipv6", "configuring_ipv6"
        ]
        groups_in_group = reorder_according_to_ordering(groups_in_group, priority_order)
        for group_id in groups_in_group:
            _group = self.groups[group_id]
            group.append(_group.to_xml_element())

        return group

    def to_file(self, file_name):
        """Serialize the group as XML and write it to ``file_name``."""
        root = self.to_xml_element()
        tree = ET.ElementTree(root)
        tree.write(file_name)

    def add_value(self, value):
        """Register ``value`` under its ``id_``; None is ignored."""
        if value is None:
            return
        self.values[value.id_] = value

    def add_group(self, group, env_yaml=None):
        """Register ``group`` as a sub-group; None is ignored.

        A sub-group without platforms of its own receives this group's
        platforms (the same set object, not a copy). When ``env_yaml`` is
        given, the sub-group's CPE names are then resolved.
        """
        if group is None:
            return
        if self.platforms and not group.platforms:
            group.platforms = self.platforms
        self.groups[group.id_] = group
        self._pass_our_properties_on_to(group)

        # Once the group has inherited properties, update cpe_names
        self._add_cpe_names(group, "group", env_yaml)

    def _pass_our_properties_on_to(self, obj):
        """Copy each attribute in ATTRIBUTES_TO_PASS_ON onto ``obj`` when
        ``obj`` has that attribute and its current value is None.
        """
        for attr in self.ATTRIBUTES_TO_PASS_ON:
            if hasattr(obj, attr) and getattr(obj, attr) is None:
                setattr(obj, attr, getattr(self, attr))

    def add_rule(self, rule, env_yaml=None):
        """Register ``rule`` in this group; None is ignored.

        A rule without platforms of its own receives this group's platforms
        (the same set object, not a copy). When ``env_yaml`` is given, the
        rule's CPE names are then resolved.
        """
        if rule is None:
            return
        if self.platforms and not rule.platforms:
            rule.platforms = self.platforms
        self.rules[rule.id_] = rule
        self._pass_our_properties_on_to(rule)

        # Once the rule has inherited properties, update cpe_names
        self._add_cpe_names(rule, "rule", env_yaml)

    def __str__(self):
        return self.id_
1022
1023
1024 2
class Rule(object):
1025
    """Represents XCCDF Rule
1026
    """
1027 2
    # Maps every YAML key a rule understands to a zero-argument factory for
    # its default. A factory that returns an Exception instance marks the key
    # as mandatory: _set_attributes_from_dict() raises that exception when
    # the key is absent.
    YAML_KEYS_DEFAULTS = {
        "prodtype": lambda: "all",
        "title": lambda: RuntimeError("Missing key 'title'"),
        "description": lambda: RuntimeError("Missing key 'description'"),
        "rationale": lambda: RuntimeError("Missing key 'rationale'"),
        "severity": lambda: RuntimeError("Missing key 'severity'"),
        "references": dict,
        "identifiers": dict,
        "ocil_clause": lambda: None,
        "ocil": lambda: None,
        "oval_external_content": lambda: None,
        "warnings": list,
        "conflicts": list,
        "requires": list,
        "platform": lambda: None,
        "platforms": set,
        "inherited_platforms": list,
        "template": lambda: None,
        "definition_location": lambda: None,
    }

    # Reference kinds handled as product-specific by
    # _get_product_only_references().
    PRODUCT_REFERENCES = ("stigid", "cis",)
    # Reference kinds that must never carry an "@product" qualifier.
    GLOBAL_REFERENCES = ("srg", "vmmsrg", "disa", "cis-csc",)
1050
1051 2
    def __init__(self, id_):
1052 2
        self.id_ = id_
1053 2
        self.prodtype = "all"
1054 2
        self.title = ""
1055 2
        self.description = ""
1056 2
        self.definition_location = ""
1057 2
        self.rationale = ""
1058 2
        self.severity = "unknown"
1059 2
        self.references = {}
1060 2
        self.identifiers = {}
1061 2
        self.ocil_clause = None
1062 2
        self.ocil = None
1063 2
        self.oval_external_content = None
1064 2
        self.warnings = []
1065 2
        self.requires = []
1066 2
        self.conflicts = []
1067
        # self.platforms is used further in the build system
1068
        # self.platform is merged into self.platforms
1069
        # it is here for backward compatibility
1070 2
        self.platform = None
1071 2
        self.platforms = set()
1072 2
        self.cpe_names = set()
1073 2
        self.inherited_platforms = [] # platforms inherited from the group
1074 2
        self.sce_metadata = None
1075 2
        self.template = None
1076 2
        self.local_env_yaml = None
1077 2
        self.current_product = None
1078
1079 2
    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None, sce_metadata=None):
        """Load a rule definition from ``yaml_file``.

        The rule id is the file's base name, except for ``rule.yml`` files,
        where it is obtained from ``get_rule_dir_id``. When ``env_yaml`` is
        given, a copy extended with ``rule_id`` is used for macro expansion
        and stored on the rule as ``local_env_yaml``.

        Returns None when ``open_and_macro_expand`` yields None.

        Raises:
            RuntimeError: if a mandatory key is missing or unknown keys
                remain in the YAML.
            ValueError: if a warning entry does not have exactly one
                key/value pair, or validation of prodtype, identifiers or
                references fails.
            CPEDoesNotExist: if a platform cannot be resolved to a CPE name.
        """
        yaml_file = os.path.normpath(yaml_file)

        rule_id, ext = os.path.splitext(os.path.basename(yaml_file))
        if rule_id == "rule" and ext == ".yml":
            rule_id = get_rule_dir_id(yaml_file)

        local_env_yaml = None
        if env_yaml:
            # Work on a copy so the caller's env_yaml is not polluted with
            # the per-rule "rule_id" entry.
            local_env_yaml = dict()
            local_env_yaml.update(env_yaml)
            local_env_yaml["rule_id"] = rule_id

        yaml_contents = open_and_macro_expand(yaml_file, local_env_yaml)
        if yaml_contents is None:
            return None

        rule = cls(rule_id)

        if local_env_yaml:
            rule.local_env_yaml = local_env_yaml

        try:
            rule._set_attributes_from_dict(yaml_contents)
        except RuntimeError as exc:
            msg = ("Error processing '{fname}': {err}"
                   .format(fname=yaml_file, err=str(exc)))
            raise RuntimeError(msg)

        # platforms are read as list from the yaml file
        # we need them to convert to set again
        rule.platforms = set(rule.platforms)

        for warning_list in rule.warnings:
            if len(warning_list) != 1:
                raise ValueError("Only one key/value pair should exist for each dictionary")

        # ensure that content of rule.platform is in rule.platforms as
        # well
        if rule.platform is not None:
            rule.platforms.add(rule.platform)

        # Convert the platform names to CPE names
        # But only do it if an env_yaml was specified (otherwise there would be no product CPEs
        # to lookup), and the rule's prodtype matches the product being built.
        # env_yaml may lack a 'product' entry (see the guard further below),
        # so check for it before indexing instead of raising KeyError.
        if (env_yaml and "product" in env_yaml
                and env_yaml["product"] in parse_prodtype(rule.prodtype)):
            for platform in rule.platforms:
                try:
                    rule.cpe_names.add(env_yaml["product_cpes"].get_cpe_name(platform))
                except CPEDoesNotExist:
                    print("Unsupported platform '%s' in rule '%s'." % (platform, rule.id_))
                    raise

        if yaml_contents:
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                               % (yaml_file, yaml_contents))

        if not rule.definition_location:
            rule.definition_location = yaml_file

        if sce_metadata and rule_id in sce_metadata:
            rule.sce_metadata = sce_metadata[rule_id]

        if env_yaml and 'product' in env_yaml:
            rule.current_product = env_yaml['product']

        rule.validate_prodtype(yaml_file)
        rule.validate_identifiers(yaml_file)
        rule.validate_references(yaml_file)
        return rule
1150
1151 2
    def _verify_stigid_format(self, product):
1152 2
        stig_id = self.references.get("stigid", None)
1153 2
        if not stig_id:
1154 2
            return
1155 2
        if "," in stig_id:
1156 2
            raise ValueError("Rules can not have multiple STIG IDs.")
1157
1158 2
    def _verify_disa_cci_format(self):
1159 2
        cci_id = self.references.get("disa", None)
1160 2
        if not cci_id:
1161 2
            return
1162
        cci_ex = re.compile(r'^CCI-[0-9]{6}$')
1163
        for cci in cci_id.split(","):
1164
            if not cci_ex.match(cci):
1165
                raise ValueError("CCI '{}' is in the wrong format! "
1166
                                 "Format should be similar to: "
1167
                                 "CCI-XXXXXX".format(cci))
1168
        self.references["disa"] = cci_id
1169
1170 2
    def normalize(self, product):
1171 2
        try:
1172 2
            self.make_refs_and_identifiers_product_specific(product)
1173 2
            self.make_template_product_specific(product)
1174 2
        except Exception as exc:
1175 2
            msg = (
1176
                "Error normalizing '{rule}': {msg}"
1177
                .format(rule=self.id_, msg=str(exc))
1178
            )
1179 2
            raise RuntimeError(msg)
1180
1181 2
    def _get_product_only_references(self):
1182 2
        product_references = dict()
1183
1184 2
        for ref in Rule.PRODUCT_REFERENCES:
1185 2
            start = "{0}@".format(ref)
1186 2
            for gref, gval in self.references.items():
1187 2
                if ref == gref or gref.startswith(start):
1188 2
                    product_references[gref] = gval
1189 2
        return product_references
1190
1191 2
    def make_template_product_specific(self, product):
1192 2
        product_suffix = "@{0}".format(product)
1193
1194 2
        if not self.template:
1195
            return
1196
1197 2
        not_specific_vars = self.template.get("vars", dict())
1198 2
        specific_vars = self._make_items_product_specific(
1199
            not_specific_vars, product_suffix, True)
1200 2
        self.template["vars"] = specific_vars
1201
1202 2
        not_specific_backends = self.template.get("backends", dict())
1203 2
        specific_backends = self._make_items_product_specific(
1204
            not_specific_backends, product_suffix, True)
1205 2
        self.template["backends"] = specific_backends
1206
1207 2
    def make_refs_and_identifiers_product_specific(self, product):
1208 2
        product_suffix = "@{0}".format(product)
1209
1210 2
        product_references = self._get_product_only_references()
1211 2
        general_references = self.references.copy()
1212 2
        for todel in product_references:
1213 2
            general_references.pop(todel)
1214 2
        for ref in Rule.PRODUCT_REFERENCES:
1215 2
            if ref in general_references:
1216
                msg = "Unexpected reference identifier ({0}) without "
1217
                msg += "product qualifier ({0}@{1}) while building rule "
1218
                msg += "{2}"
1219
                msg = msg.format(ref, product, self.id_)
1220
                raise ValueError(msg)
1221
1222 2
        to_set = dict(
1223
            identifiers=(self.identifiers, False),
1224
            general_references=(general_references, True),
1225
            product_references=(product_references, False),
1226
        )
1227 2
        for name, (dic, allow_overwrites) in to_set.items():
1228 2
            try:
1229 2
                new_items = self._make_items_product_specific(
1230
                    dic, product_suffix, allow_overwrites)
1231 2
            except ValueError as exc:
1232 2
                msg = (
1233
                    "Error processing {what} for rule '{rid}': {msg}"
1234
                    .format(what=name, rid=self.id_, msg=str(exc))
1235
                )
1236 2
                raise ValueError(msg)
1237 2
            dic.clear()
1238 2
            dic.update(new_items)
1239
1240 2
        self.references = general_references
1241 2
        self._verify_disa_cci_format()
1242 2
        self.references.update(product_references)
1243
1244 2
        self._verify_stigid_format(product)
1245
1246 2
    def _make_items_product_specific(self, items_dict, product_suffix, allow_overwrites=False):
1247 2
        new_items = dict()
1248 2
        for full_label, value in items_dict.items():
1249 2
            if "@" not in full_label and full_label not in new_items:
1250 2
                new_items[full_label] = value
1251 2
                continue
1252
1253 2
            label = full_label.split("@")[0]
1254
1255
            # this test should occur before matching product_suffix with the product qualifier
1256
            # present in the reference, so it catches problems even for products that are not
1257
            # being built at the moment
1258 2
            if label in Rule.GLOBAL_REFERENCES:
1259
                msg = (
1260
                    "You cannot use product-qualified for the '{item_u}' reference. "
1261
                    "Please remove the product-qualifier and merge values with the "
1262
                    "existing reference if there is any. Original line: {item_q}: {value_q}"
1263
                    .format(item_u=label, item_q=full_label, value_q=value)
1264
                )
1265
                raise ValueError(msg)
1266
1267 2
            if not full_label.endswith(product_suffix):
1268 2
                continue
1269
1270 2
            if label in items_dict and not allow_overwrites and value != items_dict[label]:
1271 2
                msg = (
1272
                    "There is a product-qualified '{item_q}' item, "
1273
                    "but also an unqualified '{item_u}' item "
1274
                    "and those two differ in value - "
1275
                    "'{value_q}' vs '{value_u}' respectively."
1276
                    .format(item_q=full_label, item_u=label,
1277
                            value_q=value, value_u=items_dict[label])
1278
                )
1279 2
                raise ValueError(msg)
1280 2
            new_items[label] = value
1281 2
        return new_items
1282
1283 2
    def _set_attributes_from_dict(self, yaml_contents):
1284 2
        for key, default_getter in self.YAML_KEYS_DEFAULTS.items():
1285 2
            if key not in yaml_contents:
1286 2
                value = default_getter()
1287 2
                if isinstance(value, Exception):
1288
                    raise value
1289
            else:
1290 2
                value = yaml_contents.pop(key)
1291
1292 2
            setattr(self, key, value)
1293
1294 2
    def to_contents_dict(self):
1295
        """
1296
        Returns a dictionary that is the same schema as the dict obtained when loading rule YAML.
1297
        """
1298
1299 2
        yaml_contents = dict()
1300 2
        for key in Rule.YAML_KEYS_DEFAULTS:
1301 2
            yaml_contents[key] = getattr(self, key)
1302
1303 2
        return yaml_contents
1304
1305 2
    def validate_identifiers(self, yaml_file):
1306 2
        if self.identifiers is None:
1307
            raise ValueError("Empty identifier section in file %s" % yaml_file)
1308
1309
        # Validate all identifiers are non-empty:
1310 2
        for ident_type, ident_val in self.identifiers.items():
1311 2
            if not isinstance(ident_type, str) or not isinstance(ident_val, str):
1312
                raise ValueError("Identifiers and values must be strings: %s in file %s"
1313
                                 % (ident_type, yaml_file))
1314 2
            if ident_val.strip() == "":
1315
                raise ValueError("Identifiers must not be empty: %s in file %s"
1316
                                 % (ident_type, yaml_file))
1317 2
            if ident_type[0:3] == 'cce':
1318 2
                if not is_cce_format_valid(ident_val):
1319
                    raise ValueError("CCE Identifier format must be valid: invalid format '%s' for CEE '%s'"
1320
                                     " in file '%s'" % (ident_val, ident_type, yaml_file))
1321 2
                if not is_cce_value_valid("CCE-" + ident_val):
1322
                    raise ValueError("CCE Identifier value is not a valid checksum: invalid value '%s' for CEE '%s'"
1323
                                     " in file '%s'" % (ident_val, ident_type, yaml_file))
1324
1325 2
    def validate_references(self, yaml_file):
1326 2
        if self.references is None:
1327
            raise ValueError("Empty references section in file %s" % yaml_file)
1328
1329 2
        for ref_type, ref_val in self.references.items():
1330 2
            if not isinstance(ref_type, str) or not isinstance(ref_val, str):
1331
                raise ValueError("References and values must be strings: %s in file %s"
1332
                                 % (ref_type, yaml_file))
1333 2
            if ref_val.strip() == "":
1334
                raise ValueError("References must not be empty: %s in file %s"
1335
                                 % (ref_type, yaml_file))
1336
1337 2
        for ref_type, ref_val in self.references.items():
1338 2
            for ref in ref_val.split(","):
1339 2
                if ref.strip() != ref:
1340
                    msg = (
1341
                        "Comma-separated '{ref_type}' reference "
1342
                        "in {yaml_file} contains whitespace."
1343
                        .format(ref_type=ref_type, yaml_file=yaml_file))
1344
                    raise ValueError(msg)
1345
1346 2
    def validate_prodtype(self, yaml_file):
        """
        Check that no item of the comma-separated prodtype is padded.

        yaml_file is only used to build the error message.

        Raises ValueError if any item has leading or trailing whitespace.
        """
        items = self.prodtype.split(",")
        if any(item != item.strip() for item in items):
            raise ValueError(
                "Comma-separated '{prodtype}' prodtype "
                "in {yaml_file} contains whitespace."
                .format(prodtype=self.prodtype, yaml_file=yaml_file))
1354
1355 2
    def to_xml_element(self):
        """
        Build and return the XML 'Rule' element describing this rule.

        Children are added in this order: title, description, rationale,
        ident elements, ref elements, the SCE check (when SCE metadata is
        present), the OVAL check or OVAL reference, the OCIL element,
        warnings, requires, conflicts and platform elements.

        This method does not modify self.sce_metadata.
        """
        rule = ET.Element('Rule')
        rule.set('id', self.id_)
        if self.prodtype != "all":
            rule.set("prodtype", self.prodtype)
        rule.set('severity', self.severity)
        add_sub_element(rule, 'title', self.title)
        add_sub_element(rule, 'description', self.description)
        add_sub_element(rule, 'rationale', self.rationale)

        self._add_policy_elements(rule, 'ident', self.identifiers)
        self._add_policy_elements(rule, 'ref', self.references)

        ocil_parent = rule
        check_parent = rule
        if self.sce_metadata:
            ocil_parent, check_parent = self._add_sce_check(rule)

        if self.oval_external_content:
            check = ET.SubElement(check_parent, 'check')
            check.set("system", "http://oval.mitre.org/XMLSchema/oval-definitions-5")
            external_content = ET.SubElement(check, "check-content-ref")
            external_content.set("href", self.oval_external_content)
        else:
            # TODO: This is pretty much a hack, oval ID will be the same as rule ID
            #       and we don't want the developers to have to keep them in sync.
            #       Therefore let's just add an OVAL ref of that ID.
            oval_ref = ET.SubElement(check_parent, "oval")
            oval_ref.set("id", self.id_)

        if self.ocil or self.ocil_clause:
            ocil = add_sub_element(ocil_parent, 'ocil', self.ocil if self.ocil else "")
            if self.ocil_clause:
                ocil.set("clause", self.ocil_clause)

        add_warning_elements(rule, self.warnings)
        add_nondata_subelements(rule, "requires", "id", self.requires)
        add_nondata_subelements(rule, "conflicts", "id", self.conflicts)

        for cpe_name in self.cpe_names:
            platform_el = ET.SubElement(rule, "platform")
            platform_el.set("idref", cpe_name)

        return rule

    @staticmethod
    def _add_policy_elements(rule, tag, entries):
        """
        Add <tag> children to rule from a mapping of policy name to value.

        Keys of the form 'policy@product' (eg. 'stigid@product') apply only
        to some product; each of them gets its own element carrying a
        'prodtype' attribute. All remaining keys are collected as attributes
        of one shared element, which is appended only if it has any.
        """
        shared = ET.Element(tag)
        for key, value in entries.items():
            # This is not true if items were normalized
            if '@' in key:
                policy, product = key.split('@')
                scoped = ET.SubElement(rule, tag)
                scoped.set(policy, value)
                scoped.set('prodtype', product)
            else:
                shared.set(key, value)

        if shared.attrib:
            rule.append(shared)

    @staticmethod
    def _as_entry_list(value):
        """
        Return [value] if value is a single string, otherwise value itself.

        type(u"") is 'unicode' on Python 2 and 'str' on Python 3, so text
        strings of either flavour are wrapped instead of being iterated
        character by character.
        """
        if isinstance(value, (str, type(u""))):
            return [value]
        return value

    def _add_sce_check(self, rule):
        """
        Add the SCE check element described by self.sce_metadata under rule.

        Returns the tuple (ocil_parent, check_parent): the elements under
        which the caller has to place the OCIL element and the remaining
        check elements respectively.
        """
        # TODO: This is pretty much another hack, just like the OVAL one in
        # to_xml_element. However, we avoided the external SCE content as
        # I'm not sure it is generally useful (unlike say, CVE checking with
        # external OVAL)
        #
        # Additionally, we build the content (check subelement) here rather
        # than in xslt due to the nature of our SCE metadata.
        #
        # Finally, before we begin, we might have an element with both SCE
        # and OVAL. We have no way of knowing (right here) whether that is
        # the case (due to a variety of issues, most notably, that linking
        # hasn't yet occurred). So we must rely on the content author's
        # good will, by annotating SCE content with a complex-check tag
        # if necessary.
        metadata = self.sce_metadata
        ocil_parent = rule
        check_parent = rule

        if 'complex-check' in metadata:
            # Here we have an issue: XCCDF allows EITHER one or more check
            # elements OR a single complex-check. While we have an explicit
            # case handling the OVAL-and-SCE interaction, OCIL entries have
            # (historically) been alongside OVAL content and been in an
            # "OR" manner -- preferring OVAL to SCE. In order to accomplish
            # this, we thus need to add _yet another parent_ when OCIL data
            # is present, and update ocil_parent accordingly.
            if self.ocil or self.ocil_clause:
                ocil_parent = ET.SubElement(ocil_parent, "complex-check")
                ocil_parent.set('operator', 'OR')

            check_parent = ET.SubElement(ocil_parent, "complex-check")
            check_parent.set('operator', metadata['complex-check'])

        # Now, add the SCE check element to the tree.
        check = ET.SubElement(check_parent, "check")
        check.set("system", SCE_SYSTEM)

        # A single entry may be given as a bare string; it is wrapped in a
        # local list so that the metadata itself is left untouched.
        if 'check-import' in metadata:
            for entry in self._as_entry_list(metadata['check-import']):
                check_import = ET.SubElement(check, 'check-import')
                check_import.set('import-name', entry)
                check_import.text = None

        if 'check-export' in metadata:
            for entry in self._as_entry_list(metadata['check-export']):
                # Each entry has the form '<value-id>=<export-name>'.
                value, export = entry.split('=')
                check_export = ET.SubElement(check, 'check-export')
                check_export.set('value-id', value)
                check_export.set('export-name', export)
                check_export.text = None

        check_ref = ET.SubElement(check, "check-content-ref")
        href = self.current_product + "/checks/sce/" + metadata['filename']
        check_ref.set("href", href)

        return ocil_parent, check_parent
1484
1485 2
    def to_file(self, file_name):
        """Write the XML element returned by to_xml_element() to file_name."""
        xml_tree = ET.ElementTree(self.to_xml_element())
        xml_tree.write(file_name)
1489
1490
1491 2
class DirectoryLoader(object):
    """
    Base class for loading a tree of guide directories.

    Each loader instance handles a single directory: it sorts the directory
    entries into value files, rule files, a benchmark/group file and
    subdirectories, loads the benchmark or group, and then handles every
    subdirectory with a fresh loader obtained from _get_new_loader().

    Subclasses have to implement _get_new_loader(), _process_values() and
    _process_rules().
    """

    def __init__(self, profiles_dir, env_yaml):
        self.profiles_dir = profiles_dir
        self.env_yaml = env_yaml
        self.product = env_yaml["product"]

        # Items found in the directory handled by this loader.
        self.benchmark_file = None
        self.group_file = None
        self.rule_files = []
        self.value_files = []
        self.subdirectories = []

        # Group/benchmark loaded from this directory and the group of the
        # directory above (set by the loader that created this one).
        self.loaded_group = None
        self.parent_group = None

        # Everything collected by this loader and the loaders below it.
        self.all_values = set()
        self.all_rules = set()
        self.all_groups = set()

    def _collect_items_to_load(self, guide_directory):
        """
        Sort the entries of guide_directory into the loader's work lists.

        Raises ValueError if a second benchmark.yml or group.yml is met.
        Files that match no known category are reported on stderr and
        skipped; an entry named "tests" is skipped silently.
        """
        for entry in sorted(os.listdir(guide_directory)):
            entry_path = os.path.join(guide_directory, entry)
            extension = os.path.splitext(entry)[1]

            if extension == '.var':
                self.value_files.append(entry_path)
                continue

            if entry == "benchmark.yml":
                if self.benchmark_file:
                    raise ValueError("Multiple benchmarks in one directory")
                self.benchmark_file = entry_path
                continue

            if entry == "group.yml":
                if self.group_file:
                    raise ValueError("Multiple groups in one directory")
                self.group_file = entry_path
                continue

            if extension == '.rule':
                self.rule_files.append(entry_path)
                continue

            if is_rule_dir(entry_path):
                self.rule_files.append(get_rule_dir_yaml(entry_path))
                continue

            if entry == "tests":
                continue

            if os.path.isdir(entry_path):
                self.subdirectories.append(entry_path)
                continue

            sys.stderr.write(
                "Encountered file '%s' while recursing, extension '%s' "
                "is unknown. Skipping..\n"
                % (entry, extension)
            )

    def load_benchmark_or_group(self, guide_directory):
        """
        Loads a given benchmark or group from the specified benchmark_file or
        group_file, in the context of guide_directory, profiles_dir and env_yaml.

        Returns the loaded group or benchmark, or None if the loader has
        neither a benchmark file nor a group file. Raises ValueError if it
        has both.
        """
        if self.group_file and self.benchmark_file:
            raise ValueError("A .benchmark file and a .group file were found in "
                             "the same directory '%s'" % guide_directory)

        # we treat benchmark as a special form of group in the following code
        if self.benchmark_file:
            benchmark = Benchmark.from_yaml(
                self.benchmark_file, 'product-name', self.env_yaml
            )
            if self.profiles_dir:
                benchmark.add_profiles_from_dir(self.profiles_dir, self.env_yaml)
            return benchmark

        if self.group_file:
            group = Group.from_yaml(self.group_file, self.env_yaml)
            self.all_groups.add(group.id_)
            return group

        return None

    def _load_group_process_and_recurse(self, guide_directory):
        """
        Load the group of guide_directory, attach it to the parent group and
        process values, subdirectories and rules, in that order.

        Nothing is processed if no group or benchmark was loaded.
        """
        self.loaded_group = self.load_benchmark_or_group(guide_directory)
        if not self.loaded_group:
            return

        if self.parent_group:
            self.parent_group.add_group(self.loaded_group, env_yaml=self.env_yaml)

        self._process_values()
        self._recurse_into_subdirs()
        self._process_rules()

    def process_directory_tree(self, start_dir, extra_group_dirs=None):
        """
        Process start_dir and everything below it.

        Directories listed in extra_group_dirs are handled as if they were
        subdirectories of start_dir.
        """
        self._collect_items_to_load(start_dir)
        if extra_group_dirs is not None:
            self.subdirectories.extend(extra_group_dirs)
        self._load_group_process_and_recurse(start_dir)

    def _recurse_into_subdirs(self):
        """
        Process every subdirectory with a new loader and merge the values,
        rules and groups it collected into this loader's sets.
        """
        for directory in self.subdirectories:
            child = self._get_new_loader()
            child.parent_group = self.loaded_group
            child.process_directory_tree(directory)

            self.all_values.update(child.all_values)
            self.all_rules.update(child.all_rules)
            self.all_groups.update(child.all_groups)

    def _get_new_loader(self):
        """Return the loader used for a subdirectory; subclasses implement it."""
        raise NotImplementedError()

    def _process_values(self):
        """Handle the collected value files; subclasses implement it."""
        raise NotImplementedError()

    def _process_rules(self):
        """Handle the collected rule files; subclasses implement it."""
        raise NotImplementedError()
1599
1600
1601 2
class BuildLoader(DirectoryLoader):
    """
    Directory loader that builds Value and Rule objects from the collected
    files and adds them to the loaded group.

    If resolved_rules_dir is given, every rule that is kept is also
    normalized for the current product and dumped there as '<rule id>.yml'.
    If sce_metadata_path points to a non-empty file, it is parsed as JSON and
    the result is handed to Rule.from_yaml() for every rule.
    """

    def __init__(self, profiles_dir, env_yaml,
                 resolved_rules_dir=None, sce_metadata_path=None):
        super(BuildLoader, self).__init__(profiles_dir, env_yaml)

        self.resolved_rules_dir = resolved_rules_dir
        if resolved_rules_dir and not os.path.isdir(resolved_rules_dir):
            os.mkdir(resolved_rules_dir)

        self.sce_metadata = None
        if sce_metadata_path and os.path.getsize(sce_metadata_path):
            # Read inside a context manager so that the file handle is
            # closed even if the JSON cannot be parsed.
            with open(sce_metadata_path, 'r') as metadata_file:
                self.sce_metadata = json.load(metadata_file)

    def _process_values(self):
        """Load every collected value file and add it to the loaded group."""
        for value_yaml in self.value_files:
            value = Value.from_yaml(value_yaml, self.env_yaml)
            self.all_values.add(value)
            self.loaded_group.add_value(value)

    def _process_rules(self):
        """
        Load every collected rule file and add the rule to the loaded group.

        Rules whose prodtype contains neither "all" nor the current product
        are skipped, as are rules that raise DocumentationNotComplete.
        """
        for rule_yaml in self.rule_files:
            try:
                rule = Rule.from_yaml(rule_yaml, self.env_yaml, self.sce_metadata)
            except DocumentationNotComplete:
                # Happens on non-debug build when a rule is "documentation-incomplete"
                continue
            prodtypes = parse_prodtype(rule.prodtype)
            if "all" not in prodtypes and self.product not in prodtypes:
                continue
            self.all_rules.add(rule)
            self.loaded_group.add_rule(rule, env_yaml=self.env_yaml)

            # The rule inherits the platforms of the group it belongs to.
            if self.loaded_group.platforms:
                rule.inherited_platforms += self.loaded_group.platforms

            if self.resolved_rules_dir:
                output_for_rule = os.path.join(
                    self.resolved_rules_dir, "{id_}.yml".format(id_=rule.id_))
                mkdir_p(self.resolved_rules_dir)
                with open(output_for_rule, "w") as f:
                    rule.normalize(self.env_yaml["product"])
                    yaml.dump(rule.to_contents_dict(), f)

    def _get_new_loader(self):
        """Return a BuildLoader for a subdirectory, sharing the SCE metadata."""
        loader = BuildLoader(
            self.profiles_dir, self.env_yaml, self.resolved_rules_dir)
        # Do it this way so we only have to parse the SCE metadata once.
        loader.sce_metadata = self.sce_metadata
        return loader

    def export_group_to_file(self, filename):
        """Write the loaded group to filename via its to_file() method."""
        return self.loaded_group.to_file(filename)
1653