Test Failed
Push — master ( 36ef11...81e955 )
by Jan
03:06 queued 24s
created

ssg.build_yaml.Platform.__eq__()   A

Complexity

Conditions 2

Size

Total Lines 5
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 2.0625

Importance

Changes 0
Metric Value
cc 2
eloc 4
nop 2
dl 0
loc 5
ccs 3
cts 4
cp 0.75
crap 2.0625
rs 10
c 0
b 0
f 0
1 2
from __future__ import absolute_import
2 2
from __future__ import print_function
3
4 2
from collections import defaultdict
5 2
from copy import deepcopy
6 2
import datetime
7 2
import json
8 2
import os
9 2
import os.path
10 2
import re
11 2
import sys
12 2
from xml.sax.saxutils import escape
13 2
import glob
14
15 2
import yaml
16
17 2
from .build_cpe import CPEDoesNotExist, CPEALLogicalTest, CPEALFactRef
18 2
from .constants import (XCCDF_REFINABLE_PROPERTIES,
19
                        SCE_SYSTEM,
20
                        cce_uri,
21
                        dc_namespace,
22
                        ocil_cs,
23
                        ocil_namespace,
24
                        oval_namespace,
25
                        xhtml_namespace,
26
                        xsi_namespace,
27
                        timestamp,
28
                        SSG_BENCHMARK_LATEST_URI,
29
                        SSG_PROJECT_NAME,
30
                        SSG_REF_URIS,
31
                        PREFIX_TO_NS
32
                        )
33 2
from .rules import get_rule_dir_id, get_rule_dir_yaml, is_rule_dir
34 2
from .rule_yaml import parse_prodtype
35
36 2
from .cce import is_cce_format_valid, is_cce_value_valid
37 2
from .yaml import DocumentationNotComplete, open_and_expand, open_and_macro_expand
38 2
from .utils import required_key, mkdir_p
39
40 2
from .xml import ElementTree as ET, add_xhtml_namespace, register_namespaces, parse_file
41 2
from .shims import unicode_func
42 2
from .build_cpe import ProductCPEs
43
44 2
def dump_yaml_preferably_in_original_order(dictionary, file_object):
    """
    Dump ``dictionary`` as YAML into ``file_object``, keeping key order if possible.

    The dump is first attempted with ``sort_keys=False``. If ``yaml.dump``
    rejects that argument with a ``TypeError`` mentioning ``sort_keys``,
    the dump is retried without it. Any other ``TypeError`` is propagated.

    Returns whatever ``yaml.dump`` returns.
    """
    try:
        return yaml.dump(dictionary, file_object, indent=4, sort_keys=False)
    except TypeError as exc:
        # Older versions of libyaml don't understand the sort_keys kwarg
        if "sort_keys" not in str(exc):
            # A bare raise re-raises the active exception with its original
            # traceback; "raise exc" would reset the traceback on Python 2.
            raise
        return yaml.dump(dictionary, file_object, indent=4)
52
53
54 2
def add_sub_element(parent, tag, data):
    """
    Append a new ``tag`` element to ``parent`` with ``data`` as its content.

    ``data`` is a string that gets parsed as an XML tree, which makes it
    possible to add sub-elements of the new child.

    If ``data`` should not be parsed as an XML tree, either escape it before
    calling this function, or use ElementTree.SubElement() instead.

    Returns the newly created subelement of type ``tag``.
    Raises RuntimeError if the assembled markup cannot be parsed.
    """
    xhtml_data = add_xhtml_namespace(data)
    # Our YAML data contain XML and XHTML elements. ET.SubElement() escapes
    # the < > characters by &lt; and &gt;, so it would not add child elements.
    # As a hack, the markup is assembled as text and parsed instead.
    # TODO: Remove this function after we move to Markdown everywhere in SSG
    template = unicode_func('<{0} xmlns:xhtml="{2}">{1}</{0}>')
    markup = template.format(tag, xhtml_data, xhtml_namespace)

    try:
        new_element = ET.fromstring(markup.encode("utf-8"))
    except Exception:
        raise RuntimeError(
            "Error adding subelement to an element '{0}' from string: '{1}'"
            .format(parent.tag, markup))

    parent.append(new_element)
    return new_element
83
84
85 2
def reorder_according_to_ordering(unordered, ordering, regex=None):
    """
    Return the items of ``unordered`` arranged by the priorities in ``ordering``.

    Items matched by ``regex`` (by default an alternation of the ``ordering``
    entries, matched at the start of the item) are candidates for prioritizing.
    For each entry of ``ordering``, in turn, the candidates containing that
    entry are emitted in their original order. All items that were not
    emitted this way follow, sorted.
    """
    pattern = regex
    if pattern is None:
        pattern = "|".join("({0})".format(entry) for entry in ordering)
    matcher = re.compile(pattern)

    candidates = [entry for entry in unordered if matcher.match(entry)]
    remaining = set(unordered)

    result = []
    for keyword in ordering:
        for candidate in candidates:
            # Skip candidates unrelated to this keyword or already emitted
            if keyword not in candidate or candidate not in remaining:
                continue
            result.append(candidate)
            remaining.discard(candidate)

    return result + sorted(remaining)
101
102
103 2
def add_warning_elements(element, warnings):
    """
    Add a <warning> child to ``element`` for every entry of ``warnings``.

    ``warnings`` is a list of dictionaries, as in [{dict}, {dict}]. This form
    handles multiple warnings sharing one category, which is valid in SCAP
    and in our content:

    warnings:
        - general: Some general warning
        - general: Some other general warning
        - general: |-
            Some really long multiline general warning

    Each of the dictionaries should have only one key/value pair: the key
    becomes the ``category`` attribute and the value becomes the content.
    """
    for entry in warnings:
        category, text = list(entry.items())[0]
        add_sub_element(element, "warning", text).set("category", category)
118
119
120 2
def add_nondata_subelements(element, subelement, attribute, attr_data):
    """
    Add one ``subelement`` child to ``element`` for each item of ``attr_data``.

    Every child carries the item as the value of its ``attribute`` and has
    no content, for example <requires id="my_required_id"/>.
    """
    for attr_value in attr_data:
        child = ET.SubElement(element, subelement)
        child.set(attribute, attr_value)
126
127
128 2
def check_warnings(xccdf_structure):
    """
    Verify that every entry of ``xccdf_structure.warnings`` has exactly one item.

    Raises ValueError as soon as an entry of a different length is found.
    """
    if any(len(entry) != 1 for entry in xccdf_structure.warnings):
        raise ValueError(
            "Only one key/value pair should exist for each warnings dictionary")
133
134
135 2
def add_reference_elements(element, references, ref_uri_dict):
    """
    Add a <reference> child to ``element`` for every referenced item.

    ``references`` maps a reference type to a comma-separated string of
    values. Each value becomes one <reference> element whose text is the
    value and whose ``href`` is the URI found in ``ref_uri_dict``.

    Values of the 'srg' type are resolved by their prefix: 'SRG-OS-' uses
    the 'os-srg' URI and 'SRG-APP-' uses the 'app-srg' URI.

    Raises ValueError if an 'srg' value has an unknown prefix, or if the
    reference type has no entry in ``ref_uri_dict``.
    """
    for ref_type, ref_vals in references.items():
        for ref_val in ref_vals.split(","):
            # This assumes that a single srg key may have items from multiple SRG types
            if ref_type == 'srg':
                if ref_val.startswith('SRG-OS-'):
                    ref_href = ref_uri_dict['os-srg']
                elif ref_val.startswith('SRG-APP-'):
                    ref_href = ref_uri_dict['app-srg']
                else:
                    raise ValueError("SRG {0} doesn't have a URI defined.".format(ref_val))
            else:
                try:
                    ref_href = ref_uri_dict[ref_type]
                except KeyError:
                    # This is a module-level function, so there is no "self".
                    # The element's own "id" attribute (None if it is not
                    # set) is reported instead.
                    msg = (
                        "Error processing reference {0}: {1} in Rule {2}."
                        .format(ref_type, ref_vals, element.get("id")))
                    raise ValueError(msg)

            ref = ET.SubElement(element, 'reference')
            ref.set("href", ref_href)
            ref.text = ref_val
158
159
160 2
def add_benchmark_metadata(element, contributors_file):
    """
    Add a <metadata> child with Dublin Core style entries to ``element``.

    The publisher and the creator are set to SSG_PROJECT_NAME, one
    contributor entry is added for each 'contributor' element found in the
    parsed ``contributors_file``, and the source is set to
    SSG_BENCHMARK_LATEST_URI.
    """
    metadata = ET.SubElement(element, "metadata")

    for tag_template in ("{%s}publisher", "{%s}creator"):
        ET.SubElement(metadata, tag_template % dc_namespace).text = SSG_PROJECT_NAME

    contributor_tag = "{%s}contributor" % dc_namespace
    for entry in parse_file(contributors_file).iter('contributor'):
        ET.SubElement(metadata, contributor_tag).text = entry.text

    ET.SubElement(metadata, "{%s}source" % dc_namespace).text = SSG_BENCHMARK_LATEST_URI
176
177
178 2
class SelectionHandler(object):
    """
    Keeps track of rule selections, variable assignments and rule refinements.

    Entries are strings in one of these forms:

    - ``rule_id.property=value`` refines a property of a rule,
    - ``variable=selector`` assigns a selector to a variable,
    - ``!rule_id`` unselects a rule,
    - ``rule_id`` selects a rule.
    """
    def __init__(self):
        # rule ID -> list of (property, value) tuples
        self.refine_rules = defaultdict(list)
        # variable ID -> selector
        self.variables = dict()
        self.unselected = []
        self.selected = []

    @property
    def selections(self):
        """Return all stored entries serialized back to their string form."""
        selections = [str(item) for item in self.selected]
        selections.extend("!" + str(item) for item in self.unselected)
        selections.extend(
            varname + "=" + selector for varname, selector in self.variables.items())
        for rule, refinements in self.refine_rules.items():
            for prop, val in refinements:
                selections.append("{rule}.{property}={value}"
                                  .format(rule=rule, property=prop, value=val))
        return selections

    @selections.setter
    def selections(self, entries):
        """Apply every entry on top of the current state; nothing is reset."""
        for item in entries:
            self.apply_selection(item)

    def apply_selection(self, item):
        """
        Parse one selection entry and store it in the matching container.

        Raises ValueError if the entry refines a property that is not
        listed in XCCDF_REFINABLE_PROPERTIES.
        """
        if "." in item:
            rule, refinement = item.split(".", 1)
            property_, value = refinement.split("=", 1)
            if property_ not in XCCDF_REFINABLE_PROPERTIES:
                # This class does not define id_ itself, it is only present
                # when a subclass or a sibling base class provides it.
                msg = ("Property '{property_}' cannot be refined. "
                       "Rule properties that can be refined are {refinables}. "
                       "Fix refinement '{rule_id}.{property_}={value}' in profile '{profile}'."
                       .format(property_=property_, refinables=XCCDF_REFINABLE_PROPERTIES,
                               rule_id=rule, value=value,
                               profile=getattr(self, "id_", "<unknown>"))
                       )
                raise ValueError(msg)
            self.refine_rules[rule].append((property_, value))
        elif "=" in item:
            varname, value = item.split("=", 1)
            self.variables[varname] = value
        elif item.startswith("!"):
            self.unselected.append(item[1:])
        else:
            self.selected.append(item)

    def _subtract_refinements(self, extended_refinements):
        """
        Given a dict of rule refinements from the extended profile,
        "undo" every refinement prefixed with '!' in this profile.

        The '!'-prefixed entries are dropped from this object, and the
        modified ``extended_refinements`` is returned.
        """
        for rule, refinements in list(self.refine_rules.items()):
            if rule.startswith("!"):
                for prop, val in refinements:
                    extended_refinements[rule[1:]].remove((prop, val))
                del self.refine_rules[rule]
        return extended_refinements

    def update_with(self, rhs):
        """
        Merge selections, variables and refinements of ``rhs`` into this object.

        Values already present in this object take precedence over those
        of ``rhs``. Unselections of ``rhs`` are not taken over, and ``rhs``
        itself is not modified.
        """
        extra_selections = set(rhs.selected).difference(self.selected)
        # Sorted, so the resulting order does not depend on string hashing
        self.selected.extend(sorted(extra_selections))

        updated_variables = dict(rhs.variables)
        updated_variables.update(self.variables)
        self.variables = updated_variables

        # Work on a copy, as the subtraction removes items from the lists
        extended_refinements = deepcopy(rhs.refine_rules)
        updated_refinements = self._subtract_refinements(extended_refinements)
        updated_refinements.update(self.refine_rules)
        self.refine_rules = updated_refinements
