Passed
Push — master ( 477ce1...9e8f04 )
by Matěj
01:16 queued 10s
created

ssg_test_suite.rule._matches_target()   A

Complexity

Conditions 4

Size

Total Lines 9
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 7
nop 2
dl 0
loc 9
rs 9.2
c 0
b 0
f 0
1
#!/usr/bin/env python2
2
from __future__ import print_function
3
4
import atexit
5
import logging
6
import os
7
import os.path
8
import re
9
import shlex
10
import string
11
import subprocess
12
import sys
13
14
import ssg_test_suite.oscap as oscap
15
import ssg_test_suite.virt
16
from ssg_test_suite import xml_operations
17
from ssg_test_suite.virt import SnapshotStack
18
from ssg_test_suite.log import LogHelper
19
import data
20
21
logging.getLogger(__name__).addHandler(logging.NullHandler())
22
23
24
def _parse_parameters(script):
25
    """Parse parameters from script header"""
26
    params = {'profiles': [],
27
              'templates': [],
28
              'remediation': ['all']}
29
    with open(script, 'r') as script_file:
30
        script_content = script_file.read()
31
        for parameter in params:
32
            found = re.search('^# {0} = ([ ,_\.\-\w]*)$'.format(parameter),
33
                              script_content,
34
                              re.MULTILINE)
35
            if found is None:
36
                continue
37
            params[parameter] = string.split(found.group(1), ', ')
38
    return params
39
40
41
def get_viable_profiles(selected_profiles, datastream, benchmark):
42
    """Read datastream, and return set intersection of profiles of given
43
    benchmark and those provided in `selected_profiles` parameter.
44
    """
45
46
    valid_profiles = []
47
    all_profiles = xml_operations.get_all_profiles_in_benchmark(
48
        datastream, benchmark, logging)
49
    for ds_profile_element in all_profiles:
50
        ds_profile = ds_profile_element.attrib['id']
51
        if 'ALL' in selected_profiles:
52
            valid_profiles += [ds_profile]
53
            continue
54
        for sel_profile in selected_profiles:
55
            if ds_profile.endswith(sel_profile):
56
                valid_profiles += [ds_profile]
57
    if not valid_profiles:
58
        logging.error('No profile matched with "{0}"'
59
                      .format(", ".join(selected_profiles)))
60
    return valid_profiles
61
62
63
def _send_scripts(domain_ip):
64
    remote_dir = './ssgts'
65
    archive_file = data.create_tarball('.')
66
    remote_archive_file = os.path.join(remote_dir, archive_file)
67
    machine = "root@{0}".format(domain_ip)
68
    logging.debug("Uploading scripts.")
69
    log_file_name = os.path.join(LogHelper.LOG_DIR, "data.upload.log")
70
71
    command = "ssh {0} mkdir -p {1}".format(machine, remote_dir)
72
    with open(log_file_name, 'a') as log_file:
73
        try:
74
            subprocess.check_call(shlex.split(command),
75
                                  stdout=log_file,
76
                                  stderr=subprocess.STDOUT)
77
        except subprocess.CalledProcessError as e:
78
            logging.error("Cannot create directory {0}.".format(remote_dir))
79
            return False
80
81
        command = "scp {0} {1}:{2}".format(archive_file, machine, remote_dir)
82
        subprocess.check_call(shlex.split(command),
83
                              stdout=log_file,
84
                              stderr=subprocess.STDOUT)
85
86
        command = "ssh {0} tar xf {1} -C {2}".format(machine, remote_archive_file, remote_dir)
87
        try:
88
            subprocess.check_call(shlex.split(command),
89
                                  stdout=log_file,
90
                                  stderr=subprocess.STDOUT)
91
        except subprocess.CalledProcessError as e:
92
            logging.error("Cannot extract data tarball {0}.".format(remote_archive_file))
93
            return False
94
    return remote_dir
95
96
97
def _apply_script(rule_dir, domain_ip, script):
98
    """Run particular test script on VM and log it's output."""
99
    machine = "root@{0}".format(domain_ip)
100
    logging.debug("Applying script {0}".format(script))
101
    rule_name = os.path.basename(rule_dir)
102
    log_file_name = os.path.join(LogHelper.LOG_DIR,
103
                                 rule_name + ".prescripts.log")
104
105
    with open(log_file_name, 'a') as log_file:
106
        log_file.write('##### {0} / {1} #####\n'.format(rule_name, script))
107
108
        command = "ssh {0} cd {1}; bash -x {2}".format(machine, rule_dir, script)
109
        try:
110
            subprocess.check_call(shlex.split(command),
111
                                  stdout=log_file,
112
                                  stderr=subprocess.STDOUT)
113
        except subprocess.CalledProcessError as e:
114
            logging.error(("Rule testing script {0} "
115
                           "failed with exit code {1}").format(script,
116
                                                               e.returncode))
117
            return False
118
    return True
119
120
121
def _get_script_context(script):
122
    """Return context of the script."""
123
    result = re.search('.*\.([^.]*)\.[^.]*$', script)
124
    if result is None:
125
        return None
126
    return result.group(1)
127
128
129
def _matches_target(rule_dir, targets):
130
    if 'ALL' in targets:
131
        # we want to have them all
132
        return True
133
    else:
134
        for target in targets:
135
            if target in rule_dir:
