1
|
|
|
import logging |
2
|
|
|
import subprocess |
3
|
|
|
from collections import namedtuple |
4
|
|
|
import enum |
5
|
|
|
import functools |
6
|
|
|
from ssg.constants import MULTI_PLATFORM_MAPPING |
7
|
|
|
from ssg.constants import PRODUCT_TO_CPE_MAPPING |
8
|
|
|
from ssg.constants import FULL_NAME_TO_PRODUCT_MAPPING |
9
|
|
|
|
10
|
|
|
# Pair naming a single scenario run: the rule ID and the scenario script.
Scenario_run = namedtuple("Scenario_run", ["rule_id", "script"])

# Conditions a scenario was run under; two results are only directly
# comparable when all four fields agree.
Scenario_conditions = namedtuple(
    "Scenario_conditions",
    ["backend", "scanning_mode", "remediated_by", "datastream"],
)
16
|
|
|
|
17
|
|
|
|
18
|
|
|
# Extra ssh command-line arguments, kept as flat ("-o", "Key=value") pairs so
# the tuple can be spliced directly into an argument list.
IGNORE_KNOWN_HOSTS_OPTIONS = (
    "-o", "StrictHostKeyChecking=no",      # do not verify the host key
    "-o", "UserKnownHostsFile=/dev/null",  # do not record the host key
)
22
|
|
|
|
23
|
|
|
|
24
|
|
|
@enum.unique
class Stage(enum.IntEnum):
    """
    Enumeration of stages.

    Members are integers (``IntEnum``) that increase from ``NONE`` (0) to
    ``FINAL_SCAN`` (4), so stages can be compared with ``<`` and ``>``.
    """

    NONE = 0
    PREPARATION = 1
    INITIAL_SCAN = 2
    REMEDIATION = 3
    FINAL_SCAN = 4
