Test Failed
Pull Request — master (#8052)
by Matěj
03:38
created

select_templated_tests()   A

Complexity

Conditions 3

Size

Total Lines 23
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 9.762

Importance

Changes 0
Metric Value
cc 3
eloc 17
nop 2
dl 0
loc 23
ccs 1
cts 11
cp 0.0909
crap 9.762
rs 9.55
c 0
b 0
f 0
1 1
from __future__ import print_function
2
3 1
import os
4 1
import logging
5 1
import subprocess
6 1
from collections import namedtuple
7 1
import functools
8 1
import tarfile
9 1
import tempfile
10 1
import re
11 1
import shutil
12
13 1
import ssg.yaml
14 1
from ssg.build_cpe import ProductCPEs
15 1
from ssg.build_yaml import Rule as RuleYAML
16 1
from ssg.constants import MULTI_PLATFORM_MAPPING
17 1
from ssg.constants import FULL_NAME_TO_PRODUCT_MAPPING
18 1
from ssg.constants import OSCAP_RULE
19 1
from ssg.jinja import process_file_with_macros
20 1
from ssg.products import product_yaml_path, load_product_yaml
21 1
from ssg.rules import get_rule_dir_yaml, is_rule_dir
22 1
from ssg.rule_yaml import parse_prodtype
23 1
from ssg_test_suite.log import LogHelper
24
25 1
import ssg.templates
26
27
28 1
Scenario_run = namedtuple(
29
    "Scenario_run",
30
    ("rule_id", "script"))
31 1
Scenario_conditions = namedtuple(
32
    "Scenario_conditions",
33
    ("backend", "scanning_mode", "remediated_by", "datastream"))
34 1
Rule = namedtuple(
35
    "Rule", ["directory", "id", "short_id", "scenarios_basenames", "template"])
36
37 1
SSG_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
38
39 1
_BENCHMARK_DIRS = [
40
        os.path.abspath(os.path.join(SSG_ROOT, 'linux_os', 'guide')),
41
        os.path.abspath(os.path.join(SSG_ROOT, 'applications')),
42
        ]
43
44 1
_SHARED_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '../shared'))
45
46 1
_SHARED_TEMPLATES = os.path.abspath(os.path.join(SSG_ROOT, 'shared/templates'))
47
48 1
REMOTE_USER = "root"
49 1
REMOTE_USER_HOME_DIRECTORY = "/root"
50 1
REMOTE_TEST_SCENARIOS_DIRECTORY = os.path.join(REMOTE_USER_HOME_DIRECTORY, "ssgts")
51
52 1
try:
53 1
    SSH_ADDITIONAL_OPTS = tuple(os.environ.get('SSH_ADDITIONAL_OPTIONS').split())
54 1
except AttributeError:
55
    # If SSH_ADDITIONAL_OPTIONS is not defined set it to empty tuple.
56 1
    SSH_ADDITIONAL_OPTS = tuple()
57
58 1
SSH_ADDITIONAL_OPTS = (
59
    "-o", "StrictHostKeyChecking=no",
60
    "-o", "UserKnownHostsFile=/dev/null",
61
) + SSH_ADDITIONAL_OPTS
62
63 1
TESTS_CONFIG_NAME = "test_config.yml"
64
65
66 1
def walk_through_benchmark_dirs(product=None):
67
    directories = _BENCHMARK_DIRS
68
    if product is not None:
69
        yaml_path = product_yaml_path(SSG_ROOT, product)
70
        product_base = os.path.dirname(yaml_path)
71
        product_yaml = load_product_yaml(yaml_path)
72
        benchmark_root = os.path.join(product_base, product_yaml['benchmark_root'])
73
        directories = [os.path.abspath(benchmark_root)]
74
75
    for dirname in directories:
76
        for dirpath, dirnames, filenames in os.walk(dirname):
77
            yield dirpath, dirnames, filenames
78
79
80 1
class Stage(object):
81 1
    NONE = 0
82 1
    PREPARATION = 1
83 1
    INITIAL_SCAN = 2
84 1
    REMEDIATION = 3
85 1
    FINAL_SCAN = 4
86
87
88 1
@functools.total_ordering
89 1
class RuleResult(object):
90 1
    STAGE_STRINGS = {
91
        "preparation",
92
        "initial_scan",
93
        "remediation",
94
        "final_scan",
95
    }
