Test Failed
Push — master ( 9503db...9eadfe )
by Jan
02:16 queued 13s
created

tests.ssg_test_suite.common.template_tests()   A

Complexity

Conditions 4

Size

Total Lines 38
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 16.8116

Importance

Changes 0
Metric Value
cc 4
eloc 16
nop 1
dl 0
loc 38
ccs 1
cts 14
cp 0.0714
crap 16.8116
rs 9.6
c 0
b 0
f 0
1 1
from __future__ import print_function
2
3 1
import os
4 1
import logging
5 1
import subprocess
6 1
from collections import namedtuple
7 1
import functools
8 1
import tarfile
9 1
import tempfile
10 1
import re
11 1
import shutil
12
13 1
from ssg.build_cpe import ProductCPEs
14 1
from ssg.build_yaml import Rule as RuleYAML
15 1
from ssg.constants import MULTI_PLATFORM_MAPPING
16 1
from ssg.constants import FULL_NAME_TO_PRODUCT_MAPPING
17 1
from ssg.constants import OSCAP_RULE
18 1
from ssg.jinja import process_file_with_macros
19 1
from ssg.products import product_yaml_path, load_product_yaml
20 1
from ssg.rules import get_rule_dir_yaml, is_rule_dir
21 1
from ssg.rule_yaml import parse_prodtype
22 1
from ssg_test_suite.log import LogHelper
23
24 1
import ssg.templates
25
26
27 1
# A single executed test scenario: which rule it tested and by which script.
Scenario_run = namedtuple(
    "Scenario_run",
    ("rule_id", "script"))
# The environment a scenario ran under: backend, scanning mode, remediation
# mechanism and the datastream used.
Scenario_conditions = namedtuple(
    "Scenario_conditions",
    ("backend", "scanning_mode", "remediated_by", "datastream"))
# A rule directory holding test scenarios, as yielded by iterate_over_rules.
Rule = namedtuple(
    "Rule", ["directory", "id", "short_id", "files", "template"])

# Repository root: two directory levels above this file.
SSG_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))

# Directories searched for benchmark rule content.
_BENCHMARK_DIRS = [
        os.path.abspath(os.path.join(SSG_ROOT, 'linux_os', 'guide')),
        os.path.abspath(os.path.join(SSG_ROOT, 'applications')),
        ]

# Content shared by all test scenarios; added to the uploaded tarball.
_SHARED_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '../shared'))

# Jinja templates from which templated test scenarios are generated.
_SHARED_TEMPLATES = os.path.abspath(os.path.join(SSG_ROOT, 'shared/templates'))

# Remote account used on the tested machine and where scenarios get uploaded.
REMOTE_USER = "root"
REMOTE_USER_HOME_DIRECTORY = "/root"
REMOTE_TEST_SCENARIOS_DIRECTORY = os.path.join(REMOTE_USER_HOME_DIRECTORY, "ssgts")

try:
    SSH_ADDITIONAL_OPTS = tuple(os.environ.get('SSH_ADDITIONAL_OPTIONS').split())
except AttributeError:
    # If SSH_ADDITIONAL_OPTIONS is not defined set it to empty tuple.
    SSH_ADDITIONAL_OPTS = tuple()

# Host key checking is always disabled; user-supplied options are appended.
SSH_ADDITIONAL_OPTS = (
    "-o", "StrictHostKeyChecking=no",
    "-o", "UserKnownHostsFile=/dev/null",
) + SSH_ADDITIONAL_OPTS
61
62
63 1
def walk_through_benchmark_dirs(product=None):
    """
    Yield os.walk tuples (dirpath, dirnames, filenames) over the benchmark
    content directories, restricted to the product's benchmark root when a
    product is given.
    """
    if product is None:
        directories = _BENCHMARK_DIRS
    else:
        # The product YAML points at the benchmark root, relative to the
        # directory the YAML file lives in.
        yaml_path = product_yaml_path(SSG_ROOT, product)
        product_yaml = load_product_yaml(yaml_path)
        benchmark_root = os.path.join(
            os.path.dirname(yaml_path), product_yaml['benchmark_root'])
        directories = [os.path.abspath(benchmark_root)]

    for directory in directories:
        for walk_entry in os.walk(directory):
            yield walk_entry
