|
1
|
1 |
|
from __future__ import print_function |
|
2
|
|
|
|
|
3
|
1 |
|
import os |
|
4
|
1 |
|
import logging |
|
5
|
1 |
|
import subprocess |
|
6
|
1 |
|
from collections import namedtuple |
|
7
|
1 |
|
import functools |
|
8
|
1 |
|
import tarfile |
|
9
|
1 |
|
import tempfile |
|
10
|
1 |
|
import re |
|
11
|
1 |
|
import shutil |
|
12
|
|
|
|
|
13
|
1 |
|
import ssg.yaml |
|
14
|
1 |
|
from ssg.build_cpe import ProductCPEs |
|
15
|
1 |
|
from ssg.build_yaml import Rule as RuleYAML |
|
16
|
1 |
|
from ssg.constants import MULTI_PLATFORM_MAPPING |
|
17
|
1 |
|
from ssg.constants import FULL_NAME_TO_PRODUCT_MAPPING |
|
18
|
1 |
|
from ssg.constants import OSCAP_RULE |
|
19
|
1 |
|
from ssg.jinja import process_file_with_macros |
|
20
|
1 |
|
from ssg.products import product_yaml_path, load_product_yaml |
|
21
|
1 |
|
from ssg.rules import get_rule_dir_yaml, is_rule_dir |
|
22
|
1 |
|
from ssg.rule_yaml import parse_prodtype |
|
23
|
1 |
|
from ssg_test_suite.log import LogHelper |
|
24
|
|
|
|
|
25
|
1 |
|
import ssg.templates |
|
26
|
|
|
|
|
27
|
|
|
|
|
28
|
1 |
|
# Record of one scenario execution: which rule and which scenario script ran.
Scenario_run = namedtuple(
    "Scenario_run",
    ("rule_id", "script"))
# Conditions a scenario ran under; results are only comparable when equal.
Scenario_conditions = namedtuple(
    "Scenario_conditions",
    ("backend", "scanning_mode", "remediated_by", "datastream"))
# One testable rule: its tests directory, IDs, scenario contents and template.
Rule = namedtuple(
    "Rule", ["directory", "id", "short_id", "scenarios_basenames", "template"])

# Repository root, two levels above this file.
SSG_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))

# Directories searched for benchmark content when no product is given.
_BENCHMARK_DIRS = [
    os.path.abspath(os.path.join(SSG_ROOT, 'linux_os', 'guide')),
    os.path.abspath(os.path.join(SSG_ROOT, 'applications')),
]

# Shared helper content uploaded alongside the test scenarios.
_SHARED_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '../shared'))

_SHARED_TEMPLATES = os.path.abspath(os.path.join(SSG_ROOT, 'shared/templates'))

REMOTE_USER = "root"
REMOTE_USER_HOME_DIRECTORY = "/root"
REMOTE_TEST_SCENARIOS_DIRECTORY = os.path.join(REMOTE_USER_HOME_DIRECTORY, "ssgts")

# Extra SSH options may be supplied via the SSH_ADDITIONAL_OPTIONS environment
# variable. Defaulting the lookup to "" yields the same empty tuple as the
# original try/except AttributeError construct when the variable is unset,
# without relying on .split() blowing up on None.
SSH_ADDITIONAL_OPTS = tuple(os.environ.get('SSH_ADDITIONAL_OPTIONS', '').split())

# Test machines are throwaway, so host-key verification is disabled.
SSH_ADDITIONAL_OPTS = (
    "-o", "StrictHostKeyChecking=no",
    "-o", "UserKnownHostsFile=/dev/null",
) + SSH_ADDITIONAL_OPTS

# Per-directory test configuration file name (see get_test_dir_config).
TESTS_CONFIG_NAME = "test_config.yml"
|
64
|
|
|
|
|
65
|
|
|
|
|
66
|
1 |
|
def walk_through_benchmark_dirs(product=None):
    """
    Yield os.walk tuples (dirpath, dirnames, filenames) for benchmark content.

    Without a product, all directories in _BENCHMARK_DIRS are walked. With a
    product, only that product's benchmark root (the 'benchmark_root' key of
    its product YAML, resolved relative to the product YAML's directory) is
    walked.
    """
    if product is None:
        roots = _BENCHMARK_DIRS
    else:
        yaml_path = product_yaml_path(SSG_ROOT, product)
        product_dir = os.path.dirname(yaml_path)
        product_yaml = load_product_yaml(yaml_path)
        roots = [os.path.abspath(
            os.path.join(product_dir, product_yaml['benchmark_root']))]

    for root in roots:
        for walk_tuple in os.walk(root):
            yield walk_tuple
