Test Failed
Pull Request — master (#5078)
by Matěj
02:09
created

ssg.build_yaml.Value.from_yaml()   B

Complexity

Conditions 6

Size

Total Lines 42
Code Lines 33

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 22
CRAP Score 6.2286

Importance

Changes 0
Metric Value
cc 6
eloc 33
nop 2
dl 0
loc 42
ccs 22
cts 27
cp 0.8148
crap 6.2286
rs 8.1546
c 0
b 0
f 0
1 2
from __future__ import absolute_import
2 2
from __future__ import print_function
3
4 2
import os
5 2
import os.path
6 2
from collections import defaultdict
7 2
import datetime
8 2
import re
9 2
import sys
10
11 2
import yaml
12
13 2
from .constants import XCCDF_PLATFORM_TO_CPE
14 2
from .constants import PRODUCT_TO_CPE_MAPPING
15 2
from .constants import STIG_PLATFORM_ID_MAP
16 2
from .constants import XCCDF_REFINABLE_PROPERTIES
17 2
from .rules import get_rule_dir_id, get_rule_dir_yaml, is_rule_dir
18 2
from .rule_yaml import parse_prodtype
19
20 2
from .checks import is_cce_format_valid, is_cce_value_valid
21 2
from .yaml import DocumentationNotComplete, open_and_expand, open_and_macro_expand
22 2
from .utils import required_key, mkdir_p
23
24 2
from .xml import ElementTree as ET
25 2
from .shims import unicode_func
26
27
28 2
def add_sub_element(parent, tag, data):
29
    """
30
    Creates a new child element under parent with tag tag, and sets
31
    data as the content under the tag. In particular, data is a string
32
    to be parsed as an XML tree, allowing sub-elements of children to be
33
    added.
34
35
    If data should not be parsed as an XML tree, either escape the contents
36
    before passing into this function, or use ElementTree.SubElement().
37
38
    Returns the newly created subelement of type tag.
39
    """
40
    # This is used because our YAML data contain XML and XHTML elements
41
    # ET.SubElement() escapes the < > characters by &lt; and &gt;
42
    # and therefore it does not add child elements
43
    # we need to do a hack instead
44
    # TODO: Remove this function after we move to Markdown everywhere in SSG
45
    ustr = unicode_func("<{0}>{1}</{0}>").format(tag, data)
46
47
    try:
48
        element = ET.fromstring(ustr.encode("utf-8"))
49
    except Exception:
50
        msg = ("Error adding subelement to an element '{0}' from string: '{1}'"
51
               .format(parent.tag, ustr))
52
        raise RuntimeError(msg)
53
54
    parent.append(element)
55
    return element
56
57
58 2
def reorder_according_to_ordering(unordered, ordering, regex=None):
59 2
    ordered = []
60 2
    if regex is None:
61 2
        regex = "|".join(["({0})".format(item) for item in ordering])
62 2
    regex = re.compile(regex)
63
64 2
    items_to_order = list(filter(regex.match, unordered))
65 2
    unordered = set(unordered)
66
67 2
    for priority_type in ordering:
68 2
        for item in items_to_order:
69 2
            if priority_type in item and item in unordered:
70 2
                ordered.append(item)
71 2
                unordered.remove(item)
72 2
    ordered.extend(list(unordered))
73 2
    return ordered
74
75
76 2
def add_warning_elements(element, warnings):
77
    # The use of [{dict}, {dict}] in warnings is to handle the following
78
    # scenario where multiple warnings have the same category which is
79
    # valid in SCAP and our content:
80
    #
81
    # warnings:
82
    #     - general: Some general warning
83
    #     - general: Some other general warning
84
    #     - general: |-
85
    #         Some really long multiline general warning
86
    #
87
    # Each of the {dict} should have only one key/value pair.
88
    for warning_dict in warnings:
89
        warning = add_sub_element(element, "warning", list(warning_dict.values())[0])
90
        warning.set("category", list(warning_dict.keys())[0])
91
92
93 2
class Profile(object):
94
    """Represents XCCDF profile
95
    """
96
97 2
    def __init__(self, id_):
98 2
        self.id_ = id_
99 2
        self.title = ""
100 2
        self.description = ""
101 2
        self.extends = None
102 2
        self.selected = []
103 2
        self.unselected = []
104 2
        self.variables = dict()
105 2
        self.refine_rules = defaultdict(list)
106
107 2
    @staticmethod
108 2
    def from_yaml(yaml_file, env_yaml=None):
109 2
        yaml_contents = open_and_expand(yaml_file, env_yaml)
110 2
        if yaml_contents is None:
111
            return None
112
113 2
        basename, _ = os.path.splitext(os.path.basename(yaml_file))
114
115 2
        profile = Profile(basename)
116 2
        profile.title = required_key(yaml_contents, "title")
117 2
        del yaml_contents["title"]
118 2
        profile.description = required_key(yaml_contents, "description")
119 2
        del yaml_contents["description"]
120 2
        profile.extends = yaml_contents.pop("extends", None)
121 2
        selection_entries = required_key(yaml_contents, "selections")
122 2
        if selection_entries:
123 2
            profile._parse_selections(selection_entries)
124 2
        del yaml_contents["selections"]
125
126 2
        if yaml_contents:
127
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
128
                               % (yaml_file, yaml_contents))
129
130 2
        return profile
131
132 2
    def dump_yaml(self, file_name, documentation_complete=True):
133
        to_dump = {}
134
        to_dump["documentation_complete"] = documentation_complete
135
        to_dump["title"] = self.title
136
        to_dump["description"] = self.description
137
        if self.extends is not None:
138
            to_dump["extends"] = self.extends
139
140
        selections = []
141
        for item in self.selected:
142
            selections.append(item)
143
        for item in self.unselected:
144
            selections.append("!"+item)
145
        for varname in self.variables.keys():
146
            selections.append(varname+"="+self.variables.get(varname))
147
        to_dump["selections"] = selections
148
        with open(file_name, "w+") as f:
149
            yaml.dump(to_dump, f, indent=4)
150
151 2
    def _parse_selections(self, entries):
152 2
        for item in entries:
153 2
            if "." in item:
154
                rule, refinement = item.split(".", 1)
155
                property_, value = refinement.split("=", 1)
