|
1
|
|
|
#!/usr/bin/python3 |
|
2
|
|
|
|
|
3
|
|
|
import os |
|
4
|
|
|
import argparse |
|
5
|
|
|
import xml.etree.ElementTree as ET |
|
6
|
|
|
import sys |
|
7
|
|
|
import ssg.constants |
|
8
|
|
|
import ssg.yaml |
|
9
|
|
|
import io |
|
10
|
|
|
|
|
11
|
|
|
# idref value that check_ds() looks for on an element's <platform> children
# to decide that the element is marked as applicable to machines.
machine_cpe = "cpe:/a:machine"
|
12
|
|
|
|
|
13
|
|
|
|
|
14
|
|
|
def main():
    """Run the machine-platform check for every known product.

    For each entry of ``ssg.constants.product_directories`` the product's
    ``product.yml`` is read, the benchmark root and any additional content
    directories are resolved to absolute paths, and ``check_product`` is
    called on them. The process exits with status 1 as soon as one product
    fails the check.
    """
    args = parse_command_line_args()
    for product in ssg.constants.product_directories:
        product_root = os.path.join(args.source_dir, product)
        product_cfg = ssg.yaml.open_raw(
            os.path.join(product_root, "product.yml"))

        # Benchmark root first, then the optional extra content directories;
        # all of them are given relative to the product directory.
        relative_dirs = [product_cfg['benchmark_root']]
        relative_dirs.extend(
            product_cfg.get("additional_content_directories", []))
        rules_dirs = [
            os.path.abspath(os.path.join(product_root, rel))
            for rel in relative_dirs
        ]

        if check_product(args.build_dir, product, rules_dirs):
            continue
        sys.exit(1)
|
26
|
|
|
|
|
27
|
|
|
|
|
28
|
|
|
def check_product(build_dir, product, rules_dirs):
    """Check the built datastream of a single product.

    Scans ``rules_dirs`` for machine-only groups and rules, then verifies
    the groups against ``<build_dir>/ssg-<product>-ds.xml``.

    Returns True when ``check_ds`` accepts the datastream, False otherwise.
    """
    # The scan yields both groups and rules, but only the groups are
    # verified against the datastream here.
    machine_groups, _machine_rules = scan_rules_groups(rules_dirs, False)
    datastream_path = os.path.join(
        build_dir, "".join(("ssg-", product, "-ds.xml")))
    if check_ds(datastream_path, "groups", machine_groups):
        return True
    return False
|
34
|
|
|
|
|
35
|
|
|
|
|
36
|
|
|
def check_ds(ds_path, what, input_elems):
    """Verify that selected datastream elements carry the machine platform.

    Args:
        ds_path: Path of the built datastream XML file.
        what: Either "groups" or "rules"; selects which XCCDF 1.2 elements
            are inspected and which ID prefix is stripped to obtain the
            short ID.
        input_elems: Container of short IDs that must have the platform.

    Returns:
        True when every inspected element whose short ID is in
        ``input_elems`` has a ``<platform>`` child whose ``idref`` equals
        the module-level ``machine_cpe``, or when the datastream file
        cannot be opened (the check is skipped). False on the first element
        lacking that platform, or when the datastream contains no
        XCCDF 1.2 Benchmark.

    Raises:
        ValueError: If ``what`` is neither "groups" nor "rules".
    """
    try:
        tree = ET.parse(ds_path)
    except IOError:
        # A datastream that was not built is not an error for this test.
        sys.stderr.write("The product datastream '%s' hasn't been built, "
                         "skipping the test.\n" % (ds_path))
        return True

    # Short-ID prefix and XCCDF tag name for each supported element kind.
    kinds = {
        "groups": ("xccdf_org.ssgproject.content_group_", "Group"),
        "rules": ("xccdf_org.ssgproject.content_rule_", "Rule"),
    }
    if what not in kinds:
        raise ValueError(
            "what must be 'groups' or 'rules', got %r" % (what,))
    id_prefix, tag_name = kinds[what]

    namespace = ssg.constants.XCCDF12_NS
    benchmark = tree.getroot().find(".//{%s}Benchmark" % namespace)
    if benchmark is None:
        sys.stderr.write(
            "No XCCDF 1.2 Benchmark found in %s\n" % (ds_path))
        return False

    platform_tag = "{%s}platform" % namespace
    for elem in benchmark.findall(".//{%s}%s" % (namespace, tag_name)):
        elem_short_id = elem.get("id", "")
        # Strip the prefix only where it is a real prefix, not anywhere
        # inside the ID.
        if elem_short_id.startswith(id_prefix):
            elem_short_id = elem_short_id[len(id_prefix):]
        if elem_short_id not in input_elems:
            continue
        has_machine_platform = any(
            platform.get("idref") == machine_cpe
            for platform in elem.findall(platform_tag))
        if not has_machine_platform:
            sys.stderr.write("%s %s in %s is missing <platform> element\n" %
                             (what, elem_short_id, ds_path))
            return False
    return True
