Passed
Pull Request — master (#3825)
by Matěj
01:55
created

ssg.build_yaml.Group.__init__()   A

Complexity

Conditions 1

Size

Total Lines 10
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.729

Importance

Changes 0
Metric Value
cc 1
eloc 10
nop 2
dl 0
loc 10
ccs 1
cts 10
cp 0.1
crap 1.729
rs 9.9
c 0
b 0
f 0
1 2
from __future__ import absolute_import
2 2
from __future__ import print_function
3
4 2
import os
5 2
import os.path
6 2
import datetime
7 2
import sys
8
9 2
import yaml
10
11 2
from .constants import XCCDF_PLATFORM_TO_CPE
12 2
from .constants import PRODUCT_TO_CPE_MAPPING
13 2
from .rules import get_rule_dir_id, get_rule_dir_yaml, is_rule_dir
14
15 2
from .checks import is_cce_valid
16 2
from .yaml import open_and_expand, open_and_macro_expand
17 2
from .utils import required_key
18
19 2
from .xml import ElementTree as ET
20 2
from .shims import unicode_func
21
22
23 2
def add_sub_element(parent, tag, data):
    """
    Append a new ``<tag>`` child to ``parent`` whose content is ``data``
    parsed as XML markup.

    ``data`` is wrapped in ``<tag>...</tag>`` and parsed as an XML tree, so
    any markup inside it becomes real sub-elements of the new child.

    If ``data`` must be treated as plain text, escape it before calling
    this function or use ElementTree.SubElement() instead.

    Returns the newly created child element.

    Raises RuntimeError when the wrapped string cannot be parsed.
    """
    # Our YAML data embed XML and XHTML elements. ET.SubElement() would
    # escape "<" and ">" to &lt; and &gt; and therefore would not create
    # child elements, so the markup is parsed from a string instead.
    # TODO: Remove this function after we move to Markdown everywhere in SSG
    template = unicode_func("<{0}>{1}</{0}>")
    markup = template.format(tag, data)

    try:
        child = ET.fromstring(markup.encode("utf-8"))
    except Exception:
        raise RuntimeError(
            "Error adding subelement to an element '{0}' from string: '{1}'"
            .format(parent.tag, markup))

    parent.append(child)
    return child
51
52
53 2
def add_warning_elements(element, warnings):
    """
    Append one ``<warning>`` child to ``element`` per entry of ``warnings``.

    ``warnings`` is a list of single-pair dicts rather than one dict so that
    several warnings may share a category, which is valid in SCAP and in our
    content:

        warnings:
            - general: Some general warning
            - general: Some other general warning
            - general: |-
                Some really long multiline general warning

    Only the first key/value pair of each dict is used: the key becomes the
    ``category`` attribute and the value becomes the warning content.
    """
    for entry in warnings:
        category, text = list(entry.items())[0]
        warning_el = add_sub_element(element, "warning", text)
        warning_el.set("category", category)
68
69
70 2
class Profile(object):
    """Represents XCCDF profile
    """

    def __init__(self, id_):
        self.id_ = id_
        self.title = ""
        self.description = ""
        self.extends = None
        self.selections = []

    @staticmethod
    def from_yaml(yaml_file, env_yaml=None):
        """
        Build a Profile from a profile YAML file.

        The profile ID is the file's base name without its extension.
        Returns None when the file expands to nothing, and raises
        RuntimeError when the file contains keys that are not understood.
        """
        contents = open_and_expand(yaml_file, env_yaml)
        if contents is None:
            return None

        profile_id = os.path.splitext(os.path.basename(yaml_file))[0]
        profile = Profile(profile_id)

        # Each consumed key is removed so that leftovers can be reported.
        for attr in ("title", "description"):
            setattr(profile, attr, required_key(contents, attr))
            del contents[attr]
        profile.extends = contents.pop("extends", None)
        profile.selections = required_key(contents, "selections")
        del contents["selections"]

        if contents:
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                               % (yaml_file, contents))

        return profile

    def to_xml_element(self):
        """
        Serialize the profile into a ``<Profile>`` element.

        Selections starting with "!" become unselected ``<select>``
        elements, selections containing "=" become ``<refine-value>``
        elements, and everything else becomes a selected ``<select>``.
        """
        profile_el = ET.Element('Profile')
        profile_el.set("id", self.id_)
        if self.extends:
            profile_el.set("extends", self.extends)

        for tag, text in (("title", self.title),
                          ("description", self.description)):
            add_sub_element(profile_el, tag, text).set("override", "true")

        for selection in self.selections:
            if selection.startswith("!"):
                node = ET.SubElement(profile_el, "select")
                node.set("idref", selection[1:])
                node.set("selected", "false")
            elif "=" in selection:
                value_id, selector = selection.split("=", 1)
                node = ET.SubElement(profile_el, "refine-value")
                node.set("idref", value_id)
                node.set("selector", selector)
            else:
                node = ET.SubElement(profile_el, "select")
                node.set("idref", selection)
                node.set("selected", "true")

        return profile_el