96
97
    """
98
    Result of a test suite testing rule under a scenario.
99
100
    Supports ordering by success - the most successful run orders first.
101
    """
102 1
    def __init__(self, result_dict=None):
103 1
        self.scenario = Scenario_run("", "")
104 1
        self.conditions = Scenario_conditions("", "", "", "")
105 1
        self.when = ""
106 1
        self.passed_stages = dict()
107 1
        self.passed_stages_count = 0
108 1
        self.success = False
109
110 1
        if result_dict:
111 1
            self.load_from_dict(result_dict)
112
113 1
    def load_from_dict(self, data):
114 1
        self.scenario = Scenario_run(data["rule_id"], data["scenario_script"])
115 1
        self.conditions = Scenario_conditions(
116
            data["backend"], data["scanning_mode"],
117
            data["remediated_by"], data["datastream"])
118 1
        self.when = data["run_timestamp"]
119
120 1
        self.passed_stages = {key: data[key] for key in self.STAGE_STRINGS if key in data}
121 1
        self.passed_stages_count = sum(self.passed_stages.values())
122
123 1
        self.success = data.get("final_scan", False)
124 1
        if not self.success:
125 1
            self.success = (
126
                "remediation" not in data
127
                and data.get("initial_scan", False))
128
129 1
    def save_to_dict(self):
130 1
        data = dict()
131 1
        data["rule_id"] = self.scenario.rule_id
132 1
        data["scenario_script"] = self.scenario.script
133
134 1
        data["backend"] = self.conditions.backend
135 1
        data["scanning_mode"] = self.conditions.scanning_mode
136 1
        data["remediated_by"] = self.conditions.remediated_by
137 1
        data["datastream"] = self.conditions.datastream
138
139 1
        data["run_timestamp"] = self.when
140
141 1
        for stage_str, result in self.passed_stages.items():
142 1
            data[stage_str] = result
143
144 1
        return data
145
146 1
    def record_stage_result(self, stage, successful):
147
        assert stage in self.STAGE_STRINGS, (
148
            "Stage name {name} is invalid, choose one from {choices}"
149
            .format(name=stage, choices=", ".join(self.STAGE_STRINGS))
150
        )
151
        self.passed_stages[stage] = successful
152
153 1
    def relative_conditions_to(self, other):
154 1
        if self.conditions == other.conditions:
155
            return self.when, other.when
156
        else:
157 1
            return tuple(self.conditions), tuple(other.conditions)
158
159 1
    def __eq__(self, other):
160 1
        return (self.success == other.success
161
                and tuple(self.passed_stages) == tuple(self.passed_stages))
162
163 1
    def __lt__(self, other):
164 1
        return self.passed_stages_count > other.passed_stages_count
165
166
167 1
def run_cmd_local(command, verbose_path, env=None):
168
    command_string = ' '.join(command)
169
    logging.debug('Running {}'.format(command_string))
170
    returncode, output = _run_cmd(command, verbose_path, env)
171
    return returncode, output
172
173
174 1
def _run_cmd(command_list, verbose_path, env=None):
175
    returncode = 0
176
    output = b""
177
    try:
178
        with open(verbose_path, 'w') as verbose_file:
179
            output = subprocess.check_output(
180
                command_list, stderr=verbose_file, env=env)
181
    except subprocess.CalledProcessError as e:
182
        returncode = e.returncode
183
        output = e.output
184
    return returncode, output.decode('utf-8')
185
186
187 1
def _get_platform_cpes(platform):
188 1
    ssg_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
189 1
    if platform.startswith("multi_platform_"):
190 1
        try:
191 1
            products = MULTI_PLATFORM_MAPPING[platform]
192 1
        except KeyError:
193 1
            logging.error(
194
                "Unknown multi_platform specifier: %s is not from %s"
195
                % (platform, ", ".join(MULTI_PLATFORM_MAPPING.keys())))
196 1
            raise ValueError
197 1
        platform_cpes = set()
198 1
        for p in products:
199 1
            product_yaml_path = os.path.join(ssg_root, "products", p, "product.yml")
200 1
            product_yaml = load_product_yaml(product_yaml_path)
201 1
            p_cpes = ProductCPEs(product_yaml)
202 1
            platform_cpes |= set(p_cpes.get_product_cpe_names())
203 1
        return platform_cpes
204
    else:
205
        # scenario platform is specified by a full product name
206 1
        try:
207 1
            product = FULL_NAME_TO_PRODUCT_MAPPING[platform]
208 1
        except KeyError:
209 1
            logging.error(
210
                "Unknown product name: %s is not from %s"
211
                % (platform, ", ".join(FULL_NAME_TO_PRODUCT_MAPPING.keys())))
212 1
            raise ValueError
213 1
        product_yaml_path = os.path.join(ssg_root, "products", product, "product.yml")
214 1
        product_yaml = load_product_yaml(product_yaml_path)
215 1
        product_cpes = ProductCPEs(product_yaml)
216 1
        platform_cpes = set(product_cpes.get_product_cpe_names())
217 1
        return platform_cpes
218
219
220 1
def matches_platform(scenario_platforms, benchmark_cpes):
221 1
    if "multi_platform_all" in scenario_platforms:
222 1
        return True
223 1
    scenario_cpes = set()
224 1
    for p in scenario_platforms:
225 1
        scenario_cpes |= _get_platform_cpes(p)
226 1
    return len(scenario_cpes & benchmark_cpes) > 0
227
228
229 1
def run_with_stdout_logging(command, args, log_file):
230
    log_file.write("{0} {1}\n".format(command, " ".join(args)))
231
    result = subprocess.run(
232
            (command,) + args, encoding="utf-8", stdout=subprocess.PIPE,
233
            stderr=subprocess.PIPE, check=False)
234
    if result.stdout:
235
        log_file.write("STDOUT: ")
236
        log_file.write(result.stdout)
237
    if result.stderr:
238
        log_file.write("STDERR: ")
239
        log_file.write(result.stderr)
240
    return result
241
242
243 1
def _exclude_garbage(tarinfo):
244
    file_name = tarinfo.name
245
    if file_name.endswith('pyc'):
246
        return None
247
    if file_name.endswith('swp'):
248
        return None
249
    return tarinfo
250
251
252 1
def _make_file_root_owned(tarinfo):
253
    if tarinfo:
254
        tarinfo.uid = 0
255
        tarinfo.gid = 0
256
        # set permission to 775
257
        tarinfo.mode = 509
258
    return tarinfo
259
260
261 1
def get_product_context(product=None):
262
    """
263
    Returns a product YAML context if any product is specified. Hard-coded to
264
    assume a debug build.
265
    """
266
    # Load product's YAML file if present. This will allow us to parse
267
    # tests in the context of the product we're executing under.
268
    product_yaml = dict()
269
    if product:
270
        yaml_path = product_yaml_path(SSG_ROOT, product)
271
        product_yaml = load_product_yaml(yaml_path)
272
273
    # We could run into a DocumentationNotComplete error when loading a
274
    # rule's YAML contents. However, because the test suite isn't executed
275
    # in the context of a particular build (though, ideally it would be
276
    # linked), we may not know exactly whether the top-level rule/profile
277
    # we're testing is actually completed. Thus, forcibly set the required
278
    # property to bypass this error.
279
    product_yaml['cmake_build_type'] = 'Debug'
280
281
    # Set the Jinja processing environment to Test Suite,
282
    # this allows Jinja macros to behave differently in a content build time and in a test time.
283
    product_yaml['SSG_TEST_SUITE_ENV'] = True
284
285
    return product_yaml
286
287
288 1
def load_rule_and_env(rule_dir_path, env_yaml, product=None):
289
    """
290
    Loads a rule and returns the combination of the RuleYAML class and
291
    the corresponding local environment for that rule.
292
    """
293
294
    # First build the path to the rule.yml file
295
    rule_path = get_rule_dir_yaml(rule_dir_path)
296
297
    # Load rule content in our environment. We use this to satisfy
298
    # some implied properties that might be used in the test suite.
299
    # Make sure we normalize to a specific product as well so that
300
    # when we load templated content it is correct.
301
    rule = RuleYAML.from_yaml(rule_path, env_yaml)
302
    rule.normalize(product)
303
304
    # Note that most places would check prodtype, but we don't care
305
    # about that here: if the rule is available to the product, we
306
    # load and parse it anyways as we have no knowledge of the
307
    # top-level profile or rule passed into the test suite.
308
    prodtypes = parse_prodtype(rule.prodtype)
309
310
    # Our local copy of env_yaml needs some properties from rule.yml
311
    # for completeness.
312
    local_env_yaml = dict()
313
    local_env_yaml.update(env_yaml)
