Passed
Pull Request — master (#5209)
by Matěj
02:19
created

ssg.build_yaml.Profile.from_yaml()   A

Complexity

Conditions 4

Size

Total Lines 24
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 17
CRAP Score 4.0186

Importance

Changes 0
Metric Value
cc 4
eloc 20
nop 3
dl 0
loc 24
ccs 17
cts 19
cp 0.8947
crap 4.0186
rs 9.4
c 0
b 0
f 0
1 2
from __future__ import absolute_import
2 2
from __future__ import print_function
3
4 2
import os
5 2
import os.path
6 2
from collections import defaultdict
7 2
import datetime
8 2
import re
9 2
import sys
10
11 2
import yaml
12
13 2
from .constants import XCCDF_PLATFORM_TO_CPE
14 2
from .constants import PRODUCT_TO_CPE_MAPPING
15 2
from .constants import STIG_PLATFORM_ID_MAP
16 2
from .constants import XCCDF_REFINABLE_PROPERTIES
17 2
from .rules import get_rule_dir_id, get_rule_dir_yaml, is_rule_dir
18 2
from .rule_yaml import parse_prodtype
19
20 2
from .checks import is_cce_format_valid, is_cce_value_valid
21 2
from .yaml import DocumentationNotComplete, open_and_expand, open_and_macro_expand
22 2
from .utils import required_key, mkdir_p
23
24 2
from .xml import ElementTree as ET
25 2
from .shims import unicode_func
26
27
28 2
def add_sub_element(parent, tag, data):
29
    """
30
    Creates a new child element under parent with tag tag, and sets
31
    data as the content under the tag. In particular, data is a string
32
    to be parsed as an XML tree, allowing sub-elements of children to be
33
    added.
34
35
    If data should not be parsed as an XML tree, either escape the contents
36
    before passing into this function, or use ElementTree.SubElement().
37
38
    Returns the newly created subelement of type tag.
39
    """
40
    # This is used because our YAML data contain XML and XHTML elements
41
    # ET.SubElement() escapes the < > characters by &lt; and &gt;
42
    # and therefore it does not add child elements
43
    # we need to do a hack instead
44
    # TODO: Remove this function after we move to Markdown everywhere in SSG
45
    ustr = unicode_func("<{0}>{1}</{0}>").format(tag, data)
46
47
    try:
48
        element = ET.fromstring(ustr.encode("utf-8"))
49
    except Exception:
50
        msg = ("Error adding subelement to an element '{0}' from string: '{1}'"
51
               .format(parent.tag, ustr))
52
        raise RuntimeError(msg)
53
54
    parent.append(element)
55
    return element
56
57
58 2
def reorder_according_to_ordering(unordered, ordering, regex=None):
59 2
    ordered = []
60 2
    if regex is None:
61 2
        regex = "|".join(["({0})".format(item) for item in ordering])
62 2
    regex = re.compile(regex)
63
64 2
    items_to_order = list(filter(regex.match, unordered))
65 2
    unordered = set(unordered)
66
67 2
    for priority_type in ordering:
68 2
        for item in items_to_order:
69 2
            if priority_type in item and item in unordered:
70 2
                ordered.append(item)
71 2
                unordered.remove(item)
72 2
    ordered.extend(list(unordered))
73 2
    return ordered
74
75
76 2
def add_warning_elements(element, warnings):
77
    # The use of [{dict}, {dict}] in warnings is to handle the following
78
    # scenario where multiple warnings have the same category which is
79
    # valid in SCAP and our content:
80
    #
81
    # warnings:
82
    #     - general: Some general warning
83
    #     - general: Some other general warning
84
    #     - general: |-
85
    #         Some really long multiline general warning
86
    #
87
    # Each of the {dict} should have only one key/value pair.
88
    for warning_dict in warnings:
89
        warning = add_sub_element(element, "warning", list(warning_dict.values())[0])
90
        warning.set("category", list(warning_dict.keys())[0])
91
92
93 2
class Profile(object):
94
    """Represents XCCDF profile
95
    """
96
97 2
    def __init__(self, id_):
98 2
        self.id_ = id_
99 2
        self.title = ""
100 2
        self.description = ""
101 2
        self.extends = None
102 2
        self.selected = []
103 2
        self.unselected = []
104 2
        self.variables = dict()
105 2
        self.refine_rules = defaultdict(list)
106
107 2
    @classmethod
108 2
    def from_yaml(cls, yaml_file, env_yaml=None):
109 2
        yaml_contents = open_and_expand(yaml_file, env_yaml)
110 2
        if yaml_contents is None:
111
            return None
112
113 2
        basename, _ = os.path.splitext(os.path.basename(yaml_file))
114
115 2
        profile = cls(basename)
116 2
        profile.title = required_key(yaml_contents, "title")
117 2
        del yaml_contents["title"]
118 2
        profile.description = required_key(yaml_contents, "description")
119 2
        del yaml_contents["description"]
120 2
        profile.extends = yaml_contents.pop("extends", None)
121 2
        selection_entries = required_key(yaml_contents, "selections")
122 2
        if selection_entries:
123 2
            profile._parse_selections(selection_entries)
124 2
        del yaml_contents["selections"]
125
126 2
        if yaml_contents:
127
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
128
                               % (yaml_file, yaml_contents))
129
130 2
        return profile
131
132 2
    def dump_yaml(self, file_name, documentation_complete=True):
133
        to_dump = {}
134
        to_dump["documentation_complete"] = documentation_complete
135
        to_dump["title"] = self.title
136
        to_dump["description"] = self.description
137
        if self.extends is not None:
138
            to_dump["extends"] = self.extends
139
140
        selections = []
141
        for item in self.selected:
142
            selections.append(item)
143
        for item in self.unselected:
144
            selections.append("!"+item)
145
        for varname in self.variables.keys():
146
            selections.append(varname+"="+self.variables.get(varname))
147
        for rule, refinements in self.refine_rules.items():
148
            for prop, val in refinements:
149
                selections.append("{rule}.{property}={value}"
150
                                  .format(rule=rule, property=prop, value=val))
151
        to_dump["selections"] = selections
152
        with open(file_name, "w+") as f:
153
            yaml.dump(to_dump, f, indent=4)
154
155 2
    def _parse_selections(self, entries):
156 2
        for item in entries:
