ssg.build_yaml.Rule.get_template_context()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 5
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
cc 2
eloc 5
nop 2
dl 0
loc 5
ccs 0
cts 5
cp 0
crap 6
rs 10
c 0
b 0
f 0
1
from __future__ import absolute_import
2
from __future__ import print_function
3
4
from copy import deepcopy
5
import collections
6
import datetime
7
import json
8
import os
9
import os.path
10
import re
11
import sys
12
import glob
13
14
15
import ssg.build_remediations
16
import ssg.components
17
from .build_cpe import CPEALLogicalTest, CPEALCheckFactRef, ProductCPEs
18
from .constants import (XCCDF12_NS,
19
                        OSCAP_BENCHMARK,
20
                        OSCAP_GROUP,
21
                        OSCAP_RULE,
22
                        OSCAP_VALUE,
23
                        SCE_SYSTEM,
24
                        cce_uri,
25
                        dc_namespace,
26
                        ocil_cs,
27
                        ocil_namespace,
28
                        oval_namespace,
29
                        xhtml_namespace,
30
                        xsi_namespace,
31
                        timestamp,
32
                        SSG_BENCHMARK_LATEST_URI,
33
                        SSG_PROJECT_NAME,
34
                        SSG_REF_URIS,
35
                        PREFIX_TO_NS,
36
                        FIX_TYPE_TO_SYSTEM
37
                        )
38
from .rules import get_rule_dir_yaml, is_rule_dir
39
from .rule_yaml import parse_prodtype
40
41
from .cce import is_cce_format_valid, is_cce_value_valid
42
from .yaml import DocumentationNotComplete, open_and_macro_expand
43
from .utils import required_key, mkdir_p
44
45
from .xml import ElementTree as ET, register_namespaces, parse_file
46
import ssg.build_stig
47
48
from .entities.common import add_sub_element, make_items_product_specific, \
49
                             XCCDFEntity, Templatable
50
from .entities.profile import Profile, ProfileWithInlinePolicies
51
52
53
def reorder_according_to_ordering(unordered, ordering, regex=None):
54
    ordered = []
55
    if regex is None:
56
        regex = "|".join(["({0})".format(item) for item in ordering])
57
    regex = re.compile(regex)
58
59
    items_to_order = list(filter(regex.match, unordered))
60
    unordered = set(unordered)
61
62
    for priority_type in ordering:
63
        for item in items_to_order:
64
            if priority_type in item and item in unordered:
65
                ordered.append(item)
66
                unordered.remove(item)
67
    ordered.extend(sorted(unordered))
68
    return ordered
69
70
71
def add_warning_elements(element, warnings):
72
    # The use of [{dict}, {dict}] in warnings is to handle the following
73
    # scenario where multiple warnings have the same category which is
74
    # valid in SCAP and our content:
75
    #
76
    # warnings:
77
    #     - general: Some general warning
78
    #     - general: Some other general warning
79
    #     - general: |-
80
    #         Some really long multiline general warning
81
    #
82
    # Each of the {dict} should have only one key/value pair.
83
    for warning_dict in warnings:
84
        warning = add_sub_element(
85
            element, "warning", XCCDF12_NS, list(warning_dict.values())[0])
86
        warning.set("category", list(warning_dict.keys())[0])
87
88
89
def add_nondata_subelements(element, subelement, attribute, attr_data):
90
    """Add multiple iterations of a sublement that contains an attribute but no data
91
       For example, <requires id="my_required_id"/>"""
92
    for data in attr_data:
93
        req = ET.SubElement(element, "{%s}%s" % (XCCDF12_NS, subelement))
94
        req.set(attribute, data)
95
96
97
def check_warnings(xccdf_structure):
98
    for warning_list in xccdf_structure.warnings:
99
        if len(warning_list) != 1:
100
            msg = "Only one key/value pair should exist for each warnings dictionary"
101
            raise ValueError(msg)
102
103
104
def add_reference_elements(element, references, ref_uri_dict):
105
    for ref_type, ref_vals in references.items():
106
        for ref_val in ref_vals.split(","):
107
            # This assumes that a single srg key may have items from multiple SRG types
108
            if ref_type == 'srg':
109
                if ref_val.startswith('SRG-OS-'):
110
                    ref_href = ref_uri_dict['os-srg']
111
                elif ref_val.startswith('SRG-APP-'):
112
                    ref_href = ref_uri_dict['app-srg']
113
                else:
114
                    raise ValueError("SRG {0} doesn't have a URI defined.".format(ref_val))
115
            else:
116
                try:
117
                    ref_href = ref_uri_dict[ref_type]
118
                except KeyError as exc:
119
                    msg = (
120
                        "Error processing reference {0}: {1} in Rule {2}."
121
                        .format(ref_type, ref_vals, self.id_))
122
                    raise ValueError(msg)
123
124
            ref = ET.SubElement(element, '{%s}reference' % XCCDF12_NS)
125
            ref.set("href", ref_href)
126
            ref.text = ref_val
127
128
129
def add_benchmark_metadata(element, contributors_file):
130
    metadata = ET.SubElement(element, "{%s}metadata" % XCCDF12_NS)
131
132
    publisher = ET.SubElement(metadata, "{%s}publisher" % dc_namespace)
133
    publisher.text = SSG_PROJECT_NAME
134
135
    creator = ET.SubElement(metadata, "{%s}creator" % dc_namespace)
136
    creator.text = SSG_PROJECT_NAME
137
138
    contrib_tree = parse_file(contributors_file)
139
    for c in contrib_tree.iter('contributor'):
140
        contributor = ET.SubElement(metadata, "{%s}contributor" % dc_namespace)
141
        contributor.text = c.text
142
143
    source = ET.SubElement(metadata, "{%s}source" % dc_namespace)
144
    source.text = SSG_BENCHMARK_LATEST_URI
145
146
147
class Value(XCCDFEntity):
148
    """Represents XCCDF Value
149
    """
150
    KEYS = dict(
151
        description=lambda: "",
152
        type=lambda: "",
153
        operator=lambda: "equals",
154
        interactive=lambda: False,
155
        options=lambda: dict(),
156
        warnings=lambda: list(),
157
        ** XCCDFEntity.KEYS
158
    )
159
160
    MANDATORY_KEYS = {
161
        "title",
162
        "description",
163
        "type",
164
    }
165
166
    @classmethod
167
    def process_input_dict(cls, input_contents, env_yaml, product_cpes=None):
168
        input_contents["interactive"] = (
169
            input_contents.get("interactive", "false").lower() == "true")
170
171
        data = super(Value, cls).process_input_dict(input_contents, env_yaml)
172
173
        possible_operators = ["equals", "not equal", "greater than",
174
                              "less than", "greater than or equal",
175
                              "less than or equal", "pattern match"]
176
177
        if data["operator"] not in possible_operators:
178
            raise ValueError(
179
                "Found an invalid operator value '%s'. "
180
                "Expected one of: %s"
181
                % (data["operator"], ", ".join(possible_operators))
182
            )
183
184
        return data
185
186
    @classmethod
187
    def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None):
188
        value = super(Value, cls).from_yaml(yaml_file, env_yaml)
189
190
        check_warnings(value)
191
192
        return value
193
194
    def to_xml_element(self):
195
        value = ET.Element('{%s}Value' % XCCDF12_NS)
196
        value.set('id', OSCAP_VALUE + self.id_)
197
        value.set('type', self.type)
198
        if self.operator != "equals":  # equals is the default
199
            value.set('operator', self.operator)
200
        if self.interactive:  # False is the default
201
            value.set('interactive', 'true')
202
        title = ET.SubElement(value, '{%s}title' % XCCDF12_NS)
203
        title.text = self.title
204
        add_sub_element(value, 'description', XCCDF12_NS, self.description)
205
        add_warning_elements(value, self.warnings)
206
207
        for selector, option in self.options.items():
208
            # do not confuse Value with big V with value with small v
209
            # value is child element of Value
210
            value_small = ET.SubElement(value, '{%s}value' % XCCDF12_NS)
211
            # by XCCDF spec, default value is value without selector
212
            if selector != "default":
213
                value_small.set('selector', str(selector))
214
            value_small.text = str(option)
215
216
        return value
217
218
219
class Benchmark(XCCDFEntity):
220
    """Represents XCCDF Benchmark
221
    """
