Test Failed
Pull Request — master (#7075)
by Alexander
02:16
created

ssg.build_yaml   F

Complexity

Total Complexity 341

Size/Duplication

Total Lines 1641
Duplicated Lines 1.65 %

Test Coverage

Coverage 33.36%

Importance

Changes 0
Metric Value
eloc 1180
dl 27
loc 1641
ccs 354
cts 1061
cp 0.3336
rs 0.88
c 0
b 0
f 0
wmc 341

4 Functions

Rating   Name   Duplication   Size   Complexity  
B reorder_according_to_ordering() 0 16 6
A add_warning_elements() 0 15 2
A add_sub_element() 0 28 2
A add_nondata_subelements() 0 6 2

85 Methods

Rating   Name   Duplication   Size   Complexity  
A ResolvableProfile.extend_by() 0 12 1
A Profile.get_variable_selectors() 0 2 1
A Profile.__sub__() 0 13 1
A ProfileWithInlinePolicies.__init__() 0 3 1
C Profile.dump_yaml() 0 29 10
A Profile.get_rule_selectors() 0 2 1
A ResolvableProfile._merge_control() 0 5 3
A ResolvableProfile.__init__() 0 4 1
A Profile.validate_refine_rules() 0 26 4
A ProfileWithSeparatePolicies._parse_policies() 0 11 4
A ProfileWithSeparatePolicies.extend_by() 0 3 1
C Profile.to_xml_element() 0 43 9
A ResolvableProfile._controls_ids_to_controls() 0 3 1
B Profile.from_yaml() 0 34 8
B ResolvableProfile.resolve() 0 30 5
A Profile.apply_selection() 0 20 5
A Profile.read_yaml_contents() 0 12 2
A ResolvableProfile._subtract_refinements() 0 11 4
A ProfileWithSeparatePolicies.resolve_controls() 0 18 5
A Profile._parse_selections() 0 3 2
A Profile.validate_rules() 0 21 4
A Profile.__init__() 0 17 1
A ProfileWithSeparatePolicies.__init__() 0 3 1
A ProfileWithSeparatePolicies.read_yaml_contents() 0 5 2
A ResolvableProfile.resolve_controls() 0 2 1
B Profile.validate_variables() 0 31 5
A ProfileWithSeparatePolicies._process_controls_ids_into_controls() 13 13 4
A Group.validate_prodtype() 0 8 3
A ProfileWithInlinePolicies.resolve_controls() 0 7 3
A Benchmark.to_file() 0 4 1
A Benchmark.add_value() 0 4 2
A Benchmark.__init__() 0 26 1
C Group.from_yaml() 0 40 9
A Benchmark.add_rule() 0 4 2
A Value.to_file() 0 4 1
B Benchmark.from_yaml() 0 39 5
B Benchmark.add_profiles_from_dir() 0 27 7
A Benchmark.to_xccdf() 0 5 1
A Benchmark.add_bash_remediation_fns_from_file() 0 7 2
A ProfileWithInlinePolicies._process_controls_ids_into_controls() 14 14 4
A Group.__init__() 0 17 1
A Benchmark.add_group() 0 4 2
B Value.from_yaml() 0 42 6
A Benchmark.__str__() 0 2 1
A ProfileWithInlinePolicies.apply_selection() 0 7 3
A Value.to_xml_element() 0 23 5
B Benchmark.to_xml_element() 0 54 8
A Value.__init__() 0 9 1
A Group.add_value() 0 4 2
B Group.to_xml_element() 0 77 6
B Group.add_rule() 0 16 7
C Rule.validate_identifiers() 0 19 9
A Rule._set_attributes_from_dict() 0 10 4
B Rule.make_refs_and_identifiers_product_specific() 0 38 6
F Rule.from_yaml() 0 71 20
A Rule.normalize() 0 10 2
C Rule._make_items_product_specific() 0 36 9
A Group.to_file() 0 4 1
A Group._pass_our_properties_on_to() 0 4 4
A Rule.validate_prodtype() 0 8 3
C Rule.validate_references() 0 20 9
A Rule._verify_disa_cci_format() 0 11 4
B Group.add_group() 0 16 7
A Rule._verify_stigid_format() 0 6 3
A Group.__str__() 0 2 1
A Rule._get_product_only_references() 0 9 5
A Rule.__init__() 0 27 1
A Rule.to_contents_dict() 0 10 2
A Rule.make_template_product_specific() 0 15 2
A BuildLoader.__init__() 0 11 5
A Rule.to_file() 0 4 1
F Rule.to_xml_element() 0 102 21
B BuildLoader._process_rules() 0 23 8
A DirectoryLoader._recurse_into_subdirs() 0 8 2
A DirectoryLoader._process_values() 0 2 1
A DirectoryLoader.__init__() 0 18 1
A BuildLoader._get_new_loader() 0 6 1
B DirectoryLoader.load_benchmark_or_group() 0 27 6
C DirectoryLoader._collect_items_to_load() 0 27 11
A DirectoryLoader._get_new_loader() 0 2 1
A BuildLoader.export_group_to_file() 0 2 1
A DirectoryLoader.process_directory_tree() 0 5 2
A BuildLoader._process_values() 0 5 2
A DirectoryLoader._load_group_process_and_recurse() 0 10 3
A DirectoryLoader._process_rules() 0 2 1

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:
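One duplication reported for this file is `_process_controls_ids_into_controls()`, which appears with the same logic in both `ProfileWithSeparatePolicies` and `ProfileWithInlinePolicies`. Since both classes inherit from `ResolvableProfile`, one possible fix is the Pull Up Method refactoring. The sketch below is a simplified illustration rather than the project's actual change: the classes are trimmed to the relevant methods, and `ControlsManagerStub` is a hypothetical stand-in for the real controls manager.

```python
class ControlsManagerStub(object):
    """Hypothetical stand-in for the real controls manager (illustration only)."""

    def __init__(self, controls_by_level):
        # Maps a level id to the list of control ids on that level.
        self._by_level = controls_by_level

    def get_control(self, policy_id, control_id):
        return control_id

    def get_all_controls_of_level(self, policy_id, level_id):
        return list(self._by_level.get(level_id, []))

    def get_all_controls(self, policy_id):
        return [c for level in sorted(self._by_level) for c in self._by_level[level]]


class ResolvableProfile(object):
    def _controls_ids_to_controls(self, controls_manager, policy_id, control_id_list):
        return [controls_manager.get_control(policy_id, cid) for cid in control_id_list]

    # Pulled up from both subclasses; the logic is the same as in the listing.
    def _process_controls_ids_into_controls(self, controls_manager, policy_id, controls_ids):
        controls = []
        for cid in controls_ids:
            if not cid.startswith("all"):
                controls.extend(
                    self._controls_ids_to_controls(controls_manager, policy_id, [cid]))
            elif ":" in cid:
                _, level_id = cid.split(":", 1)
                controls.extend(
                    controls_manager.get_all_controls_of_level(policy_id, level_id))
            else:
                controls.extend(controls_manager.get_all_controls(policy_id))
        return controls


class ProfileWithSeparatePolicies(ResolvableProfile):
    pass  # no private copy of the method any more


class ProfileWithInlinePolicies(ResolvableProfile):
    pass  # no private copy of the method any more


manager = ControlsManagerStub({"low": ["c1"], "high": ["c2", "c3"]})
resolved_separate = ProfileWithSeparatePolicies()._process_controls_ids_into_controls(
    manager, "policy", ["all:high", "c9"])
resolved_inline = ProfileWithInlinePolicies()._process_controls_ids_into_controls(
    manager, "policy", ["all"])
```

Both subclasses now share one implementation, so a later fix to the "all" handling only has to be made once.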

Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like those in ssg.build_yaml often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
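As an illustration of the prefix heuristic, `Profile` in this file has several methods starting with `validate_` (`validate_rules()`, `validate_variables()`, `validate_refine_rules()`), which suggests a validation component. The following is a minimal sketch of Extract Class under stated assumptions: `ProfileValidator` and `RuleStub` are hypothetical names, `Profile` is trimmed to the attributes the check needs, and the error messages are shortened.

```python
class ProfileValidator(object):
    """Hypothetical extracted class holding checks that share the validate_ prefix."""

    def __init__(self, profile):
        self.profile = profile

    def validate_rules(self, rules, groups):
        existing_rule_ids = set(r.id_ for r in rules)
        for id_ in self.profile.get_rule_selectors():
            if id_ in groups:
                raise ValueError(
                    "Group '{0}' is selected in profile '{1}'."
                    .format(id_, self.profile.id_))
            if id_ not in existing_rule_ids:
                raise ValueError(
                    "Rule '{0}' was not found in the benchmark (profile '{1}')."
                    .format(id_, self.profile.id_))


class Profile(object):
    """Trimmed-down stand-in for the Profile class in the listing."""

    def __init__(self, id_):
        self.id_ = id_
        self.selected = []
        self.unselected = []

    def get_rule_selectors(self):
        return list(self.selected + self.unselected)

    def validate_rules(self, rules, groups):
        # Thin delegate so existing callers keep working after the extraction.
        ProfileValidator(self).validate_rules(rules, groups)


class RuleStub(object):
    """Hypothetical minimal rule object; only id_ is needed here."""

    def __init__(self, id_):
        self.id_ = id_


profile = Profile("example")
profile.selected = ["rule_a", "rule_missing"]
try:
    profile.validate_rules([RuleStub("rule_a")], groups={})
    error_message = None
except ValueError as exc:
    error_message = str(exc)
```

Keeping a delegating `validate_rules()` on `Profile` lets the extraction happen in small steps; the delegate can be dropped once callers use the validator directly.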

from __future__ import absolute_import
from __future__ import print_function

from collections import defaultdict
from copy import deepcopy
import datetime
import json
import os
import os.path
import re
import sys
from xml.sax.saxutils import escape

import yaml

from .build_cpe import CPEDoesNotExist
from .constants import XCCDF_REFINABLE_PROPERTIES, SCE_SYSTEM
from .rules import get_rule_dir_id, get_rule_dir_yaml, is_rule_dir
from .rule_yaml import parse_prodtype
from .controls import Control

from .checks import is_cce_format_valid, is_cce_value_valid
from .yaml import DocumentationNotComplete, open_and_expand, open_and_macro_expand
from .utils import required_key, mkdir_p

from .xml import ElementTree as ET
from .shims import unicode_func


def add_sub_element(parent, tag, data):
    """
    Creates a new child element under parent with tag tag, and sets
    data as the content under the tag. In particular, data is a string
    to be parsed as an XML tree, allowing sub-elements of children to be
    added.

    If data should not be parsed as an XML tree, either escape the contents
    before passing into this function, or use ElementTree.SubElement().

    Returns the newly created subelement of type tag.
    """
    # This is used because our YAML data contain XML and XHTML elements
    # ET.SubElement() escapes the < > characters by &lt; and &gt;
    # and therefore it does not add child elements
    # we need to do a hack instead
    # TODO: Remove this function after we move to Markdown everywhere in SSG
    ustr = unicode_func("<{0}>{1}</{0}>").format(tag, data)

    try:
        element = ET.fromstring(ustr.encode("utf-8"))
    except Exception:
        msg = ("Error adding subelement to an element '{0}' from string: '{1}'"
               .format(parent.tag, ustr))
        raise RuntimeError(msg)

    parent.append(element)
    return element


def reorder_according_to_ordering(unordered, ordering, regex=None):
    ordered = []
    if regex is None:
        regex = "|".join(["({0})".format(item) for item in ordering])
    regex = re.compile(regex)

    items_to_order = list(filter(regex.match, unordered))
    unordered = set(unordered)

    for priority_type in ordering:
        for item in items_to_order:
            if priority_type in item and item in unordered:
                ordered.append(item)
                unordered.remove(item)
    ordered.extend(sorted(unordered))
    return ordered


def add_warning_elements(element, warnings):
    # The use of [{dict}, {dict}] in warnings is to handle the following
    # scenario where multiple warnings have the same category which is
    # valid in SCAP and our content:
    #
    # warnings:
    #     - general: Some general warning
    #     - general: Some other general warning
    #     - general: |-
    #         Some really long multiline general warning
    #
    # Each of the {dict} should have only one key/value pair.
    for warning_dict in warnings:
        warning = add_sub_element(element, "warning", list(warning_dict.values())[0])
        warning.set("category", list(warning_dict.keys())[0])


def add_nondata_subelements(element, subelement, attribute, attr_data):
    """Add multiple iterations of a subelement that contains an attribute but no data
       For example, <requires id="my_required_id"/>"""
    for data in attr_data:
        req = ET.SubElement(element, subelement)
        req.set(attribute, data)


class Profile(object):
    """Represents XCCDF profile
    """

    def __init__(self, id_):
        self.id_ = id_
        self.title = ""
        self.description = ""
        self.extends = None
        self.selected = []
        self.unselected = []
        self.variables = dict()
        self.refine_rules = defaultdict(list)
        self.metadata = None
        self.reference = None
        # self.platforms is used further in the build system
        # self.platform is merged into self.platforms
        # it is here for backward compatibility
        self.platforms = set()
        self.cpe_names = set()
        self.platform = None

    def read_yaml_contents(self, yaml_contents):
        self.title = required_key(yaml_contents, "title")
        del yaml_contents["title"]
        self.description = required_key(yaml_contents, "description")
        del yaml_contents["description"]
        self.extends = yaml_contents.pop("extends", None)
        selection_entries = required_key(yaml_contents, "selections")
        if selection_entries:
            self._parse_selections(selection_entries)
        del yaml_contents["selections"]
        self.platforms = yaml_contents.pop("platforms", set())
        self.platform = yaml_contents.pop("platform", None)

    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None):
        yaml_contents = open_and_expand(yaml_file, env_yaml)
        if yaml_contents is None:
            return None

        basename, _ = os.path.splitext(os.path.basename(yaml_file))

        profile = cls(basename)
        profile.read_yaml_contents(yaml_contents)

        profile.reference = yaml_contents.pop("reference", None)
        # ensure that content of profile.platform is in profile.platforms as
        # well
        if profile.platform is not None:
            profile.platforms.add(profile.platform)

        if env_yaml:
            for platform in profile.platforms:
                try:
                    profile.cpe_names.add(env_yaml["product_cpes"].get_cpe_name(platform))
                except CPEDoesNotExist:
                    print("Unsupported platform '%s' in profile '%s'." % (platform, profile.id_))
                    raise

        # At the moment, metadata is not used to build content
        if "metadata" in yaml_contents:
            del yaml_contents["metadata"]

        if yaml_contents:
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                               % (yaml_file, yaml_contents))

        return profile

    def dump_yaml(self, file_name, documentation_complete=True):
        to_dump = {}
        to_dump["documentation_complete"] = documentation_complete
        to_dump["title"] = self.title
        to_dump["description"] = self.description
        to_dump["reference"] = self.reference
        if self.metadata is not None:
            to_dump["metadata"] = self.metadata

        if self.extends is not None:
            to_dump["extends"] = self.extends

        if self.platforms:
            to_dump["platforms"] = self.platforms

        selections = []
        for item in self.selected:
            selections.append(item)
        for item in self.unselected:
            selections.append("!"+item)
        for varname in self.variables.keys():
            selections.append(varname+"="+self.variables.get(varname))
        for rule, refinements in self.refine_rules.items():
            for prop, val in refinements:
                selections.append("{rule}.{property}={value}"
                                  .format(rule=rule, property=prop, value=val))
        to_dump["selections"] = selections
        with open(file_name, "w+") as f:
            yaml.dump(to_dump, f, indent=4)

    def _parse_selections(self, entries):
        for item in entries:
            self.apply_selection(item)

    def apply_selection(self, item):
        if "." in item:
            rule, refinement = item.split(".", 1)
            property_, value = refinement.split("=", 1)
            if property_ not in XCCDF_REFINABLE_PROPERTIES:
                msg = ("Property '{property_}' cannot be refined. "
                       "Rule properties that can be refined are {refinables}. "
                       "Fix refinement '{rule_id}.{property_}={value}' in profile '{profile}'."
                       .format(property_=property_, refinables=XCCDF_REFINABLE_PROPERTIES,
                               rule_id=rule, value=value, profile=self.id_)
                       )
                raise ValueError(msg)
            self.refine_rules[rule].append((property_, value))
        elif "=" in item:
            varname, value = item.split("=", 1)
            self.variables[varname] = value
        elif item.startswith("!"):
            self.unselected.append(item[1:])
        else:
            self.selected.append(item)

    def to_xml_element(self):
        element = ET.Element('Profile')
        element.set("id", self.id_)
        if self.extends:
            element.set("extends", self.extends)
        title = add_sub_element(element, "title", self.title)
        title.set("override", "true")
        desc = add_sub_element(element, "description", self.description)
        desc.set("override", "true")

        if self.reference:
            add_sub_element(element, "reference", escape(self.reference))

        for cpe_name in self.cpe_names:
            plat = ET.SubElement(element, "platform")
            plat.set("idref", cpe_name)

        for selection in self.selected:
            select = ET.Element("select")
            select.set("idref", selection)
            select.set("selected", "true")
            element.append(select)

        for selection in self.unselected:
            unselect = ET.Element("select")
            unselect.set("idref", selection)
            unselect.set("selected", "false")
            element.append(unselect)

        for value_id, selector in self.variables.items():
            refine_value = ET.Element("refine-value")
            refine_value.set("idref", value_id)
            refine_value.set("selector", selector)
            element.append(refine_value)

        for refined_rule, refinement_list in self.refine_rules.items():
            refine_rule = ET.Element("refine-rule")
            refine_rule.set("idref", refined_rule)
            for refinement in refinement_list:
                refine_rule.set(refinement[0], refinement[1])
            element.append(refine_rule)

        return element

    def get_rule_selectors(self):
        return list(self.selected + self.unselected)

    def get_variable_selectors(self):
        return self.variables

    def validate_refine_rules(self, rules):
        existing_rule_ids = [r.id_ for r in rules]
        for refine_rule, refinement_list in self.refine_rules.items():
            # Take first refinement to illustrate where the error is
            # all refinements in list are invalid, so it doesn't really matter
            a_refinement = refinement_list[0]

            if refine_rule not in existing_rule_ids:
                msg = (
                    "You are trying to refine a rule that doesn't exist. "
                    "Rule '{rule_id}' was not found in the benchmark. "
                    "Please check all rule refinements for rule: '{rule_id}', for example: "
                    "- {rule_id}.{property_}={value}' in profile {profile_id}."
                    .format(rule_id=refine_rule, profile_id=self.id_,
                            property_=a_refinement[0], value=a_refinement[1])
                    )
                raise ValueError(msg)

            if refine_rule not in self.get_rule_selectors():
                msg = ("- {rule_id}.{property_}={value}' in profile '{profile_id}' is refining "
                       "a rule that is not selected by it. The refinement will not have any "
                       "noticeable effect. Either select the rule or remove the rule refinement."
                       .format(rule_id=refine_rule, property_=a_refinement[0],
                               value=a_refinement[1], profile_id=self.id_)
                       )
                raise ValueError(msg)

    def validate_variables(self, variables):
        variables_by_id = dict()
        for var in variables:
            variables_by_id[var.id_] = var

        for var_id, our_val in self.variables.items():
            if var_id not in variables_by_id:
                all_vars_list = [" - %s" % v for v in variables_by_id.keys()]
                msg = (
                    "Value '{var_id}' in profile '{profile_name}' is not known. "
                    "We know only variables:\n{var_names}"
                    .format(
                        var_id=var_id, profile_name=self.id_,
                        var_names="\n".join(sorted(all_vars_list)))
                )
                raise ValueError(msg)

            allowed_selectors = [str(s) for s in variables_by_id[var_id].options.keys()]
            if our_val not in allowed_selectors:
                msg = (
                    "Value '{var_id}' in profile '{profile_name}' "
                    "uses the selector '{our_val}'. "
                    "This is not possible, as only selectors {all_selectors} are available. "
                    "Either change the selector used in the profile, or "
                    "add the selector-value pair to the variable definition."
                    .format(
                        var_id=var_id, profile_name=self.id_, our_val=our_val,
                        all_selectors=allowed_selectors,
                    )
                )
                raise ValueError(msg)

    def validate_rules(self, rules, groups):
        existing_rule_ids = [r.id_ for r in rules]
        rule_selectors = self.get_rule_selectors()
        for id_ in rule_selectors:
            if id_ in groups:
                msg = (
                    "You have selected a group '{group_id}' instead of a "
                    "rule. Groups have no effect in the profile and are not "
                    "allowed to be selected. Please remove '{group_id}' "
                    "from profile '{profile_id}' before proceeding."
                    .format(group_id=id_, profile_id=self.id_)
                )
                raise ValueError(msg)
            if id_ not in existing_rule_ids:
                msg = (
                    "Rule '{rule_id}' was not found in the benchmark. Please "
                    "remove rule '{rule_id}' from profile '{profile_id}' "
                    "before proceeding."
                    .format(rule_id=id_, profile_id=self.id_)
                )
                raise ValueError(msg)

    def __sub__(self, other):
        profile = Profile(self.id_)
        profile.title = self.title
        profile.description = self.description
        profile.extends = self.extends
        profile.platforms = self.platforms
        profile.platform = self.platform
        profile.selected = list(set(self.selected) - set(other.selected))
        profile.selected.sort()
        profile.unselected = list(set(self.unselected) - set(other.unselected))
        profile.variables = dict((k, v) for (k, v) in self.variables.items()
                                 if k not in other.variables or v != other.variables[k])
        return profile


class ResolvableProfile(Profile):
    def __init__(self, *args, **kwargs):
        super(ResolvableProfile, self).__init__(*args, **kwargs)
        self.resolved = False
        self.resolved_selections = set()

    def _controls_ids_to_controls(self, controls_manager, policy_id, control_id_list):
        items = [controls_manager.get_control(policy_id, cid) for cid in control_id_list]
        return items

    def _merge_control(self, control):
        self.selected.extend(control.rules)
        for varname, value in control.variables.items():
            if varname not in self.variables:
                self.variables[varname] = value

    def resolve_controls(self, controls_manager):
        pass

    def extend_by(self, extended_profile):
        extended_selects = set(extended_profile.selected)
        self.resolved_selections.update(extended_selects)

        updated_variables = dict(extended_profile.variables)
        updated_variables.update(self.variables)
        self.variables = updated_variables

        extended_refinements = deepcopy(extended_profile.refine_rules)
        updated_refinements = self._subtract_refinements(extended_refinements)
        updated_refinements.update(self.refine_rules)
        self.refine_rules = updated_refinements

    def resolve(self, all_profiles, controls_manager=None):
        if self.resolved:
            return

        self.resolve_controls(controls_manager)

        self.resolved_selections = set(self.selected)

        if self.extends:
            if self.extends not in all_profiles:
                msg = (
                    "Profile {name} extends profile {extended}, but "
                    "only profiles {known_profiles} are available for resolution."
                    .format(name=self.id_, extended=self.extends,
                            known_profiles=list(all_profiles.keys())))
                raise RuntimeError(msg)
            extended_profile = all_profiles[self.extends]
            extended_profile.resolve(all_profiles, controls_manager)

            self.extend_by(extended_profile)

        for uns in self.unselected:
            self.resolved_selections.discard(uns)

        self.unselected = []
        self.extends = None

        self.selected = sorted(self.resolved_selections)

        self.resolved = True

    def _subtract_refinements(self, extended_refinements):
        """
        Given a dict of rule refinements from the extended profile,
        "undo" every refinement prefixed with '!' in this profile.
        """
        for rule, refinements in list(self.refine_rules.items()):
            if rule.startswith("!"):
                for prop, val in refinements:
                    extended_refinements[rule[1:]].remove((prop, val))
                del self.refine_rules[rule]
        return extended_refinements


class ProfileWithSeparatePolicies(ResolvableProfile):
    def __init__(self, *args, **kwargs):
        super(ProfileWithSeparatePolicies, self).__init__(*args, **kwargs)
        self.policies = {}

    def read_yaml_contents(self, yaml_contents):
        policies = yaml_contents.pop("policies", None)
        if policies:
            self._parse_policies(policies)
        super(ProfileWithSeparatePolicies, self).read_yaml_contents(yaml_contents)

    def _parse_policies(self, policies_yaml):
        for item in policies_yaml:
            id_ = required_key(item, "id")
            controls_ids = required_key(item, "controls")
            if not isinstance(controls_ids, list):
                if controls_ids != "all":
                    msg = (
                        "Policy {id_} contains invalid controls list {controls}."
                        .format(id_=id_, controls=str(controls_ids)))
                    raise ValueError(msg)
            self.policies[id_] = controls_ids

    # Analyzer note: this code seems to be duplicated in your project.
    def _process_controls_ids_into_controls(self, controls_manager, policy_id, controls_ids):
        controls = []
        for cid in controls_ids:
            if not cid.startswith("all"):
                controls.extend(
                    self._controls_ids_to_controls(controls_manager, policy_id, [cid]))
            elif ":" in cid:
                _, level_id = cid.split(":", 1)
                controls.extend(
                    controls_manager.get_all_controls_of_level(policy_id, level_id))
            else:
                controls.extend(controls_manager.get_all_controls(policy_id))
        return controls

    def resolve_controls(self, controls_manager):
        for policy_id, controls_ids in self.policies.items():
            controls = []

            if isinstance(controls_ids, list):
                controls = self._process_controls_ids_into_controls(
                    controls_manager, policy_id, controls_ids)
            elif controls_ids.startswith("all"):
                controls = self._process_controls_ids_into_controls(
                    controls_manager, policy_id, [controls_ids])
            else:
                msg = (
                    "Unknown policy content {content} in profile {profile_id}"
                    .format(content=controls_ids, profile_id=self.id_))
                raise ValueError(msg)

            for c in controls:
                self._merge_control(c)

    def extend_by(self, extended_profile):
        self.policies.update(extended_profile.policies)
        super(ProfileWithSeparatePolicies, self).extend_by(extended_profile)


class ProfileWithInlinePolicies(ResolvableProfile):
    def __init__(self, *args, **kwargs):
        super(ProfileWithInlinePolicies, self).__init__(*args, **kwargs)
        self.controls_by_policy = defaultdict(list)

    def apply_selection(self, item):
        # ":" is the delimiter for controls but not when the item is a variable
        if ":" in item and "=" not in item:
            policy_id, control_id = item.split(":", 1)
            self.controls_by_policy[policy_id].append(control_id)
        else:
            super(ProfileWithInlinePolicies, self).apply_selection(item)

    # Analyzer note: this code seems to be duplicated in your project.
    def _process_controls_ids_into_controls(self, controls_manager, policy_id, controls_ids):
        controls = []
        for cid in controls_ids:
            if not cid.startswith("all"):
                controls.extend(
                    self._controls_ids_to_controls(controls_manager, policy_id, [cid]))
            elif ":" in cid:
                _, level_id = cid.split(":", 1)
                controls.extend(
                    controls_manager.get_all_controls_of_level(policy_id, level_id))
            else:
                controls.extend(
                    controls_manager.get_all_controls(policy_id))
        return controls

    def resolve_controls(self, controls_manager):
        for policy_id, controls_ids in self.controls_by_policy.items():
            controls = self._process_controls_ids_into_controls(
                controls_manager, policy_id, controls_ids)

            for c in controls:
                self._merge_control(c)


549 2
class Value(object):
550
    """Represents XCCDF Value
551
    """
552
553 2
    def __init__(self, id_):
554 2
        self.id_ = id_
555 2
        self.title = ""
556 2
        self.description = ""
557 2
        self.type_ = "string"
558 2
        self.operator = "equals"
559 2
        self.interactive = False
560 2
        self.options = {}
561 2
        self.warnings = []
562
563 2
    @staticmethod
564 2
    def from_yaml(yaml_file, env_yaml=None):
565 2
        yaml_contents = open_and_macro_expand(yaml_file, env_yaml)
566 2
        if yaml_contents is None:
567
            return None
568
569 2
        value_id, _ = os.path.splitext(os.path.basename(yaml_file))
570 2
        value = Value(value_id)
571 2
        value.title = required_key(yaml_contents, "title")
572 2
        del yaml_contents["title"]
573 2
        value.description = required_key(yaml_contents, "description")
574 2
        del yaml_contents["description"]
575 2
        value.type_ = required_key(yaml_contents, "type")
576 2
        del yaml_contents["type"]
577 2
        value.operator = yaml_contents.pop("operator", "equals")
578 2
        possible_operators = ["equals", "not equal", "greater than",
579
                              "less than", "greater than or equal",
580
                              "less than or equal", "pattern match"]
581
582 2
        if value.operator not in possible_operators:
583
            raise ValueError(
584
                "Found an invalid operator value '%s' in '%s'. "
585
                "Expected one of: %s"
586
                % (value.operator, yaml_file, ", ".join(possible_operators))
587
            )
588
589 2
        value.interactive = \
590
            yaml_contents.pop("interactive", "false").lower() == "true"
591
592 2
        value.options = required_key(yaml_contents, "options")
593 2
        del yaml_contents["options"]
594 2
        value.warnings = yaml_contents.pop("warnings", [])
595
596 2
        for warning_list in value.warnings:
597
            if len(warning_list) != 1:
598
                raise ValueError("Only one key/value pair should exist for each dictionary")
599
600 2
        if yaml_contents:
601
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
602
                               % (yaml_file, yaml_contents))
603
604 2
        return value

    def to_xml_element(self):
        value = ET.Element('Value')
        value.set('id', self.id_)
        value.set('type', self.type_)
        if self.operator != "equals":  # equals is the default
            value.set('operator', self.operator)
        if self.interactive:  # False is the default
            value.set('interactive', 'true')
        title = ET.SubElement(value, 'title')
        title.text = self.title
        add_sub_element(value, 'description', self.description)
        add_warning_elements(value, self.warnings)

        for selector, option in self.options.items():
            # do not confuse Value (capital V) with value (lowercase v);
            # value is a child element of Value
            value_small = ET.SubElement(value, 'value')
            # per the XCCDF spec, the default value is the value without a selector
            if selector != "default":
                value_small.set('selector', str(selector))
            value_small.text = str(option)

        return value

    def to_file(self, file_name):
        root = self.to_xml_element()
        tree = ET.ElementTree(root)
        tree.write(file_name)
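
As a minimal sketch of the element shape that `Value.to_xml_element` produces, the standalone snippet below rebuilds it with `xml.etree.ElementTree` only. The helper name `build_value_element` and the sample id, title and options are hypothetical, and the description and warning sub-elements are left out because the listing delegates them to `add_sub_element` and `add_warning_elements`, which are not shown here.

```python
import xml.etree.ElementTree as ET


def build_value_element(id_, type_, title, options, operator="equals"):
    """Hypothetical stand-in mirroring the shape built by Value.to_xml_element."""
    value = ET.Element("Value")
    value.set("id", id_)
    value.set("type", type_)
    if operator != "equals":  # "equals" is the default and stays implicit
        value.set("operator", operator)
    ET.SubElement(value, "title").text = title
    for selector, option in options.items():
        child = ET.SubElement(value, "value")
        # The option keyed "default" is emitted without a selector attribute
        if selector != "default":
            child.set("selector", str(selector))
        child.text = str(option)
    return value


# Sample data invented for the sketch
element = build_value_element(
    "var_example", "number", "Example variable",
    {"default": 5, "strict": 3},
)
xml_text = ET.tostring(element, encoding="unicode")
```

Only the non-default selectors carry a `selector` attribute, which is why the `"default"` key is treated specially in the loop.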


class Benchmark(object):
    """Represents XCCDF Benchmark
    """
    def __init__(self, id_):
        self.id_ = id_
        self.title = ""
        self.status = ""
        self.description = ""
        self.notice_id = ""
        self.notice_description = ""
        self.front_matter = ""
        self.rear_matter = ""
        self.cpes = []
        self.version = "0.1"
        self.profiles = []
        self.values = {}
        self.bash_remediation_fns_group = None
        self.groups = {}
        self.rules = {}
        self.product_cpe_names = []

        # This is required for OCIL clauses
        conditional_clause = Value("conditional_clause")
        conditional_clause.title = "A conditional clause for check statements."
        conditional_clause.description = conditional_clause.title
        conditional_clause.type_ = "string"
        conditional_clause.options = {"": "This is a placeholder"}

        self.add_value(conditional_clause)

    @classmethod
    def from_yaml(cls, yaml_file, id_, env_yaml=None):
        yaml_contents = open_and_macro_expand(yaml_file, env_yaml)
        if yaml_contents is None:
            return None

        benchmark = cls(id_)
        benchmark.title = required_key(yaml_contents, "title")
        del yaml_contents["title"]
        benchmark.status = required_key(yaml_contents, "status")
        del yaml_contents["status"]
        benchmark.description = required_key(yaml_contents, "description")
        del yaml_contents["description"]
        notice_contents = required_key(yaml_contents, "notice")
        benchmark.notice_id = required_key(notice_contents, "id")
        del notice_contents["id"]
        benchmark.notice_description = required_key(notice_contents,
                                                    "description")
        del notice_contents["description"]
        if not notice_contents:
            del yaml_contents["notice"]

        benchmark.front_matter = required_key(yaml_contents,
                                              "front-matter")
        del yaml_contents["front-matter"]
        benchmark.rear_matter = required_key(yaml_contents,
                                             "rear-matter")
        del yaml_contents["rear-matter"]
        benchmark.version = str(required_key(yaml_contents, "version"))
        del yaml_contents["version"]

        if env_yaml:
            benchmark.product_cpe_names = env_yaml["product_cpes"].get_product_cpe_names()

        if yaml_contents:
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                               % (yaml_file, yaml_contents))

        return benchmark

    def add_profiles_from_dir(self, dir_, env_yaml):
        for dir_item in sorted(os.listdir(dir_)):
            dir_item_path = os.path.join(dir_, dir_item)
            if not os.path.isfile(dir_item_path):
                continue

            _, ext = os.path.splitext(os.path.basename(dir_item_path))
            if ext != '.profile':
                sys.stderr.write(
                    "Encountered file '%s' while looking for profiles, "
                    "extension '%s' is unknown. Skipping..\n"
                    % (dir_item, ext)
                )
                continue

            try:
                new_profile = ProfileWithInlinePolicies.from_yaml(dir_item_path, env_yaml)
            except DocumentationNotComplete:
                continue
            except Exception as exc:
                msg = ("Error building profile from '{fname}': '{error}'"
                       .format(fname=dir_item_path, error=str(exc)))
                raise RuntimeError(msg)
            if new_profile is None:
                continue

            self.profiles.append(new_profile)

    def add_bash_remediation_fns_from_file(self, file_):
        if not file_:
            # bash-remediation-functions.xml doesn't exist
            return

        tree = ET.parse(file_)
        self.bash_remediation_fns_group = tree.getroot()

    def to_xml_element(self):
        root = ET.Element('Benchmark')
        root.set('xmlns:xsi', 'http://www.w3.org/2001/XMLSchema-instance')
        root.set('xmlns:xhtml', 'http://www.w3.org/1999/xhtml')
        root.set('xmlns:dc', 'http://purl.org/dc/elements/1.1/')
        root.set('id', 'product-name')
        root.set('xsi:schemaLocation',
                 'http://checklists.nist.gov/xccdf/1.1 xccdf-1.1.4.xsd')
        root.set('style', 'SCAP_1.1')
        root.set('resolved', 'false')
        root.set('xml:lang', 'en-US')
        status = ET.SubElement(root, 'status')
        status.set('date', datetime.date.today().strftime("%Y-%m-%d"))
        status.text = self.status
        add_sub_element(root, "title", self.title)
        add_sub_element(root, "description", self.description)
        notice = add_sub_element(root, "notice", self.notice_description)
        notice.set('id', self.notice_id)
        add_sub_element(root, "front-matter", self.front_matter)
        add_sub_element(root, "rear-matter", self.rear_matter)

        # The Benchmark applicability is determined by the CPEs
        # defined in the product.yml
        for cpe_name in self.product_cpe_names:
            plat = ET.SubElement(root, "platform")
            plat.set("idref", cpe_name)

        version = ET.SubElement(root, 'version')
        version.text = self.version
        ET.SubElement(root, "metadata")

        for profile in self.profiles:
            root.append(profile.to_xml_element())

        for value in self.values.values():
            root.append(value.to_xml_element())
        if self.bash_remediation_fns_group is not None:
            root.append(self.bash_remediation_fns_group)

        groups_in_bench = list(self.groups.keys())
        priority_order = ["system", "services"]
        groups_in_bench = reorder_according_to_ordering(groups_in_bench, priority_order)

        # Make system group the first, followed by services group
        for group_id in groups_in_bench:
            group = self.groups.get(group_id)
            # Products using application benchmark don't have system or services group
            if group is not None:
                root.append(group.to_xml_element())

        for rule in self.rules.values():
            root.append(rule.to_xml_element())

        return root

    def to_file(self, file_name):
        root = self.to_xml_element()
        tree = ET.ElementTree(root)
        tree.write(file_name)

    def add_value(self, value):
        if value is None:
            return
        self.values[value.id_] = value

    # The benchmark is also considered a group, so this function signature needs to match
    # Group()'s add_group()
    def add_group(self, group, env_yaml=None):
        if group is None:
            return
        self.groups[group.id_] = group

    def add_rule(self, rule):
        if rule is None:
            return
        self.rules[rule.id_] = rule

    def to_xccdf(self):
        """We can easily extend this script to generate a valid XCCDF instead
        of SSG SHORTHAND.
        """
        raise NotImplementedError

    def __str__(self):
        return self.id_
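
The `from_yaml` class methods in this listing share one pattern: each recognized key is removed from `yaml_contents` as it is read, and anything left over is reported as unparsed data. A minimal sketch of that pattern follows, assuming a hypothetical helper `parse_known_keys` that is not part of the module. The `required_key` helper used in the listing is replaced here by an inline check, and the wording of its error message is invented for the sketch.

```python
def parse_known_keys(yaml_contents, required, optional, source="<memory>"):
    """Hypothetical helper: pop every known key, then reject whatever is left."""
    remaining = dict(yaml_contents)  # work on a copy; the caller's dict is untouched
    parsed = {}
    for key in required:
        if key not in remaining:
            # Invented message; the real required_key() may word this differently
            raise ValueError("'%s' is required but was not found in %s" % (key, source))
        parsed[key] = remaining.pop(key)
    for key, default in optional.items():
        parsed[key] = remaining.pop(key, default)
    if remaining:
        # Anything still present was not understood by the parser
        raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s" % (source, remaining))
    return parsed


# Sample input invented for the sketch
parsed = parse_known_keys(
    {"title": "Guide", "status": "draft", "version": 0.1},
    required=("title", "status", "version"),
    optional={"warnings": []},
)
```

Rejecting leftovers turns a misspelled key into an immediate error instead of a silently ignored setting.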


class Group(object):
    """Represents XCCDF Group
    """
    ATTRIBUTES_TO_PASS_ON = (
        "platforms",
    )

    def __init__(self, id_):
        self.id_ = id_
        self.prodtype = "all"
        self.title = ""
        self.description = ""
        self.warnings = []
        self.requires = []
        self.conflicts = []
        self.values = {}
        self.groups = {}
        self.rules = {}
        # self.platforms is used further in the build system
        # self.platform is merged into self.platforms
        # it is here for backward compatibility
        self.platforms = set()
        self.cpe_names = set()
        self.platform = None

    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None):
        yaml_contents = open_and_macro_expand(yaml_file, env_yaml)
        if yaml_contents is None:
            return None

        group_id = os.path.basename(os.path.dirname(yaml_file))
        group = cls(group_id)
        group.prodtype = yaml_contents.pop("prodtype", "all")
        group.title = required_key(yaml_contents, "title")
        del yaml_contents["title"]
        group.description = required_key(yaml_contents, "description")
        del yaml_contents["description"]
        group.warnings = yaml_contents.pop("warnings", [])
        group.conflicts = yaml_contents.pop("conflicts", [])
        group.requires = yaml_contents.pop("requires", [])
        group.platform = yaml_contents.pop("platform", None)
        # YAML yields a list here; convert it so that .add() below works
        group.platforms = set(yaml_contents.pop("platforms", set()))
        # ensure that content of group.platform is in group.platforms as
        # well
        if group.platform is not None:
            group.platforms.add(group.platform)

        if env_yaml:
            for platform in group.platforms:
                try:
                    group.cpe_names.add(env_yaml["product_cpes"].get_cpe_name(platform))
                except CPEDoesNotExist:
                    print("Unsupported platform '%s' in group '%s'." % (platform, group.id_))
                    raise

        for warning_list in group.warnings:
            if len(warning_list) != 1:
                raise ValueError("Only one key/value pair should exist for each dictionary")

        if yaml_contents:
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                               % (yaml_file, yaml_contents))
        group.validate_prodtype(yaml_file)
        return group

    def validate_prodtype(self, yaml_file):
        for ptype in self.prodtype.split(","):
            if ptype.strip() != ptype:
                msg = (
                    "Comma-separated '{prodtype}' prodtype "
                    "in {yaml_file} contains whitespace."
                    .format(prodtype=self.prodtype, yaml_file=yaml_file))
                raise ValueError(msg)

    def to_xml_element(self):
        group = ET.Element('Group')
        group.set('id', self.id_)
        if self.prodtype != "all":
            group.set("prodtype", self.prodtype)
        title = ET.SubElement(group, 'title')
        title.text = self.title
        add_sub_element(group, 'description', self.description)
        add_warning_elements(group, self.warnings)
        add_nondata_subelements(group, "requires", "id", self.requires)
        add_nondata_subelements(group, "conflicts", "id", self.conflicts)

        for cpe_name in self.cpe_names:
            platform_el = ET.SubElement(group, "platform")
            platform_el.set("idref", cpe_name)

        for _value in self.values.values():
            group.append(_value.to_xml_element())

        # Rules that install or remove packages affect remediation
        # of other rules.
        # When package installed/removed rules come first, the rules
        # are ordered more logically and remediation proceeds naturally:
        # first the package is installed, then it is configured.
        rules_in_group = list(self.rules.keys())
        regex = (r'(package_.*_(installed|removed))|' +
                 r'(service_.*_(enabled|disabled))|' +
                 r'install_smartcard_packages$')
        priority_order = ["installed", "install_smartcard_packages", "removed",
                          "enabled", "disabled"]
        rules_in_group = reorder_according_to_ordering(rules_in_group, priority_order, regex)

        # Add rules in priority order, first all packages installed, then removed,
        # followed by services enabled, then disabled
        for rule_id in rules_in_group:
            group.append(self.rules.get(rule_id).to_xml_element())

        # Add the subgroups after any rules of the current group.
        # As package installed/removed and service enabled/disabled rules are usually in
        # the top-level group, this ensures groups that further configure a package or
        # service come after the rules that install or remove it.
        groups_in_group = list(self.groups.keys())
        priority_order = [
            # Make sure rpm_verify_(hashes|permissions|ownership) are run before any other rule.
            # Due to conflicts between rpm_verify_* rules and any rule that configures
            # stricter settings, like file_permissions_grub2_cfg and sudo_dedicated_group,
            # the rules deviating from the system default should be evaluated later.
            # That way the system first has contents, permissions and ownership reset, and
            # any deviations or stricter settings are then applied by the rules in the profile.
            "software", "integrity", "integrity-software", "rpm_verification",

            # The accounts group has to precede the auditing group because
            # the rule package_screen_installed should be executed before the rule
            # audit_rules_privileged_commands; otherwise the latter rule
            # does not catch the newly installed screen binary during remediation
            # and reports a failure.
            "accounts", "auditing",

            # The FIPS group should come before Crypto,
            # if we want to set a different (stricter) Crypto Policy than FIPS.
            "fips", "crypto",

            # The firewalld_activation group must come before ruleset_modifications;
            # otherwise remediations for ruleset_modifications won't work
            "firewalld_activation", "ruleset_modifications",

            # Rules from group disabling_ipv6 must precede rules from configuring_ipv6,
            # otherwise the remediation prints an error although it is successful
            "disabling_ipv6", "configuring_ipv6"
        ]
        groups_in_group = reorder_according_to_ordering(groups_in_group, priority_order)
        for group_id in groups_in_group:
            _group = self.groups[group_id]
            group.append(_group.to_xml_element())

        return group

    def to_file(self, file_name):
        root = self.to_xml_element()
        tree = ET.ElementTree(root)
        tree.write(file_name)

    def add_value(self, value):
        if value is None:
            return
        self.values[value.id_] = value

    def add_group(self, group, env_yaml=None):
        if group is None:
            return
        if self.platforms and not group.platforms:
            group.platforms = self.platforms
        self.groups[group.id_] = group
        self._pass_our_properties_on_to(group)

        # Once the group has inherited properties, update cpe_names
        if env_yaml:
            for platform in group.platforms:
                try:
                    group.cpe_names.add(env_yaml["product_cpes"].get_cpe_name(platform))
                except CPEDoesNotExist:
                    print("Unsupported platform '%s' in group '%s'." % (platform, group.id_))
                    raise

    def _pass_our_properties_on_to(self, obj):
        for attr in self.ATTRIBUTES_TO_PASS_ON:
            if hasattr(obj, attr) and getattr(obj, attr) is None:
                setattr(obj, attr, getattr(self, attr))

    def add_rule(self, rule, env_yaml=None):
        if rule is None:
            return
        if self.platforms and not rule.platforms:
            rule.platforms = self.platforms
        self.rules[rule.id_] = rule
        self._pass_our_properties_on_to(rule)

        # Once the rule has inherited properties, update cpe_names
        if env_yaml:
            for platform in rule.platforms:
                try:
                    rule.cpe_names.add(env_yaml["product_cpes"].get_cpe_name(platform))
                except CPEDoesNotExist:
                    print("Unsupported platform '%s' in rule '%s'." % (platform, rule.id_))
                    raise

    def __str__(self):
        return self.id_
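
`Group.add_group` and `Group.add_rule` hand the parent's platforms to a child only when the child declares none of its own. The sketch below isolates that rule with a hypothetical `Node` class standing in for groups and rules; the class, its `add_child` method and the sample platform names are assumptions made for illustration. It also copies the set, whereas the listing assigns the parent's set object directly.

```python
class Node(object):
    """Hypothetical stand-in for a Group or Rule that carries platforms."""

    def __init__(self, id_, platforms=()):
        self.id_ = id_
        self.platforms = set(platforms)
        self.children = {}

    def add_child(self, child):
        if child is None:
            return
        # Inherit only when the child declares no platforms of its own
        if self.platforms and not child.platforms:
            # Copy so later changes to the child do not leak into the parent;
            # this is a deliberate deviation from the listing
            child.platforms = set(self.platforms)
        self.children[child.id_] = child


# Sample hierarchy invented for the sketch
parent = Node("services", {"machine"})
inheriting = Node("ssh")
overriding = Node("ntp", {"container"})
parent.add_child(inheriting)
parent.add_child(overriding)
```

With direct assignment, parent and child would share one set object, so an in-place `add()` on either side would be visible on both. Whether that sharing matters in the real build is not something this listing settles.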


class Rule(object):
    """Represents XCCDF Rule
    """
    YAML_KEYS_DEFAULTS = {
        "prodtype": lambda: "all",
        "title": lambda: RuntimeError("Missing key 'title'"),
        "description": lambda: RuntimeError("Missing key 'description'"),
        "rationale": lambda: RuntimeError("Missing key 'rationale'"),
        "severity": lambda: RuntimeError("Missing key 'severity'"),
        "references": lambda: dict(),
        "identifiers": lambda: dict(),
        "ocil_clause": lambda: None,
        "ocil": lambda: None,
        "oval_external_content": lambda: None,
        "warnings": lambda: list(),
        "conflicts": lambda: list(),
        "requires": lambda: list(),
        "platform": lambda: None,
        "platforms": lambda: set(),
        "inherited_platforms": lambda: list(),
        "template": lambda: None,
        "definition_location": lambda: None,
    }

    PRODUCT_REFERENCES = ("stigid", "cis",)
    GLOBAL_REFERENCES = ("srg", "vmmsrg", "disa", "cis-csc",)

    def __init__(self, id_):
        self.id_ = id_
        self.prodtype = "all"
        self.title = ""
        self.description = ""
        self.definition_location = ""
        self.rationale = ""
        self.severity = "unknown"
        self.references = {}
        self.identifiers = {}
        self.ocil_clause = None
        self.ocil = None
        self.oval_external_content = None
        self.warnings = []
        self.requires = []
        self.conflicts = []
        # self.platforms is used further in the build system
        # self.platform is merged into self.platforms
        # it is here for backward compatibility
        self.platform = None
        self.platforms = set()
        self.cpe_names = set()
        self.inherited_platforms = []  # platforms inherited from the group
        self.sce_metadata = None
        self.template = None
        self.local_env_yaml = None
        self.current_product = None

    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None, sce_metadata=None):
        yaml_file = os.path.normpath(yaml_file)

        rule_id, ext = os.path.splitext(os.path.basename(yaml_file))
        if rule_id == "rule" and ext == ".yml":
            rule_id = get_rule_dir_id(yaml_file)

        local_env_yaml = None
        if env_yaml:
            local_env_yaml = dict()
            local_env_yaml.update(env_yaml)
            local_env_yaml["rule_id"] = rule_id

        yaml_contents = open_and_macro_expand(yaml_file, local_env_yaml)
        if yaml_contents is None:
            return None

        rule = cls(rule_id)

        if local_env_yaml:
            rule.local_env_yaml = local_env_yaml

        try:
            rule._set_attributes_from_dict(yaml_contents)
        except RuntimeError as exc:
            msg = ("Error processing '{fname}': {err}"
                   .format(fname=yaml_file, err=str(exc)))
            raise RuntimeError(msg)

        # platforms are read as a list from the YAML file;
        # convert them back to a set
        rule.platforms = set(rule.platforms)

        for warning_list in rule.warnings:
            if len(warning_list) != 1:
                raise ValueError("Only one key/value pair should exist for each dictionary")

        # ensure that content of rule.platform is in rule.platforms as
        # well
        if rule.platform is not None:
            rule.platforms.add(rule.platform)

        # Convert the platform names to CPE names
        # But only do it if an env_yaml was specified (otherwise there would be no product CPEs
        # to look up), and the rule's prodtype matches the product being built
        if env_yaml and env_yaml["product"] in parse_prodtype(rule.prodtype):
            for platform in rule.platforms:
                try:
                    rule.cpe_names.add(env_yaml["product_cpes"].get_cpe_name(platform))
                except CPEDoesNotExist:
                    print("Unsupported platform '%s' in rule '%s'." % (platform, rule.id_))
                    raise

        if yaml_contents:
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                               % (yaml_file, yaml_contents))

        if not rule.definition_location:
            rule.definition_location = yaml_file

        if sce_metadata and rule_id in sce_metadata:
            rule.sce_metadata = sce_metadata[rule_id]

        if env_yaml and 'product' in env_yaml:
            rule.current_product = env_yaml['product']

        rule.validate_prodtype(yaml_file)
        rule.validate_identifiers(yaml_file)
        rule.validate_references(yaml_file)
        return rule

    def _verify_stigid_format(self, product):
        stig_id = self.references.get("stigid", None)
        if not stig_id:
            return
        if "," in stig_id:
            raise ValueError("Rules can not have multiple STIG IDs.")

    def _verify_disa_cci_format(self):
        cci_id = self.references.get("disa", None)
        if not cci_id:
            return
        cci_ex = re.compile(r'^CCI-[0-9]{6}$')
        for cci in cci_id.split(","):
            if not cci_ex.match(cci):
                raise ValueError("CCI '{}' is in the wrong format! "
                                 "Format should be similar to: "
                                 "CCI-XXXXXX".format(cci))
        self.references["disa"] = cci_id
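
To illustrate what the pattern in `_verify_disa_cci_format` accepts, here is a minimal standalone sketch. The helper `find_invalid_ccis` is hypothetical and returns the offending entries instead of raising, and the sample CCI strings are made up for the example.

```python
import re

# Same pattern as in the listing: "CCI-" followed by exactly six digits
CCI_PATTERN = re.compile(r"^CCI-[0-9]{6}$")


def find_invalid_ccis(cci_reference):
    """Hypothetical helper: return the comma-separated entries that fail the pattern."""
    return [cci for cci in cci_reference.split(",") if not CCI_PATTERN.match(cci)]


# Made-up input: one valid entry, one too short, one with a leading space
invalid = find_invalid_ccis("CCI-000366,CCI-12345, CCI-000015")
```

Because the string is split on bare commas without stripping, an entry with a leading space fails the anchored pattern.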

    def normalize(self, product):
        try:
            self.make_refs_and_identifiers_product_specific(product)
            self.make_template_product_specific(product)
        except Exception as exc:
            msg = (
                "Error normalizing '{rule}': {msg}"
                .format(rule=self.id_, msg=str(exc))
            )
            raise RuntimeError(msg)

    def _get_product_only_references(self):
        product_references = dict()

        for ref in Rule.PRODUCT_REFERENCES:
            start = "{0}@".format(ref)
            for gref, gval in self.references.items():
                if ref == gref or gref.startswith(start):
                    product_references[gref] = gval
        return product_references

    def make_template_product_specific(self, product):
        product_suffix = "@{0}".format(product)

        if not self.template:
            return

        not_specific_vars = self.template.get("vars", dict())
        specific_vars = self._make_items_product_specific(
            not_specific_vars, product_suffix, True)
        self.template["vars"] = specific_vars

        not_specific_backends = self.template.get("backends", dict())
        specific_backends = self._make_items_product_specific(
            not_specific_backends, product_suffix, True)
        self.template["backends"] = specific_backends

    def make_refs_and_identifiers_product_specific(self, product):
        product_suffix = "@{0}".format(product)

        product_references = self._get_product_only_references()
        general_references = self.references.copy()
        for todel in product_references:
            general_references.pop(todel)
        for ref in Rule.PRODUCT_REFERENCES:
            if ref in general_references:
                msg = "Unexpected reference identifier ({0}) without "
                msg += "product qualifier ({0}@{1}) while building rule "
                msg += "{2}"
                msg = msg.format(ref, product, self.id_)
                raise ValueError(msg)

        to_set = dict(
            identifiers=(self.identifiers, False),
            general_references=(general_references, True),
            product_references=(product_references, False),
        )
        for name, (dic, allow_overwrites) in to_set.items():
            try:
                new_items = self._make_items_product_specific(
                    dic, product_suffix, allow_overwrites)
            except ValueError as exc:
                msg = (
                    "Error processing {what} for rule '{rid}': {msg}"
                    .format(what=name, rid=self.id_, msg=str(exc))
                )
                raise ValueError(msg)
            dic.clear()
            dic.update(new_items)

        self.references = general_references
        self._verify_disa_cci_format()
        self.references.update(product_references)

        self._verify_stigid_format(product)

    def _make_items_product_specific(self, items_dict, product_suffix, allow_overwrites=False):
        new_items = dict()
        for full_label, value in items_dict.items():
            if "@" not in full_label and full_label not in new_items:
                new_items[full_label] = value
                continue

            label = full_label.split("@")[0]

            # this test should occur before matching product_suffix with the product qualifier
            # present in the reference, so it catches problems even for products that are not
            # being built at the moment
            if label in Rule.GLOBAL_REFERENCES:
                msg = (
                    "You cannot use product-qualified for the '{item_u}' reference. "
                    "Please remove the product-qualifier and merge values with the "
                    "existing reference if there is any. Original line: {item_q}: {value_q}"
                    .format(item_u=label, item_q=full_label, value_q=value)
                )
                raise ValueError(msg)

            if not full_label.endswith(product_suffix):
                continue

            if label in items_dict and not allow_overwrites and value != items_dict[label]:
                msg = (
                    "There is a product-qualified '{item_q}' item, "
                    "but also an unqualified '{item_u}' item "
                    "and those two differ in value - "
                    "'{value_q}' vs '{value_u}' respectively."
                    .format(item_q=full_label, item_u=label,
                            value_q=value, value_u=items_dict[label])
                )
                raise ValueError(msg)
            new_items[label] = value
        return new_items
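
The label resolution in `_make_items_product_specific` can be tried in isolation with the simplified sketch below. The function `resolve_for_product` is hypothetical, it omits the `GLOBAL_REFERENCES` check, and the reference values and product names are sample data rather than real identifiers.

```python
def resolve_for_product(items, product, allow_overwrites=False):
    """Hypothetical, simplified version of the label resolution in the listing."""
    suffix = "@" + product
    resolved = {}
    for full_label, value in items.items():
        if "@" not in full_label:
            # Unqualified entries apply to every product unless already overridden
            resolved.setdefault(full_label, value)
            continue
        if not full_label.endswith(suffix):
            continue  # qualified for some other product
        label = full_label.split("@")[0]
        if label in items and not allow_overwrites and items[label] != value:
            raise ValueError("'%s' conflicts with unqualified '%s'" % (full_label, label))
        resolved[label] = value
    return resolved


# Sample data invented for the sketch
refs = {"nist": "AC-1", "stigid@rhel8": "EX-000001", "stigid@ol8": "EX-000002"}
resolved = resolve_for_product(refs, "rhel8")
```

Entries qualified for another product are dropped, and a qualified entry replaces the unqualified one only when overwrites are allowed or the two values agree.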

    def _set_attributes_from_dict(self, yaml_contents):
        for key, default_getter in self.YAML_KEYS_DEFAULTS.items():
            if key not in yaml_contents:
                value = default_getter()
                if isinstance(value, Exception):
                    raise value
            else:
                value = yaml_contents.pop(key)

            setattr(self, key, value)

    def to_contents_dict(self):
        """
        Returns a dictionary that has the same schema as the dict obtained when loading rule YAML.
        """

        yaml_contents = dict()
        for key in Rule.YAML_KEYS_DEFAULTS:
            yaml_contents[key] = getattr(self, key)

        return yaml_contents

    def validate_identifiers(self, yaml_file):
        if self.identifiers is None:
            raise ValueError("Empty identifier section in file %s" % yaml_file)

        # Validate all identifiers are non-empty:
        for ident_type, ident_val in self.identifiers.items():
            if not isinstance(ident_type, str) or not isinstance(ident_val, str):
                raise ValueError("Identifiers and values must be strings: %s in file %s"
                                 % (ident_type, yaml_file))
            if ident_val.strip() == "":
                raise ValueError("Identifiers must not be empty: %s in file %s"
                                 % (ident_type, yaml_file))
            if ident_type[0:3] == 'cce':
                if not is_cce_format_valid(ident_val):
                    raise ValueError("CCE Identifier format must be valid: invalid format '%s' for CCE '%s'"
                                     " in file '%s'" % (ident_val, ident_type, yaml_file))
                if not is_cce_value_valid("CCE-" + ident_val):
                    raise ValueError("CCE Identifier value is not a valid checksum: invalid value '%s' for CCE '%s'"
                                     " in file '%s'" % (ident_val, ident_type, yaml_file))

    def validate_references(self, yaml_file):
        if self.references is None:
            raise ValueError("Empty references section in file %s" % yaml_file)

        for ref_type, ref_val in self.references.items():
            if not isinstance(ref_type, str) or not isinstance(ref_val, str):
                raise ValueError("References and values must be strings: %s in file %s"
                                 % (ref_type, yaml_file))
            if ref_val.strip() == "":
                raise ValueError("References must not be empty: %s in file %s"
                                 % (ref_type, yaml_file))

        for ref_type, ref_val in self.references.items():
            for ref in ref_val.split(","):
                if ref.strip() != ref:
                    msg = (
                        "Comma-separated '{ref_type}' reference "
                        "in {yaml_file} contains whitespace."
                        .format(ref_type=ref_type, yaml_file=yaml_file))
                    raise ValueError(msg)

    def validate_prodtype(self, yaml_file):
        for ptype in self.prodtype.split(","):
            if ptype.strip() != ptype:
                msg = (
                    "Comma-separated '{prodtype}' prodtype "
                    "in {yaml_file} contains whitespace."
                    .format(prodtype=self.prodtype, yaml_file=yaml_file))
                raise ValueError(msg)

    def to_xml_element(self):
        rule = ET.Element('Rule')
        rule.set('id', self.id_)
        if self.prodtype != "all":
            rule.set("prodtype", self.prodtype)
        rule.set('severity', self.severity)
        add_sub_element(rule, 'title', self.title)
        add_sub_element(rule, 'description', self.description)
        add_sub_element(rule, 'rationale', self.rationale)

        main_ident = ET.Element('ident')
        for ident_type, ident_val in self.identifiers.items():
            # This is not true if items were normalized
            if '@' in ident_type:
                # the ident is applicable only on some product
                # format : 'policy@product', e.g. 'stigid@product'
                # for them, we create a separate <ident> element
                policy, product = ident_type.split('@')
                ident = ET.SubElement(rule, 'ident')
                ident.set(policy, ident_val)
                ident.set('prodtype', product)
            else:
                main_ident.set(ident_type, ident_val)

        if main_ident.attrib:
            rule.append(main_ident)

        main_ref = ET.Element('ref')
        for ref_type, ref_val in self.references.items():
            # This is not true if items were normalized
            if '@' in ref_type:
                # the reference is applicable only on some product
                # format : 'policy@product', eg. 'stigid@product'
                # for them, we create a separate <ref> element
                policy, product = ref_type.split('@')
                ref = ET.SubElement(rule, 'ref')
                ref.set(policy, ref_val)
                ref.set('prodtype', product)
            else:
                main_ref.set(ref_type, ref_val)

        if main_ref.attrib:
            rule.append(main_ref)

        if self.oval_external_content:
            check = ET.SubElement(rule, 'check')
            check.set("system", "http://oval.mitre.org/XMLSchema/oval-definitions-5")
            external_content = ET.SubElement(check, "check-content-ref")
            external_content.set("href", self.oval_external_content)
        else:
            # TODO: This is pretty much a hack, oval ID will be the same as rule ID
            #       and we don't want the developers to have to keep them in sync.
            #       Therefore let's just add an OVAL ref of that ID.
            oval_ref = ET.SubElement(rule, "oval")
            oval_ref.set("id", self.id_)

        if self.sce_metadata:
            # TODO: This is pretty much another hack, just like the previous OVAL
            # one. However, we avoided the external SCE content as I'm not sure it
            # is generally useful (unlike say, CVE checking with external OVAL)
            #
            # Additionally, we build the content (check subelement) here rather
            # than in xslt due to the nature of our SCE metadata.
            check = ET.SubElement(rule, "check")
            check.set("system", SCE_SYSTEM)

            if 'check-import' in self.sce_metadata:
                if isinstance(self.sce_metadata['check-import'], str):
                    self.sce_metadata['check-import'] = [self.sce_metadata['check-import']]
                for entry in self.sce_metadata['check-import']:
                    check_import = ET.SubElement(check, 'check-import')
                    check_import.set('import-name', entry)
                    check_import.text = None

            if 'check-export' in self.sce_metadata:
                if isinstance(self.sce_metadata['check-export'], str):
                    self.sce_metadata['check-export'] = [self.sce_metadata['check-export']]
                for entry in self.sce_metadata['check-export']:
                    value, export = entry.split('=')
                    check_export = ET.SubElement(check, 'check-export')
                    check_export.set('value-id', value)
                    check_export.set('export-name', export)
                    check_export.text = None

            check_ref = ET.SubElement(check, "check-content-ref")
            href = self.current_product + "/checks/sce/" + self.sce_metadata['filename']
            check_ref.set("href", href)

        if self.ocil or self.ocil_clause:
            ocil = add_sub_element(rule, 'ocil', self.ocil if self.ocil else "")
            if self.ocil_clause:
                ocil.set("clause", self.ocil_clause)

        add_warning_elements(rule, self.warnings)
        add_nondata_subelements(rule, "requires", "id", self.requires)
        add_nondata_subelements(rule, "conflicts", "id", self.conflicts)

        for cpe_name in self.cpe_names:
            platform_el = ET.SubElement(rule, "platform")
            platform_el.set("idref", cpe_name)

        return rule

    def to_file(self, file_name):
        root = self.to_xml_element()
        tree = ET.ElementTree(root)
        tree.write(file_name)


class DirectoryLoader(object):
    def __init__(self, profiles_dir, bash_remediation_fns, env_yaml):
        self.benchmark_file = None
        self.group_file = None
        self.loaded_group = None
        self.rule_files = []
        self.value_files = []
        self.subdirectories = []

        self.all_values = set()
        self.all_rules = set()
        self.all_groups = set()

        self.profiles_dir = profiles_dir
        self.bash_remediation_fns = bash_remediation_fns
        self.env_yaml = env_yaml
        self.product = env_yaml["product"]

        self.parent_group = None

    def _collect_items_to_load(self, guide_directory):
        for dir_item in sorted(os.listdir(guide_directory)):
            dir_item_path = os.path.join(guide_directory, dir_item)
            _, extension = os.path.splitext(dir_item)

            if extension == '.var':
                self.value_files.append(dir_item_path)
            elif dir_item == "benchmark.yml":
                if self.benchmark_file:
                    raise ValueError("Multiple benchmarks in one directory")
                self.benchmark_file = dir_item_path
            elif dir_item == "group.yml":
                if self.group_file:
                    raise ValueError("Multiple groups in one directory")
                self.group_file = dir_item_path
            elif extension == '.rule':
                self.rule_files.append(dir_item_path)
            elif is_rule_dir(dir_item_path):
                self.rule_files.append(get_rule_dir_yaml(dir_item_path))
            elif dir_item != "tests":
                if os.path.isdir(dir_item_path):
                    self.subdirectories.append(dir_item_path)
                else:
                    sys.stderr.write(
                        "Encountered file '%s' while recursing, extension '%s' "
                        "is unknown. Skipping..\n"
                        % (dir_item, extension)
                    )

    def load_benchmark_or_group(self, guide_directory):
        """
        Loads a given benchmark or group from the specified benchmark_file or
        group_file, in the context of guide_directory, profiles_dir,
        env_yaml, and bash_remediation_fns.

        Returns the loaded group or benchmark.
        """
        group = None
        if self.group_file and self.benchmark_file:
            raise ValueError("A benchmark.yml file and a group.yml file were found in "
                             "the same directory '%s'" % (guide_directory))

        # we treat benchmark as a special form of group in the following code
        if self.benchmark_file:
            group = Benchmark.from_yaml(
                self.benchmark_file, 'product-name', self.env_yaml
            )
            if self.profiles_dir:
                group.add_profiles_from_dir(self.profiles_dir, self.env_yaml)
            group.add_bash_remediation_fns_from_file(self.bash_remediation_fns)

        if self.group_file:
            group = Group.from_yaml(self.group_file, self.env_yaml)
            self.all_groups.add(group.id_)

        return group

    def _load_group_process_and_recurse(self, guide_directory):
        self.loaded_group = self.load_benchmark_or_group(guide_directory)

        if self.loaded_group:
            if self.parent_group:
                self.parent_group.add_group(self.loaded_group, env_yaml=self.env_yaml)

            self._process_values()
            self._recurse_into_subdirs()
            self._process_rules()

    def process_directory_tree(self, start_dir, extra_group_dirs=None):
        self._collect_items_to_load(start_dir)
        if extra_group_dirs is not None:
            self.subdirectories += extra_group_dirs
        self._load_group_process_and_recurse(start_dir)

    def _recurse_into_subdirs(self):
        for subdir in self.subdirectories:
            loader = self._get_new_loader()
            loader.parent_group = self.loaded_group
            loader.process_directory_tree(subdir)
            self.all_values.update(loader.all_values)
            self.all_rules.update(loader.all_rules)
            self.all_groups.update(loader.all_groups)

    def _get_new_loader(self):
        raise NotImplementedError()

    def _process_values(self):
        raise NotImplementedError()

    def _process_rules(self):
        raise NotImplementedError()


class BuildLoader(DirectoryLoader):
    def __init__(self, profiles_dir, bash_remediation_fns, env_yaml,
                 resolved_rules_dir=None, sce_metadata_path=None):
        super(BuildLoader, self).__init__(profiles_dir, bash_remediation_fns, env_yaml)

        self.resolved_rules_dir = resolved_rules_dir
        if resolved_rules_dir and not os.path.isdir(resolved_rules_dir):
            os.mkdir(resolved_rules_dir)

        self.sce_metadata = None
        if sce_metadata_path and os.path.getsize(sce_metadata_path):
            # Use a context manager so the metadata file is closed after parsing.
            with open(sce_metadata_path, 'r') as sce_metadata_file:
                self.sce_metadata = json.load(sce_metadata_file)

    def _process_values(self):
        for value_yaml in self.value_files:
            value = Value.from_yaml(value_yaml, self.env_yaml)
            self.all_values.add(value)
            self.loaded_group.add_value(value)

    def _process_rules(self):
        for rule_yaml in self.rule_files:
            try:
                rule = Rule.from_yaml(rule_yaml, self.env_yaml, self.sce_metadata)
            except DocumentationNotComplete:
                # Happens on non-debug build when a rule is "documentation-incomplete"
                continue
            prodtypes = parse_prodtype(rule.prodtype)
            if "all" not in prodtypes and self.product not in prodtypes:
                continue
            self.all_rules.add(rule)
            self.loaded_group.add_rule(rule, env_yaml=self.env_yaml)

            if self.loaded_group.platforms:
                rule.inherited_platforms += self.loaded_group.platforms

            if self.resolved_rules_dir:
                output_for_rule = os.path.join(
                    self.resolved_rules_dir, "{id_}.yml".format(id_=rule.id_))
                mkdir_p(self.resolved_rules_dir)
                with open(output_for_rule, "w") as f:
                    rule.normalize(self.env_yaml["product"])
                    yaml.dump(rule.to_contents_dict(), f)

    def _get_new_loader(self):
        loader = BuildLoader(
            self.profiles_dir, self.bash_remediation_fns, self.env_yaml, self.resolved_rules_dir)
        # Do it this way so we only have to parse the SCE metadata once.
        loader.sce_metadata = self.sce_metadata
        return loader

    def export_group_to_file(self, filename):
        return self.loaded_group.to_file(filename)