Passed
Push — master ( 42467a...9d011f )
by Matěj
03:19 queued 11s
created

find_fix_in_benchmark()   A

Complexity

Conditions 2

Size

Total Lines 12
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
cc 2
eloc 7
nop 5
dl 0
loc 12
ccs 0
cts 7
cp 0
crap 6
rs 10
c 0
b 0
f 0
1
#!/usr/bin/env python2
2
import xml.etree.cElementTree as ET
3
4
import logging
5
import contextlib
6
import re
7
import subprocess
8
9
from ssg.constants import OSCAP_RULE
10
from ssg.constants import PREFIX_TO_NS
11
from ssg.constants import bash_system as bash_rem_system
12
from ssg.constants import ansible_system as ansible_rem_system
13
from ssg.constants import puppet_system as puppet_rem_system
14
from ssg.constants import anaconda_system as anaconda_rem_system
15
from ssg.constants import ignition_system as ignition_rem_system
16
17
# Lookup table from a remediation type name (as passed in ``fix_type``) to the
# value used in the ``system`` attribute of the corresponding XCCDF fix element.
SYSTEM_ATTRIBUTE = dict(
    bash=bash_rem_system,
    ansible=ansible_rem_system,
    puppet=puppet_rem_system,
    anaconda=anaconda_rem_system,
    ignition=ignition_rem_system,
)


# Attach a no-op handler to this module's logger.
logging.getLogger(__name__).addHandler(logging.NullHandler())
27
28
29
def get_all_xccdf_ids_in_datastream(datastream):
    """
    Return IDs of all checklist component references in the datastream.

    The datastream is parsed, the first ``ds:checklists`` element is located
    and the ``id`` attribute of each of its direct ``ds:component-ref``
    children is collected.

    Args:
        datastream: Source accepted by ``ET.parse`` (file name or file object).

    Returns:
        List of ``id`` attribute values. The list is empty when the
        datastream has no ``ds:checklists`` element (an error is logged).
    """
    root = ET.parse(datastream).getroot()

    checklists_node = root.find(".//ds:checklists", PREFIX_TO_NS)
    if checklists_node is None:
        logging.error(
            "Checklists not found within DataStream")
        # There is nothing to enumerate; continuing would only fail with an
        # AttributeError on None.
        return []

    component_refs = checklists_node.findall('ds:component-ref', PREFIX_TO_NS)
    return [component.get("id") for component in component_refs]
41
42
43
def infer_benchmark_id_from_component_ref_id(datastream, ref_id):
    """
    Return the ID of the benchmark that a component reference points to.

    The ``ds:component-ref`` with the given ``id`` is looked up, its
    ``xlink:href`` (with leading ``#`` characters removed) is used as the ID
    of a ``ds:component``, and the ``id`` attribute of the
    ``xccdf-1.2:Benchmark`` directly inside that component is returned.

    Args:
        datastream: Source accepted by ``ET.parse`` (file name or file object).
        ref_id: Value of the ``id`` attribute of the component reference.

    Returns:
        Value of the ``id`` attribute of the benchmark element.

    Raises:
        RuntimeError: If the component reference is missing, if it carries
            no ``xlink:href`` attribute, or if the referenced component
            contains no benchmark.
    """
    root = ET.parse(datastream).getroot()
    component_ref_node = root.find("*//ds:component-ref[@id='{0}']"
                                   .format(ref_id), PREFIX_TO_NS)
    if component_ref_node is None:
        msg = (
            'Component reference of Ref-Id {} not found within datastream'
            .format(ref_id))
        raise RuntimeError(msg)

    comp_href = component_ref_node.get('{%s}href' % PREFIX_TO_NS['xlink'])
    if comp_href is None:
        # Report the malformed reference the same way as the other lookup
        # failures instead of crashing with AttributeError on None.
        msg = (
            'Component reference of Ref-Id {} has no xlink:href attribute'
            .format(ref_id))
        raise RuntimeError(msg)
    comp_id = comp_href.lstrip('#')

    query = ".//ds:component[@id='{}']/xccdf-1.2:Benchmark".format(comp_id)
    benchmark_node = root.find(query, PREFIX_TO_NS)
    if benchmark_node is None:
        msg = (
            'Benchmark not found within component of Id {}'
            .format(comp_id)
        )
        raise RuntimeError(msg)

    return benchmark_node.get('id')
