Test Failed
Push — master ( f41b3f...0bcc33 )
by Matěj
03:22 queued 37s
created

ssg.build_yaml.Rule.add_stig_references()   A

Complexity

Conditions 3

Size

Total Lines 8
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 9.0292

Importance

Changes 0
Metric Value
cc 3
eloc 8
nop 2
dl 0
loc 8
ccs 1
cts 8
cp 0.125
crap 9.0292
rs 10
c 0
b 0
f 0
1 2
from __future__ import absolute_import
2 2
from __future__ import print_function
3
4 2
from collections import defaultdict
5 2
from copy import deepcopy
6 2
import datetime
7 2
import json
8 2
import os
9 2
import os.path
10 2
import re
11 2
import sys
12 2
from xml.sax.saxutils import escape
13 2
import glob
14
15 2
import yaml
16
17 2
from .build_cpe import CPEDoesNotExist, CPEALLogicalTest, CPEALFactRef
18 2
from .constants import (XCCDF_REFINABLE_PROPERTIES,
19
                        SCE_SYSTEM,
20
                        cce_uri,
21
                        dc_namespace,
22
                        ocil_cs,
23
                        ocil_namespace,
24
                        oval_namespace,
25
                        xhtml_namespace,
26
                        xsi_namespace,
27
                        stig_ns,
28
                        timestamp,
29
                        SSG_BENCHMARK_LATEST_URI,
30
                        SSG_PROJECT_NAME,
31
                        SSG_REF_URIS,
32
                        PREFIX_TO_NS
33
                        )
34 2
from .rules import get_rule_dir_id, get_rule_dir_yaml, is_rule_dir
35 2
from .rule_yaml import parse_prodtype
36
37 2
from .cce import is_cce_format_valid, is_cce_value_valid
38 2
from .yaml import DocumentationNotComplete, open_and_expand, open_and_macro_expand
39 2
from .utils import required_key, mkdir_p
40
41 2
from .xml import ElementTree as ET, add_xhtml_namespace, register_namespaces, parse_file
42 2
from .shims import unicode_func
43 2
from .build_cpe import ProductCPEs
44 2
import ssg.build_stig
45
46 2
def dump_yaml_preferably_in_original_order(dictionary, file_object):
47
    try:
48
        return yaml.dump(dictionary, file_object, indent=4, sort_keys=False)
49
    except TypeError as exc:
50
        # Older versions of libyaml don't understand the sort_keys kwarg
51
        if "sort_keys" not in str(exc):
52
            raise exc
53
        return yaml.dump(dictionary, file_object, indent=4)
54
55
56 2
def add_sub_element(parent, tag, data):
57
    """
58
    Creates a new child element under parent with tag tag, and sets
59
    data as the content under the tag. In particular, data is a string
60
    to be parsed as an XML tree, allowing sub-elements of children to be
61
    added.
62
63
    If data should not be parsed as an XML tree, either escape the contents
64
    before passing into this function, or use ElementTree.SubElement().
65
66
    Returns the newly created subelement of type tag.
67
    """
68
    namespaced_data = add_xhtml_namespace(data)
69
    # This is used because our YAML data contain XML and XHTML elements
70
    # ET.SubElement() escapes the < > characters by &lt; and &gt;
71
    # and therefore it does not add child elements
72
    # we need to do a hack instead
73
    # TODO: Remove this function after we move to Markdown everywhere in SSG
74
    ustr = unicode_func('<{0} xmlns:xhtml="{2}">{1}</{0}>').format(tag, namespaced_data, xhtml_namespace)
75
76
    try:
77
        element = ET.fromstring(ustr.encode("utf-8"))
78
    except Exception:
79
        msg = ("Error adding subelement to an element '{0}' from string: '{1}'"
80
               .format(parent.tag, ustr))
81
        raise RuntimeError(msg)
82
83
    parent.append(element)
84
    return element
85
86
87 2
def reorder_according_to_ordering(unordered, ordering, regex=None):
88 2
    ordered = []
89 2
    if regex is None:
90 2
        regex = "|".join(["({0})".format(item) for item in ordering])
91 2
    regex = re.compile(regex)
92
93 2
    items_to_order = list(filter(regex.match, unordered))
94 2
    unordered = set(unordered)
95
96 2
    for priority_type in ordering:
97 2
        for item in items_to_order:
98 2
            if priority_type in item and item in unordered:
99 2
                ordered.append(item)
100 2
                unordered.remove(item)
101 2
    ordered.extend(sorted(unordered))
102 2
    return ordered
103
104
105 2
def add_warning_elements(element, warnings):
106
    # The use of [{dict}, {dict}] in warnings is to handle the following
107
    # scenario where multiple warnings have the same category which is
108
    # valid in SCAP and our content:
109
    #
110
    # warnings:
111
    #     - general: Some general warning
112
    #     - general: Some other general warning
113
    #     - general: |-
114
    #         Some really long multiline general warning
115
    #
116
    # Each of the {dict} should have only one key/value pair.
117
    for warning_dict in warnings:
118
        warning = add_sub_element(element, "warning", list(warning_dict.values())[0])
119
        warning.set("category", list(warning_dict.keys())[0])
120
121
122 2
def add_nondata_subelements(element, subelement, attribute, attr_data):
123
    """Add multiple iterations of a sublement that contains an attribute but no data
124
       For example, <requires id="my_required_id"/>"""
125
    for data in attr_data:
126
        req = ET.SubElement(element, subelement)
127
        req.set(attribute, data)
128
129
130 2
def check_warnings(xccdf_structure):
131 2
    for warning_list in xccdf_structure.warnings:
132 2
        if len(warning_list) != 1:
133
            msg = "Only one key/value pair should exist for each warnings dictionary"
134
            raise ValueError(msg)
135
136
137 2
def add_reference_elements(element, references, ref_uri_dict):
138
    for ref_type, ref_vals in references.items():
139
        for ref_val in ref_vals.split(","):
140
            # This assumes that a single srg key may have items from multiple SRG types
141
            if ref_type == 'srg':
142
                if ref_val.startswith('SRG-OS-'):
143
                    ref_href = ref_uri_dict['os-srg']
144
                elif ref_val.startswith('SRG-APP-'):
145
                    ref_href = ref_uri_dict['app-srg']
146
                else:
147
                    raise ValueError("SRG {0} doesn't have a URI defined.".format(ref_val))
148
            else:
149
                try:
150
                    ref_href = ref_uri_dict[ref_type]
151
                except KeyError as exc:
152
                    msg = (
153
                        "Error processing reference {0}: {1} in Rule {2}."
154
                        .format(ref_type, ref_vals, self.id_))
155
                    raise ValueError(msg)
156
157
            ref = ET.SubElement(element, 'reference')
158
            ref.set("href", ref_href)
159
            ref.text = ref_val
160
161
162 2
def add_benchmark_metadata(element, contributors_file):
163
    metadata = ET.SubElement(element, "metadata")
164
165
    publisher = ET.SubElement(metadata, "{%s}publisher" % dc_namespace)
166
    publisher.text = SSG_PROJECT_NAME
167
168
    creator = ET.SubElement(metadata, "{%s}creator" % dc_namespace)
169
    creator.text = SSG_PROJECT_NAME
170
171
    contrib_tree = parse_file(contributors_file)
172
    for c in contrib_tree.iter('contributor'):
173
        contributor = ET.SubElement(metadata, "{%s}contributor" % dc_namespace)
174
        contributor.text = c.text
175
176
    source = ET.SubElement(metadata, "{%s}source" % dc_namespace)
177
    source.text = SSG_BENCHMARK_LATEST_URI
178
179
180 2
class SelectionHandler(object):
181 2
    def __init__(self):
182 2
        self.refine_rules = defaultdict(list)
183 2
        self.variables = dict()
184 2
        self.unselected = []
185 2
        self.selected = []
186
187 2
    @property
188
    def selections(self):
189
        selections = []
190
        for item in self.selected:
191
            selections.append(str(item))
192
        for item in self.unselected:
193
            selections.append("!"+str(item))
194
        for varname in self.variables.keys():
195
            selections.append(varname+"="+self.variables.get(varname))
196
        for rule, refinements in self.refine_rules.items():
197
            for prop, val in refinements:
198
                selections.append("{rule}.{property}={value}"
199
                                  .format(rule=rule, property=prop, value=val))
200
        return selections
201
202 2
    @selections.setter
203
    def selections(self, entries):
204 2
        for item in entries:
205 2
            self.apply_selection(item)
206
207 2
    def apply_selection(self, item):
208 2
        if "." in item:
209
            rule, refinement = item.split(".", 1)
210
            property_, value = refinement.split("=", 1)
211
            if property_ not in XCCDF_REFINABLE_PROPERTIES:
212
                msg = ("Property '{property_}' cannot be refined. "
213
                       "Rule properties that can be refined are {refinables}. "
214
                       "Fix refinement '{rule_id}.{property_}={value}' in profile '{profile}'."
215
                       .format(property_=property_, refinables=XCCDF_REFINABLE_PROPERTIES,
216
                               rule_id=rule, value=value, profile=self.id_)
217
                       )
218
                raise ValueError(msg)
219
            self.refine_rules[rule].append((property_, value))
220 2
        elif "=" in item:
221 2
            varname, value = item.split("=", 1)
222 2
            self.variables[varname] = value
223 2
        elif item.startswith("!"):
224
            self.unselected.append(item[1:])
225
        else:
226 2
            self.selected.append(item)
227
228 2
    def _subtract_refinements(self, extended_refinements):
229
        """
230
        Given a dict of rule refinements from the extended profile,
231
        "undo" every refinement prefixed with '!' in this profile.
232
        """
233
        for rule, refinements in list(self.refine_rules.items()):
234
            if rule.startswith("!"):
235
                for prop, val in refinements:
236
                    extended_refinements[rule[1:]].remove((prop, val))
237
                del self.refine_rules[rule]
238
        return extended_refinements
239
240 2
    def update_with(self, rhs):
241
        extended_selects = set(rhs.selected)
242
        extra_selections = extended_selects.difference(set(self.selected))
243
        self.selected.extend(list(extra_selections))
244
245
        updated_variables = dict(rhs.variables)
246
        updated_variables.update(self.variables)
247
        self.variables = updated_variables
248
249
        extended_refinements = deepcopy(rhs.refine_rules)
250
        updated_refinements = self._subtract_refinements(extended_refinements)
251
        updated_refinements.update(self.refine_rules)
252
        self.refine_rules = updated_refinements
253
254
255 2
class XCCDFEntity(object):
256
    """
257
    This class can load itself from a YAML with Jinja macros,
258
    and it can also save itself to YAML.
259
260
    It is supposed to work with the content in the project,
261
    when entities are defined in the benchmark tree,
262
    and they are compiled into flat YAMLs to the build directory.
263
    """
264 2
    KEYS = dict(
265
            id_=lambda: "",
266
            definition_location=lambda: "",
267
    )
268
269 2
    MANDATORY_KEYS = set()
