#!/usr/bin/env python
from __future__ import print_function

import logging
import re

from ssg.constants import OSCAP_PROFILE
from ssg_test_suite import common
from ssg_test_suite import rule
from ssg_test_suite import xml_operations
from ssg_test_suite import test_env


class CombinedChecker(rule.RuleChecker):
    """
    Combined mode works pretty much like the Rule mode -
    for every rule selected in a profile:

    - Alter the system.
    - Run the scan, check that the result meets expectations.
    If the test scenario passed as requested, return True;
    if it failed or passed unexpectedly, return False.

    The following sequence applies if the initial scan
    has failed as expected:

    - If there are no remediations, return True.
    - Run remediation, return False if it failed.
    - Return the result of the final scan of the remediated system.

    If a rule doesn't have any test scenario, it is skipped.
    Skipped rules are reported at the end.
    """
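
    # The "test scenario" above refers to the per-rule test scripts shipped
    # with the content, typically shell scripts whose suffix encodes the
    # expected outcome (e.g. a hypothetical correct_value.pass.sh or
    # wrong_value.fail.sh). The expected scan result is derived from the
    # scenario, and rules without any such script are skipped and reported.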
    def __init__(self, test_env):
        super(CombinedChecker, self).__init__(test_env)
        self._matching_rule_found = False

        self.rules_not_tested_yet = set()
        self.results = list()
        self._current_result = None

    def _rule_should_be_tested(self, rule, rules_to_be_tested, tested_templates):
        if rule.short_id not in rules_to_be_tested:
            return False
        return not self._rule_template_been_tested(rule, tested_templates)

    def _modify_parameters(self, script, params):
        # If there is no profiles metadata in a script, we use the ALL
        # profile - this prevents failures which might be caused by the
        # tested profile selecting different values for the tested
        # variables than the defaults. The ALL profile always selects
        # default values.
        # If there is profiles metadata, we keep only the entries that
        # match self.profile (the tested profile). If self.profile is not
        # among them, the scenario is not supposed to be tested using this
        # profile and we return empty profiles metadata.
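        # For illustration (hypothetical values): with self.profile set to
        # "ospp", metadata listing
        # ["xccdf_org.ssgproject.content_profile_ospp",
        #  "xccdf_org.ssgproject.content_profile_cis"]
        # is narrowed down to the single ospp entry, while metadata listing
        # only "xccdf_org.ssgproject.content_profile_stig" becomes an empty
        # list, so that scenario is not run for this profile.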
        if not params["profiles"]:
            params["profiles"].append(rule.OSCAP_PROFILE_ALL_ID)
            logging.debug(
                "Added the {0} profile to the list of available profiles for {1}"
                .format(rule.OSCAP_PROFILE_ALL_ID, script))
        else:
            params["profiles"] = [
                item for item in params["profiles"]
                if re.search(self.profile, item)]
        return params

    def _generate_target_rules(self, profile):
        # check if target is a complete profile ID, if not prepend profile prefix
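        # e.g. a hypothetical target "ospp" becomes OSCAP_PROFILE + "ospp",
        # i.e. the full "xccdf_org.ssgproject.content_profile_ospp" ID
        # (assuming the usual value of the OSCAP_PROFILE constant)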
        if not profile.startswith(OSCAP_PROFILE):
            profile = OSCAP_PROFILE + profile
        logging.info("Performing combined test using profile: {0}".format(profile))

        # Fetch target list from rules selected in profile
        target_rules = xml_operations.get_all_rule_ids_in_profile(
            self.datastream, self.benchmark_id,
            profile, logging)
        logging.debug("Profile {0} expanded to following list of "
                      "rules: {1}".format(profile, target_rules))
        return target_rules

    def _test_target(self, target):
        self.rules_not_tested_yet = set(target)

        try:
            super(CombinedChecker, self)._test_target(target)
        except KeyboardInterrupt as exec_interrupt:
            self.run_aborted = True
            raise exec_interrupt

        if len(self.rules_not_tested_yet) != 0:
            not_tested = sorted(list(self.rules_not_tested_yet))
            logging.info("The following rule(s) were not tested:")
            for rule in not_tested:
                logging.info("{0}".format(rule))

    def test_rule(self, state, rule, scenarios):
        super(CombinedChecker, self).test_rule(state, rule, scenarios)
        # In combined mode there are no expectations of matching substrings;
        # every entry in the target is expected to be unique.
        # Let's remove matched targets, so we can track rules not tested.
        self.rules_not_tested_yet.discard(rule.short_id)


def perform_combined_check(options):
    checker = CombinedChecker(options.test_env)

    checker.datastream = options.datastream
    checker.benchmark_id = options.benchmark_id
    checker.remediate_using = options.remediate_using
    checker.dont_clean = options.dont_clean
    # No debug option is provided for combined mode
    checker.manual_debug = False
    checker.benchmark_cpes = options.benchmark_cpes
    checker.scenarios_regex = options.scenarios_regex
    for profile in options.target:
        # Let's keep track of originally targeted profile
        checker.profile = profile
        target_rules = checker._generate_target_rules(profile)

        checker.test_target(target_rules)
        if checker.run_aborted:
            return
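
# A minimal driving sketch (illustrative only - in the real test suite the
# `options` namespace is produced by the suite's command line parser, and
# the attribute values below are hypothetical):
#
#   import argparse
#   options = argparse.Namespace(
#       test_env=some_test_env,            # test environment/backend object
#       datastream="ssg-example-ds.xml",
#       benchmark_id="xccdf_org.ssgproject.content_benchmark_EXAMPLE",
#       remediate_using="bash",
#       dont_clean=False,
#       benchmark_cpes=set(),
#       scenarios_regex=None,
#       target=["ospp"],                   # profile suffixes or full profile IDs
#   )
#   perform_combined_check(options)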