1
|
|
|
#!/usr/bin/env python2 |
2
|
|
|
import xml.etree.cElementTree as ET |
3
|
|
|
|
4
|
|
|
import logging |
5
|
|
|
import contextlib |
6
|
|
|
import re |
7
|
|
|
import subprocess |
8
|
|
|
|
9
|
|
|
from ssg.constants import OSCAP_RULE |
10
|
|
|
from ssg.constants import PREFIX_TO_NS |
11
|
|
|
from ssg.constants import bash_system as bash_rem_system |
12
|
|
|
from ssg.constants import ansible_system as ansible_rem_system |
13
|
|
|
from ssg.constants import puppet_system as puppet_rem_system |
14
|
|
|
from ssg.constants import anaconda_system as anaconda_rem_system |
15
|
|
|
from ssg.constants import ignition_system as ignition_rem_system |
16
|
|
|
|
17
|
|
|
SYSTEM_ATTRIBUTE = { |
18
|
|
|
'bash': bash_rem_system, |
19
|
|
|
'ansible': ansible_rem_system, |
20
|
|
|
'puppet': puppet_rem_system, |
21
|
|
|
'anaconda': anaconda_rem_system, |
22
|
|
|
'ignition': ignition_rem_system, |
23
|
|
|
} |
24
|
|
|
|
25
|
|
|
|
26
|
|
|
logging.getLogger(__name__).addHandler(logging.NullHandler()) |
27
|
|
|
|
28
|
|
|
|
29
|
|
|
def get_all_xccdf_ids_in_datastream(datastream): |
30
|
|
|
root = ET.parse(datastream).getroot() |
31
|
|
|
|
32
|
|
|
checklists_node = root.find(".//ds:checklists", PREFIX_TO_NS) |
33
|
|
|
if checklists_node is None: |
34
|
|
|
logging.error( |
35
|
|
|
"Checklists not found within DataStream") |
36
|
|
|
|
37
|
|
|
all_checklist_components = checklists_node.findall('ds:component-ref', |
38
|
|
|
PREFIX_TO_NS) |
39
|
|
|
xccdf_ids = [component.get("id") for component in all_checklist_components] |
40
|
|
|
return xccdf_ids |
41
|
|
|
|
42
|
|
|
|
43
|
|
|
def infer_benchmark_id_from_component_ref_id(datastream, ref_id): |
44
|
|
|
root = ET.parse(datastream).getroot() |
45
|
|
|
component_ref_node = root.find("*//ds:component-ref[@id='{0}']" |
46
|
|
|
.format(ref_id), PREFIX_TO_NS) |
47
|
|
|
if component_ref_node is None: |
48
|
|
|
msg = ( |
49
|
|
|
'Component reference of Ref-Id {} not found within datastream' |
50
|
|
|
.format(ref_id)) |
51
|
|
|
raise RuntimeError(msg) |
52
|
|
|
|
53
|
|
|
comp_id = component_ref_node.get('{%s}href' % PREFIX_TO_NS['xlink']) |
54
|
|
|
comp_id = comp_id.lstrip('#') |
55
|
|
|
|
56
|
|
|
query = ".//ds:component[@id='{}']/xccdf-1.2:Benchmark".format(comp_id) |
57
|
|
|
benchmark_node = root.find(query, PREFIX_TO_NS) |
58
|
|
|
if benchmark_node is None: |
59
|
|
|
msg = ( |
60
|
|
|
'Benchmark not found within component of Id {}' |
61
|
|
|
.format(comp_id) |
62
|
|
|
) |
63
|
|
|
raise RuntimeError(msg) |
64
|
|
|
|
65
|
|
|
return benchmark_node.get('id') |
66
|
|
|
|
67
|
|
|
|
68
|
|
|
@contextlib.contextmanager |
69
|
|
|
def datastream_root(ds_location, save_location=None): |
70
|
|
|
try: |
71
|
|
|
tree = ET.parse(ds_location) |
72
|
|
|
for prefix, uri in PREFIX_TO_NS.items(): |
73
|
|
|
ET.register_namespace(prefix, uri) |
74
|
|
|
root = tree.getroot() |
75
|
|
|
yield root |
76
|
|
|
finally: |
77
|
|
|
if save_location: |
78
|
|
|
tree.write(save_location) |
79
|
|
|
|
80
|
|
|
|
81
|
|
|
def remove_machine_platform(root): |
82
|
|
|
remove_machine_only_from_element(root, "xccdf-1.2:Rule") |
83
|
|
|
remove_machine_only_from_element(root, "xccdf-1.2:Group") |
84
|
|
|
|
85
|
|
|
|
86
|
|
|
def remove_machine_only_from_element(root, element_spec): |
87
|
|
|
query = ".//ds:component/xccdf-1.2:Benchmark//{0}".format(element_spec) |
88
|
|
|
elements = root.findall(query, PREFIX_TO_NS) |
89
|
|
|
for el in elements: |
90
|
|
|
platforms = el.findall("./xccdf-1.2:platform", PREFIX_TO_NS) |
91
|
|
|
for p in platforms: |
92
|
|
|
if p.get("idref") == "cpe:/a:machine": |
93
|
|
|
el.remove(p) |
94
|
|
|
|
95
|
|
|
|
96
|
|
|
def get_oscap_supported_cpes(): |
97
|
|
|
""" |
98
|
|
|
Obtain a list of CPEs that the scanner supports |
99
|
|
|
""" |
100
|
|
|
result = [] |
101
|
|
|
proc = subprocess.Popen( |
102
|
|
|
("oscap", "--version"), text=True, |
103
|
|
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
104
|
|
|
try: |
105
|
|
|
outs, errs = proc.communicate(timeout=3) |
106
|
|
|
except subprocess.TimeoutExpired: |
107
|
|
|
logging.warn("Scanner timeouted when asked about supported CPEs") |
108
|
|
|
proc.kill() |
109
|
|
|
return [] |
110
|
|
|
|
111
|
|
|
if proc.returncode != 0: |
112
|
|
|
first_error_line = errs.split("\n")[0] |
113
|
|
|
logging.warn("Error getting CPEs from the scanner: {msg}".format(msg=first_error_line)) |
114
|
|
|
|
115
|
|
|
cpe_regex = re.compile(r'\bcpe:\S+$') |
116
|
|
|
for line in outs.split("\n"): |
117
|
|
|
match = cpe_regex.search(line) |
118
|
|
|
if match: |
119
|
|
|
result.append(match.group(0)) |
120
|
|
|
return result |
121
|
|
|
|
122
|
|
|
|
123
|
|
|
def add_platform_to_benchmark(root, cpe_regex): |
124
|
|
|
benchmark_query = ".//ds:component/xccdf-1.2:Benchmark" |
125
|
|
|
benchmarks = root.findall(benchmark_query, PREFIX_TO_NS) |
126
|
|
|
if not benchmarks: |
127
|
|
|
msg = ( |
128
|
|
|
"No benchmarks found in the datastream" |
129
|
|
|
) |
130
|
|
|
raise RuntimeError(msg) |
131
|
|
|
|
132
|
|
|
all_cpes = get_oscap_supported_cpes() |
133
|
|
|
regex = re.compile(cpe_regex) |
134
|
|
|
|
135
|
|
|
cpes_to_add = [] |
136
|
|
|
for cpe_str in all_cpes: |
137
|
|
|
if regex.search(cpe_str): |
138
|
|
|
cpes_to_add.append(cpe_str) |
139
|
|
|
|
140
|
|
|
if not cpes_to_add: |
141
|
|
|
cpes_to_add = [cpe_regex] |
142
|
|
|
|
143
|
|
|
for benchmark in benchmarks: |
144
|
|
|
existing_platform_element = benchmark.find("xccdf-1.2:platform", PREFIX_TO_NS) |
145
|
|
|
if existing_platform_element is None: |
146
|
|
|
logging.warn( |
147
|
|
|
"Couldn't find platform element in a benchmark, " |
148
|
|
|
"not adding any additional platforms as a result.") |
149
|
|
|
continue |
150
|
|
|
platform_index = list(benchmark).index(existing_platform_element) |
151
|
|
|
for cpe_str in cpes_to_add: |
152
|
|
|
e = ET.Element("xccdf-1.2:platform", idref=cpe_str) |
153
|
|
|
benchmark.insert(platform_index, e) |
154
|
|
|
|
155
|
|
|
|
156
|
|
|
def _get_benchmark_node(datastream, benchmark_id, logging): |
157
|
|
|
root = ET.parse(datastream).getroot() |
158
|
|
|
benchmark_node = root.find( |
159
|
|
|
"*//xccdf-1.2:Benchmark[@id='{0}']".format(benchmark_id), PREFIX_TO_NS) |
160
|
|
|
if benchmark_node is None: |
161
|
|
|
if logging is not None: |
162
|
|
|
logging.error( |
163
|
|
|
"Benchmark ID '{}' not found within DataStream" |
164
|
|
|
.format(benchmark_id)) |
165
|
|
|
return benchmark_node |
166
|
|
|
|
167
|
|
|
|
168
|
|
|
def get_all_profiles_in_benchmark(datastream, benchmark_id, logging=None): |
169
|
|
|
benchmark_node = _get_benchmark_node(datastream, benchmark_id, logging) |
170
|
|
|
all_profiles = benchmark_node.findall('xccdf-1.2:Profile', PREFIX_TO_NS) |
171
|
|
|
return all_profiles |
172
|
|
|
|
173
|
|
|
|
174
|
|
|
def get_all_rule_selections_in_profile(datastream, benchmark_id, profile_id, logging=None): |
175
|
|
|
benchmark_node = _get_benchmark_node(datastream, benchmark_id, logging) |
176
|
|
|
profile = benchmark_node.find("xccdf-1.2:Profile[@id='{0}']".format(profile_id), PREFIX_TO_NS) |
177
|
|
|
rule_selections = profile.findall("xccdf-1.2:select[@selected='true']", PREFIX_TO_NS) |
178
|
|
|
return rule_selections |
179
|
|
|
|
180
|
|
|
|
181
|
|
|
def get_all_rule_ids_in_profile(datastream, benchmark_id, profile_id, logging=None): |
182
|
|
|
rule_selections = get_all_rule_selections_in_profile(datastream, benchmark_id, |
183
|
|
|
profile_id, logging=None) |
184
|
|
|
rule_ids = [select.get("idref") for select in rule_selections] |
185
|
|
|
|
186
|
|
|
# Strip xccdf 1.2 prefixes from rule ids |
187
|
|
|
# Necessary to search for the rules within test scenarios tree |
188
|
|
|
prefix_len = len(OSCAP_RULE) |
189
|
|
|
return [rule[prefix_len:] for rule in rule_ids] |
190
|
|
|
|
191
|
|
|
|
192
|
|
|
def benchmark_get_applicable_platforms(datastream, benchmark_id, logging=None): |
193
|
|
|
""" |
194
|
|
|
Returns a set of CPEs the given benchmark is applicable to. |
195
|
|
|
""" |
196
|
|
|
benchmark_node = _get_benchmark_node(datastream, benchmark_id, logging) |
197
|
|
|
platform_elements = benchmark_node.findall('xccdf-1.2:platform', PREFIX_TO_NS) |
198
|
|
|
cpes = {platform_el.get("idref") for platform_el in platform_elements} |
199
|
|
|
return cpes |
200
|
|
|
|
201
|
|
|
|
202
|
|
|
def find_rule_in_benchmark(datastream, benchmark_id, rule_id, logging=None): |
203
|
|
|
""" |
204
|
|
|
Returns rule node from the given benchmark. |
205
|
|
|
""" |
206
|
|
|
benchmark_node = _get_benchmark_node(datastream, benchmark_id, logging) |
207
|
|
|
rule = benchmark_node.find(".//xccdf-1.2:Rule[@id='{0}']".format(rule_id), PREFIX_TO_NS) |
208
|
|
|
return rule |
209
|
|
|
|
210
|
|
|
|
211
|
|
|
def find_fix_in_benchmark(datastream, benchmark_id, rule_id, fix_type='bash', logging=None): |
212
|
|
|
""" |
213
|
|
|
Return fix from benchmark. None if not found. |
214
|
|
|
""" |
215
|
|
|
rule = find_rule_in_benchmark(datastream, benchmark_id, rule_id, logging) |
216
|
|
|
if rule is None: |
217
|
|
|
return None |
218
|
|
|
|
219
|
|
|
system_attribute = SYSTEM_ATTRIBUTE.get(fix_type, bash_rem_system) |
220
|
|
|
|
221
|
|
|
fix = rule.find("xccdf-1.2:fix[@system='{0}']".format(system_attribute), PREFIX_TO_NS) |
222
|
|
|
return fix |
223
|
|
|
|