|
78
|
|
|
|
|
79
|
|
|
|
|
80
|
1 |
|
class Stage(object):
    """Enumeration of test-run stages, in execution order."""

    NONE = 0          # no stage reached yet
    PREPARATION = 1   # environment / scenario setup
    INITIAL_SCAN = 2  # first compliance scan
    REMEDIATION = 3   # remediation applied
    FINAL_SCAN = 4    # scan verifying the remediation
|
86
|
|
|
|
|
87
|
|
|
|
|
88
|
1 |
|
@functools.total_ordering
class RuleResult(object):
    """
    Result of a test suite testing rule under a scenario.

    Supports ordering by success - the most successful run orders first.
    """

    # Names of test stages that may appear in a serialized result dict.
    # A set: only membership tests and iteration are needed.
    STAGE_STRINGS = {
        "preparation",
        "initial_scan",
        "remediation",
        "final_scan",
    }

    def __init__(self, result_dict=None):
        """Create an empty result, optionally populated from result_dict."""
        self.scenario = Scenario_run("", "")
        self.conditions = Scenario_conditions("", "", "", "")
        # Timestamp string of the run ("run_timestamp" in the dict form).
        self.when = ""
        # Mapping of stage name -> bool (whether that stage passed).
        self.passed_stages = dict()
        self.passed_stages_count = 0
        self.success = False

        if result_dict:
            self.load_from_dict(result_dict)

    def load_from_dict(self, data):
        """Populate this result from a dict produced by save_to_dict()."""
        self.scenario = Scenario_run(data["rule_id"], data["scenario_script"])
        self.conditions = Scenario_conditions(
            data["backend"], data["scanning_mode"],
            data["remediated_by"], data["datastream"])
        self.when = data["run_timestamp"]

        self.passed_stages = {key: data[key] for key in self.STAGE_STRINGS if key in data}
        self.passed_stages_count = sum(self.passed_stages.values())

        # A run is successful when the final scan passed, or - for scenarios
        # that never reached/needed remediation - when the initial scan passed.
        self.success = data.get("final_scan", False)
        if not self.success:
            self.success = (
                "remediation" not in data
                and data.get("initial_scan", False))

    def save_to_dict(self):
        """Serialize this result into a plain dict (inverse of load_from_dict)."""
        data = dict()
        data["rule_id"] = self.scenario.rule_id
        data["scenario_script"] = self.scenario.script

        data["backend"] = self.conditions.backend
        data["scanning_mode"] = self.conditions.scanning_mode
        data["remediated_by"] = self.conditions.remediated_by
        data["datastream"] = self.conditions.datastream

        data["run_timestamp"] = self.when

        for stage_str, result in self.passed_stages.items():
            data[stage_str] = result

        return data

    def record_stage_result(self, stage, successful):
        """Record whether stage passed; stage must be one of STAGE_STRINGS."""
        assert stage in self.STAGE_STRINGS, (
            "Stage name {name} is invalid, choose one from {choices}"
            .format(name=stage, choices=", ".join(self.STAGE_STRINGS))
        )
        self.passed_stages[stage] = successful

    def relative_conditions_to(self, other):
        """Return the two timestamps when conditions match, else both condition tuples."""
        if self.conditions == other.conditions:
            return self.when, other.when
        else:
            return tuple(self.conditions), tuple(other.conditions)

    def __eq__(self, other):
        # Bug fix: the original compared tuple(self.passed_stages) with
        # itself, so any two results with equal `success` compared equal.
        # Compare against the other result's stages instead.
        return (self.success == other.success
                and tuple(self.passed_stages) == tuple(other.passed_stages))

    def __lt__(self, other):
        # More passed stages orders first; total_ordering derives the rest.
        return self.passed_stages_count > other.passed_stages_count