133
134
135 2
class Value(object):
    """Represents XCCDF Value
    """

    def __init__(self, id_):
        self.id_ = id_
        self.title = ""
        self.description = ""
        self.type_ = "string"
        self.operator = "equals"
        self.interactive = False
        self.options = {}
        self.warnings = []

    @staticmethod
    def _parse_interactive(raw):
        """
        Convert the raw ``interactive`` YAML entry into a bool.

        YAML loads an unquoted ``true``/``false`` as a real boolean, while a
        quoted one arrives as a string; both spellings are accepted.
        """
        if isinstance(raw, bool):
            return raw
        return str(raw).lower() == "true"

    @staticmethod
    def from_yaml(yaml_file, env_yaml=None):
        """
        Build a Value from a variable YAML file.

        The value ID is the file's base name without its extension.
        Returns None when the file expands to nothing.

        Raises ValueError for an unknown operator or for a warning entry
        that does not hold exactly one key/value pair, and RuntimeError
        when the file contains keys that are not understood.
        """
        yaml_contents = open_and_expand(yaml_file, env_yaml)
        if yaml_contents is None:
            return None

        value_id, _ = os.path.splitext(os.path.basename(yaml_file))
        value = Value(value_id)
        # Each consumed key is removed so that leftovers can be reported.
        value.title = required_key(yaml_contents, "title")
        del yaml_contents["title"]
        value.description = required_key(yaml_contents, "description")
        del yaml_contents["description"]
        value.type_ = required_key(yaml_contents, "type")
        del yaml_contents["type"]
        value.operator = yaml_contents.pop("operator", "equals")
        possible_operators = ["equals", "not equal", "greater than",
                              "less than", "greater than or equal",
                              "less than or equal", "pattern match"]

        if value.operator not in possible_operators:
            raise ValueError(
                "Found an invalid operator value '%s' in '%s'. "
                "Expected one of: %s"
                % (value.operator, yaml_file, ", ".join(possible_operators))
            )

        value.interactive = Value._parse_interactive(
            yaml_contents.pop("interactive", "false"))

        value.options = required_key(yaml_contents, "options")
        del yaml_contents["options"]
        value.warnings = yaml_contents.pop("warnings", [])

        for warning_list in value.warnings:
            if len(warning_list) != 1:
                raise ValueError("Only one key/value pair should exist for each dictionary")

        if yaml_contents:
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                               % (yaml_file, yaml_contents))

        return value

    def to_xml_element(self):
        """
        Serialize into a ``<Value>`` element with one ``<value>`` child per
        option.
        """
        value = ET.Element('Value')
        value.set('id', self.id_)
        value.set('type', self.type_)
        if self.operator != "equals":  # equals is the default
            value.set('operator', self.operator)
        if self.interactive:  # False is the default
            value.set('interactive', 'true')
        title = ET.SubElement(value, 'title')
        title.text = self.title
        add_sub_element(value, 'description', self.description)
        add_warning_elements(value, self.warnings)

        for selector, option in self.options.items():
            # do not confuse Value with big V with value with small v
            # value is child element of Value
            value_small = ET.SubElement(value, 'value')
            # by XCCDF spec, default value is value without selector
            if selector != "default":
                value_small.set('selector', str(selector))
            value_small.text = str(option)

        return value

    def to_file(self, file_name):
        """Write the XML form of this value to ``file_name``."""
        root = self.to_xml_element()
        tree = ET.ElementTree(root)
        tree.write(file_name)