136
                return True
137
        return False
138
139
140
def _get_scenarios(rule_dir, scripts):
141
    """ Returns only valid scenario files, rest is ignored (is not meant
142
    to be executed directly.
143
    """
144
145
    scenarios = []
146
    for script in scripts:
147
        script_context = _get_script_context(script)
148
        if script_context is not None:
149
            script_params = _parse_parameters(os.path.join(rule_dir, script))
150
            scenarios += [(script, script_context, script_params)]
151
    return scenarios
152
153
154
def perform_rule_check(options):
155
    """Perform rule check.
156
157
    Iterate over rule-testing scenarios and utilize `oscap-ssh` to test every
158
    scenario. Expected result is encoded in scenario file name. In case of
159
    `fail` or `error` results expected, continue with remediation and
160
    reevaluation. Revert system to clean state using snapshots.
161
162
    Return value not defined, textual output and generated reports is the
163
    result.
164
    """
165
    dom = ssg_test_suite.virt.connect_domain(options.hypervisor,
166
                                             options.domain_name)
167
    if dom is None:
168
        sys.exit(1)
169
    snapshot_stack = SnapshotStack(dom)
170
    atexit.register(snapshot_stack.clear)
171
172
    snapshot_stack.create('origin')
173
    ssg_test_suite.virt.start_domain(dom)
174
    domain_ip = ssg_test_suite.virt.determine_ip(dom)
175
    scanned_something = False
176
177
    remote_dir = _send_scripts(domain_ip)
178
    if not remote_dir:
179
        return
180
181
    for rule_dir, rule, scripts in data.iterate_over_rules():
182
        remote_rule_dir = os.path.join(remote_dir, rule_dir)
183
        local_rule_dir = os.path.join(data.DATA_DIR, rule_dir)
184
        if not _matches_target(rule_dir, options.target):
185
            continue
186
        logging.info(rule)
187
        scanned_something = True
188
        logging.debug("Testing rule directory {0}".format(rule_dir))
189
190
        for script, script_context, script_params in _get_scenarios(local_rule_dir, scripts):
191
            logging.debug(('Using test script {0} '
192
                           'with context {1}').format(script, script_context))
193
            snapshot_stack.create('script')
194
            has_worked = False
195
196
            if not _apply_script(remote_rule_dir, domain_ip, script):
197
                logging.error("Environment failed to prepare, skipping test")
198
                snapshot_stack.revert()
199
                continue
200
            profiles = get_viable_profiles(script_params['profiles'],
201
                                           options.datastream,
202
                                           options.benchmark_id)
203
            if len(profiles) > 1:
204
                snapshot_stack.create('profile')
205
            for profile in profiles:
206
                LogHelper.preload_log(logging.INFO,
207
                                      ("Script {0} "
208
                                       "using profile {1} "
209
                                       "OK").format(script,
210
                                                    profile),
211
                                      log_target='pass')
212
                LogHelper.preload_log(logging.ERROR,
213
                                      ("Script {0} "
214
                                       "using profile {1} "
215
                                       "found issue:").format(script,
216
                                                              profile),
217
                                      log_target='fail')
218
                has_worked = True
219
                run_rule_checks(
220
                    domain_ip, profile, options.datastream,
221
                    options.benchmark_id, rule, script_context,
222
                    script, script_params, options.remediate_using,
223
                    options.dont_clean,
224
                )
225
                snapshot_stack.revert(delete=False)
226
            if not has_worked:
227
                logging.error("Nothing has been tested!")
228
            snapshot_stack.delete()
229
            if len(profiles) > 1:
230
                snapshot_stack.revert()
231
    if not scanned_something:
232
        logging.error("Rule {0} has not been found".format(options.target))
233
234
235
def run_rule_checks(
236
        domain_ip, profile, datastream, benchmark_id, rule,
237
        script_context, script_name, script_params, runner, dont_clean):
238
    def oscap_run_rule(stage, context):
239
        return oscap.run_rule(
240
            domain_ip=domain_ip,
241
            profile=profile,
242
            stage=stage,
243
            datastream=datastream,
244
            benchmark_id=benchmark_id,
245
            rule_id=rule,
246
            context=context,
247
            script_name=script_name,
248
            runner=runner,
249
            dont_clean=dont_clean)
250
251
    success = oscap_run_rule('initial', script_context)
252
    if not success:
253
        msg = ("The initial scan failed for rule '{}'."
254
               .format(rule))
255
        logging.error(msg)
256
        return False
257
258
    is_supported = set(['all'])
259
    is_supported.add(
260
        oscap.REMEDIATION_RUNNER_TO_REMEDIATION_MEANS[runner])
261
    supported_and_available_remediations = set(
262
        script_params['remediation']).intersection(is_supported)
263
264
    if (script_context not in ['fail', 'error'] or
265
            len(supported_and_available_remediations) == 0):
266
        return success
267
268
    success = oscap_run_rule('remediation', 'fixed')
269
    if not success:
270
        msg = ("The remediation failed for rule '{}'."
271
               .format(rule))
272
        logging.error(msg)
273
        return success
274
275
    success = oscap_run_rule('final', 'pass')
276
    if not success:
277
        msg = ("The check after remediation failed for rule '{}'."
278
               .format(rule))
279
        logging.error(msg)
280
    return success
281