Passed
Push — master ( 2e0e1e...e323ea )
by Marek
01:51 queued 13s
created

ssg.build_yaml   F

Complexity

Total Complexity 208

Size/Duplication

Total Lines 982
Duplicated Lines 0 %

Test Coverage

Coverage 30.84%

Importance

Changes 0
Metric Value
eloc 728
dl 0
loc 982
ccs 206
cts 668
cp 0.3084
rs 1.872
c 0
b 0
f 0
wmc 208

62 Methods

Rating   Name   Duplication   Size   Complexity  
A Profile.from_yaml() 0 23 3
A Profile.__init__() 0 8 1
A Profile.get_variable_selectors() 0 2 1
A Profile.__sub__() 0 10 1
B Profile.dump_yaml() 0 18 6
A Profile.get_rule_selectors() 0 2 1
A Benchmark.__init__() 0 25 1
B Profile.to_xml_element() 0 29 5
A Value.to_file() 0 4 1
B Benchmark.from_yaml() 0 39 5
B Value.from_yaml() 0 42 6
A Profile._parse_selections() 0 9 4
A Value.to_xml_element() 0 23 5
B Profile.validate_variables() 0 31 5
A Value.__init__() 0 9 1
A Group.add_value() 0 4 2
A BuildLoader.__init__() 0 8 3
B Group.to_xml_element() 0 26 7
A Rule.to_file() 0 4 1
A Group.validate_prodtype() 0 8 3
F Rule.to_xml_element() 0 70 15
A ListInputsLoader.load_benchmark_or_group() 0 10 3
A ListInputsLoader._process_rules() 0 3 2
A Group.add_rule() 0 7 4
B BuildLoader._process_rules() 0 13 6
A DirectoryLoader._recurse_into_subdirs() 0 6 2
C Rule.validate_identifiers() 0 19 9
A Rule._set_attributes_from_dict() 0 10 4
A DirectoryLoader._process_values() 0 2 1
A Benchmark.to_file() 0 4 1
A Benchmark.add_value() 0 4 2
B Rule.from_yaml() 0 33 8
B Group.from_yaml() 0 25 5
A Benchmark.add_rule() 0 4 2
A DirectoryLoader.__init__() 0 16 1
A BuildLoader._get_new_loader() 0 3 1
A Group.to_file() 0 4 1
A Group._pass_our_properties_on_to() 0 4 4
B DirectoryLoader.load_benchmark_or_group() 0 26 6
A Rule.validate_prodtype() 0 8 3
C DirectoryLoader._collect_items_to_load() 0 27 11
A DirectoryLoader._get_new_loader() 0 2 1
C Rule.validate_references() 0 20 9
B Benchmark.add_profiles_from_dir() 0 25 7
A BuildLoader.export_group_to_file() 0 2 1
A Benchmark.to_xccdf() 0 5 1
A Benchmark.add_bash_remediation_fns_from_file() 0 6 2
A Group.__init__() 0 10 1
A DirectoryLoader.process_directory_tree() 0 3 1
A BuildLoader._process_values() 0 5 2
A ListInputsLoader.__init__() 0 4 1
A Group.add_group() 0 7 4
A Benchmark.add_group() 0 4 2
A Benchmark.__str__() 0 2 1
A DirectoryLoader._load_group_process_and_recurse() 0 10 3
A DirectoryLoader._process_rules() 0 2 1
A Group.__str__() 0 2 1
A Rule.__init__() 0 14 1
A Rule.to_contents_dict() 0 10 2
A ListInputsLoader._process_values() 0 3 2
A ListInputsLoader._get_new_loader() 0 3 1
B Benchmark.to_xml_element() 0 42 7

2 Functions

Rating   Name   Duplication   Size   Complexity  
A add_warning_elements() 0 15 2
A add_sub_element() 0 28 2

How to fix   Complexity   

Complexity

Complex classes like ssg.build_yaml often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 2
from __future__ import absolute_import
2 2
from __future__ import print_function
3
4 2
import os
5 2
import os.path
6 2
import datetime
7 2
import sys
8
9 2
import yaml
10
11 2
from .constants import XCCDF_PLATFORM_TO_CPE
12 2
from .constants import PRODUCT_TO_CPE_MAPPING
13 2
from .rules import get_rule_dir_id, get_rule_dir_yaml, is_rule_dir
14 2
from .rule_yaml import parse_prodtype
15
16 2
from .checks import is_cce_format_valid, is_cce_value_valid
17 2
from .yaml import DocumentationNotComplete, open_and_expand, open_and_macro_expand
18 2
from .utils import required_key, mkdir_p
19
20 2
from .xml import ElementTree as ET
21 2
from .shims import unicode_func
22
23
24 2
def add_sub_element(parent, tag, data):
    """
    Append a new ``tag`` child to ``parent``, with ``data`` parsed as XML.

    ``data`` is wrapped as ``<tag>data</tag>`` and the resulting string is
    parsed into an element tree, so any markup contained in ``data`` ends up
    as real sub-elements of the new child.

    If ``data`` must be taken literally, escape it before calling this
    function, or use ElementTree.SubElement() instead.

    Raises RuntimeError when the wrapped string cannot be encoded or parsed.

    Returns the newly created child element.
    """
    # Our YAML data contain XML and XHTML elements.  ET.SubElement() would
    # escape < and > as &lt; and &gt;, which prevents child elements from
    # being created, hence this parse-from-string workaround.
    # TODO: Remove this function after we move to Markdown everywhere in SSG
    wrapped = unicode_func("<{0}>{1}</{0}>").format(tag, data)

    try:
        child = ET.fromstring(wrapped.encode("utf-8"))
    except Exception:
        raise RuntimeError(
            "Error adding subelement to an element '{0}' from string: '{1}'"
            .format(parent.tag, wrapped))

    parent.append(child)
    return child
