Passed
Pull Request — master (#3212)
by Matěj, created 02:53

ssg_test_suite.rule.RuleChecker._test_target()   B

Complexity:  Conditions 6
Size:        Total Lines 18, Code Lines 15
Duplication: Lines 0, Ratio 0 %
Importance:  Changes 0

Metric  Value
cc      6
eloc    15
nop     2
dl      0
loc     18
rs      8.6666
c       0
b       0
f       0

#!/usr/bin/env python2
from __future__ import print_function

import logging
import os
import os.path
import re
import subprocess
import collections

import ssg_test_suite.oscap as oscap
import ssg_test_suite.virt
from ssg_test_suite import xml_operations
from ssg_test_suite import test_env
from ssg_test_suite import common
from ssg_test_suite.log import LogHelper
import data

logging.getLogger(__name__).addHandler(logging.NullHandler())


Scenario = collections.namedtuple(
    "Scenario", ["script", "context", "script_params"])


def _parse_parameters(script):
    """Parse parameters from script header"""
    params = {'profiles': [],
              'templates': [],
              'remediation': ['all']}
    with open(script, 'r') as script_file:
        script_content = script_file.read()
        for parameter in params:
            found = re.search(r'^# {0} = ([ ,_\.\-\w]*)$'.format(parameter),
                              script_content,
                              re.MULTILINE)
            if found is None:
                continue
            params[parameter] = found.group(1).split(', ')
    return params
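# A scenario script header that _parse_parameters() would pick up looks,
# for instance (hypothetical content), like this:
#
#     #!/bin/bash
#     # profiles = xccdf_org.ssgproject.content_profile_ospp
#     # remediation = none
#
# which yields {'profiles': ['xccdf_org.ssgproject.content_profile_ospp'],
# 'templates': [], 'remediation': ['none']}; keys missing from the header
# keep their defaults.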


def get_viable_profiles(selected_profiles, datastream, benchmark):
    """Read the datastream and return the profiles of the given benchmark
    whose IDs end with one of the `selected_profiles` (or all of the
    benchmark's profiles if 'ALL' is selected).
    """

    valid_profiles = []
    all_profiles = xml_operations.get_all_profiles_in_benchmark(
        datastream, benchmark, logging)
    for ds_profile_element in all_profiles:
        ds_profile = ds_profile_element.attrib['id']
        if 'ALL' in selected_profiles:
            valid_profiles += [ds_profile]
            continue
        for sel_profile in selected_profiles:
            if ds_profile.endswith(sel_profile):
                valid_profiles += [ds_profile]
    if not valid_profiles:
        logging.error('No profile ends with "{0}"'
                      .format(", ".join(selected_profiles)))
    return valid_profiles
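# Profiles are selected by ID suffix, so e.g. a (hypothetical) selection
# of ['ospp'] would pick up a datastream profile with the ID
# 'xccdf_org.ssgproject.content_profile_ospp', while ['ALL'] picks up
# every profile found in the benchmark.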


def _run_with_stdout_logging(command, args, log_file):
    # Run the command, appending its stdout and stderr to log_file.
    # Failures are not raised to the caller; the process exit code is
    # returned instead, so callers have to check the return value.
    log_file.write("{0} {1}\n".format(command, " ".join(args)))
    try:
        subprocess.check_call(
            (command,) + args, stdout=log_file, stderr=subprocess.STDOUT)
        return 0
    except subprocess.CalledProcessError as e:
        return e.returncode


def _send_scripts(domain_ip):
    remote_dir = './ssgts'
    archive_file = data.create_tarball('.')
    remote_archive_file = os.path.join(remote_dir, archive_file)
    machine = "root@{0}".format(domain_ip)
    logging.debug("Uploading scripts.")
    log_file_name = os.path.join(LogHelper.LOG_DIR, "data.upload.log")

    with open(log_file_name, 'a') as log_file:
        args = common.IGNORE_KNOWN_HOSTS_OPTIONS + (machine, "mkdir", "-p", remote_dir)
        if _run_with_stdout_logging("ssh", args, log_file) != 0:
            msg = "Cannot create directory {0}.".format(remote_dir)
            logging.error(msg)
            raise RuntimeError(msg)

        args = (common.IGNORE_KNOWN_HOSTS_OPTIONS
                + (archive_file, "{0}:{1}".format(machine, remote_dir)))
        if _run_with_stdout_logging("scp", args, log_file) != 0:
            msg = ("Cannot copy archive {0} to the target machine's directory {1}."
                   .format(archive_file, remote_dir))
            logging.error(msg)
            raise RuntimeError(msg)

        args = (common.IGNORE_KNOWN_HOSTS_OPTIONS
                + (machine, "tar xf {0} -C {1}".format(remote_archive_file, remote_dir)))
        if _run_with_stdout_logging("ssh", args, log_file) != 0:
            msg = "Cannot extract data tarball {0}.".format(remote_archive_file)
            logging.error(msg)
            raise RuntimeError(msg)

    return remote_dir


def _apply_script(rule_dir, domain_ip, script):
    """Run a particular test script on the VM and log its output."""
    machine = "root@{0}".format(domain_ip)
    logging.debug("Applying script {0}".format(script))
    rule_name = os.path.basename(rule_dir)
    log_file_name = os.path.join(
        LogHelper.LOG_DIR, rule_name + ".prescripts.log")

    with open(log_file_name, 'a') as log_file:
        log_file.write('##### {0} / {1} #####\n'.format(rule_name, script))

        command = "cd {0}; bash -x {1}".format(rule_dir, script)
        args = common.IGNORE_KNOWN_HOSTS_OPTIONS + (machine, command)

        rc = _run_with_stdout_logging("ssh", args, log_file)
        if rc != 0:
            logging.error("Rule testing script {script} failed with exit code {rc}"
                          .format(script=script, rc=rc))
            return False
    return True


def _get_script_context(script):
    """Return context of the script."""
    result = re.search(r'.*\.([^.]*)\.[^.]*$', script)
    if result is None:
        return None
    return result.group(1)
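# The context is the next-to-last dot-separated part of the file name:
# a (hypothetical) scenario named 'sshd_disable_root_login.fail.sh'
# yields 'fail', while names without that pattern yield None and are
# dropped by _get_scenarios() below.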


def _matches_target(rule_dir, targets):
    if 'ALL' in targets:
        # we want to have them all
        return True
    else:
        for target in targets:
            if target in rule_dir:
                return True
        return False
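# Targets are matched as substrings of the rule directory name, so a
# (hypothetical) target of 'sshd' would select a rule directory called
# 'sshd_disable_root_login', and the special target 'ALL' selects every
# rule.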


def _get_scenarios(rule_dir, scripts):
    """ Return only valid scenario files; the rest are ignored (they are
    not meant to be executed directly).
    """

    scenarios = []
    for script in scripts:
        script_context = _get_script_context(script)
        if script_context is not None:
            script_params = _parse_parameters(os.path.join(rule_dir, script))
            scenarios += [Scenario(script, script_context, script_params)]
    return scenarios


class RuleChecker(ssg_test_suite.oscap.Checker):
    """
    Rule checks generally work like this:
    for every profile that supports the rule:

    - Alter the system.
    - Run the scan, check that the result meets expectations.
      If the test scenario passed as requested, return True,
      if it failed or passed unexpectedly, return False.

    The following sequence applies if the initial scan
    has failed as expected:

    - If there are no remediations, return True.
    - Run remediation, return False if it failed.
    - Return the result of the final scan of the remediated system.
    """
    def __init__(self, test_env):
        super(RuleChecker, self).__init__(test_env)
        self._matching_rule_found = False

    def _run_test(self, profile, test_data):
        scenario = test_data["scenario"]
        rule_id = test_data["rule_id"]

        LogHelper.preload_log(
            logging.INFO, "Script {0} using profile {1} OK".format(scenario.script, profile),
            log_target='pass')
        LogHelper.preload_log(
            logging.ERROR,
            "Script {0} using profile {1} found issue:".format(scenario.script, profile),
            log_target='fail')

        runner_cls = ssg_test_suite.oscap.REMEDIATION_RULE_RUNNERS[self.remediate_using]
        runner = runner_cls(
            self.test_env, profile, self.datastream, self.benchmark_id,
            rule_id, scenario.script, self.dont_clean)

        if not self._initial_scan_went_ok(runner, rule_id, scenario.context):
            return False

        supported_and_available_remediations = self._get_available_remediations(scenario)
        if (scenario.context not in ['fail', 'error']
                or not supported_and_available_remediations):
            return True

        if not self._remediation_went_ok(runner, rule_id):
            return False

        return self._final_scan_went_ok(runner, rule_id)

    def _initial_scan_went_ok(self, runner, rule_id, context):
        success = runner.run_stage_with_context("initial", context)
        if not success:
            msg = ("The initial scan failed for rule '{}'."
                   .format(rule_id))
            logging.error(msg)
        return success

    def _get_available_remediations(self, scenario):
        is_supported = set(['all'])
        is_supported.add(
            oscap.REMEDIATION_RUNNER_TO_REMEDIATION_MEANS[self.remediate_using])
        supported_and_available_remediations = set(
            scenario.script_params['remediation']).intersection(is_supported)
        return supported_and_available_remediations

    def _remediation_went_ok(self, runner, rule_id):
        success = runner.run_stage_with_context('remediation', 'fixed')
        if not success:
            msg = ("The remediation failed for rule '{}'."
                   .format(rule_id))
            logging.error(msg)
        return success

    def _final_scan_went_ok(self, runner, rule_id):
        success = runner.run_stage_with_context('final', 'pass')
        if not success:
            msg = ("The check after remediation failed for rule '{}'."
                   .format(rule_id))
            logging.error(msg)
        return success

    def _test_target(self, target):
        try:
            remote_dir = _send_scripts(self.test_env.domain_ip)
        except RuntimeError as exc:
            msg = "Unable to upload test scripts: {more_info}".format(more_info=str(exc))
            raise RuntimeError(msg)

        self._matching_rule_found = False

        with test_env.SavedState.create_from_environment(self.test_env, "tests_uploaded") as state:
            for rule in data.iterate_over_rules():
                if not _matches_target(rule.directory, target):
                    continue
                self._matching_rule_found = True
                self._check_rule(rule, remote_dir, state)

        if not self._matching_rule_found:
            logging.error("No matching rule ID found for '{0}'".format(target))

    def _check_rule(self, rule, remote_dir, state):
        remote_rule_dir = os.path.join(remote_dir, rule.directory)
        local_rule_dir = os.path.join(data.DATA_DIR, rule.directory)

        logging.info(rule.id)

        logging.debug("Testing rule directory {0}".format(rule.directory))

        args_list = [(s, remote_rule_dir, rule.id)
                     for s in _get_scenarios(local_rule_dir, rule.files)]
        state.map_on_top(self._check_rule_scenario, args_list)

    def _check_rule_scenario(self, scenario, remote_rule_dir, rule_id):
        if not _apply_script(
                remote_rule_dir, self.test_env.domain_ip, scenario.script):
            logging.error("Environment failed to prepare, skipping test")
            return

        logging.debug('Using test script {0} with context {1}'
                      .format(scenario.script, scenario.context))

        profiles = get_viable_profiles(
            scenario.script_params['profiles'], self.datastream, self.benchmark_id)
        test_data = dict(scenario=scenario, rule_id=rule_id)
        self.run_test_for_all_profiles(profiles, test_data)

        self.executed_tests += 1


def perform_rule_check(options):
    checker = RuleChecker(options.test_env)

    checker.datastream = options.datastream
    checker.benchmark_id = options.benchmark_id
    checker.remediate_using = options.remediate_using
    checker.dont_clean = options.dont_clean

    checker.test_target(options.target)
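
# Minimal sketch of the `options` object perform_rule_check() expects;
# the attribute names come from the assignments above, the values are
# purely illustrative:
#
#     options = argparse.Namespace(
#         test_env=some_test_env,
#         datastream='ssg-example-ds.xml',
#         benchmark_id='xccdf_org.ssgproject.content_benchmark_EXAMPLE',
#         remediate_using='bash',
#         dont_clean=False,
#         target=['ALL'],
#     )
#     perform_rule_check(options)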