Test Failed
Pull Request — master (#7716)
by Matěj
02:51
created

ssg.build_yaml.Group.load_entities()   B

Complexity

Conditions 7

Size

Total Lines 12
Code Lines 10

Duplication

Lines 12
Ratio 100 %

Code Coverage

Tests 1
CRAP Score 42.7209

Importance

Changes 0
Metric Value
cc 7
eloc 10
nop 4
dl 12
loc 12
ccs 1
cts 10
cp 0.1
crap 42.7209
rs 8
c 0
b 0
f 0
1 2
from __future__ import absolute_import
2 2
from __future__ import print_function
3
4 2
from collections import defaultdict
5 2
from copy import deepcopy
6 2
import datetime
7 2
import json
8 2
import os
9 2
import os.path
10 2
import re
11 2
import sys
12 2
from xml.sax.saxutils import escape
13 2
import glob
14
15 2
import yaml
16
17 2
from .build_cpe import CPEDoesNotExist
18 2
from .constants import XCCDF_REFINABLE_PROPERTIES, SCE_SYSTEM
19 2
from .rules import get_rule_dir_id, get_rule_dir_yaml, is_rule_dir
20 2
from .rule_yaml import parse_prodtype
21
22 2
from .cce import is_cce_format_valid, is_cce_value_valid
23 2
from .yaml import DocumentationNotComplete, open_and_expand, open_and_macro_expand
24 2
from .utils import required_key, mkdir_p
25
26 2
from .xml import ElementTree as ET
27 2
from .shims import unicode_func
28
29
30 2
def add_sub_element(parent, tag, data):
31
    """
32
    Creates a new child element under parent with tag tag, and sets
33
    data as the content under the tag. In particular, data is a string
34
    to be parsed as an XML tree, allowing sub-elements of children to be
35
    added.
36
37
    If data should not be parsed as an XML tree, either escape the contents
38
    before passing into this function, or use ElementTree.SubElement().
39
40
    Returns the newly created subelement of type tag.
41
    """
42
    # This is used because our YAML data contain XML and XHTML elements
43
    # ET.SubElement() escapes the < > characters by &lt; and &gt;
44
    # and therefore it does not add child elements
45
    # we need to do a hack instead
46
    # TODO: Remove this function after we move to Markdown everywhere in SSG
47
    ustr = unicode_func("<{0}>{1}</{0}>").format(tag, data)
48
49
    try:
50
        element = ET.fromstring(ustr.encode("utf-8"))
51
    except Exception:
52
        msg = ("Error adding subelement to an element '{0}' from string: '{1}'"
53
               .format(parent.tag, ustr))
54
        raise RuntimeError(msg)
55
56
    parent.append(element)
57
    return element
58
59
60 2
def reorder_according_to_ordering(unordered, ordering, regex=None):
61 2
    ordered = []
62 2
    if regex is None:
63 2
        regex = "|".join(["({0})".format(item) for item in ordering])
64 2
    regex = re.compile(regex)
65
66 2
    items_to_order = list(filter(regex.match, unordered))
67 2
    unordered = set(unordered)
68
69 2
    for priority_type in ordering:
70 2
        for item in items_to_order:
71 2
            if priority_type in item and item in unordered:
72 2
                ordered.append(item)
73 2
                unordered.remove(item)
74 2
    ordered.extend(sorted(unordered))
75 2
    return ordered
76
77
78 2
def add_warning_elements(element, warnings):
79
    # The use of [{dict}, {dict}] in warnings is to handle the following
80
    # scenario where multiple warnings have the same category which is
81
    # valid in SCAP and our content:
82
    #
83
    # warnings:
84
    #     - general: Some general warning
85
    #     - general: Some other general warning
86
    #     - general: |-
87
    #         Some really long multiline general warning
88
    #
89
    # Each of the {dict} should have only one key/value pair.
90
    for warning_dict in warnings:
91
        warning = add_sub_element(element, "warning", list(warning_dict.values())[0])
92
        warning.set("category", list(warning_dict.keys())[0])
93
94
95 2
def add_nondata_subelements(element, subelement, attribute, attr_data):
96
    """Add multiple iterations of a sublement that contains an attribute but no data
97
       For example, <requires id="my_required_id"/>"""
98
    for data in attr_data:
99
        req = ET.SubElement(element, subelement)
100
        req.set(attribute, data)
101
102
103 2
def check_warnings(xccdf_structure):
104 2
    for warning_list in xccdf_structure.warnings:
105
        if len(warning_list) != 1:
106
            msg = "Only one key/value pair should exist for each warnings dictionary"
107
            raise ValueError(msg)
108
109
110 2
class SelectionHandler(object):
111 2
    def __init__(self):
112 2
        self.refine_rules = defaultdict(list)
113 2
        self.variables = dict()
114 2
        self.unselected = []
115 2
        self.selected = []
116
117 2
    @property
118
    def selections(self):
119
        selections = []
120
        for item in self.selected:
121
            selections.append(str(item))
122
        for item in self.unselected:
123
            selections.append("!"+str(item))
124
        for varname in self.variables.keys():
125
            selections.append(varname+"="+self.variables.get(varname))
126
        for rule, refinements in self.refine_rules.items():
127
            for prop, val in refinements:
128
                selections.append("{rule}.{property}={value}"
129
                                  .format(rule=rule, property=prop, value=val))
130
        return selections
131
132 2
    @selections.setter
133
    def selections(self, entries):
134 2
        for item in entries:
135 2
            self.apply_selection(item)
136
137 2
    def apply_selection(self, item):
138 2
        if "." in item:
139
            rule, refinement = item.split(".", 1)
140
            property_, value = refinement.split("=", 1)
141
            if property_ not in XCCDF_REFINABLE_PROPERTIES:
142
                msg = ("Property '{property_}' cannot be refined. "
143
                       "Rule properties that can be refined are {refinables}. "
144
                       "Fix refinement '{rule_id}.{property_}={value}' in profile '{profile}'."
145
                       .format(property_=property_, refinables=XCCDF_REFINABLE_PROPERTIES,
146
                               rule_id=rule, value=value, profile=self.id_)
147
                       )
148
                raise ValueError(msg)
149
            self.refine_rules[rule].append((property_, value))
150 2
        elif "=" in item:
151 2
            varname, value = item.split("=", 1)
152 2
            self.variables[varname] = value
153 2
        elif item.startswith("!"):
154
            self.unselected.append(item[1:])
155
        else:
156 2
            self.selected.append(item)
157
158 2
    def _subtract_refinements(self, extended_refinements):
159
        """
160
        Given a dict of rule refinements from the extended profile,
161
        "undo" every refinement prefixed with '!' in this profile.
162
        """
163
        for rule, refinements in list(self.refine_rules.items()):
164
            if rule.startswith("!"):
165
                for prop, val in refinements:
166
                    extended_refinements[rule[1:]].remove((prop, val))
167
                del self.refine_rules[rule]
168
        return extended_refinements
169
170 2
    def update_with(self, rhs):
171
        extended_selects = set(rhs.selected)
172
        extra_selections = extended_selects.difference(set(self.selected))
173
        self.selected.extend(list(extra_selections))
174
175
        updated_variables = dict(rhs.variables)
176
        updated_variables.update(self.variables)
177
        self.variables = updated_variables
178
179
        extended_refinements = deepcopy(rhs.refine_rules)
180
        updated_refinements = self._subtract_refinements(extended_refinements)
181
        updated_refinements.update(self.refine_rules)
182
        self.refine_rules = updated_refinements
183
184
185 2
class XCCDFEntity(object):
186
    """
187
    This class can load itself from a YAML with Jinja macros,
188
    and it can also save itself to YAML.
189
190
    It is supposed to work with the content in the project,
191
    when entities are defined in the benchmark tree,
192
    and they are compiled into flat YAMLs to the build directory.
193
    """
194 2
    KEYS = dict(
195
            id_=lambda: "",
196
            definition_location=lambda: "",
197
    )
198
199 2
    MANDATORY_KEYS = set()
200
201 2
    GENERIC_FILENAME = ""
202 2
    ID_LABEL = "id"
203
204 2
    def __init__(self, id_):
205 2
        super(XCCDFEntity, self).__init__()
206 2
        self._assign_defaults()
207 2
        self.id_ = id_
208
209 2
    def _assign_defaults(self):
210 2
        for key, default in self.KEYS.items():
211 2
            default_val = default()
212 2
            if isinstance(default_val, RuntimeError):
213
                default_val = None
214 2
            setattr(self, key, default_val)
215
216 2
    @classmethod
217
    def get_instance_from_full_dict(cls, data):
218
        """
219
        Given a defining dictionary, produce an instance
220
        by treating all dict elements as attributes.
221
222
        Extend this if you want tight control over the instance creation process.
223
        """
224 2
        entity = cls(data["id_"])
225 2
        for key, value in data.items():
226 2
            setattr(entity, key, value)
227 2
        return entity
228
229 2
    @classmethod
230
    def process_input_dict(cls, input_contents, env_yaml):
231
        """
232
        Take the contents of the definition as a dictionary, and
233
        add defaults or raise errors if a required member is not present.
234
235
        Extend this if you want to add, remove or alter the result
236
        that will constitute the new instance.
237
        """
238 2
        data = dict()
239
240 2
        for key, default in cls.KEYS.items():
241 2
            if key in input_contents:
242 2
                data[key] = input_contents[key]
243 2
                del input_contents[key]
244 2
                continue
245
246 2
            if key not in cls.MANDATORY_KEYS:
247 2
                data[key] = cls.KEYS[key]()
248
            else:
249
                msg = (
250
                    "Key '{key}' is mandatory for definition of '{class_name}'."
251
                    .format(key=key, class_name=cls.__name__))
252
                raise ValueError(msg)
253
254 2
        return data
255
256 2
    @classmethod
257 2
    def parse_yaml_into_processed_dict(cls, yaml_file, env_yaml=None):
258
        """
259
        Given yaml filename and environment info, produce a dictionary
260
        that defines the instance to be created.
261
        This wraps :meth:`process_input_dict` and it adds generic keys on the top:
262
263
        - `id_` as the entity ID that is deduced either from thefilename,
264
          or from the parent directory name.
265
        - `definition_location` as the original location whenre the entity got defined.
266
        """
267 2
        file_basename = os.path.basename(yaml_file)
268 2
        entity_id = file_basename.split(".")[0]
269 2
        if file_basename == cls.GENERIC_FILENAME:
270 2
            entity_id = os.path.basename(os.path.dirname(yaml_file))
271
272 2
        if env_yaml:
273 2
            env_yaml[cls.ID_LABEL] = entity_id
274 2
        yaml_data = open_and_macro_expand(yaml_file, env_yaml)
275
276 2
        try:
277 2
            processed_data = cls.process_input_dict(yaml_data, env_yaml)
278
        except ValueError as exc:
279
            msg = (
280
                "Error processing {yaml_file}: {exc}"
281
                .format(yaml_file=yaml_file, exc=str(exc)))
282
            raise ValueError(msg)
283
284 2
        if yaml_data:
285
            msg = (
286
                "Unparsed YAML data in '{yaml_file}': {keys}"
287
                .format(yaml_file=yaml_file, keys=list(yaml_data.keys())))
288
            raise RuntimeError(msg)
289
290 2
        if not processed_data.get("definition_location", ""):
291 2
            processed_data["definition_location"] = yaml_file
292
293 2
        processed_data["id_"] = entity_id
294
295 2
        return processed_data
296
297 2
    @classmethod
298 2
    def from_yaml(cls, yaml_file, env_yaml=None):
299 2
        yaml_file = os.path.normpath(yaml_file)
300
301 2
        local_env_yaml = None
302 2
        if env_yaml:
303 2
            local_env_yaml = dict()
304 2
            local_env_yaml.update(env_yaml)
305
306 2
        data_dict = cls.parse_yaml_into_processed_dict(yaml_file, local_env_yaml)
307
308 2
        result = cls.get_instance_from_full_dict(data_dict)
309
310 2
        return result
311
312 2
    def represent_as_dict(self):
313
        """
314
        Produce a dict representation of the class.
315
316
        Extend this method if you need the representation to be different from the object.
317
        """
318 2
        data = dict()
319 2
        for key in self.KEYS:
320 2
            value = getattr(self, key)
321 2
            if value or True:
322 2
                data[key] = getattr(self, key)
323 2
        del data["id_"]
324 2
        return data
325
326 2
    def dump_yaml(self, file_name, documentation_complete=True):
327
        to_dump = self.represent_as_dict()
328
        to_dump["documentation_complete"] = documentation_complete
329
        with open(file_name, "w+") as f:
330
            yaml.dump(to_dump, f, indent=4, sort_keys=False)
331
332 2
    def to_xml_element(self):
333
        raise NotImplementedError()
334
335
336 2
class Profile(XCCDFEntity, SelectionHandler):
337
    """Represents XCCDF profile
338
    """
339 2
    KEYS = {
340
        "title": lambda: "",
341
        "description": lambda: "",
342
        "extends": lambda: "",
343
        "metadata": lambda: None,
344
        "reference": lambda: None,
345
        "selections": lambda: list(),
346
        "platforms": lambda: set(),
347
        "cpe_names": lambda: set(),
348
        "platform": lambda: None,
349
        "filter_rules": lambda: "",
350
        ** XCCDFEntity.KEYS
351
    }
352
353 2
    MANDATORY_KEYS = {
354
        "title",
355
        "description",
356
        "selections",
357
    }
358
359 2
    @classmethod
360
    def process_input_dict(cls, input_contents, env_yaml):
361 2
        input_contents = super(Profile, cls).process_input_dict(input_contents, env_yaml)
362
363 2
        platform = input_contents.get("platform")
364 2
        if platform is not None:
365
            input_contents["platforms"].add(platform)
366
367 2
        if env_yaml:
368 2
            for platform in input_contents["platforms"]:
369
                try:
370
                    new_cpe_name = env_yaml["product_cpes"].get_cpe_name(platform)
371
                    input_contents["cpe_names"].add(new_cpe_name)
372
                except CPEDoesNotExist:
373
                    msg = (
374
                        "Unsupported platform '{platform}' in a profile."
375
                        .format(platform=platform))
376
                    raise CPEDoesNotExist(msg)
377
378 2
        return input_contents
379
380 2
    @property
381
    def rule_filter(self):
382
        if self.filter_rules:
383
            return rule_filter_from_def(self.filter_rules)
384
        else:
385
            return noop_rule_filterfunc
386
387 2
    def to_xml_element(self):
388
        element = ET.Element('Profile')
389
        element.set("id", self.id_)
390
        if self.extends:
391
            element.set("extends", self.extends)
392
        title = add_sub_element(element, "title", self.title)
393
        title.set("override", "true")
394
        desc = add_sub_element(element, "description", self.description)
395
        desc.set("override", "true")
396
397
        if self.reference:
398
            add_sub_element(element, "reference", escape(self.reference))
399
400
        for cpe_name in self.cpe_names:
401
            plat = ET.SubElement(element, "platform")
402
            plat.set("idref", cpe_name)
403
404
        for selection in self.selected:
405
            select = ET.Element("select")
406
            select.set("idref", selection)
407
            select.set("selected", "true")
408
            element.append(select)
409
410
        for selection in self.unselected:
411
            unselect = ET.Element("select")
412
            unselect.set("idref", selection)
413
            unselect.set("selected", "false")
414
            element.append(unselect)
415
416
        for value_id, selector in self.variables.items():
417
            refine_value = ET.Element("refine-value")
418
            refine_value.set("idref", value_id)
419
            refine_value.set("selector", selector)
420
            element.append(refine_value)
421
422
        for refined_rule, refinement_list in self.refine_rules.items():
423
            refine_rule = ET.Element("refine-rule")
424
            refine_rule.set("idref", refined_rule)
425
            for refinement in refinement_list:
426
                refine_rule.set(refinement[0], refinement[1])
427
            element.append(refine_rule)
428
429
        return element
430
431 2
    def get_rule_selectors(self):
432 2
        return self.selected + self.unselected
433
434 2
    def get_variable_selectors(self):
435 2
        return self.variables
436
437 2
    def validate_refine_rules(self, rules):
438
        existing_rule_ids = [r.id_ for r in rules]
439
        for refine_rule, refinement_list in self.refine_rules.items():
440
            # Take first refinement to ilustrate where the error is
441
            # all refinements in list are invalid, so it doesn't really matter
442
            a_refinement = refinement_list[0]
443
444
            if refine_rule not in existing_rule_ids:
445
                msg = (
446
                    "You are trying to refine a rule that doesn't exist. "
447
                    "Rule '{rule_id}' was not found in the benchmark. "
448
                    "Please check all rule refinements for rule: '{rule_id}', for example: "
449
                    "- {rule_id}.{property_}={value}' in profile {profile_id}."
450
                    .format(rule_id=refine_rule, profile_id=self.id_,
451
                            property_=a_refinement[0], value=a_refinement[1])
452
                    )
453
                raise ValueError(msg)
454
455
            if refine_rule not in self.get_rule_selectors():
456
                msg = ("- {rule_id}.{property_}={value}' in profile '{profile_id}' is refining "
457
                       "a rule that is not selected by it. The refinement will not have any "
458
                       "noticeable effect. Either select the rule or remove the rule refinement."
459
                       .format(rule_id=refine_rule, property_=a_refinement[0],
460
                               value=a_refinement[1], profile_id=self.id_)
461
                       )
462
                raise ValueError(msg)
463
464 2
    def validate_variables(self, variables):
465
        variables_by_id = dict()
466
        for var in variables:
467
            variables_by_id[var.id_] = var
468
469
        for var_id, our_val in self.variables.items():
470
            if var_id not in variables_by_id:
471
                all_vars_list = [" - %s" % v for v in variables_by_id.keys()]
472
                msg = (
473
                    "Value '{var_id}' in profile '{profile_name}' is not known. "
474
                    "We know only variables:\n{var_names}"
475
                    .format(
476
                        var_id=var_id, profile_name=self.id_,
477
                        var_names="\n".join(sorted(all_vars_list)))
478
                )
479
                raise ValueError(msg)
480
481
            allowed_selectors = [str(s) for s in variables_by_id[var_id].options.keys()]
482
            if our_val not in allowed_selectors:
483
                msg = (
484
                    "Value '{var_id}' in profile '{profile_name}' "
485
                    "uses the selector '{our_val}'. "
486
                    "This is not possible, as only selectors {all_selectors} are available. "
487
                    "Either change the selector used in the profile, or "
488
                    "add the selector-value pair to the variable definition."
489
                    .format(
490
                        var_id=var_id, profile_name=self.id_, our_val=our_val,
491
                        all_selectors=allowed_selectors,
492
                    )
493
                )
494
                raise ValueError(msg)
495
496 2
    def validate_rules(self, rules, groups):
497
        existing_rule_ids = [r.id_ for r in rules]
498
        rule_selectors = self.get_rule_selectors()
499
        for id_ in rule_selectors:
500
            if id_ in groups:
501
                msg = (
502
                    "You have selected a group '{group_id}' instead of a "
503
                    "rule. Groups have no effect in the profile and are not "
504
                    "allowed to be selected. Please remove '{group_id}' "
505
                    "from profile '{profile_id}' before proceeding."
506
                    .format(group_id=id_, profile_id=self.id_)
507
                )
508
                raise ValueError(msg)
509
            if id_ not in existing_rule_ids:
510
                msg = (
511
                    "Rule '{rule_id}' was not found in the benchmark. Please "
512
                    "remove rule '{rule_id}' from profile '{profile_id}' "
513
                    "before proceeding."
514
                    .format(rule_id=id_, profile_id=self.id_)
515
                )
516
                raise ValueError(msg)
517
518 2
    def __sub__(self, other):
519
        profile = Profile(self.id_)
520
        profile.title = self.title
521
        profile.description = self.description
522
        profile.extends = self.extends
523
        profile.platforms = self.platforms
524
        profile.platform = self.platform
525
        profile.selected = list(set(self.selected) - set(other.selected))
526
        profile.selected.sort()
527
        profile.unselected = list(set(self.unselected) - set(other.unselected))
528
        profile.variables = dict ((k, v) for (k, v) in self.variables.items()
529
                             if k not in other.variables or v != other.variables[k])
530
        return profile
531
532
533 2
class ResolvableProfile(Profile):
534 2
    def __init__(self, * args, ** kwargs):
535
        super(ResolvableProfile, self).__init__(* args, ** kwargs)
536
        self.resolved = False
537
538 2
    def _controls_ids_to_controls(self, controls_manager, policy_id, control_id_list):
539
        items = [controls_manager.get_control(policy_id, cid) for cid in control_id_list]
540
        return items
541
542 2
    def resolve_controls(self, controls_manager):
543
        pass
544
545 2
    def extend_by(self, extended_profile):
546
        self.update_with(extended_profile)
547
548 2
    def resolve_selections_with_rules(self, rules_by_id):
549
        selections = set()
550
        for rid in self.selected:
551
            if rid not in rules_by_id:
552
                continue
553
            rule = rules_by_id[rid]
554
            if not self.rule_filter(rule):
555
                continue
556
            selections.add(rid)
557
        self.selected = list(selections)
558
559 2
    def resolve(self, all_profiles, rules_by_id, controls_manager=None):
560
        if self.resolved:
561
            return
562
563
        if controls_manager:
564
            self.resolve_controls(controls_manager)
565
566
        self.resolve_selections_with_rules(rules_by_id)
567
568
        if self.extends:
569
            if self.extends not in all_profiles:
570
                msg = (
571
                    "Profile {name} extends profile {extended}, but "
572
                    "only profiles {known_profiles} are available for resolution."
573
                    .format(name=self.id_, extended=self.extends,
574
                            known_profiles=list(all_profiles.keys())))
575
                raise RuntimeError(msg)
576
            extended_profile = all_profiles[self.extends]
577
            extended_profile.resolve(all_profiles, rules_by_id, controls_manager)
578
579
            self.extend_by(extended_profile)
580
581
        self.selected = [s for s in set(self.selected) if s not in self.unselected]
582
583
        self.unselected = []
584
        self.extends = None
585
586
        self.selected = sorted(self.selected)
587
588
        for rid in self.selected:
589
            if rid not in rules_by_id:
590
                msg = (
591
                    "Rule {rid} is selected by {profile}, but the rule is not available. "
592
                    "This may be caused by a discrepancy of prodtypes."
593
                    .format(rid=rid, profile=self.id_))
594
                raise ValueError(msg)
595
596
        self.resolved = True
597
598
599 2
class ProfileWithInlinePolicies(ResolvableProfile):
600 2
    def __init__(self, * args, ** kwargs):
601
        super(ProfileWithInlinePolicies, self).__init__(* args, ** kwargs)
602
        self.controls_by_policy = defaultdict(list)
603
604 2
    def apply_selection(self, item):
605
        # ":" is the delimiter for controls but not when the item is a variable
606
        if ":" in item and "=" not in item:
607
            policy_id, control_id = item.split(":", 1)
608
            self.controls_by_policy[policy_id].append(control_id)
609
        else:
610
            super(ProfileWithInlinePolicies, self).apply_selection(item)
611
612 2
    def _process_controls_ids_into_controls(self, controls_manager, policy_id, controls_ids):
613
        controls = []
614
        for cid in controls_ids:
615
            if not cid.startswith("all"):
616
                controls.extend(
617
                    self._controls_ids_to_controls(controls_manager, policy_id, [cid]))
618
            elif ":" in cid:
619
                _, level_id = cid.split(":", 1)
620
                controls.extend(
621
                    controls_manager.get_all_controls_of_level(policy_id, level_id))
622
            else:
623
                controls.extend(
624
                    controls_manager.get_all_controls(policy_id))
625
        return controls
626
627 2
    def resolve_controls(self, controls_manager):
628
        for policy_id, controls_ids in self.controls_by_policy.items():
629
            controls = self._process_controls_ids_into_controls(
630
                controls_manager, policy_id, controls_ids)
631
632
            for c in controls:
633
                self.update_with(c)
634
635
636 2
class Value(XCCDFEntity):
637
    """Represents XCCDF Value
638
    """
639 2
    KEYS = {
640
        "title": lambda: "",
641
        "description": lambda: "",
642
        "type": lambda: "",
643
        "operator": lambda: "equals",
644
        "interactive": lambda: False,
645
        "options": lambda: dict(),
646
        "warnings": lambda: list(),
647
        ** XCCDFEntity.KEYS
648
    }
649
650 2
    MANDATORY_KEYS = {
651
        "title",
652
        "description",
653
        "type",
654
    }
655
656 2
    @classmethod
657
    def process_input_dict(cls, input_contents, env_yaml):
658 2
        input_contents["interactive"] = (
659
            input_contents.get("interactive", "false").lower() == "true")
660
661 2
        data = super(Value, cls).process_input_dict(input_contents, env_yaml)
662
663 2
        possible_operators = ["equals", "not equal", "greater than",
664
                              "less than", "greater than or equal",
665
                              "less than or equal", "pattern match"]
666
667 2
        if data["operator"] not in possible_operators:
668
            raise ValueError(
669
                "Found an invalid operator value '%s'. "
670
                "Expected one of: %s"
671
                % (data["operator"], ", ".join(possible_operators))
672
            )
673
674 2
        return data
675
676 2
    @classmethod
677 2
    def from_yaml(cls, yaml_file, env_yaml=None):
678 2
        value = super(Value, cls).from_yaml(yaml_file, env_yaml)
679
680 2
        check_warnings(value)
681
682 2
        return value
683
684 2
    def to_xml_element(self):
685
        value = ET.Element('Value')
686
        value.set('id', self.id_)
687
        value.set('type', self.type)
688
        if self.operator != "equals":  # equals is the default
689
            value.set('operator', self.operator)
690
        if self.interactive:  # False is the default
691
            value.set('interactive', 'true')
692
        title = ET.SubElement(value, 'title')
693
        title.text = self.title
694
        add_sub_element(value, 'description', self.description)
695
        add_warning_elements(value, self.warnings)
696
697
        for selector, option in self.options.items():
698
            # do not confuse Value with big V with value with small v
699
            # value is child element of Value
700
            value_small = ET.SubElement(value, 'value')
701
            # by XCCDF spec, default value is value without selector
702
            if selector != "default":
703
                value_small.set('selector', str(selector))
704
            value_small.text = str(option)
705
706
        return value
707
708 2
    def to_file(self, file_name):
709
        root = self.to_xml_element()
710
        tree = ET.ElementTree(root)
711
        tree.write(file_name)
712
713
714 2
class Benchmark(XCCDFEntity):
715
    """Represents XCCDF Benchmark
716
    """
717 2
    KEYS = {
718
        "title": lambda: "",
719
        "status": lambda: "",
720
        "description": lambda: "",
721
        "notice_id": lambda: "",
722
        "notice_description": lambda: "",
723
        "front_matter": lambda: "",
724
        "rear_matter": lambda: "",
725
        "cpes": lambda: list(),
726
        "version": lambda: "0",
727
        "profiles": lambda: list(),
728
        "values": lambda: dict(),
729
        "groups": lambda: dict(),
730
        "rules": lambda: dict(),
731
        "product_cpe_names": lambda: list(),
732
        ** XCCDFEntity.KEYS
733
    }
734
735 2
    MANDATORY_KEYS = {
736
        "title",
737
        "status",
738
        "description",
739
        "front_matter",
740
        "rear_matter",
741
        "version",
742
    }
743
744 2
    GENERIC_FILENAME = "benchmark.yml"
745
746 2
    def add_value_needed_for_ocil_clauses(self):
747
        conditional_clause = Value("conditional_clause")
748
        conditional_clause.title = "A conditional clause for check statements."
749
        conditional_clause.description = conditional_clause.title
750
        conditional_clause.type = "string"
751
        conditional_clause.options = {"": "This is a placeholder"}
752
753
        self.add_value(conditional_clause)
754
755 2 View Code Duplication
    def load_entities(self, rules_by_id, values_by_id, groups_by_id):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
756
        for rid, val in self.rules.items():
757
            if not val:
758
                self.rules[rid] = rules_by_id[rid]
759
760
        for vid, val in self.values.items():
761
            if not val:
762
                self.values[vid] = values_by_id[vid]
763
764
        for gid, val in self.groups.items():
765
            if not val:
766
                self.groups[gid] = groups_by_id[gid]
767
768 2
    @classmethod
769
    def process_input_dict(cls, input_contents, env_yaml):
770
        input_contents["front_matter"] = input_contents["front-matter"]
771
        del input_contents["front-matter"]
772
        input_contents["rear_matter"] = input_contents["rear-matter"]
773
        del input_contents["rear-matter"]
774
775
        data = super(Benchmark, cls).process_input_dict(input_contents, env_yaml)
776
777
        notice_contents = required_key(input_contents, "notice")
778
        del input_contents["notice"]
779
780
        data["notice_id"] = required_key(notice_contents, "id")
781
        del notice_contents["id"]
782
783
        data["notice_description"] = required_key(notice_contents, "description")
784
        del notice_contents["description"]
785
786
        data["version"] = str(data["version"])
787
788
        return data
789
790 2
    def represent_as_dict(self):
791
        data = super(Benchmark, cls).represent_as_dict()
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable cls does not seem to be defined.
Loading history...
792
        data["rear-matter"] = data["rear_matter"]
793
        del data["rear_matter"]
794
795
        data["front-matter"] = data["front_matter"]
796
        del data["front_matter"]
797
        return data
798
799 2
    @classmethod
800 2
    def from_yaml(cls, yaml_file, env_yaml=None, benchmark_id="product-name"):
801
        benchmark = super(Benchmark, cls).from_yaml(yaml_file, env_yaml)
802
        if env_yaml:
803
            benchmark.product_cpe_names = env_yaml["product_cpes"].get_product_cpe_names()
804
805
        benchmark.id_ = benchmark_id
806
807
        return benchmark
808
809 2
    def add_profiles_from_dir(self, dir_, env_yaml):
810
        for dir_item in sorted(os.listdir(dir_)):
811
            dir_item_path = os.path.join(dir_, dir_item)
812
            if not os.path.isfile(dir_item_path):
813
                continue
814
815
            _, ext = os.path.splitext(os.path.basename(dir_item_path))
816
            if ext != '.profile':
817
                sys.stderr.write(
818
                    "Encountered file '%s' while looking for profiles, "
819
                    "extension '%s' is unknown. Skipping..\n"
820
                    % (dir_item, ext)
821
                )
822
                continue
823
824
            try:
825
                new_profile = ProfileWithInlinePolicies.from_yaml(dir_item_path, env_yaml)
826
            except DocumentationNotComplete:
827
                continue
828
            except Exception as exc:
829
                msg = ("Error building profile from '{fname}': '{error}'"
830
                       .format(fname=dir_item_path, error=str(exc)))
831
                raise RuntimeError(msg)
832
            if new_profile is None:
833
                continue
834
835
            self.profiles.append(new_profile)
836
837 2
    def to_xml_element(self):
838
        root = ET.Element('Benchmark')
839
        root.set('xmlns:xsi', 'http://www.w3.org/2001/XMLSchema-instance')
840
        root.set('xmlns:xhtml', 'http://www.w3.org/1999/xhtml')
841
        root.set('xmlns:dc', 'http://purl.org/dc/elements/1.1/')
842
        root.set('id', 'product-name')
843
        root.set('xsi:schemaLocation',
844
                 'http://checklists.nist.gov/xccdf/1.1 xccdf-1.1.4.xsd')
845
        root.set('style', 'SCAP_1.1')
846
        root.set('resolved', 'false')
847
        root.set('xml:lang', 'en-US')
848
        status = ET.SubElement(root, 'status')
849
        status.set('date', datetime.date.today().strftime("%Y-%m-%d"))
850
        status.text = self.status
851
        add_sub_element(root, "title", self.title)
852
        add_sub_element(root, "description", self.description)
853
        notice = add_sub_element(root, "notice", self.notice_description)
854
        notice.set('id', self.notice_id)
855
        add_sub_element(root, "front-matter", self.front_matter)
856
        add_sub_element(root, "rear-matter", self.rear_matter)
857
858
        # The Benchmark applicability is determined by the CPEs
859
        # defined in the product.yml
860
        for cpe_name in self.product_cpe_names:
861
            plat = ET.SubElement(root, "platform")
862
            plat.set("idref", cpe_name)
863
864
        version = ET.SubElement(root, 'version')
865
        version.text = self.version
866
        ET.SubElement(root, "metadata")
867
868
        for profile in self.profiles:
869
            root.append(profile.to_xml_element())
870
871
        for value in self.values.values():
872
            root.append(value.to_xml_element())
873
874
        groups_in_bench = list(self.groups.keys())
875
        priority_order = ["system", "services"]
876
        groups_in_bench = reorder_according_to_ordering(groups_in_bench, priority_order)
877
878
        # Make system group the first, followed by services group
879
        for group_id in groups_in_bench:
880
            group = self.groups.get(group_id)
881
            # Products using application benchmark don't have system or services group
882
            if group is not None:
883
                root.append(group.to_xml_element())
884
885
        for rule in self.rules.values():
886
            root.append(rule.to_xml_element())
887
888
        return root
889
890 2
    def to_file(self, file_name, ):
891
        root = self.to_xml_element()
892
        tree = ET.ElementTree(root)
893
        tree.write(file_name)
894
895 2
    def add_value(self, value):
896
        if value is None:
897
            return
898
        self.values[value.id_] = value
899
900
    # The benchmark is also considered a group, so this function signature needs to match
901
    # Group()'s add_group()
902 2
    def add_group(self, group, env_yaml=None):
903
        if group is None:
904
            return
905
        self.groups[group.id_] = group
906
907 2
    def add_rule(self, rule):
908
        if rule is None:
909
            return
910
        self.rules[rule.id_] = rule
911
912 2
    def to_xccdf(self):
913
        """We can easily extend this script to generate a valid XCCDF instead
914
        of SSG SHORTHAND.
915
        """
916
        raise NotImplementedError
917
918 2
    def __str__(self):
919
        return self.id_
920
921
922 2
class Group(XCCDFEntity):
923
    """Represents XCCDF Group
924
    """
925 2
    ATTRIBUTES_TO_PASS_ON = (
926
        "platforms",
927
    )
928
929 2
    GENERIC_FILENAME = "group.yml"
930
931 2
    KEYS = {
932
        "prodtype": lambda: "all",
933
        "title": lambda: "",
934
        "description": lambda: "",
935
        "warnings": lambda: list(),
936
        "requires": lambda: list(),
937
        "conflicts": lambda: list(),
938
        "values": lambda: dict(),
939
        "groups": lambda: dict(),
940
        "rules": lambda: dict(),
941
        "platform": lambda: "",
942
        "platforms": lambda: set(),
943
        "cpe_names": lambda: set(),
944
        ** XCCDFEntity.KEYS
945
    }
946
947 2
    MANDATORY_KEYS = {
948
        "title",
949
        "status",
950
        "description",
951
        "front_matter",
952
        "rear_matter",
953
        "version",
954
    }
955
956 2
    @classmethod
957
    def process_input_dict(cls, input_contents, env_yaml):
958
        data = super(Group, cls).process_input_dict(input_contents, env_yaml)
959
        if data["rules"]:
960
            rule_ids = data["rules"]
961
            data["rules"] = {rid: None for rid in rule_ids}
962
963
        if data["groups"]:
964
            group_ids = data["groups"]
965
            data["groups"] = {gid: None for gid in group_ids}
966
967
        if data["values"]:
968
            value_ids = data["values"]
969
            data["values"] = {vid: None for vid in value_ids}
970
971
        if data["platform"]:
972
            data["platforms"].add(data["platform"])
973
        return data
974
975 2 View Code Duplication
    def load_entities(self, rules_by_id, values_by_id, groups_by_id):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
976
        for rid, val in self.rules.items():
977
            if not val:
978
                self.rules[rid] = rules_by_id[rid]
979
980
        for vid, val in self.values.items():
981
            if not val:
982
                self.values[vid] = values_by_id[vid]
983
984
        for gid, val in self.groups.items():
985
            if not val:
986
                self.groups[gid] = groups_by_id[gid]
987
988 2
    def represent_as_dict(self):
989
        yaml_contents = super(Group, self).represent_as_dict()
990
991
        if self.rules:
992
            yaml_contents["rules"] = sorted(list(self.rules.keys()))
993
        if self.groups:
994
            yaml_contents["groups"] = sorted(list(self.groups.keys()))
995
        if self.values:
996
            yaml_contents["values"] = sorted(list(self.values.keys()))
997
998
        return yaml_contents
999
1000 2
    def validate_prodtype(self, yaml_file):
1001
        for ptype in self.prodtype.split(","):
1002
            if ptype.strip() != ptype:
1003
                msg = (
1004
                    "Comma-separated '{prodtype}' prodtype "
1005
                    "in {yaml_file} contains whitespace."
1006
                    .format(prodtype=self.prodtype, yaml_file=yaml_file))
1007
                raise ValueError(msg)
1008
1009 2
    def to_xml_element(self):
1010
        group = ET.Element('Group')
1011
        group.set('id', self.id_)
1012
        if self.prodtype != "all":
1013
            group.set("prodtype", self.prodtype)
1014
        title = ET.SubElement(group, 'title')
1015
        title.text = self.title
1016
        add_sub_element(group, 'description', self.description)
1017
        add_warning_elements(group, self.warnings)
1018
        add_nondata_subelements(group, "requires", "id", self.requires)
1019
        add_nondata_subelements(group, "conflicts", "id", self.conflicts)
1020
1021
        for cpe_name in self.cpe_names:
1022
            platform_el = ET.SubElement(group, "platform")
1023
            platform_el.set("idref", cpe_name)
1024
1025
        for _value in self.values.values():
1026
            group.append(_value.to_xml_element())
1027
1028
        # Rules that install or remove packages affect remediation
1029
        # of other rules.
1030
        # When packages installed/removed rules come first:
1031
        # The Rules are ordered in more logical way, and
1032
        # remediation order is natural, first the package is installed, then configured.
1033
        rules_in_group = list(self.rules.keys())
1034
        regex = (r'(package_.*_(installed|removed))|' +
1035
                 r'(service_.*_(enabled|disabled))|' +
1036
                 r'install_smartcard_packages$')
1037
        priority_order = ["installed", "install_smartcard_packages", "removed",
1038
                          "enabled", "disabled"]
1039
        rules_in_group = reorder_according_to_ordering(rules_in_group, priority_order, regex)
1040
1041
        # Add rules in priority order, first all packages installed, then removed,
1042
        # followed by services enabled, then disabled
1043
        for rule_id in rules_in_group:
1044
            group.append(self.rules.get(rule_id).to_xml_element())
1045
1046
        # Add the sub groups after any current level group rules.
1047
        # As package installed/removed and service enabled/disabled rules are usuallly in
1048
        # top level group, this ensures groups that further configure a package or service
1049
        # are after rules that install or remove it.
1050
        groups_in_group = list(self.groups.keys())
1051
        priority_order = [
1052
            # Make sure rpm_verify_(hashes|permissions|ownership) are run before any other rule.
1053
            # Due to conflicts between rules rpm_verify_* rules and any rule that configures
1054
            # stricter settings, like file_permissions_grub2_cfg and sudo_dedicated_group,
1055
            # the rules deviating from the system default should be evaluated later.
1056
            # So that in the end the system has contents, permissions and ownership reset, and
1057
            # any deviations or stricter settings are applied by the rules in the profile.
1058
            "software", "integrity", "integrity-software", "rpm_verification",
1059
1060
            # The account group has to precede audit group because
1061
            # the rule package_screen_installed is desired to be executed before the rule
1062
            # audit_rules_privileged_commands, othervise the rule
1063
            # does not catch newly installed screen binary during remediation
1064
            # and report fail
1065
            "accounts", "auditing",
1066
1067
1068
            # The FIPS group should come before Crypto,
1069
            # if we want to set a different (stricter) Crypto Policy than FIPS.
1070
            "fips", "crypto",
1071
1072
            # The firewalld_activation must come before ruleset_modifications, othervise
1073
            # remediations for ruleset_modifications won't work
1074
            "firewalld_activation", "ruleset_modifications",
1075
1076
            # Rules from group disabling_ipv6 must precede rules from configuring_ipv6,
1077
            # otherwise the remediation prints error although it is successful
1078
            "disabling_ipv6", "configuring_ipv6"
1079
        ]
1080
        groups_in_group = reorder_according_to_ordering(groups_in_group, priority_order)
1081
        for group_id in groups_in_group:
1082
            _group = self.groups[group_id]
1083
            group.append(_group.to_xml_element())
1084
1085
        return group
1086
1087 2
    def to_file(self, file_name):
1088
        root = self.to_xml_element()
1089
        tree = ET.ElementTree(root)
1090
        tree.write(file_name)
1091
1092 2
    def add_value(self, value):
1093
        if value is None:
1094
            return
1095
        self.values[value.id_] = value
1096
1097 2
    def add_group(self, group, env_yaml=None):
1098
        if group is None:
1099
            return
1100
        if self.platforms and not group.platforms:
1101
            group.platforms = self.platforms
1102
        self.groups[group.id_] = group
1103
        self._pass_our_properties_on_to(group)
1104
1105
        # Once the group has inherited properties, update cpe_names
1106
        if env_yaml:
1107
            for platform in group.platforms:
1108
                try:
1109
                    group.cpe_names.add(env_yaml["product_cpes"].get_cpe_name(platform))
1110
                except CPEDoesNotExist:
1111
                    print("Unsupported platform '%s' in group '%s'." % (platform, group.id_))
1112
                    raise
1113
1114 2
    def _pass_our_properties_on_to(self, obj):
1115
        for attr in self.ATTRIBUTES_TO_PASS_ON:
1116
            if hasattr(obj, attr) and getattr(obj, attr) is None:
1117
                setattr(obj, attr, getattr(self, attr))
1118
1119 2
    def add_rule(self, rule, env_yaml=None):
1120
        if rule is None:
1121
            return
1122
        if self.platforms and not rule.platforms:
1123
            rule.platforms = self.platforms
1124
        self.rules[rule.id_] = rule
1125
        self._pass_our_properties_on_to(rule)
1126
1127
        # Once the rule has inherited properties, update cpe_names
1128
        if env_yaml:
1129
            for platform in rule.platforms:
1130
                try:
1131
                    rule.cpe_names.add(env_yaml["product_cpes"].get_cpe_name(platform))
1132
                except CPEDoesNotExist:
1133
                    print("Unsupported platform '%s' in rule '%s'." % (platform, rule.id_))
1134
                    raise
1135
1136 2
    def __str__(self):
1137
        return self.id_
1138
1139
1140 2
def noop_rule_filterfunc(rule):
1141
    return True
1142
1143 2
def rule_filter_from_def(filterdef):
1144
    if filterdef is None or filterdef == "":
1145
        return noop_rule_filterfunc
1146
1147
    def filterfunc(rule):
1148
        # Remove globals for security and only expose
1149
        # variables relevant to the rule
1150
        return eval(filterdef, {"__builtins__": None}, rule.__dict__)
1151
    return filterfunc
1152
1153
1154 2
class Rule(XCCDFEntity):
1155
    """Represents XCCDF Rule
1156
    """
1157 2
    KEYS = {
1158
        "prodtype": lambda: "all",
1159
        "title": lambda: "",
1160
        "description": lambda: "",
1161
        "rationale": lambda: "",
1162
        "severity": lambda: "",
1163
        "references": lambda: dict(),
1164
        "identifiers": lambda: dict(),
1165
        "ocil_clause": lambda: None,
1166
        "ocil": lambda: None,
1167
        "oval_external_content": lambda: None,
1168
        "warnings": lambda: list(),
1169
        "conflicts": lambda: list(),
1170
        "requires": lambda: list(),
1171
        "platform": lambda: None,
1172
        "platforms": lambda: set(),
1173
        "inherited_platforms": lambda: list(),
1174
        "template": lambda: None,
1175
        "cpe_names": lambda: set(),
1176
        ** XCCDFEntity.KEYS
1177
    }
1178
1179 2
    MANDATORY_KEYS = {
1180
        "title",
1181
        "description",
1182
        "rationale",
1183
        "severity",
1184
    }
1185
1186 2
    GENERIC_FILENAME = "rule.yml"
1187 2
    ID_LABEL = "rule_id"
1188
1189 2
    PRODUCT_REFERENCES = ("stigid", "cis",)
1190 2
    GLOBAL_REFERENCES = ("srg", "vmmsrg", "disa", "cis-csc",)
1191
1192 2
    def __init__(self, id_):
1193 2
        super(Rule, self).__init__(id_)
1194 2
        self.sce_metadata = None
1195
1196 2
    def __deepcopy__(self, memo):
1197
        cls = self.__class__
1198
        result = cls.__new__(cls)
1199
        memo[id(self)] = result
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable id does not seem to be defined.
Loading history...
1200
        for k, v in self.__dict__.items():
1201
            # These are difficult to deep copy, so let's just re-use them.
1202
            if k != "template" and k != "local_env_yaml":
1203
                setattr(result, k, deepcopy(v, memo))
1204
            else:
1205
                setattr(result, k, v)
1206
        return result
1207
1208 2
    @classmethod
1209 2
    def from_yaml(cls, yaml_file, env_yaml=None, sce_metadata=None):
1210 2
        rule = super(Rule, cls).from_yaml(yaml_file, env_yaml)
1211
1212
        # platforms are read as list from the yaml file
1213
        # we need them to convert to set again
1214 2
        rule.platforms = set(rule.platforms)
1215
1216
        # rule.platforms.update(set(rule.inherited_platforms))
1217
1218 2
        check_warnings(rule)
1219
1220
        # ensure that content of rule.platform is in rule.platforms as
1221
        # well
1222 2
        if rule.platform is not None:
1223 2
            rule.platforms.add(rule.platform)
1224
1225
        # Convert the platform names to CPE names
1226
        # But only do it if an env_yaml was specified (otherwise there would be no product CPEs
1227
        # to lookup), and the rule's prodtype matches the product being built
1228 2
        if (
1229
                env_yaml and env_yaml["product"] in parse_prodtype(rule.prodtype)
1230
                or env_yaml and rule.prodtype == "all"):
1231
            for platform in rule.platforms:
1232
                try:
1233
                    rule.cpe_names.add(env_yaml["product_cpes"].get_cpe_name(platform))
1234
                except CPEDoesNotExist:
1235
                    print("Unsupported platform '%s' in rule '%s'." % (platform, rule.id_))
1236
                    raise
1237
1238 2
        if sce_metadata and rule.id_ in sce_metadata:
1239
            rule.sce_metadata = sce_metadata[rule.id_]
1240
            rule.sce_metadata["relative_path"] = os.path.join(
1241
                env_yaml["product"], "checks/sce", rule.sce_metadata['filename'])
1242
1243 2
        rule.validate_prodtype(yaml_file)
1244 2
        rule.validate_identifiers(yaml_file)
1245 2
        rule.validate_references(yaml_file)
1246 2
        return rule
1247
1248 2
    def _verify_stigid_format(self, product):
1249 2
        stig_id = self.references.get("stigid", None)
1250 2
        if not stig_id:
1251 2
            return
1252 2
        if "," in stig_id:
1253 2
            raise ValueError("Rules can not have multiple STIG IDs.")
1254
1255 2
    def _verify_disa_cci_format(self):
1256 2
        cci_id = self.references.get("disa", None)
1257 2
        if not cci_id:
1258 2
            return
1259
        cci_ex = re.compile(r'^CCI-[0-9]{6}$')
1260
        for cci in cci_id.split(","):
1261
            if not cci_ex.match(cci):
1262
                raise ValueError("CCI '{}' is in the wrong format! "
1263
                                 "Format should be similar to: "
1264
                                 "CCI-XXXXXX".format(cci))
1265
        self.references["disa"] = cci_id
1266
1267 2
    def normalize(self, product):
1268 2
        try:
1269 2
            self.make_refs_and_identifiers_product_specific(product)
1270 2
            self.make_template_product_specific(product)
1271 2
        except Exception as exc:
1272 2
            msg = (
1273
                "Error normalizing '{rule}': {msg}"
1274
                .format(rule=self.id_, msg=str(exc))
1275
            )
1276 2
            raise RuntimeError(msg)
1277
1278 2
    def _get_product_only_references(self):
1279 2
        product_references = dict()
1280
1281 2
        for ref in Rule.PRODUCT_REFERENCES:
1282 2
            start = "{0}@".format(ref)
1283 2
            for gref, gval in self.references.items():
1284 2
                if ref == gref or gref.startswith(start):
1285 2
                    product_references[gref] = gval
1286 2
        return product_references
1287
1288 2
    def make_template_product_specific(self, product):
1289 2
        product_suffix = "@{0}".format(product)
1290
1291 2
        if not self.template:
1292
            return
1293
1294 2
        not_specific_vars = self.template.get("vars", dict())
1295 2
        specific_vars = self._make_items_product_specific(
1296
            not_specific_vars, product_suffix, True)
1297 2
        self.template["vars"] = specific_vars
1298
1299 2
        not_specific_backends = self.template.get("backends", dict())
1300 2
        specific_backends = self._make_items_product_specific(
1301
            not_specific_backends, product_suffix, True)
1302 2
        self.template["backends"] = specific_backends
1303
1304 2
    def make_refs_and_identifiers_product_specific(self, product):
1305 2
        product_suffix = "@{0}".format(product)
1306
1307 2
        product_references = self._get_product_only_references()
1308 2
        general_references = self.references.copy()
1309 2
        for todel in product_references:
1310 2
            general_references.pop(todel)
1311 2
        for ref in Rule.PRODUCT_REFERENCES:
1312 2
            if ref in general_references:
1313
                msg = "Unexpected reference identifier ({0}) without "
1314
                msg += "product qualifier ({0}@{1}) while building rule "
1315
                msg += "{2}"
1316
                msg = msg.format(ref, product, self.id_)
1317
                raise ValueError(msg)
1318
1319 2
        to_set = dict(
1320
            identifiers=(self.identifiers, False),
1321
            general_references=(general_references, True),
1322
            product_references=(product_references, False),
1323
        )
1324 2
        for name, (dic, allow_overwrites) in to_set.items():
1325 2
            try:
1326 2
                new_items = self._make_items_product_specific(
1327
                    dic, product_suffix, allow_overwrites)
1328 2
            except ValueError as exc:
1329 2
                msg = (
1330
                    "Error processing {what} for rule '{rid}': {msg}"
1331
                    .format(what=name, rid=self.id_, msg=str(exc))
1332
                )
1333 2
                raise ValueError(msg)
1334 2
            dic.clear()
1335 2
            dic.update(new_items)
1336
1337 2
        self.references = general_references
1338 2
        self._verify_disa_cci_format()
1339 2
        self.references.update(product_references)
1340
1341 2
        self._verify_stigid_format(product)
1342
1343 2
    def _make_items_product_specific(self, items_dict, product_suffix, allow_overwrites=False):
1344 2
        new_items = dict()
1345 2
        for full_label, value in items_dict.items():
1346 2
            if "@" not in full_label and full_label not in new_items:
1347 2
                new_items[full_label] = value
1348 2
                continue
1349
1350 2
            label = full_label.split("@")[0]
1351
1352
            # this test should occur before matching product_suffix with the product qualifier
1353
            # present in the reference, so it catches problems even for products that are not
1354
            # being built at the moment
1355 2
            if label in Rule.GLOBAL_REFERENCES:
1356
                msg = (
1357
                    "You cannot use product-qualified for the '{item_u}' reference. "
1358
                    "Please remove the product-qualifier and merge values with the "
1359
                    "existing reference if there is any. Original line: {item_q}: {value_q}"
1360
                    .format(item_u=label, item_q=full_label, value_q=value)
1361
                )
1362
                raise ValueError(msg)
1363
1364 2
            if not full_label.endswith(product_suffix):
1365 2
                continue
1366
1367 2
            if label in items_dict and not allow_overwrites and value != items_dict[label]:
1368 2
                msg = (
1369
                    "There is a product-qualified '{item_q}' item, "
1370
                    "but also an unqualified '{item_u}' item "
1371
                    "and those two differ in value - "
1372
                    "'{value_q}' vs '{value_u}' respectively."
1373
                    .format(item_q=full_label, item_u=label,
1374
                            value_q=value, value_u=items_dict[label])
1375
                )
1376 2
                raise ValueError(msg)
1377 2
            new_items[label] = value
1378 2
        return new_items
1379
1380 2
    def validate_identifiers(self, yaml_file):
1381 2
        if self.identifiers is None:
1382
            raise ValueError("Empty identifier section in file %s" % yaml_file)
1383
1384
        # Validate all identifiers are non-empty:
1385 2
        for ident_type, ident_val in self.identifiers.items():
1386 2
            if not isinstance(ident_type, str) or not isinstance(ident_val, str):
1387
                raise ValueError("Identifiers and values must be strings: %s in file %s"
1388
                                 % (ident_type, yaml_file))
1389 2
            if ident_val.strip() == "":
1390
                raise ValueError("Identifiers must not be empty: %s in file %s"
1391
                                 % (ident_type, yaml_file))
1392 2
            if ident_type[0:3] == 'cce':
1393 2
                if not is_cce_format_valid(ident_val):
1394
                    raise ValueError("CCE Identifier format must be valid: invalid format '%s' for CEE '%s'"
1395
                                     " in file '%s'" % (ident_val, ident_type, yaml_file))
1396 2
                if not is_cce_value_valid("CCE-" + ident_val):
1397
                    raise ValueError("CCE Identifier value is not a valid checksum: invalid value '%s' for CEE '%s'"
1398
                                     " in file '%s'" % (ident_val, ident_type, yaml_file))
1399
1400 2
    def validate_references(self, yaml_file):
1401 2
        if self.references is None:
1402
            raise ValueError("Empty references section in file %s" % yaml_file)
1403
1404 2
        for ref_type, ref_val in self.references.items():
1405 2
            if not isinstance(ref_type, str) or not isinstance(ref_val, str):
1406
                raise ValueError("References and values must be strings: %s in file %s"
1407
                                 % (ref_type, yaml_file))
1408 2
            if ref_val.strip() == "":
1409
                raise ValueError("References must not be empty: %s in file %s"
1410
                                 % (ref_type, yaml_file))
1411
1412 2
        for ref_type, ref_val in self.references.items():
1413 2
            for ref in ref_val.split(","):
1414 2
                if ref.strip() != ref:
1415
                    msg = (
1416
                        "Comma-separated '{ref_type}' reference "
1417
                        "in {yaml_file} contains whitespace."
1418
                        .format(ref_type=ref_type, yaml_file=yaml_file))
1419
                    raise ValueError(msg)
1420
1421 2
    def validate_prodtype(self, yaml_file):
1422 2
        for ptype in self.prodtype.split(","):
1423 2
            if ptype.strip() != ptype:
1424
                msg = (
1425
                    "Comma-separated '{prodtype}' prodtype "
1426
                    "in {yaml_file} contains whitespace."
1427
                    .format(prodtype=self.prodtype, yaml_file=yaml_file))
1428
                raise ValueError(msg)
1429
1430 2
    def to_xml_element(self):
1431
        rule = ET.Element('Rule')
1432
        rule.set('id', self.id_)
1433
        if self.prodtype != "all":
1434
            rule.set("prodtype", self.prodtype)
1435
        rule.set('severity', self.severity)
1436
        add_sub_element(rule, 'title', self.title)
1437
        add_sub_element(rule, 'description', self.description)
1438
        add_sub_element(rule, 'rationale', self.rationale)
1439
1440
        main_ident = ET.Element('ident')
1441
        for ident_type, ident_val in self.identifiers.items():
1442
            # This is not true if items were normalized
1443
            if '@' in ident_type:
1444
                # the ident is applicable only on some product
1445
                # format : 'policy@product', eg. 'stigid@product'
1446
                # for them, we create a separate <ref> element
1447
                policy, product = ident_type.split('@')
1448
                ident = ET.SubElement(rule, 'ident')
1449
                ident.set(policy, ident_val)
1450
                ident.set('prodtype', product)
1451
            else:
1452
                main_ident.set(ident_type, ident_val)
1453
1454
        if main_ident.attrib:
1455
            rule.append(main_ident)
1456
1457
        main_ref = ET.Element('ref')
1458
        for ref_type, ref_val in self.references.items():
1459
            # This is not true if items were normalized
1460
            if '@' in ref_type:
1461
                # the reference is applicable only on some product
1462
                # format : 'policy@product', eg. 'stigid@product'
1463
                # for them, we create a separate <ref> element
1464
                policy, product = ref_type.split('@')
1465
                ref = ET.SubElement(rule, 'ref')
1466
                ref.set(policy, ref_val)
1467
                ref.set('prodtype', product)
1468
            else:
1469
                main_ref.set(ref_type, ref_val)
1470
1471
        if main_ref.attrib:
1472
            rule.append(main_ref)
1473
1474
        ocil_parent = rule
1475
        check_parent = rule
1476
1477
        if self.sce_metadata:
1478
            # TODO: This is pretty much another hack, just like the previous OVAL
1479
            # one. However, we avoided the external SCE content as I'm not sure it
1480
            # is generally useful (unlike say, CVE checking with external OVAL)
1481
            #
1482
            # Additionally, we build the content (check subelement) here rather
1483
            # than in xslt due to the nature of our SCE metadata.
1484
            #
1485
            # Finally, before we begin, we might have an element with both SCE
1486
            # and OVAL. We have no way of knowing (right here) whether that is
1487
            # the case (due to a variety of issues, most notably, that linking
1488
            # hasn't yet occurred). So we must rely on the content author's
1489
            # good will, by annotating SCE content with a complex-check tag
1490
            # if necessary.
1491
1492
            if 'complex-check' in self.sce_metadata:
1493
                # Here we have an issue: XCCDF allows EITHER one or more check
1494
                # elements OR a single complex-check. While we have an explicit
1495
                # case handling the OVAL-and-SCE interaction, OCIL entries have
1496
                # (historically) been alongside OVAL content and been in an
1497
                # "OR" manner -- preferring OVAL to SCE. In order to accomplish
1498
                # this, we thus need to add _yet another parent_ when OCIL data
1499
                # is present, and add update ocil_parent accordingly.
1500
                if self.ocil or self.ocil_clause:
1501
                    ocil_parent = ET.SubElement(ocil_parent, "complex-check")
1502
                    ocil_parent.set('operator', 'OR')
1503
1504
                check_parent = ET.SubElement(ocil_parent, "complex-check")
1505
                check_parent.set('operator', self.sce_metadata['complex-check'])
1506
1507
            # Now, add the SCE check element to the tree.
1508
            check = ET.SubElement(check_parent, "check")
1509
            check.set("system", SCE_SYSTEM)
1510
1511
            if 'check-import' in self.sce_metadata:
1512
                if isinstance(self.sce_metadata['check-import'], str):
1513
                    self.sce_metadata['check-import'] = [self.sce_metadata['check-import']]
1514
                for entry in self.sce_metadata['check-import']:
1515
                    check_import = ET.SubElement(check, 'check-import')
1516
                    check_import.set('import-name', entry)
1517
                    check_import.text = None
1518
1519
            if 'check-export' in self.sce_metadata:
1520
                if isinstance(self.sce_metadata['check-export'], str):
1521
                    self.sce_metadata['check-export'] = [self.sce_metadata['check-export']]
1522
                for entry in self.sce_metadata['check-export']:
1523
                    export, value = entry.split('=')
1524
                    check_export = ET.SubElement(check, 'check-export')
1525
                    check_export.set('value-id', value)
1526
                    check_export.set('export-name', export)
1527
                    check_export.text = None
1528
1529
            check_ref = ET.SubElement(check, "check-content-ref")
1530
            href = self.sce_metadata['relative_path']
1531
            check_ref.set("href", href)
1532
1533
        if self.oval_external_content:
1534
            check = ET.SubElement(check_parent, 'check')
1535
            check.set("system", "http://oval.mitre.org/XMLSchema/oval-definitions-5")
1536
            external_content = ET.SubElement(check, "check-content-ref")
1537
            external_content.set("href", self.oval_external_content)
1538
        else:
1539
            # TODO: This is pretty much a hack, oval ID will be the same as rule ID
1540
            #       and we don't want the developers to have to keep them in sync.
1541
            #       Therefore let's just add an OVAL ref of that ID.
1542
            oval_ref = ET.SubElement(check_parent, "oval")
1543
            oval_ref.set("id", self.id_)
1544
1545
        if self.ocil or self.ocil_clause:
1546
            ocil = add_sub_element(ocil_parent, 'ocil', self.ocil if self.ocil else "")
1547
            if self.ocil_clause:
1548
                ocil.set("clause", self.ocil_clause)
1549
1550
        add_warning_elements(rule, self.warnings)
1551
        add_nondata_subelements(rule, "requires", "id", self.requires)
1552
        add_nondata_subelements(rule, "conflicts", "id", self.conflicts)
1553
1554
        for cpe_name in self.cpe_names:
1555
            platform_el = ET.SubElement(rule, "platform")
1556
            platform_el.set("idref", cpe_name)
1557
1558
        return rule
1559
1560 2
    def to_file(self, file_name):
1561
        root = self.to_xml_element()
1562
        tree = ET.ElementTree(root)
1563
        tree.write(file_name)
1564
1565 2
    def __hash__(self):
1566
        """ Controls are meant to be unique, so using the
1567
        ID should suffice"""
1568
        return hash(self.id_)
1569
1570 2
    def __eq__(self, other):
1571
        return isinstance(other, self.__class__) and self.id_ == other.id_
1572
1573 2
    def __ne__(self, other):
1574
        return not self != other
1575
1576 2
    def __lt__(self, other):
1577
        return self.id_ < other.id_
1578
1579 2
    def __str__(self):
1580
        return self.id_
1581
1582
1583 2
class DirectoryLoader(object):
1584 2
    def __init__(self, profiles_dir, env_yaml):
1585
        self.benchmark_file = None
1586
        self.group_file = None
1587
        self.loaded_group = None
1588
        self.rule_files = []
1589
        self.value_files = []
1590
        self.subdirectories = []
1591
1592
        self.all_values = dict()
1593
        self.all_rules = dict()
1594
        self.all_groups = dict()
1595
1596
        self.profiles_dir = profiles_dir
1597
        self.env_yaml = env_yaml
1598
        self.product = env_yaml["product"]
1599
1600
        self.parent_group = None
1601
1602 2
    def _collect_items_to_load(self, guide_directory):
1603
        for dir_item in sorted(os.listdir(guide_directory)):
1604
            dir_item_path = os.path.join(guide_directory, dir_item)
1605
            _, extension = os.path.splitext(dir_item)
1606
1607
            if extension == '.var':
1608
                self.value_files.append(dir_item_path)
1609
            elif dir_item == "benchmark.yml":
1610
                if self.benchmark_file:
1611
                    raise ValueError("Multiple benchmarks in one directory")
1612
                self.benchmark_file = dir_item_path
1613
            elif dir_item == "group.yml":
1614
                if self.group_file:
1615
                    raise ValueError("Multiple groups in one directory")
1616
                self.group_file = dir_item_path
1617
            elif extension == '.rule':
1618
                self.rule_files.append(dir_item_path)
1619
            elif is_rule_dir(dir_item_path):
1620
                self.rule_files.append(get_rule_dir_yaml(dir_item_path))
1621
            elif dir_item != "tests":
1622
                if os.path.isdir(dir_item_path):
1623
                    self.subdirectories.append(dir_item_path)
1624
                else:
1625
                    sys.stderr.write(
1626
                        "Encountered file '%s' while recursing, extension '%s' "
1627
                        "is unknown. Skipping..\n"
1628
                        % (dir_item, extension)
1629
                    )
1630
1631 2
    def load_benchmark_or_group(self, guide_directory):
1632
        """
1633
        Loads a given benchmark or group from the specified benchmark_file or
1634
        group_file, in the context of guide_directory, profiles_dir and env_yaml.
1635
1636
        Returns the loaded group or benchmark.
1637
        """
1638
        group = None
1639
        if self.group_file and self.benchmark_file:
1640
            raise ValueError("A .benchmark file and a .group file were found in "
1641
                             "the same directory '%s'" % (guide_directory))
1642
1643
        # we treat benchmark as a special form of group in the following code
1644
        if self.benchmark_file:
1645
            group = Benchmark.from_yaml(
1646
                self.benchmark_file, self.env_yaml, 'product-name'
1647
            )
1648
            group.add_value_needed_for_ocil_clauses()
1649
            if self.profiles_dir:
1650
                group.add_profiles_from_dir(self.profiles_dir, self.env_yaml)
1651
1652
        if self.group_file:
1653
            group = Group.from_yaml(self.group_file, self.env_yaml)
1654
            self.all_groups[group.id_] = group
1655
1656
        return group
1657
1658 2
    def _load_group_process_and_recurse(self, guide_directory):
1659
        self.loaded_group = self.load_benchmark_or_group(guide_directory)
1660
1661
        if self.loaded_group:
1662
1663
            if self.parent_group:
1664
                self.parent_group.add_group(self.loaded_group, env_yaml=self.env_yaml)
1665
1666
            self._process_values()
1667
            self._recurse_into_subdirs()
1668
            self._process_rules()
1669
1670 2
    def process_directory_tree(self, start_dir, extra_group_dirs=None):
1671
        self._collect_items_to_load(start_dir)
1672
        if extra_group_dirs:
1673
            self.subdirectories += extra_group_dirs
1674
        self._load_group_process_and_recurse(start_dir)
1675
1676 2
    def process_directory_trees(self, directories):
1677
        start_dir = directories[0]
1678
        extra_group_dirs = directories[1:]
1679
        return self.process_directory_tree(start_dir, extra_group_dirs)
1680
1681 2
    def _recurse_into_subdirs(self):
1682
        for subdir in self.subdirectories:
1683
            loader = self._get_new_loader()
1684
            loader.parent_group = self.loaded_group
1685
            loader.process_directory_tree(subdir)
1686
            self.all_values.update(loader.all_values)
1687
            self.all_rules.update(loader.all_rules)
1688
            self.all_groups.update(loader.all_groups)
1689
1690 2
    def _get_new_loader(self):
1691
        raise NotImplementedError()
1692
1693 2
    def _process_values(self):
1694
        raise NotImplementedError()
1695
1696 2
    def _process_rules(self):
1697
        raise NotImplementedError()
1698
1699 2
    def save_all_entities(self, base_dir):
1700
        destdir = os.path.join(base_dir, "rules")
1701
        mkdir_p(destdir)
1702
        if self.all_rules:
1703
            self.save_entities(self.all_rules.values(), destdir)
1704
1705
        destdir = os.path.join(base_dir, "groups")
1706
        mkdir_p(destdir)
1707
        if self.all_groups:
1708
            self.save_entities(self.all_groups.values(), destdir)
1709
1710
        destdir = os.path.join(base_dir, "values")
1711
        mkdir_p(destdir)
1712
        if self.all_values:
1713
            self.save_entities(self.all_values.values(), destdir)
1714
1715 2
    def save_entities(self, entities, destdir):
1716
        if not entities:
1717
            return
1718
        for entity in entities:
1719
            basename = entity.id_ + ".yml"
1720
            dest_filename = os.path.join(destdir, basename)
1721
            entity.dump_yaml(dest_filename)
1722
1723
1724 2
class BuildLoader(DirectoryLoader):
1725 2
    def __init__(self, profiles_dir, env_yaml,
1726
                 sce_metadata_path=None):
1727
        super(BuildLoader, self).__init__(profiles_dir, env_yaml)
1728
1729
        self.sce_metadata = None
1730
        if sce_metadata_path and os.path.getsize(sce_metadata_path):
1731
            self.sce_metadata = json.load(open(sce_metadata_path, 'r'))
1732
1733 2
    def _process_values(self):
1734
        for value_yaml in self.value_files:
1735
            value = Value.from_yaml(value_yaml, self.env_yaml)
1736
            self.all_values[value.id_] = value
1737
            self.loaded_group.add_value(value)
1738
1739 2
    def _process_rules(self):
1740
        for rule_yaml in self.rule_files:
1741
            try:
1742
                rule = Rule.from_yaml(rule_yaml, self.env_yaml, self.sce_metadata)
1743
            except DocumentationNotComplete:
1744
                # Happens on non-debug build when a rule is "documentation-incomplete"
1745
                continue
1746
            prodtypes = parse_prodtype(rule.prodtype)
1747
            if "all" not in prodtypes and self.product not in prodtypes:
1748
                continue
1749
            self.all_rules[rule.id_] = rule
1750
            self.loaded_group.add_rule(rule, env_yaml=self.env_yaml)
1751
1752
            if self.loaded_group.platforms:
1753
                rule.inherited_platforms += self.loaded_group.platforms
1754
1755
            rule.normalize(self.env_yaml["product"])
1756
1757 2
    def _get_new_loader(self):
1758
        loader = BuildLoader(
1759
            self.profiles_dir, self.env_yaml)
1760
        # Do it this way so we only have to parse the SCE metadata once.
1761
        loader.sce_metadata = self.sce_metadata
1762
        return loader
1763
1764 2
    def export_group_to_file(self, filename):
1765
        return self.loaded_group.to_file(filename)
1766
1767
1768 2
class LinearLoader(object):
1769 2
    def __init__(self, env_yaml, resolved_path):
1770
        self.resolved_rules_dir = os.path.join(resolved_path, "rules")
1771
        self.rules = dict()
1772
1773
        self.resolved_profiles_dir = os.path.join(resolved_path, "profiles")
1774
        self.profiles = dict()
1775
1776
        self.resolved_groups_dir = os.path.join(resolved_path, "groups")
1777
        self.groups = dict()
1778
1779
        self.resolved_values_dir = os.path.join(resolved_path, "values")
1780
        self.values = dict()
1781
1782
        self.benchmark = None
1783
        self.env_yaml = env_yaml
1784
1785 2
    def find_first_groups_ids(self, start_dir):
1786
        group_files = glob.glob(os.path.join(start_dir, "*", "group.yml"))
1787
        group_ids = [fname.split(os.path.sep)[-2] for fname in group_files]
1788
        return group_ids
1789
1790 2
    def load_entities_by_id(self, filenames, destination, cls):
1791
        for fname in filenames:
1792
            entity = cls.from_yaml(fname, self.env_yaml)
1793
            destination[entity.id_] = entity
1794
1795 2
    def load_benchmark(self, directory):
1796
        self.benchmark = Benchmark.from_yaml(
1797
            os.path.join(directory, "benchmark.yml"), self.env_yaml, "product-name")
1798
        self.benchmark.add_value_needed_for_ocil_clauses()
1799
1800
        self.benchmark.add_profiles_from_dir(self.resolved_profiles_dir, self.env_yaml)
1801
1802
        benchmark_first_groups = self.find_first_groups_ids(directory)
1803
        for gid in benchmark_first_groups:
1804
            self.benchmark.add_group(self.groups[gid], self.env_yaml)
1805
1806 2
    def load_compiled_content(self):
1807
        filenames = glob.glob(os.path.join(self.resolved_rules_dir, "*.yml"))
1808
        self.load_entities_by_id(filenames, self.rules, Rule)
1809
1810
        filenames = glob.glob(os.path.join(self.resolved_groups_dir, "*.yml"))
1811
        self.load_entities_by_id(filenames, self.groups, Group)
1812
1813
        filenames = glob.glob(os.path.join(self.resolved_profiles_dir, "*.yml"))
1814
        self.load_entities_by_id(filenames, self.profiles, Profile)
1815
1816
        filenames = glob.glob(os.path.join(self.resolved_values_dir, "*.yml"))
1817
        self.load_entities_by_id(filenames, self.values, Value)
1818
1819
        for g in self.groups.values():
1820
            g.load_entities(self.rules, self.values, self.groups)
1821
1822 2
    def export_benchmark_to_file(self, filename):
1823
        return self.benchmark.to_file(filename)
1824