220
221
222 2
class Benchmark(object):
    """Represents XCCDF Benchmark
    """

    def __init__(self, id_):
        self.id_ = id_
        self.title = ""
        self.status = ""
        self.description = ""
        self.notice_id = ""
        self.notice_description = ""
        self.front_matter = ""
        self.rear_matter = ""
        self.cpes = []
        self.version = "0.1"
        self.profiles = []
        self.values = {}
        self.bash_remediation_fns_group = None
        self.groups = {}
        self.rules = {}

        # This is required for OCIL clauses
        placeholder = Value("conditional_clause")
        placeholder.title = "A conditional clause for check statements."
        placeholder.description = placeholder.title
        placeholder.type_ = "string"
        placeholder.options = {"": "This is a placeholder"}

        self.add_value(placeholder)

    @staticmethod
    def from_yaml(yaml_file, id_, product_yaml=None):
        """
        Build a Benchmark with ID ``id_`` from a benchmark YAML file.

        Returns None when the file expands to nothing and raises
        RuntimeError when the file contains keys that are not understood.
        When ``product_yaml`` is given, the benchmark CPEs are looked up
        from its "product" entry.
        """
        contents = open_and_macro_expand(yaml_file, product_yaml)
        if contents is None:
            return None

        benchmark = Benchmark(id_)
        # Each consumed key is removed so that leftovers can be reported.
        for attr in ("title", "status", "description"):
            setattr(benchmark, attr, required_key(contents, attr))
            del contents[attr]

        notice = required_key(contents, "notice")
        benchmark.notice_id = required_key(notice, "id")
        del notice["id"]
        benchmark.notice_description = required_key(notice, "description")
        del notice["description"]
        # A notice with leftover keys stays in place and is reported below.
        if not notice:
            del contents["notice"]

        benchmark.front_matter = required_key(contents, "front-matter")
        del contents["front-matter"]
        benchmark.rear_matter = required_key(contents, "rear-matter")
        del contents["rear-matter"]
        benchmark.version = str(required_key(contents, "version"))
        del contents["version"]

        if contents:
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                               % (yaml_file, contents))

        if product_yaml:
            benchmark.cpes = PRODUCT_TO_CPE_MAPPING[product_yaml["product"]]

        return benchmark

    def add_profiles_from_dir(self, action, dir_, env_yaml):
        """
        Load every ``*.profile`` file found directly in ``dir_``.

        Files with another extension are reported on stderr and skipped.
        When ``action`` is "list-inputs", the path of each loaded profile
        is printed.
        """
        for entry in os.listdir(dir_):
            path = os.path.join(dir_, entry)
            if not os.path.isfile(path):
                continue

            ext = os.path.splitext(os.path.basename(path))[1]
            if ext != '.profile':
                sys.stderr.write(
                    "Encountered file '%s' while looking for profiles, "
                    "extension '%s' is unknown. Skipping..\n"
                    % (entry, ext)
                )
                continue

            self.profiles.append(Profile.from_yaml(path, env_yaml))
            if action == "list-inputs":
                print(path)

    def add_bash_remediation_fns_from_file(self, action, file_):
        """
        Print ``file_`` when listing inputs; otherwise parse it as XML and
        keep its root element for inclusion in the benchmark.
        """
        if action == "list-inputs":
            print(file_)
            return
        self.bash_remediation_fns_group = ET.parse(file_).getroot()

    def to_xml_element(self):
        """Serialize the benchmark and everything it holds into XML."""
        root = ET.Element('Benchmark')
        root_attributes = (
            ('xmlns:xsi', 'http://www.w3.org/2001/XMLSchema-instance'),
            ('xmlns:xhtml', 'http://www.w3.org/1999/xhtml'),
            ('xmlns:dc', 'http://purl.org/dc/elements/1.1/'),
            ('id', 'product-name'),
            ('xsi:schemaLocation',
             'http://checklists.nist.gov/xccdf/1.1 xccdf-1.1.4.xsd'),
            ('style', 'SCAP_1.1'),
            ('resolved', 'false'),
            ('xml:lang', 'en-US'),
        )
        for attr_name, attr_value in root_attributes:
            root.set(attr_name, attr_value)

        status = ET.SubElement(root, 'status')
        status.set('date', datetime.date.today().strftime("%Y-%m-%d"))
        status.text = self.status
        add_sub_element(root, "title", self.title)
        add_sub_element(root, "description", self.description)
        notice = add_sub_element(root, "notice", self.notice_description)
        notice.set('id', self.notice_id)
        add_sub_element(root, "front-matter", self.front_matter)
        add_sub_element(root, "rear-matter", self.rear_matter)

        for idref in self.cpes:
            ET.SubElement(root, "platform").set("idref", idref)

        ET.SubElement(root, 'version').text = self.version
        ET.SubElement(root, "metadata")

        # Entries of self.profiles may be None; those are skipped.
        for profile in self.profiles:
            if profile is None:
                continue
            root.append(profile.to_xml_element())

        for value in self.values.values():
            root.append(value.to_xml_element())
        if self.bash_remediation_fns_group is not None:
            root.append(self.bash_remediation_fns_group)
        for group in self.groups.values():
            root.append(group.to_xml_element())
        for rule in self.rules.values():
            root.append(rule.to_xml_element())

        return root

    def to_file(self, file_name):
        """Write the XML form of this benchmark to ``file_name``."""
        ET.ElementTree(self.to_xml_element()).write(file_name)

    def add_value(self, value):
        """Register ``value`` under its ID; None is ignored."""
        if value is not None:
            self.values[value.id_] = value

    def add_group(self, group):
        """Register ``group`` under its ID; None is ignored."""
        if group is not None:
            self.groups[group.id_] = group

    def add_rule(self, rule):
        """Register ``rule`` under its ID; None is ignored."""
        if rule is not None:
            self.rules[rule.id_] = rule

    def to_xccdf(self):
        """We can easily extend this script to generate a valid XCCDF instead
        of SSG SHORTHAND.
        """
        raise NotImplementedError

    def __str__(self):
        return self.id_