222
    KEYS = dict(
223
        status=lambda: "",
224
        description=lambda: "",
225
        notice_id=lambda: "",
226
        notice_description=lambda: "",
227
        front_matter=lambda: "",
228
        rear_matter=lambda: "",
229
        cpes=lambda: list(),
230
        version=lambda: "",
231
        profiles=lambda: list(),
232
        values=lambda: dict(),
233
        groups=lambda: dict(),
234
        rules=lambda: dict(),
235
        platforms=lambda: dict(),
236
        product_cpe_names=lambda: list(),
237
        ** XCCDFEntity.KEYS
238
    )
239
240
    MANDATORY_KEYS = {
241
        "title",
242
        "status",
243
        "description",
244
        "front_matter",
245
        "rear_matter",
246
    }
247
248
    GENERIC_FILENAME = "benchmark.yml"
249
250
    def load_entities(self, rules_by_id, values_by_id, groups_by_id):
251
        for rid, val in self.rules.items():
252
            if not val:
253
                self.rules[rid] = rules_by_id[rid]
254
255
        for vid, val in self.values.items():
256
            if not val:
257
                self.values[vid] = values_by_id[vid]
258
259
        for gid, val in self.groups.items():
260
            if not val:
261
                self.groups[gid] = groups_by_id[gid]
262
263
    @classmethod
264
    def process_input_dict(cls, input_contents, env_yaml, product_cpes):
265
        input_contents["front_matter"] = input_contents["front-matter"]
266
        del input_contents["front-matter"]
267
        input_contents["rear_matter"] = input_contents["rear-matter"]
268
        del input_contents["rear-matter"]
269
270
        data = super(Benchmark, cls).process_input_dict(input_contents, env_yaml, product_cpes)
271
272
        notice_contents = required_key(input_contents, "notice")
273
        del input_contents["notice"]
274
275
        data["notice_id"] = required_key(notice_contents, "id")
276
        del notice_contents["id"]
277
278
        data["notice_description"] = required_key(notice_contents, "description")
279
        del notice_contents["description"]
280
281
        return data
282
283
    def represent_as_dict(self):
284
        data = super(Benchmark, self).represent_as_dict()
285
        data["rear-matter"] = data["rear_matter"]
286
        del data["rear_matter"]
287
288
        data["front-matter"] = data["front_matter"]
289
        del data["front_matter"]
290
        return data
291
292
    @classmethod
293
    def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None):
294
        benchmark = super(Benchmark, cls).from_yaml(yaml_file, env_yaml)
295
        if env_yaml:
296
            benchmark.product_cpe_names = product_cpes.get_product_cpe_names()
297
            benchmark.product_cpes = product_cpes
298
            benchmark.id_ = env_yaml["benchmark_id"]
299
            benchmark.version = env_yaml["ssg_version_str"]
300
        else:
301
            benchmark.id_ = "product-name"
302
            benchmark.version = "0.0"
303
304
        return benchmark
305
306
    def add_profiles_from_dir(self, dir_, env_yaml, product_cpes):
307
        for dir_item in sorted(os.listdir(dir_)):
308
            dir_item_path = os.path.join(dir_, dir_item)
309
            if not os.path.isfile(dir_item_path):
310
                continue
311
312
            _, ext = os.path.splitext(os.path.basename(dir_item_path))
313
            if ext != '.profile':
314
                sys.stderr.write(
315
                    "Encountered file '%s' while looking for profiles, "
316
                    "extension '%s' is unknown. Skipping..\n"
317
                    % (dir_item, ext)
318
                )
319
                continue
320
321
            try:
322
                new_profile = ProfileWithInlinePolicies.from_yaml(
323
                    dir_item_path, env_yaml, product_cpes)
324
            except DocumentationNotComplete:
325
                continue
326
            except Exception as exc:
327
                msg = ("Error building profile from '{fname}': '{error}'"
328
                       .format(fname=dir_item_path, error=str(exc)))
329
                raise RuntimeError(msg)
330
            if new_profile is None:
331
                continue
332
333
            self.profiles.append(new_profile)
334
335
    def unselect_empty_groups(self):
336
        for p in self.profiles:
337
            p.unselect_empty_groups(self)
338
339
    def to_xml_element(self, env_yaml=None, product_cpes=None):
340
        root = ET.Element('{%s}Benchmark' % XCCDF12_NS)
341
        root.set('id', OSCAP_BENCHMARK + self.id_)
342
        root.set('xmlns:xsi', 'http://www.w3.org/2001/XMLSchema-instance')
343
        root.set(
344
            'xsi:schemaLocation',
345
            'http://checklists.nist.gov/xccdf/1.2 xccdf-1.2.xsd')
346
        root.set('style', 'SCAP_1.2')
347
        root.set('resolved', 'true')
348
        root.set('xml:lang', 'en-US')
349
        status = ET.SubElement(root, '{%s}status' % XCCDF12_NS)
350
        status.set('date', datetime.date.today().strftime("%Y-%m-%d"))
351
        status.text = self.status
352
        add_sub_element(root, "title", XCCDF12_NS, self.title)
353
        add_sub_element(root, "description", XCCDF12_NS, self.description)
354
        notice = add_sub_element(
355
            root, "notice", XCCDF12_NS, self.notice_description)
356
        notice.set('id', self.notice_id)
357
        add_sub_element(root, "front-matter", XCCDF12_NS, self.front_matter)
358
        add_sub_element(root, "rear-matter",  XCCDF12_NS, self.rear_matter)
359
        # if there are no platforms, do not output platform-specification at all
360
        if len(self.product_cpes.platforms) > 0:
361
            cpe_platform_spec = ET.Element(
362
                "{%s}platform-specification" % PREFIX_TO_NS["cpe-lang"])
363
            for platform in self.product_cpes.platforms.values():
364
                cpe_platform_spec.append(platform.to_xml_element())
365
            root.append(cpe_platform_spec)
366
367
        # The Benchmark applicability is determined by the CPEs
368
        # defined in the product.yml
369
        for cpe_name in self.product_cpe_names:
370
            plat = ET.SubElement(root, "{%s}platform" % XCCDF12_NS)
371
            plat.set("idref", cpe_name)
372
373
        version = ET.SubElement(root, '{%s}version' % XCCDF12_NS)
374
        version.text = self.version
375
        version.set('update', SSG_BENCHMARK_LATEST_URI)
376
377
        contributors_file = os.path.join(os.path.dirname(__file__), "../Contributors.xml")
378
        add_benchmark_metadata(root, contributors_file)
379
380
        for profile in self.profiles:
381
            root.append(profile.to_xml_element())
382
383
        for value in self.values.values():
384
            root.append(value.to_xml_element())
385
386
        groups_in_bench = list(self.groups.keys())
387
        priority_order = ["system", "services"]
388
        groups_in_bench = reorder_according_to_ordering(groups_in_bench, priority_order)
389
390
        # Make system group the first, followed by services group
391
        for group_id in groups_in_bench:
392
            group = self.groups.get(group_id)
393
            # Products using application benchmark don't have system or services group
394
            if group is not None:
395
                root.append(group.to_xml_element(env_yaml))
396
397
        for rule in self.rules.values():
398
            root.append(rule.to_xml_element(env_yaml))
399
400
        return root
401
402
    def to_file(self, file_name, env_yaml=None):
403
        root = self.to_xml_element(env_yaml)
404
        tree = ET.ElementTree(root)
405
        tree.write(file_name)
406
407
    def add_value(self, value):
408
        if value is None:
409
            return
410
        self.values[value.id_] = value
411
412
    # The benchmark is also considered a group, so this function signature needs to match
413
    # Group()'s add_group()
414
    def add_group(self, group, env_yaml=None, product_cpes=None):
415
        if group is None:
416
            return
417
        self.groups[group.id_] = group
418
419
    def add_rule(self, rule):
420
        if rule is None:
421
            return
422
        self.rules[rule.id_] = rule
423
424
    def to_xccdf(self):
425
        """We can easily extend this script to generate a valid XCCDF instead
426
        of SSG SHORTHAND.
427
        """
428
        raise NotImplementedError
429
430
    def __str__(self):
431
        return self.id_
432
433
434
class Group(XCCDFEntity):
435
    """Represents XCCDF Group
436
    """
437
438
    GENERIC_FILENAME = "group.yml"
439
440
    KEYS = dict(
441
        prodtype=lambda: "all",
442
        description=lambda: "",
443
        warnings=lambda: list(),
444
        requires=lambda: list(),
445
        conflicts=lambda: list(),
446
        values=lambda: dict(),
447
        groups=lambda: dict(),
448
        rules=lambda: dict(),
449
        platform=lambda: "",
450
        platforms=lambda: set(),
451
        inherited_platforms=lambda: set(),
452
        cpe_platform_names=lambda: set(),
453
        ** XCCDFEntity.KEYS
454
    )
