tests.ssg_test_suite.common   F
last analyzed

Complexity

Total Complexity 103

Size/Duplication

Total Lines 650
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 454
dl 0
loc 650
rs 2
c 0
b 0
f 0
wmc 103

29 Functions

Rating   Name   Duplication   Size   Complexity  
A _run_cmd() 0 11 3
A _match_rhel_version() 0 10 4
B fetch_all_templated_tests_paths() 0 35 6
A _make_file_root_owned() 0 7 2
A run_cmd_local() 0 5 1
B cpe_to_platform() 0 13 6
A run_with_stdout_logging() 0 12 3
A get_cpe_of_tested_os() 0 19 4
A fetch_templated_tests_paths() 0 12 3
A load_templated_tests() 0 8 2
A select_templated_tests() 0 23 3
A get_test_dir_config() 0 6 2
A _exclude_garbage() 0 7 3
A load_rule_and_env() 0 31 1
A get_product_context() 0 27 2
A retry_with_stdout_logging() 0 9 3
A get_prefixed_name() 0 2 1
A install_packages() 0 15 3
A walk_through_benchmark_dirs() 0 12 4
B create_tarball() 0 41 8
A fetch_local_tests_paths() 0 15 5
B _get_platform_cpes() 0 35 5
A load_test() 0 15 2
A load_local_tests() 0 7 2
A file_known_as_useless() 0 2 1
A send_scripts() 0 24 2
A cpes_to_platform() 0 7 3
A write_rule_test_content_to_dir() 0 9 5
A matches_platform() 0 7 3

7 Methods

Rating   Name   Duplication   Size   Complexity  
A RuleResult.relative_conditions_to() 0 5 2
A RuleResult.__eq__() 0 3 1
A RuleResult.load_from_dict() 0 15 2
A RuleResult.__lt__() 0 2 1
A RuleResult.__init__() 0 10 2
A RuleResult.record_stage_result() 0 6 1
A RuleResult.save_to_dict() 0 16 2

How to fix   Complexity   

Complexity

Complex classes like tests.ssg_test_suite.common often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from __future__ import print_function
2
3
import functools
4
import logging
5
import os
6
import re
7
import shutil
8
import subprocess
9
import tarfile
10
import tempfile
11
import time
12
from collections import namedtuple
13
14
import ssg.yaml
15
from ssg.build_cpe import ProductCPEs
16
from ssg.build_yaml import Rule as RuleYAML
17
from ssg.constants import MULTI_PLATFORM_MAPPING
18
from ssg.constants import FULL_NAME_TO_PRODUCT_MAPPING
19
from ssg.constants import OSCAP_RULE
20
from ssg.jinja import process_file_with_macros
21
from ssg.products import product_yaml_path, load_product_yaml
22
from ssg.rules import get_rule_dir_yaml, is_rule_dir
23
from ssg.rule_yaml import parse_prodtype
24
from ssg.utils import mkdir_p
25
from ssg_test_suite.log import LogHelper
26
27
import ssg.templates
28
29
30
Scenario_run = namedtuple(
31
    "Scenario_run",
32
    ("rule_id", "script"))
33
Scenario_conditions = namedtuple(
34
    "Scenario_conditions",
35
    ("backend", "scanning_mode", "remediated_by", "datastream"))
36
37
SSG_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
38
39
_BENCHMARK_DIRS = [
40
        os.path.abspath(os.path.join(SSG_ROOT, 'linux_os', 'guide')),
41
        os.path.abspath(os.path.join(SSG_ROOT, 'applications')),
42
        ]
43
44
_SHARED_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '../shared'))
45
46
_SHARED_TEMPLATES = os.path.abspath(os.path.join(SSG_ROOT, 'shared/templates'))
47
48
TEST_SUITE_NAME="ssgts"
49
TEST_SUITE_PREFIX = "_{}".format(TEST_SUITE_NAME)
50
REMOTE_USER = "root"
51
REMOTE_USER_HOME_DIRECTORY = "/root"
52
REMOTE_TEST_SCENARIOS_DIRECTORY = os.path.join(REMOTE_USER_HOME_DIRECTORY, TEST_SUITE_NAME)
53
54
try:
55
    SSH_ADDITIONAL_OPTS = tuple(os.environ.get('SSH_ADDITIONAL_OPTIONS').split())
