Test Failed
Push — master ( e4a587...cd4d59 )
by Milan
02:37 queued 12s
created

ssg.build_yaml   F

Complexity

Total Complexity 328

Size/Duplication

Total Lines 1589
Duplicated Lines 1.7 %

Test Coverage

Coverage 33.98%

Importance

Changes 0
Metric Value
eloc 1142
dl 27
loc 1589
ccs 349
cts 1027
cp 0.3398
rs 1.032
c 0
b 0
f 0
wmc 328

4 Functions

Rating   Name   Duplication   Size   Complexity  
A add_warning_elements() 0 15 2
B reorder_according_to_ordering() 0 16 6
A add_sub_element() 0 28 2
A add_nondata_subelements() 0 6 2

85 Methods

Rating   Name   Duplication   Size   Complexity  
A ResolvableProfile.extend_by() 0 12 1
A Profile.get_variable_selectors() 0 2 1
A Profile.__sub__() 0 13 1
A ProfileWithInlinePolicies.__init__() 0 3 1
C Profile.dump_yaml() 0 29 10
A Profile.get_rule_selectors() 0 2 1
A ResolvableProfile._merge_control() 0 5 3
A ResolvableProfile.__init__() 0 4 1
A Profile.validate_refine_rules() 0 26 4
A ProfileWithSeparatePolicies._parse_policies() 0 11 4
A ProfileWithSeparatePolicies.extend_by() 0 3 1
C Profile.to_xml_element() 0 43 9
A ResolvableProfile._controls_ids_to_controls() 0 3 1
B Profile.from_yaml() 0 34 8
B ResolvableProfile.resolve() 0 30 5
A Profile.apply_selection() 0 20 5
A Profile.read_yaml_contents() 0 12 2
A ResolvableProfile._subtract_refinements() 0 11 4
A ProfileWithSeparatePolicies.resolve_controls() 0 18 5
A Profile._parse_selections() 0 3 2
A Profile.validate_rules() 0 21 4
A Profile.__init__() 0 17 1
A ProfileWithSeparatePolicies.__init__() 0 3 1
A ProfileWithSeparatePolicies.read_yaml_contents() 0 5 2
A ResolvableProfile.resolve_controls() 0 2 1
B Profile.validate_variables() 0 31 5
A ProfileWithSeparatePolicies._process_controls_ids_into_controls() 13 13 4
A Group.add_value() 0 4 2
B Group.to_xml_element() 0 74 6
A Group.validate_prodtype() 0 8 3
B Group.add_rule() 0 16 7
A ProfileWithInlinePolicies.resolve_controls() 0 7 3
A Benchmark.to_file() 0 4 1
A Benchmark.add_value() 0 4 2
A Benchmark.__init__() 0 26 1
C Group.from_yaml() 0 40 9
A Benchmark.add_rule() 0 4 2
A Value.to_file() 0 4 1
A Group.to_file() 0 4 1
A Group._pass_our_properties_on_to() 0 4 4
B Benchmark.from_yaml() 0 39 5
B Benchmark.add_profiles_from_dir() 0 27 7
A Benchmark.to_xccdf() 0 5 1
A Benchmark.add_bash_remediation_fns_from_file() 0 7 2
A Group.__init__() 0 17 1
B Group.add_group() 0 16 7
A Benchmark.add_group() 0 4 2
B Value.from_yaml() 0 42 6
A Benchmark.__str__() 0 2 1
A Group.__str__() 0 2 1
A ProfileWithInlinePolicies.apply_selection() 0 7 3
A Value.to_xml_element() 0 23 5
B Benchmark.to_xml_element() 0 54 8
A Value.__init__() 0 9 1
A ProfileWithInlinePolicies._process_controls_ids_into_controls() 14 14 4
A BuildLoader.__init__() 0 6 3
A Rule.to_file() 0 4 1
F Rule.to_xml_element() 0 70 14
B BuildLoader._process_rules() 0 23 8
A DirectoryLoader._recurse_into_subdirs() 0 8 2
C Rule.validate_identifiers() 0 19 9
A Rule._set_attributes_from_dict() 0 10 4
A DirectoryLoader._process_values() 0 2 1
B Rule.make_refs_and_identifiers_product_specific() 0 38 6
A Rule.normalize() 0 10 2
A DirectoryLoader.__init__() 0 18 1
C Rule._make_items_product_specific() 0 36 9
A BuildLoader._get_new_loader() 0 3 1
B DirectoryLoader.load_benchmark_or_group() 0 27 6
A Rule.validate_prodtype() 0 8 3
C DirectoryLoader._collect_items_to_load() 0 27 11
A DirectoryLoader._get_new_loader() 0 2 1
C Rule.validate_references() 0 20 9
A BuildLoader.export_group_to_file() 0 2 1
A Rule._verify_disa_cci_format() 0 11 4
A DirectoryLoader.process_directory_tree() 0 5 2
A BuildLoader._process_values() 0 5 2
A DirectoryLoader._load_group_process_and_recurse() 0 10 3
A DirectoryLoader._process_rules() 0 2 1
A Rule._verify_stigid_format() 0 6 3
A Rule._get_product_only_references() 0 9 5
A Rule.__init__() 0 25 1
A Rule.to_contents_dict() 0 10 2
A Rule.make_template_product_specific() 0 15 2
F Rule.from_yaml() 0 65 16

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like ssg.build_yaml often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 2
from __future__ import absolute_import
2 2
from __future__ import print_function
3
4 2
import os
5 2
import os.path
6 2
from collections import defaultdict
7 2
from copy import deepcopy
8 2
import datetime
9 2
import re
10 2
import sys
11 2
from xml.sax.saxutils import escape
12
13 2
import yaml
14
15 2
from .build_cpe import CPEDoesNotExist
16 2
from .constants import XCCDF_REFINABLE_PROPERTIES
17 2
from .rules import get_rule_dir_id, get_rule_dir_yaml, is_rule_dir
18 2
from .rule_yaml import parse_prodtype
19 2
from .controls import Control
20
21 2
from .checks import is_cce_format_valid, is_cce_value_valid
22 2
from .yaml import DocumentationNotComplete, open_and_expand, open_and_macro_expand
23 2
from .utils import required_key, mkdir_p
24
25 2
from .xml import ElementTree as ET
26 2
from .shims import unicode_func
27
28
29 2
def add_sub_element(parent, tag, data):
30
    """
31
    Creates a new child element under parent with tag tag, and sets
32
    data as the content under the tag. In particular, data is a string
33
    to be parsed as an XML tree, allowing sub-elements of children to be
34
    added.
35
36
    If data should not be parsed as an XML tree, either escape the contents
37
    before passing into this function, or use ElementTree.SubElement().
38
39
    Returns the newly created subelement of type tag.
40
    """
41
    # This is used because our YAML data contain XML and XHTML elements
42
    # ET.SubElement() escapes the < > characters by &lt; and &gt;
43
    # and therefore it does not add child elements
44
    # we need to do a hack instead
45
    # TODO: Remove this function after we move to Markdown everywhere in SSG
46
    ustr = unicode_func("<{0}>{1}</{0}>").format(tag, data)
47
48
    try:
49
        element = ET.fromstring(ustr.encode("utf-8"))
50
    except Exception:
51
        msg = ("Error adding subelement to an element '{0}' from string: '{1}'"
52
               .format(parent.tag, ustr))
53
        raise RuntimeError(msg)
54
55
    parent.append(element)
56
    return element
57
58
59 2
def reorder_according_to_ordering(unordered, ordering, regex=None):
60 2
    ordered = []
61 2
    if regex is None:
62 2
        regex = "|".join(["({0})".format(item) for item in ordering])
63 2
    regex = re.compile(regex)
64
65 2
    items_to_order = list(filter(regex.match, unordered))
66 2
    unordered = set(unordered)
67
68 2
    for priority_type in ordering:
69 2
        for item in items_to_order:
70 2
            if priority_type in item and item in unordered:
71 2
                ordered.append(item)
72 2
                unordered.remove(item)
73 2
    ordered.extend(list(unordered))
74 2
    return ordered
75
76
77 2
def add_warning_elements(element, warnings):
78
    # The use of [{dict}, {dict}] in warnings is to handle the following
79
    # scenario where multiple warnings have the same category which is
80
    # valid in SCAP and our content:
81
    #
82
    # warnings:
83
    #     - general: Some general warning
84
    #     - general: Some other general warning
85
    #     - general: |-
86
    #         Some really long multiline general warning
87
    #
88
    # Each of the {dict} should have only one key/value pair.
89
    for warning_dict in warnings:
90
        warning = add_sub_element(element, "warning", list(warning_dict.values())[0])
91
        warning.set("category", list(warning_dict.keys())[0])