52
53
54 2
def add_warning_elements(element, warnings):
    """
    Add one ``warning`` child to ``element`` per entry of ``warnings``.

    ``warnings`` is a list of single-item dictionaries rather than one
    dictionary, so that several warnings can share a category, which is
    valid in SCAP and in our content:

        warnings:
            - general: Some general warning
            - general: Some other general warning
            - general: |-
                Some really long multiline general warning

    The dictionary key becomes the ``category`` attribute and the value
    becomes the (XML-parsed) content of the warning element.  Only the first
    key/value pair of each dictionary is used.
    """
    for entry in warnings:
        category, text = list(entry.items())[0]
        warning_el = add_sub_element(element, "warning", text)
        warning_el.set("category", category)
69
70
71 2
class Profile(object):
    """Represents XCCDF profile

    A profile holds rule selections (``selected``), explicit de-selections
    (``unselected``) and variable refinements (``variables``, a mapping of
    variable id to selector), together with a title, a description and an
    optional id of the profile it extends.
    """

    def __init__(self, id_):
        self.id_ = id_
        self.title = ""
        self.description = ""
        self.extends = None
        self.selected = []
        self.unselected = []
        self.variables = dict()

    @staticmethod
    def from_yaml(yaml_file, env_yaml=None):
        """
        Build a Profile from ``yaml_file``.

        The profile id is the file's base name without extension.  The keys
        ``title``, ``description`` and ``selections`` are fetched through
        required_key(); ``extends`` is optional.

        Returns None when open_and_expand() yields no contents.
        Raises RuntimeError when the file contains keys that were not consumed.
        """
        yaml_contents = open_and_expand(yaml_file, env_yaml)
        if yaml_contents is None:
            return None

        basename, _ = os.path.splitext(os.path.basename(yaml_file))

        profile = Profile(basename)
        profile.title = required_key(yaml_contents, "title")
        del yaml_contents["title"]
        profile.description = required_key(yaml_contents, "description")
        del yaml_contents["description"]
        profile.extends = yaml_contents.pop("extends", None)
        selection_entries = required_key(yaml_contents, "selections")
        profile._parse_selections(selection_entries)
        del yaml_contents["selections"]

        # Everything understood has been removed; leftovers are unknown keys.
        if yaml_contents:
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                               % (yaml_file, yaml_contents))

        return profile

    def dump_yaml(self, file_name, documentation_complete=True):
        """
        Write the profile to ``file_name`` as YAML.

        Selections are serialized in the form _parse_selections() reads:
        plain ids for selected rules, ``!id`` for unselected rules and
        ``variable=selector`` for variable refinements.
        """
        to_dump = {}
        to_dump["documentation_complete"] = documentation_complete
        to_dump["title"] = self.title
        to_dump["description"] = self.description
        if self.extends is not None:
            to_dump["extends"] = self.extends

        selections = list(self.selected)
        selections.extend("!" + item for item in self.unselected)
        selections.extend(
            varname + "=" + selector
            for varname, selector in self.variables.items())
        to_dump["selections"] = selections
        with open(file_name, "w+") as f:
            yaml.dump(to_dump, f, indent=4)

    def _parse_selections(self, entries):
        """
        Sort selection ``entries`` into variables, unselected and selected.

        An entry containing ``=`` is a ``variable=selector`` refinement (split
        on the first ``=``), an entry starting with ``!`` is a de-selection,
        anything else is a selected rule.
        """
        for item in entries:
            if "=" in item:
                varname, value = item.split("=", 1)
                self.variables[varname] = value
            elif item.startswith("!"):
                self.unselected.append(item[1:])
            else:
                self.selected.append(item)

    def to_xml_element(self):
        """
        Return a ``Profile`` element with ``select`` children for the rule
        selections and ``refine-value`` children for the variables.
        """
        element = ET.Element('Profile')
        element.set("id", self.id_)
        if self.extends:
            element.set("extends", self.extends)
        title = add_sub_element(element, "title", self.title)
        title.set("override", "true")
        desc = add_sub_element(element, "description", self.description)
        desc.set("override", "true")

        for selection in self.selected:
            select = ET.Element("select")
            select.set("idref", selection)
            select.set("selected", "true")
            element.append(select)

        for selection in self.unselected:
            unselect = ET.Element("select")
            unselect.set("idref", selection)
            unselect.set("selected", "false")
            element.append(unselect)

        for value_id, selector in self.variables.items():
            refine_value = ET.Element("refine-value")
            refine_value.set("idref", value_id)
            refine_value.set("selector", selector)
            element.append(refine_value)

        return element

    def get_rule_selectors(self):
        """Return a new list with the selected ids followed by the unselected ids."""
        return list(self.selected + self.unselected)

    def get_variable_selectors(self):
        """Return the variable id -> selector mapping (the live dictionary, not a copy)."""
        return self.variables

    def validate_variables(self, variables):
        """
        Check the profile's variable refinements against ``variables``.

        ``variables`` is an iterable of objects exposing ``id_`` and an
        ``options`` mapping.  Raises ValueError when the profile refers to an
        unknown variable, or uses a selector that is not among the keys of
        that variable's ``options`` (keys are compared in their str() form).
        """
        variables_by_id = dict()
        for var in variables:
            variables_by_id[var.id_] = var

        for var_id, our_val in self.variables.items():
            if var_id not in variables_by_id:
                all_vars_list = [" - %s" % v for v in variables_by_id.keys()]
                msg = (
                    "Value '{var_id}' in profile '{profile_name}' is not known. "
                    "We know only variables:\n{var_names}"
                    .format(
                        var_id=var_id, profile_name=self.id_,
                        var_names="\n".join(sorted(all_vars_list)))
                )
                raise ValueError(msg)

            allowed_selectors = [str(s) for s in variables_by_id[var_id].options.keys()]
            if our_val not in allowed_selectors:
                msg = (
                    "Value '{var_id}' in profile '{profile_name}' "
                    "uses the selector '{our_val}'. "
                    "This is not possible, as only selectors {all_selectors} are available. "
                    "Either change the selector used in the profile, or "
                    "add the selector-value pair to the variable definition."
                    .format(
                        var_id=var_id, profile_name=self.id_, our_val=our_val,
                        all_selectors=allowed_selectors,
                    )
                )
                raise ValueError(msg)

    def __sub__(self, other):
        """
        Return a new profile with ``other``'s selections removed from ours.

        Id, title, description and ``extends`` are taken from ``self``.  Both
        selection lists are returned sorted so the result does not depend on
        set iteration order.  Variables are carried over unchanged, but as a
        copy, so that modifying the result never alters ``self``.
        """
        profile = Profile(self.id_)
        profile.title = self.title
        profile.description = self.description
        profile.extends = self.extends
        profile.selected = sorted(set(self.selected) - set(other.selected))
        profile.unselected = sorted(set(self.unselected) - set(other.unselected))
        profile.variables = dict(self.variables)
        return profile