156
                if property_ not in XCCDF_REFINABLE_PROPERTIES:
157
                    msg = ("Property '{property_}' cannot be refined. "
158
                           "Rule properties that can be refined are {refinables}. "
159
                           "Fix refinement '{rule_id}.{property_}={value}' in profile '{profile}'."
160
                           .format(property_=property_, refinables=XCCDF_REFINABLE_PROPERTIES,
161
                                   rule_id=rule, value=value, profile=self.id_)
162
                           )
163
                    raise ValueError(msg)
164
                self.refine_rules[rule].append((property_, value))
165 2
            elif "=" in item:
166 2
                varname, value = item.split("=", 1)
167 2
                self.variables[varname] = value
168 2
            elif item.startswith("!"):
169
                self.unselected.append(item[1:])
170
            else:
171 2
                self.selected.append(item)
172
173 2
    def to_xml_element(self):
174
        element = ET.Element('Profile')
175
        element.set("id", self.id_)
176
        if self.extends:
177
            element.set("extends", self.extends)
178
        title = add_sub_element(element, "title", self.title)
179
        title.set("override", "true")
180
        desc = add_sub_element(element, "description", self.description)
181
        desc.set("override", "true")
182
183
        for selection in self.selected:
184
            select = ET.Element("select")
185
            select.set("idref", selection)
186
            select.set("selected", "true")
187
            element.append(select)
188
189
        for selection in self.unselected:
190
            unselect = ET.Element("select")
191
            unselect.set("idref", selection)
192
            unselect.set("selected", "false")
193
            element.append(unselect)
194
195
        for value_id, selector in self.variables.items():
196
            refine_value = ET.Element("refine-value")
197
            refine_value.set("idref", value_id)
198
            refine_value.set("selector", selector)
199
            element.append(refine_value)
200
201
        for refined_rule, refinement_list in self.refine_rules.items():
202
            refine_rule = ET.Element("refine-rule")
203
            refine_rule.set("idref", refined_rule)
204
            for refinement in refinement_list:
205
                refine_rule.set(refinement[0], refinement[1])
206
            element.append(refine_rule)
207
208
        return element
209
210 2
    def get_rule_selectors(self):
211 2
        return list(self.selected + self.unselected)
212
213 2
    def get_variable_selectors(self):
214 2
        return self.variables
215
216 2
    def validate_refine_rules(self, rules):
217
        existing_rule_ids = [r.id_ for r in rules]
218
        for refine_rule, refinement_list in self.refine_rules.items():
219
            # Take first refinement to ilustrate where the error is
220
            # all refinements in list are invalid, so it doesn't really matter
221
            a_refinement = refinement_list[0]
222
223
            if refine_rule not in existing_rule_ids:
224
                msg = (
225
                    "You are trying to refine a rule that doesn't exist. "
226
                    "Rule '{rule_id}' was not found in the benchmark. "
227
                    "Please check all rule refinements for rule: '{rule_id}', for example: "
228
                    "- {rule_id}.{property_}={value}' in profile {profile_id}."
229
                    .format(rule_id=refine_rule, profile_id=self.id_,
230
                            property_=a_refinement[0], value=a_refinement[1])
231
                    )
232
                raise ValueError(msg)
233
234
            if refine_rule not in self.get_rule_selectors():
235
                msg = ("- {rule_id}.{property_}={value}' in profile '{profile_id}' is refining "
236
                       "a rule that is not selected by it. The refinement will not have any "
237
                       "noticeable effect. Either select the rule or remove the rule refinement."
238
                       .format(rule_id=refine_rule, property_=a_refinement[0],
239
                               value=a_refinement[1], profile_id=self.id_)
240
                       )
241
                raise ValueError(msg)
242
243 2
    def validate_variables(self, variables):
244
        variables_by_id = dict()
245
        for var in variables:
246
            variables_by_id[var.id_] = var
247
248
        for var_id, our_val in self.variables.items():
249
            if var_id not in variables_by_id:
250
                all_vars_list = [" - %s" % v for v in variables_by_id.keys()]
251
                msg = (
252
                    "Value '{var_id}' in profile '{profile_name}' is not known. "
253
                    "We know only variables:\n{var_names}"
254
                    .format(
255
                        var_id=var_id, profile_name=self.id_,
256
                        var_names="\n".join(sorted(all_vars_list)))
257
                )
258
                raise ValueError(msg)
259
260
            allowed_selectors = [str(s) for s in variables_by_id[var_id].options.keys()]
261
            if our_val not in allowed_selectors:
262
                msg = (
263
                    "Value '{var_id}' in profile '{profile_name}' "
264
                    "uses the selector '{our_val}'. "
265
                    "This is not possible, as only selectors {all_selectors} are available. "
266
                    "Either change the selector used in the profile, or "
267
                    "add the selector-value pair to the variable definition."
268
                    .format(
269
                        var_id=var_id, profile_name=self.id_, our_val=our_val,
270
                        all_selectors=allowed_selectors,
271
                    )
272
                )
273
                raise ValueError(msg)
274
275 2
    def validate_rules(self, rules, groups):
276
        existing_rule_ids = [r.id_ for r in rules]
277
        rule_selectors = self.get_rule_selectors()
278
        for id_ in rule_selectors:
279
            if id_ in groups:
280
                msg = (
281
                    "You have selected a group '{group_id}' instead of a "
282
                    "rule. Groups have no effect in the profile and are not "
283
                    "allowed to be selected. Please remove '{group_id}' "
284
                    "from profile '{profile_id}' before proceeding."
285
                    .format(group_id=id_, profile_id=self.id_)
286
                )
287
                raise ValueError(msg)
288
            if id_ not in existing_rule_ids:
289
                msg = (
290
                    "Rule '{rule_id}' was not found in the benchmark. Please "
291
                    "remove rule '{rule_id}' from profile '{profile_id}' "
292
                    "before proceeding."
293
                    .format(rule_id=id_, profile_id=self.id_)
294
                )
295
                raise ValueError(msg)
296
297 2
    def __sub__(self, other):
298
        profile = Profile(self.id_)
299
        profile.title = self.title
300
        profile.description = self.description
301
        profile.extends = self.extends