92
93
94 2
def add_nondata_subelements(element, subelement, attribute, attr_data):
95
    """Add multiple iterations of a sublement that contains an attribute but no data
96
       For example, <requires id="my_required_id"/>"""
97
    for data in attr_data:
98
        req = ET.SubElement(element, subelement)
99
        req.set(attribute, data)
100
101
102 2
class Profile(object):
103
    """Represents XCCDF profile
104
    """
105
106 2
    def __init__(self, id_):
107 2
        self.id_ = id_
108 2
        self.title = ""
109 2
        self.description = ""
110 2
        self.extends = None
111 2
        self.selected = []
112 2
        self.unselected = []
113 2
        self.variables = dict()
114 2
        self.refine_rules = defaultdict(list)
115 2
        self.metadata = None
116 2
        self.reference = None
117
        # self.platforms is used further in the build system
118
        # self.platform is merged into self.platforms
119
        # it is here for backward compatibility
120 2
        self.platforms = set()
121 2
        self.cpe_names = set()
122 2
        self.platform = None
123
124
125 2
    def read_yaml_contents(self, yaml_contents):
126 2
        self.title = required_key(yaml_contents, "title")
127 2
        del yaml_contents["title"]
128 2
        self.description = required_key(yaml_contents, "description")
129 2
        del yaml_contents["description"]
130 2
        self.extends = yaml_contents.pop("extends", None)
131 2
        selection_entries = required_key(yaml_contents, "selections")
132 2
        if selection_entries:
133 2
            self._parse_selections(selection_entries)
134 2
        del yaml_contents["selections"]
135 2
        self.platforms = yaml_contents.pop("platforms", set())
136 2
        self.platform = yaml_contents.pop("platform", None)
137
138 2
    @classmethod
139 2
    def from_yaml(cls, yaml_file, env_yaml=None):
140 2
        yaml_contents = open_and_expand(yaml_file, env_yaml)
141 2
        if yaml_contents is None:
142
            return None
143
144 2
        basename, _ = os.path.splitext(os.path.basename(yaml_file))
145
146 2
        profile = cls(basename)
147 2
        profile.read_yaml_contents(yaml_contents)
148
149 2
        profile.reference = yaml_contents.pop("reference", None)
150
        # ensure that content of profile.platform is in profile.platforms as
151
        # well
152 2
        if profile.platform is not None:
153
            profile.platforms.add(profile.platform)
154
155 2
        if env_yaml:
156
            for platform in profile.platforms:
157
                try:
158
                    profile.cpe_names.add(env_yaml["product_cpes"].get_cpe_name(platform))
159
                except CPEDoesNotExist:
160
                    print("Unsupported platform '%s' in profile '%s'." % (platform, profile.id_))
161
                    raise
162
163
        # At the moment, metadata is not used to build content
164 2
        if "metadata" in yaml_contents:
165
            del yaml_contents["metadata"]
166
167 2
        if yaml_contents:
168
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
169
                               % (yaml_file, yaml_contents))
170
171 2
        return profile
172
173 2
    def dump_yaml(self, file_name, documentation_complete=True):
174
        to_dump = {}
175
        to_dump["documentation_complete"] = documentation_complete
176
        to_dump["title"] = self.title
177
        to_dump["description"] = self.description
178
        to_dump["reference"] = self.reference
179
        if self.metadata is not None:
180
            to_dump["metadata"] = self.metadata
181
182
        if self.extends is not None:
183
            to_dump["extends"] = self.extends
184
185
        if self.platforms:
186
            to_dump["platforms"] = self.platforms
187
188
        selections = []
189
        for item in self.selected:
190
            selections.append(item)
191
        for item in self.unselected:
192
            selections.append("!"+item)
193
        for varname in self.variables.keys():
194
            selections.append(varname+"="+self.variables.get(varname))
195
        for rule, refinements in self.refine_rules.items():
196
            for prop, val in refinements:
197
                selections.append("{rule}.{property}={value}"
198
                                  .format(rule=rule, property=prop, value=val))
199
        to_dump["selections"] = selections
200
        with open(file_name, "w+") as f:
201
            yaml.dump(to_dump, f, indent=4)
202
203 2
    def _parse_selections(self, entries):
204 2
        for item in entries:
205 2
            self.apply_selection(item)
206
207 2
    def apply_selection(self, item):
208 2
        if "." in item:
209
            rule, refinement = item.split(".", 1)
210
            property_, value = refinement.split("=", 1)
211
            if property_ not in XCCDF_REFINABLE_PROPERTIES:
212
                msg = ("Property '{property_}' cannot be refined. "
213
                       "Rule properties that can be refined are {refinables}. "
214
                       "Fix refinement '{rule_id}.{property_}={value}' in profile '{profile}'."
215
                       .format(property_=property_, refinables=XCCDF_REFINABLE_PROPERTIES,
216
                               rule_id=rule, value=value, profile=self.id_)
217
                       )
218
                raise ValueError(msg)
219
            self.refine_rules[rule].append((property_, value))
220 2
        elif "=" in item:
221 2
            varname, value = item.split("=", 1)
222 2
            self.variables[varname] = value
223 2
        elif item.startswith("!"):
224
            self.unselected.append(item[1:])
225
        else:
226 2
            self.selected.append(item)
227
228 2
    def to_xml_element(self):
229
        element = ET.Element('Profile')
230
        element.set("id", self.id_)
231
        if self.extends:
232
            element.set("extends", self.extends)
233
        title = add_sub_element(element, "title", self.title)
234
        title.set("override", "true")
235
        desc = add_sub_element(element, "description", self.description)
236
        desc.set("override", "true")
237
238
        if self.reference:
239
            add_sub_element(element, "reference", escape(self.reference))
240
241
        for cpe_name in self.cpe_names:
242
            plat = ET.SubElement(element, "platform")
243
            plat.set("idref", cpe_name)
244
245
        for selection in self.selected:
246
            select = ET.Element("select")
247
            select.set("idref", selection)
248
            select.set("selected", "true")
249
            element.append(select)
250
251
        for selection in self.unselected:
252
            unselect = ET.Element("select")
253
            unselect.set("idref", selection)
254
            unselect.set("selected", "false")
255
            element.append(unselect)
256
257
        for value_id, selector in self.variables.items():
258
            refine_value = ET.Element("refine-value")
259
            refine_value.set("idref", value_id)
260
            refine_value.set("selector", selector)
261
            element.append(refine_value)
262
263
        for refined_rule, refinement_list in self.refine_rules.items():
264
            refine_rule = ET.Element("refine-rule")
265
            refine_rule.set("idref", refined_rule)
266
            for refinement in refinement_list:
267
                refine_rule.set(refinement[0], refinement[1])
268
            element.append(refine_rule)
269
270
        return element
271
272 2
    def get_rule_selectors(self):
273 2
        return list(self.selected + self.unselected)
274
275 2
    def get_variable_selectors(self):
276 2
        return self.variables
277
278 2
    def validate_refine_rules(self, rules):
279
        existing_rule_ids = [r.id_ for r in rules]
280
        for refine_rule, refinement_list in self.refine_rules.items():
281
            # Take first refinement to ilustrate where the error is
282
            # all refinements in list are invalid, so it doesn't really matter
283
            a_refinement = refinement_list[0]
284
285
            if refine_rule not in existing_rule_ids:
286
                msg = (
287
                    "You are trying to refine a rule that doesn't exist. "
288
                    "Rule '{rule_id}' was not found in the benchmark. "
289
                    "Please check all rule refinements for rule: '{rule_id}', for example: "
290
                    "- {rule_id}.{property_}={value}' in profile {profile_id}."
291
                    .format(rule_id=refine_rule, profile_id=self.id_,
292
                            property_=a_refinement[0], value=a_refinement[1])
293
                    )
294
                raise ValueError(msg)
295
296
            if refine_rule not in self.get_rule_selectors():
297
                msg = ("- {rule_id}.{property_}={value}' in profile '{profile_id}' is refining "
298
                       "a rule that is not selected by it. The refinement will not have any "
299
                       "noticeable effect. Either select the rule or remove the rule refinement."
300
                       .format(rule_id=refine_rule, property_=a_refinement[0],
301
                               value=a_refinement[1], profile_id=self.id_)
302
                       )
303
                raise ValueError(msg)
304
305 2
    def validate_variables(self, variables):
306
        variables_by_id = dict()
307
        for var in variables:
308
            variables_by_id[var.id_] = var
309
310
        for var_id, our_val in self.variables.items():
311
            if var_id not in variables_by_id:
312
                all_vars_list = [" - %s" % v for v in variables_by_id.keys()]
