Test Failed
Push — master ( 738e3d...d883b3 )
by Matěj
01:17 queued 13s
created

test_machine_only_rules.main()   A

Complexity

Conditions 3

Size

Total Lines 12
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 12
nop 0
dl 0
loc 12
rs 9.8
c 0
b 0
f 0
1
#!/usr/bin/python3
2
3
import os
4
import argparse
5
import xml.etree.ElementTree as ET
6
import sys
7
import ssg.constants
8
import ssg.yaml
9
import io
10
11
# CPE identifier that check_ds() compares against the "idref" attribute of
# <platform> elements in the built datastream.
machine_cpe = "cpe:/a:machine"
12
13
14
def main():
    """Check the built datastream of every product; exit with status 1 on failure.

    For each product in ``ssg.constants.product_directories`` the product's
    ``product.yml`` is read to find the benchmark root and any additional
    content directories (both resolved relative to the product directory).
    Those directories are handed to ``check_product``; the first product
    that fails the check terminates the script via ``sys.exit(1)``.
    """
    args = parse_command_line_args()
    for product in ssg.constants.product_directories:
        product_dir = os.path.join(args.source_dir, product)
        product_yaml = ssg.yaml.open_raw(
            os.path.join(product_dir, "product.yml"))

        # The benchmark root always comes first, followed by the optional
        # additional content directories listed in product.yml.
        rules_dirs = [
            os.path.abspath(
                os.path.join(product_dir, product_yaml['benchmark_root']))
        ]
        rules_dirs.extend(
            os.path.abspath(os.path.join(product_dir, extra_dir))
            for extra_dir in product_yaml.get(
                "additional_content_directories", []))

        if not check_product(args.build_dir, product, rules_dirs):
            sys.exit(1)
26
27
28
def check_product(build_dir, product, rules_dirs):
    """Check one product's datastream for machine-only groups.

    Args:
        build_dir: Directory containing the built datastreams.
        product: Product name; the datastream is expected at
            ``<build_dir>/ssg-<product>-ds.xml``.
        rules_dirs: Source directories scanned for machine-only groups.

    Returns:
        The result of ``check_ds`` for the "groups" found in ``rules_dirs``.
    """
    # Only the groups are verified here; the scanned rules are not used.
    input_groups, _ = scan_rules_groups(rules_dirs, False)
    ds_path = os.path.join(build_dir, "ssg-" + product + "-ds.xml")
    return check_ds(ds_path, "groups", input_groups)
34
35
36
def check_ds(ds_path, what, input_elems):
    """Verify that the given elements carry the machine platform in a datastream.

    Args:
        ds_path: Path to the built datastream XML file.
        what: Kind of element to check, either "groups" or "rules".
        input_elems: Collection of short element IDs (the element ID with
            the ``xccdf_org.ssgproject.content_group_`` or
            ``xccdf_org.ssgproject.content_rule_`` prefix removed) that are
            expected to have the machine platform.

    Returns:
        True if every element of the benchmark whose short ID is in
        ``input_elems`` has a ``<platform>`` child with ``idref`` equal to
        ``machine_cpe``, or if the datastream file cannot be opened (the
        check is skipped). False if such an element lacks the platform or
        if the datastream contains no XCCDF Benchmark.

    Raises:
        ValueError: If ``what`` is neither "groups" nor "rules".
    """
    # Map each supported kind to (ID prefix to strip, XCCDF element name).
    elem_kinds = {
        "groups": ("xccdf_org.ssgproject.content_group_", "Group"),
        "rules": ("xccdf_org.ssgproject.content_rule_", "Rule"),
    }
    if what not in elem_kinds:
        # Previously an unknown kind left the prefix and XPath query unbound
        # and failed later with an UnboundLocalError.
        raise ValueError(
            "what must be 'groups' or 'rules', got %r" % (what,))
    replacement, tag_name = elem_kinds[what]

    try:
        tree = ET.parse(ds_path)
    except IOError:
        # A datastream that was not built is not a failure of this test.
        sys.stderr.write("The product datastream '%s' hasn't been build, "
                         "skipping the test.\n" % (ds_path))
        return True

    xccdf_ns = ssg.constants.XCCDF12_NS
    benchmark = tree.getroot().find(".//{%s}Benchmark" % xccdf_ns)
    if benchmark is None:
        # find() returns None when nothing matches; report it instead of
        # crashing on benchmark.findall().
        sys.stderr.write("No XCCDF Benchmark found in %s\n" % (ds_path))
        return False

    platform_tag = "{%s}platform" % xccdf_ns
    for elem in benchmark.findall(".//{%s}%s" % (xccdf_ns, tag_name)):
        elem_short_id = elem.get("id").replace(replacement, "")
        if elem_short_id not in input_elems:
            continue
        has_machine_platform = any(
            platform.get("idref") == machine_cpe
            for platform in elem.findall(platform_tag))
        if not has_machine_platform:
            sys.stderr.write("%s %s in %s is missing <platform> element\n" %
                             (what, elem_short_id, ds_path))
            return False
    return True