66
67
68
@contextlib.contextmanager
def datastream_root(ds_location, save_location=None):
    """
    Context manager yielding the root element of a parsed datastream.

    Every prefix from ``PREFIX_TO_NS`` is registered with ElementTree before
    the root is yielded. When ``save_location`` is truthy, the tree is
    written there on exit - also when the body of the ``with`` statement
    raised, because the write happens in a ``finally`` clause.

    Args:
        ds_location: Source accepted by ``ET.parse``.
        save_location: Optional destination passed to ``tree.write``.

    Yields:
        Root element of the parsed tree.
    """
    # Parse outside of the try block: if parsing fails there is no tree to
    # save, and the finally clause must not hide the parse error behind an
    # unbound-variable error.
    tree = ET.parse(ds_location)
    try:
        for prefix, uri in PREFIX_TO_NS.items():
            ET.register_namespace(prefix, uri)
        yield tree.getroot()
    finally:
        if save_location:
            tree.write(save_location)
79
80
81
def remove_machine_platform(root):
    """
    Remove the machine platform from rules first and from groups second.
    """
    for element_spec in ("xccdf-1.2:Rule", "xccdf-1.2:Group"):
        remove_machine_only_from_element(root, element_spec)
84
85
86
def remove_machine_only_from_element(root, element_spec):
    """
    Drop ``cpe:/a:machine`` platform children from the matching elements.

    Elements are selected by ``element_spec`` anywhere below a benchmark
    that sits directly in a ``ds:component``. Only direct
    ``xccdf-1.2:platform`` children whose ``idref`` equals
    ``cpe:/a:machine`` are removed.
    """
    machine_cpe = "cpe:/a:machine"
    xpath = ".//ds:component/xccdf-1.2:Benchmark//{0}".format(element_spec)
    for parent in root.findall(xpath, PREFIX_TO_NS):
        # findall() returns a list, so removing children while looping is safe.
        for platform in parent.findall("./xccdf-1.2:platform", PREFIX_TO_NS):
            if platform.get("idref") != machine_cpe:
                continue
            parent.remove(platform)
