Passed
Push — master ( 13b0af...ea42c3 )
by Jan
13:56 queued 11:45
created

ssg.build_yaml.Rule._verify_stigid_format()   A

Complexity

Conditions 3

Size

Total Lines 6
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 6
nop 2
dl 0
loc 6
ccs 6
cts 6
cp 1
crap 3
rs 10
c 0
b 0
f 0
1 2
from __future__ import absolute_import
2 2
from __future__ import print_function
3
4 2
import os
5 2
import os.path
6 2
from collections import defaultdict
7 2
from copy import deepcopy
8 2
import datetime
9 2
import re
10 2
import sys
11 2
from xml.sax.saxutils import escape
12
13 2
import yaml
14
15 2
from .build_cpe import CPEDoesNotExist
16 2
from .constants import XCCDF_REFINABLE_PROPERTIES
17 2
from .rules import get_rule_dir_id, get_rule_dir_yaml, is_rule_dir
18 2
from .rule_yaml import parse_prodtype
19 2
from .controls import Control
20
21 2
from .checks import is_cce_format_valid, is_cce_value_valid
22 2
from .yaml import DocumentationNotComplete, open_and_expand, open_and_macro_expand
23 2
from .utils import required_key, mkdir_p
24
25 2
from .xml import ElementTree as ET
26 2
from .shims import unicode_func
27
28
29 2
def add_sub_element(parent, tag, data):
30
    """
31
    Creates a new child element under parent with tag tag, and sets
32
    data as the content under the tag. In particular, data is a string
33
    to be parsed as an XML tree, allowing sub-elements of children to be
34
    added.
35
36
    If data should not be parsed as an XML tree, either escape the contents
37
    before passing into this function, or use ElementTree.SubElement().
38
39
    Returns the newly created subelement of type tag.
40
    """
41
    # This is used because our YAML data contain XML and XHTML elements
42
    # ET.SubElement() escapes the < > characters by &lt; and &gt;
43
    # and therefore it does not add child elements
44
    # we need to do a hack instead
45
    # TODO: Remove this function after we move to Markdown everywhere in SSG
46
    ustr = unicode_func("<{0}>{1}</{0}>").format(tag, data)
47
48
    try:
49
        element = ET.fromstring(ustr.encode("utf-8"))
50
    except Exception:
51
        msg = ("Error adding subelement to an element '{0}' from string: '{1}'"
52
               .format(parent.tag, ustr))
53
        raise RuntimeError(msg)
54
55
    parent.append(element)
56
    return element
57
58
59 2
def reorder_according_to_ordering(unordered, ordering, regex=None):
60 2
    ordered = []
61 2
    if regex is None:
62 2
        regex = "|".join(["({0})".format(item) for item in ordering])
63 2
    regex = re.compile(regex)
64
65 2
    items_to_order = list(filter(regex.match, unordered))
66 2
    unordered = set(unordered)
67
68 2
    for priority_type in ordering:
69 2
        for item in items_to_order:
70 2
            if priority_type in item and item in unordered:
71 2
                ordered.append(item)
72 2
                unordered.remove(item)
73 2
    ordered.extend(list(unordered))
74 2
    return ordered
75
76
77 2
def add_warning_elements(element, warnings):
78
    # The use of [{dict}, {dict}] in warnings is to handle the following
79
    # scenario where multiple warnings have the same category which is
80
    # valid in SCAP and our content:
81
    #
82
    # warnings:
83
    #     - general: Some general warning
84
    #     - general: Some other general warning
85
    #     - general: |-
86
    #         Some really long multiline general warning
87
    #
88
    # Each of the {dict} should have only one key/value pair.
89
    for warning_dict in warnings:
90
        warning = add_sub_element(element, "warning", list(warning_dict.values())[0])
91
        warning.set("category", list(warning_dict.keys())[0])
92
93
94 2
def add_nondata_subelements(element, subelement, attribute, attr_data):
95
    """Add multiple iterations of a sublement that contains an attribute but no data
96
       For example, <requires id="my_required_id"/>"""
97
    for data in attr_data:
98
        req = ET.SubElement(element, subelement)
99
        req.set(attribute, data)
100
101
102 2
class Profile(object):
103
    """Represents XCCDF profile
104
    """
105
106 2
    def __init__(self, id_):
107 2
        self.id_ = id_
108 2
        self.title = ""
109 2
        self.description = ""
110 2
        self.extends = None
111 2
        self.selected = []
112 2
        self.unselected = []
113 2
        self.variables = dict()
114 2
        self.refine_rules = defaultdict(list)
115 2
        self.metadata = None
116 2
        self.reference = None
117 2
        self.platform = None
118
119 2
    def read_yaml_contents(self, yaml_contents):
120 2
        self.title = required_key(yaml_contents, "title")
121 2
        del yaml_contents["title"]
122 2
        self.description = required_key(yaml_contents, "description")
123 2
        del yaml_contents["description"]
124 2
        self.extends = yaml_contents.pop("extends", None)
125 2
        selection_entries = required_key(yaml_contents, "selections")
126 2
        if selection_entries:
127 2
            self._parse_selections(selection_entries)
128 2
        del yaml_contents["selections"]
129
130 2
    @classmethod
131 2
    def from_yaml(cls, yaml_file, env_yaml=None):
132 2
        yaml_contents = open_and_expand(yaml_file, env_yaml)
133 2
        if yaml_contents is None:
134
            return None
135
136 2
        basename, _ = os.path.splitext(os.path.basename(yaml_file))
137
138 2
        profile = cls(basename)
139 2
        profile.read_yaml_contents(yaml_contents)
140
141 2
        profile.reference = yaml_contents.pop("reference", None)
142 2
        profile.platform = yaml_contents.pop("platform", None)
143
144
        # At the moment, metadata is not used to build content
145 2
        if "metadata" in yaml_contents:
146
            del yaml_contents["metadata"]
147
148 2
        if yaml_contents:
149
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
150
                               % (yaml_file, yaml_contents))
151
152 2
        return profile
153
154 2
    def dump_yaml(self, file_name, documentation_complete=True):
155
        to_dump = {}
156
        to_dump["documentation_complete"] = documentation_complete
157
        to_dump["title"] = self.title
158
        to_dump["description"] = self.description
159
        to_dump["reference"] = self.reference
160
        if self.metadata is not None:
161
            to_dump["metadata"] = self.metadata
162
163
        if self.extends is not None:
164
            to_dump["extends"] = self.extends
165
166
        if self.platform is not None:
167
            to_dump["platform"] = self.platform
168
169
        selections = []
170
        for item in self.selected:
171
            selections.append(item)
172
        for item in self.unselected:
173
            selections.append("!"+item)
174
        for varname in self.variables.keys():
175
            selections.append(varname+"="+self.variables.get(varname))
176
        for rule, refinements in self.refine_rules.items():
177
            for prop, val in refinements:
178
                selections.append("{rule}.{property}={value}"
179
                                  .format(rule=rule, property=prop, value=val))
180
        to_dump["selections"] = selections
181
        with open(file_name, "w+") as f:
182
            yaml.dump(to_dump, f, indent=4)
183
184 2
    def _parse_selections(self, entries):
185 2
        for item in entries:
186 2
            self.apply_selection(item)
187
188 2
    def apply_selection(self, item):
189 2
        if "." in item:
190
            rule, refinement = item.split(".", 1)
191
            property_, value = refinement.split("=", 1)
192
            if property_ not in XCCDF_REFINABLE_PROPERTIES:
193
                msg = ("Property '{property_}' cannot be refined. "
194
                       "Rule properties that can be refined are {refinables}. "
195
                       "Fix refinement '{rule_id}.{property_}={value}' in profile '{profile}'."
196
                       .format(property_=property_, refinables=XCCDF_REFINABLE_PROPERTIES,
197
                               rule_id=rule, value=value, profile=self.id_)
198
                       )
199
                raise ValueError(msg)
200
            self.refine_rules[rule].append((property_, value))
201 2
        elif "=" in item:
202 2
            varname, value = item.split("=", 1)
203 2
            self.variables[varname] = value
204 2
        elif item.startswith("!"):
205
            self.unselected.append(item[1:])
206
        else:
207 2
            self.selected.append(item)
208
209 2
    def to_xml_element(self, product_cpes):
210
        element = ET.Element('Profile')
211
        element.set("id", self.id_)
212
        if self.extends:
213
            element.set("extends", self.extends)
214
        title = add_sub_element(element, "title", self.title)
215
        title.set("override", "true")
216
        desc = add_sub_element(element, "description", self.description)
217
        desc.set("override", "true")
218
219
        if self.reference:
220
            add_sub_element(element, "reference", escape(self.reference))
221
222
        if self.platform:
223
            try:
224
                platform_cpe = product_cpes.get_cpe_name(self.platform)
225
            except KeyError:
226
                raise ValueError("Unsupported platform '%s' in profile '%s'." % (self.platform, self.id_))
227
            plat = ET.SubElement(element, "platform")
228
            plat.set("idref", platform_cpe)
229
230
        for selection in self.selected:
231
            select = ET.Element("select")
232
            select.set("idref", selection)
233
            select.set("selected", "true")
234
            element.append(select)
235
236
        for selection in self.unselected:
237
            unselect = ET.Element("select")
238
            unselect.set("idref", selection)
239
            unselect.set("selected", "false")
240
            element.append(unselect)
241
242
        for value_id, selector in self.variables.items():
243
            refine_value = ET.Element("refine-value")
244
            refine_value.set("idref", value_id)
245
            refine_value.set("selector", selector)
246
            element.append(refine_value)
247
248
        for refined_rule, refinement_list in self.refine_rules.items():
249
            refine_rule = ET.Element("refine-rule")
250
            refine_rule.set("idref", refined_rule)
251
            for refinement in refinement_list:
252
                refine_rule.set(refinement[0], refinement[1])
253
            element.append(refine_rule)
254
255
        return element
256
257 2
    def get_rule_selectors(self):
258 2
        return list(self.selected + self.unselected)
259
260 2
    def get_variable_selectors(self):
261 2
        return self.variables
262
263 2
    def validate_refine_rules(self, rules):
264
        existing_rule_ids = [r.id_ for r in rules]
265
        for refine_rule, refinement_list in self.refine_rules.items():
266
            # Take first refinement to ilustrate where the error is
267
            # all refinements in list are invalid, so it doesn't really matter
268
            a_refinement = refinement_list[0]
269
270
            if refine_rule not in existing_rule_ids:
271
                msg = (
272
                    "You are trying to refine a rule that doesn't exist. "
273
                    "Rule '{rule_id}' was not found in the benchmark. "
274
                    "Please check all rule refinements for rule: '{rule_id}', for example: "
275
                    "- {rule_id}.{property_}={value}' in profile {profile_id}."
276
                    .format(rule_id=refine_rule, profile_id=self.id_,
277
                            property_=a_refinement[0], value=a_refinement[1])
278
                    )
279
                raise ValueError(msg)
280
281
            if refine_rule not in self.get_rule_selectors():
282
                msg = ("- {rule_id}.{property_}={value}' in profile '{profile_id}' is refining "
283
                       "a rule that is not selected by it. The refinement will not have any "
284
                       "noticeable effect. Either select the rule or remove the rule refinement."
285
                       .format(rule_id=refine_rule, property_=a_refinement[0],
286
                               value=a_refinement[1], profile_id=self.id_)
287
                       )
288
                raise ValueError(msg)
289
290 2
    def validate_variables(self, variables):
291
        variables_by_id = dict()
292
        for var in variables:
293
            variables_by_id[var.id_] = var
294
295
        for var_id, our_val in self.variables.items():
296
            if var_id not in variables_by_id:
297
                all_vars_list = [" - %s" % v for v in variables_by_id.keys()]
298
                msg = (
299
                    "Value '{var_id}' in profile '{profile_name}' is not known. "
300
                    "We know only variables:\n{var_names}"
301
                    .format(
302
                        var_id=var_id, profile_name=self.id_,
303
                        var_names="\n".join(sorted(all_vars_list)))
304
                )
305
                raise ValueError(msg)
306
307
            allowed_selectors = [str(s) for s in variables_by_id[var_id].options.keys()]
308
            if our_val not in allowed_selectors:
309
                msg = (
310
                    "Value '{var_id}' in profile '{profile_name}' "
311
                    "uses the selector '{our_val}'. "
312
                    "This is not possible, as only selectors {all_selectors} are available. "
313
                    "Either change the selector used in the profile, or "
314
                    "add the selector-value pair to the variable definition."
315
                    .format(
316
                        var_id=var_id, profile_name=self.id_, our_val=our_val,
317
                        all_selectors=allowed_selectors,
318
                    )
319
                )
320
                raise ValueError(msg)
321
322 2
    def validate_rules(self, rules, groups):
323
        existing_rule_ids = [r.id_ for r in rules]
324
        rule_selectors = self.get_rule_selectors()
325
        for id_ in rule_selectors:
326
            if id_ in groups:
327
                msg = (
328
                    "You have selected a group '{group_id}' instead of a "
329
                    "rule. Groups have no effect in the profile and are not "
330
                    "allowed to be selected. Please remove '{group_id}' "
331
                    "from profile '{profile_id}' before proceeding."
332
                    .format(group_id=id_, profile_id=self.id_)
333
                )
334
                raise ValueError(msg)
335
            if id_ not in existing_rule_ids:
336
                msg = (
337
                    "Rule '{rule_id}' was not found in the benchmark. Please "
338
                    "remove rule '{rule_id}' from profile '{profile_id}' "
339
                    "before proceeding."
340
                    .format(rule_id=id_, profile_id=self.id_)
341
                )
342
                raise ValueError(msg)
343
344 2
    def __sub__(self, other):
345
        profile = Profile(self.id_)
346
        profile.title = self.title
347
        profile.description = self.description
348
        profile.extends = self.extends
349
        profile.platform = self.platform
350
        profile.selected = list(set(self.selected) - set(other.selected))
351
        profile.selected.sort()
352
        profile.unselected = list(set(self.unselected) - set(other.unselected))
353
        profile.variables = dict ((k, v) for (k, v) in self.variables.items()
354
                             if k not in other.variables or v != other.variables[k])
355
        return profile
356
357
358 2
class ResolvableProfile(Profile):
359 2
    def __init__(self, * args, ** kwargs):
360 2
        super(ResolvableProfile, self).__init__(* args, ** kwargs)
361 2
        self.resolved = False
362 2
        self.resolved_selections = set()
363
364 2
    def _controls_ids_to_controls(self, controls_manager, policy_id, control_id_list):
365 2
        items = [controls_manager.get_control(policy_id, cid) for cid in control_id_list]
366 2
        return items
367
368 2
    def _merge_control(self, control):
369 2
        self.selected.extend(control.rules)
370 2
        for varname, value in control.variables.items():
371 2
            if varname not in self.variables:
372 2
                self.variables[varname] = value
373
374 2
    def resolve_controls(self, controls_manager):
375
        pass
376
377 2
    def extend_by(self, extended_profile):