157 2
            if "." in item:
158
                rule, refinement = item.split(".", 1)
159
                property_, value = refinement.split("=", 1)
160
                if property_ not in XCCDF_REFINABLE_PROPERTIES:
161
                    msg = ("Property '{property_}' cannot be refined. "
162
                           "Rule properties that can be refined are {refinables}. "
163
                           "Fix refinement '{rule_id}.{property_}={value}' in profile '{profile}'."
164
                           .format(property_=property_, refinables=XCCDF_REFINABLE_PROPERTIES,
165
                                   rule_id=rule, value=value, profile=self.id_)
166
                           )
167
                    raise ValueError(msg)
168
                self.refine_rules[rule].append((property_, value))
169 2
            elif "=" in item:
170 2
                varname, value = item.split("=", 1)
171 2
                self.variables[varname] = value
172 2
            elif item.startswith("!"):
173
                self.unselected.append(item[1:])
174
            else:
175 2
                self.selected.append(item)
176
177 2
    def to_xml_element(self):
178
        element = ET.Element('Profile')
179
        element.set("id", self.id_)
180
        if self.extends:
181
            element.set("extends", self.extends)
182
        title = add_sub_element(element, "title", self.title)
183
        title.set("override", "true")
184
        desc = add_sub_element(element, "description", self.description)
185
        desc.set("override", "true")
186
187
        for selection in self.selected:
188
            select = ET.Element("select")
189
            select.set("idref", selection)
190
            select.set("selected", "true")
191
            element.append(select)
192
193
        for selection in self.unselected:
194
            unselect = ET.Element("select")
195
            unselect.set("idref", selection)
196
            unselect.set("selected", "false")
197
            element.append(unselect)
198
199
        for value_id, selector in self.variables.items():
200
            refine_value = ET.Element("refine-value")
201
            refine_value.set("idref", value_id)
202
            refine_value.set("selector", selector)
203
            element.append(refine_value)
204
205
        for refined_rule, refinement_list in self.refine_rules.items():
206
            refine_rule = ET.Element("refine-rule")
207
            refine_rule.set("idref", refined_rule)
208
            for refinement in refinement_list:
209
                refine_rule.set(refinement[0], refinement[1])
210
            element.append(refine_rule)
211
212
        return element
213
214 2
    def get_rule_selectors(self):
215 2
        return list(self.selected + self.unselected)
216
217 2
    def get_variable_selectors(self):
218 2
        return self.variables
219
220 2
    def validate_refine_rules(self, rules):
221
        existing_rule_ids = [r.id_ for r in rules]
222
        for refine_rule, refinement_list in self.refine_rules.items():
223
            # Take first refinement to ilustrate where the error is
224
            # all refinements in list are invalid, so it doesn't really matter
225
            a_refinement = refinement_list[0]
226
227
            if refine_rule not in existing_rule_ids:
228
                msg = (
229
                    "You are trying to refine a rule that doesn't exist. "
230
                    "Rule '{rule_id}' was not found in the benchmark. "
231
                    "Please check all rule refinements for rule: '{rule_id}', for example: "
232
                    "- {rule_id}.{property_}={value}' in profile {profile_id}."
233
                    .format(rule_id=refine_rule, profile_id=self.id_,
234
                            property_=a_refinement[0], value=a_refinement[1])
235
                    )
236
                raise ValueError(msg)
237
238
            if refine_rule not in self.get_rule_selectors():
239
                msg = ("- {rule_id}.{property_}={value}' in profile '{profile_id}' is refining "
240
                       "a rule that is not selected by it. The refinement will not have any "
241
                       "noticeable effect. Either select the rule or remove the rule refinement."
242
                       .format(rule_id=refine_rule, property_=a_refinement[0],
243
                               value=a_refinement[1], profile_id=self.id_)
244
                       )
245
                raise ValueError(msg)
246
247 2
    def validate_variables(self, variables):
248
        variables_by_id = dict()
249
        for var in variables:
250
            variables_by_id[var.id_] = var
251
252
        for var_id, our_val in self.variables.items():
253
            if var_id not in variables_by_id:
254
                all_vars_list = [" - %s" % v for v in variables_by_id.keys()]
255
                msg = (
256
                    "Value '{var_id}' in profile '{profile_name}' is not known. "
257
                    "We know only variables:\n{var_names}"
258
                    .format(
259
                        var_id=var_id, profile_name=self.id_,
260
                        var_names="\n".join(sorted(all_vars_list)))
261
                )
262
                raise ValueError(msg)
263
264
            allowed_selectors = [str(s) for s in variables_by_id[var_id].options.keys()]
265
            if our_val not in allowed_selectors:
266
                msg = (
267
                    "Value '{var_id}' in profile '{profile_name}' "
268
                    "uses the selector '{our_val}'. "
269
                    "This is not possible, as only selectors {all_selectors} are available. "
270
                    "Either change the selector used in the profile, or "
271
                    "add the selector-value pair to the variable definition."
272
                    .format(
273
                        var_id=var_id, profile_name=self.id_, our_val=our_val,
274
                        all_selectors=allowed_selectors,
275
                    )
276
                )
277
                raise ValueError(msg)
278
279 2
    def validate_rules(self, rules, groups):
280
        existing_rule_ids = [r.id_ for r in rules]
281
        rule_selectors = self.get_rule_selectors()
282
        for id_ in rule_selectors:
283
            if id_ in groups:
284
                msg = (
285
                    "You have selected a group '{group_id}' instead of a "
286
                    "rule. Groups have no effect in the profile and are not "
287
                    "allowed to be selected. Please remove '{group_id}' "
288
                    "from profile '{profile_id}' before proceeding."
289
                    .format(group_id=id_, profile_id=self.id_)
290
                )
291
                raise ValueError(msg)
292
            if id_ not in existing_rule_ids:
293
                msg = (
294
                    "Rule '{rule_id}' was not found in the benchmark. Please "
295
                    "remove rule '{rule_id}' from profile '{profile_id}' "
296
                    "before proceeding."
297
                    .format(rule_id=id_, profile_id=self.id_)
298
                )
299
                raise ValueError(msg)
300
301 2
    def __sub__(self, other):
302
        profile = Profile(self.id_)
303
        profile.title = self.title
304
        profile.description = self.description
305
        profile.extends = self.extends