270
271 2
    GENERIC_FILENAME = ""
272 2
    ID_LABEL = "id"
273
274 2
    def __init__(self, id_):
275 2
        super(XCCDFEntity, self).__init__()
276 2
        self._assign_defaults()
277 2
        self.id_ = id_
278
279 2
    def _assign_defaults(self):
280 2
        for key, default in self.KEYS.items():
281 2
            default_val = default()
282 2
            if isinstance(default_val, RuntimeError):
283
                default_val = None
284 2
            setattr(self, key, default_val)
285
286 2
    @classmethod
287
    def get_instance_from_full_dict(cls, data):
288
        """
289
        Given a defining dictionary, produce an instance
290
        by treating all dict elements as attributes.
291
292
        Extend this if you want tight control over the instance creation process.
293
        """
294 2
        entity = cls(data["id_"])
295 2
        for key, value in data.items():
296 2
            setattr(entity, key, value)
297 2
        return entity
298
299 2
    @classmethod
300 2
    def process_input_dict(cls, input_contents, env_yaml, product_cpes=None):
301
        """
302
        Take the contents of the definition as a dictionary, and
303
        add defaults or raise errors if a required member is not present.
304
305
        Extend this if you want to add, remove or alter the result
306
        that will constitute the new instance.
307
        """
308 2
        data = dict()
309
310 2
        for key, default in cls.KEYS.items():
311 2
            if key in input_contents:
312 2
                if input_contents[key] is not None:
313 2
                    data[key] = input_contents[key]
314 2
                del input_contents[key]
315 2
                continue
316
317 2
            if key not in cls.MANDATORY_KEYS:
318 2
                data[key] = cls.KEYS[key]()
319
            else:
320
                msg = (
321
                    "Key '{key}' is mandatory for definition of '{class_name}'."
322
                    .format(key=key, class_name=cls.__name__))
323
                raise ValueError(msg)
324
325 2
        return data
326
327 2
    @classmethod
328 2
    def parse_yaml_into_processed_dict(cls, yaml_file, env_yaml=None, product_cpes=None):
329
        """
330
        Given yaml filename and environment info, produce a dictionary
331
        that defines the instance to be created.
332
        This wraps :meth:`process_input_dict` and it adds generic keys on the top:
333
334
        - `id_` as the entity ID that is deduced either from thefilename,
335
          or from the parent directory name.
336
        - `definition_location` as the original location whenre the entity got defined.
337
        """
338 2
        file_basename = os.path.basename(yaml_file)
339 2
        entity_id = file_basename.split(".")[0]
340 2
        if file_basename == cls.GENERIC_FILENAME:
341 2
            entity_id = os.path.basename(os.path.dirname(yaml_file))
342
343 2
        if env_yaml:
344 2
            env_yaml[cls.ID_LABEL] = entity_id
345 2
        yaml_data = open_and_macro_expand(yaml_file, env_yaml)
346
347 2
        try:
348 2
            processed_data = cls.process_input_dict(yaml_data, env_yaml, product_cpes)
349
        except ValueError as exc:
350
            msg = (
351
                "Error processing {yaml_file}: {exc}"
352
                .format(yaml_file=yaml_file, exc=str(exc)))
353
            raise ValueError(msg)
354
355 2
        if yaml_data:
356
            msg = (
357
                "Unparsed YAML data in '{yaml_file}': {keys}"
358
                .format(yaml_file=yaml_file, keys=list(yaml_data.keys())))
359
            raise RuntimeError(msg)
360
361 2
        if not processed_data.get("definition_location", ""):
362 2
            processed_data["definition_location"] = yaml_file
363
364 2
        processed_data["id_"] = entity_id
365
366 2
        return processed_data
367
368 2
    @classmethod
369 2
    def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None):
370 2
        yaml_file = os.path.normpath(yaml_file)
371
372 2
        local_env_yaml = None
373 2
        if env_yaml:
374 2
            local_env_yaml = dict()
375 2
            local_env_yaml.update(env_yaml)
376
377 2
        try:
378 2
            data_dict = cls.parse_yaml_into_processed_dict(yaml_file, local_env_yaml, product_cpes)
379
        except DocumentationNotComplete as exc:
380
            raise
381
        except Exception as exc:
382
            msg = (
383
                "Error loading a {class_name} from {filename}: {error}"
384
                .format(class_name=cls.__name__, filename=yaml_file, error=str(exc)))
385
            raise RuntimeError(msg)
386
387 2
        result = cls.get_instance_from_full_dict(data_dict)
388
389 2
        return result
390
391 2
    def represent_as_dict(self):
392
        """
393
        Produce a dict representation of the class.
394
395
        Extend this method if you need the representation to be different from the object.
396
        """
397 2
        data = dict()
398 2
        for key in self.KEYS:
399 2
            value = getattr(self, key)
400 2
            if value or True:
401 2
                data[key] = getattr(self, key)
402 2
        del data["id_"]
403 2
        return data
404
405 2
    def dump_yaml(self, file_name, documentation_complete=True):
406
        to_dump = self.represent_as_dict()
407
        to_dump["documentation_complete"] = documentation_complete
408
        with open(file_name, "w+") as f:
409
            dump_yaml_preferably_in_original_order(to_dump, f)
410
411 2
    def to_xml_element(self):
412
        raise NotImplementedError()
413
414
415 2
class Profile(XCCDFEntity, SelectionHandler):
416
    """Represents XCCDF profile
417
    """
418 2
    KEYS = dict(
419
        title=lambda: "",
420
        description=lambda: "",
421
        extends=lambda: "",
422
        metadata=lambda: None,
423
        reference=lambda: None,
424
        selections=lambda: list(),
425
        platforms=lambda: set(),
426
        cpe_names=lambda: set(),
427
        platform=lambda: None,
428
        filter_rules=lambda: "",
429
        ** XCCDFEntity.KEYS
430
    )
431
432 2
    MANDATORY_KEYS = {
433
        "title",
434
        "description",
435
        "selections",
436
    }
437
438 2
    @classmethod
439
    def process_input_dict(cls, input_contents, env_yaml, product_cpes):
440 2
        input_contents = super(Profile, cls).process_input_dict(input_contents, env_yaml)
441
442 2
        platform = input_contents.get("platform")
443 2
        if platform is not None:
444
            input_contents["platforms"].add(platform)
445
446 2
        if env_yaml:
447 2
            for platform in input_contents["platforms"]:
448
                try:
449
                    new_cpe_name = product_cpes.get_cpe_name(platform)
450
                    input_contents["cpe_names"].add(new_cpe_name)
451
                except CPEDoesNotExist:
452
                    msg = (
453
                        "Unsupported platform '{platform}' in a profile."
454
                        .format(platform=platform))
455
                    raise CPEDoesNotExist(msg)
456
457 2
        return input_contents
458
459 2
    @property
460
    def rule_filter(self):
461
        if self.filter_rules:
462
            return rule_filter_from_def(self.filter_rules)
463
        else:
464
            return noop_rule_filterfunc
465
466 2
    def to_xml_element(self):
467
        element = ET.Element('Profile')
468
        element.set("id", self.id_)
469
        if self.extends:
470
            element.set("extends", self.extends)
471
        title = add_sub_element(element, "title", self.title)
472
        title.set("override", "true")
473
        desc = add_sub_element(element, "description", self.description)
474
        desc.set("override", "true")
475
476
        if self.reference:
477
            add_sub_element(element, "reference", escape(self.reference))
478
479
        for cpe_name in self.cpe_names:
480
            plat = ET.SubElement(element, "platform")
481
            plat.set("idref", cpe_name)
482
483
        for selection in self.selected:
484
            select = ET.Element("select")
485
            select.set("idref", selection)
486
            select.set("selected", "true")
487
            element.append(select)
488
489
        for selection in self.unselected:
490
            unselect = ET.Element("select")
491
            unselect.set("idref", selection)
492
            unselect.set("selected", "false")
493
            element.append(unselect)
494
495
        for value_id, selector in self.variables.items():
496
            refine_value = ET.Element("refine-value")
497
            refine_value.set("idref", value_id)
498
            refine_value.set("selector", selector)
499
            element.append(refine_value)
500
501
        for refined_rule, refinement_list in self.refine_rules.items():
502
            refine_rule = ET.Element("refine-rule")
503
            refine_rule.set("idref", refined_rule)
504
            for refinement in refinement_list:
505
                refine_rule.set(refinement[0], refinement[1])
506
            element.append(refine_rule)
507
508
        return element
509
510 2
    def get_rule_selectors(self):
511 2
        return self.selected + self.unselected
512
513 2
    def get_variable_selectors(self):
514 2
        return self.variables
515
516 2
    def validate_refine_rules(self, rules):
517
        existing_rule_ids = [r.id_ for r in rules]
518
        for refine_rule, refinement_list in self.refine_rules.items():
519
            # Take first refinement to ilustrate where the error is
520
            # all refinements in list are invalid, so it doesn't really matter
521
            a_refinement = refinement_list[0]
522
523
            if refine_rule not in existing_rule_ids:
524
                msg = (
525
                    "You are trying to refine a rule that doesn't exist. "
526
                    "Rule '{rule_id}' was not found in the benchmark. "
527
                    "Please check all rule refinements for rule: '{rule_id}', for example: "
528
                    "- {rule_id}.{property_}={value}' in profile {profile_id}."
529
                    .format(rule_id=refine_rule, profile_id=self.id_,
530
                            property_=a_refinement[0], value=a_refinement[1])
531
                    )
532
                raise ValueError(msg)
533
534
            if refine_rule not in self.get_rule_selectors():
535
                msg = ("- {rule_id}.{property_}={value}' in profile '{profile_id}' is refining "
536
                       "a rule that is not selected by it. The refinement will not have any "
537
                       "noticeable effect. Either select the rule or remove the rule refinement."
538
                       .format(rule_id=refine_rule, property_=a_refinement[0],
539
                               value=a_refinement[1], profile_id=self.id_)
540
                       )
541
                raise ValueError(msg)
542
543 2
    def validate_variables(self, variables):
544
        variables_by_id = dict()
545
        for var in variables:
546
            variables_by_id[var.id_] = var
547
548
        for var_id, our_val in self.variables.items():
549
            if var_id not in variables_by_id:
550
                all_vars_list = [" - %s" % v for v in variables_by_id.keys()]
551
                msg = (
552
                    "Value '{var_id}' in profile '{profile_name}' is not known. "
553
                    "We know only variables:\n{var_names}"
554
                    .format(
555
                        var_id=var_id, profile_name=self.id_,
556
                        var_names="\n".join(sorted(all_vars_list)))
557
                )
558
                raise ValueError(msg)
559
560
            allowed_selectors = [str(s) for s in variables_by_id[var_id].options.keys()]
561
            if our_val not in allowed_selectors:
562
                msg = (
563
                    "Value '{var_id}' in profile '{profile_name}' "
564
                    "uses the selector '{our_val}'. "
565
                    "This is not possible, as only selectors {all_selectors} are available. "
566
                    "Either change the selector used in the profile, or "
567
                    "add the selector-value pair to the variable definition."
568
                    .format(
569
                        var_id=var_id, profile_name=self.id_, our_val=our_val,
570
                        all_selectors=allowed_selectors,
571
                    )
572
                )
573
                raise ValueError(msg)
574
575 2
    def validate_rules(self, rules, groups):
576
        existing_rule_ids = [r.id_ for r in rules]
577
        rule_selectors = self.get_rule_selectors()
578
        for id_ in rule_selectors:
579
            if id_ in groups:
580
                msg = (
581
                    "You have selected a group '{group_id}' instead of a "
582
                    "rule. Groups have no effect in the profile and are not "
583
                    "allowed to be selected. Please remove '{group_id}' "
584
                    "from profile '{profile_id}' before proceeding."
585
                    .format(group_id=id_, profile_id=self.id_)
586
                )
587
                raise ValueError(msg)
588
            if id_ not in existing_rule_ids:
589
                msg = (
590
                    "Rule '{rule_id}' was not found in the benchmark. Please "
591
                    "remove rule '{rule_id}' from profile '{profile_id}' "
592
                    "before proceeding."
593
                    .format(rule_id=id_, profile_id=self.id_)
594
                )
595
                raise ValueError(msg)
596
597 2
    def __sub__(self, other):
598
        profile = Profile(self.id_)
599
        profile.title = self.title
600
        profile.description = self.description
601
        profile.extends = self.extends
602
        profile.platforms = self.platforms
603
        profile.platform = self.platform
604
        profile.selected = list(set(self.selected) - set(other.selected))
605
        profile.selected.sort()
606
        profile.unselected = list(set(self.unselected) - set(other.unselected))
607
        profile.variables = dict ((k, v) for (k, v) in self.variables.items()
608
                             if k not in other.variables or v != other.variables[k])
609
        return profile
610
611
612 2
class ResolvableProfile(Profile):
613 2
    def __init__(self, * args, ** kwargs):
614
        super(ResolvableProfile, self).__init__(* args, ** kwargs)
615
        self.resolved = False
616
617 2
    def _controls_ids_to_controls(self, controls_manager, policy_id, control_id_list):
618
        items = [controls_manager.get_control(policy_id, cid) for cid in control_id_list]
619
        return items
620
621 2
    def resolve_controls(self, controls_manager):
622
        pass
623
624 2
    def extend_by(self, extended_profile):
625
        self.update_with(extended_profile)
626
627 2
    def resolve_selections_with_rules(self, rules_by_id):
628
        selections = set()
629
        for rid in self.selected:
630
            if rid not in rules_by_id:
631
                continue
632
            rule = rules_by_id[rid]
633
            if not self.rule_filter(rule):
634
                continue
635
            selections.add(rid)
636
        self.selected = list(selections)
637
638 2
    def resolve(self, all_profiles, rules_by_id, controls_manager=None):
639
        if self.resolved:
640
            return
641
642
        if controls_manager:
643
            self.resolve_controls(controls_manager)
644
645
        self.resolve_selections_with_rules(rules_by_id)
646
647
        if self.extends:
648
            if self.extends not in all_profiles:
649
                msg = (
650
                    "Profile {name} extends profile {extended}, but "
651
                    "only profiles {known_profiles} are available for resolution."
652
                    .format(name=self.id_, extended=self.extends,
653
                            known_profiles=list(all_profiles.keys())))
654
                raise RuntimeError(msg)
655
            extended_profile = all_profiles[self.extends]
656
            extended_profile.resolve(all_profiles, rules_by_id, controls_manager)
657
658
            self.extend_by(extended_profile)
659
660
        self.selected = [s for s in set(self.selected) if s not in self.unselected]
661
662
        self.unselected = []
663
        self.extends = None
664
665
        self.selected = sorted(self.selected)
666
667
        for rid in self.selected:
668
            if rid not in rules_by_id:
669
                msg = (
670
                    "Rule {rid} is selected by {profile}, but the rule is not available. "
671
                    "This may be caused by a discrepancy of prodtypes."
672
                    .format(rid=rid, profile=self.id_))
673
                raise ValueError(msg)
674
675
        self.resolved = True
676
677
678 2
class ProfileWithInlinePolicies(ResolvableProfile):
679 2
    def __init__(self, * args, ** kwargs):
680
        super(ProfileWithInlinePolicies, self).__init__(* args, ** kwargs)
681
        self.controls_by_policy = defaultdict(list)
682
683 2
    def apply_selection(self, item):
684
        # ":" is the delimiter for controls but not when the item is a variable
685
        if ":" in item and "=" not in item:
686
            policy_id, control_id = item.split(":", 1)
687
            self.controls_by_policy[policy_id].append(control_id)
688
        else:
689
            super(ProfileWithInlinePolicies, self).apply_selection(item)
690
691 2
    def _process_controls_ids_into_controls(self, controls_manager, policy_id, controls_ids):
692
        controls = []
693
        for cid in controls_ids:
694
            if not cid.startswith("all"):
695
                controls.extend(
696
                    self._controls_ids_to_controls(controls_manager, policy_id, [cid]))
697
            elif ":" in cid:
698
                _, level_id = cid.split(":", 1)
699
                controls.extend(
700
                    controls_manager.get_all_controls_of_level(policy_id, level_id))
701
            else:
702
                controls.extend(
703
                    controls_manager.get_all_controls(policy_id))
704
        return controls
705
706 2
    def resolve_controls(self, controls_manager):
707
        for policy_id, controls_ids in self.controls_by_policy.items():
708
            controls = self._process_controls_ids_into_controls(
709
                controls_manager, policy_id, controls_ids)
710
711
            for c in controls:
712
                self.update_with(c)
713
714
715 2
class Value(XCCDFEntity):
716
    """Represents XCCDF Value
717
    """
718 2
    KEYS = dict(
719
        title=lambda: "",
720
        description=lambda: "",
721
        type=lambda: "",
722
        operator=lambda: "equals",
723
        interactive=lambda: False,
724
        options=lambda: dict(),
725
        warnings=lambda: list(),
726
        ** XCCDFEntity.KEYS
727
    )
728
729 2
    MANDATORY_KEYS = {
730
        "title",
731
        "description",
732
        "type",
733
    }
734
735 2
    @classmethod
736 2
    def process_input_dict(cls, input_contents, env_yaml, product_cpes=None):
737 2
        input_contents["interactive"] = (
738
            input_contents.get("interactive", "false").lower() == "true")
739
740 2
        data = super(Value, cls).process_input_dict(input_contents, env_yaml)
741
742 2
        possible_operators = ["equals", "not equal", "greater than",
743
                              "less than", "greater than or equal",
744
                              "less than or equal", "pattern match"]
745
746 2
        if data["operator"] not in possible_operators:
747
            raise ValueError(
748
                "Found an invalid operator value '%s'. "
749
                "Expected one of: %s"
750
                % (data["operator"], ", ".join(possible_operators))
751
            )
752
753 2
        return data
754
755 2
    @classmethod
756 2
    def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None):
757 2
        value = super(Value, cls).from_yaml(yaml_file, env_yaml)
758
759 2
        check_warnings(value)
760
761 2
        return value
762
763 2
    def to_xml_element(self):
764
        value = ET.Element('Value')
765
        value.set('id', self.id_)
766
        value.set('type', self.type)
767
        if self.operator != "equals":  # equals is the default
768
            value.set('operator', self.operator)
769
        if self.interactive:  # False is the default
770
            value.set('interactive', 'true')
771
        title = ET.SubElement(value, 'title')
772
        title.text = self.title
773
        add_sub_element(value, 'description', self.description)
774
        add_warning_elements(value, self.warnings)
775
776
        for selector, option in self.options.items():
777
            # do not confuse Value with big V with value with small v
778
            # value is child element of Value
779
            value_small = ET.SubElement(value, 'value')
780
            # by XCCDF spec, default value is value without selector
781
            if selector != "default":
782
                value_small.set('selector', str(selector))
783
            value_small.text = str(option)
784
785
        return value
786
787 2
    def to_file(self, file_name):
788
        root = self.to_xml_element()
789
        tree = ET.ElementTree(root)
790
        tree.write(file_name)
791
792
793 2
class Benchmark(XCCDFEntity):
794
    """Represents XCCDF Benchmark
795
    """
796 2
    KEYS = dict(
797
        title=lambda: "",
798
        status=lambda: "",
799
        description=lambda: "",
800
        notice_id=lambda: "",
801
        notice_description=lambda: "",
802
        front_matter=lambda: "",
803
        rear_matter=lambda: "",
804
        cpes=lambda: list(),
805
        version=lambda: "",
806
        profiles=lambda: list(),
807
        values=lambda: dict(),
808
        groups=lambda: dict(),
809
        rules=lambda: dict(),
810
        platforms=lambda: dict(),
811
        product_cpe_names=lambda: list(),
812
        ** XCCDFEntity.KEYS
813
    )
814
815 2
    MANDATORY_KEYS = {
816
        "title",
817
        "status",
818
        "description",
819
        "front_matter",
820
        "rear_matter",
821
    }
822
823 2
    GENERIC_FILENAME = "benchmark.yml"
824
825 2
    def load_entities(self, rules_by_id, values_by_id, groups_by_id):
826
        for rid, val in self.rules.items():
827
            if not val:
828
                self.rules[rid] = rules_by_id[rid]
829
830
        for vid, val in self.values.items():
831
            if not val:
832
                self.values[vid] = values_by_id[vid]
833
834
        for gid, val in self.groups.items():
835
            if not val:
836
                self.groups[gid] = groups_by_id[gid]
837
838 2
    @classmethod
839
    def process_input_dict(cls, input_contents, env_yaml, product_cpes):
840
        input_contents["front_matter"] = input_contents["front-matter"]
841
        del input_contents["front-matter"]
842
        input_contents["rear_matter"] = input_contents["rear-matter"]
843
        del input_contents["rear-matter"]
844
845
        data = super(Benchmark, cls).process_input_dict(input_contents, env_yaml, product_cpes)
846
847
        notice_contents = required_key(input_contents, "notice")
848
        del input_contents["notice"]
849
850
        data["notice_id"] = required_key(notice_contents, "id")
851
        del notice_contents["id"]
852
853
        data["notice_description"] = required_key(notice_contents, "description")
854
        del notice_contents["description"]
855
856
        return data
857
858 2
    def represent_as_dict(self):
859
        data = super(Benchmark, cls).represent_as_dict()
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable cls does not seem to be defined.
Loading history...
860
        data["rear-matter"] = data["rear_matter"]
861
        del data["rear_matter"]
862
863
        data["front-matter"] = data["front_matter"]
864
        del data["front_matter"]
865
        return data
866
867 2
    @classmethod
868 2
    def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None):
869
        benchmark = super(Benchmark, cls).from_yaml(yaml_file, env_yaml)
870
        if env_yaml:
871
            benchmark.product_cpe_names = product_cpes.get_product_cpe_names()
872
            benchmark.product_cpes = product_cpes
873
            benchmark.id_ = env_yaml["benchmark_id"]
874
            benchmark.version = env_yaml["ssg_version_str"]
875
        else:
876
            benchmark.id_ = "product-name"
877
            benchmark.version = "0.0"
878
879
        return benchmark
880
881 2
    def add_profiles_from_dir(self, dir_, env_yaml, product_cpes):
882
        for dir_item in sorted(os.listdir(dir_)):
883
            dir_item_path = os.path.join(dir_, dir_item)
884
            if not os.path.isfile(dir_item_path):
885
                continue
886
887
            _, ext = os.path.splitext(os.path.basename(dir_item_path))
888
            if ext != '.profile':
889
                sys.stderr.write(
890
                    "Encountered file '%s' while looking for profiles, "
891
                    "extension '%s' is unknown. Skipping..\n"
892
                    % (dir_item, ext)
893
                )
894
                continue
895
896
            try:
897
                new_profile = ProfileWithInlinePolicies.from_yaml(dir_item_path, env_yaml, product_cpes)
898
            except DocumentationNotComplete:
899
                continue
900
            except Exception as exc:
901
                msg = ("Error building profile from '{fname}': '{error}'"
902
                       .format(fname=dir_item_path, error=str(exc)))
903
                raise RuntimeError(msg)
904
            if new_profile is None:
905
                continue
906
907
            self.profiles.append(new_profile)
908
909 2
    def to_xml_element(self, env_yaml=None, product_cpes=None):
910
        root = ET.Element('Benchmark')
911
        root.set('id', self.id_)
912
        root.set('xmlns', "http://checklists.nist.gov/xccdf/1.1")
913
        root.set('xmlns:xsi', 'http://www.w3.org/2001/XMLSchema-instance')
914
        root.set('xsi:schemaLocation',
915
                 'http://checklists.nist.gov/xccdf/1.1 xccdf-1.1.4.xsd')
916
        root.set('style', 'SCAP_1.1')
917
        root.set('resolved', 'true')
918
        root.set('xml:lang', 'en-US')
919
        status = ET.SubElement(root, 'status')
920
        status.set('date', datetime.date.today().strftime("%Y-%m-%d"))
921
        status.text = self.status
922
        add_sub_element(root, "title", self.title)
923
        add_sub_element(root, "description", self.description)
924
        notice = add_sub_element(root, "notice", self.notice_description)
925
        notice.set('id', self.notice_id)
926
        add_sub_element(root, "front-matter", self.front_matter)
927
        add_sub_element(root, "rear-matter", self.rear_matter)
928
        # if there are no platforms, do not output platform-specification at all
929
        if len(self.product_cpes.platforms) > 0:
930
            cpe_platform_spec = ET.Element(
931
                "{%s}platform-specification" % PREFIX_TO_NS["cpe-lang"])
932
            for platform in self.product_cpes.platforms.values():
933
                cpe_platform_spec.append(platform.to_xml_element())
934
            root.append(cpe_platform_spec)
935
936
        # The Benchmark applicability is determined by the CPEs
937
        # defined in the product.yml
938
        for cpe_name in self.product_cpe_names:
939
            plat = ET.SubElement(root, "platform")
940
            plat.set("idref", cpe_name)
941
942
        version = ET.SubElement(root, 'version')
943
        version.text = self.version
944
        version.set('update', SSG_BENCHMARK_LATEST_URI)
945
946
        contributors_file = os.path.join(os.path.dirname(__file__), "../Contributors.xml")
947
        add_benchmark_metadata(root, contributors_file)
948
949
        for profile in self.profiles:
950
            root.append(profile.to_xml_element())
951
952
        for value in self.values.values():
953
            root.append(value.to_xml_element())
954
955
        groups_in_bench = list(self.groups.keys())
956
        priority_order = ["system", "services"]
957
        groups_in_bench = reorder_according_to_ordering(groups_in_bench, priority_order)
958
959
        # Make system group the first, followed by services group
960
        for group_id in groups_in_bench:
961
            group = self.groups.get(group_id)
962
            # Products using application benchmark don't have system or services group
963
            if group is not None:
964
                root.append(group.to_xml_element(env_yaml))
965
966
        for rule in self.rules.values():
967
            root.append(rule.to_xml_element(env_yaml))
968
969
        return root
970
971 2
    def to_file(self, file_name, env_yaml=None):
972
        root = self.to_xml_element(env_yaml)
973
        tree = ET.ElementTree(root)
974
        tree.write(file_name)
975
976 2
    def add_value(self, value):
977
        if value is None:
978
            return
979
        self.values[value.id_] = value
980
981
    # The benchmark is also considered a group, so this function signature needs to match
982
    # Group()'s add_group()
983 2
    def add_group(self, group, env_yaml=None, product_cpes=None):
984
        if group is None:
985
            return
986
        self.groups[group.id_] = group
987
988 2
    def add_rule(self, rule):
989
        if rule is None:
990
            return
991
        self.rules[rule.id_] = rule
992
993 2
    def to_xccdf(self):
994
        """We can easily extend this script to generate a valid XCCDF instead
995
        of SSG SHORTHAND.
996
        """
997
        raise NotImplementedError
998
999 2
    def __str__(self):
1000
        return self.id_
1001
1002
1003 2
class Group(XCCDFEntity):
1004
    """Represents XCCDF Group
1005
    """
1006 2
    ATTRIBUTES_TO_PASS_ON = (
1007
        "platforms",
1008
        "cpe_platform_names",
1009
    )
1010
1011 2
    GENERIC_FILENAME = "group.yml"
1012
1013 2
    KEYS = dict(
1014
        prodtype=lambda: "all",
1015
        title=lambda: "",
1016
        description=lambda: "",
1017
        warnings=lambda: list(),
1018
        requires=lambda: list(),
1019
        conflicts=lambda: list(),
1020
        values=lambda: dict(),
1021
        groups=lambda: dict(),
1022
        rules=lambda: dict(),
1023
        platform=lambda: "",
1024
        platforms=lambda: set(),
1025
        cpe_platform_names=lambda: set(),
1026
        ** XCCDFEntity.KEYS
1027
    )
1028
1029 2
    MANDATORY_KEYS = {
1030
        "title",
1031
        "status",
1032
        "description",
1033
        "front_matter",
1034
        "rear_matter",
1035
    }
1036
1037 2
    @classmethod
1038 2
    def process_input_dict(cls, input_contents, env_yaml, product_cpes=None):
1039
        data = super(Group, cls).process_input_dict(input_contents, env_yaml, product_cpes)
1040
        if data["rules"]:
1041
            rule_ids = data["rules"]
1042
            data["rules"] = {rid: None for rid in rule_ids}
1043
1044
        if data["groups"]:
1045
            group_ids = data["groups"]
1046
            data["groups"] = {gid: None for gid in group_ids}
1047
1048
        if data["values"]:
1049
            value_ids = data["values"]
1050
            data["values"] = {vid: None for vid in value_ids}
1051
1052
        if data["platform"]:
1053
            data["platforms"].add(data["platform"])
1054
1055
        # parse platform definition and get CPEAL platform if cpe_platform_names not already defined
1056
        if data["platforms"] and not data["cpe_platform_names"]:
1057
            for platform in data["platforms"]:
1058
                cpe_platform = Platform.from_text(platform, product_cpes)
1059
                cpe_platform = add_platform_if_not_defined(cpe_platform, product_cpes)
1060
                data["cpe_platform_names"].add(cpe_platform.id_)
1061
        return data
1062
1063 2
    def load_entities(self, rules_by_id, values_by_id, groups_by_id):
1064
        for rid, val in self.rules.items():
1065
            if not val:
1066
                self.rules[rid] = rules_by_id[rid]
1067
1068
        for vid, val in self.values.items():
1069
            if not val:
1070
                self.values[vid] = values_by_id[vid]
1071
1072
        for gid in list(self.groups):
1073
            val = self.groups.get(gid, None)
1074
            if not val:
1075
                try:
1076
                    self.groups[gid] = groups_by_id[gid]
1077
                except KeyError:
1078
                    # Add only the groups we have compiled and loaded
1079
                    del self.groups[gid]
1080
                    pass
1081
1082 2
    def represent_as_dict(self):
1083
        yaml_contents = super(Group, self).represent_as_dict()
1084
1085
        if self.rules:
1086
            yaml_contents["rules"] = sorted(list(self.rules.keys()))
1087
        if self.groups:
1088
            yaml_contents["groups"] = sorted(list(self.groups.keys()))
1089
        if self.values:
1090
            yaml_contents["values"] = sorted(list(self.values.keys()))
1091
1092
        return yaml_contents
1093
1094 2
    def validate_prodtype(self, yaml_file):
1095
        for ptype in self.prodtype.split(","):
1096
            if ptype.strip() != ptype:
1097
                msg = (
1098
                    "Comma-separated '{prodtype}' prodtype "
1099
                    "in {yaml_file} contains whitespace."
1100
                    .format(prodtype=self.prodtype, yaml_file=yaml_file))
1101
                raise ValueError(msg)
1102
1103 2
    def to_xml_element(self, env_yaml=None):
1104
        group = ET.Element('Group')
1105
        group.set('id', self.id_)
1106
        title = ET.SubElement(group, 'title')
1107
        title.text = self.title
1108
        add_sub_element(group, 'description', self.description)
1109
        add_warning_elements(group, self.warnings)
1110
1111
        # This is where references should be put if there are any
1112
        # This is where rationale should be put if there are any
1113
1114
        for cpe_platform_name in self.cpe_platform_names:
1115
            platform_el = ET.SubElement(group, "platform")
1116
            platform_el.set("idref", "#"+cpe_platform_name)
1117
1118
        add_nondata_subelements(group, "requires", "idref", self.requires)
1119
        add_nondata_subelements(group, "conflicts", "idref", self.conflicts)
1120
1121
        for _value in self.values.values():
1122
            group.append(_value.to_xml_element())
1123
1124
        # Rules that install or remove packages affect remediation
1125
        # of other rules.
1126
        # When packages installed/removed rules come first:
1127
        # The Rules are ordered in more logical way, and
1128
        # remediation order is natural, first the package is installed, then configured.
1129
        rules_in_group = list(self.rules.keys())
1130
        regex = (r'(package_.*_(installed|removed))|' +
1131
                 r'(service_.*_(enabled|disabled))|' +
1132
                 r'install_smartcard_packages|' +
1133
                 r'sshd_set_keepalive(_0)?|' +
1134
                 r'sshd_set_idle_timeout$')
1135
        priority_order = ["installed", "install_smartcard_packages", "removed",
1136
                          "enabled", "disabled", "sshd_set_keepalive_0",
1137
                          "sshd_set_keepalive", "sshd_set_idle_timeout"]
1138
        rules_in_group = reorder_according_to_ordering(rules_in_group, priority_order, regex)
