from __future__ import print_function

import functools
import logging
import os
import re
import shutil
import subprocess
import tarfile
import tempfile
import time
from collections import namedtuple

import ssg.yaml
from ssg.build_cpe import ProductCPEs
from ssg.build_yaml import Rule as RuleYAML
from ssg.constants import MULTI_PLATFORM_MAPPING
from ssg.constants import FULL_NAME_TO_PRODUCT_MAPPING
from ssg.constants import OSCAP_RULE
from ssg.jinja import process_file_with_macros
from ssg.products import product_yaml_path, load_product_yaml
from ssg.rules import get_rule_dir_yaml, is_rule_dir
from ssg.rule_yaml import parse_prodtype
from ssg.utils import mkdir_p
from ssg_test_suite.log import LogHelper

# Imported explicitly because load_test below refers to these modules
# by their fully qualified names (ssg.jinja, ssg.utils).
import ssg.jinja
import ssg.templates
import ssg.utils


Scenario_run = namedtuple(
    "Scenario_run",
    ("rule_id", "script"))
Scenario_conditions = namedtuple(
    "Scenario_conditions",
    ("backend", "scanning_mode", "remediated_by", "datastream"))

SSG_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))

_BENCHMARK_DIRS = [
    os.path.abspath(os.path.join(SSG_ROOT, 'linux_os', 'guide')),
    os.path.abspath(os.path.join(SSG_ROOT, 'applications')),
]

_SHARED_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '../shared'))

_SHARED_TEMPLATES = os.path.abspath(os.path.join(SSG_ROOT, 'shared/templates'))

TEST_SUITE_NAME = "ssgts"
TEST_SUITE_PREFIX = "_{}".format(TEST_SUITE_NAME)
REMOTE_USER = "root"
REMOTE_USER_HOME_DIRECTORY = "/root"
REMOTE_TEST_SCENARIOS_DIRECTORY = os.path.join(REMOTE_USER_HOME_DIRECTORY, TEST_SUITE_NAME)

try:
    SSH_ADDITIONAL_OPTS = tuple(os.environ.get('SSH_ADDITIONAL_OPTIONS').split())
except AttributeError:
    # If SSH_ADDITIONAL_OPTIONS is not defined, fall back to an empty tuple.
    SSH_ADDITIONAL_OPTS = tuple()

SSH_ADDITIONAL_OPTS = (
    "-o", "StrictHostKeyChecking=no",
    "-o", "UserKnownHostsFile=/dev/null",
) + SSH_ADDITIONAL_OPTS

TESTS_CONFIG_NAME = "test_config.yml"


def walk_through_benchmark_dirs(product=None):
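    """
    Walk the benchmark directories and yield (dirpath, dirnames,
    filenames) tuples, as os.walk does. If a product is given, only
    that product's benchmark root is walked.
    """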
    directories = _BENCHMARK_DIRS
    if product is not None:
        yaml_path = product_yaml_path(SSG_ROOT, product)
        product_base = os.path.dirname(yaml_path)
        product_yaml = load_product_yaml(yaml_path)
        benchmark_root = os.path.join(product_base, product_yaml['benchmark_root'])
        directories = [os.path.abspath(benchmark_root)]

    for dirname in directories:
        for dirpath, dirnames, filenames in os.walk(dirname):
            yield dirpath, dirnames, filenames


class Stage(object):
    NONE = 0
    PREPARATION = 1
    INITIAL_SCAN = 2
    REMEDIATION = 3
    FINAL_SCAN = 4


@functools.total_ordering
class RuleResult(object):
    """
    Result of a test suite testing rule under a scenario.

    Supports ordering by success - the most successful run orders first.
    """

    STAGE_STRINGS = {
        "preparation",
        "initial_scan",
        "remediation",
        "final_scan",
    }

    def __init__(self, result_dict=None):
        self.scenario = Scenario_run("", "")
        self.conditions = Scenario_conditions("", "", "", "")
        self.when = ""
        self.passed_stages = dict()
        self.passed_stages_count = 0
        self.success = False

        if result_dict:
            self.load_from_dict(result_dict)

    def load_from_dict(self, data):
        self.scenario = Scenario_run(data["rule_id"], data["scenario_script"])
        self.conditions = Scenario_conditions(
            data["backend"], data["scanning_mode"],
            data["remediated_by"], data["datastream"])
        self.when = data["run_timestamp"]

        self.passed_stages = {key: data[key] for key in self.STAGE_STRINGS if key in data}
        self.passed_stages_count = sum(self.passed_stages.values())

        self.success = data.get("final_scan", False)
        if not self.success:
            self.success = (
                "remediation" not in data
                and data.get("initial_scan", False))

    def save_to_dict(self):
        data = dict()
        data["rule_id"] = self.scenario.rule_id
        data["scenario_script"] = self.scenario.script

        data["backend"] = self.conditions.backend
        data["scanning_mode"] = self.conditions.scanning_mode
        data["remediated_by"] = self.conditions.remediated_by
        data["datastream"] = self.conditions.datastream

        data["run_timestamp"] = self.when

        for stage_str, result in self.passed_stages.items():
            data[stage_str] = result

        return data

    def record_stage_result(self, stage, successful):
        assert stage in self.STAGE_STRINGS, (
            "Stage name {name} is invalid, choose one from {choices}"
            .format(name=stage, choices=", ".join(self.STAGE_STRINGS))
        )
        self.passed_stages[stage] = successful

    def relative_conditions_to(self, other):
        if self.conditions == other.conditions:
            return self.when, other.when
        else:
            return tuple(self.conditions), tuple(other.conditions)

    def __eq__(self, other):
        return (self.success == other.success
                and tuple(self.passed_stages) == tuple(other.passed_stages))

    def __lt__(self, other):
        return self.passed_stages_count > other.passed_stages_count


def run_cmd_local(command, verbose_path, env=None):
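    """
    Run a command locally, writing its stderr to verbose_path.
    Returns a (returncode, stdout) tuple.
    """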
    command_string = ' '.join(command)
    logging.debug('Running {}'.format(command_string))
    returncode, output = _run_cmd(command, verbose_path, env)
    return returncode, output


def _run_cmd(command_list, verbose_path, env=None):
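    """
    Execute the command, capturing stdout and sending stderr to the
    verbose log file. A non-zero exit status is returned rather than
    raised. Returns a (returncode, decoded stdout) tuple.
    """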
    returncode = 0
    output = b""
    try:
        with open(verbose_path, 'w') as verbose_file:
            output = subprocess.check_output(
                command_list, stderr=verbose_file, env=env)
    except subprocess.CalledProcessError as e:
        returncode = e.returncode
        output = e.output
    return returncode, output.decode('utf-8')


def _get_platform_cpes(platform):
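    """
    Translate a test scenario platform - either a multi_platform_*
    specifier or a full product name - into the set of CPE names
    of the matching products.
    """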
    ssg_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
    if platform.startswith("multi_platform_"):
        try:
            products = MULTI_PLATFORM_MAPPING[platform]
        except KeyError:
            logging.error(
                "Unknown multi_platform specifier: %s is not from %s"
                % (platform, ", ".join(MULTI_PLATFORM_MAPPING.keys())))
            raise ValueError
        platform_cpes = set()
        for p in products:
            product_yaml_file = os.path.join(ssg_root, "products", p, "product.yml")
            product_yaml = load_product_yaml(product_yaml_file)
            p_cpes = ProductCPEs()
            p_cpes.load_product_cpes(product_yaml)
            p_cpes.load_content_cpes(product_yaml)
            platform_cpes |= set(p_cpes.get_product_cpe_names())
        return platform_cpes
    else:
        # The scenario platform is specified by a full product name.
        try:
            product = FULL_NAME_TO_PRODUCT_MAPPING[platform]
        except KeyError:
            logging.error(
                "Unknown product name: %s is not from %s"
                % (platform, ", ".join(FULL_NAME_TO_PRODUCT_MAPPING.keys())))
            raise ValueError
        product_yaml_file = os.path.join(ssg_root, "products", product, "product.yml")
        product_yaml = load_product_yaml(product_yaml_file)
        product_cpes = ProductCPEs()
        product_cpes.load_product_cpes(product_yaml)
        product_cpes.load_content_cpes(product_yaml)
        platform_cpes = set(product_cpes.get_product_cpe_names())
        return platform_cpes


def matches_platform(scenario_platforms, benchmark_cpes):
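    """
    Decide whether a scenario is applicable by checking whether any of
    its platforms' CPEs intersect the benchmark's CPEs.
    """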
    if "multi_platform_all" in scenario_platforms:
        return True
    scenario_cpes = set()
    for p in scenario_platforms:
        scenario_cpes |= _get_platform_cpes(p)
    return len(scenario_cpes & benchmark_cpes) > 0


def run_with_stdout_logging(command, args, log_file):
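    """
    Run the command with the given argument tuple, echoing the command
    line and its stdout/stderr into log_file. Returns the
    subprocess.CompletedProcess instance.
    """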
    log_file.write("{0} {1}\n".format(command, " ".join(args)))
    result = subprocess.run(
        (command,) + args, encoding="utf-8", stdout=subprocess.PIPE,
        stderr=subprocess.PIPE, check=False)
    if result.stdout:
        log_file.write("STDOUT: ")
        log_file.write(result.stdout)
    if result.stderr:
        log_file.write("STDERR: ")
        log_file.write(result.stderr)
    return result


def _exclude_garbage(tarinfo):
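    """
    Tar filter that drops Python bytecode and vim swap files from the
    archive.
    """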
    file_name = tarinfo.name
    if file_name.endswith('pyc'):
        return None
    if file_name.endswith('swp'):
        return None
    return tarinfo


def _make_file_root_owned(tarinfo):
    if tarinfo:
        tarinfo.uid = 0
        tarinfo.gid = 0
        # Set permissions to rwxrwxr-x (0o775).
        tarinfo.mode = 0o775
    return tarinfo


def get_product_context(product_id=None):
    """
    Returns a product YAML context if any product is specified.
    Hard-coded to assume a debug build.
    """
    # Load the product's YAML file if present. This allows us to parse
    # tests in the context of the product we're executing under.
    product_yaml = dict()
    if product_id:
        product = load_product_yaml(product_yaml_path(SSG_ROOT, product_id))
        product_properties_path = os.path.join(SSG_ROOT, "product_properties")
        product.read_properties_from_directory(product_properties_path)
        product_yaml.update(product)

    # We could run into a DocumentationNotComplete error when loading a
    # rule's YAML contents. However, because the test suite isn't executed
    # in the context of a particular build (though, ideally it would be
    # linked), we may not know exactly whether the top-level rule/profile
    # we're testing is actually completed. Thus, forcibly set the required
    # property to bypass this error.
    product_yaml['cmake_build_type'] = 'Debug'

    # Set the Jinja processing environment to Test Suite;
    # this allows Jinja macros to behave differently at content build time
    # and at test time.
    product_yaml['SSG_TEST_SUITE_ENV'] = True

    return product_yaml


def load_rule_and_env(rule_dir_path, env_yaml, product=None):
    """
    Loads a rule and returns the combination of the RuleYAML class and
    the corresponding local environment for that rule.
    """

    # First, build the path to the rule.yml file.
    rule_path = get_rule_dir_yaml(rule_dir_path)

    # Load the rule content in our environment. We use this to satisfy
    # some implied properties that might be used in the test suite.
    # Make sure we normalize to a specific product as well, so that
    # templated content is loaded correctly.
    rule = RuleYAML.from_yaml(rule_path, env_yaml)
    rule.normalize(product)

    # Note that most places would check prodtype, but we don't care
    # about that here: if the rule is available to the product, we
    # load and parse it anyway, as we have no knowledge of the
    # top-level profile or rule passed into the test suite.
    prodtypes = parse_prodtype(rule.prodtype)

    # Our local copy of env_yaml needs some properties from rule.yml
    # for completeness.
    local_env_yaml = dict()
    local_env_yaml.update(env_yaml)
    local_env_yaml['rule_id'] = rule.id_
    local_env_yaml['rule_title'] = rule.title
    local_env_yaml['products'] = prodtypes

    return rule, local_env_yaml


def write_rule_test_content_to_dir(rule_dir, test_content):
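    """
    Write the test content's scenario scripts and any additional files
    into rule_dir.
    """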
    for scenario in test_content.scenarios:
        scenario_file_path = os.path.join(rule_dir, scenario.script)
        with open(scenario_file_path, "w") as f:
            f.write(scenario.contents)
    for file_name, file_content in test_content.other_content.items():
        file_path = os.path.join(rule_dir, file_name)
        with open(file_path, "w") as f:
            f.write(file_content)


def create_tarball(test_content_by_rule_id):
    """
    Create a tarball which contains all test scenarios and additional
    content for every rule that is selected to be tested. The tarball
    contains directories with the test scenarios; each directory is named
    after the short rule ID, and there is no deeper tree structure.
    """

    tmpdir = tempfile.mkdtemp()
    for rule_id, test_content in test_content_by_rule_id.items():
        short_rule_id = rule_id.replace(OSCAP_RULE, "")
        rule_dir = os.path.join(tmpdir, short_rule_id)
        mkdir_p(rule_dir)
        write_rule_test_content_to_dir(rule_dir, test_content)

    try:
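        # Note: the file gets a ".tar.gz" suffix, but mode "w" writes an
        # uncompressed archive; the remote "tar xf" auto-detects the
        # format, so extraction works either way.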
        with tempfile.NamedTemporaryFile(
                "wb", suffix=".tar.gz", delete=False) as fp:
            with tarfile.TarFile.open(fileobj=fp, mode="w") as tarball:
                tarball.add(_SHARED_DIR, arcname="shared", filter=_make_file_root_owned)
                for rule_id in os.listdir(tmpdir):
                    # When a top-level directory exists under the temporary
                    # templated tests directory, we've already validated that
                    # it is a valid rule directory. Thus we can simply add it
                    # to the tarball.
                    absolute_dir = os.path.join(tmpdir, rule_id)
                    if not os.path.isdir(absolute_dir):
                        continue

                    tarball.add(
                        absolute_dir, arcname=rule_id,
                        filter=lambda tinfo: _exclude_garbage(_make_file_root_owned(tinfo))
                    )

            # Since we've added the templated contents into the tarball, we
            # can now delete the tree.
            shutil.rmtree(tmpdir, ignore_errors=True)
            return fp.name
    except Exception as exp:
        shutil.rmtree(tmpdir, ignore_errors=True)
        raise exp


def send_scripts(test_env, test_content_by_rule_id):
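    """
    Pack all test content into a tarball, upload it to the test
    environment over SCP, and extract it there. Returns the remote
    directory that holds the scenarios.
    """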
    remote_dir = REMOTE_TEST_SCENARIOS_DIRECTORY
    archive_file = create_tarball(test_content_by_rule_id)
    archive_file_basename = os.path.basename(archive_file)
    remote_archive_file = os.path.join(remote_dir, archive_file_basename)
    logging.debug("Uploading scripts.")
    log_file_name = os.path.join(LogHelper.LOG_DIR, "env-preparation.log")

    with open(log_file_name, 'a') as log_file:
        print("Setting up test setup scripts", file=log_file)

        test_env.execute_ssh_command(
            "mkdir -p {remote_dir}".format(remote_dir=remote_dir),
            log_file, "Cannot create directory {0}".format(remote_dir))
        test_env.scp_upload_file(
            archive_file, remote_dir,
            log_file, "Cannot copy archive {0} to the target machine's directory {1}"
            .format(archive_file, remote_dir))
        test_env.execute_ssh_command(
            "tar xf {remote_archive_file} -C {remote_dir}"
            .format(remote_dir=remote_dir, remote_archive_file=remote_archive_file),
            log_file, "Cannot extract data tarball {0}".format(remote_archive_file))
    os.unlink(archive_file)
    return remote_dir


def get_prefixed_name(state_name):
    return "{}_{}".format(TEST_SUITE_PREFIX, state_name)


def get_test_dir_config(test_dir, product_yaml):
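    """
    Load the directory's test_config.yml, expanded with the product
    YAML context, if the file exists; otherwise return an empty dict.
    """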
    test_config = dict()
    test_config_filename = os.path.join(test_dir, TESTS_CONFIG_NAME)
    if os.path.exists(test_config_filename):
        test_config = ssg.yaml.open_and_expand(test_config_filename, product_yaml)
    return test_config


def select_templated_tests(test_dir_config, available_scenarios_basenames):
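    """
    Filter the available templated scenarios according to the test
    directory configuration's deny and allow lists.
    """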
    deny_scenarios = set(test_dir_config.get("deny_templated_scenarios", []))
    available_scenarios_basenames = {
        test_name for test_name in available_scenarios_basenames
        if test_name not in deny_scenarios
    }

    allow_scenarios = set(test_dir_config.get("allow_templated_scenarios", []))
    if allow_scenarios:
        available_scenarios_basenames = {
            test_name for test_name in available_scenarios_basenames
            if test_name in allow_scenarios
        }

    allowed_and_denied = deny_scenarios.intersection(allow_scenarios)
    if allowed_and_denied:
        msg = (
            "Test directory configuration contains inconsistencies: {allowed_and_denied} "
            "scenarios are both allowed and denied."
            .format(allowed_and_denied=allowed_and_denied)
        )
        raise ValueError(msg)
    return available_scenarios_basenames


def fetch_templated_tests_paths(rule_namedtuple, product_yaml):
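    """
    Return a {basename: absolute path} mapping of the rule's templated
    test scenarios, honoring the rule directory's test configuration.
    """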
    rule = rule_namedtuple.rule
    if not rule.template or not rule.template['vars']:
        return dict()
    tests_paths = fetch_all_templated_tests_paths(rule.template)
    test_config = get_test_dir_config(rule_namedtuple.directory, product_yaml)
    allowed_tests_paths = select_templated_tests(
        test_config, tests_paths.keys())
    templated_test_scenarios = {
        name: tests_paths[name] for name in allowed_tests_paths}
    return templated_test_scenarios


def fetch_all_templated_tests_paths(rule_template):
    """
    Builds a dictionary of a test case relative path -> test case absolute path mapping.

    Here, we want to know what the relative path on disk (under the tests/
    subdirectory) is (such as "installed.pass.sh"), along with the actual
    absolute path.
    """
    template_name = rule_template['name']

    base_dir = os.path.abspath(os.path.join(_SHARED_TEMPLATES, template_name, "tests"))
    results = dict()

    # If no test cases exist, return an empty dictionary.
    if not os.path.exists(base_dir):
        return results

    # Walk files; note that we don't need to do anything about directories,
    # as only files are recorded in the mapping; directories can be
    # inferred from the path.
    for dirpath, _, filenames in os.walk(base_dir):
        if not filenames:
            continue

        for filename in filenames:
            if filename.endswith(".swp"):
                continue

            # The relative path to the file becomes our results key.
            absolute_path = os.path.abspath(os.path.join(dirpath, filename))
            relative_path = os.path.relpath(absolute_path, base_dir)

            # Save the results under the relative path.
            results[relative_path] = absolute_path
    return results


def load_templated_tests(templated_tests_paths, template, local_env_yaml):
    templated_tests = dict()
    for path in templated_tests_paths:
        test = load_test(path, template, local_env_yaml)
        basename = os.path.basename(path)
        templated_tests[basename] = test
    return templated_tests


def load_test(absolute_path, rule_template, local_env_yaml):
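    """
    Render a single templated test scenario: resolve the rule's
    template, preprocess its parameters for the "tests" context, and
    expand the test file's Jinja macros.
    """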
    template_name = rule_template['name']
    template_vars = rule_template['vars']
    # Load the template parameters and apply them to the test case.
    maybe_template = ssg.templates.Template.load_template(_SHARED_TEMPLATES, template_name)
    if maybe_template is not None:
        template_parameters = maybe_template.preprocess(template_vars, "tests")
    else:
        raise ValueError(
            "Rule uses template '{}' which doesn't exist in '{}'"
            .format(template_name, _SHARED_TEMPLATES))

    jinja_dict = ssg.utils.merge_dicts(local_env_yaml, template_parameters)
    filled_template = ssg.jinja.process_file_with_macros(
        absolute_path, jinja_dict)
    return filled_template


def file_known_as_useless(file_name):
    return file_name.endswith(".swp")


def fetch_local_tests_paths(tests_dir):
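    """
    Map test case file names to their absolute paths within a rule's
    local tests directory, skipping subdirectories and junk files.
    """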
    if not os.path.exists(tests_dir):
        return dict()
    all_tests = dict()
    tests_dir_files = os.listdir(tests_dir)
    for test_case in tests_dir_files:
        # Skip vim swap files; they are not relevant and cause Jinja
        # expansion tracebacks.
        if file_known_as_useless(test_case):
            continue
        test_path = os.path.join(tests_dir, test_case)
        if os.path.isdir(test_path):
            continue
        all_tests[test_case] = test_path
    return all_tests


def load_local_tests(local_tests_paths, local_env_yaml):
    local_tests = dict()
    for path in local_tests_paths:
        test = process_file_with_macros(path, local_env_yaml)
        basename = os.path.basename(path)
        local_tests[basename] = test
    return local_tests


def get_cpe_of_tested_os(test_env, log_file):
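    """
    Read CPE_NAME from /etc/os-release on the tested system over SSH
    and return its value. Raises RuntimeError if it cannot be
    determined.
    """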
    os_release_file = "/etc/os-release"
    cpe_line = test_env.execute_ssh_command(
        "grep CPE_NAME {os_release_file}".format(os_release_file=os_release_file),
        log_file)
    # We are parsing an assignment that is possibly quoted.
    cpe = re.match(r'''CPE_NAME=(["']?)(.*)\1''', cpe_line)
    if cpe and cpe.groups()[1]:
        return cpe.groups()[1]
    msg = ["Unable to get a CPE of the system running tests"]
    if cpe_line:
        msg.append(
            "Retrieved a CPE line that we couldn't parse: {cpe_line}"
            .format(cpe_line=cpe_line))
    else:
        msg.append(
            "Couldn't get CPE entry from '{os_release_file}'"
            .format(os_release_file=os_release_file))
    raise RuntimeError("\n".join(msg))


INSTALL_COMMANDS = dict(
    fedora=("dnf", "install", "-y"),
    ol7=("yum", "install", "-y"),
    ol8=("yum", "install", "-y"),
    ol9=("yum", "install", "-y"),
    rhel7=("yum", "install", "-y"),
    rhel8=("yum", "install", "-y"),
    rhel9=("yum", "install", "-y"),
    sles=("zypper", "install", "-y"),
    ubuntu=("DEBIAN_FRONTEND=noninteractive", "apt", "install", "-y"),
)


def install_packages(test_env, packages):
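    """
    Detect the tested platform from its CPE and install the given
    packages using that platform's package manager.
    """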
    log_file_name = os.path.join(LogHelper.LOG_DIR, "env-preparation.log")

    with open(log_file_name, "a") as log_file:
        platform_cpe = get_cpe_of_tested_os(test_env, log_file)
        platform = cpes_to_platform([platform_cpe])

    command_str = " ".join(INSTALL_COMMANDS[platform] + tuple(packages))

    with open(log_file_name, 'a') as log_file:
        print("Installing packages", file=log_file)
        log_file.flush()
        test_env.execute_ssh_command(
            command_str, log_file,
            "Couldn't install required packages: {packages}".format(packages=",".join(packages)))


def _match_rhel_version(cpe):
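    """
    Map a RHEL or CentOS CPE to a "rhelN" platform string, where N is
    the major version; returns None implicitly when nothing matches.
    """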
    rhel_cpe = {
        "redhat:enterprise_linux": r":enterprise_linux:([^:]+):",
        "centos:centos": r"centos:centos:([0-9]+)"}
    for cpe_item in rhel_cpe.keys():
        if cpe_item in cpe:
            match = re.search(rhel_cpe.get(cpe_item), cpe)
            if match:
                major_version = match.groups()[0].split(".")[0]
                return "rhel" + major_version


def cpe_to_platform(cpe):
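    """
    Translate a CPE into one of the INSTALL_COMMANDS platform keys;
    returns None implicitly for CPEs that are not recognized.
    """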
    trivials = ["fedora", "sles", "ubuntu"]
    for platform in trivials:
        if platform in cpe:
            return platform
    rhel_version = _match_rhel_version(cpe)
    if rhel_version is not None:
        return rhel_version
    if "oracle:linux" in cpe:
        match = re.search(r":linux:([^:]+):", cpe)
        if match:
            major_version = match.groups()[0]
            return "ol" + major_version


def cpes_to_platform(cpes):
    for cpe in cpes:
        platform = cpe_to_platform(cpe)
        if platform is not None:
            return platform
    msg = "Unable to deduce a platform from these CPEs: {cpes}".format(cpes=cpes)
    raise ValueError(msg)


def retry_with_stdout_logging(command, args, log_file, max_attempts=5):
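    """
    Run the command with logging, retrying up to max_attempts times
    with a one-second pause between attempts, and return the last
    result.
    """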
    attempt = 0
    while attempt < max_attempts:
        result = run_with_stdout_logging(command, args, log_file)
        if result.returncode == 0:
            return result
        attempt += 1
        time.sleep(1)
    return result