|
165
|
|
|
|
|
166
|
|
|
|
|
167
|
1 |
|
def run_cmd_local(command, verbose_path, env=None):
    """
    Run a command locally, logging its stderr to the file at verbose_path.

    Returns a (returncode, decoded stdout) pair from _run_cmd.
    """
    logging.debug('Running {}'.format(' '.join(command)))
    return _run_cmd(command, verbose_path, env)
|
172
|
|
|
|
|
173
|
|
|
|
|
174
|
1 |
|
def _run_cmd(command_list, verbose_path, env=None):
    """
    Execute command_list, writing its stderr to the file at verbose_path.

    Returns a (returncode, stdout decoded as UTF-8) pair; a failing command
    is reported through the return code instead of raising.
    """
    with open(verbose_path, 'w') as verbose_file:
        try:
            raw_output = subprocess.check_output(
                command_list, stderr=verbose_file, env=env)
            code = 0
        except subprocess.CalledProcessError as error:
            raw_output = error.output
            code = error.returncode
    return code, raw_output.decode('utf-8')
|
185
|
|
|
|
|
186
|
|
|
|
|
187
|
1 |
|
def _get_platform_cpes(platform):
    """
    Resolve a scenario platform specifier to a set of product CPE names.

    The specifier is either a "multi_platform_*" alias (expanded through
    MULTI_PLATFORM_MAPPING) or a full product name (looked up in
    FULL_NAME_TO_PRODUCT_MAPPING). Unknown specifiers are logged and raise
    ValueError.
    """
    ssg_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
    if platform.startswith("multi_platform_"):
        try:
            products = MULTI_PLATFORM_MAPPING[platform]
        except KeyError:
            logging.error(
                "Unknown multi_platform specifier: %s is not from %s"
                % (platform, ", ".join(MULTI_PLATFORM_MAPPING.keys())))
            raise ValueError
        cpes = set()
        for product in products:
            yaml_file = os.path.join(ssg_root, "products", product, "product.yml")
            loaded_yaml = load_product_yaml(yaml_file)
            cpes |= set(ProductCPEs(loaded_yaml).get_product_cpe_names())
        return cpes

    # scenario platform is specified by a full product name
    try:
        product = FULL_NAME_TO_PRODUCT_MAPPING[platform]
    except KeyError:
        logging.error(
            "Unknown product name: %s is not from %s"
            % (platform, ", ".join(FULL_NAME_TO_PRODUCT_MAPPING.keys())))
        raise ValueError
    yaml_file = os.path.join(ssg_root, "products", product, "product.yml")
    loaded_yaml = load_product_yaml(yaml_file)
    return set(ProductCPEs(loaded_yaml).get_product_cpe_names())
|
218
|
|
|
|
|
219
|
|
|
|
|
220
|
1 |
|
def matches_platform(scenario_platforms, benchmark_cpes):
    """
    Decide whether a scenario's platform list applies to the benchmark CPEs.

    "multi_platform_all" matches unconditionally; otherwise each platform is
    expanded to CPE names and the union is tested for intersection with
    benchmark_cpes.
    """
    if "multi_platform_all" in scenario_platforms:
        return True
    expanded_cpes = set()
    for platform in scenario_platforms:
        expanded_cpes |= _get_platform_cpes(platform)
    return bool(expanded_cpes & benchmark_cpes)
|
227
|
|
|
|
|
228
|
|
|
|
|
229
|
1 |
|
def run_with_stdout_logging(command, args, log_file):
    """
    Run command with args, recording the invocation and its output to log_file.

    Returns the CompletedProcess; a non-zero exit does not raise (check=False).
    """
    log_file.write("{0} {1}\n".format(command, " ".join(args)))
    result = subprocess.run(
        (command,) + args, encoding="utf-8", stdout=subprocess.PIPE,
        stderr=subprocess.PIPE, check=False)
    for label, captured in (("STDOUT: ", result.stdout), ("STDERR: ", result.stderr)):
        if captured:
            log_file.write(label)
            log_file.write(captured)
    return result
|
241
|
|
|
|
|
242
|
|
|
|
|
243
|
1 |
|
def _exclude_garbage(tarinfo):
    """
    Tar `filter` callback dropping editor/compiler leftovers.

    Returns None (exclude) for names ending in 'pyc' or 'swp', otherwise
    the tarinfo unchanged.
    """
    member_name = tarinfo.name
    if member_name.endswith('pyc') or member_name.endswith('swp'):
        return None
    return tarinfo