215
216
217 2
class Value(object):
    """Represents XCCDF Value

    A value has an id, title, description, a type, a comparison operator,
    an ``interactive`` flag, a mapping of selectors to option values and an
    optional list of single-item warning dictionaries.
    """

    def __init__(self, id_):
        self.id_ = id_
        self.title = ""
        self.description = ""
        self.type_ = "string"
        self.operator = "equals"
        self.interactive = False
        self.options = {}
        self.warnings = []

    @staticmethod
    def _parse_interactive(raw):
        """
        Convert the raw ``interactive`` YAML entry into a bool.

        A YAML loader may hand back a real boolean for ``interactive: true``
        instead of a string; booleans are returned unchanged.  Anything else
        is compared case-insensitively against ``"true"``.
        """
        if isinstance(raw, bool):
            return raw
        return raw.lower() == "true"

    @staticmethod
    def from_yaml(yaml_file, env_yaml=None):
        """
        Build a Value from ``yaml_file``.

        The value id is the file's base name without extension.  ``title``,
        ``description``, ``type`` and ``options`` are fetched through
        required_key(); ``operator`` (default ``equals``), ``interactive``
        (default ``false``) and ``warnings`` (default empty) are optional.

        Returns None when open_and_expand() yields no contents.
        Raises ValueError for an unknown operator or a warning dictionary
        that does not hold exactly one pair, and RuntimeError when the file
        contains keys that were not consumed.
        """
        yaml_contents = open_and_expand(yaml_file, env_yaml)
        if yaml_contents is None:
            return None

        value_id, _ = os.path.splitext(os.path.basename(yaml_file))
        value = Value(value_id)
        value.title = required_key(yaml_contents, "title")
        del yaml_contents["title"]
        value.description = required_key(yaml_contents, "description")
        del yaml_contents["description"]
        value.type_ = required_key(yaml_contents, "type")
        del yaml_contents["type"]
        value.operator = yaml_contents.pop("operator", "equals")
        possible_operators = ["equals", "not equal", "greater than",
                              "less than", "greater than or equal",
                              "less than or equal", "pattern match"]

        if value.operator not in possible_operators:
            raise ValueError(
                "Found an invalid operator value '%s' in '%s'. "
                "Expected one of: %s"
                % (value.operator, yaml_file, ", ".join(possible_operators))
            )

        value.interactive = Value._parse_interactive(
            yaml_contents.pop("interactive", "false"))

        value.options = required_key(yaml_contents, "options")
        del yaml_contents["options"]
        value.warnings = yaml_contents.pop("warnings", [])

        for warning_list in value.warnings:
            if len(warning_list) != 1:
                raise ValueError("Only one key/value pair should exist for each dictionary")

        # Everything understood has been removed; leftovers are unknown keys.
        if yaml_contents:
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                               % (yaml_file, yaml_contents))

        return value

    def to_xml_element(self):
        """
        Return a ``Value`` element with title, description, warnings and one
        ``value`` child per option.
        """
        value = ET.Element('Value')
        value.set('id', self.id_)
        value.set('type', self.type_)
        if self.operator != "equals":  # equals is the default
            value.set('operator', self.operator)
        if self.interactive:  # False is the default
            value.set('interactive', 'true')
        title = ET.SubElement(value, 'title')
        title.text = self.title
        add_sub_element(value, 'description', self.description)
        add_warning_elements(value, self.warnings)

        for selector, option in self.options.items():
            # do not confuse Value with big V with value with small v
            # value is child element of Value
            value_small = ET.SubElement(value, 'value')
            # by XCCDF spec, default value is value without selector
            if selector != "default":
                value_small.set('selector', str(selector))
            value_small.text = str(option)

        return value

    def to_file(self, file_name):
        """Serialize the element produced by to_xml_element() into ``file_name``."""
        root = self.to_xml_element()
        tree = ET.ElementTree(root)
        tree.write(file_name)