302
        profile.selected = list(set(self.selected) - set(other.selected))
303
        profile.selected.sort()
304
        profile.unselected = list(set(self.unselected) - set(other.unselected))
305
        profile.variables = dict ((k, v) for (k, v) in self.variables.items()
306
                             if k not in other.variables or v != other.variables[k])
307
        return profile
308
309
310 2
class Value(object):
311
    """Represents XCCDF Value
312
    """
313
314 2
    def __init__(self, id_):
315 2
        self.id_ = id_
316 2
        self.title = ""
317 2
        self.description = ""
318 2
        self.type_ = "string"
319 2
        self.operator = "equals"
320 2
        self.interactive = False
321 2
        self.options = {}
322 2
        self.warnings = []
323
324 2
    @staticmethod
325 2
    def from_yaml(yaml_file, env_yaml=None):
326 2
        yaml_contents = open_and_expand(yaml_file, env_yaml)
327 2
        if yaml_contents is None:
328
            return None
329
330 2
        value_id, _ = os.path.splitext(os.path.basename(yaml_file))
331 2
        value = Value(value_id)
332 2
        value.title = required_key(yaml_contents, "title")
333 2
        del yaml_contents["title"]
334 2
        value.description = required_key(yaml_contents, "description")
335 2
        del yaml_contents["description"]
336 2
        value.type_ = required_key(yaml_contents, "type")
337 2
        del yaml_contents["type"]
338 2
        value.operator = yaml_contents.pop("operator", "equals")
339 2
        possible_operators = ["equals", "not equal", "greater than",
340
                              "less than", "greater than or equal",
341
                              "less than or equal", "pattern match"]
342
343 2
        if value.operator not in possible_operators:
344
            raise ValueError(
345
                "Found an invalid operator value '%s' in '%s'. "
346
                "Expected one of: %s"
347
                % (value.operator, yaml_file, ", ".join(possible_operators))
348
            )
349
350 2
        value.interactive = \
351
            yaml_contents.pop("interactive", "false").lower() == "true"
352
353 2
        value.options = required_key(yaml_contents, "options")
354 2
        del yaml_contents["options"]
355 2
        value.warnings = yaml_contents.pop("warnings", [])
356
357 2
        for warning_list in value.warnings:
358
            if len(warning_list) != 1:
359
                raise ValueError("Only one key/value pair should exist for each dictionary")
360
361 2
        if yaml_contents:
362
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
363
                               % (yaml_file, yaml_contents))
364
365 2
        return value
366
367 2
    def to_xml_element(self):
368
        value = ET.Element('Value')
369
        value.set('id', self.id_)
370
        value.set('type', self.type_)
371
        if self.operator != "equals":  # equals is the default
372
            value.set('operator', self.operator)
373
        if self.interactive:  # False is the default
374
            value.set('interactive', 'true')
375
        title = ET.SubElement(value, 'title')
376
        title.text = self.title
377
        add_sub_element(value, 'description', self.description)
378
        add_warning_elements(value, self.warnings)
379
380
        for selector, option in self.options.items():
381
            # do not confuse Value with big V with value with small v
382
            # value is child element of Value
383
            value_small = ET.SubElement(value, 'value')
384
            # by XCCDF spec, default value is value without selector
385
            if selector != "default":
386
                value_small.set('selector', str(selector))
387
            value_small.text = str(option)
388
389
        return value
390
391 2
    def to_file(self, file_name):
392
        root = self.to_xml_element()
393
        tree = ET.ElementTree(root)
394
        tree.write(file_name)
395
396
397 2
class Benchmark(object):
398
    """Represents XCCDF Benchmark
399
    """
400 2
    def __init__(self, id_):
401
        self.id_ = id_
402
        self.title = ""
403
        self.status = ""
404
        self.description = ""
405
        self.notice_id = ""
406
        self.notice_description = ""
407
        self.front_matter = ""
408
        self.rear_matter = ""
409
        self.cpes = []
410
        self.version = "0.1"
411
        self.profiles = []
412
        self.values = {}
413
        self.bash_remediation_fns_group = None
414
        self.groups = {}
415
        self.rules = {}
416
417
        # This is required for OCIL clauses
418
        conditional_clause = Value("conditional_clause")
419
        conditional_clause.title = "A conditional clause for check statements."
420
        conditional_clause.description = conditional_clause.title
421
        conditional_clause.type_ = "string"
422
        conditional_clause.options = {"": "This is a placeholder"}
423
424
        self.add_value(conditional_clause)
425
426 2
    @staticmethod
427 2
    def from_yaml(yaml_file, id_, product_yaml=None):
428
        yaml_contents = open_and_macro_expand(yaml_file, product_yaml)
429
        if yaml_contents is None:
430
            return None
431
432
        benchmark = Benchmark(id_)
433
        benchmark.title = required_key(yaml_contents, "title")
434
        del yaml_contents["title"]
435
        benchmark.status = required_key(yaml_contents, "status")
436
        del yaml_contents["status"]
437
        benchmark.description = required_key(yaml_contents, "description")
438
        del yaml_contents["description"]
439
        notice_contents = required_key(yaml_contents, "notice")
440
        benchmark.notice_id = required_key(notice_contents, "id")
441
        del notice_contents["id"]
442
        benchmark.notice_description = required_key(notice_contents,
443
                                                    "description")
444
        del notice_contents["description"]
445
        if not notice_contents:
446
            del yaml_contents["notice"]
447
448
        benchmark.front_matter = required_key(yaml_contents,
449
                                              "front-matter")
450
        del yaml_contents["front-matter"]
451
        benchmark.rear_matter = required_key(yaml_contents,
452
                                             "rear-matter")
453
        del yaml_contents["rear-matter"]
454
        benchmark.version = str(required_key(yaml_contents, "version"))
455
        del yaml_contents["version"]
456
457
        if yaml_contents:
458
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
459
                               % (yaml_file, yaml_contents))
460
461
        if product_yaml:
462
            benchmark.cpes = PRODUCT_TO_CPE_MAPPING[product_yaml["product"]]
463
464
        return benchmark
465
466 2
    def add_profiles_from_dir(self, action, dir_, env_yaml):
467
        for dir_item in os.listdir(dir_):
468
            dir_item_path = os.path.join(dir_, dir_item)