455
456
    MANDATORY_KEYS = {
457
        "title",
458
        "status",
459
        "description",
460
        "front_matter",
461
        "rear_matter",
462
    }
463
464
    @classmethod
465
    def process_input_dict(cls, input_contents, env_yaml, product_cpes=None):
466
        data = super(Group, cls).process_input_dict(input_contents, env_yaml, product_cpes)
467
        if data["rules"]:
468
            rule_ids = data["rules"]
469
            data["rules"] = {rid: None for rid in rule_ids}
470
471
        if data["groups"]:
472
            group_ids = data["groups"]
473
            data["groups"] = {gid: None for gid in group_ids}
474
475
        if data["values"]:
476
            value_ids = data["values"]
477
            data["values"] = {vid: None for vid in value_ids}
478
479
        if data["platform"]:
480
            data["platforms"].add(data["platform"])
481
482
        # parse platform definition and get CPEAL platform
483
        # if cpe_platform_names not already defined
484
        if data["platforms"] and not data["cpe_platform_names"]:
485
            for platform in data["platforms"]:
486
                cpe_platform = Platform.from_text(platform, product_cpes)
487
                cpe_platform = add_platform_if_not_defined(cpe_platform, product_cpes)
488
                data["cpe_platform_names"].add(cpe_platform.id_)
489
        return data
490
491
    def load_entities(self, rules_by_id, values_by_id, groups_by_id):
492
        for rid, val in self.rules.items():
493
            if not val:
494
                self.rules[rid] = rules_by_id[rid]
495
496
        for vid, val in self.values.items():
497
            if not val:
498
                self.values[vid] = values_by_id[vid]
499
500
        for gid in list(self.groups):
501
            val = self.groups.get(gid, None)
502
            if not val:
503
                try:
504
                    self.groups[gid] = groups_by_id[gid]
505
                except KeyError:
506
                    # Add only the groups we have compiled and loaded
507
                    del self.groups[gid]
508
                    pass
509
510
    def represent_as_dict(self):
511
        yaml_contents = super(Group, self).represent_as_dict()
512
513
        if self.rules:
514
            yaml_contents["rules"] = sorted(list(self.rules.keys()))
515
        if self.groups:
516
            yaml_contents["groups"] = sorted(list(self.groups.keys()))
517
        if self.values:
518
            yaml_contents["values"] = sorted(list(self.values.keys()))
519
520
        return yaml_contents
521
522
    def to_xml_element(self, env_yaml=None):
523
        group = ET.Element('{%s}Group' % XCCDF12_NS)
524
        group.set('id', OSCAP_GROUP + self.id_)
525
        title = ET.SubElement(group, '{%s}title' % XCCDF12_NS)
526
        title.text = self.title
527
        add_sub_element(group, 'description', XCCDF12_NS, self.description)
528
        add_warning_elements(group, self.warnings)
529
530
        # This is where references should be put if there are any
531
        # This is where rationale should be put if there are any
532
533
        for cpe_platform_name in self.cpe_platform_names:
534
            platform_el = ET.SubElement(group, "{%s}platform" % XCCDF12_NS)
535
            platform_el.set("idref", "#"+cpe_platform_name)
536
537
        add_nondata_subelements(
538
            group, "requires", "idref",
539
            list(map(lambda x: OSCAP_GROUP + x, self.requires)))
540
        add_nondata_subelements(
541
            group, "conflicts", "idref",
542
            list(map(lambda x: OSCAP_GROUP + x, self.conflicts)))
543
        for _value in self.values.values():
544
            if _value is not None:
545
                group.append(_value.to_xml_element())
546
547
        # Rules that install or remove packages affect remediation
548
        # of other rules.
549
        # When packages installed/removed rules come first:
550
        # The Rules are ordered in more logical way, and
551
        # remediation order is natural, first the package is installed, then configured.
552
        rules_in_group = list(self.rules.keys())
553
        regex = (r'(package_.*_(installed|removed))|' +
554
                 r'(service_.*_(enabled|disabled))|' +
555
                 r'install_smartcard_packages|' +
556
                 r'sshd_set_keepalive(_0)?|' +
557
                 r'sshd_set_idle_timeout$')
558
        priority_order = ["enable_authselect", "installed", "install_smartcard_packages", "removed",
559
                          "enabled", "disabled", "sshd_set_keepalive_0",
560
                          "sshd_set_keepalive", "sshd_set_idle_timeout"]
561
        rules_in_group = reorder_according_to_ordering(rules_in_group, priority_order, regex)
562
563
        # Add rules in priority order, first all packages installed, then removed,
564
        # followed by services enabled, then disabled
565
        for rule_id in rules_in_group:
566
            rule = self.rules.get(rule_id)
567
            if rule is not None:
568
                group.append(rule.to_xml_element(env_yaml))
569
570
        # Add the sub groups after any current level group rules.
571
        # As package installed/removed and service enabled/disabled rules are usuallly in
572
        # top level group, this ensures groups that further configure a package or service
573
        # are after rules that install or remove it.
574
        groups_in_group = list(self.groups.keys())
575
        priority_order = [
576
            # Make sure rpm_verify_(hashes|permissions|ownership) are run before any other rule.
577
            # Due to conflicts between rules rpm_verify_* rules and any rule that configures
578
            # stricter settings, like file_permissions_grub2_cfg and sudo_dedicated_group,
579
            # the rules deviating from the system default should be evaluated later.
580
            # So that in the end the system has contents, permissions and ownership reset, and
581
            # any deviations or stricter settings are applied by the rules in the profile.
582
            "software", "integrity", "integrity-software", "rpm_verification",
583
584
            # The account group has to precede audit group because
585
            # the rule package_screen_installed is desired to be executed before the rule
586
            # audit_rules_privileged_commands, othervise the rule
587
            # does not catch newly installed screen binary during remediation
588
            # and report fail
589
            "accounts", "auditing",
590
591
592
            # The FIPS group should come before Crypto,
593
            # if we want to set a different (stricter) Crypto Policy than FIPS.
594
            "fips", "crypto",
595
596
            # The firewalld_activation must come before ruleset_modifications, othervise
597
            # remediations for ruleset_modifications won't work
598
            "firewalld_activation", "ruleset_modifications",
599
600
            # Rules from group disabling_ipv6 must precede rules from configuring_ipv6,
601
            # otherwise the remediation prints error although it is successful
602
            "disabling_ipv6", "configuring_ipv6"
603
        ]
604
        groups_in_group = reorder_according_to_ordering(groups_in_group, priority_order)
605
        for group_id in groups_in_group:
606
            _group = self.groups[group_id]
607
            if _group is not None:
608
                group.append(_group.to_xml_element(env_yaml))
609
610
        return group
611
612
    def add_value(self, value):
613
        if value is None:
614
            return
615
        self.values[value.id_] = value
616
617
    def add_group(self, group, env_yaml=None, product_cpes=None):
618
        self._add_child(group, self.groups, env_yaml, product_cpes)
619
620
    def add_rule(self, rule, env_yaml=None, product_cpes=None):
621
        self._add_child(rule, self.rules, env_yaml, product_cpes)
622
        if env_yaml:
623
            for platform in rule.inherited_platforms:
624
                cpe_platform = Platform.from_text(platform, product_cpes)
625
                cpe_platform = add_platform_if_not_defined(cpe_platform, product_cpes)
626
                rule.inherited_cpe_platform_names.add(cpe_platform.id_)
627
628
    def _add_child(self, child, childs, env_yaml=None, product_cpes=None):
629
        if child is None:
630
            return
631
        child.inherited_platforms.update(self.platforms, self.inherited_platforms)
632
        childs[child.id_] = child
633
634
    def __str__(self):
635
        return self.id_
636
637
638
def noop_rule_filterfunc(rule):
639
    return True
640
641
642
def rule_filter_from_def(filterdef):
643
    if filterdef is None or filterdef == "":
644
        return noop_rule_filterfunc
645
646
    def filterfunc(rule):
647
        # Remove globals for security and only expose
648
        # variables relevant to the rule
649
        return eval(filterdef, {"__builtins__": None}, rule.__dict__)
650
    return filterfunc
651
652
653
class Rule(XCCDFEntity, Templatable):
654
    """Represents XCCDF Rule
655
    """