303
304 2
class Benchmark(object):
    """Represents XCCDF Benchmark

    Holds the benchmark metadata (title, status, description, notice,
    front/rear matter, version, CPE platform ids) plus the profiles, values,
    groups and rules placed directly under the benchmark.
    """
    def __init__(self, id_):
        self.id_ = id_
        self.title = ""
        self.status = ""
        self.description = ""
        self.notice_id = ""
        self.notice_description = ""
        self.front_matter = ""
        self.rear_matter = ""
        self.cpes = []
        self.version = "0.1"
        self.profiles = []
        self.values = {}
        self.bash_remediation_fns_group = None
        self.groups = {}
        self.rules = {}

        # This is required for OCIL clauses
        conditional_clause = Value("conditional_clause")
        conditional_clause.title = "A conditional clause for check statements."
        conditional_clause.description = conditional_clause.title
        conditional_clause.type_ = "string"
        conditional_clause.options = {"": "This is a placeholder"}

        self.add_value(conditional_clause)

    @staticmethod
    def from_yaml(yaml_file, id_, product_yaml=None):
        """
        Build a Benchmark with id ``id_`` from ``yaml_file``.

        ``title``, ``status``, ``description``, ``notice`` (with ``id`` and
        ``description``), ``front-matter``, ``rear-matter`` and ``version``
        are fetched through required_key().  When ``product_yaml`` is given,
        the CPE list is looked up in PRODUCT_TO_CPE_MAPPING by its
        ``product`` entry.

        Returns None when open_and_macro_expand() yields no contents.
        Raises RuntimeError when the file contains keys that were not consumed.
        """
        yaml_contents = open_and_macro_expand(yaml_file, product_yaml)
        if yaml_contents is None:
            return None

        benchmark = Benchmark(id_)
        benchmark.title = required_key(yaml_contents, "title")
        del yaml_contents["title"]
        benchmark.status = required_key(yaml_contents, "status")
        del yaml_contents["status"]
        benchmark.description = required_key(yaml_contents, "description")
        del yaml_contents["description"]
        notice_contents = required_key(yaml_contents, "notice")
        benchmark.notice_id = required_key(notice_contents, "id")
        del notice_contents["id"]
        benchmark.notice_description = required_key(notice_contents,
                                                    "description")
        del notice_contents["description"]
        # Unknown keys left inside "notice" keep it in yaml_contents, so they
        # are reported by the unparsed-data check below.
        if not notice_contents:
            del yaml_contents["notice"]

        benchmark.front_matter = required_key(yaml_contents,
                                              "front-matter")
        del yaml_contents["front-matter"]
        benchmark.rear_matter = required_key(yaml_contents,
                                             "rear-matter")
        del yaml_contents["rear-matter"]
        benchmark.version = str(required_key(yaml_contents, "version"))
        del yaml_contents["version"]

        if yaml_contents:
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                               % (yaml_file, yaml_contents))

        if product_yaml:
            benchmark.cpes = PRODUCT_TO_CPE_MAPPING[product_yaml["product"]]

        return benchmark

    def add_profiles_from_dir(self, action, dir_, env_yaml):
        """
        Load every ``*.profile`` file found directly in ``dir_``.

        Entries are processed in sorted name order so that the order of
        ``self.profiles`` is reproducible; os.listdir() itself returns
        entries in arbitrary order.  Non-files are ignored, files with
        another extension are reported on stderr and skipped, and profiles
        that raise DocumentationNotComplete or load as None are skipped.
        When ``action`` is ``"list-inputs"`` the path of each loaded profile
        is printed.
        """
        for dir_item in sorted(os.listdir(dir_)):
            dir_item_path = os.path.join(dir_, dir_item)
            if not os.path.isfile(dir_item_path):
                continue

            _, ext = os.path.splitext(os.path.basename(dir_item_path))
            if ext != '.profile':
                sys.stderr.write(
                    "Encountered file '%s' while looking for profiles, "
                    "extension '%s' is unknown. Skipping..\n"
                    % (dir_item, ext)
                )
                continue

            try:
                new_profile = Profile.from_yaml(dir_item_path, env_yaml)
            except DocumentationNotComplete:
                continue
            if new_profile is None:
                continue

            self.profiles.append(new_profile)
            if action == "list-inputs":
                print(dir_item_path)

    def add_bash_remediation_fns_from_file(self, action, file_):
        """
        Parse ``file_`` as XML and keep its root as the bash remediation
        functions group; for the ``"list-inputs"`` action only print the path.
        """
        if action == "list-inputs":
            print(file_)
        else:
            tree = ET.parse(file_)
            self.bash_remediation_fns_group = tree.getroot()

    def to_xml_element(self):
        """
        Return the ``Benchmark`` root element.

        Children are appended in this order: status, title, description,
        notice, front-matter, rear-matter, platforms, version, metadata,
        profiles, values, the bash remediation functions group (if any),
        groups and rules.  The status date is today's date.
        """
        root = ET.Element('Benchmark')
        root.set('xmlns:xsi', 'http://www.w3.org/2001/XMLSchema-instance')
        root.set('xmlns:xhtml', 'http://www.w3.org/1999/xhtml')
        root.set('xmlns:dc', 'http://purl.org/dc/elements/1.1/')
        root.set('id', 'product-name')
        root.set('xsi:schemaLocation',
                 'http://checklists.nist.gov/xccdf/1.1 xccdf-1.1.4.xsd')
        root.set('style', 'SCAP_1.1')
        root.set('resolved', 'false')
        root.set('xml:lang', 'en-US')
        status = ET.SubElement(root, 'status')
        status.set('date', datetime.date.today().strftime("%Y-%m-%d"))
        status.text = self.status
        add_sub_element(root, "title", self.title)
        add_sub_element(root, "description", self.description)
        notice = add_sub_element(root, "notice", self.notice_description)
        notice.set('id', self.notice_id)
        add_sub_element(root, "front-matter", self.front_matter)
        add_sub_element(root, "rear-matter", self.rear_matter)

        for idref in self.cpes:
            plat = ET.SubElement(root, "platform")
            plat.set("idref", idref)

        version = ET.SubElement(root, 'version')
        version.text = self.version
        ET.SubElement(root, "metadata")

        for profile in self.profiles:
            root.append(profile.to_xml_element())

        for value in self.values.values():
            root.append(value.to_xml_element())
        if self.bash_remediation_fns_group is not None:
            root.append(self.bash_remediation_fns_group)
        for group in self.groups.values():
            root.append(group.to_xml_element())
        for rule in self.rules.values():
            root.append(rule.to_xml_element())

        return root

    def to_file(self, file_name):
        """Serialize the element produced by to_xml_element() into ``file_name``."""
        root = self.to_xml_element()
        tree = ET.ElementTree(root)
        tree.write(file_name)

    def add_value(self, value):
        """Register ``value`` under its id; None is ignored."""
        if value is None:
            return
        self.values[value.id_] = value

    def add_group(self, group):
        """Register ``group`` under its id; None is ignored."""
        if group is None:
            return
        self.groups[group.id_] = group

    def add_rule(self, rule):
        """Register ``rule`` under its id; None is ignored."""
        if rule is None:
            return
        self.rules[rule.id_] = rule

    def to_xccdf(self):
        """We can easily extend this script to generate a valid XCCDF instead
        of SSG SHORTHAND.
        """
        raise NotImplementedError

    def __str__(self):
        return self.id_