306
        profile.selected = list(set(self.selected) - set(other.selected))
307
        profile.selected.sort()
308
        profile.unselected = list(set(self.unselected) - set(other.unselected))
309
        profile.variables = dict ((k, v) for (k, v) in self.variables.items()
310
                             if k not in other.variables or v != other.variables[k])
311
        return profile
312
313
314 2
class Value(object):
315
    """Represents XCCDF Value
316
    """
317
318 2
    def __init__(self, id_):
319 2
        self.id_ = id_
320 2
        self.title = ""
321 2
        self.description = ""
322 2
        self.type_ = "string"
323 2
        self.operator = "equals"
324 2
        self.interactive = False
325 2
        self.options = {}
326 2
        self.warnings = []
327
328 2
    @staticmethod
329 2
    def from_yaml(yaml_file, env_yaml=None):
330 2
        yaml_contents = open_and_expand(yaml_file, env_yaml)
331 2
        if yaml_contents is None:
332
            return None
333
334 2
        value_id, _ = os.path.splitext(os.path.basename(yaml_file))
335 2
        value = Value(value_id)
336 2
        value.title = required_key(yaml_contents, "title")
337 2
        del yaml_contents["title"]
338 2
        value.description = required_key(yaml_contents, "description")
339 2
        del yaml_contents["description"]
340 2
        value.type_ = required_key(yaml_contents, "type")
341 2
        del yaml_contents["type"]
342 2
        value.operator = yaml_contents.pop("operator", "equals")
343 2
        possible_operators = ["equals", "not equal", "greater than",
344
                              "less than", "greater than or equal",
345
                              "less than or equal", "pattern match"]
346
347 2
        if value.operator not in possible_operators:
348
            raise ValueError(
349
                "Found an invalid operator value '%s' in '%s'. "
350
                "Expected one of: %s"
351
                % (value.operator, yaml_file, ", ".join(possible_operators))
352
            )
353
354 2
        value.interactive = \
355
            yaml_contents.pop("interactive", "false").lower() == "true"
356
357 2
        value.options = required_key(yaml_contents, "options")
358 2
        del yaml_contents["options"]
359 2
        value.warnings = yaml_contents.pop("warnings", [])
360
361 2
        for warning_list in value.warnings:
362
            if len(warning_list) != 1:
363
                raise ValueError("Only one key/value pair should exist for each dictionary")
364
365 2
        if yaml_contents:
366
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
367
                               % (yaml_file, yaml_contents))
368
369 2
        return value
370
371 2
    def to_xml_element(self):
372
        value = ET.Element('Value')
373
        value.set('id', self.id_)
374
        value.set('type', self.type_)
375
        if self.operator != "equals":  # equals is the default
376
            value.set('operator', self.operator)
377
        if self.interactive:  # False is the default
378
            value.set('interactive', 'true')
379
        title = ET.SubElement(value, 'title')
380
        title.text = self.title
381
        add_sub_element(value, 'description', self.description)
382
        add_warning_elements(value, self.warnings)
383
384
        for selector, option in self.options.items():
385
            # do not confuse Value with big V with value with small v
386
            # value is child element of Value
387
            value_small = ET.SubElement(value, 'value')
388
            # by XCCDF spec, default value is value without selector
389
            if selector != "default":
390
                value_small.set('selector', str(selector))
391
            value_small.text = str(option)
392
393
        return value
394
395 2
    def to_file(self, file_name):
396
        root = self.to_xml_element()
397
        tree = ET.ElementTree(root)
398
        tree.write(file_name)
399
400
401 2
class Benchmark(object):
402
    """Represents XCCDF Benchmark
403
    """
404 2
    def __init__(self, id_):
405
        self.id_ = id_
406
        self.title = ""
407
        self.status = ""
408
        self.description = ""
409
        self.notice_id = ""
410
        self.notice_description = ""
411
        self.front_matter = ""
412
        self.rear_matter = ""
413
        self.cpes = []
414
        self.version = "0.1"
415
        self.profiles = []
416
        self.values = {}
417
        self.bash_remediation_fns_group = None
418
        self.groups = {}
419
        self.rules = {}
420
421
        # This is required for OCIL clauses
422
        conditional_clause = Value("conditional_clause")
423
        conditional_clause.title = "A conditional clause for check statements."
424
        conditional_clause.description = conditional_clause.title
425
        conditional_clause.type_ = "string"
426
        conditional_clause.options = {"": "This is a placeholder"}
427
428
        self.add_value(conditional_clause)
429
430 2
    @classmethod
431 2
    def from_yaml(cls, yaml_file, id_, product_yaml=None):
432
        yaml_contents = open_and_macro_expand(yaml_file, product_yaml)
433
        if yaml_contents is None:
434
            return None
435
436
        benchmark = cls(id_)
437
        benchmark.title = required_key(yaml_contents, "title")
438
        del yaml_contents["title"]
439
        benchmark.status = required_key(yaml_contents, "status")
440
        del yaml_contents["status"]
441
        benchmark.description = required_key(yaml_contents, "description")
442
        del yaml_contents["description"]
443
        notice_contents = required_key(yaml_contents, "notice")
444
        benchmark.notice_id = required_key(notice_contents, "id")
445
        del notice_contents["id"]
446
        benchmark.notice_description = required_key(notice_contents,
447
                                                    "description")
448
        del notice_contents["description"]
449
        if not notice_contents:
450
            del yaml_contents["notice"]
451
452
        benchmark.front_matter = required_key(yaml_contents,
453
                                              "front-matter")
454
        del yaml_contents["front-matter"]
455
        benchmark.rear_matter = required_key(yaml_contents,
456
                                             "rear-matter")
457
        del yaml_contents["rear-matter"]
458
        benchmark.version = str(required_key(yaml_contents, "version"))
459
        del yaml_contents["version"]
460
461
        if yaml_contents:
462
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
463
                               % (yaml_file, yaml_contents))
464
465
        if product_yaml:
466
            benchmark.cpes = PRODUCT_TO_CPE_MAPPING[product_yaml["product"]]
467
468
        return benchmark
469
470 2
    def add_profiles_from_dir(self, action, dir_, env_yaml):
471
        for dir_item in os.listdir(dir_):
472
            dir_item_path = os.path.join(dir_, dir_item)