313
                msg = (
314
                    "Value '{var_id}' in profile '{profile_name}' is not known. "
315
                    "We know only variables:\n{var_names}"
316
                    .format(
317
                        var_id=var_id, profile_name=self.id_,
318
                        var_names="\n".join(sorted(all_vars_list)))
319
                )
320
                raise ValueError(msg)
321
322
            allowed_selectors = [str(s) for s in variables_by_id[var_id].options.keys()]
323
            if our_val not in allowed_selectors:
324
                msg = (
325
                    "Value '{var_id}' in profile '{profile_name}' "
326
                    "uses the selector '{our_val}'. "
327
                    "This is not possible, as only selectors {all_selectors} are available. "
328
                    "Either change the selector used in the profile, or "
329
                    "add the selector-value pair to the variable definition."
330
                    .format(
331
                        var_id=var_id, profile_name=self.id_, our_val=our_val,
332
                        all_selectors=allowed_selectors,
333
                    )
334
                )
335
                raise ValueError(msg)
336
337 2
    def validate_rules(self, rules, groups):
338
        existing_rule_ids = [r.id_ for r in rules]
339
        rule_selectors = self.get_rule_selectors()
340
        for id_ in rule_selectors:
341
            if id_ in groups:
342
                msg = (
343
                    "You have selected a group '{group_id}' instead of a "
344
                    "rule. Groups have no effect in the profile and are not "
345
                    "allowed to be selected. Please remove '{group_id}' "
346
                    "from profile '{profile_id}' before proceeding."
347
                    .format(group_id=id_, profile_id=self.id_)
348
                )
349
                raise ValueError(msg)
350
            if id_ not in existing_rule_ids:
351
                msg = (
352
                    "Rule '{rule_id}' was not found in the benchmark. Please "
353
                    "remove rule '{rule_id}' from profile '{profile_id}' "
354
                    "before proceeding."
355
                    .format(rule_id=id_, profile_id=self.id_)
356
                )
357
                raise ValueError(msg)
358
359 2
    def __sub__(self, other):
360
        profile = Profile(self.id_)
361
        profile.title = self.title
362
        profile.description = self.description
363
        profile.extends = self.extends
364
        profile.platforms = self.platforms
365
        profile.platform = self.platform
366
        profile.selected = list(set(self.selected) - set(other.selected))
367
        profile.selected.sort()
368
        profile.unselected = list(set(self.unselected) - set(other.unselected))
369
        profile.variables = dict ((k, v) for (k, v) in self.variables.items()
370
                             if k not in other.variables or v != other.variables[k])
371
        return profile
372
373
374 2
class ResolvableProfile(Profile):
375 2
    def __init__(self, * args, ** kwargs):
376
        super(ResolvableProfile, self).__init__(* args, ** kwargs)
377
        self.resolved = False
378
        self.resolved_selections = set()
379
380 2
    def _controls_ids_to_controls(self, controls_manager, policy_id, control_id_list):
381
        items = [controls_manager.get_control(policy_id, cid) for cid in control_id_list]
382
        return items
383
384 2
    def _merge_control(self, control):
385
        self.selected.extend(control.rules)
386
        for varname, value in control.variables.items():
387
            if varname not in self.variables:
388
                self.variables[varname] = value
389
390 2
    def resolve_controls(self, controls_manager):
391
        pass
392
393 2
    def extend_by(self, extended_profile):
394
        extended_selects = set(extended_profile.selected)
395
        self.resolved_selections.update(extended_selects)
396
397
        updated_variables = dict(extended_profile.variables)
398
        updated_variables.update(self.variables)
399
        self.variables = updated_variables
400
401
        extended_refinements = deepcopy(extended_profile.refine_rules)
402
        updated_refinements = self._subtract_refinements(extended_refinements)
403
        updated_refinements.update(self.refine_rules)
404
        self.refine_rules = updated_refinements
405
406 2
    def resolve(self, all_profiles, controls_manager=None):
407
        if self.resolved:
408
            return
409
410
        self.resolve_controls(controls_manager)
411
412
        self.resolved_selections = set(self.selected)
413
414
        if self.extends:
415
            if self.extends not in all_profiles:
416
                msg = (
417
                    "Profile {name} extends profile {extended}, but "
418
                    "only profiles {known_profiles} are available for resolution."
419
                    .format(name=self.id_, extended=self.extends,
420
                            known_profiles=list(all_profiles.keys())))
421
                raise RuntimeError(msg)
422
            extended_profile = all_profiles[self.extends]
423
            extended_profile.resolve(all_profiles, controls_manager)
424
425
            self.extend_by(extended_profile)
426
427
        for uns in self.unselected:
428
            self.resolved_selections.discard(uns)
429
430
        self.unselected = []
431
        self.extends = None
432
433
        self.selected = sorted(self.resolved_selections)
434
435
        self.resolved = True
436
437 2
    def _subtract_refinements(self, extended_refinements):
438
        """
439
        Given a dict of rule refinements from the extended profile,
440
        "undo" every refinement prefixed with '!' in this profile.
441
        """
442
        for rule, refinements in list(self.refine_rules.items()):
443
            if rule.startswith("!"):
444
                for prop, val in refinements:
445
                    extended_refinements[rule[1:]].remove((prop, val))
446
                del self.refine_rules[rule]
447
        return extended_refinements
448
449
450 2
class ProfileWithSeparatePolicies(ResolvableProfile):
451 2
    def __init__(self, * args, ** kwargs):
452
        super(ProfileWithSeparatePolicies, self).__init__(* args, ** kwargs)
453
        self.policies = {}
454
455 2
    def read_yaml_contents(self, yaml_contents):
456
        policies = yaml_contents.pop("policies", None)
457
        if policies:
458
            self._parse_policies(policies)
459
        super(ProfileWithSeparatePolicies, self).read_yaml_contents(yaml_contents)
460
461 2
    def _parse_policies(self, policies_yaml):
462
        for item in policies_yaml:
463
            id_ = required_key(item, "id")
464
            controls_ids = required_key(item, "controls")
465
            if not isinstance(controls_ids, list):
466
                if controls_ids != "all":
467
                    msg = (
468
                        "Policy {id_} contains invalid controls list {controls}."
469
                        .format(id_=id_, controls=str(controls_ids)))
470
                    raise ValueError(msg)
471
            self.policies[id_] = controls_ids
472
473 2 View Code Duplication
    def _process_controls_ids_into_controls(self, controls_manager, policy_id, controls_ids):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
474
        controls = []
475
        for cid in controls_ids:
476
            if not cid.startswith("all"):
477
                controls.extend(
478
                    self._controls_ids_to_controls(controls_manager, policy_id, [cid]))
479
            elif ":" in cid:
480
                _, level_id = cid.split(":", 1)
481
                controls.extend(
482
                    controls_manager.get_all_controls_of_level(policy_id, level_id))
483
            else:
484
                controls.extend(controls_manager.get_all_controls(policy_id))
485
        return controls
486
487 2
    def resolve_controls(self, controls_manager):
488
        for policy_id, controls_ids in self.policies.items():
489
            controls = []
490
491
            if isinstance(controls_ids, list):
492
                controls = self._process_controls_ids_into_controls(
493
                    controls_manager, policy_id, controls_ids)
494
            elif controls_ids.startswith("all"):
495
                controls = self._process_controls_ids_into_controls(
496
                    controls_manager, policy_id, [controls_ids])
497
            else:
498
                msg = (
499
                    "Unknown policy content {content} in profile {profile_id}"
500
                    .format(content=controls_ids, profile_id=self.id_))
501
                raise ValueError(msg)
502
503
            for c in controls:
504
                self._merge_control(c)
505
506 2
    def extend_by(self, extended_profile):
507
        self.policies.update(extended_profile.policies)
508
        super(ProfileWithSeparatePolicies, self).extend_by(extended_profile)
509
510
511 2
class ProfileWithInlinePolicies(ResolvableProfile):
512 2
    def __init__(self, * args, ** kwargs):
513
        super(ProfileWithInlinePolicies, self).__init__(* args, ** kwargs)
514
        self.controls_by_policy = defaultdict(list)
515
516 2
    def apply_selection(self, item):
517
        # ":" is the delimiter for controls but not when the item is a variable
518
        if ":" in item and "=" not in item:
519
            policy_id, control_id = item.split(":", 1)
520
            self.controls_by_policy[policy_id].append(control_id)
521
        else:
522
            super(ProfileWithInlinePolicies, self).apply_selection(item)
523
524 2 View Code Duplication
    def _process_controls_ids_into_controls(self, controls_manager, policy_id, controls_ids):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
525
        controls = []
526
        for cid in controls_ids:
527
            if not cid.startswith("all"):
528
                controls.extend(
529
                    self._controls_ids_to_controls(controls_manager, policy_id, [cid]))
530
            elif ":" in cid:
531
                _, level_id = cid.split(":", 1)
532
                controls.extend(
533
                    controls_manager.get_all_controls_of_level(policy_id, level_id))
534
            else:
535
                controls.extend(
536
                    controls_manager.get_all_controls(policy_id))
537
        return controls
538
539 2
    def resolve_controls(self, controls_manager):
540
        for policy_id, controls_ids in self.controls_by_policy.items():
541
            controls = self._process_controls_ids_into_controls(
542
                controls_manager, policy_id, controls_ids)
543
544
            for c in controls:
545
                self._merge_control(c)
546
547
548 2
class Value(object):
549
    """Represents XCCDF Value
550
    """
551
552 2
    def __init__(self, id_):
553 2
        self.id_ = id_
554 2
        self.title = ""
555 2
        self.description = ""
556 2
        self.type_ = "string"
557 2
        self.operator = "equals"
558 2
        self.interactive = False
559 2
        self.options = {}
560 2
        self.warnings = []
561
562 2
    @staticmethod
563 2
    def from_yaml(yaml_file, env_yaml=None):
564 2
        yaml_contents = open_and_macro_expand(yaml_file, env_yaml)
565 2
        if yaml_contents is None:
566
            return None
567
568 2
        value_id, _ = os.path.splitext(os.path.basename(yaml_file))
569 2
        value = Value(value_id)
570 2
        value.title = required_key(yaml_contents, "title")
571 2
        del yaml_contents["title"]
572 2
        value.description = required_key(yaml_contents, "description")
573 2
        del yaml_contents["description"]
574 2
        value.type_ = required_key(yaml_contents, "type")
575 2
        del yaml_contents["type"]
576 2
        value.operator = yaml_contents.pop("operator", "equals")
577 2
        possible_operators = ["equals", "not equal", "greater than",
578
                              "less than", "greater than or equal",
579
                              "less than or equal", "pattern match"]
580
581 2
        if value.operator not in possible_operators:
582
            raise ValueError(
583
                "Found an invalid operator value '%s' in '%s'. "
584
                "Expected one of: %s"
585
                % (value.operator, yaml_file, ", ".join(possible_operators))
586
            )
587
588 2
        value.interactive = \
589
            yaml_contents.pop("interactive", "false").lower() == "true"
590
591 2
        value.options = required_key(yaml_contents, "options")
592 2
        del yaml_contents["options"]
593 2
        value.warnings = yaml_contents.pop("warnings", [])
594
595 2
        for warning_list in value.warnings:
596
            if len(warning_list) != 1:
597
                raise ValueError("Only one key/value pair should exist for each dictionary")
598
599 2
        if yaml_contents:
600
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
601
                               % (yaml_file, yaml_contents))
602
603 2
        return value
604
605 2
    def to_xml_element(self):
606
        value = ET.Element('Value')
607
        value.set('id', self.id_)
608
        value.set('type', self.type_)
609
        if self.operator != "equals":  # equals is the default
610
            value.set('operator', self.operator)
611
        if self.interactive:  # False is the default
612
            value.set('interactive', 'true')
613
        title = ET.SubElement(value, 'title')
614
        title.text = self.title
615
        add_sub_element(value, 'description', self.description)
616
        add_warning_elements(value, self.warnings)
617
618
        for selector, option in self.options.items():
619
            # do not confuse Value with big V with value with small v
620
            # value is child element of Value
621
            value_small = ET.SubElement(value, 'value')
622
            # by XCCDF spec, default value is value without selector
623
            if selector != "default":
624
                value_small.set('selector', str(selector))
625
            value_small.text = str(option)
626
627
        return value
628
629 2
    def to_file(self, file_name):
630
        root = self.to_xml_element()
631
        tree = ET.ElementTree(root)
632
        tree.write(file_name)
633
634
635 2
class Benchmark(object):
636
    """Represents XCCDF Benchmark
637
    """
638 2
    def __init__(self, id_):
639
        self.id_ = id_
640
        self.title = ""
641
        self.status = ""
642
        self.description = ""
643
        self.notice_id = ""
644
        self.notice_description = ""
645
        self.front_matter = ""
646
        self.rear_matter = ""
647
        self.cpes = []
648
        self.version = "0.1"
649
        self.profiles = []
650
        self.values = {}
651
        self.bash_remediation_fns_group = None
652
        self.groups = {}
653
        self.rules = {}
654
        self.product_cpe_names = []
655
656
        # This is required for OCIL clauses
657
        conditional_clause = Value("conditional_clause")
658
        conditional_clause.title = "A conditional clause for check statements."
659
        conditional_clause.description = conditional_clause.title
660
        conditional_clause.type_ = "string"
661
        conditional_clause.options = {"": "This is a placeholder"}
662
663
        self.add_value(conditional_clause)
664
665 2
    @classmethod
666 2
    def from_yaml(cls, yaml_file, id_, env_yaml=None):
667
        yaml_contents = open_and_macro_expand(yaml_file, env_yaml)
668
        if yaml_contents is None:
669
            return None
670
671
        benchmark = cls(id_)
672
        benchmark.title = required_key(yaml_contents, "title")
673
        del yaml_contents["title"]
674
        benchmark.status = required_key(yaml_contents, "status")
675
        del yaml_contents["status"]
676
        benchmark.description = required_key(yaml_contents, "description")
677
        del yaml_contents["description"]
678
        notice_contents = required_key(yaml_contents, "notice")
679
        benchmark.notice_id = required_key(notice_contents, "id")
680
        del notice_contents["id"]
681
        benchmark.notice_description = required_key(notice_contents,
682
                                                    "description")
683
        del notice_contents["description"]
684
        if not notice_contents:
685
            del yaml_contents["notice"]
686
687
        benchmark.front_matter = required_key(yaml_contents,
688
                                              "front-matter")
689
        del yaml_contents["front-matter"]
690
        benchmark.rear_matter = required_key(yaml_contents,
691
                                             "rear-matter")
692
        del yaml_contents["rear-matter"]
693
        benchmark.version = str(required_key(yaml_contents, "version"))
694
        del yaml_contents["version"]
695
696
        if env_yaml:
697
            benchmark.product_cpe_names = env_yaml["product_cpes"].get_product_cpe_names()
698
699
        if yaml_contents:
700
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
701
                               % (yaml_file, yaml_contents))
702
703
        return benchmark
704
705 2
    def add_profiles_from_dir(self, dir_, env_yaml):
706
        for dir_item in sorted(os.listdir(dir_)):
707
            dir_item_path = os.path.join(dir_, dir_item)
708
            if not os.path.isfile(dir_item_path):
709
                continue
710
711
            _, ext = os.path.splitext(os.path.basename(dir_item_path))
712
            if ext != '.profile':
713
                sys.stderr.write(
714
                    "Encountered file '%s' while looking for profiles, "
715
                    "extension '%s' is unknown. Skipping..\n"
716
                    % (dir_item, ext)
717
                )
718
                continue
719
720
            try:
721
                new_profile = ProfileWithInlinePolicies.from_yaml(dir_item_path, env_yaml)
722
            except DocumentationNotComplete:
723
                continue
724
            except Exception as exc:
725
                msg = ("Error building profile from '{fname}': '{error}'"
726
                       .format(fname=dir_item_path, error=str(exc)))
727
                raise RuntimeError(msg)
728
            if new_profile is None:
729
                continue
730
731
            self.profiles.append(new_profile)
732
733 2
    def add_bash_remediation_fns_from_file(self, file_):
734
        if not file_:
735
            # bash-remediation-functions.xml doens't exist
736
            return
737
738
        tree = ET.parse(file_)
739
        self.bash_remediation_fns_group = tree.getroot()
740
741 2
    def to_xml_element(self):
742
        root = ET.Element('Benchmark')
743
        root.set('xmlns:xsi', 'http://www.w3.org/2001/XMLSchema-instance')
744
        root.set('xmlns:xhtml', 'http://www.w3.org/1999/xhtml')
745
        root.set('xmlns:dc', 'http://purl.org/dc/elements/1.1/')
746
        root.set('id', 'product-name')
747
        root.set('xsi:schemaLocation',
748
                 'http://checklists.nist.gov/xccdf/1.1 xccdf-1.1.4.xsd')
749
        root.set('style', 'SCAP_1.1')
750
        root.set('resolved', 'false')
751
        root.set('xml:lang', 'en-US')
752
        status = ET.SubElement(root, 'status')
753
        status.set('date', datetime.date.today().strftime("%Y-%m-%d"))
754
        status.text = self.status
755
        add_sub_element(root, "title", self.title)
756
        add_sub_element(root, "description", self.description)
757
        notice = add_sub_element(root, "notice", self.notice_description)
758
        notice.set('id', self.notice_id)