251
252
253 2
class XCCDFEntity(object):
    """
    Base for entities that can load themselves from a YAML with Jinja macros,
    and that can save themselves back to YAML.

    It is meant to work with the content in the project, where entities
    are defined in the benchmark tree, and where they are compiled
    into flat YAMLs to the build directory.

    Subclasses describe their attributes by KEYS, which maps the attribute
    name to a callable producing its default value.
    """
    KEYS = dict(
            id_=lambda: "",
            definition_location=lambda: "",
    )

    # Keys that have to be present in the definition
    MANDATORY_KEYS = set()

    # If the definition file has this name, the ID comes from its directory
    GENERIC_FILENAME = ""
    # The key under which the entity ID is put into the environment
    ID_LABEL = "id"

    def __init__(self, id_):
        super(XCCDFEntity, self).__init__()
        self._assign_defaults()
        self.id_ = id_

    def _assign_defaults(self):
        """Set every attribute named in KEYS to a freshly created default."""
        for attr_name, factory in self.KEYS.items():
            initial = factory()
            # A default that is a RuntimeError instance stands for "no value"
            if isinstance(initial, RuntimeError):
                initial = None
            setattr(self, attr_name, initial)

    @classmethod
    def get_instance_from_full_dict(cls, data):
        """
        Create an instance out of a defining dictionary
        by turning all of its items into attributes.

        Extend this if you want tight control over the instance creation process.
        """
        instance = cls(data["id_"])
        for attr_name, attr_value in data.items():
            setattr(instance, attr_name, attr_value)
        return instance

    @classmethod
    def process_input_dict(cls, input_contents, env_yaml, product_cpes=None):
        """
        Take the contents of the definition as a dictionary, and
        add defaults or raise errors if a required member is not present.

        Every recognized key is removed from ``input_contents``,
        so only the keys that were not understood are left there.
        A recognized key whose value is None is removed as well,
        but it doesn't make it to the result.

        Extend this if you want to add, remove or alter the result
        that will constitute the new instance.
        """
        result = dict()

        for key in cls.KEYS:
            if key not in input_contents:
                if key in cls.MANDATORY_KEYS:
                    raise ValueError(
                        "Key '{key}' is mandatory for definition of '{class_name}'."
                        .format(key=key, class_name=cls.__name__))
                result[key] = cls.KEYS[key]()
                continue

            supplied = input_contents.pop(key)
            if supplied is not None:
                result[key] = supplied

        return result

    @classmethod
    def parse_yaml_into_processed_dict(cls, yaml_file, env_yaml=None, product_cpes=None):
        """
        Given yaml filename and environment info, produce a dictionary
        that defines the instance to be created.
        This wraps :meth:`process_input_dict` and it adds generic keys on the top:

        - `id_` as the entity ID that is deduced either from the filename,
          or from the parent directory name.
        - `definition_location` as the original location where the entity got defined.

        Raises ValueError if the contents can't be processed, and
        RuntimeError if some keys of the YAML file were not understood.
        """
        basename = os.path.basename(yaml_file)
        if basename == cls.GENERIC_FILENAME:
            entity_id = os.path.basename(os.path.dirname(yaml_file))
        else:
            entity_id = basename.split(".")[0]

        if env_yaml:
            # The ID is put into the environment before the file is expanded
            env_yaml[cls.ID_LABEL] = entity_id
        yaml_data = open_and_macro_expand(yaml_file, env_yaml)

        try:
            processed = cls.process_input_dict(yaml_data, env_yaml, product_cpes)
        except ValueError as exc:
            raise ValueError(
                "Error processing {yaml_file}: {exc}"
                .format(yaml_file=yaml_file, exc=str(exc)))

        # Processing removes all recognized keys, the leftovers are unknown
        if yaml_data:
            raise RuntimeError(
                "Unparsed YAML data in '{yaml_file}': {keys}"
                .format(yaml_file=yaml_file, keys=list(yaml_data.keys())))

        if not processed.get("definition_location", ""):
            processed["definition_location"] = yaml_file
        processed["id_"] = entity_id

        return processed

    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None):
        """
        Create an instance from the definition in ``yaml_file``.

        The supplied ``env_yaml`` is copied first, so it is not modified.
        DocumentationNotComplete is propagated as it is, any other
        exception is reported as a RuntimeError.
        """
        yaml_file = os.path.normpath(yaml_file)
        local_env_yaml = dict(env_yaml) if env_yaml else None

        try:
            data_dict = cls.parse_yaml_into_processed_dict(yaml_file, local_env_yaml, product_cpes)
        except DocumentationNotComplete:
            raise
        except Exception as exc:
            raise RuntimeError(
                "Error loading a {class_name} from {filename}: {error}"
                .format(class_name=cls.__name__, filename=yaml_file, error=str(exc)))

        return cls.get_instance_from_full_dict(data_dict)

    def represent_as_dict(self):
        """
        Produce a dict representation of the class.

        It consists of all attributes named in KEYS, except for the ``id_``.

        Extend this method if you need the representation to be different from the object.
        """
        representation = dict((key, getattr(self, key)) for key in self.KEYS)
        del representation["id_"]
        return representation

    def dump_yaml(self, file_name, documentation_complete=True):
        """Write the dict representation as YAML into the file ``file_name``."""
        contents = self.represent_as_dict()
        contents["documentation_complete"] = documentation_complete
        with open(file_name, "w+") as yaml_file:
            dump_yaml_preferably_in_original_order(contents, yaml_file)

    def to_xml_element(self):
        """Subclasses are expected to override this."""
        raise NotImplementedError()
411
412
413 2
class Profile(XCCDFEntity, SelectionHandler):
    """Represents XCCDF profile

    Besides the attributes listed in KEYS, a profile uses the selection
    containers of SelectionHandler (selected, unselected, variables
    and refine_rules).
    """
    KEYS = dict(
        title=lambda: "",
        description=lambda: "",
        extends=lambda: "",
        metadata=lambda: None,
        reference=lambda: None,
        selections=lambda: list(),
        platforms=lambda: set(),
        cpe_names=lambda: set(),
        platform=lambda: None,
        filter_rules=lambda: "",
        ** XCCDFEntity.KEYS
    )

    MANDATORY_KEYS = {
        "title",
        "description",
        "selections",
    }

    @classmethod
    def process_input_dict(cls, input_contents, env_yaml, product_cpes=None):
        """
        Process the profile definition, and resolve its platforms to CPE names.

        The single ``platform`` is added to ``platforms``. If ``env_yaml``
        is supplied, every platform is translated to a CPE name
        by means of ``product_cpes``.

        Raises CPEDoesNotExist if a platform is not known to ``product_cpes``.
        """
        input_contents = super(Profile, cls).process_input_dict(input_contents, env_yaml)

        platform = input_contents.get("platform")
        if platform is not None:
            input_contents["platforms"].add(platform)

        if env_yaml:
            for platform in input_contents["platforms"]:
                try:
                    new_cpe_name = product_cpes.get_cpe_name(platform)
                    input_contents["cpe_names"].add(new_cpe_name)
                except CPEDoesNotExist:
                    msg = (
                        "Unsupported platform '{platform}' in a profile."
                        .format(platform=platform))
                    raise CPEDoesNotExist(msg)

        return input_contents

    @property
    def rule_filter(self):
        """A callable built from ``filter_rules``, or the no-op filter if it is empty."""
        if self.filter_rules:
            return rule_filter_from_def(self.filter_rules)
        else:
            return noop_rule_filterfunc

    def to_xml_element(self):
        """Return the <Profile> element that represents this profile."""
        element = ET.Element('Profile')
        element.set("id", self.id_)
        if self.extends:
            element.set("extends", self.extends)
        title = add_sub_element(element, "title", self.title)
        title.set("override", "true")
        desc = add_sub_element(element, "description", self.description)
        desc.set("override", "true")

        if self.reference:
            add_sub_element(element, "reference", escape(self.reference))

        for cpe_name in self.cpe_names:
            plat = ET.SubElement(element, "platform")
            plat.set("idref", cpe_name)

        for selection in self.selected:
            select = ET.Element("select")
            select.set("idref", selection)
            select.set("selected", "true")
            element.append(select)

        for selection in self.unselected:
            unselect = ET.Element("select")
            unselect.set("idref", selection)
            unselect.set("selected", "false")
            element.append(unselect)

        for value_id, selector in self.variables.items():
            refine_value = ET.Element("refine-value")
            refine_value.set("idref", value_id)
            refine_value.set("selector", selector)
            element.append(refine_value)

        for refined_rule, refinement_list in self.refine_rules.items():
            refine_rule = ET.Element("refine-rule")
            refine_rule.set("idref", refined_rule)
            for refinement in refinement_list:
                refine_rule.set(refinement[0], refinement[1])
            element.append(refine_rule)

        return element

    def get_rule_selectors(self):
        """Return IDs of selected rules followed by IDs of unselected rules."""
        return self.selected + self.unselected

    def get_variable_selectors(self):
        """Return the mapping of variable IDs to selectors."""
        return self.variables

    def validate_refine_rules(self, rules):
        """
        Check that every refined rule exists and is referenced by the profile.

        Raises ValueError if a refined rule is not among ``rules``, or
        if it is neither selected nor unselected by this profile.
        """
        # Sets are built once, so the membership tests in the loop are cheap
        existing_rule_ids = set(r.id_ for r in rules)
        rule_selectors = set(self.get_rule_selectors())
        for refine_rule, refinement_list in self.refine_rules.items():
            # Take first refinement to illustrate where the error is
            # all refinements in list are invalid, so it doesn't really matter.
            # The list may be empty, so a placeholder is used in that case.
            if refinement_list:
                a_refinement = refinement_list[0]
            else:
                a_refinement = ("<property>", "<value>")

            if refine_rule not in existing_rule_ids:
                msg = (
                    "You are trying to refine a rule that doesn't exist. "
                    "Rule '{rule_id}' was not found in the benchmark. "
                    "Please check all rule refinements for rule: '{rule_id}', for example: "
                    "- {rule_id}.{property_}={value}' in profile {profile_id}."
                    .format(rule_id=refine_rule, profile_id=self.id_,
                            property_=a_refinement[0], value=a_refinement[1])
                    )
                raise ValueError(msg)

            if refine_rule not in rule_selectors:
                msg = ("- {rule_id}.{property_}={value}' in profile '{profile_id}' is refining "
                       "a rule that is not selected by it. The refinement will not have any "
                       "noticeable effect. Either select the rule or remove the rule refinement."
                       .format(rule_id=refine_rule, property_=a_refinement[0],
                               value=a_refinement[1], profile_id=self.id_)
                       )
                raise ValueError(msg)

    def validate_variables(self, variables):
        """
        Check that the profile uses only known variables and selectors.

        Raises ValueError if a variable of the profile is not among
        ``variables``, or if its selector is not among the variable's options.
        """
        variables_by_id = dict()
        for var in variables:
            variables_by_id[var.id_] = var

        for var_id, our_val in self.variables.items():
            if var_id not in variables_by_id:
                all_vars_list = [" - %s" % v for v in variables_by_id.keys()]
                msg = (
                    "Value '{var_id}' in profile '{profile_name}' is not known. "
                    "We know only variables:\n{var_names}"
                    .format(
                        var_id=var_id, profile_name=self.id_,
                        var_names="\n".join(sorted(all_vars_list)))
                )
                raise ValueError(msg)

            # Selectors are compared in their string form
            allowed_selectors = [str(s) for s in variables_by_id[var_id].options.keys()]
            if our_val not in allowed_selectors:
                msg = (
                    "Value '{var_id}' in profile '{profile_name}' "
                    "uses the selector '{our_val}'. "
                    "This is not possible, as only selectors {all_selectors} are available. "
                    "Either change the selector used in the profile, or "
                    "add the selector-value pair to the variable definition."
                    .format(
                        var_id=var_id, profile_name=self.id_, our_val=our_val,
                        all_selectors=allowed_selectors,
                    )
                )
                raise ValueError(msg)

    def validate_rules(self, rules, groups):
        """
        Check that the profile selects or unselects only existing rules.

        Raises ValueError if a selector is contained in ``groups``, or
        if it is not an ID of any of ``rules``.
        """
        # A set is built once, so the membership tests in the loop are cheap
        existing_rule_ids = set(r.id_ for r in rules)
        rule_selectors = self.get_rule_selectors()
        for id_ in rule_selectors:
            if id_ in groups:
                msg = (
                    "You have selected a group '{group_id}' instead of a "
                    "rule. Groups have no effect in the profile and are not "
                    "allowed to be selected. Please remove '{group_id}' "
                    "from profile '{profile_id}' before proceeding."
                    .format(group_id=id_, profile_id=self.id_)
                )
                raise ValueError(msg)
            if id_ not in existing_rule_ids:
                msg = (
                    "Rule '{rule_id}' was not found in the benchmark. Please "
                    "remove rule '{rule_id}' from profile '{profile_id}' "
                    "before proceeding."
                    .format(rule_id=id_, profile_id=self.id_)
                )
                raise ValueError(msg)

    def __sub__(self, other):
        """
        Return a new profile with what this profile has on top of ``other``.

        The result keeps selections and unselections that ``other`` doesn't
        have, and variables that ``other`` doesn't set to the same selector.
        Title, description, extends, platforms and platform are taken over.
        """
        profile = Profile(self.id_)
        profile.title = self.title
        profile.description = self.description
        profile.extends = self.extends
        profile.platforms = self.platforms
        profile.platform = self.platform
        profile.selected = list(set(self.selected) - set(other.selected))
        profile.selected.sort()
        profile.unselected = list(set(self.unselected) - set(other.unselected))
        profile.variables = dict((k, v) for (k, v) in self.variables.items()
                                 if k not in other.variables or v != other.variables[k])
        return profile