477
478
479 2
class Group(object):
    """Represents XCCDF Group

    A group carries a title, description, warnings and an optional platform,
    and contains values, sub-groups and rules keyed by their ids.
    """
    # Attributes handed down to added sub-groups and rules that have them
    # set to None.
    ATTRIBUTES_TO_PASS_ON = (
        "platform",
    )

    def __init__(self, id_):
        self.id_ = id_
        self.prodtype = "all"
        self.title = ""
        self.description = ""
        self.warnings = []
        self.values = {}
        self.groups = {}
        self.rules = {}
        self.platform = None

    @staticmethod
    def from_yaml(yaml_file, env_yaml=None):
        """
        Build a Group from ``yaml_file``.

        The group id is the name of the directory containing the file.
        ``title`` and ``description`` are fetched through required_key();
        ``prodtype`` (default ``all``), ``warnings`` (default empty) and
        ``platform`` (default None) are optional.

        Returns None when open_and_macro_expand() yields no contents.
        Raises ValueError for a warning dictionary that does not hold exactly
        one pair or for a malformed prodtype, and RuntimeError when the file
        contains keys that were not consumed.
        """
        yaml_contents = open_and_macro_expand(yaml_file, env_yaml)
        if yaml_contents is None:
            return None

        group_id = os.path.basename(os.path.dirname(yaml_file))
        group = Group(group_id)
        group.prodtype = yaml_contents.pop("prodtype", "all")
        group.title = required_key(yaml_contents, "title")
        del yaml_contents["title"]
        group.description = required_key(yaml_contents, "description")
        del yaml_contents["description"]
        group.warnings = yaml_contents.pop("warnings", [])
        group.platform = yaml_contents.pop("platform", None)

        for warning_list in group.warnings:
            if len(warning_list) != 1:
                raise ValueError("Only one key/value pair should exist for each dictionary")

        # Everything understood has been removed; leftovers are unknown keys.
        if yaml_contents:
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                               % (yaml_file, yaml_contents))
        group.validate_prodtype(yaml_file)
        return group

    def validate_prodtype(self, yaml_file):
        """
        Raise ValueError when any comma-separated part of ``prodtype`` has
        leading or trailing whitespace.  ``yaml_file`` is used only in the
        error message.
        """
        for ptype in self.prodtype.split(","):
            if ptype.strip() != ptype:
                msg = (
                    "Comma-separated '{prodtype}' prodtype "
                    "in {yaml_file} contains whitespace."
                    .format(prodtype=self.prodtype, yaml_file=yaml_file))
                raise ValueError(msg)

    def to_xml_element(self):
        """
        Return a ``Group`` element containing title, description, warnings,
        an optional platform reference and the nested values, groups and
        rules (in that order).

        Raises ValueError when ``platform`` is set but has no entry in
        XCCDF_PLATFORM_TO_CPE.
        """
        group = ET.Element('Group')
        group.set('id', self.id_)
        if self.prodtype != "all":
            group.set("prodtype", self.prodtype)
        title = ET.SubElement(group, 'title')
        title.text = self.title
        add_sub_element(group, 'description', self.description)
        add_warning_elements(group, self.warnings)

        if self.platform:
            try:
                platform_cpe = XCCDF_PLATFORM_TO_CPE[self.platform]
            except KeyError:
                # The message names the group: self.id_ is a group id here.
                raise ValueError("Unsupported platform '%s' in group '%s'." % (self.platform, self.id_))
            platform_el = ET.SubElement(group, "platform")
            platform_el.set("idref", platform_cpe)

        for _value in self.values.values():
            group.append(_value.to_xml_element())
        for _group in self.groups.values():
            group.append(_group.to_xml_element())
        for _rule in self.rules.values():
            group.append(_rule.to_xml_element())

        return group

    def to_file(self, file_name):
        """Serialize the element produced by to_xml_element() into ``file_name``."""
        root = self.to_xml_element()
        tree = ET.ElementTree(root)
        tree.write(file_name)

    def add_value(self, value):
        """Register ``value`` under its id; None is ignored."""
        if value is None:
            return
        self.values[value.id_] = value

    def add_group(self, group):
        """
        Register sub-group ``group`` under its id; None is ignored.

        A sub-group without a platform of its own inherits ours.
        """
        if group is None:
            return
        if self.platform and not group.platform:
            group.platform = self.platform
        self.groups[group.id_] = group
        self._pass_our_properties_on_to(group)

    def _pass_our_properties_on_to(self, obj):
        """
        Copy each attribute listed in ATTRIBUTES_TO_PASS_ON from ``self`` to
        ``obj`` when ``obj`` has that attribute and it is None.
        """
        for attr in self.ATTRIBUTES_TO_PASS_ON:
            if hasattr(obj, attr) and getattr(obj, attr) is None:
                setattr(obj, attr, getattr(self, attr))

    def add_rule(self, rule):
        """
        Register ``rule`` under its id; None is ignored.

        A rule without a platform of its own inherits ours.
        """
        if rule is None:
            return
        if self.platform and not rule.platform:
            rule.platform = self.platform
        self.rules[rule.id_] = rule
        self._pass_our_properties_on_to(rule)

    def __str__(self):
        return self.id_
592
593
594 2
class Rule(object):
595
    """Represents XCCDF Rule
596
    """