|
67
|
|
|
|
|
68
|
|
|
|
|
69
|
|
|
def parse_command_line_args():
    """Parse the script's command line.

    Both ``--source_dir`` and ``--build_dir`` are required options.

    Returns the ``argparse`` namespace with ``source_dir`` and ``build_dir``.
    """
    parser = argparse.ArgumentParser(
        description="Tests if 'machine' CPEs are "
        "propagated to the built datastream")
    required_options = (
        ("--source_dir", "Content source directory path"),
        ("--build_dir", "Build directory containing built datastreams"),
    )
    for flag, help_text in required_options:
        parser.add_argument(flag, required=True, help=help_text)
    return parser.parse_args()
|
79
|
|
|
|
|
80
|
|
|
|
|
81
|
|
|
def check_if_machine_only(dirpath, name, is_machine_only_group):
    """Tell whether the YAML file ``name`` in ``dirpath`` is machine-only.

    Returns False when ``dirpath`` has no entry called ``name``. Otherwise
    returns True if ``is_machine_only_group`` is set (the file is then not
    read), or if the file's text contains ``platform: machine``.
    """
    if name not in os.listdir(dirpath):
        return False
    if is_machine_only_group:
        # The enclosing group is already machine-only; no need to read.
        return True
    with io.open(os.path.join(dirpath, name), "r", encoding="utf-8") as handle:
        return "platform: machine" in handle.read()
|
91
|
|
|
|
|
92
|
|
|
|
|
93
|
|
|
def scan_rules_groups(dir_paths, parent_machine_only):
    """Scan several content directories for machine-only groups and rules.

    Each directory in ``dir_paths`` is passed to ``scan_rules_group`` with
    the same ``parent_machine_only`` flag, accumulating into shared sets.

    Returns a ``(groups, rules)`` tuple of sets of names.
    """
    found_groups, found_rules = set(), set()
    for content_root in dir_paths:
        found_groups, found_rules = scan_rules_group(
            content_root, parent_machine_only, found_groups, found_rules)
    return found_groups, found_rules
|
99
|
|
|
|
|
100
|
|
|
|
|
101
|
|
|
def scan_rules_group(dir_path, parent_machine_only, groups, rules):
    """Recursively collect machine-only group and rule names under a directory.

    The base name of ``dir_path`` is added to ``groups`` when its
    ``group.yml`` is machine-only and to ``rules`` when its ``rule.yml`` is
    machine-only, as decided by ``check_if_machine_only`` with
    ``parent_machine_only``. Subdirectories are scanned with the flag set
    only if this directory's ``group.yml`` was machine-only.

    ``groups`` and ``rules`` are updated in place and also returned as a
    ``(groups, rules)`` tuple.
    """
    short_name = os.path.basename(dir_path)

    group_is_machine_only = False
    if check_if_machine_only(dir_path, "group.yml", parent_machine_only):
        group_is_machine_only = True
        groups.add(short_name)
    if check_if_machine_only(dir_path, "rule.yml", parent_machine_only):
        rules.add(short_name)

    for entry in os.listdir(dir_path):
        child_path = os.path.join(dir_path, entry)
        if not os.path.isdir(child_path):
            continue
        child_groups, child_rules = scan_rules_group(
            child_path, group_is_machine_only, groups, rules)
        groups |= child_groups
        rules |= child_rules
    return groups, rules
|
117
|
|
|
|
|
118
|
|
|
|
|
119
|
|
|
# Run the check only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
121
|
|
|
|