469
            if not os.path.isfile(dir_item_path):
470
                continue
471
472
            _, ext = os.path.splitext(os.path.basename(dir_item_path))
473
            if ext != '.profile':
474
                sys.stderr.write(
475
                    "Encountered file '%s' while looking for profiles, "
476
                    "extension '%s' is unknown. Skipping..\n"
477
                    % (dir_item, ext)
478
                )
479
                continue
480
481
            try:
482
                new_profile = Profile.from_yaml(dir_item_path, env_yaml)
483
            except DocumentationNotComplete:
484
                continue
485
            except Exception as exc:
486
                msg = ("Error building profile from '{fname}': '{error}'"
487
                       .format(fname=dir_item_path, error=str(exc)))
488
                raise RuntimeError(msg)
489
            if new_profile is None:
490
                continue
491
492
            self.profiles.append(new_profile)
493
            if action == "list-inputs":
494
                print(dir_item_path)
495
496 2
    def add_bash_remediation_fns_from_file(self, action, file_):
497
        if action == "list-inputs":
498
            print(file_)
499
        else:
500
            tree = ET.parse(file_)
501
            self.bash_remediation_fns_group = tree.getroot()
502
503 2
    def to_xml_element(self):
504
        root = ET.Element('Benchmark')
505
        root.set('xmlns:xsi', 'http://www.w3.org/2001/XMLSchema-instance')
506
        root.set('xmlns:xhtml', 'http://www.w3.org/1999/xhtml')
507
        root.set('xmlns:dc', 'http://purl.org/dc/elements/1.1/')
508
        root.set('id', 'product-name')
509
        root.set('xsi:schemaLocation',
510
                 'http://checklists.nist.gov/xccdf/1.1 xccdf-1.1.4.xsd')
511
        root.set('style', 'SCAP_1.1')
512
        root.set('resolved', 'false')
513
        root.set('xml:lang', 'en-US')
514
        status = ET.SubElement(root, 'status')
515
        status.set('date', datetime.date.today().strftime("%Y-%m-%d"))
516
        status.text = self.status
517
        add_sub_element(root, "title", self.title)
518
        add_sub_element(root, "description", self.description)
519
        notice = add_sub_element(root, "notice", self.notice_description)
520
        notice.set('id', self.notice_id)
521
        add_sub_element(root, "front-matter", self.front_matter)
522
        add_sub_element(root, "rear-matter", self.rear_matter)
523
524
        for idref in self.cpes:
525
            plat = ET.SubElement(root, "platform")
526
            plat.set("idref", idref)
527
528
        version = ET.SubElement(root, 'version')
529
        version.text = self.version
530
        ET.SubElement(root, "metadata")
531
532
        for profile in self.profiles:
533
            root.append(profile.to_xml_element())
534
535
        for value in self.values.values():
536
            root.append(value.to_xml_element())
537
        if self.bash_remediation_fns_group is not None:
538
            root.append(self.bash_remediation_fns_group)
539
540
        groups_in_bench = list(self.groups.keys())
541
        priority_order = ["system", "services"]
542
        groups_in_bench = reorder_according_to_ordering(groups_in_bench, priority_order)
543
544
        # Make system group the first, followed by services group
545
        for group_id in groups_in_bench:
546
            group = self.groups.get(group_id)
547
            # Products using application benchmark don't have system or services group
548
            if group is not None:
549
                root.append(group.to_xml_element())
550
551
        for rule in self.rules.values():
552
            root.append(rule.to_xml_element())
553
554
        return root
555
556 2
    def to_file(self, file_name):
557
        root = self.to_xml_element()
558
        tree = ET.ElementTree(root)
559
        tree.write(file_name)
560
561 2
    def add_value(self, value):
562
        if value is None:
563
            return
564
        self.values[value.id_] = value
565
566 2
    def add_group(self, group):
567
        if group is None:
568
            return
569
        self.groups[group.id_] = group
570
571 2
    def add_rule(self, rule):
572
        if rule is None:
573
            return
574
        self.rules[rule.id_] = rule
575
576 2
    def to_xccdf(self):
577
        """We can easily extend this script to generate a valid XCCDF instead
578
        of SSG SHORTHAND.
579
        """
580
        raise NotImplementedError
581
582 2
    def __str__(self):
583
        return self.id_
584
585
586 2
class Group(object):
587
    """Represents XCCDF Group
588
    """
589 2
    ATTRIBUTES_TO_PASS_ON = (
590
        "platform",
591
    )
592
593 2
    def __init__(self, id_):
594
        self.id_ = id_
595
        self.prodtype = "all"
596
        self.title = ""
597
        self.description = ""
598
        self.warnings = []
599
        self.values = {}
600
        self.groups = {}
601
        self.rules = {}
602
        self.platform = None
603
604 2
    @staticmethod
605 2
    def from_yaml(yaml_file, env_yaml=None):
606
        yaml_contents = open_and_macro_expand(yaml_file, env_yaml)
607
        if yaml_contents is None:
608
            return None
609
610
        group_id = os.path.basename(os.path.dirname(yaml_file))
611
        group = Group(group_id)
612
        group.prodtype = yaml_contents.pop("prodtype", "all")
613
        group.title = required_key(yaml_contents, "title")
614
        del yaml_contents["title"]
615
        group.description = required_key(yaml_contents, "description")
616
        del yaml_contents["description"]
617
        group.warnings = yaml_contents.pop("warnings", [])
618
        group.platform = yaml_contents.pop("platform", None)
619
620
        for warning_list in group.warnings:
621
            if len(warning_list) != 1:
622
                raise ValueError("Only one key/value pair should exist for each dictionary")
623
624
        if yaml_contents:
625
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
626
                               % (yaml_file, yaml_contents))
627
        group.validate_prodtype(yaml_file)
628
        return group
629
630 2
    def validate_prodtype(self, yaml_file):
631
        for ptype in self.prodtype.split(","):
632
            if ptype.strip() != ptype:
633
                msg = (
634
                    "Comma-separated '{prodtype}' prodtype "
635
                    "in {yaml_file} contains whitespace."
636
                    .format(prodtype=self.prodtype, yaml_file=yaml_file))
637
                raise ValueError(msg)