378 2
        extended_selects = set(extended_profile.selected)
379 2
        self.resolved_selections.update(extended_selects)
380
381 2
        updated_variables = dict(extended_profile.variables)
382 2
        updated_variables.update(self.variables)
383 2
        self.variables = updated_variables
384
385 2
        extended_refinements = deepcopy(extended_profile.refine_rules)
386 2
        updated_refinements = self._subtract_refinements(extended_refinements)
387 2
        updated_refinements.update(self.refine_rules)
388 2
        self.refine_rules = updated_refinements
389
390 2
    def resolve(self, all_profiles, controls_manager=None):
391 2
        if self.resolved:
392
            return
393
394 2
        self.resolve_controls(controls_manager)
395
396 2
        self.resolved_selections = set(self.selected)
397
398 2
        if self.extends:
399 2
            if self.extends not in all_profiles:
400
                msg = (
401
                    "Profile {name} extends profile {extended}, but "
402
                    "only profiles {known_profiles} are available for resolution."
403
                    .format(name=self.id_, extended=self.extends,
404
                            known_profiles=list(all_profiles.keys())))
405
                raise RuntimeError(msg)
406 2
            extended_profile = all_profiles[self.extends]
407 2
            extended_profile.resolve(all_profiles, controls_manager)
408
409 2
            self.extend_by(extended_profile)
410
411 2
        for uns in self.unselected:
412
            self.resolved_selections.discard(uns)
413
414 2
        self.unselected = []
415 2
        self.extends = None
416
417 2
        self.selected = sorted(self.resolved_selections)
418
419 2
        self.resolved = True
420
421 2
    def _subtract_refinements(self, extended_refinements):
422
        """
423
        Given a dict of rule refinements from the extended profile,
424
        "undo" every refinement prefixed with '!' in this profile.
425
        """
426 2
        for rule, refinements in list(self.refine_rules.items()):
427
            if rule.startswith("!"):
428
                for prop, val in refinements:
429
                    extended_refinements[rule[1:]].remove((prop, val))
430
                del self.refine_rules[rule]
431 2
        return extended_refinements
432
433
434 2
class ProfileWithSeparatePolicies(ResolvableProfile):
435 2
    def __init__(self, * args, ** kwargs):
436 2
        super(ProfileWithSeparatePolicies, self).__init__(* args, ** kwargs)
437 2
        self.policies = {}
438
439 2
    def read_yaml_contents(self, yaml_contents):
440 2
        policies = yaml_contents.pop("policies", None)
441 2
        if policies:
442 2
            self._parse_policies(policies)
443 2
        super(ProfileWithSeparatePolicies, self).read_yaml_contents(yaml_contents)
444
445 2
    def _parse_policies(self, policies_yaml):
446 2
        for item in policies_yaml:
447 2
            id_ = required_key(item, "id")
448 2
            controls_ids = required_key(item, "controls")
449 2
            if not isinstance(controls_ids, list):
450 2
                if controls_ids != "all":
451
                    msg = (
452
                        "Policy {id_} contains invalid controls list {controls}."
453
                        .format(id_=id_, controls=str(controls_ids)))
454
                    raise ValueError(msg)
455 2
            self.policies[id_] = controls_ids
456
457 2 View Code Duplication
    def _process_controls_ids_into_controls(self, controls_manager, policy_id, controls_ids):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
458 2
        controls = []
459 2
        for cid in controls_ids:
460 2
            if not cid.startswith("all"):
461 2
                controls.extend(
462
                    self._controls_ids_to_controls(controls_manager, policy_id, [cid]))
463 2
            elif ":" in cid:
464
                _, level_id = cid.split(":", 1)
465
                controls.extend(
466
                    controls_manager.get_all_controls_of_level_at_least(policy_id, level_id))
467
            else:
468 2
                controls.extend(controls_manager.get_all_controls(policy_id))
469 2
        return controls
470
471 2
    def resolve_controls(self, controls_manager):
472 2
        for policy_id, controls_ids in self.policies.items():
473 2
            controls = []
474
475 2
            if isinstance(controls_ids, list):
476 2
                controls = self._process_controls_ids_into_controls(
477
                    controls_manager, policy_id, controls_ids)
478 2
            elif controls_ids.startswith("all"):
479 2
                controls = self._process_controls_ids_into_controls(
480
                    controls_manager, policy_id, [controls_ids])
481
            else:
482
                msg = (
483
                    "Unknown policy content {content} in profile {profile_id}"
484
                    .format(content=controls_ids, profile_id=self.id_))
485
                raise ValueError(msg)
486
487 2
            for c in controls:
488 2
                self._merge_control(c)
489
490 2
    def extend_by(self, extended_profile):
491 2
        self.policies.update(extended_profile.policies)
492 2
        super(ProfileWithSeparatePolicies, self).extend_by(extended_profile)
493
494
495 2
class ProfileWithInlinePolicies(ResolvableProfile):
496 2
    def __init__(self, * args, ** kwargs):
497 2
        super(ProfileWithInlinePolicies, self).__init__(* args, ** kwargs)
498 2
        self.controls_by_policy = defaultdict(list)
499
500 2
    def apply_selection(self, item):
501 2
        if ":" in item:
502 2
            policy_id, control_id = item.split(":", 1)
503 2
            self.controls_by_policy[policy_id].append(control_id)
504
        else:
505 2
            super(ProfileWithInlinePolicies, self).apply_selection(item)
506
507 2 View Code Duplication
    def _process_controls_ids_into_controls(self, controls_manager, policy_id, controls_ids):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
508 2
        controls = []
509 2
        for cid in controls_ids:
510 2
            if not cid.startswith("all"):
511 2
                controls.extend(
512
                    self._controls_ids_to_controls(controls_manager, policy_id, [cid]))
513 2
            elif ":" in cid:
514
                _, level_id = cid.split(":", 1)
515
                controls.extend(
516
                    controls_manager.get_all_controls_of_level_at_least(policy_id, level_id))
517
            else:
518 2
                controls.extend(
519
                    controls_manager.get_all_controls(policy_id))
520 2
        return controls
521
522 2
    def resolve_controls(self, controls_manager):
523 2
        for policy_id, controls_ids in self.controls_by_policy.items():
524 2
            controls = self._process_controls_ids_into_controls(
525
                controls_manager, policy_id, controls_ids)
526
527 2
            for c in controls:
528 2
                self._merge_control(c)
529
530
531 2
class Value(object):
532
    """Represents XCCDF Value
533
    """
534
535 2
    def __init__(self, id_):
536 2
        self.id_ = id_
537 2
        self.title = ""
538 2
        self.description = ""
539 2
        self.type_ = "string"
540 2
        self.operator = "equals"
541 2
        self.interactive = False
542 2
        self.options = {}
543 2
        self.warnings = []
544
545 2
    @staticmethod
546 2
    def from_yaml(yaml_file, env_yaml=None):
547 2
        yaml_contents = open_and_macro_expand(yaml_file, env_yaml)
548 2
        if yaml_contents is None:
549
            return None
550
551 2
        value_id, _ = os.path.splitext(os.path.basename(yaml_file))
552 2
        value = Value(value_id)
553 2
        value.title = required_key(yaml_contents, "title")
554 2
        del yaml_contents["title"]
555 2
        value.description = required_key(yaml_contents, "description")
556 2
        del yaml_contents["description"]
557 2
        value.type_ = required_key(yaml_contents, "type")
558 2
        del yaml_contents["type"]
559 2
        value.operator = yaml_contents.pop("operator", "equals")