656
    KEYS = dict(
657
        prodtype=lambda: "all",
658
        description=lambda: "",
659
        rationale=lambda: "",
660
        severity=lambda: "",
661
        references=lambda: dict(),
662
        components=lambda: list(),
663
        identifiers=lambda: dict(),
664
        ocil_clause=lambda: None,
665
        ocil=lambda: None,
666
        oval_external_content=lambda: None,
667
        fixtext=lambda: "",
668
        checktext=lambda: "",
669
        vuldiscussion=lambda: "",
670
        srg_requirement=lambda: "",
671
        warnings=lambda: list(),
672
        conflicts=lambda: list(),
673
        requires=lambda: list(),
674
        policy_specific_content=lambda: dict(),
675
        platform=lambda: None,
676
        platforms=lambda: set(),
677
        sce_metadata=lambda: dict(),
678
        inherited_platforms=lambda: set(),
679
        cpe_platform_names=lambda: set(),
680
        inherited_cpe_platform_names=lambda: set(),
681
        bash_conditional=lambda: None,
682
        fixes=lambda: dict(),
683
        **XCCDFEntity.KEYS
684
    )
685
    KEYS.update(**Templatable.KEYS)
686
687
    MANDATORY_KEYS = {
688
        "title",
689
        "description",
690
        "rationale",
691
        "severity",
692
    }
693
694
    GENERIC_FILENAME = "rule.yml"
695
    ID_LABEL = "rule_id"
696
697
    PRODUCT_REFERENCES = ("stigid", "cis",)
698
699
    def __init__(self, id_):
700
        super(Rule, self).__init__(id_)
701
        self.sce_metadata = None
702
703
    def __deepcopy__(self, memo):
704
        cls = self.__class__
705
        result = cls.__new__(cls)
706
        memo[id(self)] = result
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable id does not seem to be defined.
Loading history...
707
        for k, v in self.__dict__.items():
708
            # These are difficult to deep copy, so let's just re-use them.
709
            if k != "template" and k != "local_env_yaml":
710
                setattr(result, k, deepcopy(v, memo))
711
            else:
712
                setattr(result, k, v)
713
        return result
714
715
    @classmethod
716
    def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None, sce_metadata=None):
717
        rule = super(Rule, cls).from_yaml(yaml_file, env_yaml, product_cpes)
718
719
        # platforms are read as list from the yaml file
720
        # we need them to convert to set again
721
        rule.platforms = set(rule.platforms)
722
723
        # rule.platforms.update(set(rule.inherited_platforms))
724
725
        check_warnings(rule)
726
727
        # ensure that content of rule.platform is in rule.platforms as
728
        # well
729
        if rule.platform is not None:
730
            rule.platforms.add(rule.platform)
731
732
        # Convert the platform names to CPE names
733
        # But only do it if an env_yaml was specified (otherwise there would be no product CPEs
734
        # to lookup), and the rule's prodtype matches the product being built
735
        # also if the rule already has cpe_platform_names specified (compiled rule)
736
        # do not evaluate platforms again
737
        if env_yaml and (
738
            env_yaml["product"] in parse_prodtype(rule.prodtype)
739
            or rule.prodtype == "all") and (
740
                product_cpes and not rule.cpe_platform_names):
741
            # parse platform definition and get CPEAL platform
742
            for platform in rule.platforms:
743
                cpe_platform = Platform.from_text(platform, product_cpes)
744
                cpe_platform = add_platform_if_not_defined(cpe_platform, product_cpes)
745
                rule.cpe_platform_names.add(cpe_platform.id_)
746
        # Only load policy specific content if rule doesn't have it defined yet
747
        if not rule.policy_specific_content:
748
            rule.load_policy_specific_content(yaml_file, env_yaml)
749
750
        if sce_metadata and rule.id_ in sce_metadata:
751
            rule.sce_metadata = sce_metadata[rule.id_]
752
            rule.sce_metadata["relative_path"] = os.path.join(
753
                env_yaml["product"], "checks/sce", rule.sce_metadata['filename'])
754
755
        rule.validate_prodtype(yaml_file)
756
        rule.validate_identifiers(yaml_file)
757
        rule.validate_references(yaml_file)
758
        return rule
759
760
    def _verify_disa_cci_format(self):
761
        cci_id = self.references.get("disa", None)
762
        if not cci_id:
763
            return
764
        cci_ex = re.compile(r'^CCI-[0-9]{6}$')
765
        for cci in cci_id.split(","):
766
            if not cci_ex.match(cci):
767
                raise ValueError("CCI '{}' is in the wrong format! "
768
                                 "Format should be similar to: "
769
                                 "CCI-XXXXXX".format(cci))
770
        self.references["disa"] = cci_id
771
772
    def normalize(self, product):
773
        try:
774
            self.make_refs_and_identifiers_product_specific(product)
775
            self.make_template_product_specific(product)
776
        except Exception as exc:
777
            msg = (
778
                "Error normalizing '{rule}': {msg}"
779
                .format(rule=self.id_, msg=str(exc))
780
            )
781
            raise RuntimeError(msg)
782
783
    def add_stig_references(self, stig_references):
784
        stig_id = self.references.get("stigid", None)
785
        if not stig_id:
786
            return
787
788
        references = []
789
        for id in stig_id.split(","):
790
            reference = stig_references.get(id, None)
791
            if not reference:
792
                continue
793
            references.append(reference)
794
795
        if references:
796
            self.references["stigref"] = ",".join(references)
797
798
    def _get_product_only_references(self):
799
        product_references = dict()
800
801
        for ref in Rule.PRODUCT_REFERENCES:
802
            start = "{0}@".format(ref)
803
            for gref, gval in self.references.items():
804
                if ref == gref or gref.startswith(start):
805
                    product_references[gref] = gval
806
        return product_references
807
808
    def find_policy_specific_content(self, rule_root):
809
        policy_specific_dir = os.path.join(rule_root, "policy")
810
        policy_directories = glob.glob(os.path.join(policy_specific_dir, "*"))
811
        filenames = set()
812
        for pdir in policy_directories:
813
            policy_files = glob.glob(os.path.join(pdir, "*.yml"))
814
            filenames.update(set(policy_files))
815
        return filenames
816
817
    def triage_policy_specific_content(self, product_name, filenames):
818
        product_dot_yml = product_name + ".yml"
819
        filename_by_policy = dict()
820
        for fname in filenames:
821
            policy = os.path.basename(os.path.dirname(fname))
822
            filename_appropriate_for_storage = (
823
                fname.endswith(product_dot_yml) and product_name
824
                or fname.endswith("shared.yml") and policy not in filename_by_policy)
825
            if (filename_appropriate_for_storage):
826
                filename_by_policy[policy] = fname
827
        return filename_by_policy
828
829
    def read_policy_specific_content_file(self, env_yaml, filename):
830
        yaml_data = open_and_macro_expand(filename, env_yaml)
831
        return yaml_data
832
833
    def read_policy_specific_content(self, env_yaml, files):
834
        keys = dict()
835
        if env_yaml:
836
            product = env_yaml["product"]
837
        else:
838
            product = ""
839
        filename_by_policy = self.triage_policy_specific_content(product, files)
840
        for p, f in filename_by_policy.items():
841
            yaml_data = self.read_policy_specific_content_file(env_yaml, f)
842
            keys[p] = yaml_data
843
        return keys
844
845
    def load_policy_specific_content(self, rule_filename, env_yaml):
846
        rule_root = os.path.dirname(rule_filename)
847
        policy_specific_content_files = self.find_policy_specific_content(rule_root)
848
        policy_specific_content = dict()
849
        if policy_specific_content_files:
850
            policy_specific_content = self.read_policy_specific_content(
851
                env_yaml, policy_specific_content_files)
852
        self.policy_specific_content = policy_specific_content
853
854
    def get_template_context(self, env_yaml):
855
        ctx = super(Rule, self).get_template_context(env_yaml)
856
        if self.identifiers:
857
            ctx["cce_identifiers"] = self.identifiers
858
        return ctx
859
860
    def make_refs_and_identifiers_product_specific(self, product):
861
        product_suffix = "@{0}".format(product)
862
863
        product_references = self._get_product_only_references()
864
        general_references = self.references.copy()
865
        for todel in product_references:
866
            general_references.pop(todel)
867
        for ref in Rule.PRODUCT_REFERENCES:
868
            if ref in general_references:
869
                msg = "Unexpected reference identifier ({0}) without "
870
                msg += "product qualifier ({0}@{1}) while building rule "
871
                msg += "{2}"
872
                msg = msg.format(ref, product, self.id_)
873
                raise ValueError(msg)
874
875
        to_set = dict(
876
            identifiers=(self.identifiers, False),
877
            general_references=(general_references, True),
878
            product_references=(product_references, False),
879
        )