30
|
|
|
|
31
|
|
|
|
32
|
|
|
@functools.total_ordering
class RuleResult(object):
    """
    Result of a test suite testing rule under a scenario.

    Supports ordering by success - the most successful run orders first.
    """

    # Names of the stages whose outcome can be recorded. The same strings are
    # used as keys for stage outcomes in the dictionary (de)serialization.
    STAGE_STRINGS = {
        "preparation",
        "initial_scan",
        "remediation",
        "final_scan",
    }

    def __init__(self, result_dict=None):
        """
        Create an empty result, or load it from ``result_dict``.

        A falsy ``result_dict`` (``None`` or empty) leaves the result empty;
        otherwise it is passed to ``load_from_dict``.
        """
        # Scenario_run: rule ID and scenario script of this run.
        self.scenario = Scenario_run("", "")
        # Scenario_conditions: conditions the scenario was run under.
        self.conditions = Scenario_conditions("", "", "", "")
        # Value of the "run_timestamp" entry of the serialized form.
        self.when = ""
        # Maps a stage name from STAGE_STRINGS to the outcome of that stage.
        self.passed_stages = dict()
        # Sum of the recorded outcomes; drives the ordering of results.
        self.passed_stages_count = 0
        # Overall verdict derived from the stage outcomes.
        self.success = False

        if result_dict:
            self.load_from_dict(result_dict)

    def _update_summary(self):
        """Recompute ``passed_stages_count`` and ``success`` from ``passed_stages``."""
        stages = self.passed_stages
        self.passed_stages_count = sum(stages.values())

        # A passed final scan means success. Otherwise the run only counts
        # as successful when no remediation was recorded and the initial
        # scan passed.
        self.success = stages.get("final_scan", False)
        if not self.success:
            self.success = (
                "remediation" not in stages
                and stages.get("initial_scan", False))

    def load_from_dict(self, data):
        """
        Populate this result from ``data``.

        Required keys: "rule_id", "scenario_script", "backend",
        "scanning_mode", "remediated_by", "datastream" and "run_timestamp"
        (a missing one raises ``KeyError``). Keys listed in
        ``STAGE_STRINGS`` are optional and hold the stage outcomes.
        """
        self.scenario = Scenario_run(data["rule_id"], data["scenario_script"])
        self.conditions = Scenario_conditions(
            data["backend"], data["scanning_mode"],
            data["remediated_by"], data["datastream"])
        self.when = data["run_timestamp"]

        self.passed_stages = {
            key: data[key] for key in self.STAGE_STRINGS if key in data}
        self._update_summary()

    def save_to_dict(self):
        """Return a dictionary in the format accepted by ``load_from_dict``."""
        data = dict()
        data["rule_id"] = self.scenario.rule_id
        data["scenario_script"] = self.scenario.script

        data["backend"] = self.conditions.backend
        data["scanning_mode"] = self.conditions.scanning_mode
        data["remediated_by"] = self.conditions.remediated_by
        data["datastream"] = self.conditions.datastream

        data["run_timestamp"] = self.when

        data.update(self.passed_stages)

        return data

    def record_stage_result(self, stage, successful):
        """
        Record the outcome ``successful`` of the stage named ``stage``.

        ``stage`` has to be one of ``STAGE_STRINGS``. The derived
        ``passed_stages_count`` and ``success`` are refreshed, so that the
        result compares the same way as it would after a round trip through
        ``save_to_dict`` and ``load_from_dict``.
        """
        assert stage in self.STAGE_STRINGS, (
            "Stage name {name} is invalid, choose one from {choices}"
            .format(name=stage, choices=", ".join(sorted(self.STAGE_STRINGS)))
        )
        self.passed_stages[stage] = successful
        self._update_summary()

    def relative_conditions_to(self, other):
        """
        Return a pair describing how this result differs from ``other``.

        When both results have equal conditions, the pair holds the two
        timestamps; otherwise it holds the two condition tuples.
        """
        if self.conditions == other.conditions:
            return self.when, other.when
        else:
            return tuple(self.conditions), tuple(other.conditions)

    def __eq__(self, other):
        """Results are equal when verdict and all stage outcomes agree."""
        if not isinstance(other, RuleResult):
            return NotImplemented
        return (self.success == other.success
                and self.passed_stages == other.passed_stages)

    def __lt__(self, other):
        """Order the result with more passed stages first."""
        if not isinstance(other, RuleResult):
            return NotImplemented
        # Inverted on purpose: the more successful result sorts earlier.
        return self.passed_stages_count > other.passed_stages_count
109
|
|
|
|
110
|
|
|
|
111
|
|
|
def run_cmd_local(command, verbose_path, env=None):
    """
    Log ``command`` and execute it by means of ``_run_cmd``.

    ``command`` is a sequence of strings; it is joined by spaces only for
    the debug log message and is handed over to ``_run_cmd`` unchanged,
    together with ``verbose_path`` and ``env``.

    Returns the ``(returncode, output)`` pair produced by ``_run_cmd``.
    """
    logging.debug('Running {}'.format(' '.join(command)))
    return _run_cmd(command, verbose_path, env)
116
|
|
|
|
117
|
|
|
|
118
|
|
|
def run_cmd_remote(command_string, domain_ip, verbose_path, env=None):
    """
    Execute ``command_string`` as root on ``domain_ip`` over ssh.

    The ssh invocation is assembled from ``IGNORE_KNOWN_HOSTS_OPTIONS``,
    the ``root@<domain_ip>`` target and ``command_string`` as a single
    argument, and is executed by ``_run_cmd`` with ``verbose_path`` and
    ``env``. Only ``command_string`` itself is written to the debug log.

    Returns the ``(returncode, output)`` pair produced by ``_run_cmd``.
    """
    target = 'root@{0}'.format(domain_ip)

    ssh_command = ['ssh']
    ssh_command.extend(IGNORE_KNOWN_HOSTS_OPTIONS)
    ssh_command.extend([target, command_string])

    logging.debug('Running {}'.format(command_string))
    return _run_cmd(ssh_command, verbose_path, env)