560 2
        possible_operators = ["equals", "not equal", "greater than",
561
                              "less than", "greater than or equal",
562
                              "less than or equal", "pattern match"]
563
564 2
        if value.operator not in possible_operators:
565
            raise ValueError(
566
                "Found an invalid operator value '%s' in '%s'. "
567
                "Expected one of: %s"
568
                % (value.operator, yaml_file, ", ".join(possible_operators))
569
            )
570
571 2
        value.interactive = \
572
            yaml_contents.pop("interactive", "false").lower() == "true"
573
574 2
        value.options = required_key(yaml_contents, "options")
575 2
        del yaml_contents["options"]
576 2
        value.warnings = yaml_contents.pop("warnings", [])
577
578 2
        for warning_list in value.warnings:
579
            if len(warning_list) != 1:
580
                raise ValueError("Only one key/value pair should exist for each dictionary")
581
582 2
        if yaml_contents:
583
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
584
                               % (yaml_file, yaml_contents))
585
586 2
        return value
587
588 2
    def to_xml_element(self):
589
        value = ET.Element('Value')
590
        value.set('id', self.id_)
591
        value.set('type', self.type_)
592
        if self.operator != "equals":  # equals is the default
593
            value.set('operator', self.operator)
594
        if self.interactive:  # False is the default
595
            value.set('interactive', 'true')
596
        title = ET.SubElement(value, 'title')
597
        title.text = self.title
598
        add_sub_element(value, 'description', self.description)
599
        add_warning_elements(value, self.warnings)
600
601
        for selector, option in self.options.items():
602
            # do not confuse Value with big V with value with small v
603
            # value is child element of Value
604
            value_small = ET.SubElement(value, 'value')
605
            # by XCCDF spec, default value is value without selector
606
            if selector != "default":
607
                value_small.set('selector', str(selector))
608
            value_small.text = str(option)
609
610
        return value
611
612 2
    def to_file(self, file_name):
613
        root = self.to_xml_element()
614
        tree = ET.ElementTree(root)
615
        tree.write(file_name)
616
617
618 2
class Benchmark(object):
619
    """Represents XCCDF Benchmark
620
    """
621 2
    def __init__(self, id_):
622
        self.id_ = id_
623
        self.title = ""
624
        self.status = ""
625
        self.description = ""
626
        self.notice_id = ""
627
        self.notice_description = ""
628
        self.front_matter = ""
629
        self.rear_matter = ""
630
        self.cpes = []
631
        self.version = "0.1"
632
        self.profiles = []
633
        self.values = {}
634
        self.bash_remediation_fns_group = None
635
        self.groups = {}
636
        self.rules = {}
637
638
        # This is required for OCIL clauses
639
        conditional_clause = Value("conditional_clause")
640
        conditional_clause.title = "A conditional clause for check statements."
641
        conditional_clause.description = conditional_clause.title
642
        conditional_clause.type_ = "string"
643
        conditional_clause.options = {"": "This is a placeholder"}
644
645
        self.add_value(conditional_clause)
646
647 2
    @classmethod
648 2
    def from_yaml(cls, yaml_file, id_, product_yaml=None):
649
        yaml_contents = open_and_macro_expand(yaml_file, product_yaml)
650
        if yaml_contents is None:
651
            return None
652
653
        benchmark = cls(id_)
654
        benchmark.title = required_key(yaml_contents, "title")
655
        del yaml_contents["title"]
656
        benchmark.status = required_key(yaml_contents, "status")
657
        del yaml_contents["status"]
658
        benchmark.description = required_key(yaml_contents, "description")
659
        del yaml_contents["description"]
660
        notice_contents = required_key(yaml_contents, "notice")
661
        benchmark.notice_id = required_key(notice_contents, "id")
662
        del notice_contents["id"]
663
        benchmark.notice_description = required_key(notice_contents,
664
                                                    "description")
665
        del notice_contents["description"]
666
        if not notice_contents:
667
            del yaml_contents["notice"]
668
669
        benchmark.front_matter = required_key(yaml_contents,
670
                                              "front-matter")
671
        del yaml_contents["front-matter"]
672
        benchmark.rear_matter = required_key(yaml_contents,
673
                                             "rear-matter")
674
        del yaml_contents["rear-matter"]
675
        benchmark.version = str(required_key(yaml_contents, "version"))
676
        del yaml_contents["version"]
677
678
        if yaml_contents:
679
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
680
                               % (yaml_file, yaml_contents))
681
682
        return benchmark
683
684 2
    def add_profiles_from_dir(self, dir_, env_yaml):
685
        for dir_item in os.listdir(dir_):
686
            dir_item_path = os.path.join(dir_, dir_item)
687
            if not os.path.isfile(dir_item_path):
688
                continue
689
690
            _, ext = os.path.splitext(os.path.basename(dir_item_path))
691
            if ext != '.profile':
692
                sys.stderr.write(
693
                    "Encountered file '%s' while looking for profiles, "
694
                    "extension '%s' is unknown. Skipping..\n"
695
                    % (dir_item, ext)
696
                )
697
                continue
698
699
            try:
700
                new_profile = ProfileWithInlinePolicies.from_yaml(dir_item_path, env_yaml)
701
            except DocumentationNotComplete:
702
                continue
703
            except Exception as exc:
704
                msg = ("Error building profile from '{fname}': '{error}'"
705
                       .format(fname=dir_item_path, error=str(exc)))
706
                raise RuntimeError(msg)
707
            if new_profile is None:
708
                continue
709
710
            self.profiles.append(new_profile)
711
712 2
    def add_bash_remediation_fns_from_file(self, file_):
713
        if not file_:
714
            # bash-remediation-functions.xml doens't exist
715
            return
716
717
        tree = ET.parse(file_)
718
        self.bash_remediation_fns_group = tree.getroot()
719
720 2
    def to_xml_element(self, product_cpes):
721
        root = ET.Element('Benchmark')
722
        root.set('xmlns:xsi', 'http://www.w3.org/2001/XMLSchema-instance')
723
        root.set('xmlns:xhtml', 'http://www.w3.org/1999/xhtml')
724
        root.set('xmlns:dc', 'http://purl.org/dc/elements/1.1/')
725
        root.set('id', 'product-name')
726
        root.set('xsi:schemaLocation',
727
                 'http://checklists.nist.gov/xccdf/1.1 xccdf-1.1.4.xsd')
728
        root.set('style', 'SCAP_1.1')
729
        root.set('resolved', 'false')
730
        root.set('xml:lang', 'en-US')
731
        status = ET.SubElement(root, 'status')
732
        status.set('date', datetime.date.today().strftime("%Y-%m-%d"))
733
        status.text = self.status
734
        add_sub_element(root, "title", self.title)
735
        add_sub_element(root, "description", self.description)
736
        notice = add_sub_element(root, "notice", self.notice_description)
737
        notice.set('id', self.notice_id)
738
        add_sub_element(root, "front-matter", self.front_matter)
739
        add_sub_element(root, "rear-matter", self.rear_matter)
740
741
        # The Benchmark applicability is determined by the CPEs
742
        # defined in the product.yml
743
        product_cpe_names = product_cpes.get_product_cpe_names()
744
        for cpe_name in product_cpe_names:
745
            plat = ET.SubElement(root, "platform")
746
            plat.set("idref", cpe_name)
747
748
        version = ET.SubElement(root, 'version')
749
        version.text = self.version
750
        ET.SubElement(root, "metadata")
751
752
        for profile in self.profiles:
753
            root.append(profile.to_xml_element(product_cpes))
754
755
        for value in self.values.values():
756
            root.append(value.to_xml_element())