56
except AttributeError:
57
    # If SSH_ADDITIONAL_OPTIONS is not defined set it to empty tuple.
58
    SSH_ADDITIONAL_OPTS = tuple()
59
60
SSH_ADDITIONAL_OPTS = (
61
    "-o", "StrictHostKeyChecking=no",
62
    "-o", "UserKnownHostsFile=/dev/null",
63
) + SSH_ADDITIONAL_OPTS
64
65
TESTS_CONFIG_NAME = "test_config.yml"
66
67
68
def walk_through_benchmark_dirs(product=None):
69
    directories = _BENCHMARK_DIRS
70
    if product is not None:
71
        yaml_path = product_yaml_path(SSG_ROOT, product)
72
        product_base = os.path.dirname(yaml_path)
73
        product_yaml = load_product_yaml(yaml_path)
74
        benchmark_root = os.path.join(product_base, product_yaml['benchmark_root'])
75
        directories = [os.path.abspath(benchmark_root)]
76
77
    for dirname in directories:
78
        for dirpath, dirnames, filenames in os.walk(dirname):
79
            yield dirpath, dirnames, filenames
80
81
82
class Stage(object):
83
    NONE = 0
84
    PREPARATION = 1
85
    INITIAL_SCAN = 2
86
    REMEDIATION = 3
87
    FINAL_SCAN = 4
88
89
90
@functools.total_ordering
91
class RuleResult(object):
92
    STAGE_STRINGS = {
93
        "preparation",
94
        "initial_scan",
95
        "remediation",
96
        "final_scan",
97
    }
98
99
    """
100
    Result of a test suite testing rule under a scenario.
101
102
    Supports ordering by success - the most successful run orders first.
103
    """
104
    def __init__(self, result_dict=None):
105
        self.scenario = Scenario_run("", "")
106
        self.conditions = Scenario_conditions("", "", "", "")
107
        self.when = ""
108
        self.passed_stages = dict()
109
        self.passed_stages_count = 0
110
        self.success = False
111
112
        if result_dict:
113
            self.load_from_dict(result_dict)
114
115
    def load_from_dict(self, data):
116
        self.scenario = Scenario_run(data["rule_id"], data["scenario_script"])
117
        self.conditions = Scenario_conditions(
118
            data["backend"], data["scanning_mode"],
119
            data["remediated_by"], data["datastream"])
120
        self.when = data["run_timestamp"]
121
122
        self.passed_stages = {key: data[key] for key in self.STAGE_STRINGS if key in data}
123
        self.passed_stages_count = sum(self.passed_stages.values())
124
125
        self.success = data.get("final_scan", False)
126
        if not self.success:
127
            self.success = (
128
                "remediation" not in data
129
                and data.get("initial_scan", False))
130
131
    def save_to_dict(self):
132
        data = dict()
133
        data["rule_id"] = self.scenario.rule_id
134
        data["scenario_script"] = self.scenario.script
135
136
        data["backend"] = self.conditions.backend
137
        data["scanning_mode"] = self.conditions.scanning_mode
138
        data["remediated_by"] = self.conditions.remediated_by
139
        data["datastream"] = self.conditions.datastream
140
141
        data["run_timestamp"] = self.when
142
143
        for stage_str, result in self.passed_stages.items():
144
            data[stage_str] = result
145
146
        return data
147
148
    def record_stage_result(self, stage, successful):
149
        assert stage in self.STAGE_STRINGS, (
150
            "Stage name {name} is invalid, choose one from {choices}"
151
            .format(name=stage, choices=", ".join(self.STAGE_STRINGS))
152
        )
153
        self.passed_stages[stage] = successful
154
155
    def relative_conditions_to(self, other):
156
        if self.conditions == other.conditions:
157
            return self.when, other.when
158
        else:
159
            return tuple(self.conditions), tuple(other.conditions)
