Passed: push to master (ea7c4a...d61422) by Matěj, ran 01:18 (queued 12s)

ssg_test_suite.rule.get_viable_profiles()

Rating:        B
Complexity:    Conditions 6
Size:          Total Lines 23, Code Lines 17
Duplication:   Lines 0, Ratio 0 %
Code Coverage: Tests 0, CRAP Score 42
Importance:    Changes 0

Metric  Value
cc      6
eloc    17
nop     3
dl      0
loc     23
ccs     0
cts     15
cp      0
crap    42
rs      8.6166
c       0
b       0
f       0
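
The CRAP score above follows directly from the complexity and coverage figures. A minimal sketch, assuming the tool computes the standard CRAP ("change risk anti-patterns") formula, CRAP = cc^2 * (1 - cov)^3 + cc with coverage as a fraction:

def crap_score(complexity, coverage):
    """CRAP = cc**2 * (1 - cov)**3 + cc, with coverage in 0.0..1.0."""
    return complexity ** 2 * (1.0 - coverage) ** 3 + complexity

print(crap_score(6, 0.0))  # 42.0 -- the reported score: 6**2 + 6, with no test coverage
print(crap_score(6, 1.0))  # 6.0  -- with full coverage only the complexity term remains

With cc = 6 and 0 % coverage this reproduces the reported CRAP of 42; lowering it requires either covering the function with tests or reducing its six conditions.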
#!/usr/bin/env python2
from __future__ import print_function

import logging
import os
import os.path
import re
import subprocess
import collections
import json
import fnmatch

from ssg.constants import OSCAP_PROFILE, OSCAP_PROFILE_ALL_ID, OSCAP_RULE
from ssg_test_suite import oscap
from ssg_test_suite import xml_operations
from ssg_test_suite import test_env
from ssg_test_suite import common
from ssg_test_suite.log import LogHelper


logging.getLogger(__name__).addHandler(logging.NullHandler())


Scenario = collections.namedtuple(
    "Scenario", ["script", "context", "script_params"])


def get_viable_profiles(selected_profiles, datastream, benchmark):
    """Read the datastream and return the intersection of the profiles
    in the given benchmark with those provided in the `selected_profiles`
    parameter.
    """

    valid_profiles = []
    all_profiles_elements = xml_operations.get_all_profiles_in_benchmark(
        datastream, benchmark, logging)
    all_profiles = [el.attrib["id"] for el in all_profiles_elements]
    all_profiles.append(OSCAP_PROFILE_ALL_ID)

    for ds_profile in all_profiles:
        if 'ALL' in selected_profiles:
            valid_profiles += [ds_profile]
            continue
        for sel_profile in selected_profiles:
            if ds_profile.endswith(sel_profile):
                valid_profiles += [ds_profile]

    if not valid_profiles:
        logging.warning('No profile ends with "{0}"'
                        .format(", ".join(selected_profiles)))
    return valid_profiles

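# A minimal illustration of the suffix matching above (hypothetical profile
# names; full IDs carry the 'xccdf_org.ssgproject.content_profile_' prefix):
#
#   get_viable_profiles(["ospp"], ds, bench)
#   #  -> ["xccdf_org.ssgproject.content_profile_ospp"]
#   get_viable_profiles(["ALL"], ds, bench)
#   #  -> every profile of the benchmark, plus OSCAP_PROFILE_ALL_ID
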
def _apply_script(rule_dir, domain_ip, script):
    """Run a particular test script on the VM and log its output."""
    machine = "{0}@{1}".format(common.REMOTE_USER, domain_ip)
    logging.debug("Applying script {0}".format(script))
    rule_name = os.path.basename(rule_dir)
    log_file_name = os.path.join(
        LogHelper.LOG_DIR, rule_name + ".prescripts.log")

    with open(log_file_name, 'a') as log_file:
        log_file.write('##### {0} / {1} #####\n'.format(rule_name, script))
        shared_dir = os.path.join(common.REMOTE_TEST_SCENARIOS_DIRECTORY, "shared")
        command = "cd {0}; SHARED={1} bash -x {2}".format(rule_dir, shared_dir, script)
        args = common.SSH_ADDITIONAL_OPTS + (machine, command)

        try:
            common.run_with_stdout_logging("ssh", args, log_file)
        except subprocess.CalledProcessError as exc:
            logging.error("Rule testing script {script} failed with exit code {rc}"
                          .format(script=script, rc=exc.returncode))
            return False
    return True

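# For orientation, the command assembled above runs over ssh roughly as
# (hypothetical user, address and paths; the real values come from
# ssg_test_suite.common and the test environment):
#
#   ssh <SSH_ADDITIONAL_OPTS> root@192.168.122.50 \
#       'cd <remote_rule_dir>; SHARED=<remote_dir>/shared bash -x scenario.fail.sh'
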
def _get_script_context(script):
    """Return the context of the script."""
    result = re.search(r'.*\.([^.]*)\.[^.]*$', script)
    if result is None:
        return None
    return result.group(1)

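# The context is the next-to-last dot-separated part of the file name
# (illustrative names, not taken from the repository):
#
#   _get_script_context("accounts_tmout.fail.sh")  # -> "fail"
#   _get_script_context("correct_value.pass.sh")   # -> "pass"
#   _get_script_context("setup.sh")                # -> None (no context part)
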
class RuleChecker(oscap.Checker):
    """
    Rule checks generally work like this:
    for every profile that supports the rule,

    - Alter the system.
    - Run the scan and check that the result meets expectations.
      If the test scenario passed as requested, return True;
      if it failed or passed unexpectedly, return False.

    The following sequence applies if the initial scan
    has failed as expected:

    - If there are no remediations, return True.
    - Run the remediation; return False if it failed.
    - Return the result of the final scan of the remediated system.
    """
    def __init__(self, test_env):
        super(RuleChecker, self).__init__(test_env)
        self._matching_rule_found = False

        self.results = list()
        self._current_result = None

    def _run_test(self, profile, test_data):
        scenario = test_data["scenario"]
        rule_id = test_data["rule_id"]
        remediation_available = test_data["remediation_available"]

        LogHelper.preload_log(
            logging.INFO, "Script {0} using profile {1} OK".format(scenario.script, profile),
            log_target='pass')
        LogHelper.preload_log(
            logging.ERROR,
            "Script {0} using profile {1} found issue:".format(scenario.script, profile),
            log_target='fail')

        runner_cls = oscap.REMEDIATION_RULE_RUNNERS[self.remediate_using]
        runner = runner_cls(
            self.test_env, oscap.process_profile_id(profile), self.datastream, self.benchmark_id,
            rule_id, scenario.script, self.dont_clean, self.manual_debug)
        if not self._initial_scan_went_ok(runner, rule_id, scenario.context):
            return False

        supported_and_available_remediations = self._get_available_remediations(scenario)
        if (scenario.context not in ['fail', 'error']
                or not supported_and_available_remediations):
            return True

        if remediation_available:
            if not self._remediation_went_ok(runner, rule_id):
                return False

            return self._final_scan_went_ok(runner, rule_id)
        else:
            msg = ("No remediation is available for rule '{}'."
                   .format(rule_id))
            logging.warning(msg)
            return False

    def _initial_scan_went_ok(self, runner, rule_id, context):
        success = runner.run_stage_with_context("initial", context)
        self._current_result.record_stage_result("initial_scan", success)
        if not success:
            msg = ("The initial scan failed for rule '{}'."
                   .format(rule_id))
            logging.error(msg)
        return success

    def _is_remediation_available(self, rule):
        if xml_operations.find_fix_in_benchmark(
                self.datastream, self.benchmark_id, rule.id, self.remediate_using) is None:
            return False
        else:
            return True


    def _get_available_remediations(self, scenario):
        is_supported = set(['all'])
        is_supported.add(
            oscap.REMEDIATION_RUNNER_TO_REMEDIATION_MEANS[self.remediate_using])
        supported_and_available_remediations = set(
            scenario.script_params['remediation']).intersection(is_supported)
        return supported_and_available_remediations

    def _remediation_went_ok(self, runner, rule_id):
        success = runner.run_stage_with_context('remediation', 'fixed')
        self._current_result.record_stage_result("remediation", success)
        if not success:
            msg = ("The remediation failed for rule '{}'."
                   .format(rule_id))
            logging.error(msg)

        return success

    def _final_scan_went_ok(self, runner, rule_id):
        success = runner.run_stage_with_context('final', 'pass')
        self._current_result.record_stage_result("final_scan", success)
        if not success:
            msg = ("The check after remediation failed for rule '{}'."
                   .format(rule_id))
            logging.error(msg)
        return success

    def _rule_should_be_tested(self, rule_id, rules_to_be_tested):
        if 'ALL' in rules_to_be_tested:
            return True
        else:
            for rule_to_be_tested in rules_to_be_tested:
                # match the rule ID against a glob pattern, with the
                # OSCAP_RULE prefix implied when it is missing
                if rule_to_be_tested.startswith(OSCAP_RULE):
                    pattern = rule_to_be_tested
                else:
                    pattern = OSCAP_RULE + rule_to_be_tested
                if fnmatch.fnmatch(rule_id, pattern):
                    return True
            return False

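    # Glob illustration (hypothetical rule IDs; OSCAP_RULE is the
    # 'xccdf_org.ssgproject.content_rule_' prefix from ssg.constants):
    #
    #   checker._rule_should_be_tested(
    #       "xccdf_org.ssgproject.content_rule_accounts_tmout", ["accounts_*"])
    #   # -> True: the OSCAP_RULE prefix is prepended to the 'accounts_*' glob
    #   checker._rule_should_be_tested(
    #       "xccdf_org.ssgproject.content_rule_accounts_tmout", ["sshd_*"])
    #   # -> False
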
    def _test_target(self, target):
        try:
            remote_dir = common.send_scripts(self.test_env.domain_ip)
        except RuntimeError as exc:
            msg = "Unable to upload test scripts: {more_info}".format(more_info=str(exc))
            raise RuntimeError(msg)

        self._matching_rule_found = False

        with test_env.SavedState.create_from_environment(self.test_env, "tests_uploaded") as state:
            for rule in common.iterate_over_rules():
                if not self._rule_should_be_tested(rule.id, target):
                    continue
                self._matching_rule_found = True
                if not xml_operations.find_rule_in_benchmark(
                        self.datastream, self.benchmark_id, rule.id):
                    logging.error(
                        "Rule '{0}' isn't present in benchmark '{1}' in '{2}'"
                        .format(rule.id, self.benchmark_id, self.datastream))
                    continue
                remediation_available = self._is_remediation_available(rule)

                self._check_rule(rule, remote_dir, state, remediation_available)

        if not self._matching_rule_found:
            logging.error("No matching rule ID found for '{0}'".format(target))

    def _modify_parameters(self, script, params):
        if self.scenarios_profile:
            params['profiles'] = [self.scenarios_profile]

        if not params["profiles"]:
            params["profiles"].append(OSCAP_PROFILE_ALL_ID)
            logging.debug(
                "Added the {0} profile to the list of available profiles for {1}"
                .format(OSCAP_PROFILE_ALL_ID, script))
        return params

    def _parse_parameters(self, script):
        """Parse parameters from the script header."""
        params = {'profiles': [],
                  'templates': [],
                  'platform': ['multi_platform_all'],
                  'remediation': ['all']}
        with open(script, 'r') as script_file:
            script_content = script_file.read()
            for parameter in params:
                found = re.search(r'^# {0} = ([ ,_\.\-\w\(\)]*)$'.format(parameter),
                                  script_content,
                                  re.MULTILINE)
                if found is None:
                    continue
                splitted = found.group(1).split(',')
                params[parameter] = [value.strip() for value in splitted]
        return params

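    # A header that _parse_parameters recognizes looks like this (hypothetical
    # scenario script; each '# key = value, value' line is optional):
    #
    #   # profiles = xccdf_org.ssgproject.content_profile_ospp
    #   # remediation = bash
    #
    # which yields {'profiles': ['xccdf_org.ssgproject.content_profile_ospp'],
    #               'templates': [], 'platform': ['multi_platform_all'],
    #               'remediation': ['bash']}
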
    def _get_scenarios(self, rule_dir, scripts, scenarios_regex, benchmark_cpes):
        """Return only valid scenario files; the rest are ignored (they are
        not meant to be executed directly).
        """

        if scenarios_regex is not None:
            scenarios_pattern = re.compile(scenarios_regex)

        scenarios = []
        for script in scripts:
            if scenarios_regex is not None:
                if scenarios_pattern.match(script) is None:
                    # (inspection note: `scenarios_pattern` is only defined
                    # when `scenarios_regex is not None`; that same guard on
                    # the enclosing `if` ensures it is bound before this use)
                    logging.debug("Skipping script %s - it did not match "
                                  "--scenarios regex" % script)
                    continue
            script_context = _get_script_context(script)
            if script_context is not None:
                script_params = self._parse_parameters(os.path.join(rule_dir, script))
                script_params = self._modify_parameters(script, script_params)
                if common.matches_platform(script_params["platform"], benchmark_cpes):
                    scenarios += [Scenario(script, script_context, script_params)]
                else:
                    logging.warning("Script %s is not applicable on given platform" % script)

        return scenarios

    def _check_rule(self, rule, remote_dir, state, remediation_available):
        remote_rule_dir = os.path.join(remote_dir, rule.short_id)
        logging.info(rule.id)

        logging.debug("Testing rule directory {0}".format(rule.directory))

        args_list = [
            (s, remote_rule_dir, rule.id, remediation_available)
            for s in self._get_scenarios(
                rule.directory, rule.files, self.scenarios_regex,
                self.benchmark_cpes)
        ]
        state.map_on_top(self._check_and_record_rule_scenario, args_list)

    def _check_and_record_rule_scenario(self, scenario, remote_rule_dir, rule_id, remediation_available):
        self._current_result = common.RuleResult()

        self._current_result.conditions = common.Scenario_conditions(
            self.test_env.name, self.test_env.scanning_mode,
            self.remediate_using, self.datastream)
        self._current_result.scenario = common.Scenario_run(rule_id, scenario.script)
        self._current_result.when = self.test_timestamp_str

        self._check_rule_scenario(scenario, remote_rule_dir, rule_id, remediation_available)
        self.results.append(self._current_result.save_to_dict())

    def _check_rule_scenario(self, scenario, remote_rule_dir, rule_id, remediation_available):
        if not _apply_script(
                remote_rule_dir, self.test_env.domain_ip, scenario.script):
            logging.error("Environment failed to prepare, skipping test")
            self._current_result.record_stage_result("preparation", False)
            return

        self._current_result.record_stage_result("preparation", True)
        logging.debug('Using test script {0} with context {1}'
                      .format(scenario.script, scenario.context))

        profiles = get_viable_profiles(
            scenario.script_params['profiles'], self.datastream, self.benchmark_id)
        test_data = dict(scenario=scenario,
                         rule_id=rule_id,
                         remediation_available=remediation_available)
        self.run_test_for_all_profiles(profiles, test_data)

        self.executed_tests += 1

    def finalize(self):
        super(RuleChecker, self).finalize()
        with open(os.path.join(LogHelper.LOG_DIR, "results.json"), "w") as f:
            json.dump(self.results, f)


def perform_rule_check(options):
    checker = RuleChecker(options.test_env)

    checker.datastream = options.datastream
    checker.benchmark_id = options.benchmark_id
    checker.remediate_using = options.remediate_using
    checker.dont_clean = options.dont_clean
    checker.manual_debug = options.manual_debug
    checker.benchmark_cpes = options.benchmark_cpes
    checker.scenarios_regex = options.scenarios_regex

    checker.scenarios_profile = options.scenarios_profile
    # check whether the target is a complete profile ID; if not, prepend the profile prefix
    if (checker.scenarios_profile is not None and
            not checker.scenarios_profile.startswith(OSCAP_PROFILE) and
            not oscap.is_virtual_oscap_profile(checker.scenarios_profile)):
        checker.scenarios_profile = OSCAP_PROFILE + options.scenarios_profile

    checker.test_target(options.target)
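
# A minimal invocation sketch (hypothetical wiring; in the test suite the
# `options` object comes from the suite's command-line parser):
#
#   from argparse import Namespace
#
#   options = Namespace(
#       test_env=my_test_env,              # a ssg_test_suite.test_env environment
#       datastream="ssg-rhel7-ds.xml",
#       benchmark_id="xccdf_org.ssgproject.content_benchmark_RHEL-7",
#       remediate_using="bash",
#       dont_clean=False,
#       manual_debug=False,
#       benchmark_cpes=set(),
#       scenarios_regex=None,
#       scenarios_profile="ospp",          # expanded to the full profile ID above
#       target=["ALL"],                    # test every rule that has scenarios
#   )
#   perform_rule_check(options)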