|
1
|
|
|
#!/usr/bin/env python2 |
|
2
|
|
|
import xml.etree.cElementTree as ET |
|
3
|
|
|
|
|
4
|
|
|
import logging |
|
5
|
|
|
import contextlib |
|
6
|
|
|
import re |
|
7
|
|
|
import subprocess |
|
8
|
|
|
|
|
9
|
|
|
from ssg.constants import OSCAP_RULE |
|
10
|
|
|
from ssg.constants import PREFIX_TO_NS |
|
11
|
|
|
from ssg.constants import bash_system as bash_rem_system |
|
12
|
|
|
from ssg.constants import ansible_system as ansible_rem_system |
|
13
|
|
|
from ssg.constants import puppet_system as puppet_rem_system |
|
14
|
|
|
from ssg.constants import anaconda_system as anaconda_rem_system |
|
15
|
|
|
from ssg.constants import ignition_system as ignition_rem_system |
|
16
|
|
|
|
|
17
|
|
|
# Maps a remediation type name (the fix_type argument of
# find_fix_in_benchmark) to the value that is matched against the
# "system" attribute of an xccdf-1.2:fix element.
SYSTEM_ATTRIBUTE = {
    'bash': bash_rem_system,
    'ansible': ansible_rem_system,
    'puppet': puppet_rem_system,
    'anaconda': anaconda_rem_system,
    'ignition': ignition_rem_system,
}
|
24
|
|
|
|
|
25
|
|
|
|
|
26
|
|
|
# Attach a no-op handler to the logger named after this module.
# NOTE(review): the functions in this file log through the module-level
# logging.error()/logging.warn() calls (root logger) or through a logger
# object passed in by the caller, not through this named logger.
logging.getLogger(__name__).addHandler(logging.NullHandler())
|
27
|
|
|
|
|
28
|
|
|
|
|
29
|
|
|
def get_all_xccdf_ids_in_datastream(datastream):
    """
    Return the ids of all component references listed under ds:checklists.

    datastream is passed straight to ET.parse.

    If the datastream contains no ds:checklists element, an error is logged
    and an empty list is returned.
    """
    root = ET.parse(datastream).getroot()

    checklists_node = root.find(".//ds:checklists", PREFIX_TO_NS)
    if checklists_node is None:
        logging.error(
            "Checklists not found within DataStream")
        # Without this return the code below would call findall() on None
        # and die with an AttributeError right after logging the real cause.
        return []

    all_checklist_components = checklists_node.findall('ds:component-ref',
                                                       PREFIX_TO_NS)
    return [component.get("id") for component in all_checklist_components]
|
41
|
|
|
|
|
42
|
|
|
|
|
43
|
|
|
def infer_benchmark_id_from_component_ref_id(datastream, ref_id):
    """
    Resolve a component-ref id to the id of the Benchmark it refers to.

    The ds:component-ref with id ref_id is looked up, its xlink:href is
    used to locate the ds:component of that id, and the id attribute of the
    xccdf-1.2:Benchmark directly inside that component is returned.

    Raises RuntimeError when the component reference or the benchmark
    cannot be found.
    """
    ds_root = ET.parse(datastream).getroot()

    ref_query = "*//ds:component-ref[@id='{0}']".format(ref_id)
    ref_node = ds_root.find(ref_query, PREFIX_TO_NS)
    if ref_node is None:
        raise RuntimeError(
            'Component reference of Ref-Id {} not found within datastream'
            .format(ref_id))

    href_attribute = '{%s}href' % PREFIX_TO_NS['xlink']
    # Strip any leading '#' so the value can be matched against component ids.
    comp_id = ref_node.get(href_attribute).lstrip('#')

    benchmark_query = (
        ".//ds:component[@id='{}']/xccdf-1.2:Benchmark".format(comp_id))
    benchmark = ds_root.find(benchmark_query, PREFIX_TO_NS)
    if benchmark is None:
        raise RuntimeError(
            'Benchmark not found within component of Id {}'
            .format(comp_id))

    return benchmark.get('id')
|
66
|
|
|
|
|
67
|
|
|
|
|
68
|
|
|
@contextlib.contextmanager
def datastream_root(ds_location, save_location=None):
    """
    Context manager yielding the root element of a parsed datastream.

    ds_location is passed to ET.parse. All prefixes from PREFIX_TO_NS are
    registered with ElementTree so they are used when the tree is written.

    When save_location is given, the tree (including any changes made to
    it inside the with block) is written there on exit, even if the with
    block raised.

    Errors from parsing ds_location propagate unchanged.
    """
    # Parse before entering the try block: if parsing fails there is no
    # tree to save, and the finally clause must not touch an unbound name
    # (which used to replace the real error with an UnboundLocalError).
    tree = ET.parse(ds_location)
    for prefix, uri in PREFIX_TO_NS.items():
        ET.register_namespace(prefix, uri)
    try:
        yield tree.getroot()
    finally:
        if save_location:
            tree.write(save_location)