638
639 2
    def to_xml_element(self):
640
        group = ET.Element('Group')
641
        group.set('id', self.id_)
642
        if self.prodtype != "all":
643
            group.set("prodtype", self.prodtype)
644
        title = ET.SubElement(group, 'title')
645
        title.text = self.title
646
        add_sub_element(group, 'description', self.description)
647
        add_warning_elements(group, self.warnings)
648
649
        if self.platform:
650
            platform_el = ET.SubElement(group, "platform")
651
            try:
652
                platform_cpe = XCCDF_PLATFORM_TO_CPE[self.platform]
653
            except KeyError:
654
                raise ValueError("Unsupported platform '%s' in rule '%s'." % (self.platform, self.id_))
655
            platform_el.set("idref", platform_cpe)
656
657
        for _value in self.values.values():
658
            group.append(_value.to_xml_element())
659
660
        # Rules that install or remove packages affect remediation
661
        # of other rules.
662
        # When packages installed/removed rules come first:
663
        # The Rules are ordered in more logical way, and
664
        # remediation order is natural, first the package is installed, then configured.
665
        rules_in_group = list(self.rules.keys())
666
        regex = r'(package_.*_(installed|removed))|(service_.*_(enabled|disabled))$'
667
        priority_order = ["installed", "removed", "enabled", "disabled"]
668
        rules_in_group = reorder_according_to_ordering(rules_in_group, priority_order, regex)
669
670
        # Add rules in priority order, first all packages installed, then removed,
671
        # followed by services enabled, then disabled
672
        for rule_id in rules_in_group:
673
            group.append(self.rules.get(rule_id).to_xml_element())
674
675
        # Add the sub groups after any current level group rules.
676
        # As package installed/removed and service enabled/disabled rules are usuallly in
677
        # top level group, this ensures groups that further configure a package or service
678
        # are after rules that install or remove it.
679
        groups_in_group = list(self.groups.keys())
680
        # The FIPS group should come before Crypto - if we want to set a different (stricter) Crypto Policy than FIPS.
681
        priority_order = ["fips", "crypto"]
682
        groups_in_group = reorder_according_to_ordering(groups_in_group, priority_order)
683
        for group_id in groups_in_group:
684
            _group = self.groups[group_id]
685
            group.append(_group.to_xml_element())
686
687
        return group
688
689 2
    def to_file(self, file_name):
690
        root = self.to_xml_element()
691
        tree = ET.ElementTree(root)
692
        tree.write(file_name)
693
694 2
    def add_value(self, value):
695
        if value is None:
696
            return
697
        self.values[value.id_] = value
698
699 2
    def add_group(self, group):
700
        if group is None:
701
            return
702
        if self.platform and not group.platform:
703
            group.platform = self.platform
704
        self.groups[group.id_] = group
705
        self._pass_our_properties_on_to(group)
706
707 2
    def _pass_our_properties_on_to(self, obj):
708
        for attr in self.ATTRIBUTES_TO_PASS_ON:
709
            if hasattr(obj, attr) and getattr(obj, attr) is None:
710
                setattr(obj, attr, getattr(self, attr))
711
712 2
    def add_rule(self, rule):
713
        if rule is None:
714
            return
715
        if self.platform and not rule.platform:
716
            rule.platform = self.platform
717
        self.rules[rule.id_] = rule
718
        self._pass_our_properties_on_to(rule)
719
720 2
    def __str__(self):
721
        return self.id_
722
723
724 2
class Rule(object):
725
    """Represents XCCDF Rule
726
    """
727 2
    YAML_KEYS_DEFAULTS = {
728
        "prodtype": lambda: "all",
729
        "title": lambda: RuntimeError("Missing key 'title'"),
730
        "description": lambda: RuntimeError("Missing key 'description'"),
731
        "rationale": lambda: RuntimeError("Missing key 'rationale'"),
732
        "severity": lambda: RuntimeError("Missing key 'severity'"),
733
        "references": lambda: dict(),
734
        "identifiers": lambda: dict(),
735
        "ocil_clause": lambda: None,
736
        "ocil": lambda: None,
737
        "oval_external_content": lambda: None,
738
        "warnings": lambda: list(),
739
        "platform": lambda: None,
740
        "template": lambda: None,
741
    }
742
743 2
    def __init__(self, id_):
744 2
        self.id_ = id_
745 2
        self.prodtype = "all"
746 2
        self.title = ""
747 2
        self.description = ""
748 2
        self.rationale = ""
749 2
        self.severity = "unknown"
750 2
        self.references = {}
751 2
        self.identifiers = {}
752 2
        self.ocil_clause = None
753 2
        self.ocil = None
754 2
        self.oval_external_content = None
755 2
        self.warnings = []
756 2
        self.platform = None
757 2
        self.template = None
758
759 2
    @staticmethod
760 2
    def from_yaml(yaml_file, env_yaml=None):
761 2
        yaml_file = os.path.normpath(yaml_file)
762
763 2
        yaml_contents = open_and_macro_expand(yaml_file, env_yaml)
764 2
        if yaml_contents is None:
765
            return None
766
767 2
        rule_id, ext = os.path.splitext(os.path.basename(yaml_file))
768 2
        if rule_id == "rule" and ext == ".yml":
769 2
            rule_id = get_rule_dir_id(yaml_file)
770
771 2
        rule = Rule(rule_id)
772
773 2
        try:
774 2
            rule._set_attributes_from_dict(yaml_contents)
775
        except RuntimeError as exc:
776
            msg = ("Error processing '{fname}': {err}"
777
                   .format(fname=yaml_file, err=str(exc)))
778
            raise RuntimeError(msg)
779
780 2
        for warning_list in rule.warnings:
781
            if len(warning_list) != 1:
782
                raise ValueError("Only one key/value pair should exist for each dictionary")
783
784 2
        if yaml_contents:
785
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
786
                               % (yaml_file, yaml_contents))
787
788 2
        rule.validate_prodtype(yaml_file)
789 2
        rule.validate_identifiers(yaml_file)
790 2
        rule.validate_references(yaml_file)
791 2
        return rule
792
793 2
    def _make_stigid_product_specific(self, product):
794 2
        stig_id = self.references.get("stigid", None)
795 2
        if not stig_id:
796 2
            return
797 2
        if "," in stig_id:
798 2
            raise ValueError("Rules can not have multiple STIG IDs.")
799 2
        stig_platform_id = STIG_PLATFORM_ID_MAP.get(product, product.upper())
800 2
        product_specific_stig_id = "{platform_id}-{stig_id}".format(
801
            platform_id=stig_platform_id, stig_id=stig_id)
802 2
        self.references["stigid"] = product_specific_stig_id
803
804 2
    def normalize(self, product):
805 2
        try:
806 2
            self.make_refs_and_identifiers_product_specific(product)
807 2
            self.make_template_product_specific(product)
808 2
        except Exception as exc:
809 2
            msg = (
810
                "Error normalizing '{rule}': {msg}"
811
                .format(rule=self.id_, msg=str(exc))
812
            )
813 2
            raise RuntimeError(msg)
814
815 2
    def _get_product_only_references(self):
816 2
        PRODUCT_REFERENCES = ("stigid",)
817
818 2
        product_references = dict()
819
820 2
        for ref in PRODUCT_REFERENCES:
821 2
            start = "{0}@".format(ref)
822 2
            for gref, gval in self.references.items():
823 2
                if ref == gref or gref.startswith(start):
824 2
                    product_references[gref] = gval
825 2
        return product_references
826
827 2
    def make_template_product_specific(self, product):
828 2
        product_suffix = "@{0}".format(product)
829
830 2
        if not self.template:
831
            return
832
833 2
        not_specific_vars = self.template.get("vars", dict())
834 2
        specific_vars = self._make_items_product_specific(
835
            not_specific_vars, product_suffix, True)
836 2
        self.template["vars"] = specific_vars
837
838 2
        not_specific_backends = self.template.get("backends", dict())
839 2
        specific_backends = self._make_items_product_specific(
840
            not_specific_backends, product_suffix, True)
841 2
        self.template["backends"] = specific_backends
842
843 2
    def make_refs_and_identifiers_product_specific(self, product):
844 2
        product_suffix = "@{0}".format(product)
845
846 2
        product_references = self._get_product_only_references()
847 2
        general_references = self.references.copy()
848 2
        for todel in product_references:
849 2
            general_references.pop(todel)
850
851 2
        to_set = dict(
852
            identifiers=(self.identifiers, False),
853
            general_references=(general_references, True),
854
            product_references=(product_references, False),
855
        )
856 2
        for name, (dic, allow_overwrites) in to_set.items():
857 2
            try:
858 2
                new_items = self._make_items_product_specific(
859
                    dic, product_suffix, allow_overwrites)
860 2
            except ValueError as exc:
861 2
                msg = (
862
                    "Error processing {what} for rule '{rid}': {msg}"
863
                    .format(what=name, rid=self.id_, msg=str(exc))
864
                )
865 2
                raise ValueError(msg)
866 2
            dic.clear()
867 2
            dic.update(new_items)
868
869 2
        self.references = general_references
870 2
        self.references.update(product_references)
871
872 2
        self._make_stigid_product_specific(product)
873
874 2
    def _make_items_product_specific(self, items_dict, product_suffix, allow_overwrites=False):
875 2
        new_items = dict()
876 2
        for full_label, value in items_dict.items():
877 2
            if "@" not in full_label and full_label not in new_items:
878 2
                new_items[full_label] = value
879 2
                continue
880
881 2
            if not full_label.endswith(product_suffix):
882 2
                continue
883
884 2
            label = full_label.split("@")[0]
885 2
            if label in items_dict and not allow_overwrites and value != items_dict[label]:
886 2
                msg = (
887
                    "There is a product-qualified '{item_q}' item, "
888
                    "but also an unqualified '{item_u}' item "
889
                    "and those two differ in value - "
890
                    "'{value_q}' vs '{value_u}' respectively."
891
                    .format(item_q=full_label, item_u=label,
892
                            value_q=value, value_u=items_dict[label])
893
                )
894 2
                raise ValueError(msg)
895 2
            new_items[label] = value
896 2
        return new_items
897
898 2
    def _set_attributes_from_dict(self, yaml_contents):
899 2
        for key, default_getter in self.YAML_KEYS_DEFAULTS.items():
900 2
            if key not in yaml_contents:
901 2
                value = default_getter()
902 2
                if isinstance(value, Exception):
903
                    raise value
904
            else:
905 2
                value = yaml_contents.pop(key)
906
907 2
            setattr(self, key, value)
908
909 2
    def to_contents_dict(self):
910
        """
911
        Returns a dictionary that is the same schema as the dict obtained when loading rule YAML.
912
        """
913
914 2
        yaml_contents = dict()
915 2
        for key in Rule.YAML_KEYS_DEFAULTS:
916 2
            yaml_contents[key] = getattr(self, key)
917
918 2
        return yaml_contents
919
920 2
    def validate_identifiers(self, yaml_file):
921 2
        if self.identifiers is None:
922
            raise ValueError("Empty identifier section in file %s" % yaml_file)
923
924
        # Validate all identifiers are non-empty:
925 2
        for ident_type, ident_val in self.identifiers.items():
926 2
            if not isinstance(ident_type, str) or not isinstance(ident_val, str):
927
                raise ValueError("Identifiers and values must be strings: %s in file %s"
928
                                 % (ident_type, yaml_file))
929 2
            if ident_val.strip() == "":
930
                raise ValueError("Identifiers must not be empty: %s in file %s"
931
                                 % (ident_type, yaml_file))
932 2
            if ident_type[0:3] == 'cce':
933 2
                if not is_cce_format_valid("CCE-" + ident_val):
934
                    raise ValueError("CCE Identifier format must be valid: invalid format '%s' for CEE '%s'"
935
                                     " in file '%s'" % (ident_val, ident_type, yaml_file))
936 2
                if not is_cce_value_valid("CCE-" + ident_val):
937
                    raise ValueError("CCE Identifier value is not a valid checksum: invalid value '%s' for CEE '%s'"
938
                                     " in file '%s'" % (ident_val, ident_type, yaml_file))
939
940 2
    def validate_references(self, yaml_file):
941 2
        if self.references is None:
942
            raise ValueError("Empty references section in file %s" % yaml_file)