473
            if not os.path.isfile(dir_item_path):
474
                continue
475
476
            _, ext = os.path.splitext(os.path.basename(dir_item_path))
477
            if ext != '.profile':
478
                sys.stderr.write(
479
                    "Encountered file '%s' while looking for profiles, "
480
                    "extension '%s' is unknown. Skipping..\n"
481
                    % (dir_item, ext)
482
                )
483
                continue
484
485
            try:
486
                new_profile = Profile.from_yaml(dir_item_path, env_yaml)
487
            except DocumentationNotComplete:
488
                continue
489
            except Exception as exc:
490
                msg = ("Error building profile from '{fname}': '{error}'"
491
                       .format(fname=dir_item_path, error=str(exc)))
492
                raise RuntimeError(msg)
493
            if new_profile is None:
494
                continue
495
496
            self.profiles.append(new_profile)
497
            if action == "list-inputs":
498
                print(dir_item_path)
499
500 2
    def add_bash_remediation_fns_from_file(self, action, file_):
501
        if action == "list-inputs":
502
            print(file_)
503
        else:
504
            tree = ET.parse(file_)
505
            self.bash_remediation_fns_group = tree.getroot()
506
507 2
    def to_xml_element(self):
508
        root = ET.Element('Benchmark')
509
        root.set('xmlns:xsi', 'http://www.w3.org/2001/XMLSchema-instance')
510
        root.set('xmlns:xhtml', 'http://www.w3.org/1999/xhtml')
511
        root.set('xmlns:dc', 'http://purl.org/dc/elements/1.1/')
512
        root.set('id', 'product-name')
513
        root.set('xsi:schemaLocation',
514
                 'http://checklists.nist.gov/xccdf/1.1 xccdf-1.1.4.xsd')
515
        root.set('style', 'SCAP_1.1')
516
        root.set('resolved', 'false')
517
        root.set('xml:lang', 'en-US')
518
        status = ET.SubElement(root, 'status')
519
        status.set('date', datetime.date.today().strftime("%Y-%m-%d"))
520
        status.text = self.status
521
        add_sub_element(root, "title", self.title)
522
        add_sub_element(root, "description", self.description)
523
        notice = add_sub_element(root, "notice", self.notice_description)
524
        notice.set('id', self.notice_id)
525
        add_sub_element(root, "front-matter", self.front_matter)
526
        add_sub_element(root, "rear-matter", self.rear_matter)
527
528
        for idref in self.cpes:
529
            plat = ET.SubElement(root, "platform")
530
            plat.set("idref", idref)
531
532
        version = ET.SubElement(root, 'version')
533
        version.text = self.version
534
        ET.SubElement(root, "metadata")
535
536
        for profile in self.profiles:
537
            root.append(profile.to_xml_element())
538
539
        for value in self.values.values():
540
            root.append(value.to_xml_element())
541
        if self.bash_remediation_fns_group is not None:
542
            root.append(self.bash_remediation_fns_group)
543
544
        groups_in_bench = list(self.groups.keys())
545
        priority_order = ["system", "services"]
546
        groups_in_bench = reorder_according_to_ordering(groups_in_bench, priority_order)
547
548
        # Make system group the first, followed by services group
549
        for group_id in groups_in_bench:
550
            group = self.groups.get(group_id)
551
            # Products using application benchmark don't have system or services group
552
            if group is not None:
553
                root.append(group.to_xml_element())
554
555
        for rule in self.rules.values():
556
            root.append(rule.to_xml_element())
557
558
        return root
559
560 2
    def to_file(self, file_name):
561
        root = self.to_xml_element()
562
        tree = ET.ElementTree(root)
563
        tree.write(file_name)
564
565 2
    def add_value(self, value):
566
        if value is None:
567
            return
568
        self.values[value.id_] = value
569
570 2
    def add_group(self, group):
571
        if group is None:
572
            return
573
        self.groups[group.id_] = group
574
575 2
    def add_rule(self, rule):
576
        if rule is None:
577
            return
578
        self.rules[rule.id_] = rule
579
580 2
    def to_xccdf(self):
581
        """We can easily extend this script to generate a valid XCCDF instead
582
        of SSG SHORTHAND.
583
        """
584
        raise NotImplementedError
585
586 2
    def __str__(self):
587
        return self.id_
588
589
590 2
class Group(object):
591
    """Represents XCCDF Group
592
    """
593 2
    ATTRIBUTES_TO_PASS_ON = (
594
        "platform",
595
    )
596
597 2
    def __init__(self, id_):
598
        self.id_ = id_
599
        self.prodtype = "all"
600
        self.title = ""
601
        self.description = ""
602
        self.warnings = []
603
        self.values = {}
604
        self.groups = {}
605
        self.rules = {}
606
        self.platform = None
607
608 2
    @classmethod
609 2
    def from_yaml(cls, yaml_file, env_yaml=None):
610
        yaml_contents = open_and_macro_expand(yaml_file, env_yaml)
611
        if yaml_contents is None:
612
            return None
613
614
        group_id = os.path.basename(os.path.dirname(yaml_file))
615
        group = cls(group_id)
616
        group.prodtype = yaml_contents.pop("prodtype", "all")
617
        group.title = required_key(yaml_contents, "title")
618
        del yaml_contents["title"]
619
        group.description = required_key(yaml_contents, "description")
620
        del yaml_contents["description"]
621
        group.warnings = yaml_contents.pop("warnings", [])
622
        group.platform = yaml_contents.pop("platform", None)
623
624
        for warning_list in group.warnings:
625
            if len(warning_list) != 1:
626
                raise ValueError("Only one key/value pair should exist for each dictionary")
627
628
        if yaml_contents:
629
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
630
                               % (yaml_file, yaml_contents))
631
        group.validate_prodtype(yaml_file)
632
        return group
633
634 2
    def validate_prodtype(self, yaml_file):
635
        for ptype in self.prodtype.split(","):
636
            if ptype.strip() != ptype:
637
                msg = (
638
                    "Comma-separated '{prodtype}' prodtype "
639
                    "in {yaml_file} contains whitespace."
640
                    .format(prodtype=self.prodtype, yaml_file=yaml_file))
641
                raise ValueError(msg)
642
643 2
    def to_xml_element(self):
644
        group = ET.Element('Group')
645
        group.set('id', self.id_)