|
250
|
|
|
|
|
251
|
|
|
|
|
252
|
1 |
|
def _make_file_root_owned(tarinfo):
    """
    Tar `filter` callback forcing root ownership and 0775 permissions.

    A falsy tarinfo (e.g. None from a preceding filter) is passed through
    unchanged so this can be chained after filters that exclude members.
    """
    if tarinfo:
        tarinfo.uid = 0
        tarinfo.gid = 0
        # Octal literal instead of the magic decimal 509 so the intended
        # permission bits (rwxrwxr-x, i.e. 775) are obvious at a glance.
        tarinfo.mode = 0o775
    return tarinfo
|
259
|
|
|
|
|
260
|
|
|
|
|
261
|
1 |
|
def get_product_context(product=None):
    """
    Returns a product YAML context if any product is specified. Hard-coded to
    assume a debug build.
    """
    # Load the product's YAML file when given, so tests are parsed in the
    # context of the product we're executing under; otherwise start empty.
    if product:
        env_yaml = load_product_yaml(product_yaml_path(SSG_ROOT, product))
    else:
        env_yaml = dict()

    # We could run into a DocumentationNotComplete error when loading a
    # rule's YAML contents. The test suite isn't executed in the context of
    # a particular build, so it cannot know whether the top-level
    # rule/profile under test is complete - force the required property to
    # bypass that error.
    env_yaml['cmake_build_type'] = 'Debug'

    # Mark the Jinja processing environment as Test Suite so macros can
    # behave differently at content build time vs. test time.
    env_yaml['SSG_TEST_SUITE_ENV'] = True

    return env_yaml
|
286
|
|
|
|
|
287
|
|
|
|
|
288
|
1 |
|
def load_rule_and_env(rule_dir_path, env_yaml, product=None):
    """
    Loads a rule and returns the combination of the RuleYAML class and
    the corresponding local environment for that rule.
    """
    # Locate the rule.yml file inside the rule directory.
    rule_file = get_rule_dir_yaml(rule_dir_path)

    # Load the rule in our environment (satisfies implied properties the
    # test suite may use) and normalize it to the given product so that
    # templated content resolves correctly.
    rule = RuleYAML.from_yaml(rule_file, env_yaml)
    rule.normalize(product)

    # prodtype is parsed but not used as a gate here: we have no knowledge
    # of the top-level profile or rule passed into the test suite, so any
    # rule available to the product is loaded and parsed anyway.
    prodtypes = parse_prodtype(rule.prodtype)

    # Our local copy of env_yaml needs a few rule.yml properties for
    # completeness.
    local_env_yaml = dict()
    local_env_yaml.update(env_yaml)
    local_env_yaml['rule_id'] = rule.id_
    local_env_yaml['rule_title'] = rule.title
    local_env_yaml['products'] = prodtypes

    return rule, local_env_yaml