943
944 2
        for ref_type, ref_val in self.references.items():
945 2
            if not isinstance(ref_type, str) or not isinstance(ref_val, str):
946
                raise ValueError("References and values must be strings: %s in file %s"
947
                                 % (ref_type, yaml_file))
948 2
            if ref_val.strip() == "":
949
                raise ValueError("References must not be empty: %s in file %s"
950
                                 % (ref_type, yaml_file))
951
952 2
        for ref_type, ref_val in self.references.items():
953 2
            for ref in ref_val.split(","):
954 2
                if ref.strip() != ref:
955
                    msg = (
956
                        "Comma-separated '{ref_type}' reference "
957
                        "in {yaml_file} contains whitespace."
958
                        .format(ref_type=ref_type, yaml_file=yaml_file))
959
                    raise ValueError(msg)
960
961 2
    def validate_prodtype(self, yaml_file):
962 2
        for ptype in self.prodtype.split(","):
963 2
            if ptype.strip() != ptype:
964
                msg = (
965
                    "Comma-separated '{prodtype}' prodtype "
966
                    "in {yaml_file} contains whitespace."
967
                    .format(prodtype=self.prodtype, yaml_file=yaml_file))
968
                raise ValueError(msg)
969
970 2
    def to_xml_element(self):
971
        rule = ET.Element('Rule')
972
        rule.set('id', self.id_)
973
        if self.prodtype != "all":
974
            rule.set("prodtype", self.prodtype)
975
        rule.set('severity', self.severity)
976
        add_sub_element(rule, 'title', self.title)
977
        add_sub_element(rule, 'description', self.description)
978
        add_sub_element(rule, 'rationale', self.rationale)
979
980
        main_ident = ET.Element('ident')
981
        for ident_type, ident_val in self.identifiers.items():
982
            # This is not true if items were normalized
983
            if '@' in ident_type:
984
                # the ident is applicable only on some product
985
                # format : 'policy@product', eg. 'stigid@product'
986
                # for them, we create a separate <ref> element
987
                policy, product = ident_type.split('@')
988
                ident = ET.SubElement(rule, 'ident')
989
                ident.set(policy, ident_val)
990
                ident.set('prodtype', product)
991
            else:
992
                main_ident.set(ident_type, ident_val)
993
994
        if main_ident.attrib:
995
            rule.append(main_ident)
996
997
        main_ref = ET.Element('ref')
998
        for ref_type, ref_val in self.references.items():
999
            # This is not true if items were normalized
1000
            if '@' in ref_type:
1001
                # the reference is applicable only on some product
1002
                # format : 'policy@product', eg. 'stigid@product'
1003
                # for them, we create a separate <ref> element
1004
                policy, product = ref_type.split('@')
1005
                ref = ET.SubElement(rule, 'ref')
1006
                ref.set(policy, ref_val)
1007
                ref.set('prodtype', product)
1008
            else:
1009
                main_ref.set(ref_type, ref_val)
1010
1011
        if main_ref.attrib:
1012
            rule.append(main_ref)
1013
1014
        if self.oval_external_content:
1015
            check = ET.SubElement(rule, 'check')
1016
            check.set("system", "http://oval.mitre.org/XMLSchema/oval-definitions-5")
1017
            external_content = ET.SubElement(check, "check-content-ref")
1018
            external_content.set("href", self.oval_external_content)
1019
        else:
1020
            # TODO: This is pretty much a hack, oval ID will be the same as rule ID
1021
            #       and we don't want the developers to have to keep them in sync.
1022
            #       Therefore let's just add an OVAL ref of that ID.
1023
            oval_ref = ET.SubElement(rule, "oval")
1024
            oval_ref.set("id", self.id_)
1025
1026
        if self.ocil or self.ocil_clause:
1027
            ocil = add_sub_element(rule, 'ocil', self.ocil if self.ocil else "")
1028
            if self.ocil_clause:
1029
                ocil.set("clause", self.ocil_clause)
1030
1031
        add_warning_elements(rule, self.warnings)
1032
1033
        if self.platform:
1034
            platform_el = ET.SubElement(rule, "platform")
1035
            try:
1036
                platform_cpe = XCCDF_PLATFORM_TO_CPE[self.platform]
1037
            except KeyError:
1038
                raise ValueError("Unsupported platform '%s' in rule '%s'." % (self.platform, self.id_))
1039
            platform_el.set("idref", platform_cpe)
1040
1041
        return rule
1042
1043 2
    def to_file(self, file_name):
1044
        root = self.to_xml_element()
1045
        tree = ET.ElementTree(root)
1046
        tree.write(file_name)
1047
1048
1049 2
class DirectoryLoader(object):
1050 2
    def __init__(self, profiles_dir, bash_remediation_fns, env_yaml):
1051
        self.benchmark_file = None
1052
        self.group_file = None
1053
        self.loaded_group = None
1054
        self.rule_files = []
1055
        self.value_files = []
1056
        self.subdirectories = []
1057
1058
        self.all_values = set()
1059
        self.all_rules = set()
1060
        self.all_groups = set()
1061
1062
        self.profiles_dir = profiles_dir
1063
        self.bash_remediation_fns = bash_remediation_fns
1064
        self.env_yaml = env_yaml
1065
        self.product = env_yaml["product"]
1066
1067
        self.parent_group = None
1068
1069 2
    def _collect_items_to_load(self, guide_directory):
1070
        for dir_item in os.listdir(guide_directory):
1071
            dir_item_path = os.path.join(guide_directory, dir_item)
1072
            _, extension = os.path.splitext(dir_item)
1073
1074
            if extension == '.var':
1075
                self.value_files.append(dir_item_path)
1076
            elif dir_item == "benchmark.yml":
1077
                if self.benchmark_file:
1078
                    raise ValueError("Multiple benchmarks in one directory")
1079
                self.benchmark_file = dir_item_path
1080
            elif dir_item == "group.yml":
1081
                if self.group_file:
1082
                    raise ValueError("Multiple groups in one directory")
1083
                self.group_file = dir_item_path
1084
            elif extension == '.rule':
1085
                self.rule_files.append(dir_item_path)
1086
            elif is_rule_dir(dir_item_path):
1087
                self.rule_files.append(get_rule_dir_yaml(dir_item_path))
