Test Failed
Push — master ( 56d79d...105233 )
by Alexander
02:28 queued 10s
created

ssg.build_yaml   F

Complexity

Total Complexity 322

Size/Duplication

Total Lines 1565
Duplicated Lines 1.41 %

Test Coverage

Coverage 33.93%

Importance

Changes 0
Metric   Value
eloc     1126
dl       22
loc      1565
ccs      343
cts      1011
cp       0.3393
rs       1.096
c        0
b        0
f        0
wmc      322

4 Functions

Rating   Name   Duplication   Size   Complexity  
A add_warning_elements() 0 15 2
B reorder_according_to_ordering() 0 16 6
A add_sub_element() 0 28 2
A add_nondata_subelements() 0 6 2

85 Methods

Rating   Name   Duplication   Size   Complexity  
A ProfileWithSeparatePolicies._process_controls_ids_into_controls() 11 13 4
A ResolvableProfile.extend_by() 0 12 1
A Profile.get_variable_selectors() 0 2 1
A Profile.__sub__() 0 13 1
A ProfileWithInlinePolicies.__init__() 0 3 1
C Profile.dump_yaml() 0 29 10
A Profile.get_rule_selectors() 0 2 1
A ProfileWithInlinePolicies.resolve_controls() 0 7 3
A ResolvableProfile._merge_control() 0 5 3
A ResolvableProfile.__init__() 0 4 1
A Benchmark.to_file() 0 4 1
A Benchmark.add_value() 0 4 2
A Profile.validate_refine_rules() 0 26 4
A Benchmark.__init__() 0 26 1
A ProfileWithSeparatePolicies._parse_policies() 0 11 4
A ProfileWithSeparatePolicies.extend_by() 0 3 1
C Profile.to_xml_element() 0 43 9
A ResolvableProfile._controls_ids_to_controls() 0 3 1
B Profile.from_yaml() 0 34 8
B ResolvableProfile.resolve() 0 30 5
A Profile.apply_selection() 0 20 5
A Value.to_file() 0 4 1
A Profile.read_yaml_contents() 0 12 2
B Benchmark.from_yaml() 0 39 5
B Benchmark.add_profiles_from_dir() 0 27 7
A ResolvableProfile._subtract_refinements() 0 11 4
A Benchmark.add_bash_remediation_fns_from_file() 0 7 2
A ProfileWithInlinePolicies._process_controls_ids_into_controls() 11 14 4
A ProfileWithSeparatePolicies.resolve_controls() 0 18 5
B Value.from_yaml() 0 42 6
A Profile._parse_selections() 0 3 2
A Profile.validate_rules() 0 21 4
A ProfileWithInlinePolicies.apply_selection() 0 6 2
A Profile.__init__() 0 17 1
A Value.to_xml_element() 0 23 5
A ProfileWithSeparatePolicies.__init__() 0 3 1
A ProfileWithSeparatePolicies.read_yaml_contents() 0 5 2
B Benchmark.to_xml_element() 0 54 8
A ResolvableProfile.resolve_controls() 0 2 1
B Profile.validate_variables() 0 31 5
A Value.__init__() 0 9 1
A Group.add_value() 0 4 2
B Group.to_xml_element() 0 74 6
A Group.validate_prodtype() 0 8 3
C Group.from_yaml() 0 40 9
A Benchmark.add_rule() 0 4 2
A Group.to_file() 0 4 1
A Benchmark.to_xccdf() 0 5 1
A Group.__init__() 0 17 1
A Benchmark.add_group() 0 4 2
A Benchmark.__str__() 0 2 1
A Group.add_rule() 0 7 4
A Group._pass_our_properties_on_to() 0 4 4
B Group.add_group() 0 16 7
A Group.__str__() 0 2 1
B Rule.make_refs_and_identifiers_product_specific() 0 38 6
F Rule.from_yaml() 0 52 14
A Rule.normalize() 0 10 2
A Rule._verify_disa_cci_format() 0 11 4
A Rule._verify_stigid_format() 0 6 3
A Rule._get_product_only_references() 0 9 5
A Rule.__init__() 0 24 1
A Rule.make_template_product_specific() 0 15 2
A BuildLoader.__init__() 0 6 3
A Rule.to_file() 0 4 1
F Rule.to_xml_element() 0 70 14
B BuildLoader._process_rules() 0 23 8
A DirectoryLoader._recurse_into_subdirs() 0 8 2
C Rule.validate_identifiers() 0 19 9
A Rule._set_attributes_from_dict() 0 10 4
A DirectoryLoader._process_values() 0 2 1
A DirectoryLoader.__init__() 0 18 1
C Rule._make_items_product_specific() 0 36 9
A BuildLoader._get_new_loader() 0 3 1
B DirectoryLoader.load_benchmark_or_group() 0 27 6
A Rule.validate_prodtype() 0 8 3
C DirectoryLoader._collect_items_to_load() 0 27 11
A DirectoryLoader._get_new_loader() 0 2 1
C Rule.validate_references() 0 20 9
A BuildLoader.export_group_to_file() 0 2 1
A DirectoryLoader.process_directory_tree() 0 5 2
A BuildLoader._process_values() 0 5 2
A DirectoryLoader._load_group_process_and_recurse() 0 10 3
A DirectoryLoader._process_rules() 0 2 1
A Rule.to_contents_dict() 0 10 2

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:
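One duplication reported for this file is the pair of identical _process_controls_ids_into_controls() methods in ProfileWithSeparatePolicies and ProfileWithInlinePolicies. A possible fix, sketched here under assumptions, is to move the method into the shared base class ResolvableProfile. The classes below are trimmed-down illustrations, and StubControlsManager is a hypothetical stand-in that only mimics the three manager calls used in the listing; it is not the real ssg controls manager.

```python
class StubControlsManager(object):
    """Hypothetical stand-in for the controls manager, for this demo only."""

    LEVELS = ["low", "medium", "high"]  # assumed level ordering

    def __init__(self, policies):
        # policies maps policy_id -> {control_id: level}
        self.policies = policies

    def get_control(self, policy_id, control_id):
        return control_id

    def get_all_controls(self, policy_id):
        return sorted(self.policies[policy_id])

    def get_all_controls_of_level_at_least(self, policy_id, level_id):
        threshold = self.LEVELS.index(level_id)
        return sorted(
            cid for cid, level in self.policies[policy_id].items()
            if self.LEVELS.index(level) >= threshold)


class ResolvableProfile(object):
    """Trimmed-down base class that holds the single shared copy."""

    def _controls_ids_to_controls(self, controls_manager, policy_id, control_id_list):
        return [controls_manager.get_control(policy_id, cid) for cid in control_id_list]

    def _process_controls_ids_into_controls(self, controls_manager, policy_id, controls_ids):
        controls = []
        for cid in controls_ids:
            if not cid.startswith("all"):
                controls.extend(
                    self._controls_ids_to_controls(controls_manager, policy_id, [cid]))
            elif ":" in cid:
                _, level_id = cid.split(":", 1)
                controls.extend(
                    controls_manager.get_all_controls_of_level_at_least(policy_id, level_id))
            else:
                controls.extend(controls_manager.get_all_controls(policy_id))
        return controls


class ProfileWithSeparatePolicies(ResolvableProfile):
    pass  # inherits the method instead of redefining it


class ProfileWithInlinePolicies(ResolvableProfile):
    pass  # inherits the method instead of redefining it


manager = StubControlsManager({"policy": {"c1": "low", "c2": "high"}})
result_separate = ProfileWithSeparatePolicies()._process_controls_ids_into_controls(
    manager, "policy", ["c1", "all:high"])
result_inline = ProfileWithInlinePolicies()._process_controls_ids_into_controls(
    manager, "policy", ["all"])
```

With the method defined once, both subclasses resolve control IDs through the same code path, so a later fix only has to be made in one place.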

Complexity

Tip: Before tackling complexity, eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like ssg.build_yaml often do many different things. To break such a class down, we need to identify a cohesive component within it. A common way to find such a component is to look for fields or methods that share the same prefix or suffix.
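The prefix heuristic can be tried out with a few lines of Python. The sketch below groups a sample of method names from the table above by their first underscore-separated word. The helper name group_by_prefix and the threshold of three members are assumptions chosen for illustration, not part of any tool.

```python
from collections import defaultdict

# A sample of method names taken from the method table above.
method_names = [
    "validate_refine_rules", "validate_rules", "validate_variables",
    "validate_prodtype", "validate_identifiers", "validate_references",
    "to_xml_element", "to_file", "to_contents_dict",
    "from_yaml", "dump_yaml",
]


def group_by_prefix(names):
    """Group names by the word before the first underscore."""
    groups = defaultdict(list)
    for name in names:
        # Ignore leading underscores so private helpers group with public ones.
        prefix = name.lstrip("_").split("_", 1)[0]
        groups[prefix].append(name)
    return dict(groups)


groups = group_by_prefix(method_names)
# Keep only prefixes shared by at least three methods (assumed threshold).
candidates = {p: ms for p, ms in groups.items() if len(ms) >= 3}
```

In this sample the validate_ and to_ groups stand out, which suggests validation and serialization as possible components to separate.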

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
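As an illustration of Extract Class, the sketch below moves a simplified validate_rules() out of Profile into a separate object and leaves a delegating method behind so that existing callers keep working. ProfileValidator is a hypothetical name that does not exist in ssg.build_yaml, and the Profile and Rule classes here are minimal stand-ins rather than the real ones.

```python
class ProfileValidator(object):
    """Hypothetical extracted class holding the validation logic."""

    def __init__(self, profile):
        self.profile = profile

    def validate_rules(self, rules, groups):
        existing_rule_ids = [r.id_ for r in rules]
        for id_ in self.profile.get_rule_selectors():
            if id_ in groups:
                raise ValueError(
                    "Group '{0}' is selected in profile '{1}'."
                    .format(id_, self.profile.id_))
            if id_ not in existing_rule_ids:
                raise ValueError(
                    "Rule '{0}' was not found in the benchmark (profile '{1}')."
                    .format(id_, self.profile.id_))


class Profile(object):
    """Minimal stand-in for the real Profile class."""

    def __init__(self, id_):
        self.id_ = id_
        self.selected = []
        self.unselected = []

    def get_rule_selectors(self):
        return list(self.selected + self.unselected)

    def validate_rules(self, rules, groups):
        # Delegate to the extracted class; the public interface is unchanged.
        ProfileValidator(self).validate_rules(rules, groups)


class Rule(object):
    """Minimal stand-in carrying only an id."""

    def __init__(self, id_):
        self.id_ = id_


profile = Profile("example")
profile.selected = ["rule_a"]
profile.validate_rules([Rule("rule_a")], groups={})  # passes silently

try:
    profile.validate_rules([], groups={})
    missing_rule_detected = False
except ValueError:
    missing_rule_detected = True
```

Keeping the thin delegating method on Profile is one possible design choice: it lets the extraction happen without touching callers, and the delegate can be removed later once callers use the validator directly.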

from __future__ import absolute_import
from __future__ import print_function

import os
import os.path
from collections import defaultdict
from copy import deepcopy
import datetime
import re
import sys
from xml.sax.saxutils import escape

import yaml

from .build_cpe import CPEDoesNotExist
from .constants import XCCDF_REFINABLE_PROPERTIES
from .rules import get_rule_dir_id, get_rule_dir_yaml, is_rule_dir
from .rule_yaml import parse_prodtype
from .controls import Control

from .checks import is_cce_format_valid, is_cce_value_valid
from .yaml import DocumentationNotComplete, open_and_expand, open_and_macro_expand
from .utils import required_key, mkdir_p

from .xml import ElementTree as ET
from .shims import unicode_func


def add_sub_element(parent, tag, data):
    """
    Creates a new child element under parent with tag tag, and sets
    data as the content under the tag. In particular, data is a string
    to be parsed as an XML tree, allowing sub-elements of children to be
    added.

    If data should not be parsed as an XML tree, either escape the contents
    before passing into this function, or use ElementTree.SubElement().

    Returns the newly created subelement of type tag.
    """
    # This is used because our YAML data contain XML and XHTML elements
    # ET.SubElement() escapes the < > characters by &lt; and &gt;
    # and therefore it does not add child elements
    # we need to do a hack instead
    # TODO: Remove this function after we move to Markdown everywhere in SSG
    ustr = unicode_func("<{0}>{1}</{0}>").format(tag, data)

    try:
        element = ET.fromstring(ustr.encode("utf-8"))
    except Exception:
        msg = ("Error adding subelement to an element '{0}' from string: '{1}'"
               .format(parent.tag, ustr))
        raise RuntimeError(msg)

    parent.append(element)
    return element


def reorder_according_to_ordering(unordered, ordering, regex=None):
    ordered = []
    if regex is None:
        regex = "|".join(["({0})".format(item) for item in ordering])
    regex = re.compile(regex)

    items_to_order = list(filter(regex.match, unordered))
    unordered = set(unordered)

    for priority_type in ordering:
        for item in items_to_order:
            if priority_type in item and item in unordered:
                ordered.append(item)
                unordered.remove(item)
    ordered.extend(list(unordered))
    return ordered


def add_warning_elements(element, warnings):
    # The use of [{dict}, {dict}] in warnings is to handle the following
    # scenario where multiple warnings have the same category which is
    # valid in SCAP and our content:
    #
    # warnings:
    #     - general: Some general warning
    #     - general: Some other general warning
    #     - general: |-
    #         Some really long multiline general warning
    #
    # Each of the {dict} should have only one key/value pair.
    for warning_dict in warnings:
        warning = add_sub_element(element, "warning", list(warning_dict.values())[0])
        warning.set("category", list(warning_dict.keys())[0])


def add_nondata_subelements(element, subelement, attribute, attr_data):
    """Add multiple iterations of a subelement that contains an attribute but no data
       For example, <requires id="my_required_id"/>"""
    for data in attr_data:
        req = ET.SubElement(element, subelement)
        req.set(attribute, data)


class Profile(object):
    """Represents XCCDF profile
    """

    def __init__(self, id_):
        self.id_ = id_
        self.title = ""
        self.description = ""
        self.extends = None
        self.selected = []
        self.unselected = []
        self.variables = dict()
        self.refine_rules = defaultdict(list)
        self.metadata = None
        self.reference = None
        # self.platforms is used further in the build system
        # self.platform is merged into self.platforms
        # it is here for backward compatibility
        self.platforms = set()
        self.cpe_names = set()
        self.platform = None


    def read_yaml_contents(self, yaml_contents):
        self.title = required_key(yaml_contents, "title")
        del yaml_contents["title"]
        self.description = required_key(yaml_contents, "description")
        del yaml_contents["description"]
        self.extends = yaml_contents.pop("extends", None)
        selection_entries = required_key(yaml_contents, "selections")
        if selection_entries:
            self._parse_selections(selection_entries)
        del yaml_contents["selections"]
        self.platforms = yaml_contents.pop("platforms", set())
        self.platform = yaml_contents.pop("platform", None)

    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None):
        yaml_contents = open_and_expand(yaml_file, env_yaml)
        if yaml_contents is None:
            return None

        basename, _ = os.path.splitext(os.path.basename(yaml_file))

        profile = cls(basename)
        profile.read_yaml_contents(yaml_contents)

        profile.reference = yaml_contents.pop("reference", None)
        # ensure that content of profile.platform is in profile.platforms as
        # well
        if profile.platform is not None:
            profile.platforms.add(profile.platform)

        if env_yaml:
            for platform in profile.platforms:
                try:
                    profile.cpe_names.add(env_yaml["product_cpes"].get_cpe_name(platform))
                except CPEDoesNotExist:
                    print("Unsupported platform '%s' in profile '%s'." % (platform, profile.id_))
                    raise

        # At the moment, metadata is not used to build content
        if "metadata" in yaml_contents:
            del yaml_contents["metadata"]

        if yaml_contents:
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                               % (yaml_file, yaml_contents))

        return profile

    def dump_yaml(self, file_name, documentation_complete=True):
        to_dump = {}
        to_dump["documentation_complete"] = documentation_complete
        to_dump["title"] = self.title
        to_dump["description"] = self.description
        to_dump["reference"] = self.reference
        if self.metadata is not None:
            to_dump["metadata"] = self.metadata

        if self.extends is not None:
            to_dump["extends"] = self.extends

        if self.platforms:
            to_dump["platforms"] = self.platforms

        selections = []
        for item in self.selected:
            selections.append(item)
        for item in self.unselected:
            selections.append("!"+item)
        for varname in self.variables.keys():
            selections.append(varname+"="+self.variables.get(varname))
        for rule, refinements in self.refine_rules.items():
            for prop, val in refinements:
                selections.append("{rule}.{property}={value}"
                                  .format(rule=rule, property=prop, value=val))
        to_dump["selections"] = selections
        with open(file_name, "w+") as f:
            yaml.dump(to_dump, f, indent=4)

    def _parse_selections(self, entries):
        for item in entries:
            self.apply_selection(item)

    def apply_selection(self, item):
        if "." in item:
            rule, refinement = item.split(".", 1)
            property_, value = refinement.split("=", 1)
            if property_ not in XCCDF_REFINABLE_PROPERTIES:
                msg = ("Property '{property_}' cannot be refined. "
                       "Rule properties that can be refined are {refinables}. "
                       "Fix refinement '{rule_id}.{property_}={value}' in profile '{profile}'."
                       .format(property_=property_, refinables=XCCDF_REFINABLE_PROPERTIES,
                               rule_id=rule, value=value, profile=self.id_)
                       )
                raise ValueError(msg)
            self.refine_rules[rule].append((property_, value))
        elif "=" in item:
            varname, value = item.split("=", 1)
            self.variables[varname] = value
        elif item.startswith("!"):
            self.unselected.append(item[1:])
        else:
            self.selected.append(item)

    def to_xml_element(self):
        element = ET.Element('Profile')
        element.set("id", self.id_)
        if self.extends:
            element.set("extends", self.extends)
        title = add_sub_element(element, "title", self.title)
        title.set("override", "true")
        desc = add_sub_element(element, "description", self.description)
        desc.set("override", "true")

        if self.reference:
            add_sub_element(element, "reference", escape(self.reference))

        for cpe_name in self.cpe_names:
            plat = ET.SubElement(element, "platform")
            plat.set("idref", cpe_name)

        for selection in self.selected:
            select = ET.Element("select")
            select.set("idref", selection)
            select.set("selected", "true")
            element.append(select)

        for selection in self.unselected:
            unselect = ET.Element("select")
            unselect.set("idref", selection)
            unselect.set("selected", "false")
            element.append(unselect)

        for value_id, selector in self.variables.items():
            refine_value = ET.Element("refine-value")
            refine_value.set("idref", value_id)
            refine_value.set("selector", selector)
            element.append(refine_value)

        for refined_rule, refinement_list in self.refine_rules.items():
            refine_rule = ET.Element("refine-rule")
            refine_rule.set("idref", refined_rule)
            for refinement in refinement_list:
                refine_rule.set(refinement[0], refinement[1])
            element.append(refine_rule)

        return element

    def get_rule_selectors(self):
        return list(self.selected + self.unselected)

    def get_variable_selectors(self):
        return self.variables

    def validate_refine_rules(self, rules):
        existing_rule_ids = [r.id_ for r in rules]
        for refine_rule, refinement_list in self.refine_rules.items():
            # Take first refinement to illustrate where the error is
            # all refinements in list are invalid, so it doesn't really matter
            a_refinement = refinement_list[0]

            if refine_rule not in existing_rule_ids:
                msg = (
                    "You are trying to refine a rule that doesn't exist. "
                    "Rule '{rule_id}' was not found in the benchmark. "
                    "Please check all rule refinements for rule: '{rule_id}', for example: "
                    "- {rule_id}.{property_}={value}' in profile {profile_id}."
                    .format(rule_id=refine_rule, profile_id=self.id_,
                            property_=a_refinement[0], value=a_refinement[1])
                    )
                raise ValueError(msg)

            if refine_rule not in self.get_rule_selectors():
                msg = ("- {rule_id}.{property_}={value}' in profile '{profile_id}' is refining "
                       "a rule that is not selected by it. The refinement will not have any "
                       "noticeable effect. Either select the rule or remove the rule refinement."
                       .format(rule_id=refine_rule, property_=a_refinement[0],
                               value=a_refinement[1], profile_id=self.id_)
                       )
                raise ValueError(msg)

    def validate_variables(self, variables):
        variables_by_id = dict()
        for var in variables:
            variables_by_id[var.id_] = var

        for var_id, our_val in self.variables.items():
            if var_id not in variables_by_id:
                all_vars_list = [" - %s" % v for v in variables_by_id.keys()]
                msg = (
                    "Value '{var_id}' in profile '{profile_name}' is not known. "
                    "We know only variables:\n{var_names}"
                    .format(
                        var_id=var_id, profile_name=self.id_,
                        var_names="\n".join(sorted(all_vars_list)))
                )
                raise ValueError(msg)

            allowed_selectors = [str(s) for s in variables_by_id[var_id].options.keys()]
            if our_val not in allowed_selectors:
                msg = (
                    "Value '{var_id}' in profile '{profile_name}' "
                    "uses the selector '{our_val}'. "
                    "This is not possible, as only selectors {all_selectors} are available. "
                    "Either change the selector used in the profile, or "
                    "add the selector-value pair to the variable definition."
                    .format(
                        var_id=var_id, profile_name=self.id_, our_val=our_val,
                        all_selectors=allowed_selectors,
                    )
                )
                raise ValueError(msg)

    def validate_rules(self, rules, groups):
        existing_rule_ids = [r.id_ for r in rules]
        rule_selectors = self.get_rule_selectors()
        for id_ in rule_selectors:
            if id_ in groups:
                msg = (
                    "You have selected a group '{group_id}' instead of a "
                    "rule. Groups have no effect in the profile and are not "
                    "allowed to be selected. Please remove '{group_id}' "
                    "from profile '{profile_id}' before proceeding."
                    .format(group_id=id_, profile_id=self.id_)
                )
                raise ValueError(msg)
            if id_ not in existing_rule_ids:
                msg = (
                    "Rule '{rule_id}' was not found in the benchmark. Please "
                    "remove rule '{rule_id}' from profile '{profile_id}' "
                    "before proceeding."
                    .format(rule_id=id_, profile_id=self.id_)
                )
                raise ValueError(msg)

    def __sub__(self, other):
        profile = Profile(self.id_)
        profile.title = self.title
        profile.description = self.description
        profile.extends = self.extends
        profile.platforms = self.platforms
        profile.platform = self.platform
        profile.selected = list(set(self.selected) - set(other.selected))
        profile.selected.sort()
        profile.unselected = list(set(self.unselected) - set(other.unselected))
        profile.variables = dict((k, v) for (k, v) in self.variables.items()
                                 if k not in other.variables or v != other.variables[k])
        return profile


class ResolvableProfile(Profile):
    def __init__(self, *args, **kwargs):
        super(ResolvableProfile, self).__init__(*args, **kwargs)
        self.resolved = False
        self.resolved_selections = set()

    def _controls_ids_to_controls(self, controls_manager, policy_id, control_id_list):
        items = [controls_manager.get_control(policy_id, cid) for cid in control_id_list]
        return items

    def _merge_control(self, control):
        self.selected.extend(control.rules)
        for varname, value in control.variables.items():
            if varname not in self.variables:
                self.variables[varname] = value

    def resolve_controls(self, controls_manager):
        pass

    def extend_by(self, extended_profile):
        extended_selects = set(extended_profile.selected)
        self.resolved_selections.update(extended_selects)

        updated_variables = dict(extended_profile.variables)
        updated_variables.update(self.variables)
        self.variables = updated_variables

        extended_refinements = deepcopy(extended_profile.refine_rules)
        updated_refinements = self._subtract_refinements(extended_refinements)
        updated_refinements.update(self.refine_rules)
        self.refine_rules = updated_refinements

    def resolve(self, all_profiles, controls_manager=None):
        if self.resolved:
            return

        self.resolve_controls(controls_manager)

        self.resolved_selections = set(self.selected)

        if self.extends:
            if self.extends not in all_profiles:
                msg = (
                    "Profile {name} extends profile {extended}, but "
                    "only profiles {known_profiles} are available for resolution."
                    .format(name=self.id_, extended=self.extends,
                            known_profiles=list(all_profiles.keys())))
                raise RuntimeError(msg)
            extended_profile = all_profiles[self.extends]
            extended_profile.resolve(all_profiles, controls_manager)

            self.extend_by(extended_profile)

        for uns in self.unselected:
            self.resolved_selections.discard(uns)

        self.unselected = []
        self.extends = None

        self.selected = sorted(self.resolved_selections)

        self.resolved = True

    def _subtract_refinements(self, extended_refinements):
        """
        Given a dict of rule refinements from the extended profile,
        "undo" every refinement prefixed with '!' in this profile.
        """
        for rule, refinements in list(self.refine_rules.items()):
            if rule.startswith("!"):
                for prop, val in refinements:
                    extended_refinements[rule[1:]].remove((prop, val))
                del self.refine_rules[rule]
        return extended_refinements


class ProfileWithSeparatePolicies(ResolvableProfile):
    def __init__(self, *args, **kwargs):
        super(ProfileWithSeparatePolicies, self).__init__(*args, **kwargs)
        self.policies = {}

    def read_yaml_contents(self, yaml_contents):
        policies = yaml_contents.pop("policies", None)
        if policies:
            self._parse_policies(policies)
        super(ProfileWithSeparatePolicies, self).read_yaml_contents(yaml_contents)

    def _parse_policies(self, policies_yaml):
        for item in policies_yaml:
            id_ = required_key(item, "id")
            controls_ids = required_key(item, "controls")
            if not isinstance(controls_ids, list):
                if controls_ids != "all":
                    msg = (
                        "Policy {id_} contains invalid controls list {controls}."
                        .format(id_=id_, controls=str(controls_ids)))
                    raise ValueError(msg)
            self.policies[id_] = controls_ids

    # Scrutinizer: this code seems to be duplicated in your project.
    def _process_controls_ids_into_controls(self, controls_manager, policy_id, controls_ids):
        controls = []
        for cid in controls_ids:
            if not cid.startswith("all"):
                controls.extend(
                    self._controls_ids_to_controls(controls_manager, policy_id, [cid]))
            elif ":" in cid:
                _, level_id = cid.split(":", 1)
                controls.extend(
                    controls_manager.get_all_controls_of_level_at_least(policy_id, level_id))
            else:
                controls.extend(controls_manager.get_all_controls(policy_id))
        return controls

    def resolve_controls(self, controls_manager):
        for policy_id, controls_ids in self.policies.items():
            controls = []

            if isinstance(controls_ids, list):
                controls = self._process_controls_ids_into_controls(
                    controls_manager, policy_id, controls_ids)
            elif controls_ids.startswith("all"):
                controls = self._process_controls_ids_into_controls(
                    controls_manager, policy_id, [controls_ids])
            else:
                msg = (
                    "Unknown policy content {content} in profile {profile_id}"
                    .format(content=controls_ids, profile_id=self.id_))
                raise ValueError(msg)

            for c in controls:
                self._merge_control(c)

    def extend_by(self, extended_profile):
        self.policies.update(extended_profile.policies)
        super(ProfileWithSeparatePolicies, self).extend_by(extended_profile)


class ProfileWithInlinePolicies(ResolvableProfile):
    def __init__(self, *args, **kwargs):
        super(ProfileWithInlinePolicies, self).__init__(*args, **kwargs)
        self.controls_by_policy = defaultdict(list)

    def apply_selection(self, item):
        if ":" in item:
            policy_id, control_id = item.split(":", 1)
            self.controls_by_policy[policy_id].append(control_id)
        else:
            super(ProfileWithInlinePolicies, self).apply_selection(item)

    # Scrutinizer: this code seems to be duplicated in your project.
    def _process_controls_ids_into_controls(self, controls_manager, policy_id, controls_ids):
        controls = []
        for cid in controls_ids:
            if not cid.startswith("all"):
                controls.extend(
                    self._controls_ids_to_controls(controls_manager, policy_id, [cid]))
            elif ":" in cid:
                _, level_id = cid.split(":", 1)
                controls.extend(
                    controls_manager.get_all_controls_of_level_at_least(policy_id, level_id))
            else:
                controls.extend(
                    controls_manager.get_all_controls(policy_id))
        return controls

    def resolve_controls(self, controls_manager):
        for policy_id, controls_ids in self.controls_by_policy.items():
            controls = self._process_controls_ids_into_controls(
                controls_manager, policy_id, controls_ids)

            for c in controls:
                self._merge_control(c)


547 2
class Value(object):
548
    """Represents XCCDF Value
549
    """
550
551 2
    def __init__(self, id_):
552 2
        self.id_ = id_
553 2
        self.title = ""
554 2
        self.description = ""
555 2
        self.type_ = "string"
556 2
        self.operator = "equals"
557 2
        self.interactive = False
558 2
        self.options = {}
559 2
        self.warnings = []
560
561 2
    @staticmethod
562 2
    def from_yaml(yaml_file, env_yaml=None):
563 2
        yaml_contents = open_and_macro_expand(yaml_file, env_yaml)
564 2
        if yaml_contents is None:
565
            return None
566
567 2
        value_id, _ = os.path.splitext(os.path.basename(yaml_file))
568 2
        value = Value(value_id)
569 2
        value.title = required_key(yaml_contents, "title")
570 2
        del yaml_contents["title"]
571 2
        value.description = required_key(yaml_contents, "description")
572 2
        del yaml_contents["description"]
573 2
        value.type_ = required_key(yaml_contents, "type")
574 2
        del yaml_contents["type"]
575 2
        value.operator = yaml_contents.pop("operator", "equals")
576 2
        possible_operators = ["equals", "not equal", "greater than",
577
                              "less than", "greater than or equal",
578
                              "less than or equal", "pattern match"]
579
580 2
        if value.operator not in possible_operators:
581
            raise ValueError(
582
                "Found an invalid operator value '%s' in '%s'. "
583
                "Expected one of: %s"
584
                % (value.operator, yaml_file, ", ".join(possible_operators))
585
            )
586
587 2
        value.interactive = \
588
            yaml_contents.pop("interactive", "false").lower() == "true"
589
590 2
        value.options = required_key(yaml_contents, "options")
591 2
        del yaml_contents["options"]
592 2
        value.warnings = yaml_contents.pop("warnings", [])
593
594 2
        for warning_list in value.warnings:
595
            if len(warning_list) != 1:
596
                raise ValueError("Only one key/value pair should exist for each dictionary")
597
598 2
        if yaml_contents:
599
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
600
                               % (yaml_file, yaml_contents))
601
602 2
        return value

    def to_xml_element(self):
        value = ET.Element('Value')
        value.set('id', self.id_)
        value.set('type', self.type_)
        if self.operator != "equals":  # equals is the default
            value.set('operator', self.operator)
        if self.interactive:  # False is the default
            value.set('interactive', 'true')
        title = ET.SubElement(value, 'title')
        title.text = self.title
        add_sub_element(value, 'description', self.description)
        add_warning_elements(value, self.warnings)

        for selector, option in self.options.items():
            # Do not confuse <Value> (capital V) with <value> (lowercase v):
            # <value> is a child element of <Value>.
            value_small = ET.SubElement(value, 'value')
            # Per the XCCDF spec, the default value is the <value> without a selector.
            if selector != "default":
                value_small.set('selector', str(selector))
            value_small.text = str(option)

        return value

    def to_file(self, file_name):
        root = self.to_xml_element()
        tree = ET.ElementTree(root)
        tree.write(file_name)
633
634 2
class Benchmark(object):
635
    """Represents XCCDF Benchmark
636
    """
637 2
    def __init__(self, id_):
638
        self.id_ = id_
639
        self.title = ""
640
        self.status = ""
641
        self.description = ""
642
        self.notice_id = ""
643
        self.notice_description = ""
644
        self.front_matter = ""
645
        self.rear_matter = ""
646
        self.cpes = []
647
        self.version = "0.1"
648
        self.profiles = []
649
        self.values = {}
650
        self.bash_remediation_fns_group = None
651
        self.groups = {}
652
        self.rules = {}
653
        self.product_cpe_names = []
654
655
        # This is required for OCIL clauses
656
        conditional_clause = Value("conditional_clause")
657
        conditional_clause.title = "A conditional clause for check statements."
658
        conditional_clause.description = conditional_clause.title
659
        conditional_clause.type_ = "string"
660
        conditional_clause.options = {"": "This is a placeholder"}
661
662
        self.add_value(conditional_clause)

    @classmethod
    def from_yaml(cls, yaml_file, id_, env_yaml=None):
        yaml_contents = open_and_macro_expand(yaml_file, env_yaml)
        if yaml_contents is None:
            return None

        benchmark = cls(id_)
        benchmark.title = required_key(yaml_contents, "title")
        del yaml_contents["title"]
        benchmark.status = required_key(yaml_contents, "status")
        del yaml_contents["status"]
        benchmark.description = required_key(yaml_contents, "description")
        del yaml_contents["description"]
        notice_contents = required_key(yaml_contents, "notice")
        benchmark.notice_id = required_key(notice_contents, "id")
        del notice_contents["id"]
        benchmark.notice_description = required_key(notice_contents,
                                                    "description")
        del notice_contents["description"]
        if not notice_contents:
            del yaml_contents["notice"]

        benchmark.front_matter = required_key(yaml_contents,
                                              "front-matter")
        del yaml_contents["front-matter"]
        benchmark.rear_matter = required_key(yaml_contents,
                                             "rear-matter")
        del yaml_contents["rear-matter"]
        benchmark.version = str(required_key(yaml_contents, "version"))
        del yaml_contents["version"]

        if env_yaml:
            benchmark.product_cpe_names = env_yaml["product_cpes"].get_product_cpe_names()

        if yaml_contents:
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                               % (yaml_file, yaml_contents))

        return benchmark

    def add_profiles_from_dir(self, dir_, env_yaml):
        for dir_item in sorted(os.listdir(dir_)):
            dir_item_path = os.path.join(dir_, dir_item)
            if not os.path.isfile(dir_item_path):
                continue

            _, ext = os.path.splitext(os.path.basename(dir_item_path))
            if ext != '.profile':
                sys.stderr.write(
                    "Encountered file '%s' while looking for profiles, "
                    "extension '%s' is unknown. Skipping..\n"
                    % (dir_item, ext)
                )
                continue

            try:
                new_profile = ProfileWithInlinePolicies.from_yaml(dir_item_path, env_yaml)
            except DocumentationNotComplete:
                continue
            except Exception as exc:
                msg = ("Error building profile from '{fname}': '{error}'"
                       .format(fname=dir_item_path, error=str(exc)))
                raise RuntimeError(msg)
            if new_profile is None:
                continue

            self.profiles.append(new_profile)

    def add_bash_remediation_fns_from_file(self, file_):
        if not file_:
            # bash-remediation-functions.xml doesn't exist
            return

        tree = ET.parse(file_)
        self.bash_remediation_fns_group = tree.getroot()

    def to_xml_element(self):
        root = ET.Element('Benchmark')
        root.set('xmlns:xsi', 'http://www.w3.org/2001/XMLSchema-instance')
        root.set('xmlns:xhtml', 'http://www.w3.org/1999/xhtml')
        root.set('xmlns:dc', 'http://purl.org/dc/elements/1.1/')
        root.set('id', 'product-name')
        root.set('xsi:schemaLocation',
                 'http://checklists.nist.gov/xccdf/1.1 xccdf-1.1.4.xsd')
        root.set('style', 'SCAP_1.1')
        root.set('resolved', 'false')
        root.set('xml:lang', 'en-US')
        status = ET.SubElement(root, 'status')
        status.set('date', datetime.date.today().strftime("%Y-%m-%d"))
        status.text = self.status
        add_sub_element(root, "title", self.title)
        add_sub_element(root, "description", self.description)
        notice = add_sub_element(root, "notice", self.notice_description)
        notice.set('id', self.notice_id)
        add_sub_element(root, "front-matter", self.front_matter)
        add_sub_element(root, "rear-matter", self.rear_matter)

        # The Benchmark applicability is determined by the CPEs
        # defined in the product.yml
        for cpe_name in self.product_cpe_names:
            plat = ET.SubElement(root, "platform")
            plat.set("idref", cpe_name)

        version = ET.SubElement(root, 'version')
        version.text = self.version
        ET.SubElement(root, "metadata")

        for profile in self.profiles:
            root.append(profile.to_xml_element())

        for value in self.values.values():
            root.append(value.to_xml_element())
        if self.bash_remediation_fns_group is not None:
            root.append(self.bash_remediation_fns_group)

        groups_in_bench = list(self.groups.keys())
        priority_order = ["system", "services"]
        groups_in_bench = reorder_according_to_ordering(groups_in_bench, priority_order)

        # Make system group the first, followed by services group
        for group_id in groups_in_bench:
            group = self.groups.get(group_id)
            # Products using application benchmark don't have system or services group
            if group is not None:
                root.append(group.to_xml_element())

        for rule in self.rules.values():
            root.append(rule.to_xml_element())

        return root

    def to_file(self, file_name):
        root = self.to_xml_element()
        tree = ET.ElementTree(root)
        tree.write(file_name)

    def add_value(self, value):
        if value is None:
            return
        self.values[value.id_] = value

    # The benchmark is also considered a group, so this function signature needs to match
    # Group()'s add_group()
    def add_group(self, group, env_yaml=None):
        if group is None:
            return
        self.groups[group.id_] = group

    def add_rule(self, rule):
        if rule is None:
            return
        self.rules[rule.id_] = rule

    def to_xccdf(self):
        """We can easily extend this script to generate a valid XCCDF instead
        of SSG SHORTHAND.
        """
        raise NotImplementedError

    def __str__(self):
        return self.id_


class Group(object):
    """Represents XCCDF Group
    """
    ATTRIBUTES_TO_PASS_ON = (
        "platforms",
    )

    def __init__(self, id_):
        self.id_ = id_
        self.prodtype = "all"
        self.title = ""
        self.description = ""
        self.warnings = []
        self.requires = []
        self.conflicts = []
        self.values = {}
        self.groups = {}
        self.rules = {}
        # self.platforms is used further in the build system
        # self.platform is merged into self.platforms
        # it is here for backward compatibility
        self.platforms = set()
        self.cpe_names = set()
        self.platform = None

    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None):
        yaml_contents = open_and_macro_expand(yaml_file, env_yaml)
        if yaml_contents is None:
            return None

        group_id = os.path.basename(os.path.dirname(yaml_file))
        group = cls(group_id)
        group.prodtype = yaml_contents.pop("prodtype", "all")
        group.title = required_key(yaml_contents, "title")
        del yaml_contents["title"]
        group.description = required_key(yaml_contents, "description")
        del yaml_contents["description"]
        group.warnings = yaml_contents.pop("warnings", [])
        group.conflicts = yaml_contents.pop("conflicts", [])
        group.requires = yaml_contents.pop("requires", [])
        group.platform = yaml_contents.pop("platform", None)
        group.platforms = yaml_contents.pop("platforms", set())
        # ensure that content of group.platform is in group.platforms as
        # well
        if group.platform is not None:
            group.platforms.add(group.platform)

        if env_yaml:
            for platform in group.platforms:
                try:
                    group.cpe_names.add(env_yaml["product_cpes"].get_cpe_name(platform))
                except CPEDoesNotExist:
                    print("Unsupported platform '%s' in group '%s'." % (platform, group.id_))
                    raise

        for warning_list in group.warnings:
            if len(warning_list) != 1:
                raise ValueError("Only one key/value pair should exist for each dictionary")

        if yaml_contents:
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                               % (yaml_file, yaml_contents))
        group.validate_prodtype(yaml_file)
        return group

    def validate_prodtype(self, yaml_file):
        for ptype in self.prodtype.split(","):
            if ptype.strip() != ptype:
                msg = (
                    "Comma-separated '{prodtype}' prodtype "
                    "in {yaml_file} contains whitespace."
                    .format(prodtype=self.prodtype, yaml_file=yaml_file))
                raise ValueError(msg)

    def to_xml_element(self):
        group = ET.Element('Group')
        group.set('id', self.id_)
        if self.prodtype != "all":
            group.set("prodtype", self.prodtype)
        title = ET.SubElement(group, 'title')
        title.text = self.title
        add_sub_element(group, 'description', self.description)
        add_warning_elements(group, self.warnings)
        add_nondata_subelements(group, "requires", "id", self.requires)
        add_nondata_subelements(group, "conflicts", "id", self.conflicts)

        for cpe_name in self.cpe_names:
            platform_el = ET.SubElement(group, "platform")
            platform_el.set("idref", cpe_name)

        for _value in self.values.values():
            group.append(_value.to_xml_element())

        # Rules that install or remove packages affect the remediation
        # of other rules.
        # When package installed/removed rules come first, the rules are
        # ordered more logically and remediation proceeds naturally:
        # the package is installed first, then configured.
        rules_in_group = list(self.rules.keys())
        regex = r'(package_.*_(installed|removed))|(service_.*_(enabled|disabled))$'
        priority_order = ["installed", "removed", "enabled", "disabled"]
        rules_in_group = reorder_according_to_ordering(rules_in_group, priority_order, regex)

        # Add rules in priority order: first all packages installed, then removed,
        # followed by services enabled, then disabled
        for rule_id in rules_in_group:
            group.append(self.rules.get(rule_id).to_xml_element())

        # Add the subgroups after any rules of the current group level.
        # As package installed/removed and service enabled/disabled rules are usually in
        # the top-level group, this ensures that groups that further configure a package
        # or service come after the rules that install or remove it.
        groups_in_group = list(self.groups.keys())
        priority_order = [
            # Make sure rpm_verify_(hashes|permissions|ownership) are run before any other rule.
            # Due to conflicts between rpm_verify_* rules and any rule that configures
            # stricter settings, like file_permissions_grub2_cfg and sudo_dedicated_group,
            # the rules deviating from the system default should be evaluated later.
            # So that in the end the system has contents, permissions and ownership reset, and
            # any deviations or stricter settings are applied by the rules in the profile.
            "software", "integrity", "integrity-software", "rpm_verification",

            # The accounts group has to precede the auditing group because
            # the rule package_screen_installed should be executed before the rule
            # audit_rules_privileged_commands; otherwise the latter rule
            # does not catch the newly installed screen binary during remediation
            # and reports a failure
            "accounts", "auditing",


            # The FIPS group should come before Crypto,
            # if we want to set a different (stricter) Crypto Policy than FIPS.
            "fips", "crypto",

            # The firewalld_activation must come before ruleset_modifications, otherwise
            # remediations for ruleset_modifications won't work
            "firewalld_activation", "ruleset_modifications",

            # Rules from group disabling_ipv6 must precede rules from configuring_ipv6,
            # otherwise the remediation prints an error although it is successful
            "disabling_ipv6", "configuring_ipv6"
        ]
        groups_in_group = reorder_according_to_ordering(groups_in_group, priority_order)
        for group_id in groups_in_group:
            _group = self.groups[group_id]
            group.append(_group.to_xml_element())

        return group

    def to_file(self, file_name):
        root = self.to_xml_element()
        tree = ET.ElementTree(root)
        tree.write(file_name)

    def add_value(self, value):
        if value is None:
            return
        self.values[value.id_] = value

    def add_group(self, group, env_yaml=None):
        if group is None:
            return
        if self.platforms and not group.platforms:
            group.platforms = self.platforms
        self.groups[group.id_] = group
        self._pass_our_properties_on_to(group)

        # Once the group has inherited properties, update cpe_names
        if env_yaml:
            for platform in group.platforms:
                try:
                    group.cpe_names.add(env_yaml["product_cpes"].get_cpe_name(platform))
                except CPEDoesNotExist:
                    print("Unsupported platform '%s' in group '%s'." % (platform, self.id_))
                    raise

    def _pass_our_properties_on_to(self, obj):
        for attr in self.ATTRIBUTES_TO_PASS_ON:
            if hasattr(obj, attr) and getattr(obj, attr) is None:
                setattr(obj, attr, getattr(self, attr))

    def add_rule(self, rule):
        if rule is None:
            return
        if self.platforms and not rule.platforms:
            rule.platforms = self.platforms
        self.rules[rule.id_] = rule
        self._pass_our_properties_on_to(rule)

    def __str__(self):
        return self.id_
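

# Illustrative sketch, not part of ssg.build_yaml: the priority ordering that
# Group.to_xml_element() relies on could work roughly like this. The function
# `reorder_by_priority` is a hypothetical stand-in; the real
# reorder_according_to_ordering() is defined elsewhere in the module, also
# accepts an optional regex, and may behave differently.

```python
def reorder_by_priority(items, priority):
    """Return items with those named in `priority` first, in priority order.

    Items not mentioned in `priority` keep their original relative order
    and are placed after the prioritized ones.
    """
    items = list(items)
    first = [name for name in priority if name in items]
    rest = [name for name in items if name not in priority]
    return first + rest


ordered = reorder_by_priority(
    ["crypto", "banners", "fips", "accounts"],
    ["accounts", "auditing", "fips", "crypto"])
```

# Priority entries missing from the input (here "auditing") are skipped, so
# the same priority list can serve groups that lack some subgroups.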


class Rule(object):
    """Represents XCCDF Rule
    """
    YAML_KEYS_DEFAULTS = {
        "prodtype": lambda: "all",
        "title": lambda: RuntimeError("Missing key 'title'"),
        "description": lambda: RuntimeError("Missing key 'description'"),
        "rationale": lambda: RuntimeError("Missing key 'rationale'"),
        "severity": lambda: RuntimeError("Missing key 'severity'"),
        "references": lambda: dict(),
        "identifiers": lambda: dict(),
        "ocil_clause": lambda: None,
        "ocil": lambda: None,
        "oval_external_content": lambda: None,
        "warnings": lambda: list(),
        "conflicts": lambda: list(),
        "requires": lambda: list(),
        "platform": lambda: None,
        "platforms": lambda: set(),
        "inherited_platforms": lambda: list(),
        "template": lambda: None,
        "definition_location": lambda: None,
    }

    PRODUCT_REFERENCES = ("stigid", "cis",)
    GLOBAL_REFERENCES = ("srg", "vmmsrg", "disa", "cis-csc",)

    def __init__(self, id_):
        self.id_ = id_
        self.prodtype = "all"
        self.title = ""
        self.description = ""
        self.definition_location = ""
        self.rationale = ""
        self.severity = "unknown"
        self.references = {}
        self.identifiers = {}
        self.ocil_clause = None
        self.ocil = None
        self.oval_external_content = None
        self.warnings = []
        self.requires = []
        self.conflicts = []
        # self.platforms is used further in the build system
        # self.platform is merged into self.platforms
        # it is here for backward compatibility
        self.platform = None
        self.platforms = set()
        self.cpe_names = set()
        self.inherited_platforms = []  # platforms inherited from the group
        self.template = None

    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None):
        yaml_file = os.path.normpath(yaml_file)

        yaml_contents = open_and_macro_expand(yaml_file, env_yaml)
        if yaml_contents is None:
            return None

        rule_id, ext = os.path.splitext(os.path.basename(yaml_file))
        if rule_id == "rule" and ext == ".yml":
            rule_id = get_rule_dir_id(yaml_file)

        rule = cls(rule_id)

        try:
            rule._set_attributes_from_dict(yaml_contents)
        except RuntimeError as exc:
            msg = ("Error processing '{fname}': {err}"
                   .format(fname=yaml_file, err=str(exc)))
            raise RuntimeError(msg)

        for warning_list in rule.warnings:
            if len(warning_list) != 1:
                raise ValueError("Only one key/value pair should exist for each dictionary")

        # ensure that content of rule.platform is in rule.platforms as
        # well
        if rule.platform is not None:
            rule.platforms.add(rule.platform)

        # Convert the platform names to CPE names, but only if an env_yaml was
        # specified (otherwise there would be no product CPEs to look up) and
        # the rule's prodtype matches the product being built
        if env_yaml and env_yaml["product"] in parse_prodtype(rule.prodtype):
            for platform in rule.platforms:
                try:
                    rule.cpe_names.add(env_yaml["product_cpes"].get_cpe_name(platform))
                except CPEDoesNotExist:
                    print("Unsupported platform '%s' in rule '%s'." % (platform, rule.id_))
                    raise

        if yaml_contents:
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                               % (yaml_file, yaml_contents))

        if not rule.definition_location:
            rule.definition_location = yaml_file

        rule.validate_prodtype(yaml_file)
        rule.validate_identifiers(yaml_file)
        rule.validate_references(yaml_file)
        return rule

    def _verify_stigid_format(self, product):
        stig_id = self.references.get("stigid", None)
        if not stig_id:
            return
        if "," in stig_id:
            raise ValueError("Rules can not have multiple STIG IDs.")

    def _verify_disa_cci_format(self):
        cci_id = self.references.get("disa", None)
        if not cci_id:
            return
        cci_ex = re.compile(r'^CCI-[0-9]{6}$')
        for cci in cci_id.split(","):
            if not cci_ex.match(cci):
                raise ValueError("CCI '{}' is in the wrong format! "
                                 "Format should be similar to: "
                                 "CCI-XXXXXX".format(cci))
        self.references["disa"] = cci_id

    def normalize(self, product):
        try:
            self.make_refs_and_identifiers_product_specific(product)
            self.make_template_product_specific(product)
        except Exception as exc:
            msg = (
                "Error normalizing '{rule}': {msg}"
                .format(rule=self.id_, msg=str(exc))
            )
            raise RuntimeError(msg)

    def _get_product_only_references(self):
        product_references = dict()

        for ref in Rule.PRODUCT_REFERENCES:
            start = "{0}@".format(ref)
            for gref, gval in self.references.items():
                if ref == gref or gref.startswith(start):
                    product_references[gref] = gval
        return product_references

    def make_template_product_specific(self, product):
        product_suffix = "@{0}".format(product)

        if not self.template:
            return

        not_specific_vars = self.template.get("vars", dict())
        specific_vars = self._make_items_product_specific(
            not_specific_vars, product_suffix, True)
        self.template["vars"] = specific_vars

        not_specific_backends = self.template.get("backends", dict())
        specific_backends = self._make_items_product_specific(
            not_specific_backends, product_suffix, True)
        self.template["backends"] = specific_backends

    def make_refs_and_identifiers_product_specific(self, product):
        product_suffix = "@{0}".format(product)

        product_references = self._get_product_only_references()
        general_references = self.references.copy()
        for todel in product_references:
            general_references.pop(todel)
        for ref in Rule.PRODUCT_REFERENCES:
            if ref in general_references:
                msg = "Unexpected reference identifier ({0}) without "
                msg += "product qualifier ({0}@{1}) while building rule "
                msg += "{2}"
                msg = msg.format(ref, product, self.id_)
                raise ValueError(msg)

        to_set = dict(
            identifiers=(self.identifiers, False),
            general_references=(general_references, True),
            product_references=(product_references, False),
        )
        for name, (dic, allow_overwrites) in to_set.items():
            try:
                new_items = self._make_items_product_specific(
                    dic, product_suffix, allow_overwrites)
            except ValueError as exc:
                msg = (
                    "Error processing {what} for rule '{rid}': {msg}"
                    .format(what=name, rid=self.id_, msg=str(exc))
                )
                raise ValueError(msg)
            dic.clear()
            dic.update(new_items)

        self.references = general_references
        self._verify_disa_cci_format()
        self.references.update(product_references)

        self._verify_stigid_format(product)

    def _make_items_product_specific(self, items_dict, product_suffix, allow_overwrites=False):
        new_items = dict()
        for full_label, value in items_dict.items():
            if "@" not in full_label and full_label not in new_items:
                new_items[full_label] = value
                continue

            label = full_label.split("@")[0]

            # this test should occur before matching product_suffix with the product qualifier
            # present in the reference, so it catches problems even for products that are not
            # being built at the moment
            if label in Rule.GLOBAL_REFERENCES:
                msg = (
                    "You cannot use product-qualified for the '{item_u}' reference. "
                    "Please remove the product-qualifier and merge values with the "
                    "existing reference if there is any. Original line: {item_q}: {value_q}"
                    .format(item_u=label, item_q=full_label, value_q=value)
                )
                raise ValueError(msg)

            if not full_label.endswith(product_suffix):
                continue

            if label in items_dict and not allow_overwrites and value != items_dict[label]:
                msg = (
                    "There is a product-qualified '{item_q}' item, "
                    "but also an unqualified '{item_u}' item "
                    "and those two differ in value - "
                    "'{value_q}' vs '{value_u}' respectively."
                    .format(item_q=full_label, item_u=label,
                            value_q=value, value_u=items_dict[label])
                )
                raise ValueError(msg)
            new_items[label] = value
        return new_items

    def _set_attributes_from_dict(self, yaml_contents):
        for key, default_getter in self.YAML_KEYS_DEFAULTS.items():
            if key not in yaml_contents:
                value = default_getter()
                if isinstance(value, Exception):
                    raise value
            else:
                value = yaml_contents.pop(key)

            setattr(self, key, value)

    def to_contents_dict(self):
        """
        Returns a dictionary that is the same schema as the dict obtained when loading rule YAML.
        """

        yaml_contents = dict()
        for key in Rule.YAML_KEYS_DEFAULTS:
            yaml_contents[key] = getattr(self, key)

        return yaml_contents

    def validate_identifiers(self, yaml_file):
        if self.identifiers is None:
            raise ValueError("Empty identifier section in file %s" % yaml_file)

        # Validate all identifiers are non-empty:
        for ident_type, ident_val in self.identifiers.items():
            if not isinstance(ident_type, str) or not isinstance(ident_val, str):
                raise ValueError("Identifiers and values must be strings: %s in file %s"
                                 % (ident_type, yaml_file))
            if ident_val.strip() == "":
                raise ValueError("Identifiers must not be empty: %s in file %s"
                                 % (ident_type, yaml_file))
            if ident_type[0:3] == 'cce':
                if not is_cce_format_valid(ident_val):
                    raise ValueError("CCE Identifier format must be valid: invalid format '%s' for CCE '%s'"
                                     " in file '%s'" % (ident_val, ident_type, yaml_file))
                if not is_cce_value_valid("CCE-" + ident_val):
                    raise ValueError("CCE Identifier value is not a valid checksum: invalid value '%s' for CCE '%s'"
                                     " in file '%s'" % (ident_val, ident_type, yaml_file))

    def validate_references(self, yaml_file):
        if self.references is None:
            raise ValueError("Empty references section in file %s" % yaml_file)

        for ref_type, ref_val in self.references.items():
            if not isinstance(ref_type, str) or not isinstance(ref_val, str):
                raise ValueError("References and values must be strings: %s in file %s"
                                 % (ref_type, yaml_file))
            if ref_val.strip() == "":
                raise ValueError("References must not be empty: %s in file %s"
                                 % (ref_type, yaml_file))

        for ref_type, ref_val in self.references.items():
            for ref in ref_val.split(","):
                if ref.strip() != ref:
                    msg = (
                        "Comma-separated '{ref_type}' reference "
                        "in {yaml_file} contains whitespace."
                        .format(ref_type=ref_type, yaml_file=yaml_file))
                    raise ValueError(msg)

    def validate_prodtype(self, yaml_file):
        for ptype in self.prodtype.split(","):
            if ptype.strip() != ptype:
                msg = (
                    "Comma-separated '{prodtype}' prodtype "
                    "in {yaml_file} contains whitespace."
                    .format(prodtype=self.prodtype, yaml_file=yaml_file))
                raise ValueError(msg)
1330
    def to_xml_element(self):
        rule = ET.Element('Rule')
        rule.set('id', self.id_)
        if self.prodtype != "all":
            rule.set("prodtype", self.prodtype)
        rule.set('severity', self.severity)
        add_sub_element(rule, 'title', self.title)
        add_sub_element(rule, 'description', self.description)
        add_sub_element(rule, 'rationale', self.rationale)

        main_ident = ET.Element('ident')
        for ident_type, ident_val in self.identifiers.items():
            # This is not true if items were normalized
            if '@' in ident_type:
                # the identifier applies only to some products
                # format: 'policy@product', e.g. 'stigid@product'
                # for these, we create a separate <ident> element
                policy, product = ident_type.split('@')
                ident = ET.SubElement(rule, 'ident')
                ident.set(policy, ident_val)
                ident.set('prodtype', product)
            else:
                main_ident.set(ident_type, ident_val)

        if main_ident.attrib:
            rule.append(main_ident)

        main_ref = ET.Element('ref')
        for ref_type, ref_val in self.references.items():
            # This is not true if items were normalized
            if '@' in ref_type:
                # the reference applies only to some products
                # format: 'policy@product', e.g. 'stigid@product'
                # for these, we create a separate <ref> element
                policy, product = ref_type.split('@')
                ref = ET.SubElement(rule, 'ref')
                ref.set(policy, ref_val)
                ref.set('prodtype', product)
            else:
                main_ref.set(ref_type, ref_val)

        if main_ref.attrib:
            rule.append(main_ref)

        if self.oval_external_content:
            check = ET.SubElement(rule, 'check')
            check.set("system", "http://oval.mitre.org/XMLSchema/oval-definitions-5")
            external_content = ET.SubElement(check, "check-content-ref")
            external_content.set("href", self.oval_external_content)
        else:
            # TODO: This is pretty much a hack, oval ID will be the same as rule ID
            #       and we don't want the developers to have to keep them in sync.
            #       Therefore let's just add an OVAL ref of that ID.
            oval_ref = ET.SubElement(rule, "oval")
            oval_ref.set("id", self.id_)

        if self.ocil or self.ocil_clause:
            ocil = add_sub_element(rule, 'ocil', self.ocil if self.ocil else "")
            if self.ocil_clause:
                ocil.set("clause", self.ocil_clause)

        add_warning_elements(rule, self.warnings)
        add_nondata_subelements(rule, "requires", "id", self.requires)
        add_nondata_subelements(rule, "conflicts", "id", self.conflicts)

        for cpe_name in self.cpe_names:
            platform_el = ET.SubElement(rule, "platform")
            platform_el.set("idref", cpe_name)

        return rule

    def to_file(self, file_name):
        root = self.to_xml_element()
        tree = ET.ElementTree(root)
        tree.write(file_name)


class DirectoryLoader(object):
    def __init__(self, profiles_dir, bash_remediation_fns, env_yaml):
        self.benchmark_file = None
        self.group_file = None
        self.loaded_group = None
        self.rule_files = []
        self.value_files = []
        self.subdirectories = []

        self.all_values = set()
        self.all_rules = set()
        self.all_groups = set()

        self.profiles_dir = profiles_dir
        self.bash_remediation_fns = bash_remediation_fns
        self.env_yaml = env_yaml
        self.product = env_yaml["product"]

        self.parent_group = None

    def _collect_items_to_load(self, guide_directory):
        for dir_item in sorted(os.listdir(guide_directory)):
            dir_item_path = os.path.join(guide_directory, dir_item)
            _, extension = os.path.splitext(dir_item)

            if extension == '.var':
                self.value_files.append(dir_item_path)
            elif dir_item == "benchmark.yml":
                if self.benchmark_file:
                    raise ValueError("Multiple benchmarks in one directory")
                self.benchmark_file = dir_item_path
            elif dir_item == "group.yml":
                if self.group_file:
                    raise ValueError("Multiple groups in one directory")
                self.group_file = dir_item_path
            elif extension == '.rule':
                self.rule_files.append(dir_item_path)
            elif is_rule_dir(dir_item_path):
                self.rule_files.append(get_rule_dir_yaml(dir_item_path))
            elif dir_item != "tests":
                if os.path.isdir(dir_item_path):
                    self.subdirectories.append(dir_item_path)
                else:
                    sys.stderr.write(
                        "Encountered file '%s' while recursing, extension '%s' "
                        "is unknown. Skipping..\n"
                        % (dir_item, extension)
                    )

    def load_benchmark_or_group(self, guide_directory):
        """
        Loads a given benchmark or group from the specified benchmark_file or
        group_file, in the context of guide_directory, profiles_dir,
        env_yaml, and bash_remediation_fns.

        Returns the loaded group or benchmark.
        """
        group = None
        if self.group_file and self.benchmark_file:
            raise ValueError("A benchmark.yml file and a group.yml file were found in "
                             "the same directory '%s'" % (guide_directory))

        # we treat benchmark as a special form of group in the following code
        if self.benchmark_file:
            group = Benchmark.from_yaml(
                self.benchmark_file, 'product-name', self.env_yaml
            )
            if self.profiles_dir:
                group.add_profiles_from_dir(self.profiles_dir, self.env_yaml)
            group.add_bash_remediation_fns_from_file(self.bash_remediation_fns)

        if self.group_file:
            group = Group.from_yaml(self.group_file, self.env_yaml)
            self.all_groups.add(group.id_)

        return group

    def _load_group_process_and_recurse(self, guide_directory):
        self.loaded_group = self.load_benchmark_or_group(guide_directory)

        if self.loaded_group:
            if self.parent_group:
                self.parent_group.add_group(self.loaded_group, env_yaml=self.env_yaml)

            self._process_values()
            self._recurse_into_subdirs()
            self._process_rules()

    def process_directory_tree(self, start_dir, extra_group_dirs=None):
        self._collect_items_to_load(start_dir)
        if extra_group_dirs is not None:
            self.subdirectories += extra_group_dirs
        self._load_group_process_and_recurse(start_dir)

    def _recurse_into_subdirs(self):
        for subdir in self.subdirectories:
            loader = self._get_new_loader()
            loader.parent_group = self.loaded_group
            loader.process_directory_tree(subdir)
            self.all_values.update(loader.all_values)
            self.all_rules.update(loader.all_rules)
            self.all_groups.update(loader.all_groups)

    # The three hooks below must be implemented by subclasses.
    def _get_new_loader(self):
        raise NotImplementedError()

    def _process_values(self):
        raise NotImplementedError()

    def _process_rules(self):
        raise NotImplementedError()


class BuildLoader(DirectoryLoader):
    def __init__(self, profiles_dir, bash_remediation_fns, env_yaml, resolved_rules_dir=None):
        super(BuildLoader, self).__init__(profiles_dir, bash_remediation_fns, env_yaml)

        self.resolved_rules_dir = resolved_rules_dir
        if resolved_rules_dir and not os.path.isdir(resolved_rules_dir):
            os.mkdir(resolved_rules_dir)

    def _process_values(self):
        for value_yaml in self.value_files:
            value = Value.from_yaml(value_yaml, self.env_yaml)
            self.all_values.add(value)
            self.loaded_group.add_value(value)

    def _process_rules(self):
        for rule_yaml in self.rule_files:
            try:
                rule = Rule.from_yaml(rule_yaml, self.env_yaml)
            except DocumentationNotComplete:
                # Happens on non-debug build when a rule is "documentation-incomplete"
                continue
            prodtypes = parse_prodtype(rule.prodtype)
            if "all" not in prodtypes and self.product not in prodtypes:
                continue
            self.all_rules.add(rule)
            self.loaded_group.add_rule(rule)

            if self.loaded_group.platforms:
                rule.inherited_platforms += self.loaded_group.platforms

            if self.resolved_rules_dir:
                output_for_rule = os.path.join(
                    self.resolved_rules_dir, "{id_}.yml".format(id_=rule.id_))
                mkdir_p(self.resolved_rules_dir)
                with open(output_for_rule, "w") as f:
                    rule.normalize(self.env_yaml["product"])
                    yaml.dump(rule.to_contents_dict(), f)

    def _get_new_loader(self):
        return BuildLoader(
            self.profiles_dir, self.bash_remediation_fns, self.env_yaml, self.resolved_rules_dir)

    def export_group_to_file(self, filename):
        return self.loaded_group.to_file(filename)