597 2
    # Maps every YAML key a rule understands to a zero-argument callable that
    # produces the value used when the key is absent.  A callable returning an
    # exception instance marks the key as mandatory: the exception is raised
    # by the code that applies these defaults.
    YAML_KEYS_DEFAULTS = {
        "prodtype": lambda: "all",
        "title": lambda: RuntimeError("Missing key 'title'"),
        "description": lambda: RuntimeError("Missing key 'description'"),
        "rationale": lambda: RuntimeError("Missing key 'rationale'"),
        "severity": lambda: RuntimeError("Missing key 'severity'"),
        # dict / list are called with no arguments, giving a fresh empty
        # container for every rule.
        "references": dict,
        "identifiers": dict,
        "ocil_clause": lambda: None,
        "ocil": lambda: None,
        "oval_external_content": lambda: None,
        "warnings": list,
        "platform": lambda: None,
    }
611
612 2
    def __init__(self, id_):
613 2
        self.id_ = id_
614 2
        self.prodtype = "all"
615 2
        self.title = ""
616 2
        self.description = ""
617 2
        self.rationale = ""
618 2
        self.severity = "unknown"
619 2
        self.references = {}
620 2
        self.identifiers = {}
621 2
        self.ocil_clause = None
622 2
        self.ocil = None
623 2
        self.oval_external_content = None
624 2
        self.warnings = []
625 2
        self.platform = None
626
627 2
    @staticmethod
    def from_yaml(yaml_file, env_yaml=None):
        """
        Build a Rule from ``yaml_file``.

        The rule id is the file's base name without extension, except for a
        file named ``rule.yml``, whose id is obtained from get_rule_dir_id().

        Returns None when open_and_macro_expand() yields no contents.
        Raises RuntimeError when a mandatory key is missing or when the file
        contains keys that were not consumed, and ValueError when a warning
        dictionary does not hold exactly one pair or one of the
        prodtype/identifiers/references validations fails.
        """
        yaml_file = os.path.normpath(yaml_file)

        contents = open_and_macro_expand(yaml_file, env_yaml)
        if contents is None:
            return None

        stem, suffix = os.path.splitext(os.path.basename(yaml_file))
        in_rule_directory = (stem == "rule" and suffix == ".yml")
        rule = Rule(get_rule_dir_id(yaml_file) if in_rule_directory else stem)

        try:
            rule._set_attributes_from_dict(contents)
        except RuntimeError as exc:
            # Re-raise with the file name so the offending rule can be found.
            raise RuntimeError(
                "Error processing '{fname}': {err}"
                .format(fname=yaml_file, err=str(exc)))

        if any(len(entry) != 1 for entry in rule.warnings):
            raise ValueError("Only one key/value pair should exist for each dictionary")

        # Every understood key has been popped; leftovers are unknown keys.
        if contents:
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                               % (yaml_file, contents))

        validators = (
            rule.validate_prodtype,
            rule.validate_identifiers,
            rule.validate_references,
        )
        for validate in validators:
            validate(yaml_file)
        return rule
660
661 2
    def _set_attributes_from_dict(self, yaml_contents):
662 2
        for key, default_getter in self.YAML_KEYS_DEFAULTS.items():
663 2
            if key not in yaml_contents:
664 2
                value = default_getter()
665 2
                if isinstance(value, Exception):
666
                    raise value
667
            else:
668 2
                value = yaml_contents.pop(key)
669
670 2
            setattr(self, key, value)
671
672 2
    def to_contents_dict(self):
673
        """
674
        Returns a dictionary that is the same schema as the dict obtained when loading rule YAML.
675
        """
676
677 2
        yaml_contents = dict()
678 2
        for key in Rule.YAML_KEYS_DEFAULTS:
679 2
            yaml_contents[key] = getattr(self, key)
680
681 2
        return yaml_contents
682
683 2
    def validate_identifiers(self, yaml_file):
684 2
        if self.identifiers is None:
685
            raise ValueError("Empty identifier section in file %s" % yaml_file)
686
687
        # Validate all identifiers are non-empty:
688 2
        for ident_type, ident_val in self.identifiers.items():
689 2
            if not isinstance(ident_type, str) or not isinstance(ident_val, str):
690
                raise ValueError("Identifiers and values must be strings: %s in file %s"
691
                                 % (ident_type, yaml_file))
692 2
            if ident_val.strip() == "":
693
                raise ValueError("Identifiers must not be empty: %s in file %s"
694
                                 % (ident_type, yaml_file))
695 2
            if ident_type[0:3] == 'cce':
696 2
                if not is_cce_format_valid("CCE-" + ident_val):
697
                    raise ValueError("CCE Identifier format must be valid: invalid format '%s' for CEE '%s'"
698
                                     " in file '%s'" % (ident_val, ident_type, yaml_file))
699 2
                if not is_cce_value_valid("CCE-" + ident_val):
700
                    raise ValueError("CCE Identifier value is not a valid checksum: invalid value '%s' for CEE '%s'"
701
                                     " in file '%s'" % (ident_val, ident_type, yaml_file))
702
703 2
    def validate_references(self, yaml_file):
        """
        Check that the references of this rule are well-formed.

        The check runs in two passes over all references: the first one makes
        sure every reference type and value is a non-blank string, the second
        one rejects comma-separated values whose items carry leading or
        trailing whitespace.

        yaml_file is only used to name the offending file in error messages.

        Raises ValueError on the first problem that is found.
        """
        references = self.references
        if references is None:
            raise ValueError("Empty references section in file %s" % yaml_file)

        # Pass 1: types and emptiness of every reference.
        for kind, value in references.items():
            both_are_strings = isinstance(kind, str) and isinstance(value, str)
            if not both_are_strings:
                raise ValueError("References and values must be strings: %s in file %s"
                                 % (kind, yaml_file))
            if not value.strip():
                raise ValueError("References must not be empty: %s in file %s"
                                 % (kind, yaml_file))

        # Pass 2: no whitespace around the items of a comma-separated value.
        for kind, value in references.items():
            if any(item != item.strip() for item in value.split(",")):
                raise ValueError(
                    "Comma-separated '{ref_type}' reference "
                    "in {yaml_file} contains whitespace."
                    .format(ref_type=kind, yaml_file=yaml_file))