314
    local_env_yaml['rule_id'] = rule.id_
315
    local_env_yaml['rule_title'] = rule.title
316
    local_env_yaml['products'] = prodtypes
317
318
    return rule, local_env_yaml
319
320
321 1
def write_rule_templated_tests(dest_path, relative_path, test_content):
322
    output_path = os.path.join(dest_path, relative_path)
323
324
    # If there's a separator in the file name, it means we have nested
325
    # directories to deal with.
326
    if os.path.sep in relative_path:
327
        parts = os.path.split(relative_path)[:-1]
328
        for subdir_index in range(len(parts)):
329
            # We need to expand all directories in the correct order,
330
            # preserving any previous directories (as they're nested).
331
            # Use the star operator to splat array parts into arguments
332
            # to os.path.join(...).
333
            new_directory = os.path.join(dest_path, *parts[:subdir_index])
334
            os.mkdir(new_directory)
335
336
    # Write out the test content to the desired location on disk.
337
    with open(output_path, 'w') as output_fp:
338
        print(test_content, file=output_fp)
339
340
341 1
def write_rule_dir_tests(local_env_yaml, dest_path, dirpath):
342
    # Walk the test directory, writing all tests into the output
343
    # directory, recursively.
344
    tests_dir_path = os.path.join(dirpath, "tests")
345
    tests_dir_path = os.path.abspath(tests_dir_path)
346
347
    # Note that the tests/ directory may not always exist any more. In
348
    # particular, when a rule uses a template, tests may be present there
349
    # but not present in the actual rule directory.
350
    if not os.path.exists(tests_dir_path):
351
        return
352
353
    for dirpath, dirnames, filenames in os.walk(tests_dir_path):
354
        for dirname in dirnames:
355
            # We want to recreate the correct path under the temporary
356
            # directory. Resolve it to a relative path from the tests/
357
            # directory.
358
            dir_path = os.path.relpath(os.path.join(dirpath, dirname), tests_dir_path)
359
            tmp_dir_path = os.path.join(dest_path, dir_path)
360
            os.mkdir(tmp_dir_path)
361
362
        for filename in filenames:
363
            # We want to recreate the correct path under the temporary
364
            # directory. Resolve it to a relative path from the tests/
365
            # directory. Assumption: directories should be created
366
            # prior to recursing into them, so we don't need to handle
367
            # if a file's parent directory doesn't yet exist under the
368
            # destination.
369
            src_test_path = os.path.join(dirpath, filename)
370
            rel_test_path = os.path.relpath(src_test_path, tests_dir_path)
371
            dest_test_path = os.path.join(dest_path, rel_test_path)
372
373
            # Rather than performing an OS-level copy, we need to
374
            # first parse the test with jinja and then write it back
375
            # out to the destination.
376
            parsed_test = process_file_with_macros(src_test_path, local_env_yaml)
377
            with open(dest_test_path, 'w') as output_fp:
378
                print(parsed_test, file=output_fp)
379
380
381 1
def template_rule_tests(product, product_yaml, template_builder, tmpdir, dirpath):
382
    """
383
    For a given rule directory, templates all contained tests into the output
384
    (tmpdir) directory.
385
    """
386
387
    # Load the rule and its environment
388
    rule, local_env_yaml = load_rule_and_env(dirpath, product_yaml, product)
389
390
    # Before we get too far, we wish to search the rule YAML to see if
391
    # it is applicable to the current product. If we have a product
392
    # and the rule isn't applicable for the product, there's no point
393
    # in continuing with the rest of the loading. This should speed up
394
    # the loading of the templated tests. Note that we've already
395
    # parsed the prodtype into local_env_yaml
396
    if product and local_env_yaml['products']:
397
        prodtypes = local_env_yaml['products']
398
        if "all" not in prodtypes and product not in prodtypes:
399
            return
400
401
    # Create the destination directory.
402
    dest_path = os.path.join(tmpdir, rule.id_)
403
    os.mkdir(dest_path)
404
405
    # The priority order is rule-specific tests over templated tests.
406
    # That is, for any test under rule_id/tests with a name matching a
407
    # test under shared/templates/<template_name>/tests/, the former
408
    # will preferred. This means we need to process templates first,
409
    # so they'll be overwritten later if necessary.
410
    if rule.template and rule.template['vars']:
411
        templated_tests = template_builder.get_all_tests(rule.id_, rule.template,
412
                                                         local_env_yaml)
