Passed
Push — master ( a0d626...24e340 )
by Jan
02:40 queued 10s
created

datastream_root()   A

Complexity

Conditions 3

Size

Total Lines 11
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 12

Importance

Changes 0
Metric Value
cc 3
eloc 10
nop 2
dl 0
loc 11
ccs 0
cts 10
cp 0
crap 12
rs 9.9
c 0
b 0
f 0
1
#!/usr/bin/env python2
2
import xml.etree.cElementTree as ET
3
4
import logging
5
import contextlib
6
import re
7
import subprocess
8
9
from ssg.constants import OSCAP_RULE
10
from ssg.constants import PREFIX_TO_NS
11
from ssg.constants import bash_system as bash_rem_system
12
from ssg.constants import ansible_system as ansible_rem_system
13
from ssg.constants import puppet_system as puppet_rem_system
14
from ssg.constants import anaconda_system as anaconda_rem_system
15
from ssg.constants import ignition_system as ignition_rem_system
16
17
SYSTEM_ATTRIBUTE = {
18
    'bash': bash_rem_system,
19
    'ansible': ansible_rem_system,
20
    'puppet': puppet_rem_system,
21
    'anaconda': anaconda_rem_system,
22
    'ignition': ignition_rem_system,
23
}
24
25
26
BENCHMARK_QUERY = ".//ds:component/xccdf-1.2:Benchmark"
27
28
logging.getLogger(__name__).addHandler(logging.NullHandler())
29
30
31
def get_all_xccdf_ids_in_datastream(datastream):
32
    root = ET.parse(datastream).getroot()
33
34
    checklists_node = root.find(".//ds:checklists", PREFIX_TO_NS)
35
    if checklists_node is None:
36
        logging.error(
37
            "Checklists not found within DataStream")
38
39
    all_checklist_components = checklists_node.findall('ds:component-ref',
40
                                                       PREFIX_TO_NS)
41
    xccdf_ids = [component.get("id") for component in all_checklist_components]
42
    return xccdf_ids
43
44
45
def infer_benchmark_id_from_component_ref_id(datastream, ref_id):
46
    root = ET.parse(datastream).getroot()
47
    component_ref_node = root.find("*//ds:component-ref[@id='{0}']"
48
                                   .format(ref_id), PREFIX_TO_NS)
49
    if component_ref_node is None:
50
        msg = (
51
            'Component reference of Ref-Id {} not found within datastream'
52
            .format(ref_id))
53
        raise RuntimeError(msg)
54
55
    comp_id = component_ref_node.get('{%s}href' % PREFIX_TO_NS['xlink'])
56
    comp_id = comp_id.lstrip('#')
57
58
    query = ".//ds:component[@id='{}']/xccdf-1.2:Benchmark".format(comp_id)
59
    benchmark_node = root.find(query, PREFIX_TO_NS)
60
    if benchmark_node is None:
61
        msg = (
62
            'Benchmark not found within component of Id {}'
63
            .format(comp_id)
64
        )
65
        raise RuntimeError(msg)
66
67
    return benchmark_node.get('id')
68
69
70
@contextlib.contextmanager
71
def datastream_root(ds_location, save_location=None):
72
    try:
73
        tree = ET.parse(ds_location)
74
        for prefix, uri in PREFIX_TO_NS.items():
75
            ET.register_namespace(prefix, uri)
76
        root = tree.getroot()
77
        yield root
78
    finally:
79
        if save_location:
80
            tree.write(save_location)
81
82
83
def remove_machine_platform(root):
84
    remove_machine_only_from_element(root, "xccdf-1.2:Rule")
85
    remove_machine_only_from_element(root, "xccdf-1.2:Group")
86
87
88
def remove_machine_only_from_element(root, element_spec):
89
    query = BENCHMARK_QUERY + "//{0}".format(element_spec)
90
    elements = root.findall(query, PREFIX_TO_NS)
91
    for el in elements:
92
        platforms = el.findall("./xccdf-1.2:platform", PREFIX_TO_NS)
93
        for p in platforms:
94
            if p.get("idref") == "cpe:/a:machine":
95
                el.remove(p)
96
97
98
def remove_machine_remediation_condition(root):
99
    remove_bash_machine_remediation_condition(root)
100
    remove_ansible_machine_remediation_condition(root)
101
102
103
def remove_bash_machine_remediation_condition(root):
104
    query = BENCHMARK_QUERY + 'xccdf-1.2:fix[@system="urn:xccdf:fix:script:sh"]'
105
    fix_elements = root.findall(query, PREFIX_TO_NS)
106
    considered_machine_platform_checks = [
107
        r"\[\s+!\s+-f\s+/\.dockerenv\s+\]\s+&&\s+\[\s+!\s+-f\s+/run/\.containerenv\s+\]",
108
    ]
109
    for el in fix_elements:
110
        for check in considered_machine_platform_checks:
111
            el.text = re.sub(check, "true", el.text)
112
113
114
def remove_ansible_machine_remediation_condition(root):
115
    query = BENCHMARK_QUERY + '//xccdf-1.2:fix[@system="urn:xccdf:fix:script:ansible"]'
116
    fix_elements = root.findall(query, PREFIX_TO_NS)
117
    considered_machine_platform_checks = [
118
        r"\bansible_virtualization_type\s+not\s+in.*docker.*",
119
    ]
120
    for el in fix_elements:
121
        for check in considered_machine_platform_checks:
122
            el.text = re.sub(check, "True", el.text)
123
124
125
def get_oscap_supported_cpes():
126
    """
127
    Obtain a list of CPEs that the scanner supports
128
    """