880
        for name, (dic, allow_overwrites) in to_set.items():
881
            try:
882
                new_items = make_items_product_specific(
883
                    dic, product_suffix, allow_overwrites)
884
            except ValueError as exc:
885
                msg = (
886
                    "Error processing {what} for rule '{rid}': {msg}"
887
                    .format(what=name, rid=self.id_, msg=str(exc))
888
                )
889
                raise ValueError(msg)
890
            dic.clear()
891
            dic.update(new_items)
892
893
        self.references = general_references
894
        self._verify_disa_cci_format()
895
        self.references.update(product_references)
896
897
    def validate_identifiers(self, yaml_file):
898
        if self.identifiers is None:
899
            raise ValueError("Empty identifier section in file %s" % yaml_file)
900
901
        # Validate all identifiers are non-empty:
902
        for ident_type, ident_val in self.identifiers.items():
903
            if not isinstance(ident_type, str) or not isinstance(ident_val, str):
904
                raise ValueError("Identifiers and values must be strings: %s in file %s"
905
                                 % (ident_type, yaml_file))
906
            if ident_val.strip() == "":
907
                raise ValueError("Identifiers must not be empty: %s in file %s"
908
                                 % (ident_type, yaml_file))
909
            if ident_type[0:3] == 'cce':
910
                if not is_cce_format_valid(ident_val):
911
                    raise ValueError("CCE Identifier format must be valid: invalid format '%s' for CEE '%s'"
912
                                     " in file '%s'" % (ident_val, ident_type, yaml_file))
913
                if not is_cce_value_valid("CCE-" + ident_val):
914
                    raise ValueError("CCE Identifier value is not a valid checksum: invalid value '%s' for CEE '%s'"
915
                                     " in file '%s'" % (ident_val, ident_type, yaml_file))
916
917
    def validate_references(self, yaml_file):
918
        if self.references is None:
919
            raise ValueError("Empty references section in file %s" % yaml_file)
920
921
        for ref_type, ref_val in self.references.items():
922
            if not isinstance(ref_type, str) or not isinstance(ref_val, str):
923
                raise ValueError("References and values must be strings: %s in file %s"
924
                                 % (ref_type, yaml_file))
925
            if ref_val.strip() == "":
926
                raise ValueError("References must not be empty: %s in file %s"
927
                                 % (ref_type, yaml_file))
928
929
        for ref_type, ref_val in self.references.items():
930
            for ref in ref_val.split(","):
931
                if ref.strip() != ref:
932
                    msg = (
933
                        "Comma-separated '{ref_type}' reference "
934
                        "in {yaml_file} contains whitespace."
935
                        .format(ref_type=ref_type, yaml_file=yaml_file))
936
                    raise ValueError(msg)
937
938
    def validate_prodtype(self, yaml_file):
939
        for ptype in self.prodtype.split(","):
940
            if ptype.strip() != ptype:
941
                msg = (
942
                    "Comma-separated '{prodtype}' prodtype "
943
                    "in {yaml_file} contains whitespace."
944
                    .format(prodtype=self.prodtype, yaml_file=yaml_file))
945
                raise ValueError(msg)
946
947
    def add_fixes(self, fixes):
948
        self.fixes = fixes
949
950
    def _add_fixes_elements(self, rule_el):
951
        for fix_type, fix in self.fixes.items():
952
            fix_el = ET.SubElement(rule_el, "{%s}fix" % XCCDF12_NS)
953
            fix_el.set("system", FIX_TYPE_TO_SYSTEM[fix_type])
954
            fix_el.set("id", self.id_)
955
            fix_contents, config = fix
956
            for key in ssg.build_remediations.REMEDIATION_ELM_KEYS:
957
                if config[key]:
958
                    fix_el.set(key, config[key])
959
            fix_el.text = fix_contents + "\n"
960
            # Expand shell variables and remediation functions
961
            # into corresponding XCCDF <sub> elements
962
            ssg.build_remediations.expand_xccdf_subs(fix_el, fix_type)
963
964
    def to_xml_element(self, env_yaml=None):
965
        rule = ET.Element('{%s}Rule' % XCCDF12_NS)
966
        rule.set('selected', 'false')
967
        rule.set('id', OSCAP_RULE + self.id_)
968
        rule.set('severity', self.severity)
969
        add_sub_element(rule, 'title', XCCDF12_NS, self.title)
970
        add_sub_element(rule, 'description', XCCDF12_NS, self.description)
971
        add_warning_elements(rule, self.warnings)
972
973
        if env_yaml:
974
            ref_uri_dict = env_yaml['reference_uris']
975
        else:
976
            ref_uri_dict = SSG_REF_URIS
977
        add_reference_elements(rule, self.references, ref_uri_dict)
978
979
        add_sub_element(rule, 'rationale', XCCDF12_NS, self.rationale)
980
981
        for cpe_platform_name in sorted(self.cpe_platform_names):
982
            platform_el = ET.SubElement(rule, "{%s}platform" % XCCDF12_NS)
983
            platform_el.set("idref", "#"+cpe_platform_name)
984
985
        add_nondata_subelements(
986
            rule, "requires", "idref",
987
            list(map(lambda x: OSCAP_RULE + x, self.requires)))
988
        add_nondata_subelements(
989
            rule, "conflicts", "idref",
990
            list(map(lambda x: OSCAP_RULE + x, self.conflicts)))
991
992
        for ident_type, ident_val in self.identifiers.items():
993
            ident = ET.SubElement(rule, '{%s}ident' % XCCDF12_NS)
994
            if ident_type == 'cce':
995
                ident.set('system', cce_uri)
996
                ident.text = ident_val
997
        self._add_fixes_elements(rule)
998
999
        ocil_parent = rule
1000
        check_parent = rule
1001
1002
        if self.sce_metadata:
1003
            # TODO: This is pretty much another hack, just like the previous OVAL
1004
            # one. However, we avoided the external SCE content as I'm not sure it
1005
            # is generally useful (unlike say, CVE checking with external OVAL)
1006
            #
1007
            # Additionally, we build the content (check subelement) here rather
1008
            # than in xslt due to the nature of our SCE metadata.
1009
            #
1010
            # Finally, before we begin, we might have an element with both SCE
1011
            # and OVAL. We have no way of knowing (right here) whether that is
1012
            # the case (due to a variety of issues, most notably, that linking
1013
            # hasn't yet occurred). So we must rely on the content author's
1014
            # good will, by annotating SCE content with a complex-check tag
1015
            # if necessary.
1016
1017
            if 'complex-check' in self.sce_metadata:
1018
                # Here we have an issue: XCCDF allows EITHER one or more check
1019
                # elements OR a single complex-check. While we have an explicit
1020
                # case handling the OVAL-and-SCE interaction, OCIL entries have
1021
                # (historically) been alongside OVAL content and been in an
1022
                # "OR" manner -- preferring OVAL to SCE. In order to accomplish
1023
                # this, we thus need to add _yet another parent_ when OCIL data
1024
                # is present, and add update ocil_parent accordingly.
1025
                if self.ocil or self.ocil_clause:
1026
                    ocil_parent = ET.SubElement(
1027
                        ocil_parent, "{%s}complex-check" % XCCDF12_NS)
1028
                    ocil_parent.set('operator', 'OR')
1029
1030
                check_parent = ET.SubElement(
1031
                    ocil_parent, "{%s}complex-check" % XCCDF12_NS)
1032
                check_parent.set('operator', self.sce_metadata['complex-check'])
1033
1034
            # Now, add the SCE check element to the tree.
1035
            check = ET.SubElement(check_parent, "{%s}check" % XCCDF12_NS)
1036
            check.set("system", SCE_SYSTEM)
1037
1038
            if 'check-import' in self.sce_metadata:
1039
                if isinstance(self.sce_metadata['check-import'], str):
1040
                    self.sce_metadata['check-import'] = [self.sce_metadata['check-import']]
1041
                for entry in self.sce_metadata['check-import']:
1042
                    check_import = ET.SubElement(
1043
                        check, '{%s}check-import' % XCCDF12_NS)
1044
                    check_import.set('import-name', entry)
1045
                    check_import.text = None
1046
1047
            if 'check-export' in self.sce_metadata:
1048
                if isinstance(self.sce_metadata['check-export'], str):
1049
                    self.sce_metadata['check-export'] = [self.sce_metadata['check-export']]
1050
                for entry in self.sce_metadata['check-export']:
1051
                    export, value = entry.split('=')
1052
                    check_export = ET.SubElement(
1053
                        check, '{%s}check-export' % XCCDF12_NS)
