from __future__ import print_function

import os
import logging
import subprocess
from collections import namedtuple
import functools
import tarfile
import tempfile
import re
import shutil

from ssg.build_cpe import ProductCPEs
from ssg.build_yaml import Rule as RuleYAML
from ssg.constants import MULTI_PLATFORM_MAPPING
from ssg.constants import FULL_NAME_TO_PRODUCT_MAPPING
from ssg.constants import OSCAP_RULE
from ssg.jinja import process_file
from ssg.products import product_yaml_path, load_product_yaml
from ssg.rules import get_rule_dir_yaml, is_rule_dir
from ssg.rule_yaml import parse_prodtype
from ssg_test_suite.log import LogHelper

Scenario_run = namedtuple(
    "Scenario_run",
    ("rule_id", "script"))
Scenario_conditions = namedtuple(
    "Scenario_conditions",
    ("backend", "scanning_mode", "remediated_by", "datastream"))
Rule = namedtuple(
    "Rule", ["directory", "id", "short_id", "files"])

SSG_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))

_BENCHMARK_DIRS = [
    os.path.abspath(os.path.join(SSG_ROOT, 'linux_os', 'guide')),
    os.path.abspath(os.path.join(SSG_ROOT, 'applications')),
]

_SHARED_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '../shared'))

REMOTE_USER = "root"
REMOTE_USER_HOME_DIRECTORY = "/root"
REMOTE_TEST_SCENARIOS_DIRECTORY = os.path.join(REMOTE_USER_HOME_DIRECTORY, "ssgts")

try:
    SSH_ADDITIONAL_OPTS = tuple(os.environ.get('SSH_ADDITIONAL_OPTIONS').split())
except AttributeError:
    # If SSH_ADDITIONAL_OPTIONS is not defined, set it to an empty tuple.
    SSH_ADDITIONAL_OPTS = tuple()

SSH_ADDITIONAL_OPTS = (
    "-o", "StrictHostKeyChecking=no",
    "-o", "UserKnownHostsFile=/dev/null",
) + SSH_ADDITIONAL_OPTS
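# Illustrative example (hypothetical value): running the test suite with
#   SSH_ADDITIONAL_OPTIONS="-o ConnectTimeout=10"
# in the environment results in
#   SSH_ADDITIONAL_OPTS == ("-o", "StrictHostKeyChecking=no",
#                           "-o", "UserKnownHostsFile=/dev/null",
#                           "-o", "ConnectTimeout=10")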


def walk_through_benchmark_dirs(product=None):
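    """
    Walk the benchmark directories, yielding (dirpath, dirnames, filenames)
    triples as os.walk does. When a product is given, only that product's
    benchmark root (per its product.yml) is walked; otherwise all
    _BENCHMARK_DIRS are.
    """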
    directories = _BENCHMARK_DIRS
    if product is not None:
        yaml_path = product_yaml_path(SSG_ROOT, product)
        product_base = os.path.dirname(yaml_path)
        product_yaml = load_product_yaml(yaml_path)
        benchmark_root = os.path.join(product_base, product_yaml['benchmark_root'])
        directories = [os.path.abspath(benchmark_root)]

    for dirname in directories:
        for dirpath, dirnames, filenames in os.walk(dirname):
            yield dirpath, dirnames, filenames


class Stage(object):
    NONE = 0
    PREPARATION = 1
    INITIAL_SCAN = 2
    REMEDIATION = 3
    FINAL_SCAN = 4


@functools.total_ordering
class RuleResult(object):
    """
    Result of a test suite run testing a rule under a scenario.

    Supports ordering by success - the most successful run orders first.
    """

    STAGE_STRINGS = {
        "preparation",
        "initial_scan",
        "remediation",
        "final_scan",
    }

    def __init__(self, result_dict=None):
        self.scenario = Scenario_run("", "")
        self.conditions = Scenario_conditions("", "", "", "")
        self.when = ""
        self.passed_stages = dict()
        self.passed_stages_count = 0
        self.success = False

        if result_dict:
            self.load_from_dict(result_dict)

    def load_from_dict(self, data):
        self.scenario = Scenario_run(data["rule_id"], data["scenario_script"])
        self.conditions = Scenario_conditions(
            data["backend"], data["scanning_mode"],
            data["remediated_by"], data["datastream"])
        self.when = data["run_timestamp"]

        self.passed_stages = {key: data[key] for key in self.STAGE_STRINGS if key in data}
        self.passed_stages_count = sum(self.passed_stages.values())

        self.success = data.get("final_scan", False)
        if not self.success:
            self.success = (
                "remediation" not in data
                and data.get("initial_scan", False))

    def save_to_dict(self):
        data = dict()
        data["rule_id"] = self.scenario.rule_id
        data["scenario_script"] = self.scenario.script

        data["backend"] = self.conditions.backend
        data["scanning_mode"] = self.conditions.scanning_mode
        data["remediated_by"] = self.conditions.remediated_by
        data["datastream"] = self.conditions.datastream

        data["run_timestamp"] = self.when

        for stage_str, result in self.passed_stages.items():
            data[stage_str] = result

        return data

    def record_stage_result(self, stage, successful):
        assert stage in self.STAGE_STRINGS, (
            "Stage name {name} is invalid, choose one from {choices}"
            .format(name=stage, choices=", ".join(self.STAGE_STRINGS))
        )
        self.passed_stages[stage] = successful

    def relative_conditions_to(self, other):
        if self.conditions == other.conditions:
            return self.when, other.when
        else:
            return tuple(self.conditions), tuple(other.conditions)

    def __eq__(self, other):
        return (self.success == other.success
                and tuple(self.passed_stages) == tuple(other.passed_stages))

    def __lt__(self, other):
        return self.passed_stages_count > other.passed_stages_count
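
# Illustrative RuleResult round trip (hypothetical values): loading
#   {"rule_id": "accounts_tmout", "scenario_script": "correct.pass.sh",
#    "backend": "libvirt", "scanning_mode": "online", "remediated_by": "bash",
#    "datastream": "ssg-rhel8-ds.xml", "run_timestamp": "2020-01-01T00:00",
#    "initial_scan": True, "remediation": True, "final_scan": True}
# yields a successful result, and save_to_dict() reproduces the same dict.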


def run_cmd_local(command, verbose_path, env=None):
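    """
    Run a command locally, logging it first. Its stderr is written to the
    verbose_path file; returns a (returncode, stdout) tuple with the output
    decoded to a string.
    """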
    command_string = ' '.join(command)
    logging.debug('Running {}'.format(command_string))
    returncode, output = _run_cmd(command, verbose_path, env)
    return returncode, output


def _run_cmd(command_list, verbose_path, env=None):
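    """
    Execute command_list with stderr redirected to the verbose_path file.

    Returns (returncode, stdout); non-zero exit codes are reported through
    the tuple rather than raised as CalledProcessError.
    """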
    returncode = 0
    output = b""
    try:
        with open(verbose_path, 'w') as verbose_file:
            output = subprocess.check_output(
                command_list, stderr=verbose_file, env=env)
    except subprocess.CalledProcessError as e:
        returncode = e.returncode
        output = e.output
    return returncode, output.decode('utf-8')


def _get_platform_cpes(platform):
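    """
    Return the set of CPE names matching a platform string, which is either
    a multi_platform_* specifier or a full product name.

    Logs and raises ValueError for an unknown platform.
    """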
    ssg_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
    if platform.startswith("multi_platform_"):
        try:
            products = MULTI_PLATFORM_MAPPING[platform]
        except KeyError:
            logging.error(
                "Unknown multi_platform specifier: %s is not from %s"
                % (platform, ", ".join(MULTI_PLATFORM_MAPPING.keys())))
            raise ValueError
        platform_cpes = set()
        for p in products:
            product_yaml_file = os.path.join(ssg_root, "products", p, "product.yml")
            product_yaml = load_product_yaml(product_yaml_file)
            p_cpes = ProductCPEs(product_yaml)
            platform_cpes |= set(p_cpes.get_product_cpe_names())
        return platform_cpes
    else:
        # The scenario platform is specified by a full product name.
        try:
            product = FULL_NAME_TO_PRODUCT_MAPPING[platform]
        except KeyError:
            logging.error(
                "Unknown product name: %s is not from %s"
                % (platform, ", ".join(FULL_NAME_TO_PRODUCT_MAPPING.keys())))
            raise ValueError
        product_yaml_file = os.path.join(ssg_root, "products", product, "product.yml")
        product_yaml = load_product_yaml(product_yaml_file)
        product_cpes = ProductCPEs(product_yaml)
        platform_cpes = set(product_cpes.get_product_cpe_names())
        return platform_cpes


def matches_platform(scenario_platforms, benchmark_cpes):
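    """
    Decide whether a scenario matches the benchmark under test: the CPE sets
    of the scenario's platforms are unioned and intersected with the
    benchmark's CPEs.

    Example: matches_platform(["multi_platform_all"], set()) is always True.
    """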
    if "multi_platform_all" in scenario_platforms:
        return True
    scenario_cpes = set()
    for p in scenario_platforms:
        scenario_cpes |= _get_platform_cpes(p)
    return len(scenario_cpes & benchmark_cpes) > 0


def run_with_stdout_logging(command, args, log_file):
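    """
    Run a command, echoing the command line, stdout and stderr into
    log_file. Raises subprocess.CalledProcessError on a non-zero exit code.

    Returns the captured stdout.
    """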
    log_file.write("{0} {1}\n".format(command, " ".join(args)))
    result = subprocess.run(
        (command,) + args, encoding="utf-8", stdout=subprocess.PIPE,
        stderr=subprocess.PIPE, check=True)
    if result.stdout:
        log_file.write("STDOUT: ")
        log_file.write(result.stdout)
    if result.stderr:
        log_file.write("STDERR: ")
        log_file.write(result.stderr)
    return result.stdout


def _exclude_garbage(tarinfo):
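    """
    Tar filter that drops Python bytecode (pyc) and editor swap (swp) files.
    """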
    file_name = tarinfo.name
    if file_name.endswith('pyc'):
        return None
    if file_name.endswith('swp'):
        return None
    return tarinfo


def _make_file_root_owned(tarinfo):
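    """
    Tar filter that makes the added file owned by root (uid and gid 0).
    """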
    if tarinfo:
        tarinfo.uid = 0
        tarinfo.gid = 0
    return tarinfo


def _rel_abs_path(current_path, base_path):
    """
    Return the current path relative to the base path, resolving the current
    path absolutely first. This helps when walking a nested directory
    structure and the subtree relative to the original path is wanted.
    """
    tmp_path = os.path.abspath(current_path)
    return os.path.relpath(tmp_path, base_path)


def template_tests(product=None):
    """
    Create a temporary directory with test cases parsed via jinja using
    product-specific context.
    """
    # Set up an empty temp directory.
    tmpdir = tempfile.mkdtemp()

    # We want to remove the temporary directory on failure, but preserve
    # it on success. Wrap in a try/except block and reraise the original
    # exception after removing the temporary directory.
    try:
        # Load the product's YAML file if present. This allows us to parse
        # tests in the context of the product we're executing under.
        product_yaml = dict()
        if product:
            yaml_path = product_yaml_path(SSG_ROOT, product)
            product_yaml = load_product_yaml(yaml_path)

        # Below we could run into a DocumentationNotComplete error. However,
        # because the test suite isn't executed in the context of a particular
        # build (though, ideally it would be linked), we may not know exactly
        # whether the top-level rule/profile we're testing is actually
        # complete. Thus, forcibly set the required property to bypass this
        # error.
        product_yaml['cmake_build_type'] = 'Debug'

        # Note that we're not exactly copying 1-for-1 the contents of the
        # directory structure into the temporary one. Instead we want a
        # flattened mapping with all rules in a single top-level directory
        # and all tests immediately contained within it. That is:
        #
        # /group_a/rule_a/tests/something.pass.sh -> /rule_a/something.pass.sh
        for dirpath, dirnames, _ in walk_through_benchmark_dirs(product):
            # Skip anything that isn't obviously a rule.
            if "tests" not in dirnames or not is_rule_dir(dirpath):
                continue

            # Load rule content in our environment. We use this to satisfy
            # some implied properties that might be used in the test suite.
            rule_path = get_rule_dir_yaml(dirpath)
            rule = RuleYAML.from_yaml(rule_path, product_yaml)

            # Note that most places would check prodtype, but we don't care
            # about that here: if the rule is available to the product, we
            # load and parse it anyway, as we have no knowledge of the
            # top-level profile or rule passed into the test suite.
            prodtypes = parse_prodtype(rule.prodtype)

            # Our local copy of env_yaml needs some properties from rule.yml
            # for completeness.
            local_env_yaml = dict()
            local_env_yaml.update(product_yaml)
            local_env_yaml['rule_id'] = rule.id_
            local_env_yaml['rule_title'] = rule.title
            local_env_yaml['products'] = prodtypes

            # Create the destination directory.
            dest_path = os.path.join(tmpdir, rule.id_)
            os.mkdir(dest_path)

            # Walk the test directory, writing all tests into the output
            # directory, recursively.
            tests_dir_path = os.path.join(dirpath, "tests")
            tests_dir_path = os.path.abspath(tests_dir_path)
            for dirpath, dirnames, filenames in os.walk(tests_dir_path):
                for dirname in dirnames:
                    # We want to recreate the correct path under the temporary
                    # directory. Resolve it to a relative path from the tests/
                    # directory.
                    dir_path = _rel_abs_path(os.path.join(dirpath, dirname), tests_dir_path)
                    assert '../' not in dir_path
                    tmp_dir_path = os.path.join(dest_path, dir_path)
                    os.mkdir(tmp_dir_path)

                for filename in filenames:
                    # We want to recreate the correct path under the temporary
                    # directory. Resolve it to a relative path from the tests/
                    # directory. Assumption: directories should be created
                    # prior to recursing into them, so we don't need to handle
                    # the case of a file's parent directory not yet existing
                    # under the destination.
                    src_test_path = os.path.join(dirpath, filename)
                    rel_test_path = _rel_abs_path(src_test_path, tests_dir_path)
                    dest_test_path = os.path.join(dest_path, rel_test_path)

                    # Rather than performing an OS-level copy, we need to
                    # first parse the test with jinja and then write it back
                    # out to the destination.
                    parsed_test = process_file(src_test_path, local_env_yaml)
                    with open(dest_test_path, 'w') as output_fp:
                        print(parsed_test, file=output_fp)

    except Exception as exp:
        shutil.rmtree(tmpdir, ignore_errors=True)
        raise exp

    return tmpdir


def create_tarball(product):
    """Create a tarball which contains all test scenarios for every rule.

    The tarball contains directories with the test scenarios; each directory
    is named after the short rule ID. There is no tree structure.
    """
    templated_tests = template_tests(product=product)

    try:
        with tempfile.NamedTemporaryFile(
                "wb", suffix=".tar.gz", delete=False) as fp:
            with tarfile.TarFile.open(fileobj=fp, mode="w") as tarball:
                tarball.add(_SHARED_DIR, arcname="shared", filter=_make_file_root_owned)
                for rule_id in os.listdir(templated_tests):
                    # When a top-level directory exists under the temporary
                    # templated tests directory, we've already validated that
                    # it is a valid rule directory. Thus we can simply add it
                    # to the tarball.
                    absolute_dir = os.path.join(templated_tests, rule_id)
                    if not os.path.isdir(absolute_dir):
                        continue

                    tarball.add(
                        absolute_dir, arcname=rule_id,
                        filter=lambda tinfo: _exclude_garbage(_make_file_root_owned(tinfo))
                    )

            # Since we've added the templated contents into the tarball, we
            # can now delete the tree.
            shutil.rmtree(templated_tests, ignore_errors=True)
            return fp.name
    except Exception as exp:
        shutil.rmtree(templated_tests, ignore_errors=True)
        raise exp


def send_scripts(test_env):
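    """
    Upload the test scenario tarball to the remote machine and unpack it
    into REMOTE_TEST_SCENARIOS_DIRECTORY.

    Returns the remote directory path.
    """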
    remote_dir = REMOTE_TEST_SCENARIOS_DIRECTORY
    archive_file = create_tarball(test_env.product)
    archive_file_basename = os.path.basename(archive_file)
    remote_archive_file = os.path.join(remote_dir, archive_file_basename)
    logging.debug("Uploading scripts.")
    log_file_name = os.path.join(LogHelper.LOG_DIR, "env-preparation.log")

    with open(log_file_name, 'a') as log_file:
        print("Setting up test setup scripts", file=log_file)

        test_env.execute_ssh_command(
            "mkdir -p {remote_dir}".format(remote_dir=remote_dir),
            log_file, "Cannot create directory {0}".format(remote_dir))
        test_env.scp_upload_file(
            archive_file, remote_dir,
            log_file, "Cannot copy archive {0} to the target machine's directory {1}"
            .format(archive_file, remote_dir))
        test_env.execute_ssh_command(
            "tar xf {remote_archive_file} -C {remote_dir}"
            .format(remote_dir=remote_dir, remote_archive_file=remote_archive_file),
            log_file, "Cannot extract data tarball {0}".format(remote_archive_file))
    os.unlink(archive_file)
    return remote_dir


def iterate_over_rules(product=None):
    """Iterate over rule directories which have test scenarios.

    Yields:
        Named tuple Rule having these fields:
            directory -- absolute path to the rule "tests" subdirectory
                         containing the test scenarios in Bash
            id -- full rule ID as it is present in the datastream
            short_id -- short rule ID, the same as the basename of the
                        directory containing the test scenarios in Bash
            files -- list of executable .sh files in the "tests" directory
    """
    for dirpath, dirnames, filenames in walk_through_benchmark_dirs(product):
        if "rule.yml" in filenames and "tests" in dirnames:
            short_rule_id = os.path.basename(dirpath)
            tests_dir = os.path.join(dirpath, "tests")
            tests_dir_files = os.listdir(tests_dir)
            # Filter out everything except the shell test scenarios.
            # Other files in rule directories are editor swap files
            # or other content that is not a test case.
            scripts = filter(lambda x: x.endswith(".sh"), tests_dir_files)
            full_rule_id = OSCAP_RULE + short_rule_id
            result = Rule(
                directory=tests_dir, id=full_rule_id, short_id=short_rule_id,
                files=scripts)
            yield result


def get_cpe_of_tested_os(test_env, log_file):
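    """
    Read the CPE_NAME entry of /etc/os-release on the tested machine.

    Returns the CPE string; raises RuntimeError when it cannot be obtained
    or parsed.
    """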
    os_release_file = "/etc/os-release"
    cpe_line = test_env.execute_ssh_command(
        "grep CPE_NAME {os_release_file}".format(os_release_file=os_release_file),
        log_file)
    # We are parsing an assignment that is possibly quoted.
    cpe = re.match(r'''CPE_NAME=(["']?)(.*)\1''', cpe_line)
    if cpe and cpe.groups()[1]:
        return cpe.groups()[1]
    msg = ["Unable to get a CPE of the system running tests"]
    if cpe_line:
        msg.append(
            "Retrieved a CPE line that we couldn't parse: {cpe_line}"
            .format(cpe_line=cpe_line))
    else:
        msg.append(
            "Couldn't get CPE entry from '{os_release_file}'"
            .format(os_release_file=os_release_file))
    raise RuntimeError("\n".join(msg))


INSTALL_COMMANDS = dict(
    fedora=("dnf", "install", "-y"),
    rhel7=("yum", "install", "-y"),
    rhel8=("yum", "install", "-y"),
    ubuntu=("DEBIAN_FRONTEND=noninteractive", "apt", "install", "-y"),
)
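# The keys of INSTALL_COMMANDS must stay in sync with the platform strings
# returned by cpes_to_platform below.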


def install_packages(test_env, packages):
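    """
    Install packages on the tested machine, using the install command that
    matches the detected platform (see INSTALL_COMMANDS).
    """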
    log_file_name = os.path.join(LogHelper.LOG_DIR, "env-preparation.log")

    with open(log_file_name, "a") as log_file:
        platform_cpe = get_cpe_of_tested_os(test_env, log_file)
        platform = cpes_to_platform([platform_cpe])

    command_str = " ".join(INSTALL_COMMANDS[platform] + tuple(packages))

    with open(log_file_name, 'a') as log_file:
        print("Installing packages", file=log_file)
        log_file.flush()
        test_env.execute_ssh_command(
            command_str, log_file,
            "Couldn't install required packages {packages}".format(packages=packages))


def cpes_to_platform(cpes):
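    """
    Deduce a platform string (fedora, rhelN, ubuntu) from a list of CPEs.

    Raises ValueError when no platform can be deduced.
    """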
    for cpe in cpes:
        if "fedora" in cpe:
            return "fedora"
        if "redhat:enterprise_linux" in cpe:
            match = re.search(r":enterprise_linux:([^:]+):", cpe)
            if match:
                major_version = match.groups()[0].split(".")[0]
                return "rhel" + major_version
        if "ubuntu" in cpe:
            return "ubuntu"
    msg = "Unable to deduce a platform from these CPEs: {cpes}".format(cpes=cpes)
    raise ValueError(msg)