75
76
77 1
class Stage(object):
    """Ordered constants naming the phases of a single scenario run."""
    # Higher value means the run progressed further.
    NONE = 0
    PREPARATION = 1
    INITIAL_SCAN = 2
    REMEDIATION = 3
    FINAL_SCAN = 4
83
84
85 1
@functools.total_ordering
class RuleResult(object):
    """
    Result of a test suite testing rule under a scenario.

    Supports ordering by success - the most successful run orders first.
    """

    # Recognized stage identifiers whose pass/fail outcome can be recorded.
    STAGE_STRINGS = {
        "preparation",
        "initial_scan",
        "remediation",
        "final_scan",
    }

    def __init__(self, result_dict=None):
        """
        Create an empty result, optionally populated from a dictionary in
        the format produced by save_to_dict().
        """
        self.scenario = Scenario_run("", "")
        self.conditions = Scenario_conditions("", "", "", "")
        self.when = ""
        self.passed_stages = dict()
        self.passed_stages_count = 0
        self.success = False

        if result_dict:
            self.load_from_dict(result_dict)

    def load_from_dict(self, data):
        """Populate this result from a dictionary created by save_to_dict()."""
        self.scenario = Scenario_run(data["rule_id"], data["scenario_script"])
        self.conditions = Scenario_conditions(
            data["backend"], data["scanning_mode"],
            data["remediated_by"], data["datastream"])
        self.when = data["run_timestamp"]

        self.passed_stages = {key: data[key] for key in self.STAGE_STRINGS if key in data}
        self.passed_stages_count = sum(self.passed_stages.values())

        # A run is successful when the final scan passed, or - for runs
        # without a remediation stage - when the initial scan passed.
        self.success = data.get("final_scan", False)
        if not self.success:
            self.success = (
                "remediation" not in data
                and data.get("initial_scan", False))

    def save_to_dict(self):
        """Serialize this result into a plain dictionary (inverse of load_from_dict)."""
        data = dict()
        data["rule_id"] = self.scenario.rule_id
        data["scenario_script"] = self.scenario.script

        data["backend"] = self.conditions.backend
        data["scanning_mode"] = self.conditions.scanning_mode
        data["remediated_by"] = self.conditions.remediated_by
        data["datastream"] = self.conditions.datastream

        data["run_timestamp"] = self.when

        for stage_str, result in self.passed_stages.items():
            data[stage_str] = result

        return data

    def record_stage_result(self, stage, successful):
        """Record whether the given stage (one of STAGE_STRINGS) passed."""
        assert stage in self.STAGE_STRINGS, (
            "Stage name {name} is invalid, choose one from {choices}"
            .format(name=stage, choices=", ".join(self.STAGE_STRINGS))
        )
        self.passed_stages[stage] = successful

    def relative_conditions_to(self, other):
        """
        Return what distinguishes two results: their timestamps when they
        ran under identical conditions, the conditions themselves otherwise.
        """
        if self.conditions == other.conditions:
            return self.when, other.when
        else:
            return tuple(self.conditions), tuple(other.conditions)

    def __eq__(self, other):
        # Fixed defect: the original compared tuple(self.passed_stages)
        # against itself, so any two results with equal .success compared
        # as equal regardless of which stages they passed.
        return (self.success == other.success
                and tuple(self.passed_stages) == tuple(other.passed_stages))

    def __lt__(self, other):
        # Deliberately inverted: the result that passed MORE stages sorts
        # first (see the class docstring).
        return self.passed_stages_count > other.passed_stages_count
162
163
164 1
def run_cmd_local(command, verbose_path, env=None):
    """
    Run a command on the local machine, sending its stderr to verbose_path.

    Returns a (returncode, stdout) tuple with stdout decoded as UTF-8.
    """
    logging.debug('Running {}'.format(' '.join(command)))
    return _run_cmd(command, verbose_path, env)