646
        if self.prodtype != "all":
647
            group.set("prodtype", self.prodtype)
648
        title = ET.SubElement(group, 'title')
649
        title.text = self.title
650
        add_sub_element(group, 'description', self.description)
651
        add_warning_elements(group, self.warnings)
652
653
        if self.platform:
654
            platform_el = ET.SubElement(group, "platform")
655
            try:
656
                platform_cpe = XCCDF_PLATFORM_TO_CPE[self.platform]
657
            except KeyError:
658
                raise ValueError("Unsupported platform '%s' in rule '%s'." % (self.platform, self.id_))
659
            platform_el.set("idref", platform_cpe)
660
661
        for _value in self.values.values():
662
            group.append(_value.to_xml_element())
663
664
        # Rules that install or remove packages affect remediation
665
        # of other rules.
666
        # When packages installed/removed rules come first:
667
        # The Rules are ordered in more logical way, and
668
        # remediation order is natural, first the package is installed, then configured.
669
        rules_in_group = list(self.rules.keys())
670
        regex = r'(package_.*_(installed|removed))|(service_.*_(enabled|disabled))$'
671
        priority_order = ["installed", "removed", "enabled", "disabled"]
672
        rules_in_group = reorder_according_to_ordering(rules_in_group, priority_order, regex)
673
674
        # Add rules in priority order, first all packages installed, then removed,
675
        # followed by services enabled, then disabled
676
        for rule_id in rules_in_group:
677
            group.append(self.rules.get(rule_id).to_xml_element())
678
679
        # Add the sub groups after any current level group rules.
680
        # As package installed/removed and service enabled/disabled rules are usuallly in
681
        # top level group, this ensures groups that further configure a package or service
682
        # are after rules that install or remove it.
683
        groups_in_group = list(self.groups.keys())
684
        # The FIPS group should come before Crypto - if we want to set a different (stricter) Crypto Policy than FIPS.
685
        # the firewalld_activation must come before ruleset_modifications, othervise
686
        # remediations for ruleset_modifications won't work
687
        # rules from group disabling_ipv6 must precede rules from configuring_ipv6,
688
        # otherwise the remediation prints error although it is successful
689
        priority_order = ["fips", "crypto", "firewalld_activation",
690
        "ruleset_modifications", "disabling_ipv6", "configuring_ipv6"]
691
        groups_in_group = reorder_according_to_ordering(groups_in_group, priority_order)
692
        for group_id in groups_in_group:
693
            _group = self.groups[group_id]
694
            group.append(_group.to_xml_element())
695
696
        return group
697
698 2
    def to_file(self, file_name):
699
        root = self.to_xml_element()
700
        tree = ET.ElementTree(root)
701
        tree.write(file_name)
702
703 2
    def add_value(self, value):
704
        if value is None:
705
            return
706
        self.values[value.id_] = value
707
708 2
    def add_group(self, group):
709
        if group is None:
710
            return
711
        if self.platform and not group.platform:
712
            group.platform = self.platform
713
        self.groups[group.id_] = group
714
        self._pass_our_properties_on_to(group)
715
716 2
    def _pass_our_properties_on_to(self, obj):
717
        for attr in self.ATTRIBUTES_TO_PASS_ON:
718
            if hasattr(obj, attr) and getattr(obj, attr) is None:
719
                setattr(obj, attr, getattr(self, attr))
720
721 2
    def add_rule(self, rule):
722
        if rule is None:
723
            return
724
        if self.platform and not rule.platform:
725
            rule.platform = self.platform
726
        self.rules[rule.id_] = rule
727
        self._pass_our_properties_on_to(rule)
728
729 2
    def __str__(self):
730
        return self.id_
731
732
733 2
class Rule(object):
734
    """Represents XCCDF Rule
735
    """
736 2
    YAML_KEYS_DEFAULTS = {
737
        "prodtype": lambda: "all",
738
        "title": lambda: RuntimeError("Missing key 'title'"),
739
        "description": lambda: RuntimeError("Missing key 'description'"),
740
        "rationale": lambda: RuntimeError("Missing key 'rationale'"),
741
        "severity": lambda: RuntimeError("Missing key 'severity'"),
742
        "references": lambda: dict(),
743
        "identifiers": lambda: dict(),
744
        "ocil_clause": lambda: None,
745
        "ocil": lambda: None,
746
        "oval_external_content": lambda: None,
747
        "warnings": lambda: list(),
748
        "platform": lambda: None,
749
        "template": lambda: None,
750
    }
751
752 2
    def __init__(self, id_):
753 2
        self.id_ = id_
754 2
        self.prodtype = "all"
755 2
        self.title = ""
756 2
        self.description = ""
757 2
        self.rationale = ""
758 2
        self.severity = "unknown"
759 2
        self.references = {}
760 2
        self.identifiers = {}
761 2
        self.ocil_clause = None
762 2
        self.ocil = None
763 2
        self.oval_external_content = None
764 2
        self.warnings = []
765 2
        self.platform = None
766 2
        self.template = None
767
768 2
    @classmethod
769 2
    def from_yaml(cls, yaml_file, env_yaml=None):
770 2
        yaml_file = os.path.normpath(yaml_file)
771
772 2
        yaml_contents = open_and_macro_expand(yaml_file, env_yaml)
773 2
        if yaml_contents is None:
774
            return None
775
776 2
        rule_id, ext = os.path.splitext(os.path.basename(yaml_file))
777 2
        if rule_id == "rule" and ext == ".yml":
778 2
            rule_id = get_rule_dir_id(yaml_file)
779
780 2
        rule = cls(rule_id)
781
782 2
        try:
783 2
            rule._set_attributes_from_dict(yaml_contents)
784
        except RuntimeError as exc:
785
            msg = ("Error processing '{fname}': {err}"
786
                   .format(fname=yaml_file, err=str(exc)))
787
            raise RuntimeError(msg)
788
789 2
        for warning_list in rule.warnings:
790
            if len(warning_list) != 1:
791
                raise ValueError("Only one key/value pair should exist for each dictionary")
792
793 2
        if yaml_contents:
794
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
795
                               % (yaml_file, yaml_contents))
796
797 2
        rule.validate_prodtype(yaml_file)
798 2
        rule.validate_identifiers(yaml_file)
799 2
        rule.validate_references(yaml_file)