723
724 2
    def validate_prodtype(self, yaml_file):
        """
        Reject a comma-separated prodtype whose items are padded with whitespace.

        yaml_file is only used to name the offending file in the error message.

        Raises ValueError if any item has leading or trailing whitespace.
        """
        prodtype = self.prodtype
        has_padding = any(item != item.strip() for item in prodtype.split(","))
        if has_padding:
            raise ValueError(
                "Comma-separated '{prodtype}' prodtype "
                "in {yaml_file} contains whitespace."
                .format(prodtype=prodtype, yaml_file=yaml_file))
732
733 2
    def to_xml_element(self):
        """
        Build and return the 'Rule' XML element describing this rule.

        The element carries the id, the severity and (unless it is "all") the
        prodtype as attributes, followed by title, description, rationale,
        ident and ref elements, a check or oval reference, an optional ocil
        element, the warnings and an optional platform element.

        Raises ValueError if self.platform is not a key of
        XCCDF_PLATFORM_TO_CPE.
        """
        rule = ET.Element('Rule')
        rule.set('id', self.id_)
        if self.prodtype != "all":
            rule.set("prodtype", self.prodtype)
        rule.set('severity', self.severity)
        for tag, text in (('title', self.title),
                          ('description', self.description),
                          ('rationale', self.rationale)):
            add_sub_element(rule, tag, text)

        def attach_entries(tag, entries):
            # Keys of the form 'policy@product' (eg. 'stigid@product') apply
            # only to one product, so each of them gets its own <tag> element
            # with a 'prodtype' attribute. All remaining keys are gathered as
            # attributes of one shared <tag> element, which is appended after
            # the product-specific ones and only if it has any attribute.
            shared = ET.Element(tag)
            for key, val in entries.items():
                if '@' not in key:
                    shared.set(key, val)
                    continue
                policy, product = key.split('@')
                scoped = ET.SubElement(rule, tag)
                scoped.set(policy, val)
                scoped.set('prodtype', product)
            if shared.attrib:
                rule.append(shared)

        attach_entries('ident', self.identifiers)
        attach_entries('ref', self.references)

        if not self.oval_external_content:
            # TODO: This is pretty much a hack, oval ID will be the same as rule ID
            #       and we don't want the developers to have to keep them in sync.
            #       Therefore let's just add an OVAL ref of that ID.
            ET.SubElement(rule, "oval").set("id", self.id_)
        else:
            check = ET.SubElement(rule, 'check')
            check.set("system", "http://oval.mitre.org/XMLSchema/oval-definitions-5")
            content_ref = ET.SubElement(check, "check-content-ref")
            content_ref.set("href", self.oval_external_content)

        if self.ocil or self.ocil_clause:
            # The element is emitted even when only the clause is present;
            # its text is then the empty string.
            ocil_el = add_sub_element(rule, 'ocil', self.ocil or "")
            if self.ocil_clause:
                ocil_el.set("clause", self.ocil_clause)

        add_warning_elements(rule, self.warnings)

        if self.platform:
            platform_el = ET.SubElement(rule, "platform")
            try:
                cpe_name = XCCDF_PLATFORM_TO_CPE[self.platform]
            except KeyError:
                raise ValueError("Unsupported platform '%s' in rule '%s'." % (self.platform, self.id_))
            platform_el.set("idref", cpe_name)

        return rule
803
804 2
    def to_file(self, file_name):
        """Write the XML element of this rule, wrapped in an element tree, to file_name."""
        ET.ElementTree(self.to_xml_element()).write(file_name)
808
809
810 2
class DirectoryLoader(object):
    """
    Base class for walking a guide directory tree.

    One loader handles one directory: it sorts the directory entries into
    benchmark, group, rule, value files and subdirectories, loads the
    benchmark or group, and hands every subdirectory to a fresh loader.

    Subclasses have to implement _get_new_loader, _process_values and
    _process_rules, and are expected to set self.action, which is passed on
    to the benchmark when profiles and bash remediation functions are added.
    """

    def __init__(self, profiles_dir, bash_remediation_fns, env_yaml):
        self.benchmark_file = None
        self.group_file = None
        self.loaded_group = None
        self.rule_files = []
        self.value_files = []
        self.subdirectories = []

        self.all_values = set()

        self.profiles_dir = profiles_dir
        self.bash_remediation_fns = bash_remediation_fns
        self.env_yaml = env_yaml
        self.product = env_yaml["product"]

        self.parent_group = None

        # load_benchmark_or_group reads self.action; give it a defined value
        # here so the attribute always exists. Subclasses overwrite it.
        self.action = None

    def _collect_items_to_load(self, guide_directory):
        """
        Sort the entries of guide_directory into the loader's work lists.

        Raises ValueError if more than one benchmark or group file is seen.
        Files that match nothing are reported on stderr and skipped; an
        entry named "tests" is skipped silently.
        """
        # os.listdir returns entries in arbitrary order; sorting keeps the
        # order of the collected files the same on every run.
        for dir_item in sorted(os.listdir(guide_directory)):
            dir_item_path = os.path.join(guide_directory, dir_item)
            _, extension = os.path.splitext(dir_item)

            if extension == '.var':
                self.value_files.append(dir_item_path)
            elif dir_item == "benchmark.yml":
                if self.benchmark_file:
                    raise ValueError("Multiple benchmarks in one directory")
                self.benchmark_file = dir_item_path
            elif dir_item == "group.yml":
                if self.group_file:
                    raise ValueError("Multiple groups in one directory")
                self.group_file = dir_item_path
            elif extension == '.rule':
                self.rule_files.append(dir_item_path)
            elif is_rule_dir(dir_item_path):
                self.rule_files.append(get_rule_dir_yaml(dir_item_path))
            elif dir_item != "tests":
                if os.path.isdir(dir_item_path):
                    self.subdirectories.append(dir_item_path)
                else:
                    sys.stderr.write(
                        "Encountered file '%s' while recursing, extension '%s' "
                        "is unknown. Skipping..\n"
                        % (dir_item, extension)
                    )

    def load_benchmark_or_group(self, guide_directory):
        """
        Loads a given benchmark or group from the specified benchmark_file or
        group_file, in the context of guide_directory, action, profiles_dir,
        env_yaml, and bash_remediation_fns.

        Returns the loaded group or benchmark, or None if the directory
        contained neither.

        Raises ValueError if both a benchmark and a group file were found.
        """
        group = None
        if self.group_file and self.benchmark_file:
            raise ValueError("A .benchmark file and a .group file were found in "
                             "the same directory '%s'" % (guide_directory))

        # we treat benchmark as a special form of group in the following code
        if self.benchmark_file:
            group = Benchmark.from_yaml(
                self.benchmark_file, 'product-name', self.env_yaml
            )
            if self.profiles_dir:
                group.add_profiles_from_dir(self.action, self.profiles_dir, self.env_yaml)
            group.add_bash_remediation_fns_from_file(self.action, self.bash_remediation_fns)

        if self.group_file:
            group = Group.from_yaml(self.group_file, self.env_yaml)

        return group

    def _load_group_process_and_recurse(self, guide_directory):
        """
        Load the group of this directory, attach it to the parent group and
        process values, subdirectories and rules - in that order.

        Nothing is processed when the directory holds no benchmark or group.
        """
        self.loaded_group = self.load_benchmark_or_group(guide_directory)

        if self.loaded_group:
            if self.parent_group:
                self.parent_group.add_group(self.loaded_group)

            self._process_values()
            self._recurse_into_subdirs()
            self._process_rules()

    def process_directory_tree(self, start_dir):
        """Collect the contents of start_dir and process it together with its subdirectories."""
        self._collect_items_to_load(start_dir)
        self._load_group_process_and_recurse(start_dir)

    def _recurse_into_subdirs(self):
        """Process every collected subdirectory with a new loader and merge its values into ours."""
        for subdir in self.subdirectories:
            loader = self._get_new_loader()
            loader.parent_group = self.loaded_group
            loader.process_directory_tree(subdir)
            self.all_values.update(loader.all_values)

    def _get_new_loader(self):
        """Return a loader for a subdirectory; to be implemented by subclasses."""
        raise NotImplementedError()

    def _process_values(self):
        """Handle the collected value files; to be implemented by subclasses."""
        raise NotImplementedError()

    def _process_rules(self):
        """Handle the collected rule files; to be implemented by subclasses."""
        raise NotImplementedError()
