tests.ssg_test_suite.rule._apply_script()   (grade: A)

Complexity
    Conditions: 3

Size
    Total Lines: 23
    Code Lines: 19

Duplication
    Lines: 0
    Ratio: 0 %

Importance
    Changes: 0

Metric   Value
cc       3      (cyclomatic complexity)
eloc     19     (effective lines of code)
nop      3      (number of parameters)
dl       0      (duplicated lines)
loc      23     (lines of code)
rs       9.45
c        0
b        0
f        0
from __future__ import print_function

import collections
import contextlib
import fnmatch
import itertools
import json
import logging
import math
import os
import os.path
import re
import shutil
import subprocess
import tempfile

from ssg.constants import OSCAP_PROFILE, OSCAP_PROFILE_ALL_ID, OSCAP_RULE
from ssg_test_suite import oscap
from ssg_test_suite import xml_operations
from ssg_test_suite import test_env
from ssg_test_suite import common
from ssg_test_suite.log import LogHelper

import ssg.templates

Rule = collections.namedtuple(
    "Rule",
    ["directory", "id", "short_id", "template", "local_env_yaml", "rule"])

RuleTestContent = collections.namedtuple(
    "RuleTestContent", ["scenarios", "other_content"])

logging.getLogger(__name__).addHandler(logging.NullHandler())
def get_viable_profiles(selected_profiles, datastream, benchmark, script=None):
    """Read the datastream and return the intersection of the benchmark's
    profiles with those listed in the `selected_profiles` parameter.
    """

    valid_profiles = []
    all_profiles_elements = xml_operations.get_all_profiles_in_benchmark(
        datastream, benchmark, logging)
    all_profiles = [el.attrib["id"] for el in all_profiles_elements]
    all_profiles.append(OSCAP_PROFILE_ALL_ID)

    for ds_profile in all_profiles:
        if 'ALL' in selected_profiles:
            valid_profiles += [ds_profile]
            continue
        for sel_profile in selected_profiles:
            if ds_profile.endswith(sel_profile):
                valid_profiles += [ds_profile]

    if not valid_profiles:
        if script:
            logging.warning('Script {0} - profile {1} not found in datastream'
                            .format(script, ", ".join(selected_profiles)))
        else:
            logging.warning('Profile {0} not found in datastream'
                            .format(", ".join(selected_profiles)))
    return valid_profiles
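# A minimal illustration, assuming a benchmark that defines a profile with
# the full ID "xccdf_org.ssgproject.content_profile_ospp" (the name is only
# an example). Matching is done with str.endswith, so a short name suffices:
#
#     get_viable_profiles(["ospp"], datastream, benchmark)
#     # -> ["xccdf_org.ssgproject.content_profile_ospp"]
#
# Passing ['ALL'] short-circuits the matching and keeps every profile in the
# benchmark, including the virtual "all" profile appended above.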
def generate_xslt_change_value_template(value_short_id, new_value):
    XSLT_TEMPLATE = """<xsl:stylesheet version="1.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:ds="http://scap.nist.gov/schema/scap/source/1.2" xmlns:xccdf-1.2="http://checklists.nist.gov/xccdf/1.2">
    <xsl:output omit-xml-declaration="yes" indent="yes"/>
        <xsl:strip-space elements="*"/>
        <xsl:template match="node()|@*">
            <xsl:copy>
                <xsl:apply-templates select="node()|@*"/>
            </xsl:copy>
        </xsl:template>
        <xsl:template match="ds:component/xccdf-1.2:Benchmark//xccdf-1.2:Value[@id='xccdf_org.ssgproject.content_value_{value_short_id}']/xccdf-1.2:value[not(@selector)]/text()">{new_value}</xsl:template>
</xsl:stylesheet>"""
    return XSLT_TEMPLATE.format(value_short_id=value_short_id, new_value=new_value)
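# A sketch of how the generated stylesheet is used (the real invocation lives
# in _change_variable_value below): the first template is an identity
# transform that copies the whole datastream, while the second rewrites only
# the text of the unselected <xccdf-1.2:value> of the chosen Value element.
# Assuming an illustrative variable name "var_accounts_tmout":
#
#     xslt = generate_xslt_change_value_template("var_accounts_tmout", "600")
#     # writing `xslt` to changes.xslt and running
#     #     xsltproc --output new_ds.xml changes.xslt old_ds.xml
#     # yields a datastream where var_accounts_tmout defaults to 600.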
def _apply_script(rule_dir, test_env, script):
    """Run a particular test script on the VM and log its output."""
    logging.debug("Applying script {0}".format(script))
    rule_name = os.path.basename(rule_dir)
    log_file_name = os.path.join(
        LogHelper.LOG_DIR, rule_name + ".prescripts.log")

    with open(log_file_name, 'a') as log_file:
        log_file.write('##### {0} / {1} #####\n'.format(rule_name, script))
        shared_dir = os.path.join(common.REMOTE_TEST_SCENARIOS_DIRECTORY, "shared")
        command = "cd {0}; SHARED={1} bash -x {2}".format(rule_dir, shared_dir, script)

        try:
            error_msg_template = (
                "Rule '{rule_name}' test setup script '{script}' "
                "failed with exit code {{rc}}".format(rule_name=rule_name, script=script)
            )
            test_env.execute_ssh_command(
                command, log_file, error_msg_template=error_msg_template)
        except RuntimeError as exc:
            logging.error(str(exc))
            return False
    return True
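# For a hypothetical remote rule directory "/root/ssgts/accounts_tmout" and
# scenario "installed.pass.sh", the command executed over SSH would be:
#
#     cd /root/ssgts/accounts_tmout; SHARED=/root/ssgts/shared bash -x installed.pass.sh
#
# (The exact remote paths depend on common.REMOTE_TEST_SCENARIOS_DIRECTORY;
# the ones above are illustrative.) `bash -x` traces every command the
# scenario runs into the per-rule *.prescripts.log file.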
class RuleChecker(oscap.Checker):
    """
    Rule checks generally work like this:
    for every profile that supports the rule,

    - Alter the system.
    - Run the scan and check that the result meets expectations.
      If the test scenario passed as requested, return True;
      if it failed or passed unexpectedly, return False.

    The following sequence applies if the initial scan
    has failed as expected:

    - If there are no remediations, return True.
    - Run remediation; return False if it failed.
    - Return the result of the final scan of the remediated system.
    """
    def __init__(self, test_env):
        super(RuleChecker, self).__init__(test_env)

        self.results = list()
        self._current_result = None
        self.remote_dir = ""
        self.target_type = "rule ID"
        self.used_templated_test_scenarios = collections.defaultdict(set)
        self.rule_spec = None
        self.template_spec = None
        self.scenarios_profile = None
    def _run_test(self, profile, test_data):
        scenario = test_data["scenario"]
        rule_id = test_data["rule_id"]
        remediation_available = test_data["remediation_available"]

        LogHelper.preload_log(
            logging.INFO,
            "Script {0} using profile {1} OK".format(scenario.script, profile),
            log_target='pass')
        LogHelper.preload_log(
            logging.WARNING,
            "Script {0} using profile {1} notapplicable".format(scenario.script, profile),
            log_target='notapplicable')
        LogHelper.preload_log(
            logging.ERROR,
            "Script {0} using profile {1} found issue:".format(scenario.script, profile),
            log_target='fail')

        runner_cls = oscap.REMEDIATION_RULE_RUNNERS[self.remediate_using]
        runner_instance = runner_cls(
            self.test_env, oscap.process_profile_id(profile), self.datastream, self.benchmark_id,
            rule_id, scenario.script, self.dont_clean, self.no_reports, self.manual_debug)

        with runner_instance as runner:
            initial_scan_res = self._initial_scan_went_ok(runner, rule_id, scenario.context)
            if not initial_scan_res:
                return False
            if initial_scan_res == 2:
                # notapplicable
                return True

            supported_and_available_remediations = self._get_available_remediations(scenario)
            if (scenario.context not in ['fail', 'error']
                    or not supported_and_available_remediations):
                return True

            if remediation_available:
                if not self._remediation_went_ok(runner, rule_id):
                    return False

                return self._final_scan_went_ok(runner, rule_id)
            else:
                msg = ("No remediation is available for rule '{}'."
                       .format(rule_id))
                logging.warning(msg)
                return False
    def _initial_scan_went_ok(self, runner, rule_id, context):
        success = runner.run_stage_with_context("initial", context)
        self._current_result.record_stage_result("initial_scan", success)
        if not success:
            msg = ("The initial scan failed for rule '{}'."
                   .format(rule_id))
            logging.error(msg)
        return success

    def _is_remediation_available(self, rule):
        if xml_operations.find_fix_in_benchmark(
                self.datastream, self.benchmark_id, rule.id, self.remediate_using) is None:
            return False
        else:
            return True

    def _get_available_remediations(self, scenario):
        is_supported = set(['all'])
        is_supported.add(
            oscap.REMEDIATION_RUNNER_TO_REMEDIATION_MEANS[self.remediate_using])
        supported_and_available_remediations = set(
            scenario.script_params['remediation']).intersection(is_supported)
        return supported_and_available_remediations

    def _remediation_went_ok(self, runner, rule_id):
        success = runner.run_stage_with_context('remediation', 'fixed')
        self._current_result.record_stage_result("remediation", success)
        if not success:
            msg = ("The remediation failed for rule '{}'."
                   .format(rule_id))
            logging.error(msg)

        return success

    def _final_scan_went_ok(self, runner, rule_id):
        success = runner.run_stage_with_context('final', 'pass')
        self._current_result.record_stage_result("final_scan", success)
        if not success:
            msg = ("The check after remediation failed for rule '{}'."
                   .format(rule_id))
            logging.error(msg)
        return success
    def _rule_matches_rule_spec(self, rule_short_id):
        rule_id = OSCAP_RULE + rule_short_id
        if 'ALL' in self.rule_spec:
            return True
        else:
            for rule_to_be_tested in self.rule_spec:
                # we check for a substring
                if rule_to_be_tested.startswith(OSCAP_RULE):
                    pattern = rule_to_be_tested
                else:
                    pattern = OSCAP_RULE + rule_to_be_tested
                if fnmatch.fnmatch(rule_id, pattern):
                    return True
            return False
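    # Matching uses shell-style wildcards via fnmatch, so a spec entry may be
    # a full or short rule ID, optionally with globs. For instance (the rule
    # name is illustrative):
    #
    #     fnmatch.fnmatch(
    #         "xccdf_org.ssgproject.content_rule_accounts_tmout",
    #         "xccdf_org.ssgproject.content_rule_accounts_*")
    #     # -> True
    #
    # so passing "accounts_*" as a rule spec selects every rule whose short
    # ID starts with "accounts_".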
    def _rule_matches_template_spec(self, template):
        return True

    def _replace_platform_specific_packages(self, packages):
        """Return the provided package list with names updated according to
        the platform alternatives listed in the platform_package_overrides
        product field."""
        product_yaml = common.get_product_context(self.test_env.product)
        platform_package_overrides = product_yaml["platform_package_overrides"]
        packages_with_alternatives = set()
        for package in packages:
            if package in platform_package_overrides and platform_package_overrides[package]:
                packages_with_alternatives.add(platform_package_overrides[package])
            else:
                packages_with_alternatives.add(package)
        return packages_with_alternatives
    def _ensure_package_present_for_all_scenarios(
            self, test_content_by_rule_id):
        packages_required = set()

        for rule_test_content in test_content_by_rule_id.values():
            for s in rule_test_content.scenarios:
                scenario_packages = s.script_params["packages"]
                packages_required.update(scenario_packages)
        if packages_required:
            packages_to_install = self._replace_platform_specific_packages(packages_required)
            common.install_packages(self.test_env, packages_to_install)

    def _prepare_environment(self, test_content_by_rule_id):
        try:
            self.remote_dir = common.send_scripts(
                self.test_env, test_content_by_rule_id)
        except RuntimeError as exc:
            msg = "Unable to upload test scripts: {more_info}".format(more_info=str(exc))
            raise RuntimeError(msg)

        self._ensure_package_present_for_all_scenarios(test_content_by_rule_id)
    def _get_rules_to_test(self):
        """
        Returns:
            List of named tuples Rule having these fields:
                directory -- absolute path to the rule "tests" subdirectory
                             containing the test scenarios in Bash
                id -- full rule ID as it is present in the datastream
                short_id -- short rule ID, the same as the basename of the
                            directory containing the test scenarios in Bash
                template -- name of the template the rule uses
                local_env_yaml -- env_yaml specific to the rule's own context
                rule -- rule class, contains information parsed from rule.yml
        """

        # Here we need to perform some magic to handle parsing the rule (from a
        # product perspective) and loading any templated tests. In particular,
        # identifying which tests to potentially run involves invoking the
        # templating engine.
        #
        # Begin by loading context about our execution environment, if any.
        product = self.test_env.product
        product_yaml = common.get_product_context(product)
        all_rules_in_benchmark = xml_operations.get_all_rules_in_benchmark(
            self.datastream, self.benchmark_id)
        rules = []

        for dirpath, dirnames, filenames in common.walk_through_benchmark_dirs(
                product):
            if not common.is_rule_dir(dirpath):
                continue
            short_rule_id = os.path.basename(dirpath)
            full_rule_id = OSCAP_RULE + short_rule_id
            if not self._rule_matches_rule_spec(short_rule_id):
                continue
            if full_rule_id not in all_rules_in_benchmark:
                # This is a problem only if the user specified the rules to be
                # tested explicitly using command line arguments.
                if self.target_type == "rule ID":
                    logging.warning(
                        "Rule '{0}' isn't present in benchmark '{1}' in '{2}'"
                        .format(
                            full_rule_id, self.benchmark_id, self.datastream))
                continue

            # Load the rule itself to check for a template.
            rule, local_env_yaml = common.load_rule_and_env(
                dirpath, product_yaml, product)

            # Before we get too far, we wish to search the rule YAML to see if
            # it is applicable to the current product. If we have a product
            # and the rule isn't applicable for the product, there's no point
            # in continuing with the rest of the loading. This should speed up
            # the loading of the templated tests. Note that we've already
            # parsed the prodtype into local_env_yaml.
            if product and local_env_yaml['products']:
                prodtypes = local_env_yaml['products']
                if "all" not in prodtypes and product not in prodtypes:
                    continue

            tests_dir = os.path.join(dirpath, "tests")
            template_name = None
            if rule.template and rule.template['vars']:
                template_name = rule.template['name']
            if not self._rule_matches_template_spec(template_name):
                continue
            result = Rule(
                directory=tests_dir, id=full_rule_id,
                short_id=short_rule_id, template=template_name,
                local_env_yaml=local_env_yaml, rule=rule)
            rules.append(result)
        return rules
    def test_rule(self, state, rule, scenarios):
        remediation_available = self._is_remediation_available(rule)
        self._check_rule(
            rule, scenarios,
            self.remote_dir, state, remediation_available)
    def _slice_sbr(self, test_content_by_rule_id, slice_current, slice_total):
        """Return only a subset of test scenarios, representing the
        slice_current-th slice out of slice_total."""

        tuple_repr = []
        for rule_id in test_content_by_rule_id:
            tuple_repr += itertools.product(
                [rule_id], test_content_by_rule_id[rule_id].scenarios)

        total_scenarios = len(tuple_repr)
        slice_low_bound = math.ceil(total_scenarios / slice_total * (slice_current - 1))
        slice_high_bound = math.ceil(total_scenarios / slice_total * slice_current)

        new_sbr = {}
        for rule_id, scenario in tuple_repr[slice_low_bound:slice_high_bound]:
            try:
                new_sbr[rule_id].scenarios.append(scenario)
            except KeyError:
                scenarios = [scenario]
                other_content = test_content_by_rule_id[rule_id].other_content
                new_sbr[rule_id] = RuleTestContent(scenarios, other_content)
        return new_sbr
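    # The slice bounds partition the flattened (rule_id, scenario) list as
    # evenly as math.ceil allows. For example, with 10 scenarios split into
    # 3 slices:
    #
    #     slice 1: ceil(10/3 * 0) .. ceil(10/3 * 1)  ->  tuple_repr[0:4]   (4 scenarios)
    #     slice 2: ceil(10/3 * 1) .. ceil(10/3 * 2)  ->  tuple_repr[4:7]   (3 scenarios)
    #     slice 3: ceil(10/3 * 2) .. ceil(10/3 * 3)  ->  tuple_repr[7:10]  (3 scenarios)
    #
    # Every scenario lands in exactly one slice, so parallel CI jobs can each
    # take one slice without overlap.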
    def _find_tests_paths(self, rule, product_yaml):
        # Start by checking for templated tests.
        templated_tests_paths = common.fetch_templated_tests_paths(
            rule, product_yaml)

        # Add additional tests from the local rule directory. Note that,
        # like the behavior in template_tests, this will overwrite any
        # templated tests with the same file name.
        local_tests_paths = common.fetch_local_tests_paths(rule.directory)

        for filename in local_tests_paths:
            templated_tests_paths.pop(filename, None)
        if self.target_type != "template" and not self.test_env.duplicate_templates:
            for filename in self.used_templated_test_scenarios[rule.template]:
                templated_tests_paths.pop(filename, None)
            self.used_templated_test_scenarios[rule.template] |= set(
                templated_tests_paths.keys())
        return templated_tests_paths.values(), local_tests_paths.values()
    def _load_all_tests(self, rule):
        product_yaml = common.get_product_context(self.test_env.product)

        templated_tests_paths, local_tests_paths = self._find_tests_paths(
            rule, product_yaml)

        # all_tests is a mapping from path (in the tarball) to the contents
        # of the test case. This is necessary because later code (which
        # attempts to parse headers from the test case) doesn't have easy
        # access to templated content. By reading it and returning it
        # here, we can save later code from having to understand the
        # templating system.
        all_tests = dict()
        templated_tests = common.load_templated_tests(
            templated_tests_paths, rule.rule.template,
            rule.local_env_yaml)
        local_tests = common.load_local_tests(
            local_tests_paths, rule.local_env_yaml)
        all_tests.update(templated_tests)
        all_tests.update(local_tests)
        return all_tests
    def _get_rule_test_content(self, rule):
        all_tests = self._load_all_tests(rule)
        scenarios = []
        other_content = dict()
        for file_name, file_content in all_tests.items():
            scenario_matches_regex = r'.*\.[^.]*\.sh$'
            if re.search(scenario_matches_regex, file_name):
                scenario = Scenario(file_name, file_content)
                scenario.override_profile(self.scenarios_profile)
                if scenario.matches_regex_and_platform(
                        self.scenarios_regex, self.benchmark_cpes):
                    scenarios.append(scenario)
            else:
                other_content[file_name] = file_content
        return RuleTestContent(scenarios, other_content)
    def _get_test_content_by_rule_id(self, rules_to_test):
        test_content_by_rule_id = dict()
        for rule in rules_to_test:
            rule_test_content = self._get_rule_test_content(rule)
            test_content_by_rule_id[rule.id] = rule_test_content
        sliced_test_content_by_rule_id = self._slice_sbr(
            test_content_by_rule_id, self.slice_current, self.slice_total)
        return sliced_test_content_by_rule_id
    def _test_target(self):
        rules_to_test = self._get_rules_to_test()
        source = self.rule_spec
        if not self.rule_spec:
            source = self.template_spec
        if not rules_to_test:
            logging.error("No tests found matching the {0}(s) '{1}'".format(
                self.target_type,
                ", ".join(source)))
            return

        test_content_by_rule_id = self._get_test_content_by_rule_id(
            rules_to_test)

        self._prepare_environment(test_content_by_rule_id)

        with test_env.SavedState.create_from_environment(
                self.test_env, "tests_uploaded") as state:
            for rule in rules_to_test:
                try:
                    scenarios = test_content_by_rule_id[rule.id].scenarios
                    self.test_rule(state, rule, scenarios)
                except KeyError:
                    # The rule is not processed in the given slice.
                    pass
    def _check_rule(self, rule, scenarios, remote_dir, state, remediation_available):
        remote_rule_dir = os.path.join(remote_dir, rule.short_id)
        logging.info(rule.id)

        logging.debug("Testing rule directory {0}".format(rule.directory))

        args_list = [
            (s, remote_rule_dir, rule.id, remediation_available) for s in scenarios
        ]
        state.map_on_top(self._check_and_record_rule_scenario, args_list)
    def _check_and_record_rule_scenario(self, scenario, remote_rule_dir, rule_id, remediation_available):
        self._current_result = common.RuleResult()

        self._current_result.conditions = common.Scenario_conditions(
            self.test_env.name, self.test_env.scanning_mode,
            self.remediate_using, self.datastream)
        self._current_result.scenario = common.Scenario_run(rule_id, scenario.script)
        self._current_result.when = self.test_timestamp_str

        with self.copy_of_datastream():
            self._check_rule_scenario(scenario, remote_rule_dir, rule_id, remediation_available)
        self.results.append(self._current_result.save_to_dict())
    @contextlib.contextmanager
    def copy_of_datastream(self, new_filename=None):
        prefixed_name = common.get_prefixed_name("ds_modified")
        old_filename = self.datastream
        if not new_filename:
            descriptor, new_filename = tempfile.mkstemp(prefix=prefixed_name, dir="/tmp")
            # Close the descriptor inside this branch: it is only defined
            # here, so closing it unconditionally would raise a NameError
            # whenever a new_filename is passed in by the caller.
            os.close(descriptor)
        shutil.copy(old_filename, new_filename)
        self.datastream = new_filename
        yield new_filename
        self.datastream = old_filename
        os.unlink(new_filename)
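    # Usage sketch: the context manager swaps self.datastream for a throwaway
    # copy, so per-scenario XSLT edits (see _change_variable_value below)
    # never touch the original file:
    #
    #     with self.copy_of_datastream() as ds_copy:
    #         ...                      # scans run against ds_copy
    #     # self.datastream is restored and ds_copy is deleted here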
    def _change_variable_value(self, varname, value):
        descriptor, xslt_filename = tempfile.mkstemp(prefix="xslt-change-value", dir="/tmp")
        os.close(descriptor)
        template = generate_xslt_change_value_template(varname, value)
        with open(xslt_filename, "w") as fp:
            fp.write(template)
        descriptor, temp_datastream = tempfile.mkstemp(prefix="ds-temp", dir="/tmp")
        os.close(descriptor)
        log_file_name = os.path.join(LogHelper.LOG_DIR, "env-preparation.log")
        with open(log_file_name, "a") as log_file:
            result = common.run_with_stdout_logging(
                    "xsltproc", ("--output", temp_datastream, xslt_filename, self.datastream),
                    log_file)
            if result.returncode:
                msg = (
                    "Error changing value of '{varname}': {stderr}"
                    .format(varname=varname, stderr=result.stderr)
                )
                raise RuntimeError(msg)
        os.rename(temp_datastream, self.datastream)
        os.unlink(xslt_filename)
    def _verify_rule_presence(self, rule_id, script, profiles):
        for profile_id in profiles:
            if profile_id == OSCAP_PROFILE_ALL_ID:
                continue
            rules_in_profile = xml_operations.get_all_rule_ids_in_profile(
                self.datastream, self.benchmark_id, profile_id, logging)
            short_rule_id = rule_id.replace(OSCAP_RULE, "")
            if short_rule_id not in rules_in_profile:
                logging.warning(
                    "Rule {0} isn't part of profile {1} requested by "
                    "script {2}.".format(rule_id, profile_id, script)
                )
                return False
        return True
    def _check_rule_scenario(self, scenario, remote_rule_dir, rule_id, remediation_available):
        if not _apply_script(
                remote_rule_dir, self.test_env, scenario.script):
            logging.error("Environment failed to prepare, skipping test")
            self._current_result.record_stage_result("preparation", False)
            return

        if scenario.script_params["variables"]:
            for assignment in scenario.script_params["variables"]:
                varname, value = assignment.split("=", 1)
                self._change_variable_value(varname, value)
        self._current_result.record_stage_result("preparation", True)
        logging.debug('Using test script {0} with context {1}'
                      .format(scenario.script, scenario.context))

        profiles = get_viable_profiles(
            scenario.script_params['profiles'],
            self.datastream, self.benchmark_id, scenario.script)
        logging.debug("viable profiles are {0}".format(profiles))
        if not self._verify_rule_presence(rule_id, scenario.script, profiles):
            return
        test_data = dict(scenario=scenario,
                         rule_id=rule_id,
                         remediation_available=remediation_available)
        self.run_test_for_all_profiles(profiles, test_data)

        self.executed_tests += 1
    def finalize(self):
        super(RuleChecker, self).finalize()
        with open(os.path.join(LogHelper.LOG_DIR, "results.json"), "w") as f:
            json.dump(self.results, f)
class Scenario:
    def __init__(self, script, script_contents):
        self.script = script
        self.context = self._get_script_context()
        self.contents = script_contents
        self.script_params = self._parse_parameters()

    def _get_script_context(self):
        """Return the context of the script."""
        result = re.search(r'.*\.([^.]*)\.[^.]*$', self.script)
        if result is None:
            return None
        return result.group(1)
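    # The context is the second-to-last dot-separated component of the
    # scenario file name, e.g.:
    #
    #     Scenario("installed.pass.sh", "")._get_script_context()    # -> "pass"
    #     Scenario("wrong_value.fail.sh", "")._get_script_context()  # -> "fail"
    #
    # A name without at least two dots (e.g. "setup.sh") yields None; such
    # files are treated as non-scenario content in _get_rule_test_content.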
    def _parse_parameters(self):
        """Parse parameters from the script header."""
        params = {
            'profiles': [],
            'templates': [],
            'packages': [],
            'platform': ['multi_platform_all'],
            'remediation': ['all'],
            'variables': [],
        }

        for parameter in params:
            found = re.search(
                r'^# {0} = (.*)$'.format(parameter),
                self.contents, re.MULTILINE)
            if found is None:
                continue
            if parameter == "variables":
                variables = []
                for token in found.group(1).split(','):
                    token = token.strip()
                    if '=' in token:
                        variables.append(token)
                    else:
                        # A comma inside a variable value: re-join the token
                        # with the preceding "name=value" entry.
                        variables[-1] += "," + token
                params["variables"] = variables
            else:
                values = found.group(1).split(',')
                params[parameter] = [value.strip() for value in values]

        if not params["profiles"]:
            params["profiles"].append(OSCAP_PROFILE_ALL_ID)
            logging.debug(
                "Added the {0} profile to the list of available profiles "
                "for {1}"
                .format(OSCAP_PROFILE_ALL_ID, self.script))

        return params
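    # The header lines this parser expects look like the following
    # (an illustrative scenario header, not taken from a specific test):
    #
    #     # profiles = xccdf_org.ssgproject.content_profile_ospp
    #     # packages = vsftpd
    #     # platform = multi_platform_fedora,Red Hat Enterprise Linux 8
    #     # remediation = bash
    #     # variables = var_accounts_tmout=600
    #
    # Any parameter that is absent keeps its default from the `params` dict.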
    def override_profile(self, scenarios_profile):
        if scenarios_profile:
            self.script_params['profiles'] = [scenarios_profile]

    def matches_regex(self, scenarios_regex):
        if scenarios_regex is not None:
            scenarios_pattern = re.compile(scenarios_regex)
            if scenarios_pattern.match(self.script) is None:
                logging.debug(
                    "Skipping script %s - it did not match "
                    "--scenarios regex" % self.script
                )
                return False
        return True
    def matches_platform(self, benchmark_cpes):
        if self.context is None:
            return False
        if common.matches_platform(
                self.script_params["platform"], benchmark_cpes):
            return True
        else:
            logging.warning(
                "Script %s is not applicable on given platform" %
                self.script)
            return False

    def matches_regex_and_platform(self, scenarios_regex, benchmark_cpes):
        return (
            self.matches_regex(scenarios_regex)
            and self.matches_platform(benchmark_cpes))
def perform_rule_check(options):
    checker = RuleChecker(options.test_env)

    checker.datastream = options.datastream
    checker.benchmark_id = options.benchmark_id
    checker.remediate_using = options.remediate_using
    checker.dont_clean = options.dont_clean
    checker.no_reports = options.no_reports
    checker.manual_debug = options.manual_debug
    checker.benchmark_cpes = options.benchmark_cpes
    checker.scenarios_regex = options.scenarios_regex
    checker.slice_current = options.slice_current
    checker.slice_total = options.slice_total
    checker.keep_snapshots = options.keep_snapshots
    checker.rule_spec = options.target
    checker.template_spec = None
    checker.scenarios_profile = options.scenarios_profile
    # Check whether the target is a complete profile ID; if not, prepend the
    # profile prefix.
    if (checker.scenarios_profile is not None and
            not checker.scenarios_profile.startswith(OSCAP_PROFILE) and
            not oscap.is_virtual_oscap_profile(checker.scenarios_profile)):
        checker.scenarios_profile = OSCAP_PROFILE + options.scenarios_profile
    checker.test_target()