Passed
Pull Request — master (#3179)
by Matěj
02:26
created

ssg_test_suite.rule.RuleChecker._run_test()   B

Complexity

Conditions 5

Size

Total Lines 29
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 23
nop 3
dl 0
loc 29
rs 8.8613
c 0
b 0
f 0
1
#!/usr/bin/env python2
2
from __future__ import print_function
3
4
import logging
5
import os
6
import os.path
7
import re
8
import subprocess
9
import collections
10
11
import ssg_test_suite.oscap as oscap
12
import ssg_test_suite.virt
13
from ssg_test_suite import xml_operations
14
from ssg_test_suite.log import LogHelper
15
import data
16
17
logging.getLogger(__name__).addHandler(logging.NullHandler())
18
19
20
Scenario = collections.namedtuple(
21
    "Scenario", ["script", "context", "script_params"])
22
23
24
def _parse_parameters(script):
25
    """Parse parameters from script header"""
26
    params = {'profiles': [],
27
              'templates': [],
28
              'remediation': ['all']}
29
    with open(script, 'r') as script_file:
30
        script_content = script_file.read()
31
        for parameter in params:
32
            found = re.search('^# {0} = ([ ,_\.\-\w]*)$'.format(parameter),
33
                              script_content,
34
                              re.MULTILINE)
35
            if found is None:
36
                continue
37
            params[parameter] = found.group(1).split(', ')
38
    return params
39
40
41
def get_viable_profiles(selected_profiles, datastream, benchmark):
42
    """Read datastream, and return set intersection of profiles of given
43
    benchmark and those provided in `selected_profiles` parameter.
44
    """
45
46
    valid_profiles = []
47
    all_profiles = xml_operations.get_all_profiles_in_benchmark(
48
        datastream, benchmark, logging)
49
    for ds_profile_element in all_profiles:
50
        ds_profile = ds_profile_element.attrib['id']
51
        if 'ALL' in selected_profiles:
52
            valid_profiles += [ds_profile]
53
            continue
54
        for sel_profile in selected_profiles:
55
            if ds_profile.endswith(sel_profile):
56
                valid_profiles += [ds_profile]
57
    if not valid_profiles:
58
        logging.error('No profile matched with "{0}"'
59
                      .format(", ".join(selected_profiles)))
60
    return valid_profiles
61
62
63
def _run_with_stdout_logging(command, log_file):
64
    log_file.write(" ".join(command) + "\n")
65
    try:
66
        subprocess.check_call(command,
67
                              stdout=log_file,
68
                              stderr=subprocess.STDOUT)
69
        return True
70
    except subprocess.CalledProcessError as e:
71
        return False
72
73
74
def _send_scripts(domain_ip):
75
    remote_dir = './ssgts'
76
    archive_file = data.create_tarball('.')
77
    remote_archive_file = os.path.join(remote_dir, archive_file)
78
    machine = "root@{0}".format(domain_ip)
79
    logging.debug("Uploading scripts.")
80
    log_file_name = os.path.join(LogHelper.LOG_DIR, "data.upload.log")
81
82
    with open(log_file_name, 'a') as log_file:
83
        command = ("ssh", machine, "mkdir", "-p", remote_dir)
84
        if not _run_with_stdout_logging(command, log_file):
85
            logging.error("Cannot create directory {0}.".format(remote_dir))
86
            return False
87
88
        command = ("scp", archive_file, "{0}:{1}".format(machine, remote_dir))
89
        if not _run_with_stdout_logging(command, log_file):
90
            logging.error("Cannot copy archive {0} to the target machine's directory {1}."
91
                          .format(archive_file, remote_dir))
92
            return False
93
94
        command = ("ssh", machine, "tar xf {0} -C {1}".format(remote_archive_file, remote_dir))
95
        if not _run_with_stdout_logging(command, log_file):
96
            logging.error("Cannot extract data tarball {0}.".format(remote_archive_file))
97
            return False
98
99
    return remote_dir
100
101
102
def _apply_script(rule_dir, domain_ip, script):
103
    """Run particular test script on VM and log it's output."""
104
    machine = "root@{0}".format(domain_ip)
105
    logging.debug("Applying script {0}".format(script))
106
    rule_name = os.path.basename(rule_dir)
107
    log_file_name = os.path.join(
108
        LogHelper.LOG_DIR, rule_name + ".prescripts.log")
109
110
    with open(log_file_name, 'a') as log_file:
111
        log_file.write('##### {0} / {1} #####\n'.format(rule_name, script))
112
113
        command = ("ssh", machine, "cd {0}; bash -x {1}".format(rule_dir, script))
114
        try:
115
            subprocess.check_call(command,
116
                                  stdout=log_file,
117
                                  stderr=subprocess.STDOUT)
118
        except subprocess.CalledProcessError as e:
119
            logging.error("Rule testing script {0} failed with exit code {1}"
120
                          .format(script, e.returncode))
121
            return False
122
    return True
123
124
125
def _get_script_context(script):
126
    """Return context of the script."""
127
    result = re.search('.*\.([^.]*)\.[^.]*$', script)
128
    if result is None:
129
        return None
130
    return result.group(1)
131
132
133
def _matches_target(rule_dir, targets):
134
    if 'ALL' in targets:
135
        # we want to have them all
136
        return True
137
    else:
138
        for target in targets:
139
            if target in rule_dir:
140
                return True
141
        return False
142
143
144
def _get_scenarios(rule_dir, scripts):
145
    """ Returns only valid scenario files, rest is ignored (is not meant
146
    to be executed directly.
147
    """
148
149
    scenarios = []
150
    for script in scripts:
151
        script_context = _get_script_context(script)
152
        if script_context is not None:
153
            script_params = _parse_parameters(os.path.join(rule_dir, script))
154
            scenarios += [Scenario(script, script_context, script_params)]
155
    return scenarios
156
157
158
class RuleChecker(ssg_test_suite.oscap.Checker):
159
    """
160
    Rule checks generally work like this -
161
    for every profile that supports that rule:
162
163
    - Alter the system.
164
    - Run the scan, check that the result meets expectations.
165
      If the test scenario passed as requested, return True,
166
      if it failed or passed unexpectedly, return False.
167
168
    The following sequence applies if the initial scan
169
    has failed as expected:
170
171
    - If there are no remediations, return True.
172
    - Run remediation, return False if it failed.
173
    - Return result of the final scan of remediated system.
174
    """
175
    def __init__(self, test_env):
176
        super(RuleChecker, self).__init__(test_env)
177
        self._matching_rule_found = False
178
179
    def _run_test(self, profile, ** run_test_args):
180
        scenario = run_test_args["scenario"]
181
        rule_id = run_test_args["rule_id"]
182
183
        LogHelper.preload_log(
184
            logging.INFO, "Script {0} using profile {1} OK".format(scenario.script, profile),
185
            log_target='pass')
186
        LogHelper.preload_log(
187
            logging.ERROR,
188
            "Script {0} using profile {1} found issue:".format(scenario.script, profile),
189
            log_target='fail')
190
191
        runner_cls = ssg_test_suite.oscap.REMEDIATION_RULE_RUNNERS[self.remediate_using]
192
        runner = runner_cls(
193
            self.test_env.domain_ip, profile, self.datastream, self.benchmark_id,
194
            rule_id, scenario.script, self.dont_clean)
195
196
        if not self._initial_scan_went_ok(runner, rule_id, scenario.context):
197
            return False
198
199
        supported_and_available_remediations = self._get_available_remediations()
200
        if (scenario.context not in ['fail', 'error']
201
                or not supported_and_available_remediations):
202
            return True
203
204
        if not self._remediation_went_ok(runner, rule_id):
205
            return False
206
207
        return self._final_scan_went_ok(runner, rule_id)
208
209
    def _initial_scan_went_ok(self, runner, rule_id, context):
210
        success = runner.run_stage_with_context("initial", context)
211
        if not success:
212
            msg = ("The initial scan failed for rule '{}'."
213
                   .format(rule_id))
214
            logging.error(msg)
215
            return False
216
217
    def _get_available_remediations(self, scenario):
218
        is_supported = set(['all'])
219
        is_supported.add(
220
            oscap.REMEDIATION_RUNNER_TO_REMEDIATION_MEANS[self.remediate_using])
221
        supported_and_available_remediations = set(
222
            scenario.script_params['remediation']).intersection(is_supported)
223
        return supported_and_available_remediations
224
225
    def _remediation_went_ok(self, runner, rule_id):
226
        success = runner.run_stage_with_context('remediation', 'fixed')
227
        if not success:
228
            msg = ("The remediation failed for rule '{}'."
229
                   .format(rule_id))
230
            logging.error(msg)
231
            return success
232
233
    def _final_scan_went_ok(self, runner, rule_id):
234
        success = runner.run_stage_with_context('final', 'pass')
235
        if not success:
236
            msg = ("The check after remediation failed for rule '{}'."
237
                   .format(rule_id))
238
            logging.error(msg)
239
        return success
240
241
    def _test_target(self, target):
242
        remote_dir = _send_scripts(self.test_env.domain_ip)
243
        if not remote_dir:
244
            msg = "Unable to upload test scripts"
245
            raise RuntimeError(msg)
246
247
        self._matching_rule_found = False
248
249
        for rule in data.iterate_over_rules():
250
            self._check_rule(rule, remote_dir, target)
251
252
        if not self._matching_rule_found:
253
            logging.error("No matching rule ID found for '{0}'".format(target))
254
255
    def _check_rule(self, rule, remote_dir, target):
256
        rule_dir = rule.directory
257
        remote_rule_dir = os.path.join(remote_dir, rule_dir)
258
        local_rule_dir = os.path.join(data.DATA_DIR, rule_dir)
259
260
        if not _matches_target(rule_dir, target):
261
            return
262
263
        logging.info(rule.id)
264
        self._matching_rule_found = True
265
266
        logging.debug("Testing rule directory {0}".format(rule_dir))
267
268
        for scenario in _get_scenarios(local_rule_dir, rule.files):
269
            logging.debug('Using test script {0} with context {1}'
270
                          .format(scenario.script, scenario.context))
271
            with self.test_env.in_layer('script'):
272
                self._check_rule_scenario(scenario, remote_rule_dir, rule.id)
273
            self.executed_tests += 1
274
275
    def _check_rule_scenario(self, scenario, remote_rule_dir, rule_id):
276
        if not _apply_script(
277
                remote_rule_dir, self.test_env.domain_ip, scenario.script):
278
            logging.error("Environment failed to prepare, skipping test")
279
            return
280
281
        profiles = get_viable_profiles(
282
            scenario.script_params['profiles'], self.datastream, self.benchmark_id)
283
        self._test_by_profiles(profiles, scenario=scenario, rule_id=rule_id)
284
285
286
def perform_rule_check(options):
287
    checker = RuleChecker(options.test_env)
288
289
    checker.datastream = options.datastream
290
    checker.benchmark_id = options.benchmark_id
291
    checker.remediate_using = options.remediate_using
292
    checker.dont_clean = options.dont_clean
293
294
    checker.test_target(options.target)
295