94
95
96
def get_oscap_supported_cpes():
    """
    Obtain a list of CPEs that the scanner supports

    Runs ``oscap --version`` and collects, from every output line that ends
    with a ``cpe:...`` token, that token.

    The query is best-effort: when the scanner cannot be started or does not
    answer within 3 seconds, a warning is logged and an empty list is
    returned. A non-zero exit status is logged as a warning and whatever
    was printed to standard output is still examined.

    Returns:
        List of CPE strings found in the scanner output (possibly empty).
    """
    try:
        proc = subprocess.Popen(
            ("oscap", "--version"), text=True,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except OSError as exc:
        # E.g. the oscap binary is not installed or is not executable.
        logging.warning("Error getting CPEs from the scanner: {msg}".format(msg=exc))
        return []

    try:
        outs, errs = proc.communicate(timeout=3)
    except subprocess.TimeoutExpired:
        logging.warning("Scanner timeouted when asked about supported CPEs")
        proc.kill()
        # Reap the killed child and close its pipes.
        proc.communicate()
        return []

    if proc.returncode != 0:
        first_error_line = errs.split("\n")[0]
        logging.warning(
            "Error getting CPEs from the scanner: {msg}".format(msg=first_error_line))

    cpe_regex = re.compile(r'\bcpe:\S+$')
    matches = (cpe_regex.search(line) for line in outs.split("\n"))
    return [match.group(0) for match in matches if match]
121
122
123
def add_platform_to_benchmark(root, cpe_regex):
    """
    Add platform elements to every benchmark found under ``root``.

    The CPEs reported by ``get_oscap_supported_cpes()`` are filtered by
    ``cpe_regex`` (using ``re.search``). When none of them matches,
    ``cpe_regex`` itself is used verbatim as the only CPE to add.

    For each benchmark, the new platform elements are inserted at the
    position of its first existing ``xccdf-1.2:platform`` child. A benchmark
    without such a child is skipped with a warning.

    Args:
        root: Root element of a parsed datastream.
        cpe_regex: Regular expression selecting the CPEs to add.

    Raises:
        RuntimeError: If no benchmark is found in the datastream.
    """
    benchmark_query = ".//ds:component/xccdf-1.2:Benchmark"
    benchmarks = root.findall(benchmark_query, PREFIX_TO_NS)
    if not benchmarks:
        raise RuntimeError("No benchmarks found in the datastream")

    all_cpes = get_oscap_supported_cpes()
    regex = re.compile(cpe_regex)

    cpes_to_add = [cpe_str for cpe_str in all_cpes if regex.search(cpe_str)]
    if not cpes_to_add:
        cpes_to_add = [cpe_regex]

    # Use the namespace-qualified tag so that the new elements are real
    # members of the XCCDF namespace: they are found by namespaced queries
    # and their serialization does not rely on a particular prefix being
    # registered.
    platform_tag = "{%s}platform" % PREFIX_TO_NS["xccdf-1.2"]

    for benchmark in benchmarks:
        existing_platform_element = benchmark.find("xccdf-1.2:platform", PREFIX_TO_NS)
        if existing_platform_element is None:
            logging.warning(
                "Couldn't find platform element in a benchmark, "
                "not adding any additional platforms as a result.")
            continue
        platform_index = list(benchmark).index(existing_platform_element)
        # Every element goes to the same index, so the added platforms end
        # up in reverse order of cpes_to_add, ahead of the existing one.
        for cpe_str in cpes_to_add:
            benchmark.insert(platform_index, ET.Element(platform_tag, idref=cpe_str))
154
155
156
def _get_benchmark_node(datastream, benchmark_id, logging):
    """
    Parse the datastream and look up the benchmark with the given ID.

    Note that the ``logging`` parameter shadows the ``logging`` module inside
    this function; it is the object whose ``error`` method is called, and it
    may be None to suppress the message.

    Returns:
        The benchmark element, or None if there is no such benchmark.
    """
    tree_root = ET.parse(datastream).getroot()
    xpath = "*//xccdf-1.2:Benchmark[@id='{0}']".format(benchmark_id)
    found = tree_root.find(xpath, PREFIX_TO_NS)
    if found is None and logging is not None:
        logging.error(
            "Benchmark ID '{}' not found within DataStream"
            .format(benchmark_id))
    return found
166
167
168
def get_all_profiles_in_benchmark(datastream, benchmark_id, logging=None):
    """
    Return all profile elements that are direct children of the benchmark.

    Args:
        datastream: Source accepted by ``ET.parse``.
        benchmark_id: Value of the ``id`` attribute of the benchmark.
        logging: Optional object whose ``error`` method is used to report
            a missing benchmark.

    Returns:
        List of ``xccdf-1.2:Profile`` elements; empty when the benchmark is
        not present in the datastream.
    """
    benchmark_node = _get_benchmark_node(datastream, benchmark_id, logging)
    if benchmark_node is None:
        # The lookup helper has already reported the problem (if asked to).
        return []
    return benchmark_node.findall('xccdf-1.2:Profile', PREFIX_TO_NS)
172
173
174
def get_all_rule_selections_in_profile(datastream, benchmark_id, profile_id, logging=None):
    """
    Return the ``select`` elements of a profile that have ``selected='true'``.

    Args:
        datastream: Source accepted by ``ET.parse``.
        benchmark_id: Value of the ``id`` attribute of the benchmark.
        profile_id: Value of the ``id`` attribute of the profile.
        logging: Optional object whose ``error`` method is used to report
            a missing benchmark or profile.

    Returns:
        List of ``xccdf-1.2:select`` elements; empty when either the
        benchmark or the profile is not found.
    """
    benchmark_node = _get_benchmark_node(datastream, benchmark_id, logging)
    if benchmark_node is None:
        # The lookup helper has already reported the problem (if asked to).
        return []

    profile = benchmark_node.find(
        "xccdf-1.2:Profile[@id='{0}']".format(profile_id), PREFIX_TO_NS)
    if profile is None:
        if logging is not None:
            logging.error(
                "Profile ID '{0}' not found within benchmark '{1}'"
                .format(profile_id, benchmark_id))
        return []

    return profile.findall("xccdf-1.2:select[@selected='true']", PREFIX_TO_NS)
179
180
181
def get_all_rule_ids_in_profile(datastream, benchmark_id, profile_id, logging=None):
    """
    Return the short IDs of all rules selected by the profile.

    Args:
        datastream: Source accepted by ``ET.parse``.
        benchmark_id: Value of the ``id`` attribute of the benchmark.
        profile_id: Value of the ``id`` attribute of the profile.
        logging: Optional object used by the lookup helpers to report
            errors; it is forwarded to them unchanged.

    Returns:
        List of the selections' ``idref`` values with the first
        ``len(OSCAP_RULE)`` characters cut off.
    """
    # Forward the caller's logging object; passing a hard-coded None here
    # would silence error reporting in the helpers.
    rule_selections = get_all_rule_selections_in_profile(datastream, benchmark_id,
                                                         profile_id, logging=logging)
    rule_ids = [select.get("idref") for select in rule_selections]

    # Strip xccdf 1.2 prefixes from rule ids
    # Necessary to search for the rules within test scenarios tree
    prefix_len = len(OSCAP_RULE)
    return [rule[prefix_len:] for rule in rule_ids]
190
191
192
def benchmark_get_applicable_platforms(datastream, benchmark_id, logging=None):
    """
    Returns a set of CPEs the given benchmark is applicable to.

    The CPEs are the ``idref`` values of the ``xccdf-1.2:platform`` elements
    that are direct children of the benchmark. An empty set is returned when
    the benchmark is not present in the datastream.
    """
    benchmark_node = _get_benchmark_node(datastream, benchmark_id, logging)
    if benchmark_node is None:
        # The lookup helper has already reported the problem (if asked to).
        return set()
    platform_elements = benchmark_node.findall('xccdf-1.2:platform', PREFIX_TO_NS)
    return {platform_el.get("idref") for platform_el in platform_elements}
200
201
202
def find_rule_in_benchmark(datastream, benchmark_id, rule_id, logging=None):
    """
    Returns rule node from the given benchmark.

    The rule is searched for at any depth below the benchmark. None is
    returned when either the benchmark or the rule is not found.
    """
    benchmark_node = _get_benchmark_node(datastream, benchmark_id, logging)
    if benchmark_node is None:
        # Without a benchmark there is no rule either; report "not found"
        # the same way as for a missing rule.
        return None
    return benchmark_node.find(
        ".//xccdf-1.2:Rule[@id='{0}']".format(rule_id), PREFIX_TO_NS)
209
210
211
def find_fix_in_benchmark(datastream, benchmark_id, rule_id, fix_type='bash', logging=None):
    """
    Look up the fix element of the given type for a rule.

    An unknown ``fix_type`` is treated as bash. The result is None when the
    rule is not found or when it has no fix of the requested type.
    """
    rule_node = find_rule_in_benchmark(datastream, benchmark_id, rule_id, logging)
    if rule_node is not None:
        fix_system = SYSTEM_ATTRIBUTE.get(fix_type, bash_rem_system)
        fix_query = "xccdf-1.2:fix[@system='{0}']".format(fix_system)
        return rule_node.find(fix_query, PREFIX_TO_NS)
    return None
223