759
        add_sub_element(root, "front-matter", self.front_matter)
760
        add_sub_element(root, "rear-matter", self.rear_matter)
761
762
        # The Benchmark applicability is determined by the CPEs
763
        # defined in the product.yml
764
        for cpe_name in self.product_cpe_names:
765
            plat = ET.SubElement(root, "platform")
766
            plat.set("idref", cpe_name)
767
768
        version = ET.SubElement(root, 'version')
769
        version.text = self.version
770
        ET.SubElement(root, "metadata")
771
772
        for profile in self.profiles:
773
            root.append(profile.to_xml_element())
774
775
        for value in self.values.values():
776
            root.append(value.to_xml_element())
777
        if self.bash_remediation_fns_group is not None:
778
            root.append(self.bash_remediation_fns_group)
779
780
        groups_in_bench = list(self.groups.keys())
781
        priority_order = ["system", "services"]
782
        groups_in_bench = reorder_according_to_ordering(groups_in_bench, priority_order)
783
784
        # Make system group the first, followed by services group
785
        for group_id in groups_in_bench:
786
            group = self.groups.get(group_id)
787
            # Products using application benchmark don't have system or services group
788
            if group is not None:
789
                root.append(group.to_xml_element())
790
791
        for rule in self.rules.values():
792
            root.append(rule.to_xml_element())
793
794
        return root
795
796 2
    def to_file(self, file_name, ):
797
        root = self.to_xml_element()
798
        tree = ET.ElementTree(root)
799
        tree.write(file_name)
800
801 2
    def add_value(self, value):
802
        if value is None:
803
            return
804
        self.values[value.id_] = value
805
806
    # The benchmark is also considered a group, so this function signature needs to match
807
    # Group()'s add_group()
808 2
    def add_group(self, group, env_yaml=None):
809
        if group is None:
810
            return
811
        self.groups[group.id_] = group
812
813 2
    def add_rule(self, rule):
814
        if rule is None:
815
            return
816
        self.rules[rule.id_] = rule
817
818 2
    def to_xccdf(self):
819
        """We can easily extend this script to generate a valid XCCDF instead
820
        of SSG SHORTHAND.
821
        """
822
        raise NotImplementedError
823
824 2
    def __str__(self):
825
        return self.id_
826
827
828 2
class Group(object):
829
    """Represents XCCDF Group
830
    """
831 2
    ATTRIBUTES_TO_PASS_ON = (
832
        "platforms",
833
    )
834
835 2
    def __init__(self, id_):
836
        self.id_ = id_
837
        self.prodtype = "all"
838
        self.title = ""
839
        self.description = ""
840
        self.warnings = []
841
        self.requires = []
842
        self.conflicts = []
843
        self.values = {}
844
        self.groups = {}
845
        self.rules = {}
846
        # self.platforms is used further in the build system
847
        # self.platform is merged into self.platforms
848
        # it is here for backward compatibility
849
        self.platforms = set()
850
        self.cpe_names = set()
851
        self.platform = None
852
853 2
    @classmethod
854 2
    def from_yaml(cls, yaml_file, env_yaml=None):
855
        yaml_contents = open_and_macro_expand(yaml_file, env_yaml)
856
        if yaml_contents is None:
857
            return None
858
859
        group_id = os.path.basename(os.path.dirname(yaml_file))
860
        group = cls(group_id)
861
        group.prodtype = yaml_contents.pop("prodtype", "all")
862
        group.title = required_key(yaml_contents, "title")
863
        del yaml_contents["title"]
864
        group.description = required_key(yaml_contents, "description")
865
        del yaml_contents["description"]
866
        group.warnings = yaml_contents.pop("warnings", [])
867
        group.conflicts = yaml_contents.pop("conflicts", [])
868
        group.requires = yaml_contents.pop("requires", [])
869
        group.platform = yaml_contents.pop("platform", None)
870
        group.platforms = yaml_contents.pop("platforms", set())
871
        # ensure that content of group.platform is in group.platforms as
872
        # well
873
        if group.platform is not None:
874
            group.platforms.add(group.platform)
875
876
        if env_yaml:
877
            for platform in group.platforms:
878
                try:
879
                    group.cpe_names.add(env_yaml["product_cpes"].get_cpe_name(platform))
880
                except CPEDoesNotExist:
881
                    print("Unsupported platform '%s' in group '%s'." % (platform, group.id_))
882
                    raise
883
884
        for warning_list in group.warnings:
885
            if len(warning_list) != 1:
886
                raise ValueError("Only one key/value pair should exist for each dictionary")
887
888
        if yaml_contents:
889
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
890
                               % (yaml_file, yaml_contents))
891
        group.validate_prodtype(yaml_file)
892
        return group
893
894 2
    def validate_prodtype(self, yaml_file):
895
        for ptype in self.prodtype.split(","):
896
            if ptype.strip() != ptype:
897
                msg = (
898
                    "Comma-separated '{prodtype}' prodtype "
899
                    "in {yaml_file} contains whitespace."
900
                    .format(prodtype=self.prodtype, yaml_file=yaml_file))
901
                raise ValueError(msg)
902
903 2
    def to_xml_element(self):
904
        group = ET.Element('Group')
905
        group.set('id', self.id_)
906
        if self.prodtype != "all":
907
            group.set("prodtype", self.prodtype)
908
        title = ET.SubElement(group, 'title')
909
        title.text = self.title
910
        add_sub_element(group, 'description', self.description)
911
        add_warning_elements(group, self.warnings)
912
        add_nondata_subelements(group, "requires", "id", self.requires)
913
        add_nondata_subelements(group, "conflicts", "id", self.conflicts)
914
915
        for cpe_name in self.cpe_names:
916
            platform_el = ET.SubElement(group, "platform")
917
            platform_el.set("idref", cpe_name)
918
919
        for _value in self.values.values():
920
            group.append(_value.to_xml_element())
921
922
        # Rules that install or remove packages affect remediation
923
        # of other rules.
924
        # When packages installed/removed rules come first:
925
        # The Rules are ordered in more logical way, and
926
        # remediation order is natural, first the package is installed, then configured.
927
        rules_in_group = list(self.rules.keys())
928
        regex = r'(package_.*_(installed|removed))|(service_.*_(enabled|disabled))$'
929
        priority_order = ["installed", "removed", "enabled", "disabled"]
930
        rules_in_group = reorder_according_to_ordering(rules_in_group, priority_order, regex)
931
932
        # Add rules in priority order, first all packages installed, then removed,
933
        # followed by services enabled, then disabled
934
        for rule_id in rules_in_group:
935
            group.append(self.rules.get(rule_id).to_xml_element())
936
937
        # Add the sub groups after any current level group rules.
938
        # As package installed/removed and service enabled/disabled rules are usuallly in
939
        # top level group, this ensures groups that further configure a package or service
940
        # are after rules that install or remove it.
941
        groups_in_group = list(self.groups.keys())
942
        priority_order = [
943
            # Make sure rpm_verify_(hashes|permissions|ownership) are run before any other rule.
944
            # Due to conflicts between rules rpm_verify_* rules and any rule that configures
945
            # stricter settings, like file_permissions_grub2_cfg and sudo_dedicated_group,
946
            # the rules deviating from the system default should be evaluated later.
947
            # So that in the end the system has contents, permissions and ownership reset, and
948
            # any deviations or stricter settings are applied by the rules in the profile.
949
            "software", "integrity", "integrity-software", "rpm_verification",
950
951
            # The account group has to precede audit group because
952
            # the rule package_screen_installed is desired to be executed before the rule
953
            # audit_rules_privileged_commands, othervise the rule
954
            # does not catch newly installed screen binary during remediation
955
            # and report fail
956
            "accounts", "auditing",
957
958
959
            # The FIPS group should come before Crypto,
960
            # if we want to set a different (stricter) Crypto Policy than FIPS.
961
            "fips", "crypto",
962
963
            # The firewalld_activation must come before ruleset_modifications, othervise
964
            # remediations for ruleset_modifications won't work
965
            "firewalld_activation", "ruleset_modifications",
966
967
            # Rules from group disabling_ipv6 must precede rules from configuring_ipv6,
968
            # otherwise the remediation prints error although it is successful
969
            "disabling_ipv6", "configuring_ipv6"
970
        ]
971
        groups_in_group = reorder_according_to_ordering(groups_in_group, priority_order)
972
        for group_id in groups_in_group:
973
            _group = self.groups[group_id]
974
            group.append(_group.to_xml_element())
975
976
        return group
977
978 2
    def to_file(self, file_name):
979
        root = self.to_xml_element()
980
        tree = ET.ElementTree(root)
981
        tree.write(file_name)
982
983 2
    def add_value(self, value):
984
        if value is None:
985
            return
986
        self.values[value.id_] = value