914
915
916 2
class BuildLoader(DirectoryLoader):
    """
    Loader that builds the group tree from the collected files.

    Values and rules are loaded from their YAML files and added to the
    loaded group. When resolved_rules_dir is given, the contents of every
    rule that was added are also dumped there as '<rule id>.yml'.
    """

    def __init__(self, profiles_dir, bash_remediation_fns, env_yaml, resolved_rules_dir=None):
        super(BuildLoader, self).__init__(profiles_dir, bash_remediation_fns, env_yaml)

        self.action = "build"
        self.resolved_rules_dir = resolved_rules_dir

        if resolved_rules_dir:
            if not os.path.isdir(resolved_rules_dir):
                os.mkdir(resolved_rules_dir)

    def _process_values(self):
        """Load every collected value file and register the value with the loader and the group."""
        for value_path in self.value_files:
            loaded_value = Value.from_yaml(value_path, self.env_yaml)
            self.all_values.add(loaded_value)
            self.loaded_group.add_value(loaded_value)

    def _process_rules(self):
        """
        Load every collected rule file and add the rules that apply to the
        current product (or to "all" products) to the loaded group.
        """
        for rule_path in self.rule_files:
            rule = Rule.from_yaml(rule_path, self.env_yaml)
            applicable_to = parse_prodtype(rule.prodtype)
            if "all" in applicable_to or self.product in applicable_to:
                self.loaded_group.add_rule(rule)
                if self.resolved_rules_dir:
                    rule_dump_name = "{id_}.yml".format(id_=rule.id_)
                    rule_dump_path = os.path.join(self.resolved_rules_dir, rule_dump_name)
                    mkdir_p(self.resolved_rules_dir)
                    with open(rule_dump_path, "w") as dump_file:
                        yaml.dump(rule.to_contents_dict(), dump_file)

    def _get_new_loader(self):
        """Return a BuildLoader configured the same way as this one."""
        return BuildLoader(
            self.profiles_dir,
            self.bash_remediation_fns,
            self.env_yaml,
            self.resolved_rules_dir,
        )

    def export_group_to_file(self, filename):
        """Write the loaded group to filename and return what its to_file returns."""
        return self.loaded_group.to_file(filename)
952
953
954 2
class ListInputsLoader(DirectoryLoader):
    """
    Loader that prints the paths of the input files instead of building from them.

    Benchmark, group, value and rule file paths are printed one per line.
    """

    def __init__(self, profiles_dir, bash_remediation_fns, env_yaml):
        super(ListInputsLoader, self).__init__(profiles_dir, bash_remediation_fns, env_yaml)

        self.action = "list-inputs"

    def _process_values(self):
        """Print the path of every collected value file."""
        for value_path in self.value_files:
            print(value_path)

    def _process_rules(self):
        """Print the path of every collected rule file."""
        for rule_path in self.rule_files:
            print(rule_path)

    def _get_new_loader(self):
        """Return a ListInputsLoader configured the same way as this one."""
        return ListInputsLoader(
            self.profiles_dir,
            self.bash_remediation_fns,
            self.env_yaml,
        )

    def load_benchmark_or_group(self, guide_directory):
        """
        Load the benchmark or group like the base class does, then print the
        path of the benchmark file and of the group file if they are set.

        Returns whatever the base class implementation returned.
        """
        loaded = super(ListInputsLoader, self).load_benchmark_or_group(guide_directory)

        for input_path in (self.benchmark_file, self.group_file):
            if input_path:
                print(input_path)

        return loaded
982