608
609
610 2
class ResolvableProfile(Profile):
    """
    A profile that can resolve itself, i.e. that can incorporate
    the contents of the profile it extends into its own contents.
    """
    def __init__(self, *args, **kwargs):
        super(ResolvableProfile, self).__init__(*args, **kwargs)
        # Set by resolve(), so that the profile is resolved only once
        self.resolved = False

    def _controls_ids_to_controls(self, controls_manager, policy_id, control_id_list):
        """Return the controls of the policy that correspond to the supplied IDs."""
        return [
            controls_manager.get_control(policy_id, control_id)
            for control_id in control_id_list
        ]

    def resolve_controls(self, controls_manager):
        """Do nothing; subclasses that support controls override this."""
        pass

    def extend_by(self, extended_profile):
        """Merge the contents of ``extended_profile`` into this profile."""
        self.update_with(extended_profile)

    def resolve_selections_with_rules(self, rules_by_id):
        """
        Reduce the selected rules to those that are present
        in ``rules_by_id``, and that are accepted by the rule filter.
        """
        kept = set()
        for rule_id in self.selected:
            if rule_id in rules_by_id:
                rule = rules_by_id[rule_id]
                if self.rule_filter(rule):
                    kept.add(rule_id)
        self.selected = list(kept)

    def resolve(self, all_profiles, rules_by_id, controls_manager=None):
        """
        Resolve the profile, unless it has been resolved already.

        The extended profile, if there is one, is resolved first and then
        it is merged into this profile. Afterwards, the selections are
        sorted and free of unselected rules, and the unselections and
        the extension are cleared.

        Raises RuntimeError if the extended profile is not in ``all_profiles``,
        and ValueError if a selected rule is not in ``rules_by_id``.
        """
        if self.resolved:
            return

        if controls_manager:
            self.resolve_controls(controls_manager)

        self.resolve_selections_with_rules(rules_by_id)

        parent_id = self.extends
        if parent_id:
            if parent_id not in all_profiles:
                raise RuntimeError(
                    "Profile {name} extends profile {extended}, but "
                    "only profiles {known_profiles} are available for resolution."
                    .format(name=self.id_, extended=parent_id,
                            known_profiles=list(all_profiles.keys())))
            parent = all_profiles[parent_id]
            parent.resolve(all_profiles, rules_by_id, controls_manager)

            self.extend_by(parent)

        self.selected = [
            rule_id for rule_id in set(self.selected)
            if rule_id not in self.unselected
        ]

        self.unselected = []
        self.extends = None

        self.selected.sort()

        # Rules may have been added by the extension, so they are checked again
        for rule_id in self.selected:
            if rule_id not in rules_by_id:
                raise ValueError(
                    "Rule {rid} is selected by {profile}, but the rule is not available. "
                    "This may be caused by a discrepancy of prodtypes."
                    .format(rid=rule_id, profile=self.id_))

        self.resolved = True
674
675
676 2
class ProfileWithInlinePolicies(ResolvableProfile):
    """
    A resolvable profile whose selections can reference controls of policies
    using the ``policy_id:control_id`` form.
    """
    def __init__(self, *args, **kwargs):
        super(ProfileWithInlinePolicies, self).__init__(*args, **kwargs)
        # policy ID -> list of control IDs as written in the selections
        self.controls_by_policy = defaultdict(list)

    def apply_selection(self, item):
        """Store a control reference, or process the item as a usual selection."""
        # ":" is the delimiter for controls but not when the item is a variable
        is_control_reference = ":" in item and "=" not in item
        if not is_control_reference:
            super(ProfileWithInlinePolicies, self).apply_selection(item)
            return

        policy_id, control_id = item.split(":", 1)
        self.controls_by_policy[policy_id].append(control_id)

    def _process_controls_ids_into_controls(self, controls_manager, policy_id, controls_ids):
        """
        Return the controls of the policy that the supplied IDs stand for.

        An ID that starts with "all" stands either for all controls of the
        level that follows the ":", or for all controls of the policy.
        """
        collected = []
        for control_id in controls_ids:
            if not control_id.startswith("all"):
                found = self._controls_ids_to_controls(
                    controls_manager, policy_id, [control_id])
            elif ":" in control_id:
                level_id = control_id.split(":", 1)[1]
                found = controls_manager.get_all_controls_of_level(policy_id, level_id)
            else:
                found = controls_manager.get_all_controls(policy_id)
            collected.extend(found)
        return collected

    def resolve_controls(self, controls_manager):
        """Merge all controls referenced by the selections into this profile."""
        for policy_id, controls_ids in self.controls_by_policy.items():
            referenced_controls = self._process_controls_ids_into_controls(
                controls_manager, policy_id, controls_ids)

            for control in referenced_controls:
                self.update_with(control)
711
712
713 2
class Value(XCCDFEntity):
    """Represents XCCDF Value
    """
    KEYS = dict(
        title=lambda: "",
        description=lambda: "",
        type=lambda: "",
        operator=lambda: "equals",
        interactive=lambda: False,
        options=lambda: dict(),
        warnings=lambda: list(),
        ** XCCDFEntity.KEYS
    )

    MANDATORY_KEYS = {
        "title",
        "description",
        "type",
    }

    @classmethod
    def process_input_dict(cls, input_contents, env_yaml, product_cpes=None):
        """
        Process the value definition, normalizing ``interactive`` to a bool.

        ``interactive`` is True only if its string form equals "true"
        regardless of the letter case; it is False if it is not supplied.

        Raises ValueError if the operator is not one of the supported ones.
        """
        # The str() call allows real booleans besides the "true"/"false"
        # strings; calling lower() on a bool would raise AttributeError.
        input_contents["interactive"] = (
            str(input_contents.get("interactive", "false")).lower() == "true")

        data = super(Value, cls).process_input_dict(input_contents, env_yaml)

        possible_operators = ["equals", "not equal", "greater than",
                              "less than", "greater than or equal",
                              "less than or equal", "pattern match"]

        if data["operator"] not in possible_operators:
            raise ValueError(
                "Found an invalid operator value '%s'. "
                "Expected one of: %s"
                % (data["operator"], ", ".join(possible_operators))
            )

        return data

    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None):
        """
        Create the value from ``yaml_file`` and check the form of its warnings.

        ``product_cpes`` is accepted, but it is not used.
        """
        value = super(Value, cls).from_yaml(yaml_file, env_yaml)

        check_warnings(value)

        return value

    def to_xml_element(self):
        """Return the <Value> element that represents this value."""
        value = ET.Element('Value')
        value.set('id', self.id_)
        value.set('type', self.type)
        if self.operator != "equals":  # equals is the default
            value.set('operator', self.operator)
        if self.interactive:  # False is the default
            value.set('interactive', 'true')
        title = ET.SubElement(value, 'title')
        title.text = self.title
        add_sub_element(value, 'description', self.description)
        add_warning_elements(value, self.warnings)

        for selector, option in self.options.items():
            # do not confuse Value with big V with value with small v
            # value is child element of Value
            value_small = ET.SubElement(value, 'value')
            # by XCCDF spec, default value is value without selector
            if selector != "default":
                value_small.set('selector', str(selector))
            value_small.text = str(option)

        return value

    def to_file(self, file_name):
        """Write the XML representation of the value to the file ``file_name``."""
        root = self.to_xml_element()
        tree = ET.ElementTree(root)
        tree.write(file_name)
789
790
791 2
class Benchmark(XCCDFEntity):
    """Represents XCCDF Benchmark
    """
    # Attribute name -> zero-argument factory returning that attribute's
    # default. The entries of XCCDFEntity.KEYS are merged in as well.
    # NOTE(review): presumably consumed by the XCCDFEntity loading code,
    # which is outside this section -- confirm there.
    KEYS = dict(
        title=lambda: "",
        status=lambda: "",
        description=lambda: "",
        notice_id=lambda: "",
        notice_description=lambda: "",
        front_matter=lambda: "",
        rear_matter=lambda: "",
        cpes=lambda: list(),
        version=lambda: "",
        profiles=lambda: list(),
        values=lambda: dict(),
        groups=lambda: dict(),
        rules=lambda: dict(),
        platforms=lambda: dict(),
        product_cpe_names=lambda: list(),
        ** XCCDFEntity.KEYS
    )

    # Names of the keys declared mandatory for a benchmark.
    MANDATORY_KEYS = {
        "title",
        "status",
        "description",
        "front_matter",
        "rear_matter",
    }

    # File name associated with benchmark definitions.
    GENERIC_FILENAME = "benchmark.yml"
822
823 2
    def load_entities(self, rules_by_id, values_by_id, groups_by_id):
824
        for rid, val in self.rules.items():
825
            if not val:
826
                self.rules[rid] = rules_by_id[rid]
827
828
        for vid, val in self.values.items():
829
            if not val:
830
                self.values[vid] = values_by_id[vid]
831
832
        for gid, val in self.groups.items():
833
            if not val:
834
                self.groups[gid] = groups_by_id[gid]
