Test Failed
Pull Request — master (#7667)
by Marek
02:50
created

tests.ssg_test_suite.rule.RuleChecker._slice_sbr()   A

Complexity

Conditions 4

Size

Total Lines 19
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 0
Metric Value
cc 4
eloc 14
nop 4
dl 0
loc 19
ccs 0
cts 14
cp 0
crap 20
rs 9.7
c 0
b 0
f 0
1
from __future__ import print_function
2
3
import logging
4
import os
5
import shutil
6
import os.path
7
import re
8
import subprocess
9
import collections
10
import json
11
import fnmatch
12
import tempfile
13
import contextlib
14
import itertools
15
import math
16
17
from ssg.constants import OSCAP_PROFILE, OSCAP_PROFILE_ALL_ID, OSCAP_RULE
18
from ssg_test_suite import oscap
19
from ssg_test_suite import xml_operations
20
from ssg_test_suite import test_env
21
from ssg_test_suite import common
22
from ssg_test_suite.log import LogHelper
23
24
25
logging.getLogger(__name__).addHandler(logging.NullHandler())
26
27
28
Scenario = collections.namedtuple(
29
    "Scenario", ["script", "context", "script_params", "contents"])
30
31
32
def get_viable_profiles(selected_profiles, datastream, benchmark, script=None):
33
    """Read datastream, and return set intersection of profiles of given
34
    benchmark and those provided in `selected_profiles` parameter.
35
    """
36
37
    valid_profiles = []
38
    all_profiles_elements = xml_operations.get_all_profiles_in_benchmark(
39
        datastream, benchmark, logging)
40
    all_profiles = [el.attrib["id"] for el in all_profiles_elements]
41
    all_profiles.append(OSCAP_PROFILE_ALL_ID)
42
43
    for ds_profile in all_profiles:
44
        if 'ALL' in selected_profiles:
45
            valid_profiles += [ds_profile]
46
            continue
47
        for sel_profile in selected_profiles:
48
            if ds_profile.endswith(sel_profile):
49
                valid_profiles += [ds_profile]
50
51
    if not valid_profiles:
52
        if script:
53
            logging.warning('Script {0} - profile {1} not found in datastream'
54
                            .format(script, ", ".join(selected_profiles)))
55
        else:
56
            logging.warning('Profile {0} not found in datastream'
57
                            .format(", ".join(selected_profiles)))
58
    return valid_profiles
59
60
61
def generate_xslt_change_value_template(value_short_id, new_value):
62
    XSLT_TEMPLATE = """<xsl:stylesheet version="1.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:ds="http://scap.nist.gov/schema/scap/source/1.2" xmlns:xccdf-1.2="http://checklists.nist.gov/xccdf/1.2">
63
    <xsl:output omit-xml-declaration="yes" indent="yes"/>
64
        <xsl:strip-space elements="*"/>
65
        <xsl:template match="node()|@*">
66
            <xsl:copy>
67
                <xsl:apply-templates select="node()|@*"/>
68
            </xsl:copy>
69
        </xsl:template>
70
        <xsl:template match="ds:component/xccdf-1.2:Benchmark//xccdf-1.2:Value[@id='xccdf_org.ssgproject.content_value_{value_short_id}']/xccdf-1.2:value[not(@selector)]/text()">{new_value}</xsl:template>
71
</xsl:stylesheet>"""
72
    return XSLT_TEMPLATE.format(value_short_id=value_short_id, new_value=new_value)
73
74
75
def _apply_script(rule_dir, test_env, script):
76
    """Run particular test script on VM and log it's output."""
77
    logging.debug("Applying script {0}".format(script))
78
    rule_name = os.path.basename(rule_dir)
79
    log_file_name = os.path.join(
80
        LogHelper.LOG_DIR, rule_name + ".prescripts.log")
81
82
    with open(log_file_name, 'a') as log_file:
83
        log_file.write('##### {0} / {1} #####\n'.format(rule_name, script))
84
        shared_dir = os.path.join(common.REMOTE_TEST_SCENARIOS_DIRECTORY, "shared")
85
        command = "cd {0}; SHARED={1} bash -x {2}".format(rule_dir, shared_dir, script)
86
87
        try:
88
            test_env.execute_ssh_command(command, log_file)
89
        except subprocess.CalledProcessError as exc:
90
            logging.error("Rule testing script {script} failed with exit code {rc}"
91
                          .format(script=script, rc=exc.returncode))
92
            return False
93
    return True
94
95
96
def _get_script_context(script):
97
    """Return context of the script."""
98
    result = re.search(r'.*\.([^.]*)\.[^.]*$', script)
99
    if result is None:
100
        return None
101
    return result.group(1)
102
103
104
class RuleChecker(oscap.Checker):
105
    """
106
    Rule checks generally work like this -
107
    for every profile that supports that rule:
108
109
    - Alter the system.
110
    - Run the scan, check that the result meets expectations.
111
      If the test scenario passed as requested, return True,
112
      if it failed or passed unexpectedly, return False.
113
114
    The following sequence applies if the initial scan
115
    has failed as expected:
116
117
    - If there are no remediations, return True.
118
    - Run remediation, return False if it failed.
119
    - Return result of the final scan of remediated system.
120
    """
121
    def __init__(self, test_env):
122
        super(RuleChecker, self).__init__(test_env)
123
124
        self.results = list()
125
        self._current_result = None
126
        self.remote_dir = ""
127
128
    def _run_test(self, profile, test_data):
129
        scenario = test_data["scenario"]
130
        rule_id = test_data["rule_id"]
131
        remediation_available = test_data["remediation_available"]
132
133
        LogHelper.preload_log(
134
            logging.INFO, "Script {0} using profile {1} OK".format(scenario.script, profile),
135
            log_target='pass')
136
        LogHelper.preload_log(
137
            logging.WARNING, "Script {0} using profile {1} notapplicable".format(scenario.script, profile),
138
            log_target='notapplicable')
139
        LogHelper.preload_log(
140
            logging.ERROR,
141
            "Script {0} using profile {1} found issue:".format(scenario.script, profile),
142
            log_target='fail')
143
144
        runner_cls = oscap.REMEDIATION_RULE_RUNNERS[self.remediate_using]
145
        runner = runner_cls(
146
            self.test_env, oscap.process_profile_id(profile), self.datastream, self.benchmark_id,
147
            rule_id, scenario.script, self.dont_clean, self.no_reports, self.manual_debug)
148
        initial_scan_res = self._initial_scan_went_ok(runner, rule_id, scenario.context)
149
        if not initial_scan_res:
150
            return False
151
        if initial_scan_res == 2:
152
            # notapplicable
153
            return True
154
155
        supported_and_available_remediations = self._get_available_remediations(scenario)
156
        if (scenario.context not in ['fail', 'error']
157
                or not supported_and_available_remediations):
158
            return True
159
160
        if remediation_available:
161
            if not self._remediation_went_ok(runner, rule_id):
162
                return False
163
164
            return self._final_scan_went_ok(runner, rule_id)
165
        else:
166
            msg = ("No remediation is available for rule '{}'."
167
                   .format(rule_id))
168
            logging.warning(msg)
169
            return False
170
171
    def _initial_scan_went_ok(self, runner, rule_id, context):
172
        success = runner.run_stage_with_context("initial", context)
173
        self._current_result.record_stage_result("initial_scan", success)
174
        if not success:
175
            msg = ("The initial scan failed for rule '{}'."
176
                   .format(rule_id))
177
            logging.error(msg)
178
        return success
179
180
    def _is_remediation_available(self, rule):
181
        if xml_operations.find_fix_in_benchmark(
182
                self.datastream, self.benchmark_id, rule.id, self.remediate_using) is None:
183
            return False
184
        else:
185
            return True
186
187
188
    def _get_available_remediations(self, scenario):
189
        is_supported = set(['all'])
190
        is_supported.add(
191
            oscap.REMEDIATION_RUNNER_TO_REMEDIATION_MEANS[self.remediate_using])
192
        supported_and_available_remediations = set(
193
            scenario.script_params['remediation']).intersection(is_supported)
194
        return supported_and_available_remediations
195
196
    def _remediation_went_ok(self, runner, rule_id):
197
        success = runner.run_stage_with_context('remediation', 'fixed')
198
        self._current_result.record_stage_result("remediation", success)
199
        if not success:
200
            msg = ("The remediation failed for rule '{}'."
201
                   .format(rule_id))
202
            logging.error(msg)
203
204
        return success
205
206
    def _final_scan_went_ok(self, runner, rule_id):
207
        success = runner.run_stage_with_context('final', 'pass')
208
        self._current_result.record_stage_result("final_scan", success)
209
        if not success:
210
            msg = ("The check after remediation failed for rule '{}'."
211
                   .format(rule_id))
212
            logging.error(msg)
213
        return success
214
215
    def _rule_template_been_tested(self, rule, tested_templates):
216
        if rule.template is None:
217
            return False
218
        if self.test_env.duplicate_templates:
219
            return False
220
        if rule.template in tested_templates:
221
            return True
222
        tested_templates.add(rule.template)
223
        return False
224
225
    def _rule_should_be_tested(self, rule, rules_to_be_tested, tested_templates):
226
        if 'ALL' in rules_to_be_tested:
227
            # don't select rules that are not present in benchmark
228
            if not xml_operations.find_rule_in_benchmark(
229
                    self.datastream, self.benchmark_id, rule.id):
230
                return False
231
            return not self._rule_template_been_tested(rule, tested_templates)
232
        else:
233
            for rule_to_be_tested in rules_to_be_tested:
234
                # we check for a substring
235
                if rule_to_be_tested.startswith(OSCAP_RULE):
236
                    pattern = rule_to_be_tested
237
                else:
238
                    pattern = OSCAP_RULE + rule_to_be_tested
239
                if fnmatch.fnmatch(rule.id, pattern):
240
                    return not self._rule_template_been_tested(rule, tested_templates)
241
            return False
242
243
    def _ensure_package_present_for_all_scenarios(self, scenarios_by_rule):
244
        packages_required = set()
245
        for rule, scenarios in scenarios_by_rule.items():
246
            for s in scenarios:
247
                scenario_packages = s.script_params["packages"]
248
                packages_required.update(scenario_packages)
249
        if packages_required:
250
            common.install_packages(self.test_env, packages_required)
251
252
    def _prepare_environment(self, scenarios_by_rule):
253
        domain_ip = self.test_env.domain_ip
254
        try:
255
            self.remote_dir = common.send_scripts(self.test_env)
256
        except RuntimeError as exc:
257
            msg = "Unable to upload test scripts: {more_info}".format(more_info=str(exc))
258
            raise RuntimeError(msg)
259
260
        self._ensure_package_present_for_all_scenarios(scenarios_by_rule)
261
262
    def _get_rules_to_test(self, target):
263
        rules_to_test = []
264
        tested_templates = set()
265
        for rule in common.iterate_over_rules(self.test_env.product):
266
            if not self._rule_should_be_tested(rule, target, tested_templates):
267
                continue
268
            if not xml_operations.find_rule_in_benchmark(
269
                    self.datastream, self.benchmark_id, rule.id):
270
                logging.error(
271
                    "Rule '{0}' isn't present in benchmark '{1}' in '{2}'"
272
                    .format(rule.id, self.benchmark_id, self.datastream))
273
                continue
274
            rules_to_test.append(rule)
275
276
        return rules_to_test
277
278
    def test_rule(self, state, rule, scenarios):
279
        remediation_available = self._is_remediation_available(rule)
280
        self._check_rule(
281
            rule, scenarios,
282
            self.remote_dir, state, remediation_available)
283
284
    def _slice_sbr(self, scenarios_by_rule_id, slice_current, slice_total):
285
        """  Returns only a subset of test scenarios, representing slice_current-th
286
        slice out of slice_total"""
287
288
        tuple_repr = []
289
        for rule_id in scenarios_by_rule_id:
290
            tuple_repr += itertools.product([rule_id], scenarios_by_rule_id[rule_id])
291
292
        total_scenarios = len(tuple_repr)
293
        slice_low_bound = math.ceil(total_scenarios / slice_total * (slice_current - 1))
294
        slice_high_bound = math.ceil(total_scenarios / slice_total * slice_current)
295
296
        new_sbr = {}
297
        for rule_id, scenario in tuple_repr[slice_low_bound:slice_high_bound]:
298
            try:
299
                new_sbr[rule_id].append(scenario)
300
            except KeyError:
301
                new_sbr[rule_id] = [scenario]
302
        return new_sbr
303
304
    def _test_target(self, target):
305
        rules_to_test = self._get_rules_to_test(target)
306
        if not rules_to_test:
307
            logging.error("No tests found matching the rule ID(s) '{0}'".format(", ".join(target)))
308
            return
309
310
        scenarios_by_rule_id = dict()
311
        for rule in rules_to_test:
312
            rule_scenarios = self._get_scenarios(
313
                rule.directory, rule.files, self.scenarios_regex,
314
                self.benchmark_cpes)
315
            scenarios_by_rule_id[rule.id] = rule_scenarios
316
        sliced_scenarios_by_rule_id = self._slice_sbr(scenarios_by_rule_id,
317
                                                      self.slice_current,
318
                                                      self.slice_total)
319
320
        self._prepare_environment(sliced_scenarios_by_rule_id)
321
322
        with test_env.SavedState.create_from_environment(self.test_env, "tests_uploaded") as state:
323
            for rule in rules_to_test:
324
                try:
325
                    self.test_rule(state, rule, sliced_scenarios_by_rule_id[rule.id])
326
                except KeyError:
327
                    # rule is not processed in given slice
328
                    pass
329
330
    def _modify_parameters(self, script, params):
331
        if self.scenarios_profile:
332
            params['profiles'] = [self.scenarios_profile]
333
334
        if not params["profiles"]:
335
            params["profiles"].append(OSCAP_PROFILE_ALL_ID)
336
            logging.debug(
337
                "Added the {0} profile to the list of available profiles for {1}"
338
                .format(OSCAP_PROFILE_ALL_ID, script))
339
        return params
340
341
    def _parse_parameters(self, script_content):
342
        """Parse parameters from script header"""
343
        params = {'profiles': [],
344
                  'templates': [],
345
                  'packages': [],
346
                  'platform': ['multi_platform_all'],
347
                  'remediation': ['all'],
348
                  'variables': [],
349
                  }
350
351
        for parameter in params:
352
            found = re.search(r'^# {0} = (.*)$'.format(parameter),
353
                              script_content, re.MULTILINE)
354
            if found is None:
355
                continue
356
            splitted = found.group(1).split(',')
357
            params[parameter] = [value.strip() for value in splitted]
358
359
        return params
360
361
    def _get_scenarios(self, rule_dir, scripts, scenarios_regex, benchmark_cpes):
362
        """ Returns only valid scenario files, rest is ignored (is not meant
363
        to be executed directly.
364
        """
365
366
        if scenarios_regex is not None:
367
            scenarios_pattern = re.compile(scenarios_regex)
368
369
        scenarios = []
370
        for script in scripts:
371
            script_contents = scripts[script]
372
            if scenarios_regex is not None:
373
                if scenarios_pattern.match(script) is None:
0 ignored issues
show
introduced by
The variable scenarios_pattern does not seem to be defined in case scenarios_regex is not None on line 366 is False. Are you sure this can never be the case?
Loading history...
374
                    logging.debug("Skipping script %s - it did not match "
375
                                  "--scenarios regex" % script)
376
                    continue
377
            script_context = _get_script_context(script)
378
            if script_context is not None:
379
                script_params = self._parse_parameters(script_contents)
380
                script_params = self._modify_parameters(script, script_params)
381
                if common.matches_platform(script_params["platform"], benchmark_cpes):
382
                    scenarios += [Scenario(script, script_context, script_params, script_contents)]
383
                else:
384
                    logging.warning("Script %s is not applicable on given platform" % script)
385
386
        return scenarios
387
388
    def _check_rule(self, rule, scenarios, remote_dir, state, remediation_available):
389
        remote_rule_dir = os.path.join(remote_dir, rule.short_id)
390
        logging.info(rule.id)
391
392
        logging.debug("Testing rule directory {0}".format(rule.directory))
393
394
        args_list = [
395
            (s, remote_rule_dir, rule.id, remediation_available) for s in scenarios
396
        ]
397
        state.map_on_top(self._check_and_record_rule_scenario, args_list)
398
399
    def _check_and_record_rule_scenario(self, scenario, remote_rule_dir, rule_id, remediation_available):
400
        self._current_result = common.RuleResult()
401
402
        self._current_result.conditions = common.Scenario_conditions(
403
            self.test_env.name, self.test_env.scanning_mode,
404
            self.remediate_using, self.datastream)
405
        self._current_result.scenario = common.Scenario_run(rule_id, scenario.script)
406
        self._current_result.when = self.test_timestamp_str
407
408
        with self.copy_of_datastream():
409
            self._check_rule_scenario(scenario, remote_rule_dir, rule_id, remediation_available)
410
        self.results.append(self._current_result.save_to_dict())
411
412
    @contextlib.contextmanager
413
    def copy_of_datastream(self, new_filename=None):
414
        old_filename = self.datastream
415
        if not new_filename:
416
            _, new_filename = tempfile.mkstemp(prefix="ssgts_ds_modified", dir="/tmp")
417
        shutil.copy(old_filename, new_filename)
418
        self.datastream = new_filename
419
        yield new_filename
420
        self.datastream = old_filename
421
        os.unlink(new_filename)
422
423
    def _change_variable_value(self, varname, value):
424
        _, xslt_filename = tempfile.mkstemp(prefix="xslt-change-value", dir="/tmp")
425
        template = generate_xslt_change_value_template(varname, value)
426
        with open(xslt_filename, "w") as fp:
427
            fp.write(template)
428
        _, temp_datastream = tempfile.mkstemp(prefix="ds-temp", dir="/tmp")
429
        log_file_name = os.path.join(LogHelper.LOG_DIR, "env-preparation.log")
430
        with open(log_file_name, "a") as log_file:
431
            common.run_with_stdout_logging(
432
                    "xsltproc", ("--output", temp_datastream, xslt_filename, self.datastream),
433
                    log_file)
434
        os.rename(temp_datastream, self.datastream)
435
        os.unlink(xslt_filename)
436
437
    def _check_rule_scenario(self, scenario, remote_rule_dir, rule_id, remediation_available):
438
        if not _apply_script(
439
                remote_rule_dir, self.test_env, scenario.script):
440
            logging.error("Environment failed to prepare, skipping test")
441
            self._current_result.record_stage_result("preparation", False)
442
            return
443
444
        if scenario.script_params["variables"]:
445
            for assignment in scenario.script_params["variables"]:
446
                varname, value = assignment.split("=", 1)
447
                self._change_variable_value(varname, value)
448
        self._current_result.record_stage_result("preparation", True)
449
        logging.debug('Using test script {0} with context {1}'
450
                      .format(scenario.script, scenario.context))
451
452
        if scenario.script_params['profiles']:
453
            profiles = get_viable_profiles(
454
                scenario.script_params['profiles'], self.datastream, self.benchmark_id, scenario.script)
455
        else:
456
            # Special case for combined mode when scenario.script_params['profiles']
457
            # is empty which means scenario is not applicable on given profile.
458
            logging.warning('Script {0} is not applicable on given profile'
459
                            .format(scenario.script))
460
            return
461
462
        test_data = dict(scenario=scenario,
463
                         rule_id=rule_id,
464
                         remediation_available=remediation_available)
465
        self.run_test_for_all_profiles(profiles, test_data)
466
467
        self.executed_tests += 1
468
469
    def finalize(self):
470
        super(RuleChecker, self).finalize()
471
        with open(os.path.join(LogHelper.LOG_DIR, "results.json"), "w") as f:
472
            json.dump(self.results, f)
473
474
475
def perform_rule_check(options):
476
    checker = RuleChecker(options.test_env)
477
478
    checker.datastream = options.datastream
479
    checker.benchmark_id = options.benchmark_id
480
    checker.remediate_using = options.remediate_using
481
    checker.dont_clean = options.dont_clean
482
    checker.no_reports = options.no_reports
483
    checker.manual_debug = options.manual_debug
484
    checker.benchmark_cpes = options.benchmark_cpes
485
    checker.scenarios_regex = options.scenarios_regex
486
    checker.slice_current = options.slice_current
487
    checker.slice_total = options.slice_total
488
489
    checker.scenarios_profile = options.scenarios_profile
490
    # check if target is a complete profile ID, if not prepend profile prefix
491
    if (checker.scenarios_profile is not None and
492
            not checker.scenarios_profile.startswith(OSCAP_PROFILE) and
493
            not oscap.is_virtual_oscap_profile(checker.scenarios_profile)):
494
        checker.scenarios_profile = OSCAP_PROFILE+options.scenarios_profile
495
496
    checker.test_target(options.target)
497