1088
            elif dir_item != "tests":
1089
                if os.path.isdir(dir_item_path):
1090
                    self.subdirectories.append(dir_item_path)
1091
                else:
1092
                    sys.stderr.write(
1093
                        "Encountered file '%s' while recursing, extension '%s' "
1094
                        "is unknown. Skipping..\n"
1095
                        % (dir_item, extension)
1096
                    )
1097
1098 2
    def load_benchmark_or_group(self, guide_directory):
1099
        """
1100
        Loads a given benchmark or group from the specified benchmark_file or
1101
        group_file, in the context of guide_directory, action, profiles_dir,
1102
        env_yaml, and bash_remediation_fns.
1103
1104
        Returns the loaded group or benchmark.
1105
        """
1106
        group = None
1107
        if self.group_file and self.benchmark_file:
1108
            raise ValueError("A .benchmark file and a .group file were found in "
1109
                             "the same directory '%s'" % (guide_directory))
1110
1111
        # we treat benchmark as a special form of group in the following code
1112
        if self.benchmark_file:
1113
            group = Benchmark.from_yaml(
1114
                self.benchmark_file, 'product-name', self.env_yaml
1115
            )
1116
            if self.profiles_dir:
1117
                group.add_profiles_from_dir(self.action, self.profiles_dir, self.env_yaml)
1118
            group.add_bash_remediation_fns_from_file(self.action, self.bash_remediation_fns)
1119
1120
        if self.group_file:
1121
            group = Group.from_yaml(self.group_file, self.env_yaml)
1122
            self.all_groups.add(group.id_)
1123
1124
        return group
1125
1126 2
    def _load_group_process_and_recurse(self, guide_directory):
1127
        self.loaded_group = self.load_benchmark_or_group(guide_directory)
1128
1129
        if self.loaded_group:
1130
            if self.parent_group:
1131
                self.parent_group.add_group(self.loaded_group)
1132
1133
            self._process_values()
1134
            self._recurse_into_subdirs()
1135
            self._process_rules()
1136
1137 2
    def process_directory_tree(self, start_dir, extra_group_dirs=None):
1138
        self._collect_items_to_load(start_dir)
1139
        if extra_group_dirs is not None:
1140
            self.subdirectories += extra_group_dirs
1141
        self._load_group_process_and_recurse(start_dir)
1142
1143 2
    def _recurse_into_subdirs(self):
1144
        for subdir in self.subdirectories:
1145
            loader = self._get_new_loader()
1146
            loader.parent_group = self.loaded_group
1147
            loader.process_directory_tree(subdir)
1148
            self.all_values.update(loader.all_values)
1149
            self.all_rules.update(loader.all_rules)
1150
            self.all_groups.update(loader.all_groups)
1151
1152 2
    def _get_new_loader(self):
1153
        raise NotImplementedError()
1154
1155 2
    def _process_values(self):
1156
        raise NotImplementedError()
1157
1158 2
    def _process_rules(self):
1159
        raise NotImplementedError()
1160
1161
1162 2
class BuildLoader(DirectoryLoader):
1163 2
    def __init__(self, profiles_dir, bash_remediation_fns, env_yaml, resolved_rules_dir=None):
1164
        super(BuildLoader, self).__init__(profiles_dir, bash_remediation_fns, env_yaml)
1165
1166
        self.action = "build"
1167
1168
        self.resolved_rules_dir = resolved_rules_dir
1169
        if resolved_rules_dir and not os.path.isdir(resolved_rules_dir):
1170
            os.mkdir(resolved_rules_dir)
1171
1172 2
    def _process_values(self):
1173
        for value_yaml in self.value_files:
1174
            value = Value.from_yaml(value_yaml, self.env_yaml)
1175
            self.all_values.add(value)
1176
            self.loaded_group.add_value(value)
1177
1178 2
    def _process_rules(self):
1179
        for rule_yaml in self.rule_files:
1180
            try:
1181
                rule = Rule.from_yaml(rule_yaml, self.env_yaml)
1182
            except DocumentationNotComplete:
1183
                # Happens on non-debug build when a rule is "documentation-incomplete"
1184
                continue
1185
            prodtypes = parse_prodtype(rule.prodtype)
1186
            if "all" not in prodtypes and self.product not in prodtypes:
1187
                continue
1188
            self.all_rules.add(rule)
1189
            self.loaded_group.add_rule(rule)
1190
            if self.resolved_rules_dir:
1191
                output_for_rule = os.path.join(
1192
                    self.resolved_rules_dir, "{id_}.yml".format(id_=rule.id_))
1193
                mkdir_p(self.resolved_rules_dir)
1194
                with open(output_for_rule, "w") as f:
1195
                    rule.normalize(self.env_yaml["product"])
1196
                    yaml.dump(rule.to_contents_dict(), f)
1197
1198 2
    def _get_new_loader(self):
1199
        return BuildLoader(
1200
            self.profiles_dir, self.bash_remediation_fns, self.env_yaml, self.resolved_rules_dir)
1201
1202 2
    def export_group_to_file(self, filename):
1203
        return self.loaded_group.to_file(filename)
1204
1205
1206 2
class ListInputsLoader(DirectoryLoader):
1207 2
    def __init__(self, profiles_dir, bash_remediation_fns, env_yaml):
1208
        super(ListInputsLoader, self).__init__(profiles_dir, bash_remediation_fns, env_yaml)
1209
1210
        self.action = "list-inputs"
1211
1212 2
    def _process_values(self):
1213
        for value_yaml in self.value_files:
1214
            print(value_yaml)
1215
1216 2
    def _process_rules(self):
1217
        for rule_yaml in self.rule_files:
1218
            print(rule_yaml)
1219
1220 2
    def _get_new_loader(self):
1221
        return ListInputsLoader(
1222
            self.profiles_dir, self.bash_remediation_fns, self.env_yaml)
1223
1224 2
    def load_benchmark_or_group(self, guide_directory):
1225
        result = super(ListInputsLoader, self).load_benchmark_or_group(guide_directory)
1226
1227
        if self.benchmark_file:
1228
            print(self.benchmark_file)
1229
1230
        if self.group_file:
1231
            print(self.group_file)
1232
1233
        return result
1234