169
170
171 1
def _run_cmd(command_list, verbose_path, env=None):
172
    returncode = 0
173
    output = b""
174
    try:
175
        with open(verbose_path, 'w') as verbose_file:
176
            output = subprocess.check_output(
177
                command_list, stderr=verbose_file, env=env)
178
    except subprocess.CalledProcessError as e:
179
        returncode = e.returncode
180
        output = e.output
181
    return returncode, output.decode('utf-8')
182
183
184 1
def _get_platform_cpes(platform):
    """
    Return the set of product CPE names a scenario platform string maps to.

    platform is either a "multi_platform_*" specifier or a full product
    name. Raises ValueError when the specifier is unknown.

    The two branches previously duplicated the product.yml loading logic
    and shadowed the imported product_yaml_path function with a local
    variable; both resolve to a single products list here instead.
    """
    ssg_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
    if platform.startswith("multi_platform_"):
        try:
            products = MULTI_PLATFORM_MAPPING[platform]
        except KeyError:
            logging.error(
                "Unknown multi_platform specifier: %s is not from %s"
                % (platform, ", ".join(MULTI_PLATFORM_MAPPING.keys())))
            raise ValueError
    else:
        # scenario platform is specified by a full product name
        try:
            products = [FULL_NAME_TO_PRODUCT_MAPPING[platform]]
        except KeyError:
            logging.error(
                "Unknown product name: %s is not from %s"
                % (platform, ", ".join(FULL_NAME_TO_PRODUCT_MAPPING.keys())))
            raise ValueError

    # Union the CPE names of every resolved product.
    platform_cpes = set()
    for p in products:
        yaml_path = os.path.join(ssg_root, "products", p, "product.yml")
        product_yaml = load_product_yaml(yaml_path)
        p_cpes = ProductCPEs(product_yaml)
        platform_cpes |= set(p_cpes.get_product_cpe_names())
    return platform_cpes
215
216
217 1
def matches_platform(scenario_platforms, benchmark_cpes):
    """
    Return True when any of the scenario's platforms maps to a CPE that is
    also among the benchmark's CPEs.
    """
    # "multi_platform_all" matches everything unconditionally.
    if "multi_platform_all" in scenario_platforms:
        return True
    scenario_cpes = set()
    for platform in scenario_platforms:
        scenario_cpes.update(_get_platform_cpes(platform))
    return len(scenario_cpes & benchmark_cpes) > 0
224
225
226 1
def run_with_stdout_logging(command, args, log_file):
    """
    Execute command with args, writing the invocation and any captured
    stdout/stderr into log_file.

    Returns the captured stdout; raises RuntimeError on non-zero exit.
    """
    log_file.write("{0} {1}\n".format(command, " ".join(args)))
    result = subprocess.run(
            (command,) + args, encoding="utf-8", stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, check=False)
    # Log each non-empty stream under its own prefix.
    for prefix, captured in (("STDOUT: ", result.stdout),
                             ("STDERR: ", result.stderr)):
        if captured:
            log_file.write(prefix)
            log_file.write(captured)
    if result.returncode:
        raise RuntimeError("'%s' command returned non-zero." % command)
    return result.stdout
240
241
242 1
def _exclude_garbage(tarinfo):
243
    file_name = tarinfo.name
244
    if file_name.endswith('pyc'):
245
        return None
246
    if file_name.endswith('swp'):
247
        return None
248
    return tarinfo
249
250
251 1
def _make_file_root_owned(tarinfo):
252
    if tarinfo:
253
        tarinfo.uid = 0
254
        tarinfo.gid = 0
255
        # set permission to 775
256
        tarinfo.mode = 509
257
    return tarinfo