1139
1140
        # Add rules in priority order, first all packages installed, then removed,
1141
        # followed by services enabled, then disabled
1142
        for rule_id in rules_in_group:
1143
            group.append(self.rules.get(rule_id).to_xml_element(env_yaml))
1144
1145
        # Add the sub groups after any current level group rules.
1146
        # As package installed/removed and service enabled/disabled rules are usuallly in
1147
        # top level group, this ensures groups that further configure a package or service
1148
        # are after rules that install or remove it.
1149
        groups_in_group = list(self.groups.keys())
1150
        priority_order = [
1151
            # Make sure rpm_verify_(hashes|permissions|ownership) are run before any other rule.
1152
            # Due to conflicts between rules rpm_verify_* rules and any rule that configures
1153
            # stricter settings, like file_permissions_grub2_cfg and sudo_dedicated_group,
1154
            # the rules deviating from the system default should be evaluated later.
1155
            # So that in the end the system has contents, permissions and ownership reset, and
1156
            # any deviations or stricter settings are applied by the rules in the profile.
1157
            "software", "integrity", "integrity-software", "rpm_verification",
1158
1159
            # The account group has to precede audit group because
1160
            # the rule package_screen_installed is desired to be executed before the rule
1161
            # audit_rules_privileged_commands, othervise the rule
1162
            # does not catch newly installed screen binary during remediation
1163
            # and report fail
1164
            "accounts", "auditing",
1165
1166
1167
            # The FIPS group should come before Crypto,
1168
            # if we want to set a different (stricter) Crypto Policy than FIPS.
1169
            "fips", "crypto",
1170
1171
            # The firewalld_activation must come before ruleset_modifications, othervise
1172
            # remediations for ruleset_modifications won't work
1173
            "firewalld_activation", "ruleset_modifications",
1174
1175
            # Rules from group disabling_ipv6 must precede rules from configuring_ipv6,
1176
            # otherwise the remediation prints error although it is successful
1177
            "disabling_ipv6", "configuring_ipv6"
1178
        ]
1179
        groups_in_group = reorder_according_to_ordering(groups_in_group, priority_order)
1180
        for group_id in groups_in_group:
1181
            _group = self.groups[group_id]
1182
            group.append(_group.to_xml_element(env_yaml))
1183
1184
        return group
1185
1186 2
    def to_file(self, file_name):
1187
        root = self.to_xml_element()
1188
        tree = ET.ElementTree(root)
1189
        tree.write(file_name)
1190
1191 2
    def add_value(self, value):
1192
        if value is None:
1193
            return
1194
        self.values[value.id_] = value
1195
1196 2
    def add_group(self, group, env_yaml=None, product_cpes=None):
1197
        if group is None:
1198
            return
1199
        if self.platforms and not group.platforms:
1200
            group.platforms = self.platforms
1201
        self.groups[group.id_] = group
1202
        self._pass_our_properties_on_to(group)
1203
1204
        # Once the group has inherited properties, update cpe_names
1205
        if env_yaml:
1206
            for platform in group.platforms:
1207
                cpe_platform = Platform.from_text(platform, product_cpes)
1208
                cpe_platform = add_platform_if_not_defined(cpe_platform, product_cpes)
1209
                group.cpe_platform_names.add(cpe_platform.id_)
1210
1211 2
    def _pass_our_properties_on_to(self, obj):
1212
        for attr in self.ATTRIBUTES_TO_PASS_ON:
1213
            if hasattr(obj, attr) and getattr(obj, attr) is None:
1214
                setattr(obj, attr, getattr(self, attr))
1215
1216 2
    def add_rule(self, rule, env_yaml=None, product_cpes=None):
1217
        if rule is None:
1218
            return
1219
        if self.platforms and not rule.platforms:
1220
            rule.platforms = self.platforms
1221
        self.rules[rule.id_] = rule
1222
        self._pass_our_properties_on_to(rule)
1223
1224
        # Once the rule has inherited properties, update cpe_platform_names
1225
        if env_yaml:
1226
            for platform in rule.platforms:
1227
                cpe_platform = Platform.from_text(platform, product_cpes)
1228
                cpe_platform = add_platform_if_not_defined(cpe_platform, product_cpes)
1229
                rule.cpe_platform_names.add(cpe_platform.id_)
1230
1231 2
    def __str__(self):
1232
        return self.id_
1233
1234
1235 2
def noop_rule_filterfunc(rule):
1236
    return True
1237
1238 2
def rule_filter_from_def(filterdef):
1239
    if filterdef is None or filterdef == "":
1240
        return noop_rule_filterfunc
1241
1242
    def filterfunc(rule):
1243
        # Remove globals for security and only expose
1244
        # variables relevant to the rule
1245
        return eval(filterdef, {"__builtins__": None}, rule.__dict__)
1246
    return filterfunc
1247
1248
1249 2
class Rule(XCCDFEntity):
1250
    """Represents XCCDF Rule
1251
    """
1252 2
    KEYS = dict(
1253
        prodtype=lambda: "all",
1254
        title=lambda: "",
1255
        description=lambda: "",
1256
        rationale=lambda: "",
1257
        severity=lambda: "",
1258
        references=lambda: dict(),
1259
        identifiers=lambda: dict(),
1260
        ocil_clause=lambda: None,
1261
        ocil=lambda: None,
1262
        oval_external_content=lambda: None,
1263
        fix=lambda: "",
1264
        warnings=lambda: list(),
1265
        conflicts=lambda: list(),
1266
        requires=lambda: list(),
1267
        platform=lambda: None,
1268
        platforms=lambda: set(),
1269
        sce_metadata=lambda: dict(),
1270
        inherited_platforms=lambda: list(),
1271
        template=lambda: None,
1272
        cpe_platform_names=lambda: set(),
1273
        inherited_cpe_platform_names=lambda: list(),
1274
        bash_conditional=lambda: None,
1275
        ** XCCDFEntity.KEYS
1276
    )
1277
1278 2
    MANDATORY_KEYS = {
1279
        "title",
1280
        "description",
1281
        "rationale",
1282
        "severity",
1283
    }
1284
1285 2
    GENERIC_FILENAME = "rule.yml"
1286 2
    ID_LABEL = "rule_id"
1287
1288 2
    PRODUCT_REFERENCES = ("stigid", "cis",)
1289 2
    GLOBAL_REFERENCES = ("srg", "vmmsrg", "disa", "cis-csc",)
1290
1291 2
    def __init__(self, id_):
1292 2
        super(Rule, self).__init__(id_)
1293 2
        self.sce_metadata = None
1294
1295 2
    def __deepcopy__(self, memo):
1296
        cls = self.__class__
1297
        result = cls.__new__(cls)
1298
        memo[id(self)] = result
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable id does not seem to be defined.
Loading history...
1299
        for k, v in self.__dict__.items():
1300
            # These are difficult to deep copy, so let's just re-use them.
1301
            if k != "template" and k != "local_env_yaml":
1302
                setattr(result, k, deepcopy(v, memo))
1303
            else:
1304
                setattr(result, k, v)
1305
        return result
1306
1307 2
    @classmethod
1308 2
    def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None, sce_metadata=None):
1309 2
        rule = super(Rule, cls).from_yaml(yaml_file, env_yaml, product_cpes)
1310
1311
        # platforms are read as list from the yaml file
1312
        # we need them to convert to set again
1313 2
        rule.platforms = set(rule.platforms)
1314
1315
        # rule.platforms.update(set(rule.inherited_platforms))
1316
1317 2
        check_warnings(rule)
1318
1319
        # ensure that content of rule.platform is in rule.platforms as
1320
        # well
1321 2
        if rule.platform is not None:
1322 2
            rule.platforms.add(rule.platform)
1323
1324
        # Convert the platform names to CPE names
1325
        # But only do it if an env_yaml was specified (otherwise there would be no product CPEs
1326
        # to lookup), and the rule's prodtype matches the product being built
1327
        # also if the rule already has cpe_platform_names specified (compiled rule)
1328
        # do not evaluate platforms again
1329 2
        if (
1330
                env_yaml and env_yaml["product"] in parse_prodtype(rule.prodtype)
1331
                or env_yaml and rule.prodtype == "all") and product_cpes and not rule.cpe_platform_names:
1332
            # parse platform definition and get CPEAL platform
1333
            for platform in rule.platforms:
1334
                cpe_platform = Platform.from_text(platform, product_cpes)
1335
                cpe_platform = add_platform_if_not_defined(cpe_platform, product_cpes)
1336
                rule.cpe_platform_names.add(cpe_platform.id_)
1337
1338
1339 2
        if sce_metadata and rule.id_ in sce_metadata:
1340
            rule.sce_metadata = sce_metadata[rule.id_]
1341
            rule.sce_metadata["relative_path"] = os.path.join(
1342
                env_yaml["product"], "checks/sce", rule.sce_metadata['filename'])
1343
1344 2
        rule.validate_prodtype(yaml_file)
1345 2
        rule.validate_identifiers(yaml_file)
1346 2
        rule.validate_references(yaml_file)
1347 2
        return rule
1348
1349 2
    def _verify_stigid_format(self, product):
1350 2
        stig_id = self.references.get("stigid", None)
1351 2
        if not stig_id:
1352 2
            return
1353 2
        if "," in stig_id:
1354 2
            raise ValueError("Rules can not have multiple STIG IDs.")
1355
1356 2
    def _verify_disa_cci_format(self):
1357 2
        cci_id = self.references.get("disa", None)
1358 2
        if not cci_id:
1359 2
            return
1360
        cci_ex = re.compile(r'^CCI-[0-9]{6}$')
1361
        for cci in cci_id.split(","):
1362
            if not cci_ex.match(cci):
1363
                raise ValueError("CCI '{}' is in the wrong format! "
1364
                                 "Format should be similar to: "
1365
                                 "CCI-XXXXXX".format(cci))
1366
        self.references["disa"] = cci_id
1367
1368 2
    def normalize(self, product):
1369 2
        try:
1370 2
            self.make_refs_and_identifiers_product_specific(product)
1371 2
            self.make_template_product_specific(product)
1372 2
        except Exception as exc:
1373 2
            msg = (
1374
                "Error normalizing '{rule}': {msg}"
1375
                .format(rule=self.id_, msg=str(exc))
1376
            )
1377 2
            raise RuntimeError(msg)
1378
1379 2
    def add_stig_references(self, stig_references):
1380
        stig_id = self.references.get("stigid", None)
1381
        if not stig_id:
1382
            return
1383
        reference = stig_references.get(stig_id, None)
1384
        if not reference:
1385
            return
1386
        self.references["stigref"] = reference
1387
1388 2
    def _get_product_only_references(self):
1389 2
        product_references = dict()
1390
1391 2
        for ref in Rule.PRODUCT_REFERENCES:
1392 2
            start = "{0}@".format(ref)
1393 2
            for gref, gval in self.references.items():
1394 2
                if ref == gref or gref.startswith(start):
1395 2
                    product_references[gref] = gval