1054
                    check_export.set('value-id', value)
1055
                    check_export.set('export-name', export)
1056
                    check_export.text = None
1057
1058
            check_ref = ET.SubElement(
1059
                check, "{%s}check-content-ref" % XCCDF12_NS)
1060
            href = self.sce_metadata['relative_path']
1061
            check_ref.set("href", href)
1062
1063
        check = ET.SubElement(check_parent, '{%s}check' % XCCDF12_NS)
1064
        check.set("system", oval_namespace)
1065
        check_content_ref = ET.SubElement(
1066
            check, "{%s}check-content-ref" % XCCDF12_NS)
1067
        if self.oval_external_content:
1068
            check_content_ref.set("href", self.oval_external_content)
1069
        else:
1070
            # TODO: This is pretty much a hack, oval ID will be the same as rule ID
1071
            #       and we don't want the developers to have to keep them in sync.
1072
            #       Therefore let's just add an OVAL ref of that ID.
1073
            # TODO  Can we not add the check element if the rule doesn't have an OVAL check?
1074
            #       At the moment, the check elements of rules without OVAL are removed by
1075
            #       the OVALFileLinker class.
1076
            check_content_ref.set("href", "oval-unlinked.xml")
1077
            check_content_ref.set("name", self.id_)
1078
1079
        patches_up_to_date = (self.id_ == "security_patches_up_to_date")
1080
        if (self.ocil or self.ocil_clause) and not patches_up_to_date:
1081
            ocil_check = ET.SubElement(check_parent, "{%s}check" % XCCDF12_NS)
1082
            ocil_check.set("system", ocil_cs)
1083
            ocil_check_ref = ET.SubElement(
1084
                ocil_check, "{%s}check-content-ref" % XCCDF12_NS)
1085
            ocil_check_ref.set("href", "ocil-unlinked.xml")
1086
            ocil_check_ref.set("name", self.id_ + "_ocil")
1087
1088
        return rule
1089
1090
    def to_ocil(self):
1091
        if not self.ocil and not self.ocil_clause:
1092
            raise ValueError("Rule {0} doesn't have OCIL".format(self.id_))
1093
        # Create <questionnaire> for the rule
1094
        questionnaire = ET.Element("{%s}questionnaire" % ocil_namespace, id=self.id_ + "_ocil")
1095
        title = ET.SubElement(questionnaire, "{%s}title" % ocil_namespace)
1096
        title.text = self.title
1097
        actions = ET.SubElement(questionnaire, "{%s}actions" % ocil_namespace)
1098
        test_action_ref = ET.SubElement(actions, "{%s}test_action_ref" % ocil_namespace)
1099
        test_action_ref.text = self.id_ + "_action"
1100
        # Create <boolean_question_test_action> for the rule
1101
        action = ET.Element(
1102
            "{%s}boolean_question_test_action" % ocil_namespace,
1103
            id=self.id_ + "_action",
1104
            question_ref=self.id_ + "_question")
1105
        when_true = ET.SubElement(action, "{%s}when_true" % ocil_namespace)
1106
        result = ET.SubElement(when_true, "{%s}result" % ocil_namespace)
1107
        result.text = "PASS"
1108
        when_true = ET.SubElement(action, "{%s}when_false" % ocil_namespace)
1109
        result = ET.SubElement(when_true, "{%s}result" % ocil_namespace)
1110
        result.text = "FAIL"
1111
        # Create <boolean_question>
1112
        boolean_question = ET.Element(
1113
            "{%s}boolean_question" % ocil_namespace, id=self.id_ + "_question")
1114
        # TODO: The contents of <question_text> element used to be broken in
1115
        # the legacy XSLT implementation. The following code contains hacks
1116
        # to get the same results as in the legacy XSLT implementation.
1117
        # This enabled us a smooth transition to new OCIL generator
1118
        # without a need to mass-edit rule YAML files.
1119
        # We need to solve:
1120
        # TODO: using variables (aka XCCDF Values) in OCIL content
1121
        # TODO: using HTML formating tags eg. <pre> in OCIL content
1122
        #
1123
        # The "ocil" key in compiled rules contains HTML and XML elements
1124
        # but OCIL question texts shouldn't contain HTML or XML elements,
1125
        # therefore removing them.
1126
        if self.ocil is not None:
1127
            ocil_without_tags = re.sub(r"</?[^>]+>", "", self.ocil)
1128
        else:
1129
            ocil_without_tags = ""
1130
        # The "ocil" key in compiled rules contains XML entities which would
1131
        # be escaped by ET.Subelement() so we need to use add_sub_element()
1132
        # instead because we don't want to escape them.
1133
        question_text = add_sub_element(
1134
            boolean_question, "question_text", ocil_namespace,
1135
            ocil_without_tags)
1136
        # The "ocil_clause" key in compiled rules also contains HTML and XML
1137
        # elements but unlike the "ocil" we want to escape the '<' and '>'
1138
        # characters.
1139
        # The empty ocil_clause causing broken question is in line with the
1140
        # legacy XSLT implementation.
1141
        ocil_clause = self.ocil_clause if self.ocil_clause else ""
1142
        question_text.text = (
1143
            u"{0}\n      Is it the case that {1}?\n      ".format(
1144
                question_text.text if question_text.text is not None else "",
1145
                ocil_clause))
1146
        return (questionnaire, action, boolean_question)
1147
1148
    def __hash__(self):
1149
        """ Controls are meant to be unique, so using the
1150
        ID should suffice"""
1151
        return hash(self.id_)
1152
1153
    def __eq__(self, other):
1154
        return isinstance(other, self.__class__) and self.id_ == other.id_
1155
1156
    def __ne__(self, other):
1157
        return not self != other
1158
1159
    def __lt__(self, other):
1160
        return self.id_ < other.id_
1161
1162
    def __str__(self):
1163
        return self.id_
1164
1165
1166
class DirectoryLoader(object):
1167
    def __init__(self, profiles_dir, env_yaml, product_cpes):
1168
        self.benchmark_file = None
1169
        self.group_file = None
1170
        self.loaded_group = None
1171
        self.rule_files = []
1172
        self.value_files = []
1173
        self.subdirectories = []
1174
1175
        self.all_values = dict()
1176
        self.all_rules = dict()
1177
        self.all_groups = dict()
1178
1179
        self.profiles_dir = profiles_dir
1180
        self.env_yaml = env_yaml
1181
        self.product = env_yaml["product"]
1182
1183
        self.parent_group = None
1184
        self.product_cpes = product_cpes
1185
1186
    def _collect_items_to_load(self, guide_directory):
1187
        for dir_item in sorted(os.listdir(guide_directory)):
1188
            dir_item_path = os.path.join(guide_directory, dir_item)
1189
            _, extension = os.path.splitext(dir_item)
1190
1191
            if extension == '.var':
1192
                self.value_files.append(dir_item_path)
1193
            elif dir_item == "benchmark.yml":
1194
                if self.benchmark_file:
1195
                    raise ValueError("Multiple benchmarks in one directory")
1196
                self.benchmark_file = dir_item_path
1197
            elif dir_item == "group.yml":
1198
                if self.group_file:
1199
                    raise ValueError("Multiple groups in one directory")
1200
                self.group_file = dir_item_path
1201
            elif extension == '.rule':
1202
                self.rule_files.append(dir_item_path)
1203
            elif is_rule_dir(dir_item_path):
1204
                self.rule_files.append(get_rule_dir_yaml(dir_item_path))
1205
            elif dir_item != "tests":
1206
                if os.path.isdir(dir_item_path):
1207
                    self.subdirectories.append(dir_item_path)
1208
                else:
1209
                    sys.stderr.write(
1210
                        "Encountered file '%s' while recursing, extension '%s' "
1211
                        "is unknown. Skipping..\n"
1212
                        % (dir_item, extension)
1213
                    )
1214
1215
    def load_benchmark_or_group(self, guide_directory):
1216
        """
1217
        Loads a given benchmark or group from the specified benchmark_file or
1218
        group_file, in the context of guide_directory, profiles_dir and env_yaml.
1219
1220
        Returns the loaded group or benchmark.
1221
        """
1222
        group = None
1223
        if self.group_file and self.benchmark_file:
1224
            raise ValueError("A .benchmark file and a .group file were found in "
1225
                             "the same directory '%s'" % (guide_directory))
1226
1227
        # we treat benchmark as a special form of group in the following code
1228
        if self.benchmark_file:
1229
            group = Benchmark.from_yaml(
1230
                self.benchmark_file, self.env_yaml, self.product_cpes
1231
            )