757
        if self.bash_remediation_fns_group is not None:
758
            root.append(self.bash_remediation_fns_group)
759
760
        groups_in_bench = list(self.groups.keys())
761
        priority_order = ["system", "services"]
762
        groups_in_bench = reorder_according_to_ordering(groups_in_bench, priority_order)
763
764
        # Make system group the first, followed by services group
765
        for group_id in groups_in_bench:
766
            group = self.groups.get(group_id)
767
            # Products using application benchmark don't have system or services group
768
            if group is not None:
769
                root.append(group.to_xml_element(product_cpes))
770
771
        for rule in self.rules.values():
772
            root.append(rule.to_xml_element(product_cpes))
773
774
        return root
775
776 2
    def to_file(self, file_name, product_cpes):
777
        root = self.to_xml_element(product_cpes)
778
        tree = ET.ElementTree(root)
779
        tree.write(file_name)
780
781 2
    def add_value(self, value):
782
        if value is None:
783
            return
784
        self.values[value.id_] = value
785
786 2
    def add_group(self, group):
787
        if group is None:
788
            return
789
        self.groups[group.id_] = group
790
791 2
    def add_rule(self, rule):
792
        if rule is None:
793
            return
794
        self.rules[rule.id_] = rule
795
796 2
    def to_xccdf(self):
797
        """We can easily extend this script to generate a valid XCCDF instead
798
        of SSG SHORTHAND.
799
        """
800
        raise NotImplementedError
801
802 2
    def __str__(self):
803
        return self.id_
804
805
806 2
class Group(object):
807
    """Represents XCCDF Group
808
    """
809 2
    ATTRIBUTES_TO_PASS_ON = (
810
        "platform",
811
    )
812
813 2
    def __init__(self, id_):
814
        self.id_ = id_
815
        self.prodtype = "all"
816
        self.title = ""
817
        self.description = ""
818
        self.warnings = []
819
        self.requires = []
820
        self.conflicts = []
821
        self.values = {}
822
        self.groups = {}
823
        self.rules = {}
824
        self.platform = None
825
826 2
    @classmethod
827 2
    def from_yaml(cls, yaml_file, env_yaml=None):
828
        yaml_contents = open_and_macro_expand(yaml_file, env_yaml)
829
        if yaml_contents is None:
830
            return None
831
832
        group_id = os.path.basename(os.path.dirname(yaml_file))
833
        group = cls(group_id)
834
        group.prodtype = yaml_contents.pop("prodtype", "all")
835
        group.title = required_key(yaml_contents, "title")
836
        del yaml_contents["title"]
837
        group.description = required_key(yaml_contents, "description")
838
        del yaml_contents["description"]
839
        group.warnings = yaml_contents.pop("warnings", [])
840
        group.conflicts = yaml_contents.pop("conflicts", [])
841
        group.requires = yaml_contents.pop("requires", [])
842
        group.platform = yaml_contents.pop("platform", None)
843
844
        for warning_list in group.warnings:
845
            if len(warning_list) != 1:
846
                raise ValueError("Only one key/value pair should exist for each dictionary")
847
848
        if yaml_contents:
849
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
850
                               % (yaml_file, yaml_contents))
851
        group.validate_prodtype(yaml_file)
852
        return group
853
854 2
    def validate_prodtype(self, yaml_file):
855
        for ptype in self.prodtype.split(","):
856
            if ptype.strip() != ptype:
857
                msg = (
858
                    "Comma-separated '{prodtype}' prodtype "
859
                    "in {yaml_file} contains whitespace."
860
                    .format(prodtype=self.prodtype, yaml_file=yaml_file))
861
                raise ValueError(msg)
862
863 2
    def to_xml_element(self, product_cpes):
864
        group = ET.Element('Group')
865
        group.set('id', self.id_)
866
        if self.prodtype != "all":
867
            group.set("prodtype", self.prodtype)
868
        title = ET.SubElement(group, 'title')
869
        title.text = self.title
870
        add_sub_element(group, 'description', self.description)
871
        add_warning_elements(group, self.warnings)
872
        add_nondata_subelements(group, "requires", "id", self.requires)
873
        add_nondata_subelements(group, "conflicts", "id", self.conflicts)
874
875
        if self.platform:
876
            platform_el = ET.SubElement(group, "platform")
877
            try:
878
                platform_cpe = product_cpes.get_cpe_name(self.platform)
879
            except CPEDoesNotExist:
880
                print("Unsupported platform '%s' in rule '%s'." % (self.platform, self.id_))
881
                raise
882
            platform_el.set("idref", platform_cpe)
883
884
        for _value in self.values.values():
885
            group.append(_value.to_xml_element())
886
887
        # Rules that install or remove packages affect remediation
888
        # of other rules.
889
        # When packages installed/removed rules come first:
890
        # The Rules are ordered in more logical way, and
891
        # remediation order is natural, first the package is installed, then configured.
892
        rules_in_group = list(self.rules.keys())
893
        regex = r'(package_.*_(installed|removed))|(service_.*_(enabled|disabled))$'
894
        priority_order = ["installed", "removed", "enabled", "disabled"]
895
        rules_in_group = reorder_according_to_ordering(rules_in_group, priority_order, regex)
896
897
        # Add rules in priority order, first all packages installed, then removed,
898
        # followed by services enabled, then disabled
899
        for rule_id in rules_in_group:
900
            group.append(self.rules.get(rule_id).to_xml_element(product_cpes))
901
902
        # Add the sub groups after any current level group rules.
903
        # As package installed/removed and service enabled/disabled rules are usuallly in
904
        # top level group, this ensures groups that further configure a package or service
905
        # are after rules that install or remove it.
906
        groups_in_group = list(self.groups.keys())
907
        priority_order = [
908
            # Make sure rpm_verify_(hashes|permissions|ownership) are run before any other rule.
909
            # Due to conflicts between rules rpm_verify_* rules and any rule that configures
910
            # stricter settings, like file_permissions_grub2_cfg and sudo_dedicated_group,
911
            # the rules deviating from the system default should be evaluated later.
912
            # So that in the end the system has contents, permissions and ownership reset, and
913
            # any deviations or stricter settings are applied by the rules in the profile.
914
            "software", "integrity", "integrity-software", "rpm_verification",
915
916
            # The account group has to precede audit group because
917
            # the rule package_screen_installed is desired to be executed before the rule
918
            # audit_rules_privileged_commands, othervise the rule
919
            # does not catch newly installed screen binary during remediation
920
            # and report fail
921
            "accounts", "auditing",
922
923
924
            # The FIPS group should come before Crypto,
925
            # if we want to set a different (stricter) Crypto Policy than FIPS.
926
            "fips", "crypto",
927
928
            # The firewalld_activation must come before ruleset_modifications, othervise
929
            # remediations for ruleset_modifications won't work
930
            "firewalld_activation", "ruleset_modifications",
931
932
            # Rules from group disabling_ipv6 must precede rules from configuring_ipv6,
933
            # otherwise the remediation prints error although it is successful
934
            "disabling_ipv6", "configuring_ipv6"
935
        ]
936
        groups_in_group = reorder_according_to_ordering(groups_in_group, priority_order)
937
        for group_id in groups_in_group:
938
            _group = self.groups[group_id]
939
            group.append(_group.to_xml_element(product_cpes))
940
941
        return group
942
943 2
    def to_file(self, file_name, product_cpes):
944
        root = self.to_xml_element(product_cpes)
945
        tree = ET.ElementTree(root)
946
        tree.write(file_name)
947
948 2
    def add_value(self, value):
949
        if value is None:
950
            return
951
        self.values[value.id_] = value