835
836 2
    @classmethod
    def process_input_dict(cls, input_contents, env_yaml, product_cpes):
        """Normalize raw benchmark input and return the processed data dict.

        The dashed ``front-matter``/``rear-matter`` keys are renamed to their
        underscore form before the parent class processes the input. The
        ``notice`` mapping is then removed from ``input_contents`` and its
        ``id`` and ``description`` are stored as ``notice_id`` and
        ``notice_description``.
        """
        renames = (("front-matter", "front_matter"),
                   ("rear-matter", "rear_matter"))
        for dashed, underscored in renames:
            input_contents[underscored] = input_contents.pop(dashed)

        data = super(Benchmark, cls).process_input_dict(input_contents, env_yaml, product_cpes)

        notice = required_key(input_contents, "notice")
        del input_contents["notice"]

        for field in ("id", "description"):
            data["notice_" + field] = required_key(notice, field)
            del notice[field]

        return data
855
856 2
    def represent_as_dict(self):
        """Return the parent's dictionary form with dashed matter keys.

        The ``rear_matter`` and ``front_matter`` entries are renamed to
        ``rear-matter`` and ``front-matter``, the spelling that
        ``process_input_dict`` expects on input.
        """
        # This is an instance method, so super() must be bound to ``self``;
        # the previous ``cls`` was an undefined name and raised NameError.
        data = super(Benchmark, self).represent_as_dict()
        data["rear-matter"] = data["rear_matter"]
        del data["rear_matter"]

        data["front-matter"] = data["front_matter"]
        del data["front_matter"]
        return data
864
865 2
    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None):
        """Load a Benchmark from ``yaml_file`` and set its id and version.

        Without ``env_yaml`` the placeholders "product-name" and "0.0" are
        used. With it, the id and version come from ``env_yaml`` and the CPE
        data is taken from ``product_cpes``.
        """
        loaded = super(Benchmark, cls).from_yaml(yaml_file, env_yaml)
        if not env_yaml:
            loaded.id_ = "product-name"
            loaded.version = "0.0"
            return loaded

        loaded.product_cpe_names = product_cpes.get_product_cpe_names()
        loaded.product_cpes = product_cpes
        loaded.id_ = env_yaml["benchmark_id"]
        loaded.version = env_yaml["ssg_version_str"]
        return loaded
878
879 2
    def add_profiles_from_dir(self, dir_, env_yaml, product_cpes):
880
        for dir_item in sorted(os.listdir(dir_)):
881
            dir_item_path = os.path.join(dir_, dir_item)
882
            if not os.path.isfile(dir_item_path):
883
                continue
884
885
            _, ext = os.path.splitext(os.path.basename(dir_item_path))
886
            if ext != '.profile':
887
                sys.stderr.write(
888
                    "Encountered file '%s' while looking for profiles, "
889
                    "extension '%s' is unknown. Skipping..\n"
890
                    % (dir_item, ext)
891
                )
892
                continue
893
894
            try:
895
                new_profile = ProfileWithInlinePolicies.from_yaml(dir_item_path, env_yaml, product_cpes)
896
            except DocumentationNotComplete:
897
                continue
898
            except Exception as exc:
899
                msg = ("Error building profile from '{fname}': '{error}'"
900
                       .format(fname=dir_item_path, error=str(exc)))
901
                raise RuntimeError(msg)
902
            if new_profile is None:
903
                continue
904
905
            self.profiles.append(new_profile)
906
907 2
    def to_xml_element(self, env_yaml=None, product_cpes=None):
        """Build and return the ``Benchmark`` XML element for this benchmark.

        Children are appended in a fixed order: status, title, description,
        notice, front/rear matter, the optional CPE platform specification,
        applicable platforms, version, metadata, profiles, values, groups
        and finally rules.

        ``env_yaml`` is passed on to every group's and rule's
        ``to_xml_element``. ``product_cpes`` is not read here; the platform
        specification is taken from ``self.product_cpes``.
        """
        root = ET.Element('Benchmark')
        root.set('id', self.id_)
        root.set('xmlns', "http://checklists.nist.gov/xccdf/1.1")
        root.set('xmlns:xsi', 'http://www.w3.org/2001/XMLSchema-instance')
        root.set('xsi:schemaLocation',
                 'http://checklists.nist.gov/xccdf/1.1 xccdf-1.1.4.xsd')
        root.set('style', 'SCAP_1.1')
        root.set('resolved', 'false')
        root.set('xml:lang', 'en-US')
        status = ET.SubElement(root, 'status')
        # The status date is always the day the element is generated.
        status.set('date', datetime.date.today().strftime("%Y-%m-%d"))
        status.text = self.status
        add_sub_element(root, "title", self.title)
        add_sub_element(root, "description", self.description)
        notice = add_sub_element(root, "notice", self.notice_description)
        notice.set('id', self.notice_id)
        add_sub_element(root, "front-matter", self.front_matter)
        add_sub_element(root, "rear-matter", self.rear_matter)
        # if there are no platforms, do not output platform-specification at all
        if len(self.product_cpes.platforms) > 0:
            cpe_platform_spec = ET.Element(
                "{%s}platform-specification" % PREFIX_TO_NS["cpe-lang"])
            for platform in self.product_cpes.platforms.values():
                cpe_platform_spec.append(platform.to_xml_element())
            root.append(cpe_platform_spec)

        # The Benchmark applicability is determined by the CPEs
        # defined in the product.yml
        for cpe_name in self.product_cpe_names:
            plat = ET.SubElement(root, "platform")
            plat.set("idref", cpe_name)

        version = ET.SubElement(root, 'version')
        version.text = self.version
        version.set('update', SSG_BENCHMARK_LATEST_URI)

        # Contributors.xml is looked up one directory above this module.
        contributors_file = os.path.join(os.path.dirname(__file__), "../Contributors.xml")
        add_benchmark_metadata(root, contributors_file)

        for profile in self.profiles:
            root.append(profile.to_xml_element())

        for value in self.values.values():
            root.append(value.to_xml_element())

        groups_in_bench = list(self.groups.keys())
        priority_order = ["system", "services"]
        groups_in_bench = reorder_according_to_ordering(groups_in_bench, priority_order)

        # Make system group the first, followed by services group
        for group_id in groups_in_bench:
            group = self.groups.get(group_id)
            # Products using application benchmark don't have system or services group
            if group is not None:
                root.append(group.to_xml_element(env_yaml))

        for rule in self.rules.values():
            root.append(rule.to_xml_element(env_yaml))

        return root
968
969 2
    def to_file(self, file_name, env_yaml=None):
        """Write the benchmark XML built with ``env_yaml`` to ``file_name``."""
        benchmark_element = self.to_xml_element(env_yaml)
        ET.ElementTree(benchmark_element).write(file_name)
973
974 2
    def add_value(self, value):
975
        if value is None:
976
            return
977
        self.values[value.id_] = value
978
979
    # The benchmark is also considered a group, so this function signature needs to match
980
    # Group()'s add_group()
981 2
    def add_group(self, group, env_yaml=None, product_cpes=None):
982
        if group is None:
983
            return
984
        self.groups[group.id_] = group
985
986 2
    def add_rule(self, rule):
987
        if rule is None:
988
            return
989
        self.rules[rule.id_] = rule