160
161
    def __eq__(self, other):
162
        return (self.success == other.success
163
                and tuple(self.passed_stages) == tuple(other.passed_stages))
164
165
    def __lt__(self, other):
166
        return self.passed_stages_count > other.passed_stages_count
167
168
169
def run_cmd_local(command, verbose_path, env=None):
170
    command_string = ' '.join(command)
171
    logging.debug('Running {}'.format(command_string))
172
    returncode, output = _run_cmd(command, verbose_path, env)
173
    return returncode, output
174
175
176
def _run_cmd(command_list, verbose_path, env=None):
177
    returncode = 0
178
    output = b""
179
    try:
180
        with open(verbose_path, 'w') as verbose_file:
181
            output = subprocess.check_output(
182
                command_list, stderr=verbose_file, env=env)
183
    except subprocess.CalledProcessError as e:
184
        returncode = e.returncode
185
        output = e.output
186
    return returncode, output.decode('utf-8')
187
188
189
def _get_platform_cpes(platform):
190
    ssg_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
191
    if platform.startswith("multi_platform_"):
192
        try:
193
            products = MULTI_PLATFORM_MAPPING[platform]
194
        except KeyError:
195
            logging.error(
196
                "Unknown multi_platform specifier: %s is not from %s"
197
                % (platform, ", ".join(MULTI_PLATFORM_MAPPING.keys())))
198
            raise ValueError
199
        platform_cpes = set()
200
        for p in products:
201
            product_yaml_path = os.path.join(ssg_root, "products", p, "product.yml")
202
            product_yaml = load_product_yaml(product_yaml_path)
203
            p_cpes = ProductCPEs()
204
            p_cpes.load_product_cpes(product_yaml)
205
            p_cpes.load_content_cpes(product_yaml)
206
            platform_cpes |= set(p_cpes.get_product_cpe_names())
207
        return platform_cpes
208
    else:
209
        # scenario platform is specified by a full product name
210
        try:
211
            product = FULL_NAME_TO_PRODUCT_MAPPING[platform]
212
        except KeyError:
213
            logging.error(
214
                "Unknown product name: %s is not from %s"
215
                % (platform, ", ".join(FULL_NAME_TO_PRODUCT_MAPPING.keys())))
216
            raise ValueError
217
        product_yaml_path = os.path.join(ssg_root, "products", product, "product.yml")
218
        product_yaml = load_product_yaml(product_yaml_path)
219
        product_cpes = ProductCPEs()
220
        product_cpes.load_product_cpes(product_yaml)
221
        product_cpes.load_content_cpes(product_yaml)
222
        platform_cpes = set(product_cpes.get_product_cpe_names())
223
        return platform_cpes
224
225
226
def matches_platform(scenario_platforms, benchmark_cpes):
227
    if "multi_platform_all" in scenario_platforms:
228
        return True
229
    scenario_cpes = set()
230
    for p in scenario_platforms:
231
        scenario_cpes |= _get_platform_cpes(p)
232
    return len(scenario_cpes & benchmark_cpes) > 0
233
234
235
def run_with_stdout_logging(command, args, log_file):
236
    log_file.write("{0} {1}\n".format(command, " ".join(args)))
237
    result = subprocess.run(
238
            (command,) + args, encoding="utf-8", stdout=subprocess.PIPE,
239
            stderr=subprocess.PIPE, check=False)
240
    if result.stdout:
241
        log_file.write("STDOUT: ")
242
        log_file.write(result.stdout)
243
    if result.stderr:
244
        log_file.write("STDERR: ")
245
        log_file.write(result.stderr)
246
    return result
247
248
249
def _exclude_garbage(tarinfo):
250
    file_name = tarinfo.name
251
    if file_name.endswith('pyc'):
252
        return None
253
    if file_name.endswith('swp'):
254
        return None
255
    return tarinfo
256
257
258
def _make_file_root_owned(tarinfo):
259
    if tarinfo:
260
        tarinfo.uid = 0
261
        tarinfo.gid = 0
262
        # set permission to 775