129
    result = []
130
    proc = subprocess.Popen(
131
            ("oscap", "--version"), text=True,
132
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
133
    try:
134
        outs, errs = proc.communicate(timeout=3)
135
    except subprocess.TimeoutExpired:
136
        logging.warn("Scanner timeouted when asked about supported CPEs")
137
        proc.kill()
138
        return []
139
140
    if proc.returncode != 0:
141
        first_error_line = errs.split("\n")[0]
142
        logging.warn("Error getting CPEs from the scanner: {msg}".format(msg=first_error_line))
143
144
    cpe_regex = re.compile(r'\bcpe:\S+$')
145
    for line in outs.split("\n"):
146
        match = cpe_regex.search(line)
147
        if match:
148
            result.append(match.group(0))
149
    return result
150
151
152
def add_platform_to_benchmark(root, cpe_regex):
153
    benchmark_query = ".//ds:component/xccdf-1.2:Benchmark"
154
    benchmarks = root.findall(benchmark_query, PREFIX_TO_NS)
155
    if not benchmarks:
156
        msg = (
157
            "No benchmarks found in the datastream"
158
        )
159
        raise RuntimeError(msg)
160
161
    all_cpes = get_oscap_supported_cpes()
162
    regex = re.compile(cpe_regex)
163
164
    cpes_to_add = []
165
    for cpe_str in all_cpes:
166
        if regex.search(cpe_str):
167
            cpes_to_add.append(cpe_str)
168
169
    if not cpes_to_add:
170
        cpes_to_add = [cpe_regex]
171
172
    for benchmark in benchmarks:
173
        existing_platform_element = benchmark.find("xccdf-1.2:platform", PREFIX_TO_NS)
174
        if existing_platform_element is None:
175
            logging.warn(
176
                "Couldn't find platform element in a benchmark, "
177
                "not adding any additional platforms as a result.")
178
            continue
179
        platform_index = list(benchmark).index(existing_platform_element)
180
        for cpe_str in cpes_to_add:
181
            e = ET.Element("xccdf-1.2:platform", idref=cpe_str)
182
            benchmark.insert(platform_index, e)
183
184
185
def _get_benchmark_node(datastream, benchmark_id, logging):
186
    root = ET.parse(datastream).getroot()
187
    benchmark_node = root.find(
188
        "*//xccdf-1.2:Benchmark[@id='{0}']".format(benchmark_id), PREFIX_TO_NS)
189
    if benchmark_node is None:
190
        if logging is not None:
191
            logging.error(
192
                "Benchmark ID '{}' not found within DataStream"
193
                .format(benchmark_id))
194
    return benchmark_node
195
196
197
def get_all_profiles_in_benchmark(datastream, benchmark_id, logging=None):
198
    benchmark_node = _get_benchmark_node(datastream, benchmark_id, logging)
199
    all_profiles = benchmark_node.findall('xccdf-1.2:Profile', PREFIX_TO_NS)
200
    return all_profiles
201
202
203
def get_all_rule_selections_in_profile(datastream, benchmark_id, profile_id, logging=None):
204
    benchmark_node = _get_benchmark_node(datastream, benchmark_id, logging)
205
    profile = benchmark_node.find("xccdf-1.2:Profile[@id='{0}']".format(profile_id), PREFIX_TO_NS)
206
    rule_selections = profile.findall("xccdf-1.2:select[@selected='true']", PREFIX_TO_NS)
207
    return rule_selections
208
209
210
def get_all_rule_ids_in_profile(datastream, benchmark_id, profile_id, logging=None):
211
    rule_selections = get_all_rule_selections_in_profile(datastream, benchmark_id,
212
                                                         profile_id, logging=None)
213
    rule_ids = [select.get("idref") for select in rule_selections]
214
215
    # Strip xccdf 1.2 prefixes from rule ids
216
    # Necessary to search for the rules within test scenarios tree
217
    prefix_len = len(OSCAP_RULE)
218
    return [rule[prefix_len:] for rule in rule_ids]
219
220
221
def benchmark_get_applicable_platforms(datastream, benchmark_id, logging=None):
222
    """
223
    Returns a set of CPEs the given benchmark is applicable to.
224
    """
225
    benchmark_node = _get_benchmark_node(datastream, benchmark_id, logging)
226
    platform_elements = benchmark_node.findall('xccdf-1.2:platform', PREFIX_TO_NS)
227
    cpes = {platform_el.get("idref") for platform_el in platform_elements}
228
    return cpes
229
230
231
def find_rule_in_benchmark(datastream, benchmark_id, rule_id, logging=None):
232
    """
233
    Returns rule node from the given benchmark.
234
    """
235
    benchmark_node = _get_benchmark_node(datastream, benchmark_id, logging)
236
    rule = benchmark_node.find(".//xccdf-1.2:Rule[@id='{0}']".format(rule_id), PREFIX_TO_NS)
237
    return rule
238
239
240
def find_fix_in_benchmark(datastream, benchmark_id, rule_id, fix_type='bash', logging=None):
241
    """
242
    Return fix from benchmark. None if not found.
243
    """
244
    rule = find_rule_in_benchmark(datastream, benchmark_id, rule_id, logging)
245
    if rule is None:
246
        return None
247
248
    system_attribute = SYSTEM_ATTRIBUTE.get(fix_type, bash_rem_system)
249
250
    fix = rule.find("xccdf-1.2:fix[@system='{0}']".format(system_attribute), PREFIX_TO_NS)
251
    return fix
252