389
390
391 2
class Group(object):
    """Represents XCCDF Group
    """
    # Attributes that children inherit from this group when their own
    # value is None.
    ATTRIBUTES_TO_PASS_ON = (
        "platform",
    )

    def __init__(self, id_):
        self.id_ = id_
        self.prodtype = "all"
        self.title = ""
        self.description = ""
        self.warnings = []
        self.values = {}
        self.groups = {}
        self.rules = {}
        self.platform = None

    @staticmethod
    def from_yaml(yaml_file, env_yaml=None):
        """
        Build a Group from a group YAML file.

        The group ID is the name of the directory containing the file.
        Returns None when the file expands to nothing.

        Raises ValueError for a warning entry that does not hold exactly
        one key/value pair, and RuntimeError when the file contains keys
        that are not understood.
        """
        yaml_contents = open_and_macro_expand(yaml_file, env_yaml)
        if yaml_contents is None:
            return None

        group_id = os.path.basename(os.path.dirname(yaml_file))
        group = Group(group_id)
        # Each consumed key is removed so that leftovers can be reported.
        group.prodtype = yaml_contents.pop("prodtype", "all")
        group.title = required_key(yaml_contents, "title")
        del yaml_contents["title"]
        group.description = required_key(yaml_contents, "description")
        del yaml_contents["description"]
        group.warnings = yaml_contents.pop("warnings", [])
        group.platform = yaml_contents.pop("platform", None)

        for warning_list in group.warnings:
            if len(warning_list) != 1:
                raise ValueError("Only one key/value pair should exist for each dictionary")

        if yaml_contents:
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                               % (yaml_file, yaml_contents))

        return group

    def to_xml_element(self):
        """
        Serialize the group, its values, subgroups and rules into a
        ``<Group>`` element.

        Raises ValueError when the group's platform has no known CPE.
        """
        group = ET.Element('Group')
        group.set('id', self.id_)
        if self.prodtype != "all":
            group.set("prodtype", self.prodtype)
        title = ET.SubElement(group, 'title')
        title.text = self.title
        add_sub_element(group, 'description', self.description)
        add_warning_elements(group, self.warnings)

        if self.platform:
            platform_el = ET.SubElement(group, "platform")
            try:
                platform_cpe = XCCDF_PLATFORM_TO_CPE[self.platform]
            except KeyError:
                # The object at fault is a group, so the message says so.
                raise ValueError("Unsupported platform '%s' in group '%s'." % (self.platform, self.id_))
            platform_el.set("idref", platform_cpe)

        for _value in self.values.values():
            group.append(_value.to_xml_element())
        for _group in self.groups.values():
            group.append(_group.to_xml_element())
        for _rule in self.rules.values():
            group.append(_rule.to_xml_element())

        return group

    def to_file(self, file_name):
        """Write the XML form of this group to ``file_name``."""
        root = self.to_xml_element()
        tree = ET.ElementTree(root)
        tree.write(file_name)

    def add_value(self, value):
        """Register ``value`` under its ID; None is ignored."""
        if value is None:
            return
        self.values[value.id_] = value

    def add_group(self, group):
        """
        Register ``group`` as a subgroup; None is ignored.

        A subgroup without a platform inherits this group's platform.
        """
        if group is None:
            return
        if self.platform and not group.platform:
            group.platform = self.platform
        self.groups[group.id_] = group
        self._pass_our_properties_on_to(group)

    def _pass_our_properties_on_to(self, obj):
        """
        Copy every attribute named in ATTRIBUTES_TO_PASS_ON to ``obj``
        when ``obj`` has that attribute and it is None.
        """
        for attr in self.ATTRIBUTES_TO_PASS_ON:
            if hasattr(obj, attr) and getattr(obj, attr) is None:
                setattr(obj, attr, getattr(self, attr))

    def add_rule(self, rule):
        """
        Register ``rule`` under its ID; None is ignored.

        A rule without a platform inherits this group's platform.
        """
        if rule is None:
            return
        if self.platform and not rule.platform:
            rule.platform = self.platform
        self.rules[rule.id_] = rule
        self._pass_our_properties_on_to(rule)

    def __str__(self):
        return self.id_