|
319
|
|
|
|
|
320
|
|
|
|
|
321
|
1 |
|
def write_rule_templated_tests(dest_path, relative_path, test_content):
    """
    Write one templated test scenario to dest_path/relative_path.

    relative_path may contain nested subdirectories; missing intermediate
    directories are created first.

    Bug fix: the original expanded intermediates with
    os.path.join(dest_path, *parts[:subdir_index]), which on the first
    iteration joined nothing (re-creating dest_path itself, raising
    FileExistsError) and never created the actual subdirectory. Creating
    the output file's parent with os.makedirs(..., exist_ok=True) handles
    any nesting depth correctly and is idempotent.
    """
    output_path = os.path.join(dest_path, relative_path)

    # Create any nested directories the test file should live in. dest_path
    # itself already exists, so exist_ok avoids a spurious error for
    # top-level files.
    parent_dir = os.path.dirname(output_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # Write out the test content to the desired location on disk.
    with open(output_path, 'w') as output_fp:
        print(test_content, file=output_fp)
|
339
|
|
|
|
|
340
|
|
|
|
|
341
|
1 |
|
def write_rule_dir_tests(local_env_yaml, dest_path, dirpath):
    """
    Copy a rule's tests/ directory into dest_path, jinja-parsing each file.

    The directory layout under tests/ is recreated under dest_path, and file
    contents are processed through process_file_with_macros rather than
    copied verbatim. Does nothing when the rule has no tests/ directory
    (templated-only rules).
    """
    tests_dir_path = os.path.abspath(os.path.join(dirpath, "tests"))

    # The tests/ directory may not exist: templated rules can carry their
    # tests under shared/templates instead of the rule directory.
    if not os.path.exists(tests_dir_path):
        return

    for walk_dir, subdir_names, file_names in os.walk(tests_dir_path):
        # Recreate subdirectories first, resolved relative to tests/ so the
        # layout matches under the destination.
        for subdir_name in subdir_names:
            relative = os.path.relpath(
                os.path.join(walk_dir, subdir_name), tests_dir_path)
            os.mkdir(os.path.join(dest_path, relative))

        # Directories are created before os.walk recurses into them, so a
        # file's parent directory always exists under the destination by
        # the time the file is written.
        for file_name in file_names:
            source_path = os.path.join(walk_dir, file_name)
            relative = os.path.relpath(source_path, tests_dir_path)
            # Parse the test with jinja instead of an OS-level copy, then
            # write the rendered result to the destination.
            rendered = process_file_with_macros(source_path, local_env_yaml)
            with open(os.path.join(dest_path, relative), 'w') as output_fp:
                print(rendered, file=output_fp)
|
379
|
|
|
|
|
380
|
|
|
|
|
381
|
1 |
|
def template_rule_tests(product, product_yaml, template_builder, tmpdir, dirpath):
    """
    For a given rule directory, templates all contained tests into the output
    (tmpdir) directory.
    """
    # Load the rule and its environment.
    rule, local_env_yaml = load_rule_and_env(dirpath, product_yaml, product)

    # Bail out early when the rule is not applicable to the product under
    # test - the prodtypes were already parsed into local_env_yaml by
    # load_rule_and_env. This avoids templating tests for irrelevant rules.
    prodtypes = local_env_yaml['products']
    if product and prodtypes:
        if "all" not in prodtypes and product not in prodtypes:
            return

    # Create the destination directory.
    dest_path = os.path.join(tmpdir, rule.id_)
    os.mkdir(dest_path)

    # Rule-specific tests take priority over templated ones: any test under
    # rule_id/tests whose name matches one under
    # shared/templates/<template_name>/tests/ must win. Write the templated
    # tests first so the rule-directory pass can overwrite them.
    if rule.template and rule.template['vars']:
        templated_tests = template_builder.get_all_tests(
            rule.id_, rule.template, local_env_yaml)

        for relative_path, test_content in templated_tests.items():
            write_rule_templated_tests(dest_path, relative_path, test_content)

    write_rule_dir_tests(local_env_yaml, dest_path, dirpath)
|
419
|
|
|
|
|
420
|
|
|
|
|
421
|
1 |
|
def template_tests(product=None):
    """
    Create a temporary directory with test cases parsed via jinja using
    product-specific context.

    Returns the path of the directory; on any failure the directory is
    removed and the original exception re-raised.
    """
    # Set up an empty temp directory.
    tmpdir = tempfile.mkdtemp()

    # Remove the temporary directory on failure but preserve it on success.
    try:
        # Load the product context we're executing under, if any.
        product_yaml = get_product_context(product)

        # Initialize a mock template_builder.
        empty = "/ssgts/empty/placeholder"
        template_builder = ssg.templates.Builder(
            product_yaml, empty, _SHARED_TEMPLATES, empty, empty)

        # The output is not a 1-for-1 copy of the content tree: every rule
        # becomes a flattened top-level directory holding its tests, i.e.
        # /group_a/rule_a/tests/something.pass.sh -> /rule_a/something.pass.sh
        for dirpath, _dirnames, _filenames in walk_through_benchmark_dirs(product):
            # Skip anything that isn't obviously a rule.
            if not is_rule_dir(dirpath):
                continue

            template_rule_tests(product, product_yaml, template_builder, tmpdir, dirpath)
    except Exception as exp:
        shutil.rmtree(tmpdir, ignore_errors=True)
        raise exp

    return tmpdir
|
459
|
|
|
|
|
460
|
|
|
|
|
461
|
1 |
|
def create_tarball(product):
    """Create a tarball which contains all test scenarios for every rule.

    Tarball contains directories with the test scenarios. The name of the
    directories is the same as short rule ID. There is no tree structure.
    Returns the path of the created archive; the intermediate templated
    tree is removed whether or not archiving succeeds.
    """
    templated_tests = template_tests(product=product)

    def add_filter(tinfo):
        # Force root ownership, then drop garbage files, in one pass.
        return _exclude_garbage(_make_file_root_owned(tinfo))

    try:
        with tempfile.NamedTemporaryFile(
                "wb", suffix=".tar.gz", delete=False) as fp:
            with tarfile.TarFile.open(fileobj=fp, mode="w") as tarball:
                tarball.add(_SHARED_DIR, arcname="shared", filter=_make_file_root_owned)
                for rule_id in os.listdir(templated_tests):
                    # Top-level directories in the templated tree were
                    # already validated as rule directories, so they can be
                    # added directly; skip stray non-directories.
                    absolute_dir = os.path.join(templated_tests, rule_id)
                    if not os.path.isdir(absolute_dir):
                        continue

                    tarball.add(
                        absolute_dir, arcname=rule_id,
                        filter=add_filter)

            # The templated contents live in the tarball now - the tree can go.
            shutil.rmtree(templated_tests, ignore_errors=True)
            return fp.name
    except Exception as exp:
        shutil.rmtree(templated_tests, ignore_errors=True)
        raise exp
|
494
|
|
|
|
|
495
|
|
|
|
|
496
|
1 |
|
def send_scripts(test_env):
    """
    Build the test-scenario tarball, upload it to the tested machine over
    SSH/SCP and extract it there.

    Returns the remote directory the scenarios were extracted into
    (REMOTE_TEST_SCENARIOS_DIRECTORY).
    """
    remote_dir = REMOTE_TEST_SCENARIOS_DIRECTORY
    # Tarball of all (templated) test scenarios for this product.
    archive_file = create_tarball(test_env.product)
    archive_file_basename = os.path.basename(archive_file)
    remote_archive_file = os.path.join(remote_dir, archive_file_basename)
    logging.debug("Uploading scripts.")
    log_file_name = os.path.join(LogHelper.LOG_DIR, "env-preparation.log")

    with open(log_file_name, 'a') as log_file:
        print("Setting up test setup scripts", file=log_file)

        test_env.execute_ssh_command(
            "mkdir -p {remote_dir}".format(remote_dir=remote_dir),
            log_file, "Cannot create directory {0}".format(remote_dir))
        test_env.scp_upload_file(
            archive_file, remote_dir,
            log_file, "Cannot copy archive {0} to the target machine's directory {1}"
            .format(archive_file, remote_dir))
        test_env.execute_ssh_command(
            "tar xf {remote_archive_file} -C {remote_dir}"
            .format(remote_dir=remote_dir, remote_archive_file=remote_archive_file),
            log_file, "Cannot extract data tarball {0}".format(remote_archive_file))
    # The local archive is no longer needed once extracted remotely.
    os.unlink(archive_file)
    return remote_dir
|
520
|
|
|
|
|
521
|
|
|
|
|
522
|
1 |
|
def get_test_dir_config(test_dir, product_yaml):
    """
    Load the per-directory test configuration (test_config.yml) if present.

    Returns an empty dict when the directory has no configuration file.
    """
    config_path = os.path.join(test_dir, TESTS_CONFIG_NAME)
    if not os.path.exists(config_path):
        return dict()
    return ssg.yaml.open_and_expand(config_path, product_yaml)
|
528
|
|
|
|
|
529
|
|
|
|
|
530
|
1 |
|
def select_templated_tests(test_dir_config, available_scenarios_basenames):
    """
    Apply the test directory's allow/deny lists to the templated scenarios.

    Denied scenarios are removed; when an allow list exists, only listed
    scenarios survive. A scenario appearing in both lists raises ValueError.
    Returns the selected scenario basenames as a set.
    """
    deny_scenarios = set(test_dir_config.get("deny_templated_scenarios", []))
    selected = set(available_scenarios_basenames) - deny_scenarios

    allow_scenarios = set(test_dir_config.get("allow_templated_scenarios", []))
    if allow_scenarios:
        selected &= allow_scenarios

    # A scenario in both lists is a configuration contradiction - refuse it.
    allowed_and_denied = deny_scenarios.intersection(allow_scenarios)
    if allowed_and_denied:
        msg = (
            "Test directory configuration contain inconsistencies: {allowed_and_denied} "
            "scenarios are both allowed and denied."
            .format(test_dir_config=test_dir_config, allowed_and_denied=allowed_and_denied)
        )
        raise ValueError(msg)
    return selected
|
553
|
|
|
|
|
554
|
|
|
|
|
555
|
1 |
|
def iterate_over_rules(product=None):
    """Iterate over rule directories which have test scenarios.

    Returns:
        Named tuple Rule having these fields:
            directory -- absolute path to the rule "tests" subdirectory
                         containing the test scenarios in Bash
            id -- full rule id as it is present in datastream
            short_id -- short rule ID, the same as basename of the directory
                        containing the test scenarios in Bash
            scenarios_basenames -- mapping of .sh scenario file names to
                                   their (jinja-processed) contents
            template -- name of the rule's template, or None when untemplated
    """

    # Here we need to perform some magic to handle parsing the rule (from a
    # product perspective) and loading any templated tests. In particular,
    # identifying which tests to potentially run involves invoking the
    # templating engine.
    #
    # Begin by loading context about our execution environment, if any.
    product_yaml = get_product_context(product)

    # Initialize a mock template_builder.
    empty = "/ssgts/empty/placeholder"
    template_builder = ssg.templates.Builder(product_yaml, empty,
                                             _SHARED_TEMPLATES, empty, empty)

    for dirpath, dirnames, filenames in walk_through_benchmark_dirs(product):
        if is_rule_dir(dirpath):
            short_rule_id = os.path.basename(dirpath)

            # Load the rule itself to check for a template.
            rule, local_env_yaml = load_rule_and_env(dirpath, product_yaml, product)
            template_name = None

            # Before we get too far, we wish to search the rule YAML to see if
            # it is applicable to the current product. If we have a product
            # and the rule isn't applicable for the product, there's no point
            # in continuing with the rest of the loading. This should speed up
            # the loading of the templated tests. Note that we've already
            # parsed the prodtype into local_env_yaml
            if product and local_env_yaml['products']:
                prodtypes = local_env_yaml['products']
                if "all" not in prodtypes and product not in prodtypes:
                    continue

            # All tests is a mapping from path (in the tarball) to contents
            # of the test case. This is necessary because later code (which
            # attempts to parse headers from the test case) don't have easy
            # access to templated content. By reading it and returning it
            # here, we can save later code from having to understand the
            # templating system.
            all_tests = dict()

            tests_dir = os.path.join(dirpath, "tests")
            test_config = get_test_dir_config(tests_dir, product_yaml)

            # Start by checking for templating tests and provision them if
            # present.
            if rule.template and rule.template['vars']:
                templated_tests = template_builder.get_all_tests(
                    rule.id_, rule.template, local_env_yaml)

                # Apply the directory's allow/deny configuration to the
                # templated scenarios before including them.
                allowed_templated_tests = select_templated_tests(
                    test_config, templated_tests.keys())
                all_tests.update({name: templated_tests[name] for name in allowed_templated_tests})
                template_name = rule.template['name']

            # Add additional tests from the local rule directory. Note that,
            # like the behavior in template_tests, this will overwrite any
            # templated tests with the same file name.
            if os.path.exists(tests_dir):
                tests_dir_files = os.listdir(tests_dir)
                for test_case in tests_dir_files:
                    test_path = os.path.join(tests_dir, test_case)
                    if os.path.isdir(test_path):
                        continue

                    all_tests[test_case] = process_file_with_macros(test_path, local_env_yaml)

            # Filter out everything except the shell test scenarios.
            # Other files in rule directories are editor swap files
            # or other content than a test case.
            allowed_scripts = filter(lambda x: x.endswith(".sh"), all_tests)
            content_mapping = {x: all_tests[x] for x in allowed_scripts}

            # Skip any rules that lack any content. This ensures that if we
            # end up with rules with a template lacking tests and without any
            # rule directory tests, we don't include the empty rule here.
            if not content_mapping:
                continue

            full_rule_id = OSCAP_RULE + short_rule_id
            result = Rule(
                directory=tests_dir, id=full_rule_id, short_id=short_rule_id,
                scenarios_basenames=content_mapping, template=template_name)
            yield result
|
651
|
|
|
|
|
652
|
|
|
|
|
653
|
1 |
|
def get_cpe_of_tested_os(test_env, log_file):
    """
    Determine the CPE identifier of the OS under test.

    Reads the CPE_NAME assignment from /etc/os-release over SSH. Raises
    RuntimeError with a diagnostic message when the value is missing or
    cannot be parsed.
    """
    os_release_file = "/etc/os-release"
    cpe_line = test_env.execute_ssh_command(
        "grep CPE_NAME {os_release_file}".format(os_release_file=os_release_file),
        log_file)
    # The assignment may be quoted with either quote character (or none);
    # the backreference requires the closing quote to match the opening one.
    match = re.match(r'''CPE_NAME=(["']?)(.*)\1''', cpe_line)
    if match and match.groups()[1]:
        return match.groups()[1]

    msg = ["Unable to get a CPE of the system running tests"]
    if cpe_line:
        msg.append(
            "Retreived a CPE line that we couldn't parse: {cpe_line}"
            .format(cpe_line=cpe_line))
    else:
        msg.append(
            "Couldn't get CPE entry from '{os_release_file}'"
            .format(os_release_file=os_release_file))
    raise RuntimeError("\n".join(msg))
|
672
|
|
|
|
|
673
|
|
|
|
|
674
|
1 |
|
# Mapping from the platform keyword deduced by cpes_to_platform to the
# shell words used to install packages on that platform.
INSTALL_COMMANDS = {
    "fedora": ("dnf", "install", "-y"),
    "rhel7": ("yum", "install", "-y"),
    "rhel8": ("yum", "install", "-y"),
    "rhel9": ("yum", "install", "-y"),
    "ubuntu": ("DEBIAN_FRONTEND=noninteractive", "apt", "install", "-y"),
}
|
681
|
|
|
|
|
682
|
|
|
|
|
683
|
1 |
|
def install_packages(test_env, packages):
    """
    Install packages on the tested machine using its native package manager.

    The platform (and thus the install command) is deduced from the CPE of
    the running OS; progress and command output are appended to the
    env-preparation log. Raises RuntimeError when the CPE cannot be read
    (get_cpe_of_tested_os) and ValueError when no platform can be deduced
    (cpes_to_platform).
    """
    log_file_name = os.path.join(LogHelper.LOG_DIR, "env-preparation.log")

    with open(log_file_name, "a") as log_file:
        platform_cpe = get_cpe_of_tested_os(test_env, log_file)
        platform = cpes_to_platform([platform_cpe])

    # Build the full install command line, e.g. "dnf install -y pkg1 pkg2".
    command_str = " ".join(INSTALL_COMMANDS[platform] + tuple(packages))

    with open(log_file_name, 'a') as log_file:
        print("Installing packages", file=log_file)
        # Flush so the message precedes the remote command's output in the log.
        log_file.flush()
        test_env.execute_ssh_command(
            command_str, log_file,
            "Couldn't install required packages: {packages}".format(packages=",".join(packages)))
|
698
|
|
|
|
|
699
|
|
|
|
|
700
|
1 |
|
def cpes_to_platform(cpes):
    """
    Deduce a platform keyword ("fedora", "rhelN", "ubuntu") from CPE strings.

    The first CPE that maps to a known platform wins; RHEL CPEs contribute
    only their major version. Raises ValueError when no CPE matches.
    """
    for cpe in cpes:
        if "fedora" in cpe:
            return "fedora"
        if "redhat:enterprise_linux" in cpe:
            version_match = re.search(r":enterprise_linux:([^:]+):", cpe)
            if version_match:
                # Keep only the major version, e.g. "8.3" -> "rhel8".
                major = version_match.groups()[0].split(".")[0]
                return "rhel" + major
        if "ubuntu" in cpe:
            return "ubuntu"
    raise ValueError(
        "Unable to deduce a platform from these CPEs: {cpes}".format(cpes=cpes))
|
713
|
|
|
|