258
259
260 1
def get_product_context(product=None):
    """
    Returns a product YAML context if any product is specified. Hard-coded to
    assume a debug build.
    """
    # Parse tests in the context of the product we're executing under,
    # falling back to an empty context without one.
    if product:
        product_yaml = load_product_yaml(product_yaml_path(SSG_ROOT, product))
    else:
        product_yaml = dict()

    # The test suite isn't tied to a particular build, so the rule/profile
    # under test may be marked documentation-incomplete. Forcing a debug
    # build type bypasses the DocumentationNotComplete error on load.
    product_yaml['cmake_build_type'] = 'Debug'

    return product_yaml
281
282
283 1
def load_rule_and_env(rule_dir_path, env_yaml, product=None):
    """
    Loads a rule and returns the combination of the RuleYAML class and
    the corresponding local environment for that rule.
    """
    # Locate the rule.yml inside the rule directory.
    rule_path = get_rule_dir_yaml(rule_dir_path)

    # Parse the rule in our environment and normalize it to the given
    # product so that templated content resolves correctly.
    rule = RuleYAML.from_yaml(rule_path, env_yaml)
    rule.normalize(product)

    # prodtype is parsed regardless of applicability: we have no knowledge
    # of the top-level profile or rule passed into the test suite, so if
    # the rule is available to the product we load it anyway.
    prodtypes = parse_prodtype(rule.prodtype)

    # Extend a local copy of env_yaml with the rule properties the test
    # templates need.
    local_env_yaml = dict()
    local_env_yaml.update(env_yaml)
    local_env_yaml['rule_id'] = rule.id_
    local_env_yaml['rule_title'] = rule.title
    local_env_yaml['products'] = prodtypes

    return rule, local_env_yaml
314
315
316 1
def write_rule_templated_tests(dest_path, relative_path, test_content):
    """
    Write one templated test case to dest_path/relative_path, creating any
    intermediate directories the relative path contains.

    Fixes a defect in the original directory creation: it derived the
    directory list from os.path.split(relative_path)[:-1] (which keeps the
    unsplit head) and sliced with parts[:subdir_index] (which starts with
    the empty tuple), so the first iteration re-created dest_path itself
    and the deepest directory was never created; nested tests could not be
    written at all.
    """
    output_path = os.path.join(dest_path, relative_path)

    # Create the whole directory chain in one call; exist_ok keeps
    # repeated writes into the same subtree from failing.
    dir_path = os.path.dirname(output_path)
    if dir_path:
        os.makedirs(dir_path, exist_ok=True)

    # Write out the test content to the desired location on disk.
    with open(output_path, 'w') as output_fp:
        print(test_content, file=output_fp)
334
335
336 1
def write_rule_dir_tests(local_env_yaml, dest_path, dirpath):
    """
    Recursively copy the rule's tests/ directory into dest_path, rendering
    every test file through jinja with the rule-local environment.
    """
    tests_dir_path = os.path.abspath(os.path.join(dirpath, "tests"))

    # The tests/ directory may not exist any more: a templated rule can
    # keep its tests only under shared/templates.
    if not os.path.exists(tests_dir_path):
        return

    for walk_root, walk_dirs, walk_files in os.walk(tests_dir_path):
        # Recreate the directory layout (relative to tests/) before any
        # files are written, so every file below has an existing parent.
        for walk_dir in walk_dirs:
            rel_dir = os.path.relpath(
                os.path.join(walk_root, walk_dir), tests_dir_path)
            os.mkdir(os.path.join(dest_path, rel_dir))

        for walk_file in walk_files:
            src_test_path = os.path.join(walk_root, walk_file)
            rel_test_path = os.path.relpath(src_test_path, tests_dir_path)
            dest_test_path = os.path.join(dest_path, rel_test_path)

            # Parse with jinja instead of a plain copy so that macros are
            # expanded before the test reaches the destination.
            parsed_test = process_file_with_macros(src_test_path, local_env_yaml)
            with open(dest_test_path, 'w') as output_fp:
                print(parsed_test, file=output_fp)