1232
            if self.profiles_dir:
1233
                group.add_profiles_from_dir(self.profiles_dir, self.env_yaml)
1234
1235
        if self.group_file:
1236
            group = Group.from_yaml(self.group_file, self.env_yaml, self.product_cpes)
1237
            prodtypes = parse_prodtype(group.prodtype)
1238
            if "all" in prodtypes or self.product in prodtypes:
1239
                self.all_groups[group.id_] = group
1240
            else:
1241
                return None
1242
1243
        return group
1244
1245
    def _load_group_process_and_recurse(self, guide_directory):
1246
        self.loaded_group = self.load_benchmark_or_group(guide_directory)
1247
1248
        if self.loaded_group:
1249
1250
            if self.parent_group:
1251
                self.parent_group.add_group(
1252
                    self.loaded_group, env_yaml=self.env_yaml, product_cpes=self.product_cpes)
1253
1254
            self._process_values()
1255
            self._recurse_into_subdirs()
1256
            self._process_rules()
1257
1258
    def process_directory_tree(self, start_dir, extra_group_dirs=None):
1259
        self._collect_items_to_load(start_dir)
1260
        if extra_group_dirs:
1261
            self.subdirectories += extra_group_dirs
1262
        self._load_group_process_and_recurse(start_dir)
1263
1264
    def process_directory_trees(self, directories):
1265
        start_dir = directories[0]
1266
        extra_group_dirs = directories[1:]
1267
        return self.process_directory_tree(start_dir, extra_group_dirs)
1268
1269
    def _recurse_into_subdirs(self):
1270
        for subdir in self.subdirectories:
1271
            loader = self._get_new_loader()
1272
            loader.parent_group = self.loaded_group
1273
            loader.process_directory_tree(subdir)
1274
            self.all_values.update(loader.all_values)
1275
            self.all_rules.update(loader.all_rules)
1276
            self.all_groups.update(loader.all_groups)
1277
1278
    def _get_new_loader(self):
1279
        raise NotImplementedError()
1280
1281
    def _process_values(self):
1282
        raise NotImplementedError()
1283
1284
    def _process_rules(self):
1285
        raise NotImplementedError()
1286
1287
    def save_all_entities(self, base_dir):
1288
        destdir = os.path.join(base_dir, "rules")
1289
        mkdir_p(destdir)
1290
        if self.all_rules:
1291
            self.save_entities(self.all_rules.values(), destdir)
1292
1293
        destdir = os.path.join(base_dir, "groups")
1294
        mkdir_p(destdir)
1295
        if self.all_groups:
1296
            self.save_entities(self.all_groups.values(), destdir)
1297
1298
        destdir = os.path.join(base_dir, "values")
1299
        mkdir_p(destdir)
1300
        if self.all_values:
1301
            self.save_entities(self.all_values.values(), destdir)
1302
1303
        destdir = os.path.join(base_dir, "platforms")
1304
        mkdir_p(destdir)
1305
        if self.product_cpes.platforms:
1306
            self.save_entities(self.product_cpes.platforms.values(), destdir)
1307
1308
        destdir = os.path.join(base_dir, "cpe_items")
1309
        mkdir_p(destdir)
1310
        if self.product_cpes.cpes_by_id:
1311
            self.save_entities(self.product_cpes.cpes_by_id.values(), destdir)
1312
1313
    def save_entities(self, entities, destdir):
1314
        if not entities:
1315
            return
1316
        for entity in entities:
1317
            basename = entity.id_ + ".yml"
1318
            dest_filename = os.path.join(destdir, basename)
1319
            entity.dump_yaml(dest_filename)
1320
1321
1322
class BuildLoader(DirectoryLoader):
1323
    def __init__(
1324
            self, profiles_dir, env_yaml, product_cpes,
1325
            sce_metadata_path=None, stig_reference_path=None):
1326
        super(BuildLoader, self).__init__(profiles_dir, env_yaml, product_cpes)
1327
1328
        self.sce_metadata = None
1329
        if sce_metadata_path and os.path.getsize(sce_metadata_path):
1330
            self.sce_metadata = json.load(open(sce_metadata_path, 'r'))
1331
        self.stig_references = None
1332
        if stig_reference_path:
1333
            self.stig_references = ssg.build_stig.map_versions_to_rule_ids(stig_reference_path)
1334
        self.components_dir = None
1335
        self.rule_to_components = self._load_components()
1336
1337
    def _load_components(self):
1338
        if "components_root" not in self.env_yaml:
1339
            return None
1340
        product_dir = self.env_yaml["product_dir"]
1341
        components_root = self.env_yaml["components_root"]
1342
        self.components_dir = os.path.abspath(
1343
            os.path.join(product_dir, components_root))
1344
        components = ssg.components.load(self.components_dir)
1345
        rule_to_components = ssg.components.rule_component_mapping(
1346
            components)
1347
        return rule_to_components
1348
1349
    def _process_values(self):
1350
        for value_yaml in self.value_files:
1351
            value = Value.from_yaml(value_yaml, self.env_yaml)
1352
            self.all_values[value.id_] = value
1353
            self.loaded_group.add_value(value)
1354
1355
    def _process_rule(self, rule):
1356
        if self.rule_to_components is not None and rule.id_ not in self.rule_to_components:
1357
            raise ValueError(
1358
                "The rule '%s' isn't mapped to any component! Insert the "
1359
                "rule ID to at least one file in '%s'." %
1360
                (rule.id_, self.components_dir))
1361
        prodtypes = parse_prodtype(rule.prodtype)
1362
        if "all" not in prodtypes and self.product not in prodtypes:
1363
            return False
1364
        self.all_rules[rule.id_] = rule
1365
        self.loaded_group.add_rule(
1366
            rule, env_yaml=self.env_yaml, product_cpes=self.product_cpes)
1367
        rule.normalize(self.env_yaml["product"])
1368
        if self.stig_references:
1369
            rule.add_stig_references(self.stig_references)
1370
        if self.rule_to_components is not None:
1371
            rule.components = self.rule_to_components[rule.id_]
1372
        return True
1373
1374
    def _process_rules(self):
1375
        for rule_yaml in self.rule_files:
1376
            try:
1377
                rule = Rule.from_yaml(
1378
                    rule_yaml, self.env_yaml, self.product_cpes, self.sce_metadata)
1379
            except DocumentationNotComplete:
1380
                # Happens on non-debug build when a rule is "documentation-incomplete"
1381
                continue
1382
            if not self._process_rule(rule):
1383
                continue
1384
1385
    def _get_new_loader(self):
1386
        loader = BuildLoader(
1387
            self.profiles_dir, self.env_yaml, self.product_cpes)
1388
        # Do it this way so we only have to parse the SCE metadata once.
1389
        loader.sce_metadata = self.sce_metadata
1390
        # Do it this way so we only have to parse the STIG references once.
1391
        loader.stig_references = self.stig_references
1392
        # Do it this way so we only have to parse the component metadata once.
1393
        loader.rule_to_components = self.rule_to_components
1394
        return loader
1395
1396
    def export_group_to_file(self, filename):
1397
        return self.loaded_group.to_file(filename)
1398
1399
1400
class LinearLoader(object):
1401
    def __init__(self, env_yaml, resolved_path):
1402
        self.resolved_rules_dir = os.path.join(resolved_path, "rules")
1403
        self.rules = dict()
1404
1405
        self.resolved_profiles_dir = os.path.join(resolved_path, "profiles")
1406
        self.profiles = dict()
1407
1408
        self.resolved_groups_dir = os.path.join(resolved_path, "groups")
1409
        self.groups = dict()
1410
1411
        self.resolved_values_dir = os.path.join(resolved_path, "values")
1412
        self.values = dict()
1413
1414
        self.resolved_platforms_dir = os.path.join(resolved_path, "platforms")
1415
        self.platforms = dict()
1416
1417
        self.fixes_dir = os.path.join(resolved_path, "fixes")
1418
        self.fixes = dict()
1419
1420
        self.resolved_cpe_items_dir = os.path.join(resolved_path, "cpe_items")
1421
        self.cpe_items = dict()
1422
1423
        self.benchmark = None
1424
        self.env_yaml = env_yaml
1425
        self.product_cpes = ProductCPEs()
1426
1427
    def find_first_groups_ids(self, start_dir):
1428
        group_files = glob.glob(os.path.join(start_dir, "*", "group.yml"))
1429
        group_ids = [fname.split(os.path.sep)[-2] for fname in group_files]
1430
        return group_ids
1431
1432
    def load_entities_by_id(self, filenames, destination, cls):