952
953 2
    def add_group(self, group):
954
        if group is None:
955
            return
956
        if self.platform and not group.platform:
957
            group.platform = self.platform
958
        self.groups[group.id_] = group
959
        self._pass_our_properties_on_to(group)
960
961 2
    def _pass_our_properties_on_to(self, obj):
962
        for attr in self.ATTRIBUTES_TO_PASS_ON:
963
            if hasattr(obj, attr) and getattr(obj, attr) is None:
964
                setattr(obj, attr, getattr(self, attr))
965
966 2
    def add_rule(self, rule):
967
        if rule is None:
968
            return
969
        if self.platform and not rule.platform:
970
            rule.platform = self.platform
971
        self.rules[rule.id_] = rule
972
        self._pass_our_properties_on_to(rule)
973
974 2
    def __str__(self):
975
        return self.id_
976
977
978 2
class Rule(object):
979
    """Represents XCCDF Rule
980
    """
981 2
    YAML_KEYS_DEFAULTS = {
982
        "prodtype": lambda: "all",
983
        "title": lambda: RuntimeError("Missing key 'title'"),
984
        "description": lambda: RuntimeError("Missing key 'description'"),
985
        "rationale": lambda: RuntimeError("Missing key 'rationale'"),
986
        "severity": lambda: RuntimeError("Missing key 'severity'"),
987
        "references": lambda: dict(),
988
        "identifiers": lambda: dict(),
989
        "ocil_clause": lambda: None,
990
        "ocil": lambda: None,
991
        "oval_external_content": lambda: None,
992
        "warnings": lambda: list(),
993
        "conflicts": lambda: list(),
994
        "requires": lambda: list(),
995
        "platform": lambda: None,
996
        "inherited_platforms": lambda: list(),
997
        "template": lambda: None,
998
        "definition_location": lambda: None,
999
    }
1000
1001 2
    def __init__(self, id_):
1002 2
        self.id_ = id_
1003 2
        self.prodtype = "all"
1004 2
        self.title = ""
1005 2
        self.description = ""
1006 2
        self.definition_location = ""
1007 2
        self.rationale = ""
1008 2
        self.severity = "unknown"
1009 2
        self.references = {}
1010 2
        self.identifiers = {}
1011 2
        self.ocil_clause = None
1012 2
        self.ocil = None
1013 2
        self.oval_external_content = None
1014 2
        self.warnings = []
1015 2
        self.requires = []
1016 2
        self.conflicts = []
1017 2
        self.platform = None
1018 2
        self.inherited_platforms = [] # platforms inherited from the group
1019 2
        self.template = None
1020
1021 2
    @classmethod
1022 2
    def from_yaml(cls, yaml_file, env_yaml=None):
1023 2
        yaml_file = os.path.normpath(yaml_file)
1024
1025 2
        yaml_contents = open_and_macro_expand(yaml_file, env_yaml)
1026 2
        if yaml_contents is None:
1027
            return None
1028
1029 2
        rule_id, ext = os.path.splitext(os.path.basename(yaml_file))
1030 2
        if rule_id == "rule" and ext == ".yml":
1031 2
            rule_id = get_rule_dir_id(yaml_file)
1032
1033 2
        rule = cls(rule_id)
1034
1035 2
        try:
1036 2
            rule._set_attributes_from_dict(yaml_contents)
1037
        except RuntimeError as exc:
1038
            msg = ("Error processing '{fname}': {err}"
1039
                   .format(fname=yaml_file, err=str(exc)))
1040
            raise RuntimeError(msg)
1041
1042 2
        for warning_list in rule.warnings:
1043
            if len(warning_list) != 1:
1044
                raise ValueError("Only one key/value pair should exist for each dictionary")
1045
1046 2
        if yaml_contents:
1047
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
1048
                               % (yaml_file, yaml_contents))
1049
1050 2
        if not rule.definition_location:
1051 2
            rule.definition_location = yaml_file
1052
1053 2
        rule.validate_prodtype(yaml_file)
1054 2
        rule.validate_identifiers(yaml_file)
1055 2
        rule.validate_references(yaml_file)
1056 2
        return rule
1057
1058 2
    def _verify_stigid_format(self, product):
1059 2
        stig_id = self.references.get("stigid", None)
1060 2
        if not stig_id:
1061 2
            return
1062 2
        if "," in stig_id:
1063 2
            raise ValueError("Rules can not have multiple STIG IDs.")
1064
1065 2
    def _verify_disa_cci_format(self):
1066 2
        cci_id = self.references.get("disa", None)
1067 2
        if not cci_id:
1068 2
            return
1069
        cci_ex = re.compile(r'^CCI-[0-9]{6}$')
1070
        for cci in cci_id.split(","):
1071
            if not cci_ex.match(cci):
1072
                raise ValueError("CCI '{}' is in the wrong format! "
1073
                                 "Format should be similar to: "
1074
                                 "CCI-XXXXXX".format(cci))
1075
        self.references["disa"] = cci_id
1076
1077 2
    def normalize(self, product):
1078 2
        try:
1079 2
            self.make_refs_and_identifiers_product_specific(product)
1080 2
            self.make_template_product_specific(product)
1081 2
        except Exception as exc:
1082 2
            msg = (
1083
                "Error normalizing '{rule}': {msg}"
1084
                .format(rule=self.id_, msg=str(exc))
1085
            )
1086 2
            raise RuntimeError(msg)
1087
1088 2
    def _get_product_only_references(self):
1089 2
        PRODUCT_REFERENCES = ("stigid",)
1090
1091 2
        product_references = dict()
1092
1093 2
        for ref in PRODUCT_REFERENCES:
1094 2
            start = "{0}@".format(ref)
1095 2
            for gref, gval in self.references.items():
1096 2
                if ref == gref or gref.startswith(start):
1097 2
                    product_references[gref] = gval
1098 2
        return product_references
1099
1100 2
    def make_template_product_specific(self, product):
1101 2
        product_suffix = "@{0}".format(product)
1102
1103 2
        if not self.template:
1104
            return
1105
1106 2
        not_specific_vars = self.template.get("vars", dict())
1107 2
        specific_vars = self._make_items_product_specific(
1108
            not_specific_vars, product_suffix, True)
1109 2
        self.template["vars"] = specific_vars
1110
1111 2
        not_specific_backends = self.template.get("backends", dict())
1112 2
        specific_backends = self._make_items_product_specific(
1113
            not_specific_backends, product_suffix, True)
1114 2
        self.template["backends"] = specific_backends
1115
1116 2
    def make_refs_and_identifiers_product_specific(self, product):
1117 2
        product_suffix = "@{0}".format(product)
1118
1119 2
        product_references = self._get_product_only_references()
1120 2
        general_references = self.references.copy()
1121 2
        for todel in product_references:
1122 2
            general_references.pop(todel)
1123
1124 2
        to_set = dict(
1125
            identifiers=(self.identifiers, False),
1126
            general_references=(general_references, True),
1127
            product_references=(product_references, False),
1128
        )
1129 2
        for name, (dic, allow_overwrites) in to_set.items():
1130 2
            try:
1131 2
                new_items = self._make_items_product_specific(
1132
                    dic, product_suffix, allow_overwrites)
1133 2
            except ValueError as exc:
1134 2
                msg = (
1135
                    "Error processing {what} for rule '{rid}': {msg}"
1136
                    .format(what=name, rid=self.id_, msg=str(exc))
1137
                )
1138 2
                raise ValueError(msg)
1139 2
            dic.clear()
1140 2
            dic.update(new_items)
1141
1142 2
        self.references = general_references