800 2
        return rule
801
802 2
    def _make_stigid_product_specific(self, product):
803 2
        stig_id = self.references.get("stigid", None)
804 2
        if not stig_id:
805 2
            return
806 2
        if "," in stig_id:
807 2
            raise ValueError("Rules can not have multiple STIG IDs.")
808 2
        stig_platform_id = STIG_PLATFORM_ID_MAP.get(product, product.upper())
809 2
        product_specific_stig_id = "{platform_id}-{stig_id}".format(
810
            platform_id=stig_platform_id, stig_id=stig_id)
811 2
        self.references["stigid"] = product_specific_stig_id
812
813 2
    def normalize(self, product):
814 2
        try:
815 2
            self.make_refs_and_identifiers_product_specific(product)
816 2
            self.make_template_product_specific(product)
817 2
        except Exception as exc:
818 2
            msg = (
819
                "Error normalizing '{rule}': {msg}"
820
                .format(rule=self.id_, msg=str(exc))
821
            )
822 2
            raise RuntimeError(msg)
823
824 2
    def _get_product_only_references(self):
825 2
        PRODUCT_REFERENCES = ("stigid",)
826
827 2
        product_references = dict()
828
829 2
        for ref in PRODUCT_REFERENCES:
830 2
            start = "{0}@".format(ref)
831 2
            for gref, gval in self.references.items():
832 2
                if ref == gref or gref.startswith(start):
833 2
                    product_references[gref] = gval
834 2
        return product_references
835
836 2
    def make_template_product_specific(self, product):
837 2
        product_suffix = "@{0}".format(product)
838
839 2
        if not self.template:
840
            return
841
842 2
        not_specific_vars = self.template.get("vars", dict())
843 2
        specific_vars = self._make_items_product_specific(
844
            not_specific_vars, product_suffix, True)
845 2
        self.template["vars"] = specific_vars
846
847 2
        not_specific_backends = self.template.get("backends", dict())
848 2
        specific_backends = self._make_items_product_specific(
849
            not_specific_backends, product_suffix, True)
850 2
        self.template["backends"] = specific_backends
851
852 2
    def make_refs_and_identifiers_product_specific(self, product):
853 2
        product_suffix = "@{0}".format(product)
854
855 2
        product_references = self._get_product_only_references()
856 2
        general_references = self.references.copy()
857 2
        for todel in product_references:
858 2
            general_references.pop(todel)
859
860 2
        to_set = dict(
861
            identifiers=(self.identifiers, False),
862
            general_references=(general_references, True),
863
            product_references=(product_references, False),
864
        )
865 2
        for name, (dic, allow_overwrites) in to_set.items():
866 2
            try:
867 2
                new_items = self._make_items_product_specific(
868
                    dic, product_suffix, allow_overwrites)
869 2
            except ValueError as exc:
870 2
                msg = (
871
                    "Error processing {what} for rule '{rid}': {msg}"
872
                    .format(what=name, rid=self.id_, msg=str(exc))
873
                )
874 2
                raise ValueError(msg)
875 2
            dic.clear()
876 2
            dic.update(new_items)
877
878 2
        self.references = general_references
879 2
        self.references.update(product_references)
880
881 2
        self._make_stigid_product_specific(product)
882
883 2
    def _make_items_product_specific(self, items_dict, product_suffix, allow_overwrites=False):
884 2
        new_items = dict()
885 2
        for full_label, value in items_dict.items():
886 2
            if "@" not in full_label and full_label not in new_items:
887 2
                new_items[full_label] = value
888 2
                continue
889
890 2
            if not full_label.endswith(product_suffix):
891 2
                continue
892
893 2
            label = full_label.split("@")[0]
894 2
            if label in items_dict and not allow_overwrites and value != items_dict[label]:
895 2
                msg = (
896
                    "There is a product-qualified '{item_q}' item, "
897
                    "but also an unqualified '{item_u}' item "
898
                    "and those two differ in value - "
899
                    "'{value_q}' vs '{value_u}' respectively."
900
                    .format(item_q=full_label, item_u=label,
901
                            value_q=value, value_u=items_dict[label])
902
                )
903 2
                raise ValueError(msg)
904 2
            new_items[label] = value
905 2
        return new_items
906
907 2
    def _set_attributes_from_dict(self, yaml_contents):
908 2
        for key, default_getter in self.YAML_KEYS_DEFAULTS.items():
909 2
            if key not in yaml_contents:
910 2
                value = default_getter()
911 2
                if isinstance(value, Exception):
912
                    raise value
913
            else:
914 2
                value = yaml_contents.pop(key)
915
916 2
            setattr(self, key, value)
917
918 2
    def to_contents_dict(self):
919
        """
920
        Returns a dictionary that is the same schema as the dict obtained when loading rule YAML.
921
        """
922
923 2
        yaml_contents = dict()
924 2
        for key in Rule.YAML_KEYS_DEFAULTS:
925 2
            yaml_contents[key] = getattr(self, key)
926
927 2
        return yaml_contents
928
929 2
    def validate_identifiers(self, yaml_file):
930 2
        if self.identifiers is None:
931
            raise ValueError("Empty identifier section in file %s" % yaml_file)
932
933
        # Validate all identifiers are non-empty:
934 2
        for ident_type, ident_val in self.identifiers.items():
935 2
            if not isinstance(ident_type, str) or not isinstance(ident_val, str):
936
                raise ValueError("Identifiers and values must be strings: %s in file %s"
937
                                 % (ident_type, yaml_file))
938 2
            if ident_val.strip() == "":
939
                raise ValueError("Identifiers must not be empty: %s in file %s"
940
                                 % (ident_type, yaml_file))
941 2
            if ident_type[0:3] == 'cce':
942 2
                if not is_cce_format_valid("CCE-" + ident_val):
943
                    raise ValueError("CCE Identifier format must be valid: invalid format '%s' for CEE '%s'"
944
                                     " in file '%s'" % (ident_val, ident_type, yaml_file))
945 2
                if not is_cce_value_valid("CCE-" + ident_val):
946
                    raise ValueError("CCE Identifier value is not a valid checksum: invalid value '%s' for CEE '%s'"
947
                                     " in file '%s'" % (ident_val, ident_type, yaml_file))
948
949 2
    def validate_references(self, yaml_file):