263
        tarinfo.mode = 509
264
    return tarinfo
265
266
267
def get_product_context(product_id=None):
268
    """
269
    Returns a product YAML context if any product is specified. Hard-coded to
270
    assume a debug build.
271
    """
272
    # Load product's YAML file if present. This will allow us to parse
273
    # tests in the context of the product we're executing under.
274
    product_yaml = dict()
275
    if product_id:
276
        product = load_product_yaml(product_yaml_path(SSG_ROOT, product_id))
277
        product_properties_path = os.path.join(SSG_ROOT, "product_properties")
278
        product.read_properties_from_directory(product_properties_path)
279
        product_yaml.update(product)
280
281
    # We could run into a DocumentationNotComplete error when loading a
282
    # rule's YAML contents. However, because the test suite isn't executed
283
    # in the context of a particular build (though, ideally it would be
284
    # linked), we may not know exactly whether the top-level rule/profile
285
    # we're testing is actually completed. Thus, forcibly set the required
286
    # property to bypass this error.
287
    product_yaml['cmake_build_type'] = 'Debug'
288
289
    # Set the Jinja processing environment to Test Suite,
290
    # this allows Jinja macros to behave differently in a content build time and in a test time.
291
    product_yaml['SSG_TEST_SUITE_ENV'] = True
292
293
    return product_yaml
294
295
296
def load_rule_and_env(rule_dir_path, env_yaml, product=None):
297
    """
298
    Loads a rule and returns the combination of the RuleYAML class and
299
    the corresponding local environment for that rule.
300
    """
301
302
    # First build the path to the rule.yml file
303
    rule_path = get_rule_dir_yaml(rule_dir_path)
304
305
    # Load rule content in our environment. We use this to satisfy
306
    # some implied properties that might be used in the test suite.
307
    # Make sure we normalize to a specific product as well so that
308
    # when we load templated content it is correct.
309
    rule = RuleYAML.from_yaml(rule_path, env_yaml)
310
    rule.normalize(product)
311
312
    # Note that most places would check prodtype, but we don't care
313
    # about that here: if the rule is available to the product, we
314
    # load and parse it anyways as we have no knowledge of the
315
    # top-level profile or rule passed into the test suite.
316
    prodtypes = parse_prodtype(rule.prodtype)
317
318
    # Our local copy of env_yaml needs some properties from rule.yml
319
    # for completeness.
320
    local_env_yaml = dict()
321
    local_env_yaml.update(env_yaml)
322
    local_env_yaml['rule_id'] = rule.id_
323
    local_env_yaml['rule_title'] = rule.title
324
    local_env_yaml['products'] = prodtypes
325
326
    return rule, local_env_yaml
327
328
329
def write_rule_test_content_to_dir(rule_dir, test_content):
330
    for scenario in test_content.scenarios:
331
        scenario_file_path = os.path.join(rule_dir, scenario.script)
332
        with open(scenario_file_path, "w") as f:
333
            f.write(scenario.contents)
334
    for file_name, file_content in test_content.other_content.items():
335
        file_path = os.path.join(rule_dir, file_name)
336
        with open(file_path, "w") as f:
337
            f.write(file_content)
338
339
340
def create_tarball(test_content_by_rule_id):
341
    """
342
    Create a tarball which contains all test scenarios and additional
343
    content for every rule that is selected to be tested. The tarball contains
344
    directories with the test scenarios. The name of the directories is the
345
    same as short rule ID. There is no tree structure.
346
    """
347
348
    tmpdir = tempfile.mkdtemp()
349
    for rule_id, test_content in test_content_by_rule_id.items():
350
        short_rule_id = rule_id.replace(OSCAP_RULE, "")
351
        rule_dir = os.path.join(tmpdir, short_rule_id)
352
        mkdir_p(rule_dir)
353
        write_rule_test_content_to_dir(rule_dir, test_content)
354
355
    try:
356
        with tempfile.NamedTemporaryFile(
357
                "wb", suffix=".tar.gz", delete=False) as fp:
358
            with tarfile.TarFile.open(fileobj=fp, mode="w") as tarball:
359
                tarball.add(_SHARED_DIR, arcname="shared", filter=_make_file_root_owned)
360
                for rule_id in os.listdir(tmpdir):
361
                    # When a top-level directory exists under the temporary
362
                    # templated tests directory, we've already validated that
363
                    # it is a valid rule directory. Thus we can simply add it
364
                    # to the tarball.
365
                    absolute_dir = os.path.join(tmpdir, rule_id)
366
                    if not os.path.isdir(absolute_dir):
367
                        continue
368
369
                    tarball.add(
370
                        absolute_dir, arcname=rule_id,
371
                        filter=lambda tinfo: _exclude_garbage(_make_file_root_owned(tinfo))
372
                    )
373
374
            # Since we've added the templated contents into the tarball, we
375
            # can now delete the tree.
376
            shutil.rmtree(tmpdir, ignore_errors=True)
377
            return fp.name
378
    except Exception as exp:
379
        shutil.rmtree(tmpdir, ignore_errors=True)
380
        raise exp
381
382
383
def send_scripts(test_env, test_content_by_rule_id):
384
    remote_dir = REMOTE_TEST_SCENARIOS_DIRECTORY
385
    archive_file = create_tarball(test_content_by_rule_id)
386
    archive_file_basename = os.path.basename(archive_file)
387
    remote_archive_file = os.path.join(remote_dir, archive_file_basename)
388
    logging.debug("Uploading scripts.")
389
    log_file_name = os.path.join(LogHelper.LOG_DIR, "env-preparation.log")
390
391
    with open(log_file_name, 'a') as log_file:
392
        print("Setting up test setup scripts", file=log_file)
393
394
        test_env.execute_ssh_command(
395
            "mkdir -p {remote_dir}".format(remote_dir=remote_dir),
396
            log_file, "Cannot create directory {0}".format(remote_dir))
397
        test_env.scp_upload_file(
398
            archive_file, remote_dir,
399
            log_file, "Cannot copy archive {0} to the target machine's directory {1}"
400
            .format(archive_file, remote_dir))
401
        test_env.execute_ssh_command(
402
            "tar xf {remote_archive_file} -C {remote_dir}"
403
            .format(remote_dir=remote_dir, remote_archive_file=remote_archive_file),
404
            log_file, "Cannot extract data tarball {0}".format(remote_archive_file))
405
    os.unlink(archive_file)
406
    return remote_dir
407
408
409
def get_prefixed_name(state_name):
410
    return "{}_{}".format(TEST_SUITE_PREFIX, state_name)
411
412
413
def get_test_dir_config(test_dir, product_yaml):
414
    test_config = dict()
415
    test_config_filename = os.path.join(test_dir, TESTS_CONFIG_NAME)
416
    if os.path.exists(test_config_filename):
417
        test_config = ssg.yaml.open_and_expand(test_config_filename, product_yaml)
418
    return test_config
419
420
421
def select_templated_tests(test_dir_config, available_scenarios_basenames):
422
    deny_scenarios = set(test_dir_config.get("deny_templated_scenarios", []))
423
    available_scenarios_basenames = {
424
        test_name for test_name in available_scenarios_basenames
425
        if test_name not in deny_scenarios
426
    }
427
428
    allow_scenarios = set(test_dir_config.get("allow_templated_scenarios", []))
429
    if allow_scenarios:
430
        available_scenarios_basenames = {
431
            test_name for test_name in available_scenarios_basenames
432
            if test_name in allow_scenarios
433
        }
434
435
    allowed_and_denied = deny_scenarios.intersection(allow_scenarios)
436
    if allowed_and_denied:
437
        msg = (
438
            "Test directory configuration contain inconsistencies: {allowed_and_denied} "
439
            "scenarios are both allowed and denied."
440
            .format(allowed_and_denied=allowed_and_denied)
441
        )
442
        raise ValueError(msg)
443
    return available_scenarios_basenames
444
445
446
def fetch_templated_tests_paths(
447
        rule_namedtuple, product_yaml):
448
    rule = rule_namedtuple.rule