124
|
|
|
|
125
|
|
|
|
126
|
|
|
def _run_cmd(command_list, verbose_path, env=None):
    """
    Run ``command_list`` and return its exit code and standard output.

    The standard error of the command is written to the file
    ``verbose_path``, which is opened for writing (existing content is
    overwritten). ``env`` is passed to ``subprocess.check_output``.

    Returns a ``(returncode, output)`` pair. ``returncode`` is 0 unless
    the command exited with a non-zero status; ``output`` is the captured
    standard output decoded as UTF-8, with undecodable bytes substituted by
    the replacement character.

    Only ``subprocess.CalledProcessError`` is handled here - other errors,
    for example failure to open ``verbose_path``, propagate to the caller.
    """
    returncode = 0
    try:
        with open(verbose_path, 'w') as verbose_file:
            output = subprocess.check_output(
                command_list, stderr=verbose_file, env=env)
    except subprocess.CalledProcessError as e:
        # Failure of the command is reported by means of the return value;
        # whatever it printed before failing is still handed over.
        returncode = e.returncode
        output = e.output
    # Output of arbitrary commands is not guaranteed to be valid UTF-8 and a
    # stray byte must not abort the caller, hence the lenient decoding.
    return returncode, output.decode('utf-8', errors='replace')
137
|
|
|
|
138
|
|
|
|
139
|
|
|
def _get_platform_cpes(platform):
    """
    Return the set of CPEs that belong to ``platform``.

    ``platform`` is either a specifier starting with "multi_platform_",
    which is looked up in ``MULTI_PLATFORM_MAPPING`` and yields a group of
    products, or a full product name, which is looked up in
    ``FULL_NAME_TO_PRODUCT_MAPPING`` and yields a single product. The CPEs
    of every product are taken from ``PRODUCT_TO_CPE_MAPPING``.

    Raises ``ValueError`` if ``platform`` is not present in the respective
    mapping. The problem is also logged as an error, and the exception
    carries the same message.
    """
    if platform.startswith("multi_platform_"):
        try:
            products = MULTI_PLATFORM_MAPPING[platform]
        except KeyError:
            message = (
                "Unknown multi_platform specifier: %s is not from %s"
                % (platform, ", ".join(MULTI_PLATFORM_MAPPING.keys())))
            logging.error(message)
            raise ValueError(message)
    else:
        # scenario platform is specified by a full product name
        try:
            products = [FULL_NAME_TO_PRODUCT_MAPPING[platform]]
        except KeyError:
            message = (
                "Unknown product name: %s is not from %s"
                % (platform, ", ".join(FULL_NAME_TO_PRODUCT_MAPPING.keys())))
            logging.error(message)
            raise ValueError(message)

    platform_cpes = set()
    for product in products:
        platform_cpes.update(PRODUCT_TO_CPE_MAPPING[product])
    return platform_cpes
163
|
|
|
|
164
|
|
|
|
165
|
|
|
def matches_platform(scenario_platforms, benchmark_cpes):
    """
    Tell whether a scenario applies to a benchmark.

    ``scenario_platforms`` is a collection of platform strings as accepted
    by ``_get_platform_cpes``; ``benchmark_cpes`` is any iterable of CPEs.

    Returns ``True`` if "multi_platform_all" is among the platforms, or if
    at least one CPE of the scenario platforms is among ``benchmark_cpes``;
    ``False`` otherwise.

    Raises ``ValueError`` (from ``_get_platform_cpes``) if a platform is
    unknown - every platform is resolved, even after a match is certain.
    """
    if "multi_platform_all" in scenario_platforms:
        return True

    scenario_cpes = set()
    for platform in scenario_platforms:
        scenario_cpes |= _get_platform_cpes(platform)

    # isdisjoint accepts any iterable (not just a set) and stops at the
    # first shared element instead of building the whole intersection.
    return not scenario_cpes.isdisjoint(benchmark_cpes)
172
|
|
|
|