1143 2
        self._verify_disa_cci_format()
1144 2
        self.references.update(product_references)
1145
1146 2
        self._verify_stigid_format(product)
1147
1148 2
    def _make_items_product_specific(self, items_dict, product_suffix, allow_overwrites=False):
1149 2
        new_items = dict()
1150 2
        for full_label, value in items_dict.items():
1151 2
            if "@" not in full_label and full_label not in new_items:
1152 2
                new_items[full_label] = value
1153 2
                continue
1154
1155 2
            if not full_label.endswith(product_suffix):
1156 2
                continue
1157
1158 2
            label = full_label.split("@")[0]
1159 2
            if label in items_dict and not allow_overwrites and value != items_dict[label]:
1160 2
                msg = (
1161
                    "There is a product-qualified '{item_q}' item, "
1162
                    "but also an unqualified '{item_u}' item "
1163
                    "and those two differ in value - "
1164
                    "'{value_q}' vs '{value_u}' respectively."
1165
                    .format(item_q=full_label, item_u=label,
1166
                            value_q=value, value_u=items_dict[label])
1167
                )
1168 2
                raise ValueError(msg)
1169 2
            new_items[label] = value
1170 2
        return new_items
1171
1172 2
    def _set_attributes_from_dict(self, yaml_contents):
1173 2
        for key, default_getter in self.YAML_KEYS_DEFAULTS.items():
1174 2
            if key not in yaml_contents:
1175 2
                value = default_getter()
1176 2
                if isinstance(value, Exception):
1177
                    raise value
1178
            else:
1179 2
                value = yaml_contents.pop(key)
1180
1181 2
            setattr(self, key, value)
1182
1183 2
    def to_contents_dict(self):
1184
        """
1185
        Returns a dictionary that is the same schema as the dict obtained when loading rule YAML.
1186
        """
1187
1188 2
        yaml_contents = dict()
1189 2
        for key in Rule.YAML_KEYS_DEFAULTS:
1190 2
            yaml_contents[key] = getattr(self, key)
1191
1192 2
        return yaml_contents
1193
1194 2
    def validate_identifiers(self, yaml_file):
1195 2
        if self.identifiers is None:
1196
            raise ValueError("Empty identifier section in file %s" % yaml_file)
1197
1198
        # Validate all identifiers are non-empty:
1199 2
        for ident_type, ident_val in self.identifiers.items():
1200 2
            if not isinstance(ident_type, str) or not isinstance(ident_val, str):
1201
                raise ValueError("Identifiers and values must be strings: %s in file %s"
1202
                                 % (ident_type, yaml_file))
1203 2
            if ident_val.strip() == "":
1204
                raise ValueError("Identifiers must not be empty: %s in file %s"
1205
                                 % (ident_type, yaml_file))
1206 2
            if ident_type[0:3] == 'cce':
1207 2
                if not is_cce_format_valid(ident_val):
1208
                    raise ValueError("CCE Identifier format must be valid: invalid format '%s' for CEE '%s'"
1209
                                     " in file '%s'" % (ident_val, ident_type, yaml_file))
1210 2
                if not is_cce_value_valid("CCE-" + ident_val):
1211
                    raise ValueError("CCE Identifier value is not a valid checksum: invalid value '%s' for CEE '%s'"
1212
                                     " in file '%s'" % (ident_val, ident_type, yaml_file))
1213
1214 2
    def validate_references(self, yaml_file):
1215 2
        if self.references is None:
1216
            raise ValueError("Empty references section in file %s" % yaml_file)
1217
1218 2
        for ref_type, ref_val in self.references.items():
1219 2
            if not isinstance(ref_type, str) or not isinstance(ref_val, str):
1220
                raise ValueError("References and values must be strings: %s in file %s"
1221
                                 % (ref_type, yaml_file))
1222 2
            if ref_val.strip() == "":
1223
                raise ValueError("References must not be empty: %s in file %s"
1224
                                 % (ref_type, yaml_file))
1225
1226 2
        for ref_type, ref_val in self.references.items():
1227 2
            for ref in ref_val.split(","):
1228 2
                if ref.strip() != ref:
1229
                    msg = (
1230
                        "Comma-separated '{ref_type}' reference "
1231
                        "in {yaml_file} contains whitespace."
1232
                        .format(ref_type=ref_type, yaml_file=yaml_file))
1233
                    raise ValueError(msg)
1234
1235 2
    def validate_prodtype(self, yaml_file):
1236 2
        for ptype in self.prodtype.split(","):
1237 2
            if ptype.strip() != ptype:
1238
                msg = (
1239
                    "Comma-separated '{prodtype}' prodtype "
1240
                    "in {yaml_file} contains whitespace."
1241
                    .format(prodtype=self.prodtype, yaml_file=yaml_file))
1242
                raise ValueError(msg)
1243
1244 2
    def to_xml_element(self, product_cpes):
1245
        rule = ET.Element('Rule')
1246
        rule.set('id', self.id_)
1247
        if self.prodtype != "all":
1248
            rule.set("prodtype", self.prodtype)
1249
        rule.set('severity', self.severity)
1250
        add_sub_element(rule, 'title', self.title)
1251
        add_sub_element(rule, 'description', self.description)
1252
        add_sub_element(rule, 'rationale', self.rationale)
1253
1254
        main_ident = ET.Element('ident')
1255
        for ident_type, ident_val in self.identifiers.items():
1256
            # This is not true if items were normalized
1257
            if '@' in ident_type:
1258
                # the ident is applicable only on some product
1259
                # format : 'policy@product', eg. 'stigid@product'
1260
                # for them, we create a separate <ref> element
1261
                policy, product = ident_type.split('@')
1262
                ident = ET.SubElement(rule, 'ident')
1263
                ident.set(policy, ident_val)
1264
                ident.set('prodtype', product)
1265
            else:
1266
                main_ident.set(ident_type, ident_val)
1267
1268
        if main_ident.attrib:
1269
            rule.append(main_ident)
1270
1271
        main_ref = ET.Element('ref')
1272
        for ref_type, ref_val in self.references.items():
1273
            # This is not true if items were normalized
1274
            if '@' in ref_type:
1275
                # the reference is applicable only on some product
1276
                # format : 'policy@product', eg. 'stigid@product'
1277
                # for them, we create a separate <ref> element
1278
                policy, product = ref_type.split('@')
1279
                ref = ET.SubElement(rule, 'ref')
1280
                ref.set(policy, ref_val)
1281
                ref.set('prodtype', product)
1282
            else:
1283
                main_ref.set(ref_type, ref_val)
1284
1285
        if main_ref.attrib:
1286
            rule.append(main_ref)
1287
1288
        if self.oval_external_content:
1289
            check = ET.SubElement(rule, 'check')
1290
            check.set("system", "http://oval.mitre.org/XMLSchema/oval-definitions-5")
1291
            external_content = ET.SubElement(check, "check-content-ref")
1292
            external_content.set("href", self.oval_external_content)
1293
        else:
1294
            # TODO: This is pretty much a hack, oval ID will be the same as rule ID
1295
            #       and we don't want the developers to have to keep them in sync.
1296
            #       Therefore let's just add an OVAL ref of that ID.
1297
            oval_ref = ET.SubElement(rule, "oval")
1298
            oval_ref.set("id", self.id_)
1299
1300
        if self.ocil or self.ocil_clause:
1301
            ocil = add_sub_element(rule, 'ocil', self.ocil if self.ocil else "")
1302
            if self.ocil_clause:
1303
                ocil.set("clause", self.ocil_clause)
1304
1305
        add_warning_elements(rule, self.warnings)