990
991 2
    def to_xccdf(self):
        """Placeholder for generating valid XCCDF; not implemented.

        We can easily extend this script to generate a valid XCCDF instead
        of SSG SHORTHAND.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError
996
997 2
    def __str__(self):
        """Return the benchmark ID as its string form."""
        return self.id_
999
1000
1001 2
class Group(XCCDFEntity):
    """Represents XCCDF Group
    """
    # Attributes that _pass_our_properties_on_to() copies to a child object
    # when the child has the attribute and its value is None.
    ATTRIBUTES_TO_PASS_ON = (
        "platforms",
        "cpe_platform_names",
    )

    # File name associated with group definitions.
    GENERIC_FILENAME = "group.yml"

    # Attribute name -> zero-argument factory returning that attribute's
    # default. The entries of XCCDFEntity.KEYS are merged in as well.
    KEYS = dict(
        prodtype=lambda: "all",
        title=lambda: "",
        description=lambda: "",
        warnings=lambda: list(),
        requires=lambda: list(),
        conflicts=lambda: list(),
        values=lambda: dict(),
        groups=lambda: dict(),
        rules=lambda: dict(),
        platform=lambda: "",
        platforms=lambda: set(),
        cpe_platform_names=lambda: set(),
        ** XCCDFEntity.KEYS
    )

    # NOTE(review): "status", "front_matter" and "rear_matter" have no
    # matching entry in the KEYS defined just above; this set is identical to
    # Benchmark.MANDATORY_KEYS -- confirm it is intended for groups.
    MANDATORY_KEYS = {
        "title",
        "status",
        "description",
        "front_matter",
        "rear_matter",
    }
1034
1035 2
    @classmethod
    def process_input_dict(cls, input_contents, env_yaml, product_cpes=None):
        """Process raw group input and return the resulting data dict.

        Non-empty ``rules``, ``groups`` and ``values`` collections are turned
        into ``{id: None}`` placeholder dicts. A single ``platform`` is added
        to ``platforms``, and when no ``cpe_platform_names`` are present yet
        each platform is parsed and registered to obtain its CPE platform ID.
        """
        data = super(Group, cls).process_input_dict(input_contents, env_yaml, product_cpes)

        for member_kind in ("rules", "groups", "values"):
            member_ids = data[member_kind]
            if member_ids:
                data[member_kind] = dict.fromkeys(member_ids)

        single_platform = data["platform"]
        if single_platform:
            data["platforms"].add(single_platform)

        # parse platform definition and get CPEAL platform if cpe_platform_names not already defined
        if data["platforms"] and not data["cpe_platform_names"]:
            for platform_text in data["platforms"]:
                parsed = Platform.from_text(platform_text, product_cpes)
                registered = add_platform_if_not_defined(parsed, product_cpes)
                data["cpe_platform_names"].add(registered.id_)
        return data
1060
1061 2
    def load_entities(self, rules_by_id, values_by_id, groups_by_id):
1062
        for rid, val in self.rules.items():
1063
            if not val:
1064
                self.rules[rid] = rules_by_id[rid]
1065
1066
        for vid, val in self.values.items():
1067
            if not val:
1068
                self.values[vid] = values_by_id[vid]
1069
1070
        for gid in list(self.groups):
1071
            val = self.groups.get(gid, None)
1072
            if not val:
1073
                try:
1074
                    self.groups[gid] = groups_by_id[gid]
1075
                except KeyError:
1076
                    # Add only the groups we have compiled and loaded
1077
                    del self.groups[gid]
1078
                    pass
1079
1080 2
    def represent_as_dict(self):
        """Return the parent's dictionary form with members listed by ID.

        Non-empty ``rules``, ``groups`` and ``values`` are each replaced by a
        sorted list of their IDs.
        """
        yaml_contents = super(Group, self).represent_as_dict()

        for member_kind in ("rules", "groups", "values"):
            members = getattr(self, member_kind)
            if members:
                yaml_contents[member_kind] = sorted(members.keys())

        return yaml_contents
1091
1092 2
    def validate_prodtype(self, yaml_file):
1093
        for ptype in self.prodtype.split(","):
1094
            if ptype.strip() != ptype:
1095
                msg = (
1096
                    "Comma-separated '{prodtype}' prodtype "
1097
                    "in {yaml_file} contains whitespace."
1098
                    .format(prodtype=self.prodtype, yaml_file=yaml_file))
1099
                raise ValueError(msg)
1100
1101 2
    def to_xml_element(self, env_yaml=None):
        """Build and return the XCCDF ``Group`` element for this group.

        Children are appended in this order: title, description, warnings,
        platforms, requires/conflicts, values, rules (reordered by priority)
        and finally subgroups (reordered by priority). ``env_yaml`` is passed
        on to every rule's and subgroup's ``to_xml_element``.
        """
        group = ET.Element('Group')
        group.set('id', self.id_)
        title = ET.SubElement(group, 'title')
        title.text = self.title
        add_sub_element(group, 'description', self.description)
        add_warning_elements(group, self.warnings)

        # This is where references should be put if there are any
        # This is where rationale should be put if there are any

        # cpe_platform_names is a set; sort it so the generated XML is
        # reproducible, matching what Rule.to_xml_element does.
        for cpe_platform_name in sorted(self.cpe_platform_names):
            platform_el = ET.SubElement(group, "platform")
            platform_el.set("idref", "#"+cpe_platform_name)

        add_nondata_subelements(group, "requires", "idref", self.requires)
        add_nondata_subelements(group, "conflicts", "idref", self.conflicts)

        for _value in self.values.values():
            group.append(_value.to_xml_element())

        # Rules that install or remove packages affect remediation
        # of other rules.
        # When packages installed/removed rules come first:
        # The Rules are ordered in more logical way, and
        # remediation order is natural, first the package is installed, then configured.
        rules_in_group = list(self.rules.keys())
        regex = (r'(package_.*_(installed|removed))|' +
                 r'(service_.*_(enabled|disabled))|' +
                 r'install_smartcard_packages$')
        priority_order = ["installed", "install_smartcard_packages", "removed",
                          "enabled", "disabled"]
        rules_in_group = reorder_according_to_ordering(rules_in_group, priority_order, regex)

        # Add rules in priority order, first all packages installed, then removed,
        # followed by services enabled, then disabled
        for rule_id in rules_in_group:
            group.append(self.rules.get(rule_id).to_xml_element(env_yaml))

        # Add the sub groups after any current level group rules.
        # As package installed/removed and service enabled/disabled rules are usually in
        # top level group, this ensures groups that further configure a package or service
        # are after rules that install or remove it.
        groups_in_group = list(self.groups.keys())
        priority_order = [
            # Make sure rpm_verify_(hashes|permissions|ownership) are run before any other rule.
            # Due to conflicts between rules rpm_verify_* rules and any rule that configures
            # stricter settings, like file_permissions_grub2_cfg and sudo_dedicated_group,
            # the rules deviating from the system default should be evaluated later.
            # So that in the end the system has contents, permissions and ownership reset, and
            # any deviations or stricter settings are applied by the rules in the profile.
            "software", "integrity", "integrity-software", "rpm_verification",

            # The account group has to precede audit group because
            # the rule package_screen_installed is desired to be executed before the rule
            # audit_rules_privileged_commands, otherwise the rule
            # does not catch newly installed screen binary during remediation
            # and report fail
            "accounts", "auditing",


            # The FIPS group should come before Crypto,
            # if we want to set a different (stricter) Crypto Policy than FIPS.
            "fips", "crypto",

            # The firewalld_activation must come before ruleset_modifications, otherwise
            # remediations for ruleset_modifications won't work
            "firewalld_activation", "ruleset_modifications",

            # Rules from group disabling_ipv6 must precede rules from configuring_ipv6,
            # otherwise the remediation prints error although it is successful
            "disabling_ipv6", "configuring_ipv6"
        ]
        groups_in_group = reorder_according_to_ordering(groups_in_group, priority_order)
        for group_id in groups_in_group:
            _group = self.groups[group_id]
            group.append(_group.to_xml_element(env_yaml))

        return group
1180
1181 2
    def to_file(self, file_name):
        """Serialize the element from ``to_xml_element`` into ``file_name``."""
        ET.ElementTree(self.to_xml_element()).write(file_name)
1185
1186 2
    def add_value(self, value):
1187
        if value is None:
1188
            return
1189
        self.values[value.id_] = value
1190
1191 2
    def add_group(self, group, env_yaml=None, product_cpes=None):
1192
        if group is None:
1193
            return
1194
        if self.platforms and not group.platforms:
1195
            group.platforms = self.platforms
1196
        self.groups[group.id_] = group
1197
        self._pass_our_properties_on_to(group)
1198
1199
        # Once the group has inherited properties, update cpe_names
1200
        if env_yaml:
1201
            for platform in group.platforms:
1202
                cpe_platform = Platform.from_text(platform, product_cpes)
1203
                cpe_platform = add_platform_if_not_defined(cpe_platform, product_cpes)
1204
                group.cpe_platform_names.add(cpe_platform.id_)
1205
1206 2
    def _pass_our_properties_on_to(self, obj):
1207
        for attr in self.ATTRIBUTES_TO_PASS_ON:
1208
            if hasattr(obj, attr) and getattr(obj, attr) is None:
1209
                setattr(obj, attr, getattr(self, attr))
1210
1211 2
    def add_rule(self, rule, env_yaml=None, product_cpes=None):
1212
        if rule is None:
1213
            return
1214
        if self.platforms and not rule.platforms:
1215
            rule.platforms = self.platforms
1216
        self.rules[rule.id_] = rule
1217
        self._pass_our_properties_on_to(rule)
1218
1219
        # Once the rule has inherited properties, update cpe_platform_names
1220
        if env_yaml:
1221
            for platform in rule.platforms:
1222
                cpe_platform = Platform.from_text(platform, product_cpes)
1223
                cpe_platform = add_platform_if_not_defined(cpe_platform, product_cpes)
1224
                rule.cpe_platform_names.add(cpe_platform.id_)
1225
1226 2
    def __str__(self):
        """Return the group ID as its string form."""
        return self.id_
1228
1229
1230 2
def noop_rule_filterfunc(rule):
    """Rule filter that accepts every rule: always returns True."""
    return True
1232
1233 2
def rule_filter_from_def(filterdef):
    """Turn a filter expression string into a predicate over rules.

    ``None`` or an empty string yields ``noop_rule_filterfunc``. Otherwise the
    returned function evaluates ``filterdef`` as a Python expression with the
    rule's attributes as local variables and returns the result.
    """
    has_definition = not (filterdef is None or filterdef == "")
    if not has_definition:
        return noop_rule_filterfunc

    def filterfunc(rule):
        # Remove globals for security and only expose
        # variables relevant to the rule
        # NOTE(review): eval() with builtins removed is not a real sandbox;
        # filterdef must only ever come from trusted content.
        restricted_globals = {"__builtins__": None}
        visible_names = rule.__dict__
        return eval(filterdef, restricted_globals, visible_names)

    return filterfunc
1242
1243
1244 2
class Rule(XCCDFEntity):
    """Represents XCCDF Rule
    """
    # Attribute name -> zero-argument factory returning that attribute's
    # default. The entries of XCCDFEntity.KEYS are merged in as well.
    KEYS = dict(
        prodtype=lambda: "all",
        title=lambda: "",
        description=lambda: "",
        rationale=lambda: "",
        severity=lambda: "",
        references=lambda: dict(),
        identifiers=lambda: dict(),
        ocil_clause=lambda: None,
        ocil=lambda: None,
        oval_external_content=lambda: None,
        fix=lambda: "",
        warnings=lambda: list(),
        conflicts=lambda: list(),
        requires=lambda: list(),
        platform=lambda: None,
        platforms=lambda: set(),
        sce_metadata=lambda: dict(),
        inherited_platforms=lambda: list(),
        template=lambda: None,
        cpe_platform_names=lambda: set(),
        inherited_cpe_platform_names=lambda: list(),
        bash_conditional=lambda: None,
        ** XCCDFEntity.KEYS
    )

    # Names of the keys declared mandatory for a rule.
    MANDATORY_KEYS = {
        "title",
        "description",
        "rationale",
        "severity",
    }

    # File name associated with rule definitions.
    GENERIC_FILENAME = "rule.yml"
    ID_LABEL = "rule_id"

    # Reference types collected by _get_product_only_references(), either
    # bare or with an "@<product>" qualifier.
    PRODUCT_REFERENCES = ("stigid", "cis",)
    # Reference types that _make_items_product_specific() refuses to accept
    # with an "@<product>" qualifier.
    GLOBAL_REFERENCES = ("srg", "vmmsrg", "disa", "cis-csc",)
1285
1286 2
    def __init__(self, id_):
        """Initialize the rule with ``id_`` and no SCE metadata."""
        super(Rule, self).__init__(id_)
        # Filled in by from_yaml() when SCE metadata exists for this rule.
        self.sce_metadata = None
1289
1290 2
    def __deepcopy__(self, memo):
1291
        cls = self.__class__
1292
        result = cls.__new__(cls)
1293
        memo[id(self)] = result
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable id does not seem to be defined.
Loading history...
1294
        for k, v in self.__dict__.items():
1295
            # These are difficult to deep copy, so let's just re-use them.
1296
            if k != "template" and k != "local_env_yaml":
1297
                setattr(result, k, deepcopy(v, memo))
1298
            else:
1299
                setattr(result, k, v)
1300
        return result
1301
1302 2
    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None, sce_metadata=None):
        """Load a Rule from ``yaml_file``, resolve its platforms and validate it.

        After loading, the platforms are converted to a set, the single
        ``platform`` is merged into it, CPE platform names are resolved when
        applicable, SCE metadata is attached if present for this rule, and
        the prodtype, identifiers and references are validated.
        """
        rule = super(Rule, cls).from_yaml(yaml_file, env_yaml, product_cpes)

        # platforms are read as list from the yaml file
        # we need them to convert to set again
        rule.platforms = set(rule.platforms)

        check_warnings(rule)

        # ensure that content of rule.platform is in rule.platforms as
        # well
        if rule.platform is not None:
            rule.platforms.add(rule.platform)

        # Convert the platform names to CPE names
        # But only do it if an env_yaml was specified (otherwise there would be no product CPEs
        # to lookup), and the rule's prodtype matches the product being built
        # also if the rule already has cpe_platform_names specified (compiled rule)
        # do not evaluate platforms again
        applies_to_product = env_yaml and (
            env_yaml["product"] in parse_prodtype(rule.prodtype)
            or rule.prodtype == "all")
        if applies_to_product and product_cpes and not rule.cpe_platform_names:
            # parse platform definition and get CPEAL platform
            for platform_text in rule.platforms:
                parsed = Platform.from_text(platform_text, product_cpes)
                registered = add_platform_if_not_defined(parsed, product_cpes)
                rule.cpe_platform_names.add(registered.id_)

        if sce_metadata and rule.id_ in sce_metadata:
            metadata = sce_metadata[rule.id_]
            rule.sce_metadata = metadata
            metadata["relative_path"] = os.path.join(
                env_yaml["product"], "checks/sce", metadata['filename'])

        rule.validate_prodtype(yaml_file)
        rule.validate_identifiers(yaml_file)
        rule.validate_references(yaml_file)
        return rule
1343
1344 2
    def _verify_stigid_format(self, product):
1345 2
        stig_id = self.references.get("stigid", None)
1346 2
        if not stig_id:
1347 2
            return
1348 2
        if "," in stig_id:
1349 2
            raise ValueError("Rules can not have multiple STIG IDs.")
1350
1351 2
    def _verify_disa_cci_format(self):
1352 2
        cci_id = self.references.get("disa", None)
1353 2
        if not cci_id:
1354 2
            return
1355
        cci_ex = re.compile(r'^CCI-[0-9]{6}$')
1356
        for cci in cci_id.split(","):
1357
            if not cci_ex.match(cci):
1358
                raise ValueError("CCI '{}' is in the wrong format! "
1359
                                 "Format should be similar to: "
1360
                                 "CCI-XXXXXX".format(cci))
1361
        self.references["disa"] = cci_id
1362
1363 2
    def normalize(self, product):
1364 2
        try:
1365 2
            self.make_refs_and_identifiers_product_specific(product)
1366 2
            self.make_template_product_specific(product)
1367 2
        except Exception as exc:
1368 2
            msg = (
1369
                "Error normalizing '{rule}': {msg}"
1370
                .format(rule=self.id_, msg=str(exc))
1371
            )
1372 2
            raise RuntimeError(msg)
1373
1374 2
    def _get_product_only_references(self):
1375 2
        product_references = dict()
1376
1377 2
        for ref in Rule.PRODUCT_REFERENCES:
1378 2
            start = "{0}@".format(ref)
1379 2
            for gref, gval in self.references.items():
1380 2
                if ref == gref or gref.startswith(start):
1381 2
                    product_references[gref] = gval
1382 2
        return product_references
1383
1384 2
    def make_template_product_specific(self, product):
1385 2
        product_suffix = "@{0}".format(product)
1386
1387 2
        if not self.template:
1388
            return
1389
1390 2
        not_specific_vars = self.template.get("vars", dict())
1391 2
        specific_vars = self._make_items_product_specific(
1392
            not_specific_vars, product_suffix, True)
1393 2
        self.template["vars"] = specific_vars
1394
1395 2
        not_specific_backends = self.template.get("backends", dict())
1396 2
        specific_backends = self._make_items_product_specific(
1397
            not_specific_backends, product_suffix, True)
1398 2
        self.template["backends"] = specific_backends
1399
1400 2
    def make_refs_and_identifiers_product_specific(self, product):
        """Resolve product-qualified identifiers and references for ``product``.

        Identifiers, general references and product references are each
        passed through ``_make_items_product_specific`` and updated in place.
        ``self.references`` is then rebuilt from the general references plus
        the product references, with the ``disa`` and ``stigid`` formats
        checked along the way.

        Raises:
            ValueError: if any step rejects an item; the message names the
                affected collection and this rule's ID.
        """
        product_suffix = "@{0}".format(product)

        product_references = self._get_product_only_references()
        general_references = self.references.copy()
        for todel in product_references:
            general_references.pop(todel)
        # NOTE(review): _get_product_only_references() also collects the
        # unqualified "stigid"/"cis" keys, which were just popped above, so
        # this check looks unreachable -- confirm the intent.
        for ref in Rule.PRODUCT_REFERENCES:
            if ref in general_references:
                msg = "Unexpected reference identifier ({0}) without "
                msg += "product qualifier ({0}@{1}) while building rule "
                msg += "{2}"
                msg = msg.format(ref, product, self.id_)
                raise ValueError(msg)

        # Collection name -> (dict to rewrite in place, allow_overwrites).
        to_set = dict(
            identifiers=(self.identifiers, False),
            general_references=(general_references, True),
            product_references=(product_references, False),
        )
        for name, (dic, allow_overwrites) in to_set.items():
            try:
                new_items = self._make_items_product_specific(
                    dic, product_suffix, allow_overwrites)
            except ValueError as exc:
                msg = (
                    "Error processing {what} for rule '{rid}': {msg}"
                    .format(what=name, rid=self.id_, msg=str(exc))
                )
                raise ValueError(msg)
            # Mutate the same dict object so self.identifiers is updated too.
            dic.clear()
            dic.update(new_items)

        self.references = general_references
        # The CCI check runs before the product references are merged back.
        self._verify_disa_cci_format()
        self.references.update(product_references)

        self._verify_stigid_format(product)
1438
1439 2
    def _make_items_product_specific(self, items_dict, product_suffix, allow_overwrites=False):
        """Return a new dict with product qualifiers resolved.

        Unqualified items are copied as they are. An item qualified with
        ``product_suffix`` is stored under its unqualified label; items
        qualified for a different product are dropped. ``items_dict`` itself
        is not modified.

        Raises:
            ValueError: if a qualified label belongs to GLOBAL_REFERENCES, or
                if ``allow_overwrites`` is false and a qualified item differs
                in value from the unqualified item of the same label.
        """
        new_items = dict()
        for full_label, value in items_dict.items():
            # NOTE(review): an unqualified label that is already in new_items
            # (its qualified variant was seen first) is not copied here and
            # falls through to the checks below -- confirm this is intended.
            if "@" not in full_label and full_label not in new_items:
                new_items[full_label] = value
                continue

            label = full_label.split("@")[0]

            # this test should occur before matching product_suffix with the product qualifier
            # present in the reference, so it catches problems even for products that are not
            # being built at the moment
            if label in Rule.GLOBAL_REFERENCES:
                msg = (
                    "You cannot use product-qualified for the '{item_u}' reference. "
                    "Please remove the product-qualifier and merge values with the "
                    "existing reference if there is any. Original line: {item_q}: {value_q}"
                    .format(item_u=label, item_q=full_label, value_q=value)
                )
                raise ValueError(msg)

            # Items qualified for another product are skipped.
            if not full_label.endswith(product_suffix):
                continue

            if label in items_dict and not allow_overwrites and value != items_dict[label]:
                msg = (
                    "There is a product-qualified '{item_q}' item, "
                    "but also an unqualified '{item_u}' item "
                    "and those two differ in value - "
                    "'{value_q}' vs '{value_u}' respectively."
                    .format(item_q=full_label, item_u=label,
                            value_q=value, value_u=items_dict[label])
                )
                raise ValueError(msg)
            new_items[label] = value
        return new_items
1475
1476 2
    def validate_identifiers(self, yaml_file):
1477 2
        if self.identifiers is None:
1478
            raise ValueError("Empty identifier section in file %s" % yaml_file)
1479
1480
        # Validate all identifiers are non-empty:
1481 2
        for ident_type, ident_val in self.identifiers.items():
1482 2
            if not isinstance(ident_type, str) or not isinstance(ident_val, str):
1483
                raise ValueError("Identifiers and values must be strings: %s in file %s"
1484
                                 % (ident_type, yaml_file))
1485 2
            if ident_val.strip() == "":
1486
                raise ValueError("Identifiers must not be empty: %s in file %s"
1487
                                 % (ident_type, yaml_file))
1488 2
            if ident_type[0:3] == 'cce':
1489 2
                if not is_cce_format_valid(ident_val):
1490
                    raise ValueError("CCE Identifier format must be valid: invalid format '%s' for CEE '%s'"
1491
                                     " in file '%s'" % (ident_val, ident_type, yaml_file))
1492 2
                if not is_cce_value_valid("CCE-" + ident_val):
1493
                    raise ValueError("CCE Identifier value is not a valid checksum: invalid value '%s' for CEE '%s'"
1494
                                     " in file '%s'" % (ident_val, ident_type, yaml_file))
1495
1496 2
    def validate_references(self, yaml_file):
        """
        Check that the references of this rule are well-formed.

        Every reference name and value has to be a string, no value may be
        blank, and no item of a comma-separated value may carry leading or
        trailing whitespace.

        yaml_file is only used to identify the offending file in messages.
        Raises ValueError on the first violation that is found.
        """
        references = self.references
        if references is None:
            raise ValueError("Empty references section in file %s" % yaml_file)

        # First pass: all entries are type-checked before any value is split,
        # so the second pass only ever calls split() on strings.
        for name, value in references.items():
            if not (isinstance(name, str) and isinstance(value, str)):
                raise ValueError("References and values must be strings: %s in file %s"
                                 % (name, yaml_file))
            if not value.strip():
                raise ValueError("References must not be empty: %s in file %s"
                                 % (name, yaml_file))

        # Second pass: "a,b" is accepted, "a, b" is rejected.
        for name, value in references.items():
            if any(item != item.strip() for item in value.split(",")):
                raise ValueError(
                    "Comma-separated '{ref_type}' reference "
                    "in {yaml_file} contains whitespace."
                    .format(ref_type=name, yaml_file=yaml_file))
1516
1517 2
    def validate_prodtype(self, yaml_file):
        """
        Check that the comma-separated prodtype contains no stray whitespace.

        yaml_file is only used to identify the offending file in the message.
        Raises ValueError if any item has leading or trailing whitespace.
        """
        declared = self.prodtype
        for product in declared.split(","):
            if product == product.strip():
                continue
            raise ValueError(
                "Comma-separated '{prodtype}' prodtype "
                "in {yaml_file} contains whitespace."
                .format(prodtype=declared, yaml_file=yaml_file))
1525
1526 2
    def to_xml_element(self, env_yaml=None):
        """
        Serialize the rule into an XCCDF 'Rule' element and return it.

        Children are added in this order: title, description, warnings,
        references, rationale, platforms, requires, conflicts, idents and
        finally the checks (SCE when the rule has SCE metadata, then OVAL,
        then OCIL when the rule has "ocil" or "ocil_clause").

        Reference URIs come from env_yaml['reference_uris'] when a truthy
        env_yaml is passed, otherwise from SSG_REF_URIS.

        Side effect: string-valued 'check-import' / 'check-export' entries of
        self.sce_metadata are replaced in place by one-item lists.
        """
        rule_el = ET.Element('Rule')
        rule_el.set('selected', 'false')
        rule_el.set('id', self.id_)
        rule_el.set('severity', self.severity)
        add_sub_element(rule_el, 'title', self.title)
        add_sub_element(rule_el, 'description', self.description)
        add_warning_elements(rule_el, self.warnings)

        ref_uri_dict = env_yaml['reference_uris'] if env_yaml else SSG_REF_URIS
        add_reference_elements(rule_el, self.references, ref_uri_dict)

        add_sub_element(rule_el, 'rationale', self.rationale)

        for platform_name in sorted(self.cpe_platform_names):
            ET.SubElement(rule_el, "platform").set("idref", "#" + platform_name)

        add_nondata_subelements(rule_el, "requires", "idref", self.requires)
        add_nondata_subelements(rule_el, "conflicts", "idref", self.conflicts)

        # An <ident> element is created for every identifier, but only the
        # 'cce' one receives a system attribute and a text.
        for ident_type, ident_val in self.identifiers.items():
            ident_el = ET.SubElement(rule_el, 'ident')
            if ident_type != 'cce':
                continue
            ident_el.set('system', cce_uri)
            ident_el.text = ident_val

        ocil_parent = rule_el
        check_parent = rule_el

        sce = self.sce_metadata
        if sce:
            # TODO: This is pretty much another hack, just like the OVAL one
            # further below. However, we avoided the external SCE content as
            # I'm not sure it is generally useful (unlike say, CVE checking
            # with external OVAL).
            #
            # Additionally, we build the content (check subelement) here rather
            # than in xslt due to the nature of our SCE metadata.
            #
            # Finally, before we begin, we might have an element with both SCE
            # and OVAL. We have no way of knowing (right here) whether that is
            # the case (due to a variety of issues, most notably, that linking
            # hasn't yet occurred). So we must rely on the content author's
            # good will, by annotating SCE content with a complex-check tag
            # if necessary.
            if 'complex-check' in sce:
                # Here we have an issue: XCCDF allows EITHER one or more check
                # elements OR a single complex-check. While we have an explicit
                # case handling the OVAL-and-SCE interaction, OCIL entries have
                # (historically) been alongside OVAL content and been in an
                # "OR" manner -- preferring OVAL to SCE. In order to accomplish
                # this, we thus need to add _yet another parent_ when OCIL data
                # is present, and update ocil_parent accordingly.
                if self.ocil or self.ocil_clause:
                    ocil_parent = ET.SubElement(ocil_parent, "complex-check")
                    ocil_parent.set('operator', 'OR')

                check_parent = ET.SubElement(ocil_parent, "complex-check")
                check_parent.set('operator', sce['complex-check'])

            # Now, add the SCE check element to the tree.
            sce_check = ET.SubElement(check_parent, "check")
            sce_check.set("system", SCE_SYSTEM)

            if 'check-import' in sce:
                # A single string is accepted as a shorthand for a one-item list.
                if isinstance(sce['check-import'], str):
                    sce['check-import'] = [sce['check-import']]
                for import_name in sce['check-import']:
                    import_el = ET.SubElement(sce_check, 'check-import')
                    import_el.set('import-name', import_name)
                    import_el.text = None

            if 'check-export' in sce:
                if isinstance(sce['check-export'], str):
                    sce['check-export'] = [sce['check-export']]
                for export_spec in sce['check-export']:
                    # Each entry has the form "<export-name>=<value-id>".
                    export_name, value_id = export_spec.split('=')
                    export_el = ET.SubElement(sce_check, 'check-export')
                    export_el.set('value-id', value_id)
                    export_el.set('export-name', export_name)
                    export_el.text = None

            sce_ref = ET.SubElement(sce_check, "check-content-ref")
            sce_ref.set("href", sce['relative_path'])

        oval_check = ET.SubElement(check_parent, 'check')
        oval_check.set("system", oval_namespace)
        oval_ref = ET.SubElement(oval_check, "check-content-ref")
        if self.oval_external_content:
            oval_ref.set("href", self.oval_external_content)
        else:
            # TODO: This is pretty much a hack, oval ID will be the same as rule ID
            #       and we don't want the developers to have to keep them in sync.
            #       Therefore let's just add an OVAL ref of that ID.
            # TODO  Can we not add the check element if the rule doesn't have an OVAL check?
            #       At the moment, the check elements of rules without OVAL are removed by
            #       relabel_ids.py
            oval_ref.set("href", "oval-unlinked.xml")
            oval_ref.set("name", self.id_)

        if self.ocil or self.ocil_clause:
            # NOTE(review): the OCIL check is attached to check_parent, while
            # ocil_parent is only used as the parent of the inner
            # complex-check above -- confirm that this is intended.
            ocil_check = ET.SubElement(check_parent, "check")
            ocil_check.set("system", ocil_cs)
            ocil_ref = ET.SubElement(ocil_check, "check-content-ref")
            ocil_ref.set("href", "ocil-unlinked.xml")
            ocil_ref.set("name", self.id_ + "_ocil")

        return rule_el
1638
1639 2
    def to_file(self, file_name):
        """
        Write the XML form of the rule, as produced by to_xml_element()
        called without arguments, into file_name.
        """
        ET.ElementTree(self.to_xml_element()).write(file_name)
1643
1644 2
    def to_ocil(self):
        """
        Build the OCIL elements that represent this rule.

        Returns a tuple (questionnaire, boolean_question_test_action,
        boolean_question) of XML elements; their IDs are the rule ID with the
        "_ocil", "_action" and "_question" suffixes respectively.

        Raises ValueError when the rule has neither "ocil" nor "ocil_clause".
        """
        if not (self.ocil or self.ocil_clause):
            raise ValueError("Rule {0} doesn't have OCIL".format(self.id_))

        rule_id = self.id_

        # Create <questionnaire> for the rule
        questionnaire = ET.Element("questionnaire", id=rule_id + "_ocil")
        ET.SubElement(questionnaire, "title").text = self.title
        actions = ET.SubElement(questionnaire, "actions")
        ET.SubElement(actions, "test_action_ref").text = rule_id + "_action"

        # Create <boolean_question_test_action> for the rule:
        # answering "true" passes, answering "false" fails.
        action = ET.Element(
            "boolean_question_test_action",
            id=rule_id + "_action",
            question_ref=rule_id + "_question")
        for branch_tag, verdict in (("when_true", "PASS"), ("when_false", "FAIL")):
            branch = ET.SubElement(action, branch_tag)
            ET.SubElement(branch, "result").text = verdict

        # Create <boolean_question>
        boolean_question = ET.Element(
            "boolean_question", id=rule_id + "_question")

        # TODO: The contents of <question_text> element used to be broken in
        # the legacy XSLT implementation. The following code contains hacks
        # to get the same results as in the legacy XSLT implementation.
        # This enabled us a smooth transition to new OCIL generator
        # without a need to mass-edit rule YAML files.
        # We need to solve:
        # TODO: using variables (aka XCCDF Values) in OCIL content
        # TODO: using HTML formatting tags eg. <pre> in OCIL content
        #
        # The "ocil" key in compiled rules contains HTML and XML elements
        # but OCIL question texts shouldn't contain HTML or XML elements,
        # therefore removing them.
        ocil_without_tags = ""
        if self.ocil is not None:
            ocil_without_tags = re.sub(r"</?[^>]+>", "", self.ocil)

        # The "ocil" key in compiled rules contains XML entities which would
        # be escaped by ET.SubElement() so we need to use add_sub_element()
        # instead because we don't want to escape them.
        question_text = add_sub_element(
            boolean_question, "question_text", ocil_without_tags)

        # The "ocil_clause" key in compiled rules also contains HTML and XML
        # elements but unlike the "ocil" we want to escape the '<' and '>'
        # characters.
        # The empty ocil_clause causing broken question is in line with the
        # legacy XSLT implementation.
        clause = self.ocil_clause or ""
        leading_text = question_text.text
        if leading_text is None:
            leading_text = ""
        question_text.text = (
            u"{0}\n      Is it the case that {1}?\n      ".format(
                leading_text, clause))

        return (questionnaire, action, boolean_question)
1700
1701 2
    def __hash__(self):
        """Hash the object by its ID only, consistently with __eq__ of the
        same class, which also compares IDs."""
        identifier = self.id_
        return hash(identifier)
1705
1706 2
    def __eq__(self, other):
        """Two objects are equal when other is an instance of our class
        and both carry the same ID."""
        if not isinstance(other, self.__class__):
            return False
        return self.id_ == other.id_
1708
1709 2
    def __ne__(self, other):
        """
        Return the negation of __eq__.

        Defined explicitly because Python 2 does not derive != from ==.
        The comparison has to go through ==; negating != here would call
        this very method again and recurse without end.
        """
        return not self == other
1711
1712 2
    def __lt__(self, other):
        """Order objects by their IDs, which makes them sortable."""
        own_id, other_id = self.id_, other.id_
        return own_id < other_id
1714
1715 2
    def __str__(self):
        """The string form of the object is its ID."""
        identifier = self.id_
        return identifier
1717
1718
1719 2
class DirectoryLoader(object):
    """
    Base class that walks a guide directory tree and gathers the benchmark,
    group, rule and value definitions found in it.

    Subclasses have to implement _get_new_loader(), _process_values() and
    _process_rules().
    """

    def __init__(self, profiles_dir, env_yaml, product_cpes):
        # Items discovered in the directory that is being processed
        self.benchmark_file = None
        self.group_file = None
        self.loaded_group = None
        self.rule_files = []
        self.value_files = []
        self.subdirectories = []

        # Entities gathered from this directory and from its subdirectories,
        # keyed by entity ID
        self.all_values = {}
        self.all_rules = {}
        self.all_groups = {}

        self.profiles_dir = profiles_dir
        self.env_yaml = env_yaml
        self.product = env_yaml["product"]

        # Group of the parent directory; set by the parent loader when recursing
        self.parent_group = None
        self.product_cpes = product_cpes

    def _collect_items_to_load(self, guide_directory):
        """
        Sort the entries of guide_directory (in alphabetical order) into
        value files, rule files, the benchmark file, the group file and
        subdirectories to recurse into.

        A "tests" entry is ignored; any other unrecognized file is reported
        on stderr and skipped. Raises ValueError when a second benchmark or
        group file is encountered.
        """
        for entry in sorted(os.listdir(guide_directory)):
            entry_path = os.path.join(guide_directory, entry)
            extension = os.path.splitext(entry)[1]

            if extension == '.var':
                self.value_files.append(entry_path)
                continue
            if entry == "benchmark.yml":
                if self.benchmark_file:
                    raise ValueError("Multiple benchmarks in one directory")
                self.benchmark_file = entry_path
                continue
            if entry == "group.yml":
                if self.group_file:
                    raise ValueError("Multiple groups in one directory")
                self.group_file = entry_path
                continue
            if extension == '.rule':
                self.rule_files.append(entry_path)
                continue
            if is_rule_dir(entry_path):
                self.rule_files.append(get_rule_dir_yaml(entry_path))
                continue
            if entry == "tests":
                continue
            if os.path.isdir(entry_path):
                self.subdirectories.append(entry_path)
                continue
            sys.stderr.write(
                "Encountered file '%s' while recursing, extension '%s' "
                "is unknown. Skipping..\n"
                % (entry, extension)
            )

    def load_benchmark_or_group(self, guide_directory):
        """
        Loads a given benchmark or group from the specified benchmark_file or
        group_file, in the context of guide_directory, profiles_dir and env_yaml.

        A group is registered in all_groups only if its prodtype contains
        "all" or the current product, but it is returned in either case.

        Returns the loaded group or benchmark, or None if the directory
        contains neither. Raises ValueError if it contains both.
        """
        if self.group_file and self.benchmark_file:
            raise ValueError("A .benchmark file and a .group file were found in "
                             "the same directory '%s'" % (guide_directory))

        loaded = None

        # we treat benchmark as a special form of group in the following code
        if self.benchmark_file:
            loaded = Benchmark.from_yaml(
                self.benchmark_file, self.env_yaml, self.product_cpes
            )
            if self.profiles_dir:
                loaded.add_profiles_from_dir(self.profiles_dir, self.env_yaml)
        elif self.group_file:
            loaded = Group.from_yaml(self.group_file, self.env_yaml, self.product_cpes)
            prodtypes = parse_prodtype(loaded.prodtype)
            if "all" in prodtypes or self.product in prodtypes:
                self.all_groups[loaded.id_] = loaded

        return loaded

    def _load_group_process_and_recurse(self, guide_directory):
        """
        Load the group (or benchmark) of guide_directory, attach it to the
        parent group if there is one, and process its values, subdirectories
        and rules, in that order. Nothing is processed if nothing was loaded.
        """
        self.loaded_group = self.load_benchmark_or_group(guide_directory)
        if not self.loaded_group:
            return

        if self.parent_group:
            self.parent_group.add_group(
                self.loaded_group, env_yaml=self.env_yaml, product_cpes=self.product_cpes)

        self._process_values()
        self._recurse_into_subdirs()
        self._process_rules()

    def process_directory_tree(self, start_dir, extra_group_dirs=None):
        """
        Process start_dir and everything below it. Directories passed in
        extra_group_dirs are processed as if they were subdirectories
        of start_dir.
        """
        self._collect_items_to_load(start_dir)
        if extra_group_dirs:
            self.subdirectories.extend(extra_group_dirs)
        self._load_group_process_and_recurse(start_dir)

    def process_directory_trees(self, directories):
        """
        Process the first of directories as the start directory and the
        remaining ones as its extra group directories.
        """
        first_dir = directories[0]
        remaining_dirs = directories[1:]
        return self.process_directory_tree(first_dir, remaining_dirs)

    def _recurse_into_subdirs(self):
        """
        Process every collected subdirectory with a fresh loader whose parent
        group is our loaded group, and merge its results into ours.
        """
        for subdir in self.subdirectories:
            child = self._get_new_loader()
            child.parent_group = self.loaded_group
            child.process_directory_tree(subdir)
            self.all_values.update(child.all_values)
            self.all_rules.update(child.all_rules)
            self.all_groups.update(child.all_groups)

    def _get_new_loader(self):
        """Return a new loader for a subdirectory. Implemented by subclasses."""
        raise NotImplementedError()

    def _process_values(self):
        """Load the collected value files. Implemented by subclasses."""
        raise NotImplementedError()

    def _process_rules(self):
        """Load the collected rule files. Implemented by subclasses."""
        raise NotImplementedError()

    def save_all_entities(self, base_dir):
        """
        Dump all gathered rules, groups, values and platforms as YAML files
        into the "rules", "groups", "values" and "platforms" subdirectories
        of base_dir. Each subdirectory is created even if there is nothing
        to save into it.
        """
        def prepare(subdir_name):
            destdir = os.path.join(base_dir, subdir_name)
            mkdir_p(destdir)
            return destdir

        rules_dir = prepare("rules")
        if self.all_rules:
            self.save_entities(self.all_rules.values(), rules_dir)

        groups_dir = prepare("groups")
        if self.all_groups:
            self.save_entities(self.all_groups.values(), groups_dir)

        values_dir = prepare("values")
        if self.all_values:
            self.save_entities(self.all_values.values(), values_dir)

        platforms_dir = prepare("platforms")
        if self.product_cpes.platforms:
            self.save_entities(self.product_cpes.platforms.values(), platforms_dir)

    def save_entities(self, entities, destdir):
        """Dump each of entities into the file "<entity ID>.yml" in destdir."""
        if not entities:
            return
        for entity in entities:
            target = os.path.join(destdir, entity.id_ + ".yml")
            entity.dump_yaml(target)
1866
1867
1868 2
class BuildLoader(DirectoryLoader):
    """
    Directory loader that builds Value and Rule objects from the files
    collected in each directory and adds them to the loaded group.
    """

    def __init__(self, profiles_dir, env_yaml,
                 product_cpes, sce_metadata_path=None):
        """
        sce_metadata_path is an optional path to a JSON file with SCE
        metadata. A missing path or an empty file leaves sce_metadata at None.
        """
        super(BuildLoader, self).__init__(profiles_dir, env_yaml, product_cpes)

        self.sce_metadata = None
        if sce_metadata_path and os.path.getsize(sce_metadata_path):
            # The context manager closes the file even if parsing fails.
            with open(sce_metadata_path, 'r') as sce_metadata_file:
                self.sce_metadata = json.load(sce_metadata_file)

    def _process_values(self):
        """Load every collected value file and add the value to the loaded group."""
        for value_yaml in self.value_files:
            value = Value.from_yaml(value_yaml, self.env_yaml)
            self.all_values[value.id_] = value
            self.loaded_group.add_value(value)

    def _process_rules(self):
        """
        Load every collected rule file and add the rule to the loaded group.

        Rules whose prodtype contains neither "all" nor the current product
        are skipped, and so are rules that raise DocumentationNotComplete
        while being loaded.
        """
        for rule_yaml in self.rule_files:
            try:
                rule = Rule.from_yaml(
                    rule_yaml, self.env_yaml, self.product_cpes, self.sce_metadata)
            except DocumentationNotComplete:
                # Happens on non-debug build when a rule is "documentation-incomplete"
                continue
            prodtypes = parse_prodtype(rule.prodtype)
            if "all" not in prodtypes and self.product not in prodtypes:
                continue
            self.all_rules[rule.id_] = rule
            self.loaded_group.add_rule(
                rule, env_yaml=self.env_yaml, product_cpes=self.product_cpes)

            # The rule inherits the platforms of the group that contains it.
            if self.loaded_group.cpe_platform_names:
                rule.inherited_cpe_platform_names += self.loaded_group.cpe_platform_names

            rule.normalize(self.env_yaml["product"])

    def _get_new_loader(self):
        """Return a BuildLoader for a subdirectory that shares our SCE metadata."""
        loader = BuildLoader(
            self.profiles_dir, self.env_yaml, self.product_cpes)
        # Do it this way so we only have to parse the SCE metadata once.
        loader.sce_metadata = self.sce_metadata
        return loader

    def export_group_to_file(self, filename):
        """Write the loaded group (or benchmark) into filename."""
        return self.loaded_group.to_file(filename)
1912
1913
1914 2
class LinearLoader(object):
    """
    Loads already compiled ("resolved") rules, groups, profiles, values and
    platforms from the subdirectories of one directory, and exports the
    benchmark and the OCIL document built from them.
    """

    def __init__(self, env_yaml, resolved_path):
        # Each kind of entity lives in its own subdirectory of resolved_path
        # and is loaded into a dictionary keyed by entity ID.
        self.resolved_rules_dir = os.path.join(resolved_path, "rules")
        self.rules = dict()

        self.resolved_profiles_dir = os.path.join(resolved_path, "profiles")
        self.profiles = dict()

        self.resolved_groups_dir = os.path.join(resolved_path, "groups")
        self.groups = dict()

        self.resolved_values_dir = os.path.join(resolved_path, "values")
        self.values = dict()

        self.resolved_platforms_dir = os.path.join(resolved_path, "platforms")
        self.platforms = dict()

        self.benchmark = None
        self.env_yaml = env_yaml
        self.product_cpes = ProductCPEs(env_yaml)

    def find_first_groups_ids(self, start_dir):
        """
        Return the names of the direct subdirectories of start_dir that
        contain a group.yml file.
        """
        group_files = glob.glob(os.path.join(start_dir, "*", "group.yml"))
        return [fname.split(os.path.sep)[-2] for fname in group_files]

    def load_entities_by_id(self, filenames, destination, cls):
        """
        Load an entity of class cls from each of filenames and store it in
        the destination dictionary under the ID of the entity.
        """
        for fname in filenames:
            entity = cls.from_yaml(fname, self.env_yaml, self.product_cpes)
            destination[entity.id_] = entity

    def load_benchmark(self, directory):
        """
        Load benchmark.yml from directory, add the resolved profiles to the
        benchmark and attach the top-level groups of directory to it.

        Top-level groups that are not among the loaded groups are skipped.
        """
        self.benchmark = Benchmark.from_yaml(
            os.path.join(directory, "benchmark.yml"), self.env_yaml, self.product_cpes)

        self.benchmark.add_profiles_from_dir(
            self.resolved_profiles_dir, self.env_yaml, self.product_cpes)

        for gid in self.find_first_groups_ids(directory):
            # Add only the groups we have compiled and loaded. The check is
            # explicit so that a KeyError raised inside add_group() is not
            # mistaken for a missing group and silently ignored.
            if gid not in self.groups:
                continue
            self.benchmark.add_group(self.groups[gid], self.env_yaml, self.product_cpes)

    def load_compiled_content(self):
        """
        Load all "*.yml" files from the resolved directories, make the loaded
        platforms available through product_cpes, and let every group pick
        its entities from the loaded rules, values and groups.
        """
        # Platforms are loaded last, same as they always were.
        sources = (
            (self.resolved_rules_dir, self.rules, Rule),
            (self.resolved_groups_dir, self.groups, Group),
            (self.resolved_profiles_dir, self.profiles, Profile),
            (self.resolved_values_dir, self.values, Value),
            (self.resolved_platforms_dir, self.platforms, Platform),
        )
        for directory, destination, cls in sources:
            filenames = glob.glob(os.path.join(directory, "*.yml"))
            self.load_entities_by_id(filenames, destination, cls)
        self.product_cpes.platforms = self.platforms

        for group in self.groups.values():
            group.load_entities(self.rules, self.values, self.groups)

    def export_benchmark_to_file(self, filename):
        """Register the XML namespaces and write the benchmark into filename."""
        register_namespaces()
        return self.benchmark.to_file(filename, self.env_yaml)

    def export_ocil_to_file(self, filename):
        """
        Write an OCIL document into filename. It consists of a generator
        section and of the questionnaires, test actions and questions of all
        loaded rules that have "ocil" or "ocil_clause".
        """
        root = ET.Element('ocil')
        root.set('xmlns:xsi', xsi_namespace)
        root.set("xmlns", ocil_namespace)
        root.set("xmlns:xhtml", xhtml_namespace)
        tree = ET.ElementTree(root)

        generator = ET.SubElement(root, "generator")
        generator_fields = (
            ("product_name", "build_shorthand.py from SCAP Security Guide"),
            ("product_version", "ssg: " + self.env_yaml["ssg_version_str"]),
            ("schema_version", "2.0"),
            ("timestamp", timestamp),
        )
        for tag, text in generator_fields:
            ET.SubElement(generator, tag).text = text

        questionnaires = ET.SubElement(root, "questionnaires")
        test_actions = ET.SubElement(root, "test_actions")
        questions = ET.SubElement(root, "questions")
        for rule in self.rules.values():
            # to_ocil() raises for rules without OCIL data, so skip them.
            if not rule.ocil and not rule.ocil_clause:
                continue
            questionnaire, action, boolean_question = rule.to_ocil()
            questionnaires.append(questionnaire)
            test_actions.append(action)
            questions.append(boolean_question)
        tree.write(filename)
2009
2010
2011 2
class Platform(XCCDFEntity):
    """
    A CPE applicability language platform created from a boolean expression
    of CPE names.

    Besides the parsed expression (the "test" attribute) it carries the
    expression rendered as XML, as a Bash conditional and as an Ansible
    conditional. Platforms are equal when their tests are equal.

    NOTE: the class defines __eq__ without __hash__, therefore its instances
    are not hashable on Python 3.
    """

    KEYS = dict(
        name=lambda: "",
        original_expression=lambda: "",
        xml_content=lambda: "",
        bash_conditional=lambda: "",
        ansible_conditional=lambda: "",
        ** XCCDFEntity.KEYS
    )

    MANDATORY_KEYS = [
        "name",
        "xml_content",
        "original_expression",
        "bash_conditional",
        "ansible_conditional"
    ]

    prefix = "cpe-lang"
    ns = PREFIX_TO_NS[prefix]

    @classmethod
    def from_text(cls, expression, product_cpes):
        """
        Create a platform from the textual expression, using the algebra of
        product_cpes to parse it. The ID and the name of the platform are
        derived from the parsed expression.

        Returns None when product_cpes is falsy.
        """
        if not product_cpes:
            return None
        parsed_test = product_cpes.algebra.parse(
            expression, simplify=True)
        platform_id = parsed_test.as_id()
        platform = cls(platform_id)
        platform.test = parsed_test
        platform.test.enrich_with_cpe_info(product_cpes)
        platform.name = platform_id
        platform.original_expression = expression
        platform.xml_content = platform.get_xml()
        platform.bash_conditional = platform.test.to_bash_conditional()
        platform.ansible_conditional = platform.test.to_ansible_conditional()
        return platform

    def get_xml(self):
        """
        Return the <platform> element of this platform serialized by
        ET.tostring() with the "utf8" encoding.
        """
        platform_el = ET.Element("{%s}platform" % Platform.ns)
        platform_el.set('id', self.name)
        if isinstance(self.test, CPEALFactRef):
            # In case the platform contains only a single CPE name, fake the
            # logical test; we have to adhere to the CPE specification.
            content = ET.Element("{%s}logical-test" % CPEALLogicalTest.ns)
            content.set('operator', 'AND')
            content.set('negate', 'false')
            content.append(self.test.to_xml_element())
        else:
            content = self.test.to_xml_element()
        platform_el.append(content)
        return ET.tostring(platform_el, encoding="utf8", method="xml")

    def to_xml_element(self):
        """Return the stored xml_content as it is."""
        return self.xml_content

    def to_bash_conditional(self):
        """Return the stored Bash conditional."""
        return self.bash_conditional

    def to_ansible_conditional(self):
        """Return the stored Ansible conditional."""
        return self.ansible_conditional

    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None):
        """
        Load a platform from yaml_file and parse its xml_content into an
        XML element.

        The "test" attribute is restored from original_expression only when
        product_cpes is given; without it the attribute is not set here.
        """
        platform = super(Platform, cls).from_yaml(yaml_file, env_yaml)
        platform.xml_content = ET.fromstring(platform.xml_content)
        if not product_cpes:
            return platform
        # Since we did receive product_cpes, we can also restore the original
        # test object; it can be used later, e.g. for comparison.
        platform.test = product_cpes.algebra.parse(
            platform.original_expression, simplify=True)
        return platform

    def __eq__(self, other):
        """Platforms are equal when their tests are equal; a platform never
        equals an object of another type."""
        if isinstance(other, Platform):
            return self.test == other.test
        return False
2091
2092
2093 2
def add_platform_if_not_defined(platform, product_cpes):
    """
    Make sure that a platform equal to the given one is registered in
    product_cpes.platforms.

    Returns the already registered equal platform if there is one. Otherwise
    the given platform is registered under its ID and returned.
    """
    registry = product_cpes.platforms
    not_found = object()
    # The sentinel keeps the lookup correct whatever the registry contains.
    existing = next(
        (known for known in registry.values() if platform == known), not_found)
    if existing is not_found:
        registry[platform.id_] = platform
        return platform
    return existing
2100