Test Failed
Push to master (09b8ac...ee796b) by Jan
04:42 queued 12s

tests.ssg_test_suite.common   F (module rating)

Complexity

Total Complexity 96

Size/Duplication

Total Lines 672
Duplicated Lines 0 %

Test Coverage

Coverage 39.63%

Importance

Changes 0

Metric   Value
eloc     399      (executable lines of code)
dl       0        (duplicated lines)
loc      672      (total lines)
ccs      134      (covered statements)
cts      338      (total statements)
cp       0.3963   (code coverage, the 39.63% shown above)
rs       2
c        0
b        0
f        0
wmc      96       (weighted method count, the total complexity shown above)

20 Functions

Rating   Name   Duplication   Size   Complexity  
A _run_cmd() 0 11 3
A run_cmd_local() 0 5 1
A run_with_stdout_logging() 0 12 3
A _exclude_garbage() 0 7 3
A walk_through_benchmark_dirs() 0 12 4
B _get_platform_cpes() 0 31 5
A matches_platform() 0 7 3
A _make_file_root_owned() 0 7 2
B template_rule_tests() 0 38 8
F iterate_over_rules() 0 91 14
A get_cpe_of_tested_os() 0 19 4
A load_rule_and_env() 0 31 1
A get_product_context() 0 25 2
A write_rule_templated_tests() 0 18 4
A install_packages() 0 15 3
B create_tarball() 0 33 7
B write_rule_dir_tests() 0 38 6
A send_scripts() 0 24 2
A template_tests() 0 38 4
B cpes_to_platform() 0 13 6

7 Methods

Rating   Name   Duplication   Size   Complexity  
A RuleResult.relative_conditions_to() 0 5 2
A RuleResult.__eq__() 0 3 1
A RuleResult.load_from_dict() 0 15 2
A RuleResult.__lt__() 0 2 1
A RuleResult.__init__() 0 10 2
A RuleResult.record_stage_result() 0 6 1
A RuleResult.save_to_dict() 0 16 2

How to fix: Complexity

Complex classes like tests.ssg_test_suite.common often do a lot of different things. To break such a class down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster. A sketch of Extract Class applied to this module follows.
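For illustration, here is a minimal, hypothetical sketch of Extract Class applied to this module: the tarball helpers (_exclude_garbage, _make_file_root_owned and the packing loop of create_tarball) form one cohesive component. The class and method names are the editor's invention, not the project's API.

import os
import tarfile
import tempfile


class ScenarioTarballBuilder(object):
    """Packs shared files and per-rule test scenarios into one tarball."""

    def __init__(self, shared_dir):
        self.shared_dir = shared_dir

    @staticmethod
    def _exclude_garbage(tarinfo):
        # Skip compiled Python files and editor swap files.
        if tarinfo and tarinfo.name.endswith(('pyc', 'swp')):
            return None
        return tarinfo

    @staticmethod
    def _make_file_root_owned(tarinfo):
        if tarinfo:
            tarinfo.uid = 0
            tarinfo.gid = 0
            tarinfo.mode = 0o775  # rwxrwxr-x
        return tarinfo

    def _filter(self, tarinfo):
        # Combine both tarinfo transformations into a single tar filter.
        return self._exclude_garbage(self._make_file_root_owned(tarinfo))

    def build(self, templated_tests_dir):
        # Return the path of a .tar.gz holding shared/ plus one directory
        # per rule, mirroring what create_tarball() does today.
        with tempfile.NamedTemporaryFile(
                "wb", suffix=".tar.gz", delete=False) as fp:
            with tarfile.TarFile.open(fileobj=fp, mode="w") as tarball:
                tarball.add(self.shared_dir, arcname="shared",
                            filter=self._filter)
                for rule_id in os.listdir(templated_tests_dir):
                    absolute_dir = os.path.join(templated_tests_dir, rule_id)
                    if os.path.isdir(absolute_dir):
                        tarball.add(absolute_dir, arcname=rule_id,
                                    filter=self._filter)
            return fp.name

The full covered source of tests.ssg_test_suite.common follows.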

from __future__ import print_function

import os
import logging
import subprocess
from collections import namedtuple
import functools
import tarfile
import tempfile
import re
import shutil

from ssg.build_cpe import ProductCPEs
from ssg.build_yaml import Rule as RuleYAML
from ssg.constants import MULTI_PLATFORM_MAPPING
from ssg.constants import FULL_NAME_TO_PRODUCT_MAPPING
from ssg.constants import OSCAP_RULE
from ssg.jinja import process_file_with_macros
from ssg.products import product_yaml_path, load_product_yaml
from ssg.rules import get_rule_dir_yaml, is_rule_dir
from ssg.rule_yaml import parse_prodtype
from ssg_test_suite.log import LogHelper

import ssg.templates

Scenario_run = namedtuple(
    "Scenario_run",
    ("rule_id", "script"))
Scenario_conditions = namedtuple(
    "Scenario_conditions",
    ("backend", "scanning_mode", "remediated_by", "datastream"))
Rule = namedtuple(
    "Rule", ["directory", "id", "short_id", "files", "template"])

SSG_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))

_BENCHMARK_DIRS = [
        os.path.abspath(os.path.join(SSG_ROOT, 'linux_os', 'guide')),
        os.path.abspath(os.path.join(SSG_ROOT, 'applications')),
        ]

_SHARED_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '../shared'))

_SHARED_TEMPLATES = os.path.abspath(os.path.join(SSG_ROOT, 'shared/templates'))

REMOTE_USER = "root"
REMOTE_USER_HOME_DIRECTORY = "/root"
REMOTE_TEST_SCENARIOS_DIRECTORY = os.path.join(REMOTE_USER_HOME_DIRECTORY, "ssgts")

try:
    SSH_ADDITIONAL_OPTS = tuple(os.environ.get('SSH_ADDITIONAL_OPTIONS').split())
except AttributeError:
    # If SSH_ADDITIONAL_OPTIONS is not defined, set it to an empty tuple.
    SSH_ADDITIONAL_OPTS = tuple()

SSH_ADDITIONAL_OPTS = (
    "-o", "StrictHostKeyChecking=no",
    "-o", "UserKnownHostsFile=/dev/null",
) + SSH_ADDITIONAL_OPTS

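# Editor's note (invocation illustrative): the host-key options above are
# combined with whatever SSH_ADDITIONAL_OPTIONS supplies, so e.g.
#   SSH_ADDITIONAL_OPTIONS="-p 2222" python test_suite.py ...
# would make every ssh/scp invocation that uses SSH_ADDITIONAL_OPTS
# connect to port 2222 as well.
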
def walk_through_benchmark_dirs(product=None):
    directories = _BENCHMARK_DIRS
    if product is not None:
        yaml_path = product_yaml_path(SSG_ROOT, product)
        product_base = os.path.dirname(yaml_path)
        product_yaml = load_product_yaml(yaml_path)
        benchmark_root = os.path.join(product_base, product_yaml['benchmark_root'])
        directories = [os.path.abspath(benchmark_root)]

    for dirname in directories:
        for dirpath, dirnames, filenames in os.walk(dirname):
            yield dirpath, dirnames, filenames

class Stage(object):
    NONE = 0
    PREPARATION = 1
    INITIAL_SCAN = 2
    REMEDIATION = 3
    FINAL_SCAN = 4

@functools.total_ordering
class RuleResult(object):
    """
    Result of a test suite testing rule under a scenario.

    Supports ordering by success - the most successful run orders first.
    """

    STAGE_STRINGS = {
        "preparation",
        "initial_scan",
        "remediation",
        "final_scan",
    }

    def __init__(self, result_dict=None):
        self.scenario = Scenario_run("", "")
        self.conditions = Scenario_conditions("", "", "", "")
        self.when = ""
        self.passed_stages = dict()
        self.passed_stages_count = 0
        self.success = False

        if result_dict:
            self.load_from_dict(result_dict)

    def load_from_dict(self, data):
        self.scenario = Scenario_run(data["rule_id"], data["scenario_script"])
        self.conditions = Scenario_conditions(
            data["backend"], data["scanning_mode"],
            data["remediated_by"], data["datastream"])
        self.when = data["run_timestamp"]

        self.passed_stages = {key: data[key] for key in self.STAGE_STRINGS if key in data}
        self.passed_stages_count = sum(self.passed_stages.values())

        self.success = data.get("final_scan", False)
        if not self.success:
            self.success = (
                "remediation" not in data
                and data.get("initial_scan", False))

    def save_to_dict(self):
        data = dict()
        data["rule_id"] = self.scenario.rule_id
        data["scenario_script"] = self.scenario.script

        data["backend"] = self.conditions.backend
        data["scanning_mode"] = self.conditions.scanning_mode
        data["remediated_by"] = self.conditions.remediated_by
        data["datastream"] = self.conditions.datastream

        data["run_timestamp"] = self.when

        for stage_str, result in self.passed_stages.items():
            data[stage_str] = result

        return data

    def record_stage_result(self, stage, successful):
        assert stage in self.STAGE_STRINGS, (
            "Stage name {name} is invalid, choose one from {choices}"
            .format(name=stage, choices=", ".join(self.STAGE_STRINGS))
        )
        self.passed_stages[stage] = successful

    def relative_conditions_to(self, other):
        if self.conditions == other.conditions:
            return self.when, other.when
        else:
            return tuple(self.conditions), tuple(other.conditions)

    def __eq__(self, other):
        return (self.success == other.success
                and tuple(self.passed_stages) == tuple(other.passed_stages))

    def __lt__(self, other):
        return self.passed_stages_count > other.passed_stages_count

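# Editor's note (illustrative): @functools.total_ordering plus __lt__ above
# mean "less than" is "more stages passed", so sorted() puts the most
# successful result first:
#   ordered = sorted(rule_results)   # rule_results: list of RuleResult
#   best = ordered[0]                # highest passed_stages_count
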
def run_cmd_local(command, verbose_path, env=None):
    command_string = ' '.join(command)
    logging.debug('Running {}'.format(command_string))
    returncode, output = _run_cmd(command, verbose_path, env)
    return returncode, output


def _run_cmd(command_list, verbose_path, env=None):
    returncode = 0
    output = b""
    try:
        with open(verbose_path, 'w') as verbose_file:
            output = subprocess.check_output(
                command_list, stderr=verbose_file, env=env)
    except subprocess.CalledProcessError as e:
        returncode = e.returncode
        output = e.output
    return returncode, output.decode('utf-8')

def _get_platform_cpes(platform):
    ssg_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
    if platform.startswith("multi_platform_"):
        try:
            products = MULTI_PLATFORM_MAPPING[platform]
        except KeyError:
            logging.error(
                "Unknown multi_platform specifier: %s is not from %s"
                % (platform, ", ".join(MULTI_PLATFORM_MAPPING.keys())))
            raise ValueError
        platform_cpes = set()
        for p in products:
            product_yaml_path = os.path.join(ssg_root, "products", p, "product.yml")
            product_yaml = load_product_yaml(product_yaml_path)
            p_cpes = ProductCPEs(product_yaml)
            platform_cpes |= set(p_cpes.get_product_cpe_names())
        return platform_cpes
    else:
        # scenario platform is specified by a full product name
        try:
            product = FULL_NAME_TO_PRODUCT_MAPPING[platform]
        except KeyError:
            logging.error(
                "Unknown product name: %s is not from %s"
                % (platform, ", ".join(FULL_NAME_TO_PRODUCT_MAPPING.keys())))
            raise ValueError
        product_yaml_path = os.path.join(ssg_root, "products", product, "product.yml")
        product_yaml = load_product_yaml(product_yaml_path)
        product_cpes = ProductCPEs(product_yaml)
        platform_cpes = set(product_cpes.get_product_cpe_names())
        return platform_cpes

def matches_platform(scenario_platforms, benchmark_cpes):
    if "multi_platform_all" in scenario_platforms:
        return True
    scenario_cpes = set()
    for p in scenario_platforms:
        scenario_cpes |= _get_platform_cpes(p)
    return len(scenario_cpes & benchmark_cpes) > 0

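# Editor's sketch (CPE name illustrative): a scenario limited to
# "multi_platform_rhel" applies to a RHEL 9 benchmark when the rhel9
# product declares the benchmark's CPE:
#   matches_platform(["multi_platform_rhel"],
#                    {"cpe:/o:redhat:enterprise_linux:9"})  # -> True
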
def run_with_stdout_logging(command, args, log_file):
    log_file.write("{0} {1}\n".format(command, " ".join(args)))
    result = subprocess.run(
            (command,) + args, encoding="utf-8", stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, check=False)
    if result.stdout:
        log_file.write("STDOUT: ")
        log_file.write(result.stdout)
    if result.stderr:
        log_file.write("STDERR: ")
        log_file.write(result.stderr)
    return result

def _exclude_garbage(tarinfo):
    file_name = tarinfo.name
    if file_name.endswith('pyc'):
        return None
    if file_name.endswith('swp'):
        return None
    return tarinfo


def _make_file_root_owned(tarinfo):
    if tarinfo:
        tarinfo.uid = 0
        tarinfo.gid = 0
        # set permission to 775 (rwxrwxr-x)
        tarinfo.mode = 0o775
    return tarinfo

def get_product_context(product=None):
    """
    Returns a product YAML context if any product is specified. Hard-coded to
    assume a debug build.
    """
    # Load the product's YAML file if present. This will allow us to parse
    # tests in the context of the product we're executing under.
    product_yaml = dict()
    if product:
        yaml_path = product_yaml_path(SSG_ROOT, product)
        product_yaml = load_product_yaml(yaml_path)

    # We could run into a DocumentationNotComplete error when loading a
    # rule's YAML contents. However, because the test suite isn't executed
    # in the context of a particular build (though, ideally, it would be
    # linked), we may not know exactly whether the top-level rule/profile
    # we're testing is actually complete. Thus, forcibly set the required
    # property to bypass this error.
    product_yaml['cmake_build_type'] = 'Debug'

    # Set the Jinja processing environment to Test Suite; this allows Jinja
    # macros to behave differently at content build time and at test time.
    product_yaml['SSG_TEST_SUITE_ENV'] = True

    return product_yaml

def load_rule_and_env(rule_dir_path, env_yaml, product=None):
    """
    Loads a rule and returns the combination of the RuleYAML class and
    the corresponding local environment for that rule.
    """

    # First build the path to the rule.yml file
    rule_path = get_rule_dir_yaml(rule_dir_path)

    # Load rule content in our environment. We use this to satisfy
    # some implied properties that might be used in the test suite.
    # Make sure we normalize to a specific product as well so that
    # when we load templated content it is correct.
    rule = RuleYAML.from_yaml(rule_path, env_yaml)
    rule.normalize(product)

    # Note that most places would check prodtype, but we don't care
    # about that here: if the rule is available to the product, we
    # load and parse it anyway, as we have no knowledge of the
    # top-level profile or rule passed into the test suite.
    prodtypes = parse_prodtype(rule.prodtype)

    # Our local copy of env_yaml needs some properties from rule.yml
    # for completeness.
    local_env_yaml = dict()
    local_env_yaml.update(env_yaml)
    local_env_yaml['rule_id'] = rule.id_
    local_env_yaml['rule_title'] = rule.title
    local_env_yaml['products'] = prodtypes

    return rule, local_env_yaml

def write_rule_templated_tests(dest_path, relative_path, test_content):
    output_path = os.path.join(dest_path, relative_path)

    # If there's a separator in the file name, it means we have nested
    # directories to deal with.
    if os.path.sep in relative_path:
        parts = relative_path.split(os.path.sep)[:-1]
        for subdir_index in range(len(parts)):
            # We need to expand all directories in the correct order,
            # preserving any previous directories (as they're nested).
            # Use the star operator to splat array parts into arguments
            # to os.path.join(...).
            new_directory = os.path.join(dest_path, *parts[:subdir_index + 1])
            os.mkdir(new_directory)

    # Write out the test content to the desired location on disk.
    with open(output_path, 'w') as output_fp:
        print(test_content, file=output_fp)

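# Editor's note (file name illustrative): for
#   write_rule_templated_tests(dest, "ovals/installed.pass.sh", content)
# the loop above first creates <dest>/ovals, and the content then lands in
# <dest>/ovals/installed.pass.sh.
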
def write_rule_dir_tests(local_env_yaml, dest_path, dirpath):
    # Walk the test directory, writing all tests into the output
    # directory, recursively.
    tests_dir_path = os.path.join(dirpath, "tests")
    tests_dir_path = os.path.abspath(tests_dir_path)

    # Note that the tests/ directory may not always exist any more. In
    # particular, when a rule uses a template, tests may be present there
    # but not present in the actual rule directory.
    if not os.path.exists(tests_dir_path):
        return

    for dirpath, dirnames, filenames in os.walk(tests_dir_path):
        for dirname in dirnames:
            # We want to recreate the correct path under the temporary
            # directory. Resolve it to a relative path from the tests/
            # directory.
            dir_path = os.path.relpath(os.path.join(dirpath, dirname), tests_dir_path)
            tmp_dir_path = os.path.join(dest_path, dir_path)
            os.mkdir(tmp_dir_path)

        for filename in filenames:
            # We want to recreate the correct path under the temporary
            # directory. Resolve it to a relative path from the tests/
            # directory. Assumption: directories should be created
            # prior to recursing into them, so we don't need to handle
            # the case where a file's parent directory doesn't yet exist
            # under the destination.
            src_test_path = os.path.join(dirpath, filename)
            rel_test_path = os.path.relpath(src_test_path, tests_dir_path)
            dest_test_path = os.path.join(dest_path, rel_test_path)

            # Rather than performing an OS-level copy, we need to
            # first parse the test with jinja and then write it back
            # out to the destination.
            parsed_test = process_file_with_macros(src_test_path, local_env_yaml)
            with open(dest_test_path, 'w') as output_fp:
                print(parsed_test, file=output_fp)

def template_rule_tests(product, product_yaml, template_builder, tmpdir, dirpath):
    """
    For a given rule directory, templates all contained tests into the output
    (tmpdir) directory.
    """

    # Load the rule and its environment
    rule, local_env_yaml = load_rule_and_env(dirpath, product_yaml, product)

    # Before we get too far, we wish to search the rule YAML to see if
    # it is applicable to the current product. If we have a product
    # and the rule isn't applicable for the product, there's no point
    # in continuing with the rest of the loading. This should speed up
    # the loading of the templated tests. Note that we've already
    # parsed the prodtype into local_env_yaml.
    if product and local_env_yaml['products']:
        prodtypes = local_env_yaml['products']
        if "all" not in prodtypes and product not in prodtypes:
            return

    # Create the destination directory.
    dest_path = os.path.join(tmpdir, rule.id_)
    os.mkdir(dest_path)

    # The priority order is rule-specific tests over templated tests.
    # That is, for any test under rule_id/tests with a name matching a
    # test under shared/templates/<template_name>/tests/, the former
    # will be preferred. This means we need to process templates first,
    # so they'll be overwritten later if necessary.
    if rule.template and rule.template['vars']:
        templated_tests = template_builder.get_all_tests(rule.id_, rule.template,
                                                         local_env_yaml)

        for relative_path in templated_tests:
            test_content = templated_tests[relative_path]
            write_rule_templated_tests(dest_path, relative_path, test_content)

    write_rule_dir_tests(local_env_yaml, dest_path, dirpath)

def template_tests(product=None):
    """
    Create a temporary directory with test cases parsed via jinja using
    product-specific context.
    """
    # Set up an empty temp directory
    tmpdir = tempfile.mkdtemp()

    # We want to remove the temporary directory on failure, but preserve
    # it on success. Wrap in a try/except block and reraise the original
    # exception after removing the temporary directory.
    try:
        # Load the product context we're executing under, if any.
        product_yaml = get_product_context(product)

        # Initialize a mock template_builder.
        empty = "/ssgts/empty/placeholder"
        template_builder = ssg.templates.Builder(product_yaml, empty,
                                                 _SHARED_TEMPLATES, empty,
                                                 empty)

        # Note that we're not exactly copying 1-for-1 the contents of the
        # directory structure into the temporary one. Instead we want a
        # flattened mapping with all rules in a single top-level directory
        # and all tests immediately contained within it. That is:
        #
        # /group_a/rule_a/tests/something.pass.sh -> /rule_a/something.pass.sh
        for dirpath, dirnames, _ in walk_through_benchmark_dirs(product):
            # Skip anything that isn't obviously a rule.
            if not is_rule_dir(dirpath):
                continue

            template_rule_tests(product, product_yaml, template_builder, tmpdir, dirpath)
    except Exception as exp:
        shutil.rmtree(tmpdir, ignore_errors=True)
        raise exp

    return tmpdir

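# Editor's note (rule/file names illustrative): after
#   tmpdir = template_tests(product="rhel9")
# the layout is flat, e.g. <tmpdir>/accounts_tmout/correct_value.pass.sh,
# one directory per rule with its rendered tests directly inside.
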
def create_tarball(product):
    """Create a tarball which contains all test scenarios for every rule.

    The tarball contains directories with the test scenarios. Each directory
    is named after the short rule ID. There is no tree structure.
    """
    templated_tests = template_tests(product=product)

    try:
        with tempfile.NamedTemporaryFile(
                "wb", suffix=".tar.gz", delete=False) as fp:
            with tarfile.TarFile.open(fileobj=fp, mode="w") as tarball:
                tarball.add(_SHARED_DIR, arcname="shared", filter=_make_file_root_owned)
                for rule_id in os.listdir(templated_tests):
                    # When a top-level directory exists under the temporary
                    # templated tests directory, we've already validated that
                    # it is a valid rule directory. Thus we can simply add it
                    # to the tarball.
                    absolute_dir = os.path.join(templated_tests, rule_id)
                    if not os.path.isdir(absolute_dir):
                        continue

                    tarball.add(
                        absolute_dir, arcname=rule_id,
                        filter=lambda tinfo: _exclude_garbage(_make_file_root_owned(tinfo))
                    )

            # Since we've added the templated contents into the tarball, we
            # can now delete the tree.
            shutil.rmtree(templated_tests, ignore_errors=True)
            return fp.name
    except Exception as exp:
        shutil.rmtree(templated_tests, ignore_errors=True)
        raise exp

def send_scripts(test_env):
    remote_dir = REMOTE_TEST_SCENARIOS_DIRECTORY
    archive_file = create_tarball(test_env.product)
    archive_file_basename = os.path.basename(archive_file)
    remote_archive_file = os.path.join(remote_dir, archive_file_basename)
    logging.debug("Uploading scripts.")
    log_file_name = os.path.join(LogHelper.LOG_DIR, "env-preparation.log")

    with open(log_file_name, 'a') as log_file:
        print("Setting up test setup scripts", file=log_file)

        test_env.execute_ssh_command(
            "mkdir -p {remote_dir}".format(remote_dir=remote_dir),
            log_file, "Cannot create directory {0}".format(remote_dir))
        test_env.scp_upload_file(
            archive_file, remote_dir,
            log_file, "Cannot copy archive {0} to the target machine's directory {1}"
            .format(archive_file, remote_dir))
        test_env.execute_ssh_command(
            "tar xf {remote_archive_file} -C {remote_dir}"
            .format(remote_dir=remote_dir, remote_archive_file=remote_archive_file),
            log_file, "Cannot extract data tarball {0}".format(remote_archive_file))
    os.unlink(archive_file)
    return remote_dir

def iterate_over_rules(product=None):
    """Iterate over rule directories which have test scenarios.

    Returns:
        Named tuple Rule having these fields:
            directory -- absolute path to the rule "tests" subdirectory
                         containing the test scenarios in Bash
            id -- full rule id as it is present in datastream
            short_id -- short rule ID, the same as basename of the directory
                        containing the test scenarios in Bash
            files -- mapping of .sh test scenario file names to their
                     (templated) contents
            template -- name of the rule's template, or None
    """

    # Here we need to perform some magic to handle parsing the rule (from a
    # product perspective) and loading any templated tests. In particular,
    # identifying which tests to potentially run involves invoking the
    # templating engine.
    #
    # Begin by loading context about our execution environment, if any.
    product_yaml = get_product_context(product)

    # Initialize a mock template_builder.
    empty = "/ssgts/empty/placeholder"
    template_builder = ssg.templates.Builder(product_yaml, empty,
                                             _SHARED_TEMPLATES, empty, empty)

    for dirpath, dirnames, filenames in walk_through_benchmark_dirs(product):
        if is_rule_dir(dirpath):
            short_rule_id = os.path.basename(dirpath)

            # Load the rule itself to check for a template.
            rule, local_env_yaml = load_rule_and_env(dirpath, product_yaml, product)
            template_name = None

            # Before we get too far, we wish to search the rule YAML to see if
            # it is applicable to the current product. If we have a product
            # and the rule isn't applicable for the product, there's no point
            # in continuing with the rest of the loading. This should speed up
            # the loading of the templated tests. Note that we've already
            # parsed the prodtype into local_env_yaml.
            if product and local_env_yaml['products']:
                prodtypes = local_env_yaml['products']
                if "all" not in prodtypes and product not in prodtypes:
                    continue

            # all_tests is a mapping from path (in the tarball) to the
            # contents of the test case. This is necessary because later code
            # (which attempts to parse headers from the test case) doesn't
            # have easy access to templated content. By reading it and
            # returning it here, we can save later code from having to
            # understand the templating system.
            all_tests = dict()

            # Start by checking for templated tests and provision them if
            # present.
            if rule.template and rule.template['vars']:
                templated_tests = template_builder.get_all_tests(
                    rule.id_, rule.template, local_env_yaml)
                all_tests.update(templated_tests)
                template_name = rule.template['name']

            # Add additional tests from the local rule directory. Note that,
            # like the behavior in template_tests, this will overwrite any
            # templated tests with the same file name.
            tests_dir = os.path.join(dirpath, "tests")
            if os.path.exists(tests_dir):
                tests_dir_files = os.listdir(tests_dir)
                for test_case in tests_dir_files:
                    test_path = os.path.join(tests_dir, test_case)
                    if os.path.isdir(test_path):
                        continue

                    all_tests[test_case] = process_file_with_macros(test_path, local_env_yaml)

            # Filter out everything except the shell test scenarios.
            # Other files in rule directories are editor swap files
            # or other content than a test case.
            allowed_scripts = filter(lambda x: x.endswith(".sh"), all_tests)
            content_mapping = {x: all_tests[x] for x in allowed_scripts}

            # Skip any rules that lack any content. This ensures that if we
            # end up with rules with a template lacking tests and without any
            # rule directory tests, we don't include the empty rule here.
            if not content_mapping:
                continue

            full_rule_id = OSCAP_RULE + short_rule_id
            result = Rule(
                directory=tests_dir, id=full_rule_id, short_id=short_rule_id,
                files=content_mapping, template=template_name)
            yield result

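# Editor's sketch (product name illustrative): consuming the generator
#   for rule in iterate_over_rules(product="rhel9"):
#       print(rule.short_id, rule.template, sorted(rule.files))
# lists every testable rule with its template (or None) and the names of
# its .sh scenarios.
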
def get_cpe_of_tested_os(test_env, log_file):
    os_release_file = "/etc/os-release"
    cpe_line = test_env.execute_ssh_command(
        "grep CPE_NAME {os_release_file}".format(os_release_file=os_release_file),
        log_file)
    # We are parsing an assignment that is possibly quoted
    cpe = re.match(r'''CPE_NAME=(["']?)(.*)\1''', cpe_line)
    if cpe and cpe.groups()[1]:
        return cpe.groups()[1]
    msg = ["Unable to get a CPE of the system running tests"]
    if cpe_line:
        msg.append(
            "Retrieved a CPE line that we couldn't parse: {cpe_line}"
            .format(cpe_line=cpe_line))
    else:
        msg.append(
            "Couldn't get CPE entry from '{os_release_file}'"
            .format(os_release_file=os_release_file))
    raise RuntimeError("\n".join(msg))

INSTALL_COMMANDS = dict(
634
    fedora=("dnf", "install", "-y"),
635
    rhel7=("yum", "install", "-y"),
636
    rhel8=("yum", "install", "-y"),
637
    rhel9=("yum", "install", "-y"),
638
    ubuntu=("DEBIAN_FRONTEND=noninteractive", "apt", "install", "-y"),
639
)
640
641
def install_packages(test_env, packages):
    log_file_name = os.path.join(LogHelper.LOG_DIR, "env-preparation.log")

    with open(log_file_name, "a") as log_file:
        platform_cpe = get_cpe_of_tested_os(test_env, log_file)
    platform = cpes_to_platform([platform_cpe])

    command_str = " ".join(INSTALL_COMMANDS[platform] + tuple(packages))

    with open(log_file_name, 'a') as log_file:
        print("Installing packages", file=log_file)
        log_file.flush()
        test_env.execute_ssh_command(
            command_str, log_file,
            "Couldn't install required packages {packages}".format(packages=packages))

def cpes_to_platform(cpes):
    for cpe in cpes:
        if "fedora" in cpe:
            return "fedora"
        if "redhat:enterprise_linux" in cpe:
            match = re.search(r":enterprise_linux:([^:]+):", cpe)
            if match:
                major_version = match.groups()[0].split(".")[0]
                return "rhel" + major_version
        if "ubuntu" in cpe:
            return "ubuntu"
    msg = "Unable to deduce a platform from these CPEs: {cpes}".format(cpes=cpes)
    raise ValueError(msg)

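# Editor's note (CPE illustrative):
#   cpes_to_platform(["cpe:/o:redhat:enterprise_linux:8.6::baseos"])
# matches ":enterprise_linux:8.6:", keeps the major version, and returns
# "rhel8"; a list with no recognized CPE raises ValueError.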