Passed: Push to master (27db2b...ca6273), created 02:21 by Jan

RuleChecker._change_variable_value()   A

Complexity
    Conditions      3

Size
    Total Lines     13
    Code Lines      13

Duplication
    Lines           0
    Ratio           0 %

Code Coverage
    Tests           0
    CRAP Score      12

Importance
    Changes         0

Metric   Value
cc       3
eloc     13
nop      3
dl       0
loc      13
ccs      0
cts      11
cp       0
crap     12
rs       9.75
c        0
b        0
f        0
from __future__ import print_function

import logging
import os
import shutil
import os.path
import re
import subprocess
import collections
import json
import fnmatch
import tempfile
import contextlib

from ssg.constants import OSCAP_PROFILE, OSCAP_PROFILE_ALL_ID, OSCAP_RULE
from ssg_test_suite import oscap
from ssg_test_suite import xml_operations
from ssg_test_suite import test_env
from ssg_test_suite import common
from ssg_test_suite.log import LogHelper


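# A NullHandler keeps "no handler could be found" warnings away when the
# importing application has not configured logging.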
logging.getLogger(__name__).addHandler(logging.NullHandler())


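# A test scenario: `script` is the scenario file name, `context` is the
# expected outcome encoded in that file name (see _get_script_context), and
# `script_params` holds the parameters parsed from the script header.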
Scenario = collections.namedtuple(
    "Scenario", ["script", "context", "script_params"])


def get_viable_profiles(selected_profiles, datastream, benchmark, script=None):
    """Read the datastream and return the intersection of the given
    benchmark's profiles with those named in the `selected_profiles`
    parameter.
    """

    valid_profiles = []
    all_profiles_elements = xml_operations.get_all_profiles_in_benchmark(
        datastream, benchmark, logging)
    all_profiles = [el.attrib["id"] for el in all_profiles_elements]
    all_profiles.append(OSCAP_PROFILE_ALL_ID)

    for ds_profile in all_profiles:
        if 'ALL' in selected_profiles:
            valid_profiles += [ds_profile]
            continue
        for sel_profile in selected_profiles:
            if ds_profile.endswith(sel_profile):
                valid_profiles += [ds_profile]

    if not valid_profiles:
        if script:
            logging.warning('Script {0} - profile {1} not found in datastream'
                            .format(script, ", ".join(selected_profiles)))
        else:
            logging.warning('Profile {0} not found in datastream'
                            .format(", ".join(selected_profiles)))
    return valid_profiles


def generate_xslt_change_value_template(value_short_id, new_value):
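    """Return an XSLT stylesheet (as a string) that rewrites the text of the
    selector-less <xccdf-1.2:value> element of the given XCCDF Value in a
    datastream to `new_value`.
    """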
    XSLT_TEMPLATE = """<xsl:stylesheet version="1.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:ds="http://scap.nist.gov/schema/scap/source/1.2" xmlns:xccdf-1.2="http://checklists.nist.gov/xccdf/1.2">
    <xsl:output omit-xml-declaration="yes" indent="yes"/>
        <xsl:strip-space elements="*"/>
        <xsl:template match="node()|@*">
            <xsl:copy>
                <xsl:apply-templates select="node()|@*"/>
            </xsl:copy>
        </xsl:template>
        <xsl:template match="ds:component/xccdf-1.2:Benchmark//xccdf-1.2:Value[@id='xccdf_org.ssgproject.content_value_{value_short_id}']/xccdf-1.2:value[not(@selector)]/text()">{new_value}</xsl:template>
</xsl:stylesheet>"""
    return XSLT_TEMPLATE.format(value_short_id=value_short_id, new_value=new_value)


def _apply_script(rule_dir, test_env, script):
    """Run the given test script on the VM and log its output."""
    logging.debug("Applying script {0}".format(script))
    rule_name = os.path.basename(rule_dir)
    log_file_name = os.path.join(
        LogHelper.LOG_DIR, rule_name + ".prescripts.log")

    with open(log_file_name, 'a') as log_file:
        log_file.write('##### {0} / {1} #####\n'.format(rule_name, script))
        shared_dir = os.path.join(common.REMOTE_TEST_SCENARIOS_DIRECTORY, "shared")
        command = "cd {0}; SHARED={1} bash -x {2}".format(rule_dir, shared_dir, script)

        try:
            test_env.execute_ssh_command(command, log_file)
        except subprocess.CalledProcessError as exc:
            logging.error("Rule testing script {script} failed with exit code {rc}"
                          .format(script=script, rc=exc.returncode))
            return False
    return True


def _get_script_context(script):
    """Return context of the script."""
    result = re.search(r'.*\.([^.]*)\.[^.]*$', script)
    if result is None:
        return None
    return result.group(1)


class RuleChecker(oscap.Checker):
    """
    Rule checks generally work like this -
    for every profile that supports that rule:

    - Alter the system.
    - Run the scan, check that the result meets expectations.
      If the test scenario passed as requested, return True;
      if it failed or passed unexpectedly, return False.

    The following sequence applies if the initial scan
    has failed as expected:

    - If there are no remediations, return True.
    - Run the remediation, and return False if it failed.
    - Return the result of the final scan of the remediated system.
    """
    def __init__(self, test_env):
        super(RuleChecker, self).__init__(test_env)
        self._matching_rule_found = False

        self.results = list()
        self._current_result = None
        self.remote_dir = ""

    def _run_test(self, profile, test_data):
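        """Run one scenario against one profile and return True when the
        scan (and possible remediation) results match what the scenario
        expects.
        """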
        scenario = test_data["scenario"]
        rule_id = test_data["rule_id"]
        remediation_available = test_data["remediation_available"]

        LogHelper.preload_log(
            logging.INFO, "Script {0} using profile {1} OK".format(scenario.script, profile),
            log_target='pass')
        LogHelper.preload_log(
            logging.WARNING, "Script {0} using profile {1} notapplicable".format(scenario.script, profile),
            log_target='notapplicable')
        LogHelper.preload_log(
            logging.ERROR,
            "Script {0} using profile {1} found issue:".format(scenario.script, profile),
            log_target='fail')

        runner_cls = oscap.REMEDIATION_RULE_RUNNERS[self.remediate_using]
        runner = runner_cls(
            self.test_env, oscap.process_profile_id(profile), self.datastream, self.benchmark_id,
            rule_id, scenario.script, self.dont_clean, self.manual_debug)
        initial_scan_res = self._initial_scan_went_ok(runner, rule_id, scenario.context)
        if not initial_scan_res:
            return False
        if initial_scan_res == 2:
            # notapplicable
            return True

        supported_and_available_remediations = self._get_available_remediations(scenario)
        if (scenario.context not in ['fail', 'error']
                or not supported_and_available_remediations):
            return True

        if remediation_available:
            if not self._remediation_went_ok(runner, rule_id):
                return False

            return self._final_scan_went_ok(runner, rule_id)
        else:
            msg = ("No remediation is available for rule '{}'."
                   .format(rule_id))
            logging.warning(msg)
            return False

    def _initial_scan_went_ok(self, runner, rule_id, context):
        success = runner.run_stage_with_context("initial", context)
        self._current_result.record_stage_result("initial_scan", success)
        if not success:
            msg = ("The initial scan failed for rule '{}'."
                   .format(rule_id))
            logging.error(msg)
        return success

    def _is_remediation_available(self, rule):
        return xml_operations.find_fix_in_benchmark(
            self.datastream, self.benchmark_id, rule.id, self.remediate_using) is not None
    def _get_available_remediations(self, scenario):
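        # A remediation applies when the scenario declares it for 'all'
        # means or for the remediation means this run was configured with.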
        is_supported = set(['all'])
        is_supported.add(
            oscap.REMEDIATION_RUNNER_TO_REMEDIATION_MEANS[self.remediate_using])
        supported_and_available_remediations = set(
            scenario.script_params['remediation']).intersection(is_supported)
        return supported_and_available_remediations

    def _remediation_went_ok(self, runner, rule_id):
        success = runner.run_stage_with_context('remediation', 'fixed')
        self._current_result.record_stage_result("remediation", success)
        if not success:
            msg = ("The remediation failed for rule '{}'."
                   .format(rule_id))
            logging.error(msg)

        return success

    def _final_scan_went_ok(self, runner, rule_id):
        success = runner.run_stage_with_context('final', 'pass')
        self._current_result.record_stage_result("final_scan", success)
        if not success:
            msg = ("The check after remediation failed for rule '{}'."
                   .format(rule_id))
            logging.error(msg)
        return success

    def _rule_should_be_tested(self, rule, rules_to_be_tested):
        if 'ALL' in rules_to_be_tested:
            return True
        else:
            for rule_to_be_tested in rules_to_be_tested:
                # The full rule ID is matched as a glob pattern; a bare
                # short ID gets the OSCAP rule prefix prepended first.
                if rule_to_be_tested.startswith(OSCAP_RULE):
                    pattern = rule_to_be_tested
                else:
                    pattern = OSCAP_RULE + rule_to_be_tested
                if fnmatch.fnmatch(rule.id, pattern):
                    return True
            return False

    def _ensure_package_present_for_all_scenarios(self, scenarios_by_rule):
        packages_required = set()
        for rule, scenarios in scenarios_by_rule.items():
            for s in scenarios:
                scenario_packages = s.script_params["packages"]
                packages_required.update(scenario_packages)
        if packages_required:
            common.install_packages(self.test_env, packages_required)

    def _prepare_environment(self, scenarios_by_rule):
        try:
            self.remote_dir = common.send_scripts(self.test_env)
        except RuntimeError as exc:
            msg = "Unable to upload test scripts: {more_info}".format(more_info=str(exc))
            raise RuntimeError(msg)

        self._ensure_package_present_for_all_scenarios(scenarios_by_rule)

    def _get_rules_to_test(self, target):
        rules_to_test = []
        for rule in common.iterate_over_rules():
            if not self._rule_should_be_tested(rule, target):
                continue
            if not xml_operations.find_rule_in_benchmark(
                    self.datastream, self.benchmark_id, rule.id):
                logging.error(
                    "Rule '{0}' isn't present in benchmark '{1}' in '{2}'"
                    .format(rule.id, self.benchmark_id, self.datastream))
                continue
            rules_to_test.append(rule)
        return rules_to_test

    def test_rule(self, state, rule, scenarios):
        remediation_available = self._is_remediation_available(rule)
        self._check_rule(
            rule, scenarios,
            self.remote_dir, state, remediation_available)

    def _test_target(self, target):
        rules_to_test = self._get_rules_to_test(target)
        if not rules_to_test:
            self._matching_rule_found = False
            logging.error("No matching rule ID found for '{0}'".format(target))
            return
        self._matching_rule_found = True

        scenarios_by_rule = dict()
        for rule in rules_to_test:
            rule_scenarios = self._get_scenarios(
                rule.directory, rule.files, self.scenarios_regex,
                self.benchmark_cpes)
            scenarios_by_rule[rule.id] = rule_scenarios

        self._prepare_environment(scenarios_by_rule)

        with test_env.SavedState.create_from_environment(self.test_env, "tests_uploaded") as state:
            for rule in rules_to_test:
                self.test_rule(state, rule, scenarios_by_rule[rule.id])

    def _modify_parameters(self, script, params):
        if self.scenarios_profile:
            params['profiles'] = [self.scenarios_profile]

        if not params["profiles"]:
            params["profiles"].append(OSCAP_PROFILE_ALL_ID)
            logging.debug(
                "Added the {0} profile to the list of available profiles for {1}"
                .format(OSCAP_PROFILE_ALL_ID, script))
        return params

    def _parse_parameters(self, script):
        """Parse parameters from the script header."""
        params = {'profiles': [],
                  'templates': [],
                  'packages': [],
                  'platform': ['multi_platform_all'],
                  'remediation': ['all'],
                  'variables': [],
                  }
        with open(script, 'r') as script_file:
            script_content = script_file.read()
            for parameter in params:
                found = re.search(r'^# {0} = ([ =,_\.\-\w\(\)]*)$'.format(parameter),
                                  script_content,
                                  re.MULTILINE)
                if found is None:
                    continue
                params[parameter] = [value.strip() for value in found.group(1).split(',')]
        return params

    def _get_scenarios(self, rule_dir, scripts, scenarios_regex, benchmark_cpes):
        """Return only valid scenario files; the rest are ignored (they are
        not meant to be executed directly).
        """

        if scenarios_regex is not None:
            scenarios_pattern = re.compile(scenarios_regex)

        scenarios = []
        for script in scripts:
            if scenarios_regex is not None:
                if scenarios_pattern.match(script) is None:
                    # Static-analysis note: scenarios_pattern is only defined
                    # when scenarios_regex is not None, and this branch is
                    # guarded by the same check, so it cannot be undefined here.
                    logging.debug("Skipping script %s - it did not match "
                                  "--scenarios regex" % script)
                    continue
            script_context = _get_script_context(script)
            if script_context is not None:
                script_params = self._parse_parameters(os.path.join(rule_dir, script))
                script_params = self._modify_parameters(script, script_params)
                if common.matches_platform(script_params["platform"], benchmark_cpes):
                    scenarios += [Scenario(script, script_context, script_params)]
                else:
                    logging.warning("Script %s is not applicable on given platform" % script)

        return scenarios

    def _check_rule(self, rule, scenarios, remote_dir, state, remediation_available):
        remote_rule_dir = os.path.join(remote_dir, rule.short_id)
        logging.info(rule.id)

        logging.debug("Testing rule directory {0}".format(rule.directory))

        args_list = [
            (s, remote_rule_dir, rule.id, remediation_available) for s in scenarios
        ]
        state.map_on_top(self._check_and_record_rule_scenario, args_list)

    def _check_and_record_rule_scenario(self, scenario, remote_rule_dir, rule_id, remediation_available):
        self._current_result = common.RuleResult()

        self._current_result.conditions = common.Scenario_conditions(
            self.test_env.name, self.test_env.scanning_mode,
            self.remediate_using, self.datastream)
        self._current_result.scenario = common.Scenario_run(rule_id, scenario.script)
        self._current_result.when = self.test_timestamp_str

        with self.copy_of_datastream():
            self._check_rule_scenario(scenario, remote_rule_dir, rule_id, remediation_available)
        self.results.append(self._current_result.save_to_dict())

    @contextlib.contextmanager
    def copy_of_datastream(self, new_filename=None):
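        """Temporarily replace self.datastream with a scratch copy so that
        per-scenario modifications do not touch the original file.
        """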
        old_filename = self.datastream
        if not new_filename:
            _, new_filename = tempfile.mkstemp(prefix="ssgts_ds_modified", dir="/tmp")
        shutil.copy(old_filename, new_filename)
        self.datastream = new_filename
        try:
            yield new_filename
        finally:
            self.datastream = old_filename
            os.unlink(new_filename)

    def _change_variable_value(self, varname, value):
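        """Rewrite the current (copied) datastream in place, setting the
        default value of the given XCCDF Value by applying the generated
        XSLT stylesheet with xsltproc.
        """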
        _, xslt_filename = tempfile.mkstemp(prefix="xslt-change-value", dir="/tmp")
        template = generate_xslt_change_value_template(varname, value)
        with open(xslt_filename, "w") as fp:
            fp.write(template)
        _, temp_datastream = tempfile.mkstemp(prefix="ds-temp", dir="/tmp")
        log_file_name = os.path.join(LogHelper.LOG_DIR, "env-preparation.log")
        with open(log_file_name, "a") as log_file:
            common.run_with_stdout_logging(
                    "xsltproc", ("--output", temp_datastream, xslt_filename, self.datastream),
                    log_file)
        os.rename(temp_datastream, self.datastream)
        os.unlink(xslt_filename)

    def _check_rule_scenario(self, scenario, remote_rule_dir, rule_id, remediation_available):
        if not _apply_script(
                remote_rule_dir, self.test_env, scenario.script):
            logging.error("Environment failed to prepare, skipping test")
            self._current_result.record_stage_result("preparation", False)
            return

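        # Scenario variables are assignments of the form "name=value" taken
        # from the script header, e.g. "# variables = var_example_name=42"
        # (illustrative values); each one is applied to the datastream copy.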
        if scenario.script_params["variables"]:
            for assignment in scenario.script_params["variables"]:
                varname, value = assignment.split("=", 1)
                self._change_variable_value(varname, value)
        self._current_result.record_stage_result("preparation", True)
        logging.debug('Using test script {0} with context {1}'
                      .format(scenario.script, scenario.context))

        if scenario.script_params['profiles']:
            profiles = get_viable_profiles(
                scenario.script_params['profiles'], self.datastream, self.benchmark_id, scenario.script)
        else:
            # Special case for combined mode: an empty
            # scenario.script_params['profiles'] means the scenario is not
            # applicable to the given profile.
            logging.warning('Script {0} is not applicable on given profile'
                            .format(scenario.script))
            return

        test_data = dict(scenario=scenario,
                         rule_id=rule_id,
                         remediation_available=remediation_available)
        self.run_test_for_all_profiles(profiles, test_data)

        self.executed_tests += 1

    def finalize(self):
        super(RuleChecker, self).finalize()
        with open(os.path.join(LogHelper.LOG_DIR, "results.json"), "w") as f:
            json.dump(self.results, f)


def perform_rule_check(options):
    checker = RuleChecker(options.test_env)

    checker.datastream = options.datastream
    checker.benchmark_id = options.benchmark_id
    checker.remediate_using = options.remediate_using
    checker.dont_clean = options.dont_clean
    checker.manual_debug = options.manual_debug
    checker.benchmark_cpes = options.benchmark_cpes
    checker.scenarios_regex = options.scenarios_regex

    checker.scenarios_profile = options.scenarios_profile
    # If the scenarios profile is not already a complete profile ID,
    # prepend the profile prefix.
    if (checker.scenarios_profile is not None and
            not checker.scenarios_profile.startswith(OSCAP_PROFILE) and
            not oscap.is_virtual_oscap_profile(checker.scenarios_profile)):
        checker.scenarios_profile = OSCAP_PROFILE + options.scenarios_profile

    checker.test_target(options.target)