1306
        add_nondata_subelements(rule, "requires", "id", self.requires)
1307
        add_nondata_subelements(rule, "conflicts", "id", self.conflicts)
1308
1309
        if self.platform:
1310
            platform_el = ET.SubElement(rule, "platform")
1311
            try:
1312
                platform_cpe = product_cpes.get_cpe_name(self.platform)
1313
            except CPEDoesNotExist:
1314
                print("Unsupported platform '%s' in rule '%s'." % (self.platform, self.id_))
1315
                raise
1316
            platform_el.set("idref", platform_cpe)
1317
1318
        return rule
1319
1320 2
    def to_file(self, file_name, product_cpes):
1321
        root = self.to_xml_element(product_cpes)
1322
        tree = ET.ElementTree(root)
1323
        tree.write(file_name)
1324
1325
1326 2
class DirectoryLoader(object):
1327 2
    def __init__(self, profiles_dir, bash_remediation_fns, env_yaml):
1328
        self.benchmark_file = None
1329
        self.group_file = None
1330
        self.loaded_group = None
1331
        self.rule_files = []
1332
        self.value_files = []
1333
        self.subdirectories = []
1334
1335
        self.all_values = set()
1336
        self.all_rules = set()
1337
        self.all_groups = set()
1338
1339
        self.profiles_dir = profiles_dir
1340
        self.bash_remediation_fns = bash_remediation_fns
1341
        self.env_yaml = env_yaml
1342
        self.product = env_yaml["product"]
1343
1344
        self.parent_group = None
1345
1346 2
    def _collect_items_to_load(self, guide_directory):
1347
        for dir_item in os.listdir(guide_directory):
1348
            dir_item_path = os.path.join(guide_directory, dir_item)
1349
            _, extension = os.path.splitext(dir_item)
1350
1351
            if extension == '.var':
1352
                self.value_files.append(dir_item_path)
1353
            elif dir_item == "benchmark.yml":
1354
                if self.benchmark_file:
1355
                    raise ValueError("Multiple benchmarks in one directory")
1356
                self.benchmark_file = dir_item_path
1357
            elif dir_item == "group.yml":
1358
                if self.group_file:
1359
                    raise ValueError("Multiple groups in one directory")
1360
                self.group_file = dir_item_path
1361
            elif extension == '.rule':
1362
                self.rule_files.append(dir_item_path)
1363
            elif is_rule_dir(dir_item_path):
1364
                self.rule_files.append(get_rule_dir_yaml(dir_item_path))
1365
            elif dir_item != "tests":
1366
                if os.path.isdir(dir_item_path):
1367
                    self.subdirectories.append(dir_item_path)
1368
                else:
1369
                    sys.stderr.write(
1370
                        "Encountered file '%s' while recursing, extension '%s' "
1371
                        "is unknown. Skipping..\n"
1372
                        % (dir_item, extension)
1373
                    )
1374
1375 2
    def load_benchmark_or_group(self, guide_directory):
1376
        """
1377
        Loads a given benchmark or group from the specified benchmark_file or
1378
        group_file, in the context of guide_directory, profiles_dir,
1379
        env_yaml, and bash_remediation_fns.
1380
1381
        Returns the loaded group or benchmark.
1382
        """
1383
        group = None
1384
        if self.group_file and self.benchmark_file:
1385
            raise ValueError("A .benchmark file and a .group file were found in "
1386
                             "the same directory '%s'" % (guide_directory))
1387
1388
        # we treat benchmark as a special form of group in the following code
1389
        if self.benchmark_file:
1390
            group = Benchmark.from_yaml(
1391
                self.benchmark_file, 'product-name', self.env_yaml
1392
            )
1393
            if self.profiles_dir:
1394
                group.add_profiles_from_dir(self.profiles_dir, self.env_yaml)
1395
            group.add_bash_remediation_fns_from_file(self.bash_remediation_fns)
1396
1397
        if self.group_file:
1398
            group = Group.from_yaml(self.group_file, self.env_yaml)
1399
            self.all_groups.add(group.id_)
1400
1401
        return group
1402
1403 2
    def _load_group_process_and_recurse(self, guide_directory):
1404
        self.loaded_group = self.load_benchmark_or_group(guide_directory)
1405
1406
        if self.loaded_group:
1407
            if self.parent_group:
1408
                self.parent_group.add_group(self.loaded_group)
1409
1410
            self._process_values()
1411
            self._recurse_into_subdirs()
1412
            self._process_rules()
1413
1414 2
    def process_directory_tree(self, start_dir, extra_group_dirs=None):
1415
        self._collect_items_to_load(start_dir)
1416
        if extra_group_dirs is not None:
1417
            self.subdirectories += extra_group_dirs
1418
        self._load_group_process_and_recurse(start_dir)
1419
1420 2
    def _recurse_into_subdirs(self):
1421
        for subdir in self.subdirectories:
1422
            loader = self._get_new_loader()
1423
            loader.parent_group = self.loaded_group
1424
            loader.process_directory_tree(subdir)
1425
            self.all_values.update(loader.all_values)
1426
            self.all_rules.update(loader.all_rules)
1427
            self.all_groups.update(loader.all_groups)
1428
1429 2
    def _get_new_loader(self):
1430
        raise NotImplementedError()
1431
1432 2
    def _process_values(self):
1433
        raise NotImplementedError()
1434
1435 2
    def _process_rules(self):
1436
        raise NotImplementedError()
1437
1438
1439 2
class BuildLoader(DirectoryLoader):
1440 2
    def __init__(self, profiles_dir, bash_remediation_fns, env_yaml, resolved_rules_dir=None):
1441
        super(BuildLoader, self).__init__(profiles_dir, bash_remediation_fns, env_yaml)
1442
1443
        self.resolved_rules_dir = resolved_rules_dir
1444
        if resolved_rules_dir and not os.path.isdir(resolved_rules_dir):
1445
            os.mkdir(resolved_rules_dir)
1446
1447 2
    def _process_values(self):
1448
        for value_yaml in self.value_files:
1449
            value = Value.from_yaml(value_yaml, self.env_yaml)
1450
            self.all_values.add(value)
1451
            self.loaded_group.add_value(value)
1452
1453 2
    def _process_rules(self):
1454
        for rule_yaml in self.rule_files:
1455
            try:
1456
                rule = Rule.from_yaml(rule_yaml, self.env_yaml)
1457
            except DocumentationNotComplete:
1458
                # Happens on non-debug build when a rule is "documentation-incomplete"
1459
                continue
1460
            prodtypes = parse_prodtype(rule.prodtype)
1461
            if "all" not in prodtypes and self.product not in prodtypes:
1462
                continue
1463
            self.all_rules.add(rule)
1464
            self.loaded_group.add_rule(rule)
1465
1466
            rule.inherited_platforms.append(self.loaded_group.platform)
1467
1468
            if self.resolved_rules_dir:
1469
                output_for_rule = os.path.join(
1470
                    self.resolved_rules_dir, "{id_}.yml".format(id_=rule.id_))
1471
                mkdir_p(self.resolved_rules_dir)
1472
                with open(output_for_rule, "w") as f:
1473
                    rule.normalize(self.env_yaml["product"])
1474
                    yaml.dump(rule.to_contents_dict(), f)
1475
1476 2
    def _get_new_loader(self):
1477
        return BuildLoader(
1478
            self.profiles_dir, self.bash_remediation_fns, self.env_yaml, self.resolved_rules_dir)
1479
1480 2
    def export_group_to_file(self, filename, product_cpes):
1481
        return self.loaded_group.to_file(filename, product_cpes)
1482