495
496
497 2
class Rule(object):
    """Represents XCCDF Rule
    """
    # Maps every recognized YAML key to a callable producing its default.
    # A default that is an exception instance marks the key as mandatory.
    YAML_KEYS_DEFAULTS = {
        "prodtype": lambda: "all",
        "title": lambda: RuntimeError("Missing key 'title'"),
        "description": lambda: RuntimeError("Missing key 'description'"),
        "rationale": lambda: RuntimeError("Missing key 'rationale'"),
        "severity": lambda: RuntimeError("Missing key 'severity'"),
        "references": lambda: dict(),
        "identifiers": lambda: dict(),
        "ocil_clause": lambda: None,
        "ocil": lambda: None,
        "oval_external_content": lambda: None,
        "warnings": lambda: list(),
        "platform": lambda: None,
    }

    def __init__(self, id_):
        self.id_ = id_
        self.prodtype = "all"
        self.title = ""
        self.description = ""
        self.rationale = ""
        self.severity = "unknown"
        self.references = {}
        self.identifiers = {}
        self.ocil_clause = None
        self.ocil = None
        self.oval_external_content = None
        self.warnings = []
        self.platform = None

    @staticmethod
    def from_yaml(yaml_file, env_yaml=None):
        """
        Build a Rule from a rule YAML file.

        The rule ID is the file's base name without its extension, except
        for a file named "rule.yml", whose ID comes from get_rule_dir_id().
        Returns None when the file expands to nothing.

        Raises RuntimeError for a missing mandatory key or for unparsed
        keys, and ValueError for malformed warnings, identifiers or
        references.
        """
        contents = open_and_macro_expand(yaml_file, env_yaml)
        if contents is None:
            return None

        rule_id, ext = os.path.splitext(os.path.basename(yaml_file))
        if (rule_id, ext) == ("rule", ".yml"):
            rule_id = get_rule_dir_id(yaml_file)

        rule = Rule(rule_id)

        try:
            rule._set_attributes_from_dict(contents)
        except RuntimeError as exc:
            # Re-raise with the offending file name attached.
            raise RuntimeError(
                "Error processing '{fname}': {err}"
                .format(fname=yaml_file, err=str(exc)))

        for warning_entry in rule.warnings:
            if len(warning_entry) != 1:
                raise ValueError("Only one key/value pair should exist for each dictionary")

        if contents:
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                               % (yaml_file, contents))

        rule.validate_identifiers(yaml_file)
        rule.validate_references(yaml_file)
        return rule

    def _set_attributes_from_dict(self, yaml_contents):
        """
        Move every recognized key out of ``yaml_contents`` onto this rule,
        using the default for absent keys and raising the default when it
        is an exception.
        """
        for key, default_getter in self.YAML_KEYS_DEFAULTS.items():
            if key in yaml_contents:
                value = yaml_contents.pop(key)
            else:
                value = default_getter()
                if isinstance(value, Exception):
                    raise value

            setattr(self, key, value)

    def to_contents_dict(self):
        """
        Returns a dictionary that is the same schema as the dict obtained when loading rule YAML.
        """
        return dict(
            (key, getattr(self, key)) for key in Rule.YAML_KEYS_DEFAULTS)

    def validate_identifiers(self, yaml_file):
        """
        Raise ValueError unless all identifiers are non-empty strings and
        every identifier whose type starts with "cce" is a valid CCE.
        """
        if self.identifiers is None:
            raise ValueError("Empty identifier section in file %s" % yaml_file)

        for ident_type, ident_val in self.identifiers.items():
            both_strings = (isinstance(ident_type, str)
                            and isinstance(ident_val, str))
            if not both_strings:
                raise ValueError("Identifiers and values must be strings: %s in file %s"
                                 % (ident_type, yaml_file))
            if ident_val.strip() == "":
                raise ValueError("Identifiers must not be empty: %s in file %s"
                                 % (ident_type, yaml_file))
            if ident_type[0:3] == 'cce' and not is_cce_valid("CCE-" + ident_val):
                raise ValueError("CCE Identifiers must be valid: value %s for cce %s"
                                 " in file %s" % (ident_val, ident_type, yaml_file))

    def validate_references(self, yaml_file):
        """Raise ValueError unless all references are non-empty strings."""
        if self.references is None:
            raise ValueError("Empty references section in file %s" % yaml_file)

        for ref_type, ref_val in self.references.items():
            both_strings = (isinstance(ref_type, str)
                            and isinstance(ref_val, str))
            if not both_strings:
                raise ValueError("References and values must be strings: %s in file %s"
                                 % (ref_type, yaml_file))
            if ref_val.strip() == "":
                raise ValueError("References must not be empty: %s in file %s"
                                 % (ref_type, yaml_file))

    @staticmethod
    def _append_policy_elements(rule, tag, mapping):
        """
        Add ``<tag>`` children to ``rule`` for a policy-to-value mapping.

        A key of the form 'policy@product' (eg. 'stigid@product') applies
        to one product only and gets its own element carrying a prodtype
        attribute. All remaining keys are collected as attributes of one
        shared element, which is appended last and only when non-empty.
        """
        shared = ET.Element(tag)
        for key, val in mapping.items():
            if '@' in key:
                policy, product = key.split('@')
                specific = ET.SubElement(rule, tag)
                specific.set(policy, val)
                specific.set('prodtype', product)
            else:
                shared.set(key, val)

        if shared.attrib:
            rule.append(shared)

    def to_xml_element(self):
        """
        Serialize the rule into a ``<Rule>`` element.

        Raises ValueError when the rule's platform has no known CPE.
        """
        rule = ET.Element('Rule')
        rule.set('id', self.id_)
        if self.prodtype != "all":
            rule.set("prodtype", self.prodtype)
        rule.set('severity', self.severity)
        for tag, text in (('title', self.title),
                          ('description', self.description),
                          ('rationale', self.rationale)):
            add_sub_element(rule, tag, text)

        self._append_policy_elements(rule, 'ident', self.identifiers)
        self._append_policy_elements(rule, 'ref', self.references)

        if self.oval_external_content:
            check = ET.SubElement(rule, 'check')
            check.set("system", "http://oval.mitre.org/XMLSchema/oval-definitions-5")
            content_ref = ET.SubElement(check, "check-content-ref")
            content_ref.set("href", self.oval_external_content)
        else:
            # TODO: This is pretty much a hack, oval ID will be the same as rule ID
            #       and we don't want the developers to have to keep them in sync.
            #       Therefore let's just add an OVAL ref of that ID.
            ET.SubElement(rule, "oval").set("id", self.id_)

        if self.ocil or self.ocil_clause:
            ocil_text = self.ocil if self.ocil else ""
            ocil = add_sub_element(rule, 'ocil', ocil_text)
            if self.ocil_clause:
                ocil.set("clause", self.ocil_clause)

        add_warning_elements(rule, self.warnings)

        if self.platform:
            platform_el = ET.SubElement(rule, "platform")
            try:
                platform_cpe = XCCDF_PLATFORM_TO_CPE[self.platform]
            except KeyError:
                raise ValueError("Unsupported platform '%s' in rule '%s'." % (self.platform, self.id_))
            platform_el.set("idref", platform_cpe)

        return rule

    def to_file(self, file_name):
        """Write the XML form of this rule to ``file_name``."""
        ET.ElementTree(self.to_xml_element()).write(file_name)