1396 2
        return product_references
1397
1398 2
    def make_template_product_specific(self, product):
1399 2
        product_suffix = "@{0}".format(product)
1400
1401 2
        if not self.template:
1402
            return
1403
1404 2
        not_specific_vars = self.template.get("vars", dict())
1405 2
        specific_vars = self._make_items_product_specific(
1406
            not_specific_vars, product_suffix, True)
1407 2
        self.template["vars"] = specific_vars
1408
1409 2
        not_specific_backends = self.template.get("backends", dict())
1410 2
        specific_backends = self._make_items_product_specific(
1411
            not_specific_backends, product_suffix, True)
1412 2
        self.template["backends"] = specific_backends
1413
1414 2
    def make_refs_and_identifiers_product_specific(self, product):
1415 2
        product_suffix = "@{0}".format(product)
1416
1417 2
        product_references = self._get_product_only_references()
1418 2
        general_references = self.references.copy()
1419 2
        for todel in product_references:
1420 2
            general_references.pop(todel)
1421 2
        for ref in Rule.PRODUCT_REFERENCES:
1422 2
            if ref in general_references:
1423
                msg = "Unexpected reference identifier ({0}) without "
1424
                msg += "product qualifier ({0}@{1}) while building rule "
1425
                msg += "{2}"
1426
                msg = msg.format(ref, product, self.id_)
1427
                raise ValueError(msg)
1428
1429 2
        to_set = dict(
1430
            identifiers=(self.identifiers, False),
1431
            general_references=(general_references, True),
1432
            product_references=(product_references, False),
1433
        )
1434 2
        for name, (dic, allow_overwrites) in to_set.items():
1435 2
            try:
1436 2
                new_items = self._make_items_product_specific(
1437
                    dic, product_suffix, allow_overwrites)
1438 2
            except ValueError as exc:
1439 2
                msg = (
1440
                    "Error processing {what} for rule '{rid}': {msg}"
1441
                    .format(what=name, rid=self.id_, msg=str(exc))
1442
                )
1443 2
                raise ValueError(msg)
1444 2
            dic.clear()
1445 2
            dic.update(new_items)
1446
1447 2
        self.references = general_references
1448 2
        self._verify_disa_cci_format()
1449 2
        self.references.update(product_references)
1450
1451 2
        self._verify_stigid_format(product)
1452
1453 2
    def _make_items_product_specific(self, items_dict, product_suffix, allow_overwrites=False):
1454 2
        new_items = dict()
1455 2
        for full_label, value in items_dict.items():
1456 2
            if "@" not in full_label and full_label not in new_items:
1457 2
                new_items[full_label] = value
1458 2
                continue
1459
1460 2
            label = full_label.split("@")[0]
1461
1462
            # this test should occur before matching product_suffix with the product qualifier
1463
            # present in the reference, so it catches problems even for products that are not
1464
            # being built at the moment
1465 2
            if label in Rule.GLOBAL_REFERENCES:
1466
                msg = (
1467
                    "You cannot use product-qualified for the '{item_u}' reference. "
1468
                    "Please remove the product-qualifier and merge values with the "
1469
                    "existing reference if there is any. Original line: {item_q}: {value_q}"
1470
                    .format(item_u=label, item_q=full_label, value_q=value)
1471
                )
1472
                raise ValueError(msg)
1473
1474 2
            if not full_label.endswith(product_suffix):
1475 2
                continue
1476
1477 2
            if label in items_dict and not allow_overwrites and value != items_dict[label]:
1478 2
                msg = (
1479
                    "There is a product-qualified '{item_q}' item, "
1480
                    "but also an unqualified '{item_u}' item "
1481
                    "and those two differ in value - "
1482
                    "'{value_q}' vs '{value_u}' respectively."
1483
                    .format(item_q=full_label, item_u=label,
1484
                            value_q=value, value_u=items_dict[label])
1485
                )
1486 2
                raise ValueError(msg)
1487 2
            new_items[label] = value
1488 2
        return new_items
1489
1490 2
    def validate_identifiers(self, yaml_file):
1491 2
        if self.identifiers is None:
1492
            raise ValueError("Empty identifier section in file %s" % yaml_file)
1493
1494
        # Validate all identifiers are non-empty:
1495 2
        for ident_type, ident_val in self.identifiers.items():
1496 2
            if not isinstance(ident_type, str) or not isinstance(ident_val, str):
1497
                raise ValueError("Identifiers and values must be strings: %s in file %s"
1498
                                 % (ident_type, yaml_file))
1499 2
            if ident_val.strip() == "":
1500
                raise ValueError("Identifiers must not be empty: %s in file %s"
1501
                                 % (ident_type, yaml_file))
1502 2
            if ident_type[0:3] == 'cce':
1503 2
                if not is_cce_format_valid(ident_val):
1504
                    raise ValueError("CCE Identifier format must be valid: invalid format '%s' for CEE '%s'"
1505
                                     " in file '%s'" % (ident_val, ident_type, yaml_file))
1506 2
                if not is_cce_value_valid("CCE-" + ident_val):
1507
                    raise ValueError("CCE Identifier value is not a valid checksum: invalid value '%s' for CEE '%s'"
1508
                                     " in file '%s'" % (ident_val, ident_type, yaml_file))
1509
1510 2
    def validate_references(self, yaml_file):
1511 2
        if self.references is None:
1512
            raise ValueError("Empty references section in file %s" % yaml_file)
1513
1514 2
        for ref_type, ref_val in self.references.items():
1515 2
            if not isinstance(ref_type, str) or not isinstance(ref_val, str):
1516
                raise ValueError("References and values must be strings: %s in file %s"
1517
                                 % (ref_type, yaml_file))
1518 2
            if ref_val.strip() == "":
1519
                raise ValueError("References must not be empty: %s in file %s"
1520
                                 % (ref_type, yaml_file))
1521
1522 2
        for ref_type, ref_val in self.references.items():
1523 2
            for ref in ref_val.split(","):
1524 2
                if ref.strip() != ref:
1525
                    msg = (
1526
                        "Comma-separated '{ref_type}' reference "
1527
                        "in {yaml_file} contains whitespace."
1528
                        .format(ref_type=ref_type, yaml_file=yaml_file))
1529
                    raise ValueError(msg)
1530
1531 2
    def validate_prodtype(self, yaml_file):
1532 2
        for ptype in self.prodtype.split(","):
1533 2
            if ptype.strip() != ptype:
1534
                msg = (
1535
                    "Comma-separated '{prodtype}' prodtype "
1536
                    "in {yaml_file} contains whitespace."
1537
                    .format(prodtype=self.prodtype, yaml_file=yaml_file))
1538
                raise ValueError(msg)
1539
1540 2
    def to_xml_element(self, env_yaml=None):
1541
        rule = ET.Element('Rule')
1542
        rule.set('selected', 'false')
1543
        rule.set('id', self.id_)
1544
        rule.set('severity', self.severity)
1545
        add_sub_element(rule, 'title', self.title)
1546
        add_sub_element(rule, 'description', self.description)
1547
        add_warning_elements(rule, self.warnings)
1548
1549
        if env_yaml:
1550
            ref_uri_dict = env_yaml['reference_uris']
1551
        else:
1552
            ref_uri_dict = SSG_REF_URIS
1553
        add_reference_elements(rule, self.references, ref_uri_dict)
1554
1555
        add_sub_element(rule, 'rationale', self.rationale)
1556
1557
        for cpe_platform_name in sorted(self.cpe_platform_names):
1558
            platform_el = ET.SubElement(rule, "platform")
1559
            platform_el.set("idref", "#"+cpe_platform_name)
1560
1561
        add_nondata_subelements(rule, "requires", "idref", self.requires)
1562
        add_nondata_subelements(rule, "conflicts", "idref", self.conflicts)
1563
1564
        for ident_type, ident_val in self.identifiers.items():
1565
            ident = ET.SubElement(rule, 'ident')
1566
            if ident_type == 'cce':
1567
                ident.set('system', cce_uri)
1568
                ident.text = ident_val
1569
1570
        ocil_parent = rule
1571
        check_parent = rule
1572
1573
        if self.sce_metadata:
1574
            # TODO: This is pretty much another hack, just like the previous OVAL
1575
            # one. However, we avoided the external SCE content as I'm not sure it
1576
            # is generally useful (unlike say, CVE checking with external OVAL)
1577
            #
1578
            # Additionally, we build the content (check subelement) here rather
1579
            # than in xslt due to the nature of our SCE metadata.
1580
            #
1581
            # Finally, before we begin, we might have an element with both SCE
1582
            # and OVAL. We have no way of knowing (right here) whether that is
1583
            # the case (due to a variety of issues, most notably, that linking
1584
            # hasn't yet occurred). So we must rely on the content author's
1585
            # good will, by annotating SCE content with a complex-check tag
1586
            # if necessary.
1587
1588
            if 'complex-check' in self.sce_metadata:
1589
                # Here we have an issue: XCCDF allows EITHER one or more check
1590
                # elements OR a single complex-check. While we have an explicit
1591
                # case handling the OVAL-and-SCE interaction, OCIL entries have
1592
                # (historically) been alongside OVAL content and been in an
1593
                # "OR" manner -- preferring OVAL to SCE. In order to accomplish
1594
                # this, we thus need to add _yet another parent_ when OCIL data
1595
                # is present, and add update ocil_parent accordingly.
1596
                if self.ocil or self.ocil_clause:
1597
                    ocil_parent = ET.SubElement(ocil_parent, "complex-check")
1598
                    ocil_parent.set('operator', 'OR')
1599
1600
                check_parent = ET.SubElement(ocil_parent, "complex-check")
1601
                check_parent.set('operator', self.sce_metadata['complex-check'])
1602
1603
            # Now, add the SCE check element to the tree.
1604
            check = ET.SubElement(check_parent, "check")
1605
            check.set("system", SCE_SYSTEM)
1606
1607
            if 'check-import' in self.sce_metadata:
1608
                if isinstance(self.sce_metadata['check-import'], str):
1609
                    self.sce_metadata['check-import'] = [self.sce_metadata['check-import']]
1610
                for entry in self.sce_metadata['check-import']:
1611
                    check_import = ET.SubElement(check, 'check-import')
1612
                    check_import.set('import-name', entry)
1613
                    check_import.text = None
1614
1615
            if 'check-export' in self.sce_metadata:
1616
                if isinstance(self.sce_metadata['check-export'], str):
1617
                    self.sce_metadata['check-export'] = [self.sce_metadata['check-export']]
1618
                for entry in self.sce_metadata['check-export']:
1619
                    export, value = entry.split('=')
1620
                    check_export = ET.SubElement(check, 'check-export')
1621
                    check_export.set('value-id', value)
1622
                    check_export.set('export-name', export)
1623
                    check_export.text = None
1624
1625
            check_ref = ET.SubElement(check, "check-content-ref")
1626
            href = self.sce_metadata['relative_path']
1627
            check_ref.set("href", href)