1433
        for fname in filenames:
1434
            entity = cls.from_yaml(fname, self.env_yaml, self.product_cpes)
1435
            destination[entity.id_] = entity
1436
1437
    def add_fixes_to_rules(self):
1438
        for rule_id, rule_fixes in self.fixes.items():
1439
            self.rules[rule_id].add_fixes(rule_fixes)
1440
1441
    def load_benchmark(self, directory):
1442
        self.benchmark = Benchmark.from_yaml(
1443
            os.path.join(directory, "benchmark.yml"), self.env_yaml, self.product_cpes)
1444
1445
        self.benchmark.add_profiles_from_dir(
1446
            self.resolved_profiles_dir, self.env_yaml, self.product_cpes)
1447
1448
        benchmark_first_groups = self.find_first_groups_ids(directory)
1449
        for gid in benchmark_first_groups:
1450
            try:
1451
                self.benchmark.add_group(self.groups[gid], self.env_yaml, self.product_cpes)
1452
            except KeyError as exc:
1453
                # Add only the groups we have compiled and loaded
1454
                pass
1455
        self.benchmark.unselect_empty_groups()
1456
1457
    def load_compiled_content(self):
1458
        self.product_cpes.load_cpes_from_directory_tree(self.resolved_cpe_items_dir, self.env_yaml)
1459
1460
        self.fixes = ssg.build_remediations.load_compiled_remediations(self.fixes_dir)
1461
1462
        filenames = glob.glob(os.path.join(self.resolved_rules_dir, "*.yml"))
1463
        self.load_entities_by_id(filenames, self.rules, Rule)
1464
1465
        filenames = glob.glob(os.path.join(self.resolved_groups_dir, "*.yml"))
1466
        self.load_entities_by_id(filenames, self.groups, Group)
1467
1468
        filenames = glob.glob(os.path.join(self.resolved_values_dir, "*.yml"))
1469
        self.load_entities_by_id(filenames, self.values, Value)
1470
1471
        filenames = glob.glob(os.path.join(self.resolved_platforms_dir, "*.yml"))
1472
        self.load_entities_by_id(filenames, self.platforms, Platform)
1473
        self.product_cpes.platforms = self.platforms
1474
1475
        for g in self.groups.values():
1476
            g.load_entities(self.rules, self.values, self.groups)
1477
1478
    def export_benchmark_to_xml(self):
1479
        return self.benchmark.to_xml_element(self.env_yaml)
1480
1481
    def export_benchmark_to_file(self, filename):
1482
        register_namespaces()
1483
        return self.benchmark.to_file(filename, self.env_yaml)
1484
1485
    def export_ocil_to_xml(self):
1486
        root = ET.Element('{%s}ocil' % ocil_namespace)
1487
        root.set('xmlns:xsi', xsi_namespace)
1488
        root.set("xmlns:xhtml", xhtml_namespace)
1489
        generator = ET.SubElement(root, "{%s}generator" % ocil_namespace)
1490
        product_name = ET.SubElement(generator, "{%s}product_name" % ocil_namespace)
1491
        product_name.text = "build_shorthand.py from SCAP Security Guide"
1492
        product_version = ET.SubElement(generator, "{%s}product_version" % ocil_namespace)
1493
        product_version.text = "ssg: " + self.env_yaml["ssg_version_str"]
1494
        schema_version = ET.SubElement(generator, "{%s}schema_version" % ocil_namespace)
1495
        schema_version.text = "2.0"
1496
        timestamp_el = ET.SubElement(generator, "{%s}timestamp" % ocil_namespace)
1497
        timestamp_el.text = timestamp
1498
        questionnaires = ET.SubElement(root, "{%s}questionnaires" % ocil_namespace)
1499
        test_actions = ET.SubElement(root, "{%s}test_actions" % ocil_namespace)
1500
        questions = ET.SubElement(root, "{%s}questions" % ocil_namespace)
1501
        for rule in self.rules.values():
1502
            if not rule.ocil and not rule.ocil_clause:
1503
                continue
1504
            questionnaire, action, boolean_question = rule.to_ocil()
1505
            questionnaires.append(questionnaire)
1506
            test_actions.append(action)
1507
            questions.append(boolean_question)
1508
        return root
1509
1510
    def export_ocil_to_file(self, filename):
1511
        root = self.export_ocil_to_xml()
1512
        tree = ET.ElementTree(root)
1513
        tree.write(filename)
1514
1515
1516
class Platform(XCCDFEntity):
1517
1518
    KEYS = dict(
1519
        name=lambda: "",
1520
        original_expression=lambda: "",
1521
        xml_content=lambda: "",
1522
        bash_conditional=lambda: "",
1523
        ansible_conditional=lambda: "",
1524
        ** XCCDFEntity.KEYS
1525
    )
1526
1527
    MANDATORY_KEYS = [
1528
        "name",
1529
        "xml_content",
1530
        "original_expression",
1531
        "bash_conditional",
1532
        "ansible_conditional"
1533
    ]
1534
1535
    prefix = "cpe-lang"
1536
    ns = PREFIX_TO_NS[prefix]
1537
1538
    @classmethod
1539
    def from_text(cls, expression, product_cpes):
1540
        if not product_cpes:
1541
            return None
1542
        test = product_cpes.algebra.parse(expression, simplify=True)
1543
        id_ = test.as_id()
1544
        platform = cls(id_)
1545
        platform.test = test
1546
        product_cpes.add_resolved_cpe_items_from_platform(platform)
1547
        platform.test.enrich_with_cpe_info(product_cpes)
1548
        platform.name = id_
1549
        platform.original_expression = expression
1550
        platform.xml_content = platform.get_xml()
1551
        platform.update_conditional_from_cpe_items("bash", product_cpes)
1552
        platform.update_conditional_from_cpe_items("ansible", product_cpes)
1553
        return platform
1554
1555
    def get_xml(self):
1556
        cpe_platform = ET.Element("{%s}platform" % Platform.ns)
1557
        cpe_platform.set('id', self.name)
1558
        # In case the platform contains only single CPE name, fake the logical test
1559
        # we have to adhere to CPE specification
1560
        if isinstance(self.test, CPEALCheckFactRef):
1561
            cpe_test = ET.Element("{%s}logical-test" % CPEALLogicalTest.ns)
1562
            cpe_test.set('operator', 'AND')
1563
            cpe_test.set('negate', 'false')
1564
            cpe_test.append(self.test.to_xml_element())
1565
            cpe_platform.append(cpe_test)
1566
        else:
1567
            cpe_platform.append(self.test.to_xml_element())
1568
        xmlstr = ET.tostring(cpe_platform).decode()
1569
        return xmlstr
1570
1571
    def to_xml_element(self):
1572
        return ET.fromstring(self.xml_content)
1573
1574
    def get_remediation_conditional(self, language):
1575
        if language == "bash":
1576
            return self.bash_conditional
1577
        elif language == "ansible":
1578
            return self.ansible_conditional
1579
        else:
1580
            raise AttributeError("Invalid remediation language {0} specified.".format(language))
1581
1582
    @classmethod
1583
    def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None):
1584
        platform = super(Platform, cls).from_yaml(yaml_file, env_yaml)
1585
        # If we received a product_cpes, we can restore also the original test object
1586
        # it can be later used e.g. for comparison
1587
        if product_cpes:
1588
            platform.test = product_cpes.algebra.parse(platform.original_expression, simplify=True)
1589
            product_cpes.add_resolved_cpe_items_from_platform(platform)
1590
        return platform
1591
1592
    def get_fact_refs(self):
1593
        return self.test.get_symbols()
1594
1595
    def update_conditional_from_cpe_items(self, language, product_cpes):
1596
        self.test.enrich_with_cpe_info(product_cpes)
1597
        if language == "bash":
1598
            self.bash_conditional = self.test.to_bash_conditional()
1599
        elif language == "ansible":
1600
            self.ansible_conditional = self.test.to_ansible_conditional()
1601
        else:
1602
            raise RuntimeError(
1603
                "Platform remediations do not support the {0} language".format(language))
1604
1605
    def __eq__(self, other):
1606
        if not isinstance(other, Platform):
1607
            return False
1608
        else:
1609
            return self.test == other.test
1610
1611
1612
def add_platform_if_not_defined(platform, product_cpes):
1613
    # check if the platform is already in the dictionary. If yes, return the existing one
1614
    for p in product_cpes.platforms.values():
1615
        if platform == p:
1616
            return p
1617
    product_cpes.platforms[platform.id_] = platform
1618
    return platform
1619