687
688
689 2
class DirectoryLoader(object):
    """
    Base class that walks a guide directory tree and loads what it finds.

    Subclasses set ``action`` and implement _get_new_loader(),
    _process_values() and _process_rules().
    """

    def __init__(self, profiles_dir, bash_remediation_fns, env_yaml):
        self.benchmark_file = None
        self.group_file = None
        self.loaded_group = None
        self.rules = []
        self.values = []
        self.subdirectories = []

        self.profiles_dir = profiles_dir
        self.bash_remediation_fns = bash_remediation_fns
        self.env_yaml = env_yaml

        self.parent_group = None

        # load_benchmark_or_group() reads self.action, so the attribute has
        # to exist on the base class too; subclasses overwrite it after
        # calling this constructor.
        self.action = None

    def _collect_items_to_load(self, guide_directory):
        """
        Sort the entries of ``guide_directory`` into values, rules, the
        benchmark file, the group file and subdirectories to recurse into.

        Entries named "tests" are ignored; other unrecognized files are
        reported on stderr.
        """
        for dir_item in os.listdir(guide_directory):
            dir_item_path = os.path.join(guide_directory, dir_item)
            _, extension = os.path.splitext(dir_item)

            if extension == '.var':
                self.values.append(dir_item_path)
            elif dir_item == "benchmark.yml":
                if self.benchmark_file:
                    raise ValueError("Multiple benchmarks in one directory")
                self.benchmark_file = dir_item_path
            elif dir_item == "group.yml":
                if self.group_file:
                    raise ValueError("Multiple groups in one directory")
                self.group_file = dir_item_path
            elif extension == '.rule':
                self.rules.append(dir_item_path)
            elif is_rule_dir(dir_item_path):
                self.rules.append(get_rule_dir_yaml(dir_item_path))
            elif dir_item != "tests":
                if os.path.isdir(dir_item_path):
                    self.subdirectories.append(dir_item_path)
                else:
                    sys.stderr.write(
                        "Encountered file '%s' while recursing, extension '%s' "
                        "is unknown. Skipping..\n"
                        % (dir_item, extension)
                    )

    def load_benchmark_or_group(self, guide_directory):
        """
        Loads a given benchmark or group from the specified benchmark_file or
        group_file, in the context of guide_directory, action, profiles_dir,
        env_yaml, and bash_remediation_fns.

        Returns the loaded group or benchmark, or None when the directory
        holds neither file.

        Raises ValueError when both a benchmark and a group file were found.
        """
        group = None
        if self.group_file and self.benchmark_file:
            raise ValueError("A .benchmark file and a .group file were found in "
                             "the same directory '%s'" % (guide_directory))

        # we treat benchmark as a special form of group in the following code
        if self.benchmark_file:
            group = Benchmark.from_yaml(
                self.benchmark_file, 'product-name', self.env_yaml
            )
            if self.profiles_dir:
                group.add_profiles_from_dir(self.action, self.profiles_dir, self.env_yaml)
            group.add_bash_remediation_fns_from_file(self.action, self.bash_remediation_fns)

        if self.group_file:
            group = Group.from_yaml(self.group_file, self.env_yaml)

        return group

    def _load_group_process_and_recurse(self, guide_directory):
        """
        Load this directory's group, attach it to the parent group, then
        process values, subdirectories and rules, in that order.

        Nothing below the directory is processed when no group was loaded.
        """
        self.loaded_group = self.load_benchmark_or_group(guide_directory)

        if self.loaded_group:
            if self.parent_group:
                self.parent_group.add_group(self.loaded_group)

            self._process_values()
            self._recurse_into_subdirs()
            self._process_rules()

    def process_directory_tree(self, start_dir):
        """Collect and load everything under ``start_dir``."""
        self._collect_items_to_load(start_dir)
        self._load_group_process_and_recurse(start_dir)

    def _recurse_into_subdirs(self):
        """Process each subdirectory with a fresh loader parented here."""
        for subdir in self.subdirectories:
            loader = self._get_new_loader()
            loader.parent_group = self.loaded_group
            loader.process_directory_tree(subdir)

    def _get_new_loader(self):
        raise NotImplementedError()

    def _process_values(self):
        raise NotImplementedError()

    def _process_rules(self):
        raise NotImplementedError()