987
988 2
    def add_group(self, group, env_yaml=None):
989
        if group is None:
990
            return
991
        if self.platforms and not group.platforms:
992
            group.platforms = self.platforms
993
        self.groups[group.id_] = group
994
        self._pass_our_properties_on_to(group)
995
996
        # Once the group has inherited properties, update cpe_names
997
        if env_yaml:
998
            for platform in group.platforms:
999
                try:
1000
                    group.cpe_names.add(env_yaml["product_cpes"].get_cpe_name(platform))
1001
                except CPEDoesNotExist:
1002
                    print("Unsupported platform '%s' in group '%s'." % (platform, group.id_))
1003
                    raise
1004
1005
1006 2
    def _pass_our_properties_on_to(self, obj):
1007
        for attr in self.ATTRIBUTES_TO_PASS_ON:
1008
            if hasattr(obj, attr) and getattr(obj, attr) is None:
1009
                setattr(obj, attr, getattr(self, attr))
1010
1011 2
    def add_rule(self, rule, env_yaml=None):
1012
        if rule is None:
1013
            return
1014
        if self.platforms and not rule.platforms:
1015
            rule.platforms = self.platforms
1016
        self.rules[rule.id_] = rule
1017
        self._pass_our_properties_on_to(rule)
1018
1019
        # Once the rule has inherited properties, update cpe_names
1020
        if env_yaml:
1021
            for platform in rule.platforms:
1022
                try:
1023
                    rule.cpe_names.add(env_yaml["product_cpes"].get_cpe_name(platform))
1024
                except CPEDoesNotExist:
1025
                    print("Unsupported platform '%s' in rule '%s'." % (platform, rule.id_))
1026
                    raise
1027
1028 2
    def __str__(self):
1029
        return self.id_
1030
1031
1032 2
class Rule(object):
1033
    """Represents XCCDF Rule
1034
    """
1035 2
    YAML_KEYS_DEFAULTS = {
1036
        "prodtype": lambda: "all",
1037
        "title": lambda: RuntimeError("Missing key 'title'"),
1038
        "description": lambda: RuntimeError("Missing key 'description'"),
1039
        "rationale": lambda: RuntimeError("Missing key 'rationale'"),
1040
        "severity": lambda: RuntimeError("Missing key 'severity'"),
1041
        "references": lambda: dict(),
1042
        "identifiers": lambda: dict(),
1043
        "ocil_clause": lambda: None,
1044
        "ocil": lambda: None,
1045
        "oval_external_content": lambda: None,
1046
        "warnings": lambda: list(),
1047
        "conflicts": lambda: list(),
1048
        "requires": lambda: list(),
1049
        "platform": lambda: None,
1050
        "platforms": lambda: set(),
1051
        "inherited_platforms": lambda: list(),
1052
        "template": lambda: None,
1053
        "definition_location": lambda: None,
1054
    }
1055
1056 2
    PRODUCT_REFERENCES = ("stigid", "cis",)
1057 2
    GLOBAL_REFERENCES = ("srg", "vmmsrg", "disa", "cis-csc",)
1058
1059 2
    def __init__(self, id_):
1060 2
        self.id_ = id_
1061 2
        self.prodtype = "all"
1062 2
        self.title = ""
1063 2
        self.description = ""
1064 2
        self.definition_location = ""
1065 2
        self.rationale = ""
1066 2
        self.severity = "unknown"
1067 2
        self.references = {}
1068 2
        self.identifiers = {}
1069 2
        self.ocil_clause = None
1070 2
        self.ocil = None
1071 2
        self.oval_external_content = None
1072 2
        self.warnings = []
1073 2
        self.requires = []
1074 2
        self.conflicts = []
1075
        # self.platforms is used further in the build system
1076
        # self.platform is merged into self.platforms
1077
        # it is here for backward compatibility
1078 2
        self.platform = None
1079 2
        self.platforms = set()
1080 2
        self.cpe_names = set()
1081 2
        self.inherited_platforms = [] # platforms inherited from the group
1082 2
        self.template = None
1083 2
        self.local_env_yaml = None
1084
1085 2
    @classmethod
1086 2
    def from_yaml(cls, yaml_file, env_yaml=None):
1087 2
        yaml_file = os.path.normpath(yaml_file)
1088
1089 2
        rule_id, ext = os.path.splitext(os.path.basename(yaml_file))
1090 2
        if rule_id == "rule" and ext == ".yml":
1091 2
            rule_id = get_rule_dir_id(yaml_file)
1092
1093 2
        local_env_yaml = None
1094 2
        if env_yaml:
1095
            local_env_yaml = dict()
1096
            local_env_yaml.update(env_yaml)
1097
            local_env_yaml["rule_id"] = rule_id
1098
1099 2
        yaml_contents = open_and_macro_expand(yaml_file, local_env_yaml)
1100 2
        if yaml_contents is None:
1101
            return None
1102
1103 2
        rule = cls(rule_id)
1104
1105 2
        if local_env_yaml:
1106
            rule.local_env_yaml = local_env_yaml
1107
1108 2
        try:
1109 2
            rule._set_attributes_from_dict(yaml_contents)
1110
        except RuntimeError as exc:
1111
            msg = ("Error processing '{fname}': {err}"
1112
                   .format(fname=yaml_file, err=str(exc)))
1113
            raise RuntimeError(msg)
1114
1115
        # platforms are read as list from the yaml file
1116
        # we need them to convert to set again
1117 2
        rule.platforms = set(rule.platforms)
1118
1119 2
        for warning_list in rule.warnings:
1120
            if len(warning_list) != 1:
1121
                raise ValueError("Only one key/value pair should exist for each dictionary")
1122
1123
        # ensure that content of rule.platform is in rule.platforms as
1124
        # well
1125 2
        if rule.platform is not None:
1126 2
            rule.platforms.add(rule.platform)
1127
1128
        # Convert the platform names to CPE names
1129
        # But only do it if an env_yaml was specified (otherwise there would be no product CPEs
1130
        # to lookup), and the rule's prodtype matches the product being built
1131 2
        if env_yaml and env_yaml["product"] in parse_prodtype(rule.prodtype):
1132
            for platform in rule.platforms:
1133
                try:
1134
                    rule.cpe_names.add(env_yaml["product_cpes"].get_cpe_name(platform))
1135
                except CPEDoesNotExist:
1136
                    print("Unsupported platform '%s' in rule '%s'." % (platform, rule.id_))
1137
                    raise
1138
1139 2
        if yaml_contents:
1140
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
1141
                               % (yaml_file, yaml_contents))
1142
1143 2
        if not rule.definition_location:
1144 2
            rule.definition_location = yaml_file
1145
1146 2
        rule.validate_prodtype(yaml_file)
1147 2
        rule.validate_identifiers(yaml_file)
1148 2
        rule.validate_references(yaml_file)
1149 2
        return rule
1150
1151 2
    def _verify_stigid_format(self, product):
1152 2
        stig_id = self.references.get("stigid", None)
1153 2
        if not stig_id:
1154 2
            return
1155 2
        if "," in stig_id:
1156 2
            raise ValueError("Rules can not have multiple STIG IDs.")
1157
1158 2
    def _verify_disa_cci_format(self):
1159 2
        cci_id = self.references.get("disa", None)
1160 2
        if not cci_id:
1161 2
            return
1162
        cci_ex = re.compile(r'^CCI-[0-9]{6}$')
1163
        for cci in cci_id.split(","):
1164
            if not cci_ex.match(cci):
1165
                raise ValueError("CCI '{}' is in the wrong format! "
1166
                                 "Format should be similar to: "
1167
                                 "CCI-XXXXXX".format(cci))
1168
        self.references["disa"] = cci_id
1169
1170 2
    def normalize(self, product):
1171 2
        try:
1172 2
            self.make_refs_and_identifiers_product_specific(product)
1173 2
            self.make_template_product_specific(product)
1174 2
        except Exception as exc:
1175 2
            msg = (
1176
                "Error normalizing '{rule}': {msg}"
1177
                .format(rule=self.id_, msg=str(exc))
1178
            )
1179 2
            raise RuntimeError(msg)
1180
1181 2
    def _get_product_only_references(self):
1182 2
        product_references = dict()
1183
1184 2
        for ref in Rule.PRODUCT_REFERENCES:
1185 2
            start = "{0}@".format(ref)
1186 2
            for gref, gval in self.references.items():
1187 2
                if ref == gref or gref.startswith(start):
1188 2
                    product_references[gref] = gval
1189 2
        return product_references
1190
1191 2
    def make_template_product_specific(self, product):
1192 2
        product_suffix = "@{0}".format(product)
1193
1194 2
        if not self.template:
1195
            return
1196
1197 2
        not_specific_vars = self.template.get("vars", dict())