950 2
        if self.references is None:
951
            raise ValueError("Empty references section in file %s" % yaml_file)
952
953 2
        for ref_type, ref_val in self.references.items():
954 2
            if not isinstance(ref_type, str) or not isinstance(ref_val, str):
955
                raise ValueError("References and values must be strings: %s in file %s"
956
                                 % (ref_type, yaml_file))
957 2
            if ref_val.strip() == "":
958
                raise ValueError("References must not be empty: %s in file %s"
959
                                 % (ref_type, yaml_file))
960
961 2
        for ref_type, ref_val in self.references.items():
962 2
            for ref in ref_val.split(","):
963 2
                if ref.strip() != ref:
964
                    msg = (
965
                        "Comma-separated '{ref_type}' reference "
966
                        "in {yaml_file} contains whitespace."
967
                        .format(ref_type=ref_type, yaml_file=yaml_file))
968
                    raise ValueError(msg)
969
970 2
    def validate_prodtype(self, yaml_file):
971 2
        for ptype in self.prodtype.split(","):
972 2
            if ptype.strip() != ptype:
973
                msg = (
974
                    "Comma-separated '{prodtype}' prodtype "
975
                    "in {yaml_file} contains whitespace."
976
                    .format(prodtype=self.prodtype, yaml_file=yaml_file))
977
                raise ValueError(msg)
978
979 2
    def to_xml_element(self):
980
        rule = ET.Element('Rule')
981
        rule.set('id', self.id_)
982
        if self.prodtype != "all":
983
            rule.set("prodtype", self.prodtype)
984
        rule.set('severity', self.severity)
985
        add_sub_element(rule, 'title', self.title)
986
        add_sub_element(rule, 'description', self.description)
987
        add_sub_element(rule, 'rationale', self.rationale)
988
989
        main_ident = ET.Element('ident')
990
        for ident_type, ident_val in self.identifiers.items():
991
            # This is not true if items were normalized
992
            if '@' in ident_type:
993
                # the ident is applicable only on some product
994
                # format : 'policy@product', eg. 'stigid@product'
995
                # for them, we create a separate <ref> element
996
                policy, product = ident_type.split('@')
997
                ident = ET.SubElement(rule, 'ident')
998
                ident.set(policy, ident_val)
999
                ident.set('prodtype', product)
1000
            else:
1001
                main_ident.set(ident_type, ident_val)
1002
1003
        if main_ident.attrib:
1004
            rule.append(main_ident)
1005
1006
        main_ref = ET.Element('ref')
1007
        for ref_type, ref_val in self.references.items():
1008
            # This is not true if items were normalized
1009
            if '@' in ref_type:
1010
                # the reference is applicable only on some product
1011
                # format : 'policy@product', eg. 'stigid@product'
1012
                # for them, we create a separate <ref> element
1013
                policy, product = ref_type.split('@')
1014
                ref = ET.SubElement(rule, 'ref')
1015
                ref.set(policy, ref_val)
1016
                ref.set('prodtype', product)
1017
            else:
1018
                main_ref.set(ref_type, ref_val)
1019
1020
        if main_ref.attrib:
1021
            rule.append(main_ref)
1022
1023
        if self.oval_external_content:
1024
            check = ET.SubElement(rule, 'check')
1025
            check.set("system", "http://oval.mitre.org/XMLSchema/oval-definitions-5")
1026
            external_content = ET.SubElement(check, "check-content-ref")
1027
            external_content.set("href", self.oval_external_content)
1028
        else:
1029
            # TODO: This is pretty much a hack, oval ID will be the same as rule ID
1030
            #       and we don't want the developers to have to keep them in sync.
1031
            #       Therefore let's just add an OVAL ref of that ID.
1032
            oval_ref = ET.SubElement(rule, "oval")
1033
            oval_ref.set("id", self.id_)
1034
1035
        if self.ocil or self.ocil_clause:
1036
            ocil = add_sub_element(rule, 'ocil', self.ocil if self.ocil else "")
1037
            if self.ocil_clause:
1038
                ocil.set("clause", self.ocil_clause)
1039
1040
        add_warning_elements(rule, self.warnings)
1041
1042
        if self.platform:
1043
            platform_el = ET.SubElement(rule, "platform")
1044
            try:
1045
                platform_cpe = XCCDF_PLATFORM_TO_CPE[self.platform]
1046
            except KeyError:
1047
                raise ValueError("Unsupported platform '%s' in rule '%s'." % (self.platform, self.id_))
1048
            platform_el.set("idref", platform_cpe)
1049
1050
        return rule
1051
1052 2
    def to_file(self, file_name):
1053
        root = self.to_xml_element()
1054
        tree = ET.ElementTree(root)
1055
        tree.write(file_name)
1056
1057
1058 2
class DirectoryLoader(object):
1059 2
    def __init__(self, profiles_dir, bash_remediation_fns, env_yaml):
1060
        self.benchmark_file = None
1061
        self.group_file = None
1062
        self.loaded_group = None
1063
        self.rule_files = []
1064
        self.value_files = []
1065
        self.subdirectories = []
1066
1067
        self.all_values = set()
1068
        self.all_rules = set()
1069
        self.all_groups = set()
1070
1071
        self.profiles_dir = profiles_dir
1072
        self.bash_remediation_fns = bash_remediation_fns
1073
        self.env_yaml = env_yaml
1074
        self.product = env_yaml["product"]
1075
1076
        self.parent_group = None
1077
1078 2
    def _collect_items_to_load(self, guide_directory):
1079
        for dir_item in os.listdir(guide_directory):
1080
            dir_item_path = os.path.join(guide_directory, dir_item)
1081
            _, extension = os.path.splitext(dir_item)
1082
1083
            if extension == '.var':
1084
                self.value_files.append(dir_item_path)
1085
            elif dir_item == "benchmark.yml":
1086
                if self.benchmark_file:
1087
                    raise ValueError("Multiple benchmarks in one directory")
1088
                self.benchmark_file = dir_item_path
1089
            elif dir_item == "group.yml":
1090
                if self.group_file:
1091
                    raise ValueError("Multiple groups in one directory")
1092
                self.group_file = dir_item_path
1093
            elif extension == '.rule':
1094
                self.rule_files.append(dir_item_path)
1095
            elif is_rule_dir(dir_item_path):