374
375
376 1
def template_rule_tests(product, product_yaml, template_builder, tmpdir, dirpath):
    """
    For a given rule directory, templates all contained tests into the output
    (tmpdir) directory.
    """
    rule, local_env_yaml = load_rule_and_env(dirpath, product_yaml, product)

    # Skip rules not applicable to the requested product early; the parsed
    # prodtype already lives in local_env_yaml['products']. This avoids
    # needlessly templating tests for foreign products.
    if product and local_env_yaml['products']:
        prodtypes = local_env_yaml['products']
        if "all" not in prodtypes and product not in prodtypes:
            return

    # Destination directory named after the rule id.
    dest_path = os.path.join(tmpdir, rule.id_)
    os.mkdir(dest_path)

    # Rule-specific tests take priority over templated ones with the same
    # name, so templated tests are written first and may be overwritten
    # by write_rule_dir_tests below.
    if rule.template and rule.template['vars']:
        templated_tests = template_builder.get_all_tests(
            rule.id_, rule.template, local_env_yaml)
        for relative_path in templated_tests:
            write_rule_templated_tests(
                dest_path, relative_path, templated_tests[relative_path])

    write_rule_dir_tests(local_env_yaml, dest_path, dirpath)
414
415
416 1
def template_tests(product=None):
    """
    Create a temporary directory with test cases parsed via jinja using
    product-specific context.
    """
    tmpdir = tempfile.mkdtemp()

    # The directory is preserved on success; on any failure it is removed
    # before the original exception is re-raised.
    try:
        product_yaml = get_product_context(product)

        # Initialize a mock template_builder.
        empty = "/ssgts/empty/placeholder"
        template_builder = ssg.templates.Builder(
            product_yaml, empty, _SHARED_TEMPLATES, empty, empty)

        # The layout is deliberately flattened: each rule becomes one
        # top-level directory with its tests directly inside, e.g.
        # /group_a/rule_a/tests/something.pass.sh -> /rule_a/something.pass.sh
        for dirpath, dirnames, _ in walk_through_benchmark_dirs(product):
            # Skip anything that isn't obviously a rule.
            if not is_rule_dir(dirpath):
                continue

            template_rule_tests(
                product, product_yaml, template_builder, tmpdir, dirpath)
    except Exception:
        shutil.rmtree(tmpdir, ignore_errors=True)
        raise

    return tmpdir
454
455
456 1
def create_tarball(product):
    """Create a tarball which contains all test scenarios for every rule.
    Tarball contains directories with the test scenarios. The name of the
    directories is the same as short rule ID. There is no tree structure.
    """
    templated_tests = template_tests(product=product)

    def _sanitize(tinfo):
        # Root-own the member first, then drop editor/compiled leftovers.
        return _exclude_garbage(_make_file_root_owned(tinfo))

    try:
        with tempfile.NamedTemporaryFile(
                "wb", suffix=".tar.gz", delete=False) as fp:
            with tarfile.TarFile.open(fileobj=fp, mode="w") as tarball:
                tarball.add(_SHARED_DIR, arcname="shared", filter=_make_file_root_owned)
                for rule_id in os.listdir(templated_tests):
                    # Every top-level directory in the templated tests tree
                    # was already validated as a rule directory, so it can
                    # be added wholesale; anything else is skipped.
                    absolute_dir = os.path.join(templated_tests, rule_id)
                    if os.path.isdir(absolute_dir):
                        tarball.add(
                            absolute_dir, arcname=rule_id, filter=_sanitize)

            # The templated contents are in the tarball now; the tree can go.
            shutil.rmtree(templated_tests, ignore_errors=True)
            return fp.name
    except Exception:
        shutil.rmtree(templated_tests, ignore_errors=True)
        raise