1198 2
        specific_vars = self._make_items_product_specific(
1199
            not_specific_vars, product_suffix, True)
1200 2
        self.template["vars"] = specific_vars
1201
1202 2
        not_specific_backends = self.template.get("backends", dict())
1203 2
        specific_backends = self._make_items_product_specific(
1204
            not_specific_backends, product_suffix, True)
1205 2
        self.template["backends"] = specific_backends
1206
1207 2
    def make_refs_and_identifiers_product_specific(self, product):
1208 2
        product_suffix = "@{0}".format(product)
1209
1210 2
        product_references = self._get_product_only_references()
1211 2
        general_references = self.references.copy()
1212 2
        for todel in product_references:
1213 2
            general_references.pop(todel)
1214 2
        for ref in Rule.PRODUCT_REFERENCES:
1215 2
            if ref in general_references:
1216
                msg = "Unexpected reference identifier ({0}) without "
1217
                msg += "product qualifier ({0}@{1}) while building rule "
1218
                msg += "{2}"
1219
                msg = msg.format(ref, product, self.id_)
1220
                raise ValueError(msg)
1221
1222 2
        to_set = dict(
1223
            identifiers=(self.identifiers, False),
1224
            general_references=(general_references, True),
1225
            product_references=(product_references, False),
1226
        )
1227 2
        for name, (dic, allow_overwrites) in to_set.items():
1228 2
            try:
1229 2
                new_items = self._make_items_product_specific(
1230
                    dic, product_suffix, allow_overwrites)
1231 2
            except ValueError as exc:
1232 2
                msg = (
1233
                    "Error processing {what} for rule '{rid}': {msg}"
1234
                    .format(what=name, rid=self.id_, msg=str(exc))
1235
                )
1236 2
                raise ValueError(msg)
1237 2
            dic.clear()
1238 2
            dic.update(new_items)
1239
1240 2
        self.references = general_references
1241 2
        self._verify_disa_cci_format()
1242 2
        self.references.update(product_references)
1243
1244 2
        self._verify_stigid_format(product)
1245
1246 2
    def _make_items_product_specific(self, items_dict, product_suffix, allow_overwrites=False):
1247 2
        new_items = dict()
1248 2
        for full_label, value in items_dict.items():
1249 2
            if "@" not in full_label and full_label not in new_items:
1250 2
                new_items[full_label] = value
1251 2
                continue
1252
1253 2
            label = full_label.split("@")[0]
1254
1255
            # this test should occur before matching product_suffix with the product qualifier
1256
            # present in the reference, so it catches problems even for products that are not
1257
            # being built at the moment
1258 2
            if label in Rule.GLOBAL_REFERENCES:
1259
                msg = (
1260
                    "You cannot use product-qualified for the '{item_u}' reference. "
1261
                    "Please remove the product-qualifier and merge values with the "
1262
                    "existing reference if there is any. Original line: {item_q}: {value_q}"
1263
                    .format(item_u=label, item_q=full_label, value_q=value)
1264
                )
1265
                raise ValueError(msg)
1266
1267 2
            if not full_label.endswith(product_suffix):
1268 2
                continue
1269
1270 2
            if label in items_dict and not allow_overwrites and value != items_dict[label]:
1271 2
                msg = (
1272
                    "There is a product-qualified '{item_q}' item, "
1273
                    "but also an unqualified '{item_u}' item "
1274
                    "and those two differ in value - "
1275
                    "'{value_q}' vs '{value_u}' respectively."
1276
                    .format(item_q=full_label, item_u=label,
1277
                            value_q=value, value_u=items_dict[label])
1278
                )
1279 2
                raise ValueError(msg)
1280 2
            new_items[label] = value
1281 2
        return new_items
1282
1283 2
    def _set_attributes_from_dict(self, yaml_contents):
1284 2
        for key, default_getter in self.YAML_KEYS_DEFAULTS.items():
1285 2
            if key not in yaml_contents:
1286 2
                value = default_getter()
1287 2
                if isinstance(value, Exception):
1288
                    raise value
1289
            else:
1290 2
                value = yaml_contents.pop(key)
1291
1292 2
            setattr(self, key, value)
1293
1294 2
    def to_contents_dict(self):
1295
        """
1296
        Returns a dictionary that is the same schema as the dict obtained when loading rule YAML.
1297
        """
1298
1299 2
        yaml_contents = dict()
1300 2
        for key in Rule.YAML_KEYS_DEFAULTS:
1301 2
            yaml_contents[key] = getattr(self, key)
1302
1303 2
        return yaml_contents
1304
1305 2
    def validate_identifiers(self, yaml_file):
1306 2
        if self.identifiers is None:
1307
            raise ValueError("Empty identifier section in file %s" % yaml_file)
1308
1309
        # Validate all identifiers are non-empty:
1310 2
        for ident_type, ident_val in self.identifiers.items():
1311 2
            if not isinstance(ident_type, str) or not isinstance(ident_val, str):
1312
                raise ValueError("Identifiers and values must be strings: %s in file %s"
1313
                                 % (ident_type, yaml_file))
1314 2
            if ident_val.strip() == "":
1315
                raise ValueError("Identifiers must not be empty: %s in file %s"
1316
                                 % (ident_type, yaml_file))
1317 2
            if ident_type[0:3] == 'cce':
1318 2
                if not is_cce_format_valid(ident_val):
1319
                    raise ValueError("CCE Identifier format must be valid: invalid format '%s' for CEE '%s'"
1320
                                     " in file '%s'" % (ident_val, ident_type, yaml_file))
1321 2
                if not is_cce_value_valid("CCE-" + ident_val):
1322
                    raise ValueError("CCE Identifier value is not a valid checksum: invalid value '%s' for CEE '%s'"
1323
                                     " in file '%s'" % (ident_val, ident_type, yaml_file))
1324
1325 2
    def validate_references(self, yaml_file):
1326 2
        if self.references is None:
1327
            raise ValueError("Empty references section in file %s" % yaml_file)
1328
1329 2
        for ref_type, ref_val in self.references.items():
1330 2
            if not isinstance(ref_type, str) or not isinstance(ref_val, str):
1331
                raise ValueError("References and values must be strings: %s in file %s"
1332
                                 % (ref_type, yaml_file))
1333 2
            if ref_val.strip() == "":
1334
                raise ValueError("References must not be empty: %s in file %s"
1335
                                 % (ref_type, yaml_file))
1336
1337 2
        for ref_type, ref_val in self.references.items():
1338 2
            for ref in ref_val.split(","):
1339 2
                if ref.strip() != ref:
1340
                    msg = (
1341
                        "Comma-separated '{ref_type}' reference "
1342
                        "in {yaml_file} contains whitespace."
1343
                        .format(ref_type=ref_type, yaml_file=yaml_file))
1344
                    raise ValueError(msg)
1345
1346 2
    def validate_prodtype(self, yaml_file):
1347 2
        for ptype in self.prodtype.split(","):
1348 2
            if ptype.strip() != ptype:
1349
                msg = (
1350
                    "Comma-separated '{prodtype}' prodtype "
1351
                    "in {yaml_file} contains whitespace."
1352
                    .format(prodtype=self.prodtype, yaml_file=yaml_file))
1353
                raise ValueError(msg)
1354
1355 2
    def to_xml_element(self):
1356
        rule = ET.Element('Rule')
1357
        rule.set('id', self.id_)
1358
        if self.prodtype != "all":
1359
            rule.set("prodtype", self.prodtype)
1360
        rule.set('severity', self.severity)
1361
        add_sub_element(rule, 'title', self.title)
1362
        add_sub_element(rule, 'description', self.description)
1363
        add_sub_element(rule, 'rationale', self.rationale)
1364
1365
        main_ident = ET.Element('ident')
1366
        for ident_type, ident_val in self.identifiers.items():
1367
            # This is not true if items were normalized
1368
            if '@' in ident_type:
1369
                # the ident is applicable only on some product
1370
                # format : 'policy@product', eg. 'stigid@product'
1371
                # for them, we create a separate <ref> element
1372
                policy, product = ident_type.split('@')
1373
                ident = ET.SubElement(rule, 'ident')
1374
                ident.set(policy, ident_val)
1375
                ident.set('prodtype', product)
1376
            else:
1377
                main_ident.set(ident_type, ident_val)
1378
1379
        if main_ident.attrib:
1380
            rule.append(main_ident)
1381
1382
        main_ref = ET.Element('ref')
1383
        for ref_type, ref_val in self.references.items():
1384
            # This is not true if items were normalized
1385
            if '@' in ref_type:
1386
                # the reference is applicable only on some product
1387
                # format : 'policy@product', eg. 'stigid@product'
1388
                # for them, we create a separate <ref> element