1628
1629
        check = ET.SubElement(check_parent, 'check')
1630
        check.set("system", oval_namespace)
1631
        check_content_ref = ET.SubElement(check, "check-content-ref")
1632
        if self.oval_external_content:
1633
            check_content_ref.set("href", self.oval_external_content)
1634
        else:
1635
            # TODO: This is pretty much a hack, oval ID will be the same as rule ID
1636
            #       and we don't want the developers to have to keep them in sync.
1637
            #       Therefore let's just add an OVAL ref of that ID.
1638
            # TODO  Can we not add the check element if the rule doesn't have an OVAL check?
1639
            #       At the moment, the check elements of rules without OVAL are removed by
1640
            #       relabel_ids.py
1641
            check_content_ref.set("href", "oval-unlinked.xml")
1642
            check_content_ref.set("name", self.id_)
1643
1644
        if self.ocil or self.ocil_clause:
1645
            ocil_check = ET.SubElement(check_parent, "check")
1646
            ocil_check.set("system", ocil_cs)
1647
            ocil_check_ref = ET.SubElement(ocil_check, "check-content-ref")
1648
            ocil_check_ref.set("href", "ocil-unlinked.xml")
1649
            ocil_check_ref.set("name", self.id_ + "_ocil")
1650
1651
        return rule
1652
1653 2
    def to_file(self, file_name):
1654
        root = self.to_xml_element()
1655
        tree = ET.ElementTree(root)
1656
        tree.write(file_name)
1657
1658 2
    def to_ocil(self):
1659
        if not self.ocil and not self.ocil_clause:
1660
            raise ValueError("Rule {0} doesn't have OCIL".format(self.id_))
1661
        # Create <questionnaire> for the rule
1662
        questionnaire = ET.Element("questionnaire", id=self.id_ + "_ocil")
1663
        title = ET.SubElement(questionnaire, "title")
1664
        title.text = self.title
1665
        actions = ET.SubElement(questionnaire, "actions")
1666
        test_action_ref = ET.SubElement(actions, "test_action_ref")
1667
        test_action_ref.text = self.id_ + "_action"
1668
        # Create <boolean_question_test_action> for the rule
1669
        action = ET.Element(
1670
            "boolean_question_test_action",
1671
            id=self.id_ + "_action",
1672
            question_ref=self.id_ + "_question")
1673
        when_true = ET.SubElement(action, "when_true")
1674
        result = ET.SubElement(when_true, "result")
1675
        result.text = "PASS"
1676
        when_true = ET.SubElement(action, "when_false")
1677
        result = ET.SubElement(when_true, "result")
1678
        result.text = "FAIL"
1679
        # Create <boolean_question>
1680
        boolean_question = ET.Element(
1681
            "boolean_question", id=self.id_ + "_question")
1682
        # TODO: The contents of <question_text> element used to be broken in
1683
        # the legacy XSLT implementation. The following code contains hacks
1684
        # to get the same results as in the legacy XSLT implementation.
1685
        # This enabled us a smooth transition to new OCIL generator
1686
        # without a need to mass-edit rule YAML files.
1687
        # We need to solve:
1688
        # TODO: using variables (aka XCCDF Values) in OCIL content
1689
        # TODO: using HTML formating tags eg. <pre> in OCIL content
1690
        #
1691
        # The "ocil" key in compiled rules contains HTML and XML elements
1692
        # but OCIL question texts shouldn't contain HTML or XML elements,
1693
        # therefore removing them.
1694
        if self.ocil is not None:
1695
            ocil_without_tags = re.sub(r"</?[^>]+>", "", self.ocil)
1696
        else:
1697
            ocil_without_tags = ""
1698
        # The "ocil" key in compiled rules contains XML entities which would
1699
        # be escaped by ET.Subelement() so we need to use add_sub_element()
1700
        # instead because we don't want to escape them.
1701
        question_text = add_sub_element(
1702
            boolean_question, "question_text", ocil_without_tags)
1703
        # The "ocil_clause" key in compiled rules also contains HTML and XML
1704
        # elements but unlike the "ocil" we want to escape the '<' and '>'
1705
        # characters.
1706
        # The empty ocil_clause causing broken question is in line with the
1707
        # legacy XSLT implementation.
1708
        ocil_clause = self.ocil_clause if self.ocil_clause else ""
1709
        question_text.text = (
1710
            u"{0}\n      Is it the case that {1}?\n      ".format(
1711
                question_text.text if question_text.text is not None else "",
1712
                ocil_clause))
1713
        return (questionnaire, action, boolean_question)
1714
1715 2
    def __hash__(self):
1716
        """ Controls are meant to be unique, so using the
1717
        ID should suffice"""
1718
        return hash(self.id_)
1719
1720 2
    def __eq__(self, other):
1721
        return isinstance(other, self.__class__) and self.id_ == other.id_
1722
1723 2
    def __ne__(self, other):
1724
        return not self != other
1725
1726 2
    def __lt__(self, other):
1727
        return self.id_ < other.id_
1728
1729 2
    def __str__(self):
1730
        return self.id_
1731
1732
1733 2
class DirectoryLoader(object):
1734 2
    def __init__(self, profiles_dir, env_yaml, product_cpes):
1735
        self.benchmark_file = None
1736
        self.group_file = None
1737
        self.loaded_group = None
1738
        self.rule_files = []
1739
        self.value_files = []
1740
        self.subdirectories = []
1741
1742
        self.all_values = dict()
1743
        self.all_rules = dict()
1744
        self.all_groups = dict()
1745
1746
        self.profiles_dir = profiles_dir
1747
        self.env_yaml = env_yaml
1748
        self.product = env_yaml["product"]
1749
1750
        self.parent_group = None
1751
        self.product_cpes = product_cpes
1752
1753 2
    def _collect_items_to_load(self, guide_directory):
1754
        for dir_item in sorted(os.listdir(guide_directory)):
1755
            dir_item_path = os.path.join(guide_directory, dir_item)
1756
            _, extension = os.path.splitext(dir_item)
1757
1758
            if extension == '.var':
1759
                self.value_files.append(dir_item_path)
1760
            elif dir_item == "benchmark.yml":
1761
                if self.benchmark_file:
1762
                    raise ValueError("Multiple benchmarks in one directory")
1763
                self.benchmark_file = dir_item_path
1764
            elif dir_item == "group.yml":
1765
                if self.group_file:
1766
                    raise ValueError("Multiple groups in one directory")
1767
                self.group_file = dir_item_path
1768
            elif extension == '.rule':
1769
                self.rule_files.append(dir_item_path)
1770
            elif is_rule_dir(dir_item_path):
1771
                self.rule_files.append(get_rule_dir_yaml(dir_item_path))
1772
            elif dir_item != "tests":
1773
                if os.path.isdir(dir_item_path):
1774
                    self.subdirectories.append(dir_item_path)
1775
                else:
1776
                    sys.stderr.write(
1777
                        "Encountered file '%s' while recursing, extension '%s' "
1778
                        "is unknown. Skipping..\n"
1779
                        % (dir_item, extension)
1780
                    )
1781
1782 2
    def load_benchmark_or_group(self, guide_directory):
1783
        """
1784
        Loads a given benchmark or group from the specified benchmark_file or
1785
        group_file, in the context of guide_directory, profiles_dir and env_yaml.
1786
1787
        Returns the loaded group or benchmark.
1788
        """
1789
        group = None
1790
        if self.group_file and self.benchmark_file:
1791
            raise ValueError("A .benchmark file and a .group file were found in "
1792
                             "the same directory '%s'" % (guide_directory))
1793
1794
        # we treat benchmark as a special form of group in the following code
1795
        if self.benchmark_file:
1796
            group = Benchmark.from_yaml(
1797
                self.benchmark_file, self.env_yaml, self.product_cpes
1798
            )
1799
            if self.profiles_dir:
1800
                group.add_profiles_from_dir(self.profiles_dir, self.env_yaml)
1801
1802
        if self.group_file:
1803
            group = Group.from_yaml(self.group_file, self.env_yaml, self.product_cpes)
1804
            prodtypes = parse_prodtype(group.prodtype)
1805
            if "all" in prodtypes or self.product in prodtypes:
1806
                self.all_groups[group.id_] = group
1807
1808
        return group
1809
1810 2
    def _load_group_process_and_recurse(self, guide_directory):
1811
        self.loaded_group = self.load_benchmark_or_group(guide_directory)
1812
1813
        if self.loaded_group:
1814
1815
            if self.parent_group:
1816
                self.parent_group.add_group(
1817
                    self.loaded_group, env_yaml=self.env_yaml, product_cpes=self.product_cpes)
1818
1819
            self._process_values()
1820
            self._recurse_into_subdirs()
1821
            self._process_rules()
1822
1823 2
    def process_directory_tree(self, start_dir, extra_group_dirs=None):
1824
        self._collect_items_to_load(start_dir)
1825
        if extra_group_dirs:
1826
            self.subdirectories += extra_group_dirs
1827
        self._load_group_process_and_recurse(start_dir)
1828
1829 2
    def process_directory_trees(self, directories):
1830
        start_dir = directories[0]
1831
        extra_group_dirs = directories[1:]
1832
        return self.process_directory_tree(start_dir, extra_group_dirs)
1833
1834 2
    def _recurse_into_subdirs(self):
1835
        for subdir in self.subdirectories:
1836
            loader = self._get_new_loader()
1837
            loader.parent_group = self.loaded_group
1838
            loader.process_directory_tree(subdir)
1839
            self.all_values.update(loader.all_values)
1840
            self.all_rules.update(loader.all_rules)
1841
            self.all_groups.update(loader.all_groups)
1842
1843 2
    def _get_new_loader(self):
1844
        raise NotImplementedError()
1845
1846 2
    def _process_values(self):
1847
        raise NotImplementedError()
1848
1849 2
    def _process_rules(self):
1850
        raise NotImplementedError()
1851
1852 2
    def save_all_entities(self, base_dir):
1853
        destdir = os.path.join(base_dir, "rules")
1854
        mkdir_p(destdir)
1855
        if self.all_rules:
1856
            self.save_entities(self.all_rules.values(), destdir)
1857
1858
        destdir = os.path.join(base_dir, "groups")
1859
        mkdir_p(destdir)
1860
        if self.all_groups:
1861
            self.save_entities(self.all_groups.values(), destdir)
1862
1863
        destdir = os.path.join(base_dir, "values")
1864
        mkdir_p(destdir)
1865
        if self.all_values:
1866
            self.save_entities(self.all_values.values(), destdir)
1867
1868
        destdir = os.path.join(base_dir, "platforms")
1869
        mkdir_p(destdir)
1870
        if self.product_cpes.platforms:
1871
            self.save_entities(self.product_cpes.platforms.values(), destdir)
1872
1873 2
    def save_entities(self, entities, destdir):
1874
        if not entities:
1875
            return
1876
        for entity in entities:
1877
            basename = entity.id_ + ".yml"
1878
            dest_filename = os.path.join(destdir, basename)