413
414
        for relative_path in templated_tests:
415
            test_content = templated_tests[relative_path]
416
            write_rule_templated_tests(dest_path, relative_path, test_content)
417
418
    write_rule_dir_tests(local_env_yaml, dest_path, dirpath)
419
420
421 1
def template_tests(product=None):
422
    """
423
    Create a temporary directory with test cases parsed via jinja using
424
    product-specific context.
425
    """
426
    # Set up an empty temp directory
427
    tmpdir = tempfile.mkdtemp()
428
429
    # We want to remove the temporary directory on failure, but preserve
430
    # it on success. Wrap in a try/except block and reraise the original
431
    # exception after removing the temporary directory.
432
    try:
433
        # Load the product context we're executing under, if any.
434
        product_yaml = get_product_context(product)
435
436
        # Initialize a mock template_builder.
437
        empty = "/ssgts/empty/placeholder"
438
        template_builder = ssg.templates.Builder(product_yaml, empty,
439
                                                 _SHARED_TEMPLATES, empty,
440
                                                 empty)
441
442
        # Note that we're not exactly copying 1-for-1 the contents of the
443
        # directory structure into the temporary one. Instead we want a
444
        # flattened mapping with all rules in a single top-level directory
445
        # and all tests immediately contained within it. That is:
446
        #
447
        # /group_a/rule_a/tests/something.pass.sh -> /rule_a/something.pass.sh
448
        for dirpath, dirnames, _ in walk_through_benchmark_dirs(product):
449
            # Skip anything that isn't obviously a rule.
450
            if not is_rule_dir(dirpath):
451
                continue
452
453
            template_rule_tests(product, product_yaml, template_builder, tmpdir, dirpath)
454
    except Exception as exp:
455
        shutil.rmtree(tmpdir, ignore_errors=True)
456
        raise exp
457
458
    return tmpdir
459
460
461 1
def create_tarball(product):
462
    """Create a tarball which contains all test scenarios for every rule.
463
    Tarball contains directories with the test scenarios. The name of the
464
    directories is the same as short rule ID. There is no tree structure.
465
    """
466
    templated_tests = template_tests(product=product)
467
468
    try:
469
        with tempfile.NamedTemporaryFile(
470
                "wb", suffix=".tar.gz", delete=False) as fp:
471
            with tarfile.TarFile.open(fileobj=fp, mode="w") as tarball:
472
                tarball.add(_SHARED_DIR, arcname="shared", filter=_make_file_root_owned)
473
                for rule_id in os.listdir(templated_tests):
474
                    # When a top-level directory exists under the temporary
475
                    # templated tests directory, we've already validated that
476
                    # it is a valid rule directory. Thus we can simply add it
477
                    # to the tarball.
478
                    absolute_dir = os.path.join(templated_tests, rule_id)
479
                    if not os.path.isdir(absolute_dir):
480
                        continue
481
482
                    tarball.add(
483
                        absolute_dir, arcname=rule_id,
484
                        filter=lambda tinfo: _exclude_garbage(_make_file_root_owned(tinfo))
485
                    )
486
487
            # Since we've added the templated contents into the tarball, we
488
            # can now delete the tree.
489
            shutil.rmtree(templated_tests, ignore_errors=True)
490
            return fp.name
491
    except Exception as exp:
492
        shutil.rmtree(templated_tests, ignore_errors=True)
493
        raise exp
494
495
496 1
def send_scripts(test_env):
497
    remote_dir = REMOTE_TEST_SCENARIOS_DIRECTORY
498
    archive_file = create_tarball(test_env.product)
499
    archive_file_basename = os.path.basename(archive_file)
500
    remote_archive_file = os.path.join(remote_dir, archive_file_basename)
501
    logging.debug("Uploading scripts.")
502
    log_file_name = os.path.join(LogHelper.LOG_DIR, "env-preparation.log")
503
504
    with open(log_file_name, 'a') as log_file:
505
        print("Setting up test setup scripts", file=log_file)
506
507
        test_env.execute_ssh_command(
508
            "mkdir -p {remote_dir}".format(remote_dir=remote_dir),
509
            log_file, "Cannot create directory {0}".format(remote_dir))
510
        test_env.scp_upload_file(
511
            archive_file, remote_dir,
512
            log_file, "Cannot copy archive {0} to the target machine's directory {1}"
513
            .format(archive_file, remote_dir))