449
    if not rule.template or not rule.template['vars']:
450
        return dict()
451
    tests_paths = fetch_all_templated_tests_paths(rule.template)
452
    test_config = get_test_dir_config(rule_namedtuple.directory, product_yaml)
453
    allowed_tests_paths = select_templated_tests(
454
        test_config, tests_paths.keys())
455
    templated_test_scenarios = {
456
        name: tests_paths[name] for name in allowed_tests_paths}
457
    return templated_test_scenarios
458
459
460
def fetch_all_templated_tests_paths(rule_template):
461
    """
462
    Builds a dictionary of a test case relative path -> test case absolute path mapping.
463
464
    Here, we want to know what the relative path on disk (under the tests/
465
    subdirectory) is (such as "installed.pass.sh"), along with the actual
466
    absolute path.
467
    """
468
    template_name = rule_template['name']
469
470
    base_dir = os.path.abspath(os.path.join(_SHARED_TEMPLATES, template_name, "tests"))
471
    results = dict()
472
473
    # If no test cases exist, return an empty dictionary.
474
    if not os.path.exists(base_dir):
475
        return results
476
477
    # Walk files; note that we don't need to do anything about directories
478
    # as only files are recorded in the mapping; directories can be
479
    # inferred from the path.
480
    for dirpath, _, filenames in os.walk(base_dir):
481
        if not filenames:
482
            continue
483
484
        for filename in filenames:
485
            if filename.endswith(".swp"):
486
                continue
487
488
            # Relative path to the file becomes our results key.
489
            absolute_path = os.path.abspath(os.path.join(dirpath, filename))
490
            relative_path = os.path.relpath(absolute_path, base_dir)
491
492
            # Save the results under the relative path.
493
            results[relative_path] = absolute_path
494
    return results
495
496
497
def load_templated_tests(
498
        templated_tests_paths, template, local_env_yaml):
499
    templated_tests = dict()
500
    for path in templated_tests_paths:
501
        test = load_test(path, template, local_env_yaml)
502
        basename = os.path.basename(path)
503
        templated_tests[basename] = test
504
    return templated_tests
505
506
507
def load_test(absolute_path, rule_template, local_env_yaml):
508
    template_name = rule_template['name']
509
    template_vars = rule_template['vars']
510
    # Load template parameters and apply it to the test case.
511
    maybe_template = ssg.templates.Template.load_template(_SHARED_TEMPLATES, template_name)
512
    if maybe_template is not None:
513
        template_parameters = maybe_template.preprocess(template_vars, "tests")
514
    else:
515
        raise ValueError("Rule uses template '{}' "
516
                         "which doesn't exist in '{}".format(template_name, _SHARED_TEMPLATES))
517
518
    jinja_dict = ssg.utils.merge_dicts(local_env_yaml, template_parameters)
519
    filled_template = ssg.jinja.process_file_with_macros(
520
        absolute_path, jinja_dict)
521
    return filled_template
522
523
524
def file_known_as_useless(file_name):
525
    return file_name.endswith(".swp")
526
527
528
def fetch_local_tests_paths(tests_dir):
529
    if not os.path.exists(tests_dir):
530
        return dict()
531
    all_tests = dict()
532
    tests_dir_files = os.listdir(tests_dir)
533
    for test_case in tests_dir_files:
534
        # Skip vim swap files, they are not relevant and cause Jinja
535
        # expansion tracebacks
536
        if file_known_as_useless(test_case):
537
            continue
538
        test_path = os.path.join(tests_dir, test_case)
539
        if os.path.isdir(test_path):
540
            continue
541
        all_tests[test_case] = test_path
542
    return all_tests
543
544
545
def load_local_tests(local_tests_paths, local_env_yaml):
546
    local_tests = dict()
547
    for path in local_tests_paths:
548
        test = process_file_with_macros(path, local_env_yaml)
549
        basename = os.path.basename(path)
550
        local_tests[basename] = test
551
    return local_tests
552
553
554
def get_cpe_of_tested_os(test_env, log_file):
555
    os_release_file = "/etc/os-release"