1879
            entity.dump_yaml(dest_filename)
1880
1881
1882 2
class BuildLoader(DirectoryLoader):
1883 2
    def __init__(
1884
            self, profiles_dir, env_yaml, product_cpes,
1885
            sce_metadata_path=None, stig_reference_path=None):
1886
        super(BuildLoader, self).__init__(profiles_dir, env_yaml, product_cpes)
1887
1888
        self.sce_metadata = None
1889
        if sce_metadata_path and os.path.getsize(sce_metadata_path):
1890
            self.sce_metadata = json.load(open(sce_metadata_path, 'r'))
1891
        self.stig_references = None
1892
        if stig_reference_path:
1893
            self.stig_references = ssg.build_stig.map_versions_to_rule_ids(stig_reference_path)
1894
1895 2
    def _process_values(self):
1896
        for value_yaml in self.value_files:
1897
            value = Value.from_yaml(value_yaml, self.env_yaml)
1898
            self.all_values[value.id_] = value
1899
            self.loaded_group.add_value(value)
1900
1901 2
    def _process_rules(self):
1902
        for rule_yaml in self.rule_files:
1903
            try:
1904
                rule = Rule.from_yaml(
1905
                    rule_yaml, self.env_yaml, self.product_cpes, self.sce_metadata)
1906
            except DocumentationNotComplete:
1907
                # Happens on non-debug build when a rule is "documentation-incomplete"
1908
                continue
1909
            prodtypes = parse_prodtype(rule.prodtype)
1910
            if "all" not in prodtypes and self.product not in prodtypes:
1911
                continue
1912
            self.all_rules[rule.id_] = rule
1913
            self.loaded_group.add_rule(
1914
                rule, env_yaml=self.env_yaml, product_cpes=self.product_cpes)
1915
1916
            if self.loaded_group.cpe_platform_names:
1917
                rule.inherited_cpe_platform_names += self.loaded_group.cpe_platform_names
1918
1919
            rule.normalize(self.env_yaml["product"])
1920
            if self.stig_references:
1921
                rule.add_stig_references(self.stig_references)
1922
1923 2
    def _get_new_loader(self):
1924
        loader = BuildLoader(
1925
            self.profiles_dir, self.env_yaml, self.product_cpes)
1926
        # Do it this way so we only have to parse the SCE metadata once.
1927
        loader.sce_metadata = self.sce_metadata
1928
        # Do it this way so we only have to parse the STIG references once.
1929
        loader.stig_references = self.stig_references
1930
        return loader
1931
1932 2
    def export_group_to_file(self, filename):
1933
        return self.loaded_group.to_file(filename)
1934
1935
1936 2
class LinearLoader(object):
1937 2
    def __init__(self, env_yaml, resolved_path):
1938
        self.resolved_rules_dir = os.path.join(resolved_path, "rules")
1939
        self.rules = dict()
1940
1941
        self.resolved_profiles_dir = os.path.join(resolved_path, "profiles")
1942
        self.profiles = dict()
1943
1944
        self.resolved_groups_dir = os.path.join(resolved_path, "groups")
1945
        self.groups = dict()
1946
1947
        self.resolved_values_dir = os.path.join(resolved_path, "values")
1948
        self.values = dict()
1949
1950
        self.resolved_platforms_dir = os.path.join(resolved_path, "platforms")
1951
        self.platforms = dict()
1952
1953
        self.benchmark = None
1954
        self.env_yaml = env_yaml
1955
        self.product_cpes = ProductCPEs(env_yaml)
1956
1957 2
    def find_first_groups_ids(self, start_dir):
1958
        group_files = glob.glob(os.path.join(start_dir, "*", "group.yml"))
1959
        group_ids = [fname.split(os.path.sep)[-2] for fname in group_files]
1960
        return group_ids
1961
1962 2
    def load_entities_by_id(self, filenames, destination, cls):
1963
        for fname in filenames:
1964
            entity = cls.from_yaml(fname, self.env_yaml, self.product_cpes)
1965
            destination[entity.id_] = entity
1966
1967 2
    def load_benchmark(self, directory):
1968
        self.benchmark = Benchmark.from_yaml(
1969
            os.path.join(directory, "benchmark.yml"), self.env_yaml, self.product_cpes)
1970
1971
        self.benchmark.add_profiles_from_dir(self.resolved_profiles_dir, self.env_yaml, self.product_cpes)
1972
1973
        benchmark_first_groups = self.find_first_groups_ids(directory)
1974
        for gid in benchmark_first_groups:
1975
            try:
1976
                self.benchmark.add_group(self.groups[gid], self.env_yaml, self.product_cpes)
1977
            except KeyError as exc:
1978
                # Add only the groups we have compiled and loaded
1979
                pass
1980
1981 2
    def load_compiled_content(self):
1982
        filenames = glob.glob(os.path.join(self.resolved_rules_dir, "*.yml"))
1983
        self.load_entities_by_id(filenames, self.rules, Rule)
1984
1985
        filenames = glob.glob(os.path.join(self.resolved_groups_dir, "*.yml"))
1986
        self.load_entities_by_id(filenames, self.groups, Group)
1987
1988
        filenames = glob.glob(os.path.join(self.resolved_profiles_dir, "*.yml"))
1989
        self.load_entities_by_id(filenames, self.profiles, Profile)
1990
1991
        filenames = glob.glob(os.path.join(self.resolved_values_dir, "*.yml"))
1992
        self.load_entities_by_id(filenames, self.values, Value)
1993
1994
        filenames = glob.glob(os.path.join(self.resolved_platforms_dir, "*.yml"))
1995
        self.load_entities_by_id(filenames, self.platforms, Platform)
1996
        self.product_cpes.platforms = self.platforms
1997
1998
        for g in self.groups.values():
1999
            g.load_entities(self.rules, self.values, self.groups)
2000
2001 2
    def export_benchmark_to_file(self, filename):
2002
        register_namespaces()
2003
        return self.benchmark.to_file(filename, self.env_yaml)
2004
2005 2
    def export_ocil_to_file(self, filename):
2006
        root = ET.Element('ocil')
2007
        root.set('xmlns:xsi', xsi_namespace)
2008
        root.set("xmlns", ocil_namespace)
2009
        root.set("xmlns:xhtml", xhtml_namespace)
2010
        tree = ET.ElementTree(root)
2011
        generator = ET.SubElement(root, "generator")
2012
        product_name = ET.SubElement(generator, "product_name")
2013
        product_name.text = "build_shorthand.py from SCAP Security Guide"
2014
        product_version = ET.SubElement(generator, "product_version")
2015
        product_version.text = "ssg: " + self.env_yaml["ssg_version_str"]
2016
        schema_version = ET.SubElement(generator, "schema_version")
2017
        schema_version.text = "2.0"
2018
        timestamp_el = ET.SubElement(generator, "timestamp")
2019
        timestamp_el.text = timestamp
2020
        questionnaires = ET.SubElement(root, "questionnaires")
2021
        test_actions = ET.SubElement(root, "test_actions")
2022
        questions = ET.SubElement(root, "questions")
2023
        for rule in self.rules.values():
2024
            if not rule.ocil and not rule.ocil_clause:
2025
                continue
2026
            questionnaire, action, boolean_question = rule.to_ocil()
2027
            questionnaires.append(questionnaire)
2028
            test_actions.append(action)
2029
            questions.append(boolean_question)
2030
        tree.write(filename)
2031
2032
2033 2
class Platform(XCCDFEntity):
2034
2035 2
    KEYS = dict(
2036
        name=lambda: "",
2037
        original_expression=lambda: "",
2038
        xml_content=lambda: "",
2039
        bash_conditional=lambda: "",
2040
        ansible_conditional=lambda: "",
2041
        ** XCCDFEntity.KEYS
2042
    )
2043
2044 2
    MANDATORY_KEYS = [
2045
        "name",
2046
        "xml_content",
2047
        "original_expression",
2048
        "bash_conditional",
2049
        "ansible_conditional"
2050
    ]
2051
2052 2
    prefix = "cpe-lang"
2053 2
    ns = PREFIX_TO_NS[prefix]
2054
2055 2
    @classmethod
2056
    def from_text(cls, expression, product_cpes):
2057 2
        if not product_cpes:
2058
            return None
2059 2
        test = product_cpes.algebra.parse(
2060
            expression, simplify=True)
2061 2
        id = test.as_id()
2062 2
        platform = cls(id)
2063 2
        platform.test = test
2064 2
        platform.test.enrich_with_cpe_info(product_cpes)
2065 2
        platform.name = id
2066 2
        platform.original_expression = expression
2067 2
        platform.xml_content = platform.get_xml()
2068 2
        platform.bash_conditional = platform.test.to_bash_conditional()
2069 2
        platform.ansible_conditional = platform.test.to_ansible_conditional()
2070 2
        return platform
2071
2072 2
    def get_xml(self):
2073 2
        cpe_platform = ET.Element("{%s}platform" % Platform.ns)
2074 2
        cpe_platform.set('id', self.name)
2075
        # in case the platform contains only single CPE name, fake the logical test
2076
        # we have to athere to CPE specification
2077 2
        if isinstance(self.test, CPEALFactRef):
2078 2
            cpe_test = ET.Element("{%s}logical-test" % CPEALLogicalTest.ns)
2079 2
            cpe_test.set('operator', 'AND')
2080 2
            cpe_test.set('negate', 'false')
2081 2
            cpe_test.append(self.test.to_xml_element())
2082 2
            cpe_platform.append(cpe_test)
2083
        else:
2084 2
            cpe_platform.append(self.test.to_xml_element())
2085 2
        xmlstr = ET.tostring(cpe_platform).decode()
2086 2
        return xmlstr
2087
2088 2
    def to_xml_element(self):
2089 2
        return self.xml_content
2090
2091 2
    def to_bash_conditional(self):
2092 2
        return self.bash_conditional
2093
2094 2
    def to_ansible_conditional(self):
2095 2
        return self.ansible_conditional
2096
2097 2
    @classmethod
2098 2
    def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None):
2099 2
        platform = super(Platform, cls).from_yaml(yaml_file, env_yaml)
2100 2
        platform.xml_content = ET.fromstring(platform.xml_content)
2101
        # if we did receive a product_cpes, we can restore also the original test object
2102
        # it can be later used e.g. for comparison
2103 2
        if product_cpes:
2104
            platform.test = product_cpes.algebra.parse(
2105
                platform.original_expression, simplify=True)
2106 2
        return platform
2107
2108 2
    def __eq__(self, other):
2109 2
        if not isinstance(other, Platform):
2110
            return False
2111
        else:
2112 2
            return self.test == other.test
2113
2114
2115 2
def add_platform_if_not_defined(platform, product_cpes):
2116
    # check if the platform is already in the dictionary. If yes, return the existing one
2117
    for p in product_cpes.platforms.values():
2118
        if platform == p:
2119
            return p
2120
    product_cpes.platforms[platform.id_] = platform
2121
    return platform
2122