|
79
|
|
|
|
|
80
|
|
|
|
|
81
|
|
|
def remove_machine_platform(root):
    """
    Remove the "cpe:/a:machine" platform from all Rules and Groups.

    The tree under root is modified in place.
    """
    for element_spec in ("xccdf-1.2:Rule", "xccdf-1.2:Group"):
        remove_machine_only_from_element(root, element_spec)
|
84
|
|
|
|
|
85
|
|
|
|
|
86
|
|
|
def remove_machine_only_from_element(root, element_spec):
    """
    Remove "cpe:/a:machine" platform children from the selected elements.

    element_spec is a prefixed element name (for example "xccdf-1.2:Rule");
    every such element below a Benchmark inside a ds:component is visited
    and its direct xccdf-1.2:platform children whose idref equals
    "cpe:/a:machine" are removed in place.
    """
    element_query = ".//ds:component/xccdf-1.2:Benchmark//{0}".format(element_spec)
    for element in root.findall(element_query, PREFIX_TO_NS):
        # Collect first, then remove, so the children are not mutated
        # while they are being examined.
        machine_platforms = [
            platform
            for platform in element.findall("./xccdf-1.2:platform", PREFIX_TO_NS)
            if platform.get("idref") == "cpe:/a:machine"
        ]
        for platform in machine_platforms:
            element.remove(platform)