1096
                self.rule_files.append(get_rule_dir_yaml(dir_item_path))
1097
            elif dir_item != "tests":
1098
                if os.path.isdir(dir_item_path):
1099
                    self.subdirectories.append(dir_item_path)
1100
                else:
1101
                    sys.stderr.write(
1102
                        "Encountered file '%s' while recursing, extension '%s' "
1103
                        "is unknown. Skipping..\n"
1104
                        % (dir_item, extension)
1105
                    )
1106
1107 2
    def load_benchmark_or_group(self, guide_directory):
1108
        """
1109
        Loads a given benchmark or group from the specified benchmark_file or
1110
        group_file, in the context of guide_directory, action, profiles_dir,
1111
        env_yaml, and bash_remediation_fns.
1112
1113
        Returns the loaded group or benchmark.
1114
        """
1115
        group = None
1116
        if self.group_file and self.benchmark_file:
1117
            raise ValueError("A .benchmark file and a .group file were found in "
1118
                             "the same directory '%s'" % (guide_directory))
1119
1120
        # we treat benchmark as a special form of group in the following code
1121
        if self.benchmark_file:
1122
            group = Benchmark.from_yaml(
1123
                self.benchmark_file, 'product-name', self.env_yaml
1124
            )
1125
            if self.profiles_dir:
1126
                group.add_profiles_from_dir(self.action, self.profiles_dir, self.env_yaml)
1127
            group.add_bash_remediation_fns_from_file(self.action, self.bash_remediation_fns)
1128
1129
        if self.group_file:
1130
            group = Group.from_yaml(self.group_file, self.env_yaml)
1131
            self.all_groups.add(group.id_)
1132
1133
        return group
1134
1135 2
    def _load_group_process_and_recurse(self, guide_directory):
1136
        self.loaded_group = self.load_benchmark_or_group(guide_directory)
1137
1138
        if self.loaded_group:
1139
            if self.parent_group:
1140
                self.parent_group.add_group(self.loaded_group)
1141
1142
            self._process_values()
1143
            self._recurse_into_subdirs()
1144
            self._process_rules()
1145
1146 2
    def process_directory_tree(self, start_dir, extra_group_dirs=None):
1147
        self._collect_items_to_load(start_dir)
1148
        if extra_group_dirs is not None:
1149
            self.subdirectories += extra_group_dirs
1150
        self._load_group_process_and_recurse(start_dir)
1151
1152 2
    def _recurse_into_subdirs(self):
1153
        for subdir in self.subdirectories:
1154
            loader = self._get_new_loader()
1155
            loader.parent_group = self.loaded_group
1156
            loader.process_directory_tree(subdir)
1157
            self.all_values.update(loader.all_values)
1158
            self.all_rules.update(loader.all_rules)
1159
            self.all_groups.update(loader.all_groups)
1160
1161 2
    def _get_new_loader(self):
1162
        raise NotImplementedError()
1163
1164 2
    def _process_values(self):
1165
        raise NotImplementedError()
1166
1167 2
    def _process_rules(self):
1168
        raise NotImplementedError()
1169
1170
1171 2
class BuildLoader(DirectoryLoader):
1172 2
    def __init__(self, profiles_dir, bash_remediation_fns, env_yaml, resolved_rules_dir=None):
1173
        super(BuildLoader, self).__init__(profiles_dir, bash_remediation_fns, env_yaml)
1174
1175
        self.action = "build"
1176
1177
        self.resolved_rules_dir = resolved_rules_dir
1178
        if resolved_rules_dir and not os.path.isdir(resolved_rules_dir):
1179
            os.mkdir(resolved_rules_dir)
1180
1181 2
    def _process_values(self):
1182
        for value_yaml in self.value_files:
1183
            value = Value.from_yaml(value_yaml, self.env_yaml)
1184
            self.all_values.add(value)
1185
            self.loaded_group.add_value(value)
1186
1187 2
    def _process_rules(self):
1188
        for rule_yaml in self.rule_files:
1189
            try:
1190
                rule = Rule.from_yaml(rule_yaml, self.env_yaml)
1191
            except DocumentationNotComplete:
1192
                # Happens on non-debug build when a rule is "documentation-incomplete"
1193
                continue
1194
            prodtypes = parse_prodtype(rule.prodtype)
1195
            if "all" not in prodtypes and self.product not in prodtypes:
1196
                continue
1197
            self.all_rules.add(rule)
1198
            self.loaded_group.add_rule(rule)
1199
            if self.resolved_rules_dir:
1200
                output_for_rule = os.path.join(
1201
                    self.resolved_rules_dir, "{id_}.yml".format(id_=rule.id_))
1202
                mkdir_p(self.resolved_rules_dir)
1203
                with open(output_for_rule, "w") as f:
1204
                    rule.normalize(self.env_yaml["product"])
1205
                    yaml.dump(rule.to_contents_dict(), f)
1206
1207 2
    def _get_new_loader(self):
1208
        return BuildLoader(
1209
            self.profiles_dir, self.bash_remediation_fns, self.env_yaml, self.resolved_rules_dir)
1210
1211 2
    def export_group_to_file(self, filename):
1212
        return self.loaded_group.to_file(filename)
1213
1214
1215 2
class ListInputsLoader(DirectoryLoader):
1216 2
    def __init__(self, profiles_dir, bash_remediation_fns, env_yaml):
1217
        super(ListInputsLoader, self).__init__(profiles_dir, bash_remediation_fns, env_yaml)
1218
1219
        self.action = "list-inputs"
1220
1221 2
    def _process_values(self):
1222
        for value_yaml in self.value_files:
1223
            print(value_yaml)
1224
1225 2
    def _process_rules(self):
1226
        for rule_yaml in self.rule_files:
1227
            print(rule_yaml)
1228
1229 2
    def _get_new_loader(self):
1230
        return ListInputsLoader(
1231
            self.profiles_dir, self.bash_remediation_fns, self.env_yaml)
1232
1233 2
    def load_benchmark_or_group(self, guide_directory):
1234
        result = super(ListInputsLoader, self).load_benchmark_or_group(guide_directory)
1235
1236
        if self.benchmark_file:
1237
            print(self.benchmark_file)
1238
1239
        if self.group_file:
1240
            print(self.group_file)
1241
1242
        return result
1243