556
    cpe_line = test_env.execute_ssh_command(
557
        "grep CPE_NAME {os_release_file}".format(os_release_file=os_release_file),
558
        log_file)
559
    # We are parsing an assignment that is possibly quoted
560
    cpe = re.match(r'''CPE_NAME=(["']?)(.*)\1''', cpe_line)
561
    if cpe and cpe.groups()[1]:
562
        return cpe.groups()[1]
563
    msg = ["Unable to get a CPE of the system running tests"]
564
    if cpe_line:
565
        msg.append(
566
            "Retreived a CPE line that we couldn't parse: {cpe_line}"
567
            .format(cpe_line=cpe_line))
568
    else:
569
        msg.append(
570
            "Couldn't get CPE entry from '{os_release_file}'"
571
            .format(os_release_file=os_release_file))
572
    raise RuntimeError("\n".join(msg))
573
574
575
INSTALL_COMMANDS = dict(
576
    fedora=("dnf", "install", "-y"),
577
    ol7=("yum", "install", "-y"),
578
    ol8=("yum", "install", "-y"),
579
    ol9=("yum", "install", "-y"),
580
    rhel7=("yum", "install", "-y"),
581
    rhel8=("yum", "install", "-y"),
582
    rhel9=("yum", "install", "-y"),
583
    sles=("zypper", "install", "-y"),
584
    ubuntu=("DEBIAN_FRONTEND=noninteractive", "apt", "install", "-y"),
585
)
586
587
588
def install_packages(test_env, packages):
589
    log_file_name = os.path.join(LogHelper.LOG_DIR, "env-preparation.log")
590
591
    with open(log_file_name, "a") as log_file:
592
        platform_cpe = get_cpe_of_tested_os(test_env, log_file)
593
    platform = cpes_to_platform([platform_cpe])
594
595
    command_str = " ".join(INSTALL_COMMANDS[platform] + tuple(packages))
596
597
    with open(log_file_name, 'a') as log_file:
598
        print("Installing packages", file=log_file)
599
        log_file.flush()
600
        test_env.execute_ssh_command(
601
            command_str, log_file,
602
            "Couldn't install required packages: {packages}".format(packages=",".join(packages)))
603
604
605
def _match_rhel_version(cpe):
606
    rhel_cpe = {
607
        "redhat:enterprise_linux": r":enterprise_linux:([^:]+):",
608
        "centos:centos": r"centos:centos:([0-9]+)"}
609
    for cpe_item in rhel_cpe.keys():
610
        if cpe_item in cpe:
611
            match = re.search(rhel_cpe.get(cpe_item), cpe)
612
            if match:
613
                major_version = match.groups()[0].split(".")[0]
614
                return "rhel" + major_version
615
616
617
def cpe_to_platform(cpe):
618
    trivials = ["fedora", "sles", "ubuntu"]
619
    for platform in trivials:
620
        if platform in cpe:
621
            return platform
622
    rhel_version = _match_rhel_version(cpe)
623
    if rhel_version is not None:
624
        return rhel_version
625
    if "oracle:linux" in cpe:
626
        match = re.search(r":linux:([^:]+):", cpe)
627
        if match:
628
            major_version = match.groups()[0]
629
            return "ol" + major_version
630
631
632
def cpes_to_platform(cpes):
633
    for cpe in cpes:
634
        platform = cpe_to_platform(cpe)
635
        if platform is not None:
636
            return platform
637
    msg = "Unable to deduce a platform from these CPEs: {cpes}".format(cpes=cpes)
638
    raise ValueError(msg)
639
640
641
def retry_with_stdout_logging(command, args, log_file, max_attempts=5):
642
    attempt = 0
643
    while attempt < max_attempts:
644
        result = run_with_stdout_logging(command, args, log_file)
645
        if result.returncode == 0:
646
            return result
647
        attempt += 1
648
        time.sleep(1)
649
    return result
0 ignored issues
show
introduced by
The variable result does not seem to be defined in case the while loop on line 643 is not entered. Are you sure this can never be the case?
Loading history...
650