789
790
791 2
class BuildLoader(DirectoryLoader):
    """
    Loader that builds the group tree and can optionally dump every loaded
    rule as YAML into ``resolved_rules_dir``.
    """

    def __init__(self, profiles_dir, bash_remediation_fns, env_yaml, resolved_rules_dir=None):
        super(BuildLoader, self).__init__(profiles_dir, bash_remediation_fns, env_yaml)

        self.action = "build"

        self.resolved_rules_dir = resolved_rules_dir
        if resolved_rules_dir and not os.path.isdir(resolved_rules_dir):
            os.mkdir(resolved_rules_dir)

    def _process_values(self):
        """Load every collected value file into the loaded group."""
        for value_yaml in self.values:
            value = Value.from_yaml(value_yaml, self.env_yaml)
            self.loaded_group.add_value(value)

    def _process_rules(self):
        """
        Load every collected rule file into the loaded group and, when
        ``resolved_rules_dir`` is set, write the rule out as ``<id>.yml``.
        """
        for rule_yaml in self.rules:
            rule = Rule.from_yaml(rule_yaml, self.env_yaml)
            if rule is None:
                # Rule.from_yaml() returns None for a file that expands to
                # nothing; there is nothing to register or to dump.
                continue
            self.loaded_group.add_rule(rule)
            if self.resolved_rules_dir:
                output_for_rule = os.path.join(
                    self.resolved_rules_dir, "{id_}.yml".format(id_=rule.id_))
                with open(output_for_rule, "w") as f:
                    yaml.dump(rule.to_contents_dict(), f)

    def _get_new_loader(self):
        """Return a loader configured like this one for a subdirectory."""
        return BuildLoader(
            self.profiles_dir, self.bash_remediation_fns, self.env_yaml, self.resolved_rules_dir)

    def export_group_to_file(self, filename):
        """Write the loaded group to ``filename``."""
        return self.loaded_group.to_file(filename)
822
823
824 2
class ListInputsLoader(DirectoryLoader):
    """
    Loader that prints the path of every input file it encounters instead
    of loading values and rules.
    """

    def __init__(self, profiles_dir, bash_remediation_fns, env_yaml):
        super(ListInputsLoader, self).__init__(profiles_dir, bash_remediation_fns, env_yaml)

        self.action = "list-inputs"

    def _process_values(self):
        """Print the path of every collected value file."""
        for path in self.values:
            print(path)

    def _process_rules(self):
        """Print the path of every collected rule file."""
        for path in self.rules:
            print(path)

    def _get_new_loader(self):
        """Return a loader configured like this one for a subdirectory."""
        return ListInputsLoader(
            self.profiles_dir, self.bash_remediation_fns, self.env_yaml)

    def load_benchmark_or_group(self, guide_directory):
        """
        Load the benchmark or group as the base class does, then print the
        benchmark file path and the group file path when they are set.
        """
        loaded = super(ListInputsLoader, self).load_benchmark_or_group(guide_directory)

        for input_path in (self.benchmark_file, self.group_file):
            if input_path:
                print(input_path)

        return loaded
852