Passed
Push — master ( 041d2f...63c4fa ) by Jan
02:27 queued 11s

ssg_test_suite.rule.RuleChecker.test_rule()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric   Value
cc       1
eloc     5
nop      4
dl       0
loc      5
ccs      0
cts      3
cp       0
crap     2
rs       10
c        0
b        0
f        0
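The reported CRAP Score of 2 is consistent with the standard CRAP (Change Risk Anti-Patterns) formula applied to a method with cyclomatic complexity 1 and 0 % coverage. A minimal sketch, assuming that standard formula (the function name below is hypothetical, not part of the report):

    def crap_score(complexity, coverage_percent):
        """Change Risk Anti-Patterns score: comp^2 * (1 - cov)^3 + comp."""
        uncovered = 1.0 - coverage_percent / 100.0
        return complexity ** 2 * uncovered ** 3 + complexity

    # cc = 1 and cp = 0 from the table above -> 1 * 1 + 1 = 2.0
    print(crap_score(complexity=1, coverage_percent=0))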
from __future__ import print_function

import logging
import os
import os.path
import re
import subprocess
import collections
import json
import fnmatch

from ssg.constants import OSCAP_PROFILE, OSCAP_PROFILE_ALL_ID, OSCAP_RULE
from ssg_test_suite import oscap
from ssg_test_suite import xml_operations
from ssg_test_suite import test_env
from ssg_test_suite import common
from ssg_test_suite.log import LogHelper


logging.getLogger(__name__).addHandler(logging.NullHandler())


Scenario = collections.namedtuple(
    "Scenario", ["script", "context", "script_params"])

def get_viable_profiles(selected_profiles, datastream, benchmark, script=None):
    """Read the datastream and return the intersection of the profiles of the
    given benchmark and those provided in the `selected_profiles` parameter.
    """

    valid_profiles = []
    all_profiles_elements = xml_operations.get_all_profiles_in_benchmark(
        datastream, benchmark, logging)
    all_profiles = [el.attrib["id"] for el in all_profiles_elements]
    all_profiles.append(OSCAP_PROFILE_ALL_ID)

    for ds_profile in all_profiles:
        if 'ALL' in selected_profiles:
            valid_profiles += [ds_profile]
            continue
        for sel_profile in selected_profiles:
            if ds_profile.endswith(sel_profile):
                valid_profiles += [ds_profile]

    if not valid_profiles:
        if script:
            logging.warning('Script {0} - profile {1} not found in datastream'
                            .format(script, ", ".join(selected_profiles)))
        else:
            logging.warning('Profile {0} not found in datastream'
                            .format(", ".join(selected_profiles)))
    return valid_profiles

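# Illustration (hypothetical values, not part of the original module): the
# endswith() match above lets a short profile name select the full datastream
# profile id, e.g.
#     "xccdf_org.ssgproject.content_profile_ospp".endswith("ospp")  # True
# so selected_profiles=["ospp"] picks up that profile, while the special
# value "ALL" selects every profile in the benchmark.
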
def _apply_script(rule_dir, domain_ip, script):
    """Run a particular test script on the VM and log its output."""
    machine = "{0}@{1}".format(common.REMOTE_USER, domain_ip)
    logging.debug("Applying script {0}".format(script))
    rule_name = os.path.basename(rule_dir)
    log_file_name = os.path.join(
        LogHelper.LOG_DIR, rule_name + ".prescripts.log")

    with open(log_file_name, 'a') as log_file:
        log_file.write('##### {0} / {1} #####\n'.format(rule_name, script))
        shared_dir = os.path.join(common.REMOTE_TEST_SCENARIOS_DIRECTORY, "shared")
        command = "cd {0}; SHARED={1} bash -x {2}".format(rule_dir, shared_dir, script)
        args = common.SSH_ADDITIONAL_OPTS + (machine, command)

        try:
            common.run_with_stdout_logging("ssh", args, log_file)
        except subprocess.CalledProcessError as exc:
            logging.error("Rule testing script {script} failed with exit code {rc}"
                          .format(script=script, rc=exc.returncode))
            return False
    return True

def _get_script_context(script):
    """Return context of the script."""
    result = re.search(r'.*\.([^.]*)\.[^.]*$', script)
    if result is None:
        return None
    return result.group(1)

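# Illustration (hypothetical file name, not part of the original module): the
# scenario context is the next-to-last dotted component of the script name,
# e.g. _get_script_context("sshd_disable_root_login.fail.sh") returns "fail",
# while a name with fewer than two dots returns None.
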
class RuleChecker(oscap.Checker):
    """
    Rule checks generally work like this -
    for every profile that supports that rule:

    - Alter the system.
    - Run the scan, check that the result meets expectations.
      If the test scenario passed as requested, return True;
      if it failed or passed unexpectedly, return False.

    The following sequence applies if the initial scan
    has failed as expected:

    - If there are no remediations, return True.
    - Run remediation, return False if it failed.
    - Return the result of the final scan of the remediated system.
    """
    def __init__(self, test_env):
        super(RuleChecker, self).__init__(test_env)
        self._matching_rule_found = False

        self.results = list()
        self._current_result = None
        self.remote_dir = ""

    def _run_test(self, profile, test_data):
        scenario = test_data["scenario"]
        rule_id = test_data["rule_id"]
        remediation_available = test_data["remediation_available"]

        LogHelper.preload_log(
            logging.INFO, "Script {0} using profile {1} OK".format(scenario.script, profile),
            log_target='pass')
        LogHelper.preload_log(
            logging.WARNING, "Script {0} using profile {1} notapplicable".format(scenario.script, profile),
            log_target='notapplicable')
        LogHelper.preload_log(
            logging.ERROR,
            "Script {0} using profile {1} found issue:".format(scenario.script, profile),
            log_target='fail')

        runner_cls = oscap.REMEDIATION_RULE_RUNNERS[self.remediate_using]
        runner = runner_cls(
            self.test_env, oscap.process_profile_id(profile), self.datastream, self.benchmark_id,
            rule_id, scenario.script, self.dont_clean, self.manual_debug)
        initial_scan_res = self._initial_scan_went_ok(runner, rule_id, scenario.context)
        if not initial_scan_res:
            return False
        if initial_scan_res == 2:
            # notapplicable
            return True

        supported_and_available_remediations = self._get_available_remediations(scenario)
        if (scenario.context not in ['fail', 'error']
                or not supported_and_available_remediations):
            return True

        if remediation_available:
            if not self._remediation_went_ok(runner, rule_id):
                return False

            return self._final_scan_went_ok(runner, rule_id)
        else:
            msg = ("No remediation is available for rule '{}'."
                   .format(rule_id))
            logging.warning(msg)
            return False

    def _initial_scan_went_ok(self, runner, rule_id, context):
        success = runner.run_stage_with_context("initial", context)
        self._current_result.record_stage_result("initial_scan", success)
        if not success:
            msg = ("The initial scan failed for rule '{}'."
                   .format(rule_id))
            logging.error(msg)
        return success

    def _is_remediation_available(self, rule):
        if xml_operations.find_fix_in_benchmark(
                self.datastream, self.benchmark_id, rule.id, self.remediate_using) is None:
            return False
        else:
            return True

    def _get_available_remediations(self, scenario):
        is_supported = set(['all'])
        is_supported.add(
            oscap.REMEDIATION_RUNNER_TO_REMEDIATION_MEANS[self.remediate_using])
        supported_and_available_remediations = set(
            scenario.script_params['remediation']).intersection(is_supported)
        return supported_and_available_remediations

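    # Illustration (not part of the original module): if the configured
    # remediation runner maps to the "bash" remediation means, is_supported
    # above is {"all", "bash"}; a scenario that declares "# remediation = ansible"
    # then produces an empty intersection, and _run_test skips its
    # remediation stage.
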
    def _remediation_went_ok(self, runner, rule_id):
        success = runner.run_stage_with_context('remediation', 'fixed')
        self._current_result.record_stage_result("remediation", success)
        if not success:
            msg = ("The remediation failed for rule '{}'."
                   .format(rule_id))
            logging.error(msg)

        return success

    def _final_scan_went_ok(self, runner, rule_id):
        success = runner.run_stage_with_context('final', 'pass')
        self._current_result.record_stage_result("final_scan", success)
        if not success:
            msg = ("The check after remediation failed for rule '{}'."
                   .format(rule_id))
            logging.error(msg)
        return success

    def _rule_should_be_tested(self, rule, rules_to_be_tested):
        if 'ALL' in rules_to_be_tested:
            return True
        else:
            for rule_to_be_tested in rules_to_be_tested:
                # we check for a substring
                if rule_to_be_tested.startswith(OSCAP_RULE):
                    pattern = rule_to_be_tested
                else:
                    pattern = OSCAP_RULE + rule_to_be_tested
                if fnmatch.fnmatch(rule.id, pattern):
                    return True
            return False

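    # Illustration (hypothetical values, not part of the original module):
    # targets are fnmatch patterns; when the OSCAP_RULE prefix is missing it
    # is prepended, so a target of "sshd_*" becomes OSCAP_RULE + "sshd_*" and,
    # assuming the usual SSG rule prefix, matches ids like
    # "xccdf_org.ssgproject.content_rule_sshd_disable_root_login".
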
    def _ensure_package_present_for_all_scenarios(self, scenarios_by_rule):
        packages_required = set()
        for rule, scenarios in scenarios_by_rule.items():
            for s in scenarios:
                scenario_packages = s.script_params["packages"]
                packages_required.update(scenario_packages)
        if packages_required:
            common.install_packages(self.test_env.domain_ip, packages_required)

    def _prepare_environment(self, scenarios_by_rule):
        domain_ip = self.test_env.domain_ip
        try:
            self.remote_dir = common.send_scripts(domain_ip)
        except RuntimeError as exc:
            msg = "Unable to upload test scripts: {more_info}".format(more_info=str(exc))
            raise RuntimeError(msg)

        self._ensure_package_present_for_all_scenarios(scenarios_by_rule)

    def _get_rules_to_test(self, target):
        rules_to_test = []
        for rule in common.iterate_over_rules():
            if not self._rule_should_be_tested(rule, target):
                continue
            if not xml_operations.find_rule_in_benchmark(
                    self.datastream, self.benchmark_id, rule.id):
                logging.error(
                    "Rule '{0}' isn't present in benchmark '{1}' in '{2}'"
                    .format(rule.id, self.benchmark_id, self.datastream))
                continue
            rules_to_test.append(rule)
        return rules_to_test

    def test_rule(self, state, rule, scenarios):
        remediation_available = self._is_remediation_available(rule)
        self._check_rule(
            rule, scenarios,
            self.remote_dir, state, remediation_available)

    def _test_target(self, target):
        rules_to_test = self._get_rules_to_test(target)
        if not rules_to_test:
            self._matching_rule_found = False
            logging.error("No matching rule ID found for '{0}'".format(target))
            return
        self._matching_rule_found = True

        scenarios_by_rule = dict()
        for rule in rules_to_test:
            rule_scenarios = self._get_scenarios(
                rule.directory, rule.files, self.scenarios_regex,
                self.benchmark_cpes)
            scenarios_by_rule[rule.id] = rule_scenarios

        self._prepare_environment(scenarios_by_rule)

        with test_env.SavedState.create_from_environment(self.test_env, "tests_uploaded") as state:
            for rule in rules_to_test:
                self.test_rule(state, rule, scenarios_by_rule[rule.id])

    def _modify_parameters(self, script, params):
        if self.scenarios_profile:
            params['profiles'] = [self.scenarios_profile]

        if not params["profiles"]:
            params["profiles"].append(OSCAP_PROFILE_ALL_ID)
            logging.debug(
                "Added the {0} profile to the list of available profiles for {1}"
                .format(OSCAP_PROFILE_ALL_ID, script))
        return params

    def _parse_parameters(self, script):
        """Parse parameters from script header"""
        params = {'profiles': [],
                  'templates': [],
                  'packages': [],
                  'platform': ['multi_platform_all'],
                  'remediation': ['all']}
        with open(script, 'r') as script_file:
            script_content = script_file.read()
            for parameter in params:
                found = re.search(r'^# {0} = ([ ,_\.\-\w\(\)]*)$'.format(parameter),
                                  script_content,
                                  re.MULTILINE)
                if found is None:
                    continue
                splitted = found.group(1).split(',')
                params[parameter] = [value.strip() for value in splitted]
        return params

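    # Illustration (hypothetical header, not from the original repository):
    # a scenario script can declare its parameters in comments that the regex
    # above recognizes, e.g.
    #
    #     # packages = ntp
    #     # profiles = xccdf_org.ssgproject.content_profile_ospp
    #     # remediation = bash
    #
    # which _parse_parameters turns into lists such as
    # {'packages': ['ntp'], 'remediation': ['bash'], ...}.
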
    def _get_scenarios(self, rule_dir, scripts, scenarios_regex, benchmark_cpes):
        """Return only the valid scenario files; the rest are ignored (they are
        not meant to be executed directly).
        """

        if scenarios_regex is not None:
            scenarios_pattern = re.compile(scenarios_regex)

        scenarios = []
        for script in scripts:
            if scenarios_regex is not None:
                if scenarios_pattern.match(script) is None:
                    # Review note: the analyzer flagged that scenarios_pattern
                    # may not be defined if the earlier "scenarios_regex is not
                    # None" check is False; the same check guards this branch,
                    # so the variable is always bound when it is used here.
                    logging.debug("Skipping script %s - it did not match "
                                  "--scenarios regex" % script)
                    continue
            script_context = _get_script_context(script)
            if script_context is not None:
                script_params = self._parse_parameters(os.path.join(rule_dir, script))
                script_params = self._modify_parameters(script, script_params)
                if common.matches_platform(script_params["platform"], benchmark_cpes):
                    scenarios += [Scenario(script, script_context, script_params)]
                else:
                    logging.warning("Script %s is not applicable on given platform" % script)

        return scenarios

    def _check_rule(self, rule, scenarios, remote_dir, state, remediation_available):
        remote_rule_dir = os.path.join(remote_dir, rule.short_id)
        logging.info(rule.id)

        logging.debug("Testing rule directory {0}".format(rule.directory))

        args_list = [
            (s, remote_rule_dir, rule.id, remediation_available) for s in scenarios
        ]
        state.map_on_top(self._check_and_record_rule_scenario, args_list)

    def _check_and_record_rule_scenario(self, scenario, remote_rule_dir, rule_id, remediation_available):
        self._current_result = common.RuleResult()

        self._current_result.conditions = common.Scenario_conditions(
            self.test_env.name, self.test_env.scanning_mode,
            self.remediate_using, self.datastream)
        self._current_result.scenario = common.Scenario_run(rule_id, scenario.script)
        self._current_result.when = self.test_timestamp_str

        self._check_rule_scenario(scenario, remote_rule_dir, rule_id, remediation_available)
        self.results.append(self._current_result.save_to_dict())

    def _check_rule_scenario(self, scenario, remote_rule_dir, rule_id, remediation_available):
        if not _apply_script(
                remote_rule_dir, self.test_env.domain_ip, scenario.script):
            logging.error("Environment failed to prepare, skipping test")
            self._current_result.record_stage_result("preparation", False)
            return

        self._current_result.record_stage_result("preparation", True)
        logging.debug('Using test script {0} with context {1}'
                      .format(scenario.script, scenario.context))

        if scenario.script_params['profiles']:
            profiles = get_viable_profiles(
                scenario.script_params['profiles'], self.datastream, self.benchmark_id, scenario.script)
        else:
            # Special case for combined mode when scenario.script_params['profiles']
            # is empty, which means the scenario is not applicable on the given profile.
            logging.warning('Script {0} is not applicable on given profile'
                            .format(scenario.script))
            return

        test_data = dict(scenario=scenario,
                         rule_id=rule_id,
                         remediation_available=remediation_available)
        self.run_test_for_all_profiles(profiles, test_data)

        self.executed_tests += 1

    def finalize(self):
        super(RuleChecker, self).finalize()
        with open(os.path.join(LogHelper.LOG_DIR, "results.json"), "w") as f:
            json.dump(self.results, f)


def perform_rule_check(options):
    checker = RuleChecker(options.test_env)

    checker.datastream = options.datastream
    checker.benchmark_id = options.benchmark_id
    checker.remediate_using = options.remediate_using
    checker.dont_clean = options.dont_clean
    checker.manual_debug = options.manual_debug
    checker.benchmark_cpes = options.benchmark_cpes
    checker.scenarios_regex = options.scenarios_regex

    checker.scenarios_profile = options.scenarios_profile
    # If the given scenarios profile is not a complete profile ID, prepend the profile prefix.
    if (checker.scenarios_profile is not None and
            not checker.scenarios_profile.startswith(OSCAP_PROFILE) and
            not oscap.is_virtual_oscap_profile(checker.scenarios_profile)):
        checker.scenarios_profile = OSCAP_PROFILE + options.scenarios_profile

    checker.test_target(options.target)
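# Illustration (hypothetical value, not part of the original module): a short
# profile name passed on the command line is expanded to the full XCCDF id,
# e.g. options.scenarios_profile = "ospp" becomes OSCAP_PROFILE + "ospp",
# i.e. "xccdf_org.ssgproject.content_profile_ospp", assuming the usual SSG
# profile prefix.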