514
        test_env.execute_ssh_command(
515
            "tar xf {remote_archive_file} -C {remote_dir}"
516
            .format(remote_dir=remote_dir, remote_archive_file=remote_archive_file),
517
            log_file, "Cannot extract data tarball {0}".format(remote_archive_file))
518
    os.unlink(archive_file)
519
    return remote_dir
520
521
522 1
def get_test_dir_config(test_dir, product_yaml):
523
    test_config = dict()
524
    test_config_filename = os.path.join(test_dir, TESTS_CONFIG_NAME)
525
    if os.path.exists(test_config_filename):
526
        test_config = ssg.yaml.open_and_expand(test_config_filename, product_yaml)
527
    return test_config
528
529
530 1
def select_templated_tests(test_dir_config, available_scenarios_basenames):
531
    deny_scenarios = set(test_dir_config.get("deny_templated_scenarios", []))
532
    available_scenarios_basenames = {
533
        test_name for test_name in available_scenarios_basenames
534
        if test_name not in deny_scenarios
535
    }
536
537
    allow_scenarios = set(test_dir_config.get("allow_templated_scenarios", []))
538
    if allow_scenarios:
539
        available_scenarios_basenames = {
540
            test_name for test_name in available_scenarios_basenames
541
            if test_name in allow_scenarios
542
        }
543
    
544
    allowed_and_denied = deny_scenarios.intersection(allow_scenarios)
545
    if allowed_and_denied:
546
        msg = (
547
            "Test directory configuration contain inconsistencies: {allowed_and_denied} "
548
            "scenarios are both allowed and denied."
549
            .format(test_dir_config=test_dir_config, allowed_and_denied=allowed_and_denied)
550
        )
551
        raise ValueError(msg)
552
    return available_scenarios_basenames
553
554
555 1
def iterate_over_rules(product=None):
556
    """Iterate over rule directories which have test scenarios".
557
558
    Returns:
559
        Named tuple Rule having these fields:
560
            directory -- absolute path to the rule "tests" subdirectory
561
                         containing the test scenarios in Bash
562
            id -- full rule id as it is present in datastream
563
            short_id -- short rule ID, the same as basename of the directory
564
                        containing the test scenarios in Bash
565
            scenarios_basenames -- list of executable .sh files in the uploaded tarball
566
    """
567
568
    # Here we need to perform some magic to handle parsing the rule (from a
569
    # product perspective) and loading any templated tests. In particular,
570
    # identifying which tests to potentially run involves invoking the
571
    # templating engine.
572
    #
573
    # Begin by loading context about our execution environment, if any.
574
    product_yaml = get_product_context(product)
575
576
    # Initialize a mock template_builder.
577
    empty = "/ssgts/empty/placeholder"
578
    template_builder = ssg.templates.Builder(product_yaml, empty,
579
                                             _SHARED_TEMPLATES, empty, empty)
580
581
    for dirpath, dirnames, filenames in walk_through_benchmark_dirs(product):
582
        if is_rule_dir(dirpath):
583
            short_rule_id = os.path.basename(dirpath)
584
585
            # Load the rule itself to check for a template.
586
            rule, local_env_yaml = load_rule_and_env(dirpath, product_yaml, product)
587
            template_name = None
588
589
            # Before we get too far, we wish to search the rule YAML to see if
590
            # it is applicable to the current product. If we have a product
591
            # and the rule isn't applicable for the product, there's no point
592
            # in continuing with the rest of the loading. This should speed up
593
            # the loading of the templated tests. Note that we've already
594
            # parsed the prodtype into local_env_yaml
595
            if product and local_env_yaml['products']:
596
                prodtypes = local_env_yaml['products']
597
                if "all" not in prodtypes and product not in prodtypes:
598
                    continue
599
600
            # All tests is a mapping from path (in the tarball) to contents
601
            # of the test case. This is necessary because later code (which
602
            # attempts to parse headers from the test case) don't have easy
603
            # access to templated content. By reading it and returning it
604
            # here, we can save later code from having to understand the
605
            # templating system.
606
            all_tests = dict()
607
608
            tests_dir = os.path.join(dirpath, "tests")
609
            test_config = get_test_dir_config(tests_dir, product_yaml)
610
611
            # Start by checking for templating tests and provision them if
612
            # present.
613
            if rule.template and rule.template['vars']:
614
                templated_tests = template_builder.get_all_tests(
615
                    rule.id_, rule.template, local_env_yaml)
616
617
                allowed_templated_tests = select_templated_tests(
618
                    test_config, templated_tests.keys())
619
                all_tests.update({name: templated_tests[name] for name in allowed_templated_tests})
620
                template_name = rule.template['name']
621
622
            # Add additional tests from the local rule directory. Note that,
623
            # like the behavior in template_tests, this will overwrite any
624
            # templated tests with the same file name.
625
            if os.path.exists(tests_dir):
626
                tests_dir_files = os.listdir(tests_dir)
627
                for test_case in tests_dir_files:
628
                    test_path = os.path.join(tests_dir, test_case)
629
                    if os.path.isdir(test_path):
630
                        continue
631
632
                    all_tests[test_case] = process_file_with_macros(test_path, local_env_yaml)
633
634
            # Filter out everything except the shell test scenarios.
635
            # Other files in rule directories are editor swap files
636
            # or other content than a test case.
637
            allowed_scripts = filter(lambda x: x.endswith(".sh"), all_tests)
638
            content_mapping = {x: all_tests[x] for x in allowed_scripts}
639
640
            # Skip any rules that lack any content. This ensures that if we
641
            # end up with rules with a template lacking tests and without any
642
            # rule directory tests, we don't include the empty rule here.
643
            if not content_mapping:
644
                continue
645
646
            full_rule_id = OSCAP_RULE + short_rule_id
647
            result = Rule(
648
                directory=tests_dir, id=full_rule_id, short_id=short_rule_id,
649
                scenarios_basenames=content_mapping, template=template_name)
650
            yield result
651
652
653 1
def get_cpe_of_tested_os(test_env, log_file):
654
    os_release_file = "/etc/os-release"
655
    cpe_line = test_env.execute_ssh_command(
656
        "grep CPE_NAME {os_release_file}".format(os_release_file=os_release_file),
657
        log_file)
658
    # We are parsing an assignment that is possibly quoted
659
    cpe = re.match(r'''CPE_NAME=(["']?)(.*)\1''', cpe_line)
660
    if cpe and cpe.groups()[1]:
661
        return cpe.groups()[1]
662
    msg = ["Unable to get a CPE of the system running tests"]
663
    if cpe_line:
664
        msg.append(
665
            "Retreived a CPE line that we couldn't parse: {cpe_line}"
666
            .format(cpe_line=cpe_line))
667
    else:
668
        msg.append(
669
            "Couldn't get CPE entry from '{os_release_file}'"
670
            .format(os_release_file=os_release_file))
671
    raise RuntimeError("\n".join(msg))
672
673
674 1
INSTALL_COMMANDS = dict(
675
    fedora=("dnf", "install", "-y"),
676
    rhel7=("yum", "install", "-y"),
677
    rhel8=("yum", "install", "-y"),
678
    rhel9=("yum", "install", "-y"),
679
    ubuntu=("DEBIAN_FRONTEND=noninteractive", "apt", "install", "-y"),
680
)
681
682
683 1
def install_packages(test_env, packages):
684
    log_file_name = os.path.join(LogHelper.LOG_DIR, "env-preparation.log")
685
686
    with open(log_file_name, "a") as log_file:
687
        platform_cpe = get_cpe_of_tested_os(test_env, log_file)
688
    platform = cpes_to_platform([platform_cpe])
689
690
    command_str = " ".join(INSTALL_COMMANDS[platform] + tuple(packages))
691
692
    with open(log_file_name, 'a') as log_file:
693
        print("Installing packages", file=log_file)
694
        log_file.flush()
695
        test_env.execute_ssh_command(
696
            command_str, log_file,
697
            "Couldn't install required packages: {packages}".format(packages=",".join(packages)))
698
699
700 1
def cpes_to_platform(cpes):
701
    for cpe in cpes:
702
        if "fedora" in cpe:
703
            return "fedora"
704
        if "redhat:enterprise_linux" in cpe:
705
            match = re.search(r":enterprise_linux:([^:]+):", cpe)
706
            if match:
707
                major_version = match.groups()[0].split(".")[0]
708
                return "rhel" + major_version
709
        if "ubuntu" in cpe:
710
            return "ubuntu"
711
    msg = "Unable to deduce a platform from these CPEs: {cpes}".format(cpes=cpes)
712
    raise ValueError(msg)
713