Passed
Push to master (42467a...9d011f) by Matěj
03:19 (queued 11s)

RuleChecker._parse_parameters()  (rated A)

Complexity:     Conditions 4
Size:           Total Lines 18, Code Lines 17
Duplication:    Lines 0, Ratio 0 %
Code Coverage:  Tests 0, CRAP Score 20
Importance:     Changes 0

Metric   Value
cc       4
eloc     17
nop      2
dl       0
loc      18
ccs      0
cts      11
cp       0
crap     20
rs       9.55
c        0
b        0
f        0
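
The CRAP Score of 20 is consistent with the other reported values. As a quick sanity check, here is an editor's sketch (not part of the report or of the analyzed module) assuming the standard CRAP formula based on cyclomatic complexity and coverage:

def crap_score(cyclomatic_complexity, coverage_fraction):
    # CRAP(m) = cc(m)^2 * (1 - cov(m))^3 + cc(m)
    cc = cyclomatic_complexity
    return cc ** 2 * (1 - coverage_fraction) ** 3 + cc

# With the reported values cc = 4 and coverage cp = 0 (no tests):
assert crap_score(4, 0.0) == 20  # matches the reported CRAP Score

In other words, the score is driven entirely by the missing test coverage; full coverage would reduce it to the method's cyclomatic complexity of 4.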
from __future__ import print_function

import logging
import os
import os.path
import re
import subprocess
import collections
import json
import fnmatch

from ssg.constants import OSCAP_PROFILE, OSCAP_PROFILE_ALL_ID, OSCAP_RULE
from ssg_test_suite import oscap
from ssg_test_suite import xml_operations
from ssg_test_suite import test_env
from ssg_test_suite import common
from ssg_test_suite.log import LogHelper


logging.getLogger(__name__).addHandler(logging.NullHandler())


Scenario = collections.namedtuple(
    "Scenario", ["script", "context", "script_params"])


def get_viable_profiles(selected_profiles, datastream, benchmark, script=None):
    """Read the datastream and return the intersection of the profiles of the
    given benchmark and those provided in the `selected_profiles` parameter.
    """

    valid_profiles = []
    all_profiles_elements = xml_operations.get_all_profiles_in_benchmark(
        datastream, benchmark, logging)
    all_profiles = [el.attrib["id"] for el in all_profiles_elements]
    all_profiles.append(OSCAP_PROFILE_ALL_ID)

    for ds_profile in all_profiles:
        if 'ALL' in selected_profiles:
            valid_profiles += [ds_profile]
            continue
        for sel_profile in selected_profiles:
            if ds_profile.endswith(sel_profile):
                valid_profiles += [ds_profile]

    if not valid_profiles:
        if script:
            logging.warning('Script {0} - profile {1} not found in datastream'
                            .format(script, ", ".join(selected_profiles)))
        else:
            logging.warning('Profile {0} not found in datastream'
                            .format(", ".join(selected_profiles)))
    return valid_profiles

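# Editor's note (not part of the original module): get_viable_profiles()
# matches the requested profile names as suffixes of the full datastream
# profile IDs. For example, with hypothetical datastream profiles
#     xccdf_org.ssgproject.content_profile_ospp
#     xccdf_org.ssgproject.content_profile_pci-dss
# a selection of ['ospp'] keeps only the first ID (via endswith()), while
# ['ALL'] keeps every profile plus the appended OSCAP_PROFILE_ALL_ID entry.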

def _apply_script(rule_dir, test_env, script):
    """Run a particular test script on the VM and log its output."""
    logging.debug("Applying script {0}".format(script))
    rule_name = os.path.basename(rule_dir)
    log_file_name = os.path.join(
        LogHelper.LOG_DIR, rule_name + ".prescripts.log")

    with open(log_file_name, 'a') as log_file:
        log_file.write('##### {0} / {1} #####\n'.format(rule_name, script))
        shared_dir = os.path.join(common.REMOTE_TEST_SCENARIOS_DIRECTORY, "shared")
        command = "cd {0}; SHARED={1} bash -x {2}".format(rule_dir, shared_dir, script)

        try:
            test_env.execute_ssh_command(command, log_file)
        except subprocess.CalledProcessError as exc:
            logging.error("Rule testing script {script} failed with exit code {rc}"
                          .format(script=script, rc=exc.returncode))
            return False
    return True


def _get_script_context(script):
    """Return the context of the script."""
    result = re.search(r'.*\.([^.]*)\.[^.]*$', script)
    if result is None:
        return None
    return result.group(1)

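# Editor's sketch (not part of the original module): scenario scripts follow
# a "<name>.<context>.sh" naming convention, and the regex above extracts the
# middle component as the expected scan context, e.g.:
assert _get_script_context("correct_value.pass.sh") == "pass"
assert _get_script_context("wrong_value.fail.sh") == "fail"
assert _get_script_context("setup.sh") is None  # helper script, no context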

class RuleChecker(oscap.Checker):
    """
    Rule checks generally work like this;
    for every profile that supports that rule:

    - Alter the system.
    - Run the scan and check that the result meets expectations.
      If the test scenario passed as requested, return True;
      if it failed or passed unexpectedly, return False.

    The following sequence applies if the initial scan
    has failed as expected:

    - If there are no remediations, return True.
    - Run the remediation and return False if it failed.
    - Return the result of the final scan of the remediated system.
    """
    def __init__(self, test_env):
        super(RuleChecker, self).__init__(test_env)
        self._matching_rule_found = False

        self.results = list()
        self._current_result = None
        self.remote_dir = ""

    def _run_test(self, profile, test_data):
        scenario = test_data["scenario"]
        rule_id = test_data["rule_id"]
        remediation_available = test_data["remediation_available"]

        LogHelper.preload_log(
            logging.INFO, "Script {0} using profile {1} OK".format(scenario.script, profile),
            log_target='pass')
        LogHelper.preload_log(
            logging.WARNING, "Script {0} using profile {1} notapplicable".format(scenario.script, profile),
            log_target='notapplicable')
        LogHelper.preload_log(
            logging.ERROR,
            "Script {0} using profile {1} found issue:".format(scenario.script, profile),
            log_target='fail')

        runner_cls = oscap.REMEDIATION_RULE_RUNNERS[self.remediate_using]
        runner = runner_cls(
            self.test_env, oscap.process_profile_id(profile), self.datastream, self.benchmark_id,
            rule_id, scenario.script, self.dont_clean, self.manual_debug)
        initial_scan_res = self._initial_scan_went_ok(runner, rule_id, scenario.context)
        if not initial_scan_res:
            return False
        if initial_scan_res == 2:
            # notapplicable
            return True

        supported_and_available_remediations = self._get_available_remediations(scenario)
        if (scenario.context not in ['fail', 'error']
                or not supported_and_available_remediations):
            return True

        if remediation_available:
            if not self._remediation_went_ok(runner, rule_id):
                return False

            return self._final_scan_went_ok(runner, rule_id)
        else:
            msg = ("No remediation is available for rule '{}'."
                   .format(rule_id))
            logging.warning(msg)
            return False

    def _initial_scan_went_ok(self, runner, rule_id, context):
        success = runner.run_stage_with_context("initial", context)
        self._current_result.record_stage_result("initial_scan", success)
        if not success:
            msg = ("The initial scan failed for rule '{}'."
                   .format(rule_id))
            logging.error(msg)
        return success

    def _is_remediation_available(self, rule):
        if xml_operations.find_fix_in_benchmark(
                self.datastream, self.benchmark_id, rule.id, self.remediate_using) is None:
            return False
        else:
            return True

    def _get_available_remediations(self, scenario):
        is_supported = set(['all'])
        is_supported.add(
            oscap.REMEDIATION_RUNNER_TO_REMEDIATION_MEANS[self.remediate_using])
        supported_and_available_remediations = set(
            scenario.script_params['remediation']).intersection(is_supported)
        return supported_and_available_remediations

    def _remediation_went_ok(self, runner, rule_id):
        success = runner.run_stage_with_context('remediation', 'fixed')
        self._current_result.record_stage_result("remediation", success)
        if not success:
            msg = ("The remediation failed for rule '{}'."
                   .format(rule_id))
            logging.error(msg)

        return success

    def _final_scan_went_ok(self, runner, rule_id):
        success = runner.run_stage_with_context('final', 'pass')
        self._current_result.record_stage_result("final_scan", success)
        if not success:
            msg = ("The check after remediation failed for rule '{}'."
                   .format(rule_id))
            logging.error(msg)
        return success

    def _rule_should_be_tested(self, rule, rules_to_be_tested):
        if 'ALL' in rules_to_be_tested:
            return True
        else:
            for rule_to_be_tested in rules_to_be_tested:
                # the target may be a full rule ID or a shorthand glob pattern
                if rule_to_be_tested.startswith(OSCAP_RULE):
                    pattern = rule_to_be_tested
                else:
                    pattern = OSCAP_RULE + rule_to_be_tested
                if fnmatch.fnmatch(rule.id, pattern):
                    return True
            return False

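    # Editor's note (not part of the original module): targets given on the
    # command line are treated as fnmatch() glob patterns against the full
    # rule ID, with the OSCAP_RULE prefix prepended when it is missing.
    # Assuming the usual "xccdf_org.ssgproject.content_rule_" prefix, a target
    # such as "accounts_*" would match a hypothetical rule ID like
    # "xccdf_org.ssgproject.content_rule_accounts_tmout":
    #
    #     fnmatch.fnmatch("xccdf_org.ssgproject.content_rule_accounts_tmout",
    #                     OSCAP_RULE + "accounts_*")   # -> True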

    def _ensure_package_present_for_all_scenarios(self, scenarios_by_rule):
        packages_required = set()
        for rule, scenarios in scenarios_by_rule.items():
            for s in scenarios:
                scenario_packages = s.script_params["packages"]
                packages_required.update(scenario_packages)
        if packages_required:
            common.install_packages(self.test_env, packages_required)

    def _prepare_environment(self, scenarios_by_rule):
        domain_ip = self.test_env.domain_ip
        try:
            self.remote_dir = common.send_scripts(self.test_env)
        except RuntimeError as exc:
            msg = "Unable to upload test scripts: {more_info}".format(more_info=str(exc))
            raise RuntimeError(msg)

        self._ensure_package_present_for_all_scenarios(scenarios_by_rule)

    def _get_rules_to_test(self, target):
        rules_to_test = []
        for rule in common.iterate_over_rules():
            if not self._rule_should_be_tested(rule, target):
                continue
            if not xml_operations.find_rule_in_benchmark(
                    self.datastream, self.benchmark_id, rule.id):
                logging.error(
                    "Rule '{0}' isn't present in benchmark '{1}' in '{2}'"
                    .format(rule.id, self.benchmark_id, self.datastream))
                continue
            rules_to_test.append(rule)
        return rules_to_test

    def test_rule(self, state, rule, scenarios):
        remediation_available = self._is_remediation_available(rule)
        self._check_rule(
            rule, scenarios,
            self.remote_dir, state, remediation_available)

    def _test_target(self, target):
        rules_to_test = self._get_rules_to_test(target)
        if not rules_to_test:
            self._matching_rule_found = False
            logging.error("No matching rule ID found for '{0}'".format(target))
            return
        self._matching_rule_found = True

        scenarios_by_rule = dict()
        for rule in rules_to_test:
            rule_scenarios = self._get_scenarios(
                rule.directory, rule.files, self.scenarios_regex,
                self.benchmark_cpes)
            scenarios_by_rule[rule.id] = rule_scenarios

        self._prepare_environment(scenarios_by_rule)

        with test_env.SavedState.create_from_environment(self.test_env, "tests_uploaded") as state:
            for rule in rules_to_test:
                self.test_rule(state, rule, scenarios_by_rule[rule.id])

    def _modify_parameters(self, script, params):
        if self.scenarios_profile:
            params['profiles'] = [self.scenarios_profile]

        if not params["profiles"]:
            params["profiles"].append(OSCAP_PROFILE_ALL_ID)
            logging.debug(
                "Added the {0} profile to the list of available profiles for {1}"
                .format(OSCAP_PROFILE_ALL_ID, script))
        return params

    def _parse_parameters(self, script):
        """Parse parameters from script header"""
        params = {'profiles': [],
                  'templates': [],
                  'packages': [],
                  'platform': ['multi_platform_all'],
                  'remediation': ['all']}
        with open(script, 'r') as script_file:
            script_content = script_file.read()
            for parameter in params:
                found = re.search(r'^# {0} = ([ ,_\.\-\w\(\)]*)$'.format(parameter),
                                  script_content,
                                  re.MULTILINE)
                if found is None:
                    continue
                splitted = found.group(1).split(',')
                params[parameter] = [value.strip() for value in splitted]
        return params

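    # Editor's sketch (not part of the original module): given a scenario
    # script whose header contains, for instance (values are illustrative):
    #
    #     # packages = vim
    #     # platform = multi_platform_fedora,Red Hat Enterprise Linux 8
    #     # profiles = xccdf_org.ssgproject.content_profile_ospp
    #     # remediation = bash
    #
    # _parse_parameters() above would return:
    #
    #     {'profiles': ['xccdf_org.ssgproject.content_profile_ospp'],
    #      'templates': [],
    #      'packages': ['vim'],
    #      'platform': ['multi_platform_fedora', 'Red Hat Enterprise Linux 8'],
    #      'remediation': ['bash']}
    #
    # Keys absent from the header keep their defaults: 'templates' stays
    # empty, and a missing 'platform' line would keep ['multi_platform_all'].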

    def _get_scenarios(self, rule_dir, scripts, scenarios_regex, benchmark_cpes):
        """Return only the valid scenario files; the rest are ignored (they
        are not meant to be executed directly).
        """

        if scenarios_regex is not None:
            scenarios_pattern = re.compile(scenarios_regex)

        scenarios = []
        for script in scripts:
            if scenarios_regex is not None:
                if scenarios_pattern.match(script) is None:

[Analyzer annotation, introduced by the line above: The variable scenarios_pattern does not seem to be defined in case the "scenarios_regex is not None" check before the loop is False. Are you sure this can never be the case?]

                    logging.debug("Skipping script %s - it did not match "
                                  "--scenarios regex" % script)
                    continue
            script_context = _get_script_context(script)
            if script_context is not None:
                script_params = self._parse_parameters(os.path.join(rule_dir, script))
                script_params = self._modify_parameters(script, script_params)
                if common.matches_platform(script_params["platform"], benchmark_cpes):
                    scenarios += [Scenario(script, script_context, script_params)]
                else:
                    logging.warning("Script %s is not applicable on given platform" % script)

        return scenarios

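    # A possible way to address the analyzer annotation above (an editor's
    # sketch, not part of the original module): bind scenarios_pattern
    # unconditionally so static analysis can see it is always defined, e.g.
    #
    #     scenarios_pattern = None
    #     if scenarios_regex is not None:
    #         scenarios_pattern = re.compile(scenarios_regex)
    #     ...
    #     if scenarios_pattern is not None and scenarios_pattern.match(script) is None:
    #         ...
    #
    # The current code is safe in practice because both branches test the same
    # scenarios_regex condition, but the explicit default makes that invariant
    # obvious.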

    def _check_rule(self, rule, scenarios, remote_dir, state, remediation_available):
        remote_rule_dir = os.path.join(remote_dir, rule.short_id)
        logging.info(rule.id)

        logging.debug("Testing rule directory {0}".format(rule.directory))

        args_list = [
            (s, remote_rule_dir, rule.id, remediation_available) for s in scenarios
        ]
        state.map_on_top(self._check_and_record_rule_scenario, args_list)

    def _check_and_record_rule_scenario(self, scenario, remote_rule_dir, rule_id, remediation_available):
        self._current_result = common.RuleResult()

        self._current_result.conditions = common.Scenario_conditions(
            self.test_env.name, self.test_env.scanning_mode,
            self.remediate_using, self.datastream)
        self._current_result.scenario = common.Scenario_run(rule_id, scenario.script)
        self._current_result.when = self.test_timestamp_str

        self._check_rule_scenario(scenario, remote_rule_dir, rule_id, remediation_available)
        self.results.append(self._current_result.save_to_dict())

    def _check_rule_scenario(self, scenario, remote_rule_dir, rule_id, remediation_available):
        if not _apply_script(
                remote_rule_dir, self.test_env, scenario.script):
            logging.error("Environment failed to prepare, skipping test")
            self._current_result.record_stage_result("preparation", False)
            return

        self._current_result.record_stage_result("preparation", True)
        logging.debug('Using test script {0} with context {1}'
                      .format(scenario.script, scenario.context))

        if scenario.script_params['profiles']:
            profiles = get_viable_profiles(
                scenario.script_params['profiles'], self.datastream, self.benchmark_id, scenario.script)
        else:
            # Special case for combined mode: scenario.script_params['profiles']
            # being empty means the scenario is not applicable to the given profile.
            logging.warning('Script {0} is not applicable on given profile'
                            .format(scenario.script))
            return

        test_data = dict(scenario=scenario,
                         rule_id=rule_id,
                         remediation_available=remediation_available)
        self.run_test_for_all_profiles(profiles, test_data)

        self.executed_tests += 1

    def finalize(self):
        super(RuleChecker, self).finalize()
        with open(os.path.join(LogHelper.LOG_DIR, "results.json"), "w") as f:
            json.dump(self.results, f)


def perform_rule_check(options):
    checker = RuleChecker(options.test_env)

    checker.datastream = options.datastream
    checker.benchmark_id = options.benchmark_id
    checker.remediate_using = options.remediate_using
    checker.dont_clean = options.dont_clean
    checker.manual_debug = options.manual_debug
    checker.benchmark_cpes = options.benchmark_cpes
    checker.scenarios_regex = options.scenarios_regex

    checker.scenarios_profile = options.scenarios_profile
    # check whether the target is a complete profile ID; if not, prepend the profile prefix
    if (checker.scenarios_profile is not None and
            not checker.scenarios_profile.startswith(OSCAP_PROFILE) and
            not oscap.is_virtual_oscap_profile(checker.scenarios_profile)):
        checker.scenarios_profile = OSCAP_PROFILE + options.scenarios_profile

    checker.test_target(options.target)
403