|
94
|
|
|
|
|
95
|
|
|
|
|
96
|
|
|
def get_oscap_supported_cpes():
    """
    Obtain a list of CPEs that the scanner supports.

    Runs "oscap --version" with a 3 second timeout and collects every
    "cpe:..." token that ends a line of its standard output.

    This is best-effort: if the scanner cannot be started or times out, a
    warning is logged and an empty list is returned. If it exits with a
    non-zero status, a warning with the first line of its standard error is
    logged and whatever CPEs its output contains are still returned.
    """
    try:
        # subprocess.run() kills and reaps the child itself on timeout, so
        # no process is left behind.
        completed = subprocess.run(
            ("oscap", "--version"), text=True, timeout=3,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except subprocess.TimeoutExpired:
        logging.warning("Scanner timeouted when asked about supported CPEs")
        return []
    except OSError as exc:
        # E.g. the oscap binary is not installed or not executable.
        logging.warning("Error getting CPEs from the scanner: {msg}".format(msg=exc))
        return []

    if completed.returncode != 0:
        first_error_line = completed.stderr.split("\n")[0]
        logging.warning("Error getting CPEs from the scanner: {msg}".format(msg=first_error_line))

    cpe_regex = re.compile(r'\bcpe:\S+$')
    matches = (cpe_regex.search(line) for line in completed.stdout.split("\n"))
    return [match.group(0) for match in matches if match]
|
121
|
|
|
|
|
122
|
|
|
|
|
123
|
|
|
def add_platform_to_benchmark(root, cpe_regex):
    """
    Add extra platform elements to every benchmark in the datastream.

    The CPEs reported by get_oscap_supported_cpes() are filtered with
    cpe_regex (re.search semantics). Each match is inserted as an
    xccdf-1.2:platform element right before the benchmark's first existing
    platform element. If no reported CPE matches, cpe_regex itself is used
    as the single CPE to add.

    Benchmarks without any platform element are skipped with a warning.
    The tree under root is modified in place.

    Raises RuntimeError if the datastream contains no benchmark.
    """
    benchmark_query = ".//ds:component/xccdf-1.2:Benchmark"
    benchmarks = root.findall(benchmark_query, PREFIX_TO_NS)
    if not benchmarks:
        raise RuntimeError("No benchmarks found in the datastream")

    all_cpes = get_oscap_supported_cpes()
    regex = re.compile(cpe_regex)

    cpes_to_add = [cpe_str for cpe_str in all_cpes if regex.search(cpe_str)]
    if not cpes_to_add:
        cpes_to_add = [cpe_regex]

    # Use the namespace-qualified tag so the new elements really live in the
    # XCCDF 1.2 namespace; a literal "xccdf-1.2:platform" tag would not be
    # found by later namespace-aware queries on this tree.
    platform_tag = "{%s}platform" % PREFIX_TO_NS["xccdf-1.2"]

    for benchmark in benchmarks:
        existing_platform_element = benchmark.find("xccdf-1.2:platform", PREFIX_TO_NS)
        if existing_platform_element is None:
            logging.warning(
                "Couldn't find platform element in a benchmark, "
                "not adding any additional platforms as a result.")
            continue
        platform_index = list(benchmark).index(existing_platform_element)
        for cpe_str in cpes_to_add:
            benchmark.insert(platform_index, ET.Element(platform_tag, idref=cpe_str))
|
154
|
|
|
|
|
155
|
|
|
|
|
156
|
|
|
def _get_benchmark_node(datastream, benchmark_id, logging):
    """
    Parse the datastream and return the Benchmark element with the given id.

    Returns None when no such benchmark exists. In that case an error is
    reported through the logging argument, unless it is None. Note that
    this parameter shadows the logging module inside the function.
    """
    ds_root = ET.parse(datastream).getroot()
    query = "*//xccdf-1.2:Benchmark[@id='{0}']".format(benchmark_id)
    found = ds_root.find(query, PREFIX_TO_NS)
    if found is None and logging is not None:
        logging.error(
            "Benchmark ID '{}' not found within DataStream"
            .format(benchmark_id))
    return found
|
166
|
|
|
|
|
167
|
|
|
|
|
168
|
|
|
def get_all_profiles_in_benchmark(datastream, benchmark_id, logging=None):
    """
    Return all Profile elements that are direct children of the benchmark.

    logging is an optional logger-like object used to report a missing
    benchmark. If the benchmark is not found, an empty list is returned.
    """
    benchmark_node = _get_benchmark_node(datastream, benchmark_id, logging)
    if benchmark_node is None:
        # _get_benchmark_node has already reported the problem (when a
        # logger was supplied); avoid calling findall() on None.
        return []
    return benchmark_node.findall('xccdf-1.2:Profile', PREFIX_TO_NS)
|
172
|
|
|
|
|
173
|
|
|
|
|
174
|
|
|
def get_all_rule_selections_in_profile(datastream, benchmark_id, profile_id, logging=None):
    """
    Return the select elements with selected="true" of the given profile.

    logging is an optional logger-like object used to report a missing
    benchmark or profile. If either is not found, an empty list is returned.
    """
    benchmark_node = _get_benchmark_node(datastream, benchmark_id, logging)
    if benchmark_node is None:
        # Already reported by _get_benchmark_node when a logger was supplied.
        return []

    profile = benchmark_node.find("xccdf-1.2:Profile[@id='{0}']".format(profile_id), PREFIX_TO_NS)
    if profile is None:
        if logging is not None:
            logging.error(
                "Profile ID '{}' not found within benchmark '{}'"
                .format(profile_id, benchmark_id))
        return []

    return profile.findall("xccdf-1.2:select[@selected='true']", PREFIX_TO_NS)
|
179
|
|
|
|
|
180
|
|
|
|
|
181
|
|
|
def get_all_rule_ids_in_profile(datastream, benchmark_id, profile_id, logging=None):
    """
    Return the ids of the rules selected by a profile, without the XCCDF prefix.

    The idref of every selection returned by
    get_all_rule_selections_in_profile() has its first len(OSCAP_RULE)
    characters cut off.

    logging is an optional logger-like object that is forwarded to the
    lookup so that a missing benchmark gets reported.
    """
    # Forward the caller's logger; it used to be replaced by None here,
    # which silenced the "benchmark not found" error.
    rule_selections = get_all_rule_selections_in_profile(datastream, benchmark_id,
                                                         profile_id, logging=logging)
    rule_ids = [select.get("idref") for select in rule_selections]

    # Strip xccdf 1.2 prefixes from rule ids
    # Necessary to search for the rules within test scenarios tree
    prefix_len = len(OSCAP_RULE)
    return [rule[prefix_len:] for rule in rule_ids]
|
190
|
|
|
|
|
191
|
|
|
|
|
192
|
|
|
def benchmark_get_applicable_platforms(datastream, benchmark_id, logging=None):
    """
    Returns a set of CPEs the given benchmark is applicable to.

    The CPEs are the idref values of the platform elements that are direct
    children of the benchmark. logging is an optional logger-like object
    used to report a missing benchmark; in that case an empty set is
    returned.
    """
    benchmark_node = _get_benchmark_node(datastream, benchmark_id, logging)
    if benchmark_node is None:
        # Already reported by _get_benchmark_node when a logger was supplied.
        return set()
    platform_elements = benchmark_node.findall('xccdf-1.2:platform', PREFIX_TO_NS)
    return {platform_el.get("idref") for platform_el in platform_elements}
|
200
|
|
|
|
|
201
|
|
|
|
|
202
|
|
|
def find_rule_in_benchmark(datastream, benchmark_id, rule_id, logging=None):
    """
    Returns rule node from the given benchmark.

    The rule is searched at any depth below the benchmark. None is returned
    when either the benchmark or the rule is not found. logging is an
    optional logger-like object used to report a missing benchmark.
    """
    benchmark_node = _get_benchmark_node(datastream, benchmark_id, logging)
    if benchmark_node is None:
        # Same "not found" result as for a missing rule, instead of an
        # AttributeError from calling find() on None.
        return None
    return benchmark_node.find(".//xccdf-1.2:Rule[@id='{0}']".format(rule_id), PREFIX_TO_NS)
|
209
|
|
|
|
|
210
|
|
|
|
|
211
|
|
|
def find_fix_in_benchmark(datastream, benchmark_id, rule_id, fix_type='bash', logging=None):
    """
    Return the fix element of the given type for a rule, or None.

    None is returned when the rule is not found or when it has no fix
    whose system attribute matches fix_type.
    """
    rule = find_rule_in_benchmark(datastream, benchmark_id, rule_id, logging)
    if rule is not None:
        # Unknown fix types fall back to the bash remediation system.
        system_attribute = SYSTEM_ATTRIBUTE.get(fix_type, bash_rem_system)
        fix_query = "xccdf-1.2:fix[@system='{0}']".format(system_attribute)
        return rule.find(fix_query, PREFIX_TO_NS)
    return None
|
223
|
|
|
|