489
490
491 1
def send_scripts(test_env):
    """
    Pack all test scenarios into a tarball, upload it to the remote test
    environment and extract it there.

    Returns the remote directory the scenarios were extracted into.
    """
    remote_dir = REMOTE_TEST_SCENARIOS_DIRECTORY
    archive_file = create_tarball(test_env.product)
    archive_file_basename = os.path.basename(archive_file)
    remote_archive_file = os.path.join(remote_dir, archive_file_basename)
    logging.debug("Uploading scripts.")
    log_file_name = os.path.join(LogHelper.LOG_DIR, "env-preparation.log")

    with open(log_file_name, 'a') as log_file:
        print("Setting up test setup scripts", file=log_file)

        # Create the target directory, upload the tarball into it and
        # unpack it in place; each step supplies its own failure message.
        test_env.execute_ssh_command(
            "mkdir -p {remote_dir}".format(remote_dir=remote_dir),
            log_file, "Cannot create directory {0}".format(remote_dir))
        test_env.scp_upload_file(
            archive_file, remote_dir,
            log_file, "Cannot copy archive {0} to the target machine's directory {1}"
            .format(archive_file, remote_dir))
        test_env.execute_ssh_command(
            "tar xf {remote_archive_file} -C {remote_dir}"
            .format(remote_dir=remote_dir, remote_archive_file=remote_archive_file),
            log_file, "Cannot extract data tarball {0}".format(remote_archive_file))
    # The local tarball is no longer needed once extracted remotely.
    os.unlink(archive_file)
    return remote_dir
515
516
517 1
def iterate_over_rules(product=None):
    """Iterate over rule directories which have test scenarios.

    Returns:
        Named tuple Rule having these fields:
            directory -- absolute path to the rule "tests" subdirectory
                         containing the test scenarios in Bash
            id -- full rule id as it is present in datastream
            short_id -- short rule ID, the same as basename of the directory
                        containing the test scenarios in Bash
            files -- mapping from scenario file name to its
                     (jinja-processed) content
            template -- name of the rule's template, or None when the rule
                        is not templated
    """

    # Here we need to perform some magic to handle parsing the rule (from a
    # product perspective) and loading any templated tests. In particular,
    # identifying which tests to potentially run involves invoking the
    # templating engine.
    #
    # Begin by loading context about our execution environment, if any.
    product_yaml = get_product_context(product)

    # Initialize a mock template_builder.
    empty = "/ssgts/empty/placeholder"
    template_builder = ssg.templates.Builder(product_yaml, empty,
                                             _SHARED_TEMPLATES, empty, empty)

    for dirpath, dirnames, filenames in walk_through_benchmark_dirs(product):
        if is_rule_dir(dirpath):
            short_rule_id = os.path.basename(dirpath)

            # Load the rule itself to check for a template.
            rule, local_env_yaml = load_rule_and_env(dirpath, product_yaml, product)
            template_name = None

            # Before we get too far, we wish to search the rule YAML to see if
            # it is applicable to the current product. If we have a product
            # and the rule isn't applicable for the product, there's no point
            # in continuing with the rest of the loading. This should speed up
            # the loading of the templated tests. Note that we've already
            # parsed the prodtype into local_env_yaml
            if product and local_env_yaml['products']:
                prodtypes = local_env_yaml['products']
                if "all" not in prodtypes and product not in prodtypes:
                    continue

            # All tests is a mapping from path (in the tarball) to contents
            # of the test case. This is necessary because later code (which
            # attempts to parse headers from the test case) don't have easy
            # access to templated content. By reading it and returning it
            # here, we can save later code from having to understand the
            # templating system.
            all_tests = dict()

            # Start by checking for templating tests and provision them if
            # present.
            if rule.template and rule.template['vars']:
                templated_tests = template_builder.get_all_tests(
                    rule.id_, rule.template, local_env_yaml)
                all_tests.update(templated_tests)
                template_name = rule.template['name']

            # Add additional tests from the local rule directory. Note that,
            # like the behavior in template_tests, this will overwrite any
            # templated tests with the same file name.
            tests_dir = os.path.join(dirpath, "tests")
            if os.path.exists(tests_dir):
                tests_dir_files = os.listdir(tests_dir)
                for test_case in tests_dir_files:
                    test_path = os.path.join(tests_dir, test_case)
                    if os.path.isdir(test_path):
                        continue

                    all_tests[test_case] = process_file_with_macros(test_path, local_env_yaml)

            # Filter out everything except the shell test scenarios.
            # Other files in rule directories are editor swap files
            # or other content than a test case.
            allowed_scripts = filter(lambda x: x.endswith(".sh"), all_tests)
            content_mapping = {x: all_tests[x] for x in allowed_scripts}

            # Skip any rules that lack any content. This ensures that if we
            # end up with rules with a template lacking tests and without any
            # rule directory tests, we don't include the empty rule here.
            if not content_mapping:
                continue

            full_rule_id = OSCAP_RULE + short_rule_id
            result = Rule(
                directory=tests_dir, id=full_rule_id, short_id=short_rule_id,
                files=content_mapping, template=template_name)
            yield result