67
68
69
def parse_command_line_args():
    """Parse the script's command line.

    Returns:
        The ``argparse`` namespace with ``source_dir`` and ``build_dir``
        attributes; both options are required.
    """
    parser = argparse.ArgumentParser(
        description="Tests if 'machine' CPEs are "
                    "propagated to the built datastream")
    required_options = (
        ("--source_dir", "Content source directory path"),
        ("--build_dir", "Build directory containing built datastreams"),
    )
    for flag, help_text in required_options:
        parser.add_argument(flag, required=True, help=help_text)
    return parser.parse_args()
79
80
81
def check_if_machine_only(dirpath, name, is_machine_only_group):
    """Tell whether the file ``name`` in ``dirpath`` marks a machine-only item.

    Args:
        dirpath: Directory to look into.
        name: File name to look for (e.g. "group.yml" or "rule.yml").
        is_machine_only_group: Whether the enclosing group is already
            machine-only; if so, the mere presence of the file is enough.

    Returns:
        False if ``dirpath`` has no entry called ``name``. Otherwise True if
        ``is_machine_only_group`` is true or the file's text contains
        "platform: machine", and False if neither holds.
    """
    if name not in os.listdir(dirpath):
        return False
    if is_machine_only_group:
        # Inherited from the parent group; no need to read the file.
        return True
    yml_path = os.path.join(dirpath, name)
    with io.open(yml_path, "r", encoding="utf-8") as yml_file:
        return "platform: machine" in yml_file.read()
91
92
93
def scan_rules_groups(dir_paths, parent_machine_only):
    """Collect machine-only group and rule names from several directory trees.

    Args:
        dir_paths: Iterable of root directories to scan.
        parent_machine_only: Whether the roots are to be treated as lying
            inside a machine-only group.

    Returns:
        A ``(groups, rules)`` tuple of sets with the names gathered by
        ``scan_rules_group`` over all of ``dir_paths``.
    """
    groups = set()
    rules = set()
    for dir_path in dir_paths:
        groups, rules = scan_rules_group(
            dir_path, parent_machine_only, groups, rules)
    return groups, rules
99
100
101
def scan_rules_group(dir_path, parent_machine_only, groups, rules):
    """Recursively record machine-only groups and rules under ``dir_path``.

    The directory's base name is added to ``groups`` when its "group.yml"
    is machine-only and to ``rules`` when its "rule.yml" is machine-only,
    as decided by ``check_if_machine_only``. Subdirectories are scanned
    recursively; they inherit the machine-only flag only when this
    directory itself was recorded as a machine-only group.

    Args:
        dir_path: Directory to scan.
        parent_machine_only: Whether the parent directory is a machine-only
            group.
        groups: Set of group names; updated in place.
        rules: Set of rule names; updated in place.

    Returns:
        The same ``(groups, rules)`` sets that were passed in.
    """
    name = os.path.basename(dir_path)
    is_machine_only = check_if_machine_only(
        dir_path, "group.yml", parent_machine_only)
    if is_machine_only:
        groups.add(name)
    if check_if_machine_only(dir_path, "rule.yml", parent_machine_only):
        rules.add(name)

    for dir_item in os.listdir(dir_path):
        subdir_path = os.path.join(dir_path, dir_item)
        if os.path.isdir(subdir_path):
            # The recursive call adds straight into the shared sets, so its
            # return value does not need to be merged back.
            scan_rules_group(subdir_path, is_machine_only, groups, rules)
    return groups, rules
117
118
119
# Run the check only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
121