1389
                policy, product = ref_type.split('@')
1390
                ref = ET.SubElement(rule, 'ref')
1391
                ref.set(policy, ref_val)
1392
                ref.set('prodtype', product)
1393
            else:
1394
                main_ref.set(ref_type, ref_val)
1395
1396
        if main_ref.attrib:
1397
            rule.append(main_ref)
1398
1399
        if self.oval_external_content:
1400
            check = ET.SubElement(rule, 'check')
1401
            check.set("system", "http://oval.mitre.org/XMLSchema/oval-definitions-5")
1402
            external_content = ET.SubElement(check, "check-content-ref")
1403
            external_content.set("href", self.oval_external_content)
1404
        else:
1405
            # TODO: This is pretty much a hack, oval ID will be the same as rule ID
1406
            #       and we don't want the developers to have to keep them in sync.
1407
            #       Therefore let's just add an OVAL ref of that ID.
1408
            oval_ref = ET.SubElement(rule, "oval")
1409
            oval_ref.set("id", self.id_)
1410
1411
        if self.ocil or self.ocil_clause:
1412
            ocil = add_sub_element(rule, 'ocil', self.ocil if self.ocil else "")
1413
            if self.ocil_clause:
1414
                ocil.set("clause", self.ocil_clause)
1415
1416
        add_warning_elements(rule, self.warnings)
1417
        add_nondata_subelements(rule, "requires", "id", self.requires)
1418
        add_nondata_subelements(rule, "conflicts", "id", self.conflicts)
1419
1420
        for cpe_name in self.cpe_names:
1421
            platform_el = ET.SubElement(rule, "platform")
1422
            platform_el.set("idref", cpe_name)
1423
1424
        return rule
1425
1426 2
    def to_file(self, file_name):
1427
        root = self.to_xml_element()
1428
        tree = ET.ElementTree(root)
1429
        tree.write(file_name)
1430
1431
1432 2
class DirectoryLoader(object):
1433 2
    def __init__(self, profiles_dir, bash_remediation_fns, env_yaml):
1434
        self.benchmark_file = None
1435
        self.group_file = None
1436
        self.loaded_group = None
1437
        self.rule_files = []
1438
        self.value_files = []
1439
        self.subdirectories = []
1440
1441
        self.all_values = set()
1442
        self.all_rules = set()
1443
        self.all_groups = set()
1444
1445
        self.profiles_dir = profiles_dir
1446
        self.bash_remediation_fns = bash_remediation_fns
1447
        self.env_yaml = env_yaml
1448
        self.product = env_yaml["product"]
1449
1450
        self.parent_group = None
1451
1452 2
    def _collect_items_to_load(self, guide_directory):
1453
        for dir_item in sorted(os.listdir(guide_directory)):
1454
            dir_item_path = os.path.join(guide_directory, dir_item)
1455
            _, extension = os.path.splitext(dir_item)
1456
1457
            if extension == '.var':
1458
                self.value_files.append(dir_item_path)
1459
            elif dir_item == "benchmark.yml":
1460
                if self.benchmark_file:
1461
                    raise ValueError("Multiple benchmarks in one directory")
1462
                self.benchmark_file = dir_item_path
1463
            elif dir_item == "group.yml":
1464
                if self.group_file:
1465
                    raise ValueError("Multiple groups in one directory")
1466
                self.group_file = dir_item_path
1467
            elif extension == '.rule':
1468
                self.rule_files.append(dir_item_path)
1469
            elif is_rule_dir(dir_item_path):
1470
                self.rule_files.append(get_rule_dir_yaml(dir_item_path))
1471
            elif dir_item != "tests":
1472
                if os.path.isdir(dir_item_path):
1473
                    self.subdirectories.append(dir_item_path)
1474
                else:
1475
                    sys.stderr.write(
1476
                        "Encountered file '%s' while recursing, extension '%s' "
1477
                        "is unknown. Skipping..\n"
1478
                        % (dir_item, extension)
1479
                    )
1480
1481 2
    def load_benchmark_or_group(self, guide_directory):
1482
        """
1483
        Loads a given benchmark or group from the specified benchmark_file or
1484
        group_file, in the context of guide_directory, profiles_dir,
1485
        env_yaml, and bash_remediation_fns.
1486
1487
        Returns the loaded group or benchmark.
1488
        """
1489
        group = None
1490
        if self.group_file and self.benchmark_file:
1491
            raise ValueError("A .benchmark file and a .group file were found in "
1492
                             "the same directory '%s'" % (guide_directory))
1493
1494
        # we treat benchmark as a special form of group in the following code
1495
        if self.benchmark_file:
1496
            group = Benchmark.from_yaml(
1497
                self.benchmark_file, 'product-name', self.env_yaml
1498
            )
1499
            if self.profiles_dir:
1500
                group.add_profiles_from_dir(self.profiles_dir, self.env_yaml)
1501
            group.add_bash_remediation_fns_from_file(self.bash_remediation_fns)
1502
1503
        if self.group_file:
1504
            group = Group.from_yaml(self.group_file, self.env_yaml)
1505
            self.all_groups.add(group.id_)
1506
1507
        return group
1508
1509 2
    def _load_group_process_and_recurse(self, guide_directory):
1510
        self.loaded_group = self.load_benchmark_or_group(guide_directory)
1511
1512
        if self.loaded_group:
1513
            if self.parent_group:
1514
                self.parent_group.add_group(self.loaded_group, env_yaml=self.env_yaml)
1515
1516
            self._process_values()
1517
            self._recurse_into_subdirs()
1518
            self._process_rules()
1519
1520 2
    def process_directory_tree(self, start_dir, extra_group_dirs=None):
1521
        self._collect_items_to_load(start_dir)
1522
        if extra_group_dirs is not None:
1523
            self.subdirectories += extra_group_dirs
1524
        self._load_group_process_and_recurse(start_dir)
1525
1526 2
    def _recurse_into_subdirs(self):
1527
        for subdir in self.subdirectories:
1528
            loader = self._get_new_loader()
1529
            loader.parent_group = self.loaded_group
1530
            loader.process_directory_tree(subdir)
1531
            self.all_values.update(loader.all_values)
1532
            self.all_rules.update(loader.all_rules)
1533
            self.all_groups.update(loader.all_groups)
1534
1535 2
    def _get_new_loader(self):
1536
        raise NotImplementedError()
1537
1538 2
    def _process_values(self):
1539
        raise NotImplementedError()
1540
1541 2
    def _process_rules(self):
1542
        raise NotImplementedError()
1543
1544
1545 2
class BuildLoader(DirectoryLoader):
1546 2
    def __init__(self, profiles_dir, bash_remediation_fns, env_yaml, resolved_rules_dir=None):
1547
        super(BuildLoader, self).__init__(profiles_dir, bash_remediation_fns, env_yaml)
1548
1549
        self.resolved_rules_dir = resolved_rules_dir
1550
        if resolved_rules_dir and not os.path.isdir(resolved_rules_dir):
1551
            os.mkdir(resolved_rules_dir)
1552
1553 2
    def _process_values(self):
1554
        for value_yaml in self.value_files:
1555
            value = Value.from_yaml(value_yaml, self.env_yaml)
1556
            self.all_values.add(value)
1557
            self.loaded_group.add_value(value)
1558
1559 2
    def _process_rules(self):
1560
        for rule_yaml in self.rule_files:
1561
            try:
1562
                rule = Rule.from_yaml(rule_yaml, self.env_yaml)
1563
            except DocumentationNotComplete:
1564
                # Happens on non-debug build when a rule is "documentation-incomplete"
1565
                continue
1566
            prodtypes = parse_prodtype(rule.prodtype)
1567
            if "all" not in prodtypes and self.product not in prodtypes:
1568
                continue
1569
            self.all_rules.add(rule)
1570
            self.loaded_group.add_rule(rule, env_yaml=self.env_yaml)
1571
1572
            if self.loaded_group.platforms:
1573
                rule.inherited_platforms += self.loaded_group.platforms
1574
1575
            if self.resolved_rules_dir:
1576
                output_for_rule = os.path.join(
1577
                    self.resolved_rules_dir, "{id_}.yml".format(id_=rule.id_))
1578
                mkdir_p(self.resolved_rules_dir)
1579
                with open(output_for_rule, "w") as f:
1580
                    rule.normalize(self.env_yaml["product"])
1581
                    yaml.dump(rule.to_contents_dict(), f)
1582
1583 2
    def _get_new_loader(self):
1584
        return BuildLoader(
1585
            self.profiles_dir, self.bash_remediation_fns, self.env_yaml, self.resolved_rules_dir)
1586
1587 2
    def export_group_to_file(self, filename):
1588
        return self.loaded_group.to_file(filename)
1589