608
609
610 1
def get_cpe_of_tested_os(test_env, log_file):
    """
    Read the CPE_NAME entry of /etc/os-release on the tested machine over
    SSH and return its value.

    Raises RuntimeError when no parsable CPE entry can be obtained.
    """
    os_release_file = "/etc/os-release"
    cpe_line = test_env.execute_ssh_command(
        "grep CPE_NAME {os_release_file}".format(os_release_file=os_release_file),
        log_file)
    # We are parsing an assignment that is possibly quoted
    match = re.match(r'''CPE_NAME=(["']?)(.*)\1''', cpe_line)
    if match and match.group(2):
        return match.group(2)

    # No usable CPE - explain which of the two failure modes happened.
    msg = ["Unable to get a CPE of the system running tests"]
    if cpe_line:
        msg.append(
            "Retreived a CPE line that we couldn't parse: {cpe_line}"
            .format(cpe_line=cpe_line))
    else:
        msg.append(
            "Couldn't get CPE entry from '{os_release_file}'"
            .format(os_release_file=os_release_file))
    raise RuntimeError("\n".join(msg))
629
630
631 1
# Mapping from platform keyword (as returned by cpes_to_platform) to the
# command prefix used to install packages on that platform.
INSTALL_COMMANDS = dict(
    fedora=("dnf", "install", "-y"),
    rhel7=("yum", "install", "-y"),
    rhel8=("yum", "install", "-y"),
    rhel9=("yum", "install", "-y"),
    ubuntu=("DEBIAN_FRONTEND=noninteractive", "apt", "install", "-y"),
)
638
639
640 1
def install_packages(test_env, packages):
    """
    Install the given packages on the tested machine, using the installer
    appropriate for its platform (deduced from its CPE).
    """
    log_path = os.path.join(LogHelper.LOG_DIR, "env-preparation.log")

    # Find out which platform we are installing on before composing the
    # install command.
    with open(log_path, "a") as prep_log:
        tested_cpe = get_cpe_of_tested_os(test_env, prep_log)
    platform = cpes_to_platform([tested_cpe])

    install_command = " ".join(INSTALL_COMMANDS[platform] + tuple(packages))

    with open(log_path, 'a') as prep_log:
        print("Installing packages", file=prep_log)
        prep_log.flush()
        test_env.execute_ssh_command(
            install_command, prep_log,
            "Couldn't install required packages {packages}".format(packages=packages))
655
656
657 1
def cpes_to_platform(cpes):
    """
    Deduce the platform keyword ("fedora", "rhel<major>" or "ubuntu") from
    a list of CPE identifiers.

    Raises ValueError when no known platform can be deduced.
    """
    for cpe in cpes:
        if "fedora" in cpe:
            return "fedora"
        if "redhat:enterprise_linux" in cpe:
            # Only the major version matters for picking the installer.
            version_match = re.search(r":enterprise_linux:([^:]+):", cpe)
            if version_match:
                major_version = version_match.group(1).split(".")[0]
                return "rhel" + major_version
        if "ubuntu" in cpe:
            return "ubuntu"
    raise ValueError(
        "Unable to deduce a platform from these CPEs: {cpes}".format(cpes=cpes))
670